mirror of
https://github.com/carrotquest/django-clickhouse.git
synced 2024-11-24 10:03:46 +03:00
get_final_versions fixes (#38)
1. Fixed problem when get_final_version() could return 2 rows instead of 1 and insert multiple sign < 0 rows with single sign > 0 2. Fixed problem from PR https://github.com/carrotquest/django-clickhouse/pull/37, when date_col is not set for ClickHouseModel
This commit is contained in:
parent
12069db14e
commit
0e65f15333
2
setup.py
2
setup.py
|
@ -13,7 +13,7 @@ with open('requirements.txt') as f:
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name='django-clickhouse',
|
name='django-clickhouse',
|
||||||
version='1.1.1',
|
version='1.1.2',
|
||||||
packages=['django_clickhouse', 'django_clickhouse.management.commands'],
|
packages=['django_clickhouse', 'django_clickhouse.management.commands'],
|
||||||
package_dir={'': 'src'},
|
package_dir={'': 'src'},
|
||||||
url='https://github.com/carrotquest/django-clickhouse',
|
url='https://github.com/carrotquest/django-clickhouse',
|
||||||
|
|
|
@ -2,7 +2,8 @@
|
||||||
This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse
|
This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse
|
||||||
"""
|
"""
|
||||||
import datetime
|
import datetime
|
||||||
from typing import List, Type, Union, Iterable, Optional
|
import logging
|
||||||
|
from typing import List, Type, Union, Iterable, Optional, Tuple, NamedTuple
|
||||||
|
|
||||||
from django.db.models import Model as DjangoModel
|
from django.db.models import Model as DjangoModel
|
||||||
from infi.clickhouse_orm import engines as infi_engines
|
from infi.clickhouse_orm import engines as infi_engines
|
||||||
|
@ -14,6 +15,9 @@ from .database import connections
|
||||||
from .utils import format_datetime
|
from .utils import format_datetime
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger('django-clickhouse')
|
||||||
|
|
||||||
|
|
||||||
class InsertOnlyEngineMixin:
|
class InsertOnlyEngineMixin:
|
||||||
def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]:
|
def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]:
|
||||||
"""
|
"""
|
||||||
|
@ -45,43 +49,64 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
|
||||||
self.version_col = kwargs.pop('version_col', None)
|
self.version_col = kwargs.pop('version_col', None)
|
||||||
super(CollapsingMergeTree, self).__init__(*args, **kwargs)
|
super(CollapsingMergeTree, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
|
def _get_final_versions_by_version(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str],
|
||||||
|
columns: str, date_range_filter: str = '') -> List[NamedTuple]:
|
||||||
|
"""
|
||||||
|
Performs request to ClickHouse in order to fetch latest version for each object pk
|
||||||
|
:param db_alias: ClickHouse database alias used
|
||||||
|
:param model_cls: Model class for which data is fetched
|
||||||
|
:param object_pks: Objects primary keys to filter by
|
||||||
|
:param columns: Columns to fetch
|
||||||
|
:param date_range_filter: Optional date_range_filter which speeds up query if date_col is set
|
||||||
|
:return: List of named tuples with requested columns
|
||||||
|
"""
|
||||||
|
if date_range_filter:
|
||||||
|
date_range_filter = 'PREWHERE {}'.format(date_range_filter)
|
||||||
|
|
||||||
query = """
|
query = """
|
||||||
SELECT {columns} FROM $table WHERE (`{pk_column}`, `{version_col}`) IN (
|
SELECT {columns}
|
||||||
SELECT `{pk_column}`, MAX(`{version_col}`)
|
FROM $table
|
||||||
FROM $table
|
{date_range_filter}
|
||||||
PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
|
WHERE `{pk_column}` IN ({object_pks})
|
||||||
AND `{pk_column}` IN ({object_pks})
|
ORDER BY `{pk_column}`, `{version_col}` DESC
|
||||||
GROUP BY `{pk_column}`
|
LIMIT 1 BY `{pk_column}`
|
||||||
)
|
""".format(columns=','.join(columns), version_col=self.version_col, pk_column=self.pk_column,
|
||||||
""".format(columns=','.join(columns), version_col=self.version_col, date_col=date_col, pk_column=self.pk_column,
|
date_range_filter=date_range_filter, object_pks=','.join(object_pks), sign_col=self.sign_col)
|
||||||
min_date=min_date, max_date=max_date, object_pks=','.join(object_pks))
|
|
||||||
|
|
||||||
return connections[db_alias].select_tuples(query, model_cls)
|
return connections[db_alias].select_tuples(query, model_cls)
|
||||||
|
|
||||||
def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
|
def _get_final_versions_by_final(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str],
|
||||||
|
columns: str, date_range_filter: str = '') -> List[NamedTuple]:
|
||||||
|
"""
|
||||||
|
Performs request to ClickHouse in order to fetch latest version for each object pk
|
||||||
|
:param db_alias: ClickHouse database alias used
|
||||||
|
:param model_cls: Model class for which data is fetched
|
||||||
|
:param object_pks: Objects primary keys to filter by
|
||||||
|
:param columns: Columns to fetch
|
||||||
|
:param date_range_filter: Optional date_range_filter which speeds up query if date_col is set
|
||||||
|
:return: List of named tuples with requested columns
|
||||||
|
"""
|
||||||
|
if date_range_filter:
|
||||||
|
date_range_filter += ' AND'
|
||||||
|
|
||||||
query = """
|
query = """
|
||||||
SELECT {columns} FROM $table FINAL
|
SELECT {columns} FROM $table FINAL
|
||||||
WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
|
WHERE {date_range_filter} `{pk_column}` IN ({object_pks})
|
||||||
AND `{pk_column}` IN ({object_pks})
|
|
||||||
"""
|
"""
|
||||||
query = query.format(columns=','.join(columns), date_col=date_col, pk_column=self.pk_column, min_date=min_date,
|
query = query.format(columns=','.join(columns), pk_column=self.pk_column, date_range_filter=date_range_filter,
|
||||||
max_date=max_date, object_pks=','.join(object_pks))
|
object_pks=','.join(object_pks))
|
||||||
return connections[db_alias].select_tuples(query, model_cls)
|
return connections[db_alias].select_tuples(query, model_cls)
|
||||||
|
|
||||||
def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel],
|
def _get_date_rate_filter(self, objects, model_cls: Type[ClickHouseModel], db_alias: str,
|
||||||
date_col: Optional[str] = None) -> Iterable[tuple]:
|
date_col: Optional[str]) -> str:
|
||||||
"""
|
"""
|
||||||
Get objects, that are currently stored in ClickHouse.
|
Generates datetime filter to speed up final queries, if date_col is present
|
||||||
Depending on the partition key this can be different for different models.
|
:param objects: Objects, which are inserted
|
||||||
In common case, this method is optimized for date field that doesn't change.
|
:param model_cls: Model class for which data is fetched
|
||||||
It also supposes primary key to by self.pk_column
|
:param db_alias: ClickHouse database alias used
|
||||||
:param model_cls: ClickHouseModel subclass to import
|
:param date_col: Optional column name, where partition date is hold. Defaults to self.date_col
|
||||||
:param objects: Objects for which final versions are searched
|
:return: String to add to WHERE or PREWHERE query section
|
||||||
:param date_col: Optional column name, where partiion date is hold. Defaults to self.date_col
|
|
||||||
:return: A generator of named tuples, representing previous state
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
|
def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
|
||||||
if isinstance(dt, datetime.datetime):
|
if isinstance(dt, datetime.datetime):
|
||||||
return format_datetime(dt, 0, db_alias=db_alias)
|
return format_datetime(dt, 0, db_alias=db_alias)
|
||||||
|
@ -90,10 +115,15 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
|
||||||
else:
|
else:
|
||||||
raise Exception('Invalid date or datetime object: `%s`' % dt)
|
raise Exception('Invalid date or datetime object: `%s`' % dt)
|
||||||
|
|
||||||
if not objects:
|
|
||||||
raise StopIteration()
|
|
||||||
|
|
||||||
date_col = date_col or self.date_col
|
date_col = date_col or self.date_col
|
||||||
|
|
||||||
|
if not date_col:
|
||||||
|
logger.warning('django-clickhouse: date_col is not provided for model %s.'
|
||||||
|
' This can cause significant performance problems while fetching data.'
|
||||||
|
' It is worth inheriting CollapsingMergeTree engine with custom get_final_versions() method,'
|
||||||
|
' based on your partition_key' % model_cls)
|
||||||
|
return ''
|
||||||
|
|
||||||
min_date, max_date = None, None
|
min_date, max_date = None, None
|
||||||
for obj in objects:
|
for obj in objects:
|
||||||
obj_date = getattr(obj, date_col)
|
obj_date = getattr(obj, date_col)
|
||||||
|
@ -104,19 +134,39 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
|
||||||
if max_date is None or max_date < obj_date:
|
if max_date is None or max_date < obj_date:
|
||||||
max_date = obj_date
|
max_date = obj_date
|
||||||
|
|
||||||
|
min_date = _dt_to_str(min_date)
|
||||||
|
max_date = _dt_to_str(max_date)
|
||||||
|
|
||||||
|
return "`{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'".\
|
||||||
|
format(min_date=min_date, max_date=max_date, date_col=date_col)
|
||||||
|
|
||||||
|
def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel],
|
||||||
|
date_col: Optional[str] = None) -> Iterable[tuple]:
|
||||||
|
"""
|
||||||
|
Get objects, that are currently stored in ClickHouse.
|
||||||
|
Depending on the partition key this can be different for different models.
|
||||||
|
In common case, this method is optimized for date field that doesn't change.
|
||||||
|
It also supposes primary key to by self.pk_column
|
||||||
|
:param model_cls: ClickHouseModel subclass to import
|
||||||
|
:param objects: Objects for which final versions are searched
|
||||||
|
:param date_col: Optional column name, where partition date is hold. Defaults to self.date_col
|
||||||
|
:return: A generator of named tuples, representing previous state
|
||||||
|
"""
|
||||||
|
if not objects:
|
||||||
|
raise StopIteration()
|
||||||
|
|
||||||
object_pks = [str(getattr(obj, self.pk_column)) for obj in objects]
|
object_pks = [str(getattr(obj, self.pk_column)) for obj in objects]
|
||||||
|
|
||||||
db_alias = model_cls.get_database_alias()
|
db_alias = model_cls.get_database_alias()
|
||||||
|
|
||||||
min_date = _dt_to_str(min_date)
|
date_range_filter = self._get_date_rate_filter(objects, model_cls, db_alias, date_col)
|
||||||
max_date = _dt_to_str(max_date)
|
|
||||||
|
|
||||||
# Get fields. Sign is replaced to negative for further processing
|
# Get fields. Sign is replaced to negative for further processing
|
||||||
columns = list(model_cls.fields(writable=True).keys())
|
columns = list(model_cls.fields(writable=True).keys())
|
||||||
columns.remove(self.sign_col)
|
columns.remove(self.sign_col)
|
||||||
columns.append('-1 AS sign')
|
columns.append('-1 AS sign')
|
||||||
|
|
||||||
params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns)
|
params = (db_alias, model_cls, object_pks, columns, date_range_filter)
|
||||||
|
|
||||||
if self.version_col:
|
if self.version_col:
|
||||||
return self._get_final_versions_by_version(*params)
|
return self._get_final_versions_by_version(*params)
|
||||||
|
|
|
@ -88,12 +88,25 @@ class CollapsingMergeTreeTest(TestCase):
|
||||||
self.objects, date_col='created')
|
self.objects, date_col='created')
|
||||||
self._test_final_versions(final_versions)
|
self._test_final_versions(final_versions)
|
||||||
|
|
||||||
|
def test_get_final_versions_by_final_no_date_col(self):
|
||||||
|
ClickHouseCollapseTestModel.engine.date_col = None
|
||||||
|
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
|
||||||
|
self.objects)
|
||||||
|
self._test_final_versions(final_versions)
|
||||||
|
|
||||||
def test_get_final_versions_by_version_datetime(self):
|
def test_get_final_versions_by_version_datetime(self):
|
||||||
ClickHouseCollapseTestModel.engine.version_col = 'version'
|
ClickHouseCollapseTestModel.engine.version_col = 'version'
|
||||||
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
|
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
|
||||||
self.objects, date_col='created')
|
self.objects, date_col='created')
|
||||||
self._test_final_versions(final_versions)
|
self._test_final_versions(final_versions)
|
||||||
|
|
||||||
|
def test_get_final_versions_by_version_no_date_col(self):
|
||||||
|
ClickHouseCollapseTestModel.engine.version_col = 'version'
|
||||||
|
ClickHouseCollapseTestModel.engine.date_col = None
|
||||||
|
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
|
||||||
|
self.objects)
|
||||||
|
self._test_final_versions(final_versions)
|
||||||
|
|
||||||
def test_versions(self):
|
def test_versions(self):
|
||||||
ClickHouseCollapseTestModel.engine.version_col = 'version'
|
ClickHouseCollapseTestModel.engine.version_col = 'version'
|
||||||
batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects)
|
batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user