Fixed sync test bugs

This commit is contained in:
M1ha 2019-02-03 21:05:01 +05:00
parent 16dce3d047
commit 30b4b0568d
6 changed files with 15 additions and 9 deletions

View File

@@ -9,7 +9,6 @@ from itertools import chain
from typing import List, Tuple, Iterable, Set, Any, Optional from typing import List, Tuple, Iterable, Set, Any, Optional
from django.db.models import Model as DjangoModel, QuerySet as DjangoQuerySet from django.db.models import Model as DjangoModel, QuerySet as DjangoQuerySet
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.engines import CollapsingMergeTree from infi.clickhouse_orm.engines import CollapsingMergeTree
from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase
from six import with_metaclass from six import with_metaclass
@@ -17,7 +16,7 @@ from statsd.defaults.django import statsd
from .compatibility import namedtuple from .compatibility import namedtuple
from .configuration import config from .configuration import config
from .database import connections from .database import connections, Database
from .exceptions import RedisLockTimeoutError from .exceptions import RedisLockTimeoutError
from .models import ClickHouseSyncModel from .models import ClickHouseSyncModel
from .query import QuerySet from .query import QuerySet
@@ -233,7 +232,6 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
if import_objects: if import_objects:
with statsd.timer(statsd_key.format('steps.get_insert_batch')): with statsd.timer(statsd_key.format('steps.get_insert_batch')):
batch = cls.get_insert_batch(import_objects) batch = cls.get_insert_batch(import_objects)
# statsd.incr(statsd_key.format('insert_batch'), len(batch))
with statsd.timer(statsd_key.format('steps.insert')): with statsd.timer(statsd_key.format('steps.insert')):
cls.insert_batch(batch) cls.insert_batch(batch)
@@ -309,8 +307,6 @@ class ClickHouseMultiModel(ClickHouseModel):
model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__) model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__)
with statsd.timer(model_statsd_key.format('steps.get_insert_batch')): with statsd.timer(model_statsd_key.format('steps.get_insert_batch')):
batch = model_cls.get_insert_batch(import_objects) batch = model_cls.get_insert_batch(import_objects)
# statsd.incr(model_statsd_key.format('insert_batch'), len(batch))
# statsd.incr(statsd_key.format('insert_batch'), len(batch))
return model_cls, batch return model_cls, batch
res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models)) res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models))

View File

@@ -4,6 +4,7 @@ from infi.clickhouse_orm.database import Database as InfiDatabase, DatabaseExcep
from infi.clickhouse_orm.utils import parse_tsv from infi.clickhouse_orm.utils import parse_tsv
from six import next from six import next
from io import BytesIO from io import BytesIO
from statsd.defaults.django import statsd
from .configuration import config from .configuration import config
from .exceptions import DBAliasError from .exceptions import DBAliasError
@@ -83,6 +84,7 @@ class Database(InfiDatabase):
fields_list = ','.join('`%s`' % name for name in first_tuple._fields) fields_list = ','.join('`%s`' % name for name in first_tuple._fields)
fields = [f for name, f in model_class.fields(writable=True).items() if name in first_tuple._fields] fields = [f for name, f in model_class.fields(writable=True).items() if name in first_tuple._fields]
statsd_key = "%s.inserted_tuples.%s.{0}" % (config.STATSD_PREFIX, model_class.__name__)
def tuple_to_csv(tup): def tuple_to_csv(tup):
return '\t'.join(field.to_db_string(val, quote=False) for field, val in zip(fields, tup)) + '\n' return '\t'.join(field.to_db_string(val, quote=False) for field, val in zip(fields, tup)) + '\n'
@@ -102,6 +104,7 @@ class Database(InfiDatabase):
lines += 1 lines += 1
if batch_size is not None and lines >= batch_size: if batch_size is not None and lines >= batch_size:
# Return the current batch of lines # Return the current batch of lines
statsd.incr(statsd_key.format('insert_batch'), lines)
yield buf.getvalue() yield buf.getvalue()
# Start a new batch # Start a new batch
buf = BytesIO() buf = BytesIO()
@@ -109,6 +112,7 @@ class Database(InfiDatabase):
# Return any remaining lines in partial batch # Return any remaining lines in partial batch
if lines: if lines:
statsd.incr(statsd_key.format('insert_batch'), count=lines)
yield buf.getvalue() yield buf.getvalue()
self._send(gen()) self._send(gen())

View File

@@ -1,3 +1,6 @@
import datetime
from typing import NamedTuple
from django.db.models import Model as DjangoModel from django.db.models import Model as DjangoModel
from django_clickhouse.utils import model_to_dict from django_clickhouse.utils import model_to_dict
@@ -28,5 +31,5 @@ class Django2ClickHouseModelSerializer:
return result return result
def serialize(self, obj): # type: (DjangoModel) -> tuple def serialize(self, obj): # type: (DjangoModel) -> NamedTuple
return self._result_class(**self._get_serialize_kwargs(obj)) return self._result_class(**self._get_serialize_kwargs(obj))

View File

@@ -106,7 +106,7 @@ class Storage:
This method should be called from inner functions. This method should be called from inner functions.
:param import_key: A key, returned by ClickHouseModel.get_import_key() method :param import_key: A key, returned by ClickHouseModel.get_import_key() method
:param operation: One of insert, update, delete :param operation: One of insert, update, delete
:param pk: Primary key to find records in main database. Should be string-serializable with str() method. :param pks: Primary keys to find records in main database. Should be string-serializable with str() method.
:return: None :return: None
""" """
if operation not in {'insert', 'update', 'delete'}: if operation not in {'insert', 'update', 'delete'}:

View File

@@ -69,6 +69,9 @@ if __name__ == '__main__':
parser.add_argument('--once', type=bool, required=False, default=False) parser.add_argument('--once', type=bool, required=False, default=False)
params = vars(parser.parse_args()) params = vars(parser.parse_args())
# Disable registering not needed models
TestModel._clickhouse_sync_models = {ClickHouseCollapseTestModel}
func_name = params['process'] func_name = params['process']
method = locals()[func_name] method = locals()[func_name]
method(**params) method(**params)

View File

@@ -170,9 +170,9 @@ class KillTest(TransactionTestCase):
min(item.id for item in pg_data), max(item.id for item in pg_data))) min(item.id for item in pg_data), max(item.id for item in pg_data)))
self.assertEqual(len(pg_data), len(ch_data)) self.assertEqual(len(pg_data), len(ch_data))
serializer = ClickHouseCollapseTestModel.get_django_model_serializer()
for pg_item, ch_item in zip(pg_data, ch_data): for pg_item, ch_item in zip(pg_data, ch_data):
self.assertEqual(ch_item, serializer.serialize(pg_item)) self.assertEqual(ch_item.id, pg_item.id)
self.assertEqual(ch_item.value, pg_item.value)
@classmethod @classmethod
def sync_iteration(cls, kill=True): def sync_iteration(cls, kill=True):