diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py index 2b4817b..b33f5e9 100644 --- a/src/django_clickhouse/clickhouse_models.py +++ b/src/django_clickhouse/clickhouse_models.py @@ -4,7 +4,7 @@ This file defines base abstract models to inherit from import datetime from collections import defaultdict from itertools import chain -from typing import List, Tuple, Iterable, Set, Any, Dict +from typing import List, Tuple, Iterable, Set, Any, Dict, Optional import logging import pytz @@ -71,14 +71,16 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): super(ClickHouseModel, self).__init__(**kwargs) @classmethod - def init_many(cls, kwargs_list): # type: (Iterable[Dict[str, Any]]) -> List['ClickHouseModel'] + def init_many(cls, kwargs_list, database=None): + # type: (Iterable[Dict[str, Any]], Optional[Database]) -> Iterable['ClickHouseModel'] """ Basic __init__ methods if not effective if we need to init 100k objects :return: A list of inited classes """ + assert database is None or isinstance(database, Database), "database must be database.Database instance" + # Assign default values valid_field_names = set(cls._fields.keys()) - result = [] for kwargs in kwargs_list: invalid_fields = set(kwargs.keys()) - valid_field_names if invalid_fields: @@ -86,6 +88,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): item = cls(__multi_init=True) item.__dict__.update(cls._defaults) + item._database = database for name in kwargs: field = cls._fields[name] @@ -93,9 +96,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): field.validate(kwargs[name]) item.__dict__.update(kwargs) - result.append(item) - - return result + yield item @classmethod def objects_in(cls, database): # type: (Database) -> QuerySet diff --git a/src/django_clickhouse/database.py b/src/django_clickhouse/database.py index 30c4303..d1b2658 100644 --- a/src/django_clickhouse/database.py +++ b/src/django_clickhouse/database.py @@ -1,5 +1,6 @@ from infi.clickhouse_orm.database import Database as InfiDatabase -from statsd.defaults.django import statsd +from infi.clickhouse_orm.utils import parse_tsv +from six import next from .configuration import config from .exceptions import DBAliasError @@ -26,6 +27,30 @@ class Database(InfiDatabase): def _get_applied_migrations(self, migrations_package_name): raise NotImplementedError("This method is not supported by django_clickhouse.") + def select_init_many(self, query, model_class, settings=None): + """ + Base select doesn't use init_mult which is ineffective on big result lists + """ + query += ' FORMAT TabSeparatedWithNames' + query = self._substitute(query, model_class) + r = self._send(query, settings, True) + lines = r.iter_lines() + field_names = parse_tsv(next(lines)) + + kwargs_list = [] + for line in lines: + # skip blank line left by WITH TOTALS modifier + if line: + values = iter(parse_tsv(line)) + kwargs = {} + for name in field_names: + field = getattr(model_class, name) + kwargs[name] = field.to_python(next(values), self.server_timezone) + + kwargs_list.append(kwargs) + + return model_class.init_many(kwargs_list, database=self) + class ConnectionProxy: _connections = {} diff --git a/src/django_clickhouse/engines.py b/src/django_clickhouse/engines.py index 5dd9d4b..c1f7d78 100644 --- a/src/django_clickhouse/engines.py +++ b/src/django_clickhouse/engines.py @@ -2,7 +2,7 @@ This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse """ import datetime -from typing import List, TypeVar, Type, Union +from typing import List, TypeVar, Type, Union, Iterable from django.db.models import Model as DjangoModel from infi.clickhouse_orm import engines as infi_engines @@ -23,10 +23,10 @@ class InsertOnlyEngineMixin: Gets a list of model_cls instances to insert into database :param model_cls: ClickHouseModel subclass to import :param objects: A list of django Model instances to sync - :return: A list of model_cls objects + :return: An iterator of model_cls instances """ serializer = model_cls.get_django_model_serializer(writable=True) - return serializer.serialize_many(objects) + return list(serializer.serialize_many(objects)) class MergeTree(InsertOnlyEngineMixin, infi_engines.MergeTree): @@ -60,7 +60,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre """.format(version_col=self.version_col, date_col=date_col, pk_column=self.pk_column, min_date=min_date, max_date=max_date, object_pks=','.join(object_pks)) - qs = connections[db_alias].select(query, model_class=model_cls) + qs = connections[db_alias].select_init_many(query, model_cls) return list(qs) def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col): @@ -71,7 +71,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre """ query = query.format(date_col=date_col, pk_column=self.pk_column, min_date=min_date, max_date=max_date, object_pks=','.join(object_pks)) - qs = connections[db_alias].select(query, model_class=model_cls) + qs = connections[db_alias].select_init_many(query, model_cls) return list(qs) def get_final_versions(self, model_cls, objects, date_col=None): diff --git a/src/django_clickhouse/serializers.py b/src/django_clickhouse/serializers.py index 9c08597..56ebf98 100644 --- a/src/django_clickhouse/serializers.py +++ b/src/django_clickhouse/serializers.py @@ -32,5 +32,5 @@ class Django2ClickHouseModelSerializer: def serialize(self, obj): # type: (DjangoModel) -> 'ClickHouseModel' return self._model_cls(**self._get_serialize_kwargs(obj)) - def serialize_many(self, objs): # type: (Iterable[DjangoModel]) -> List['ClickHouseModel'] + def serialize_many(self, objs): # type: (Iterable[DjangoModel]) -> Iterable['ClickHouseModel'] return self._model_cls.init_many((self._get_serialize_kwargs(obj) for obj in objs)) diff --git a/tests/test_sync.py b/tests/test_sync.py index 05a17ad..c24276a 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -166,7 +166,7 @@ class KillTest(TransactionTestCase): self.assertEqual(len(pg_data), len(ch_data)) serializer = ClickHouseCollapseTestModel.get_django_model_serializer() - self.assertListEqual(ch_data, serializer.serialize_many(pg_data)) + self.assertListEqual(ch_data, list(serializer.serialize_many(pg_data))) @classmethod def sync_iteration(cls, kill=True):