mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-10 19:46:36 +03:00
Make logger fully configurable (#1087)
This commit is contained in:
parent
eda4178333
commit
f271316d7d
|
@ -1,12 +1,9 @@
|
|||
import logging
|
||||
from .client.telegramclient import TelegramClient
|
||||
from .network import connection
|
||||
from .tl import types, functions, custom
|
||||
from . import version, events, utils, errors
|
||||
|
||||
|
||||
__version__ = version.__version__
|
||||
logging.getLogger(__name__).addHandler(logging.NullHandler())
|
||||
|
||||
__all__ = ['TelegramClient', 'types', 'functions', 'custom',
|
||||
'events', 'utils', 'errors']
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import datetime
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
|
||||
|
@ -14,9 +13,6 @@ except ImportError:
|
|||
aiohttp = None
|
||||
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DownloadMethods(UserMethods):
|
||||
|
||||
# region Public methods
|
||||
|
@ -239,7 +235,8 @@ class DownloadMethods(UserMethods):
|
|||
# The used sender will also change if ``FileMigrateError`` occurs
|
||||
sender = self._sender
|
||||
|
||||
__log__.info('Downloading file in chunks of %d bytes', part_size)
|
||||
self._log[__name__].info('Downloading file in chunks of %d bytes',
|
||||
part_size)
|
||||
try:
|
||||
offset = 0
|
||||
while True:
|
||||
|
@ -251,7 +248,7 @@ class DownloadMethods(UserMethods):
|
|||
# TODO Implement
|
||||
raise NotImplementedError
|
||||
except errors.FileMigrateError as e:
|
||||
__log__.info('File lives in another DC')
|
||||
self._log[__name__].info('File lives in another DC')
|
||||
sender = await self._borrow_exported_sender(e.new_dc)
|
||||
exported = True
|
||||
continue
|
||||
|
@ -264,7 +261,8 @@ class DownloadMethods(UserMethods):
|
|||
else:
|
||||
return getattr(result, 'type', '')
|
||||
|
||||
__log__.debug('Saving %d more bytes', len(result.bytes))
|
||||
self._log[__name__].debug('Saving %d more bytes',
|
||||
len(result.bytes))
|
||||
f.write(result.bytes)
|
||||
if progress_callback:
|
||||
progress_callback(f.tell(), file_size)
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import asyncio
|
||||
import itertools
|
||||
import logging
|
||||
import time
|
||||
|
||||
from async_generator import async_generator, yield_
|
||||
|
@ -11,8 +10,6 @@ from .buttons import ButtonMethods
|
|||
from .. import helpers, utils, errors
|
||||
from ..tl import types, functions
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ import sys
|
|||
import time
|
||||
from datetime import datetime
|
||||
|
||||
from .. import version
|
||||
from .. import version, __name__ as __base_name__
|
||||
from ..crypto import rsa
|
||||
from ..extensions import markdown
|
||||
from ..network import MTProtoSender, ConnectionTcpFull
|
||||
|
@ -19,7 +19,8 @@ DEFAULT_IPV4_IP = '149.154.167.51'
|
|||
DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]'
|
||||
DEFAULT_PORT = 443
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
__default_log__ = logging.getLogger(__base_name__)
|
||||
__default_log__.addHandler(logging.NullHandler())
|
||||
|
||||
|
||||
class TelegramBaseClient(abc.ABC):
|
||||
|
@ -133,6 +134,15 @@ class TelegramBaseClient(abc.ABC):
|
|||
system_lang_code (`str`, optional):
|
||||
"System lang code" to be sent when creating the initial connection.
|
||||
Defaults to `lang_code`.
|
||||
|
||||
loop (`asyncio.AbstractEventLoop`, optional):
|
||||
Asyncio event loop to use. Defaults to `asyncio.get_event_loop()`
|
||||
|
||||
base_logger (`str` | `logging.Logger`, optional):
|
||||
Base logger name or instance to use.
|
||||
If a `str` is given, it'll be passed to `logging.getLogger()`. If a
|
||||
`logging.Logger` is given, it'll be used directly. If something
|
||||
else or nothing is given, the default logger will be used.
|
||||
"""
|
||||
|
||||
# Current TelegramClient version
|
||||
|
@ -161,7 +171,8 @@ class TelegramBaseClient(abc.ABC):
|
|||
app_version=None,
|
||||
lang_code='en',
|
||||
system_lang_code='en',
|
||||
loop=None):
|
||||
loop=None,
|
||||
base_logger=None):
|
||||
if not api_id or not api_hash:
|
||||
raise ValueError(
|
||||
"Your API ID or Hash cannot be empty or None. "
|
||||
|
@ -170,6 +181,19 @@ class TelegramBaseClient(abc.ABC):
|
|||
self._use_ipv6 = use_ipv6
|
||||
self._loop = loop or asyncio.get_event_loop()
|
||||
|
||||
if isinstance(base_logger, str):
|
||||
base_logger = logging.getLogger(base_logger)
|
||||
elif not isinstance(base_logger, logging.Logger):
|
||||
base_logger = __default_log__
|
||||
|
||||
class _Loggers(dict):
|
||||
def __missing__(self, key):
|
||||
if key.startswith("telethon."):
|
||||
key = key[len("telethon."):]
|
||||
return base_logger.getChild(key)
|
||||
|
||||
self._log = _Loggers()
|
||||
|
||||
# Determine what session object we have
|
||||
if isinstance(session, str) or session is None:
|
||||
try:
|
||||
|
@ -240,6 +264,7 @@ class TelegramBaseClient(abc.ABC):
|
|||
self._connection = connection
|
||||
self._sender = MTProtoSender(
|
||||
self.session.auth_key, self._loop,
|
||||
loggers=self._log,
|
||||
retries=self._connection_retries,
|
||||
delay=self._retry_delay,
|
||||
auto_reconnect=self._auto_reconnect,
|
||||
|
@ -317,7 +342,8 @@ class TelegramBaseClient(abc.ABC):
|
|||
"""
|
||||
await self._sender.connect(self._connection(
|
||||
self.session.server_address, self.session.port,
|
||||
loop=self._loop, proxy=self._proxy
|
||||
loop=self._loop, loggers=self._log,
|
||||
proxy=self._proxy
|
||||
))
|
||||
self.session.auth_key = self._sender.auth_key
|
||||
self.session.save()
|
||||
|
@ -387,7 +413,7 @@ class TelegramBaseClient(abc.ABC):
|
|||
"""
|
||||
Permanently switches the current connection to the new data center.
|
||||
"""
|
||||
__log__.info('Reconnecting to new data center %s', new_dc)
|
||||
self._log[__name__].info('Reconnecting to new data center %s', new_dc)
|
||||
dc = await self._get_dc(new_dc)
|
||||
|
||||
self.session.set_dc(dc.id, dc.ip_address, dc.port)
|
||||
|
@ -443,7 +469,8 @@ class TelegramBaseClient(abc.ABC):
|
|||
sender = MTProtoSender(None, self._loop)
|
||||
await sender.connect(self._connection(
|
||||
dc.ip_address, dc.port, loop=self._loop, proxy=self._proxy))
|
||||
__log__.info('Exporting authorization for data center %s', dc)
|
||||
self._log[__name__].info('Exporting authorization for data center %s',
|
||||
dc)
|
||||
auth = await self(functions.auth.ExportAuthorizationRequest(dc_id))
|
||||
req = self._init_with(functions.auth.ImportAuthorizationRequest(
|
||||
id=auth.id, bytes=auth.bytes
|
||||
|
@ -486,7 +513,8 @@ class TelegramBaseClient(abc.ABC):
|
|||
n -= 1
|
||||
self._borrowed_senders[dc_id] = (n, sender)
|
||||
if not n:
|
||||
__log__.info('Disconnecting borrowed sender for DC %d', dc_id)
|
||||
self._log[__name__].info(
|
||||
'Disconnecting borrowed sender for DC %d', dc_id)
|
||||
sender.disconnect()
|
||||
|
||||
async def _get_cdn_client(self, cdn_redirect):
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
import asyncio
|
||||
import inspect
|
||||
import itertools
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
|
||||
|
@ -9,8 +7,6 @@ from .users import UserMethods
|
|||
from .. import events, utils, errors
|
||||
from ..tl import types, functions
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UpdateMethods(UserMethods):
|
||||
|
||||
|
@ -281,28 +277,31 @@ class UpdateMethods(UserMethods):
|
|||
await callback(event)
|
||||
except errors.AlreadyInConversationError:
|
||||
name = getattr(callback, '__name__', repr(callback))
|
||||
__log__.debug('Event handler "%s" already has an open '
|
||||
'conversation, ignoring new one', name)
|
||||
self._log[__name__].debug(
|
||||
'Event handler "%s" already has an open conversation, '
|
||||
'ignoring new one', name)
|
||||
except events.StopPropagation:
|
||||
name = getattr(callback, '__name__', repr(callback))
|
||||
__log__.debug(
|
||||
self._log[__name__].debug(
|
||||
'Event handler "%s" stopped chain of propagation '
|
||||
'for event %s.', name, type(event).__name__
|
||||
)
|
||||
break
|
||||
except Exception:
|
||||
name = getattr(callback, '__name__', repr(callback))
|
||||
__log__.exception('Unhandled exception on %s', name)
|
||||
self._log[__name__].exception('Unhandled exception on %s',
|
||||
name)
|
||||
|
||||
async def _handle_auto_reconnect(self):
|
||||
# Upon reconnection, we want to send getState
|
||||
# for Telegram to keep sending us updates.
|
||||
try:
|
||||
__log__.info('Asking for the current state after reconnect...')
|
||||
self._log[__name__].info(
|
||||
'Asking for the current state after reconnect...')
|
||||
state = await self(functions.updates.GetStateRequest())
|
||||
__log__.info('Got new state! %s', state)
|
||||
self._log[__name__].info('Got new state! %s', state)
|
||||
except errors.RPCError as e:
|
||||
__log__.info('Failed to get current state: %r', e)
|
||||
self._log[__name__].info('Failed to get current state: %r', e)
|
||||
|
||||
# endregion
|
||||
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import hashlib
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
|
@ -12,8 +11,6 @@ from .users import UserMethods
|
|||
from .. import utils, helpers
|
||||
from ..tl import types, functions, custom
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _CacheType:
|
||||
"""Like functools.partial but pretends to be the wrapped class."""
|
||||
|
@ -196,8 +193,8 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
|
|||
return msg
|
||||
|
||||
async def _send_album(self, entity, files, caption='',
|
||||
progress_callback=None, reply_to=None,
|
||||
parse_mode=(), silent=None):
|
||||
progress_callback=None, reply_to=None,
|
||||
parse_mode=(), silent=None):
|
||||
"""Specialized version of .send_file for albums"""
|
||||
# We don't care if the user wants to avoid cache, we will use it
|
||||
# anyway. Why? The cached version will be exactly the same thing
|
||||
|
@ -350,8 +347,8 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
|
|||
return cached
|
||||
|
||||
part_count = (file_size + part_size - 1) // part_size
|
||||
__log__.info('Uploading file of %d bytes in %d chunks of %d',
|
||||
file_size, part_count, part_size)
|
||||
self._log[__name__].info('Uploading file of %d bytes in %d chunks of %d',
|
||||
file_size, part_count, part_size)
|
||||
|
||||
with open(file, 'rb') if isinstance(file, str) else BytesIO(file)\
|
||||
as stream:
|
||||
|
@ -370,8 +367,8 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
|
|||
|
||||
result = await self(request)
|
||||
if result:
|
||||
__log__.debug('Uploaded %d/%d', part_index + 1,
|
||||
part_count)
|
||||
self._log[__name__].debug('Uploaded %d/%d',
|
||||
part_index + 1, part_count)
|
||||
if progress_callback:
|
||||
progress_callback(stream.tell(), file_size)
|
||||
else:
|
||||
|
@ -464,7 +461,7 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
|
|||
return file_handle, media
|
||||
|
||||
async def _cache_media(self, msg, file, file_handle,
|
||||
force_document=False):
|
||||
force_document=False):
|
||||
if file and msg and isinstance(file_handle,
|
||||
custom.InputSizedFile):
|
||||
# There was a response message and we didn't use cached
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import asyncio
|
||||
import itertools
|
||||
import logging
|
||||
import time
|
||||
|
||||
from .telegrambaseclient import TelegramBaseClient
|
||||
|
@ -8,7 +7,6 @@ from .. import errors, utils
|
|||
from ..errors import MultiError, RPCError
|
||||
from ..tl import TLObject, TLRequest, types, functions
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
_NOT_A_REQUEST = TypeError('You can only invoke requests, not types!')
|
||||
|
||||
|
||||
|
@ -27,7 +25,8 @@ class UserMethods(TelegramBaseClient):
|
|||
if diff <= 3: # Flood waits below 3 seconds are "ignored"
|
||||
self._flood_waited_requests.pop(r.CONSTRUCTOR_ID, None)
|
||||
elif diff <= self.flood_sleep_threshold:
|
||||
__log__.info('Sleeping early for %ds on flood wait', diff)
|
||||
self._log[__name__].info(
|
||||
'Sleeping early for %ds on flood wait', diff)
|
||||
await asyncio.sleep(diff, loop=self._loop)
|
||||
self._flood_waited_requests.pop(r.CONSTRUCTOR_ID, None)
|
||||
else:
|
||||
|
@ -61,8 +60,9 @@ class UserMethods(TelegramBaseClient):
|
|||
self.session.process_entities(result)
|
||||
return result
|
||||
except (errors.ServerError, errors.RpcCallFailError) as e:
|
||||
__log__.warning('Telegram is having internal issues %s: %s',
|
||||
e.__class__.__name__, e)
|
||||
self._log[__name__].warning(
|
||||
'Telegram is having internal issues %s: %s',
|
||||
e.__class__.__name__, e)
|
||||
except (errors.FloodWaitError, errors.FloodTestPhoneWaitError) as e:
|
||||
if utils.is_list_like(request):
|
||||
request = request[request_index]
|
||||
|
@ -71,15 +71,16 @@ class UserMethods(TelegramBaseClient):
|
|||
[request.CONSTRUCTOR_ID] = time.time() + e.seconds
|
||||
|
||||
if e.seconds <= self.flood_sleep_threshold:
|
||||
__log__.info('Sleeping for %ds on flood wait', e.seconds)
|
||||
self._log[__name__].info('Sleeping for %ds on flood wait',
|
||||
e.seconds)
|
||||
await asyncio.sleep(e.seconds, loop=self._loop)
|
||||
else:
|
||||
raise
|
||||
except (errors.PhoneMigrateError, errors.NetworkMigrateError,
|
||||
errors.UserMigrateError) as e:
|
||||
__log__.info('Phone migrated to %d', e.new_dc)
|
||||
self._log[__name__].info('Phone migrated to %d', e.new_dc)
|
||||
should_raise = isinstance(e, (
|
||||
errors.PhoneMigrateError, errors.NetworkMigrateError
|
||||
errors.PhoneMigrateError, errors.NetworkMigrateError
|
||||
))
|
||||
if should_raise and await self.is_user_authorized():
|
||||
raise
|
||||
|
|
|
@ -1,15 +1,12 @@
|
|||
import asyncio
|
||||
import collections
|
||||
import io
|
||||
import logging
|
||||
import struct
|
||||
|
||||
from ..tl import TLRequest
|
||||
from ..tl.core.messagecontainer import MessageContainer
|
||||
from ..tl.core.tlmessage import TLMessage
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MessagePacker:
|
||||
"""
|
||||
|
@ -24,11 +21,13 @@ class MessagePacker:
|
|||
encryption and network overhead also is smaller. It's also a central
|
||||
point where outgoing requests are put, and where ready-messages are get.
|
||||
"""
|
||||
def __init__(self, state, loop):
|
||||
|
||||
def __init__(self, state, loop, loggers):
|
||||
self._state = state
|
||||
self._loop = loop
|
||||
self._deque = collections.deque()
|
||||
self._ready = asyncio.Event(loop=loop)
|
||||
self._log = loggers[__name__]
|
||||
|
||||
def append(self, state):
|
||||
self._deque.append(state)
|
||||
|
@ -65,9 +64,9 @@ class MessagePacker:
|
|||
after_id=state.after.msg_id if state.after else None
|
||||
)
|
||||
batch.append(state)
|
||||
__log__.debug('Assigned msg_id = %d to %s (%x)',
|
||||
state.msg_id, state.request.__class__.__name__,
|
||||
id(state.request))
|
||||
self._log.debug('Assigned msg_id = %d to %s (%x)',
|
||||
state.msg_id, state.request.__class__.__name__,
|
||||
id(state.request))
|
||||
continue
|
||||
|
||||
# Put the item back since it can't be sent in this batch
|
||||
|
|
|
@ -1,13 +1,10 @@
|
|||
import abc
|
||||
import asyncio
|
||||
import logging
|
||||
import socket
|
||||
import ssl as ssl_mod
|
||||
|
||||
from ...errors import InvalidChecksumError
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Connection(abc.ABC):
|
||||
"""
|
||||
|
@ -21,10 +18,11 @@ class Connection(abc.ABC):
|
|||
``ConnectionError``, which will raise when attempting to send if
|
||||
the client is disconnected (includes remote disconnections).
|
||||
"""
|
||||
def __init__(self, ip, port, *, loop, proxy=None):
|
||||
def __init__(self, ip, port, *, loop, loggers, proxy=None):
|
||||
self._ip = ip
|
||||
self._port = port
|
||||
self._loop = loop
|
||||
self._log = loggers[__name__]
|
||||
self._proxy = proxy
|
||||
self._reader = None
|
||||
self._writer = None
|
||||
|
@ -156,13 +154,13 @@ class Connection(abc.ABC):
|
|||
except Exception as e:
|
||||
if isinstance(e, (ConnectionError, asyncio.IncompleteReadError)):
|
||||
msg = 'The server closed the connection'
|
||||
__log__.info(msg)
|
||||
self._log.info(msg)
|
||||
elif isinstance(e, InvalidChecksumError):
|
||||
msg = 'The server response had an invalid checksum'
|
||||
__log__.info(msg)
|
||||
self._log.info(msg)
|
||||
else:
|
||||
msg = 'Unexpected exception in the receive loop'
|
||||
__log__.exception(msg)
|
||||
self._log.exception(msg)
|
||||
|
||||
self.disconnect()
|
||||
|
||||
|
|
|
@ -10,8 +10,8 @@ class ConnectionTcpFull(Connection):
|
|||
Default Telegram mode. Sends 12 additional bytes and
|
||||
needs to calculate the CRC value of the packet itself.
|
||||
"""
|
||||
def __init__(self, ip, port, *, loop, proxy=None):
|
||||
super().__init__(ip, port, loop=loop, proxy=proxy)
|
||||
def __init__(self, ip, port, *, loop, loggers, proxy=None):
|
||||
super().__init__(ip, port, loop=loop, loggers=loggers, proxy=proxy)
|
||||
self._send_counter = 0
|
||||
|
||||
async def connect(self, timeout=None, ssl=None):
|
||||
|
|
|
@ -10,8 +10,8 @@ class ConnectionTcpObfuscated(ConnectionTcpAbridged):
|
|||
every message with a randomly generated key using the
|
||||
AES-CTR mode so the packets are harder to discern.
|
||||
"""
|
||||
def __init__(self, ip, port, *, loop, proxy=None):
|
||||
super().__init__(ip, port, loop=loop, proxy=proxy)
|
||||
def __init__(self, ip, port, *, loop, loggers, proxy=None):
|
||||
super().__init__(ip, port, loop=loop, loggers=loggers, proxy=proxy)
|
||||
self._aes_encrypt = None
|
||||
self._aes_decrypt = None
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
import asyncio
|
||||
import collections
|
||||
import functools
|
||||
import logging
|
||||
|
||||
from . import authenticator
|
||||
from ..extensions.messagepacker import MessagePacker
|
||||
|
@ -24,8 +23,6 @@ from ..tl.types import (
|
|||
)
|
||||
from ..crypto import AuthKey
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _cancellable(func):
|
||||
"""
|
||||
|
@ -59,12 +56,14 @@ class MTProtoSender:
|
|||
A new authorization key will be generated on connection if no other
|
||||
key exists yet.
|
||||
"""
|
||||
def __init__(self, auth_key, loop, *,
|
||||
def __init__(self, auth_key, loop, *, loggers,
|
||||
retries=5, delay=1, auto_reconnect=True, connect_timeout=None,
|
||||
auth_key_callback=None,
|
||||
update_callback=None, auto_reconnect_callback=None):
|
||||
self._connection = None
|
||||
self._loop = loop
|
||||
self._loggers = loggers
|
||||
self._log = loggers[__name__]
|
||||
self._retries = retries
|
||||
self._delay = delay
|
||||
self._auto_reconnect = auto_reconnect
|
||||
|
@ -90,11 +89,12 @@ class MTProtoSender:
|
|||
|
||||
# Preserving the references of the AuthKey and state is important
|
||||
self.auth_key = auth_key or AuthKey(None)
|
||||
self._state = MTProtoState(self.auth_key)
|
||||
self._state = MTProtoState(self.auth_key, loggers=self._loggers)
|
||||
|
||||
# Outgoing messages are put in a queue and sent in a batch.
|
||||
# Note that here we're also storing their ``_RequestState``.
|
||||
self._send_queue = MessagePacker(self._state, self._loop)
|
||||
self._send_queue = MessagePacker(self._state, self._loop,
|
||||
loggers=self._loggers)
|
||||
|
||||
# Sent states are remembered until a response is received.
|
||||
self._pending_state = {}
|
||||
|
@ -132,7 +132,7 @@ class MTProtoSender:
|
|||
Connects to the specified given connection using the given auth key.
|
||||
"""
|
||||
if self._user_connected:
|
||||
__log__.info('User is already connected!')
|
||||
self._log.info('User is already connected!')
|
||||
return
|
||||
|
||||
self._connection = connection
|
||||
|
@ -210,13 +210,13 @@ class MTProtoSender:
|
|||
authorization key if necessary, and starting the send and
|
||||
receive loops.
|
||||
"""
|
||||
__log__.info('Connecting to %s...', self._connection)
|
||||
self._log.info('Connecting to %s...', self._connection)
|
||||
for retry in range(1, self._retries + 1):
|
||||
try:
|
||||
__log__.debug('Connection attempt {}...'.format(retry))
|
||||
self._log.debug('Connection attempt {}...'.format(retry))
|
||||
await self._connection.connect(timeout=self._connect_timeout)
|
||||
except (ConnectionError, asyncio.TimeoutError) as e:
|
||||
__log__.warning('Attempt {} at connecting failed: {}: {}'
|
||||
self._log.warning('Attempt {} at connecting failed: {}: {}'
|
||||
.format(retry, type(e).__name__, e))
|
||||
await asyncio.sleep(self._delay)
|
||||
else:
|
||||
|
@ -225,12 +225,12 @@ class MTProtoSender:
|
|||
raise ConnectionError('Connection to Telegram failed {} times'
|
||||
.format(self._retries))
|
||||
|
||||
__log__.debug('Connection success!')
|
||||
self._log.debug('Connection success!')
|
||||
if not self.auth_key:
|
||||
plain = MTProtoPlainSender(self._connection)
|
||||
for retry in range(1, self._retries + 1):
|
||||
try:
|
||||
__log__.debug('New auth_key attempt {}...'.format(retry))
|
||||
self._log.debug('New auth_key attempt {}...'.format(retry))
|
||||
self.auth_key.key, self._state.time_offset =\
|
||||
await authenticator.do_authentication(plain)
|
||||
|
||||
|
@ -243,7 +243,7 @@ class MTProtoSender:
|
|||
|
||||
break
|
||||
except (SecurityError, AssertionError) as e:
|
||||
__log__.warning('Attempt {} at new auth_key failed: {}'
|
||||
self._log.warning('Attempt {} at new auth_key failed: {}'
|
||||
.format(retry, e))
|
||||
await asyncio.sleep(self._delay)
|
||||
else:
|
||||
|
@ -252,10 +252,10 @@ class MTProtoSender:
|
|||
self._disconnect(error=e)
|
||||
raise e
|
||||
|
||||
__log__.debug('Starting send loop')
|
||||
self._log.debug('Starting send loop')
|
||||
self._send_loop_handle = self._loop.create_task(self._send_loop())
|
||||
|
||||
__log__.debug('Starting receive loop')
|
||||
self._log.debug('Starting receive loop')
|
||||
self._recv_loop_handle = self._loop.create_task(self._recv_loop())
|
||||
|
||||
# _disconnected only completes after manual disconnection
|
||||
|
@ -264,16 +264,16 @@ class MTProtoSender:
|
|||
if self._disconnected.done():
|
||||
self._disconnected = self._loop.create_future()
|
||||
|
||||
__log__.info('Connection to %s complete!', self._connection)
|
||||
self._log.info('Connection to %s complete!', self._connection)
|
||||
|
||||
def _disconnect(self, error=None):
|
||||
__log__.info('Disconnecting from %s...', self._connection)
|
||||
self._log.info('Disconnecting from %s...', self._connection)
|
||||
self._user_connected = False
|
||||
try:
|
||||
__log__.debug('Closing current connection...')
|
||||
self._log.debug('Closing current connection...')
|
||||
self._connection.disconnect()
|
||||
finally:
|
||||
__log__.debug('Cancelling {} pending message(s)...'
|
||||
self._log.debug('Cancelling {} pending message(s)...'
|
||||
.format(len(self._pending_state)))
|
||||
for state in self._pending_state.values():
|
||||
if error and not state.future.done():
|
||||
|
@ -286,14 +286,14 @@ class MTProtoSender:
|
|||
self._last_ack = None
|
||||
|
||||
if self._send_loop_handle:
|
||||
__log__.debug('Cancelling the send loop...')
|
||||
self._log.debug('Cancelling the send loop...')
|
||||
self._send_loop_handle.cancel()
|
||||
|
||||
if self._recv_loop_handle:
|
||||
__log__.debug('Cancelling the receive loop...')
|
||||
self._log.debug('Cancelling the receive loop...')
|
||||
self._recv_loop_handle.cancel()
|
||||
|
||||
__log__.info('Disconnection from %s complete!', self._connection)
|
||||
self._log.info('Disconnection from %s complete!', self._connection)
|
||||
if self._disconnected and not self._disconnected.done():
|
||||
if error:
|
||||
self._disconnected.set_exception(error)
|
||||
|
@ -306,13 +306,13 @@ class MTProtoSender:
|
|||
"""
|
||||
self._reconnecting = True
|
||||
|
||||
__log__.debug('Closing current connection...')
|
||||
self._log.debug('Closing current connection...')
|
||||
self._connection.disconnect()
|
||||
|
||||
__log__.debug('Cancelling the send loop...')
|
||||
self._log.debug('Cancelling the send loop...')
|
||||
self._send_loop_handle.cancel()
|
||||
|
||||
__log__.debug('Cancelling the receive loop...')
|
||||
self._log.debug('Cancelling the receive loop...')
|
||||
self._recv_loop_handle.cancel()
|
||||
|
||||
self._reconnecting = False
|
||||
|
@ -325,12 +325,12 @@ class MTProtoSender:
|
|||
try:
|
||||
await self._connect()
|
||||
except (ConnectionError, asyncio.TimeoutError) as e:
|
||||
__log__.info('Failed reconnection retry %d/%d with %s',
|
||||
self._log.info('Failed reconnection retry %d/%d with %s',
|
||||
retry, retries, e.__class__.__name__)
|
||||
|
||||
await asyncio.sleep(self._delay)
|
||||
except Exception:
|
||||
__log__.exception('Unexpected exception reconnecting on '
|
||||
self._log.exception('Unexpected exception reconnecting on '
|
||||
'retry %d/%d', retry, retries)
|
||||
|
||||
await asyncio.sleep(self._delay)
|
||||
|
@ -343,7 +343,7 @@ class MTProtoSender:
|
|||
|
||||
break
|
||||
else:
|
||||
__log__.error('Failed to reconnect automatically.')
|
||||
self._log.error('Failed to reconnect automatically.')
|
||||
self._disconnect(error=ConnectionError())
|
||||
|
||||
def _start_reconnect(self):
|
||||
|
@ -368,7 +368,7 @@ class MTProtoSender:
|
|||
self._last_acks.append(ack)
|
||||
self._pending_ack.clear()
|
||||
|
||||
__log__.debug('Waiting for messages to send...')
|
||||
self._log.debug('Waiting for messages to send...')
|
||||
# TODO Wait for the connection send queue to be empty?
|
||||
# This means that while it's not empty we can wait for
|
||||
# more messages to be added to the send queue.
|
||||
|
@ -377,14 +377,14 @@ class MTProtoSender:
|
|||
if not data:
|
||||
continue
|
||||
|
||||
__log__.debug('Encrypting %d message(s) in %d bytes for sending',
|
||||
self._log.debug('Encrypting %d message(s) in %d bytes for sending',
|
||||
len(batch), len(data))
|
||||
|
||||
data = self._state.encrypt_message_data(data)
|
||||
try:
|
||||
await self._connection.send(data)
|
||||
except ConnectionError:
|
||||
__log__.info('Connection closed while sending data')
|
||||
self._log.info('Connection closed while sending data')
|
||||
self._start_reconnect()
|
||||
return
|
||||
|
||||
|
@ -397,7 +397,7 @@ class MTProtoSender:
|
|||
if isinstance(s.request, TLRequest):
|
||||
self._pending_state[s.msg_id] = s
|
||||
|
||||
__log__.debug('Encrypted messages put in a queue to be sent')
|
||||
self._log.debug('Encrypted messages put in a queue to be sent')
|
||||
|
||||
@_cancellable
|
||||
async def _recv_loop(self):
|
||||
|
@ -408,11 +408,11 @@ class MTProtoSender:
|
|||
Besides `connect`, only this method ever receives data.
|
||||
"""
|
||||
while self._user_connected and not self._reconnecting:
|
||||
__log__.debug('Receiving items from the network...')
|
||||
self._log.debug('Receiving items from the network...')
|
||||
try:
|
||||
body = await self._connection.recv()
|
||||
except ConnectionError:
|
||||
__log__.info('Connection closed while receiving data')
|
||||
self._log.info('Connection closed while receiving data')
|
||||
self._start_reconnect()
|
||||
return
|
||||
|
||||
|
@ -420,20 +420,20 @@ class MTProtoSender:
|
|||
message = self._state.decrypt_message_data(body)
|
||||
except TypeNotFoundError as e:
|
||||
# Received object which we don't know how to deserialize
|
||||
__log__.info('Type %08x not found, remaining data %r',
|
||||
self._log.info('Type %08x not found, remaining data %r',
|
||||
e.invalid_constructor_id, e.remaining)
|
||||
continue
|
||||
except SecurityError as e:
|
||||
# A step while decoding had the incorrect data. This message
|
||||
# should not be considered safe and it should be ignored.
|
||||
__log__.warning('Security error while unpacking a '
|
||||
self._log.warning('Security error while unpacking a '
|
||||
'received message: %s', e)
|
||||
continue
|
||||
except BufferError as e:
|
||||
if isinstance(e, InvalidBufferError) and e.code == 404:
|
||||
__log__.info('Broken authorization key; resetting')
|
||||
self._log.info('Broken authorization key; resetting')
|
||||
else:
|
||||
__log__.warning('Invalid buffer %s', e)
|
||||
self._log.warning('Invalid buffer %s', e)
|
||||
|
||||
self.auth_key.key = None
|
||||
if self._auth_key_callback:
|
||||
|
@ -442,14 +442,14 @@ class MTProtoSender:
|
|||
self._start_reconnect()
|
||||
return
|
||||
except Exception:
|
||||
__log__.exception('Unhandled error while receiving data')
|
||||
self._log.exception('Unhandled error while receiving data')
|
||||
self._start_reconnect()
|
||||
return
|
||||
|
||||
try:
|
||||
await self._process_message(message)
|
||||
except Exception:
|
||||
__log__.exception('Unhandled error while processing msgs')
|
||||
self._log.exception('Unhandled error while processing msgs')
|
||||
|
||||
# Response Handlers
|
||||
|
||||
|
@ -498,7 +498,7 @@ class MTProtoSender:
|
|||
"""
|
||||
rpc_result = message.obj
|
||||
state = self._pending_state.pop(rpc_result.req_msg_id, None)
|
||||
__log__.debug('Handling RPC result for message %d',
|
||||
self._log.debug('Handling RPC result for message %d',
|
||||
rpc_result.req_msg_id)
|
||||
|
||||
if not state:
|
||||
|
@ -511,7 +511,7 @@ class MTProtoSender:
|
|||
if not isinstance(reader.tgread_object(), upload.File):
|
||||
raise ValueError('Not an upload.File')
|
||||
except (TypeNotFoundError, ValueError):
|
||||
__log__.info('Received response without parent request: {}'
|
||||
self._log.info('Received response without parent request: {}'
|
||||
.format(rpc_result.body))
|
||||
return
|
||||
|
||||
|
@ -535,7 +535,7 @@ class MTProtoSender:
|
|||
|
||||
msg_container#73f1f8dc messages:vector<%Message> = MessageContainer;
|
||||
"""
|
||||
__log__.debug('Handling container')
|
||||
self._log.debug('Handling container')
|
||||
for inner_message in message.obj.messages:
|
||||
await self._process_message(inner_message)
|
||||
|
||||
|
@ -545,13 +545,13 @@ class MTProtoSender:
|
|||
|
||||
gzip_packed#3072cfa1 packed_data:bytes = Object;
|
||||
"""
|
||||
__log__.debug('Handling gzipped data')
|
||||
self._log.debug('Handling gzipped data')
|
||||
with BinaryReader(message.obj.data) as reader:
|
||||
message.obj = reader.tgread_object()
|
||||
await self._process_message(message)
|
||||
|
||||
async def _handle_update(self, message):
|
||||
__log__.debug('Handling update {}'
|
||||
self._log.debug('Handling update {}'
|
||||
.format(message.obj.__class__.__name__))
|
||||
if self._update_callback:
|
||||
await self._update_callback(message.obj)
|
||||
|
@ -564,7 +564,7 @@ class MTProtoSender:
|
|||
pong#347773c5 msg_id:long ping_id:long = Pong;
|
||||
"""
|
||||
pong = message.obj
|
||||
__log__.debug('Handling pong for message %d', pong.msg_id)
|
||||
self._log.debug('Handling pong for message %d', pong.msg_id)
|
||||
state = self._pending_state.pop(pong.msg_id, None)
|
||||
if state:
|
||||
state.future.set_result(pong)
|
||||
|
@ -578,12 +578,12 @@ class MTProtoSender:
|
|||
error_code:int new_server_salt:long = BadMsgNotification;
|
||||
"""
|
||||
bad_salt = message.obj
|
||||
__log__.debug('Handling bad salt for message %d', bad_salt.bad_msg_id)
|
||||
self._log.debug('Handling bad salt for message %d', bad_salt.bad_msg_id)
|
||||
self._state.salt = bad_salt.new_server_salt
|
||||
states = self._pop_states(bad_salt.bad_msg_id)
|
||||
self._send_queue.extend(states)
|
||||
|
||||
__log__.debug('%d message(s) will be resent', len(states))
|
||||
self._log.debug('%d message(s) will be resent', len(states))
|
||||
|
||||
async def _handle_bad_notification(self, message):
|
||||
"""
|
||||
|
@ -596,13 +596,13 @@ class MTProtoSender:
|
|||
bad_msg = message.obj
|
||||
states = self._pop_states(bad_msg.bad_msg_id)
|
||||
|
||||
__log__.debug('Handling bad msg %s', bad_msg)
|
||||
self._log.debug('Handling bad msg %s', bad_msg)
|
||||
if bad_msg.error_code in (16, 17):
|
||||
# Sent msg_id too low or too high (respectively).
|
||||
# Use the current msg_id to determine the right time offset.
|
||||
to = self._state.update_time_offset(
|
||||
correct_msg_id=message.msg_id)
|
||||
__log__.info('System clock is wrong, set time offset to %ds', to)
|
||||
self._log.info('System clock is wrong, set time offset to %ds', to)
|
||||
elif bad_msg.error_code == 32:
|
||||
# msg_seqno too low, so just pump it up by some "large" amount
|
||||
# TODO A better fix would be to start with a new fresh session ID
|
||||
|
@ -618,7 +618,8 @@ class MTProtoSender:
|
|||
|
||||
# Messages are to be re-sent once we've corrected the issue
|
||||
self._send_queue.extend(states)
|
||||
__log__.debug('%d messages will be resent due to bad msg', len(states))
|
||||
self._log.debug('%d messages will be resent due to bad msg',
|
||||
len(states))
|
||||
|
||||
async def _handle_detailed_info(self, message):
|
||||
"""
|
||||
|
@ -629,7 +630,7 @@ class MTProtoSender:
|
|||
"""
|
||||
# TODO https://goo.gl/VvpCC6
|
||||
msg_id = message.obj.answer_msg_id
|
||||
__log__.debug('Handling detailed info for message %d', msg_id)
|
||||
self._log.debug('Handling detailed info for message %d', msg_id)
|
||||
self._pending_ack.add(msg_id)
|
||||
|
||||
async def _handle_new_detailed_info(self, message):
|
||||
|
@ -641,7 +642,7 @@ class MTProtoSender:
|
|||
"""
|
||||
# TODO https://goo.gl/G7DPsR
|
||||
msg_id = message.obj.answer_msg_id
|
||||
__log__.debug('Handling new detailed info for message %d', msg_id)
|
||||
self._log.debug('Handling new detailed info for message %d', msg_id)
|
||||
self._pending_ack.add(msg_id)
|
||||
|
||||
async def _handle_new_session_created(self, message):
|
||||
|
@ -652,7 +653,7 @@ class MTProtoSender:
|
|||
server_salt:long = NewSession;
|
||||
"""
|
||||
# TODO https://goo.gl/LMyN7A
|
||||
__log__.debug('Handling new session created')
|
||||
self._log.debug('Handling new session created')
|
||||
self._state.salt = message.obj.server_salt
|
||||
|
||||
async def _handle_ack(self, message):
|
||||
|
@ -671,7 +672,7 @@ class MTProtoSender:
|
|||
messages are acknowledged.
|
||||
"""
|
||||
ack = message.obj
|
||||
__log__.debug('Handling acknowledge for %s', str(ack.msg_ids))
|
||||
self._log.debug('Handling acknowledge for %s', str(ack.msg_ids))
|
||||
for msg_id in ack.msg_ids:
|
||||
state = self._pending_state.get(msg_id)
|
||||
if state and isinstance(state.request, LogOutRequest):
|
||||
|
@ -688,7 +689,7 @@ class MTProtoSender:
|
|||
"""
|
||||
# TODO save these salts and automatically adjust to the
|
||||
# correct one whenever the salt in use expires.
|
||||
__log__.debug('Handling future salts for message %d', message.msg_id)
|
||||
self._log.debug('Handling future salts for message %d', message.msg_id)
|
||||
state = self._pending_state.pop(message.msg_id, None)
|
||||
if state:
|
||||
state.future.set_result(message.obj)
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import logging
|
||||
import os
|
||||
import struct
|
||||
import time
|
||||
|
@ -11,8 +10,6 @@ from ..tl.core import TLMessage
|
|||
from ..tl.functions import InvokeAfterMsgRequest
|
||||
from ..tl.core.gzippacked import GzipPacked
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MTProtoState:
|
||||
"""
|
||||
|
@ -37,8 +34,9 @@ class MTProtoState:
|
|||
many methods that would be needed to make it convenient to use for the
|
||||
authentication process, at which point the `MTProtoPlainSender` is better.
|
||||
"""
|
||||
def __init__(self, auth_key):
|
||||
def __init__(self, auth_key, loggers):
|
||||
self.auth_key = auth_key
|
||||
self._log = loggers[__name__]
|
||||
self.time_offset = 0
|
||||
self.salt = 0
|
||||
self.reset()
|
||||
|
@ -183,7 +181,7 @@ class MTProtoState:
|
|||
|
||||
if self.time_offset != old:
|
||||
self._last_msg_id = 0
|
||||
__log__.debug(
|
||||
self._log.debug(
|
||||
'Updated time offset (old offset %d, bad %d, good %d, new %d)',
|
||||
old, bad, correct_msg_id, self.time_offset
|
||||
)
|
||||
|
|
|
@ -1,11 +1,6 @@
|
|||
import logging
|
||||
import struct
|
||||
|
||||
from .tlmessage import TLMessage
|
||||
from ..tlobject import TLObject
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MessageContainer(TLObject):
|
||||
CONSTRUCTOR_ID = 0x73f1f8dc
|
||||
|
|
|
@ -1,9 +1,5 @@
|
|||
import logging
|
||||
|
||||
from .. import TLObject
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TLMessage(TLObject):
|
||||
"""
|
||||
|
|
Loading…
Reference in New Issue
Block a user