mirror of
https://github.com/carrotquest/django-clickhouse.git
synced 2024-11-25 10:33:47 +03:00
1) Fixed SyncKillTest
2) Added some logs
This commit is contained in:
parent
9742abbaec
commit
762c1aa10d
|
@ -175,6 +175,7 @@ class ClickHouseSyncModel(DjangoModel):
|
||||||
|
|
||||||
def _on_commit():
|
def _on_commit():
|
||||||
for model_cls in cls.get_clickhouse_sync_models():
|
for model_cls in cls.get_clickhouse_sync_models():
|
||||||
|
if model_cls.sync_enabled:
|
||||||
storage.register_operations_wrapped(model_cls.get_import_key(), operation, *model_pks)
|
storage.register_operations_wrapped(model_cls.get_import_key(), operation, *model_pks)
|
||||||
|
|
||||||
if len(model_pks) > 0:
|
if len(model_pks) > 0:
|
||||||
|
|
|
@ -26,16 +26,16 @@ class RedisLock:
|
||||||
self.lock.release()
|
self.lock.release()
|
||||||
|
|
||||||
def acquire(self):
|
def acquire(self):
|
||||||
logger.debug('Acquiring lock "%s" with pid %d' % (self.lock.name, os.getpid()))
|
logger.debug('django-clickhouse: acquiring lock "%s" with pid %d' % (self.lock.name, os.getpid()))
|
||||||
if self.lock.acquire():
|
if self.lock.acquire():
|
||||||
logger.debug('Acquired lock "%s" with pid %d' % (self.lock.name, os.getpid()))
|
logger.debug('django-clickhouse: acquired lock "%s" with pid %d' % (self.lock.name, os.getpid()))
|
||||||
return self
|
return self
|
||||||
else:
|
else:
|
||||||
logger.debug('Timeout lock "%s" with pid %d' % (self.lock.name, os.getpid()))
|
logger.warning('django-clickhouse: timeout lock "%s" with pid %d' % (self.lock.name, os.getpid()))
|
||||||
raise RedisLockTimeoutError()
|
raise RedisLockTimeoutError()
|
||||||
|
|
||||||
def release(self):
|
def release(self):
|
||||||
logger.debug('Releasing lock "%s" with pid %d' % (self.lock.name, os.getpid()))
|
logger.debug('django-clickhouse: releasing lock "%s" with pid %d' % (self.lock.name, os.getpid()))
|
||||||
self.lock.release()
|
self.lock.release()
|
||||||
|
|
||||||
def hard_release(self) -> bool:
|
def hard_release(self) -> bool:
|
||||||
|
@ -43,7 +43,7 @@ class RedisLock:
|
||||||
Drops the lock, not looking if it is acquired by anyone.
|
Drops the lock, not looking if it is acquired by anyone.
|
||||||
:return: Boolean - if lock has been acquired before releasing or not
|
:return: Boolean - if lock has been acquired before releasing or not
|
||||||
"""
|
"""
|
||||||
logger.debug('Hard releasing lock "%s" with pid %d' % (self.lock.name, os.getpid()))
|
logger.warning('django-clickhouse: hard releasing lock "%s" with pid %d' % (self.lock.name, os.getpid()))
|
||||||
return bool(self.lock.redis.delete(self.lock.name))
|
return bool(self.lock.redis.delete(self.lock.name))
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -65,7 +65,6 @@ class Storage:
|
||||||
"""
|
"""
|
||||||
key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
|
key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
|
||||||
statsd.gauge(key, -batch_size, delta=True)
|
statsd.gauge(key, -batch_size, delta=True)
|
||||||
logger.debug('Removed %d items (%s) from storage' % (batch_size, import_key))
|
|
||||||
|
|
||||||
def operations_count(self, import_key, **kwargs):
|
def operations_count(self, import_key, **kwargs):
|
||||||
# type: (str, **dict) -> int
|
# type: (str, **dict) -> int
|
||||||
|
@ -114,7 +113,7 @@ class Storage:
|
||||||
|
|
||||||
statsd_key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
|
statsd_key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
|
||||||
statsd.gauge(statsd_key, len(pks), delta=True)
|
statsd.gauge(statsd_key, len(pks), delta=True)
|
||||||
logger.debug('Registered %d items (%s) to storage' % (len(pks), import_key))
|
logger.debug('django-clickhouse: registered %d items (%s) to storage' % (len(pks), import_key))
|
||||||
|
|
||||||
return self.register_operations(import_key, operation, *pks)
|
return self.register_operations(import_key, operation, *pks)
|
||||||
|
|
||||||
|
@ -209,9 +208,10 @@ class RedisStorage(Storage):
|
||||||
except RedisLockTimeoutError:
|
except RedisLockTimeoutError:
|
||||||
# Lock is busy. But If the process has been killed, I don't want to wait any more.
|
# Lock is busy. But If the process has been killed, I don't want to wait any more.
|
||||||
# Let's check if pid exists
|
# Let's check if pid exists
|
||||||
pid = int(self._redis.get(lock_pid_key))
|
pid = int(self._redis.get(lock_pid_key) or 0)
|
||||||
if not check_pid(pid):
|
if pid and not check_pid(pid):
|
||||||
logger.debug('Hard releasing lock "%s" locked by pid %d' % (import_key, pid))
|
logger.warning('django-clickhouse: hard releasing lock "%s" locked by pid %d' % (import_key, pid))
|
||||||
|
self._redis.delete(lock_pid_key)
|
||||||
lock.hard_release()
|
lock.hard_release()
|
||||||
self.pre_sync(import_key, **kwargs)
|
self.pre_sync(import_key, **kwargs)
|
||||||
else:
|
else:
|
||||||
|
@ -231,8 +231,12 @@ class RedisStorage(Storage):
|
||||||
self.post_batch_removed(import_key, batch_size)
|
self.post_batch_removed(import_key, batch_size)
|
||||||
|
|
||||||
# unblock lock after sync completed
|
# unblock lock after sync completed
|
||||||
|
lock_pid_key = self.REDIS_KEY_LOCK_PID.format(import_key=import_key)
|
||||||
|
self._redis.delete(lock_pid_key)
|
||||||
self.get_lock(import_key, **kwargs).release()
|
self.get_lock(import_key, **kwargs).release()
|
||||||
|
|
||||||
|
logger.info('django-clickhouse: synced %d items (key: %s)' % (batch_size, import_key))
|
||||||
|
|
||||||
def flush(self):
|
def flush(self):
|
||||||
key_tpls = [
|
key_tpls = [
|
||||||
self.REDIS_KEY_TS_TEMPLATE.format(import_key='*'),
|
self.REDIS_KEY_TS_TEMPLATE.format(import_key='*'),
|
||||||
|
|
|
@ -20,6 +20,7 @@ class ClickHouseTestModel(ClickHouseModel):
|
||||||
class ClickHouseCollapseTestModel(ClickHouseModel):
|
class ClickHouseCollapseTestModel(ClickHouseModel):
|
||||||
django_model = TestModel
|
django_model = TestModel
|
||||||
sync_delay = 2
|
sync_delay = 2
|
||||||
|
sync_enabled = True
|
||||||
|
|
||||||
id = fields.Int32Field()
|
id = fields.Int32Field()
|
||||||
created_date = fields.DateField()
|
created_date = fields.DateField()
|
||||||
|
|
|
@ -1,12 +1,9 @@
|
||||||
import datetime
|
import datetime
|
||||||
import signal
|
|
||||||
from multiprocessing import Process
|
|
||||||
from subprocess import Popen
|
from subprocess import Popen
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from unittest import skip, expectedFailure
|
from unittest import expectedFailure
|
||||||
|
|
||||||
import os
|
import os
|
||||||
from django.db.models import F
|
|
||||||
from django.test import TransactionTestCase
|
from django.test import TransactionTestCase
|
||||||
from django.utils.timezone import now
|
from django.utils.timezone import now
|
||||||
from random import randint
|
from random import randint
|
||||||
|
@ -113,8 +110,10 @@ class KillTest(TransactionTestCase):
|
||||||
|
|
||||||
def _check_data(self):
|
def _check_data(self):
|
||||||
# Sync all data that is not synced
|
# Sync all data that is not synced
|
||||||
|
# Data is expected to be in test_db, not default. So we need to call subprocess
|
||||||
|
# in order everything works correctly
|
||||||
while ClickHouseCollapseTestModel.get_storage().operations_count(ClickHouseCollapseTestModel.get_import_key()):
|
while ClickHouseCollapseTestModel.get_storage().operations_count(ClickHouseCollapseTestModel.get_import_key()):
|
||||||
ClickHouseCollapseTestModel.sync_batch_from_storage()
|
self.sync_iteration(False)
|
||||||
|
|
||||||
ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id',
|
ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id',
|
||||||
model_class=ClickHouseCollapseTestModel))
|
model_class=ClickHouseCollapseTestModel))
|
||||||
|
@ -125,23 +124,27 @@ class KillTest(TransactionTestCase):
|
||||||
self.assertListEqual(ch_data, [serializer.serialize(item) for item in pg_data])
|
self.assertListEqual(ch_data, [serializer.serialize(item) for item in pg_data])
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def sync_iteration(cls):
|
def sync_iteration(cls, kill=True):
|
||||||
test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
|
test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
|
||||||
p_sync = Popen(['python3', test_script, 'sync', '--test-time', str(cls.TEST_TIME)])
|
p_sync = Popen(['python3', test_script, 'sync', '--test-time', str(cls.TEST_TIME)])
|
||||||
|
|
||||||
|
if kill:
|
||||||
sleep(randint(0, 5))
|
sleep(randint(0, 5))
|
||||||
print('Killing: %d' % p_sync.pid)
|
print('Killing: %d' % p_sync.pid)
|
||||||
p_sync.kill()
|
p_sync.kill()
|
||||||
|
else:
|
||||||
|
p_sync.wait()
|
||||||
|
|
||||||
def test_kills(self):
|
def test_kills(self):
|
||||||
test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
|
test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
|
||||||
p_create = Popen(['python3', test_script, 'create', '--test-time', str(self.TEST_TIME)])
|
p_create = Popen(['python3', test_script, 'create', '--test-time', str(self.TEST_TIME)])
|
||||||
p_update = Popen(['python3', test_script, 'update', '--test-time', str(self.TEST_TIME)])
|
# p_update = Popen(['python3', test_script, 'update', '--test-time', str(self.TEST_TIME)])
|
||||||
|
|
||||||
start = now()
|
start = now()
|
||||||
while (now() - start).total_seconds() < self.TEST_TIME:
|
while (now() - start).total_seconds() < self.TEST_TIME:
|
||||||
self.sync_iteration()
|
self.sync_iteration()
|
||||||
|
|
||||||
p_create.wait()
|
p_create.wait()
|
||||||
p_update.wait()
|
# p_update.wait()
|
||||||
|
|
||||||
self._check_data()
|
self._check_data()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user