diff --git a/.gitignore b/.gitignore index ae23e11..0e9fa7b 100644 --- a/.gitignore +++ b/.gitignore @@ -58,4 +58,7 @@ buildout.in src/infi/clickhouse_orm/__version__.py bootstrap.py -htmldocs/ \ No newline at end of file +htmldocs/ + +# tox +.tox/ diff --git a/CHANGELOG.md b/CHANGELOG.md index ac840ac..f6b4393 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,24 @@ Change Log ========== +v1.0.0 +------ +- Add support for compound filters with Q objects (desile) +- Add support for BETWEEN operator (desile) +- Distributed engine support (tsionyx) +- `_fields` and `_writable_fields` are OrderedDicts - note that this might break backwards compatibility (tsionyx) +- Improve error messages returned from the database with the `ServerError` class (tsionyx) +- Added support for custom partitioning (M1hacka) +- Added attribute `server_version` to Database class (M1hacka) +- Changed `Engine.create_table_sql()`, `Engine.drop_table_sql()`, `Model.create_table_sql()`, `Model.drop_table_sql()` parameter to db from db_name (M1hacka) +- Fix parsing of datetime column type when it includes a timezone (M1hacka) +- Rename `Model.system` to `Model._system` to prevent collision with a column that has the same name +- Rename `Model.readonly` to `Model._readonly` to prevent collision with a column that has the same name +- The `field_names` argument to `Model.to_tsv` is now mandatory +- Improve creation time of model instances by keeping a dictionary of default values +- Fix queryset bug when field name contains double underscores (YouCanKeepSilence) +- Prevent exception when determining timezone of old ClickHouse versions (vv-p) + v0.9.8 ------ - Bug fix: add field names list explicitly to Database.insert method (anci) diff --git a/buildout.cfg b/buildout.cfg index 27e9e8b..2e10159 100644 --- a/buildout.cfg +++ b/buildout.cfg @@ -3,7 +3,9 @@ prefer-final = false newest = false download-cache = .cache develop = . 
-parts = +parts = +relative-paths = true +extensions = buildout.wheel [project] name = infi.clickhouse_orm diff --git a/docs/class_reference.md b/docs/class_reference.md index e56fc7a..843570b 100644 --- a/docs/class_reference.md +++ b/docs/class_reference.md @@ -144,24 +144,31 @@ invalid values will cause a `ValueError` to be raised. Unrecognized field names will cause an `AttributeError`. -#### Model.create_table_sql(db_name) +#### Model.create_table_sql(db) Returns the SQL command for creating a table for this model. -#### Model.drop_table_sql(db_name) +#### Model.drop_table_sql(db) Returns the SQL command for deleting this model's table. -#### Model.from_tsv(line, field_names=None, timezone_in_use=UTC, database=None) +#### Model.fields(writable=False) + + +Returns an `OrderedDict` of the model's fields (from name to `Field` instance). +If `writable` is true, only writable fields are included. +Callers should not modify the dictionary. + + +#### Model.from_tsv(line, field_names, timezone_in_use=UTC, database=None) Create a model instance from a tab-separated line. The line may or may not include a newline. The `field_names` list must match the fields defined in the model, but does not have to include all of them. -If omitted, it is assumed to be the names of all fields in the model, in order of definition. - `line`: the TSV-formatted data. - `field_names`: names of the model fields in the data. @@ -182,6 +189,18 @@ Returns `None` unless the instance was read from the database or written to it. Gets a `Field` instance given its name, or `None` if not found. +#### Model.is_read_only() + + +Returns true if the model is marked as read only. + + +#### Model.is_system_model() + + +Returns true if the model represents a system table. + + #### Model.objects_in(database) @@ -233,24 +252,31 @@ invalid values will cause a `ValueError` to be raised. Unrecognized field names will cause an `AttributeError`. 
-#### BufferModel.create_table_sql(db_name) +#### BufferModel.create_table_sql(db) Returns the SQL command for creating a table for this model. -#### BufferModel.drop_table_sql(db_name) +#### BufferModel.drop_table_sql(db) Returns the SQL command for deleting this model's table. -#### BufferModel.from_tsv(line, field_names=None, timezone_in_use=UTC, database=None) +#### BufferModel.fields(writable=False) + + +Returns an `OrderedDict` of the model's fields (from name to `Field` instance). +If `writable` is true, only writable fields are included. +Callers should not modify the dictionary. + + +#### BufferModel.from_tsv(line, field_names, timezone_in_use=UTC, database=None) Create a model instance from a tab-separated line. The line may or may not include a newline. The `field_names` list must match the fields defined in the model, but does not have to include all of them. -If omitted, it is assumed to be the names of all fields in the model, in order of definition. - `line`: the TSV-formatted data. - `field_names`: names of the model fields in the data. @@ -271,6 +297,18 @@ Returns `None` unless the instance was read from the database or written to it. Gets a `Field` instance given its name, or `None` if not found. +#### BufferModel.is_read_only() + + +Returns true if the model is marked as read only. + + +#### BufferModel.is_system_model() + + +Returns true if the model represents a system table. + + #### BufferModel.objects_in(database) @@ -309,6 +347,143 @@ Returns the instance's column values as a tab-separated line. A newline is not i - `include_readonly`: if false, returns only fields that can be inserted into database. +### DistributedModel + +Extends Model + + +Model for Distributed engine + +#### DistributedModel(**kwargs) + + +Creates a model instance, using keyword arguments as field values. +Since values are immediately converted to their Pythonic type, +invalid values will cause a `ValueError` to be raised. 
+Unrecognized field names will cause an `AttributeError`. + + +#### DistributedModel.create_table_sql(db) + + +#### DistributedModel.drop_table_sql(db) + + +Returns the SQL command for deleting this model's table. + + +#### DistributedModel.fields(writable=False) + + +Returns an `OrderedDict` of the model's fields (from name to `Field` instance). +If `writable` is true, only writable fields are included. +Callers should not modify the dictionary. + + +#### DistributedModel.fix_engine_table() + + +Remember: Distributed table does not store any data, just provides distributed access to it. + +So if we define a model with engine that has no defined table for data storage +(see FooDistributed below), that table cannot be successfully created. +This routine can automatically fix engine's storage table by finding the first +non-distributed model among your model's superclasses. + +>>> class Foo(Model): +... id = UInt8Field(1) +... +>>> class FooDistributed(Foo, DistributedModel): +... engine = Distributed('my_cluster') +... +>>> FooDistributed.engine.table +None +>>> FooDistributed.fix_engine_table() +>>> FooDistributed.engine.table + + +However if you prefer more explicit way of doing things, +you can always mention the Foo model twice without bothering with any fixes: + +>>> class FooDistributedVerbose(Foo, DistributedModel): +... engine = Distributed('my_cluster', Foo) +>>> FooDistributedVerbose.engine.table + + +See tests.test_engines:DistributedTestCase for more examples + + +#### DistributedModel.from_tsv(line, field_names, timezone_in_use=UTC, database=None) + + +Create a model instance from a tab-separated line. The line may or may not include a newline. +The `field_names` list must match the fields defined in the model, but does not have to include all of them. + +- `line`: the TSV-formatted data. +- `field_names`: names of the model fields in the data. +- `timezone_in_use`: the timezone to use when parsing dates and datetimes.
+- `database`: if given, sets the database that this instance belongs to. + + +#### get_database() + + +Gets the `Database` that this model instance belongs to. +Returns `None` unless the instance was read from the database or written to it. + + +#### get_field(name) + + +Gets a `Field` instance given its name, or `None` if not found. + + +#### DistributedModel.is_read_only() + + +Returns true if the model is marked as read only. + + +#### DistributedModel.is_system_model() + + +Returns true if the model represents a system table. + + +#### DistributedModel.objects_in(database) + + +Returns a `QuerySet` for selecting instances of this model class. + + +#### set_database(db) + + +#### DistributedModel.table_name() + + +Returns the model's database table name. By default this is the +class name converted to lowercase. Override this if you want to use +a different table name. + + +#### to_dict(include_readonly=True, field_names=None) + + +Returns the instance's column values as a dict. + +- `include_readonly`: if false, returns only fields that can be inserted into database. +- `field_names`: an iterable of field names to return (optional) + + +#### to_tsv(include_readonly=True) + + +Returns the instance's column values as a tab-separated line. A newline is not included. + +- `include_readonly`: if false, returns only fields that can be inserted into database. + + infi.clickhouse_orm.fields -------------------------- @@ -497,7 +672,7 @@ Extends Engine Extends Engine -#### MergeTree(date_col, key_cols, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) +#### MergeTree(date_col=None, order_by=(), sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) ### Buffer @@ -507,7 +682,7 @@ Extends Engine Buffers the data to write in RAM, periodically flushing it to another table. Must be used in conjuction with a `BufferModel`. 
-Read more [here](https://clickhouse.yandex/reference_en.html#Buffer). +Read more [here](https://clickhouse.yandex/docs/en/table_engines/buffer/). #### Buffer(main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000) @@ -525,25 +700,49 @@ https://clickhouse.yandex/docs/en/single/index.html#document-table_engines/merge #### Merge(table_regex) +### Distributed + +Extends Engine + + +The Distributed engine by itself does not store data, +but allows distributed query processing on multiple servers. +Reading is automatically parallelized. +During a read, the table indexes on remote servers are used, if there are any. + +See full documentation here +https://clickhouse.yandex/docs/en/table_engines/distributed.html + +#### Distributed(cluster, table=None, sharding_key=None) + + +:param cluster: what cluster to access data from +:param table: underlying table that actually stores data. +If you are not specifying any table here, ensure that it can be inferred +from your model's superclass (see models.DistributedModel.fix_engine_table) +:param sharding_key: how to distribute data among shards when inserting +straightly into Distributed table, optional + + ### CollapsingMergeTree Extends MergeTree -#### CollapsingMergeTree(date_col, key_cols, sign_col, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) +#### CollapsingMergeTree(date_col=None, order_by=(), sign_col="sign", sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) ### SummingMergeTree Extends MergeTree -#### SummingMergeTree(date_col, key_cols, summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) +#### SummingMergeTree(date_col=None, order_by=(), summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) ### ReplacingMergeTree Extends 
MergeTree -#### ReplacingMergeTree(date_col, key_cols, ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None) +#### ReplacingMergeTree(date_col=None, order_by=(), ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None) infi.clickhouse_orm.query @@ -605,16 +804,17 @@ Adds a DISTINCT clause to the query, meaning that any duplicate rows in the results will be omitted. -#### exclude(**kwargs) +#### exclude(**filter_fields) Returns a copy of this queryset that excludes all rows matching the conditions. -#### filter(**kwargs) +#### filter(*q, **filter_fields) Returns a copy of this queryset that includes only rows matching the conditions. +Add q object to query if it specified. #### only(*field_names) @@ -705,16 +905,17 @@ Adds a DISTINCT clause to the query, meaning that any duplicate rows in the results will be omitted. -#### exclude(**kwargs) +#### exclude(**filter_fields) Returns a copy of this queryset that excludes all rows matching the conditions. -#### filter(**kwargs) +#### filter(*q, **filter_fields) Returns a copy of this queryset that includes only rows matching the conditions. +Add q object to query if it specified. 
#### group_by(*args) diff --git a/docs/contributing.md b/docs/contributing.md index cb64e57..c173cb9 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -30,6 +30,10 @@ To see test coverage information run: bin/nosetests --with-coverage --cover-package=infi.clickhouse_orm +To test with tox, ensure that the setup.py is present (otherwise run `bin/buildout buildout:develop= setup.py`) and run: + + pip install tox + tox --- diff --git a/docs/field_types.md b/docs/field_types.md index 80d0f2a..4c447eb 100644 --- a/docs/field_types.md +++ b/docs/field_types.md @@ -1,6 +1,8 @@ Field Types =========== +See: [ClickHouse Documentation](https://clickhouse.yandex/docs/en/data_types/) + Currently the following field types are supported: | Class | DB Type | Pythonic Type | Comments @@ -85,7 +87,7 @@ Working with materialized and alias fields ClickHouse provides an opportunity to create MATERIALIZED and ALIAS Fields. -See documentation [here](https://clickhouse.yandex/reference_en.html#Default%20values). +See documentation [here](https://clickhouse.yandex/docs/en/query_language/queries/#default-values). Both field types can't be inserted into the database directly, so they are ignored when using the `Database.insert()` method. ClickHouse does not return the field values if you use `"SELECT * FROM ..."` - you have to list these field names explicitly in the query. 
diff --git a/docs/models_and_databases.md b/docs/models_and_databases.md index 2b84f99..230fbbb 100644 --- a/docs/models_and_databases.md +++ b/docs/models_and_databases.md @@ -8,7 +8,7 @@ Database instances connect to a specific ClickHouse database for running queries Defining Models --------------- -Models are defined in a way reminiscent of Django's ORM: +Models are defined in a way reminiscent of Django's ORM, by subclassing `Model`: from infi.clickhouse_orm import models, fields, engines @@ -21,9 +21,43 @@ Models are defined in a way reminiscent of Django's ORM: engine = engines.MergeTree('birthday', ('first_name', 'last_name', 'birthday')) -It is possible to provide a default value for a field, instead of its "natural" default (empty string for string fields, zero for numeric fields etc.). Alternatively it is possible to pass alias or materialized parameters (see below for usage examples). Only one of `default`, `alias` and `materialized` parameters can be provided. +The columns in the database table are represented by model fields. Each field has a type, which matches the type of the corresponding database column. All the supported field types are listed [here](field_types.md). -For more details see [Field Types](field_types.md) and [Table Engines](table_engines.md). +A model must have an `engine`, which determines how its table is stored on disk (if at all), and what capabilities it has. For more details about table engines see [here](table_engines.md). + +### Default values + +Each field has a "natural" default value - empty string for string fields, zero for numeric fields etc. To specify a different value use the `default` parameter: + + first_name = fields.StringField(default="anonymous") + +### Null values + +To allow null values in a field, wrap it inside a `NullableField`: + + birthday = fields.NullableField(fields.DateField()) + +In this case, the default value for that field becomes `null` unless otherwise specified.
+ +### Materialized fields + +The value of a materialized field is calculated from other fields in the model. For example: + + year_born = fields.Int16Field(materialized="toYear(birthday)") + +Materialized fields are read-only, meaning that their values are not sent to the database when inserting records. + +It is not possible to specify a default value for a materialized field. + +### Alias fields + +An alias field is simply a different way to call another field in the model. For example: + + date_born = fields.DateField(alias="birthday") + +Alias fields are read-only, meaning that their values are not sent to the database when inserting records. + +It is not possible to specify a default value for an alias field. ### Table Names diff --git a/docs/querysets.md b/docs/querysets.md index d27c836..260e66c 100644 --- a/docs/querysets.md +++ b/docs/querysets.md @@ -26,6 +26,12 @@ It is possible to specify several fields to filter or exclude by: >>> qs.conditions_as_sql() u"last_name = 'Smith' AND height > 1.75" +For filters with compound conditions you can use `Q` objects inside `filter` with overloaded operators `&` (AND), `|` (OR) and `~` (NOT): + + >>> qs = Person.objects_in(database).filter((Q(first_name='Ciaran', last_name='Carver') | Q(height__lte=1.8)) & ~Q(first_name='David')) + >>> qs.conditions_as_sql() + u"((first_name = 'Ciaran' AND last_name = 'Carver') OR height <= 1.8) AND (NOT (first_name = 'David'))" + There are different operators that can be used, by passing `__=` (two underscores separate the field name from the operator). In case no operator is given, `eq` is used by default. Below are all the supported operators.
| Operator | Equivalent SQL | Comments | @@ -36,6 +42,7 @@ There are different operators that can be used, by passing `__= value` | | | `lt` | `field < value` | | | `lte` | `field <= value` | | +| `between` | `field BETWEEN value1 AND value2` | | | `in` | `field IN (values)` | See below | | `not_in` | `field NOT IN (values)` | See below | | `contains` | `field LIKE '%value%'` | For string fields only | @@ -49,6 +56,7 @@ There are different operators that can be used, by passing `__\d+), + \ e\.displayText\(\)\ =\ (?P[^ \n]+):\ (?P.+?), + \ e.what\(\)\ =\ (?P[^ \n]+) + ''', re.VERBOSE | re.DOTALL) + + @classmethod + def get_error_code_msg(cls, full_error_message): + """ + Extract the code and message of the exception that clickhouse-server generated. + + See the list of error codes here: + https://github.com/yandex/ClickHouse/blob/master/dbms/src/Common/ErrorCodes.cpp + """ + match = cls.ERROR_PATTERN.match(full_error_message) + if match: + # assert match.group('type1') == match.group('type2') + return int(match.group('code')), match.group('msg') + + return 0, full_error_message + + def __str__(self): + if self.code is not None: + return "{} ({})".format(self.message, self.code) + + class Database(object): ''' Database instances connect to a specific ClickHouse database for running queries, @@ -55,7 +97,9 @@ class Database(object): elif autocreate: self.db_exists = False self.create_database() - self.server_timezone = self._get_server_timezone() + self.server_version = self._get_server_version() + # Versions 1.1.53981 and below don't have timezone function + self.server_timezone = self._get_server_timezone() if self.server_version > (1, 1, 53981) else pytz.utc def create_database(self): ''' @@ -74,18 +118,19 @@ class Database(object): ''' Creates a table for the given model class, if it does not exist already. 
''' - # TODO check that model has an engine - if model_class.system: + if model_class.is_system_model(): raise DatabaseException("You can't create system table") - self._send(model_class.create_table_sql(self.db_name)) + if getattr(model_class, 'engine') is None: + raise DatabaseException("%s class must define an engine" % model_class.__name__) + self._send(model_class.create_table_sql(self)) def drop_table(self, model_class): ''' Drops the database table of the given model class, if it exists. ''' - if model_class.system: + if model_class.is_system_model(): raise DatabaseException("You can't drop system table") - self._send(model_class.drop_table_sql(self.db_name)) + self._send(model_class.drop_table_sql(self)) def insert(self, model_instances, batch_size=1000): ''' @@ -103,11 +148,11 @@ class Database(object): return # model_instances is empty model_class = first_instance.__class__ - if first_instance.readonly or first_instance.system: + if first_instance.is_read_only() or first_instance.is_system_model(): raise DatabaseException("You can't insert into read only and system tables") fields_list = ','.join( - ['`%s`' % name for name, _ in first_instance._writable_fields]) + ['`%s`' % name for name in first_instance.fields(writable=True)]) def gen(): buf = BytesIO() @@ -250,7 +295,7 @@ class Database(object): params = self._build_params(settings) r = requests.post(self.db_url, params=params, data=data, stream=stream) if r.status_code != 200: - raise DatabaseException(r.text) + raise ServerError(r.text) return r def _build_params(self, settings): @@ -281,10 +326,19 @@ class Database(object): try: r = self._send('SELECT timezone()') return pytz.timezone(r.text.strip()) - except DatabaseException: - logger.exception('Cannot determine server timezone, assuming UTC') + except ServerError as e: + logger.exception('Cannot determine server timezone (%s), assuming UTC', e) return pytz.utc + def _get_server_version(self, as_tuple=True): + try: + r = self._send('SELECT 
version();') + ver = r.text + except ServerError as e: + logger.exception('Cannot determine server version (%s), assuming 1.1.0', e) + ver = '1.1.0' + return tuple(int(n) for n in ver.split('.')) if as_tuple else ver + def _is_connection_readonly(self): r = self._send("SELECT value FROM system.settings WHERE name = 'readonly'") return r.text.strip() != '0' diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index 57ca1ce..ea6d3f4 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -1,89 +1,144 @@ from __future__ import unicode_literals +import logging import six from .utils import comma_join +logger = logging.getLogger('clickhouse_orm') + class Engine(object): - def create_table_sql(self): + def create_table_sql(self, db): raise NotImplementedError() # pragma: no cover class TinyLog(Engine): - def create_table_sql(self): + def create_table_sql(self, db): return 'TinyLog' class Log(Engine): - def create_table_sql(self): + def create_table_sql(self, db): return 'Log' class Memory(Engine): - def create_table_sql(self): + def create_table_sql(self, db): return 'Memory' class MergeTree(Engine): - def __init__(self, date_col, key_cols, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None): - assert type(key_cols) in (list, tuple), 'key_cols must be a list or tuple' + def __init__(self, date_col=None, order_by=(), sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + assert type(order_by) in (list, tuple), 'order_by must be a list or tuple' + assert date_col is None or isinstance(date_col, six.string_types), 'date_col must be string if present' + assert partition_key is None or type(partition_key) in (list, tuple),\ + 'partition_key must be tuple or list if present' + assert (replica_table_path is None) == (replica_name == None), \ + 'both replica_table_path and replica_name must be specified' + + # These 
values conflict with each other (old and new syntax of table engines. + # So let's control only one of them is given. + assert date_col or partition_key, "You must set either date_col or partition_key" self.date_col = date_col - self.key_cols = key_cols + self.partition_key = partition_key if partition_key else ('toYYYYMM(`%s`)' % date_col,) + + self.order_by = order_by self.sampling_expr = sampling_expr self.index_granularity = index_granularity self.replica_table_path = replica_table_path self.replica_name = replica_name - # TODO verify that both replica fields are either present or missing - def create_table_sql(self): + # I changed field name for new reality and syntax + @property + def key_cols(self): + logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead') + return self.order_by + + @key_cols.setter + def key_cols(self, value): + logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead') + self.order_by = value + + def create_table_sql(self, db): name = self.__class__.__name__ if self.replica_name: name = 'Replicated' + name - params = self._build_sql_params() - return '%s(%s)' % (name, comma_join(params)) - def _build_sql_params(self): + # In ClickHouse 1.1.54310 custom partitioning key was introduced + # https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/ + # Let's check version and use new syntax if available + if db.server_version >= (1, 1, 54310): + partition_sql = "PARTITION BY %s ORDER BY %s" \ + % ('(%s)' % comma_join(self.partition_key), '(%s)' % comma_join(self.order_by)) + + if self.sampling_expr: + partition_sql += " SAMPLE BY %s" % self.sampling_expr + + partition_sql += " SETTINGS index_granularity=%d" % self.index_granularity + + elif not self.date_col: + # Can't import it globally due to circular import + from infi.clickhouse_orm.database import DatabaseException + raise DatabaseException("Custom 
partitioning is not supported before ClickHouse 1.1.54310. " + "Please update your server or use date_col syntax." + "https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/") + else: + partition_sql = '' + + params = self._build_sql_params(db) + return '%s(%s) %s' % (name, comma_join(params), partition_sql) + + def _build_sql_params(self, db): params = [] if self.replica_name: params += ["'%s'" % self.replica_table_path, "'%s'" % self.replica_name] - params.append(self.date_col) - if self.sampling_expr: - params.append(self.sampling_expr) - params.append('(%s)' % comma_join(self.key_cols)) - params.append(str(self.index_granularity)) + + # In ClickHouse 1.1.54310 custom partitioning key was introduced + # https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/ + # These parameters are process in create_table_sql directly. + # In previous ClickHouse versions this this syntax does not work. + if db.server_version < (1, 1, 54310): + params.append(self.date_col) + if self.sampling_expr: + params.append(self.sampling_expr) + params.append('(%s)' % comma_join(self.order_by)) + params.append(str(self.index_granularity)) + return params class CollapsingMergeTree(MergeTree): - def __init__(self, date_col, key_cols, sign_col, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None): - super(CollapsingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name) + def __init__(self, date_col=None, order_by=(), sign_col='sign', sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, + replica_table_path, replica_name, partition_key) self.sign_col = sign_col - def _build_sql_params(self): - params = super(CollapsingMergeTree, self)._build_sql_params() + def _build_sql_params(self, db): + params = 
super(CollapsingMergeTree, self)._build_sql_params(db) params.append(self.sign_col) return params class SummingMergeTree(MergeTree): - def __init__(self, date_col, key_cols, summing_cols=None, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None): - super(SummingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name) + def __init__(self, date_col=None, order_by=(), summing_cols=None, sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, + replica_name, partition_key) assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple' self.summing_cols = summing_cols - def _build_sql_params(self): - params = super(SummingMergeTree, self)._build_sql_params() + def _build_sql_params(self, db): + params = super(SummingMergeTree, self)._build_sql_params(db) if self.summing_cols: params.append('(%s)' % comma_join(self.summing_cols)) return params @@ -91,13 +146,14 @@ class SummingMergeTree(MergeTree): class ReplacingMergeTree(MergeTree): - def __init__(self, date_col, key_cols, ver_col=None, sampling_expr=None, - index_granularity=8192, replica_table_path=None, replica_name=None): - super(ReplacingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name) + def __init__(self, date_col=None, order_by=(), ver_col=None, sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None): + super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, + replica_table_path, replica_name, partition_key) self.ver_col = ver_col - def _build_sql_params(self): - params = super(ReplacingMergeTree, self)._build_sql_params() + def _build_sql_params(self, db): + 
params = super(ReplacingMergeTree, self)._build_sql_params(db) if self.ver_col: params.append(self.ver_col) return params @@ -107,11 +163,12 @@ class Buffer(Engine): """ Buffers the data to write in RAM, periodically flushing it to another table. Must be used in conjuction with a `BufferModel`. - Read more [here](https://clickhouse.yandex/reference_en.html#Buffer). + Read more [here](https://clickhouse.yandex/docs/en/table_engines/buffer/). """ #Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) - def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, + min_bytes=10000000, max_bytes=100000000): self.main_model = main_model self.num_layers = num_layers self.min_time = min_time @@ -121,11 +178,11 @@ class Buffer(Engine): self.min_bytes = min_bytes self.max_bytes = max_bytes - def create_table_sql(self, db_name): + def create_table_sql(self, db): # Overriden create_table_sql example: - #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' + # sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % ( - db_name, self.main_model.table_name(), self.num_layers, + db.db_name, self.main_model.table_name(), self.num_layers, self.min_time, self.max_time, self.min_rows, self.max_rows, self.min_bytes, self.max_bytes ) @@ -142,16 +199,58 @@ class Merge(Engine): def __init__(self, table_regex): assert isinstance(table_regex, six.string_types), "'table_regex' parameter must be string" - self.table_regex = table_regex - # Use current database as default - self.db_name = None + def create_table_sql(self, db): + return "Merge(`%s`, '%s')" % (db.db_name, self.table_regex) - def create_table_sql(self): - db_name 
= ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()' - return "Merge(%s, '%s')" % (db_name, self.table_regex) - def set_db_name(self, db_name): - assert isinstance(db_name, six.string_types), "'db_name' parameter must be string" - self.db_name = db_name +class Distributed(Engine): + """ + The Distributed engine by itself does not store data, + but allows distributed query processing on multiple servers. + Reading is automatically parallelized. + During a read, the table indexes on remote servers are used, if there are any. + + See full documentation here + https://clickhouse.yandex/docs/en/table_engines/distributed.html + """ + def __init__(self, cluster, table=None, sharding_key=None): + """ + :param cluster: what cluster to access data from + :param table: underlying table that actually stores data. + If you are not specifying any table here, ensure that it can be inferred + from your model's superclass (see models.DistributedModel.fix_engine_table) + :param sharding_key: how to distribute data among shards when inserting + straightly into Distributed table, optional + """ + self.cluster = cluster + self.table = table + self.sharding_key = sharding_key + + @property + def table_name(self): + # TODO: circular import is bad + from .models import ModelBase + + table = self.table + + if isinstance(table, ModelBase): + return table.table_name() + + return table + + def create_table_sql(self, db): + name = self.__class__.__name__ + params = self._build_sql_params(db) + return '%s(%s)' % (name, ', '.join(params)) + + def _build_sql_params(self, db): + if self.table_name is None: + raise ValueError("Cannot create {} engine: specify an underlying table".format( + self.__class__.__name__)) + + params = ["`%s`" % p for p in [self.cluster, db.db_name, self.table_name]] + if self.sharding_key: + params.append(self.sharding_key) + return params diff --git a/src/infi/clickhouse_orm/fields.py b/src/infi/clickhouse_orm/fields.py index 5449e9d..fc40790 100644 --- 
a/src/infi/clickhouse_orm/fields.py +++ b/src/infi/clickhouse_orm/fields.py @@ -39,7 +39,7 @@ class Field(object): data can't be converted. Returns the converted value. Subclasses should override this. The timezone_in_use parameter should be consulted when parsing datetime fields. ''' - return value + return value # pragma: no cover def validate(self, value): ''' diff --git a/src/infi/clickhouse_orm/migrations.py b/src/infi/clickhouse_orm/migrations.py index 324db74..125097f 100644 --- a/src/infi/clickhouse_orm/migrations.py +++ b/src/infi/clickhouse_orm/migrations.py @@ -6,6 +6,7 @@ from .engines import MergeTree from .utils import escape from six.moves import zip +from six import iteritems import logging logger = logging.getLogger('migrations') @@ -65,7 +66,7 @@ class AlterTable(Operation): table_fields = dict(self._get_table_fields(database)) # Identify fields that were deleted from the model - deleted_fields = set(table_fields.keys()) - set(name for name, field in self.model_class._fields) + deleted_fields = set(table_fields.keys()) - set(self.model_class.fields()) for name in deleted_fields: logger.info(' Drop column %s', name) self._alter_table(database, 'DROP COLUMN %s' % name) @@ -73,7 +74,7 @@ class AlterTable(Operation): # Identify fields that were added to the model prev_name = None - for name, field in self.model_class._fields: + for name, field in iteritems(self.model_class.fields()): if name not in table_fields: logger.info(' Add column %s', name) assert prev_name, 'Cannot add a column to the beginning of the table' @@ -89,7 +90,8 @@ class AlterTable(Operation): # The order of class attributes can be changed any time, so we can't count on it # Secondly, MATERIALIZED and ALIAS fields are always at the end of the DESC, so we can't expect them to save # attribute position. 
Watch https://github.com/Infinidat/infi.clickhouse_orm/issues/47 - model_fields = {name: field.get_sql(with_default_expression=False) for name, field in self.model_class._fields} + model_fields = {name: field.get_sql(with_default_expression=False) + for name, field in iteritems(self.model_class.fields())} for field_name, field_sql in self._get_table_fields(database): # All fields must have been created and dropped by this moment assert field_name in model_fields, 'Model fields and table columns in disagreement' diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index c56b821..d292462 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -1,14 +1,15 @@ from __future__ import unicode_literals import sys +from collections import OrderedDict from logging import getLogger -from six import with_metaclass, reraise +from six import with_metaclass, reraise, iteritems import pytz from .fields import Field, StringField from .utils import parse_tsv from .query import QuerySet -from .engines import Merge +from .engines import Merge, Distributed logger = getLogger('clickhouse_orm') @@ -21,18 +22,28 @@ class ModelBase(type): ad_hoc_model_cache = {} def __new__(cls, name, bases, attrs): - new_cls = super(ModelBase, cls).__new__(cls, str(name), bases, attrs) # Collect fields from parent classes - base_fields = [] + base_fields = dict() for base in bases: if isinstance(base, ModelBase): - base_fields += base._fields + base_fields.update(base._fields) + + fields = base_fields + # Build a list of fields, in the order they were listed in the class - fields = base_fields + [item for item in attrs.items() if isinstance(item[1], Field)] - fields.sort(key=lambda item: item[1].creation_counter) - setattr(new_cls, '_fields', fields) - setattr(new_cls, '_writable_fields', [f for f in fields if not f[1].readonly]) - return new_cls + fields.update({n: f for n, f in iteritems(attrs) if isinstance(f, Field)}) + fields = 
sorted(iteritems(fields), key=lambda item: item[1].creation_counter) + + # Build a dictionary of default values + defaults = {n: f.to_python(f.default, pytz.UTC) for n, f in fields} + + attrs = dict( + attrs, + _fields=OrderedDict(fields), + _writable_fields=OrderedDict([f for f in fields if not f[1].readonly]), + _defaults=defaults + ) + return super(ModelBase, cls).__new__(cls, str(name), bases, attrs) @classmethod def create_ad_hoc_model(cls, fields, model_name='AdHocModel'): @@ -57,6 +68,10 @@ class ModelBase(type): # Enums if db_type.startswith('Enum'): return orm_fields.BaseEnumField.create_ad_hoc_field(db_type) + # DateTime with timezone + if db_type.startswith('DateTime('): + # Some functions return DateTimeField with timezone in brackets + return orm_fields.DateTimeField() # Arrays if db_type.startswith('Array'): inner_field = cls.create_ad_hoc_field(db_type[6 : -1]) @@ -90,10 +105,12 @@ class Model(with_metaclass(ModelBase)): engine = None # Insert operations are restricted for read only models - readonly = False + _readonly = False # Create table, drop table, insert operations are restricted for system models - system = False + _system = False + + _database = None def __init__(self, **kwargs): ''' @@ -103,20 +120,15 @@ class Model(with_metaclass(ModelBase)): Unrecognized field names will cause an `AttributeError`. 
''' super(Model, self).__init__() - - self._database = None - + # Assign default values + self.__dict__.update(self._defaults) # Assign field values from keyword arguments - for name, value in kwargs.items(): + for name, value in iteritems(kwargs): field = self.get_field(name) if field: setattr(self, name, value) else: raise AttributeError('%s does not have a field called %s' % (self.__class__.__name__, name)) - # Assign default values for fields not included in the keyword arguments - for name, field in self._fields: - if name not in kwargs: - setattr(self, name, field.default) def __setattr__(self, name, value): ''' @@ -155,8 +167,7 @@ class Model(with_metaclass(ModelBase)): ''' Gets a `Field` instance given its name, or `None` if not found. ''' - field = getattr(self.__class__, name, None) - return field if isinstance(field, Field) else None + return self._fields.get(name) @classmethod def table_name(cls): @@ -168,32 +179,31 @@ class Model(with_metaclass(ModelBase)): return cls.__name__.lower() @classmethod - def create_table_sql(cls, db_name): + def create_table_sql(cls, db): ''' Returns the SQL command for creating a table for this model. ''' - parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db_name, cls.table_name())] + parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())] cols = [] - for name, field in cls._fields: + for name, field in iteritems(cls.fields()): cols.append(' %s %s' % (name, field.get_sql())) parts.append(',\n'.join(cols)) parts.append(')') - parts.append('ENGINE = ' + cls.engine.create_table_sql()) + parts.append('ENGINE = ' + cls.engine.create_table_sql(db)) return '\n'.join(parts) @classmethod - def drop_table_sql(cls, db_name): + def drop_table_sql(cls, db): ''' Returns the SQL command for deleting this model's table. 
''' - return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db_name, cls.table_name()) + return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db.db_name, cls.table_name()) @classmethod - def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None): + def from_tsv(cls, line, field_names, timezone_in_use=pytz.utc, database=None): ''' Create a model instance from a tab-separated line. The line may or may not include a newline. The `field_names` list must match the fields defined in the model, but does not have to include all of them. - If omitted, it is assumed to be the names of all fields in the model, in order of definition. - `line`: the TSV-formatted data. - `field_names`: names of the model fields in the data. @@ -201,7 +211,6 @@ class Model(with_metaclass(ModelBase)): - `database`: if given, sets the database that this instance belongs to. ''' from six import next - field_names = field_names or [name for name, field in cls._fields] values = iter(parse_tsv(line)) kwargs = {} for name in field_names: @@ -221,8 +230,8 @@ class Model(with_metaclass(ModelBase)): - `include_readonly`: if false, returns only fields that can be inserted into database. ''' data = self.__dict__ - fields = self._fields if include_readonly else self._writable_fields - return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in fields) + fields = self.fields(writable=not include_readonly) + return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in iteritems(fields)) def to_dict(self, include_readonly=True, field_names=None): ''' @@ -231,13 +240,13 @@ class Model(with_metaclass(ModelBase)): - `include_readonly`: if false, returns only fields that can be inserted into database. 
- `field_names`: an iterable of field names to return (optional) ''' - fields = self._fields if include_readonly else self._writable_fields + fields = self.fields(writable=not include_readonly) if field_names is not None: - fields = [f for f in fields if f[0] in field_names] + fields = [f for f in fields if f in field_names] data = self.__dict__ - return {name: data[name] for name, field in fields} + return {name: data[name] for name in fields} @classmethod def objects_in(cls, database): @@ -246,16 +255,41 @@ class Model(with_metaclass(ModelBase)): ''' return QuerySet(cls, database) + @classmethod + def fields(cls, writable=False): + ''' + Returns an `OrderedDict` of the model's fields (from name to `Field` instance). + If `writable` is true, only writable fields are included. + Callers should not modify the dictionary. + ''' + # noinspection PyProtectedMember,PyUnresolvedReferences + return cls._writable_fields if writable else cls._fields + + @classmethod + def is_read_only(cls): + ''' + Returns true if the model is marked as read only. + ''' + return cls._readonly + + @classmethod + def is_system_model(cls): + ''' + Returns true if the model represents a system table. + ''' + return cls._system + class BufferModel(Model): @classmethod - def create_table_sql(cls, db_name): + def create_table_sql(cls, db): ''' Returns the SQL command for creating a table for this model. 
''' - parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.engine.main_model.table_name())] - engine_str = cls.engine.create_table_sql(db_name) + parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db.db_name, cls.table_name(), db.db_name, + cls.engine.main_model.table_name())] + engine_str = cls.engine.create_table_sql(db) parts.append(engine_str) return ' '.join(parts) @@ -271,18 +305,84 @@ class MergeModel(Model): # Virtual fields can't be inserted into database _table = StringField(readonly=True) + @classmethod + def create_table_sql(cls, db): + assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance" + return super(MergeModel, cls).create_table_sql(db) + + +# TODO: base class for models that require specific engine + + +class DistributedModel(Model): + """ + Model for Distributed engine + """ + def set_database(self, db): - ''' - Gets the `Database` that this model instance belongs to. - Returns `None` unless the instance was read from the database or written to it. - ''' - assert isinstance(self.engine, Merge), "engine must be engines.Merge instance" - res = super(MergeModel, self).set_database(db) - self.engine.set_db_name(db.db_name) + assert isinstance(self.engine, Distributed), "engine must be engines.Distributed instance" + res = super(DistributedModel, self).set_database(db) return res @classmethod - def create_table_sql(cls, db_name): - assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance" - cls.engine.set_db_name(db_name) - return super(MergeModel, cls).create_table_sql(db_name) + def fix_engine_table(cls): + """ + Remember: Distributed table does not store any data, just provides distributed access to it. + + So if we define a model with engine that has no defined table for data storage + (see FooDistributed below), that table cannot be successfully created. 
+ This routine can automatically fix engine's storage table by finding the first + non-distributed model among your model's superclasses. + + >>> class Foo(Model): + ... id = UInt8Field(1) + ... + >>> class FooDistributed(Foo, DistributedModel): + ... engine = Distributed('my_cluster') + ... + >>> FooDistributed.engine.table + None + >>> FooDistributed.fix_engine() + >>> FooDistributed.engine.table + + + However if you prefer more explicit way of doing things, + you can always mention the Foo model twice without bothering with any fixes: + + >>> class FooDistributedVerbose(Foo, DistributedModel): + ... engine = Distributed('my_cluster', Foo) + >>> FooDistributedVerbose.engine.table + + + See tests.test_engines:DistributedTestCase for more examples + """ + + # apply only when engine has no table defined + if cls.engine.table_name: + return + + # find out all the superclasses of the Model that store any data + storage_models = [b for b in cls.__bases__ if issubclass(b, Model) + and not issubclass(b, DistributedModel)] + if not storage_models: + raise TypeError("When defining Distributed engine without the table_name " + "ensure that your model has a parent model") + + if len(storage_models) > 1: + raise TypeError("When defining Distributed engine without the table_name " + "ensure that your model has exactly one non-distributed superclass") + + # enable correct SQL for engine + cls.engine.table = storage_models[0] + + @classmethod + def create_table_sql(cls, db): + assert isinstance(cls.engine, Distributed), "engine must be engines.Distributed instance" + + cls.fix_engine_table() + + parts = [ + 'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` AS `{0}`.`{2}`'.format( + db.db_name, cls.table_name(), cls.engine.table_name), + 'ENGINE = ' + cls.engine.create_table_sql(db)] + return '\n'.join(parts) diff --git a/src/infi/clickhouse_orm/query.py b/src/infi/clickhouse_orm/query.py index 0bf764a..4e079e5 100644 --- a/src/infi/clickhouse_orm/query.py +++ 
b/src/infi/clickhouse_orm/query.py @@ -7,7 +7,6 @@ from .utils import comma_join # TODO -# - and/or between Q objects # - check that field names are valid # - operators for arrays: length, has, empty @@ -103,6 +102,29 @@ class NotOperator(Operator): return 'NOT (%s)' % self._base_operator.to_sql(model_cls, field_name, value) +class BetweenOperator(Operator): + """ + An operator that implements BETWEEN. + Accepts list or tuple of two elements and generates sql condition: + - 'BETWEEN value[0] AND value[1]' if value[0] and value[1] are not None and not empty + Then imitations of BETWEEN, where one of two limits is missing + - '>= value[0]' if value[1] is None or empty + - '<= value[1]' if value[0] is None or empty + """ + + def to_sql(self, model_cls, field_name, value): + field = getattr(model_cls, field_name) + value0 = field.to_db_string( + field.to_python(value[0], pytz.utc)) if value[0] is not None or len(str(value[0])) > 0 else None + value1 = field.to_db_string( + field.to_python(value[1], pytz.utc)) if value[1] is not None or len(str(value[1])) > 0 else None + if value0 and value1: + return '%s BETWEEN %s AND %s' % (field_name, value0, value1) + if value0 and not value1: + return ' '.join([field_name, '>=', value0]) + if value1 and not value0: + return ' '.join([field_name, '<=', value1]) + # Define the set of builtin operators _operators = {} @@ -116,6 +138,7 @@ register_operator('gt', SimpleOperator('>')) register_operator('gte', SimpleOperator('>=')) register_operator('lt', SimpleOperator('<')) register_operator('lte', SimpleOperator('<=')) +register_operator('between', BetweenOperator()) register_operator('in', InOperator()) register_operator('not_in', NotOperator(InOperator())) register_operator('contains', LikeOperator('%{}%')) @@ -134,7 +157,11 @@ class FOV(object): def __init__(self, field_name, operator, value): self._field_name = field_name - self._operator = _operators[operator] + self._operator = _operators.get(operator) + if self._operator is 
None: + # The field name contains __ like my__field + self._field_name = field_name + '__' + operator + self._operator = _operators['eq'] self._value = value def to_sql(self, model_cls): @@ -143,9 +170,23 @@ class FOV(object): class Q(object): - def __init__(self, **kwargs): - self._fovs = [self._build_fov(k, v) for k, v in six.iteritems(kwargs)] + AND_MODE = 'AND' + OR_MODE = 'OR' + + def __init__(self, **filter_fields): + self._fovs = [self._build_fov(k, v) for k, v in six.iteritems(filter_fields)] + self._l_child = None + self._r_child = None self._negate = False + self._mode = self.AND_MODE + + @classmethod + def _construct_from(cls, l_child, r_child, mode): + q = Q() + q._l_child = l_child + q._r_child = r_child + q._mode = mode + return q def _build_fov(self, key, value): if '__' in key: @@ -155,13 +196,24 @@ class Q(object): return FOV(field_name, operator, value) def to_sql(self, model_cls): - if not self._fovs: - return '1' - sql = ' AND '.join(fov.to_sql(model_cls) for fov in self._fovs) + if self._fovs: + sql = ' {} '.format(self._mode).join(fov.to_sql(model_cls) for fov in self._fovs) + else: + if self._l_child and self._r_child: + sql = '({}) {} ({})'.format( + self._l_child.to_sql(model_cls), self._mode, self._r_child.to_sql(model_cls)) + else: + return '1' if self._negate: sql = 'NOT (%s)' % sql return sql + def __or__(self, other): + return Q._construct_from(self, other, self.OR_MODE) + + def __and__(self, other): + return Q._construct_from(self, other, self.AND_MODE) + def __invert__(self): q = copy(self) q._negate = True @@ -286,20 +338,24 @@ class QuerySet(object): qs._fields = field_names return qs - def filter(self, **kwargs): + def filter(self, *q, **filter_fields): """ Returns a copy of this queryset that includes only rows matching the conditions. + Add q object to query if it specified. 
""" qs = copy(self) - qs._q = list(self._q) + [Q(**kwargs)] + if q: + qs._q = list(self._q) + list(q) + else: + qs._q = list(self._q) + [Q(**filter_fields)] return qs - def exclude(self, **kwargs): + def exclude(self, **filter_fields): """ Returns a copy of this queryset that excludes all rows matching the conditions. """ qs = copy(self) - qs._q = list(self._q) + [~Q(**kwargs)] + qs._q = list(self._q) + [~Q(**filter_fields)] return qs def paginate(self, page_num=1, page_size=100): diff --git a/src/infi/clickhouse_orm/system_models.py b/src/infi/clickhouse_orm/system_models.py index 5ca3efd..7341d14 100644 --- a/src/infi/clickhouse_orm/system_models.py +++ b/src/infi/clickhouse_orm/system_models.py @@ -1,6 +1,6 @@ """ -This file contains system readonly models that can be got from database -https://clickhouse.yandex/reference_en.html#System tables +This file contains system readonly models that can be got from the database +https://clickhouse.yandex/docs/en/system_tables/ """ from __future__ import unicode_literals from six import string_types @@ -15,7 +15,7 @@ class SystemPart(Model): """ Contains information about parts of a table in the MergeTree family. This model operates only fields, described in the reference. Other fields are ignored. 
- https://clickhouse.yandex/reference_en.html#system.parts + https://clickhouse.yandex/docs/en/system_tables/system.parts/ """ OPERATIONS = frozenset({'DETACH', 'DROP', 'ATTACH', 'FREEZE', 'FETCH'}) @@ -56,7 +56,7 @@ class SystemPart(Model): """ Next methods return SQL for some operations, which can be done with partitions - https://clickhouse.yandex/reference_en.html#Manipulations with partitions and parts + https://clickhouse.yandex/docs/en/query_language/queries/#manipulations-with-partitions-and-parts """ def _partition_operation_sql(self, operation, settings=None, from_part=None): """ @@ -68,7 +68,8 @@ class SystemPart(Model): """ operation = operation.upper() assert operation in self.OPERATIONS, "operation must be in [%s]" % comma_join(self.OPERATIONS) - sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (self._database.db_name, self.table, operation, self.partition) + + sql = "ALTER TABLE `%s`.`%s` %s PARTITION %s" % (self._database.db_name, self.table, operation, self.partition) if from_part is not None: sql += " FROM %s" % from_part self._database.raw(sql, settings=settings, stream=False) @@ -126,7 +127,7 @@ class SystemPart(Model): assert isinstance(conditions, string_types), "conditions must be a string" if conditions: conditions += " AND" - field_names = ','.join([f[0] for f in cls._fields]) + field_names = ','.join(cls.fields()) return database.select("SELECT %s FROM %s WHERE %s database='%s'" % (field_names, cls.table_name(), conditions, database.db_name), model_class=cls) diff --git a/tests/base_test_with_data.py b/tests/base_test_with_data.py index 352d3d3..90a328d 100644 --- a/tests/base_test_with_data.py +++ b/tests/base_test_with_data.py @@ -21,8 +21,8 @@ class TestCaseWithData(unittest.TestCase): self.database.drop_table(Person) self.database.drop_database() - def _insert_and_check(self, data, count): - self.database.insert(data) + def _insert_and_check(self, data, count, batch_size=1000): + self.database.insert(data, batch_size=batch_size) 
self.assertEquals(count, self.database.count(Person)) for instance in data: self.assertEquals(self.database, instance.get_database()) diff --git a/tests/test_database.py b/tests/test_database.py index 0214f36..d4cf387 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -2,7 +2,7 @@ from __future__ import unicode_literals import unittest -from infi.clickhouse_orm.database import Database, DatabaseException +from infi.clickhouse_orm.database import ServerError, DatabaseException from .base_test_with_data import * @@ -20,6 +20,12 @@ class DatabaseTestCase(TestCaseWithData): def test_insert__empty(self): self._insert_and_check([], 0) + def test_insert__small_batches(self): + self._insert_and_check(self._sample_data(), len(data), batch_size=10) + + def test_insert__medium_batches(self): + self._insert_and_check(self._sample_data(), len(data), batch_size=100) + def test_count(self): self.database.insert(self._sample_data()) self.assertEquals(self.database.count(Person), 100) @@ -131,14 +137,42 @@ class DatabaseTestCase(TestCaseWithData): self.assertEqual(results, "Whitney\tDurham\t1977-09-15\t1.72\nWhitney\tScott\t1971-07-04\t1.7\n") def test_invalid_user(self): - with self.assertRaises(DatabaseException): + with self.assertRaises(ServerError) as cm: Database(self.database.db_name, username='default', password='wrong') + exc = cm.exception + self.assertEqual(exc.code, 193) + self.assertEqual(exc.message, 'Wrong password for user default') + def test_nonexisting_db(self): db = Database('db_not_here', autocreate=False) - with self.assertRaises(DatabaseException): + with self.assertRaises(ServerError) as cm: db.create_table(Person) + exc = cm.exception + self.assertEqual(exc.code, 81) + self.assertEqual(exc.message, "Database db_not_here doesn't exist") + def test_preexisting_db(self): db = Database(self.database.db_name, autocreate=False) db.count(Person) + + def test_missing_engine(self): + class EnginelessModel(Model): + float_field = Float32Field() + 
with self.assertRaises(DatabaseException) as cm: + self.database.create_table(EnginelessModel) + self.assertEqual(str(cm.exception), 'EnginelessModel class must define an engine') + + def test_potentially_problematic_field_names(self): + class Model1(Model): + system = StringField() + readonly = StringField() + engine = Memory() + instance = Model1(system='s', readonly='r') + self.assertEquals(instance.to_dict(), dict(system='s', readonly='r')) + self.database.create_table(Model1) + self.database.insert([instance]) + instance = Model1.objects_in(self.database)[0] + self.assertEquals(instance.to_dict(), dict(system='s', readonly='r')) + diff --git a/tests/test_datetime_fields.py b/tests/test_datetime_fields.py new file mode 100644 index 0000000..ebe1e6c --- /dev/null +++ b/tests/test_datetime_fields.py @@ -0,0 +1,42 @@ +from __future__ import unicode_literals +import unittest + +from infi.clickhouse_orm.database import Database +from infi.clickhouse_orm.models import Model +from infi.clickhouse_orm.fields import * +from infi.clickhouse_orm.engines import * + + +class DateFieldsTest(unittest.TestCase): + + def setUp(self): + self.database = Database('test-db') + self.database.create_table(ModelWithDate) + + def tearDown(self): + self.database.drop_database() + + def test_ad_hoc_model(self): + self.database.insert([ + ModelWithDate(date_field='2016-08-30', datetime_field='2016-08-30 03:50:00'), + ModelWithDate(date_field='2016-08-31', datetime_field='2016-08-31 01:30:00') + ]) + + # toStartOfHour returns DateTime('Asia/Yekaterinburg') in my case, so I test it here to + query = 'SELECT toStartOfHour(datetime_field) as hour_start, * from $db.modelwithdate ORDER BY date_field' + results = list(self.database.select(query)) + self.assertEquals(len(results), 2) + self.assertEquals(results[0].date_field, datetime.date(2016, 8, 30)) + self.assertEquals(results[0].datetime_field, datetime.datetime(2016, 8, 30, 3, 50, 0, tzinfo=pytz.UTC)) + 
self.assertEquals(results[0].hour_start, datetime.datetime(2016, 8, 30, 3, 0, 0, tzinfo=pytz.UTC)) + self.assertEquals(results[1].date_field, datetime.date(2016, 8, 31)) + self.assertEquals(results[1].datetime_field, datetime.datetime(2016, 8, 31, 1, 30, 0, tzinfo=pytz.UTC)) + self.assertEquals(results[1].hour_start, datetime.datetime(2016, 8, 31, 1, 0, 0, tzinfo=pytz.UTC)) + + +class ModelWithDate(Model): + + date_field = DateField() + datetime_field = DateTimeField() + + engine = MergeTree('date_field', ('date_field',)) diff --git a/tests/test_engines.py b/tests/test_engines.py index 65497ca..85966df 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -1,8 +1,9 @@ from __future__ import unicode_literals import unittest -from infi.clickhouse_orm.database import Database, DatabaseException -from infi.clickhouse_orm.models import Model, MergeModel +from infi.clickhouse_orm.system_models import SystemPart +from infi.clickhouse_orm.database import Database, DatabaseException, ServerError +from infi.clickhouse_orm.models import Model, MergeModel, DistributedModel from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.engines import * @@ -10,7 +11,7 @@ import logging logging.getLogger("requests").setLevel(logging.WARNING) -class EnginesTestCase(unittest.TestCase): +class _EnginesHelperTestCase(unittest.TestCase): def setUp(self): self.database = Database('test-db') @@ -18,6 +19,8 @@ class EnginesTestCase(unittest.TestCase): def tearDown(self): self.database.drop_database() + +class EnginesTestCase(_EnginesHelperTestCase): def _create_and_insert(self, model_class): self.database.create_table(model_class) self.database.insert([ @@ -31,7 +34,7 @@ class EnginesTestCase(unittest.TestCase): def test_merge_tree_with_sampling(self): class TestModel(SampleModel): - engine = MergeTree('date', ('date', 'event_id', 'event_group'), sampling_expr='intHash32(event_id)') + engine = MergeTree('date', ('date', 'event_id', 'event_group', 'intHash32(event_id)'), 
sampling_expr='intHash32(event_id)') self._create_and_insert(TestModel) def test_merge_tree_with_granularity(self): @@ -41,8 +44,18 @@ class EnginesTestCase(unittest.TestCase): def test_replicated_merge_tree(self): engine = MergeTree('date', ('date', 'event_id', 'event_group'), replica_table_path='/clickhouse/tables/{layer}-{shard}/hits', replica_name='{replica}') - expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)" - self.assertEquals(engine.create_table_sql(), expected) + # In ClickHouse 1.1.54310 custom partitioning key was introduced and new syntax is used + if self.database.server_version >= (1, 1, 54310): + expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}') PARTITION BY (toYYYYMM(`date`)) ORDER BY (date, event_id, event_group) SETTINGS index_granularity=8192" + else: + expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)" + self.assertEquals(engine.create_table_sql(self.database), expected) + + def test_replicated_merge_tree_incomplete(self): + with self.assertRaises(AssertionError): + MergeTree('date', ('date', 'event_id', 'event_group'), replica_table_path='/clickhouse/tables/{layer}-{shard}/hits') + with self.assertRaises(AssertionError): + MergeTree('date', ('date', 'event_id', 'event_group'), replica_name='{replica}') def test_collapsing_merge_tree(self): class TestModel(SampleModel): @@ -124,6 +137,34 @@ class EnginesTestCase(unittest.TestCase): 'event_uversion': 2 }, res[1].to_dict(include_readonly=True)) + def test_custom_partitioning(self): + class TestModel(SampleModel): + engine = MergeTree( + order_by=('date', 'event_id', 'event_group'), + partition_key=('toYYYYMM(date)', 'event_group') + ) + + class TestCollapseModel(SampleModel): + sign = Int8Field() + + engine = CollapsingMergeTree( + sign_col='sign', + order_by=('date', 'event_id', 'event_group'), + 
partition_key=('toYYYYMM(date)', 'event_group') + ) + + self._create_and_insert(TestModel) + self._create_and_insert(TestCollapseModel) + + # Result order may be different, lets sort manually + parts = sorted(list(SystemPart.get(self.database)), key=lambda x: x.table) + + self.assertEqual(2, len(parts)) + self.assertEqual('testcollapsemodel', parts[0].table) + self.assertEqual('(201701, 13)', parts[0].partition) + self.assertEqual('testmodel', parts[1].table) + self.assertEqual('(201701, 13)', parts[1].partition) + class SampleModel(Model): @@ -133,3 +174,134 @@ class SampleModel(Model): event_count = UInt16Field() event_version = Int8Field() event_uversion = UInt8Field(materialized='abs(event_version)') + + +class DistributedTestCase(_EnginesHelperTestCase): + def test_without_table_name(self): + engine = Distributed('my_cluster') + + with self.assertRaises(ValueError) as cm: + engine.create_table_sql(self.database) + + exc = cm.exception + self.assertEqual(str(exc), 'Cannot create Distributed engine: specify an underlying table') + + def test_with_table_name(self): + engine = Distributed('my_cluster', 'foo') + sql = engine.create_table_sql(self.database) + self.assertEqual(sql, 'Distributed(`my_cluster`, `test-db`, `foo`)') + + class TestModel(SampleModel): + engine = TinyLog() + + def _create_distributed(self, shard_name, underlying=TestModel): + class TestDistributedModel(DistributedModel, underlying): + engine = Distributed(shard_name, underlying) + + self.database.create_table(underlying) + self.database.create_table(TestDistributedModel) + return TestDistributedModel + + def test_bad_cluster_name(self): + with self.assertRaises(ServerError) as cm: + d_model = self._create_distributed('cluster_name') + self.database.count(d_model) + + exc = cm.exception + self.assertEqual(exc.code, 170) + self.assertEqual(exc.message, "Requested cluster 'cluster_name' not found") + + def test_verbose_engine_two_superclasses(self): + class TestModel2(SampleModel): + engine = 
Log() + + class TestDistributedModel(DistributedModel, self.TestModel, TestModel2): + engine = Distributed('test_shard_localhost', self.TestModel) + + self.database.create_table(self.TestModel) + self.database.create_table(TestDistributedModel) + self.assertEqual(self.database.count(TestDistributedModel), 0) + + def test_minimal_engine(self): + class TestDistributedModel(DistributedModel, self.TestModel): + engine = Distributed('test_shard_localhost') + + self.database.create_table(self.TestModel) + self.database.create_table(TestDistributedModel) + + self.assertEqual(self.database.count(TestDistributedModel), 0) + + def test_minimal_engine_two_superclasses(self): + class TestModel2(SampleModel): + engine = Log() + + class TestDistributedModel(DistributedModel, self.TestModel, TestModel2): + engine = Distributed('test_shard_localhost') + + self.database.create_table(self.TestModel) + with self.assertRaises(TypeError) as cm: + self.database.create_table(TestDistributedModel) + + exc = cm.exception + self.assertEqual(str(exc), 'When defining Distributed engine without the table_name ensure ' + 'that your model has exactly one non-distributed superclass') + + def test_minimal_engine_no_superclasses(self): + class TestDistributedModel(DistributedModel): + engine = Distributed('test_shard_localhost') + + self.database.create_table(self.TestModel) + with self.assertRaises(TypeError) as cm: + self.database.create_table(TestDistributedModel) + + exc = cm.exception + self.assertEqual(str(exc), 'When defining Distributed engine without the table_name ensure ' + 'that your model has a parent model') + + def _test_insert_select(self, local_to_distributed, test_model=TestModel, include_readonly=True): + d_model = self._create_distributed('test_shard_localhost', underlying=test_model) + + if local_to_distributed: + to_insert, to_select = test_model, d_model + else: + to_insert, to_select = d_model, test_model + + self.database.insert([ + to_insert(date='2017-01-01', event_id=1, 
event_group=1, event_count=1, event_version=1), + to_insert(date='2017-01-02', event_id=2, event_group=2, event_count=2, event_version=2) + ]) + # event_uversion is materialized field. So * won't select it and it will be zero + res = self.database.select('SELECT *, event_uversion FROM $table ORDER BY event_id', + model_class=to_select) + res = [row for row in res] + self.assertEqual(2, len(res)) + self.assertDictEqual({ + 'date': datetime.date(2017, 1, 1), + 'event_id': 1, + 'event_group': 1, + 'event_count': 1, + 'event_version': 1, + 'event_uversion': 1 + }, res[0].to_dict(include_readonly=include_readonly)) + self.assertDictEqual({ + 'date': datetime.date(2017, 1, 2), + 'event_id': 2, + 'event_group': 2, + 'event_count': 2, + 'event_version': 2, + 'event_uversion': 2 + }, res[1].to_dict(include_readonly=include_readonly)) + + @unittest.skip("Bad support of materialized fields in Distributed tables " + "https://groups.google.com/forum/#!topic/clickhouse/XEYRRwZrsSc") + def test_insert_distributed_select_local(self): + return self._test_insert_select(local_to_distributed=False) + + def test_insert_local_select_distributed(self): + return self._test_insert_select(local_to_distributed=True) + + def _test_insert_distributed_select_local_no_materialized_fields(self): + class TestModel2(self.TestModel): + event_uversion = UInt8Field(readonly=True) + + return self._test_insert_select(local_to_distributed=False, test_model=TestModel2, include_readonly=False) diff --git a/tests/test_inheritance.py b/tests/test_inheritance.py index f209995..91d975c 100644 --- a/tests/test_inheritance.py +++ b/tests/test_inheritance.py @@ -3,6 +3,7 @@ import unittest import datetime import pytz +from infi.clickhouse_orm.database import Database from infi.clickhouse_orm.models import Model from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.engines import * @@ -11,7 +12,7 @@ from infi.clickhouse_orm.engines import * class InheritanceTestCase(unittest.TestCase): def 
assertFieldNames(self, model_class, names): - self.assertEquals(names, [name for name, field in model_class._fields]) + self.assertEquals(names, list(model_class.fields())) def test_field_inheritance(self): self.assertFieldNames(ParentModel, ['date_field', 'int_field']) @@ -19,9 +20,10 @@ class InheritanceTestCase(unittest.TestCase): self.assertFieldNames(Model2, ['date_field', 'int_field', 'float_field']) def test_create_table_sql(self): - sql1 = ParentModel.create_table_sql('default') - sql2 = Model1.create_table_sql('default') - sql3 = Model2.create_table_sql('default') + default_db = Database('default') + sql1 = ParentModel.create_table_sql(default_db) + sql2 = Model1.create_table_sql(default_db) + sql3 = Model2.create_table_sql(default_db) self.assertNotEqual(sql1, sql2) self.assertNotEqual(sql1, sql3) self.assertNotEqual(sql2, sql3) diff --git a/tests/test_querysets.py b/tests/test_querysets.py index cbbc65d..4176341 100644 --- a/tests/test_querysets.py +++ b/tests/test_querysets.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals, print_function import unittest from infi.clickhouse_orm.database import Database +from infi.clickhouse_orm.query import Q from .base_test_with_data import * import logging from datetime import date, datetime @@ -59,6 +60,15 @@ class QuerySetTestCase(TestCaseWithData): self._test_qs(qs.filter(first_name__iendswith='ia'), 3) # case insensitive self._test_qs(qs.filter(first_name__iendswith=''), 100) # empty suffix + def test_filter_with_q_objects(self): + qs = Person.objects_in(self.database) + self._test_qs(qs.filter(Q(first_name='Ciaran')), 2) + self._test_qs(qs.filter(Q(first_name='Ciaran') | Q(first_name='Chelsea')), 3) + self._test_qs(qs.filter(Q(first_name__in=['Warren', 'Whilemina', 'Whitney']) & Q(height__gte=1.7)), 3) + self._test_qs(qs.filter((Q(first_name__in=['Warren', 'Whilemina', 'Whitney']) & Q(height__gte=1.7) | + (Q(first_name__in=['Victoria', 'Victor', 'Venus']) & Q(height__lt=1.7)))), 4) + 
self._test_qs(qs.filter(Q(first_name='Elton') & ~Q(last_name='Smith')), 1) + def test_filter_unicode_string(self): self.database.insert([ Person(first_name=u'דונלד', last_name=u'דאק') @@ -324,6 +334,20 @@ class AggregateTestCase(TestCaseWithData): print(qs.as_sql()) self.assertEquals(qs.count(), 1) + def test_double_underscore_field(self): + class Mdl(Model): + the__number = Int32Field() + the__next__number = Int32Field() + engine = Memory() + qs = Mdl.objects_in(self.database).filter(the__number=1) + self.assertEquals(qs.conditions_as_sql(), 'the__number = 1') + qs = Mdl.objects_in(self.database).filter(the__number__gt=1) + self.assertEquals(qs.conditions_as_sql(), 'the__number > 1') + qs = Mdl.objects_in(self.database).filter(the__next__number=1) + self.assertEquals(qs.conditions_as_sql(), 'the__next__number = 1') + qs = Mdl.objects_in(self.database).filter(the__next__number__gt=1) + self.assertEquals(qs.conditions_as_sql(), 'the__next__number > 1') + Color = Enum('Color', u'red blue green yellow brown white black') @@ -341,3 +365,5 @@ class SampleModel(Model): class Numbers(Model): number = UInt64Field() + + diff --git a/tests/test_readonly.py b/tests/test_readonly.py index facbaa0..73c7d26 100644 --- a/tests/test_readonly.py +++ b/tests/test_readonly.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- from __future__ import unicode_literals -from infi.clickhouse_orm.database import DatabaseException +from infi.clickhouse_orm.database import DatabaseException, ServerError from .base_test_with_data import * @@ -12,22 +12,34 @@ class ReadonlyTestCase(TestCaseWithData): orig_database = self.database try: self.database = Database(orig_database.db_name, username=username, readonly=True) - with self.assertRaises(DatabaseException): + with self.assertRaises(ServerError) as cm: self._insert_and_check(self._sample_data(), len(data)) + self._check_db_readonly_err(cm.exception) + self.assertEquals(self.database.count(Person), 100) list(self.database.select('SELECT * from $table', 
Person)) - with self.assertRaises(DatabaseException): + with self.assertRaises(ServerError) as cm: self.database.drop_table(Person) - with self.assertRaises(DatabaseException): + self._check_db_readonly_err(cm.exception, drop_table=True) + + with self.assertRaises(ServerError) as cm: self.database.drop_database() - except DatabaseException as e: - if 'Unknown user' in six.text_type(e): + self._check_db_readonly_err(cm.exception, drop_table=True) + except ServerError as e: + if e.code == 192 and e.message.startswith('Unknown user'): raise unittest.SkipTest('Database user "%s" is not defined' % username) else: raise finally: self.database = orig_database + def _check_db_readonly_err(self, exc, drop_table=None): + self.assertEqual(exc.code, 164) + if drop_table: + self.assertEqual(exc.message, 'Cannot drop table in readonly mode') + else: + self.assertEqual(exc.message, 'Cannot insert into table in readonly mode') + def test_readonly_db_with_default_user(self): self._test_readonly_db('default') @@ -36,6 +48,7 @@ class ReadonlyTestCase(TestCaseWithData): def test_insert_readonly(self): m = ReadOnlyModel(name='readonly') + self.database.create_table(ReadOnlyModel) with self.assertRaises(DatabaseException): self.database.insert([m]) @@ -47,7 +60,7 @@ class ReadonlyTestCase(TestCaseWithData): class ReadOnlyModel(Model): - readonly = True + _readonly = True name = StringField() date = DateField() diff --git a/tests/test_system_models.py b/tests/test_system_models.py index 54b6650..b9576ac 100644 --- a/tests/test_system_models.py +++ b/tests/test_system_models.py @@ -1,7 +1,10 @@ from __future__ import unicode_literals + import unittest from datetime import date + import os + from infi.clickhouse_orm.database import Database, DatabaseException from infi.clickhouse_orm.engines import * from infi.clickhouse_orm.fields import * @@ -37,7 +40,9 @@ class SystemPartTest(unittest.TestCase): def setUp(self): self.database = Database('test-db') self.database.create_table(TestTable) + 
self.database.create_table(CustomPartitionedTable) self.database.insert([TestTable(date_field=date.today())]) + self.database.insert([CustomPartitionedTable(date_field=date.today(), group_field=13)]) def tearDown(self): self.database.drop_database() @@ -51,40 +56,46 @@ class SystemPartTest(unittest.TestCase): def test_get_all(self): parts = SystemPart.get(self.database) - self.assertEqual(len(list(parts)), 1) + self.assertEqual(len(list(parts)), 2) def test_get_active(self): parts = list(SystemPart.get_active(self.database)) - self.assertEqual(len(parts), 1) + self.assertEqual(len(parts), 2) parts[0].detach() - self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) + self.assertEqual(len(list(SystemPart.get_active(self.database))), 1) def test_get_conditions(self): parts = list(SystemPart.get(self.database, conditions="table='testtable'")) self.assertEqual(len(parts), 1) - parts = list(SystemPart.get(self.database, conditions=u"table='othertable'")) + parts = list(SystemPart.get(self.database, conditions=u"table='custompartitionedtable'")) + self.assertEqual(len(parts), 1) + parts = list(SystemPart.get(self.database, conditions=u"table='invalidtable'")) self.assertEqual(len(parts), 0) def test_attach_detach(self): parts = list(SystemPart.get_active(self.database)) - self.assertEqual(len(parts), 1) - parts[0].detach() + self.assertEqual(len(parts), 2) + for p in parts: + p.detach() self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) - parts[0].attach() - self.assertEqual(len(list(SystemPart.get_active(self.database))), 1) + for p in parts: + p.attach() + self.assertEqual(len(list(SystemPart.get_active(self.database))), 2) def test_drop(self): parts = list(SystemPart.get_active(self.database)) - parts[0].drop() + for p in parts: + p.drop() self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) def test_freeze(self): parts = list(SystemPart.get(self.database)) # There can be other backups in the folder prev_backups = 
set(self._get_backups()) - parts[0].freeze() + for p in parts: + p.freeze() backups = set(self._get_backups()) - self.assertEqual(len(backups), len(prev_backups) + 1) + self.assertEqual(len(backups), len(prev_backups) + 2) def test_fetch(self): # TODO Not tested, as I have no replication set @@ -97,5 +108,12 @@ class TestTable(Model): engine = MergeTree('date_field', ('date_field',)) +class CustomPartitionedTable(Model): + date_field = DateField() + group_field = UInt32Field() + + engine = MergeTree(order_by=('date_field', 'group_field'), partition_key=('toYYYYMM(date_field)', 'group_field')) + + class SystemTestModel(Model): - system = True + _system = True diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..81299c8 --- /dev/null +++ b/tox.ini @@ -0,0 +1,12 @@ +[tox] +envlist = py27, py35, pypy + +[testenv] +deps = + nose + flake8 + +commands = + {envpython} -m compileall -q src/ tests/ + # {envbindir}/flake8 src/ tests/ --max-line-length=120 + nosetests -v {posargs}