Stop using futures as one-shot channels

Instead, use a single-item queue. This is asyncio.run-friendly,
even when the client is initialized outside of async def.
This commit is contained in:
Lonami Exo 2022-01-16 13:59:43 +01:00
parent a62627534e
commit 4f4c7040d1
2 changed files with 13 additions and 18 deletions

View File

@ -30,7 +30,7 @@ async def set_receive_updates(self: 'TelegramClient', receive_updates):
async def run_until_disconnected(self: 'TelegramClient'): async def run_until_disconnected(self: 'TelegramClient'):
# Make a high-level request to notify that we want updates # Make a high-level request to notify that we want updates
await self(_tl.fn.updates.GetState()) await self(_tl.fn.updates.GetState())
return await self._sender.disconnected await self._sender.wait_disconnected()
def on(self: 'TelegramClient', event: EventBuilder): def on(self: 'TelegramClient', event: EventBuilder):
def decorator(f): def decorator(f):

View File

@ -58,8 +58,8 @@ class MTProtoSender:
# pending futures should be cancelled. # pending futures should be cancelled.
self._user_connected = False self._user_connected = False
self._reconnecting = False self._reconnecting = False
self._disconnected = asyncio.get_running_loop().create_future() self._disconnected = asyncio.Queue(1)
self._disconnected.set_result(None) self._disconnected.put_nowait(None)
# We need to join the loops upon disconnection # We need to join the loops upon disconnection
self._send_loop_handle = None self._send_loop_handle = None
@ -191,16 +191,14 @@ class MTProtoSender:
self._send_queue.extend(states) self._send_queue.extend(states)
return futures return futures
@property async def wait_disconnected(self):
def disconnected(self):
""" """
Future that resolves when the connection to Telegram Wait until the client is disconnected.
ends, either by user action or in the background. Raise if the disconnection finished with error.
Note that it may resolve in either a ``ConnectionError``
or any other unexpected error that could not be handled.
""" """
return asyncio.shield(self._disconnected) res = await self._disconnected.get()
if isinstance(res, BaseException):
raise res
# Private methods # Private methods
@ -257,8 +255,8 @@ class MTProtoSender:
# _disconnected only completes after manual disconnection # _disconnected only completes after manual disconnection
# or errors after which the sender cannot continue such # or errors after which the sender cannot continue such
# as failing to reconnect or any unexpected error. # as failing to reconnect or any unexpected error.
if self._disconnected.done(): while not self._disconnected.empty():
self._disconnected = asyncio.get_running_loop().create_future() self._disconnected.get_nowait()
self._log.info('Connection to %s complete!', self._connection) self._log.info('Connection to %s complete!', self._connection)
@ -316,11 +314,8 @@ class MTProtoSender:
self._log.info('Disconnection from %s complete!', self._connection) self._log.info('Disconnection from %s complete!', self._connection)
self._connection = None self._connection = None
if self._disconnected and not self._disconnected.done(): if not self._disconnected.full():
if error: self._disconnected.put_nowait(error)
self._disconnected.set_exception(error)
else:
self._disconnected.set_result(None)
async def _reconnect(self, last_error): async def _reconnect(self, last_error):
""" """