From 780e0ceddf29201a5001df9af428699907a25270 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Sun, 22 Oct 2017 15:06:36 +0300 Subject: [PATCH 1/6] Update handlers works; it also seems stable --- telethon/extensions/tcp_client.py | 34 +++-- telethon/network/connection.py | 6 +- telethon/network/mtproto_sender.py | 66 ++++++---- telethon/telegram_bare_client.py | 187 ++++++++++++++++++-------- telethon/telegram_client.py | 36 +++-- telethon/tl/message_container.py | 12 ++ telethon/tl/tl_message.py | 17 +++ telethon/tl/tlobject.py | 4 +- telethon/update_state.py | 203 +++++++---------------------- 9 files changed, 300 insertions(+), 265 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index e847873f..9d0a2dee 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -5,13 +5,12 @@ import socket from datetime import timedelta from io import BytesIO, BufferedWriter -loop = asyncio.get_event_loop() - class TcpClient: - def __init__(self, proxy=None, timeout=timedelta(seconds=5)): + def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None): self.proxy = proxy self._socket = None + self._loop = loop if loop else asyncio.get_event_loop() if isinstance(timeout, timedelta): self.timeout = timeout.seconds @@ -31,7 +30,7 @@ class TcpClient: else: # tuple, list, etc. self._socket.set_proxy(*self.proxy) - self._socket.settimeout(self.timeout) + self._socket.setblocking(False) async def connect(self, ip, port): """Connects to the specified IP and port number. @@ -42,20 +41,27 @@ class TcpClient: else: mode, address = socket.AF_INET, (ip, port) + timeout = 1 while True: try: - while not self._socket: + if not self._socket: self._recreate_socket(mode) - await loop.sock_connect(self._socket, address) + await self._loop.sock_connect(self._socket, address) break # Successful connection, stop retrying to connect + except ConnectionError: + self._socket = None + await asyncio.sleep(min(timeout, 15)) + timeout *= 2 except OSError as e: # There are some errors that we know how to handle, and # the loop will allow us to retry - if e.errno == errno.EBADF: + if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.EINVAL]: # Bad file descriptor, i.e. socket was closed, set it # to none to recreate it on the next iteration self._socket = None + await asyncio.sleep(min(timeout, 15)) + timeout *= 2 else: raise @@ -81,13 +87,14 @@ class TcpClient: raise ConnectionResetError() try: - await loop.sock_sendall(self._socket, data) - except socket.timeout as e: + await asyncio.wait_for(self._loop.sock_sendall(self._socket, data), + timeout=self.timeout, loop=self._loop) + except asyncio.TimeoutError as e: raise TimeoutError() from e except BrokenPipeError: self._raise_connection_reset() except OSError as e: - if e.errno == errno.EBADF: + if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, errno.EINVAL, errno.ENOTCONN]: self._raise_connection_reset() else: raise @@ -104,11 +111,12 @@ class TcpClient: bytes_left = size while bytes_left != 0: try: - partial = await loop.sock_recv(self._socket, bytes_left) - except socket.timeout as e: + partial = await asyncio.wait_for(self._loop.sock_recv(self._socket, bytes_left), + timeout=self.timeout, loop=self._loop) + except asyncio.TimeoutError as e: raise TimeoutError() from e except OSError as e: - if e.errno == errno.EBADF or e.errno == errno.ENOTSOCK: + if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, errno.EINVAL, errno.ENOTCONN]: self._raise_connection_reset() else: raise diff --git a/telethon/network/connection.py b/telethon/network/connection.py index 270b9451..9ffdd453 100644 --- a/telethon/network/connection.py +++ b/telethon/network/connection.py @@ -43,13 +43,13 @@ class Connection: """ def __init__(self, mode=ConnectionMode.TCP_FULL, - proxy=None, timeout=timedelta(seconds=5)): + proxy=None, timeout=timedelta(seconds=5), loop=None): self._mode = mode self._send_counter = 0 self._aes_encrypt, self._aes_decrypt = None, None # TODO Rename "TcpClient" as some sort of generic socket? - self.conn = TcpClient(proxy=proxy, timeout=timeout) + self.conn = TcpClient(proxy=proxy, timeout=timeout, loop=loop) # Sending messages if mode == ConnectionMode.TCP_FULL: @@ -206,7 +206,7 @@ class Connection: return await self.conn.read(length) async def _read_obfuscated(self, length): - return await self._aes_decrypt.encrypt(self.conn.read(length)) + return self._aes_decrypt.encrypt(await self.conn.read(length)) # endregion diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 1684230d..6eae4cc3 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -1,6 +1,8 @@ import gzip import logging import struct +import asyncio +from asyncio import Event from .. import helpers as utils from ..crypto import AES @@ -30,17 +32,15 @@ class MtProtoSender: in parallel, so thread-safety (hence locking) isn't needed. """ - def __init__(self, session, connection): + def __init__(self, session, connection, loop=None): """Creates a new MtProtoSender configured to send messages through 'connection' and using the parameters from 'session'. """ self.session = session self.connection = connection + self._loop = loop if loop else asyncio.get_event_loop() self._logger = logging.getLogger(__name__) - # Message IDs that need confirmation - self._need_confirmation = [] - # Requests (as msg_id: Message) sent waiting to be received self._pending_receive = {} @@ -54,12 +54,11 @@ class MtProtoSender: def disconnect(self): """Disconnects from the server""" self.connection.close() - self._need_confirmation.clear() self._clear_all_pending() def clone(self): """Creates a copy of this MtProtoSender as a new connection""" - return MtProtoSender(self.session, self.connection.clone()) + return MtProtoSender(self.session, self.connection.clone(), self._loop) # region Send and receive @@ -67,21 +66,23 @@ class MtProtoSender: """Sends the specified MTProtoRequest, previously sending any message which needed confirmation.""" + # Prepare the event of every request + for r in requests: + if r.confirm_received is None: + r.confirm_received = Event(loop=self._loop) + else: + r.confirm_received.clear() + # Finally send our packed request(s) messages = [TLMessage(self.session, r) for r in requests] self._pending_receive.update({m.msg_id: m for m in messages}) - # Pack everything in the same container if we need to send AckRequests - if self._need_confirmation: - messages.append( - TLMessage(self.session, MsgsAck(self._need_confirmation)) - ) - self._need_confirmation.clear() - if len(messages) == 1: message = messages[0] else: message = TLMessage(self.session, MessageContainer(messages)) + for m in messages: + m.container_msg_id = message.msg_id await self._send_message(message) @@ -115,6 +116,7 @@ class MtProtoSender: message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: await self._process_msg(remote_msg_id, remote_seq, reader, update_state) + await self._send_acknowledge(remote_msg_id) # endregion @@ -174,7 +176,6 @@ class MtProtoSender: """ # TODO Check salt, session_id and sequence_number - self._need_confirmation.append(msg_id) code = reader.read_int(signed=False) reader.seek(-4) @@ -210,14 +211,14 @@ class MtProtoSender: if code == MsgsAck.CONSTRUCTOR_ID: # may handle the request we wanted ack = reader.tgread_object() assert isinstance(ack, MsgsAck) - # Ignore every ack request *unless* when logging out, when it's + # Ignore every ack request *unless* when logging out, # when it seems to only make sense. We also need to set a non-None # result since Telegram doesn't send the response for these. for msg_id in ack.msg_ids: r = self._pop_request_of_type(msg_id, LogOutRequest) if r: r.result = True # Telegram won't send this value - r.confirm_received() + r.confirm_received.set() self._logger.debug('Message ack confirmed', r) return True @@ -259,11 +260,29 @@ class MtProtoSender: if message and isinstance(message.request, t): return self._pending_receive.pop(msg_id).request + def _pop_requests_of_container(self, container_msg_id): + msgs = [msg for msg in self._pending_receive.values() if msg.container_msg_id == container_msg_id] + requests = [msg.request for msg in msgs] + for msg in msgs: + self._pending_receive.pop(msg.msg_id, None) + return requests + def _clear_all_pending(self): for r in self._pending_receive.values(): - r.confirm_received.set() + r.request.confirm_received.set() self._pending_receive.clear() + async def _resend_request(self, msg_id): + request = self._pop_request(msg_id) + if request: + self._logger.debug('requests is about to resend') + await self.send(request) + return + requests = self._pop_requests_of_container(msg_id) + if requests: + self._logger.debug('container of requests is about to resend') + await self.send(*requests) + async def _handle_pong(self, msg_id, sequence, reader): self._logger.debug('Handling pong') pong = reader.tgread_object() @@ -303,10 +322,9 @@ class MtProtoSender: self.session.salt = struct.unpack( ' 0: 'workers' background threads will be spawned, any - any of them will invoke all the self.handlers. - """ - self._workers = workers - self._worker_threads = [] - + def __init__(self, loop=None): self.handlers = [] - self._updates_lock = RLock() - self._updates_available = Event() - self._updates = deque() self._latest_updates = deque(maxlen=10) + self._loop = loop if loop else asyncio.get_event_loop() self._logger = logging.getLogger(__name__) # https://core.telegram.org/api/updates self._state = tl.updates.State(0, 0, datetime.now(), 0, 0) - def can_poll(self): - """Returns True if a call to .poll() won't lock""" - return self._updates_available.is_set() - - def poll(self, timeout=None): - """Polls an update or blocks until an update object is available. - If 'timeout is not None', it should be a floating point value, - and the method will 'return None' if waiting times out. - """ - if not self._updates_available.wait(timeout=timeout): - return - - with self._updates_lock: - if not self._updates_available.is_set(): - return - - update = self._updates.popleft() - if not self._updates: - self._updates_available.clear() - - if isinstance(update, Exception): - raise update # Some error was set through (surely StopIteration) - - return update - - def get_workers(self): - return self._workers - - def set_workers(self, n): - """Changes the number of workers running. - If 'n is None', clears all pending updates from memory. - """ - self.stop_workers() - self._workers = n - if n is None: - self._updates.clear() - else: - self.setup_workers() - - workers = property(fget=get_workers, fset=set_workers) - - def stop_workers(self): - """Raises "StopIterationException" on the worker threads to stop them, - and also clears all of them off the list - """ - if self._workers: - with self._updates_lock: - # Insert at the beginning so the very next poll causes an error - # on all the worker threads - # TODO Should this reset the pts and such? - for _ in range(self._workers): - self._updates.appendleft(StopIteration()) - self._updates_available.set() - - for t in self._worker_threads: - t.join() - - self._worker_threads.clear() - - def setup_workers(self): - if self._worker_threads or not self._workers: - # There already are workers, or workers is None or 0. Do nothing. - return - - for i in range(self._workers): - thread = Thread( - target=UpdateState._worker_loop, - name='UpdateWorker{}'.format(i), - daemon=True, - args=(self, i) - ) - self._worker_threads.append(thread) - thread.start() - - def _worker_loop(self, wid): - while True: - try: - update = self.poll(timeout=UpdateState.WORKER_POLL_TIMEOUT) - # TODO Maybe people can add different handlers per update type - if update: - for handler in self.handlers: - handler(update) - except StopIteration: - break - except Exception as e: - # We don't want to crash a worker thread due to any reason - self._logger.debug( - '[ERROR] Unhandled exception on worker {}'.format(wid), e - ) + def handle_update(self, update): + for handler in self.handlers: + asyncio.ensure_future(handler(update), loop=self._loop) def process(self, update): """Processes an update object. This method is normally called by the library itself. """ - if self._workers is None: - return # No processing needs to be done if nobody's working + if isinstance(update, tl.updates.State): + self._state = update + return # Nothing else to be done - with self._updates_lock: - if isinstance(update, tl.updates.State): - self._state = update - return # Nothing else to be done + pts = getattr(update, 'pts', self._state.pts) + if hasattr(update, 'pts') and pts <= self._state.pts: + return # We already handled this update - pts = getattr(update, 'pts', self._state.pts) - if hasattr(update, 'pts') and pts <= self._state.pts: - return # We already handled this update + self._state.pts = pts - self._state.pts = pts + # TODO There must be a better way to handle updates rather than + # keeping a queue with the latest updates only, and handling + # the 'pts' correctly should be enough. However some updates + # like UpdateUserStatus (even inside UpdateShort) will be called + # repeatedly very often if invoking anything inside an update + # handler. TODO Figure out why. + """ + client = TelegramClient('anon', api_id, api_hash, update_workers=1) + client.connect() + def handle(u): + client.get_me() + client.add_update_handler(handle) + input('Enter to exit.') + """ + data = pickle.dumps(update.to_dict()) + if data in self._latest_updates: + return # Duplicated too - # TODO There must be a better way to handle updates rather than - # keeping a queue with the latest updates only, and handling - # the 'pts' correctly should be enough. However some updates - # like UpdateUserStatus (even inside UpdateShort) will be called - # repeatedly very often if invoking anything inside an update - # handler. TODO Figure out why. - """ - client = TelegramClient('anon', api_id, api_hash, update_workers=1) - client.connect() - def handle(u): - client.get_me() - client.add_update_handler(handle) - input('Enter to exit.') - """ - data = pickle.dumps(update.to_dict()) - if data in self._latest_updates: - return # Duplicated too + self._latest_updates.append(data) - self._latest_updates.append(data) + if type(update).SUBCLASS_OF_ID == 0x8af52aac: # crc32(b'Updates') + # Expand "Updates" into "Update", and pass these to callbacks. + # Since .users and .chats have already been processed, we + # don't need to care about those either. + if isinstance(update, tl.UpdateShort): + self.handle_update(update.update) - if type(update).SUBCLASS_OF_ID == 0x8af52aac: # crc32(b'Updates') - # Expand "Updates" into "Update", and pass these to callbacks. - # Since .users and .chats have already been processed, we - # don't need to care about those either. - if isinstance(update, tl.UpdateShort): - self._updates.append(update.update) - self._updates_available.set() + elif isinstance(update, (tl.Updates, tl.UpdatesCombined)): + for upd in update.updates: + self.handle_update(upd) - elif isinstance(update, (tl.Updates, tl.UpdatesCombined)): - self._updates.extend(update.updates) - self._updates_available.set() + elif not isinstance(update, tl.UpdatesTooLong): + # TODO Handle "Updates too long" + self.handle_update(update) - elif not isinstance(update, tl.UpdatesTooLong): - # TODO Handle "Updates too long" - self._updates.append(update) - self._updates_available.set() - - elif type(update).SUBCLASS_OF_ID == 0x9f89304e: # crc32(b'Update') - self._updates.append(update) - self._updates_available.set() - else: - self._logger.debug('Ignoring "update" of type {}'.format( - type(update).__name__) - ) + elif type(update).SUBCLASS_OF_ID == 0x9f89304e: # crc32(b'Update') + self.handle_update(update) + else: + self._logger.debug('Ignoring "update" of type {}'.format( + type(update).__name__) + ) From 1a0d5e75bfd5cad3e8223b6132180b142289dca7 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 22 Oct 2017 19:13:45 +0200 Subject: [PATCH 2/6] Make use of more constants in the TcpClient --- telethon/extensions/tcp_client.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 9d0a2dee..eda4109f 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -5,6 +5,12 @@ import socket from datetime import timedelta from io import BytesIO, BufferedWriter +MAX_TIMEOUT = 15 # in seconds +CONN_RESET_ERRNOS = { + errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, + errno.EINVAL, errno.ENOTCONN +} + class TcpClient: def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None): @@ -51,17 +57,17 @@ class TcpClient: break # Successful connection, stop retrying to connect except ConnectionError: self._socket = None - await asyncio.sleep(min(timeout, 15)) - timeout *= 2 + await asyncio.sleep(timeout) + timeout = min(timeout * 2, MAX_TIMEOUT) except OSError as e: # There are some errors that we know how to handle, and # the loop will allow us to retry - if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.EINVAL]: + if e.errno in (errno.EBADF, errno.ENOTSOCK, errno.EINVAL): # Bad file descriptor, i.e. socket was closed, set it # to none to recreate it on the next iteration self._socket = None - await asyncio.sleep(min(timeout, 15)) - timeout *= 2 + await asyncio.sleep(timeout) + timeout = min(timeout * 2, MAX_TIMEOUT) else: raise @@ -94,7 +100,7 @@ class TcpClient: except BrokenPipeError: self._raise_connection_reset() except OSError as e: - if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, errno.EINVAL, errno.ENOTCONN]: + if e.errno in CONN_RESET_ERRNOS: self._raise_connection_reset() else: raise @@ -116,7 +122,7 @@ class TcpClient: except asyncio.TimeoutError as e: raise TimeoutError() from e except OSError as e: - if e.errno in [errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, errno.EINVAL, errno.ENOTCONN]: + if e.errno in CONN_RESET_ERRNOS: self._raise_connection_reset() else: raise From 30ac6789ce741f72fdb2b0bbed87fcbe81a1bce7 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 22 Oct 2017 19:27:49 +0200 Subject: [PATCH 3/6] Change _set_connected_and_authorized condition --- telethon/telegram_bare_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 318ef48e..133661d7 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -209,12 +209,14 @@ class TelegramBareClient: # another data center and this would raise UserMigrateError) # to also assert whether the user is logged in or not. self._user_connected = True - if _sync_updates and not _cdn and not self._authorized: + if self._authorized is None and _sync_updates and not _cdn: try: await self.sync_updates() self._set_connected_and_authorized() except UnauthorizedError: - pass + self._authorized = False + elif self._authorized: + self._set_connected_and_authorized() return True From ffaa3ac0649e47392c3fa82aecf665220f51434e Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 22 Oct 2017 19:47:24 +0200 Subject: [PATCH 4/6] Remove unused timeout variable from the TelegramClient --- telethon/telegram_bare_client.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 133661d7..315aee00 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -726,7 +726,6 @@ class TelegramBareClient: async def _recv_loop_impl(self): need_reconnect = False - timeout = 1 while self._user_connected: try: if need_reconnect: @@ -741,8 +740,7 @@ class TelegramBareClient: except ConnectionError as error: self._logger.debug(error) need_reconnect = True - await asyncio.sleep(min(timeout, 15), loop=self._loop) - timeout *= 2 + await asyncio.sleep(1, loop=self._loop) except Exception as error: # Unknown exception, pass it to the main thread self._logger.debug( @@ -769,7 +767,6 @@ class TelegramBareClient: # add a little sleep to avoid the CPU usage going mad. await asyncio.sleep(0.1, loop=self._loop) break - timeout = 1 self._recv_loop = None # endregion From 3a7fa249a4d503f144666f5e2f05b1f220406a43 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 22 Oct 2017 20:30:55 +0200 Subject: [PATCH 5/6] Revert None result checks on the TelegramClient --- telethon/telegram_client.py | 35 ++++++++++++----------------------- 1 file changed, 12 insertions(+), 23 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index d58a03d7..b6f2597e 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -106,9 +106,8 @@ class TelegramClient(TelegramBareClient): """Sends a code request to the specified phone number""" phone = EntityDatabase.parse_phone(phone) or self._phone result = await self(SendCodeRequest(phone, self.api_id, self.api_hash)) - if result: - self._phone = phone - self._phone_code_hash = result.phone_code_hash + self._phone = phone + self._phone_code_hash = result.phone_code_hash return result async def sign_in(self, phone=None, code=None, @@ -172,10 +171,8 @@ class TelegramClient(TelegramBareClient): 'and a password only if an RPCError was raised before.' ) - if result: - self._set_connected_and_authorized() - return result.user - return result + self._set_connected_and_authorized() + return result.user async def sign_up(self, code, first_name, last_name=''): """Signs up to Telegram. Make sure you sent a code request first!""" @@ -187,10 +184,8 @@ class TelegramClient(TelegramBareClient): last_name=last_name )) - if result: - self._set_connected_and_authorized() - return result.user - return result + self._set_connected_and_authorized() + return result.user async def log_out(self): """Logs out and deletes the current session. @@ -209,7 +204,7 @@ class TelegramClient(TelegramBareClient): """Gets "me" (the self user) which is currently authenticated, or None if the request fails (hence, not authenticated).""" try: - return await self(GetUsersRequest([InputUserSelf()]))[0] + return (await self(GetUsersRequest([InputUserSelf()])))[0] except UnauthorizedError: return None @@ -246,7 +241,7 @@ class TelegramClient(TelegramBareClient): offset_peer=offset_peer, limit=need if need < float('inf') else 0 )) - if not r or not r.dialogs: + if not r.dialogs: break for d in r.dialogs: @@ -295,12 +290,10 @@ class TelegramClient(TelegramBareClient): :return List[telethon.tl.custom.Draft]: A list of open drafts """ response = await self(GetAllDraftsRequest()) - if response: - self.session.process_entities(response) - self.session.generate_sequence(response.seq) - drafts = [Draft._from_update(self, u) for u in response.updates] - return drafts - return response + self.session.process_entities(response) + self.session.generate_sequence(response.seq) + drafts = [Draft._from_update(self, u) for u in response.updates] + return drafts async def send_message(self, entity, @@ -322,8 +315,6 @@ class TelegramClient(TelegramBareClient): reply_to_msg_id=self._get_reply_to(reply_to) ) result = await self(request) - if not result: - return result if isinstance(result, UpdateShortSentMessage): return Message( @@ -419,8 +410,6 @@ class TelegramClient(TelegramBareClient): min_id=min_id, add_offset=add_offset )) - if not result: - return result # The result may be a messages slice (not all messages were retrieved) # or simply a messages TLObject. In the later case, no "count" From 8bd578711cc6bdcbfb6f111612efb71277d3d865 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 23 Oct 2017 10:05:15 +0200 Subject: [PATCH 6/6] Revert "no more retries" exception --- telethon/telegram_bare_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 253d888e..82823105 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -417,7 +417,7 @@ class TelegramBareClient: if result is not None: return result - return None + raise ValueError('Number of retries reached 0.') # Let people use client.invoke(SomeRequest()) instead client(...) invoke = __call__