From 1232e8f607594407bada91e195194e20352355bf Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sun, 9 Apr 2017 13:14:04 +0200 Subject: [PATCH] Add logging to MTProtoSender --- telethon/log.py | 21 ++++++++++++++++++ telethon/network/mtproto_sender.py | 35 ++++++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 telethon/log.py diff --git a/telethon/log.py b/telethon/log.py new file mode 100644 index 00000000..cf9d35dd --- /dev/null +++ b/telethon/log.py @@ -0,0 +1,21 @@ +import sys +import logging + + +for arg in sys.argv: + if arg.startswith('--telethon-log='): + level = getattr(logging, arg.split('=')[1], None) + if not isinstance(level, int): + raise ValueError('Invalid log level: %s' % level) + print('Using log level', level, 'which is', arg.split('=')[1]) + logging.basicConfig(level=level) + + +class Logger: + def __init__(self): + setattr(self, 'd', logging.debug) + setattr(self, 'i', logging.info) + setattr(self, 'w', logging.warning) + setattr(self, 'e', logging.error) + +Log = Logger() diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 7d61aebd..a14f6297 100755 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -6,6 +6,7 @@ from time import sleep, time import telethon.helpers as utils from telethon.crypto import AES from telethon.errors import * +from telethon.log import Log from telethon.tl.all_tlobjects import tlobjects from telethon.tl.types import MsgsAck from telethon.tl.functions import PingRequest @@ -90,10 +91,12 @@ class MtProtoSender: # updates thread who was receiving. We do # not want to cancel other pending requests! if self.updates_thread_receiving: + Log.i('Cancelling updates receive from send()...') self.transport.cancel_receive() # Now only us can be using this method with self.lock: + Log.d('send() acquired the lock') # Set the flag to true so the updates thread stops trying to receive self.waiting_receive = True @@ -114,6 +117,8 @@ class MtProtoSender: # And update the saved session self.session.save() + Log.d('send() released the lock') + def receive(self, request, timeout=timedelta(seconds=5)): """Receives the specified MTProtoRequest ("fills in it" the received data). This also restores the updates thread. @@ -121,8 +126,10 @@ class MtProtoSender: if no data has been read after its time delta""" with self.lock: + Log.d('receive() acquired the lock') # Don't stop trying to receive until we get the request we wanted while not request.confirm_received: + Log.i('Trying to .receive() the request result...') seq, body = self.transport.receive(timeout) message, remote_msg_id, remote_sequence = self.decode_msg(body) @@ -130,8 +137,11 @@ class MtProtoSender: self.process_msg(remote_msg_id, remote_sequence, reader, request) + Log.i('Request result received') + # We can now set the flag to False thus resuming the updates thread self.waiting_receive = False + Log.d('receive() released the lock') # endregion @@ -221,6 +231,7 @@ class MtProtoSender: if self.ack_requests_confirm and code == 0x62d6b459: ack = reader.tgread_object() if request and request.msg_id in ack.msg_ids: + Log.w('Message ack confirmed a request') request.confirm_received = True return False @@ -239,21 +250,25 @@ class MtProtoSender: def handle_update(self, msg_id, sequence, reader): tlobject = reader.tgread_object() + Log.d('Handling update for object %s', repr(tlobject)) for handler in self.on_update_handlers: handler(tlobject) return False def handle_pong(self, msg_id, sequence, reader, request): + Log.d('Handling pong') reader.read_int(signed=False) # code recv_msg_id = reader.read_long(signed=False) if recv_msg_id == request.msg_id: + Log.w('Pong confirmed a request') request.confirm_received = True return False def handle_container(self, msg_id, sequence, reader, request): + Log.d('Handling container') reader.read_int(signed=False) # code size = reader.read_int() for _ in range(size): @@ -268,6 +283,7 @@ class MtProtoSender: return False def handle_bad_server_salt(self, msg_id, sequence, reader, request): + Log.d('Handling bad server salt') reader.read_int(signed=False) # code reader.read_long(signed=False) # bad_msg_id reader.read_int() # bad_msg_seq_no @@ -286,6 +302,7 @@ class MtProtoSender: return True def handle_bad_msg_notification(self, msg_id, sequence, reader): + Log.d('Handling bad message notification') reader.read_int(signed=False) # code reader.read_long(signed=False) # request_id reader.read_int() # request_sequence @@ -294,6 +311,7 @@ class MtProtoSender: raise BadMessageError(error_code) def handle_rpc_result(self, msg_id, sequence, reader, request): + Log.d('Handling RPC result, request is%s None', ' not' if request else '') reader.read_int(signed=False) # code request_id = reader.read_long(signed=False) inner_code = reader.read_int(signed=False) @@ -304,6 +322,8 @@ class MtProtoSender: if inner_code == 0x2144ca19: # RPC Error error = RPCError( code=reader.read_int(), message=reader.tgread_string()) + + Log.w('Read RPC error: %s', str(error)) if error.must_resend: if not request: raise ValueError( @@ -326,6 +346,7 @@ class MtProtoSender: raise ValueError( 'Cannot receive a request from inside an RPC result from the updates thread.') + Log.d('Reading request response') if inner_code == 0x3072cfa1: # GZip packed unpacked_data = gzip.decompress(reader.tgread_bytes()) with BinaryReader(unpacked_data) as compressed_reader: @@ -335,6 +356,7 @@ class MtProtoSender: request.on_response(reader) def handle_gzip_packed(self, msg_id, sequence, reader, request): + Log.d('Handling gzip packed data') reader.read_int(signed=False) # code packed_data = reader.tgread_bytes() unpacked_data = gzip.decompress(packed_data) @@ -351,6 +373,7 @@ class MtProtoSender: return # Different state, update the saved value and behave as required + Log.i('Changing updates thread running status to %s', running) self.updates_thread_running = running if running: self.updates_thread.start() @@ -368,29 +391,37 @@ class MtProtoSender: # Only try to receive updates if we're not waiting to receive a request if not self.waiting_receive: with self.lock: + Log.d('Updates thread acquired the lock') try: now = time() # If ping_interval seconds passed since last ping, send a new one if now >= self.ping_time_last + self.ping_interval: self.ping_time_last = now self.send_ping() + Log.d('Ping sent from the updates thread') # Exit the loop if we're not expecting to receive any updates if not self.on_update_handlers: + Log.d('No updates handlers found, continuing') continue self.updates_thread_receiving = True + Log.d('Trying to receive updates from the updates thread') seq, body = self.transport.receive(timeout) message, remote_msg_id, remote_sequence = self.decode_msg( body) + Log.i('Received update from the updates thread') with BinaryReader(message) as reader: self.process_msg(remote_msg_id, remote_sequence, reader) - except (ReadCancelledError, TimeoutError): - pass + except TimeoutError: + Log.d('Receiving updates timed out') + except ReadCancelledError: + Log.i('Receiving updates cancelled') + Log.d('Updates thread released the lock') self.updates_thread_receiving = False # If we are here, it is because the read was cancelled