From e0fe50d53478ae4d1cf714f36a76e67e96b92af3 Mon Sep 17 00:00:00 2001 From: M1ha Date: Fri, 16 Nov 2018 17:36:04 +0500 Subject: [PATCH] =?UTF-8?q?=D0=A0=D0=B0=D1=81=D1=81=D1=82=D0=B0=D0=B2?= =?UTF-8?q?=D0=B8=D0=BB=20=D0=BC=D0=B5=D1=82=D1=80=D0=B8=D0=BA=D0=B8=20sta?= =?UTF-8?q?tsd?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/django_clickhouse/clickhouse_models.py | 947 +-------------------- src/django_clickhouse/configuration.py | 1 + src/django_clickhouse/queryset.py | 105 +++ src/django_clickhouse/storages.py | 28 +- src/django_clickhouse/tasks.py | 178 +--- src/django_clickhouse/utils.py | 22 +- 6 files changed, 200 insertions(+), 1081 deletions(-) create mode 100644 src/django_clickhouse/queryset.py diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py index 33fef2e..a48ae3c 100644 --- a/src/django_clickhouse/clickhouse_models.py +++ b/src/django_clickhouse/clickhouse_models.py @@ -10,6 +10,7 @@ from django.db.models import Model as DjangoModel from infi.clickhouse_orm.database import Database from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase from six import with_metaclass +from statsd.defaults.django import statsd from .configuration import config from .database import connections, DEFAULT_DB_ALIAS @@ -41,6 +42,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): read_db_aliases = (DEFAULT_DB_ALIAS,) write_db_aliases = (DEFAULT_DB_ALIAS,) + sync_enabled = False sync_batch_size = None sync_storage = None sync_delay = None @@ -123,153 +125,47 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): Gets one batch from storage and syncs it. :return: """ - storage = cls.get_storage() - import_key = cls.get_import_key() - conn = connections[cls.sync_database_alias] + statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, cls.__name__) + with statsd.timer(statsd_key.format('total')): - storage.pre_sync(import_key) - batch = storage.get_import_batch(import_key) + storage = cls.get_storage() + import_key = cls.get_import_key() + conn = connections[cls.sync_database_alias] - if batch is None: - operations = storage.get_operations(import_key, cls.get_sync_batch_size()) - import_objects = cls.get_sync_objects(operations) + with statsd.timer(statsd_key.format('pre_sync')): + storage.pre_sync(import_key) - batch = cls.engine.get_insert_batch(cls, conn, import_objects) + with statsd.timer(statsd_key.format('get_import_batch')): + batch = storage.get_import_batch(import_key) + + if batch is None: + with statsd.timer(statsd_key.format('get_operations')): + operations = storage.get_operations(import_key, cls.get_sync_batch_size()) + + with statsd.timer(statsd_key.format('get_sync_objects')): + import_objects = cls.get_sync_objects(operations) + + with statsd.timer(statsd_key.format('get_insert_batch')): + batch = cls.engine.get_insert_batch(cls, conn, import_objects) + + if batch: + with statsd.timer(statsd_key.format('write_import_batch')): + storage.write_import_batch(import_key, [obj.to_tsv() for obj in batch]) + else: + # Previous import error, retry + statsd.incr(statsd_key.format('restore_existing_batch')) if batch: - storage.write_import_batch(import_key, [obj.to_tsv() for obj in batch]) - else: - pass # Previous import error, retry + with statsd.timer(statsd_key.format('insert')): + conn.insert(batch) - if batch: - conn.insert(batch) - - storage.post_sync(import_key) + with statsd.timer(statsd_key.format('post_sync')): + storage.post_sync(import_key) # class ClickHouseModelConverter: # """ # Абстрактный класс, описывающий процесс конвертации модели django в модель ClickHouse и обратно. -# """ -# # Ключ REDIS, по которому синхронизируются модели -# # Содержит словарь, где ключи = module_name:ModelConverterName, -# # а значения - last_recalc_timestamp -# #REDIS_SYNC_KEY = 'clickhouse_models_sync' -# #sync_batch_size = settings.CLICKHOUSE_INSERT_SIZE -# -# # Если этот атрибут установлен, это должно быть вермя в секундах - частота, -# # с которой будет вызываться синхронизация модели в celery задаче -# #auto_sync = None -# #sync_type = settings.CLICKHOUSE_DEFAULT_SYNC_TYPE -# -# # Возможность создавать таблицы не в миграциях, а по необходимости перед импортом -# #auto_create_tables = False -# -# # django_model = None -# -# @classmethod -# def _validate_cls_attributes(cls): -# assert inspect.isclass(cls.django_model), \ -# "django_model static attribute must be django.db.model subclass" -# assert issubclass(cls.django_model, django_models.Model), \ -# "django_model static attribute must be django.db.model subclass" -# assert isinstance(cls.sync_batch_size, int) and cls.sync_batch_size > 0,\ -# "sync_batch_size must be positive integer" -# assert cls.auto_sync is None or isinstance(cls.auto_sync, int) and cls.auto_sync > 0, \ -# "auto_sync must be positive integer, if set" -# assert cls.sync_type in {'redis', 'postgres'}, "sync_type must be one of [redis, postgres]" -# -# @classmethod -# def _validate_django_model_instance(cls, obj): -# cls._validate_cls_attributes() -# assert isinstance(obj, cls.django_model), \ -# "obj must be instance of {0}".format(cls.django_model.__name__) -# -# @classmethod -# @transaction.atomic -# def import_data(cls, using: Optional[str] = None, inserted_items=None, updated_items=None)-> int: -# """ -# Это сервисный метод, который синхронизирует данный Converter c ClickHouse при включенной авто-синхронизации -# :param using: Данная функция получает id только для одной БД. -# :param inserted_items: Элементы, которые были вставлены в БД -# :param updated_items: Элементы, которые были обновлены. -# Оба параметра должны быть задан или не заданы. -# Если не заданы будут получены через get_sync_items() -# Если заданы, то необходимо указать корректный using. -# :return: Количество импортированных элементов -# """ -# def _shard_func(shard): -# return cls.import_sync_items(*cls.get_sync_items(using=shard), using=shard) -# -# cls._validate_cls_attributes() -# assert inserted_items is None and updated_items is None \ -# or inserted_items is not None and updated_items is not None, \ -# "You must specify both inserted_items and updated_items or none of them" -# -# from utils.sharding.models import BaseShardedManager -# if inserted_items is not None: -# return cls.import_sync_items(inserted_items, updated_items, using=using) -# elif isinstance(cls.django_model.objects, BaseShardedManager): -# res = exec_all_shards_func(_shard_func) -# return sum(res) -# else: -# return _shard_func(using) -# -# @classmethod -# def import_sync_items(cls, inserted_items, updated_items, using: Optional[str] = None) -> int: -# """ -# Это сервисный метод, который загружает данные в ClickHouse. Не должен делать запросов к БД. -# sync_items уже получены ранее. -# :param inserted_items: Элементы, которые были вставлены в БД -# :param updated_items: Элементы, которые были обновлены -# :param using: Данная функция получает id только для одной БД. -# :return: Количество импортированных элементов -# """ -# # Эта процедура игнорирует update-ы и delete-ы. Используйте для этого другие модели -# -# if len(inserted_items) > 0: -# res = import_data_from_queryset(cls, inserted_items, statsd_key_prefix='upload_clickhouse.' + cls.__name__, -# batch_size=cls.sync_batch_size, no_qs_validation=True, -# create_table_if_not_exist=cls.auto_create_tables) -# statsd.incr('clickhouse.{0}.inserts'.format(cls.__name__), res) -# else: -# # Если данный вариант возникает слишком часто, это означает, что время auto_sync слишком мелнькое -# # И вызывать синхронизацию так часто не имеет смысла. -# statsd.incr('clickhouse.{0}.empty_inserts'.format(cls.__name__)) -# res = 0 -# -# return res -# -# @classmethod -# def get_sync_items(cls): -# cls._validate_cls_attributes() -# -# if cls.sync_type == 'postgres': -# insert_ids, update_ids, delete_ids = cls._get_sync_ids_postgres(database=using) -# else: # if cls.sync_type == 'redis' -# insert_ids, update_ids, delete_ids = cls._get_sync_ids_redis(database=using) -# -# if update_ids: -# # Операция UPDATE недоступна для обычной модели. Поэтому Если произошло обновление данных в БД, -# # мы должны залогировать WARNING и проигнорировать запись -# statsd.incr('clickhouse.{0}.invalid_operation'.format(cls.__name__)) -# log('clickhouse_import_data_warning', 'update_operation on insert only model', data={ -# 'model': cls.__name__, -# 'model_ids': list(update_ids) -# }) -# -# if len(insert_ids) > 0: -# qs = cls.django_model.objects.filter(pk__in=insert_ids).using(using).nocache() -# if select_related: -# qs = qs.select_related(*select_related) -# if prefetch_related: -# qs = qs.prefetch_related(*prefetch_related) -# -# insert_items = list(qs) -# else: -# insert_items = [] -# -# return insert_items, [] # # @classmethod # def start_sync(cls): @@ -304,782 +200,3 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): # return result # # -# class ClickHouseModel(InfiModel): -# -# @classmethod -# def _validate_cls_attributes(cls): -# super()._validate_cls_attributes() -# assert inspect.isclass(cls.django_model), \ -# "django_model static attribute must be ClickHouseDjangoModel subclass" -# assert issubclass(cls.django_model, ClickHouseDjangoModel), \ -# "django_model static attribute must be ClickHouseDjangoModel subclass" -# -# assert type(cls.clickhouse_pk_field) is str, "pk_field attribute must be string" -# -# @classmethod -# def form_query(cls, select_items: Union[str, Set[str], List[str], Tuple[str]], table: Optional[str] = None, -# final: bool = False, date_filter_field: str = '', date_in_prewhere: bool = True, -# prewhere: Union[str, Set[str], List[str], Tuple[str]] = '', -# where: Union[str, Set[str], List[str], Tuple[str]] = '', -# group_fields: Union[str, Set[str], List[str], Tuple[str]] = '', group_with_totals: bool = False, -# order_by: Union[str, Set[str], List[str], Tuple[str]] = '', -# limit: Optional[int] = None, prewhere_app: bool = True): -# """ -# Формирует запрос к данной таблице -# :param select_items: строка или массив строк, которые надо выбрать в запросе -# :param table: Таблица данного класса по-умолчанию -# :param final: Позволяет выбрать из CollapsingMergeTree только последнюю версию записи. -# :param date_filter_field: Поле, которое надо отфильтровать от start_date до date_end, если задано -# :param date_in_prewhere: Если флаг указан и задано поле date_filter_field, -# то условие будет помещено в секцию PREWHERE, иначе - в WHERE -# :param prewhere: Условие, которое добавляется в prewhere -# :param where: Условие, которое добавляется в where -# :param group_fields: Поля, по которым будет производится группировка -# :param group_with_totals: Позволяет добавить к группировке модификатор with_totals -# :param order_by: Поле или массив полей, по которым сортируется результат -# :param limit: Лимит на количество записей -# :param prewhere_app: Автоматически добавляет в prewhere фильтр по app_id -# :return: Запрос, в пределах приложения -# """ -# assert isinstance(select_items, (str, list, tuple, set)), "select_items must be string, list, tuple or set" -# assert table is None or isinstance(table, str), "table must be string or None" -# assert isinstance(final, bool), "final must be boolean" -# assert isinstance(date_filter_field, str), "date_filter_field must be string" -# assert isinstance(date_in_prewhere, bool), "date_in_prewhere must be boolean" -# assert isinstance(prewhere, (str, list, tuple, set)), "prewhere must be string, list, tuple or set" -# assert isinstance(where, (str, list, tuple, set)), "where must be string, list, tuple or set" -# assert isinstance(group_fields, (str, list, tuple, set)), "group_fields must be string, list, tuple or set" -# assert isinstance(group_with_totals, bool), "group_with_totals must be boolean" -# assert isinstance(order_by, (str, list, tuple, set)), "group_fields must be string, list, tuple or set" -# assert limit is None or isinstance(limit, int) and limit > 0, "limit must be None or positive integer" -# assert isinstance(prewhere_app, bool), "prewhere_app must be boolean" -# -# table = table or '$db.`{0}`'.format(cls.table_name()) -# final = 'FINAL' if final else '' -# -# if prewhere: -# if not isinstance(prewhere, str): -# prewhere = '(%s)' % ') AND ('.join(prewhere) -# -# if prewhere_app: -# prewhere = '`app_id`={app_id} AND (' + prewhere + ')' if prewhere else '`app_id`={app_id}' -# -# if prewhere: -# prewhere = 'PREWHERE ' + prewhere -# -# if where: -# if not isinstance(where, str): -# where = ' AND '.join(where) -# where = 'WHERE ' + where -# -# if not isinstance(select_items, str): -# # Исключим пустые строки -# select_items = [item for item in select_items if item] -# select_items = ', '.join(select_items) -# -# if group_fields: -# if not isinstance(group_fields, str): -# group_fields = ', '.join(group_fields) -# -# group_fields = 'GROUP BY %s' % group_fields -# -# if group_with_totals: -# group_fields += ' WITH TOTALS' -# -# if order_by: -# if not isinstance(order_by, str): -# order_by = ', '.join(order_by) -# -# order_by = 'ORDER BY ' + order_by -# -# if date_filter_field: -# cond = "`%s` >= '{start_date}' AND `%s` < '{end_date}'" % (date_filter_field, date_filter_field) -# if date_in_prewhere: -# prewhere += ' AND ' + cond -# elif where: -# where += ' AND ' + cond -# else: -# where = 'WHERE ' + cond -# -# limit = "LIMIT {0}".format(limit) if limit else '' -# -# query = ''' -# SELECT %s -# FROM %s %s -# -# %s -# %s -# %s %s %s -# ''' % (select_items, table, final, prewhere, where, group_fields, order_by, limit) -# -# # Моя спец функция, сокращающая запись проверки даты на Null в запросах. -# # В скобках не может быть других скобок -# # (иначе надо делать сложную проверку скобочной последовательности, решил пока не заморачиваться) -# # Фактически, функция приводит выражение в скобках к timestamp и смотрит, что оно больше 0 -# query = re.sub(r'\$dateIsNotNull\s*(\([^()]*\))', r'toUInt64(toDateTime(\1)) > 0', query) -# -# return re.sub(r'\s+', ' ', query.strip()) -# -# @classmethod -# def init_insert(cls, model_ids, database=None): -# """ -# Ининциирует обновление данных для объектов с укзаанными id-шниками -# :param model_ids: Список id моделей, которые надо обновлять -# :param database: Данная функция получает id только для одной БД. -# :return: None -# """ -# assert isinstance(model_ids, Iterable), "model_ids must be iterable" -# -# if len(model_ids) > 0: -# if cls.sync_type == 'redis': -# cls.django_model.register_clickhouse_operations('INSERT', *model_ids, database=(database or 'default')) -# else: # if self.sync_type == 'postgres' -# from utils.models import ClickHouseModelOperation -# ClickHouseModelOperation.objects.bulk_update_or_create([ -# {'table': cls.django_model._meta.db_table, -# 'model_id': int(x), -# 'operation': 'INSERT', -# 'database': (database or 'default') -# } for x in model_ids -# ], update=True, key_fields=('table', 'model_id')) -# -# @classmethod -# def recheck(cls, qs: DjangoBaseQuerySet) -> int: -# """ -# В ходе импорта могут возникнуть ошибки - какие-то записи могут не быть вставлены -# Данная функция проверяет правильность импорта данных между указанными id -# :param qs: QuerySet с данными, которые надо проверить -# :return: Количество некорректных записей -# """ -# cls._validate_cls_attributes() -# assert isinstance(qs, DjangoBaseQuerySet), "qs must be a QuerySet" -# assert qs.model == cls.django_model, "qs model must be equal to current django_model" -# -# item_ids = {item.pk for item in qs} -# -# query = cls.form_query('id', where='id IN (%s)' % ','.join([str(idx) for idx in item_ids]), prewhere_app=False) -# ch_ids = {item.id for item in settings.CLICKHOUSE_DB.select(query, model_class=cls)} -# -# cls.init_insert(list(item_ids - ch_ids), qs.db) -# -# return len(item_ids - ch_ids) -# -# def _prepare_val_for_eq(self, field_name, field, val): -# if isinstance(val, datetime.datetime): -# return val.replace(microsecond=0) -# elif field_name == '_version': -# return True # Независимо от версии должно сравнение быть истиной -# return val -# -# def __eq__(self, other): -# if other.__class__ != self.__class__: -# return False -# -# for name, f in self.fields().items(): -# val_1 = getattr(self, name, None) -# val_2 = getattr(other, name, None) -# if self._prepare_val_for_eq(name, f, val_1) != self._prepare_val_for_eq(name, f, val_2): -# return False -# -# return True -# -# -# class ClickHouseMultiModelConverter(ClickHouseModelConverter): -# """ -# Поскольку в ClickHouse нельзя изменять данные, нередко одна django-модель будет разделяться -# на несколько моделей в ClickHouse. Этот класс упрощает преобразование между моделями. -# Например, его можно использовать в функции import_data, чтобы не выбирать исходную модель несколько раз -# """ -# ch_models = None -# -# def __init__(self, *ch_objects): -# """ -# Инициирует multi-модель из объектов моделей. -# Порядок ch_objects должен соответствовать порядку ch_models -# :param ch_objects: Набор объектов моделей ClickHouse -# """ -# self._validate_ch_objects(ch_objects) -# self._ch_objects = ch_objects -# -# @classmethod -# def _validate_cls_attributes(cls): -# super()._validate_cls_attributes() -# assert isinstance(cls.ch_models, (tuple, list)), "ch_models must be tuple or list instance" -# assert len(cls.ch_models) > 0, "ch_models can't be empty" -# for m in cls.ch_models: -# assert issubclass(m, ClickHouseModelConverter), \ -# "ch_models must be a list of ClickHouseModelConverter subclasses" -# -# @classmethod -# def _validate_ch_objects(cls, ch_objects): -# cls._validate_cls_attributes() -# for index, obj in enumerate(ch_objects): -# assert isinstance(obj, tuple(cls.ch_models)), "Parameter {0} must be instance of [{1}]". \ -# format(index + 1, ','.join(klass.__name__ for klass in cls.ch_models)) -# -# @classmethod -# def from_django_model(cls, obj): -# """ -# Создает объекты модели ClickHouse из модели django -# При переопределении метода желательно проверить аргументы, вызвав: -# cls._validate_django_model_instance(obj) -# :param obj: Объект модели django -# :return: Список объектов моделей ClickHouse в том порядке, в каком они указаны в ch_models -# """ -# cls._validate_django_model_instance(obj) -# cls._validate_cls_attributes() -# -# res = [] -# for m in cls.ch_models: -# item = m.from_django_model(obj) -# if isinstance(item, list): -# res.extend(item) -# else: -# res.append(item) -# return res -# -# def to_django_model(self, obj=None): -# # Валидируем входящие данные -# if obj is not None: -# self._validate_django_model_instance(obj) -# else: -# self._validate_cls_attributes() -# obj = self.django_model() -# -# # Обновление по всем моделям -# for model_obj in self._ch_objects: -# model_obj.to_django_model(obj) -# -# return obj -# -# @classmethod -# def import_sync_items(cls, inserted_items, updated_items, using: Optional[str] = None) -> int: -# return sum([ -# m.import_sync_items(inserted_items=inserted_items, updated_items=updated_items, using=using) -# for m in cls.ch_models -# ]) -# -# @classmethod -# def get_sync_items(cls, using=None, select_related=None, prefetch_related=None): -# """ -# Формирует 2 списка объектов модели для вставки и обновления данных. -# :param using: Данная функция получает id только для одной БД. -# :param select_related: Позволяет выбрать данные через QuerySet.select_related() -# :param prefetch_related: Позволяет выбрать данные через QuerySet.prefetch_related() -# :return: Insert items, Update items -# """ -# cls._validate_cls_attributes() -# -# if cls.sync_type == 'postgres': -# insert_ids, update_ids, delete_ids = cls._get_sync_ids_postgres(database=using) -# else: # if cls.sync_type == 'redis' -# insert_ids, update_ids, delete_ids = cls._get_sync_ids_redis(database=using) -# -# # Выберем одним запросом из БД, а затем преераспределим -# ids_set = insert_ids | update_ids | delete_ids -# if len(ids_set) > 0: -# qs = cls.django_model.objects.filter(pk__in=ids_set).using(using).nocache() -# if select_related: -# qs = qs.select_related(*select_related) -# if prefetch_related: -# qs = qs.prefetch_related(*prefetch_related) -# -# insert_items, update_items, delete_items = [], [], [] -# for pk, instance in qs.models_dict().items(): -# if pk in insert_ids: -# insert_items.append(instance) -# else: # if pk in update_ids: -# # Вставка данных приоритетна перед update-ом -# update_items.append(instance) -# else: -# insert_items, update_items = [], [] -# -# return insert_items, update_items -# -# -# class ClickHouseCollapseModel(ClickHouseModel): -# """ -# Добавлеяет операции для таблиц с engine CollapsingMergeTree -# https://clickhouse.yandex/reference_ru.html#CollapsingMergeTree -# """ -# # Это поле в ClickHouse, в котором хранится знак строки для подсчета -# clickhouse_sign_field = "sign" -# -# # Это номер версии вставленной записи. Добавляется автоматически. -# # Служит для определения последней вставленной записи с одинаковым первичным ключом -# _version = fields.UInt32Field(default=0) -# -# @classmethod -# def _validate_cls_attributes(cls): -# super()._validate_cls_attributes() -# assert inspect.isclass(cls.django_model), \ -# "django_model static attribute must be ClickHouseDjangoModel subclass" -# assert issubclass(cls.django_model, ClickHouseDjangoModel), \ -# "django_model static attribute must be ClickHouseDjangoModel subclass" -# assert type(cls.clickhouse_sign_field) is str, "sign_field attribute must be string" -# -# @classmethod -# def get_sync_items(cls, using=None, select_related=None, prefetch_related=None): -# """ -# Формирует 2 списка объектов модели для вставки и обновления данных. -# :param using: Данная функция получает id только для одной БД. -# :param select_related: Позволяет выбрать данные через QuerySet.select_related() -# :param prefetch_related: Позволяет выбрать данные через QuerySet.prefetch_related() -# :return: Insert items, Update items -# """ -# cls._validate_cls_attributes() -# -# if cls.sync_type == 'postgres': -# insert_ids, update_ids, delete_ids = cls._get_sync_ids_postgres(database=using) -# else: # if cls.sync_type == 'redis' -# insert_ids, update_ids, delete_ids = cls._get_sync_ids_redis(database=using) -# -# # Выберем одним запросом из БД, а затем преераспределим -# ids_set = insert_ids | update_ids | delete_ids -# if len(ids_set) > 0: -# qs = cls.django_model.objects.filter(pk__in=ids_set).using(using).nocache() -# if select_related: -# qs = qs.select_related(*select_related) -# if prefetch_related: -# qs = qs.prefetch_related(*prefetch_related) -# -# insert_items, update_items, delete_items = [], [], [] -# for pk, instance in qs.models_dict().items(): -# if pk in insert_ids: -# insert_items.append(instance) -# else: # if pk in update_ids: -# # Вставка данных приоритетна перед update-ом -# update_items.append(instance) -# else: -# insert_items, update_items = [], [] -# -# return insert_items, update_items -# -# @classmethod -# def import_sync_items(cls, inserted_items, updated_items, using: Optional[str] = None): -# if len(inserted_items) > 0 or len(updated_items) > 0: -# # Эта операция удалит данные из ClickHouse, но не вставит обновленные данные -# update_list = cls.update(updated_items, fake_insert=True) -# -# # Посчитаем, сколько еще можем сделать INSERT-ов -# updated_count = len(update_list) -# # Умножим update_count на 2, поскольку мы уже вставили записи с -1 -# insert_count = cls.sync_batch_size - updated_count * 2 -# -# # Импорт новых моделей -# insert_data = inserted_items[:insert_count] -# update_list.extend(insert_data) -# -# # Возвращаем в БД невсатвленные данные -# restore_model_ids = {item.pk for item in inserted_items[insert_count:]} -# cls.init_insert(restore_model_ids, database=using) -# -# # Вставляем данные -# res = import_data_from_queryset(cls, update_list, statsd_key_prefix='upload_clickhouse.' + cls.__name__, -# batch_size=cls.sync_batch_size, no_qs_validation=True, -# create_table_if_not_exist=cls.auto_create_tables) -# -# # Отмечаем в графане -# statsd.incr('clickhouse.{0}.inserts'.format(cls.__name__), len(insert_data)) -# statsd.incr('clickhouse.{0}.updates'.format(cls.__name__), updated_count) -# -# res += updated_count -# else: -# # Если данный вариант возникает слишком часто, это означает, что время auto_sync слишком мелнькое -# # И вызывать синхронизацию так часто не имеет смысла. -# statsd.incr('clickhouse.{0}.empty_inserts'.format(cls.__name__)) -# res = 0 -# -# return res -# -# @classmethod -# @transaction.atomic -# def delete(cls, django_qs): -# """ -# Удаляет в ClickHouse все записи из указанного QuerySet -# :param django_qs: iterable объектов моделей, которые будут удаляться -# :return: Последние версии удаленных данных (словарь id: version) -# """ -# assert isinstance(django_qs, Iterable), "django_qs must be iterable" -# upd_list = list(django_qs) -# -# if len(upd_list) == 0: -# return {} -# -# # Если используется ClickHousePartitionedModel, удаление должно производится из нужных таблиц, а не общей. -# # В этом случае в запросе max_query движком Merge будет выбран столбец _table, содержащий имя таблицы. -# # Запрос на удаление из каждой таблицы мы будем делать по отдельности. -# is_merge_table = issubclass(cls, ClickHousePartitionedModel) -# -# # Получаем данные для вставки -# field_list = cls._get_model_fields() -# field_str = ','.join(field_list) -# -# params = { -# 'table': cls.table_name(), -# 'fields_with_sign': field_str + ',' + cls.clickhouse_sign_field, -# 'pk_values': ','.join(str(upd.pk) for upd in upd_list), -# 'fields': field_str, -# 'sign_field': cls.clickhouse_sign_field, -# 'pk_field': cls.clickhouse_pk_field, -# 'merge_table': ', _table' if is_merge_table else '' -# } -# -# # Запрос на получение последних актуальных версий данных, которые надо зачистить. -# # _version содержит удаляемую версию данных -# # Если модель наследует ClickHousePartitionedModel, запрос сделаем к Merge-таблице, получим столбец _table. -# max_query = "SELECT {pk_field} AS pk, MAX(_version) AS _version{merge_table} FROM $db.{table} " \ -# "WHERE {pk_field} IN ({pk_values}) GROUP BY {pk_field}{merge_table}".format(**params) -# -# t1 = time.time() -# max_ids_list = list(settings.CLICKHOUSE_DB.select(max_query)) -# t2 = time.time() -# statsd.timing('clickhouse.{0}.get_max_ids_list'.format(cls.__name__), t2 - t1) -# -# if len(max_ids_list): -# # Разделяем данные по таблицам, из которых реально удаляются данные -# if is_merge_table: -# max_version_ids_by_table = defaultdict(list) -# for item in max_ids_list: -# max_version_ids_by_table[item._table].append("({0}, {1})".format(item.pk, item._version)) -# else: -# # Тут всегда одна таблица -# max_version_ids_by_table = {cls.table_name(): ["({0}, {1})".format(item.pk, item._version) -# for item in max_ids_list]} -# -# for tbl, ids_list in max_version_ids_by_table.items(): -# params['max_version_ids'] = ','.join(ids_list) -# params['table'] = tbl -# -# # "Удаляем" старые данные, вставляя записи с отрицательным знаком -# # Поскольку выборка происходит из ClickHouse удаление еще недобавленных туда записей невозможно -# query = "INSERT INTO $db.{table} ({fields_with_sign}, _version) " \ -# "SELECT {fields}, -1 AS sign, _version FROM $db.{table} " \ -# "WHERE ({pk_field}, _version) IN ({max_version_ids})". \ -# format(**params) -# -# t1 = time.time() -# settings.CLICKHOUSE_DB.raw(query) -# t2 = time.time() -# statsd.timing('clickhouse.{0}.delete'.format(cls.__name__), t2 - t1) -# -# return {item.pk: item._version for item in max_ids_list} -# -# @classmethod -# @transaction.atomic -# def update(cls, django_qs, fake_insert=False): -# """ -# Обновляет в ClickHouse все записи из указанного QuerySet -# :param django_qs: iterable объектов моделей, которые будут обновляться -# :param fake_insert: Если флаг установлен, то данные будут удалены, но не будут вставлены (удобнов в импорте). -# Вместо количества обновлений будет возвращен список объектов для вставки через import_data_from_queryset -# :return: Количество обновленных данных, если fake_insert=False, иначе список объектов django-модели -# """ -# upd_list = list(django_qs) -# -# # Удаляем предыдущие данные -# max_ids_dict = cls.delete(django_qs) -# -# # Теперь вставим обновленные данные -# -# # Устанавливаем версию для новых элементов -# insert_list = [] -# for item in django_qs: -# item._version = max_ids_dict.get(item.pk, -1) + 1 -# insert_list.append(item) -# -# if fake_insert: -# return insert_list -# else: -# res = import_data_from_queryset(cls, insert_list, statsd_key_prefix='upload_clickhouse.' + cls.__name__, -# batch_size=cls.sync_batch_size, no_qs_validation=True, -# create_table_if_not_exist=cls.auto_create_tables) -# statsd.incr('clickhouse.{0}.updates'.format(cls.__name__), len(upd_list)) -# return res -# -# @classmethod -# def optimize(cls, partition=None, final=False): -# """ -# Вызывает оптимизацию таблицы в ClickHouse. -# https://clickhouse.yandex/reference_ru.html#OPTIMIZE -# :param final: Если указан, то оптимизация будет производиться даже когда все данные уже лежат в одном куске -# :param partition: Если указана, то оптимизация будет производиться только для указаной партиции -# :return: None -# """ -# assert partition is None or isinstance(partition, django_models.six.string_types) -# assert type(final) is bool, "final must be boolean" -# assert not final or partition, "final flag is meaningful only wit specified partition" -# -# partition_string = 'PARTITION ' + partition if partition else '' -# final_string = 'FINAL' if final else '' -# settings.CLICKHOUSE_DB.raw("OPTIMIZE TABLE $db.`{0}` {1} {2};".format(cls.table_name(), partition_string, -# final_string)) -# -# @classmethod -# def init_update(cls, model_ids: IterableType[int], database: Optional[str] = None) -> None: -# """ -# Ининциирует обновление данных для объектов с укзаанными id-шниками -# :param model_ids: Список id моделей, которые надо обновлять -# :param database: Данная функция получает id только для одной БД. -# :return: None -# """ -# assert isinstance(model_ids, Iterable), "model_ids must be iterable" -# -# if len(model_ids) > 0: -# if cls.sync_type == 'redis': -# cls.django_model.register_clickhouse_operations('UPDATE', *list(model_ids), database=database) -# else: # if self.sync_type == 'postgres' -# from utils.models import ClickHouseModelOperation -# ClickHouseModelOperation.objects.bulk_update_or_create([ -# {'table': cls.django_model._meta.db_table, -# 'model_id': int(x), -# 'operation': 'UPDATE', -# 'database': (database or 'default')} -# for x in model_ids -# ], update=False, key_fields=('table', 'model_id')) -# -# @classmethod -# def _get_model_fields(cls) -> List[str]: -# """ -# Возвращает поля модели без учета readonly, sign и _version -# :return: List имен полей -# """ -# if not isinstance(cls._fields, dict): -# # DEPRECATED для старой infi.clickhouse-orm -# field_list = [f[0] for f in cls._fields -# if not f[1].readonly and f[0] != cls.clickhouse_sign_field and f[0] != '_version'] -# else: -# field_list = [f_name for f_name, f in cls._fields.items() -# if not f.readonly and f_name != cls.clickhouse_sign_field and f_name != '_version'] -# return field_list -# -# @classmethod -# def correct_minus_records(cls): -# """ -# Проверяет правильность импорта элементов модели и исправляет их при необходимости. -# Импорт неправилен, если сумма sign по первичному ключу меньше 0 -# :return: Количество исправлений -# """ -# # Ищем кривые данные -# query = 'SELECT DISTINCT(id), SUM(sign) AS sum FROM $db.`{table}` GROUP BY id HAVING sum <= 0'. \ -# format(table=cls.table_name()) -# data = set(settings.CLICKHOUSE_DB.select(query)) -# minus_ids = {item.id for item in data if item.sum < 0} -# zero_ids = {item.id for item in data if item.sum == 0} -# if len(minus_ids) > 0 or len(zero_ids) > 0: -# joined_minus_ids = ', '.join([str(x) for x in minus_ids]) -# joined_zero_ids = ', '.join([str(x) for x in zero_ids]) -# print('Ids with minus sum ({0} items): {1}'.format(len(minus_ids), joined_minus_ids or 'not found')) -# print('Ids with zero sum ({0} items): {1}'.format(len(zero_ids), joined_zero_ids or 'not found')) -# -# # Дополняем до корректных записей -# field_list = cls._get_model_fields() -# field_str = ','.join(field_list) -# -# # 1000 здесь взята от балды, как большой _version, который больше всех существующих в системе -# if zero_ids and minus_ids: -# joined_zero_ids = ', ' + joined_zero_ids -# -# if minus_ids: -# minus_ids_select = ''' -# SELECT DISTINCT {fields} FROM $db.`{table}` -# WHERE id IN ({minus_ids}) -# UNION ALL -# '''.format(fields=field_str, table=cls.table_name(), minus_ids=joined_minus_ids) -# else: -# minus_ids_select = '' -# -# upd_query = ''' -# INSERT INTO $db.`{table}` ({fields}, _version, sign) -# SELECT *, toUInt32(1000) AS _version, toInt8(1) AS sign FROM ( -# {minus_ids_select} -# SELECT DISTINCT {fields} FROM $db.`{table}` -# WHERE id IN ({minus_ids}{zero_ids}) -# ) -# '''.format(fields=field_str, table=cls.table_name(), minus_ids=joined_minus_ids, zero_ids=joined_zero_ids, -# minus_ids_select=minus_ids_select) -# settings.CLICKHOUSE_DB.raw(upd_query) -# -# print("Data corrected") -# -# # Инициируем обновление -# cls.init_update(minus_ids | zero_ids) -# print("Updates were planned") -# -# @classmethod -# def recheck(cls, qs: DjangoBaseQuerySet) -> int: -# """ -# В ходе импорта могут возникнуть ошибки - какие-то записи могут не быть вставлены или некорректны -# Данная функция проверяет правильность импорта данных между указанными id -# :param qs: QuerySet с данными, которые надо проверить -# :return: Количество некорректных записей -# """ -# cls._validate_cls_attributes() -# assert isinstance(qs, DjangoBaseQuerySet), "qs must be a QuerySet" -# assert qs.model == cls.django_model, "qs model must be equal to current django_model" -# -# items = list(qs) -# item_ids = [str(item.pk) for item in items] -# -# query = cls.form_query('*', where='id IN (%s)' % ','.join(item_ids), final=True, prewhere_app=False) -# ch_qs = {item.id: item for item in settings.CLICKHOUSE_DB.select(query, model_class=cls)} -# -# insert_ids, update_ids = set(), set() -# for item in items: -# if item.id in ch_qs: -# if ch_qs[item.id] != cls.from_django_model(item): -# update_ids.add(item.id) -# else: -# insert_ids.add(item.id) -# -# cls.init_insert(insert_ids, database=qs.db) -# cls.init_update(update_ids, database=qs.db) -# -# return len(insert_ids) + len(update_ids) -# -# -# class ClickHousePartitionedModelMeta(models.ModelBase): -# def __new__(cls, clsname, superclasses, attributedict): -# res = super().__new__(cls, clsname, superclasses, attributedict) -# -# # Далее нужны if-ы, чтобы корректно отработать ситуации: -# # - с неопределенным свойством -# # - с зацикливанием при отрабатываании _base_class_factory -# -# if clsname != 'ClickHousePartitionedModel': -# # Подменяем engine - модель будет работать с движком Merge -# if hasattr(res, 'engine') and not hasattr(res, 'base_engine'): -# res.base_engine = res.engine -# res.engine = Merge('^' + res.table_name() + '_') -# -# # Подменяем from_django_model, чтобы он подменял таблицу в генерируемом инстансе -# if hasattr(res, 'from_django_model') and not hasattr(res, 'base_from_django_model'): -# res.base_from_django_model = res.from_django_model -# res.from_django_model = res._from_django_model -# -# return res -# -# -# class ClickHousePartitionedModel(models.MergeModel, ClickHouseModel, metaclass=ClickHousePartitionedModelMeta): -# """ -# Этот класс служит для разделения данных на несколько таблиц в ClickHouse. -# При этом объекты базовой модели автоматически распределяются по таблицам -# с помощью метода get_partition_key(django_instance) -# -# Стандартный db.insert будет поднимать исключение (так как движок Merge), что нас устраивает. -# Стандартный db.select(model_class=THIS) будет делать запросы к Merge таблице, что не эффективно, но работает: -# делает запросы параллельно во все таблицы, получая результат -# Для получения данных из конкретной таблицы лучше использовать метод select_by_partition_key(key). -# -# Для того, чтобы исползовать другой базовый подкласс ClickHouseModelConverter достаточно унаследовать дочерний класс -# от обоих классов. -# -# Пример: -# class TestPartitionedModel(ClickHousePartitionedModel, ClickHouseCollapseModel): -# auto_sync = 5 -# django_model = TestModel -# -# @classmethod -# def get_partition_key(cls, django_instance): -# ... -# -# def to_django_model(self, obj=None): -# ... -# -# @classmethod -# def from_django_model(cls, obj) -# ... -# -# """ -# auto_create_tables = True -# -# @classmethod -# def get_partition_key(cls, django_instance): -# """ -# Получает из объекта исходной django-модели строковый ключ, по которому разделяются данные. -# :param django_instance: Объект Django -# :return: Строка - ключ разделения таблиц -# """ -# raise NotImplementedError('Method get_partition_key(django_instance) must is not implemented') -# -# @classmethod -# def base_class_factory(cls, partition_key): -# """ -# Factory для подкласса с конкретной таблицей -# :param partition_key: Ключ разделения таблиц -# :return: -# """ -# BaseClass = models.ModelBase(cls.__name__, (cls,), {}) -# -# # Подмена имени таблицы -# BaseClass.base_table_name = cls.table_name -# BaseClass.table_name = lambda _=None: cls.table_name() + '_' + partition_key -# -# # Движок у базовой таблицы должен быть исходный, а не Merge -# BaseClass.engine = cls.base_engine -# -# # В базовую модель можно вставлять данные -# BaseClass.readonly = False -# -# # Метод не должен проверять движок Merge и устанавливать для него таблицу -# BaseClass.set_database = ClickHouseModel.set_database -# -# # HACK Закостылял переопределением метода -# # BaseClass.create_table_sql = super(models.MergeModel, cls).create_table_sql -# -# # У базовой модели этого поля нет -# if hasattr(BaseClass, '_fields'): -# BaseClass._fields.pop('_table', None) -# -# return BaseClass -# -# @classmethod -# def _from_django_model(cls, obj): -# res = cls.base_from_django_model(obj) -# partition_key = cls.get_partition_key(obj) -# -# # Подменяем класс на базовый -# BaseClass = cls.base_class_factory(partition_key) -# -# # Некоторые преобразователи могут вернуть списки значений, а не одно. -# if isinstance(res, list): -# for item in res: -# if isinstance(item, cls): -# item.__class__ = BaseClass -# else: -# res.__class__ = BaseClass -# -# return res -# -# @classmethod -# def select_by_partition_key(cls, db, partition_key, query): -# """ -# Делает select из конкретной таблицы (не через движок Merge) -# :param db: База данных, из которой делается выборка -# :param partition_key: Ключ конкретной таблицы (как его возвращает метод get_partition_key) -# :param query: Запрос к БД -# :return: Результат select-а -# """ -# assert isinstance(db, Database), "db parameter must be infi.clickhouse_orm.database.Database model instance" -# assert isinstance(partition_key, six.string_types), "partition_key parameter must be string" -# assert isinstance(query, six.string_types), "query parameter must be string" -# -# BaseClass = cls.base_class_factory(partition_key) -# -# return db.select(query, model_class=BaseClass) -# -# @property -# def partition_key(self): -# if hasattr(self, 'base_table_name'): -# table = self.base_table_name() -# else: -# table = self.table_name() -# div_index = len(table) + 1 -# return self._table[div_index:] -# -# @classmethod -# def create_table_sql(cls, db): -# # HACK Мне надо избежать assert-а на Merge модель, чтобы создавать таблицы динамически -# from infi.clickhouse_orm.models import MergeModel -# return super(MergeModel, cls).create_table_sql(db) -# diff --git a/src/django_clickhouse/configuration.py b/src/django_clickhouse/configuration.py index ff00fa6..cc26cc2 100644 --- a/src/django_clickhouse/configuration.py +++ b/src/django_clickhouse/configuration.py @@ -21,6 +21,7 @@ DEFAULTS = { 'STATSD_PREFIX': 'clickhouse', 'MIGRATIONS_PACKAGE': 'clickhouse_migrations', 'MIGRATE_WITH_DEFAULT_DB': True, + 'CELERY_QUEUE': 'celery' } diff --git a/src/django_clickhouse/queryset.py b/src/django_clickhouse/queryset.py new file mode 100644 index 0000000..c16f55a --- /dev/null +++ b/src/django_clickhouse/queryset.py @@ -0,0 +1,105 @@ +# class ClickHouseModel(InfiModel): +# +# @classmethod +# def form_query(cls, select_items: Union[str, Set[str], List[str], Tuple[str]], table: Optional[str] = None, +# final: bool = False, date_filter_field: str = '', date_in_prewhere: bool = True, +# prewhere: Union[str, Set[str], List[str], Tuple[str]] = '', +# where: Union[str, Set[str], List[str], Tuple[str]] = '', +# group_fields: Union[str, Set[str], List[str], Tuple[str]] = '', group_with_totals: bool = False, +# order_by: Union[str, Set[str], List[str], Tuple[str]] = '', +# limit: Optional[int] = None, prewhere_app: bool = True): +# """ +# Формирует запрос к данной таблице +# :param select_items: строка или массив строк, которые надо выбрать в запросе +# :param table: Таблица данного класса по-умолчанию +# :param final: Позволяет выбрать из CollapsingMergeTree только последнюю версию записи. +# :param date_filter_field: Поле, которое надо отфильтровать от start_date до date_end, если задано +# :param date_in_prewhere: Если флаг указан и задано поле date_filter_field, +# то условие будет помещено в секцию PREWHERE, иначе - в WHERE +# :param prewhere: Условие, которое добавляется в prewhere +# :param where: Условие, которое добавляется в where +# :param group_fields: Поля, по которым будет производится группировка +# :param group_with_totals: Позволяет добавить к группировке модификатор with_totals +# :param order_by: Поле или массив полей, по которым сортируется результат +# :param limit: Лимит на количество записей +# :param prewhere_app: Автоматически добавляет в prewhere фильтр по app_id +# :return: Запрос, в пределах приложения +# """ +# assert isinstance(select_items, (str, list, tuple, set)), "select_items must be string, list, tuple or set" +# assert table is None or isinstance(table, str), "table must be string or None" +# assert isinstance(final, bool), "final must be boolean" +# assert isinstance(date_filter_field, str), "date_filter_field must be string" +# assert isinstance(date_in_prewhere, bool), "date_in_prewhere must be boolean" +# assert isinstance(prewhere, (str, list, tuple, set)), "prewhere must be string, list, tuple or set" +# assert isinstance(where, (str, list, tuple, set)), "where must be string, list, tuple or set" +# assert isinstance(group_fields, (str, list, tuple, set)), "group_fields must be string, list, tuple or set" +# assert isinstance(group_with_totals, bool), "group_with_totals must be boolean" +# assert isinstance(order_by, (str, list, tuple, set)), "group_fields must be string, list, tuple or set" +# assert limit is None or isinstance(limit, int) and limit > 0, "limit must be None or positive integer" +# assert isinstance(prewhere_app, bool), "prewhere_app must be boolean" +# +# table = table or '$db.`{0}`'.format(cls.table_name()) +# final = 'FINAL' if final else '' +# +# if prewhere: +# if not isinstance(prewhere, str): +# prewhere = '(%s)' % ') AND ('.join(prewhere) +# +# if prewhere_app: +# prewhere = '`app_id`={app_id} AND (' + prewhere + ')' if prewhere else '`app_id`={app_id}' +# +# if prewhere: +# prewhere = 'PREWHERE ' + prewhere +# +# if where: +# if not isinstance(where, str): +# where = ' AND '.join(where) +# where = 'WHERE ' + where +# +# if not isinstance(select_items, str): +# # Исключим пустые строки +# select_items = [item for item in select_items if item] +# select_items = ', '.join(select_items) +# +# if group_fields: +# if not isinstance(group_fields, str): +# group_fields = ', '.join(group_fields) +# +# group_fields = 'GROUP BY %s' % group_fields +# +# if group_with_totals: +# group_fields += ' WITH TOTALS' +# +# if order_by: +# if not isinstance(order_by, str): +# order_by = ', '.join(order_by) +# +# order_by = 'ORDER BY ' + order_by +# +# if date_filter_field: +# cond = "`%s` >= '{start_date}' AND `%s` < '{end_date}'" % (date_filter_field, date_filter_field) +# if date_in_prewhere: +# prewhere += ' AND ' + cond +# elif where: +# where += ' AND ' + cond +# else: +# where = 'WHERE ' + cond +# +# limit = "LIMIT {0}".format(limit) if limit else '' +# +# query = ''' +# SELECT %s +# FROM %s %s +# +# %s +# %s +# %s %s %s +# ''' % (select_items, table, final, prewhere, where, group_fields, order_by, limit) +# +# # Моя спец функция, сокращающая запись проверки даты на Null в запросах. +# # В скобках не может быть других скобок +# # (иначе надо делать сложную проверку скобочной последовательности, решил пока не заморачиваться) +# # Фактически, функция приводит выражение в скобках к timestamp и смотрит, что оно больше 0 +# query = re.sub(r'\$dateIsNotNull\s*(\([^()]*\))', r'toUInt64(toDateTime(\1)) > 0', query) +# +# return re.sub(r'\s+', ' ', query.strip()) \ No newline at end of file diff --git a/src/django_clickhouse/storages.py b/src/django_clickhouse/storages.py index ae5c7da..c36cfc1 100644 --- a/src/django_clickhouse/storages.py +++ b/src/django_clickhouse/storages.py @@ -10,8 +10,10 @@ import datetime from itertools import chain from typing import Any, Optional, List, Tuple, Iterable -from .exceptions import ConfigurationError +from statsd.defaults.django import statsd + from .configuration import config +from .exceptions import ConfigurationError class Storage: @@ -49,6 +51,16 @@ class Storage: """ pass + def post_batch_removed(self, import_key, batch_size): # type: (str, int) -> None + """ + This method marks that batch has been removed in statsd + :param import_key: A key, returned by ClickHouseModel.get_import_key() method + :param batch_size: Batch size to subtract from queue counter + :return: None + """ + key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key) + statsd.gauge(key, -batch_size, delta=True) + def get_import_batch(self, import_key, **kwargs): # type: (str, **dict) -> Optional[Tuple[str]] """ @@ -105,6 +117,9 @@ class Storage: if operation not in {'insert', 'update', 'delete'}: raise ValueError('operation must be one of [insert, update, delete]') + statsd_key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key) + statsd.gauge(statsd_key, len(pks), delta=True) + return self.register_operations(import_key, operation, *pks) def flush(self): @@ -176,10 +191,15 @@ class RedisStorage(Storage): score = self._redis.get(ts_key) if score: - self._redis.pipeline()\ - .zremrangebyscore(ops_key, '-inf', float(score))\ - .delete(batch_key)\ + res = self._redis.pipeline() \ + .zremrangebyscore(ops_key, '-inf', float(score)) \ + .delete(batch_key) \ .execute() + batch_size = int(res[1]) + else: + batch_size = 0 + + self.post_batch_removed(import_key, batch_size) def flush(self): key_tpls = [ diff --git a/src/django_clickhouse/tasks.py b/src/django_clickhouse/tasks.py index b83fe5d..cce3099 100644 --- a/src/django_clickhouse/tasks.py +++ b/src/django_clickhouse/tasks.py @@ -1,184 +1,40 @@ +from celery import shared_task +from django.conf import settings +from infi.clickhouse_orm.utils import import_submodules +from statsd.defaults.django import statsd -@shared_task(queue='service') -@statsd.timer('clickhouse.import_clickhouse_model_objects') -def import_clickhouse_model_objects(model_objects, statsd_key_prefix='upload_clickhouse.all', - batch_size=settings.CLICKHOUSE_INSERT_SIZE, - create_table_if_not_exist=False): - """ - Загрузка данных в Yandex ClickHouse - :param model_objects: Список объектов модели ClickHouse - :param statsd_key_prefix: Префикс для statsd, чтобы выделить какую-то группу запросов с отдельным ключом. - Пишется время обработки данных (data_prepare) и время выполнения запроса (request) - :param batch_size: Максимальное количество данных, вставляемых за 1 INSERT запрос - :param create_table_if_not_exist: Если флаг установлен, перед вставкой данных проверит существование таблицы - и создаст ее, если ее нет - :return: Количество импортированных элементов - """ - assert isinstance(model_objects, Iterable), "model_objects must be Iterable" - for obj in model_objects: - assert isinstance(obj, models.Model), "model_objects must contain infi.clickhouse_orm.models.Model instances" - - if not model_objects: - return 0 - - t1 = time.time() - # Поскольку составные модели могут вернуть не один объект, а список объектов, - # то необходимо разбить их по таблице назначения - convert_items = defaultdict(list) - for obj in model_objects: - convert_items[(obj.__class__, obj.table_name())].append(obj) - - t2 = time.time() - statsd.timing(statsd_key_prefix + '.import_clickhouse_model_objects.convert', t2 - t1) - - for (model_class, _), model_group in convert_items.items(): - if create_table_if_not_exist: - settings.CLICKHOUSE_DB.create_table(model_class) - settings.CLICKHOUSE_DB.insert(model_group, batch_size=batch_size) - t3 = time.time() - statsd.timing(statsd_key_prefix + '.import_clickhouse_model_objects.insert', t3 - t2) - - return len(model_objects) +from django_clickhouse.clickhouse_models import ClickHouseModel +from .configuration import config +from .utils import get_subclasses -@shared_task(queue='service') -@statsd.timer('clickhouse.import_data_from_queryset') -def import_data_from_queryset(ch_model, django_qs, statsd_key_prefix='upload_clickhouse.all', - batch_size=settings.CLICKHOUSE_INSERT_SIZE, no_qs_validation=False, - create_table_if_not_exist=False): - """ - Загрузка данных в Yandex ClickHouse - :param ch_model: infi model to import data to. It must have from_django_model() method - :param django_qs: QuerySet элементов, которые надо загрузить - :param statsd_key_prefix: Префикс для statsd, чтобы выделить какую-то группу запросов с отдельным ключом. - Пишется время обработки данных (data_prepare) и время выполнения запроса (request) - :param batch_size: Максимальное количество данных, вставляемых за 1 INSERT запрос - :param no_qs_validation: Иногда удобно передать не QuerySet, а сразу список объектов модели. - Тогда мы его не будем валидировать. - :param create_table_if_not_exist: Если флаг установлен, перед вставкой данных проверит существование таблицы - и создаст ее, если ее нет - :return: Количество импортированных элементов - """ - - def _update_version(model_obj, django_obj): - if hasattr(model_obj, '_version'): - model_obj._version = getattr(django_obj, '_version', 0) - - return model_obj - - assert inspect.isclass(ch_model), "ch_model must be a subclass of ClickHouseModelConverter" - assert issubclass(ch_model, ClickHouseModelConverter), "ch_model must be a subclass of ClickHouseModelConverter" - if not no_qs_validation: - assert isinstance(django_qs, django_models.QuerySet) and django_qs.model == ch_model.django_model, \ - "django_qs must be queryset of {0}".format(ch_model.django_model.__name__) - - t1 = time.time() - items = list(django_qs) - t2 = time.time() - if not items: - statsd.timing(statsd_key_prefix + '.import_data_from_queryset.empty_inserts', t2 - t1) - return - - statsd.timing(statsd_key_prefix + '.import_data_from_queryset.fetch', t2 - t1) - - insert_items = [] - for item in items: - convert_res = ch_model.from_django_model(item) - if isinstance(convert_res, Iterable): - insert_items.extend( - [_update_version(model_obj, item) for model_obj in convert_res if model_obj is not None]) - else: - insert_items.append(_update_version(convert_res, item)) - - t3 = time.time() - statsd.timing(statsd_key_prefix + '.import_data_from_queryset.convert', t3 - t2) - - res = import_clickhouse_model_objects(insert_items, statsd_key_prefix=statsd_key_prefix, batch_size=batch_size, - create_table_if_not_exist=create_table_if_not_exist) - t4 = time.time() - statsd.timing(statsd_key_prefix + '.import_data_from_queryset.insert', t4 - t3) - - return res - - -@shared_task(queue='service') -@statsd.timer('clickhouse.import_data') -def import_data(ch_model, pk_start, pk_end, statsd_key_prefix='upload_clickhouse.all', - batch_size=settings.CLICKHOUSE_INSERT_SIZE): - """ - Загрузка данных в Yandex ClickHouse - :param ch_model: infi model to import data to. It must have from_django_model() method - :param pk_start: Стартовый pk выборки - :param pk_end: Конечный pk выборки - :param statsd_key_prefix: Префикс для statsd, чтобы выделить какую-то группу запросов с отдельным ключом. - Пишется время обработки данных (data_prepare) и время выполнения запроса (request) - :param batch_size: Максимальное количество данных, вставляемых за 1 INSERT запрос - :return: Количество импортированных элементов - """ - assert inspect.isclass(ch_model), "ch_model must be a subclass of ClickHouseModelConverter" - assert issubclass(ch_model, ClickHouseModelConverter), "ch_model must be a subclass of ClickHouseModelConverter" - return import_data_from_queryset(ch_model, ch_model.django_model.objects.filter(pk__gte=pk_start, pk__lt=pk_end), - statsd_key_prefix=statsd_key_prefix, batch_size=batch_size) - - -@shared_task(queue='service') +@shared_task(queue=config.CELERY_QUEUE) def sync_clickhouse_converter(cls): """ - Синхронизирует один указанный класс с ClickHouse + Syncs one batch of given ClickHouseModel :param cls: Наследник ClickHouseModelConverter :return: Количество загруженных в ClickHouse записей """ - assert inspect.isclass(cls), "cls must be ClickHouseModelConverter subclass" - assert issubclass(cls, ClickHouseModelConverter), "cls must be ClickHouseModelConverter subclass" - - t1 = time.time() - result = cls.import_data() - t2 = time.time() - - statsd.timing('sync_clickhouse_model.' + cls.__name__, t2 - t1) + statsd_key = "%s.sync.%s.time" % (config.STATSD_PREFIX, cls.__name__) + with statsd.timing(statsd_key): + result = cls.sync_batch_from_storage() return result -@shared_task(queue='service') -@statsd.timer('clickhouse.auto_sync') +@shared_task(queue=config.CELERY_QUEUE) def clickhouse_auto_sync(): """ - Выгружает в ClickHouse данные всех наследников ClickHouseModelConverter, у которых установлен атрибут auto_sync + Plans syncing models :return: """ - # Импортируем все модели, иначе get_subclasses их не увидит + # Import all model modules for app in settings.INSTALLED_APPS: - pack = app + ".ch_models" - spam_spec = importlib.util.find_spec(pack) - if spam_spec is not None: - importlib.import_module(pack) + import_submodules("%s.%s" % (app, config.MODELS_MODULE)) # Запускаем - for cls in get_subclasses(ClickHouseModelConverter, recursive=True): + for cls in get_subclasses(ClickHouseModel, recursive=True): if cls.start_sync(): # Даже если синхронизация вдруг не выполнится, не страшно, что мы установили период синхронизации # Она выполнится следующей таской через интервал. sync_clickhouse_converter.delay(cls) - - -class ReSyncClickHouseModel(PkLongMigration): - ch_model = None # type: ClickHouseBaseModel - - @property - def hints(self): - return {'model_name': self.ch_model.django_model.__class__.__name__} - - def forwards_batch(self, qs, using: Optional[str] = None): - result = self.ch_model.recheck(qs) - - items = list(qs) - - next_start_id = None if len(items) < self.batch_size else min(item.id for item in items) - return result, next_start_id - - def backwords_batch(self, qs, using: Optional[str] = None): - return 0, None - - def get_queryset(self): - return self.ch_model.django_model.objects.all() \ No newline at end of file diff --git a/src/django_clickhouse/utils.py b/src/django_clickhouse/utils.py index 2aeca4d..8710fc7 100644 --- a/src/django_clickhouse/utils.py +++ b/src/django_clickhouse/utils.py @@ -1,5 +1,5 @@ import datetime -from typing import Union, Any, Optional +from typing import Union, Any, Optional, TypeVar, Set import pytz import six @@ -10,6 +10,9 @@ from infi.clickhouse_orm.database import Database from .database import connections +T = TypeVar('T') + + def get_tz_offset(db_alias=None): # type: (Optional[str]) -> int """ Returns ClickHouse server timezone offset in minutes @@ -67,3 +70,20 @@ def lazy_class_import(obj): # type: (Union[str, Any]) -> Any raise ImportError('Invalid import path `%s`' % obj) else: return obj + + +def get_subclasses(cls, recursive=False): # type: (T, bool) -> Set[T] + """ + Gets all subclasses of given class + Attention!!! Classes would be found only if they were imported before using this function + :param cls: Class to get subcalsses + :param recursive: If flag is set, returns subclasses of subclasses and so on too + :return: A list of subclasses + """ + subclasses = set(cls.__subclasses__()) + + if recursive: + for subcls in subclasses.copy(): + subclasses.update(get_subclasses(subcls, recursive=True)) + + return subclasses \ No newline at end of file