From 8d31209a58878896b559e34eb925a5c852ffcb4f Mon Sep 17 00:00:00 2001 From: M1ha Date: Wed, 6 Feb 2019 17:47:54 +0500 Subject: [PATCH] Trying to fix queue monitoring increment bug --- src/django_clickhouse/redis.py | 2 +- src/django_clickhouse/storages.py | 24 +++++++++++++----------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/django_clickhouse/redis.py b/src/django_clickhouse/redis.py index 63426e5..4ab9627 100644 --- a/src/django_clickhouse/redis.py +++ b/src/django_clickhouse/redis.py @@ -59,4 +59,4 @@ def redis_zadd(redis_client, key, mapping, **kwargs): else: items = [mapping] - return redis_client.zadd(key, *items, **kwargs) \ No newline at end of file + return redis_client.zadd(key, *items, **kwargs) diff --git a/src/django_clickhouse/storages.py b/src/django_clickhouse/storages.py index 4181d2c..e16a5ac 100644 --- a/src/django_clickhouse/storages.py +++ b/src/django_clickhouse/storages.py @@ -98,38 +98,40 @@ class Storage: """ raise NotImplemented() - def register_operations(self, import_key, operation, *pks): # type: (str, str, *Any) -> None + def register_operations(self, import_key, operation, *pks): # type: (str, str, *Any) -> int """ Registers new incoming operation :param import_key: A key, returned by ClickHouseModel.get_import_key() method :param operation: One of insert, update, delete :param pk: Primary key to find records in main database. Should be string-serializable with str() method. - :return: None + :return: Number of registered operations """ raise NotImplementedError() def register_operations_wrapped(self, import_key, operation, *pks): - # type: (str, str, *Any) -> None + # type: (str, str, *Any) -> int """ This is a wrapper for register_operation method, checking main parameters. This method should be called from inner functions. :param import_key: A key, returned by ClickHouseModel.get_import_key() method :param operation: One of insert, update, delete :param pks: Primary keys to find records in main database. Should be string-serializable with str() method. - :return: None + :return: Number of registered operations """ if operation not in {'insert', 'update', 'delete'}: raise ValueError('operation must be one of [insert, update, delete]') - statsd_key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key) - statsd.gauge(statsd_key, len(pks), delta=True) - logger.debug('django-clickhouse: registered %s on %d items (%s) to storage' - % (operation, len(pks), import_key)) - statsd_key = "%s.sync.%s.register_operations" % (config.STATSD_PREFIX, import_key) statsd.incr(statsd_key + '.%s' % operation, len(pks)) with statsd.timer(statsd_key): - return self.register_operations(import_key, operation, *pks) + ops_count = self.register_operations(import_key, operation, *pks) + + statsd_key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key) + statsd.gauge(statsd_key, ops_count, delta=True) + logger.debug('django-clickhouse: registered %s on %d items (%s) to storage' + % (operation, len(pks), import_key)) + + return ops_count def flush(self): """ @@ -180,7 +182,7 @@ class RedisStorage(with_metaclass(SingletonMeta, Storage)): score = datetime.datetime.now().timestamp() items = {'%s:%s' % (operation, str(pk)): score for pk in pks} - redis_zadd(self._redis, key, items) + return redis_zadd(self._redis, key, items) def operations_count(self, import_key, **kwargs): ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)