Add logging to MTProtoSender

This commit is contained in:
Lonami Exo 2017-04-09 13:14:04 +02:00
parent 6add278f07
commit 1232e8f607
2 changed files with 54 additions and 2 deletions

21
telethon/log.py Normal file
View File

@ -0,0 +1,21 @@
import sys
import logging
for arg in sys.argv:
if arg.startswith('--telethon-log='):
level = getattr(logging, arg.split('=')[1], None)
if not isinstance(level, int):
raise ValueError('Invalid log level: %s' % level)
print('Using log level', level, 'which is', arg.split('=')[1])
logging.basicConfig(level=level)
class Logger:
def __init__(self):
setattr(self, 'd', logging.debug)
setattr(self, 'i', logging.info)
setattr(self, 'w', logging.warning)
setattr(self, 'e', logging.error)
Log = Logger()

View File

@ -6,6 +6,7 @@ from time import sleep, time
import telethon.helpers as utils import telethon.helpers as utils
from telethon.crypto import AES from telethon.crypto import AES
from telethon.errors import * from telethon.errors import *
from telethon.log import Log
from telethon.tl.all_tlobjects import tlobjects from telethon.tl.all_tlobjects import tlobjects
from telethon.tl.types import MsgsAck from telethon.tl.types import MsgsAck
from telethon.tl.functions import PingRequest from telethon.tl.functions import PingRequest
@ -90,10 +91,12 @@ class MtProtoSender:
# updates thread who was receiving. We do # updates thread who was receiving. We do
# not want to cancel other pending requests! # not want to cancel other pending requests!
if self.updates_thread_receiving: if self.updates_thread_receiving:
Log.i('Cancelling updates receive from send()...')
self.transport.cancel_receive() self.transport.cancel_receive()
# Now only us can be using this method # Now only us can be using this method
with self.lock: with self.lock:
Log.d('send() acquired the lock')
# Set the flag to true so the updates thread stops trying to receive # Set the flag to true so the updates thread stops trying to receive
self.waiting_receive = True self.waiting_receive = True
@ -114,6 +117,8 @@ class MtProtoSender:
# And update the saved session # And update the saved session
self.session.save() self.session.save()
Log.d('send() released the lock')
def receive(self, request, timeout=timedelta(seconds=5)): def receive(self, request, timeout=timedelta(seconds=5)):
"""Receives the specified MTProtoRequest ("fills in it" """Receives the specified MTProtoRequest ("fills in it"
the received data). This also restores the updates thread. the received data). This also restores the updates thread.
@ -121,8 +126,10 @@ class MtProtoSender:
if no data has been read after its time delta""" if no data has been read after its time delta"""
with self.lock: with self.lock:
Log.d('receive() acquired the lock')
# Don't stop trying to receive until we get the request we wanted # Don't stop trying to receive until we get the request we wanted
while not request.confirm_received: while not request.confirm_received:
Log.i('Trying to .receive() the request result...')
seq, body = self.transport.receive(timeout) seq, body = self.transport.receive(timeout)
message, remote_msg_id, remote_sequence = self.decode_msg(body) message, remote_msg_id, remote_sequence = self.decode_msg(body)
@ -130,8 +137,11 @@ class MtProtoSender:
self.process_msg(remote_msg_id, remote_sequence, reader, self.process_msg(remote_msg_id, remote_sequence, reader,
request) request)
Log.i('Request result received')
# We can now set the flag to False thus resuming the updates thread # We can now set the flag to False thus resuming the updates thread
self.waiting_receive = False self.waiting_receive = False
Log.d('receive() released the lock')
# endregion # endregion
@ -221,6 +231,7 @@ class MtProtoSender:
if self.ack_requests_confirm and code == 0x62d6b459: if self.ack_requests_confirm and code == 0x62d6b459:
ack = reader.tgread_object() ack = reader.tgread_object()
if request and request.msg_id in ack.msg_ids: if request and request.msg_id in ack.msg_ids:
Log.w('Message ack confirmed a request')
request.confirm_received = True request.confirm_received = True
return False return False
@ -239,21 +250,25 @@ class MtProtoSender:
def handle_update(self, msg_id, sequence, reader): def handle_update(self, msg_id, sequence, reader):
tlobject = reader.tgread_object() tlobject = reader.tgread_object()
Log.d('Handling update for object %s', repr(tlobject))
for handler in self.on_update_handlers: for handler in self.on_update_handlers:
handler(tlobject) handler(tlobject)
return False return False
def handle_pong(self, msg_id, sequence, reader, request): def handle_pong(self, msg_id, sequence, reader, request):
Log.d('Handling pong')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
recv_msg_id = reader.read_long(signed=False) recv_msg_id = reader.read_long(signed=False)
if recv_msg_id == request.msg_id: if recv_msg_id == request.msg_id:
Log.w('Pong confirmed a request')
request.confirm_received = True request.confirm_received = True
return False return False
def handle_container(self, msg_id, sequence, reader, request): def handle_container(self, msg_id, sequence, reader, request):
Log.d('Handling container')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
size = reader.read_int() size = reader.read_int()
for _ in range(size): for _ in range(size):
@ -268,6 +283,7 @@ class MtProtoSender:
return False return False
def handle_bad_server_salt(self, msg_id, sequence, reader, request): def handle_bad_server_salt(self, msg_id, sequence, reader, request):
Log.d('Handling bad server salt')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
reader.read_long(signed=False) # bad_msg_id reader.read_long(signed=False) # bad_msg_id
reader.read_int() # bad_msg_seq_no reader.read_int() # bad_msg_seq_no
@ -286,6 +302,7 @@ class MtProtoSender:
return True return True
def handle_bad_msg_notification(self, msg_id, sequence, reader): def handle_bad_msg_notification(self, msg_id, sequence, reader):
Log.d('Handling bad message notification')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
reader.read_long(signed=False) # request_id reader.read_long(signed=False) # request_id
reader.read_int() # request_sequence reader.read_int() # request_sequence
@ -294,6 +311,7 @@ class MtProtoSender:
raise BadMessageError(error_code) raise BadMessageError(error_code)
def handle_rpc_result(self, msg_id, sequence, reader, request): def handle_rpc_result(self, msg_id, sequence, reader, request):
Log.d('Handling RPC result, request is%s None', ' not' if request else '')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
request_id = reader.read_long(signed=False) request_id = reader.read_long(signed=False)
inner_code = reader.read_int(signed=False) inner_code = reader.read_int(signed=False)
@ -304,6 +322,8 @@ class MtProtoSender:
if inner_code == 0x2144ca19: # RPC Error if inner_code == 0x2144ca19: # RPC Error
error = RPCError( error = RPCError(
code=reader.read_int(), message=reader.tgread_string()) code=reader.read_int(), message=reader.tgread_string())
Log.w('Read RPC error: %s', str(error))
if error.must_resend: if error.must_resend:
if not request: if not request:
raise ValueError( raise ValueError(
@ -326,6 +346,7 @@ class MtProtoSender:
raise ValueError( raise ValueError(
'Cannot receive a request from inside an RPC result from the updates thread.') 'Cannot receive a request from inside an RPC result from the updates thread.')
Log.d('Reading request response')
if inner_code == 0x3072cfa1: # GZip packed if inner_code == 0x3072cfa1: # GZip packed
unpacked_data = gzip.decompress(reader.tgread_bytes()) unpacked_data = gzip.decompress(reader.tgread_bytes())
with BinaryReader(unpacked_data) as compressed_reader: with BinaryReader(unpacked_data) as compressed_reader:
@ -335,6 +356,7 @@ class MtProtoSender:
request.on_response(reader) request.on_response(reader)
def handle_gzip_packed(self, msg_id, sequence, reader, request): def handle_gzip_packed(self, msg_id, sequence, reader, request):
Log.d('Handling gzip packed data')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
packed_data = reader.tgread_bytes() packed_data = reader.tgread_bytes()
unpacked_data = gzip.decompress(packed_data) unpacked_data = gzip.decompress(packed_data)
@ -351,6 +373,7 @@ class MtProtoSender:
return return
# Different state, update the saved value and behave as required # Different state, update the saved value and behave as required
Log.i('Changing updates thread running status to %s', running)
self.updates_thread_running = running self.updates_thread_running = running
if running: if running:
self.updates_thread.start() self.updates_thread.start()
@ -368,29 +391,37 @@ class MtProtoSender:
# Only try to receive updates if we're not waiting to receive a request # Only try to receive updates if we're not waiting to receive a request
if not self.waiting_receive: if not self.waiting_receive:
with self.lock: with self.lock:
Log.d('Updates thread acquired the lock')
try: try:
now = time() now = time()
# If ping_interval seconds passed since last ping, send a new one # If ping_interval seconds passed since last ping, send a new one
if now >= self.ping_time_last + self.ping_interval: if now >= self.ping_time_last + self.ping_interval:
self.ping_time_last = now self.ping_time_last = now
self.send_ping() self.send_ping()
Log.d('Ping sent from the updates thread')
# Exit the loop if we're not expecting to receive any updates # Exit the loop if we're not expecting to receive any updates
if not self.on_update_handlers: if not self.on_update_handlers:
Log.d('No updates handlers found, continuing')
continue continue
self.updates_thread_receiving = True self.updates_thread_receiving = True
Log.d('Trying to receive updates from the updates thread')
seq, body = self.transport.receive(timeout) seq, body = self.transport.receive(timeout)
message, remote_msg_id, remote_sequence = self.decode_msg( message, remote_msg_id, remote_sequence = self.decode_msg(
body) body)
Log.i('Received update from the updates thread')
with BinaryReader(message) as reader: with BinaryReader(message) as reader:
self.process_msg(remote_msg_id, remote_sequence, self.process_msg(remote_msg_id, remote_sequence,
reader) reader)
except (ReadCancelledError, TimeoutError): except TimeoutError:
pass Log.d('Receiving updates timed out')
except ReadCancelledError:
Log.i('Receiving updates cancelled')
Log.d('Updates thread released the lock')
self.updates_thread_receiving = False self.updates_thread_receiving = False
# If we are here, it is because the read was cancelled # If we are here, it is because the read was cancelled