1) Finished sync celery tasks.

2) RedisStorage now prevents parallel sync with RedisLock
M1ha 2018-11-20 17:24:15 +05:00
parent 6e4a5d7723
commit 6e0bc424d7
10 changed files with 258 additions and 8 deletions
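The net effect on the sync flow, paraphrased from the hunks below (a simplified sketch, not the verbatim method body; error handling is omitted):

from django.utils.timezone import now

def sync_model(cls):  # cls: a ClickHouseModel subclass
    storage = cls.get_storage()
    import_key = cls.get_import_key()
    # pre_sync acquires clickhouse_sync:lock:<import_key>; with blocking_timeout=0
    # it raises RedisLockTimeoutError if another process is already syncing this model
    storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
    batch = storage.get_import_batch(import_key)
    # ... insert the batch into ClickHouse (omitted) ...
    storage.post_sync(import_key)                  # releases the lock
    storage.set_last_sync_time(import_key, now())  # consulted later by need_sync()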

View File

@ -7,6 +7,7 @@ from itertools import chain
from typing import List, Tuple
from django.db.models import Model as DjangoModel
from django.utils.timezone import now
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase
from six import with_metaclass
@ -47,6 +48,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
sync_storage = None
sync_delay = None
sync_database_alias = None
sync_lock_timeout = None
def get_database(self, for_write=False):
# type: (bool) -> Database
@ -78,6 +80,10 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
def get_sync_delay(cls):
return cls.sync_delay or config.SYNC_DELAY
@classmethod
def get_lock_timeout(cls):
return cls.sync_lock_timeout or cls.get_sync_delay() * 10
@classmethod
def get_import_key(cls):
return cls.__name__
@ -133,7 +139,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
conn = connections[cls.sync_database_alias]
with statsd.timer(statsd_key.format('pre_sync')):
storage.pre_sync(import_key)
storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
with statsd.timer(statsd_key.format('get_import_batch')):
batch = storage.get_import_batch(import_key)
@ -162,6 +168,24 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
with statsd.timer(statsd_key.format('post_sync')):
storage.post_sync(import_key)
storage.set_last_sync_time(import_key, now())
@classmethod
def need_sync(cls): # type: () -> bool
"""
Checks whether this model needs synchronization: sync is enabled and the delay since the last sync has passed
:return: Boolean
"""
if not cls.sync_enabled:
return False
last_sync_time = cls.get_storage().get_last_sync_time(cls.get_import_key())
if last_sync_time is None:
return True
return (datetime.datetime.now() - last_sync_time).total_seconds() >= cls.get_sync_delay()
# class ClickHouseModelConverter:
# """

View File

@ -43,8 +43,11 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
It also assumes the primary key is id
:param model_cls: ClickHouseModel subclass to import
:param objects: Objects for which final versions are searched
:return: A list of
:return: A list of model objects
"""
if not objects:
return []
min_date, max_date = None, None
for obj in objects:
obj_date = getattr(obj, self.date_col)
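A hedged usage sketch (the call form is inferred from the signature in this hunk; objs is hypothetical): the collapsing engine fetches the current ClickHouse rows for changed objects so it can write cancel rows (sign = -1) next to the new versions.

# objs: Django model instances that were updated or deleted
old_versions = ClickHouseCollapseTestModel.engine.get_final_versions(
    ClickHouseCollapseTestModel, objs)
# With the new guard, an empty objs list short-circuits to [] instead of
# running a query while min_date/max_date are still None.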

View File

@ -11,3 +11,7 @@ class DBAliasError(Exception):
def __init__(self, alias):
super(DBAliasError, self).__init__(
"Database alias `%s` is not found. Check %s parameter" % (alias, PREFIX + 'DATABASES'))
class RedisLockTimeoutError(Exception):
pass

View File

@ -0,0 +1,35 @@
"""
Contains additional components for redis-py to use in RedisStorage
"""
from .exceptions import RedisLockTimeoutError
class RedisLock:
"""
Fixes the issue described in https://github.com/andymccurdy/redis-py/issues/621
"""
def __init__(self, redis_client, *args, **kwargs):
self.lock = redis_client.lock(*args, **kwargs)
def __enter__(self):
if self.lock.acquire():
return self
else:
raise RedisLockTimeoutError()
def __exit__(self, exc_type, exc_value, exc_tb):
self.lock.release()
def acquire(self):
# Mirror __enter__: raise instead of silently continuing without the lock
if not self.lock.acquire():
raise RedisLockTimeoutError()
def release(self):
self.lock.release()
def hard_release(self) -> bool:
"""
Drops the lock without checking whether anyone currently holds it.
:return: Boolean - whether the lock key existed before it was dropped
"""
return bool(self.lock.redis.delete(self.lock.name))
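A minimal usage sketch for RedisLock (the key name, timeout, and critical section are illustrative):

from redis import StrictRedis
from django_clickhouse.exceptions import RedisLockTimeoutError
from django_clickhouse.redis import RedisLock

client = StrictRedis()
try:
    # blocking_timeout=0 fails immediately if the lock is taken;
    # timeout=60 auto-expires it so a killed process cannot hold it forever
    with RedisLock(client, 'my-lock', timeout=60, blocking_timeout=0):
        pass  # critical section
except RedisLockTimeoutError:
    pass  # another process holds the lock; skip this run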

View File

@ -129,6 +129,20 @@ class Storage:
"""
raise NotImplementedError()
def get_last_sync_time(self, import_key): # type: (str) -> Optional[datetime.datetime]
"""
Gets the time when the last sync was executed
:return: datetime.datetime of the last sync if there has been one, otherwise None
"""
raise NotImplementedError()
def set_last_sync_time(self, import_key, dt): # type: (str, datetime.datetime) -> None
"""
Saves the time of a successful sync
:return: None
"""
raise NotImplementedError()
class RedisStorage(Storage):
"""
@ -140,6 +154,8 @@ class RedisStorage(Storage):
REDIS_KEY_OPS_TEMPLATE = 'clickhouse_sync:operations:{import_key}'
REDIS_KEY_TS_TEMPLATE = 'clickhouse_sync:timestamp:{import_key}'
REDIS_KEY_BATCH_TEMPLATE = 'clickhouse_sync:batch:{import_key}'
REDIS_KEY_LOCK = 'clickhouse_sync:lock:{import_key}'
REDIS_KEY_LAST_SYNC_TS = 'clickhouse_sync:last_sync:{import_key}'
def __init__(self):
# Create redis library connection. If redis is not connected properly errors should be raised
@ -148,6 +164,7 @@ class RedisStorage(Storage):
from redis import StrictRedis
self._redis = StrictRedis(**config.REDIS_CONFIG)
self._lock = None
def register_operations(self, import_key, operation, *pks):
key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
@ -184,6 +201,21 @@ class RedisStorage(Storage):
batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key)
self._redis.lpush(batch_key, *reversed(batch))
def get_lock(self, import_key, **kwargs):
if self._lock is None:
from .redis import RedisLock
lock_key = self.REDIS_KEY_LOCK.format(import_key=import_key)
lock_timeout = kwargs.get('lock_timeout', config.SYNC_DELAY * 10)
self._lock = RedisLock(self._redis, lock_key, timeout=lock_timeout, blocking_timeout=0)
return self._lock
def pre_sync(self, import_key, **kwargs):
# Block parallel processes so that only one sync per model runs at a time. Default lock timeout is 10 * sync delay.
# It can be changed per model by passing a `lock_timeout` argument to pre_sync
lock = self.get_lock(import_key, **kwargs)
lock.acquire()
def post_sync(self, import_key, **kwargs):
ts_key = self.REDIS_KEY_TS_TEMPLATE.format(import_key=import_key)
ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
@ -201,14 +233,30 @@ class RedisStorage(Storage):
self.post_batch_removed(import_key, batch_size)
# release the lock after sync has completed
self._lock.release()
def flush(self):
key_tpls = [
self.REDIS_KEY_TS_TEMPLATE.format(import_key='*'),
self.REDIS_KEY_OPS_TEMPLATE.format(import_key='*'),
self.REDIS_KEY_BATCH_TEMPLATE.format(import_key='*')
self.REDIS_KEY_BATCH_TEMPLATE.format(import_key='*'),
self.REDIS_KEY_LOCK.format(import_key='*'),
self.REDIS_KEY_LAST_SYNC_TS.format(import_key='*')
]
for tpl in key_tpls:
keys = self._redis.keys(tpl)
if keys:
self._redis.delete(*keys)
def get_last_sync_time(self, import_key):
sync_ts_key = self.REDIS_KEY_LAST_SYNC_TS.format(import_key=import_key)
res = self._redis.get(sync_ts_key)
if res is None:
return None
return datetime.datetime.fromtimestamp(float(res))
def set_last_sync_time(self, import_key, dt):
sync_ts_key = self.REDIS_KEY_LAST_SYNC_TS.format(import_key=import_key)
self._redis.set(sync_ts_key, dt.timestamp())
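Taken together, a quick sketch of the new storage behavior (the 'test' import key is arbitrary; two RedisStorage instances stand in for two processes):

import datetime
from django_clickhouse.exceptions import RedisLockTimeoutError
from django_clickhouse.storages import RedisStorage

first, second = RedisStorage(), RedisStorage()
first.pre_sync('test')        # acquires clickhouse_sync:lock:test
try:
    second.pre_sync('test')   # blocking_timeout=0: fails fast instead of waiting
except RedisLockTimeoutError:
    print('parallel sync prevented')
first.post_sync('test')       # releases the lock

first.set_last_sync_time('test', datetime.datetime.now())
first.get_last_sync_time('test')  # naive datetime rebuilt from the stored float timestamp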

View File

@ -32,7 +32,7 @@ def clickhouse_auto_sync():
# Start
for cls in get_subclasses(ClickHouseModel, recursive=True):
if cls.start_sync():
if cls.need_sync():
# Even if this sync run suddenly fails, it is not a problem that we have already set the sync period:
# the sync will be performed by the next task after the interval.
sync_clickhouse_converter.delay(cls)
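For this task to run periodically, a celerybeat entry along these lines is assumed (the task path follows this repo's tasks module; the interval is arbitrary):

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'clickhouse_auto_sync': {
        'task': 'django_clickhouse.tasks.clickhouse_auto_sync',
        'schedule': timedelta(seconds=2),  # poll often; need_sync() throttles per model
    },
}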

View File

@ -7,7 +7,7 @@ from tests.models import TestModel
class ClickHouseTestModel(ClickHouseModel):
django_model = TestModel
sync_delay = 5
sync_delay = 2
id = fields.Int32Field()
created_date = fields.DateField()
@ -18,7 +18,7 @@ class ClickHouseTestModel(ClickHouseModel):
class ClickHouseCollapseTestModel(ClickHouseModel):
django_model = TestModel
sync_delay = 5
sync_delay = 2
id = fields.Int32Field()
created_date = fields.DateField()

View File

@ -11,7 +11,33 @@ DATABASES = {
'PASSWORD': 'test',
'HOST': '127.0.0.1',
'PORT': '5432'
}
},
# I need separate connections for multiprocessing tests
'create': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': 'test',
'USER': 'test',
'PASSWORD': 'test',
'HOST': '127.0.0.1',
'PORT': '5432'
},
'update': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': 'test',
'USER': 'test',
'PASSWORD': 'test',
'HOST': '127.0.0.1',
'PORT': '5432'
},
'delete': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': 'test',
'USER': 'test',
'PASSWORD': 'test',
'HOST': '127.0.0.1',
'PORT': '5432'
},
}
LOGGING = {

View File

@ -1,3 +1,5 @@
import datetime
from django.test import TestCase
from django_clickhouse.storages import RedisStorage
@ -49,6 +51,7 @@ class StorageTest(TestCase):
self.assertTupleEqual(tuple(str(i) for i in range(10)), self.storage.get_import_batch('test'))
def test_post_sync(self):
self.storage.pre_sync('test')
self.storage.register_operations_wrapped('test', 'insert', 100500)
self.storage.register_operations_wrapped('test', 'insert', 100501)
self.storage.get_operations('test', 10)
@ -60,3 +63,8 @@ class StorageTest(TestCase):
('insert', '100502')
], self.storage.get_operations('test', 10))
self.assertIsNone(self.storage.get_import_batch('test'))
def test_last_sync(self):
dt = datetime.datetime.now()
self.storage.set_last_sync_time('test', dt)
self.assertEqual(dt, self.storage.get_last_sync_time('test'))

View File

@ -1,7 +1,17 @@
import datetime
import signal
from django.test import TransactionTestCase
import os
from multiprocessing import Process
from time import sleep
from unittest import skip, expectedFailure
import random
from django.db import connections as django_connections
from django.db.models import F
from django.test import TransactionTestCase, override_settings
from django_clickhouse import config
from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app
from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel
@ -15,6 +25,7 @@ class SyncTest(TransactionTestCase):
self.db.db_exists = False
self.db.create_database()
migrate_app('tests', 'default')
ClickHouseTestModel.get_storage().flush()
def test_simple(self):
obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
@ -32,6 +43,7 @@ class SyncTest(TransactionTestCase):
obj.save()
ClickHouseCollapseTestModel.sync_batch_from_storage()
# sync_batch_from_storage uses FINAL, so the data should already be collapsed by now
synced_data = list(ClickHouseCollapseTestModel.objects_in(connections['default']))
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
@ -47,3 +59,93 @@ class SyncTest(TransactionTestCase):
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
@expectedFailure
def test_collapsing_delete(self):
obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
ClickHouseCollapseTestModel.sync_batch_from_storage()
obj.delete()
ClickHouseCollapseTestModel.sync_batch_from_storage()
# sync_batch_from_storage uses FINAL, so the data should already be collapsed by now
synced_data = list(ClickHouseCollapseTestModel.objects_in(connections['default']))
self.assertEqual(0, len(synced_data))
@skip("This doesn't work due to different threads connection problems")
class KillTest(TransactionTestCase):
TEST_TIME = 30
start = datetime.datetime.now()
def setUp(self):
ClickHouseTestModel.get_storage().flush()
@staticmethod
def _create_process(count=1000, test_time=60, period=1):
for iteration in range(test_time):
TestModel.objects.using('create').bulk_create([
TestModel(created_date='2018-01-01', value=iteration * count + i) for i in range(count)])
django_connections['create'].close()
sleep(period)
@staticmethod
def _update_process(count=1000, test_time=60, period=1):
for iteration in range(test_time):
TestModel.objects.using('update').filter(id__gte=iteration * count).annotate(idmod10=F('id') % 10). \
filter(idmod10=0).update(value=-1)
django_connections['update'].close()
sleep(period)
@staticmethod
def _delete_process(count=1000, test_time=60, period=1):
for iteration in range(test_time):
TestModel.objects.using('delete').filter(id__gte=iteration * count).annotate(idmod10=F('id') % 10). \
filter(idmod10=1).delete()
django_connections['delete'].close()
sleep(period)
@classmethod
def _sync_process(cls, period=1):
while (datetime.datetime.now() - cls.start).total_seconds() < cls.TEST_TIME:
ClickHouseCollapseTestModel.sync_batch_from_storage()
sleep(period)
def _kill_process(self, p):
# https://stackoverflow.com/questions/47553120/kill-a-multiprocessing-pool-with-sigkill-instead-of-sigterm-i-think
os.kill(p.pid, signal.SIGKILL)
p.terminate()
def _check_data(self):
ClickHouseCollapseTestModel.sync_batch_from_storage()
ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id',
model_class=ClickHouseCollapseTestModel))
pg_data = list(TestModel.objects.all().order_by('id'))
self.assertEqual(len(pg_data), len(ch_data))
serializer = ClickHouseCollapseTestModel.get_django_model_serializer()
self.assertListEqual(ch_data, [serializer.serialize(item) for item in pg_data])
def test_kills(self):
p_create = Process(target=self._create_process, kwargs={'test_time': 5})
p_update = Process(target=self._update_process, kwargs={'test_time': 5})
p_delete = Process(target=self._delete_process, kwargs={'test_time': 5})
p_sync = Process(target=self._sync_process)
self.start = datetime.datetime.now()
p_create.start()
p_update.start()
p_delete.start()
p_sync.start()
# while (datetime.datetime.now() - start).total_seconds() < self.TEST_TIME:
# self._kill_process(p_sync)
# p_sync.start()
# sleep(random.randint(0, 5))
p_create.join()
p_update.join()
p_delete.join()
p_sync.join()
# self._check_data()