diff --git a/setup.py b/setup.py index 05c1f77..261d418 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ with open('requirements.txt') as f: setup( name='django-clickhouse', - version='1.1.1', + version='1.1.2', packages=['django_clickhouse', 'django_clickhouse.management.commands'], package_dir={'': 'src'}, url='https://github.com/carrotquest/django-clickhouse', diff --git a/src/django_clickhouse/engines.py b/src/django_clickhouse/engines.py index 8d467dc..d79c993 100644 --- a/src/django_clickhouse/engines.py +++ b/src/django_clickhouse/engines.py @@ -2,7 +2,8 @@ This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse """ import datetime -from typing import List, Type, Union, Iterable, Optional +import logging +from typing import List, Type, Union, Iterable, Optional, Tuple, NamedTuple from django.db.models import Model as DjangoModel from infi.clickhouse_orm import engines as infi_engines @@ -14,6 +15,9 @@ from .database import connections from .utils import format_datetime +logger = logging.getLogger('django-clickhouse') + + class InsertOnlyEngineMixin: def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]: """ @@ -45,43 +49,64 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre self.version_col = kwargs.pop('version_col', None) super(CollapsingMergeTree, self).__init__(*args, **kwargs) - def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns): + def _get_final_versions_by_version(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str], + columns: str, date_range_filter: str = '') -> List[NamedTuple]: + """ + Performs request to ClickHouse in order to fetch latest version for each object pk + :param db_alias: ClickHouse database alias used + :param model_cls: Model class for which data is fetched + :param object_pks: Objects primary keys to filter by + :param columns: Columns to fetch + :param date_range_filter: Optional date_range_filter which speeds up query if date_col is set + :return: List of named tuples with requested columns + """ + if date_range_filter: + date_range_filter = 'PREWHERE {}'.format(date_range_filter) + query = """ - SELECT {columns} FROM $table WHERE (`{pk_column}`, `{version_col}`) IN ( - SELECT `{pk_column}`, MAX(`{version_col}`) - FROM $table - PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}' - AND `{pk_column}` IN ({object_pks}) - GROUP BY `{pk_column}` - ) - """.format(columns=','.join(columns), version_col=self.version_col, date_col=date_col, pk_column=self.pk_column, - min_date=min_date, max_date=max_date, object_pks=','.join(object_pks)) + SELECT {columns} + FROM $table + {date_range_filter} + WHERE `{pk_column}` IN ({object_pks}) + ORDER BY `{pk_column}`, `{version_col}` DESC + LIMIT 1 BY `{pk_column}` + """.format(columns=','.join(columns), version_col=self.version_col, pk_column=self.pk_column, + date_range_filter=date_range_filter, object_pks=','.join(object_pks), sign_col=self.sign_col) return connections[db_alias].select_tuples(query, model_cls) - def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns): + def _get_final_versions_by_final(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str], + columns: str, date_range_filter: str = '') -> List[NamedTuple]: + """ + Performs request to ClickHouse in order to fetch latest version for each object pk + :param db_alias: ClickHouse database alias used + :param model_cls: Model class for which data is fetched + :param object_pks: Objects primary keys to filter by + :param columns: Columns to fetch + :param date_range_filter: Optional date_range_filter which speeds up query if date_col is set + :return: List of named tuples with requested columns + """ + if date_range_filter: + date_range_filter += ' AND' + query = """ SELECT {columns} FROM $table FINAL - WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}' - AND `{pk_column}` IN ({object_pks}) + WHERE {date_range_filter} `{pk_column}` IN ({object_pks}) """ - query = query.format(columns=','.join(columns), date_col=date_col, pk_column=self.pk_column, min_date=min_date, - max_date=max_date, object_pks=','.join(object_pks)) + query = query.format(columns=','.join(columns), pk_column=self.pk_column, date_range_filter=date_range_filter, + object_pks=','.join(object_pks)) return connections[db_alias].select_tuples(query, model_cls) - def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel], - date_col: Optional[str] = None) -> Iterable[tuple]: + def _get_date_rate_filter(self, objects, model_cls: Type[ClickHouseModel], db_alias: str, + date_col: Optional[str]) -> str: """ - Get objects, that are currently stored in ClickHouse. - Depending on the partition key this can be different for different models. - In common case, this method is optimized for date field that doesn't change. - It also supposes primary key to by self.pk_column - :param model_cls: ClickHouseModel subclass to import - :param objects: Objects for which final versions are searched - :param date_col: Optional column name, where partiion date is hold. Defaults to self.date_col - :return: A generator of named tuples, representing previous state + Generates datetime filter to speed up final queries, if date_col is present + :param objects: Objects, which are inserted + :param model_cls: Model class for which data is fetched + :param db_alias: ClickHouse database alias used + :param date_col: Optional column name, where partition date is hold. Defaults to self.date_col + :return: String to add to WHERE or PREWHERE query section """ - def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str: if isinstance(dt, datetime.datetime): return format_datetime(dt, 0, db_alias=db_alias) @@ -90,10 +115,15 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre else: raise Exception('Invalid date or datetime object: `%s`' % dt) - if not objects: - raise StopIteration() - date_col = date_col or self.date_col + + if not date_col: + logger.warning('django-clickhouse: date_col is not provided for model %s.' + ' This can cause significant performance problems while fetching data.' + ' It is worth inheriting CollapsingMergeTree engine with custom get_final_versions() method,' + ' based on your partition_key' % model_cls) + return '' + min_date, max_date = None, None for obj in objects: obj_date = getattr(obj, date_col) @@ -104,19 +134,39 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre if max_date is None or max_date < obj_date: max_date = obj_date + min_date = _dt_to_str(min_date) + max_date = _dt_to_str(max_date) + + return "`{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'".\ + format(min_date=min_date, max_date=max_date, date_col=date_col) + + def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel], + date_col: Optional[str] = None) -> Iterable[tuple]: + """ + Get objects, that are currently stored in ClickHouse. + Depending on the partition key this can be different for different models. + In common case, this method is optimized for date field that doesn't change. + It also supposes primary key to by self.pk_column + :param model_cls: ClickHouseModel subclass to import + :param objects: Objects for which final versions are searched + :param date_col: Optional column name, where partition date is hold. Defaults to self.date_col + :return: A generator of named tuples, representing previous state + """ + if not objects: + raise StopIteration() + object_pks = [str(getattr(obj, self.pk_column)) for obj in objects] db_alias = model_cls.get_database_alias() - min_date = _dt_to_str(min_date) - max_date = _dt_to_str(max_date) + date_range_filter = self._get_date_rate_filter(objects, model_cls, db_alias, date_col) # Get fields. Sign is replaced to negative for further processing columns = list(model_cls.fields(writable=True).keys()) columns.remove(self.sign_col) columns.append('-1 AS sign') - params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns) + params = (db_alias, model_cls, object_pks, columns, date_range_filter) if self.version_col: return self._get_final_versions_by_version(*params) diff --git a/tests/test_engines.py b/tests/test_engines.py index 6852e8f..263ae40 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -88,12 +88,25 @@ class CollapsingMergeTreeTest(TestCase): self.objects, date_col='created') self._test_final_versions(final_versions) + def test_get_final_versions_by_final_no_date_col(self): + ClickHouseCollapseTestModel.engine.date_col = None + final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, + self.objects) + self._test_final_versions(final_versions) + def test_get_final_versions_by_version_datetime(self): ClickHouseCollapseTestModel.engine.version_col = 'version' final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, self.objects, date_col='created') self._test_final_versions(final_versions) + def test_get_final_versions_by_version_no_date_col(self): + ClickHouseCollapseTestModel.engine.version_col = 'version' + ClickHouseCollapseTestModel.engine.date_col = None + final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, + self.objects) + self._test_final_versions(final_versions) + def test_versions(self): ClickHouseCollapseTestModel.engine.version_col = 'version' batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects)