updates_handler is out from MtProtoSender to gc works properly; unauth_handler log format fix

This commit is contained in:
Andrey Egorov 2018-02-13 15:28:42 +03:00
parent 6cfb829e58
commit a6c6bc73eb

View File

@ -32,13 +32,12 @@ class MtProtoSender:
in parallel, so thread-safety (hence locking) isn't needed. in parallel, so thread-safety (hence locking) isn't needed.
""" """
def __init__(self, session, connection, updates_handler, loop=None): def __init__(self, session, connection, loop=None):
"""Creates a new MtProtoSender configured to send messages through """Creates a new MtProtoSender configured to send messages through
'connection' and using the parameters from 'session'. 'connection' and using the parameters from 'session'.
""" """
self.session = session self.session = session
self.connection = connection self.connection = connection
self.updates_handler = updates_handler
self._loop = loop if loop else asyncio.get_event_loop() self._loop = loop if loop else asyncio.get_event_loop()
self._logger = logging.getLogger(__name__) self._logger = logging.getLogger(__name__)
self._read_lock = asyncio.Lock(loop=self._loop) self._read_lock = asyncio.Lock(loop=self._loop)
@ -100,7 +99,7 @@ class MtProtoSender:
"""Sends a message acknowledge for the given msg_id""" """Sends a message acknowledge for the given msg_id"""
await self._send_message(TLMessage(self.session, MsgsAck([msg_id]))) await self._send_message(TLMessage(self.session, MsgsAck([msg_id])))
async def receive(self): async def receive(self, updates_handler):
"""Receives a single message from the connected endpoint. """Receives a single message from the connected endpoint.
This method returns nothing, and will only affect other parts This method returns nothing, and will only affect other parts
@ -128,7 +127,7 @@ class MtProtoSender:
message, remote_msg_id, remote_seq = self._decode_msg(body) message, remote_msg_id, remote_seq = self._decode_msg(body)
with BinaryReader(message) as reader: with BinaryReader(message) as reader:
await self._process_msg(remote_msg_id, remote_seq, reader) await self._process_msg(remote_msg_id, remote_seq, reader, updates_handler)
# endregion # endregion
@ -184,7 +183,7 @@ class MtProtoSender:
return message, remote_msg_id, remote_sequence return message, remote_msg_id, remote_sequence
async def _process_msg(self, msg_id, sequence, reader): async def _process_msg(self, msg_id, sequence, reader, updates_handler):
"""Processes and handles a Telegram message. """Processes and handles a Telegram message.
Returns True if the message was handled correctly and doesn't Returns True if the message was handled correctly and doesn't
@ -205,10 +204,10 @@ class MtProtoSender:
return await self._handle_pong(msg_id, sequence, reader) return await self._handle_pong(msg_id, sequence, reader)
if code == MessageContainer.CONSTRUCTOR_ID: if code == MessageContainer.CONSTRUCTOR_ID:
return await self._handle_container(msg_id, sequence, reader) return await self._handle_container(msg_id, sequence, reader, updates_handler)
if code == GzipPacked.CONSTRUCTOR_ID: if code == GzipPacked.CONSTRUCTOR_ID:
return await self._handle_gzip_packed(msg_id, sequence, reader) return await self._handle_gzip_packed(msg_id, sequence, reader, updates_handler)
if code == BadServerSalt.CONSTRUCTOR_ID: if code == BadServerSalt.CONSTRUCTOR_ID:
return await self._handle_bad_server_salt(msg_id, sequence, reader) return await self._handle_bad_server_salt(msg_id, sequence, reader)
@ -241,7 +240,7 @@ class MtProtoSender:
if r: if r:
r.result = True # Telegram won't send this value r.result = True # Telegram won't send this value
r.confirm_received.set() r.confirm_received.set()
self._logger.debug('Message ack confirmed', r) self._logger.debug('Message ack confirmed: %r', r)
return True return True
@ -249,7 +248,7 @@ class MtProtoSender:
if code in tlobjects: if code in tlobjects:
await self._send_acknowledge(msg_id) await self._send_acknowledge(msg_id)
result = reader.tgread_object() result = reader.tgread_object()
self.updates_handler(result) updates_handler(result)
return True return True
self._logger.debug( self._logger.debug(
@ -324,7 +323,7 @@ class MtProtoSender:
return True return True
async def _handle_container(self, msg_id, sequence, reader): async def _handle_container(self, msg_id, sequence, reader, updates_handler):
self._logger.debug('Handling container') self._logger.debug('Handling container')
for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader): for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader):
begin_position = reader.tell_position() begin_position = reader.tell_position()
@ -332,7 +331,7 @@ class MtProtoSender:
# Note that this code is IMPORTANT for skipping RPC results of # Note that this code is IMPORTANT for skipping RPC results of
# lost requests (i.e., ones from the previous connection session) # lost requests (i.e., ones from the previous connection session)
try: try:
if not await self._process_msg(inner_msg_id, sequence, reader): if not await self._process_msg(inner_msg_id, sequence, reader, updates_handler):
reader.set_position(begin_position + inner_len) reader.set_position(begin_position + inner_len)
except: except:
# If any error is raised, something went wrong; skip the packet # If any error is raised, something went wrong; skip the packet
@ -453,9 +452,9 @@ class MtProtoSender:
self._logger.debug('Lost request will be skipped.') self._logger.debug('Lost request will be skipped.')
return False return False
async def _handle_gzip_packed(self, msg_id, sequence, reader): async def _handle_gzip_packed(self, msg_id, sequence, reader, updates_handler):
self._logger.debug('Handling gzip packed data') self._logger.debug('Handling gzip packed data')
with BinaryReader(GzipPacked.read(reader)) as compressed_reader: with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
return await self._process_msg(msg_id, sequence, compressed_reader) return await self._process_msg(msg_id, sequence, compressed_reader, updates_handler)
# endregion # endregion