From 4e340e62120d5d7d13427350febf89be39e04f29 Mon Sep 17 00:00:00 2001 From: M1ha Date: Wed, 14 Nov 2018 13:02:33 +0500 Subject: [PATCH] 1) Some bugs fixed 2) Added test on RedisStorage.post_sync --- src/django_clickhouse/storage.py | 28 +++++++++++++++++----------- tests/test_storages.py | 18 +++++++++++++++++- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/django_clickhouse/storage.py b/src/django_clickhouse/storage.py index 334bedc..a7c0152 100644 --- a/src/django_clickhouse/storage.py +++ b/src/django_clickhouse/storage.py @@ -128,36 +128,42 @@ class RedisStorage(Storage): def register_operation(self, import_key, operation, pk): key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) + score = datetime.datetime.now().timestamp() # key, score, value - self._redis.zadd(key, datetime.datetime.now().timestamp(), '%s:%s' % (operation, str(pk))) + self._redis.zadd(key, score, '%s:%s' % (operation, str(pk))) def get_operations(self, import_key, count, **kwargs): ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) - ops, scores = zip(*self._redis.zrangebyscore(ops_key, '-inf', datetime.datetime.now().timestamp(), - start=0, num=count, withscores=True)) + res = self._redis.zrangebyscore(ops_key, '-inf', datetime.datetime.now().timestamp(), start=0, num=count, + withscores=True) - ts_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) - self._redis.set(ts_key, max(scores)) + if res: + ops, scores = zip(*res) - return list(tuple(op.decode().split(':')) for op in ops) + ts_key = self.REDIS_KEY_TS_TEMPLATE.format(import_key=import_key) + self._redis.set(ts_key, max(scores)) + + return list(tuple(op.decode().split(':')) for op in ops) + else: + return [] def get_import_batch(self, import_key, **kwargs): - batch_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) + batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key) return tuple(item.decode() for item in self._redis.lrange(batch_key, 0, -1)) def write_import_batch(self, import_key, batch, **kwargs): - batch_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) + batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key) # Elements are pushed to the head, so we need to invert batch in order to save correct order self._redis.lpush(batch_key, *reversed(batch)) def post_sync(self, import_key, **kwargs): - ts_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) + ts_key = self.REDIS_KEY_TS_TEMPLATE.format(import_key=import_key) ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) - batch_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) + batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key) - score = float(self._redis.pipeline().get(ts_key)) + score = float(self._redis.get(ts_key)) self._redis.pipeline()\ .zremrangebyscore(ops_key, '-inf', score)\ .delete(batch_key)\ diff --git a/tests/test_storages.py b/tests/test_storages.py index b7ddcf5..7f57caa 100644 --- a/tests/test_storages.py +++ b/tests/test_storages.py @@ -9,7 +9,10 @@ class StorageTest(TestCase): def setUp(self): # Clean storage redis = self.storage._redis - redis.delete(*redis.keys('clickhouse_sync*')) + + keys = redis.keys('clickhouse_sync*') + if keys: + redis.delete(*keys) def test_operation_pks(self): self.storage.register_operation_wrapped('test', 'insert', 100500) @@ -46,3 +49,16 @@ class StorageTest(TestCase): def test_import_batch(self): self.storage.write_import_batch('test', [str(i) for i in range(10)]) self.assertTupleEqual(tuple(str(i) for i in range(10)), self.storage.get_import_batch('test')) + + def test_post_sync(self): + self.storage.register_operation_wrapped('test', 'insert', 100500) + self.storage.register_operation_wrapped('test', 'insert', 100501) + self.storage.get_operations('test', 10) + self.storage.write_import_batch('test', [str(i) for i in range(10)]) + self.storage.register_operation_wrapped('test', 'insert', 100502) + + self.storage.post_sync('test') + self.assertListEqual([ + ('insert', '100502') + ], self.storage.get_operations('test', 10)) + self.assertTupleEqual(tuple(), self.storage.get_import_batch('test'))