fix synchronization without a date_col

This commit is contained in:
Beda Kosata 2021-10-04 13:51:32 +02:00
parent 12069db14e
commit 20eecf0fd4

View File

@ -69,6 +69,15 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
max_date=max_date, object_pks=','.join(object_pks))
return connections[db_alias].select_tuples(query, model_cls)
def _get_final_versions_no_date(self, db_alias, model_cls, object_pks, columns):
query = """
SELECT {columns} FROM $table FINAL
WHERE `{pk_column}` IN ({object_pks})
"""
query = query.format(columns=','.join(columns), pk_column=self.pk_column,
object_pks=','.join(object_pks))
return connections[db_alias].select_tuples(query, model_cls)
def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel],
date_col: Optional[str] = None) -> Iterable[tuple]:
"""
@ -93,35 +102,37 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
if not objects:
raise StopIteration()
date_col = date_col or self.date_col
min_date, max_date = None, None
for obj in objects:
obj_date = getattr(obj, date_col)
if min_date is None or min_date > obj_date:
min_date = obj_date
if max_date is None or max_date < obj_date:
max_date = obj_date
object_pks = [str(getattr(obj, self.pk_column)) for obj in objects]
db_alias = model_cls.get_database_alias()
min_date = _dt_to_str(min_date)
max_date = _dt_to_str(max_date)
# Get fields. Sign is replaced to negative for further processing
columns = list(model_cls.fields(writable=True).keys())
columns.remove(self.sign_col)
columns.append('-1 AS sign')
params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns)
object_pks = [str(getattr(obj, self.pk_column)) for obj in objects]
db_alias = model_cls.get_database_alias()
if self.version_col:
return self._get_final_versions_by_version(*params)
date_col = date_col or self.date_col
if date_col:
min_date, max_date = None, None
for obj in objects:
obj_date = getattr(obj, date_col)
if min_date is None or min_date > obj_date:
min_date = obj_date
if max_date is None or max_date < obj_date:
max_date = obj_date
min_date = _dt_to_str(min_date)
max_date = _dt_to_str(max_date)
params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns)
if self.version_col:
return self._get_final_versions_by_version(*params)
else:
return self._get_final_versions_by_final(*params)
else:
return self._get_final_versions_by_final(*params)
return self._get_final_versions_no_date(db_alias, model_cls, object_pks, columns)
def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]:
"""