Added extra method in model to get insert batch

This commit is contained in:
M1ha 2018-11-22 12:54:51 +05:00
parent e4b2958895
commit 6261d69821
2 changed files with 16 additions and 9 deletions

View File

@ -4,7 +4,7 @@ This file defines base abstract models to inherit from
import datetime
from collections import defaultdict
from itertools import chain
from typing import List, Tuple
from typing import List, Tuple, Iterable
from django.db.models import Model as DjangoModel
from django.utils.timezone import now
@ -129,6 +129,15 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
))
return list(objs)
@classmethod
def get_insert_batch(cls, import_objects): # type: (Iterable[DjangoModel]) -> List[ClickHouseModel]
"""
Formats django model objects to batch of ClickHouse objects
:param import_objects: DjangoModel objects to import
:return: ClickHouseModel objects to import
"""
return cls.engine.get_insert_batch(cls, import_objects)
@classmethod
def sync_batch_from_storage(cls):
"""
@ -156,7 +165,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
import_objects = cls.get_sync_objects(operations)
with statsd.timer(statsd_key.format('get_insert_batch')):
batch = cls.engine.get_insert_batch(cls, conn, import_objects)
batch = cls.get_insert_batch(import_objects)
if batch:
with statsd.timer(statsd_key.format('write_import_batch')):

View File

@ -17,12 +17,11 @@ T = TypeVar('T')
class InsertOnlyEngineMixin:
def get_insert_batch(self, model_cls, database, objects):
# type: (Type[T], Database, List[DjangoModel]) -> List[T]
def get_insert_batch(self, model_cls, objects):
# type: (Type[T], List[DjangoModel]) -> List[T]
"""
Gets a list of model_cls instances to insert into database
:param model_cls: ClickHouseModel subclass to import
:param database: infi.clickhouse_orm Database instance to sync data with
:param objects: A list of django Model instances to sync
:return: A list of model_cls objects
"""
@ -75,16 +74,15 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
qs = connections[db].select(query, model_class=model_cls)
return list(qs)
def get_insert_batch(self, model_cls, database, objects):
# type: (Type[T], Database, List[DjangoModel]) -> List[T]
def get_insert_batch(self, model_cls, objects):
# type: (Type[T], List[DjangoModel]) -> List[T]
"""
Gets a list of model_cls instances to insert into database
:param model_cls: ClickHouseModel subclass to import
:param database: infi.clickhouse_orm Database instance to sync data with
:param objects: A list of django Model instances to sync
:return: A list of model_cls objects
"""
new_objs = super(CollapsingMergeTree, self).get_insert_batch(model_cls, database, objects)
new_objs = super(CollapsingMergeTree, self).get_insert_batch(model_cls, objects)
statsd_key = "%s.sync.%s.get_final_versions" % (config.STATSD_PREFIX, model_cls.__name__)
with statsd.timer(statsd_key):