From c9697de56c05313e0d164c4c51a9d8b60833905a Mon Sep 17 00:00:00 2001 From: emakarov Date: Tue, 7 Feb 2017 19:19:50 +0300 Subject: [PATCH 1/6] Buffer engine initial commit --- src/infi/clickhouse_orm/engines.py | 27 +++++++++++++++++++++++++++ src/infi/clickhouse_orm/models.py | 15 +++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index 3f4870b..d07b2ed 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -62,3 +62,30 @@ class SummingMergeTree(MergeTree): params.append('(%s)' % ', '.join(self.summing_cols)) return params + +class Buffer(Engine): + """Here we define Buffer engine + Read more here https://clickhouse.yandex/reference_en.html#Buffer + """ + + #Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) + def __init__(self, table, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + self.table = table + self.num_layers = num_layers + self.min_time = min_time + self.max_time = max_time + self.min_rows = min_rows + self.max_rows = max_rows + self.min_bytes = min_bytes + self.max_bytes = max_bytes + + + def create_table_sql(self, db_name, target): + # Overriden create_table_sql example: + #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' + sql = 'ENGINE = Buffer(%s, %s, %d, %d, %d, %d, %d, %d, %d)' % ( + db_name, target.table_name(), self.num_layers, + self.min_time, self.max_time, self.min_rows, + self.max_rows, self.min_bytes, self.max_bytes + ) + return sql diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 6fae876..da79073 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -156,3 +156,18 @@ class Model(with_metaclass(ModelBase)): ''' data = self.__dict__ return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in self._fields) + + +class BufferModel(Model): + + main_model = None # table's Model should be defined in implementation. It's a table where data will be flushed + + @classmethod + def create_table_sql(cls, db_name): + ''' + Returns the SQL command for creating a table for this model. + ''' + parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.main_model.table_name())] + engine_str = cls.engine.create_table_sql(db_name, cls) + parts.append(engine_str) + return ' '.join(parts) From d19787cb9fa83a82d92fbf6bbe7dc3e1917e8a03 Mon Sep 17 00:00:00 2001 From: emakarov Date: Tue, 7 Feb 2017 20:41:26 +0300 Subject: [PATCH 2/6] Fix for create_table_sql for Buffer table --- src/infi/clickhouse_orm/engines.py | 4 ++-- src/infi/clickhouse_orm/models.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index d07b2ed..aab4c39 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -80,11 +80,11 @@ class Buffer(Engine): self.max_bytes = max_bytes - def create_table_sql(self, db_name, target): + def create_table_sql(self, db_name, main_model): # Overriden create_table_sql example: #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' sql = 'ENGINE = Buffer(%s, %s, %d, %d, %d, %d, %d, %d, %d)' % ( - db_name, target.table_name(), self.num_layers, + db_name, main_model.table_name(), self.num_layers, self.min_time, self.max_time, self.min_rows, self.max_rows, self.min_bytes, self.max_bytes ) diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index da79073..e7cfcb9 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -168,6 +168,6 @@ class BufferModel(Model): Returns the SQL command for creating a table for this model. ''' parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.main_model.table_name())] - engine_str = cls.engine.create_table_sql(db_name, cls) + engine_str = cls.engine.create_table_sql(db_name, cls.main_model) parts.append(engine_str) return ' '.join(parts) From 427088d87fd71bf5236bda051b3fe86cdc936416 Mon Sep 17 00:00:00 2001 From: emakarov Date: Tue, 7 Feb 2017 20:54:39 +0300 Subject: [PATCH 3/6] README extended --- README.rst | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index fec606b..58e011c 100644 --- a/README.rst +++ b/README.rst @@ -271,6 +271,30 @@ For a ``SummingMergeTree`` you can optionally specify the summing columns:: engine = engines.SummingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), summing_cols=('Shows', 'Clicks', 'Cost')) +A ``Buffer`` engine is available for BufferModels. (See below how to use BufferModel). You can specify following parameters:: + + engine = engines.Buffer(Person) # you need to initialize engine with main Model. Other default parameters will be used + # or: + engine = engines.Buffer(Person, table, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + +Buffer Models +------------- +Here's how you can define Model for Buffer Engine. The Buffer Model should be inherited from models.BufferModel and main model +Main model also should be specified in class:: + + class PersonBuffer(models.BufferModel, Person): + + main_model = Person + engine = engines.Buffer(Person) + +Then you can insert objects into Buffer model and they will be handled by Clickhouse properly:: + + db.create_table(PersonBuffer) + suzy = PersonBuffer(first_name='Suzy', last_name='Jones') + dan = PersonBuffer(first_name='Dan', last_name='Schwartz') + db.insert([dan, suzy]) + + Data Replication **************** @@ -291,4 +315,4 @@ After cloning the project, run the following commands:: To run the tests, ensure that the ClickHouse server is running on http://localhost:8123/ (this is the default), and run:: - bin/nosetests \ No newline at end of file + bin/nosetests From e599b6b309312aa3694f5a9113a580fba617bc70 Mon Sep 17 00:00:00 2001 From: emakarov Date: Tue, 7 Feb 2017 20:56:13 +0300 Subject: [PATCH 4/6] README extended --- README.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 58e011c..fbd1965 100644 --- a/README.rst +++ b/README.rst @@ -275,7 +275,9 @@ A ``Buffer`` engine is available for BufferModels. (See below how to use BufferM engine = engines.Buffer(Person) # you need to initialize engine with main Model. Other default parameters will be used # or: - engine = engines.Buffer(Person, table, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + engine = engines.Buffer(Person, table, num_layers=16, min_time=10, + max_time=100, min_rows=10000, max_rows=1000000, + min_bytes=10000000, max_bytes=100000000) Buffer Models ------------- From 86a3fec14386473a2fa83dbaebbaf5b248dcfd44 Mon Sep 17 00:00:00 2001 From: emakarov Date: Tue, 7 Feb 2017 21:32:51 +0300 Subject: [PATCH 5/6] cleaner code for Buffer engine and Buffer Model class --- README.rst | 6 ++---- src/infi/clickhouse_orm/engines.py | 8 ++++---- src/infi/clickhouse_orm/models.py | 7 ++----- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/README.rst b/README.rst index 43b9c91..9486e54 100644 --- a/README.rst +++ b/README.rst @@ -331,18 +331,16 @@ A ``Buffer`` engine is available for BufferModels. (See below how to use BufferM engine = engines.Buffer(Person) # you need to initialize engine with main Model. Other default parameters will be used # or: - engine = engines.Buffer(Person, table, num_layers=16, min_time=10, + engine = engines.Buffer(Person, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000) Buffer Models ------------- -Here's how you can define Model for Buffer Engine. The Buffer Model should be inherited from models.BufferModel and main model -Main model also should be specified in class:: +Here's how you can define Model for Buffer Engine. The Buffer Model should be inherited from models.BufferModel and main Model:: class PersonBuffer(models.BufferModel, Person): - main_model = Person engine = engines.Buffer(Person) Then you can insert objects into Buffer model and they will be handled by Clickhouse properly:: diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index aab4c39..47373e5 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -69,8 +69,8 @@ class Buffer(Engine): """ #Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) - def __init__(self, table, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): - self.table = table + def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + self.main_model = main_model self.num_layers = num_layers self.min_time = min_time self.max_time = max_time @@ -80,11 +80,11 @@ class Buffer(Engine): self.max_bytes = max_bytes - def create_table_sql(self, db_name, main_model): + def create_table_sql(self, db_name): # Overriden create_table_sql example: #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' sql = 'ENGINE = Buffer(%s, %s, %d, %d, %d, %d, %d, %d, %d)' % ( - db_name, main_model.table_name(), self.num_layers, + db_name, self.main_model.table_name(), self.num_layers, self.min_time, self.max_time, self.min_rows, self.max_rows, self.min_bytes, self.max_bytes ) diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index bd9a71c..c71689e 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -166,15 +166,12 @@ class Model(with_metaclass(ModelBase)): class BufferModel(Model): - main_model = None # table's Model should be defined in implementation. It's a table where data will be flushed - @classmethod def create_table_sql(cls, db_name): ''' Returns the SQL command for creating a table for this model. ''' - parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.main_model.table_name())] - engine_str = cls.engine.create_table_sql(db_name, cls.main_model) + parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.engine.main_model.table_name())] + engine_str = cls.engine.create_table_sql(db_name) parts.append(engine_str) return ' '.join(parts) - From 77b33c0ed4b3147b0c7e68aeb9e7694735b87004 Mon Sep 17 00:00:00 2001 From: emakarov Date: Wed, 8 Feb 2017 23:21:48 +0300 Subject: [PATCH 6/6] test added. engine sql query fix to support special characters --- src/infi/clickhouse_orm/engines.py | 2 +- tests/test_database.py | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index 47373e5..7c9a94c 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -83,7 +83,7 @@ class Buffer(Engine): def create_table_sql(self, db_name): # Overriden create_table_sql example: #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' - sql = 'ENGINE = Buffer(%s, %s, %d, %d, %d, %d, %d, %d, %d)' % ( + sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % ( db_name, self.main_model.table_name(), self.num_layers, self.min_time, self.max_time, self.min_rows, self.max_rows, self.min_bytes, self.max_bytes diff --git a/tests/test_database.py b/tests/test_database.py index 1e62472..973b15b 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -3,7 +3,7 @@ import unittest from infi.clickhouse_orm.database import Database, DatabaseException -from infi.clickhouse_orm.models import Model +from infi.clickhouse_orm.models import Model, BufferModel from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.engines import * @@ -16,8 +16,10 @@ class DatabaseTestCase(unittest.TestCase): def setUp(self): self.database = Database('test-db') self.database.create_table(Person) + self.database.create_table(PersonBuffer) def tearDown(self): + self.database.drop_table(PersonBuffer) self.database.drop_table(Person) self.database.drop_database() @@ -25,6 +27,10 @@ class DatabaseTestCase(unittest.TestCase): self.database.insert(data) self.assertEquals(count, self.database.count(Person)) + def _insert_and_check_buffer(self, data, count): + self.database.insert(data) + self.assertEquals(count, self.database.count(PersonBuffer)) + def test_insert__generator(self): self._insert_and_check(self._sample_data(), len(data)) @@ -129,10 +135,17 @@ class DatabaseTestCase(unittest.TestCase): self.database.drop_database() self.database = orig_database + def test_insert_buffer(self): + self._insert_and_check_buffer(self._sample_buffer_data(), len(data)) + def _sample_data(self): for entry in data: yield Person(**entry) + def _sample_buffer_data(self): + for entry in data: + yield PersonBuffer(**entry) + class Person(Model): @@ -144,6 +157,11 @@ class Person(Model): engine = MergeTree('birthday', ('first_name', 'last_name', 'birthday')) +class PersonBuffer(BufferModel, Person): + + engine = Buffer(Person) + + data = [ {"first_name": "Abdul", "last_name": "Hester", "birthday": "1970-12-02", "height": "1.63"}, {"first_name": "Adam", "last_name": "Goodman", "birthday": "1986-01-07", "height": "1.74"},