From a6c6bc73eb23876dffdf193eba2a6cfa32e5a082 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Tue, 13 Feb 2018 15:28:42 +0300 Subject: [PATCH] updates_handler is out from MtProtoSender to gc works properly; unauth_handler log format fix --- telethon/network/mtproto_sender.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index c547994d..759b238b 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -32,13 +32,12 @@ class MtProtoSender: in parallel, so thread-safety (hence locking) isn't needed. """ - def __init__(self, session, connection, updates_handler, loop=None): + def __init__(self, session, connection, loop=None): """Creates a new MtProtoSender configured to send messages through 'connection' and using the parameters from 'session'. """ self.session = session self.connection = connection - self.updates_handler = updates_handler self._loop = loop if loop else asyncio.get_event_loop() self._logger = logging.getLogger(__name__) self._read_lock = asyncio.Lock(loop=self._loop) @@ -100,7 +99,7 @@ class MtProtoSender: """Sends a message acknowledge for the given msg_id""" await self._send_message(TLMessage(self.session, MsgsAck([msg_id]))) - async def receive(self): + async def receive(self, updates_handler): """Receives a single message from the connected endpoint. This method returns nothing, and will only affect other parts @@ -128,7 +127,7 @@ class MtProtoSender: message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: - await self._process_msg(remote_msg_id, remote_seq, reader) + await self._process_msg(remote_msg_id, remote_seq, reader, updates_handler) # endregion @@ -184,7 +183,7 @@ class MtProtoSender: return message, remote_msg_id, remote_sequence - async def _process_msg(self, msg_id, sequence, reader): + async def _process_msg(self, msg_id, sequence, reader, updates_handler): """Processes and handles a Telegram message. Returns True if the message was handled correctly and doesn't @@ -205,10 +204,10 @@ class MtProtoSender: return await self._handle_pong(msg_id, sequence, reader) if code == MessageContainer.CONSTRUCTOR_ID: - return await self._handle_container(msg_id, sequence, reader) + return await self._handle_container(msg_id, sequence, reader, updates_handler) if code == GzipPacked.CONSTRUCTOR_ID: - return await self._handle_gzip_packed(msg_id, sequence, reader) + return await self._handle_gzip_packed(msg_id, sequence, reader, updates_handler) if code == BadServerSalt.CONSTRUCTOR_ID: return await self._handle_bad_server_salt(msg_id, sequence, reader) @@ -241,7 +240,7 @@ class MtProtoSender: if r: r.result = True # Telegram won't send this value r.confirm_received.set() - self._logger.debug('Message ack confirmed', r) + self._logger.debug('Message ack confirmed: %r', r) return True @@ -249,7 +248,7 @@ class MtProtoSender: if code in tlobjects: await self._send_acknowledge(msg_id) result = reader.tgread_object() - self.updates_handler(result) + updates_handler(result) return True self._logger.debug( @@ -324,7 +323,7 @@ class MtProtoSender: return True - async def _handle_container(self, msg_id, sequence, reader): + async def _handle_container(self, msg_id, sequence, reader, updates_handler): self._logger.debug('Handling container') for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader): begin_position = reader.tell_position() @@ -332,7 +331,7 @@ class MtProtoSender: # Note that this code is IMPORTANT for skipping RPC results of # lost requests (i.e., ones from the previous connection session) try: - if not await self._process_msg(inner_msg_id, sequence, reader): + if not await self._process_msg(inner_msg_id, sequence, reader, updates_handler): reader.set_position(begin_position + inner_len) except: # If any error is raised, something went wrong; skip the packet @@ -453,9 +452,9 @@ class MtProtoSender: self._logger.debug('Lost request will be skipped.') return False - async def _handle_gzip_packed(self, msg_id, sequence, reader): + async def _handle_gzip_packed(self, msg_id, sequence, reader, updates_handler): self._logger.debug('Handling gzip packed data') with BinaryReader(GzipPacked.read(reader)) as compressed_reader: - return await self._process_msg(msg_id, sequence, compressed_reader) + return await self._process_msg(msg_id, sequence, compressed_reader, updates_handler) # endregion