From 69c2c5504bd04515ed761172a5d66df58ea5cf82 Mon Sep 17 00:00:00 2001
From: M1ha
Date: Mon, 21 Jan 2019 14:32:12 +0500
Subject: [PATCH] MultiModelConverter now processes sub_models in parallel threads

---
 src/django_clickhouse/clickhouse_models.py | 17 +++++++++++------
 src/django_clickhouse/utils.py             | 22 +++++++++++-----------
 2 files changed, 22 insertions(+), 17 deletions(-)

diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py
index 588b13b..1f5d530 100644
--- a/src/django_clickhouse/clickhouse_models.py
+++ b/src/django_clickhouse/clickhouse_models.py
@@ -19,7 +19,7 @@ from .exceptions import RedisLockTimeoutError
 from .models import ClickHouseSyncModel
 from .query import QuerySet
 from .serializers import Django2ClickHouseModelSerializer
-from .utils import lazy_class_import, exec_multi_db_func
+from .utils import lazy_class_import, exec_multi_arg_func, exec_in_parallel
 
 
 class ClickHouseModelMeta(InfiModelBase):
@@ -160,7 +160,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
                 pk_by_db[using].add(pk)
 
         # Selecting data from multiple databases should work faster in parallel, if connections are independent.
-        objs = exec_multi_db_func(
+        objs = exec_multi_arg_func(
             lambda db_alias: cls.get_sync_query_set(db_alias, pk_by_db[db_alias]),
             pk_by_db.keys()
         )
@@ -281,16 +281,21 @@ class ClickHouseMultiModel(ClickHouseModel):
             if import_objects:
                 batches = {}
                 with statsd.timer(statsd_key.format('steps.get_insert_batch')):
-                    for model_cls in cls.sub_models:
+                    def _sub_model_func(model_cls):
                         model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__)
                         with statsd.timer(model_statsd_key.format('steps.get_insert_batch')):
-                            batches[model_cls] = model_cls.get_insert_batch(import_objects)
+                            return model_cls, model_cls.get_insert_batch(import_objects)
+
+                    res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models))
+                    batches = dict(res)
 
                 with statsd.timer(statsd_key.format('steps.insert')):
-                    for model_cls, batch in batches.items():
+                    def _sub_model_func(model_cls):
                         model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__)
                         with statsd.timer(model_statsd_key.format('steps.insert')):
-                            model_cls.insert_batch(batch)
+                            model_cls.insert_batch(batches[model_cls])
+
+                    exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models))
 
             with statsd.timer(statsd_key.format('steps.post_sync')):
                 storage.post_sync(import_key)
diff --git a/src/django_clickhouse/utils.py b/src/django_clickhouse/utils.py
index 6bb3b9d..931692d 100644
--- a/src/django_clickhouse/utils.py
+++ b/src/django_clickhouse/utils.py
@@ -238,25 +238,25 @@ def exec_in_parallel(func: Callable, args_queue: Queue, threads_count: Optional[
     return results
 
 
-def exec_multi_db_func(func: Callable, using: Iterable[str], *args, threads_count: Optional[int] = None,
-                       **kwargs) -> List[Any]:
+def exec_multi_arg_func(func: Callable, split_args: Iterable[Any], *args, threads_count: Optional[int] = None,
+                        **kwargs) -> List[Any]:
     """
-    Executes multiple databases function in parallel threads. Thread functions (func) receive db alias as first argument
+    Executes a function in parallel threads. Each call of func receives one of split_args as its first argument
     Another arguments passed to functions - args and kwargs
-    If function uses single shard, separate threads are not run, main thread is used.
-    :param func: Function to execute on single database
-    :param using: A list of database aliases to use.
+    If len(split_args) <= 1, separate threads are not run; the main thread is used
+    :param func: Function to execute. Must accept an element of split_args as its first parameter
+    :param split_args: A list of arguments to split threads by (one call per element)
     :param threads_count: Maximum number of threads to run in parallel
    :return: A list of execution results. Order of execution is not guaranteed.
     """
-    using = list(using)
-    if len(using) == 0:
+    split_args = list(split_args)
+    if len(split_args) == 0:
         return []
-    elif len(using) == 1:
-        return [func(using[0], *args, **kwargs)]
+    elif len(split_args) == 1:
+        return [func(split_args[0], *args, **kwargs)]
     else:
         q = Queue()
-        for s in using:
+        for s in split_args:
             q.put(([s] + list(args), kwargs))
         return exec_in_parallel(func, q, threads_count=threads_count)
 
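--
Usage sketch (not part of the patch): how the generalized helper above is
expected to be called. Only exec_multi_arg_func and its signature come from
the diff; the fetch_count worker and the 'default'/'replica' aliases are
made up for illustration.

    from django_clickhouse.utils import exec_multi_arg_func

    def fetch_count(db_alias, limit=100):
        # Hypothetical worker: receives one element of split_args first,
        # plus any extra *args/**kwargs forwarded to every call.
        return db_alias, limit

    # Two aliases -> two parallel threads (capped by threads_count); with one
    # alias or none, no threads are started and the main thread is used.
    # Result order is not guaranteed.
    results = exec_multi_arg_func(fetch_count, ['default', 'replica'],
                                  limit=10, threads_count=2)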