From 5842d3741bcda3d54d4fc69b39adf0f1233c768e Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Wed, 20 Dec 2017 12:47:10 +0100 Subject: [PATCH] Make a proper use of the logging module --- telethon/network/mtproto_sender.py | 34 ++++------- telethon/telegram_bare_client.py | 96 +++++++++++++++++++++--------- telethon/update_state.py | 11 ++-- 3 files changed, 87 insertions(+), 54 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 41c791d9..d76d44ae 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -21,7 +21,7 @@ from ..tl.types import ( ) from ..tl.functions.auth import LogOutRequest -logging.getLogger(__name__).addHandler(logging.NullHandler()) +__log__ = logging.getLogger(__name__) class MtProtoSender: @@ -46,7 +46,6 @@ class MtProtoSender: """ self.session = session self.connection = connection - self._logger = logging.getLogger(__name__) # Message IDs that need confirmation self._need_confirmation = set() @@ -137,6 +136,9 @@ class MtProtoSender: # "This packet should be skipped"; since this may have # been a result for a request, invalidate every request # and just re-invoke them to avoid problems + __log__.exception('Error while receiving server response. ' + '%d pending request(s) will be ignored', + len(self._pending_receive)) self._clear_all_pending() return @@ -218,7 +220,7 @@ class MtProtoSender: code = reader.read_int(signed=False) reader.seek(-4) - # The following codes are "parsed manually" + __log__.debug('Processing server message with ID %s', hex(code)) if code == 0xf35c6d01: # rpc_result, (response of an RPC call) return self._handle_rpc_result(msg_id, sequence, reader) @@ -257,7 +259,6 @@ class MtProtoSender: if r: r.result = True # Telegram won't send this value r.confirm_received.set() - self._logger.debug('Message ack confirmed', r) return True @@ -270,11 +271,9 @@ class MtProtoSender: return True - self._logger.debug( - '[WARN] Unknown message: {}, data left in the buffer: {}' - .format( - hex(code), repr(reader.get_bytes()[reader.tell_position():]) - ) + __log__.warning( + 'Unknown message with ID %d, data left in the buffer %s', + hex(code), repr(reader.get_bytes()[reader.tell_position():]) ) return False @@ -351,13 +350,11 @@ class MtProtoSender: :param reader: the reader containing the Pong. :return: true, as it always succeeds. """ - self._logger.debug('Handling pong') pong = reader.tgread_object() assert isinstance(pong, Pong) request = self._pop_request(pong.msg_id) if request: - self._logger.debug('Pong confirmed a request') request.result = pong request.confirm_received.set() @@ -372,7 +369,6 @@ class MtProtoSender: :param reader: the reader containing the MessageContainer. :return: true, as it always succeeds. """ - self._logger.debug('Handling container') for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader): begin_position = reader.tell_position() @@ -397,7 +393,6 @@ class MtProtoSender: :param reader: the reader containing the BadServerSalt. :return: true, as it always succeeds. """ - self._logger.debug('Handling bad server salt') bad_salt = reader.tgread_object() assert isinstance(bad_salt, BadServerSalt) @@ -418,28 +413,29 @@ class MtProtoSender: :param reader: the reader containing the BadMessageError. :return: true, as it always succeeds. """ - self._logger.debug('Handling bad message notification') bad_msg = reader.tgread_object() assert isinstance(bad_msg, BadMsgNotification) error = BadMessageError(bad_msg.error_code) + __log__.warning('Read bad msg notification %s: %s', bad_msg, error) if bad_msg.error_code in (16, 17): # sent msg_id too low or too high (respectively). # Use the current msg_id to determine the right time offset. self.session.update_time_offset(correct_msg_id=msg_id) - self._logger.debug('Read Bad Message error: ' + str(error)) - self._logger.debug('Attempting to use the correct time offset.') + __log__.info('Attempting to use the correct time offset') self._resend_request(bad_msg.bad_msg_id) return True elif bad_msg.error_code == 32: # msg_seqno too low, so just pump it up by some "large" amount # TODO A better fix would be to start with a new fresh session ID self.session._sequence += 64 + __log__.info('Attempting to set the right higher sequence') self._resend_request(bad_msg.bad_msg_id) return True elif bad_msg.error_code == 33: # msg_seqno too high never seems to happen but just in case self.session._sequence -= 16 + __log__.info('Attempting to set the right lower sequence') self._resend_request(bad_msg.bad_msg_id) return True else: @@ -504,7 +500,6 @@ class MtProtoSender: :return: true if the request ID to which this result belongs is found, false otherwise (meaning nothing was read). """ - self._logger.debug('Handling RPC result') reader.read_int(signed=False) # code request_id = reader.read_long() inner_code = reader.read_int(signed=False) @@ -530,11 +525,9 @@ class MtProtoSender: request.confirm_received.set() # else TODO Where should this error be reported? # Read may be async. Can an error not-belong to a request? - self._logger.debug('Read RPC error: %s', str(error)) return True # All contents were read okay elif request: - self._logger.debug('Reading request response') if inner_code == 0x3072cfa1: # GZip packed unpacked_data = gzip.decompress(reader.tgread_bytes()) with BinaryReader(unpacked_data) as compressed_reader: @@ -549,7 +542,7 @@ class MtProtoSender: # If it's really a result for RPC from previous connection # session, it will be skipped by the handle_container() - self._logger.debug('Lost request will be skipped.') + __log__.warning('Lost request will be skipped') return False def _handle_gzip_packed(self, msg_id, sequence, reader, state): @@ -561,7 +554,6 @@ class MtProtoSender: :param reader: the reader containing the GzipPacked. :return: the result of processing the packed message. """ - self._logger.debug('Handling gzip packed data') with BinaryReader(GzipPacked.read(reader)) as compressed_reader: # We are reentering process_msg, which seemingly the same msg_id # to the self._need_confirmation set. Remove it from there first diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 97251547..6c7d3ab0 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -43,6 +43,8 @@ DEFAULT_IPV4_IP = '149.154.167.51' DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]' DEFAULT_PORT = 443 +__log__ = logging.getLogger(__name__) + class TelegramBareClient: """Bare Telegram Client with just the minimum - @@ -117,8 +119,6 @@ class TelegramBareClient: mode=connection_mode, proxy=proxy, timeout=timeout )) - self._logger = logging.getLogger(__name__) - # Two threads may be calling reconnect() when the connection is lost, # we only want one to actually perform the reconnection. self._reconnect_lock = Lock() @@ -191,11 +191,15 @@ class TelegramBareClient: native data center, raising a "UserMigrateError", and calling .disconnect() in the process. """ + __log__.info('Connecting to %s:%d...', + self.session.server_address, self.session.port) + self._main_thread_ident = threading.get_ident() self._background_error = None # Clear previous errors try: self._sender.connect() + __log__.info('Connection success!') # Connection was successful! Try syncing the update state # UNLESS '_sync_updates' is False (we probably are in @@ -215,14 +219,15 @@ class TelegramBareClient: except TypeNotFoundError as e: # This is fine, probably layer migration - self._logger.debug('Found invalid item, probably migrating', e) + __log__.warning('Connection failed, got unexpected type with ID ' + '%s. Migrating?', hex(e.invalid_constructor_id)) self.disconnect() return self.connect(_sync_updates=_sync_updates) - except (RPCError, ConnectionError): + except (RPCError, ConnectionError) as e: # Probably errors from the previous session, ignore them + __log__.error('Connection failed due to %s', e) self.disconnect() - self._logger.exception('Could not stabilise initial connection.') return False def is_connected(self): @@ -244,14 +249,19 @@ class TelegramBareClient: def disconnect(self): """Disconnects from the Telegram server and stops all the spawned threads""" + __log__.info('Disconnecting...') self._user_connected = False # This will stop recv_thread's loop + + __log__.debug('Stopping all workers...') self.updates.stop_workers() # This will trigger a "ConnectionResetError" on the recv_thread, # which won't attempt reconnecting as ._user_connected is False. + __log__.debug('Disconnecting the socket...') self._sender.disconnect() if self._recv_thread: + __log__.debug('Joining the read thread...') self._recv_thread.join() # TODO Shall we clear the _exported_sessions, or may be reused? @@ -268,17 +278,21 @@ class TelegramBareClient: """ if new_dc is None: if self.is_connected(): + __log__.info('Reconnection aborted: already connected') return True try: + __log__.info('Attempting reconnection...') return self.connect() - except ConnectionResetError: + except ConnectionResetError as e: + __log__.warning('Reconnection failed due to %s', e) return False else: # Since we're reconnecting possibly due to a UserMigrateError, # we need to first know the Data Centers we can connect to. Do # that before disconnecting. dc = self._get_dc(new_dc) + __log__.info('Reconnecting to new data center %s', dc) self.session.server_address = dc.ip_address self.session.port = dc.port @@ -340,6 +354,7 @@ class TelegramBareClient: dc = self._get_dc(dc_id) # Export the current authorization to the new DC. + __log__.info('Exporting authorization for data center %s', dc) export_auth = self(ExportAuthorizationRequest(dc_id)) # Create a temporary session for this IP address, which needs @@ -352,6 +367,7 @@ class TelegramBareClient: session.port = dc.port self._exported_sessions[dc_id] = session + __log__.info('Creating exported new client') client = TelegramBareClient( session, self.api_id, self.api_hash, proxy=self._sender.connection.conn.proxy, @@ -363,7 +379,7 @@ class TelegramBareClient: id=export_auth.id, bytes=export_auth.bytes )) elif export_auth is not None: - self._logger.warning('Unknown return export_auth type', export_auth) + __log__.warning('Unknown export auth type %s', export_auth) client._authorized = True # We exported the auth, so we got auth return client @@ -378,6 +394,7 @@ class TelegramBareClient: session.port = dc.port self._exported_sessions[cdn_redirect.dc_id] = session + __log__.info('Creating new CDN client') client = TelegramBareClient( session, self.api_id, self.api_hash, proxy=self._sender.connection.conn.proxy, @@ -407,12 +424,23 @@ class TelegramBareClient: x.content_related for x in requests): raise ValueError('You can only invoke requests, not types!') + # For logging purposes + if len(requests) == 1: + which = type(requests[0]).__name__ + else: + which = '{} requests ({})'.format( + len(requests), [type(x).__name__ for x in requests]) + # Determine the sender to be used (main or a new connection) on_main_thread = threading.get_ident() == self._main_thread_ident if on_main_thread or self._on_read_thread(): + __log__.debug('Invoking %s from main thread', which) sender = self._sender update_state = self.updates else: + __log__.debug('Invoking %s from background thread. ' + 'Creating temporary connection', which) + sender = self._sender.clone() sender.connect() # We're on another connection, Telegram will resend all the @@ -431,7 +459,7 @@ class TelegramBareClient: call_receive = not on_main_thread or self._recv_thread is None \ or self._reconnect_lock.locked() try: - for _ in range(retries): + for attempt in range(retries): if self._background_error and on_main_thread: raise self._background_error @@ -441,7 +469,9 @@ class TelegramBareClient: if result is not None: return result - self._logger.debug('RPC failed. Attempting reconnection.') + __log__.warning('Invoking %s failed %d times, ' + 'reconnecting and retrying', + [str(x) for x in requests], attempt + 1) sleep(1) # The ReadThread has priority when attempting reconnection, # since this thread is constantly running while __call__ is @@ -475,11 +505,13 @@ class TelegramBareClient: if not self.session.auth_key: # New key, we need to tell the server we're going to use # the latest layer and initialize the connection doing so. + __log__.info('Need to generate new auth key before invoking') self.session.auth_key, self.session.time_offset = \ authenticator.do_authentication(self._sender.connection) init_connection = True if init_connection: + __log__.info('Initializing a new connection while invoking') if len(requests) == 1: requests = [self._wrap_init_connection(requests[0])] else: @@ -506,13 +538,14 @@ class TelegramBareClient: sender.receive(update_state=update_state) except BrokenAuthKeyError: - self._logger.error('Broken auth key, a new one will be generated') + __log__.error('Authorization key seems broken and was invalid!') self.session.auth_key = None except TimeoutError: - pass # We will just retry + __log__.warning('Invoking timed out') # We will just retry except ConnectionResetError: + __log__.warning('Connection was reset while invoking') if self._user_connected: # Server disconnected us, __call__ will try reconnecting. return None @@ -541,10 +574,6 @@ class TelegramBareClient: except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: - self._logger.debug( - 'DC error when invoking request, ' - 'attempting to reconnect at DC {}'.format(e.new_dc) - ) # TODO What happens with the background thread here? # For normal use cases, this won't happen, because this will only @@ -555,17 +584,13 @@ class TelegramBareClient: except ServerError as e: # Telegram is having some issues, just retry - self._logger.debug( - '[ERROR] Telegram is having some internal issues', e - ) + __log__.error('Telegram servers are having internal errors %s', e) except FloodWaitError as e: + __log__.warning('Request invoked too often, wait %ds', e.seconds) if e.seconds > self.session.flood_sleep_threshold | 0: raise - self._logger.debug( - 'Sleep of %d seconds below threshold, sleeping' % e.seconds - ) sleep(e.seconds) # Some really basic functionality @@ -628,6 +653,8 @@ class TelegramBareClient: file_id = utils.generate_random_long() hash_md5 = md5() + __log__.info('Uploading file of %d bytes in %d chunks of %d', + file_size, part_count, part_size) stream = open(file, 'rb') if isinstance(file, str) else BytesIO(file) try: for part_index in range(part_count): @@ -644,6 +671,7 @@ class TelegramBareClient: result = self(request) if result: + __log__.debug('Uploaded %d/%d', part_index, part_count) if not is_large: # No need to update the hash if it's a large file hash_md5.update(part) @@ -712,6 +740,7 @@ class TelegramBareClient: client = self cdn_decrypter = None + __log__.info('Downloading file in chunks of %d bytes', part_size) try: offset = 0 while True: @@ -724,12 +753,14 @@ class TelegramBareClient: )) if isinstance(result, FileCdnRedirect): + __log__.info('File lives in a CDN') cdn_decrypter, result = \ CdnDecrypter.prepare_decrypter( client, self._get_cdn_client(result), result ) except FileMigrateError as e: + __log__.info('File lives in another DC') client = self._get_exported_client(e.new_dc) continue @@ -742,6 +773,7 @@ class TelegramBareClient: return getattr(result, 'type', '') f.write(result.bytes) + __log__.debug('Saved %d more bytes', len(result.bytes)) if progress_callback: progress_callback(f.tell(), file_size) finally: @@ -803,7 +835,6 @@ class TelegramBareClient: if self._user_connected: self.disconnect() else: - self._logger.debug('Forcing exit...') os._exit(1) def idle(self, stop_signals=(SIGINT, SIGTERM, SIGABRT)): @@ -824,6 +855,11 @@ class TelegramBareClient: for sig in stop_signals: signal(sig, self._signal_handler) + if self._on_read_thread(): + __log__.info('Starting to wait for items from the network') + else: + __log__.info('Idling to receive items from the network') + while self._user_connected: try: if datetime.now() > self._last_ping + self._ping_delay: @@ -832,16 +868,21 @@ class TelegramBareClient: )) self._last_ping = datetime.now() + __log__.debug('Receiving items from the network...') self._sender.receive(update_state=self.updates) except TimeoutError: - # No problem. - pass + # No problem + __log__.info('Receiving items from the network timed out') except ConnectionResetError: - self._logger.debug('Server disconnected us. Reconnecting...') + if self._user_connected: + __log__.error('Connection was reset while receiving ' + 'items. Reconnecting') with self._reconnect_lock: while self._user_connected and not self._reconnect(): sleep(0.1) # Retry forever, this is instant messaging + __log__.info('Connection closed by the user, not reading anymore') + # By using this approach, another thread will be # created and started upon connection to constantly read # from the other end. Otherwise, manual calls to .receive() @@ -857,10 +898,9 @@ class TelegramBareClient: try: self.idle(stop_signals=tuple()) except Exception as error: + __log__.exception('Unknown exception in the read thread! ' + 'Disconnecting and leaving it to main thread') # Unknown exception, pass it to the main thread - self._logger.exception( - 'Unknown error on the read thread, please report' - ) try: import socks diff --git a/telethon/update_state.py b/telethon/update_state.py index c3768fbd..9f308d89 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -7,6 +7,8 @@ from threading import RLock, Thread from .tl import types as tl +__log__ = logging.getLogger(__name__) + class UpdateState: """Used to hold the current state of processed updates. @@ -30,8 +32,6 @@ class UpdateState: self._updates = Queue() self._latest_updates = deque(maxlen=10) - self._logger = logging.getLogger(__name__) - # https://core.telegram.org/api/updates self._state = tl.updates.State(0, 0, datetime.now(), 0, 0) @@ -115,9 +115,7 @@ class UpdateState: break except: # We don't want to crash a worker thread due to any reason - self._logger.exception( - '[ERROR] Unhandled exception on worker {}'.format(wid) - ) + __log__.exception('Unhandled exception on worker %d', wid) def process(self, update): """Processes an update object. This method is normally called by @@ -128,11 +126,13 @@ class UpdateState: with self._updates_lock: if isinstance(update, tl.updates.State): + __log__.debug('Saved new updates state') self._state = update return # Nothing else to be done pts = getattr(update, 'pts', self._state.pts) if hasattr(update, 'pts') and pts <= self._state.pts: + __log__.info('Ignoring %s, already have it', update) return # We already handled this update self._state.pts = pts @@ -153,6 +153,7 @@ class UpdateState: """ data = pickle.dumps(update.to_dict()) if data in self._latest_updates: + __log__.info('Ignoring %s, already have it', update) return # Duplicated too self._latest_updates.append(data)