Added custom partitioning to all MergeTree family engines

This commit is contained in:
M1ha 2018-04-24 15:19:05 +05:00
parent e4f0c5b1dd
commit eb15dd65ec
2 changed files with 33 additions and 12 deletions

View File

@ -115,9 +115,10 @@ class MergeTree(Engine):
class CollapsingMergeTree(MergeTree):
def __init__(self, date_col, order_by, sign_col, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None):
super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
def __init__(self, date_col=None, order_by=(), sign_col='sign', sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None):
super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity,
replica_table_path, replica_name, partition_key)
self.sign_col = sign_col
def _build_sql_params(self, db):
@ -128,9 +129,10 @@ class CollapsingMergeTree(MergeTree):
class SummingMergeTree(MergeTree):
def __init__(self, date_col, order_by, summing_cols=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None):
super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
def __init__(self, date_col=None, order_by=(), summing_cols=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None):
super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path,
replica_name, partition_key)
assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple'
self.summing_cols = summing_cols
@ -143,9 +145,10 @@ class SummingMergeTree(MergeTree):
class ReplacingMergeTree(MergeTree):
def __init__(self, date_col, order_by, ver_col=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None):
super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
def __init__(self, date_col=None, order_by=(), ver_col=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None):
super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity,
replica_table_path, replica_name, partition_key)
self.ver_col = ver_col
def _build_sql_params(self, db):
@ -163,7 +166,8 @@ class Buffer(Engine):
"""
#Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000):
def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000,
min_bytes=10000000, max_bytes=100000000):
self.main_model = main_model
self.num_layers = num_layers
self.min_time = min_time

View File

@ -137,10 +137,27 @@ class EnginesTestCase(_EnginesHelperTestCase):
order_by=('date', 'event_id', 'event_group'),
partition_key=('toYYYYMM(date)', 'event_group')
)
class TestCollapseModel(SampleModel):
sign = Int8Field()
engine = CollapsingMergeTree(
sign_col='sign',
order_by=('date', 'event_id', 'event_group'),
partition_key=('toYYYYMM(date)', 'event_group')
)
self._create_and_insert(TestModel)
parts = list(SystemPart.get(self.database))
self.assertEqual(1, len(parts))
self._create_and_insert(TestCollapseModel)
# Result order may be different, lets sort manually
parts = sorted(list(SystemPart.get(self.database)), key=lambda x: x.table)
self.assertEqual(2, len(parts))
self.assertEqual('testcollapsemodel', parts[0].table)
self.assertEqual('(201701, 13)', parts[0].partition)
self.assertEqual('testmodel', parts[1].table)
self.assertEqual('(201701, 13)', parts[1].partition)
class SampleModel(Model):