diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py index b63bbd0..e3a632f 100644 --- a/src/django_clickhouse/clickhouse_models.py +++ b/src/django_clickhouse/clickhouse_models.py @@ -312,7 +312,7 @@ class ClickHouseModel(InfiModel): if len(model_ids) > 0: if cls.sync_type == 'redis': - cls.django_model.register_clickhouse_operation('INSERT', *model_ids, database=(database or 'default')) + cls.django_model.register_clickhouse_operations('INSERT', *model_ids, database=(database or 'default')) else: # if self.sync_type == 'postgres' from utils.models import ClickHouseModelOperation ClickHouseModelOperation.objects.bulk_update_or_create([ @@ -707,7 +707,7 @@ class ClickHouseCollapseModel(ClickHouseModel): if len(model_ids) > 0: if cls.sync_type == 'redis': - cls.django_model.register_clickhouse_operation('UPDATE', *list(model_ids), database=database) + cls.django_model.register_clickhouse_operations('UPDATE', *list(model_ids), database=database) else: # if self.sync_type == 'postgres' from utils.models import ClickHouseModelOperation ClickHouseModelOperation.objects.bulk_update_or_create([ diff --git a/src/django_clickhouse/configuration.py b/src/django_clickhouse/configuration.py index 7f0f406..a303c54 100644 --- a/src/django_clickhouse/configuration.py +++ b/src/django_clickhouse/configuration.py @@ -16,7 +16,7 @@ DEFAULTS = { 'SYNC_STORAGE': 'django_clickhouse.storage.DBStorage', 'SYNC_DELAY': 5, 'REDIS_CONFIG': None, - 'STATSD_PREFIX': 'clickhouse' + 'STATSD_PREFIX': 'clickhouse', } diff --git a/src/django_clickhouse/models.py b/src/django_clickhouse/models.py index 9a95484..15b9bd1 100644 --- a/src/django_clickhouse/models.py +++ b/src/django_clickhouse/models.py @@ -1,151 +1,187 @@ +""" +This file contains base django model to be synced with ClickHouse. +It saves all operations to storage in order to write them to ClickHouse later. +""" + +from typing import Optional, Any, List + +import six +from django.db import transaction from django.db.models.signals import post_save, post_delete from django.dispatch import receiver from django.db.models import QuerySet as DjangoQuerySet, Manager as DjangoManager, Model as DjangoModel -class ClickHouseDjangoModelQuerySet(DjangoQuerySet): +from .configuration import config +from .storage import Storage +from .utils import lazy_class_import + + +try: + from django_pg_returning.manager import UpdateReturningMixin +except ImportError: + class UpdateReturningMixin: + pass + + +try: + from django_pg_bulk_update.manager import BulkUpdateManagerMixin +except ImportError: + class BulkUpdateManagerMixin: + pass + + +class ClickHouseSyncUpdateReturningQuerySetMixin(UpdateReturningMixin): """ - Переопределяет update, чтобы он сгенерировал данные для обновления ClickHouse + This mixin adopts methods of django-pg-returning library """ - def __init__(self, *args, **kwargs): - super(ClickHouseDjangoModelQuerySet, self).__init__(*args, **kwargs) + def _register_ops(self, result): + pk_name = self.model._meta.pk.name + pk_list = result.values_list(pk_name, flat=True) + self.model.register_clickhouse_operations('update', *pk_list, using=self.db) + def update_returning(self, **updates): + result = super().update_returning(**updates) + self._register_ops(result) + return result + + def delete_returning(self): + result = super().delete_returning() + self._register_ops(result) + return result + + +class ClickHouseSyncBulkUpdateManagerMixin(BulkUpdateManagerMixin): + def _update_returning_param(self, returning): + pk_name = self.model._meta.pk.name + if returning is None: + returning = pk_name + elif isinstance(returning, six.string_types): + returning = [pk_name, returning] + else: + returning = list(returning) + [pk_name] + + return returning + + def _register_ops(self, result): + pk_name = self.model._meta.pk.name + pk_list = [getattr(item, pk_name) for item in result] + self.model.register_clickhouse_operations('update', *pk_list, using=self.db) + + def bulk_update(self, *args, **kwargs): + original_returning = kwargs.pop('returning', None) + kwargs['returning'] = self._update_returning_param(original_returning) + result = super().bulk_update(*args, **kwargs) + self._register_ops(result) + return result.count() if original_returning is None else result + + def bulk_update_or_create(self, *args, **kwargs): + original_returning = kwargs.pop('returning', None) + kwargs['returning'] = self._update_returning_param(original_returning) + result = super().bulk_update_or_create(*args, **kwargs) + self._register_ops(result) + return result.count() if original_returning is None else result + + +class ClickHouseSyncQuerySetMixin: def update(self, **kwargs): if self.model.clickhouse_sync_type == 'redis': pk_name = self.model._meta.pk.name res = self.only(pk_name).update_returning(**kwargs).values_list(pk_name, flat=True) - self.model.register_clickhouse_operation('UPDATE', *res, database=(self._db or 'default')) + self.model.register_clickhouse_operations('update', *res, usint=self.db) return len(res) else: - return super(ClickHouseDjangoModelQuerySet, self).update(**kwargs) - - def update_returning(self, **updates): - result = super(ClickHouseDjangoModelQuerySet, self).update_returning(**updates) - if self.model.clickhouse_sync_type == 'redis': - pk_name = self.model._meta.pk.name - pk_list = result.values_list(pk_name, flat=True) - self.model.register_clickhouse_operation('UPDATE', *pk_list, database=(self._db or 'default')) - return result - - def delete_returning(self): - result = super(ClickHouseDjangoModelQuerySet, self).delete_returning() - if self.model.clickhouse_sync_type == 'redis': - pk_name = self.model._meta.pk.name - pk_list = result.values_list(pk_name, flat=True) - self.model.register_clickhouse_operation('DELETE', *pk_list, database=(self._db or 'default')) - return result - - -class ClickHouseDjangoModelManager(DjangoManager): - def get_queryset(self): - """ - Инициализирует кастомный QuerySet - :return: BaseQuerySet модели - """ - return ClickHouseDjangoModelQuerySet(model=self.model, using=self._db) + return super().update(**kwargs) def bulk_create(self, objs, batch_size=None): - objs = super(ClickHouseDjangoModelManager, self).bulk_create(objs, batch_size=batch_size) - self.model.register_clickhouse_operation('INSERT', *[obj.pk for obj in objs], database=(self._db or 'default')) + objs = super().bulk_create(objs, batch_size=batch_size) + self.model.register_clickhouse_operations('insert', *[obj.pk for obj in objs], using=self.db) return objs -class ClickHouseDjangoModel(DjangoModel): - """ - Определяет базовую абстрактную модель, синхронизируемую с кликхаусом - """ - # TODO PostgreSQL, используемый сейчас не поддерживает UPSERT. Эта функция появилась в PostgreSQL 9.5 - # INSERT INTO "{clickhouse_update_table}" ("table", "model_id", "operation") - # VALUES (TG_TABLE_NAME, NEW.{pk_field_name}, TG_OP) ON CONFILICT DO NOTHING; +class ClickHouseSyncModelMixin: + def get_queryset(self): + return ClickHouseSyncModelQuerySet(model=self.model, using=self._db) - # DEPRECATED Пока не удаляю, вдруг все таки решим переписать - # Синхронизация через Postgres основана на триггерах, которые не работают меж шардами - CREATE_TRIGGER_SQL_TEMPLATE = """ - CREATE OR REPLACE FUNCTION {table}_clickhouse_update() RETURNS TRIGGER AS ${table}_clickhouse_update$ - BEGIN - INSERT INTO "{clickhouse_update_table}" ("table", "model_id", "operation", "database") - SELECT TG_TABLE_NAME, NEW.{pk_field_name}, TG_OP, 'default' WHERE NOT EXISTS ( - SELECT id FROM "{clickhouse_update_table}" WHERE "table"=TG_TABLE_NAME AND "model_id"=NEW.{pk_field_name} - ); - RETURN NEW; - END; - ${table}_clickhouse_update$ LANGUAGE plpgsql; - DROP TRIGGER IF EXISTS {table}_collapsing_model_update ON {table}; - CREATE TRIGGER {table}_collapsing_model_update AFTER INSERT OR UPDATE ON {table} - FOR EACH ROW EXECUTE PROCEDURE {table}_clickhouse_update(); +class ClickHouseSyncModelQuerySet(ClickHouseSyncQuerySetMixin, DjangoQuerySet): + pass + + +class ClickHouseSyncModelManager(ClickHouseSyncModelMixin, DjangoManager): + pass + + +class ClickHouseSyncModel(DjangoModel): """ - - # DEPRECATED Пока не удаляю, вдруг все таки решим переписать - # Синхронизация через Postgres основана на триггерах, которые не работают меж шардами - DROP_TRIGGER_SQL_TEMPLATE = """ - DROP TRIGGER IF EXISTS {table}_collapsing_model_update ON {table}; - DROP FUNCTION IF EXISTS {table}_clickhouse_update(); + Base model for syncing data. Each django model synced with data must inherit this """ - - clickhouse_sync_type = None - objects = ClickHouseDjangoModelManager() + _clickhouse_sync_models = [] + objects = ClickHouseSyncModelManager() class Meta: abstract = True - def __init__(self, *args, **kwargs): - # Добавил, чтобы PyCharm не ругался на неопределенный __init__ - super().__init__(*args, **kwargs) + @classmethod + def get_clickhouse_storage(cls): # type: () -> Storage + """ + Returns Storage instance to save clickhouse sync data to + :return: + """ + storage_cls = lazy_class_import(config.SYNC_STORAGE) + return storage_cls() @classmethod - def register_clickhouse_operation(cls, operation, *model_ids, database=None): + def register_clickhouse_sync_model(cls, model_cls): # type: (Type[ClickHouseModel]) -> None """ - Добавляет в redis запись о том, что произошел Insert, update или delete модели - :param operation: Тип операции INSERT, UPDATE, DELETE - :param model_ids: Id элементов для регистрации - :param database: База данных, в которой лежит данное значение + Registers ClickHouse model to listen to this model updates + :param model_cls: Model class to register :return: None """ - if cls.clickhouse_sync_type != 'redis': - return - - assert operation in {'INSERT', 'UPDATE', 'DELETE'}, 'operation must be one of [INSERT, UPDATE, DELETE]' - model_ids = get_parameter_pk_list(model_ids) - - if len(model_ids) > 0: - key = 'clickhouse_sync:{database}:{table}:{operation}'.format(table=cls._meta.db_table, operation=operation, - database=(database or 'default')) - on_transaction_commit(settings.REDIS.sadd, args=[key] + model_ids) + cls._clickhouse_sync_models.append(model_cls) @classmethod - def get_trigger_sql(cls, drop=False, table=None): + def get_clickhouse_sync_models(cls): # type: () -> List[ClickHouseModel] """ - Формирует SQL для создания или удаления триггера на обновление модели синхронизации с ClickHouse - :param drop: Если флаг указан, формирует SQL для удаления триггера. Иначе - для создания - :return: Строка SQL + Returns all clickhouse models, listening to this class + :return: """ - # DEPRECATED Пока не удаляю, вдруг все таки решим переписать - # Синхронизация через Postgres основана на триггерах, которые не работают меж шардами - raise Exception('This method is deprecated due to sharding released') + return cls._clickhouse_sync_models - # table = table or cls._meta.db_table - # from utils.models import ClickHouseModelOperation - # sql = cls.DROP_TRIGGER_SQL_TEMPLATE if drop else cls.CREATE_TRIGGER_SQL_TEMPLATE - # sql = sql.format(table=table, pk_field_name=cls._meta.pk.name, - # clickhouse_update_table=ClickHouseModelOperation._meta.db_table) - # return sql + @classmethod + def register_clickhouse_operations(cls, operation, *model_pks, using=None): + # type: (str, *Any, Optional[str]) -> None + """ + Registers model operation in storage + :param operation: Operation type - one of [insert, update, delete) + :param model_pks: Elements to import + :param using: Database alias registered instances are from + :return: None + """ + if len(model_pks) > 0: + storage = cls.get_clickhouse_storage() - def post_save(self, created, using=None): - self.register_clickhouse_operation('INSERT' if created else 'UPDATE', self.pk, database=(using or 'default')) + def _on_commit(): + for model_cls in cls.get_clickhouse_sync_models(): + storage.register_operations_wrapped(model_cls.get_import_key(), operation, *model_pks) - def post_delete(self, using=None): - self.register_clickhouse_operation('DELETE', self.pk, database=(using or 'default')) + transaction.on_commit(_on_commit, using=using) + + def post_save(self, created, using=None): # type: (bool, Optional[str]) -> None + self.register_clickhouse_operations('insert' if created else 'update', self.pk, using=using) + + def post_delete(self, using=None): # type: (Optional[str]) -> None + self.register_clickhouse_operations('delete', self.pk, using=using) @receiver(post_save) def post_save(sender, instance, **kwargs): - if issubclass(sender, ClickHouseDjangoModel): - instance.post_save(kwargs.get('created'), using=kwargs.get('using')) + if issubclass(sender, ClickHouseSyncModel): + instance.post_save(kwargs.get('created', False), using=kwargs.get('using')) @receiver(post_delete) def post_delete(sender, instance, **kwargs): - if issubclass(sender, ClickHouseDjangoModel): + if issubclass(sender, ClickHouseSyncModel): instance.post_delete(using=kwargs.get('using')) diff --git a/src/django_clickhouse/storage.py b/src/django_clickhouse/storage.py index a7c0152..ac31d01 100644 --- a/src/django_clickhouse/storage.py +++ b/src/django_clickhouse/storage.py @@ -7,6 +7,7 @@ Important: Storage should be able to restore current importing batch, if something goes wrong. """ import datetime +from itertools import chain from typing import Any, Optional, List, Tuple, Iterable from .exceptions import ConfigurationError @@ -81,7 +82,7 @@ class Storage: """ raise NotImplemented() - def register_operation(self, import_key, operation, pk): # type: (str, str, Any) -> None + def register_operations(self, import_key, operation, *pks): # type: (str, str, *Iterable[Any]) -> None """ Registers new incoming operation :param import_key: A key, returned by ClickHouseModel.get_import_key() method @@ -91,8 +92,8 @@ class Storage: """ raise NotImplementedError() - def register_operation_wrapped(self, import_key, operation, pk): - # type: (str, str, Any) -> None + def register_operations_wrapped(self, import_key, operation, *pks): + # type: (str, str, *Iterable[Any]) -> None """ This is a wrapper for register_operation method, checking main parameters. This method should be called from inner functions. @@ -104,7 +105,7 @@ class Storage: if operation not in {'insert', 'update', 'delete'}: raise ValueError('operation must be one of [insert, update, delete]') - return self.register_operation(import_key, operation, pk) + return self.register_operations(import_key, operation, *pks) class RedisStorage(Storage): @@ -126,12 +127,14 @@ class RedisStorage(Storage): from redis import StrictRedis self._redis = StrictRedis(**config.REDIS_CONFIG) - def register_operation(self, import_key, operation, pk): + def register_operations(self, import_key, operation, *pks): key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) score = datetime.datetime.now().timestamp() - # key, score, value - self._redis.zadd(key, score, '%s:%s' % (operation, str(pk))) + items = chain(*((score, '%s:%s' % (operation, str(pk))) for pk in pks)) + + # key, score1, value1, score2, value2, ... + self._redis.zadd(key, *items) def get_operations(self, import_key, count, **kwargs): ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key) diff --git a/src/django_clickhouse/utils.py b/src/django_clickhouse/utils.py index def9df8..f924b81 100644 --- a/src/django_clickhouse/utils.py +++ b/src/django_clickhouse/utils.py @@ -1,3 +1,8 @@ +from typing import Union, Any + +import six +from importlib import import_module + def get_clickhouse_tz_offset(): """ @@ -31,3 +36,22 @@ def format_datetime(dt, timezone_offset=0, day_end=False): # Если даты форматируются вручную, то сервер воспринимает их как локаль сервера. return (dt - datetime.timedelta(minutes=timezone_offset - get_clickhouse_tz_offset())).strftime("%Y-%m-%d %H:%M:%S") + + +def lazy_class_import(obj): # type: (Union[str, Any]) -> Any + """ + If string is given, imports object by given module path. + Otherwise returns the object + :param obj: A string class path or object to return + :return: Imported object + """ + if isinstance(obj, six.string_types): + module_name, obj_name = obj.rsplit('.', 1) + module = import_module(module_name) + + try: + return getattr(module, obj_name) + except AttributeError: + raise ImportError('Invalid import path `%s`' % obj) + else: + return obj diff --git a/tests/test_storages.py b/tests/test_storages.py index 7f57caa..fd423a8 100644 --- a/tests/test_storages.py +++ b/tests/test_storages.py @@ -15,9 +15,9 @@ class StorageTest(TestCase): redis.delete(*keys) def test_operation_pks(self): - self.storage.register_operation_wrapped('test', 'insert', 100500) - self.storage.register_operation_wrapped('test', 'insert', 100501) - self.storage.register_operation_wrapped('test', 'insert', 100502) + self.storage.register_operations_wrapped('test', 'insert', 100500) + self.storage.register_operations_wrapped('test', 'insert', 100501) + self.storage.register_operations_wrapped('test', 'insert', 100502) self.assertListEqual([ ('insert', '100500'), ('insert', '100501'), @@ -25,9 +25,9 @@ class StorageTest(TestCase): ], self.storage.get_operations('test', 10)) def test_operation_types(self): - self.storage.register_operation_wrapped('test', 'insert', 100500) - self.storage.register_operation_wrapped('test', 'update', 100500) - self.storage.register_operation_wrapped('test', 'delete', 100500) + self.storage.register_operations_wrapped('test', 'insert', 100500) + self.storage.register_operations_wrapped('test', 'update', 100500) + self.storage.register_operations_wrapped('test', 'delete', 100500) self.assertListEqual([ ('insert', '100500'), ('update', '100500'), @@ -35,9 +35,9 @@ class StorageTest(TestCase): ], self.storage.get_operations('test', 10)) def test_operation_import_keys(self): - self.storage.register_operation_wrapped('test1', 'insert', 100500) - self.storage.register_operation_wrapped('test2', 'insert', 100500) - self.storage.register_operation_wrapped('test2', 'insert', 100501) + self.storage.register_operations_wrapped('test1', 'insert', 100500) + self.storage.register_operations_wrapped('test2', 'insert', 100500) + self.storage.register_operations_wrapped('test2', 'insert', 100501) self.assertListEqual([ ('insert', '100500') ], self.storage.get_operations('test1', 10)) @@ -51,11 +51,11 @@ class StorageTest(TestCase): self.assertTupleEqual(tuple(str(i) for i in range(10)), self.storage.get_import_batch('test')) def test_post_sync(self): - self.storage.register_operation_wrapped('test', 'insert', 100500) - self.storage.register_operation_wrapped('test', 'insert', 100501) + self.storage.register_operations_wrapped('test', 'insert', 100500) + self.storage.register_operations_wrapped('test', 'insert', 100501) self.storage.get_operations('test', 10) self.storage.write_import_batch('test', [str(i) for i in range(10)]) - self.storage.register_operation_wrapped('test', 'insert', 100502) + self.storage.register_operations_wrapped('test', 'insert', 100502) self.storage.post_sync('test') self.assertListEqual([