diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 4345908e..602353c0 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -1,6 +1,6 @@ import gzip from datetime import timedelta -from threading import RLock +from threading import RLock, Thread from .. import helpers as utils from ..crypto import AES @@ -31,9 +31,16 @@ class MtProtoSender: # TODO There might be a better way to handle msgs_ack requests self.logging_out = False + # Reading and writing shouldn't be related. Call .recv() forever here. + # TODO Maybe this could be disabled with some "constant_read=bool". + self._recv_thread = Thread( + name='ReadThread', daemon=True, target=self._recv_thread_impl + ) + def connect(self): """Connects to the server""" self.connection.connect() + self._recv_thread.start() def is_connected(self): return self.connection.is_connected() @@ -76,57 +83,30 @@ class MtProtoSender: del self._need_confirmation[:] - def receive(self, request=None, updates=None, **kwargs): - """Receives the specified MTProtoRequest ("fills in it" - the received data). This also restores the updates thread. + def _recv_thread_impl(self): + while self.is_connected(): + try: + self._receive_message() + except TimeoutError: + # No problem. + pass + + def _receive_message(self, **kwargs): + """Receives a single message from the connected endpoint. An optional named parameter 'timeout' can be specified if one desires to override 'self.connection.timeout'. - - If 'request' is None, a single item will be read into - the 'updates' list (which cannot be None). - - If 'request' is not None, any update received before - reading the request's result will be put there unless - it's None, in which case updates will be ignored. """ - if request is None and updates is None: - raise ValueError('Both the "request" and "updates"' - 'parameters cannot be None at the same time.') + # TODO Don't ignore updates + self._logger.debug('Receiving a message...') + body = self.connection.recv(**kwargs) + message, remote_msg_id, remote_seq = self._decode_msg(body) - with self._lock: - self._logger.debug('receive() acquired the lock') - # Don't stop trying to receive until we get the request we wanted - # or, if there is no request, until we read an update - while (request and not request.confirm_received) or \ - (not request and not updates): - self._logger.debug('Trying to .receive() the request result...') - body = self.connection.recv(**kwargs) - message, remote_msg_id, remote_seq = self._decode_msg(body) + with BinaryReader(message) as reader: + self._process_msg( + remote_msg_id, remote_seq, reader, updates=None) - with BinaryReader(message) as reader: - self._process_msg( - remote_msg_id, remote_seq, reader, updates) - - # We're done receiving, remove the request from pending, if any - if request: - try: - self._pending_receive.remove(request) - except ValueError: pass - - self._logger.debug('Request result received') - self._logger.debug('receive() released the lock') - - def receive_updates(self, **kwargs): - """Wrapper for .receive(request=None, updates=[])""" - updates = [] - self.receive(updates=updates, **kwargs) - return updates - - def cancel_receive(self): - """Cancels any pending receive operation - by raising a ReadCancelledError""" - self.connection.cancel_receive() + self._logger.debug('Received message.') # endregion diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 22075dc5..716f70ab 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -1,5 +1,5 @@ import logging -import pyaes +from time import sleep from datetime import timedelta from hashlib import md5 from os import path @@ -318,7 +318,8 @@ class TelegramBareClient: try: self._sender.send(request) - self._sender.receive(request, updates=updates) + while not request.confirm_received: + sleep(0.1) # TODO Use a proper lock return request.result except ConnectionResetError: diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 3eb056da..5c09f415 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -184,9 +184,6 @@ class TelegramClient(TelegramBareClient): *args will be ignored. """ - if self._updates_thread_receiving.is_set(): - self._sender.cancel_receive() - try: self._lock.acquire() @@ -918,7 +915,8 @@ class TelegramClient(TelegramBareClient): else: self._updates_thread_running.clear() if self._updates_thread_receiving.is_set(): - self._sender.cancel_receive() + # self._sender.cancel_receive() + pass def _updates_thread_method(self): """This method will run until specified and listen for incoming updates""" @@ -944,7 +942,8 @@ class TelegramClient(TelegramBareClient): self._next_ping_at = time() + self.ping_interval self(PingRequest(utils.generate_random_long())) - updates = self._sender.receive_updates(timeout=timeout) + #updates = self._sender.receive_updates(timeout=timeout) + updates = [] self._updates_thread_receiving.clear() self._logger.debug(