add Distributed engine

This commit is contained in:
Ivan Ladelshchikov 2017-11-21 16:30:25 +05:00 committed by Itai Shirav
parent 3268019216
commit a5f2fa4d76
4 changed files with 277 additions and 4 deletions

View File

@ -16,6 +16,7 @@ The following engines are supported by the ORM:
- ReplacingMergeTree / ReplicatedReplacingMergeTree
- Buffer
- Merge
- Distributed
Simple Engines

View File

@ -155,3 +155,63 @@ class Merge(Engine):
def set_db_name(self, db_name):
assert isinstance(db_name, six.string_types), "'db_name' parameter must be string"
self.db_name = db_name
class Distributed(Engine):
"""
The Distributed engine by itself does not store data,
but allows distributed query processing on multiple servers.
Reading is automatically parallelized.
During a read, the table indexes on remote servers are used, if there are any.
See full documentation here
https://clickhouse.yandex/docs/en/table_engines/distributed.html
"""
def __init__(self, cluster, table=None, db_name=None, sharding_key=None):
"""
:param cluster: what cluster to access data from
:param table: underlying table that actually stores data.
If you are not specifying any table here, ensure that it can be inferred
from your model's superclass (see models.DistributedModel.fix_engine_table)
:param db_name: which database to access data from
By default it is 'currentDatabase()'
:param sharding_key: how to distribute data among shards when inserting
straightly into Distributed table, optional
"""
self.cluster = cluster
self.table = table
self.db_name = db_name
self.sharding_key = sharding_key
@property
def table_name(self):
# TODO: circular import is bad
from .models import ModelBase
table = self.table
if isinstance(table, ModelBase):
return table.table_name()
return table
def set_db_name(self, db_name):
assert isinstance(db_name, six.string_types), "'db_name' parameter must be string"
self.db_name = db_name
def create_table_sql(self):
name = self.__class__.__name__
params = self._build_sql_params()
return '%s(%s)' % (name, ', '.join(params))
def _build_sql_params(self):
db_name = ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()'
if self.table_name is None:
raise ValueError("Cannot create {} engine: specify an underlying table".format(
self.__class__.__name__))
params = [self.cluster, db_name, self.table_name]
if self.sharding_key:
params.append(self.sharding_key)
return params

View File

@ -9,7 +9,7 @@ import pytz
from .fields import Field, StringField
from .utils import parse_tsv
from .query import QuerySet
from .engines import Merge
from .engines import Merge, Distributed
logger = getLogger('clickhouse_orm')
@ -296,3 +296,82 @@ class MergeModel(Model):
assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance"
cls.engine.set_db_name(db_name)
return super(MergeModel, cls).create_table_sql(db_name)
# TODO: base class for models that require specific engine
class DistributedModel(Model):
"""
Model for Distributed engine
"""
def set_database(self, db):
assert isinstance(self.engine, Distributed), "engine must be engines.Distributed instance"
res = super(DistributedModel, self).set_database(db)
self.engine.set_db_name(db.db_name)
return res
@classmethod
def fix_engine_table(cls):
"""
Remember: Distributed table does not store any data, just provides distributed access to it.
So if we define a model with engine that has no defined table for data storage
(see FooDistributed below), that table cannot be successfully created.
This routine can automatically fix engine's storage table by finding the first
non-distributed model among your model's superclasses.
>>> class Foo(Model):
... id = UInt8Field(1)
...
>>> class FooDistributed(Foo, DistributedModel):
... engine = Distributed('my_cluster')
...
>>> FooDistributed.engine.table
None
>>> FooDistributed.fix_engine()
>>> FooDistributed.engine.table
<class '__main__.Foo'>
However if you prefer more explicit way of doing things,
you can always mention the Foo model twice without bothering with any fixes:
>>> class FooDistributedVerbose(Foo, DistributedModel):
... engine = Distributed('my_cluster', Foo)
>>> FooDistributedVerbose.engine.table
<class '__main__.Foo'>
See tests.test_engines:DistributedTestCase for more examples
"""
# apply only when engine has no table defined
if cls.engine.table_name:
return
# find out all the superclasses of the Model that store any data
storage_models = [b for b in cls.__bases__ if issubclass(b, Model)
and not issubclass(b, DistributedModel)]
if not storage_models:
raise TypeError("When defining Distributed engine without the table_name "
"ensure that your model has a parent model")
if len(storage_models) > 1:
raise TypeError("When defining Distributed engine without the table_name "
"ensure that your model has exactly one non-distributed superclass")
# enable correct SQL for engine
cls.engine.table = storage_models[0]
@classmethod
def create_table_sql(cls, db_name):
assert isinstance(cls.engine, Distributed), "engine must be engines.Distributed instance"
cls.engine.set_db_name(db_name)
cls.fix_engine_table()
parts = [
'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` AS `{0}`.`{2}`'.format(
db_name, cls.table_name(), cls.engine.table_name),
'ENGINE = ' + cls.engine.create_table_sql()]
return '\n'.join(parts)

View File

@ -1,8 +1,8 @@
from __future__ import unicode_literals
import unittest
from infi.clickhouse_orm.database import Database, DatabaseException
from infi.clickhouse_orm.models import Model, MergeModel
from infi.clickhouse_orm.database import Database, DatabaseException, ServerError
from infi.clickhouse_orm.models import Model, MergeModel, DistributedModel
from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.engines import *
@ -10,7 +10,7 @@ import logging
logging.getLogger("requests").setLevel(logging.WARNING)
class EnginesTestCase(unittest.TestCase):
class _EnginesHelperTestCase(unittest.TestCase):
def setUp(self):
self.database = Database('test-db')
@ -18,6 +18,8 @@ class EnginesTestCase(unittest.TestCase):
def tearDown(self):
self.database.drop_database()
class EnginesTestCase(_EnginesHelperTestCase):
def _create_and_insert(self, model_class):
self.database.create_table(model_class)
self.database.insert([
@ -133,3 +135,134 @@ class SampleModel(Model):
event_count = UInt16Field()
event_version = Int8Field()
event_uversion = UInt8Field(materialized='abs(event_version)')
class DistributedTestCase(_EnginesHelperTestCase):
def test_without_table_name(self):
engine = Distributed('my_cluster')
with self.assertRaises(ValueError) as cm:
engine.create_table_sql()
exc = cm.exception
self.assertEqual(str(exc), 'Cannot create Distributed engine: specify an underlying table')
def test_with_table_name(self):
engine = Distributed('my_cluster', 'foo')
sql = engine.create_table_sql()
self.assertEqual(sql, 'Distributed(my_cluster, currentDatabase(), foo)')
class TestModel(SampleModel):
engine = TinyLog()
def _create_distributed(self, shard_name, underlying=TestModel):
class TestDistributedModel(DistributedModel, underlying):
engine = Distributed(shard_name, underlying)
self.database.create_table(underlying)
self.database.create_table(TestDistributedModel)
return TestDistributedModel
def test_bad_cluster_name(self):
d_model = self._create_distributed('cluster_name')
with self.assertRaises(ServerError) as cm:
self.database.count(d_model)
exc = cm.exception
self.assertEqual(exc.code, 170)
self.assertEqual(exc.message, "Requested cluster 'cluster_name' not found")
def test_verbose_engine_two_superclasses(self):
class TestModel2(SampleModel):
engine = Log()
class TestDistributedModel(DistributedModel, self.TestModel, TestModel2):
engine = Distributed('test_shard_localhost', self.TestModel)
self.database.create_table(self.TestModel)
self.database.create_table(TestDistributedModel)
self.assertEqual(self.database.count(TestDistributedModel), 0)
def test_minimal_engine(self):
class TestDistributedModel(DistributedModel, self.TestModel):
engine = Distributed('test_shard_localhost')
self.database.create_table(self.TestModel)
self.database.create_table(TestDistributedModel)
self.assertEqual(self.database.count(TestDistributedModel), 0)
def test_minimal_engine_two_superclasses(self):
class TestModel2(SampleModel):
engine = Log()
class TestDistributedModel(DistributedModel, self.TestModel, TestModel2):
engine = Distributed('test_shard_localhost')
self.database.create_table(self.TestModel)
with self.assertRaises(TypeError) as cm:
self.database.create_table(TestDistributedModel)
exc = cm.exception
self.assertEqual(str(exc), 'When defining Distributed engine without the table_name ensure '
'that your model has exactly one non-distributed superclass')
def test_minimal_engine_no_superclasses(self):
class TestDistributedModel(DistributedModel):
engine = Distributed('test_shard_localhost')
self.database.create_table(self.TestModel)
with self.assertRaises(TypeError) as cm:
self.database.create_table(TestDistributedModel)
exc = cm.exception
self.assertEqual(str(exc), 'When defining Distributed engine without the table_name ensure '
'that your model has a parent model')
def _test_insert_select(self, local_to_distributed, test_model=TestModel, include_readonly=True):
d_model = self._create_distributed('test_shard_localhost', underlying=test_model)
if local_to_distributed:
to_insert, to_select = test_model, d_model
else:
to_insert, to_select = d_model, test_model
self.database.insert([
to_insert(date='2017-01-01', event_id=1, event_group=1, event_count=1, event_version=1),
to_insert(date='2017-01-02', event_id=2, event_group=2, event_count=2, event_version=2)
])
# event_uversion is materialized field. So * won't select it and it will be zero
res = self.database.select('SELECT *, event_uversion FROM $table ORDER BY event_id',
model_class=to_select)
res = [row for row in res]
self.assertEqual(2, len(res))
self.assertDictEqual({
'date': datetime.date(2017, 1, 1),
'event_id': 1,
'event_group': 1,
'event_count': 1,
'event_version': 1,
'event_uversion': 1
}, res[0].to_dict(include_readonly=include_readonly))
self.assertDictEqual({
'date': datetime.date(2017, 1, 2),
'event_id': 2,
'event_group': 2,
'event_count': 2,
'event_version': 2,
'event_uversion': 2
}, res[1].to_dict(include_readonly=include_readonly))
@unittest.skip("Bad support of materialized fields in Distributed tables "
"https://groups.google.com/forum/#!topic/clickhouse/XEYRRwZrsSc")
def test_insert_distributed_select_local(self):
return self._test_insert_select(local_to_distributed=False)
def test_insert_local_select_distributed(self):
return self._test_insert_select(local_to_distributed=True)
def _test_insert_distributed_select_local_no_materialized_fields(self):
class TestModel2(self.TestModel):
event_uversion = UInt8Field(readonly=True)
return self._test_insert_select(local_to_distributed=False, test_model=TestModel2, include_readonly=False)