Support for data skipping indexes

Itai Shirav 2020-06-06 20:56:32 +03:00
parent 22cd908a49
commit 635197de38
13 changed files with 446 additions and 135 deletions

View File

@ -4,6 +4,7 @@ Change Log
Unreleased Unreleased
---------- ----------
- Support for model constraints - Support for model constraints
- Support for data skipping indexes
v2.0.1 v2.0.1
------ ------

View File

@ -731,16 +731,84 @@ Defines a model constraint.
#### Constraint(expr) #### Constraint(expr)
Initializer. Requires an expression that ClickHouse will verify when inserting data. Initializer. Expects an expression that ClickHouse will verify when inserting data.
#### create_table_sql() #### create_table_sql()
Returns the SQL statement for defining this constraint on table creation. Returns the SQL statement for defining this constraint during table creation.
#### str() ### Index
Defines a data-skipping index.
#### Index(expr, type, granularity)
Initializer.
- `expr` - a column, expression, or tuple of columns and expressions to index.
- `type` - the index type. Use one of the following methods to specify the type:
`Index.minmax`, `Index.set`, `Index.ngrambf_v1`, `Index.tokenbf_v1` or `Index.bloom_filter`.
- `granularity` - index block size (number of multiples of the `index_granularity` defined by the engine).
#### bloom_filter(false_positive=0.025)
An index that stores a Bloom filter containing values of the index expression.
- `false_positive` - the probability (between 0 and 1) of receiving a false positive
response from the filter.
#### create_table_sql()
Returns the SQL statement for defining this index during table creation.
#### minmax()
An index that stores extremes of the specified expression (if the expression is a tuple, it stores
extremes for each element of the tuple). The stored info is used for skipping blocks of data, like the primary key.
#### ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)
An index that stores a Bloom filter containing all ngrams from a block of data.
Works only with strings. Can be used to optimize `equals`, `like` and `in` expressions.
- `n` - ngram size.
- `size_of_bloom_filter_in_bytes` - Bloom filter size in bytes (you can use large values here,
for example 256 or 512, because it can be compressed well).
- `number_of_hash_functions` - the number of hash functions used in the Bloom filter.
- `random_seed` - the seed for Bloom filter hash functions.
#### set(max_rows)
An index that stores unique values of the specified expression (no more than `max_rows` rows,
or unlimited if `max_rows=0`). Uses the values to check if the WHERE expression is not satisfiable
on a block of data.
#### tokenbf_v1(size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)
An index that stores a Bloom filter containing string tokens. Tokens are sequences
separated by non-alphanumeric characters.
- `size_of_bloom_filter_in_bytes` - Bloom filter size in bytes (you can use large values here,
for example 256 or 512, because it can be compressed well).
- `number_of_hash_functions` - the number of hash functions used in the Bloom filter.
- `random_seed` - the seed for Bloom filter hash functions.
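As a rough illustration of what the index type helpers return, the sketch below re-implements them as plain functions, using the same format strings as the `Index` static methods added in this commit. The function names here (`set_` in particular) are stand-ins for illustration only; in real code you would call `Index.minmax()`, `Index.set(...)` and so on.

```python
# Stand-alone copies of the format strings used by the Index type helpers.
# These are illustrative stand-ins, not the library's own functions.

def minmax():
    return 'minmax'

def set_(max_rows):
    # Named set_ here only to avoid shadowing Python's built-in set
    return 'set(%d)' % max_rows

def ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed):
    return 'ngrambf_v1(%d, %d, %d, %d)' % (
        n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)

def tokenbf_v1(size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed):
    return 'tokenbf_v1(%d, %d, %d)' % (
        size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)

def bloom_filter(false_positive=0.025):
    # %f always renders six decimal places
    return 'bloom_filter(%f)' % false_positive

for type_sql in (minmax(), set_(1000), ngrambf_v1(3, 256, 2, 0),
                 tokenbf_v1(256, 2, 0), bloom_filter()):
    print(type_sql)
```

Each helper only builds the text that follows `TYPE` in the index definition, which is why the `type` argument of `Index` takes the result of a call such as `Index.minmax()` rather than the method itself.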
infi.clickhouse_orm.fields infi.clickhouse_orm.fields

View File

@ -9,7 +9,7 @@ Defining Models
--------------- ---------------
Models are defined in a way reminiscent of Django's ORM, by subclassing `Model`: Models are defined in a way reminiscent of Django's ORM, by subclassing `Model`:
```python
from infi.clickhouse_orm import Model, StringField, DateField, Float32Field, MergeTree from infi.clickhouse_orm import Model, StringField, DateField, Float32Field, MergeTree
class Person(Model): class Person(Model):
@ -20,6 +20,7 @@ Models are defined in a way reminiscent of Django's ORM, by subclassing `Model`:
height = Float32Field() height = Float32Field()
engine = MergeTree('birthday', ('first_name', 'last_name', 'birthday')) engine = MergeTree('birthday', ('first_name', 'last_name', 'birthday'))
```
The columns in the database table are represented by model fields. Each field has a type, which matches the type of the corresponding database column. All the supported fields types are listed [here](field_types.md). The columns in the database table are represented by model fields. Each field has a type, which matches the type of the corresponding database column. All the supported fields types are listed [here](field_types.md).
@ -66,7 +67,7 @@ For additional details see [here](field_options.md).
### Table Names ### Table Names
The table name used for the model is its class name, converted to lowercase. To override the default name, implement the `table_name` method: The table name used for the model is its class name, converted to lowercase. To override the default name, implement the `table_name` method:
```python
class Person(Model): class Person(Model):
... ...
@ -74,24 +75,37 @@ The table name used for the model is its class name, converted to lowercase. To
@classmethod @classmethod
def table_name(cls): def table_name(cls):
return 'people' return 'people'
```
### Model Constraints ### Model Constraints
It is possible to define constraints which ClickHouse verifies when data is inserted. Trying to insert invalid records will raise a `ServerError`. Each constraint has a name and an expression to validate. For example: It is possible to define constraints which ClickHouse verifies when data is inserted. Trying to insert invalid records will raise a `ServerError`. Each constraint has a name and an expression to validate. For example:
```python
from infi.clickhouse_orm import Model, Constraint, F, StringField, DateField, Float32Field, MergeTree
class Person(Model): class Person(Model):
first_name = StringField() ...
last_name = StringField()
birthday = DateField()
height = Float32Field()
# Ensure that the birthday is not a future date # Ensure that the birthday is not a future date
birthday_is_in_the_past = Constraint(birthday <= F.today()) birthday_is_in_the_past = Constraint(birthday <= F.today())
```
engine = MergeTree('birthday', ('first_name', 'last_name', 'birthday')) ### Data Skipping Indexes
Models that use an engine from the `MergeTree` family can define additional indexes over one or more columns or expressions. These indexes are used in SELECT queries for reducing the amount of data to read from the disk by skipping big blocks of data that do not satisfy the query's conditions.
For example:
```python
class Person(Model):
...
# A minmax index that can help find people taller or shorter than some height
height_index = Index(height, type=Index.minmax(), granularity=2)
# A trigram index that can help find substrings inside people names
names_index = Index((F.lower(first_name), F.lower(last_name)),
type=Index.ngrambf_v1(3, 256, 2, 0), granularity=1)
```
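To get a feel for what these two definitions turn into, here is a minimal sketch that formats the `INDEX` clauses using the same format string as `Index.create_table_sql` in this commit. The `index_clause` function is a hypothetical stand-in, and the expression strings are written by hand on the assumption that fields render as backtick-quoted names and `F.lower(...)` renders as `lower(...)`.

```python
def index_clause(name, expr_sql, type_sql, granularity):
    # Same format string as Index.create_table_sql in this commit;
    # expr_sql and type_sql are passed in as ready-made SQL fragments.
    return 'INDEX `%s` %s TYPE %s GRANULARITY %d' % (name, expr_sql, type_sql, granularity)

height_clause = index_clause('height_index', '`height`', 'minmax', 2)
names_clause = index_clause(
    'names_index',
    '(lower(`first_name`), lower(`last_name`))',  # assumed rendering of the tuple
    'ngrambf_v1(3, 256, 2, 0)',
    1,
)

print(height_clause)
print(names_clause)
```

Note that the index name is not passed to `Index(...)`; it is taken from the attribute name on the model class.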
Using Models Using Models

View File

@ -11,6 +11,7 @@
* [Alias fields](models_and_databases.md#alias-fields) * [Alias fields](models_and_databases.md#alias-fields)
* [Table Names](models_and_databases.md#table-names) * [Table Names](models_and_databases.md#table-names)
* [Model Constraints](models_and_databases.md#model-constraints) * [Model Constraints](models_and_databases.md#model-constraints)
* [Data Skipping Indexes](models_and_databases.md#data-skipping-indexes)
* [Using Models](models_and_databases.md#using-models) * [Using Models](models_and_databases.md#using-models)
* [Inserting to the Database](models_and_databases.md#inserting-to-the-database) * [Inserting to the Database](models_and_databases.md#inserting-to-the-database)
* [Reading from the Database](models_and_databases.md#reading-from-the-database) * [Reading from the Database](models_and_databases.md#reading-from-the-database)
@ -85,6 +86,7 @@
* [MergeModel](class_reference.md#mergemodel) * [MergeModel](class_reference.md#mergemodel)
* [DistributedModel](class_reference.md#distributedmodel) * [DistributedModel](class_reference.md#distributedmodel)
* [Constraint](class_reference.md#constraint) * [Constraint](class_reference.md#constraint)
* [Index](class_reference.md#index)
* [infi.clickhouse_orm.fields](class_reference.md#inficlickhouse_ormfields) * [infi.clickhouse_orm.fields](class_reference.md#inficlickhouse_ormfields)
* [ArrayField](class_reference.md#arrayfield) * [ArrayField](class_reference.md#arrayfield)
* [BaseEnumField](class_reference.md#baseenumfield) * [BaseEnumField](class_reference.md#baseenumfield)

View File

@ -132,7 +132,7 @@ if __name__ == '__main__':
print('===============') print('===============')
print() print()
module_doc([database.Database, database.DatabaseException]) module_doc([database.Database, database.DatabaseException])
module_doc([models.Model, models.BufferModel, models.MergeModel, models.DistributedModel, models.Constraint]) module_doc([models.Model, models.BufferModel, models.MergeModel, models.DistributedModel, models.Constraint, models.Index])
module_doc(sorted([fields.Field] + all_subclasses(fields.Field), key=lambda x: x.__name__), False) module_doc(sorted([fields.Field] + all_subclasses(fields.Field), key=lambda x: x.__name__), False)
module_doc([engines.Engine] + all_subclasses(engines.Engine), False) module_doc([engines.Engine] + all_subclasses(engines.Engine), False)
module_doc([query.QuerySet, query.AggregateQuerySet, query.Q]) module_doc([query.QuerySet, query.AggregateQuerySet, query.Q])

View File

@ -1,9 +1,8 @@
from datetime import date, datetime, tzinfo, timedelta
from functools import wraps from functools import wraps
from inspect import signature, Parameter from inspect import signature, Parameter
from types import FunctionType from types import FunctionType
from .utils import is_iterable, comma_join, NO_VALUE from .utils import is_iterable, comma_join, NO_VALUE, arg_to_sql
from .query import Cond, QuerySet from .query import Cond, QuerySet
@ -263,43 +262,9 @@ class F(Cond, FunctionOperatorsMixin, metaclass=FMeta):
else: else:
prefix = self.name prefix = self.name
sep = ', ' sep = ', '
arg_strs = (F._arg_to_sql(arg) for arg in self.args if arg != NO_VALUE) arg_strs = (arg_to_sql(arg) for arg in self.args if arg != NO_VALUE)
return prefix + '(' + sep.join(arg_strs) + ')' return prefix + '(' + sep.join(arg_strs) + ')'
@staticmethod
def _arg_to_sql(arg):
"""
Converts a function argument to SQL string according to its type.
Supports functions, model fields, strings, dates, datetimes, timedeltas, booleans,
None, numbers, timezones, arrays/iterables.
"""
from .fields import Field, StringField, DateTimeField, DateField
if isinstance(arg, F):
return arg.to_sql()
if isinstance(arg, Field):
return "`%s`" % arg
if isinstance(arg, str):
return StringField().to_db_string(arg)
if isinstance(arg, datetime):
return "toDateTime(%s)" % DateTimeField().to_db_string(arg)
if isinstance(arg, date):
return "toDate('%s')" % arg.isoformat()
if isinstance(arg, timedelta):
return "toIntervalSecond(%d)" % int(arg.total_seconds())
if isinstance(arg, bool):
return str(int(arg))
if isinstance(arg, tzinfo):
return StringField().to_db_string(arg.tzname(None))
if arg is None:
return 'NULL'
if isinstance(arg, QuerySet):
return "(%s)" % arg
if isinstance(arg, tuple):
return '(' + comma_join(F._arg_to_sql(x) for x in arg) + ')'
if is_iterable(arg):
return '[' + comma_join(F._arg_to_sql(x) for x in arg) + ']'
return str(arg)
# Arithmetic functions # Arithmetic functions
@staticmethod @staticmethod

View File

@ -26,12 +26,13 @@ class ModelOperation(Operation):
Initializer. Initializer.
''' '''
self.model_class = model_class self.model_class = model_class
self.table_name = model_class.table_name()
def _alter_table(self, database, cmd): def _alter_table(self, database, cmd):
''' '''
Utility for running ALTER TABLE commands. Utility for running ALTER TABLE commands.
''' '''
cmd = "ALTER TABLE $db.`%s` %s" % (self.model_class.table_name(), cmd) cmd = "ALTER TABLE $db.`%s` %s" % (self.table_name, cmd)
logger.debug(cmd) logger.debug(cmd)
database.raw(cmd) database.raw(cmd)
@ -42,7 +43,7 @@ class CreateTable(ModelOperation):
''' '''
def apply(self, database): def apply(self, database):
logger.info(' Create table %s', self.model_class.table_name()) logger.info(' Create table %s', self.table_name)
if issubclass(self.model_class, BufferModel): if issubclass(self.model_class, BufferModel):
database.create_table(self.model_class.engine.main_model) database.create_table(self.model_class.engine.main_model)
database.create_table(self.model_class) database.create_table(self.model_class)
@ -59,11 +60,11 @@ class AlterTable(ModelOperation):
''' '''
def _get_table_fields(self, database): def _get_table_fields(self, database):
query = "DESC `%s`.`%s`" % (database.db_name, self.model_class.table_name()) query = "DESC `%s`.`%s`" % (database.db_name, self.table_name)
return [(row.name, row.type) for row in database.select(query)] return [(row.name, row.type) for row in database.select(query)]
def apply(self, database): def apply(self, database):
logger.info(' Alter table %s', self.model_class.table_name()) logger.info(' Alter table %s', self.table_name)
# Note that MATERIALIZED and ALIAS fields are always at the end of the DESC, # Note that MATERIALIZED and ALIAS fields are always at the end of the DESC,
# ADD COLUMN ... AFTER doesn't affect it # ADD COLUMN ... AFTER doesn't affect it
@ -131,7 +132,7 @@ class DropTable(ModelOperation):
''' '''
def apply(self, database): def apply(self, database):
logger.info(' Drop table %s', self.model_class.table_name()) logger.info(' Drop table %s', self.table_name)
database.drop_table(self.model_class) database.drop_table(self.model_class)
@ -144,7 +145,7 @@ class AlterConstraints(ModelOperation):
''' '''
def apply(self, database): def apply(self, database):
logger.info(' Alter constraints for %s', self.model_class.table_name()) logger.info(' Alter constraints for %s', self.table_name)
existing = self._get_constraint_names(database) existing = self._get_constraint_names(database)
# Go over constraints in the model # Go over constraints in the model
for constraint in self.model_class._constraints.values(): for constraint in self.model_class._constraints.values():
@ -164,8 +165,56 @@ class AlterConstraints(ModelOperation):
Returns a set containing the names of existing constraints in the table. Returns a set containing the names of existing constraints in the table.
''' '''
import re import re
create_table_sql = database.raw('SHOW CREATE TABLE $db.`%s`' % self.model_class.table_name()) table_def = database.raw('SHOW CREATE TABLE $db.`%s`' % self.table_name)
matches = re.findall(r'\sCONSTRAINT\s+`?(.+?)`?\s+CHECK\s', create_table_sql, flags=re.IGNORECASE) matches = re.findall(r'\sCONSTRAINT\s+`?(.+?)`?\s+CHECK\s', table_def)
return set(matches)
class AlterIndexes(ModelOperation):
'''
A migration operation that adds new indexes from the model to the database
table, and drops obsolete ones. Indexes are identified by their names, so
a change in an existing index will not be detected unless its name was changed too.
'''
def __init__(self, model_class, reindex=False):
'''
Initializer.
By default ClickHouse does not build indexes over existing data, only for
new data. Passing `reindex=True` will run `OPTIMIZE TABLE` in order to build
the indexes over the existing data.
'''
super().__init__(model_class)
self.reindex = reindex
def apply(self, database):
logger.info(' Alter indexes for %s', self.table_name)
existing = self._get_index_names(database)
logger.info(existing)
# Go over indexes in the model
for index in self.model_class._indexes.values():
# Check if it's a new index
if index.name not in existing:
logger.info(' Add index %s', index.name)
self._alter_table(database, 'ADD %s' % index.create_table_sql())
else:
existing.remove(index.name)
# Remaining indexes in `existing` are obsolete
for name in existing:
logger.info(' Drop index %s', name)
self._alter_table(database, 'DROP INDEX `%s`' % name)
# Reindex
if self.reindex:
logger.info(' Build indexes on table')
database.raw('OPTIMIZE TABLE $db.`%s` FINAL' % self.table_name)
def _get_index_names(self, database):
'''
Returns a set containing the names of existing indexes in the table.
'''
import re
table_def = database.raw('SHOW CREATE TABLE $db.`%s`' % self.table_name)
matches = re.findall(r'\sINDEX\s+`?(.+?)`?\s+', table_def)
return set(matches) return set(matches)
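The name-based comparison done by `AlterIndexes` can be tried in isolation. The sketch below applies the same regular expression to a hypothetical, shortened `SHOW CREATE TABLE` result and then works out which indexes would be added and which dropped. The table definition and the model index names are made up for illustration.

```python
import re

# Hypothetical, shortened SHOW CREATE TABLE output (one index quoted, one not)
table_def = (
    "CREATE TABLE db.t (`date` Date, `f1` Int32, "
    "INDEX i1 f1 TYPE minmax GRANULARITY 1, "
    "INDEX `i2` f1 TYPE set(1000) GRANULARITY 2) "
    "ENGINE = MergeTree() ORDER BY date"
)

# Same pattern as _get_index_names: optional backticks around the index name
existing = set(re.findall(r'\sINDEX\s+`?(.+?)`?\s+', table_def))

# Index names as they might be declared on the model (assumed for this example)
model_index_names = ['i1', 'i3']

to_add = [n for n in model_index_names if n not in existing]
to_drop = sorted(existing - set(model_index_names))

print('existing:', sorted(existing))
print('add:', to_add)
print('drop:', to_drop)
```

Because only names are compared, changing the expression or type of `i1` while keeping its name would leave both lists empty, which matches the caveat in the class docstring.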

View File

@ -7,7 +7,7 @@ from logging import getLogger
import pytz import pytz
from .fields import Field, StringField from .fields import Field, StringField
from .utils import parse_tsv, NO_VALUE, get_subclass_names from .utils import parse_tsv, NO_VALUE, get_subclass_names, arg_to_sql
from .query import QuerySet from .query import QuerySet
from .funcs import F from .funcs import F
from .engines import Merge, Distributed from .engines import Merge, Distributed
@ -16,7 +16,7 @@ logger = getLogger('clickhouse_orm')
class Constraint(): class Constraint:
''' '''
Defines a model constraint. Defines a model constraint.
''' '''
@ -34,10 +34,89 @@ class Constraint():
''' '''
Returns the SQL statement for defining this constraint during table creation. Returns the SQL statement for defining this constraint during table creation.
''' '''
return 'CONSTRAINT `%s` CHECK %s' % (self.name, self.expr) return 'CONSTRAINT `%s` CHECK %s' % (self.name, arg_to_sql(self.expr))
def str(self):
return self.create_table_sql() class Index:
'''
Defines a data-skipping index.
'''
name = None # this is set by the parent model
parent = None # this is set by the parent model
def __init__(self, expr, type, granularity):
'''
Initializer.
- `expr` - a column, expression, or tuple of columns and expressions to index.
- `type` - the index type. Use one of the following methods to specify the type:
`Index.minmax`, `Index.set`, `Index.ngrambf_v1`, `Index.tokenbf_v1` or `Index.bloom_filter`.
- `granularity` - index block size (number of multiples of the `index_granularity` defined by the engine).
'''
self.expr = expr
self.type = type
self.granularity = granularity
def create_table_sql(self):
'''
Returns the SQL statement for defining this index during table creation.
'''
return 'INDEX `%s` %s TYPE %s GRANULARITY %d' % (self.name, arg_to_sql(self.expr), self.type, self.granularity)
@staticmethod
def minmax():
'''
An index that stores extremes of the specified expression (if the expression is a tuple, it stores
extremes for each element of the tuple). The stored info is used for skipping blocks of data, like the primary key.
'''
return 'minmax'
@staticmethod
def set(max_rows):
'''
An index that stores unique values of the specified expression (no more than max_rows rows,
or unlimited if max_rows=0). Uses the values to check if the WHERE expression is not satisfiable
on a block of data.
'''
return 'set(%d)' % max_rows
@staticmethod
def ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed):
'''
An index that stores a Bloom filter containing all ngrams from a block of data.
Works only with strings. Can be used for optimization of equals, like and in expressions.
- `n` - ngram size
- `size_of_bloom_filter_in_bytes` - Bloom filter size in bytes (you can use large values here,
for example 256 or 512, because it can be compressed well).
- `number_of_hash_functions` - The number of hash functions used in the Bloom filter.
- `random_seed` - The seed for Bloom filter hash functions.
'''
return 'ngrambf_v1(%d, %d, %d, %d)' % (n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)
@staticmethod
def tokenbf_v1(size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed):
'''
An index that stores a Bloom filter containing string tokens. Tokens are sequences
separated by non-alphanumeric characters.
- `size_of_bloom_filter_in_bytes` - Bloom filter size in bytes (you can use large values here,
for example 256 or 512, because it can be compressed well).
- `number_of_hash_functions` - The number of hash functions used in the Bloom filter.
- `random_seed` - The seed for Bloom filter hash functions.
'''
return 'tokenbf_v1(%d, %d, %d)' % (size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)
@staticmethod
def bloom_filter(false_positive=0.025):
'''
An index that stores a Bloom filter containing values of the index expression.
- `false_positive` - the probability (between 0 and 1) of receiving a false positive
response from the filter
'''
return 'bloom_filter(%f)' % false_positive
class ModelBase(type): class ModelBase(type):
@ -48,21 +127,29 @@ class ModelBase(type):
ad_hoc_model_cache = {} ad_hoc_model_cache = {}
def __new__(cls, name, bases, attrs): def __new__(cls, name, bases, attrs):
# Collect fields and constraints from parent classes
fields = dict() # Collect fields, constraints and indexes from parent classes
constraints = dict() fields = {}
constraints = {}
indexes = {}
for base in bases: for base in bases:
if isinstance(base, ModelBase): if isinstance(base, ModelBase):
fields.update(base._fields) fields.update(base._fields)
constraints.update(base._constraints) constraints.update(base._constraints)
indexes.update(base._indexes)
# Build a list of (name, field) tuples, in the order they were listed in the class # Add fields, constraints and indexes from this class
fields.update({n: f for n, f in attrs.items() if isinstance(f, Field)}) for n, obj in attrs.items():
if isinstance(obj, Field):
fields[n] = obj
elif isinstance(obj, Constraint):
constraints[n] = obj
elif isinstance(obj, Index):
indexes[n] = obj
# Convert fields to a list of (name, field) tuples, in the order they were listed in the class
fields = sorted(fields.items(), key=lambda item: item[1].creation_counter) fields = sorted(fields.items(), key=lambda item: item[1].creation_counter)
# Build a list of constraints
constraints.update({n: c for n, c in attrs.items() if isinstance(c, Constraint)})
# Build a dictionary of default values # Build a dictionary of default values
defaults = {} defaults = {}
has_funcs_as_defaults = False has_funcs_as_defaults = False
@ -75,18 +162,20 @@ class ModelBase(type):
else: else:
defaults[n] = f.to_python(f.default, pytz.UTC) defaults[n] = f.to_python(f.default, pytz.UTC)
# Create the model class
attrs = dict( attrs = dict(
attrs, attrs,
_fields=OrderedDict(fields), _fields=OrderedDict(fields),
_constraints=constraints, _constraints=constraints,
_indexes=indexes,
_writable_fields=OrderedDict([f for f in fields if not f[1].readonly]), _writable_fields=OrderedDict([f for f in fields if not f[1].readonly]),
_defaults=defaults, _defaults=defaults,
_has_funcs_as_defaults=has_funcs_as_defaults _has_funcs_as_defaults=has_funcs_as_defaults
) )
model = super(ModelBase, cls).__new__(cls, str(name), bases, attrs) model = super(ModelBase, cls).__new__(cls, str(name), bases, attrs)
# Let each field and constraint know its parent and its own name # Let each field, constraint and index know its parent and its own name
for n, obj in chain(fields, constraints.items()): for n, obj in chain(fields, constraints.items(), indexes.items()):
setattr(obj, 'parent', model) setattr(obj, 'parent', model)
setattr(obj, 'name', n) setattr(obj, 'name', n)
@ -255,24 +344,22 @@ class Model(metaclass=ModelBase):
Returns the SQL statement for creating a table for this model. Returns the SQL statement for creating a table for this model.
''' '''
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())] parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())]
cols = [] # Fields
items = []
for name, field in cls.fields().items(): for name, field in cls.fields().items():
cols.append(' %s %s' % (name, field.get_sql(db=db))) items.append(' %s %s' % (name, field.get_sql(db=db)))
parts.append(',\n'.join(cols)) # Constraints
parts.append(cls._constraints_sql()) for c in cls._constraints.values():
items.append(' %s' % c.create_table_sql())
# Indexes
for i in cls._indexes.values():
items.append(' %s' % i.create_table_sql())
parts.append(',\n'.join(items))
# Engine
parts.append(')') parts.append(')')
parts.append('ENGINE = ' + cls.engine.create_table_sql(db)) parts.append('ENGINE = ' + cls.engine.create_table_sql(db))
return '\n'.join(parts) return '\n'.join(parts)
@classmethod
def _constraints_sql(cls):
'''
Returns this model's contraints as SQL.
'''
if not cls._constraints:
return ''
return ',' + ',\n'.join(c.create_table_sql() for c in cls._constraints.values())
@classmethod @classmethod
def drop_table_sql(cls, db): def drop_table_sql(cls, db):
''' '''
@ -420,7 +507,6 @@ class MergeModel(Model):
if name != '_table': if name != '_table':
cols.append(' %s %s' % (name, field.get_sql(db=db))) cols.append(' %s %s' % (name, field.get_sql(db=db)))
parts.append(',\n'.join(cols)) parts.append(',\n'.join(cols))
parts.append(cls._constraints_sql())
parts.append(')') parts.append(')')
parts.append('ENGINE = ' + cls.engine.create_table_sql(db)) parts.append('ENGINE = ' + cls.engine.create_table_sql(db))
return '\n'.join(parts) return '\n'.join(parts)
@ -510,4 +596,4 @@ class DistributedModel(Model):
# Expose only relevant classes in import * # Expose only relevant classes in import *
__all__ = get_subclass_names(locals(), (Model, Constraint)) __all__ = get_subclass_names(locals(), (Model, Constraint, Index))
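The way `ModelBase` gathers indexes can be sketched without the rest of the library. The example below is a simplified stand-in, not the actual classes: `Index` here is an empty marker holding pre-rendered SQL fragments, and the metaclass only handles indexes (no fields, constraints or defaults).

```python
class Index:
    name = None    # set by the metaclass
    parent = None  # set by the metaclass

    def __init__(self, expr_sql, type_sql, granularity):
        self.expr_sql = expr_sql
        self.type_sql = type_sql
        self.granularity = granularity


class ModelBase(type):
    def __new__(cls, name, bases, attrs):
        indexes = {}
        # Start with indexes inherited from parent model classes
        for base in bases:
            if isinstance(base, ModelBase):
                indexes.update(base._indexes)
        # Add indexes declared directly on this class
        for n, obj in attrs.items():
            if isinstance(obj, Index):
                indexes[n] = obj
        model = super().__new__(cls, name, bases, dict(attrs, _indexes=indexes))
        # Tell each index its attribute name and owning model
        for n, obj in indexes.items():
            obj.parent = model
            obj.name = n
        return model


class Base(metaclass=ModelBase):
    by_height = Index('`height`', 'minmax', 2)


class Child(Base):
    by_name = Index('`last_name`', 'set(100)', 1)


print(sorted(Base._indexes))
print(sorted(Child._indexes))
print(Child._indexes['by_name'].name)
```

In this sketch, as in the loop over `chain(fields, constraints.items(), indexes.items())` in the diff, an inherited index object is shared between classes, so its `parent` ends up pointing at the most recently created subclass.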

View File

@ -1,6 +1,6 @@
from __future__ import unicode_literals
import codecs import codecs
import re import re
from datetime import date, datetime, tzinfo, timedelta
SPECIAL_CHARS = { SPECIAL_CHARS = {
@ -42,6 +42,40 @@ def string_or_func(obj):
return obj.to_sql() if hasattr(obj, 'to_sql') else obj return obj.to_sql() if hasattr(obj, 'to_sql') else obj
def arg_to_sql(arg):
"""
Converts a function argument to SQL string according to its type.
Supports functions, model fields, strings, dates, datetimes, timedeltas, booleans,
None, numbers, timezones, arrays/iterables.
"""
from infi.clickhouse_orm import Field, StringField, DateTimeField, DateField, F, QuerySet
if isinstance(arg, F):
return arg.to_sql()
if isinstance(arg, Field):
return "`%s`" % arg
if isinstance(arg, str):
return StringField().to_db_string(arg)
if isinstance(arg, datetime):
return "toDateTime(%s)" % DateTimeField().to_db_string(arg)
if isinstance(arg, date):
return "toDate('%s')" % arg.isoformat()
if isinstance(arg, timedelta):
return "toIntervalSecond(%d)" % int(arg.total_seconds())
if isinstance(arg, bool):
return str(int(arg))
if isinstance(arg, tzinfo):
return StringField().to_db_string(arg.tzname(None))
if arg is None:
return 'NULL'
if isinstance(arg, QuerySet):
return "(%s)" % arg
if isinstance(arg, tuple):
return '(' + comma_join(arg_to_sql(x) for x in arg) + ')'
if is_iterable(arg):
return '[' + comma_join(arg_to_sql(x) for x in arg) + ']'
return str(arg)
def parse_tsv(line): def parse_tsv(line):
if isinstance(line, bytes): if isinstance(line, bytes):
line = line.decode() line = line.decode()
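The order of the `isinstance` checks in `arg_to_sql` matters, since `datetime` is a subclass of `date` and tuples need different brackets from other iterables. The sketch below is a reduced, self-contained version of that dispatch. It is an approximation: string and datetime formatting are simplified stand-ins for what `StringField` and `DateTimeField` do in the library, and ORM-specific types (fields, functions, querysets, timezones) are left out.

```python
from datetime import date, datetime, timedelta

def to_sql_sketch(arg):
    if isinstance(arg, str):
        # Simplified quoting; the library delegates this to StringField
        return "'" + arg.replace("\\", "\\\\").replace("'", "\\'") + "'"
    # datetime must be tested before date, because datetime subclasses date
    if isinstance(arg, datetime):
        return "toDateTime('%s')" % arg.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(arg, date):
        return "toDate('%s')" % arg.isoformat()
    if isinstance(arg, timedelta):
        return "toIntervalSecond(%d)" % int(arg.total_seconds())
    # bool needs its own branch, otherwise str(True) would give 'True'
    if isinstance(arg, bool):
        return str(int(arg))
    if arg is None:
        return 'NULL'
    # Tuples become SQL tuples, lists become SQL arrays
    if isinstance(arg, tuple):
        return '(' + ', '.join(to_sql_sketch(x) for x in arg) + ')'
    if isinstance(arg, list):
        return '[' + ', '.join(to_sql_sketch(x) for x in arg) + ']'
    return str(arg)

print(to_sql_sketch((1, 'a', None)))
print(to_sql_sketch([date(2020, 6, 6), date(2020, 6, 7)]))
print(to_sql_sketch(datetime(2020, 6, 6, 20, 56, 32)))
print(to_sql_sketch(timedelta(minutes=1)))
```

Moving this logic into `utils` is what lets `Constraint` and `Index` reuse it for their expressions without importing from `funcs`.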

View File

@ -0,0 +1,6 @@
from infi.clickhouse_orm import migrations
from ..test_migrations import *
operations = [
migrations.CreateTable(ModelWithIndex)
]

View File

@ -0,0 +1,6 @@
from infi.clickhouse_orm import migrations
from ..test_migrations import *
operations = [
migrations.AlterIndexes(ModelWithIndex2, reindex=True)
]

tests/test_indexes.py
View File

@ -0,0 +1,32 @@
import unittest
from infi.clickhouse_orm import *
class IndexesTest(unittest.TestCase):
def setUp(self):
self.database = Database('test-db', log_statements=True)
if self.database.server_version < (19, 3, 3):
raise unittest.SkipTest('ClickHouse version too old')
def tearDown(self):
self.database.drop_database()
def test_all_index_types(self):
self.database.create_table(ModelWithIndexes)
class ModelWithIndexes(Model):
date = DateField()
f1 = Int32Field()
f2 = StringField()
i1 = Index(f1, type=Index.minmax(), granularity=1)
i2 = Index(f1, type=Index.set(1000), granularity=2)
i3 = Index(f2, type=Index.ngrambf_v1(3, 256, 2, 0), granularity=1)
i4 = Index(F.lower(f2), type=Index.tokenbf_v1(256, 2, 0), granularity=2)
i5 = Index((F.toQuarter(date), f2), type=Index.bloom_filter(), granularity=3)
engine = MergeTree('date', ('date',))

View File

@ -2,7 +2,7 @@ from __future__ import unicode_literals
import unittest import unittest
from infi.clickhouse_orm.database import Database, ServerError from infi.clickhouse_orm.database import Database, ServerError
from infi.clickhouse_orm.models import Model, BufferModel, Constraint from infi.clickhouse_orm.models import Model, BufferModel, Constraint, Index
from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.engines import * from infi.clickhouse_orm.engines import *
from infi.clickhouse_orm.migrations import MigrationHistory from infi.clickhouse_orm.migrations import MigrationHistory
@ -27,55 +27,58 @@ class MigrationsTestCase(unittest.TestCase):
def tearDown(self): def tearDown(self):
self.database.drop_database() self.database.drop_database()
def tableExists(self, model_class): def table_exists(self, model_class):
query = "EXISTS TABLE $db.`%s`" % model_class.table_name() query = "EXISTS TABLE $db.`%s`" % model_class.table_name()
return next(self.database.select(query)).result == 1 return next(self.database.select(query)).result == 1
def getTableFields(self, model_class): def get_table_fields(self, model_class):
query = "DESC `%s`.`%s`" % (self.database.db_name, model_class.table_name()) query = "DESC `%s`.`%s`" % (self.database.db_name, model_class.table_name())
return [(row.name, row.type) for row in self.database.select(query)] return [(row.name, row.type) for row in self.database.select(query)]
def get_table_def(self, model_class):
return self.database.raw('SHOW CREATE TABLE $db.`%s`' % model_class.table_name())
def test_migrations(self): def test_migrations(self):
# Creation and deletion of table # Creation and deletion of table
self.database.migrate('tests.sample_migrations', 1) self.database.migrate('tests.sample_migrations', 1)
self.assertTrue(self.tableExists(Model1)) self.assertTrue(self.table_exists(Model1))
self.database.migrate('tests.sample_migrations', 2) self.database.migrate('tests.sample_migrations', 2)
self.assertFalse(self.tableExists(Model1)) self.assertFalse(self.table_exists(Model1))
self.database.migrate('tests.sample_migrations', 3) self.database.migrate('tests.sample_migrations', 3)
self.assertTrue(self.tableExists(Model1)) self.assertTrue(self.table_exists(Model1))
# Adding, removing and altering simple fields # Adding, removing and altering simple fields
self.assertEqual(self.getTableFields(Model1), [('date', 'Date'), ('f1', 'Int32'), ('f2', 'String')]) self.assertEqual(self.get_table_fields(Model1), [('date', 'Date'), ('f1', 'Int32'), ('f2', 'String')])
self.database.migrate('tests.sample_migrations', 4) self.database.migrate('tests.sample_migrations', 4)
self.assertEqual(self.getTableFields(Model2), [('date', 'Date'), ('f1', 'Int32'), ('f3', 'Float32'), ('f2', 'String'), ('f4', 'String'), ('f5', 'Array(UInt64)')]) self.assertEqual(self.get_table_fields(Model2), [('date', 'Date'), ('f1', 'Int32'), ('f3', 'Float32'), ('f2', 'String'), ('f4', 'String'), ('f5', 'Array(UInt64)')])
self.database.migrate('tests.sample_migrations', 5) self.database.migrate('tests.sample_migrations', 5)
self.assertEqual(self.getTableFields(Model3), [('date', 'Date'), ('f1', 'Int64'), ('f3', 'Float64'), ('f4', 'String')]) self.assertEqual(self.get_table_fields(Model3), [('date', 'Date'), ('f1', 'Int64'), ('f3', 'Float64'), ('f4', 'String')])
# Altering enum fields # Altering enum fields
self.database.migrate('tests.sample_migrations', 6) self.database.migrate('tests.sample_migrations', 6)
self.assertTrue(self.tableExists(EnumModel1)) self.assertTrue(self.table_exists(EnumModel1))
self.assertEqual(self.getTableFields(EnumModel1), self.assertEqual(self.get_table_fields(EnumModel1),
[('date', 'Date'), ('f1', "Enum8('dog' = 1, 'cat' = 2, 'cow' = 3)")]) [('date', 'Date'), ('f1', "Enum8('dog' = 1, 'cat' = 2, 'cow' = 3)")])
self.database.migrate('tests.sample_migrations', 7) self.database.migrate('tests.sample_migrations', 7)
self.assertTrue(self.tableExists(EnumModel1)) self.assertTrue(self.table_exists(EnumModel1))
self.assertEqual(self.getTableFields(EnumModel2), self.assertEqual(self.get_table_fields(EnumModel2),
[('date', 'Date'), ('f1', "Enum16('dog' = 1, 'cat' = 2, 'horse' = 3, 'pig' = 4)")]) [('date', 'Date'), ('f1', "Enum16('dog' = 1, 'cat' = 2, 'horse' = 3, 'pig' = 4)")])
# Materialized fields and alias fields # Materialized fields and alias fields
self.database.migrate('tests.sample_migrations', 8) self.database.migrate('tests.sample_migrations', 8)
self.assertTrue(self.tableExists(MaterializedModel)) self.assertTrue(self.table_exists(MaterializedModel))
self.assertEqual(self.getTableFields(MaterializedModel), self.assertEqual(self.get_table_fields(MaterializedModel),
[('date_time', "DateTime"), ('date', 'Date')]) [('date_time', "DateTime"), ('date', 'Date')])
self.database.migrate('tests.sample_migrations', 9) self.database.migrate('tests.sample_migrations', 9)
self.assertTrue(self.tableExists(AliasModel)) self.assertTrue(self.table_exists(AliasModel))
self.assertEqual(self.getTableFields(AliasModel), self.assertEqual(self.get_table_fields(AliasModel),
[('date', 'Date'), ('date_alias', "Date")]) [('date', 'Date'), ('date_alias', "Date")])
# Buffer models creation and alteration # Buffer models creation and alteration
self.database.migrate('tests.sample_migrations', 10) self.database.migrate('tests.sample_migrations', 10)
self.assertTrue(self.tableExists(Model4)) self.assertTrue(self.table_exists(Model4))
self.assertTrue(self.tableExists(Model4Buffer)) self.assertTrue(self.table_exists(Model4Buffer))
self.assertEqual(self.getTableFields(Model4), [('date', 'Date'), ('f1', 'Int32'), ('f2', 'String')]) self.assertEqual(self.get_table_fields(Model4), [('date', 'Date'), ('f1', 'Int32'), ('f2', 'String')])
self.assertEqual(self.getTableFields(Model4Buffer), [('date', 'Date'), ('f1', 'Int32'), ('f2', 'String')]) self.assertEqual(self.get_table_fields(Model4Buffer), [('date', 'Date'), ('f1', 'Int32'), ('f2', 'String')])
self.database.migrate('tests.sample_migrations', 11) self.database.migrate('tests.sample_migrations', 11)
self.assertEqual(self.getTableFields(Model4), [('date', 'Date'), ('f3', 'DateTime'), ('f2', 'String')]) self.assertEqual(self.get_table_fields(Model4), [('date', 'Date'), ('f3', 'DateTime'), ('f2', 'String')])
self.assertEqual(self.getTableFields(Model4Buffer), [('date', 'Date'), ('f3', 'DateTime'), ('f2', 'String')]) self.assertEqual(self.get_table_fields(Model4Buffer), [('date', 'Date'), ('f3', 'DateTime'), ('f2', 'String')])
self.database.migrate('tests.sample_migrations', 12) self.database.migrate('tests.sample_migrations', 12)
self.assertEqual(self.database.count(Model3), 3) self.assertEqual(self.database.count(Model3), 3)
@ -88,29 +91,29 @@ class MigrationsTestCase(unittest.TestCase):
self.assertListEqual(data, [1, 2, 3, 4]) self.assertListEqual(data, [1, 2, 3, 4])
self.database.migrate('tests.sample_migrations', 14) self.database.migrate('tests.sample_migrations', 14)
self.assertTrue(self.tableExists(MaterializedModel1)) self.assertTrue(self.table_exists(MaterializedModel1))
self.assertEqual(self.getTableFields(MaterializedModel1), self.assertEqual(self.get_table_fields(MaterializedModel1),
[('date_time', 'DateTime'), ('int_field', 'Int8'), ('date', 'Date'), ('int_field_plus_one', 'Int8')]) [('date_time', 'DateTime'), ('int_field', 'Int8'), ('date', 'Date'), ('int_field_plus_one', 'Int8')])
self.assertTrue(self.tableExists(AliasModel1)) self.assertTrue(self.table_exists(AliasModel1))
self.assertEqual(self.getTableFields(AliasModel1), self.assertEqual(self.get_table_fields(AliasModel1),
[('date', 'Date'), ('int_field', 'Int8'), ('date_alias', 'Date'), ('int_field_plus_one', 'Int8')]) [('date', 'Date'), ('int_field', 'Int8'), ('date_alias', 'Date'), ('int_field_plus_one', 'Int8')])
# Codecs and low cardinality # Codecs and low cardinality
self.database.migrate('tests.sample_migrations', 15) self.database.migrate('tests.sample_migrations', 15)
self.assertTrue(self.tableExists(Model4_compressed)) self.assertTrue(self.table_exists(Model4_compressed))
if self.database.has_low_cardinality_support: if self.database.has_low_cardinality_support:
self.assertEqual(self.getTableFields(Model2LowCardinality), self.assertEqual(self.get_table_fields(Model2LowCardinality),
[('date', 'Date'), ('f1', 'LowCardinality(Int32)'), ('f3', 'LowCardinality(Float32)'), [('date', 'Date'), ('f1', 'LowCardinality(Int32)'), ('f3', 'LowCardinality(Float32)'),
('f2', 'LowCardinality(String)'), ('f4', 'LowCardinality(Nullable(String))'), ('f5', 'Array(LowCardinality(UInt64))')]) ('f2', 'LowCardinality(String)'), ('f4', 'LowCardinality(Nullable(String))'), ('f5', 'Array(LowCardinality(UInt64))')])
else: else:
logging.warning('No support for low cardinality') logging.warning('No support for low cardinality')
self.assertEqual(self.getTableFields(Model2), self.assertEqual(self.get_table_fields(Model2),
[('date', 'Date'), ('f1', 'Int32'), ('f3', 'Float32'), ('f2', 'String'), ('f4', 'Nullable(String)'), [('date', 'Date'), ('f1', 'Int32'), ('f3', 'Float32'), ('f2', 'String'), ('f4', 'Nullable(String)'),
('f5', 'Array(UInt64)')]) ('f5', 'Array(UInt64)')])
if self.database.server_version >= (19, 14, 3, 3): if self.database.server_version >= (19, 14, 3, 3):
# Adding constraints # Creating constraints
self.database.migrate('tests.sample_migrations', 16) self.database.migrate('tests.sample_migrations', 16)
self.assertTrue(self.tableExists(ModelWithConstraints)) self.assertTrue(self.table_exists(ModelWithConstraints))
self.database.insert([ModelWithConstraints(f1=101, f2='a')]) self.database.insert([ModelWithConstraints(f1=101, f2='a')])
with self.assertRaises(ServerError): with self.assertRaises(ServerError):
self.database.insert([ModelWithConstraints(f1=99, f2='a')]) self.database.insert([ModelWithConstraints(f1=99, f2='a')])
@ -124,6 +127,19 @@ class MigrationsTestCase(unittest.TestCase):
with self.assertRaises(ServerError): with self.assertRaises(ServerError):
self.database.insert([ModelWithConstraints(f1=99, f2='x')]) self.database.insert([ModelWithConstraints(f1=99, f2='x')])
if self.database.server_version >= (19, 3, 3):
# Creating indexes
self.database.migrate('tests.sample_migrations', 18)
self.assertTrue(self.table_exists(ModelWithIndex))
self.assertIn('INDEX `index`', self.get_table_def(ModelWithIndex))
self.assertIn('INDEX another_index', self.get_table_def(ModelWithIndex))
# Modifying indexes
self.database.migrate('tests.sample_migrations', 19)
self.assertNotIn('INDEX `index`', self.get_table_def(ModelWithIndex))
self.assertIn('INDEX index2', self.get_table_def(ModelWithIndex))
self.assertIn('INDEX another_index', self.get_table_def(ModelWithIndex))
# Several different models with the same table name, to simulate a table that changes over time # Several different models with the same table name, to simulate a table that changes over time
class Model1(Model): class Model1(Model):
@ -344,3 +360,35 @@ class ModelWithConstraints2(Model):
def table_name(cls): def table_name(cls):
return 'modelwithconstraints' return 'modelwithconstraints'
class ModelWithIndex(Model):
date = DateField()
f1 = Int32Field()
f2 = StringField()
index = Index(f1, type=Index.minmax(), granularity=1)
another_index = Index(f2, type=Index.set(0), granularity=1)
engine = MergeTree('date', ('date',))
@classmethod
def table_name(cls):
return 'modelwithindex'
class ModelWithIndex2(Model):
date = DateField()
f1 = Int32Field()
f2 = StringField()
index2 = Index(f1, type=Index.bloom_filter(), granularity=2)
another_index = Index(f2, type=Index.set(0), granularity=1)
engine = MergeTree('date', ('date',))
@classmethod
def table_name(cls):
return 'modelwithindex'