Use logger

This commit is contained in:
Lonami Exo 2023-10-12 21:10:00 +02:00
parent 325f1ca27d
commit b31f4fe063
8 changed files with 97 additions and 63 deletions

View File

@@ -50,7 +50,7 @@ async def complete_login(client: Client, auth: abcs.auth.Authorization) -> User:
 async def handle_migrate(client: Client, dc_id: Optional[int]) -> None:
     assert dc_id is not None
     sender, client._session.dcs = await connect_sender(
-        client._config, datacenter_for_id(client, dc_id)
+        client._config, datacenter_for_id(client, dc_id), client._logger
     )
     async with client._sender_lock:
         client._sender = sender

View File

@ -1,5 +1,6 @@
import asyncio import asyncio
import datetime import datetime
import logging
from pathlib import Path from pathlib import Path
from types import TracebackType from types import TracebackType
from typing import ( from typing import (
@@ -149,6 +150,31 @@ class Client:
         This is required to sign in, and can be omitted otherwise.

+    :param catch_up:
+        Whether to "catch up" on updates that occured while the client was not connected.
+
+    :param check_all_handlers:
+        Whether to always check all event handlers or stop early.
+
+        The library will call event handlers in the order they were added.
+        By default, the library stops checking handlers as soon as a filter returns :data:`True`.
+
+        By setting ``check_all_handlers=True``, the library will keep calling handlers after the first match.
+
+    :param flood_sleep_threshold:
+        Maximum amount of time, in seconds, to automatically sleep before retrying a request.
+        This sleeping occurs when ``FLOOD_WAIT`` :class:`~telethon.RpcError` is raised by Telegram.
+
+    :param logger:
+        Logger for the client.
+        Any dependency of the client will use :meth:`logging.Logger.getChild`.
+        This effectively makes the parameter the root logger.
+
+        The default will get the logger for the package name from the root.
+
+    :param update_queue_limit:
+        Maximum amount of updates to keep in memory before dropping them.
+
     :param device_model:
         Device model.
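
A minimal sketch of how these new keyword-only parameters might be passed when constructing the client, assuming the public ``telethon`` package exports ``Client`` as in the rest of the v2 code base; the session name, credentials and the "my_app" logger name are placeholders, not values from this commit:

    import logging

    from telethon import Client

    # Placeholder credentials; use your own API ID/hash and session name.
    app_logger = logging.getLogger("my_app")
    app_logger.setLevel(logging.DEBUG)

    client = Client(
        "my_session",
        api_id=12345,
        api_hash="0123456789abcdef0123456789abcdef",
        logger=app_logger,         # becomes the root logger for the library's children
        check_all_handlers=True,   # keep calling handlers after the first matching filter
        flood_sleep_threshold=60,  # sleep up to 60s on FLOOD_WAIT before retrying
        update_queue_limit=1000,   # drop updates once this many are queued in memory
    )
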
@@ -164,27 +190,9 @@ class Client:
     :param lang_code:
         ISO 639-1 language code of the application's language.

-    :param catch_up:
-        Whether to "catch up" on updates that occured while the client was not connected.
-
     :param datacenter:
         Override the datacenter to connect to.
         Useful to connect to one of Telegram's test servers.

-    :param flood_sleep_threshold:
-        Maximum amount of time, in seconds, to automatically sleep before retrying a request.
-        This sleeping occurs when ``FLOOD_WAIT`` :class:`~telethon.RpcError` is raised by Telegram.
-
-    :param update_queue_limit:
-        Maximum amount of updates to keep in memory before dropping them.
-
-    :param check_all_handlers:
-        Whether to always check all event handlers or stop early.
-
-        The library will call event handlers in the order they were added.
-        By default, the library stops checking handlers as soon as a filter returns :data:`True`.
-
-        By setting ``check_all_handlers=True``, the library will keep calling handlers after the first match.
     """

     def __init__(
@@ -193,17 +201,22 @@ class Client:
         api_id: int,
         api_hash: Optional[str] = None,
         *,
+        catch_up: bool = False,
+        check_all_handlers: bool = False,
+        flood_sleep_threshold: Optional[int] = None,
+        logger: Optional[logging.Logger] = None,
+        update_queue_limit: Optional[int] = None,
         device_model: Optional[str] = None,
         system_version: Optional[str] = None,
         app_version: Optional[str] = None,
         system_lang_code: Optional[str] = None,
         lang_code: Optional[str] = None,
-        catch_up: Optional[bool] = None,
         datacenter: Optional[DataCenter] = None,
-        flood_sleep_threshold: Optional[int] = None,
-        update_queue_limit: Optional[int] = None,
-        check_all_handlers: bool = False,
     ) -> None:
+        self._logger = logger or logging.getLogger(
+            __package__[: __package__.index(".")]
+        )
         self._sender: Optional[Sender] = None
         self._sender_lock = asyncio.Lock()
         if isinstance(session, Storage):
@@ -231,7 +244,7 @@ class Client:
         self._session = Session()
-        self._message_box = MessageBox()
+        self._message_box = MessageBox(base_logger=self._logger)
         self._chat_hashes = ChatHashCache(None)
         self._last_update_limit_warn: Optional[float] = None
         self._updates: asyncio.Queue[
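
When no logger is given, ``__package__[: __package__.index(".")]`` takes everything before the first dot of the implementation package's name, so the client falls back to the library's top-level logger. A short sketch of how an application could surface that output, assuming the top-level package resolves to "telethon":

    import logging

    # Handlers and levels set on the top-level logger also apply to the
    # children created via getChild(), e.g. "telethon.messagebox".
    logging.basicConfig(level=logging.INFO)
    logging.getLogger("telethon").setLevel(logging.DEBUG)
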

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
 import asyncio
 import itertools
+import logging
 import platform
 import re
 from dataclasses import dataclass, field
@@ -67,14 +68,18 @@ DEFAULT_DC = 2
 async def connect_sender(
-    config: Config, dc: DataCenter
+    config: Config,
+    dc: DataCenter,
+    base_logger: logging.Logger,
 ) -> Tuple[Sender, List[DataCenter]]:
     transport = Full()

     if dc.auth:
-        sender = await connect_with_auth(transport, dc.id, dc.addr, dc.auth)
+        sender = await connect_with_auth(
+            transport, dc.id, dc.addr, dc.auth, base_logger
+        )
     else:
-        sender = await connect_without_auth(transport, dc.id, dc.addr)
+        sender = await connect_without_auth(transport, dc.id, dc.addr, base_logger)

     # TODO handle -404 (we had a previously-valid authkey, but server no longer knows about it)

     remote_config = await sender.invoke(
@@ -146,7 +151,9 @@ async def connect(self: Client) -> None:
         self, self._session.user.dc if self._session.user else DEFAULT_DC
     )
-    self._sender, self._session.dcs = await connect_sender(self._config, datacenter)
+    self._sender, self._session.dcs = await connect_sender(
+        self._config, datacenter, self._logger
+    )

     if self._message_box.is_empty() and self._session.user:
         try:
@@ -190,14 +197,16 @@ async def disconnect(self: Client) -> None:
     try:
         await self._dispatcher
     except Exception:
-        pass  # TODO log
+        self._logger.exception(
+            "unhandled exception when cancelling dispatcher; this is a bug"
+        )
     finally:
         self._dispatcher = None

     try:
         await self._sender.disconnect()
     except Exception:
-        pass  # TODO log
+        self._logger.exception("unhandled exception during disconnect; this is a bug")
     finally:
         self._sender = None
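
Both ``pass  # TODO log`` placeholders above are replaced with ``Logger.exception``, which records the message at ERROR level together with the traceback of the exception currently being handled. A self-contained illustration of that standard-library behaviour (names here are illustrative only):

    import logging

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger("demo")

    try:
        raise RuntimeError("boom")
    except Exception:
        # Logged at ERROR level, followed by the full traceback.
        logger.exception("unhandled exception during disconnect; this is a bug")
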

View File

@ -1,7 +1,6 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
@@ -117,7 +116,10 @@ def extend_update_queue(
                 now - client._last_update_limit_warn
                 > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN
             ):
-                # TODO warn
+                client._logger.warning(
+                    "updates are being dropped because limit=%d has been reached",
+                    client._updates.maxsize,
+                )
                 client._last_update_limit_warn = now
             break
@@ -132,14 +134,15 @@ async def dispatcher(client: Client) -> None:
         except Exception as e:
             if isinstance(e, RuntimeError) and loop.is_closed():
                 # User probably forgot to call disconnect.
-                logging.warning(
+                client._logger.warning(
                     "client was not closed cleanly, make sure to call client.disconnect()! %s",
                     e,
                 )
                 return
             else:
-                # TODO proper logger
-                logging.exception("Unhandled exception in event handler")
+                client._logger.exception(
+                    "unhandled exception in event handler; this is probably a bug in your code, not telethon"
+                )
                 raise

View File

@@ -44,7 +44,6 @@ def build_chat_map(users: List[abcs.User], chats: List[abcs.Chat]) -> Dict[int,
         for k, v in counter.items():
             if len(v) > 1:
-                # TODO proper logger
                 for x in v:
                     print(x, file=sys.stderr)

View File

@ -1,4 +1,5 @@
import asyncio import asyncio
import logging
import struct import struct
import time import time
from abc import ABC from abc import ABC
@@ -78,6 +79,7 @@ class Request(Generic[Return]):
 class Sender:
     dc_id: int
     addr: str
+    _logger: logging.Logger
     _reader: StreamReader
     _writer: StreamWriter
     _transport: Transport
@@ -91,13 +93,19 @@ class Sender:
     @classmethod
     async def connect(
-        cls, transport: Transport, mtp: Mtp, dc_id: int, addr: str
+        cls,
+        transport: Transport,
+        mtp: Mtp,
+        dc_id: int,
+        addr: str,
+        base_logger: logging.Logger,
     ) -> Self:
         reader, writer = await asyncio.open_connection(*addr.split(":"))

         return cls(
             dc_id=dc_id,
             addr=addr,
+            _logger=base_logger.getChild("mtsender"),
             _reader=reader,
             _writer=writer,
             _transport=transport,
@@ -230,7 +238,7 @@ class Sender:
                 )
             )
         )
-        self._next_ping = time.time() + PING_DELAY
+        self._next_ping = asyncio.get_running_loop().time() + PING_DELAY

     def _process_mtp_buffer(self, updates: List[Updates]) -> None:
         result = self._mtp.deserialize(self._mtp_buffer)
@@ -239,7 +247,10 @@ class Sender:
             try:
                 u = Updates.from_bytes(update)
             except ValueError:
-                pass  # TODO log
+                self._logger.warning(
+                    "failed to deserialize incoming update; make sure the session is not in use elsewhere: %s",
+                    update.hex(),
+                )
             else:
                 updates.append(u)
@@ -267,7 +278,11 @@ class Sender:
                 req.result.set_result(ret)
                 break
         if not found:
-            pass  # TODO log
+            self._logger.warning(
+                "telegram sent rpc_result for unknown msg_id=%d: %s",
+                msg_id,
+                ret.hex() if isinstance(ret, bytes) else repr(ret),
+            )

     @property
     def auth_key(self) -> Optional[bytes]:
@@ -277,8 +292,10 @@
             return None


-async def connect(transport: Transport, dc_id: int, addr: str) -> Sender:
-    sender = await Sender.connect(transport, Plain(), dc_id, addr)
+async def connect(
+    transport: Transport, dc_id: int, addr: str, base_logger: logging.Logger
+) -> Sender:
+    sender = await Sender.connect(transport, Plain(), dc_id, addr, base_logger)
     return await generate_auth_key(sender)
@@ -294,20 +311,9 @@ async def generate_auth_key(sender: Sender) -> Sender:
     time_offset = finished.time_offset
     first_salt = finished.first_salt
-    return Sender(
-        dc_id=sender.dc_id,
-        addr=sender.addr,
-        _reader=sender._reader,
-        _writer=sender._writer,
-        _transport=sender._transport,
-        _mtp=Encrypted(auth_key, time_offset=time_offset, first_salt=first_salt),
-        _mtp_buffer=sender._mtp_buffer,
-        _requests=sender._requests,
-        _request_event=sender._request_event,
-        _next_ping=time.time() + PING_DELAY,
-        _read_buffer=sender._read_buffer,
-        _write_drain_pending=sender._write_drain_pending,
-    )
+    sender._mtp = Encrypted(auth_key, time_offset=time_offset, first_salt=first_salt)
+    sender._next_ping = asyncio.get_running_loop().time() + PING_DELAY
+    return sender


 async def connect_with_auth(
@@ -315,7 +321,8 @@ async def connect_with_auth(
     dc_id: int,
     addr: str,
     auth_key: bytes,
+    base_logger: logging.Logger,
 ) -> Sender:
     return await Sender.connect(
-        transport, Encrypted(AuthKey.from_bytes(auth_key)), dc_id, addr
+        transport, Encrypted(AuthKey.from_bytes(auth_key)), dc_id, addr, base_logger
     )
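
Two related changes ride along in this file: ``generate_auth_key`` now upgrades the existing ``Sender`` in place instead of rebuilding it field by field, and the next-ping deadline now comes from ``asyncio.get_running_loop().time()``, the event loop's monotonic clock, rather than from the wall-clock ``time.time()``. A small illustrative sketch of the difference between the two clocks (not part of the commit):

    import asyncio
    import time

    async def show_clocks() -> None:
        loop = asyncio.get_running_loop()
        # loop.time() is monotonic: it never jumps backwards when the system
        # clock is adjusted, which makes it better suited for scheduling pings.
        print("loop.time():", loop.time())
        print("time.time():", time.time())

    asyncio.run(show_clocks())
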

View File

@@ -37,7 +37,7 @@ def epoch() -> datetime.datetime:
 # https://core.telegram.org/api/updates#message-related-event-sequences.
 class MessageBox:
     __slots__ = (
-        "_log",
+        "_logger",
         "map",
         "date",
         "seq",
@ -49,9 +49,9 @@ class MessageBox:
def __init__( def __init__(
self, self,
*, *,
log: Optional[logging.Logger] = None, base_logger: logging.Logger,
) -> None: ) -> None:
self._log = log or logging.getLogger("telethon.messagebox") self._logger = base_logger.getChild("messagebox")
self.map: Dict[Entry, State] = {} self.map: Dict[Entry, State] = {}
self.date = epoch() self.date = epoch()
self.seq = NO_SEQ self.seq = NO_SEQ
@@ -67,14 +67,14 @@ class MessageBox:
         # So every call to trace is prefixed with `if __debug__`` instead, to remove
         # it when using `python -O`. Probably unnecessary, but it's nice to avoid
         # paying the cost for something that is not used.
-        self._log.log(
+        self._logger.log(
             LOG_LEVEL_TRACE,
             "Current MessageBox state: seq = %r, date = %s, map = %r",
             self.seq,
             self.date.isoformat(),
             self.map,
         )
-        self._log.log(LOG_LEVEL_TRACE, msg, *args)
+        self._logger.log(LOG_LEVEL_TRACE, msg, *args)

     def load(self, state: UpdateState) -> None:
         if __debug__:
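
With this commit every component derives its logger from the one configured on the client via ``Logger.getChild``, so ``MessageBox`` logs under a ``.messagebox`` child and the ``Sender`` under ``.mtsender``. A minimal sketch of how that hierarchy behaves in the standard ``logging`` module; the "telethon" parent name is an assumption based on the default discussed earlier:

    import logging

    base = logging.getLogger("telethon")
    child = base.getChild("messagebox")

    assert child.name == "telethon.messagebox"
    # Records propagate upwards: handlers and levels configured on the
    # parent logger therefore apply to the child as well.
    assert child.parent is base
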

View File

@@ -20,7 +20,10 @@ async def test_invoke_encrypted_method(caplog: LogCaptureFixture) -> None:
     def timeout() -> float:
         return deadline - asyncio.get_running_loop().time()

-    sender = await asyncio.wait_for(connect(Full(), *TELEGRAM_TEST_DC), timeout())
+    sender = await asyncio.wait_for(
+        connect(Full(), *TELEGRAM_TEST_DC, logging.getLogger(__file__)),
+        timeout(),
+    )

     rx = sender.enqueue(
         functions.invoke_with_layer(