mirror of
				https://github.com/LonamiWebs/Telethon.git
				synced 2025-11-04 09:57:29 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			643 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			643 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import asyncio
 | 
						|
import inspect
 | 
						|
import itertools
 | 
						|
import random
 | 
						|
import time
 | 
						|
import typing
 | 
						|
 | 
						|
from .. import events, utils, errors
 | 
						|
from ..events.common import EventBuilder, EventCommon
 | 
						|
from ..tl import types, functions
 | 
						|
 | 
						|
if typing.TYPE_CHECKING:
 | 
						|
    from .telegramclient import TelegramClient
 | 
						|
 | 
						|
 | 
						|
class UpdateMethods:
 | 
						|
 | 
						|
    # region Public methods
 | 
						|
 | 
						|
    async def _run_until_disconnected(self: 'TelegramClient'):
 | 
						|
        try:
 | 
						|
            # Make a high-level request to notify that we want updates
 | 
						|
            await self(functions.updates.GetStateRequest())
 | 
						|
            return await self.disconnected
 | 
						|
        except KeyboardInterrupt:
 | 
						|
            pass
 | 
						|
        finally:
 | 
						|
            await self.disconnect()
 | 
						|
 | 
						|
    def run_until_disconnected(self: 'TelegramClient'):
 | 
						|
        """
 | 
						|
        Runs the event loop until the library is disconnected.
 | 
						|
 | 
						|
        It also notifies Telegram that we want to receive updates
 | 
						|
        as described in https://core.telegram.org/api/updates.
 | 
						|
 | 
						|
        Manual disconnections can be made by calling `disconnect()
 | 
						|
        <telethon.client.telegrambaseclient.TelegramBaseClient.disconnect>`
 | 
						|
        or sending a ``KeyboardInterrupt`` (e.g. by pressing ``Ctrl+C`` on
 | 
						|
        the console window running the script).
 | 
						|
 | 
						|
        If a disconnection error occurs (i.e. the library fails to reconnect
 | 
						|
        automatically), said error will be raised through here, so you have a
 | 
						|
        chance to ``except`` it on your own code.
 | 
						|
 | 
						|
        If the loop is already running, this method returns a coroutine
 | 
						|
        that you should await on your own code.
 | 
						|
 | 
						|
        .. note::
 | 
						|
 | 
						|
            If you want to handle ``KeyboardInterrupt`` in your code,
 | 
						|
            simply run the event loop in your code too in any way, such as
 | 
						|
            ``loop.run_forever()`` or ``await client.disconnected`` (e.g.
 | 
						|
            ``loop.run_until_complete(client.disconnected)``).
 | 
						|
 | 
						|
        Example
 | 
						|
            .. code-block:: python
 | 
						|
 | 
						|
                # Blocks the current task here until a disconnection occurs.
 | 
						|
                #
 | 
						|
                # You will still receive updates, since this prevents the
 | 
						|
                # script from exiting.
 | 
						|
                await client.run_until_disconnected()
 | 
						|
        """
 | 
						|
        if self.loop.is_running():
 | 
						|
            return self._run_until_disconnected()
 | 
						|
        try:
 | 
						|
            return self.loop.run_until_complete(self._run_until_disconnected())
 | 
						|
        except KeyboardInterrupt:
 | 
						|
            pass
 | 
						|
        finally:
 | 
						|
            # No loop.run_until_complete; it's already syncified
 | 
						|
            self.disconnect()
 | 
						|
 | 
						|
    def on(self: 'TelegramClient', event: EventBuilder):
 | 
						|
        """
 | 
						|
        Decorator used to `add_event_handler` more conveniently.
 | 
						|
 | 
						|
 | 
						|
        Arguments
 | 
						|
            event (`_EventBuilder` | `type`):
 | 
						|
                The event builder class or instance to be used,
 | 
						|
                for instance ``events.NewMessage``.
 | 
						|
 | 
						|
        Example
 | 
						|
            .. code-block:: python
 | 
						|
 | 
						|
                from telethon import TelegramClient, events
 | 
						|
                client = TelegramClient(...)
 | 
						|
 | 
						|
                # Here we use client.on
 | 
						|
                @client.on(events.NewMessage)
 | 
						|
                async def handler(event):
 | 
						|
                    ...
 | 
						|
        """
 | 
						|
        def decorator(f):
 | 
						|
            self.add_event_handler(f, event)
 | 
						|
            return f
 | 
						|
 | 
						|
        return decorator
 | 
						|
 | 
						|
    def add_event_handler(
 | 
						|
            self: 'TelegramClient',
 | 
						|
            callback: callable,
 | 
						|
            event: EventBuilder = None):
 | 
						|
        """
 | 
						|
        Registers a new event handler callback.
 | 
						|
 | 
						|
        The callback will be called when the specified event occurs.
 | 
						|
 | 
						|
        Arguments
 | 
						|
            callback (`callable`):
 | 
						|
                The callable function accepting one parameter to be used.
 | 
						|
 | 
						|
                Note that if you have used `telethon.events.register` in
 | 
						|
                the callback, ``event`` will be ignored, and instead the
 | 
						|
                events you previously registered will be used.
 | 
						|
 | 
						|
            event (`_EventBuilder` | `type`, optional):
 | 
						|
                The event builder class or instance to be used,
 | 
						|
                for instance ``events.NewMessage``.
 | 
						|
 | 
						|
                If left unspecified, `telethon.events.raw.Raw` (the
 | 
						|
                :tl:`Update` objects with no further processing) will
 | 
						|
                be passed instead.
 | 
						|
 | 
						|
        Example
 | 
						|
            .. code-block:: python
 | 
						|
 | 
						|
                from telethon import TelegramClient, events
 | 
						|
                client = TelegramClient(...)
 | 
						|
 | 
						|
                async def handler(event):
 | 
						|
                    ...
 | 
						|
 | 
						|
                client.add_event_handler(handler, events.NewMessage)
 | 
						|
        """
 | 
						|
        builders = events._get_handlers(callback)
 | 
						|
        if builders is not None:
 | 
						|
            for event in builders:
 | 
						|
                self._event_builders.append((event, callback))
 | 
						|
            return
 | 
						|
 | 
						|
        if isinstance(event, type):
 | 
						|
            event = event()
 | 
						|
        elif not event:
 | 
						|
            event = events.Raw()
 | 
						|
 | 
						|
        self._event_builders.append((event, callback))
 | 
						|
 | 
						|
    def remove_event_handler(
 | 
						|
            self: 'TelegramClient',
 | 
						|
            callback: callable,
 | 
						|
            event: EventBuilder = None) -> int:
 | 
						|
        """
 | 
						|
        Inverse operation of `add_event_handler()`.
 | 
						|
 | 
						|
        If no event is given, all events for this callback are removed.
 | 
						|
        Returns how many callbacks were removed.
 | 
						|
 | 
						|
        Example
 | 
						|
            .. code-block:: python
 | 
						|
 | 
						|
                @client.on(events.Raw)
 | 
						|
                @client.on(events.NewMessage)
 | 
						|
                async def handler(event):
 | 
						|
                    ...
 | 
						|
 | 
						|
                # Removes only the "Raw" handling
 | 
						|
                # "handler" will still receive "events.NewMessage"
 | 
						|
                client.remove_event_handler(handler, events.Raw)
 | 
						|
 | 
						|
                # "handler" will stop receiving anything
 | 
						|
                client.remove_event_handler(handler)
 | 
						|
        """
 | 
						|
        found = 0
 | 
						|
        if event and not isinstance(event, type):
 | 
						|
            event = type(event)
 | 
						|
 | 
						|
        i = len(self._event_builders)
 | 
						|
        while i:
 | 
						|
            i -= 1
 | 
						|
            ev, cb = self._event_builders[i]
 | 
						|
            if cb == callback and (not event or isinstance(ev, event)):
 | 
						|
                del self._event_builders[i]
 | 
						|
                found += 1
 | 
						|
 | 
						|
        return found
 | 
						|
 | 
						|
    def list_event_handlers(self: 'TelegramClient')\
 | 
						|
            -> 'typing.Sequence[typing.Tuple[callable, EventBuilder]]':
 | 
						|
        """
 | 
						|
        Lists all registered event handlers.
 | 
						|
 | 
						|
        Returns
 | 
						|
            A list of pairs consisting of ``(callback, event)``.
 | 
						|
 | 
						|
        Example
 | 
						|
            .. code-block:: python
 | 
						|
 | 
						|
                @client.on(events.NewMessage(pattern='hello'))
 | 
						|
                async def on_greeting(event):
 | 
						|
                    '''Greets someone'''
 | 
						|
                    await event.reply('Hi')
 | 
						|
 | 
						|
                for callback, event in client.list_event_handlers():
 | 
						|
                    print(id(callback), type(event))
 | 
						|
        """
 | 
						|
        return [(callback, event) for event, callback in self._event_builders]
 | 
						|
 | 
						|
    async def catch_up(self: 'TelegramClient'):
 | 
						|
        """
 | 
						|
        "Catches up" on the missed updates while the client was offline.
 | 
						|
        You should call this method after registering the event handlers
 | 
						|
        so that the updates it loads can by processed by your script.
 | 
						|
 | 
						|
        This can also be used to forcibly fetch new updates if there are any.
 | 
						|
 | 
						|
        Example
 | 
						|
            .. code-block:: python
 | 
						|
 | 
						|
                await client.catch_up()
 | 
						|
        """
 | 
						|
        pts, date = self._state_cache[None]
 | 
						|
        if not pts:
 | 
						|
            return
 | 
						|
 | 
						|
        self.session.catching_up = True
 | 
						|
        try:
 | 
						|
            while True:
 | 
						|
                d = await self(functions.updates.GetDifferenceRequest(
 | 
						|
                    pts, date, 0
 | 
						|
                ))
 | 
						|
                if isinstance(d, (types.updates.DifferenceSlice,
 | 
						|
                                  types.updates.Difference)):
 | 
						|
                    if isinstance(d, types.updates.Difference):
 | 
						|
                        state = d.state
 | 
						|
                    else:
 | 
						|
                        state = d.intermediate_state
 | 
						|
 | 
						|
                    pts, date = state.pts, state.date
 | 
						|
                    self._handle_update(types.Updates(
 | 
						|
                        users=d.users,
 | 
						|
                        chats=d.chats,
 | 
						|
                        date=state.date,
 | 
						|
                        seq=state.seq,
 | 
						|
                        updates=d.other_updates + [
 | 
						|
                            types.UpdateNewMessage(m, 0, 0)
 | 
						|
                            for m in d.new_messages
 | 
						|
                        ]
 | 
						|
                    ))
 | 
						|
 | 
						|
                    # TODO Implement upper limit (max_pts)
 | 
						|
                    # We don't want to fetch updates we already know about.
 | 
						|
                    #
 | 
						|
                    # We may still get duplicates because the Difference
 | 
						|
                    # contains a lot of updates and presumably only has
 | 
						|
                    # the state for the last one, but at least we don't
 | 
						|
                    # unnecessarily fetch too many.
 | 
						|
                    #
 | 
						|
                    # updates.getDifference's pts_total_limit seems to mean
 | 
						|
                    # "how many pts is the request allowed to return", and
 | 
						|
                    # if there is more than that, it returns "too long" (so
 | 
						|
                    # there would be duplicate updates since we know about
 | 
						|
                    # some). This can be used to detect collisions (i.e.
 | 
						|
                    # it would return an update we have already seen).
 | 
						|
                else:
 | 
						|
                    if isinstance(d, types.updates.DifferenceEmpty):
 | 
						|
                        date = d.date
 | 
						|
                    elif isinstance(d, types.updates.DifferenceTooLong):
 | 
						|
                        pts = d.pts
 | 
						|
                    break
 | 
						|
        except (ConnectionError, asyncio.CancelledError):
 | 
						|
            pass
 | 
						|
        finally:
 | 
						|
            # TODO Save new pts to session
 | 
						|
            self._state_cache._pts_date = (pts, date)
 | 
						|
            self.session.catching_up = False
 | 
						|
 | 
						|
    # endregion
 | 
						|
 | 
						|
    # region Private methods
 | 
						|
 | 
						|
    # It is important to not make _handle_update async because we rely on
 | 
						|
    # the order that the updates arrive in to update the pts and date to
 | 
						|
    # be always-increasing. There is also no need to make this async.
 | 
						|
    def _handle_update(self: 'TelegramClient', update):
 | 
						|
        self.session.process_entities(update)
 | 
						|
        self._entity_cache.add(update)
 | 
						|
 | 
						|
        if isinstance(update, (types.Updates, types.UpdatesCombined)):
 | 
						|
            entities = {utils.get_peer_id(x): x for x in
 | 
						|
                        itertools.chain(update.users, update.chats)}
 | 
						|
            for u in update.updates:
 | 
						|
                self._process_update(u, update.updates, entities=entities)
 | 
						|
        elif isinstance(update, types.UpdateShort):
 | 
						|
            self._process_update(update.update, None)
 | 
						|
        else:
 | 
						|
            self._process_update(update, None)
 | 
						|
 | 
						|
        self._state_cache.update(update)
 | 
						|
 | 
						|
    def _process_update(self: 'TelegramClient', update, others, entities=None):
 | 
						|
        update._entities = entities or {}
 | 
						|
 | 
						|
        # This part is somewhat hot so we don't bother patching
 | 
						|
        # update with channel ID/its state. Instead we just pass
 | 
						|
        # arguments which is faster.
 | 
						|
        channel_id = self._state_cache.get_channel_id(update)
 | 
						|
        args = (update, others, channel_id, self._state_cache[channel_id])
 | 
						|
        if self._dispatching_updates_queue is None:
 | 
						|
            task = self._loop.create_task(self._dispatch_update(*args))
 | 
						|
            self._updates_queue.add(task)
 | 
						|
            task.add_done_callback(lambda _: self._updates_queue.discard(task))
 | 
						|
        else:
 | 
						|
            self._updates_queue.put_nowait(args)
 | 
						|
            if not self._dispatching_updates_queue.is_set():
 | 
						|
                self._dispatching_updates_queue.set()
 | 
						|
                self._loop.create_task(self._dispatch_queue_updates())
 | 
						|
 | 
						|
        self._state_cache.update(update)
 | 
						|
 | 
						|
    async def _update_loop(self: 'TelegramClient'):
 | 
						|
        # Pings' ID don't really need to be secure, just "random"
 | 
						|
        rnd = lambda: random.randrange(-2**63, 2**63)
 | 
						|
        while self.is_connected():
 | 
						|
            try:
 | 
						|
                await asyncio.wait_for(
 | 
						|
                    self.disconnected, timeout=60
 | 
						|
                )
 | 
						|
                continue  # We actually just want to act upon timeout
 | 
						|
            except asyncio.TimeoutError:
 | 
						|
                pass
 | 
						|
            except asyncio.CancelledError:
 | 
						|
                return
 | 
						|
            except Exception:
 | 
						|
                continue  # Any disconnected exception should be ignored
 | 
						|
 | 
						|
            # Check if we have any exported senders to clean-up periodically
 | 
						|
            await self._clean_exported_senders()
 | 
						|
 | 
						|
            # Don't bother sending pings until the low-level connection is
 | 
						|
            # ready, otherwise a lot of pings will be batched to be sent upon
 | 
						|
            # reconnect, when we really don't care about that.
 | 
						|
            if not self._sender._transport_connected():
 | 
						|
                continue
 | 
						|
 | 
						|
            # We also don't really care about their result.
 | 
						|
            # Just send them periodically.
 | 
						|
            try:
 | 
						|
                self._sender.send(functions.PingRequest(rnd()))
 | 
						|
            except (ConnectionError, asyncio.CancelledError):
 | 
						|
                return
 | 
						|
 | 
						|
            # Entities and cached files are not saved when they are
 | 
						|
            # inserted because this is a rather expensive operation
 | 
						|
            # (default's sqlite3 takes ~0.1s to commit changes). Do
 | 
						|
            # it every minute instead. No-op if there's nothing new.
 | 
						|
            self.session.save()
 | 
						|
 | 
						|
            # We need to send some content-related request at least hourly
 | 
						|
            # for Telegram to keep delivering updates, otherwise they will
 | 
						|
            # just stop even if we're connected. Do so every 30 minutes.
 | 
						|
            #
 | 
						|
            # TODO Call getDifference instead since it's more relevant
 | 
						|
            if time.time() - self._last_request > 30 * 60:
 | 
						|
                if not await self.is_user_authorized():
 | 
						|
                    # What can be the user doing for so
 | 
						|
                    # long without being logged in...?
 | 
						|
                    continue
 | 
						|
 | 
						|
                try:
 | 
						|
                    await self(functions.updates.GetStateRequest())
 | 
						|
                except (ConnectionError, asyncio.CancelledError):
 | 
						|
                    return
 | 
						|
 | 
						|
    async def _dispatch_queue_updates(self: 'TelegramClient'):
 | 
						|
        while not self._updates_queue.empty():
 | 
						|
            await self._dispatch_update(*self._updates_queue.get_nowait())
 | 
						|
 | 
						|
        self._dispatching_updates_queue.clear()
 | 
						|
 | 
						|
    async def _dispatch_update(self: 'TelegramClient', update, others, channel_id, pts_date):
 | 
						|
        if not self._entity_cache.ensure_cached(update):
 | 
						|
            # We could add a lock to not fetch the same pts twice if we are
 | 
						|
            # already fetching it. However this does not happen in practice,
 | 
						|
            # which makes sense, because different updates have different pts.
 | 
						|
            if self._state_cache.update(update, check_only=True):
 | 
						|
                # If the update doesn't have pts, fetching won't do anything.
 | 
						|
                # For example, UpdateUserStatus or UpdateChatUserTyping.
 | 
						|
                try:
 | 
						|
                    await self._get_difference(update, channel_id, pts_date)
 | 
						|
                except OSError:
 | 
						|
                    pass  # We were disconnected, that's okay
 | 
						|
                except errors.RPCError:
 | 
						|
                    # There's a high chance the request fails because we lack
 | 
						|
                    # the channel. Because these "happen sporadically" (#1428)
 | 
						|
                    # we should be okay (no flood waits) even if more occur.
 | 
						|
                    pass
 | 
						|
                except ValueError:
 | 
						|
                    # There is a chance that GetFullChannelRequest and GetDifferenceRequest
 | 
						|
                    # inside the _get_difference() function will end up with 
 | 
						|
                    # ValueError("Request was unsuccessful N time(s)") for whatever reasons.
 | 
						|
                    pass
 | 
						|
 | 
						|
        if not self._self_input_peer:
 | 
						|
            # Some updates require our own ID, so we must make sure
 | 
						|
            # that the event builder has offline access to it. Calling
 | 
						|
            # `get_me()` will cache it under `self._self_input_peer`.
 | 
						|
            #
 | 
						|
            # It will return `None` if we haven't logged in yet which is
 | 
						|
            # fine, we will just retry next time anyway.
 | 
						|
            try:
 | 
						|
                await self.get_me(input_peer=True)
 | 
						|
            except OSError:
 | 
						|
                pass  # might not have connection
 | 
						|
 | 
						|
        built = EventBuilderDict(self, update, others)
 | 
						|
        for conv_set in self._conversations.values():
 | 
						|
            for conv in conv_set:
 | 
						|
                ev = built[events.NewMessage]
 | 
						|
                if ev:
 | 
						|
                    conv._on_new_message(ev)
 | 
						|
 | 
						|
                ev = built[events.MessageEdited]
 | 
						|
                if ev:
 | 
						|
                    conv._on_edit(ev)
 | 
						|
 | 
						|
                ev = built[events.MessageRead]
 | 
						|
                if ev:
 | 
						|
                    conv._on_read(ev)
 | 
						|
 | 
						|
                if conv._custom:
 | 
						|
                    await conv._check_custom(built)
 | 
						|
 | 
						|
        for builder, callback in self._event_builders:
 | 
						|
            event = built[type(builder)]
 | 
						|
            if not event:
 | 
						|
                continue
 | 
						|
 | 
						|
            if not builder.resolved:
 | 
						|
                await builder.resolve(self)
 | 
						|
 | 
						|
            filter = builder.filter(event)
 | 
						|
            if inspect.isawaitable(filter):
 | 
						|
                filter = await filter
 | 
						|
            if not filter:
 | 
						|
                continue
 | 
						|
 | 
						|
            try:
 | 
						|
                await callback(event)
 | 
						|
            except errors.AlreadyInConversationError:
 | 
						|
                name = getattr(callback, '__name__', repr(callback))
 | 
						|
                self._log[__name__].debug(
 | 
						|
                    'Event handler "%s" already has an open conversation, '
 | 
						|
                    'ignoring new one', name)
 | 
						|
            except events.StopPropagation:
 | 
						|
                name = getattr(callback, '__name__', repr(callback))
 | 
						|
                self._log[__name__].debug(
 | 
						|
                    'Event handler "%s" stopped chain of propagation '
 | 
						|
                    'for event %s.', name, type(event).__name__
 | 
						|
                )
 | 
						|
                break
 | 
						|
            except Exception as e:
 | 
						|
                if not isinstance(e, asyncio.CancelledError) or self.is_connected():
 | 
						|
                    name = getattr(callback, '__name__', repr(callback))
 | 
						|
                    self._log[__name__].exception('Unhandled exception on %s',
 | 
						|
                                                  name)
 | 
						|
 | 
						|
    async def _dispatch_event(self: 'TelegramClient', event):
 | 
						|
        """
 | 
						|
        Dispatches a single, out-of-order event. Used by `AlbumHack`.
 | 
						|
        """
 | 
						|
        # We're duplicating a most logic from `_dispatch_update`, but all in
 | 
						|
        # the name of speed; we don't want to make it worse for all updates
 | 
						|
        # just because albums may need it.
 | 
						|
        for builder, callback in self._event_builders:
 | 
						|
            if isinstance(builder, events.Raw):
 | 
						|
                continue
 | 
						|
            if not isinstance(event, builder.Event):
 | 
						|
                continue
 | 
						|
 | 
						|
            if not builder.resolved:
 | 
						|
                await builder.resolve(self)
 | 
						|
 | 
						|
            filter = builder.filter(event)
 | 
						|
            if inspect.isawaitable(filter):
 | 
						|
                filter = await filter
 | 
						|
            if not filter:
 | 
						|
                continue
 | 
						|
 | 
						|
            try:
 | 
						|
                await callback(event)
 | 
						|
            except errors.AlreadyInConversationError:
 | 
						|
                name = getattr(callback, '__name__', repr(callback))
 | 
						|
                self._log[__name__].debug(
 | 
						|
                    'Event handler "%s" already has an open conversation, '
 | 
						|
                    'ignoring new one', name)
 | 
						|
            except events.StopPropagation:
 | 
						|
                name = getattr(callback, '__name__', repr(callback))
 | 
						|
                self._log[__name__].debug(
 | 
						|
                    'Event handler "%s" stopped chain of propagation '
 | 
						|
                    'for event %s.', name, type(event).__name__
 | 
						|
                )
 | 
						|
                break
 | 
						|
            except Exception as e:
 | 
						|
                if not isinstance(e, asyncio.CancelledError) or self.is_connected():
 | 
						|
                    name = getattr(callback, '__name__', repr(callback))
 | 
						|
                    self._log[__name__].exception('Unhandled exception on %s',
 | 
						|
                                                  name)
 | 
						|
 | 
						|
    async def _get_difference(self: 'TelegramClient', update, channel_id, pts_date):
 | 
						|
        """
 | 
						|
        Get the difference for this `channel_id` if any, then load entities.
 | 
						|
 | 
						|
        Calls :tl:`updates.getDifference`, which fills the entities cache
 | 
						|
        (always done by `__call__`) and lets us know about the full entities.
 | 
						|
        """
 | 
						|
        # Fetch since the last known pts/date before this update arrived,
 | 
						|
        # in order to fetch this update at full, including its entities.
 | 
						|
        self._log[__name__].debug('Getting difference for entities '
 | 
						|
                                  'for %r', update.__class__)
 | 
						|
        if channel_id:
 | 
						|
            # There are reports where we somehow call get channel difference
 | 
						|
            # with `InputPeerEmpty`. Check our assumptions to better debug
 | 
						|
            # this when it happens.
 | 
						|
            assert isinstance(channel_id, int), 'channel_id was {}, not int in {}'.format(type(channel_id), update)
 | 
						|
            try:
 | 
						|
                # Wrap the ID inside a peer to ensure we get a channel back.
 | 
						|
                where = await self.get_input_entity(types.PeerChannel(channel_id))
 | 
						|
            except ValueError:
 | 
						|
                # There's a high chance that this fails, since
 | 
						|
                # we are getting the difference to fetch entities.
 | 
						|
                return
 | 
						|
 | 
						|
            if not pts_date:
 | 
						|
                # First-time, can't get difference. Get pts instead.
 | 
						|
                result = await self(functions.channels.GetFullChannelRequest(
 | 
						|
                    utils.get_input_channel(where)
 | 
						|
                ))
 | 
						|
                self._state_cache[channel_id] = result.full_chat.pts
 | 
						|
                return
 | 
						|
 | 
						|
            result = await self(functions.updates.GetChannelDifferenceRequest(
 | 
						|
                channel=where,
 | 
						|
                filter=types.ChannelMessagesFilterEmpty(),
 | 
						|
                pts=pts_date,  # just pts
 | 
						|
                limit=100,
 | 
						|
                force=True
 | 
						|
            ))
 | 
						|
        else:
 | 
						|
            if not pts_date[0]:
 | 
						|
                # First-time, can't get difference. Get pts instead.
 | 
						|
                result = await self(functions.updates.GetStateRequest())
 | 
						|
                self._state_cache[None] = result.pts, result.date
 | 
						|
                return
 | 
						|
 | 
						|
            result = await self(functions.updates.GetDifferenceRequest(
 | 
						|
                pts=pts_date[0],
 | 
						|
                date=pts_date[1],
 | 
						|
                qts=0
 | 
						|
            ))
 | 
						|
 | 
						|
        if isinstance(result, (types.updates.Difference,
 | 
						|
                               types.updates.DifferenceSlice,
 | 
						|
                               types.updates.ChannelDifference,
 | 
						|
                               types.updates.ChannelDifferenceTooLong)):
 | 
						|
            update._entities.update({
 | 
						|
                utils.get_peer_id(x): x for x in
 | 
						|
                itertools.chain(result.users, result.chats)
 | 
						|
            })
 | 
						|
 | 
						|
    async def _handle_auto_reconnect(self: 'TelegramClient'):
 | 
						|
        # TODO Catch-up
 | 
						|
        # For now we make a high-level request to let Telegram
 | 
						|
        # know we are still interested in receiving more updates.
 | 
						|
        try:
 | 
						|
            await self.get_me()
 | 
						|
        except Exception as e:
 | 
						|
            self._log[__name__].warning('Error executing high-level request '
 | 
						|
                                        'after reconnect: %s: %s', type(e), e)
 | 
						|
 | 
						|
        return
 | 
						|
        try:
 | 
						|
            self._log[__name__].info(
 | 
						|
                'Asking for the current state after reconnect...')
 | 
						|
 | 
						|
            # TODO consider:
 | 
						|
            # If there aren't many updates while the client is disconnected
 | 
						|
            # (I tried with up to 20), Telegram seems to send them without
 | 
						|
            # asking for them (via updates.getDifference).
 | 
						|
            #
 | 
						|
            # On disconnection, the library should probably set a "need
 | 
						|
            # difference" or "catching up" flag so that any new updates are
 | 
						|
            # ignored, and then the library should call updates.getDifference
 | 
						|
            # itself to fetch them.
 | 
						|
            #
 | 
						|
            # In any case (either there are too many updates and Telegram
 | 
						|
            # didn't send them, or there isn't a lot and Telegram sent them
 | 
						|
            # but we dropped them), we fetch the new difference to get all
 | 
						|
            # missed updates. I feel like this would be the best solution.
 | 
						|
 | 
						|
            # If a disconnection occurs, the old known state will be
 | 
						|
            # the latest one we were aware of, so we can catch up since
 | 
						|
            # the most recent state we were aware of.
 | 
						|
            await self.catch_up()
 | 
						|
 | 
						|
            self._log[__name__].info('Successfully fetched missed updates')
 | 
						|
        except errors.RPCError as e:
 | 
						|
            self._log[__name__].warning('Failed to get missed updates after '
 | 
						|
                                        'reconnect: %r', e)
 | 
						|
        except Exception:
 | 
						|
            self._log[__name__].exception('Unhandled exception while getting '
 | 
						|
                                          'update difference after reconnect')
 | 
						|
 | 
						|
    # endregion
 | 
						|
 | 
						|
 | 
						|
class EventBuilderDict:
 | 
						|
    """
 | 
						|
    Helper "dictionary" to return events from types and cache them.
 | 
						|
    """
 | 
						|
    def __init__(self, client: 'TelegramClient', update, others):
 | 
						|
        self.client = client
 | 
						|
        self.update = update
 | 
						|
        self.others = others
 | 
						|
 | 
						|
    def __getitem__(self, builder):
 | 
						|
        try:
 | 
						|
            return self.__dict__[builder]
 | 
						|
        except KeyError:
 | 
						|
            event = self.__dict__[builder] = builder.build(
 | 
						|
                self.update, self.others, self.client._self_id)
 | 
						|
 | 
						|
            if isinstance(event, EventCommon):
 | 
						|
                event.original_update = self.update
 | 
						|
                event._entities = self.update._entities
 | 
						|
                event._set_client(self.client)
 | 
						|
            elif event:
 | 
						|
                event._client = self.client
 | 
						|
 | 
						|
            return event
 |