# Telethon/telethon/network/mtprotosender.py
#
# 636 lines, 25 KiB, Python
# (web view residue: Raw / Normal View / History)

import asyncio
import logging
# (blame timestamp: 2018-06-07 17:32:12 +03:00)
from . import MTProtoPlainSender, authenticator
from .connection import ConnectionTcpFull
from .. import helpers, utils
# (blame timestamp: 2018-06-07 17:32:12 +03:00)
from ..errors import (
BadMessageError, TypeNotFoundError, BrokenAuthKeyError, SecurityError,
rpc_message_to_error
)
from ..extensions import BinaryReader
from ..tl import TLMessage, MessageContainer, GzipPacked
from ..tl.functions.auth import LogOutRequest
from ..tl.types import (
MsgsAck, Pong, BadServerSalt, BadMsgNotification, FutureSalts,
MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo
)
__log__ = logging.getLogger(__name__)
# TODO Create some kind of "ReconnectionPolicy" that allows specifying
# what should be done in case of some errors, with some sane defaults.
# For instance, should all messages be set with an error upon network
# loss? Should we try reconnecting forever? A certain amount of times?
# A timeout? What about recoverable errors, like connection reset?
class MTProtoSender:
    """
    MTProto Mobile Protocol sender
    (https://core.telegram.org/mtproto/description).

    This class is responsible for wrapping requests into `TLMessage`'s,
    sending them over the network and receiving them in a safe manner.

    Automatic reconnection due to temporary network issues is a concern
    for this class as well, including retry of messages that could not
    be sent successfully.

    A new authorization key will be generated on connection if no other
    key exists yet.
    """
    def __init__(self, session, retries=5):
        """
        Prepares the sender state; no network activity happens here.

        ``session`` is the object whose ``auth_key``, ``time_offset``,
        ``salt`` and ``sequence`` attributes are read and updated by this
        class. ``retries`` is the maximum number of attempts made both
        when connecting and when generating a new authorization key.
        """
        self.session = session
        self._connection = ConnectionTcpFull()
        self._ip = None
        self._port = None
        self._retries = retries

        # Whether the user has explicitly connected or disconnected.
        #
        # If a disconnection happens for any other reason and it
        # was *not* user action then the pending messages won't
        # be cleared but on explicit user disconnection all the
        # pending futures should be cancelled.
        self._user_connected = False
        self._reconnecting = False

        # Send and receive calls must be atomic
        self._send_lock = asyncio.Lock()
        self._recv_lock = asyncio.Lock()

        # We need to join the loops upon disconnection
        self._send_loop_handle = None
        self._recv_loop_handle = None

        # Sending something shouldn't block
        self._send_queue = _ContainerQueue()

        # Telegram responds to messages out of order. Keep
        # {id: Message} to set their Future result upon arrival.
        self._pending_messages = {}

        # Containers are accepted or rejected as a whole when any of
        # its inner requests are acknowledged. For this purpose we
        # save all the sent containers here.
        self._pending_containers = []

        # We need to acknowledge every response from Telegram
        self._pending_ack = set()

        # Jump table from response ID to method that handles it
        self._handlers = {
            0xf35c6d01: self._handle_rpc_result,
            MessageContainer.CONSTRUCTOR_ID: self._handle_container,
            GzipPacked.CONSTRUCTOR_ID: self._handle_gzip_packed,
            Pong.CONSTRUCTOR_ID: self._handle_pong,
            BadServerSalt.CONSTRUCTOR_ID: self._handle_bad_server_salt,
            BadMsgNotification.CONSTRUCTOR_ID: self._handle_bad_notification,
            MsgDetailedInfo.CONSTRUCTOR_ID: self._handle_detailed_info,
            MsgNewDetailedInfo.CONSTRUCTOR_ID: self._handle_new_detailed_info,
            NewSessionCreated.CONSTRUCTOR_ID: self._handle_new_session_created,
            MsgsAck.CONSTRUCTOR_ID: self._handle_ack,
            FutureSalts.CONSTRUCTOR_ID: self._handle_future_salts
        }

    # Public API

    async def connect(self, ip, port):
        """
        Connects to the specified ``ip:port``, and generates a new
        authorization key for the `MTProtoSender.session` if it does
        not exist yet.

        Does nothing (besides logging) if the user is already connected.
        Any error raised while connecting is propagated to the caller.
        """
        if self._user_connected:
            __log__.info('User is already connected!')
            return

        self._ip = ip
        self._port = port
        # The loops started by `_connect` check this flag, so it has
        # to be set before they are created.
        self._user_connected = True
        try:
            await self._connect()
        except BaseException:
            # A failed attempt must not leave the sender flagged as
            # connected: a later `connect()` would be ignored, and
            # `disconnect()` would find no loops to cancel.
            self._user_connected = False
            raise

    async def disconnect(self):
        """
        Cleanly disconnects the instance from the network, cancels
        all pending requests, and closes the send and receive loops.

        Does nothing (besides logging) if the user is not connected.
        """
        if not self._user_connected:
            __log__.info('User is already disconnected!')
            return

        __log__.info('Disconnecting from {}...'.format(self._ip))
        self._user_connected = False
        try:
            __log__.debug('Closing current connection...')
            async with self._send_lock:
                await self._connection.close()
        finally:
            # Pending state is dropped even if closing the connection fails
            __log__.debug('Cancelling {} pending message(s)...'
                          .format(len(self._pending_messages)))
            for message in self._pending_messages.values():
                message.future.cancel()

            self._pending_messages.clear()
            # Containers only reference messages that were just dropped
            self._pending_containers.clear()
            self._pending_ack.clear()

            if self._send_loop_handle is not None:
                __log__.debug('Cancelling the send loop...')
                self._send_loop_handle.cancel()

            if self._recv_loop_handle is not None:
                __log__.debug('Cancelling the receive loop...')
                self._recv_loop_handle.cancel()

        __log__.info('Disconnection from {} complete!'.format(self._ip))

    async def send(self, request, ordered=False):
        """
        This method enqueues the given request to be sent.

        The request will be wrapped inside a `TLMessage` until its
        response arrives, and the `Future` response of the `TLMessage`
        is immediately returned so that one can further ``await`` it:

        .. code-block:: python

            async def method():
                # Sending (enqueued for the send loop)
                future = await sender.send(request)
                # Receiving (waits for the receive loop to read the result)
                result = await future

        Designed like this because Telegram may send the response at
        any point, and it can send other items while one waits for it.
        Once the response for this future arrives, it is set with the
        received result, quite similar to how a ``receive()`` call
        would otherwise work.

        Since the receiving part is "built in" the future, it's
        impossible to await receive a result that was never sent.

        If ``request`` is list-like, every item is enqueued and a list
        with one future per item is returned. When ``ordered`` is truthy
        each message is created with ``after_id`` set to the ``msg_id``
        of the previous one.
        """
        # TODO Perhaps this method should be synchronous and just return
        # a `Future` that you need to further ``await`` instead of the
        # currently double ``await (await send())``?
        if utils.is_list_like(request):
            if not ordered:
                # False-y values must be None to do after_id = ordered and ...
                ordered = None

            result = []
            after_id = None
            for r in request:
                message = TLMessage(self.session, r, after_id=after_id)
                self._pending_messages[message.msg_id] = message
                after_id = ordered and message.msg_id
                await self._send_queue.put(message)
                result.append(message.future)

            return result
        else:
            message = TLMessage(self.session, request)
            self._pending_messages[message.msg_id] = message
            await self._send_queue.put(message)
            return message.future

    # Private methods

    async def _connect(self):
        """
        Performs the actual connection, retrying, generating the
        authorization key if necessary, and starting the send and
        receive loops.

        Raises the last `OSError` seen if every connection attempt
        fails, or the last `SecurityError`/`AssertionError` if every
        attempt at generating the authorization key fails.
        """
        __log__.info('Connecting to {}:{}...'.format(self._ip, self._port))
        # Raised as-is only if no attempt was made at all (retries < 1)
        _last_error = ConnectionError()
        for retry in range(1, self._retries + 1):
            try:
                __log__.debug('Connection attempt {}...'.format(retry))
                async with self._send_lock:
                    await self._connection.connect(self._ip, self._port)
            except OSError as e:
                _last_error = e
                __log__.warning('Attempt {} at connecting failed: {}'
                                .format(retry, e))
            else:
                break
        else:
            raise _last_error

        __log__.debug('Connection success!')
        if self.session.auth_key is None:
            _last_error = SecurityError()
            plain = MTProtoPlainSender(self._connection)
            for retry in range(1, self._retries + 1):
                try:
                    __log__.debug('New auth_key attempt {}...'.format(retry))
                    self.session.auth_key, self.session.time_offset =\
                        await authenticator.do_authentication(plain)
                except (SecurityError, AssertionError) as e:
                    _last_error = e
                    __log__.warning('Attempt {} at new auth_key failed: {}'
                                    .format(retry, e))
                else:
                    break
            else:
                raise _last_error

        __log__.debug('Starting send loop')
        self._send_loop_handle = asyncio.ensure_future(self._send_loop())
        __log__.debug('Starting receive loop')
        self._recv_loop_handle = asyncio.ensure_future(self._recv_loop())
        __log__.info('Connection to {} complete!'.format(self._ip))

    async def _reconnect(self):
        """
        Cleanly disconnects and then reconnects.

        Both loops are flagged to stop and awaited before the current
        connection is closed and `_connect` is called again.
        """
        self._reconnecting = True

        # NOTE(review): the send loop only re-checks `_reconnecting` after
        # `self._send_queue.get()` returns, so this await presumably blocks
        # until something is enqueued -- confirm and consider waking it up.
        __log__.debug('Awaiting for the send loop before reconnecting...')
        await self._send_loop_handle

        __log__.debug('Awaiting for the receive loop before reconnecting...')
        await self._recv_loop_handle

        __log__.debug('Closing current connection...')
        async with self._send_lock:
            await self._connection.close()

        self._reconnecting = False
        await self._connect()

    # Loops

    async def _send_loop(self):
        """
        This loop is responsible for popping items off the send
        queue, encrypting them, and sending them over the network.

        Besides `connect`, only this method ever sends data.
        """
        while self._user_connected and not self._reconnecting:
            if self._pending_ack:
                await self._send_queue.put(TLMessage(
                    self.session, MsgsAck(list(self._pending_ack))))
                self._pending_ack.clear()

            messages = await self._send_queue.get()
            if isinstance(messages, list):
                # Several messages were queued; wrap them in a container
                # and remember it so it can be cleaned up or re-sent later.
                message = TLMessage(self.session, MessageContainer(messages))
                self._pending_messages[message.msg_id] = message
                self._pending_containers.append(message)
            else:
                message = messages
                messages = [message]

            __log__.debug('Packing {} outgoing message(s)...'
                          .format(len(messages)))
            body = helpers.pack_message(self.session, message)

            # Keep retrying on timeouts until the data is sent or any of
            # the futures involved gets cancelled.
            while not any(m.future.cancelled() for m in messages):
                try:
                    async with self._send_lock:
                        __log__.debug('Sending {} bytes...'
                                      .format(len(body)))
                        await self._connection.send(body)
                    break
                # TODO Are there more exceptions besides timeout?
                except asyncio.TimeoutError:
                    continue
            else:
                # Remove the cancelled messages from pending
                __log__.info('Some futures were cancelled, aborted send')
                self._clean_containers([m.msg_id for m in messages])
                for m in messages:
                    if m.future.cancelled():
                        self._pending_messages.pop(m.msg_id, None)
                    else:
                        await self._send_queue.put(m)

                # Nothing was sent, so don't report it as such
                continue

            __log__.debug('Outgoing messages sent!')

    async def _recv_loop(self):
        """
        This loop is responsible for reading all incoming responses
        from the network, decrypting and handling or dispatching them.

        Besides `connect`, only this method ever receives data.
        """
        while self._user_connected and not self._reconnecting:
            # TODO Are there more exceptions besides timeout?
            # Disconnecting or switching off WiFi only resulted in
            # timeouts, and once the network was back it continued
            # on its own after a short delay.
            try:
                __log__.debug('Receiving items from the network...')
                async with self._recv_lock:
                    body = await self._connection.recv()
            except asyncio.TimeoutError:
                # TODO If nothing is received for a minute, send a request
                continue
            except ConnectionError as e:
                __log__.info('Connection reset while receiving: {}'.format(e))
                # Scheduled rather than awaited: `_reconnect` awaits this
                # very loop, which can only finish after the `break`.
                asyncio.ensure_future(self._reconnect())
                break

            # TODO Check salt, session_id and sequence_number
            __log__.debug('Decoding packet of {} bytes...'.format(len(body)))
            try:
                message, remote_msg_id, remote_seq =\
                    helpers.unpack_message(self.session, body)
            except (BrokenAuthKeyError, BufferError) as e:
                # The authorization key may be broken if a message was
                # sent malformed, or if the authkey truly is corrupted.
                #
                # There may be a buffer error if Telegram's response was too
                # short and hence not understood. Reset the authorization key
                # and try again in either case.
                #
                # TODO Is it possible to detect malformed messages vs
                # an actually broken authkey?
                __log__.warning('Broken authorization key?: {}'.format(e))
                self.session.auth_key = None
                asyncio.ensure_future(self._reconnect())
                break
            except SecurityError as e:
                # A step while decoding had the incorrect data. This message
                # should not be considered safe and it should be ignored.
                __log__.warning('Security error while unpacking a '
                                'received message: {}'.format(e))
                continue
            else:
                try:
                    with BinaryReader(message) as reader:
                        await self._process_message(
                            remote_msg_id, remote_seq, reader)
                except TypeNotFoundError as e:
                    __log__.warning('Could not decode received message: {}, '
                                    'raw bytes: {!r}'.format(e, message))

    # Response Handlers

    async def _process_message(self, msg_id, seq, reader):
        """
        Adds the given message to the list of messages that must be
        acknowledged and dispatches control to different ``_handle_*``
        method based on its type.

        Constructor IDs without an entry in the jump table are
        dispatched to `_handle_update`.
        """
        self._pending_ack.add(msg_id)
        # Peek at the constructor ID and rewind so handlers can re-read it
        code = reader.read_int(signed=False)
        reader.seek(-4)
        handler = self._handlers.get(code, self._handle_update)
        await handler(msg_id, seq, reader)

    async def _handle_rpc_result(self, msg_id, seq, reader):
        """
        Handles the result for Remote Procedure Calls:

            rpc_result#f35c6d01 req_msg_id:long result:bytes = RpcResult;

        This is where the future results for sent requests are set.
        """
        # TODO Don't make this a special cased object
        reader.read_int(signed=False)  # code
        message_id = reader.read_long()
        inner_code = reader.read_int(signed=False)
        reader.seek(-4)

        __log__.debug('Handling RPC result for message {}'.format(message_id))
        message = self._pending_messages.pop(message_id, None)
        if inner_code == 0x2144ca19:  # RPC Error
            reader.seek(4)  # skip the error constructor ID peeked above
            if self.session.report_errors and message:
                error = rpc_message_to_error(
                    reader.read_int(), reader.tgread_string(),
                    report_method=type(message.request).CONSTRUCTOR_ID
                )
            else:
                error = rpc_message_to_error(
                    reader.read_int(), reader.tgread_string()
                )

            await self._send_queue.put(
                TLMessage(self.session, MsgsAck([msg_id])))

            # The request may be unknown (e.g. already removed); don't
            # let that raise inside the receive loop.
            if not message:
                __log__.info('Received error without parent request: {}'
                             .format(error))
            elif not message.future.cancelled():
                message.future.set_exception(error)
            return
        elif message:
            if inner_code == GzipPacked.CONSTRUCTOR_ID:
                with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
                    result = message.request.read_result(compressed_reader)
            else:
                result = message.request.read_result(reader)

            self.session.process_entities(result)
            if not message.future.cancelled():
                message.future.set_result(result)
            return
        else:
            # TODO We should not get responses to things we never sent
            __log__.info('Received response without parent request: {}'
                         .format(reader.tgread_object()))

    async def _handle_container(self, msg_id, seq, reader):
        """
        Processes the inner messages of a container with many of them:

            msg_container#73f1f8dc messages:vector<%Message> = MessageContainer;
        """
        __log__.debug('Handling container')
        for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader):
            next_position = reader.tell_position() + inner_len
            await self._process_message(inner_msg_id, seq, reader)
            reader.set_position(next_position)  # Ensure reading correctly

    async def _handle_gzip_packed(self, msg_id, seq, reader):
        """
        Unpacks the data from a gzipped object and processes it:

            gzip_packed#3072cfa1 packed_data:bytes = Object;
        """
        __log__.debug('Handling gzipped data')
        with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
            await self._process_message(msg_id, seq, compressed_reader)

    async def _handle_update(self, msg_id, seq, reader):
        """
        Fallback handler for any object without a dedicated handler.

        The object is read and passed to ``session.process_entities``.
        """
        obj = reader.tgread_object()
        __log__.debug('Handling update {}'.format(obj.__class__.__name__))
        # TODO Further handling of the update
        self.session.process_entities(obj)

    async def _handle_pong(self, msg_id, seq, reader):
        """
        Handles pong results, which don't come inside a ``rpc_result``
        but are still sent through a request:

            pong#347773c5 msg_id:long ping_id:long = Pong;
        """
        __log__.debug('Handling pong')
        pong = reader.tgread_object()
        message = self._pending_messages.pop(pong.msg_id, None)
        # Setting a result on a cancelled future would raise
        if message and not message.future.cancelled():
            message.future.set_result(pong)

    async def _handle_bad_server_salt(self, msg_id, seq, reader):
        """
        Corrects the currently used server salt to use the right value
        before enqueuing the rejected message to be re-sent:

            bad_server_salt#edab447b bad_msg_id:long bad_msg_seqno:int
            error_code:int new_server_salt:long = BadMsgNotification;
        """
        __log__.debug('Handling bad salt')
        bad_salt = reader.tgread_object()
        self.session.salt = bad_salt.new_server_salt
        self.session.save()

        # The rejected message may no longer be pending (for instance if
        # it was cancelled); a missing ID must not break the receive loop.
        message = self._pending_messages.get(bad_salt.bad_msg_id)
        if message is None:
            __log__.info('Message {} with bad salt is not pending, '
                         'not re-sending'.format(bad_salt.bad_msg_id))
            return

        await self._send_queue.put(message)

    async def _handle_bad_notification(self, msg_id, seq, reader):
        """
        Adjusts the current state to be correct based on the
        received bad message notification whenever possible:

            bad_msg_notification#a7eff811 bad_msg_id:long bad_msg_seqno:int
            error_code:int = BadMsgNotification;

        Error codes that can't be corrected set a `BadMessageError`
        on the future of the rejected message instead of re-sending it.
        """
        __log__.debug('Handling bad message')
        bad_msg = reader.tgread_object()
        if bad_msg.error_code in (16, 17):
            # Sent msg_id too low or too high (respectively).
            # Use the current msg_id to determine the right time offset.
            self.session.update_time_offset(correct_msg_id=msg_id)
        elif bad_msg.error_code == 32:
            # msg_seqno too low, so just pump it up by some "large" amount
            # TODO A better fix would be to start with a new fresh session ID
            self.session.sequence += 64
        elif bad_msg.error_code == 33:
            # msg_seqno too high never seems to happen but just in case
            self.session.sequence -= 16
        else:
            msg = self._pending_messages.pop(bad_msg.bad_msg_id, None)
            if msg and not msg.future.cancelled():
                msg.future.set_exception(BadMessageError(bad_msg.error_code))
            return

        # Messages are to be re-sent once we've corrected the issue,
        # as long as they are still pending.
        msg = self._pending_messages.get(bad_msg.bad_msg_id)
        if msg is None:
            __log__.info('Bad message {} is not pending, not re-sending'
                         .format(bad_msg.bad_msg_id))
            return

        await self._send_queue.put(msg)

    async def _handle_detailed_info(self, msg_id, seq, reader):
        """
        Updates the current status with the received detailed information:

            msg_detailed_info#276d3ec6 msg_id:long answer_msg_id:long
            bytes:int status:int = MsgDetailedInfo;
        """
        # TODO https://goo.gl/VvpCC6
        __log__.debug('Handling detailed info')
        self._pending_ack.add(reader.tgread_object().answer_msg_id)

    async def _handle_new_detailed_info(self, msg_id, seq, reader):
        """
        Updates the current status with the received detailed information:

            msg_new_detailed_info#809db6df answer_msg_id:long
            bytes:int status:int = MsgDetailedInfo;
        """
        # TODO https://goo.gl/G7DPsR
        __log__.debug('Handling new detailed info')
        self._pending_ack.add(reader.tgread_object().answer_msg_id)

    async def _handle_new_session_created(self, msg_id, seq, reader):
        """
        Updates the current status with the received session information:

            new_session_created#9ec20908 first_msg_id:long unique_id:long
            server_salt:long = NewSession;
        """
        # TODO https://goo.gl/LMyN7A
        __log__.debug('Handling new session created')
        self.session.salt = reader.tgread_object().server_salt

    def _clean_containers(self, msg_ids):
        """
        Helper method to clean containers from the pending messages
        once a wrapped msg_id of them has been acknowledged.

        This is the only way we can resend TLMessage(MessageContainer)
        on bad notifications and also mark them as received once any
        of their inner TLMessage is acknowledged.

        ``msg_ids`` is any iterable of message IDs. Containers whose own
        ID is no longer in the pending messages are still removed from
        the pending containers.
        """
        msg_ids = set(msg_ids)
        # Walk backwards so deleting by index doesn't shift unvisited items
        for i in reversed(range(len(self._pending_containers))):
            message = self._pending_containers[i]
            if any(msg.msg_id in msg_ids for msg in message.request.messages):
                del self._pending_containers[i]
                self._pending_messages.pop(message.msg_id, None)

    async def _handle_ack(self, msg_id, seq, reader):
        """
        Handles a server acknowledge about our messages. Normally
        these can be ignored except in the case of ``auth.logOut``:

            auth.logOut#5717da40 = Bool;

        Telegram doesn't seem to send its result so we need to confirm
        it manually. No other request is known to have this behaviour.

        Since the ID of sent messages consisting of a container is
        never returned (unless on a bad notification), this method
        also removes containers messages when any of their inner
        messages are acknowledged.
        """
        __log__.debug('Handling acknowledge')
        ack = reader.tgread_object()
        if self._pending_containers:
            self._clean_containers(ack.msg_ids)

        for acked_id in ack.msg_ids:
            msg = self._pending_messages.get(acked_id, None)
            if msg and isinstance(msg.request, LogOutRequest):
                del self._pending_messages[acked_id]
                if not msg.future.cancelled():
                    msg.future.set_result(True)

    async def _handle_future_salts(self, msg_id, seq, reader):
        """
        Handles future salt results, which don't come inside a
        ``rpc_result`` but are still sent through a request:

            future_salts#ae500895 req_msg_id:long now:int
            salts:vector<future_salt> = FutureSalts;
        """
        # TODO save these salts and automatically adjust to the
        # correct one whenever the salt in use expires.
        __log__.debug('Handling future salts')
        salts = reader.tgread_object()
        # The pending request is identified by the `req_msg_id` carried
        # in the object, not by the ID of the server message (`msg_id`).
        msg = self._pending_messages.pop(salts.req_msg_id, None)
        if msg and not msg.future.cancelled():
            msg.future.set_result(salts)

class _ContainerQueue(asyncio.Queue):
    """
    Queue of `TLMessage` items that knows about `MessageContainer`.

    `get` hands back a lone `TLMessage` when only one item is available
    (or when the item already wraps a `MessageContainer`), and otherwise
    a list of messages meant to be packed into a new `MessageContainer`.

    A plain ``asyncio.Queue`` can stand in for this class when testing,
    in which case lists are never returned.
    """
    async def get(self):
        """
        Waits for an item and greedily batches whatever else is queued.

        Batching stops at the first item whose request is already a
        `MessageContainer`; that item is put back into the queue.
        """
        first = await super().get()

        # Nothing else to batch with
        if self.empty():
            return first

        # Containers are returned on their own
        if isinstance(first.request, MessageContainer):
            return first

        batch = [first]
        while not self.empty():
            candidate = self.get_nowait()
            if isinstance(candidate.request, MessageContainer):
                await self.put(candidate)
                break

            batch.append(candidate)

        return batch