mirror of
https://github.com/Infinidat/infi.clickhouse_orm.git
synced 2024-11-22 09:06:41 +03:00
Merge branch 'custom_partitioning' of https://github.com/carrotquest/infi.clickhouse_orm into carrotquest-custom_partitioning
# Conflicts: # src/infi/clickhouse_orm/engines.py # src/infi/clickhouse_orm/models.py # tests/test_database.py # tests/test_engines.py
This commit is contained in:
commit
b6229125a3
|
@ -24,6 +24,18 @@ created on the ClickHouse server if it does not already exist.
|
||||||
- `autocreate`: automatically create the database if does not exist (unless in readonly mode).
|
- `autocreate`: automatically create the database if does not exist (unless in readonly mode).
|
||||||
|
|
||||||
|
|
||||||
|
#### server_timezone
|
||||||
|
|
||||||
|
|
||||||
|
Contains [pytz](http://pytz.sourceforge.net/) timezone used on database server
|
||||||
|
|
||||||
|
|
||||||
|
#### server_version
|
||||||
|
|
||||||
|
|
||||||
|
Contains a version tuple of database server, for example (1, 1, 54310)
|
||||||
|
|
||||||
|
|
||||||
#### count(model_class, conditions=None)
|
#### count(model_class, conditions=None)
|
||||||
|
|
||||||
|
|
||||||
|
@ -144,13 +156,13 @@ invalid values will cause a `ValueError` to be raised.
|
||||||
Unrecognized field names will cause an `AttributeError`.
|
Unrecognized field names will cause an `AttributeError`.
|
||||||
|
|
||||||
|
|
||||||
#### Model.create_table_sql(db_name)
|
#### Model.create_table_sql(db)
|
||||||
|
|
||||||
|
|
||||||
Returns the SQL command for creating a table for this model.
|
Returns the SQL command for creating a table for this model.
|
||||||
|
|
||||||
|
|
||||||
#### Model.drop_table_sql(db_name)
|
#### Model.drop_table_sql(db)
|
||||||
|
|
||||||
|
|
||||||
Returns the SQL command for deleting this model's table.
|
Returns the SQL command for deleting this model's table.
|
||||||
|
@ -241,13 +253,13 @@ invalid values will cause a `ValueError` to be raised.
|
||||||
Unrecognized field names will cause an `AttributeError`.
|
Unrecognized field names will cause an `AttributeError`.
|
||||||
|
|
||||||
|
|
||||||
#### BufferModel.create_table_sql(db_name)
|
#### BufferModel.create_table_sql(db)
|
||||||
|
|
||||||
|
|
||||||
Returns the SQL command for creating a table for this model.
|
Returns the SQL command for creating a table for this model.
|
||||||
|
|
||||||
|
|
||||||
#### BufferModel.drop_table_sql(db_name)
|
#### BufferModel.drop_table_sql(db)
|
||||||
|
|
||||||
|
|
||||||
Returns the SQL command for deleting this model's table.
|
Returns the SQL command for deleting this model's table.
|
||||||
|
@ -639,7 +651,7 @@ Extends Engine
|
||||||
|
|
||||||
Extends Engine
|
Extends Engine
|
||||||
|
|
||||||
#### MergeTree(date_col, key_cols, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None)
|
#### MergeTree(date_col=None, order_by=(), sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None)
|
||||||
|
|
||||||
|
|
||||||
### Buffer
|
### Buffer
|
||||||
|
|
|
@ -119,12 +119,12 @@ invalid values will cause a `ValueError` to be raised.
|
||||||
Unrecognized field names will cause an `AttributeError`.
|
Unrecognized field names will cause an `AttributeError`.
|
||||||
|
|
||||||
|
|
||||||
#### Model.create_table_sql(db_name)
|
#### Model.create_table_sql(db)
|
||||||
|
|
||||||
Returns the SQL command for creating a table for this model.
|
Returns the SQL command for creating a table for this model.
|
||||||
|
|
||||||
|
|
||||||
#### Model.drop_table_sql(db_name)
|
#### Model.drop_table_sql(db)
|
||||||
|
|
||||||
Returns the SQL command for deleting this model's table.
|
Returns the SQL command for deleting this model's table.
|
||||||
|
|
||||||
|
@ -197,12 +197,12 @@ invalid values will cause a `ValueError` to be raised.
|
||||||
Unrecognized field names will cause an `AttributeError`.
|
Unrecognized field names will cause an `AttributeError`.
|
||||||
|
|
||||||
|
|
||||||
#### BufferModel.create_table_sql(db_name)
|
#### BufferModel.create_table_sql(db)
|
||||||
|
|
||||||
Returns the SQL command for creating a table for this model.
|
Returns the SQL command for creating a table for this model.
|
||||||
|
|
||||||
|
|
||||||
#### BufferModel.drop_table_sql(db_name)
|
#### BufferModel.drop_table_sql(db)
|
||||||
|
|
||||||
Returns the SQL command for deleting this model's table.
|
Returns the SQL command for deleting this model's table.
|
||||||
|
|
||||||
|
|
|
@ -55,6 +55,23 @@ For a `ReplacingMergeTree` you can optionally specify the version column:
|
||||||
|
|
||||||
engine = engines.ReplacingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), ver_col='Version')
|
engine = engines.ReplacingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), ver_col='Version')
|
||||||
|
|
||||||
|
### Custom partitioning
|
||||||
|
|
||||||
|
ClickHouse supports [custom partitioning](https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/) expressions since version 1.1.54310
|
||||||
|
You can use custom partitioning with any MergeTree family engine.
|
||||||
|
To set custom partitioning:
|
||||||
|
* skip date_col (first) constructor parameter or fill it with None value
|
||||||
|
* add name to order_by (second) constructor parameter
|
||||||
|
* add partition_key parameter. It should be a tuple of expressions, by which partition are built.
|
||||||
|
|
||||||
|
Standard partitioning by date column can be added using toYYYYMM(date) function.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
engine = engines.ReplacingMergeTree(order_by=('OrderID', 'EventDate', 'BannerID'), ver_col='Version',
|
||||||
|
partition_key=('toYYYYMM(EventDate)', 'BannerID'))
|
||||||
|
|
||||||
|
|
||||||
### Data Replication
|
### Data Replication
|
||||||
|
|
||||||
Any of the above engines can be converted to a replicated engine (e.g. `ReplicatedMergeTree`) by adding two parameters, `replica_table_path` and `replica_name`:
|
Any of the above engines can be converted to a replicated engine (e.g. `ReplicatedMergeTree`) by adding two parameters, `replica_table_path` and `replica_name`:
|
||||||
|
|
|
@ -97,6 +97,7 @@ class Database(object):
|
||||||
self.db_exists = False
|
self.db_exists = False
|
||||||
self.create_database()
|
self.create_database()
|
||||||
self.server_timezone = self._get_server_timezone()
|
self.server_timezone = self._get_server_timezone()
|
||||||
|
self.server_version = self._get_server_version()
|
||||||
|
|
||||||
def create_database(self):
|
def create_database(self):
|
||||||
'''
|
'''
|
||||||
|
@ -118,7 +119,7 @@ class Database(object):
|
||||||
# TODO check that model has an engine
|
# TODO check that model has an engine
|
||||||
if model_class.system:
|
if model_class.system:
|
||||||
raise DatabaseException("You can't create system table")
|
raise DatabaseException("You can't create system table")
|
||||||
self._send(model_class.create_table_sql(self.db_name))
|
self._send(model_class.create_table_sql(self))
|
||||||
|
|
||||||
def drop_table(self, model_class):
|
def drop_table(self, model_class):
|
||||||
'''
|
'''
|
||||||
|
@ -126,7 +127,7 @@ class Database(object):
|
||||||
'''
|
'''
|
||||||
if model_class.system:
|
if model_class.system:
|
||||||
raise DatabaseException("You can't drop system table")
|
raise DatabaseException("You can't drop system table")
|
||||||
self._send(model_class.drop_table_sql(self.db_name))
|
self._send(model_class.drop_table_sql(self))
|
||||||
|
|
||||||
def insert(self, model_instances, batch_size=1000):
|
def insert(self, model_instances, batch_size=1000):
|
||||||
'''
|
'''
|
||||||
|
@ -326,6 +327,11 @@ class Database(object):
|
||||||
logger.exception('Cannot determine server timezone (%s), assuming UTC', e)
|
logger.exception('Cannot determine server timezone (%s), assuming UTC', e)
|
||||||
return pytz.utc
|
return pytz.utc
|
||||||
|
|
||||||
|
def _get_server_version(self, as_tuple=True):
|
||||||
|
r = self._send('SELECT version();')
|
||||||
|
ver = r.text
|
||||||
|
return tuple(int(n) for n in ver.split('.')) if as_tuple else ver
|
||||||
|
|
||||||
def _is_connection_readonly(self):
|
def _is_connection_readonly(self):
|
||||||
r = self._send("SELECT value FROM system.settings WHERE name = 'readonly'")
|
r = self._send("SELECT value FROM system.settings WHERE name = 'readonly'")
|
||||||
return r.text.strip() != '0'
|
return r.text.strip() != '0'
|
||||||
|
|
|
@ -1,89 +1,141 @@
|
||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import logging
|
||||||
import six
|
import six
|
||||||
|
|
||||||
from .utils import comma_join
|
from .utils import comma_join
|
||||||
|
|
||||||
|
logger = logging.getLogger('clickhouse_orm')
|
||||||
|
|
||||||
|
|
||||||
class Engine(object):
|
class Engine(object):
|
||||||
|
|
||||||
def create_table_sql(self):
|
def create_table_sql(self, db):
|
||||||
raise NotImplementedError() # pragma: no cover
|
raise NotImplementedError() # pragma: no cover
|
||||||
|
|
||||||
|
|
||||||
class TinyLog(Engine):
|
class TinyLog(Engine):
|
||||||
|
|
||||||
def create_table_sql(self):
|
def create_table_sql(self, db):
|
||||||
return 'TinyLog'
|
return 'TinyLog'
|
||||||
|
|
||||||
|
|
||||||
class Log(Engine):
|
class Log(Engine):
|
||||||
|
|
||||||
def create_table_sql(self):
|
def create_table_sql(self, db):
|
||||||
return 'Log'
|
return 'Log'
|
||||||
|
|
||||||
|
|
||||||
class Memory(Engine):
|
class Memory(Engine):
|
||||||
|
|
||||||
def create_table_sql(self):
|
def create_table_sql(self, db):
|
||||||
return 'Memory'
|
return 'Memory'
|
||||||
|
|
||||||
|
|
||||||
class MergeTree(Engine):
|
class MergeTree(Engine):
|
||||||
|
|
||||||
def __init__(self, date_col, key_cols, sampling_expr=None,
|
def __init__(self, date_col=None, order_by=(), sampling_expr=None,
|
||||||
index_granularity=8192, replica_table_path=None, replica_name=None):
|
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None):
|
||||||
assert type(key_cols) in (list, tuple), 'key_cols must be a list or tuple'
|
assert type(order_by) in (list, tuple), 'order_by must be a list or tuple'
|
||||||
|
assert date_col is None or isinstance(date_col, six.string_types), 'date_col must be string if present'
|
||||||
|
assert partition_key is None or type(partition_key) in (list, tuple),\
|
||||||
|
'partition_key must be tuple or list if present'
|
||||||
|
|
||||||
|
# These values conflict with each other (old and new syntax of table engines.
|
||||||
|
# So let's control only one of them is given.
|
||||||
|
assert date_col or partition_key, "You must set either date_col or partition_key"
|
||||||
self.date_col = date_col
|
self.date_col = date_col
|
||||||
self.key_cols = key_cols
|
self.partition_key = partition_key if partition_key else ('toYYYYMM(`%s`)' % date_col,)
|
||||||
|
|
||||||
|
self.order_by = order_by
|
||||||
self.sampling_expr = sampling_expr
|
self.sampling_expr = sampling_expr
|
||||||
self.index_granularity = index_granularity
|
self.index_granularity = index_granularity
|
||||||
self.replica_table_path = replica_table_path
|
self.replica_table_path = replica_table_path
|
||||||
self.replica_name = replica_name
|
self.replica_name = replica_name
|
||||||
# TODO verify that both replica fields are either present or missing
|
# TODO verify that both replica fields are either present or missing
|
||||||
|
|
||||||
def create_table_sql(self):
|
# I changed field name for new reality and syntax
|
||||||
|
@property
|
||||||
|
def key_cols(self):
|
||||||
|
logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead')
|
||||||
|
return self.order_by
|
||||||
|
|
||||||
|
@key_cols.setter
|
||||||
|
def key_cols(self, value):
|
||||||
|
logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead')
|
||||||
|
self.order_by = value
|
||||||
|
|
||||||
|
def create_table_sql(self, db):
|
||||||
name = self.__class__.__name__
|
name = self.__class__.__name__
|
||||||
if self.replica_name:
|
if self.replica_name:
|
||||||
name = 'Replicated' + name
|
name = 'Replicated' + name
|
||||||
params = self._build_sql_params()
|
|
||||||
return '%s(%s)' % (name, comma_join(params))
|
|
||||||
|
|
||||||
def _build_sql_params(self):
|
# In ClickHouse 1.1.54310 custom partitioning key was introduced
|
||||||
|
# https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
|
||||||
|
# Let's check version and use new syntax if available
|
||||||
|
if db.server_version >= (1, 1, 54310):
|
||||||
|
partition_sql = "PARTITION BY %s ORDER BY %s" \
|
||||||
|
% ('(%s)' % comma_join(self.partition_key), '(%s)' % comma_join(self.order_by))
|
||||||
|
|
||||||
|
if self.sampling_expr:
|
||||||
|
partition_sql += " SAMPLE BY %s" % self.sampling_expr
|
||||||
|
|
||||||
|
partition_sql += " SETTINGS index_granularity=%d" % self.index_granularity
|
||||||
|
|
||||||
|
elif not self.date_col:
|
||||||
|
# Can't import it globally due to circular import
|
||||||
|
from infi.clickhouse_orm.database import DatabaseException
|
||||||
|
raise DatabaseException("Custom partitioning is not supported before ClickHouse 1.1.54310. "
|
||||||
|
"Please update your server or use date_col syntax."
|
||||||
|
"https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/")
|
||||||
|
else:
|
||||||
|
partition_sql = ''
|
||||||
|
|
||||||
|
params = self._build_sql_params(db)
|
||||||
|
return '%s(%s) %s' % (name, comma_join(params), partition_sql)
|
||||||
|
|
||||||
|
def _build_sql_params(self, db):
|
||||||
params = []
|
params = []
|
||||||
if self.replica_name:
|
if self.replica_name:
|
||||||
params += ["'%s'" % self.replica_table_path, "'%s'" % self.replica_name]
|
params += ["'%s'" % self.replica_table_path, "'%s'" % self.replica_name]
|
||||||
params.append(self.date_col)
|
|
||||||
if self.sampling_expr:
|
# In ClickHouse 1.1.54310 custom partitioning key was introduced
|
||||||
params.append(self.sampling_expr)
|
# https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
|
||||||
params.append('(%s)' % comma_join(self.key_cols))
|
# These parameters are process in create_table_sql directly.
|
||||||
params.append(str(self.index_granularity))
|
# In previous ClickHouse versions this this syntax does not work.
|
||||||
|
if db.server_version < (1, 1, 54310):
|
||||||
|
params.append(self.date_col)
|
||||||
|
if self.sampling_expr:
|
||||||
|
params.append(self.sampling_expr)
|
||||||
|
params.append('(%s)' % comma_join(self.order_by))
|
||||||
|
params.append(str(self.index_granularity))
|
||||||
|
|
||||||
return params
|
return params
|
||||||
|
|
||||||
|
|
||||||
class CollapsingMergeTree(MergeTree):
|
class CollapsingMergeTree(MergeTree):
|
||||||
|
|
||||||
def __init__(self, date_col, key_cols, sign_col, sampling_expr=None,
|
def __init__(self, date_col, order_by, sign_col, sampling_expr=None,
|
||||||
index_granularity=8192, replica_table_path=None, replica_name=None):
|
index_granularity=8192, replica_table_path=None, replica_name=None):
|
||||||
super(CollapsingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
|
super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
|
||||||
self.sign_col = sign_col
|
self.sign_col = sign_col
|
||||||
|
|
||||||
def _build_sql_params(self):
|
def _build_sql_params(self, db):
|
||||||
params = super(CollapsingMergeTree, self)._build_sql_params()
|
params = super(CollapsingMergeTree, self)._build_sql_params(db)
|
||||||
params.append(self.sign_col)
|
params.append(self.sign_col)
|
||||||
return params
|
return params
|
||||||
|
|
||||||
|
|
||||||
class SummingMergeTree(MergeTree):
|
class SummingMergeTree(MergeTree):
|
||||||
|
|
||||||
def __init__(self, date_col, key_cols, summing_cols=None, sampling_expr=None,
|
def __init__(self, date_col, order_by, summing_cols=None, sampling_expr=None,
|
||||||
index_granularity=8192, replica_table_path=None, replica_name=None):
|
index_granularity=8192, replica_table_path=None, replica_name=None):
|
||||||
super(SummingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
|
super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
|
||||||
assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple'
|
assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple'
|
||||||
self.summing_cols = summing_cols
|
self.summing_cols = summing_cols
|
||||||
|
|
||||||
def _build_sql_params(self):
|
def _build_sql_params(self, db):
|
||||||
params = super(SummingMergeTree, self)._build_sql_params()
|
params = super(SummingMergeTree, self)._build_sql_params(db)
|
||||||
if self.summing_cols:
|
if self.summing_cols:
|
||||||
params.append('(%s)' % comma_join(self.summing_cols))
|
params.append('(%s)' % comma_join(self.summing_cols))
|
||||||
return params
|
return params
|
||||||
|
@ -91,13 +143,13 @@ class SummingMergeTree(MergeTree):
|
||||||
|
|
||||||
class ReplacingMergeTree(MergeTree):
|
class ReplacingMergeTree(MergeTree):
|
||||||
|
|
||||||
def __init__(self, date_col, key_cols, ver_col=None, sampling_expr=None,
|
def __init__(self, date_col, order_by, ver_col=None, sampling_expr=None,
|
||||||
index_granularity=8192, replica_table_path=None, replica_name=None):
|
index_granularity=8192, replica_table_path=None, replica_name=None):
|
||||||
super(ReplacingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
|
super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
|
||||||
self.ver_col = ver_col
|
self.ver_col = ver_col
|
||||||
|
|
||||||
def _build_sql_params(self):
|
def _build_sql_params(self, db):
|
||||||
params = super(ReplacingMergeTree, self)._build_sql_params()
|
params = super(ReplacingMergeTree, self)._build_sql_params(db)
|
||||||
if self.ver_col:
|
if self.ver_col:
|
||||||
params.append(self.ver_col)
|
params.append(self.ver_col)
|
||||||
return params
|
return params
|
||||||
|
@ -121,11 +173,11 @@ class Buffer(Engine):
|
||||||
self.min_bytes = min_bytes
|
self.min_bytes = min_bytes
|
||||||
self.max_bytes = max_bytes
|
self.max_bytes = max_bytes
|
||||||
|
|
||||||
def create_table_sql(self, db_name):
|
def create_table_sql(self, db):
|
||||||
# Overriden create_table_sql example:
|
# Overriden create_table_sql example:
|
||||||
#sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)'
|
# sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)'
|
||||||
sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % (
|
sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % (
|
||||||
db_name, self.main_model.table_name(), self.num_layers,
|
db.db_name, self.main_model.table_name(), self.num_layers,
|
||||||
self.min_time, self.max_time, self.min_rows,
|
self.min_time, self.max_time, self.min_rows,
|
||||||
self.max_rows, self.min_bytes, self.max_bytes
|
self.max_rows, self.min_bytes, self.max_bytes
|
||||||
)
|
)
|
||||||
|
@ -142,19 +194,10 @@ class Merge(Engine):
|
||||||
|
|
||||||
def __init__(self, table_regex):
|
def __init__(self, table_regex):
|
||||||
assert isinstance(table_regex, six.string_types), "'table_regex' parameter must be string"
|
assert isinstance(table_regex, six.string_types), "'table_regex' parameter must be string"
|
||||||
|
|
||||||
self.table_regex = table_regex
|
self.table_regex = table_regex
|
||||||
|
|
||||||
# Use current database as default
|
def create_table_sql(self, db):
|
||||||
self.db_name = None
|
self.db_name = None + return "Merge(`%s`, '%s')" % (db.db_name, self.table_regex)
|
||||||
|
|
||||||
def create_table_sql(self):
|
|
||||||
db_name = ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()'
|
|
||||||
return "Merge(%s, '%s')" % (db_name, self.table_regex)
|
|
||||||
|
|
||||||
def set_db_name(self, db_name):
|
|
||||||
assert isinstance(db_name, six.string_types), "'db_name' parameter must be string"
|
|
||||||
self.db_name = db_name
|
|
||||||
|
|
||||||
|
|
||||||
class Distributed(Engine):
|
class Distributed(Engine):
|
||||||
|
|
|
@ -173,25 +173,25 @@ class Model(with_metaclass(ModelBase)):
|
||||||
return cls.__name__.lower()
|
return cls.__name__.lower()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create_table_sql(cls, db_name):
|
def create_table_sql(cls, db):
|
||||||
'''
|
'''
|
||||||
Returns the SQL command for creating a table for this model.
|
Returns the SQL command for creating a table for this model.
|
||||||
'''
|
'''
|
||||||
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db_name, cls.table_name())]
|
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())]
|
||||||
cols = []
|
cols = []
|
||||||
for name, field in iteritems(cls.fields()):
|
for name, field in iteritems(cls.fields()):
|
||||||
cols.append(' %s %s' % (name, field.get_sql()))
|
cols.append(' %s %s' % (name, field.get_sql()))
|
||||||
parts.append(',\n'.join(cols))
|
parts.append(',\n'.join(cols))
|
||||||
parts.append(')')
|
parts.append(')')
|
||||||
parts.append('ENGINE = ' + cls.engine.create_table_sql())
|
parts.append('ENGINE = ' + cls.engine.create_table_sql(db))
|
||||||
return '\n'.join(parts)
|
return '\n'.join(parts)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def drop_table_sql(cls, db_name):
|
def drop_table_sql(cls, db):
|
||||||
'''
|
'''
|
||||||
Returns the SQL command for deleting this model's table.
|
Returns the SQL command for deleting this model's table.
|
||||||
'''
|
'''
|
||||||
return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db_name, cls.table_name())
|
return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db.db_name, cls.table_name())
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
|
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
|
||||||
|
@ -265,12 +265,13 @@ class Model(with_metaclass(ModelBase)):
|
||||||
class BufferModel(Model):
|
class BufferModel(Model):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create_table_sql(cls, db_name):
|
def create_table_sql(cls, db):
|
||||||
'''
|
'''
|
||||||
Returns the SQL command for creating a table for this model.
|
Returns the SQL command for creating a table for this model.
|
||||||
'''
|
'''
|
||||||
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.engine.main_model.table_name())]
|
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db.db_name, cls.table_name(), db.db_name,
|
||||||
engine_str = cls.engine.create_table_sql(db_name)
|
cls.engine.main_model.table_name())]
|
||||||
|
engine_str = cls.engine.create_table_sql(db)
|
||||||
parts.append(engine_str)
|
parts.append(engine_str)
|
||||||
return ' '.join(parts)
|
return ' '.join(parts)
|
||||||
|
|
||||||
|
@ -286,21 +287,10 @@ class MergeModel(Model):
|
||||||
# Virtual fields can't be inserted into database
|
# Virtual fields can't be inserted into database
|
||||||
_table = StringField(readonly=True)
|
_table = StringField(readonly=True)
|
||||||
|
|
||||||
def set_database(self, db):
|
|
||||||
'''
|
|
||||||
Gets the `Database` that this model instance belongs to.
|
|
||||||
Returns `None` unless the instance was read from the database or written to it.
|
|
||||||
'''
|
|
||||||
assert isinstance(self.engine, Merge), "engine must be engines.Merge instance"
|
|
||||||
res = super(MergeModel, self).set_database(db)
|
|
||||||
self.engine.set_db_name(db.db_name)
|
|
||||||
return res
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create_table_sql(cls, db_name):
|
def create_table_sql(cls, db):
|
||||||
assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance"
|
assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance"
|
||||||
cls.engine.set_db_name(db_name)
|
return super(MergeModel, cls).create_table_sql(db)
|
||||||
return super(MergeModel, cls).create_table_sql(db_name)
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: base class for models that require specific engine
|
# TODO: base class for models that require specific engine
|
||||||
|
@ -380,3 +370,5 @@ class DistributedModel(Model):
|
||||||
db_name, cls.table_name(), cls.engine.table_name),
|
db_name, cls.table_name(), cls.engine.table_name),
|
||||||
'ENGINE = ' + cls.engine.create_table_sql()]
|
'ENGINE = ' + cls.engine.create_table_sql()]
|
||||||
return '\n'.join(parts)
|
return '\n'.join(parts)
|
||||||
|
cls.engine.set_db_name(db_name)
|
||||||
|
return super(MergeModel, cls).create_table_sql(db_name)
|
||||||
|
|
|
@ -68,7 +68,8 @@ class SystemPart(Model):
|
||||||
"""
|
"""
|
||||||
operation = operation.upper()
|
operation = operation.upper()
|
||||||
assert operation in self.OPERATIONS, "operation must be in [%s]" % comma_join(self.OPERATIONS)
|
assert operation in self.OPERATIONS, "operation must be in [%s]" % comma_join(self.OPERATIONS)
|
||||||
sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (self._database.db_name, self.table, operation, self.partition)
|
|
||||||
|
sql = "ALTER TABLE `%s`.`%s` %s PARTITION %s" % (self._database.db_name, self.table, operation, self.partition)
|
||||||
if from_part is not None:
|
if from_part is not None:
|
||||||
sql += " FROM %s" % from_part
|
sql += " FROM %s" % from_part
|
||||||
self._database.raw(sql, settings=settings, stream=False)
|
self._database.raw(sql, settings=settings, stream=False)
|
||||||
|
|
|
@ -1,8 +1,18 @@
|
||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
from infi.clickhouse_orm.database import Database, DatabaseException, ServerError
|
from infi.clickhouse_orm.database import Database, DatabaseException, ServerError
|
||||||
from infi.clickhouse_orm.models import Model, MergeModel, DistributedModel
|
from infi.clickhouse_orm.models import Model, MergeModel, DistributedModel
|
||||||
|
||||||| merged common ancestors
|
||||||
|
from infi.clickhouse_orm.database import Database, DatabaseException
|
||||||
|
from infi.clickhouse_orm.models import Model, MergeModel
|
||||||
|
=======
|
||||||
|
from infi.clickhouse_orm.system_models import SystemPart
|
||||||
|
|
||||||
|
from infi.clickhouse_orm.database import Database, DatabaseException
|
||||||
|
from infi.clickhouse_orm.models import Model, MergeModel
|
||||||
|
>>>>>>> 7fb05896926acab163a1f373092bf22cc0f3cb4f
|
||||||
from infi.clickhouse_orm.fields import *
|
from infi.clickhouse_orm.fields import *
|
||||||
from infi.clickhouse_orm.engines import *
|
from infi.clickhouse_orm.engines import *
|
||||||
|
|
||||||
|
@ -43,8 +53,12 @@ class EnginesTestCase(_EnginesHelperTestCase):
|
||||||
|
|
||||||
def test_replicated_merge_tree(self):
|
def test_replicated_merge_tree(self):
|
||||||
engine = MergeTree('date', ('date', 'event_id', 'event_group'), replica_table_path='/clickhouse/tables/{layer}-{shard}/hits', replica_name='{replica}')
|
engine = MergeTree('date', ('date', 'event_id', 'event_group'), replica_table_path='/clickhouse/tables/{layer}-{shard}/hits', replica_name='{replica}')
|
||||||
expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)"
|
# In ClickHouse 1.1.54310 custom partitioning key was introduced and new syntax is used
|
||||||
self.assertEquals(engine.create_table_sql(), expected)
|
if self.database.server_version >= (1, 1, 54310):
|
||||||
|
expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}') PARTITION BY (toYYYYMM(`date`)) ORDER BY (date, event_id, event_group) SETTINGS index_granularity=8192"
|
||||||
|
else:
|
||||||
|
expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)"
|
||||||
|
self.assertEquals(engine.create_table_sql(self.database), expected)
|
||||||
|
|
||||||
def test_collapsing_merge_tree(self):
|
def test_collapsing_merge_tree(self):
|
||||||
class TestModel(SampleModel):
|
class TestModel(SampleModel):
|
||||||
|
@ -126,6 +140,17 @@ class EnginesTestCase(_EnginesHelperTestCase):
|
||||||
'event_uversion': 2
|
'event_uversion': 2
|
||||||
}, res[1].to_dict(include_readonly=True))
|
}, res[1].to_dict(include_readonly=True))
|
||||||
|
|
||||||
|
def test_custom_partitioning(self):
|
||||||
|
class TestModel(SampleModel):
|
||||||
|
engine = MergeTree(
|
||||||
|
order_by=('date', 'event_id', 'event_group'),
|
||||||
|
partition_key=('toYYYYMM(date)', 'event_group')
|
||||||
|
)
|
||||||
|
self._create_and_insert(TestModel)
|
||||||
|
parts = list(SystemPart.get(self.database))
|
||||||
|
self.assertEqual(1, len(parts))
|
||||||
|
self.assertEqual('(201701, 13)', parts[0].partition)
|
||||||
|
|
||||||
|
|
||||||
class SampleModel(Model):
|
class SampleModel(Model):
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ import unittest
|
||||||
import datetime
|
import datetime
|
||||||
import pytz
|
import pytz
|
||||||
|
|
||||||
|
from infi.clickhouse_orm.database import Database
|
||||||
from infi.clickhouse_orm.models import Model
|
from infi.clickhouse_orm.models import Model
|
||||||
from infi.clickhouse_orm.fields import *
|
from infi.clickhouse_orm.fields import *
|
||||||
from infi.clickhouse_orm.engines import *
|
from infi.clickhouse_orm.engines import *
|
||||||
|
@ -19,9 +20,10 @@ class InheritanceTestCase(unittest.TestCase):
|
||||||
self.assertFieldNames(Model2, ['date_field', 'int_field', 'float_field'])
|
self.assertFieldNames(Model2, ['date_field', 'int_field', 'float_field'])
|
||||||
|
|
||||||
def test_create_table_sql(self):
|
def test_create_table_sql(self):
|
||||||
sql1 = ParentModel.create_table_sql('default')
|
default_db = Database('default')
|
||||||
sql2 = Model1.create_table_sql('default')
|
sql1 = ParentModel.create_table_sql(default_db)
|
||||||
sql3 = Model2.create_table_sql('default')
|
sql2 = Model1.create_table_sql(default_db)
|
||||||
|
sql3 = Model2.create_table_sql(default_db)
|
||||||
self.assertNotEqual(sql1, sql2)
|
self.assertNotEqual(sql1, sql2)
|
||||||
self.assertNotEqual(sql1, sql3)
|
self.assertNotEqual(sql1, sql3)
|
||||||
self.assertNotEqual(sql2, sql3)
|
self.assertNotEqual(sql2, sql3)
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from datetime import date
|
from datetime import date
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from infi.clickhouse_orm.database import Database, DatabaseException
|
from infi.clickhouse_orm.database import Database, DatabaseException
|
||||||
from infi.clickhouse_orm.engines import *
|
from infi.clickhouse_orm.engines import *
|
||||||
from infi.clickhouse_orm.fields import *
|
from infi.clickhouse_orm.fields import *
|
||||||
|
@ -37,7 +40,9 @@ class SystemPartTest(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.database = Database('test-db')
|
self.database = Database('test-db')
|
||||||
self.database.create_table(TestTable)
|
self.database.create_table(TestTable)
|
||||||
|
self.database.create_table(CustomPartitionedTable)
|
||||||
self.database.insert([TestTable(date_field=date.today())])
|
self.database.insert([TestTable(date_field=date.today())])
|
||||||
|
self.database.insert([CustomPartitionedTable(date_field=date.today(), group_field=13)])
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.database.drop_database()
|
self.database.drop_database()
|
||||||
|
@ -51,40 +56,46 @@ class SystemPartTest(unittest.TestCase):
|
||||||
|
|
||||||
def test_get_all(self):
|
def test_get_all(self):
|
||||||
parts = SystemPart.get(self.database)
|
parts = SystemPart.get(self.database)
|
||||||
self.assertEqual(len(list(parts)), 1)
|
self.assertEqual(len(list(parts)), 2)
|
||||||
|
|
||||||
def test_get_active(self):
|
def test_get_active(self):
|
||||||
parts = list(SystemPart.get_active(self.database))
|
parts = list(SystemPart.get_active(self.database))
|
||||||
self.assertEqual(len(parts), 1)
|
self.assertEqual(len(parts), 2)
|
||||||
parts[0].detach()
|
parts[0].detach()
|
||||||
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
|
self.assertEqual(len(list(SystemPart.get_active(self.database))), 1)
|
||||||
|
|
||||||
def test_get_conditions(self):
|
def test_get_conditions(self):
|
||||||
parts = list(SystemPart.get(self.database, conditions="table='testtable'"))
|
parts = list(SystemPart.get(self.database, conditions="table='testtable'"))
|
||||||
self.assertEqual(len(parts), 1)
|
self.assertEqual(len(parts), 1)
|
||||||
parts = list(SystemPart.get(self.database, conditions=u"table='othertable'"))
|
parts = list(SystemPart.get(self.database, conditions=u"table='custompartitionedtable'"))
|
||||||
|
self.assertEqual(len(parts), 1)
|
||||||
|
parts = list(SystemPart.get(self.database, conditions=u"table='invalidtable'"))
|
||||||
self.assertEqual(len(parts), 0)
|
self.assertEqual(len(parts), 0)
|
||||||
|
|
||||||
def test_attach_detach(self):
|
def test_attach_detach(self):
|
||||||
parts = list(SystemPart.get_active(self.database))
|
parts = list(SystemPart.get_active(self.database))
|
||||||
self.assertEqual(len(parts), 1)
|
self.assertEqual(len(parts), 2)
|
||||||
parts[0].detach()
|
for p in parts:
|
||||||
|
p.detach()
|
||||||
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
|
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
|
||||||
parts[0].attach()
|
for p in parts:
|
||||||
self.assertEqual(len(list(SystemPart.get_active(self.database))), 1)
|
p.attach()
|
||||||
|
self.assertEqual(len(list(SystemPart.get_active(self.database))), 2)
|
||||||
|
|
||||||
def test_drop(self):
|
def test_drop(self):
|
||||||
parts = list(SystemPart.get_active(self.database))
|
parts = list(SystemPart.get_active(self.database))
|
||||||
parts[0].drop()
|
for p in parts:
|
||||||
|
p.drop()
|
||||||
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
|
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
|
||||||
|
|
||||||
def test_freeze(self):
|
def test_freeze(self):
|
||||||
parts = list(SystemPart.get(self.database))
|
parts = list(SystemPart.get(self.database))
|
||||||
# There can be other backups in the folder
|
# There can be other backups in the folder
|
||||||
prev_backups = set(self._get_backups())
|
prev_backups = set(self._get_backups())
|
||||||
parts[0].freeze()
|
for p in parts:
|
||||||
|
p.freeze()
|
||||||
backups = set(self._get_backups())
|
backups = set(self._get_backups())
|
||||||
self.assertEqual(len(backups), len(prev_backups) + 1)
|
self.assertEqual(len(backups), len(prev_backups) + 2)
|
||||||
|
|
||||||
def test_fetch(self):
|
def test_fetch(self):
|
||||||
# TODO Not tested, as I have no replication set
|
# TODO Not tested, as I have no replication set
|
||||||
|
@ -97,5 +108,12 @@ class TestTable(Model):
|
||||||
engine = MergeTree('date_field', ('date_field',))
|
engine = MergeTree('date_field', ('date_field',))
|
||||||
|
|
||||||
|
|
||||||
|
class CustomPartitionedTable(Model):
|
||||||
|
date_field = DateField()
|
||||||
|
group_field = UInt32Field()
|
||||||
|
|
||||||
|
engine = MergeTree(order_by=('date_field', 'group_field'), partition_key=('toYYYYMM(date_field)', 'group_field'))
|
||||||
|
|
||||||
|
|
||||||
class SystemTestModel(Model):
|
class SystemTestModel(Model):
|
||||||
system = True
|
system = True
|
||||||
|
|
Loading…
Reference in New Issue
Block a user