Make disconnect synchronous

This also fixes a bug when auto-reconnecting in MTProtoSender.
This commit is contained in:
Lonami Exo 2018-10-16 11:56:17 +02:00
parent 477fbd8dc7
commit 9cbc088b76
6 changed files with 28 additions and 34 deletions

View File

@ -398,7 +398,7 @@ class AuthMethods(MessageParseMethods, UserMethods):
self._self_input_peer = None self._self_input_peer = None
self._state.pts = -1 self._state.pts = -1
await self.disconnect() self.disconnect()
self.session.delete() self.session.delete()
return True return True
@ -489,14 +489,9 @@ class AuthMethods(MessageParseMethods, UserMethods):
return await self.start() return await self.start()
def __exit__(self, *args): def __exit__(self, *args):
if self._loop.is_running(): self.disconnect()
self._loop.create_task(self.disconnect())
elif inspect.iscoroutinefunction(self.disconnect):
self._loop.run_until_complete(self.disconnect())
else:
self.disconnect()
async def __aexit__(self, *args): async def __aexit__(self, *args):
await self.disconnect() self.disconnect()
# endregion # endregion

View File

@ -268,7 +268,7 @@ class DownloadMethods(UserMethods):
if exported: if exported:
await self._return_exported_sender(sender) await self._return_exported_sender(sender)
elif sender != self._sender: elif sender != self._sender:
await sender.disconnect() sender.disconnect()
if isinstance(file, str) or in_memory: if isinstance(file, str) or in_memory:
f.close() f.close()

View File

@ -327,17 +327,25 @@ class TelegramBaseClient(abc.ABC):
sender = getattr(self, '_sender', None) sender = getattr(self, '_sender', None)
return sender and sender.is_connected() return sender and sender.is_connected()
async def disconnect(self): def disconnect(self):
""" """
Disconnects from Telegram. Disconnects from Telegram.
Returns a dummy completed future with ``None`` as a result so
you can ``await`` this method just like every other method for
consistency or compatibility.
""" """
await self._disconnect() self._disconnect()
if getattr(self, 'session', None): if getattr(self, 'session', None):
if getattr(self, '_state', None): if getattr(self, '_state', None):
self.session.set_update_state(0, self._state) self.session.set_update_state(0, self._state)
self.session.close() self.session.close()
async def _disconnect(self): result = self._loop.create_future()
result.set_result(None)
return result
def _disconnect(self):
""" """
Disconnect only, without closing the session. Used in reconnections Disconnect only, without closing the session. Used in reconnections
to different data centers, where we don't want to close the session to different data centers, where we don't want to close the session
@ -347,24 +355,20 @@ class TelegramBaseClient(abc.ABC):
# All properties may be ``None`` if `__init__` fails, and this # All properties may be ``None`` if `__init__` fails, and this
# method will be called from `__del__` which would crash then. # method will be called from `__del__` which would crash then.
if getattr(self, '_sender', None): if getattr(self, '_sender', None):
await self._sender.disconnect() self._sender.disconnect()
if getattr(self, '_updates_handle', None): if getattr(self, '_updates_handle', None):
await self._updates_handle self._updates_handle.cancel()
def __del__(self): def __del__(self):
if not self.is_connected() or self.loop.is_closed(): if not self.is_connected() or self.loop.is_closed():
return return
# READ THIS IF DISCONNECT IS ASYNC AND A TASK WOULD BE MADE.
# Python 3.5.2's ``asyncio`` mod seems to have a bug where it's not # Python 3.5.2's ``asyncio`` mod seems to have a bug where it's not
# able to close the pending tasks properly, and letting the script # able to close the pending tasks properly, and letting the script
# complete without calling disconnect causes the script to trigger # complete without calling disconnect causes the script to trigger
# 100% CPU load. Call disconnect to make sure it doesn't happen. # 100% CPU load. Call disconnect to make sure it doesn't happen.
if not inspect.iscoroutinefunction(self.disconnect): self.disconnect()
self.disconnect()
elif self._loop.is_running():
self._loop.create_task(self.disconnect())
else:
self._loop.run_until_complete(self.disconnect())
async def _switch_dc(self, new_dc): async def _switch_dc(self, new_dc):
""" """
@ -378,7 +382,7 @@ class TelegramBaseClient(abc.ABC):
# so it's not valid anymore. Set to None to force recreating it. # so it's not valid anymore. Set to None to force recreating it.
self.session.auth_key = None self.session.auth_key = None
self.session.save() self.session.save()
await self._disconnect() self._disconnect()
return await self.connect() return await self.connect()
async def _auth_key_callback(self, auth_key): async def _auth_key_callback(self, auth_key):
@ -466,7 +470,7 @@ class TelegramBaseClient(abc.ABC):
self._borrowed_senders[dc_id] = (n, sender) self._borrowed_senders[dc_id] = (n, sender)
if not n: if not n:
__log__.info('Disconnecting borrowed sender for DC %d', dc_id) __log__.info('Disconnecting borrowed sender for DC %d', dc_id)
await sender.disconnect() sender.disconnect()
async def _get_cdn_client(self, cdn_redirect): async def _get_cdn_client(self, cdn_redirect):
"""Similar to ._borrow_exported_client, but for CDNs""" """Similar to ._borrow_exported_client, but for CDNs"""

View File

@ -20,7 +20,7 @@ class UpdateMethods(UserMethods):
try: try:
await self.disconnected await self.disconnected
except KeyboardInterrupt: except KeyboardInterrupt:
await self.disconnect() self.disconnect()
def run_until_disconnected(self): def run_until_disconnected(self):
""" """
@ -37,12 +37,7 @@ class UpdateMethods(UserMethods):
try: try:
return self.loop.run_until_complete(self.disconnected) return self.loop.run_until_complete(self.disconnected)
except KeyboardInterrupt: except KeyboardInterrupt:
# Importing the magic sync module turns disconnect into sync. self.disconnect()
# TODO Maybe disconnect() should not need the magic module...
if inspect.iscoroutinefunction(self.disconnect):
self.loop.run_until_complete(self.disconnect())
else:
self.disconnect()
def on(self, event): def on(self, event):
""" """
@ -223,7 +218,7 @@ class UpdateMethods(UserMethods):
except asyncio.TimeoutError: except asyncio.TimeoutError:
pass pass
except asyncio.CancelledError: except asyncio.CancelledError:
await self.disconnect() self.disconnect()
return return
except Exception as e: except Exception as e:
continue # Any disconnected exception should be ignored continue # Any disconnected exception should be ignored

View File

@ -119,7 +119,7 @@ class MTProtoSender:
def is_connected(self): def is_connected(self):
return self._user_connected return self._user_connected
async def disconnect(self): def disconnect(self):
""" """
Cleanly disconnects the instance from the network, cancels Cleanly disconnects the instance from the network, cancels
all pending requests, and closes the send and receive loops. all pending requests, and closes the send and receive loops.
@ -223,7 +223,7 @@ class MTProtoSender:
else: else:
e = ConnectionError('auth_key generation failed {} times' e = ConnectionError('auth_key generation failed {} times'
.format(self._retries)) .format(self._retries))
await self._disconnect(error=e) self._disconnect(error=e)
raise e raise e
__log__.debug('Starting send loop') __log__.debug('Starting send loop')
@ -310,7 +310,7 @@ class MTProtoSender:
break break
else: else:
__log__.error('Failed to reconnect automatically.') __log__.error('Failed to reconnect automatically.')
await self._disconnect(error=ConnectionError()) self._disconnect(error=ConnectionError())
def _start_reconnect(self): def _start_reconnect(self):
"""Starts a reconnection in the background.""" """Starts a reconnection in the background."""

View File

@ -365,7 +365,7 @@ async def main(loop, interval=0.05):
if 'application has been destroyed' not in e.args[0]: if 'application has been destroyed' not in e.args[0]:
raise raise
finally: finally:
await app.cl.disconnect() app.cl.disconnect()
if __name__ == "__main__": if __name__ == "__main__":