1) Fixed SyncKillTest

2) Added some logs
This commit is contained in:
M1ha 2018-12-03 16:41:34 +05:00
parent b52bfaccae
commit 7e502ee8a8
8 changed files with 117 additions and 34 deletions

View File

@ -5,7 +5,6 @@ from typing import List, TypeVar, Type
from django.db.models import Model as DjangoModel from django.db.models import Model as DjangoModel
from infi.clickhouse_orm import engines as infi_engines from infi.clickhouse_orm import engines as infi_engines
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model as InfiModel from infi.clickhouse_orm.models import Model as InfiModel
from statsd.defaults.django import statsd from statsd.defaults.django import statsd
@ -98,7 +97,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
def set_obj_sign(self, obj, sign): # type: (InfiModel, int) -> None def set_obj_sign(self, obj, sign): # type: (InfiModel, int) -> None
""" """
Sets objects sign. By default gets attribute nmae from sign_col Sets objects sign. By default gets attribute name from sign_col
:return: None :return: None
""" """
setattr(obj, self.sign_col, sign) setattr(obj, self.sign_col, sign)

View File

@ -113,7 +113,8 @@ class Storage:
statsd_key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key) statsd_key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
statsd.gauge(statsd_key, len(pks), delta=True) statsd.gauge(statsd_key, len(pks), delta=True)
logger.debug('django-clickhouse: registered %d items (%s) to storage' % (len(pks), import_key)) logger.debug('django-clickhouse: registered %s on %d items (%s) to storage'
% (operation, len(pks), import_key))
return self.register_operations(import_key, operation, *pks) return self.register_operations(import_key, operation, *pks)
@ -147,7 +148,7 @@ class RedisStorage(Storage):
2) CLICKHOUSE_REDIS_CONFIG parameter defined. This should be a dict of kwargs for redis.StrictRedis(**kwargs). 2) CLICKHOUSE_REDIS_CONFIG parameter defined. This should be a dict of kwargs for redis.StrictRedis(**kwargs).
""" """
REDIS_KEY_OPS_TEMPLATE = 'clickhouse_sync:operations:{import_key}' REDIS_KEY_OPS_TEMPLATE = 'clickhouse_sync:operations:{import_key}'
REDIS_KEY_TS_TEMPLATE = 'clickhouse_sync:timstamp:{import_key}' REDIS_KEY_RANK_TEMPLATE = 'clickhouse_sync:timstamp:{import_key}'
REDIS_KEY_LOCK = 'clickhouse_sync:lock:{import_key}' REDIS_KEY_LOCK = 'clickhouse_sync:lock:{import_key}'
REDIS_KEY_LOCK_PID = 'clickhouse_sync:lock_pid:{import_key}' REDIS_KEY_LOCK_PID = 'clickhouse_sync:lock_pid:{import_key}'
REDIS_KEY_LAST_SYNC_TS = 'clickhouse_sync:last_sync:{import_key}' REDIS_KEY_LAST_SYNC_TS = 'clickhouse_sync:last_sync:{import_key}'
@ -180,8 +181,8 @@ class RedisStorage(Storage):
if res: if res:
ops, scores = zip(*res) ops, scores = zip(*res)
ts_key = self.REDIS_KEY_TS_TEMPLATE.format(import_key=import_key) rank_key = self.REDIS_KEY_RANK_TEMPLATE.format(import_key=import_key)
self._redis.set(ts_key, max(scores)) self._redis.set(rank_key, len(ops) - 1)
return list(tuple(op.decode().split(':')) for op in ops) return list(tuple(op.decode().split(':')) for op in ops)
else: else:
@ -210,7 +211,8 @@ class RedisStorage(Storage):
# Let's check if pid exists # Let's check if pid exists
pid = int(self._redis.get(lock_pid_key) or 0) pid = int(self._redis.get(lock_pid_key) or 0)
if pid and not check_pid(pid): if pid and not check_pid(pid):
logger.warning('django-clickhouse: hard releasing lock "%s" locked by pid %d' % (import_key, pid)) logger.warning('django-clickhouse: hard releasing lock "%s" locked by pid %d (process is dead)'
% (import_key, pid))
self._redis.delete(lock_pid_key) self._redis.delete(lock_pid_key)
lock.hard_release() lock.hard_release()
self.pre_sync(import_key, **kwargs) self.pre_sync(import_key, **kwargs)
@ -218,12 +220,12 @@ class RedisStorage(Storage):
raise raise
def post_sync(self, import_key, **kwargs): def post_sync(self, import_key, **kwargs):
ts_key = self.REDIS_KEY_TS_TEMPLATE.format(import_key=import_key) rank_key = self.REDIS_KEY_RANK_TEMPLATE.format(import_key=import_key)
ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
score = self._redis.get(ts_key) top_rank = self._redis.get(rank_key)
if score: if top_rank:
res = self._redis.zremrangebyscore(ops_key, '-inf', float(score)) res = self._redis.zremrangebyrank(ops_key, 0, top_rank)
batch_size = int(res) batch_size = int(res)
else: else:
batch_size = 0 batch_size = 0
@ -239,7 +241,7 @@ class RedisStorage(Storage):
def flush(self): def flush(self):
key_tpls = [ key_tpls = [
self.REDIS_KEY_TS_TEMPLATE.format(import_key='*'), self.REDIS_KEY_RANK_TEMPLATE.format(import_key='*'),
self.REDIS_KEY_OPS_TEMPLATE.format(import_key='*'), self.REDIS_KEY_OPS_TEMPLATE.format(import_key='*'),
self.REDIS_KEY_LOCK.format(import_key='*'), self.REDIS_KEY_LOCK.format(import_key='*'),
self.REDIS_KEY_LAST_SYNC_TS.format(import_key='*') self.REDIS_KEY_LAST_SYNC_TS.format(import_key='*')

View File

@ -1,7 +1,7 @@
import datetime import datetime
import os import os
from itertools import chain from itertools import chain
from typing import Union, Any, Optional, TypeVar, Set, Dict, Iterable from typing import Union, Any, Optional, TypeVar, Set, Dict, Iterable, Tuple, Iterator
import pytz import pytz
import six import six
@ -134,3 +134,27 @@ def check_pid(pid):
return False return False
else: else:
return True return True
def int_ranges(items: Iterable[int]) -> Iterator[Tuple[int, int]]:
    """
    Finds continuous intervals in an integer iterable.

    Duplicates are ignored and input order does not matter — items are
    sorted before scanning.

    :param items: Items to search in
    :return: Iterator over Tuple[start, end] pairs, both ends inclusive
    """
    interval_start = None
    prev_item = None
    for item in sorted(items):
        if prev_item is None:
            # First item opens the first interval
            interval_start = prev_item = item
        elif item == prev_item:
            # Duplicate value — does not break the current run
            continue
        elif prev_item + 1 == item:
            # Still contiguous — extend the current interval
            prev_item = item
        else:
            # Gap found — emit the finished interval, start a new one
            interval = interval_start, prev_item
            interval_start = prev_item = item
            yield interval

    if interval_start is None:
        # Empty input: PEP 479 forbids raising StopIteration inside a
        # generator (it would become RuntimeError); just finish cleanly.
        return
    else:
        # Emit the trailing interval
        yield interval_start, prev_item

View File

@ -8,6 +8,7 @@ from tests.models import TestModel
class ClickHouseTestModel(ClickHouseModel): class ClickHouseTestModel(ClickHouseModel):
django_model = TestModel django_model = TestModel
sync_delay = 2 sync_delay = 2
sync_enabled = True
id = fields.Int32Field() id = fields.Int32Field()
created_date = fields.DateField() created_date = fields.DateField()
@ -33,3 +34,5 @@ class ClickHouseCollapseTestModel(ClickHouseModel):
class ClickHouseMultiTestModel(ClickHouseMultiModel): class ClickHouseMultiTestModel(ClickHouseMultiModel):
django_model = TestModel django_model = TestModel
sub_models = [ClickHouseTestModel, ClickHouseCollapseTestModel] sub_models = [ClickHouseTestModel, ClickHouseCollapseTestModel]
sync_delay = 2
sync_enabled = True

View File

@ -1,3 +1,4 @@
import logging
import sys import sys
import argparse import argparse
@ -18,30 +19,40 @@ from django.db.models import F
from tests.clickhouse_models import ClickHouseCollapseTestModel from tests.clickhouse_models import ClickHouseCollapseTestModel
from tests.models import TestModel from tests.models import TestModel
logger = logging.getLogger('django-clickhouse')
def create(batch_size=1000, test_time=60, period=1, **kwargs): def create(batch_size=1000, test_time=60, period=1, **kwargs):
for iteration in range(int(test_time / period)): for iteration in range(int(test_time / period)):
TestModel.objects.db_manager('test_db').bulk_create([ res = TestModel.objects.db_manager('test_db').bulk_create([
TestModel(created_date='2018-01-01', value=iteration * batch_size + i) for i in range(batch_size) TestModel(created_date='2018-01-01', value=iteration * batch_size + i) for i in range(batch_size)
]) ])
logger.info('django-clickhouse: test created %d records' % len(res))
sleep(period) sleep(period)
def update(batch_size=1000, test_time=60, period=1, **kwargs): def update(batch_size=500, test_time=60, period=1, **kwargs):
for iteration in range(int(test_time / period)): for iteration in range(int(test_time / period)):
TestModel.objects.db_manager('test_db').filter(id__gte=iteration * batch_size).annotate(idmod10=F('id') % 10). \ updated = TestModel.objects.db_manager('test_db').\
filter(idmod10=0).update(value=-1) filter(value__gte=iteration * batch_size, value__lt=(iteration + 1) * batch_size).\
annotate(valmod10=F('value') % 10).filter(valmod10=0).update(value=-1)
logger.debug('django-clickhouse: test updated %d records' % updated)
sleep(period) sleep(period)
def delete(batch_size=1000, test_time=60, period=1, **kwargs): def delete(batch_size=500, test_time=60, period=1, **kwargs):
for iteration in range(int(test_time / period)): for iteration in range(int(test_time / period)):
TestModel.objects.db_manager('test_db').filter(id__gte=iteration * batch_size).annotate(idmod10=F('id') % 10). \ deleted, _ = TestModel.objects.db_manager('test_db'). \
filter(idmod10=1).delete() filter(value__gte=iteration * batch_size, value__lt=(iteration + 1) * batch_size). \
annotate(valmod10=F('value') % 10).filter(valmod10=1).delete()
logger.debug('django-clickhouse: test deleted %d records' % deleted)
sleep(period) sleep(period)
def sync(period=1, test_time=60, **kwargs): def sync(period=1, test_time=60, **kwargs):
if kwargs['once']:
ClickHouseCollapseTestModel.sync_batch_from_storage()
else:
start = datetime.datetime.now() start = datetime.datetime.now()
while (datetime.datetime.now() - start).total_seconds() < test_time: while (datetime.datetime.now() - start).total_seconds() < test_time:
ClickHouseCollapseTestModel.sync_batch_from_storage() ClickHouseCollapseTestModel.sync_batch_from_storage()
@ -52,8 +63,9 @@ if __name__ == '__main__':
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('process', type=str, choices=('sync', 'create', 'update', 'delete')) parser.add_argument('process', type=str, choices=('sync', 'create', 'update', 'delete'))
parser.add_argument('--test-time', type=int, required=False, default=60) parser.add_argument('--test-time', type=int, required=False, default=60)
parser.add_argument('--batch-size', type=str, required=False, default=1000) parser.add_argument('--batch-size', type=int, required=False, default=1000)
parser.add_argument('--period', type=str, required=False, default=1) parser.add_argument('--period', type=int, required=False, default=1)
parser.add_argument('--once', type=bool, required=False, default=False)
params = vars(parser.parse_args()) params = vars(parser.parse_args())
func_name = params['process'] func_name = params['process']

View File

@ -12,6 +12,7 @@ class ClickHouseModelTest(TestCase):
def test_need_sync(self): def test_need_sync(self):
# sync is disabled by default # sync is disabled by default
ClickHouseTestModel.sync_enabled = False
self.assertFalse(ClickHouseTestModel.need_sync()) self.assertFalse(ClickHouseTestModel.need_sync())
# There were no syncs. So it should be done # There were no syncs. So it should be done

View File

@ -2,7 +2,7 @@ import datetime
import logging import logging
from subprocess import Popen from subprocess import Popen
from time import sleep from time import sleep
from unittest import expectedFailure from unittest import expectedFailure, skip
import os import os
from django.test import TransactionTestCase from django.test import TransactionTestCase
@ -11,6 +11,7 @@ from random import randint
from django_clickhouse.database import connections from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app from django_clickhouse.migrations import migrate_app
from django_clickhouse.utils import int_ranges
from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel, ClickHouseMultiTestModel from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel, ClickHouseMultiTestModel
from tests.models import TestModel from tests.models import TestModel
@ -101,7 +102,7 @@ class SyncTest(TransactionTestCase):
class KillTest(TransactionTestCase): class KillTest(TransactionTestCase):
TEST_TIME = 30 TEST_TIME = 60
maxDiff = None maxDiff = None
def setUp(self): def setUp(self):
@ -110,17 +111,39 @@ class KillTest(TransactionTestCase):
connections['default'].create_database() connections['default'].create_database()
migrate_app('tests', 'default') migrate_app('tests', 'default')
# Disable sync for not interesting models
ClickHouseMultiTestModel.sync_enabled = False
ClickHouseTestModel.sync_enabled = False
def tearDown(self):
# Disable sync for not interesting models
ClickHouseMultiTestModel.sync_enabled = True
ClickHouseTestModel.sync_enabled = True
def _check_data(self): def _check_data(self):
logger.debug('django-clickhouse: syncing left test data')
# Sync all data that is not synced # Sync all data that is not synced
# Data is expected to be in test_db, not default. So we need to call subprocess # Data is expected to be in test_db, not default. So we need to call subprocess
# in order everything works correctly # in order everything works correctly
while ClickHouseCollapseTestModel.get_storage().operations_count(ClickHouseCollapseTestModel.get_import_key()): import_key = ClickHouseCollapseTestModel.get_import_key()
storage = ClickHouseCollapseTestModel.get_storage()
sync_left = storage.operations_count(import_key)
while sync_left:
logger.debug('django-clickhouse: final sync (%d left)' % sync_left)
self.sync_iteration(False) self.sync_iteration(False)
sync_left = storage.operations_count(import_key)
ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id', ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id',
model_class=ClickHouseCollapseTestModel)) model_class=ClickHouseCollapseTestModel))
pg_data = list(TestModel.objects.all().order_by('id')) pg_data = list(TestModel.objects.all().order_by('id'))
if len(pg_data) != len(ch_data):
absent_ids = set(item.id for item in pg_data) - set(item.id for item in ch_data)
logger.debug('django_clickhouse: absent ranges: %s (min: %d, max: %d)'
% (','.join(('(%d, %d)' % r) for r in int_ranges(absent_ids)),
min(item.id for item in pg_data), max(item.id for item in pg_data)))
self.assertEqual(len(pg_data), len(ch_data)) self.assertEqual(len(pg_data), len(ch_data))
serializer = ClickHouseCollapseTestModel.get_django_model_serializer() serializer = ClickHouseCollapseTestModel.get_django_model_serializer()
self.assertListEqual(ch_data, [serializer.serialize(item) for item in pg_data]) self.assertListEqual(ch_data, [serializer.serialize(item) for item in pg_data])
@ -128,11 +151,15 @@ class KillTest(TransactionTestCase):
@classmethod @classmethod
def sync_iteration(cls, kill=True): def sync_iteration(cls, kill=True):
test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py') test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
p_sync = Popen(['python3', test_script, 'sync', '--test-time', str(cls.TEST_TIME)]) if kill:
args = ['--test-time', str(cls.TEST_TIME)]
else:
args = ['--once', 'true']
p_sync = Popen(['python3', test_script, 'sync'] + args)
if kill: if kill:
sleep(randint(0, 5)) sleep(randint(0, 5))
logger.debug('django-clickhouse: killing: %d' % p_sync.pid) logger.debug('django-clickhouse: test killing: %d' % p_sync.pid)
p_sync.kill() p_sync.kill()
else: else:
p_sync.wait() p_sync.wait()
@ -140,7 +167,9 @@ class KillTest(TransactionTestCase):
def test_kills(self): def test_kills(self):
test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py') test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
p_create = Popen(['python3', test_script, 'create', '--test-time', str(self.TEST_TIME)]) p_create = Popen(['python3', test_script, 'create', '--test-time', str(self.TEST_TIME)])
p_update = Popen(['python3', test_script, 'update', '--test-time', str(self.TEST_TIME)])
# Updates must be slower than inserts, or they will do nothing
p_update = Popen(['python3', test_script, 'update', '--test-time', str(self.TEST_TIME), '--batch-size', '500'])
start = now() start = now()
while (now() - start).total_seconds() < self.TEST_TIME: while (now() - start).total_seconds() < self.TEST_TIME:

View File

@ -1,10 +1,10 @@
import datetime import datetime
from unittest import TestCase
import pytz import pytz
from django.test import TestCase
from django_clickhouse.models import ClickHouseSyncModel from django_clickhouse.models import ClickHouseSyncModel
from django_clickhouse.utils import get_tz_offset, format_datetime, lazy_class_import from django_clickhouse.utils import get_tz_offset, format_datetime, lazy_class_import, int_ranges
class GetTZOffsetTest(TestCase): class GetTZOffsetTest(TestCase):
@ -54,3 +54,16 @@ class TestLazyClassImport(TestCase):
def test_cls(self): def test_cls(self):
self.assertEqual(ClickHouseSyncModel, lazy_class_import(ClickHouseSyncModel)) self.assertEqual(ClickHouseSyncModel, lazy_class_import(ClickHouseSyncModel))
class TestIntRanges(TestCase):
    """Tests for the int_ranges interval-finding utility."""

    def test_simple(self):
        # Several adjacent runs collapse into (start, end) pairs
        expected = [(1, 3), (5, 6), (8, 10)]
        actual = list(int_ranges([1, 2, 3, 5, 6, 8, 9, 10]))
        self.assertListEqual(expected, actual)

    def test_empty(self):
        # Empty input yields no intervals
        self.assertListEqual([], list(int_ranges([])))

    def test_bounds(self):
        # Isolated values become single-element (x, x) intervals
        expected = [(1, 1), (5, 6), (10, 10)]
        actual = list(int_ranges([1, 5, 6, 10]))
        self.assertListEqual(expected, actual)