From 5daad2aaab10ba2b56f9f248d83fe74509bef935 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 28 Sep 2018 17:51:28 +0200 Subject: [PATCH] Actually use the new connection class --- telethon/client/telegrambaseclient.py | 19 +++++++++---------- telethon/network/mtprotosender.py | 26 ++++++++++++-------------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index 2a36d6ce..aad04eaa 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -1,6 +1,5 @@ import abc import asyncio -import collections import inspect import logging import platform @@ -54,7 +53,7 @@ class TelegramBaseClient(abc.ABC): connection (`telethon.network.connection.common.Connection`, optional): The connection instance to be used when creating a new connection - to the servers. If it's a type, the `proxy` argument will be used. + to the servers. It **must** be a type. Defaults to `telethon.network.connection.tcpfull.ConnectionTcpFull`. @@ -206,9 +205,8 @@ class TelegramBaseClient(abc.ABC): self._connection_retries = connection_retries or sys.maxsize self._auto_reconnect = auto_reconnect - if isinstance(connection, type): - connection = connection( - proxy=proxy, timeout=timeout, loop=self._loop) + assert isinstance(connection, type) + self._connection = connection # Used on connection. Capture the variables in a lambda since # exporting clients need to create this InvokeWithLayerRequest. @@ -229,7 +227,7 @@ class TelegramBaseClient(abc.ABC): state = MTProtoState(self.session.auth_key) self._connection = connection self._sender = MTProtoSender( - state, connection, self._loop, + state, self._loop, retries=self._connection_retries, auto_reconnect=self._auto_reconnect, update_callback=self._handle_update, @@ -308,8 +306,8 @@ class TelegramBaseClient(abc.ABC): """ Connects to Telegram. """ - await self._sender.connect( - self.session.server_address, self.session.port) + await self._sender.connect(self._connection( + self.session.server_address, self.session.port, loop=self._loop)) await self._sender.send(self._init_with( functions.help.GetConfigRequest())) @@ -420,8 +418,9 @@ class TelegramBaseClient(abc.ABC): # # If one were to do that, Telegram would reset the connection # with no further clues. - sender = MTProtoSender(state, self._connection.clone(), self._loop) - await sender.connect(dc.ip_address, dc.port) + sender = MTProtoSender(state, self._loop) + await sender.connect(self._connection( + dc.ip_address, dc.port, loop=self._loop)) __log__.info('Exporting authorization for data center %s', dc) auth = await self(functions.auth.ExportAuthorizationRequest(dc_id)) req = self._init_with(functions.auth.ImportAuthorizationRequest( diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 15f459d9..2b6fdc56 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -40,14 +40,12 @@ class MTProtoSender: A new authorization key will be generated on connection if no other key exists yet. """ - def __init__(self, state, connection, loop, *, + def __init__(self, state, loop, *, retries=5, auto_reconnect=True, update_callback=None, auth_key_callback=None, auto_reconnect_callback=None): self.state = state - self._connection = connection + self._connection = None self._loop = loop - self._ip = None - self._port = None self._retries = retries self._auto_reconnect = auto_reconnect self._update_callback = update_callback @@ -110,7 +108,7 @@ class MTProtoSender: # Public API - async def connect(self, ip, port): + async def connect(self, connection): """ Connects to the specified ``ip:port``, and generates a new authorization key for the `MTProtoSender.session` if it does @@ -120,8 +118,7 @@ class MTProtoSender: __log__.info('User is already connected!') return - self._ip = ip - self._port = port + self._connection = connection self._user_connected = True await self._connect() @@ -140,11 +137,11 @@ class MTProtoSender: await self._disconnect() async def _disconnect(self, error=None): - __log__.info('Disconnecting from {}...'.format(self._ip)) + __log__.info('Disconnecting from %s...', self._connection._ip) self._user_connected = False try: __log__.debug('Closing current connection...') - await self._connection.close() + self._connection.disconnect() finally: __log__.debug('Cancelling {} pending message(s)...' .format(len(self._pending_messages))) @@ -166,7 +163,7 @@ class MTProtoSender: __log__.debug('Cancelling the receive loop...') self._recv_loop_handle.cancel() - __log__.info('Disconnection from {} complete!'.format(self._ip)) + __log__.info('Disconnection from %s complete!', self._connection._ip) if self._disconnected and not self._disconnected.done(): if error: self._disconnected.set_exception(error) @@ -238,11 +235,12 @@ class MTProtoSender: authorization key if necessary, and starting the send and receive loops. """ - __log__.info('Connecting to {}:{}...'.format(self._ip, self._port)) + __log__.info('Connecting to %s:%d...', + self._connection._ip, self._connection._port) for retry in range(1, self._retries + 1): try: __log__.debug('Connection attempt {}...'.format(retry)) - await self._connection.connect(self._ip, self._port) + await self._connection.connect() except (asyncio.TimeoutError, OSError) as e: __log__.warning('Attempt {} at connecting failed: {}: {}' .format(retry, type(e).__name__, e)) @@ -283,7 +281,7 @@ class MTProtoSender: # First connection or manual reconnection after a failure if self._disconnected is None or self._disconnected.done(): self._disconnected = self._loop.create_future() - __log__.info('Connection to {} complete!'.format(self._ip)) + __log__.info('Connection to %s complete!', self._connection._ip) async def _reconnect(self): """ @@ -299,7 +297,7 @@ class MTProtoSender: await self._recv_loop_handle __log__.debug('Closing current connection...') - await self._connection.close() + self._connection.disconnect() self._reconnecting = False