class Distributed(Engine):
    """
    The Distributed engine by itself does not store data,
    but allows distributed query processing on multiple servers.
    Reading is automatically parallelized.
    During a read, the table indexes on remote servers are used, if there are any.

    See full documentation here
    https://clickhouse.yandex/docs/en/table_engines/distributed.html
    """

    def __init__(self, cluster, table=None, db_name=None, sharding_key=None):
        """
        :param cluster: what cluster to access data from
        :param table: underlying table that actually stores data, given either
            as a table name or as a model class.
            If you are not specifying any table here, ensure that it can be inferred
            from your model's superclass (see models.DistributedModel.fix_engine_table)
        :param db_name: which database to access data from.
            When it is not set, 'currentDatabase()' is used in the generated SQL
        :param sharding_key: optional; how to distribute data among shards
            when inserting directly into the Distributed table
        """
        self.cluster = cluster
        self.table = table
        self.db_name = db_name
        self.sharding_key = sharding_key

    @property
    def table_name(self):
        """
        Name of the underlying table.

        When ``table`` is an instance of ``ModelBase`` (i.e. a model class was
        passed), the result of its ``table_name()`` is returned; otherwise
        ``table`` is returned unchanged, which may be None.
        """
        # TODO: circular import is bad
        from .models import ModelBase

        table = self.table
        if isinstance(table, ModelBase):
            return table.table_name()
        return table

    def set_db_name(self, db_name):
        """
        Remember the database the underlying table is read from.

        :param db_name: database name, must be a string
        """
        assert isinstance(db_name, six.string_types), "'db_name' parameter must be string"
        self.db_name = db_name

    def create_table_sql(self):
        """
        Render the engine clause, e.g. ``Distributed(cluster, db, table)``.

        :raises ValueError: when no underlying table is known
        """
        name = self.__class__.__name__
        params = self._build_sql_params()
        return '%s(%s)' % (name, ', '.join(params))

    def _build_sql_params(self):
        """
        Build the list of engine arguments: cluster, database, table and,
        when configured, the sharding key.

        :raises ValueError: when no underlying table is known
        """
        # Resolve the property once: every access performs an import and
        # possibly a call to the model's table_name().
        table_name = self.table_name

        # Reject any falsy name (None or empty string), matching the check in
        # DistributedModel.fix_engine_table; an empty name would otherwise
        # produce a malformed engine clause.
        if not table_name:
            raise ValueError("Cannot create {} engine: specify an underlying table".format(
                self.__class__.__name__))

        db_name = ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()'

        params = [self.cluster, db_name, table_name]
        if self.sharding_key:
            params.append(self.sharding_key)
        return params
# TODO: base class for models that require specific engine


class DistributedModel(Model):
    """
    Model backed by the Distributed engine.
    """

    def set_database(self, db):
        """
        Attach this instance to `db` and make the engine use the same database name.

        Returns whatever the parent implementation of set_database returns.
        """
        assert isinstance(self.engine, Distributed), "engine must be engines.Distributed instance"
        result = super(DistributedModel, self).set_database(db)
        self.engine.set_db_name(db.db_name)
        return result

    @classmethod
    def fix_engine_table(cls):
        """
        Fill in the engine's underlying table when it was not given explicitly.

        A Distributed table stores nothing itself, it only provides distributed
        access to another table. When the engine was declared without a table,
        this method looks at the direct base classes of the model, keeps those
        that are models but not distributed models, and uses that class as the
        engine's table. Nothing happens if the engine already has a table name.

        Example (the table is inferred from the Foo superclass):

            class Foo(Model):
                id = UInt8Field(1)

            class FooDistributed(Foo, DistributedModel):
                engine = Distributed('my_cluster')

            FooDistributed.engine.table        # None
            FooDistributed.fix_engine_table()
            FooDistributed.engine.table        # the Foo class

        If you prefer a more explicit way of doing things, name the storage
        model in the engine and no fixing is needed:

            class FooDistributedVerbose(Foo, DistributedModel):
                engine = Distributed('my_cluster', Foo)

        See tests.test_engines:DistributedTestCase for more examples

        :raises TypeError: when there is no suitable base class, or more than one
        """
        engine = cls.engine

        # an explicitly configured table always wins
        if engine.table_name:
            return

        # direct superclasses that actually store data
        candidates = []
        for base in cls.__bases__:
            if issubclass(base, Model) and not issubclass(base, DistributedModel):
                candidates.append(base)

        if len(candidates) == 0:
            raise TypeError("When defining Distributed engine without the table_name "
                            "ensure that your model has a parent model")
        if len(candidates) > 1:
            raise TypeError("When defining Distributed engine without the table_name "
                            "ensure that your model has exactly one non-distributed superclass")

        # enable correct SQL for engine
        engine.table = candidates[0]

    @classmethod
    def create_table_sql(cls, db_name):
        """
        Return the CREATE TABLE statement for this model in database `db_name`.

        The table is created with the structure of the engine's underlying
        table (``AS``), followed by the engine clause on a second line.
        """
        assert isinstance(cls.engine, Distributed), "engine must be engines.Distributed instance"
        cls.engine.set_db_name(db_name)

        # make sure the engine knows its underlying table before rendering SQL
        cls.fix_engine_table()

        create_clause = 'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` AS `{0}`.`{2}`'.format(
            db_name, cls.table_name(), cls.engine.table_name)
        engine_clause = 'ENGINE = ' + cls.engine.create_table_sql()
        return '\n'.join([create_clause, engine_clause])
class DistributedTestCase(_EnginesHelperTestCase):
    """
    Tests for the Distributed engine and for models based on DistributedModel.
    """

    def test_without_table_name(self):
        # an engine that has no underlying table cannot be rendered to SQL
        engine = Distributed('my_cluster')

        with self.assertRaises(ValueError) as ctx:
            engine.create_table_sql()

        self.assertEqual(str(ctx.exception),
                         'Cannot create Distributed engine: specify an underlying table')

    def test_with_table_name(self):
        rendered = Distributed('my_cluster', 'foo').create_table_sql()
        self.assertEqual(rendered, 'Distributed(my_cluster, currentDatabase(), foo)')

    # Storage model used as the default underlying table in the tests below.
    # It has to be defined before _create_distributed, which uses it as a default.
    class TestModel(SampleModel):
        engine = TinyLog()

    def _create_distributed(self, shard_name, underlying=TestModel):
        """
        Create the `underlying` table and a Distributed table on top of it,
        and return the distributed model class.
        """
        class TestDistributedModel(DistributedModel, underlying):
            engine = Distributed(shard_name, underlying)

        for model in (underlying, TestDistributedModel):
            self.database.create_table(model)
        return TestDistributedModel

    def test_bad_cluster_name(self):
        distributed = self._create_distributed('cluster_name')
        with self.assertRaises(ServerError) as ctx:
            self.database.count(distributed)

        error = ctx.exception
        self.assertEqual(error.code, 170)
        self.assertEqual(error.message, "Requested cluster 'cluster_name' not found")

    def test_verbose_engine_two_superclasses(self):
        class TestModel2(SampleModel):
            engine = Log()

        # the table is named explicitly, so two storage superclasses are fine
        class TestDistributedModel(DistributedModel, self.TestModel, TestModel2):
            engine = Distributed('test_shard_localhost', self.TestModel)

        self.database.create_table(self.TestModel)
        self.database.create_table(TestDistributedModel)
        self.assertEqual(self.database.count(TestDistributedModel), 0)

    def test_minimal_engine(self):
        # no table in the engine: it is taken from the single storage superclass
        class TestDistributedModel(DistributedModel, self.TestModel):
            engine = Distributed('test_shard_localhost')

        self.database.create_table(self.TestModel)
        self.database.create_table(TestDistributedModel)

        self.assertEqual(self.database.count(TestDistributedModel), 0)

    def test_minimal_engine_two_superclasses(self):
        class TestModel2(SampleModel):
            engine = Log()

        # no table in the engine and two candidates: creation must fail
        class TestDistributedModel(DistributedModel, self.TestModel, TestModel2):
            engine = Distributed('test_shard_localhost')

        self.database.create_table(self.TestModel)
        with self.assertRaises(TypeError) as ctx:
            self.database.create_table(TestDistributedModel)

        self.assertEqual(str(ctx.exception),
                         'When defining Distributed engine without the table_name ensure '
                         'that your model has exactly one non-distributed superclass')

    def test_minimal_engine_no_superclasses(self):
        # no table in the engine and no storage superclass: creation must fail
        class TestDistributedModel(DistributedModel):
            engine = Distributed('test_shard_localhost')

        self.database.create_table(self.TestModel)
        with self.assertRaises(TypeError) as ctx:
            self.database.create_table(TestDistributedModel)

        self.assertEqual(str(ctx.exception),
                         'When defining Distributed engine without the table_name ensure '
                         'that your model has a parent model')

    def _test_insert_select(self, local_to_distributed, test_model=TestModel, include_readonly=True):
        """
        Insert two rows through one table and read them back through the other.

        :param local_to_distributed: when true, insert into the local table and
            select from the distributed one; otherwise the other way round
        :param test_model: storage model to build the distributed model on
        :param include_readonly: passed to to_dict() when comparing the rows
        """
        d_model = self._create_distributed('test_shard_localhost', underlying=test_model)

        to_insert, to_select = ((test_model, d_model) if local_to_distributed
                                else (d_model, test_model))

        numbers = (1, 2)
        self.database.insert([
            to_insert(date='2017-01-0%d' % n, event_id=n, event_group=n,
                      event_count=n, event_version=n)
            for n in numbers
        ])

        # event_uversion is a materialized field, so * does not select it;
        # it is requested explicitly, otherwise it would be zero
        query = 'SELECT *, event_uversion FROM $table ORDER BY event_id'
        rows = list(self.database.select(query, model_class=to_select))
        self.assertEqual(2, len(rows))

        for position, n in enumerate(numbers):
            expected = {
                'date': datetime.date(2017, 1, n),
                'event_id': n,
                'event_group': n,
                'event_count': n,
                'event_version': n,
                'event_uversion': n
            }
            self.assertDictEqual(expected,
                                 rows[position].to_dict(include_readonly=include_readonly))

    @unittest.skip("Bad support of materialized fields in Distributed tables "
                   "https://groups.google.com/forum/#!topic/clickhouse/XEYRRwZrsSc")
    def test_insert_distributed_select_local(self):
        return self._test_insert_select(local_to_distributed=False)

    def test_insert_local_select_distributed(self):
        return self._test_insert_select(local_to_distributed=True)

    # NOTE(review): the leading underscore keeps the test runner from collecting
    # this method; presumably it is disabled on purpose - confirm.
    def _test_insert_distributed_select_local_no_materialized_fields(self):
        class TestModel2(self.TestModel):
            event_uversion = UInt8Field(readonly=True)

        return self._test_insert_select(local_to_distributed=False, test_model=TestModel2,
                                        include_readonly=False)