From 20eecf0fd425ddabb55243130d5bd92a1f1a7a7e Mon Sep 17 00:00:00 2001 From: Beda Kosata Date: Mon, 4 Oct 2021 13:51:32 +0200 Subject: [PATCH] fix synchronization without a date_col --- src/django_clickhouse/engines.py | 55 +++++++++++++++++++------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/src/django_clickhouse/engines.py b/src/django_clickhouse/engines.py index 8d467dc..01c7148 100644 --- a/src/django_clickhouse/engines.py +++ b/src/django_clickhouse/engines.py @@ -69,6 +69,15 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre max_date=max_date, object_pks=','.join(object_pks)) return connections[db_alias].select_tuples(query, model_cls) + def _get_final_versions_no_date(self, db_alias, model_cls, object_pks, columns): + query = """ + SELECT {columns} FROM $table FINAL + WHERE `{pk_column}` IN ({object_pks}) + """ + query = query.format(columns=','.join(columns), pk_column=self.pk_column, + object_pks=','.join(object_pks)) + return connections[db_alias].select_tuples(query, model_cls) + def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel], date_col: Optional[str] = None) -> Iterable[tuple]: """ @@ -93,35 +102,37 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre if not objects: raise StopIteration() - date_col = date_col or self.date_col - min_date, max_date = None, None - for obj in objects: - obj_date = getattr(obj, date_col) - - if min_date is None or min_date > obj_date: - min_date = obj_date - - if max_date is None or max_date < obj_date: - max_date = obj_date - - object_pks = [str(getattr(obj, self.pk_column)) for obj in objects] - - db_alias = model_cls.get_database_alias() - - min_date = _dt_to_str(min_date) - max_date = _dt_to_str(max_date) - # Get fields. Sign is replaced to negative for further processing columns = list(model_cls.fields(writable=True).keys()) columns.remove(self.sign_col) columns.append('-1 AS sign') - params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns) + object_pks = [str(getattr(obj, self.pk_column)) for obj in objects] + db_alias = model_cls.get_database_alias() - if self.version_col: - return self._get_final_versions_by_version(*params) + date_col = date_col or self.date_col + if date_col: + min_date, max_date = None, None + for obj in objects: + obj_date = getattr(obj, date_col) + + if min_date is None or min_date > obj_date: + min_date = obj_date + + if max_date is None or max_date < obj_date: + max_date = obj_date + + min_date = _dt_to_str(min_date) + max_date = _dt_to_str(max_date) + + params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns) + + if self.version_col: + return self._get_final_versions_by_version(*params) + else: + return self._get_final_versions_by_final(*params) else: - return self._get_final_versions_by_final(*params) + return self._get_final_versions_no_date(db_alias, model_cls, object_pks, columns) def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]: """