Merge branch 'primary-key-issue-114' of https://github.com/carrotquest/infi.clickhouse_orm into carrotquest-primary-key-issue-114

This commit is contained in:
Itai Shirav 2020-05-01 16:31:43 +03:00
commit 669a40dd5e
4 changed files with 56 additions and 11 deletions

View File

@ -737,7 +737,7 @@ Extends Engine
Extends Engine Extends Engine
#### MergeTree(date_col=None, order_by=(), sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) #### MergeTree(date_col=None, order_by=(), sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, primary_key=None)
### Buffer ### Buffer
@ -793,21 +793,21 @@ straightly into Distributed table, optional
Extends MergeTree Extends MergeTree
#### CollapsingMergeTree(date_col=None, order_by=(), sign_col="sign", sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) #### CollapsingMergeTree(date_col=None, order_by=(), sign_col="sign", sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, primary_key=None)
### SummingMergeTree ### SummingMergeTree
Extends MergeTree Extends MergeTree
#### SummingMergeTree(date_col=None, order_by=(), summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) #### SummingMergeTree(date_col=None, order_by=(), summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, primary_key=None)
### ReplacingMergeTree ### ReplacingMergeTree
Extends MergeTree Extends MergeTree
#### ReplacingMergeTree(date_col=None, order_by=(), ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) #### ReplacingMergeTree(date_col=None, order_by=(), ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None, primary_key=None)
infi.clickhouse_orm.query infi.clickhouse_orm.query

View File

@ -73,6 +73,19 @@ Example:
partition_key=('toYYYYMM(EventDate)', 'BannerID')) partition_key=('toYYYYMM(EventDate)', 'BannerID'))
### Primary key
ClickHouse supports [custom primary key](https://clickhouse.yandex/docs/en/operations/table_engines/mergetree/#primary-keys-and-indexes-in-queries/) expressions since version 1.1.54310
You can use custom primary key with any `MergeTree` family engine.
To set custom partitioning add `primary_key` parameter. It should be a tuple of expressions, by which partitions are built.
By default primary key is equal to order_by expression
Example:
engine = engines.ReplacingMergeTree(order_by=('OrderID', 'EventDate', 'BannerID'), ver_col='Version',
partition_key=('toYYYYMM(EventDate)', 'BannerID'), primary_key=('OrderID',))
### Data Replication ### Data Replication
Any of the above engines can be converted to a replicated engine (e.g. `ReplicatedMergeTree`) by adding two parameters, `replica_table_path` and `replica_name`: Any of the above engines can be converted to a replicated engine (e.g. `ReplicatedMergeTree`) by adding two parameters, `replica_table_path` and `replica_name`:

View File

@ -35,8 +35,10 @@ class Memory(Engine):
class MergeTree(Engine): class MergeTree(Engine):
def __init__(self, date_col=None, order_by=(), sampling_expr=None, def __init__(self, date_col=None, order_by=(), sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None,
primary_key=None):
assert type(order_by) in (list, tuple), 'order_by must be a list or tuple' assert type(order_by) in (list, tuple), 'order_by must be a list or tuple'
assert primary_key is None or type(primary_key) in (list, tuple), 'primary_key must be a list or tuple'
assert date_col is None or isinstance(date_col, six.string_types), 'date_col must be string if present' assert date_col is None or isinstance(date_col, six.string_types), 'date_col must be string if present'
assert partition_key is None or type(partition_key) in (list, tuple),\ assert partition_key is None or type(partition_key) in (list, tuple),\
'partition_key must be tuple or list if present' 'partition_key must be tuple or list if present'
@ -48,6 +50,7 @@ class MergeTree(Engine):
assert date_col or partition_key, "You must set either date_col or partition_key" assert date_col or partition_key, "You must set either date_col or partition_key"
self.date_col = date_col self.date_col = date_col
self.partition_key = partition_key if partition_key else ('toYYYYMM(`%s`)' % date_col,) self.partition_key = partition_key if partition_key else ('toYYYYMM(`%s`)' % date_col,)
self.primary_key = primary_key
self.order_by = order_by self.order_by = order_by
self.sampling_expr = sampling_expr self.sampling_expr = sampling_expr
@ -78,6 +81,9 @@ class MergeTree(Engine):
partition_sql = "PARTITION BY %s ORDER BY %s" \ partition_sql = "PARTITION BY %s ORDER BY %s" \
% ('(%s)' % comma_join(self.partition_key), '(%s)' % comma_join(self.order_by)) % ('(%s)' % comma_join(self.partition_key), '(%s)' % comma_join(self.order_by))
if self.primary_key:
partition_sql += " PRIMARY KEY (%s)" % comma_join(self.primary_key)
if self.sampling_expr: if self.sampling_expr:
partition_sql += " SAMPLE BY %s" % self.sampling_expr partition_sql += " SAMPLE BY %s" % self.sampling_expr
@ -117,9 +123,10 @@ class MergeTree(Engine):
class CollapsingMergeTree(MergeTree): class CollapsingMergeTree(MergeTree):
def __init__(self, date_col=None, order_by=(), sign_col='sign', sampling_expr=None, def __init__(self, date_col=None, order_by=(), sign_col='sign', sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None,
primary_key=None):
super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity,
replica_table_path, replica_name, partition_key) replica_table_path, replica_name, partition_key, primary_key)
self.sign_col = sign_col self.sign_col = sign_col
def _build_sql_params(self, db): def _build_sql_params(self, db):
@ -131,9 +138,10 @@ class CollapsingMergeTree(MergeTree):
class SummingMergeTree(MergeTree): class SummingMergeTree(MergeTree):
def __init__(self, date_col=None, order_by=(), summing_cols=None, sampling_expr=None, def __init__(self, date_col=None, order_by=(), summing_cols=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None,
primary_key=None):
super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path,
replica_name, partition_key) replica_name, partition_key, primary_key)
assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple' assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple'
self.summing_cols = summing_cols self.summing_cols = summing_cols
@ -147,9 +155,10 @@ class SummingMergeTree(MergeTree):
class ReplacingMergeTree(MergeTree): class ReplacingMergeTree(MergeTree):
def __init__(self, date_col=None, order_by=(), ver_col=None, sampling_expr=None, def __init__(self, date_col=None, order_by=(), ver_col=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None,
primary_key=None):
super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity,
replica_table_path, replica_name, partition_key) replica_table_path, replica_name, partition_key, primary_key)
self.ver_col = ver_col self.ver_col = ver_col
def _build_sql_params(self, db): def _build_sql_params(self, db):

View File

@ -165,6 +165,29 @@ class EnginesTestCase(_EnginesHelperTestCase):
self.assertEqual('testmodel', parts[1].table) self.assertEqual('testmodel', parts[1].table)
self.assertEqual('(201701, 13)'.replace(' ', ''), parts[1].partition.replace(' ', '')) self.assertEqual('(201701, 13)'.replace(' ', ''), parts[1].partition.replace(' ', ''))
def test_custom_primary_key(self):
class TestModel(SampleModel):
engine = MergeTree(
order_by=('date', 'event_id', 'event_group'),
partition_key=('toYYYYMM(date)',),
primary_key=('date', 'event_id')
)
class TestCollapseModel(SampleModel):
sign = Int8Field()
engine = CollapsingMergeTree(
sign_col='sign',
order_by=('date', 'event_id', 'event_group'),
partition_key=('toYYYYMM(date)',),
primary_key=('date', 'event_id')
)
self._create_and_insert(TestModel)
self._create_and_insert(TestCollapseModel)
self.assertEqual(2, len(list(SystemPart.get(self.database))))
class SampleModel(Model): class SampleModel(Model):