mirror of
https://github.com/carrotquest/django-clickhouse.git
synced 2024-11-22 09:06:43 +03:00
Speed up of get_sync_objects (parallel execution on multiple databases)
This commit is contained in:
parent
073e002125
commit
96b625ecff
|
@ -19,7 +19,7 @@ from .exceptions import RedisLockTimeoutError
|
||||||
from .models import ClickHouseSyncModel
|
from .models import ClickHouseSyncModel
|
||||||
from .query import QuerySet
|
from .query import QuerySet
|
||||||
from .serializers import Django2ClickHouseModelSerializer
|
from .serializers import Django2ClickHouseModelSerializer
|
||||||
from .utils import lazy_class_import
|
from .utils import lazy_class_import, exec_multi_db_func
|
||||||
|
|
||||||
|
|
||||||
class ClickHouseModelMeta(InfiModelBase):
|
class ClickHouseModelMeta(InfiModelBase):
|
||||||
|
@ -159,8 +159,12 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
|
||||||
using, pk = pk_str.split('.')
|
using, pk = pk_str.split('.')
|
||||||
pk_by_db[using].add(pk)
|
pk_by_db[using].add(pk)
|
||||||
|
|
||||||
objs = chain(*(cls.get_sync_query_set(using, pk_set) for using, pk_set in pk_by_db.items()))
|
# Selecting data from multiple databases should work faster in parallel, if connections are independent.
|
||||||
return list(objs)
|
objs = exec_multi_db_func(
|
||||||
|
lambda db_alias: cls.get_sync_query_set(db_alias, pk_by_db[db_alias]),
|
||||||
|
pk_by_db.keys()
|
||||||
|
)
|
||||||
|
return list(chain(*objs))
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_insert_batch(cls, import_objects): # type: (Iterable[DjangoModel]) -> List[ClickHouseModel]
|
def get_insert_batch(cls, import_objects): # type: (Iterable[DjangoModel]) -> List[ClickHouseModel]
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
import datetime
|
import datetime
|
||||||
|
from queue import Queue, Empty
|
||||||
|
from threading import Thread, Lock
|
||||||
|
|
||||||
import os
|
import os
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
from typing import Union, Any, Optional, TypeVar, Set, Dict, Iterable, Tuple, Iterator
|
from typing import Union, Any, Optional, TypeVar, Set, Dict, Iterable, Tuple, Iterator, Callable, List
|
||||||
|
|
||||||
import pytz
|
import pytz
|
||||||
import six
|
import six
|
||||||
|
@ -158,3 +161,102 @@ def int_ranges(items: Iterable[int]) -> Iterator[Tuple[int, int]]:
|
||||||
raise StopIteration()
|
raise StopIteration()
|
||||||
else:
|
else:
|
||||||
yield interval_start, prev_item
|
yield interval_start, prev_item
|
||||||
|
|
||||||
|
|
||||||
|
class ExceptionThread(Thread):
|
||||||
|
"""
|
||||||
|
Thread objects, which catches thread exceptions and raises them in main thread
|
||||||
|
"""
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(ExceptionThread, self).__init__(*args, **kwargs)
|
||||||
|
self.exc = None
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
try:
|
||||||
|
return super(ExceptionThread, self).run()
|
||||||
|
except Exception as e:
|
||||||
|
self.exc = e
|
||||||
|
|
||||||
|
def join(self, timeout=None):
|
||||||
|
super(ExceptionThread, self).join(timeout=timeout)
|
||||||
|
if self.exc:
|
||||||
|
raise self.exc
|
||||||
|
|
||||||
|
|
||||||
|
def exec_in_parallel(func: Callable, args_queue: Queue, threads_count: Optional[int] = None) -> List[Any]:
|
||||||
|
"""
|
||||||
|
Executes func in multiple threads in parallel
|
||||||
|
Functions are expected to be thread safe. If it needs some locks, func must provide them.
|
||||||
|
:param func: Function to execute in thread
|
||||||
|
:param args_queue: A queue with arguments for separate function call. Each element is tuple of (args, kwargs)
|
||||||
|
:param threads_count: Maximum number of parallel threads tho run
|
||||||
|
:return: A list of results. Order of results is not guaranteed. Element types depends func return type.
|
||||||
|
"""
|
||||||
|
results = []
|
||||||
|
results_lock = Lock()
|
||||||
|
|
||||||
|
# If thread_count is not given, we execute all tasks in parallel.
|
||||||
|
# If queue has less elements than threads_count, take queue size.
|
||||||
|
threads_count = min(args_queue.qsize(), threads_count) if threads_count else args_queue.qsize()
|
||||||
|
|
||||||
|
def _worker():
|
||||||
|
"""
|
||||||
|
Thread worker, gets next arguments from queue and processes them.
|
||||||
|
Results are put into results array using thread safe lock
|
||||||
|
:return: None
|
||||||
|
"""
|
||||||
|
finished = False
|
||||||
|
while not finished:
|
||||||
|
try:
|
||||||
|
# Get arguments
|
||||||
|
args, kwargs = args_queue.get_nowait()
|
||||||
|
|
||||||
|
# Execute function
|
||||||
|
local_res = func(*args, **kwargs)
|
||||||
|
|
||||||
|
# Write result in lock
|
||||||
|
with results_lock:
|
||||||
|
results.append(local_res)
|
||||||
|
|
||||||
|
args_queue.task_done()
|
||||||
|
|
||||||
|
except Empty:
|
||||||
|
# No data in queue, finish worker thread
|
||||||
|
finished = True
|
||||||
|
|
||||||
|
# Run threads
|
||||||
|
threads = []
|
||||||
|
for index in range(threads_count):
|
||||||
|
t = ExceptionThread(target=_worker)
|
||||||
|
threads.append(t)
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
# Wait for threads to finish
|
||||||
|
for t in threads:
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
def exec_multi_db_func(func: Callable, using: Iterable[str], *args, threads_count: Optional[int] = None,
|
||||||
|
**kwargs) -> List[Any]:
|
||||||
|
"""
|
||||||
|
Executes multiple databases function in parallel threads. Thread functions (func) receive db alias as first argument
|
||||||
|
Another arguments passed to functions - args and kwargs
|
||||||
|
If function uses single shard, separate threads are not run, main thread is used.
|
||||||
|
:param func: Function to execute on single database
|
||||||
|
:param using: A list of database aliases to use.
|
||||||
|
:param threads_count: Maximum number of threads to run in parallel
|
||||||
|
:return: A list of execution results. Order of execution is not guaranteed.
|
||||||
|
"""
|
||||||
|
using = list(using)
|
||||||
|
if len(using) == 0:
|
||||||
|
return []
|
||||||
|
elif len(using) == 1:
|
||||||
|
return [func(using[0], *args, **kwargs)]
|
||||||
|
else:
|
||||||
|
q = Queue()
|
||||||
|
for s in using:
|
||||||
|
q.put(([s] + list(args), kwargs))
|
||||||
|
|
||||||
|
return exec_in_parallel(func, q, threads_count=threads_count)
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
import datetime
|
import datetime
|
||||||
|
from queue import Queue
|
||||||
|
|
||||||
import pytz
|
import pytz
|
||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
|
|
||||||
from django_clickhouse.models import ClickHouseSyncModel
|
from django_clickhouse.models import ClickHouseSyncModel
|
||||||
from django_clickhouse.utils import get_tz_offset, format_datetime, lazy_class_import, int_ranges
|
from django_clickhouse.utils import get_tz_offset, format_datetime, lazy_class_import, int_ranges, exec_in_parallel
|
||||||
|
|
||||||
|
|
||||||
class GetTZOffsetTest(TestCase):
|
class GetTZOffsetTest(TestCase):
|
||||||
|
@ -67,3 +68,34 @@ class TestIntRanges(TestCase):
|
||||||
def test_bounds(self):
|
def test_bounds(self):
|
||||||
self.assertListEqual([(1, 1), (5, 6), (10, 10)],
|
self.assertListEqual([(1, 1), (5, 6), (10, 10)],
|
||||||
list(int_ranges([1, 5, 6, 10])))
|
list(int_ranges([1, 5, 6, 10])))
|
||||||
|
|
||||||
|
|
||||||
|
class TestExecInParallel(TestCase):
|
||||||
|
base_classes = []
|
||||||
|
|
||||||
|
def test_exec(self):
|
||||||
|
q = Queue()
|
||||||
|
for i in range(10):
|
||||||
|
q.put(([i], {}))
|
||||||
|
|
||||||
|
res = exec_in_parallel(lambda x: x*x, q, 4)
|
||||||
|
self.assertSetEqual({x * x for x in range(10)}, set(res))
|
||||||
|
|
||||||
|
def test_exec_no_count(self):
|
||||||
|
q = Queue()
|
||||||
|
for i in range(10):
|
||||||
|
q.put(([i], {}))
|
||||||
|
|
||||||
|
res = exec_in_parallel(lambda x: x * x, q)
|
||||||
|
self.assertSetEqual({x * x for x in range(10)}, set(res))
|
||||||
|
|
||||||
|
def test_exception(self):
|
||||||
|
q = Queue()
|
||||||
|
for i in range(10):
|
||||||
|
q.put(([i], {}))
|
||||||
|
|
||||||
|
def _test_func(x):
|
||||||
|
raise TypeError("Exception in thread %d" % x)
|
||||||
|
|
||||||
|
with self.assertRaises(TypeError):
|
||||||
|
exec_in_parallel(_test_func, q)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user