Багфикс в версиях вставки данных в кликхаус

This commit is contained in:
M1ha 2019-01-16 17:17:48 +05:00
parent 43815723df
commit 073e002125
5 changed files with 23 additions and 23 deletions

View File

@ -48,7 +48,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
self.version_col = kwargs.pop('version_col', None)
super(CollapsingMergeTree, self).__init__(*args, **kwargs)
def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks):
def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col):
query = """
SELECT * FROM $table WHERE (`{pk_column}`, `{version_col}`) IN (
SELECT `{pk_column}`, MAX(`{version_col}`)
@ -57,19 +57,19 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
AND `{pk_column}` IN ({object_pks})
GROUP BY `{pk_column}`
)
""".format(version_col=self.version_col, date_col=self.date_col, pk_column=self.pk_column,
""".format(version_col=self.version_col, date_col=date_col, pk_column=self.pk_column,
min_date=min_date, max_date=max_date, object_pks=','.join(object_pks))
qs = connections[db_alias].select(query, model_class=model_cls)
return list(qs)
def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks):
def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col):
query = """
SELECT * FROM $table FINAL
WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
AND `{pk_column}` IN ({object_pks})
"""
query = query.format(date_col=self.date_col, pk_column=self.pk_column, min_date=min_date,
query = query.format(date_col=date_col, pk_column=self.pk_column, min_date=min_date,
max_date=max_date, object_pks=','.join(object_pks))
qs = connections[db_alias].select(query, model_class=model_cls)
return list(qs)
@ -99,7 +99,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
date_col = date_col or self.date_col
min_date, max_date = None, None
for obj in objects:
obj_date = getattr(obj, self.date_col)
obj_date = getattr(obj, date_col)
if min_date is None or min_date > obj_date:
min_date = obj_date
@ -115,9 +115,9 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
max_date = _dt_to_str(max_date)
if self.version_col:
return self._get_final_versions_by_version(db_alias, model_cls, min_date, max_date, object_pks)
return self._get_final_versions_by_version(db_alias, model_cls, min_date, max_date, object_pks, date_col)
else:
return self._get_final_versions_by_final(db_alias, model_cls, min_date, max_date, object_pks)
return self._get_final_versions_by_final(db_alias, model_cls, min_date, max_date, object_pks, date_col)
def get_insert_batch(self, model_cls, objects):
# type: (Type[T], List[DjangoModel]) -> List[T]
@ -133,13 +133,17 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
with statsd.timer(statsd_key):
old_objs = self.get_final_versions(model_cls, new_objs)
old_objs_versions = {}
for obj in old_objs:
self.set_obj_sign(obj, -1)
self.inc_obj_version(obj)
old_objs_versions[obj.id] = obj.version
for obj in new_objs:
self.set_obj_sign(obj, 1)
if self.version_col:
setattr(obj, self.version_col, old_objs_versions.get(obj.id, 0) + 1)
return old_objs + new_objs
def set_obj_sign(self, obj, sign): # type: (InfiModel, int) -> None
@ -148,12 +152,3 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
:return: None
"""
setattr(obj, self.sign_col, sign)
def inc_obj_version(self, obj): # type: (InfiModel, int) -> None
"""
Increments object version, if version column is set. By default gets attribute name from sign_col
:return: None
"""
if self.version_col:
prev_version = getattr(obj, self.version_col) or 0
setattr(obj, self.version_col, prev_version + 1)

View File

@ -25,13 +25,12 @@ class ClickHouseCollapseTestModel(ClickHouseModel):
sync_enabled = True
id = fields.Int32Field()
created_date = fields.DateField(materialized='toDate(created)')
created = fields.DateTimeField()
value = fields.Int32Field()
sign = fields.Int8Field()
version = fields.Int8Field(default=1)
engine = CollapsingMergeTree('created_date', ('id',), 'sign')
engine = CollapsingMergeTree('created', ('id',), 'sign')
class ClickHouseMultiTestModel(ClickHouseMultiModel):

View File

@ -5,7 +5,7 @@
"fields": {
"value": 100,
"created_date": "2018-01-01",
"created": "2018-02-01 00:00:00"
"created": "2018-01-01 00:00:00"
}
},
{

View File

@ -95,3 +95,12 @@ class CollapsingMergeTreeTest(TestCase):
self.assertEqual(1, len(final_versions))
self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0},
final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version')))
def test_versions(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects)
self.assertEqual(2, len(batch))
self.assertEqual(4, batch[0].version)
self.assertEqual(-1, batch[0].sign)
self.assertEqual(5, batch[1].version)
self.assertEqual(1, batch[1].sign)

View File

@ -45,7 +45,6 @@ class SyncTest(TransactionTestCase):
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
synced_data = list(ClickHouseCollapseTestModel.objects.all())
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
@ -69,7 +68,6 @@ class SyncTest(TransactionTestCase):
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
synced_data = list(ClickHouseCollapseTestModel.objects.all())
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
@ -110,7 +108,6 @@ class SyncTest(TransactionTestCase):
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
synced_data = list(ClickHouseCollapseTestModel.objects.all())
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)