mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2024-11-29 21:03:45 +03:00
Make heavy use of logging
This commit is contained in:
parent
e36517845a
commit
6766c4eea9
|
@ -102,6 +102,7 @@ class MTProtoSender:
|
||||||
not exist yet.
|
not exist yet.
|
||||||
"""
|
"""
|
||||||
if self._user_connected:
|
if self._user_connected:
|
||||||
|
__log__.info('User is already connected!')
|
||||||
return
|
return
|
||||||
|
|
||||||
self._ip = ip
|
self._ip = ip
|
||||||
|
@ -110,42 +111,59 @@ class MTProtoSender:
|
||||||
await self._connect()
|
await self._connect()
|
||||||
|
|
||||||
async def _connect(self):
|
async def _connect(self):
|
||||||
|
__log__.info('Connecting to {}:{}...'.format(self._ip, self._port))
|
||||||
_last_error = ConnectionError()
|
_last_error = ConnectionError()
|
||||||
for _ in range(self._retries):
|
for retry in range(1, self._retries + 1):
|
||||||
try:
|
try:
|
||||||
|
__log__.debug('Connection attempt {}...'.format(retry))
|
||||||
async with self._send_lock:
|
async with self._send_lock:
|
||||||
await self._connection.connect(self._ip, self._port)
|
await self._connection.connect(self._ip, self._port)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
_last_error = e
|
_last_error = e
|
||||||
|
__log__.warning('Attempt {} at connecting failed: {}'
|
||||||
|
.format(retry, e))
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
raise _last_error
|
raise _last_error
|
||||||
|
|
||||||
|
__log__.debug('Connection success!')
|
||||||
if self.session.auth_key is None:
|
if self.session.auth_key is None:
|
||||||
_last_error = SecurityError()
|
_last_error = SecurityError()
|
||||||
plain = MTProtoPlainSender(self._connection)
|
plain = MTProtoPlainSender(self._connection)
|
||||||
for _ in range(self._retries):
|
for retry in range(1, self._retries + 1):
|
||||||
try:
|
try:
|
||||||
|
__log__.debug('New auth_key attempt {}...'.format(retry))
|
||||||
self.session.auth_key, self.session.time_offset =\
|
self.session.auth_key, self.session.time_offset =\
|
||||||
await authenticator.do_authentication(plain)
|
await authenticator.do_authentication(plain)
|
||||||
except (SecurityError, AssertionError) as e:
|
except (SecurityError, AssertionError) as e:
|
||||||
_last_error = e
|
_last_error = e
|
||||||
|
__log__.warning('Attempt {} at new auth_key failed: {}'
|
||||||
|
.format(retry, e))
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
raise _last_error
|
raise _last_error
|
||||||
|
|
||||||
|
__log__.debug('Starting send loop')
|
||||||
self._send_loop_handle = asyncio.ensure_future(self._send_loop())
|
self._send_loop_handle = asyncio.ensure_future(self._send_loop())
|
||||||
|
__log__.debug('Starting receive loop')
|
||||||
self._recv_loop_handle = asyncio.ensure_future(self._recv_loop())
|
self._recv_loop_handle = asyncio.ensure_future(self._recv_loop())
|
||||||
|
__log__.info('Connection to {} complete!'.format(self._ip))
|
||||||
|
|
||||||
async def _reconnect(self):
|
async def _reconnect(self):
|
||||||
"""
|
"""
|
||||||
Cleanly disconnects and then reconnects.
|
Cleanly disconnects and then reconnects.
|
||||||
"""
|
"""
|
||||||
self._reconnecting = True
|
self._reconnecting = True
|
||||||
|
|
||||||
|
__log__.debug('Awaiting for the send loop before reconnecting...')
|
||||||
await self._send_loop_handle
|
await self._send_loop_handle
|
||||||
|
|
||||||
|
__log__.debug('Awaiting for the receive loop before reconnecting...')
|
||||||
await self._recv_loop_handle
|
await self._recv_loop_handle
|
||||||
|
|
||||||
|
__log__.debug('Closing current connection...')
|
||||||
async with self._send_lock:
|
async with self._send_lock:
|
||||||
await self._connection.close()
|
await self._connection.close()
|
||||||
|
|
||||||
|
@ -158,21 +176,32 @@ class MTProtoSender:
|
||||||
all pending requests, and closes the send and receive loops.
|
all pending requests, and closes the send and receive loops.
|
||||||
"""
|
"""
|
||||||
if not self._user_connected:
|
if not self._user_connected:
|
||||||
|
__log__.info('User is already disconnected!')
|
||||||
return
|
return
|
||||||
|
|
||||||
|
__log__.info('Disconnecting from {}...'.format(self._ip))
|
||||||
self._user_connected = False
|
self._user_connected = False
|
||||||
try:
|
try:
|
||||||
|
__log__.debug('Closing current connection...')
|
||||||
async with self._send_lock:
|
async with self._send_lock:
|
||||||
await self._connection.close()
|
await self._connection.close()
|
||||||
finally:
|
finally:
|
||||||
|
__log__.debug('Cancelling {} pending message(s)...'
|
||||||
|
.format(len(self._pending_messages)))
|
||||||
for message in self._pending_messages.values():
|
for message in self._pending_messages.values():
|
||||||
message.future.cancel()
|
message.future.cancel()
|
||||||
|
|
||||||
self._pending_messages.clear()
|
self._pending_messages.clear()
|
||||||
self._pending_ack.clear()
|
self._pending_ack.clear()
|
||||||
|
|
||||||
|
__log__.debug('Cancelling the send loop...')
|
||||||
self._send_loop_handle.cancel()
|
self._send_loop_handle.cancel()
|
||||||
|
|
||||||
|
__log__.debug('Cancelling the receive loop...')
|
||||||
self._recv_loop_handle.cancel()
|
self._recv_loop_handle.cancel()
|
||||||
|
|
||||||
|
__log__.info('Disconnection from {} complete!'.format(self._ip))
|
||||||
|
|
||||||
async def send(self, request, ordered=False):
|
async def send(self, request, ordered=False):
|
||||||
"""
|
"""
|
||||||
This method enqueues the given request to be sent.
|
This method enqueues the given request to be sent.
|
||||||
|
@ -245,11 +274,14 @@ class MTProtoSender:
|
||||||
message = messages
|
message = messages
|
||||||
messages = [message]
|
messages = [message]
|
||||||
|
|
||||||
|
__log__.debug('Packing {} outgoing message(s)...'
|
||||||
|
.format(len(messages)))
|
||||||
body = helpers.pack_message(self.session, message)
|
body = helpers.pack_message(self.session, message)
|
||||||
|
|
||||||
while not any(m.future.cancelled() for m in messages):
|
while not any(m.future.cancelled() for m in messages):
|
||||||
try:
|
try:
|
||||||
async with self._send_lock:
|
async with self._send_lock:
|
||||||
|
__log__.debug('Sending {} bytes...', len(body))
|
||||||
await self._connection.send(body)
|
await self._connection.send(body)
|
||||||
break
|
break
|
||||||
# TODO Are there more exceptions besides timeout?
|
# TODO Are there more exceptions besides timeout?
|
||||||
|
@ -257,6 +289,7 @@ class MTProtoSender:
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
# Remove the cancelled messages from pending
|
# Remove the cancelled messages from pending
|
||||||
|
__log__.info('Some futures were cancelled, aborted send')
|
||||||
self._clean_containers([m.msg_id for m in messages])
|
self._clean_containers([m.msg_id for m in messages])
|
||||||
for m in messages:
|
for m in messages:
|
||||||
if m.future.cancelled():
|
if m.future.cancelled():
|
||||||
|
@ -264,6 +297,8 @@ class MTProtoSender:
|
||||||
else:
|
else:
|
||||||
await self._send_queue.put(m)
|
await self._send_queue.put(m)
|
||||||
|
|
||||||
|
__log__.debug('Outgoing messages sent!')
|
||||||
|
|
||||||
async def _recv_loop(self):
|
async def _recv_loop(self):
|
||||||
"""
|
"""
|
||||||
This loop is responsible for reading all incoming responses
|
This loop is responsible for reading all incoming responses
|
||||||
|
@ -277,19 +312,23 @@ class MTProtoSender:
|
||||||
# timeouts, and once the network was back it continued
|
# timeouts, and once the network was back it continued
|
||||||
# on its own after a short delay.
|
# on its own after a short delay.
|
||||||
try:
|
try:
|
||||||
|
__log__.debug('Receiving items from the network...')
|
||||||
async with self._recv_lock:
|
async with self._recv_lock:
|
||||||
body = await self._connection.recv()
|
body = await self._connection.recv()
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
|
# TODO If nothing is received for a minute, send a request
|
||||||
continue
|
continue
|
||||||
except ConnectionError:
|
except ConnectionError as e:
|
||||||
|
__log__.info('Connection reset while receiving: {}'.format(e))
|
||||||
asyncio.ensure_future(self._reconnect())
|
asyncio.ensure_future(self._reconnect())
|
||||||
break
|
break
|
||||||
|
|
||||||
# TODO Check salt, session_id and sequence_number
|
# TODO Check salt, session_id and sequence_number
|
||||||
|
__log__.debug('Decoding packet of {} bytes...'.format(len(body)))
|
||||||
try:
|
try:
|
||||||
message, remote_msg_id, remote_seq =\
|
message, remote_msg_id, remote_seq =\
|
||||||
helpers.unpack_message(self.session, body)
|
helpers.unpack_message(self.session, body)
|
||||||
except (BrokenAuthKeyError, BufferError):
|
except (BrokenAuthKeyError, BufferError) as e:
|
||||||
# The authorization key may be broken if a message was
|
# The authorization key may be broken if a message was
|
||||||
# sent malformed, or if the authkey truly is corrupted.
|
# sent malformed, or if the authkey truly is corrupted.
|
||||||
#
|
#
|
||||||
|
@ -299,18 +338,24 @@ class MTProtoSender:
|
||||||
#
|
#
|
||||||
# TODO Is it possible to detect malformed messages vs
|
# TODO Is it possible to detect malformed messages vs
|
||||||
# an actually broken authkey?
|
# an actually broken authkey?
|
||||||
|
__log__.warning('Broken authorization key?: {}'.format(e))
|
||||||
self.session.auth_key = None
|
self.session.auth_key = None
|
||||||
asyncio.ensure_future(self._reconnect())
|
asyncio.ensure_future(self._reconnect())
|
||||||
break
|
break
|
||||||
except SecurityError:
|
except SecurityError as e:
|
||||||
# A step while decoding had the incorrect data. This message
|
# A step while decoding had the incorrect data. This message
|
||||||
# should not be considered safe and it should be ignored.
|
# should not be considered safe and it should be ignored.
|
||||||
# TODO Maybe we should check if the message was decoded OK
|
__log__.warning('Security error while unpacking a '
|
||||||
|
'received message:'.format(e))
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
with BinaryReader(message) as reader:
|
try:
|
||||||
await self._process_message(
|
with BinaryReader(message) as reader:
|
||||||
remote_msg_id, remote_seq, reader)
|
await self._process_message(
|
||||||
|
remote_msg_id, remote_seq, reader)
|
||||||
|
except TypeNotFoundError as e:
|
||||||
|
__log__.warning('Could not decode received message: {}, '
|
||||||
|
'raw bytes: {!r}'.format(e, message))
|
||||||
|
|
||||||
# Response Handlers
|
# Response Handlers
|
||||||
|
|
||||||
|
@ -340,6 +385,7 @@ class MTProtoSender:
|
||||||
inner_code = reader.read_int(signed=False)
|
inner_code = reader.read_int(signed=False)
|
||||||
reader.seek(-4)
|
reader.seek(-4)
|
||||||
|
|
||||||
|
__log__.debug('Handling RPC result for message {}'.format(message_id))
|
||||||
message = self._pending_messages.pop(message_id, None)
|
message = self._pending_messages.pop(message_id, None)
|
||||||
if inner_code == 0x2144ca19: # RPC Error
|
if inner_code == 0x2144ca19: # RPC Error
|
||||||
reader.seek(4)
|
reader.seek(4)
|
||||||
|
@ -372,14 +418,8 @@ class MTProtoSender:
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
# TODO We should not get responses to things we never sent
|
# TODO We should not get responses to things we never sent
|
||||||
try:
|
__log__.info('Received response without parent request: {}'
|
||||||
if inner_code == GzipPacked.CONSTRUCTOR_ID:
|
.format(reader.tgread_object()))
|
||||||
with BinaryReader(GzipPacked.read(reader)) as creader:
|
|
||||||
obj = creader.tgread_object()
|
|
||||||
else:
|
|
||||||
obj = reader.tgread_object()
|
|
||||||
except TypeNotFoundError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def _handle_container(self, msg_id, seq, reader):
|
async def _handle_container(self, msg_id, seq, reader):
|
||||||
"""
|
"""
|
||||||
|
@ -387,6 +427,7 @@ class MTProtoSender:
|
||||||
|
|
||||||
msg_container#73f1f8dc messages:vector<%Message> = MessageContainer;
|
msg_container#73f1f8dc messages:vector<%Message> = MessageContainer;
|
||||||
"""
|
"""
|
||||||
|
__log__.debug('Handling container')
|
||||||
for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader):
|
for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader):
|
||||||
next_position = reader.tell_position() + inner_len
|
next_position = reader.tell_position() + inner_len
|
||||||
await self._process_message(inner_msg_id, seq, reader)
|
await self._process_message(inner_msg_id, seq, reader)
|
||||||
|
@ -398,14 +439,13 @@ class MTProtoSender:
|
||||||
|
|
||||||
gzip_packed#3072cfa1 packed_data:bytes = Object;
|
gzip_packed#3072cfa1 packed_data:bytes = Object;
|
||||||
"""
|
"""
|
||||||
|
__log__.debug('Handling gzipped data')
|
||||||
with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
|
with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
|
||||||
await self._process_message(msg_id, seq, compressed_reader)
|
await self._process_message(msg_id, seq, compressed_reader)
|
||||||
|
|
||||||
async def _handle_update(self, msg_id, seq, reader):
|
async def _handle_update(self, msg_id, seq, reader):
|
||||||
try:
|
obj = reader.tgread_object()
|
||||||
obj = reader.tgread_object()
|
__log__.debug('Handling update {}'.format(obj.__class__.__name__))
|
||||||
except TypeNotFoundError:
|
|
||||||
return
|
|
||||||
|
|
||||||
# TODO Further handling of the update
|
# TODO Further handling of the update
|
||||||
self.session.process_entities(obj)
|
self.session.process_entities(obj)
|
||||||
|
@ -417,6 +457,7 @@ class MTProtoSender:
|
||||||
|
|
||||||
pong#347773c5 msg_id:long ping_id:long = Pong;
|
pong#347773c5 msg_id:long ping_id:long = Pong;
|
||||||
"""
|
"""
|
||||||
|
__log__.debug('Handling pong')
|
||||||
pong = reader.tgread_object()
|
pong = reader.tgread_object()
|
||||||
message = self._pending_messages.pop(pong.msg_id, None)
|
message = self._pending_messages.pop(pong.msg_id, None)
|
||||||
if message:
|
if message:
|
||||||
|
@ -430,6 +471,7 @@ class MTProtoSender:
|
||||||
bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int
|
bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int
|
||||||
error_code:int new_server_salt:long = BadMsgNotification;
|
error_code:int new_server_salt:long = BadMsgNotification;
|
||||||
"""
|
"""
|
||||||
|
__log__.debug('Handling bad salt')
|
||||||
bad_salt = reader.tgread_object()
|
bad_salt = reader.tgread_object()
|
||||||
self.session.salt = bad_salt.new_server_salt
|
self.session.salt = bad_salt.new_server_salt
|
||||||
self.session.save()
|
self.session.save()
|
||||||
|
@ -443,6 +485,7 @@ class MTProtoSender:
|
||||||
bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int
|
bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int
|
||||||
error_code:int = BadMsgNotification;
|
error_code:int = BadMsgNotification;
|
||||||
"""
|
"""
|
||||||
|
__log__.debug('Handling bad message')
|
||||||
bad_msg = reader.tgread_object()
|
bad_msg = reader.tgread_object()
|
||||||
if bad_msg.error_code in (16, 17):
|
if bad_msg.error_code in (16, 17):
|
||||||
# Sent msg_id too low or too high (respectively).
|
# Sent msg_id too low or too high (respectively).
|
||||||
|
@ -472,6 +515,7 @@ class MTProtoSender:
|
||||||
bytes:int status:int = MsgDetailedInfo;
|
bytes:int status:int = MsgDetailedInfo;
|
||||||
"""
|
"""
|
||||||
# TODO https://goo.gl/VvpCC6
|
# TODO https://goo.gl/VvpCC6
|
||||||
|
__log__.debug('Handling detailed info')
|
||||||
self._pending_ack.add(reader.tgread_object().answer_msg_id)
|
self._pending_ack.add(reader.tgread_object().answer_msg_id)
|
||||||
|
|
||||||
async def _handle_new_detailed_info(self, msg_id, seq, reader):
|
async def _handle_new_detailed_info(self, msg_id, seq, reader):
|
||||||
|
@ -482,6 +526,7 @@ class MTProtoSender:
|
||||||
bytes:int status:int = MsgDetailedInfo;
|
bytes:int status:int = MsgDetailedInfo;
|
||||||
"""
|
"""
|
||||||
# TODO https://goo.gl/G7DPsR
|
# TODO https://goo.gl/G7DPsR
|
||||||
|
__log__.debug('Handling new detailed info')
|
||||||
self._pending_ack.add(reader.tgread_object().answer_msg_id)
|
self._pending_ack.add(reader.tgread_object().answer_msg_id)
|
||||||
|
|
||||||
async def _handle_new_session_created(self, msg_id, seq, reader):
|
async def _handle_new_session_created(self, msg_id, seq, reader):
|
||||||
|
@ -492,6 +537,7 @@ class MTProtoSender:
|
||||||
server_salt:long = NewSession;
|
server_salt:long = NewSession;
|
||||||
"""
|
"""
|
||||||
# TODO https://goo.gl/LMyN7A
|
# TODO https://goo.gl/LMyN7A
|
||||||
|
__log__.debug('Handling new session created')
|
||||||
self.session.salt = reader.tgread_object().server_salt
|
self.session.salt = reader.tgread_object().server_salt
|
||||||
|
|
||||||
def _clean_containers(self, msg_ids):
|
def _clean_containers(self, msg_ids):
|
||||||
|
@ -526,6 +572,7 @@ class MTProtoSender:
|
||||||
also removes containers messages when any of their inner
|
also removes containers messages when any of their inner
|
||||||
messages are acknowledged.
|
messages are acknowledged.
|
||||||
"""
|
"""
|
||||||
|
__log__.debug('Handling acknowledge')
|
||||||
ack = reader.tgread_object()
|
ack = reader.tgread_object()
|
||||||
if self._pending_containers:
|
if self._pending_containers:
|
||||||
self._clean_containers(ack.msg_ids)
|
self._clean_containers(ack.msg_ids)
|
||||||
|
@ -546,6 +593,7 @@ class MTProtoSender:
|
||||||
"""
|
"""
|
||||||
# TODO save these salts and automatically adjust to the
|
# TODO save these salts and automatically adjust to the
|
||||||
# correct one whenever the salt in use expires.
|
# correct one whenever the salt in use expires.
|
||||||
|
__log__.debug('Handling future salts')
|
||||||
salts = reader.tgread_object()
|
salts = reader.tgread_object()
|
||||||
msg = self._pending_messages.pop(msg_id, None)
|
msg = self._pending_messages.pop(msg_id, None)
|
||||||
if msg:
|
if msg:
|
||||||
|
|
Loading…
Reference in New Issue
Block a user