Speed up get_final_versions

This commit is contained in:
M1ha 2019-01-24 17:47:52 +05:00
parent cc76166ae1
commit b03b829ed9
5 changed files with 40 additions and 14 deletions

View File

@ -4,7 +4,7 @@ This file defines base abstract models to inherit from
import datetime
from collections import defaultdict
from itertools import chain
from typing import List, Tuple, Iterable, Set, Any, Dict
from typing import List, Tuple, Iterable, Set, Any, Dict, Optional
import logging
import pytz
@ -71,14 +71,16 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
super(ClickHouseModel, self).__init__(**kwargs)
@classmethod
def init_many(cls, kwargs_list): # type: (Iterable[Dict[str, Any]]) -> List['ClickHouseModel']
def init_many(cls, kwargs_list, database=None):
# type: (Iterable[Dict[str, Any]], Optional[Database]) -> Iterable['ClickHouseModel']
"""
Basic __init__ methods if not effective if we need to init 100k objects
:return: A list of inited classes
"""
assert database is None or isinstance(database, Database), "database must be database.Database instance"
# Assign default values
valid_field_names = set(cls._fields.keys())
result = []
for kwargs in kwargs_list:
invalid_fields = set(kwargs.keys()) - valid_field_names
if invalid_fields:
@ -86,6 +88,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
item = cls(__multi_init=True)
item.__dict__.update(cls._defaults)
item._database = database
for name in kwargs:
field = cls._fields[name]
@ -93,9 +96,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
field.validate(kwargs[name])
item.__dict__.update(kwargs)
result.append(item)
return result
yield item
@classmethod
def objects_in(cls, database): # type: (Database) -> QuerySet

View File

@ -1,5 +1,6 @@
from infi.clickhouse_orm.database import Database as InfiDatabase
from statsd.defaults.django import statsd
from infi.clickhouse_orm.utils import parse_tsv
from six import next
from .configuration import config
from .exceptions import DBAliasError
@ -26,6 +27,30 @@ class Database(InfiDatabase):
def _get_applied_migrations(self, migrations_package_name):
raise NotImplementedError("This method is not supported by django_clickhouse.")
def select_init_many(self, query, model_class, settings=None):
"""
Base select doesn't use init_mult which is ineffective on big result lists
"""
query += ' FORMAT TabSeparatedWithNames'
query = self._substitute(query, model_class)
r = self._send(query, settings, True)
lines = r.iter_lines()
field_names = parse_tsv(next(lines))
kwargs_list = []
for line in lines:
# skip blank line left by WITH TOTALS modifier
if line:
values = iter(parse_tsv(line))
kwargs = {}
for name in field_names:
field = getattr(model_class, name)
kwargs[name] = field.to_python(next(values), self.server_timezone)
kwargs_list.append(kwargs)
return model_class.init_many(kwargs_list, database=self)
class ConnectionProxy:
_connections = {}

View File

@ -2,7 +2,7 @@
This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse
"""
import datetime
from typing import List, TypeVar, Type, Union
from typing import List, TypeVar, Type, Union, Iterable
from django.db.models import Model as DjangoModel
from infi.clickhouse_orm import engines as infi_engines
@ -23,10 +23,10 @@ class InsertOnlyEngineMixin:
Gets a list of model_cls instances to insert into database
:param model_cls: ClickHouseModel subclass to import
:param objects: A list of django Model instances to sync
:return: A list of model_cls objects
:return: An iterator of model_cls instances
"""
serializer = model_cls.get_django_model_serializer(writable=True)
return serializer.serialize_many(objects)
return list(serializer.serialize_many(objects))
class MergeTree(InsertOnlyEngineMixin, infi_engines.MergeTree):
@ -60,7 +60,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
""".format(version_col=self.version_col, date_col=date_col, pk_column=self.pk_column,
min_date=min_date, max_date=max_date, object_pks=','.join(object_pks))
qs = connections[db_alias].select(query, model_class=model_cls)
qs = connections[db_alias].select_init_many(query, model_cls)
return list(qs)
def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col):
@ -71,7 +71,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
"""
query = query.format(date_col=date_col, pk_column=self.pk_column, min_date=min_date,
max_date=max_date, object_pks=','.join(object_pks))
qs = connections[db_alias].select(query, model_class=model_cls)
qs = connections[db_alias].select_init_many(query, model_cls)
return list(qs)
def get_final_versions(self, model_cls, objects, date_col=None):

View File

@ -32,5 +32,5 @@ class Django2ClickHouseModelSerializer:
def serialize(self, obj): # type: (DjangoModel) -> 'ClickHouseModel'
return self._model_cls(**self._get_serialize_kwargs(obj))
def serialize_many(self, objs): # type: (Iterable[DjangoModel]) -> List['ClickHouseModel']
def serialize_many(self, objs): # type: (Iterable[DjangoModel]) -> Iterable['ClickHouseModel']
return self._model_cls.init_many((self._get_serialize_kwargs(obj) for obj in objs))

View File

@ -166,7 +166,7 @@ class KillTest(TransactionTestCase):
self.assertEqual(len(pg_data), len(ch_data))
serializer = ClickHouseCollapseTestModel.get_django_model_serializer()
self.assertListEqual(ch_data, serializer.serialize_many(pg_data))
self.assertListEqual(ch_data, list(serializer.serialize_many(pg_data)))
@classmethod
def sync_iteration(cls, kill=True):