Telethon/telethon/_updates/entitycache.py
Lonami Exo a657ae0134 Save self user ID in session file
Should result in one less request after connecting,
as there is no longer a need to fetch the self user.
2023-04-06 14:18:42 +02:00

63 lines
1.8 KiB
Python

from .session import EntityType, Entity
# Unique marker used as the default for ``hash_map`` so that "argument not
# given" can be told apart from any value a caller passes explicitly.
_sentinel = object()


class EntityCache:
    """
    In-memory cache mapping an entity ID to its ``(hash, type)`` pair.

    It also remembers the ID and bot flag of the "self" user.
    """

    def __init__(
            self,
            hash_map: dict = _sentinel,
            self_id: int = None,
            self_bot: bool = None
    ):
        """
        Create the cache.

        :param hash_map: existing ``{id: (hash, type)}`` mapping to use as the
            backing store. It is stored as-is (not copied). When omitted, a
            fresh empty dictionary is created for this instance.
        :param self_id: ID of the self user, if already known.
        :param self_bot: whether the self user is a bot, if already known.
        """
        # A new dict is built per instance so instances never share state
        # through a mutable default argument.
        self.hash_map = {} if hash_map is _sentinel else hash_map
        self.self_id = self_id
        self.self_bot = self_bot

    def set_self_user(self, id, bot, hash):
        """
        Record the self user's ID and bot flag.

        If ``hash`` is truthy, the self user is also stored in the hash map
        as either a bot or a regular user.
        """
        self.self_id = id
        self.self_bot = bot
        if hash:
            self.hash_map[id] = (hash, EntityType.BOT if bot else EntityType.USER)

    def get(self, id):
        """
        Return the cached ``Entity`` for ``id``, or ``None`` if it is not cached.
        """
        # Only the lookup is guarded: a KeyError raised while constructing
        # the Entity must not be mistaken for a cache miss.
        try:
            hash, ty = self.hash_map[id]
        except KeyError:
            return None
        return Entity(ty, id, hash)

    def extend(self, users, chats):
        """
        Add every user and chat that carries a usable access hash.

        Items without a truthy ``access_hash`` and items flagged as ``min``
        are skipped. Users are processed before chats; a later item with the
        same ID overwrites an earlier one.
        """
        # See https://core.telegram.org/api/min for "issues" with "min constructors".
        for u in users:
            if not getattr(u, 'access_hash', None) or u.min:
                continue
            self.hash_map[u.id] = (
                u.access_hash,
                EntityType.BOT if u.bot else EntityType.USER,
            )

        for c in chats:
            # Not every chat object has a `min` attribute, hence `getattr`.
            if not getattr(c, 'access_hash', None) or getattr(c, 'min', None):
                continue
            if c.megagroup:
                ty = EntityType.MEGAGROUP
            elif getattr(c, 'gigagroup', None):
                ty = EntityType.GIGAGROUP
            else:
                ty = EntityType.CHANNEL
            self.hash_map[c.id] = (c.access_hash, ty)

    def get_all_entities(self):
        """
        Return a list with one ``Entity`` per cached entry.
        """
        return [Entity(ty, id, hash) for id, (hash, ty) in self.hash_map.items()]

    def put(self, entity):
        """
        Store ``entity`` (read through its ``id``, ``hash`` and ``ty``
        attributes), replacing any previous entry with the same ID.
        """
        self.hash_map[entity.id] = (entity.hash, entity.ty)

    def retain(self, filter):
        """
        Keep only the entries whose ID makes ``filter(id)`` truthy.

        The hash map is replaced by a new dictionary; the previous one is
        left unmodified.
        """
        self.hash_map = {k: v for k, v in self.hash_map.items() if filter(k)}

    def __len__(self):
        """
        Return the number of cached entries.
        """
        return len(self.hash_map)