1) Removed database params for working with SystemPart operations

2) Added _database attribute to each model, got through select
This commit is contained in:
M1ha 2017-02-09 16:49:25 +05:00 committed by Itai Shirav
parent 64f6288fdb
commit f3e75cfae3
4 changed files with 42 additions and 28 deletions

View File

@ -94,7 +94,7 @@ class Database(object):
field_types = parse_tsv(next(lines))
model_class = model_class or ModelBase.create_ad_hoc_model(zip(field_names, field_types))
for line in lines:
yield model_class.from_tsv(line, field_names, self.server_timezone)
yield model_class.from_tsv(line, field_names, self.server_timezone, self)
def raw(self, query, settings=None, stream=False):
"""

View File

@ -79,6 +79,9 @@ class Model(with_metaclass(ModelBase)):
Unrecognized field names will cause an AttributeError.
'''
super(Model, self).__init__()
self._database = None
# Assign field values from keyword arguments
for name, value in kwargs.items():
field = self.get_field(name)
@ -102,6 +105,17 @@ class Model(with_metaclass(ModelBase)):
field.validate(value)
super(Model, self).__setattr__(name, value)
def set_database(self, db):
"""
Sets _database attribute for current model instance
:param db: Database instance
:return: None
"""
# This can not be imported globally due to circular import
from .database import Database
assert isinstance(db, Database), "database must be database.Database instance"
self._database = db
def get_field(self, name):
'''
Get a Field instance given its name, or None if not found.
@ -138,11 +152,12 @@ class Model(with_metaclass(ModelBase)):
return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db_name, cls.table_name())
@classmethod
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc):
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
'''
Create a model instance from a tab-separated line. The line may or may not include a newline.
The field_names list must match the fields defined in the model, but does not have to include all of them.
If omitted, it is assumed to be the names of all fields in the model, in order of definition.
:param database: if given, model receives database
'''
from six import next
field_names = field_names or [name for name, field in cls._fields]
@ -151,7 +166,12 @@ class Model(with_metaclass(ModelBase)):
for name in field_names:
field = getattr(cls, name)
kwargs[name] = field.to_python(next(values), timezone_in_use)
return cls(**kwargs)
obj = cls(**kwargs)
if database is not None:
obj.set_database(database)
return obj
def to_tsv(self, include_readonly=True):
'''

View File

@ -2,10 +2,9 @@
This file contains system readonly models that can be got from database
https://clickhouse.yandex/reference_en.html#System tables
"""
from .database import Database # Can't import it globally, due to circular import
from .database import Database
from .fields import *
from .models import Model
from .engines import MergeTree
class SystemPart(Model):
@ -51,7 +50,7 @@ class SystemPart(Model):
Next methods return SQL for some operations, which can be done with partitions
https://clickhouse.yandex/reference_en.html#Manipulations with partitions and parts
"""
def _partition_operation_sql(self, db, operation, settings=None, from_part=None):
def _partition_operation_sql(self, operation, settings=None, from_part=None):
"""
Performs some operation over partition
:param db: Database object to execute operation on
@ -61,56 +60,51 @@ class SystemPart(Model):
"""
operation = operation.upper()
assert operation in self.OPERATIONS, "operation must be in [%s]" % ', '.join(self.OPERATIONS)
sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (db.db_name, self.table, operation, self.partition)
sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (self._database.db_name, self.table, operation, self.partition)
if from_part is not None:
sql += " FROM %s" % from_part
db.raw(sql, settings=settings, stream=False)
self._database.raw(sql, settings=settings, stream=False)
def detach(self, database, settings=None):
def detach(self, settings=None):
"""
Move a partition to the 'detached' directory and forget it.
:param database: Database object to execute operation on
:param settings: Settings for executing request to ClickHouse over db.raw() method
:return: SQL Query
"""
return self._partition_operation_sql(database, 'DETACH', settings=settings)
return self._partition_operation_sql('DETACH', settings=settings)
def drop(self, database, settings=None):
def drop(self, settings=None):
"""
Delete a partition
:param database: Database object to execute operation on
:param settings: Settings for executing request to ClickHouse over db.raw() method
:return: SQL Query
"""
return self._partition_operation_sql(database, 'DROP', settings=settings)
return self._partition_operation_sql('DROP', settings=settings)
def attach(self, database, settings=None):
def attach(self, settings=None):
"""
Add a new part or partition from the 'detached' directory to the table.
:param database: Database object to execute operation on
:param settings: Settings for executing request to ClickHouse over db.raw() method
:return: SQL Query
"""
return self._partition_operation_sql(database, 'ATTACH', settings=settings)
return self._partition_operation_sql('ATTACH', settings=settings)
def freeze(self, database, settings=None):
def freeze(self, settings=None):
"""
Create a backup of a partition.
:param database: Database object to execute operation on
:param settings: Settings for executing request to ClickHouse over db.raw() method
:return: SQL Query
"""
return self._partition_operation_sql(database, 'FREEZE', settings=settings)
return self._partition_operation_sql('FREEZE', settings=settings)
def fetch(self, database, zookeeper_path, settings=None):
def fetch(self, zookeeper_path, settings=None):
"""
Download a partition from another server.
:param database: Database object to execute operation on
:param zookeeper_path: Path in zookeeper to fetch from
:param settings: Settings for executing request to ClickHouse over db.raw() method
:return: SQL Query
"""
return self._partition_operation_sql(database, 'FETCH', settings=settings, from_part=zookeeper_path)
return self._partition_operation_sql('FETCH', settings=settings, from_part=zookeeper_path)
@classmethod
def get(cls, database, conditions=""):

View File

@ -32,7 +32,7 @@ class SystemPartTest(unittest.TestCase):
def test_get_active(self):
parts = list(SystemPart.get_active(self.database))
self.assertEqual(len(parts), 1)
parts[0].detach(self.database)
parts[0].detach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
def test_get_conditions(self):
@ -44,21 +44,21 @@ class SystemPartTest(unittest.TestCase):
def test_attach_detach(self):
parts = list(SystemPart.get_active(self.database))
self.assertEqual(len(parts), 1)
parts[0].detach(self.database)
parts[0].detach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
parts[0].attach(self.database)
parts[0].attach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 1)
def test_drop(self):
parts = list(SystemPart.get_active(self.database))
parts[0].drop(self.database)
parts[0].drop()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
def test_freeze(self):
parts = list(SystemPart.get(self.database))
# There can be other backups in the folder
prev_backups = set(self._get_backups())
parts[0].freeze(self.database)
parts[0].freeze()
backups = set(self._get_backups())
self.assertEqual(len(backups), len(prev_backups) + 1)
# Clean created backup