From b9fce94b042bc7b273efa1457b2a5d68b0d690d3 Mon Sep 17 00:00:00 2001 From: M1ha Date: Tue, 31 Jan 2017 12:43:11 +0500 Subject: [PATCH] 1) Added readonly models 2) Added SystemPart models in order to execute partition operations --- src/infi/clickhouse_orm/database.py | 19 ++++ src/infi/clickhouse_orm/models.py | 10 +- src/infi/clickhouse_orm/system_models.py | 137 +++++++++++++++++++++++ tests/test_database.py | 28 ++++- tests/test_system_models.py | 69 ++++++++++++ 5 files changed, 257 insertions(+), 6 deletions(-) create mode 100644 src/infi/clickhouse_orm/system_models.py create mode 100644 tests/test_system_models.py diff --git a/src/infi/clickhouse_orm/database.py b/src/infi/clickhouse_orm/database.py index 73bbc13..892d92a 100644 --- a/src/infi/clickhouse_orm/database.py +++ b/src/infi/clickhouse_orm/database.py @@ -35,9 +35,13 @@ class Database(object): def create_table(self, model_class): # TODO check that model has an engine + if model_class.readonly: + raise DatabaseException("You can't create read only table") self._send(model_class.create_table_sql(self.db_name)) def drop_table(self, model_class): + if model_class.readonly: + raise DatabaseException("You can't drop read only table") self._send(model_class.drop_table_sql(self.db_name)) def insert(self, model_instances, batch_size=1000): @@ -48,6 +52,10 @@ class Database(object): except StopIteration: return # model_instances is empty model_class = first_instance.__class__ + + if first_instance.readonly: + raise DatabaseException("You can't insert into read only table") + def gen(): yield self._substitute('INSERT INTO $table FORMAT TabSeparated\n', model_class).encode('utf-8') yield (first_instance.to_tsv(insertable_only=True) + '\n').encode('utf-8') @@ -84,6 +92,17 @@ class Database(object): for line in lines: yield model_class.from_tsv(line, field_names) + def raw(self, query, settings=None, stream=False): + """ + Performs raw query to database. Returns its output + :param query: Query to execute + :param settings: Query settings to send as query GET parameters + :param stream: If flag is true, Http response from ClickHouse will be streamed. + :return: Query execution result + """ + query = self._substitute(query, None) + return self._send(query, settings=settings, stream=stream).text + def paginate(self, model_class, order_by, page_num=1, page_size=100, conditions=None, settings=None): count = self.count(model_class, conditions) pages_total = int(ceil(count / float(page_size))) diff --git a/src/infi/clickhouse_orm/models.py b/src/infi/clickhouse_orm/models.py index 16f6f77..09127e3 100644 --- a/src/infi/clickhouse_orm/models.py +++ b/src/infi/clickhouse_orm/models.py @@ -1,10 +1,10 @@ -from .utils import escape, parse_tsv -from .engines import * -from .fields import Field +from logging import getLogger from six import with_metaclass -from logging import getLogger +from .fields import Field +from .utils import parse_tsv + logger = getLogger('clickhouse_orm') @@ -68,6 +68,7 @@ class Model(with_metaclass(ModelBase)): ''' engine = None + readonly = False def __init__(self, **kwargs): ''' @@ -160,4 +161,3 @@ class Model(with_metaclass(ModelBase)): fields = [f for f in self._fields if f[1].is_insertable()] if insertable_only else self._fields return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in fields) - diff --git a/src/infi/clickhouse_orm/system_models.py b/src/infi/clickhouse_orm/system_models.py new file mode 100644 index 0000000..25ea0e0 --- /dev/null +++ b/src/infi/clickhouse_orm/system_models.py @@ -0,0 +1,137 @@ +""" +This file contains system readonly models that can be got from database +https://clickhouse.yandex/reference_en.html#System tables +""" +from .database import Database # Can't import it globally, due to circular import +from .fields import * +from .models import Model +from .engines import MergeTree + + +class SystemPart(Model): + """ + Contains information about parts of a table in the MergeTree family. + This model operates only fields, described in the reference. Other fields are ignored. + https://clickhouse.yandex/reference_en.html#system.parts + """ + OPERATIONS = frozenset({'DETACH', 'DROP', 'ATTACH', 'FREEZE', 'FETCH'}) + + readonly = True + + database = StringField() # Name of the database where the table that this part belongs to is located. + table = StringField() # Name of the table that this part belongs to. + engine = StringField() # Name of the table engine, without parameters. + partition = StringField() # Name of the partition, in the format YYYYMM. + name = StringField() # Name of the part. + replicated = UInt8Field() # Whether the part belongs to replicated data. + + # Whether the part is used in a table, or is no longer needed and will be deleted soon. + # Inactive parts remain after merging. + active = UInt8Field() + + # Number of marks - multiply by the index granularity (usually 8192) + # to get the approximate number of rows in the part. + marks = UInt64Field() + + bytes = UInt64Field() # Number of bytes when compressed. + + # Time the directory with the part was modified. Usually corresponds to the part's creation time. + modification_time = DateTimeField() + remove_time = DateTimeField() # For inactive parts only - the time when the part became inactive. + + # The number of places where the part is used. A value greater than 2 indicates + # that this part participates in queries or merges. + refcount = UInt32Field() + + @classmethod + def table_name(cls): + return 'system.parts' + + """ + Next methods return SQL for some operations, which can be done with partitions + https://clickhouse.yandex/reference_en.html#Manipulations with partitions and parts + """ + def _partition_operation_sql(self, db, operation, settings=None, from_part=None): + """ + Performs some operation over partition + :param db: Database object to execute operation on + :param operation: Operation to execute from SystemPart.OPERATIONS set + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: Operation execution result + """ + operation = operation.upper() + assert operation in self.OPERATIONS, "operation must be in [%s]" % ', '.join(self.OPERATIONS) + sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (db.db_name, self.table, operation, self.partition) + if from_part is not None: + sql += " FROM %s" % from_part + db.raw(sql, settings=settings, stream=False) + + def detach(self, database, settings=None): + """ + Move a partition to the 'detached' directory and forget it. + :param database: Database object to execute operation on + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: SQL Query + """ + return self._partition_operation_sql(database, 'DETACH', settings=settings) + + def drop(self, database, settings=None): + """ + Delete a partition + :param database: Database object to execute operation on + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: SQL Query + """ + return self._partition_operation_sql(database, 'DROP', settings=settings) + + def attach(self, database, settings=None): + """ + Add a new part or partition from the 'detached' directory to the table. + :param database: Database object to execute operation on + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: SQL Query + """ + return self._partition_operation_sql(database, 'ATTACH', settings=settings) + + def freeze(self, database, settings=None): + """ + Create a backup of a partition. + :param database: Database object to execute operation on + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: SQL Query + """ + return self._partition_operation_sql(database, 'FREEZE', settings=settings) + + def fetch(self, database, zookeeper_path, settings=None): + """ + Download a partition from another server. + :param database: Database object to execute operation on + :param zookeeper_path: Path in zookeeper to fetch from + :param settings: Settings for executing request to ClickHouse over db.raw() method + :return: SQL Query + """ + return self._partition_operation_sql(database, 'FETCH', settings=settings, from_part=zookeeper_path) + + @classmethod + def get_active(cls, database): + """ + Get all active parts + :param database: A database object to fetch data from. + :return: A list of SystemPart objects + """ + assert isinstance(database, Database), "database must be database.Database class instance" + field_names = ','.join([f[0] for f in cls._fields]) + return database.select("SELECT %s FROM %s WHERE active AND database='%s'" % + (field_names, cls.table_name(), database.db_name), model_class=cls) + + @classmethod + def all(cls, database): + """ + Gets all data from system.parts database + :param database: + :return: + """ + assert isinstance(database, Database), "database must be database.Database class instance" + field_names = ','.join([f[0] for f in cls._fields]) + return database.select("SELECT %s FROM %s WHERE database='%s'" % + (field_names, cls.table_name(), database.db_name), model_class=cls) diff --git a/tests/test_database.py b/tests/test_database.py index 30c25d3..8e9ba16 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -2,7 +2,7 @@ import unittest -from infi.clickhouse_orm.database import Database +from infi.clickhouse_orm.database import Database, DatabaseException from infi.clickhouse_orm.models import Model from infi.clickhouse_orm.fields import * from infi.clickhouse_orm.engines import * @@ -121,6 +121,25 @@ class DatabaseTestCase(unittest.TestCase): for entry in data: yield Person(**entry) + def test_raw(self): + self._insert_and_check(self._sample_data(), len(data)) + query = "SELECT * FROM `test-db`.person WHERE first_name = 'Whitney' ORDER BY last_name" + results = self.database.raw(query) + self.assertEqual(results, "Whitney\tDurham\t1977-09-15\t1.72\nWhitney\tScott\t1971-07-04\t1.7\n") + + def test_insert_readonly(self): + m = ReadOnlyModel(name='readonly') + with self.assertRaises(DatabaseException): + self.database.insert([m]) + + def test_create_readonly_table(self): + with self.assertRaises(DatabaseException): + self.database.create_table(ReadOnlyModel) + + def test_drop_readonly_table(self): + with self.assertRaises(DatabaseException): + self.database.drop_table(ReadOnlyModel) + class Person(Model): @@ -132,6 +151,13 @@ class Person(Model): engine = MergeTree('birthday', ('first_name', 'last_name', 'birthday')) +class ReadOnlyModel(Model): + readonly = True + + name = StringField() + + + data = [ {"first_name": "Abdul", "last_name": "Hester", "birthday": "1970-12-02", "height": "1.63"}, {"first_name": "Adam", "last_name": "Goodman", "birthday": "1986-01-07", "height": "1.74"}, diff --git a/tests/test_system_models.py b/tests/test_system_models.py new file mode 100644 index 0000000..db97e5d --- /dev/null +++ b/tests/test_system_models.py @@ -0,0 +1,69 @@ +import unittest +from datetime import date + +import os +import shutil +from infi.clickhouse_orm.database import Database +from infi.clickhouse_orm.engines import * +from infi.clickhouse_orm.fields import * +from infi.clickhouse_orm.models import Model +from infi.clickhouse_orm.system_models import SystemPart + + +class SystemPartTest(unittest.TestCase): + BACKUP_DIR = '/opt/clickhouse/shadow/' + + def setUp(self): + self.database = Database('test-db') + self.database.create_table(TestTable) + self.database.insert([TestTable(date_field=date.today())]) + + def tearDown(self): + self.database.drop_database() + + def _get_backups_count(self): + _, dirnames, _ = next(os.walk(self.BACKUP_DIR)) + return len(dirnames) + + def test_get_all(self): + parts = SystemPart.all(self.database) + self.assertEqual(len(list(parts)), 1) + + def test_get_active(self): + parts = list(SystemPart.get_active(self.database)) + self.assertEqual(len(parts), 1) + parts[0].detach(self.database) + self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) + + def test_attach_detach(self): + parts = list(SystemPart.get_active(self.database)) + self.assertEqual(len(parts), 1) + parts[0].detach(self.database) + self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) + parts[0].attach(self.database) + self.assertEqual(len(list(SystemPart.get_active(self.database))), 1) + + def test_drop(self): + parts = list(SystemPart.get_active(self.database)) + parts[0].drop(self.database) + self.assertEqual(len(list(SystemPart.get_active(self.database))), 0) + + def test_freeze(self): + parts = list(SystemPart.all(self.database)) + # There can be other backups in the folder + backups_count = self._get_backups_count() + parts[0].freeze(self.database) + backup_number = self._get_backups_count() + self.assertEqual(backup_number, backups_count + 1) + # Clean created backup + shutil.rmtree(self.BACKUP_DIR + '{0}'.format(backup_number)) + + def test_fetch(self): + # TODO Not tested, as I have no replication set + pass + + +class TestTable(Model): + date_field = DateField() + + engine = MergeTree('date_field', ('date_field',))