Trying to fix queue monitoring increment bug

This commit is contained in:
M1ha 2019-02-06 17:47:54 +05:00
parent 4249b0e2b9
commit 8d31209a58
2 changed files with 14 additions and 12 deletions

View File

@ -59,4 +59,4 @@ def redis_zadd(redis_client, key, mapping, **kwargs):
else: else:
items = [mapping] items = [mapping]
return redis_client.zadd(key, *items, **kwargs) return redis_client.zadd(key, *items, **kwargs)

View File

@ -98,38 +98,40 @@ class Storage:
""" """
raise NotImplemented() raise NotImplemented()
def register_operations(self, import_key, operation, *pks): # type: (str, str, *Any) -> None def register_operations(self, import_key, operation, *pks): # type: (str, str, *Any) -> int
""" """
Registers new incoming operation Registers new incoming operation
:param import_key: A key, returned by ClickHouseModel.get_import_key() method :param import_key: A key, returned by ClickHouseModel.get_import_key() method
:param operation: One of insert, update, delete :param operation: One of insert, update, delete
:param pk: Primary key to find records in main database. Should be string-serializable with str() method. :param pk: Primary key to find records in main database. Should be string-serializable with str() method.
:return: None :return: Number of registered operations
""" """
raise NotImplementedError() raise NotImplementedError()
def register_operations_wrapped(self, import_key, operation, *pks): def register_operations_wrapped(self, import_key, operation, *pks):
# type: (str, str, *Any) -> None # type: (str, str, *Any) -> int
""" """
This is a wrapper for register_operation method, checking main parameters. This is a wrapper for register_operation method, checking main parameters.
This method should be called from inner functions. This method should be called from inner functions.
:param import_key: A key, returned by ClickHouseModel.get_import_key() method :param import_key: A key, returned by ClickHouseModel.get_import_key() method
:param operation: One of insert, update, delete :param operation: One of insert, update, delete
:param pks: Primary keys to find records in main database. Should be string-serializable with str() method. :param pks: Primary keys to find records in main database. Should be string-serializable with str() method.
:return: None :return: Number of registered operations
""" """
if operation not in {'insert', 'update', 'delete'}: if operation not in {'insert', 'update', 'delete'}:
raise ValueError('operation must be one of [insert, update, delete]') raise ValueError('operation must be one of [insert, update, delete]')
statsd_key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
statsd.gauge(statsd_key, len(pks), delta=True)
logger.debug('django-clickhouse: registered %s on %d items (%s) to storage'
% (operation, len(pks), import_key))
statsd_key = "%s.sync.%s.register_operations" % (config.STATSD_PREFIX, import_key) statsd_key = "%s.sync.%s.register_operations" % (config.STATSD_PREFIX, import_key)
statsd.incr(statsd_key + '.%s' % operation, len(pks)) statsd.incr(statsd_key + '.%s' % operation, len(pks))
with statsd.timer(statsd_key): with statsd.timer(statsd_key):
return self.register_operations(import_key, operation, *pks) ops_count = self.register_operations(import_key, operation, *pks)
statsd_key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
statsd.gauge(statsd_key, ops_count, delta=True)
logger.debug('django-clickhouse: registered %s on %d items (%s) to storage'
% (operation, len(pks), import_key))
return ops_count
def flush(self): def flush(self):
""" """
@ -180,7 +182,7 @@ class RedisStorage(with_metaclass(SingletonMeta, Storage)):
score = datetime.datetime.now().timestamp() score = datetime.datetime.now().timestamp()
items = {'%s:%s' % (operation, str(pk)): score for pk in pks} items = {'%s:%s' % (operation, str(pk)): score for pk in pks}
redis_zadd(self._redis, key, items) return redis_zadd(self._redis, key, items)
def operations_count(self, import_key, **kwargs): def operations_count(self, import_key, **kwargs):
ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)