1) Some bugs fixed

2) Added test on RedisStorage.post_sync
This commit is contained in:
M1ha 2018-11-14 13:02:33 +05:00
parent bac333d03a
commit 4e340e6212
2 changed files with 34 additions and 12 deletions

View File

@ -128,36 +128,42 @@ class RedisStorage(Storage):
def register_operation(self, import_key, operation, pk):
key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
score = datetime.datetime.now().timestamp()
# key, score, value
self._redis.zadd(key, datetime.datetime.now().timestamp(), '%s:%s' % (operation, str(pk)))
self._redis.zadd(key, score, '%s:%s' % (operation, str(pk)))
def get_operations(self, import_key, count, **kwargs):
ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
ops, scores = zip(*self._redis.zrangebyscore(ops_key, '-inf', datetime.datetime.now().timestamp(),
start=0, num=count, withscores=True))
res = self._redis.zrangebyscore(ops_key, '-inf', datetime.datetime.now().timestamp(), start=0, num=count,
withscores=True)
ts_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
self._redis.set(ts_key, max(scores))
if res:
ops, scores = zip(*res)
return list(tuple(op.decode().split(':')) for op in ops)
ts_key = self.REDIS_KEY_TS_TEMPLATE.format(import_key=import_key)
self._redis.set(ts_key, max(scores))
return list(tuple(op.decode().split(':')) for op in ops)
else:
return []
def get_import_batch(self, import_key, **kwargs):
batch_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key)
return tuple(item.decode() for item in self._redis.lrange(batch_key, 0, -1))
def write_import_batch(self, import_key, batch, **kwargs):
batch_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key)
# Elements are pushed to the head, so we need to invert batch in order to save correct order
self._redis.lpush(batch_key, *reversed(batch))
def post_sync(self, import_key, **kwargs):
ts_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
ts_key = self.REDIS_KEY_TS_TEMPLATE.format(import_key=import_key)
ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
batch_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key)
score = float(self._redis.pipeline().get(ts_key))
score = float(self._redis.get(ts_key))
self._redis.pipeline()\
.zremrangebyscore(ops_key, '-inf', score)\
.delete(batch_key)\

View File

@ -9,7 +9,10 @@ class StorageTest(TestCase):
def setUp(self):
# Clean storage
redis = self.storage._redis
redis.delete(*redis.keys('clickhouse_sync*'))
keys = redis.keys('clickhouse_sync*')
if keys:
redis.delete(*keys)
def test_operation_pks(self):
self.storage.register_operation_wrapped('test', 'insert', 100500)
@ -46,3 +49,16 @@ class StorageTest(TestCase):
def test_import_batch(self):
self.storage.write_import_batch('test', [str(i) for i in range(10)])
self.assertTupleEqual(tuple(str(i) for i in range(10)), self.storage.get_import_batch('test'))
def test_post_sync(self):
self.storage.register_operation_wrapped('test', 'insert', 100500)
self.storage.register_operation_wrapped('test', 'insert', 100501)
self.storage.get_operations('test', 10)
self.storage.write_import_batch('test', [str(i) for i in range(10)])
self.storage.register_operation_wrapped('test', 'insert', 100502)
self.storage.post_sync('test')
self.assertListEqual([
('insert', '100502')
], self.storage.get_operations('test', 10))
self.assertTupleEqual(tuple(), self.storage.get_import_batch('test'))