From de9f64cd3a7aa9525d4a18c20dc1c7b74d61fe87 Mon Sep 17 00:00:00 2001 From: M1ha Date: Thu, 7 Sep 2017 17:44:27 +0500 Subject: [PATCH] Added Merge engine 1) Divided readonly and system flags of Field model. Readonly flag only restricts insert operations, while system flag restricts also create and drop table operations 2) Added Merge engine and tests for it 3) Added docs for Merge engine 4) Added opportunity to make Field readonly. This is useful for "virtual" columns (https://clickhouse.yandex/docs/en/single/index.html#virtual-columns) --- docs/table_engines.md | 13 ++++++ docs/toc.md | 1 + src/infi/clickhouse_orm/database.py | 12 +++--- src/infi/clickhouse_orm/engines.py | 28 ++++++++++++- src/infi/clickhouse_orm/fields.py | 8 ++-- src/infi/clickhouse_orm/models.py | 30 +++++++++++++- src/infi/clickhouse_orm/system_models.py | 1 + tests/test_engines.py | 52 +++++++++++++++++++++++- tests/test_readonly.py | 15 +++---- tests/test_system_models.py | 28 ++++++++++++- 10 files changed, 162 insertions(+), 26 deletions(-) diff --git a/docs/table_engines.md b/docs/table_engines.md index 2f92183..30aa07b 100644 --- a/docs/table_engines.md +++ b/docs/table_engines.md @@ -15,6 +15,7 @@ The following engines are supported by the ORM: - SummingMergeTree / ReplicatedSummingMergeTree - ReplacingMergeTree / ReplicatedReplacingMergeTree - Buffer +- Merge Simple Engines @@ -85,6 +86,18 @@ Then you can insert objects into Buffer model and they will be handled by ClickH suzy = PersonBuffer(first_name='Suzy', last_name='Jones') dan = PersonBuffer(first_name='Dan', last_name='Schwartz') db.insert([dan, suzy]) + + +Merge Engine +------------- + +[ClickHouse docs](https://clickhouse.yandex/docs/en/single/index.html#merge) +A `Merge` engine is only used in conjunction with a `MergeModel`. +This table does not store data itself, but allows reading from any number of other tables simultaneously. So you can't insert in it. 
+The engine parameter is a re2 (similar to PCRE) regular expression; data is read from all tables whose names match it. + + class MergeTable(models.MergeModel): + engine = engines.Merge('^table_prefix') --- diff --git a/docs/toc.md b/docs/toc.md index aa5bb3b..0f83389 100644 --- a/docs/toc.md +++ b/docs/toc.md @@ -36,6 +36,7 @@ * [Engines in the MergeTree Family](table_engines.md#engines-in-the-mergetree-family) * [Data Replication](table_engines.md#data-replication) * [Buffer Engine](table_engines.md#buffer-engine) + * [Merge Engine](table_engines.md#merge-engine) * [Schema Migrations](schema_migrations.md#schema-migrations) * [Writing Migrations](schema_migrations.md#writing-migrations) diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py index 5c816d6..4f94b6b 100644 --- a/src/infi/clickhouse_orm/database.py +++ b/src/infi/clickhouse_orm/database.py @@ -75,16 +75,16 @@ class Database(object): Creates a table for the given model class, if it does not exist already. ''' # TODO check that model has an engine - if model_class.readonly: - raise DatabaseException("You can't create read only table") + if model_class.system: + raise DatabaseException("You can't create system table") self._send(model_class.create_table_sql(self.db_name)) def drop_table(self, model_class): ''' Drops the database table of the given model class, if it exists. 
''' - if model_class.readonly: - raise DatabaseException("You can't drop read only table") + if model_class.system: + raise DatabaseException("You can't drop system table") self._send(model_class.drop_table_sql(self.db_name)) def insert(self, model_instances, batch_size=1000): @@ -103,8 +103,8 @@ class Database(object): return # model_instances is empty model_class = first_instance.__class__ - if first_instance.readonly: - raise DatabaseException("You can't insert into read only table") + if first_instance.readonly or first_instance.system: + raise DatabaseException("You can't insert into read only and system tables") def gen(): buf = BytesIO() diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index caa05c7..a91fa6c 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -1,4 +1,7 @@ from __future__ import unicode_literals + +import six + from .utils import comma_join @@ -118,7 +121,6 @@ class Buffer(Engine): self.min_bytes = min_bytes self.max_bytes = max_bytes - def create_table_sql(self, db_name): # Overriden create_table_sql example: #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' @@ -128,3 +130,27 @@ class Buffer(Engine): self.max_rows, self.min_bytes, self.max_bytes ) return sql + + +class Merge(Engine): + """ + The Merge engine (not to be confused with MergeTree) does not store data itself, + but allows reading from any number of other tables simultaneously. 
+ Writing to a table is not supported + https://clickhouse.yandex/docs/en/single/index.html#document-table_engines/merge + """ + + def __init__(self, table_regex): + assert isinstance(table_regex, six.string_types), "'db_name' parameter must be string" + + self.table_regex = table_regex + + # Use current database as default + self.db_name = 'currentDatabase()' + + def create_table_sql(self): + return "Merge(%s, '%s')" % (self.db_name, self.table_regex) + + def set_db_name(self, db_name): + assert isinstance(db_name, six.string_types), "'db_name' parameter must be string" + self.db_name = db_name diff --git a/src/infi/clickhouse_orm/fields.py b/src/infi/clickhouse_orm/fields.py index a57314e..0cf2543 100644 --- a/src/infi/clickhouse_orm/fields.py +++ b/src/infi/clickhouse_orm/fields.py @@ -16,19 +16,21 @@ class Field(object): class_default = 0 db_type = None - def __init__(self, default=None, alias=None, materialized=None): + def __init__(self, default=None, alias=None, materialized=None, readonly=None): assert (None, None) in {(default, alias), (alias, materialized), (default, materialized)}, \ "Only one of default, alias and materialized parameters can be given" assert alias is None or isinstance(alias, string_types) and alias != "",\ "Alias field must be string field name, if given" assert materialized is None or isinstance(materialized, string_types) and alias != "",\ "Materialized field must be string, if given" + assert readonly is None or type(readonly) is bool, "readonly parameter must be bool if given" self.creation_counter = Field.creation_counter Field.creation_counter += 1 self.default = self.class_default if default is None else default self.alias = alias self.materialized = materialized + self.readonly = bool(self.alias or self.materialized or readonly) def to_python(self, value, timezone_in_use): ''' @@ -75,10 +77,6 @@ class Field(object): else: return self.db_type - @property - def readonly(self): - return bool(self.alias or self.materialized) - 
class StringField(Field): diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 8714447..33cfde2 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -4,9 +4,10 @@ from logging import getLogger from six import with_metaclass import pytz -from .fields import Field +from .fields import Field, StringField from .utils import parse_tsv from .query import QuerySet +from .engines import Merge logger = getLogger('clickhouse_orm') @@ -86,8 +87,13 @@ class Model(with_metaclass(ModelBase)): ''' engine = None + + # Insert operations are restricted for read only models readonly = False + # Create table, drop table, insert operations are restricted for system models + system = False + def __init__(self, **kwargs): ''' Creates a model instance, using keyword arguments as field values. @@ -246,3 +252,25 @@ class BufferModel(Model): engine_str = cls.engine.create_table_sql(db_name) parts.append(engine_str) return ' '.join(parts) + + +class MergeModel(Model): + ''' + Model for Merge engine + Predefines virtual _table column an controls that rows can't be inserted to this table type + https://clickhouse.yandex/docs/en/single/index.html#document-table_engines/merge + ''' + readonly = True + + # Virtual fields can't be inserted into database + _table = StringField(readonly=True) + + def set_database(self, db): + ''' + Gets the `Database` that this model instance belongs to. + Returns `None` unless the instance was read from the database or written to it. 
+ ''' + assert isinstance(self.engine, Merge), "engine must be engines.Merge instance" + res = super(MergeModel, self).set_database(db) + self.engine.set_db_name(db.db_name) + return res diff --git a/src/infi/clickhouse_orm/system_models.py b/src/infi/clickhouse_orm/system_models.py index 7edd902..5ca3efd 100644 --- a/src/infi/clickhouse_orm/system_models.py +++ b/src/infi/clickhouse_orm/system_models.py @@ -20,6 +20,7 @@ class SystemPart(Model): OPERATIONS = frozenset({'DETACH', 'DROP', 'ATTACH', 'FREEZE', 'FETCH'}) readonly = True + system = True database = StringField() # Name of the database where the table that this part belongs to is located. table = StringField() # Name of the table that this part belongs to. diff --git a/tests/test_engines.py b/tests/test_engines.py index ddc3a85..65497ca 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -2,7 +2,7 @@ from __future__ import unicode_literals import unittest from infi.clickhouse_orm.database import Database, DatabaseException -from infi.clickhouse_orm.models import Model +from infi.clickhouse_orm.models import Model, MergeModel from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.engines import * @@ -74,6 +74,56 @@ class EnginesTestCase(unittest.TestCase): engine = Memory() self._create_and_insert(TestModel) + def test_merge(self): + class TestModel1(SampleModel): + engine = TinyLog() + + class TestModel2(SampleModel): + engine = TinyLog() + + class TestMergeModel(MergeModel, SampleModel): + engine = Merge('^testmodel') + + self.database.create_table(TestModel1) + self.database.create_table(TestModel2) + self.database.create_table(TestMergeModel) + + # Insert operations are restricted for this model type + with self.assertRaises(DatabaseException): + self.database.insert([ + TestMergeModel(date='2017-01-01', event_id=23423, event_group=13, event_count=7, event_version=1) + ]) + + # Testing select + self.database.insert([ + TestModel1(date='2017-01-01', event_id=1, event_group=1, 
event_count=1, event_version=1) + ]) + self.database.insert([ + TestModel2(date='2017-01-02', event_id=2, event_group=2, event_count=2, event_version=2) + ]) + # event_uversion is materialized field. So * won't select it and it will be zero + res = self.database.select('SELECT *, event_uversion FROM $table ORDER BY event_id', model_class=TestMergeModel) + res = [row for row in res] + self.assertEqual(2, len(res)) + self.assertDictEqual({ + '_table': 'testmodel1', + 'date': datetime.date(2017, 1, 1), + 'event_id': 1, + 'event_group': 1, + 'event_count': 1, + 'event_version': 1, + 'event_uversion': 1 + }, res[0].to_dict(include_readonly=True)) + self.assertDictEqual({ + '_table': 'testmodel2', + 'date': datetime.date(2017, 1, 2), + 'event_id': 2, + 'event_group': 2, + 'event_count': 2, + 'event_version': 2, + 'event_uversion': 2 + }, res[1].to_dict(include_readonly=True)) + class SampleModel(Model): diff --git a/tests/test_readonly.py b/tests/test_readonly.py index ae3d54f..facbaa0 100644 --- a/tests/test_readonly.py +++ b/tests/test_readonly.py @@ -1,12 +1,7 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals -import unittest -import six -from infi.clickhouse_orm.database import Database, DatabaseException -from infi.clickhouse_orm.models import Model -from infi.clickhouse_orm.fields import * -from infi.clickhouse_orm.engines import * +from infi.clickhouse_orm.database import DatabaseException from .base_test_with_data import * @@ -45,15 +40,15 @@ class ReadonlyTestCase(TestCaseWithData): self.database.insert([m]) def test_create_readonly_table(self): - with self.assertRaises(DatabaseException): - self.database.create_table(ReadOnlyModel) + self.database.create_table(ReadOnlyModel) def test_drop_readonly_table(self): - with self.assertRaises(DatabaseException): - self.database.drop_table(ReadOnlyModel) + self.database.drop_table(ReadOnlyModel) class ReadOnlyModel(Model): readonly = True name = StringField() + date = DateField() + engine = 
MergeTree('date', ('name',)) diff --git a/tests/test_system_models.py b/tests/test_system_models.py index 1a3b49a..54b6650 100644 --- a/tests/test_system_models.py +++ b/tests/test_system_models.py @@ -2,14 +2,34 @@ from __future__ import unicode_literals import unittest from datetime import date import os -import shutil -from infi.clickhouse_orm.database import Database +from infi.clickhouse_orm.database import Database, DatabaseException from infi.clickhouse_orm.engines import * from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.models import Model from infi.clickhouse_orm.system_models import SystemPart +class SystemTest(unittest.TestCase): + def setUp(self): + self.database = Database('test-db') + + def tearDown(self): + self.database.drop_database() + + def test_insert_system(self): + m = SystemPart() + with self.assertRaises(DatabaseException): + self.database.insert([m]) + + def test_create_readonly_table(self): + with self.assertRaises(DatabaseException): + self.database.create_table(SystemTestModel) + + def test_drop_readonly_table(self): + with self.assertRaises(DatabaseException): + self.database.drop_table(SystemTestModel) + + class SystemPartTest(unittest.TestCase): BACKUP_DIRS = ['/var/lib/clickhouse/shadow', '/opt/clickhouse/shadow/'] @@ -75,3 +95,7 @@ class TestTable(Model): date_field = DateField() engine = MergeTree('date_field', ('date_field',)) + + +class SystemTestModel(Model): + system = True