Finished Release v1.0.0

This commit is contained in:
Itai Shirav 2018-06-11 13:33:49 +03:00
commit dc890dde5e
29 changed files with 1132 additions and 202 deletions

3
.gitignore vendored
View File

@ -59,3 +59,6 @@ src/infi/clickhouse_orm/__version__.py
bootstrap.py
htmldocs/
# tox
.tox/

View File

@ -1,6 +1,24 @@
Change Log
==========
v1.0.0
------
- Add support for compound filters with Q objects (desile)
- Add support for BETWEEN operator (desile)
- Distributed engine support (tsionyx)
- `_fields` and `_writable_fields` are OrderedDicts - note that this might break backwards compatibility (tsionyx)
- Improve error messages returned from the database with the `ServerError` class (tsionyx)
- Added support for custom partitioning (M1hacka)
- Added attribute `server_version` to Database class (M1hacka)
- Changed `Engine.create_table_sql()`, `Engine.drop_table_sql()`, `Model.create_table_sql()`, `Model.drop_table_sql()` parameter to db from db_name (M1hacka)
- Fix parsing of datetime column type when it includes a timezone (M1hacka)
- Rename `Model.system` to `Model._system` to prevent collision with a column that has the same name
- Rename `Model.readonly` to `Model._readonly` to prevent collision with a column that has the same name
- The `field_names` argument to `Model.to_tsv` is now mandatory
- Improve creation time of model instances by keeping a dictionary of default values
- Fix queryset bug when field name contains double underscores (YouCanKeepSilence)
- Prevent exception when determining timezone of old ClickHouse versions (vv-p)
v0.9.8
------
- Bug fix: add field names list explicitly to Database.insert method (anci)

View File

@ -4,6 +4,8 @@ newest = false
download-cache = .cache
develop = .
parts =
relative-paths = true
extensions = buildout.wheel
[project]
name = infi.clickhouse_orm

View File

@ -144,24 +144,31 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
#### Model.create_table_sql(db_name)
#### Model.create_table_sql(db)
Returns the SQL command for creating a table for this model.
#### Model.drop_table_sql(db_name)
#### Model.drop_table_sql(db)
Returns the SQL command for deleting this model's table.
#### Model.from_tsv(line, field_names=None, timezone_in_use=UTC, database=None)
#### Model.fields(writable=False)
Returns an `OrderedDict` of the model's fields (from name to `Field` instance).
If `writable` is true, only writable fields are included.
Callers should not modify the dictionary.
#### Model.from_tsv(line, field_names, timezone_in_use=UTC, database=None)
Create a model instance from a tab-separated line. The line may or may not include a newline.
The `field_names` list must match the fields defined in the model, but does not have to include all of them.
If omitted, it is assumed to be the names of all fields in the model, in order of definition.
- `line`: the TSV-formatted data.
- `field_names`: names of the model fields in the data.
@ -182,6 +189,18 @@ Returns `None` unless the instance was read from the database or written to it.
Gets a `Field` instance given its name, or `None` if not found.
#### Model.is_read_only()
Returns true if the model is marked as read only.
#### Model.is_system_model()
Returns true if the model represents a system table.
#### Model.objects_in(database)
@ -233,24 +252,31 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
#### BufferModel.create_table_sql(db_name)
#### BufferModel.create_table_sql(db)
Returns the SQL command for creating a table for this model.
#### BufferModel.drop_table_sql(db_name)
#### BufferModel.drop_table_sql(db)
Returns the SQL command for deleting this model's table.
#### BufferModel.from_tsv(line, field_names=None, timezone_in_use=UTC, database=None)
#### BufferModel.fields(writable=False)
Returns an `OrderedDict` of the model's fields (from name to `Field` instance).
If `writable` is true, only writable fields are included.
Callers should not modify the dictionary.
#### BufferModel.from_tsv(line, field_names, timezone_in_use=UTC, database=None)
Create a model instance from a tab-separated line. The line may or may not include a newline.
The `field_names` list must match the fields defined in the model, but does not have to include all of them.
If omitted, it is assumed to be the names of all fields in the model, in order of definition.
- `line`: the TSV-formatted data.
- `field_names`: names of the model fields in the data.
@ -271,6 +297,18 @@ Returns `None` unless the instance was read from the database or written to it.
Gets a `Field` instance given its name, or `None` if not found.
#### BufferModel.is_read_only()
Returns true if the model is marked as read only.
#### BufferModel.is_system_model()
Returns true if the model represents a system table.
#### BufferModel.objects_in(database)
@ -309,6 +347,143 @@ Returns the instance's column values as a tab-separated line. A newline is not i
- `include_readonly`: if false, returns only fields that can be inserted into database.
### DistributedModel
Extends Model
Model for Distributed engine
#### DistributedModel(**kwargs)
Creates a model instance, using keyword arguments as field values.
Since values are immediately converted to their Pythonic type,
invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
#### DistributedModel.create_table_sql(db)
#### DistributedModel.drop_table_sql(db)
Returns the SQL command for deleting this model's table.
#### DistributedModel.fields(writable=False)
Returns an `OrderedDict` of the model's fields (from name to `Field` instance).
If `writable` is true, only writable fields are included.
Callers should not modify the dictionary.
#### DistributedModel.fix_engine_table()
Remember: Distributed table does not store any data, just provides distributed access to it.
So if we define a model with engine that has no defined table for data storage
(see FooDistributed below), that table cannot be successfully created.
This routine can automatically fix engine's storage table by finding the first
non-distributed model among your model's superclasses.
>>> class Foo(Model):
... id = UInt8Field(1)
...
>>> class FooDistributed(Foo, DistributedModel):
... engine = Distributed('my_cluster')
...
>>> FooDistributed.engine.table
None
>>> FooDistributed.fix_engine()
>>> FooDistributed.engine.table
<class '__main__.Foo'>
However if you prefer more explicit way of doing things,
you can always mention the Foo model twice without bothering with any fixes:
>>> class FooDistributedVerbose(Foo, DistributedModel):
... engine = Distributed('my_cluster', Foo)
>>> FooDistributedVerbose.engine.table
<class '__main__.Foo'>
See tests.test_engines:DistributedTestCase for more examples
#### DistributedModel.from_tsv(line, field_names, timezone_in_use=UTC, database=None)
Create a model instance from a tab-separated line. The line may or may not include a newline.
The `field_names` list must match the fields defined in the model, but does not have to include all of them.
- `line`: the TSV-formatted data.
- `field_names`: names of the model fields in the data.
- `timezone_in_use`: the timezone to use when parsing dates and datetimes.
- `database`: if given, sets the database that this instance belongs to.
#### get_database()
Gets the `Database` that this model instance belongs to.
Returns `None` unless the instance was read from the database or written to it.
#### get_field(name)
Gets a `Field` instance given its name, or `None` if not found.
#### DistributedModel.is_read_only()
Returns true if the model is marked as read only.
#### DistributedModel.is_system_model()
Returns true if the model represents a system table.
#### DistributedModel.objects_in(database)
Returns a `QuerySet` for selecting instances of this model class.
#### set_database(db)
#### DistributedModel.table_name()
Returns the model's database table name. By default this is the
class name converted to lowercase. Override this if you want to use
a different table name.
#### to_dict(include_readonly=True, field_names=None)
Returns the instance's column values as a dict.
- `include_readonly`: if false, returns only fields that can be inserted into database.
- `field_names`: an iterable of field names to return (optional)
#### to_tsv(include_readonly=True)
Returns the instance's column values as a tab-separated line. A newline is not included.
- `include_readonly`: if false, returns only fields that can be inserted into database.
infi.clickhouse_orm.fields
--------------------------
@ -497,7 +672,7 @@ Extends Engine
Extends Engine
#### MergeTree(date_col, key_cols, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None)
#### MergeTree(date_col=None, order_by=(), sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None)
### Buffer
@ -507,7 +682,7 @@ Extends Engine
Buffers the data to write in RAM, periodically flushing it to another table.
Must be used in conjuction with a `BufferModel`.
Read more [here](https://clickhouse.yandex/reference_en.html#Buffer).
Read more [here](https://clickhouse.yandex/docs/en/table_engines/buffer/).
#### Buffer(main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000)
@ -525,25 +700,49 @@ https://clickhouse.yandex/docs/en/single/index.html#document-table_engines/merge
#### Merge(table_regex)
### Distributed
Extends Engine
The Distributed engine by itself does not store data,
but allows distributed query processing on multiple servers.
Reading is automatically parallelized.
During a read, the table indexes on remote servers are used, if there are any.
See full documentation here
https://clickhouse.yandex/docs/en/table_engines/distributed.html
#### Distributed(cluster, table=None, sharding_key=None)
:param cluster: what cluster to access data from
:param table: underlying table that actually stores data.
If you are not specifying any table here, ensure that it can be inferred
from your model's superclass (see models.DistributedModel.fix_engine_table)
:param sharding_key: how to distribute data among shards when inserting
straightly into Distributed table, optional
### CollapsingMergeTree
Extends MergeTree
#### CollapsingMergeTree(date_col, key_cols, sign_col, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None)
#### CollapsingMergeTree(date_col=None, order_by=(), sign_col="sign", sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None)
### SummingMergeTree
Extends MergeTree
#### SummingMergeTree(date_col, key_cols, summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None)
#### SummingMergeTree(date_col=None, order_by=(), summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None)
### ReplacingMergeTree
Extends MergeTree
#### ReplacingMergeTree(date_col, key_cols, ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None)
#### ReplacingMergeTree(date_col=None, order_by=(), ver_col=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None)
infi.clickhouse_orm.query
@ -605,16 +804,17 @@ Adds a DISTINCT clause to the query, meaning that any duplicate rows
in the results will be omitted.
#### exclude(**kwargs)
#### exclude(**filter_fields)
Returns a copy of this queryset that excludes all rows matching the conditions.
#### filter(**kwargs)
#### filter(*q, **filter_fields)
Returns a copy of this queryset that includes only rows matching the conditions.
Add q object to query if it specified.
#### only(*field_names)
@ -705,16 +905,17 @@ Adds a DISTINCT clause to the query, meaning that any duplicate rows
in the results will be omitted.
#### exclude(**kwargs)
#### exclude(**filter_fields)
Returns a copy of this queryset that excludes all rows matching the conditions.
#### filter(**kwargs)
#### filter(*q, **filter_fields)
Returns a copy of this queryset that includes only rows matching the conditions.
Add q object to query if it specified.
#### group_by(*args)

View File

@ -30,6 +30,10 @@ To see test coverage information run:
bin/nosetests --with-coverage --cover-package=infi.clickhouse_orm
To test with tox, ensure that the setup.py is present (otherwise run `bin/buildout buildout:develop= setup.py`) and run:
pip install tox
tox
---

View File

@ -1,6 +1,8 @@
Field Types
===========
See: [ClickHouse Documentation](https://clickhouse.yandex/docs/en/data_types/)
Currently the following field types are supported:
| Class | DB Type | Pythonic Type | Comments
@ -85,7 +87,7 @@ Working with materialized and alias fields
ClickHouse provides an opportunity to create MATERIALIZED and ALIAS Fields.
See documentation [here](https://clickhouse.yandex/reference_en.html#Default%20values).
See documentation [here](https://clickhouse.yandex/docs/en/query_language/queries/#default-values).
Both field types can't be inserted into the database directly, so they are ignored when using the `Database.insert()` method. ClickHouse does not return the field values if you use `"SELECT * FROM ..."` - you have to list these field names explicitly in the query.

View File

@ -8,7 +8,7 @@ Database instances connect to a specific ClickHouse database for running queries
Defining Models
---------------
Models are defined in a way reminiscent of Django's ORM:
Models are defined in a way reminiscent of Django's ORM, by subclassing `Model`:
from infi.clickhouse_orm import models, fields, engines
@ -21,9 +21,43 @@ Models are defined in a way reminiscent of Django's ORM:
engine = engines.MergeTree('birthday', ('first_name', 'last_name', 'birthday'))
It is possible to provide a default value for a field, instead of its "natural" default (empty string for string fields, zero for numeric fields etc.). Alternatively it is possible to pass alias or materialized parameters (see below for usage examples). Only one of `default`, `alias` and `materialized` parameters can be provided.
The columns in the database table are represented by model fields. Each field has a type, which matches the type of the corresponding database column. All the supported fields types are listed [here](field_types.md).
For more details see [Field Types](field_types.md) and [Table Engines](table_engines.md).
A model must have an `engine`, which determines how its table is stored on disk (if at all), and what capabilities it has. For more details about table engines see [here](table_engines.md).
### Default values
Each field has a "natural" default value - empty string for string fields, zero for numeric fields etc. To specify a different value use the `default` parameter:
first_name = fields.StringField(default="anonymous")
### Null values
To allow null values in a field, wrap it inside a `NullableField`:
birthday = fields.NullableField(fields.DateField())
In this case, the default value for that fields becomes `null` unless otherwide specified.
### Materialized fields
The value of a materialized field is calculated from other fields in the model. For example:
year_born = fields.Int16Field(materialized="toYear(birthday)")
Materialized fields are read-only, meaning that their values are not sent to the database when inserting records.
It is not possible to specify a default value for a materialized field.
### Alias fields
An alias field is simply a different way to call another field in the model. For example:
date_born = field.DateField(alias="birthday")
Alias fields are read-only, meaning that their values are not sent to the database when inserting records.
It is not possible to specify a default value for an alias field.
### Table Names

View File

@ -26,6 +26,12 @@ It is possible to specify several fields to filter or exclude by:
>>> qs.conditions_as_sql()
u"last_name = 'Smith' AND height > 1.75"
For filters with compound conditions you can use `Q` objects inside `filter` with overloaded operators `&` (AND), `|` (OR) and `~` (NOT):
>>> qs = Person.objects_in(database).filter((Q(first_name='Ciaran', last_name='Carver') | Q(height_lte=1.8)) & ~Q(first_name='David'))
>>> qs.conditions_as_sql()
u"((first_name = 'Ciaran' AND last_name = 'Carver') OR height <= 1.8) AND (NOT (first_name = 'David'))"
There are different operators that can be used, by passing `<fieldname>__<operator>=<value>` (two underscores separate the field name from the operator). In case no operator is given, `eq` is used by default. Below are all the supported operators.
| Operator | Equivalent SQL | Comments |
@ -36,6 +42,7 @@ There are different operators that can be used, by passing `<fieldname>__<operat
| `gte` | `field >= value` | |
| `lt` | `field < value` | |
| `lte` | `field <= value` | |
| `between` | `field BETWEEN value1 AND value2` | |
| `in` | `field IN (values)` | See below |
| `not_in` | `field NOT IN (values)` | See below |
| `contains` | `field LIKE '%value%'` | For string fields only |
@ -49,6 +56,7 @@ There are different operators that can be used, by passing `<fieldname>__<operat
### Using the `in` Operator
The `in` and `not_in` operators expect one of three types of values:
* A list or tuple of simple values
* A string, which is used verbatim as the contents of the parentheses
* Another queryset (subquery)

View File

@ -119,12 +119,12 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
#### Model.create_table_sql(db_name)
#### Model.create_table_sql(db)
Returns the SQL command for creating a table for this model.
#### Model.drop_table_sql(db_name)
#### Model.drop_table_sql(db)
Returns the SQL command for deleting this model's table.
@ -197,12 +197,12 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
#### BufferModel.create_table_sql(db_name)
#### BufferModel.create_table_sql(db)
Returns the SQL command for creating a table for this model.
#### BufferModel.drop_table_sql(db_name)
#### BufferModel.drop_table_sql(db)
Returns the SQL command for deleting this model's table.

View File

@ -1,7 +1,7 @@
System Models
=============
[Clickhouse docs](https://clickhouse.yandex/reference_en.html#System%20tables).
[Clickhouse docs](https://clickhouse.yandex/docs/en/system_tables/).
System models are read only models for implementing part of the system's functionality, and for providing access to information about how the system is working.
@ -14,7 +14,7 @@ Currently the following system models are supported:
Partitions and Parts
--------------------
[ClickHouse docs](https://clickhouse.yandex/reference_en.html#Manipulations%20with%20partitions%20and%20parts).
[ClickHouse docs](https://clickhouse.yandex/docs/en/query_language/queries/#manipulations-with-partitions-and-parts).
A partition in a table is data for a single calendar month. Table "system.parts" contains information about each part.

View File

@ -1,7 +1,7 @@
Table Engines
=============
See: [ClickHouse Documentation](https://clickhouse.yandex/reference_en.html#Table+engines)
See: [ClickHouse Documentation](https://clickhouse.yandex/docs/en/table_engines/)
Each model must have an engine instance, used when creating the table in ClickHouse.
@ -16,6 +16,7 @@ The following engines are supported by the ORM:
- ReplacingMergeTree / ReplicatedReplacingMergeTree
- Buffer
- Merge
- Distributed
Simple Engines
@ -54,6 +55,24 @@ For a `ReplacingMergeTree` you can optionally specify the version column:
engine = engines.ReplacingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), ver_col='Version')
### Custom partitioning
ClickHouse supports [custom partitioning](https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/) expressions since version 1.1.54310
You can use custom partitioning with any `MergeTree` family engine.
To set custom partitioning:
* Instead of specifying the `date_col` (first) constructor parameter, pass a tuple of field names or expressions in the `order_by` (second) constructor parameter.
* Add `partition_key` parameter. It should be a tuple of expressions, by which partitions are built.
Standard monthly partitioning by date column can be specified using the `toYYYYMM(date)` function.
Example:
engine = engines.ReplacingMergeTree(order_by=('OrderID', 'EventDate', 'BannerID'), ver_col='Version',
partition_key=('toYYYYMM(EventDate)', 'BannerID'))
### Data Replication
Any of the above engines can be converted to a replicated engine (e.g. `ReplicatedMergeTree`) by adding two parameters, `replica_table_path` and `replica_name`:
@ -91,7 +110,8 @@ Then you can insert objects into Buffer model and they will be handled by ClickH
Merge Engine
-------------
[ClickHouse docs](https://clickhouse.yandex/docs/en/single/index.html#merge)
[ClickHouse docs](https://clickhouse.yandex/docs/en/table_engines/merge/)
A `Merge` engine is only used in conjunction with a `MergeModel`.
This table does not store data itself, but allows reading from any number of other tables simultaneously. So you can't insert in it.
Engine parameter specifies re2 (similar to PCRE) regular expression, from which data is selected.

View File

@ -5,6 +5,10 @@
* [Models and Databases](models_and_databases.md#models-and-databases)
* [Defining Models](models_and_databases.md#defining-models)
* [Default values](models_and_databases.md#default-values)
* [Null values](models_and_databases.md#null-values)
* [Materialized fields](models_and_databases.md#materialized-fields)
* [Alias fields](models_and_databases.md#alias-fields)
* [Table Names](models_and_databases.md#table-names)
* [Using Models](models_and_databases.md#using-models)
* [Inserting to the Database](models_and_databases.md#inserting-to-the-database)
@ -36,6 +40,7 @@
* [Table Engines](table_engines.md#table-engines)
* [Simple Engines](table_engines.md#simple-engines)
* [Engines in the MergeTree Family](table_engines.md#engines-in-the-mergetree-family)
* [Custom partitioning](table_engines.md#custom-partitioning)
* [Data Replication](table_engines.md#data-replication)
* [Buffer Engine](table_engines.md#buffer-engine)
* [Merge Engine](table_engines.md#merge-engine)
@ -58,6 +63,7 @@
* [infi.clickhouse_orm.models](class_reference.md#infi.clickhouse_orm.models)
* [Model](class_reference.md#model)
* [BufferModel](class_reference.md#buffermodel)
* [DistributedModel](class_reference.md#distributedmodel)
* [infi.clickhouse_orm.fields](class_reference.md#infi.clickhouse_orm.fields)
* [Field](class_reference.md#field)
* [StringField](class_reference.md#stringfield)
@ -89,6 +95,7 @@
* [MergeTree](class_reference.md#mergetree)
* [Buffer](class_reference.md#buffer)
* [Merge](class_reference.md#merge)
* [Distributed](class_reference.md#distributed)
* [CollapsingMergeTree](class_reference.md#collapsingmergetree)
* [SummingMergeTree](class_reference.md#summingmergetree)
* [ReplacingMergeTree](class_reference.md#replacingmergetree)

View File

@ -132,7 +132,7 @@ if __name__ == '__main__':
print '==============='
print
module_doc([database.Database, database.DatabaseException])
module_doc([models.Model, models.BufferModel])
module_doc([models.Model, models.BufferModel, models.DistributedModel])
module_doc([fields.Field] + all_subclasses(fields.Field), False)
module_doc([engines.Engine] + all_subclasses(engines.Engine), False)
module_doc([query.QuerySet, query.AggregateQuerySet])

View File

@ -1,5 +1,6 @@
from __future__ import unicode_literals
import re
import requests
from collections import namedtuple
from .models import ModelBase
@ -24,6 +25,47 @@ class DatabaseException(Exception):
pass
class ServerError(DatabaseException):
"""
Raised when a server returns an error.
"""
def __init__(self, message):
self.code = None
processed = self.get_error_code_msg(message)
if processed:
self.code, self.message = processed
else:
# just skip custom init
# if non-standard message format
self.message = message
super(ServerError, self).__init__(message)
ERROR_PATTERN = re.compile(r'''
Code:\ (?P<code>\d+),
\ e\.displayText\(\)\ =\ (?P<type1>[^ \n]+):\ (?P<msg>.+?),
\ e.what\(\)\ =\ (?P<type2>[^ \n]+)
''', re.VERBOSE | re.DOTALL)
@classmethod
def get_error_code_msg(cls, full_error_message):
"""
Extract the code and message of the exception that clickhouse-server generated.
See the list of error codes here:
https://github.com/yandex/ClickHouse/blob/master/dbms/src/Common/ErrorCodes.cpp
"""
match = cls.ERROR_PATTERN.match(full_error_message)
if match:
# assert match.group('type1') == match.group('type2')
return int(match.group('code')), match.group('msg')
return 0, full_error_message
def __str__(self):
if self.code is not None:
return "{} ({})".format(self.message, self.code)
class Database(object):
'''
Database instances connect to a specific ClickHouse database for running queries,
@ -55,7 +97,9 @@ class Database(object):
elif autocreate:
self.db_exists = False
self.create_database()
self.server_timezone = self._get_server_timezone()
self.server_version = self._get_server_version()
# Versions 1.1.53981 and below don't have timezone function
self.server_timezone = self._get_server_timezone() if self.server_version > (1, 1, 53981) else pytz.utc
def create_database(self):
'''
@ -74,18 +118,19 @@ class Database(object):
'''
Creates a table for the given model class, if it does not exist already.
'''
# TODO check that model has an engine
if model_class.system:
if model_class.is_system_model():
raise DatabaseException("You can't create system table")
self._send(model_class.create_table_sql(self.db_name))
if getattr(model_class, 'engine') is None:
raise DatabaseException("%s class must define an engine" % model_class.__name__)
self._send(model_class.create_table_sql(self))
def drop_table(self, model_class):
'''
Drops the database table of the given model class, if it exists.
'''
if model_class.system:
if model_class.is_system_model():
raise DatabaseException("You can't drop system table")
self._send(model_class.drop_table_sql(self.db_name))
self._send(model_class.drop_table_sql(self))
def insert(self, model_instances, batch_size=1000):
'''
@ -103,11 +148,11 @@ class Database(object):
return # model_instances is empty
model_class = first_instance.__class__
if first_instance.readonly or first_instance.system:
if first_instance.is_read_only() or first_instance.is_system_model():
raise DatabaseException("You can't insert into read only and system tables")
fields_list = ','.join(
['`%s`' % name for name, _ in first_instance._writable_fields])
['`%s`' % name for name in first_instance.fields(writable=True)])
def gen():
buf = BytesIO()
@ -250,7 +295,7 @@ class Database(object):
params = self._build_params(settings)
r = requests.post(self.db_url, params=params, data=data, stream=stream)
if r.status_code != 200:
raise DatabaseException(r.text)
raise ServerError(r.text)
return r
def _build_params(self, settings):
@ -281,10 +326,19 @@ class Database(object):
try:
r = self._send('SELECT timezone()')
return pytz.timezone(r.text.strip())
except DatabaseException:
logger.exception('Cannot determine server timezone, assuming UTC')
except ServerError as e:
logger.exception('Cannot determine server timezone (%s), assuming UTC', e)
return pytz.utc
def _get_server_version(self, as_tuple=True):
try:
r = self._send('SELECT version();')
ver = r.text
except ServerError as e:
logger.exception('Cannot determine server version (%s), assuming 1.1.0', e)
ver = '1.1.0'
return tuple(int(n) for n in ver.split('.')) if as_tuple else ver
def _is_connection_readonly(self):
r = self._send("SELECT value FROM system.settings WHERE name = 'readonly'")
return r.text.strip() != '0'

View File

@ -1,89 +1,144 @@
from __future__ import unicode_literals
import logging
import six
from .utils import comma_join
logger = logging.getLogger('clickhouse_orm')
class Engine(object):
def create_table_sql(self):
def create_table_sql(self, db):
raise NotImplementedError() # pragma: no cover
class TinyLog(Engine):
def create_table_sql(self):
def create_table_sql(self, db):
return 'TinyLog'
class Log(Engine):
def create_table_sql(self):
def create_table_sql(self, db):
return 'Log'
class Memory(Engine):
def create_table_sql(self):
def create_table_sql(self, db):
return 'Memory'
class MergeTree(Engine):
def __init__(self, date_col, key_cols, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None):
assert type(key_cols) in (list, tuple), 'key_cols must be a list or tuple'
def __init__(self, date_col=None, order_by=(), sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None):
assert type(order_by) in (list, tuple), 'order_by must be a list or tuple'
assert date_col is None or isinstance(date_col, six.string_types), 'date_col must be string if present'
assert partition_key is None or type(partition_key) in (list, tuple),\
'partition_key must be tuple or list if present'
assert (replica_table_path is None) == (replica_name == None), \
'both replica_table_path and replica_name must be specified'
# These values conflict with each other (old and new syntax of table engines.
# So let's control only one of them is given.
assert date_col or partition_key, "You must set either date_col or partition_key"
self.date_col = date_col
self.key_cols = key_cols
self.partition_key = partition_key if partition_key else ('toYYYYMM(`%s`)' % date_col,)
self.order_by = order_by
self.sampling_expr = sampling_expr
self.index_granularity = index_granularity
self.replica_table_path = replica_table_path
self.replica_name = replica_name
# TODO verify that both replica fields are either present or missing
def create_table_sql(self):
# I changed field name for new reality and syntax
@property
def key_cols(self):
logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead')
return self.order_by
@key_cols.setter
def key_cols(self, value):
logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead')
self.order_by = value
def create_table_sql(self, db):
name = self.__class__.__name__
if self.replica_name:
name = 'Replicated' + name
params = self._build_sql_params()
return '%s(%s)' % (name, comma_join(params))
def _build_sql_params(self):
# In ClickHouse 1.1.54310 custom partitioning key was introduced
# https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
# Let's check version and use new syntax if available
if db.server_version >= (1, 1, 54310):
partition_sql = "PARTITION BY %s ORDER BY %s" \
% ('(%s)' % comma_join(self.partition_key), '(%s)' % comma_join(self.order_by))
if self.sampling_expr:
partition_sql += " SAMPLE BY %s" % self.sampling_expr
partition_sql += " SETTINGS index_granularity=%d" % self.index_granularity
elif not self.date_col:
# Can't import it globally due to circular import
from infi.clickhouse_orm.database import DatabaseException
raise DatabaseException("Custom partitioning is not supported before ClickHouse 1.1.54310. "
"Please update your server or use date_col syntax."
"https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/")
else:
partition_sql = ''
params = self._build_sql_params(db)
return '%s(%s) %s' % (name, comma_join(params), partition_sql)
def _build_sql_params(self, db):
params = []
if self.replica_name:
params += ["'%s'" % self.replica_table_path, "'%s'" % self.replica_name]
# In ClickHouse 1.1.54310 custom partitioning key was introduced
# https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
# These parameters are process in create_table_sql directly.
# In previous ClickHouse versions this this syntax does not work.
if db.server_version < (1, 1, 54310):
params.append(self.date_col)
if self.sampling_expr:
params.append(self.sampling_expr)
params.append('(%s)' % comma_join(self.key_cols))
params.append('(%s)' % comma_join(self.order_by))
params.append(str(self.index_granularity))
return params
class CollapsingMergeTree(MergeTree):
def __init__(self, date_col, key_cols, sign_col, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None):
super(CollapsingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
def __init__(self, date_col=None, order_by=(), sign_col='sign', sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None):
super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity,
replica_table_path, replica_name, partition_key)
self.sign_col = sign_col
def _build_sql_params(self):
params = super(CollapsingMergeTree, self)._build_sql_params()
def _build_sql_params(self, db):
params = super(CollapsingMergeTree, self)._build_sql_params(db)
params.append(self.sign_col)
return params
class SummingMergeTree(MergeTree):
def __init__(self, date_col, key_cols, summing_cols=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None):
super(SummingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
def __init__(self, date_col=None, order_by=(), summing_cols=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None):
super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path,
replica_name, partition_key)
assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple'
self.summing_cols = summing_cols
def _build_sql_params(self):
params = super(SummingMergeTree, self)._build_sql_params()
def _build_sql_params(self, db):
params = super(SummingMergeTree, self)._build_sql_params(db)
if self.summing_cols:
params.append('(%s)' % comma_join(self.summing_cols))
return params
@ -91,13 +146,14 @@ class SummingMergeTree(MergeTree):
class ReplacingMergeTree(MergeTree):
def __init__(self, date_col, key_cols, ver_col=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None):
super(ReplacingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
def __init__(self, date_col=None, order_by=(), ver_col=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None):
super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity,
replica_table_path, replica_name, partition_key)
self.ver_col = ver_col
def _build_sql_params(self):
params = super(ReplacingMergeTree, self)._build_sql_params()
def _build_sql_params(self, db):
params = super(ReplacingMergeTree, self)._build_sql_params(db)
if self.ver_col:
params.append(self.ver_col)
return params
@ -107,11 +163,12 @@ class Buffer(Engine):
"""
Buffers the data to write in RAM, periodically flushing it to another table.
Must be used in conjuction with a `BufferModel`.
Read more [here](https://clickhouse.yandex/reference_en.html#Buffer).
Read more [here](https://clickhouse.yandex/docs/en/table_engines/buffer/).
"""
#Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000):
def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000,
min_bytes=10000000, max_bytes=100000000):
self.main_model = main_model
self.num_layers = num_layers
self.min_time = min_time
@ -121,11 +178,11 @@ class Buffer(Engine):
self.min_bytes = min_bytes
self.max_bytes = max_bytes
def create_table_sql(self, db_name):
def create_table_sql(self, db):
# Overriden create_table_sql example:
#sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)'
# sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)'
sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % (
db_name, self.main_model.table_name(), self.num_layers,
db.db_name, self.main_model.table_name(), self.num_layers,
self.min_time, self.max_time, self.min_rows,
self.max_rows, self.min_bytes, self.max_bytes
)
@ -142,16 +199,58 @@ class Merge(Engine):
def __init__(self, table_regex):
assert isinstance(table_regex, six.string_types), "'table_regex' parameter must be string"
self.table_regex = table_regex
# Use current database as default
self.db_name = None
def create_table_sql(self, db):
return "Merge(`%s`, '%s')" % (db.db_name, self.table_regex)
def create_table_sql(self):
db_name = ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()'
return "Merge(%s, '%s')" % (db_name, self.table_regex)
def set_db_name(self, db_name):
assert isinstance(db_name, six.string_types), "'db_name' parameter must be string"
self.db_name = db_name
class Distributed(Engine):
"""
The Distributed engine by itself does not store data,
but allows distributed query processing on multiple servers.
Reading is automatically parallelized.
During a read, the table indexes on remote servers are used, if there are any.
See full documentation here
https://clickhouse.yandex/docs/en/table_engines/distributed.html
"""
def __init__(self, cluster, table=None, sharding_key=None):
"""
:param cluster: what cluster to access data from
:param table: underlying table that actually stores data.
If you are not specifying any table here, ensure that it can be inferred
from your model's superclass (see models.DistributedModel.fix_engine_table)
:param sharding_key: how to distribute data among shards when inserting
straightly into Distributed table, optional
"""
self.cluster = cluster
self.table = table
self.sharding_key = sharding_key
@property
def table_name(self):
# TODO: circular import is bad
from .models import ModelBase
table = self.table
if isinstance(table, ModelBase):
return table.table_name()
return table
def create_table_sql(self, db):
name = self.__class__.__name__
params = self._build_sql_params(db)
return '%s(%s)' % (name, ', '.join(params))
def _build_sql_params(self, db):
if self.table_name is None:
raise ValueError("Cannot create {} engine: specify an underlying table".format(
self.__class__.__name__))
params = ["`%s`" % p for p in [self.cluster, db.db_name, self.table_name]]
if self.sharding_key:
params.append(self.sharding_key)
return params

View File

@ -39,7 +39,7 @@ class Field(object):
data can't be converted. Returns the converted value. Subclasses should override this.
The timezone_in_use parameter should be consulted when parsing datetime fields.
'''
return value
return value # pragma: no cover
def validate(self, value):
'''

View File

@ -6,6 +6,7 @@ from .engines import MergeTree
from .utils import escape
from six.moves import zip
from six import iteritems
import logging
logger = logging.getLogger('migrations')
@ -65,7 +66,7 @@ class AlterTable(Operation):
table_fields = dict(self._get_table_fields(database))
# Identify fields that were deleted from the model
deleted_fields = set(table_fields.keys()) - set(name for name, field in self.model_class._fields)
deleted_fields = set(table_fields.keys()) - set(self.model_class.fields())
for name in deleted_fields:
logger.info(' Drop column %s', name)
self._alter_table(database, 'DROP COLUMN %s' % name)
@ -73,7 +74,7 @@ class AlterTable(Operation):
# Identify fields that were added to the model
prev_name = None
for name, field in self.model_class._fields:
for name, field in iteritems(self.model_class.fields()):
if name not in table_fields:
logger.info(' Add column %s', name)
assert prev_name, 'Cannot add a column to the beginning of the table'
@ -89,7 +90,8 @@ class AlterTable(Operation):
# The order of class attributes can be changed any time, so we can't count on it
# Secondly, MATERIALIZED and ALIAS fields are always at the end of the DESC, so we can't expect them to save
# attribute position. Watch https://github.com/Infinidat/infi.clickhouse_orm/issues/47
model_fields = {name: field.get_sql(with_default_expression=False) for name, field in self.model_class._fields}
model_fields = {name: field.get_sql(with_default_expression=False)
for name, field in iteritems(self.model_class.fields())}
for field_name, field_sql in self._get_table_fields(database):
# All fields must have been created and dropped by this moment
assert field_name in model_fields, 'Model fields and table columns in disagreement'

View File

@ -1,14 +1,15 @@
from __future__ import unicode_literals
import sys
from collections import OrderedDict
from logging import getLogger
from six import with_metaclass, reraise
from six import with_metaclass, reraise, iteritems
import pytz
from .fields import Field, StringField
from .utils import parse_tsv
from .query import QuerySet
from .engines import Merge
from .engines import Merge, Distributed
logger = getLogger('clickhouse_orm')
@ -21,18 +22,28 @@ class ModelBase(type):
ad_hoc_model_cache = {}
def __new__(cls, name, bases, attrs):
new_cls = super(ModelBase, cls).__new__(cls, str(name), bases, attrs)
# Collect fields from parent classes
base_fields = []
base_fields = dict()
for base in bases:
if isinstance(base, ModelBase):
base_fields += base._fields
base_fields.update(base._fields)
fields = base_fields
# Build a list of fields, in the order they were listed in the class
fields = base_fields + [item for item in attrs.items() if isinstance(item[1], Field)]
fields.sort(key=lambda item: item[1].creation_counter)
setattr(new_cls, '_fields', fields)
setattr(new_cls, '_writable_fields', [f for f in fields if not f[1].readonly])
return new_cls
fields.update({n: f for n, f in iteritems(attrs) if isinstance(f, Field)})
fields = sorted(iteritems(fields), key=lambda item: item[1].creation_counter)
# Build a dictionary of default values
defaults = {n: f.to_python(f.default, pytz.UTC) for n, f in fields}
attrs = dict(
attrs,
_fields=OrderedDict(fields),
_writable_fields=OrderedDict([f for f in fields if not f[1].readonly]),
_defaults=defaults
)
return super(ModelBase, cls).__new__(cls, str(name), bases, attrs)
@classmethod
def create_ad_hoc_model(cls, fields, model_name='AdHocModel'):
@ -57,6 +68,10 @@ class ModelBase(type):
# Enums
if db_type.startswith('Enum'):
return orm_fields.BaseEnumField.create_ad_hoc_field(db_type)
# DateTime with timezone
if db_type.startswith('DateTime('):
# Some functions return DateTimeField with timezone in brackets
return orm_fields.DateTimeField()
# Arrays
if db_type.startswith('Array'):
inner_field = cls.create_ad_hoc_field(db_type[6 : -1])
@ -90,10 +105,12 @@ class Model(with_metaclass(ModelBase)):
engine = None
# Insert operations are restricted for read only models
readonly = False
_readonly = False
# Create table, drop table, insert operations are restricted for system models
system = False
_system = False
_database = None
def __init__(self, **kwargs):
'''
@ -103,20 +120,15 @@ class Model(with_metaclass(ModelBase)):
Unrecognized field names will cause an `AttributeError`.
'''
super(Model, self).__init__()
self._database = None
# Assign default values
self.__dict__.update(self._defaults)
# Assign field values from keyword arguments
for name, value in kwargs.items():
for name, value in iteritems(kwargs):
field = self.get_field(name)
if field:
setattr(self, name, value)
else:
raise AttributeError('%s does not have a field called %s' % (self.__class__.__name__, name))
# Assign default values for fields not included in the keyword arguments
for name, field in self._fields:
if name not in kwargs:
setattr(self, name, field.default)
def __setattr__(self, name, value):
'''
@ -155,8 +167,7 @@ class Model(with_metaclass(ModelBase)):
'''
Gets a `Field` instance given its name, or `None` if not found.
'''
field = getattr(self.__class__, name, None)
return field if isinstance(field, Field) else None
return self._fields.get(name)
@classmethod
def table_name(cls):
@ -168,32 +179,31 @@ class Model(with_metaclass(ModelBase)):
return cls.__name__.lower()
@classmethod
def create_table_sql(cls, db_name):
def create_table_sql(cls, db):
'''
Returns the SQL command for creating a table for this model.
'''
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db_name, cls.table_name())]
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())]
cols = []
for name, field in cls._fields:
for name, field in iteritems(cls.fields()):
cols.append(' %s %s' % (name, field.get_sql()))
parts.append(',\n'.join(cols))
parts.append(')')
parts.append('ENGINE = ' + cls.engine.create_table_sql())
parts.append('ENGINE = ' + cls.engine.create_table_sql(db))
return '\n'.join(parts)
@classmethod
def drop_table_sql(cls, db_name):
def drop_table_sql(cls, db):
'''
Returns the SQL command for deleting this model's table.
'''
return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db_name, cls.table_name())
return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db.db_name, cls.table_name())
@classmethod
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
def from_tsv(cls, line, field_names, timezone_in_use=pytz.utc, database=None):
'''
Create a model instance from a tab-separated line. The line may or may not include a newline.
The `field_names` list must match the fields defined in the model, but does not have to include all of them.
If omitted, it is assumed to be the names of all fields in the model, in order of definition.
- `line`: the TSV-formatted data.
- `field_names`: names of the model fields in the data.
@ -201,7 +211,6 @@ class Model(with_metaclass(ModelBase)):
- `database`: if given, sets the database that this instance belongs to.
'''
from six import next
field_names = field_names or [name for name, field in cls._fields]
values = iter(parse_tsv(line))
kwargs = {}
for name in field_names:
@ -221,8 +230,8 @@ class Model(with_metaclass(ModelBase)):
- `include_readonly`: if false, returns only fields that can be inserted into database.
'''
data = self.__dict__
fields = self._fields if include_readonly else self._writable_fields
return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in fields)
fields = self.fields(writable=not include_readonly)
return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in iteritems(fields))
def to_dict(self, include_readonly=True, field_names=None):
'''
@ -231,13 +240,13 @@ class Model(with_metaclass(ModelBase)):
- `include_readonly`: if false, returns only fields that can be inserted into database.
- `field_names`: an iterable of field names to return (optional)
'''
fields = self._fields if include_readonly else self._writable_fields
fields = self.fields(writable=not include_readonly)
if field_names is not None:
fields = [f for f in fields if f[0] in field_names]
fields = [f for f in fields if f in field_names]
data = self.__dict__
return {name: data[name] for name, field in fields}
return {name: data[name] for name in fields}
@classmethod
def objects_in(cls, database):
@ -246,16 +255,41 @@ class Model(with_metaclass(ModelBase)):
'''
return QuerySet(cls, database)
@classmethod
def fields(cls, writable=False):
'''
Returns an `OrderedDict` of the model's fields (from name to `Field` instance).
If `writable` is true, only writable fields are included.
Callers should not modify the dictionary.
'''
# noinspection PyProtectedMember,PyUnresolvedReferences
return cls._writable_fields if writable else cls._fields
@classmethod
def is_read_only(cls):
'''
Returns true if the model is marked as read only.
'''
return cls._readonly
@classmethod
def is_system_model(cls):
'''
Returns true if the model represents a system table.
'''
return cls._system
class BufferModel(Model):
@classmethod
def create_table_sql(cls, db_name):
def create_table_sql(cls, db):
'''
Returns the SQL command for creating a table for this model.
'''
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.engine.main_model.table_name())]
engine_str = cls.engine.create_table_sql(db_name)
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db.db_name, cls.table_name(), db.db_name,
cls.engine.main_model.table_name())]
engine_str = cls.engine.create_table_sql(db)
parts.append(engine_str)
return ' '.join(parts)
@ -271,18 +305,84 @@ class MergeModel(Model):
# Virtual fields can't be inserted into database
_table = StringField(readonly=True)
@classmethod
def create_table_sql(cls, db):
assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance"
return super(MergeModel, cls).create_table_sql(db)
# TODO: base class for models that require specific engine
class DistributedModel(Model):
"""
Model for Distributed engine
"""
def set_database(self, db):
'''
Gets the `Database` that this model instance belongs to.
Returns `None` unless the instance was read from the database or written to it.
'''
assert isinstance(self.engine, Merge), "engine must be engines.Merge instance"
res = super(MergeModel, self).set_database(db)
self.engine.set_db_name(db.db_name)
assert isinstance(self.engine, Distributed), "engine must be engines.Distributed instance"
res = super(DistributedModel, self).set_database(db)
return res
@classmethod
def create_table_sql(cls, db_name):
assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance"
cls.engine.set_db_name(db_name)
return super(MergeModel, cls).create_table_sql(db_name)
def fix_engine_table(cls):
"""
Remember: Distributed table does not store any data, just provides distributed access to it.
So if we define a model with engine that has no defined table for data storage
(see FooDistributed below), that table cannot be successfully created.
This routine can automatically fix engine's storage table by finding the first
non-distributed model among your model's superclasses.
>>> class Foo(Model):
... id = UInt8Field(1)
...
>>> class FooDistributed(Foo, DistributedModel):
... engine = Distributed('my_cluster')
...
>>> FooDistributed.engine.table
None
>>> FooDistributed.fix_engine()
>>> FooDistributed.engine.table
<class '__main__.Foo'>
However if you prefer more explicit way of doing things,
you can always mention the Foo model twice without bothering with any fixes:
>>> class FooDistributedVerbose(Foo, DistributedModel):
... engine = Distributed('my_cluster', Foo)
>>> FooDistributedVerbose.engine.table
<class '__main__.Foo'>
See tests.test_engines:DistributedTestCase for more examples
"""
# apply only when engine has no table defined
if cls.engine.table_name:
return
# find out all the superclasses of the Model that store any data
storage_models = [b for b in cls.__bases__ if issubclass(b, Model)
and not issubclass(b, DistributedModel)]
if not storage_models:
raise TypeError("When defining Distributed engine without the table_name "
"ensure that your model has a parent model")
if len(storage_models) > 1:
raise TypeError("When defining Distributed engine without the table_name "
"ensure that your model has exactly one non-distributed superclass")
# enable correct SQL for engine
cls.engine.table = storage_models[0]
@classmethod
def create_table_sql(cls, db):
assert isinstance(cls.engine, Distributed), "engine must be engines.Distributed instance"
cls.fix_engine_table()
parts = [
'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` AS `{0}`.`{2}`'.format(
db.db_name, cls.table_name(), cls.engine.table_name),
'ENGINE = ' + cls.engine.create_table_sql(db)]
return '\n'.join(parts)

View File

@ -7,7 +7,6 @@ from .utils import comma_join
# TODO
# - and/or between Q objects
# - check that field names are valid
# - operators for arrays: length, has, empty
@ -103,6 +102,29 @@ class NotOperator(Operator):
return 'NOT (%s)' % self._base_operator.to_sql(model_cls, field_name, value)
class BetweenOperator(Operator):
"""
An operator that implements BETWEEN.
Accepts list or tuple of two elements and generates sql condition:
- 'BETWEEN value[0] AND value[1]' if value[0] and value[1] are not None and not empty
Then imitations of BETWEEN, where one of two limits is missing
- '>= value[0]' if value[1] is None or empty
- '<= value[1]' if value[0] is None or empty
"""
def to_sql(self, model_cls, field_name, value):
field = getattr(model_cls, field_name)
value0 = field.to_db_string(
field.to_python(value[0], pytz.utc)) if value[0] is not None or len(str(value[0])) > 0 else None
value1 = field.to_db_string(
field.to_python(value[1], pytz.utc)) if value[1] is not None or len(str(value[1])) > 0 else None
if value0 and value1:
return '%s BETWEEN %s AND %s' % (field_name, value0, value1)
if value0 and not value1:
return ' '.join([field_name, '>=', value0])
if value1 and not value0:
return ' '.join([field_name, '<=', value1])
# Define the set of builtin operators
_operators = {}
@ -116,6 +138,7 @@ register_operator('gt', SimpleOperator('>'))
register_operator('gte', SimpleOperator('>='))
register_operator('lt', SimpleOperator('<'))
register_operator('lte', SimpleOperator('<='))
register_operator('between', BetweenOperator())
register_operator('in', InOperator())
register_operator('not_in', NotOperator(InOperator()))
register_operator('contains', LikeOperator('%{}%'))
@ -134,7 +157,11 @@ class FOV(object):
def __init__(self, field_name, operator, value):
self._field_name = field_name
self._operator = _operators[operator]
self._operator = _operators.get(operator)
if self._operator is None:
# The field name contains __ like my__field
self._field_name = field_name + '__' + operator
self._operator = _operators['eq']
self._value = value
def to_sql(self, model_cls):
@ -143,9 +170,23 @@ class FOV(object):
class Q(object):
def __init__(self, **kwargs):
self._fovs = [self._build_fov(k, v) for k, v in six.iteritems(kwargs)]
AND_MODE = 'AND'
OR_MODE = 'OR'
def __init__(self, **filter_fields):
self._fovs = [self._build_fov(k, v) for k, v in six.iteritems(filter_fields)]
self._l_child = None
self._r_child = None
self._negate = False
self._mode = self.AND_MODE
@classmethod
def _construct_from(cls, l_child, r_child, mode):
q = Q()
q._l_child = l_child
q._r_child = r_child
q._mode = mode
return q
def _build_fov(self, key, value):
if '__' in key:
@ -155,13 +196,24 @@ class Q(object):
return FOV(field_name, operator, value)
def to_sql(self, model_cls):
if not self._fovs:
if self._fovs:
sql = ' {} '.format(self._mode).join(fov.to_sql(model_cls) for fov in self._fovs)
else:
if self._l_child and self._r_child:
sql = '({}) {} ({})'.format(
self._l_child.to_sql(model_cls), self._mode, self._r_child.to_sql(model_cls))
else:
return '1'
sql = ' AND '.join(fov.to_sql(model_cls) for fov in self._fovs)
if self._negate:
sql = 'NOT (%s)' % sql
return sql
def __or__(self, other):
return Q._construct_from(self, other, self.OR_MODE)
def __and__(self, other):
return Q._construct_from(self, other, self.AND_MODE)
def __invert__(self):
q = copy(self)
q._negate = True
@ -286,20 +338,24 @@ class QuerySet(object):
qs._fields = field_names
return qs
def filter(self, **kwargs):
def filter(self, *q, **filter_fields):
"""
Returns a copy of this queryset that includes only rows matching the conditions.
Add q object to query if it specified.
"""
qs = copy(self)
qs._q = list(self._q) + [Q(**kwargs)]
if q:
qs._q = list(self._q) + list(q)
else:
qs._q = list(self._q) + [Q(**filter_fields)]
return qs
def exclude(self, **kwargs):
def exclude(self, **filter_fields):
"""
Returns a copy of this queryset that excludes all rows matching the conditions.
"""
qs = copy(self)
qs._q = list(self._q) + [~Q(**kwargs)]
qs._q = list(self._q) + [~Q(**filter_fields)]
return qs
def paginate(self, page_num=1, page_size=100):

View File

@ -1,6 +1,6 @@
"""
This file contains system readonly models that can be got from database
https://clickhouse.yandex/reference_en.html#System tables
This file contains system readonly models that can be got from the database
https://clickhouse.yandex/docs/en/system_tables/
"""
from __future__ import unicode_literals
from six import string_types
@ -15,7 +15,7 @@ class SystemPart(Model):
"""
Contains information about parts of a table in the MergeTree family.
This model operates only fields, described in the reference. Other fields are ignored.
https://clickhouse.yandex/reference_en.html#system.parts
https://clickhouse.yandex/docs/en/system_tables/system.parts/
"""
OPERATIONS = frozenset({'DETACH', 'DROP', 'ATTACH', 'FREEZE', 'FETCH'})
@ -56,7 +56,7 @@ class SystemPart(Model):
"""
Next methods return SQL for some operations, which can be done with partitions
https://clickhouse.yandex/reference_en.html#Manipulations with partitions and parts
https://clickhouse.yandex/docs/en/query_language/queries/#manipulations-with-partitions-and-parts
"""
def _partition_operation_sql(self, operation, settings=None, from_part=None):
"""
@ -68,7 +68,8 @@ class SystemPart(Model):
"""
operation = operation.upper()
assert operation in self.OPERATIONS, "operation must be in [%s]" % comma_join(self.OPERATIONS)
sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (self._database.db_name, self.table, operation, self.partition)
sql = "ALTER TABLE `%s`.`%s` %s PARTITION %s" % (self._database.db_name, self.table, operation, self.partition)
if from_part is not None:
sql += " FROM %s" % from_part
self._database.raw(sql, settings=settings, stream=False)
@ -126,7 +127,7 @@ class SystemPart(Model):
assert isinstance(conditions, string_types), "conditions must be a string"
if conditions:
conditions += " AND"
field_names = ','.join([f[0] for f in cls._fields])
field_names = ','.join(cls.fields())
return database.select("SELECT %s FROM %s WHERE %s database='%s'" %
(field_names, cls.table_name(), conditions, database.db_name), model_class=cls)

View File

@ -21,8 +21,8 @@ class TestCaseWithData(unittest.TestCase):
self.database.drop_table(Person)
self.database.drop_database()
def _insert_and_check(self, data, count):
self.database.insert(data)
def _insert_and_check(self, data, count, batch_size=1000):
self.database.insert(data, batch_size=batch_size)
self.assertEquals(count, self.database.count(Person))
for instance in data:
self.assertEquals(self.database, instance.get_database())

View File

@ -2,7 +2,7 @@
from __future__ import unicode_literals
import unittest
from infi.clickhouse_orm.database import Database, DatabaseException
from infi.clickhouse_orm.database import ServerError, DatabaseException
from .base_test_with_data import *
@ -20,6 +20,12 @@ class DatabaseTestCase(TestCaseWithData):
def test_insert__empty(self):
self._insert_and_check([], 0)
def test_insert__small_batches(self):
self._insert_and_check(self._sample_data(), len(data), batch_size=10)
def test_insert__medium_batches(self):
self._insert_and_check(self._sample_data(), len(data), batch_size=100)
def test_count(self):
self.database.insert(self._sample_data())
self.assertEquals(self.database.count(Person), 100)
@ -131,14 +137,42 @@ class DatabaseTestCase(TestCaseWithData):
self.assertEqual(results, "Whitney\tDurham\t1977-09-15\t1.72\nWhitney\tScott\t1971-07-04\t1.7\n")
def test_invalid_user(self):
with self.assertRaises(DatabaseException):
with self.assertRaises(ServerError) as cm:
Database(self.database.db_name, username='default', password='wrong')
exc = cm.exception
self.assertEqual(exc.code, 193)
self.assertEqual(exc.message, 'Wrong password for user default')
def test_nonexisting_db(self):
db = Database('db_not_here', autocreate=False)
with self.assertRaises(DatabaseException):
with self.assertRaises(ServerError) as cm:
db.create_table(Person)
exc = cm.exception
self.assertEqual(exc.code, 81)
self.assertEqual(exc.message, "Database db_not_here doesn't exist")
def test_preexisting_db(self):
db = Database(self.database.db_name, autocreate=False)
db.count(Person)
def test_missing_engine(self):
class EnginelessModel(Model):
float_field = Float32Field()
with self.assertRaises(DatabaseException) as cm:
self.database.create_table(EnginelessModel)
self.assertEqual(str(cm.exception), 'EnginelessModel class must define an engine')
def test_potentially_problematic_field_names(self):
class Model1(Model):
system = StringField()
readonly = StringField()
engine = Memory()
instance = Model1(system='s', readonly='r')
self.assertEquals(instance.to_dict(), dict(system='s', readonly='r'))
self.database.create_table(Model1)
self.database.insert([instance])
instance = Model1.objects_in(self.database)[0]
self.assertEquals(instance.to_dict(), dict(system='s', readonly='r'))

View File

@ -0,0 +1,42 @@
from __future__ import unicode_literals
import unittest
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model
from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.engines import *
class DateFieldsTest(unittest.TestCase):
def setUp(self):
self.database = Database('test-db')
self.database.create_table(ModelWithDate)
def tearDown(self):
self.database.drop_database()
def test_ad_hoc_model(self):
self.database.insert([
ModelWithDate(date_field='2016-08-30', datetime_field='2016-08-30 03:50:00'),
ModelWithDate(date_field='2016-08-31', datetime_field='2016-08-31 01:30:00')
])
# toStartOfHour returns DateTime('Asia/Yekaterinburg') in my case, so I test it here to
query = 'SELECT toStartOfHour(datetime_field) as hour_start, * from $db.modelwithdate ORDER BY date_field'
results = list(self.database.select(query))
self.assertEquals(len(results), 2)
self.assertEquals(results[0].date_field, datetime.date(2016, 8, 30))
self.assertEquals(results[0].datetime_field, datetime.datetime(2016, 8, 30, 3, 50, 0, tzinfo=pytz.UTC))
self.assertEquals(results[0].hour_start, datetime.datetime(2016, 8, 30, 3, 0, 0, tzinfo=pytz.UTC))
self.assertEquals(results[1].date_field, datetime.date(2016, 8, 31))
self.assertEquals(results[1].datetime_field, datetime.datetime(2016, 8, 31, 1, 30, 0, tzinfo=pytz.UTC))
self.assertEquals(results[1].hour_start, datetime.datetime(2016, 8, 31, 1, 0, 0, tzinfo=pytz.UTC))
class ModelWithDate(Model):
date_field = DateField()
datetime_field = DateTimeField()
engine = MergeTree('date_field', ('date_field',))

View File

@ -1,8 +1,9 @@
from __future__ import unicode_literals
import unittest
from infi.clickhouse_orm.database import Database, DatabaseException
from infi.clickhouse_orm.models import Model, MergeModel
from infi.clickhouse_orm.system_models import SystemPart
from infi.clickhouse_orm.database import Database, DatabaseException, ServerError
from infi.clickhouse_orm.models import Model, MergeModel, DistributedModel
from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.engines import *
@ -10,7 +11,7 @@ import logging
logging.getLogger("requests").setLevel(logging.WARNING)
class EnginesTestCase(unittest.TestCase):
class _EnginesHelperTestCase(unittest.TestCase):
def setUp(self):
self.database = Database('test-db')
@ -18,6 +19,8 @@ class EnginesTestCase(unittest.TestCase):
def tearDown(self):
self.database.drop_database()
class EnginesTestCase(_EnginesHelperTestCase):
def _create_and_insert(self, model_class):
self.database.create_table(model_class)
self.database.insert([
@ -31,7 +34,7 @@ class EnginesTestCase(unittest.TestCase):
def test_merge_tree_with_sampling(self):
class TestModel(SampleModel):
engine = MergeTree('date', ('date', 'event_id', 'event_group'), sampling_expr='intHash32(event_id)')
engine = MergeTree('date', ('date', 'event_id', 'event_group', 'intHash32(event_id)'), sampling_expr='intHash32(event_id)')
self._create_and_insert(TestModel)
def test_merge_tree_with_granularity(self):
@ -41,8 +44,18 @@ class EnginesTestCase(unittest.TestCase):
def test_replicated_merge_tree(self):
engine = MergeTree('date', ('date', 'event_id', 'event_group'), replica_table_path='/clickhouse/tables/{layer}-{shard}/hits', replica_name='{replica}')
# In ClickHouse 1.1.54310 custom partitioning key was introduced and new syntax is used
if self.database.server_version >= (1, 1, 54310):
expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}') PARTITION BY (toYYYYMM(`date`)) ORDER BY (date, event_id, event_group) SETTINGS index_granularity=8192"
else:
expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)"
self.assertEquals(engine.create_table_sql(), expected)
self.assertEquals(engine.create_table_sql(self.database), expected)
def test_replicated_merge_tree_incomplete(self):
with self.assertRaises(AssertionError):
MergeTree('date', ('date', 'event_id', 'event_group'), replica_table_path='/clickhouse/tables/{layer}-{shard}/hits')
with self.assertRaises(AssertionError):
MergeTree('date', ('date', 'event_id', 'event_group'), replica_name='{replica}')
def test_collapsing_merge_tree(self):
class TestModel(SampleModel):
@ -124,6 +137,34 @@ class EnginesTestCase(unittest.TestCase):
'event_uversion': 2
}, res[1].to_dict(include_readonly=True))
def test_custom_partitioning(self):
class TestModel(SampleModel):
engine = MergeTree(
order_by=('date', 'event_id', 'event_group'),
partition_key=('toYYYYMM(date)', 'event_group')
)
class TestCollapseModel(SampleModel):
sign = Int8Field()
engine = CollapsingMergeTree(
sign_col='sign',
order_by=('date', 'event_id', 'event_group'),
partition_key=('toYYYYMM(date)', 'event_group')
)
self._create_and_insert(TestModel)
self._create_and_insert(TestCollapseModel)
# Result order may be different, lets sort manually
parts = sorted(list(SystemPart.get(self.database)), key=lambda x: x.table)
self.assertEqual(2, len(parts))
self.assertEqual('testcollapsemodel', parts[0].table)
self.assertEqual('(201701, 13)', parts[0].partition)
self.assertEqual('testmodel', parts[1].table)
self.assertEqual('(201701, 13)', parts[1].partition)
class SampleModel(Model):
@ -133,3 +174,134 @@ class SampleModel(Model):
event_count = UInt16Field()
event_version = Int8Field()
event_uversion = UInt8Field(materialized='abs(event_version)')
class DistributedTestCase(_EnginesHelperTestCase):
def test_without_table_name(self):
engine = Distributed('my_cluster')
with self.assertRaises(ValueError) as cm:
engine.create_table_sql(self.database)
exc = cm.exception
self.assertEqual(str(exc), 'Cannot create Distributed engine: specify an underlying table')
def test_with_table_name(self):
engine = Distributed('my_cluster', 'foo')
sql = engine.create_table_sql(self.database)
self.assertEqual(sql, 'Distributed(`my_cluster`, `test-db`, `foo`)')
class TestModel(SampleModel):
engine = TinyLog()
def _create_distributed(self, shard_name, underlying=TestModel):
class TestDistributedModel(DistributedModel, underlying):
engine = Distributed(shard_name, underlying)
self.database.create_table(underlying)
self.database.create_table(TestDistributedModel)
return TestDistributedModel
def test_bad_cluster_name(self):
with self.assertRaises(ServerError) as cm:
d_model = self._create_distributed('cluster_name')
self.database.count(d_model)
exc = cm.exception
self.assertEqual(exc.code, 170)
self.assertEqual(exc.message, "Requested cluster 'cluster_name' not found")
def test_verbose_engine_two_superclasses(self):
class TestModel2(SampleModel):
engine = Log()
class TestDistributedModel(DistributedModel, self.TestModel, TestModel2):
engine = Distributed('test_shard_localhost', self.TestModel)
self.database.create_table(self.TestModel)
self.database.create_table(TestDistributedModel)
self.assertEqual(self.database.count(TestDistributedModel), 0)
def test_minimal_engine(self):
class TestDistributedModel(DistributedModel, self.TestModel):
engine = Distributed('test_shard_localhost')
self.database.create_table(self.TestModel)
self.database.create_table(TestDistributedModel)
self.assertEqual(self.database.count(TestDistributedModel), 0)
def test_minimal_engine_two_superclasses(self):
class TestModel2(SampleModel):
engine = Log()
class TestDistributedModel(DistributedModel, self.TestModel, TestModel2):
engine = Distributed('test_shard_localhost')
self.database.create_table(self.TestModel)
with self.assertRaises(TypeError) as cm:
self.database.create_table(TestDistributedModel)
exc = cm.exception
self.assertEqual(str(exc), 'When defining Distributed engine without the table_name ensure '
'that your model has exactly one non-distributed superclass')
def test_minimal_engine_no_superclasses(self):
class TestDistributedModel(DistributedModel):
engine = Distributed('test_shard_localhost')
self.database.create_table(self.TestModel)
with self.assertRaises(TypeError) as cm:
self.database.create_table(TestDistributedModel)
exc = cm.exception
self.assertEqual(str(exc), 'When defining Distributed engine without the table_name ensure '
'that your model has a parent model')
def _test_insert_select(self, local_to_distributed, test_model=TestModel, include_readonly=True):
d_model = self._create_distributed('test_shard_localhost', underlying=test_model)
if local_to_distributed:
to_insert, to_select = test_model, d_model
else:
to_insert, to_select = d_model, test_model
self.database.insert([
to_insert(date='2017-01-01', event_id=1, event_group=1, event_count=1, event_version=1),
to_insert(date='2017-01-02', event_id=2, event_group=2, event_count=2, event_version=2)
])
# event_uversion is materialized field. So * won't select it and it will be zero
res = self.database.select('SELECT *, event_uversion FROM $table ORDER BY event_id',
model_class=to_select)
res = [row for row in res]
self.assertEqual(2, len(res))
self.assertDictEqual({
'date': datetime.date(2017, 1, 1),
'event_id': 1,
'event_group': 1,
'event_count': 1,
'event_version': 1,
'event_uversion': 1
}, res[0].to_dict(include_readonly=include_readonly))
self.assertDictEqual({
'date': datetime.date(2017, 1, 2),
'event_id': 2,
'event_group': 2,
'event_count': 2,
'event_version': 2,
'event_uversion': 2
}, res[1].to_dict(include_readonly=include_readonly))
@unittest.skip("Bad support of materialized fields in Distributed tables "
"https://groups.google.com/forum/#!topic/clickhouse/XEYRRwZrsSc")
def test_insert_distributed_select_local(self):
return self._test_insert_select(local_to_distributed=False)
def test_insert_local_select_distributed(self):
return self._test_insert_select(local_to_distributed=True)
def _test_insert_distributed_select_local_no_materialized_fields(self):
class TestModel2(self.TestModel):
event_uversion = UInt8Field(readonly=True)
return self._test_insert_select(local_to_distributed=False, test_model=TestModel2, include_readonly=False)

View File

@ -3,6 +3,7 @@ import unittest
import datetime
import pytz
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model
from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.engines import *
@ -11,7 +12,7 @@ from infi.clickhouse_orm.engines import *
class InheritanceTestCase(unittest.TestCase):
def assertFieldNames(self, model_class, names):
self.assertEquals(names, [name for name, field in model_class._fields])
self.assertEquals(names, list(model_class.fields()))
def test_field_inheritance(self):
self.assertFieldNames(ParentModel, ['date_field', 'int_field'])
@ -19,9 +20,10 @@ class InheritanceTestCase(unittest.TestCase):
self.assertFieldNames(Model2, ['date_field', 'int_field', 'float_field'])
def test_create_table_sql(self):
sql1 = ParentModel.create_table_sql('default')
sql2 = Model1.create_table_sql('default')
sql3 = Model2.create_table_sql('default')
default_db = Database('default')
sql1 = ParentModel.create_table_sql(default_db)
sql2 = Model1.create_table_sql(default_db)
sql3 = Model2.create_table_sql(default_db)
self.assertNotEqual(sql1, sql2)
self.assertNotEqual(sql1, sql3)
self.assertNotEqual(sql2, sql3)

View File

@ -3,6 +3,7 @@ from __future__ import unicode_literals, print_function
import unittest
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.query import Q
from .base_test_with_data import *
import logging
from datetime import date, datetime
@ -59,6 +60,15 @@ class QuerySetTestCase(TestCaseWithData):
self._test_qs(qs.filter(first_name__iendswith='ia'), 3) # case insensitive
self._test_qs(qs.filter(first_name__iendswith=''), 100) # empty suffix
def test_filter_with_q_objects(self):
qs = Person.objects_in(self.database)
self._test_qs(qs.filter(Q(first_name='Ciaran')), 2)
self._test_qs(qs.filter(Q(first_name='Ciaran') | Q(first_name='Chelsea')), 3)
self._test_qs(qs.filter(Q(first_name__in=['Warren', 'Whilemina', 'Whitney']) & Q(height__gte=1.7)), 3)
self._test_qs(qs.filter((Q(first_name__in=['Warren', 'Whilemina', 'Whitney']) & Q(height__gte=1.7) |
(Q(first_name__in=['Victoria', 'Victor', 'Venus']) & Q(height__lt=1.7)))), 4)
self._test_qs(qs.filter(Q(first_name='Elton') & ~Q(last_name='Smith')), 1)
def test_filter_unicode_string(self):
self.database.insert([
Person(first_name=u'דונלד', last_name=u'דאק')
@ -324,6 +334,20 @@ class AggregateTestCase(TestCaseWithData):
print(qs.as_sql())
self.assertEquals(qs.count(), 1)
def test_double_underscore_field(self):
class Mdl(Model):
the__number = Int32Field()
the__next__number = Int32Field()
engine = Memory()
qs = Mdl.objects_in(self.database).filter(the__number=1)
self.assertEquals(qs.conditions_as_sql(), 'the__number = 1')
qs = Mdl.objects_in(self.database).filter(the__number__gt=1)
self.assertEquals(qs.conditions_as_sql(), 'the__number > 1')
qs = Mdl.objects_in(self.database).filter(the__next__number=1)
self.assertEquals(qs.conditions_as_sql(), 'the__next__number = 1')
qs = Mdl.objects_in(self.database).filter(the__next__number__gt=1)
self.assertEquals(qs.conditions_as_sql(), 'the__next__number > 1')
Color = Enum('Color', u'red blue green yellow brown white black')
@ -341,3 +365,5 @@ class SampleModel(Model):
class Numbers(Model):
number = UInt64Field()

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from infi.clickhouse_orm.database import DatabaseException
from infi.clickhouse_orm.database import DatabaseException, ServerError
from .base_test_with_data import *
@ -12,22 +12,34 @@ class ReadonlyTestCase(TestCaseWithData):
orig_database = self.database
try:
self.database = Database(orig_database.db_name, username=username, readonly=True)
with self.assertRaises(DatabaseException):
with self.assertRaises(ServerError) as cm:
self._insert_and_check(self._sample_data(), len(data))
self._check_db_readonly_err(cm.exception)
self.assertEquals(self.database.count(Person), 100)
list(self.database.select('SELECT * from $table', Person))
with self.assertRaises(DatabaseException):
with self.assertRaises(ServerError) as cm:
self.database.drop_table(Person)
with self.assertRaises(DatabaseException):
self._check_db_readonly_err(cm.exception, drop_table=True)
with self.assertRaises(ServerError) as cm:
self.database.drop_database()
except DatabaseException as e:
if 'Unknown user' in six.text_type(e):
self._check_db_readonly_err(cm.exception, drop_table=True)
except ServerError as e:
if e.code == 192 and e.message.startswith('Unknown user'):
raise unittest.SkipTest('Database user "%s" is not defined' % username)
else:
raise
finally:
self.database = orig_database
def _check_db_readonly_err(self, exc, drop_table=None):
self.assertEqual(exc.code, 164)
if drop_table:
self.assertEqual(exc.message, 'Cannot drop table in readonly mode')
else:
self.assertEqual(exc.message, 'Cannot insert into table in readonly mode')
def test_readonly_db_with_default_user(self):
self._test_readonly_db('default')
@ -36,6 +48,7 @@ class ReadonlyTestCase(TestCaseWithData):
def test_insert_readonly(self):
m = ReadOnlyModel(name='readonly')
self.database.create_table(ReadOnlyModel)
with self.assertRaises(DatabaseException):
self.database.insert([m])
@ -47,7 +60,7 @@ class ReadonlyTestCase(TestCaseWithData):
class ReadOnlyModel(Model):
readonly = True
_readonly = True
name = StringField()
date = DateField()

View File

@ -1,7 +1,10 @@
from __future__ import unicode_literals
import unittest
from datetime import date
import os
from infi.clickhouse_orm.database import Database, DatabaseException
from infi.clickhouse_orm.engines import *
from infi.clickhouse_orm.fields import *
@ -37,7 +40,9 @@ class SystemPartTest(unittest.TestCase):
def setUp(self):
self.database = Database('test-db')
self.database.create_table(TestTable)
self.database.create_table(CustomPartitionedTable)
self.database.insert([TestTable(date_field=date.today())])
self.database.insert([CustomPartitionedTable(date_field=date.today(), group_field=13)])
def tearDown(self):
self.database.drop_database()
@ -51,40 +56,46 @@ class SystemPartTest(unittest.TestCase):
def test_get_all(self):
parts = SystemPart.get(self.database)
self.assertEqual(len(list(parts)), 1)
self.assertEqual(len(list(parts)), 2)
def test_get_active(self):
parts = list(SystemPart.get_active(self.database))
self.assertEqual(len(parts), 1)
self.assertEqual(len(parts), 2)
parts[0].detach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
self.assertEqual(len(list(SystemPart.get_active(self.database))), 1)
def test_get_conditions(self):
parts = list(SystemPart.get(self.database, conditions="table='testtable'"))
self.assertEqual(len(parts), 1)
parts = list(SystemPart.get(self.database, conditions=u"table='othertable'"))
parts = list(SystemPart.get(self.database, conditions=u"table='custompartitionedtable'"))
self.assertEqual(len(parts), 1)
parts = list(SystemPart.get(self.database, conditions=u"table='invalidtable'"))
self.assertEqual(len(parts), 0)
def test_attach_detach(self):
parts = list(SystemPart.get_active(self.database))
self.assertEqual(len(parts), 1)
parts[0].detach()
self.assertEqual(len(parts), 2)
for p in parts:
p.detach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
parts[0].attach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 1)
for p in parts:
p.attach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 2)
def test_drop(self):
parts = list(SystemPart.get_active(self.database))
parts[0].drop()
for p in parts:
p.drop()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
def test_freeze(self):
parts = list(SystemPart.get(self.database))
# There can be other backups in the folder
prev_backups = set(self._get_backups())
parts[0].freeze()
for p in parts:
p.freeze()
backups = set(self._get_backups())
self.assertEqual(len(backups), len(prev_backups) + 1)
self.assertEqual(len(backups), len(prev_backups) + 2)
def test_fetch(self):
# TODO Not tested, as I have no replication set
@ -97,5 +108,12 @@ class TestTable(Model):
engine = MergeTree('date_field', ('date_field',))
class CustomPartitionedTable(Model):
date_field = DateField()
group_field = UInt32Field()
engine = MergeTree(order_by=('date_field', 'group_field'), partition_key=('toYYYYMM(date_field)', 'group_field'))
class SystemTestModel(Model):
system = True
_system = True

12
tox.ini Normal file
View File

@ -0,0 +1,12 @@
[tox]
envlist = py27, py35, pypy
[testenv]
deps =
nose
flake8
commands =
{envpython} -m compileall -q src/ tests/
# {envbindir}/flake8 src/ tests/ --max-line-length=120
nosetests -v {posargs}