From 762c1aa10d4ca8a53b080e328dc95ed96e6aa08c Mon Sep 17 00:00:00 2001
From: M1ha
Date: Thu, 29 Nov 2018 19:10:41 +0500
Subject: [PATCH] 1) Fixed SyncKillTest 2) Added some logs

---
 src/django_clickhouse/models.py   |  3 ++-
 src/django_clickhouse/redis.py    | 10 +++++-----
 src/django_clickhouse/storages.py | 14 +++++++++-----
 tests/clickhouse_models.py        |  1 +
 tests/test_sync.py                | 25 ++++++++++++++-----------
 5 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/src/django_clickhouse/models.py b/src/django_clickhouse/models.py
index 1c78f86..4ecb0d1 100644
--- a/src/django_clickhouse/models.py
+++ b/src/django_clickhouse/models.py
@@ -175,7 +175,8 @@ class ClickHouseSyncModel(DjangoModel):
 
         def _on_commit():
             for model_cls in cls.get_clickhouse_sync_models():
-                storage.register_operations_wrapped(model_cls.get_import_key(), operation, *model_pks)
+                if model_cls.sync_enabled:
+                    storage.register_operations_wrapped(model_cls.get_import_key(), operation, *model_pks)
 
         if len(model_pks) > 0:
             storage = cls.get_clickhouse_storage()
diff --git a/src/django_clickhouse/redis.py b/src/django_clickhouse/redis.py
index ce8f4d2..63426e5 100644
--- a/src/django_clickhouse/redis.py
+++ b/src/django_clickhouse/redis.py
@@ -26,16 +26,16 @@ class RedisLock:
         self.lock.release()
 
     def acquire(self):
-        logger.debug('Acquiring lock "%s" with pid %d' % (self.lock.name, os.getpid()))
+        logger.debug('django-clickhouse: acquiring lock "%s" with pid %d' % (self.lock.name, os.getpid()))
         if self.lock.acquire():
-            logger.debug('Acquired lock "%s" with pid %d' % (self.lock.name, os.getpid()))
+            logger.debug('django-clickhouse: acquired lock "%s" with pid %d' % (self.lock.name, os.getpid()))
             return self
         else:
-            logger.debug('Timeout lock "%s" with pid %d' % (self.lock.name, os.getpid()))
+            logger.warning('django-clickhouse: timeout lock "%s" with pid %d' % (self.lock.name, os.getpid()))
             raise RedisLockTimeoutError()
 
     def release(self):
-        logger.debug('Releasing lock "%s" with pid %d' % (self.lock.name, os.getpid()))
+        logger.debug('django-clickhouse: releasing lock "%s" with pid %d' % (self.lock.name, os.getpid()))
         self.lock.release()
 
     def hard_release(self) -> bool:
@@ -43,7 +43,7 @@
         Drops the lock, not looking if it is acquired by anyone.
         :return: Boolean - if lock has been acquired before releasing or not
         """
-        logger.debug('Hard releasing lock "%s" with pid %d' % (self.lock.name, os.getpid()))
+        logger.warning('django-clickhouse: hard releasing lock "%s" with pid %d' % (self.lock.name, os.getpid()))
         return bool(self.lock.redis.delete(self.lock.name))
 
 
diff --git a/src/django_clickhouse/storages.py b/src/django_clickhouse/storages.py
index 271de01..7d77d78 100644
--- a/src/django_clickhouse/storages.py
+++ b/src/django_clickhouse/storages.py
@@ -65,7 +65,6 @@ class Storage:
         """
         key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
         statsd.gauge(key, -batch_size, delta=True)
-        logger.debug('Removed %d items (%s) from storage' % (batch_size, import_key))
 
     def operations_count(self, import_key, **kwargs):
         # type: (str, **dict) -> int
@@ -114,7 +113,7 @@ class Storage:
 
         statsd_key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
         statsd.gauge(statsd_key, len(pks), delta=True)
-        logger.debug('Registered %d items (%s) to storage' % (len(pks), import_key))
+        logger.debug('django-clickhouse: registered %d items (%s) to storage' % (len(pks), import_key))
 
         return self.register_operations(import_key, operation, *pks)
 
@@ -209,9 +208,10 @@
         except RedisLockTimeoutError:
             # Lock is busy. But If the process has been killed, I don't want to wait any more.
             # Let's check if pid exists
-            pid = int(self._redis.get(lock_pid_key))
-            if not check_pid(pid):
-                logger.debug('Hard releasing lock "%s" locked by pid %d' % (import_key, pid))
+            pid = int(self._redis.get(lock_pid_key) or 0)
+            if pid and not check_pid(pid):
+                logger.warning('django-clickhouse: hard releasing lock "%s" locked by pid %d' % (import_key, pid))
+                self._redis.delete(lock_pid_key)
                 lock.hard_release()
                 self.pre_sync(import_key, **kwargs)
             else:
@@ -231,8 +231,12 @@
             self.post_batch_removed(import_key, batch_size)
 
         # unblock lock after sync completed
+        lock_pid_key = self.REDIS_KEY_LOCK_PID.format(import_key=import_key)
+        self._redis.delete(lock_pid_key)
         self.get_lock(import_key, **kwargs).release()
 
+        logger.info('django-clickhouse: synced %d items (key: %s)' % (batch_size, import_key))
+
     def flush(self):
         key_tpls = [
             self.REDIS_KEY_TS_TEMPLATE.format(import_key='*'),
diff --git a/tests/clickhouse_models.py b/tests/clickhouse_models.py
index 33d5bc4..cfc18a6 100644
--- a/tests/clickhouse_models.py
+++ b/tests/clickhouse_models.py
@@ -20,6 +20,7 @@ class ClickHouseTestModel(ClickHouseModel):
 class ClickHouseCollapseTestModel(ClickHouseModel):
     django_model = TestModel
     sync_delay = 2
+    sync_enabled = True
 
     id = fields.Int32Field()
     created_date = fields.DateField()
diff --git a/tests/test_sync.py b/tests/test_sync.py
index ee50fb2..628e635 100644
--- a/tests/test_sync.py
+++ b/tests/test_sync.py
@@ -1,12 +1,9 @@
 import datetime
-import signal
-from multiprocessing import Process
 from subprocess import Popen
 from time import sleep
-from unittest import skip, expectedFailure
+from unittest import expectedFailure
 
 import os
-from django.db.models import F
 from django.test import TransactionTestCase
 from django.utils.timezone import now
 from random import randint
@@ -113,8 +110,10 @@ class KillTest(TransactionTestCase):
 
     def _check_data(self):
         # Sync all data that is not synced
+        # Data is expected to be in test_db, not default. So we need to call a subprocess
+        # for everything to work correctly
         while ClickHouseCollapseTestModel.get_storage().operations_count(ClickHouseCollapseTestModel.get_import_key()):
-            ClickHouseCollapseTestModel.sync_batch_from_storage()
+            self.sync_iteration(False)
 
         ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id',
                                                       model_class=ClickHouseCollapseTestModel))
@@ -125,23 +124,27 @@ class KillTest(TransactionTestCase):
         self.assertListEqual(ch_data, [serializer.serialize(item) for item in pg_data])
 
     @classmethod
-    def sync_iteration(cls):
+    def sync_iteration(cls, kill=True):
         test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
         p_sync = Popen(['python3', test_script, 'sync', '--test-time', str(cls.TEST_TIME)])
-        sleep(randint(0, 5))
-        print('Killing: %d' % p_sync.pid)
-        p_sync.kill()
+
+        if kill:
+            sleep(randint(0, 5))
+            print('Killing: %d' % p_sync.pid)
+            p_sync.kill()
+        else:
+            p_sync.wait()
 
     def test_kills(self):
         test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
         p_create = Popen(['python3', test_script, 'create', '--test-time', str(self.TEST_TIME)])
-        p_update = Popen(['python3', test_script, 'update', '--test-time', str(self.TEST_TIME)])
+        # p_update = Popen(['python3', test_script, 'update', '--test-time', str(self.TEST_TIME)])
 
         start = now()
         while (now() - start).total_seconds() < self.TEST_TIME:
             self.sync_iteration()
 
         p_create.wait()
-        p_update.wait()
+        # p_update.wait()
 
         self._check_data()
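
Note on the RedisStorage.pre_sync change above: the hard-release branch only behaves correctly if the
check_pid() helper it calls can tell whether the process that wrote its pid into Redis is still alive.
Below is a minimal sketch of such a liveness check, assuming a POSIX system; it is an illustration
only, not necessarily the helper django-clickhouse actually ships.

    import errno
    import os

    def check_pid(pid):
        """Best-effort check that a process with the given pid is still running (POSIX only)."""
        try:
            os.kill(pid, 0)  # signal 0 sends nothing; it only checks existence and permissions
        except OSError as exc:
            # EPERM means the process exists but belongs to another user
            return exc.errno == errno.EPERM
        return True

With a check of this kind, the new "if pid and not check_pid(pid)" guard covers both a dead lock owner
(the lock is hard-released and the stale pid key deleted) and a missing pid key: Redis returns None,
which "int(... or 0)" coerces to 0, exactly the case where the old int(self._redis.get(lock_pid_key))
call crashed with a TypeError.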