1. Fixed problem when get_final_version() could return 2 rows instead of 1 and insert multiple sign < 0 rows with single sign > 0

2. Fixed problem from PR https://github.com/carrotquest/django-clickhouse/pull/37, when date_col is not set for ClickHouseModel
This commit is contained in:
M1ha 2021-10-05 16:45:41 +05:00
parent 12069db14e
commit 1ed7f5ea60
3 changed files with 96 additions and 40 deletions

View File

@ -13,7 +13,7 @@ with open('requirements.txt') as f:
setup( setup(
name='django-clickhouse', name='django-clickhouse',
version='1.1.1', version='1.1.2',
packages=['django_clickhouse', 'django_clickhouse.management.commands'], packages=['django_clickhouse', 'django_clickhouse.management.commands'],
package_dir={'': 'src'}, package_dir={'': 'src'},
url='https://github.com/carrotquest/django-clickhouse', url='https://github.com/carrotquest/django-clickhouse',

View File

@ -2,7 +2,8 @@
This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse
""" """
import datetime import datetime
from typing import List, Type, Union, Iterable, Optional import logging
from typing import List, Type, Union, Iterable, Optional, Tuple, NamedTuple
from django.db.models import Model as DjangoModel from django.db.models import Model as DjangoModel
from infi.clickhouse_orm import engines as infi_engines from infi.clickhouse_orm import engines as infi_engines
@ -14,6 +15,9 @@ from .database import connections
from .utils import format_datetime from .utils import format_datetime
logger = logging.getLogger('django-clickhouse')
class InsertOnlyEngineMixin: class InsertOnlyEngineMixin:
def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]: def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]:
""" """
@ -45,30 +49,90 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
self.version_col = kwargs.pop('version_col', None) self.version_col = kwargs.pop('version_col', None)
super(CollapsingMergeTree, self).__init__(*args, **kwargs) super(CollapsingMergeTree, self).__init__(*args, **kwargs)
def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns): def _get_final_versions_by_version(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str],
columns: str, date_range_filter: str = '') -> List[NamedTuple]:
"""
Performs request to ClickHouse in order to fetch latest version for each object pk
:param db_alias: ClickHouse database alias used
:param model_cls: Model class for which data is fetched
:param object_pks: Objects primary keys to filter by
:param columns: Columns to fetch
:param date_range_filter: Optional date_range_filter which speeds up query if date_col is set
"""
if date_range_filter:
date_range_filter = 'PREWHERE {}'.format(date_range_filter)
query = """ query = """
SELECT {columns} FROM $table WHERE (`{pk_column}`, `{version_col}`) IN ( SELECT {columns}
SELECT `{pk_column}`, MAX(`{version_col}`) FROM $table
FROM $table {date_range_filter}
PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}' WHERE `{pk_column}` IN ({object_pks})
AND `{pk_column}` IN ({object_pks}) ORDER BY `{pk_column}`, `{version_col}` DESC
GROUP BY `{pk_column}` LIMIT 1 BY `{pk_column}`
) """.format(columns=','.join(columns), version_col=self.version_col, pk_column=self.pk_column,
""".format(columns=','.join(columns), version_col=self.version_col, date_col=date_col, pk_column=self.pk_column, date_range_filter=date_range_filter, object_pks=','.join(object_pks), sign_col=self.sign_col)
min_date=min_date, max_date=max_date, object_pks=','.join(object_pks))
return connections[db_alias].select_tuples(query, model_cls) return connections[db_alias].select_tuples(query, model_cls)
def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns): def _get_final_versions_by_final(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str],
columns: str, date_range_filter: str = '') -> List[NamedTuple]:
"""
Performs request to ClickHouse in order to fetch latest version for each object pk
:param db_alias: ClickHouse database alias used
:param model_cls: Model class for which data is fetched
:param object_pks: Objects primary keys to filter by
:param columns: Columns to fetch
:param date_range_filter: Optional date_range_filter which speeds up query if date_col is set
"""
if date_range_filter:
date_range_filter += ' AND'
query = """ query = """
SELECT {columns} FROM $table FINAL SELECT {columns} FROM $table FINAL
WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}' WHERE {date_range_filter} `{pk_column}` IN ({object_pks})
AND `{pk_column}` IN ({object_pks})
""" """
query = query.format(columns=','.join(columns), date_col=date_col, pk_column=self.pk_column, min_date=min_date, query = query.format(columns=','.join(columns), pk_column=self.pk_column, date_range_filter=date_range_filter,
max_date=max_date, object_pks=','.join(object_pks)) object_pks=','.join(object_pks))
return connections[db_alias].select_tuples(query, model_cls) return connections[db_alias].select_tuples(query, model_cls)
def _get_date_rate_filter(self, objects, model_cls: Type[ClickHouseModel], db_alias: str,
date_col: Optional[str]) -> str:
"""
Generates
"""
def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
if isinstance(dt, datetime.datetime):
return format_datetime(dt, 0, db_alias=db_alias)
elif isinstance(dt, datetime.date):
return dt.isoformat()
else:
raise Exception('Invalid date or datetime object: `%s`' % dt)
date_col = date_col or self.date_col
if not date_col:
logger.warning('django-clickhouse: date_col is not provided for model %s.'
' This can cause significant performance problems while fetching data.'
' It is worth inheriting CollapsingMergeTree engine with custom get_final_versions() method,'
' based on your partition_key' % model_cls)
return ''
min_date, max_date = None, None
for obj in objects:
obj_date = getattr(obj, date_col)
if min_date is None or min_date > obj_date:
min_date = obj_date
if max_date is None or max_date < obj_date:
max_date = obj_date
min_date = _dt_to_str(min_date)
max_date = _dt_to_str(max_date)
return "`{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'".\
format(min_date=min_date, max_date=max_date, date_col=date_col)
def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel], def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel],
date_col: Optional[str] = None) -> Iterable[tuple]: date_col: Optional[str] = None) -> Iterable[tuple]:
""" """
@ -81,42 +145,21 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
:param date_col: Optional column name, where partiion date is hold. Defaults to self.date_col :param date_col: Optional column name, where partiion date is hold. Defaults to self.date_col
:return: A generator of named tuples, representing previous state :return: A generator of named tuples, representing previous state
""" """
def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
if isinstance(dt, datetime.datetime):
return format_datetime(dt, 0, db_alias=db_alias)
elif isinstance(dt, datetime.date):
return dt.isoformat()
else:
raise Exception('Invalid date or datetime object: `%s`' % dt)
if not objects: if not objects:
raise StopIteration() raise StopIteration()
date_col = date_col or self.date_col
min_date, max_date = None, None
for obj in objects:
obj_date = getattr(obj, date_col)
if min_date is None or min_date > obj_date:
min_date = obj_date
if max_date is None or max_date < obj_date:
max_date = obj_date
object_pks = [str(getattr(obj, self.pk_column)) for obj in objects] object_pks = [str(getattr(obj, self.pk_column)) for obj in objects]
db_alias = model_cls.get_database_alias() db_alias = model_cls.get_database_alias()
min_date = _dt_to_str(min_date) date_range_filter = self._get_date_rate_filter(objects, model_cls, db_alias, date_col)
max_date = _dt_to_str(max_date)
# Get fields. Sign is replaced to negative for further processing # Get fields. Sign is replaced to negative for further processing
columns = list(model_cls.fields(writable=True).keys()) columns = list(model_cls.fields(writable=True).keys())
columns.remove(self.sign_col) columns.remove(self.sign_col)
columns.append('-1 AS sign') columns.append('-1 AS sign')
params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns) params = (db_alias, model_cls, object_pks, columns, date_range_filter)
if self.version_col: if self.version_col:
return self._get_final_versions_by_version(*params) return self._get_final_versions_by_version(*params)

View File

@ -88,12 +88,25 @@ class CollapsingMergeTreeTest(TestCase):
self.objects, date_col='created') self.objects, date_col='created')
self._test_final_versions(final_versions) self._test_final_versions(final_versions)
def test_get_final_versions_by_final_no_date_col(self):
ClickHouseCollapseTestModel.engine.date_col = None
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects)
self._test_final_versions(final_versions)
def test_get_final_versions_by_version_datetime(self): def test_get_final_versions_by_version_datetime(self):
ClickHouseCollapseTestModel.engine.version_col = 'version' ClickHouseCollapseTestModel.engine.version_col = 'version'
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects, date_col='created') self.objects, date_col='created')
self._test_final_versions(final_versions) self._test_final_versions(final_versions)
def test_get_final_versions_by_version_no_date_col(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
ClickHouseCollapseTestModel.engine.date_col = None
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects)
self._test_final_versions(final_versions)
def test_versions(self): def test_versions(self):
ClickHouseCollapseTestModel.engine.version_col = 'version' ClickHouseCollapseTestModel.engine.version_col = 'version'
batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects) batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects)