From f86339ab1701c1043a971a08777da5cdbc3b47b8 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Sat, 11 Sep 2021 14:16:25 +0200 Subject: [PATCH] Remove Conversation API --- readthedocs/misc/v2-migration-guide.rst | 53 ++ readthedocs/modules/custom.rst | 9 - .../quick-references/client-reference.rst | 1 - .../quick-references/objects-reference.rst | 27 - telethon/_client/dialogs.py | 20 - telethon/_client/telegrambaseclient.py | 3 - telethon/_client/telegramclient.py | 131 +---- telethon/_client/updates.py | 26 - telethon/errors/__init__.py | 2 +- telethon/errors/common.py | 11 - telethon/tl/custom/__init__.py | 1 - telethon/tl/custom/conversation.py | 526 ------------------ 12 files changed, 57 insertions(+), 753 deletions(-) delete mode 100644 telethon/tl/custom/conversation.py diff --git a/readthedocs/misc/v2-migration-guide.rst b/readthedocs/misc/v2-migration-guide.rst index 5c748895..1ee9bda4 100644 --- a/readthedocs/misc/v2-migration-guide.rst +++ b/readthedocs/misc/v2-migration-guide.rst @@ -51,3 +51,56 @@ removed. This implies: * ``run_until_disconnected`` // TODO provide standalone alternative for this? + + +The Conversation API has been removed +------------------------------------- + +This API had certain shortcomings, such as lacking persistence, poor interaction with other event +handlers, and overcomplicated usage for anything beyond the simplest case. + +It is not difficult to write your own code to deal with a conversation's state. A simple +`Finite State Machine `__ inside your handlers will do +just fine: + +.. code-block:: python + + from enum import Enum, auto + + # We use a Python Enum for the state because it's a clean and easy way to do it + class State(Enum): + WAIT_NAME = auto() + WAIT_AGE = auto() + + # The state in which different users are, {user_id: state} + conversation_state = {} + + # ...code to create and setup your client... + + @client.on(events.NewMessage) + async def handler(event): + who = event.sender_id + state = conversation_state.get(who) + + if state is None: + # Starting a conversation + await event.respond('Hi! What is your name?') + conversation_state[who] = State.WAIT_NAME + + elif state == State.WAIT_NAME: + name = event.text # Save the name wherever you want + await event.respond('Nice! What is your age?') + conversation_state[who] = State.WAIT_AGE + + elif state == State.WAIT_AGE: + age = event.text # Save the age wherever you want + await event.respond('Thank you!') + # Conversation is done so we can forget the state of this user + del conversation_state[who] + + # ...code to keep Telethon running... + +Not only is this approach simpler, but it can also be easily persisted, and you can adjust it +to your needs and your handlers much more easily. + +// TODO provide standalone alternative for this? diff --git a/readthedocs/modules/custom.rst b/readthedocs/modules/custom.rst index 074b2161..01284fbb 100644 --- a/readthedocs/modules/custom.rst +++ b/readthedocs/modules/custom.rst @@ -46,15 +46,6 @@ ChatGetter :show-inheritance: -Conversation -============ - -.. automodule:: telethon.tl.custom.conversation - :members: - :undoc-members: - :show-inheritance: - - Dialog ====== diff --git a/readthedocs/quick-references/client-reference.rst b/readthedocs/quick-references/client-reference.rst index 6dd8245c..22517288 100644 --- a/readthedocs/quick-references/client-reference.rst +++ b/readthedocs/quick-references/client-reference.rst @@ -107,7 +107,6 @@ Dialogs iter_drafts get_drafts delete_dialog - conversation Users ----- diff --git a/readthedocs/quick-references/objects-reference.rst b/readthedocs/quick-references/objects-reference.rst index 51ed4607..41f73033 100644 --- a/readthedocs/quick-references/objects-reference.rst +++ b/readthedocs/quick-references/objects-reference.rst @@ -155,33 +155,6 @@ its name, bot-API style file ID, etc. sticker_set -Conversation -============ - -The `Conversation ` object -is returned by the `client.conversation() -` method to easily -send and receive responses like a normal conversation. - -It bases `ChatGetter `. - -.. currentmodule:: telethon.tl.custom.conversation.Conversation - -.. autosummary:: - :nosignatures: - - send_message - send_file - mark_read - get_response - get_reply - get_edit - wait_read - wait_event - cancel - cancel_all - - AdminLogEvent ============= diff --git a/telethon/_client/dialogs.py b/telethon/_client/dialogs.py index 67c47458..8471c7fb 100644 --- a/telethon/_client/dialogs.py +++ b/telethon/_client/dialogs.py @@ -252,23 +252,3 @@ async def delete_dialog( await self(functions.messages.DeleteHistoryRequest(entity, 0, revoke=revoke)) return result - -def conversation( - self: 'TelegramClient', - entity: 'hints.EntityLike', - *, - timeout: float = 60, - total_timeout: float = None, - max_messages: int = 100, - exclusive: bool = True, - replies_are_responses: bool = True) -> custom.Conversation: - return custom.Conversation( - self, - entity, - timeout=timeout, - total_timeout=total_timeout, - max_messages=max_messages, - exclusive=exclusive, - replies_are_responses=replies_are_responses - - ) diff --git a/telethon/_client/telegrambaseclient.py b/telethon/_client/telegrambaseclient.py index 16822d6a..256e8e6f 100644 --- a/telethon/_client/telegrambaseclient.py +++ b/telethon/_client/telegrambaseclient.py @@ -267,9 +267,6 @@ def init( # Some further state for subclasses self._event_builders = [] - # {chat_id: {Conversation}} - self._conversations = collections.defaultdict(set) - # Hack to workaround the fact Telegram may send album updates as # different Updates when being sent from a different data center. # {grouped_id: AlbumHack} diff --git a/telethon/_client/telegramclient.py b/telethon/_client/telegramclient.py index 1c50a805..01cd5d14 100644 --- a/telethon/_client/telegramclient.py +++ b/telethon/_client/telegramclient.py @@ -151,9 +151,9 @@ class TelegramClient: will be received from Telegram as they occur. Turning this off means that Telegram will not send updates at all - so event handlers, conversations, and QR login will not work. - However, certain scripts don't need updates, so this will reduce - the amount of bandwidth used. + so event handlers and QR login will not work. However, certain + scripts don't need updates, so this will reduce the amount of + bandwidth used. """ # region Account @@ -1702,131 +1702,6 @@ class TelegramClient: """ return dialogs.delete_dialog(**locals()) - def conversation( - self: 'TelegramClient', - entity: 'hints.EntityLike', - *, - timeout: float = 60, - total_timeout: float = None, - max_messages: int = 100, - exclusive: bool = True, - replies_are_responses: bool = True) -> custom.Conversation: - """ - Creates a `Conversation ` - with the given entity. - - .. note:: - - This Conversation API has certain shortcomings, such as lacking - persistence, poor interaction with other event handlers, and - overcomplicated usage for anything beyond the simplest case. - - If you plan to interact with a bot without handlers, this works - fine, but when running a bot yourself, you may instead prefer - to follow the advice from https://stackoverflow.com/a/62246569/. - - This is not the same as just sending a message to create a "dialog" - with them, but rather a way to easily send messages and await for - responses or other reactions. Refer to its documentation for more. - - Arguments - entity (`entity`): - The entity with which a new conversation should be opened. - - timeout (`int` | `float`, optional): - The default timeout (in seconds) *per action* to be used. You - may also override this timeout on a per-method basis. By - default each action can take up to 60 seconds (the value of - this timeout). - - total_timeout (`int` | `float`, optional): - The total timeout (in seconds) to use for the whole - conversation. This takes priority over per-action - timeouts. After these many seconds pass, subsequent - actions will result in ``asyncio.TimeoutError``. - - max_messages (`int`, optional): - The maximum amount of messages this conversation will - remember. After these many messages arrive in the - specified chat, subsequent actions will result in - ``ValueError``. - - exclusive (`bool`, optional): - By default, conversations are exclusive within a single - chat. That means that while a conversation is open in a - chat, you can't open another one in the same chat, unless - you disable this flag. - - If you try opening an exclusive conversation for - a chat where it's already open, it will raise - ``AlreadyInConversationError``. - - replies_are_responses (`bool`, optional): - Whether replies should be treated as responses or not. - - If the setting is enabled, calls to `conv.get_response - ` - and a subsequent call to `conv.get_reply - ` - will return different messages, otherwise they may return - the same message. - - Consider the following scenario with one outgoing message, - 1, and two incoming messages, the second one replying:: - - Hello! <1 - 2> (reply to 1) Hi! - 3> (reply to 1) How are you? - - And the following code: - - .. code-block:: python - - async with client.conversation(chat) as conv: - msg1 = await conv.send_message('Hello!') - msg2 = await conv.get_response() - msg3 = await conv.get_reply() - - With the setting enabled, ``msg2`` will be ``'Hi!'`` and - ``msg3`` be ``'How are you?'`` since replies are also - responses, and a response was already returned. - - With the setting disabled, both ``msg2`` and ``msg3`` will - be ``'Hi!'`` since one is a response and also a reply. - - Returns - A `Conversation `. - - Example - .. code-block:: python - - # denotes outgoing messages you sent - # denotes incoming response messages - with bot.conversation(chat) as conv: - # Hi! - conv.send_message('Hi!') - - # Hello! - hello = conv.get_response() - - # Please tell me your name - conv.send_message('Please tell me your name') - - # ? - name = conv.get_response().raw_text - - while not any(x.isalpha() for x in name): - # Your name didn't have any letters! Try again - conv.send_message("Your name didn't have any letters! Try again") - - # Human - name = conv.get_response().raw_text - - # Thanks Human! - conv.send_message('Thanks {}!'.format(name)) - """ - return dialogs.conversation(**locals()) - # endregion Dialogs # region Downloads diff --git a/telethon/_client/updates.py b/telethon/_client/updates.py index 04d7fbfb..c5d04ade 100644 --- a/telethon/_client/updates.py +++ b/telethon/_client/updates.py @@ -418,22 +418,6 @@ async def _dispatch_update(self: 'TelegramClient', update, others, channel_id, p pass # might not have connection built = EventBuilderDict(self, update, others) - for conv_set in self._conversations.values(): - for conv in conv_set: - ev = built[events.NewMessage] - if ev: - conv._on_new_message(ev) - - ev = built[events.MessageEdited] - if ev: - conv._on_edit(ev) - - ev = built[events.MessageRead] - if ev: - conv._on_read(ev) - - if conv._custom: - await conv._check_custom(built) for builder, callback in self._event_builders: event = built[type(builder)] @@ -451,11 +435,6 @@ async def _dispatch_update(self: 'TelegramClient', update, others, channel_id, p try: await callback(event) - except errors.AlreadyInConversationError: - name = getattr(callback, '__name__', repr(callback)) - self._log[__name__].debug( - 'Event handler "%s" already has an open conversation, ' - 'ignoring new one', name) except events.StopPropagation: name = getattr(callback, '__name__', repr(callback)) self._log[__name__].debug( @@ -492,11 +471,6 @@ async def _dispatch_event(self: 'TelegramClient', event): try: await callback(event) - except errors.AlreadyInConversationError: - name = getattr(callback, '__name__', repr(callback)) - self._log[__name__].debug( - 'Event handler "%s" already has an open conversation, ' - 'ignoring new one', name) except events.StopPropagation: name = getattr(callback, '__name__', repr(callback)) self._log[__name__].debug( diff --git a/telethon/errors/__init__.py b/telethon/errors/__init__.py index f6bc16e5..a50ae36b 100644 --- a/telethon/errors/__init__.py +++ b/telethon/errors/__init__.py @@ -7,7 +7,7 @@ import re from .common import ( ReadCancelledError, TypeNotFoundError, InvalidChecksumError, InvalidBufferError, SecurityError, CdnFileTamperedError, - AlreadyInConversationError, BadMessageError, MultiError + BadMessageError, MultiError ) # This imports the base errors too, as they're imported there diff --git a/telethon/errors/common.py b/telethon/errors/common.py index de7d95f8..3ac246b3 100644 --- a/telethon/errors/common.py +++ b/telethon/errors/common.py @@ -79,17 +79,6 @@ class CdnFileTamperedError(SecurityError): ) -class AlreadyInConversationError(Exception): - """ - Occurs when another exclusive conversation is opened in the same chat. - """ - def __init__(self): - super().__init__( - 'Cannot open exclusive conversation in a ' - 'chat that already has one open conversation' - ) - - class BadMessageError(Exception): """Occurs when handling a bad_message_notification.""" ErrorMessages = { diff --git a/telethon/tl/custom/__init__.py b/telethon/tl/custom/__init__.py index 9804969e..00a0d00f 100644 --- a/telethon/tl/custom/__init__.py +++ b/telethon/tl/custom/__init__.py @@ -9,6 +9,5 @@ from .button import Button from .inlinebuilder import InlineBuilder from .inlineresult import InlineResult from .inlineresults import InlineResults -from .conversation import Conversation from .qrlogin import QRLogin from .participantpermissions import ParticipantPermissions diff --git a/telethon/tl/custom/conversation.py b/telethon/tl/custom/conversation.py deleted file mode 100644 index b99831f3..00000000 --- a/telethon/tl/custom/conversation.py +++ /dev/null @@ -1,526 +0,0 @@ -import asyncio -import functools -import inspect -import itertools -import time - -from .chatgetter import ChatGetter -from ... import helpers, utils, errors - -# Sometimes the edits arrive very fast (within the same second). -# In that case we add a small delta so that the age is older, for -# comparision purposes. This value is enough for up to 1000 messages. -_EDIT_COLLISION_DELTA = 0.001 - - -def _checks_cancelled(f): - @functools.wraps(f) - def wrapper(self, *args, **kwargs): - if self._cancelled: - raise asyncio.CancelledError('The conversation was cancelled before') - - return f(self, *args, **kwargs) - return wrapper - - -class Conversation(ChatGetter): - """ - Represents a conversation inside an specific chat. - - A conversation keeps track of new messages since it was - created until its exit and easily lets you query the - current state. - - If you need a conversation across two or more chats, - you should use two conversations and synchronize them - as you better see fit. - """ - _id_counter = 0 - _custom_counter = 0 - - def __init__(self, client, input_chat, - *, timeout, total_timeout, max_messages, - exclusive, replies_are_responses): - # This call resets the client - ChatGetter.__init__(self, input_chat=input_chat) - - self._id = Conversation._id_counter - Conversation._id_counter += 1 - - self._client = client - self._timeout = timeout - self._total_timeout = total_timeout - self._total_due = None - - self._outgoing = set() - self._last_outgoing = 0 - self._incoming = [] - self._last_incoming = 0 - self._max_incoming = max_messages - self._last_read = None - self._custom = {} - - self._pending_responses = {} - self._pending_replies = {} - self._pending_edits = {} - self._pending_reads = {} - - self._exclusive = exclusive - self._cancelled = False - - # The user is able to expect two responses for the same message. - # {desired message ID: next incoming index} - self._response_indices = {} - if replies_are_responses: - self._reply_indices = self._response_indices - else: - self._reply_indices = {} - - self._edit_dates = {} - - @_checks_cancelled - async def send_message(self, *args, **kwargs): - """ - Sends a message in the context of this conversation. Shorthand - for `telethon.client.messages.MessageMethods.send_message` with - ``entity`` already set. - """ - sent = await self._client.send_message( - self._input_chat, *args, **kwargs) - - # Albums will be lists, so handle that - ms = sent if isinstance(sent, list) else (sent,) - self._outgoing.update(m.id for m in ms) - self._last_outgoing = ms[-1].id - return sent - - @_checks_cancelled - async def send_file(self, *args, **kwargs): - """ - Sends a file in the context of this conversation. Shorthand - for `telethon.client.uploads.UploadMethods.send_file` with - ``entity`` already set. - """ - sent = await self._client.send_file( - self._input_chat, *args, **kwargs) - - # Albums will be lists, so handle that - ms = sent if isinstance(sent, list) else (sent,) - self._outgoing.update(m.id for m in ms) - self._last_outgoing = ms[-1].id - return sent - - @_checks_cancelled - def mark_read(self, message=None): - """ - Marks as read the latest received message if ``message is None``. - Otherwise, marks as read until the given message (or message ID). - - This is equivalent to calling `client.send_read_acknowledge - `. - """ - if message is None: - if self._incoming: - message = self._incoming[-1].id - else: - message = 0 - elif not isinstance(message, int): - message = message.id - - return self._client.send_read_acknowledge( - self._input_chat, max_id=message) - - def get_response(self, message=None, *, timeout=None): - """ - Gets the next message that responds to a previous one. This is - the method you need most of the time, along with `get_edit`. - - Args: - message (`Message ` | `int`, optional): - The message (or the message ID) for which a response - is expected. By default this is the last sent message. - - timeout (`int` | `float`, optional): - If present, this `timeout` (in seconds) will override the - per-action timeout defined for the conversation. - - .. code-block:: python - - async with client.conversation(...) as conv: - await conv.send_message('Hey, what is your name?') - - response = await conv.get_response() - name = response.text - - await conv.send_message('Nice to meet you, {}!'.format(name)) - """ - return self._get_message( - message, self._response_indices, self._pending_responses, timeout, - lambda x, y: True - ) - - def get_reply(self, message=None, *, timeout=None): - """ - Gets the next message that explicitly replies to a previous one. - """ - return self._get_message( - message, self._reply_indices, self._pending_replies, timeout, - lambda x, y: x.reply_to and x.reply_to.reply_to_msg_id == y - ) - - def _get_message( - self, target_message, indices, pending, timeout, condition): - """ - Gets the next desired message under the desired condition. - - Args: - target_message (`object`): - The target message for which we want to find another - response that applies based on `condition`. - - indices (`dict`): - This dictionary remembers the last ID chosen for the - input `target_message`. - - pending (`dict`): - This dictionary remembers {msg_id: Future} to be set - once `condition` is met. - - timeout (`int`): - The timeout (in seconds) override to use for this operation. - - condition (`callable`): - The condition callable that checks if an incoming - message is a valid response. - """ - start_time = time.time() - target_id = self._get_message_id(target_message) - - # If there is no last-chosen ID, make sure to pick one *after* - # the input message, since we don't want responses back in time - if target_id not in indices: - for i, incoming in enumerate(self._incoming): - if incoming.id > target_id: - indices[target_id] = i - break - else: - indices[target_id] = len(self._incoming) - - # We will always return a future from here, even if the result - # can be set immediately. Otherwise, needing to await only - # sometimes is an annoying edge case (i.e. we would return - # a `Message` but `get_response()` always `await`'s). - future = self._client.loop.create_future() - - # If there are enough responses saved return the next one - last_idx = indices[target_id] - if last_idx < len(self._incoming): - incoming = self._incoming[last_idx] - if condition(incoming, target_id): - indices[target_id] += 1 - future.set_result(incoming) - return future - - # Otherwise the next incoming response will be the one to use - # - # Note how we fill "pending" before giving control back to the - # event loop through "await". We want to register it as soon as - # possible, since any other task switch may arrive with the result. - pending[target_id] = future - return self._get_result(future, start_time, timeout, pending, target_id) - - def get_edit(self, message=None, *, timeout=None): - """ - Awaits for an edit after the last message to arrive. - The arguments are the same as those for `get_response`. - """ - start_time = time.time() - target_id = self._get_message_id(message) - - target_date = self._edit_dates.get(target_id, 0) - earliest_edit = min( - (x for x in self._incoming - if x.edit_date - and x.id > target_id - and x.edit_date.timestamp() > target_date - ), - key=lambda x: x.edit_date.timestamp(), - default=None - ) - - future = self._client.loop.create_future() - if earliest_edit and earliest_edit.edit_date.timestamp() > target_date: - self._edit_dates[target_id] = earliest_edit.edit_date.timestamp() - future.set_result(earliest_edit) - return future # we should always return something we can await - - # Otherwise the next incoming response will be the one to use - self._pending_edits[target_id] = future - return self._get_result(future, start_time, timeout, self._pending_edits, target_id) - - def wait_read(self, message=None, *, timeout=None): - """ - Awaits for the sent message to be marked as read. Note that - receiving a response doesn't imply the message was read, and - this action will also trigger even without a response. - """ - start_time = time.time() - future = self._client.loop.create_future() - target_id = self._get_message_id(message) - - if self._last_read is None: - self._last_read = target_id - 1 - - if self._last_read >= target_id: - return - - self._pending_reads[target_id] = future - return self._get_result(future, start_time, timeout, self._pending_reads, target_id) - - async def wait_event(self, event, *, timeout=None): - """ - Waits for a custom event to occur. Timeouts still apply. - - .. note:: - - **Only use this if there isn't another method available!** - For example, don't use `wait_event` for new messages, - since `get_response` already exists, etc. - - Unless you're certain that your code will run fast enough, - generally you should get a "handle" of this special coroutine - before acting. In this example you will see how to wait for a user - to join a group with proper use of `wait_event`: - - .. code-block:: python - - from telethon import TelegramClient, events - - client = TelegramClient(...) - group_id = ... - - async def main(): - # Could also get the user id from an event; this is just an example - user_id = ... - - async with client.conversation(user_id) as conv: - # Get a handle to the future event we'll wait for - handle = conv.wait_event(events.ChatAction( - group_id, - func=lambda e: e.user_joined and e.user_id == user_id - )) - - # Perform whatever action in between - await conv.send_message('Please join this group before speaking to me!') - - # Wait for the event we registered above to fire - event = await handle - - # Continue with the conversation - await conv.send_message('Thanks!') - - This way your event can be registered before acting, - since the response may arrive before your event was - registered. It depends on your use case since this - also means the event can arrive before you send - a previous action. - """ - start_time = time.time() - if isinstance(event, type): - event = event() - - await event.resolve(self._client) - - counter = Conversation._custom_counter - Conversation._custom_counter += 1 - - future = self._client.loop.create_future() - self._custom[counter] = (event, future) - try: - return await self._get_result(future, start_time, timeout, self._custom, counter) - finally: - # Need to remove it from the dict if it times out, else we may - # try and fail to set the result later (#1618). - self._custom.pop(counter, None) - - async def _check_custom(self, built): - for key, (ev, fut) in list(self._custom.items()): - ev_type = type(ev) - inst = built[ev_type] - - if inst: - filter = ev.filter(inst) - if inspect.isawaitable(filter): - filter = await filter - - if filter: - fut.set_result(inst) - del self._custom[key] - - def _on_new_message(self, response): - response = response.message - if response.chat_id != self.chat_id or response.out: - return - - if len(self._incoming) == self._max_incoming: - self._cancel_all(ValueError('Too many incoming messages')) - return - - self._incoming.append(response) - - # Most of the time, these dictionaries will contain just one item - # TODO In fact, why not make it be that way? Force one item only. - # How often will people want to wait for two responses at - # the same time? It's impossible, first one will arrive - # and then another, so they can do that. - for msg_id, future in list(self._pending_responses.items()): - self._response_indices[msg_id] = len(self._incoming) - future.set_result(response) - del self._pending_responses[msg_id] - - for msg_id, future in list(self._pending_replies.items()): - if response.reply_to and msg_id == response.reply_to.reply_to_msg_id: - self._reply_indices[msg_id] = len(self._incoming) - future.set_result(response) - del self._pending_replies[msg_id] - - def _on_edit(self, message): - message = message.message - if message.chat_id != self.chat_id or message.out: - return - - # We have to update our incoming messages with the new edit date - for i, m in enumerate(self._incoming): - if m.id == message.id: - self._incoming[i] = message - break - - for msg_id, future in list(self._pending_edits.items()): - if msg_id < message.id: - edit_ts = message.edit_date.timestamp() - - # We compare <= because edit_ts resolution is always to - # seconds, but we may have increased _edit_dates before. - # Since the dates are ever growing this is not a problem. - if edit_ts <= self._edit_dates.get(msg_id, 0): - self._edit_dates[msg_id] += _EDIT_COLLISION_DELTA - else: - self._edit_dates[msg_id] = message.edit_date.timestamp() - - future.set_result(message) - del self._pending_edits[msg_id] - - def _on_read(self, event): - if event.chat_id != self.chat_id or event.inbox: - return - - self._last_read = event.max_id - - for msg_id, pending in list(self._pending_reads.items()): - if msg_id >= self._last_read: - pending.set_result(True) - del self._pending_reads[msg_id] - - def _get_message_id(self, message): - if message is not None: # 0 is valid but false-y, check for None - return message if isinstance(message, int) else message.id - elif self._last_outgoing: - return self._last_outgoing - else: - raise ValueError('No message was sent previously') - - @_checks_cancelled - def _get_result(self, future, start_time, timeout, pending, target_id): - due = self._total_due - if timeout is None: - timeout = self._timeout - - if timeout is not None: - due = min(due, start_time + timeout) - - # NOTE: We can't try/finally to pop from pending here because - # the event loop needs to get back to us, but it might - # dispatch another update before, and in that case a - # response could be set twice. So responses must be - # cleared when their futures are set to a result. - return asyncio.wait_for( - future, - timeout=None if due == float('inf') else due - time.time() - ) - - def _cancel_all(self, exception=None): - self._cancelled = True - for pending in itertools.chain( - self._pending_responses.values(), - self._pending_replies.values(), - self._pending_edits.values()): - if exception: - pending.set_exception(exception) - else: - pending.cancel() - - for _, fut in self._custom.values(): - if exception: - fut.set_exception(exception) - else: - fut.cancel() - - async def __aenter__(self): - self._input_chat = \ - await self._client.get_input_entity(self._input_chat) - - self._chat_peer = utils.get_peer(self._input_chat) - - # Make sure we're the only conversation in this chat if it's exclusive - chat_id = utils.get_peer_id(self._chat_peer) - conv_set = self._client._conversations[chat_id] - if self._exclusive and conv_set: - raise errors.AlreadyInConversationError() - - conv_set.add(self) - self._cancelled = False - - self._last_outgoing = 0 - self._last_incoming = 0 - for d in ( - self._outgoing, self._incoming, - self._pending_responses, self._pending_replies, - self._pending_edits, self._response_indices, - self._reply_indices, self._edit_dates, self._custom): - d.clear() - - if self._total_timeout: - self._total_due = time.time() + self._total_timeout - else: - self._total_due = float('inf') - - return self - - def cancel(self): - """ - Cancels the current conversation. Pending responses and subsequent - calls to get a response will raise ``asyncio.CancelledError``. - - This method is synchronous and should not be awaited. - """ - self._cancel_all() - - async def cancel_all(self): - """ - Calls `cancel` on *all* conversations in this chat. - - Note that you should ``await`` this method, since it's meant to be - used outside of a context manager, and it needs to resolve the chat. - """ - chat_id = await self._client.get_peer_id(self._input_chat) - for conv in self._client._conversations[chat_id]: - conv.cancel() - - async def __aexit__(self, exc_type, exc_val, exc_tb): - chat_id = utils.get_peer_id(self._chat_peer) - conv_set = self._client._conversations[chat_id] - conv_set.discard(self) - if not conv_set: - del self._client._conversations[chat_id] - - self._cancel_all()