mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-26 11:23:46 +03:00
Make a proper use of the logging module
This commit is contained in:
parent
7d189119f4
commit
5842d3741b
|
@ -21,7 +21,7 @@ from ..tl.types import (
|
||||||
)
|
)
|
||||||
from ..tl.functions.auth import LogOutRequest
|
from ..tl.functions.auth import LogOutRequest
|
||||||
|
|
||||||
logging.getLogger(__name__).addHandler(logging.NullHandler())
|
__log__ = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class MtProtoSender:
|
class MtProtoSender:
|
||||||
|
@ -46,7 +46,6 @@ class MtProtoSender:
|
||||||
"""
|
"""
|
||||||
self.session = session
|
self.session = session
|
||||||
self.connection = connection
|
self.connection = connection
|
||||||
self._logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Message IDs that need confirmation
|
# Message IDs that need confirmation
|
||||||
self._need_confirmation = set()
|
self._need_confirmation = set()
|
||||||
|
@ -137,6 +136,9 @@ class MtProtoSender:
|
||||||
# "This packet should be skipped"; since this may have
|
# "This packet should be skipped"; since this may have
|
||||||
# been a result for a request, invalidate every request
|
# been a result for a request, invalidate every request
|
||||||
# and just re-invoke them to avoid problems
|
# and just re-invoke them to avoid problems
|
||||||
|
__log__.exception('Error while receiving server response. '
|
||||||
|
'%d pending request(s) will be ignored',
|
||||||
|
len(self._pending_receive))
|
||||||
self._clear_all_pending()
|
self._clear_all_pending()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -218,7 +220,7 @@ class MtProtoSender:
|
||||||
code = reader.read_int(signed=False)
|
code = reader.read_int(signed=False)
|
||||||
reader.seek(-4)
|
reader.seek(-4)
|
||||||
|
|
||||||
# The following codes are "parsed manually"
|
__log__.debug('Processing server message with ID %s', hex(code))
|
||||||
if code == 0xf35c6d01: # rpc_result, (response of an RPC call)
|
if code == 0xf35c6d01: # rpc_result, (response of an RPC call)
|
||||||
return self._handle_rpc_result(msg_id, sequence, reader)
|
return self._handle_rpc_result(msg_id, sequence, reader)
|
||||||
|
|
||||||
|
@ -257,7 +259,6 @@ class MtProtoSender:
|
||||||
if r:
|
if r:
|
||||||
r.result = True # Telegram won't send this value
|
r.result = True # Telegram won't send this value
|
||||||
r.confirm_received.set()
|
r.confirm_received.set()
|
||||||
self._logger.debug('Message ack confirmed', r)
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -270,11 +271,9 @@ class MtProtoSender:
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
self._logger.debug(
|
__log__.warning(
|
||||||
'[WARN] Unknown message: {}, data left in the buffer: {}'
|
'Unknown message with ID %d, data left in the buffer %s',
|
||||||
.format(
|
hex(code), repr(reader.get_bytes()[reader.tell_position():])
|
||||||
hex(code), repr(reader.get_bytes()[reader.tell_position():])
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -351,13 +350,11 @@ class MtProtoSender:
|
||||||
:param reader: the reader containing the Pong.
|
:param reader: the reader containing the Pong.
|
||||||
:return: true, as it always succeeds.
|
:return: true, as it always succeeds.
|
||||||
"""
|
"""
|
||||||
self._logger.debug('Handling pong')
|
|
||||||
pong = reader.tgread_object()
|
pong = reader.tgread_object()
|
||||||
assert isinstance(pong, Pong)
|
assert isinstance(pong, Pong)
|
||||||
|
|
||||||
request = self._pop_request(pong.msg_id)
|
request = self._pop_request(pong.msg_id)
|
||||||
if request:
|
if request:
|
||||||
self._logger.debug('Pong confirmed a request')
|
|
||||||
request.result = pong
|
request.result = pong
|
||||||
request.confirm_received.set()
|
request.confirm_received.set()
|
||||||
|
|
||||||
|
@ -372,7 +369,6 @@ class MtProtoSender:
|
||||||
:param reader: the reader containing the MessageContainer.
|
:param reader: the reader containing the MessageContainer.
|
||||||
:return: true, as it always succeeds.
|
:return: true, as it always succeeds.
|
||||||
"""
|
"""
|
||||||
self._logger.debug('Handling container')
|
|
||||||
for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader):
|
for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader):
|
||||||
begin_position = reader.tell_position()
|
begin_position = reader.tell_position()
|
||||||
|
|
||||||
|
@ -397,7 +393,6 @@ class MtProtoSender:
|
||||||
:param reader: the reader containing the BadServerSalt.
|
:param reader: the reader containing the BadServerSalt.
|
||||||
:return: true, as it always succeeds.
|
:return: true, as it always succeeds.
|
||||||
"""
|
"""
|
||||||
self._logger.debug('Handling bad server salt')
|
|
||||||
bad_salt = reader.tgread_object()
|
bad_salt = reader.tgread_object()
|
||||||
assert isinstance(bad_salt, BadServerSalt)
|
assert isinstance(bad_salt, BadServerSalt)
|
||||||
|
|
||||||
|
@ -418,28 +413,29 @@ class MtProtoSender:
|
||||||
:param reader: the reader containing the BadMessageError.
|
:param reader: the reader containing the BadMessageError.
|
||||||
:return: true, as it always succeeds.
|
:return: true, as it always succeeds.
|
||||||
"""
|
"""
|
||||||
self._logger.debug('Handling bad message notification')
|
|
||||||
bad_msg = reader.tgread_object()
|
bad_msg = reader.tgread_object()
|
||||||
assert isinstance(bad_msg, BadMsgNotification)
|
assert isinstance(bad_msg, BadMsgNotification)
|
||||||
|
|
||||||
error = BadMessageError(bad_msg.error_code)
|
error = BadMessageError(bad_msg.error_code)
|
||||||
|
__log__.warning('Read bad msg notification %s: %s', bad_msg, error)
|
||||||
if bad_msg.error_code in (16, 17):
|
if bad_msg.error_code in (16, 17):
|
||||||
# sent msg_id too low or too high (respectively).
|
# sent msg_id too low or too high (respectively).
|
||||||
# Use the current msg_id to determine the right time offset.
|
# Use the current msg_id to determine the right time offset.
|
||||||
self.session.update_time_offset(correct_msg_id=msg_id)
|
self.session.update_time_offset(correct_msg_id=msg_id)
|
||||||
self._logger.debug('Read Bad Message error: ' + str(error))
|
__log__.info('Attempting to use the correct time offset')
|
||||||
self._logger.debug('Attempting to use the correct time offset.')
|
|
||||||
self._resend_request(bad_msg.bad_msg_id)
|
self._resend_request(bad_msg.bad_msg_id)
|
||||||
return True
|
return True
|
||||||
elif bad_msg.error_code == 32:
|
elif bad_msg.error_code == 32:
|
||||||
# msg_seqno too low, so just pump it up by some "large" amount
|
# msg_seqno too low, so just pump it up by some "large" amount
|
||||||
# TODO A better fix would be to start with a new fresh session ID
|
# TODO A better fix would be to start with a new fresh session ID
|
||||||
self.session._sequence += 64
|
self.session._sequence += 64
|
||||||
|
__log__.info('Attempting to set the right higher sequence')
|
||||||
self._resend_request(bad_msg.bad_msg_id)
|
self._resend_request(bad_msg.bad_msg_id)
|
||||||
return True
|
return True
|
||||||
elif bad_msg.error_code == 33:
|
elif bad_msg.error_code == 33:
|
||||||
# msg_seqno too high never seems to happen but just in case
|
# msg_seqno too high never seems to happen but just in case
|
||||||
self.session._sequence -= 16
|
self.session._sequence -= 16
|
||||||
|
__log__.info('Attempting to set the right lower sequence')
|
||||||
self._resend_request(bad_msg.bad_msg_id)
|
self._resend_request(bad_msg.bad_msg_id)
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
|
@ -504,7 +500,6 @@ class MtProtoSender:
|
||||||
:return: true if the request ID to which this result belongs is found,
|
:return: true if the request ID to which this result belongs is found,
|
||||||
false otherwise (meaning nothing was read).
|
false otherwise (meaning nothing was read).
|
||||||
"""
|
"""
|
||||||
self._logger.debug('Handling RPC result')
|
|
||||||
reader.read_int(signed=False) # code
|
reader.read_int(signed=False) # code
|
||||||
request_id = reader.read_long()
|
request_id = reader.read_long()
|
||||||
inner_code = reader.read_int(signed=False)
|
inner_code = reader.read_int(signed=False)
|
||||||
|
@ -530,11 +525,9 @@ class MtProtoSender:
|
||||||
request.confirm_received.set()
|
request.confirm_received.set()
|
||||||
# else TODO Where should this error be reported?
|
# else TODO Where should this error be reported?
|
||||||
# Read may be async. Can an error not-belong to a request?
|
# Read may be async. Can an error not-belong to a request?
|
||||||
self._logger.debug('Read RPC error: %s', str(error))
|
|
||||||
return True # All contents were read okay
|
return True # All contents were read okay
|
||||||
|
|
||||||
elif request:
|
elif request:
|
||||||
self._logger.debug('Reading request response')
|
|
||||||
if inner_code == 0x3072cfa1: # GZip packed
|
if inner_code == 0x3072cfa1: # GZip packed
|
||||||
unpacked_data = gzip.decompress(reader.tgread_bytes())
|
unpacked_data = gzip.decompress(reader.tgread_bytes())
|
||||||
with BinaryReader(unpacked_data) as compressed_reader:
|
with BinaryReader(unpacked_data) as compressed_reader:
|
||||||
|
@ -549,7 +542,7 @@ class MtProtoSender:
|
||||||
|
|
||||||
# If it's really a result for RPC from previous connection
|
# If it's really a result for RPC from previous connection
|
||||||
# session, it will be skipped by the handle_container()
|
# session, it will be skipped by the handle_container()
|
||||||
self._logger.debug('Lost request will be skipped.')
|
__log__.warning('Lost request will be skipped')
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _handle_gzip_packed(self, msg_id, sequence, reader, state):
|
def _handle_gzip_packed(self, msg_id, sequence, reader, state):
|
||||||
|
@ -561,7 +554,6 @@ class MtProtoSender:
|
||||||
:param reader: the reader containing the GzipPacked.
|
:param reader: the reader containing the GzipPacked.
|
||||||
:return: the result of processing the packed message.
|
:return: the result of processing the packed message.
|
||||||
"""
|
"""
|
||||||
self._logger.debug('Handling gzip packed data')
|
|
||||||
with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
|
with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
|
||||||
# We are reentering process_msg, which seemingly the same msg_id
|
# We are reentering process_msg, which seemingly the same msg_id
|
||||||
# to the self._need_confirmation set. Remove it from there first
|
# to the self._need_confirmation set. Remove it from there first
|
||||||
|
|
|
@ -43,6 +43,8 @@ DEFAULT_IPV4_IP = '149.154.167.51'
|
||||||
DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]'
|
DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]'
|
||||||
DEFAULT_PORT = 443
|
DEFAULT_PORT = 443
|
||||||
|
|
||||||
|
__log__ = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class TelegramBareClient:
|
class TelegramBareClient:
|
||||||
"""Bare Telegram Client with just the minimum -
|
"""Bare Telegram Client with just the minimum -
|
||||||
|
@ -117,8 +119,6 @@ class TelegramBareClient:
|
||||||
mode=connection_mode, proxy=proxy, timeout=timeout
|
mode=connection_mode, proxy=proxy, timeout=timeout
|
||||||
))
|
))
|
||||||
|
|
||||||
self._logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Two threads may be calling reconnect() when the connection is lost,
|
# Two threads may be calling reconnect() when the connection is lost,
|
||||||
# we only want one to actually perform the reconnection.
|
# we only want one to actually perform the reconnection.
|
||||||
self._reconnect_lock = Lock()
|
self._reconnect_lock = Lock()
|
||||||
|
@ -191,11 +191,15 @@ class TelegramBareClient:
|
||||||
native data center, raising a "UserMigrateError", and
|
native data center, raising a "UserMigrateError", and
|
||||||
calling .disconnect() in the process.
|
calling .disconnect() in the process.
|
||||||
"""
|
"""
|
||||||
|
__log__.info('Connecting to %s:%d...',
|
||||||
|
self.session.server_address, self.session.port)
|
||||||
|
|
||||||
self._main_thread_ident = threading.get_ident()
|
self._main_thread_ident = threading.get_ident()
|
||||||
self._background_error = None # Clear previous errors
|
self._background_error = None # Clear previous errors
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._sender.connect()
|
self._sender.connect()
|
||||||
|
__log__.info('Connection success!')
|
||||||
|
|
||||||
# Connection was successful! Try syncing the update state
|
# Connection was successful! Try syncing the update state
|
||||||
# UNLESS '_sync_updates' is False (we probably are in
|
# UNLESS '_sync_updates' is False (we probably are in
|
||||||
|
@ -215,14 +219,15 @@ class TelegramBareClient:
|
||||||
|
|
||||||
except TypeNotFoundError as e:
|
except TypeNotFoundError as e:
|
||||||
# This is fine, probably layer migration
|
# This is fine, probably layer migration
|
||||||
self._logger.debug('Found invalid item, probably migrating', e)
|
__log__.warning('Connection failed, got unexpected type with ID '
|
||||||
|
'%s. Migrating?', hex(e.invalid_constructor_id))
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
return self.connect(_sync_updates=_sync_updates)
|
return self.connect(_sync_updates=_sync_updates)
|
||||||
|
|
||||||
except (RPCError, ConnectionError):
|
except (RPCError, ConnectionError) as e:
|
||||||
# Probably errors from the previous session, ignore them
|
# Probably errors from the previous session, ignore them
|
||||||
|
__log__.error('Connection failed due to %s', e)
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
self._logger.exception('Could not stabilise initial connection.')
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def is_connected(self):
|
def is_connected(self):
|
||||||
|
@ -244,14 +249,19 @@ class TelegramBareClient:
|
||||||
def disconnect(self):
|
def disconnect(self):
|
||||||
"""Disconnects from the Telegram server
|
"""Disconnects from the Telegram server
|
||||||
and stops all the spawned threads"""
|
and stops all the spawned threads"""
|
||||||
|
__log__.info('Disconnecting...')
|
||||||
self._user_connected = False # This will stop recv_thread's loop
|
self._user_connected = False # This will stop recv_thread's loop
|
||||||
|
|
||||||
|
__log__.debug('Stopping all workers...')
|
||||||
self.updates.stop_workers()
|
self.updates.stop_workers()
|
||||||
|
|
||||||
# This will trigger a "ConnectionResetError" on the recv_thread,
|
# This will trigger a "ConnectionResetError" on the recv_thread,
|
||||||
# which won't attempt reconnecting as ._user_connected is False.
|
# which won't attempt reconnecting as ._user_connected is False.
|
||||||
|
__log__.debug('Disconnecting the socket...')
|
||||||
self._sender.disconnect()
|
self._sender.disconnect()
|
||||||
|
|
||||||
if self._recv_thread:
|
if self._recv_thread:
|
||||||
|
__log__.debug('Joining the read thread...')
|
||||||
self._recv_thread.join()
|
self._recv_thread.join()
|
||||||
|
|
||||||
# TODO Shall we clear the _exported_sessions, or may be reused?
|
# TODO Shall we clear the _exported_sessions, or may be reused?
|
||||||
|
@ -268,17 +278,21 @@ class TelegramBareClient:
|
||||||
"""
|
"""
|
||||||
if new_dc is None:
|
if new_dc is None:
|
||||||
if self.is_connected():
|
if self.is_connected():
|
||||||
|
__log__.info('Reconnection aborted: already connected')
|
||||||
return True
|
return True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
__log__.info('Attempting reconnection...')
|
||||||
return self.connect()
|
return self.connect()
|
||||||
except ConnectionResetError:
|
except ConnectionResetError as e:
|
||||||
|
__log__.warning('Reconnection failed due to %s', e)
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
# Since we're reconnecting possibly due to a UserMigrateError,
|
# Since we're reconnecting possibly due to a UserMigrateError,
|
||||||
# we need to first know the Data Centers we can connect to. Do
|
# we need to first know the Data Centers we can connect to. Do
|
||||||
# that before disconnecting.
|
# that before disconnecting.
|
||||||
dc = self._get_dc(new_dc)
|
dc = self._get_dc(new_dc)
|
||||||
|
__log__.info('Reconnecting to new data center %s', dc)
|
||||||
|
|
||||||
self.session.server_address = dc.ip_address
|
self.session.server_address = dc.ip_address
|
||||||
self.session.port = dc.port
|
self.session.port = dc.port
|
||||||
|
@ -340,6 +354,7 @@ class TelegramBareClient:
|
||||||
dc = self._get_dc(dc_id)
|
dc = self._get_dc(dc_id)
|
||||||
|
|
||||||
# Export the current authorization to the new DC.
|
# Export the current authorization to the new DC.
|
||||||
|
__log__.info('Exporting authorization for data center %s', dc)
|
||||||
export_auth = self(ExportAuthorizationRequest(dc_id))
|
export_auth = self(ExportAuthorizationRequest(dc_id))
|
||||||
|
|
||||||
# Create a temporary session for this IP address, which needs
|
# Create a temporary session for this IP address, which needs
|
||||||
|
@ -352,6 +367,7 @@ class TelegramBareClient:
|
||||||
session.port = dc.port
|
session.port = dc.port
|
||||||
self._exported_sessions[dc_id] = session
|
self._exported_sessions[dc_id] = session
|
||||||
|
|
||||||
|
__log__.info('Creating exported new client')
|
||||||
client = TelegramBareClient(
|
client = TelegramBareClient(
|
||||||
session, self.api_id, self.api_hash,
|
session, self.api_id, self.api_hash,
|
||||||
proxy=self._sender.connection.conn.proxy,
|
proxy=self._sender.connection.conn.proxy,
|
||||||
|
@ -363,7 +379,7 @@ class TelegramBareClient:
|
||||||
id=export_auth.id, bytes=export_auth.bytes
|
id=export_auth.id, bytes=export_auth.bytes
|
||||||
))
|
))
|
||||||
elif export_auth is not None:
|
elif export_auth is not None:
|
||||||
self._logger.warning('Unknown return export_auth type', export_auth)
|
__log__.warning('Unknown export auth type %s', export_auth)
|
||||||
|
|
||||||
client._authorized = True # We exported the auth, so we got auth
|
client._authorized = True # We exported the auth, so we got auth
|
||||||
return client
|
return client
|
||||||
|
@ -378,6 +394,7 @@ class TelegramBareClient:
|
||||||
session.port = dc.port
|
session.port = dc.port
|
||||||
self._exported_sessions[cdn_redirect.dc_id] = session
|
self._exported_sessions[cdn_redirect.dc_id] = session
|
||||||
|
|
||||||
|
__log__.info('Creating new CDN client')
|
||||||
client = TelegramBareClient(
|
client = TelegramBareClient(
|
||||||
session, self.api_id, self.api_hash,
|
session, self.api_id, self.api_hash,
|
||||||
proxy=self._sender.connection.conn.proxy,
|
proxy=self._sender.connection.conn.proxy,
|
||||||
|
@ -407,12 +424,23 @@ class TelegramBareClient:
|
||||||
x.content_related for x in requests):
|
x.content_related for x in requests):
|
||||||
raise ValueError('You can only invoke requests, not types!')
|
raise ValueError('You can only invoke requests, not types!')
|
||||||
|
|
||||||
|
# For logging purposes
|
||||||
|
if len(requests) == 1:
|
||||||
|
which = type(requests[0]).__name__
|
||||||
|
else:
|
||||||
|
which = '{} requests ({})'.format(
|
||||||
|
len(requests), [type(x).__name__ for x in requests])
|
||||||
|
|
||||||
# Determine the sender to be used (main or a new connection)
|
# Determine the sender to be used (main or a new connection)
|
||||||
on_main_thread = threading.get_ident() == self._main_thread_ident
|
on_main_thread = threading.get_ident() == self._main_thread_ident
|
||||||
if on_main_thread or self._on_read_thread():
|
if on_main_thread or self._on_read_thread():
|
||||||
|
__log__.debug('Invoking %s from main thread', which)
|
||||||
sender = self._sender
|
sender = self._sender
|
||||||
update_state = self.updates
|
update_state = self.updates
|
||||||
else:
|
else:
|
||||||
|
__log__.debug('Invoking %s from background thread. '
|
||||||
|
'Creating temporary connection', which)
|
||||||
|
|
||||||
sender = self._sender.clone()
|
sender = self._sender.clone()
|
||||||
sender.connect()
|
sender.connect()
|
||||||
# We're on another connection, Telegram will resend all the
|
# We're on another connection, Telegram will resend all the
|
||||||
|
@ -431,7 +459,7 @@ class TelegramBareClient:
|
||||||
call_receive = not on_main_thread or self._recv_thread is None \
|
call_receive = not on_main_thread or self._recv_thread is None \
|
||||||
or self._reconnect_lock.locked()
|
or self._reconnect_lock.locked()
|
||||||
try:
|
try:
|
||||||
for _ in range(retries):
|
for attempt in range(retries):
|
||||||
if self._background_error and on_main_thread:
|
if self._background_error and on_main_thread:
|
||||||
raise self._background_error
|
raise self._background_error
|
||||||
|
|
||||||
|
@ -441,7 +469,9 @@ class TelegramBareClient:
|
||||||
if result is not None:
|
if result is not None:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
self._logger.debug('RPC failed. Attempting reconnection.')
|
__log__.warning('Invoking %s failed %d times, '
|
||||||
|
'reconnecting and retrying',
|
||||||
|
[str(x) for x in requests], attempt + 1)
|
||||||
sleep(1)
|
sleep(1)
|
||||||
# The ReadThread has priority when attempting reconnection,
|
# The ReadThread has priority when attempting reconnection,
|
||||||
# since this thread is constantly running while __call__ is
|
# since this thread is constantly running while __call__ is
|
||||||
|
@ -475,11 +505,13 @@ class TelegramBareClient:
|
||||||
if not self.session.auth_key:
|
if not self.session.auth_key:
|
||||||
# New key, we need to tell the server we're going to use
|
# New key, we need to tell the server we're going to use
|
||||||
# the latest layer and initialize the connection doing so.
|
# the latest layer and initialize the connection doing so.
|
||||||
|
__log__.info('Need to generate new auth key before invoking')
|
||||||
self.session.auth_key, self.session.time_offset = \
|
self.session.auth_key, self.session.time_offset = \
|
||||||
authenticator.do_authentication(self._sender.connection)
|
authenticator.do_authentication(self._sender.connection)
|
||||||
init_connection = True
|
init_connection = True
|
||||||
|
|
||||||
if init_connection:
|
if init_connection:
|
||||||
|
__log__.info('Initializing a new connection while invoking')
|
||||||
if len(requests) == 1:
|
if len(requests) == 1:
|
||||||
requests = [self._wrap_init_connection(requests[0])]
|
requests = [self._wrap_init_connection(requests[0])]
|
||||||
else:
|
else:
|
||||||
|
@ -506,13 +538,14 @@ class TelegramBareClient:
|
||||||
sender.receive(update_state=update_state)
|
sender.receive(update_state=update_state)
|
||||||
|
|
||||||
except BrokenAuthKeyError:
|
except BrokenAuthKeyError:
|
||||||
self._logger.error('Broken auth key, a new one will be generated')
|
__log__.error('Authorization key seems broken and was invalid!')
|
||||||
self.session.auth_key = None
|
self.session.auth_key = None
|
||||||
|
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
pass # We will just retry
|
__log__.warning('Invoking timed out') # We will just retry
|
||||||
|
|
||||||
except ConnectionResetError:
|
except ConnectionResetError:
|
||||||
|
__log__.warning('Connection was reset while invoking')
|
||||||
if self._user_connected:
|
if self._user_connected:
|
||||||
# Server disconnected us, __call__ will try reconnecting.
|
# Server disconnected us, __call__ will try reconnecting.
|
||||||
return None
|
return None
|
||||||
|
@ -541,10 +574,6 @@ class TelegramBareClient:
|
||||||
|
|
||||||
except (PhoneMigrateError, NetworkMigrateError,
|
except (PhoneMigrateError, NetworkMigrateError,
|
||||||
UserMigrateError) as e:
|
UserMigrateError) as e:
|
||||||
self._logger.debug(
|
|
||||||
'DC error when invoking request, '
|
|
||||||
'attempting to reconnect at DC {}'.format(e.new_dc)
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO What happens with the background thread here?
|
# TODO What happens with the background thread here?
|
||||||
# For normal use cases, this won't happen, because this will only
|
# For normal use cases, this won't happen, because this will only
|
||||||
|
@ -555,17 +584,13 @@ class TelegramBareClient:
|
||||||
|
|
||||||
except ServerError as e:
|
except ServerError as e:
|
||||||
# Telegram is having some issues, just retry
|
# Telegram is having some issues, just retry
|
||||||
self._logger.debug(
|
__log__.error('Telegram servers are having internal errors %s', e)
|
||||||
'[ERROR] Telegram is having some internal issues', e
|
|
||||||
)
|
|
||||||
|
|
||||||
except FloodWaitError as e:
|
except FloodWaitError as e:
|
||||||
|
__log__.warning('Request invoked too often, wait %ds', e.seconds)
|
||||||
if e.seconds > self.session.flood_sleep_threshold | 0:
|
if e.seconds > self.session.flood_sleep_threshold | 0:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
self._logger.debug(
|
|
||||||
'Sleep of %d seconds below threshold, sleeping' % e.seconds
|
|
||||||
)
|
|
||||||
sleep(e.seconds)
|
sleep(e.seconds)
|
||||||
|
|
||||||
# Some really basic functionality
|
# Some really basic functionality
|
||||||
|
@ -628,6 +653,8 @@ class TelegramBareClient:
|
||||||
file_id = utils.generate_random_long()
|
file_id = utils.generate_random_long()
|
||||||
hash_md5 = md5()
|
hash_md5 = md5()
|
||||||
|
|
||||||
|
__log__.info('Uploading file of %d bytes in %d chunks of %d',
|
||||||
|
file_size, part_count, part_size)
|
||||||
stream = open(file, 'rb') if isinstance(file, str) else BytesIO(file)
|
stream = open(file, 'rb') if isinstance(file, str) else BytesIO(file)
|
||||||
try:
|
try:
|
||||||
for part_index in range(part_count):
|
for part_index in range(part_count):
|
||||||
|
@ -644,6 +671,7 @@ class TelegramBareClient:
|
||||||
|
|
||||||
result = self(request)
|
result = self(request)
|
||||||
if result:
|
if result:
|
||||||
|
__log__.debug('Uploaded %d/%d', part_index, part_count)
|
||||||
if not is_large:
|
if not is_large:
|
||||||
# No need to update the hash if it's a large file
|
# No need to update the hash if it's a large file
|
||||||
hash_md5.update(part)
|
hash_md5.update(part)
|
||||||
|
@ -712,6 +740,7 @@ class TelegramBareClient:
|
||||||
client = self
|
client = self
|
||||||
cdn_decrypter = None
|
cdn_decrypter = None
|
||||||
|
|
||||||
|
__log__.info('Downloading file in chunks of %d bytes', part_size)
|
||||||
try:
|
try:
|
||||||
offset = 0
|
offset = 0
|
||||||
while True:
|
while True:
|
||||||
|
@ -724,12 +753,14 @@ class TelegramBareClient:
|
||||||
))
|
))
|
||||||
|
|
||||||
if isinstance(result, FileCdnRedirect):
|
if isinstance(result, FileCdnRedirect):
|
||||||
|
__log__.info('File lives in a CDN')
|
||||||
cdn_decrypter, result = \
|
cdn_decrypter, result = \
|
||||||
CdnDecrypter.prepare_decrypter(
|
CdnDecrypter.prepare_decrypter(
|
||||||
client, self._get_cdn_client(result), result
|
client, self._get_cdn_client(result), result
|
||||||
)
|
)
|
||||||
|
|
||||||
except FileMigrateError as e:
|
except FileMigrateError as e:
|
||||||
|
__log__.info('File lives in another DC')
|
||||||
client = self._get_exported_client(e.new_dc)
|
client = self._get_exported_client(e.new_dc)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -742,6 +773,7 @@ class TelegramBareClient:
|
||||||
return getattr(result, 'type', '')
|
return getattr(result, 'type', '')
|
||||||
|
|
||||||
f.write(result.bytes)
|
f.write(result.bytes)
|
||||||
|
__log__.debug('Saved %d more bytes', len(result.bytes))
|
||||||
if progress_callback:
|
if progress_callback:
|
||||||
progress_callback(f.tell(), file_size)
|
progress_callback(f.tell(), file_size)
|
||||||
finally:
|
finally:
|
||||||
|
@ -803,7 +835,6 @@ class TelegramBareClient:
|
||||||
if self._user_connected:
|
if self._user_connected:
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
else:
|
else:
|
||||||
self._logger.debug('Forcing exit...')
|
|
||||||
os._exit(1)
|
os._exit(1)
|
||||||
|
|
||||||
def idle(self, stop_signals=(SIGINT, SIGTERM, SIGABRT)):
|
def idle(self, stop_signals=(SIGINT, SIGTERM, SIGABRT)):
|
||||||
|
@ -824,6 +855,11 @@ class TelegramBareClient:
|
||||||
for sig in stop_signals:
|
for sig in stop_signals:
|
||||||
signal(sig, self._signal_handler)
|
signal(sig, self._signal_handler)
|
||||||
|
|
||||||
|
if self._on_read_thread():
|
||||||
|
__log__.info('Starting to wait for items from the network')
|
||||||
|
else:
|
||||||
|
__log__.info('Idling to receive items from the network')
|
||||||
|
|
||||||
while self._user_connected:
|
while self._user_connected:
|
||||||
try:
|
try:
|
||||||
if datetime.now() > self._last_ping + self._ping_delay:
|
if datetime.now() > self._last_ping + self._ping_delay:
|
||||||
|
@ -832,16 +868,21 @@ class TelegramBareClient:
|
||||||
))
|
))
|
||||||
self._last_ping = datetime.now()
|
self._last_ping = datetime.now()
|
||||||
|
|
||||||
|
__log__.debug('Receiving items from the network...')
|
||||||
self._sender.receive(update_state=self.updates)
|
self._sender.receive(update_state=self.updates)
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
# No problem.
|
# No problem
|
||||||
pass
|
__log__.info('Receiving items from the network timed out')
|
||||||
except ConnectionResetError:
|
except ConnectionResetError:
|
||||||
self._logger.debug('Server disconnected us. Reconnecting...')
|
if self._user_connected:
|
||||||
|
__log__.error('Connection was reset while receiving '
|
||||||
|
'items. Reconnecting')
|
||||||
with self._reconnect_lock:
|
with self._reconnect_lock:
|
||||||
while self._user_connected and not self._reconnect():
|
while self._user_connected and not self._reconnect():
|
||||||
sleep(0.1) # Retry forever, this is instant messaging
|
sleep(0.1) # Retry forever, this is instant messaging
|
||||||
|
|
||||||
|
__log__.info('Connection closed by the user, not reading anymore')
|
||||||
|
|
||||||
# By using this approach, another thread will be
|
# By using this approach, another thread will be
|
||||||
# created and started upon connection to constantly read
|
# created and started upon connection to constantly read
|
||||||
# from the other end. Otherwise, manual calls to .receive()
|
# from the other end. Otherwise, manual calls to .receive()
|
||||||
|
@ -857,10 +898,9 @@ class TelegramBareClient:
|
||||||
try:
|
try:
|
||||||
self.idle(stop_signals=tuple())
|
self.idle(stop_signals=tuple())
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
|
__log__.exception('Unknown exception in the read thread! '
|
||||||
|
'Disconnecting and leaving it to main thread')
|
||||||
# Unknown exception, pass it to the main thread
|
# Unknown exception, pass it to the main thread
|
||||||
self._logger.exception(
|
|
||||||
'Unknown error on the read thread, please report'
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import socks
|
import socks
|
||||||
|
|
|
@ -7,6 +7,8 @@ from threading import RLock, Thread
|
||||||
|
|
||||||
from .tl import types as tl
|
from .tl import types as tl
|
||||||
|
|
||||||
|
__log__ = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class UpdateState:
|
class UpdateState:
|
||||||
"""Used to hold the current state of processed updates.
|
"""Used to hold the current state of processed updates.
|
||||||
|
@ -30,8 +32,6 @@ class UpdateState:
|
||||||
self._updates = Queue()
|
self._updates = Queue()
|
||||||
self._latest_updates = deque(maxlen=10)
|
self._latest_updates = deque(maxlen=10)
|
||||||
|
|
||||||
self._logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# https://core.telegram.org/api/updates
|
# https://core.telegram.org/api/updates
|
||||||
self._state = tl.updates.State(0, 0, datetime.now(), 0, 0)
|
self._state = tl.updates.State(0, 0, datetime.now(), 0, 0)
|
||||||
|
|
||||||
|
@ -115,9 +115,7 @@ class UpdateState:
|
||||||
break
|
break
|
||||||
except:
|
except:
|
||||||
# We don't want to crash a worker thread due to any reason
|
# We don't want to crash a worker thread due to any reason
|
||||||
self._logger.exception(
|
__log__.exception('Unhandled exception on worker %d', wid)
|
||||||
'[ERROR] Unhandled exception on worker {}'.format(wid)
|
|
||||||
)
|
|
||||||
|
|
||||||
def process(self, update):
|
def process(self, update):
|
||||||
"""Processes an update object. This method is normally called by
|
"""Processes an update object. This method is normally called by
|
||||||
|
@ -128,11 +126,13 @@ class UpdateState:
|
||||||
|
|
||||||
with self._updates_lock:
|
with self._updates_lock:
|
||||||
if isinstance(update, tl.updates.State):
|
if isinstance(update, tl.updates.State):
|
||||||
|
__log__.debug('Saved new updates state')
|
||||||
self._state = update
|
self._state = update
|
||||||
return # Nothing else to be done
|
return # Nothing else to be done
|
||||||
|
|
||||||
pts = getattr(update, 'pts', self._state.pts)
|
pts = getattr(update, 'pts', self._state.pts)
|
||||||
if hasattr(update, 'pts') and pts <= self._state.pts:
|
if hasattr(update, 'pts') and pts <= self._state.pts:
|
||||||
|
__log__.info('Ignoring %s, already have it', update)
|
||||||
return # We already handled this update
|
return # We already handled this update
|
||||||
|
|
||||||
self._state.pts = pts
|
self._state.pts = pts
|
||||||
|
@ -153,6 +153,7 @@ class UpdateState:
|
||||||
"""
|
"""
|
||||||
data = pickle.dumps(update.to_dict())
|
data = pickle.dumps(update.to_dict())
|
||||||
if data in self._latest_updates:
|
if data in self._latest_updates:
|
||||||
|
__log__.info('Ignoring %s, already have it', update)
|
||||||
return # Duplicated too
|
return # Duplicated too
|
||||||
|
|
||||||
self._latest_updates.append(data)
|
self._latest_updates.append(data)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user