From 9716d1d5436c6a0f9568c977602afcdc11e043be Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 6 Oct 2017 19:30:14 +0200 Subject: [PATCH 01/23] Remove all Thread's except from UpdateState --- telethon/extensions/tcp_client.py | 23 ++--- telethon/telegram_bare_client.py | 161 +++--------------------------- telethon/telegram_client.py | 27 +---- telethon/tl/entity_database.py | 15 +-- telethon/tl/session.py | 55 +++++----- 5 files changed, 51 insertions(+), 230 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 6feb9841..134c36ae 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -3,14 +3,12 @@ import errno import socket from datetime import timedelta from io import BytesIO, BufferedWriter -from threading import Lock class TcpClient: def __init__(self, proxy=None, timeout=timedelta(seconds=5)): self.proxy = proxy self._socket = None - self._closing_lock = Lock() if isinstance(timeout, timedelta): self.timeout = timeout.seconds @@ -65,19 +63,14 @@ class TcpClient: def close(self): """Closes the connection""" - if self._closing_lock.locked(): - # Already closing, no need to close again (avoid None.close()) - return - - with self._closing_lock: - try: - if self._socket is not None: - self._socket.shutdown(socket.SHUT_RDWR) - self._socket.close() - except OSError: - pass # Ignore ENOTCONN, EBADF, and any other error when closing - finally: - self._socket = None + try: + if self._socket is not None: + self._socket.shutdown(socket.SHUT_RDWR) + self._socket.close() + except OSError: + pass # Ignore ENOTCONN, EBADF, and any other error when closing + finally: + self._socket = None def write(self, data): """Writes (sends) the specified bytes to the connected peer""" diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 17e8a364..3dc1b180 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -1,10 +1,8 @@ import logging import os -import threading from datetime import timedelta, datetime from hashlib import md5 from io import BytesIO -from threading import Lock from time import sleep from . import helpers as utils @@ -18,7 +16,7 @@ from .network import authenticator, MtProtoSender, Connection, ConnectionMode from .tl import TLObject, Session from .tl.all_tlobjects import LAYER from .tl.functions import ( - InitConnectionRequest, InvokeWithLayerRequest, PingRequest + InitConnectionRequest, InvokeWithLayerRequest ) from .tl.functions.auth import ( ImportAuthorizationRequest, ExportAuthorizationRequest @@ -67,8 +65,6 @@ class TelegramBareClient: def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - update_workers=None, - spawn_read_thread=False, timeout=timedelta(seconds=5), **kwargs): """Refer to TelegramClient.__init__ for docs on this method""" @@ -101,10 +97,6 @@ class TelegramBareClient: self._logger = logging.getLogger(__name__) - # Two threads may be calling reconnect() when the connection is lost, - # we only want one to actually perform the reconnection. - self._reconnect_lock = Lock() - # Cache "exported" sessions as 'dc_id: Session' not to recreate # them all the time since generating a new key is a relatively # expensive operation. @@ -112,7 +104,7 @@ class TelegramBareClient: # This member will process updates if enabled. # One may change self.updates.enabled at any later point. - self.updates = UpdateState(workers=update_workers) + self.updates = UpdateState(workers=None) # Used on connection - the user may modify these and reconnect kwargs['app_version'] = kwargs.get('app_version', self.__version__) @@ -136,24 +128,10 @@ class TelegramBareClient: # Uploaded files cache so subsequent calls are instant self._upload_cache = {} - # Constantly read for results and updates from within the main client, - # if the user has left enabled such option. - self._spawn_read_thread = spawn_read_thread - self._recv_thread = None - - # Identifier of the main thread (the one that called .connect()). - # This will be used to create new connections from any other thread, - # so that requests can be sent in parallel. - self._main_thread_ident = None - # Default PingRequest delay self._last_ping = datetime.now() self._ping_delay = timedelta(minutes=1) - # Some errors are known but there's nothing we can do from the - # background thread. If any of these happens, call .disconnect(), - # and raise them next time .invoke() is tried to be called. - self._background_error = None # endregion @@ -179,9 +157,6 @@ class TelegramBareClient: If '_cdn' is False, methods that are not allowed on such data centers won't be invoked. """ - self._main_thread_ident = threading.get_ident() - self._background_error = None # Clear previous errors - try: self._sender.connect() if not self.session.auth_key: @@ -268,20 +243,9 @@ class TelegramBareClient: return result def disconnect(self): - """Disconnects from the Telegram server - and stops all the spawned threads""" + """Disconnects from the Telegram server""" self._user_connected = False - self._recv_thread = None - - # Stop the workers from the background thread - self.updates.stop_workers() - - # This will trigger a "ConnectionResetError", for subsequent calls - # to read or send (from another thread) and usually, the background - # thread would try restarting the connection but since the - # ._recv_thread = None, it knows it doesn't have to. self._sender.disconnect() - # TODO Shall we clear the _exported_sessions, or may be reused? pass @@ -296,12 +260,7 @@ class TelegramBareClient: """ if new_dc is None: # Assume we are disconnected due to some error, so connect again - with self._reconnect_lock: - # Another thread may have connected again, so check that first - if not self.is_connected(): - return self.connect() - else: - return True + return self.connect() else: self.disconnect() self.session.auth_key = None # Force creating new auth_key @@ -316,10 +275,6 @@ class TelegramBareClient: # region Working with different connections/Data Centers - def _on_read_thread(self): - return self._recv_thread is not None and \ - threading.get_ident() == self._recv_thread.ident - def _get_dc(self, dc_id, ipv6=False, cdn=False): """Gets the Data Center (DC) associated to 'dc_id'""" if TelegramBareClient._dc_options is None: @@ -424,26 +379,12 @@ class TelegramBareClient: x.content_related for x in requests): raise ValueError('You can only invoke requests, not types!') - # Determine the sender to be used (main or a new connection) - on_main_thread = threading.get_ident() == self._main_thread_ident - if on_main_thread or self._on_read_thread(): - sender = self._sender - else: - sender = self._sender.clone() - sender.connect() + # TODO Determine the sender to be used (main or a new connection) + sender = self._sender # .clone(), .connect() - # We should call receive from this thread if there's no background - # thread reading or if the server disconnected us and we're trying - # to reconnect. This is because the read thread may either be - # locked also trying to reconnect or we may be said thread already. - call_receive = not on_main_thread or self._recv_thread is None \ - or self._reconnect_lock.locked() try: for _ in range(retries): - if self._background_error and on_main_thread: - raise self._background_error - - result = self._invoke(sender, call_receive, *requests) + result = self._invoke(sender, *requests) if result: return result @@ -455,7 +396,7 @@ class TelegramBareClient: # Let people use client.invoke(SomeRequest()) instead client(...) invoke = __call__ - def _invoke(self, sender, call_receive, *requests): + def _invoke(self, sender, *requests): try: # Ensure that we start with no previous errors (i.e. resending) for x in requests: @@ -463,27 +404,15 @@ class TelegramBareClient: x.rpc_error = None sender.send(*requests) - - if not call_receive: - # TODO This will be slightly troublesome if we allow - # switching between constant read or not on the fly. - # Must also watch out for calling .read() from two places, - # in which case a Lock would be required for .receive(). - for x in requests: - x.confirm_received.wait( - sender.connection.get_timeout() - ) - else: - while not all(x.confirm_received.is_set() for x in requests): - sender.receive(update_state=self.updates) + while not all(x.confirm_received.is_set() for x in requests): + sender.receive(update_state=self.updates) except TimeoutError: pass # We will just retry except ConnectionResetError: - if not self._authorized or self._reconnect_lock.locked(): - # Only attempt reconnecting if we're authorized and not - # reconnecting already. + if not self._authorized: + # Only attempt reconnecting if we're authorized raise self._logger.debug('Server disconnected us. Reconnecting and ' @@ -520,12 +449,8 @@ class TelegramBareClient: 'attempting to reconnect at DC {}'.format(e.new_dc) ) - # TODO What happens with the background thread here? - # For normal use cases, this won't happen, because this will only - # be on the very first connection (not authorized, not running), - # but may be an issue for people who actually travel? self._reconnect(new_dc=e.new_dc) - return self._invoke(sender, call_receive, *requests) + return self._invoke(sender, *requests) except ServerError as e: # Telegram is having some issues, just retry @@ -759,64 +684,6 @@ class TelegramBareClient: def _set_connected_and_authorized(self): self._authorized = True - self.updates.setup_workers() - if self._spawn_read_thread and self._recv_thread is None: - self._recv_thread = threading.Thread( - name='ReadThread', daemon=True, - target=self._recv_thread_impl - ) - self._recv_thread.start() - - # By using this approach, another thread will be - # created and started upon connection to constantly read - # from the other end. Otherwise, manual calls to .receive() - # must be performed. The MtProtoSender cannot be connected, - # or an error will be thrown. - # - # This way, sending and receiving will be completely independent. - def _recv_thread_impl(self): - while self._user_connected: - try: - if datetime.now() > self._last_ping + self._ping_delay: - self._sender.send(PingRequest( - int.from_bytes(os.urandom(8), 'big', signed=True) - )) - self._last_ping = datetime.now() - - self._sender.receive(update_state=self.updates) - except TimeoutError: - # No problem. - pass - except ConnectionResetError: - self._logger.debug('Server disconnected us. Reconnecting...') - while self._user_connected and not self._reconnect(): - sleep(0.1) # Retry forever, this is instant messaging - - except Exception as error: - # Unknown exception, pass it to the main thread - self._logger.debug( - '[ERROR] Unknown error on the read thread, please report', - error - ) - - try: - import socks - if isinstance(error, socks.GeneralProxyError): - # This is a known error, and it's not related to - # Telegram but rather to the proxy. Disconnect and - # hand it over to the main thread. - self._background_error = error - self.disconnect() - break - except ImportError: - "Not using PySocks, so it can't be a socket error" - - # If something strange happens we don't want to enter an - # infinite loop where all we do is raise an exception, so - # add a little sleep to avoid the CPU usage going mad. - sleep(0.1) - break - - self._recv_thread = None + # TODO self.updates.setup_workers() # endregion diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index bdaa41d8..80cfea50 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -52,21 +52,14 @@ from .tl.types.messages import DialogsSlice class TelegramClient(TelegramBareClient): - """Full featured TelegramClient meant to extend the basic functionality - - - As opposed to the TelegramBareClient, this one features downloading - media from different data centers, starting a second thread to - handle updates, and some very common functionality. - """ + """Full featured TelegramClient meant to extend the basic functionality""" # region Initialization def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - update_workers=None, timeout=timedelta(seconds=5), - spawn_read_thread=True, **kwargs): """Initializes the Telegram client with the specified API ID and Hash. @@ -79,22 +72,6 @@ class TelegramClient(TelegramBareClient): This will only affect how messages are sent over the network and how much processing is required before sending them. - The integer 'update_workers' represents depending on its value: - is None: Updates will *not* be stored in memory. - = 0: Another thread is responsible for calling self.updates.poll() - > 0: 'update_workers' background threads will be spawned, any - any of them will invoke all the self.updates.handlers. - - If 'spawn_read_thread', a background thread will be started once - an authorized user has been logged in to Telegram to read items - (such as updates and responses) from the network as soon as they - occur, which will speed things up. - - If you don't want to spawn any additional threads, pending updates - will be read and processed accordingly after invoking a request - and not immediately. This is useful if you don't care about updates - at all and have set 'update_workers=None'. - If more named arguments are provided as **kwargs, they will be used to update the Session instance. Most common settings are: device_model = platform.node() @@ -108,8 +85,6 @@ class TelegramClient(TelegramBareClient): session, api_id, api_hash, connection_mode=connection_mode, proxy=proxy, - update_workers=update_workers, - spawn_read_thread=spawn_read_thread, timeout=timeout, **kwargs ) diff --git a/telethon/tl/entity_database.py b/telethon/tl/entity_database.py index 61c07efc..da105616 100644 --- a/telethon/tl/entity_database.py +++ b/telethon/tl/entity_database.py @@ -1,5 +1,3 @@ -from threading import Lock - import re from .. import utils @@ -20,8 +18,6 @@ class EntityDatabase: """ self.enabled = enabled self.enabled_full = enabled_full - - self._lock = Lock() self._entities = {} # marked_id: user|chat|channel if input_list: @@ -81,12 +77,11 @@ class EntityDatabase: except ValueError: pass - with self._lock: - before = len(self._input_entities) - self._input_entities.update(new_input) - for e in new: - self._add_full_entity(e) - return len(self._input_entities) != before + before = len(self._input_entities) + self._input_entities.update(new_input) + for e in new: + self._add_full_entity(e) + return len(self._input_entities) != before def _add_full_entity(self, entity): """Adds a "full" entity (User, Chat or Channel, not "Input*"), diff --git a/telethon/tl/session.py b/telethon/tl/session.py index 98ffda16..b722144e 100644 --- a/telethon/tl/session.py +++ b/telethon/tl/session.py @@ -4,7 +4,6 @@ import platform import time from base64 import b64encode, b64decode from os.path import isfile as file_exists -from threading import Lock from .entity_database import EntityDatabase from .. import helpers @@ -51,11 +50,6 @@ class Session: self.report_errors = True self.save_entities = True - # Cross-thread safety - self._seq_no_lock = Lock() - self._msg_id_lock = Lock() - self._save_lock = Lock() - self.id = helpers.generate_random_long(signed=False) self._sequence = 0 self.time_offset = 0 @@ -71,24 +65,23 @@ class Session: def save(self): """Saves the current session object as session_user_id.session""" - if not self.session_user_id or self._save_lock.locked(): + if not self.session_user_id: return - with self._save_lock: - with open('{}.session'.format(self.session_user_id), 'w') as file: - out_dict = { - 'port': self.port, - 'salt': self.salt, - 'layer': self.layer, - 'server_address': self.server_address, - 'auth_key_data': - b64encode(self.auth_key.key).decode('ascii') - if self.auth_key else None - } - if self.save_entities: - out_dict['entities'] = self.entities.get_input_list() + with open('{}.session'.format(self.session_user_id), 'w') as file: + out_dict = { + 'port': self.port, + 'salt': self.salt, + 'layer': self.layer, + 'server_address': self.server_address, + 'auth_key_data': + b64encode(self.auth_key.key).decode('ascii') + if self.auth_key else None + } + if self.save_entities: + out_dict['entities'] = self.entities.get_input_list() - json.dump(out_dict, file) + json.dump(out_dict, file) def delete(self): """Deletes the current session file""" @@ -149,13 +142,12 @@ class Session: Note that if confirmed=True, the sequence number will be increased by one too """ - with self._seq_no_lock: - if content_related: - result = self._sequence * 2 + 1 - self._sequence += 1 - return result - else: - return self._sequence * 2 + if content_related: + result = self._sequence * 2 + 1 + self._sequence += 1 + return result + else: + return self._sequence * 2 def get_new_msg_id(self): """Generates a new unique message ID based on the current @@ -166,11 +158,10 @@ class Session: # "message identifiers are divisible by 4" new_msg_id = (int(now) << 32) | (nanoseconds << 2) - with self._msg_id_lock: - if self._last_msg_id >= new_msg_id: - new_msg_id = self._last_msg_id + 4 + if self._last_msg_id >= new_msg_id: + new_msg_id = self._last_msg_id + 4 - self._last_msg_id = new_msg_id + self._last_msg_id = new_msg_id return new_msg_id From 77c99db0660c744376bdb1f04b1a15c2b31958b7 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 6 Oct 2017 21:02:41 +0200 Subject: [PATCH 02/23] Use async def everywhere --- telethon/extensions/tcp_client.py | 22 ++-- telethon/network/authenticator.py | 18 +-- telethon/network/connection.py | 80 ++++++------- telethon/network/mtproto_plain_sender.py | 12 +- telethon/network/mtproto_sender.py | 56 ++++----- telethon/telegram_bare_client.py | 85 +++++++------- telethon/telegram_client.py | 141 +++++++++++------------ 7 files changed, 206 insertions(+), 208 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 134c36ae..672c0a0f 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -1,9 +1,12 @@ # Python rough implementation of a C# TCP client +import asyncio import errno import socket from datetime import timedelta from io import BytesIO, BufferedWriter +loop = asyncio.get_event_loop() + class TcpClient: def __init__(self, proxy=None, timeout=timedelta(seconds=5)): @@ -30,7 +33,7 @@ class TcpClient: self._socket.settimeout(self.timeout) - def connect(self, ip, port): + async def connect(self, ip, port): """Connects to the specified IP and port number. 'timeout' must be given in seconds """ @@ -44,7 +47,7 @@ class TcpClient: while not self._socket: self._recreate_socket(mode) - self._socket.connect(address) + await loop.sock_connect(self._socket, address) break # Successful connection, stop retrying to connect except OSError as e: # There are some errors that we know how to handle, and @@ -72,15 +75,13 @@ class TcpClient: finally: self._socket = None - def write(self, data): + async def write(self, data): """Writes (sends) the specified bytes to the connected peer""" if self._socket is None: raise ConnectionResetError() - # TODO Timeout may be an issue when sending the data, Changed in v3.5: - # The socket timeout is now the maximum total duration to send all data. try: - self._socket.sendall(data) + await loop.sock_sendall(self._socket, data) except socket.timeout as e: raise TimeoutError() from e except BrokenPipeError: @@ -91,14 +92,9 @@ class TcpClient: else: raise - def read(self, size): + async def read(self, size): """Reads (receives) a whole block of 'size bytes from the connected peer. - - A timeout can be specified, which will cancel the operation if - no data has been read in the specified time. If data was read - and it's waiting for more, the timeout will NOT cancel the - operation. Set to None for no timeout """ if self._socket is None: raise ConnectionResetError() @@ -108,7 +104,7 @@ class TcpClient: bytes_left = size while bytes_left != 0: try: - partial = self._socket.recv(bytes_left) + partial = await loop.sock_recv(self._socket, bytes_left) except socket.timeout as e: raise TimeoutError() from e except OSError as e: diff --git a/telethon/network/authenticator.py b/telethon/network/authenticator.py index 1081897a..f46f5430 100644 --- a/telethon/network/authenticator.py +++ b/telethon/network/authenticator.py @@ -17,21 +17,21 @@ from ..tl.functions import ( ) -def do_authentication(connection, retries=5): +async def do_authentication(connection, retries=5): if not retries or retries < 0: retries = 1 last_error = None while retries: try: - return _do_authentication(connection) + return await _do_authentication(connection) except (SecurityError, AssertionError, NotImplementedError) as e: last_error = e retries -= 1 raise last_error -def _do_authentication(connection): +async def _do_authentication(connection): """Executes the authentication process with the Telegram servers. If no error is raised, returns both the authorization key and the time offset. @@ -42,8 +42,8 @@ def _do_authentication(connection): req_pq_request = ReqPqRequest( nonce=int.from_bytes(os.urandom(16), 'big', signed=True) ) - sender.send(req_pq_request.to_bytes()) - with BinaryReader(sender.receive()) as reader: + await sender.send(req_pq_request.to_bytes()) + with BinaryReader(await sender.receive()) as reader: req_pq_request.on_response(reader) res_pq = req_pq_request.result @@ -90,10 +90,10 @@ def _do_authentication(connection): public_key_fingerprint=target_fingerprint, encrypted_data=cipher_text ) - sender.send(req_dh_params.to_bytes()) + await sender.send(req_dh_params.to_bytes()) # Step 2 response: DH Exchange - with BinaryReader(sender.receive()) as reader: + with BinaryReader(await sender.receive()) as reader: req_dh_params.on_response(reader) server_dh_params = req_dh_params.result @@ -157,10 +157,10 @@ def _do_authentication(connection): server_nonce=res_pq.server_nonce, encrypted_data=client_dh_encrypted, ) - sender.send(set_client_dh.to_bytes()) + await sender.send(set_client_dh.to_bytes()) # Step 3 response: Complete DH Exchange - with BinaryReader(sender.receive()) as reader: + with BinaryReader(await sender.receive()) as reader: set_client_dh.on_response(reader) dh_gen = set_client_dh.result diff --git a/telethon/network/connection.py b/telethon/network/connection.py index 2500c0c1..77a3c87b 100644 --- a/telethon/network/connection.py +++ b/telethon/network/connection.py @@ -1,14 +1,13 @@ +import errno import os import struct from datetime import timedelta -from zlib import crc32 from enum import Enum - -import errno +from zlib import crc32 from ..crypto import AESModeCTR -from ..extensions import TcpClient from ..errors import InvalidChecksumError +from ..extensions import TcpClient class ConnectionMode(Enum): @@ -74,9 +73,9 @@ class Connection: setattr(self, 'write', self._write_plain) setattr(self, 'read', self._read_plain) - def connect(self, ip, port): + async def connect(self, ip, port): try: - self.conn.connect(ip, port) + await self.conn.connect(ip, port) except OSError as e: if e.errno == errno.EISCONN: return # Already connected, no need to re-set everything up @@ -85,16 +84,16 @@ class Connection: self._send_counter = 0 if self._mode == ConnectionMode.TCP_ABRIDGED: - self.conn.write(b'\xef') + await self.conn.write(b'\xef') elif self._mode == ConnectionMode.TCP_INTERMEDIATE: - self.conn.write(b'\xee\xee\xee\xee') + await self.conn.write(b'\xee\xee\xee\xee') elif self._mode == ConnectionMode.TCP_OBFUSCATED: - self._setup_obfuscation() + await self._setup_obfuscation() def get_timeout(self): return self.conn.timeout - def _setup_obfuscation(self): + async def _setup_obfuscation(self): # Obfuscated messages secrets cannot start with any of these keywords = (b'PVrG', b'GET ', b'POST', b'\xee' * 4) while True: @@ -119,7 +118,7 @@ class Connection: self._aes_decrypt = AESModeCTR(decrypt_key, decrypt_iv) random[56:64] = self._aes_encrypt.encrypt(bytes(random))[56:64] - self.conn.write(bytes(random)) + await self.conn.write(bytes(random)) def is_connected(self): return self.conn.connected @@ -135,20 +134,23 @@ class Connection: # region Receive message implementations - def recv(self): + async def recv(self): """Receives and unpacks a message""" # Default implementation is just an error raise ValueError('Invalid connection mode specified: ' + str(self._mode)) - def _recv_tcp_full(self): - packet_length_bytes = self.read(4) + async def _recv_tcp_full(self): + # TODO We don't want another call to this method that could + # potentially await on another self.read(n). Is this guaranteed + # by asyncio? + packet_length_bytes = await self.read(4) packet_length = int.from_bytes(packet_length_bytes, 'little') - seq_bytes = self.read(4) + seq_bytes = await self.read(4) seq = int.from_bytes(seq_bytes, 'little') - body = self.read(packet_length - 12) - checksum = int.from_bytes(self.read(4), 'little') + body = await self.read(packet_length - 12) + checksum = int.from_bytes(await self.read(4), 'little') valid_checksum = crc32(packet_length_bytes + seq_bytes + body) if checksum != valid_checksum: @@ -156,72 +158,70 @@ class Connection: return body - def _recv_intermediate(self): - return self.read(int.from_bytes(self.read(4), 'little')) + async def _recv_intermediate(self): + return await self.read(int.from_bytes(self.read(4), 'little')) - def _recv_abridged(self): + async def _recv_abridged(self): length = int.from_bytes(self.read(1), 'little') if length >= 127: length = int.from_bytes(self.read(3) + b'\0', 'little') - return self.read(length << 2) + return await self.read(length << 2) # endregion # region Send message implementations - def send(self, message): + async def send(self, message): """Encapsulates and sends the given message""" # Default implementation is just an error raise ValueError('Invalid connection mode specified: ' + str(self._mode)) - def _send_tcp_full(self, message): + async def _send_tcp_full(self, message): # https://core.telegram.org/mtproto#tcp-transport # total length, sequence number, packet and checksum (CRC32) length = len(message) + 12 data = struct.pack('> 2 if length < 127: length = struct.pack('B', length) else: length = b'\x7f' + int.to_bytes(length, 3, 'little') - self.write(length + message) + await self.write(length + message) # endregion # region Read implementations - def read(self, length): + async def read(self, length): raise ValueError('Invalid connection mode specified: ' + str(self._mode)) - def _read_plain(self, length): - return self.conn.read(length) + async def _read_plain(self, length): + return await self.conn.read(length) - def _read_obfuscated(self, length): - return self._aes_decrypt.encrypt( - self.conn.read(length) - ) + async def _read_obfuscated(self, length): + return await self._aes_decrypt.encrypt(self.conn.read(length)) # endregion # region Write implementations - def write(self, data): + async def write(self, data): raise ValueError('Invalid connection mode specified: ' + str(self._mode)) - def _write_plain(self, data): - self.conn.write(data) + async def _write_plain(self, data): + await self.conn.write(data) - def _write_obfuscated(self, data): - self.conn.write(self._aes_encrypt.encrypt(data)) + async def _write_obfuscated(self, data): + await self.conn.write(self._aes_encrypt.encrypt(data)) # endregion diff --git a/telethon/network/mtproto_plain_sender.py b/telethon/network/mtproto_plain_sender.py index c7c021be..9089a72d 100644 --- a/telethon/network/mtproto_plain_sender.py +++ b/telethon/network/mtproto_plain_sender.py @@ -16,23 +16,23 @@ class MtProtoPlainSender: self._last_msg_id = 0 self._connection = connection - def connect(self): - self._connection.connect() + async def connect(self): + await self._connection.connect() def disconnect(self): self._connection.close() - def send(self, data): + async def send(self, data): """Sends a plain packet (auth_key_id = 0) containing the given message body (data) """ - self._connection.send( + await self._connection.send( struct.pack(' Date: Sat, 7 Oct 2017 09:50:23 +0200 Subject: [PATCH 03/23] Fix CdnDecrypter not being async --- telethon/crypto/cdn_decrypter.py | 10 +++++----- telethon/telegram_bare_client.py | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/telethon/crypto/cdn_decrypter.py b/telethon/crypto/cdn_decrypter.py index f4b0ef3e..0381f62d 100644 --- a/telethon/crypto/cdn_decrypter.py +++ b/telethon/crypto/cdn_decrypter.py @@ -19,7 +19,7 @@ class CdnDecrypter: self.cdn_file_hashes = cdn_file_hashes @staticmethod - def prepare_decrypter(client, cdn_client, cdn_redirect): + async def prepare_decrypter(client, cdn_client, cdn_redirect): """Prepares a CDN decrypter, returning (decrypter, file data). 'client' should be an existing client not connected to a CDN. 'cdn_client' should be an already-connected TelegramBareClient @@ -38,14 +38,14 @@ class CdnDecrypter: cdn_aes, cdn_redirect.cdn_file_hashes ) - cdn_file = cdn_client(GetCdnFileRequest( + cdn_file = await cdn_client(GetCdnFileRequest( file_token=cdn_redirect.file_token, offset=cdn_redirect.cdn_file_hashes[0].offset, limit=cdn_redirect.cdn_file_hashes[0].limit )) if isinstance(cdn_file, CdnFileReuploadNeeded): # We need to use the original client here - client(ReuploadCdnFileRequest( + await client(ReuploadCdnFileRequest( file_token=cdn_redirect.file_token, request_token=cdn_file.request_token )) @@ -59,13 +59,13 @@ class CdnDecrypter: return decrypter, cdn_file - def get_file(self): + async def get_file(self): """Calls GetCdnFileRequest and decrypts its bytes. Also ensures that the file hasn't been tampered. """ if self.cdn_file_hashes: cdn_hash = self.cdn_file_hashes.pop(0) - cdn_file = self.client(GetCdnFileRequest( + cdn_file = await self.client(GetCdnFileRequest( self.file_token, cdn_hash.offset, cdn_hash.limit )) cdn_file.bytes = self.cdn_aes.encrypt(cdn_file.bytes) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 1728d98d..6e23b1fa 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -615,7 +615,7 @@ class TelegramBareClient: try: if cdn_decrypter: - result = cdn_decrypter.get_file() + result = await cdn_decrypter.get_file() else: result = await client(GetFileRequest( input_location, offset, part_size @@ -623,7 +623,7 @@ class TelegramBareClient: if isinstance(result, FileCdnRedirect): cdn_decrypter, result = \ - CdnDecrypter.prepare_decrypter( + await CdnDecrypter.prepare_decrypter( client, await self._get_cdn_client(result), result From e4bcab336bb756fbe5b09e0d1ffa581e6c5449d6 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 20 Oct 2017 16:05:49 +0200 Subject: [PATCH 04/23] Fix some missing await calls --- telethon/telegram_bare_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 2cfe55f1..c6e02fbf 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -265,7 +265,7 @@ class TelegramBareClient: else: self.disconnect() self.session.auth_key = None # Force creating new auth_key - dc = self._get_dc(new_dc) + dc = await self._get_dc(new_dc) ip = dc.ip_address self.session.server_address = ip self.session.port = dc.port @@ -300,7 +300,7 @@ class TelegramBareClient: # New configuration, perhaps a new CDN was added? TelegramBareClient._dc_options = await (self(GetConfigRequest())).dc_options - return self._get_dc(dc_id, ipv6=ipv6, cdn=cdn) + return await self._get_dc(dc_id, ipv6=ipv6, cdn=cdn) async def _get_exported_client(self, dc_id): """Creates and connects a new TelegramBareClient for the desired DC. From 780e0ceddf29201a5001df9af428699907a25270 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Sun, 22 Oct 2017 15:06:36 +0300 Subject: [PATCH 05/23] Update handlers works; it also seems stable --- telethon/extensions/tcp_client.py | 34 +++-- telethon/network/connection.py | 6 +- telethon/network/mtproto_sender.py | 66 ++++++---- telethon/telegram_bare_client.py | 187 ++++++++++++++++++-------- telethon/telegram_client.py | 36 +++-- telethon/tl/message_container.py | 12 ++ telethon/tl/tl_message.py | 17 +++ telethon/tl/tlobject.py | 4 +- telethon/update_state.py | 203 +++++++---------------------- 9 files changed, 300 insertions(+), 265 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index e847873f..9d0a2dee 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -5,13 +5,12 @@ import socket from datetime import timedelta from io import BytesIO, BufferedWriter -loop = asyncio.get_event_loop() - class TcpClient: - def __init__(self, proxy=None, timeout=timedelta(seconds=5)): + def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None): self.proxy = proxy self._socket = None + self._loop = loop if loop else asyncio.get_event_loop() if isinstance(timeout, timedelta): self.timeout = timeout.seconds @@ -31,7 +30,7 @@ class TcpClient: else: # tuple, list, etc. self._socket.set_proxy(*self.proxy) - self._socket.settimeout(self.timeout) + self._socket.setblocking(False) async def connect(self, ip, port): """Connects to the specified IP and port number. @@ -42,20 +41,27 @@ class TcpClient: else: mode, address = socket.AF_INET, (ip, port) + timeout = 1 while True: try: - while not self._socket: + if not self._socket: self._recreate_socket(mode) - await loop.sock_connect(self._socket, address) + await self._loop.sock_connect(self._socket, address) break # Successful connection, stop retrying to connect + except ConnectionError: + self._socket = None + await asyncio.sleep(min(timeout, 15)) + timeout *= 2 except OSError as e: # There are some errors that we know how to handle, and # the loop will allow us to retry - if e.errno == errno.EBADF: + if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.EINVAL]: # Bad file descriptor, i.e. socket was closed, set it # to none to recreate it on the next iteration self._socket = None + await asyncio.sleep(min(timeout, 15)) + timeout *= 2 else: raise @@ -81,13 +87,14 @@ class TcpClient: raise ConnectionResetError() try: - await loop.sock_sendall(self._socket, data) - except socket.timeout as e: + await asyncio.wait_for(self._loop.sock_sendall(self._socket, data), + timeout=self.timeout, loop=self._loop) + except asyncio.TimeoutError as e: raise TimeoutError() from e except BrokenPipeError: self._raise_connection_reset() except OSError as e: - if e.errno == errno.EBADF: + if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, errno.EINVAL, errno.ENOTCONN]: self._raise_connection_reset() else: raise @@ -104,11 +111,12 @@ class TcpClient: bytes_left = size while bytes_left != 0: try: - partial = await loop.sock_recv(self._socket, bytes_left) - except socket.timeout as e: + partial = await asyncio.wait_for(self._loop.sock_recv(self._socket, bytes_left), + timeout=self.timeout, loop=self._loop) + except asyncio.TimeoutError as e: raise TimeoutError() from e except OSError as e: - if e.errno == errno.EBADF or e.errno == errno.ENOTSOCK: + if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, errno.EINVAL, errno.ENOTCONN]: self._raise_connection_reset() else: raise diff --git a/telethon/network/connection.py b/telethon/network/connection.py index 270b9451..9ffdd453 100644 --- a/telethon/network/connection.py +++ b/telethon/network/connection.py @@ -43,13 +43,13 @@ class Connection: """ def __init__(self, mode=ConnectionMode.TCP_FULL, - proxy=None, timeout=timedelta(seconds=5)): + proxy=None, timeout=timedelta(seconds=5), loop=None): self._mode = mode self._send_counter = 0 self._aes_encrypt, self._aes_decrypt = None, None # TODO Rename "TcpClient" as some sort of generic socket? - self.conn = TcpClient(proxy=proxy, timeout=timeout) + self.conn = TcpClient(proxy=proxy, timeout=timeout, loop=loop) # Sending messages if mode == ConnectionMode.TCP_FULL: @@ -206,7 +206,7 @@ class Connection: return await self.conn.read(length) async def _read_obfuscated(self, length): - return await self._aes_decrypt.encrypt(self.conn.read(length)) + return self._aes_decrypt.encrypt(await self.conn.read(length)) # endregion diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 1684230d..6eae4cc3 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -1,6 +1,8 @@ import gzip import logging import struct +import asyncio +from asyncio import Event from .. import helpers as utils from ..crypto import AES @@ -30,17 +32,15 @@ class MtProtoSender: in parallel, so thread-safety (hence locking) isn't needed. """ - def __init__(self, session, connection): + def __init__(self, session, connection, loop=None): """Creates a new MtProtoSender configured to send messages through 'connection' and using the parameters from 'session'. """ self.session = session self.connection = connection + self._loop = loop if loop else asyncio.get_event_loop() self._logger = logging.getLogger(__name__) - # Message IDs that need confirmation - self._need_confirmation = [] - # Requests (as msg_id: Message) sent waiting to be received self._pending_receive = {} @@ -54,12 +54,11 @@ class MtProtoSender: def disconnect(self): """Disconnects from the server""" self.connection.close() - self._need_confirmation.clear() self._clear_all_pending() def clone(self): """Creates a copy of this MtProtoSender as a new connection""" - return MtProtoSender(self.session, self.connection.clone()) + return MtProtoSender(self.session, self.connection.clone(), self._loop) # region Send and receive @@ -67,21 +66,23 @@ class MtProtoSender: """Sends the specified MTProtoRequest, previously sending any message which needed confirmation.""" + # Prepare the event of every request + for r in requests: + if r.confirm_received is None: + r.confirm_received = Event(loop=self._loop) + else: + r.confirm_received.clear() + # Finally send our packed request(s) messages = [TLMessage(self.session, r) for r in requests] self._pending_receive.update({m.msg_id: m for m in messages}) - # Pack everything in the same container if we need to send AckRequests - if self._need_confirmation: - messages.append( - TLMessage(self.session, MsgsAck(self._need_confirmation)) - ) - self._need_confirmation.clear() - if len(messages) == 1: message = messages[0] else: message = TLMessage(self.session, MessageContainer(messages)) + for m in messages: + m.container_msg_id = message.msg_id await self._send_message(message) @@ -115,6 +116,7 @@ class MtProtoSender: message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: await self._process_msg(remote_msg_id, remote_seq, reader, update_state) + await self._send_acknowledge(remote_msg_id) # endregion @@ -174,7 +176,6 @@ class MtProtoSender: """ # TODO Check salt, session_id and sequence_number - self._need_confirmation.append(msg_id) code = reader.read_int(signed=False) reader.seek(-4) @@ -210,14 +211,14 @@ class MtProtoSender: if code == MsgsAck.CONSTRUCTOR_ID: # may handle the request we wanted ack = reader.tgread_object() assert isinstance(ack, MsgsAck) - # Ignore every ack request *unless* when logging out, when it's + # Ignore every ack request *unless* when logging out, # when it seems to only make sense. We also need to set a non-None # result since Telegram doesn't send the response for these. for msg_id in ack.msg_ids: r = self._pop_request_of_type(msg_id, LogOutRequest) if r: r.result = True # Telegram won't send this value - r.confirm_received() + r.confirm_received.set() self._logger.debug('Message ack confirmed', r) return True @@ -259,11 +260,29 @@ class MtProtoSender: if message and isinstance(message.request, t): return self._pending_receive.pop(msg_id).request + def _pop_requests_of_container(self, container_msg_id): + msgs = [msg for msg in self._pending_receive.values() if msg.container_msg_id == container_msg_id] + requests = [msg.request for msg in msgs] + for msg in msgs: + self._pending_receive.pop(msg.msg_id, None) + return requests + def _clear_all_pending(self): for r in self._pending_receive.values(): - r.confirm_received.set() + r.request.confirm_received.set() self._pending_receive.clear() + async def _resend_request(self, msg_id): + request = self._pop_request(msg_id) + if request: + self._logger.debug('requests is about to resend') + await self.send(request) + return + requests = self._pop_requests_of_container(msg_id) + if requests: + self._logger.debug('container of requests is about to resend') + await self.send(*requests) + async def _handle_pong(self, msg_id, sequence, reader): self._logger.debug('Handling pong') pong = reader.tgread_object() @@ -303,10 +322,9 @@ class MtProtoSender: self.session.salt = struct.unpack( ' 0: 'workers' background threads will be spawned, any - any of them will invoke all the self.handlers. - """ - self._workers = workers - self._worker_threads = [] - + def __init__(self, loop=None): self.handlers = [] - self._updates_lock = RLock() - self._updates_available = Event() - self._updates = deque() self._latest_updates = deque(maxlen=10) + self._loop = loop if loop else asyncio.get_event_loop() self._logger = logging.getLogger(__name__) # https://core.telegram.org/api/updates self._state = tl.updates.State(0, 0, datetime.now(), 0, 0) - def can_poll(self): - """Returns True if a call to .poll() won't lock""" - return self._updates_available.is_set() - - def poll(self, timeout=None): - """Polls an update or blocks until an update object is available. - If 'timeout is not None', it should be a floating point value, - and the method will 'return None' if waiting times out. - """ - if not self._updates_available.wait(timeout=timeout): - return - - with self._updates_lock: - if not self._updates_available.is_set(): - return - - update = self._updates.popleft() - if not self._updates: - self._updates_available.clear() - - if isinstance(update, Exception): - raise update # Some error was set through (surely StopIteration) - - return update - - def get_workers(self): - return self._workers - - def set_workers(self, n): - """Changes the number of workers running. - If 'n is None', clears all pending updates from memory. - """ - self.stop_workers() - self._workers = n - if n is None: - self._updates.clear() - else: - self.setup_workers() - - workers = property(fget=get_workers, fset=set_workers) - - def stop_workers(self): - """Raises "StopIterationException" on the worker threads to stop them, - and also clears all of them off the list - """ - if self._workers: - with self._updates_lock: - # Insert at the beginning so the very next poll causes an error - # on all the worker threads - # TODO Should this reset the pts and such? - for _ in range(self._workers): - self._updates.appendleft(StopIteration()) - self._updates_available.set() - - for t in self._worker_threads: - t.join() - - self._worker_threads.clear() - - def setup_workers(self): - if self._worker_threads or not self._workers: - # There already are workers, or workers is None or 0. Do nothing. - return - - for i in range(self._workers): - thread = Thread( - target=UpdateState._worker_loop, - name='UpdateWorker{}'.format(i), - daemon=True, - args=(self, i) - ) - self._worker_threads.append(thread) - thread.start() - - def _worker_loop(self, wid): - while True: - try: - update = self.poll(timeout=UpdateState.WORKER_POLL_TIMEOUT) - # TODO Maybe people can add different handlers per update type - if update: - for handler in self.handlers: - handler(update) - except StopIteration: - break - except Exception as e: - # We don't want to crash a worker thread due to any reason - self._logger.debug( - '[ERROR] Unhandled exception on worker {}'.format(wid), e - ) + def handle_update(self, update): + for handler in self.handlers: + asyncio.ensure_future(handler(update), loop=self._loop) def process(self, update): """Processes an update object. This method is normally called by the library itself. """ - if self._workers is None: - return # No processing needs to be done if nobody's working + if isinstance(update, tl.updates.State): + self._state = update + return # Nothing else to be done - with self._updates_lock: - if isinstance(update, tl.updates.State): - self._state = update - return # Nothing else to be done + pts = getattr(update, 'pts', self._state.pts) + if hasattr(update, 'pts') and pts <= self._state.pts: + return # We already handled this update - pts = getattr(update, 'pts', self._state.pts) - if hasattr(update, 'pts') and pts <= self._state.pts: - return # We already handled this update + self._state.pts = pts - self._state.pts = pts + # TODO There must be a better way to handle updates rather than + # keeping a queue with the latest updates only, and handling + # the 'pts' correctly should be enough. However some updates + # like UpdateUserStatus (even inside UpdateShort) will be called + # repeatedly very often if invoking anything inside an update + # handler. TODO Figure out why. + """ + client = TelegramClient('anon', api_id, api_hash, update_workers=1) + client.connect() + def handle(u): + client.get_me() + client.add_update_handler(handle) + input('Enter to exit.') + """ + data = pickle.dumps(update.to_dict()) + if data in self._latest_updates: + return # Duplicated too - # TODO There must be a better way to handle updates rather than - # keeping a queue with the latest updates only, and handling - # the 'pts' correctly should be enough. However some updates - # like UpdateUserStatus (even inside UpdateShort) will be called - # repeatedly very often if invoking anything inside an update - # handler. TODO Figure out why. - """ - client = TelegramClient('anon', api_id, api_hash, update_workers=1) - client.connect() - def handle(u): - client.get_me() - client.add_update_handler(handle) - input('Enter to exit.') - """ - data = pickle.dumps(update.to_dict()) - if data in self._latest_updates: - return # Duplicated too + self._latest_updates.append(data) - self._latest_updates.append(data) + if type(update).SUBCLASS_OF_ID == 0x8af52aac: # crc32(b'Updates') + # Expand "Updates" into "Update", and pass these to callbacks. + # Since .users and .chats have already been processed, we + # don't need to care about those either. + if isinstance(update, tl.UpdateShort): + self.handle_update(update.update) - if type(update).SUBCLASS_OF_ID == 0x8af52aac: # crc32(b'Updates') - # Expand "Updates" into "Update", and pass these to callbacks. - # Since .users and .chats have already been processed, we - # don't need to care about those either. - if isinstance(update, tl.UpdateShort): - self._updates.append(update.update) - self._updates_available.set() + elif isinstance(update, (tl.Updates, tl.UpdatesCombined)): + for upd in update.updates: + self.handle_update(upd) - elif isinstance(update, (tl.Updates, tl.UpdatesCombined)): - self._updates.extend(update.updates) - self._updates_available.set() + elif not isinstance(update, tl.UpdatesTooLong): + # TODO Handle "Updates too long" + self.handle_update(update) - elif not isinstance(update, tl.UpdatesTooLong): - # TODO Handle "Updates too long" - self._updates.append(update) - self._updates_available.set() - - elif type(update).SUBCLASS_OF_ID == 0x9f89304e: # crc32(b'Update') - self._updates.append(update) - self._updates_available.set() - else: - self._logger.debug('Ignoring "update" of type {}'.format( - type(update).__name__) - ) + elif type(update).SUBCLASS_OF_ID == 0x9f89304e: # crc32(b'Update') + self.handle_update(update) + else: + self._logger.debug('Ignoring "update" of type {}'.format( + type(update).__name__) + ) From 1a0d5e75bfd5cad3e8223b6132180b142289dca7 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 22 Oct 2017 19:13:45 +0200 Subject: [PATCH 06/23] Make use of more constants in the TcpClient --- telethon/extensions/tcp_client.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 9d0a2dee..eda4109f 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -5,6 +5,12 @@ import socket from datetime import timedelta from io import BytesIO, BufferedWriter +MAX_TIMEOUT = 15 # in seconds +CONN_RESET_ERRNOS = { + errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, + errno.EINVAL, errno.ENOTCONN +} + class TcpClient: def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None): @@ -51,17 +57,17 @@ class TcpClient: break # Successful connection, stop retrying to connect except ConnectionError: self._socket = None - await asyncio.sleep(min(timeout, 15)) - timeout *= 2 + await asyncio.sleep(timeout) + timeout = min(timeout * 2, MAX_TIMEOUT) except OSError as e: # There are some errors that we know how to handle, and # the loop will allow us to retry - if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.EINVAL]: + if e.errno in (errno.EBADF, errno.ENOTSOCK, errno.EINVAL): # Bad file descriptor, i.e. socket was closed, set it # to none to recreate it on the next iteration self._socket = None - await asyncio.sleep(min(timeout, 15)) - timeout *= 2 + await asyncio.sleep(timeout) + timeout = min(timeout * 2, MAX_TIMEOUT) else: raise @@ -94,7 +100,7 @@ class TcpClient: except BrokenPipeError: self._raise_connection_reset() except OSError as e: - if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, errno.EINVAL, errno.ENOTCONN]: + if e.errno in CONN_RESET_ERRNOS: self._raise_connection_reset() else: raise @@ -116,7 +122,7 @@ class TcpClient: except asyncio.TimeoutError as e: raise TimeoutError() from e except OSError as e: - if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, errno.EINVAL, errno.ENOTCONN]: + if e.errno in CONN_RESET_ERRNOS: self._raise_connection_reset() else: raise From 30ac6789ce741f72fdb2b0bbed87fcbe81a1bce7 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 22 Oct 2017 19:27:49 +0200 Subject: [PATCH 07/23] Change _set_connected_and_authorized condition --- telethon/telegram_bare_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 318ef48e..133661d7 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -209,12 +209,14 @@ class TelegramBareClient: # another data center and this would raise UserMigrateError) # to also assert whether the user is logged in or not. self._user_connected = True - if _sync_updates and not _cdn and not self._authorized: + if self._authorized is None and _sync_updates and not _cdn: try: await self.sync_updates() self._set_connected_and_authorized() except UnauthorizedError: - pass + self._authorized = False + elif self._authorized: + self._set_connected_and_authorized() return True From ffaa3ac0649e47392c3fa82aecf665220f51434e Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 22 Oct 2017 19:47:24 +0200 Subject: [PATCH 08/23] Remove unused timeout variable from the TelegramClient --- telethon/telegram_bare_client.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 133661d7..315aee00 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -726,7 +726,6 @@ class TelegramBareClient: async def _recv_loop_impl(self): need_reconnect = False - timeout = 1 while self._user_connected: try: if need_reconnect: @@ -741,8 +740,7 @@ class TelegramBareClient: except ConnectionError as error: self._logger.debug(error) need_reconnect = True - await asyncio.sleep(min(timeout, 15), loop=self._loop) - timeout *= 2 + await asyncio.sleep(1, loop=self._loop) except Exception as error: # Unknown exception, pass it to the main thread self._logger.debug( @@ -769,7 +767,6 @@ class TelegramBareClient: # add a little sleep to avoid the CPU usage going mad. await asyncio.sleep(0.1, loop=self._loop) break - timeout = 1 self._recv_loop = None # endregion From 3a7fa249a4d503f144666f5e2f05b1f220406a43 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 22 Oct 2017 20:30:55 +0200 Subject: [PATCH 09/23] Revert None result checks on the TelegramClient --- telethon/telegram_client.py | 35 ++++++++++++----------------------- 1 file changed, 12 insertions(+), 23 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index d58a03d7..b6f2597e 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -106,9 +106,8 @@ class TelegramClient(TelegramBareClient): """Sends a code request to the specified phone number""" phone = EntityDatabase.parse_phone(phone) or self._phone result = await self(SendCodeRequest(phone, self.api_id, self.api_hash)) - if result: - self._phone = phone - self._phone_code_hash = result.phone_code_hash + self._phone = phone + self._phone_code_hash = result.phone_code_hash return result async def sign_in(self, phone=None, code=None, @@ -172,10 +171,8 @@ class TelegramClient(TelegramBareClient): 'and a password only if an RPCError was raised before.' ) - if result: - self._set_connected_and_authorized() - return result.user - return result + self._set_connected_and_authorized() + return result.user async def sign_up(self, code, first_name, last_name=''): """Signs up to Telegram. Make sure you sent a code request first!""" @@ -187,10 +184,8 @@ class TelegramClient(TelegramBareClient): last_name=last_name )) - if result: - self._set_connected_and_authorized() - return result.user - return result + self._set_connected_and_authorized() + return result.user async def log_out(self): """Logs out and deletes the current session. @@ -209,7 +204,7 @@ class TelegramClient(TelegramBareClient): """Gets "me" (the self user) which is currently authenticated, or None if the request fails (hence, not authenticated).""" try: - return await self(GetUsersRequest([InputUserSelf()]))[0] + return (await self(GetUsersRequest([InputUserSelf()])))[0] except UnauthorizedError: return None @@ -246,7 +241,7 @@ class TelegramClient(TelegramBareClient): offset_peer=offset_peer, limit=need if need < float('inf') else 0 )) - if not r or not r.dialogs: + if not r.dialogs: break for d in r.dialogs: @@ -295,12 +290,10 @@ class TelegramClient(TelegramBareClient): :return List[telethon.tl.custom.Draft]: A list of open drafts """ response = await self(GetAllDraftsRequest()) - if response: - self.session.process_entities(response) - self.session.generate_sequence(response.seq) - drafts = [Draft._from_update(self, u) for u in response.updates] - return drafts - return response + self.session.process_entities(response) + self.session.generate_sequence(response.seq) + drafts = [Draft._from_update(self, u) for u in response.updates] + return drafts async def send_message(self, entity, @@ -322,8 +315,6 @@ class TelegramClient(TelegramBareClient): reply_to_msg_id=self._get_reply_to(reply_to) ) result = await self(request) - if not result: - return result if isinstance(result, UpdateShortSentMessage): return Message( @@ -419,8 +410,6 @@ class TelegramClient(TelegramBareClient): min_id=min_id, add_offset=add_offset )) - if not result: - return result # The result may be a messages slice (not all messages were retrieved) # or simply a messages TLObject. In the later case, no "count" From 8bd578711cc6bdcbfb6f111612efb71277d3d865 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 23 Oct 2017 10:05:15 +0200 Subject: [PATCH 10/23] Revert "no more retries" exception --- telethon/telegram_bare_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 253d888e..82823105 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -417,7 +417,7 @@ class TelegramBareClient: if result is not None: return result - return None + raise ValueError('Number of retries reached 0.') # Let people use client.invoke(SomeRequest()) instead client(...) invoke = __call__ From cb2d943139bb0de42fd7014d887866eb7ab4dea2 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Sun, 29 Oct 2017 15:33:03 +0300 Subject: [PATCH 11/23] Remove forgotten points --- telethon/network/mtproto_sender.py | 4 ++-- telethon/telegram_bare_client.py | 2 +- telethon/tl/tl_message.py | 2 -- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index e84cb6f0..26e5302a 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -275,12 +275,10 @@ class MtProtoSender: async def _resend_request(self, msg_id): request = self._pop_request(msg_id) if request: - self._logger.debug('requests is about to resend') await self.send(request) return requests = self._pop_requests_of_container(msg_id) if requests: - self._logger.debug('container of requests is about to resend') await self.send(*requests) async def _handle_pong(self, msg_id, sequence, reader): @@ -362,6 +360,7 @@ class MtProtoSender: # TODO For now, simply ack msg_new.answer_msg_id # Relevant tdesktop source code: https://goo.gl/VvpCC6 + await self._send_acknowledge(msg_new.answer_msg_id) return True async def _handle_msg_new_detailed_info(self, msg_id, sequence, reader): @@ -370,6 +369,7 @@ class MtProtoSender: # TODO For now, simply ack msg_new.answer_msg_id # Relevant tdesktop source code: https://goo.gl/G7DPsR + await self._send_acknowledge(msg_new.answer_msg_id) return True async def _handle_new_session_created(self, msg_id, sequence, reader): diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 82823105..acf37ba5 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -693,7 +693,7 @@ class TelegramBareClient: """ self.updates.process(await self(GetStateRequest())) - async def add_update_handler(self, handler): + def add_update_handler(self, handler): """Adds an update handler (a function which takes a TLObject, an update, as its parameter) and listens for updates""" self.updates.handlers.append(handler) diff --git a/telethon/tl/tl_message.py b/telethon/tl/tl_message.py index bb522325..bcb48279 100644 --- a/telethon/tl/tl_message.py +++ b/telethon/tl/tl_message.py @@ -1,5 +1,4 @@ import struct -import logging from . import TLObject, GzipPacked @@ -13,7 +12,6 @@ class TLMessage(TLObject): self.seq_no = session.generate_sequence(request.content_related) self.request = request self.container_msg_id = None - logging.getLogger(__name__).debug(self) def to_dict(self, recursive=True): return { From 25af22f1e7b7c7c372d7da240b8aee6d33a26652 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Tue, 14 Nov 2017 13:52:33 +0300 Subject: [PATCH 12/23] Bugfix in reconnection --- telethon/extensions/tcp_client.py | 75 +++++++++++++++++++++++++++---- telethon/telegram_bare_client.py | 2 +- 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index eda4109f..5ad68c20 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -72,7 +72,7 @@ class TcpClient: raise def _get_connected(self): - return self._socket is not None + return self._socket is not None and self._socket.fileno() >= 0 connected = property(fget=_get_connected) @@ -90,11 +90,14 @@ class TcpClient: async def write(self, data): """Writes (sends) the specified bytes to the connected peer""" if self._socket is None: - raise ConnectionResetError() + self._raise_connection_reset() try: - await asyncio.wait_for(self._loop.sock_sendall(self._socket, data), - timeout=self.timeout, loop=self._loop) + await asyncio.wait_for( + self.sock_sendall(data), + timeout=self.timeout, + loop=self._loop + ) except asyncio.TimeoutError as e: raise TimeoutError() from e except BrokenPipeError: @@ -109,16 +112,18 @@ class TcpClient: """Reads (receives) a whole block of 'size bytes from the connected peer. """ - if self._socket is None: - raise ConnectionResetError() - # TODO Remove the timeout from this method, always use previous one with BufferedWriter(BytesIO(), buffer_size=size) as buffer: bytes_left = size while bytes_left != 0: try: - partial = await asyncio.wait_for(self._loop.sock_recv(self._socket, bytes_left), - timeout=self.timeout, loop=self._loop) + if self._socket is None: + self._raise_connection_reset() + partial = await asyncio.wait_for( + self.sock_recv(bytes_left), + timeout=self.timeout, + loop=self._loop + ) except asyncio.TimeoutError as e: raise TimeoutError() from e except OSError as e: @@ -140,3 +145,55 @@ class TcpClient: def _raise_connection_reset(self): self.close() # Connection reset -> flag as socket closed raise ConnectionResetError('The server has closed the connection.') + + # due to new https://github.com/python/cpython/pull/4386 + def sock_recv(self, n): + fut = self._loop.create_future() + self._sock_recv(fut, None, n) + return fut + + def _sock_recv(self, fut, registered_fd, n): + if registered_fd is not None: + self._loop.remove_reader(registered_fd) + if fut.cancelled(): + return + + try: + data = self._socket.recv(n) + except (BlockingIOError, InterruptedError): + fd = self._socket.fileno() + self._loop.add_reader(fd, self._sock_recv, fut, fd, n) + except Exception as exc: + fut.set_exception(exc) + else: + fut.set_result(data) + + def sock_sendall(self, data): + fut = self._loop.create_future() + if data: + self._sock_sendall(fut, None, data) + else: + fut.set_result(None) + return fut + + def _sock_sendall(self, fut, registered_fd, data): + if registered_fd: + self._loop.remove_writer(registered_fd) + if fut.cancelled(): + return + + try: + n = self._socket.send(data) + except (BlockingIOError, InterruptedError): + n = 0 + except Exception as exc: + fut.set_exception(exc) + return + + if n == len(data): + fut.set_result(None) + else: + if n: + data = data[n:] + fd = self._socket.fileno() + self._loop.add_writer(fd, self._sock_sendall, fut, fd, data) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index fd42fbc9..a26d71b1 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -444,7 +444,7 @@ class TelegramBareClient: self._logger.debug('Server disconnected us. Reconnecting and ' 'resending request... (%d)' % retry) await self._reconnect() - if not self._sender.is_connected(): + if not self.is_connected(): await asyncio.sleep(retry + 1, loop=self._loop) return None From 8a287c28601f5bbb11cfb6c99897c50aa167377c Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 16 Nov 2017 13:21:24 +0100 Subject: [PATCH 13/23] Fix-up removing required error variable while merging --- telethon/telegram_bare_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 54bc16bc..ab183ed5 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -738,7 +738,7 @@ class TelegramBareClient: self._logger.debug(error) need_reconnect = True await asyncio.sleep(1, loop=self._loop) - except Exception: + except Exception as error: # Unknown exception, pass it to the main thread self._logger.exception( 'Unknown error on the read loop, please report.' From c67f78eab78051eb0661a0314594a4b8ae0d32fc Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 25 Nov 2017 16:17:00 +0100 Subject: [PATCH 14/23] Add unparse markdown method --- telethon/extensions/markdown.py | 73 ++++++++++++++++++++++++++++----- 1 file changed, 63 insertions(+), 10 deletions(-) diff --git a/telethon/extensions/markdown.py b/telethon/extensions/markdown.py index f4f3f740..8c326296 100644 --- a/telethon/extensions/markdown.py +++ b/telethon/extensions/markdown.py @@ -24,6 +24,12 @@ DEFAULT_DELIMITERS = { # reason why there's '\0' after every match-literal character. DEFAULT_URL_RE = re.compile(b'\\[\0(.+?)\\]\0\\(\0(.+?)\\)\0') +# Reverse operation for DEFAULT_URL_RE. {0} for text, {1} for URL. +DEFAULT_URL_FORMAT = '[{0}]({1})' + +# Encoding to be used +ENC = 'utf-16le' + def parse(message, delimiters=None, url_re=None): """ @@ -45,7 +51,7 @@ def parse(message, delimiters=None, url_re=None): return message, [] delimiters = DEFAULT_DELIMITERS - delimiters = {k.encode('utf-16le'): v for k, v in delimiters.items()} + delimiters = {k.encode(ENC): v for k, v in delimiters.items()} # Cannot use a for loop because we need to skip some indices i = 0 @@ -55,7 +61,7 @@ def parse(message, delimiters=None, url_re=None): # Work on byte level with the utf-16le encoding to get the offsets right. # The offset will just be half the index we're at. - message = message.encode('utf-16le') + message = message.encode(ENC) while i < len(message): if url_re and current is None: # If we're not inside a previous match since Telegram doesn't allow @@ -71,7 +77,7 @@ def parse(message, delimiters=None, url_re=None): result.append(MessageEntityTextUrl( offset=i // 2, length=len(url_match.group(1)) // 2, - url=url_match.group(2).decode('utf-16le') + url=url_match.group(2).decode(ENC) )) i += len(url_match.group(1)) # Next loop iteration, don't check delimiters, since @@ -126,24 +132,71 @@ def parse(message, delimiters=None, url_re=None): + message[2 * current.offset:] ) - return message.decode('utf-16le'), result + return message.decode(ENC), result + + +def unparse(text, entities, delimiters=None, url_fmt=None): + """ + Performs the reverse operation to .parse(), effectively returning + markdown-like syntax given a normal text and its MessageEntity's. + + :param text: the text to be reconverted into markdown. + :param entities: the MessageEntity's applied to the text. + :return: a markdown-like text representing the combination of both inputs. + """ + if not delimiters: + if delimiters is not None: + return text + delimiters = DEFAULT_DELIMITERS + + if url_fmt is None: + url_fmt = DEFAULT_URL_FORMAT + + if isinstance(entities, TLObject): + entities = (entities,) + else: + entities = tuple(sorted(entities, key=lambda e: e.offset, reverse=True)) + + # Reverse the delimiters, and encode them as utf16 + delimiters = {v: k.encode(ENC) for k, v in delimiters.items()} + text = text.encode(ENC) + for entity in entities: + s = entity.offset * 2 + e = (entity.offset + entity.length) * 2 + delimiter = delimiters.get(type(entity), None) + if delimiter: + text = text[:s] + delimiter + text[s:e] + delimiter + text[e:] + elif isinstance(entity, MessageEntityTextUrl) and url_fmt: + # If byte-strings supported .format(), we, could have converted + # the str url_fmt to a byte-string with the following regex: + # re.sub(b'{\0\s*(?:([01])\0)?\s*}\0',rb'{\1}',url_fmt.encode(ENC)) + # + # This would preserve {}, {0} and {1}. + # Alternatively (as it's done), we can decode/encode it every time. + text = ( + text[:s] + + url_fmt.format(text[s:e].decode(ENC), entity.url).encode(ENC) + + text[e:] + ) + + return text.decode(ENC) def get_inner_text(text, entity): """Gets the inner text that's surrounded by the given entity or entities. For instance: text = 'hey!', entity = MessageEntityBold(2, 2) -> 'y!'. """ - if not isinstance(entity, TLObject) and hasattr(entity, '__iter__'): - multiple = True - else: - entity = [entity] + if isinstance(entity, TLObject): + entity = (entity,) multiple = False + else: + multiple = True - text = text.encode('utf-16le') + text = text.encode(ENC) result = [] for e in entity: start = e.offset * 2 end = (e.offset + e.length) * 2 - result.append(text[start:end].decode('utf-16le')) + result.append(text[start:end].decode(ENC)) return result if multiple else result[0] From e0802d1a2d830c0c859622dd5c9e88b31f03db27 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 25 Nov 2017 18:47:41 +0100 Subject: [PATCH 15/23] Update README.rst to show asyncio code (#456) --- README.rst | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/README.rst b/README.rst index 21e76aca..f68f6f3e 100755 --- a/README.rst +++ b/README.rst @@ -29,26 +29,30 @@ Creating a client phone = '+34600000000' client = TelegramClient('session_name', api_id, api_hash) - client.connect() - - # If you already have a previous 'session_name.session' file, skip this. - client.sign_in(phone=phone) - me = client.sign_in(code=77777) # Put whatever code you received here. + async def main(): + await client.connect() + # Skip this if you already have a previous 'session_name.session' file + await client.sign_in(phone_number) + me = await client.sign_in(code=input('Code: ')) Doing stuff ----------- +Note that this assumes you're inside an "async def" method. Check out the +`Python documentation `_ +if you're new with ``asyncio``. + .. code:: python print(me.stringify()) - client.send_message('username', 'Hello! Talking to you from Telethon') - client.send_file('username', '/home/myself/Pictures/holidays.jpg') + await client.send_message('username', 'Hello! Talking to you from Telethon') + await client.send_file('username', '/home/myself/Pictures/holidays.jpg') - client.download_profile_photo(me) - total, messages, senders = client.get_message_history('username') - client.download_media(messages[0]) + await client.download_profile_photo(me) + total, messages, senders = await client.get_message_history('username') + await client.download_media(messages[0]) Next steps @@ -57,4 +61,6 @@ Next steps Do you like how Telethon looks? Check the `wiki over GitHub `_ for a more in-depth explanation, with examples, troubleshooting issues, and more -useful information. +useful information. Note that the examples there are written for the threaded +version, not the one using asyncio. However, you just need to await every +remote call. From e71831050f30b868452a9ed13cf0b9470bbb68e3 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 25 Nov 2017 18:49:10 +0100 Subject: [PATCH 16/23] Fix README.rst never actually running the example --- README.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.rst b/README.rst index f68f6f3e..a24c06eb 100755 --- a/README.rst +++ b/README.rst @@ -20,6 +20,7 @@ Creating a client .. code:: python + import asyncio from telethon import TelegramClient # These example values won't work. You must get your own api_id and @@ -35,6 +36,7 @@ Creating a client await client.sign_in(phone_number) me = await client.sign_in(code=input('Code: ')) + asyncio.get_event_loop().run_until_complete(main()) Doing stuff ----------- From 2b9c06f0e68d50c63988b42927a5063f971e0910 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 5 Jan 2018 18:32:54 +0100 Subject: [PATCH 17/23] Remove invalid self._logger calls since merge --- telethon/network/mtproto_sender.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 62396f9b..44cf15b9 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -337,12 +337,10 @@ class MtProtoSender: """ request = self._pop_request(msg_id) if request: - self._logger.debug('Resending request') await self.send(request) return requests = self._pop_requests_of_container(msg_id) if requests: - self._logger.debug('Resending container of requests') await self.send(*requests) def _handle_pong(self, msg_id, sequence, pong): From d8376ee50db23ffdd1400696277667502ce3fc31 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 10 Feb 2018 12:44:09 +0100 Subject: [PATCH 18/23] Add a lock around connection.recv() --- telethon/network/mtproto_sender.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 3a35eae4..5ab8e449 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -49,6 +49,7 @@ class MtProtoSender: self.session = session self.connection = connection self._loop = loop if loop else asyncio.get_event_loop() + self._recv_lock = asyncio.Lock() # Requests (as msg_id: Message) sent waiting to be received self._pending_receive = {} @@ -123,7 +124,10 @@ class MtProtoSender: Update and Updates objects. """ try: - body = await self.connection.recv() + with await self._recv_lock: + # Receiving items is not an "atomic" operation since we + # need to read the length and then upcoming parts separated. + body = await self.connection.recv() except (BufferError, InvalidChecksumError): # TODO BufferError, we should spot the cause... # "No more bytes left"; something wrong happened, clear From 7da092894b306d720cc60c04daa2bfba58f81946 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 10 Feb 2018 12:56:54 +0100 Subject: [PATCH 19/23] Acquire reconnect lock outside the reconnect function --- telethon/telegram_bare_client.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 9ed706ee..f588cce5 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -248,7 +248,6 @@ class TelegramBareClient: if new_dc is None: # Assume we are disconnected due to some error, so connect again try: - await self._reconnect_lock.acquire() if self.is_connected(): __log__.info('Reconnection aborted: already connected') return True @@ -258,8 +257,6 @@ class TelegramBareClient: except ConnectionResetError as e: __log__.warning('Reconnection failed due to %s', e) return False - finally: - self._reconnect_lock.release() else: # Since we're reconnecting possibly due to a UserMigrateError, # we need to first know the Data Centers we can connect to. Do @@ -589,9 +586,10 @@ class TelegramBareClient: if need_reconnect: __log__.info('Attempting reconnection from read loop') need_reconnect = False - while self._user_connected and not await self._reconnect(): - # Retry forever, this is instant messaging - await asyncio.sleep(0.1, loop=self._loop) + with await self._reconnect_lock: + while self._user_connected and not await self._reconnect(): + # Retry forever, this is instant messaging + await asyncio.sleep(0.1, loop=self._loop) # Telegram seems to kick us every 1024 items received # from the network not considering things like bad salt. From 2e953dab50f87d397a477617df7ebad634731b45 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Wed, 14 Feb 2018 14:15:00 +0100 Subject: [PATCH 20/23] Add missing async and await keywords on TelegramClient.on --- telethon/telegram_client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index d119c194..a05a1b45 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -1618,7 +1618,7 @@ class TelegramClient(TelegramBareClient): # region Event handling - def on(self, event): + async def on(self, event): """ Turns the given entity into a valid Telegram user or chat. @@ -1631,7 +1631,7 @@ class TelegramClient(TelegramBareClient): if isinstance(event, type): event = event() - event.resolve(self) + await event.resolve(self) def decorator(f): self._event_builders.append((event, f)) @@ -1642,12 +1642,12 @@ class TelegramClient(TelegramBareClient): return decorator - def _on_handler(self, update): + async def _on_handler(self, update): for builder, callback in self._event_builders: event = builder.build(update) if event: event._client = self - callback(event) + await callback(event) # endregion From 7998fd59f709ae1cd959c5cc4ab107982307f4a6 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 22 Feb 2018 21:57:40 +0200 Subject: [PATCH 21/23] Add missing await to mention generation in _parse_message_text (#634) --- telethon/telegram_client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 62cae901..2ca7b7ab 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -587,7 +587,7 @@ class TelegramClient(TelegramBareClient): if request.id == update.message.id: return update.message - def _parse_message_text(self, message, parse_mode): + async def _parse_message_text(self, message, parse_mode): """ Returns a (parsed message, entities) tuple depending on parse_mode. """ @@ -608,7 +608,7 @@ class TelegramClient(TelegramBareClient): if m: try: msg_entities[i] = InputMessageEntityMentionName( - e.offset, e.length, self.get_input_entity( + e.offset, e.length, await self.get_input_entity( int(m.group(1)) if m.group(1) else e.url ) ) @@ -647,7 +647,7 @@ class TelegramClient(TelegramBareClient): the sent message """ entity = await self.get_input_entity(entity) - message, msg_entities = self._parse_message_text(message, parse_mode) + message, msg_entities = await self._parse_message_text(message, parse_mode) request = SendMessageRequest( peer=entity, @@ -705,7 +705,7 @@ class TelegramClient(TelegramBareClient): Returns: the edited message """ - message, msg_entities = self._parse_message_text(message, parse_mode) + message, msg_entities = await self._parse_message_text(message, parse_mode) request = EditMessageRequest( peer=await self.get_input_entity(entity), id=self._get_message_id(message_id), From 9054a12c1143c67e6b31ad631ad6edf40217f26b Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 24 Feb 2018 18:30:09 +0100 Subject: [PATCH 22/23] Fix tiny bug regarding .get_me(input_peer=True) crashing events --- telethon/telegram_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 55babafe..1e086ffe 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -485,7 +485,7 @@ class TelegramClient(TelegramBareClient): try: me = self(GetUsersRequest([InputUserSelf()]))[0] if not self._self_input_peer: - self._self_input_peer = utils.get_input_peer( + self._self_input_peer = me = utils.get_input_peer( me, allow_self=False ) return me From 784c2e9ed15536769309e34346bee12410dbcefe Mon Sep 17 00:00:00 2001 From: Tanuj Date: Sat, 3 Mar 2018 08:19:33 +0000 Subject: [PATCH 23/23] Fix get_participants missing async keywords (#662) --- telethon/telegram_client.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 2d87508d..f5b40f00 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -1014,7 +1014,7 @@ class TelegramClient(TelegramBareClient): raise TypeError('Invalid message type: {}'.format(type(message))) - def get_participants(self, entity, limit=None, search=''): + async def get_participants(self, entity, limit=None, search=''): """ Gets the list of participants from the specified entity @@ -1032,7 +1032,7 @@ class TelegramClient(TelegramBareClient): A list of participants with an additional .total variable on the list indicating the total amount of members in this group/channel. """ - entity = self.get_input_entity(entity) + entity = await self.get_input_entity(entity) limit = float('inf') if limit is None else int(limit) if isinstance(entity, InputPeerChannel): offset = 0 @@ -1040,7 +1040,7 @@ class TelegramClient(TelegramBareClient): search = ChannelParticipantsSearch(search) while True: loop_limit = min(limit - offset, 200) - participants = self(GetParticipantsRequest( + participants = await self(GetParticipantsRequest( entity, search, offset, loop_limit, hash=0 )) if not participants.users: @@ -1053,11 +1053,11 @@ class TelegramClient(TelegramBareClient): break users = UserList(all_participants.values()) - users.total = self(GetFullChannelRequest( - entity)).full_chat.participants_count + users.total = (await self(GetFullChannelRequest( + entity))).full_chat.participants_count elif isinstance(entity, InputPeerChat): - users = self(GetFullChatRequest(entity.chat_id)).users + users = await self(GetFullChatRequest(entity.chat_id)).users if len(users) > limit: users = users[:limit] users = UserList(users)