mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-10 19:46:36 +03:00
Make logger fully configurable (#1087)
This commit is contained in:
parent
eda4178333
commit
f271316d7d
|
@ -1,12 +1,9 @@
|
||||||
import logging
|
|
||||||
from .client.telegramclient import TelegramClient
|
from .client.telegramclient import TelegramClient
|
||||||
from .network import connection
|
from .network import connection
|
||||||
from .tl import types, functions, custom
|
from .tl import types, functions, custom
|
||||||
from . import version, events, utils, errors
|
from . import version, events, utils, errors
|
||||||
|
|
||||||
|
|
||||||
__version__ = version.__version__
|
__version__ = version.__version__
|
||||||
logging.getLogger(__name__).addHandler(logging.NullHandler())
|
|
||||||
|
|
||||||
__all__ = ['TelegramClient', 'types', 'functions', 'custom',
|
__all__ = ['TelegramClient', 'types', 'functions', 'custom',
|
||||||
'events', 'utils', 'errors']
|
'events', 'utils', 'errors']
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
import datetime
|
import datetime
|
||||||
import io
|
import io
|
||||||
import logging
|
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
|
|
||||||
|
@ -14,9 +13,6 @@ except ImportError:
|
||||||
aiohttp = None
|
aiohttp = None
|
||||||
|
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class DownloadMethods(UserMethods):
|
class DownloadMethods(UserMethods):
|
||||||
|
|
||||||
# region Public methods
|
# region Public methods
|
||||||
|
@ -239,7 +235,8 @@ class DownloadMethods(UserMethods):
|
||||||
# The used sender will also change if ``FileMigrateError`` occurs
|
# The used sender will also change if ``FileMigrateError`` occurs
|
||||||
sender = self._sender
|
sender = self._sender
|
||||||
|
|
||||||
__log__.info('Downloading file in chunks of %d bytes', part_size)
|
self._log[__name__].info('Downloading file in chunks of %d bytes',
|
||||||
|
part_size)
|
||||||
try:
|
try:
|
||||||
offset = 0
|
offset = 0
|
||||||
while True:
|
while True:
|
||||||
|
@ -251,7 +248,7 @@ class DownloadMethods(UserMethods):
|
||||||
# TODO Implement
|
# TODO Implement
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
except errors.FileMigrateError as e:
|
except errors.FileMigrateError as e:
|
||||||
__log__.info('File lives in another DC')
|
self._log[__name__].info('File lives in another DC')
|
||||||
sender = await self._borrow_exported_sender(e.new_dc)
|
sender = await self._borrow_exported_sender(e.new_dc)
|
||||||
exported = True
|
exported = True
|
||||||
continue
|
continue
|
||||||
|
@ -264,7 +261,8 @@ class DownloadMethods(UserMethods):
|
||||||
else:
|
else:
|
||||||
return getattr(result, 'type', '')
|
return getattr(result, 'type', '')
|
||||||
|
|
||||||
__log__.debug('Saving %d more bytes', len(result.bytes))
|
self._log[__name__].debug('Saving %d more bytes',
|
||||||
|
len(result.bytes))
|
||||||
f.write(result.bytes)
|
f.write(result.bytes)
|
||||||
if progress_callback:
|
if progress_callback:
|
||||||
progress_callback(f.tell(), file_size)
|
progress_callback(f.tell(), file_size)
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from async_generator import async_generator, yield_
|
from async_generator import async_generator, yield_
|
||||||
|
@ -11,8 +10,6 @@ from .buttons import ButtonMethods
|
||||||
from .. import helpers, utils, errors
|
from .. import helpers, utils, errors
|
||||||
from ..tl import types, functions
|
from ..tl import types, functions
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
|
class MessageMethods(UploadMethods, ButtonMethods, MessageParseMethods):
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ import sys
|
||||||
import time
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
from .. import version
|
from .. import version, __name__ as __base_name__
|
||||||
from ..crypto import rsa
|
from ..crypto import rsa
|
||||||
from ..extensions import markdown
|
from ..extensions import markdown
|
||||||
from ..network import MTProtoSender, ConnectionTcpFull
|
from ..network import MTProtoSender, ConnectionTcpFull
|
||||||
|
@ -19,7 +19,8 @@ DEFAULT_IPV4_IP = '149.154.167.51'
|
||||||
DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]'
|
DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]'
|
||||||
DEFAULT_PORT = 443
|
DEFAULT_PORT = 443
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
__default_log__ = logging.getLogger(__base_name__)
|
||||||
|
__default_log__.addHandler(logging.NullHandler())
|
||||||
|
|
||||||
|
|
||||||
class TelegramBaseClient(abc.ABC):
|
class TelegramBaseClient(abc.ABC):
|
||||||
|
@ -133,6 +134,15 @@ class TelegramBaseClient(abc.ABC):
|
||||||
system_lang_code (`str`, optional):
|
system_lang_code (`str`, optional):
|
||||||
"System lang code" to be sent when creating the initial connection.
|
"System lang code" to be sent when creating the initial connection.
|
||||||
Defaults to `lang_code`.
|
Defaults to `lang_code`.
|
||||||
|
|
||||||
|
loop (`asyncio.AbstractEventLoop`, optional):
|
||||||
|
Asyncio event loop to use. Defaults to `asyncio.get_event_loop()`
|
||||||
|
|
||||||
|
base_logger (`str` | `logging.Logger`, optional):
|
||||||
|
Base logger name or instance to use.
|
||||||
|
If a `str` is given, it'll be passed to `logging.getLogger()`. If a
|
||||||
|
`logging.Logger` is given, it'll be used directly. If something
|
||||||
|
else or nothing is given, the default logger will be used.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Current TelegramClient version
|
# Current TelegramClient version
|
||||||
|
@ -161,7 +171,8 @@ class TelegramBaseClient(abc.ABC):
|
||||||
app_version=None,
|
app_version=None,
|
||||||
lang_code='en',
|
lang_code='en',
|
||||||
system_lang_code='en',
|
system_lang_code='en',
|
||||||
loop=None):
|
loop=None,
|
||||||
|
base_logger=None):
|
||||||
if not api_id or not api_hash:
|
if not api_id or not api_hash:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"Your API ID or Hash cannot be empty or None. "
|
"Your API ID or Hash cannot be empty or None. "
|
||||||
|
@ -170,6 +181,19 @@ class TelegramBaseClient(abc.ABC):
|
||||||
self._use_ipv6 = use_ipv6
|
self._use_ipv6 = use_ipv6
|
||||||
self._loop = loop or asyncio.get_event_loop()
|
self._loop = loop or asyncio.get_event_loop()
|
||||||
|
|
||||||
|
if isinstance(base_logger, str):
|
||||||
|
base_logger = logging.getLogger(base_logger)
|
||||||
|
elif not isinstance(base_logger, logging.Logger):
|
||||||
|
base_logger = __default_log__
|
||||||
|
|
||||||
|
class _Loggers(dict):
|
||||||
|
def __missing__(self, key):
|
||||||
|
if key.startswith("telethon."):
|
||||||
|
key = key[len("telethon."):]
|
||||||
|
return base_logger.getChild(key)
|
||||||
|
|
||||||
|
self._log = _Loggers()
|
||||||
|
|
||||||
# Determine what session object we have
|
# Determine what session object we have
|
||||||
if isinstance(session, str) or session is None:
|
if isinstance(session, str) or session is None:
|
||||||
try:
|
try:
|
||||||
|
@ -240,6 +264,7 @@ class TelegramBaseClient(abc.ABC):
|
||||||
self._connection = connection
|
self._connection = connection
|
||||||
self._sender = MTProtoSender(
|
self._sender = MTProtoSender(
|
||||||
self.session.auth_key, self._loop,
|
self.session.auth_key, self._loop,
|
||||||
|
loggers=self._log,
|
||||||
retries=self._connection_retries,
|
retries=self._connection_retries,
|
||||||
delay=self._retry_delay,
|
delay=self._retry_delay,
|
||||||
auto_reconnect=self._auto_reconnect,
|
auto_reconnect=self._auto_reconnect,
|
||||||
|
@ -317,7 +342,8 @@ class TelegramBaseClient(abc.ABC):
|
||||||
"""
|
"""
|
||||||
await self._sender.connect(self._connection(
|
await self._sender.connect(self._connection(
|
||||||
self.session.server_address, self.session.port,
|
self.session.server_address, self.session.port,
|
||||||
loop=self._loop, proxy=self._proxy
|
loop=self._loop, loggers=self._log,
|
||||||
|
proxy=self._proxy
|
||||||
))
|
))
|
||||||
self.session.auth_key = self._sender.auth_key
|
self.session.auth_key = self._sender.auth_key
|
||||||
self.session.save()
|
self.session.save()
|
||||||
|
@ -387,7 +413,7 @@ class TelegramBaseClient(abc.ABC):
|
||||||
"""
|
"""
|
||||||
Permanently switches the current connection to the new data center.
|
Permanently switches the current connection to the new data center.
|
||||||
"""
|
"""
|
||||||
__log__.info('Reconnecting to new data center %s', new_dc)
|
self._log[__name__].info('Reconnecting to new data center %s', new_dc)
|
||||||
dc = await self._get_dc(new_dc)
|
dc = await self._get_dc(new_dc)
|
||||||
|
|
||||||
self.session.set_dc(dc.id, dc.ip_address, dc.port)
|
self.session.set_dc(dc.id, dc.ip_address, dc.port)
|
||||||
|
@ -443,7 +469,8 @@ class TelegramBaseClient(abc.ABC):
|
||||||
sender = MTProtoSender(None, self._loop)
|
sender = MTProtoSender(None, self._loop)
|
||||||
await sender.connect(self._connection(
|
await sender.connect(self._connection(
|
||||||
dc.ip_address, dc.port, loop=self._loop, proxy=self._proxy))
|
dc.ip_address, dc.port, loop=self._loop, proxy=self._proxy))
|
||||||
__log__.info('Exporting authorization for data center %s', dc)
|
self._log[__name__].info('Exporting authorization for data center %s',
|
||||||
|
dc)
|
||||||
auth = await self(functions.auth.ExportAuthorizationRequest(dc_id))
|
auth = await self(functions.auth.ExportAuthorizationRequest(dc_id))
|
||||||
req = self._init_with(functions.auth.ImportAuthorizationRequest(
|
req = self._init_with(functions.auth.ImportAuthorizationRequest(
|
||||||
id=auth.id, bytes=auth.bytes
|
id=auth.id, bytes=auth.bytes
|
||||||
|
@ -486,7 +513,8 @@ class TelegramBaseClient(abc.ABC):
|
||||||
n -= 1
|
n -= 1
|
||||||
self._borrowed_senders[dc_id] = (n, sender)
|
self._borrowed_senders[dc_id] = (n, sender)
|
||||||
if not n:
|
if not n:
|
||||||
__log__.info('Disconnecting borrowed sender for DC %d', dc_id)
|
self._log[__name__].info(
|
||||||
|
'Disconnecting borrowed sender for DC %d', dc_id)
|
||||||
sender.disconnect()
|
sender.disconnect()
|
||||||
|
|
||||||
async def _get_cdn_client(self, cdn_redirect):
|
async def _get_cdn_client(self, cdn_redirect):
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import inspect
|
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
@ -9,8 +7,6 @@ from .users import UserMethods
|
||||||
from .. import events, utils, errors
|
from .. import events, utils, errors
|
||||||
from ..tl import types, functions
|
from ..tl import types, functions
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class UpdateMethods(UserMethods):
|
class UpdateMethods(UserMethods):
|
||||||
|
|
||||||
|
@ -281,28 +277,31 @@ class UpdateMethods(UserMethods):
|
||||||
await callback(event)
|
await callback(event)
|
||||||
except errors.AlreadyInConversationError:
|
except errors.AlreadyInConversationError:
|
||||||
name = getattr(callback, '__name__', repr(callback))
|
name = getattr(callback, '__name__', repr(callback))
|
||||||
__log__.debug('Event handler "%s" already has an open '
|
self._log[__name__].debug(
|
||||||
'conversation, ignoring new one', name)
|
'Event handler "%s" already has an open conversation, '
|
||||||
|
'ignoring new one', name)
|
||||||
except events.StopPropagation:
|
except events.StopPropagation:
|
||||||
name = getattr(callback, '__name__', repr(callback))
|
name = getattr(callback, '__name__', repr(callback))
|
||||||
__log__.debug(
|
self._log[__name__].debug(
|
||||||
'Event handler "%s" stopped chain of propagation '
|
'Event handler "%s" stopped chain of propagation '
|
||||||
'for event %s.', name, type(event).__name__
|
'for event %s.', name, type(event).__name__
|
||||||
)
|
)
|
||||||
break
|
break
|
||||||
except Exception:
|
except Exception:
|
||||||
name = getattr(callback, '__name__', repr(callback))
|
name = getattr(callback, '__name__', repr(callback))
|
||||||
__log__.exception('Unhandled exception on %s', name)
|
self._log[__name__].exception('Unhandled exception on %s',
|
||||||
|
name)
|
||||||
|
|
||||||
async def _handle_auto_reconnect(self):
|
async def _handle_auto_reconnect(self):
|
||||||
# Upon reconnection, we want to send getState
|
# Upon reconnection, we want to send getState
|
||||||
# for Telegram to keep sending us updates.
|
# for Telegram to keep sending us updates.
|
||||||
try:
|
try:
|
||||||
__log__.info('Asking for the current state after reconnect...')
|
self._log[__name__].info(
|
||||||
|
'Asking for the current state after reconnect...')
|
||||||
state = await self(functions.updates.GetStateRequest())
|
state = await self(functions.updates.GetStateRequest())
|
||||||
__log__.info('Got new state! %s', state)
|
self._log[__name__].info('Got new state! %s', state)
|
||||||
except errors.RPCError as e:
|
except errors.RPCError as e:
|
||||||
__log__.info('Failed to get current state: %r', e)
|
self._log[__name__].info('Failed to get current state: %r', e)
|
||||||
|
|
||||||
# endregion
|
# endregion
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
import hashlib
|
import hashlib
|
||||||
import io
|
import io
|
||||||
import logging
|
|
||||||
import os
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
import re
|
import re
|
||||||
|
@ -12,8 +11,6 @@ from .users import UserMethods
|
||||||
from .. import utils, helpers
|
from .. import utils, helpers
|
||||||
from ..tl import types, functions, custom
|
from ..tl import types, functions, custom
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class _CacheType:
|
class _CacheType:
|
||||||
"""Like functools.partial but pretends to be the wrapped class."""
|
"""Like functools.partial but pretends to be the wrapped class."""
|
||||||
|
@ -350,7 +347,7 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
|
||||||
return cached
|
return cached
|
||||||
|
|
||||||
part_count = (file_size + part_size - 1) // part_size
|
part_count = (file_size + part_size - 1) // part_size
|
||||||
__log__.info('Uploading file of %d bytes in %d chunks of %d',
|
self._log[__name__].info('Uploading file of %d bytes in %d chunks of %d',
|
||||||
file_size, part_count, part_size)
|
file_size, part_count, part_size)
|
||||||
|
|
||||||
with open(file, 'rb') if isinstance(file, str) else BytesIO(file)\
|
with open(file, 'rb') if isinstance(file, str) else BytesIO(file)\
|
||||||
|
@ -370,8 +367,8 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
|
||||||
|
|
||||||
result = await self(request)
|
result = await self(request)
|
||||||
if result:
|
if result:
|
||||||
__log__.debug('Uploaded %d/%d', part_index + 1,
|
self._log[__name__].debug('Uploaded %d/%d',
|
||||||
part_count)
|
part_index + 1, part_count)
|
||||||
if progress_callback:
|
if progress_callback:
|
||||||
progress_callback(stream.tell(), file_size)
|
progress_callback(stream.tell(), file_size)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from .telegrambaseclient import TelegramBaseClient
|
from .telegrambaseclient import TelegramBaseClient
|
||||||
|
@ -8,7 +7,6 @@ from .. import errors, utils
|
||||||
from ..errors import MultiError, RPCError
|
from ..errors import MultiError, RPCError
|
||||||
from ..tl import TLObject, TLRequest, types, functions
|
from ..tl import TLObject, TLRequest, types, functions
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
|
||||||
_NOT_A_REQUEST = TypeError('You can only invoke requests, not types!')
|
_NOT_A_REQUEST = TypeError('You can only invoke requests, not types!')
|
||||||
|
|
||||||
|
|
||||||
|
@ -27,7 +25,8 @@ class UserMethods(TelegramBaseClient):
|
||||||
if diff <= 3: # Flood waits below 3 seconds are "ignored"
|
if diff <= 3: # Flood waits below 3 seconds are "ignored"
|
||||||
self._flood_waited_requests.pop(r.CONSTRUCTOR_ID, None)
|
self._flood_waited_requests.pop(r.CONSTRUCTOR_ID, None)
|
||||||
elif diff <= self.flood_sleep_threshold:
|
elif diff <= self.flood_sleep_threshold:
|
||||||
__log__.info('Sleeping early for %ds on flood wait', diff)
|
self._log[__name__].info(
|
||||||
|
'Sleeping early for %ds on flood wait', diff)
|
||||||
await asyncio.sleep(diff, loop=self._loop)
|
await asyncio.sleep(diff, loop=self._loop)
|
||||||
self._flood_waited_requests.pop(r.CONSTRUCTOR_ID, None)
|
self._flood_waited_requests.pop(r.CONSTRUCTOR_ID, None)
|
||||||
else:
|
else:
|
||||||
|
@ -61,7 +60,8 @@ class UserMethods(TelegramBaseClient):
|
||||||
self.session.process_entities(result)
|
self.session.process_entities(result)
|
||||||
return result
|
return result
|
||||||
except (errors.ServerError, errors.RpcCallFailError) as e:
|
except (errors.ServerError, errors.RpcCallFailError) as e:
|
||||||
__log__.warning('Telegram is having internal issues %s: %s',
|
self._log[__name__].warning(
|
||||||
|
'Telegram is having internal issues %s: %s',
|
||||||
e.__class__.__name__, e)
|
e.__class__.__name__, e)
|
||||||
except (errors.FloodWaitError, errors.FloodTestPhoneWaitError) as e:
|
except (errors.FloodWaitError, errors.FloodTestPhoneWaitError) as e:
|
||||||
if utils.is_list_like(request):
|
if utils.is_list_like(request):
|
||||||
|
@ -71,13 +71,14 @@ class UserMethods(TelegramBaseClient):
|
||||||
[request.CONSTRUCTOR_ID] = time.time() + e.seconds
|
[request.CONSTRUCTOR_ID] = time.time() + e.seconds
|
||||||
|
|
||||||
if e.seconds <= self.flood_sleep_threshold:
|
if e.seconds <= self.flood_sleep_threshold:
|
||||||
__log__.info('Sleeping for %ds on flood wait', e.seconds)
|
self._log[__name__].info('Sleeping for %ds on flood wait',
|
||||||
|
e.seconds)
|
||||||
await asyncio.sleep(e.seconds, loop=self._loop)
|
await asyncio.sleep(e.seconds, loop=self._loop)
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
except (errors.PhoneMigrateError, errors.NetworkMigrateError,
|
except (errors.PhoneMigrateError, errors.NetworkMigrateError,
|
||||||
errors.UserMigrateError) as e:
|
errors.UserMigrateError) as e:
|
||||||
__log__.info('Phone migrated to %d', e.new_dc)
|
self._log[__name__].info('Phone migrated to %d', e.new_dc)
|
||||||
should_raise = isinstance(e, (
|
should_raise = isinstance(e, (
|
||||||
errors.PhoneMigrateError, errors.NetworkMigrateError
|
errors.PhoneMigrateError, errors.NetworkMigrateError
|
||||||
))
|
))
|
||||||
|
|
|
@ -1,15 +1,12 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import collections
|
import collections
|
||||||
import io
|
import io
|
||||||
import logging
|
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
from ..tl import TLRequest
|
from ..tl import TLRequest
|
||||||
from ..tl.core.messagecontainer import MessageContainer
|
from ..tl.core.messagecontainer import MessageContainer
|
||||||
from ..tl.core.tlmessage import TLMessage
|
from ..tl.core.tlmessage import TLMessage
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class MessagePacker:
|
class MessagePacker:
|
||||||
"""
|
"""
|
||||||
|
@ -24,11 +21,13 @@ class MessagePacker:
|
||||||
encryption and network overhead also is smaller. It's also a central
|
encryption and network overhead also is smaller. It's also a central
|
||||||
point where outgoing requests are put, and where ready-messages are get.
|
point where outgoing requests are put, and where ready-messages are get.
|
||||||
"""
|
"""
|
||||||
def __init__(self, state, loop):
|
|
||||||
|
def __init__(self, state, loop, loggers):
|
||||||
self._state = state
|
self._state = state
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
self._deque = collections.deque()
|
self._deque = collections.deque()
|
||||||
self._ready = asyncio.Event(loop=loop)
|
self._ready = asyncio.Event(loop=loop)
|
||||||
|
self._log = loggers[__name__]
|
||||||
|
|
||||||
def append(self, state):
|
def append(self, state):
|
||||||
self._deque.append(state)
|
self._deque.append(state)
|
||||||
|
@ -65,7 +64,7 @@ class MessagePacker:
|
||||||
after_id=state.after.msg_id if state.after else None
|
after_id=state.after.msg_id if state.after else None
|
||||||
)
|
)
|
||||||
batch.append(state)
|
batch.append(state)
|
||||||
__log__.debug('Assigned msg_id = %d to %s (%x)',
|
self._log.debug('Assigned msg_id = %d to %s (%x)',
|
||||||
state.msg_id, state.request.__class__.__name__,
|
state.msg_id, state.request.__class__.__name__,
|
||||||
id(state.request))
|
id(state.request))
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -1,13 +1,10 @@
|
||||||
import abc
|
import abc
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
|
||||||
import socket
|
import socket
|
||||||
import ssl as ssl_mod
|
import ssl as ssl_mod
|
||||||
|
|
||||||
from ...errors import InvalidChecksumError
|
from ...errors import InvalidChecksumError
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class Connection(abc.ABC):
|
class Connection(abc.ABC):
|
||||||
"""
|
"""
|
||||||
|
@ -21,10 +18,11 @@ class Connection(abc.ABC):
|
||||||
``ConnectionError``, which will raise when attempting to send if
|
``ConnectionError``, which will raise when attempting to send if
|
||||||
the client is disconnected (includes remote disconnections).
|
the client is disconnected (includes remote disconnections).
|
||||||
"""
|
"""
|
||||||
def __init__(self, ip, port, *, loop, proxy=None):
|
def __init__(self, ip, port, *, loop, loggers, proxy=None):
|
||||||
self._ip = ip
|
self._ip = ip
|
||||||
self._port = port
|
self._port = port
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
|
self._log = loggers[__name__]
|
||||||
self._proxy = proxy
|
self._proxy = proxy
|
||||||
self._reader = None
|
self._reader = None
|
||||||
self._writer = None
|
self._writer = None
|
||||||
|
@ -156,13 +154,13 @@ class Connection(abc.ABC):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if isinstance(e, (ConnectionError, asyncio.IncompleteReadError)):
|
if isinstance(e, (ConnectionError, asyncio.IncompleteReadError)):
|
||||||
msg = 'The server closed the connection'
|
msg = 'The server closed the connection'
|
||||||
__log__.info(msg)
|
self._log.info(msg)
|
||||||
elif isinstance(e, InvalidChecksumError):
|
elif isinstance(e, InvalidChecksumError):
|
||||||
msg = 'The server response had an invalid checksum'
|
msg = 'The server response had an invalid checksum'
|
||||||
__log__.info(msg)
|
self._log.info(msg)
|
||||||
else:
|
else:
|
||||||
msg = 'Unexpected exception in the receive loop'
|
msg = 'Unexpected exception in the receive loop'
|
||||||
__log__.exception(msg)
|
self._log.exception(msg)
|
||||||
|
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
|
|
||||||
|
|
|
@ -10,8 +10,8 @@ class ConnectionTcpFull(Connection):
|
||||||
Default Telegram mode. Sends 12 additional bytes and
|
Default Telegram mode. Sends 12 additional bytes and
|
||||||
needs to calculate the CRC value of the packet itself.
|
needs to calculate the CRC value of the packet itself.
|
||||||
"""
|
"""
|
||||||
def __init__(self, ip, port, *, loop, proxy=None):
|
def __init__(self, ip, port, *, loop, loggers, proxy=None):
|
||||||
super().__init__(ip, port, loop=loop, proxy=proxy)
|
super().__init__(ip, port, loop=loop, loggers=loggers, proxy=proxy)
|
||||||
self._send_counter = 0
|
self._send_counter = 0
|
||||||
|
|
||||||
async def connect(self, timeout=None, ssl=None):
|
async def connect(self, timeout=None, ssl=None):
|
||||||
|
|
|
@ -10,8 +10,8 @@ class ConnectionTcpObfuscated(ConnectionTcpAbridged):
|
||||||
every message with a randomly generated key using the
|
every message with a randomly generated key using the
|
||||||
AES-CTR mode so the packets are harder to discern.
|
AES-CTR mode so the packets are harder to discern.
|
||||||
"""
|
"""
|
||||||
def __init__(self, ip, port, *, loop, proxy=None):
|
def __init__(self, ip, port, *, loop, loggers, proxy=None):
|
||||||
super().__init__(ip, port, loop=loop, proxy=proxy)
|
super().__init__(ip, port, loop=loop, loggers=loggers, proxy=proxy)
|
||||||
self._aes_encrypt = None
|
self._aes_encrypt = None
|
||||||
self._aes_decrypt = None
|
self._aes_decrypt = None
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import collections
|
import collections
|
||||||
import functools
|
import functools
|
||||||
import logging
|
|
||||||
|
|
||||||
from . import authenticator
|
from . import authenticator
|
||||||
from ..extensions.messagepacker import MessagePacker
|
from ..extensions.messagepacker import MessagePacker
|
||||||
|
@ -24,8 +23,6 @@ from ..tl.types import (
|
||||||
)
|
)
|
||||||
from ..crypto import AuthKey
|
from ..crypto import AuthKey
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
def _cancellable(func):
|
def _cancellable(func):
|
||||||
"""
|
"""
|
||||||
|
@ -59,12 +56,14 @@ class MTProtoSender:
|
||||||
A new authorization key will be generated on connection if no other
|
A new authorization key will be generated on connection if no other
|
||||||
key exists yet.
|
key exists yet.
|
||||||
"""
|
"""
|
||||||
def __init__(self, auth_key, loop, *,
|
def __init__(self, auth_key, loop, *, loggers,
|
||||||
retries=5, delay=1, auto_reconnect=True, connect_timeout=None,
|
retries=5, delay=1, auto_reconnect=True, connect_timeout=None,
|
||||||
auth_key_callback=None,
|
auth_key_callback=None,
|
||||||
update_callback=None, auto_reconnect_callback=None):
|
update_callback=None, auto_reconnect_callback=None):
|
||||||
self._connection = None
|
self._connection = None
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
|
self._loggers = loggers
|
||||||
|
self._log = loggers[__name__]
|
||||||
self._retries = retries
|
self._retries = retries
|
||||||
self._delay = delay
|
self._delay = delay
|
||||||
self._auto_reconnect = auto_reconnect
|
self._auto_reconnect = auto_reconnect
|
||||||
|
@ -90,11 +89,12 @@ class MTProtoSender:
|
||||||
|
|
||||||
# Preserving the references of the AuthKey and state is important
|
# Preserving the references of the AuthKey and state is important
|
||||||
self.auth_key = auth_key or AuthKey(None)
|
self.auth_key = auth_key or AuthKey(None)
|
||||||
self._state = MTProtoState(self.auth_key)
|
self._state = MTProtoState(self.auth_key, loggers=self._loggers)
|
||||||
|
|
||||||
# Outgoing messages are put in a queue and sent in a batch.
|
# Outgoing messages are put in a queue and sent in a batch.
|
||||||
# Note that here we're also storing their ``_RequestState``.
|
# Note that here we're also storing their ``_RequestState``.
|
||||||
self._send_queue = MessagePacker(self._state, self._loop)
|
self._send_queue = MessagePacker(self._state, self._loop,
|
||||||
|
loggers=self._loggers)
|
||||||
|
|
||||||
# Sent states are remembered until a response is received.
|
# Sent states are remembered until a response is received.
|
||||||
self._pending_state = {}
|
self._pending_state = {}
|
||||||
|
@ -132,7 +132,7 @@ class MTProtoSender:
|
||||||
Connects to the specified given connection using the given auth key.
|
Connects to the specified given connection using the given auth key.
|
||||||
"""
|
"""
|
||||||
if self._user_connected:
|
if self._user_connected:
|
||||||
__log__.info('User is already connected!')
|
self._log.info('User is already connected!')
|
||||||
return
|
return
|
||||||
|
|
||||||
self._connection = connection
|
self._connection = connection
|
||||||
|
@ -210,13 +210,13 @@ class MTProtoSender:
|
||||||
authorization key if necessary, and starting the send and
|
authorization key if necessary, and starting the send and
|
||||||
receive loops.
|
receive loops.
|
||||||
"""
|
"""
|
||||||
__log__.info('Connecting to %s...', self._connection)
|
self._log.info('Connecting to %s...', self._connection)
|
||||||
for retry in range(1, self._retries + 1):
|
for retry in range(1, self._retries + 1):
|
||||||
try:
|
try:
|
||||||
__log__.debug('Connection attempt {}...'.format(retry))
|
self._log.debug('Connection attempt {}...'.format(retry))
|
||||||
await self._connection.connect(timeout=self._connect_timeout)
|
await self._connection.connect(timeout=self._connect_timeout)
|
||||||
except (ConnectionError, asyncio.TimeoutError) as e:
|
except (ConnectionError, asyncio.TimeoutError) as e:
|
||||||
__log__.warning('Attempt {} at connecting failed: {}: {}'
|
self._log.warning('Attempt {} at connecting failed: {}: {}'
|
||||||
.format(retry, type(e).__name__, e))
|
.format(retry, type(e).__name__, e))
|
||||||
await asyncio.sleep(self._delay)
|
await asyncio.sleep(self._delay)
|
||||||
else:
|
else:
|
||||||
|
@ -225,12 +225,12 @@ class MTProtoSender:
|
||||||
raise ConnectionError('Connection to Telegram failed {} times'
|
raise ConnectionError('Connection to Telegram failed {} times'
|
||||||
.format(self._retries))
|
.format(self._retries))
|
||||||
|
|
||||||
__log__.debug('Connection success!')
|
self._log.debug('Connection success!')
|
||||||
if not self.auth_key:
|
if not self.auth_key:
|
||||||
plain = MTProtoPlainSender(self._connection)
|
plain = MTProtoPlainSender(self._connection)
|
||||||
for retry in range(1, self._retries + 1):
|
for retry in range(1, self._retries + 1):
|
||||||
try:
|
try:
|
||||||
__log__.debug('New auth_key attempt {}...'.format(retry))
|
self._log.debug('New auth_key attempt {}...'.format(retry))
|
||||||
self.auth_key.key, self._state.time_offset =\
|
self.auth_key.key, self._state.time_offset =\
|
||||||
await authenticator.do_authentication(plain)
|
await authenticator.do_authentication(plain)
|
||||||
|
|
||||||
|
@ -243,7 +243,7 @@ class MTProtoSender:
|
||||||
|
|
||||||
break
|
break
|
||||||
except (SecurityError, AssertionError) as e:
|
except (SecurityError, AssertionError) as e:
|
||||||
__log__.warning('Attempt {} at new auth_key failed: {}'
|
self._log.warning('Attempt {} at new auth_key failed: {}'
|
||||||
.format(retry, e))
|
.format(retry, e))
|
||||||
await asyncio.sleep(self._delay)
|
await asyncio.sleep(self._delay)
|
||||||
else:
|
else:
|
||||||
|
@ -252,10 +252,10 @@ class MTProtoSender:
|
||||||
self._disconnect(error=e)
|
self._disconnect(error=e)
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
__log__.debug('Starting send loop')
|
self._log.debug('Starting send loop')
|
||||||
self._send_loop_handle = self._loop.create_task(self._send_loop())
|
self._send_loop_handle = self._loop.create_task(self._send_loop())
|
||||||
|
|
||||||
__log__.debug('Starting receive loop')
|
self._log.debug('Starting receive loop')
|
||||||
self._recv_loop_handle = self._loop.create_task(self._recv_loop())
|
self._recv_loop_handle = self._loop.create_task(self._recv_loop())
|
||||||
|
|
||||||
# _disconnected only completes after manual disconnection
|
# _disconnected only completes after manual disconnection
|
||||||
|
@ -264,16 +264,16 @@ class MTProtoSender:
|
||||||
if self._disconnected.done():
|
if self._disconnected.done():
|
||||||
self._disconnected = self._loop.create_future()
|
self._disconnected = self._loop.create_future()
|
||||||
|
|
||||||
__log__.info('Connection to %s complete!', self._connection)
|
self._log.info('Connection to %s complete!', self._connection)
|
||||||
|
|
||||||
def _disconnect(self, error=None):
|
def _disconnect(self, error=None):
|
||||||
__log__.info('Disconnecting from %s...', self._connection)
|
self._log.info('Disconnecting from %s...', self._connection)
|
||||||
self._user_connected = False
|
self._user_connected = False
|
||||||
try:
|
try:
|
||||||
__log__.debug('Closing current connection...')
|
self._log.debug('Closing current connection...')
|
||||||
self._connection.disconnect()
|
self._connection.disconnect()
|
||||||
finally:
|
finally:
|
||||||
__log__.debug('Cancelling {} pending message(s)...'
|
self._log.debug('Cancelling {} pending message(s)...'
|
||||||
.format(len(self._pending_state)))
|
.format(len(self._pending_state)))
|
||||||
for state in self._pending_state.values():
|
for state in self._pending_state.values():
|
||||||
if error and not state.future.done():
|
if error and not state.future.done():
|
||||||
|
@ -286,14 +286,14 @@ class MTProtoSender:
|
||||||
self._last_ack = None
|
self._last_ack = None
|
||||||
|
|
||||||
if self._send_loop_handle:
|
if self._send_loop_handle:
|
||||||
__log__.debug('Cancelling the send loop...')
|
self._log.debug('Cancelling the send loop...')
|
||||||
self._send_loop_handle.cancel()
|
self._send_loop_handle.cancel()
|
||||||
|
|
||||||
if self._recv_loop_handle:
|
if self._recv_loop_handle:
|
||||||
__log__.debug('Cancelling the receive loop...')
|
self._log.debug('Cancelling the receive loop...')
|
||||||
self._recv_loop_handle.cancel()
|
self._recv_loop_handle.cancel()
|
||||||
|
|
||||||
__log__.info('Disconnection from %s complete!', self._connection)
|
self._log.info('Disconnection from %s complete!', self._connection)
|
||||||
if self._disconnected and not self._disconnected.done():
|
if self._disconnected and not self._disconnected.done():
|
||||||
if error:
|
if error:
|
||||||
self._disconnected.set_exception(error)
|
self._disconnected.set_exception(error)
|
||||||
|
@ -306,13 +306,13 @@ class MTProtoSender:
|
||||||
"""
|
"""
|
||||||
self._reconnecting = True
|
self._reconnecting = True
|
||||||
|
|
||||||
__log__.debug('Closing current connection...')
|
self._log.debug('Closing current connection...')
|
||||||
self._connection.disconnect()
|
self._connection.disconnect()
|
||||||
|
|
||||||
__log__.debug('Cancelling the send loop...')
|
self._log.debug('Cancelling the send loop...')
|
||||||
self._send_loop_handle.cancel()
|
self._send_loop_handle.cancel()
|
||||||
|
|
||||||
__log__.debug('Cancelling the receive loop...')
|
self._log.debug('Cancelling the receive loop...')
|
||||||
self._recv_loop_handle.cancel()
|
self._recv_loop_handle.cancel()
|
||||||
|
|
||||||
self._reconnecting = False
|
self._reconnecting = False
|
||||||
|
@ -325,12 +325,12 @@ class MTProtoSender:
|
||||||
try:
|
try:
|
||||||
await self._connect()
|
await self._connect()
|
||||||
except (ConnectionError, asyncio.TimeoutError) as e:
|
except (ConnectionError, asyncio.TimeoutError) as e:
|
||||||
__log__.info('Failed reconnection retry %d/%d with %s',
|
self._log.info('Failed reconnection retry %d/%d with %s',
|
||||||
retry, retries, e.__class__.__name__)
|
retry, retries, e.__class__.__name__)
|
||||||
|
|
||||||
await asyncio.sleep(self._delay)
|
await asyncio.sleep(self._delay)
|
||||||
except Exception:
|
except Exception:
|
||||||
__log__.exception('Unexpected exception reconnecting on '
|
self._log.exception('Unexpected exception reconnecting on '
|
||||||
'retry %d/%d', retry, retries)
|
'retry %d/%d', retry, retries)
|
||||||
|
|
||||||
await asyncio.sleep(self._delay)
|
await asyncio.sleep(self._delay)
|
||||||
|
@ -343,7 +343,7 @@ class MTProtoSender:
|
||||||
|
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
__log__.error('Failed to reconnect automatically.')
|
self._log.error('Failed to reconnect automatically.')
|
||||||
self._disconnect(error=ConnectionError())
|
self._disconnect(error=ConnectionError())
|
||||||
|
|
||||||
def _start_reconnect(self):
|
def _start_reconnect(self):
|
||||||
|
@ -368,7 +368,7 @@ class MTProtoSender:
|
||||||
self._last_acks.append(ack)
|
self._last_acks.append(ack)
|
||||||
self._pending_ack.clear()
|
self._pending_ack.clear()
|
||||||
|
|
||||||
__log__.debug('Waiting for messages to send...')
|
self._log.debug('Waiting for messages to send...')
|
||||||
# TODO Wait for the connection send queue to be empty?
|
# TODO Wait for the connection send queue to be empty?
|
||||||
# This means that while it's not empty we can wait for
|
# This means that while it's not empty we can wait for
|
||||||
# more messages to be added to the send queue.
|
# more messages to be added to the send queue.
|
||||||
|
@ -377,14 +377,14 @@ class MTProtoSender:
|
||||||
if not data:
|
if not data:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
__log__.debug('Encrypting %d message(s) in %d bytes for sending',
|
self._log.debug('Encrypting %d message(s) in %d bytes for sending',
|
||||||
len(batch), len(data))
|
len(batch), len(data))
|
||||||
|
|
||||||
data = self._state.encrypt_message_data(data)
|
data = self._state.encrypt_message_data(data)
|
||||||
try:
|
try:
|
||||||
await self._connection.send(data)
|
await self._connection.send(data)
|
||||||
except ConnectionError:
|
except ConnectionError:
|
||||||
__log__.info('Connection closed while sending data')
|
self._log.info('Connection closed while sending data')
|
||||||
self._start_reconnect()
|
self._start_reconnect()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -397,7 +397,7 @@ class MTProtoSender:
|
||||||
if isinstance(s.request, TLRequest):
|
if isinstance(s.request, TLRequest):
|
||||||
self._pending_state[s.msg_id] = s
|
self._pending_state[s.msg_id] = s
|
||||||
|
|
||||||
__log__.debug('Encrypted messages put in a queue to be sent')
|
self._log.debug('Encrypted messages put in a queue to be sent')
|
||||||
|
|
||||||
@_cancellable
|
@_cancellable
|
||||||
async def _recv_loop(self):
|
async def _recv_loop(self):
|
||||||
|
@ -408,11 +408,11 @@ class MTProtoSender:
|
||||||
Besides `connect`, only this method ever receives data.
|
Besides `connect`, only this method ever receives data.
|
||||||
"""
|
"""
|
||||||
while self._user_connected and not self._reconnecting:
|
while self._user_connected and not self._reconnecting:
|
||||||
__log__.debug('Receiving items from the network...')
|
self._log.debug('Receiving items from the network...')
|
||||||
try:
|
try:
|
||||||
body = await self._connection.recv()
|
body = await self._connection.recv()
|
||||||
except ConnectionError:
|
except ConnectionError:
|
||||||
__log__.info('Connection closed while receiving data')
|
self._log.info('Connection closed while receiving data')
|
||||||
self._start_reconnect()
|
self._start_reconnect()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -420,20 +420,20 @@ class MTProtoSender:
|
||||||
message = self._state.decrypt_message_data(body)
|
message = self._state.decrypt_message_data(body)
|
||||||
except TypeNotFoundError as e:
|
except TypeNotFoundError as e:
|
||||||
# Received object which we don't know how to deserialize
|
# Received object which we don't know how to deserialize
|
||||||
__log__.info('Type %08x not found, remaining data %r',
|
self._log.info('Type %08x not found, remaining data %r',
|
||||||
e.invalid_constructor_id, e.remaining)
|
e.invalid_constructor_id, e.remaining)
|
||||||
continue
|
continue
|
||||||
except SecurityError as e:
|
except SecurityError as e:
|
||||||
# A step while decoding had the incorrect data. This message
|
# A step while decoding had the incorrect data. This message
|
||||||
# should not be considered safe and it should be ignored.
|
# should not be considered safe and it should be ignored.
|
||||||
__log__.warning('Security error while unpacking a '
|
self._log.warning('Security error while unpacking a '
|
||||||
'received message: %s', e)
|
'received message: %s', e)
|
||||||
continue
|
continue
|
||||||
except BufferError as e:
|
except BufferError as e:
|
||||||
if isinstance(e, InvalidBufferError) and e.code == 404:
|
if isinstance(e, InvalidBufferError) and e.code == 404:
|
||||||
__log__.info('Broken authorization key; resetting')
|
self._log.info('Broken authorization key; resetting')
|
||||||
else:
|
else:
|
||||||
__log__.warning('Invalid buffer %s', e)
|
self._log.warning('Invalid buffer %s', e)
|
||||||
|
|
||||||
self.auth_key.key = None
|
self.auth_key.key = None
|
||||||
if self._auth_key_callback:
|
if self._auth_key_callback:
|
||||||
|
@ -442,14 +442,14 @@ class MTProtoSender:
|
||||||
self._start_reconnect()
|
self._start_reconnect()
|
||||||
return
|
return
|
||||||
except Exception:
|
except Exception:
|
||||||
__log__.exception('Unhandled error while receiving data')
|
self._log.exception('Unhandled error while receiving data')
|
||||||
self._start_reconnect()
|
self._start_reconnect()
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self._process_message(message)
|
await self._process_message(message)
|
||||||
except Exception:
|
except Exception:
|
||||||
__log__.exception('Unhandled error while processing msgs')
|
self._log.exception('Unhandled error while processing msgs')
|
||||||
|
|
||||||
# Response Handlers
|
# Response Handlers
|
||||||
|
|
||||||
|
@ -498,7 +498,7 @@ class MTProtoSender:
|
||||||
"""
|
"""
|
||||||
rpc_result = message.obj
|
rpc_result = message.obj
|
||||||
state = self._pending_state.pop(rpc_result.req_msg_id, None)
|
state = self._pending_state.pop(rpc_result.req_msg_id, None)
|
||||||
__log__.debug('Handling RPC result for message %d',
|
self._log.debug('Handling RPC result for message %d',
|
||||||
rpc_result.req_msg_id)
|
rpc_result.req_msg_id)
|
||||||
|
|
||||||
if not state:
|
if not state:
|
||||||
|
@ -511,7 +511,7 @@ class MTProtoSender:
|
||||||
if not isinstance(reader.tgread_object(), upload.File):
|
if not isinstance(reader.tgread_object(), upload.File):
|
||||||
raise ValueError('Not an upload.File')
|
raise ValueError('Not an upload.File')
|
||||||
except (TypeNotFoundError, ValueError):
|
except (TypeNotFoundError, ValueError):
|
||||||
__log__.info('Received response without parent request: {}'
|
self._log.info('Received response without parent request: {}'
|
||||||
.format(rpc_result.body))
|
.format(rpc_result.body))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -535,7 +535,7 @@ class MTProtoSender:
|
||||||
|
|
||||||
msg_container#73f1f8dc messages:vector<%Message> = MessageContainer;
|
msg_container#73f1f8dc messages:vector<%Message> = MessageContainer;
|
||||||
"""
|
"""
|
||||||
__log__.debug('Handling container')
|
self._log.debug('Handling container')
|
||||||
for inner_message in message.obj.messages:
|
for inner_message in message.obj.messages:
|
||||||
await self._process_message(inner_message)
|
await self._process_message(inner_message)
|
||||||
|
|
||||||
|
@ -545,13 +545,13 @@ class MTProtoSender:
|
||||||
|
|
||||||
gzip_packed#3072cfa1 packed_data:bytes = Object;
|
gzip_packed#3072cfa1 packed_data:bytes = Object;
|
||||||
"""
|
"""
|
||||||
__log__.debug('Handling gzipped data')
|
self._log.debug('Handling gzipped data')
|
||||||
with BinaryReader(message.obj.data) as reader:
|
with BinaryReader(message.obj.data) as reader:
|
||||||
message.obj = reader.tgread_object()
|
message.obj = reader.tgread_object()
|
||||||
await self._process_message(message)
|
await self._process_message(message)
|
||||||
|
|
||||||
async def _handle_update(self, message):
|
async def _handle_update(self, message):
|
||||||
__log__.debug('Handling update {}'
|
self._log.debug('Handling update {}'
|
||||||
.format(message.obj.__class__.__name__))
|
.format(message.obj.__class__.__name__))
|
||||||
if self._update_callback:
|
if self._update_callback:
|
||||||
await self._update_callback(message.obj)
|
await self._update_callback(message.obj)
|
||||||
|
@ -564,7 +564,7 @@ class MTProtoSender:
|
||||||
pong#347773c5 msg_id:long ping_id:long = Pong;
|
pong#347773c5 msg_id:long ping_id:long = Pong;
|
||||||
"""
|
"""
|
||||||
pong = message.obj
|
pong = message.obj
|
||||||
__log__.debug('Handling pong for message %d', pong.msg_id)
|
self._log.debug('Handling pong for message %d', pong.msg_id)
|
||||||
state = self._pending_state.pop(pong.msg_id, None)
|
state = self._pending_state.pop(pong.msg_id, None)
|
||||||
if state:
|
if state:
|
||||||
state.future.set_result(pong)
|
state.future.set_result(pong)
|
||||||
|
@ -578,12 +578,12 @@ class MTProtoSender:
|
||||||
error_code:int new_server_salt:long = BadMsgNotification;
|
error_code:int new_server_salt:long = BadMsgNotification;
|
||||||
"""
|
"""
|
||||||
bad_salt = message.obj
|
bad_salt = message.obj
|
||||||
__log__.debug('Handling bad salt for message %d', bad_salt.bad_msg_id)
|
self._log.debug('Handling bad salt for message %d', bad_salt.bad_msg_id)
|
||||||
self._state.salt = bad_salt.new_server_salt
|
self._state.salt = bad_salt.new_server_salt
|
||||||
states = self._pop_states(bad_salt.bad_msg_id)
|
states = self._pop_states(bad_salt.bad_msg_id)
|
||||||
self._send_queue.extend(states)
|
self._send_queue.extend(states)
|
||||||
|
|
||||||
__log__.debug('%d message(s) will be resent', len(states))
|
self._log.debug('%d message(s) will be resent', len(states))
|
||||||
|
|
||||||
async def _handle_bad_notification(self, message):
|
async def _handle_bad_notification(self, message):
|
||||||
"""
|
"""
|
||||||
|
@ -596,13 +596,13 @@ class MTProtoSender:
|
||||||
bad_msg = message.obj
|
bad_msg = message.obj
|
||||||
states = self._pop_states(bad_msg.bad_msg_id)
|
states = self._pop_states(bad_msg.bad_msg_id)
|
||||||
|
|
||||||
__log__.debug('Handling bad msg %s', bad_msg)
|
self._log.debug('Handling bad msg %s', bad_msg)
|
||||||
if bad_msg.error_code in (16, 17):
|
if bad_msg.error_code in (16, 17):
|
||||||
# Sent msg_id too low or too high (respectively).
|
# Sent msg_id too low or too high (respectively).
|
||||||
# Use the current msg_id to determine the right time offset.
|
# Use the current msg_id to determine the right time offset.
|
||||||
to = self._state.update_time_offset(
|
to = self._state.update_time_offset(
|
||||||
correct_msg_id=message.msg_id)
|
correct_msg_id=message.msg_id)
|
||||||
__log__.info('System clock is wrong, set time offset to %ds', to)
|
self._log.info('System clock is wrong, set time offset to %ds', to)
|
||||||
elif bad_msg.error_code == 32:
|
elif bad_msg.error_code == 32:
|
||||||
# msg_seqno too low, so just pump it up by some "large" amount
|
# msg_seqno too low, so just pump it up by some "large" amount
|
||||||
# TODO A better fix would be to start with a new fresh session ID
|
# TODO A better fix would be to start with a new fresh session ID
|
||||||
|
@ -618,7 +618,8 @@ class MTProtoSender:
|
||||||
|
|
||||||
# Messages are to be re-sent once we've corrected the issue
|
# Messages are to be re-sent once we've corrected the issue
|
||||||
self._send_queue.extend(states)
|
self._send_queue.extend(states)
|
||||||
__log__.debug('%d messages will be resent due to bad msg', len(states))
|
self._log.debug('%d messages will be resent due to bad msg',
|
||||||
|
len(states))
|
||||||
|
|
||||||
async def _handle_detailed_info(self, message):
|
async def _handle_detailed_info(self, message):
|
||||||
"""
|
"""
|
||||||
|
@ -629,7 +630,7 @@ class MTProtoSender:
|
||||||
"""
|
"""
|
||||||
# TODO https://goo.gl/VvpCC6
|
# TODO https://goo.gl/VvpCC6
|
||||||
msg_id = message.obj.answer_msg_id
|
msg_id = message.obj.answer_msg_id
|
||||||
__log__.debug('Handling detailed info for message %d', msg_id)
|
self._log.debug('Handling detailed info for message %d', msg_id)
|
||||||
self._pending_ack.add(msg_id)
|
self._pending_ack.add(msg_id)
|
||||||
|
|
||||||
async def _handle_new_detailed_info(self, message):
|
async def _handle_new_detailed_info(self, message):
|
||||||
|
@ -641,7 +642,7 @@ class MTProtoSender:
|
||||||
"""
|
"""
|
||||||
# TODO https://goo.gl/G7DPsR
|
# TODO https://goo.gl/G7DPsR
|
||||||
msg_id = message.obj.answer_msg_id
|
msg_id = message.obj.answer_msg_id
|
||||||
__log__.debug('Handling new detailed info for message %d', msg_id)
|
self._log.debug('Handling new detailed info for message %d', msg_id)
|
||||||
self._pending_ack.add(msg_id)
|
self._pending_ack.add(msg_id)
|
||||||
|
|
||||||
async def _handle_new_session_created(self, message):
|
async def _handle_new_session_created(self, message):
|
||||||
|
@ -652,7 +653,7 @@ class MTProtoSender:
|
||||||
server_salt:long = NewSession;
|
server_salt:long = NewSession;
|
||||||
"""
|
"""
|
||||||
# TODO https://goo.gl/LMyN7A
|
# TODO https://goo.gl/LMyN7A
|
||||||
__log__.debug('Handling new session created')
|
self._log.debug('Handling new session created')
|
||||||
self._state.salt = message.obj.server_salt
|
self._state.salt = message.obj.server_salt
|
||||||
|
|
||||||
async def _handle_ack(self, message):
|
async def _handle_ack(self, message):
|
||||||
|
@ -671,7 +672,7 @@ class MTProtoSender:
|
||||||
messages are acknowledged.
|
messages are acknowledged.
|
||||||
"""
|
"""
|
||||||
ack = message.obj
|
ack = message.obj
|
||||||
__log__.debug('Handling acknowledge for %s', str(ack.msg_ids))
|
self._log.debug('Handling acknowledge for %s', str(ack.msg_ids))
|
||||||
for msg_id in ack.msg_ids:
|
for msg_id in ack.msg_ids:
|
||||||
state = self._pending_state.get(msg_id)
|
state = self._pending_state.get(msg_id)
|
||||||
if state and isinstance(state.request, LogOutRequest):
|
if state and isinstance(state.request, LogOutRequest):
|
||||||
|
@ -688,7 +689,7 @@ class MTProtoSender:
|
||||||
"""
|
"""
|
||||||
# TODO save these salts and automatically adjust to the
|
# TODO save these salts and automatically adjust to the
|
||||||
# correct one whenever the salt in use expires.
|
# correct one whenever the salt in use expires.
|
||||||
__log__.debug('Handling future salts for message %d', message.msg_id)
|
self._log.debug('Handling future salts for message %d', message.msg_id)
|
||||||
state = self._pending_state.pop(message.msg_id, None)
|
state = self._pending_state.pop(message.msg_id, None)
|
||||||
if state:
|
if state:
|
||||||
state.future.set_result(message.obj)
|
state.future.set_result(message.obj)
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
import logging
|
|
||||||
import os
|
import os
|
||||||
import struct
|
import struct
|
||||||
import time
|
import time
|
||||||
|
@ -11,8 +10,6 @@ from ..tl.core import TLMessage
|
||||||
from ..tl.functions import InvokeAfterMsgRequest
|
from ..tl.functions import InvokeAfterMsgRequest
|
||||||
from ..tl.core.gzippacked import GzipPacked
|
from ..tl.core.gzippacked import GzipPacked
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class MTProtoState:
|
class MTProtoState:
|
||||||
"""
|
"""
|
||||||
|
@ -37,8 +34,9 @@ class MTProtoState:
|
||||||
many methods that would be needed to make it convenient to use for the
|
many methods that would be needed to make it convenient to use for the
|
||||||
authentication process, at which point the `MTProtoPlainSender` is better.
|
authentication process, at which point the `MTProtoPlainSender` is better.
|
||||||
"""
|
"""
|
||||||
def __init__(self, auth_key):
|
def __init__(self, auth_key, loggers):
|
||||||
self.auth_key = auth_key
|
self.auth_key = auth_key
|
||||||
|
self._log = loggers[__name__]
|
||||||
self.time_offset = 0
|
self.time_offset = 0
|
||||||
self.salt = 0
|
self.salt = 0
|
||||||
self.reset()
|
self.reset()
|
||||||
|
@ -183,7 +181,7 @@ class MTProtoState:
|
||||||
|
|
||||||
if self.time_offset != old:
|
if self.time_offset != old:
|
||||||
self._last_msg_id = 0
|
self._last_msg_id = 0
|
||||||
__log__.debug(
|
self._log.debug(
|
||||||
'Updated time offset (old offset %d, bad %d, good %d, new %d)',
|
'Updated time offset (old offset %d, bad %d, good %d, new %d)',
|
||||||
old, bad, correct_msg_id, self.time_offset
|
old, bad, correct_msg_id, self.time_offset
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,11 +1,6 @@
|
||||||
import logging
|
|
||||||
import struct
|
|
||||||
|
|
||||||
from .tlmessage import TLMessage
|
from .tlmessage import TLMessage
|
||||||
from ..tlobject import TLObject
|
from ..tlobject import TLObject
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class MessageContainer(TLObject):
|
class MessageContainer(TLObject):
|
||||||
CONSTRUCTOR_ID = 0x73f1f8dc
|
CONSTRUCTOR_ID = 0x73f1f8dc
|
||||||
|
|
|
@ -1,9 +1,5 @@
|
||||||
import logging
|
|
||||||
|
|
||||||
from .. import TLObject
|
from .. import TLObject
|
||||||
|
|
||||||
__log__ = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class TLMessage(TLObject):
|
class TLMessage(TLObject):
|
||||||
"""
|
"""
|
||||||
|
|
Loading…
Reference in New Issue
Block a user