import gzip import logging import struct from threading import RLock from .. import helpers as utils from ..crypto import AES from ..errors import ( BadMessageError, InvalidChecksumError, BrokenAuthKeyError, rpc_message_to_error ) from ..extensions import BinaryReader from ..tl import Message, MessageContainer, GzipPacked from ..tl.all_tlobjects import tlobjects from ..tl.types import MsgsAck logging.getLogger(__name__).addHandler(logging.NullHandler()) class MtProtoSender: """MTProto Mobile Protocol sender (https://core.telegram.org/mtproto/description) """ def __init__(self, session, connection): """Creates a new MtProtoSender configured to send messages through 'connection' and using the parameters from 'session'. """ self.session = session self.connection = connection self._logger = logging.getLogger(__name__) # Message IDs that need confirmation self._need_confirmation = [] # Requests (as msg_id: Message) sent waiting to be received self._pending_receive = {} # Sending and receiving are independent, but two threads cannot # send or receive at the same time no matter what. self._send_lock = RLock() self._recv_lock = RLock() # Used when logging out, the only request that seems to use 'ack' # TODO There might be a better way to handle msgs_ack requests self.logging_out = False def connect(self): """Connects to the server""" self.connection.connect() def is_connected(self): return self.connection.is_connected() def disconnect(self): """Disconnects from the server""" self.connection.close() self._need_confirmation.clear() self._clear_all_pending() self.logging_out = False # region Send and receive def send(self, *requests): """Sends the specified MTProtoRequest, previously sending any message which needed confirmation.""" # If any message needs confirmation send an AckRequest first self._send_acknowledges() # Finally send our packed request(s) messages = [Message(self.session, r) for r in requests] self._pending_receive.update({m.msg_id: m for m in messages}) if len(messages) == 1: message = messages[0] else: message = Message(self.session, MessageContainer(messages)) self._send_message(message) def _send_acknowledges(self): """Sends a messages acknowledge for all those who _need_confirmation""" if self._need_confirmation: self._send_message( Message(self.session, MsgsAck(self._need_confirmation)) ) del self._need_confirmation[:] def receive(self, update_state): """Receives a single message from the connected endpoint. This method returns nothing, and will only affect other parts of the MtProtoSender such as the updates callback being fired or a pending request being confirmed. Any unhandled object (likely updates) will be passed to update_state.process(TLObject). """ with self._recv_lock: try: body = self.connection.recv() except (BufferError, InvalidChecksumError): # TODO BufferError, we should spot the cause... # "No more bytes left"; something wrong happened, clear # everything to be on the safe side, or: # # "This packet should be skipped"; since this may have # been a result for a request, invalidate every request # and just re-invoke them to avoid problems self._clear_all_pending() return message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: self._process_msg(remote_msg_id, remote_seq, reader, update_state) # endregion # region Low level processing def _send_message(self, message): """Sends the given Message(TLObject) encrypted through the network""" plain_text = \ struct.pack('