From b31f4fe063451db73ff4c532e3f743de052d19bb Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 12 Oct 2023 21:10:00 +0200 Subject: [PATCH] Use logger --- .../src/telethon/_impl/client/client/auth.py | 2 +- .../telethon/_impl/client/client/client.py | 59 +++++++++++-------- .../src/telethon/_impl/client/client/net.py | 21 +++++-- .../telethon/_impl/client/client/updates.py | 13 ++-- client/src/telethon/_impl/client/utils.py | 1 - client/src/telethon/_impl/mtsender/sender.py | 49 ++++++++------- .../_impl/session/message_box/messagebox.py | 10 ++-- client/tests/mtsender_test.py | 5 +- 8 files changed, 97 insertions(+), 63 deletions(-) diff --git a/client/src/telethon/_impl/client/client/auth.py b/client/src/telethon/_impl/client/client/auth.py index baf4a927..45739a1d 100644 --- a/client/src/telethon/_impl/client/client/auth.py +++ b/client/src/telethon/_impl/client/client/auth.py @@ -50,7 +50,7 @@ async def complete_login(client: Client, auth: abcs.auth.Authorization) -> User: async def handle_migrate(client: Client, dc_id: Optional[int]) -> None: assert dc_id is not None sender, client._session.dcs = await connect_sender( - client._config, datacenter_for_id(client, dc_id) + client._config, datacenter_for_id(client, dc_id), client._logger ) async with client._sender_lock: client._sender = sender diff --git a/client/src/telethon/_impl/client/client/client.py b/client/src/telethon/_impl/client/client/client.py index aef49473..071efd3c 100644 --- a/client/src/telethon/_impl/client/client/client.py +++ b/client/src/telethon/_impl/client/client/client.py @@ -1,5 +1,6 @@ import asyncio import datetime +import logging from pathlib import Path from types import TracebackType from typing import ( @@ -149,6 +150,31 @@ class Client: This is required to sign in, and can be omitted otherwise. + :param catch_up: + Whether to "catch up" on updates that occured while the client was not connected. + + :param check_all_handlers: + Whether to always check all event handlers or stop early. + + The library will call event handlers in the order they were added. + By default, the library stops checking handlers as soon as a filter returns :data:`True`. + + By setting ``check_all_handlers=True``, the library will keep calling handlers after the first match. + + :param flood_sleep_threshold: + Maximum amount of time, in seconds, to automatically sleep before retrying a request. + This sleeping occurs when ``FLOOD_WAIT`` :class:`~telethon.RpcError` is raised by Telegram. + + :param logger: + Logger for the client. + Any dependency of the client will use :meth:`logging.Logger.getChild`. + This effectively makes the parameter the root logger. + + The default will get the logger for the package name from the root. + + :param update_queue_limit: + Maximum amount of updates to keep in memory before dropping them. + :param device_model: Device model. @@ -164,27 +190,9 @@ class Client: :param lang_code: ISO 639-1 language code of the application's language. - :param catch_up: - Whether to "catch up" on updates that occured while the client was not connected. - :param datacenter: Override the datacenter to connect to. Useful to connect to one of Telegram's test servers. - - :param flood_sleep_threshold: - Maximum amount of time, in seconds, to automatically sleep before retrying a request. - This sleeping occurs when ``FLOOD_WAIT`` :class:`~telethon.RpcError` is raised by Telegram. - - :param update_queue_limit: - Maximum amount of updates to keep in memory before dropping them. - - :param check_all_handlers: - Whether to always check all event handlers or stop early. - - The library will call event handlers in the order they were added. - By default, the library stops checking handlers as soon as a filter returns :data:`True`. - - By setting ``check_all_handlers=True``, the library will keep calling handlers after the first match. """ def __init__( @@ -193,17 +201,22 @@ class Client: api_id: int, api_hash: Optional[str] = None, *, + catch_up: bool = False, + check_all_handlers: bool = False, + flood_sleep_threshold: Optional[int] = None, + logger: Optional[logging.Logger] = None, + update_queue_limit: Optional[int] = None, device_model: Optional[str] = None, system_version: Optional[str] = None, app_version: Optional[str] = None, system_lang_code: Optional[str] = None, lang_code: Optional[str] = None, - catch_up: Optional[bool] = None, datacenter: Optional[DataCenter] = None, - flood_sleep_threshold: Optional[int] = None, - update_queue_limit: Optional[int] = None, - check_all_handlers: bool = False, ) -> None: + self._logger = logger or logging.getLogger( + __package__[: __package__.index(".")] + ) + self._sender: Optional[Sender] = None self._sender_lock = asyncio.Lock() if isinstance(session, Storage): @@ -231,7 +244,7 @@ class Client: self._session = Session() - self._message_box = MessageBox() + self._message_box = MessageBox(base_logger=self._logger) self._chat_hashes = ChatHashCache(None) self._last_update_limit_warn: Optional[float] = None self._updates: asyncio.Queue[ diff --git a/client/src/telethon/_impl/client/client/net.py b/client/src/telethon/_impl/client/client/net.py index a5e67137..48379d85 100644 --- a/client/src/telethon/_impl/client/client/net.py +++ b/client/src/telethon/_impl/client/client/net.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio import itertools +import logging import platform import re from dataclasses import dataclass, field @@ -67,14 +68,18 @@ DEFAULT_DC = 2 async def connect_sender( - config: Config, dc: DataCenter + config: Config, + dc: DataCenter, + base_logger: logging.Logger, ) -> Tuple[Sender, List[DataCenter]]: transport = Full() if dc.auth: - sender = await connect_with_auth(transport, dc.id, dc.addr, dc.auth) + sender = await connect_with_auth( + transport, dc.id, dc.addr, dc.auth, base_logger + ) else: - sender = await connect_without_auth(transport, dc.id, dc.addr) + sender = await connect_without_auth(transport, dc.id, dc.addr, base_logger) # TODO handle -404 (we had a previously-valid authkey, but server no longer knows about it) remote_config = await sender.invoke( @@ -146,7 +151,9 @@ async def connect(self: Client) -> None: self, self._session.user.dc if self._session.user else DEFAULT_DC ) - self._sender, self._session.dcs = await connect_sender(self._config, datacenter) + self._sender, self._session.dcs = await connect_sender( + self._config, datacenter, self._logger + ) if self._message_box.is_empty() and self._session.user: try: @@ -190,14 +197,16 @@ async def disconnect(self: Client) -> None: try: await self._dispatcher except Exception: - pass # TODO log + self._logger.exception( + "unhandled exception when cancelling dispatcher; this is a bug" + ) finally: self._dispatcher = None try: await self._sender.disconnect() except Exception: - pass # TODO log + self._logger.exception("unhandled exception during disconnect; this is a bug") finally: self._sender = None diff --git a/client/src/telethon/_impl/client/client/updates.py b/client/src/telethon/_impl/client/client/updates.py index 06ecf43c..bd3f6440 100644 --- a/client/src/telethon/_impl/client/client/updates.py +++ b/client/src/telethon/_impl/client/client/updates.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import logging from typing import ( TYPE_CHECKING, Any, @@ -117,7 +116,10 @@ def extend_update_queue( now - client._last_update_limit_warn > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN ): - # TODO warn + client._logger.warning( + "updates are being dropped because limit=%d has been reached", + client._updates.maxsize, + ) client._last_update_limit_warn = now break @@ -132,14 +134,15 @@ async def dispatcher(client: Client) -> None: except Exception as e: if isinstance(e, RuntimeError) and loop.is_closed(): # User probably forgot to call disconnect. - logging.warning( + client._logger.warning( "client was not closed cleanly, make sure to call client.disconnect()! %s", e, ) return else: - # TODO proper logger - logging.exception("Unhandled exception in event handler") + client._logger.exception( + "unhandled exception in event handler; this is probably a bug in your code, not telethon" + ) raise diff --git a/client/src/telethon/_impl/client/utils.py b/client/src/telethon/_impl/client/utils.py index b1ffda26..bb44df3d 100644 --- a/client/src/telethon/_impl/client/utils.py +++ b/client/src/telethon/_impl/client/utils.py @@ -44,7 +44,6 @@ def build_chat_map(users: List[abcs.User], chats: List[abcs.Chat]) -> Dict[int, for k, v in counter.items(): if len(v) > 1: - # TODO proper logger for x in v: print(x, file=sys.stderr) diff --git a/client/src/telethon/_impl/mtsender/sender.py b/client/src/telethon/_impl/mtsender/sender.py index ba8cc76f..72f9a4fc 100644 --- a/client/src/telethon/_impl/mtsender/sender.py +++ b/client/src/telethon/_impl/mtsender/sender.py @@ -1,4 +1,5 @@ import asyncio +import logging import struct import time from abc import ABC @@ -78,6 +79,7 @@ class Request(Generic[Return]): class Sender: dc_id: int addr: str + _logger: logging.Logger _reader: StreamReader _writer: StreamWriter _transport: Transport @@ -91,13 +93,19 @@ class Sender: @classmethod async def connect( - cls, transport: Transport, mtp: Mtp, dc_id: int, addr: str + cls, + transport: Transport, + mtp: Mtp, + dc_id: int, + addr: str, + base_logger: logging.Logger, ) -> Self: reader, writer = await asyncio.open_connection(*addr.split(":")) return cls( dc_id=dc_id, addr=addr, + _logger=base_logger.getChild("mtsender"), _reader=reader, _writer=writer, _transport=transport, @@ -230,7 +238,7 @@ class Sender: ) ) ) - self._next_ping = time.time() + PING_DELAY + self._next_ping = asyncio.get_running_loop().time() + PING_DELAY def _process_mtp_buffer(self, updates: List[Updates]) -> None: result = self._mtp.deserialize(self._mtp_buffer) @@ -239,7 +247,10 @@ class Sender: try: u = Updates.from_bytes(update) except ValueError: - pass # TODO log + self._logger.warning( + "failed to deserialize incoming update; make sure the session is not in use elsewhere: %s", + update.hex(), + ) else: updates.append(u) @@ -267,7 +278,11 @@ class Sender: req.result.set_result(ret) break if not found: - pass # TODO log + self._logger.warning( + "telegram sent rpc_result for unknown msg_id=%d: %s", + msg_id, + ret.hex() if isinstance(ret, bytes) else repr(ret), + ) @property def auth_key(self) -> Optional[bytes]: @@ -277,8 +292,10 @@ class Sender: return None -async def connect(transport: Transport, dc_id: int, addr: str) -> Sender: - sender = await Sender.connect(transport, Plain(), dc_id, addr) +async def connect( + transport: Transport, dc_id: int, addr: str, base_logger: logging.Logger +) -> Sender: + sender = await Sender.connect(transport, Plain(), dc_id, addr, base_logger) return await generate_auth_key(sender) @@ -294,20 +311,9 @@ async def generate_auth_key(sender: Sender) -> Sender: time_offset = finished.time_offset first_salt = finished.first_salt - return Sender( - dc_id=sender.dc_id, - addr=sender.addr, - _reader=sender._reader, - _writer=sender._writer, - _transport=sender._transport, - _mtp=Encrypted(auth_key, time_offset=time_offset, first_salt=first_salt), - _mtp_buffer=sender._mtp_buffer, - _requests=sender._requests, - _request_event=sender._request_event, - _next_ping=time.time() + PING_DELAY, - _read_buffer=sender._read_buffer, - _write_drain_pending=sender._write_drain_pending, - ) + sender._mtp = Encrypted(auth_key, time_offset=time_offset, first_salt=first_salt) + sender._next_ping = asyncio.get_running_loop().time() + PING_DELAY + return sender async def connect_with_auth( @@ -315,7 +321,8 @@ async def connect_with_auth( dc_id: int, addr: str, auth_key: bytes, + base_logger: logging.Logger, ) -> Sender: return await Sender.connect( - transport, Encrypted(AuthKey.from_bytes(auth_key)), dc_id, addr + transport, Encrypted(AuthKey.from_bytes(auth_key)), dc_id, addr, base_logger ) diff --git a/client/src/telethon/_impl/session/message_box/messagebox.py b/client/src/telethon/_impl/session/message_box/messagebox.py index 4bc9609b..d92f5b06 100644 --- a/client/src/telethon/_impl/session/message_box/messagebox.py +++ b/client/src/telethon/_impl/session/message_box/messagebox.py @@ -37,7 +37,7 @@ def epoch() -> datetime.datetime: # https://core.telegram.org/api/updates#message-related-event-sequences. class MessageBox: __slots__ = ( - "_log", + "_logger", "map", "date", "seq", @@ -49,9 +49,9 @@ class MessageBox: def __init__( self, *, - log: Optional[logging.Logger] = None, + base_logger: logging.Logger, ) -> None: - self._log = log or logging.getLogger("telethon.messagebox") + self._logger = base_logger.getChild("messagebox") self.map: Dict[Entry, State] = {} self.date = epoch() self.seq = NO_SEQ @@ -67,14 +67,14 @@ class MessageBox: # So every call to trace is prefixed with `if __debug__`` instead, to remove # it when using `python -O`. Probably unnecessary, but it's nice to avoid # paying the cost for something that is not used. - self._log.log( + self._logger.log( LOG_LEVEL_TRACE, "Current MessageBox state: seq = %r, date = %s, map = %r", self.seq, self.date.isoformat(), self.map, ) - self._log.log(LOG_LEVEL_TRACE, msg, *args) + self._logger.log(LOG_LEVEL_TRACE, msg, *args) def load(self, state: UpdateState) -> None: if __debug__: diff --git a/client/tests/mtsender_test.py b/client/tests/mtsender_test.py index bc86ef87..cef8466e 100644 --- a/client/tests/mtsender_test.py +++ b/client/tests/mtsender_test.py @@ -20,7 +20,10 @@ async def test_invoke_encrypted_method(caplog: LogCaptureFixture) -> None: def timeout() -> float: return deadline - asyncio.get_running_loop().time() - sender = await asyncio.wait_for(connect(Full(), *TELEGRAM_TEST_DC), timeout()) + sender = await asyncio.wait_for( + connect(Full(), *TELEGRAM_TEST_DC, logging.getLogger(__file__)), + timeout(), + ) rx = sender.enqueue( functions.invoke_with_layer(