1) Refactoring

2) Bug fixes
3) Simple tests on syncing MergeTree and CollapsingMergeTree
This commit is contained in:
M1ha 2018-11-16 15:16:36 +05:00
parent 4480c1ae1d
commit d4e9a705d1
10 changed files with 131 additions and 56 deletions

View File

@ -70,7 +70,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
@classmethod
def get_storage(cls):
    """
    Resolves and instantiates the storage backend for this model.

    Note the trailing call: ``lazy_class_import`` returns the storage
    *class* (from ``cls.sync_storage`` or the configured default), which
    must then be instantiated before use.
    :return: A Storage instance
    """
    return lazy_class_import(cls.sync_storage or config.SYNC_STORAGE)()
@classmethod
def get_sync_delay(cls):
@ -99,7 +99,8 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
return True
def get_sync_objects(self, operations): # type: (List[Tuple[str, str]]) -> List[DjangoModel]
@classmethod
def get_sync_objects(cls, operations): # type: (List[Tuple[str, str]]) -> List[DjangoModel]
"""
Returns objects from main database to sync
:param operations: A list of operations to perform
@ -111,33 +112,38 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
pk_by_db[using].add(pk)
objs = chain(*(
self.django_model.objects.filter(pk__in=pk_set).using(using)
for using, pk_set in pk_by_db
cls.django_model.objects.filter(pk__in=pk_set).using(using)
for using, pk_set in pk_by_db.items()
))
return list(objs)
@classmethod
def sync_batch_from_storage(cls):
    """
    Gets one batch from storage and syncs it into ClickHouse.

    If the storage still holds a batch from a previous (failed) import,
    that batch is retried as-is; otherwise a new batch is built from the
    pending operations queue.
    :return: None
    """
    storage = cls.get_storage()
    import_key = cls.get_import_key()
    conn = connections[cls.sync_database_alias]

    storage.pre_sync(import_key)

    batch = storage.get_import_batch(import_key)
    if batch is None:
        # No unfinished batch — form a new one from queued operations
        operations = storage.get_operations(import_key, cls.get_sync_batch_size())
        import_objects = cls.get_sync_objects(operations)

        batch = cls.engine.get_insert_batch(cls, conn, import_objects)
        if batch:
            # Persist the batch so it can be retried if the insert fails
            storage.write_import_batch(import_key, [obj.to_tsv() for obj in batch])
    else:
        pass  # Previous import error, retry

    if batch:
        conn.insert(batch)

    storage.post_sync(import_key)

View File

@ -7,6 +7,10 @@ from django.db.models import Model as DjangoModel
from infi.clickhouse_orm import engines as infi_engines
from infi.clickhouse_orm.database import Database
from django_clickhouse.database import connections
from .configuration import config
from .utils import lazy_class_import
T = TypeVar('T')
@ -50,10 +54,12 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
max_date = obj_date
obj_ids = [str(obj.id) for obj in objects]
query = "SELECT * FROM $table FINAL WHERE `%s` >= '%s' AND `%s` <= '%s', id IN (%s)" \
query = "SELECT * FROM $table FINAL WHERE `%s` >= '%s' AND `%s` <= '%s' AND id IN (%s)" \
% (self.date_col, min_date.isoformat(), self.date_col, max_date.isoformat(), ', '.join(obj_ids))
qs = model_cls.get_database().select(query, model_class=model_cls)
db_router = lazy_class_import(config.DATABASE_ROUTER)()
db = db_router.db_for_read(model_cls)
qs = connections[db].select(query, model_class=model_cls)
return list(qs)
def get_insert_batch(self, model_cls, database, objects):

View File

@ -160,12 +160,13 @@ class RedisStorage(Storage):
def get_import_batch(self, import_key, **kwargs):
    """
    Returns the previously saved batch for the given import key, or
    ``None`` when no batch has been written.

    :param import_key: Key identifying the import this batch belongs to
    :return: Tuple of decoded batch lines, or None when no batch exists
    """
    batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key)
    res = self._redis.lrange(batch_key, 0, -1)
    # LRANGE returns an empty list for an absent key; callers distinguish
    # "no batch" (None) from an existing batch, so map empty -> None
    return tuple(item.decode() for item in res) if res else None
def write_import_batch(self, import_key, batch, **kwargs):
    """
    Saves a formed batch so a failed insert can be retried later.

    :param import_key: Key identifying the import this batch belongs to
    :param batch: Iterable of serialized (TSV) lines to store
    :return: None
    """
    # Elements are pushed to the head, so we need to invert batch in order to save correct order
    if batch:
        # Key is only built when there is something to write
        batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key)
        self._redis.lpush(batch_key, *reversed(batch))
def post_sync(self, import_key, **kwargs):
@ -173,9 +174,10 @@ class RedisStorage(Storage):
ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key)
score = float(self._redis.get(ts_key))
score = self._redis.get(ts_key)
if score:
self._redis.pipeline()\
.zremrangebyscore(ops_key, '-inf', score)\
.zremrangebyscore(ops_key, '-inf', float(score))\
.delete(batch_key)\
.execute()

View File

@ -1,10 +1,11 @@
from infi.clickhouse_orm.migrations import CreateTable
from django_clickhouse import migrations
from tests.clickhouse_models import TestClickHouseModel
from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel
class Migration(migrations.Migration):
    """Creates ClickHouse tables for both test models."""
    operations = [
        CreateTable(ClickHouseTestModel),
        CreateTable(ClickHouseCollapseTestModel)
    ]

View File

@ -1,11 +1,11 @@
from django_clickhouse.clickhouse_models import ClickHouseModel
from django_clickhouse.engines import MergeTree
from django_clickhouse.engines import MergeTree, CollapsingMergeTree
from infi.clickhouse_orm import fields
from tests.models import TestModel
class TestClickHouseModel(ClickHouseModel):
class ClickHouseTestModel(ClickHouseModel):
django_model = TestModel
sync_delay = 5
@ -14,3 +14,15 @@ class TestClickHouseModel(ClickHouseModel):
value = fields.Int32Field()
engine = MergeTree('created_date', ('id',))
class ClickHouseCollapseTestModel(ClickHouseModel):
    """Test model backed by a CollapsingMergeTree engine."""
    # Source Django model whose rows are mirrored into ClickHouse
    django_model = TestModel
    # Seconds to wait between sync attempts
    sync_delay = 5

    id = fields.Int32Field()
    created_date = fields.DateField()
    value = fields.Int32Field()
    # Collapsing sign column: +1 for state rows, -1 for cancel rows
    sign = fields.Int8Field()

    # Partitioned by created_date, ordered by id, collapsed on sign
    engine = CollapsingMergeTree('created_date', ('id',), 'sign')

View File

@ -4,7 +4,7 @@ from infi.clickhouse_orm.migrations import MigrationHistory
from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app
from django_clickhouse.routers import DefaultRouter
from tests.clickhouse_models import TestClickHouseModel
from tests.clickhouse_models import ClickHouseTestModel
class NoMigrateRouter(DefaultRouter):
@ -33,7 +33,7 @@ class MigrateAppTest(TestCase):
def test_migrate_app(self):
migrate_app('tests', 'default')
self.assertTrue(table_exists(self.db, TestClickHouseModel))
self.assertTrue(table_exists(self.db, ClickHouseTestModel))
self.assertEqual(1, self.db.count(MigrationHistory))
@ -44,4 +44,4 @@ class MigrateAppTest(TestCase):
@override_settings(CLICKHOUSE_DATABASE_ROUTER=NoMigrateRouter)
def test_router_not_allowed(self):
migrate_app('tests', 'default')
self.assertFalse(table_exists(self.db, TestClickHouseModel))
self.assertFalse(table_exists(self.db, ClickHouseTestModel))

View File

@ -2,7 +2,7 @@ import datetime
from django.test import TransactionTestCase
from tests.clickhouse_models import TestClickHouseModel
from tests.clickhouse_models import ClickHouseTestModel
from tests.models import TestModel
@ -25,24 +25,24 @@ class ClickHouseDjangoModelTest(TransactionTestCase):
instance = TestModel(created_date=datetime.date.today(), value=2)
instance.save()
self.assertListEqual([('insert', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
# UPDATE operation
instance.save()
self.assertListEqual([('insert', "default.%d" % instance.pk), ('update', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
def test_create(self):
instance = TestModel.objects.create(pk=100555, created_date=datetime.date.today(), value=2)
self.assertListEqual([('insert', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
def test_bulk_create(self):
items = [TestModel(created_date=datetime.date.today(), value=i) for i in range(5)]
items = TestModel.objects.bulk_create(items)
self.assertEqual(5, len(items))
self.assertListEqual([('insert', "default.%d" % instance.pk) for instance in items],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
def test_get_or_create(self):
instance, created = TestModel.objects. \
@ -50,63 +50,63 @@ class ClickHouseDjangoModelTest(TransactionTestCase):
self.assertTrue(created)
self.assertListEqual([('insert', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
instance, created = TestModel.objects. \
get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2})
self.assertFalse(created)
self.assertListEqual([('insert', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
def test_update_or_create(self):
instance, created = TestModel.objects. \
update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2})
self.assertTrue(created)
self.assertListEqual([('insert', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
instance, created = TestModel.objects. \
update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2})
self.assertFalse(created)
self.assertListEqual([('insert', "default.%d" % instance.pk), ('update', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
def test_qs_update(self):
TestModel.objects.filter(pk=1).update(created_date=datetime.date.today())
self.assertListEqual([('update', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.assertListEqual([('update', 'default.1')], self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
# Update, after which updated element will not suit update conditions
TestModel.objects.filter(created_date__lt=datetime.date.today()). \
update(created_date=datetime.date.today())
self.assertListEqual([('update', 'default.1'), ('update', 'default.2')],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
def test_qs_update_returning(self):
TestModel.objects.filter(pk=1).update_returning(created_date=datetime.date.today())
self.assertListEqual([('update', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.assertListEqual([('update', 'default.1')], self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
# Update, after which updated element will not suit update conditions
TestModel.objects.filter(created_date__lt=datetime.date.today()). \
update_returning(created_date=datetime.date.today())
self.assertListEqual([('update', 'default.1'), ('update', 'default.2')],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
def test_qs_delete_returning(self):
TestModel.objects.filter(pk=1).delete_returning()
self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
# Update, после которого исходный фильтр уже не сработает
TestModel.objects.filter(created_date__lt=datetime.date.today()).delete_returning()
self.assertListEqual([('delete', 'default.1'), ('delete', 'default.2')],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
def test_delete(self):
instance = TestModel.objects.get(pk=1)
instance.delete()
self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))
def test_qs_delete(self):
TestModel.objects.filter(pk=1).delete()
self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10))

View File

@ -3,7 +3,7 @@ import datetime
from django.test import TestCase
from django_clickhouse.serializers import Django2ClickHouseModelSerializer
from tests.clickhouse_models import TestClickHouseModel
from tests.clickhouse_models import ClickHouseTestModel
from tests.models import TestModel
@ -15,24 +15,24 @@ class Django2ClickHouseModelSerializerTest(TestCase):
def test_all(self):
serializer = Django2ClickHouseModelSerializer()
res = serializer.serialize(self.obj, TestClickHouseModel)
self.assertIsInstance(res, TestClickHouseModel)
res = serializer.serialize(self.obj, ClickHouseTestModel)
self.assertIsInstance(res, ClickHouseTestModel)
self.assertEqual(self.obj.id, res.id)
self.assertEqual(self.obj.value, res.value)
self.assertEqual(self.obj.created_date, res.created_date)
def test_fields(self):
serializer = Django2ClickHouseModelSerializer(fields=('value'))
res = serializer.serialize(self.obj, TestClickHouseModel)
self.assertIsInstance(res, TestClickHouseModel)
res = serializer.serialize(self.obj, ClickHouseTestModel)
self.assertIsInstance(res, ClickHouseTestModel)
self.assertEqual(0, res.id)
self.assertEqual(datetime.date(1970, 1, 1), res.created_date)
self.assertEqual(self.obj.value, res.value)
def test_exclude_fields(self):
serializer = Django2ClickHouseModelSerializer(exclude_fields=('created_date',))
res = serializer.serialize(self.obj, TestClickHouseModel)
self.assertIsInstance(res, TestClickHouseModel)
res = serializer.serialize(self.obj, ClickHouseTestModel)
self.assertIsInstance(res, ClickHouseTestModel)
self.assertEqual(datetime.date(1970, 1, 1), res.created_date)
self.assertEqual(self.obj.id, res.id)
self.assertEqual(self.obj.value, res.value)

View File

@ -59,4 +59,4 @@ class StorageTest(TestCase):
self.assertListEqual([
('insert', '100502')
], self.storage.get_operations('test', 10))
self.assertTupleEqual(tuple(), self.storage.get_import_batch('test'))
self.assertIsNone(self.storage.get_import_batch('test'))

48
tests/test_sync.py Normal file
View File

@ -0,0 +1,48 @@
import datetime
from django.test import TransactionTestCase
from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app
from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel
from tests.models import TestModel
class SyncTest(TransactionTestCase):
    """Integration tests syncing MergeTree and CollapsingMergeTree models."""

    def setUp(self):
        # Recreate the ClickHouse test database from scratch for each test
        self.db = connections['default']
        self.db.drop_database()
        # drop_database leaves the flag set; reset it so create_database
        # actually issues CREATE DATABASE
        self.db.db_exists = False
        self.db.create_database()
        migrate_app('tests', 'default')

    def test_simple(self):
        obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
        ClickHouseTestModel.sync_batch_from_storage()

        synced_data = list(ClickHouseTestModel.objects_in(connections['default']))
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.created_date, synced_data[0].created_date)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

    def test_collapsing_update(self):
        obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
        obj.value = 2
        obj.save()
        ClickHouseCollapseTestModel.sync_batch_from_storage()

        synced_data = list(ClickHouseCollapseTestModel.objects_in(connections['default']))
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.created_date, synced_data[0].created_date)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

        obj.value = 3
        obj.save()
        ClickHouseCollapseTestModel.sync_batch_from_storage()

        # FINAL collapses sign rows; assert exactly one row survives before
        # indexing, so a collapse failure reads as an assertion, not IndexError
        synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.created_date, synced_data[0].created_date)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)