Flush in-memory cache to session after a limit is reached

Should fully close #3989.
Should help with #3235.
This commit is contained in:
Lonami Exo 2023-04-06 13:45:12 +02:00
parent cb04e269c0
commit 97b0ba6707
3 changed files with 42 additions and 1 deletions

View File

@ -52,3 +52,9 @@ class EntityCache:
def put(self, entity): def put(self, entity):
self.hash_map[entity.id] = (entity.hash, entity.ty) self.hash_map[entity.id] = (entity.hash, entity.ty)
def retain(self, filter):
self.hash_map = {k: v for k, v in self.hash_map.items() if filter(k)}
def __len__(self):
return len(self.hash_map)

View File

@ -208,6 +208,20 @@ class TelegramBaseClient(abc.ABC):
so event handlers, conversations, and QR login will not work. so event handlers, conversations, and QR login will not work.
However, certain scripts don't need updates, so this will reduce However, certain scripts don't need updates, so this will reduce
the amount of bandwidth used. the amount of bandwidth used.
entity_cache_limit (`int`, optional):
How many users, chats and channels to keep in the in-memory cache
at most. This limit is checked against when processing updates.
When this limit is reached or exceeded, all entities that are not
required for update handling will be flushed to the session file.
Note that this implies that there is a lower bound to the amount
of entities that must be kept in memory.
Setting this limit too low will cause the library to attempt to
flush entities to the session file even if no entities can be
removed from the in-memory cache, which will degrade performance.
""" """
# Current TelegramClient version # Current TelegramClient version
@ -245,7 +259,8 @@ class TelegramBaseClient(abc.ABC):
loop: asyncio.AbstractEventLoop = None, loop: asyncio.AbstractEventLoop = None,
base_logger: typing.Union[str, logging.Logger] = None, base_logger: typing.Union[str, logging.Logger] = None,
receive_updates: bool = True, receive_updates: bool = True,
catch_up: bool = False catch_up: bool = False,
entity_cache_limit: int = 5000
): ):
if not api_id or not api_hash: if not api_id or not api_hash:
raise ValueError( raise ValueError(
@ -432,6 +447,7 @@ class TelegramBaseClient(abc.ABC):
self._updates_queue = asyncio.Queue() self._updates_queue = asyncio.Queue()
self._message_box = MessageBox(self._log['messagebox']) self._message_box = MessageBox(self._log['messagebox'])
self._mb_entity_cache = MbEntityCache() # required for proper update handling (to know when to getDifference) self._mb_entity_cache = MbEntityCache() # required for proper update handling (to know when to getDifference)
self._entity_cache_limit = entity_cache_limit
self._sender = MTProtoSender( self._sender = MTProtoSender(
self.session.auth_key, self.session.auth_key,

View File

@ -7,6 +7,7 @@ import time
import traceback import traceback
import typing import typing
import logging import logging
import warnings
from collections import deque from collections import deque
from .. import events, utils, errors from .. import events, utils, errors
@ -281,6 +282,24 @@ class UpdateMethods:
continue continue
if len(self._mb_entity_cache) >= self._entity_cache_limit:
self._log[__name__].info(
'In-memory entity cache limit reached (%s/%s), flushing to session',
len(self._mb_entity_cache),
self._entity_cache_limit
)
self._save_states_and_entities()
self._mb_entity_cache.retain(lambda id: id == self._mb_entity_cache.self_id or id in self._message_box.map)
if len(self._mb_entity_cache) >= self._entity_cache_limit:
warnings.warn('in-memory entities exceed entity_cache_limit after flushing; consider setting a larger limit')
self._log[__name__].info(
'In-memory entity cache at %s/%s after flushing to session',
len(self._mb_entity_cache),
self._entity_cache_limit
)
get_diff = self._message_box.get_difference() get_diff = self._message_box.get_difference()
if get_diff: if get_diff:
self._log[__name__].debug('Getting difference for account updates') self._log[__name__].debug('Getting difference for account updates')