diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py index 427514e..0a7d86d 100644 --- a/src/django_clickhouse/clickhouse_models.py +++ b/src/django_clickhouse/clickhouse_models.py @@ -2,13 +2,17 @@ This file defines base abstract models to inherit from """ import datetime +from collections import defaultdict +from itertools import chain +from typing import List, Tuple -from django.utils.timezone import now +from django.db.models import Model as DjangoModel from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase -from typing import Set, Union from six import with_metaclass +from .db_clients import connections +from .serializers import Django2ClickHouseModelSerializer from .models import ClickHouseSyncModel from .utils import lazy_class_import from . import config @@ -32,10 +36,17 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): Base model for all other models """ django_model = None + django_model_serializer = Django2ClickHouseModelSerializer sync_batch_size = None sync_storage = None sync_delay = None + sync_database_alias = None + + @classmethod + def get_django_model_serializer(cls): + serializer_cls = lazy_class_import(cls.django_model_serializer) + return serializer_cls() @classmethod def get_sync_batch_size(cls): @@ -72,13 +83,22 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): return True - def import_batch(self): + def get_sync_objects(self, operations): # type: (List[Tuple[str, str]]) -> List[DjangoModel] """ - Imports batch to ClickHouse - :return: + Returns objects from main database to sync + :param operations: A list of operations to perform + :return: A list of django_model instances """ - pass + pk_by_db = defaultdict(set) + for op, pk_str in operations: + using, pk = pk_str.split('.') + pk_by_db[using].add(pk) + objs = chain(*( + self.django_model.objects.filter(pk__in=pk_set).using(using) + for using, pk_set in pk_by_db + )) + return list(objs) def sync_batch_from_storage(self): """ @@ -87,20 +107,21 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): """ storage = self.get_storage() import_key = self.get_import_key() - storage.pre_sync(import_key) - # 1) pre_sync() - # 2) get_import_batch(). If batch is present go to 5) - # 3) If batch is None, call get_operations() - # 4) Transform operations to batch and call write_import_batch() - # 5) Import batch to ClickHouse + conn = connections[self.sync_database_alias] + storage.pre_sync(import_key) batch = storage.get_import_batch(import_key) + if batch is None: operations = storage.get_operations(import_key, self.get_sync_batch_size()) - batch = self.engine.get_batch(operations) - storage.write_import_batch(import_key, batch) + import_objects = self.get_sync_objects(operations) - self.import_batch(batch) + batch = self.engine.get_insert_batch(self.__class__, conn, import_objects) + storage.write_import_batch(import_key, [obj.to_tsv() for obj in batch]) + else: + pass # Previous import error, retry + + conn.insert(batch) storage.post_sync(import_key) diff --git a/src/django_clickhouse/db.py b/src/django_clickhouse/db.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/django_clickhouse/db_clients.py b/src/django_clickhouse/db_clients.py new file mode 100644 index 0000000..db3d73f --- /dev/null +++ b/src/django_clickhouse/db_clients.py @@ -0,0 +1,23 @@ +from infi.clickhouse_orm.database import Database + +from .exceptions import DBAliasError +from .configuration import config + + +class ConnectionProxy: + _connections = {} + + def get_connection(self, alias): + if alias not in self._connections: + if alias not in config.DATABASES: + raise DBAliasError(alias) + + self._connections[alias] = Database(**config.DATABASES[alias]) + + return self._connections[alias] + + def __getattr__(self, item): + return self.get_connection(item) + + +connections = ConnectionProxy() diff --git a/src/django_clickhouse/engines.py b/src/django_clickhouse/engines.py new file mode 100644 index 0000000..27ec230 --- /dev/null +++ b/src/django_clickhouse/engines.py @@ -0,0 +1,86 @@ +""" +This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse +""" +from collections import defaultdict +from itertools import chain +from typing import List, Tuple, TypeVar, Type, Optional +from django.db.models import Model as DjangoModel + +from infi.clickhouse_orm import engines as infi_engines +from infi.clickhouse_orm.database import Database + +T = TypeVar('T') + + +class InsertOnlyEngineMixin: + def get_insert_batch(self, model_cls, database, objects): + # type: (Type[T], Database, List[DjangoModel]) -> List[T] + """ + Gets a list of model_cls instances to insert into database + :param model_cls: ClickHouseModel subclass to import + :param database: infi.clickhouse_orm Database instance to sync data with + :param objects: A list of django Model instances to sync + :return: A list of model_cls objects + """ + serializer = model_cls.get_django_model_serializer() + return [serializer.serialize(obj, model_cls) for obj in objects] + + +class MergeTree(InsertOnlyEngineMixin, infi_engines.MergeTree): + pass + + +class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree): + def get_final_versions(self, model_cls, objects): + """ + Get objects, that are currently stored in ClickHouse. + Depending on the partition key this can be different for different models. + In common case, this method is optimized for date field that doesn't change. + It also supposes primary key to by id + :param model_cls: ClickHouseModel subclass to import + :param objects: Objects for which final versions are searched + :return: A list of + """ + min_date, max_date = None, None + for obj in objects: + obj_date = getattr(obj, self.date_col) + + if min_date is None or min_date > obj_date: + min_date = obj_date + + if max_date is None or max_date < obj_date: + max_date = obj_date + + obj_ids = [str(obj.id) for obj in objects] + query = "SELECT * FROM $table FINAL WHERE `%s` >= '%s' AND `%s` <= '%s', id IN (%s)" \ + % (self.date_col, min_date.isoformat(), self.date_col, max_date.isoformat(), ', '.join(obj_ids)) + + qs = model_cls.select(query, model_class=model_cls) + return list(qs) + + def get_insert_batch(self, model_cls, database, objects): + # type: (Type[T], Database, List[DjangoModel]) -> List[T] + """ + Gets a list of model_cls instances to insert into database + :param model_cls: ClickHouseModel subclass to import + :param database: infi.clickhouse_orm Database instance to sync data with + :param objects: A list of django Model instances to sync + :return: A list of model_cls objects + """ + new_objs = super(CollapsingMergeTree, self).get_insert_batch(model_cls, database, objects) + old_objs = self.get_final_versions(model_cls, new_objs) + + for obj in old_objs: + self.set_obj_sign(obj, -1) + + for obj in new_objs: + self.set_obj_sign(obj, 1) + + return old_objs + new_objs + + def set_obj_sign(self, obj, sign): # type: (InfiModel, int) -> None + """ + Sets objects sign. By default gets attribute nmae from sign_col + :return: None + """ + setattr(obj, self.sign_col, sign) diff --git a/src/django_clickhouse/exceptions.py b/src/django_clickhouse/exceptions.py index 161986f..3efdd9f 100644 --- a/src/django_clickhouse/exceptions.py +++ b/src/django_clickhouse/exceptions.py @@ -1,10 +1,13 @@ from .configuration import PREFIX -class ClickHouseError(Exception): - pass - class ConfigurationError(Exception): def __init__(self, param_name): param_name = PREFIX + param_name super(ConfigurationError, self).__init__("Config parameter '%s' is not set properly" % param_name) + + +class DBAliasError(Exception): + def __init__(self, alias): + super(DBAliasError, self).__init__( + "Database alias `%s` is not found. Check %s parameter" % (alias, PREFIX + 'DATABASES')) diff --git a/src/django_clickhouse/models.py b/src/django_clickhouse/models.py index 582b33c..5da580a 100644 --- a/src/django_clickhouse/models.py +++ b/src/django_clickhouse/models.py @@ -6,7 +6,7 @@ It saves all operations to storage in order to write them to ClickHouse later. from typing import Optional, Any, List, Type import six -from django.db import transaction +from django.db import transaction, DEFAULT_DB_ALIAS from django.db.models.signals import post_save, post_delete from django.dispatch import receiver from django.db.models import QuerySet as DjangoQuerySet, Manager as DjangoManager, Model as DjangoModel @@ -171,6 +171,8 @@ class ClickHouseSyncModel(DjangoModel): :param using: Database alias registered instances are from :return: None """ + model_pks = ['%s.%d' % (using or DEFAULT_DB_ALIAS, pk) for pk in model_pks] + def _on_commit(): for model_cls in cls.get_clickhouse_sync_models(): storage.register_operations_wrapped(model_cls.get_import_key(), operation, *model_pks) diff --git a/src/django_clickhouse/serializers.py b/src/django_clickhouse/serializers.py index 20b46f5..d8dedff 100644 --- a/src/django_clickhouse/serializers.py +++ b/src/django_clickhouse/serializers.py @@ -1,84 +1,20 @@ -from typing import Any -from django.db.models import DjangoModel +from typing import Type -from .clickhouse_models import ClickHouseModel +from django.db.models import Model as DjangoModel +from django.forms import model_to_dict -class Serializer: - def serialize(self, value: Any) -> Any: - pass +class Django2ClickHouseModelSerializer: + serialize_fields = None + exclude_serialize_fields = None + def serialize(self, obj, model_cls, **kwargs): + # type: (DjangoModel, Type['ClickHouseModel'], **dict) -> 'ClickHouseModel' + data = model_to_dict(obj, self.serialize_fields, self.exclude_serialize_fields) -class DefaultDjango2ClickHouseSerializer(Serializer): - @staticmethod - def _convert_none(values, fields_dict): - """ - ClickHouse не хранит значения NULL, потэтому для них сохраняются невалидные значения параметра. - Преобразует все значения, воспринимаемые как None в None - :param values: Словарь с данными, которые надо преобразовывать - :param fields_dict: Итерируемый объект имен полей, которые надо преобразовывать - :return: Преобразованный словарь - """ - assert isinstance(values, dict), "values parameter must be a dict instance" - result = values.copy() - for key in fields_dict: - if isinstance(values[key], datetime.date) and values[key] == datetime.date(1970, 1, 1) \ - or (isinstance(values[key], datetime.datetime) - and (values[key] in (datetime.datetime(1970, 1, 1, 0, 0, 0), - datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc)))) \ - or type(values[key]) is int and values[key] == 0 \ - or type(values[key]) is str and values[key] == '': - result[key] = None - return result + # Remove None values, they should be initialized as defaults + for key, value in data.items(): + if value is None: + del data[key] - @staticmethod - def _convert_bool(values, fields_dict): - """ - ClickHouse не хранит значения Bool, потэтому для них сохраняются UInt8. - :param values: Словарь с данными, которые надо преобразовывать - :param fields_dict: Итерируемый объект имен полей, которые надо преобразовывать - :return: Преобразованный словарь - """ - assert isinstance(values, dict), "values parameter must be a dict instance" - result = values.copy() - for key in fields_dict: - result[key] = bool(result[key]) - return result - - @staticmethod - def _enum_to_str(values): - """ - Преобразует все значения типа Enum в их строковые представления - :param values: Словарь с данными, которые надо преобразовывать - :return: Преобразованный словарь - """ - assert isinstance(values, dict), "values parameter must be a dict instance" - return {key: val.name if isinstance(val, Enum) else val for key, val in values.items()} - - @classmethod - def from_django_model(cls, obj): - """ - Создает объект модели ClickHouse из модели django - При переопределении метода желательно проверить аргументы, вызвав: - cls._validate_django_model_instance(obj) - :param obj: Объект модели django - :return: Объект модели ClickHouse или список таких объектов - """ - cls._validate_django_model_instance(obj) - raise ClickHouseError('Method "from_django_model" is not implemented') - - def to_django_model(self, obj=None): - """ - Конвертирует эту модель в объект модели django, если это возможно. - Если невозможно - должен поднять исключение. - :param obj: Если передан, то надо не создать новый объект модели django, а обновить существующий - :return: Объект модели django - """ - if obj is not None: - self._validate_django_model_instance(obj) - else: - self._validate_cls_attributes() - raise ClickHouseError('Method "to_django_model" is not implemented') - - def serialize(self, value: DjangoModel) -> ClickHouseModel: - pass \ No newline at end of file + return model_cls(**data) diff --git a/tests/clickhouse_models.py b/tests/clickhouse_models.py index 6de28da..0598764 100644 --- a/tests/clickhouse_models.py +++ b/tests/clickhouse_models.py @@ -1,7 +1,15 @@ from django_clickhouse.clickhouse_models import ClickHouseModel +from django_clickhouse.engines import MergeTree +from infi.clickhouse_orm import fields + from tests.models import TestModel class TestClickHouseModel(ClickHouseModel): django_model = TestModel sync_delay = 5 + + created_date = fields.DateField() + value = fields.UInt32Field() + + engine = MergeTree('created_Date') diff --git a/tests/test_models.py b/tests/test_models.py index 02f0086..41c7288 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -24,24 +24,24 @@ class ClickHouseDjangoModelTest(TransactionTestCase): # INSERT operation instance = TestModel(created_date=datetime.date.today(), value=2) instance.save() - self.assertListEqual([('insert', str(instance.pk))], + self.assertListEqual([('insert', "default.%d" % instance.pk)], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) # UPDATE operation instance.save() - self.assertListEqual([('insert', str(instance.pk)), ('update', str(instance.pk))], + self.assertListEqual([('insert', "default.%d" % instance.pk), ('update', "default.%d" % instance.pk)], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) def test_create(self): instance = TestModel.objects.create(pk=100555, created_date=datetime.date.today(), value=2) - self.assertListEqual([('insert', str(instance.pk))], + self.assertListEqual([('insert', "default.%d" % instance.pk)], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) def test_bulk_create(self): items = [TestModel(created_date=datetime.date.today(), value=i) for i in range(5)] items = TestModel.objects.bulk_create(items) self.assertEqual(5, len(items)) - self.assertListEqual([('insert', str(instance.pk)) for instance in items], + self.assertListEqual([('insert', "default.%d" % instance.pk) for instance in items], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) def test_get_or_create(self): @@ -49,64 +49,64 @@ class ClickHouseDjangoModelTest(TransactionTestCase): get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2}) self.assertTrue(created) - self.assertListEqual([('insert', str(instance.pk))], + self.assertListEqual([('insert', "default.%d" % instance.pk)], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) instance, created = TestModel.objects. \ get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2}) self.assertFalse(created) - self.assertListEqual([('insert', str(instance.pk))], + self.assertListEqual([('insert', "default.%d" % instance.pk)], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) def test_update_or_create(self): instance, created = TestModel.objects. \ update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2}) self.assertTrue(created) - self.assertListEqual([('insert', str(instance.pk))], + self.assertListEqual([('insert', "default.%d" % instance.pk)], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) instance, created = TestModel.objects. \ update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2}) self.assertFalse(created) - self.assertListEqual([('insert', str(instance.pk)), ('update', str(instance.pk))], + self.assertListEqual([('insert', "default.%d" % instance.pk), ('update', "default.%d" % instance.pk)], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) def test_qs_update(self): TestModel.objects.filter(pk=1).update(created_date=datetime.date.today()) - self.assertListEqual([('update', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.assertListEqual([('update', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) # Update, after which updated element will not suit update conditions TestModel.objects.filter(created_date__lt=datetime.date.today()). \ update(created_date=datetime.date.today()) - self.assertListEqual([('update', '1'), ('update', '2')], + self.assertListEqual([('update', 'default.1'), ('update', 'default.2')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) def test_qs_update_returning(self): TestModel.objects.filter(pk=1).update_returning(created_date=datetime.date.today()) - self.assertListEqual([('update', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.assertListEqual([('update', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) # Update, after which updated element will not suit update conditions TestModel.objects.filter(created_date__lt=datetime.date.today()). \ update_returning(created_date=datetime.date.today()) - self.assertListEqual([('update', '1'), ('update', '2')], + self.assertListEqual([('update', 'default.1'), ('update', 'default.2')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) def test_qs_delete_returning(self): TestModel.objects.filter(pk=1).delete_returning() - self.assertListEqual([('delete', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) # Update, после которого исходный фильтр уже не сработает TestModel.objects.filter(created_date__lt=datetime.date.today()).delete_returning() - self.assertListEqual([('delete', '1'), ('delete', '2')], + self.assertListEqual([('delete', 'default.1'), ('delete', 'default.2')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) def test_delete(self): instance = TestModel.objects.get(pk=1) instance.delete() - self.assertListEqual([('delete', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) def test_qs_delete(self): TestModel.objects.filter(pk=1).delete() - self.assertListEqual([('delete', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) + self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))