Changed library to using generators instead of lists

and namedtuple instead of infi Model instance, for been more efficient.
Problems with sync_test
This commit is contained in:
M1ha 2019-02-03 14:24:21 +05:00
parent 27c9ca4dfc
commit 16dce3d047
11 changed files with 193 additions and 134 deletions

View File

@ -6,3 +6,4 @@ psycopg2
infi.clickhouse-orm infi.clickhouse-orm
celery celery
statsd statsd
django-pg-returning

View File

@ -2,12 +2,12 @@
This file defines base abstract models to inherit from This file defines base abstract models to inherit from
""" """
import datetime import datetime
from collections import defaultdict
from itertools import chain
from typing import List, Tuple, Iterable, Set, Any, Dict, Optional
import logging import logging
import pytz from collections import defaultdict
from copy import deepcopy
from itertools import chain
from typing import List, Tuple, Iterable, Set, Any, Optional
from django.db.models import Model as DjangoModel, QuerySet as DjangoQuerySet from django.db.models import Model as DjangoModel, QuerySet as DjangoQuerySet
from infi.clickhouse_orm.database import Database from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.engines import CollapsingMergeTree from infi.clickhouse_orm.engines import CollapsingMergeTree
@ -15,6 +15,7 @@ from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiMode
from six import with_metaclass from six import with_metaclass
from statsd.defaults.django import statsd from statsd.defaults.django import statsd
from .compatibility import namedtuple
from .configuration import config from .configuration import config
from .database import connections from .database import connections
from .exceptions import RedisLockTimeoutError from .exceptions import RedisLockTimeoutError
@ -23,7 +24,6 @@ from .query import QuerySet
from .serializers import Django2ClickHouseModelSerializer from .serializers import Django2ClickHouseModelSerializer
from .utils import lazy_class_import, exec_multi_arg_func from .utils import lazy_class_import, exec_multi_arg_func
logger = logging.getLogger('django-clickhouse') logger = logging.getLogger('django-clickhouse')
@ -63,40 +63,16 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
# This attribute is initialized in metaclass, as it must get model class as a parameter # This attribute is initialized in metaclass, as it must get model class as a parameter
objects = None # type: QuerySet objects = None # type: QuerySet
def __init__(self, **kwargs):
multi_init = kwargs.pop('__multi_init', False)
if multi_init:
pass
else:
super(ClickHouseModel, self).__init__(**kwargs)
@classmethod @classmethod
def init_many(cls, kwargs_list, database=None): def get_tuple_class(cls, field_names=None, defaults=None):
# type: (Iterable[Dict[str, Any]], Optional[Database]) -> Iterable['ClickHouseModel'] field_names = field_names or tuple(cls.fields(writable=False).keys())
""" if defaults:
Basic __init__ methods if not effective if we need to init 100k objects defaults_new = deepcopy(cls._defaults)
:return: A list of inited classes defaults_new.update(defaults)
""" else:
assert database is None or isinstance(database, Database), "database must be database.Database instance" defaults_new = cls._defaults
# Assign default values return namedtuple("%sTuple" % cls.__name__, field_names, defaults=defaults_new)
valid_field_names = set(cls._fields.keys())
for kwargs in kwargs_list:
invalid_fields = set(kwargs.keys()) - valid_field_names
if invalid_fields:
raise AttributeError('%s does not have a fields called %s' % (cls.__name__, ', '.join(invalid_fields)))
item = cls(__multi_init=True)
item.__dict__.update(cls._defaults)
item._database = database
for name in kwargs:
field = cls._fields[name]
kwargs[name] = field.to_python(kwargs[name], pytz.utc)
field.validate(kwargs[name])
item.__dict__.update(kwargs)
yield item
@classmethod @classmethod
def objects_in(cls, database): # type: (Database) -> QuerySet def objects_in(cls, database): # type: (Database) -> QuerySet
@ -128,9 +104,10 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
return connections[db_alias] return connections[db_alias]
@classmethod @classmethod
def get_django_model_serializer(cls, writable=False): # type: (bool) -> Django2ClickHouseModelSerializer def get_django_model_serializer(cls, writable=False, defaults=None):
# type: (bool, Optional[dict]) -> Django2ClickHouseModelSerializer
serializer_cls = lazy_class_import(cls.django_model_serializer) serializer_cls = lazy_class_import(cls.django_model_serializer)
return serializer_cls(cls, writable=writable) return serializer_cls(cls, writable=writable, defaults=defaults)
@classmethod @classmethod
def get_sync_batch_size(cls): def get_sync_batch_size(cls):
@ -223,8 +200,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
:return: :return:
""" """
if batch: if batch:
# +10 constant is to avoid 0 insert, generated by infi cls.get_database(for_write=True).insert_tuples(cls, batch)
cls.get_database(for_write=True).insert(batch, batch_size=len(batch) + 10)
@classmethod @classmethod
def sync_batch_from_storage(cls): def sync_batch_from_storage(cls):
@ -257,7 +233,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
if import_objects: if import_objects:
with statsd.timer(statsd_key.format('steps.get_insert_batch')): with statsd.timer(statsd_key.format('steps.get_insert_batch')):
batch = cls.get_insert_batch(import_objects) batch = cls.get_insert_batch(import_objects)
statsd.incr(statsd_key.format('insert_batch'), len(batch)) # statsd.incr(statsd_key.format('insert_batch'), len(batch))
with statsd.timer(statsd_key.format('steps.insert')): with statsd.timer(statsd_key.format('steps.insert')):
cls.insert_batch(batch) cls.insert_batch(batch)
@ -333,8 +309,8 @@ class ClickHouseMultiModel(ClickHouseModel):
model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__) model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__)
with statsd.timer(model_statsd_key.format('steps.get_insert_batch')): with statsd.timer(model_statsd_key.format('steps.get_insert_batch')):
batch = model_cls.get_insert_batch(import_objects) batch = model_cls.get_insert_batch(import_objects)
statsd.incr(model_statsd_key.format('insert_batch'), len(batch)) # statsd.incr(model_statsd_key.format('insert_batch'), len(batch))
statsd.incr(statsd_key.format('insert_batch'), len(batch)) # statsd.incr(statsd_key.format('insert_batch'), len(batch))
return model_cls, batch return model_cls, batch
res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models)) res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models))

View File

@ -13,11 +13,14 @@ class NamedTuple:
@classmethod @classmethod
@lru_cache(maxsize=32) @lru_cache(maxsize=32)
def _get_defaults(cls, exclude): def _get_defaults(cls, exclude):
res = cls._defaults res = deepcopy(cls._defaults)
for k in exclude: for k in exclude:
res.pop(k, None) res.pop(k, None)
return res return res
def _astuple(self):
return tuple(self._data)
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
new_kwargs = deepcopy(self._get_defaults(self._data_cls._fields[:len(args)])) new_kwargs = deepcopy(self._get_defaults(self._data_cls._fields[:len(args)]))
new_kwargs.update(kwargs) new_kwargs.update(kwargs)
@ -33,6 +36,10 @@ class NamedTuple:
def __next__(self): def __next__(self):
return next(self._data_iterator) return next(self._data_iterator)
def __eq__(self, other):
other_tuple = other._astuple() if isinstance(other, NamedTuple) else other
return tuple(self._data) == other_tuple
def namedtuple(*args, **kwargs): def namedtuple(*args, **kwargs):
""" """

View File

@ -1,6 +1,9 @@
from infi.clickhouse_orm.database import Database as InfiDatabase from typing import Generator, Optional, Type, Iterable
from infi.clickhouse_orm.database import Database as InfiDatabase, DatabaseException
from infi.clickhouse_orm.utils import parse_tsv from infi.clickhouse_orm.utils import parse_tsv
from six import next from six import next
from io import BytesIO
from .configuration import config from .configuration import config
from .exceptions import DBAliasError from .exceptions import DBAliasError
@ -27,29 +30,88 @@ class Database(InfiDatabase):
def _get_applied_migrations(self, migrations_package_name): def _get_applied_migrations(self, migrations_package_name):
raise NotImplementedError("This method is not supported by django_clickhouse.") raise NotImplementedError("This method is not supported by django_clickhouse.")
def select_init_many(self, query, model_class, settings=None): def select_tuples(self, query, model_class, settings=None):
# type: (str, Type['ClickHouseModel'], Optional[dict], Optional[dict]) -> Generator[tuple]
""" """
Base select doesn't use init_mult which is ineffective on big result lists This method selects model_class namedtuples, instead of class instances.
Less memory consumption, greater speed
:param query: Query to execute. Can contain substitution variables.
:param model_class: A class of model to get fields from
:param settings: Optional connection settings
:return: Generator of namedtuple objects
""" """
query += ' FORMAT TabSeparatedWithNames' query += ' FORMAT TabSeparatedWithNames'
query = self._substitute(query, model_class) query = self._substitute(query, model_class)
r = self._send(query, settings, True) r = self._send(query, settings, True)
lines = r.iter_lines() lines = r.iter_lines()
field_names = parse_tsv(next(lines)) field_names = parse_tsv(next(lines))
fields = [
field for name, field in model_class.fields(writable=True).items()
if name in field_names
]
res_class = model_class.get_tuple_class(field_names)
kwargs_list = []
for line in lines: for line in lines:
# skip blank line left by WITH TOTALS modifier # skip blank line left by WITH TOTALS modifier
if line: if line:
values = iter(parse_tsv(line)) values = iter(parse_tsv(line))
kwargs = {} item = res_class(*(
for name in field_names: fields[i].to_python(next(values), self.server_timezone)
field = getattr(model_class, name) for i, r in enumerate(parse_tsv(line))
kwargs[name] = field.to_python(next(values), self.server_timezone) ))
kwargs_list.append(kwargs) yield item
return model_class.init_many(kwargs_list, database=self) def insert_tuples(self, model_class, model_tuples, batch_size=None):
# type: (Type['ClickHouseModel'], Iterable[tuple], Optional[int]) -> None
"""
Inserts model_class namedtuples
:param model_class: Clickhouse model, namedtuples are made from
:param model_tuples: An iterable of tuples to insert
:param batch_size: Size of batch
:return: None
"""
tuples_iterator = iter(model_tuples)
try:
first_tuple = next(tuples_iterator)
except StopIteration:
return # model_instances is empty
if model_class.is_read_only() or model_class.is_system_model():
raise DatabaseException("You can't insert into read only and system tables")
fields_list = ','.join('`%s`' % name for name in first_tuple._fields)
fields = [f for name, f in model_class.fields(writable=True).items() if name in first_tuple._fields]
def tuple_to_csv(tup):
return '\t'.join(field.to_db_string(val, quote=False) for field, val in zip(fields, tup)) + '\n'
def gen():
buf = BytesIO()
query = 'INSERT INTO `%s`.`%s` (%s) FORMAT TabSeparated\n' \
% (self.db_name, model_class.table_name(), fields_list)
buf.write(query.encode('utf-8'))
buf.write(tuple_to_csv(first_tuple).encode('utf-8'))
# Collect lines in batches of batch_size
lines = 2
for t in tuples_iterator:
buf.write(tuple_to_csv(t).encode('utf-8'))
lines += 1
if batch_size is not None and lines >= batch_size:
# Return the current batch of lines
yield buf.getvalue()
# Start a new batch
buf = BytesIO()
lines = 0
# Return any remaining lines in partial batch
if lines:
yield buf.getvalue()
self._send(gen())
class ConnectionProxy: class ConnectionProxy:

View File

@ -2,31 +2,28 @@
This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse
""" """
import datetime import datetime
from typing import List, TypeVar, Type, Union, Iterable from typing import List, Type, Union, Iterable, Generator
from django.db.models import Model as DjangoModel from django.db.models import Model as DjangoModel
from infi.clickhouse_orm import engines as infi_engines from infi.clickhouse_orm import engines as infi_engines
from infi.clickhouse_orm.models import Model as InfiModel
from statsd.defaults.django import statsd from statsd.defaults.django import statsd
from django_clickhouse.database import connections
from .configuration import config from .configuration import config
from .database import connections
from .utils import format_datetime from .utils import format_datetime
T = TypeVar('T')
class InsertOnlyEngineMixin: class InsertOnlyEngineMixin:
def get_insert_batch(self, model_cls, objects): def get_insert_batch(self, model_cls, objects):
# type: (Type[T], List[DjangoModel]) -> List[T] # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple]
""" """
Gets a list of model_cls instances to insert into database Gets a list of model_cls instances to insert into database
:param model_cls: ClickHouseModel subclass to import :param model_cls: ClickHouseModel subclass to import
:param objects: A list of django Model instances to sync :param objects: A list of django Model instances to sync
:return: An iterator of model_cls instances :return: A generator of model_cls named tuples
""" """
serializer = model_cls.get_django_model_serializer(writable=True) serializer = model_cls.get_django_model_serializer(writable=True)
return list(serializer.serialize_many(objects)) return (serializer.serialize(obj) for obj in objects)
class MergeTree(InsertOnlyEngineMixin, infi_engines.MergeTree): class MergeTree(InsertOnlyEngineMixin, infi_engines.MergeTree):
@ -48,33 +45,32 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
self.version_col = kwargs.pop('version_col', None) self.version_col = kwargs.pop('version_col', None)
super(CollapsingMergeTree, self).__init__(*args, **kwargs) super(CollapsingMergeTree, self).__init__(*args, **kwargs)
def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col): def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
query = """ query = """
SELECT * FROM $table WHERE (`{pk_column}`, `{version_col}`) IN ( SELECT {columns} FROM $table WHERE (`{pk_column}`, `{version_col}`) IN (
SELECT `{pk_column}`, MAX(`{version_col}`) SELECT `{pk_column}`, MAX(`{version_col}`)
FROM $table FROM $table
PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}' PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
AND `{pk_column}` IN ({object_pks}) AND `{pk_column}` IN ({object_pks})
GROUP BY `{pk_column}` GROUP BY `{pk_column}`
) )
""".format(version_col=self.version_col, date_col=date_col, pk_column=self.pk_column, """.format(columns=','.join(columns), version_col=self.version_col, date_col=date_col, pk_column=self.pk_column,
min_date=min_date, max_date=max_date, object_pks=','.join(object_pks)) min_date=min_date, max_date=max_date, object_pks=','.join(object_pks))
qs = connections[db_alias].select_init_many(query, model_cls) return connections[db_alias].select_tuples(query, model_cls)
return list(qs)
def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col): def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
query = """ query = """
SELECT * FROM $table FINAL SELECT {columns} FROM $table FINAL
WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}' WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
AND `{pk_column}` IN ({object_pks}) AND `{pk_column}` IN ({object_pks})
""" """
query = query.format(date_col=date_col, pk_column=self.pk_column, min_date=min_date, query = query.format(columns=','.join(columns), date_col=date_col, pk_column=self.pk_column, min_date=min_date,
max_date=max_date, object_pks=','.join(object_pks)) max_date=max_date, object_pks=','.join(object_pks))
qs = connections[db_alias].select_init_many(query, model_cls) return connections[db_alias].select_tuples(query, model_cls)
return list(qs)
def get_final_versions(self, model_cls, objects, date_col=None): def get_final_versions(self, model_cls, objects, date_col=None):
# type: (Type['ClickHouseModel'], Iterable[DjangoModel], str) -> Generator[tuple]
""" """
Get objects, that are currently stored in ClickHouse. Get objects, that are currently stored in ClickHouse.
Depending on the partition key this can be different for different models. Depending on the partition key this can be different for different models.
@ -82,7 +78,8 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
It also supposes primary key to by self.pk_column It also supposes primary key to by self.pk_column
:param model_cls: ClickHouseModel subclass to import :param model_cls: ClickHouseModel subclass to import
:param objects: Objects for which final versions are searched :param objects: Objects for which final versions are searched
:return: A list of model objects :param date_col: Optional column name, where partiion date is hold. Defaults to self.date_col
:return: A generator of named tuples, representing previous state
""" """
def _dt_to_str(dt): # type: (Union[datetime.date, datetime.datetime]) -> str def _dt_to_str(dt): # type: (Union[datetime.date, datetime.datetime]) -> str
@ -94,7 +91,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
raise Exception('Invalid date or datetime object: `%s`' % dt) raise Exception('Invalid date or datetime object: `%s`' % dt)
if not objects: if not objects:
return [] raise StopIteration()
date_col = date_col or self.date_col date_col = date_col or self.date_col
min_date, max_date = None, None min_date, max_date = None, None
@ -114,41 +111,48 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
min_date = _dt_to_str(min_date) min_date = _dt_to_str(min_date)
max_date = _dt_to_str(max_date) max_date = _dt_to_str(max_date)
# Get fields. Sign is replaced to negative for further processing
columns = list(model_cls.fields(writable=True).keys())
columns.remove(self.sign_col)
columns.append('-1 AS sign')
params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns)
if self.version_col: if self.version_col:
return self._get_final_versions_by_version(db_alias, model_cls, min_date, max_date, object_pks, date_col) return self._get_final_versions_by_version(*params)
else: else:
return self._get_final_versions_by_final(db_alias, model_cls, min_date, max_date, object_pks, date_col) return self._get_final_versions_by_final(*params)
def get_insert_batch(self, model_cls, objects): def get_insert_batch(self, model_cls, objects):
# type: (Type[T], List[DjangoModel]) -> List[T] # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple]
""" """
Gets a list of model_cls instances to insert into database Gets a list of model_cls instances to insert into database
:param model_cls: ClickHouseModel subclass to import :param model_cls: ClickHouseModel subclass to import
:param objects: A list of django Model instances to sync :param objects: A list of django Model instances to sync
:return: A list of model_cls objects :return: A list of model_cls objects
""" """
new_objs = super(CollapsingMergeTree, self).get_insert_batch(model_cls, objects) defaults = {self.sign_col: 1}
if self.version_col:
defaults[self.version_col] = 1
serializer = model_cls.get_django_model_serializer(writable=True, defaults=defaults)
new_objs = [serializer.serialize(obj) for obj in objects]
statsd_key = "%s.sync.%s.steps.get_final_versions" % (config.STATSD_PREFIX, model_cls.__name__) statsd_key = "%s.sync.%s.steps.get_final_versions" % (config.STATSD_PREFIX, model_cls.__name__)
with statsd.timer(statsd_key): with statsd.timer(statsd_key):
old_objs = self.get_final_versions(model_cls, new_objs) old_objs = list(self.get_final_versions(model_cls, new_objs))
# -1 sign has been set get_final_versions()
old_objs_versions = {} old_objs_versions = {}
for obj in old_objs: for obj in old_objs:
self.set_obj_sign(obj, -1) pk = getattr(obj, self.pk_column)
old_objs_versions[obj.id] = getattr(obj, self.version_col)
for obj in new_objs:
self.set_obj_sign(obj, 1)
if self.version_col: if self.version_col:
setattr(obj, self.version_col, old_objs_versions.get(obj.id, 0) + 1) old_objs_versions[pk] = getattr(obj, self.version_col)
yield obj
return old_objs + new_objs # 1 sign is set by default in serializer
for obj in new_objs:
pk = getattr(obj, self.pk_column)
if self.version_col:
setattr(obj, self.version_col, old_objs_versions.get(pk, 0) + 1)
def set_obj_sign(self, obj, sign): # type: (InfiModel, int) -> None yield obj
"""
Sets objects sign. By default gets attribute name from sign_col
:return: None
"""
setattr(obj, self.sign_col, sign)

View File

@ -1,11 +1,9 @@
from django.db.models import Model as DjangoModel from django.db.models import Model as DjangoModel
from typing import List, Iterable
from django_clickhouse.utils import model_to_dict from django_clickhouse.utils import model_to_dict
class Django2ClickHouseModelSerializer: class Django2ClickHouseModelSerializer:
def __init__(self, model_cls, fields=None, exclude_fields=None, writable=False): def __init__(self, model_cls, fields=None, exclude_fields=None, writable=False, defaults=None):
self._model_cls = model_cls self._model_cls = model_cls
if fields is not None: if fields is not None:
self.serialize_fields = fields self.serialize_fields = fields
@ -13,6 +11,7 @@ class Django2ClickHouseModelSerializer:
self.serialize_fields = model_cls.fields(writable=writable).keys() self.serialize_fields = model_cls.fields(writable=writable).keys()
self.exclude_serialize_fields = exclude_fields self.exclude_serialize_fields = exclude_fields
self._result_class = self._model_cls.get_tuple_class(defaults=defaults)
def _get_serialize_kwargs(self, obj): def _get_serialize_kwargs(self, obj):
data = model_to_dict(obj, fields=self.serialize_fields, exclude_fields=self.exclude_serialize_fields) data = model_to_dict(obj, fields=self.serialize_fields, exclude_fields=self.exclude_serialize_fields)
@ -29,8 +28,5 @@ class Django2ClickHouseModelSerializer:
return result return result
def serialize(self, obj): # type: (DjangoModel) -> 'ClickHouseModel' def serialize(self, obj): # type: (DjangoModel) -> tuple
return self._model_cls(**self._get_serialize_kwargs(obj)) return self._result_class(**self._get_serialize_kwargs(obj))
def serialize_many(self, objs): # type: (Iterable[DjangoModel]) -> Iterable['ClickHouseModel']
return self._model_cls.init_many((self._get_serialize_kwargs(obj) for obj in objs))

View File

@ -33,4 +33,11 @@ class NamedTupleTest(TestCase):
self.assertTupleEqual((1, 2, 4), tuple(TestTuple(a=1, b=2, c=4))) self.assertTupleEqual((1, 2, 4), tuple(TestTuple(a=1, b=2, c=4)))
self.assertTupleEqual((1, 2, 3), tuple(TestTuple(a=1, b=2))) self.assertTupleEqual((1, 2, 3), tuple(TestTuple(a=1, b=2)))
def test_equal(self):
TestTuple = namedtuple('TestTuple', ('a', 'b', 'c'))
t1 = TestTuple(1, 2, 3)
t2 = TestTuple(1, 2, 3)
self.assertEqual(t1, t2)
self.assertEqual((1, 2, 3), t1)

View File

@ -1,10 +1,7 @@
import datetime
import pytz
from django.test import TestCase from django.test import TestCase
from django_clickhouse.migrations import migrate_app
from django_clickhouse.database import connections from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app
from tests.clickhouse_models import ClickHouseCollapseTestModel from tests.clickhouse_models import ClickHouseCollapseTestModel
from tests.models import TestModel from tests.models import TestModel
@ -16,36 +13,43 @@ class CollapsingMergeTreeTest(TestCase):
collapse_fixture = [{ collapse_fixture = [{
"id": 1, "id": 1,
"created": "2018-01-01 00:00:00", "created": "2018-01-01 00:00:00",
"value": 0,
"sign": 1, "sign": 1,
"version": 1 "version": 1
}, { }, {
"id": 1, "id": 1,
"created": "2018-01-01 00:00:00", "created": "2018-01-01 00:00:00",
"value": 0,
"sign": -1, "sign": -1,
"version": 1 "version": 1
}, { }, {
"id": 1, "id": 1,
"created": "2018-01-01 00:00:00", "created": "2018-01-01 00:00:00",
"value": 0,
"sign": 1, "sign": 1,
"version": 2 "version": 2
}, { }, {
"id": 1, "id": 1,
"created": "2018-01-01 00:00:00", "created": "2018-01-01 00:00:00",
"value": 0,
"sign": -1, "sign": -1,
"version": 2 "version": 2
}, { }, {
"id": 1, "id": 1,
"created": "2018-01-01 00:00:00", "created": "2018-01-01 00:00:00",
"value": 0,
"sign": 1, "sign": 1,
"version": 3 "version": 3
}, { }, {
"id": 1, "id": 1,
"created": "2018-01-01 00:00:00", "created": "2018-01-01 00:00:00",
"value": 0,
"sign": -1, "sign": -1,
"version": 3 "version": 3
}, { }, {
"id": 1, "id": 1,
"created": "2018-01-01 00:00:00", "created": "2018-01-01 00:00:00",
"value": 0,
"sign": 1, "sign": 1,
"version": 4 "version": 4
}] }]
@ -62,43 +66,38 @@ class CollapsingMergeTreeTest(TestCase):
]) ])
self.objects = TestModel.objects.filter(id=1) self.objects = TestModel.objects.filter(id=1)
def _test_final_versions(self, final_versions):
final_versions = list(final_versions)
self.assertEqual(1, len(final_versions))
item = (final_versions[0].id, final_versions[0].sign, final_versions[0].version, final_versions[0].value)
self.assertTupleEqual((1, -1, 4, 0), item)
def test_get_final_versions_by_final_date(self): def test_get_final_versions_by_final_date(self):
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects) self.objects)
self._test_final_versions(final_versions)
self.assertEqual(1, len(final_versions))
self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0},
final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version')))
def test_get_final_versions_by_version_date(self): def test_get_final_versions_by_version_date(self):
ClickHouseCollapseTestModel.engine.version_col = 'version' ClickHouseCollapseTestModel.engine.version_col = 'version'
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects) self.objects)
self._test_final_versions(final_versions)
self.assertEqual(1, len(final_versions))
self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0},
final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version')))
def test_get_final_versions_by_final_datetime(self): def test_get_final_versions_by_final_datetime(self):
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects, date_col='created') self.objects, date_col='created')
self._test_final_versions(final_versions)
self.assertEqual(1, len(final_versions))
self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0},
final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version')))
def test_get_final_versions_by_version_datetime(self): def test_get_final_versions_by_version_datetime(self):
ClickHouseCollapseTestModel.engine.version_col = 'version' ClickHouseCollapseTestModel.engine.version_col = 'version'
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects, date_col='created') self.objects, date_col='created')
self._test_final_versions(final_versions)
self.assertEqual(1, len(final_versions))
self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0},
final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version')))
def test_versions(self): def test_versions(self):
ClickHouseCollapseTestModel.engine.version_col = 'version' ClickHouseCollapseTestModel.engine.version_col = 'version'
batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects) batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects)
batch = list(batch)
self.assertEqual(2, len(batch)) self.assertEqual(2, len(batch))
self.assertEqual(4, batch[0].version) self.assertEqual(4, batch[0].version)
self.assertEqual(-1, batch[0].sign) self.assertEqual(-1, batch[0].sign)

View File

@ -16,7 +16,6 @@ class Django2ClickHouseModelSerializerTest(TestCase):
def test_all(self): def test_all(self):
serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel) serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel)
res = serializer.serialize(self.obj) res = serializer.serialize(self.obj)
self.assertIsInstance(res, ClickHouseTestModel)
self.assertEqual(self.obj.id, res.id) self.assertEqual(self.obj.id, res.id)
self.assertEqual(self.obj.value, res.value) self.assertEqual(self.obj.value, res.value)
self.assertEqual(self.obj.created_date, res.created_date) self.assertEqual(self.obj.created_date, res.created_date)
@ -24,7 +23,6 @@ class Django2ClickHouseModelSerializerTest(TestCase):
def test_fields(self): def test_fields(self):
serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel, fields=('value',)) serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel, fields=('value',))
res = serializer.serialize(self.obj) res = serializer.serialize(self.obj)
self.assertIsInstance(res, ClickHouseTestModel)
self.assertEqual(0, res.id) self.assertEqual(0, res.id)
self.assertEqual(datetime.date(1970, 1, 1), res.created_date) self.assertEqual(datetime.date(1970, 1, 1), res.created_date)
self.assertEqual(self.obj.value, res.value) self.assertEqual(self.obj.value, res.value)
@ -32,7 +30,6 @@ class Django2ClickHouseModelSerializerTest(TestCase):
def test_exclude_fields(self): def test_exclude_fields(self):
serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel, exclude_fields=('created_date',)) serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel, exclude_fields=('created_date',))
res = serializer.serialize(self.obj) res = serializer.serialize(self.obj)
self.assertIsInstance(res, ClickHouseTestModel)
self.assertEqual(datetime.date(1970, 1, 1), res.created_date) self.assertEqual(datetime.date(1970, 1, 1), res.created_date)
self.assertEqual(self.obj.id, res.id) self.assertEqual(self.obj.id, res.id)
self.assertEqual(self.obj.value, res.value) self.assertEqual(self.obj.value, res.value)

View File

@ -53,7 +53,7 @@ class SyncTest(TransactionTestCase):
ClickHouseCollapseTestModel.sync_batch_from_storage() ClickHouseCollapseTestModel.sync_batch_from_storage()
synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel)) synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
self.assertGreaterEqual(1, len(synced_data)) self.assertGreaterEqual(len(synced_data), 1)
self.assertEqual(obj.value, synced_data[0].value) self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id) self.assertEqual(obj.id, synced_data[0].id)
@ -76,7 +76,7 @@ class SyncTest(TransactionTestCase):
ClickHouseCollapseTestModel.sync_batch_from_storage() ClickHouseCollapseTestModel.sync_batch_from_storage()
synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel)) synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
self.assertGreaterEqual(1, len(synced_data)) self.assertGreaterEqual(len(synced_data), 1)
self.assertEqual(obj.value, synced_data[0].value) self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id) self.assertEqual(obj.id, synced_data[0].id)
@ -116,7 +116,7 @@ class SyncTest(TransactionTestCase):
ClickHouseMultiTestModel.sync_batch_from_storage() ClickHouseMultiTestModel.sync_batch_from_storage()
synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel)) synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
self.assertGreaterEqual(1, len(synced_data)) self.assertGreaterEqual(len(synced_data), 1)
self.assertEqual(obj.value, synced_data[0].value) self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id) self.assertEqual(obj.id, synced_data[0].id)
@ -154,9 +154,14 @@ class KillTest(TransactionTestCase):
self.sync_iteration(False) self.sync_iteration(False)
sync_left = storage.operations_count(import_key) sync_left = storage.operations_count(import_key)
ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id', logger.debug('django_clickhouse: sync finished')
ch_data = list(connections['default'].select_tuples('SELECT * FROM $table FINAL ORDER BY id',
model_class=ClickHouseCollapseTestModel)) model_class=ClickHouseCollapseTestModel))
logger.debug('django_clickhouse: got clickhouse data')
pg_data = list(TestModel.objects.all().order_by('id')) pg_data = list(TestModel.objects.all().order_by('id'))
logger.debug('django_clickhouse: got postgres data')
if len(pg_data) != len(ch_data): if len(pg_data) != len(ch_data):
absent_ids = set(item.id for item in pg_data) - set(item.id for item in ch_data) absent_ids = set(item.id for item in pg_data) - set(item.id for item in ch_data)
@ -166,7 +171,8 @@ class KillTest(TransactionTestCase):
self.assertEqual(len(pg_data), len(ch_data)) self.assertEqual(len(pg_data), len(ch_data))
serializer = ClickHouseCollapseTestModel.get_django_model_serializer() serializer = ClickHouseCollapseTestModel.get_django_model_serializer()
self.assertListEqual(ch_data, list(serializer.serialize_many(pg_data))) for pg_item, ch_item in zip(pg_data, ch_data):
self.assertEqual(ch_item, serializer.serialize(pg_item))
@classmethod @classmethod
def sync_iteration(cls, kill=True): def sync_iteration(cls, kill=True):

View File

@ -1,5 +1,7 @@
import datetime import datetime
import time
from queue import Queue from queue import Queue
from time import gmtime, localtime
import pytz import pytz
from django.test import TestCase from django.test import TestCase
@ -12,7 +14,9 @@ from django_clickhouse.utils import get_tz_offset, format_datetime, lazy_class_i
class GetTZOffsetTest(TestCase): class GetTZOffsetTest(TestCase):
def test_func(self): def test_func(self):
self.assertEqual(300, get_tz_offset()) ts = time.time()
utc_offset = (datetime.datetime.fromtimestamp(ts) - datetime.datetime.utcfromtimestamp(ts)).total_seconds()
self.assertEqual(utc_offset / 60, get_tz_offset())
class FormatDateTimeTest(TestCase): class FormatDateTimeTest(TestCase):