Added the ability to fetch the final versions of rows using version_col

This commit is contained in:
M1ha 2019-01-11 13:00:42 +05:00
parent ec2f71fc78
commit 1f3dcedf55
3 changed files with 77 additions and 9 deletions

View File

@ -41,12 +41,44 @@ class SummingMergeTree(InsertOnlyEngineMixin, infi_engines.SummingMergeTree):
class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree): class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
pk_column = 'id'
def __init__(self, *args, **kwargs):
    """
    Creates the engine, accepting an optional version_col keyword argument
    :param version_col: Optional version column name, stored as self.version_col (None if not given)
    All other positional and keyword arguments are passed to the parent constructor unchanged
    """
    # Popped here, so that version_col is not forwarded to the parent constructor
    version_col = kwargs.pop('version_col', None)
    self.version_col = version_col

    super(CollapsingMergeTree, self).__init__(*args, **kwargs)
def _get_final_versions_by_version(self, model_cls, min_date, max_date, object_pks):
    """
    Selects rows whose (pk_column, version_col) pair is the maximum version found for that primary key.
    The date range and the primary key filter are applied inside the subquery, which searches max versions.
    :param model_cls: Model class. Its get_database() is queried and it is passed as model_class of the result
    :param min_date: Inclusive lower bound for date_col. Must provide isoformat()
    :param max_date: Inclusive upper bound for date_col. Must provide isoformat()
    :param object_pks: Primary key values, already converted to strings
    :return: A list of selected rows. Empty list, if no primary keys are given
    """
    if not object_pks:
        # Nothing to search for. This also avoids sending a query with an empty IN () list
        return []

    query = """
        SELECT * FROM $table WHERE (`{pk_column}`, `{version_col}`) IN (
            SELECT `{pk_column}`, MAX(`{version_col}`)
            FROM $table
            PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
                AND `{pk_column}` IN ({object_pks})
            GROUP BY `{pk_column}`
        )
    """.format(version_col=self.version_col, date_col=self.date_col, pk_column=self.pk_column,
               min_date=min_date.isoformat(), max_date=max_date.isoformat(), object_pks=','.join(object_pks))

    qs = model_cls.get_database().select(query, model_class=model_cls)
    return list(qs)
def _get_final_versions_by_final(self, model_cls, min_date, max_date, object_pks):
    """
    Selects rows for the given primary keys and date range, using the FINAL modifier.
    :param model_cls: Model class. Its get_database() is queried and it is passed as model_class of the result
    :param min_date: Inclusive lower bound for date_col. Must provide isoformat()
    :param max_date: Inclusive upper bound for date_col. Must provide isoformat()
    :param object_pks: Primary key values, already converted to strings
    :return: A list of selected rows. Empty list, if no primary keys are given
    """
    if not object_pks:
        # Nothing to search for. This also avoids sending a query with an empty IN () list
        return []

    query = """
        SELECT * FROM $table FINAL
        WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
            AND `{pk_column}` IN ({object_pks})
    """
    query = query.format(date_col=self.date_col, pk_column=self.pk_column, min_date=min_date.isoformat(),
                         max_date=max_date.isoformat(), object_pks=','.join(object_pks))

    qs = model_cls.get_database().select(query, model_class=model_cls)
    return list(qs)
def get_final_versions(self, model_cls, objects): def get_final_versions(self, model_cls, objects):
""" """
Get objects, that are currently stored in ClickHouse. Get objects, that are currently stored in ClickHouse.
Depending on the partition key this can be different for different models. Depending on the partition key this can be different for different models.
In common case, this method is optimized for date field that doesn't change. In common case, this method is optimized for date field that doesn't change.
It also supposes primary key to by id It also supposes primary key to by self.pk_column
:param model_cls: ClickHouseModel subclass to import :param model_cls: ClickHouseModel subclass to import
:param objects: Objects for which final versions are searched :param objects: Objects for which final versions are searched
:return: A list of model objects :return: A list of model objects
@ -64,14 +96,12 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
if max_date is None or max_date < obj_date: if max_date is None or max_date < obj_date:
max_date = obj_date max_date = obj_date
obj_ids = [str(obj.id) for obj in objects] object_pks = [str(getattr(obj, self.pk_column)) for obj in objects]
query = "SELECT * FROM $table FINAL WHERE `%s` >= '%s' AND `%s` <= '%s' AND id IN (%s)" \
% (self.date_col, min_date.isoformat(), self.date_col, max_date.isoformat(), ', '.join(obj_ids))
db_router = lazy_class_import(config.DATABASE_ROUTER)() if self.version_col:
db = db_router.db_for_read(model_cls) return self._get_final_versions_by_version(model_cls, min_date, max_date, object_pks)
qs = connections[db].select(query, model_class=model_cls) else:
return list(qs) return self._get_final_versions_by_final(model_cls, min_date, max_date, object_pks)
def get_insert_batch(self, model_cls, objects): def get_insert_batch(self, model_cls, objects):
# type: (Type[T], List[DjangoModel]) -> List[T] # type: (Type[T], List[DjangoModel]) -> List[T]
@ -89,6 +119,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
for obj in old_objs: for obj in old_objs:
self.set_obj_sign(obj, -1) self.set_obj_sign(obj, -1)
self.inc_obj_version(obj)
for obj in new_objs: for obj in new_objs:
self.set_obj_sign(obj, 1) self.set_obj_sign(obj, 1)
@ -101,3 +132,12 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
:return: None :return: None
""" """
setattr(obj, self.sign_col, sign) setattr(obj, self.sign_col, sign)
def inc_obj_version(self, obj):
    # type: (InfiModel) -> None
    """
    Increments object version in place, if version column is set. Attribute name is taken from version_col.
    An empty version (None or 0) is treated as 0, so the object gets version 1.
    Does nothing, if version_col is not configured.
    :param obj: Object to increment version for
    :return: None
    """
    if self.version_col:
        prev_version = getattr(obj, self.version_col) or 0
        setattr(obj, self.version_col, prev_version + 1)

View File

@ -28,6 +28,7 @@ class ClickHouseCollapseTestModel(ClickHouseModel):
created_date = fields.DateField() created_date = fields.DateField()
value = fields.Int32Field() value = fields.Int32Field()
sign = fields.Int8Field() sign = fields.Int8Field()
version = fields.Int8Field(default=1)
engine = CollapsingMergeTree('created_date', ('id',), 'sign') engine = CollapsingMergeTree('created_date', ('id',), 'sign')

View File

@ -37,7 +37,7 @@ class SyncTest(TransactionTestCase):
self.assertEqual(obj.value, synced_data[0].value) self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id) self.assertEqual(obj.id, synced_data[0].id)
def test_collapsing_update(self): def test_collapsing_update_by_final(self):
obj = TestModel.objects.create(value=1, created_date=datetime.date.today()) obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
obj.value = 2 obj.value = 2
obj.save() obj.save()
@ -60,6 +60,33 @@ class SyncTest(TransactionTestCase):
self.assertEqual(obj.value, synced_data[0].value) self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id) self.assertEqual(obj.id, synced_data[0].id)
def test_collapsing_update_by_version(self):
    """
    Checks that updates are synced correctly when the engine has version_col set
    """
    engine = ClickHouseCollapseTestModel.engine
    # The engine instance is shared through the model class, so version_col must be restored
    # even if an assertion below fails. Otherwise other tests would run with it still set.
    self.addCleanup(setattr, engine, 'version_col', engine.version_col)
    engine.version_col = 'version'

    obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
    obj.value = 2
    obj.save()
    ClickHouseCollapseTestModel.sync_batch_from_storage()

    # NOTE(review): with version_col set, get_final_versions takes the MAX(version) query instead of FINAL.
    # A single row is still expected after the first sync.
    synced_data = list(ClickHouseCollapseTestModel.objects.all())
    self.assertEqual(1, len(synced_data))
    self.assertEqual(obj.created_date, synced_data[0].created_date)
    self.assertEqual(obj.value, synced_data[0].value)
    self.assertEqual(obj.id, synced_data[0].id)

    obj.value = 3
    obj.save()
    ClickHouseCollapseTestModel.sync_batch_from_storage()

    # FINAL is used here to read the collapsed state of the table
    synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
    # Exactly one row must remain: an empty result would otherwise surface as IndexError below
    self.assertEqual(1, len(synced_data))
    self.assertEqual(obj.created_date, synced_data[0].created_date)
    self.assertEqual(obj.value, synced_data[0].value)
    self.assertEqual(obj.id, synced_data[0].id)
@expectedFailure @expectedFailure
def test_collapsing_delete(self): def test_collapsing_delete(self):
obj = TestModel.objects.create(value=1, created_date=datetime.date.today()) obj = TestModel.objects.create(value=1, created_date=datetime.date.today())