From 16dce3d0470d7ba7f6ee788d5647919307c2c640 Mon Sep 17 00:00:00 2001 From: M1ha Date: Sun, 3 Feb 2019 14:24:21 +0500 Subject: [PATCH] Changed library to use generators instead of lists and namedtuples instead of infi Model instances, to be more efficient. Known problems with sync_test remain --- requirements.txt | 3 +- src/django_clickhouse/clickhouse_models.py | 66 ++++++----------- src/django_clickhouse/compatibility.py | 9 ++- src/django_clickhouse/database.py | 82 +++++++++++++++++++--- src/django_clickhouse/engines.py | 82 ++++++++++++---------- src/django_clickhouse/serializers.py | 12 ++-- tests/test_compatibility.py | 7 ++ tests/test_engines.py | 39 +++++----- tests/test_serializers.py | 3 - tests/test_sync.py | 18 +++-- tests/test_utils.py | 6 +- 11 files changed, 193 insertions(+), 134 deletions(-) diff --git a/requirements.txt b/requirements.txt index 208c860..1283c62 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ typing psycopg2 infi.clickhouse-orm celery -statsd \ No newline at end of file +statsd +django-pg-returning \ No newline at end of file diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py index b33f5e9..057a91c 100644 --- a/src/django_clickhouse/clickhouse_models.py +++ b/src/django_clickhouse/clickhouse_models.py @@ -2,12 +2,12 @@ This file defines base abstract models to inherit from """ import datetime -from collections import defaultdict -from itertools import chain -from typing import List, Tuple, Iterable, Set, Any, Dict, Optional - import logging -import pytz +from collections import defaultdict +from copy import deepcopy +from itertools import chain +from typing import List, Tuple, Iterable, Set, Any, Optional + from django.db.models import Model as DjangoModel, QuerySet as DjangoQuerySet from infi.clickhouse_orm.database import Database from infi.clickhouse_orm.engines import CollapsingMergeTree @@ -15,6 +15,7 @@ from infi.clickhouse_orm.models import Model as InfiModel, 
ModelBase as InfiMode from six import with_metaclass from statsd.defaults.django import statsd +from .compatibility import namedtuple from .configuration import config from .database import connections from .exceptions import RedisLockTimeoutError @@ -23,7 +24,6 @@ from .query import QuerySet from .serializers import Django2ClickHouseModelSerializer from .utils import lazy_class_import, exec_multi_arg_func - logger = logging.getLogger('django-clickhouse') @@ -63,40 +63,16 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): # This attribute is initialized in metaclass, as it must get model class as a parameter objects = None # type: QuerySet - def __init__(self, **kwargs): - multi_init = kwargs.pop('__multi_init', False) - if multi_init: - pass - else: - super(ClickHouseModel, self).__init__(**kwargs) - @classmethod - def init_many(cls, kwargs_list, database=None): - # type: (Iterable[Dict[str, Any]], Optional[Database]) -> Iterable['ClickHouseModel'] - """ - Basic __init__ methods if not effective if we need to init 100k objects - :return: A list of inited classes - """ - assert database is None or isinstance(database, Database), "database must be database.Database instance" + def get_tuple_class(cls, field_names=None, defaults=None): + field_names = field_names or tuple(cls.fields(writable=False).keys()) + if defaults: + defaults_new = deepcopy(cls._defaults) + defaults_new.update(defaults) + else: + defaults_new = cls._defaults - # Assign default values - valid_field_names = set(cls._fields.keys()) - for kwargs in kwargs_list: - invalid_fields = set(kwargs.keys()) - valid_field_names - if invalid_fields: - raise AttributeError('%s does not have a fields called %s' % (cls.__name__, ', '.join(invalid_fields))) - - item = cls(__multi_init=True) - item.__dict__.update(cls._defaults) - item._database = database - - for name in kwargs: - field = cls._fields[name] - kwargs[name] = field.to_python(kwargs[name], pytz.utc) - 
field.validate(kwargs[name]) - - item.__dict__.update(kwargs) - yield item + return namedtuple("%sTuple" % cls.__name__, field_names, defaults=defaults_new) @classmethod def objects_in(cls, database): # type: (Database) -> QuerySet @@ -128,9 +104,10 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): return connections[db_alias] @classmethod - def get_django_model_serializer(cls, writable=False): # type: (bool) -> Django2ClickHouseModelSerializer + def get_django_model_serializer(cls, writable=False, defaults=None): + # type: (bool, Optional[dict]) -> Django2ClickHouseModelSerializer serializer_cls = lazy_class_import(cls.django_model_serializer) - return serializer_cls(cls, writable=writable) + return serializer_cls(cls, writable=writable, defaults=defaults) @classmethod def get_sync_batch_size(cls): @@ -223,8 +200,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): :return: """ if batch: - # +10 constant is to avoid 0 insert, generated by infi - cls.get_database(for_write=True).insert(batch, batch_size=len(batch) + 10) + cls.get_database(for_write=True).insert_tuples(cls, batch) @classmethod def sync_batch_from_storage(cls): @@ -257,7 +233,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): if import_objects: with statsd.timer(statsd_key.format('steps.get_insert_batch')): batch = cls.get_insert_batch(import_objects) - statsd.incr(statsd_key.format('insert_batch'), len(batch)) + # statsd.incr(statsd_key.format('insert_batch'), len(batch)) with statsd.timer(statsd_key.format('steps.insert')): cls.insert_batch(batch) @@ -333,8 +309,8 @@ class ClickHouseMultiModel(ClickHouseModel): model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__) with statsd.timer(model_statsd_key.format('steps.get_insert_batch')): batch = model_cls.get_insert_batch(import_objects) - statsd.incr(model_statsd_key.format('insert_batch'), len(batch)) - statsd.incr(statsd_key.format('insert_batch'), 
len(batch)) + # statsd.incr(model_statsd_key.format('insert_batch'), len(batch)) + # statsd.incr(statsd_key.format('insert_batch'), len(batch)) return model_cls, batch res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models)) diff --git a/src/django_clickhouse/compatibility.py b/src/django_clickhouse/compatibility.py index e95605c..90a55ac 100644 --- a/src/django_clickhouse/compatibility.py +++ b/src/django_clickhouse/compatibility.py @@ -13,11 +13,14 @@ class NamedTuple: @classmethod @lru_cache(maxsize=32) def _get_defaults(cls, exclude): - res = cls._defaults + res = deepcopy(cls._defaults) for k in exclude: res.pop(k, None) return res + def _astuple(self): + return tuple(self._data) + def __init__(self, *args, **kwargs): new_kwargs = deepcopy(self._get_defaults(self._data_cls._fields[:len(args)])) new_kwargs.update(kwargs) @@ -33,6 +36,10 @@ class NamedTuple: def __next__(self): return next(self._data_iterator) + def __eq__(self, other): + other_tuple = other._astuple() if isinstance(other, NamedTuple) else other + return tuple(self._data) == other_tuple + def namedtuple(*args, **kwargs): """ diff --git a/src/django_clickhouse/database.py b/src/django_clickhouse/database.py index d1b2658..979d34e 100644 --- a/src/django_clickhouse/database.py +++ b/src/django_clickhouse/database.py @@ -1,6 +1,9 @@ -from infi.clickhouse_orm.database import Database as InfiDatabase +from typing import Generator, Optional, Type, Iterable + +from infi.clickhouse_orm.database import Database as InfiDatabase, DatabaseException from infi.clickhouse_orm.utils import parse_tsv from six import next +from io import BytesIO from .configuration import config from .exceptions import DBAliasError @@ -27,29 +30,88 @@ class Database(InfiDatabase): def _get_applied_migrations(self, migrations_package_name): raise NotImplementedError("This method is not supported by django_clickhouse.") - def select_init_many(self, query, model_class, settings=None): + def 
select_tuples(self, query, model_class, settings=None): + # type: (str, Type['ClickHouseModel'], Optional[dict], Optional[dict]) -> Generator[tuple] """ - Base select doesn't use init_mult which is ineffective on big result lists + This method selects model_class namedtuples, instead of class instances. + Less memory consumption, greater speed + :param query: Query to execute. Can contain substitution variables. + :param model_class: A class of model to get fields from + :param settings: Optional connection settings + :return: Generator of namedtuple objects """ query += ' FORMAT TabSeparatedWithNames' query = self._substitute(query, model_class) r = self._send(query, settings, True) lines = r.iter_lines() field_names = parse_tsv(next(lines)) + fields = [ + field for name, field in model_class.fields(writable=True).items() + if name in field_names + ] + res_class = model_class.get_tuple_class(field_names) - kwargs_list = [] for line in lines: # skip blank line left by WITH TOTALS modifier if line: values = iter(parse_tsv(line)) - kwargs = {} - for name in field_names: - field = getattr(model_class, name) - kwargs[name] = field.to_python(next(values), self.server_timezone) + item = res_class(*( + fields[i].to_python(next(values), self.server_timezone) + for i, r in enumerate(parse_tsv(line)) + )) - kwargs_list.append(kwargs) + yield item - return model_class.init_many(kwargs_list, database=self) + def insert_tuples(self, model_class, model_tuples, batch_size=None): + # type: (Type['ClickHouseModel'], Iterable[tuple], Optional[int]) -> None + """ + Inserts model_class namedtuples + :param model_class: Clickhouse model, namedtuples are made from + :param model_tuples: An iterable of tuples to insert + :param batch_size: Size of batch + :return: None + """ + tuples_iterator = iter(model_tuples) + + try: + first_tuple = next(tuples_iterator) + except StopIteration: + return # model_instances is empty + + if model_class.is_read_only() or model_class.is_system_model(): + 
raise DatabaseException("You can't insert into read only and system tables") + + fields_list = ','.join('`%s`' % name for name in first_tuple._fields) + fields = [f for name, f in model_class.fields(writable=True).items() if name in first_tuple._fields] + + def tuple_to_csv(tup): + return '\t'.join(field.to_db_string(val, quote=False) for field, val in zip(fields, tup)) + '\n' + + def gen(): + buf = BytesIO() + query = 'INSERT INTO `%s`.`%s` (%s) FORMAT TabSeparated\n' \ + % (self.db_name, model_class.table_name(), fields_list) + buf.write(query.encode('utf-8')) + buf.write(tuple_to_csv(first_tuple).encode('utf-8')) + + # Collect lines in batches of batch_size + lines = 2 + for t in tuples_iterator: + buf.write(tuple_to_csv(t).encode('utf-8')) + + lines += 1 + if batch_size is not None and lines >= batch_size: + # Return the current batch of lines + yield buf.getvalue() + # Start a new batch + buf = BytesIO() + lines = 0 + + # Return any remaining lines in partial batch + if lines: + yield buf.getvalue() + + self._send(gen()) class ConnectionProxy: diff --git a/src/django_clickhouse/engines.py b/src/django_clickhouse/engines.py index 5694bf5..2b9d2bc 100644 --- a/src/django_clickhouse/engines.py +++ b/src/django_clickhouse/engines.py @@ -2,31 +2,28 @@ This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse """ import datetime -from typing import List, TypeVar, Type, Union, Iterable +from typing import List, Type, Union, Iterable, Generator from django.db.models import Model as DjangoModel from infi.clickhouse_orm import engines as infi_engines -from infi.clickhouse_orm.models import Model as InfiModel from statsd.defaults.django import statsd -from django_clickhouse.database import connections from .configuration import config +from .database import connections from .utils import format_datetime -T = TypeVar('T') - class InsertOnlyEngineMixin: def get_insert_batch(self, model_cls, objects): - # type: (Type[T], List[DjangoModel]) -> 
List[T] + # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple] """ Gets a list of model_cls instances to insert into database :param model_cls: ClickHouseModel subclass to import :param objects: A list of django Model instances to sync - :return: An iterator of model_cls instances + :return: A generator of model_cls named tuples """ serializer = model_cls.get_django_model_serializer(writable=True) - return list(serializer.serialize_many(objects)) + return (serializer.serialize(obj) for obj in objects) class MergeTree(InsertOnlyEngineMixin, infi_engines.MergeTree): @@ -48,33 +45,32 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre self.version_col = kwargs.pop('version_col', None) super(CollapsingMergeTree, self).__init__(*args, **kwargs) - def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col): + def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns): query = """ - SELECT * FROM $table WHERE (`{pk_column}`, `{version_col}`) IN ( + SELECT {columns} FROM $table WHERE (`{pk_column}`, `{version_col}`) IN ( SELECT `{pk_column}`, MAX(`{version_col}`) FROM $table PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}' AND `{pk_column}` IN ({object_pks}) GROUP BY `{pk_column}` ) - """.format(version_col=self.version_col, date_col=date_col, pk_column=self.pk_column, + """.format(columns=','.join(columns), version_col=self.version_col, date_col=date_col, pk_column=self.pk_column, min_date=min_date, max_date=max_date, object_pks=','.join(object_pks)) - qs = connections[db_alias].select_init_many(query, model_cls) - return list(qs) + return connections[db_alias].select_tuples(query, model_cls) - def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col): + def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, 
columns): query = """ - SELECT * FROM $table FINAL + SELECT {columns} FROM $table FINAL WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}' AND `{pk_column}` IN ({object_pks}) """ - query = query.format(date_col=date_col, pk_column=self.pk_column, min_date=min_date, + query = query.format(columns=','.join(columns), date_col=date_col, pk_column=self.pk_column, min_date=min_date, max_date=max_date, object_pks=','.join(object_pks)) - qs = connections[db_alias].select_init_many(query, model_cls) - return list(qs) + return connections[db_alias].select_tuples(query, model_cls) def get_final_versions(self, model_cls, objects, date_col=None): + # type: (Type['ClickHouseModel'], Iterable[DjangoModel], str) -> Generator[tuple] """ Get objects, that are currently stored in ClickHouse. Depending on the partition key this can be different for different models. @@ -82,7 +78,8 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre It also supposes primary key to by self.pk_column :param model_cls: ClickHouseModel subclass to import :param objects: Objects for which final versions are searched - :return: A list of model objects + :param date_col: Optional column name, where partiion date is hold. Defaults to self.date_col + :return: A generator of named tuples, representing previous state """ def _dt_to_str(dt): # type: (Union[datetime.date, datetime.datetime]) -> str @@ -94,7 +91,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre raise Exception('Invalid date or datetime object: `%s`' % dt) if not objects: - return [] + raise StopIteration() date_col = date_col or self.date_col min_date, max_date = None, None @@ -114,41 +111,48 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre min_date = _dt_to_str(min_date) max_date = _dt_to_str(max_date) + # Get fields. 
Sign is replaced to negative for further processing + columns = list(model_cls.fields(writable=True).keys()) + columns.remove(self.sign_col) + columns.append('-1 AS sign') + + params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns) + if self.version_col: - return self._get_final_versions_by_version(db_alias, model_cls, min_date, max_date, object_pks, date_col) + return self._get_final_versions_by_version(*params) else: - return self._get_final_versions_by_final(db_alias, model_cls, min_date, max_date, object_pks, date_col) + return self._get_final_versions_by_final(*params) def get_insert_batch(self, model_cls, objects): - # type: (Type[T], List[DjangoModel]) -> List[T] + # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple] """ Gets a list of model_cls instances to insert into database :param model_cls: ClickHouseModel subclass to import :param objects: A list of django Model instances to sync :return: A list of model_cls objects """ - new_objs = super(CollapsingMergeTree, self).get_insert_batch(model_cls, objects) + defaults = {self.sign_col: 1} + if self.version_col: + defaults[self.version_col] = 1 + serializer = model_cls.get_django_model_serializer(writable=True, defaults=defaults) + new_objs = [serializer.serialize(obj) for obj in objects] statsd_key = "%s.sync.%s.steps.get_final_versions" % (config.STATSD_PREFIX, model_cls.__name__) with statsd.timer(statsd_key): - old_objs = self.get_final_versions(model_cls, new_objs) + old_objs = list(self.get_final_versions(model_cls, new_objs)) + # -1 sign has been set get_final_versions() old_objs_versions = {} for obj in old_objs: - self.set_obj_sign(obj, -1) - old_objs_versions[obj.id] = getattr(obj, self.version_col) - - for obj in new_objs: - self.set_obj_sign(obj, 1) - + pk = getattr(obj, self.pk_column) if self.version_col: - setattr(obj, self.version_col, old_objs_versions.get(obj.id, 0) + 1) + old_objs_versions[pk] = getattr(obj, self.version_col) + yield obj - 
return old_objs + new_objs + # 1 sign is set by default in serializer + for obj in new_objs: + pk = getattr(obj, self.pk_column) + if self.version_col: + setattr(obj, self.version_col, old_objs_versions.get(pk, 0) + 1) - def set_obj_sign(self, obj, sign): # type: (InfiModel, int) -> None - """ - Sets objects sign. By default gets attribute name from sign_col - :return: None - """ - setattr(obj, self.sign_col, sign) + yield obj diff --git a/src/django_clickhouse/serializers.py b/src/django_clickhouse/serializers.py index 56ebf98..b4aaefe 100644 --- a/src/django_clickhouse/serializers.py +++ b/src/django_clickhouse/serializers.py @@ -1,11 +1,9 @@ from django.db.models import Model as DjangoModel -from typing import List, Iterable - from django_clickhouse.utils import model_to_dict class Django2ClickHouseModelSerializer: - def __init__(self, model_cls, fields=None, exclude_fields=None, writable=False): + def __init__(self, model_cls, fields=None, exclude_fields=None, writable=False, defaults=None): self._model_cls = model_cls if fields is not None: self.serialize_fields = fields @@ -13,6 +11,7 @@ class Django2ClickHouseModelSerializer: self.serialize_fields = model_cls.fields(writable=writable).keys() self.exclude_serialize_fields = exclude_fields + self._result_class = self._model_cls.get_tuple_class(defaults=defaults) def _get_serialize_kwargs(self, obj): data = model_to_dict(obj, fields=self.serialize_fields, exclude_fields=self.exclude_serialize_fields) @@ -29,8 +28,5 @@ class Django2ClickHouseModelSerializer: return result - def serialize(self, obj): # type: (DjangoModel) -> 'ClickHouseModel' - return self._model_cls(**self._get_serialize_kwargs(obj)) - - def serialize_many(self, objs): # type: (Iterable[DjangoModel]) -> Iterable['ClickHouseModel'] - return self._model_cls.init_many((self._get_serialize_kwargs(obj) for obj in objs)) + def serialize(self, obj): # type: (DjangoModel) -> tuple + return self._result_class(**self._get_serialize_kwargs(obj)) diff --git 
a/tests/test_compatibility.py b/tests/test_compatibility.py index 5b113db..905e6b6 100644 --- a/tests/test_compatibility.py +++ b/tests/test_compatibility.py @@ -33,4 +33,11 @@ class NamedTupleTest(TestCase): self.assertTupleEqual((1, 2, 4), tuple(TestTuple(a=1, b=2, c=4))) self.assertTupleEqual((1, 2, 3), tuple(TestTuple(a=1, b=2))) + def test_equal(self): + TestTuple = namedtuple('TestTuple', ('a', 'b', 'c')) + t1 = TestTuple(1, 2, 3) + t2 = TestTuple(1, 2, 3) + self.assertEqual(t1, t2) + self.assertEqual((1, 2, 3), t1) + diff --git a/tests/test_engines.py b/tests/test_engines.py index 7fd103f..6852e8f 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -1,10 +1,7 @@ -import datetime - -import pytz from django.test import TestCase -from django_clickhouse.migrations import migrate_app from django_clickhouse.database import connections +from django_clickhouse.migrations import migrate_app from tests.clickhouse_models import ClickHouseCollapseTestModel from tests.models import TestModel @@ -16,36 +13,43 @@ class CollapsingMergeTreeTest(TestCase): collapse_fixture = [{ "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": 1, "version": 1 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": -1, "version": 1 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": 1, "version": 2 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": -1, "version": 2 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": 1, "version": 3 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": -1, "version": 3 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": 1, "version": 4 }] @@ -62,43 +66,38 @@ class CollapsingMergeTreeTest(TestCase): ]) self.objects = TestModel.objects.filter(id=1) + def _test_final_versions(self, final_versions): + final_versions = list(final_versions) + self.assertEqual(1, len(final_versions)) + item = (final_versions[0].id, final_versions[0].sign, 
final_versions[0].version, final_versions[0].value) + self.assertTupleEqual((1, -1, 4, 0), item) + def test_get_final_versions_by_final_date(self): final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, self.objects) - - self.assertEqual(1, len(final_versions)) - self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0}, - final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version'))) + self._test_final_versions(final_versions) def test_get_final_versions_by_version_date(self): ClickHouseCollapseTestModel.engine.version_col = 'version' final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, self.objects) - - self.assertEqual(1, len(final_versions)) - self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0}, - final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version'))) + self._test_final_versions(final_versions) def test_get_final_versions_by_final_datetime(self): final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, self.objects, date_col='created') - - self.assertEqual(1, len(final_versions)) - self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0}, - final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version'))) + self._test_final_versions(final_versions) def test_get_final_versions_by_version_datetime(self): ClickHouseCollapseTestModel.engine.version_col = 'version' final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, self.objects, date_col='created') - - self.assertEqual(1, len(final_versions)) - self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0}, - final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version'))) + self._test_final_versions(final_versions) def test_versions(self): ClickHouseCollapseTestModel.engine.version_col = 'version' batch = 
ClickHouseCollapseTestModel.get_insert_batch(self.objects) + batch = list(batch) self.assertEqual(2, len(batch)) self.assertEqual(4, batch[0].version) self.assertEqual(-1, batch[0].sign) diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 893ff02..84c8526 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -16,7 +16,6 @@ class Django2ClickHouseModelSerializerTest(TestCase): def test_all(self): serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel) res = serializer.serialize(self.obj) - self.assertIsInstance(res, ClickHouseTestModel) self.assertEqual(self.obj.id, res.id) self.assertEqual(self.obj.value, res.value) self.assertEqual(self.obj.created_date, res.created_date) @@ -24,7 +23,6 @@ class Django2ClickHouseModelSerializerTest(TestCase): def test_fields(self): serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel, fields=('value',)) res = serializer.serialize(self.obj) - self.assertIsInstance(res, ClickHouseTestModel) self.assertEqual(0, res.id) self.assertEqual(datetime.date(1970, 1, 1), res.created_date) self.assertEqual(self.obj.value, res.value) @@ -32,7 +30,6 @@ class Django2ClickHouseModelSerializerTest(TestCase): def test_exclude_fields(self): serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel, exclude_fields=('created_date',)) res = serializer.serialize(self.obj) - self.assertIsInstance(res, ClickHouseTestModel) self.assertEqual(datetime.date(1970, 1, 1), res.created_date) self.assertEqual(self.obj.id, res.id) self.assertEqual(self.obj.value, res.value) diff --git a/tests/test_sync.py b/tests/test_sync.py index c24276a..af2b682 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -53,7 +53,7 @@ class SyncTest(TransactionTestCase): ClickHouseCollapseTestModel.sync_batch_from_storage() synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel)) - self.assertGreaterEqual(1, len(synced_data)) + 
self.assertGreaterEqual(len(synced_data), 1) self.assertEqual(obj.value, synced_data[0].value) self.assertEqual(obj.id, synced_data[0].id) @@ -76,7 +76,7 @@ class SyncTest(TransactionTestCase): ClickHouseCollapseTestModel.sync_batch_from_storage() synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel)) - self.assertGreaterEqual(1, len(synced_data)) + self.assertGreaterEqual(len(synced_data), 1) self.assertEqual(obj.value, synced_data[0].value) self.assertEqual(obj.id, synced_data[0].id) @@ -116,7 +116,7 @@ class SyncTest(TransactionTestCase): ClickHouseMultiTestModel.sync_batch_from_storage() synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel)) - self.assertGreaterEqual(1, len(synced_data)) + self.assertGreaterEqual(len(synced_data), 1) self.assertEqual(obj.value, synced_data[0].value) self.assertEqual(obj.id, synced_data[0].id) @@ -154,9 +154,14 @@ class KillTest(TransactionTestCase): self.sync_iteration(False) sync_left = storage.operations_count(import_key) - ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id', - model_class=ClickHouseCollapseTestModel)) + logger.debug('django_clickhouse: sync finished') + + ch_data = list(connections['default'].select_tuples('SELECT * FROM $table FINAL ORDER BY id', + model_class=ClickHouseCollapseTestModel)) + logger.debug('django_clickhouse: got clickhouse data') + pg_data = list(TestModel.objects.all().order_by('id')) + logger.debug('django_clickhouse: got postgres data') if len(pg_data) != len(ch_data): absent_ids = set(item.id for item in pg_data) - set(item.id for item in ch_data) @@ -166,7 +171,8 @@ class KillTest(TransactionTestCase): self.assertEqual(len(pg_data), len(ch_data)) serializer = ClickHouseCollapseTestModel.get_django_model_serializer() - self.assertListEqual(ch_data, list(serializer.serialize_many(pg_data))) + for pg_item, ch_item in zip(pg_data, ch_data): + 
self.assertEqual(ch_item, serializer.serialize(pg_item)) @classmethod def sync_iteration(cls, kill=True): diff --git a/tests/test_utils.py b/tests/test_utils.py index e1fcff2..e0a4667 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,5 +1,7 @@ import datetime +import time from queue import Queue +from time import gmtime, localtime import pytz from django.test import TestCase @@ -12,7 +14,9 @@ from django_clickhouse.utils import get_tz_offset, format_datetime, lazy_class_i class GetTZOffsetTest(TestCase): def test_func(self): - self.assertEqual(300, get_tz_offset()) + ts = time.time() + utc_offset = (datetime.datetime.fromtimestamp(ts) - datetime.datetime.utcfromtimestamp(ts)).total_seconds() + self.assertEqual(utc_offset / 60, get_tz_offset()) class FormatDateTimeTest(TestCase):