diff --git a/.gitignore b/.gitignore index 894a44c..5bbb08d 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,9 @@ __pycache__/ *.py[cod] *$py.class +# JetBrains files +.idea + # C extensions *.so @@ -102,3 +105,4 @@ venv.bak/ # mypy .mypy_cache/ +/.idea/dataSources.xml diff --git a/requirements.txt b/requirements.txt index 208c860..1283c62 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ typing psycopg2 infi.clickhouse-orm celery -statsd \ No newline at end of file +statsd +django-pg-returning \ No newline at end of file diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py index b33f5e9..594aa06 100644 --- a/src/django_clickhouse/clickhouse_models.py +++ b/src/django_clickhouse/clickhouse_models.py @@ -2,28 +2,27 @@ This file defines base abstract models to inherit from """ import datetime -from collections import defaultdict -from itertools import chain -from typing import List, Tuple, Iterable, Set, Any, Dict, Optional - import logging -import pytz +from collections import defaultdict +from copy import deepcopy +from itertools import chain +from typing import List, Tuple, Iterable, Set, Any, Optional + from django.db.models import Model as DjangoModel, QuerySet as DjangoQuerySet -from infi.clickhouse_orm.database import Database from infi.clickhouse_orm.engines import CollapsingMergeTree from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase from six import with_metaclass from statsd.defaults.django import statsd +from .compatibility import namedtuple from .configuration import config -from .database import connections +from .database import connections, Database from .exceptions import RedisLockTimeoutError from .models import ClickHouseSyncModel from .query import QuerySet from .serializers import Django2ClickHouseModelSerializer from .utils import lazy_class_import, exec_multi_arg_func - logger = logging.getLogger('django-clickhouse') @@ -63,40 +62,20 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): # This attribute is initialized in metaclass, as it must get model class as a parameter objects = None # type: QuerySet - def __init__(self, **kwargs): - multi_init = kwargs.pop('__multi_init', False) - if multi_init: - pass - else: - super(ClickHouseModel, self).__init__(**kwargs) - @classmethod - def init_many(cls, kwargs_list, database=None): - # type: (Iterable[Dict[str, Any]], Optional[Database]) -> Iterable['ClickHouseModel'] - """ - Basic __init__ methods if not effective if we need to init 100k objects - :return: A list of inited classes - """ - assert database is None or isinstance(database, Database), "database must be database.Database instance" + def get_tuple_class(cls, field_names=None, defaults=None): + field_names = field_names or cls.fields(writable=False).keys() - # Assign default values - valid_field_names = set(cls._fields.keys()) - for kwargs in kwargs_list: - invalid_fields = set(kwargs.keys()) - valid_field_names - if invalid_fields: - raise AttributeError('%s does not have a fields called %s' % (cls.__name__, ', '.join(invalid_fields))) + # Strange, but sometimes the columns are in different order... + field_names = tuple(sorted(field_names)) - item = cls(__multi_init=True) - item.__dict__.update(cls._defaults) - item._database = database + if defaults: + defaults_new = deepcopy(cls._defaults) + defaults_new.update(defaults) + else: + defaults_new = cls._defaults - for name in kwargs: - field = cls._fields[name] - kwargs[name] = field.to_python(kwargs[name], pytz.utc) - field.validate(kwargs[name]) - - item.__dict__.update(kwargs) - yield item + return namedtuple("%sTuple" % cls.__name__, field_names, defaults=defaults_new) @classmethod def objects_in(cls, database): # type: (Database) -> QuerySet @@ -128,9 +107,10 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): return connections[db_alias] @classmethod - def get_django_model_serializer(cls, writable=False): # type: (bool) -> Django2ClickHouseModelSerializer + def get_django_model_serializer(cls, writable=False, defaults=None): + # type: (bool, Optional[dict]) -> Django2ClickHouseModelSerializer serializer_cls = lazy_class_import(cls.django_model_serializer) - return serializer_cls(cls, writable=writable) + return serializer_cls(cls, writable=writable, defaults=defaults) @classmethod def get_sync_batch_size(cls): @@ -223,8 +203,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): :return: """ if batch: - # +10 constant is to avoid 0 insert, generated by infi - cls.get_database(for_write=True).insert(batch, batch_size=len(batch) + 10) + cls.get_database(for_write=True).insert_tuples(cls, batch) @classmethod def sync_batch_from_storage(cls): @@ -257,7 +236,6 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): if import_objects: with statsd.timer(statsd_key.format('steps.get_insert_batch')): batch = cls.get_insert_batch(import_objects) - statsd.incr(statsd_key.format('insert_batch'), len(batch)) with statsd.timer(statsd_key.format('steps.insert')): cls.insert_batch(batch) @@ -333,8 +311,6 @@ class ClickHouseMultiModel(ClickHouseModel): model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__) with statsd.timer(model_statsd_key.format('steps.get_insert_batch')): batch = model_cls.get_insert_batch(import_objects) - statsd.incr(model_statsd_key.format('insert_batch'), len(batch)) - statsd.incr(statsd_key.format('insert_batch'), len(batch)) return model_cls, batch res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models)) diff --git a/src/django_clickhouse/compatibility.py b/src/django_clickhouse/compatibility.py new file mode 100644 index 0000000..8213afe --- /dev/null +++ b/src/django_clickhouse/compatibility.py @@ -0,0 +1,26 @@ +import sys +from collections import namedtuple as basenamedtuple, Mapping +from functools import lru_cache + +from copy import deepcopy + + +def namedtuple(*args, **kwargs): + """ + Changes namedtuple to support defaults parameter as python 3.7 does + https://docs.python.org/3.7/library/collections.html#collections.namedtuple + See https://stackoverflow.com/questions/11351032/namedtuple-and-default-values-for-optional-keyword-arguments + :return: namedtuple class + """ + if sys.version_info < (3, 7): + defaults = kwargs.pop('defaults', {}) + TupleClass = basenamedtuple(*args, **kwargs) + TupleClass.__new__.__defaults__ = (None,) * len(TupleClass._fields) + if isinstance(defaults, Mapping): + prototype = TupleClass(**defaults) + else: + prototype = TupleClass(*defaults) + TupleClass.__new__.__defaults__ = tuple(prototype) + return TupleClass + else: + return basenamedtuple(*args, **kwargs) diff --git a/src/django_clickhouse/database.py b/src/django_clickhouse/database.py index d1b2658..0ed6449 100644 --- a/src/django_clickhouse/database.py +++ b/src/django_clickhouse/database.py @@ -1,6 +1,10 @@ -from infi.clickhouse_orm.database import Database as InfiDatabase +from typing import Generator, Optional, Type, Iterable + +from infi.clickhouse_orm.database import Database as InfiDatabase, DatabaseException from infi.clickhouse_orm.utils import parse_tsv from six import next +from io import BytesIO +from statsd.defaults.django import statsd from .configuration import config from .exceptions import DBAliasError @@ -27,29 +31,92 @@ class Database(InfiDatabase): def _get_applied_migrations(self, migrations_package_name): raise NotImplementedError("This method is not supported by django_clickhouse.") - def select_init_many(self, query, model_class, settings=None): + def select_tuples(self, query, model_class, settings=None): + # type: (str, Type['ClickHouseModel'], Optional[dict], Optional[dict]) -> Generator[tuple] """ - Base select doesn't use init_mult which is ineffective on big result lists + This method selects model_class namedtuples, instead of class instances. + Less memory consumption, greater speed + :param query: Query to execute. Can contain substitution variables. + :param model_class: A class of model to get fields from + :param settings: Optional connection settings + :return: Generator of namedtuple objects """ query += ' FORMAT TabSeparatedWithNames' query = self._substitute(query, model_class) r = self._send(query, settings, True) lines = r.iter_lines() field_names = parse_tsv(next(lines)) + fields = [ + field for name, field in model_class.fields(writable=True).items() + if name in field_names + ] + res_class = model_class.get_tuple_class(field_names) - kwargs_list = [] for line in lines: # skip blank line left by WITH TOTALS modifier if line: values = iter(parse_tsv(line)) - kwargs = {} - for name in field_names: - field = getattr(model_class, name) - kwargs[name] = field.to_python(next(values), self.server_timezone) + item = res_class(**{ + field_name: fields[i].to_python(next(values), self.server_timezone) + for i, field_name in enumerate(field_names) + }) - kwargs_list.append(kwargs) + yield item - return model_class.init_many(kwargs_list, database=self) + def insert_tuples(self, model_class, model_tuples, batch_size=None): + # type: (Type['ClickHouseModel'], Iterable[tuple], Optional[int]) -> None + """ + Inserts model_class namedtuples + :param model_class: Clickhouse model, namedtuples are made from + :param model_tuples: An iterable of tuples to insert + :param batch_size: Size of batch + :return: None + """ + tuples_iterator = iter(model_tuples) + + try: + first_tuple = next(tuples_iterator) + except StopIteration: + return # model_instances is empty + + if model_class.is_read_only() or model_class.is_system_model(): + raise DatabaseException("You can't insert into read only and system tables") + + fields_list = ','.join('`%s`' % name for name in first_tuple._fields) + fields_dict = model_class.fields(writable=True) + fields = [fields_dict[name] for name in first_tuple._fields] + statsd_key = "%s.inserted_tuples.%s.{0}" % (config.STATSD_PREFIX, model_class.__name__) + + def tuple_to_csv(tup): + return '\t'.join(field.to_db_string(val, quote=False) for field, val in zip(fields, tup)) + '\n' + + def gen(): + buf = BytesIO() + query = 'INSERT INTO `%s`.`%s` (%s) FORMAT TabSeparated\n' \ + % (self.db_name, model_class.table_name(), fields_list) + buf.write(query.encode('utf-8')) + buf.write(tuple_to_csv(first_tuple).encode('utf-8')) + + # Collect lines in batches of batch_size + lines = 1 + for t in tuples_iterator: + buf.write(tuple_to_csv(t).encode('utf-8')) + + lines += 1 + if batch_size is not None and lines >= batch_size: + # Return the current batch of lines + statsd.incr(statsd_key.format('insert_batch'), lines) + yield buf.getvalue() + # Start a new batch + buf = BytesIO() + lines = 0 + + # Return any remaining lines in partial batch + if lines: + statsd.incr(statsd_key.format('insert_batch'), count=lines) + yield buf.getvalue() + + self._send(gen()) class ConnectionProxy: diff --git a/src/django_clickhouse/engines.py b/src/django_clickhouse/engines.py index 5694bf5..4b59389 100644 --- a/src/django_clickhouse/engines.py +++ b/src/django_clickhouse/engines.py @@ -2,31 +2,28 @@ This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse """ import datetime -from typing import List, TypeVar, Type, Union, Iterable +from typing import List, Type, Union, Iterable, Generator from django.db.models import Model as DjangoModel from infi.clickhouse_orm import engines as infi_engines -from infi.clickhouse_orm.models import Model as InfiModel from statsd.defaults.django import statsd -from django_clickhouse.database import connections from .configuration import config +from .database import connections from .utils import format_datetime -T = TypeVar('T') - class InsertOnlyEngineMixin: def get_insert_batch(self, model_cls, objects): - # type: (Type[T], List[DjangoModel]) -> List[T] + # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple] """ Gets a list of model_cls instances to insert into database :param model_cls: ClickHouseModel subclass to import :param objects: A list of django Model instances to sync - :return: An iterator of model_cls instances + :return: A generator of model_cls named tuples """ serializer = model_cls.get_django_model_serializer(writable=True) - return list(serializer.serialize_many(objects)) + return (serializer.serialize(obj) for obj in objects) class MergeTree(InsertOnlyEngineMixin, infi_engines.MergeTree): @@ -48,33 +45,32 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre self.version_col = kwargs.pop('version_col', None) super(CollapsingMergeTree, self).__init__(*args, **kwargs) - def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col): + def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns): query = """ - SELECT * FROM $table WHERE (`{pk_column}`, `{version_col}`) IN ( + SELECT {columns} FROM $table WHERE (`{pk_column}`, `{version_col}`) IN ( SELECT `{pk_column}`, MAX(`{version_col}`) FROM $table PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}' AND `{pk_column}` IN ({object_pks}) GROUP BY `{pk_column}` ) - """.format(version_col=self.version_col, date_col=date_col, pk_column=self.pk_column, + """.format(columns=','.join(columns), version_col=self.version_col, date_col=date_col, pk_column=self.pk_column, min_date=min_date, max_date=max_date, object_pks=','.join(object_pks)) - qs = connections[db_alias].select_init_many(query, model_cls) - return list(qs) + return connections[db_alias].select_tuples(query, model_cls) - def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col): + def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns): query = """ - SELECT * FROM $table FINAL + SELECT {columns} FROM $table FINAL WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}' AND `{pk_column}` IN ({object_pks}) """ - query = query.format(date_col=date_col, pk_column=self.pk_column, min_date=min_date, + query = query.format(columns=','.join(columns), date_col=date_col, pk_column=self.pk_column, min_date=min_date, max_date=max_date, object_pks=','.join(object_pks)) - qs = connections[db_alias].select_init_many(query, model_cls) - return list(qs) + return connections[db_alias].select_tuples(query, model_cls) def get_final_versions(self, model_cls, objects, date_col=None): + # type: (Type['ClickHouseModel'], Iterable[DjangoModel], str) -> Generator[tuple] """ Get objects, that are currently stored in ClickHouse. Depending on the partition key this can be different for different models. @@ -82,7 +78,8 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre It also supposes primary key to by self.pk_column :param model_cls: ClickHouseModel subclass to import :param objects: Objects for which final versions are searched - :return: A list of model objects + :param date_col: Optional column name, where partiion date is hold. Defaults to self.date_col + :return: A generator of named tuples, representing previous state """ def _dt_to_str(dt): # type: (Union[datetime.date, datetime.datetime]) -> str @@ -94,7 +91,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre raise Exception('Invalid date or datetime object: `%s`' % dt) if not objects: - return [] + raise StopIteration() date_col = date_col or self.date_col min_date, max_date = None, None @@ -114,41 +111,48 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre min_date = _dt_to_str(min_date) max_date = _dt_to_str(max_date) + # Get fields. Sign is replaced to negative for further processing + columns = list(model_cls.fields(writable=True).keys()) + columns.remove(self.sign_col) + columns.append('-1 AS sign') + + params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns) + if self.version_col: - return self._get_final_versions_by_version(db_alias, model_cls, min_date, max_date, object_pks, date_col) + return self._get_final_versions_by_version(*params) else: - return self._get_final_versions_by_final(db_alias, model_cls, min_date, max_date, object_pks, date_col) + return self._get_final_versions_by_final(*params) def get_insert_batch(self, model_cls, objects): - # type: (Type[T], List[DjangoModel]) -> List[T] + # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple] """ Gets a list of model_cls instances to insert into database :param model_cls: ClickHouseModel subclass to import :param objects: A list of django Model instances to sync :return: A list of model_cls objects """ - new_objs = super(CollapsingMergeTree, self).get_insert_batch(model_cls, objects) + defaults = {self.sign_col: 1} + if self.version_col: + defaults[self.version_col] = 1 + serializer = model_cls.get_django_model_serializer(writable=True, defaults=defaults) + new_objs = [serializer.serialize(obj) for obj in objects] statsd_key = "%s.sync.%s.steps.get_final_versions" % (config.STATSD_PREFIX, model_cls.__name__) with statsd.timer(statsd_key): old_objs = self.get_final_versions(model_cls, new_objs) + # -1 sign has been set get_final_versions() old_objs_versions = {} for obj in old_objs: - self.set_obj_sign(obj, -1) - old_objs_versions[obj.id] = getattr(obj, self.version_col) - - for obj in new_objs: - self.set_obj_sign(obj, 1) - + pk = getattr(obj, self.pk_column) if self.version_col: - setattr(obj, self.version_col, old_objs_versions.get(obj.id, 0) + 1) + old_objs_versions[pk] = getattr(obj, self.version_col) + yield obj - return old_objs + new_objs + # 1 sign is set by default in serializer + for obj in new_objs: + pk = getattr(obj, self.pk_column) + if self.version_col: + obj = obj._replace(**{self.version_col: old_objs_versions.get(pk, 0) + 1}) - def set_obj_sign(self, obj, sign): # type: (InfiModel, int) -> None - """ - Sets objects sign. By default gets attribute name from sign_col - :return: None - """ - setattr(obj, self.sign_col, sign) + yield obj diff --git a/src/django_clickhouse/serializers.py b/src/django_clickhouse/serializers.py index 56ebf98..fc78efc 100644 --- a/src/django_clickhouse/serializers.py +++ b/src/django_clickhouse/serializers.py @@ -1,11 +1,12 @@ -from django.db.models import Model as DjangoModel -from typing import List, Iterable +import datetime +from typing import NamedTuple +from django.db.models import Model as DjangoModel from django_clickhouse.utils import model_to_dict class Django2ClickHouseModelSerializer: - def __init__(self, model_cls, fields=None, exclude_fields=None, writable=False): + def __init__(self, model_cls, fields=None, exclude_fields=None, writable=False, defaults=None): self._model_cls = model_cls if fields is not None: self.serialize_fields = fields @@ -13,6 +14,7 @@ class Django2ClickHouseModelSerializer: self.serialize_fields = model_cls.fields(writable=writable).keys() self.exclude_serialize_fields = exclude_fields + self._result_class = self._model_cls.get_tuple_class(defaults=defaults) def _get_serialize_kwargs(self, obj): data = model_to_dict(obj, fields=self.serialize_fields, exclude_fields=self.exclude_serialize_fields) @@ -29,8 +31,5 @@ class Django2ClickHouseModelSerializer: return result - def serialize(self, obj): # type: (DjangoModel) -> 'ClickHouseModel' - return self._model_cls(**self._get_serialize_kwargs(obj)) - - def serialize_many(self, objs): # type: (Iterable[DjangoModel]) -> Iterable['ClickHouseModel'] - return self._model_cls.init_many((self._get_serialize_kwargs(obj) for obj in objs)) + def serialize(self, obj): # type: (DjangoModel) -> NamedTuple + return self._result_class(**self._get_serialize_kwargs(obj)) diff --git a/src/django_clickhouse/storages.py b/src/django_clickhouse/storages.py index 798c37a..418accd 100644 --- a/src/django_clickhouse/storages.py +++ b/src/django_clickhouse/storages.py @@ -106,7 +106,7 @@ class Storage: This method should be called from inner functions. :param import_key: A key, returned by ClickHouseModel.get_import_key() method :param operation: One of insert, update, delete - :param pk: Primary key to find records in main database. Should be string-serializable with str() method. + :param pks: Primary keys to find records in main database. Should be string-serializable with str() method. :return: None """ if operation not in {'insert', 'update', 'delete'}: diff --git a/tests/kill_test_sub_process.py b/tests/kill_test_sub_process.py index 96d0825..642d638 100644 --- a/tests/kill_test_sub_process.py +++ b/tests/kill_test_sub_process.py @@ -69,6 +69,9 @@ if __name__ == '__main__': parser.add_argument('--once', type=bool, required=False, default=False) params = vars(parser.parse_args()) + # Disable registering not needed models + TestModel._clickhouse_sync_models = {ClickHouseCollapseTestModel} + func_name = params['process'] method = locals()[func_name] method(**params) diff --git a/tests/test_compatibility.py b/tests/test_compatibility.py new file mode 100644 index 0000000..e652ae0 --- /dev/null +++ b/tests/test_compatibility.py @@ -0,0 +1,45 @@ +from unittest import TestCase + +from django_clickhouse.compatibility import namedtuple + + +class NamedTupleTest(TestCase): + def test_defaults(self): + TestTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 3}) + self.assertTupleEqual((1, 2, 3), tuple(TestTuple(1, b=2))) + self.assertTupleEqual((1, 2, 4), tuple(TestTuple(1, 2, 4))) + self.assertTupleEqual((1, 2, 4), tuple(TestTuple(a=1, b=2, c=4))) + + def test_exceptions(self): + TestTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 3}) + + # BUG On python < 3.7 this error is not raised, as not given defaults are filled by None + # with self.assertRaises(TypeError): + # TestTuple(b=1, c=4) + + with self.assertRaises(TypeError): + TestTuple(1, 2, 3, c=4) + + def test_different_defaults(self): + # Test that 2 tuple type defaults don't affect each other + TestTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 3}) + OtherTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 4}) + t1 = TestTuple(a=1, b=2) + t2 = OtherTuple(a=3, b=4) + self.assertTupleEqual((1, 2, 3), tuple(t1)) + self.assertTupleEqual((3, 4, 4), tuple(t2)) + + def test_defaults_cache(self): + # Test that 2 tuple instances don't affect each other's defaults + TestTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 3}) + self.assertTupleEqual((1, 2, 4), tuple(TestTuple(a=1, b=2, c=4))) + self.assertTupleEqual((1, 2, 3), tuple(TestTuple(a=1, b=2))) + + def test_equal(self): + TestTuple = namedtuple('TestTuple', ('a', 'b', 'c')) + t1 = TestTuple(1, 2, 3) + t2 = TestTuple(1, 2, 3) + self.assertEqual(t1, t2) + self.assertEqual((1, 2, 3), t1) + + diff --git a/tests/test_engines.py b/tests/test_engines.py index 7fd103f..6852e8f 100644 --- a/tests/test_engines.py +++ b/tests/test_engines.py @@ -1,10 +1,7 @@ -import datetime - -import pytz from django.test import TestCase -from django_clickhouse.migrations import migrate_app from django_clickhouse.database import connections +from django_clickhouse.migrations import migrate_app from tests.clickhouse_models import ClickHouseCollapseTestModel from tests.models import TestModel @@ -16,36 +13,43 @@ class CollapsingMergeTreeTest(TestCase): collapse_fixture = [{ "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": 1, "version": 1 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": -1, "version": 1 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": 1, "version": 2 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": -1, "version": 2 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": 1, "version": 3 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": -1, "version": 3 }, { "id": 1, "created": "2018-01-01 00:00:00", + "value": 0, "sign": 1, "version": 4 }] @@ -62,43 +66,38 @@ class CollapsingMergeTreeTest(TestCase): ]) self.objects = TestModel.objects.filter(id=1) + def _test_final_versions(self, final_versions): + final_versions = list(final_versions) + self.assertEqual(1, len(final_versions)) + item = (final_versions[0].id, final_versions[0].sign, final_versions[0].version, final_versions[0].value) + self.assertTupleEqual((1, -1, 4, 0), item) + def test_get_final_versions_by_final_date(self): final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, self.objects) - - self.assertEqual(1, len(final_versions)) - self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0}, - final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version'))) + self._test_final_versions(final_versions) def test_get_final_versions_by_version_date(self): ClickHouseCollapseTestModel.engine.version_col = 'version' final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, self.objects) - - self.assertEqual(1, len(final_versions)) - self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0}, - final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version'))) + self._test_final_versions(final_versions) def test_get_final_versions_by_final_datetime(self): final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, self.objects, date_col='created') - - self.assertEqual(1, len(final_versions)) - self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0}, - final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version'))) + self._test_final_versions(final_versions) def test_get_final_versions_by_version_datetime(self): ClickHouseCollapseTestModel.engine.version_col = 'version' final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel, self.objects, date_col='created') - - self.assertEqual(1, len(final_versions)) - self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0}, - final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version'))) + self._test_final_versions(final_versions) def test_versions(self): ClickHouseCollapseTestModel.engine.version_col = 'version' batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects) + batch = list(batch) self.assertEqual(2, len(batch)) self.assertEqual(4, batch[0].version) self.assertEqual(-1, batch[0].sign) diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 893ff02..84c8526 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -16,7 +16,6 @@ class Django2ClickHouseModelSerializerTest(TestCase): def test_all(self): serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel) res = serializer.serialize(self.obj) - self.assertIsInstance(res, ClickHouseTestModel) self.assertEqual(self.obj.id, res.id) self.assertEqual(self.obj.value, res.value) self.assertEqual(self.obj.created_date, res.created_date) @@ -24,7 +23,6 @@ class Django2ClickHouseModelSerializerTest(TestCase): def test_fields(self): serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel, fields=('value',)) res = serializer.serialize(self.obj) - self.assertIsInstance(res, ClickHouseTestModel) self.assertEqual(0, res.id) self.assertEqual(datetime.date(1970, 1, 1), res.created_date) self.assertEqual(self.obj.value, res.value) @@ -32,7 +30,6 @@ class Django2ClickHouseModelSerializerTest(TestCase): def test_exclude_fields(self): serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel, exclude_fields=('created_date',)) res = serializer.serialize(self.obj) - self.assertIsInstance(res, ClickHouseTestModel) self.assertEqual(datetime.date(1970, 1, 1), res.created_date) self.assertEqual(self.obj.id, res.id) self.assertEqual(self.obj.value, res.value) diff --git a/tests/test_sync.py b/tests/test_sync.py index 231d988..32f9337 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -20,7 +20,7 @@ logger = logging.getLogger('django-clickhouse') class SyncTest(TransactionTestCase): def setUp(self): - self.db = connections['default'] + self.db = ClickHouseCollapseTestModel.get_database() self.db.drop_database() self.db.create_database() migrate_app('tests', 'default') @@ -42,7 +42,7 @@ class SyncTest(TransactionTestCase): obj.save() ClickHouseCollapseTestModel.sync_batch_from_storage() - # sync_batch_from_storage uses FINAL, so data would be collapsed by now + # insert and update came before sync. Only one item will be inserted synced_data = list(ClickHouseCollapseTestModel.objects.all()) self.assertEqual(1, len(synced_data)) self.assertEqual(obj.value, synced_data[0].value) @@ -53,7 +53,7 @@ class SyncTest(TransactionTestCase): ClickHouseCollapseTestModel.sync_batch_from_storage() synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel)) - self.assertGreaterEqual(1, len(synced_data)) + self.assertGreaterEqual(len(synced_data), 1) self.assertEqual(obj.value, synced_data[0].value) self.assertEqual(obj.id, synced_data[0].id) @@ -65,7 +65,7 @@ class SyncTest(TransactionTestCase): obj.save() ClickHouseCollapseTestModel.sync_batch_from_storage() - # sync_batch_from_storage uses FINAL, so data would be collapsed by now + # insert and update came before sync. Only one item will be inserted synced_data = list(ClickHouseCollapseTestModel.objects.all()) self.assertEqual(1, len(synced_data)) self.assertEqual(obj.value, synced_data[0].value) @@ -76,7 +76,7 @@ class SyncTest(TransactionTestCase): ClickHouseCollapseTestModel.sync_batch_from_storage() synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel)) - self.assertGreaterEqual(1, len(synced_data)) + self.assertGreaterEqual(len(synced_data), 1) self.assertEqual(obj.value, synced_data[0].value) self.assertEqual(obj.id, synced_data[0].id) @@ -116,7 +116,7 @@ class SyncTest(TransactionTestCase): ClickHouseMultiTestModel.sync_batch_from_storage() synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel)) - self.assertGreaterEqual(1, len(synced_data)) + self.assertGreaterEqual(len(synced_data), 1) self.assertEqual(obj.value, synced_data[0].value) self.assertEqual(obj.id, synced_data[0].id) @@ -154,9 +154,14 @@ class KillTest(TransactionTestCase): self.sync_iteration(False) sync_left = storage.operations_count(import_key) - ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id', - model_class=ClickHouseCollapseTestModel)) + logger.debug('django_clickhouse: sync finished') + + ch_data = list(connections['default'].select_tuples('SELECT * FROM $table FINAL ORDER BY id', + model_class=ClickHouseCollapseTestModel)) + logger.debug('django_clickhouse: got clickhouse data') + pg_data = list(TestModel.objects.all().order_by('id')) + logger.debug('django_clickhouse: got postgres data') if len(pg_data) != len(ch_data): absent_ids = set(item.id for item in pg_data) - set(item.id for item in ch_data) @@ -165,8 +170,9 @@ class KillTest(TransactionTestCase): min(item.id for item in pg_data), max(item.id for item in pg_data))) self.assertEqual(len(pg_data), len(ch_data)) - serializer = ClickHouseCollapseTestModel.get_django_model_serializer() - self.assertListEqual(ch_data, list(serializer.serialize_many(pg_data))) + for pg_item, ch_item in zip(pg_data, ch_data): + self.assertEqual(ch_item.id, pg_item.id) + self.assertEqual(ch_item.value, pg_item.value) @classmethod def sync_iteration(cls, kill=True): diff --git a/tests/test_utils.py b/tests/test_utils.py index e1fcff2..e0a4667 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,5 +1,7 @@ import datetime +import time from queue import Queue +from time import gmtime, localtime import pytz from django.test import TestCase @@ -12,7 +14,9 @@ from django_clickhouse.utils import get_tz_offset, format_datetime, lazy_class_i class GetTZOffsetTest(TestCase): def test_func(self): - self.assertEqual(300, get_tz_offset()) + ts = time.time() + utc_offset = (datetime.datetime.fromtimestamp(ts) - datetime.datetime.utcfromtimestamp(ts)).total_seconds() + self.assertEqual(utc_offset / 60, get_tz_offset()) class FormatDateTimeTest(TestCase):