From 984f483b983ecb81ac68b32be7bf03e9bc5d7444 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Sun, 3 Dec 2017 02:11:50 +0300 Subject: [PATCH] Handle updates and other refactoring --- telethon/network/mtproto_sender.py | 64 +++++++++++++----------------- telethon/telegram_bare_client.py | 9 ++++- telethon/tl/session.py | 6 ++- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 332180da..0253f925 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -15,8 +15,8 @@ from ..tl import TLMessage, MessageContainer, GzipPacked from ..tl.all_tlobjects import tlobjects from ..tl.types import ( MsgsAck, Pong, BadServerSalt, BadMsgNotification, - MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo -) + MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo, + RpcError) from ..tl.functions.auth import LogOutRequest logging.getLogger(__name__).addHandler(logging.NullHandler()) @@ -32,14 +32,17 @@ class MtProtoSender: in parallel, so thread-safety (hence locking) isn't needed. """ - def __init__(self, session, connection, loop=None): + def __init__(self, session, connection, updates_handler, loop=None): """Creates a new MtProtoSender configured to send messages through 'connection' and using the parameters from 'session'. """ self.session = session self.connection = connection + self.updates_handler = updates_handler self._loop = loop if loop else asyncio.get_event_loop() self._logger = logging.getLogger(__name__) + self._read_lock = asyncio.Lock(loop=self._loop) + self._write_lock = asyncio.Lock(loop=self._loop) # Requests (as msg_id: Message) sent waiting to be received self._pending_receive = {} @@ -56,10 +59,6 @@ class MtProtoSender: self.connection.close() self._clear_all_pending() - def clone(self): - """Creates a copy of this MtProtoSender as a new connection""" - return MtProtoSender(self.session, self.connection.clone(), self._loop) - # region Send and receive async def send(self, *requests): @@ -93,7 +92,7 @@ class MtProtoSender: """Sends a message acknowledge for the given msg_id""" await self._send_message(TLMessage(self.session, MsgsAck([msg_id]))) - async def receive(self, update_state): + async def receive(self): """Receives a single message from the connected endpoint. This method returns nothing, and will only affect other parts @@ -103,6 +102,7 @@ class MtProtoSender: Any unhandled object (likely updates) will be passed to update_state.process(TLObject). """ + await self._read_lock.acquire() try: body = await self.connection.recv() except (BufferError, InvalidChecksumError): @@ -115,10 +115,12 @@ class MtProtoSender: # and just re-invoke them to avoid problems self._clear_all_pending() return + finally: + self._read_lock.release() message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: - await self._process_msg(remote_msg_id, remote_seq, reader, update_state) + await self._process_msg(remote_msg_id, remote_seq, reader) await self._send_acknowledge(remote_msg_id) # endregion @@ -129,7 +131,7 @@ class MtProtoSender: """Sends the given Message(TLObject) encrypted through the network""" plain_text = \ - struct.pack('