2. Added attribute server_version to Database class
3. Changed the parameter of Engine.create_table_sql(), Engine.drop_table_sql(), Model.create_table_sql() and Model.drop_table_sql() from db_name to db
M1ha 2018-04-12 14:21:46 +05:00
parent c023ad407d
commit 7fb0589692
11 changed files with 199 additions and 92 deletions


@ -24,6 +24,18 @@ created on the ClickHouse server if it does not already exist.
- `autocreate`: automatically create the database if it does not exist (unless in readonly mode).
#### server_timezone
Contains the [pytz](http://pytz.sourceforge.net/) timezone used on the database server
#### server_version
Contains the database server's version as a tuple, for example `(1, 1, 54310)`
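A minimal usage sketch of both server attributes (the connection parameters are assumed to be the defaults):

    from infi.clickhouse_orm.database import Database

    db = Database('default')
    print(db.server_timezone)  # e.g. <UTC>
    if db.server_version >= (1, 1, 54310):
        print('this server supports custom partitioning')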
#### count(model_class, conditions=None)
@ -144,13 +156,13 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
#### Model.create_table_sql(db_name)
#### Model.create_table_sql(db)
Returns the SQL command for creating a table for this model.
#### Model.drop_table_sql(db_name)
#### Model.drop_table_sql(db)
Returns the SQL command for deleting this model's table.
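These methods now receive the `Database` instance rather than a database name. A minimal sketch, assuming a model class `MyModel` already exists:

    db = Database('default')
    MyModel.create_table_sql(db)  # 'CREATE TABLE IF NOT EXISTS `default`.`mymodel` (...'
    MyModel.drop_table_sql(db)    # 'DROP TABLE IF EXISTS `default`.`mymodel`'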
@ -233,13 +245,13 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
#### BufferModel.create_table_sql(db_name)
#### BufferModel.create_table_sql(db)
Returns the SQL command for creating a table for this model.
#### BufferModel.drop_table_sql(db_name)
#### BufferModel.drop_table_sql(db)
Returns the SQL command for deleting this model's table.
@ -497,7 +509,7 @@ Extends Engine
Extends Engine
#### MergeTree(date_col, key_cols, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None)
#### MergeTree(date_col=None, order_by=(), sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None)
### Buffer


@ -119,12 +119,12 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
#### Model.create_table_sql(db_name)
#### Model.create_table_sql(db)
Returns the SQL command for creating a table for this model.
#### Model.drop_table_sql(db_name)
#### Model.drop_table_sql(db)
Returns the SQL command for deleting this model's table.
@ -197,12 +197,12 @@ invalid values will cause a `ValueError` to be raised.
Unrecognized field names will cause an `AttributeError`.
#### BufferModel.create_table_sql(db_name)
#### BufferModel.create_table_sql(db)
Returns the SQL command for creating a table for this model.
#### BufferModel.drop_table_sql(db_name)
#### BufferModel.drop_table_sql(db)
Returns the SQL command for deleting this model's table.


@ -54,6 +54,23 @@ For a `ReplacingMergeTree` you can optionally specify the version column:
engine = engines.ReplacingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), ver_col='Version')
### Custom partitioning
ClickHouse has supported [custom partitioning](https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/) expressions since version 1.1.54310.
You can use custom partitioning with any MergeTree family engine.
To set custom partitioning:
* skip the date_col (first) constructor parameter, or pass it as None
* pass the ordering column names in the order_by (second) constructor parameter
* pass the partition_key parameter: a tuple of the expressions by which partitions are built
Standard partitioning by a date column can be expressed using the toYYYYMM(date) function.
Example:
engine = engines.ReplacingMergeTree(order_by=('OrderID', 'EventDate', 'BannerID'), ver_col='Version',
partition_key=('toYYYYMM(EventDate)', 'BannerID'))
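On servers from version 1.1.54310 on, this renders using the new engine syntax, along the lines of `ReplacingMergeTree(Version) PARTITION BY (toYYYYMM(EventDate), BannerID) ORDER BY (OrderID, EventDate, BannerID) SETTINGS index_granularity=8192` (the exact shape is inferred from this commit's test expectations).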
### Data Replication
Any of the above engines can be converted to a replicated engine (e.g. `ReplicatedMergeTree`) by adding two parameters, `replica_table_path` and `replica_name`:
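A minimal sketch, reusing the replica path and macros that appear in this commit's tests:

    engine = engines.MergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'),
                               replica_table_path='/clickhouse/tables/{layer}-{shard}/hits',
                               replica_name='{replica}')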


@ -56,6 +56,7 @@ class Database(object):
self.db_exists = False
self.create_database()
self.server_timezone = self._get_server_timezone()
self.server_version = self._get_server_version()
def create_database(self):
'''
@ -77,7 +78,7 @@ class Database(object):
# TODO check that model has an engine
if model_class.system:
raise DatabaseException("You can't create system table")
self._send(model_class.create_table_sql(self.db_name))
self._send(model_class.create_table_sql(self))
def drop_table(self, model_class):
'''
@ -85,7 +86,7 @@ class Database(object):
'''
if model_class.system:
raise DatabaseException("You can't drop system table")
self._send(model_class.drop_table_sql(self.db_name))
self._send(model_class.drop_table_sql(self))
def insert(self, model_instances, batch_size=1000):
'''
@ -285,6 +286,11 @@ class Database(object):
logger.exception('Cannot determine server timezone, assuming UTC')
return pytz.utc
def _get_server_version(self, as_tuple=True):
r = self._send('SELECT version();')
ver = r.text
return tuple(int(n) for n in ver.split('.')) if as_tuple else ver
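# Illustration of the parsing above (the response text is an example value):
# a server answering '1.1.54310' yields the tuple (1, 1, 54310)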
def _is_connection_readonly(self):
r = self._send("SELECT value FROM system.settings WHERE name = 'readonly'")
return r.text.strip() != '0'


@ -1,89 +1,141 @@
from __future__ import unicode_literals
import logging
import six
from .utils import comma_join
logger = logging.getLogger('clickhouse_orm')
class Engine(object):
def create_table_sql(self):
def create_table_sql(self, db):
raise NotImplementedError() # pragma: no cover
class TinyLog(Engine):
def create_table_sql(self):
def create_table_sql(self, db):
return 'TinyLog'
class Log(Engine):
def create_table_sql(self):
def create_table_sql(self, db):
return 'Log'
class Memory(Engine):
def create_table_sql(self):
def create_table_sql(self, db):
return 'Memory'
class MergeTree(Engine):
def __init__(self, date_col, key_cols, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None):
assert type(key_cols) in (list, tuple), 'key_cols must be a list or tuple'
def __init__(self, date_col=None, order_by=(), sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None, partition_key=None):
assert type(order_by) in (list, tuple), 'order_by must be a list or tuple'
assert date_col is None or isinstance(date_col, six.string_types), 'date_col must be string if present'
assert partition_key is None or type(partition_key) in (list, tuple),\
'partition_key must be tuple or list if present'
# date_col belongs to the old table engine syntax and partition_key to the new one,
# so make sure that at least one of them is given.
assert date_col or partition_key, "You must set either date_col or partition_key"
self.date_col = date_col
self.key_cols = key_cols
self.partition_key = partition_key if partition_key else ('toYYYYMM(`%s`)' % date_col,)
self.order_by = order_by
self.sampling_expr = sampling_expr
self.index_granularity = index_granularity
self.replica_table_path = replica_table_path
self.replica_name = replica_name
# TODO verify that both replica fields are either present or missing
def create_table_sql(self):
# The attribute was renamed to order_by to match the new syntax; key_cols remains as a deprecated alias
@property
def key_cols(self):
logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead')
return self.order_by
@key_cols.setter
def key_cols(self, value):
logger.warning('`key_cols` attribute is deprecated and may be removed in future. Use `order_by` attribute instead')
self.order_by = value
def create_table_sql(self, db):
name = self.__class__.__name__
if self.replica_name:
name = 'Replicated' + name
params = self._build_sql_params()
return '%s(%s)' % (name, comma_join(params))
def _build_sql_params(self):
# In ClickHouse 1.1.54310 custom partitioning key was introduced
# https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
# Let's check version and use new syntax if available
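# The rendered engine clause then looks like (column names are illustrative):
# "MergeTree() PARTITION BY (toYYYYMM(`date`)) ORDER BY (date, event_id, event_group) SETTINGS index_granularity=8192"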
if db.server_version >= (1, 1, 54310):
partition_sql = "PARTITION BY %s ORDER BY %s" \
% ('(%s)' % comma_join(self.partition_key), '(%s)' % comma_join(self.order_by))
if self.sampling_expr:
partition_sql += " SAMPLE BY %s" % self.sampling_expr
partition_sql += " SETTINGS index_granularity=%d" % self.index_granularity
elif not self.date_col:
# Can't import it globally due to circular import
from infi.clickhouse_orm.database import DatabaseException
raise DatabaseException("Custom partitioning is not supported before ClickHouse 1.1.54310. "
"Please update your server or use the date_col syntax. "
"https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/")
else:
partition_sql = ''
params = self._build_sql_params(db)
return '%s(%s) %s' % (name, comma_join(params), partition_sql)
def _build_sql_params(self, db):
params = []
if self.replica_name:
params += ["'%s'" % self.replica_table_path, "'%s'" % self.replica_name]
# In ClickHouse 1.1.54310 custom partitioning key was introduced
# https://clickhouse.yandex/docs/en/table_engines/custom_partitioning_key/
# These parameters are processed in create_table_sql directly.
# In previous ClickHouse versions this syntax does not work.
if db.server_version < (1, 1, 54310):
params.append(self.date_col)
if self.sampling_expr:
params.append(self.sampling_expr)
params.append('(%s)' % comma_join(self.key_cols))
params.append('(%s)' % comma_join(self.order_by))
params.append(str(self.index_granularity))
return params
class CollapsingMergeTree(MergeTree):
def __init__(self, date_col, key_cols, sign_col, sampling_expr=None,
def __init__(self, date_col, order_by, sign_col, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None):
super(CollapsingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
super(CollapsingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
self.sign_col = sign_col
def _build_sql_params(self):
params = super(CollapsingMergeTree, self)._build_sql_params()
def _build_sql_params(self, db):
params = super(CollapsingMergeTree, self)._build_sql_params(db)
params.append(self.sign_col)
return params
class SummingMergeTree(MergeTree):
def __init__(self, date_col, key_cols, summing_cols=None, sampling_expr=None,
def __init__(self, date_col, order_by, summing_cols=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None):
super(SummingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
super(SummingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
assert summing_cols is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple'
self.summing_cols = summing_cols
def _build_sql_params(self):
params = super(SummingMergeTree, self)._build_sql_params()
def _build_sql_params(self, db):
params = super(SummingMergeTree, self)._build_sql_params(db)
if self.summing_cols:
params.append('(%s)' % comma_join(self.summing_cols))
return params
@ -91,13 +143,13 @@ class SummingMergeTree(MergeTree):
class ReplacingMergeTree(MergeTree):
def __init__(self, date_col, key_cols, ver_col=None, sampling_expr=None,
def __init__(self, date_col, order_by, ver_col=None, sampling_expr=None,
index_granularity=8192, replica_table_path=None, replica_name=None):
super(ReplacingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name)
super(ReplacingMergeTree, self).__init__(date_col, order_by, sampling_expr, index_granularity, replica_table_path, replica_name)
self.ver_col = ver_col
def _build_sql_params(self):
params = super(ReplacingMergeTree, self)._build_sql_params()
def _build_sql_params(self, db):
params = super(ReplacingMergeTree, self)._build_sql_params(db)
if self.ver_col:
params.append(self.ver_col)
return params
@ -121,11 +173,11 @@ class Buffer(Engine):
self.min_bytes = min_bytes
self.max_bytes = max_bytes
def create_table_sql(self, db_name):
def create_table_sql(self, db):
# Overridden create_table_sql example:
# sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)'
sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % (
db_name, self.main_model.table_name(), self.num_layers,
db.db_name, self.main_model.table_name(), self.num_layers,
self.min_time, self.max_time, self.min_rows,
self.max_rows, self.min_bytes, self.max_bytes
)
@ -145,13 +197,5 @@ class Merge(Engine):
self.table_regex = table_regex
# Use current database as default
self.db_name = None
def create_table_sql(self):
db_name = ("`%s`" % self.db_name) if self.db_name else 'currentDatabase()'
return "Merge(%s, '%s')" % (db_name, self.table_regex)
def set_db_name(self, db_name):
assert isinstance(db_name, six.string_types), "'db_name' parameter must be string"
self.db_name = db_name
def create_table_sql(self, db):
return "Merge(`%s`, '%s')" % (db.db_name, self.table_regex)


@ -168,25 +168,25 @@ class Model(with_metaclass(ModelBase)):
return cls.__name__.lower()
@classmethod
def create_table_sql(cls, db_name):
def create_table_sql(cls, db):
'''
Returns the SQL command for creating a table for this model.
'''
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db_name, cls.table_name())]
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())]
cols = []
for name, field in cls._fields:
cols.append(' %s %s' % (name, field.get_sql()))
parts.append(',\n'.join(cols))
parts.append(')')
parts.append('ENGINE = ' + cls.engine.create_table_sql())
parts.append('ENGINE = ' + cls.engine.create_table_sql(db))
return '\n'.join(parts)
@classmethod
def drop_table_sql(cls, db_name):
def drop_table_sql(cls, db):
'''
Returns the SQL command for deleting this model's table.
'''
return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db_name, cls.table_name())
return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db.db_name, cls.table_name())
@classmethod
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
@ -250,12 +250,13 @@ class Model(with_metaclass(ModelBase)):
class BufferModel(Model):
@classmethod
def create_table_sql(cls, db_name):
def create_table_sql(cls, db):
'''
Returns the SQL command for creating a table for this model.
'''
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.engine.main_model.table_name())]
engine_str = cls.engine.create_table_sql(db_name)
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db.db_name, cls.table_name(), db.db_name,
cls.engine.main_model.table_name())]
engine_str = cls.engine.create_table_sql(db)
parts.append(engine_str)
return ' '.join(parts)
@ -271,18 +272,7 @@ class MergeModel(Model):
# Virtual fields can't be inserted into database
_table = StringField(readonly=True)
def set_database(self, db):
'''
Sets the `Database` that this model instance belongs to,
and propagates its name to the `Merge` engine.
'''
assert isinstance(self.engine, Merge), "engine must be engines.Merge instance"
res = super(MergeModel, self).set_database(db)
self.engine.set_db_name(db.db_name)
return res
@classmethod
def create_table_sql(cls, db_name):
def create_table_sql(cls, db):
assert isinstance(cls.engine, Merge), "engine must be engines.Merge instance"
cls.engine.set_db_name(db_name)
return super(MergeModel, cls).create_table_sql(db_name)
return super(MergeModel, cls).create_table_sql(db)


@ -68,7 +68,8 @@ class SystemPart(Model):
"""
operation = operation.upper()
assert operation in self.OPERATIONS, "operation must be in [%s]" % comma_join(self.OPERATIONS)
sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (self._database.db_name, self.table, operation, self.partition)
sql = "ALTER TABLE `%s`.`%s` %s PARTITION %s" % (self._database.db_name, self.table, operation, self.partition)
if from_part is not None:
sql += " FROM %s" % from_part
self._database.raw(sql, settings=settings, stream=False)


@ -135,8 +135,8 @@ class DatabaseTestCase(TestCaseWithData):
Database(self.database.db_name, username='default', password='wrong')
def test_nonexisting_db(self):
db = Database('db_not_here', autocreate=False)
with self.assertRaises(DatabaseException):
db = Database('db_not_here', autocreate=False)
db.create_table(Person)
def test_preexisting_db(self):


@ -1,6 +1,8 @@
from __future__ import unicode_literals
import unittest
from infi.clickhouse_orm.system_models import SystemPart
from infi.clickhouse_orm.database import Database, DatabaseException
from infi.clickhouse_orm.models import Model, MergeModel
from infi.clickhouse_orm.fields import *
@ -41,8 +43,12 @@ class EnginesTestCase(unittest.TestCase):
def test_replicated_merge_tree(self):
engine = MergeTree('date', ('date', 'event_id', 'event_group'), replica_table_path='/clickhouse/tables/{layer}-{shard}/hits', replica_name='{replica}')
# In ClickHouse 1.1.54310 custom partitioning key was introduced and new syntax is used
if self.database.server_version >= (1, 1, 54310):
expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}') PARTITION BY (toYYYYMM(`date`)) ORDER BY (date, event_id, event_group) SETTINGS index_granularity=8192"
else:
expected = "ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', date, (date, event_id, event_group), 8192)"
self.assertEquals(engine.create_table_sql(), expected)
self.assertEquals(engine.create_table_sql(self.database), expected)
def test_collapsing_merge_tree(self):
class TestModel(SampleModel):
@ -124,6 +130,17 @@ class EnginesTestCase(unittest.TestCase):
'event_uversion': 2
}, res[1].to_dict(include_readonly=True))
def test_custom_partitioning(self):
class TestModel(SampleModel):
engine = MergeTree(
order_by=('date', 'event_id', 'event_group'),
partition_key=('toYYYYMM(date)', 'event_group')
)
self._create_and_insert(TestModel)
parts = list(SystemPart.get(self.database))
self.assertEqual(1, len(parts))
self.assertEqual('(201701, 13)', parts[0].partition)
class SampleModel(Model):


@ -3,6 +3,7 @@ import unittest
import datetime
import pytz
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model
from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.engines import *
@ -19,9 +20,10 @@ class InheritanceTestCase(unittest.TestCase):
self.assertFieldNames(Model2, ['date_field', 'int_field', 'float_field'])
def test_create_table_sql(self):
sql1 = ParentModel.create_table_sql('default')
sql2 = Model1.create_table_sql('default')
sql3 = Model2.create_table_sql('default')
default_db = Database('default')
sql1 = ParentModel.create_table_sql(default_db)
sql2 = Model1.create_table_sql(default_db)
sql3 = Model2.create_table_sql(default_db)
self.assertNotEqual(sql1, sql2)
self.assertNotEqual(sql1, sql3)
self.assertNotEqual(sql2, sql3)


@ -1,7 +1,10 @@
from __future__ import unicode_literals
import unittest
from datetime import date
import os
from infi.clickhouse_orm.database import Database, DatabaseException
from infi.clickhouse_orm.engines import *
from infi.clickhouse_orm.fields import *
@ -37,7 +40,9 @@ class SystemPartTest(unittest.TestCase):
def setUp(self):
self.database = Database('test-db')
self.database.create_table(TestTable)
self.database.create_table(CustomPartitionedTable)
self.database.insert([TestTable(date_field=date.today())])
self.database.insert([CustomPartitionedTable(date_field=date.today(), group_field=13)])
def tearDown(self):
self.database.drop_database()
@ -51,40 +56,46 @@ class SystemPartTest(unittest.TestCase):
def test_get_all(self):
parts = SystemPart.get(self.database)
self.assertEqual(len(list(parts)), 1)
self.assertEqual(len(list(parts)), 2)
def test_get_active(self):
parts = list(SystemPart.get_active(self.database))
self.assertEqual(len(parts), 1)
self.assertEqual(len(parts), 2)
parts[0].detach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
self.assertEqual(len(list(SystemPart.get_active(self.database))), 1)
def test_get_conditions(self):
parts = list(SystemPart.get(self.database, conditions="table='testtable'"))
self.assertEqual(len(parts), 1)
parts = list(SystemPart.get(self.database, conditions=u"table='othertable'"))
parts = list(SystemPart.get(self.database, conditions=u"table='custompartitionedtable'"))
self.assertEqual(len(parts), 1)
parts = list(SystemPart.get(self.database, conditions=u"table='invalidtable'"))
self.assertEqual(len(parts), 0)
def test_attach_detach(self):
parts = list(SystemPart.get_active(self.database))
self.assertEqual(len(parts), 1)
parts[0].detach()
self.assertEqual(len(parts), 2)
for p in parts:
p.detach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
parts[0].attach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 1)
for p in parts:
p.attach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 2)
def test_drop(self):
parts = list(SystemPart.get_active(self.database))
parts[0].drop()
for p in parts:
p.drop()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
def test_freeze(self):
parts = list(SystemPart.get(self.database))
# There can be other backups in the folder
prev_backups = set(self._get_backups())
parts[0].freeze()
for p in parts:
p.freeze()
backups = set(self._get_backups())
self.assertEqual(len(backups), len(prev_backups) + 1)
self.assertEqual(len(backups), len(prev_backups) + 2)
def test_fetch(self):
# TODO Not tested, as I have no replication set up
@ -97,5 +108,12 @@ class TestTable(Model):
engine = MergeTree('date_field', ('date_field',))
class CustomPartitionedTable(Model):
date_field = DateField()
group_field = UInt32Field()
engine = MergeTree(order_by=('date_field', 'group_field'), partition_key=('toYYYYMM(date_field)', 'group_field'))
class SystemTestModel(Model):
system = True