diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py index 594aa06..73f2e32 100644 --- a/src/django_clickhouse/clickhouse_models.py +++ b/src/django_clickhouse/clickhouse_models.py @@ -211,13 +211,12 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): Gets one batch from storage and syncs it. :return: """ + import_key = cls.get_import_key() + storage = cls.get_storage() + statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, import_key) + try: - statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, cls.__name__) with statsd.timer(statsd_key.format('total')): - - storage = cls.get_storage() - import_key = cls.get_import_key() - with statsd.timer(statsd_key.format('steps.pre_sync')): storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout()) @@ -244,6 +243,10 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): storage.post_sync(import_key) except RedisLockTimeoutError: pass # skip this sync round if lock is acquired by another thread + except Exception as ex: + with statsd.timer(statsd_key.format('steps.post_sync')): + storage.post_sync_failed(import_key) + raise ex @classmethod def need_sync(cls): # type: () -> bool @@ -282,13 +285,12 @@ class ClickHouseMultiModel(ClickHouseModel): Gets one batch from storage and syncs it. :return: """ + import_key = cls.get_import_key() + storage = cls.get_storage() + statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, import_key) + try: - statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, cls.__name__) with statsd.timer(statsd_key.format('total')): - - storage = cls.get_storage() - import_key = cls.get_import_key() - with statsd.timer(statsd_key.format('steps.pre_sync')): storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout()) @@ -329,3 +331,7 @@ class ClickHouseMultiModel(ClickHouseModel): except RedisLockTimeoutError: pass # skip this sync round if lock is acquired by another thread + except Exception as ex: + with statsd.timer(statsd_key.format('steps.post_sync')): + storage.post_sync_failed(import_key) + raise ex diff --git a/src/django_clickhouse/storages.py b/src/django_clickhouse/storages.py index 418accd..4181d2c 100644 --- a/src/django_clickhouse/storages.py +++ b/src/django_clickhouse/storages.py @@ -57,6 +57,15 @@ class Storage: """ pass + def post_sync_failed(self, import_key, **kwargs): # type: (str, **dict) -> None + """ + This method is called after import process has finished with exception. + :param import_key: A key, returned by ClickHouseModel.get_import_key() method + :param kwargs: Storage dependant arguments + :return: None + """ + pass + def post_batch_removed(self, import_key, batch_size): # type: (str, int) -> None """ This method marks that batch has been removed in statsd @@ -245,6 +254,12 @@ class RedisStorage(with_metaclass(SingletonMeta, Storage)): logger.info('django-clickhouse: synced %d items (key: %s)' % (batch_size, import_key)) + def post_sync_failed(self, import_key, **kwargs): + # unblock lock after sync completed + lock_pid_key = self.REDIS_KEY_LOCK_PID.format(import_key=import_key) + self._redis.delete(lock_pid_key) + self.get_lock(import_key, **kwargs).release() + def flush(self): key_tpls = [ self.REDIS_KEY_RANK_TEMPLATE.format(import_key='*'), diff --git a/tests/test_utils.py b/tests/test_utils.py index e0a4667..5afb27b 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -14,9 +14,7 @@ from django_clickhouse.utils import get_tz_offset, format_datetime, lazy_class_i class GetTZOffsetTest(TestCase): def test_func(self): - ts = time.time() - utc_offset = (datetime.datetime.fromtimestamp(ts) - datetime.datetime.utcfromtimestamp(ts)).total_seconds() - self.assertEqual(utc_offset / 60, get_tz_offset()) + self.assertEqual(300, get_tz_offset()) class FormatDateTimeTest(TestCase):