Merge pull request #24 from carrotquest/add-sync-process-debug-logs

Added debug logs for sync process
This commit is contained in:
M1ha Shvn 2021-04-01 10:03:42 +05:00 committed by GitHub
commit 95add2165f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 7 deletions

View File

@ -236,7 +236,10 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
with statsd.timer(statsd_key.format('steps.get_operations')): with statsd.timer(statsd_key.format('steps.get_operations')):
operations = storage.get_operations(import_key, cls.get_sync_batch_size()) operations = storage.get_operations(import_key, cls.get_sync_batch_size())
statsd.incr(statsd_key.format('operations'), len(operations))
statsd.incr(statsd_key.format('operations'), len(operations))
logger.debug('django-clickhouse: got %d operations from storage (key: %s)'
% (len(operations), import_key))
if operations: if operations:
with statsd.timer(statsd_key.format('steps.get_sync_objects')): with statsd.timer(statsd_key.format('steps.get_sync_objects')):
@ -245,13 +248,18 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
import_objects = [] import_objects = []
statsd.incr(statsd_key.format('import_objects'), len(import_objects)) statsd.incr(statsd_key.format('import_objects'), len(import_objects))
logger.debug('django-clickhouse: got %d objects to import from database (key: %s)'
% (len(import_objects), import_key))
if import_objects: if import_objects:
with statsd.timer(statsd_key.format('steps.get_insert_batch')): with statsd.timer(statsd_key.format('steps.get_insert_batch')):
# NOTE I don't use generator pattern here, as it move all time into insert. # NOTE I don't use generator pattern here, as it move all time into insert.
# That makes hard to understand where real problem is in monitoring # That makes hard to understand where real problem is in monitoring
batch = tuple(cls.get_insert_batch(import_objects)) batch = tuple(cls.get_insert_batch(import_objects))
logger.debug('django-clickhouse: formed %d ClickHouse objects to insert (key: %s)'
% (len(batch), import_key))
with statsd.timer(statsd_key.format('steps.insert')): with statsd.timer(statsd_key.format('steps.insert')):
cls.insert_batch(batch) cls.insert_batch(batch)
@ -312,7 +320,10 @@ class ClickHouseMultiModel(ClickHouseModel):
with statsd.timer(statsd_key.format('steps.get_operations')): with statsd.timer(statsd_key.format('steps.get_operations')):
operations = storage.get_operations(import_key, cls.get_sync_batch_size()) operations = storage.get_operations(import_key, cls.get_sync_batch_size())
statsd.incr(statsd_key.format('operations'), len(operations))
statsd.incr(statsd_key.format('operations'), len(operations))
logger.debug('django-clickhouse: got %d operations from storage (key: %s)'
% (len(operations), import_key))
if operations: if operations:
with statsd.timer(statsd_key.format('steps.get_sync_objects')): with statsd.timer(statsd_key.format('steps.get_sync_objects')):
@ -321,6 +332,8 @@ class ClickHouseMultiModel(ClickHouseModel):
import_objects = [] import_objects = []
statsd.incr(statsd_key.format('import_objects'), len(import_objects)) statsd.incr(statsd_key.format('import_objects'), len(import_objects))
logger.debug('django-clickhouse: got %d objects to import from database (key: %s)'
% (len(import_objects), import_key))
if import_objects: if import_objects:
batches = {} batches = {}
@ -331,7 +344,10 @@ class ClickHouseMultiModel(ClickHouseModel):
# NOTE I don't use generator pattern here, as it move all time into insert. # NOTE I don't use generator pattern here, as it move all time into insert.
# That makes hard to understand where real problem is in monitoring # That makes hard to understand where real problem is in monitoring
batch = tuple(model_cls.get_insert_batch(import_objects)) batch = tuple(model_cls.get_insert_batch(import_objects))
return model_cls, batch
logger.debug('django-clickhouse: formed %d ClickHouse objects to insert'
' (model_cls: %s, key: %s)' % (len(batch), model_cls.__name__, import_key))
return model_cls, batch
res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models)) res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models))
batches = dict(res) batches = dict(res)

View File

@ -251,7 +251,7 @@ class RedisStorage(with_metaclass(SingletonMeta, Storage)):
self.post_batch_removed(import_key, batch_size) self.post_batch_removed(import_key, batch_size)
self.get_lock(import_key, **kwargs).release() self.get_lock(import_key, **kwargs).release()
logger.info('django-clickhouse: synced %d items (key: %s)' % (batch_size, import_key)) logger.info('django-clickhouse: removed %d operations from storage (key: %s)' % (batch_size, import_key))
def post_sync_failed(self, import_key, **kwargs): def post_sync_failed(self, import_key, **kwargs):
# unblock lock after sync completed # unblock lock after sync completed

View File

@ -1,5 +1,6 @@
import datetime import datetime
import importlib import importlib
from typing import Type, Union
from celery import shared_task from celery import shared_task
from django.conf import settings from django.conf import settings
@ -11,7 +12,7 @@ from .utils import get_subclasses, lazy_class_import
@shared_task(queue=config.CELERY_QUEUE) @shared_task(queue=config.CELERY_QUEUE)
def sync_clickhouse_model(model_cls) -> None: def sync_clickhouse_model(model_cls: Union[Type[ClickHouseModel], str]) -> None:
""" """
Syncs one batch of given ClickHouseModel Syncs one batch of given ClickHouseModel
:param model_cls: ClickHouseModel subclass or python path to it :param model_cls: ClickHouseModel subclass or python path to it
@ -25,7 +26,7 @@ def sync_clickhouse_model(model_cls) -> None:
@shared_task(queue=config.CELERY_QUEUE) @shared_task(queue=config.CELERY_QUEUE)
def clickhouse_auto_sync(): def clickhouse_auto_sync() -> None:
""" """
Plans syncing models Plans syncing models
:return: None :return: None