Added sync mechanism without tasks, but not tested as no migrations

This commit is contained in:
M1ha 2018-11-15 16:50:38 +05:00
parent 7c39bd5c84
commit 8e9963ad3b
9 changed files with 192 additions and 113 deletions

View File

@ -2,13 +2,17 @@
This file defines base abstract models to inherit from This file defines base abstract models to inherit from
""" """
import datetime import datetime
from collections import defaultdict
from itertools import chain
from typing import List, Tuple
from django.utils.timezone import now from django.db.models import Model as DjangoModel
from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase
from typing import Set, Union
from six import with_metaclass from six import with_metaclass
from .db_clients import connections
from .serializers import Django2ClickHouseModelSerializer
from .models import ClickHouseSyncModel from .models import ClickHouseSyncModel
from .utils import lazy_class_import from .utils import lazy_class_import
from . import config from . import config
@ -32,10 +36,17 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
Base model for all other models Base model for all other models
""" """
django_model = None django_model = None
django_model_serializer = Django2ClickHouseModelSerializer
sync_batch_size = None sync_batch_size = None
sync_storage = None sync_storage = None
sync_delay = None sync_delay = None
sync_database_alias = None
@classmethod
def get_django_model_serializer(cls):
serializer_cls = lazy_class_import(cls.django_model_serializer)
return serializer_cls()
@classmethod @classmethod
def get_sync_batch_size(cls): def get_sync_batch_size(cls):
@ -72,13 +83,22 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
return True return True
def import_batch(self): def get_sync_objects(self, operations): # type: (List[Tuple[str, str]]) -> List[DjangoModel]
""" """
Imports batch to ClickHouse Returns objects from main database to sync
:return: :param operations: A list of operations to perform
:return: A list of django_model instances
""" """
pass pk_by_db = defaultdict(set)
for op, pk_str in operations:
using, pk = pk_str.split('.')
pk_by_db[using].add(pk)
objs = chain(*(
self.django_model.objects.filter(pk__in=pk_set).using(using)
for using, pk_set in pk_by_db
))
return list(objs)
def sync_batch_from_storage(self): def sync_batch_from_storage(self):
""" """
@ -87,20 +107,21 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
""" """
storage = self.get_storage() storage = self.get_storage()
import_key = self.get_import_key() import_key = self.get_import_key()
storage.pre_sync(import_key) conn = connections[self.sync_database_alias]
# 1) pre_sync()
# 2) get_import_batch(). If batch is present go to 5)
# 3) If batch is None, call get_operations()
# 4) Transform operations to batch and call write_import_batch()
# 5) Import batch to ClickHouse
storage.pre_sync(import_key)
batch = storage.get_import_batch(import_key) batch = storage.get_import_batch(import_key)
if batch is None: if batch is None:
operations = storage.get_operations(import_key, self.get_sync_batch_size()) operations = storage.get_operations(import_key, self.get_sync_batch_size())
batch = self.engine.get_batch(operations) import_objects = self.get_sync_objects(operations)
storage.write_import_batch(import_key, batch)
self.import_batch(batch) batch = self.engine.get_insert_batch(self.__class__, conn, import_objects)
storage.write_import_batch(import_key, [obj.to_tsv() for obj in batch])
else:
pass # Previous import error, retry
conn.insert(batch)
storage.post_sync(import_key) storage.post_sync(import_key)

View File

@ -0,0 +1,23 @@
from infi.clickhouse_orm.database import Database
from .exceptions import DBAliasError
from .configuration import config
class ConnectionProxy:
_connections = {}
def get_connection(self, alias):
if alias not in self._connections:
if alias not in config.DATABASES:
raise DBAliasError(alias)
self._connections[alias] = Database(**config.DATABASES[alias])
return self._connections[alias]
def __getattr__(self, item):
return self.get_connection(item)
connections = ConnectionProxy()

View File

@ -0,0 +1,86 @@
"""
This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse
"""
from collections import defaultdict
from itertools import chain
from typing import List, Tuple, TypeVar, Type, Optional
from django.db.models import Model as DjangoModel
from infi.clickhouse_orm import engines as infi_engines
from infi.clickhouse_orm.database import Database
T = TypeVar('T')
class InsertOnlyEngineMixin:
def get_insert_batch(self, model_cls, database, objects):
# type: (Type[T], Database, List[DjangoModel]) -> List[T]
"""
Gets a list of model_cls instances to insert into database
:param model_cls: ClickHouseModel subclass to import
:param database: infi.clickhouse_orm Database instance to sync data with
:param objects: A list of django Model instances to sync
:return: A list of model_cls objects
"""
serializer = model_cls.get_django_model_serializer()
return [serializer.serialize(obj, model_cls) for obj in objects]
class MergeTree(InsertOnlyEngineMixin, infi_engines.MergeTree):
pass
class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
def get_final_versions(self, model_cls, objects):
"""
Get objects, that are currently stored in ClickHouse.
Depending on the partition key this can be different for different models.
In common case, this method is optimized for date field that doesn't change.
It also supposes primary key to by id
:param model_cls: ClickHouseModel subclass to import
:param objects: Objects for which final versions are searched
:return: A list of
"""
min_date, max_date = None, None
for obj in objects:
obj_date = getattr(obj, self.date_col)
if min_date is None or min_date > obj_date:
min_date = obj_date
if max_date is None or max_date < obj_date:
max_date = obj_date
obj_ids = [str(obj.id) for obj in objects]
query = "SELECT * FROM $table FINAL WHERE `%s` >= '%s' AND `%s` <= '%s', id IN (%s)" \
% (self.date_col, min_date.isoformat(), self.date_col, max_date.isoformat(), ', '.join(obj_ids))
qs = model_cls.select(query, model_class=model_cls)
return list(qs)
def get_insert_batch(self, model_cls, database, objects):
# type: (Type[T], Database, List[DjangoModel]) -> List[T]
"""
Gets a list of model_cls instances to insert into database
:param model_cls: ClickHouseModel subclass to import
:param database: infi.clickhouse_orm Database instance to sync data with
:param objects: A list of django Model instances to sync
:return: A list of model_cls objects
"""
new_objs = super(CollapsingMergeTree, self).get_insert_batch(model_cls, database, objects)
old_objs = self.get_final_versions(model_cls, new_objs)
for obj in old_objs:
self.set_obj_sign(obj, -1)
for obj in new_objs:
self.set_obj_sign(obj, 1)
return old_objs + new_objs
def set_obj_sign(self, obj, sign): # type: (InfiModel, int) -> None
"""
Sets objects sign. By default gets attribute nmae from sign_col
:return: None
"""
setattr(obj, self.sign_col, sign)

View File

@ -1,10 +1,13 @@
from .configuration import PREFIX from .configuration import PREFIX
class ClickHouseError(Exception):
pass
class ConfigurationError(Exception): class ConfigurationError(Exception):
def __init__(self, param_name): def __init__(self, param_name):
param_name = PREFIX + param_name param_name = PREFIX + param_name
super(ConfigurationError, self).__init__("Config parameter '%s' is not set properly" % param_name) super(ConfigurationError, self).__init__("Config parameter '%s' is not set properly" % param_name)
class DBAliasError(Exception):
def __init__(self, alias):
super(DBAliasError, self).__init__(
"Database alias `%s` is not found. Check %s parameter" % (alias, PREFIX + 'DATABASES'))

View File

@ -6,7 +6,7 @@ It saves all operations to storage in order to write them to ClickHouse later.
from typing import Optional, Any, List, Type from typing import Optional, Any, List, Type
import six import six
from django.db import transaction from django.db import transaction, DEFAULT_DB_ALIAS
from django.db.models.signals import post_save, post_delete from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver from django.dispatch import receiver
from django.db.models import QuerySet as DjangoQuerySet, Manager as DjangoManager, Model as DjangoModel from django.db.models import QuerySet as DjangoQuerySet, Manager as DjangoManager, Model as DjangoModel
@ -171,6 +171,8 @@ class ClickHouseSyncModel(DjangoModel):
:param using: Database alias registered instances are from :param using: Database alias registered instances are from
:return: None :return: None
""" """
model_pks = ['%s.%d' % (using or DEFAULT_DB_ALIAS, pk) for pk in model_pks]
def _on_commit(): def _on_commit():
for model_cls in cls.get_clickhouse_sync_models(): for model_cls in cls.get_clickhouse_sync_models():
storage.register_operations_wrapped(model_cls.get_import_key(), operation, *model_pks) storage.register_operations_wrapped(model_cls.get_import_key(), operation, *model_pks)

View File

@ -1,84 +1,20 @@
from typing import Any from typing import Type
from django.db.models import DjangoModel
from .clickhouse_models import ClickHouseModel from django.db.models import Model as DjangoModel
from django.forms import model_to_dict
class Serializer: class Django2ClickHouseModelSerializer:
def serialize(self, value: Any) -> Any: serialize_fields = None
pass exclude_serialize_fields = None
def serialize(self, obj, model_cls, **kwargs):
# type: (DjangoModel, Type['ClickHouseModel'], **dict) -> 'ClickHouseModel'
data = model_to_dict(obj, self.serialize_fields, self.exclude_serialize_fields)
class DefaultDjango2ClickHouseSerializer(Serializer): # Remove None values, they should be initialized as defaults
@staticmethod for key, value in data.items():
def _convert_none(values, fields_dict): if value is None:
""" del data[key]
ClickHouse не хранит значения NULL, потэтому для них сохраняются невалидные значения параметра.
Преобразует все значения, воспринимаемые как None в None
:param values: Словарь с данными, которые надо преобразовывать
:param fields_dict: Итерируемый объект имен полей, которые надо преобразовывать
:return: Преобразованный словарь
"""
assert isinstance(values, dict), "values parameter must be a dict instance"
result = values.copy()
for key in fields_dict:
if isinstance(values[key], datetime.date) and values[key] == datetime.date(1970, 1, 1) \
or (isinstance(values[key], datetime.datetime)
and (values[key] in (datetime.datetime(1970, 1, 1, 0, 0, 0),
datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc)))) \
or type(values[key]) is int and values[key] == 0 \
or type(values[key]) is str and values[key] == '':
result[key] = None
return result
@staticmethod return model_cls(**data)
def _convert_bool(values, fields_dict):
"""
ClickHouse не хранит значения Bool, потэтому для них сохраняются UInt8.
:param values: Словарь с данными, которые надо преобразовывать
:param fields_dict: Итерируемый объект имен полей, которые надо преобразовывать
:return: Преобразованный словарь
"""
assert isinstance(values, dict), "values parameter must be a dict instance"
result = values.copy()
for key in fields_dict:
result[key] = bool(result[key])
return result
@staticmethod
def _enum_to_str(values):
"""
Преобразует все значения типа Enum в их строковые представления
:param values: Словарь с данными, которые надо преобразовывать
:return: Преобразованный словарь
"""
assert isinstance(values, dict), "values parameter must be a dict instance"
return {key: val.name if isinstance(val, Enum) else val for key, val in values.items()}
@classmethod
def from_django_model(cls, obj):
"""
Создает объект модели ClickHouse из модели django
При переопределении метода желательно проверить аргументы, вызвав:
cls._validate_django_model_instance(obj)
:param obj: Объект модели django
:return: Объект модели ClickHouse или список таких объектов
"""
cls._validate_django_model_instance(obj)
raise ClickHouseError('Method "from_django_model" is not implemented')
def to_django_model(self, obj=None):
"""
Конвертирует эту модель в объект модели django, если это возможно.
Если невозможно - должен поднять исключение.
:param obj: Если передан, то надо не создать новый объект модели django, а обновить существующий
:return: Объект модели django
"""
if obj is not None:
self._validate_django_model_instance(obj)
else:
self._validate_cls_attributes()
raise ClickHouseError('Method "to_django_model" is not implemented')
def serialize(self, value: DjangoModel) -> ClickHouseModel:
pass

View File

@ -1,7 +1,15 @@
from django_clickhouse.clickhouse_models import ClickHouseModel from django_clickhouse.clickhouse_models import ClickHouseModel
from django_clickhouse.engines import MergeTree
from infi.clickhouse_orm import fields
from tests.models import TestModel from tests.models import TestModel
class TestClickHouseModel(ClickHouseModel): class TestClickHouseModel(ClickHouseModel):
django_model = TestModel django_model = TestModel
sync_delay = 5 sync_delay = 5
created_date = fields.DateField()
value = fields.UInt32Field()
engine = MergeTree('created_Date')

View File

@ -24,24 +24,24 @@ class ClickHouseDjangoModelTest(TransactionTestCase):
# INSERT operation # INSERT operation
instance = TestModel(created_date=datetime.date.today(), value=2) instance = TestModel(created_date=datetime.date.today(), value=2)
instance.save() instance.save()
self.assertListEqual([('insert', str(instance.pk))], self.assertListEqual([('insert', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
# UPDATE operation # UPDATE operation
instance.save() instance.save()
self.assertListEqual([('insert', str(instance.pk)), ('update', str(instance.pk))], self.assertListEqual([('insert', "default.%d" % instance.pk), ('update', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
def test_create(self): def test_create(self):
instance = TestModel.objects.create(pk=100555, created_date=datetime.date.today(), value=2) instance = TestModel.objects.create(pk=100555, created_date=datetime.date.today(), value=2)
self.assertListEqual([('insert', str(instance.pk))], self.assertListEqual([('insert', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
def test_bulk_create(self): def test_bulk_create(self):
items = [TestModel(created_date=datetime.date.today(), value=i) for i in range(5)] items = [TestModel(created_date=datetime.date.today(), value=i) for i in range(5)]
items = TestModel.objects.bulk_create(items) items = TestModel.objects.bulk_create(items)
self.assertEqual(5, len(items)) self.assertEqual(5, len(items))
self.assertListEqual([('insert', str(instance.pk)) for instance in items], self.assertListEqual([('insert', "default.%d" % instance.pk) for instance in items],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
def test_get_or_create(self): def test_get_or_create(self):
@ -49,64 +49,64 @@ class ClickHouseDjangoModelTest(TransactionTestCase):
get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2}) get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2})
self.assertTrue(created) self.assertTrue(created)
self.assertListEqual([('insert', str(instance.pk))], self.assertListEqual([('insert', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
instance, created = TestModel.objects. \ instance, created = TestModel.objects. \
get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2}) get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2})
self.assertFalse(created) self.assertFalse(created)
self.assertListEqual([('insert', str(instance.pk))], self.assertListEqual([('insert', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
def test_update_or_create(self): def test_update_or_create(self):
instance, created = TestModel.objects. \ instance, created = TestModel.objects. \
update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2}) update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2})
self.assertTrue(created) self.assertTrue(created)
self.assertListEqual([('insert', str(instance.pk))], self.assertListEqual([('insert', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
instance, created = TestModel.objects. \ instance, created = TestModel.objects. \
update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2}) update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2})
self.assertFalse(created) self.assertFalse(created)
self.assertListEqual([('insert', str(instance.pk)), ('update', str(instance.pk))], self.assertListEqual([('insert', "default.%d" % instance.pk), ('update', "default.%d" % instance.pk)],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
def test_qs_update(self): def test_qs_update(self):
TestModel.objects.filter(pk=1).update(created_date=datetime.date.today()) TestModel.objects.filter(pk=1).update(created_date=datetime.date.today())
self.assertListEqual([('update', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.assertListEqual([('update', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
# Update, after which updated element will not suit update conditions # Update, after which updated element will not suit update conditions
TestModel.objects.filter(created_date__lt=datetime.date.today()). \ TestModel.objects.filter(created_date__lt=datetime.date.today()). \
update(created_date=datetime.date.today()) update(created_date=datetime.date.today())
self.assertListEqual([('update', '1'), ('update', '2')], self.assertListEqual([('update', 'default.1'), ('update', 'default.2')],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
def test_qs_update_returning(self): def test_qs_update_returning(self):
TestModel.objects.filter(pk=1).update_returning(created_date=datetime.date.today()) TestModel.objects.filter(pk=1).update_returning(created_date=datetime.date.today())
self.assertListEqual([('update', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.assertListEqual([('update', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
# Update, after which updated element will not suit update conditions # Update, after which updated element will not suit update conditions
TestModel.objects.filter(created_date__lt=datetime.date.today()). \ TestModel.objects.filter(created_date__lt=datetime.date.today()). \
update_returning(created_date=datetime.date.today()) update_returning(created_date=datetime.date.today())
self.assertListEqual([('update', '1'), ('update', '2')], self.assertListEqual([('update', 'default.1'), ('update', 'default.2')],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
def test_qs_delete_returning(self): def test_qs_delete_returning(self):
TestModel.objects.filter(pk=1).delete_returning() TestModel.objects.filter(pk=1).delete_returning()
self.assertListEqual([('delete', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
# Update, после которого исходный фильтр уже не сработает # Update, после которого исходный фильтр уже не сработает
TestModel.objects.filter(created_date__lt=datetime.date.today()).delete_returning() TestModel.objects.filter(created_date__lt=datetime.date.today()).delete_returning()
self.assertListEqual([('delete', '1'), ('delete', '2')], self.assertListEqual([('delete', 'default.1'), ('delete', 'default.2')],
self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
def test_delete(self): def test_delete(self):
instance = TestModel.objects.get(pk=1) instance = TestModel.objects.get(pk=1)
instance.delete() instance.delete()
self.assertListEqual([('delete', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
def test_qs_delete(self): def test_qs_delete(self):
TestModel.objects.filter(pk=1).delete() TestModel.objects.filter(pk=1).delete()
self.assertListEqual([('delete', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10)) self.assertListEqual([('delete', 'default.1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))