From c9697de56c05313e0d164c4c51a9d8b60833905a Mon Sep 17 00:00:00 2001 From: emakarov Date: Tue, 7 Feb 2017 19:19:50 +0300 Subject: [PATCH 01/21] Buffer engine initial commit --- src/infi/clickhouse_orm/engines.py | 27 +++++++++++++++++++++++++++ src/infi/clickhouse_orm/models.py | 15 +++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index 3f4870b..d07b2ed 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -62,3 +62,30 @@ class SummingMergeTree(MergeTree): params.append('(%s)' % ', '.join(self.summing_cols)) return params + +class Buffer(Engine): + """Here we define Buffer engine + Read more here https://clickhouse.yandex/reference_en.html#Buffer + """ + + #Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) + def __init__(self, table, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + self.table = table + self.num_layers = num_layers + self.min_time = min_time + self.max_time = max_time + self.min_rows = min_rows + self.max_rows = max_rows + self.min_bytes = min_bytes + self.max_bytes = max_bytes + + + def create_table_sql(self, db_name, target): + # Overriden create_table_sql example: + #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' + sql = 'ENGINE = Buffer(%s, %s, %d, %d, %d, %d, %d, %d, %d)' % ( + db_name, target.table_name(), self.num_layers, + self.min_time, self.max_time, self.min_rows, + self.max_rows, self.min_bytes, self.max_bytes + ) + return sql diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 6fae876..da79073 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -156,3 +156,18 @@ class Model(with_metaclass(ModelBase)): ''' data = self.__dict__ return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in self._fields) + + +class BufferModel(Model): + + main_model = None # table's Model should be defined in implementation. It's a table where data will be flushed + + @classmethod + def create_table_sql(cls, db_name): + ''' + Returns the SQL command for creating a table for this model. 
+ ''' + parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.main_model.table_name())] + engine_str = cls.engine.create_table_sql(db_name, cls) + parts.append(engine_str) + return ' '.join(parts) From d19787cb9fa83a82d92fbf6bbe7dc3e1917e8a03 Mon Sep 17 00:00:00 2001 From: emakarov Date: Tue, 7 Feb 2017 20:41:26 +0300 Subject: [PATCH 02/21] Fix for create_table_sql for Buffer table --- src/infi/clickhouse_orm/engines.py | 4 ++-- src/infi/clickhouse_orm/models.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index d07b2ed..aab4c39 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -80,11 +80,11 @@ class Buffer(Engine): self.max_bytes = max_bytes - def create_table_sql(self, db_name, target): + def create_table_sql(self, db_name, main_model): # Overriden create_table_sql example: #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' sql = 'ENGINE = Buffer(%s, %s, %d, %d, %d, %d, %d, %d, %d)' % ( - db_name, target.table_name(), self.num_layers, + db_name, main_model.table_name(), self.num_layers, self.min_time, self.max_time, self.min_rows, self.max_rows, self.min_bytes, self.max_bytes ) diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index da79073..e7cfcb9 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -168,6 +168,6 @@ class BufferModel(Model): Returns the SQL command for creating a table for this model. ''' parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.main_model.table_name())] - engine_str = cls.engine.create_table_sql(db_name, cls) + engine_str = cls.engine.create_table_sql(db_name, cls.main_model) parts.append(engine_str) return ' '.join(parts) From 427088d87fd71bf5236bda051b3fe86cdc936416 Mon Sep 17 00:00:00 2001 From: emakarov Date: Tue, 7 Feb 2017 20:54:39 +0300 Subject: [PATCH 03/21] README extended --- README.rst | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index fec606b..58e011c 100644 --- a/README.rst +++ b/README.rst @@ -271,6 +271,30 @@ For a ``SummingMergeTree`` you can optionally specify the summing columns:: engine = engines.SummingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'), summing_cols=('Shows', 'Clicks', 'Cost')) +A ``Buffer`` engine is available for BufferModels. (See below how to use BufferModel). You can specify following parameters:: + + engine = engines.Buffer(Person) # you need to initialize engine with main Model. Other default parameters will be used + # or: + engine = engines.Buffer(Person, table, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + +Buffer Models +------------- +Here's how you can define Model for Buffer Engine. 
The Buffer Model should be inherited from models.BufferModel and main model +Main model also should be specified in class:: + + class PersonBuffer(models.BufferModel, Person): + + main_model = Person + engine = engines.Buffer(Person) + +Then you can insert objects into Buffer model and they will be handled by Clickhouse properly:: + + db.create_table(PersonBuffer) + suzy = PersonBuffer(first_name='Suzy', last_name='Jones') + dan = PersonBuffer(first_name='Dan', last_name='Schwartz') + db.insert([dan, suzy]) + + Data Replication **************** @@ -291,4 +315,4 @@ After cloning the project, run the following commands:: To run the tests, ensure that the ClickHouse server is running on http://localhost:8123/ (this is the default), and run:: - bin/nosetests \ No newline at end of file + bin/nosetests From e599b6b309312aa3694f5a9113a580fba617bc70 Mon Sep 17 00:00:00 2001 From: emakarov Date: Tue, 7 Feb 2017 20:56:13 +0300 Subject: [PATCH 04/21] README extended --- README.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 58e011c..fbd1965 100644 --- a/README.rst +++ b/README.rst @@ -275,7 +275,9 @@ A ``Buffer`` engine is available for BufferModels. (See below how to use BufferM engine = engines.Buffer(Person) # you need to initialize engine with main Model. Other default parameters will be used # or: - engine = engines.Buffer(Person, table, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + engine = engines.Buffer(Person, table, num_layers=16, min_time=10, + max_time=100, min_rows=10000, max_rows=1000000, + min_bytes=10000000, max_bytes=100000000) Buffer Models ------------- From 86a3fec14386473a2fa83dbaebbaf5b248dcfd44 Mon Sep 17 00:00:00 2001 From: emakarov Date: Tue, 7 Feb 2017 21:32:51 +0300 Subject: [PATCH 05/21] cleaner code for Buffer engine and Buffer Model class --- README.rst | 6 ++---- src/infi/clickhouse_orm/engines.py | 8 ++++---- src/infi/clickhouse_orm/models.py | 7 ++----- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/README.rst b/README.rst index 43b9c91..9486e54 100644 --- a/README.rst +++ b/README.rst @@ -331,18 +331,16 @@ A ``Buffer`` engine is available for BufferModels. (See below how to use BufferM engine = engines.Buffer(Person) # you need to initialize engine with main Model. Other default parameters will be used # or: - engine = engines.Buffer(Person, table, num_layers=16, min_time=10, + engine = engines.Buffer(Person, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000) Buffer Models ------------- -Here's how you can define Model for Buffer Engine. The Buffer Model should be inherited from models.BufferModel and main model -Main model also should be specified in class:: +Here's how you can define Model for Buffer Engine. 
The Buffer Model should be inherited from models.BufferModel and main Model:: class PersonBuffer(models.BufferModel, Person): - main_model = Person engine = engines.Buffer(Person) Then you can insert objects into Buffer model and they will be handled by Clickhouse properly:: diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index aab4c39..47373e5 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -69,8 +69,8 @@ class Buffer(Engine): """ #Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes) - def __init__(self, table, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): - self.table = table + def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000): + self.main_model = main_model self.num_layers = num_layers self.min_time = min_time self.max_time = max_time @@ -80,11 +80,11 @@ class Buffer(Engine): self.max_bytes = max_bytes - def create_table_sql(self, db_name, main_model): + def create_table_sql(self, db_name): # Overriden create_table_sql example: #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' sql = 'ENGINE = Buffer(%s, %s, %d, %d, %d, %d, %d, %d, %d)' % ( - db_name, main_model.table_name(), self.num_layers, + db_name, self.main_model.table_name(), self.num_layers, self.min_time, self.max_time, self.min_rows, self.max_rows, self.min_bytes, self.max_bytes ) diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index bd9a71c..c71689e 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -166,15 +166,12 @@ class Model(with_metaclass(ModelBase)): class BufferModel(Model): - main_model = None # table's Model should be defined in implementation. It's a table where data will be flushed - @classmethod def create_table_sql(cls, db_name): ''' Returns the SQL command for creating a table for this model. 
''' - parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.main_model.table_name())] - engine_str = cls.engine.create_table_sql(db_name, cls.main_model) + parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.engine.main_model.table_name())] + engine_str = cls.engine.create_table_sql(db_name) parts.append(engine_str) return ' '.join(parts) - From dfd4d09e7012a4b61fb96085e096124b89f6eae1 Mon Sep 17 00:00:00 2001 From: Itai Shirav Date: Wed, 8 Feb 2017 15:19:39 +0200 Subject: [PATCH 06/21] DateField.to_python() - handle timezone-aware datetime values correctly --- src/infi/clickhouse_orm/fields.py | 2 +- tests/test_simple_fields.py | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/infi/clickhouse_orm/fields.py b/src/infi/clickhouse_orm/fields.py index e4115e8..46fb0c5 100644 --- a/src/infi/clickhouse_orm/fields.py +++ b/src/infi/clickhouse_orm/fields.py @@ -96,7 +96,7 @@ class DateField(Field): def to_python(self, value, timezone_in_use): if isinstance(value, datetime.datetime): - return value.date() + return value.astimezone(pytz.utc).date() if value.tzinfo else value.date() if isinstance(value, datetime.date): return value if isinstance(value, int): diff --git a/tests/test_simple_fields.py b/tests/test_simple_fields.py index c955574..2d1f2ab 100644 --- a/tests/test_simple_fields.py +++ b/tests/test_simple_fields.py @@ -38,6 +38,27 @@ class SimpleFieldsTest(unittest.TestCase): with self.assertRaises(ValueError): f.to_python(value, pytz.utc) + def test_date_field(self): + f = DateField() + epoch = date(1970, 1, 1) + # Valid values + for value in (datetime(1970, 1, 1), epoch, '1970-01-01', '0000-00-00', 0): + d = f.to_python(value, pytz.utc) + self.assertEquals(d, epoch) + # Verify that conversion to and from db string does not change value + d2 = f.to_python(f.to_db_string(d, quote=False), pytz.utc) + self.assertEquals(d, d2) + # Invalid values + for value in ('nope', '21/7/1999', 0.5): + with self.assertRaises(ValueError): + f.to_python(value, pytz.utc) + + def test_date_field_timezone(self): + # Verify that conversion of timezone-aware datetime is correct + f = DateField() + dt = datetime(2017, 10, 5, tzinfo=pytz.timezone('Asia/Jerusalem')) + self.assertEquals(f.to_python(dt, pytz.utc), date(2017, 10, 4)) + def test_uint8_field(self): f = UInt8Field() # Valid values From 77b33c0ed4b3147b0c7e68aeb9e7694735b87004 Mon Sep 17 00:00:00 2001 From: emakarov Date: Wed, 8 Feb 2017 23:21:48 +0300 Subject: [PATCH 07/21] test added. 
engine sql query fix to support special characters --- src/infi/clickhouse_orm/engines.py | 2 +- tests/test_database.py | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/infi/clickhouse_orm/engines.py b/src/infi/clickhouse_orm/engines.py index 47373e5..7c9a94c 100644 --- a/src/infi/clickhouse_orm/engines.py +++ b/src/infi/clickhouse_orm/engines.py @@ -83,7 +83,7 @@ class Buffer(Engine): def create_table_sql(self, db_name): # Overriden create_table_sql example: #sql = 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)' - sql = 'ENGINE = Buffer(%s, %s, %d, %d, %d, %d, %d, %d, %d)' % ( + sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % ( db_name, self.main_model.table_name(), self.num_layers, self.min_time, self.max_time, self.min_rows, self.max_rows, self.min_bytes, self.max_bytes diff --git a/tests/test_database.py b/tests/test_database.py index 1e62472..973b15b 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -3,7 +3,7 @@ import unittest from infi.clickhouse_orm.database import Database, DatabaseException -from infi.clickhouse_orm.models import Model +from infi.clickhouse_orm.models import Model, BufferModel from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.engines import * @@ -16,8 +16,10 @@ class DatabaseTestCase(unittest.TestCase): def setUp(self): self.database = Database('test-db') self.database.create_table(Person) + self.database.create_table(PersonBuffer) def tearDown(self): + self.database.drop_table(PersonBuffer) self.database.drop_table(Person) self.database.drop_database() @@ -25,6 +27,10 @@ class DatabaseTestCase(unittest.TestCase): self.database.insert(data) self.assertEquals(count, self.database.count(Person)) + def _insert_and_check_buffer(self, data, count): + self.database.insert(data) + self.assertEquals(count, self.database.count(PersonBuffer)) + def test_insert__generator(self): self._insert_and_check(self._sample_data(), len(data)) @@ -129,10 +135,17 @@ class DatabaseTestCase(unittest.TestCase): self.database.drop_database() self.database = orig_database + def test_insert_buffer(self): + self._insert_and_check_buffer(self._sample_buffer_data(), len(data)) + def _sample_data(self): for entry in data: yield Person(**entry) + def _sample_buffer_data(self): + for entry in data: + yield PersonBuffer(**entry) + class Person(Model): @@ -144,6 +157,11 @@ class Person(Model): engine = MergeTree('birthday', ('first_name', 'last_name', 'birthday')) +class PersonBuffer(BufferModel, Person): + + engine = Buffer(Person) + + data = [ {"first_name": "Abdul", "last_name": "Hester", "birthday": "1970-12-02", "height": "1.63"}, {"first_name": "Adam", "last_name": "Goodman", "birthday": "1986-01-07", "height": "1.74"}, From db61efd4ccf1c2037b8ebcc13a3a30651f5efc38 Mon Sep 17 00:00:00 2001 From: M1ha Date: Tue, 31 Jan 2017 12:43:11 +0500 Subject: [PATCH 08/21] 1) Added readonly models 2) Added SystemPart models in order to execute partition operations --- src/infi/clickhouse_orm/database.py | 19 ++++ src/infi/clickhouse_orm/models.py | 10 +- src/infi/clickhouse_orm/system_models.py | 137 +++++++++++++++++++++++ tests/test_database.py | 26 +++++ tests/test_system_models.py | 69 ++++++++++++ 5 files changed, 256 insertions(+), 5 deletions(-) create mode 100644 src/infi/clickhouse_orm/system_models.py create mode 100644 tests/test_system_models.py diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py index 43ca36e..48a7c91 100644 --- 
a/src/infi/clickhouse_orm/database.py +++ b/src/infi/clickhouse_orm/database.py @@ -39,9 +39,13 @@ class Database(object): def create_table(self, model_class): # TODO check that model has an engine + if model_class.readonly: + raise DatabaseException("You can't create read only table") self._send(model_class.create_table_sql(self.db_name)) def drop_table(self, model_class): + if model_class.readonly: + raise DatabaseException("You can't drop read only table") self._send(model_class.drop_table_sql(self.db_name)) def insert(self, model_instances, batch_size=1000): @@ -52,6 +56,10 @@ class Database(object): except StopIteration: return # model_instances is empty model_class = first_instance.__class__ + + if first_instance.readonly: + raise DatabaseException("You can't insert into read only table") + def gen(): yield self._substitute('INSERT INTO $table FORMAT TabSeparated\n', model_class).encode('utf-8') yield (first_instance.to_tsv(insertable_only=True) + '\n').encode('utf-8') @@ -88,6 +96,17 @@ class Database(object): for line in lines: yield model_class.from_tsv(line, field_names, self.server_timezone) + def raw(self, query, settings=None, stream=False): + """ + Performs raw query to database. Returns its output + :param query: Query to execute + :param settings: Query settings to send as query GET parameters + :param stream: If flag is true, Http response from ClickHouse will be streamed. + :return: Query execution result + """ + query = self._substitute(query, None) + return self._send(query, settings=settings, stream=stream).text + def paginate(self, model_class, order_by, page_num=1, page_size=100, conditions=None, settings=None): count = self.count(model_class, conditions) pages_total = int(ceil(count / float(page_size))) diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 444e32e..8cd7c97 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -1,11 +1,11 @@ -from .utils import escape, parse_tsv -from .engines import * -from .fields import Field +from logging import getLogger from six import with_metaclass import pytz -from logging import getLogger +from .fields import Field +from .utils import parse_tsv + logger = getLogger('clickhouse_orm') @@ -69,6 +69,7 @@ class Model(with_metaclass(ModelBase)): ''' engine = None + readonly = False def __init__(self, **kwargs): ''' @@ -163,4 +164,3 @@ class Model(with_metaclass(ModelBase)): fields = [f for f in fields if f[1].is_insertable()] return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in fields) - diff --git a/src/infi/clickhouse_orm/system_models.py b/src/infi/clickhouse_orm/system_models.py new file mode 100644 index 0000000..25ea0e0 --- /dev/null +++ b/src/infi/clickhouse_orm/system_models.py @@ -0,0 +1,137 @@ +""" +This file contains system readonly models that can be got from database +https://clickhouse.yandex/reference_en.html#System tables +""" +from .database import Database # Can't import it globally, due to circular import +from .fields import * +from .models import Model +from .engines import MergeTree + + +class SystemPart(Model): + """ + Contains information about parts of a table in the MergeTree family. + This model operates only fields, described in the reference. Other fields are ignored. 
+ https://clickhouse.yandex/reference_en.html#system.parts + """ + OPERATIONS = frozenset({'DETACH', 'DROP', 'ATTACH', 'FREEZE', 'FETCH'}) + + readonly = True + + database = StringField() # Name of the database where the table that this part belongs to is located. + table = StringField() # Name of the table that this part belongs to. + engine = StringField() # Name of the table engine, without parameters. + partition = StringField() # Name of the partition, in the format YYYYMM. + name = StringField() # Name of the part. + replicated = UInt8Field() # Whether the part belongs to replicated data. + + # Whether the part is used in a table, or is no longer needed and will be deleted soon. + # Inactive parts remain after merging. + active = UInt8Field() + + # Number of marks - multiply by the index granularity (usually 8192) + # to get the approximate number of rows in the part. + marks = UInt64Field() + + bytes = UInt64Field() # Number of bytes when compressed. + + # Time the directory with the part was modified. Usually corresponds to the part's creation time. + modification_time = DateTimeField() + remove_time = DateTimeField() # For inactive parts only - the time when the part became inactive. + + # The number of places where the part is used. A value greater than 2 indicates + # that this part participates in queries or merges. + refcount = UInt32Field() + + @classmethod + def table_name(cls): + return 'system.parts' + + """ + Next methods return SQL for some operations, which can be done with partitions + https://clickhouse.yandex/reference_en.html#Manipulations with partitions and parts + """ + def _partition_operation_sql(self, db, operation, settings=None, from_part=None): + """ + Performs some operation over partition + :param db: Database object to execute operation on + :param operation: Operation to execute from SystemPart.OPERATIONS set + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: Operation execution result + """ + operation = operation.upper() + assert operation in self.OPERATIONS, "operation must be in [%s]" % ', '.join(self.OPERATIONS) + sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (db.db_name, self.table, operation, self.partition) + if from_part is not None: + sql += " FROM %s" % from_part + db.raw(sql, settings=settings, stream=False) + + def detach(self, database, settings=None): + """ + Move a partition to the 'detached' directory and forget it. + :param database: Database object to execute operation on + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: SQL Query + """ + return self._partition_operation_sql(database, 'DETACH', settings=settings) + + def drop(self, database, settings=None): + """ + Delete a partition + :param database: Database object to execute operation on + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: SQL Query + """ + return self._partition_operation_sql(database, 'DROP', settings=settings) + + def attach(self, database, settings=None): + """ + Add a new part or partition from the 'detached' directory to the table. + :param database: Database object to execute operation on + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: SQL Query + """ + return self._partition_operation_sql(database, 'ATTACH', settings=settings) + + def freeze(self, database, settings=None): + """ + Create a backup of a partition. 
+ :param database: Database object to execute operation on + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: SQL Query + """ + return self._partition_operation_sql(database, 'FREEZE', settings=settings) + + def fetch(self, database, zookeeper_path, settings=None): + """ + Download a partition from another server. + :param database: Database object to execute operation on + :param zookeeper_path: Path in zookeeper to fetch from + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: SQL Query + """ + return self._partition_operation_sql(database, 'FETCH', settings=settings, from_part=zookeeper_path) + + @classmethod + def get_active(cls, database): + """ + Get all active parts + :param database: A database object to fetch data from. + :return: A list of SystemPart objects + """ + assert isinstance(database, Database), "database must be database.Database class instance" + field_names = ','.join([f[0] for f in cls._fields]) + return database.select("SELECT %s FROM %s WHERE active AND database='%s'" % + (field_names, cls.table_name(), database.db_name), model_class=cls) + + @classmethod + def all(cls, database): + """ + Gets all data from system.parts database + :param database: + :return: + """ + assert isinstance(database, Database), "database must be database.Database class instance" + field_names = ','.join([f[0] for f in cls._fields]) + return database.select("SELECT %s FROM %s WHERE database='%s'" % + (field_names, cls.table_name(), database.db_name), model_class=cls) diff --git a/tests/test_database.py b/tests/test_database.py index 1e62472..a3ad15c 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -133,6 +133,25 @@ class DatabaseTestCase(unittest.TestCase): for entry in data: yield Person(**entry) + def test_raw(self): + self._insert_and_check(self._sample_data(), len(data)) + query = "SELECT * FROM `test-db`.person WHERE first_name = 'Whitney' ORDER BY last_name" + results = self.database.raw(query) + self.assertEqual(results, "Whitney\tDurham\t1977-09-15\t1.72\nWhitney\tScott\t1971-07-04\t1.7\n") + + def test_insert_readonly(self): + m = ReadOnlyModel(name='readonly') + with self.assertRaises(DatabaseException): + self.database.insert([m]) + + def test_create_readonly_table(self): + with self.assertRaises(DatabaseException): + self.database.create_table(ReadOnlyModel) + + def test_drop_readonly_table(self): + with self.assertRaises(DatabaseException): + self.database.drop_table(ReadOnlyModel) + class Person(Model): @@ -144,6 +163,13 @@ class Person(Model): engine = MergeTree('birthday', ('first_name', 'last_name', 'birthday')) +class ReadOnlyModel(Model): + readonly = True + + name = StringField() + + + data = [ {"first_name": "Abdul", "last_name": "Hester", "birthday": "1970-12-02", "height": "1.63"}, {"first_name": "Adam", "last_name": "Goodman", "birthday": "1986-01-07", "height": "1.74"}, diff --git a/tests/test_system_models.py b/tests/test_system_models.py new file mode 100644 index 0000000..db97e5d --- /dev/null +++ b/tests/test_system_models.py @@ -0,0 +1,69 @@ +import unittest +from datetime import date + +import os +import shutil +from infi.clickhouse_orm.database import Database +from infi.clickhouse_orm.engines import * +from infi.clickhouse_orm.fields import * +from infi.clickhouse_orm.models import Model +from infi.clickhouse_orm.system_models import SystemPart + + +class SystemPartTest(unittest.TestCase): + BACKUP_DIR = '/opt/clickhouse/shadow/' + + def setUp(self): 
+ self.database = Database('test-db') + self.database.create_table(TestTable) + self.database.insert([TestTable(date_field=date.today())]) + + def tearDown(self): + self.database.drop_database() + + def _get_backups_count(self): + _, dirnames, _ = next(os.walk(self.BACKUP_DIR)) + return len(dirnames) + + def test_get_all(self): + parts = SystemPart.all(self.database) + self.assertEqual(len(list(parts)), 1) + + def test_get_active(self): + parts = list(SystemPart.get_active(self.database)) + self.assertEqual(len(parts), 1) + parts[0].detach(self.database) + self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) + + def test_attach_detach(self): + parts = list(SystemPart.get_active(self.database)) + self.assertEqual(len(parts), 1) + parts[0].detach(self.database) + self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) + parts[0].attach(self.database) + self.assertEqual(len(list(SystemPart.get_active(self.database))), 1) + + def test_drop(self): + parts = list(SystemPart.get_active(self.database)) + parts[0].drop(self.database) + self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) + + def test_freeze(self): + parts = list(SystemPart.all(self.database)) + # There can be other backups in the folder + backups_count = self._get_backups_count() + parts[0].freeze(self.database) + backup_number = self._get_backups_count() + self.assertEqual(backup_number, backups_count + 1) + # Clean created backup + shutil.rmtree(self.BACKUP_DIR + '{0}'.format(backup_number)) + + def test_fetch(self): + # TODO Not tested, as I have no replication set + pass + + +class TestTable(Model): + date_field = DateField() + + engine = MergeTree('date_field', ('date_field',)) From 58b7a9aeacb13e7c26c436044f1a943ea9c632ce Mon Sep 17 00:00:00 2001 From: M1ha Date: Tue, 31 Jan 2017 18:13:46 +0500 Subject: [PATCH 09/21] 1) Added to_dict model method 2) Fixed bug in test_freeze cleaning, if backups don't contain all directory names (e. g. 1, 2, 3, 6, 7 - count=5, created_backup=8, not 6) --- src/infi/clickhouse_orm/models.py | 12 ++++++++++++ tests/test_models.py | 24 ++++++++++++++++++++++++ tests/test_system_models.py | 12 ++++++------ 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 8cd7c97..70ccc34 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -164,3 +164,15 @@ class Model(with_metaclass(ModelBase)): fields = [f for f in fields if f[1].is_insertable()] return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in fields) + def to_dict(self, insertable_only=False, field_names=None): + ''' + Returns the instance's column values as a dict. 
+ :param bool insertable_only: If True, returns only fields, that can be inserted into database + :param field_names: An iterable of field names to return + ''' + fields = [f for f in self._fields if f[1].is_insertable()] if insertable_only else self._fields + if field_names is not None: + fields = [f for f in fields if f[0] in field_names] + + data = self.__dict__ + return {name: field.to_python(data[name]) for name, field in fields} diff --git a/tests/test_models.py b/tests/test_models.py index 4b259a9..239f0c8 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -55,6 +55,29 @@ class ModelTestCase(unittest.TestCase): instance.int_field = '99' self.assertEquals(instance.int_field, 99) + def test_to_dict(self): + instance = SimpleModel(date_field='1973-12-06', int_field='100', float_field='7') + self.assertDictEqual(instance.to_dict(), { + "date_field": datetime.date(1973, 12, 6), + "int_field": 100, + "float_field": 7.0, + "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + "alias_field": 0.0, + 'str_field': 'dozo' + }) + self.assertDictEqual(instance.to_dict(insertable_only=True), { + "date_field": datetime.date(1973, 12, 6), + "int_field": 100, + "float_field": 7.0, + "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + 'str_field': 'dozo' + }) + self.assertDictEqual( + instance.to_dict(insertable_only=True, field_names=('int_field', 'alias_field', 'datetime_field')), { + "int_field": 100, + "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc) + }) + class SimpleModel(Model): @@ -63,6 +86,7 @@ class SimpleModel(Model): str_field = StringField(default='dozo') int_field = Int32Field(default=17) float_field = Float32Field() + alias_field = Float32Field(alias='float_field') engine = MergeTree('date_field', ('int_field', 'date_field')) diff --git a/tests/test_system_models.py b/tests/test_system_models.py index db97e5d..d8e075a 100644 --- a/tests/test_system_models.py +++ b/tests/test_system_models.py @@ -21,9 +21,9 @@ class SystemPartTest(unittest.TestCase): def tearDown(self): self.database.drop_database() - def _get_backups_count(self): + def _get_backups(self): _, dirnames, _ = next(os.walk(self.BACKUP_DIR)) - return len(dirnames) + return dirnames def test_get_all(self): parts = SystemPart.all(self.database) @@ -51,12 +51,12 @@ class SystemPartTest(unittest.TestCase): def test_freeze(self): parts = list(SystemPart.all(self.database)) # There can be other backups in the folder - backups_count = self._get_backups_count() + prev_backups = set(self._get_backups()) parts[0].freeze(self.database) - backup_number = self._get_backups_count() - self.assertEqual(backup_number, backups_count + 1) + backups = set(self._get_backups()) + self.assertEqual(len(backups), len(prev_backups) + 1) # Clean created backup - shutil.rmtree(self.BACKUP_DIR + '{0}'.format(backup_number)) + shutil.rmtree(self.BACKUP_DIR + '{0}'.format(list(backups - prev_backups)[0])) def test_fetch(self): # TODO Not tested, as I have no replication set From 5f2195f87fad4464e41f40621065b912c83b9e3a Mon Sep 17 00:00:00 2001 From: M1ha Date: Wed, 8 Feb 2017 10:31:19 +0500 Subject: [PATCH 10/21] 1) Replaced is_insertable() field mehtod with readonly property (unification with model and tables) 2) Method SystemPart.all() was replaced with get() 3) Added conditions parameter to SystemPart.get() and SystemPart.get_active() methods. 
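
A rough illustration of how the reworked API might be used, assuming an existing ``Database`` instance named ``db`` that already contains at least one MergeTree-backed table (the database and table names below are illustrative only)::

    from infi.clickhouse_orm.database import Database
    from infi.clickhouse_orm.system_models import SystemPart

    db = Database('my_db')  # hypothetical database name

    # All parts belonging to tables of this database
    all_parts = list(SystemPart.get(db))

    # Only active parts, narrowed by an extra WHERE condition
    active_parts = list(SystemPart.get_active(db, conditions="table='person'"))

    # Fields declared with alias=... or materialized=... now report readonly=True
    # and are skipped on insert, matching the model-level readonly flag.
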
--- README.rst | 23 +++++++++++++++++++ src/infi/clickhouse_orm/fields.py | 5 ++-- src/infi/clickhouse_orm/models.py | 7 +++--- src/infi/clickhouse_orm/system_models.py | 29 ++++++++++++++---------- tests/test_system_models.py | 10 ++++++-- 5 files changed, 54 insertions(+), 20 deletions(-) diff --git a/README.rst b/README.rst index 7dba0f9..4898e5c 100644 --- a/README.rst +++ b/README.rst @@ -182,6 +182,29 @@ You can optionally pass conditions to the query:: Note that ``order_by`` must be chosen so that the ordering is unique, otherwise there might be inconsistencies in the pagination (such as an instance that appears on two different pages). +System models +------------- +`Clickhouse docs ` +System models are read only models for implementing part of the system's functionality, +and for providing access to information about how the system is working. + +Usage example: + + >>>> from infi.clickhouse_orm import system_models + >>>> print(system_models.SystemPart.all()) + +Currently the fllowing system models are supported: + +=================== ======== ================= =================================================== +Class DB Table Pythonic Type Comments +=================== ======== ================= =================================================== +SystemPart + +Partitions and parts +-------------------- +`ClickHouse docs ` + + Schema Migrations ----------------- diff --git a/src/infi/clickhouse_orm/fields.py b/src/infi/clickhouse_orm/fields.py index 46fb0c5..1b4ee7e 100644 --- a/src/infi/clickhouse_orm/fields.py +++ b/src/infi/clickhouse_orm/fields.py @@ -70,8 +70,9 @@ class Field(object): else: return self.db_type - def is_insertable(self): - return self.alias is None and self.materialized is None + @property + def readonly(self): + return self.alias is not None or self.materialized is not None class StringField(Field): diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 70ccc34..9580079 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -159,9 +159,8 @@ class Model(with_metaclass(ModelBase)): :param bool insertable_only: If True, returns only fields, that can be inserted into database ''' data = self.__dict__ - fields = self._fields - if insertable_only: - fields = [f for f in fields if f[1].is_insertable()] + + fields = [f for f in self._fields if not f[1].readonly] if insertable_only else self._fields return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in fields) def to_dict(self, insertable_only=False, field_names=None): @@ -170,7 +169,7 @@ class Model(with_metaclass(ModelBase)): :param bool insertable_only: If True, returns only fields, that can be inserted into database :param field_names: An iterable of field names to return ''' - fields = [f for f in self._fields if f[1].is_insertable()] if insertable_only else self._fields + fields = [f for f in self._fields if not f[1].readonly] if insertable_only else self._fields if field_names is not None: fields = [f for f in fields if f[0] in field_names] diff --git a/src/infi/clickhouse_orm/system_models.py b/src/infi/clickhouse_orm/system_models.py index 25ea0e0..ce94cb6 100644 --- a/src/infi/clickhouse_orm/system_models.py +++ b/src/infi/clickhouse_orm/system_models.py @@ -113,25 +113,30 @@ class SystemPart(Model): return self._partition_operation_sql(database, 'FETCH', settings=settings, from_part=zookeeper_path) @classmethod - def get_active(cls, database): + def get(cls, database, conditions=""): """ - Get all active parts 
+ Get all data from system.parts table :param database: A database object to fetch data from. + :param conditions: WHERE clause conditions. Database condition is added automatically :return: A list of SystemPart objects """ assert isinstance(database, Database), "database must be database.Database class instance" + assert isinstance(conditions, str), "conditions must be a string" + if conditions: + conditions += " AND" field_names = ','.join([f[0] for f in cls._fields]) - return database.select("SELECT %s FROM %s WHERE active AND database='%s'" % - (field_names, cls.table_name(), database.db_name), model_class=cls) + return database.select("SELECT %s FROM %s WHERE %s database='%s'" % + (field_names, cls.table_name(), conditions, database.db_name), model_class=cls) @classmethod - def all(cls, database): + def get_active(cls, database, conditions=""): """ - Gets all data from system.parts database - :param database: - :return: + Gets active data from system.parts table + :param database: A database object to fetch data from. + :param conditions: WHERE clause conditions. Database and active conditions are added automatically + :return: A list of SystemPart objects """ - assert isinstance(database, Database), "database must be database.Database class instance" - field_names = ','.join([f[0] for f in cls._fields]) - return database.select("SELECT %s FROM %s WHERE database='%s'" % - (field_names, cls.table_name(), database.db_name), model_class=cls) + if conditions: + conditions += ' AND ' + conditions += 'active' + return SystemPart.get(database, conditions=conditions) diff --git a/tests/test_system_models.py b/tests/test_system_models.py index d8e075a..ac1616c 100644 --- a/tests/test_system_models.py +++ b/tests/test_system_models.py @@ -26,7 +26,7 @@ class SystemPartTest(unittest.TestCase): return dirnames def test_get_all(self): - parts = SystemPart.all(self.database) + parts = SystemPart.get(self.database) self.assertEqual(len(list(parts)), 1) def test_get_active(self): @@ -35,6 +35,12 @@ class SystemPartTest(unittest.TestCase): parts[0].detach(self.database) self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) + def test_get_conditions(self): + parts = list(SystemPart.get(self.database, conditions="table='testtable'")) + self.assertEqual(len(parts), 1) + parts = list(SystemPart.get(self.database, conditions="table='othertable'")) + self.assertEqual(len(parts), 0) + def test_attach_detach(self): parts = list(SystemPart.get_active(self.database)) self.assertEqual(len(parts), 1) @@ -49,7 +55,7 @@ class SystemPartTest(unittest.TestCase): self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) def test_freeze(self): - parts = list(SystemPart.all(self.database)) + parts = list(SystemPart.get(self.database)) # There can be other backups in the folder prev_backups = set(self._get_backups()) parts[0].freeze(self.database) From 948ab53e5e2fa30ad8f6e87470c0978ebfc9c795 Mon Sep 17 00:00:00 2001 From: M1ha Date: Wed, 8 Feb 2017 10:56:30 +0500 Subject: [PATCH 11/21] Corrected docs --- README.rst | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/README.rst b/README.rst index 4898e5c..9830599 100644 --- a/README.rst +++ b/README.rst @@ -184,7 +184,9 @@ inconsistencies in the pagination (such as an instance that appears on two diffe System models ------------- + `Clickhouse docs ` + System models are read only models for implementing part of the system's functionality, and for providing access to information about how the system is 
working. @@ -193,17 +195,34 @@ Usage example: >>>> from infi.clickhouse_orm import system_models >>>> print(system_models.SystemPart.all()) -Currently the fllowing system models are supported: +Currently the following system models are supported: -=================== ======== ================= =================================================== -Class DB Table Pythonic Type Comments -=================== ======== ================= =================================================== -SystemPart +=================== ============ =================================================== +Class DB Table Comments +=================== ============ =================================================== +SystemPart system.parts Gives methods to work with partitions. See below. Partitions and parts -------------------- + `ClickHouse docs ` +A partition in a table is data for a single calendar month. Table "system.parts" contains information about each part. + +=================== ======================= ============================================================================================= +Method Parameters Comments +=================== ======================= ============================================================================================= +get(static) database, conditions="" Gets database partitions, filtered by conditions +get_active(static) database, conditions="" Gets only active (not detached or dropped) partitions, filtered by conditions +detach database, settings=None Detaches the partition. Settings is a dict of params to pass to http request +drop database, settings=None Drops the partition. Settings is a dict of params to pass to http request +attach database, settings=None Attaches already detached partition. Settings is a dict of params to pass to http request +freeze database, settings=None Freezes (makes backup) of the partition. Settings is a dict of params to pass to http request +fetch database, settings=None Fetches partition. Settings is a dict of params to pass to http request + +``Note``: system.parts stores information for all databases. To be correct, +SystemPart model was designed to receive only given database parts. + Schema Migrations ----------------- From 92adeb697fc8a0a4b8e9e605c3e3a12d44188a4a Mon Sep 17 00:00:00 2001 From: M1ha Date: Wed, 8 Feb 2017 10:59:28 +0500 Subject: [PATCH 12/21] Corrected tables in docs --- README.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.rst b/README.rst index 9830599..d68e04f 100644 --- a/README.rst +++ b/README.rst @@ -182,6 +182,7 @@ You can optionally pass conditions to the query:: Note that ``order_by`` must be chosen so that the ordering is unique, otherwise there might be inconsistencies in the pagination (such as an instance that appears on two different pages). + System models ------------- @@ -201,6 +202,8 @@ Currently the following system models are supported: Class DB Table Comments =================== ============ =================================================== SystemPart system.parts Gives methods to work with partitions. See below. +=================== ============ =================================================== + Partitions and parts -------------------- @@ -219,6 +222,7 @@ drop database, settings=None Drops the partition. Settings is attach database, settings=None Attaches already detached partition. Settings is a dict of params to pass to http request freeze database, settings=None Freezes (makes backup) of the partition. 
Settings is a dict of params to pass to http request fetch database, settings=None Fetches partition. Settings is a dict of params to pass to http request +=================== ======================= ============================================================================================= ``Note``: system.parts stores information for all databases. To be correct, SystemPart model was designed to receive only given database parts. From dddca2d3b57470cf210824543973e68d0cfa4258 Mon Sep 17 00:00:00 2001 From: M1ha Date: Wed, 8 Feb 2017 11:05:13 +0500 Subject: [PATCH 13/21] Corrected links and code --- README.rst | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/README.rst b/README.rst index d68e04f..4a3c7b8 100644 --- a/README.rst +++ b/README.rst @@ -186,15 +186,15 @@ inconsistencies in the pagination (such as an instance that appears on two diffe System models ------------- -`Clickhouse docs ` +`Clickhouse docs `_. System models are read only models for implementing part of the system's functionality, and for providing access to information about how the system is working. -Usage example: +Usage example:: >>>> from infi.clickhouse_orm import system_models - >>>> print(system_models.SystemPart.all()) + >>>> print(system_models.SystemPart.get(Database())) Currently the following system models are supported: @@ -208,7 +208,7 @@ SystemPart system.parts Gives methods to work with partitions. See Partitions and parts -------------------- -`ClickHouse docs ` +`ClickHouse docs `_. A partition in a table is data for a single calendar month. Table "system.parts" contains information about each part. @@ -324,7 +324,8 @@ You can create array fields containing any data type, for example:: Working with materialized and alias fields ****************************************** -ClickHouse provides an opportunity to create MATERIALIZED and ALIAS fields. +ClickHouse provides an opportunity to create MATERIALIZED and ALIAS Fields. + See documentation `here `_. Both field types can't be inserted into the database directly, so they are ignored when using the ``Database.insert()`` method. From 27217102da622b092030f7366993d6d7ca87aaf0 Mon Sep 17 00:00:00 2001 From: M1ha Date: Wed, 8 Feb 2017 11:37:40 +0500 Subject: [PATCH 14/21] Fixed bugs, connected to merge --- src/infi/clickhouse_orm/models.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 9580079..c168f18 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -163,15 +163,16 @@ class Model(with_metaclass(ModelBase)): fields = [f for f in self._fields if not f[1].readonly] if insertable_only else self._fields return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in fields) - def to_dict(self, insertable_only=False, field_names=None): + def to_dict(self, insertable_only=False, field_names=None, timezone_in_use=pytz.utc): ''' Returns the instance's column values as a dict. :param bool insertable_only: If True, returns only fields, that can be inserted into database :param field_names: An iterable of field names to return + :param timezone_in_use: timezone to convert DateField and DateTimeField. 
''' fields = [f for f in self._fields if not f[1].readonly] if insertable_only else self._fields if field_names is not None: fields = [f for f in fields if f[0] in field_names] data = self.__dict__ - return {name: field.to_python(data[name]) for name, field in fields} + return {name: field.to_python(data[name], timezone_in_use) for name, field in fields} From 14f2ab78b5d0de1d3a7232efbd177298bdba7c6b Mon Sep 17 00:00:00 2001 From: M1ha Date: Wed, 8 Feb 2017 15:23:27 +0500 Subject: [PATCH 15/21] Fixed: 1) Added partition working to readme 2) replaced insertable_only parameter with include_readonly 3) Added empty string alias and materialized field control --- README.rst | 10 ++++++++-- src/infi/clickhouse_orm/database.py | 4 ++-- src/infi/clickhouse_orm/fields.py | 8 +++++--- src/infi/clickhouse_orm/models.py | 17 ++++++++--------- tests/test_models.py | 4 ++-- 5 files changed, 25 insertions(+), 18 deletions(-) diff --git a/README.rst b/README.rst index 4a3c7b8..0007dae 100644 --- a/README.rst +++ b/README.rst @@ -193,8 +193,14 @@ and for providing access to information about how the system is working. Usage example:: - >>>> from infi.clickhouse_orm import system_models - >>>> print(system_models.SystemPart.get(Database())) + from infi.clickhouse_orm.database import Database + from infi.clickhouse_orm.system_models import SystemPart + db = Database('my_test_db', db_url='http://192.168.1.1:8050', username='scott', password='tiger') + partitions = SystemPart.get_active(db, conditions='') # Getting all active partitions of the database + if len(partitions) > 0: + partitions = sorted(partitions, key=lambda obj: obj.name) # Partition name is YYYYMM, so we can sort so + partitions[0].freeze(db) # Make a backup in /opt/clickhouse/shadow directory + partitions[0].drop() # Dropped partition Currently the following system models are supported: diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py index 48a7c91..6ef5d97 100644 --- a/src/infi/clickhouse_orm/database.py +++ b/src/infi/clickhouse_orm/database.py @@ -62,11 +62,11 @@ class Database(object): def gen(): yield self._substitute('INSERT INTO $table FORMAT TabSeparated\n', model_class).encode('utf-8') - yield (first_instance.to_tsv(insertable_only=True) + '\n').encode('utf-8') + yield (first_instance.to_tsv(include_readonly=False) + '\n').encode('utf-8') # Collect lines in batches of batch_size batch = [] for instance in i: - batch.append(instance.to_tsv(insertable_only=True)) + batch.append(instance.to_tsv(include_readonly=False)) if len(batch) >= batch_size: # Return the current batch of lines yield ('\n'.join(batch) + '\n').encode('utf-8') diff --git a/src/infi/clickhouse_orm/fields.py b/src/infi/clickhouse_orm/fields.py index 1b4ee7e..3fa49ec 100644 --- a/src/infi/clickhouse_orm/fields.py +++ b/src/infi/clickhouse_orm/fields.py @@ -16,8 +16,10 @@ class Field(object): def __init__(self, default=None, alias=None, materialized=None): assert (None, None) in {(default, alias), (alias, materialized), (default, materialized)}, \ "Only one of default, alias and materialized parameters can be given" - assert alias is None or isinstance(alias, str), "Alias field must be string field name, if given" - assert materialized is None or isinstance(materialized, str), "Materialized field must be string, if given" + assert alias is None or isinstance(alias, str) and alias != "",\ + "Alias field must be string field name, if given" + assert materialized is None or isinstance(materialized, str) and alias != "",\ + "Materialized 
field must be string, if given" self.creation_counter = Field.creation_counter Field.creation_counter += 1 @@ -72,7 +74,7 @@ class Field(object): @property def readonly(self): - return self.alias is not None or self.materialized is not None + return bool(self.alias or self.materialized) class StringField(Field): diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index c168f18..80cafb9 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -153,26 +153,25 @@ class Model(with_metaclass(ModelBase)): kwargs[name] = field.to_python(next(values), timezone_in_use) return cls(**kwargs) - def to_tsv(self, insertable_only=False): + def to_tsv(self, include_readonly=True): ''' Returns the instance's column values as a tab-separated line. A newline is not included. - :param bool insertable_only: If True, returns only fields, that can be inserted into database + :param bool include_readonly: If False, returns only fields, that can be inserted into database ''' data = self.__dict__ - - fields = [f for f in self._fields if not f[1].readonly] if insertable_only else self._fields + fields = self._fields if include_readonly else [f for f in self._fields if not f[1].readonly] return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in fields) - def to_dict(self, insertable_only=False, field_names=None, timezone_in_use=pytz.utc): + def to_dict(self, include_readonly=True, field_names=None): ''' Returns the instance's column values as a dict. - :param bool insertable_only: If True, returns only fields, that can be inserted into database + :param bool include_readonly: If False, returns only fields, that can be inserted into database :param field_names: An iterable of field names to return - :param timezone_in_use: timezone to convert DateField and DateTimeField. 
''' - fields = [f for f in self._fields if not f[1].readonly] if insertable_only else self._fields + fields = self._fields if include_readonly else [f for f in self._fields if not f[1].readonly] + if field_names is not None: fields = [f for f in fields if f[0] in field_names] data = self.__dict__ - return {name: field.to_python(data[name], timezone_in_use) for name, field in fields} + return {name: data[name] for name, field in fields} diff --git a/tests/test_models.py b/tests/test_models.py index 239f0c8..b52a2c6 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -65,7 +65,7 @@ class ModelTestCase(unittest.TestCase): "alias_field": 0.0, 'str_field': 'dozo' }) - self.assertDictEqual(instance.to_dict(insertable_only=True), { + self.assertDictEqual(instance.to_dict(include_readonly=False), { "date_field": datetime.date(1973, 12, 6), "int_field": 100, "float_field": 7.0, @@ -73,7 +73,7 @@ class ModelTestCase(unittest.TestCase): 'str_field': 'dozo' }) self.assertDictEqual( - instance.to_dict(insertable_only=True, field_names=('int_field', 'alias_field', 'datetime_field')), { + instance.to_dict(include_readonly=False, field_names=('int_field', 'alias_field', 'datetime_field')), { "int_field": 100, "datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc) }) From 64f6288fdbdb6da2903b0fab5dc336c56da70a11 Mon Sep 17 00:00:00 2001 From: M1ha Date: Thu, 9 Feb 2017 09:50:32 +0500 Subject: [PATCH 16/21] Moved code example to another section --- README.rst | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/README.rst b/README.rst index 0007dae..85e4aa8 100644 --- a/README.rst +++ b/README.rst @@ -191,17 +191,6 @@ System models System models are read only models for implementing part of the system's functionality, and for providing access to information about how the system is working. -Usage example:: - - from infi.clickhouse_orm.database import Database - from infi.clickhouse_orm.system_models import SystemPart - db = Database('my_test_db', db_url='http://192.168.1.1:8050', username='scott', password='tiger') - partitions = SystemPart.get_active(db, conditions='') # Getting all active partitions of the database - if len(partitions) > 0: - partitions = sorted(partitions, key=lambda obj: obj.name) # Partition name is YYYYMM, so we can sort so - partitions[0].freeze(db) # Make a backup in /opt/clickhouse/shadow directory - partitions[0].drop() # Dropped partition - Currently the following system models are supported: =================== ============ =================================================== @@ -230,6 +219,17 @@ freeze database, settings=None Freezes (makes backup) of the pa fetch database, settings=None Fetches partition. 
Settings is a dict of params to pass to http request =================== ======================= ============================================================================================= +Usage example:: + + from infi.clickhouse_orm.database import Database + from infi.clickhouse_orm.system_models import SystemPart + db = Database('my_test_db', db_url='http://192.168.1.1:8050', username='scott', password='tiger') + partitions = SystemPart.get_active(db, conditions='') # Getting all active partitions of the database + if len(partitions) > 0: + partitions = sorted(partitions, key=lambda obj: obj.name) # Partition name is YYYYMM, so we can sort so + partitions[0].freeze(db) # Make a backup in /opt/clickhouse/shadow directory + partitions[0].drop() # Dropped partition + ``Note``: system.parts stores information for all databases. To be correct, SystemPart model was designed to receive only given database parts. From f3e75cfae33100973a79421a3df433474961a6b5 Mon Sep 17 00:00:00 2001 From: M1ha Date: Thu, 9 Feb 2017 16:49:25 +0500 Subject: [PATCH 17/21] 1) Removed database params for working with SystemPart operations 2) Added _database attribute to each model, got through select --- src/infi/clickhouse_orm/database.py | 2 +- src/infi/clickhouse_orm/models.py | 24 +++++++++++++++-- src/infi/clickhouse_orm/system_models.py | 34 ++++++++++-------------- tests/test_system_models.py | 10 +++---- 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py index 6ef5d97..5c804c0 100644 --- a/src/infi/clickhouse_orm/database.py +++ b/src/infi/clickhouse_orm/database.py @@ -94,7 +94,7 @@ class Database(object): field_types = parse_tsv(next(lines)) model_class = model_class or ModelBase.create_ad_hoc_model(zip(field_names, field_types)) for line in lines: - yield model_class.from_tsv(line, field_names, self.server_timezone) + yield model_class.from_tsv(line, field_names, self.server_timezone, self) def raw(self, query, settings=None, stream=False): """ diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 80cafb9..4b7b1b2 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -79,6 +79,9 @@ class Model(with_metaclass(ModelBase)): Unrecognized field names will cause an AttributeError. ''' super(Model, self).__init__() + + self._database = None + # Assign field values from keyword arguments for name, value in kwargs.items(): field = self.get_field(name) @@ -102,6 +105,17 @@ class Model(with_metaclass(ModelBase)): field.validate(value) super(Model, self).__setattr__(name, value) + def set_database(self, db): + """ + Sets _database attribute for current model instance + :param db: Database instance + :return: None + """ + # This can not be imported globally due to circular import + from .database import Database + assert isinstance(db, Database), "database must be database.Database instance" + self._database = db + def get_field(self, name): ''' Get a Field instance given its name, or None if not found. @@ -138,11 +152,12 @@ class Model(with_metaclass(ModelBase)): return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db_name, cls.table_name()) @classmethod - def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc): + def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None): ''' Create a model instance from a tab-separated line. The line may or may not include a newline. 
The field_names list must match the fields defined in the model, but does not have to include all of them. If omitted, it is assumed to be the names of all fields in the model, in order of definition. + :param database: if given, model receives database ''' from six import next field_names = field_names or [name for name, field in cls._fields] @@ -151,7 +166,12 @@ class Model(with_metaclass(ModelBase)): for name in field_names: field = getattr(cls, name) kwargs[name] = field.to_python(next(values), timezone_in_use) - return cls(**kwargs) + + obj = cls(**kwargs) + if database is not None: + obj.set_database(database) + + return obj def to_tsv(self, include_readonly=True): ''' diff --git a/src/infi/clickhouse_orm/system_models.py b/src/infi/clickhouse_orm/system_models.py index ce94cb6..8c66550 100644 --- a/src/infi/clickhouse_orm/system_models.py +++ b/src/infi/clickhouse_orm/system_models.py @@ -2,10 +2,9 @@ This file contains system readonly models that can be got from database https://clickhouse.yandex/reference_en.html#System tables """ -from .database import Database # Can't import it globally, due to circular import +from .database import Database from .fields import * from .models import Model -from .engines import MergeTree class SystemPart(Model): @@ -51,7 +50,7 @@ class SystemPart(Model): Next methods return SQL for some operations, which can be done with partitions https://clickhouse.yandex/reference_en.html#Manipulations with partitions and parts """ - def _partition_operation_sql(self, db, operation, settings=None, from_part=None): + def _partition_operation_sql(self, operation, settings=None, from_part=None): """ Performs some operation over partition :param db: Database object to execute operation on @@ -61,56 +60,51 @@ class SystemPart(Model): """ operation = operation.upper() assert operation in self.OPERATIONS, "operation must be in [%s]" % ', '.join(self.OPERATIONS) - sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (db.db_name, self.table, operation, self.partition) + sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (self._database.db_name, self.table, operation, self.partition) if from_part is not None: sql += " FROM %s" % from_part - db.raw(sql, settings=settings, stream=False) + self._database.raw(sql, settings=settings, stream=False) - def detach(self, database, settings=None): + def detach(self, settings=None): """ Move a partition to the 'detached' directory and forget it. - :param database: Database object to execute operation on :param settings: Settings for executing request to ClickHouse over db.raw() method :return: SQL Query """ - return self._partition_operation_sql(database, 'DETACH', settings=settings) + return self._partition_operation_sql('DETACH', settings=settings) - def drop(self, database, settings=None): + def drop(self, settings=None): """ Delete a partition - :param database: Database object to execute operation on :param settings: Settings for executing request to ClickHouse over db.raw() method :return: SQL Query """ - return self._partition_operation_sql(database, 'DROP', settings=settings) + return self._partition_operation_sql('DROP', settings=settings) - def attach(self, database, settings=None): + def attach(self, settings=None): """ Add a new part or partition from the 'detached' directory to the table. 
-        :param database: Database object to execute operation on
         :param settings: Settings for executing request to ClickHouse over db.raw() method
         :return: SQL Query
         """
-        return self._partition_operation_sql(database, 'ATTACH', settings=settings)
+        return self._partition_operation_sql('ATTACH', settings=settings)
 
-    def freeze(self, database, settings=None):
+    def freeze(self, settings=None):
         """
         Create a backup of a partition.
-        :param database: Database object to execute operation on
         :param settings: Settings for executing request to ClickHouse over db.raw() method
         :return: SQL Query
         """
-        return self._partition_operation_sql(database, 'FREEZE', settings=settings)
+        return self._partition_operation_sql('FREEZE', settings=settings)
 
-    def fetch(self, database, zookeeper_path, settings=None):
+    def fetch(self, zookeeper_path, settings=None):
         """
         Download a partition from another server.
-        :param database: Database object to execute operation on
         :param zookeeper_path: Path in zookeeper to fetch from
         :param settings: Settings for executing request to ClickHouse over db.raw() method
         :return: SQL Query
         """
-        return self._partition_operation_sql(database, 'FETCH', settings=settings, from_part=zookeeper_path)
+        return self._partition_operation_sql('FETCH', settings=settings, from_part=zookeeper_path)
 
     @classmethod
     def get(cls, database, conditions=""):
diff --git a/tests/test_system_models.py b/tests/test_system_models.py
index ac1616c..756e5cd 100644
--- a/tests/test_system_models.py
+++ b/tests/test_system_models.py
@@ -32,7 +32,7 @@ class SystemPartTest(unittest.TestCase):
     def test_get_active(self):
         parts = list(SystemPart.get_active(self.database))
         self.assertEqual(len(parts), 1)
-        parts[0].detach(self.database)
+        parts[0].detach()
         self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
 
     def test_get_conditions(self):
@@ -44,21 +44,21 @@ class SystemPartTest(unittest.TestCase):
     def test_attach_detach(self):
         parts = list(SystemPart.get_active(self.database))
         self.assertEqual(len(parts), 1)
-        parts[0].detach(self.database)
+        parts[0].detach()
         self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
-        parts[0].attach(self.database)
+        parts[0].attach()
         self.assertEqual(len(list(SystemPart.get_active(self.database))), 1)
 
     def test_drop(self):
         parts = list(SystemPart.get_active(self.database))
-        parts[0].drop(self.database)
+        parts[0].drop()
         self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
 
     def test_freeze(self):
         parts = list(SystemPart.get(self.database))
         # There can be other backups in the folder
         prev_backups = set(self._get_backups())
-        parts[0].freeze(self.database)
+        parts[0].freeze()
         backups = set(self._get_backups())
         self.assertEqual(len(backups), len(prev_backups) + 1)
         # Clean created backup

From adff766246860a16c7fc9d2380c66cdc2335138c Mon Sep 17 00:00:00 2001
From: M1ha
Date: Thu, 9 Feb 2017 16:52:05 +0500
Subject: [PATCH 18/21] Edited docs (SystemPart)

---
 README.rst | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/README.rst b/README.rst
index 85e4aa8..f54bf4b 100644
--- a/README.rst
+++ b/README.rst
@@ -212,11 +212,11 @@ Method Parameters Comments
 =================== ======================= =============================================================================================
 get(static)         database, conditions="" Gets database partitions, filtered by conditions
 get_active(static)  database, conditions="" Gets only active (not detached or dropped) partitions, filtered by conditions
-detach              database, settings=None Detaches the partition. Settings is a dict of params to pass to http request
-drop                database, settings=None Drops the partition. Settings is a dict of params to pass to http request
-attach              database, settings=None Attaches already detached partition. Settings is a dict of params to pass to http request
-freeze              database, settings=None Freezes (makes backup) of the partition. Settings is a dict of params to pass to http request
-fetch               database, settings=None Fetches partition. Settings is a dict of params to pass to http request
+detach              settings=None           Detaches the partition. Settings is a dict of params to pass to http request
+drop                settings=None           Drops the partition. Settings is a dict of params to pass to http request
+attach              settings=None           Attaches already detached partition. Settings is a dict of params to pass to http request
+freeze              settings=None           Freezes (makes backup) of the partition. Settings is a dict of params to pass to http request
+fetch               settings=None           Fetches partition. Settings is a dict of params to pass to http request
 =================== ======================= =============================================================================================
 
 Usage example::
@@ -227,7 +227,7 @@ Usage example::
     partitions = SystemPart.get_active(db, conditions='') # Getting all active partitions of the database
     if len(partitions) > 0:
         partitions = sorted(partitions, key=lambda obj: obj.name) # Partition name is YYYYMM, so we can sort so
-        partitions[0].freeze(db) # Make a backup in /opt/clickhouse/shadow directory
+        partitions[0].freeze() # Make a backup in /opt/clickhouse/shadow directory
         partitions[0].drop() # Dropped partition
 
 ``Note``: system.parts stores information for all databases. To be correct,

From 6f975a801ca745e8c12ac06a315fc55b0d348808 Mon Sep 17 00:00:00 2001
From: M1ha
Date: Thu, 9 Feb 2017 17:10:48 +0500
Subject: [PATCH 19/21] 1) Added get_database method to Model 2) Added some
 assertions in tests for adding _database attribute in selects and inserts 3)
 database.insert() method sets _database

---
 src/infi/clickhouse_orm/database.py |  2 ++
 src/infi/clickhouse_orm/models.py   |  7 +++++++
 tests/test_database.py              |  8 ++++++++
 3 files changed, 17 insertions(+)

diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py
index 5c804c0..2be8534 100644
--- a/src/infi/clickhouse_orm/database.py
+++ b/src/infi/clickhouse_orm/database.py
@@ -62,10 +62,12 @@ class Database(object):
 
         def gen():
             yield self._substitute('INSERT INTO $table FORMAT TabSeparated\n', model_class).encode('utf-8')
+            first_instance.set_database(self)
             yield (first_instance.to_tsv(include_readonly=False) + '\n').encode('utf-8')
             # Collect lines in batches of batch_size
             batch = []
             for instance in i:
+                instance.set_database(self)
                 batch.append(instance.to_tsv(include_readonly=False))
                 if len(batch) >= batch_size:
                     # Return the current batch of lines
diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py
index 4b7b1b2..6b82244 100644
--- a/src/infi/clickhouse_orm/models.py
+++ b/src/infi/clickhouse_orm/models.py
@@ -116,6 +116,13 @@ class Model(with_metaclass(ModelBase)):
         assert isinstance(db, Database), "database must be database.Database instance"
         self._database = db
 
+    def get_database(self):
+        """
+        Gets _database attribute for current model instance
+        :return: database.Database instance, model was inserted or selected from or None
+        """
+        return self._database
+
     def get_field(self, name):
         '''
         Get a Field instance given its name, or None if not found.
diff --git a/tests/test_database.py b/tests/test_database.py index a3ad15c..aa7a4df 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -24,6 +24,8 @@ class DatabaseTestCase(unittest.TestCase): def _insert_and_check(self, data, count): self.database.insert(data) self.assertEquals(count, self.database.count(Person)) + for instance in data: + self.assertEquals(self.database, instance.get_database()) def test_insert__generator(self): self._insert_and_check(self._sample_data(), len(data)) @@ -53,6 +55,8 @@ class DatabaseTestCase(unittest.TestCase): self.assertEquals(results[0].height, 1.72) self.assertEquals(results[1].last_name, 'Scott') self.assertEquals(results[1].height, 1.70) + self.assertEqual(results[0].get_database(), self.database) + self.assertEqual(results[1].get_database(), self.database) def test_select_partial_fields(self): self._insert_and_check(self._sample_data(), len(data)) @@ -63,6 +67,8 @@ class DatabaseTestCase(unittest.TestCase): self.assertEquals(results[0].height, 0) # default value self.assertEquals(results[1].last_name, 'Scott') self.assertEquals(results[1].height, 0) # default value + self.assertEqual(results[0].get_database(), self.database) + self.assertEqual(results[1].get_database(), self.database) def test_select_ad_hoc_model(self): self._insert_and_check(self._sample_data(), len(data)) @@ -74,6 +80,8 @@ class DatabaseTestCase(unittest.TestCase): self.assertEquals(results[0].height, 1.72) self.assertEquals(results[1].last_name, 'Scott') self.assertEquals(results[1].height, 1.70) + self.assertEqual(results[0].get_database(), self.database) + self.assertEqual(results[1].get_database(), self.database) def test_pagination(self): self._insert_and_check(self._sample_data(), len(data)) From 58e322cc5241dbca6942e7cd8dfdcf76a4983d3f Mon Sep 17 00:00:00 2001 From: Itai Shirav Date: Sun, 19 Feb 2017 09:01:02 +0200 Subject: [PATCH 20/21] fix test --- tests/test_system_models.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_system_models.py b/tests/test_system_models.py index 756e5cd..544713b 100644 --- a/tests/test_system_models.py +++ b/tests/test_system_models.py @@ -22,6 +22,8 @@ class SystemPartTest(unittest.TestCase): self.database.drop_database() def _get_backups(self): + if not os.path.exists(self.BACKUP_DIR): + return [] _, dirnames, _ = next(os.walk(self.BACKUP_DIR)) return dirnames From f70d0697a78e74be827dbe2a3301a22a44e5e699 Mon Sep 17 00:00:00 2001 From: Itai Shirav Date: Sun, 19 Feb 2017 09:30:19 +0200 Subject: [PATCH 21/21] Updated changelog --- CHANGELOG.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 59d97c0..b1210f7 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,6 +8,10 @@ Change Log - Pagination: passing -1 as the page number now returns the last page - Accept datetime values for date fields (Zloool) - Support readonly mode in Database class (tswr) +- Added support for the Buffer table engine (emakarov) +- Added the SystemPart readonly model, which provides operations on partitions (M1ha) +- Added Model.to_dict() that converts a model instance to a dictionary (M1ha) +- Added Database.raw() to perform arbitrary queries (M1ha) v0.7.1 ------
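
The pieces introduced by the patches above fit together as in the following minimal sketch. It is
only an illustration: the database URL and credentials are made-up values, and it assumes a
reachable ClickHouse server whose database contains at least one active partition::

    from infi.clickhouse_orm.database import Database
    from infi.clickhouse_orm.system_models import SystemPart

    db = Database('my_test_db', db_url='http://192.168.1.1:8050', username='scott', password='tiger')

    # Instances returned by select() now remember the database they came from,
    # so partition operations no longer need it passed explicitly.
    parts = list(SystemPart.get_active(db))
    if parts:
        part = parts[0]
        assert part.get_database() is db                          # set by Database.select()
        info = part.to_dict(field_names=('table', 'partition'))   # plain Python values
        part.freeze()                                             # backup of the partition
        part.drop()                                               # then drop it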