diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b1210f7..878d8a3 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,8 +1,14 @@ Change Log ========== -[Unreleased] ------------- +Unreleased +---------- +- Add support for ReplacingMergeTree (leenr) +- Fix problem with SELECT WITH TOTALS (pilosus) +- Update serialization format of DateTimeField to 10 digits, zero padded (nikepan) + +v0.8.0 +------ - Always keep datetime fields in UTC internally, and convert server timezone to UTC when parsing query results - Support for ALIAS and MATERIALIZED fields (M1ha) - Pagination: passing -1 as the page number now returns the last page diff --git a/README.rst b/README.rst index 9db0ab1..6c2c6d9 100644 --- a/README.rst +++ b/README.rst @@ -380,6 +380,10 @@ For a ``SummingMergeTree`` you can optionally specify the summing columns:: engine = engines.SummingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), summing_cols=('Shows', 'Clicks', 'Cost')) +For a ``ReplacingMergeTree`` you can optionally specify the version column:: + + engine = engines.ReplacingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), ver_col='Version') + A ``Buffer`` engine is available for BufferModels. (See below how to use BufferModel). You can specify following parameters:: engine = engines.Buffer(Person) # you need to initialize engine with main Model. Other default parameters will be used @@ -425,7 +429,6 @@ After cloning the project, run the following commands:: To run the tests, ensure that the ClickHouse server is running on http://localhost:8123/ (this is the default), and run:: bin/nosetests -======= To see test coverage information run:: diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py index 2be8534..3afd1c3 100644 --- a/src/infi/clickhouse_orm/database.py +++ b/src/infi/clickhouse_orm/database.py @@ -96,7 +96,9 @@ class Database(object): field_types = parse_tsv(next(lines)) model_class = model_class or ModelBase.create_ad_hoc_model(zip(field_names, field_types)) for line in lines: - yield model_class.from_tsv(line, field_names, self.server_timezone, self) + # skip blank line left by WITH TOTALS modifier + if line: + yield model_class.from_tsv(line, field_names, self.server_timezone, self) def raw(self, query, settings=None, stream=False): """ diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index 7c9a94c..c26b451 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -9,6 +9,7 @@ class MergeTree(Engine): def __init__(self, date_col, key_cols, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None): + assert type(key_cols) in (list, tuple), 'key_cols must be a list or tuple' self.date_col = date_col self.key_cols = key_cols self.sampling_expr = sampling_expr @@ -54,6 +55,7 @@ class SummingMergeTree(MergeTree): def __init__(self, date_col, key_cols, summing_cols=None, sampling_expr=None, index_granularity=8192, replica_table_path=None, replica_name=None): super(SummingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name) + assert type is None or type(summing_cols) in (list, tuple), 'summing_cols must be a list or tuple' self.summing_cols = summing_cols def _build_sql_params(self): @@ -63,6 +65,20 @@ class SummingMergeTree(MergeTree): return params +class ReplacingMergeTree(MergeTree): + + def __init__(self, date_col, key_cols, ver_col=None, sampling_expr=None, + index_granularity=8192, replica_table_path=None, replica_name=None): + super(ReplacingMergeTree, self).__init__(date_col, key_cols, sampling_expr, index_granularity, replica_table_path, replica_name) + self.ver_col = ver_col + + def _build_sql_params(self): + params = super(ReplacingMergeTree, self)._build_sql_params() + if self.ver_col: + params.append(self.ver_col) + return params + + class Buffer(Engine): """Here we define Buffer engine Read more here https://clickhouse.yandex/reference_en.html#Buffer diff --git a/src/infi/clickhouse_orm/fields.py b/src/infi/clickhouse_orm/fields.py index 3fa49ec..915ebee 100644 --- a/src/infi/clickhouse_orm/fields.py +++ b/src/infi/clickhouse_orm/fields.py @@ -132,12 +132,18 @@ class DateTimeField(Field): if isinstance(value, string_types): if value == '0000-00-00 00:00:00': return self.class_default + if len(value) == 10: + try: + value = int(value) + return datetime.datetime.utcfromtimestamp(value).replace(tzinfo=pytz.utc) + except ValueError: + pass dt = datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S') return timezone_in_use.localize(dt).astimezone(pytz.utc) raise ValueError('Invalid value for %s - %r' % (self.__class__.__name__, value)) def to_db_string(self, value, quote=True): - return escape(timegm(value.utctimetuple()), quote) + return escape('%010d' % timegm(value.utctimetuple()), quote) class BaseIntField(Field): diff --git a/src/infi/clickhouse_orm/utils.py b/src/infi/clickhouse_orm/utils.py index c24a93d..f5b5b22 100644 --- a/src/infi/clickhouse_orm/utils.py +++ b/src/infi/clickhouse_orm/utils.py @@ -38,7 +38,7 @@ def unescape(value): def parse_tsv(line): if PY3 and isinstance(line, binary_type): line = line.decode() - if line[-1] == '\n': + if line and line[-1] == '\n': line = line[:-1] return [unescape(value) for value in line.split('\t')] diff --git a/tests/test_database.py b/tests/test_database.py index 62b340b..1897d8f 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -89,6 +89,15 @@ class DatabaseTestCase(unittest.TestCase): self.assertEqual(results[0].get_database(), self.database) self.assertEqual(results[1].get_database(), self.database) + def test_select_with_totals(self): + self._insert_and_check(self._sample_data(), len(data)) + query = "SELECT last_name, sum(height) as height FROM `test-db`.person GROUP BY last_name WITH TOTALS" + results = list(self.database.select(query)) + total = sum(r.height for r in results[:-1]) + # Last line has an empty last name, and total of all heights + self.assertFalse(results[-1].last_name) + self.assertEquals(total, results[-1].height) + def test_pagination(self): self._insert_and_check(self._sample_data(), len(data)) # Try different page sizes diff --git a/tests/test_engines.py b/tests/test_engines.py new file mode 100644 index 0000000..d3d8865 --- /dev/null +++ b/tests/test_engines.py @@ -0,0 +1,64 @@ +import unittest + +from infi.clickhouse_orm.database import Database, DatabaseException +from infi.clickhouse_orm.models import Model +from infi.clickhouse_orm.fields import * +from infi.clickhouse_orm.engines import * + +import logging +logging.getLogger("requests").setLevel(logging.WARNING) + + +class EnginesTestCase(unittest.TestCase): + + def setUp(self): + self.database = Database('test-db') + + def tearDown(self): + self.database.drop_database() + + def _create_and_insert(self, model_class): + self.database.create_table(model_class) + self.database.insert([ + model_class(date='2017-01-01', event_id=23423, event_group=13, event_count=7, event_version=1) + ]) + + def test_merge_tree(self): + class TestModel(SampleModel): + engine = MergeTree('date', ('date', 'event_id', 'event_group')) + self._create_and_insert(TestModel) + + def test_merge_tree_with_sampling(self): + class TestModel(SampleModel): + engine = MergeTree('date', ('date', 'event_id', 'event_group'), sampling_expr='intHash32(event_id)') + self._create_and_insert(TestModel) + + def test_merge_tree_with_granularity(self): + class TestModel(SampleModel): + engine = MergeTree('date', ('date', 'event_id', 'event_group'), index_granularity=4096) + self._create_and_insert(TestModel) + + def test_collapsing_merge_tree(self): + class TestModel(SampleModel): + engine = CollapsingMergeTree('date', ('date', 'event_id', 'event_group'), 'event_version') + self._create_and_insert(TestModel) + + def test_summing_merge_tree(self): + class TestModel(SampleModel): + engine = SummingMergeTree('date', ('date', 'event_group'), ('event_count',)) + self._create_and_insert(TestModel) + + def test_replacing_merge_tree(self): + class TestModel(SampleModel): + engine = ReplacingMergeTree('date', ('date', 'event_id', 'event_group'), 'event_uversion') + self._create_and_insert(TestModel) + + +class SampleModel(Model): + + date = DateField() + event_id = UInt32Field() + event_group = UInt32Field() + event_count = UInt16Field() + event_version = Int8Field() + event_uversion = UInt8Field(materialized='abs(event_version)') diff --git a/tests/test_simple_fields.py b/tests/test_simple_fields.py index 2d1f2ab..645d9ed 100644 --- a/tests/test_simple_fields.py +++ b/tests/test_simple_fields.py @@ -6,32 +6,17 @@ import pytz class SimpleFieldsTest(unittest.TestCase): - def test_date_field(self): - f = DateField() - # Valid values - for value in (date(1970, 1, 1), datetime(1970, 1, 1), '1970-01-01', '0000-00-00', 0): - self.assertEquals(f.to_python(value, pytz.utc), date(1970, 1, 1)) - # Invalid values - for value in ('nope', '21/7/1999', 0.5): - with self.assertRaises(ValueError): - f.to_python(value, pytz.utc) - # Range check - for value in (date(1900, 1, 1), date(2900, 1, 1)): - with self.assertRaises(ValueError): - f.validate(value) - def test_datetime_field(self): f = DateTimeField() epoch = datetime(1970, 1, 1, tzinfo=pytz.utc) # Valid values for value in (date(1970, 1, 1), datetime(1970, 1, 1), epoch, epoch.astimezone(pytz.timezone('US/Eastern')), epoch.astimezone(pytz.timezone('Asia/Jerusalem')), - '1970-01-01 00:00:00', '0000-00-00 00:00:00', 0): + '1970-01-01 00:00:00', '1970-01-17 00:00:17', '0000-00-00 00:00:00', 0): dt = f.to_python(value, pytz.utc) self.assertEquals(dt.tzinfo, pytz.utc) - self.assertEquals(dt, epoch) # Verify that conversion to and from db string does not change value - dt2 = f.to_python(int(f.to_db_string(dt)), pytz.utc) + dt2 = f.to_python(f.to_db_string(dt, quote=False), pytz.utc) self.assertEquals(dt, dt2) # Invalid values for value in ('nope', '21/7/1999', 0.5): @@ -52,6 +37,10 @@ class SimpleFieldsTest(unittest.TestCase): for value in ('nope', '21/7/1999', 0.5): with self.assertRaises(ValueError): f.to_python(value, pytz.utc) + # Range check + for value in (date(1900, 1, 1), date(2900, 1, 1)): + with self.assertRaises(ValueError): + f.validate(value) def test_date_field_timezone(self): # Verify that conversion of timezone-aware datetime is correct