MultiModelConverter now processes sub_models in parallel threads

M1ha 2019-01-21 14:32:12 +05:00
parent 480a857fe8
commit 69c2c5504b
2 changed files with 22 additions and 17 deletions


@@ -19,7 +19,7 @@ from .exceptions import RedisLockTimeoutError
 from .models import ClickHouseSyncModel
 from .query import QuerySet
 from .serializers import Django2ClickHouseModelSerializer
-from .utils import lazy_class_import, exec_multi_db_func
+from .utils import lazy_class_import, exec_multi_arg_func, exec_in_parallel

 class ClickHouseModelMeta(InfiModelBase):
@@ -160,7 +160,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
             pk_by_db[using].add(pk)

         # Selecting data from multiple databases should work faster in parallel, if connections are independent.
-        objs = exec_multi_db_func(
+        objs = exec_multi_arg_func(
             lambda db_alias: cls.get_sync_query_set(db_alias, pk_by_db[db_alias]),
             pk_by_db.keys()
         )
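As a toy illustration of the renamed helper (the absolute import path is an assumption; only the relative `.utils` import appears in this diff), exec_multi_arg_func fans one callable out over a list of "split" arguments, one thread per argument:

    from django_clickhouse.utils import exec_multi_arg_func  # package path assumed

    # Each argument is processed in its own thread; results come back as a
    # list whose order is not guaranteed (see the docstring further below).
    doubled = exec_multi_arg_func(lambda alias: alias * 2, ['a', 'b', 'c'])
    # e.g. ['bb', 'aa', 'cc']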
@@ -281,16 +281,21 @@ class ClickHouseMultiModel(ClickHouseModel):
         if import_objects:
             batches = {}
             with statsd.timer(statsd_key.format('steps.get_insert_batch')):
-                for model_cls in cls.sub_models:
-                    model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__)
-                    with statsd.timer(model_statsd_key.format('steps.get_insert_batch')):
-                        batches[model_cls] = model_cls.get_insert_batch(import_objects)
+                def _sub_model_func(model_cls):
+                    model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__)
+                    with statsd.timer(model_statsd_key.format('steps.get_insert_batch')):
+                        return model_cls, model_cls.get_insert_batch(import_objects)
+
+                res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models))
+                batches = dict(res)

             with statsd.timer(statsd_key.format('steps.insert')):
-                for model_cls, batch in batches.items():
-                    model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__)
-                    with statsd.timer(model_statsd_key.format('steps.insert')):
-                        model_cls.insert_batch(batch)
+                def _sub_model_func(model_cls):
+                    model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__)
+                    with statsd.timer(model_statsd_key.format('steps.insert')):
+                        model_cls.insert_batch(batches[model_cls])
+
+                exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models))

         with statsd.timer(statsd_key.format('steps.post_sync')):
             storage.post_sync(import_key)
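Note a design detail in the first phase above: because thread results arrive in completion order, _sub_model_func returns a (model_cls, batch) pair and the mapping is rebuilt with dict(res) afterwards. A minimal sketch with hypothetical values:

    res = [('ModelB', 'batch_b'), ('ModelA', 'batch_a')]  # threads finished B first
    batches = dict(res)  # lookup by model class; completion order no longer matters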


@@ -238,25 +238,25 @@ def exec_in_parallel(func: Callable, args_queue: Queue, threads_count: Optional[
     return results

-def exec_multi_db_func(func: Callable, using: Iterable[str], *args, threads_count: Optional[int] = None,
-                       **kwargs) -> List[Any]:
+def exec_multi_arg_func(func: Callable, split_args: Iterable[Any], *args, threads_count: Optional[int] = None,
+                        **kwargs) -> List[Any]:
     """
-    Executes multiple databases function in parallel threads. Thread functions (func) receive db alias as first argument
-    Another arguments passed to functions - args and kwargs
-    If function uses single shard, separate threads are not run, main thread is used.
-    :param func: Function to execute on single database
-    :param using: A list of database aliases to use.
+    Executes a function in parallel threads. Thread functions (func) receive one of split_args as the first argument.
+    Other arguments passed to the function are args and kwargs.
+    If len(split_args) <= 1, separate threads are not run; the main thread is used.
+    :param func: Function to execute. Must accept a split_arg as its first parameter
+    :param split_args: A list of arguments to split threads by
     :param threads_count: Maximum number of threads to run in parallel
     :return: A list of execution results. Order of execution is not guaranteed.
     """
-    using = list(using)
-    if len(using) == 0:
+    split_args = list(split_args)
+    if len(split_args) == 0:
         return []
-    elif len(using) == 1:
-        return [func(using[0], *args, **kwargs)]
+    elif len(split_args) == 1:
+        return [func(split_args[0], *args, **kwargs)]
     else:
         q = Queue()
-        for s in using:
+        for s in split_args:
             q.put(([s] + list(args), kwargs))
         return exec_in_parallel(func, q, threads_count=threads_count)
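exec_in_parallel itself appears in this commit only through the hunk header above. A minimal sketch consistent with that signature and with the (args, kwargs) tuples that exec_multi_arg_func puts on the queue might look like the following; the real implementation in utils.py is not shown in this diff, and its error handling may differ:

    import threading
    from queue import Empty, Queue
    from typing import Any, Callable, List, Optional

    def exec_in_parallel(func: Callable, args_queue: Queue,
                         threads_count: Optional[int] = None) -> List[Any]:
        # One worker per queued argument set, unless threads_count caps it.
        threads_count = min(threads_count or args_queue.qsize(), args_queue.qsize())
        results = []
        lock = threading.Lock()

        def _worker():
            while True:
                try:
                    args, kwargs = args_queue.get_nowait()
                except Empty:
                    return  # queue drained, worker exits
                res = func(*args, **kwargs)
                with lock:
                    results.append(res)
                args_queue.task_done()

        threads = [threading.Thread(target=_worker) for _ in range(threads_count)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        return results

When called through exec_multi_arg_func, each queued item is ([split_arg, *args], kwargs), so func receives the split argument first, matching the docstring above.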