Revert async sessions from 3dd8b7c (breaks sync properties)

Such as ChatGetter.input_chat and SenderGetter.input_sender
which rely on the fact that access to the session file will
be synchronous.
This commit is contained in:
Lonami Exo 2018-10-12 22:00:02 +02:00
parent e677a6bb05
commit cf6686ff42
6 changed files with 29 additions and 21 deletions

View File

@ -399,7 +399,7 @@ class AuthMethods(MessageParseMethods, UserMethods):
self._self_input_peer = None self._self_input_peer = None
self._state.pts = -1 self._state.pts = -1
await self.disconnect() await self.disconnect()
await self.session.delete() self.session.delete()
return True return True
async def edit_2fa( async def edit_2fa(

View File

@ -223,9 +223,9 @@ class DownloadMethods(UserMethods):
config = await self(functions.help.GetConfigRequest()) config = await self(functions.help.GetConfigRequest())
for option in config.dc_options: for option in config.dc_options:
if option.ip_address == self.session.server_address: if option.ip_address == self.session.server_address:
await self.session.set_dc( self.session.set_dc(
option.id, option.ip_address, option.port) option.id, option.ip_address, option.port)
await self.session.save() self.session.save()
break break
# TODO Figure out why the session may have the wrong DC ID # TODO Figure out why the session may have the wrong DC ID

View File

@ -197,7 +197,15 @@ class TelegramBaseClient(abc.ABC):
) )
self.flood_sleep_threshold = flood_sleep_threshold self.flood_sleep_threshold = flood_sleep_threshold
self.session = AsyncClassWrapper(session)
# TODO Figure out how to use AsyncClassWrapper(session)
# The problem is that ChatGetter and SenderGetter rely
# on synchronous calls to session.get_entity precisely
# to avoid network access and the need for await.
#
# With asynchronous sessions, it would need await,
# and defeats the purpose of properties.
self.session = session
self.api_id = int(api_id) self.api_id = int(api_id)
self.api_hash = api_hash self.api_hash = api_hash
@ -325,8 +333,8 @@ class TelegramBaseClient(abc.ABC):
await self._disconnect() await self._disconnect()
if getattr(self, 'session', None): if getattr(self, 'session', None):
if getattr(self, '_state', None): if getattr(self, '_state', None):
await self.session.set_update_state(0, self._state) self.session.set_update_state(0, self._state)
await self.session.close() self.session.close()
async def _disconnect(self): async def _disconnect(self):
""" """
@ -364,11 +372,11 @@ class TelegramBaseClient(abc.ABC):
__log__.info('Reconnecting to new data center %s', new_dc) __log__.info('Reconnecting to new data center %s', new_dc)
dc = await self._get_dc(new_dc) dc = await self._get_dc(new_dc)
await self.session.set_dc(dc.id, dc.ip_address, dc.port) self.session.set_dc(dc.id, dc.ip_address, dc.port)
# auth_key's are associated with a server, which has now changed # auth_key's are associated with a server, which has now changed
# so it's not valid anymore. Set to None to force recreating it. # so it's not valid anymore. Set to None to force recreating it.
self.session.auth_key = None self.session.auth_key = None
await self.session.save() self.session.save()
await self._disconnect() await self._disconnect()
return await self.connect() return await self.connect()
@ -378,7 +386,7 @@ class TelegramBaseClient(abc.ABC):
new authorization key. This means we are not authorized. new authorization key. This means we are not authorized.
""" """
self.session.auth_key = auth_key self.session.auth_key = auth_key
await self.session.save() self.session.save()
# endregion # endregion
@ -466,7 +474,7 @@ class TelegramBaseClient(abc.ABC):
session = self._exported_sessions.get(cdn_redirect.dc_id) session = self._exported_sessions.get(cdn_redirect.dc_id)
if not session: if not session:
dc = await self._get_dc(cdn_redirect.dc_id, cdn=True) dc = await self._get_dc(cdn_redirect.dc_id, cdn=True)
session = await self.session.clone() session = self.session.clone()
await session.set_dc(dc.id, dc.ip_address, dc.port) await session.set_dc(dc.id, dc.ip_address, dc.port)
self._exported_sessions[cdn_redirect.dc_id] = session self._exported_sessions[cdn_redirect.dc_id] = session

View File

@ -137,7 +137,7 @@ class UpdateMethods(UserMethods):
This can also be used to forcibly fetch new updates if there are any. This can also be used to forcibly fetch new updates if there are any.
""" """
state = await self.session.get_update_state(0) state = self.session.get_update_state(0)
if not state or not state.pts: if not state or not state.pts:
state = await self(functions.updates.GetStateRequest()) state = await self(functions.updates.GetStateRequest())
@ -172,7 +172,7 @@ class UpdateMethods(UserMethods):
state.pts = d.pts state.pts = d.pts
break break
finally: finally:
await self.session.set_update_state(0, state) self.session.set_update_state(0, state)
self.session.catching_up = False self.session.catching_up = False
# endregion # endregion
@ -180,7 +180,7 @@ class UpdateMethods(UserMethods):
# region Private methods # region Private methods
async def _handle_update(self, update): async def _handle_update(self, update):
await self.session.process_entities(update) self.session.process_entities(update)
if isinstance(update, (types.Updates, types.UpdatesCombined)): if isinstance(update, (types.Updates, types.UpdatesCombined)):
entities = {utils.get_peer_id(x): x for x in entities = {utils.get_peer_id(x): x for x in
itertools.chain(update.users, update.chats)} itertools.chain(update.users, update.chats)}
@ -236,7 +236,7 @@ class UpdateMethods(UserMethods):
# inserted because this is a rather expensive operation # inserted because this is a rather expensive operation
# (default's sqlite3 takes ~0.1s to commit changes). Do # (default's sqlite3 takes ~0.1s to commit changes). Do
# it every minute instead. No-op if there's nothing new. # it every minute instead. No-op if there's nothing new.
await self.session.save() self.session.save()
# We need to send some content-related request at least hourly # We need to send some content-related request at least hourly
# for Telegram to keep delivering updates, otherwise they will # for Telegram to keep delivering updates, otherwise they will

View File

@ -213,7 +213,7 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
entity, media=types.InputMediaUploadedPhoto(fh) entity, media=types.InputMediaUploadedPhoto(fh)
)) ))
input_photo = utils.get_input_photo(r.photo) input_photo = utils.get_input_photo(r.photo)
await self.session.cache_file(fh.md5, fh.size, input_photo) self.session.cache_file(fh.md5, fh.size, input_photo)
fh = input_photo fh = input_photo
if captions: if captions:
@ -328,7 +328,7 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
file = stream.read() file = stream.read()
hash_md5.update(file) hash_md5.update(file)
if use_cache: if use_cache:
cached = await self.session.get_file( cached = self.session.get_file(
hash_md5.digest(), file_size, cls=use_cache hash_md5.digest(), file_size, cls=use_cache
) )
if cached: if cached:
@ -459,6 +459,6 @@ class UploadMethods(ButtonMethods, MessageParseMethods, UserMethods):
to_cache = utils.get_input_photo(msg.media.photo) to_cache = utils.get_input_photo(msg.media.photo)
else: else:
to_cache = utils.get_input_document(msg.media.document) to_cache = utils.get_input_document(msg.media.document)
await self.session.cache_file(md5, size, to_cache) self.session.cache_file(md5, size, to_cache)
# endregion # endregion

View File

@ -48,7 +48,7 @@ class UserMethods(TelegramBaseClient):
exceptions.append(e) exceptions.append(e)
results.append(None) results.append(None)
continue continue
await self.session.process_entities(result) self.session.process_entities(result)
exceptions.append(None) exceptions.append(None)
results.append(result) results.append(result)
request_index += 1 request_index += 1
@ -58,7 +58,7 @@ class UserMethods(TelegramBaseClient):
return results return results
else: else:
result = await future result = await future
await self.session.process_entities(result) self.session.process_entities(result)
return result return result
except (errors.ServerError, errors.RpcCallFailError) as e: except (errors.ServerError, errors.RpcCallFailError) as e:
__log__.warning('Telegram is having internal issues %s: %s', __log__.warning('Telegram is having internal issues %s: %s',
@ -288,7 +288,7 @@ class UserMethods(TelegramBaseClient):
try: try:
# First try to get the entity from cache, otherwise figure it out # First try to get the entity from cache, otherwise figure it out
return await self.session.get_input_entity(peer) return self.session.get_input_entity(peer)
except ValueError: except ValueError:
pass pass
@ -393,7 +393,7 @@ class UserMethods(TelegramBaseClient):
try: try:
# Nobody with this username, maybe it's an exact name/title # Nobody with this username, maybe it's an exact name/title
return await self.get_entity( return await self.get_entity(
await self.session.get_input_entity(string)) self.session.get_input_entity(string))
except ValueError: except ValueError:
pass pass