mirror of
				https://github.com/LonamiWebs/Telethon.git
				synced 2025-11-04 01:47:27 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			905 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			905 lines
		
	
	
		
			36 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import abc
 | 
						|
import re
 | 
						|
import asyncio
 | 
						|
import collections
 | 
						|
import logging
 | 
						|
import platform
 | 
						|
import time
 | 
						|
import typing
 | 
						|
import datetime
 | 
						|
 | 
						|
from .. import version, helpers, __name__ as __base_name__
 | 
						|
from ..crypto import rsa
 | 
						|
from ..entitycache import EntityCache
 | 
						|
from ..extensions import markdown
 | 
						|
from ..network import MTProtoSender, Connection, ConnectionTcpFull, TcpMTProxy
 | 
						|
from ..sessions import Session, SQLiteSession, MemorySession
 | 
						|
from ..tl import functions, types
 | 
						|
from ..tl.alltlobjects import LAYER
 | 
						|
from .._updates import MessageBox, EntityCache as MbEntityCache, SessionState, ChannelState, Entity, EntityType
 | 
						|
 | 
						|
DEFAULT_DC_ID = 2
 | 
						|
DEFAULT_IPV4_IP = '149.154.167.51'
 | 
						|
DEFAULT_IPV6_IP = '2001:67c:4e8:f002::a'
 | 
						|
DEFAULT_PORT = 443
 | 
						|
 | 
						|
if typing.TYPE_CHECKING:
 | 
						|
    from .telegramclient import TelegramClient
 | 
						|
 | 
						|
_base_log = logging.getLogger(__base_name__)
 | 
						|
 | 
						|
 | 
						|
# In seconds, how long to wait before disconnecting a exported sender.
 | 
						|
_DISCONNECT_EXPORTED_AFTER = 60
 | 
						|
 | 
						|
 | 
						|
class _ExportState:
 | 
						|
    def __init__(self):
 | 
						|
        # ``n`` is the amount of borrows a given sender has;
 | 
						|
        # once ``n`` reaches ``0``, disconnect the sender after a while.
 | 
						|
        self._n = 0
 | 
						|
        self._zero_ts = 0
 | 
						|
        self._connected = False
 | 
						|
 | 
						|
    def add_borrow(self):
 | 
						|
        self._n += 1
 | 
						|
        self._connected = True
 | 
						|
 | 
						|
    def add_return(self):
 | 
						|
        self._n -= 1
 | 
						|
        assert self._n >= 0, 'returned sender more than it was borrowed'
 | 
						|
        if self._n == 0:
 | 
						|
            self._zero_ts = time.time()
 | 
						|
 | 
						|
    def should_disconnect(self):
 | 
						|
        return (self._n == 0
 | 
						|
                and self._connected
 | 
						|
                and (time.time() - self._zero_ts) > _DISCONNECT_EXPORTED_AFTER)
 | 
						|
 | 
						|
    def need_connect(self):
 | 
						|
        return not self._connected
 | 
						|
 | 
						|
    def mark_disconnected(self):
 | 
						|
        assert self.should_disconnect(), 'marked as disconnected when it was borrowed'
 | 
						|
        self._connected = False
 | 
						|
 | 
						|
 | 
						|
# TODO How hard would it be to support both `trio` and `asyncio`?
 | 
						|
class TelegramBaseClient(abc.ABC):
 | 
						|
    """
 | 
						|
    This is the abstract base class for the client. It defines some
 | 
						|
    basic stuff like connecting, switching data center, etc, and
 | 
						|
    leaves the `__call__` unimplemented.
 | 
						|
 | 
						|
    Arguments
 | 
						|
        session (`str` | `telethon.sessions.abstract.Session`, `None`):
 | 
						|
            The file name of the session file to be used if a string is
 | 
						|
            given (it may be a full path), or the Session instance to be
 | 
						|
            used otherwise. If it's `None`, the session will not be saved,
 | 
						|
            and you should call :meth:`.log_out()` when you're done.
 | 
						|
 | 
						|
            Note that if you pass a string it will be a file in the current
 | 
						|
            working directory, although you can also pass absolute paths.
 | 
						|
 | 
						|
            The session file contains enough information for you to login
 | 
						|
            without re-sending the code, so if you have to enter the code
 | 
						|
            more than once, maybe you're changing the working directory,
 | 
						|
            renaming or removing the file, or using random names.
 | 
						|
 | 
						|
        api_id (`int` | `str`):
 | 
						|
            The API ID you obtained from https://my.telegram.org.
 | 
						|
 | 
						|
        api_hash (`str`):
 | 
						|
            The API hash you obtained from https://my.telegram.org.
 | 
						|
 | 
						|
        connection (`telethon.network.connection.common.Connection`, optional):
 | 
						|
            The connection instance to be used when creating a new connection
 | 
						|
            to the servers. It **must** be a type.
 | 
						|
 | 
						|
            Defaults to `telethon.network.connection.tcpfull.ConnectionTcpFull`.
 | 
						|
 | 
						|
        use_ipv6 (`bool`, optional):
 | 
						|
            Whether to connect to the servers through IPv6 or not.
 | 
						|
            By default this is `False` as IPv6 support is not
 | 
						|
            too widespread yet.
 | 
						|
 | 
						|
        proxy (`tuple` | `list` | `dict`, optional):
 | 
						|
            An iterable consisting of the proxy info. If `connection` is
 | 
						|
            one of `MTProxy`, then it should contain MTProxy credentials:
 | 
						|
            ``('hostname', port, 'secret')``. Otherwise, it's meant to store
 | 
						|
            function parameters for PySocks, like ``(type, 'hostname', port)``.
 | 
						|
            See https://github.com/Anorov/PySocks#usage-1 for more.
 | 
						|
 | 
						|
        local_addr (`str` | `tuple`, optional):
 | 
						|
            Local host address (and port, optionally) used to bind the socket to locally.
 | 
						|
            You only need to use this if you have multiple network cards and
 | 
						|
            want to use a specific one.
 | 
						|
 | 
						|
        timeout (`int` | `float`, optional):
 | 
						|
            The timeout in seconds to be used when connecting.
 | 
						|
            This is **not** the timeout to be used when ``await``'ing for
 | 
						|
            invoked requests, and you should use ``asyncio.wait`` or
 | 
						|
            ``asyncio.wait_for`` for that.
 | 
						|
 | 
						|
        request_retries (`int` | `None`, optional):
 | 
						|
            How many times a request should be retried. Request are retried
 | 
						|
            when Telegram is having internal issues (due to either
 | 
						|
            ``errors.ServerError`` or ``errors.RpcCallFailError``),
 | 
						|
            when there is a ``errors.FloodWaitError`` less than
 | 
						|
            `flood_sleep_threshold`, or when there's a migrate error.
 | 
						|
 | 
						|
            May take a negative or `None` value for infinite retries, but
 | 
						|
            this is not recommended, since some requests can always trigger
 | 
						|
            a call fail (such as searching for messages).
 | 
						|
 | 
						|
        connection_retries (`int` | `None`, optional):
 | 
						|
            How many times the reconnection should retry, either on the
 | 
						|
            initial connection or when Telegram disconnects us. May be
 | 
						|
            set to a negative or `None` value for infinite retries, but
 | 
						|
            this is not recommended, since the program can get stuck in an
 | 
						|
            infinite loop.
 | 
						|
 | 
						|
        retry_delay (`int` | `float`, optional):
 | 
						|
            The delay in seconds to sleep between automatic reconnections.
 | 
						|
 | 
						|
        auto_reconnect (`bool`, optional):
 | 
						|
            Whether reconnection should be retried `connection_retries`
 | 
						|
            times automatically if Telegram disconnects us or not.
 | 
						|
 | 
						|
        sequential_updates (`bool`, optional):
 | 
						|
            By default every incoming update will create a new task, so
 | 
						|
            you can handle several updates in parallel. Some scripts need
 | 
						|
            the order in which updates are processed to be sequential, and
 | 
						|
            this setting allows them to do so.
 | 
						|
 | 
						|
            If set to `True`, incoming updates will be put in a queue
 | 
						|
            and processed sequentially. This means your event handlers
 | 
						|
            should *not* perform long-running operations since new
 | 
						|
            updates are put inside of an unbounded queue.
 | 
						|
 | 
						|
        flood_sleep_threshold (`int` | `float`, optional):
 | 
						|
            The threshold below which the library should automatically
 | 
						|
            sleep on flood wait and slow mode wait errors (inclusive). For instance, if a
 | 
						|
            ``FloodWaitError`` for 17s occurs and `flood_sleep_threshold`
 | 
						|
            is 20s, the library will ``sleep`` automatically. If the error
 | 
						|
            was for 21s, it would ``raise FloodWaitError`` instead. Values
 | 
						|
            larger than a day (like ``float('inf')``) will be changed to a day.
 | 
						|
 | 
						|
        raise_last_call_error (`bool`, optional):
 | 
						|
            When API calls fail in a way that causes Telethon to retry
 | 
						|
            automatically, should the RPC error of the last attempt be raised
 | 
						|
            instead of a generic ValueError. This is mostly useful for
 | 
						|
            detecting when Telegram has internal issues.
 | 
						|
 | 
						|
        device_model (`str`, optional):
 | 
						|
            "Device model" to be sent when creating the initial connection.
 | 
						|
            Defaults to 'PC (n)bit' derived from ``platform.uname().machine``, or its direct value if unknown.
 | 
						|
 | 
						|
        system_version (`str`, optional):
 | 
						|
            "System version" to be sent when creating the initial connection.
 | 
						|
            Defaults to ``platform.uname().release`` stripped of everything ahead of -.
 | 
						|
 | 
						|
        app_version (`str`, optional):
 | 
						|
            "App version" to be sent when creating the initial connection.
 | 
						|
            Defaults to `telethon.version.__version__`.
 | 
						|
 | 
						|
        lang_code (`str`, optional):
 | 
						|
            "Language code" to be sent when creating the initial connection.
 | 
						|
            Defaults to ``'en'``.
 | 
						|
 | 
						|
        system_lang_code (`str`, optional):
 | 
						|
            "System lang code"  to be sent when creating the initial connection.
 | 
						|
            Defaults to `lang_code`.
 | 
						|
 | 
						|
        loop (`asyncio.AbstractEventLoop`, optional):
 | 
						|
            Asyncio event loop to use. Defaults to `asyncio.get_event_loop()`.
 | 
						|
            This argument is ignored.
 | 
						|
 | 
						|
        base_logger (`str` | `logging.Logger`, optional):
 | 
						|
            Base logger name or instance to use.
 | 
						|
            If a `str` is given, it'll be passed to `logging.getLogger()`. If a
 | 
						|
            `logging.Logger` is given, it'll be used directly. If something
 | 
						|
            else or nothing is given, the default logger will be used.
 | 
						|
 | 
						|
        receive_updates (`bool`, optional):
 | 
						|
            Whether the client will receive updates or not. By default, updates
 | 
						|
            will be received from Telegram as they occur.
 | 
						|
 | 
						|
            Turning this off means that Telegram will not send updates at all
 | 
						|
            so event handlers, conversations, and QR login will not work.
 | 
						|
            However, certain scripts don't need updates, so this will reduce
 | 
						|
            the amount of bandwidth used.
 | 
						|
    """
 | 
						|
 | 
						|
    # Current TelegramClient version
 | 
						|
    __version__ = version.__version__
 | 
						|
 | 
						|
    # Cached server configuration (with .dc_options), can be "global"
 | 
						|
    _config = None
 | 
						|
    _cdn_config = None
 | 
						|
 | 
						|
    # region Initialization
 | 
						|
 | 
						|
    def __init__(
 | 
						|
            self: 'TelegramClient',
 | 
						|
            session: 'typing.Union[str, Session]',
 | 
						|
            api_id: int,
 | 
						|
            api_hash: str,
 | 
						|
            *,
 | 
						|
            connection: 'typing.Type[Connection]' = ConnectionTcpFull,
 | 
						|
            use_ipv6: bool = False,
 | 
						|
            proxy: typing.Union[tuple, dict] = None,
 | 
						|
            local_addr: typing.Union[str, tuple] = None,
 | 
						|
            timeout: int = 10,
 | 
						|
            request_retries: int = 5,
 | 
						|
            connection_retries: int = 5,
 | 
						|
            retry_delay: int = 1,
 | 
						|
            auto_reconnect: bool = True,
 | 
						|
            sequential_updates: bool = False,
 | 
						|
            flood_sleep_threshold: int = 60,
 | 
						|
            raise_last_call_error: bool = False,
 | 
						|
            device_model: str = None,
 | 
						|
            system_version: str = None,
 | 
						|
            app_version: str = None,
 | 
						|
            lang_code: str = 'en',
 | 
						|
            system_lang_code: str = 'en',
 | 
						|
            loop: asyncio.AbstractEventLoop = None,
 | 
						|
            base_logger: typing.Union[str, logging.Logger] = None,
 | 
						|
            receive_updates: bool = True,
 | 
						|
            catch_up: bool = False
 | 
						|
    ):
 | 
						|
        if not api_id or not api_hash:
 | 
						|
            raise ValueError(
 | 
						|
                "Your API ID or Hash cannot be empty or None. "
 | 
						|
                "Refer to telethon.rtfd.io for more information.")
 | 
						|
 | 
						|
        self._use_ipv6 = use_ipv6
 | 
						|
 | 
						|
        if isinstance(base_logger, str):
 | 
						|
            base_logger = logging.getLogger(base_logger)
 | 
						|
        elif not isinstance(base_logger, logging.Logger):
 | 
						|
            base_logger = _base_log
 | 
						|
 | 
						|
        class _Loggers(dict):
 | 
						|
            def __missing__(self, key):
 | 
						|
                if key.startswith("telethon."):
 | 
						|
                    key = key.split('.', maxsplit=1)[1]
 | 
						|
 | 
						|
                return base_logger.getChild(key)
 | 
						|
 | 
						|
        self._log = _Loggers()
 | 
						|
 | 
						|
        # Determine what session object we have
 | 
						|
        if isinstance(session, str) or session is None:
 | 
						|
            try:
 | 
						|
                session = SQLiteSession(session)
 | 
						|
            except ImportError:
 | 
						|
                import warnings
 | 
						|
                warnings.warn(
 | 
						|
                    'The sqlite3 module is not available under this '
 | 
						|
                    'Python installation and no custom session '
 | 
						|
                    'instance was given; using MemorySession.\n'
 | 
						|
                    'You will need to re-login every time unless '
 | 
						|
                    'you use another session storage'
 | 
						|
                )
 | 
						|
                session = MemorySession()
 | 
						|
        elif not isinstance(session, Session):
 | 
						|
            raise TypeError(
 | 
						|
                'The given session must be a str or a Session instance.'
 | 
						|
            )
 | 
						|
 | 
						|
        # ':' in session.server_address is True if it's an IPv6 address
 | 
						|
        if (not session.server_address or
 | 
						|
                (':' in session.server_address) != use_ipv6):
 | 
						|
            session.set_dc(
 | 
						|
                DEFAULT_DC_ID,
 | 
						|
                DEFAULT_IPV6_IP if self._use_ipv6 else DEFAULT_IPV4_IP,
 | 
						|
                DEFAULT_PORT
 | 
						|
            )
 | 
						|
 | 
						|
        self.flood_sleep_threshold = flood_sleep_threshold
 | 
						|
 | 
						|
        # TODO Use AsyncClassWrapper(session)
 | 
						|
        # ChatGetter and SenderGetter can use the in-memory _entity_cache
 | 
						|
        # to avoid network access and the need for await in session files.
 | 
						|
        #
 | 
						|
        # The session files only wants the entities to persist
 | 
						|
        # them to disk, and to save additional useful information.
 | 
						|
        # TODO Session should probably return all cached
 | 
						|
        #      info of entities, not just the input versions
 | 
						|
        self.session = session
 | 
						|
        self._entity_cache = EntityCache()
 | 
						|
        self.api_id = int(api_id)
 | 
						|
        self.api_hash = api_hash
 | 
						|
 | 
						|
        # Current proxy implementation requires `sock_connect`, and some
 | 
						|
        # event loops lack this method. If the current loop is missing it,
 | 
						|
        # bail out early and suggest an alternative.
 | 
						|
        #
 | 
						|
        # TODO A better fix is obviously avoiding the use of `sock_connect`
 | 
						|
        #
 | 
						|
        # See https://github.com/LonamiWebs/Telethon/issues/1337 for details.
 | 
						|
        if not callable(getattr(self.loop, 'sock_connect', None)):
 | 
						|
            raise TypeError(
 | 
						|
                'Event loop of type {} lacks `sock_connect`, which is needed to use proxies.\n\n'
 | 
						|
                'Change the event loop in use to use proxies:\n'
 | 
						|
                '# https://github.com/LonamiWebs/Telethon/issues/1337\n'
 | 
						|
                'import asyncio\n'
 | 
						|
                'asyncio.set_event_loop(asyncio.SelectorEventLoop())'.format(
 | 
						|
                    self.loop.__class__.__name__
 | 
						|
                )
 | 
						|
            )
 | 
						|
 | 
						|
        if local_addr is not None:
 | 
						|
            if use_ipv6 is False and ':' in local_addr:
 | 
						|
                raise TypeError(
 | 
						|
                    'A local IPv6 address must only be used with `use_ipv6=True`.'
 | 
						|
                )
 | 
						|
            elif use_ipv6 is True and ':' not in local_addr:
 | 
						|
                raise TypeError(
 | 
						|
                    '`use_ipv6=True` must only be used with a local IPv6 address.'
 | 
						|
                )
 | 
						|
 | 
						|
        self._raise_last_call_error = raise_last_call_error
 | 
						|
 | 
						|
        self._request_retries = request_retries
 | 
						|
        self._connection_retries = connection_retries
 | 
						|
        self._retry_delay = retry_delay or 0
 | 
						|
        self._proxy = proxy
 | 
						|
        self._local_addr = local_addr
 | 
						|
        self._timeout = timeout
 | 
						|
        self._auto_reconnect = auto_reconnect
 | 
						|
 | 
						|
        assert isinstance(connection, type)
 | 
						|
        self._connection = connection
 | 
						|
        init_proxy = None if not issubclass(connection, TcpMTProxy) else \
 | 
						|
            types.InputClientProxy(*connection.address_info(proxy))
 | 
						|
 | 
						|
        # Used on connection. Capture the variables in a lambda since
 | 
						|
        # exporting clients need to create this InvokeWithLayerRequest.
 | 
						|
        system = platform.uname()
 | 
						|
 | 
						|
        if system.machine in ('x86_64', 'AMD64'):
 | 
						|
            default_device_model = 'PC 64bit'
 | 
						|
        elif system.machine in ('i386','i686','x86'):
 | 
						|
            default_device_model = 'PC 32bit'
 | 
						|
        else:
 | 
						|
            default_device_model = system.machine
 | 
						|
        default_system_version = re.sub(r'-.+','',system.release)
 | 
						|
 | 
						|
        self._init_request = functions.InitConnectionRequest(
 | 
						|
            api_id=self.api_id,
 | 
						|
            device_model=device_model or default_device_model or 'Unknown',
 | 
						|
            system_version=system_version or default_system_version or '1.0',
 | 
						|
            app_version=app_version or self.__version__,
 | 
						|
            lang_code=lang_code,
 | 
						|
            system_lang_code=system_lang_code,
 | 
						|
            lang_pack='',  # "langPacks are for official apps only"
 | 
						|
            query=None,
 | 
						|
            proxy=init_proxy
 | 
						|
        )
 | 
						|
 | 
						|
        # Remember flood-waited requests to avoid making them again
 | 
						|
        self._flood_waited_requests = {}
 | 
						|
 | 
						|
        # Cache ``{dc_id: (_ExportState, MTProtoSender)}`` for all borrowed senders
 | 
						|
        self._borrowed_senders = {}
 | 
						|
        self._borrow_sender_lock = asyncio.Lock()
 | 
						|
 | 
						|
        self._updates_handle = None
 | 
						|
        self._keepalive_handle = None
 | 
						|
        self._last_request = time.time()
 | 
						|
        self._channel_pts = {}
 | 
						|
        self._no_updates = not receive_updates
 | 
						|
 | 
						|
        # Used for non-sequential updates, in order to terminate all pending tasks on disconnect.
 | 
						|
        self._sequential_updates = sequential_updates
 | 
						|
        self._event_handler_tasks = set()
 | 
						|
 | 
						|
        self._authorized = None  # None = unknown, False = no, True = yes
 | 
						|
 | 
						|
        # Some further state for subclasses
 | 
						|
        self._event_builders = []
 | 
						|
 | 
						|
        # {chat_id: {Conversation}}
 | 
						|
        self._conversations = collections.defaultdict(set)
 | 
						|
 | 
						|
        # Hack to workaround the fact Telegram may send album updates as
 | 
						|
        # different Updates when being sent from a different data center.
 | 
						|
        # {grouped_id: AlbumHack}
 | 
						|
        #
 | 
						|
        # FIXME: We don't bother cleaning this up because it's not really
 | 
						|
        #        worth it, albums are pretty rare and this only holds them
 | 
						|
        #        for a second at most.
 | 
						|
        self._albums = {}
 | 
						|
 | 
						|
        # Default parse mode
 | 
						|
        self._parse_mode = markdown
 | 
						|
 | 
						|
        # Some fields to easy signing in. Let {phone: hash} be
 | 
						|
        # a dictionary because the user may change their mind.
 | 
						|
        self._phone_code_hash = {}
 | 
						|
        self._phone = None
 | 
						|
        self._tos = None
 | 
						|
 | 
						|
        # Sometimes we need to know who we are, cache the self peer
 | 
						|
        self._self_input_peer = None
 | 
						|
        self._bot = None
 | 
						|
 | 
						|
        # A place to store if channels are a megagroup or not (see `edit_admin`)
 | 
						|
        self._megagroup_cache = {}
 | 
						|
 | 
						|
        # This is backported from v2 in a very ad-hoc way just to get proper update handling
 | 
						|
        self._catch_up = catch_up
 | 
						|
        self._updates_queue = asyncio.Queue()
 | 
						|
        self._message_box = MessageBox()
 | 
						|
        # This entity cache is tailored for the messagebox and is not used for absolutely everything like _entity_cache
 | 
						|
        self._mb_entity_cache = MbEntityCache()  # required for proper update handling (to know when to getDifference)
 | 
						|
 | 
						|
        self._sender = MTProtoSender(
 | 
						|
            self.session.auth_key,
 | 
						|
            loggers=self._log,
 | 
						|
            retries=self._connection_retries,
 | 
						|
            delay=self._retry_delay,
 | 
						|
            auto_reconnect=self._auto_reconnect,
 | 
						|
            connect_timeout=self._timeout,
 | 
						|
            auth_key_callback=self._auth_key_callback,
 | 
						|
            updates_queue=self._updates_queue,
 | 
						|
            auto_reconnect_callback=self._handle_auto_reconnect
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
    # endregion
 | 
						|
 | 
						|
    # region Properties
 | 
						|
 | 
						|
    @property
 | 
						|
    def loop(self: 'TelegramClient') -> asyncio.AbstractEventLoop:
 | 
						|
        """
 | 
						|
        Property with the ``asyncio`` event loop used by this client.
 | 
						|
 | 
						|
        Example
 | 
						|
            .. code-block:: python
 | 
						|
 | 
						|
                # Download media in the background
 | 
						|
                task = client.loop.create_task(message.download_media())
 | 
						|
 | 
						|
                # Do some work
 | 
						|
                ...
 | 
						|
 | 
						|
                # Join the task (wait for it to complete)
 | 
						|
                await task
 | 
						|
        """
 | 
						|
        return asyncio.get_event_loop()
 | 
						|
 | 
						|
    @property
 | 
						|
    def disconnected(self: 'TelegramClient') -> asyncio.Future:
 | 
						|
        """
 | 
						|
        Property with a ``Future`` that resolves upon disconnection.
 | 
						|
 | 
						|
        Example
 | 
						|
            .. code-block:: python
 | 
						|
 | 
						|
                # Wait for a disconnection to occur
 | 
						|
                try:
 | 
						|
                    await client.disconnected
 | 
						|
                except OSError:
 | 
						|
                    print('Error on disconnect')
 | 
						|
        """
 | 
						|
        return self._sender.disconnected
 | 
						|
 | 
						|
    @property
 | 
						|
    def flood_sleep_threshold(self):
 | 
						|
        return self._flood_sleep_threshold
 | 
						|
 | 
						|
    @flood_sleep_threshold.setter
 | 
						|
    def flood_sleep_threshold(self, value):
 | 
						|
        # None -> 0, negative values don't really matter
 | 
						|
        self._flood_sleep_threshold = min(value or 0, 24 * 60 * 60)
 | 
						|
 | 
						|
    # endregion
 | 
						|
 | 
						|
    # region Connecting
 | 
						|
 | 
						|
    async def connect(self: 'TelegramClient') -> None:
 | 
						|
        """
 | 
						|
        Connects to Telegram.
 | 
						|
 | 
						|
        .. note::
 | 
						|
 | 
						|
            Connect means connect and nothing else, and only one low-level
 | 
						|
            request is made to notify Telegram about which layer we will be
 | 
						|
            using.
 | 
						|
 | 
						|
            Before Telegram sends you updates, you need to make a high-level
 | 
						|
            request, like `client.get_me() <telethon.client.users.UserMethods.get_me>`,
 | 
						|
            as described in https://core.telegram.org/api/updates.
 | 
						|
 | 
						|
        Example
 | 
						|
            .. code-block:: python
 | 
						|
 | 
						|
                try:
 | 
						|
                    await client.connect()
 | 
						|
                except OSError:
 | 
						|
                    print('Failed to connect')
 | 
						|
        """
 | 
						|
        # Workaround specific to SQLiteSession, which sometimes need to persist info after init.
 | 
						|
        # Since .save() is now async we can't do that in init. Instead we do it in the first connect.
 | 
						|
        if isinstance(self.session, SQLiteSession) and not self.session._init_saved:
 | 
						|
            await self.session.save()
 | 
						|
            self.session._init_saved = True
 | 
						|
 | 
						|
        if not await self._sender.connect(self._connection(
 | 
						|
            self.session.server_address,
 | 
						|
            self.session.port,
 | 
						|
            self.session.dc_id,
 | 
						|
            loggers=self._log,
 | 
						|
            proxy=self._proxy,
 | 
						|
            local_addr=self._local_addr
 | 
						|
        )):
 | 
						|
            # We don't want to init or modify anything if we were already connected
 | 
						|
            return
 | 
						|
 | 
						|
        self.session.auth_key = self._sender.auth_key
 | 
						|
        await self.session.save()
 | 
						|
 | 
						|
        if self._catch_up:
 | 
						|
            ss = SessionState(0, 0, False, 0, 0, 0, 0, None)
 | 
						|
            cs = []
 | 
						|
 | 
						|
            for entity_id, state in await self.session.get_update_states():
 | 
						|
                if entity_id == 0:
 | 
						|
                    # TODO current session doesn't store self-user info but adding that is breaking on downstream session impls
 | 
						|
                    ss = SessionState(0, 0, False, state.pts, state.qts, int(state.date.timestamp()), state.seq, None)
 | 
						|
                else:
 | 
						|
                    cs.append(ChannelState(entity_id, state.pts))
 | 
						|
 | 
						|
            self._message_box.load(ss, cs)
 | 
						|
            for state in cs:
 | 
						|
                entity = await self.session.get_input_entity(state.channel_id)
 | 
						|
                if entity:
 | 
						|
                    self._mb_entity_cache.put(Entity(EntityType.CHANNEL, entity.channel_id, entity.access_hash))
 | 
						|
 | 
						|
        self._init_request.query = functions.help.GetConfigRequest()
 | 
						|
 | 
						|
        await self._sender.send(functions.InvokeWithLayerRequest(
 | 
						|
            LAYER, self._init_request
 | 
						|
        ))
 | 
						|
 | 
						|
        if self._message_box.is_empty():
 | 
						|
            me = await self.get_me()
 | 
						|
            if me:
 | 
						|
                await self._on_login(me)  # also calls GetState to initialize the MessageBox
 | 
						|
 | 
						|
        self._updates_handle = self.loop.create_task(self._update_loop())
 | 
						|
        self._keepalive_handle = self.loop.create_task(self._keepalive_loop())
 | 
						|
 | 
						|
    def is_connected(self: 'TelegramClient') -> bool:
 | 
						|
        """
 | 
						|
        Returns `True` if the user has connected.
 | 
						|
 | 
						|
        This method is **not** asynchronous (don't use ``await`` on it).
 | 
						|
 | 
						|
        Example
 | 
						|
            .. code-block:: python
 | 
						|
 | 
						|
                while client.is_connected():
 | 
						|
                    await asyncio.sleep(1)
 | 
						|
        """
 | 
						|
        sender = getattr(self, '_sender', None)
 | 
						|
        return sender and sender.is_connected()
 | 
						|
 | 
						|
    def disconnect(self: 'TelegramClient'):
 | 
						|
        """
 | 
						|
        Disconnects from Telegram.
 | 
						|
 | 
						|
        If the event loop is already running, this method returns a
 | 
						|
        coroutine that you should await on your own code; otherwise
 | 
						|
        the loop is ran until said coroutine completes.
 | 
						|
 | 
						|
        Example
 | 
						|
            .. code-block:: python
 | 
						|
 | 
						|
                # You don't need to use this if you used "with client"
 | 
						|
                await client.disconnect()
 | 
						|
        """
 | 
						|
        if self.loop.is_running():
 | 
						|
            return self._disconnect_coro()
 | 
						|
        else:
 | 
						|
            try:
 | 
						|
                self.loop.run_until_complete(self._disconnect_coro())
 | 
						|
            except RuntimeError:
 | 
						|
                # Python 3.5.x complains when called from
 | 
						|
                # `__aexit__` and there were pending updates with:
 | 
						|
                #   "Event loop stopped before Future completed."
 | 
						|
                #
 | 
						|
                # However, it doesn't really make a lot of sense.
 | 
						|
                pass
 | 
						|
 | 
						|
    def set_proxy(self: 'TelegramClient', proxy: typing.Union[tuple, dict]):
 | 
						|
        """
 | 
						|
        Changes the proxy which will be used on next (re)connection.
 | 
						|
 | 
						|
        Method has no immediate effects if the client is currently connected.
 | 
						|
 | 
						|
        The new proxy will take it's effect on the next reconnection attempt:
 | 
						|
            - on a call `await client.connect()` (after complete disconnect)
 | 
						|
            - on auto-reconnect attempt (e.g, after previous connection was lost)
 | 
						|
        """
 | 
						|
        init_proxy = None if not issubclass(self._connection, TcpMTProxy) else \
 | 
						|
            types.InputClientProxy(*self._connection.address_info(proxy))
 | 
						|
 | 
						|
        self._init_request.proxy = init_proxy
 | 
						|
        self._proxy = proxy
 | 
						|
 | 
						|
        # While `await client.connect()` passes new proxy on each new call,
 | 
						|
        # auto-reconnect attempts use already set up `_connection` inside
 | 
						|
        # the `_sender`, so the only way to change proxy between those
 | 
						|
        # is to directly inject parameters.
 | 
						|
 | 
						|
        connection = getattr(self._sender, "_connection", None)
 | 
						|
        if connection:
 | 
						|
            if isinstance(connection, TcpMTProxy):
 | 
						|
                connection._ip = proxy[0]
 | 
						|
                connection._port = proxy[1]
 | 
						|
            else:
 | 
						|
                connection._proxy = proxy
 | 
						|
 | 
						|
    async def _disconnect_coro(self: 'TelegramClient'):
 | 
						|
        await self._disconnect()
 | 
						|
 | 
						|
        # Also clean-up all exported senders because we're done with them
 | 
						|
        async with self._borrow_sender_lock:
 | 
						|
            for state, sender in self._borrowed_senders.values():
 | 
						|
                # Note that we're not checking for `state.should_disconnect()`.
 | 
						|
                # If the user wants to disconnect the client, ALL connections
 | 
						|
                # to Telegram (including exported senders) should be closed.
 | 
						|
                #
 | 
						|
                # Disconnect should never raise, so there's no try/except.
 | 
						|
                await sender.disconnect()
 | 
						|
                # Can't use `mark_disconnected` because it may be borrowed.
 | 
						|
                state._connected = False
 | 
						|
 | 
						|
            # If any was borrowed
 | 
						|
            self._borrowed_senders.clear()
 | 
						|
 | 
						|
        # trio's nurseries would handle this for us, but this is asyncio.
 | 
						|
        # All tasks spawned in the background should properly be terminated.
 | 
						|
        if self._event_handler_tasks:
 | 
						|
            for task in self._event_handler_tasks:
 | 
						|
                task.cancel()
 | 
						|
 | 
						|
            await asyncio.wait(self._event_handler_tasks)
 | 
						|
            self._event_handler_tasks.clear()
 | 
						|
 | 
						|
        entities = self._mb_entity_cache.get_all_entities()
 | 
						|
 | 
						|
        # Piggy-back on an arbitrary TL type with users and chats so the session can understand to read the entities.
 | 
						|
        # It doesn't matter if we put users in the list of chats.
 | 
						|
        await self.session.process_entities(types.contacts.ResolvedPeer(None, [e._as_input_peer() for e in entities], []))
 | 
						|
 | 
						|
        ss, cs = self._message_box.session_state()
 | 
						|
        await self.session.set_update_state(0, types.updates.State(**ss, unread_count=0))
 | 
						|
        now = datetime.datetime.now()  # any datetime works; channels don't need it
 | 
						|
        for channel_id, pts in cs.items():
 | 
						|
            await self.session.set_update_state(channel_id, types.updates.State(pts, 0, now, 0, unread_count=0))
 | 
						|
 | 
						|
        await self.session.close()
 | 
						|
 | 
						|
    async def _disconnect(self: 'TelegramClient'):
 | 
						|
        """
 | 
						|
        Disconnect only, without closing the session. Used in reconnections
 | 
						|
        to different data centers, where we don't want to close the session
 | 
						|
        file; user disconnects however should close it since it means that
 | 
						|
        their job with the client is complete and we should clean it up all.
 | 
						|
        """
 | 
						|
        await self._sender.disconnect()
 | 
						|
        await helpers._cancel(self._log[__name__],
 | 
						|
                              updates_handle=self._updates_handle,
 | 
						|
                              keepalive_handle=self._keepalive_handle)
 | 
						|
 | 
						|
    async def _switch_dc(self: 'TelegramClient', new_dc):
 | 
						|
        """
 | 
						|
        Permanently switches the current connection to the new data center.
 | 
						|
        """
 | 
						|
        self._log[__name__].info('Reconnecting to new data center %s', new_dc)
 | 
						|
        dc = await self._get_dc(new_dc)
 | 
						|
 | 
						|
        self.session.set_dc(dc.id, dc.ip_address, dc.port)
 | 
						|
        # auth_key's are associated with a server, which has now changed
 | 
						|
        # so it's not valid anymore. Set to None to force recreating it.
 | 
						|
        self._sender.auth_key.key = None
 | 
						|
        self.session.auth_key = None
 | 
						|
        await self.session.save()
 | 
						|
        await self._disconnect()
 | 
						|
        return await self.connect()
 | 
						|
 | 
						|
    async def _auth_key_callback(self: 'TelegramClient', auth_key):
 | 
						|
        """
 | 
						|
        Callback from the sender whenever it needed to generate a
 | 
						|
        new authorization key. This means we are not authorized.
 | 
						|
        """
 | 
						|
        self.session.auth_key = auth_key
 | 
						|
        await self.session.save()
 | 
						|
 | 
						|
    # endregion
 | 
						|
 | 
						|
    # region Working with different connections/Data Centers
 | 
						|
 | 
						|
    async def _get_dc(self: 'TelegramClient', dc_id, cdn=False):
 | 
						|
        """Gets the Data Center (DC) associated to 'dc_id'"""
 | 
						|
        cls = self.__class__
 | 
						|
        if not cls._config:
 | 
						|
            cls._config = await self(functions.help.GetConfigRequest())
 | 
						|
 | 
						|
        if cdn and not self._cdn_config:
 | 
						|
            cls._cdn_config = await self(functions.help.GetCdnConfigRequest())
 | 
						|
            for pk in cls._cdn_config.public_keys:
 | 
						|
                rsa.add_key(pk.public_key)
 | 
						|
 | 
						|
        try:
 | 
						|
            return next(
 | 
						|
                dc for dc in cls._config.dc_options
 | 
						|
                if dc.id == dc_id
 | 
						|
                and bool(dc.ipv6) == self._use_ipv6 and bool(dc.cdn) == cdn
 | 
						|
            )
 | 
						|
        except StopIteration:
 | 
						|
            self._log[__name__].warning(
 | 
						|
                'Failed to get DC %s (cdn = %s) with use_ipv6 = %s; retrying ignoring IPv6 check',
 | 
						|
                dc_id, cdn, self._use_ipv6
 | 
						|
            )
 | 
						|
            return next(
 | 
						|
                dc for dc in cls._config.dc_options
 | 
						|
                if dc.id == dc_id and bool(dc.cdn) == cdn
 | 
						|
            )
 | 
						|
 | 
						|
    async def _create_exported_sender(self: 'TelegramClient', dc_id):
 | 
						|
        """
 | 
						|
        Creates a new exported `MTProtoSender` for the given `dc_id` and
 | 
						|
        returns it. This method should be used by `_borrow_exported_sender`.
 | 
						|
        """
 | 
						|
        # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
 | 
						|
        # for clearly showing how to export the authorization
 | 
						|
        dc = await self._get_dc(dc_id)
 | 
						|
        # Can't reuse self._sender._connection as it has its own seqno.
 | 
						|
        #
 | 
						|
        # If one were to do that, Telegram would reset the connection
 | 
						|
        # with no further clues.
 | 
						|
        sender = MTProtoSender(None, loggers=self._log)
 | 
						|
        await sender.connect(self._connection(
 | 
						|
            dc.ip_address,
 | 
						|
            dc.port,
 | 
						|
            dc.id,
 | 
						|
            loggers=self._log,
 | 
						|
            proxy=self._proxy,
 | 
						|
            local_addr=self._local_addr
 | 
						|
        ))
 | 
						|
        self._log[__name__].info('Exporting auth for new borrowed sender in %s', dc)
 | 
						|
        auth = await self(functions.auth.ExportAuthorizationRequest(dc_id))
 | 
						|
        self._init_request.query = functions.auth.ImportAuthorizationRequest(id=auth.id, bytes=auth.bytes)
 | 
						|
        req = functions.InvokeWithLayerRequest(LAYER, self._init_request)
 | 
						|
        await sender.send(req)
 | 
						|
        return sender
 | 
						|
 | 
						|
    async def _borrow_exported_sender(self: 'TelegramClient', dc_id):
 | 
						|
        """
 | 
						|
        Borrows a connected `MTProtoSender` for the given `dc_id`.
 | 
						|
        If it's not cached, creates a new one if it doesn't exist yet,
 | 
						|
        and imports a freshly exported authorization key for it to be usable.
 | 
						|
 | 
						|
        Once its job is over it should be `_return_exported_sender`.
 | 
						|
        """
 | 
						|
        async with self._borrow_sender_lock:
 | 
						|
            self._log[__name__].debug('Borrowing sender for dc_id %d', dc_id)
 | 
						|
            state, sender = self._borrowed_senders.get(dc_id, (None, None))
 | 
						|
 | 
						|
            if state is None:
 | 
						|
                state = _ExportState()
 | 
						|
                sender = await self._create_exported_sender(dc_id)
 | 
						|
                sender.dc_id = dc_id
 | 
						|
                self._borrowed_senders[dc_id] = (state, sender)
 | 
						|
 | 
						|
            elif state.need_connect():
 | 
						|
                dc = await self._get_dc(dc_id)
 | 
						|
                await sender.connect(self._connection(
 | 
						|
                    dc.ip_address,
 | 
						|
                    dc.port,
 | 
						|
                    dc.id,
 | 
						|
                    loggers=self._log,
 | 
						|
                    proxy=self._proxy,
 | 
						|
                    local_addr=self._local_addr
 | 
						|
                ))
 | 
						|
 | 
						|
            state.add_borrow()
 | 
						|
            return sender
 | 
						|
 | 
						|
    async def _return_exported_sender(self: 'TelegramClient', sender):
 | 
						|
        """
 | 
						|
        Returns a borrowed exported sender. If all borrows have
 | 
						|
        been returned, the sender is cleanly disconnected.
 | 
						|
        """
 | 
						|
        async with self._borrow_sender_lock:
 | 
						|
            self._log[__name__].debug('Returning borrowed sender for dc_id %d', sender.dc_id)
 | 
						|
            state, _ = self._borrowed_senders[sender.dc_id]
 | 
						|
            state.add_return()
 | 
						|
 | 
						|
    async def _clean_exported_senders(self: 'TelegramClient'):
 | 
						|
        """
 | 
						|
        Cleans-up all unused exported senders by disconnecting them.
 | 
						|
        """
 | 
						|
        async with self._borrow_sender_lock:
 | 
						|
            for dc_id, (state, sender) in self._borrowed_senders.items():
 | 
						|
                if state.should_disconnect():
 | 
						|
                    self._log[__name__].info(
 | 
						|
                        'Disconnecting borrowed sender for DC %d', dc_id)
 | 
						|
 | 
						|
                    # Disconnect should never raise
 | 
						|
                    await sender.disconnect()
 | 
						|
                    state.mark_disconnected()
 | 
						|
 | 
						|
    async def _get_cdn_client(self: 'TelegramClient', cdn_redirect):
 | 
						|
        """Similar to ._borrow_exported_client, but for CDNs"""
 | 
						|
        # TODO Implement
 | 
						|
        raise NotImplementedError
 | 
						|
        session = self._exported_sessions.get(cdn_redirect.dc_id)
 | 
						|
        if not session:
 | 
						|
            dc = await self._get_dc(cdn_redirect.dc_id, cdn=True)
 | 
						|
            session = self.session.clone()
 | 
						|
            session.set_dc(dc.id, dc.ip_address, dc.port)
 | 
						|
            self._exported_sessions[cdn_redirect.dc_id] = session
 | 
						|
 | 
						|
        self._log[__name__].info('Creating new CDN client')
 | 
						|
        client = TelegramBaseClient(
 | 
						|
            session, self.api_id, self.api_hash,
 | 
						|
            proxy=self._sender.connection.conn.proxy,
 | 
						|
            timeout=self._sender.connection.get_timeout()
 | 
						|
        )
 | 
						|
 | 
						|
        # This will make use of the new RSA keys for this specific CDN.
 | 
						|
        #
 | 
						|
        # We won't be calling GetConfigRequest because it's only called
 | 
						|
        # when needed by ._get_dc, and also it's static so it's likely
 | 
						|
        # set already. Avoid invoking non-CDN methods by not syncing updates.
 | 
						|
        client.connect(_sync_updates=False)
 | 
						|
        return client
 | 
						|
 | 
						|
    # endregion
 | 
						|
 | 
						|
    # region Invoking Telegram requests
 | 
						|
 | 
						|
    @abc.abstractmethod
 | 
						|
    def __call__(self: 'TelegramClient', request, ordered=False):
 | 
						|
        """
 | 
						|
        Invokes (sends) one or more MTProtoRequests and returns (receives)
 | 
						|
        their result.
 | 
						|
 | 
						|
        Args:
 | 
						|
            request (`TLObject` | `list`):
 | 
						|
                The request or requests to be invoked.
 | 
						|
 | 
						|
            ordered (`bool`, optional):
 | 
						|
                Whether the requests (if more than one was given) should be
 | 
						|
                executed sequentially on the server. They run in arbitrary
 | 
						|
                order by default.
 | 
						|
 | 
						|
            flood_sleep_threshold (`int` | `None`, optional):
 | 
						|
                The flood sleep threshold to use for this request. This overrides
 | 
						|
                the default value stored in
 | 
						|
                `client.flood_sleep_threshold <telethon.client.telegrambaseclient.TelegramBaseClient.flood_sleep_threshold>`
 | 
						|
 | 
						|
        Returns:
 | 
						|
            The result of the request (often a `TLObject`) or a list of
 | 
						|
            results if more than one request was given.
 | 
						|
        """
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    @abc.abstractmethod
 | 
						|
    def _update_loop(self: 'TelegramClient'):
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    @abc.abstractmethod
 | 
						|
    async def _handle_auto_reconnect(self: 'TelegramClient'):
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    # endregion
 |