diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py index 057a91c..4e0fd0c 100644 --- a/src/django_clickhouse/clickhouse_models.py +++ b/src/django_clickhouse/clickhouse_models.py @@ -9,7 +9,6 @@ from itertools import chain from typing import List, Tuple, Iterable, Set, Any, Optional from django.db.models import Model as DjangoModel, QuerySet as DjangoQuerySet -from infi.clickhouse_orm.database import Database from infi.clickhouse_orm.engines import CollapsingMergeTree from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase from six import with_metaclass @@ -17,7 +16,7 @@ from statsd.defaults.django import statsd from .compatibility import namedtuple from .configuration import config -from .database import connections +from .database import connections, Database from .exceptions import RedisLockTimeoutError from .models import ClickHouseSyncModel from .query import QuerySet @@ -233,7 +232,6 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): if import_objects: with statsd.timer(statsd_key.format('steps.get_insert_batch')): batch = cls.get_insert_batch(import_objects) - # statsd.incr(statsd_key.format('insert_batch'), len(batch)) with statsd.timer(statsd_key.format('steps.insert')): cls.insert_batch(batch) @@ -309,8 +307,6 @@ class ClickHouseMultiModel(ClickHouseModel): model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__) with statsd.timer(model_statsd_key.format('steps.get_insert_batch')): batch = model_cls.get_insert_batch(import_objects) - # statsd.incr(model_statsd_key.format('insert_batch'), len(batch)) - # statsd.incr(statsd_key.format('insert_batch'), len(batch)) return model_cls, batch res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models)) diff --git a/src/django_clickhouse/database.py b/src/django_clickhouse/database.py index 979d34e..ff4e8ed 100644 --- a/src/django_clickhouse/database.py +++ 
b/src/django_clickhouse/database.py @@ -4,6 +4,7 @@ from infi.clickhouse_orm.database import Database as InfiDatabase, DatabaseExcep from infi.clickhouse_orm.utils import parse_tsv from six import next from io import BytesIO +from statsd.defaults.django import statsd from .configuration import config from .exceptions import DBAliasError @@ -83,6 +84,7 @@ class Database(InfiDatabase): fields_list = ','.join('`%s`' % name for name in first_tuple._fields) fields = [f for name, f in model_class.fields(writable=True).items() if name in first_tuple._fields] + statsd_key = "%s.inserted_tuples.%s.{0}" % (config.STATSD_PREFIX, model_class.__name__) def tuple_to_csv(tup): return '\t'.join(field.to_db_string(val, quote=False) for field, val in zip(fields, tup)) + '\n' @@ -102,6 +104,7 @@ class Database(InfiDatabase): lines += 1 if batch_size is not None and lines >= batch_size: # Return the current batch of lines + statsd.incr(statsd_key.format('insert_batch'), lines) yield buf.getvalue() # Start a new batch buf = BytesIO() @@ -109,6 +112,7 @@ class Database(InfiDatabase): # Return any remaining lines in partial batch if lines: + statsd.incr(statsd_key.format('insert_batch'), lines) yield buf.getvalue() self._send(gen()) diff --git a/src/django_clickhouse/serializers.py b/src/django_clickhouse/serializers.py index b4aaefe..fc78efc 100644 --- a/src/django_clickhouse/serializers.py +++ b/src/django_clickhouse/serializers.py @@ -1,3 +1,6 @@ +import datetime +from typing import NamedTuple + from django.db.models import Model as DjangoModel from django_clickhouse.utils import model_to_dict @@ -28,5 +31,5 @@ class Django2ClickHouseModelSerializer: return result - def serialize(self, obj): # type: (DjangoModel) -> tuple + def serialize(self, obj): # type: (DjangoModel) -> NamedTuple return self._result_class(**self._get_serialize_kwargs(obj)) diff --git a/src/django_clickhouse/storages.py b/src/django_clickhouse/storages.py index 798c37a..418accd 100644 ---
a/src/django_clickhouse/storages.py +++ b/src/django_clickhouse/storages.py @@ -106,7 +106,7 @@ class Storage: This method should be called from inner functions. :param import_key: A key, returned by ClickHouseModel.get_import_key() method :param operation: One of insert, update, delete - :param pk: Primary key to find records in main database. Should be string-serializable with str() method. + :param pks: Primary keys to find records in main database. Should be string-serializable with str() method. :return: None """ if operation not in {'insert', 'update', 'delete'}: diff --git a/tests/kill_test_sub_process.py b/tests/kill_test_sub_process.py index 96d0825..642d638 100644 --- a/tests/kill_test_sub_process.py +++ b/tests/kill_test_sub_process.py @@ -69,6 +69,9 @@ if __name__ == '__main__': parser.add_argument('--once', type=bool, required=False, default=False) params = vars(parser.parse_args()) + # Disable registering not needed models + TestModel._clickhouse_sync_models = {ClickHouseCollapseTestModel} + func_name = params['process'] method = locals()[func_name] method(**params) diff --git a/tests/test_sync.py b/tests/test_sync.py index af2b682..b2184ae 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -170,9 +170,9 @@ class KillTest(TransactionTestCase): min(item.id for item in pg_data), max(item.id for item in pg_data))) self.assertEqual(len(pg_data), len(ch_data)) - serializer = ClickHouseCollapseTestModel.get_django_model_serializer() for pg_item, ch_item in zip(pg_data, ch_data): - self.assertEqual(ch_item, serializer.serialize(pg_item)) + self.assertEqual(ch_item.id, pg_item.id) + self.assertEqual(ch_item.value, pg_item.value) @classmethod def sync_iteration(cls, kill=True):