Make logger fully configurable (#1087)

This commit is contained in:
Tulir Asokan 2019-01-11 16:52:30 +02:00 committed by Lonami
parent eda4178333
commit f271316d7d
15 changed files with 141 additions and 137 deletions

View File

@ -1,12 +1,9 @@
import logging
from .client.telegramclient import TelegramClient from .client.telegramclient import TelegramClient
from .network import connection from .network import connection
from .tl import types, functions, custom from .tl import types, functions, custom
from . import version, events, utils, errors from . import version, events, utils, errors
__version__ = version.__version__ __version__ = version.__version__
logging.getLogger(__name__).addHandler(logging.NullHandler())
__all__ = ['TelegramClient', 'types', 'functions', 'custom', __all__ = ['TelegramClient', 'types', 'functions', 'custom',
'events', 'utils', 'errors'] 'events', 'utils', 'errors']

View File

@ -1,6 +1,5 @@
import datetime import datetime
import io import io
import logging
import os import os
import pathlib import pathlib
@ -14,9 +13,6 @@ except ImportError:
aiohttp = None aiohttp = None
__log__ = logging.getLogger(__name__)
class DownloadMethods(UserMethods): class DownloadMethods(UserMethods):
# region Public methods # region Public methods
@ -239,7 +235,8 @@ class DownloadMethods(UserMethods):
# The used sender will also change if ``FileMigrateError`` occurs # The used sender will also change if ``FileMigrateError`` occurs
sender = self._sender sender = self._sender
__log__.info('Downloading file in chunks of %d bytes', part_size) self._log[__name__].info('Downloading file in chunks of %d bytes',
part_size)
try: try:
offset = 0 offset = 0
while True: while True:
@ -251,7 +248,7 @@ class DownloadMethods(UserMethods):
# TODO Implement # TODO Implement
raise NotImplementedError raise NotImplementedError
except errors.FileMigrateError as e: except errors.FileMigrateError as e:
__log__.info('File lives in another DC') self._log[__name__].info('File lives in another DC')
sender = await self._borrow_exported_sender(e.new_dc) sender = await self._borrow_exported_sender(e.new_dc)
exported = True exported = True
continue continue
@ -264,7 +261,8 @@ class DownloadMethods(UserMethods):
else: else:
return getattr(result, 'type', '') return getattr(result, 'type', '')
__log__.debug('Saving %d more bytes', len(result.bytes)) self._log[__name__].debug('Saving %d more bytes',
len(result.bytes))
f.write(result.bytes) f.write(result.bytes)
if progress_callback: if progress_callback:
progress_callback(f.tell(), file_size) progress_callback(f.tell(), file_size)

View File

@ -1,6 +1,5 @@
import asyncio import asyncio
import itertools import itertools
import logging
import time import time
from async_generator import async_generator, yield_ from async_generator import async_generator, yield_
@ -11,8 +10,6 @@ from .buttons import ButtonMethods
from .. import helpers, utils, errors from .. import helpers, utils, errors
from ..tl import types, functions from ..tl import types, functions
__log__ = logging.getLogger(__name__)
class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods): class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):

View File

@ -6,7 +6,7 @@ import sys
import time import time
from datetime import datetime from datetime import datetime
from .. import version from .. import version, __name__ as __base_name__
from ..crypto import rsa from ..crypto import rsa
from ..extensions import markdown from ..extensions import markdown
from ..network import MTProtoSender, ConnectionTcpFull from ..network import MTProtoSender, ConnectionTcpFull
@ -19,7 +19,8 @@ DEFAULT_IPV4_IP = '149.154.167.51'
DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]' DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]'
DEFAULT_PORT = 443 DEFAULT_PORT = 443
__log__ = logging.getLogger(__name__) __default_log__ = logging.getLogger(__base_name__)
__default_log__.addHandler(logging.NullHandler())
class TelegramBaseClient(abc.ABC): class TelegramBaseClient(abc.ABC):
@ -133,6 +134,15 @@ class TelegramBaseClient(abc.ABC):
system_lang_code (`str`, optional): system_lang_code (`str`, optional):
"System lang code" to be sent when creating the initial connection. "System lang code" to be sent when creating the initial connection.
Defaults to `lang_code`. Defaults to `lang_code`.
loop (`asyncio.AbstractEventLoop`, optional):
Asyncio event loop to use. Defaults to `asyncio.get_event_loop()`
base_logger (`str` | `logging.Logger`, optional):
Base logger name or instance to use.
If a `str` is given, it'll be passed to `logging.getLogger()`. If a
`logging.Logger` is given, it'll be used directly. If something
else or nothing is given, the default logger will be used.
""" """
# Current TelegramClient version # Current TelegramClient version
@ -161,7 +171,8 @@ class TelegramBaseClient(abc.ABC):
app_version=None, app_version=None,
lang_code='en', lang_code='en',
system_lang_code='en', system_lang_code='en',
loop=None): loop=None,
base_logger=None):
if not api_id or not api_hash: if not api_id or not api_hash:
raise ValueError( raise ValueError(
"Your API ID or Hash cannot be empty or None. " "Your API ID or Hash cannot be empty or None. "
@ -170,6 +181,19 @@ class TelegramBaseClient(abc.ABC):
self._use_ipv6 = use_ipv6 self._use_ipv6 = use_ipv6
self._loop = loop or asyncio.get_event_loop() self._loop = loop or asyncio.get_event_loop()
if isinstance(base_logger, str):
base_logger = logging.getLogger(base_logger)
elif not isinstance(base_logger, logging.Logger):
base_logger = __default_log__
class _Loggers(dict):
def __missing__(self, key):
if key.startswith("telethon."):
key = key[len("telethon."):]
return base_logger.getChild(key)
self._log = _Loggers()
# Determine what session object we have # Determine what session object we have
if isinstance(session, str) or session is None: if isinstance(session, str) or session is None:
try: try:
@ -240,6 +264,7 @@ class TelegramBaseClient(abc.ABC):
self._connection = connection self._connection = connection
self._sender = MTProtoSender( self._sender = MTProtoSender(
self.session.auth_key, self._loop, self.session.auth_key, self._loop,
loggers=self._log,
retries=self._connection_retries, retries=self._connection_retries,
delay=self._retry_delay, delay=self._retry_delay,
auto_reconnect=self._auto_reconnect, auto_reconnect=self._auto_reconnect,
@ -317,7 +342,8 @@ class TelegramBaseClient(abc.ABC):
""" """
await self._sender.connect(self._connection( await self._sender.connect(self._connection(
self.session.server_address, self.session.port, self.session.server_address, self.session.port,
loop=self._loop, proxy=self._proxy loop=self._loop, loggers=self._log,
proxy=self._proxy
)) ))
self.session.auth_key = self._sender.auth_key self.session.auth_key = self._sender.auth_key
self.session.save() self.session.save()
@ -387,7 +413,7 @@ class TelegramBaseClient(abc.ABC):
""" """
Permanently switches the current connection to the new data center. Permanently switches the current connection to the new data center.
""" """
__log__.info('Reconnecting to new data center %s', new_dc) self._log[__name__].info('Reconnecting to new data center %s', new_dc)
dc = await self._get_dc(new_dc) dc = await self._get_dc(new_dc)
self.session.set_dc(dc.id, dc.ip_address, dc.port) self.session.set_dc(dc.id, dc.ip_address, dc.port)
@ -443,7 +469,8 @@ class TelegramBaseClient(abc.ABC):
sender = MTProtoSender(None, self._loop) sender = MTProtoSender(None, self._loop)
await sender.connect(self._connection( await sender.connect(self._connection(
dc.ip_address, dc.port, loop=self._loop, proxy=self._proxy)) dc.ip_address, dc.port, loop=self._loop, proxy=self._proxy))
__log__.info('Exporting authorization for data center %s', dc) self._log[__name__].info('Exporting authorization for data center %s',
dc)
auth = await self(functions.auth.ExportAuthorizationRequest(dc_id)) auth = await self(functions.auth.ExportAuthorizationRequest(dc_id))
req = self._init_with(functions.auth.ImportAuthorizationRequest( req = self._init_with(functions.auth.ImportAuthorizationRequest(
id=auth.id, bytes=auth.bytes id=auth.id, bytes=auth.bytes
@ -486,7 +513,8 @@ class TelegramBaseClient(abc.ABC):
n -= 1 n -= 1
self._borrowed_senders[dc_id] = (n, sender) self._borrowed_senders[dc_id] = (n, sender)
if not n: if not n:
__log__.info('Disconnecting borrowed sender for DC %d', dc_id) self._log[__name__].info(
'Disconnecting borrowed sender for DC %d', dc_id)
sender.disconnect() sender.disconnect()
async def _get_cdn_client(self, cdn_redirect): async def _get_cdn_client(self, cdn_redirect):

View File

@ -1,7 +1,5 @@
import asyncio import asyncio
import inspect
import itertools import itertools
import logging
import random import random
import time import time
@ -9,8 +7,6 @@ from .users import UserMethods
from .. import events, utils, errors from .. import events, utils, errors
from ..tl import types, functions from ..tl import types, functions
__log__ = logging.getLogger(__name__)
class UpdateMethods(UserMethods): class UpdateMethods(UserMethods):
@ -281,28 +277,31 @@ class UpdateMethods(UserMethods):
await callback(event) await callback(event)
except errors.AlreadyInConversationError: except errors.AlreadyInConversationError:
name = getattr(callback, '__name__', repr(callback)) name = getattr(callback, '__name__', repr(callback))
__log__.debug('Event handler "%s" already has an open ' self._log[__name__].debug(
'conversation, ignoring new one', name) 'Event handler "%s" already has an open conversation, '
'ignoring new one', name)
except events.StopPropagation: except events.StopPropagation:
name = getattr(callback, '__name__', repr(callback)) name = getattr(callback, '__name__', repr(callback))
__log__.debug( self._log[__name__].debug(
'Event handler "%s" stopped chain of propagation ' 'Event handler "%s" stopped chain of propagation '
'for event %s.', name, type(event).__name__ 'for event %s.', name, type(event).__name__
) )
break break
except Exception: except Exception:
name = getattr(callback, '__name__', repr(callback)) name = getattr(callback, '__name__', repr(callback))
__log__.exception('Unhandled exception on %s', name) self._log[__name__].exception('Unhandled exception on %s',
name)
async def _handle_auto_reconnect(self): async def _handle_auto_reconnect(self):
# Upon reconnection, we want to send getState # Upon reconnection, we want to send getState
# for Telegram to keep sending us updates. # for Telegram to keep sending us updates.
try: try:
__log__.info('Asking for the current state after reconnect...') self._log[__name__].info(
'Asking for the current state after reconnect...')
state = await self(functions.updates.GetStateRequest()) state = await self(functions.updates.GetStateRequest())
__log__.info('Got new state! %s', state) self._log[__name__].info('Got new state! %s', state)
except errors.RPCError as e: except errors.RPCError as e:
__log__.info('Failed to get current state: %r', e) self._log[__name__].info('Failed to get current state: %r', e)
# endregion # endregion

View File

@ -1,6 +1,5 @@
import hashlib import hashlib
import io import io
import logging
import os import os
import pathlib import pathlib
import re import re
@ -12,8 +11,6 @@ from .users import UserMethods
from .. import utils, helpers from .. import utils, helpers
from ..tl import types, functions, custom from ..tl import types, functions, custom
__log__ = logging.getLogger(__name__)
class _CacheType: class _CacheType:
"""Like functools.partial but pretends to be the wrapped class.""" """Like functools.partial but pretends to be the wrapped class."""
@ -196,8 +193,8 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
return msg return msg
async def _send_album(self, entity, files, caption='', async def _send_album(self, entity, files, caption='',
progress_callback=None, reply_to=None, progress_callback=None, reply_to=None,
parse_mode=(), silent=None): parse_mode=(), silent=None):
"""Specialized version of .send_file for albums""" """Specialized version of .send_file for albums"""
# We don't care if the user wants to avoid cache, we will use it # We don't care if the user wants to avoid cache, we will use it
# anyway. Why? The cached version will be exactly the same thing # anyway. Why? The cached version will be exactly the same thing
@ -350,8 +347,8 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
return cached return cached
part_count = (file_size + part_size - 1) // part_size part_count = (file_size + part_size - 1) // part_size
__log__.info('Uploading file of %d bytes in %d chunks of %d', self._log[__name__].info('Uploading file of %d bytes in %d chunks of %d',
file_size, part_count, part_size) file_size, part_count, part_size)
with open(file, 'rb') if isinstance(file, str) else BytesIO(file)\ with open(file, 'rb') if isinstance(file, str) else BytesIO(file)\
as stream: as stream:
@ -370,8 +367,8 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
result = await self(request) result = await self(request)
if result: if result:
__log__.debug('Uploaded %d/%d', part_index + 1, self._log[__name__].debug('Uploaded %d/%d',
part_count) part_index + 1, part_count)
if progress_callback: if progress_callback:
progress_callback(stream.tell(), file_size) progress_callback(stream.tell(), file_size)
else: else:
@ -464,7 +461,7 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
return file_handle, media return file_handle, media
async def _cache_media(self, msg, file, file_handle, async def _cache_media(self, msg, file, file_handle,
force_document=False): force_document=False):
if file and msg and isinstance(file_handle, if file and msg and isinstance(file_handle,
custom.InputSizedFile): custom.InputSizedFile):
# There was a response message and we didn't use cached # There was a response message and we didn't use cached

View File

@ -1,6 +1,5 @@
import asyncio import asyncio
import itertools import itertools
import logging
import time import time
from .telegrambaseclient import TelegramBaseClient from .telegrambaseclient import TelegramBaseClient
@ -8,7 +7,6 @@ from .. import errors, utils
from ..errors import MultiError, RPCError from ..errors import MultiError, RPCError
from ..tl import TLObject, TLRequest, types, functions from ..tl import TLObject, TLRequest, types, functions
__log__ = logging.getLogger(__name__)
_NOT_A_REQUEST = TypeError('You can only invoke requests, not types!') _NOT_A_REQUEST = TypeError('You can only invoke requests, not types!')
@ -27,7 +25,8 @@ class UserMethods(TelegramBaseClient):
if diff <= 3: # Flood waits below 3 seconds are "ignored" if diff <= 3: # Flood waits below 3 seconds are "ignored"
self._flood_waited_requests.pop(r.CONSTRUCTOR_ID, None) self._flood_waited_requests.pop(r.CONSTRUCTOR_ID, None)
elif diff <= self.flood_sleep_threshold: elif diff <= self.flood_sleep_threshold:
__log__.info('Sleeping early for %ds on flood wait', diff) self._log[__name__].info(
'Sleeping early for %ds on flood wait', diff)
await asyncio.sleep(diff, loop=self._loop) await asyncio.sleep(diff, loop=self._loop)
self._flood_waited_requests.pop(r.CONSTRUCTOR_ID, None) self._flood_waited_requests.pop(r.CONSTRUCTOR_ID, None)
else: else:
@ -61,8 +60,9 @@ class UserMethods(TelegramBaseClient):
self.session.process_entities(result) self.session.process_entities(result)
return result return result
except (errors.ServerError, errors.RpcCallFailError) as e: except (errors.ServerError, errors.RpcCallFailError) as e:
__log__.warning('Telegram is having internal issues %s: %s', self._log[__name__].warning(
e.__class__.__name__, e) 'Telegram is having internal issues %s: %s',
e.__class__.__name__, e)
except (errors.FloodWaitError, errors.FloodTestPhoneWaitError) as e: except (errors.FloodWaitError, errors.FloodTestPhoneWaitError) as e:
if utils.is_list_like(request): if utils.is_list_like(request):
request = request[request_index] request = request[request_index]
@ -71,15 +71,16 @@ class UserMethods(TelegramBaseClient):
[request.CONSTRUCTOR_ID] = time.time() + e.seconds [request.CONSTRUCTOR_ID] = time.time() + e.seconds
if e.seconds <= self.flood_sleep_threshold: if e.seconds <= self.flood_sleep_threshold:
__log__.info('Sleeping for %ds on flood wait', e.seconds) self._log[__name__].info('Sleeping for %ds on flood wait',
e.seconds)
await asyncio.sleep(e.seconds, loop=self._loop) await asyncio.sleep(e.seconds, loop=self._loop)
else: else:
raise raise
except (errors.PhoneMigrateError, errors.NetworkMigrateError, except (errors.PhoneMigrateError, errors.NetworkMigrateError,
errors.UserMigrateError) as e: errors.UserMigrateError) as e:
__log__.info('Phone migrated to %d', e.new_dc) self._log[__name__].info('Phone migrated to %d', e.new_dc)
should_raise = isinstance(e, ( should_raise = isinstance(e, (
errors.PhoneMigrateError, errors.NetworkMigrateError errors.PhoneMigrateError, errors.NetworkMigrateError
)) ))
if should_raise and await self.is_user_authorized(): if should_raise and await self.is_user_authorized():
raise raise

View File

@ -1,15 +1,12 @@
import asyncio import asyncio
import collections import collections
import io import io
import logging
import struct import struct
from ..tl import TLRequest from ..tl import TLRequest
from ..tl.core.messagecontainer import MessageContainer from ..tl.core.messagecontainer import MessageContainer
from ..tl.core.tlmessage import TLMessage from ..tl.core.tlmessage import TLMessage
__log__ = logging.getLogger(__name__)
class MessagePacker: class MessagePacker:
""" """
@ -24,11 +21,13 @@ class MessagePacker:
encryption and network overhead also is smaller. It's also a central encryption and network overhead also is smaller. It's also a central
point where outgoing requests are put, and where ready-messages are get. point where outgoing requests are put, and where ready-messages are get.
""" """
def __init__(self, state, loop):
def __init__(self, state, loop, loggers):
self._state = state self._state = state
self._loop = loop self._loop = loop
self._deque = collections.deque() self._deque = collections.deque()
self._ready = asyncio.Event(loop=loop) self._ready = asyncio.Event(loop=loop)
self._log = loggers[__name__]
def append(self, state): def append(self, state):
self._deque.append(state) self._deque.append(state)
@ -65,9 +64,9 @@ class MessagePacker:
after_id=state.after.msg_id if state.after else None after_id=state.after.msg_id if state.after else None
) )
batch.append(state) batch.append(state)
__log__.debug('Assigned msg_id = %d to %s (%x)', self._log.debug('Assigned msg_id = %d to %s (%x)',
state.msg_id, state.request.__class__.__name__, state.msg_id, state.request.__class__.__name__,
id(state.request)) id(state.request))
continue continue
# Put the item back since it can't be sent in this batch # Put the item back since it can't be sent in this batch

View File

@ -1,13 +1,10 @@
import abc import abc
import asyncio import asyncio
import logging
import socket import socket
import ssl as ssl_mod import ssl as ssl_mod
from ...errors import InvalidChecksumError from ...errors import InvalidChecksumError
__log__ = logging.getLogger(__name__)
class Connection(abc.ABC): class Connection(abc.ABC):
""" """
@ -21,10 +18,11 @@ class Connection(abc.ABC):
``ConnectionError``, which will raise when attempting to send if ``ConnectionError``, which will raise when attempting to send if
the client is disconnected (includes remote disconnections). the client is disconnected (includes remote disconnections).
""" """
def __init__(self, ip, port, *, loop, proxy=None): def __init__(self, ip, port, *, loop, loggers, proxy=None):
self._ip = ip self._ip = ip
self._port = port self._port = port
self._loop = loop self._loop = loop
self._log = loggers[__name__]
self._proxy = proxy self._proxy = proxy
self._reader = None self._reader = None
self._writer = None self._writer = None
@ -156,13 +154,13 @@ class Connection(abc.ABC):
except Exception as e: except Exception as e:
if isinstance(e, (ConnectionError, asyncio.IncompleteReadError)): if isinstance(e, (ConnectionError, asyncio.IncompleteReadError)):
msg = 'The server closed the connection' msg = 'The server closed the connection'
__log__.info(msg) self._log.info(msg)
elif isinstance(e, InvalidChecksumError): elif isinstance(e, InvalidChecksumError):
msg = 'The server response had an invalid checksum' msg = 'The server response had an invalid checksum'
__log__.info(msg) self._log.info(msg)
else: else:
msg = 'Unexpected exception in the receive loop' msg = 'Unexpected exception in the receive loop'
__log__.exception(msg) self._log.exception(msg)
self.disconnect() self.disconnect()

View File

@ -10,8 +10,8 @@ class ConnectionTcpFull(Connection):
Default Telegram mode. Sends 12 additional bytes and Default Telegram mode. Sends 12 additional bytes and
needs to calculate the CRC value of the packet itself. needs to calculate the CRC value of the packet itself.
""" """
def __init__(self, ip, port, *, loop, proxy=None): def __init__(self, ip, port, *, loop, loggers, proxy=None):
super().__init__(ip, port, loop=loop, proxy=proxy) super().__init__(ip, port, loop=loop, loggers=loggers, proxy=proxy)
self._send_counter = 0 self._send_counter = 0
async def connect(self, timeout=None, ssl=None): async def connect(self, timeout=None, ssl=None):

View File

@ -10,8 +10,8 @@ class ConnectionTcpObfuscated(ConnectionTcpAbridged):
every message with a randomly generated key using the every message with a randomly generated key using the
AES-CTR mode so the packets are harder to discern. AES-CTR mode so the packets are harder to discern.
""" """
def __init__(self, ip, port, *, loop, proxy=None): def __init__(self, ip, port, *, loop, loggers, proxy=None):
super().__init__(ip, port, loop=loop, proxy=proxy) super().__init__(ip, port, loop=loop, loggers=loggers, proxy=proxy)
self._aes_encrypt = None self._aes_encrypt = None
self._aes_decrypt = None self._aes_decrypt = None

View File

@ -1,7 +1,6 @@
import asyncio import asyncio
import collections import collections
import functools import functools
import logging
from . import authenticator from . import authenticator
from ..extensions.messagepacker import MessagePacker from ..extensions.messagepacker import MessagePacker
@ -24,8 +23,6 @@ from ..tl.types import (
) )
from ..crypto import AuthKey from ..crypto import AuthKey
__log__ = logging.getLogger(__name__)
def _cancellable(func): def _cancellable(func):
""" """
@ -59,12 +56,14 @@ class MTProtoSender:
A new authorization key will be generated on connection if no other A new authorization key will be generated on connection if no other
key exists yet. key exists yet.
""" """
def __init__(self, auth_key, loop, *, def __init__(self, auth_key, loop, *, loggers,
retries=5, delay=1, auto_reconnect=True, connect_timeout=None, retries=5, delay=1, auto_reconnect=True, connect_timeout=None,
auth_key_callback=None, auth_key_callback=None,
update_callback=None, auto_reconnect_callback=None): update_callback=None, auto_reconnect_callback=None):
self._connection = None self._connection = None
self._loop = loop self._loop = loop
self._loggers = loggers
self._log = loggers[__name__]
self._retries = retries self._retries = retries
self._delay = delay self._delay = delay
self._auto_reconnect = auto_reconnect self._auto_reconnect = auto_reconnect
@ -90,11 +89,12 @@ class MTProtoSender:
# Preserving the references of the AuthKey and state is important # Preserving the references of the AuthKey and state is important
self.auth_key = auth_key or AuthKey(None) self.auth_key = auth_key or AuthKey(None)
self._state = MTProtoState(self.auth_key) self._state = MTProtoState(self.auth_key, loggers=self._loggers)
# Outgoing messages are put in a queue and sent in a batch. # Outgoing messages are put in a queue and sent in a batch.
# Note that here we're also storing their ``_RequestState``. # Note that here we're also storing their ``_RequestState``.
self._send_queue = MessagePacker(self._state, self._loop) self._send_queue = MessagePacker(self._state, self._loop,
loggers=self._loggers)
# Sent states are remembered until a response is received. # Sent states are remembered until a response is received.
self._pending_state = {} self._pending_state = {}
@ -132,7 +132,7 @@ class MTProtoSender:
Connects to the specified given connection using the given auth key. Connects to the specified given connection using the given auth key.
""" """
if self._user_connected: if self._user_connected:
__log__.info('User is already connected!') self._log.info('User is already connected!')
return return
self._connection = connection self._connection = connection
@ -210,13 +210,13 @@ class MTProtoSender:
authorization key if necessary, and starting the send and authorization key if necessary, and starting the send and
receive loops. receive loops.
""" """
__log__.info('Connecting to %s...', self._connection) self._log.info('Connecting to %s...', self._connection)
for retry in range(1, self._retries + 1): for retry in range(1, self._retries + 1):
try: try:
__log__.debug('Connection attempt {}...'.format(retry)) self._log.debug('Connection attempt {}...'.format(retry))
await self._connection.connect(timeout=self._connect_timeout) await self._connection.connect(timeout=self._connect_timeout)
except (ConnectionError, asyncio.TimeoutError) as e: except (ConnectionError, asyncio.TimeoutError) as e:
__log__.warning('Attempt {} at connecting failed: {}: {}' self._log.warning('Attempt {} at connecting failed: {}: {}'
.format(retry, type(e).__name__, e)) .format(retry, type(e).__name__, e))
await asyncio.sleep(self._delay) await asyncio.sleep(self._delay)
else: else:
@ -225,12 +225,12 @@ class MTProtoSender:
raise ConnectionError('Connection to Telegram failed {} times' raise ConnectionError('Connection to Telegram failed {} times'
.format(self._retries)) .format(self._retries))
__log__.debug('Connection success!') self._log.debug('Connection success!')
if not self.auth_key: if not self.auth_key:
plain = MTProtoPlainSender(self._connection) plain = MTProtoPlainSender(self._connection)
for retry in range(1, self._retries + 1): for retry in range(1, self._retries + 1):
try: try:
__log__.debug('New auth_key attempt {}...'.format(retry)) self._log.debug('New auth_key attempt {}...'.format(retry))
self.auth_key.key, self._state.time_offset =\ self.auth_key.key, self._state.time_offset =\
await authenticator.do_authentication(plain) await authenticator.do_authentication(plain)
@ -243,7 +243,7 @@ class MTProtoSender:
break break
except (SecurityError, AssertionError) as e: except (SecurityError, AssertionError) as e:
__log__.warning('Attempt {} at new auth_key failed: {}' self._log.warning('Attempt {} at new auth_key failed: {}'
.format(retry, e)) .format(retry, e))
await asyncio.sleep(self._delay) await asyncio.sleep(self._delay)
else: else:
@ -252,10 +252,10 @@ class MTProtoSender:
self._disconnect(error=e) self._disconnect(error=e)
raise e raise e
__log__.debug('Starting send loop') self._log.debug('Starting send loop')
self._send_loop_handle = self._loop.create_task(self._send_loop()) self._send_loop_handle = self._loop.create_task(self._send_loop())
__log__.debug('Starting receive loop') self._log.debug('Starting receive loop')
self._recv_loop_handle = self._loop.create_task(self._recv_loop()) self._recv_loop_handle = self._loop.create_task(self._recv_loop())
# _disconnected only completes after manual disconnection # _disconnected only completes after manual disconnection
@ -264,16 +264,16 @@ class MTProtoSender:
if self._disconnected.done(): if self._disconnected.done():
self._disconnected = self._loop.create_future() self._disconnected = self._loop.create_future()
__log__.info('Connection to %s complete!', self._connection) self._log.info('Connection to %s complete!', self._connection)
def _disconnect(self, error=None): def _disconnect(self, error=None):
__log__.info('Disconnecting from %s...', self._connection) self._log.info('Disconnecting from %s...', self._connection)
self._user_connected = False self._user_connected = False
try: try:
__log__.debug('Closing current connection...') self._log.debug('Closing current connection...')
self._connection.disconnect() self._connection.disconnect()
finally: finally:
__log__.debug('Cancelling {} pending message(s)...' self._log.debug('Cancelling {} pending message(s)...'
.format(len(self._pending_state))) .format(len(self._pending_state)))
for state in self._pending_state.values(): for state in self._pending_state.values():
if error and not state.future.done(): if error and not state.future.done():
@ -286,14 +286,14 @@ class MTProtoSender:
self._last_ack = None self._last_ack = None
if self._send_loop_handle: if self._send_loop_handle:
__log__.debug('Cancelling the send loop...') self._log.debug('Cancelling the send loop...')
self._send_loop_handle.cancel() self._send_loop_handle.cancel()
if self._recv_loop_handle: if self._recv_loop_handle:
__log__.debug('Cancelling the receive loop...') self._log.debug('Cancelling the receive loop...')
self._recv_loop_handle.cancel() self._recv_loop_handle.cancel()
__log__.info('Disconnection from %s complete!', self._connection) self._log.info('Disconnection from %s complete!', self._connection)
if self._disconnected and not self._disconnected.done(): if self._disconnected and not self._disconnected.done():
if error: if error:
self._disconnected.set_exception(error) self._disconnected.set_exception(error)
@ -306,13 +306,13 @@ class MTProtoSender:
""" """
self._reconnecting = True self._reconnecting = True
__log__.debug('Closing current connection...') self._log.debug('Closing current connection...')
self._connection.disconnect() self._connection.disconnect()
__log__.debug('Cancelling the send loop...') self._log.debug('Cancelling the send loop...')
self._send_loop_handle.cancel() self._send_loop_handle.cancel()
__log__.debug('Cancelling the receive loop...') self._log.debug('Cancelling the receive loop...')
self._recv_loop_handle.cancel() self._recv_loop_handle.cancel()
self._reconnecting = False self._reconnecting = False
@ -325,12 +325,12 @@ class MTProtoSender:
try: try:
await self._connect() await self._connect()
except (ConnectionError, asyncio.TimeoutError) as e: except (ConnectionError, asyncio.TimeoutError) as e:
__log__.info('Failed reconnection retry %d/%d with %s', self._log.info('Failed reconnection retry %d/%d with %s',
retry, retries, e.__class__.__name__) retry, retries, e.__class__.__name__)
await asyncio.sleep(self._delay) await asyncio.sleep(self._delay)
except Exception: except Exception:
__log__.exception('Unexpected exception reconnecting on ' self._log.exception('Unexpected exception reconnecting on '
'retry %d/%d', retry, retries) 'retry %d/%d', retry, retries)
await asyncio.sleep(self._delay) await asyncio.sleep(self._delay)
@ -343,7 +343,7 @@ class MTProtoSender:
break break
else: else:
__log__.error('Failed to reconnect automatically.') self._log.error('Failed to reconnect automatically.')
self._disconnect(error=ConnectionError()) self._disconnect(error=ConnectionError())
def _start_reconnect(self): def _start_reconnect(self):
@ -368,7 +368,7 @@ class MTProtoSender:
self._last_acks.append(ack) self._last_acks.append(ack)
self._pending_ack.clear() self._pending_ack.clear()
__log__.debug('Waiting for messages to send...') self._log.debug('Waiting for messages to send...')
# TODO Wait for the connection send queue to be empty? # TODO Wait for the connection send queue to be empty?
# This means that while it's not empty we can wait for # This means that while it's not empty we can wait for
# more messages to be added to the send queue. # more messages to be added to the send queue.
@ -377,14 +377,14 @@ class MTProtoSender:
if not data: if not data:
continue continue
__log__.debug('Encrypting %d message(s) in %d bytes for sending', self._log.debug('Encrypting %d message(s) in %d bytes for sending',
len(batch), len(data)) len(batch), len(data))
data = self._state.encrypt_message_data(data) data = self._state.encrypt_message_data(data)
try: try:
await self._connection.send(data) await self._connection.send(data)
except ConnectionError: except ConnectionError:
__log__.info('Connection closed while sending data') self._log.info('Connection closed while sending data')
self._start_reconnect() self._start_reconnect()
return return
@ -397,7 +397,7 @@ class MTProtoSender:
if isinstance(s.request, TLRequest): if isinstance(s.request, TLRequest):
self._pending_state[s.msg_id] = s self._pending_state[s.msg_id] = s
__log__.debug('Encrypted messages put in a queue to be sent') self._log.debug('Encrypted messages put in a queue to be sent')
@_cancellable @_cancellable
async def _recv_loop(self): async def _recv_loop(self):
@ -408,11 +408,11 @@ class MTProtoSender:
Besides `connect`, only this method ever receives data. Besides `connect`, only this method ever receives data.
""" """
while self._user_connected and not self._reconnecting: while self._user_connected and not self._reconnecting:
__log__.debug('Receiving items from the network...') self._log.debug('Receiving items from the network...')
try: try:
body = await self._connection.recv() body = await self._connection.recv()
except ConnectionError: except ConnectionError:
__log__.info('Connection closed while receiving data') self._log.info('Connection closed while receiving data')
self._start_reconnect() self._start_reconnect()
return return
@ -420,20 +420,20 @@ class MTProtoSender:
message = self._state.decrypt_message_data(body) message = self._state.decrypt_message_data(body)
except TypeNotFoundError as e: except TypeNotFoundError as e:
# Received object which we don't know how to deserialize # Received object which we don't know how to deserialize
__log__.info('Type %08x not found, remaining data %r', self._log.info('Type %08x not found, remaining data %r',
e.invalid_constructor_id, e.remaining) e.invalid_constructor_id, e.remaining)
continue continue
except SecurityError as e: except SecurityError as e:
# A step while decoding had the incorrect data. This message # A step while decoding had the incorrect data. This message
# should not be considered safe and it should be ignored. # should not be considered safe and it should be ignored.
__log__.warning('Security error while unpacking a ' self._log.warning('Security error while unpacking a '
'received message: %s', e) 'received message: %s', e)
continue continue
except BufferError as e: except BufferError as e:
if isinstance(e, InvalidBufferError) and e.code == 404: if isinstance(e, InvalidBufferError) and e.code == 404:
__log__.info('Broken authorization key; resetting') self._log.info('Broken authorization key; resetting')
else: else:
__log__.warning('Invalid buffer %s', e) self._log.warning('Invalid buffer %s', e)
self.auth_key.key = None self.auth_key.key = None
if self._auth_key_callback: if self._auth_key_callback:
@ -442,14 +442,14 @@ class MTProtoSender:
self._start_reconnect() self._start_reconnect()
return return
except Exception: except Exception:
__log__.exception('Unhandled error while receiving data') self._log.exception('Unhandled error while receiving data')
self._start_reconnect() self._start_reconnect()
return return
try: try:
await self._process_message(message) await self._process_message(message)
except Exception: except Exception:
__log__.exception('Unhandled error while processing msgs') self._log.exception('Unhandled error while processing msgs')
# Response Handlers # Response Handlers
@ -498,7 +498,7 @@ class MTProtoSender:
""" """
rpc_result = message.obj rpc_result = message.obj
state = self._pending_state.pop(rpc_result.req_msg_id, None) state = self._pending_state.pop(rpc_result.req_msg_id, None)
__log__.debug('Handling RPC result for message %d', self._log.debug('Handling RPC result for message %d',
rpc_result.req_msg_id) rpc_result.req_msg_id)
if not state: if not state:
@ -511,7 +511,7 @@ class MTProtoSender:
if not isinstance(reader.tgread_object(), upload.File): if not isinstance(reader.tgread_object(), upload.File):
raise ValueError('Not an upload.File') raise ValueError('Not an upload.File')
except (TypeNotFoundError, ValueError): except (TypeNotFoundError, ValueError):
__log__.info('Received response without parent request: {}' self._log.info('Received response without parent request: {}'
.format(rpc_result.body)) .format(rpc_result.body))
return return
@ -535,7 +535,7 @@ class MTProtoSender:
msg_container#73f1f8dc messages:vector<%Message> = MessageContainer; msg_container#73f1f8dc messages:vector<%Message> = MessageContainer;
""" """
__log__.debug('Handling container') self._log.debug('Handling container')
for inner_message in message.obj.messages: for inner_message in message.obj.messages:
await self._process_message(inner_message) await self._process_message(inner_message)
@ -545,13 +545,13 @@ class MTProtoSender:
gzip_packed#3072cfa1 packed_data:bytes = Object; gzip_packed#3072cfa1 packed_data:bytes = Object;
""" """
__log__.debug('Handling gzipped data') self._log.debug('Handling gzipped data')
with BinaryReader(message.obj.data) as reader: with BinaryReader(message.obj.data) as reader:
message.obj = reader.tgread_object() message.obj = reader.tgread_object()
await self._process_message(message) await self._process_message(message)
async def _handle_update(self, message): async def _handle_update(self, message):
__log__.debug('Handling update {}' self._log.debug('Handling update {}'
.format(message.obj.__class__.__name__)) .format(message.obj.__class__.__name__))
if self._update_callback: if self._update_callback:
await self._update_callback(message.obj) await self._update_callback(message.obj)
@ -564,7 +564,7 @@ class MTProtoSender:
pong#347773c5 msg_id:long ping_id:long = Pong; pong#347773c5 msg_id:long ping_id:long = Pong;
""" """
pong = message.obj pong = message.obj
__log__.debug('Handling pong for message %d', pong.msg_id) self._log.debug('Handling pong for message %d', pong.msg_id)
state = self._pending_state.pop(pong.msg_id, None) state = self._pending_state.pop(pong.msg_id, None)
if state: if state:
state.future.set_result(pong) state.future.set_result(pong)
@ -578,12 +578,12 @@ class MTProtoSender:
error_code:int new_server_salt:long = BadMsgNotification; error_code:int new_server_salt:long = BadMsgNotification;
""" """
bad_salt = message.obj bad_salt = message.obj
__log__.debug('Handling bad salt for message %d', bad_salt.bad_msg_id) self._log.debug('Handling bad salt for message %d', bad_salt.bad_msg_id)
self._state.salt = bad_salt.new_server_salt self._state.salt = bad_salt.new_server_salt
states = self._pop_states(bad_salt.bad_msg_id) states = self._pop_states(bad_salt.bad_msg_id)
self._send_queue.extend(states) self._send_queue.extend(states)
__log__.debug('%d message(s) will be resent', len(states)) self._log.debug('%d message(s) will be resent', len(states))
async def _handle_bad_notification(self, message): async def _handle_bad_notification(self, message):
""" """
@ -596,13 +596,13 @@ class MTProtoSender:
bad_msg = message.obj bad_msg = message.obj
states = self._pop_states(bad_msg.bad_msg_id) states = self._pop_states(bad_msg.bad_msg_id)
__log__.debug('Handling bad msg %s', bad_msg) self._log.debug('Handling bad msg %s', bad_msg)
if bad_msg.error_code in (16, 17): if bad_msg.error_code in (16, 17):
# Sent msg_id too low or too high (respectively). # Sent msg_id too low or too high (respectively).
# Use the current msg_id to determine the right time offset. # Use the current msg_id to determine the right time offset.
to = self._state.update_time_offset( to = self._state.update_time_offset(
correct_msg_id=message.msg_id) correct_msg_id=message.msg_id)
__log__.info('System clock is wrong, set time offset to %ds', to) self._log.info('System clock is wrong, set time offset to %ds', to)
elif bad_msg.error_code == 32: elif bad_msg.error_code == 32:
# msg_seqno too low, so just pump it up by some "large" amount # msg_seqno too low, so just pump it up by some "large" amount
# TODO A better fix would be to start with a new fresh session ID # TODO A better fix would be to start with a new fresh session ID
@ -618,7 +618,8 @@ class MTProtoSender:
# Messages are to be re-sent once we've corrected the issue # Messages are to be re-sent once we've corrected the issue
self._send_queue.extend(states) self._send_queue.extend(states)
__log__.debug('%d messages will be resent due to bad msg', len(states)) self._log.debug('%d messages will be resent due to bad msg',
len(states))
async def _handle_detailed_info(self, message): async def _handle_detailed_info(self, message):
""" """
@ -629,7 +630,7 @@ class MTProtoSender:
""" """
# TODO https://goo.gl/VvpCC6 # TODO https://goo.gl/VvpCC6
msg_id = message.obj.answer_msg_id msg_id = message.obj.answer_msg_id
__log__.debug('Handling detailed info for message %d', msg_id) self._log.debug('Handling detailed info for message %d', msg_id)
self._pending_ack.add(msg_id) self._pending_ack.add(msg_id)
async def _handle_new_detailed_info(self, message): async def _handle_new_detailed_info(self, message):
@ -641,7 +642,7 @@ class MTProtoSender:
""" """
# TODO https://goo.gl/G7DPsR # TODO https://goo.gl/G7DPsR
msg_id = message.obj.answer_msg_id msg_id = message.obj.answer_msg_id
__log__.debug('Handling new detailed info for message %d', msg_id) self._log.debug('Handling new detailed info for message %d', msg_id)
self._pending_ack.add(msg_id) self._pending_ack.add(msg_id)
async def _handle_new_session_created(self, message): async def _handle_new_session_created(self, message):
@ -652,7 +653,7 @@ class MTProtoSender:
server_salt:long = NewSession; server_salt:long = NewSession;
""" """
# TODO https://goo.gl/LMyN7A # TODO https://goo.gl/LMyN7A
__log__.debug('Handling new session created') self._log.debug('Handling new session created')
self._state.salt = message.obj.server_salt self._state.salt = message.obj.server_salt
async def _handle_ack(self, message): async def _handle_ack(self, message):
@ -671,7 +672,7 @@ class MTProtoSender:
messages are acknowledged. messages are acknowledged.
""" """
ack = message.obj ack = message.obj
__log__.debug('Handling acknowledge for %s', str(ack.msg_ids)) self._log.debug('Handling acknowledge for %s', str(ack.msg_ids))
for msg_id in ack.msg_ids: for msg_id in ack.msg_ids:
state = self._pending_state.get(msg_id) state = self._pending_state.get(msg_id)
if state and isinstance(state.request, LogOutRequest): if state and isinstance(state.request, LogOutRequest):
@ -688,7 +689,7 @@ class MTProtoSender:
""" """
# TODO save these salts and automatically adjust to the # TODO save these salts and automatically adjust to the
# correct one whenever the salt in use expires. # correct one whenever the salt in use expires.
__log__.debug('Handling future salts for message %d', message.msg_id) self._log.debug('Handling future salts for message %d', message.msg_id)
state = self._pending_state.pop(message.msg_id, None) state = self._pending_state.pop(message.msg_id, None)
if state: if state:
state.future.set_result(message.obj) state.future.set_result(message.obj)

View File

@ -1,4 +1,3 @@
import logging
import os import os
import struct import struct
import time import time
@ -11,8 +10,6 @@ from ..tl.core import TLMessage
from ..tl.functions import InvokeAfterMsgRequest from ..tl.functions import InvokeAfterMsgRequest
from ..tl.core.gzippacked import GzipPacked from ..tl.core.gzippacked import GzipPacked
__log__ = logging.getLogger(__name__)
class MTProtoState: class MTProtoState:
""" """
@ -37,8 +34,9 @@ class MTProtoState:
many methods that would be needed to make it convenient to use for the many methods that would be needed to make it convenient to use for the
authentication process, at which point the `MTProtoPlainSender` is better. authentication process, at which point the `MTProtoPlainSender` is better.
""" """
def __init__(self, auth_key): def __init__(self, auth_key, loggers):
self.auth_key = auth_key self.auth_key = auth_key
self._log = loggers[__name__]
self.time_offset = 0 self.time_offset = 0
self.salt = 0 self.salt = 0
self.reset() self.reset()
@ -183,7 +181,7 @@ class MTProtoState:
if self.time_offset != old: if self.time_offset != old:
self._last_msg_id = 0 self._last_msg_id = 0
__log__.debug( self._log.debug(
'Updated time offset (old offset %d, bad %d, good %d, new %d)', 'Updated time offset (old offset %d, bad %d, good %d, new %d)',
old, bad, correct_msg_id, self.time_offset old, bad, correct_msg_id, self.time_offset
) )

View File

@ -1,11 +1,6 @@
import logging
import struct
from .tlmessage import TLMessage from .tlmessage import TLMessage
from ..tlobject import TLObject from ..tlobject import TLObject
__log__ = logging.getLogger(__name__)
class MessageContainer(TLObject): class MessageContainer(TLObject):
CONSTRUCTOR_ID = 0x73f1f8dc CONSTRUCTOR_ID = 0x73f1f8dc

View File

@ -1,9 +1,5 @@
import logging
from .. import TLObject from .. import TLObject
__log__ = logging.getLogger(__name__)
class TLMessage(TLObject): class TLMessage(TLObject):
""" """