Don't disconnect borrowed senders immediately (#1364)

This commit is contained in:
Lonami Exo 2020-04-05 12:34:33 +02:00
parent 3729fde572
commit d0f937bcb6
2 changed files with 76 additions and 19 deletions

View File

@ -29,6 +29,41 @@ __default_log__ = logging.getLogger(__base_name__)
__default_log__.addHandler(logging.NullHandler()) __default_log__.addHandler(logging.NullHandler())
# In seconds, how long to wait before disconnecting a exported sender.
_DISCONNECT_EXPORTED_AFTER = 60
class _ExportState:
def __init__(self):
# ``n`` is the amount of borrows a given sender has;
# once ``n`` reaches ``0``, disconnect the sender after a while.
self._n = 0
self._zero_ts = 0
self._connected = False
def add_borrow(self):
self._n += 1
self._connected = True
def add_return(self):
self._n -= 1
assert self._n >= 0, 'returned sender more than it was borrowed'
if self._n == 0:
self._zero_ts = time.time()
def should_disconnect(self):
return (self._n == 0
and self._connected
and (time.time() - self._zero_ts) > _DISCONNECT_EXPORTED_AFTER)
def need_connect(self):
return not self._connected
def mark_disconnected(self):
assert self.should_disconnect(), 'marked as disconnected when it was borrowed'
self._connected = False
# TODO How hard would it be to support both `trio` and `asyncio`? # TODO How hard would it be to support both `trio` and `asyncio`?
class TelegramBaseClient(abc.ABC): class TelegramBaseClient(abc.ABC):
""" """
@ -314,9 +349,7 @@ class TelegramBaseClient(abc.ABC):
# Remember flood-waited requests to avoid making them again # Remember flood-waited requests to avoid making them again
self._flood_waited_requests = {} self._flood_waited_requests = {}
# Cache ``{dc_id: (n, MTProtoSender)}`` for all borrowed senders, # Cache ``{dc_id: (_ExportState, MTProtoSender)}`` for all borrowed senders
# being ``n`` the amount of borrows a given sender has; once ``n``
# reaches ``0`` it should be disconnected and removed.
self._borrowed_senders = {} self._borrowed_senders = {}
self._borrow_sender_lock = asyncio.Lock(loop=self._loop) self._borrow_sender_lock = asyncio.Lock(loop=self._loop)
@ -500,6 +533,15 @@ class TelegramBaseClient(abc.ABC):
async def _disconnect_coro(self: 'TelegramClient'): async def _disconnect_coro(self: 'TelegramClient'):
await self._disconnect() await self._disconnect()
# Also clean-up all exported senders because we're done with them
async with self._borrow_sender_lock:
for state, sender in self._borrowed_senders.values():
if state.should_disconnect():
# disconnect should never raise
await sender.disconnect()
self._borrowed_senders.clear()
# trio's nurseries would handle this for us, but this is asyncio. # trio's nurseries would handle this for us, but this is asyncio.
# All tasks spawned in the background should properly be terminated. # All tasks spawned in the background should properly be terminated.
if self._dispatching_updates_queue is None and self._updates_queue: if self._dispatching_updates_queue is None and self._updates_queue:
@ -598,8 +640,7 @@ class TelegramBaseClient(abc.ABC):
loggers=self._log, loggers=self._log,
proxy=self._proxy proxy=self._proxy
)) ))
self._log[__name__].info('Exporting authorization for data center %s', self._log[__name__].info('Exporting auth for new borrowed sender in %s', dc)
dc)
auth = await self(functions.auth.ExportAuthorizationRequest(dc_id)) auth = await self(functions.auth.ExportAuthorizationRequest(dc_id))
req = self._init_with(functions.auth.ImportAuthorizationRequest( req = self._init_with(functions.auth.ImportAuthorizationRequest(
id=auth.id, bytes=auth.bytes id=auth.id, bytes=auth.bytes
@ -616,11 +657,16 @@ class TelegramBaseClient(abc.ABC):
Once its job is over it should be `_return_exported_sender`. Once its job is over it should be `_return_exported_sender`.
""" """
async with self._borrow_sender_lock: async with self._borrow_sender_lock:
n, sender = self._borrowed_senders.get(dc_id, (0, None)) self._log[__name__].debug('Borrowing sender for dc_id %d', dc_id)
if not sender: state, sender = self._borrowed_senders.get(dc_id, (None, None))
if state is None:
state = _ExportState()
sender = await self._create_exported_sender(dc_id) sender = await self._create_exported_sender(dc_id)
sender.dc_id = dc_id sender.dc_id = dc_id
elif not n: self._borrowed_senders[dc_id] = (state, sender)
elif state.need_connect():
dc = await self._get_dc(dc_id) dc = await self._get_dc(dc_id)
await sender.connect(self._connection( await sender.connect(self._connection(
dc.ip_address, dc.ip_address,
@ -631,8 +677,7 @@ class TelegramBaseClient(abc.ABC):
proxy=self._proxy proxy=self._proxy
)) ))
self._borrowed_senders[dc_id] = (n + 1, sender) state.add_borrow()
return sender return sender
async def _return_exported_sender(self: 'TelegramClient', sender): async def _return_exported_sender(self: 'TelegramClient', sender):
@ -641,14 +686,23 @@ class TelegramBaseClient(abc.ABC):
been returned, the sender is cleanly disconnected. been returned, the sender is cleanly disconnected.
""" """
async with self._borrow_sender_lock: async with self._borrow_sender_lock:
dc_id = sender.dc_id self._log[__name__].debug('Returning borrowed sender for dc_id %d', sender.dc_id)
n, _ = self._borrowed_senders[dc_id] state, _ = self._borrowed_senders[sender.dc_id]
n -= 1 state.add_return()
self._borrowed_senders[dc_id] = (n, sender)
if not n: async def _clean_exported_senders(self: 'TelegramClient'):
"""
Cleans-up all unused exported senders by disconnecting them.
"""
async with self._borrow_sender_lock:
for dc_id, (state, sender) in self._borrowed_senders.items():
if state.should_disconnect():
self._log[__name__].info( self._log[__name__].info(
'Disconnecting borrowed sender for DC %d', dc_id) 'Disconnecting borrowed sender for DC %d', dc_id)
# Disconnect should never raise
await sender.disconnect() await sender.disconnect()
state.mark_disconnected()
async def _get_cdn_client(self: 'TelegramClient', cdn_redirect): async def _get_cdn_client(self: 'TelegramClient', cdn_redirect):
"""Similar to ._borrow_exported_client, but for CDNs""" """Similar to ._borrow_exported_client, but for CDNs"""

View File

@ -335,6 +335,9 @@ class UpdateMethods:
except Exception: except Exception:
continue # Any disconnected exception should be ignored continue # Any disconnected exception should be ignored
# Check if we have any exported senders to clean-up periodically
await self._clean_exported_senders()
# Don't bother sending pings until the low-level connection is # Don't bother sending pings until the low-level connection is
# ready, otherwise a lot of pings will be batched to be sent upon # ready, otherwise a lot of pings will be batched to be sent upon
# reconnect, when we really don't care about that. # reconnect, when we really don't care about that.