diff --git a/telethon/client/telegrambaseclient.py b/telethon/client/telegrambaseclient.py index eb571ea5..9fce65fe 100644 --- a/telethon/client/telegrambaseclient.py +++ b/telethon/client/telegrambaseclient.py @@ -29,6 +29,41 @@ __default_log__ = logging.getLogger(__base_name__) __default_log__.addHandler(logging.NullHandler()) +# In seconds, how long to wait before disconnecting a exported sender. +_DISCONNECT_EXPORTED_AFTER = 60 + + +class _ExportState: + def __init__(self): + # ``n`` is the amount of borrows a given sender has; + # once ``n`` reaches ``0``, disconnect the sender after a while. + self._n = 0 + self._zero_ts = 0 + self._connected = False + + def add_borrow(self): + self._n += 1 + self._connected = True + + def add_return(self): + self._n -= 1 + assert self._n >= 0, 'returned sender more than it was borrowed' + if self._n == 0: + self._zero_ts = time.time() + + def should_disconnect(self): + return (self._n == 0 + and self._connected + and (time.time() - self._zero_ts) > _DISCONNECT_EXPORTED_AFTER) + + def need_connect(self): + return not self._connected + + def mark_disconnected(self): + assert self.should_disconnect(), 'marked as disconnected when it was borrowed' + self._connected = False + + # TODO How hard would it be to support both `trio` and `asyncio`? class TelegramBaseClient(abc.ABC): """ @@ -314,9 +349,7 @@ class TelegramBaseClient(abc.ABC): # Remember flood-waited requests to avoid making them again self._flood_waited_requests = {} - # Cache ``{dc_id: (n, MTProtoSender)}`` for all borrowed senders, - # being ``n`` the amount of borrows a given sender has; once ``n`` - # reaches ``0`` it should be disconnected and removed. + # Cache ``{dc_id: (_ExportState, MTProtoSender)}`` for all borrowed senders self._borrowed_senders = {} self._borrow_sender_lock = asyncio.Lock(loop=self._loop) @@ -500,6 +533,15 @@ class TelegramBaseClient(abc.ABC): async def _disconnect_coro(self: 'TelegramClient'): await self._disconnect() + # Also clean-up all exported senders because we're done with them + async with self._borrow_sender_lock: + for state, sender in self._borrowed_senders.values(): + if state.should_disconnect(): + # disconnect should never raise + await sender.disconnect() + + self._borrowed_senders.clear() + # trio's nurseries would handle this for us, but this is asyncio. # All tasks spawned in the background should properly be terminated. if self._dispatching_updates_queue is None and self._updates_queue: @@ -598,8 +640,7 @@ class TelegramBaseClient(abc.ABC): loggers=self._log, proxy=self._proxy )) - self._log[__name__].info('Exporting authorization for data center %s', - dc) + self._log[__name__].info('Exporting auth for new borrowed sender in %s', dc) auth = await self(functions.auth.ExportAuthorizationRequest(dc_id)) req = self._init_with(functions.auth.ImportAuthorizationRequest( id=auth.id, bytes=auth.bytes @@ -616,11 +657,16 @@ class TelegramBaseClient(abc.ABC): Once its job is over it should be `_return_exported_sender`. """ async with self._borrow_sender_lock: - n, sender = self._borrowed_senders.get(dc_id, (0, None)) - if not sender: + self._log[__name__].debug('Borrowing sender for dc_id %d', dc_id) + state, sender = self._borrowed_senders.get(dc_id, (None, None)) + + if state is None: + state = _ExportState() sender = await self._create_exported_sender(dc_id) sender.dc_id = dc_id - elif not n: + self._borrowed_senders[dc_id] = (state, sender) + + elif state.need_connect(): dc = await self._get_dc(dc_id) await sender.connect(self._connection( dc.ip_address, @@ -631,9 +677,8 @@ class TelegramBaseClient(abc.ABC): proxy=self._proxy )) - self._borrowed_senders[dc_id] = (n + 1, sender) - - return sender + state.add_borrow() + return sender async def _return_exported_sender(self: 'TelegramClient', sender): """ @@ -641,14 +686,23 @@ class TelegramBaseClient(abc.ABC): been returned, the sender is cleanly disconnected. """ async with self._borrow_sender_lock: - dc_id = sender.dc_id - n, _ = self._borrowed_senders[dc_id] - n -= 1 - self._borrowed_senders[dc_id] = (n, sender) - if not n: - self._log[__name__].info( - 'Disconnecting borrowed sender for DC %d', dc_id) - await sender.disconnect() + self._log[__name__].debug('Returning borrowed sender for dc_id %d', sender.dc_id) + state, _ = self._borrowed_senders[sender.dc_id] + state.add_return() + + async def _clean_exported_senders(self: 'TelegramClient'): + """ + Cleans-up all unused exported senders by disconnecting them. + """ + async with self._borrow_sender_lock: + for dc_id, (state, sender) in self._borrowed_senders.items(): + if state.should_disconnect(): + self._log[__name__].info( + 'Disconnecting borrowed sender for DC %d', dc_id) + + # Disconnect should never raise + await sender.disconnect() + state.mark_disconnected() async def _get_cdn_client(self: 'TelegramClient', cdn_redirect): """Similar to ._borrow_exported_client, but for CDNs""" diff --git a/telethon/client/updates.py b/telethon/client/updates.py index fe19eaaf..854860fd 100644 --- a/telethon/client/updates.py +++ b/telethon/client/updates.py @@ -335,6 +335,9 @@ class UpdateMethods: except Exception: continue # Any disconnected exception should be ignored + # Check if we have any exported senders to clean-up periodically + await self._clean_exported_senders() + # Don't bother sending pings until the low-level connection is # ready, otherwise a lot of pings will be batched to be sent upon # reconnect, when we really don't care about that.