diff --git a/docs/class_reference.md b/docs/class_reference.md index 7bb9e55..254b282 100644 --- a/docs/class_reference.md +++ b/docs/class_reference.md @@ -24,6 +24,18 @@ created on the ClickHouse server if it does not already exist. - `autocreate`: automatically create the database if does not exist (unless in readonly mode). +#### server_timezone + + +Contains [pytz](http://pytz.sourceforge.net/) timezone used on database server + + +#### server_version + + +Contains a version tuple of database server, for example (1, 1, 54310) + + #### count(model_class, conditions=None) @@ -144,13 +156,13 @@ invalid values will cause a `ValueError` to be raised. Unrecognized field names will cause an `AttributeError`. -#### Model.create_table_sql(db_name) +#### Model.create_table_sql(db) Returns the SQL command for creating a table for this model. -#### Model.drop_table_sql(db_name) +#### Model.drop_table_sql(db) Returns the SQL command for deleting this model's table. @@ -233,13 +245,13 @@ invalid values will cause a `ValueError` to be raised. Unrecognized field names will cause an `AttributeError`. -#### BufferModel.create_table_sql(db_name) +#### BufferModel.create_table_sql(db) Returns the SQL command for creating a table for this model. -#### BufferModel.drop_table_sql(db_name) +#### BufferModel.drop_table_sql(db) Returns the SQL command for deleting this model's table. @@ -497,7 +509,7 @@ Extends Engine Extends Engine -#### MergeTree(date_col, key_cols, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) +#### MergeTree(date_col=None, order_by=(), sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) ### Buffer diff --git a/docs/ref.md b/docs/ref.md index f789578..a298d04 100644 --- a/docs/ref.md +++ b/docs/ref.md @@ -119,12 +119,12 @@ invalid values will cause a `ValueError` to be raised. Unrecognized field names will cause an `AttributeError`. -#### Model.create_table_sql(db_name) +#### Model.create_table_sql(db) Returns the SQL command for creating a table for this model. -#### Model.drop_table_sql(db_name) +#### Model.drop_table_sql(db) Returns the SQL command for deleting this model's table. @@ -197,12 +197,12 @@ invalid values will cause a `ValueError` to be raised. Unrecognized field names will cause an `AttributeError`. -#### BufferModel.create_table_sql(db_name) +#### BufferModel.create_table_sql(db) Returns the SQL command for creating a table for this model. -#### BufferModel.drop_table_sql(db_name) +#### BufferModel.drop_table_sql(db) Returns the SQL command for deleting this model's table. diff --git a/docs/table_engines.md b/docs/table_engines.md index 30aa07b..cf5b0c1 100644 --- a/docs/table_engines.md +++ b/docs/table_engines.md @@ -54,6 +54,23 @@ For a `ReplacingMergeTree` you can optionally specify the version column: engine = engines.ReplacingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), ver_col='Version') +### Custom partitioning + +ClickHouse supports [custom partitioning](https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/) expressions since version 1.1.54310 +You can use custom partitioning with any MergeTree family engine. +To set custom partitioning: +* skip date_col (first) constructor parameter or fill it with None value +* add name to order_by (second) constructor parameter +* add partition_key parameter. It should be a tuple of expressions, by which partition are built. + +Standard partitioning by date column can be added using toYYYYMM(date) function. + +Example: + + engine = engines.ReplacingMergeTree(order_by=('OrderID', 'EventDate', 'BannerID'), ver_col='Version', + partition_key=('toYYYYMM(EventDate)', 'BannerID')) + + ### Data Replication Any of the above engines can be converted to a replicated engine (e.g. `ReplicatedMergeTree`) by adding two parameters, `replica_table_path` and `replica_name`: diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py index cb47d02..0777714 100644 --- a/src/infi/clickhouse_orm/database.py +++ b/src/infi/clickhouse_orm/database.py @@ -56,6 +56,7 @@ class Database(object): self.db_exists = False self.create_database() self.server_timezone = self._get_server_timezone() + self.server_version = self._get_server_version() def create_database(self): ''' @@ -77,7 +78,7 @@ class Database(object): # TODO check that model has an engine if model_class.system: raise DatabaseException("You can't create system table") - self._send(model_class.create_table_sql(self.db_name)) + self._send(model_class.create_table_sql(self)) def drop_table(self, model_class): ''' @@ -85,7 +86,7 @@ class Database(object): ''' if model_class.system: raise DatabaseException("You can't drop system table") - self._send(model_class.drop_table_sql(self.db_name)) + self._send(model_class.drop_table_sql(self)) def insert(self, model_instances, batch_size=1000): ''' @@ -285,6 +286,11 @@ class Database(object): logger.exception('Cannot determine server timezone, assuming UTC') return pytz.utc + def _get_server_version(self, as_tuple=True): + r = self._send('SELECT version();') + ver = r.text + return tuple(int(n) for n in ver.split('.')) if as_tuple else ver + def _is_connection_readonly(self): r = self._send("SELECT value FROM system.settings WHERE name = 'readonly'") return r.text.strip() != '0' diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index 57ca1ce..f6aa062 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -1,89 +1,141 @@ from __future__ import unicode_literals +import logging import six from .utils import comma_join +logger = logging.getLogger('clickhouse_orm') + class Engine(object): - def create_table_sql(self): + def create_table_sql(self, db): raise NotImplementedError() # pragma: no cover class TinyLog(Engine): - def create_table_sql(self): + def create_table_sql(self, db): return 'TinyLog' class Log(Engine): - def create_table_sql(self): + def create_table_sql(self, db): return 'Log' class Memory(Engine): - def create_table_sql(self): + def create_table_sql(self, db): return 'Memory' class MergeTree(Engine): - def __init__(self, date_col, key_cols, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None): - assert type(key_cols) in (list, tuple), 'key_cols must be a list or tuple' + def __init__(self, date_col=None, order_by=(), sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + assert type(order_by) in (list, tuple), 'order_by must be a list or tuple' + assert date_col is None or isinstance(date_col, six.string_types), 'date_col must be string if present' + assert partition_key is None or type(partition_key) in (list, tuple),\ + 'partition_key must be tuple or list if present' + + # These values conflict with each other (old and new syntax of table engines. + # So let's control only one of them is given. + assert date_col or partition_key, "You must set either date_col or partition_key" self.date_col = date_col - self.key_cols = key_cols + self.partition_key = partition_key if partition_key else ('toYYYYMM(`%s`)' % date_col,) + + self.order_by = order_by self.sampling_expr = sampling_expr self.index_granularity = index_granularity self.replica_table_path = replica_table_path self.replica_name = replica_name # TODO verify that both replica fields are either present or missing - def create_table_sql(self): + # I changed field name for new reality and syntax + @property + def key_cols(self): + logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead') + return self.order_by + + @key_cols.setter + def key_cols(self, value): + logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead') + self.order_by = value + + def create_table_sql(self, db): name = self.__class__.__name__ if self.replica_name: name = 'Replicated' + name - params = self._build_sql_params() - return '%s(%s)' % (name, comma_join(params)) - def _build_sql_params(self): + # In ClickHouse 1.1.54310 custom partitioning key was introduced + # https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/ + # Let's check version and use new syntax if available + if db.server_version >= (1, 1, 54310): + partition_sql = "PARTITION BY %s ORDER BY %s" \ + % ('(%s)' % comma_join(self.partition_key), '(%s)' % comma_join(self.order_by)) + + if self.sampling_expr: + partition_sql += " SAMPLE BY %s" % self.sampling_expr + + partition_sql += " SETTINGS index_granularity=%d" % self.index_granularity + + elif not self.date_col: + # Can't import it globally due to circular import + from infi.clickhouse_orm.database import DatabaseException + raise DatabaseException("Custom partitioning is not supported before ClickHouse 1.1.54310. " + "Please update your server or use date_col syntax." + "https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/") + else: + partition_sql = '' + + params = self._build_sql_params(db) + return '%s(%s) %s' % (name, comma_join(params), partition_sql) + + def _build_sql_params(self, db): params = [] if self.replica_name: params += ["'%s'" % self.replica_table_path, "'%s'" % self.replica_name] - params.append(self.date_col) - if self.sampling_expr: - params.append(self.sampling_expr) - params.append('(%s)' % comma_join(self.key_cols)) - params.append(str(self.index_granularity)) + + # In ClickHouse 1.1.54310 custom partitioning key was introduced + # https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/ + # These parameters are process in create_table_sql directly. + # In previous ClickHouse versions this this syntax does not work. + if db.server_version < (1, 1, 54310): + params.append(self.date_col) + if self.sampling_expr: + params.append(self.sampling_expr) + params.append('(%s)' % comma_join(self.order_by)) + params.append(str(self.index_granularity)) + return params class CollapsingMergeTree(MergeTree): - def __init__(self, date_col, key_cols, sign_col, sampling_expr=None, + def __init__(self, date_col, order_by, sign_col, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None): - super(CollapsingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name) + super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) self.sign_col = sign_col - def _build_sql_params(self): - params = super(CollapsingMergeTree, self)._build_sql_params() + def _build_sql_params(self, db): + params = super(CollapsingMergeTree, self)._build_sql_params(db) params.append(self.sign_col) return params class SummingMergeTree(MergeTree): - def __init__(self, date_col, key_cols, summing_cols=None, sampling_expr=None, + def __init__(self, date_col, order_by, summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None): - super(SummingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name) + super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple' self.summing_cols = summing_cols - def _build_sql_params(self): - params = super(SummingMergeTree, self)._build_sql_params() + def _build_sql_params(self, db): + params = super(SummingMergeTree, self)._build_sql_params(db) if self.summing_cols: params.append('(%s)' % comma_join(self.summing_cols)) return params @@ -91,13 +143,13 @@ class SummingMergeTree(MergeTree): class ReplacingMergeTree(MergeTree): - def __init__(self, date_col, key_cols, ver_col=None, sampling_expr=None, + def __init__(self, date_col, order_by, ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None): - super(ReplacingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name) + super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) self.ver_col = ver_col - def _build_sql_params(self): - params = super(ReplacingMergeTree, self)._build_sql_params() + def _build_sql_params(self, db): + params = super(ReplacingMergeTree, self)._build_sql_params(db) if self.ver_col: params.append(self.ver_col) return params @@ -121,11 +173,11 @@ class Buffer(Engine): self.min_bytes = min_bytes self.max_bytes = max_bytes - def create_table_sql(self, db_name): + def create_table_sql(self, db): # Overriden create_table_sql example: - #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' + # sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % ( - db_name, self.main_model.table_name(), self.num_layers, + db.db_name, self.main_model.table_name(), self.num_layers, self.min_time, self.max_time, self.min_rows, self.max_rows, self.min_bytes, self.max_bytes ) @@ -145,13 +197,5 @@ class Merge(Engine): self.table_regex = table_regex - # Use current database as default - self.db_name = None - - def create_table_sql(self): - db_name = ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()' - return "Merge(%s, '%s')" % (db_name, self.table_regex) - - def set_db_name(self, db_name): - assert isinstance(db_name, six.string_types), "'db_name' parameter must be string" - self.db_name = db_name + def create_table_sql(self, db): + return "Merge(`%s`, '%s')" % (db.db_name, self.table_regex) diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index c56b821..38f794f 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -168,25 +168,25 @@ class Model(with_metaclass(ModelBase)): return cls.__name__.lower() @classmethod - def create_table_sql(cls, db_name): + def create_table_sql(cls, db): ''' Returns the SQL command for creating a table for this model. ''' - parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db_name, cls.table_name())] + parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())] cols = [] for name, field in cls._fields: cols.append(' %s %s' % (name, field.get_sql())) parts.append(',\n'.join(cols)) parts.append(')') - parts.append('ENGINE = ' + cls.engine.create_table_sql()) + parts.append('ENGINE = ' + cls.engine.create_table_sql(db)) return '\n'.join(parts) @classmethod - def drop_table_sql(cls, db_name): + def drop_table_sql(cls, db): ''' Returns the SQL command for deleting this model's table. ''' - return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db_name, cls.table_name()) + return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db.db_name, cls.table_name()) @classmethod def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None): @@ -250,12 +250,13 @@ class Model(with_metaclass(ModelBase)): class BufferModel(Model): @classmethod - def create_table_sql(cls, db_name): + def create_table_sql(cls, db): ''' Returns the SQL command for creating a table for this model. ''' - parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.engine.main_model.table_name())] - engine_str = cls.engine.create_table_sql(db_name) + parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db.db_name, cls.table_name(), db.db_name, + cls.engine.main_model.table_name())] + engine_str = cls.engine.create_table_sql(db) parts.append(engine_str) return ' '.join(parts) @@ -271,18 +272,7 @@ class MergeModel(Model): # Virtual fields can't be inserted into database _table = StringField(readonly=True) - def set_database(self, db): - ''' - Gets the `Database` that this model instance belongs to. - Returns `None` unless the instance was read from the database or written to it. - ''' - assert isinstance(self.engine, Merge), "engine must be engines.Merge instance" - res = super(MergeModel, self).set_database(db) - self.engine.set_db_name(db.db_name) - return res - @classmethod - def create_table_sql(cls, db_name): + def create_table_sql(cls, db): assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance" - cls.engine.set_db_name(db_name) - return super(MergeModel, cls).create_table_sql(db_name) + return super(MergeModel, cls).create_table_sql(db) diff --git a/src/infi/clickhouse_orm/system_models.py b/src/infi/clickhouse_orm/system_models.py index 5ca3efd..a4cdf1e 100644 --- a/src/infi/clickhouse_orm/system_models.py +++ b/src/infi/clickhouse_orm/system_models.py @@ -68,7 +68,8 @@ class SystemPart(Model): """ operation = operation.upper() assert operation in self.OPERATIONS, "operation must be in [%s]" % comma_join(self.OPERATIONS) - sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (self._database.db_name, self.table, operation, self.partition) + + sql = "ALTER TABLE `%s`.`%s` %s PARTITION %s" % (self._database.db_name, self.table, operation, self.partition) if from_part is not None: sql += " FROM %s" % from_part self._database.raw(sql, settings=settings, stream=False) diff --git a/tests/test_database.py b/tests/test_database.py index 0214f36..c757f67 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -135,8 +135,8 @@ class DatabaseTestCase(TestCaseWithData): Database(self.database.db_name, username='default', password='wrong') def test_nonexisting_db(self): - db = Database('db_not_here', autocreate=False) with self.assertRaises(DatabaseException): + db = Database('db_not_here', autocreate=False) db.create_table(Person) def test_preexisting_db(self): diff --git a/tests/test_engines.py b/tests/test_engines.py index 20a1b04..835c5d3 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -1,6 +1,8 @@ from __future__ import unicode_literals import unittest +from infi.clickhouse_orm.system_models import SystemPart + from infi.clickhouse_orm.database import Database, DatabaseException from infi.clickhouse_orm.models import Model, MergeModel from infi.clickhouse_orm.fields import * @@ -41,8 +43,12 @@ class EnginesTestCase(unittest.TestCase): def test_replicated_merge_tree(self): engine = MergeTree('date', ('date', 'event_id', 'event_group'), replica_table_path='/clickhouse/tables/{layer}-{shard}/hits', replica_name='{replica}') - expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)" - self.assertEquals(engine.create_table_sql(), expected) + # In ClickHouse 1.1.54310 custom partitioning key was introduced and new syntax is used + if self.database.server_version >= (1, 1, 54310): + expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}') PARTITION BY (toYYYYMM(`date`)) ORDER BY (date, event_id, event_group) SETTINGS index_granularity=8192" + else: + expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)" + self.assertEquals(engine.create_table_sql(self.database), expected) def test_collapsing_merge_tree(self): class TestModel(SampleModel): @@ -124,6 +130,17 @@ class EnginesTestCase(unittest.TestCase): 'event_uversion': 2 }, res[1].to_dict(include_readonly=True)) + def test_custom_partitioning(self): + class TestModel(SampleModel): + engine = MergeTree( + order_by=('date', 'event_id', 'event_group'), + partition_key=('toYYYYMM(date)', 'event_group') + ) + self._create_and_insert(TestModel) + parts = list(SystemPart.get(self.database)) + self.assertEqual(1, len(parts)) + self.assertEqual('(201701, 13)', parts[0].partition) + class SampleModel(Model): diff --git a/tests/test_inheritance.py b/tests/test_inheritance.py index f209995..bdd570f 100644 --- a/tests/test_inheritance.py +++ b/tests/test_inheritance.py @@ -3,6 +3,7 @@ import unittest import datetime import pytz +from infi.clickhouse_orm.database import Database from infi.clickhouse_orm.models import Model from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.engines import * @@ -19,9 +20,10 @@ class InheritanceTestCase(unittest.TestCase): self.assertFieldNames(Model2, ['date_field', 'int_field', 'float_field']) def test_create_table_sql(self): - sql1 = ParentModel.create_table_sql('default') - sql2 = Model1.create_table_sql('default') - sql3 = Model2.create_table_sql('default') + default_db = Database('default') + sql1 = ParentModel.create_table_sql(default_db) + sql2 = Model1.create_table_sql(default_db) + sql3 = Model2.create_table_sql(default_db) self.assertNotEqual(sql1, sql2) self.assertNotEqual(sql1, sql3) self.assertNotEqual(sql2, sql3) diff --git a/tests/test_system_models.py b/tests/test_system_models.py index 54b6650..b49cc52 100644 --- a/tests/test_system_models.py +++ b/tests/test_system_models.py @@ -1,7 +1,10 @@ from __future__ import unicode_literals + import unittest from datetime import date + import os + from infi.clickhouse_orm.database import Database, DatabaseException from infi.clickhouse_orm.engines import * from infi.clickhouse_orm.fields import * @@ -37,7 +40,9 @@ class SystemPartTest(unittest.TestCase): def setUp(self): self.database = Database('test-db') self.database.create_table(TestTable) + self.database.create_table(CustomPartitionedTable) self.database.insert([TestTable(date_field=date.today())]) + self.database.insert([CustomPartitionedTable(date_field=date.today(), group_field=13)]) def tearDown(self): self.database.drop_database() @@ -51,40 +56,46 @@ class SystemPartTest(unittest.TestCase): def test_get_all(self): parts = SystemPart.get(self.database) - self.assertEqual(len(list(parts)), 1) + self.assertEqual(len(list(parts)), 2) def test_get_active(self): parts = list(SystemPart.get_active(self.database)) - self.assertEqual(len(parts), 1) + self.assertEqual(len(parts), 2) parts[0].detach() - self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) + self.assertEqual(len(list(SystemPart.get_active(self.database))), 1) def test_get_conditions(self): parts = list(SystemPart.get(self.database, conditions="table='testtable'")) self.assertEqual(len(parts), 1) - parts = list(SystemPart.get(self.database, conditions=u"table='othertable'")) + parts = list(SystemPart.get(self.database, conditions=u"table='custompartitionedtable'")) + self.assertEqual(len(parts), 1) + parts = list(SystemPart.get(self.database, conditions=u"table='invalidtable'")) self.assertEqual(len(parts), 0) def test_attach_detach(self): parts = list(SystemPart.get_active(self.database)) - self.assertEqual(len(parts), 1) - parts[0].detach() + self.assertEqual(len(parts), 2) + for p in parts: + p.detach() self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) - parts[0].attach() - self.assertEqual(len(list(SystemPart.get_active(self.database))), 1) + for p in parts: + p.attach() + self.assertEqual(len(list(SystemPart.get_active(self.database))), 2) def test_drop(self): parts = list(SystemPart.get_active(self.database)) - parts[0].drop() + for p in parts: + p.drop() self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) def test_freeze(self): parts = list(SystemPart.get(self.database)) # There can be other backups in the folder prev_backups = set(self._get_backups()) - parts[0].freeze() + for p in parts: + p.freeze() backups = set(self._get_backups()) - self.assertEqual(len(backups), len(prev_backups) + 1) + self.assertEqual(len(backups), len(prev_backups) + 2) def test_fetch(self): # TODO Not tested, as I have no replication set @@ -97,5 +108,12 @@ class TestTable(Model): engine = MergeTree('date_field', ('date_field',)) +class CustomPartitionedTable(Model): + date_field = DateField() + group_field = UInt32Field() + + engine = MergeTree(order_by=('date_field', 'group_field'), partition_key=('toYYYYMM(date_field)', 'group_field')) + + class SystemTestModel(Model): system = True