Added ClickHouseMultiModel

M1ha 2018-11-22 16:22:34 +05:00
parent 6261d69821
commit ea5d9a7dfd


@@ -138,6 +138,17 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         """
         return cls.engine.get_insert_batch(cls, import_objects)
 
+    @classmethod
+    def insert_batch(cls, batch):
+        """
+        Inserts batch into database
+        :param batch:
+        :return:
+        """
+        if batch:
+            conn = connections[cls.sync_database_alias]
+            conn.insert(batch)
+
     @classmethod
     def sync_batch_from_storage(cls):
         """
@@ -149,34 +160,21 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
             storage = cls.get_storage()
             import_key = cls.get_import_key()
-            conn = connections[cls.sync_database_alias]
 
             with statsd.timer(statsd_key.format('pre_sync')):
                 storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
 
-            with statsd.timer(statsd_key.format('get_import_batch')):
-                batch = storage.get_import_batch(import_key)
-
-            if batch is None:
-                with statsd.timer(statsd_key.format('get_operations')):
-                    operations = storage.get_operations(import_key, cls.get_sync_batch_size())
-
-                with statsd.timer(statsd_key.format('get_sync_objects')):
-                    import_objects = cls.get_sync_objects(operations)
-
-                with statsd.timer(statsd_key.format('get_insert_batch')):
-                    batch = cls.get_insert_batch(import_objects)
-
-                if batch:
-                    with statsd.timer(statsd_key.format('write_import_batch')):
-                        storage.write_import_batch(import_key, [obj.to_tsv() for obj in batch])
-            else:
-                # Previous import error, retry
-                statsd.incr(statsd_key.format('restore_existing_batch'))
-
-            if batch:
-                with statsd.timer(statsd_key.format('insert')):
-                    conn.insert(batch)
+            with statsd.timer(statsd_key.format('get_operations')):
+                operations = storage.get_operations(import_key, cls.get_sync_batch_size())
+
+            with statsd.timer(statsd_key.format('get_sync_objects')):
+                import_objects = cls.get_sync_objects(operations)
+
+            with statsd.timer(statsd_key.format('get_insert_batch')):
+                batch = cls.get_insert_batch(import_objects)
+
+            with statsd.timer(statsd_key.format('insert')):
+                cls.insert_batch(batch)
 
             with statsd.timer(statsd_key.format('post_sync')):
                 storage.post_sync(import_key)
@@ -198,3 +196,44 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
             return True
 
         return (last_sync_time - datetime.datetime.now()).total_seconds() >= cls.get_sync_delay()
+
+
+class ClickHouseMultiModel(ClickHouseModel):
+    """
+    This model syncs one django model with multiple ClickHouse sub-models
+    """
+    sub_models = []
+
+    @classmethod
+    def sync_batch_from_storage(cls):
+        """
+        Gets one batch from storage and syncs it.
+        :return:
+        """
+        statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, cls.__name__)
+        with statsd.timer(statsd_key.format('total')):
+            storage = cls.get_storage()
+            import_key = cls.get_import_key()
+
+            with statsd.timer(statsd_key.format('pre_sync')):
+                storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
+
+            with statsd.timer(statsd_key.format('get_operations')):
+                operations = storage.get_operations(import_key, cls.get_sync_batch_size())
+
+            with statsd.timer(statsd_key.format('get_sync_objects')):
+                import_objects = cls.get_sync_objects(operations)
+
+            batches = {}
+            with statsd.timer(statsd_key.format('get_insert_batch')):
+                for model_cls in cls.sub_models:
+                    batches[model_cls] = cls.get_insert_batch(import_objects)
+
+            with statsd.timer(statsd_key.format('insert')):
+                for model_cls, batch in batches.items():
+                    model_cls.insert_batch(batch)
+
+            with statsd.timer(statsd_key.format('post_sync')):
+                storage.post_sync(import_key)
+                storage.set_last_sync_time(import_key, now())
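For orientation, a sketch of how the new ClickHouseMultiModel might be wired up, assuming the module layout of django-clickhouse at this point and the usual infi.clickhouse_orm field/engine API; every model name and field below is hypothetical.

# Hypothetical wiring of one Django model to two ClickHouse sub-models.
# UserVisit, ClickHouseUserVisit, ClickHouseUserVisitByDay and their fields
# are illustrative; only ClickHouseMultiModel and sub_models come from this
# commit.
from infi.clickhouse_orm import engines, fields

from django_clickhouse.clickhouse_models import ClickHouseModel, ClickHouseMultiModel
from my_app.models import UserVisit  # assumed tracked Django model


class ClickHouseUserVisit(ClickHouseModel):
    django_model = UserVisit
    visited_at = fields.DateTimeField()
    user_id = fields.UInt64Field()
    engine = engines.MergeTree('visited_at', ('user_id',))


class ClickHouseUserVisitByDay(ClickHouseModel):
    django_model = UserVisit
    day = fields.DateField()
    user_id = fields.UInt64Field()
    engine = engines.MergeTree('day', ('user_id',))


class ClickHouseUserVisitMulti(ClickHouseMultiModel):
    django_model = UserVisit
    # One batch of operations is fetched from storage once; an insert batch
    # is then built and written for every sub-model listed here.
    sub_models = [ClickHouseUserVisit, ClickHouseUserVisitByDay]

Calling ClickHouseUserVisitMulti.sync_batch_from_storage() then reuses a single set of tracked operations for all sub-models instead of reading storage once per model.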