mirror of
https://github.com/carrotquest/django-clickhouse.git
synced 2024-11-22 00:56:37 +03:00
Release lock if something goes wrong in sync
This commit is contained in:
parent
bf69c29e4f
commit
cc1502d4a4
|
@ -211,13 +211,12 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
|
||||||
Gets one batch from storage and syncs it.
|
Gets one batch from storage and syncs it.
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
|
import_key = cls.get_import_key()
|
||||||
|
storage = cls.get_storage()
|
||||||
|
statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, import_key)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, cls.__name__)
|
|
||||||
with statsd.timer(statsd_key.format('total')):
|
with statsd.timer(statsd_key.format('total')):
|
||||||
|
|
||||||
storage = cls.get_storage()
|
|
||||||
import_key = cls.get_import_key()
|
|
||||||
|
|
||||||
with statsd.timer(statsd_key.format('steps.pre_sync')):
|
with statsd.timer(statsd_key.format('steps.pre_sync')):
|
||||||
storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
|
storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
|
||||||
|
|
||||||
|
@ -244,6 +243,10 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
|
||||||
storage.post_sync(import_key)
|
storage.post_sync(import_key)
|
||||||
except RedisLockTimeoutError:
|
except RedisLockTimeoutError:
|
||||||
pass # skip this sync round if lock is acquired by another thread
|
pass # skip this sync round if lock is acquired by another thread
|
||||||
|
except Exception as ex:
|
||||||
|
with statsd.timer(statsd_key.format('steps.post_sync')):
|
||||||
|
storage.post_sync_failed(import_key)
|
||||||
|
raise ex
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def need_sync(cls): # type: () -> bool
|
def need_sync(cls): # type: () -> bool
|
||||||
|
@ -282,13 +285,12 @@ class ClickHouseMultiModel(ClickHouseModel):
|
||||||
Gets one batch from storage and syncs it.
|
Gets one batch from storage and syncs it.
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
|
import_key = cls.get_import_key()
|
||||||
|
storage = cls.get_storage()
|
||||||
|
statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, import_key)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, cls.__name__)
|
|
||||||
with statsd.timer(statsd_key.format('total')):
|
with statsd.timer(statsd_key.format('total')):
|
||||||
|
|
||||||
storage = cls.get_storage()
|
|
||||||
import_key = cls.get_import_key()
|
|
||||||
|
|
||||||
with statsd.timer(statsd_key.format('steps.pre_sync')):
|
with statsd.timer(statsd_key.format('steps.pre_sync')):
|
||||||
storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
|
storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
|
||||||
|
|
||||||
|
@ -329,3 +331,7 @@ class ClickHouseMultiModel(ClickHouseModel):
|
||||||
|
|
||||||
except RedisLockTimeoutError:
|
except RedisLockTimeoutError:
|
||||||
pass # skip this sync round if lock is acquired by another thread
|
pass # skip this sync round if lock is acquired by another thread
|
||||||
|
except Exception as ex:
|
||||||
|
with statsd.timer(statsd_key.format('steps.post_sync')):
|
||||||
|
storage.post_sync_failed(import_key)
|
||||||
|
raise ex
|
||||||
|
|
|
@ -57,6 +57,15 @@ class Storage:
|
||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def post_sync_failed(self, import_key, **kwargs): # type: (str, **dict) -> None
|
||||||
|
"""
|
||||||
|
This method is called after import process has finished with exception.
|
||||||
|
:param import_key: A key, returned by ClickHouseModel.get_import_key() method
|
||||||
|
:param kwargs: Storage dependant arguments
|
||||||
|
:return: None
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
def post_batch_removed(self, import_key, batch_size): # type: (str, int) -> None
|
def post_batch_removed(self, import_key, batch_size): # type: (str, int) -> None
|
||||||
"""
|
"""
|
||||||
This method marks that batch has been removed in statsd
|
This method marks that batch has been removed in statsd
|
||||||
|
@ -245,6 +254,12 @@ class RedisStorage(with_metaclass(SingletonMeta, Storage)):
|
||||||
|
|
||||||
logger.info('django-clickhouse: synced %d items (key: %s)' % (batch_size, import_key))
|
logger.info('django-clickhouse: synced %d items (key: %s)' % (batch_size, import_key))
|
||||||
|
|
||||||
|
def post_sync_failed(self, import_key, **kwargs):
|
||||||
|
# unblock lock after sync completed
|
||||||
|
lock_pid_key = self.REDIS_KEY_LOCK_PID.format(import_key=import_key)
|
||||||
|
self._redis.delete(lock_pid_key)
|
||||||
|
self.get_lock(import_key, **kwargs).release()
|
||||||
|
|
||||||
def flush(self):
|
def flush(self):
|
||||||
key_tpls = [
|
key_tpls = [
|
||||||
self.REDIS_KEY_RANK_TEMPLATE.format(import_key='*'),
|
self.REDIS_KEY_RANK_TEMPLATE.format(import_key='*'),
|
||||||
|
|
|
@ -14,9 +14,7 @@ from django_clickhouse.utils import get_tz_offset, format_datetime, lazy_class_i
|
||||||
|
|
||||||
class GetTZOffsetTest(TestCase):
|
class GetTZOffsetTest(TestCase):
|
||||||
def test_func(self):
|
def test_func(self):
|
||||||
ts = time.time()
|
self.assertEqual(300, get_tz_offset())
|
||||||
utc_offset = (datetime.datetime.fromtimestamp(ts) - datetime.datetime.utcfromtimestamp(ts)).total_seconds()
|
|
||||||
self.assertEqual(utc_offset / 60, get_tz_offset())
|
|
||||||
|
|
||||||
|
|
||||||
class FormatDateTimeTest(TestCase):
|
class FormatDateTimeTest(TestCase):
|
||||||
|
|
Loading…
Reference in New Issue
Block a user