mirror of
synced 2025-03-12 06:45:46 +03:00
Django models and managers refactoring, but not tested
This commit is contained in:
@ -312,7 +312,7 @@ class ClickHouseModel(InfiModel):
if len(model_ids) > 0:
if cls.sync_type == 'redis':
cls.django_model.register_clickhouse_operation('INSERT', *model_ids, database=(database or 'default'))
cls.django_model.register_clickhouse_operations('INSERT', *model_ids, database=(database or 'default'))
else: # if self.sync_type == 'postgres'
from utils.models import ClickHouseModelOperation
@ -707,7 +707,7 @@ class ClickHouseCollapseModel(ClickHouseModel):
if len(model_ids) > 0:
if cls.sync_type == 'redis':
cls.django_model.register_clickhouse_operation('UPDATE', *list(model_ids), database=database)
cls.django_model.register_clickhouse_operations('UPDATE', *list(model_ids), database=database)
else: # if self.sync_type == 'postgres'
from utils.models import ClickHouseModelOperation
@ -16,7 +16,7 @@ DEFAULTS = {
'SYNC_STORAGE': 'django_clickhouse.storage.DBStorage',
'STATSD_PREFIX': 'clickhouse'
'STATSD_PREFIX': 'clickhouse',
@ -1,151 +1,187 @@
This file contains base django model to be synced with ClickHouse.
It saves all operations to storage in order to write them to ClickHouse later.
from typing import Optional, Any, List
import six
from django.db import transaction
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.db.models import QuerySet as DjangoQuerySet, Manager as DjangoManager, Model as DjangoModel
class ClickHouseDjangoModelQuerySet(DjangoQuerySet):
from .configuration import config
from .storage import Storage
from .utils import lazy_class_import
from django_pg_returning.manager import UpdateReturningMixin
except ImportError:
class UpdateReturningMixin:
from django_pg_bulk_update.manager import BulkUpdateManagerMixin
except ImportError:
class BulkUpdateManagerMixin:
class ClickHouseSyncUpdateReturningQuerySetMixin(UpdateReturningMixin):
Переопределяет update, чтобы он сгенерировал данные для обновления ClickHouse
This mixin adopts methods of django-pg-returning library
def __init__(self, *args, **kwargs):
super(ClickHouseDjangoModelQuerySet, self).__init__(*args, **kwargs)
def _register_ops(self, result):
pk_name = self.model._meta.pk.name
pk_list = result.values_list(pk_name, flat=True)
self.model.register_clickhouse_operations('update', *pk_list, using=self.db)
def update_returning(self, **updates):
result = super().update_returning(**updates)
return result
def delete_returning(self):
result = super().delete_returning()
return result
class ClickHouseSyncBulkUpdateManagerMixin(BulkUpdateManagerMixin):
def _update_returning_param(self, returning):
pk_name = self.model._meta.pk.name
if returning is None:
returning = pk_name
elif isinstance(returning, six.string_types):
returning = [pk_name, returning]
returning = list(returning) + [pk_name]
return returning
def _register_ops(self, result):
pk_name = self.model._meta.pk.name
pk_list = [getattr(item, pk_name) for item in result]
self.model.register_clickhouse_operations('update', *pk_list, using=self.db)
def bulk_update(self, *args, **kwargs):
original_returning = kwargs.pop('returning', None)
kwargs['returning'] = self._update_returning_param(original_returning)
result = super().bulk_update(*args, **kwargs)
return result.count() if original_returning is None else result
def bulk_update_or_create(self, *args, **kwargs):
original_returning = kwargs.pop('returning', None)
kwargs['returning'] = self._update_returning_param(original_returning)
result = super().bulk_update_or_create(*args, **kwargs)
return result.count() if original_returning is None else result
class ClickHouseSyncQuerySetMixin:
def update(self, **kwargs):
if self.model.clickhouse_sync_type == 'redis':
pk_name = self.model._meta.pk.name
res = self.only(pk_name).update_returning(**kwargs).values_list(pk_name, flat=True)
self.model.register_clickhouse_operation('UPDATE', *res, database=(self._db or 'default'))
self.model.register_clickhouse_operations('update', *res, usint=self.db)
return len(res)
return super(ClickHouseDjangoModelQuerySet, self).update(**kwargs)
def update_returning(self, **updates):
result = super(ClickHouseDjangoModelQuerySet, self).update_returning(**updates)
if self.model.clickhouse_sync_type == 'redis':
pk_name = self.model._meta.pk.name
pk_list = result.values_list(pk_name, flat=True)
self.model.register_clickhouse_operation('UPDATE', *pk_list, database=(self._db or 'default'))
return result
def delete_returning(self):
result = super(ClickHouseDjangoModelQuerySet, self).delete_returning()
if self.model.clickhouse_sync_type == 'redis':
pk_name = self.model._meta.pk.name
pk_list = result.values_list(pk_name, flat=True)
self.model.register_clickhouse_operation('DELETE', *pk_list, database=(self._db or 'default'))
return result
class ClickHouseDjangoModelManager(DjangoManager):
def get_queryset(self):
Инициализирует кастомный QuerySet
:return: BaseQuerySet модели
return ClickHouseDjangoModelQuerySet(model=self.model, using=self._db)
return super().update(**kwargs)
def bulk_create(self, objs, batch_size=None):
objs = super(ClickHouseDjangoModelManager, self).bulk_create(objs, batch_size=batch_size)
self.model.register_clickhouse_operation('INSERT', *[obj.pk for obj in objs], database=(self._db or 'default'))
objs = super().bulk_create(objs, batch_size=batch_size)
self.model.register_clickhouse_operations('insert', *[obj.pk for obj in objs], using=self.db)
return objs
class ClickHouseDjangoModel(DjangoModel):
Определяет базовую абстрактную модель, синхронизируемую с кликхаусом
# TODO PostgreSQL, используемый сейчас не поддерживает UPSERT. Эта функция появилась в PostgreSQL 9.5
# INSERT INTO "{clickhouse_update_table}" ("table", "model_id", "operation")
class ClickHouseSyncModelMixin:
def get_queryset(self):
return ClickHouseSyncModelQuerySet(model=self.model, using=self._db)
# DEPRECATED Пока не удаляю, вдруг все таки решим переписать
# Синхронизация через Postgres основана на триггерах, которые не работают меж шардами
CREATE OR REPLACE FUNCTION {table}_clickhouse_update() RETURNS TRIGGER AS ${table}_clickhouse_update$
INSERT INTO "{clickhouse_update_table}" ("table", "model_id", "operation", "database")
SELECT TG_TABLE_NAME, NEW.{pk_field_name}, TG_OP, 'default' WHERE NOT EXISTS (
SELECT id FROM "{clickhouse_update_table}" WHERE "table"=TG_TABLE_NAME AND "model_id"=NEW.{pk_field_name}
${table}_clickhouse_update$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS {table}_collapsing_model_update ON {table};
CREATE TRIGGER {table}_collapsing_model_update AFTER INSERT OR UPDATE ON {table}
FOR EACH ROW EXECUTE PROCEDURE {table}_clickhouse_update();
class ClickHouseSyncModelQuerySet(ClickHouseSyncQuerySetMixin, DjangoQuerySet):
class ClickHouseSyncModelManager(ClickHouseSyncModelMixin, DjangoManager):
class ClickHouseSyncModel(DjangoModel):
# DEPRECATED Пока не удаляю, вдруг все таки решим переписать
# Синхронизация через Postgres основана на триггерах, которые не работают меж шардами
DROP TRIGGER IF EXISTS {table}_collapsing_model_update ON {table};
DROP FUNCTION IF EXISTS {table}_clickhouse_update();
Base model for syncing data. Each django model synced with data must inherit this
clickhouse_sync_type = None
objects = ClickHouseDjangoModelManager()
_clickhouse_sync_models = []
objects = ClickHouseSyncModelManager()
class Meta:
abstract = True
def __init__(self, *args, **kwargs):
# Добавил, чтобы PyCharm не ругался на неопределенный __init__
super().__init__(*args, **kwargs)
def get_clickhouse_storage(cls): # type: () -> Storage
Returns Storage instance to save clickhouse sync data to
storage_cls = lazy_class_import(config.SYNC_STORAGE)
return storage_cls()
def register_clickhouse_operation(cls, operation, *model_ids, database=None):
def register_clickhouse_sync_model(cls, model_cls): # type: (Type[ClickHouseModel]) -> None
Добавляет в redis запись о том, что произошел Insert, update или delete модели
:param operation: Тип операции INSERT, UPDATE, DELETE
:param model_ids: Id элементов для регистрации
:param database: База данных, в которой лежит данное значение
Registers ClickHouse model to listen to this model updates
:param model_cls: Model class to register
:return: None
if cls.clickhouse_sync_type != 'redis':
assert operation in {'INSERT', 'UPDATE', 'DELETE'}, 'operation must be one of [INSERT, UPDATE, DELETE]'
model_ids = get_parameter_pk_list(model_ids)
if len(model_ids) > 0:
key = 'clickhouse_sync:{database}:{table}:{operation}'.format(table=cls._meta.db_table, operation=operation,
database=(database or 'default'))
on_transaction_commit(settings.REDIS.sadd, args=[key] + model_ids)
def get_trigger_sql(cls, drop=False, table=None):
def get_clickhouse_sync_models(cls): # type: () -> List[ClickHouseModel]
Формирует SQL для создания или удаления триггера на обновление модели синхронизации с ClickHouse
:param drop: Если флаг указан, формирует SQL для удаления триггера. Иначе - для создания
:return: Строка SQL
Returns all clickhouse models, listening to this class
# DEPRECATED Пока не удаляю, вдруг все таки решим переписать
# Синхронизация через Postgres основана на триггерах, которые не работают меж шардами
raise Exception('This method is deprecated due to sharding released')
return cls._clickhouse_sync_models
# table = table or cls._meta.db_table
# from utils.models import ClickHouseModelOperation
# sql = sql.format(table=table, pk_field_name=cls._meta.pk.name,
# clickhouse_update_table=ClickHouseModelOperation._meta.db_table)
# return sql
def register_clickhouse_operations(cls, operation, *model_pks, using=None):
# type: (str, *Any, Optional[str]) -> None
Registers model operation in storage
:param operation: Operation type - one of [insert, update, delete)
:param model_pks: Elements to import
:param using: Database alias registered instances are from
:return: None
if len(model_pks) > 0:
storage = cls.get_clickhouse_storage()
def post_save(self, created, using=None):
self.register_clickhouse_operation('INSERT' if created else 'UPDATE', self.pk, database=(using or 'default'))
def _on_commit():
for model_cls in cls.get_clickhouse_sync_models():
storage.register_operations_wrapped(model_cls.get_import_key(), operation, *model_pks)
def post_delete(self, using=None):
self.register_clickhouse_operation('DELETE', self.pk, database=(using or 'default'))
transaction.on_commit(_on_commit, using=using)
def post_save(self, created, using=None): # type: (bool, Optional[str]) -> None
self.register_clickhouse_operations('insert' if created else 'update', self.pk, using=using)
def post_delete(self, using=None): # type: (Optional[str]) -> None
self.register_clickhouse_operations('delete', self.pk, using=using)
def post_save(sender, instance, **kwargs):
if issubclass(sender, ClickHouseDjangoModel):
instance.post_save(kwargs.get('created'), using=kwargs.get('using'))
if issubclass(sender, ClickHouseSyncModel):
instance.post_save(kwargs.get('created', False), using=kwargs.get('using'))
def post_delete(sender, instance, **kwargs):
if issubclass(sender, ClickHouseDjangoModel):
if issubclass(sender, ClickHouseSyncModel):
@ -7,6 +7,7 @@ Important:
Storage should be able to restore current importing batch, if something goes wrong.
import datetime
from itertools import chain
from typing import Any, Optional, List, Tuple, Iterable
from .exceptions import ConfigurationError
@ -81,7 +82,7 @@ class Storage:
raise NotImplemented()
def register_operation(self, import_key, operation, pk): # type: (str, str, Any) -> None
def register_operations(self, import_key, operation, *pks): # type: (str, str, *Iterable[Any]) -> None
Registers new incoming operation
:param import_key: A key, returned by ClickHouseModel.get_import_key() method
@ -91,8 +92,8 @@ class Storage:
raise NotImplementedError()
def register_operation_wrapped(self, import_key, operation, pk):
# type: (str, str, Any) -> None
def register_operations_wrapped(self, import_key, operation, *pks):
# type: (str, str, *Iterable[Any]) -> None
This is a wrapper for register_operation method, checking main parameters.
This method should be called from inner functions.
@ -104,7 +105,7 @@ class Storage:
if operation not in {'insert', 'update', 'delete'}:
raise ValueError('operation must be one of [insert, update, delete]')
return self.register_operation(import_key, operation, pk)
return self.register_operations(import_key, operation, *pks)
class RedisStorage(Storage):
@ -126,12 +127,14 @@ class RedisStorage(Storage):
from redis import StrictRedis
self._redis = StrictRedis(**config.REDIS_CONFIG)
def register_operation(self, import_key, operation, pk):
def register_operations(self, import_key, operation, *pks):
key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
score = datetime.datetime.now().timestamp()
# key, score, value
self._redis.zadd(key, score, '%s:%s' % (operation, str(pk)))
items = chain(*((score, '%s:%s' % (operation, str(pk))) for pk in pks))
# key, score1, value1, score2, value2, ...
self._redis.zadd(key, *items)
def get_operations(self, import_key, count, **kwargs):
ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
@ -1,3 +1,8 @@
from typing import Union, Any
import six
from importlib import import_module
def get_clickhouse_tz_offset():
@ -31,3 +36,22 @@ def format_datetime(dt, timezone_offset=0, day_end=False):
# Если даты форматируются вручную, то сервер воспринимает их как локаль сервера.
return (dt - datetime.timedelta(minutes=timezone_offset - get_clickhouse_tz_offset())).strftime("%Y-%m-%d %H:%M:%S")
def lazy_class_import(obj): # type: (Union[str, Any]) -> Any
If string is given, imports object by given module path.
Otherwise returns the object
:param obj: A string class path or object to return
:return: Imported object
if isinstance(obj, six.string_types):
module_name, obj_name = obj.rsplit('.', 1)
module = import_module(module_name)
return getattr(module, obj_name)
except AttributeError:
raise ImportError('Invalid import path `%s`' % obj)
return obj
@ -15,9 +15,9 @@ class StorageTest(TestCase):
def test_operation_pks(self):
self.storage.register_operation_wrapped('test', 'insert', 100500)
self.storage.register_operation_wrapped('test', 'insert', 100501)
self.storage.register_operation_wrapped('test', 'insert', 100502)
self.storage.register_operations_wrapped('test', 'insert', 100500)
self.storage.register_operations_wrapped('test', 'insert', 100501)
self.storage.register_operations_wrapped('test', 'insert', 100502)
('insert', '100500'),
('insert', '100501'),
@ -25,9 +25,9 @@ class StorageTest(TestCase):
], self.storage.get_operations('test', 10))
def test_operation_types(self):
self.storage.register_operation_wrapped('test', 'insert', 100500)
self.storage.register_operation_wrapped('test', 'update', 100500)
self.storage.register_operation_wrapped('test', 'delete', 100500)
self.storage.register_operations_wrapped('test', 'insert', 100500)
self.storage.register_operations_wrapped('test', 'update', 100500)
self.storage.register_operations_wrapped('test', 'delete', 100500)
('insert', '100500'),
('update', '100500'),
@ -35,9 +35,9 @@ class StorageTest(TestCase):
], self.storage.get_operations('test', 10))
def test_operation_import_keys(self):
self.storage.register_operation_wrapped('test1', 'insert', 100500)
self.storage.register_operation_wrapped('test2', 'insert', 100500)
self.storage.register_operation_wrapped('test2', 'insert', 100501)
self.storage.register_operations_wrapped('test1', 'insert', 100500)
self.storage.register_operations_wrapped('test2', 'insert', 100500)
self.storage.register_operations_wrapped('test2', 'insert', 100501)
('insert', '100500')
], self.storage.get_operations('test1', 10))
@ -51,11 +51,11 @@ class StorageTest(TestCase):
self.assertTupleEqual(tuple(str(i) for i in range(10)), self.storage.get_import_batch('test'))
def test_post_sync(self):
self.storage.register_operation_wrapped('test', 'insert', 100500)
self.storage.register_operation_wrapped('test', 'insert', 100501)
self.storage.register_operations_wrapped('test', 'insert', 100500)
self.storage.register_operations_wrapped('test', 'insert', 100501)
self.storage.get_operations('test', 10)
self.storage.write_import_batch('test', [str(i) for i in range(10)])
self.storage.register_operation_wrapped('test', 'insert', 100502)
self.storage.register_operations_wrapped('test', 'insert', 100502)
Reference in New Issue
Block a user