mirror of
				https://github.com/LonamiWebs/Telethon.git
				synced 2025-11-04 01:47:27 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			379 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			379 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import asyncio
 | 
						|
import collections
 | 
						|
import functools
 | 
						|
import inspect
 | 
						|
import os
 | 
						|
import re
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import tkinter
 | 
						|
import tkinter.constants
 | 
						|
import tkinter.scrolledtext
 | 
						|
import tkinter.ttk
 | 
						|
 | 
						|
from telethon import TelegramClient, events, utils
 | 
						|
 | 
						|
# Some configuration for the app
 | 
						|
TITLE = 'Telethon GUI'
 | 
						|
SIZE = '640x280'
 | 
						|
REPLY = re.compile(r'\.r\s*(\d+)\s*(.+)', re.IGNORECASE)
 | 
						|
DELETE = re.compile(r'\.d\s*(\d+)', re.IGNORECASE)
 | 
						|
EDIT = re.compile(r'\.s(.+?[^\\])/(.*)', re.IGNORECASE)
 | 
						|
 | 
						|
 | 
						|
def get_env(name, message, cast=str):
 | 
						|
    if name in os.environ:
 | 
						|
        return os.environ[name]
 | 
						|
    while True:
 | 
						|
        value = input(message)
 | 
						|
        try:
 | 
						|
            return cast(value)
 | 
						|
        except ValueError as e:
 | 
						|
            print(e, file=sys.stderr)
 | 
						|
            time.sleep(1)
 | 
						|
 | 
						|
 | 
						|
# Session name, API ID and hash to use; loaded from environmental variables
 | 
						|
SESSION = os.environ.get('TG_SESSION', 'gui')
 | 
						|
API_ID = get_env('TG_API_ID', 'Enter your API ID: ', int)
 | 
						|
API_HASH = get_env('TG_API_HASH', 'Enter your API hash: ')
 | 
						|
 | 
						|
 | 
						|
def sanitize_str(string):
 | 
						|
    return ''.join(x if ord(x) <= 0xffff else
 | 
						|
                   '{{{:x}ū}}'.format(ord(x)) for x in string)
 | 
						|
 | 
						|
 | 
						|
def callback(func):
 | 
						|
    """
 | 
						|
    This decorator turns `func` into a callback for Tkinter
 | 
						|
    to be able to use, even if `func` is an awaitable coroutine.
 | 
						|
    """
 | 
						|
    @functools.wraps(func)
 | 
						|
    def wrapped(*args, **kwargs):
 | 
						|
        result = func(*args, **kwargs)
 | 
						|
        if inspect.iscoroutine(result):
 | 
						|
            aio_loop.create_task(result)
 | 
						|
 | 
						|
    return wrapped
 | 
						|
 | 
						|
 | 
						|
def allow_copy(widget):
 | 
						|
    """
 | 
						|
    This helper makes `widget` readonly but allows copying with ``Ctrl+C``.
 | 
						|
    """
 | 
						|
    widget.bind('<Control-c>', lambda e: None)
 | 
						|
    widget.bind('<Key>', lambda e: "break")
 | 
						|
 | 
						|
 | 
						|
class App(tkinter.Tk):
 | 
						|
    """
 | 
						|
    Our main GUI application; we subclass `tkinter.Tk`
 | 
						|
    so the `self` instance can be the root widget.
 | 
						|
 | 
						|
    One must be careful when assigning members or
 | 
						|
    defining methods since those may interfer with
 | 
						|
    the root widget.
 | 
						|
 | 
						|
    You may prefer to have ``App.root = tkinter.Tk()``
 | 
						|
    and create widgets with ``self.root`` as parent.
 | 
						|
    """
 | 
						|
    def __init__(self, client, *args, **kwargs):
 | 
						|
        super().__init__(*args, **kwargs)
 | 
						|
        self.cl = client
 | 
						|
        self.me = None
 | 
						|
 | 
						|
        self.title(TITLE)
 | 
						|
        self.geometry(SIZE)
 | 
						|
 | 
						|
        # Signing in row; the entry supports phone and bot token
 | 
						|
        self.sign_in_label = tkinter.Label(self, text='Loading...')
 | 
						|
        self.sign_in_label.grid(row=0, column=0)
 | 
						|
        self.sign_in_entry = tkinter.Entry(self)
 | 
						|
        self.sign_in_entry.grid(row=0, column=1, sticky=tkinter.EW)
 | 
						|
        self.sign_in_entry.bind('<Return>', self.sign_in)
 | 
						|
        self.sign_in_button = tkinter.Button(self, text='...',
 | 
						|
                                             command=self.sign_in)
 | 
						|
        self.sign_in_button.grid(row=0, column=2)
 | 
						|
        self.code = None
 | 
						|
 | 
						|
        # The chat where to send and show messages from
 | 
						|
        tkinter.Label(self, text='Target chat:').grid(row=1, column=0)
 | 
						|
        self.chat = tkinter.Entry(self)
 | 
						|
        self.chat.grid(row=1, column=1, columnspan=2, sticky=tkinter.EW)
 | 
						|
        self.columnconfigure(1, weight=1)
 | 
						|
        self.chat.bind('<Return>', self.check_chat)
 | 
						|
        self.chat.bind('<FocusOut>', self.check_chat)
 | 
						|
        self.chat.focus()
 | 
						|
        self.chat_id = None
 | 
						|
 | 
						|
        # Message log (incoming and outgoing); we configure it as readonly
 | 
						|
        self.log = tkinter.scrolledtext.ScrolledText(self)
 | 
						|
        allow_copy(self.log)
 | 
						|
        self.log.grid(row=2, column=0, columnspan=3, sticky=tkinter.NSEW)
 | 
						|
        self.rowconfigure(2, weight=1)
 | 
						|
        self.cl.add_event_handler(self.on_message, events.NewMessage)
 | 
						|
 | 
						|
        # Save shown message IDs to support replying with ".rN reply"
 | 
						|
        # For instance to reply to the last message ".r1 this is a reply"
 | 
						|
        # Deletion also works with ".dN".
 | 
						|
        self.message_ids = []
 | 
						|
 | 
						|
        # Save the sent texts to allow editing with ".s text/replacement"
 | 
						|
        # For instance to edit the last "hello" with "bye" ".s hello/bye"
 | 
						|
        self.sent_text = collections.deque(maxlen=10)
 | 
						|
 | 
						|
        # Sending messages
 | 
						|
        tkinter.Label(self, text='Message:').grid(row=3, column=0)
 | 
						|
        self.message = tkinter.Entry(self)
 | 
						|
        self.message.grid(row=3, column=1, sticky=tkinter.EW)
 | 
						|
        self.message.bind('<Return>', self.send_message)
 | 
						|
        tkinter.Button(self, text='Send',
 | 
						|
                       command=self.send_message).grid(row=3, column=2)
 | 
						|
 | 
						|
        # Post-init (async, connect client)
 | 
						|
        self.cl.loop.create_task(self.post_init())
 | 
						|
 | 
						|
    async def post_init(self):
 | 
						|
        """
 | 
						|
        Completes the initialization of our application.
 | 
						|
        Since `__init__` cannot be `async` we use this.
 | 
						|
        """
 | 
						|
        if await self.cl.is_user_authorized():
 | 
						|
            self.set_signed_in(await self.cl.get_me())
 | 
						|
        else:
 | 
						|
            # User is not logged in, configure the button to ask them to login
 | 
						|
            self.sign_in_button.configure(text='Sign in')
 | 
						|
            self.sign_in_label.configure(
 | 
						|
                text='Sign in (phone/token):')
 | 
						|
 | 
						|
    async def on_message(self, event):
 | 
						|
        """
 | 
						|
        Event handler that will add new messages to the message log.
 | 
						|
        """
 | 
						|
        # We want to show only messages sent to this chat
 | 
						|
        if event.chat_id != self.chat_id:
 | 
						|
            return
 | 
						|
 | 
						|
        # Save the message ID so we know which to reply to
 | 
						|
        self.message_ids.append(event.id)
 | 
						|
 | 
						|
        # Decide a prefix (">> " for our messages, "<user>" otherwise)
 | 
						|
        if event.out:
 | 
						|
            text = '>> '
 | 
						|
        else:
 | 
						|
            sender = await event.get_sender()
 | 
						|
            text = '<{}> '.format(sanitize_str(
 | 
						|
                utils.get_display_name(sender)))
 | 
						|
 | 
						|
        # If the message has media show "(MediaType) "
 | 
						|
        if event.media:
 | 
						|
            text += '({}) '.format(event.media.__class__.__name__)
 | 
						|
 | 
						|
        text += sanitize_str(event.text)
 | 
						|
        text += '\n'
 | 
						|
 | 
						|
        # Append the text to the end with a newline, and scroll to the end
 | 
						|
        self.log.insert(tkinter.END, text)
 | 
						|
        self.log.yview(tkinter.END)
 | 
						|
 | 
						|
    # noinspection PyUnusedLocal
 | 
						|
    @callback
 | 
						|
    async def sign_in(self, event=None):
 | 
						|
        """
 | 
						|
        Note the `event` argument. This is required since this callback
 | 
						|
        may be called from a ``widget.bind`` (such as ``'<Return>'``),
 | 
						|
        which sends information about the event we don't care about.
 | 
						|
 | 
						|
        This callback logs out if authorized, signs in if a code was
 | 
						|
        sent or a bot token is input, or sends the code otherwise.
 | 
						|
        """
 | 
						|
        self.sign_in_label.configure(text='Working...')
 | 
						|
        self.sign_in_entry.configure(state=tkinter.DISABLED)
 | 
						|
        if await self.cl.is_user_authorized():
 | 
						|
            await self.cl.log_out()
 | 
						|
            self.destroy()
 | 
						|
            return
 | 
						|
 | 
						|
        value = self.sign_in_entry.get().strip()
 | 
						|
        if self.code:
 | 
						|
            self.set_signed_in(await self.cl.sign_in(code=value))
 | 
						|
        elif ':' in value:
 | 
						|
            self.set_signed_in(await self.cl.sign_in(bot_token=value))
 | 
						|
        else:
 | 
						|
            self.code = await self.cl.send_code_request(value)
 | 
						|
            self.sign_in_label.configure(text='Code:')
 | 
						|
            self.sign_in_entry.configure(state=tkinter.NORMAL)
 | 
						|
            self.sign_in_entry.delete(0, tkinter.END)
 | 
						|
            self.sign_in_entry.focus()
 | 
						|
            return
 | 
						|
 | 
						|
    def set_signed_in(self, me):
 | 
						|
        """
 | 
						|
        Configures the application as "signed in" (displays user's
 | 
						|
        name and disables the entry to input phone/bot token/code).
 | 
						|
        """
 | 
						|
        self.me = me
 | 
						|
        self.sign_in_label.configure(text='Signed in')
 | 
						|
        self.sign_in_entry.configure(state=tkinter.NORMAL)
 | 
						|
        self.sign_in_entry.delete(0, tkinter.END)
 | 
						|
        self.sign_in_entry.insert(tkinter.INSERT, utils.get_display_name(me))
 | 
						|
        self.sign_in_entry.configure(state=tkinter.DISABLED)
 | 
						|
        self.sign_in_button.configure(text='Log out')
 | 
						|
        self.chat.focus()
 | 
						|
 | 
						|
    # noinspection PyUnusedLocal
 | 
						|
    @callback
 | 
						|
    async def send_message(self, event=None):
 | 
						|
        """
 | 
						|
        Sends a message. Does nothing if the client is not connected.
 | 
						|
        """
 | 
						|
        if not self.cl.is_connected():
 | 
						|
            return
 | 
						|
 | 
						|
        # The user needs to configure a chat where the message should be sent.
 | 
						|
        #
 | 
						|
        # If the chat ID does not exist, it was not valid and the user must
 | 
						|
        # configure one; hint them by changing the background to red.
 | 
						|
        if not self.chat_id:
 | 
						|
            self.chat.configure(bg='red')
 | 
						|
            self.chat.focus()
 | 
						|
            return
 | 
						|
 | 
						|
        # Get the message, clear the text field and focus it again
 | 
						|
        text = self.message.get().strip()
 | 
						|
        self.message.delete(0, tkinter.END)
 | 
						|
        self.message.focus()
 | 
						|
        if not text:
 | 
						|
            return
 | 
						|
 | 
						|
        # NOTE: This part is optional but supports editing messages
 | 
						|
        #       You can remove it if you find it too complicated.
 | 
						|
        #
 | 
						|
        # Check if the edit matches any text
 | 
						|
        m = EDIT.match(text)
 | 
						|
        if m:
 | 
						|
            find = re.compile(m.group(1).lstrip())
 | 
						|
            # Cannot reversed(enumerate(...)), use index
 | 
						|
            for i in reversed(range(len(self.sent_text))):
 | 
						|
                msg_id, msg_text = self.sent_text[i]
 | 
						|
                if find.search(msg_text):
 | 
						|
                    # Found text to replace, so replace it and edit
 | 
						|
                    new = find.sub(m.group(2), msg_text)
 | 
						|
                    self.sent_text[i] = (msg_id, new)
 | 
						|
                    await self.cl.edit_message(self.chat_id, msg_id, new)
 | 
						|
 | 
						|
                    # Notify that a replacement was made
 | 
						|
                    self.log.insert(tkinter.END, '(message edited: {} -> {})\n'
 | 
						|
                                    .format(msg_text, new))
 | 
						|
                    self.log.yview(tkinter.END)
 | 
						|
                    return
 | 
						|
 | 
						|
        # Check if we want to delete the message
 | 
						|
        m = DELETE.match(text)
 | 
						|
        if m:
 | 
						|
            try:
 | 
						|
                delete = self.message_ids.pop(-int(m.group(1)))
 | 
						|
            except IndexError:
 | 
						|
                pass
 | 
						|
            else:
 | 
						|
                await self.cl.delete_messages(self.chat_id, delete)
 | 
						|
                # Notify that a message was deleted
 | 
						|
                self.log.insert(tkinter.END, '(message deleted)\n')
 | 
						|
                self.log.yview(tkinter.END)
 | 
						|
                return
 | 
						|
 | 
						|
        # Check if we want to reply to some message
 | 
						|
        reply_to = None
 | 
						|
        m = REPLY.match(text)
 | 
						|
        if m:
 | 
						|
            text = m.group(2)
 | 
						|
            try:
 | 
						|
                reply_to = self.message_ids[-int(m.group(1))]
 | 
						|
            except IndexError:
 | 
						|
                pass
 | 
						|
 | 
						|
        # NOTE: This part is no longer optional. It sends the message.
 | 
						|
        # Send the message text and get back the sent message object
 | 
						|
        message = await self.cl.send_message(self.chat_id, text,
 | 
						|
                                             reply_to=reply_to)
 | 
						|
 | 
						|
        # Save the sent message ID and text to allow edits
 | 
						|
        self.sent_text.append((message.id, text))
 | 
						|
 | 
						|
        # Process the sent message as if it were an event
 | 
						|
        await self.on_message(message)
 | 
						|
 | 
						|
    # noinspection PyUnusedLocal
 | 
						|
    @callback
 | 
						|
    async def check_chat(self, event=None):
 | 
						|
        """
 | 
						|
        Checks the input chat where to send and listen messages from.
 | 
						|
        """
 | 
						|
        if self.me is None:
 | 
						|
            return  # Not logged in yet
 | 
						|
 | 
						|
        chat = self.chat.get().strip()
 | 
						|
        try:
 | 
						|
            chat = int(chat)
 | 
						|
        except ValueError:
 | 
						|
            pass
 | 
						|
 | 
						|
        try:
 | 
						|
            old = self.chat_id
 | 
						|
            # Valid chat ID, set it and configure the colour back to white
 | 
						|
            self.chat_id = await self.cl.get_peer_id(chat)
 | 
						|
            self.chat.configure(bg='white')
 | 
						|
 | 
						|
            # If the chat ID changed, clear the
 | 
						|
            # messages that we could edit or reply
 | 
						|
            if self.chat_id != old:
 | 
						|
                self.message_ids.clear()
 | 
						|
                self.sent_text.clear()
 | 
						|
                self.log.delete('1.0', tkinter.END)
 | 
						|
                if not self.me.bot:
 | 
						|
                    for msg in reversed(
 | 
						|
                            await self.cl.get_messages(self.chat_id, 100)):
 | 
						|
                        await self.on_message(msg)
 | 
						|
        except ValueError:
 | 
						|
            # Invalid chat ID, let the user know with a yellow background
 | 
						|
            self.chat_id = None
 | 
						|
            self.chat.configure(bg='yellow')
 | 
						|
 | 
						|
 | 
						|
async def main(loop, interval=0.05):
 | 
						|
    client = TelegramClient(SESSION, API_ID, API_HASH, loop=loop)
 | 
						|
    try:
 | 
						|
        await client.connect()
 | 
						|
    except Exception as e:
 | 
						|
        print('Failed to connect', e, file=sys.stderr)
 | 
						|
        return
 | 
						|
 | 
						|
    app = App(client)
 | 
						|
    try:
 | 
						|
        while True:
 | 
						|
            # We want to update the application but get back
 | 
						|
            # to asyncio's event loop. For this we sleep a
 | 
						|
            # short time so the event loop can run.
 | 
						|
            #
 | 
						|
            # https://www.reddit.com/r/Python/comments/33ecpl
 | 
						|
            app.update()
 | 
						|
            await asyncio.sleep(interval)
 | 
						|
    except KeyboardInterrupt:
 | 
						|
        pass
 | 
						|
    except tkinter.TclError as e:
 | 
						|
        if 'application has been destroyed' not in e.args[0]:
 | 
						|
            raise
 | 
						|
    finally:
 | 
						|
        app.cl.disconnect()
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    # Some boilerplate code to set up the main method
 | 
						|
    aio_loop = asyncio.get_event_loop()
 | 
						|
    try:
 | 
						|
        aio_loop.run_until_complete(main(aio_loop))
 | 
						|
    finally:
 | 
						|
        if not aio_loop.is_closed():
 | 
						|
            aio_loop.close()
 |