From 0342dc863be01ccbdddc266075cf55286b730eca Mon Sep 17 00:00:00 2001
From: "pv.larkin"
Date: Wed, 29 Nov 2017 14:27:54 +0300
Subject: [PATCH 01/39] Add OR and AND operations for Q objects

---
 src/infi/clickhouse_orm/query.py | 46 +++++++++++++++++++++++++-------
 1 file changed, 37 insertions(+), 9 deletions(-)

diff --git a/src/infi/clickhouse_orm/query.py b/src/infi/clickhouse_orm/query.py
index 0bf764a..dc8f2db 100644
--- a/src/infi/clickhouse_orm/query.py
+++ b/src/infi/clickhouse_orm/query.py
@@ -143,9 +143,23 @@ class FOV(object):
 
 class Q(object):
 
-    def __init__(self, **kwargs):
-        self._fovs = [self._build_fov(k, v) for k, v in six.iteritems(kwargs)]
+    AND_MODE = 'AND'
+    OR_MODE = 'OR'
+
+    def __init__(self, **filter_fields):
+        self._fovs = [self._build_fov(k, v) for k, v in six.iteritems(filter_fields)]
+        self._l_child = None
+        self._r_child = None
         self._negate = False
+        self._mode = self.AND_MODE
+
+    @classmethod
+    def _construct_from(cls, l_child, r_child, mode):
+        q = Q()
+        q._l_child = l_child
+        q._r_child = r_child
+        q._mode = mode
+        return q
 
     def _build_fov(self, key, value):
         if '__' in key:
@@ -155,13 +169,23 @@ class Q(object):
         return FOV(field_name, operator, value)
 
     def to_sql(self, model_cls):
-        if not self._fovs:
-            return '1'
-        sql = ' AND '.join(fov.to_sql(model_cls) for fov in self._fovs)
+        if self._fovs:
+            sql = ' {} '.format(self._mode).join(fov.to_sql(model_cls) for fov in self._fovs)
+        else:
+            if self._l_child and self._r_child:
+                sql = '({}) {} ({})'.format(self._l_child.to_sql(model_cls), self._mode, self._r_child.to_sql(model_cls))
+            else:
+                return '1'
         if self._negate:
             sql = 'NOT (%s)' % sql
         return sql
 
+    def __or__(self, other):
+        return Q._construct_from(self, other, self.OR_MODE)
+
+    def __and__(self, other):
+        return Q._construct_from(self, other, self.AND_MODE)
+
     def __invert__(self):
         q = copy(self)
         q._negate = True
@@ -286,20 +310,24 @@ class QuerySet(object):
         qs._fields = field_names
         return qs
 
-    def filter(self, **kwargs):
+    def filter(self, *q, **filter_fields):
         """
         Returns a copy of this queryset that includes only rows matching the conditions.
+        Adds the given `Q` objects to the query if any are specified.
         """
         qs = copy(self)
-        qs._q = list(self._q) + [Q(**kwargs)]
+        if q:
+            qs._q = list(self._q) + list(q)
+        else:
+            qs._q = list(self._q) + [Q(**filter_fields)]
         return qs
 
-    def exclude(self, **kwargs):
+    def exclude(self, **filter_fields):
         """
         Returns a copy of this queryset that excludes all rows matching the conditions.
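+        e.g. `qs.exclude(first_name='David')` adds the condition `NOT (first_name = 'David')`.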
""" qs = copy(self) - qs._q = list(self._q) + [~Q(**kwargs)] + qs._q = list(self._q) + [~Q(**filter_fields)] return qs def paginate(self, page_num=1, page_size=100): From d553aaf9ebc553ce6c5a280c13f8ef3e22dd2b45 Mon Sep 17 00:00:00 2001 From: "pv.larkin" Date: Wed, 29 Nov 2017 14:52:52 +0300 Subject: [PATCH 02/39] Add between operator --- src/infi/clickhouse_orm/query.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/infi/clickhouse_orm/query.py b/src/infi/clickhouse_orm/query.py index dc8f2db..3d579ae 100644 --- a/src/infi/clickhouse_orm/query.py +++ b/src/infi/clickhouse_orm/query.py @@ -103,6 +103,21 @@ class NotOperator(Operator): return 'NOT (%s)' % self._base_operator.to_sql(model_cls, field_name, value) +class BetweenOperator(Operator): + + def to_sql(self, model_cls, field_name, value): + field = getattr(model_cls, field_name) + value0 = field.to_db_string( + field.to_python(value[0], pytz.utc)) if value[0] is not None or len(str(value[0])) > 0 else None + value1 = field.to_db_string( + field.to_python(value[1], pytz.utc)) if value[1] is not None or len(str(value[1])) > 0 else None + if value0 and value1: + return '%s BETWEEN %s and %s' % (field_name, value0, value1) + if value0 and not value1: + return ' '.join([field_name, '>=', value0]) + if value1 and not value0: + return ' '.join([field_name, '<=', value1]) + # Define the set of builtin operators _operators = {} @@ -116,6 +131,7 @@ register_operator('gt', SimpleOperator('>')) register_operator('gte', SimpleOperator('>=')) register_operator('lt', SimpleOperator('<')) register_operator('lte', SimpleOperator('<=')) +register_operator('between', BetweenOperator()) register_operator('in', InOperator()) register_operator('not_in', NotOperator(InOperator())) register_operator('contains', LikeOperator('%{}%')) @@ -173,7 +189,8 @@ class Q(object): sql = ' {} '.format(self._mode).join(fov.to_sql(model_cls) for fov in self._fovs) else: if self._l_child and self._r_child: - sql = '({}) {} ({})'.format(self._l_child.to_sql(model_cls), self._mode, self._r_child.to_sql(model_cls)) + sql = '({}) {} ({})'.format( + self._l_child.to_sql(model_cls), self._mode, self._r_child.to_sql(model_cls)) else: return '1' if self._negate: From 52d63cff60264c7a3a0e20c56c4cf1a0d6679a44 Mon Sep 17 00:00:00 2001 From: desile Date: Wed, 29 Nov 2017 23:58:22 +0300 Subject: [PATCH 03/39] Add tests demonstrating and checking usage of Q objects for filtration --- tests/test_querysets.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/test_querysets.py b/tests/test_querysets.py index cbbc65d..8f938fc 100644 --- a/tests/test_querysets.py +++ b/tests/test_querysets.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals, print_function import unittest from infi.clickhouse_orm.database import Database +from infi.clickhouse_orm.query import Q from .base_test_with_data import * import logging from datetime import date, datetime @@ -59,6 +60,15 @@ class QuerySetTestCase(TestCaseWithData): self._test_qs(qs.filter(first_name__iendswith='ia'), 3) # case insensitive self._test_qs(qs.filter(first_name__iendswith=''), 100) # empty suffix + def test_filter_with_q_objects(self): + qs = Person.objects_in(self.database) + self._test_qs(qs.filter(Q(first_name='Ciaran')), 2) + self._test_qs(qs.filter(Q(first_name='Ciaran') | Q(first_name='Chelsea')), 3) + self._test_qs(qs.filter(Q(first_name__in=['Warren', 'Whilemina', 'Whitney']) & Q(height__gte=1.7)), 3) + self._test_qs(qs.filter((Q(first_name__in=['Warren', 
'Whilemina', 'Whitney']) & Q(height__gte=1.7) |
+                                 (Q(first_name__in=['Victoria', 'Victor', 'Venus']) & Q(height__lt=1.7)))), 4)
+        self._test_qs(qs.filter(Q(first_name='Elton') & ~Q(last_name='Smith')), 1)
+
     def test_filter_unicode_string(self):
         self.database.insert([
             Person(first_name=u'דונלד', last_name=u'דאק')

From 328b924bd282ad108142656f0d8e0e5f886ab9ed Mon Sep 17 00:00:00 2001
From: desile
Date: Thu, 30 Nov 2017 00:21:59 +0300
Subject: [PATCH 04/39] Update docs for Q objects and new operator

---
 docs/querysets.md | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/docs/querysets.md b/docs/querysets.md
index d27c836..e79fcd3 100644
--- a/docs/querysets.md
+++ b/docs/querysets.md
@@ -26,6 +26,12 @@ It is possible to specify several fields to filter or exclude by:
     >>> qs.conditions_as_sql()
     u"last_name = 'Smith' AND height > 1.75"
 
+For filters with compound conditions you can use `Q` objects inside `filter` with the overloaded operators `&` (AND), `|` (OR) and `~` (NOT):
+
+    >>> qs = Person.objects_in(database).filter((Q(first_name='Ciaran', last_name='Carver') | Q(height__lte=1.8)) & ~Q(first_name='David'))
+    >>> qs.conditions_as_sql()
+    u"((first_name = 'Ciaran' AND last_name = 'Carver') OR height <= 1.8) AND (NOT (first_name = 'David'))"
+
 There are different operators that can be used, by passing `<fieldname>__<operator>=<value>` (two underscores separate the field name from the operator). In case no operator is given, `eq` is used by default. Below are all the supported operators.
 
 | Operator | Equivalent SQL | Comments |
@@ -36,6 +42,7 @@ There are different operators that can be used, by passing `<fieldname>__<opera
 | `gte` | `field >= value` | |
 | `lt` | `field < value` | |
 | `lte` | `field <= value` | |
+| `between` | `field BETWEEN value1 AND value2` | |
 | `in` | `field IN (values)` | See below |
 | `not_in` | `field NOT IN (values)` | See below |
 | `contains` | `field LIKE '%value%'` | For string fields only |

From c98edc4f774ed57ef57a169caff71210ece1ea8e Mon Sep 17 00:00:00 2001
From: desile
Date: Sun, 3 Dec 2017 21:54:58 +0300
Subject: [PATCH 05/39] Added doc for between operator

---
 src/infi/clickhouse_orm/query.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/src/infi/clickhouse_orm/query.py b/src/infi/clickhouse_orm/query.py
index 3d579ae..7c2d84b 100644
--- a/src/infi/clickhouse_orm/query.py
+++ b/src/infi/clickhouse_orm/query.py
@@ -104,6 +104,14 @@ class NotOperator(Operator):
 
 class BetweenOperator(Operator):
+    """
+    An operator that implements BETWEEN.
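+    e.g. `qs.filter(height__between=(1.7, 1.8))`.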
+    Accepts a list or tuple of two elements and generates the SQL condition:
+    - 'BETWEEN value[0] AND value[1]' if value[0] and value[1] are not None and not empty
+    When one of the two limits is missing, BETWEEN is imitated with:
+    - '>= value[0]' if value[1] is None or empty
+    - '<= value[1]' if value[0] is None or empty
+    """
 
     def to_sql(self, model_cls, field_name, value):
         field = getattr(model_cls, field_name)

From 31ee58967fc7a26266b6c10f53d35f763e588376 Mon Sep 17 00:00:00 2001
From: Itai Shirav
Date: Sat, 7 Apr 2018 15:14:14 +0300
Subject: [PATCH 06/39] TRIVIAL fix test

---
 tests/test_engines.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_engines.py b/tests/test_engines.py
index 65497ca..20a1b04 100644
--- a/tests/test_engines.py
+++ b/tests/test_engines.py
@@ -31,7 +31,7 @@ class EnginesTestCase(unittest.TestCase):
 
     def test_merge_tree_with_sampling(self):
         class TestModel(SampleModel):
-            engine = MergeTree('date', ('date', 'event_id', 'event_group'), sampling_expr='intHash32(event_id)')
+            engine = MergeTree('date', ('date', 'event_id', 'event_group', 'intHash32(event_id)'), sampling_expr='intHash32(event_id)')
         self._create_and_insert(TestModel)
 
     def test_merge_tree_with_granularity(self):

From cd8d82c22680dba4da093054bda3dabef3dae09c Mon Sep 17 00:00:00 2001
From: Itai Shirav
Date: Sat, 7 Apr 2018 15:20:33 +0300
Subject: [PATCH 07/39] Update docs

---
 CHANGELOG.md                     |  5 +++++
 docs/class_reference.md          | 10 ++++++----
 src/infi/clickhouse_orm/query.py |  3 +--
 3 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ac840ac..6b716c4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,11 @@
 Change Log
 ==========
 
+Unreleased
+----------
+- Add support for compound filters with Q objects (desile)
+- Add support for BETWEEN operator (desile)
+
 v0.9.8
 ------
 - Bug fix: add field names list explicitly to Database.insert method (anci)

diff --git a/docs/class_reference.md b/docs/class_reference.md
index e56fc7a..7bb9e55 100644
--- a/docs/class_reference.md
+++ b/docs/class_reference.md
@@ -605,16 +605,17 @@ Adds a DISTINCT clause to the query, meaning that any duplicate rows
 in the results will be omitted.
 
 
-#### exclude(**kwargs)
+#### exclude(**filter_fields)
 
 
 Returns a copy of this queryset that excludes all rows matching the conditions.
 
 
-#### filter(**kwargs)
+#### filter(*q, **filter_fields)
 
 
 Returns a copy of this queryset that includes only rows matching the conditions.
+Adds the given `Q` objects to the query if any are specified.
 
 
 #### only(*field_names)
@@ -705,16 +706,17 @@ Adds a DISTINCT clause to the query, meaning that any duplicate rows
 in the results will be omitted.
 
 
-#### exclude(**kwargs)
+#### exclude(**filter_fields)
 
 
 Returns a copy of this queryset that excludes all rows matching the conditions.
 
 
-#### filter(**kwargs)
+#### filter(*q, **filter_fields)
 
 
 Returns a copy of this queryset that includes only rows matching the conditions.
+Adds the given `Q` objects to the query if any are specified.
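+
+For example, `Q` objects can be combined with `|` (OR) and `&` (AND):
+
+    qs.filter(Q(first_name='Ciaran') | Q(first_name='Chelsea'))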
#### group_by(*args)

diff --git a/src/infi/clickhouse_orm/query.py b/src/infi/clickhouse_orm/query.py
index 7c2d84b..487a836 100644
--- a/src/infi/clickhouse_orm/query.py
+++ b/src/infi/clickhouse_orm/query.py
@@ -7,7 +7,6 @@ from .utils import comma_join
 
 # TODO
-# - and/or between Q objects
 # - check that field names are valid
 # - operators for arrays: length, has, empty
@@ -120,7 +119,7 @@ class BetweenOperator(Operator):
         value1 = field.to_db_string(
             field.to_python(value[1], pytz.utc)) if value[1] is not None and len(str(value[1])) > 0 else None
         if value0 and value1:
-            return '%s BETWEEN %s and %s' % (field_name, value0, value1)
+            return '%s BETWEEN %s AND %s' % (field_name, value0, value1)
         if value0 and not value1:
             return ' '.join([field_name, '>=', value0])
         if value1 and not value0:
             return ' '.join([field_name, '<=', value1])

From c7d3fa2c1e0607d409ca3940100bff0f7da06180 Mon Sep 17 00:00:00 2001
From: Itai Shirav
Date: Sat, 7 Apr 2018 18:03:31 +0300
Subject: [PATCH 08/39] use wheels

---
 buildout.cfg | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/buildout.cfg b/buildout.cfg
index 27e9e8b..4872a0d 100644
--- a/buildout.cfg
+++ b/buildout.cfg
@@ -3,7 +3,8 @@ prefer-final = false
 newest = false
 download-cache = .cache
 develop = .
-parts = 
+parts = 
+extensions = buildout.wheel
 
 [project]
 name = infi.clickhouse_orm

From c023ad407d76663b12fe46ffff6b49a66dc1e93b Mon Sep 17 00:00:00 2001
From: Itai Shirav
Date: Sat, 7 Apr 2018 18:13:02 +0300
Subject: [PATCH 09/39] use wheels

---
 buildout.cfg | 1 +
 1 file changed, 1 insertion(+)

diff --git a/buildout.cfg b/buildout.cfg
index 4872a0d..2e10159 100644
--- a/buildout.cfg
+++ b/buildout.cfg
@@ -4,6 +4,7 @@ newest = false
 download-cache = .cache
 develop = .
 parts = 
+relative-paths = true
 extensions = buildout.wheel
 
 [project]

From 7fb05896926acab163a1f373092bf22cc0f3cb4f Mon Sep 17 00:00:00 2001
From: M1ha
Date: Thu, 12 Apr 2018 14:21:46 +0500
Subject: [PATCH 10/39] 1. Added support for custom partitioning
 (https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/)
 2. Added attribute server_version to Database class 3. Changed
 Engine.create_table_sql(), Engine.drop_table_sql(), Model.create_table_sql(),
 Model.drop_table_sql() parameter from db_name to db

---
 docs/class_reference.md                  |  22 +++-
 docs/ref.md                              |   8 +-
 docs/table_engines.md                    |  17 +++
 src/infi/clickhouse_orm/database.py      |  10 +-
 src/infi/clickhouse_orm/engines.py       | 128 +++++++++++++++--------
 src/infi/clickhouse_orm/models.py        |  32 ++----
 src/infi/clickhouse_orm/system_models.py |   3 +-
 tests/test_database.py                   |   2 +-
 tests/test_engines.py                    |  21 +++-
 tests/test_inheritance.py                |   8 +-
 tests/test_system_models.py              |  40 +++++--
 11 files changed, 199 insertions(+), 92 deletions(-)

diff --git a/docs/class_reference.md b/docs/class_reference.md
index 7bb9e55..254b282 100644
--- a/docs/class_reference.md
+++ b/docs/class_reference.md
@@ -24,6 +24,18 @@ created on the ClickHouse server if it does not already exist.
 - `autocreate`: automatically create the database if does not exist (unless in readonly mode).
 
 
+#### server_timezone
+
+
+Contains the [pytz](http://pytz.sourceforge.net/) timezone used on the database server
+
+
+#### server_version
+
+
+Contains the database server's version as a tuple, for example (1, 1, 54310)
+
+
 #### count(model_class, conditions=None)
 
@@ -144,13 +156,13 @@ invalid values will cause a `ValueError` to be raised.
 Unrecognized field names will cause an `AttributeError`.
 
 
-#### Model.create_table_sql(db_name)
+#### Model.create_table_sql(db)
 
 
 Returns the SQL command for creating a table for this model.
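+For example, for the tutorial's `Person` model this returns something like:
+
+    CREATE TABLE IF NOT EXISTS `my_db`.`person` (first_name String, ...) ENGINE = MergeTree(...)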
-#### Model.drop_table_sql(db_name)
+#### Model.drop_table_sql(db)
 
 
 Returns the SQL command for deleting this model's table.
 
@@ -233,13 +245,13 @@ invalid values will cause a `ValueError` to be raised.
 Unrecognized field names will cause an `AttributeError`.
 
 
-#### BufferModel.create_table_sql(db_name)
+#### BufferModel.create_table_sql(db)
 
 
 Returns the SQL command for creating a table for this model.
 
 
-#### BufferModel.drop_table_sql(db_name)
+#### BufferModel.drop_table_sql(db)
 
 
 Returns the SQL command for deleting this model's table.
 
@@ -497,7 +509,7 @@ Extends Engine
 
 Extends Engine
 
-#### MergeTree(date_col, key_cols, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None)
+#### MergeTree(date_col=None, order_by=(), sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None)
 
 ### Buffer

diff --git a/docs/ref.md b/docs/ref.md
index f789578..a298d04 100644
--- a/docs/ref.md
+++ b/docs/ref.md
@@ -119,12 +119,12 @@ invalid values will cause a `ValueError` to be raised.
 Unrecognized field names will cause an `AttributeError`.
 
 
-#### Model.create_table_sql(db_name)
+#### Model.create_table_sql(db)
 
 
 Returns the SQL command for creating a table for this model.
 
-#### Model.drop_table_sql(db_name)
+#### Model.drop_table_sql(db)
 
 
 Returns the SQL command for deleting this model's table.
 
@@ -197,12 +197,12 @@ invalid values will cause a `ValueError` to be raised.
 Unrecognized field names will cause an `AttributeError`.
 
 
-#### BufferModel.create_table_sql(db_name)
+#### BufferModel.create_table_sql(db)
 
 
 Returns the SQL command for creating a table for this model.
 
-#### BufferModel.drop_table_sql(db_name)
+#### BufferModel.drop_table_sql(db)
 
 
 Returns the SQL command for deleting this model's table.

diff --git a/docs/table_engines.md b/docs/table_engines.md
index 30aa07b..cf5b0c1 100644
--- a/docs/table_engines.md
+++ b/docs/table_engines.md
@@ -54,6 +54,23 @@ For a `ReplacingMergeTree` you can optionally specify the version column:
 
     engine = engines.ReplacingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), ver_col='Version')
 
+### Custom partitioning
+
+ClickHouse has supported [custom partitioning](https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/) expressions since version 1.1.54310.
+You can use custom partitioning with any MergeTree family engine.
+To set up custom partitioning:
+* skip the date_col (first) constructor parameter, or set it to None
+* pass the names of the key columns in the order_by (second) constructor parameter
+* add the partition_key parameter - a tuple of expressions from which the partitions are built
+
+Standard partitioning by a date column can be expressed using the toYYYYMM(date) function.
+
+Example:
+
+    engine = engines.ReplacingMergeTree(order_by=('OrderID', 'EventDate', 'BannerID'), ver_col='Version',
+                                        partition_key=('toYYYYMM(EventDate)', 'BannerID'))
+
+
 ### Data Replication
 
 Any of the above engines can be converted to a replicated engine (e.g.
`ReplicatedMergeTree`) by adding two parameters, `replica_table_path` and `replica_name`:

diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py
index cb47d02..0777714 100644
--- a/src/infi/clickhouse_orm/database.py
+++ b/src/infi/clickhouse_orm/database.py
@@ -56,6 +56,7 @@ class Database(object):
             self.db_exists = False
             self.create_database()
         self.server_timezone = self._get_server_timezone()
+        self.server_version = self._get_server_version()
 
     def create_database(self):
         '''
@@ -77,7 +78,7 @@ class Database(object):
         # TODO check that model has an engine
         if model_class.system:
             raise DatabaseException("You can't create system table")
-        self._send(model_class.create_table_sql(self.db_name))
+        self._send(model_class.create_table_sql(self))
 
     def drop_table(self, model_class):
         '''
@@ -85,7 +86,7 @@ class Database(object):
         '''
         if model_class.system:
             raise DatabaseException("You can't drop system table")
-        self._send(model_class.drop_table_sql(self.db_name))
+        self._send(model_class.drop_table_sql(self))
 
     def insert(self, model_instances, batch_size=1000):
         '''
@@ -285,6 +286,11 @@ class Database(object):
             logger.exception('Cannot determine server timezone, assuming UTC')
             return pytz.utc
 
+    def _get_server_version(self, as_tuple=True):
+        r = self._send('SELECT version();')
+        ver = r.text
+        return tuple(int(n) for n in ver.split('.')) if as_tuple else ver
+
     def _is_connection_readonly(self):
         r = self._send("SELECT value FROM system.settings WHERE name = 'readonly'")
         return r.text.strip() != '0'

diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py
index 57ca1ce..f6aa062 100644
--- a/src/infi/clickhouse_orm/engines.py
+++ b/src/infi/clickhouse_orm/engines.py
@@ -1,89 +1,141 @@
 from __future__ import unicode_literals
+import logging
 import six
 from .utils import comma_join
 
+logger = logging.getLogger('clickhouse_orm')
+
 
 class Engine(object):
 
-    def create_table_sql(self):
+    def create_table_sql(self, db):
         raise NotImplementedError()   # pragma: no cover
 
 
 class TinyLog(Engine):
 
-    def create_table_sql(self):
+    def create_table_sql(self, db):
         return 'TinyLog'
 
 
 class Log(Engine):
 
-    def create_table_sql(self):
+    def create_table_sql(self, db):
         return 'Log'
 
 
 class Memory(Engine):
 
-    def create_table_sql(self):
+    def create_table_sql(self, db):
         return 'Memory'
 
 
 class MergeTree(Engine):
 
-    def __init__(self, date_col, key_cols, sampling_expr=None,
-                 index_granularity=8192, replica_table_path=None, replica_name=None):
-        assert type(key_cols) in (list, tuple), 'key_cols must be a list or tuple'
+    def __init__(self, date_col=None, order_by=(), sampling_expr=None,
+                 index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None):
+        assert type(order_by) in (list, tuple), 'order_by must be a list or tuple'
+        assert date_col is None or isinstance(date_col, six.string_types), 'date_col must be string if present'
+        assert partition_key is None or type(partition_key) in (list, tuple),\
+            'partition_key must be tuple or list if present'
+
+        # The old and the new table engine syntaxes conflict with each other,
+        # so make sure that at least one of date_col and partition_key is given.
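+        # e.g. old syntax: MergeTree('date', ('date', 'event_id', 'event_group'))
+        # e.g. new syntax: MergeTree(order_by=('date', 'event_id', 'event_group'),
+        #                            partition_key=('toYYYYMM(date)',))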
+        assert date_col or partition_key, "You must set either date_col or partition_key"
         self.date_col = date_col
+        self.partition_key = partition_key if partition_key else ('toYYYYMM(`%s`)' % date_col,)
+
+        self.order_by = order_by
-        self.key_cols = key_cols
         self.sampling_expr = sampling_expr
         self.index_granularity = index_granularity
         self.replica_table_path = replica_table_path
         self.replica_name = replica_name
         # TODO verify that both replica fields are either present or missing
 
-    def create_table_sql(self):
+    # `key_cols` was renamed to `order_by` to match the new syntax;
+    # the old name is kept as a deprecated alias
+    @property
+    def key_cols(self):
+        logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead')
+        return self.order_by
+
+    @key_cols.setter
+    def key_cols(self, value):
+        logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead')
+        self.order_by = value
+
+    def create_table_sql(self, db):
         name = self.__class__.__name__
         if self.replica_name:
             name = 'Replicated' + name
-        params = self._build_sql_params()
-        return '%s(%s)' % (name, comma_join(params))
 
-    def _build_sql_params(self):
+        # In ClickHouse 1.1.54310 custom partitioning key was introduced
+        # https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
+        # Let's check the version and use the new syntax if available
+        if db.server_version >= (1, 1, 54310):
+            partition_sql = "PARTITION BY %s ORDER BY %s" \
+                            % ('(%s)' % comma_join(self.partition_key), '(%s)' % comma_join(self.order_by))
+
+            if self.sampling_expr:
+                partition_sql += " SAMPLE BY %s" % self.sampling_expr
+
+            partition_sql += " SETTINGS index_granularity=%d" % self.index_granularity
+
+        elif not self.date_col:
+            # Can't import it globally due to circular import
+            from infi.clickhouse_orm.database import DatabaseException
+            raise DatabaseException("Custom partitioning is not supported before ClickHouse 1.1.54310. "
+                                    "Please update your server or use date_col syntax. "
+                                    "https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/")
+        else:
+            partition_sql = ''
+
+        params = self._build_sql_params(db)
+        return '%s(%s) %s' % (name, comma_join(params), partition_sql)
+
+    def _build_sql_params(self, db):
         params = []
         if self.replica_name:
             params += ["'%s'" % self.replica_table_path, "'%s'" % self.replica_name]
+
+        # In ClickHouse 1.1.54310 custom partitioning key was introduced
+        # https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
+        # These parameters are processed in create_table_sql directly.
+        # The new syntax does not work in previous ClickHouse versions,
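+        # so on an older server the old-style parameters are appended below:
+        #   date_col, [sampling_expr,] (order_by), index_granularity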
+ if db.server_version < (1, 1, 54310): + params.append(self.date_col) + if self.sampling_expr: + params.append(self.sampling_expr) + params.append('(%s)' % comma_join(self.order_by)) + params.append(str(self.index_granularity)) + return params class CollapsingMergeTree(MergeTree): - def __init__(self, date_col, key_cols, sign_col, sampling_expr=None, + def __init__(self, date_col, order_by, sign_col, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None): - super(CollapsingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name) + super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) self.sign_col = sign_col - def _build_sql_params(self): - params = super(CollapsingMergeTree, self)._build_sql_params() + def _build_sql_params(self, db): + params = super(CollapsingMergeTree, self)._build_sql_params(db) params.append(self.sign_col) return params class SummingMergeTree(MergeTree): - def __init__(self, date_col, key_cols, summing_cols=None, sampling_expr=None, + def __init__(self, date_col, order_by, summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None): - super(SummingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name) + super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple' self.summing_cols = summing_cols - def _build_sql_params(self): - params = super(SummingMergeTree, self)._build_sql_params() + def _build_sql_params(self, db): + params = super(SummingMergeTree, self)._build_sql_params(db) if self.summing_cols: params.append('(%s)' % comma_join(self.summing_cols)) return params @@ -91,13 +143,13 @@ class SummingMergeTree(MergeTree): class ReplacingMergeTree(MergeTree): - def __init__(self, date_col, key_cols, ver_col=None, sampling_expr=None, + def __init__(self, date_col, order_by, ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None): - super(ReplacingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name) + super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) self.ver_col = ver_col - def _build_sql_params(self): - params = super(ReplacingMergeTree, self)._build_sql_params() + def _build_sql_params(self, db): + params = super(ReplacingMergeTree, self)._build_sql_params(db) if self.ver_col: params.append(self.ver_col) return params @@ -121,11 +173,11 @@ class Buffer(Engine): self.min_bytes = min_bytes self.max_bytes = max_bytes - def create_table_sql(self, db_name): + def create_table_sql(self, db): # Overriden create_table_sql example: - #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' + # sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % ( - db_name, self.main_model.table_name(), self.num_layers, + db.db_name, self.main_model.table_name(), self.num_layers, self.min_time, self.max_time, self.min_rows, self.max_rows, self.min_bytes, self.max_bytes ) @@ -145,13 +197,5 @@ class Merge(Engine): self.table_regex = table_regex - # 
Use current database as default - self.db_name = None - - def create_table_sql(self): - db_name = ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()' - return "Merge(%s, '%s')" % (db_name, self.table_regex) - - def set_db_name(self, db_name): - assert isinstance(db_name, six.string_types), "'db_name' parameter must be string" - self.db_name = db_name + def create_table_sql(self, db): + return "Merge(`%s`, '%s')" % (db.db_name, self.table_regex) diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index c56b821..38f794f 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -168,25 +168,25 @@ class Model(with_metaclass(ModelBase)): return cls.__name__.lower() @classmethod - def create_table_sql(cls, db_name): + def create_table_sql(cls, db): ''' Returns the SQL command for creating a table for this model. ''' - parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db_name, cls.table_name())] + parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())] cols = [] for name, field in cls._fields: cols.append(' %s %s' % (name, field.get_sql())) parts.append(',\n'.join(cols)) parts.append(')') - parts.append('ENGINE = ' + cls.engine.create_table_sql()) + parts.append('ENGINE = ' + cls.engine.create_table_sql(db)) return '\n'.join(parts) @classmethod - def drop_table_sql(cls, db_name): + def drop_table_sql(cls, db): ''' Returns the SQL command for deleting this model's table. ''' - return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db_name, cls.table_name()) + return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db.db_name, cls.table_name()) @classmethod def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None): @@ -250,12 +250,13 @@ class Model(with_metaclass(ModelBase)): class BufferModel(Model): @classmethod - def create_table_sql(cls, db_name): + def create_table_sql(cls, db): ''' Returns the SQL command for creating a table for this model. ''' - parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.engine.main_model.table_name())] - engine_str = cls.engine.create_table_sql(db_name) + parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db.db_name, cls.table_name(), db.db_name, + cls.engine.main_model.table_name())] + engine_str = cls.engine.create_table_sql(db) parts.append(engine_str) return ' '.join(parts) @@ -271,18 +272,7 @@ class MergeModel(Model): # Virtual fields can't be inserted into database _table = StringField(readonly=True) - def set_database(self, db): - ''' - Gets the `Database` that this model instance belongs to. - Returns `None` unless the instance was read from the database or written to it. 
- ''' - assert isinstance(self.engine, Merge), "engine must be engines.Merge instance" - res = super(MergeModel, self).set_database(db) - self.engine.set_db_name(db.db_name) - return res - @classmethod - def create_table_sql(cls, db_name): + def create_table_sql(cls, db): assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance" - cls.engine.set_db_name(db_name) - return super(MergeModel, cls).create_table_sql(db_name) + return super(MergeModel, cls).create_table_sql(db) diff --git a/src/infi/clickhouse_orm/system_models.py b/src/infi/clickhouse_orm/system_models.py index 5ca3efd..a4cdf1e 100644 --- a/src/infi/clickhouse_orm/system_models.py +++ b/src/infi/clickhouse_orm/system_models.py @@ -68,7 +68,8 @@ class SystemPart(Model): """ operation = operation.upper() assert operation in self.OPERATIONS, "operation must be in [%s]" % comma_join(self.OPERATIONS) - sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (self._database.db_name, self.table, operation, self.partition) + + sql = "ALTER TABLE `%s`.`%s` %s PARTITION %s" % (self._database.db_name, self.table, operation, self.partition) if from_part is not None: sql += " FROM %s" % from_part self._database.raw(sql, settings=settings, stream=False) diff --git a/tests/test_database.py b/tests/test_database.py index 0214f36..c757f67 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -135,8 +135,8 @@ class DatabaseTestCase(TestCaseWithData): Database(self.database.db_name, username='default', password='wrong') def test_nonexisting_db(self): - db = Database('db_not_here', autocreate=False) with self.assertRaises(DatabaseException): + db = Database('db_not_here', autocreate=False) db.create_table(Person) def test_preexisting_db(self): diff --git a/tests/test_engines.py b/tests/test_engines.py index 20a1b04..835c5d3 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -1,6 +1,8 @@ from __future__ import unicode_literals import unittest +from infi.clickhouse_orm.system_models import SystemPart + from infi.clickhouse_orm.database import Database, DatabaseException from infi.clickhouse_orm.models import Model, MergeModel from infi.clickhouse_orm.fields import * @@ -41,8 +43,12 @@ class EnginesTestCase(unittest.TestCase): def test_replicated_merge_tree(self): engine = MergeTree('date', ('date', 'event_id', 'event_group'), replica_table_path='/clickhouse/tables/{layer}-{shard}/hits', replica_name='{replica}') - expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)" - self.assertEquals(engine.create_table_sql(), expected) + # In ClickHouse 1.1.54310 custom partitioning key was introduced and new syntax is used + if self.database.server_version >= (1, 1, 54310): + expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}') PARTITION BY (toYYYYMM(`date`)) ORDER BY (date, event_id, event_group) SETTINGS index_granularity=8192" + else: + expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)" + self.assertEquals(engine.create_table_sql(self.database), expected) def test_collapsing_merge_tree(self): class TestModel(SampleModel): @@ -124,6 +130,17 @@ class EnginesTestCase(unittest.TestCase): 'event_uversion': 2 }, res[1].to_dict(include_readonly=True)) + def test_custom_partitioning(self): + class TestModel(SampleModel): + engine = MergeTree( + order_by=('date', 'event_id', 'event_group'), + partition_key=('toYYYYMM(date)', 
'event_group') + ) + self._create_and_insert(TestModel) + parts = list(SystemPart.get(self.database)) + self.assertEqual(1, len(parts)) + self.assertEqual('(201701, 13)', parts[0].partition) + class SampleModel(Model): diff --git a/tests/test_inheritance.py b/tests/test_inheritance.py index f209995..bdd570f 100644 --- a/tests/test_inheritance.py +++ b/tests/test_inheritance.py @@ -3,6 +3,7 @@ import unittest import datetime import pytz +from infi.clickhouse_orm.database import Database from infi.clickhouse_orm.models import Model from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.engines import * @@ -19,9 +20,10 @@ class InheritanceTestCase(unittest.TestCase): self.assertFieldNames(Model2, ['date_field', 'int_field', 'float_field']) def test_create_table_sql(self): - sql1 = ParentModel.create_table_sql('default') - sql2 = Model1.create_table_sql('default') - sql3 = Model2.create_table_sql('default') + default_db = Database('default') + sql1 = ParentModel.create_table_sql(default_db) + sql2 = Model1.create_table_sql(default_db) + sql3 = Model2.create_table_sql(default_db) self.assertNotEqual(sql1, sql2) self.assertNotEqual(sql1, sql3) self.assertNotEqual(sql2, sql3) diff --git a/tests/test_system_models.py b/tests/test_system_models.py index 54b6650..b49cc52 100644 --- a/tests/test_system_models.py +++ b/tests/test_system_models.py @@ -1,7 +1,10 @@ from __future__ import unicode_literals + import unittest from datetime import date + import os + from infi.clickhouse_orm.database import Database, DatabaseException from infi.clickhouse_orm.engines import * from infi.clickhouse_orm.fields import * @@ -37,7 +40,9 @@ class SystemPartTest(unittest.TestCase): def setUp(self): self.database = Database('test-db') self.database.create_table(TestTable) + self.database.create_table(CustomPartitionedTable) self.database.insert([TestTable(date_field=date.today())]) + self.database.insert([CustomPartitionedTable(date_field=date.today(), group_field=13)]) def tearDown(self): self.database.drop_database() @@ -51,40 +56,46 @@ class SystemPartTest(unittest.TestCase): def test_get_all(self): parts = SystemPart.get(self.database) - self.assertEqual(len(list(parts)), 1) + self.assertEqual(len(list(parts)), 2) def test_get_active(self): parts = list(SystemPart.get_active(self.database)) - self.assertEqual(len(parts), 1) + self.assertEqual(len(parts), 2) parts[0].detach() - self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) + self.assertEqual(len(list(SystemPart.get_active(self.database))), 1) def test_get_conditions(self): parts = list(SystemPart.get(self.database, conditions="table='testtable'")) self.assertEqual(len(parts), 1) - parts = list(SystemPart.get(self.database, conditions=u"table='othertable'")) + parts = list(SystemPart.get(self.database, conditions=u"table='custompartitionedtable'")) + self.assertEqual(len(parts), 1) + parts = list(SystemPart.get(self.database, conditions=u"table='invalidtable'")) self.assertEqual(len(parts), 0) def test_attach_detach(self): parts = list(SystemPart.get_active(self.database)) - self.assertEqual(len(parts), 1) - parts[0].detach() + self.assertEqual(len(parts), 2) + for p in parts: + p.detach() self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) - parts[0].attach() - self.assertEqual(len(list(SystemPart.get_active(self.database))), 1) + for p in parts: + p.attach() + self.assertEqual(len(list(SystemPart.get_active(self.database))), 2) def test_drop(self): parts = list(SystemPart.get_active(self.database)) - 
parts[0].drop()
+        for p in parts:
+            p.drop()
         self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
 
     def test_freeze(self):
         parts = list(SystemPart.get(self.database))
         # There can be other backups in the folder
         prev_backups = set(self._get_backups())
-        parts[0].freeze()
+        for p in parts:
+            p.freeze()
         backups = set(self._get_backups())
-        self.assertEqual(len(backups), len(prev_backups) + 1)
+        self.assertEqual(len(backups), len(prev_backups) + 2)
 
     def test_fetch(self):
         # TODO Not tested, as I have no replication set
@@ -97,5 +108,12 @@ class TestTable(Model):
 
     engine = MergeTree('date_field', ('date_field',))
 
 
+class CustomPartitionedTable(Model):
+    date_field = DateField()
+    group_field = UInt32Field()
+
+    engine = MergeTree(order_by=('date_field', 'group_field'), partition_key=('toYYYYMM(date_field)', 'group_field'))
+
+
 class SystemTestModel(Model):
     system = True

From 0927136ffd82fc43aca08c82861075bf7e373cc2 Mon Sep 17 00:00:00 2001
From: M1ha
Date: Fri, 20 Apr 2018 12:38:36 +0500
Subject: [PATCH 11/39] 1) Added a test on
 https://github.com/Infinidat/infi.clickhouse_orm/issues/66 2) Fixed issue

---
 src/infi/clickhouse_orm/models.py |  4 +++
 tests/test_datetime_fields.py     | 42 +++++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+)
 create mode 100644 tests/test_datetime_fields.py

diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py
index c56b821..fd67382 100644
--- a/src/infi/clickhouse_orm/models.py
+++ b/src/infi/clickhouse_orm/models.py
@@ -57,6 +57,10 @@ class ModelBase(type):
         # Enums
         if db_type.startswith('Enum'):
             return orm_fields.BaseEnumField.create_ad_hoc_field(db_type)
+        # DateTime with timezone
+        if db_type.startswith('DateTime('):
+            # Some functions return DateTimeField with timezone in brackets
+            return orm_fields.DateTimeField()
         # Arrays
         if db_type.startswith('Array'):
             inner_field = cls.create_ad_hoc_field(db_type[6 : -1])

diff --git a/tests/test_datetime_fields.py b/tests/test_datetime_fields.py
new file mode 100644
index 0000000..ebe1e6c
--- /dev/null
+++ b/tests/test_datetime_fields.py
@@ -0,0 +1,42 @@
+from __future__ import unicode_literals
+import unittest
+
+from infi.clickhouse_orm.database import Database
+from infi.clickhouse_orm.models import Model
+from infi.clickhouse_orm.fields import *
+from infi.clickhouse_orm.engines import *
+
+
+class DateFieldsTest(unittest.TestCase):
+
+    def setUp(self):
+        self.database = Database('test-db')
+        self.database.create_table(ModelWithDate)
+
+    def tearDown(self):
+        self.database.drop_database()
+
+    def test_ad_hoc_model(self):
+        self.database.insert([
+            ModelWithDate(date_field='2016-08-30', datetime_field='2016-08-30 03:50:00'),
+            ModelWithDate(date_field='2016-08-31', datetime_field='2016-08-31 01:30:00')
+        ])
+
+        # toStartOfHour returns DateTime('Asia/Yekaterinburg') in my case, so I test it here too
+        query = 'SELECT toStartOfHour(datetime_field) as hour_start, * from $db.modelwithdate ORDER BY date_field'
+        results = list(self.database.select(query))
+        self.assertEquals(len(results), 2)
+        self.assertEquals(results[0].date_field, datetime.date(2016, 8, 30))
+        self.assertEquals(results[0].datetime_field, datetime.datetime(2016, 8, 30, 3, 50, 0, tzinfo=pytz.UTC))
+        self.assertEquals(results[0].hour_start, datetime.datetime(2016, 8, 30, 3, 0, 0, tzinfo=pytz.UTC))
+        self.assertEquals(results[1].date_field, datetime.date(2016, 8, 31))
+        self.assertEquals(results[1].datetime_field, datetime.datetime(2016, 8, 31, 1, 30, 0, tzinfo=pytz.UTC))
+        self.assertEquals(results[1].hour_start,
datetime.datetime(2016, 8, 31, 1, 0, 0, tzinfo=pytz.UTC)) + + +class ModelWithDate(Model): + + date_field = DateField() + datetime_field = DateTimeField() + + engine = MergeTree('date_field', ('date_field',)) From 92ab21c99af1ee08a2d07e0c9161c7ed653e82bb Mon Sep 17 00:00:00 2001 From: Ivan Ladelshchikov Date: Thu, 2 Nov 2017 13:14:21 +0500 Subject: [PATCH 12/39] cross-version testing with tox --- .gitignore | 5 ++++- tox.ini | 12 ++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) create mode 100644 tox.ini diff --git a/.gitignore b/.gitignore index ae23e11..0e9fa7b 100644 --- a/.gitignore +++ b/.gitignore @@ -58,4 +58,7 @@ buildout.in src/infi/clickhouse_orm/__version__.py bootstrap.py -htmldocs/ \ No newline at end of file +htmldocs/ + +# tox +.tox/ diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..e012173 --- /dev/null +++ b/tox.ini @@ -0,0 +1,12 @@ +[tox] +envlist = py27, py35 + +[testenv] +deps = + nose + flake8 + +commands = + {envpython} -m compileall -q src/ tests/ + # {envbindir}/flake8 src/ tests/ --max-line-length=120 + nosetests -v {posargs} From 57112f9de600c820dcdb1fa4297f188e61bd179e Mon Sep 17 00:00:00 2001 From: Ivan Ladelshchikov Date: Mon, 6 Nov 2017 21:58:10 +0500 Subject: [PATCH 13/39] add ServerError exception --- src/infi/clickhouse_orm/database.py | 47 +++++++++++++++++++++++++++-- tests/test_database.py | 14 +++++++-- tests/test_readonly.py | 21 +++++++++---- 3 files changed, 70 insertions(+), 12 deletions(-) diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py index cb47d02..92bcb09 100644 --- a/src/infi/clickhouse_orm/database.py +++ b/src/infi/clickhouse_orm/database.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals +import re import requests from collections import namedtuple from .models import ModelBase @@ -24,6 +25,46 @@ class DatabaseException(Exception): pass +class ServerError(DatabaseException): + """ + Raised when a server returns an error. + """ + def __init__(self, message): + self.code = None + processed = self.get_error_code_msg(message) + if processed: + self.code, self.message = processed + else: + # just skip custom init + # if non-standard message format + super(ServerError, self).__init__(message) + + ERROR_PATTERN = re.compile(r''' + Code:\ (?P\d+), + \ e\.displayText\(\)\ =\ (?P[^ \n]+):\ (?P.+?), + \ e.what\(\)\ =\ (?P[^ \n]+) + ''', re.VERBOSE | re.DOTALL) + + @classmethod + def get_error_code_msg(cls, full_error_message): + """ + Extract the code and message of the exception that clickhouse-server generated. 
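+        For example:
+            Code: 81, e.displayText() = DB::Exception: Database db_not_here doesn't exist, e.what() = DB::Exception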
+ + See the list of error codes here: + https://github.com/yandex/ClickHouse/blob/master/dbms/src/Common/ErrorCodes.cpp + """ + match = cls.ERROR_PATTERN.match(full_error_message) + if match: + # assert match.group('type1') == match.group('type2') + return int(match.group('code')), match.group('msg') + + return 0, full_error_message + + def __str__(self): + if self.code is not None: + return "{} ({})".format(self.message, self.code) + + class Database(object): ''' Database instances connect to a specific ClickHouse database for running queries, @@ -250,7 +291,7 @@ class Database(object): params = self._build_params(settings) r = requests.post(self.db_url, params=params, data=data, stream=stream) if r.status_code != 200: - raise DatabaseException(r.text) + raise ServerError(r.text) return r def _build_params(self, settings): @@ -281,8 +322,8 @@ class Database(object): try: r = self._send('SELECT timezone()') return pytz.timezone(r.text.strip()) - except DatabaseException: - logger.exception('Cannot determine server timezone, assuming UTC') + except ServerError as e: + logger.exception('Cannot determine server timezone (%s), assuming UTC', e) return pytz.utc def _is_connection_readonly(self): diff --git a/tests/test_database.py b/tests/test_database.py index 0214f36..8e7c5c4 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -2,7 +2,7 @@ from __future__ import unicode_literals import unittest -from infi.clickhouse_orm.database import Database, DatabaseException +from infi.clickhouse_orm.database import ServerError from .base_test_with_data import * @@ -131,14 +131,22 @@ class DatabaseTestCase(TestCaseWithData): self.assertEqual(results, "Whitney\tDurham\t1977-09-15\t1.72\nWhitney\tScott\t1971-07-04\t1.7\n") def test_invalid_user(self): - with self.assertRaises(DatabaseException): + with self.assertRaises(ServerError) as cm: Database(self.database.db_name, username='default', password='wrong') + exc = cm.exception + self.assertEqual(exc.code, 193) + self.assertEqual(exc.message, 'Wrong password for user default') + def test_nonexisting_db(self): db = Database('db_not_here', autocreate=False) - with self.assertRaises(DatabaseException): + with self.assertRaises(ServerError) as cm: db.create_table(Person) + exc = cm.exception + self.assertEqual(exc.code, 81) + self.assertEqual(exc.message, "Database db_not_here doesn't exist") + def test_preexisting_db(self): db = Database(self.database.db_name, autocreate=False) db.count(Person) diff --git a/tests/test_readonly.py b/tests/test_readonly.py index facbaa0..b1aa595 100644 --- a/tests/test_readonly.py +++ b/tests/test_readonly.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals -from infi.clickhouse_orm.database import DatabaseException +from infi.clickhouse_orm.database import DatabaseException, ServerError from .base_test_with_data import * @@ -12,22 +12,31 @@ class ReadonlyTestCase(TestCaseWithData): orig_database = self.database try: self.database = Database(orig_database.db_name, username=username, readonly=True) - with self.assertRaises(DatabaseException): + with self.assertRaises(ServerError) as cm: self._insert_and_check(self._sample_data(), len(data)) + self._check_db_readonly_err(cm.exception) + self.assertEquals(self.database.count(Person), 100) list(self.database.select('SELECT * from $table', Person)) - with self.assertRaises(DatabaseException): + with self.assertRaises(ServerError) as cm: self.database.drop_table(Person) - with self.assertRaises(DatabaseException): + 
self._check_db_readonly_err(cm.exception) + + with self.assertRaises(ServerError) as cm: self.database.drop_database() - except DatabaseException as e: - if 'Unknown user' in six.text_type(e): + self._check_db_readonly_err(cm.exception) + except ServerError as e: + if e.code == 192 and e.message.startswith('Unknown user'): raise unittest.SkipTest('Database user "%s" is not defined' % username) else: raise finally: self.database = orig_database + def _check_db_readonly_err(self, exc): + self.assertEqual(exc.code, 164) + self.assertEqual(exc.message, 'Cannot execute query in readonly mode') + def test_readonly_db_with_default_user(self): self._test_readonly_db('default') From 3268019216cacb9f3cdf2e7fcaadae14180c25df Mon Sep 17 00:00:00 2001 From: Ivan Ladelshchikov Date: Tue, 21 Nov 2017 16:29:11 +0500 Subject: [PATCH 14/39] _fields and _writable_fields are OrderedDicts --- src/infi/clickhouse_orm/database.py | 2 +- src/infi/clickhouse_orm/migrations.py | 8 +++-- src/infi/clickhouse_orm/models.py | 42 +++++++++++++++--------- src/infi/clickhouse_orm/system_models.py | 2 +- tests/test_inheritance.py | 2 +- 5 files changed, 34 insertions(+), 22 deletions(-) diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py index 92bcb09..776e93f 100644 --- a/src/infi/clickhouse_orm/database.py +++ b/src/infi/clickhouse_orm/database.py @@ -148,7 +148,7 @@ class Database(object): raise DatabaseException("You can't insert into read only and system tables") fields_list = ','.join( - ['`%s`' % name for name, _ in first_instance._writable_fields]) + ['`%s`' % name for name in first_instance.fields(writable=True)]) def gen(): buf = BytesIO() diff --git a/src/infi/clickhouse_orm/migrations.py b/src/infi/clickhouse_orm/migrations.py index 324db74..125097f 100644 --- a/src/infi/clickhouse_orm/migrations.py +++ b/src/infi/clickhouse_orm/migrations.py @@ -6,6 +6,7 @@ from .engines import MergeTree from .utils import escape from six.moves import zip +from six import iteritems import logging logger = logging.getLogger('migrations') @@ -65,7 +66,7 @@ class AlterTable(Operation): table_fields = dict(self._get_table_fields(database)) # Identify fields that were deleted from the model - deleted_fields = set(table_fields.keys()) - set(name for name, field in self.model_class._fields) + deleted_fields = set(table_fields.keys()) - set(self.model_class.fields()) for name in deleted_fields: logger.info(' Drop column %s', name) self._alter_table(database, 'DROP COLUMN %s' % name) @@ -73,7 +74,7 @@ class AlterTable(Operation): # Identify fields that were added to the model prev_name = None - for name, field in self.model_class._fields: + for name, field in iteritems(self.model_class.fields()): if name not in table_fields: logger.info(' Add column %s', name) assert prev_name, 'Cannot add a column to the beginning of the table' @@ -89,7 +90,8 @@ class AlterTable(Operation): # The order of class attributes can be changed any time, so we can't count on it # Secondly, MATERIALIZED and ALIAS fields are always at the end of the DESC, so we can't expect them to save # attribute position. 
Watch https://github.com/Infinidat/infi.clickhouse_orm/issues/47 - model_fields = {name: field.get_sql(with_default_expression=False) for name, field in self.model_class._fields} + model_fields = {name: field.get_sql(with_default_expression=False) + for name, field in iteritems(self.model_class.fields())} for field_name, field_sql in self._get_table_fields(database): # All fields must have been created and dropped by this moment assert field_name in model_fields, 'Model fields and table columns in disagreement' diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index c56b821..62b22a5 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -1,8 +1,9 @@ from __future__ import unicode_literals import sys +from collections import OrderedDict from logging import getLogger -from six import with_metaclass, reraise +from six import with_metaclass, reraise, iteritems import pytz from .fields import Field, StringField @@ -23,15 +24,19 @@ class ModelBase(type): def __new__(cls, name, bases, attrs): new_cls = super(ModelBase, cls).__new__(cls, str(name), bases, attrs) # Collect fields from parent classes - base_fields = [] + base_fields = dict() for base in bases: if isinstance(base, ModelBase): - base_fields += base._fields + base_fields.update(base._fields) + + fields = base_fields + # Build a list of fields, in the order they were listed in the class - fields = base_fields + [item for item in attrs.items() if isinstance(item[1], Field)] - fields.sort(key=lambda item: item[1].creation_counter) - setattr(new_cls, '_fields', fields) - setattr(new_cls, '_writable_fields', [f for f in fields if not f[1].readonly]) + fields.update({n: f for n, f in iteritems(attrs) if isinstance(f, Field)}) + fields = sorted(iteritems(fields), key=lambda item: item[1].creation_counter) + + setattr(new_cls, '_fields', OrderedDict(fields)) + setattr(new_cls, '_writable_fields', OrderedDict([f for f in fields if not f[1].readonly])) return new_cls @classmethod @@ -107,14 +112,14 @@ class Model(with_metaclass(ModelBase)): self._database = None # Assign field values from keyword arguments - for name, value in kwargs.items(): + for name, value in iteritems(kwargs): field = self.get_field(name) if field: setattr(self, name, value) else: raise AttributeError('%s does not have a field called %s' % (self.__class__.__name__, name)) # Assign default values for fields not included in the keyword arguments - for name, field in self._fields: + for name, field in iteritems(self.fields()): if name not in kwargs: setattr(self, name, field.default) @@ -174,7 +179,7 @@ class Model(with_metaclass(ModelBase)): ''' parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db_name, cls.table_name())] cols = [] - for name, field in cls._fields: + for name, field in iteritems(cls.fields()): cols.append(' %s %s' % (name, field.get_sql())) parts.append(',\n'.join(cols)) parts.append(')') @@ -201,7 +206,7 @@ class Model(with_metaclass(ModelBase)): - `database`: if given, sets the database that this instance belongs to. ''' from six import next - field_names = field_names or [name for name, field in cls._fields] + field_names = field_names or list(cls.fields()) values = iter(parse_tsv(line)) kwargs = {} for name in field_names: @@ -221,8 +226,8 @@ class Model(with_metaclass(ModelBase)): - `include_readonly`: if false, returns only fields that can be inserted into database. 
''' data = self.__dict__ - fields = self._fields if include_readonly else self._writable_fields - return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in fields) + fields = self.fields(writable=not include_readonly) + return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in iteritems(fields)) def to_dict(self, include_readonly=True, field_names=None): ''' @@ -231,13 +236,13 @@ class Model(with_metaclass(ModelBase)): - `include_readonly`: if false, returns only fields that can be inserted into database. - `field_names`: an iterable of field names to return (optional) ''' - fields = self._fields if include_readonly else self._writable_fields + fields = self.fields(writable=not include_readonly) if field_names is not None: - fields = [f for f in fields if f[0] in field_names] + fields = [f for f in fields if f in field_names] data = self.__dict__ - return {name: data[name] for name, field in fields} + return {name: data[name] for name in fields} @classmethod def objects_in(cls, database): @@ -246,6 +251,11 @@ class Model(with_metaclass(ModelBase)): ''' return QuerySet(cls, database) + @classmethod + def fields(cls, writable=False): + # noinspection PyProtectedMember,PyUnresolvedReferences + return cls._writable_fields if writable else cls._fields + class BufferModel(Model): diff --git a/src/infi/clickhouse_orm/system_models.py b/src/infi/clickhouse_orm/system_models.py index 5ca3efd..62d6b68 100644 --- a/src/infi/clickhouse_orm/system_models.py +++ b/src/infi/clickhouse_orm/system_models.py @@ -126,7 +126,7 @@ class SystemPart(Model): assert isinstance(conditions, string_types), "conditions must be a string" if conditions: conditions += " AND" - field_names = ','.join([f[0] for f in cls._fields]) + field_names = ','.join(cls.fields()) return database.select("SELECT %s FROM %s WHERE %s database='%s'" % (field_names, cls.table_name(), conditions, database.db_name), model_class=cls) diff --git a/tests/test_inheritance.py b/tests/test_inheritance.py index f209995..9f5cc43 100644 --- a/tests/test_inheritance.py +++ b/tests/test_inheritance.py @@ -11,7 +11,7 @@ from infi.clickhouse_orm.engines import * class InheritanceTestCase(unittest.TestCase): def assertFieldNames(self, model_class, names): - self.assertEquals(names, [name for name, field in model_class._fields]) + self.assertEquals(names, list(model_class.fields())) def test_field_inheritance(self): self.assertFieldNames(ParentModel, ['date_field', 'int_field']) From a5f2fa4d76c133747dee3c9226651e53ca863091 Mon Sep 17 00:00:00 2001 From: Ivan Ladelshchikov Date: Tue, 21 Nov 2017 16:30:25 +0500 Subject: [PATCH 15/39] add Distributed engine --- docs/table_engines.md | 1 + src/infi/clickhouse_orm/engines.py | 60 +++++++++++++ src/infi/clickhouse_orm/models.py | 81 ++++++++++++++++- tests/test_engines.py | 139 ++++++++++++++++++++++++++++- 4 files changed, 277 insertions(+), 4 deletions(-) diff --git a/docs/table_engines.md b/docs/table_engines.md index 30aa07b..50ed056 100644 --- a/docs/table_engines.md +++ b/docs/table_engines.md @@ -16,6 +16,7 @@ The following engines are supported by the ORM: - ReplacingMergeTree / ReplicatedReplacingMergeTree - Buffer - Merge +- Distributed Simple Engines diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index 57ca1ce..945c8c4 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -155,3 +155,63 @@ class Merge(Engine): def set_db_name(self, db_name): assert isinstance(db_name, 
six.string_types), "'db_name' parameter must be string" self.db_name = db_name + + +class Distributed(Engine): + """ + The Distributed engine by itself does not store data, + but allows distributed query processing on multiple servers. + Reading is automatically parallelized. + During a read, the table indexes on remote servers are used, if there are any. + + See full documentation here + https://clickhouse.yandex/docs/en/table_engines/distributed.html + """ + def __init__(self, cluster, table=None, db_name=None, sharding_key=None): + """ + :param cluster: what cluster to access data from + :param table: underlying table that actually stores data. + If you are not specifying any table here, ensure that it can be inferred + from your model's superclass (see models.DistributedModel.fix_engine_table) + :param db_name: which database to access data from + By default it is 'currentDatabase()' + :param sharding_key: how to distribute data among shards when inserting + straightly into Distributed table, optional + """ + self.cluster = cluster + self.table = table + self.db_name = db_name + self.sharding_key = sharding_key + + @property + def table_name(self): + # TODO: circular import is bad + from .models import ModelBase + + table = self.table + + if isinstance(table, ModelBase): + return table.table_name() + + return table + + def set_db_name(self, db_name): + assert isinstance(db_name, six.string_types), "'db_name' parameter must be string" + self.db_name = db_name + + def create_table_sql(self): + name = self.__class__.__name__ + params = self._build_sql_params() + return '%s(%s)' % (name, ', '.join(params)) + + def _build_sql_params(self): + db_name = ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()' + + if self.table_name is None: + raise ValueError("Cannot create {} engine: specify an underlying table".format( + self.__class__.__name__)) + + params = [self.cluster, db_name, self.table_name] + if self.sharding_key: + params.append(self.sharding_key) + return params diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 62b22a5..6b44c37 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -9,7 +9,7 @@ import pytz from .fields import Field, StringField from .utils import parse_tsv from .query import QuerySet -from .engines import Merge +from .engines import Merge, Distributed logger = getLogger('clickhouse_orm') @@ -296,3 +296,82 @@ class MergeModel(Model): assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance" cls.engine.set_db_name(db_name) return super(MergeModel, cls).create_table_sql(db_name) + + +# TODO: base class for models that require specific engine + + +class DistributedModel(Model): + """ + Model for Distributed engine + """ + + def set_database(self, db): + assert isinstance(self.engine, Distributed), "engine must be engines.Distributed instance" + res = super(DistributedModel, self).set_database(db) + self.engine.set_db_name(db.db_name) + return res + + @classmethod + def fix_engine_table(cls): + """ + Remember: Distributed table does not store any data, just provides distributed access to it. + + So if we define a model with engine that has no defined table for data storage + (see FooDistributed below), that table cannot be successfully created. + This routine can automatically fix engine's storage table by finding the first + non-distributed model among your model's superclasses. + + >>> class Foo(Model): + ... id = UInt8Field(1) + ... 
+        >>> class FooDistributed(Foo, DistributedModel):
+        ...     engine = Distributed('my_cluster')
+        ...
+        >>> FooDistributed.engine.table
+        None
+        >>> FooDistributed.fix_engine_table()
+        >>> FooDistributed.engine.table
+
+
+        However if you prefer a more explicit way of doing things,
+        you can always mention the Foo model twice without bothering with any fixes:
+
+        >>> class FooDistributedVerbose(Foo, DistributedModel):
+        ...     engine = Distributed('my_cluster', Foo)
+        >>> FooDistributedVerbose.engine.table
+
+
+        See tests.test_engines:DistributedTestCase for more examples
+        """
+
+        # apply only when engine has no table defined
+        if cls.engine.table_name:
+            return
+
+        # find out all the superclasses of the Model that store any data
+        storage_models = [b for b in cls.__bases__ if issubclass(b, Model)
+                          and not issubclass(b, DistributedModel)]
+        if not storage_models:
+            raise TypeError("When defining Distributed engine without the table_name "
+                            "ensure that your model has a parent model")
+
+        if len(storage_models) > 1:
+            raise TypeError("When defining Distributed engine without the table_name "
+                            "ensure that your model has exactly one non-distributed superclass")
+
+        # enable correct SQL for engine
+        cls.engine.table = storage_models[0]
+
+    @classmethod
+    def create_table_sql(cls, db_name):
+        assert isinstance(cls.engine, Distributed), "engine must be engines.Distributed instance"
+        cls.engine.set_db_name(db_name)
+
+        cls.fix_engine_table()
+
+        parts = [
+            'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` AS `{0}`.`{2}`'.format(
+                db_name, cls.table_name(), cls.engine.table_name),
+            'ENGINE = ' + cls.engine.create_table_sql()]
+        return '\n'.join(parts)
diff --git a/tests/test_engines.py b/tests/test_engines.py
index 20a1b04..30cca75 100644
--- a/tests/test_engines.py
+++ b/tests/test_engines.py
@@ -1,8 +1,8 @@
 from __future__ import unicode_literals
 import unittest
 
-from infi.clickhouse_orm.database import Database, DatabaseException
-from infi.clickhouse_orm.models import Model, MergeModel
+from infi.clickhouse_orm.database import Database, DatabaseException, ServerError
+from infi.clickhouse_orm.models import Model, MergeModel, DistributedModel
 from infi.clickhouse_orm.fields import *
 from infi.clickhouse_orm.engines import *
 
@@ -10,7 +10,7 @@ import logging
 logging.getLogger("requests").setLevel(logging.WARNING)
 
 
-class EnginesTestCase(unittest.TestCase):
+class _EnginesHelperTestCase(unittest.TestCase):
     def setUp(self):
         self.database = Database('test-db')
 
@@ -18,6 +18,8 @@ class EnginesTestCase(unittest.TestCase):
     def tearDown(self):
         self.database.drop_database()
 
+
+class EnginesTestCase(_EnginesHelperTestCase):
     def _create_and_insert(self, model_class):
         self.database.create_table(model_class)
         self.database.insert([
@@ -133,3 +135,134 @@ class SampleModel(Model):
     event_count = UInt16Field()
     event_version = Int8Field()
     event_uversion = UInt8Field(materialized='abs(event_version)')
+
+
+class DistributedTestCase(_EnginesHelperTestCase):
+    def test_without_table_name(self):
+        engine = Distributed('my_cluster')
+
+        with self.assertRaises(ValueError) as cm:
+            engine.create_table_sql()
+
+        exc = cm.exception
+        self.assertEqual(str(exc), 'Cannot create Distributed engine: specify an underlying table')
+
+    def test_with_table_name(self):
+        engine = Distributed('my_cluster', 'foo')
+        sql = engine.create_table_sql()
+        self.assertEqual(sql, 'Distributed(my_cluster, currentDatabase(), foo)')
+
+    class TestModel(SampleModel):
+        engine = TinyLog()
+
+    def _create_distributed(self, shard_name, underlying=TestModel):
+        class
TestDistributedModel(DistributedModel, underlying): + engine = Distributed(shard_name, underlying) + + self.database.create_table(underlying) + self.database.create_table(TestDistributedModel) + return TestDistributedModel + + def test_bad_cluster_name(self): + d_model = self._create_distributed('cluster_name') + with self.assertRaises(ServerError) as cm: + self.database.count(d_model) + + exc = cm.exception + self.assertEqual(exc.code, 170) + self.assertEqual(exc.message, "Requested cluster 'cluster_name' not found") + + def test_verbose_engine_two_superclasses(self): + class TestModel2(SampleModel): + engine = Log() + + class TestDistributedModel(DistributedModel, self.TestModel, TestModel2): + engine = Distributed('test_shard_localhost', self.TestModel) + + self.database.create_table(self.TestModel) + self.database.create_table(TestDistributedModel) + self.assertEqual(self.database.count(TestDistributedModel), 0) + + def test_minimal_engine(self): + class TestDistributedModel(DistributedModel, self.TestModel): + engine = Distributed('test_shard_localhost') + + self.database.create_table(self.TestModel) + self.database.create_table(TestDistributedModel) + + self.assertEqual(self.database.count(TestDistributedModel), 0) + + def test_minimal_engine_two_superclasses(self): + class TestModel2(SampleModel): + engine = Log() + + class TestDistributedModel(DistributedModel, self.TestModel, TestModel2): + engine = Distributed('test_shard_localhost') + + self.database.create_table(self.TestModel) + with self.assertRaises(TypeError) as cm: + self.database.create_table(TestDistributedModel) + + exc = cm.exception + self.assertEqual(str(exc), 'When defining Distributed engine without the table_name ensure ' + 'that your model has exactly one non-distributed superclass') + + def test_minimal_engine_no_superclasses(self): + class TestDistributedModel(DistributedModel): + engine = Distributed('test_shard_localhost') + + self.database.create_table(self.TestModel) + with self.assertRaises(TypeError) as cm: + self.database.create_table(TestDistributedModel) + + exc = cm.exception + self.assertEqual(str(exc), 'When defining Distributed engine without the table_name ensure ' + 'that your model has a parent model') + + def _test_insert_select(self, local_to_distributed, test_model=TestModel, include_readonly=True): + d_model = self._create_distributed('test_shard_localhost', underlying=test_model) + + if local_to_distributed: + to_insert, to_select = test_model, d_model + else: + to_insert, to_select = d_model, test_model + + self.database.insert([ + to_insert(date='2017-01-01', event_id=1, event_group=1, event_count=1, event_version=1), + to_insert(date='2017-01-02', event_id=2, event_group=2, event_count=2, event_version=2) + ]) + # event_uversion is materialized field. 
So * won't select it and it will be zero + res = self.database.select('SELECT *, event_uversion FROM $table ORDER BY event_id', + model_class=to_select) + res = [row for row in res] + self.assertEqual(2, len(res)) + self.assertDictEqual({ + 'date': datetime.date(2017, 1, 1), + 'event_id': 1, + 'event_group': 1, + 'event_count': 1, + 'event_version': 1, + 'event_uversion': 1 + }, res[0].to_dict(include_readonly=include_readonly)) + self.assertDictEqual({ + 'date': datetime.date(2017, 1, 2), + 'event_id': 2, + 'event_group': 2, + 'event_count': 2, + 'event_version': 2, + 'event_uversion': 2 + }, res[1].to_dict(include_readonly=include_readonly)) + + @unittest.skip("Bad support of materialized fields in Distributed tables " + "https://groups.google.com/forum/#!topic/clickhouse/XEYRRwZrsSc") + def test_insert_distributed_select_local(self): + return self._test_insert_select(local_to_distributed=False) + + def test_insert_local_select_distributed(self): + return self._test_insert_select(local_to_distributed=True) + + def _test_insert_distributed_select_local_no_materialized_fields(self): + class TestModel2(self.TestModel): + event_uversion = UInt8Field(readonly=True) + + return self._test_insert_select(local_to_distributed=False, test_model=TestModel2, include_readonly=False) From 99de0f66379301cc753f0f89a1d9bfda633361cd Mon Sep 17 00:00:00 2001 From: Ivan Ladelshchikov Date: Wed, 14 Mar 2018 12:47:46 +0400 Subject: [PATCH 16/39] tests: fix sampling expression for v1.1.54310+ see https://github.com/yandex/ClickHouse/blob/master/CHANGELOG.md#clickhouse-release-1154310-2017-11-01 and the commit that made the change https://github.com/yandex/ClickHouse/commit/75c65c7b59be5620e1e4db305eb4e879531f37f3#diff-fa7377955af063c4fa16e20f2595937bR120 --- tests/test_engines.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_engines.py b/tests/test_engines.py index 30cca75..8818815 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -33,7 +33,9 @@ class EnginesTestCase(_EnginesHelperTestCase): def test_merge_tree_with_sampling(self): class TestModel(SampleModel): - engine = MergeTree('date', ('date', 'event_id', 'event_group', 'intHash32(event_id)'), sampling_expr='intHash32(event_id)') + engine = MergeTree('date', + ('date', 'event_id', 'event_group', 'intHash32(event_id)'), + sampling_expr='intHash32(event_id)') self._create_and_insert(TestModel) def test_merge_tree_with_granularity(self): From 6673841bf9581050cdc7558055874c51768e179a Mon Sep 17 00:00:00 2001 From: Ivan Ladelshchikov Date: Wed, 14 Mar 2018 13:01:41 +0400 Subject: [PATCH 17/39] tests: fix readonly error messages for v1.1.54335+ the commit that made the changes https://github.com/yandex/ClickHouse/commit/59aa1359c866aefbf6690dde31295c264d96301a --- tests/test_readonly.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/test_readonly.py b/tests/test_readonly.py index b1aa595..62a2bf5 100644 --- a/tests/test_readonly.py +++ b/tests/test_readonly.py @@ -20,11 +20,11 @@ class ReadonlyTestCase(TestCaseWithData): list(self.database.select('SELECT * from $table', Person)) with self.assertRaises(ServerError) as cm: self.database.drop_table(Person) - self._check_db_readonly_err(cm.exception) + self._check_db_readonly_err(cm.exception, drop_table=True) with self.assertRaises(ServerError) as cm: self.database.drop_database() - self._check_db_readonly_err(cm.exception) + self._check_db_readonly_err(cm.exception, drop_table=True) except ServerError as e: if e.code == 192 and 
e.message.startswith('Unknown user'): raise unittest.SkipTest('Database user "%s" is not defined' % username) @@ -33,9 +33,12 @@ class ReadonlyTestCase(TestCaseWithData): finally: self.database = orig_database - def _check_db_readonly_err(self, exc): + def _check_db_readonly_err(self, exc, drop_table=None): self.assertEqual(exc.code, 164) - self.assertEqual(exc.message, 'Cannot execute query in readonly mode') + if drop_table: + self.assertEqual(exc.message, 'Cannot drop table in readonly mode') + else: + self.assertEqual(exc.message, 'Cannot insert into table in readonly mode') def test_readonly_db_with_default_user(self): self._test_readonly_db('default') From d0aba55b0c34240067f71c060e59cdf2d7cc0f17 Mon Sep 17 00:00:00 2001 From: Ivan Ladelshchikov Date: Fri, 6 Apr 2018 13:09:48 +0400 Subject: [PATCH 18/39] tests: fix Distributed with bad cluster name for v1.1.54370 --- tests/test_engines.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_engines.py b/tests/test_engines.py index 8818815..47b4cd6 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -166,8 +166,8 @@ class DistributedTestCase(_EnginesHelperTestCase): return TestDistributedModel def test_bad_cluster_name(self): - d_model = self._create_distributed('cluster_name') with self.assertRaises(ServerError) as cm: + d_model = self._create_distributed('cluster_name') self.database.count(d_model) exc = cm.exception From f0e0c8035a69f222401a2378e3125fb928c33db6 Mon Sep 17 00:00:00 2001 From: Ivan Ladelshchikov Date: Fri, 6 Apr 2018 13:45:05 +0400 Subject: [PATCH 19/39] add instructions to test with tox --- docs/contributing.md | 4 ++++ tox.ini | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/contributing.md b/docs/contributing.md index cb64e57..c173cb9 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -30,6 +30,10 @@ To see test coverage information run: bin/nosetests --with-coverage --cover-package=infi.clickhouse_orm +To test with tox, ensure that the setup.py is present (otherwise run `bin/buildout buildout:develop= setup.py`) and run: + + pip install tox + tox --- diff --git a/tox.ini b/tox.ini index e012173..81299c8 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py27, py35 +envlist = py27, py35, pypy [testenv] deps = From 723bd0354809fd9efaafedb76175d72b3bc6208b Mon Sep 17 00:00:00 2001 From: Itai Shirav Date: Sat, 7 Apr 2018 15:14:14 +0300 Subject: [PATCH 20/39] TRIVIAL fix test --- tests/test_engines.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_engines.py b/tests/test_engines.py index 47b4cd6..510ca04 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -33,9 +33,7 @@ class EnginesTestCase(_EnginesHelperTestCase): def test_merge_tree_with_sampling(self): class TestModel(SampleModel): - engine = MergeTree('date', - ('date', 'event_id', 'event_group', 'intHash32(event_id)'), - sampling_expr='intHash32(event_id)') + engine = MergeTree('date', ('date', 'event_id', 'event_group', 'intHash32(event_id)'), sampling_expr='intHash32(event_id)') self._create_and_insert(TestModel) def test_merge_tree_with_granularity(self): From 6fa6786a9ccb6243427e0bb0aca8525a29e94346 Mon Sep 17 00:00:00 2001 From: Itai Shirav Date: Sat, 21 Apr 2018 12:10:30 +0300 Subject: [PATCH 21/39] Update docs --- CHANGELOG.md | 3 + docs/class_reference.md | 168 ++++++++++++++++++++++++++++++ docs/toc.md | 2 + scripts/generate_ref.py | 2 +- src/infi/clickhouse_orm/models.py | 5 + 5 files changed, 179 insertions(+), 1 
deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6b716c4..f16c12a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,9 @@ Unreleased
 ----------
 - Add support for compound filters with Q objects (desile)
 - Add support for BETWEEN operator (desile)
+- Distributed engine support (tsionyx)
+- `_fields` and `_writable_fields` are OrderedDicts - note that this might break backwards compatibility (tsionyx)
+- Improve error messages returned from the database with the `ServerError` class (tsionyx)
 
 v0.9.8
 ------
diff --git a/docs/class_reference.md b/docs/class_reference.md
index 7bb9e55..ea75f33 100644
--- a/docs/class_reference.md
+++ b/docs/class_reference.md
@@ -156,6 +156,14 @@ Returns the SQL command for creating a table for this model.
 Returns the SQL command for deleting this model's table.
 
 
+#### Model.fields(writable=False)
+
+
+Returns an `OrderedDict` of the model's fields (from name to `Field` instance).
+If `writable` is true, only writable fields are included.
+Callers should not modify the dictionary.
+
+
 #### Model.from_tsv(line, field_names=None, timezone_in_use=UTC, database=None)
 
 
@@ -245,6 +253,14 @@ Returns the SQL command for creating a table for this model.
 Returns the SQL command for deleting this model's table.
 
 
+#### BufferModel.fields(writable=False)
+
+
+Returns an `OrderedDict` of the model's fields (from name to `Field` instance).
+If `writable` is true, only writable fields are included.
+Callers should not modify the dictionary.
+
+
 #### BufferModel.from_tsv(line, field_names=None, timezone_in_use=UTC, database=None)
 
 
@@ -309,6 +325,132 @@ Returns the instance's column values as a tab-separated line. A newline is not i
 - `include_readonly`: if false, returns only fields that can be inserted into database.
 
 
+### DistributedModel
+
+Extends Model
+
+
+Model for Distributed engine
+
+#### DistributedModel(**kwargs)
+
+
+Creates a model instance, using keyword arguments as field values.
+Since values are immediately converted to their Pythonic type,
+invalid values will cause a `ValueError` to be raised.
+Unrecognized field names will cause an `AttributeError`.
+
+
+#### DistributedModel.create_table_sql(db_name)
+
+
+#### DistributedModel.drop_table_sql(db_name)
+
+
+Returns the SQL command for deleting this model's table.
+
+
+#### DistributedModel.fields(writable=False)
+
+
+Returns an `OrderedDict` of the model's fields (from name to `Field` instance).
+If `writable` is true, only writable fields are included.
+Callers should not modify the dictionary.
+
+
+#### DistributedModel.fix_engine_table()
+
+
+Remember: Distributed table does not store any data, just provides distributed access to it.
+
+So if we define a model with engine that has no defined table for data storage
+(see FooDistributed below), that table cannot be successfully created.
+This routine can automatically fix engine's storage table by finding the first
+non-distributed model among your model's superclasses.
+
+>>> class Foo(Model):
+...     id = UInt8Field(1)
+...
+>>> class FooDistributed(Foo, DistributedModel):
+...     engine = Distributed('my_cluster')
+...
+>>> FooDistributed.engine.table
+None
+>>> FooDistributed.fix_engine_table()
+>>> FooDistributed.engine.table
+
+
+However if you prefer a more explicit way of doing things,
+you can always mention the Foo model twice without bothering with any fixes:
+
+>>> class FooDistributedVerbose(Foo, DistributedModel):
+...
engine = Distributed('my_cluster', Foo) +>>> FooDistributedVerbose.engine.table + + +See tests.test_engines:DistributedTestCase for more examples + + +#### DistributedModel.from_tsv(line, field_names=None, timezone_in_use=UTC, database=None) + + +Create a model instance from a tab-separated line. The line may or may not include a newline. +The `field_names` list must match the fields defined in the model, but does not have to include all of them. +If omitted, it is assumed to be the names of all fields in the model, in order of definition. + +- `line`: the TSV-formatted data. +- `field_names`: names of the model fields in the data. +- `timezone_in_use`: the timezone to use when parsing dates and datetimes. +- `database`: if given, sets the database that this instance belongs to. + + +#### get_database() + + +Gets the `Database` that this model instance belongs to. +Returns `None` unless the instance was read from the database or written to it. + + +#### get_field(name) + + +Gets a `Field` instance given its name, or `None` if not found. + + +#### DistributedModel.objects_in(database) + + +Returns a `QuerySet` for selecting instances of this model class. + + +#### set_database(db) + + +#### DistributedModel.table_name() + + +Returns the model's database table name. By default this is the +class name converted to lowercase. Override this if you want to use +a different table name. + + +#### to_dict(include_readonly=True, field_names=None) + + +Returns the instance's column values as a dict. + +- `include_readonly`: if false, returns only fields that can be inserted into database. +- `field_names`: an iterable of field names to return (optional) + + +#### to_tsv(include_readonly=True) + + +Returns the instance's column values as a tab-separated line. A newline is not included. + +- `include_readonly`: if false, returns only fields that can be inserted into database. + + infi.clickhouse_orm.fields -------------------------- @@ -525,6 +667,32 @@ https://clickhouse.yandex/docs/en/single/index.html#document-table_engines/merge #### Merge(table_regex) +### Distributed + +Extends Engine + + +The Distributed engine by itself does not store data, +but allows distributed query processing on multiple servers. +Reading is automatically parallelized. +During a read, the table indexes on remote servers are used, if there are any. + +See full documentation here +https://clickhouse.yandex/docs/en/table_engines/distributed.html + +#### Distributed(cluster, table=None, db_name=None, sharding_key=None) + + +:param cluster: what cluster to access data from +:param table: underlying table that actually stores data. 
+If you are not specifying any table here, ensure that it can be inferred +from your model's superclass (see models.DistributedModel.fix_engine_table) +:param db_name: which database to access data from +By default it is 'currentDatabase()' +:param sharding_key: how to distribute data among shards when inserting +straightly into Distributed table, optional + + ### CollapsingMergeTree Extends MergeTree diff --git a/docs/toc.md b/docs/toc.md index 3575c3f..6b4a1b1 100644 --- a/docs/toc.md +++ b/docs/toc.md @@ -58,6 +58,7 @@ * [infi.clickhouse_orm.models](class_reference.md#infi.clickhouse_orm.models) * [Model](class_reference.md#model) * [BufferModel](class_reference.md#buffermodel) + * [DistributedModel](class_reference.md#distributedmodel) * [infi.clickhouse_orm.fields](class_reference.md#infi.clickhouse_orm.fields) * [Field](class_reference.md#field) * [StringField](class_reference.md#stringfield) @@ -89,6 +90,7 @@ * [MergeTree](class_reference.md#mergetree) * [Buffer](class_reference.md#buffer) * [Merge](class_reference.md#merge) + * [Distributed](class_reference.md#distributed) * [CollapsingMergeTree](class_reference.md#collapsingmergetree) * [SummingMergeTree](class_reference.md#summingmergetree) * [ReplacingMergeTree](class_reference.md#replacingmergetree) diff --git a/scripts/generate_ref.py b/scripts/generate_ref.py index d2863fd..6a89d1c 100644 --- a/scripts/generate_ref.py +++ b/scripts/generate_ref.py @@ -132,7 +132,7 @@ if __name__ == '__main__': print '===============' print module_doc([database.Database, database.DatabaseException]) - module_doc([models.Model, models.BufferModel]) + module_doc([models.Model, models.BufferModel, models.DistributedModel]) module_doc([fields.Field] + all_subclasses(fields.Field), False) module_doc([engines.Engine] + all_subclasses(engines.Engine), False) module_doc([query.QuerySet, query.AggregateQuerySet]) diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 6b44c37..9e5fe86 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -253,6 +253,11 @@ class Model(with_metaclass(ModelBase)): @classmethod def fields(cls, writable=False): + ''' + Returns an `OrderedDict` of the model's fields (from name to `Field` instance). + If `writable` is true, only writable fields are included. + Callers should not modify the dictionary. 
+        '''
         # noinspection PyProtectedMember,PyUnresolvedReferences
         return cls._writable_fields if writable else cls._fields
 

From 66eda2214c24c78239fd6ed59824f90835af90fb Mon Sep 17 00:00:00 2001
From: Itai Shirav 
Date: Sat, 21 Apr 2018 13:14:15 +0300
Subject: [PATCH 22/39] Make tests pass
---
 src/infi/clickhouse_orm/database.py |  8 ++++++--
 src/infi/clickhouse_orm/engines.py  | 21 ++++++---------------
 src/infi/clickhouse_orm/models.py   | 10 +++-------
 tests/test_engines.py               | 17 ++++-------------
 4 files changed, 19 insertions(+), 37 deletions(-)

diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py
index c6610bb..aa53b29 100644
--- a/src/infi/clickhouse_orm/database.py
+++ b/src/infi/clickhouse_orm/database.py
@@ -328,8 +328,12 @@ class Database(object):
         return pytz.utc
 
     def _get_server_version(self, as_tuple=True):
-        r = self._send('SELECT version();')
-        ver = r.text
+        try:
+            r = self._send('SELECT version();')
+            ver = r.text
+        except ServerError as e:
+            logger.exception('Cannot determine server version (%s), assuming 1.1.0', e)
+            ver = '1.1.0'
         return tuple(int(n) for n in ver.split('.')) if as_tuple else ver
 
     def _is_connection_readonly(self):
diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py
index d0714a0..97e0890 100644
--- a/src/infi/clickhouse_orm/engines.py
+++ b/src/infi/clickhouse_orm/engines.py
@@ -197,7 +197,7 @@ class Merge(Engine):
         self.table_regex = table_regex
 
     def create_table_sql(self, db):
-        self.db_name = None
+        return "Merge(`%s`, '%s')" % (db.db_name, self.table_regex)
 
 
 class Distributed(Engine):
@@ -210,20 +210,17 @@ class Distributed(Engine):
     See full documentation here
     https://clickhouse.yandex/docs/en/table_engines/distributed.html
     """
-    def __init__(self, cluster, table=None, db_name=None, sharding_key=None):
+    def __init__(self, cluster, table=None, sharding_key=None):
         """
         :param cluster: what cluster to access data from
         :param table: underlying table that actually stores data.
If you are not specifying any table here, ensure that it can be inferred from your model's superclass (see models.DistributedModel.fix_engine_table) - :param db_name: which database to access data from - By default it is 'currentDatabase()' :param sharding_key: how to distribute data among shards when inserting straightly into Distributed table, optional """ self.cluster = cluster self.table = table - self.db_name = db_name self.sharding_key = sharding_key @property @@ -238,23 +235,17 @@ class Distributed(Engine): return table - def set_db_name(self, db_name): - assert isinstance(db_name, six.string_types), "'db_name' parameter must be string" - self.db_name = db_name - - def create_table_sql(self): + def create_table_sql(self, db): name = self.__class__.__name__ - params = self._build_sql_params() + params = self._build_sql_params(db) return '%s(%s)' % (name, ', '.join(params)) - def _build_sql_params(self): - db_name = ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()' - + def _build_sql_params(self, db): if self.table_name is None: raise ValueError("Cannot create {} engine: specify an underlying table".format( self.__class__.__name__)) - params = [self.cluster, db_name, self.table_name] + params = ["`%s`" % p for p in [self.cluster, db.db_name, self.table_name]] if self.sharding_key: params.append(self.sharding_key) return params diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index fb6a286..675a2ce 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -304,7 +304,6 @@ class DistributedModel(Model): def set_database(self, db): assert isinstance(self.engine, Distributed), "engine must be engines.Distributed instance" res = super(DistributedModel, self).set_database(db) - self.engine.set_db_name(db.db_name) return res @classmethod @@ -359,16 +358,13 @@ class DistributedModel(Model): cls.engine.table = storage_models[0] @classmethod - def create_table_sql(cls, db_name): + def create_table_sql(cls, db): assert isinstance(cls.engine, Distributed), "engine must be engines.Distributed instance" - cls.engine.set_db_name(db_name) cls.fix_engine_table() parts = [ 'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` AS `{0}`.`{2}`'.format( - db_name, cls.table_name(), cls.engine.table_name), - 'ENGINE = ' + cls.engine.create_table_sql()] + db.db_name, cls.table_name(), cls.engine.table_name), + 'ENGINE = ' + cls.engine.create_table_sql(db)] return '\n'.join(parts) - cls.engine.set_db_name(db_name) - return super(MergeModel, cls).create_table_sql(db_name) diff --git a/tests/test_engines.py b/tests/test_engines.py index dbae589..7bc4e26 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -1,18 +1,9 @@ from __future__ import unicode_literals import unittest -<<<<<<< HEAD +from infi.clickhouse_orm.system_models import SystemPart from infi.clickhouse_orm.database import Database, DatabaseException, ServerError from infi.clickhouse_orm.models import Model, MergeModel, DistributedModel -||||||| merged common ancestors -from infi.clickhouse_orm.database import Database, DatabaseException -from infi.clickhouse_orm.models import Model, MergeModel -======= -from infi.clickhouse_orm.system_models import SystemPart - -from infi.clickhouse_orm.database import Database, DatabaseException -from infi.clickhouse_orm.models import Model, MergeModel ->>>>>>> 7fb05896926acab163a1f373092bf22cc0f3cb4f from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.engines import * @@ -167,15 +158,15 @@ class 
DistributedTestCase(_EnginesHelperTestCase): engine = Distributed('my_cluster') with self.assertRaises(ValueError) as cm: - engine.create_table_sql() + engine.create_table_sql(self.database) exc = cm.exception self.assertEqual(str(exc), 'Cannot create Distributed engine: specify an underlying table') def test_with_table_name(self): engine = Distributed('my_cluster', 'foo') - sql = engine.create_table_sql() - self.assertEqual(sql, 'Distributed(my_cluster, currentDatabase(), foo)') + sql = engine.create_table_sql(self.database) + self.assertEqual(sql, 'Distributed(`my_cluster`, `test-db`, `foo`)') class TestModel(SampleModel): engine = TinyLog() From e791923493f559a5d1b3115e05bc14ac157a854a Mon Sep 17 00:00:00 2001 From: Itai Shirav Date: Sat, 21 Apr 2018 13:48:00 +0300 Subject: [PATCH 23/39] Update docs --- CHANGELOG.md | 3 +++ docs/class_reference.md | 26 ++++++-------------------- docs/table_engines.md | 31 ++++++++++++++++--------------- docs/toc.md | 1 + 4 files changed, 26 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f16c12a..23da704 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ Unreleased - Distributed engine support (tsionyx) - `_fields` and `_writable_fields` are OrderedDicts - note that this might break backwards compatibility (tsionyx) - Improve error messages returned from the database with the `ServerError` class (tsionyx) +- Added support of custom partitioning (M1hacka) +- Added attribute `server_version` to Database class (M1hacka) +- Changed `Engine.create_table_sql()`, `Engine.drop_table_sql()`, `Model.create_table_sql()`, `Model.drop_table_sql()` parameter to db from db_name (M1hacka) v0.9.8 ------ diff --git a/docs/class_reference.md b/docs/class_reference.md index 2b38e0f..b51c89f 100644 --- a/docs/class_reference.md +++ b/docs/class_reference.md @@ -24,18 +24,6 @@ created on the ClickHouse server if it does not already exist. - `autocreate`: automatically create the database if does not exist (unless in readonly mode). -#### server_timezone - - -Contains [pytz](http://pytz.sourceforge.net/) timezone used on database server - - -#### server_version - - -Contains a version tuple of database server, for example (1, 1, 54310) - - #### count(model_class, conditions=None) @@ -353,10 +341,10 @@ invalid values will cause a `ValueError` to be raised. Unrecognized field names will cause an `AttributeError`. -#### DistributedModel.create_table_sql(db_name) +#### DistributedModel.create_table_sql(db) -#### DistributedModel.drop_table_sql(db_name) +#### DistributedModel.drop_table_sql(db) Returns the SQL command for deleting this model's table. @@ -692,15 +680,13 @@ During a read, the table indexes on remote servers are used, if there are any. See full documentation here https://clickhouse.yandex/docs/en/table_engines/distributed.html -#### Distributed(cluster, table=None, db_name=None, sharding_key=None) +#### Distributed(cluster, table=None, sharding_key=None) :param cluster: what cluster to access data from :param table: underlying table that actually stores data. 
If you are not specifying any table here, ensure that it can be inferred from your model's superclass (see models.DistributedModel.fix_engine_table) -:param db_name: which database to access data from -By default it is 'currentDatabase()' :param sharding_key: how to distribute data among shards when inserting straightly into Distributed table, optional @@ -709,21 +695,21 @@ straightly into Distributed table, optional Extends MergeTree -#### CollapsingMergeTree(date_col, key_cols, sign_col, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) +#### CollapsingMergeTree(date_col, order_by, sign_col, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) ### SummingMergeTree Extends MergeTree -#### SummingMergeTree(date_col, key_cols, summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) +#### SummingMergeTree(date_col, order_by, summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) ### ReplacingMergeTree Extends MergeTree -#### ReplacingMergeTree(date_col, key_cols, ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) +#### ReplacingMergeTree(date_col, order_by, ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) infi.clickhouse_orm.query diff --git a/docs/table_engines.md b/docs/table_engines.md index 7c4f42a..1ad3770 100644 --- a/docs/table_engines.md +++ b/docs/table_engines.md @@ -27,7 +27,7 @@ Simple Engines engine = engines.TinyLog() engine = engines.Log() - + engine = engines.Memory() @@ -58,16 +58,17 @@ For a `ReplacingMergeTree` you can optionally specify the version column: ### Custom partitioning ClickHouse supports [custom partitioning](https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/) expressions since version 1.1.54310 -You can use custom partitioning with any MergeTree family engine. -To set custom partitioning: -* skip date_col (first) constructor parameter or fill it with None value -* add name to order_by (second) constructor parameter -* add partition_key parameter. It should be a tuple of expressions, by which partition are built. -Standard partitioning by date column can be added using toYYYYMM(date) function. +You can use custom partitioning with any `MergeTree` family engine. +To set custom partitioning: + +* Instead of specifying the `date_col` (first) constructor parameter, pass a tuple of field names or expressions in the `order_by` (second) constructor parameter. +* Add `partition_key` parameter. It should be a tuple of expressions, by which partitions are built. + +Standard monthly partitioning by date column can be specified using the `toYYYYMM(date)` function. Example: - + engine = engines.ReplacingMergeTree(order_by=('OrderID', 'EventDate', 'BannerID'), ver_col='Version', partition_key=('toYYYYMM(EventDate)', 'BannerID')) @@ -85,7 +86,7 @@ Buffer Engine ------------- A `Buffer` engine is only used in conjunction with a `BufferModel`. -The model should be a subclass of both `models.BufferModel` and the main model. +The model should be a subclass of both `models.BufferModel` and the main model. 
The main model is also passed to the engine: class PersonBuffer(models.BufferModel, Person): @@ -94,8 +95,8 @@ The main model is also passed to the engine: Additional buffer parameters can optionally be specified: - engine = engines.Buffer(Person, num_layers=16, min_time=10, - max_time=100, min_rows=10000, max_rows=1000000, + engine = engines.Buffer(Person, num_layers=16, min_time=10, + max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000) Then you can insert objects into Buffer model and they will be handled by ClickHouse properly: @@ -104,13 +105,13 @@ Then you can insert objects into Buffer model and they will be handled by ClickH suzy = PersonBuffer(first_name='Suzy', last_name='Jones') dan = PersonBuffer(first_name='Dan', last_name='Schwartz') db.insert([dan, suzy]) - - + + Merge Engine ------------- -[ClickHouse docs](https://clickhouse.yandex/docs/en/single/index.html#merge) -A `Merge` engine is only used in conjunction with a `MergeModel`. +[ClickHouse docs](https://clickhouse.yandex/docs/en/single/index.html#merge) +A `Merge` engine is only used in conjunction with a `MergeModel`. This table does not store data itself, but allows reading from any number of other tables simultaneously. So you can't insert in it. Engine parameter specifies re2 (similar to PCRE) regular expression, from which data is selected. diff --git a/docs/toc.md b/docs/toc.md index 6b4a1b1..c9c5ee9 100644 --- a/docs/toc.md +++ b/docs/toc.md @@ -36,6 +36,7 @@ * [Table Engines](table_engines.md#table-engines) * [Simple Engines](table_engines.md#simple-engines) * [Engines in the MergeTree Family](table_engines.md#engines-in-the-mergetree-family) + * [Custom partitioning](table_engines.md#custom-partitioning) * [Data Replication](table_engines.md#data-replication) * [Buffer Engine](table_engines.md#buffer-engine) * [Merge Engine](table_engines.md#merge-engine) From 338d686b4c0107e997253d8174cb98c50b3e8a5c Mon Sep 17 00:00:00 2001 From: Itai Shirav Date: Sat, 21 Apr 2018 13:53:06 +0300 Subject: [PATCH 24/39] Update docs --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 23da704..1c130e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Unreleased - Added support of custom partitioning (M1hacka) - Added attribute `server_version` to Database class (M1hacka) - Changed `Engine.create_table_sql()`, `Engine.drop_table_sql()`, `Model.create_table_sql()`, `Model.drop_table_sql()` parameter to db from db_name (M1hacka) +- Fix parsing of datetime column type when it includes a timezone (M1hacka) v0.9.8 ------ From 20e609f95224c84edc84e3c02fcaf5dc89a0cc37 Mon Sep 17 00:00:00 2001 From: Itai Shirav Date: Sat, 21 Apr 2018 15:23:00 +0300 Subject: [PATCH 25/39] Update docs --- docs/class_reference.md | 2 +- docs/field_types.md | 4 ++- docs/models_and_databases.md | 38 ++++++++++++++++++++++-- docs/system_models.md | 4 +-- docs/table_engines.md | 5 ++-- docs/toc.md | 4 +++ src/infi/clickhouse_orm/engines.py | 2 +- src/infi/clickhouse_orm/system_models.py | 8 ++--- 8 files changed, 54 insertions(+), 13 deletions(-) diff --git a/docs/class_reference.md b/docs/class_reference.md index b51c89f..edcf4f9 100644 --- a/docs/class_reference.md +++ b/docs/class_reference.md @@ -649,7 +649,7 @@ Extends Engine Buffers the data to write in RAM, periodically flushing it to another table. Must be used in conjuction with a `BufferModel`. -Read more [here](https://clickhouse.yandex/reference_en.html#Buffer). 
+Read more [here](https://clickhouse.yandex/docs/en/table_engines/buffer/).
 
 #### Buffer(main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000)
 
diff --git a/docs/field_types.md b/docs/field_types.md
index 80d0f2a..4c447eb 100644
--- a/docs/field_types.md
+++ b/docs/field_types.md
@@ -1,6 +1,8 @@
 Field Types
 ===========
 
+See: [ClickHouse Documentation](https://clickhouse.yandex/docs/en/data_types/)
+
 Currently the following field types are supported:
 
 | Class | DB Type | Pythonic Type | Comments
@@ -85,7 +87,7 @@ Working with materialized and alias fields
 
 ClickHouse provides an opportunity to create MATERIALIZED and ALIAS Fields.
 
-See documentation [here](https://clickhouse.yandex/reference_en.html#Default%20values).
+See documentation [here](https://clickhouse.yandex/docs/en/query_language/queries/#default-values).
 
 Both field types can't be inserted into the database directly, so they are ignored when using the `Database.insert()` method. ClickHouse does not return the field values if you use `"SELECT * FROM ..."` - you have to list these field names explicitly in the query.
 
diff --git a/docs/models_and_databases.md b/docs/models_and_databases.md
index 2b84f99..1919989 100644
--- a/docs/models_and_databases.md
+++ b/docs/models_and_databases.md
@@ -21,9 +21,43 @@ Models are defined in a way reminiscent of Django's ORM:
 
         engine = engines.MergeTree('birthday', ('first_name', 'last_name', 'birthday'))
 
-It is possible to provide a default value for a field, instead of its "natural" default (empty string for string fields, zero for numeric fields etc.). Alternatively it is possible to pass alias or materialized parameters (see below for usage examples). Only one of `default`, `alias` and `materialized` parameters can be provided.
+The database columns in the database table are represented by model fields. Each field has a type, which matches the type of the corresponding database column. You can see all the supported fields types [here](field_types.md).
 
-For more details see [Field Types](field_types.md) and [Table Engines](table_engines.md).
+A model must have an `engine`, which determines how its table is stored on disk (if at all), and what capabilities it has. For more details about table engines see [here](table_engines.md).
+
+### Default values
+
+Each field has a "natural" default value - empty string for string fields, zero for numeric fields etc. To specify a different value use the `default` parameter:
+
+    first_name = fields.StringField(default="anonymous")
+
+### Null values
+
+To allow null values in a field, wrap it inside a `NullableField`:
+
+    birthday = fields.NullableField(fields.DateField())
+
+In this case, the default value for that field becomes `null` unless otherwise specified.
+
+### Materialized fields
+
+The value of a materialized field is calculated from other fields in the model. For example:
+
+    year_born = fields.Int16Field(materialized="toYear(birthday)")
+
+Materialized fields are read-only, meaning that their values are not sent to the database when inserting records.
+
+It is not possible to specify a default value for a materialized field.
+
+### Alias fields
+
+An alias field is is simply a different way to call another field in the model. For example:
+
+    date_born = field.DateField(alias="birthday")
+
+Alias fields are read-only, meaning that their values are not sent to the database when inserting records.
+
+It is not possible to specify a default value for an alias field.
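+Putting it together, here is an illustrative sketch of a model that combines these
+options (the field names are made up for this example). Note that a field may be
+given at most one of the `default`, `alias` and `materialized` parameters:
+
+    class Person(models.Model):
+
+        first_name = fields.StringField(default="anonymous")
+        last_name = fields.StringField()
+        nickname = fields.NullableField(fields.StringField())
+        birthday = fields.DateField()
+        year_born = fields.Int16Field(materialized="toYear(birthday)")
+        date_born = fields.DateField(alias="birthday")
+
+        engine = engines.MergeTree('birthday', ('first_name', 'last_name', 'birthday'))
+
+When instances of this model are inserted, only `first_name`, `last_name`, `nickname`
+and `birthday` are sent to the database; `year_born` and `date_born` are read-only
+and their values are calculated by ClickHouse.
+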
### Table Names diff --git a/docs/system_models.md b/docs/system_models.md index beed825..56ae447 100644 --- a/docs/system_models.md +++ b/docs/system_models.md @@ -1,7 +1,7 @@ System Models ============= -[Clickhouse docs](https://clickhouse.yandex/reference_en.html#System%20tables). +[Clickhouse docs](https://clickhouse.yandex/docs/en/system_tables/). System models are read only models for implementing part of the system's functionality, and for providing access to information about how the system is working. @@ -14,7 +14,7 @@ Currently the following system models are supported: Partitions and Parts -------------------- -[ClickHouse docs](https://clickhouse.yandex/reference_en.html#Manipulations%20with%20partitions%20and%20parts). +[ClickHouse docs](https://clickhouse.yandex/docs/en/query_language/queries/#manipulations-with-partitions-and-parts). A partition in a table is data for a single calendar month. Table "system.parts" contains information about each part. diff --git a/docs/table_engines.md b/docs/table_engines.md index 1ad3770..d4ba905 100644 --- a/docs/table_engines.md +++ b/docs/table_engines.md @@ -1,7 +1,7 @@ Table Engines ============= -See: [ClickHouse Documentation](https://clickhouse.yandex/reference_en.html#Table+engines) +See: [ClickHouse Documentation](https://clickhouse.yandex/docs/en/table_engines/) Each model must have an engine instance, used when creating the table in ClickHouse. @@ -110,7 +110,8 @@ Then you can insert objects into Buffer model and they will be handled by ClickH Merge Engine ------------- -[ClickHouse docs](https://clickhouse.yandex/docs/en/single/index.html#merge) +[ClickHouse docs](https://clickhouse.yandex/docs/en/table_engines/merge/) + A `Merge` engine is only used in conjunction with a `MergeModel`. This table does not store data itself, but allows reading from any number of other tables simultaneously. So you can't insert in it. Engine parameter specifies re2 (similar to PCRE) regular expression, from which data is selected. diff --git a/docs/toc.md b/docs/toc.md index c9c5ee9..2ed6e89 100644 --- a/docs/toc.md +++ b/docs/toc.md @@ -5,6 +5,10 @@ * [Models and Databases](models_and_databases.md#models-and-databases) * [Defining Models](models_and_databases.md#defining-models) + * [Default values](models_and_databases.md#default-values) + * [Null values](models_and_databases.md#null-values) + * [Materialized fields](models_and_databases.md#materialized-fields) + * [Alias fields](models_and_databases.md#alias-fields) * [Table Names](models_and_databases.md#table-names) * [Using Models](models_and_databases.md#using-models) * [Inserting to the Database](models_and_databases.md#inserting-to-the-database) diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index 97e0890..b1aa8f7 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -159,7 +159,7 @@ class Buffer(Engine): """ Buffers the data to write in RAM, periodically flushing it to another table. Must be used in conjuction with a `BufferModel`. - Read more [here](https://clickhouse.yandex/reference_en.html#Buffer). + Read more [here](https://clickhouse.yandex/docs/en/table_engines/buffer/). 
""" #Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) diff --git a/src/infi/clickhouse_orm/system_models.py b/src/infi/clickhouse_orm/system_models.py index 8a5217a..7341d14 100644 --- a/src/infi/clickhouse_orm/system_models.py +++ b/src/infi/clickhouse_orm/system_models.py @@ -1,6 +1,6 @@ """ -This file contains system readonly models that can be got from database -https://clickhouse.yandex/reference_en.html#System tables +This file contains system readonly models that can be got from the database +https://clickhouse.yandex/docs/en/system_tables/ """ from __future__ import unicode_literals from six import string_types @@ -15,7 +15,7 @@ class SystemPart(Model): """ Contains information about parts of a table in the MergeTree family. This model operates only fields, described in the reference. Other fields are ignored. - https://clickhouse.yandex/reference_en.html#system.parts + https://clickhouse.yandex/docs/en/system_tables/system.parts/ """ OPERATIONS = frozenset({'DETACH', 'DROP', 'ATTACH', 'FREEZE', 'FETCH'}) @@ -56,7 +56,7 @@ class SystemPart(Model): """ Next methods return SQL for some operations, which can be done with partitions - https://clickhouse.yandex/reference_en.html#Manipulations with partitions and parts + https://clickhouse.yandex/docs/en/query_language/queries/#manipulations-with-partitions-and-parts """ def _partition_operation_sql(self, operation, settings=None, from_part=None): """ From 3976366913a55bbd5693abcfb9f1533b18377d46 Mon Sep 17 00:00:00 2001 From: Itai Shirav Date: Sat, 21 Apr 2018 15:29:29 +0300 Subject: [PATCH 26/39] Update docs --- docs/querysets.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/querysets.md b/docs/querysets.md index e79fcd3..260e66c 100644 --- a/docs/querysets.md +++ b/docs/querysets.md @@ -56,6 +56,7 @@ There are different operators that can be used, by passing `__ Date: Sun, 22 Apr 2018 09:03:31 +0300 Subject: [PATCH 27/39] Update docs --- docs/models_and_databases.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/models_and_databases.md b/docs/models_and_databases.md index 1919989..230fbbb 100644 --- a/docs/models_and_databases.md +++ b/docs/models_and_databases.md @@ -8,7 +8,7 @@ Database instances connect to a specific ClickHouse database for running queries Defining Models --------------- -Models are defined in a way reminiscent of Django's ORM: +Models are defined in a way reminiscent of Django's ORM, by subclassing `Model`: from infi.clickhouse_orm import models, fields, engines @@ -21,7 +21,7 @@ Models are defined in a way reminiscent of Django's ORM: engine = engines.MergeTree('birthday', ('first_name', 'last_name', 'birthday')) -The database columns in the database table are represented by model fields. Each field has a type, which matches the type of the corresponding database column. You can see all the supported fields types [here](field_types.md). +The columns in the database table are represented by model fields. Each field has a type, which matches the type of the corresponding database column. All the supported fields types are listed [here](field_types.md). A model must have an `engine`, which determines how its table is stored on disk (if at all), and what capabilities it has. For more details about table engines see [here](table_engines.md). @@ -51,7 +51,7 @@ It is not possible to specify a default value for a materialized field. ### Alias fields -An alias field is is simply a different way to call another field in the model. 
For example: +An alias field is simply a different way to call another field in the model. For example: date_born = field.DateField(alias="birthday") From eb15dd65ec4d6d9b85adaa4b632d03e2b55c9e9f Mon Sep 17 00:00:00 2001 From: M1ha Date: Tue, 24 Apr 2018 15:19:05 +0500 Subject: [PATCH 28/39] Added custom partitioning to all MergeTree family engines --- src/infi/clickhouse_orm/engines.py | 24 ++++++++++++++---------- tests/test_engines.py | 21 +++++++++++++++++++-- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index b1aa8f7..1acd73d 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -115,9 +115,10 @@ class MergeTree(Engine): class CollapsingMergeTree(MergeTree): - def __init__(self, date_col, order_by, sign_col, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None): - super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) + def __init__(self, date_col=None, order_by=(), sign_col='sign', sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, + replica_table_path, replica_name, partition_key) self.sign_col = sign_col def _build_sql_params(self, db): @@ -128,9 +129,10 @@ class CollapsingMergeTree(MergeTree): class SummingMergeTree(MergeTree): - def __init__(self, date_col, order_by, summing_cols=None, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None): - super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) + def __init__(self, date_col=None, order_by=(), summing_cols=None, sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, + replica_name, partition_key) assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple' self.summing_cols = summing_cols @@ -143,9 +145,10 @@ class SummingMergeTree(MergeTree): class ReplacingMergeTree(MergeTree): - def __init__(self, date_col, order_by, ver_col=None, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None): - super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name) + def __init__(self, date_col=None, order_by=(), ver_col=None, sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, + replica_table_path, replica_name, partition_key) self.ver_col = ver_col def _build_sql_params(self, db): @@ -163,7 +166,8 @@ class Buffer(Engine): """ #Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) - def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, + min_bytes=10000000, max_bytes=100000000): self.main_model = main_model self.num_layers = num_layers 
self.min_time = min_time diff --git a/tests/test_engines.py b/tests/test_engines.py index 7bc4e26..88266cf 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -137,10 +137,27 @@ class EnginesTestCase(_EnginesHelperTestCase): order_by=('date', 'event_id', 'event_group'), partition_key=('toYYYYMM(date)', 'event_group') ) + + class TestCollapseModel(SampleModel): + sign = Int8Field() + + engine = CollapsingMergeTree( + sign_col='sign', + order_by=('date', 'event_id', 'event_group'), + partition_key=('toYYYYMM(date)', 'event_group') + ) + self._create_and_insert(TestModel) - parts = list(SystemPart.get(self.database)) - self.assertEqual(1, len(parts)) + self._create_and_insert(TestCollapseModel) + + # Result order may be different, lets sort manually + parts = sorted(list(SystemPart.get(self.database)), key=lambda x: x.table) + + self.assertEqual(2, len(parts)) + self.assertEqual('testcollapsemodel', parts[0].table) self.assertEqual('(201701, 13)', parts[0].partition) + self.assertEqual('testmodel', parts[1].table) + self.assertEqual('(201701, 13)', parts[1].partition) class SampleModel(Model): From 2499a3f42ab52b093921a84db8112cf99a418be7 Mon Sep 17 00:00:00 2001 From: Itai Shirav Date: Sun, 6 May 2018 14:30:29 +0300 Subject: [PATCH 29/39] Update docs --- CHANGELOG.md | 2 +- docs/class_reference.md | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c130e6..2ca1d84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ Unreleased - Distributed engine support (tsionyx) - `_fields` and `_writable_fields` are OrderedDicts - note that this might break backwards compatibility (tsionyx) - Improve error messages returned from the database with the `ServerError` class (tsionyx) -- Added support of custom partitioning (M1hacka) +- Added support for custom partitioning (M1hacka) - Added attribute `server_version` to Database class (M1hacka) - Changed `Engine.create_table_sql()`, `Engine.drop_table_sql()`, `Model.create_table_sql()`, `Model.drop_table_sql()` parameter to db from db_name (M1hacka) - Fix parsing of datetime column type when it includes a timezone (M1hacka) diff --git a/docs/class_reference.md b/docs/class_reference.md index edcf4f9..637fbc9 100644 --- a/docs/class_reference.md +++ b/docs/class_reference.md @@ -695,21 +695,21 @@ straightly into Distributed table, optional Extends MergeTree -#### CollapsingMergeTree(date_col, order_by, sign_col, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) +#### CollapsingMergeTree(date_col=None, order_by=(), sign_col="sign", sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) ### SummingMergeTree Extends MergeTree -#### SummingMergeTree(date_col, order_by, summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) +#### SummingMergeTree(date_col=None, order_by=(), summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) ### ReplacingMergeTree Extends MergeTree -#### ReplacingMergeTree(date_col, order_by, ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) +#### ReplacingMergeTree(date_col=None, order_by=(), ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) infi.clickhouse_orm.query From a5c93dc176a001299553e6180b47978726f4208d Mon Sep 17 00:00:00 
From: Понаревский Владимир
Date: Tue, 8 May 2018 13:19:45 +0300
Subject: [PATCH 30/39] Add an option to disable server timezone detection

---
 src/infi/clickhouse_orm/database.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py
index aa53b29..aeb7cf7 100644
--- a/src/infi/clickhouse_orm/database.py
+++ b/src/infi/clickhouse_orm/database.py
@@ -72,7 +72,8 @@ class Database(object):
     '''
 
     def __init__(self, db_name, db_url='http://localhost:8123/',
-                 username=None, password=None, readonly=False, autocreate=True):
+                 username=None, password=None, readonly=False, autocreate=True,
+                 get_timezone=True):
         '''
         Initializes a database instance. Unless it's readonly, the database will be
         created on the ClickHouse server if it does not already exist.
@@ -83,6 +84,7 @@ class Database(object):
         - `password`: optional connection credentials.
         - `readonly`: use a read-only connection.
         - `autocreate`: automatically create the database if it does not exist (unless in readonly mode).
+        - `get_timezone`: automatically detect the server timezone.
         '''
         self.db_name = db_name
         self.db_url = db_url
@@ -96,7 +98,8 @@ class Database(object):
         elif autocreate:
             self.db_exists = False
             self.create_database()
-        self.server_timezone = self._get_server_timezone()
+
+        self.server_timezone = self._get_server_timezone() if get_timezone else pytz.utc
         self.server_version = self._get_server_version()
 
     def create_database(self):

From 793726adb2a056ae018919544db444088ef9a611 Mon Sep 17 00:00:00 2001
From: Понаревский Владимир
Date: Tue, 8 May 2018 16:41:50 +0300
Subject: [PATCH 31/39] Fix after review: base timezone detection on the server version

---
 src/infi/clickhouse_orm/database.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py
index aeb7cf7..a4bb936 100644
--- a/src/infi/clickhouse_orm/database.py
+++ b/src/infi/clickhouse_orm/database.py
@@ -72,8 +72,7 @@ class Database(object):
     '''
 
     def __init__(self, db_name, db_url='http://localhost:8123/',
-                 username=None, password=None, readonly=False, autocreate=True,
-                 get_timezone=True):
+                 username=None, password=None, readonly=False, autocreate=True):
         '''
         Initializes a database instance. Unless it's readonly, the database will be
         created on the ClickHouse server if it does not already exist.
@@ -84,7 +83,6 @@ class Database(object):
         - `password`: optional connection credentials.
         - `readonly`: use a read-only connection.
         - `autocreate`: automatically create the database if it does not exist (unless in readonly mode).
-        - `get_timezone`: automatically detect the server timezone.
         '''
         self.db_name = db_name
         self.db_url = db_url
@@ -98,9 +96,9 @@ class Database(object):
         elif autocreate:
             self.db_exists = False
             self.create_database()
-
-        self.server_timezone = self._get_server_timezone() if get_timezone else pytz.utc
         self.server_version = self._get_server_version()
+        # Versions 1.1.53981 and below don't have timezone function
+        self.server_timezone = self._get_server_timezone() if self.server_version[2] > 53981 else pytz.utc
 
     def create_database(self):
         '''

From c20846122fa7f485afbdbc02821772c057fce0b9 Mon Sep 17 00:00:00 2001
From: Понаревский Владимир
Date: Tue, 8 May 2018 17:24:27 +0300
Subject: [PATCH 32/39] Fix after review: compare the server version as a tuple

---
 src/infi/clickhouse_orm/database.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py
index a4bb936..471575f 100644
--- a/src/infi/clickhouse_orm/database.py
+++ b/src/infi/clickhouse_orm/database.py
@@ -98,7 +98,7 @@ class Database(object):
             self.create_database()
         self.server_version = self._get_server_version()
         # Versions 1.1.53981 and below don't have timezone function
-        self.server_timezone = self._get_server_timezone() if self.server_version[2] > 53981 else pytz.utc
+        self.server_timezone = self._get_server_timezone() if self.server_version > (1, 1, 53981) else pytz.utc
 
     def create_database(self):
         '''

From 1bd3e63cd48b4ef76be78b72322feaa7e9925db3 Mon Sep 17 00:00:00 2001
From: Itai Shirav
Date: Thu, 10 May 2018 15:24:38 +0300
Subject: [PATCH 33/39] minor improvements in error handling and testing

---
 src/infi/clickhouse_orm/database.py | 3 ++-
 src/infi/clickhouse_orm/engines.py | 3 ++-
 src/infi/clickhouse_orm/fields.py | 2 +-
 tests/base_test_with_data.py | 4 ++--
 tests/test_database.py | 16 +++++++++++++++-
 tests/test_engines.py | 6 ++++++
 6 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py
index aa53b29..0ef6b57 100644
--- a/src/infi/clickhouse_orm/database.py
+++ b/src/infi/clickhouse_orm/database.py
@@ -116,9 +116,10 @@ class Database(object):
         '''
         Creates a table for the given model class, if it does not exist already.
         '''
-        # TODO check that model has an engine
         if model_class.system:
             raise DatabaseException("You can't create system table")
+        if getattr(model_class, 'engine') is None:
+            raise DatabaseException("%s class must define an engine" % model_class.__name__)
         self._send(model_class.create_table_sql(self))
 
     def drop_table(self, model_class):

diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py
index 1acd73d..ea6d3f4 100644
--- a/src/infi/clickhouse_orm/engines.py
+++ b/src/infi/clickhouse_orm/engines.py
@@ -40,6 +40,8 @@ class MergeTree(Engine):
         assert date_col is None or isinstance(date_col, six.string_types), 'date_col must be string if present'
         assert partition_key is None or type(partition_key) in (list, tuple),\
             'partition_key must be tuple or list if present'
+        assert (replica_table_path is None) == (replica_name is None), \
+            'replica_table_path and replica_name must be specified together'
 
         # These values conflict with each other (old and new syntax of table engines),
         # so we check that only one of them is given.
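        # For illustration only (hypothetical column names):
        #   old syntax: MergeTree('EventDate', ('CounterID', 'EventDate'))
        #   new syntax: MergeTree(order_by=('CounterID', 'EventDate'),
        #                         partition_key=('toYYYYMM(EventDate)',))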
@@ -52,7 +54,6 @@ class MergeTree(Engine):
         self.index_granularity = index_granularity
         self.replica_table_path = replica_table_path
         self.replica_name = replica_name
-        # TODO verify that both replica fields are either present or missing
 
     # Field renamed to match the new engine syntax
     @property

diff --git a/src/infi/clickhouse_orm/fields.py b/src/infi/clickhouse_orm/fields.py
index 5449e9d..fc40790 100644
--- a/src/infi/clickhouse_orm/fields.py
+++ b/src/infi/clickhouse_orm/fields.py
@@ -39,7 +39,7 @@ class Field(object):
         data can't be converted. Returns the converted value. Subclasses should override this.
         The timezone_in_use parameter should be consulted when parsing datetime fields.
         '''
-        return value
+        return value  # pragma: no cover
 
     def validate(self, value):
         '''

diff --git a/tests/base_test_with_data.py b/tests/base_test_with_data.py
index 352d3d3..90a328d 100644
--- a/tests/base_test_with_data.py
+++ b/tests/base_test_with_data.py
@@ -21,8 +21,8 @@ class TestCaseWithData(unittest.TestCase):
         self.database.drop_table(Person)
         self.database.drop_database()
 
-    def _insert_and_check(self, data, count):
-        self.database.insert(data)
+    def _insert_and_check(self, data, count, batch_size=1000):
+        self.database.insert(data, batch_size=batch_size)
         self.assertEquals(count, self.database.count(Person))
         for instance in data:
             self.assertEquals(self.database, instance.get_database())

diff --git a/tests/test_database.py b/tests/test_database.py
index 8e7c5c4..77bf1f0 100644
--- a/tests/test_database.py
+++ b/tests/test_database.py
@@ -2,7 +2,7 @@ from __future__ import unicode_literals
 
 import unittest
 
-from infi.clickhouse_orm.database import ServerError
+from infi.clickhouse_orm.database import ServerError, DatabaseException
 from .base_test_with_data import *
 
@@ -20,6 +20,12 @@ class DatabaseTestCase(TestCaseWithData):
     def test_insert__empty(self):
         self._insert_and_check([], 0)
 
+    def test_insert__small_batches(self):
+        self._insert_and_check(self._sample_data(), len(data), batch_size=10)
+
+    def test_insert__medium_batches(self):
+        self._insert_and_check(self._sample_data(), len(data), batch_size=100)
+
     def test_count(self):
         self.database.insert(self._sample_data())
         self.assertEquals(self.database.count(Person), 100)
@@ -150,3 +156,11 @@ class DatabaseTestCase(TestCaseWithData):
     def test_preexisting_db(self):
         db = Database(self.database.db_name, autocreate=False)
         db.count(Person)
+
+    def test_missing_engine(self):
+        class EnginelessModel(Model):
+            float_field = Float32Field()
+        with self.assertRaises(DatabaseException) as cm:
+            self.database.create_table(EnginelessModel)
+        self.assertEqual(cm.exception.message, 'EnginelessModel class must define an engine')
+

diff --git a/tests/test_engines.py b/tests/test_engines.py
index 88266cf..85966df 100644
--- a/tests/test_engines.py
+++ b/tests/test_engines.py
@@ -51,6 +51,12 @@ class EnginesTestCase(_EnginesHelperTestCase):
         expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)"
         self.assertEquals(engine.create_table_sql(self.database), expected)
 
+    def test_replicated_merge_tree_incomplete(self):
+        with self.assertRaises(AssertionError):
+            MergeTree('date', ('date', 'event_id', 'event_group'), replica_table_path='/clickhouse/tables/{layer}-{shard}/hits')
+        with self.assertRaises(AssertionError):
+            MergeTree('date', ('date', 'event_id', 'event_group'), replica_name='{replica}')
+
     def test_collapsing_merge_tree(self):
         class TestModel(SampleModel):
            engine = CollapsingMergeTree('date', ('date', 'event_id', 'event_group'), 'event_version')

From ab0755ad90cf115a5c34c6f0fc9a95432dc346b4 Mon Sep 17 00:00:00 2001
From: Itai Shirav
Date: Mon, 14 May 2018 07:09:57 -0400
Subject: [PATCH 34/39] - Rename `Model.system` to `Model._system` to prevent
 collision with a column that has the same name
 - Rename `Model.readonly` to `Model._readonly` to prevent collision with a
 column that has the same name
 - The `field_names` argument to `Model.from_tsv` is now mandatory

---
 src/infi/clickhouse_orm/database.py | 6 ++---
 src/infi/clickhouse_orm/models.py | 34 +++++++++++++++++++++--------
 tests/test_database.py | 12 ++++++++++
 tests/test_readonly.py | 3 ++-
 tests/test_system_models.py | 2 +-
 5 files changed, 43 insertions(+), 14 deletions(-)

diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py
index 0ef6b57..49dcdaa 100644
--- a/src/infi/clickhouse_orm/database.py
+++ b/src/infi/clickhouse_orm/database.py
@@ -116,7 +116,7 @@ class Database(object):
         '''
         Creates a table for the given model class, if it does not exist already.
         '''
-        if model_class.system:
+        if model_class.is_system_model():
             raise DatabaseException("You can't create system table")
         if getattr(model_class, 'engine') is None:
             raise DatabaseException("%s class must define an engine" % model_class.__name__)
@@ -126,7 +126,7 @@ class Database(object):
         '''
         Drops the database table of the given model class, if it exists.
         '''
-        if model_class.system:
+        if model_class.is_system_model():
             raise DatabaseException("You can't drop system table")
         self._send(model_class.drop_table_sql(self))
 
@@ -146,7 +146,7 @@ class Database(object):
             return  # model_instances is empty
         model_class = first_instance.__class__
 
-        if first_instance.readonly or first_instance.system:
+        if first_instance.is_read_only() or first_instance.is_system_model():
             raise DatabaseException("You can't insert into read only and system tables")
 
         fields_list = ','.join(

diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py
index f5e7377..c2ab40d 100644
--- a/src/infi/clickhouse_orm/models.py
+++ b/src/infi/clickhouse_orm/models.py
@@ -22,7 +22,6 @@ class ModelBase(type):
     ad_hoc_model_cache = {}
 
     def __new__(cls, name, bases, attrs):
-        new_cls = super(ModelBase, cls).__new__(cls, str(name), bases, attrs)
         # Collect fields from parent classes
         base_fields = dict()
         for base in bases:
@@ -35,9 +34,12 @@ class ModelBase(type):
         fields.update({n: f for n, f in iteritems(attrs) if isinstance(f, Field)})
         fields = sorted(iteritems(fields), key=lambda item: item[1].creation_counter)
 
-        setattr(new_cls, '_fields', OrderedDict(fields))
-        setattr(new_cls, '_writable_fields', OrderedDict([f for f in fields if not f[1].readonly]))
-        return new_cls
+        attrs = dict(
+            attrs,
+            _fields=OrderedDict(fields),
+            _writable_fields=OrderedDict([f for f in fields if not f[1].readonly]),
+        )
+        return super(ModelBase, cls).__new__(cls, str(name), bases, attrs)
 
     @classmethod
     def create_ad_hoc_model(cls, fields, model_name='AdHocModel'):
@@ -99,10 +101,12 @@ class Model(with_metaclass(ModelBase)):
     engine = None
 
     # Insert operations are restricted for read only models
-    readonly = False
+    _readonly = False
 
     # Create table, drop table, insert operations are restricted for system models
-    system = False
+    _system = False
+
+    _database = None
 
     def __init__(self, **kwargs):
         '''
@@ -198,11 +202,10 @@ class Model(with_metaclass(ModelBase)):
         return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db.db_name, cls.table_name())
 
     @classmethod
-    def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
+    def from_tsv(cls, line, field_names, timezone_in_use=pytz.utc, database=None):
         '''
         Create a model instance from a tab-separated line. The line may or may not include a newline.
         The `field_names` list must match the fields defined in the model, but does not have to include all of them.
-        If omitted, it is assumed to be the names of all fields in the model, in order of definition.
 
         - `line`: the TSV-formatted data.
         - `field_names`: names of the model fields in the data.
@@ -210,7 +213,6 @@ class Model(with_metaclass(ModelBase)):
         - `database`: if given, sets the database that this instance belongs to.
         '''
         from six import next
-        field_names = field_names or list(cls.fields())
         values = iter(parse_tsv(line))
         kwargs = {}
         for name in field_names:
@@ -265,6 +267,20 @@ class Model(with_metaclass(ModelBase)):
         # noinspection PyProtectedMember,PyUnresolvedReferences
         return cls._writable_fields if writable else cls._fields
 
+    @classmethod
+    def is_read_only(cls):
+        '''
+        Returns true if the model is marked as read only.
+        '''
+        return cls._readonly
+
+    @classmethod
+    def is_system_model(cls):
+        '''
+        Returns true if the model represents a system table.
+        '''
+        return cls._system
+
 
 class BufferModel(Model):

diff --git a/tests/test_database.py b/tests/test_database.py
index 77bf1f0..331bcef 100644
--- a/tests/test_database.py
+++ b/tests/test_database.py
@@ -164,3 +164,15 @@ class DatabaseTestCase(TestCaseWithData):
         self.database.create_table(EnginelessModel)
         self.assertEqual(cm.exception.message, 'EnginelessModel class must define an engine')
 
+    def test_potentially_problematic_field_names(self):
+        class Model1(Model):
+            system = StringField()
+            readonly = StringField()
+            engine = Memory()
+        instance = Model1(system='s', readonly='r')
+        self.assertEquals(instance.to_dict(), dict(system='s', readonly='r'))
+        self.database.create_table(Model1)
+        self.database.insert([instance])
+        instance = Model1.objects_in(self.database)[0]
+        self.assertEquals(instance.to_dict(), dict(system='s', readonly='r'))
+

diff --git a/tests/test_readonly.py b/tests/test_readonly.py
index 62a2bf5..73c7d26 100644
--- a/tests/test_readonly.py
+++ b/tests/test_readonly.py
@@ -48,6 +48,7 @@ class ReadonlyTestCase(TestCaseWithData):
 
     def test_insert_readonly(self):
         m = ReadOnlyModel(name='readonly')
+        self.database.create_table(ReadOnlyModel)
         with self.assertRaises(DatabaseException):
             self.database.insert([m])
 
@@ -59,7 +60,7 @@ class ReadonlyTestCase(TestCaseWithData):
 
 class ReadOnlyModel(Model):
-    readonly = True
+    _readonly = True
 
     name = StringField()
     date = DateField()

diff --git a/tests/test_system_models.py b/tests/test_system_models.py
index b49cc52..b9576ac 100644
--- a/tests/test_system_models.py
+++ b/tests/test_system_models.py
@@ -116,4 +116,4 @@ class CustomPartitionedTable(Model):
 
 class SystemTestModel(Model):
-    system = True
+    _system = True

From 2bca8b4fb84bd3eeab4977ea23f1b70f7c0d5847 Mon Sep 17 00:00:00 2001
From: Itai Shirav
Date: Mon, 14 May 2018 07:37:56 -0400
Subject: [PATCH 35/39] Improve creation time of model instances by keeping a
 dictionary of default values

---
 CHANGELOG.md | 4 ++++
 src/infi/clickhouse_orm/models.py | 16 +++++++---------
 2 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2ca1d84..089ba81 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,10 @@ Unreleased
 - Added attribute `server_version` to Database class (M1hacka)
 - Changed `Engine.create_table_sql()`, `Engine.drop_table_sql()`, `Model.create_table_sql()`, `Model.drop_table_sql()` parameter to db from db_name (M1hacka)
 - Fix parsing of datetime column type when it includes a timezone (M1hacka)
+- Rename `Model.system` to `Model._system` to prevent collision with a column that has the same name
+- Rename `Model.readonly` to `Model._readonly` to prevent collision with a column that has the same name
+- The `field_names` argument to `Model.from_tsv` is now mandatory
+- Improve creation time of model instances by keeping a dictionary of default values
 
 v0.9.8
 ------

diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py
index c2ab40d..d292462 100644
--- a/src/infi/clickhouse_orm/models.py
+++ b/src/infi/clickhouse_orm/models.py
@@ -34,10 +34,14 @@ class ModelBase(type):
         fields.update({n: f for n, f in iteritems(attrs) if isinstance(f, Field)})
         fields = sorted(iteritems(fields), key=lambda item: item[1].creation_counter)
 
+        # Build a dictionary of default values
+        defaults = {n: f.to_python(f.default, pytz.UTC) for n, f in fields}
+
         attrs = dict(
             attrs,
             _fields=OrderedDict(fields),
             _writable_fields=OrderedDict([f for f in fields if not f[1].readonly]),
+            _defaults=defaults
         )
         return super(ModelBase, cls).__new__(cls, str(name), bases, attrs)
 
@@ -116,9 +120,8 @@ class Model(with_metaclass(ModelBase)):
         Unrecognized field names will cause an `AttributeError`.
         '''
         super(Model, self).__init__()
-
-        self._database = None
-
+        # Assign default values
+        self.__dict__.update(self._defaults)
         # Assign field values from keyword arguments
         for name, value in iteritems(kwargs):
             field = self.get_field(name)
@@ -126,10 +129,6 @@ class Model(with_metaclass(ModelBase)):
                 setattr(self, name, value)
             else:
                 raise AttributeError('%s does not have a field called %s' % (self.__class__.__name__, name))
-        # Assign default values for fields not included in the keyword arguments
-        for name, field in iteritems(self.fields()):
-            if name not in kwargs:
-                setattr(self, name, field.default)
 
     def __setattr__(self, name, value):
         '''
@@ -168,8 +167,7 @@ class Model(with_metaclass(ModelBase)):
         '''
         Gets a `Field` instance given its name, or `None` if not found.
         '''
-        field = getattr(self.__class__, name, None)
-        return field if isinstance(field, Field) else None
+        return self._fields.get(name)
 
     @classmethod
     def table_name(cls):

From d7382e1ce2814440ba22cb12c18a46e00cc0c596 Mon Sep 17 00:00:00 2001
From: Itai Shirav
Date: Sun, 10 Jun 2018 14:27:51 +0300
Subject: [PATCH 36/39] fix field name parsing when field contains double
 underscore #69

---
 src/infi/clickhouse_orm/query.py | 6 +++++-
 tests/test_querysets.py | 16 ++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/src/infi/clickhouse_orm/query.py b/src/infi/clickhouse_orm/query.py
index 487a836..4e079e5 100644
--- a/src/infi/clickhouse_orm/query.py
+++ b/src/infi/clickhouse_orm/query.py
@@ -157,7 +157,11 @@ class FOV(object):
 
     def __init__(self, field_name, operator, value):
         self._field_name = field_name
-        self._operator = _operators[operator]
+        self._operator = _operators.get(operator)
+        if self._operator is None:
+            # The field name itself contains __ (e.g. my__field), so fall back to the eq operator
+            self._field_name = field_name + '__' + operator
+            self._operator = _operators['eq']
         self._value = value
 
     def to_sql(self, model_cls):

diff --git a/tests/test_querysets.py b/tests/test_querysets.py
index 8f938fc..4176341 100644
--- a/tests/test_querysets.py
+++ b/tests/test_querysets.py
@@ -334,6 +334,20 @@ class AggregateTestCase(TestCaseWithData):
         print(qs.as_sql())
         self.assertEquals(qs.count(), 1)
 
+    def test_double_underscore_field(self):
+        class Mdl(Model):
+            the__number = Int32Field()
+            the__next__number = Int32Field()
+            engine = Memory()
+        qs = Mdl.objects_in(self.database).filter(the__number=1)
+        self.assertEquals(qs.conditions_as_sql(), 'the__number = 1')
+        qs = Mdl.objects_in(self.database).filter(the__number__gt=1)
+        self.assertEquals(qs.conditions_as_sql(), 'the__number > 1')
+        qs = Mdl.objects_in(self.database).filter(the__next__number=1)
+        self.assertEquals(qs.conditions_as_sql(), 'the__next__number = 1')
+        qs = Mdl.objects_in(self.database).filter(the__next__number__gt=1)
+        self.assertEquals(qs.conditions_as_sql(), 'the__next__number > 1')
+
 
 Color = Enum('Color', u'red blue green yellow brown white black')
 
@@ -351,3 +365,5 @@ class SampleModel(Model):
 
 class Numbers(Model):
     number = UInt64Field()
+
+

From 658d1da5ce8f2737b5624c4d8aace964d91841cf Mon Sep 17 00:00:00 2001
From: Itai Shirav
Date: Sun, 10 Jun 2018 14:30:40 +0300
Subject: [PATCH 37/39] Update docs

---
 CHANGELOG.md | 2 ++
 docs/class_reference.md | 45 +++++++++++++++++++++++++++++++++------
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 089ba81..1267fde 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,8 @@ Unreleased
 - Rename `Model.readonly` to `Model._readonly` to prevent collision with a column that has the same name
 - The `field_names` argument to `Model.from_tsv` is now mandatory
 - Improve creation time of model instances by keeping a dictionary of default values
+- Fix queryset bug when field name contains double underscores (YouCanKeepSilence)
+- Prevent exception when determining timezone of old ClickHouse versions (vv-p)
 
 v0.9.8
 ------

diff --git a/docs/class_reference.md b/docs/class_reference.md
index 637fbc9..843570b 100644
--- a/docs/class_reference.md
+++ b/docs/class_reference.md
@@ -164,12 +164,11 @@ If `writable` is true, only writable fields are included.
 Callers should not modify the dictionary.
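A minimal usage sketch for the fields dictionary, assuming the `Person` model from the test suite:

    for name, field in Person.fields(writable=True).items():
        print(name, field.default)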
-#### Model.from_tsv(line, field_names=None, timezone_in_use=UTC, database=None)
+#### Model.from_tsv(line, field_names, timezone_in_use=UTC, database=None)
 
 
 Create a model instance from a tab-separated line. The line may or may not include a newline.
 The `field_names` list must match the fields defined in the model, but does not have to include all of them.
-If omitted, it is assumed to be the names of all fields in the model, in order of definition.
 
 - `line`: the TSV-formatted data.
 - `field_names`: names of the model fields in the data.
@@ -190,6 +189,18 @@
 Returns `None` unless the instance was read from the database or written to it.
 
 
 Gets a `Field` instance given its name, or `None` if not found.
 
 
+#### Model.is_read_only()
+
+
+Returns true if the model is marked as read only.
+
+
+#### Model.is_system_model()
+
+
+Returns true if the model represents a system table.
+
+
 #### Model.objects_in(database)
 
@@ -261,12 +272,11 @@ If `writable` is true, only writable fields are included.
 
 Callers should not modify the dictionary.
 
 
-#### BufferModel.from_tsv(line, field_names=None, timezone_in_use=UTC, database=None)
+#### BufferModel.from_tsv(line, field_names, timezone_in_use=UTC, database=None)
 
 
 Create a model instance from a tab-separated line. The line may or may not include a newline.
 The `field_names` list must match the fields defined in the model, but does not have to include all of them.
-If omitted, it is assumed to be the names of all fields in the model, in order of definition.
 
 - `line`: the TSV-formatted data.
 - `field_names`: names of the model fields in the data.
@@ -287,6 +297,18 @@
 Returns `None` unless the instance was read from the database or written to it.
 
 
 Gets a `Field` instance given its name, or `None` if not found.
 
 
+#### BufferModel.is_read_only()
+
+
+Returns true if the model is marked as read only.
+
+
+#### BufferModel.is_system_model()
+
+
+Returns true if the model represents a system table.
+
+
 #### BufferModel.objects_in(database)
 
@@ -391,12 +413,11 @@ you can always mention the Foo model twice without bothering with any fixes:
 
 See tests.test_engines:DistributedTestCase for more examples
 
-#### DistributedModel.from_tsv(line, field_names=None, timezone_in_use=UTC, database=None)
+#### DistributedModel.from_tsv(line, field_names, timezone_in_use=UTC, database=None)
 
 
 Create a model instance from a tab-separated line. The line may or may not include a newline.
 The `field_names` list must match the fields defined in the model, but does not have to include all of them.
-If omitted, it is assumed to be the names of all fields in the model, in order of definition.
 
 - `line`: the TSV-formatted data.
 - `field_names`: names of the model fields in the data.
@@ -417,6 +438,18 @@
 Returns `None` unless the instance was read from the database or written to it.
 
 
 Gets a `Field` instance given its name, or `None` if not found.
 
 
+#### DistributedModel.is_read_only()
+
+
+Returns true if the model is marked as read only.
+
+
+#### DistributedModel.is_system_model()
+
+
+Returns true if the model represents a system table.
+
+
 #### DistributedModel.objects_in(database)

From 86f05a0c23b4c57cf9f569d0aae22ed6cdf6bb13 Mon Sep 17 00:00:00 2001
From: Itai Shirav
Date: Sun, 10 Jun 2018 16:17:35 +0300
Subject: [PATCH 38/39] Python 3 compatibility

---
 src/infi/clickhouse_orm/database.py | 1 +
 tests/test_database.py | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py
index d8c9762..3b7179f 100644
--- a/src/infi/clickhouse_orm/database.py
+++ b/src/infi/clickhouse_orm/database.py
@@ -37,6 +37,7 @@ class ServerError(DatabaseException):
         else:
             # just skip custom init
             # if non-standard message format
+            self.message = message
             super(ServerError, self).__init__(message)
 
     ERROR_PATTERN = re.compile(r'''

diff --git a/tests/test_database.py b/tests/test_database.py
index 331bcef..d4cf387 100644
--- a/tests/test_database.py
+++ b/tests/test_database.py
@@ -162,7 +162,7 @@ class DatabaseTestCase(TestCaseWithData):
             float_field = Float32Field()
         with self.assertRaises(DatabaseException) as cm:
             self.database.create_table(EnginelessModel)
-        self.assertEqual(cm.exception.message, 'EnginelessModel class must define an engine')
+        self.assertEqual(str(cm.exception), 'EnginelessModel class must define an engine')

From 0ffafe3f62005c785433f0016fbead5c2fe0ddc0 Mon Sep 17 00:00:00 2001
From: Itai Shirav
Date: Mon, 11 Jun 2018 13:33:17 +0300
Subject: [PATCH 39/39] Releasing v1.0.0

---
 CHANGELOG.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1267fde..f6b4393 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,8 @@
 Change Log
 ==========
 
-Unreleased
-----------
+v1.0.0
+------
 - Add support for compound filters with Q objects (desile)
 - Add support for BETWEEN operator (desile)
 - Distributed engine support (tsionyx)
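To tie the series together, here is a short usage sketch of the v1.0.0 API. The `Event` model, its column names and the database name are hypothetical, and a running ClickHouse server recent enough for custom partitioning is assumed:

    from infi.clickhouse_orm.database import Database
    from infi.clickhouse_orm.engines import MergeTree
    from infi.clickhouse_orm.fields import DateField, Int32Field, StringField
    from infi.clickhouse_orm.models import Model
    from infi.clickhouse_orm.query import Q

    class Event(Model):
        # A field name with a double underscore -- parsed correctly by
        # querysets since the double-underscore fix in this series.
        event__date = DateField()
        event_group = Int32Field()
        name = StringField()

        # New-style engine declaration with a custom partition key.
        engine = MergeTree(order_by=('event__date', 'event_group'),
                           partition_key=('toYYYYMM(event__date)',))

    db = Database('example_db')   # falls back to UTC on servers too old to report a timezone
    db.create_table(Event)        # raises DatabaseException if the model defines no engine

    # Compound filter combining the `between` operator with a Q-object OR.
    qs = Event.objects_in(db).filter(Q(event_group__between=(1, 10)) | Q(name='signup'))
    print(qs.as_sql())

Only constructing the `Database` and calling `create_table()` touch the server; building the queryset and rendering it with `as_sql()` are purely client-side.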