From d4e9a705d112b67895a05e1956ed3dd3cef5b45e Mon Sep 17 00:00:00 2001 From: M1ha Date: Fri, 16 Nov 2018 15:16:36 +0500 Subject: [PATCH] 1) Refactoring 2) Bug fixes 3) Simple tests on syncing MergeTree and CollapsingMergeTree --- src/django_clickhouse/clickhouse_models.py | 32 ++++++++------ src/django_clickhouse/engines.py | 10 ++++- src/django_clickhouse/storages.py | 20 +++++---- tests/clickhouse_migrations/0001_initial.py | 5 ++- tests/clickhouse_models.py | 16 ++++++- tests/test_migrations.py | 6 +-- tests/test_models.py | 34 +++++++-------- tests/test_serializers.py | 14 +++--- tests/test_storages.py | 2 +- tests/test_sync.py | 48 +++++++++++++++++++++ 10 files changed, 131 insertions(+), 56 deletions(-) create mode 100644 tests/test_sync.py diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py index df1774f..33fef2e 100644 --- a/src/django_clickhouse/clickhouse_models.py +++ b/src/django_clickhouse/clickhouse_models.py @@ -70,7 +70,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): @classmethod def get_storage(cls): - return lazy_class_import(cls.sync_storage or config.SYNC_STORAGE) + return lazy_class_import(cls.sync_storage or config.SYNC_STORAGE)() @classmethod def get_sync_delay(cls): @@ -99,7 +99,8 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): return True - def get_sync_objects(self, operations): # type: (List[Tuple[str, str]]) -> List[DjangoModel] + @classmethod + def get_sync_objects(cls, operations): # type: (List[Tuple[str, str]]) -> List[DjangoModel] """ Returns objects from main database to sync :param operations: A list of operations to perform @@ -111,33 +112,38 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): pk_by_db[using].add(pk) objs = chain(*( - self.django_model.objects.filter(pk__in=pk_set).using(using) - for using, pk_set in pk_by_db + cls.django_model.objects.filter(pk__in=pk_set).using(using) + for using, pk_set in pk_by_db.items() )) return list(objs) - def sync_batch_from_storage(self): + @classmethod + def sync_batch_from_storage(cls): """ Gets one batch from storage and syncs it. :return: """ - storage = self.get_storage() - import_key = self.get_import_key() - conn = connections[self.sync_database_alias] + storage = cls.get_storage() + import_key = cls.get_import_key() + conn = connections[cls.sync_database_alias] storage.pre_sync(import_key) batch = storage.get_import_batch(import_key) if batch is None: - operations = storage.get_operations(import_key, self.get_sync_batch_size()) - import_objects = self.get_sync_objects(operations) + operations = storage.get_operations(import_key, cls.get_sync_batch_size()) + import_objects = cls.get_sync_objects(operations) - batch = self.engine.get_insert_batch(self.__class__, conn, import_objects) - storage.write_import_batch(import_key, [obj.to_tsv() for obj in batch]) + batch = cls.engine.get_insert_batch(cls, conn, import_objects) + + if batch: + storage.write_import_batch(import_key, [obj.to_tsv() for obj in batch]) else: pass # Previous import error, retry - conn.insert(batch) + if batch: + conn.insert(batch) + storage.post_sync(import_key) diff --git a/src/django_clickhouse/engines.py b/src/django_clickhouse/engines.py index db0a931..dbf1761 100644 --- a/src/django_clickhouse/engines.py +++ b/src/django_clickhouse/engines.py @@ -7,6 +7,10 @@ from django.db.models import Model as DjangoModel from infi.clickhouse_orm import engines as infi_engines from infi.clickhouse_orm.database import Database +from django_clickhouse.database import connections +from .configuration import config +from .utils import lazy_class_import + T = TypeVar('T') @@ -50,10 +54,12 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre max_date = obj_date obj_ids = [str(obj.id) for obj in objects] - query = "SELECT * FROM $table FINAL WHERE `%s` >= '%s' AND `%s` <= '%s', id IN (%s)" \ + query = "SELECT * FROM $table FINAL WHERE `%s` >= '%s' AND `%s` <= '%s' AND id IN (%s)" \ % (self.date_col, min_date.isoformat(), self.date_col, max_date.isoformat(), ', '.join(obj_ids)) - qs = model_cls.get_database().select(query, model_class=model_cls) + db_router = lazy_class_import(config.DATABASE_ROUTER)() + db = db_router.db_for_read(model_cls) + qs = connections[db].select(query, model_class=model_cls) return list(qs) def get_insert_batch(self, model_cls, database, objects): diff --git a/src/django_clickhouse/storages.py b/src/django_clickhouse/storages.py index 4c0eb20..ae5c7da 100644 --- a/src/django_clickhouse/storages.py +++ b/src/django_clickhouse/storages.py @@ -160,24 +160,26 @@ class RedisStorage(Storage): def get_import_batch(self, import_key, **kwargs): batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key) - return tuple(item.decode() for item in self._redis.lrange(batch_key, 0, -1)) + res = self._redis.lrange(batch_key, 0, -1) + return tuple(item.decode() for item in res) if res else None def write_import_batch(self, import_key, batch, **kwargs): - batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key) - # Elements are pushed to the head, so we need to invert batch in order to save correct order - self._redis.lpush(batch_key, *reversed(batch)) + if batch: + batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key) + self._redis.lpush(batch_key, *reversed(batch)) def post_sync(self, import_key, **kwargs): ts_key = self.REDIS_KEY_TS_TEMPLATE.format(import_key=import_key) ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key) - score = float(self._redis.get(ts_key)) - self._redis.pipeline()\ - .zremrangebyscore(ops_key, '-inf', score)\ - .delete(batch_key)\ - .execute() + score = self._redis.get(ts_key) + if score: + self._redis.pipeline()\ + .zremrangebyscore(ops_key, '-inf', float(score))\ + .delete(batch_key)\ + .execute() def flush(self): key_tpls = [ diff --git a/tests/clickhouse_migrations/0001_initial.py b/tests/clickhouse_migrations/0001_initial.py index 007111c..15529a5 100644 --- a/tests/clickhouse_migrations/0001_initial.py +++ b/tests/clickhouse_migrations/0001_initial.py @@ -1,10 +1,11 @@ from infi.clickhouse_orm.migrations import CreateTable from django_clickhouse import migrations -from tests.clickhouse_models import TestClickHouseModel +from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel class Migration(migrations.Migration): operations = [ - CreateTable(TestClickHouseModel) + CreateTable(ClickHouseTestModel), + CreateTable(ClickHouseCollapseTestModel) ] diff --git a/tests/clickhouse_models.py b/tests/clickhouse_models.py index a2af22d..8681ef3 100644 --- a/tests/clickhouse_models.py +++ b/tests/clickhouse_models.py @@ -1,11 +1,11 @@ from django_clickhouse.clickhouse_models import ClickHouseModel -from django_clickhouse.engines import MergeTree +from django_clickhouse.engines import MergeTree, CollapsingMergeTree from infi.clickhouse_orm import fields from tests.models import TestModel -class TestClickHouseModel(ClickHouseModel): +class ClickHouseTestModel(ClickHouseModel): django_model = TestModel sync_delay = 5 @@ -14,3 +14,15 @@ class TestClickHouseModel(ClickHouseModel): value = fields.Int32Field() engine = MergeTree('created_date', ('id',)) + + +class ClickHouseCollapseTestModel(ClickHouseModel): + django_model = TestModel + sync_delay = 5 + + id = fields.Int32Field() + created_date = fields.DateField() + value = fields.Int32Field() + sign = fields.Int8Field() + + engine = CollapsingMergeTree('created_date', ('id',), 'sign') \ No newline at end of file diff --git a/tests/test_migrations.py b/tests/test_migrations.py index 5e81fca..3e5693b 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -4,7 +4,7 @@ from infi.clickhouse_orm.migrations import MigrationHistory from django_clickhouse.database import connections from django_clickhouse.migrations import migrate_app from django_clickhouse.routers import DefaultRouter -from tests.clickhouse_models import TestClickHouseModel +from tests.clickhouse_models import ClickHouseTestModel class NoMigrateRouter(DefaultRouter): @@ -33,7 +33,7 @@ class MigrateAppTest(TestCase): def test_migrate_app(self): migrate_app('tests', 'default') - self.assertTrue(table_exists(self.db, TestClickHouseModel)) + self.assertTrue(table_exists(self.db, ClickHouseTestModel)) self.assertEqual(1, self.db.count(MigrationHistory)) @@ -44,4 +44,4 @@ class MigrateAppTest(TestCase): @override_settings(CLICKHOUSE_DATABASE_ROUTER=NoMigrateRouter) def test_router_not_allowed(self): migrate_app('tests', 'default') - self.assertFalse(table_exists(self.db, TestClickHouseModel)) + self.assertFalse(table_exists(self.db, ClickHouseTestModel)) diff --git a/tests/test_models.py b/tests/test_models.py index 41c7288..c517b39 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -2,7 +2,7 @@ import datetime from django.test import TransactionTestCase -from tests.clickhouse_models import TestClickHouseModel +from tests.clickhouse_models import ClickHouseTestModel from tests.models import TestModel @@ -25,24 +25,24 @@ class ClickHouseDjangoModelTest(TransactionTestCase): instance = TestModel(created_date=datetime.date.today(), value=2) instance.save() self.assertListEqual([('insert', "default.%d" % instance.pk)], - self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) # UPDATE operation instance.save() self.assertListEqual([('insert', "default.%d" % instance.pk), ('update', "default.%d" % instance.pk)], - self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) def test_create(self): instance = TestModel.objects.create(pk=100555, created_date=datetime.date.today(), value=2) self.assertListEqual([('insert', "default.%d" % instance.pk)], - self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) def test_bulk_create(self): items = [TestModel(created_date=datetime.date.today(), value=i) for i in range(5)] items = TestModel.objects.bulk_create(items) self.assertEqual(5, len(items)) self.assertListEqual([('insert', "default.%d" % instance.pk) for instance in items], - self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) def test_get_or_create(self): instance, created = TestModel.objects. \ @@ -50,63 +50,63 @@ class ClickHouseDjangoModelTest(TransactionTestCase): self.assertTrue(created) self.assertListEqual([('insert', "default.%d" % instance.pk)], - self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) instance, created = TestModel.objects. \ get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2}) self.assertFalse(created) self.assertListEqual([('insert', "default.%d" % instance.pk)], - self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) def test_update_or_create(self): instance, created = TestModel.objects. \ update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2}) self.assertTrue(created) self.assertListEqual([('insert', "default.%d" % instance.pk)], - self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) instance, created = TestModel.objects. \ update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2}) self.assertFalse(created) self.assertListEqual([('insert', "default.%d" % instance.pk), ('update', "default.%d" % instance.pk)], - self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) def test_qs_update(self): TestModel.objects.filter(pk=1).update(created_date=datetime.date.today()) - self.assertListEqual([('update', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.assertListEqual([('update', 'default.1')], self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) # Update, after which updated element will not suit update conditions TestModel.objects.filter(created_date__lt=datetime.date.today()). \ update(created_date=datetime.date.today()) self.assertListEqual([('update', 'default.1'), ('update', 'default.2')], - self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) def test_qs_update_returning(self): TestModel.objects.filter(pk=1).update_returning(created_date=datetime.date.today()) - self.assertListEqual([('update', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.assertListEqual([('update', 'default.1')], self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) # Update, after which updated element will not suit update conditions TestModel.objects.filter(created_date__lt=datetime.date.today()). \ update_returning(created_date=datetime.date.today()) self.assertListEqual([('update', 'default.1'), ('update', 'default.2')], - self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) def test_qs_delete_returning(self): TestModel.objects.filter(pk=1).delete_returning() - self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) # Update, после которого исходный фильтр уже не сработает TestModel.objects.filter(created_date__lt=datetime.date.today()).delete_returning() self.assertListEqual([('delete', 'default.1'), ('delete', 'default.2')], - self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) def test_delete(self): instance = TestModel.objects.get(pk=1) instance.delete() - self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) def test_qs_delete(self): TestModel.objects.filter(pk=1).delete() - self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(ClickHouseTestModel.get_import_key(), 10)) diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 4ad66bd..56ddb35 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -3,7 +3,7 @@ import datetime from django.test import TestCase from django_clickhouse.serializers import Django2ClickHouseModelSerializer -from tests.clickhouse_models import TestClickHouseModel +from tests.clickhouse_models import ClickHouseTestModel from tests.models import TestModel @@ -15,24 +15,24 @@ class Django2ClickHouseModelSerializerTest(TestCase): def test_all(self): serializer = Django2ClickHouseModelSerializer() - res = serializer.serialize(self.obj, TestClickHouseModel) - self.assertIsInstance(res, TestClickHouseModel) + res = serializer.serialize(self.obj, ClickHouseTestModel) + self.assertIsInstance(res, ClickHouseTestModel) self.assertEqual(self.obj.id, res.id) self.assertEqual(self.obj.value, res.value) self.assertEqual(self.obj.created_date, res.created_date) def test_fields(self): serializer = Django2ClickHouseModelSerializer(fields=('value')) - res = serializer.serialize(self.obj, TestClickHouseModel) - self.assertIsInstance(res, TestClickHouseModel) + res = serializer.serialize(self.obj, ClickHouseTestModel) + self.assertIsInstance(res, ClickHouseTestModel) self.assertEqual(0, res.id) self.assertEqual(datetime.date(1970, 1, 1), res.created_date) self.assertEqual(self.obj.value, res.value) def test_exclude_fields(self): serializer = Django2ClickHouseModelSerializer(exclude_fields=('created_date',)) - res = serializer.serialize(self.obj, TestClickHouseModel) - self.assertIsInstance(res, TestClickHouseModel) + res = serializer.serialize(self.obj, ClickHouseTestModel) + self.assertIsInstance(res, ClickHouseTestModel) self.assertEqual(datetime.date(1970, 1, 1), res.created_date) self.assertEqual(self.obj.id, res.id) self.assertEqual(self.obj.value, res.value) diff --git a/tests/test_storages.py b/tests/test_storages.py index f88bcf6..6464898 100644 --- a/tests/test_storages.py +++ b/tests/test_storages.py @@ -59,4 +59,4 @@ class StorageTest(TestCase): self.assertListEqual([ ('insert', '100502') ], self.storage.get_operations('test', 10)) - self.assertTupleEqual(tuple(), self.storage.get_import_batch('test')) + self.assertIsNone(self.storage.get_import_batch('test')) diff --git a/tests/test_sync.py b/tests/test_sync.py new file mode 100644 index 0000000..d8c9939 --- /dev/null +++ b/tests/test_sync.py @@ -0,0 +1,48 @@ +import datetime + +from django.test import TransactionTestCase + +from django_clickhouse.database import connections +from django_clickhouse.migrations import migrate_app +from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel +from tests.models import TestModel + + +class SyncTest(TransactionTestCase): + def setUp(self): + self.db = connections['default'] + self.db.drop_database() + self.db.db_exists = False + self.db.create_database() + migrate_app('tests', 'default') + + def test_simple(self): + obj = TestModel.objects.create(value=1, created_date=datetime.date.today()) + ClickHouseTestModel.sync_batch_from_storage() + + synced_data = list(ClickHouseTestModel.objects_in(connections['default'])) + self.assertEqual(1, len(synced_data)) + self.assertEqual(obj.created_date, synced_data[0].created_date) + self.assertEqual(obj.value, synced_data[0].value) + self.assertEqual(obj.id, synced_data[0].id) + + def test_collapsing_update(self): + obj = TestModel.objects.create(value=1, created_date=datetime.date.today()) + obj.value = 2 + obj.save() + ClickHouseCollapseTestModel.sync_batch_from_storage() + + synced_data = list(ClickHouseCollapseTestModel.objects_in(connections['default'])) + self.assertEqual(1, len(synced_data)) + self.assertEqual(obj.created_date, synced_data[0].created_date) + self.assertEqual(obj.value, synced_data[0].value) + self.assertEqual(obj.id, synced_data[0].id) + + obj.value = 3 + obj.save() + ClickHouseCollapseTestModel.sync_batch_from_storage() + + synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel)) + self.assertEqual(obj.created_date, synced_data[0].created_date) + self.assertEqual(obj.value, synced_data[0].value) + self.assertEqual(obj.id, synced_data[0].id)