From adfe861e9fd8f1262143c72b70fc6af13cf7c103 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 9 Jun 2018 11:34:01 +0200 Subject: [PATCH] Create a self-contained MTProtoState This frees us from using entire Session objects in something that's supposed to just send and receive items from the net. --- telethon/helpers.py | 76 -------------- telethon/network/mtprotosender.py | 116 ++++++++++------------ telethon/network/mtprotostate.py | 158 ++++++++++++++++++++++++++++++ telethon/tl/message_container.py | 4 +- telethon/tl/tl_message.py | 18 +++- 5 files changed, 226 insertions(+), 146 deletions(-) create mode 100644 telethon/network/mtprotostate.py diff --git a/telethon/helpers.py b/telethon/helpers.py index c76f93ec..de66813f 100644 --- a/telethon/helpers.py +++ b/telethon/helpers.py @@ -1,12 +1,7 @@ """Various helpers not related to the Telegram API itself""" import os -import struct from hashlib import sha1, sha256 -from .crypto import AES -from .errors import SecurityError, BrokenAuthKeyError -from .extensions import BinaryReader - # region Multiple utilities @@ -27,77 +22,6 @@ def ensure_parent_dir_exists(file_path): # region Cryptographic related utils -def pack_message(session, message): - """Packs a message following MtProto 2.0 guidelines""" - # See https://core.telegram.org/mtproto/description - data = struct.pack(' = MessageContainer; """ __log__.debug('Handling container') - for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader): - next_position = reader.tell_position() + inner_len - await self._process_message(inner_msg_id, seq, reader) - reader.set_position(next_position) # Ensure reading correctly + for inner_message in MessageContainer.iter_read(reader): + with BinaryReader(inner_message.body) as inner_reader: + await self._process_message(inner_message, inner_reader) - async def _handle_gzip_packed(self, msg_id, seq, reader): + async def _handle_gzip_packed(self, message, reader): """ Unpacks the data from a gzipped object and processes it: @@ -448,16 +437,16 @@ class MTProtoSender: """ __log__.debug('Handling gzipped data') with BinaryReader(GzipPacked.read(reader)) as compressed_reader: - await self._process_message(msg_id, seq, compressed_reader) + await self._process_message(message, compressed_reader) - async def _handle_update(self, msg_id, seq, reader): + async def _handle_update(self, message, reader): obj = reader.tgread_object() __log__.debug('Handling update {}'.format(obj.__class__.__name__)) # TODO Further handling of the update - self.session.process_entities(obj) + # TODO Process entities - async def _handle_pong(self, msg_id, seq, reader): + async def _handle_pong(self, message, reader): """ Handles pong results, which don't come inside a ``rpc_result`` but are still sent through a request: @@ -470,7 +459,7 @@ class MTProtoSender: if message: message.future.set_result(pong) - async def _handle_bad_server_salt(self, msg_id, seq, reader): + async def _handle_bad_server_salt(self, message, reader): """ Corrects the currently used server salt to use the right value before enqueuing the rejected message to be re-sent: @@ -480,11 +469,10 @@ class MTProtoSender: """ __log__.debug('Handling bad salt') bad_salt = reader.tgread_object() - self.session.salt = bad_salt.new_server_salt - self.session.save() + self.state.salt = bad_salt.new_server_salt await self._send_queue.put(self._pending_messages[bad_salt.bad_msg_id]) - async def _handle_bad_notification(self, msg_id, seq, reader): + async def _handle_bad_notification(self, message, reader): """ Adjusts the current state to be correct based on the received bad message notification whenever possible: @@ -497,14 +485,14 @@ class MTProtoSender: if bad_msg.error_code in (16, 17): # Sent msg_id too low or too high (respectively). # Use the current msg_id to determine the right time offset. - self.session.update_time_offset(correct_msg_id=msg_id) + self.state.update_time_offset(correct_msg_id=message.msg_id) elif bad_msg.error_code == 32: # msg_seqno too low, so just pump it up by some "large" amount # TODO A better fix would be to start with a new fresh session ID - self.session.sequence += 64 + self.state._sequence += 64 elif bad_msg.error_code == 33: # msg_seqno too high never seems to happen but just in case - self.session.sequence -= 16 + self.state._sequence -= 16 else: msg = self._pending_messages.pop(bad_msg.bad_msg_id, None) if msg: @@ -514,7 +502,7 @@ class MTProtoSender: # Messages are to be re-sent once we've corrected the issue await self._send_queue.put(self._pending_messages[bad_msg.bad_msg_id]) - async def _handle_detailed_info(self, msg_id, seq, reader): + async def _handle_detailed_info(self, message, reader): """ Updates the current status with the received detailed information: @@ -525,7 +513,7 @@ class MTProtoSender: __log__.debug('Handling detailed info') self._pending_ack.add(reader.tgread_object().answer_msg_id) - async def _handle_new_detailed_info(self, msg_id, seq, reader): + async def _handle_new_detailed_info(self, message, reader): """ Updates the current status with the received detailed information: @@ -536,7 +524,7 @@ class MTProtoSender: __log__.debug('Handling new detailed info') self._pending_ack.add(reader.tgread_object().answer_msg_id) - async def _handle_new_session_created(self, msg_id, seq, reader): + async def _handle_new_session_created(self, message, reader): """ Updates the current status with the received session information: @@ -545,7 +533,7 @@ class MTProtoSender: """ # TODO https://goo.gl/LMyN7A __log__.debug('Handling new session created') - self.session.salt = reader.tgread_object().server_salt + self.state.salt = reader.tgread_object().server_salt def _clean_containers(self, msg_ids): """ @@ -564,7 +552,7 @@ class MTProtoSender: del self._pending_messages[message.msg_id] break - async def _handle_ack(self, msg_id, seq, reader): + async def _handle_ack(self, message, reader): """ Handles a server acknowledge about our messages. Normally these can be ignored except in the case of ``auth.logOut``: @@ -590,7 +578,7 @@ class MTProtoSender: del self._pending_messages[msg_id] msg.future.set_result(True) - async def _handle_future_salts(self, msg_id, seq, reader): + async def _handle_future_salts(self, message, reader): """ Handles future salt results, which don't come inside a ``rpc_result`` but are still sent through a request: @@ -602,7 +590,7 @@ class MTProtoSender: # correct one whenever the salt in use expires. __log__.debug('Handling future salts') salts = reader.tgread_object() - msg = self._pending_messages.pop(msg_id, None) + msg = self._pending_messages.pop(message.msg_id, None) if msg: msg.future.set_result(salts) diff --git a/telethon/network/mtprotostate.py b/telethon/network/mtprotostate.py new file mode 100644 index 00000000..7c37f14b --- /dev/null +++ b/telethon/network/mtprotostate.py @@ -0,0 +1,158 @@ +import os +import struct +import time +from hashlib import sha256 + +from ..crypto import AES +from ..errors import SecurityError, BrokenAuthKeyError +from ..extensions import BinaryReader +from ..tl import TLMessage + + +class MTProtoState: + """ + `telethon.network.mtprotosender.MTProtoSender` needs to hold a state + in order to be able to encrypt and decrypt incoming/outgoing messages, + as well as generating the message IDs. Instances of this class hold + together all the required information. + + It doesn't make sense to use `telethon.sessions.abstract.Session` for + the sender because the sender should *not* be concerned about storing + this information to disk, as one may create as many senders as they + desire to any other data center, or some CDN. Using the same session + for all these is not a good idea as each need their own authkey, and + the concept of "copying" sessions with the unnecessary entities or + updates state for these connections doesn't make sense. + """ + def __init__(self, auth_key): + # Session IDs can be random on every connection + self.id = struct.unpack('q', os.urandom(8))[0] + self.auth_key = auth_key + self.time_offset = 0 + self.salt = 0 + self._sequence = 0 + self._last_msg_id = 0 + + def create_message(self, request, after=None): + """ + Creates a new `telethon.tl.tl_message.TLMessage` from + the given `telethon.tl.tlobject.TLObject` instance. + """ + return TLMessage( + msg_id=self._get_new_msg_id(), + seq_no=self._get_seq_no(request.content_related), + request=request, + after_id=after.msg_id if after else None + ) + + @staticmethod + def _calc_key(auth_key, msg_key, client): + """ + Calculate the key based on Telegram guidelines for MTProto 2, + specifying whether it's the client or not. See + https://core.telegram.org/mtproto/description#defining-aes-key-and-initialization-vector + """ + x = 0 if client else 8 + sha256a = sha256(msg_key + auth_key[x: x + 36]).digest() + sha256b = sha256(auth_key[x + 40:x + 76] + msg_key).digest() + + aes_key = sha256a[:8] + sha256b[8:24] + sha256a[24:32] + aes_iv = sha256b[:8] + sha256a[8:24] + sha256b[24:32] + + return aes_key, aes_iv + + def pack_message(self, message): + """ + Packs the given `telethon.tl.tl_message.TLMessage` using the + current authorization key following MTProto 2.0 guidelines. + + See https://core.telegram.org/mtproto/description. + """ + data = struct.pack('= new_msg_id: + new_msg_id = self._last_msg_id + 4 + + self._last_msg_id = new_msg_id + return new_msg_id + + def update_time_offset(self, correct_msg_id): + """ + Updates the time offset to the correct + one given a known valid message ID. + """ + now = int(time.time()) + correct = correct_msg_id >> 32 + self.time_offset = correct - now + self._last_msg_id = 0 + + def _get_seq_no(self, content_related): + """ + Generates the next sequence number depending on whether + it should be for a content-related query or not. + """ + if content_related: + result = self._sequence * 2 + 1 + self._sequence += 1 + return result + else: + return self._sequence * 2 diff --git a/telethon/tl/message_container.py b/telethon/tl/message_container.py index 58fb8021..acd51bb4 100644 --- a/telethon/tl/message_container.py +++ b/telethon/tl/message_container.py @@ -1,6 +1,7 @@ import struct from . import TLObject +from .tl_message import TLMessage class MessageContainer(TLObject): @@ -33,7 +34,8 @@ class MessageContainer(TLObject): inner_msg_id = reader.read_long() inner_sequence = reader.read_int() inner_length = reader.read_int() - yield inner_msg_id, inner_sequence, inner_length + yield TLMessage(inner_msg_id, inner_sequence, + body=reader.read(inner_length)) def __str__(self): return TLObject.pretty_format(self) diff --git a/telethon/tl/tl_message.py b/telethon/tl/tl_message.py index da144a91..e37902dc 100644 --- a/telethon/tl/tl_message.py +++ b/telethon/tl/tl_message.py @@ -20,15 +20,23 @@ class TLMessage(TLObject): sent `TLMessage`, and this result can be represented as a `Future` that will eventually be set with either a result, error or cancelled. """ - def __init__(self, session, request, after_id=None): + def __init__(self, msg_id, seq_no, body=None, request=None, after_id=0): super().__init__() - del self.content_related - self.msg_id = session.get_new_msg_id() - self.seq_no = session.generate_sequence(request.content_related) - self.request = request + self.msg_id = msg_id + self.seq_no = seq_no self.container_msg_id = None self.future = asyncio.Future() + # TODO Perhaps it's possible to merge body and request? + # We need things like rpc_result and gzip_packed to + # be readable by the ``BinaryReader`` for such purpose. + + # Used for incoming, not-decoded messages + self.body = body + + # Used for outgoing, not-encoded messages + self.request = request + # After which message ID this one should run. We do this so # InvokeAfterMsgRequest is transparent to the user and we can # easily invoke after while confirming the original request.