diff --git a/docs/class_reference.md b/docs/class_reference.md index e98e33b..d2e600e 100644 --- a/docs/class_reference.md +++ b/docs/class_reference.md @@ -737,7 +737,7 @@ Extends Engine Extends Engine -#### MergeTree(date_col=None, order_by=(), sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) +#### MergeTree(date_col=None, order_by=(), sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, primary_key=None) ### Buffer @@ -793,21 +793,21 @@ straightly into Distributed table, optional Extends MergeTree -#### CollapsingMergeTree(date_col=None, order_by=(), sign_col="sign", sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) +#### CollapsingMergeTree(date_col=None, order_by=(), sign_col="sign", sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, primary_key=None) ### SummingMergeTree Extends MergeTree -#### SummingMergeTree(date_col=None, order_by=(), summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) +#### SummingMergeTree(date_col=None, order_by=(), summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, primary_key=None) ### ReplacingMergeTree Extends MergeTree -#### ReplacingMergeTree(date_col=None, order_by=(), ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) +#### ReplacingMergeTree(date_col=None, order_by=(), ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, primary_key=None) infi.clickhouse_orm.query diff --git a/docs/table_engines.md b/docs/table_engines.md index d4ba905..b71e383 100644 --- a/docs/table_engines.md +++ b/docs/table_engines.md @@ -73,6 +73,19 @@ Example: partition_key=('toYYYYMM(EventDate)', 'BannerID')) +### Primary key +ClickHouse supports [custom primary key](https://clickhouse.yandex/docs/en/operations/table_engines/mergetree/#primary-keys-and-indexes-in-queries/) expressions since version 1.1.54310 + +You can use custom primary key with any `MergeTree` family engine. +To set custom partitioning add `primary_key` parameter. It should be a tuple of expressions, by which partitions are built. + +By default primary key is equal to order_by expression + +Example: + + engine = engines.ReplacingMergeTree(order_by=('OrderID', 'EventDate', 'BannerID'), ver_col='Version', + partition_key=('toYYYYMM(EventDate)', 'BannerID'), primary_key=('OrderID',)) + ### Data Replication Any of the above engines can be converted to a replicated engine (e.g. `ReplicatedMergeTree`) by adding two parameters, `replica_table_path` and `replica_name`: diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index e38bfcf..3fb8ed6 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -35,8 +35,10 @@ class Memory(Engine): class MergeTree(Engine): def __init__(self, date_col=None, order_by=(), sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, + primary_key=None): assert type(order_by) in (list, tuple), 'order_by must be a list or tuple' + assert primary_key is None or type(primary_key) in (list, tuple), 'primary_key must be a list or tuple' assert date_col is None or isinstance(date_col, six.string_types), 'date_col must be string if present' assert partition_key is None or type(partition_key) in (list, tuple),\ 'partition_key must be tuple or list if present' @@ -48,6 +50,7 @@ class MergeTree(Engine): assert date_col or partition_key, "You must set either date_col or partition_key" self.date_col = date_col self.partition_key = partition_key if partition_key else ('toYYYYMM(`%s`)' % date_col,) + self.primary_key = primary_key self.order_by = order_by self.sampling_expr = sampling_expr @@ -78,6 +81,9 @@ class MergeTree(Engine): partition_sql = "PARTITION BY %s ORDER BY %s" \ % ('(%s)' % comma_join(self.partition_key), '(%s)' % comma_join(self.order_by)) + if self.primary_key: + partition_sql += " PRIMARY KEY (%s)" % comma_join(self.primary_key) + if self.sampling_expr: partition_sql += " SAMPLE BY %s" % self.sampling_expr @@ -117,9 +123,10 @@ class MergeTree(Engine): class CollapsingMergeTree(MergeTree): def __init__(self, date_col=None, order_by=(), sign_col='sign', sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, + primary_key=None): super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, - replica_table_path, replica_name, partition_key) + replica_table_path, replica_name, partition_key, primary_key) self.sign_col = sign_col def _build_sql_params(self, db): @@ -131,9 +138,10 @@ class CollapsingMergeTree(MergeTree): class SummingMergeTree(MergeTree): def __init__(self, date_col=None, order_by=(), summing_cols=None, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, + primary_key=None): super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, - replica_name, partition_key) + replica_name, partition_key, primary_key) assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple' self.summing_cols = summing_cols @@ -147,9 +155,10 @@ class SummingMergeTree(MergeTree): class ReplacingMergeTree(MergeTree): def __init__(self, date_col=None, order_by=(), ver_col=None, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, + primary_key=None): super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, - replica_table_path, replica_name, partition_key) + replica_table_path, replica_name, partition_key, primary_key) self.ver_col = ver_col def _build_sql_params(self, db): diff --git a/tests/test_engines.py b/tests/test_engines.py index a05d359..2d874a9 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -165,6 +165,29 @@ class EnginesTestCase(_EnginesHelperTestCase): self.assertEqual('testmodel', parts[1].table) self.assertEqual('(201701, 13)'.replace(' ', ''), parts[1].partition.replace(' ', '')) + def test_custom_primary_key(self): + class TestModel(SampleModel): + engine = MergeTree( + order_by=('date', 'event_id', 'event_group'), + partition_key=('toYYYYMM(date)',), + primary_key=('date', 'event_id') + ) + + class TestCollapseModel(SampleModel): + sign = Int8Field() + + engine = CollapsingMergeTree( + sign_col='sign', + order_by=('date', 'event_id', 'event_group'), + partition_key=('toYYYYMM(date)',), + primary_key=('date', 'event_id') + ) + + self._create_and_insert(TestModel) + self._create_and_insert(TestCollapseModel) + + self.assertEqual(2, len(list(SystemPart.get(self.database)))) + class SampleModel(Model):