diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index 60759350..9ac2dc35 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -102,6 +102,7 @@ class MTProtoSender: not exist yet. """ if self._user_connected: + __log__.info('User is already connected!') return self._ip = ip @@ -110,42 +111,59 @@ class MTProtoSender: await self._connect() async def _connect(self): + __log__.info('Connecting to {}:{}...'.format(self._ip, self._port)) _last_error = ConnectionError() - for _ in range(self._retries): + for retry in range(1, self._retries + 1): try: + __log__.debug('Connection attempt {}...'.format(retry)) async with self._send_lock: await self._connection.connect(self._ip, self._port) except OSError as e: _last_error = e + __log__.warning('Attempt {} at connecting failed: {}' + .format(retry, e)) else: break else: raise _last_error + __log__.debug('Connection success!') if self.session.auth_key is None: _last_error = SecurityError() plain = MTProtoPlainSender(self._connection) - for _ in range(self._retries): + for retry in range(1, self._retries + 1): try: + __log__.debug('New auth_key attempt {}...'.format(retry)) self.session.auth_key, self.session.time_offset =\ await authenticator.do_authentication(plain) except (SecurityError, AssertionError) as e: _last_error = e + __log__.warning('Attempt {} at new auth_key failed: {}' + .format(retry, e)) else: break else: raise _last_error + __log__.debug('Starting send loop') self._send_loop_handle = asyncio.ensure_future(self._send_loop()) + __log__.debug('Starting receive loop') self._recv_loop_handle = asyncio.ensure_future(self._recv_loop()) + __log__.info('Connection to {} complete!'.format(self._ip)) async def _reconnect(self): """ Cleanly disconnects and then reconnects. """ self._reconnecting = True + + __log__.debug('Awaiting for the send loop before reconnecting...') await self._send_loop_handle + + __log__.debug('Awaiting for the receive loop before reconnecting...') await self._recv_loop_handle + + __log__.debug('Closing current connection...') async with self._send_lock: await self._connection.close() @@ -158,21 +176,32 @@ class MTProtoSender: all pending requests, and closes the send and receive loops. """ if not self._user_connected: + __log__.info('User is already disconnected!') return + __log__.info('Disconnecting from {}...'.format(self._ip)) self._user_connected = False try: + __log__.debug('Closing current connection...') async with self._send_lock: await self._connection.close() finally: + __log__.debug('Cancelling {} pending message(s)...' + .format(len(self._pending_messages))) for message in self._pending_messages.values(): message.future.cancel() self._pending_messages.clear() self._pending_ack.clear() + + __log__.debug('Cancelling the send loop...') self._send_loop_handle.cancel() + + __log__.debug('Cancelling the receive loop...') self._recv_loop_handle.cancel() + __log__.info('Disconnection from {} complete!'.format(self._ip)) + async def send(self, request, ordered=False): """ This method enqueues the given request to be sent. @@ -245,11 +274,14 @@ class MTProtoSender: message = messages messages = [message] + __log__.debug('Packing {} outgoing message(s)...' + .format(len(messages))) body = helpers.pack_message(self.session, message) while not any(m.future.cancelled() for m in messages): try: async with self._send_lock: + __log__.debug('Sending {} bytes...', len(body)) await self._connection.send(body) break # TODO Are there more exceptions besides timeout? @@ -257,6 +289,7 @@ class MTProtoSender: continue else: # Remove the cancelled messages from pending + __log__.info('Some futures were cancelled, aborted send') self._clean_containers([m.msg_id for m in messages]) for m in messages: if m.future.cancelled(): @@ -264,6 +297,8 @@ class MTProtoSender: else: await self._send_queue.put(m) + __log__.debug('Outgoing messages sent!') + async def _recv_loop(self): """ This loop is responsible for reading all incoming responses @@ -277,19 +312,23 @@ class MTProtoSender: # timeouts, and once the network was back it continued # on its own after a short delay. try: + __log__.debug('Receiving items from the network...') async with self._recv_lock: body = await self._connection.recv() except asyncio.TimeoutError: + # TODO If nothing is received for a minute, send a request continue - except ConnectionError: + except ConnectionError as e: + __log__.info('Connection reset while receiving: {}'.format(e)) asyncio.ensure_future(self._reconnect()) break # TODO Check salt, session_id and sequence_number + __log__.debug('Decoding packet of {} bytes...'.format(len(body))) try: message, remote_msg_id, remote_seq =\ helpers.unpack_message(self.session, body) - except (BrokenAuthKeyError, BufferError): + except (BrokenAuthKeyError, BufferError) as e: # The authorization key may be broken if a message was # sent malformed, or if the authkey truly is corrupted. # @@ -299,18 +338,24 @@ class MTProtoSender: # # TODO Is it possible to detect malformed messages vs # an actually broken authkey? + __log__.warning('Broken authorization key?: {}'.format(e)) self.session.auth_key = None asyncio.ensure_future(self._reconnect()) break - except SecurityError: + except SecurityError as e: # A step while decoding had the incorrect data. This message # should not be considered safe and it should be ignored. - # TODO Maybe we should check if the message was decoded OK + __log__.warning('Security error while unpacking a ' + 'received message:'.format(e)) continue else: - with BinaryReader(message) as reader: - await self._process_message( - remote_msg_id, remote_seq, reader) + try: + with BinaryReader(message) as reader: + await self._process_message( + remote_msg_id, remote_seq, reader) + except TypeNotFoundError as e: + __log__.warning('Could not decode received message: {}, ' + 'raw bytes: {!r}'.format(e, message)) # Response Handlers @@ -340,6 +385,7 @@ class MTProtoSender: inner_code = reader.read_int(signed=False) reader.seek(-4) + __log__.debug('Handling RPC result for message {}'.format(message_id)) message = self._pending_messages.pop(message_id, None) if inner_code == 0x2144ca19: # RPC Error reader.seek(4) @@ -372,14 +418,8 @@ class MTProtoSender: return else: # TODO We should not get responses to things we never sent - try: - if inner_code == GzipPacked.CONSTRUCTOR_ID: - with BinaryReader(GzipPacked.read(reader)) as creader: - obj = creader.tgread_object() - else: - obj = reader.tgread_object() - except TypeNotFoundError: - pass + __log__.info('Received response without parent request: {}' + .format(reader.tgread_object())) async def _handle_container(self, msg_id, seq, reader): """ @@ -387,6 +427,7 @@ class MTProtoSender: msg_container#73f1f8dc messages:vector<%Message> = MessageContainer; """ + __log__.debug('Handling container') for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader): next_position = reader.tell_position() + inner_len await self._process_message(inner_msg_id, seq, reader) @@ -398,14 +439,13 @@ class MTProtoSender: gzip_packed#3072cfa1 packed_data:bytes = Object; """ + __log__.debug('Handling gzipped data') with BinaryReader(GzipPacked.read(reader)) as compressed_reader: await self._process_message(msg_id, seq, compressed_reader) async def _handle_update(self, msg_id, seq, reader): - try: - obj = reader.tgread_object() - except TypeNotFoundError: - return + obj = reader.tgread_object() + __log__.debug('Handling update {}'.format(obj.__class__.__name__)) # TODO Further handling of the update self.session.process_entities(obj) @@ -417,6 +457,7 @@ class MTProtoSender: pong#347773c5 msg_id:long ping_id:long = Pong; """ + __log__.debug('Handling pong') pong = reader.tgread_object() message = self._pending_messages.pop(pong.msg_id, None) if message: @@ -430,6 +471,7 @@ class MTProtoSender: bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int error_code:int new_server_salt:long = BadMsgNotification; """ + __log__.debug('Handling bad salt') bad_salt = reader.tgread_object() self.session.salt = bad_salt.new_server_salt self.session.save() @@ -443,6 +485,7 @@ class MTProtoSender: bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int error_code:int = BadMsgNotification; """ + __log__.debug('Handling bad message') bad_msg = reader.tgread_object() if bad_msg.error_code in (16, 17): # Sent msg_id too low or too high (respectively). @@ -472,6 +515,7 @@ class MTProtoSender: bytes:int status:int = MsgDetailedInfo; """ # TODO https://goo.gl/VvpCC6 + __log__.debug('Handling detailed info') self._pending_ack.add(reader.tgread_object().answer_msg_id) async def _handle_new_detailed_info(self, msg_id, seq, reader): @@ -482,6 +526,7 @@ class MTProtoSender: bytes:int status:int = MsgDetailedInfo; """ # TODO https://goo.gl/G7DPsR + __log__.debug('Handling new detailed info') self._pending_ack.add(reader.tgread_object().answer_msg_id) async def _handle_new_session_created(self, msg_id, seq, reader): @@ -492,6 +537,7 @@ class MTProtoSender: server_salt:long = NewSession; """ # TODO https://goo.gl/LMyN7A + __log__.debug('Handling new session created') self.session.salt = reader.tgread_object().server_salt def _clean_containers(self, msg_ids): @@ -526,6 +572,7 @@ class MTProtoSender: also removes containers messages when any of their inner messages are acknowledged. """ + __log__.debug('Handling acknowledge') ack = reader.tgread_object() if self._pending_containers: self._clean_containers(ack.msg_ids) @@ -546,6 +593,7 @@ class MTProtoSender: """ # TODO save these salts and automatically adjust to the # correct one whenever the salt in use expires. + __log__.debug('Handling future salts') salts = reader.tgread_object() msg = self._pending_messages.pop(msg_id, None) if msg: