Finished Release v0.8.0

Itai Shirav 2017-02-22 10:52:40 +02:00
commit 9a81372fff
16 changed files with 930 additions and 60 deletions

CHANGELOG.rst (new file)

@@ -0,0 +1,31 @@
Change Log
==========
v0.8.0
------
- Always keep datetime fields in UTC internally, and convert values from the server timezone to UTC when parsing query results
- Support for ALIAS and MATERIALIZED fields (M1ha)
- Pagination: passing -1 as the page number now returns the last page
- Accept datetime values for date fields (Zloool)
- Support readonly mode in Database class (tswr)
- Added support for the Buffer table engine (emakarov)
- Added the SystemPart readonly model, which provides operations on partitions (M1ha)
- Added Model.to_dict() that converts a model instance to a dictionary (M1ha)
- Added Database.raw() to perform arbitrary queries (M1ha)
v0.7.1
------
- Accept '0000-00-00 00:00:00' as a datetime value (tsionyx)
- Bug fix: parse_array fails on int arrays
- Improve performance when inserting many rows
v0.7.0
------
- Support array fields
- Support enum fields
v0.6.3
------
- Python 3 support


@@ -31,6 +31,8 @@ Models are defined in a way reminiscent of Django's ORM::
engine = engines.MergeTree('birthday', ('first_name', 'last_name', 'birthday'))
It is possible to provide a default value for a field, instead of its "natural" default (empty string for string fields, zero for numeric fields etc.).
Alternatively it is possible to pass ``alias`` or ``materialized`` parameters (see below for usage examples).
Only one of ``default``, ``alias`` and ``materialized`` parameters can be provided.
See below for the supported field types and table engines.
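For example, a field with a custom default value (a minimal sketch, extending the ``Person`` model from above)::

    class Person(models.Model):

        first_name = fields.StringField(default="anonymous")
        last_name = fields.StringField()
        birthday = fields.DateField()
        height = fields.Float32Field()

        engine = engines.MergeTree('birthday', ('first_name', 'last_name', 'birthday'))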
@@ -90,6 +92,11 @@ Using the ``Database`` instance you can create a table for your model, and insert
The ``insert`` method can take any iterable of model instances, but they all must belong to the same model class.
Creating a read-only database is also supported. Such a ``Database`` instance can only read data, and cannot
modify data or schemas::
db = Database('my_test_db', readonly=True)
Reading from the Database
-------------------------
@@ -152,7 +159,7 @@ Pagination
It is possible to paginate through model instances::
>>> order_by = 'first_name, last_name'
>>> page = db.paginate(Person, order_by, page_num=1, page_size=100)
>>> page = db.paginate(Person, order_by, page_num=1, page_size=10)
>>> print page.number_of_objects
2507
>>> print page.pages_total
@@ -175,6 +182,58 @@ You can optionally pass conditions to the query::
Note that ``order_by`` must be chosen so that the ordering is unique, otherwise there might be
inconsistencies in the pagination (such as an instance that appears on two different pages).
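It is also possible to iterate over all pages (a minimal sketch; ``page.objects`` holds the instances on the page, and ``page.number`` and ``page.pages_total`` are exposed by the returned ``Page`` object)::

    page_num = 1
    while True:
        page = db.paginate(Person, order_by, page_num=page_num, page_size=100)
        for person in page.objects:
            print person.first_name
        if page.number >= page.pages_total:
            break
        page_num += 1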
System models
-------------
`ClickHouse docs <https://clickhouse.yandex/reference_en.html#System tables>`_.
System models are read-only models that implement part of the system's functionality,
and provide access to information about how the system is working.
Currently the following system models are supported:
=================== ============ ===================================================
Class DB Table Comments
=================== ============ ===================================================
SystemPart system.parts Gives methods to work with partitions. See below.
=================== ============ ===================================================
Partitions and parts
--------------------
`ClickHouse docs <https://clickhouse.yandex/reference_en.html#Manipulations with partitions and parts>`_.
A partition in a table is data for a single calendar month. The ``system.parts`` table contains information about each part.
=================== ============================= ======================================================================================================
Method              Parameters                    Comments
=================== ============================= ======================================================================================================
get (static)        database, conditions=""       Gets database partitions, filtered by conditions
get_active (static) database, conditions=""       Gets only active (not detached or dropped) partitions, filtered by conditions
detach              settings=None                 Detaches the partition. ``settings`` is a dict of params to pass with the HTTP request
drop                settings=None                 Drops the partition. ``settings`` is a dict of params to pass with the HTTP request
attach              settings=None                 Attaches an already detached partition. ``settings`` is a dict of params to pass with the HTTP request
freeze              settings=None                 Freezes (makes a backup of) the partition. ``settings`` is a dict of params to pass with the HTTP request
fetch               zookeeper_path, settings=None Fetches the partition from another server, given its path in ZooKeeper
=================== ============================= ======================================================================================================
Usage example::

    from infi.clickhouse_orm.database import Database
    from infi.clickhouse_orm.system_models import SystemPart

    db = Database('my_test_db', db_url='http://192.168.1.1:8050', username='scott', password='tiger')
    partitions = SystemPart.get_active(db, conditions='')  # Get all active partitions of the database
    if len(partitions) > 0:
        partitions = sorted(partitions, key=lambda obj: obj.name)  # Partition names are YYYYMM, so they sort chronologically
        partitions[0].freeze()  # Make a backup in the /opt/clickhouse/shadow directory
        partitions[0].drop()    # Drop the partition
``Note``: system.parts stores information about the parts of all databases. To keep things correct,
the ``SystemPart`` model fetches only the parts that belong to the given database.
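The ``conditions`` parameter accepts an arbitrary SQL filter over the ``system.parts`` columns, for example (a short sketch; the table name is illustrative)::

    # Get only the active parts of a specific table
    parts = SystemPart.get_active(db, conditions="table='event'")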
Schema Migrations
-----------------
@@ -189,26 +248,44 @@ Field Types
Currently the following field types are supported:
=================== ======== ================= ===================================================
Class DB Type Pythonic Type Comments
=================== ======== ================= ===================================================
StringField String unicode Encoded as UTF-8 when written to ClickHouse
DateField Date datetime.date Range 1970-01-01 to 2038-01-19
DateTimeField DateTime datetime.datetime Minimal value is 1970-01-01 00:00:00; Always in UTC
Int8Field Int8 int Range -128 to 127
Int16Field Int16 int Range -32768 to 32767
Int32Field Int32 int Range -2147483648 to 2147483647
Int64Field Int64 int/long Range -9223372036854775808 to 9223372036854775807
UInt8Field UInt8 int Range 0 to 255
UInt16Field UInt16 int Range 0 to 65535
UInt32Field UInt32 int Range 0 to 4294967295
UInt64Field UInt64 int/long Range 0 to 18446744073709551615
Float32Field Float32 float
Float64Field Float64 float
Enum8Field Enum8 Enum See below
Enum16Field Enum16 Enum See below
ArrayField Array list See below
=================== ======== ================= ===================================================
DateTimeField and Time Zones
****************************
A ``DateTimeField`` can be assigned values from one of the following types:
- datetime
- date
- integer - number of seconds since the Unix epoch
- string in ``YYYY-MM-DD HH:MM:SS`` format
The assigned value always gets converted to a timezone-aware ``datetime`` in UTC. If the assigned
value is a timezone-aware ``datetime`` in another timezone, it will be converted to UTC. Otherwise, the assigned value is assumed to already be in UTC.
DateTime values that are read from the database are also converted to UTC. ClickHouse formats them according to the
timezone of the server, and the ORM makes the necessary conversions. This requires a ClickHouse version that is new
enough to support the ``timezone()`` function; otherwise UTC is assumed. In any case, we recommend
setting the server timezone to UTC in order to prevent confusion.
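For example, all of the following assign the same point in time (a minimal sketch; ``event`` is a hypothetical instance of a model with a ``created`` DateTimeField)::

    from datetime import datetime
    import pytz

    event.created = datetime(2017, 2, 22, 10, 52, 40, tzinfo=pytz.utc)  # aware datetime, kept as is
    event.created = datetime(2017, 2, 22, 10, 52, 40)                   # naive datetime, assumed to be in UTC
    event.created = 1487760760                                          # seconds since the Unix epoch
    event.created = '2017-02-22 10:52:40'                               # parsed as UTC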
Working with enum fields
************************
@@ -249,6 +326,38 @@ You can create array fields containing any data type, for example::
data = SensorData(date=date.today(), temperatures=[25.5, 31.2, 28.7], humidity_levels=[41, 39, 66])
Working with materialized and alias fields
******************************************
ClickHouse supports MATERIALIZED and ALIAS fields.
See the documentation `here <https://clickhouse.yandex/reference_en.html#Default values>`_.
Fields of both types cannot be inserted into the database directly, so they are ignored when using the ``Database.insert()`` method.
Also, ClickHouse does not return their values if you use ``"SELECT * FROM ..."`` - you have to list these field
names explicitly in the query.
Usage::
class Event(models.Model):
created = fields.DateTimeField()
created_date = fields.DateField(materialized='toDate(created)')
name = fields.StringField()
username = fields.StringField(alias='name')
engine = engines.MergeTree('created_date', ('created_date', 'created'))
obj = Event(created=datetime.now(), name='MyEvent')
db = Database('my_test_db')
db.insert([obj])
# All values will be retrieved from the database
db.select('SELECT created, created_date, username, name FROM $db.event', model_class=Event)
# created_date and username will contain default values
db.select('SELECT * FROM $db.event', model_class=Event)
Table Engines
-------------
@@ -271,6 +380,30 @@ For a ``SummingMergeTree`` you can optionally specify the summing columns::
engine = engines.SummingMergeTree('EventDate', ('OrderID', 'EventDate', 'BannerID'),
summing_cols=('Shows', 'Clicks', 'Cost'))
A ``Buffer`` engine is available for buffer models (see below for how to use ``BufferModel``). You can specify the following parameters::

    engine = engines.Buffer(Person)  # you need to initialize the engine with the main model; other parameters get default values
    # or:
    engine = engines.Buffer(Person, num_layers=16, min_time=10,
                            max_time=100, min_rows=10000, max_rows=1000000,
                            min_bytes=10000000, max_bytes=100000000)
Buffer Models
-------------
Here's how you can define a model for the ``Buffer`` engine. The buffer model should inherit from ``models.BufferModel`` and from the main model::
class PersonBuffer(models.BufferModel, Person):
engine = engines.Buffer(Person)
Then you can insert objects into the buffer model, and ClickHouse will handle them properly::
db.create_table(PersonBuffer)
suzy = PersonBuffer(first_name='Suzy', last_name='Jones')
dan = PersonBuffer(first_name='Dan', last_name='Schwartz')
db.insert([dan, suzy])
Data Replication
****************
@@ -291,4 +424,9 @@ After cloning the project, run the following commands::
To run the tests, ensure that the ClickHouse server is running on http://localhost:8123/ (this is the default), and run::
bin/nosetests
To see test coverage information run::
bin/nosetests --with-coverage --cover-package=infi.clickhouse_orm


@@ -4,9 +4,12 @@ from .models import ModelBase
from .utils import escape, parse_tsv, import_submodules
from math import ceil
import datetime
import logging
from string import Template
from six import PY3, string_types
import pytz
import logging
logger = logging.getLogger('clickhouse_orm')
Page = namedtuple('Page', 'objects number_of_objects pages_total number page_size')
@@ -26,6 +29,7 @@ class Database(object):
self.readonly = readonly
if not self.readonly:
self.create_database()
self.server_timezone = self._get_server_timezone()
def create_database(self):
self._send('CREATE DATABASE IF NOT EXISTS `%s`' % self.db_name)
@@ -35,9 +39,13 @@ class Database(object):
def create_table(self, model_class):
# TODO check that model has an engine
if model_class.readonly:
raise DatabaseException("You can't create read only table")
self._send(model_class.create_table_sql(self.db_name))
def drop_table(self, model_class):
if model_class.readonly:
raise DatabaseException("You can't drop read only table")
self._send(model_class.drop_table_sql(self.db_name))
def insert(self, model_instances, batch_size=1000):
@@ -48,13 +56,19 @@ class Database(object):
except StopIteration:
return # model_instances is empty
model_class = first_instance.__class__
if first_instance.readonly:
raise DatabaseException("You can't insert into read only table")
def gen():
yield self._substitute('INSERT INTO $table FORMAT TabSeparated\n', model_class).encode('utf-8')
yield (first_instance.to_tsv() + '\n').encode('utf-8')
first_instance.set_database(self)
yield (first_instance.to_tsv(include_readonly=False) + '\n').encode('utf-8')
# Collect lines in batches of batch_size
batch = []
for instance in i:
batch.append(instance.to_tsv())
instance.set_database(self)
batch.append(instance.to_tsv(include_readonly=False))
if len(batch) >= batch_size:
# Return the current batch of lines
yield ('\n'.join(batch) + '\n').encode('utf-8')
@@ -82,7 +96,18 @@ class Database(object):
field_types = parse_tsv(next(lines))
model_class = model_class or ModelBase.create_ad_hoc_model(zip(field_names, field_types))
for line in lines:
yield model_class.from_tsv(line, field_names)
yield model_class.from_tsv(line, field_names, self.server_timezone, self)
def raw(self, query, settings=None, stream=False):
"""
Performs a raw query against the database and returns its output
:param query: Query to execute
:param settings: Query settings to send as HTTP GET parameters
:param stream: If True, the HTTP response from ClickHouse will be streamed
:return: The query execution result as text
"""
query = self._substitute(query, None)
return self._send(query, settings=settings, stream=stream).text
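# Usage sketch (illustrative; the table name is hypothetical):
#   db = Database('my_test_db')
#   print(db.raw("SELECT count() FROM $db.person"))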
def paginate(self, model_class, order_by, page_num=1, page_size=100, conditions=None, settings=None):
count = self.count(model_class, conditions)
@@ -142,6 +167,8 @@ class Database(object):
params['user'] = self.username
if self.password:
params['password'] = self.password
if self.readonly:
params['readonly'] = '1'
return params
def _substitute(self, query, model_class=None):
@@ -154,3 +181,11 @@ class Database(object):
mapping['table'] = "`%s`.`%s`" % (self.db_name, model_class.table_name())
query = Template(query).substitute(mapping)
return query
def _get_server_timezone(self):
try:
r = self._send('SELECT timezone()')
return pytz.timezone(r.text.strip())
except DatabaseException:
logger.exception('Cannot determine server timezone, assuming UTC')
return pytz.utc


@@ -62,3 +62,30 @@ class SummingMergeTree(MergeTree):
params.append('(%s)' % ', '.join(self.summing_cols))
return params
class Buffer(Engine):
"""Here we define Buffer engine
Read more here https://clickhouse.yandex/reference_en.html#Buffer
"""
#Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
def __init__(self, main_model, num_layers=16, min_time=10, max_time=100, min_rows=10000, max_rows=1000000, min_bytes=10000000, max_bytes=100000000):
self.main_model = main_model
self.num_layers = num_layers
self.min_time = min_time
self.max_time = max_time
self.min_rows = min_rows
self.max_rows = max_rows
self.min_bytes = min_bytes
self.max_bytes = max_bytes
def create_table_sql(self, db_name):
# Example of the SQL generated by this overridden create_table_sql:
# 'ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)'
sql = 'ENGINE = Buffer(`%s`, `%s`, %d, %d, %d, %d, %d, %d, %d)' % (
db_name, self.main_model.table_name(), self.num_layers,
self.min_time, self.max_time, self.min_rows,
self.max_rows, self.min_bytes, self.max_bytes
)
return sql


@@ -2,6 +2,7 @@ from six import string_types, text_type, binary_type
import datetime
import pytz
import time
from calendar import timegm
from .utils import escape, parse_array
@@ -12,15 +13,25 @@ class Field(object):
class_default = 0
db_type = None
def __init__(self, default=None):
def __init__(self, default=None, alias=None, materialized=None):
assert (None, None) in {(default, alias), (alias, materialized), (default, materialized)}, \
"Only one of default, alias and materialized parameters can be given"
assert alias is None or isinstance(alias, str) and alias != "",\
"Alias field must be a string field name, if given"
assert materialized is None or isinstance(materialized, str) and materialized != "",\
"Materialized field must be a string, if given"
self.creation_counter = Field.creation_counter
Field.creation_counter += 1
self.default = self.class_default if default is None else default
self.alias = alias
self.materialized = materialized
def to_python(self, value):
def to_python(self, value, timezone_in_use):
'''
Converts the input value into the expected Python data type, raising ValueError if the
data can't be converted. Returns the converted value. Subclasses should override this.
The timezone_in_use parameter should be consulted when parsing datetime fields.
'''
return value
@@ -48,20 +59,30 @@ class Field(object):
def get_sql(self, with_default=True):
'''
Returns an SQL expression describing the field (e.g. for CREATE TABLE).
:param with_default: If True, adds the default value to the SQL.
This parameter does not affect fields with alias or materialized values.
'''
if with_default:
if self.alias:
return '%s ALIAS %s' % (self.db_type, self.alias)
elif self.materialized:
return '%s MATERIALIZED %s' % (self.db_type, self.materialized)
elif with_default:
default = self.to_db_string(self.default)
return '%s DEFAULT %s' % (self.db_type, default)
else:
return self.db_type
@property
def readonly(self):
return bool(self.alias or self.materialized)
class StringField(Field):
class_default = ''
db_type = 'String'
def to_python(self, value):
def to_python(self, value, timezone_in_use):
if isinstance(value, text_type):
return value
if isinstance(value, binary_type):
@@ -76,11 +97,11 @@ class DateField(Field):
class_default = min_value
db_type = 'Date'
def to_python(self, value):
def to_python(self, value, timezone_in_use):
if isinstance(value, datetime.datetime):
return value.astimezone(pytz.utc).date() if value.tzinfo else value.date()
if isinstance(value, datetime.date):
return value
if isinstance(value, datetime.datetime):
return value.date()
if isinstance(value, int):
return DateField.class_default + datetime.timedelta(days=value)
if isinstance(value, string_types):
@@ -101,26 +122,27 @@ class DateTimeField(Field):
class_default = datetime.datetime.fromtimestamp(0, pytz.utc)
db_type = 'DateTime'
def to_python(self, value):
def to_python(self, value, timezone_in_use):
if isinstance(value, datetime.datetime):
return value
return value.astimezone(pytz.utc) if value.tzinfo else value.replace(tzinfo=pytz.utc)
if isinstance(value, datetime.date):
return datetime.datetime(value.year, value.month, value.day)
return datetime.datetime(value.year, value.month, value.day, tzinfo=pytz.utc)
if isinstance(value, int):
return datetime.datetime.fromtimestamp(value, pytz.utc)
return datetime.datetime.utcfromtimestamp(value).replace(tzinfo=pytz.utc)
if isinstance(value, string_types):
if value == '0000-00-00 00:00:00':
return self.class_default
return datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
dt = datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
return timezone_in_use.localize(dt).astimezone(pytz.utc)
raise ValueError('Invalid value for %s - %r' % (self.__class__.__name__, value))
def to_db_string(self, value, quote=True):
return escape(int(time.mktime(value.timetuple())), quote)
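# Note: timegm() treats the time tuple as UTC, whereas the mktime()-based version above assumed the local timezone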
return escape(timegm(value.utctimetuple()), quote)
class BaseIntField(Field):
def to_python(self, value):
def to_python(self, value, timezone_in_use):
try:
return int(value)
except:
@@ -188,7 +210,7 @@ class Int64Field(BaseIntField):
class BaseFloatField(Field):
def to_python(self, value):
def to_python(self, value, timezone_in_use):
try:
return float(value)
except:
@@ -207,13 +229,13 @@ class Float64Field(BaseFloatField):
class BaseEnumField(Field):
def __init__(self, enum_cls, default=None):
def __init__(self, enum_cls, default=None, alias=None, materialized=None):
self.enum_cls = enum_cls
if default is None:
default = list(enum_cls)[0]
super(BaseEnumField, self).__init__(default)
super(BaseEnumField, self).__init__(default, alias, materialized)
def to_python(self, value):
def to_python(self, value, timezone_in_use):
if isinstance(value, self.enum_cls):
return value
try:
@@ -271,18 +293,18 @@ class ArrayField(Field):
class_default = []
def __init__(self, inner_field, default=None):
def __init__(self, inner_field, default=None, alias=None, materialized=None):
self.inner_field = inner_field
super(ArrayField, self).__init__(default)
super(ArrayField, self).__init__(default, alias, materialized)
def to_python(self, value):
def to_python(self, value, timezone_in_use):
if isinstance(value, text_type):
value = parse_array(value)
elif isinstance(value, binary_type):
value = parse_array(value.decode('UTF-8'))
elif not isinstance(value, (list, tuple)):
raise ValueError('ArrayField expects list or tuple, not %s' % type(value))
return [self.inner_field.to_python(v) for v in value]
return [self.inner_field.to_python(v, timezone_in_use) for v in value]
def validate(self, value):
for v in value:
@@ -295,3 +317,4 @@ class ArrayField(Field):
def get_sql(self, with_default=True):
from .utils import escape
return 'Array(%s)' % self.inner_field.get_sql(with_default=False)


@@ -1,10 +1,11 @@
from .utils import escape, parse_tsv
from .engines import *
from .fields import Field
from logging import getLogger
from six import with_metaclass
import pytz
from .fields import Field
from .utils import parse_tsv
from logging import getLogger
logger = getLogger('clickhouse_orm')
@@ -68,6 +69,7 @@ class Model(with_metaclass(ModelBase)):
'''
engine = None
readonly = False
def __init__(self, **kwargs):
'''
@@ -77,6 +79,9 @@ class Model(with_metaclass(ModelBase)):
Unrecognized field names will cause an AttributeError.
'''
super(Model, self).__init__()
self._database = None
# Assign field values from keyword arguments
for name, value in kwargs.items():
field = self.get_field(name)
@@ -96,10 +101,28 @@
'''
field = self.get_field(name)
if field:
value = field.to_python(value)
value = field.to_python(value, pytz.utc)
field.validate(value)
super(Model, self).__setattr__(name, value)
def set_database(self, db):
"""
Sets _database attribute for current model instance
:param db: Database instance
:return: None
"""
# This cannot be imported globally due to a circular import
from .database import Database
assert isinstance(db, Database), "database must be database.Database instance"
self._database = db
def get_database(self):
"""
Gets the _database attribute of the current model instance
:return: the database.Database instance this model was inserted into or selected from, or None
"""
return self._database
def get_field(self, name):
'''
Get a Field instance given its name, or None if not found.
@@ -136,23 +159,59 @@
return 'DROP TABLE IF EXISTS `%s`.`%s`' % (db_name, cls.table_name())
@classmethod
def from_tsv(cls, line, field_names=None):
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
'''
Create a model instance from a tab-separated line. The line may or may not include a newline.
The field_names list must match the fields defined in the model, but does not have to include all of them.
If omitted, it is assumed to be the names of all fields in the model, in order of definition.
:param database: if given, the model's database is set to this instance
'''
from six import next
field_names = field_names or [name for name, field in cls._fields]
values = iter(parse_tsv(line))
kwargs = {}
for name in field_names:
kwargs[name] = next(values)
return cls(**kwargs)
field = getattr(cls, name)
kwargs[name] = field.to_python(next(values), timezone_in_use)
def to_tsv(self):
obj = cls(**kwargs)
if database is not None:
obj.set_database(database)
return obj
def to_tsv(self, include_readonly=True):
'''
Returns the instance's column values as a tab-separated line. A newline is not included.
:param bool include_readonly: If False, returns only fields that can be inserted into the database
'''
data = self.__dict__
return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in self._fields)
fields = self._fields if include_readonly else [f for f in self._fields if not f[1].readonly]
return '\t'.join(field.to_db_string(data[name], quote=False) for name, field in fields)
def to_dict(self, include_readonly=True, field_names=None):
'''
Returns the instance's column values as a dict.
:param bool include_readonly: If False, returns only fields that can be inserted into the database
:param field_names: An iterable of field names to return
'''
fields = self._fields if include_readonly else [f for f in self._fields if not f[1].readonly]
if field_names is not None:
fields = [f for f in fields if f[0] in field_names]
data = self.__dict__
return {name: data[name] for name, field in fields}
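# Usage sketch (illustrative; SimpleModel is a hypothetical model):
#   >>> SimpleModel(int_field=100).to_dict(include_readonly=False, field_names=('int_field',))
#   {'int_field': 100}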
class BufferModel(Model):
@classmethod
def create_table_sql(cls, db_name):
'''
Returns the SQL command for creating a table for this model.
'''
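# The generated SQL looks like this (names are illustrative):
#   CREATE TABLE IF NOT EXISTS `db`.`personbuffer` AS `db`.`person` ENGINE = Buffer(`db`, `person`, 16, 10, 100, 10000, 1000000, 10000000, 100000000)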
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db_name, cls.table_name(), db_name, cls.engine.main_model.table_name())]
engine_str = cls.engine.create_table_sql(db_name)
parts.append(engine_str)
return ' '.join(parts)


@@ -0,0 +1,136 @@
"""
This file contains read-only system models that can be queried from the database
https://clickhouse.yandex/reference_en.html#System tables
"""
from .database import Database
from .fields import *
from .models import Model
class SystemPart(Model):
"""
Contains information about parts of a table in the MergeTree family.
This model includes only the fields described in the reference; other fields are ignored.
https://clickhouse.yandex/reference_en.html#system.parts
"""
OPERATIONS = frozenset({'DETACH', 'DROP', 'ATTACH', 'FREEZE', 'FETCH'})
readonly = True
database = StringField() # Name of the database where the table that this part belongs to is located.
table = StringField() # Name of the table that this part belongs to.
engine = StringField() # Name of the table engine, without parameters.
partition = StringField() # Name of the partition, in the format YYYYMM.
name = StringField() # Name of the part.
replicated = UInt8Field() # Whether the part belongs to replicated data.
# Whether the part is used in a table, or is no longer needed and will be deleted soon.
# Inactive parts remain after merging.
active = UInt8Field()
# Number of marks - multiply by the index granularity (usually 8192)
# to get the approximate number of rows in the part.
marks = UInt64Field()
bytes = UInt64Field() # Number of bytes when compressed.
# Time the directory with the part was modified. Usually corresponds to the part's creation time.
modification_time = DateTimeField()
remove_time = DateTimeField() # For inactive parts only - the time when the part became inactive.
# The number of places where the part is used. A value greater than 2 indicates
# that this part participates in queries or merges.
refcount = UInt32Field()
@classmethod
def table_name(cls):
return 'system.parts'
"""
The following methods execute SQL for operations that can be performed on partitions
https://clickhouse.yandex/reference_en.html#Manipulations with partitions and parts
"""
def _partition_operation_sql(self, operation, settings=None, from_part=None):
"""
Performs an operation on this partition
:param operation: Operation to execute, from the SystemPart.OPERATIONS set
:param settings: Settings for executing the request to ClickHouse over the db.raw() method
:param from_part: Source path for the FETCH operation, if any
:return: Operation execution result
"""
operation = operation.upper()
assert operation in self.OPERATIONS, "operation must be in [%s]" % ', '.join(self.OPERATIONS)
sql = "ALTER TABLE `%s`.`%s` %s PARTITION '%s'" % (self._database.db_name, self.table, operation, self.partition)
if from_part is not None:
sql += " FROM %s" % from_part
return self._database.raw(sql, settings=settings, stream=False)
def detach(self, settings=None):
"""
Move a partition to the 'detached' directory and forget it.
:param settings: Settings for executing request to ClickHouse over db.raw() method
:return: Operation execution result
"""
return self._partition_operation_sql('DETACH', settings=settings)
def drop(self, settings=None):
"""
Delete a partition
:param settings: Settings for executing request to ClickHouse over db.raw() method
:return: Operation execution result
"""
return self._partition_operation_sql('DROP', settings=settings)
def attach(self, settings=None):
"""
Add a new part or partition from the 'detached' directory to the table.
:param settings: Settings for executing request to ClickHouse over db.raw() method
:return: Operation execution result
"""
return self._partition_operation_sql('ATTACH', settings=settings)
def freeze(self, settings=None):
"""
Create a backup of a partition.
:param settings: Settings for executing request to ClickHouse over db.raw() method
:return: Operation execution result
"""
return self._partition_operation_sql('FREEZE', settings=settings)
def fetch(self, zookeeper_path, settings=None):
"""
Download a partition from another server.
:param zookeeper_path: Path in zookeeper to fetch from
:param settings: Settings for executing request to ClickHouse over db.raw() method
:return: Operation execution result
"""
return self._partition_operation_sql('FETCH', settings=settings, from_part=zookeeper_path)
@classmethod
def get(cls, database, conditions=""):
"""
Get all data from system.parts table
:param database: A database object to fetch data from.
:param conditions: WHERE clause conditions. Database condition is added automatically
:return: A list of SystemPart objects
"""
assert isinstance(database, Database), "database must be database.Database class instance"
assert isinstance(conditions, str), "conditions must be a string"
if conditions:
conditions += " AND"
field_names = ','.join([f[0] for f in cls._fields])
return database.select("SELECT %s FROM %s WHERE %s database='%s'" %
(field_names, cls.table_name(), conditions, database.db_name), model_class=cls)
@classmethod
def get_active(cls, database, conditions=""):
"""
Gets active data from system.parts table
:param database: A database object to fetch data from.
:param conditions: WHERE clause conditions. Database and active conditions are added automatically
:return: A list of SystemPart objects
"""
if conditions:
conditions += ' AND '
conditions += 'active'
return SystemPart.get(database, conditions=conditions)


@@ -0,0 +1,6 @@
from infi.clickhouse_orm import migrations
from ..test_migrations import *
operations = [
migrations.CreateTable(MaterializedModel)
]


@@ -0,0 +1,6 @@
from infi.clickhouse_orm import migrations
from ..test_migrations import *
operations = [
migrations.CreateTable(AliasModel)
]


@@ -0,0 +1,69 @@
import unittest
from datetime import date
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model
from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.engines import *
class AliasFieldsTest(unittest.TestCase):
def setUp(self):
self.database = Database('test-db')
self.database.create_table(ModelWithAliasFields)
def tearDown(self):
self.database.drop_database()
def test_insert_and_select(self):
instance = ModelWithAliasFields(
date_field='2016-08-30',
int_field=-10,
str_field='TEST'
)
self.database.insert([instance])
# We can't select * from table, as it doesn't select materialized and alias fields
query = 'SELECT date_field, int_field, str_field, alias_int, alias_date, alias_str' \
' FROM $db.%s ORDER BY alias_date' % ModelWithAliasFields.table_name()
for model_cls in (ModelWithAliasFields, None):
results = list(self.database.select(query, model_cls))
self.assertEquals(len(results), 1)
self.assertEquals(results[0].date_field, instance.date_field)
self.assertEquals(results[0].int_field, instance.int_field)
self.assertEquals(results[0].str_field, instance.str_field)
self.assertEquals(results[0].alias_int, instance.int_field)
self.assertEquals(results[0].alias_str, instance.str_field)
self.assertEquals(results[0].alias_date, instance.date_field)
def test_assignment_error(self):
# Assignment cannot be prevented entirely, since db.select with a model class also assigns to these fields
instance = ModelWithAliasFields()
for value in ('x', [date.today()], ['aaa'], [None]):
with self.assertRaises(ValueError):
instance.alias_date = value
def test_wrong_field(self):
with self.assertRaises(AssertionError):
StringField(alias=123)
def test_duplicate_default(self):
with self.assertRaises(AssertionError):
StringField(alias='str_field', default='with default')
with self.assertRaises(AssertionError):
StringField(alias='str_field', materialized='str_field')
class ModelWithAliasFields(Model):
int_field = Int32Field()
date_field = DateField()
str_field = StringField()
alias_str = StringField(alias='str_field')
alias_int = Int32Field(alias='int_field')
alias_date = DateField(alias='date_field')
engine = MergeTree('date_field', ('date_field',))


@@ -2,8 +2,8 @@
import unittest
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model
from infi.clickhouse_orm.database import Database, DatabaseException
from infi.clickhouse_orm.models import Model, BufferModel
from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.engines import *
@@ -16,14 +16,22 @@ class DatabaseTestCase(unittest.TestCase):
def setUp(self):
self.database = Database('test-db')
self.database.create_table(Person)
self.database.create_table(PersonBuffer)
def tearDown(self):
self.database.drop_table(PersonBuffer)
self.database.drop_table(Person)
self.database.drop_database()
def _insert_and_check(self, data, count):
self.database.insert(data)
self.assertEquals(count, self.database.count(Person))
for instance in data:
self.assertEquals(self.database, instance.get_database())
def _insert_and_check_buffer(self, data, count):
self.database.insert(data)
self.assertEquals(count, self.database.count(PersonBuffer))
def test_insert__generator(self):
self._insert_and_check(self._sample_data(), len(data))
@@ -53,6 +61,8 @@ class DatabaseTestCase(unittest.TestCase):
self.assertEquals(results[0].height, 1.72)
self.assertEquals(results[1].last_name, 'Scott')
self.assertEquals(results[1].height, 1.70)
self.assertEqual(results[0].get_database(), self.database)
self.assertEqual(results[1].get_database(), self.database)
def test_select_partial_fields(self):
self._insert_and_check(self._sample_data(), len(data))
@@ -63,6 +73,8 @@ class DatabaseTestCase(unittest.TestCase):
self.assertEquals(results[0].height, 0) # default value
self.assertEquals(results[1].last_name, 'Scott')
self.assertEquals(results[1].height, 0) # default value
self.assertEqual(results[0].get_database(), self.database)
self.assertEqual(results[1].get_database(), self.database)
def test_select_ad_hoc_model(self):
self._insert_and_check(self._sample_data(), len(data))
@@ -74,6 +86,8 @@ class DatabaseTestCase(unittest.TestCase):
self.assertEquals(results[0].height, 1.72)
self.assertEquals(results[1].last_name, 'Scott')
self.assertEquals(results[1].height, 1.70)
self.assertEqual(results[0].get_database(), self.database)
self.assertEqual(results[1].get_database(), self.database)
def test_pagination(self):
self._insert_and_check(self._sample_data(), len(data))
@@ -117,10 +131,49 @@ class DatabaseTestCase(unittest.TestCase):
p = list(self.database.select("SELECT * from $table", Person))[0]
self.assertEquals(p.first_name, s)
def test_readonly(self):
orig_database = self.database
self.database = Database(orig_database.db_name, readonly=True)
with self.assertRaises(DatabaseException):
self._insert_and_check(self._sample_data(), len(data))
self.assertEquals(self.database.count(Person), 0)
with self.assertRaises(DatabaseException):
self.database.drop_table(Person)
with self.assertRaises(DatabaseException):
self.database.drop_database()
self.database = orig_database
def test_insert_buffer(self):
self._insert_and_check_buffer(self._sample_buffer_data(), len(data))
def _sample_data(self):
for entry in data:
yield Person(**entry)
def test_raw(self):
self._insert_and_check(self._sample_data(), len(data))
query = "SELECT * FROM `test-db`.person WHERE first_name = 'Whitney' ORDER BY last_name"
results = self.database.raw(query)
self.assertEqual(results, "Whitney\tDurham\t1977-09-15\t1.72\nWhitney\tScott\t1971-07-04\t1.7\n")
def test_insert_readonly(self):
m = ReadOnlyModel(name='readonly')
with self.assertRaises(DatabaseException):
self.database.insert([m])
def test_create_readonly_table(self):
with self.assertRaises(DatabaseException):
self.database.create_table(ReadOnlyModel)
def test_drop_readonly_table(self):
with self.assertRaises(DatabaseException):
self.database.drop_table(ReadOnlyModel)
def _sample_buffer_data(self):
for entry in data:
yield PersonBuffer(**entry)
class Person(Model):
@@ -132,6 +185,18 @@ class Person(Model):
engine = MergeTree('birthday', ('first_name', 'last_name', 'birthday'))
class ReadOnlyModel(Model):
readonly = True
name = StringField()
class PersonBuffer(BufferModel, Person):
engine = Buffer(Person)
data = [
{"first_name": "Abdul", "last_name": "Hester", "birthday": "1970-12-02", "height": "1.63"},
{"first_name": "Adam", "last_name": "Goodman", "birthday": "1986-01-07", "height": "1.74"},


@@ -0,0 +1,69 @@
import unittest
from datetime import date
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model
from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.engines import *
class MaterializedFieldsTest(unittest.TestCase):
def setUp(self):
self.database = Database('test-db')
self.database.create_table(ModelWithMaterializedFields)
def tearDown(self):
self.database.drop_database()
def test_insert_and_select(self):
instance = ModelWithMaterializedFields(
date_time_field='2016-08-30 11:00:00',
int_field=-10,
str_field='TEST'
)
self.database.insert([instance])
# We can't select * from table, as it doesn't select materialized and alias fields
query = 'SELECT date_time_field, int_field, str_field, mat_int, mat_date, mat_str' \
' FROM $db.%s ORDER BY mat_date' % ModelWithMaterializedFields.table_name()
for model_cls in (ModelWithMaterializedFields, None):
results = list(self.database.select(query, model_cls))
self.assertEquals(len(results), 1)
self.assertEquals(results[0].date_time_field, instance.date_time_field)
self.assertEquals(results[0].int_field, instance.int_field)
self.assertEquals(results[0].str_field, instance.str_field)
self.assertEquals(results[0].mat_int, abs(instance.int_field))
self.assertEquals(results[0].mat_str, instance.str_field.lower())
self.assertEquals(results[0].mat_date, instance.date_time_field.date())
def test_assignment_error(self):
# Assignment cannot be prevented entirely, since db.select with a model class also assigns to these fields
instance = ModelWithMaterializedFields()
for value in ('x', [date.today()], ['aaa'], [None]):
with self.assertRaises(ValueError):
instance.mat_date = value
def test_wrong_field(self):
with self.assertRaises(AssertionError):
StringField(materialized=123)
def test_duplicate_default(self):
with self.assertRaises(AssertionError):
StringField(materialized='str_field', default='with default')
with self.assertRaises(AssertionError):
StringField(materialized='str_field', alias='str_field')
class ModelWithMaterializedFields(Model):
int_field = Int32Field()
date_time_field = DateTimeField()
str_field = StringField()
mat_str = StringField(materialized='lower(str_field)')
mat_int = Int32Field(materialized='abs(int_field)')
mat_date = DateField(materialized='toDate(date_time_field)')
engine = MergeTree('mat_date', ('mat_date',))


@@ -60,6 +60,15 @@ class MigrationsTestCase(unittest.TestCase):
self.assertTrue(self.tableExists(EnumModel1))
self.assertEquals(self.getTableFields(EnumModel2),
[('date', 'Date'), ('f1', "Enum16('dog' = 1, 'cat' = 2, 'horse' = 3, 'pig' = 4)")])
self.database.migrate('tests.sample_migrations', 8)
self.assertTrue(self.tableExists(MaterializedModel))
self.assertEquals(self.getTableFields(MaterializedModel),
[('date_time', "DateTime"), ('date', 'Date')])
self.database.migrate('tests.sample_migrations', 9)
self.assertTrue(self.tableExists(AliasModel))
self.assertEquals(self.getTableFields(AliasModel),
[('date', 'Date'), ('date_alias', "Date")])
# Several different models with the same table name, to simulate a table that changes over time
@@ -127,3 +136,25 @@ class EnumModel2(Model):
@classmethod
def table_name(cls):
return 'enum_mig'
class MaterializedModel(Model):
date_time = DateTimeField()
date = DateField(materialized='toDate(date_time)')
engine = MergeTree('date', ('date',))
@classmethod
def table_name(cls):
return 'materialized_date'
class AliasModel(Model):
date = DateField()
date_alias = DateField(alias='date')
engine = MergeTree('date', ('date',))
@classmethod
def table_name(cls):
return 'alias_date'


@@ -55,6 +55,29 @@ class ModelTestCase(unittest.TestCase):
instance.int_field = '99'
self.assertEquals(instance.int_field, 99)
def test_to_dict(self):
instance = SimpleModel(date_field='1973-12-06', int_field='100', float_field='7')
self.assertDictEqual(instance.to_dict(), {
"date_field": datetime.date(1973, 12, 6),
"int_field": 100,
"float_field": 7.0,
"datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
"alias_field": 0.0,
'str_field': 'dozo'
})
self.assertDictEqual(instance.to_dict(include_readonly=False), {
"date_field": datetime.date(1973, 12, 6),
"int_field": 100,
"float_field": 7.0,
"datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
'str_field': 'dozo'
})
self.assertDictEqual(
instance.to_dict(include_readonly=False, field_names=('int_field', 'alias_field', 'datetime_field')), {
"int_field": 100,
"datetime_field": datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc)
})
class SimpleModel(Model):
@@ -63,6 +86,7 @@ class SimpleModel(Model):
str_field = StringField(default='dozo')
int_field = Int32Field(default=17)
float_field = Float32Field()
alias_field = Float32Field(alias='float_field')
engine = MergeTree('date_field', ('int_field', 'date_field'))


@@ -0,0 +1,74 @@
import unittest
from infi.clickhouse_orm.fields import *
from datetime import date, datetime
import pytz
class SimpleFieldsTest(unittest.TestCase):
def test_date_field(self):
f = DateField()
# Valid values
for value in (date(1970, 1, 1), datetime(1970, 1, 1), '1970-01-01', '0000-00-00', 0):
self.assertEquals(f.to_python(value, pytz.utc), date(1970, 1, 1))
# Invalid values
for value in ('nope', '21/7/1999', 0.5):
with self.assertRaises(ValueError):
f.to_python(value, pytz.utc)
# Range check
for value in (date(1900, 1, 1), date(2900, 1, 1)):
with self.assertRaises(ValueError):
f.validate(value)
def test_datetime_field(self):
f = DateTimeField()
epoch = datetime(1970, 1, 1, tzinfo=pytz.utc)
# Valid values
for value in (date(1970, 1, 1), datetime(1970, 1, 1), epoch,
epoch.astimezone(pytz.timezone('US/Eastern')), epoch.astimezone(pytz.timezone('Asia/Jerusalem')),
'1970-01-01 00:00:00', '0000-00-00 00:00:00', 0):
dt = f.to_python(value, pytz.utc)
self.assertEquals(dt.tzinfo, pytz.utc)
self.assertEquals(dt, epoch)
# Verify that conversion to and from db string does not change value
dt2 = f.to_python(int(f.to_db_string(dt)), pytz.utc)
self.assertEquals(dt, dt2)
# Invalid values
for value in ('nope', '21/7/1999', 0.5):
with self.assertRaises(ValueError):
f.to_python(value, pytz.utc)
def test_date_field_roundtrip(self):
f = DateField()
epoch = date(1970, 1, 1)
# Valid values
for value in (datetime(1970, 1, 1), epoch, '1970-01-01', '0000-00-00', 0):
d = f.to_python(value, pytz.utc)
self.assertEquals(d, epoch)
# Verify that conversion to and from db string does not change value
d2 = f.to_python(f.to_db_string(d, quote=False), pytz.utc)
self.assertEquals(d, d2)
# Invalid values
for value in ('nope', '21/7/1999', 0.5):
with self.assertRaises(ValueError):
f.to_python(value, pytz.utc)
def test_date_field_timezone(self):
# Verify that conversion of timezone-aware datetime is correct
f = DateField()
dt = datetime(2017, 10, 5, tzinfo=pytz.timezone('Asia/Jerusalem'))
self.assertEquals(f.to_python(dt, pytz.utc), date(2017, 10, 4))
def test_uint8_field(self):
f = UInt8Field()
# Valid values
for value in (17, '17', 17.0):
self.assertEquals(f.to_python(value, pytz.utc), 17)
# Invalid values
for value in ('nope', date.today()):
with self.assertRaises(ValueError):
f.to_python(value, pytz.utc)
# Range check
for value in (-1, 1000):
with self.assertRaises(ValueError):
f.validate(value)


@@ -0,0 +1,77 @@
import unittest
from datetime import date
import os
import shutil
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.engines import *
from infi.clickhouse_orm.fields import *
from infi.clickhouse_orm.models import Model
from infi.clickhouse_orm.system_models import SystemPart
class SystemPartTest(unittest.TestCase):
BACKUP_DIR = '/opt/clickhouse/shadow/'
def setUp(self):
self.database = Database('test-db')
self.database.create_table(TestTable)
self.database.insert([TestTable(date_field=date.today())])
def tearDown(self):
self.database.drop_database()
def _get_backups(self):
if not os.path.exists(self.BACKUP_DIR):
return []
_, dirnames, _ = next(os.walk(self.BACKUP_DIR))
return dirnames
def test_get_all(self):
parts = SystemPart.get(self.database)
self.assertEqual(len(list(parts)), 1)
def test_get_active(self):
parts = list(SystemPart.get_active(self.database))
self.assertEqual(len(parts), 1)
parts[0].detach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
def test_get_conditions(self):
parts = list(SystemPart.get(self.database, conditions="table='testtable'"))
self.assertEqual(len(parts), 1)
parts = list(SystemPart.get(self.database, conditions="table='othertable'"))
self.assertEqual(len(parts), 0)
def test_attach_detach(self):
parts = list(SystemPart.get_active(self.database))
self.assertEqual(len(parts), 1)
parts[0].detach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
parts[0].attach()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 1)
def test_drop(self):
parts = list(SystemPart.get_active(self.database))
parts[0].drop()
self.assertEqual(len(list(SystemPart.get_active(self.database))), 0)
def test_freeze(self):
parts = list(SystemPart.get(self.database))
# There can be other backups in the folder
prev_backups = set(self._get_backups())
parts[0].freeze()
backups = set(self._get_backups())
self.assertEqual(len(backups), len(prev_backups) + 1)
# Clean created backup
shutil.rmtree(self.BACKUP_DIR + '{0}'.format(list(backups - prev_backups)[0]))
def test_fetch(self):
# TODO Not tested, as I have no replication set up
pass
class TestTable(Model):
date_field = DateField()
engine = MergeTree('date_field', ('date_field',))