Merge branch 'carrotquest-custom_partitioning' into develop

This commit is contained in:
Itai Shirav 2018-04-21 13:28:57 +03:00
commit ae3011e88f
10 changed files with 212 additions and 114 deletions

View File

@@ -24,6 +24,18 @@ created on the ClickHouse server if it does not already exist.
- `autocreate`: automatically create the database if it does not exist (unless in readonly mode).
#### server_timezone
Contains the [pytz](http://pytz.sourceforge.net/) timezone used on the database server.
#### server_version
Contains the database server's version as a tuple, for example (1, 1, 54310).
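Since the version is a tuple, feature checks are plain Python comparisons; a minimal sketch (the database name is an assumption):

    db = Database('default')
    if db.server_version >= (1, 1, 54310):
        pass  # the new PARTITION BY ... ORDER BY ... syntax is available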
#### count(model_class, conditions=None)
@@ -144,13 +156,13 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
-#### Model.create_table_sql(db_name)
+#### Model.create_table_sql(db)
Returns the SQL command for creating a table for this model.
-#### Model.drop_table_sql(db_name)
+#### Model.drop_table_sql(db)
Returns the SQL command for deleting this model's table.
@@ -241,13 +253,13 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
-#### BufferModel.create_table_sql(db_name)
+#### BufferModel.create_table_sql(db)
Returns the SQL command for creating a table for this model.
-#### BufferModel.drop_table_sql(db_name)
+#### BufferModel.drop_table_sql(db)
Returns the SQL command for deleting this model's table.
@@ -639,7 +651,7 @@ Extends Engine
Extends Engine
-#### MergeTree(date_col, key_cols, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None)
+#### MergeTree(date_col=None, order_by=(), sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None)
### Buffer

View File

@@ -119,12 +119,12 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
-#### Model.create_table_sql(db_name)
+#### Model.create_table_sql(db)
Returns the SQL command for creating a table for this model.
-#### Model.drop_table_sql(db_name)
+#### Model.drop_table_sql(db)
Returns the SQL command for deleting this model's table.
@@ -197,12 +197,12 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
-#### BufferModel.create_table_sql(db_name)
+#### BufferModel.create_table_sql(db)
Returns the SQL command for creating a table for this model.
-#### BufferModel.drop_table_sql(db_name)
+#### BufferModel.drop_table_sql(db)
Returns the SQL command for deleting this model's table.

View File

@@ -55,6 +55,23 @@ For a `ReplacingMergeTree` you can optionally specify the version column:
engine = engines.ReplacingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), ver_col='Version')
### Custom Partitioning
ClickHouse supports [custom partitioning](https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/) expressions since version 1.1.54310.
You can use custom partitioning with any engine in the MergeTree family.
To set up custom partitioning:
* omit the `date_col` (first) constructor parameter, or pass `None` for it
* pass the ordering columns in the `order_by` (second) constructor parameter
* pass the `partition_key` parameter: a tuple of expressions by which partitions are built
Standard partitioning by a date column can be expressed with the `toYYYYMM(date)` function.
Example:
engine = engines.ReplacingMergeTree(order_by=('OrderID', 'EventDate', 'BannerID'), ver_col='Version',
                                    partition_key=('toYYYYMM(EventDate)', 'BannerID'))
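On a server at or above 1.1.54310, this definition renders an engine clause with explicit PARTITION BY and ORDER BY parts; a sketch of the generated SQL (derived from MergeTree.create_table_sql in this commit):

    engine.create_table_sql(db)
    # -> "ReplacingMergeTree(Version) PARTITION BY (toYYYYMM(EventDate), BannerID) ORDER BY (OrderID, EventDate, BannerID) SETTINGS index_granularity=8192"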
### Data Replication
Any of the above engines can be converted to a replicated engine (e.g. `ReplicatedMergeTree`) by adding two parameters, `replica_table_path` and `replica_name`:

View File

@@ -97,6 +97,7 @@ class Database(object):
        self.db_exists = False
        self.create_database()
        self.server_timezone = self._get_server_timezone()
        self.server_version = self._get_server_version()

    def create_database(self):
        '''
@@ -118,7 +119,7 @@ class Database(object):
        # TODO check that model has an engine
        if model_class.system:
            raise DatabaseException("You can't create system table")
-        self._send(model_class.create_table_sql(self.db_name))
+        self._send(model_class.create_table_sql(self))

    def drop_table(self, model_class):
        '''
@@ -126,7 +127,7 @@ class Database(object):
        '''
        if model_class.system:
            raise DatabaseException("You can't drop system table")
-        self._send(model_class.drop_table_sql(self.db_name))
+        self._send(model_class.drop_table_sql(self))

    def insert(self, model_instances, batch_size=1000):
        '''
@@ -326,6 +327,15 @@ class Database(object):
            logger.exception('Cannot determine server timezone (%s), assuming UTC', e)
            return pytz.utc
    def _get_server_version(self, as_tuple=True):
        try:
            r = self._send('SELECT version();')
            ver = r.text
        except ServerError as e:
            logger.exception('Cannot determine server version (%s), assuming 1.1.0', e)
            ver = '1.1.0'
        return tuple(int(n) for n in ver.split('.')) if as_tuple else ver
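    # Tuples compare element-wise, so callers can gate features with plain comparisons:
    #   (1, 1, 54310) >= (1, 1, 54301)  -> True
    #   (1, 1, 9999)  >= (1, 1, 54310)  -> False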
    def _is_connection_readonly(self):
        r = self._send("SELECT value FROM system.settings WHERE name = 'readonly'")
        return r.text.strip() != '0'

View File

@@ -1,89 +1,141 @@
from __future__ import unicode_literals
import logging
import six
from .utils import comma_join

logger = logging.getLogger('clickhouse_orm')

class Engine(object):
-    def create_table_sql(self):
+    def create_table_sql(self, db):
        raise NotImplementedError()  # pragma: no cover

class TinyLog(Engine):
-    def create_table_sql(self):
+    def create_table_sql(self, db):
        return 'TinyLog'

class Log(Engine):
-    def create_table_sql(self):
+    def create_table_sql(self, db):
        return 'Log'

class Memory(Engine):
-    def create_table_sql(self):
+    def create_table_sql(self, db):
        return 'Memory'

class MergeTree(Engine):
-    def __init__(self, date_col, key_cols, sampling_expr=None,
-                 index_granularity=8192, replica_table_path=None, replica_name=None):
-        assert type(key_cols) in (list, tuple), 'key_cols must be a list or tuple'
+    def __init__(self, date_col=None, order_by=(), sampling_expr=None,
+                 index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None):
+        assert type(order_by) in (list, tuple), 'order_by must be a list or tuple'
        assert date_col is None or isinstance(date_col, six.string_types), 'date_col must be string if present'
        assert partition_key is None or type(partition_key) in (list, tuple),\
            'partition_key must be tuple or list if present'
        # date_col belongs to the old engine syntax and partition_key to the new one;
        # make sure that at least one of them is given.
        assert date_col or partition_key, "You must set either date_col or partition_key"
        self.date_col = date_col
-        self.key_cols = key_cols
+        self.partition_key = partition_key if partition_key else ('toYYYYMM(`%s`)' % date_col,)
        self.order_by = order_by
        self.sampling_expr = sampling_expr
        self.index_granularity = index_granularity
        self.replica_table_path = replica_table_path
        self.replica_name = replica_name
        # TODO verify that both replica fields are either present or missing
-    def create_table_sql(self):
    # `key_cols` was renamed to `order_by` to match the new engine syntax;
    # it is kept below as a deprecated alias
    @property
    def key_cols(self):
        logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead')
        return self.order_by

    @key_cols.setter
    def key_cols(self, value):
        logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead')
        self.order_by = value
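    # Example: old call sites keep working through the alias, with a warning logged:
    #   engine = MergeTree('EventDate', ('OrderID', 'EventDate'))
    #   engine.key_cols                  # warns, returns ('OrderID', 'EventDate')
    #   engine.key_cols = ('OrderID',)   # warns, assigns to order_by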
    def create_table_sql(self, db):
        name = self.__class__.__name__
        if self.replica_name:
            name = 'Replicated' + name
-        params = self._build_sql_params()
-        return '%s(%s)' % (name, comma_join(params))
-    def _build_sql_params(self):
        # In ClickHouse 1.1.54310 a custom partitioning key was introduced
        # https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
        # Check the server version and use the new syntax if available
        if db.server_version >= (1, 1, 54310):
            partition_sql = "PARTITION BY %s ORDER BY %s" \
                % ('(%s)' % comma_join(self.partition_key), '(%s)' % comma_join(self.order_by))
            if self.sampling_expr:
                partition_sql += " SAMPLE BY %s" % self.sampling_expr
            partition_sql += " SETTINGS index_granularity=%d" % self.index_granularity
        elif not self.date_col:
            # Can't import it globally due to circular import
            from infi.clickhouse_orm.database import DatabaseException
            raise DatabaseException("Custom partitioning is not supported before ClickHouse 1.1.54310. "
                                    "Please update your server or use date_col syntax. "
                                    "https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/")
        else:
            partition_sql = ''
        params = self._build_sql_params(db)
        return '%s(%s) %s' % (name, comma_join(params), partition_sql)
    def _build_sql_params(self, db):
        params = []
        if self.replica_name:
            params += ["'%s'" % self.replica_table_path, "'%s'" % self.replica_name]
-        params.append(self.date_col)
-        if self.sampling_expr:
-            params.append(self.sampling_expr)
-        params.append('(%s)' % comma_join(self.key_cols))
-        params.append(str(self.index_granularity))
        # In ClickHouse 1.1.54310 a custom partitioning key was introduced
        # https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
        # These parameters are processed in create_table_sql directly;
        # in previous ClickHouse versions that syntax does not work.
        if db.server_version < (1, 1, 54310):
            params.append(self.date_col)
            if self.sampling_expr:
                params.append(self.sampling_expr)
            params.append('(%s)' % comma_join(self.order_by))
            params.append(str(self.index_granularity))
        return params
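    # Example (sketch): on a server below 1.1.54310, an old-syntax engine renders as
    #   MergeTree('date', ('date', 'event_id', 'event_group')).create_table_sql(db)
    #   -> "MergeTree(date, (date, event_id, event_group), 8192)"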
class CollapsingMergeTree(MergeTree):
-    def __init__(self, date_col, key_cols, sign_col, sampling_expr=None,
+    def __init__(self, date_col, order_by, sign_col, sampling_expr=None,
                 index_granularity=8192, replica_table_path=None, replica_name=None):
-        super(CollapsingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
+        super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
        self.sign_col = sign_col

-    def _build_sql_params(self):
-        params = super(CollapsingMergeTree, self)._build_sql_params()
+    def _build_sql_params(self, db):
+        params = super(CollapsingMergeTree, self)._build_sql_params(db)
        params.append(self.sign_col)
        return params
class SummingMergeTree(MergeTree):
-    def __init__(self, date_col, key_cols, summing_cols=None, sampling_expr=None,
+    def __init__(self, date_col, order_by, summing_cols=None, sampling_expr=None,
                 index_granularity=8192, replica_table_path=None, replica_name=None):
-        super(SummingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
+        super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
        assert summing_cols is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple'
        self.summing_cols = summing_cols

-    def _build_sql_params(self):
-        params = super(SummingMergeTree, self)._build_sql_params()
+    def _build_sql_params(self, db):
+        params = super(SummingMergeTree, self)._build_sql_params(db)
        if self.summing_cols:
            params.append('(%s)' % comma_join(self.summing_cols))
        return params
@@ -91,13 +143,13 @@ class SummingMergeTree(MergeTree):
class ReplacingMergeTree(MergeTree):
-    def __init__(self, date_col, key_cols, ver_col=None, sampling_expr=None,
+    def __init__(self, date_col, order_by, ver_col=None, sampling_expr=None,
                 index_granularity=8192, replica_table_path=None, replica_name=None):
-        super(ReplacingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
+        super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
        self.ver_col = ver_col

-    def _build_sql_params(self):
-        params = super(ReplacingMergeTree, self)._build_sql_params()
+    def _build_sql_params(self, db):
+        params = super(ReplacingMergeTree, self)._build_sql_params(db)
        if self.ver_col:
            params.append(self.ver_col)
        return params
@@ -121,11 +173,11 @@ class Buffer(Engine):
        self.min_bytes = min_bytes
        self.max_bytes = max_bytes

-    def create_table_sql(self, db_name):
+    def create_table_sql(self, db):
        # Overridden create_table_sql example:
        # sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)'
        sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % (
-            db_name, self.main_model.table_name(), self.num_layers,
+            db.db_name, self.main_model.table_name(), self.num_layers,
            self.min_time, self.max_time, self.min_rows,
            self.max_rows, self.min_bytes, self.max_bytes
        )
@@ -142,19 +194,10 @@ class Merge(Engine):
    def __init__(self, table_regex):
        assert isinstance(table_regex, six.string_types), "'table_regex' parameter must be string"
        self.table_regex = table_regex
-        # Use current database as default
-        self.db_name = None
-    def create_table_sql(self):
-        db_name = ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()'
-        return "Merge(%s, '%s')" % (db_name, self.table_regex)
-    def set_db_name(self, db_name):
-        assert isinstance(db_name, six.string_types), "'db_name' parameter must be string"
-        self.db_name = db_name

    def create_table_sql(self, db):
        return "Merge(`%s`, '%s')" % (db.db_name, self.table_regex)

class Distributed(Engine):
@@ -167,20 +210,17 @@ class Distributed(Engine):
    See full documentation here
    https://clickhouse.yandex/docs/en/table_engines/distributed.html
    """
-    def __init__(self, cluster, table=None, db_name=None, sharding_key=None):
+    def __init__(self, cluster, table=None, sharding_key=None):
        """
        :param cluster: what cluster to access data from
        :param table: underlying table that actually stores data.
        If you are not specifying any table here, ensure that it can be inferred
        from your model's superclass (see models.DistributedModel.fix_engine_table)
-        :param db_name: which database to access data from
-        By default it is 'currentDatabase()'
        :param sharding_key: how to distribute data among shards when inserting
        directly into the Distributed table, optional
        """
        self.cluster = cluster
        self.table = table
-        self.db_name = db_name
        self.sharding_key = sharding_key

    @property
@@ -195,23 +235,17 @@ class Distributed(Engine):
        return table

-    def set_db_name(self, db_name):
-        assert isinstance(db_name, six.string_types), "'db_name' parameter must be string"
-        self.db_name = db_name
-    def create_table_sql(self):
+    def create_table_sql(self, db):
        name = self.__class__.__name__
-        params = self._build_sql_params()
+        params = self._build_sql_params(db)
        return '%s(%s)' % (name, ', '.join(params))

-    def _build_sql_params(self):
-        db_name = ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()'
+    def _build_sql_params(self, db):
        if self.table_name is None:
            raise ValueError("Cannot create {} engine: specify an underlying table".format(
                self.__class__.__name__))
-        params = [self.cluster, db_name, self.table_name]
+        params = ["`%s`" % p for p in [self.cluster, db.db_name, self.table_name]]
        if self.sharding_key:
            params.append(self.sharding_key)
        return params

View File

@@ -173,25 +173,25 @@ class Model(with_metaclass(ModelBase)):
        return cls.__name__.lower()

    @classmethod
-    def create_table_sql(cls, db_name):
+    def create_table_sql(cls, db):
        '''
        Returns the SQL command for creating a table for this model.
        '''
-        parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db_name, cls.table_name())]
+        parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())]
        cols = []
        for name, field in iteritems(cls.fields()):
            cols.append(' %s %s' % (name, field.get_sql()))
        parts.append(',\n'.join(cols))
        parts.append(')')
-        parts.append('ENGINE = ' + cls.engine.create_table_sql())
+        parts.append('ENGINE = ' + cls.engine.create_table_sql(db))
        return '\n'.join(parts)

    @classmethod
-    def drop_table_sql(cls, db_name):
+    def drop_table_sql(cls, db):
        '''
        Returns the SQL command for deleting this model's table.
        '''
-        return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db_name, cls.table_name())
+        return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db.db_name, cls.table_name())

    @classmethod
    def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
@@ -265,12 +265,13 @@ class Model(with_metaclass(ModelBase)):
class BufferModel(Model):

    @classmethod
-    def create_table_sql(cls, db_name):
+    def create_table_sql(cls, db):
        '''
        Returns the SQL command for creating a table for this model.
        '''
-        parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.engine.main_model.table_name())]
-        engine_str = cls.engine.create_table_sql(db_name)
+        parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db.db_name, cls.table_name(), db.db_name,
+                                                                        cls.engine.main_model.table_name())]
+        engine_str = cls.engine.create_table_sql(db)
        parts.append(engine_str)
        return ' '.join(parts)
@@ -286,21 +287,10 @@ class MergeModel(Model):
    # Virtual fields can't be inserted into database
    _table = StringField(readonly=True)

-    def set_database(self, db):
-        '''
-        Gets the `Database` that this model instance belongs to.
-        Returns `None` unless the instance was read from the database or written to it.
-        '''
-        assert isinstance(self.engine, Merge), "engine must be engines.Merge instance"
-        res = super(MergeModel, self).set_database(db)
-        self.engine.set_db_name(db.db_name)
-        return res

    @classmethod
-    def create_table_sql(cls, db_name):
+    def create_table_sql(cls, db):
        assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance"
-        cls.engine.set_db_name(db_name)
-        return super(MergeModel, cls).create_table_sql(db_name)
+        return super(MergeModel, cls).create_table_sql(db)

# TODO: base class for models that require specific engine
@@ -314,7 +304,6 @@ class DistributedModel(Model):
    def set_database(self, db):
        assert isinstance(self.engine, Distributed), "engine must be engines.Distributed instance"
        res = super(DistributedModel, self).set_database(db)
-        self.engine.set_db_name(db.db_name)
        return res

    @classmethod
@@ -369,14 +358,13 @@ class DistributedModel(Model):
        cls.engine.table = storage_models[0]

    @classmethod
-    def create_table_sql(cls, db_name):
+    def create_table_sql(cls, db):
        assert isinstance(cls.engine, Distributed), "engine must be engines.Distributed instance"
-        cls.engine.set_db_name(db_name)
        cls.fix_engine_table()
        parts = [
            'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` AS `{0}`.`{2}`'.format(
-                db_name, cls.table_name(), cls.engine.table_name),
-            'ENGINE = ' + cls.engine.create_table_sql()]
+                db.db_name, cls.table_name(), cls.engine.table_name),
+            'ENGINE = ' + cls.engine.create_table_sql(db)]
        return '\n'.join(parts)

View File

@@ -68,7 +68,8 @@ class SystemPart(Model):
        """
        operation = operation.upper()
        assert operation in self.OPERATIONS, "operation must be in [%s]" % comma_join(self.OPERATIONS)
-        sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (self._database.db_name, self.table, operation, self.partition)
+        sql = "ALTER TABLE `%s`.`%s` %s PARTITION %s" % (self._database.db_name, self.table, operation, self.partition)
        if from_part is not None:
            sql += " FROM %s" % from_part
        self._database.raw(sql, settings=settings, stream=False)
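        # Dropping the quotes matters for custom partitioning: the partition value may
        # now be a tuple such as (201701, 13) (see the tests below), which ClickHouse
        # only accepts unquoted. A sketch of a resulting statement:
        #   ALTER TABLE `test-db`.`custompartitionedtable` DETACH PARTITION (201701, 13)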

View File

@@ -1,6 +1,7 @@
from __future__ import unicode_literals
import unittest

from infi.clickhouse_orm.system_models import SystemPart
from infi.clickhouse_orm.database import Database, DatabaseException, ServerError
from infi.clickhouse_orm.models import Model, MergeModel, DistributedModel
from infi.clickhouse_orm.fields import *
@@ -43,8 +44,12 @@ class EnginesTestCase(_EnginesHelperTestCase):
    def test_replicated_merge_tree(self):
        engine = MergeTree('date', ('date', 'event_id', 'event_group'), replica_table_path='/clickhouse/tables/{layer}-{shard}/hits', replica_name='{replica}')
-        expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)"
-        self.assertEquals(engine.create_table_sql(), expected)
        # In ClickHouse 1.1.54310 a custom partitioning key was introduced and the new syntax is used
        if self.database.server_version >= (1, 1, 54310):
            expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}') PARTITION BY (toYYYYMM(`date`)) ORDER BY (date, event_id, event_group) SETTINGS index_granularity=8192"
        else:
            expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)"
        self.assertEquals(engine.create_table_sql(self.database), expected)

    def test_collapsing_merge_tree(self):
        class TestModel(SampleModel):
@@ -126,6 +131,17 @@ class EnginesTestCase(_EnginesHelperTestCase):
            'event_uversion': 2
        }, res[1].to_dict(include_readonly=True))

    def test_custom_partitioning(self):
        class TestModel(SampleModel):
            engine = MergeTree(
                order_by=('date', 'event_id', 'event_group'),
                partition_key=('toYYYYMM(date)', 'event_group')
            )
        self._create_and_insert(TestModel)
        parts = list(SystemPart.get(self.database))
        self.assertEqual(1, len(parts))
        self.assertEqual('(201701, 13)', parts[0].partition)

class SampleModel(Model):
@@ -142,15 +158,15 @@ class DistributedTestCase(_EnginesHelperTestCase):
        engine = Distributed('my_cluster')

        with self.assertRaises(ValueError) as cm:
-            engine.create_table_sql()
+            engine.create_table_sql(self.database)

        exc = cm.exception
        self.assertEqual(str(exc), 'Cannot create Distributed engine: specify an underlying table')

    def test_with_table_name(self):
        engine = Distributed('my_cluster', 'foo')
-        sql = engine.create_table_sql()
-        self.assertEqual(sql, 'Distributed(my_cluster, currentDatabase(), foo)')
+        sql = engine.create_table_sql(self.database)
+        self.assertEqual(sql, 'Distributed(`my_cluster`, `test-db`, `foo`)')

class TestModel(SampleModel):
    engine = TinyLog()

View File

@@ -3,6 +3,7 @@ import unittest
import datetime
import pytz

from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model
from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.engines import *
@@ -19,9 +20,10 @@ class InheritanceTestCase(unittest.TestCase):
        self.assertFieldNames(Model2, ['date_field', 'int_field', 'float_field'])

    def test_create_table_sql(self):
-        sql1 = ParentModel.create_table_sql('default')
-        sql2 = Model1.create_table_sql('default')
-        sql3 = Model2.create_table_sql('default')
+        default_db = Database('default')
+        sql1 = ParentModel.create_table_sql(default_db)
+        sql2 = Model1.create_table_sql(default_db)
+        sql3 = Model2.create_table_sql(default_db)
        self.assertNotEqual(sql1, sql2)
        self.assertNotEqual(sql1, sql3)
        self.assertNotEqual(sql2, sql3)

View File

@@ -1,7 +1,10 @@
from __future__ import unicode_literals
import unittest
from datetime import date
import os

from infi.clickhouse_orm.database import Database, DatabaseException
from infi.clickhouse_orm.engines import *
from infi.clickhouse_orm.fields import *
@@ -37,7 +40,9 @@ class SystemPartTest(unittest.TestCase):
    def setUp(self):
        self.database = Database('test-db')
        self.database.create_table(TestTable)
        self.database.create_table(CustomPartitionedTable)
        self.database.insert([TestTable(date_field=date.today())])
        self.database.insert([CustomPartitionedTable(date_field=date.today(), group_field=13)])

    def tearDown(self):
        self.database.drop_database()
@@ -51,40 +56,46 @@ class SystemPartTest(unittest.TestCase):
    def test_get_all(self):
        parts = SystemPart.get(self.database)
-        self.assertEqual(len(list(parts)), 1)
+        self.assertEqual(len(list(parts)), 2)

    def test_get_active(self):
        parts = list(SystemPart.get_active(self.database))
-        self.assertEqual(len(parts), 1)
+        self.assertEqual(len(parts), 2)
        parts[0].detach()
-        self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
+        self.assertEqual(len(list(SystemPart.get_active(self.database))), 1)

    def test_get_conditions(self):
        parts = list(SystemPart.get(self.database, conditions="table='testtable'"))
        self.assertEqual(len(parts), 1)
-        parts = list(SystemPart.get(self.database, conditions=u"table='othertable'"))
+        parts = list(SystemPart.get(self.database, conditions=u"table='custompartitionedtable'"))
+        self.assertEqual(len(parts), 1)
+        parts = list(SystemPart.get(self.database, conditions=u"table='invalidtable'"))
        self.assertEqual(len(parts), 0)
    def test_attach_detach(self):
        parts = list(SystemPart.get_active(self.database))
-        self.assertEqual(len(parts), 1)
-        parts[0].detach()
+        self.assertEqual(len(parts), 2)
+        for p in parts:
+            p.detach()
        self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
-        parts[0].attach()
-        self.assertEqual(len(list(SystemPart.get_active(self.database))), 1)
+        for p in parts:
+            p.attach()
+        self.assertEqual(len(list(SystemPart.get_active(self.database))), 2)

    def test_drop(self):
        parts = list(SystemPart.get_active(self.database))
-        parts[0].drop()
+        for p in parts:
+            p.drop()
        self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
    def test_freeze(self):
        parts = list(SystemPart.get(self.database))
        # There can be other backups in the folder
        prev_backups = set(self._get_backups())
-        parts[0].freeze()
+        for p in parts:
+            p.freeze()
        backups = set(self._get_backups())
-        self.assertEqual(len(backups), len(prev_backups) + 1)
+        self.assertEqual(len(backups), len(prev_backups) + 2)

    def test_fetch(self):
        # TODO Not tested, as I have no replication set
@@ -97,5 +108,12 @@ class TestTable(Model):
    engine = MergeTree('date_field', ('date_field',))
class CustomPartitionedTable(Model):
    date_field = DateField()
    group_field = UInt32Field()

    engine = MergeTree(order_by=('date_field', 'group_field'), partition_key=('toYYYYMM(date_field)', 'group_field'))

class SystemTestModel(Model):
    system = True