From c2966297f1a50e2f5d2421e8f6d1a15326ceb625 Mon Sep 17 00:00:00 2001 From: nailerNAS <35928815+nailerNAS@users.noreply.github.com> Date: Fri, 28 Sep 2018 16:47:24 +0300 Subject: [PATCH 01/35] Add delete method to custom.Dialog (#1014) --- telethon/tl/custom/dialog.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/telethon/tl/custom/dialog.py b/telethon/tl/custom/dialog.py index c761141f..6aaf1613 100644 --- a/telethon/tl/custom/dialog.py +++ b/telethon/tl/custom/dialog.py @@ -1,5 +1,5 @@ from . import Draft -from .. import TLObject, types +from .. import TLObject, types, functions from ... import utils @@ -96,6 +96,17 @@ class Dialog: return await self._client.send_message( self.input_entity, *args, **kwargs) + async def delete(self): + if self.is_channel: + await self._client(functions.channels.LeaveChannelRequest( + self.input_entity)) + else: + if self.is_group: + await self._client(functions.messages.DeleteChatUserRequest( + self.entity.id, types.InputPeerSelf())) + await self._client(functions.messages.DeleteHistoryRequest( + self.input_entity, 0)) + def to_dict(self): return { '_': 'Dialog', From 2468b32fc58f80362e5b153f90b196e8b558ce57 Mon Sep 17 00:00:00 2001 From: Manuel1510 Date: Thu, 4 Oct 2018 09:12:12 +0200 Subject: [PATCH 02/35] Implement next_offset and allow empty results in answer() (#1017) --- telethon/events/inlinequery.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/telethon/events/inlinequery.py b/telethon/events/inlinequery.py index 75e991a6..4e3c6c1f 100644 --- a/telethon/events/inlinequery.py +++ b/telethon/events/inlinequery.py @@ -124,7 +124,7 @@ class InlineQuery(EventBuilder): async def answer( self, results=None, cache_time=0, *, - gallery=False, private=False, + gallery=False, next_offset=None, private=False, switch_pm=None, switch_pm_param=''): """ Answers the inline query with the given results. @@ -147,6 +147,10 @@ class InlineQuery(EventBuilder): gallery (`bool`, optional): Whether the results should show as a gallery (grid) or not. + + next_offset (`str`, optional): + The offset the client will send when the user scrolls the + results and it repeats the request. private (`bool`, optional): Whether the results should be cached by Telegram @@ -163,11 +167,14 @@ class InlineQuery(EventBuilder): if self._answered: return - results = [self._as_awaitable(x, self._client.loop) - for x in results] + if results: + results = [self._as_awaitable(x, self._client.loop) + for x in results] - done, _ = await asyncio.wait(results, loop=self._client.loop) - results = [x.result() for x in done] + done, _ = await asyncio.wait(results, loop=self._client.loop) + results = [x.result() for x in done] + else: + results = [] if switch_pm: switch_pm = types.InlineBotSwitchPM(switch_pm, switch_pm_param) @@ -178,6 +185,7 @@ class InlineQuery(EventBuilder): results=results, cache_time=cache_time, gallery=gallery, + next_offset=next_offset, private=private, switch_pm=switch_pm ) From 340f5614b562973ffdee2e71c46a2c1fb565bfbb Mon Sep 17 00:00:00 2001 From: painor Date: Thu, 4 Oct 2018 14:56:32 +0100 Subject: [PATCH 03/35] Add name mention formatting to HTML and Markdown (#1019) --- telethon/extensions/html.py | 7 +++++-- telethon/extensions/markdown.py | 22 ++++++++++++++-------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/telethon/extensions/html.py b/telethon/extensions/html.py index d4233473..8a397336 100644 --- a/telethon/extensions/html.py +++ b/telethon/extensions/html.py @@ -9,8 +9,8 @@ from html.parser import HTMLParser from ..tl.types import ( MessageEntityBold, MessageEntityItalic, MessageEntityCode, MessageEntityPre, MessageEntityEmail, MessageEntityUrl, - MessageEntityTextUrl -) + MessageEntityTextUrl, MessageEntityMentionName + ) # Helpers from markdown.py @@ -178,6 +178,9 @@ def unparse(text, entities): elif entity_type == MessageEntityTextUrl: html.append('{}' .format(escape(entity.url), entity_text)) + elif entity_type == MessageEntityMentionName: + html.append('{}' + .format(entity.user_id, entity_text)) else: skip_entity = True last_offset = entity.offset + (0 if skip_entity else entity.length) diff --git a/telethon/extensions/markdown.py b/telethon/extensions/markdown.py index 5274dc85..be0ea507 100644 --- a/telethon/extensions/markdown.py +++ b/telethon/extensions/markdown.py @@ -9,8 +9,8 @@ from ..helpers import add_surrogate, del_surrogate from ..tl import TLObject from ..tl.types import ( MessageEntityBold, MessageEntityItalic, MessageEntityCode, - MessageEntityPre, MessageEntityTextUrl -) + MessageEntityPre, MessageEntityTextUrl, MessageEntityMentionName + ) DEFAULT_DELIMITERS = { '**': MessageEntityBold, @@ -161,11 +161,17 @@ def unparse(text, entities, delimiters=None, url_fmt=None): delimiter = delimiters.get(type(entity), None) if delimiter: text = text[:s] + delimiter + text[s:e] + delimiter + text[e:] - elif isinstance(entity, MessageEntityTextUrl) and url_fmt: - text = ( - text[:s] + - add_surrogate(url_fmt.format(text[s:e], entity.url)) + - text[e:] - ) + elif url_fmt: + url = None + if isinstance(entity, MessageEntityTextUrl): + url = entity.url + elif isinstance(entity, MessageEntityMentionName): + url = 'tg://user?id={}'.format(entity.user_id) + if url: + text = ( + text[:s] + + add_surrogate(url_fmt.format(text[s:e], url)) + + text[e:] + ) return del_surrogate(text) From 096424ea7809e5512a932c79eff6676695c1d27e Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 27 Sep 2018 18:45:20 +0200 Subject: [PATCH 04/35] Create a new Connection class to work through queues --- telethon/network/connection/connection.py | 96 +++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 telethon/network/connection/connection.py diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py new file mode 100644 index 00000000..60ba2f89 --- /dev/null +++ b/telethon/network/connection/connection.py @@ -0,0 +1,96 @@ +import abc +import asyncio + + +class Connection(abc.ABC): + """ + The `Connection` class is a wrapper around ``asyncio.open_connection``. + + Subclasses are meant to communicate with this class through a queue. + + This class provides a reliable interface that will stay connected + under any conditions for as long as the user doesn't disconnect or + the input parameters to auto-reconnect dictate otherwise. + """ + # TODO Support proxy. Support timeout? + def __init__(self, ip, port, *, loop): + self._ip = ip + self._port = port + self._loop = loop + self._reader = None + self._writer = None + self._disconnected = asyncio.Event(loop=loop) + self._disconnected.set() + self._send_task = None + self._recv_task = None + self._send_queue = asyncio.Queue(1) + self._recv_queue = asyncio.Queue(1) + + async def connect(self): + """ + Establishes a connection with the server. + """ + self._reader, self._writer = await asyncio.open_connection( + self._ip, self._port, loop=self._loop) + + self._disconnected.clear() + self._send_task = self._loop.create_task(self._send_loop()) + self._recv_task = self._loop.create_task(self._send_loop()) + + def disconnect(self): + """ + Disconnects from the server. + """ + self._disconnected.set() + self._writer.close() + + def send(self, data): + """ + Sends a packet of data through this connection mode. + + This method returns a coroutine. + """ + return self._send_queue.put(data) + + def recv(self): + """ + Receives a packet of data through this connection mode. + + This method returns a coroutine. + """ + return self._recv_queue.get() + + # TODO Get/put to the queue with cancellation + async def _send_loop(self): + """ + This loop is constantly popping items off the queue to send them. + """ + while not self._disconnected.is_set(): + self._send(await self._send_queue.get()) + await self._writer.drain() + + async def _recv_loop(self): + """ + This loop is constantly putting items on the queue as they're read. + """ + while not self._disconnected.is_set(): + data = await self._recv() + await self._recv_queue.put(data) + + @abc.abstractmethod + def _send(self, data): + """ + This method should be implemented differently under each + connection mode and serialize the data into the packet + the way it should be sent through `self._writer`. + """ + raise NotImplementedError + + @abc.abstractmethod + async def _recv(self): + """ + This method should be implemented differently under each + connection mode and deserialize the data from the packet + the way it should be read from `self._reader`. + """ + raise NotImplementedError From 2fd51b858252c568057d2b64ae29346bb38096f4 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 27 Sep 2018 19:22:35 +0200 Subject: [PATCH 05/35] Let all connection modes implement the new Connection --- telethon/network/connection/common.py | 74 ------------------ telethon/network/connection/connection.py | 1 + telethon/network/connection/http.py | 76 +++++++------------ telethon/network/connection/tcpabridged.py | 40 ++++++---- telethon/network/connection/tcpfull.py | 55 ++++---------- .../network/connection/tcpintermediate.py | 20 ++--- telethon/network/connection/tcpobfuscated.py | 31 ++++---- 7 files changed, 95 insertions(+), 202 deletions(-) delete mode 100644 telethon/network/connection/common.py diff --git a/telethon/network/connection/common.py b/telethon/network/connection/common.py deleted file mode 100644 index a57c248e..00000000 --- a/telethon/network/connection/common.py +++ /dev/null @@ -1,74 +0,0 @@ -""" -This module holds the abstract `Connection` class. - -The `Connection.send` and `Connection.recv` methods need **not** to be -safe across several tasks and may use any amount of ``await`` keywords. - -The code using these `Connection`'s should be responsible for using -an ``async with asyncio.Lock:`` block when calling said methods. - -Said subclasses need not to worry about reconnecting either, and -should let the errors propagate instead. -""" -import abc - - -class Connection(abc.ABC): - """ - Represents an abstract connection for Telegram. - - Subclasses should implement the actual protocol - being used when encoding/decoding messages. - """ - def __init__(self, *, loop, timeout, proxy=None): - """ - Initializes a new connection. - - :param loop: the event loop to be used. - :param timeout: timeout to be used for all operations. - :param proxy: whether to use a proxy or not. - """ - self._loop = loop - self._proxy = proxy - self._timeout = timeout - - @abc.abstractmethod - async def connect(self, ip, port): - raise NotImplementedError - - @abc.abstractmethod - def get_timeout(self): - """Returns the timeout used by the connection.""" - raise NotImplementedError - - @abc.abstractmethod - def is_connected(self): - """ - Determines whether the connection is alive or not. - - :return: true if it's connected. - """ - raise NotImplementedError - - @abc.abstractmethod - async def close(self): - """Closes the connection.""" - raise NotImplementedError - - def clone(self): - """Creates a copy of this Connection.""" - return self.__class__( - loop=self._loop, - proxy=self._proxy, - timeout=self._timeout - ) - - @abc.abstractmethod - async def recv(self): - """Receives and unpacks a message""" - raise NotImplementedError - - @abc.abstractmethod - async def send(self, message): - """Encapsulates and sends the given message""" - raise NotImplementedError diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 60ba2f89..5d96fc5e 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -69,6 +69,7 @@ class Connection(abc.ABC): self._send(await self._send_queue.get()) await self._writer.drain() + # TODO Handle IncompleteReadError and InvalidChecksumError async def _recv_loop(self): """ This loop is constantly putting items on the queue as they're read. diff --git a/telethon/network/connection/http.py b/telethon/network/connection/http.py index 955b9ab3..c346d2f8 100644 --- a/telethon/network/connection/http.py +++ b/telethon/network/connection/http.py @@ -1,62 +1,38 @@ -import errno -import ssl +import asyncio -from .common import Connection -from ...extensions import TcpClient +from .connection import Connection class ConnectionHttp(Connection): - def __init__(self, *, loop, timeout, proxy=None): - super().__init__(loop=loop, timeout=timeout, proxy=proxy) - self.conn = TcpClient( - timeout=self._timeout, loop=self._loop, proxy=self._proxy, - ssl=dict(ssl_version=ssl.PROTOCOL_SSLv23, ciphers='ADH-AES256-SHA') - ) - self.read = self.conn.read - self.write = self.conn.write - self._host = None + async def connect(self): + # TODO Test if the ssl part works or it needs to be as before: + # dict(ssl_version=ssl.PROTOCOL_SSLv23, ciphers='ADH-AES256-SHA') + self._reader, self._writer = await asyncio.open_connection( + self._ip, self._port, loop=self._loop, ssl=True) - async def connect(self, ip, port): - self._host = '{}:{}'.format(ip, port) - try: - await self.conn.connect(ip, port) - except OSError as e: - if e.errno == errno.EISCONN: - return # Already connected, no need to re-set everything up - else: - raise + self._disconnected.clear() + self._send_task = self._loop.create_task(self._send_loop()) + self._recv_task = self._loop.create_task(self._send_loop()) - def get_timeout(self): - return self.conn.timeout - - def is_connected(self): - return self.conn.is_connected - - async def close(self): - self.conn.close() - - async def recv(self): - while True: - line = await self._read_line() - if line.lower().startswith(b'content-length: '): - await self.read(2) - length = int(line[16:-2]) - return await self.read(length) - - async def _read_line(self): - newline = ord('\n') - line = await self.read(1) - while line[-1] != newline: - line += await self.read(1) - return line - - async def send(self, message): - await self.write( + def _send(self, message): + self._writer.write( 'POST /api HTTP/1.1\r\n' - 'Host: {}\r\n' + 'Host: {}:{}\r\n' 'Content-Type: application/x-www-form-urlencoded\r\n' 'Connection: keep-alive\r\n' 'Keep-Alive: timeout=100000, max=10000000\r\n' - 'Content-Length: {}\r\n\r\n'.format(self._host, len(message)) + 'Content-Length: {}\r\n\r\n' + .format(self._ip, self._port, len(message)) .encode('ascii') + message ) + + async def _recv(self): + while True: + line = await self._reader.readline() + if not line or line[-1] != b'\n': + raise asyncio.IncompleteReadError(line, None) + + if line.lower().startswith(b'content-length: '): + await self._reader.readexactly(2) + length = int(line[16:-2]) + return await self._reader.readexactly(length) diff --git a/telethon/network/connection/tcpabridged.py b/telethon/network/connection/tcpabridged.py index d5943908..6352413e 100644 --- a/telethon/network/connection/tcpabridged.py +++ b/telethon/network/connection/tcpabridged.py @@ -1,31 +1,43 @@ import struct -from .tcpfull import ConnectionTcpFull +from .connection import Connection -class ConnectionTcpAbridged(ConnectionTcpFull): +class ConnectionTcpAbridged(Connection): """ This is the mode with the lowest overhead, as it will only require 1 byte if the packet length is less than 508 bytes (127 << 2, which is very common). """ - async def connect(self, ip, port): - result = await super().connect(ip, port) - await self.conn.write(b'\xef') - return result + async def connect(self): + await super().connect() + await self.send(b'\xef') - async def recv(self): - length = struct.unpack('= 127: - length = struct.unpack('> 2 + def _send(self, data): + length = len(data) >> 2 if length < 127: length = struct.pack('B', length) else: length = b'\x7f' + int.to_bytes(length, 3, 'little') - await self.write(length + message) + self._write(length + data) + + async def _recv(self): + length = struct.unpack('= 127: + length = struct.unpack( + ' Date: Fri, 28 Sep 2018 17:45:45 +0200 Subject: [PATCH 06/35] Fix connection never receiving and missing clone method --- telethon/network/connection/connection.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 5d96fc5e..75cd5b37 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -35,15 +35,23 @@ class Connection(abc.ABC): self._disconnected.clear() self._send_task = self._loop.create_task(self._send_loop()) - self._recv_task = self._loop.create_task(self._send_loop()) + self._recv_task = self._loop.create_task(self._recv_loop()) def disconnect(self): """ Disconnects from the server. """ self._disconnected.set() + self._send_task.cancel() + self._recv_task.cancel() self._writer.close() + def clone(self): + """ + Creates a clone of the connection. + """ + return self.__class__(self._ip, self._port, loop=self._loop) + def send(self, data): """ Sends a packet of data through this connection mode. @@ -75,8 +83,13 @@ class Connection(abc.ABC): This loop is constantly putting items on the queue as they're read. """ while not self._disconnected.is_set(): - data = await self._recv() - await self._recv_queue.put(data) + try: + data = await self._recv() + except asyncio.IncompleteReadError: + if not self._disconnected.is_set(): + raise + else: + await self._recv_queue.put(data) @abc.abstractmethod def _send(self, data): From 5daad2aaab10ba2b56f9f248d83fe74509bef935 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 28 Sep 2018 17:51:28 +0200 Subject: [PATCH 07/35] Actually use the new connection class --- telethon/client/telegrambaseclient.py | 19 +++++++++---------- telethon/network/mtprotosender.py | 26 ++++++++++++-------------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index 2a36d6ce..aad04eaa 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -1,6 +1,5 @@ import abc import asyncio -import collections import inspect import logging import platform @@ -54,7 +53,7 @@ class TelegramBaseClient(abc.ABC): connection (`telethon.network.connection.common.Connection`, optional): The connection instance to be used when creating a new connection - to the servers. If it's a type, the `proxy` argument will be used. + to the servers. It **must** be a type. Defaults to `telethon.network.connection.tcpfull.ConnectionTcpFull`. @@ -206,9 +205,8 @@ class TelegramBaseClient(abc.ABC): self._connection_retries = connection_retries or sys.maxsize self._auto_reconnect = auto_reconnect - if isinstance(connection, type): - connection = connection( - proxy=proxy, timeout=timeout, loop=self._loop) + assert isinstance(connection, type) + self._connection = connection # Used on connection. Capture the variables in a lambda since # exporting clients need to create this InvokeWithLayerRequest. @@ -229,7 +227,7 @@ class TelegramBaseClient(abc.ABC): state = MTProtoState(self.session.auth_key) self._connection = connection self._sender = MTProtoSender( - state, connection, self._loop, + state, self._loop, retries=self._connection_retries, auto_reconnect=self._auto_reconnect, update_callback=self._handle_update, @@ -308,8 +306,8 @@ class TelegramBaseClient(abc.ABC): """ Connects to Telegram. """ - await self._sender.connect( - self.session.server_address, self.session.port) + await self._sender.connect(self._connection( + self.session.server_address, self.session.port, loop=self._loop)) await self._sender.send(self._init_with( functions.help.GetConfigRequest())) @@ -420,8 +418,9 @@ class TelegramBaseClient(abc.ABC): # # If one were to do that, Telegram would reset the connection # with no further clues. - sender = MTProtoSender(state, self._connection.clone(), self._loop) - await sender.connect(dc.ip_address, dc.port) + sender = MTProtoSender(state, self._loop) + await sender.connect(self._connection( + dc.ip_address, dc.port, loop=self._loop)) __log__.info('Exporting authorization for data center %s', dc) auth = await self(functions.auth.ExportAuthorizationRequest(dc_id)) req = self._init_with(functions.auth.ImportAuthorizationRequest( diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 15f459d9..2b6fdc56 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -40,14 +40,12 @@ class MTProtoSender: A new authorization key will be generated on connection if no other key exists yet. """ - def __init__(self, state, connection, loop, *, + def __init__(self, state, loop, *, retries=5, auto_reconnect=True, update_callback=None, auth_key_callback=None, auto_reconnect_callback=None): self.state = state - self._connection = connection + self._connection = None self._loop = loop - self._ip = None - self._port = None self._retries = retries self._auto_reconnect = auto_reconnect self._update_callback = update_callback @@ -110,7 +108,7 @@ class MTProtoSender: # Public API - async def connect(self, ip, port): + async def connect(self, connection): """ Connects to the specified ``ip:port``, and generates a new authorization key for the `MTProtoSender.session` if it does @@ -120,8 +118,7 @@ class MTProtoSender: __log__.info('User is already connected!') return - self._ip = ip - self._port = port + self._connection = connection self._user_connected = True await self._connect() @@ -140,11 +137,11 @@ class MTProtoSender: await self._disconnect() async def _disconnect(self, error=None): - __log__.info('Disconnecting from {}...'.format(self._ip)) + __log__.info('Disconnecting from %s...', self._connection._ip) self._user_connected = False try: __log__.debug('Closing current connection...') - await self._connection.close() + self._connection.disconnect() finally: __log__.debug('Cancelling {} pending message(s)...' .format(len(self._pending_messages))) @@ -166,7 +163,7 @@ class MTProtoSender: __log__.debug('Cancelling the receive loop...') self._recv_loop_handle.cancel() - __log__.info('Disconnection from {} complete!'.format(self._ip)) + __log__.info('Disconnection from %s complete!', self._connection._ip) if self._disconnected and not self._disconnected.done(): if error: self._disconnected.set_exception(error) @@ -238,11 +235,12 @@ class MTProtoSender: authorization key if necessary, and starting the send and receive loops. """ - __log__.info('Connecting to {}:{}...'.format(self._ip, self._port)) + __log__.info('Connecting to %s:%d...', + self._connection._ip, self._connection._port) for retry in range(1, self._retries + 1): try: __log__.debug('Connection attempt {}...'.format(retry)) - await self._connection.connect(self._ip, self._port) + await self._connection.connect() except (asyncio.TimeoutError, OSError) as e: __log__.warning('Attempt {} at connecting failed: {}: {}' .format(retry, type(e).__name__, e)) @@ -283,7 +281,7 @@ class MTProtoSender: # First connection or manual reconnection after a failure if self._disconnected is None or self._disconnected.done(): self._disconnected = self._loop.create_future() - __log__.info('Connection to {} complete!'.format(self._ip)) + __log__.info('Connection to %s complete!', self._connection._ip) async def _reconnect(self): """ @@ -299,7 +297,7 @@ class MTProtoSender: await self._recv_loop_handle __log__.debug('Closing current connection...') - await self._connection.close() + self._connection.disconnect() self._reconnecting = False From 9402b4a26da22f0646c004dbbd941f9377abf06e Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 29 Sep 2018 10:58:45 +0200 Subject: [PATCH 08/35] Create a new layer to lift encryption off the MTProtoSender --- telethon/client/telegrambaseclient.py | 6 +- telethon/network/connection/connection.py | 6 ++ telethon/network/mtprotolayer.py | 104 ++++++++++++++++++++++ telethon/network/mtprotosender.py | 22 ++--- telethon/network/mtprotostate.py | 50 ++++++----- telethon/tl/core/gzippacked.py | 6 +- telethon/tl/core/messagecontainer.py | 7 +- telethon/tl/core/tlmessage.py | 85 +++--------------- 8 files changed, 166 insertions(+), 120 deletions(-) create mode 100644 telethon/network/mtprotolayer.py diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index aad04eaa..5b82b625 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -224,10 +224,9 @@ class TelegramBaseClient(abc.ABC): ) ) - state = MTProtoState(self.session.auth_key) self._connection = connection self._sender = MTProtoSender( - state, self._loop, + self.session.auth_key, self._loop, retries=self._connection_retries, auto_reconnect=self._auto_reconnect, update_callback=self._handle_update, @@ -413,12 +412,11 @@ class TelegramBaseClient(abc.ABC): # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt # for clearly showing how to export the authorization dc = await self._get_dc(dc_id) - state = MTProtoState(None) # Can't reuse self._sender._connection as it has its own seqno. # # If one were to do that, Telegram would reset the connection # with no further clues. - sender = MTProtoSender(state, self._loop) + sender = MTProtoSender(None, self._loop) await sender.connect(self._connection( dc.ip_address, dc.port, loop=self._loop)) __log__.info('Exporting authorization for data center %s', dc) diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 75cd5b37..be609a57 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -108,3 +108,9 @@ class Connection(abc.ABC): the way it should be read from `self._reader`. """ raise NotImplementedError + + def __str__(self): + return '{}:{}/{}'.format( + self._ip, self._port, + self.__class__.__name__.replace('Connection', '') + ) diff --git a/telethon/network/mtprotolayer.py b/telethon/network/mtprotolayer.py new file mode 100644 index 00000000..754df8b5 --- /dev/null +++ b/telethon/network/mtprotolayer.py @@ -0,0 +1,104 @@ +import io +import struct + +from .mtprotostate import MTProtoState +from ..tl.core.messagecontainer import MessageContainer + + +class MTProtoLayer: + """ + This class is the message encryption layer between the methods defined + in the schema and the response objects. It also holds the necessary state + necessary for this encryption to happen. + + The `connection` parameter is through which these messages will be sent + and received. + + The `auth_key` must be a valid authorization key which will be used to + encrypt these messages. This class is not responsible for generating them. + """ + def __init__(self, connection, auth_key): + self._connection = connection + self._state = MTProtoState(auth_key) + + def connect(self): + """ + Wrapper for ``self._connection.connect()``. + """ + return self._connection.connect() + + def disconnect(self): + """ + Wrapper for ``self._connection.disconnect()``. + """ + self._connection.disconnect() + + async def send(self, data_list): + """ + A list of serialized RPC queries as bytes must be given to be sent. + Nested lists imply an order is required for the messages in them. + Message containers will be used if there is more than one item. + + Returns ``(container_id, msg_ids)``. + """ + data, container_id, msg_ids = self._pack_data_list(data_list) + await self._connection.send(self._state.encrypt_message_data(data)) + return container_id, msg_ids + + async def recv(self): + """ + Reads a single message from the network, decrypts it and returns it. + """ + body = await self._connection.recv() + return self._state.decrypt_message_data(body) + + def _pack_data_list(self, data_list): + """ + A list of serialized RPC queries as bytes must be given to be packed. + Nested lists imply an order is required for the messages in them. + + Returns ``(data, container_id, msg_ids)``. + """ + # TODO write_data_as_message raises on invalid messages, handle it + # TODO This method could be an iterator yielding messages while small + # respecting the ``MessageContainer.MAXIMUM_SIZE`` limit. + # + # Note that the simplest case is writing a single query data into + # a message, and returning the message data and ID. For efficiency + # purposes this method supports more than one message and automatically + # uses containers if deemed necessary. + # + # Technically the message and message container classes could be used + # to store and serialize the data. However, to keep the context local + # and relevant to the only place where such feature is actually used, + # this is not done. + msg_ids = [] + buffer = io.BytesIO() + for data in data_list: + if not isinstance(data, list): + msg_ids.append(self._state.write_data_as_message(buffer, data)) + else: + last_id = None + for d in data: + last_id = self._state.write_data_as_message( + buffer, d, after_id=last_id) + msg_ids.append(last_id) + + if len(msg_ids) == 1: + container_id = None + else: + # Inlined code to pack several messages into a container + # + # TODO This part and encrypting data prepend a few bytes but + # force a potentially large payload to be appended, which + # may be expensive. Can we do better? + data = struct.pack( + ' 512: + # TODO Only content-related requests should be gzipped + if len(data) > 512: gzipped = bytes(GzipPacked(data)) return gzipped if len(gzipped) < len(data) else data else: diff --git a/telethon/tl/core/messagecontainer.py b/telethon/tl/core/messagecontainer.py index de304424..800a31f0 100644 --- a/telethon/tl/core/messagecontainer.py +++ b/telethon/tl/core/messagecontainer.py @@ -27,11 +27,6 @@ class MessageContainer(TLObject): ], } - def __bytes__(self): - return struct.pack( - ' Date: Sat, 29 Sep 2018 12:20:26 +0200 Subject: [PATCH 09/35] Make use of the MTProtoLayer in MTProtoSender --- telethon/network/mtprotolayer.py | 53 +++-- telethon/network/mtprotosender.py | 383 +++++++++--------------------- telethon/network/requeststate.py | 18 ++ 3 files changed, 158 insertions(+), 296 deletions(-) create mode 100644 telethon/network/requeststate.py diff --git a/telethon/network/mtprotolayer.py b/telethon/network/mtprotolayer.py index 754df8b5..8142a5e9 100644 --- a/telethon/network/mtprotolayer.py +++ b/telethon/network/mtprotolayer.py @@ -33,17 +33,16 @@ class MTProtoLayer: """ self._connection.disconnect() - async def send(self, data_list): + async def send(self, state_list): """ - A list of serialized RPC queries as bytes must be given to be sent. + The list of `RequestState` that will be sent. They will + be updated with their new message and container IDs. + Nested lists imply an order is required for the messages in them. Message containers will be used if there is more than one item. - - Returns ``(container_id, msg_ids)``. """ - data, container_id, msg_ids = self._pack_data_list(data_list) + data = self._pack_state_list(state_list) await self._connection.send(self._state.encrypt_message_data(data)) - return container_id, msg_ids async def recv(self): """ @@ -52,12 +51,14 @@ class MTProtoLayer: body = await self._connection.recv() return self._state.decrypt_message_data(body) - def _pack_data_list(self, data_list): + def _pack_state_list(self, state_list): """ - A list of serialized RPC queries as bytes must be given to be packed. - Nested lists imply an order is required for the messages in them. + The list of `RequestState` that will be sent. They will + be updated with their new message and container IDs. - Returns ``(data, container_id, msg_ids)``. + Packs all their serialized data into a message (possibly + nested inside another message and message container) and + returns the serialized message data. """ # TODO write_data_as_message raises on invalid messages, handle it # TODO This method could be an iterator yielding messages while small @@ -72,33 +73,39 @@ class MTProtoLayer: # to store and serialize the data. However, to keep the context local # and relevant to the only place where such feature is actually used, # this is not done. - msg_ids = [] + n = 0 buffer = io.BytesIO() - for data in data_list: - if not isinstance(data, list): - msg_ids.append(self._state.write_data_as_message(buffer, data)) + for state in state_list: + if not isinstance(state, list): + n += 1 + state.msg_id = \ + self._state.write_data_as_message(buffer, state.data) else: last_id = None - for d in data: - last_id = self._state.write_data_as_message( - buffer, d, after_id=last_id) - msg_ids.append(last_id) + for s in state: + n += 1 + last_id = s.msg_id = self._state.write_data_as_message( + buffer, s.data, after_id=last_id) - if len(msg_ids) == 1: - container_id = None - else: + if n > 1: # Inlined code to pack several messages into a container # # TODO This part and encrypting data prepend a few bytes but # force a potentially large payload to be appended, which # may be expensive. Can we do better? data = struct.pack( - ' MessageContainer.MAXIMUM_SIZE): - self.put_nowait(item) - break - else: - size += item.size() - result.append(item) - - return result diff --git a/telethon/network/requeststate.py b/telethon/network/requeststate.py new file mode 100644 index 00000000..018832af --- /dev/null +++ b/telethon/network/requeststate.py @@ -0,0 +1,18 @@ +import asyncio + + +class RequestState: + """ + This request state holds several information relevant to sent messages, + in particular the message ID assigned to the request, the container ID + it belongs to, the request itself, the request as bytes, and the future + result that will eventually be resolved. + """ + __slots__ = ('container_id', 'msg_id', 'request', 'data', 'future') + + def __init__(self, request, loop): + self.container_id = None + self.msg_id = None + self.request = request + self.data = bytes(request) + self.future = asyncio.Future(loop=loop) From b02ebcb69b21481d26706fe1455ea3971e351169 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 29 Sep 2018 12:48:50 +0200 Subject: [PATCH 10/35] Stop waiting for send items on disconnection --- telethon/network/mtprotosender.py | 61 ++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index b51fa122..604cd0fb 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -3,10 +3,10 @@ import collections import logging from .mtprotolayer import MTProtoLayer +from .requeststate import RequestState from .. import utils from ..errors import ( - BadMessageError, TypeNotFoundError, BrokenAuthKeyError, SecurityError, - rpc_message_to_error + BadMessageError, TypeNotFoundError, rpc_message_to_error ) from ..extensions import BinaryReader from ..tl.core import RpcResult, MessageContainer, GzipPacked @@ -16,7 +16,6 @@ from ..tl.types import ( MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo, MsgsStateReq, MsgsStateInfo, MsgsAllInfo, MsgResendReq, upload ) -from .requeststate import RequestState __log__ = logging.getLogger(__name__) @@ -65,7 +64,10 @@ class MTProtoSender: # Outgoing messages are put in a queue and sent in a batch. # Note that here we're also storing their ``_RequestState``. # Note that it may also store lists (implying order must be kept). - self._send_queue = asyncio.Queue() + # + # TODO Abstract this queue away? + self._send_queue = [] + self._send_ready = asyncio.Event(loop=self._loop) # Sent states are remembered until a response is received. self._pending_state = {} @@ -189,7 +191,8 @@ class MTProtoSender: if not utils.is_list_like(request): state = RequestState(request, self._loop) - self._send_queue.put_nowait(state) + self._send_queue.append(state) + self._send_ready.set() return state.future else: states = [] @@ -199,10 +202,11 @@ class MTProtoSender: states.append(state) futures.append(state.future) if ordered: - self._send_queue.put(states) + self._send_queue.append(states) else: - for state in states: - self._send_queue.put(state) + self._send_queue.extend(states) + + self._send_ready.set() return futures @property @@ -328,19 +332,29 @@ class MTProtoSender: while self._user_connected and not self._reconnecting: if self._pending_ack: ack = RequestState(MsgsAck(list(self._pending_ack)), self._loop) - self._send_queue.put_nowait(ack) + self._send_queue.append(ack) + self._send_ready.set() self._last_acks.append(ack) self._pending_ack.clear() - state_list = [] + queue = asyncio.ensure_future( + self._send_ready.wait(), loop=self._loop) - # TODO wait for the list to have one or for a disconnect to happen - # and pop while that's the case - state = await self._send_queue.get() - state_list.append(state) + disconnected = asyncio.ensure_future( + self._connection._connection._disconnected.wait()) - while not self._send_queue.empty(): - state_list.append(self._send_queue.get_nowait()) + # Basically using the disconnected as a cancellation token + done, pending = await asyncio.wait( + [queue, disconnected], + return_when=asyncio.FIRST_COMPLETED, + loop=self._loop + ) + if disconnected in done: + break + + state_list = self._send_queue + self._send_queue = [] + self._send_ready.clear() # TODO Debug logs to notify which messages are being sent # TODO Try sending them while no future was cancelled? @@ -409,8 +423,9 @@ class MTProtoSender: if rpc_result.error: error = rpc_message_to_error(rpc_result.error) - self._send_queue.put_nowait( + self._send_queue.append( RequestState(MsgsAck([state.msg_id]), loop=self._loop)) + self._send_ready.set() if not state.future.cancelled(): state.future.set_exception(error) @@ -477,12 +492,14 @@ class MTProtoSender: __log__.debug('Handling bad salt for message %d', bad_salt.bad_msg_id) self._connection._state.salt = bad_salt.new_server_salt try: - self._send_queue.put_nowait( + self._send_queue.append( self._pending_state.pop(bad_salt.bad_msg_id)) + self._send_ready.set() except KeyError: for ack in self._pending_ack: if ack.msg_id == bad_salt.bad_msg_id: - self._send_queue.put_nowait(ack) + self._send_queue.append(ack) + self._send_ready.set() return __log__.info('Message %d not resent due to bad salt', @@ -521,7 +538,8 @@ class MTProtoSender: # Messages are to be re-sent once we've corrected the issue if state: - self._send_queue.put_nowait(state) + self._send_queue.append(state) + self._send_ready.set() else: # TODO Generic method that may return from the acks too # May be MsgsAck, those are not saved in pending messages @@ -606,9 +624,10 @@ class MTProtoSender: Handles both :tl:`MsgsStateReq` and :tl:`MsgResendReq` by enqueuing a :tl:`MsgsStateInfo` to be sent at a later point. """ - self._send_queue.put_nowait(RequestState(MsgsStateInfo( + self._send_queue.append(RequestState(MsgsStateInfo( req_msg_id=message.msg_id, info=chr(1) * len(message.obj.msg_ids)), loop=self._loop)) + self._send_ready.set() async def _handle_msg_all(self, message): """ From 105bd52eeee66d843317264536185b37705e0138 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 29 Sep 2018 13:29:44 +0200 Subject: [PATCH 11/35] Abstract the send queue off MTProtoSender --- telethon/helpers.py | 39 +++++++++++++++++++++++ telethon/network/connection/connection.py | 9 ++++++ telethon/network/mtprotosender.py | 33 +++---------------- 3 files changed, 53 insertions(+), 28 deletions(-) diff --git a/telethon/helpers.py b/telethon/helpers.py index 1452aad2..e0cced62 100644 --- a/telethon/helpers.py +++ b/telethon/helpers.py @@ -1,4 +1,5 @@ """Various helpers not related to the Telegram API itself""" +import asyncio import collections import os import struct @@ -87,4 +88,42 @@ class TotalList(list): return '[{}, total={}]'.format( ', '.join(repr(x) for x in self), self.total) + +class _ReadyQueue: + """ + A queue list that supports an arbitrary cancellation token for `get`. + """ + def __init__(self, loop): + self._list = [] + self._loop = loop + self._ready = asyncio.Event(loop=loop) + + def append(self, item): + self._list.append(item) + self._ready.set() + + def extend(self, items): + self._list.extend(items) + self._ready.set() + + async def get(self, cancellation): + """ + Returns a list of all the items added to the queue until now and + clears the list from the queue itself. Returns ``None`` if cancelled. + """ + ready = asyncio.ensure_future(self._ready.wait(), loop=self._loop) + done, pending = await asyncio.wait( + [ready, cancellation], + return_when=asyncio.FIRST_COMPLETED, + loop=self._loop + ) + if cancellation in done: + ready.cancel() + return None + + result = self._list + self._list = [] + self._ready.clear() + return result + # endregion diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index be609a57..abe0fa60 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -21,6 +21,7 @@ class Connection(abc.ABC): self._writer = None self._disconnected = asyncio.Event(loop=loop) self._disconnected.set() + self._disconnected_future = None self._send_task = None self._recv_task = None self._send_queue = asyncio.Queue(1) @@ -34,6 +35,7 @@ class Connection(abc.ABC): self._ip, self._port, loop=self._loop) self._disconnected.clear() + self._disconnected_future = None self._send_task = self._loop.create_task(self._send_loop()) self._recv_task = self._loop.create_task(self._recv_loop()) @@ -46,6 +48,13 @@ class Connection(abc.ABC): self._recv_task.cancel() self._writer.close() + @property + def disconnected(self): + if not self._disconnected_future: + self._disconnected_future = asyncio.ensure_future( + self._disconnected.wait(), loop=self._loop) + return self._disconnected_future + def clone(self): """ Creates a clone of the connection. diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 604cd0fb..582945ec 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -9,6 +9,7 @@ from ..errors import ( BadMessageError, TypeNotFoundError, rpc_message_to_error ) from ..extensions import BinaryReader +from ..helpers import _ReadyQueue from ..tl.core import RpcResult, MessageContainer, GzipPacked from ..tl.functions.auth import LogOutRequest from ..tl.types import ( @@ -64,10 +65,7 @@ class MTProtoSender: # Outgoing messages are put in a queue and sent in a batch. # Note that here we're also storing their ``_RequestState``. # Note that it may also store lists (implying order must be kept). - # - # TODO Abstract this queue away? - self._send_queue = [] - self._send_ready = asyncio.Event(loop=self._loop) + self._send_queue = _ReadyQueue(self._loop) # Sent states are remembered until a response is received. self._pending_state = {} @@ -192,7 +190,6 @@ class MTProtoSender: if not utils.is_list_like(request): state = RequestState(request, self._loop) self._send_queue.append(state) - self._send_ready.set() return state.future else: states = [] @@ -206,7 +203,6 @@ class MTProtoSender: else: self._send_queue.extend(states) - self._send_ready.set() return futures @property @@ -333,29 +329,15 @@ class MTProtoSender: if self._pending_ack: ack = RequestState(MsgsAck(list(self._pending_ack)), self._loop) self._send_queue.append(ack) - self._send_ready.set() self._last_acks.append(ack) self._pending_ack.clear() - queue = asyncio.ensure_future( - self._send_ready.wait(), loop=self._loop) + state_list = await self._send_queue.get( + self._connection._connection.disconnected) - disconnected = asyncio.ensure_future( - self._connection._connection._disconnected.wait()) - - # Basically using the disconnected as a cancellation token - done, pending = await asyncio.wait( - [queue, disconnected], - return_when=asyncio.FIRST_COMPLETED, - loop=self._loop - ) - if disconnected in done: + if state_list is None: break - state_list = self._send_queue - self._send_queue = [] - self._send_ready.clear() - # TODO Debug logs to notify which messages are being sent # TODO Try sending them while no future was cancelled? # TODO Handle timeout, cancelled, arbitrary errors @@ -425,7 +407,6 @@ class MTProtoSender: error = rpc_message_to_error(rpc_result.error) self._send_queue.append( RequestState(MsgsAck([state.msg_id]), loop=self._loop)) - self._send_ready.set() if not state.future.cancelled(): state.future.set_exception(error) @@ -494,12 +475,10 @@ class MTProtoSender: try: self._send_queue.append( self._pending_state.pop(bad_salt.bad_msg_id)) - self._send_ready.set() except KeyError: for ack in self._pending_ack: if ack.msg_id == bad_salt.bad_msg_id: self._send_queue.append(ack) - self._send_ready.set() return __log__.info('Message %d not resent due to bad salt', @@ -539,7 +518,6 @@ class MTProtoSender: # Messages are to be re-sent once we've corrected the issue if state: self._send_queue.append(state) - self._send_ready.set() else: # TODO Generic method that may return from the acks too # May be MsgsAck, those are not saved in pending messages @@ -627,7 +605,6 @@ class MTProtoSender: self._send_queue.append(RequestState(MsgsStateInfo( req_msg_id=message.msg_id, info=chr(1) * len(message.obj.msg_ids)), loop=self._loop)) - self._send_ready.set() async def _handle_msg_all(self, message): """ From 2d275989cb372b66b7e0e2f20a7262585a623e90 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 29 Sep 2018 13:36:05 +0200 Subject: [PATCH 12/35] Properly handle cancellation in _ReadyQueue --- telethon/helpers.py | 16 ++++++++++------ telethon/network/connection/connection.py | 4 ++-- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/telethon/helpers.py b/telethon/helpers.py index e0cced62..861e56ea 100644 --- a/telethon/helpers.py +++ b/telethon/helpers.py @@ -111,12 +111,16 @@ class _ReadyQueue: Returns a list of all the items added to the queue until now and clears the list from the queue itself. Returns ``None`` if cancelled. """ - ready = asyncio.ensure_future(self._ready.wait(), loop=self._loop) - done, pending = await asyncio.wait( - [ready, cancellation], - return_when=asyncio.FIRST_COMPLETED, - loop=self._loop - ) + ready = self._loop.create_task(self._ready.wait()) + try: + done, pending = await asyncio.wait( + [ready, cancellation], + return_when=asyncio.FIRST_COMPLETED, + loop=self._loop + ) + except asyncio.CancelledError: + done = [cancellation] + if cancellation in done: ready.cancel() return None diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index abe0fa60..1dfd1698 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -51,8 +51,8 @@ class Connection(abc.ABC): @property def disconnected(self): if not self._disconnected_future: - self._disconnected_future = asyncio.ensure_future( - self._disconnected.wait(), loop=self._loop) + self._disconnected_future = \ + self._loop.create_task(self._disconnected.wait()) return self._disconnected_future def clone(self): From 5edc2216c7e6b858dd42db838e93aa6bf7960136 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 30 Sep 2018 11:58:46 +0200 Subject: [PATCH 13/35] Handle initial connection if network is down correctly --- telethon/network/connection/connection.py | 11 +++- telethon/network/mtprotosender.py | 79 +++++++++++------------ 2 files changed, 46 insertions(+), 44 deletions(-) diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 1dfd1698..2f3957f2 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -44,9 +44,14 @@ class Connection(abc.ABC): Disconnects from the server. """ self._disconnected.set() - self._send_task.cancel() - self._recv_task.cancel() - self._writer.close() + if self._send_task: + self._send_task.cancel() + + if self._recv_task: + self._recv_task.cancel() + + if self._writer: + self._writer.close() @property def disconnected(self): diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 582945ec..03da7f6b 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -126,41 +126,6 @@ class MTProtoSender: await self._disconnect() - # TODO Move this out of the "Public API" section - async def _disconnect(self, error=None): - __log__.info('Disconnecting from %s...', self._connection) - self._user_connected = False - try: - __log__.debug('Closing current connection...') - self._connection.disconnect() - finally: - __log__.debug('Cancelling {} pending message(s)...' - .format(len(self._pending_state))) - for state in self._pending_state.values(): - if error and not state.future.done(): - state.future.set_exception(error) - else: - state.future.cancel() - - self._pending_state.clear() - self._pending_ack.clear() - self._last_ack = None - - if self._send_loop_handle: - __log__.debug('Cancelling the send loop...') - self._send_loop_handle.cancel() - - if self._recv_loop_handle: - __log__.debug('Cancelling the receive loop...') - self._recv_loop_handle.cancel() - - __log__.info('Disconnection from %s complete!', self._connection) - if self._disconnected and not self._disconnected.done(): - if error: - self._disconnected.set_exception(error) - else: - self._disconnected.set_result(None) - def send(self, request, ordered=False): """ This method enqueues the given request to be sent. Its send @@ -224,12 +189,14 @@ class MTProtoSender: authorization key if necessary, and starting the send and receive loops. """ + # TODO With ``asyncio.open_connection``, no timeout occurs + # However, these are probably desirable in some circumstances. __log__.info('Connecting to %s...', self._connection) for retry in range(1, self._retries + 1): try: __log__.debug('Connection attempt {}...'.format(retry)) await self._connection.connect() - except (asyncio.TimeoutError, OSError) as e: + except OSError as e: __log__.warning('Attempt {} at connecting failed: {}: {}' .format(retry, type(e).__name__, e)) else: @@ -274,6 +241,40 @@ class MTProtoSender: self._disconnected = self._loop.create_future() __log__.info('Connection to %s complete!', self._connection) + async def _disconnect(self, error=None): + __log__.info('Disconnecting from %s...', self._connection) + self._user_connected = False + try: + __log__.debug('Closing current connection...') + self._connection.disconnect() + finally: + __log__.debug('Cancelling {} pending message(s)...' + .format(len(self._pending_state))) + for state in self._pending_state.values(): + if error and not state.future.done(): + state.future.set_exception(error) + else: + state.future.cancel() + + self._pending_state.clear() + self._pending_ack.clear() + self._last_ack = None + + if self._send_loop_handle: + __log__.debug('Cancelling the send loop...') + self._send_loop_handle.cancel() + + if self._recv_loop_handle: + __log__.debug('Cancelling the receive loop...') + self._recv_loop_handle.cancel() + + __log__.info('Disconnection from %s complete!', self._connection) + if self._disconnected and not self._disconnected.done(): + if error: + self._disconnected.set_exception(error) + else: + self._disconnected.set_result(None) + async def _reconnect(self): """ Cleanly disconnects and then reconnects. @@ -340,7 +341,7 @@ class MTProtoSender: # TODO Debug logs to notify which messages are being sent # TODO Try sending them while no future was cancelled? - # TODO Handle timeout, cancelled, arbitrary errors + # TODO Handle cancelled?, arbitrary errors await self._connection.send(state_list) for state in state_list: if not isinstance(state, list): @@ -411,10 +412,6 @@ class MTProtoSender: if not state.future.cancelled(): state.future.set_exception(error) else: - # TODO Would be nice to avoid accessing a per-obj read_result - # Instead have a variable that indicated how the result should - # be read (an enum) and dispatch to read the result, mostly - # always it's just a normal TLObject. with BinaryReader(rpc_result.body) as reader: result = state.request.read_result(reader) From 3b1142aaca63fd2b3ef5b7c19c40cfc5fe86fe80 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 1 Oct 2018 09:58:53 +0200 Subject: [PATCH 14/35] Add back auth key generation process --- telethon/network/mtprotoplainsender.py | 2 +- telethon/network/mtprotosender.py | 17 +++++++++-------- telethon/network/mtprotostate.py | 8 ++++++++ 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/telethon/network/mtprotoplainsender.py b/telethon/network/mtprotoplainsender.py index 942c5e02..b53ac24d 100644 --- a/telethon/network/mtprotoplainsender.py +++ b/telethon/network/mtprotoplainsender.py @@ -30,7 +30,7 @@ class MTProtoPlainSender: body = bytes(request) msg_id = self._state._get_new_msg_id() await self._connection.send( - struct.pack(' Date: Mon, 1 Oct 2018 13:49:30 +0200 Subject: [PATCH 15/35] Handle bad salt/msg not resending containers --- telethon/network/mtprotosender.py | 51 +++++++++++++++++++------------ 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 26c8a52b..26fd4e84 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -378,6 +378,30 @@ class MTProtoSender: self._handle_update) await handler(message) + def _pop_states(self, msg_id): + """ + Pops the states known to match the given ID from pending messages. + + This method should be used when the response isn't specific. + """ + state = self._pending_state.pop(msg_id, None) + if state: + return [state] + + to_pop = [] + for state in self._pending_state.values(): + if state.container_id == msg_id: + to_pop.append(state.msg_id) + + if to_pop: + return [self._pending_state.pop(x) for x in to_pop] + + for ack in self._last_acks: + if ack.msg_id == msg_id: + return [ack] + + return [] + async def _handle_rpc_result(self, message): """ Handles the result for Remote Procedure Calls: @@ -470,17 +494,10 @@ class MTProtoSender: bad_salt = message.obj __log__.debug('Handling bad salt for message %d', bad_salt.bad_msg_id) self._connection._state.salt = bad_salt.new_server_salt - try: - self._send_queue.append( - self._pending_state.pop(bad_salt.bad_msg_id)) - except KeyError: - for ack in self._pending_ack: - if ack.msg_id == bad_salt.bad_msg_id: - self._send_queue.append(ack) - return + states = self._pop_states(bad_salt.bad_msg_id) + self._send_queue.extend(states) - __log__.info('Message %d not resent due to bad salt', - bad_salt.bad_msg_id) + __log__.debug('%d message(s) will be resent', len(states)) async def _handle_bad_notification(self, message): """ @@ -491,8 +508,7 @@ class MTProtoSender: error_code:int = BadMsgNotification; """ bad_msg = message.obj - # TODO Pending state may need to pop by container ID - state = self._pending_state.pop(bad_msg.bad_msg_id, None) + states = self._pop_states(bad_msg.bad_msg_id) __log__.debug('Handling bad msg %s', bad_msg) if bad_msg.error_code in (16, 17): @@ -509,18 +525,13 @@ class MTProtoSender: # msg_seqno too high never seems to happen but just in case self._connection._state._sequence -= 16 else: - if state: + for state in states: state.future.set_exception(BadMessageError(bad_msg.error_code)) return # Messages are to be re-sent once we've corrected the issue - if state: - self._send_queue.append(state) - else: - # TODO Generic method that may return from the acks too - # May be MsgsAck, those are not saved in pending messages - __log__.info('Message %d not resent due to bad msg', - bad_msg.bad_msg_id) + self._send_queue.extend(states) + __log__.debug('%d messages will be resent due to bad msg', len(states)) async def _handle_detailed_info(self, message): """ From 21ffa2f26b1d0ffe9533431aeee93b7a06b03320 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 1 Oct 2018 14:02:23 +0200 Subject: [PATCH 16/35] Fix DC migration and seqno --- telethon/client/telegrambaseclient.py | 2 +- telethon/network/mtprotolayer.py | 12 ++++++++---- telethon/network/mtprotostate.py | 5 +++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index 5b82b625..89d59495 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -369,7 +369,7 @@ class TelegramBaseClient(abc.ABC): self.session.set_dc(dc.id, dc.ip_address, dc.port) # auth_key's are associated with a server, which has now changed # so it's not valid anymore. Set to None to force recreating it. - self.session.auth_key = self._sender.state.auth_key = None + self.session.auth_key = self._sender._connection._state.auth_key = None self.session.save() await self._disconnect() return await self.connect() diff --git a/telethon/network/mtprotolayer.py b/telethon/network/mtprotolayer.py index 8142a5e9..2960b28e 100644 --- a/telethon/network/mtprotolayer.py +++ b/telethon/network/mtprotolayer.py @@ -2,6 +2,7 @@ import io import struct from .mtprotostate import MTProtoState +from ..tl import TLRequest from ..tl.core.messagecontainer import MessageContainer @@ -78,14 +79,15 @@ class MTProtoLayer: for state in state_list: if not isinstance(state, list): n += 1 - state.msg_id = \ - self._state.write_data_as_message(buffer, state.data) + state.msg_id = self._state.write_data_as_message( + buffer, state.data, isinstance(state.request, TLRequest)) else: last_id = None for s in state: n += 1 last_id = s.msg_id = self._state.write_data_as_message( - buffer, s.data, after_id=last_id) + buffer, s.data, isinstance(s.request, TLRequest), + after_id=last_id) if n > 1: # Inlined code to pack several messages into a container @@ -97,7 +99,9 @@ class MTProtoLayer: ' Date: Mon, 1 Oct 2018 14:20:50 +0200 Subject: [PATCH 17/35] Set auth_key on connection --- telethon/client/telegrambaseclient.py | 11 +++++------ telethon/network/mtprotosender.py | 11 ++++------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index 89d59495..8ef6103a 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -11,7 +11,6 @@ from .. import version from ..crypto import rsa from ..extensions import markdown from ..network import MTProtoSender, ConnectionTcpFull -from ..network.mtprotostate import MTProtoState from ..sessions import Session, SQLiteSession, MemorySession from ..tl import TLObject, functions, types from ..tl.alltlobjects import LAYER @@ -226,7 +225,7 @@ class TelegramBaseClient(abc.ABC): self._connection = connection self._sender = MTProtoSender( - self.session.auth_key, self._loop, + self._loop, retries=self._connection_retries, auto_reconnect=self._auto_reconnect, update_callback=self._handle_update, @@ -305,7 +304,7 @@ class TelegramBaseClient(abc.ABC): """ Connects to Telegram. """ - await self._sender.connect(self._connection( + await self._sender.connect(self.session.auth_key, self._connection( self.session.server_address, self.session.port, loop=self._loop)) await self._sender.send(self._init_with( @@ -369,7 +368,7 @@ class TelegramBaseClient(abc.ABC): self.session.set_dc(dc.id, dc.ip_address, dc.port) # auth_key's are associated with a server, which has now changed # so it's not valid anymore. Set to None to force recreating it. - self.session.auth_key = self._sender._connection._state.auth_key = None + self.session.auth_key = None self.session.save() await self._disconnect() return await self.connect() @@ -416,8 +415,8 @@ class TelegramBaseClient(abc.ABC): # # If one were to do that, Telegram would reset the connection # with no further clues. - sender = MTProtoSender(None, self._loop) - await sender.connect(self._connection( + sender = MTProtoSender(self._loop) + await sender.connect(None, self._connection( dc.ip_address, dc.port, loop=self._loop)) __log__.info('Exporting authorization for data center %s', dc) auth = await self(functions.auth.ExportAuthorizationRequest(dc_id)) diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 26fd4e84..8196932c 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -38,10 +38,9 @@ class MTProtoSender: A new authorization key will be generated on connection if no other key exists yet. """ - def __init__(self, auth_key, loop, *, + def __init__(self, loop, *, retries=5, auto_reconnect=True, update_callback=None, auth_key_callback=None, auto_reconnect_callback=None): - self._auth_key = auth_key self._connection = None # MTProtoLayer, a.k.a. encrypted connection self._loop = loop self._retries = retries @@ -100,17 +99,15 @@ class MTProtoSender: # Public API - async def connect(self, connection): + async def connect(self, auth_key, connection): """ - Connects to the specified ``ip:port``, and generates a new - authorization key for the `MTProtoSender.session` if it does - not exist yet. + Connects to the specified given connection using the given auth key. """ if self._user_connected: __log__.info('User is already connected!') return - self._connection = MTProtoLayer(connection, self._auth_key) + self._connection = MTProtoLayer(connection, auth_key) self._user_connected = True await self._connect() From bc1fd9039dc2a2129b305d37f0d20a9df873a17e Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Tue, 2 Oct 2018 08:55:46 +0200 Subject: [PATCH 18/35] Handle receiving errors --- telethon/network/mtprotolayer.py | 3 ++ telethon/network/mtprotosender.py | 55 ++++++++++++++++++++++++------- telethon/network/mtprotostate.py | 1 + 3 files changed, 47 insertions(+), 12 deletions(-) diff --git a/telethon/network/mtprotolayer.py b/telethon/network/mtprotolayer.py index 2960b28e..d3cd7ae4 100644 --- a/telethon/network/mtprotolayer.py +++ b/telethon/network/mtprotolayer.py @@ -34,6 +34,9 @@ class MTProtoLayer: """ self._connection.disconnect() + def reset_state(self): + self._state = MTProtoState(self._state.auth_key) + async def send(self, state_list): """ The list of `RequestState` that will be sent. They will diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 8196932c..5dc5a530 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -8,7 +8,8 @@ from .mtprotoplainsender import MTProtoPlainSender from .requeststate import RequestState from .. import utils from ..errors import ( - BadMessageError, SecurityError, TypeNotFoundError, rpc_message_to_error + BadMessageError, BrokenAuthKeyError, SecurityError, TypeNotFoundError, + rpc_message_to_error ) from ..extensions import BinaryReader from ..helpers import _ReadyQueue @@ -279,6 +280,9 @@ class MTProtoSender: """ self._reconnecting = True + __log__.debug('Closing current connection...') + self._connection.disconnect() + __log__.debug('Awaiting for the send loop before reconnecting...') await self._send_loop_handle @@ -290,22 +294,24 @@ class MTProtoSender: self._reconnecting = False + # Start with a clean state (and thus session ID) to avoid old msgs + self._connection.reset_state() + retries = self._retries if self._auto_reconnect else 0 for retry in range(1, retries + 1): try: await self._connect() - # TODO Keep this? - """ - for m in self._pending_messages.values(): - self._send_queue.put_nowait(m) - """ + except ConnectionError: + __log__.info('Failed reconnection retry %d/%d', retry, retries) + else: + self._send_queue.extend(self._pending_state.values()) + self._pending_state.clear() + # TODO Where is this needed? if self._auto_reconnect_callback: self._loop.create_task(self._auto_reconnect_callback()) break - except ConnectionError: - __log__.info('Failed reconnection retry %d/%d', retry, retries) else: __log__.error('Failed to reconnect automatically.') await self._disconnect(error=ConnectionError()) @@ -356,11 +362,36 @@ class MTProtoSender: Besides `connect`, only this method ever receives data. """ while self._user_connected and not self._reconnecting: - # TODO Handle timeout, cancelled, arbitrary, broken auth, buffer, - # security and type not found. + # TODO handle incomplete read? __log__.debug('Receiving items from the network...') - message = await self._connection.recv() - await self._process_message(message) + try: + message = await self._connection.recv() + except TypeNotFoundError as e: + __log__.info('Type %08x not found, remaining data %r', + e.invalid_constructor_id, e.remaining) + continue + except SecurityError as e: + # A step while decoding had the incorrect data. This message + # should not be considered safe and it should be ignored. + __log__.warning('Security error while unpacking a ' + 'received message: %s', e) + continue + except asyncio.CancelledError: + return + except (BrokenAuthKeyError, BufferError): + __log__.info('Broken authorization key; resetting') + self._connection._state.auth_key = None + self._start_reconnect() + return + except Exception: + __log__.exception('Unhandled error while receiving data') + self._start_reconnect() + return + else: + try: + await self._process_message(message) + except Exception: + __log__.exception('Unhandled error while processing msgs') # Response Handlers diff --git a/telethon/network/mtprotostate.py b/telethon/network/mtprotostate.py index e38dad60..4b3dbefa 100644 --- a/telethon/network/mtprotostate.py +++ b/telethon/network/mtprotostate.py @@ -120,6 +120,7 @@ class MTProtoState: else: raise BufferError("Can't decode packet ({})".format(body)) + # TODO Check salt, session_id and sequence_number key_id = struct.unpack(' Date: Wed, 3 Oct 2018 14:15:51 +0200 Subject: [PATCH 19/35] Fix automatic reconnect (e.g. on bad auth key) This took more time than it should have to debug. --- telethon/network/connection/tcpfull.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/telethon/network/connection/tcpfull.py b/telethon/network/connection/tcpfull.py index 5825e68b..e07e3558 100644 --- a/telethon/network/connection/tcpfull.py +++ b/telethon/network/connection/tcpfull.py @@ -14,6 +14,10 @@ class ConnectionTcpFull(Connection): super().__init__(ip, port, loop=loop) self._send_counter = 0 + async def connect(self): + await super().connect() + self._send_counter = 0 # Important or Telegram won't reply + def _send(self, data): # https://core.telegram.org/mtproto#tcp-transport # total length, sequence number, packet and checksum (CRC32) From e319fa3aa9bd09d29399dbd289847864fc1bd65f Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Wed, 3 Oct 2018 14:46:10 +0200 Subject: [PATCH 20/35] Handle IncompleteReadError and InvalidChecksumError --- telethon/network/connection/connection.py | 38 +++++++++++++++-------- telethon/network/mtprotosender.py | 14 ++++++++- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 2f3957f2..661d2de2 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -1,5 +1,9 @@ import abc import asyncio +import logging + + +__log__ = logging.getLogger(__name__) class Connection(abc.ABC): @@ -74,36 +78,44 @@ class Connection(abc.ABC): """ return self._send_queue.put(data) - def recv(self): + async def recv(self): """ Receives a packet of data through this connection mode. This method returns a coroutine. """ - return self._recv_queue.get() + ok, result = await self._recv_queue.get() + if ok: + return result + else: + raise result from None # TODO Get/put to the queue with cancellation async def _send_loop(self): """ This loop is constantly popping items off the queue to send them. """ - while not self._disconnected.is_set(): - self._send(await self._send_queue.get()) - await self._writer.drain() + try: + while not self._disconnected.is_set(): + self._send(await self._send_queue.get()) + await self._writer.drain() + except asyncio.CancelledError: + pass + except Exception: + logging.exception('Unhandled exception in the sending loop') + self.disconnect() - # TODO Handle IncompleteReadError and InvalidChecksumError async def _recv_loop(self): """ This loop is constantly putting items on the queue as they're read. """ - while not self._disconnected.is_set(): - try: + try: + while not self._disconnected.is_set(): data = await self._recv() - except asyncio.IncompleteReadError: - if not self._disconnected.is_set(): - raise - else: - await self._recv_queue.put(data) + await self._recv_queue.put((True, data)) + except Exception as e: + await self._recv_queue.put((False, e)) + self.disconnect() @abc.abstractmethod def _send(self, data): diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 5dc5a530..d398eccc 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -9,7 +9,7 @@ from .requeststate import RequestState from .. import utils from ..errors import ( BadMessageError, BrokenAuthKeyError, SecurityError, TypeNotFoundError, - rpc_message_to_error + InvalidChecksumError, rpc_message_to_error ) from ..extensions import BinaryReader from ..helpers import _ReadyQueue @@ -376,6 +376,11 @@ class MTProtoSender: __log__.warning('Security error while unpacking a ' 'received message: %s', e) continue + except InvalidChecksumError as e: + __log__.warning( + 'Invalid checksum on the read packet (was %s expected %s)', + e.checksum, e.valid_checksum + ) except asyncio.CancelledError: return except (BrokenAuthKeyError, BufferError): @@ -383,6 +388,13 @@ class MTProtoSender: self._connection._state.auth_key = None self._start_reconnect() return + except asyncio.IncompleteReadError: + # TODO Handle packets that are too big and trigger this + # If it's not a packet that triggered this, just reconnect + __log__.info('Telegram closed the connection') + self._pending_state.clear() + self._start_reconnect() + return except Exception: __log__.exception('Unhandled error while receiving data') self._start_reconnect() From 37b9922f6482e01ea76e8dd6ff6374dfd5eedbb0 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Wed, 3 Oct 2018 15:31:49 +0200 Subject: [PATCH 21/35] Handle cancellation on the receive loop --- telethon/network/connection/connection.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 661d2de2..5d650a38 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -113,6 +113,8 @@ class Connection(abc.ABC): while not self._disconnected.is_set(): data = await self._recv() await self._recv_queue.put((True, data)) + except asyncio.CancelledError: + pass except Exception as e: await self._recv_queue.put((False, e)) self.disconnect() From 1b9d6aac06ec6dce2a6ef915c9ba516fdfd1a9da Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 4 Oct 2018 16:15:51 +0200 Subject: [PATCH 22/35] Gzip only content related data --- telethon/network/mtprotostate.py | 4 ++-- telethon/tl/core/gzippacked.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/telethon/network/mtprotostate.py b/telethon/network/mtprotostate.py index 4b3dbefa..3495ae8a 100644 --- a/telethon/network/mtprotostate.py +++ b/telethon/network/mtprotostate.py @@ -79,9 +79,9 @@ class MTProtoState: msg_id = self._get_new_msg_id() seq_no = self._get_seq_no(content_related) if after_id is None: - body = GzipPacked.gzip_if_smaller(data) + body = GzipPacked.gzip_if_smaller(content_related, data) else: - body = GzipPacked.gzip_if_smaller( + body = GzipPacked.gzip_if_smaller(content_related, bytes(InvokeAfterMsgRequest(after_id, data))) buffer.write(struct.pack(' 512: + if content_related and len(data) > 512: gzipped = bytes(GzipPacked(data)) return gzipped if len(gzipped) < len(data) else data else: From 0cc8bca098018d05c211e330efd26839bd8b1776 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 4 Oct 2018 16:39:31 +0200 Subject: [PATCH 23/35] Delete TcpClient --- telethon/extensions/__init__.py | 1 - telethon/extensions/tcpclient.py | 171 ------------------------------- 2 files changed, 172 deletions(-) delete mode 100644 telethon/extensions/tcpclient.py diff --git a/telethon/extensions/__init__.py b/telethon/extensions/__init__.py index 9e7f6686..903460b6 100644 --- a/telethon/extensions/__init__.py +++ b/telethon/extensions/__init__.py @@ -4,4 +4,3 @@ communication with support for cancelling the operation, and an utility class to read arbitrary binary data in a more comfortable way, with int/strings/etc. """ from .binaryreader import BinaryReader -from .tcpclient import TcpClient diff --git a/telethon/extensions/tcpclient.py b/telethon/extensions/tcpclient.py deleted file mode 100644 index 4baa46ca..00000000 --- a/telethon/extensions/tcpclient.py +++ /dev/null @@ -1,171 +0,0 @@ -""" -This module holds a rough implementation of the C# TCP client. - -This class is **not** safe across several tasks since partial reads -may be ``await``'ed before being able to return the exact byte count. - -This class is also not concerned about disconnections or retries of -any sort, nor any other kind of errors such as connecting twice. -""" -import asyncio -import errno -import logging -import socket -import ssl - -CONN_RESET_ERRNOS = { - errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, - errno.EINVAL, errno.ENOTCONN, errno.EHOSTUNREACH, - errno.ECONNREFUSED, errno.ECONNRESET, errno.ECONNABORTED, - errno.ENETDOWN, errno.ENETRESET, errno.ECONNABORTED, - errno.EHOSTDOWN, errno.EPIPE, errno.ESHUTDOWN -} -# catched: EHOSTUNREACH, ECONNREFUSED, ECONNRESET, ENETUNREACH -# ConnectionError: EPIPE, ESHUTDOWN, ECONNABORTED, ECONNREFUSED, ECONNRESET - -try: - import socks -except ImportError: - socks = None - -SSL_PORT = 443 -__log__ = logging.getLogger(__name__) - - -class TcpClient: - """A simple TCP client to ease the work with sockets and proxies.""" - - class SocketClosed(ConnectionError): - pass - - def __init__(self, *, loop, timeout, ssl=None, proxy=None): - """ - Initializes the TCP client. - - :param proxy: the proxy to be used, if any. - :param timeout: the timeout for connect, read and write operations. - :param ssl: ssl.wrap_socket keyword arguments to use when connecting - if port == SSL_PORT, or do nothing if not present. - """ - self._loop = loop - self.proxy = proxy - self.ssl = ssl - self._socket = None - self._reader = None - self._writer = None - self._closed = asyncio.Event(loop=self._loop) - self._closed.set() - - if isinstance(timeout, (int, float)): - self.timeout = float(timeout) - elif hasattr(timeout, 'seconds'): - self.timeout = float(timeout.seconds) - else: - raise TypeError('Invalid timeout type: {}'.format(type(timeout))) - - @staticmethod - def _create_socket(mode, proxy): - if proxy is None: - s = socket.socket(mode, socket.SOCK_STREAM) - else: - __log__.info('Connection will be made through proxy %s', proxy) - import socks - s = socks.socksocket(mode, socket.SOCK_STREAM) - if isinstance(proxy, dict): - s.set_proxy(**proxy) - else: # tuple, list, etc. - s.set_proxy(*proxy) - s.setblocking(False) - return s - - async def connect(self, ip, port): - """ - Tries connecting to IP:port unless an OSError is raised. - - :param ip: the IP to connect to. - :param port: the port to connect to. - """ - if ':' in ip: # IPv6 - ip = ip.replace('[', '').replace(']', '') - mode, address = socket.AF_INET6, (ip, port, 0, 0) - else: - mode, address = socket.AF_INET, (ip, port) - - try: - if self._socket is None: - self._socket = self._create_socket(mode, self.proxy) - wrap_ssl = self.ssl and port == SSL_PORT - else: - wrap_ssl = False - - await asyncio.wait_for( - self._loop.sock_connect(self._socket, address), - timeout=self.timeout, - loop=self._loop - ) - if wrap_ssl: - # Temporarily set the socket to blocking - # (timeout) until connection is established. - self._socket.settimeout(self.timeout) - self._socket = ssl.wrap_socket( - self._socket, do_handshake_on_connect=True, **self.ssl) - self._socket.setblocking(False) - - self._closed.clear() - self._reader, self._writer =\ - await asyncio.open_connection(sock=self._socket) - except OSError as e: - if e.errno in CONN_RESET_ERRNOS: - raise ConnectionResetError() from e - else: - raise - - @property - def is_connected(self): - """Determines whether the client is connected or not.""" - return not self._closed.is_set() - - def close(self): - """Closes the connection.""" - fd = None - try: - if self._writer is not None: - self._writer.close() - - if self._socket is not None: - fd = self._socket.fileno() - if self.is_connected: - self._socket.shutdown(socket.SHUT_RDWR) - self._socket.close() - except OSError: - pass # Ignore ENOTCONN, EBADF, and any other error when closing - finally: - self._socket = None - self._reader = None - self._writer = None - self._closed.set() - if fd and fd != -1: - self._loop.remove_reader(fd) - - async def write(self, data): - """ - Writes (sends) the specified bytes to the connected peer. - :param data: the data to send. - """ - if not self.is_connected: - raise ConnectionResetError('Not connected') - - self._writer.write(data) - await self._writer.drain() - - async def read(self, size): - """ - Reads (receives) a whole block of size bytes from the connected peer. - - :param size: the size of the block to be read. - :return: the read data with len(data) == size. - """ - if not self.is_connected: - raise ConnectionResetError('Not connected') - - return await self._reader.readexactly(size) From db83709c6b48c1d25ac4da1b3dbf3a2839386d8c Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 4 Oct 2018 16:39:57 +0200 Subject: [PATCH 24/35] Support connection timeout --- telethon/client/telegrambaseclient.py | 2 ++ telethon/network/connection/connection.py | 10 ++++++---- telethon/network/connection/http.py | 10 +++++++--- telethon/network/connection/tcpabridged.py | 4 ++-- telethon/network/connection/tcpfull.py | 4 ++-- telethon/network/connection/tcpintermediate.py | 4 ++-- telethon/network/connection/tcpobfuscated.py | 4 ++-- telethon/network/mtprotolayer.py | 4 ++-- telethon/network/mtprotosender.py | 10 +++++----- 9 files changed, 30 insertions(+), 22 deletions(-) diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index 8ef6103a..b0face7a 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -202,6 +202,7 @@ class TelegramBaseClient(abc.ABC): self._request_retries = request_retries or sys.maxsize self._connection_retries = connection_retries or sys.maxsize + self._timeout = timeout self._auto_reconnect = auto_reconnect assert isinstance(connection, type) @@ -228,6 +229,7 @@ class TelegramBaseClient(abc.ABC): self._loop, retries=self._connection_retries, auto_reconnect=self._auto_reconnect, + connect_timeout=self._timeout, update_callback=self._handle_update, auth_key_callback=self._auth_key_callback, auto_reconnect_callback=self._handle_auto_reconnect diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 5d650a38..8e3acb44 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -16,7 +16,7 @@ class Connection(abc.ABC): under any conditions for as long as the user doesn't disconnect or the input parameters to auto-reconnect dictate otherwise. """ - # TODO Support proxy. Support timeout? + # TODO Support proxy def __init__(self, ip, port, *, loop): self._ip = ip self._port = port @@ -31,12 +31,14 @@ class Connection(abc.ABC): self._send_queue = asyncio.Queue(1) self._recv_queue = asyncio.Queue(1) - async def connect(self): + async def connect(self, timeout=None): """ Establishes a connection with the server. """ - self._reader, self._writer = await asyncio.open_connection( - self._ip, self._port, loop=self._loop) + self._reader, self._writer = await asyncio.wait_for( + asyncio.open_connection(self._ip, self._port, loop=self._loop), + loop=self._loop, timeout=timeout + ) self._disconnected.clear() self._disconnected_future = None diff --git a/telethon/network/connection/http.py b/telethon/network/connection/http.py index c346d2f8..09096454 100644 --- a/telethon/network/connection/http.py +++ b/telethon/network/connection/http.py @@ -4,13 +4,17 @@ from .connection import Connection class ConnectionHttp(Connection): - async def connect(self): + async def connect(self, timeout=None): # TODO Test if the ssl part works or it needs to be as before: # dict(ssl_version=ssl.PROTOCOL_SSLv23, ciphers='ADH-AES256-SHA') - self._reader, self._writer = await asyncio.open_connection( - self._ip, self._port, loop=self._loop, ssl=True) + self._reader, self._writer = await asyncio.wait_for( + asyncio.open_connection( + self._ip, self._port, loop=self._loop, ssl=True), + loop=self._loop, timeout=timeout + ) self._disconnected.clear() + self._disconnected_future = None self._send_task = self._loop.create_task(self._send_loop()) self._recv_task = self._loop.create_task(self._send_loop()) diff --git a/telethon/network/connection/tcpabridged.py b/telethon/network/connection/tcpabridged.py index 6352413e..dced879f 100644 --- a/telethon/network/connection/tcpabridged.py +++ b/telethon/network/connection/tcpabridged.py @@ -9,8 +9,8 @@ class ConnectionTcpAbridged(Connection): only require 1 byte if the packet length is less than 508 bytes (127 << 2, which is very common). """ - async def connect(self): - await super().connect() + async def connect(self, timeout=None): + await super().connect(timeout=timeout) await self.send(b'\xef') def _write(self, data): diff --git a/telethon/network/connection/tcpfull.py b/telethon/network/connection/tcpfull.py index e07e3558..1d278c30 100644 --- a/telethon/network/connection/tcpfull.py +++ b/telethon/network/connection/tcpfull.py @@ -14,8 +14,8 @@ class ConnectionTcpFull(Connection): super().__init__(ip, port, loop=loop) self._send_counter = 0 - async def connect(self): - await super().connect() + async def connect(self, timeout=None): + await super().connect(timeout=timeout) self._send_counter = 0 # Important or Telegram won't reply def _send(self, data): diff --git a/telethon/network/connection/tcpintermediate.py b/telethon/network/connection/tcpintermediate.py index 82a99250..7b8e3c79 100644 --- a/telethon/network/connection/tcpintermediate.py +++ b/telethon/network/connection/tcpintermediate.py @@ -8,8 +8,8 @@ class ConnectionTcpIntermediate(Connection): Intermediate mode between `ConnectionTcpFull` and `ConnectionTcpAbridged`. Always sends 4 extra bytes for the packet length. """ - async def connect(self): - await super().connect() + async def connect(self, timeout=None): + await super().connect(timeout=timeout) await self.send(b'\xee\xee\xee\xee') def _send(self, data): diff --git a/telethon/network/connection/tcpobfuscated.py b/telethon/network/connection/tcpobfuscated.py index 393977b2..eb4fd7ff 100644 --- a/telethon/network/connection/tcpobfuscated.py +++ b/telethon/network/connection/tcpobfuscated.py @@ -22,8 +22,8 @@ class ConnectionTcpObfuscated(ConnectionTcpAbridged): async def _read(self, n): return self._aes_decrypt.encrypt(await self._reader.readexactly(n)) - async def connect(self): - await Connection.connect(self) + async def connect(self, timeout=None): + await Connection.connect(self, timeout=timeout) # Obfuscated messages secrets cannot start with any of these keywords = (b'PVrG', b'GET ', b'POST', b'\xee\xee\xee\xee') diff --git a/telethon/network/mtprotolayer.py b/telethon/network/mtprotolayer.py index d3cd7ae4..731cabb8 100644 --- a/telethon/network/mtprotolayer.py +++ b/telethon/network/mtprotolayer.py @@ -22,11 +22,11 @@ class MTProtoLayer: self._connection = connection self._state = MTProtoState(auth_key) - def connect(self): + def connect(self, timeout=None): """ Wrapper for ``self._connection.connect()``. """ - return self._connection.connect() + return self._connection.connect(timeout=timeout) def disconnect(self): """ diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index d398eccc..4d96cb2c 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -40,12 +40,14 @@ class MTProtoSender: key exists yet. """ def __init__(self, loop, *, - retries=5, auto_reconnect=True, update_callback=None, + retries=5, auto_reconnect=True, connect_timeout=None, + update_callback=None, auth_key_callback=None, auto_reconnect_callback=None): self._connection = None # MTProtoLayer, a.k.a. encrypted connection self._loop = loop self._retries = retries self._auto_reconnect = auto_reconnect + self._connect_timeout = connect_timeout self._update_callback = update_callback self._auth_key_callback = auth_key_callback self._auto_reconnect_callback = auto_reconnect_callback @@ -189,14 +191,12 @@ class MTProtoSender: authorization key if necessary, and starting the send and receive loops. """ - # TODO With ``asyncio.open_connection``, no timeout occurs - # However, these are probably desirable in some circumstances. __log__.info('Connecting to %s...', self._connection) for retry in range(1, self._retries + 1): try: __log__.debug('Connection attempt {}...'.format(retry)) - await self._connection.connect() - except OSError as e: + await self._connection.connect(timeout=self._connect_timeout) + except (OSError, asyncio.TimeoutError) as e: __log__.warning('Attempt {} at connecting failed: {}: {}' .format(retry, type(e).__name__, e)) else: From ebde3be82030bb89a8fa461d420a777bc827eda7 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 4 Oct 2018 17:11:31 +0200 Subject: [PATCH 25/35] Add support for proxy again --- telethon/client/telegrambaseclient.py | 7 ++- telethon/network/connection/connection.py | 51 ++++++++++++++++--- telethon/network/connection/http.py | 18 ++----- telethon/network/connection/tcpabridged.py | 4 +- telethon/network/connection/tcpfull.py | 8 +-- .../network/connection/tcpintermediate.py | 4 +- telethon/network/connection/tcpobfuscated.py | 8 +-- 7 files changed, 65 insertions(+), 35 deletions(-) diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index b0face7a..4e521291 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -202,6 +202,7 @@ class TelegramBaseClient(abc.ABC): self._request_retries = request_retries or sys.maxsize self._connection_retries = connection_retries or sys.maxsize + self._proxy = proxy self._timeout = timeout self._auto_reconnect = auto_reconnect @@ -307,7 +308,9 @@ class TelegramBaseClient(abc.ABC): Connects to Telegram. """ await self._sender.connect(self.session.auth_key, self._connection( - self.session.server_address, self.session.port, loop=self._loop)) + self.session.server_address, self.session.port, + loop=self._loop, proxy=self._proxy + )) await self._sender.send(self._init_with( functions.help.GetConfigRequest())) @@ -419,7 +422,7 @@ class TelegramBaseClient(abc.ABC): # with no further clues. sender = MTProtoSender(self._loop) await sender.connect(None, self._connection( - dc.ip_address, dc.port, loop=self._loop)) + dc.ip_address, dc.port, loop=self._loop, proxy=self._proxy)) __log__.info('Exporting authorization for data center %s', dc) auth = await self(functions.auth.ExportAuthorizationRequest(dc_id)) req = self._init_with(functions.auth.ImportAuthorizationRequest( diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 8e3acb44..5fce6f11 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -1,7 +1,8 @@ import abc import asyncio import logging - +import socket +import ssl as ssl_mod __log__ = logging.getLogger(__name__) @@ -16,11 +17,11 @@ class Connection(abc.ABC): under any conditions for as long as the user doesn't disconnect or the input parameters to auto-reconnect dictate otherwise. """ - # TODO Support proxy - def __init__(self, ip, port, *, loop): + def __init__(self, ip, port, *, loop, proxy=None): self._ip = ip self._port = port self._loop = loop + self._proxy = proxy self._reader = None self._writer = None self._disconnected = asyncio.Event(loop=loop) @@ -31,14 +32,48 @@ class Connection(abc.ABC): self._send_queue = asyncio.Queue(1) self._recv_queue = asyncio.Queue(1) - async def connect(self, timeout=None): + async def connect(self, timeout=None, ssl=None): """ Establishes a connection with the server. """ - self._reader, self._writer = await asyncio.wait_for( - asyncio.open_connection(self._ip, self._port, loop=self._loop), - loop=self._loop, timeout=timeout - ) + if not self._proxy: + self._reader, self._writer = await asyncio.wait_for( + asyncio.open_connection( + self._ip, self._port, loop=self._loop, ssl=ssl), + loop=self._loop, timeout=timeout + ) + else: + import socks + if ':' in self._ip: + mode, address = socket.AF_INET6, (self._ip, self._port, 0, 0) + else: + mode, address = socket.AF_INET, (self._ip, self._port) + + s = socks.socksocket(mode, socket.SOCK_STREAM) + if isinstance(self._proxy, dict): + s.set_proxy(**self._proxy) + else: + s.set_proxy(*self._proxy) + + s.setblocking(False) + await asyncio.wait_for( + self._loop.sock_connect(s, address), + timeout=timeout, + loop=self._loop + ) + if ssl: + self._socket.settimeout(timeout) + self._socket = ssl_mod.wrap_socket( + s, + do_handshake_on_connect=True, + ssl_version=ssl_mod.PROTOCOL_SSLv23, + ciphers='ADH-AES256-SHA' + ) + self._socket.setblocking(False) + + self._reader, self._writer = await asyncio.open_connection( + self._ip, self._port, loop=self._loop, sock=s + ) self._disconnected.clear() self._disconnected_future = None diff --git a/telethon/network/connection/http.py b/telethon/network/connection/http.py index 09096454..bfda941d 100644 --- a/telethon/network/connection/http.py +++ b/telethon/network/connection/http.py @@ -3,20 +3,12 @@ import asyncio from .connection import Connection -class ConnectionHttp(Connection): - async def connect(self, timeout=None): - # TODO Test if the ssl part works or it needs to be as before: - # dict(ssl_version=ssl.PROTOCOL_SSLv23, ciphers='ADH-AES256-SHA') - self._reader, self._writer = await asyncio.wait_for( - asyncio.open_connection( - self._ip, self._port, loop=self._loop, ssl=True), - loop=self._loop, timeout=timeout - ) +SSL_PORT = 443 - self._disconnected.clear() - self._disconnected_future = None - self._send_task = self._loop.create_task(self._send_loop()) - self._recv_task = self._loop.create_task(self._send_loop()) + +class ConnectionHttp(Connection): + async def connect(self, timeout=None, ssl=None): + await super().connect(timeout=timeout, ssl=self._port == SSL_PORT) def _send(self, message): self._writer.write( diff --git a/telethon/network/connection/tcpabridged.py b/telethon/network/connection/tcpabridged.py index dced879f..cff295ae 100644 --- a/telethon/network/connection/tcpabridged.py +++ b/telethon/network/connection/tcpabridged.py @@ -9,8 +9,8 @@ class ConnectionTcpAbridged(Connection): only require 1 byte if the packet length is less than 508 bytes (127 << 2, which is very common). """ - async def connect(self, timeout=None): - await super().connect(timeout=timeout) + async def connect(self, timeout=None, ssl=None): + await super().connect(timeout=timeout, ssl=ssl) await self.send(b'\xef') def _write(self, data): diff --git a/telethon/network/connection/tcpfull.py b/telethon/network/connection/tcpfull.py index 1d278c30..fd9fd1cf 100644 --- a/telethon/network/connection/tcpfull.py +++ b/telethon/network/connection/tcpfull.py @@ -10,12 +10,12 @@ class ConnectionTcpFull(Connection): Default Telegram mode. Sends 12 additional bytes and needs to calculate the CRC value of the packet itself. """ - def __init__(self, ip, port, *, loop): - super().__init__(ip, port, loop=loop) + def __init__(self, ip, port, *, loop, proxy=None): + super().__init__(ip, port, loop=loop, proxy=proxy) self._send_counter = 0 - async def connect(self, timeout=None): - await super().connect(timeout=timeout) + async def connect(self, timeout=None, ssl=None): + await super().connect(timeout=timeout, ssl=ssl) self._send_counter = 0 # Important or Telegram won't reply def _send(self, data): diff --git a/telethon/network/connection/tcpintermediate.py b/telethon/network/connection/tcpintermediate.py index 7b8e3c79..322b1e8f 100644 --- a/telethon/network/connection/tcpintermediate.py +++ b/telethon/network/connection/tcpintermediate.py @@ -8,8 +8,8 @@ class ConnectionTcpIntermediate(Connection): Intermediate mode between `ConnectionTcpFull` and `ConnectionTcpAbridged`. Always sends 4 extra bytes for the packet length. """ - async def connect(self, timeout=None): - await super().connect(timeout=timeout) + async def connect(self, timeout=None, ssl=None): + await super().connect(timeout=timeout, ssl=ssl) await self.send(b'\xee\xee\xee\xee') def _send(self, data): diff --git a/telethon/network/connection/tcpobfuscated.py b/telethon/network/connection/tcpobfuscated.py index eb4fd7ff..5f2ca7e4 100644 --- a/telethon/network/connection/tcpobfuscated.py +++ b/telethon/network/connection/tcpobfuscated.py @@ -11,8 +11,8 @@ class ConnectionTcpObfuscated(ConnectionTcpAbridged): every message with a randomly generated key using the AES-CTR mode so the packets are harder to discern. """ - def __init__(self, ip, port, *, loop): - super().__init__(ip, port, loop=loop) + def __init__(self, ip, port, *, loop, proxy=None): + super().__init__(ip, port, loop=loop, proxy=proxy) self._aes_encrypt = None self._aes_decrypt = None @@ -22,8 +22,8 @@ class ConnectionTcpObfuscated(ConnectionTcpAbridged): async def _read(self, n): return self._aes_decrypt.encrypt(await self._reader.readexactly(n)) - async def connect(self, timeout=None): - await Connection.connect(self, timeout=timeout) + async def connect(self, timeout=None, ssl=None): + await super().connect(timeout=timeout, ssl=ssl) # Obfuscated messages secrets cannot start with any of these keywords = (b'PVrG', b'GET ', b'POST', b'\xee\xee\xee\xee') From a5d4e979225acd4c4fd9ea10d2474fc1093ab4d1 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 4 Oct 2018 17:34:58 +0200 Subject: [PATCH 26/35] Fix alternative connection modes --- telethon/network/connection/tcpabridged.py | 3 ++- telethon/network/connection/tcpintermediate.py | 3 ++- telethon/network/connection/tcpobfuscated.py | 4 ++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/telethon/network/connection/tcpabridged.py b/telethon/network/connection/tcpabridged.py index cff295ae..c9350da9 100644 --- a/telethon/network/connection/tcpabridged.py +++ b/telethon/network/connection/tcpabridged.py @@ -11,7 +11,8 @@ class ConnectionTcpAbridged(Connection): """ async def connect(self, timeout=None, ssl=None): await super().connect(timeout=timeout, ssl=ssl) - await self.send(b'\xef') + self._writer.write(b'\xef') + await self._writer.drain() def _write(self, data): """ diff --git a/telethon/network/connection/tcpintermediate.py b/telethon/network/connection/tcpintermediate.py index 322b1e8f..59467be7 100644 --- a/telethon/network/connection/tcpintermediate.py +++ b/telethon/network/connection/tcpintermediate.py @@ -10,7 +10,8 @@ class ConnectionTcpIntermediate(Connection): """ async def connect(self, timeout=None, ssl=None): await super().connect(timeout=timeout, ssl=ssl) - await self.send(b'\xee\xee\xee\xee') + self._writer.write(b'\xee\xee\xee\xee') + await self._writer.drain() def _send(self, data): self._writer.write(struct.pack(' Date: Thu, 4 Oct 2018 17:50:56 +0200 Subject: [PATCH 27/35] Remove irrelevant TODOs and add more logging --- telethon/network/connection/connection.py | 1 - telethon/network/mtprotolayer.py | 15 +++++++++++++-- telethon/network/mtprotosender.py | 2 -- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/telethon/network/connection/connection.py b/telethon/network/connection/connection.py index 5fce6f11..e61f8720 100644 --- a/telethon/network/connection/connection.py +++ b/telethon/network/connection/connection.py @@ -127,7 +127,6 @@ class Connection(abc.ABC): else: raise result from None - # TODO Get/put to the queue with cancellation async def _send_loop(self): """ This loop is constantly popping items off the queue to send them. diff --git a/telethon/network/mtprotolayer.py b/telethon/network/mtprotolayer.py index 731cabb8..3c21877b 100644 --- a/telethon/network/mtprotolayer.py +++ b/telethon/network/mtprotolayer.py @@ -1,10 +1,13 @@ import io +import logging import struct from .mtprotostate import MTProtoState from ..tl import TLRequest from ..tl.core.messagecontainer import MessageContainer +__log__ = logging.getLogger(__name__) + class MTProtoLayer: """ @@ -64,7 +67,6 @@ class MTProtoLayer: nested inside another message and message container) and returns the serialized message data. """ - # TODO write_data_as_message raises on invalid messages, handle it # TODO This method could be an iterator yielding messages while small # respecting the ``MessageContainer.MAXIMUM_SIZE`` limit. # @@ -84,6 +86,10 @@ class MTProtoLayer: n += 1 state.msg_id = self._state.write_data_as_message( buffer, state.data, isinstance(state.request, TLRequest)) + + __log__.debug('Assigned msg_id = %d to %s (%x)', + state.msg_id, state.request.__class__.__name__, + id(state.request)) else: last_id = None for s in state: @@ -92,6 +98,9 @@ class MTProtoLayer: buffer, s.data, isinstance(s.request, TLRequest), after_id=last_id) + __log__.debug('Assigned msg_id = %d to %s (%x)', + s.msg_id, s.request.__class__.__name__, + id(s.request)) if n > 1: # Inlined code to pack several messages into a container # @@ -112,7 +121,9 @@ class MTProtoLayer: for s in state: s.container_id = container_id - return buffer.getvalue() + r = buffer.getvalue() + __log__.debug('Packed %d message(s) in %d bytes for sending', n, len(r)) + return r def __str__(self): return str(self._connection) diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 4d96cb2c..29869a63 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -343,7 +343,6 @@ class MTProtoSender: if state_list is None: break - # TODO Debug logs to notify which messages are being sent # TODO Try sending them while no future was cancelled? # TODO Handle cancelled?, arbitrary errors await self._connection.send(state_list) @@ -362,7 +361,6 @@ class MTProtoSender: Besides `connect`, only this method ever receives data. """ while self._user_connected and not self._reconnecting: - # TODO handle incomplete read? __log__.debug('Receiving items from the network...') try: message = await self._connection.recv() From ef60ade647876792fd6ffed7ddadfdfdb3b5aed5 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 5 Oct 2018 12:26:59 +0200 Subject: [PATCH 28/35] Rewrite container packing to support arbitrary sizes --- telethon/network/mtprotolayer.py | 119 +++++++++++++++++++------------ telethon/tl/core/tlmessage.py | 2 + 2 files changed, 76 insertions(+), 45 deletions(-) diff --git a/telethon/network/mtprotolayer.py b/telethon/network/mtprotolayer.py index 3c21877b..95bb644f 100644 --- a/telethon/network/mtprotolayer.py +++ b/telethon/network/mtprotolayer.py @@ -4,6 +4,7 @@ import struct from .mtprotostate import MTProtoState from ..tl import TLRequest +from ..tl.core.tlmessage import TLMessage from ..tl.core.messagecontainer import MessageContainer __log__ = logging.getLogger(__name__) @@ -48,8 +49,8 @@ class MTProtoLayer: Nested lists imply an order is required for the messages in them. Message containers will be used if there is more than one item. """ - data = self._pack_state_list(state_list) - await self._connection.send(self._state.encrypt_message_data(data)) + for data in filter(None, self._pack_state_list(state_list)): + await self._connection.send(self._state.encrypt_message_data(data)) async def recv(self): """ @@ -67,9 +68,6 @@ class MTProtoLayer: nested inside another message and message container) and returns the serialized message data. """ - # TODO This method could be an iterator yielding messages while small - # respecting the ``MessageContainer.MAXIMUM_SIZE`` limit. - # # Note that the simplest case is writing a single query data into # a message, and returning the message data and ID. For efficiency # purposes this method supports more than one message and automatically @@ -79,51 +77,82 @@ class MTProtoLayer: # to store and serialize the data. However, to keep the context local # and relevant to the only place where such feature is actually used, # this is not done. - n = 0 + # + # When iterating over the state_list there are two branches, one + # being just a state and the other being a list so the inner states + # depend on each other. In either case, if the packed size exceeds + # the maximum container size, it must be sent. This code is non- + # trivial so it has been factored into an inner function. + # + # A new buffer instance will be used once the size should be "flushed" buffer = io.BytesIO() - for state in state_list: - if not isinstance(state, list): - n += 1 - state.msg_id = self._state.write_data_as_message( - buffer, state.data, isinstance(state.request, TLRequest)) + # The batch of requests sent in a single buffer-flush. We need to + # remember which states were written to set their container ID. + batch = [] + # The currently written size. Reset when it exceeds the maximum. + size = 0 - __log__.debug('Assigned msg_id = %d to %s (%x)', - state.msg_id, state.request.__class__.__name__, - id(state.request)) - else: - last_id = None - for s in state: - n += 1 - last_id = s.msg_id = self._state.write_data_as_message( - buffer, s.data, isinstance(s.request, TLRequest), - after_id=last_id) + def write_state(state, after_id=None): + nonlocal buffer, batch, size + if state: + batch.append(state) + size += len(state.data) + TLMessage.SIZE_OVERHEAD - __log__.debug('Assigned msg_id = %d to %s (%x)', - s.msg_id, s.request.__class__.__name__, - id(s.request)) - if n > 1: - # Inlined code to pack several messages into a container - # - # TODO This part and encrypting data prepend a few bytes but - # force a potentially large payload to be appended, which - # may be expensive. Can we do better? - data = struct.pack( - ' MessageContainer.MAXIMUM_SIZE: + size -= MessageContainer.MAXIMUM_SIZE + if len(batch) > 1: + # Inlined code to pack several messages into a container + data = struct.pack( + ' MessageContainer.MAXIMUM_SIZE: + state.future.set_exception( + ValueError('Request payload is too big')) + return + + # This is the only requirement to make this work. + state.msg_id = self._state.write_data_as_message( + buffer, state.data, isinstance(state.request, TLRequest), + after_id=after_id + ) + __log__.debug('Assigned msg_id = %d to %s (%x)', + state.msg_id, state.request.__class__.__name__, + id(state.request)) + + # TODO Yield in the inner loop -> Telegram "Invalid container". Why? + for state in state_list: + if not isinstance(state, list): + yield write_state(state) + else: + after_id = None + for s in state: + yield write_state(s, after_id) + after_id = s.msg_id + + yield write_state(None) def __str__(self): return str(self._connection) diff --git a/telethon/tl/core/tlmessage.py b/telethon/tl/core/tlmessage.py index c9e15397..19bcc583 100644 --- a/telethon/tl/core/tlmessage.py +++ b/telethon/tl/core/tlmessage.py @@ -22,6 +22,8 @@ class TLMessage(TLObject): inlined and is unlikely to change. Thus these are only needed to encapsulate responses. """ + SIZE_OVERHEAD = 12 + def __init__(self, msg_id, seq_no, obj): self.msg_id = msg_id self.seq_no = seq_no From dc7713645329c26b83c38fa0350a69031d81c638 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 5 Oct 2018 13:23:38 +0200 Subject: [PATCH 29/35] Don't expect responses from ack, log send errors, remove TODOs --- telethon/network/mtprotosender.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 29869a63..407bc24e 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -6,6 +6,7 @@ from . import authenticator from .mtprotolayer import MTProtoLayer from .mtprotoplainsender import MTProtoPlainSender from .requeststate import RequestState +from ..tl.tlobject import TLRequest from .. import utils from ..errors import ( BadMessageError, BrokenAuthKeyError, SecurityError, TypeNotFoundError, @@ -122,11 +123,7 @@ class MTProtoSender: Cleanly disconnects the instance from the network, cancels all pending requests, and closes the send and receive loops. """ - if not self._user_connected: - __log__.info('User is already disconnected!') - return - - await self._disconnect() + self._disconnect() def send(self, request, ordered=False): """ @@ -215,7 +212,6 @@ class MTProtoSender: state.auth_key, state.time_offset =\ await authenticator.do_authentication(plain) - # TODO This callback feels out of place if self._auth_key_callback: self._auth_key_callback(state.auth_key) @@ -240,7 +236,7 @@ class MTProtoSender: self._disconnected = self._loop.create_future() __log__.info('Connection to %s complete!', self._connection) - async def _disconnect(self, error=None): + def _disconnect(self, error=None): __log__.info('Disconnecting from %s...', self._connection) self._user_connected = False try: @@ -307,7 +303,6 @@ class MTProtoSender: self._send_queue.extend(self._pending_state.values()) self._pending_state.clear() - # TODO Where is this needed? if self._auto_reconnect_callback: self._loop.create_task(self._auto_reconnect_callback()) @@ -343,15 +338,20 @@ class MTProtoSender: if state_list is None: break - # TODO Try sending them while no future was cancelled? - # TODO Handle cancelled?, arbitrary errors - await self._connection.send(state_list) + try: + await self._connection.send(state_list) + except Exception: + __log__.exception('Unhandled error while sending data') + continue + for state in state_list: if not isinstance(state, list): - self._pending_state[state.msg_id] = state + if isinstance(state.request, TLRequest): + self._pending_state[state.msg_id] = state else: for s in state: - self._pending_state[s.msg_id] = s + if isinstance(s.request, TLRequest): + self._pending_state[s.msg_id] = s async def _recv_loop(self): """ @@ -387,10 +387,7 @@ class MTProtoSender: self._start_reconnect() return except asyncio.IncompleteReadError: - # TODO Handle packets that are too big and trigger this - # If it's not a packet that triggered this, just reconnect __log__.info('Telegram closed the connection') - self._pending_state.clear() self._start_reconnect() return except Exception: From dedbf29ca416ecb04473a9dd687eded617593adc Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 5 Oct 2018 13:42:07 +0200 Subject: [PATCH 30/35] Fix Conversation.wait_event not resolving them (from d474458) --- telethon/tl/custom/conversation.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/telethon/tl/custom/conversation.py b/telethon/tl/custom/conversation.py index f8d9cc5b..66ec79f8 100644 --- a/telethon/tl/custom/conversation.py +++ b/telethon/tl/custom/conversation.py @@ -260,10 +260,7 @@ class Conversation(ChatGetter): if isinstance(event, type): event = event() - # Since we await resolve here we don't need to await resolved. - # We know it has already been resolved, unlike when normally - # adding an event handler, for which a task is created to resolve. - await event.resolve() + await event.resolve(self._client) counter = Conversation._custom_counter Conversation._custom_counter += 1 From aa3f26263c904b61e419b538ce28def9dfab5c48 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 5 Oct 2018 14:06:15 +0200 Subject: [PATCH 31/35] Fix timeout not accepting timedelta --- telethon/client/telegrambaseclient.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index 4e521291..8c99ef00 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -65,11 +65,11 @@ class TelegramBaseClient(abc.ABC): A tuple consisting of ``(socks.SOCKS5, 'host', port)``. See https://github.com/Anorov/PySocks#usage-1 for more. - timeout (`int` | `float` | `timedelta`, optional): - The timeout to be used when connecting, sending and receiving - responses from the network. This is **not** the timeout to - be used when ``await``'ing for invoked requests, and you - should use ``asyncio.wait`` or ``asyncio.wait_for`` for that. + timeout (`int` | `float`, optional): + The timeout in seconds to be used when connecting. + This is **not** the timeout to be used when ``await``'ing for + invoked requests, and you should use ``asyncio.wait`` or + ``asyncio.wait_for`` for that. request_retries (`int`, optional): How many times a request should be retried. Request are retried @@ -147,7 +147,7 @@ class TelegramBaseClient(abc.ABC): connection=ConnectionTcpFull, use_ipv6=False, proxy=None, - timeout=timedelta(seconds=10), + timeout=10, request_retries=5, connection_retries=5, auto_reconnect=True, @@ -248,10 +248,6 @@ class TelegramBaseClient(abc.ABC): # Save whether the user is authorized here (a.k.a. logged in) self._authorized = None # None = We don't know yet - # Default PingRequest delay - self._last_ping = datetime.now() - self._ping_delay = timedelta(minutes=1) - self._updates_handle = None self._last_request = time.time() self._channel_pts = {} From 0fcc2e5e52b6c4bc89666ee1280882a1dc31c0b5 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 5 Oct 2018 14:11:47 +0200 Subject: [PATCH 32/35] Add autocast for InputDocument and InputChatPhoto --- telethon/utils.py | 32 +++++++++++++++++++++++ telethon_generator/generators/tlobject.py | 4 ++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/telethon/utils.py b/telethon/utils.py index 60e4c990..4f98fbc3 100644 --- a/telethon/utils.py +++ b/telethon/utils.py @@ -270,9 +270,41 @@ def get_input_photo(photo): if isinstance(photo, types.PhotoEmpty): return types.InputPhotoEmpty() + if isinstance(photo, types.messages.ChatFull): + photo = photo.full_chat + if isinstance(photo, types.ChannelFull): + return get_input_photo(photo.chat_photo) + elif isinstance(photo, types.UserFull): + return get_input_photo(photo.profile_photo) + elif isinstance(photo, (types.Channel, types.Chat, types.User)): + return get_input_photo(photo.photo) + + if isinstance(photo, (types.UserEmpty, types.ChatEmpty, + types.ChatForbidden, types.ChannelForbidden)): + return types.InputPhotoEmpty() + _raise_cast_fail(photo, 'InputPhoto') +def get_input_chat_photo(photo): + """Similar to :meth:`get_input_peer`, but for chat photos""" + try: + if photo.SUBCLASS_OF_ID == 0xd4eb2d74: # crc32(b'InputChatPhoto') + return photo + elif photo.SUBCLASS_OF_ID == 0xe7655f1f: # crc32(b'InputFile'): + return types.InputChatUploadedPhoto(photo) + except AttributeError: + _raise_cast_fail(photo, 'InputChatPhoto') + + photo = get_input_photo(photo) + if isinstance(photo, types.InputPhoto): + return types.InputChatPhoto(photo) + elif isinstance(photo, types.InputPhotoEmpty): + return types.InputChatPhotoEmpty() + + _raise_cast_fail(photo, 'InputChatPhoto') + + def get_input_geo(geo): """Similar to :meth:`get_input_peer`, but for geo points""" try: diff --git a/telethon_generator/generators/tlobject.py b/telethon_generator/generators/tlobject.py index ee31defb..0bcd983d 100644 --- a/telethon_generator/generators/tlobject.py +++ b/telethon_generator/generators/tlobject.py @@ -25,7 +25,9 @@ AUTO_CASTS = { 'InputNotifyPeer': 'await client._get_input_notify({})', 'InputMedia': 'utils.get_input_media({})', 'InputPhoto': 'utils.get_input_photo({})', - 'InputMessage': 'utils.get_input_message({})' + 'InputMessage': 'utils.get_input_message({})', + 'InputDocument': 'utils.get_input_document({})', + 'InputChatPhoto': 'utils.get_input_chat_photo({})', } NAMED_AUTO_CASTS = { From d392939018713b8fbef646bf76786e34998ffebe Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 5 Oct 2018 14:20:50 +0200 Subject: [PATCH 33/35] Create a default module to use as a sentinel value This looks better in the documentation than utils.Default, cleans the utils with specific stuff like this, and users may use it more easily. --- telethon/client/messageparse.py | 6 +++--- telethon/client/messages.py | 8 ++++---- telethon/client/uploads.py | 8 ++++---- telethon/default.py | 5 +++++ telethon/helpers.py | 1 - telethon/tl/custom/draft.py | 7 ++++--- telethon/tl/custom/inline.py | 12 ++++++------ telethon/utils.py | 8 -------- 8 files changed, 26 insertions(+), 29 deletions(-) create mode 100644 telethon/default.py diff --git a/telethon/client/messageparse.py b/telethon/client/messageparse.py index 1e71605b..0adea5d6 100644 --- a/telethon/client/messageparse.py +++ b/telethon/client/messageparse.py @@ -2,8 +2,8 @@ import itertools import re from .users import UserMethods -from .. import utils -from ..tl import types, custom +from .. import default, utils +from ..tl import types class MessageParseMethods(UserMethods): @@ -62,7 +62,7 @@ class MessageParseMethods(UserMethods): """ Returns a (parsed message, entities) tuple depending on ``parse_mode``. """ - if parse_mode == utils.Default: + if parse_mode == default: parse_mode = self._parse_mode else: parse_mode = utils.sanitize_parse_mode(parse_mode) diff --git a/telethon/client/messages.py b/telethon/client/messages.py index 0c7a203a..88aeeaa7 100644 --- a/telethon/client/messages.py +++ b/telethon/client/messages.py @@ -8,8 +8,8 @@ from async_generator import async_generator, yield_ from .messageparse import MessageParseMethods from .uploads import UploadMethods from .buttons import ButtonMethods -from .. import utils, helpers -from ..tl import types, functions, custom +from .. import default, helpers, utils +from ..tl import types, functions __log__ = logging.getLogger(__name__) @@ -360,7 +360,7 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods): async def send_message( self, entity, message='', *, reply_to=None, - parse_mode=utils.Default, link_preview=True, file=None, + parse_mode=default, link_preview=True, file=None, force_document=False, clear_draft=False, buttons=None, silent=None): """ @@ -584,7 +584,7 @@ class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods): async def edit_message( self, entity, message=None, text=None, - *, parse_mode=utils.Default, link_preview=True, file=None, + *, parse_mode=default, link_preview=True, file=None, buttons=None): """ Edits the given message ID (to change its contents or disable preview). diff --git a/telethon/client/uploads.py b/telethon/client/uploads.py index 6854a669..c1075b07 100644 --- a/telethon/client/uploads.py +++ b/telethon/client/uploads.py @@ -5,11 +5,11 @@ import os import pathlib import re from io import BytesIO -from mimetypes import guess_type +from .buttons import ButtonMethods from .messageparse import MessageParseMethods from .users import UserMethods -from .buttons import ButtonMethods +from .. import default from .. import utils, helpers from ..tl import types, functions, custom @@ -23,7 +23,7 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods): async def send_file( self, entity, file, *, caption='', force_document=False, progress_callback=None, reply_to=None, attributes=None, - thumb=None, allow_cache=True, parse_mode=utils.Default, + thumb=None, allow_cache=True, parse_mode=default, voice_note=False, video_note=False, buttons=None, silent=None, **kwargs): """ @@ -180,7 +180,7 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods): async def _send_album(self, entity, files, caption='', progress_callback=None, reply_to=None, - parse_mode=utils.Default, silent=None): + parse_mode=default, silent=None): """Specialized version of .send_file for albums""" # We don't care if the user wants to avoid cache, we will use it # anyway. Why? The cached version will be exactly the same thing diff --git a/telethon/default.py b/telethon/default.py new file mode 100644 index 00000000..6ba4ad80 --- /dev/null +++ b/telethon/default.py @@ -0,0 +1,5 @@ +""" +Sentinel module to signify that a parameter should use its default value. + +Useful when the default value or ``None`` are both valid options. +""" diff --git a/telethon/helpers.py b/telethon/helpers.py index 861e56ea..5033adef 100644 --- a/telethon/helpers.py +++ b/telethon/helpers.py @@ -1,6 +1,5 @@ """Various helpers not related to the Telegram API itself""" import asyncio -import collections import os import struct from hashlib import sha1, sha256 diff --git a/telethon/tl/custom/draft.py b/telethon/tl/custom/draft.py index 6e931dbc..6d6002f1 100644 --- a/telethon/tl/custom/draft.py +++ b/telethon/tl/custom/draft.py @@ -3,9 +3,10 @@ import datetime from .. import TLObject from ..functions.messages import SaveDraftRequest from ..types import UpdateDraftMessage, DraftMessage +from ... import default from ...errors import RPCError from ...extensions import markdown -from ...utils import Default, get_peer_id, get_input_peer +from ...utils import get_peer_id, get_input_peer class Draft: @@ -116,7 +117,7 @@ class Draft: return not self._text async def set_message( - self, text=None, reply_to=0, parse_mode=Default, + self, text=None, reply_to=0, parse_mode=default, link_preview=None): """ Changes the draft message on the Telegram servers. The changes are @@ -163,7 +164,7 @@ class Draft: return result - async def send(self, clear=True, parse_mode=Default): + async def send(self, clear=True, parse_mode=default): """ Sends the contents of this draft to the dialog. This is just a wrapper around ``send_message(dialog.input_entity, *args, **kwargs)``. diff --git a/telethon/tl/custom/inline.py b/telethon/tl/custom/inline.py index 75a81b94..1983056e 100644 --- a/telethon/tl/custom/inline.py +++ b/telethon/tl/custom/inline.py @@ -1,7 +1,7 @@ import hashlib from .. import functions, types -from ... import utils +from ... import default, utils class InlineBuilder: @@ -55,7 +55,7 @@ class InlineBuilder: async def article( self, title, description=None, *, url=None, thumb=None, content=None, - id=None, text=None, parse_mode=utils.Default, link_preview=True, + id=None, text=None, parse_mode=default, link_preview=True, geo=None, period=60, contact=None, game=False, buttons=None ): """ @@ -105,7 +105,7 @@ class InlineBuilder: async def photo( self, file, *, id=None, - text=None, parse_mode=utils.Default, link_preview=True, + text=None, parse_mode=default, link_preview=True, geo=None, period=60, contact=None, game=False, buttons=None ): """ @@ -144,7 +144,7 @@ class InlineBuilder: self, file, title=None, *, description=None, type=None, mime_type=None, attributes=None, force_document=False, voice_note=False, video_note=False, use_cache=True, id=None, - text=None, parse_mode=utils.Default, link_preview=True, + text=None, parse_mode=default, link_preview=True, geo=None, period=60, contact=None, game=False, buttons=None ): """ @@ -219,7 +219,7 @@ class InlineBuilder: async def game( self, short_name, *, id=None, - text=None, parse_mode=utils.Default, link_preview=True, + text=None, parse_mode=default, link_preview=True, geo=None, period=60, contact=None, game=False, buttons=None ): """ @@ -247,7 +247,7 @@ class InlineBuilder: async def _message( self, *, - text=None, parse_mode=utils.Default, link_preview=True, + text=None, parse_mode=default, link_preview=True, geo=None, period=60, contact=None, game=False, buttons=None ): if sum(1 for x in (text, geo, contact, game) if x) != 1: diff --git a/telethon/utils.py b/telethon/utils.py index 4f98fbc3..de0d9900 100644 --- a/telethon/utils.py +++ b/telethon/utils.py @@ -45,14 +45,6 @@ VALID_USERNAME_RE = re.compile( ) -class Default: - """ - Sentinel value to indicate that the default value should be used. - Currently used for the ``parse_mode``, where a ``None`` mode should - be considered different from using the default. - """ - - def chunks(iterable, size=100): """ Turns the given iterable into chunks of the specified size, From 730cf319212a3d1c9b98bb5dd0986e052b8e8653 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 5 Oct 2018 19:36:01 +0200 Subject: [PATCH 34/35] Delete all (broken) tests --- run_tests.py | 24 ----- telethon_tests/__init__.py | 5 - telethon_tests/test_crypto.py | 143 ---------------------------- telethon_tests/test_higher_level.py | 49 ---------- telethon_tests/test_network.py | 44 --------- telethon_tests/test_parser.py | 8 -- telethon_tests/test_tl.py | 8 -- telethon_tests/test_utils.py | 66 ------------- 8 files changed, 347 deletions(-) delete mode 100755 run_tests.py delete mode 100644 telethon_tests/__init__.py delete mode 100644 telethon_tests/test_crypto.py delete mode 100644 telethon_tests/test_higher_level.py delete mode 100644 telethon_tests/test_network.py delete mode 100644 telethon_tests/test_parser.py delete mode 100644 telethon_tests/test_tl.py delete mode 100644 telethon_tests/test_utils.py diff --git a/run_tests.py b/run_tests.py deleted file mode 100755 index d99cfb56..00000000 --- a/run_tests.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python3 -import unittest - -if __name__ == '__main__': - from telethon_tests import \ - CryptoTests, ParserTests, TLTests, UtilsTests, NetworkTests - - test_classes = [CryptoTests, ParserTests, TLTests, UtilsTests] - - network = input('Run network tests (y/n)?: ').lower() == 'y' - if network: - test_classes.append(NetworkTests) - - loader = unittest.TestLoader() - - suites_list = [] - for test_class in test_classes: - suite = loader.loadTestsFromTestCase(test_class) - suites_list.append(suite) - - big_suite = unittest.TestSuite(suites_list) - - runner = unittest.TextTestRunner() - results = runner.run(big_suite) diff --git a/telethon_tests/__init__.py b/telethon_tests/__init__.py deleted file mode 100644 index 64bb92a6..00000000 --- a/telethon_tests/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .crypto_test import CryptoTests -from .network_test import NetworkTests -from .parser_test import ParserTests -from .tl_test import TLTests -from .utils_test import UtilsTests diff --git a/telethon_tests/test_crypto.py b/telethon_tests/test_crypto.py deleted file mode 100644 index 136e6091..00000000 --- a/telethon_tests/test_crypto.py +++ /dev/null @@ -1,143 +0,0 @@ -import unittest -from hashlib import sha1 - -import telethon.helpers as utils -from telethon.crypto import AES, Factorization -# from crypto.PublicKey import RSA as PyCryptoRSA - - -class CryptoTests(unittest.TestCase): - def setUp(self): - # Test known values - self.key = b'\xd1\xf4MXy\x0c\xf8/z,\xe9\xf9\xa4\x17\x04\xd9C\xc9\xaba\x81\xf3\xf8\xdd\xcb\x0c6\x92\x01\x1f\xc2y' - self.iv = b':\x02\x91x\x90Dj\xa6\x03\x90C\x08\x9e@X\xb5E\xffwy\xf3\x1c\xde\xde\xfbo\x8dm\xd6e.Z' - - self.plain_text = b'Non encrypted text :D' - self.plain_text_padded = b'My len is more uniform, promise!' - - self.cipher_text = b'\xb6\xa7\xec.\xb9\x9bG\xcb\xe9{\x91[\x12\xfc\x84D\x1c' \ - b'\x93\xd9\x17\x03\xcd\xd6\xb1D?\x98\xd2\xb5\xa5U\xfd' - - self.cipher_text_padded = b"W\xd1\xed'\x01\xa6c\xc3\xcb\xef\xaa\xe5\x1d\x1a" \ - b"[\x1b\xdf\xcdI\x1f>Z\n\t\xb9\xd2=\xbaF\xd1\x8e'" - - def test_sha1(self): - string = 'Example string' - - hash_sum = sha1(string.encode('utf-8')).digest() - expected = b'\nT\x92|\x8d\x06:)\x99\x04\x8e\xf8j?\xc4\x8e\xd3}m9' - - self.assertEqual(hash_sum, expected, - msg='Invalid sha1 hash_sum representation (should be {}, but is {})' - .format(expected, hash_sum)) - - @unittest.skip("test_aes_encrypt needs fix") - def test_aes_encrypt(self): - value = AES.encrypt_ige(self.plain_text, self.key, self.iv) - take = 16 # Don't take all the bytes, since latest involve are random padding - self.assertEqual(value[:take], self.cipher_text[:take], - msg='Ciphered text ("{}") does not equal expected ("{}")' - .format(value[:take], self.cipher_text[:take])) - - value = AES.encrypt_ige(self.plain_text_padded, self.key, self.iv) - self.assertEqual(value, self.cipher_text_padded, - msg='Ciphered text ("{}") does not equal expected ("{}")' - .format(value, self.cipher_text_padded)) - - def test_aes_decrypt(self): - # The ciphered text must always be padded - value = AES.decrypt_ige(self.cipher_text_padded, self.key, self.iv) - self.assertEqual(value, self.plain_text_padded, - msg='Decrypted text ("{}") does not equal expected ("{}")' - .format(value, self.plain_text_padded)) - - @unittest.skip("test_calc_key needs fix") - def test_calc_key(self): - # TODO Upgrade test for MtProto 2.0 - shared_key = b'\xbc\xd2m\xb7\xcav\xf4][\x88\x83\' \xf3\x11\x8as\xd04\x941\xae' \ - b'*O\x03\x86\x9a/H#\x1a\x8c\xb5j\xe9$\xe0IvCm^\xe70\x1a5C\t\x16' \ - b'\x03\xd2\x9d\xa9\x89\xd6\xce\x08P\x0fdr\xa0\xb3\xeb\xfecv\x1a' \ - b'\xdfJ\x14\x96\x98\x16\xa3G\xab\x04\x14!\\\xeb\n\xbcn\xdf\xc4%' \ - b'\xc6\t\xb7\x16\x14\x9c\'\x81\x15=\xb0\xaf\x0e\x0bR\xaa\x0466s' \ - b'\xf0\xcf\xb7\xb8>,D\x94x\xd7\xf8\xe0\x84\xcb%\xd3\x05\xb2\xe8' \ - b'\x95Mr?\xa2\xe8In\xf9\x0b[E\x9b\xaa\x0cX\x7f\x0ei\xde\xeed\x1d' \ - b'x/J\xce\xea^}0;\xa83B\xbbR\xa1\xbfe\x04\xb9\x1e\xa1"f=\xa5M@' \ - b'\x9e\xdd\x81\x80\xc9\xa5\xfb\xfcg\xdd\x15\x03p!\x0ffD\x16\x892' \ - b'\xea\xca\xb1A\x99O\xa94P\xa9\xa2\xc6;\xb2C9\x1dC5\xd2\r\xecL' \ - b'\xd9\xabw-\x03\ry\xc2v\x17]\x02\x15\x0cBa\x97\xce\xa5\xb1\xe4]' \ - b'\x8e\xe0,\xcfC{o\xfa\x99f\xa4pM\x00' - - # Calculate key being the client - msg_key = b'\xba\x1a\xcf\xda\xa8^Cbl\xfa\xb6\x0c:\x9b\xb0\xfc' - - key, iv = utils.calc_key(shared_key, msg_key, client=True) - expected_key = b"\xaf\xe3\x84Qm\xe0!\x0c\xd91\xe4\x9a\xa0v_gc" \ - b"x\xa1\xb0\xc9\xbc\x16'v\xcf,\x9dM\xae\xc6\xa5" - - expected_iv = b'\xb8Q\xf3\xc5\xa3]\xc6\xdf\x9e\xe0Q\xbd"\x8d' \ - b'\x13\t\x0e\x9a\x9d^8\xa2\xf8\xe7\x00w\xd9\xc1' \ - b'\xa7\xa0\xf7\x0f' - - self.assertEqual(key, expected_key, - msg='Invalid key (expected ("{}"), got ("{}"))' - .format(expected_key, key)) - self.assertEqual(iv, expected_iv, - msg='Invalid IV (expected ("{}"), got ("{}"))' - .format(expected_iv, iv)) - - # Calculate key being the server - msg_key = b'\x86m\x92i\xcf\x8b\x93\xaa\x86K\x1fi\xd04\x83]' - - key, iv = utils.calc_key(shared_key, msg_key, client=False) - expected_key = b'\xdd0X\xb6\x93\x8e\xc9y\xef\x83\xf8\x8cj' \ - b'\xa7h\x03\xe2\xc6\xb16\xc5\xbb\xfc\xe7' \ - b'\xdf\xd6\xb1g\xf7u\xcfk' - - expected_iv = b'\xdcL\xc2\x18\x01J"X\x86lb\xb6\xb547\xfd' \ - b'\xe2a4\xb6\xaf}FS\xd7[\xe0N\r\x19\xfb\xbc' - - self.assertEqual(key, expected_key, - msg='Invalid key (expected ("{}"), got ("{}"))' - .format(expected_key, key)) - self.assertEqual(iv, expected_iv, - msg='Invalid IV (expected ("{}"), got ("{}"))' - .format(expected_iv, iv)) - - def test_generate_key_data_from_nonce(self): - server_nonce = int.from_bytes(b'The 16-bit nonce', byteorder='little') - new_nonce = int.from_bytes(b'The new, calculated 32-bit nonce', byteorder='little') - - key, iv = utils.generate_key_data_from_nonce(server_nonce, new_nonce) - expected_key = b'/\xaa\x7f\xa1\xfcs\xef\xa0\x99zh\x03M\xa4\x8e\xb4\xab\x0eE]b\x95|\xfe\xc0\xf8\x1f\xd4\xa0\xd4\xec\x91' - expected_iv = b'\xf7\xae\xe3\xc8+=\xc2\xb8\xd1\xe1\x1b\x0e\x10\x07\x9fn\x9e\xdc\x960\x05\xf9\xea\xee\x8b\xa1h The ' - - self.assertEqual(key, expected_key, - msg='Key ("{}") does not equal expected ("{}")' - .format(key, expected_key)) - self.assertEqual(iv, expected_iv, - msg='IV ("{}") does not equal expected ("{}")' - .format(iv, expected_iv)) - - # test_fringerprint_from_key can't be skipped due to ImportError - # def test_fingerprint_from_key(self): - # assert rsa._compute_fingerprint(PyCryptoRSA.importKey( - # '-----BEGIN RSA PUBLIC KEY-----\n' - # 'MIIBCgKCAQEAwVACPi9w23mF3tBkdZz+zwrzKOaaQdr01vAbU4E1pvkfj4sqDsm6\n' - # 'lyDONS789sVoD/xCS9Y0hkkC3gtL1tSfTlgCMOOul9lcixlEKzwKENj1Yz/s7daS\n' - # 'an9tqw3bfUV/nqgbhGX81v/+7RFAEd+RwFnK7a+XYl9sluzHRyVVaTTveB2GazTw\n' - # 'Efzk2DWgkBluml8OREmvfraX3bkHZJTKX4EQSjBbbdJ2ZXIsRrYOXfaA+xayEGB+\n' - # '8hdlLmAjbCVfaigxX0CDqWeR1yFL9kwd9P0NsZRPsmoqVwMbMu7mStFai6aIhc3n\n' - # 'Slv8kg9qv1m6XHVQY3PnEw+QQtqSIXklHwIDAQAB\n' - # '-----END RSA PUBLIC KEY-----' - # )) == b'!k\xe8l\x02+\xb4\xc3', 'Wrong fingerprint calculated' - - def test_factorize(self): - pq = 3118979781119966969 - p, q = Factorization.factorize(pq) - if p > q: - p, q = q, p - - self.assertEqual(p, 1719614201, - msg='Factorized pair did not yield the correct result') - self.assertEqual(q, 1813767169, - msg='Factorized pair did not yield the correct result') diff --git a/telethon_tests/test_higher_level.py b/telethon_tests/test_higher_level.py deleted file mode 100644 index 67fac515..00000000 --- a/telethon_tests/test_higher_level.py +++ /dev/null @@ -1,49 +0,0 @@ -import unittest -import os -from io import BytesIO -from random import randint -from hashlib import sha256 -from telethon import TelegramClient - -# Fill in your api_id and api_hash when running the tests -# and REMOVE THEM once you've finished testing them. -api_id = None -api_hash = None - - -class HigherLevelTests(unittest.TestCase): - def setUp(self): - if not api_id or not api_hash: - raise ValueError('Please fill in both your api_id and api_hash.') - - @unittest.skip("you can't seriously trash random mobile numbers like that :)") - def test_cdn_download(self): - client = TelegramClient(None, api_id, api_hash) - client.session.set_dc(0, '149.154.167.40', 80) - self.assertTrue(client.connect()) - - try: - phone = '+999662' + str(randint(0, 9999)).zfill(4) - client.send_code_request(phone) - client.sign_up('22222', 'Test', 'DC') - - me = client.get_me() - data = os.urandom(2 ** 17) - client.send_file( - me, data, - progress_callback=lambda c, t: - print('test_cdn_download:uploading {:.2%}...'.format(c/t)) - ) - msg = client.get_messages(me)[1][0] - - out = BytesIO() - client.download_media(msg, out) - self.assertEqual(sha256(data).digest(), sha256(out.getvalue()).digest()) - - out = BytesIO() - client.download_media(msg, out) # Won't redirect - self.assertEqual(sha256(data).digest(), sha256(out.getvalue()).digest()) - - client.log_out() - finally: - client.disconnect() diff --git a/telethon_tests/test_network.py b/telethon_tests/test_network.py deleted file mode 100644 index 031ad99d..00000000 --- a/telethon_tests/test_network.py +++ /dev/null @@ -1,44 +0,0 @@ -import random -import socket -import threading -import unittest - -import telethon.network.authenticator as authenticator -from telethon.extensions import TcpClient -from telethon.network import Connection - - -def run_server_echo_thread(port): - def server_thread(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', port)) - s.listen(1) - connection, address = s.accept() - with connection: - data = connection.recv(16) - connection.send(data) - - server = threading.Thread(target=server_thread) - server.start() - - -class NetworkTests(unittest.TestCase): - - @unittest.skip("test_tcp_client needs fix") - def test_tcp_client(self): - port = random.randint(50000, 60000) # Arbitrary non-privileged port - run_server_echo_thread(port) - - msg = b'Unit testing...' - client = TcpClient() - client.connect('localhost', port) - client.write(msg) - self.assertEqual(msg, client.read(15), - msg='Read message does not equal sent message') - client.close() - - @unittest.skip("Some parameters changed, so IP doesn't go there anymore.") - def test_authenticator(self): - transport = Connection('149.154.167.91', 443) - self.assertTrue(authenticator.do_authentication(transport)) - transport.close() diff --git a/telethon_tests/test_parser.py b/telethon_tests/test_parser.py deleted file mode 100644 index c87686a6..00000000 --- a/telethon_tests/test_parser.py +++ /dev/null @@ -1,8 +0,0 @@ -import unittest - - -class ParserTests(unittest.TestCase): - """There are no tests yet""" - @unittest.skip("there should be parser tests") - def test_parser(self): - self.assertTrue(True) diff --git a/telethon_tests/test_tl.py b/telethon_tests/test_tl.py deleted file mode 100644 index 189259f5..00000000 --- a/telethon_tests/test_tl.py +++ /dev/null @@ -1,8 +0,0 @@ -import unittest - - -class TLTests(unittest.TestCase): - """There are no tests yet""" - @unittest.skip("there should be TL tests") - def test_tl(self): - self.assertTrue(True) \ No newline at end of file diff --git a/telethon_tests/test_utils.py b/telethon_tests/test_utils.py deleted file mode 100644 index 4a550e3d..00000000 --- a/telethon_tests/test_utils.py +++ /dev/null @@ -1,66 +0,0 @@ -import os -import unittest -from telethon.tl import TLObject -from telethon.extensions import BinaryReader - - -class UtilsTests(unittest.TestCase): - def test_binary_writer_reader(self): - # Test that we can read properly - data = b'\x01\x05\x00\x00\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00' \ - b'\x88A\x00\x00\x00\x00\x00\x009@\x1a\x1b\x1c\x1d\x1e\x1f ' \ - b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' \ - b'\x00\x80' - - with BinaryReader(data) as reader: - value = reader.read_byte() - self.assertEqual(value, 1, - msg='Example byte should be 1 but is {}'.format(value)) - - value = reader.read_int() - self.assertEqual(value, 5, - msg='Example integer should be 5 but is {}'.format(value)) - - value = reader.read_long() - self.assertEqual(value, 13, - msg='Example long integer should be 13 but is {}'.format(value)) - - value = reader.read_float() - self.assertEqual(value, 17.0, - msg='Example float should be 17.0 but is {}'.format(value)) - - value = reader.read_double() - self.assertEqual(value, 25.0, - msg='Example double should be 25.0 but is {}'.format(value)) - - value = reader.read(7) - self.assertEqual(value, bytes([26, 27, 28, 29, 30, 31, 32]), - msg='Example bytes should be {} but is {}' - .format(bytes([26, 27, 28, 29, 30, 31, 32]), value)) - - value = reader.read_large_int(128, signed=False) - self.assertEqual(value, 2**127, - msg='Example large integer should be {} but is {}'.format(2**127, value)) - - def test_binary_tgwriter_tgreader(self): - small_data = os.urandom(33) - small_data_padded = os.urandom(19) # +1 byte for length = 20 (%4 = 0) - - large_data = os.urandom(999) - large_data_padded = os.urandom(1024) - - data = (small_data, small_data_padded, large_data, large_data_padded) - string = 'Testing Telegram strings, this should work properly!' - serialized = b''.join(TLObject.serialize_bytes(d) for d in data) + \ - TLObject.serialize_bytes(string) - - with BinaryReader(serialized) as reader: - # And then try reading it without errors (it should be unharmed!) - for datum in data: - value = reader.tgread_bytes() - self.assertEqual(value, datum, - msg='Example bytes should be {} but is {}'.format(datum, value)) - - value = reader.tgread_string() - self.assertEqual(value, string, - msg='Example string should be {} but is {}'.format(string, value)) From 653f3c043d6644ffca460f4bf53e3da5cc4c8251 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Fri, 5 Oct 2018 20:59:56 +0300 Subject: [PATCH 35/35] Add full_sync module (#1016) --- telethon/__init__.py | 2 +- telethon/full_sync.py | 161 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 telethon/full_sync.py diff --git a/telethon/__init__.py b/telethon/__init__.py index f0d2b0b8..fb945a13 100644 --- a/telethon/__init__.py +++ b/telethon/__init__.py @@ -2,7 +2,7 @@ import logging from .client.telegramclient import TelegramClient from .network import connection from .tl import types, functions, custom -from . import version, events, utils, errors +from . import version, events, utils, errors, full_sync __version__ = version.__version__ diff --git a/telethon/full_sync.py b/telethon/full_sync.py new file mode 100644 index 00000000..5ca4a7e6 --- /dev/null +++ b/telethon/full_sync.py @@ -0,0 +1,161 @@ +""" +This magical module will rewrite all public methods in the public interface of +the library so they can delegate the call to an asyncio event loop in another +thread and wait for the result. This rewrite may not be desirable if the end +user always uses the methods they way they should be ran, but it's incredibly +useful for quick scripts and legacy code. +""" +import asyncio +import functools +import inspect +import threading +from concurrent.futures import Future, ThreadPoolExecutor + +from async_generator import isasyncgenfunction + +from .client.telegramclient import TelegramClient +from .tl.custom import ( + Draft, Dialog, MessageButton, Forward, Message, InlineResult, Conversation +) +from .tl.custom.chatgetter import ChatGetter +from .tl.custom.sendergetter import SenderGetter + + +async def _proxy_future(af, cf): + try: + res = await af + cf.set_result(res) + except Exception as e: + cf.set_exception(e) + + +def _sync_result(loop, x): + f = Future() + loop.call_soon_threadsafe(asyncio.ensure_future, _proxy_future(x, f)) + return f.result() + + +class _SyncGen: + def __init__(self, loop, gen): + self.loop = loop + self.gen = gen + + def __iter__(self): + return self + + def __next__(self): + try: + return _sync_result(self.loop, self.gen.__anext__()) + except StopAsyncIteration: + raise StopIteration from None + + +def _syncify_wrap(t, method_name, loop, thread_ident, syncifier=_sync_result): + method = getattr(t, method_name) + + @functools.wraps(method) + def syncified(*args, **kwargs): + coro = method(*args, **kwargs) + return ( + coro if threading.get_ident() == thread_ident + else syncifier(loop, coro) + ) + + setattr(t, method_name, syncified) + + +def _syncify(*types, loop, thread_ident): + for t in types: + for method_name in dir(t): + if not method_name.startswith('_') or method_name == '__call__': + if inspect.iscoroutinefunction(getattr(t, method_name)): + _syncify_wrap(t, method_name, loop, thread_ident, _sync_result) + elif isasyncgenfunction(getattr(t, method_name)): + _syncify_wrap(t, method_name, loop, thread_ident, _SyncGen) + + +__asyncthread = None + + +def enable(*, loop=None, executor=None, max_workers=1): + """ + Enables the fully synchronous mode. You should enable this at + the beginning of your script, right after importing, only once. + + **Please** make sure to call `stop` at the end of your script. + + You can define the event loop to use and executor, otherwise + the default loop and ``ThreadPoolExecutor`` will be used, in + which case `max_workers` will be passed to it. If you pass a + custom executor, `max_workers` will be ignored. + """ + global __asyncthread + if __asyncthread is not None: + raise RuntimeError("full_sync can only be enabled once") + + if not loop: + loop = asyncio.get_event_loop() + if not executor: + executor = ThreadPoolExecutor(max_workers=max_workers) + + def start(): + asyncio.set_event_loop(loop) + loop.run_forever() + + __asyncthread = threading.Thread( + target=start, name="__telethon_async_thread__", daemon=True + ) + __asyncthread.start() + __asyncthread.loop = loop + __asyncthread.executor = executor + + TelegramClient.__init__ = functools.partialmethod( + TelegramClient.__init__, loop=loop + ) + + _syncify(TelegramClient, Draft, Dialog, MessageButton, ChatGetter, + SenderGetter, Forward, Message, InlineResult, Conversation, + loop=loop, thread_ident=__asyncthread.ident) + _syncify_wrap(TelegramClient, "start", loop, __asyncthread.ident) + + old_add_event_handler = TelegramClient.add_event_handler + old_remove_event_handler = TelegramClient.remove_event_handler + proxied_event_handlers = {} + + @functools.wraps(old_add_event_handler) + def add_proxied_event_handler(self, callback, *args, **kwargs): + async def _proxy(*pargs, **pkwargs): + await loop.run_in_executor( + executor, functools.partial(callback, *pargs, **pkwargs)) + + proxied_event_handlers[callback] = _proxy + + args = (self, _proxy, *args) + return old_add_event_handler(*args, **kwargs) + + @functools.wraps(old_remove_event_handler) + def remove_proxied_event_handler(self, callback, *args, **kwargs): + args = (self, proxied_event_handlers.get(callback, callback), *args) + return old_remove_event_handler(*args, **kwargs) + + TelegramClient.add_event_handler = add_proxied_event_handler + TelegramClient.remove_event_handler = remove_proxied_event_handler + + def run_until_disconnected(self): + return _sync_result(loop, self._run_until_disconnected()) + + TelegramClient.run_until_disconnected = run_until_disconnected + + return __asyncthread + + +def stop(): + """ + Stops the fully synchronous code. You + should call this before your script exits. + """ + global __asyncthread + if not __asyncthread: + raise RuntimeError("Can't find asyncio thread") + __asyncthread.loop.call_soon_threadsafe(__asyncthread.loop.stop) + __asyncthread.executor.shutdown()