From 7d21b4040161815b7cb8247668ea3bcb6d44db95 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Tue, 30 Aug 2022 12:32:21 +0200 Subject: [PATCH] Revert "Make sessions async" This reverts commit d2de0f3acae7ab8d2df4897176bd7259d1dec426. --- telethon/client/auth.py | 2 +- telethon/client/downloads.py | 4 +-- telethon/client/messages.py | 2 +- telethon/client/telegrambaseclient.py | 25 +++++++++------- telethon/client/updates.py | 6 +--- telethon/client/users.py | 19 +++--------- telethon/network/mtprotosender.py | 2 +- telethon/sessions/abstract.py | 43 ++++++++++++++++++++++----- telethon/sessions/memory.py | 36 +++++++++++----------- 9 files changed, 78 insertions(+), 61 deletions(-) diff --git a/telethon/client/auth.py b/telethon/client/auth.py index 47cec6d0..8d7d68d3 100644 --- a/telethon/client/auth.py +++ b/telethon/client/auth.py @@ -614,7 +614,7 @@ class AuthMethods: self._authorized = False await self.disconnect() - await self.session.delete() + self.session.delete() return True async def edit_2fa( diff --git a/telethon/client/downloads.py b/telethon/client/downloads.py index bace4211..6d7a8d65 100644 --- a/telethon/client/downloads.py +++ b/telethon/client/downloads.py @@ -55,7 +55,7 @@ class _DirectDownloadIter(RequestIter): if option.ip_address == self.client.session.server_address: self.client.session.set_dc( option.id, option.ip_address, option.port) - await self.client.session.save() + self.client.session.save() break # TODO Figure out why the session may have the wrong DC ID @@ -402,7 +402,7 @@ class DownloadMethods: if isinstance(message.action, types.MessageActionChatEditPhoto): media = media.photo - + if isinstance(media, types.MessageMediaWebPage): if isinstance(media.webpage, types.WebPage): media = media.webpage.document or media.webpage.photo diff --git a/telethon/client/messages.py b/telethon/client/messages.py index 24ec8c92..60db3521 100644 --- a/telethon/client/messages.py +++ b/telethon/client/messages.py @@ -1019,7 +1019,7 @@ class MessageMethods: async def edit_message( self: 'TelegramClient', entity: 'typing.Union[hints.EntityLike, types.Message]', - message: 'hints.MessageIDLike' = None, + message: 'hints.MessageLike' = None, text: str = None, *, parse_mode: str = (), diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index 7f154a97..d4e353e4 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -398,6 +398,11 @@ class TelegramBaseClient(abc.ABC): self._authorized = None # None = unknown, False = no, True = yes + # Update state (for catching up after a disconnection) + # TODO Get state from channels too + self._state_cache = StateCache( + self.session.get_update_state(0), self._log) + # Some further state for subclasses self._event_builders = [] @@ -535,13 +540,13 @@ class TelegramBaseClient(abc.ABC): return self.session.auth_key = self._sender.auth_key - await self.session.save() + self.session.save() if self._catch_up: ss = SessionState(0, 0, False, 0, 0, 0, 0, None) cs = [] - for entity_id, state in await self.session.get_update_states(): + for entity_id, state in self.session.get_update_states(): if entity_id == 0: # TODO current session doesn't store self-user info but adding that is breaking on downstream session impls ss = SessionState(0, 0, False, state.pts, state.qts, int(state.date.timestamp()), state.seq, None) @@ -550,7 +555,7 @@ class TelegramBaseClient(abc.ABC): self._message_box.load(ss, cs) for state in cs: - entity = await self.session.get_input_entity(state.channel_id) + entity = self.session.get_input_entity(state.channel_id) if entity: self._mb_entity_cache.put(Entity(EntityType.CHANNEL, entity.channel_id, entity.access_hash)) @@ -670,15 +675,15 @@ class TelegramBaseClient(abc.ABC): # Piggy-back on an arbitrary TL type with users and chats so the session can understand to read the entities. # It doesn't matter if we put users in the list of chats. - await self.session.process_entities(types.contacts.ResolvedPeer(None, [e._as_input_peer() for e in entities], [])) + self.session.process_entities(types.contacts.ResolvedPeer(None, [e._as_input_peer() for e in entities], [])) ss, cs = self._message_box.session_state() - await self.session.set_update_state(0, types.updates.State(**ss, unread_count=0)) + self.session.set_update_state(0, types.updates.State(**ss, unread_count=0)) now = datetime.datetime.now() # any datetime works; channels don't need it for channel_id, pts in cs.items(): - await self.session.set_update_state(channel_id, types.updates.State(pts, 0, now, 0, unread_count=0)) + self.session.set_update_state(channel_id, types.updates.State(pts, 0, now, 0, unread_count=0)) - await self.session.close() + self.session.close() async def _disconnect(self: 'TelegramClient'): """ @@ -704,17 +709,17 @@ class TelegramBaseClient(abc.ABC): # so it's not valid anymore. Set to None to force recreating it. self._sender.auth_key.key = None self.session.auth_key = None - await self.session.save() + self.session.save() await self._disconnect() return await self.connect() - async def _auth_key_callback(self: 'TelegramClient', auth_key): + def _auth_key_callback(self: 'TelegramClient', auth_key): """ Callback from the sender whenever it needed to generate a new authorization key. This means we are not authorized. """ self.session.auth_key = auth_key - await self.session.save() + self.session.save() # endregion diff --git a/telethon/client/updates.py b/telethon/client/updates.py index 03a66104..538469f0 100644 --- a/telethon/client/updates.py +++ b/telethon/client/updates.py @@ -397,11 +397,7 @@ class UpdateMethods: # inserted because this is a rather expensive operation # (default's sqlite3 takes ~0.1s to commit changes). Do # it every minute instead. No-op if there's nothing new. - try: - await self.session.save() - except OSError as e: - # No big deal if this cannot be immediately saved - self._log[__name__].warning('Could not perform the periodic save of session data: %s: %s', type(e), e) + self.session.save() async def _dispatch_update(self: 'TelegramClient', update): # TODO only used for AlbumHack, and MessageBox is not really designed for this diff --git a/telethon/client/users.py b/telethon/client/users.py index ee76095c..22db969e 100644 --- a/telethon/client/users.py +++ b/telethon/client/users.py @@ -71,11 +71,7 @@ class UserMethods: exceptions.append(e) results.append(None) continue - try: - await self.session.process_entities(result) - except OSError as e: - self._log[__name__].warning( - 'Failed to save possibly new entities to the session: %s: %s', type(e), e) + self.session.process_entities(result) self._entity_cache.add(result) exceptions.append(None) results.append(result) @@ -86,14 +82,7 @@ class UserMethods: return results else: result = await future - # This is called pretty often, and it's okay if it fails every now and then. - # It only means certain entities won't be saved. - try: - await self.session.process_entities(result) - except OSError as e: - self._log[__name__].warning( - 'Failed to save possibly new entities to the session: %s: %s', type(e), e) - + self.session.process_entities(result) self._entity_cache.add(result) return result except (errors.ServerError, errors.RpcCallFailError, @@ -438,7 +427,7 @@ class UserMethods: # No InputPeer, cached peer, or known string. Fetch from disk cache try: - return await self.session.get_input_entity(peer) + return self.session.get_input_entity(peer) except ValueError: pass @@ -578,7 +567,7 @@ class UserMethods: try: # Nobody with this username, maybe it's an exact name/title return await self.get_entity( - await self.session.get_input_entity(string)) + self.session.get_input_entity(string)) except ValueError: pass diff --git a/telethon/network/mtprotosender.py b/telethon/network/mtprotosender.py index c8004fde..86b48e7a 100644 --- a/telethon/network/mtprotosender.py +++ b/telethon/network/mtprotosender.py @@ -296,7 +296,7 @@ class MTProtoSender: # notify whenever we change it. This is crucial when we # switch to different data centers. if self._auth_key_callback: - await self._auth_key_callback(self.auth_key) + self._auth_key_callback(self.auth_key) self._log.debug('auth_key generation success!') return True diff --git a/telethon/sessions/abstract.py b/telethon/sessions/abstract.py index 75dcdd00..8c0a717e 100644 --- a/telethon/sessions/abstract.py +++ b/telethon/sessions/abstract.py @@ -79,7 +79,7 @@ class Session(ABC): raise NotImplementedError @abstractmethod - async def get_update_state(self, entity_id): + def get_update_state(self, entity_id): """ Returns the ``UpdateState`` associated with the given `entity_id`. If the `entity_id` is 0, it should return the ``UpdateState`` for @@ -89,7 +89,7 @@ class Session(ABC): raise NotImplementedError @abstractmethod - async def set_update_state(self, entity_id, state): + def set_update_state(self, entity_id, state): """ Sets the given ``UpdateState`` for the specified `entity_id`, which should be 0 if the ``UpdateState`` is the "general" state (and not @@ -103,15 +103,14 @@ class Session(ABC): Returns an iterable over all known pairs of ``(entity ID, update state)``. """ - @abstractmethod - async def close(self): + def close(self): """ Called on client disconnection. Should be used to free any used resources. Can be left empty if none. """ @abstractmethod - async def save(self): + def save(self): """ Called whenever important properties change. It should make persist the relevant session information to disk. @@ -119,15 +118,22 @@ class Session(ABC): raise NotImplementedError @abstractmethod - async def delete(self): + def delete(self): """ Called upon client.log_out(). Should delete the stored information from disk since it's not valid anymore. """ raise NotImplementedError + @classmethod + def list_sessions(cls): + """ + Lists available sessions. Not used by the library itself. + """ + return [] + @abstractmethod - async def process_entities(self, tlo): + def process_entities(self, tlo): """ Processes the input ``TLObject`` or ``list`` and saves whatever information is relevant (e.g., ID or access hash). @@ -135,7 +141,7 @@ class Session(ABC): raise NotImplementedError @abstractmethod - async def get_input_entity(self, key): + def get_input_entity(self, key): """ Turns the given key into an ``InputPeer`` (e.g. ``InputPeerUser``). The library uses this method whenever an ``InputPeer`` is needed @@ -143,3 +149,24 @@ class Session(ABC): to use a cached username to avoid extra RPC). """ raise NotImplementedError + + @abstractmethod + def cache_file(self, md5_digest, file_size, instance): + """ + Caches the given file information persistently, so that it + doesn't need to be re-uploaded in case the file is used again. + + The ``instance`` will be either an ``InputPhoto`` or ``InputDocument``, + both with an ``.id`` and ``.access_hash`` attributes. + """ + raise NotImplementedError + + @abstractmethod + def get_file(self, md5_digest, file_size, cls): + """ + Returns an instance of ``cls`` if the ``md5_digest`` and ``file_size`` + match an existing saved record. The class will either be an + ``InputPhoto`` or ``InputDocument``, both with two parameters + ``id`` and ``access_hash`` in that order. + """ + raise NotImplementedError diff --git a/telethon/sessions/memory.py b/telethon/sessions/memory.py index 2f8e1ad6..27d7907b 100644 --- a/telethon/sessions/memory.py +++ b/telethon/sessions/memory.py @@ -71,22 +71,22 @@ class MemorySession(Session): def takeout_id(self, value): self._takeout_id = value - async def get_update_state(self, entity_id): + def get_update_state(self, entity_id): return self._update_states.get(entity_id, None) - async def set_update_state(self, entity_id, state): + def set_update_state(self, entity_id, state): self._update_states[entity_id] = state async def get_update_states(self): return self._update_states.items() - async def close(self): + def close(self): pass - async def save(self): + def save(self): pass - async def delete(self): + def delete(self): pass @staticmethod @@ -147,31 +147,31 @@ class MemorySession(Session): rows.append(row) return rows - async def process_entities(self, tlo): + def process_entities(self, tlo): self._entities |= set(self._entities_to_rows(tlo)) - async def get_entity_rows_by_phone(self, phone): + def get_entity_rows_by_phone(self, phone): try: return next((id, hash) for id, hash, _, found_phone, _ in self._entities if found_phone == phone) except StopIteration: pass - async def get_entity_rows_by_username(self, username): + def get_entity_rows_by_username(self, username): try: return next((id, hash) for id, hash, found_username, _, _ in self._entities if found_username == username) except StopIteration: pass - async def get_entity_rows_by_name(self, name): + def get_entity_rows_by_name(self, name): try: return next((id, hash) for id, hash, _, _, found_name in self._entities if found_name == name) except StopIteration: pass - async def get_entity_rows_by_id(self, id, exact=True): + def get_entity_rows_by_id(self, id, exact=True): try: if exact: return next((id, hash) for found_id, hash, _, _, _ @@ -187,7 +187,7 @@ class MemorySession(Session): except StopIteration: pass - async def get_input_entity(self, key): + def get_input_entity(self, key): try: if key.SUBCLASS_OF_ID in (0xc91c90b6, 0xe669bf46, 0x40f202fd): # hex(crc32(b'InputPeer', b'InputUser' and b'InputChannel')) @@ -207,21 +207,21 @@ class MemorySession(Session): if isinstance(key, str): phone = utils.parse_phone(key) if phone: - result = await self.get_entity_rows_by_phone(phone) + result = self.get_entity_rows_by_phone(phone) else: username, invite = utils.parse_username(key) if username and not invite: - result = await self.get_entity_rows_by_username(username) + result = self.get_entity_rows_by_username(username) else: tup = utils.resolve_invite_link(key)[1] if tup: - result = await self.get_entity_rows_by_id(tup, exact=False) + result = self.get_entity_rows_by_id(tup, exact=False) elif isinstance(key, int): - result = await self.get_entity_rows_by_id(key, exact) + result = self.get_entity_rows_by_id(key, exact) if not result and isinstance(key, str): - result = await self.get_entity_rows_by_name(key) + result = self.get_entity_rows_by_name(key) if result: entity_id, entity_hash = result # unpack resulting tuple @@ -236,14 +236,14 @@ class MemorySession(Session): else: raise ValueError('Could not find input entity with key ', key) - async def cache_file(self, md5_digest, file_size, instance): + def cache_file(self, md5_digest, file_size, instance): if not isinstance(instance, (InputDocument, InputPhoto)): raise TypeError('Cannot cache %s instance' % type(instance)) key = (md5_digest, file_size, _SentFileType.from_type(type(instance))) value = (instance.id, instance.access_hash) self._files[key] = value - async def get_file(self, md5_digest, file_size, cls): + def get_file(self, md5_digest, file_size, cls): key = (md5_digest, file_size, _SentFileType.from_type(cls)) try: return cls(*self._files[key])