RAMEN-208 Support codec compression for clickhouse

This commit is contained in:
Roy Belio 2019-06-20 11:21:43 +03:00
parent 40725372fe
commit 4d2ebd65fb
8 changed files with 222 additions and 27 deletions

View File

@ -148,6 +148,45 @@ to `None`.
NOTE: `ArrayField` of `NullableField` is not supported. Also `EnumField` cannot be nullable. NOTE: `ArrayField` of `NullableField` is not supported. Also `EnumField` cannot be nullable.
Working with field compression codecs
-------------------------------------
Besides default data compression, defined in server settings, per-field specification is also available.
Supported compression algorithms:
| Codec | Argument | Comment
| -------------------- | -------------------------------------------| ----------------------------------------------------
| NONE | None | No compression.
| LZ4 | None | LZ4 compression.
| LZ4HC(`level`)       | Possible `level` range: [3, 12].           | Default value: 9. Greater values stand for better compression and higher CPU usage. Recommended value range: [4, 9].
| ZSTD(`level`)        | Possible `level` range: [1, 22].           | Default value: 1. Greater values stand for better compression and higher CPU usage. Levels >= 20 should be used with caution, as they require more memory.
| Delta(`delta_bytes`) | Possible `delta_bytes` values: 1, 2, 4, 8. | Default value for `delta_bytes` is `sizeof(type)` if it is equal to 1, 2, 4 or 8, and 1 otherwise.
Codecs can be combined in a pipeline. The default table codec is not included in the pipeline (if it should be applied to a field, you have to specify it explicitly in the pipeline).
Recommended usage for codecs:
- Usually, values for a particular metric stored in `path` do not differ significantly from point to point. Using delta encoding can reduce disk space usage significantly.
- DateTime works great with a pipeline of Delta and ZSTD; the column can be compressed to 2-3% of its original size (given smooth datetime data)
- Numeric types usually enjoy best compression rates with ZSTD
- String types enjoy good compression rates with LZ4HC
Usage:
```python
class Stats(models.Model):
id = fields.UInt64Field(codec='ZSTD(10)')
timestamp = fields.DateTimeField(codec='Delta,ZSTD')
timestamp_date = fields.DateField(codec='Delta(4),ZSTD(22)')
metadata_id = fields.Int64Field(codec='LZ4')
status = fields.StringField(codec='LZ4HC(10)')
calculation = fields.NullableField(fields.Float32Field(), codec='ZSTD')
alerts = fields.ArrayField(fields.FixedStringField(length=15), codec='Delta(2),LZ4HC')
engine = MergeTree('timestamp_date', ('id', 'timestamp'))
```
:exclamation:**_This feature is supported on ClickHouse version 19.1.16 and above. Codec arguments will be ignored by the ORM for ClickHouse versions lower than 19.1.16._**
Creating custom field types Creating custom field types
--------------------------- ---------------------------
Sometimes it is convenient to use data types that are supported in Python, but have no corresponding column type in ClickHouse. In these cases it is possible to define a custom field class that knows how to convert the Pythonic object to a suitable representation in the database, and vice versa. Sometimes it is convenient to use data types that are supported in Python, but have no corresponding column type in ClickHouse. In these cases it is possible to define a custom field class that knows how to convert the Pythonic object to a suitable representation in the database, and vice versa.

View File

@ -120,6 +120,8 @@ class Database(object):
self.server_version = self._get_server_version() self.server_version = self._get_server_version()
# Versions 1.1.53981 and below don't have timezone function # Versions 1.1.53981 and below don't have timezone function
self.server_timezone = self._get_server_timezone() if self.server_version > (1, 1, 53981) else pytz.utc self.server_timezone = self._get_server_timezone() if self.server_version > (1, 1, 53981) else pytz.utc
# Versions below 19.1.16 don't support codec compression
self.has_codec_support = self.server_version >= (19, 1, 16)
def create_database(self): def create_database(self):
''' '''

View File

@ -19,7 +19,7 @@ class Field(object):
class_default = 0 class_default = 0
db_type = None db_type = None
def __init__(self, default=None, alias=None, materialized=None, readonly=None): def __init__(self, default=None, alias=None, materialized=None, readonly=None, codec=None):
assert (None, None) in {(default, alias), (alias, materialized), (default, materialized)}, \ assert (None, None) in {(default, alias), (alias, materialized), (default, materialized)}, \
"Only one of default, alias and materialized parameters can be given" "Only one of default, alias and materialized parameters can be given"
assert alias is None or isinstance(alias, string_types) and alias != "",\ assert alias is None or isinstance(alias, string_types) and alias != "",\
@ -27,6 +27,8 @@ class Field(object):
assert materialized is None or isinstance(materialized, string_types) and alias != "",\ assert materialized is None or isinstance(materialized, string_types) and alias != "",\
"Materialized field must be string, if given" "Materialized field must be string, if given"
assert readonly is None or type(readonly) is bool, "readonly parameter must be bool if given" assert readonly is None or type(readonly) is bool, "readonly parameter must be bool if given"
assert codec is None or isinstance(codec, string_types) and codec != "", \
"Codec field must be string, if given"
self.creation_counter = Field.creation_counter self.creation_counter = Field.creation_counter
Field.creation_counter += 1 Field.creation_counter += 1
@ -34,6 +36,7 @@ class Field(object):
self.alias = alias self.alias = alias
self.materialized = materialized self.materialized = materialized
self.readonly = bool(self.alias or self.materialized or readonly) self.readonly = bool(self.alias or self.materialized or readonly)
self.codec = codec
def to_python(self, value, timezone_in_use): def to_python(self, value, timezone_in_use):
''' '''
@ -64,22 +67,25 @@ class Field(object):
''' '''
return escape(value, quote) return escape(value, quote)
def get_sql(self, with_default_expression=True): def get_sql(self, with_default_expression=True, db=None):
''' '''
Returns an SQL expression describing the field (e.g. for CREATE TABLE). Returns an SQL expression describing the field (e.g. for CREATE TABLE).
:param with_default_expression: If True, adds default value to sql. :param with_default_expression: If True, adds default value to sql.
It doesn't affect fields with alias and materialized values. It doesn't affect fields with alias and materialized values.
:param db: Database, used for checking supported features.
''' '''
sql = self.db_type
if with_default_expression: if with_default_expression:
if self.alias: if self.alias:
return '%s ALIAS %s' % (self.db_type, self.alias) sql += ' ALIAS %s' % self.alias
elif self.materialized: elif self.materialized:
return '%s MATERIALIZED %s' % (self.db_type, self.materialized) sql += ' MATERIALIZED %s' % self.materialized
else: else:
default = self.to_db_string(self.default) default = self.to_db_string(self.default)
return '%s DEFAULT %s' % (self.db_type, default) sql += ' DEFAULT %s' % default
else: if self.codec and db and db.has_codec_support:
return self.db_type sql+= ' CODEC(%s)' % self.codec
return sql
def isinstance(self, types): def isinstance(self, types):
""" """
@ -361,11 +367,11 @@ class BaseEnumField(Field):
Abstract base class for all enum-type fields. Abstract base class for all enum-type fields.
''' '''
def __init__(self, enum_cls, default=None, alias=None, materialized=None, readonly=None): def __init__(self, enum_cls, default=None, alias=None, materialized=None, readonly=None, codec=None):
self.enum_cls = enum_cls self.enum_cls = enum_cls
if default is None: if default is None:
default = list(enum_cls)[0] default = list(enum_cls)[0]
super(BaseEnumField, self).__init__(default, alias, materialized, readonly) super(BaseEnumField, self).__init__(default, alias, materialized, readonly, codec)
def to_python(self, value, timezone_in_use): def to_python(self, value, timezone_in_use):
if isinstance(value, self.enum_cls): if isinstance(value, self.enum_cls):
@ -384,12 +390,14 @@ class BaseEnumField(Field):
def to_db_string(self, value, quote=True): def to_db_string(self, value, quote=True):
return escape(value.name, quote) return escape(value.name, quote)
def get_sql(self, with_default_expression=True): def get_sql(self, with_default_expression=True, db=None):
values = ['%s = %d' % (escape(item.name), item.value) for item in self.enum_cls] values = ['%s = %d' % (escape(item.name), item.value) for item in self.enum_cls]
sql = '%s(%s)' % (self.db_type, ' ,'.join(values)) sql = '%s(%s)' % (self.db_type, ' ,'.join(values))
if with_default_expression: if with_default_expression:
default = self.to_db_string(self.default) default = self.to_db_string(self.default)
sql = '%s DEFAULT %s' % (sql, default) sql = '%s DEFAULT %s' % (sql, default)
if self.codec and db and db.has_codec_support:
sql+= ' CODEC(%s)' % self.codec
return sql return sql
@classmethod @classmethod
@ -425,11 +433,11 @@ class ArrayField(Field):
class_default = [] class_default = []
def __init__(self, inner_field, default=None, alias=None, materialized=None, readonly=None): def __init__(self, inner_field, default=None, alias=None, materialized=None, readonly=None, codec=None):
assert isinstance(inner_field, Field), "The first argument of ArrayField must be a Field instance" assert isinstance(inner_field, Field), "The first argument of ArrayField must be a Field instance"
assert not isinstance(inner_field, ArrayField), "Multidimensional array fields are not supported by the ORM" assert not isinstance(inner_field, ArrayField), "Multidimensional array fields are not supported by the ORM"
self.inner_field = inner_field self.inner_field = inner_field
super(ArrayField, self).__init__(default, alias, materialized, readonly) super(ArrayField, self).__init__(default, alias, materialized, readonly, codec)
def to_python(self, value, timezone_in_use): def to_python(self, value, timezone_in_use):
if isinstance(value, text_type): if isinstance(value, text_type):
@ -448,9 +456,11 @@ class ArrayField(Field):
array = [self.inner_field.to_db_string(v, quote=True) for v in value] array = [self.inner_field.to_db_string(v, quote=True) for v in value]
return '[' + comma_join(array) + ']' return '[' + comma_join(array) + ']'
def get_sql(self, with_default_expression=True): def get_sql(self, with_default_expression=True, db=None):
from .utils import escape sql = 'Array(%s)' % self.inner_field.get_sql(with_default_expression=False)
return 'Array(%s)' % self.inner_field.get_sql(with_default_expression=False) if self.codec and db and db.has_codec_support:
sql+= ' CODEC(%s)' % self.codec
return sql
class UUIDField(Field): class UUIDField(Field):
@ -481,12 +491,12 @@ class NullableField(Field):
class_default = None class_default = None
def __init__(self, inner_field, default=None, alias=None, materialized=None, def __init__(self, inner_field, default=None, alias=None, materialized=None,
extra_null_values=None): extra_null_values=None, codec=None):
self.inner_field = inner_field self.inner_field = inner_field
self._null_values = [None] self._null_values = [None]
if extra_null_values: if extra_null_values:
self._null_values.extend(extra_null_values) self._null_values.extend(extra_null_values)
super(NullableField, self).__init__(default, alias, materialized, readonly=None) super(NullableField, self).__init__(default, alias, materialized, readonly=None, codec=codec)
def to_python(self, value, timezone_in_use): def to_python(self, value, timezone_in_use):
if value == '\\N' or value in self._null_values: if value == '\\N' or value in self._null_values:
@ -501,14 +511,16 @@ class NullableField(Field):
return '\\N' return '\\N'
return self.inner_field.to_db_string(value, quote=quote) return self.inner_field.to_db_string(value, quote=quote)
def get_sql(self, with_default_expression=True): def get_sql(self, with_default_expression=True, db=None):
s = 'Nullable(%s)' % self.inner_field.get_sql(with_default_expression=False) sql = 'Nullable(%s)' % self.inner_field.get_sql(with_default_expression=False)
if with_default_expression: if with_default_expression:
if self.alias: if self.alias:
s = '%s ALIAS %s' % (s, self.alias) sql += ' ALIAS %s' % self.alias
elif self.materialized: elif self.materialized:
s = '%s MATERIALIZED %s' % (s, self.materialized) sql += ' MATERIALIZED %s' % self.materialized
elif self.default: elif self.default:
default = self.to_db_string(self.default) default = self.to_db_string(self.default)
s = '%s DEFAULT %s' % (s, default) sql += ' DEFAULT %s' % default
return s if self.codec and db and db.has_codec_support:
sql+= ' CODEC(%s)' % self.codec
return sql

View File

@ -79,7 +79,7 @@ class AlterTable(Operation):
if name not in table_fields: if name not in table_fields:
logger.info(' Add column %s', name) logger.info(' Add column %s', name)
assert prev_name, 'Cannot add a column to the beginning of the table' assert prev_name, 'Cannot add a column to the beginning of the table'
cmd = 'ADD COLUMN %s %s' % (name, field.get_sql()) cmd = 'ADD COLUMN %s %s' % (name, field.get_sql(db=database))
if is_regular_field: if is_regular_field:
cmd += ' AFTER %s' % prev_name cmd += ' AFTER %s' % prev_name
self._alter_table(database, cmd) self._alter_table(database, cmd)
@ -93,7 +93,7 @@ class AlterTable(Operation):
# The order of class attributes can be changed any time, so we can't count on it # The order of class attributes can be changed any time, so we can't count on it
# Secondly, MATERIALIZED and ALIAS fields are always at the end of the DESC, so we can't expect them to save # Secondly, MATERIALIZED and ALIAS fields are always at the end of the DESC, so we can't expect them to save
# attribute position. Watch https://github.com/Infinidat/infi.clickhouse_orm/issues/47 # attribute position. Watch https://github.com/Infinidat/infi.clickhouse_orm/issues/47
model_fields = {name: field.get_sql(with_default_expression=False) model_fields = {name: field.get_sql(with_default_expression=False, db=database)
for name, field in iteritems(self.model_class.fields())} for name, field in iteritems(self.model_class.fields())}
for field_name, field_sql in self._get_table_fields(database): for field_name, field_sql in self._get_table_fields(database):
# All fields must have been created and dropped by this moment # All fields must have been created and dropped by this moment

View File

@ -190,7 +190,7 @@ class Model(with_metaclass(ModelBase)):
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())] parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())]
cols = [] cols = []
for name, field in iteritems(cls.fields()): for name, field in iteritems(cls.fields()):
cols.append(' %s %s' % (name, field.get_sql())) cols.append(' %s %s' % (name, field.get_sql(db=db)))
parts.append(',\n'.join(cols)) parts.append(',\n'.join(cols))
parts.append(')') parts.append(')')
parts.append('ENGINE = ' + cls.engine.create_table_sql(db)) parts.append('ENGINE = ' + cls.engine.create_table_sql(db))
@ -316,7 +316,7 @@ class MergeModel(Model):
cols = [] cols = []
for name, field in iteritems(cls.fields()): for name, field in iteritems(cls.fields()):
if name != '_table': if name != '_table':
cols.append(' %s %s' % (name, field.get_sql())) cols.append(' %s %s' % (name, field.get_sql(db=db)))
parts.append(',\n'.join(cols)) parts.append(',\n'.join(cols))
parts.append(')') parts.append(')')
parts.append('ENGINE = ' + cls.engine.create_table_sql(db)) parts.append('ENGINE = ' + cls.engine.create_table_sql(db))

View File

@ -0,0 +1,6 @@
from infi.clickhouse_orm import migrations
from ..test_migrations import *
# Single migration step: alter the table (and its buffer table) to match Model4_compressed.
operations = [migrations.AlterTableWithBuffer(Model4_compressed)]

View File

@ -0,0 +1,123 @@
from __future__ import unicode_literals
import unittest
import datetime
import pytz
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model
from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.engines import *
from infi.clickhouse_orm.utils import parse_tsv
class CompressedFieldsTestCase(unittest.TestCase):
    '''
    Tests for a model whose fields declare compression codecs.

    Each test runs against a freshly created `CompressedModel` table in the
    'test-db' database, which is dropped again in tearDown.
    '''

    def setUp(self):
        self.database = Database('test-db', log_statements=True)
        self.database.create_table(CompressedModel)

    def tearDown(self):
        self.database.drop_database()

    def test_defaults(self):
        # Check that all fields have their explicit or implicit defaults
        instance = CompressedModel()
        self.database.insert([instance])
        self.assertEqual(instance.date_field, datetime.date(1970, 1, 1))
        self.assertEqual(instance.datetime_field, datetime.datetime(1970, 1, 1, tzinfo=pytz.utc))
        self.assertEqual(instance.string_field, 'dozo')
        self.assertEqual(instance.int64_field, 42)
        self.assertEqual(instance.float_field, 0)
        self.assertEqual(instance.nullable_field, None)
        self.assertEqual(instance.array_field, [])

    def test_assignment(self):
        # Check that all fields are assigned during construction
        kwargs = dict(
            uint64_field=217,
            date_field=datetime.date(1973, 12, 6),
            datetime_field=datetime.datetime(2000, 5, 24, 10, 22, tzinfo=pytz.utc),
            string_field='aloha',
            int64_field=-50,
            float_field=3.14,
            nullable_field=-2.718281,
            array_field=['123456789123456', '', 'a']
        )
        instance = CompressedModel(**kwargs)
        self.database.insert([instance])
        for name, value in kwargs.items():
            self.assertEqual(value, getattr(instance, name))

    def test_string_conversion(self):
        # Check field conversion from string during construction
        instance = CompressedModel(date_field='1973-12-06', int64_field='100', float_field='7',
                                   nullable_field=None, array_field='[a,b,c]')
        self.assertEqual(instance.date_field, datetime.date(1973, 12, 6))
        self.assertEqual(instance.int64_field, 100)
        self.assertEqual(instance.float_field, 7)
        self.assertEqual(instance.nullable_field, None)
        self.assertEqual(instance.array_field, ['a', 'b', 'c'])
        # Check field conversion from string during assignment
        instance.int64_field = '99'
        self.assertEqual(instance.int64_field, 99)

    def test_to_dict(self):
        instance = CompressedModel(date_field='1973-12-06', int64_field='100', float_field='7',
                                   array_field='[a,b,c]')
        # Readonly fields (here: the alias field) are included by default
        self.assertDictEqual(instance.to_dict(), {
            "date_field": datetime.date(1973, 12, 6),
            "int64_field": 100,
            "float_field": 7.0,
            "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
            "alias_field": 0.0,
            'string_field': 'dozo',
            'nullable_field': None,
            'uint64_field': 0,
            'array_field': ['a', 'b', 'c']
        })
        self.assertDictEqual(instance.to_dict(include_readonly=False), {
            "date_field": datetime.date(1973, 12, 6),
            "int64_field": 100,
            "float_field": 7.0,
            "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
            'string_field': 'dozo',
            'nullable_field': None,
            'uint64_field': 0,
            'array_field': ['a', 'b', 'c']
        })
        # An explicitly requested readonly field is still dropped when include_readonly=False
        self.assertDictEqual(
            instance.to_dict(include_readonly=False, field_names=('int64_field', 'alias_field', 'datetime_field')), {
                "int64_field": 100,
                "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc)
            })

    def test_confirm_compression_codec(self):
        # The ORM omits CODEC clauses on servers without codec support, so the
        # expected codecs below can only be observed on ClickHouse >= 19.1.16.
        if not self.database.has_codec_support:
            self.skipTest('Compression codecs require ClickHouse version 19.1.16 or above')
        instance = CompressedModel(date_field='1973-12-06', int64_field='100', float_field='7',
                                   array_field='[a,b,c]')
        self.database.insert([instance])
        query = ("select name, compression_codec from system.columns "
                 "where table = '{}' and database='{}' "
                 "FORMAT TabSeparatedWithNamesAndTypes").format(instance.table_name(), self.database.db_name)
        lines = self.database.raw(query).splitlines()
        # The first two lines of this format hold the column names and types; data rows follow
        data = [tuple(parse_tsv(line)) for line in lines[2:]]
        self.assertListEqual(data, [('uint64_field', 'CODEC(ZSTD(10))'),
                                    ('datetime_field', 'CODEC(Delta(4), ZSTD(1))'),
                                    ('date_field', 'CODEC(Delta(4), ZSTD(22))'),
                                    ('int64_field', 'CODEC(LZ4)'),
                                    ('string_field', 'CODEC(LZ4HC(10))'),
                                    ('nullable_field', 'CODEC(ZSTD(1))'),
                                    ('array_field', 'CODEC(Delta(2), LZ4HC(0))'),
                                    ('float_field', 'CODEC(NONE)'),
                                    ('alias_field', 'CODEC(ZSTD(4))')])
class CompressedModel(Model):
    '''
    Model used by CompressedFieldsTestCase; every field declares a codec.
    '''

    # Declaration order matters: test_confirm_compression_codec expects the
    # columns to be reported in exactly this order.
    uint64_field = UInt64Field(codec='ZSTD(10)')
    datetime_field = DateTimeField(codec='Delta,ZSTD')
    date_field = DateField(codec='Delta(4),ZSTD(22)')
    int64_field = Int64Field(default=42, codec='LZ4')
    string_field = StringField(default='dozo', codec='LZ4HC(10)')
    nullable_field = NullableField(Float32Field(), codec='ZSTD')
    array_field = ArrayField(FixedStringField(length=15), codec='Delta(2),LZ4HC')
    float_field = Float32Field(codec='NONE')
    # Alias of float_field; readonly, so to_dict(include_readonly=False) omits it
    alias_field = Float32Field(alias='float_field', codec='ZSTD(4)')

    engine = MergeTree('datetime_field', ('uint64_field', 'datetime_field'))

View File

@ -258,3 +258,16 @@ class Model4Buffer_changed(BufferModel, Model4_changed):
@classmethod @classmethod
def table_name(cls): def table_name(cls):
return 'model4buffer' return 'model4buffer'
class Model4_compressed(Model):
    '''
    Variant of the 'model4' table in which every column declares a compression codec.
    '''

    date = DateField(codec='Delta(4),ZSTD')
    f3 = DateTimeField(codec='Delta,ZSTD(10)')
    f2 = StringField(codec='LZ4HC')

    engine = MergeTree('date', ('date',))

    @classmethod
    def table_name(cls):
        # Fixed table name instead of one derived from the class name
        return 'model4'