From 311115382257a5e12edde5fc27d6c30f46976a63 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Wed, 15 Nov 2017 13:30:44 +0300 Subject: [PATCH 01/13] No route to host catched + other errno to reconnect --- telethon/extensions/tcp_client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 5ad68c20..662327e7 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -8,7 +8,10 @@ from io import BytesIO, BufferedWriter MAX_TIMEOUT = 15 # in seconds CONN_RESET_ERRNOS = { errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, - errno.EINVAL, errno.ENOTCONN + errno.EINVAL, errno.ENOTCONN, errno.EHOSTUNREACH, + errno.ECONNREFUSED, errno.ECONNRESET, errno.ECONNABORTED, + errno.ENETDOWN, errno.ENETRESET, errno.ECONNABORTED, + errno.EHOSTDOWN, } From 2efcfbd416e2a62892451dee67bf28851eaa49d3 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Thu, 16 Nov 2017 02:56:57 +0300 Subject: [PATCH 02/13] More aggressive catching network errors --- telethon/extensions/tcp_client.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 662327e7..817c40c4 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -11,8 +11,10 @@ CONN_RESET_ERRNOS = { errno.EINVAL, errno.ENOTCONN, errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ECONNABORTED, errno.ENETDOWN, errno.ENETRESET, errno.ECONNABORTED, - errno.EHOSTDOWN, + errno.EHOSTDOWN, errno.EPIPE, errno.ESHUTDOWN } +# catched: EHOSTUNREACH, ECONNREFUSED, ECONNRESET, ENETUNREACH +# ConnectionError: EPIPE, ESHUTDOWN, ECONNABORTED, ECONNREFUSED, ECONNRESET class TcpClient: @@ -58,16 +60,9 @@ class TcpClient: await self._loop.sock_connect(self._socket, address) break # Successful connection, stop retrying to connect - except ConnectionError: - self._socket = None - await asyncio.sleep(timeout) - timeout = min(timeout * 2, MAX_TIMEOUT) except OSError as e: - # There are some errors that we know how to handle, and - # the loop will allow us to retry - if e.errno in (errno.EBADF, errno.ENOTSOCK, errno.EINVAL): - # Bad file descriptor, i.e. socket was closed, set it - # to none to recreate it on the next iteration + # ConnectionError + (errno.EBADF, errno.ENOTSOCK, errno.EINVAL) + if e.errno in CONN_RESET_ERRNOS: self._socket = None await asyncio.sleep(timeout) timeout = min(timeout * 2, MAX_TIMEOUT) @@ -103,8 +98,6 @@ class TcpClient: ) except asyncio.TimeoutError as e: raise TimeoutError() from e - except BrokenPipeError: - self._raise_connection_reset() except OSError as e: if e.errno in CONN_RESET_ERRNOS: self._raise_connection_reset() From 653dd212593fc9844edb958ff93bf2df17b066f3 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Thu, 16 Nov 2017 17:31:39 +0300 Subject: [PATCH 03/13] Socket OSError logging --- telethon/extensions/tcp_client.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 817c40c4..41cec3e5 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -2,6 +2,7 @@ import asyncio import errno import socket +import logging from datetime import timedelta from io import BytesIO, BufferedWriter @@ -22,6 +23,7 @@ class TcpClient: self.proxy = proxy self._socket = None self._loop = loop if loop else asyncio.get_event_loop() + self._logger = logging.getLogger(__name__) if isinstance(timeout, timedelta): self.timeout = timeout.seconds @@ -58,9 +60,16 @@ class TcpClient: if not self._socket: self._recreate_socket(mode) - await self._loop.sock_connect(self._socket, address) + await asyncio.wait_for( + self._loop.sock_connect(self._socket, address), + timeout=self.timeout, + loop=self._loop + ) break # Successful connection, stop retrying to connect + except asyncio.TimeoutError as e: + raise TimeoutError() from e except OSError as e: + self._logger.debug('Connect exception: %r' % e) # ConnectionError + (errno.EBADF, errno.ENOTSOCK, errno.EINVAL) if e.errno in CONN_RESET_ERRNOS: self._socket = None @@ -99,6 +108,7 @@ class TcpClient: except asyncio.TimeoutError as e: raise TimeoutError() from e except OSError as e: + self._logger.debug('Write exception: %r' % e) if e.errno in CONN_RESET_ERRNOS: self._raise_connection_reset() else: @@ -123,6 +133,7 @@ class TcpClient: except asyncio.TimeoutError as e: raise TimeoutError() from e except OSError as e: + self._logger.debug('Read exception: %r' % e) if e.errno in CONN_RESET_ERRNOS: self._raise_connection_reset() else: From 32bca4f1b8fe6aa270b1e908394c69f70bc15194 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Sun, 19 Nov 2017 01:55:40 +0300 Subject: [PATCH 04/13] Refactoring of TcpClient --- telethon/extensions/tcp_client.py | 104 +++++++++++++++--------------- 1 file changed, 51 insertions(+), 53 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 41cec3e5..23268e26 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -6,7 +6,6 @@ import logging from datetime import timedelta from io import BytesIO, BufferedWriter -MAX_TIMEOUT = 15 # in seconds CONN_RESET_ERRNOS = { errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH, errno.EINVAL, errno.ENOTCONN, errno.EHOSTUNREACH, @@ -24,6 +23,8 @@ class TcpClient: self._socket = None self._loop = loop if loop else asyncio.get_event_loop() self._logger = logging.getLogger(__name__) + self._closed = asyncio.Event(loop=self._loop) + self._closed.set() if isinstance(timeout, timedelta): self.timeout = timeout.seconds @@ -54,32 +55,27 @@ class TcpClient: else: mode, address = socket.AF_INET, (ip, port) - timeout = 1 - while True: - try: - if not self._socket: - self._recreate_socket(mode) + try: + if not self._socket: + self._recreate_socket(mode) - await asyncio.wait_for( - self._loop.sock_connect(self._socket, address), - timeout=self.timeout, - loop=self._loop - ) - break # Successful connection, stop retrying to connect - except asyncio.TimeoutError as e: - raise TimeoutError() from e - except OSError as e: - self._logger.debug('Connect exception: %r' % e) - # ConnectionError + (errno.EBADF, errno.ENOTSOCK, errno.EINVAL) - if e.errno in CONN_RESET_ERRNOS: - self._socket = None - await asyncio.sleep(timeout) - timeout = min(timeout * 2, MAX_TIMEOUT) - else: - raise + await asyncio.wait_for( + self._loop.sock_connect(self._socket, address), + timeout=self.timeout, + loop=self._loop + ) + + self._closed.clear() + except asyncio.TimeoutError as e: + raise TimeoutError() from e + except OSError as e: + if e.errno in CONN_RESET_ERRNOS: + self._raise_connection_reset(e) + else: + raise def _get_connected(self): - return self._socket is not None and self._socket.fileno() >= 0 + return not self._closed.is_set() connected = property(fget=_get_connected) @@ -87,60 +83,59 @@ class TcpClient: """Closes the connection""" try: if self._socket is not None: - self._socket.shutdown(socket.SHUT_RDWR) + if self.connected: + self._socket.shutdown(socket.SHUT_RDWR) self._socket.close() except OSError: pass # Ignore ENOTCONN, EBADF, and any other error when closing finally: self._socket = None + self._closed.set() + + async def _wait_close(self, coro): + done, _ = await asyncio.wait( + [coro, self._closed.wait()], + timeout=self.timeout, + return_when=asyncio.FIRST_COMPLETED, + loop=self._loop + ) + if not self.connected: + raise ConnectionResetError('Socket has closed') + if not done: + raise TimeoutError() + return await done.pop() async def write(self, data): """Writes (sends) the specified bytes to the connected peer""" - if self._socket is None: - self._raise_connection_reset() - + if not self.connected: + raise ConnectionResetError('No connection') try: - await asyncio.wait_for( - self.sock_sendall(data), - timeout=self.timeout, - loop=self._loop - ) - except asyncio.TimeoutError as e: - raise TimeoutError() from e + await self._wait_close(self.sock_sendall(data)) except OSError as e: - self._logger.debug('Write exception: %r' % e) if e.errno in CONN_RESET_ERRNOS: - self._raise_connection_reset() + self._raise_connection_reset(e) else: raise async def read(self, size): - """Reads (receives) a whole block of 'size bytes + """Reads (receives) a whole block of size bytes from the connected peer. """ - with BufferedWriter(BytesIO(), buffer_size=size) as buffer: bytes_left = size while bytes_left != 0: + if not self.connected: + raise ConnectionResetError('No connection') try: - if self._socket is None: - self._raise_connection_reset() - partial = await asyncio.wait_for( - self.sock_recv(bytes_left), - timeout=self.timeout, - loop=self._loop - ) - except asyncio.TimeoutError as e: - raise TimeoutError() from e + partial = await self._wait_close(self.sock_recv(bytes_left)) except OSError as e: - self._logger.debug('Read exception: %r' % e) if e.errno in CONN_RESET_ERRNOS: - self._raise_connection_reset() + self._raise_connection_reset(e) else: raise if len(partial) == 0: - self._raise_connection_reset() + self._raise_connection_reset('No data on read') buffer.write(partial) bytes_left -= len(partial) @@ -149,9 +144,12 @@ class TcpClient: buffer.flush() return buffer.raw.getvalue() - def _raise_connection_reset(self): + def _raise_connection_reset(self, error): + description = error if isinstance(error, str) else str(error) + if isinstance(error, str): + error = Exception(error) self.close() # Connection reset -> flag as socket closed - raise ConnectionResetError('The server has closed the connection.') + raise ConnectionResetError(description) from error # due to new https://github.com/python/cpython/pull/4386 def sock_recv(self, n): From 004c92edbe0c290945def496b43fc4ea821e5331 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Sun, 19 Nov 2017 13:04:40 +0300 Subject: [PATCH 05/13] SocketClosed exception --- telethon/extensions/tcp_client.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 23268e26..1a7be86f 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -18,6 +18,9 @@ CONN_RESET_ERRNOS = { class TcpClient: + class SocketClosed(ConnectionError): + pass + def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None): self.proxy = proxy self._socket = None @@ -100,7 +103,7 @@ class TcpClient: loop=self._loop ) if not self.connected: - raise ConnectionResetError('Socket has closed') + raise self.SocketClosed() if not done: raise TimeoutError() return await done.pop() @@ -111,6 +114,8 @@ class TcpClient: raise ConnectionResetError('No connection') try: await self._wait_close(self.sock_sendall(data)) + except self.SocketClosed: + raise ConnectionResetError('Socket has closed') except OSError as e: if e.errno in CONN_RESET_ERRNOS: self._raise_connection_reset(e) @@ -128,6 +133,8 @@ class TcpClient: raise ConnectionResetError('No connection') try: partial = await self._wait_close(self.sock_recv(bytes_left)) + except self.SocketClosed: + raise ConnectionResetError('Socket has closed') except OSError as e: if e.errno in CONN_RESET_ERRNOS: self._raise_connection_reset(e) From 984f483b983ecb81ac68b32be7bf03e9bc5d7444 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Sun, 3 Dec 2017 02:11:50 +0300 Subject: [PATCH 06/13] Handle updates and other refactoring --- telethon/network/mtproto_sender.py | 64 +++++++++++++----------------- telethon/telegram_bare_client.py | 9 ++++- telethon/tl/session.py | 6 ++- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 332180da..0253f925 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -15,8 +15,8 @@ from ..tl import TLMessage, MessageContainer, GzipPacked from ..tl.all_tlobjects import tlobjects from ..tl.types import ( MsgsAck, Pong, BadServerSalt, BadMsgNotification, - MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo -) + MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo, + RpcError) from ..tl.functions.auth import LogOutRequest logging.getLogger(__name__).addHandler(logging.NullHandler()) @@ -32,14 +32,17 @@ class MtProtoSender: in parallel, so thread-safety (hence locking) isn't needed. """ - def __init__(self, session, connection, loop=None): + def __init__(self, session, connection, updates_handler, loop=None): """Creates a new MtProtoSender configured to send messages through 'connection' and using the parameters from 'session'. """ self.session = session self.connection = connection + self.updates_handler = updates_handler self._loop = loop if loop else asyncio.get_event_loop() self._logger = logging.getLogger(__name__) + self._read_lock = asyncio.Lock(loop=self._loop) + self._write_lock = asyncio.Lock(loop=self._loop) # Requests (as msg_id: Message) sent waiting to be received self._pending_receive = {} @@ -56,10 +59,6 @@ class MtProtoSender: self.connection.close() self._clear_all_pending() - def clone(self): - """Creates a copy of this MtProtoSender as a new connection""" - return MtProtoSender(self.session, self.connection.clone(), self._loop) - # region Send and receive async def send(self, *requests): @@ -93,7 +92,7 @@ class MtProtoSender: """Sends a message acknowledge for the given msg_id""" await self._send_message(TLMessage(self.session, MsgsAck([msg_id]))) - async def receive(self, update_state): + async def receive(self): """Receives a single message from the connected endpoint. This method returns nothing, and will only affect other parts @@ -103,6 +102,7 @@ class MtProtoSender: Any unhandled object (likely updates) will be passed to update_state.process(TLObject). """ + await self._read_lock.acquire() try: body = await self.connection.recv() except (BufferError, InvalidChecksumError): @@ -115,10 +115,12 @@ class MtProtoSender: # and just re-invoke them to avoid problems self._clear_all_pending() return + finally: + self._read_lock.release() message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: - await self._process_msg(remote_msg_id, remote_seq, reader, update_state) + await self._process_msg(remote_msg_id, remote_seq, reader) await self._send_acknowledge(remote_msg_id) # endregion @@ -129,7 +131,7 @@ class MtProtoSender: """Sends the given Message(TLObject) encrypted through the network""" plain_text = \ - struct.pack(' Date: Sat, 9 Dec 2017 21:24:13 +0300 Subject: [PATCH 07/13] Another attempt to prevent duplicates --- telethon/network/mtproto_sender.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 0253f925..58f7103d 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -15,8 +15,8 @@ from ..tl import TLMessage, MessageContainer, GzipPacked from ..tl.all_tlobjects import tlobjects from ..tl.types import ( MsgsAck, Pong, BadServerSalt, BadMsgNotification, - MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo, - RpcError) + MsgNewDetailedInfo, MsgDetailedInfo, RpcError, + MsgsStateReq, MsgsStateInfo, MsgsAllInfo, MsgResendReq) from ..tl.functions.auth import LogOutRequest logging.getLogger(__name__).addHandler(logging.NullHandler()) @@ -121,7 +121,6 @@ class MtProtoSender: message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: await self._process_msg(remote_msg_id, remote_seq, reader) - await self._send_acknowledge(remote_msg_id) # endregion @@ -191,6 +190,7 @@ class MtProtoSender: # The following codes are "parsed manually" if code == 0xf35c6d01: # rpc_result, (response of an RPC call) + await self._send_acknowledge(msg_id) return await self._handle_rpc_result(msg_id, sequence, reader) if code == Pong.CONSTRUCTOR_ID: @@ -205,6 +205,14 @@ class MtProtoSender: if code == BadServerSalt.CONSTRUCTOR_ID: return await self._handle_bad_server_salt(msg_id, sequence, reader) + if code in (MsgsStateReq.CONSTRUCTOR_ID, MsgResendReq.CONSTRUCTOR_ID): + # just answer we don't know anything + return await self._handle_msgs_state_forgotten(msg_id, sequence, reader) + + if code == MsgsAllInfo.CONSTRUCTOR_ID: + # not interesting now + return True + if code == BadMsgNotification.CONSTRUCTOR_ID: return await self._handle_bad_msg_notification(msg_id, sequence, reader) @@ -231,6 +239,7 @@ class MtProtoSender: # If the code is not parsed manually then it should be a TLObject. if code in tlobjects: + await self._send_acknowledge(msg_id) result = reader.tgread_object() self.updates_handler(result) return True @@ -339,6 +348,11 @@ class MtProtoSender: return True + async def _handle_msgs_state_forgotten(self, msg_id, sequence, reader): + req = reader.tgread_object() + await self._send_message(TLMessage(self.session, MsgsStateInfo(msg_id, chr(1) * len(req.msg_ids)))) + return True + async def _handle_bad_msg_notification(self, msg_id, sequence, reader): self._logger.debug('Handling bad message notification') bad_msg = reader.tgread_object() From 2f1d5e277eb80ef3d022d749bb76c771f0f300e3 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Tue, 12 Dec 2017 21:22:42 +0300 Subject: [PATCH 08/13] More accurate clear pendings --- telethon/network/mtproto_sender.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 58f7103d..c547994d 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -54,10 +54,11 @@ class MtProtoSender: def is_connected(self): return self.connection.is_connected() - def disconnect(self): + def disconnect(self, clear_pendings=True): """Disconnects from the server""" self.connection.close() - self._clear_all_pending() + if clear_pendings: + self._clear_all_pending() # region Send and receive @@ -75,6 +76,7 @@ class MtProtoSender: # Finally send our packed request(s) messages = [TLMessage(self.session, r) for r in requests] self._pending_receive.update({m.msg_id: m for m in messages}) + msg_ids = [m.msg_id for m in messages] if len(messages) == 1: message = messages[0] @@ -87,6 +89,12 @@ class MtProtoSender: m.container_msg_id = message.msg_id await self._send_message(message) + return msg_ids + + def forget_pendings(self, msg_ids): + for msg_id in msg_ids: + if msg_id in self._pending_receive: + del self._pending_receive[msg_id] async def _send_acknowledge(self, msg_id): """Sends a message acknowledge for the given msg_id""" From 91e5ef852a61bc3fdf2b21ab77b8e88416c1efe7 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Wed, 17 Jan 2018 15:41:13 +0300 Subject: [PATCH 09/13] Pretty format of TLObject's --- telethon/tl/tlobject.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/telethon/tl/tlobject.py b/telethon/tl/tlobject.py index a15519ae..68c18e9b 100644 --- a/telethon/tl/tlobject.py +++ b/telethon/tl/tlobject.py @@ -132,3 +132,6 @@ class TLObject: @staticmethod def from_reader(reader): return TLObject() + + def __repr__(self): + return self.__str__() From 6cfb829e5880d10bd741578650e42f1616b1a51d Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Sun, 21 Jan 2018 18:57:53 +0300 Subject: [PATCH 10/13] Memory leaks fix --- telethon/extensions/tcp_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 1a7be86f..5df97169 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -96,17 +96,19 @@ class TcpClient: self._closed.set() async def _wait_close(self, coro): - done, _ = await asyncio.wait( + done, running = await asyncio.wait( [coro, self._closed.wait()], timeout=self.timeout, return_when=asyncio.FIRST_COMPLETED, loop=self._loop ) + for r in running: + r.cancel() if not self.connected: raise self.SocketClosed() if not done: raise TimeoutError() - return await done.pop() + return done.pop().result() async def write(self, data): """Writes (sends) the specified bytes to the connected peer""" From a6c6bc73eb23876dffdf193eba2a6cfa32e5a082 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Tue, 13 Feb 2018 15:28:42 +0300 Subject: [PATCH 11/13] updates_handler is out from MtProtoSender to gc works properly; unauth_handler log format fix --- telethon/network/mtproto_sender.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index c547994d..759b238b 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -32,13 +32,12 @@ class MtProtoSender: in parallel, so thread-safety (hence locking) isn't needed. """ - def __init__(self, session, connection, updates_handler, loop=None): + def __init__(self, session, connection, loop=None): """Creates a new MtProtoSender configured to send messages through 'connection' and using the parameters from 'session'. """ self.session = session self.connection = connection - self.updates_handler = updates_handler self._loop = loop if loop else asyncio.get_event_loop() self._logger = logging.getLogger(__name__) self._read_lock = asyncio.Lock(loop=self._loop) @@ -100,7 +99,7 @@ class MtProtoSender: """Sends a message acknowledge for the given msg_id""" await self._send_message(TLMessage(self.session, MsgsAck([msg_id]))) - async def receive(self): + async def receive(self, updates_handler): """Receives a single message from the connected endpoint. This method returns nothing, and will only affect other parts @@ -128,7 +127,7 @@ class MtProtoSender: message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: - await self._process_msg(remote_msg_id, remote_seq, reader) + await self._process_msg(remote_msg_id, remote_seq, reader, updates_handler) # endregion @@ -184,7 +183,7 @@ class MtProtoSender: return message, remote_msg_id, remote_sequence - async def _process_msg(self, msg_id, sequence, reader): + async def _process_msg(self, msg_id, sequence, reader, updates_handler): """Processes and handles a Telegram message. Returns True if the message was handled correctly and doesn't @@ -205,10 +204,10 @@ class MtProtoSender: return await self._handle_pong(msg_id, sequence, reader) if code == MessageContainer.CONSTRUCTOR_ID: - return await self._handle_container(msg_id, sequence, reader) + return await self._handle_container(msg_id, sequence, reader, updates_handler) if code == GzipPacked.CONSTRUCTOR_ID: - return await self._handle_gzip_packed(msg_id, sequence, reader) + return await self._handle_gzip_packed(msg_id, sequence, reader, updates_handler) if code == BadServerSalt.CONSTRUCTOR_ID: return await self._handle_bad_server_salt(msg_id, sequence, reader) @@ -241,7 +240,7 @@ class MtProtoSender: if r: r.result = True # Telegram won't send this value r.confirm_received.set() - self._logger.debug('Message ack confirmed', r) + self._logger.debug('Message ack confirmed: %r', r) return True @@ -249,7 +248,7 @@ class MtProtoSender: if code in tlobjects: await self._send_acknowledge(msg_id) result = reader.tgread_object() - self.updates_handler(result) + updates_handler(result) return True self._logger.debug( @@ -324,7 +323,7 @@ class MtProtoSender: return True - async def _handle_container(self, msg_id, sequence, reader): + async def _handle_container(self, msg_id, sequence, reader, updates_handler): self._logger.debug('Handling container') for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader): begin_position = reader.tell_position() @@ -332,7 +331,7 @@ class MtProtoSender: # Note that this code is IMPORTANT for skipping RPC results of # lost requests (i.e., ones from the previous connection session) try: - if not await self._process_msg(inner_msg_id, sequence, reader): + if not await self._process_msg(inner_msg_id, sequence, reader, updates_handler): reader.set_position(begin_position + inner_len) except: # If any error is raised, something went wrong; skip the packet @@ -453,9 +452,9 @@ class MtProtoSender: self._logger.debug('Lost request will be skipped.') return False - async def _handle_gzip_packed(self, msg_id, sequence, reader): + async def _handle_gzip_packed(self, msg_id, sequence, reader, updates_handler): self._logger.debug('Handling gzip packed data') with BinaryReader(GzipPacked.read(reader)) as compressed_reader: - return await self._process_msg(msg_id, sequence, compressed_reader) + return await self._process_msg(msg_id, sequence, compressed_reader, updates_handler) # endregion From 236fccea7f81b30fb7aea88f0be77609462cdaf8 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Sun, 18 Mar 2018 20:41:00 +0300 Subject: [PATCH 12/13] Very rare exception in the case of reconnect --- telethon/extensions/tcp_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index 5df97169..80a84958 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -169,7 +169,7 @@ class TcpClient: def _sock_recv(self, fut, registered_fd, n): if registered_fd is not None: self._loop.remove_reader(registered_fd) - if fut.cancelled(): + if fut.cancelled() or self._socket is None: return try: @@ -193,7 +193,7 @@ class TcpClient: def _sock_sendall(self, fut, registered_fd, data): if registered_fd: self._loop.remove_writer(registered_fd) - if fut.cancelled(): + if fut.cancelled() or self._socket is None: return try: From f5a7a8da45cd990898b96ff87311fcf4e3b9a7e3 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Fri, 8 Jun 2018 01:33:11 +0300 Subject: [PATCH 13/13] Not need to save (salt is out of DB) --- telethon/network/mtproto_sender.py | 1 - 1 file changed, 1 deletion(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 759b238b..d3aa2255 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -347,7 +347,6 @@ class MtProtoSender: # Our salt is unsigned, but the objects work with signed salts self.session.salt = bad_salt.new_server_salt - self.session.save() # "the bad_server_salt response is received with the # correct salt, and the message is to be re-sent with it"