From 49e884b005d71cf7db701dfcecea1b5d934b87a8 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 7 Sep 2017 18:25:17 +0200 Subject: [PATCH 1/8] Raise AssertionError if trying to invoke requests from ReadThread --- telethon/telegram_client.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index a7109ced..f35ac39a 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -205,6 +205,10 @@ class TelegramClient(TelegramBareClient): *args will be ignored. """ + if self._recv_thread is not None and \ + threading.get_ident() == self._recv_thread.ident: + raise AssertionError('Cannot invoke requests from the ReadThread') + try: self._lock.acquire() @@ -213,11 +217,10 @@ class TelegramClient(TelegramBareClient): # will be the one which should be reading (but is invoking the # request) thus not being available to read it "in the background" # and it's needed to call receive. - call_receive = self._recv_thread is None or \ - threading.get_ident() == self._recv_thread.ident - # TODO Retry if 'result' is None? 
- return super().invoke(request, call_receive=call_receive) + return super().invoke( + request, call_receive=self._recv_thread is None + ) except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: self._logger.debug('DC error when invoking request, ' From d4f36162cd0c29d4f97f818ac5c5ce04622fa4eb Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 7 Sep 2017 18:49:08 +0200 Subject: [PATCH 2/8] Create and use UpdateState to .process() unhandled TLObjects --- telethon/network/mtproto_sender.py | 47 ++++++++++++------------------ telethon/telegram_bare_client.py | 16 +++++----- telethon/telegram_client.py | 19 +++++++++--- telethon/update_state.py | 38 ++++++++++++++++++++++++ 4 files changed, 79 insertions(+), 41 deletions(-) create mode 100644 telethon/update_state.py diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index a8817a06..ef8b4794 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -35,16 +35,6 @@ class MtProtoSender: # TODO There might be a better way to handle msgs_ack requests self.logging_out = False - # Every unhandled result gets passed to these callbacks, which - # should be functions accepting a single parameter: a TLObject. - # This should only be Update(s), although it can actually be any type. - # - # The thread from which these callbacks are called can be any. - # - # The creator of the MtProtoSender is responsible for setting this - # to point to the list wherever their callbacks reside. - self.unhandled_callbacks = None - def connect(self): """Connects to the server""" self.connection.connect() @@ -90,12 +80,15 @@ class MtProtoSender: del self._need_confirmation[:] - def receive(self): + def receive(self, update_state): """Receives a single message from the connected endpoint. This method returns nothing, and will only affect other parts of the MtProtoSender such as the updates callback being fired or a pending request being confirmed. 
+ + Any unhandled object (likely updates) will be passed to + update_state.process(TLObject). """ # TODO Don't ignore updates self._logger.debug('Receiving a message...') @@ -103,8 +96,7 @@ class MtProtoSender: message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: - self._process_msg( - remote_msg_id, remote_seq, reader, updates=None) + self._process_msg(remote_msg_id, remote_seq, reader, update_state) self._logger.debug('Received message.') @@ -172,7 +164,7 @@ class MtProtoSender: return message, remote_msg_id, remote_sequence - def _process_msg(self, msg_id, sequence, reader, updates): + def _process_msg(self, msg_id, sequence, reader, state): """Processes and handles a Telegram message. Returns True if the message was handled correctly and doesn't @@ -193,10 +185,10 @@ class MtProtoSender: return self._handle_pong(msg_id, sequence, reader) if code == 0x73f1f8dc: # msg_container - return self._handle_container(msg_id, sequence, reader, updates) + return self._handle_container(msg_id, sequence, reader, state) if code == 0x3072cfa1: # gzip_packed - return self._handle_gzip_packed(msg_id, sequence, reader, updates) + return self._handle_gzip_packed(msg_id, sequence, reader, state) if code == 0xedab447b: # bad_server_salt return self._handle_bad_server_salt(msg_id, sequence, reader) @@ -221,16 +213,15 @@ class MtProtoSender: # If the code is not parsed manually then it should be a TLObject. 
if code in tlobjects: result = reader.tgread_object() - if self.unhandled_callbacks: - self._logger.debug( - 'Passing TLObject to callbacks %s', repr(result) - ) - for callback in self.unhandled_callbacks: - callback(result) - else: + if state is None: self._logger.debug( 'Ignoring unhandled TLObject %s', repr(result) ) + else: + self._logger.debug( + 'Processing TLObject %s', repr(result) + ) + state.process(result) return True @@ -261,7 +252,7 @@ class MtProtoSender: return True - def _handle_container(self, msg_id, sequence, reader, updates): + def _handle_container(self, msg_id, sequence, reader, state): self._logger.debug('Handling container') reader.read_int(signed=False) # code size = reader.read_int() @@ -274,8 +265,7 @@ class MtProtoSender: # Note that this code is IMPORTANT for skipping RPC results of # lost requests (i.e., ones from the previous connection session) try: - if not self._process_msg( - inner_msg_id, sequence, reader, updates): + if not self._process_msg(inner_msg_id, sequence, reader, state): reader.set_position(begin_position + inner_length) except: # If any error is raised, something went wrong; skip the packet @@ -366,14 +356,13 @@ class MtProtoSender: self._logger.debug('Lost request will be skipped.') return False - def _handle_gzip_packed(self, msg_id, sequence, reader, updates): + def _handle_gzip_packed(self, msg_id, sequence, reader, state): self._logger.debug('Handling gzip packed data') reader.read_int(signed=False) # code packed_data = reader.tgread_bytes() unpacked_data = gzip.decompress(packed_data) with BinaryReader(unpacked_data) as compressed_reader: - return self._process_msg( - msg_id, sequence, compressed_reader, updates) + return self._process_msg(msg_id, sequence, compressed_reader, state) # endregion diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 8fd94a9d..1685d849 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -26,6 +26,7 @@ from 
.tl.functions.upload import ( ) from .tl.types import InputFile, InputFileBig from .tl.types.upload import FileCdnRedirect +from .update_state import UpdateState from .utils import get_appropriated_part_size @@ -56,7 +57,9 @@ class TelegramBareClient: def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, - proxy=None, timeout=timedelta(seconds=5)): + proxy=None, + enable_updates=False, + timeout=timedelta(seconds=5)): """Initializes the Telegram client with the specified API ID and Hash. Session must always be a Session instance, and an optional proxy can also be specified to be used on the connection. @@ -74,11 +77,9 @@ class TelegramBareClient: # the time since it's a (somewhat expensive) process. self._cached_clients = {} - # Update callbacks (functions accepting a single TLObject) go here - # - # Note that changing the list to which this variable points to - # will not reflect the changes on the existing senders. - self._update_callbacks = [] + # This member will process updates if enabled. + # One may change self.updates.enabled at any later point. + self.updates = UpdateState(enabled=enable_updates) # These will be set later self.dc_options = None @@ -127,7 +128,6 @@ class TelegramBareClient: self.session.save() self._sender = MtProtoSender(connection, self.session) - self._sender.unhandled_callbacks = self._update_callbacks self._sender.connect() # Now it's time to send an InitConnectionRequest @@ -312,7 +312,7 @@ class TelegramBareClient: request.confirm_received.wait() # TODO Socket's timeout here? 
else: while not request.confirm_received.is_set(): - self._sender.receive() + self._sender.receive(update_state=self.updates) if request.rpc_error: raise request.rpc_error diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index f35ac39a..ef83cdaa 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -56,6 +56,7 @@ class TelegramClient(TelegramBareClient): def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, + enable_updates=False, timeout=timedelta(seconds=5), **kwargs): """Initializes the Telegram client with the specified API ID and Hash. @@ -69,6 +70,12 @@ class TelegramClient(TelegramBareClient): This will only affect how messages are sent over the network and how much processing is required before sending them. + If 'enable_updates' is set to True, it will by default put + all updates on self.updates. NOTE that you must manually query + this from another thread or it will eventually fill up all your + memory. If you want to ignore updates, leave this set to False. + You may change self.updates.enabled at any later point. + If more named arguments are provided as **kwargs, they will be used to update the Session instance. 
Most common settings are: device_model = platform.node() @@ -92,7 +99,10 @@ class TelegramClient(TelegramBareClient): super().__init__( session, api_id, api_hash, - connection_mode=connection_mode, proxy=proxy, timeout=timeout + connection_mode=connection_mode, + proxy=proxy, + enable_updates=enable_updates, + timeout=timeout ) # Safety across multiple threads (for the updates thread) @@ -407,8 +417,6 @@ class TelegramClient(TelegramBareClient): no_webpage=not link_preview ) result = self(request) - for callback in self._update_callbacks: - callback(result) return request.random_id def get_message_history(self, @@ -899,12 +907,15 @@ class TelegramClient(TelegramBareClient): def add_update_handler(self, handler): """Adds an update handler (a function which takes a TLObject, an update, as its parameter) and listens for updates""" + return # TODO Implement self._update_callbacks.append(handler) def remove_update_handler(self, handler): + return # TODO Implement self._update_callbacks.remove(handler) def list_update_handlers(self): + return # TODO Implement return self._update_callbacks[:] # endregion @@ -921,7 +932,7 @@ class TelegramClient(TelegramBareClient): def _recv_thread_impl(self): while self._sender and self._sender.is_connected(): try: - self._sender.receive() + self._sender.receive(update_state=self.updates) except TimeoutError: # No problem. pass diff --git a/telethon/update_state.py b/telethon/update_state.py new file mode 100644 index 00000000..14d362a9 --- /dev/null +++ b/telethon/update_state.py @@ -0,0 +1,38 @@ +from threading import Lock, Event +from collections import deque + + +class UpdateState: + """Used to hold the current state of processed updates. + To retrieve an update, .pop_update() should be called. 
+ """ + def __init__(self, enabled): + self.enabled = enabled + self._updates_lock = Lock() + self._updates_available = Event() + self._updates = deque() + + def has_any(self): + """Returns True if a call to .pop_update() won't lock""" + return self._updates_available.is_set() + + def pop(self): + """Pops an update or blocks until an update object is available""" + self._updates_available.wait() + with self._updates_lock: + update = self._updates.popleft() + if not self._updates: + self._updates_available.clear() + + return update + + def process(self, update): + """Processes an update object. This method is normally called by + the library itself. + """ + if not self.enabled: + return + + with self._updates_lock: + self._updates.append(update) + self._updates_available.set() From d23737520880010e6de068e6f600f2023e941ebd Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 7 Sep 2017 18:58:54 +0200 Subject: [PATCH 3/8] Allow adding callback methods to UpdateState --- telethon/telegram_client.py | 9 +++------ telethon/update_state.py | 13 ++++++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index ef83cdaa..545633e7 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -907,16 +907,13 @@ class TelegramClient(TelegramBareClient): def add_update_handler(self, handler): """Adds an update handler (a function which takes a TLObject, an update, as its parameter) and listens for updates""" - return # TODO Implement - self._update_callbacks.append(handler) + self.updates.handlers.append(handler) def remove_update_handler(self, handler): - return # TODO Implement - self._update_callbacks.remove(handler) + self.updates.handlers.remove(handler) def list_update_handlers(self): - return # TODO Implement - return self._update_callbacks[:] + return self.updates.handlers[:] # endregion diff --git a/telethon/update_state.py b/telethon/update_state.py index 14d362a9..fd1a0935 
100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -8,6 +8,7 @@ class UpdateState: """ def __init__(self, enabled): self.enabled = enabled + self.handlers = [] self._updates_lock = Lock() self._updates_available = Event() self._updates = deque() @@ -30,9 +31,11 @@ class UpdateState: """Processes an update object. This method is normally called by the library itself. """ - if not self.enabled: - return + for handler in self.handlers: + handler(update) + + if self.enabled: + with self._updates_lock: + self._updates.append(update) + self._updates_available.set() - with self._updates_lock: - self._updates.append(update) - self._updates_available.set() From b8e881b6b6281d2b787f1815b344979d29b748e7 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 7 Sep 2017 20:17:40 +0200 Subject: [PATCH 4/8] Add basic updates processing to ignore updates with lower .pts --- telethon/telegram_client.py | 14 ++++++++++- telethon/update_state.py | 25 +++++++++++++------ .../interactive_telegram_client.py | 4 ++- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 545633e7..73ebcdbf 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -1,8 +1,8 @@ import os +import threading from datetime import datetime, timedelta from mimetypes import guess_type from threading import RLock, Thread -import threading from . import TelegramBareClient from . 
import helpers as utils @@ -27,6 +27,9 @@ from .tl.functions.messages import ( GetDialogsRequest, GetHistoryRequest, ReadHistoryRequest, SendMediaRequest, SendMessageRequest ) +from .tl.functions.updates import ( + GetStateRequest +) from .tl.functions.users import ( GetUsersRequest ) @@ -155,6 +158,8 @@ class TelegramClient(TelegramBareClient): target=self._recv_thread_impl ) self._recv_thread.start() + if self.updates.enabled: + self.sync_updates() return ok @@ -904,6 +909,13 @@ class TelegramClient(TelegramBareClient): # region Updates handling + def sync_updates(self): + """Synchronizes self.updates to their initial state. Will be + called automatically on connection if self.updates.enabled = True, + otherwise it should be called manually after enabling updates. + """ + self.updates.process(self(GetStateRequest())) + def add_update_handler(self, handler): """Adds an update handler (a function which takes a TLObject, an update, as its parameter) and listens for updates""" diff --git a/telethon/update_state.py b/telethon/update_state.py index fd1a0935..6d0b40cd 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -1,5 +1,8 @@ -from threading import Lock, Event from collections import deque +from datetime import datetime +from threading import RLock, Event + +from .tl import types as tl class UpdateState: @@ -9,10 +12,13 @@ class UpdateState: def __init__(self, enabled): self.enabled = enabled self.handlers = [] - self._updates_lock = Lock() + self._updates_lock = RLock() self._updates_available = Event() self._updates = deque() + # https://core.telegram.org/api/updates + self._state = tl.updates.State(0, 0, datetime.now(), 0, 0) + def has_any(self): """Returns True if a call to .pop_update() won't lock""" return self._updates_available.is_set() @@ -31,11 +37,16 @@ class UpdateState: """Processes an update object. This method is normally called by the library itself. 
""" - for handler in self.handlers: - handler(update) + if not self.enabled: + return + + with self._updates_lock: + if isinstance(update, tl.updates.State): + self._state = update + elif not hasattr(update, 'pts') or update.pts > self._state.pts: + self._state.pts = getattr(update, 'pts', self._state.pts) + for handler in self.handlers: + handler(update) - if self.enabled: - with self._updates_lock: self._updates.append(update) self._updates_available.set() - diff --git a/telethon_examples/interactive_telegram_client.py b/telethon_examples/interactive_telegram_client.py index ae858cb7..f4ef57d4 100644 --- a/telethon_examples/interactive_telegram_client.py +++ b/telethon_examples/interactive_telegram_client.py @@ -51,7 +51,9 @@ class InteractiveTelegramClient(TelegramClient): print('Initializing interactive example...') super().__init__( session_user_id, api_id, api_hash, - connection_mode=ConnectionMode.TCP_ABRIDGED, proxy=proxy + connection_mode=ConnectionMode.TCP_ABRIDGED, + enable_updates=True, + proxy=proxy ) # Store all the found media in memory here, From a24b4020fe6da7327b1771d66563137a4982ddf0 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 7 Sep 2017 20:29:51 +0200 Subject: [PATCH 5/8] Allow adding update handlers without the need to poll updates --- telethon/telegram_bare_client.py | 3 ++- telethon/telegram_client.py | 19 ++++++++++++++----- telethon/update_state.py | 21 ++++++++++++++++----- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 1685d849..1ca4a6d9 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -59,6 +59,7 @@ class TelegramBareClient: connection_mode=ConnectionMode.TCP_FULL, proxy=None, enable_updates=False, + active_updates_polling=False, timeout=timedelta(seconds=5)): """Initializes the Telegram client with the specified API ID and Hash. 
Session must always be a Session instance, and an optional proxy @@ -79,7 +80,7 @@ class TelegramBareClient: # This member will process updates if enabled. # One may change self.updates.enabled at any later point. - self.updates = UpdateState(enabled=enable_updates) + self.updates = UpdateState(enable_updates, active_updates_polling) # These will be set later self.dc_options = None diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 73ebcdbf..f0add2d4 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -60,6 +60,7 @@ class TelegramClient(TelegramBareClient): connection_mode=ConnectionMode.TCP_FULL, proxy=None, enable_updates=False, + active_updates_polling=False, timeout=timedelta(seconds=5), **kwargs): """Initializes the Telegram client with the specified API ID and Hash. @@ -73,11 +74,18 @@ class TelegramClient(TelegramBareClient): This will only affect how messages are sent over the network and how much processing is required before sending them. - If 'enable_updates' is set to True, it will by default put - all updates on self.updates. NOTE that you must manually query - this from another thread or it will eventually fill up all your - memory. If you want to ignore updates, leave this set to False. - You may change self.updates.enabled at any later point. + If 'enable_updates' is set to True, it will process incoming + updates to ensure that no duplicates are received, and update + handlers will be invoked. You CANNOT invoke requests from within + these handlers. + + In order to invoke requests upon receiving an update, you must + have your own thread (or use the main thread) and enable set + 'active_updates_polling' to True. You must call self.updates.poll() + or you'll memory will be filled with unhandled updates. + + You can also modify 'self.updates.enabled' and + 'self.updates.set_polling()' at any later point. 
If more named arguments are provided as **kwargs, they will be used to update the Session instance. Most common settings are: @@ -105,6 +113,7 @@ class TelegramClient(TelegramBareClient): connection_mode=connection_mode, proxy=proxy, enable_updates=enable_updates, + active_updates_polling=active_updates_polling, timeout=timeout ) diff --git a/telethon/update_state.py b/telethon/update_state.py index 6d0b40cd..5ad17ab3 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -9,8 +9,9 @@ class UpdateState: """Used to hold the current state of processed updates. To retrieve an update, .pop_update() should be called. """ - def __init__(self, enabled): + def __init__(self, enabled, store_updates): self.enabled = enabled + self._store_updates = store_updates self.handlers = [] self._updates_lock = RLock() self._updates_available = Event() @@ -23,8 +24,11 @@ class UpdateState: """Returns True if a call to .pop_update() won't lock""" return self._updates_available.is_set() - def pop(self): - """Pops an update or blocks until an update object is available""" + def poll(self): + """Polls an update or blocks until an update object is available""" + if not self._store_updates: + raise ValueError('Polling updates is not enabled.') + self._updates_available.wait() with self._updates_lock: update = self._updates.popleft() @@ -33,6 +37,12 @@ class UpdateState: return update + def set_polling(self, store): + self._store_updates = store + if not store: + with self._updates_lock: + self._updates.clear() + def process(self, update): """Processes an update object. This method is normally called by the library itself. 
@@ -48,5 +58,6 @@ class UpdateState: for handler in self.handlers: handler(update) - self._updates.append(update) - self._updates_available.set() + if self._store_updates: + self._updates.append(update) + self._updates_available.set() From 16a5ab30703cd1f8ad4f3ce0c7af54db28b105be Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 7 Sep 2017 21:23:37 +0200 Subject: [PATCH 6/8] Add back the periodic PingRequest --- telethon/telegram_client.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index f0add2d4..3ac6e65f 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -13,6 +13,7 @@ from .errors import ( ) from .network import ConnectionMode from .tl import Session, TLObject +from .tl.functions import PingRequest from .tl.functions.account import ( GetPasswordRequest ) @@ -136,6 +137,10 @@ class TelegramClient(TelegramBareClient): # Constantly read for results and updates from within the main client self._recv_thread = None + # Default PingRequest delay + self._last_ping = datetime.now() + self._ping_delay = timedelta(minutes=1) + # endregion # region Connecting @@ -950,6 +955,12 @@ class TelegramClient(TelegramBareClient): def _recv_thread_impl(self): while self._sender and self._sender.is_connected(): try: + if datetime.now() > self._last_ping + self._ping_delay: + self._sender.send(PingRequest( + int.from_bytes(os.urandom(8), 'big', signed=True) + )) + self._last_ping = datetime.now() + self._sender.receive(update_state=self.updates) except TimeoutError: # No problem. 
From 25bbb20b0c234458792d5122f51a5e5cdc54137c Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 7 Sep 2017 21:32:46 +0200 Subject: [PATCH 7/8] Use RLocks properly on MtProtoSender (only needed on net IO) --- telethon/network/mtproto_sender.py | 44 ++++++++++++------------------ telethon/telegram_client.py | 13 +-------- 2 files changed, 19 insertions(+), 38 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index ef8b4794..674a4e03 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -28,8 +28,10 @@ class MtProtoSender: self._need_confirmation = [] # Message IDs that need confirmation self._pending_receive = [] # Requests sent waiting to be received - # Store an RLock instance to make this class safely multi-threaded - self._lock = RLock() + # Sending and receiving are independent, but two threads cannot + # send or receive at the same time no matter what. + self._send_lock = RLock() + self._recv_lock = RLock() # Used when logging out, the only request that seems to use 'ack' # TODO There might be a better way to handle msgs_ack requests @@ -52,23 +54,17 @@ class MtProtoSender: """Sends the specified MTProtoRequest, previously sending any message which needed confirmation.""" - # Now only us can be using this method - with self._lock: - self._logger.debug('send() acquired the lock') + # If any message needs confirmation send an AckRequest first + self._send_acknowledges() - # If any message needs confirmation send an AckRequest first - self._send_acknowledges() + # Finally send our packed request + with BinaryWriter() as writer: + request.on_send(writer) + self._send_packet(writer.get_bytes(), request) + self._pending_receive.append(request) - # Finally send our packed request - with BinaryWriter() as writer: - request.on_send(writer) - self._send_packet(writer.get_bytes(), request) - self._pending_receive.append(request) - - # And update the saved session - self.session.save() 
- - self._logger.debug('send() released the lock') + # And update the saved session + self.session.save() def _send_acknowledges(self): """Sends a messages acknowledge for all those who _need_confirmation""" @@ -90,16 +86,13 @@ class MtProtoSender: Any unhandled object (likely updates) will be passed to update_state.process(TLObject). """ - # TODO Don't ignore updates - self._logger.debug('Receiving a message...') - body = self.connection.recv() - message, remote_msg_id, remote_seq = self._decode_msg(body) + with self._recv_lock: + body = self.connection.recv() + message, remote_msg_id, remote_seq = self._decode_msg(body) with BinaryReader(message) as reader: self._process_msg(remote_msg_id, remote_seq, reader, update_state) - self._logger.debug('Received message.') - # endregion # region Low level processing @@ -107,8 +100,6 @@ class MtProtoSender: def _send_packet(self, packet, request): """Sends the given packet bytes with the additional information of the original request. - - This does NOT lock the threads! """ request.request_msg_id = self.session.get_new_msg_id() @@ -134,7 +125,8 @@ class MtProtoSender: self.session.auth_key.key_id, signed=False) cipher_writer.write(msg_key) cipher_writer.write(cipher_text) - self.connection.send(cipher_writer.get_bytes()) + with self._send_lock: + self.connection.send(cipher_writer.get_bytes()) def _decode_msg(self, body): """Decodes an received encrypted message body bytes""" diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 3ac6e65f..17e2ccb4 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -2,7 +2,7 @@ import os import threading from datetime import datetime, timedelta from mimetypes import guess_type -from threading import RLock, Thread +from threading import Thread from . import TelegramBareClient from . 
import helpers as utils @@ -50,9 +50,6 @@ class TelegramClient(TelegramBareClient): As opposed to the TelegramBareClient, this one features downloading media from different data centers, starting a second thread to handle updates, and some very common functionality. - - This should be used when the (slight) overhead of having locks, - threads, and possibly multiple connections is not an issue. """ # region Initialization @@ -118,9 +115,6 @@ class TelegramClient(TelegramBareClient): timeout=timeout ) - # Safety across multiple threads (for the updates thread) - self._lock = RLock() - # Used on connection - the user may modify these and reconnect kwargs['app_version'] = kwargs.get('app_version', self.__version__) for name, value in kwargs.items(): @@ -239,8 +233,6 @@ class TelegramClient(TelegramBareClient): raise AssertionError('Cannot invoke requests from the ReadThread') try: - self._lock.acquire() - # Users may call this method from within some update handler. # If this is the case, then the thread invoking the request # will be the one which should be reading (but is invoking the @@ -259,9 +251,6 @@ class TelegramClient(TelegramBareClient): self.reconnect(new_dc=e.new_dc) return self.invoke(request) - finally: - self._lock.release() - # Let people use client(SomeRequest()) instead client.invoke(...) 
__call__ = invoke From c81537bed070e3115f894821c83a3336e66c0a0c Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Fri, 8 Sep 2017 12:54:38 +0200 Subject: [PATCH 8/8] Simplify the workflow with UpdateState exposing a single flag param --- telethon/telegram_bare_client.py | 5 ++-- telethon/telegram_client.py | 28 ++++++++--------- telethon/update_state.py | 30 +++++++++++-------- .../interactive_telegram_client.py | 1 - 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 1ca4a6d9..e86d15f4 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -58,8 +58,7 @@ class TelegramBareClient: def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - enable_updates=False, - active_updates_polling=False, + process_updates=False, timeout=timedelta(seconds=5)): """Initializes the Telegram client with the specified API ID and Hash. Session must always be a Session instance, and an optional proxy @@ -80,7 +79,7 @@ class TelegramBareClient: # This member will process updates if enabled. # One may change self.updates.enabled at any later point. - self.updates = UpdateState(enable_updates, active_updates_polling) + self.updates = UpdateState(process_updates) # These will be set later self.dc_options = None diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 17e2ccb4..babc5499 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -57,8 +57,7 @@ class TelegramClient(TelegramBareClient): def __init__(self, session, api_id, api_hash, connection_mode=ConnectionMode.TCP_FULL, proxy=None, - enable_updates=False, - active_updates_polling=False, + process_updates=False, timeout=timedelta(seconds=5), **kwargs): """Initializes the Telegram client with the specified API ID and Hash. 
@@ -72,18 +71,15 @@ class TelegramClient(TelegramBareClient): This will only affect how messages are sent over the network and how much processing is required before sending them. - If 'enable_updates' is set to True, it will process incoming - updates to ensure that no duplicates are received, and update - handlers will be invoked. You CANNOT invoke requests from within - these handlers. + If 'process_updates' is set to True, incoming updates will be + processed and you must manually call 'self.updates.poll()' from + another thread to retrieve the saved update objects, or your + memory will fill with these. You may modify the value of + 'self.updates.polling' at any later point. - In order to invoke requests upon receiving an update, you must - have your own thread (or use the main thread) and enable set - 'active_updates_polling' to True. You must call self.updates.poll() - or you'll memory will be filled with unhandled updates. - - You can also modify 'self.updates.enabled' and - 'self.updates.set_polling()' at any later point. + Regardless of the value of 'process_updates', if you later call + '.add_update_handler(...)', updates will also be processed + and the update objects will be passed to the handlers you added. If more named arguments are provided as **kwargs, they will be used to update the Session instance. 
Most common settings are: @@ -110,8 +106,7 @@ class TelegramClient(TelegramBareClient): session, api_id, api_hash, connection_mode=connection_mode, proxy=proxy, - enable_updates=enable_updates, - active_updates_polling=active_updates_polling, + process_updates=process_updates, timeout=timeout ) @@ -922,7 +917,10 @@ class TelegramClient(TelegramBareClient): def add_update_handler(self, handler): """Adds an update handler (a function which takes a TLObject, an update, as its parameter) and listens for updates""" + sync = not self.updates.handlers self.updates.handlers.append(handler) + if sync: + self.sync_updates() def remove_update_handler(self, handler): self.updates.handlers.remove(handler) diff --git a/telethon/update_state.py b/telethon/update_state.py index 5ad17ab3..3ceb87cb 100644 --- a/telethon/update_state.py +++ b/telethon/update_state.py @@ -7,11 +7,10 @@ from .tl import types as tl class UpdateState: """Used to hold the current state of processed updates. - To retrieve an update, .pop_update() should be called. + To retrieve an update, .poll() should be called. 
""" - def __init__(self, enabled, store_updates): - self.enabled = enabled - self._store_updates = store_updates + def __init__(self, polling): + self._polling = polling self.handlers = [] self._updates_lock = RLock() self._updates_available = Event() @@ -20,14 +19,14 @@ class UpdateState: # https://core.telegram.org/api/updates self._state = tl.updates.State(0, 0, datetime.now(), 0, 0) - def has_any(self): - """Returns True if a call to .pop_update() won't lock""" + def can_poll(self): + """Returns True if a call to .poll() won't block""" return self._updates_available.is_set() def poll(self): """Polls an update or blocks until an update object is available""" - if not self._store_updates: - raise ValueError('Polling updates is not enabled.') + if not self._polling: + raise ValueError('Updates are not being polled hence not saved.') self._updates_available.wait() with self._updates_lock: @@ -37,17 +36,22 @@ class UpdateState: return update - def set_polling(self, store): - self._store_updates = store - if not store: + def get_polling(self): + return self._polling + + def set_polling(self, polling): + self._polling = polling + if not polling: with self._updates_lock: self._updates.clear() + polling = property(fget=get_polling, fset=set_polling) + def process(self, update): """Processes an update object. This method is normally called by the library itself. 
""" - if not self.enabled: + if not self._polling and not self.handlers: return with self._updates_lock: @@ -58,6 +62,6 @@ class UpdateState: for handler in self.handlers: handler(update) - if self._store_updates: + if self._polling: self._updates.append(update) self._updates_available.set() diff --git a/telethon_examples/interactive_telegram_client.py b/telethon_examples/interactive_telegram_client.py index f4ef57d4..b319dc56 100644 --- a/telethon_examples/interactive_telegram_client.py +++ b/telethon_examples/interactive_telegram_client.py @@ -52,7 +52,6 @@ class InteractiveTelegramClient(TelegramClient): super().__init__( session_user_id, api_id, api_hash, connection_mode=ConnectionMode.TCP_ABRIDGED, - enable_updates=True, proxy=proxy )