diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 59d97c0..b1210f7 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,6 +8,10 @@ Change Log - Pagination: passing -1 as the page number now returns the last page - Accept datetime values for date fields (Zloool) - Support readonly mode in Database class (tswr) +- Added support for the Buffer table engine (emakarov) +- Added the SystemPart readonly model, which provides operations on partitions (M1ha) +- Added Model.to_dict() that converts a model instance to a dictionary (M1ha) +- Added Database.raw() to perform arbitrary queries (M1ha) v0.7.1 ------ diff --git a/README.rst b/README.rst index 4842360..9db0ab1 100644 --- a/README.rst +++ b/README.rst @@ -330,7 +330,8 @@ You can create array fields containing any data type, for example:: Working with materialized and alias fields ****************************************** -ClickHouse provides an opportunity to create MATERIALIZED and ALIAS fields. +ClickHouse provides an opportunity to create MATERIALIZED and ALIAS Fields. + See documentation `here `_. Both field types can't be inserted into the database directly, so they are ignored when using the ``Database.insert()`` method. @@ -379,6 +380,30 @@ For a ``SummingMergeTree`` you can optionally specify the summing columns:: engine = engines.SummingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), summing_cols=('Shows', 'Clicks', 'Cost')) +A ``Buffer`` engine is available for BufferModels. (See below how to use BufferModel). You can specify following parameters:: + + engine = engines.Buffer(Person) # you need to initialize engine with main Model. Other default parameters will be used + # or: + engine = engines.Buffer(Person, num_layers=16, min_time=10, + max_time=100, min_rows=10000, max_rows=1000000, + min_bytes=10000000, max_bytes=100000000) + +Buffer Models +------------- +Here's how you can define Model for Buffer Engine. The Buffer Model should be inherited from models.BufferModel and main Model:: + + class PersonBuffer(models.BufferModel, Person): + + engine = engines.Buffer(Person) + +Then you can insert objects into Buffer model and they will be handled by Clickhouse properly:: + + db.create_table(PersonBuffer) + suzy = PersonBuffer(first_name='Suzy', last_name='Jones') + dan = PersonBuffer(first_name='Dan', last_name='Schwartz') + db.insert([dan, suzy]) + + Data Replication **************** @@ -400,6 +425,7 @@ After cloning the project, run the following commands:: To run the tests, ensure that the ClickHouse server is running on http://localhost:8123/ (this is the default), and run:: bin/nosetests +======= To see test coverage information run:: diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index 3f4870b..7c9a94c 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -62,3 +62,30 @@ class SummingMergeTree(MergeTree): params.append('(%s)' % ', '.join(self.summing_cols)) return params + +class Buffer(Engine): + """Here we define Buffer engine + Read more here https://clickhouse.yandex/reference_en.html#Buffer + """ + + #Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) + def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + self.main_model = main_model + self.num_layers = num_layers + self.min_time = min_time + self.max_time = max_time + self.min_rows = min_rows + self.max_rows = max_rows + self.min_bytes = min_bytes + self.max_bytes = max_bytes + + + def create_table_sql(self, db_name): + # Overriden create_table_sql example: + #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' + sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % ( + db_name, self.main_model.table_name(), self.num_layers, + self.min_time, self.max_time, self.min_rows, + self.max_rows, self.min_bytes, self.max_bytes + ) + return sql diff --git a/src/infi/clickhouse_orm/fields.py b/src/infi/clickhouse_orm/fields.py index 1113dd6..3fa49ec 100644 --- a/src/infi/clickhouse_orm/fields.py +++ b/src/infi/clickhouse_orm/fields.py @@ -99,7 +99,7 @@ class DateField(Field): def to_python(self, value, timezone_in_use): if isinstance(value, datetime.datetime): - return value.date() + return value.astimezone(pytz.utc).date() if value.tzinfo else value.date() if isinstance(value, datetime.date): return value if isinstance(value, int): diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 6b82244..a6163f7 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -202,3 +202,16 @@ class Model(with_metaclass(ModelBase)): data = self.__dict__ return {name: data[name] for name, field in fields} + + +class BufferModel(Model): + + @classmethod + def create_table_sql(cls, db_name): + ''' + Returns the SQL command for creating a table for this model. + ''' + parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.engine.main_model.table_name())] + engine_str = cls.engine.create_table_sql(db_name) + parts.append(engine_str) + return ' '.join(parts) diff --git a/tests/test_database.py b/tests/test_database.py index aa7a4df..62b340b 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -3,7 +3,7 @@ import unittest from infi.clickhouse_orm.database import Database, DatabaseException -from infi.clickhouse_orm.models import Model +from infi.clickhouse_orm.models import Model, BufferModel from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.engines import * @@ -16,8 +16,10 @@ class DatabaseTestCase(unittest.TestCase): def setUp(self): self.database = Database('test-db') self.database.create_table(Person) + self.database.create_table(PersonBuffer) def tearDown(self): + self.database.drop_table(PersonBuffer) self.database.drop_table(Person) self.database.drop_database() @@ -27,6 +29,10 @@ class DatabaseTestCase(unittest.TestCase): for instance in data: self.assertEquals(self.database, instance.get_database()) + def _insert_and_check_buffer(self, data, count): + self.database.insert(data) + self.assertEquals(count, self.database.count(PersonBuffer)) + def test_insert__generator(self): self._insert_and_check(self._sample_data(), len(data)) @@ -137,6 +143,9 @@ class DatabaseTestCase(unittest.TestCase): self.database.drop_database() self.database = orig_database + def test_insert_buffer(self): + self._insert_and_check_buffer(self._sample_buffer_data(), len(data)) + def _sample_data(self): for entry in data: yield Person(**entry) @@ -160,6 +169,11 @@ class DatabaseTestCase(unittest.TestCase): with self.assertRaises(DatabaseException): self.database.drop_table(ReadOnlyModel) + def _sample_buffer_data(self): + for entry in data: + yield PersonBuffer(**entry) + + class Person(Model): @@ -177,6 +191,11 @@ class ReadOnlyModel(Model): name = StringField() +class PersonBuffer(BufferModel, Person): + + engine = Buffer(Person) + + data = [ {"first_name": "Abdul", "last_name": "Hester", "birthday": "1970-12-02", "height": "1.63"}, diff --git a/tests/test_simple_fields.py b/tests/test_simple_fields.py index c955574..2d1f2ab 100644 --- a/tests/test_simple_fields.py +++ b/tests/test_simple_fields.py @@ -38,6 +38,27 @@ class SimpleFieldsTest(unittest.TestCase): with self.assertRaises(ValueError): f.to_python(value, pytz.utc) + def test_date_field(self): + f = DateField() + epoch = date(1970, 1, 1) + # Valid values + for value in (datetime(1970, 1, 1), epoch, '1970-01-01', '0000-00-00', 0): + d = f.to_python(value, pytz.utc) + self.assertEquals(d, epoch) + # Verify that conversion to and from db string does not change value + d2 = f.to_python(f.to_db_string(d, quote=False), pytz.utc) + self.assertEquals(d, d2) + # Invalid values + for value in ('nope', '21/7/1999', 0.5): + with self.assertRaises(ValueError): + f.to_python(value, pytz.utc) + + def test_date_field_timezone(self): + # Verify that conversion of timezone-aware datetime is correct + f = DateField() + dt = datetime(2017, 10, 5, tzinfo=pytz.timezone('Asia/Jerusalem')) + self.assertEquals(f.to_python(dt, pytz.utc), date(2017, 10, 4)) + def test_uint8_field(self): f = UInt8Field() # Valid values diff --git a/tests/test_system_models.py b/tests/test_system_models.py index 756e5cd..544713b 100644 --- a/tests/test_system_models.py +++ b/tests/test_system_models.py @@ -22,6 +22,8 @@ class SystemPartTest(unittest.TestCase): self.database.drop_database() def _get_backups(self): + if not os.path.exists(self.BACKUP_DIR): + return [] _, dirnames, _ = next(os.walk(self.BACKUP_DIR)) return dirnames