From eb15dd65ec4d6d9b85adaa4b632d03e2b55c9e9f Mon Sep 17 00:00:00 2001 From: M1ha Date: Tue, 24 Apr 2018 15:19:05 +0500 Subject: [PATCH] Added custom partitioning to all MergeTree family engines --- src/infi/clickhouse_orm/engines.py | 24 ++++++++++++++---------- tests/test_engines.py | 21 +++++++++++++++++++-- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index b1aa8f7..1acd73d 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -115,9 +115,10 @@ class MergeTree(Engine): class CollapsingMergeTree(MergeTree): - def __init__(self, date_col, order_by, sign_col, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None): - super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) + def __init__(self, date_col=None, order_by=(), sign_col='sign', sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, + replica_table_path, replica_name, partition_key) self.sign_col = sign_col def _build_sql_params(self, db): @@ -128,9 +129,10 @@ class CollapsingMergeTree(MergeTree): class SummingMergeTree(MergeTree): - def __init__(self, date_col, order_by, summing_cols=None, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None): - super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) + def __init__(self, date_col=None, order_by=(), summing_cols=None, sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, + replica_name, 
partition_key) assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple' self.summing_cols = summing_cols @@ -143,9 +145,10 @@ class SummingMergeTree(MergeTree): class ReplacingMergeTree(MergeTree): - def __init__(self, date_col, order_by, ver_col=None, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None): - super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) + def __init__(self, date_col=None, order_by=(), ver_col=None, sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, + replica_table_path, replica_name, partition_key) self.ver_col = ver_col def _build_sql_params(self, db): @@ -163,7 +166,8 @@ class Buffer(Engine): """ #Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) - def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, + min_bytes=10000000, max_bytes=100000000): self.main_model = main_model self.num_layers = num_layers self.min_time = min_time diff --git a/tests/test_engines.py b/tests/test_engines.py index 7bc4e26..88266cf 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -137,10 +137,27 @@ class EnginesTestCase(_EnginesHelperTestCase): order_by=('date', 'event_id', 'event_group'), partition_key=('toYYYYMM(date)', 'event_group') ) + + class TestCollapseModel(SampleModel): + sign = Int8Field() + + engine = CollapsingMergeTree( + sign_col='sign', + order_by=('date', 'event_id', 'event_group'), + partition_key=('toYYYYMM(date)', 'event_group') + ) + self._create_and_insert(TestModel) - parts 
= list(SystemPart.get(self.database)) - self.assertEqual(1, len(parts)) + self._create_and_insert(TestCollapseModel) + + # Result order may differ, so let's sort manually + parts = sorted(list(SystemPart.get(self.database)), key=lambda x: x.table) + + self.assertEqual(2, len(parts)) + self.assertEqual('testcollapsemodel', parts[0].table) self.assertEqual('(201701, 13)', parts[0].partition) + self.assertEqual('testmodel', parts[1].table) + self.assertEqual('(201701, 13)', parts[1].partition) class SampleModel(Model):