diff --git a/.travis.yml b/.travis.yml index dade7ce..b775fa2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -27,28 +27,42 @@ addons: - postgresql-11 - postgresql-contrib-11 - postgresql-client-11 + - postgresql-12 + - postgresql-contrib-12 + - postgresql-client-12 - unzip - rabbitmq-server python: - 3.6 - 3.7 + - 3.8 env: - PG=9.6 DJANGO=2.1 - PG=10 DJANGO=2.1 - PG=11 DJANGO=2.1 + - PG=12 DJANGO=2.1 - PG=9.6 DJANGO=2.2 - PG=10 DJANGO=2.2 - PG=11 DJANGO=2.2 + - PG=12 DJANGO=2.2 + - PG=9.6 DJANGO=3.0 + - PG=10 DJANGO=3.0 + - PG=11 DJANGO=3.0 + - PG=12 DJANGO=3.0 before_install: # Use default PostgreSQL 11 port - sudo sed -i 's/port = 5433/port = 5432/' /etc/postgresql/11/main/postgresql.conf - sudo cp /etc/postgresql/{10,11}/main/pg_hba.conf + - sudo sed -i 's/port = 5434/port = 5432/' /etc/postgresql/12/main/postgresql.conf + - sudo cp /etc/postgresql/{10,12}/main/pg_hba.conf + # Start PostgreSQL version we need - - sudo systemctl stop postgresql && sudo systemctl start postgresql@$PG-main + - sudo systemctl stop postgresql + - sudo systemctl start postgresql@$PG-main # ClickHouse sources - sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4 @@ -60,9 +74,8 @@ install: - sudo apt-get install clickhouse-client clickhouse-server clickhouse-common-static - sudo service clickhouse-server restart - - pip install -r requirements.txt + - pip install -r requirements-test.txt - pip install -q Django==$DJANGO.* - - pip install redis - python setup.py -q install before_script: diff --git a/README.md b/README.md index e3a58ff..2d550c9 100644 --- a/README.md +++ b/README.md @@ -1 +1,2 @@ -# django-clickhouse \ No newline at end of file +# django-clickhouse +Documentation is [here](docs/index.md) \ No newline at end of file diff --git a/docs/basic_information.md b/docs/basic_information.md new file mode 100644 index 0000000..f0bac10 --- /dev/null +++ b/docs/basic_information.md @@ -0,0 +1,37 @@ +# Basic information +## About +This project's goal is to build 
[Yandex ClickHouse](https://clickhouse.yandex/) database into [Django](https://www.djangoproject.com/) project. +It is based on [infi.clickhouse-orm](https://github.com/Infinidat/infi.clickhouse_orm) library. + +## Features +* Multiple ClickHouse database configuration in [settings.py](https://docs.djangoproject.com/en/2.1/ref/settings/) +* ORM to create and manage ClickHouse models. +* ClickHouse migration system. +* Scalable serialization of django model instances to ORM model instances. +* Effective periodical synchronization of django models to ClickHouse without loosing data. +* Synchronization process monitoring. + +## Requirements +* [Python 3](https://www.python.org/downloads/) +* [Django](https://docs.djangoproject.com/) 1.7+ +* [Yandex ClickHouse](https://clickhouse.yandex/) +* [infi.clickhouse-orm](https://github.com/Infinidat/infi.clickhouse_orm) +* [pytz](https://pypi.org/project/pytz/) +* [six](https://pypi.org/project/six/) +* [typing](https://pypi.org/project/typing/) +* [psycopg2](https://www.psycopg.org/) +* [celery](http://www.celeryproject.org/) +* [statsd](https://pypi.org/project/statsd/) + +### Optional libraries +* [redis-py](https://redis-py.readthedocs.io/en/latest/) for [RedisStorage](storages.md#redisstorage) +* [django-pg-returning](https://github.com/M1hacka/django-pg-returning) + for optimizing registering updates in [PostgreSQL](https://www.postgresql.org/) +* [django-pg-bulk-update](https://github.com/M1hacka/django-pg-bulk-update) + for performing effective bulk update and create operations in [PostgreSQL](https://www.postgresql.org/) + +## Installation +Install via pip: +`pip install django-clickhouse` ([not released yet](https://github.com/carrotquest/django-clickhouse/issues/3)) +or via setup.py: +`python setup.py install` diff --git a/docs/configuration.md b/docs/configuration.md new file mode 100644 index 0000000..78c9487 --- /dev/null +++ b/docs/configuration.md @@ -0,0 +1,96 @@ +# Configuration + +Library configuration is 
made in settings.py. All parameters start with `CLICKHOUSE_` prefix. +Prefix can be changed using `CLICKHOUSE_SETTINGS_PREFIX` parameter. + +### CLICKHOUSE_SETTINGS_PREFIX +Defaults to: `'CLICKHOUSE_'` +You can change `CLICKHOUSE_` prefix in settings using this parameter to anything your like. + +### CLICKHOUSE_DATABASES +Defaults to: `{}` +A dictionary, defining databases in django-like style. +Key is an alias to communicate with this database in [connections](databases.md#getting-database-objects) and [using](routing.md#settings-database-in-queryset). +Value is a configuration dict with parameters: +* [infi.clickhouse_orm database parameters](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/class_reference.md#database) +* `migrate: bool` - indicates if this database should be migrated. See [migrations](migrations.md). + +Example: +```python +CLICKHOUSE_DATABASES = { + 'default': { + 'db_name': 'test', + 'username': 'default', + 'password': '' + }, + 'reader': { + 'db_name': 'read_only', + 'username': 'reader', + 'readonly': True, + 'password': '' + } +} +``` + +### CLICKHOUSE_DEFAULT_DB_ALIAS +Defaults to: `'default'` +A database alias to use in [QuerySets](queries.md) if direct [using](routing.md#settings-database-in-queryset) is not specified. + +### CLICKHOUSE_SYNC_STORAGE +Defaults to: `'django_clickhouse.storages.RedisStorage'` +An [intermediate storage](storages.md) class to use. Can be a string or class. + +### CLICKHOUSE_REDIS_CONFIG +Default to: `None` +Redis configuration for [RedisStorage](storages.md#redisstorage). +If given, should be a dictionary of parameters to pass to [redis-py](https://redis-py.readthedocs.io/en/latest/#redis.Redis). 
+ +Example: +```python +CLICKHOUSE_REDIS_CONFIG = { + 'host': '127.0.0.1', + 'port': 6379, + 'db': 8, + 'socket_timeout': 10 +} +``` + +### CLICKHOUSE_SYNC_BATCH_SIZE +Defaults to: `10000` +Maximum number of operations, fetched by sync process from [intermediate storage](storages.md) per [sync](sync.md)) round. + +### CLICKHOUSE_SYNC_DELAY +Defaults to: `5` +A delay in seconds between two [sync](synchronization.md) rounds start. + +### CLICKHOUSE_MODELS_MODULE +Defaults to: `'clickhouse_models'` +Module name inside [django app](https://docs.djangoproject.com/en/3.0/intro/tutorial01/), +where [ClickHouseModel](models.md#clickhousemodel) classes are search during migrations. + +### CLICKHOUSE_DATABASE_ROUTER +Defaults to: `'django_clickhouse.routers.DefaultRouter'` +A dotted path to class, representing [database router](routing.md#router). + +### CLICKHOUSE_MIGRATIONS_PACKAGE +Defaults to: `'clickhouse_migrations'` +A python package name inside [django app](https://docs.djangoproject.com/en/3.0/intro/tutorial01/), +where migration files are searched. + +### CLICKHOUSE_MIGRATION_HISTORY_MODEL +Defaults to: `'django_clickhouse.migrations.MigrationHistory'` +A dotted name of a ClickHouseModel subclass (including module path), + representing [MigrationHistory model](migrations.md#migrationhistory-clickhousemodel). + +### CLICKHOUSE_MIGRATE_WITH_DEFAULT_DB +Defaults to: `True` +A boolean flag enabling automatic ClickHouse migration, +when you call [`migrate`](https://docs.djangoproject.com/en/2.2/ref/django-admin/#django-admin-migrate) on `default` database. + +### CLICKHOUSE_STATSD_PREFIX +Defaults to: `clickhouse` +A prefix in [statsd](https://pythonhosted.org/python-statsd/) added to each library metric. See [monitoring](monitoring.md). + +### CLICKHOUSE_CELERY_QUEUE +Defaults to: `'celery'` +A name of a queue, used by celery to plan library sync tasks. 
diff --git a/docs/databases.md b/docs/databases.md new file mode 100644 index 0000000..dc1109d --- /dev/null +++ b/docs/databases.md @@ -0,0 +1,37 @@ +# Databases +Direct usage of `Database` objects is not expected in this library. But in some cases, you may still need them. +This section describes `Database` objects and their usage. + +`django_clickhouse.database.Database` is a class, describing a ClickHouse database connection. + +## Getting database objects +To get a `Database` object by its alias name in [CLICKHOUSE_DATABASES](configuration.md#clickhouse_databases) + use `django_clickhouse.database.connections` object. +This object is a `django_clickhouse.database.ConnectionProxy` instance: + it creates `Database` objects when they are used for the first time and stores them in memory. + +Example: +```python +from django_clickhouse.database import connections + +# Database objects are inited on first call +db = connections['default'] +secondary = connections['secondary'] + +# Already inited - object is returned from memory +db_link = connections['default'] +``` + +You can also get database objects from [QuerySet](queries.md) and [ClickHouseModel](models.md) instances by calling `get_database(for_write: bool = False)` method. +This database may differ, depending on [routing](routing.md#router) you use. + +## Database object +Database class is based on [infi.clickhouse_orm Database object](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/models_and_databases.md#models-and-databases), +but extends it with some extra attributes and methods: + +### Database migrations are restricted +I expect this library [migration system](migrations.md) to be used. +Direct database migration will lead to migration information errors. + +### `insert_tuples` and `select_tuples` methods +Methods to work with [ClickHouseModel namedtuples](models.md#clickhousemodel-namedtuple-form). 
diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..8aedf21 --- /dev/null +++ b/docs/index.md @@ -0,0 +1,22 @@ +# Table of contents + +* [Basic information](basic_information.md) + * [About](basic_information.md#about) + * [Features](basic_information.md#features) + * [Requirements](basic_information.md#requirements) + * [Installation](basic_information.md#installation) + * [Design motivation](motivation.md) +* [Usage](overview.md) + * [Overview](overview.md) + * [Models](models.md) + * [DjangoModel](models.md#DjangoModel) + * [ClickHouseModel](models.md#ClickHouseModel) + * [Making queries](queries.md) + * [Databases](models.md) + * [Routing](routing.md) + * [Migrations](migrations.md) + * [Synchronization](synchronization.md) + * [Storages](storages.md) + * [RedisStorage](storages.md#redisstorage) + * [Monitoring](monitoring.md) + * [Performance notes](performance.md) diff --git a/docs/migrations.md b/docs/migrations.md new file mode 100644 index 0000000..f71d77b --- /dev/null +++ b/docs/migrations.md @@ -0,0 +1,77 @@ +# Migrations +Migration system allows to make migrate ClickHouse table schema based on `ClickHouseModel`. +Library migrations are based on [infi.clickhouse_orm migration system](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/schema_migrations.md), +but makes it a little bit more django-like. + +## File structure +Each django app can have optional `clickhouse_migrations` package. + This is a default package name, it can be changed with [CLICKHOUSE_MIGRATIONS_PACKAGE](configuration.md#clickhouse_migrations_package) setting. + +Package contains py files, starting with 4-digit number. +A number gives an order in which migrations will be applied. 
+ +Example: +``` +my_app +>> clickhouse_migrations +>>>> __init__.py +>>>> 0001_initial.py +>>>> 0002_add_new_field_to_my_model.py +>> clickhouse_models.py +>> urls.py +>> views.py +``` + +## Migration files +Each file must contain a `Migration` class, inherited from `django_clickhouse.migrations.Migration`. +The class should define an `operations` attribute - a list of operations to apply one by one. +Operation is one of [operations, supported by infi.clickhouse-orm](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/schema_migrations.md). + +```python +from django_clickhouse import migrations +from my_app.clickhouse_models import ClickHouseUser + +class Migration(migrations.Migration): + operations = [ + migrations.CreateTable(ClickHouseUser) + ] +``` + +## MigrationHistory ClickHouseModel +This model stores information about applied migrations. +By default, library uses `django_clickhouse.migrations.MigrationHistory` model, + but this can be changed using `CLICKHOUSE_MIGRATION_HISTORY_MODEL` setting. +For instance, if you want to make it replicated, you have to redeclare tables engine. + +MigrationHistory model is stored in default database. + + +## Automatic migrations +When library is installed, it tries applying migrations every time, +you call [django migrate](https://docs.djangoproject.com/en/3.0/ref/django-admin/#django-admin-migrate). If you want to disable this, use [CLICKHOUSE_MIGRATE_WITH_DEFAULT_DB](configuration.md#clickhouse_migrate_with_default_db) setting. + +By default migrations are applied to all [CLICKHOUSE_DATABASES](configuration.md#clickhouse_databases), which have no flags: +* `'migrate': False` +* `'readonly': True` + +Note: migrations are only applied, with django `default` database. +So if you call `python manage.py migrate --database=secondary` they wouldn't be applied. + +## Migration algorithm +- Get a list of databases from `CLICKHOUSE_DATABASES` setting. Migrate them one by one. 
 - Find all django apps from `INSTALLED_APPS` setting, which have no `readonly=True` attribute and have `migrate=True` attribute. Migrate them one by one. + * Iterate over `INSTALLED_APPS`, searching for [clickhouse_migrations package](#file-structure) + * If package was not found, skip app. + * Get a list of migrations applied from [MigrationHistory model](#migrationhistory-clickhousemodel) + * Get a list of unapplied migrations + * Get [Migration class](#migration-files) from each migration and call its `apply()` method + * `apply()` iterates operations, checking if it should be applied with [router](routing.md) + * If migration should be applied, it is applied + * Mark migration as applied in [MigrationHistory model](#migrationhistory-clickhousemodel) + +## Security notes +1) ClickHouse has no transaction system, as django relational databases do. + As a result, if migration fails, it would be partially applied and there's no correct way to rollback. + I recommend to make migrations as small as possible, so it should be easier to determine and correct the result if something goes wrong. +2) Unlike django, this library is unable to unapply migrations. + This functionality may be implemented in the future. diff --git a/docs/models.md b/docs/models.md new file mode 100644 index 0000000..95e4435 --- /dev/null +++ b/docs/models.md @@ -0,0 +1,153 @@ +# Models +Model is a pythonic class representing database table in your code. + It also defines an interface (methods) to perform operations on this table + and describes its configuration inside framework. + +This library operates 2 kinds of models: +* DjangoModel, describing tables in source relational database (PostgreSQL, MySQL, etc.) +* ClickHouseModel, describing models in [ClickHouse](https://clickhouse.yandex/docs/en) database + +In order to distinguish them, I will refer to them as ClickHouseModel and DjangoModel in further documentation. 
+ +## DjangoModel +Django provides a [model system](https://docs.djangoproject.com/en/3.0/topics/db/models/) + to interact with relational databases. + In order to perform [synchronization](synchronization.md) we need to "catch" all [DML operations](https://en.wikipedia.org/wiki/Data_manipulation_language) + on source django model and save information about them in [storage](storages.md). + To achieve this, library introduces abstract `django_clickhouse.models.ClickHouseSyncModel` class. + Each model, inherited from `ClickHouseSyncModel` will automatically save information, needed to sync to storage. +Read [synchronization](synchronization.md) section for more info. + +`ClickHouseSyncModel` saves information about: +* `Model.objects.create()`, `Model.objects.bulk_create()` +* `Model.save()`, `Model.delete()` +* `QuerySet.update()`, `QuerySet.delete()` +* All queries of [django-pg-returning](https://pypi.org/project/django-pg-returning/) library +* All queries of [django-pg-bulk-update](https://pypi.org/project/django-pg-bulk-update/) library + +You can also combine your custom django manager and queryset using mixins from `django_clickhouse.models` package: + +**Important note**: Operations are saved in [transaction.on_commit()](https://docs.djangoproject.com/en/2.2/topics/db/transactions/#django.db.transaction.on_commit). + The goal is avoiding syncing operations, not committed to relational database. + But this may also provide bad effect: situation, when transaction is committed, + but it hasn't been registered, if something went wrong during registration. 
+ +Example: +```python +from django_clickhouse.models import ClickHouseSyncModel +from django.db import models +from datetime import date + +class User(ClickHouseSyncModel): + first_name = models.CharField(max_length=50) + age = models.IntegerField() + birthday = models.DateField() + +# All operations will be registered to sync with ClickHouse models: +User.objects.create(first_name='Alice', age=16, birthday=date(2003, 6, 1)) +User(first_name='Bob', age=17, birthday=date(2002, 1, 1)).save() +User.objects.update(first_name='Candy') + +# Custom manager + +``` + +## ClickHouseModel +This kind of model is based on [infi.clickhouse_orm Model](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/models_and_databases.md#defining-models) + and represents table in [ClickHouse database](https://clickhouse.yandex/docs/en). + +You should define `ClickHouseModel` subclass for each table you want to access and sync in ClickHouse. +Each model should be inherited from `django_clickhouse.clickhouse_models.ClickHouseModel`. +By default, models are searched in `clickhouse_models` module of each django app. +You can change modules name, using setting [CLICKHOUSE_MODELS_MODULE](configuration.md#clickhouse_models_module) + +You can read more about creating models and fields [here](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/models_and_databases.md#defining-models): +all capabilities are supported. 
At the same time, django-clickhouse library adds: +* [routing attributes and methods](routing.md) +* [sync attributes and methods](synchronization.md) + +Example: +```python +from django_clickhouse.clickhouse_models import ClickHouseModel +from django_clickhouse.engines import MergeTree +from infi.clickhouse_orm import fields +from my_app.models import User + + +class HeightData(ClickHouseModel): + django_model = User + + first_name = fields.StringField() + birthday = fields.DateField() + height = fields.Float32Field() + + engine = MergeTree('birthday', ('first_name', 'last_name', 'birthday')) + + +class AgeData(ClickHouseModel): + django_model = User + + first_name = fields.StringField() + birthday = fields.DateField() + age = fields.UInt32Field() + + engine = MergeTree('birthday', ('first_name', 'last_name', 'birthday')) +``` + +### ClickHouseMultiModel +In some cases you may need to sync single DjangoModel to multiple ClickHouse models. +This model gives ability to reduce number of relational database operations. +You can read more in [sync](synchronization.md) section. + +Example: +```python +from django_clickhouse.clickhouse_models import ClickHouseMultiModel +from my_app.models import User + +class MyMultiModel(ClickHouseMultiModel): + django_model = User + sub_models = [AgeData, HeightData] +``` + +## ClickHouseModel namedtuple form +[infi.clickhouse_orm](https://github.com/Infinidat/infi.clickhouse_orm) stores data rows in special Model objects. +It works well on hundreds of records. +But when you sync 100k records in a batch, initializing 100k model instances will be slow. +To optimize this process `ClickHouseModel` class has `get_tuple_class()` method. +It generates a [namedtuple](https://docs.python.org/3/library/collections.html#collections.namedtuple) class, +with the same data fields a model has. +Initializing such tuples takes much less time than initializing Model objects. 
+ +## Engines +Engine is a way of storing, indexing, replicating and sorting data in ClickHouse ([docs](https://clickhouse.yandex/docs/en/operations/table_engines/)). +Engine system is based on [infi.clickhouse_orm engine system](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/table_engines.md#table-engines). +This library extends original engine classes as each engine can have its own synchronization mechanics. +Engines are defined in `django_clickhouse.engines` module. + +Currently supported engines (with all infi functionality, [more info](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/table_engines.md#data-replication)): +* `MergeTree` +* `ReplacingMergeTree` +* `SummingMergeTree` +* `CollapsingMergeTree` + + +## Serializers +Serializer is a class which translates django model instances to [namedtuples, inserted into ClickHouse](#clickhousemodel-namedtuple-form). +`django_clickhouse.serializers.Django2ClickHouseModelSerializer` is used by default in all models. + All serializers must inherit this class. 
+ +Serializer must implement next interface: +```python +from django_clickhouse.serializers import Django2ClickHouseModelSerializer +from django.db.models import Model as DjangoModel +from typing import * + +class CustomSerializer(Django2ClickHouseModelSerializer): + def __init__(self, model_cls: Type['ClickHouseModel'], fields: Optional[Iterable[str]] = None, + exclude_fields: Optional[Iterable[str]] = None, writable: bool = False, + defaults: Optional[dict] = None) -> None: + super().__init__(model_cls, fields=fields, exclude_fields=exclude_fields, writable=writable, defaults=defaults) + + def serialize(self, obj: DjangoModel) -> NamedTuple: + pass +``` diff --git a/docs/monitoring.md b/docs/monitoring.md new file mode 100644 index 0000000..8567813 --- /dev/null +++ b/docs/monitoring.md @@ -0,0 +1,55 @@ +# Monitoring +In order to monitor [synchronization](synchronization.md) process, [statsd](https://pypi.org/project/statsd/) is used. +Data from statsd then can be used by [Prometheus exporter](https://github.com/prometheus/statsd_exporter) + or [Graphite](https://graphite.readthedocs.io/en/latest/). + +## Configuration +Library expects statsd to be configured as written in [statsd docs for django](https://statsd.readthedocs.io/en/latest/configure.html#in-django). +You can set a common prefix for all keys in this library using [CLICKHOUSE_STATSD_PREFIX](configuration.md#clickhouse_statsd_prefix) parameter. + +## Exported metrics +## Gauges +* `.sync..queue` + Number of elements in [intermediate storage](storages.md) queue waiting for import. + Queue should not be big. It depends on [sync_delay](synchronization.md#configuration) configured and time for syncing single batch. + It is a good parameter to watch and alert on. + +## Timers +All time is sent in milliseconds. + +* `.sync..total` + Total time of single batch task execution. 
+ +* `.sync..steps.` + `` is one of `pre_sync`, `get_operations`, `get_sync_objects`, `get_insert_batch`, `get_final_versions`, + `insert`, `post_sync`. Read [here](synchronization.md) for more details. + Time of each sync step. Can be useful to debug reasons of long sync process. + +* `.inserted_tuples.` + Time of inserting batch of data into ClickHouse. + It excludes as much python code as it could to distinguish real INSERT time from python data preparation. + +* `.sync..register_operations` + Time of inserting sync operations into storage. + +## Counters + * `.sync..register_operations.` + `` is one of `create`, `update`, `delete`. + Number of DML operations added by DjangoModel method calls to sync queue. + +* `.sync..operations` + Number of operations, fetched from [storage](storages.md) for sync in one batch. + +* `.sync..import_objects` + Number of objects, fetched from relational storage (based on operations) in order to sync with ClickHouse models. + +* `.inserted_tuples.` + Number of rows inserted to ClickHouse. + +* `.sync..lock.timeout` + Number of locks in [RedisStorage](storages.md#redisstorage), not acquired and skipped by timeout. + This value should be zero. If not, it means your model sync takes longer than sync task call interval. + +* `.sync..lock.hard_release` + Number of locks in [RedisStorage](storages.md#redisstorage), released forcibly (because the process which acquired the lock is dead). + This value should be zero. If not, it means your sync tasks are killed forcibly during the sync process (by OutOfMemory killer, for instance). diff --git a/docs/motivation.md b/docs/motivation.md new file mode 100644 index 0000000..88c36c0 --- /dev/null +++ b/docs/motivation.md @@ -0,0 +1,35 @@ +# Design motivation +## Separate from django database setting, QuerySet and migration system +ClickHouse SQL and DML language is near to standard, but does not follow it exactly ([docs](https://clickhouse.tech/docs/en/introduction/distinctive_features/#sql-support)). 
+As a result, it can not be easily integrated into django query subsystem as it expects databases to support: +1. Transactions. +2. INNER/OUTER JOINS by condition. +3. Full featured updates and deletes. +4. Per database replication (ClickHouse has per table replication) +5. Other features, not supported in ClickHouse. + +In order to have more functionality, [infi.clickhouse-orm](https://github.com/Infinidat/infi.clickhouse_orm) + is used as base library for databases, querysets and migrations. Most of it is compatible and can be used without any changes. + +## Sync over intermediate storage +This library has several goals which lead to intermediate storage: +1. Fail resistant import, does not matter what the fail reason is: + ClickHouse fail, network fail, killing import process by system (OOM, for instance). +2. ClickHouse does not like single row inserts: [docs](https://clickhouse.tech/docs/en/introduction/performance/#performance-when-inserting-data). + So it's worth batching data somewhere before inserting it. + ClickHouse provides BufferEngine for this, but it can lose data if ClickHouse fails - and no one will know about it. +3. Better scalability. Different intermediate storages may be implemented in the future, based on databases, queue systems or even BufferEngine. + +## Replication and routing +In primitive cases people just have single database or cluster with same tables on each replica. +But as ClickHouse has per table replication a more complicated structure can be built: +1. Model A is stored on servers 1 and 2 +2. Model B is stored on servers 2, 3 and 5 +3. Model C is stored on servers 1, 3 and 4 + +Moreover, migration operations in ClickHouse can also be auto-replicated (`ALTER TABLE`, for instance) or not (`CREATE TABLE`). + +In order to make replication scheme scalable: +1. Each model has its own read / write / migrate [routing configuration](routing.md#clickhousemodel-routing-attributes). +2. 
You can use [router](routing.md#router) like django does to set basic routing rules for all models or model groups. + \ No newline at end of file diff --git a/docs/overview.md b/docs/overview.md new file mode 100644 index 0000000..2054067 --- /dev/null +++ b/docs/overview.md @@ -0,0 +1,140 @@ +# Usage overview +## Requirements +At the begging I expect, that you already have: +1. [ClickHouse](https://clickhouse.tech/docs/en/) (with [ZooKeeper](https://zookeeper.apache.org/), if you use replication) +2. Relational database used with [Django](https://www.djangoproject.com/). For instance, [PostgreSQL](https://www.postgresql.org/) +3. [Django database set up](https://docs.djangoproject.com/en/3.0/ref/databases/) +4. [Intermediate storage](storages.md) set up. For instance, [Redis](https://redis.io/). + +## Configuration +Add required parameters to [Django settings.py](https://docs.djangoproject.com/en/3.0/topics/settings/): +1. [CLICKHOUSE_DATABASES](configuration.md#clickhouse_databases) +2. [Intermediate storage](storages.md) configuration. For instance, [RedisStorage](storages.md#redisstorage) +3. It's recommended to change [CLICKHOUSE_CELERY_QUEUE](configuration.md#clickhouse_celery_queue) +4. Add sync task to [celerybeat schedule](http://docs.celeryproject.org/en/v2.3.3/userguide/periodic-tasks.html). + Note, that executing planner every 2 seconds doesn't mean sync is executed every 2 seconds. + Sync time depends on model sync_delay attribute value and [CLICKHOUSE_SYNC_DELAY](configuration.md#clickhouse_sync_delay) configuration parameter. + You can read more in [sync section](synchronization.md). + +You can also change other [configuration parameters](configuration.md) depending on your project. + +#### Example +```python +# django-clickhouse library setup +CLICKHOUSE_DATABASES = { + # Connection name to refer in using(...) 
method + 'default': { + 'db_name': 'test', + 'username': 'default', + 'password': '' + } +} +CLICKHOUSE_REDIS_CONFIG = { + 'host': '127.0.0.1', + 'port': 6379, + 'db': 8, + 'socket_timeout': 10 +} +CLICKHOUSE_CELERY_QUEUE = 'clickhouse' + +# If you have no any celerybeat tasks, define a new dictionary +# More info: http://docs.celeryproject.org/en/v2.3.3/userguide/periodic-tasks.html +from datetime import timedelta +CELERYBEAT_SCHEDULE = { + 'clickhouse_auto_sync': { + 'task': 'django_clickhouse.tasks.clickhouse_auto_sync', + 'schedule': timedelta(seconds=2), # Every 2 seconds + 'options': {'expires': 1, 'queue': CLICKHOUSE_CELERY_QUEUE} + } +} +``` + +## Adopting django model +Read [ClickHouseSyncModel](models.md#djangomodel) section. +Inherit all [django models](https://docs.djangoproject.com/en/3.0/topics/db/models/) + you want to sync with ClickHouse from `django_clickhouse.models.ClickHouseSyncModel` or sync mixins. + +```python +from django_clickhouse.models import ClickHouseSyncModel +from django.db import models + +class User(ClickHouseSyncModel): + first_name = models.CharField(max_length=50) + visits = models.IntegerField(default=0) + birthday = models.DateField() +``` + +## Create ClickHouseModel +1. Read [ClickHouseModel section](models.md#clickhousemodel) +2. Create `clickhouse_models.py` in your django app. +3. Add `ClickHouseModel` class there: +```python +from django_clickhouse.clickhouse_models import ClickHouseModel +from django_clickhouse.engines import MergeTree +from infi.clickhouse_orm import fields +from my_app.models import User + +class ClickHouseUser(ClickHouseModel): + django_model = User + + id = fields.UInt32Field() + first_name = fields.StringField() + birthday = fields.DateField() + visits = fields.UInt32Field(default=0) + + engine = MergeTree('birthday', ('birthday',)) +``` + +## Migration to create table in ClickHouse +1. Read [migrations](migrations.md) section +2. Create `clickhouse_migrations` package in your django app +3. 
Create `0001_initial.py` file inside the created package. Result structure should be: + ``` + my_app + >> clickhouse_migrations + >>>> __init__.py + >>>> 0001_initial.py + >> clickhouse_models.py + >> models.py + ``` + +4. Add content to file `0001_initial.py`: + ```python + from django_clickhouse import migrations + from my_app.cilckhouse_models import ClickHouseUser + + class Migration(migrations.Migration): + operations = [ + migrations.CreateTable(ClickHouseUser) + ] + ``` + +## Run migrations +Call [django migrate](https://docs.djangoproject.com/en/3.0/ref/django-admin/#django-admin-migrate) + to apply created migration and create table in ClickHouse. + +## Set up and run celery sync process +Set up [celery worker](https://docs.celeryproject.org/en/latest/userguide/workers.html#starting-the-worker) for [CLICKHOUSE_CELERY_QUEUE](configuration.md#clickhouse_celery_queue) and [celerybeat](https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#starting-the-scheduler). + +## Test sync and write analytics queries +1. Read [monitoring section](monitoring.md) in order to set up your monitoring system. +2. Read [query section](queries.md) to understand how to query database. +2. Create some data in source table with django. +3. Check, if it is synced. + +#### Example +```python +import time +from my_app.models import User +from my_app.clickhouse_models import ClickHouseUser + +u = User.objects.create(first_name='Alice', birthday=datetime.date(1987, 1, 1), visits=1) + +# Wait for celery task is executed at list once +time.sleep(6) + +assert ClickHouseUser.objects.filter(id=u.id).count() == 1, "Sync is not working" +``` + +## Congratulations +Tune your integration to achieve better performance if needed: [docs](performance.md). 
diff --git a/docs/performance.md b/docs/performance.md new file mode 100644 index 0000000..1efa1aa --- /dev/null +++ b/docs/performance.md @@ -0,0 +1,46 @@ +# Sync performance +Every real life system may have its own performance problems. +They depend on: +* Your ClickHouse servers' configuration +* Number of ClickHouse instances in your cluster +* Your data formats +* Import speed +* Network +* etc + +I recommend using [monitoring](monitoring.md) in order to understand where the bottleneck is and act accordingly. + +This chapter gives a list of known problems which can slow down your import. + +## ClickHouse tuning +Read this [doc](https://clickhouse.tech/docs/en/introduction/performance/#performance-when-inserting-data) + and tune it both for read and write. + +## ClickHouse cluster +As ClickHouse is a [multimaster database](https://clickhouse.tech/docs/en/introduction/distinctive_features/#data-replication-and-data-integrity-support), + you can import and read from any node when you have a cluster. +In order to read and import to multiple nodes you can use [CHProxy](https://github.com/Vertamedia/chproxy) +or add multiple databases to [routing configuration](routing.md#clickhousemodel-routing-attributes). + +## CollapsingMergeTree engine and previous versions +In order to reduce the amount of stored data in [intermediate storage](storages.md), + this library doesn't store old versions of data on update or delete. + Another point is that getting previous data versions from relational storages is a hard operation. +Engines like `CollapsingMergeTree` get old versions from ClickHouse: +1. Using `version_col` if it is set in engine's parameters. + This is a special field which stores incremental row versions and is filled by the library. + It should be of any unsigned integer type (depending on how many row versions you may have). +2. Using `FINAL` query modification. + This way is much slower, but doesn't require an additional column. 
+ +## Know your data +In common case library user uses python types to form ClickHouse data. +Library is responsible for converting this data into format ClickHouse expects to receive. +This leads to great number of convert operations when you import data in big batches. +In order to reduce this time, you can: +* Set `MyClickHouseModel.sync_formatted_tuples` to True +* Override `MyClickHouseModel.get_insert_batch(, import_objects: Iterable[DjangoModel])` method: + It should get `cls.get_tuple_class()` and yield (it is a [generator](https://wiki.python.org/moin/Generators)) + so it generates tuples of string values, already prepared to insert into ClickHouse. + **Important note**: `ClickHouseModel.get_insert_batch(...)` can perform additional functionality depending on model [engine](models.md#engines). + Be careful. diff --git a/docs/queries.md b/docs/queries.md new file mode 100644 index 0000000..3f90613 --- /dev/null +++ b/docs/queries.md @@ -0,0 +1,66 @@ +# Making queries + +QuerySet system used by this library looks very similar to django, but it is implemented separately. +You can read reasons for this design [here](motivation.md#separate-from-django-database-setting-queryset-and-migration-system). + +## Usage +Library query system extends [infi.clickhouse-orm QuerySet system](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/querysets.md) and supports all it features. +In most cases you have no need to create querysets explicitly - just use `objects` attribute or `objects_in(db)` method of `ClickHouseModel`. +At the same time `django-clickhouse` adds some extra features to `QuerySet` and `AggregateQuerySet`. +They are available if your model inherits `django_clickhouse.clickhouse_models.ClickHouseModel`. + +## Extra features +### Django-like routing system +There's no need to set database object explicitly with `objects_in(...)` method, as original QuerySet expects. 
+Database is determined based on library configuration and [router](routing.md#router) used. + +If you want to set database explicitly you can use any of approaches: +* [infi approach](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/querysets.md#querysets) +* Django like `QuerySet.using(db_alias)` method + +Example: +```python +from django_clickhouse.database import connections +from my_app.clickhouse_models import ClickHouseUser + +# This query will choose database using current router. +# By default django_clickhouse.routers.DefaultRouter is used. +# It gets one random database, from ClickHouseUser.read_db_aliases for read queries +ClickHouseUser.objects.filter(id__in=[1,2,3]).count() + +# These queries do the same thing, using 'secondary' connection from CLICKHOUSE_DATABASES setting +ClickHouseUser.objects_in(connections['secondary']).filter(id__in=[1,2,3]).count() +ClickHouseUser.objects.filter(id__in=[1,2,3]).using('secondary').count() + +# You can get database to use with get_database(for_write: bool = False) method +# Note that it if you have multiple database in model settings, +# DefaultRouter can return any of them each time function is called, function is stateless +ClickHouseUser.objects.get_database(for_write=False) +``` + +### QuerySet create methods +This library adds methods to add objects like django does without direct Database object usage. + +Example: +```python +from datetime import date +from my_app.clickhouse_models import ClickHouseUser + +# This queries will choose database using current router. +# By default django_clickhouse.routers.DefaultRouter is used. +# It gets one random database, from ClickHouseUser.write_db_aliases for write queries +# You can set database explicitly with using(...) or objects_in(...) 
methods +instance = ClickHouseUser.objects.create(id=1, first_name='Alice', visits=1, birthday=date(2003, 6, 1)) +objs = ClickHouseUser.objects.bulk_create([ + ClickHouseUser(id=2, first_name='Bob', visits=2, birthday=date(2001, 5, 1)), + ClickHouseUser(id=3, first_name='Jhon', visits=3, birthday=date(2002, 7, 11)) +], batch_size=10) +``` + +### Getting all objects +`QuerySet.all()` method returns copy of current QuerySet: +```python +from my_app.clickhouse_models import ClickHouseUser + +qs = ClickHouseUser.objects.all() +``` \ No newline at end of file diff --git a/docs/routing.md b/docs/routing.md new file mode 100644 index 0000000..f992f57 --- /dev/null +++ b/docs/routing.md @@ -0,0 +1,62 @@ +# Database routing +One of this libraries goals was to create easy and extendable automatic database routing. + +## Motivation +In original [infi.clickhouse-orm](https://github.com/Infinidat/infi.clickhouse_orm) + you had to explicitly create [Database](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/models_and_databases.md#inserting-to-the-database) objects + and set database to each query with `objects_in(db)` method. + But common projects use a quite little number of database connections. + As a result, it's easier to setup routing once and use it as [django](https://docs.djangoproject.com/en/2.2/topics/db/multi-db/) does. +Unlike traditional relational databases, [ClickHouse](https://clickhouse.yandex/docs/en/) + has per table replication. + This means that: + 1) Each model can have it's own replication scheme + 2) Some migration queries are replicated automatically, others - not. + 3) To make system more extendable we need default routing, per model routing and router class for complex cases. + +## Introduction +All database connections are defined in [CLICKHOUSE_DATABASES](configuration.md#clickhouse_databases) setting. + Each connection has it's alias name to refer with. 
+ If no routing is configured, [CLICKHOUSE_DEFAULT_DB_ALIAS](configuration.md#clickhouse_default_db_alias) is used. + +## Router +Router is a class, defining 3 methods: +* `def db_for_read(self, model: ClickHouseModel, **hints) -> str` + Returns `database alias` to use for given `model` for `SELECT` queries. +* `def db_for_write(self, model: ClickHouseModel, **hints) -> str` + Returns `database alias` to use for given `model` for `INSERT` queries. +* `def allow_migrate(self, db_alias: str, app_label: str, operation: Operation, model: Optional[ClickHouseModel] = None, **hints: dict) -> bool` + Checks if migration `operation` should be applied in django application `app_label` on database `db_alias`. + Optional `model` field can be used to determine migrations on concrete model. + +By default [CLICKHOUSE_DATABASE_ROUTER](configuration.md#clickhouse_database_router) is used. + It gets routing information from model fields, described below. + +## ClickHouseModel routing attributes +Default database router reads routing settings from model attributes. +```python +from django_clickhouse.configuration import config +from django_clickhouse.clickhouse_models import ClickHouseModel + +class MyModel(ClickHouseModel): + # Servers, model is replicated to. + # Router takes random database to read or write from. + read_db_aliases = (config.DEFAULT_DB_ALIAS,) + write_db_aliases = (config.DEFAULT_DB_ALIAS,) + + # Databases to perform replicated migration queries, such as ALTER TABLE. + # Migration is applied to random database from the list. + migrate_replicated_db_aliases = (config.DEFAULT_DB_ALIAS,) + + # Databases to perform non-replicated migrations (CREATE TABLE, DROP TABLE). + # Migration is applied to all databases from the list. 
+ migrate_non_replicated_db_aliases = (config.DEFAULT_DB_ALIAS,) + ``` + +## Settings database in QuerySet +Database can be set in each [QuerySet](queries.md) explicitly by using one of methods: +* With [infi approach](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/querysets.md#querysets): `MyModel.objects_in(db_object).filter(id__in=[1,2,3]).count()` +* With `using()` method: `MyModel.objects.filter(id__in=[1,2,3]).using(db_alias).count()` + +If no explicit database is provided, database connection to use is determined lazily with router's `db_for_read` or `db_for_write` + method, depending on query type. \ No newline at end of file diff --git a/docs/storages.md b/docs/storages.md new file mode 100644 index 0000000..e8e1f1d --- /dev/null +++ b/docs/storages.md @@ -0,0 +1,70 @@ +# Storages +Storage class is a facade, that stores information about operations, which where performed on django models. +It has three main purposes: +* Storage should be fast to insert single records. It forms a batch of data, which is then inserted to ClickHouse. +* Storage guarantees, that no data is lost. + Intermediate data in storage is deleted only after importing batch finishes successfully. + If it fails in some point - starting new import process should import failed data again. +* Keep information about sync process. For instance, last time the model sync has been called. + +In order to determine different models from each other storage uses `import_key`. +By default, it is generated by `ClickHouseModel.get_import_key()` method and is equal to class name. + +Each method of abstract `Storage` class takes `kwargs` parameters, which can be used in concrete storage. + +## Storage methods +* `register_operations(import_key: str, operation: str, *pks: *Any) -> int` + Saves a new operation in source database to storage. This method should be fast. + It is called after source database transaction is committed. + Method returns number of operations registered. 
+ `operation` is one of `insert`, `update` or `delete` + `pks` is an iterable of strings, enough to select needed records from source database. + +* `get_last_sync_time(import_key: str) -> Optional[datetime.datetime]` + Returns the last time a model sync has been called. If no sync has been done, returns None. + +* `set_last_sync_time(import_key: str, dt: datetime.datetime) -> None` + Saves the datetime when a sync process was last called. + +* `register_operations_wrapped(self, import_key: str, operation: str, *pks: *Any) -> int` + A wrapper for register_operations. Its goal is to write metrics and logs. + +* `pre_sync(import_key: str, **kwargs) -> None` + Called before import process starts. It initializes storage for importing a new batch. + +* `operations_count(import_key: str, **kwargs) -> int` + Counts how many operations are waiting for import in storage. + +* `get_operations(import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]` + Returns the next batch of operations to import. `count` parameter gives a number of operations to return. + Operation is a tuple `(operation, primary_key)`, where `operation` is one of insert, update or delete + and `primary_key` is a string enough to select record from source database. + +* `post_sync(import_key: str, **kwargs) -> None` + Called after import process has finished. It cleans storage after importing a batch. + +* `post_batch_removed(import_key: str, batch_size: int) -> None` + This method should be called by `post_sync` method after data is removed from storage. + By default, it marks queue size metric. + +* `post_sync_failed(import_key: str, exception: Exception, **kwargs) -> None:` + Called if any exception has occurred during import process. It cleans storage after unsuccessful import. + Note that if the import process is forcibly killed (by the OOM killer, for instance) this method is not called. + +* `flush() -> None` + *Dangerous*. Drops all data kept by storage. It is used for cleaning up between tests. 
+ + +## Predefined storages +### RedisStorage +This storage uses [Redis database](https://redis.io/) as intermediate storage. +To communicate with Redis it uses [redis-py](https://redis-py.readthedocs.io/en/latest/) library. +It is not required, but should be installed to use RedisStorage. +In order to use RedisStorage you must also fill [CLICKHOUSE_REDIS_CONFIG](configuration.md#clickhouse_redis_config) parameter. + +Stored operation contains: +* Django database alias where original record can be found. +* Record primary key +* Operation performed (insert, update, delete) + +This storage does not allow multi-threaded sync. diff --git a/docs/synchronization.md b/docs/synchronization.md new file mode 100644 index 0000000..d147232 --- /dev/null +++ b/docs/synchronization.md @@ -0,0 +1,105 @@ +# Synchronization + +## Design motivation +Read [here](motivation.md#sync-over-intermediate-storage). + + +## Algorithm + +1. [Celery beat](https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html) schedules `django_clickhouse.tasks.clickhouse_auto_sync` task every second or near. +2. [Celery workers](https://docs.celeryproject.org/en/latest/userguide/workers.html) execute `clickhouse_auto_sync`. + It searches for `ClickHouseModel` subclasses which need sync (if `Model.need_sync()` method returns `True`). +2. `django_clickhouse.tasks.sync_clickhouse_model` task is scheduled for each `ClickHouseModel` which needs sync. +3. `sync_clickhouse_model` saves sync start time in [storage](storages.md) and calls `ClickHouseModel.sync_batch_from_storage()` method. +4. `ClickHouseModel.sync_batch_from_storage()`: + * Gets [storage](storages.md) model works with using `ClickHouseModel.get_storage()` method + * Calls `Storage.pre_sync(import_key)` for model [storage](storages.md). + This may be used to prevent parallel execution with locks or some other operations. + * Gets a list of operations to sync from [storage](storages.md). 
+ * Fetches objects from relational database calling `ClickHouseModel.get_sync_objects(operations)` method. + * Forms a batch of tuples to insert into ClickHouse using `ClickHouseModel.get_insert_batch(import_objects)` method. + * Inserts batch of tuples into ClickHouse using `ClickHouseModel.insert_batch(batch)` method. + * Calls `Storage.post_sync(import_key)` method to clean up storage after syncing batch. + This method also removes synced operations from storage. + * If some exception occurred during execution, `Storage.post_sync_failed(import_key)` method is called. + Note that the process can be killed without an exception, for instance by the OOM killer. + In that case this method will not be called. + + +## Configuration +Sync configuration can be set globally using django settings.py parameters or redeclared for each `ClickHouseModel` class. +`ClickHouseModel` configuration takes priority over settings configuration. + +### Settings configuration +* [CLICKHOUSE_CELERY_QUEUE](configuration.md#clickhouse_celery_queue) +Defaults to: `'celery'` +A name of a queue, used by celery to plan library sync tasks. + +* [CLICKHOUSE_SYNC_STORAGE](configuration.md#clickhouse_sync_storage) +Defaults to: `'django_clickhouse.storages.RedisStorage'` +An [intermediate storage](storages.md) class to use. Can be a string or class. + +* [CLICKHOUSE_SYNC_BATCH_SIZE](configuration.md#clickhouse_sync_storage) +Defaults to: `10000` +Maximum number of operations, fetched by sync process from [intermediate storage](storages.md) per sync round. + +* [CLICKHOUSE_SYNC_DELAY](configuration.md#clickhouse_sync_storage) +Defaults to: `5` +A delay in seconds between two sync rounds start. + +### ClickHouseModel configuration +Each `ClickHouseModel` subclass can define sync arguments and methods: +* `django_model: django.db.models.Model` +Required. +Django model this ClickHouseModel class is synchronized with. 
+ +* `django_model_serializer: django.db.models.Model` +Defaults to: `django_clickhouse.serializers.Django2ClickHouseModelSerializer` +[Serializer class](models.md#serializers) to convert DjangoModel to ClickHouseModel. + +* `sync_enabled: bool` +Defaults to: `False`. +Is sync for this model enabled? + +* `sync_batch_size: int` +Defaults to: [CLICKHOUSE_SYNC_BATCH_SIZE](configuration.md#clickhouse_sync_storage) +Maximum number of operations, fetched by sync process from [storage](storages.md) per sync round. + +* `sync_delay: float` +Defaults to: [CLICKHOUSE_SYNC_DELAY](configuration.md#clickhouse_sync_storage) +A delay in seconds between two sync rounds start. + +* `sync_storage: Union[str, Storage]` +Defaults to: [CLICKHOUSE_SYNC_STORAGE](configuration.md#clickhouse_sync_storage) +An [intermediate storage](storages.md) class to use. Can be a string or class. + +Example: +```python +from django_clickhouse.clickhouse_models import ClickHouseModel +from django_clickhouse.engines import ReplacingMergeTree +from infi.clickhouse_orm import fields +from my_app.models import User + +class ClickHouseUser(ClickHouseModel): + django_model = User + sync_enabled = True + sync_delay = 5 + sync_batch_size = 1000 + + id = fields.UInt32Field() + first_name = fields.StringField() + birthday = fields.DateField() + visits = fields.UInt32Field(default=0) + + engine = ReplacingMergeTree('birthday', ('birthday',)) +``` + + +## Fail resistance +Fail resistance is based on several points: +1. [Storage](storages.md) should not loose data in any case. It's not this library goal to keep it stable. +2. Data is removed from [storage](storages.md) only if import succeeds. Otherwise import attempt is repeated. +3. It's recommended to use ReplacingMergeTree or CollapsingMergeTree [engines](models.md#engines) + instead of simple MergeTree, so it removes duplicates if batch is imported twice. +4. Each `ClickHouseModel` is synced in separate process. 
+ If one model fails, it should not affect other models. diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..e4635ac --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,11 @@ +Django (>=1.7) +pytz +six +typing +psycopg2 +infi.clickhouse-orm +celery +statsd +django-pg-returning +django-pg-bulk-update +redis \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 1283c62..208c860 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,5 +5,4 @@ typing psycopg2 infi.clickhouse-orm celery -statsd -django-pg-returning \ No newline at end of file +statsd \ No newline at end of file diff --git a/setup.py b/setup.py index c20a379..102de34 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ with open('requirements.txt') as f: setup( name='django-clickhouse', - version='0.0.1', + version='1.0.0', packages=['django_clickhouse'], package_dir={'': 'src'}, url='https://github.com/carrotquest/django-clickhouse', diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py index 1b34c8d..343df18 100644 --- a/src/django_clickhouse/clickhouse_models.py +++ b/src/django_clickhouse/clickhouse_models.py @@ -48,9 +48,17 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): django_model = None django_model_serializer = Django2ClickHouseModelSerializer + # Servers, model is replicated to. + # Router takes random database to read or write from. read_db_aliases = (config.DEFAULT_DB_ALIAS,) write_db_aliases = (config.DEFAULT_DB_ALIAS,) + + # Databases to perform replicated migration queries, such as ALTER TABLE. + # Migration is applied to random database from the list. migrate_replicated_db_aliases = (config.DEFAULT_DB_ALIAS,) + + # Databases to perform non-replicated migrations (CREATE TABLE, DROP TABLE). + # Migration is applied to all databases from the list. 
migrate_non_replicated_db_aliases = (config.DEFAULT_DB_ALIAS,) sync_enabled = False @@ -86,12 +94,11 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): return namedtuple("%sTuple" % cls.__name__, field_names, defaults=default_values) @classmethod - def objects_in(cls, database): # type: (Database) -> QuerySet + def objects_in(cls, database: Database)-> QuerySet: return QuerySet(cls, database) @classmethod - def get_database_alias(cls, for_write=False): - # type: (bool) -> str + def get_database_alias(cls, for_write: bool = False) -> str: """ Gets database alias for read or write purposes :param for_write: Boolean flag if database is neede for read or for write @@ -104,8 +111,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): return db_router.db_for_read(cls) @classmethod - def get_database(cls, for_write=False): - # type: (bool) -> Database + def get_database(cls, for_write: bool = False) -> Database: """ Gets database alias for read or write purposes :param for_write: Boolean flag if database is neede for read or for write @@ -115,8 +121,8 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): return connections[db_alias] @classmethod - def get_django_model_serializer(cls, writable=False, defaults=None): - # type: (bool, Optional[dict]) -> Django2ClickHouseModelSerializer + def get_django_model_serializer(cls, writable: bool= False, defaults: Optional[dict] = None + ) -> Django2ClickHouseModelSerializer: serializer_cls = lazy_class_import(cls.django_model_serializer) return serializer_cls(cls, writable=writable, defaults=defaults) @@ -163,7 +169,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): return True @classmethod - def get_sync_query_set(cls, using, pk_set): # type: (str, Set[Any]) -> DjangoQuerySet + def get_sync_query_set(cls, using: str, pk_set: Set[Any]) -> DjangoQuerySet: """ Forms django queryset to fetch for sync :param using: Database to fetch from @@ -173,7 
+179,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): return cls.django_model.objects.filter(pk__in=pk_set).using(using) @classmethod - def get_sync_objects(cls, operations): # type: (List[Tuple[str, str]]) -> List[DjangoModel] + def get_sync_objects(cls, operations: List[Tuple[str, str]]) -> List[DjangoModel]: """ Returns objects from main database to sync :param operations: A list of operations to perform @@ -195,7 +201,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): return list(chain(*objs)) @classmethod - def get_insert_batch(cls, import_objects): # type: (Iterable[DjangoModel]) -> List[ClickHouseModel] + def get_insert_batch(cls, import_objects: Iterable[DjangoModel]) -> List['ClickHouseModel']: """ Formats django model objects to batch of ClickHouse objects :param import_objects: DjangoModel objects to import @@ -259,7 +265,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): raise ex @classmethod - def need_sync(cls): # type: () -> bool + def need_sync(cls) -> bool: """ Checks if this model needs synchronization: sync is enabled and delay has passed :return: Boolean diff --git a/src/django_clickhouse/compatibility.py b/src/django_clickhouse/compatibility.py index 940d398..2354d7a 100644 --- a/src/django_clickhouse/compatibility.py +++ b/src/django_clickhouse/compatibility.py @@ -1,5 +1,9 @@ import sys from collections import namedtuple as basenamedtuple +from typing import Any, Set + +from django.db import transaction, connections +from django.db.models import QuerySet def namedtuple(*args, **kwargs): @@ -16,3 +20,36 @@ def namedtuple(*args, **kwargs): return TupleClass else: return basenamedtuple(*args, **kwargs) + + +def django_pg_returning_available(using: str) -> bool: + """ + Checks if django-pg-returning library is installed and can be used with given databse + :return: Boolean + """ + try: + import django_pg_returning + return connections[using].vendor == 'postgresql' + 
except ImportError: + return False + + +def update_returning_pk(qs: QuerySet, updates: dict) -> Set[Any]: + """ + Updates QuerySet items returning primary key values. + This method should not depend on database engine, though can have optimization performances for some engines. + :param qs: QuerySet to update + :param updates: Update items as passed to QuerySet.update(**updates) method + :return: A set of primary keys + """ + qs._for_write = True + if django_pg_returning_available(qs.db) and hasattr(qs, 'update_returning'): + pk_name = qs.model._meta.pk.name + qs = qs.only(pk_name).update_returning(**updates) + pks = set(qs.values_list(pk_name, flat=True)) + else: + with transaction.atomic(using=qs.db): + pks = set(qs.select_for_update().values_list('pk', flat=True)) + QuerySet.update(qs, **updates) + + return pks diff --git a/src/django_clickhouse/configuration.py b/src/django_clickhouse/configuration.py index 3467eb3..2c44872 100644 --- a/src/django_clickhouse/configuration.py +++ b/src/django_clickhouse/configuration.py @@ -28,7 +28,7 @@ DEFAULTS = { class Config: - def __getattr__(self, item): # type: (str) -> Any + def __getattr__(self, item: str) -> Any: if item not in DEFAULTS: raise AttributeError('Unknown config parameter `%s`' % item) diff --git a/src/django_clickhouse/database.py b/src/django_clickhouse/database.py index 6abc516..516d533 100644 --- a/src/django_clickhouse/database.py +++ b/src/django_clickhouse/database.py @@ -35,8 +35,8 @@ class Database(InfiDatabase): def _get_applied_migrations(self, migrations_package_name): raise NotImplementedError("This method is not supported by django_clickhouse.") - def select_tuples(self, query, model_class, settings=None): - # type: (str, Type['ClickHouseModel'], Optional[dict], Optional[dict]) -> Generator[tuple] + def select_tuples(self, query: str, model_class: Type['ClickHouseModel'], settings: Optional[dict] = None + ) -> Iterable[tuple]: """ This method selects model_class namedtuples, instead of class 
instances. Less memory consumption, greater speed @@ -67,11 +67,11 @@ class Database(InfiDatabase): yield item - def insert_tuples(self, model_class, model_tuples, batch_size=None, formatted=False): - # type: (Type['ClickHouseModel'], Iterable[tuple], Optional[int], bool) -> None + def insert_tuples(self, model_class: Type['ClickHouseModel'], model_tuples: Iterable[tuple], + batch_size: Optional[int] = None, formatted: bool = False) -> None: """ Inserts model_class namedtuples - :param model_class: Clickhouse model, namedtuples are made from + :param model_class: ClickHouse model, namedtuples are made from :param model_tuples: An iterable of tuples to insert :param batch_size: Size of batch :param formatted: If flag is set, tuples are expected to be ready to insert without calling field.to_db_string diff --git a/src/django_clickhouse/engines.py b/src/django_clickhouse/engines.py index 46cf26b..55e1647 100644 --- a/src/django_clickhouse/engines.py +++ b/src/django_clickhouse/engines.py @@ -2,7 +2,7 @@ This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse """ import datetime -from typing import List, Type, Union, Iterable, Generator +from typing import List, Type, Union, Iterable, Generator, Optional from django.db.models import Model as DjangoModel from infi.clickhouse_orm import engines as infi_engines @@ -14,8 +14,7 @@ from .utils import format_datetime class InsertOnlyEngineMixin: - def get_insert_batch(self, model_cls, objects): - # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple] + def get_insert_batch(self, model_cls: Type['ClickHouseModel'], objects: List[DjangoModel]) -> Iterable[tuple]: """ Gets a list of model_cls instances to insert into database :param model_cls: ClickHouseModel subclass to import @@ -69,8 +68,8 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre max_date=max_date, object_pks=','.join(object_pks)) return connections[db_alias].select_tuples(query, 
model_cls) - def get_final_versions(self, model_cls, objects, date_col=None): - # type: (Type['ClickHouseModel'], Iterable[DjangoModel], str) -> Generator[tuple] + def get_final_versions(self, model_cls: Type['ClickHouseModel'], objects: Iterable[DjangoModel], + date_col: Optional[str] = None) -> Iterable[tuple]: """ Get objects, that are currently stored in ClickHouse. Depending on the partition key this can be different for different models. @@ -82,7 +81,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre :return: A generator of named tuples, representing previous state """ - def _dt_to_str(dt): # type: (Union[datetime.date, datetime.datetime]) -> str + def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str: if isinstance(dt, datetime.datetime): return format_datetime(dt, 0, db_alias=db_alias) elif isinstance(dt, datetime.date): @@ -123,8 +122,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre else: return self._get_final_versions_by_final(*params) - def get_insert_batch(self, model_cls, objects): - # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple] + def get_insert_batch(self, model_cls: Type['ClickHouseModel'], objects: List[DjangoModel]) -> Iterable[tuple]: """ Gets a list of model_cls instances to insert into database :param model_cls: ClickHouseModel subclass to import diff --git a/src/django_clickhouse/migrations.py b/src/django_clickhouse/migrations.py index 5227baa..62ccd18 100644 --- a/src/django_clickhouse/migrations.py +++ b/src/django_clickhouse/migrations.py @@ -23,7 +23,7 @@ class Migration: """ operations = [] - def apply(self, db_alias, database=None): # type: (str, Optional[Database]) -> None + def apply(self, db_alias: str, database: Optional[Database] = None) -> None: """ Applies migration to given database :param db_alias: Database alias to apply migration to @@ -37,12 +37,11 @@ class Migration: model_class = getattr(op, 'model_class', None) 
hints = getattr(op, 'hints', {}) - if db_router.allow_migrate(db_alias, self.__module__, op, model=model_class, **hints): + if db_router.allow_migrate(db_alias, self.__module__, op, model_class, **hints): op.apply(database) -def migrate_app(app_label, db_alias, up_to=9999, database=None): - # type: (str, str, int, Optional[Database]) -> None +def migrate_app(app_label: str, db_alias: str, up_to: int = 9999, database: Optional[Database] = None) -> None: """ Migrates given django app :param app_label: App label to migrate @@ -110,7 +109,7 @@ class MigrationHistory(ClickHouseModel): engine = MergeTree('applied', ('db_alias', 'package_name', 'module_name')) @classmethod - def set_migration_applied(cls, db_alias, migrations_package, name): # type: (str, str, str) -> None + def set_migration_applied(cls, db_alias: str, migrations_package: str, name: str) -> None: """ Sets migration apply status :param db_alias: Database alias migration is applied to @@ -126,7 +125,7 @@ class MigrationHistory(ClickHouseModel): applied=datetime.date.today()) @classmethod - def get_applied_migrations(cls, db_alias, migrations_package): # type: (str, str) -> Set[str] + def get_applied_migrations(cls, db_alias: str, migrations_package: str) -> Set[str]: """ Returns applied migrations names :param db_alias: Database alias, to check diff --git a/src/django_clickhouse/models.py b/src/django_clickhouse/models.py index 2d47760..7342719 100644 --- a/src/django_clickhouse/models.py +++ b/src/django_clickhouse/models.py @@ -7,18 +7,17 @@ from typing import Optional, Any, Type, Set import six from django.db import transaction -from django.db.models import Manager as DjangoManager +from django.db.models import QuerySet as DjangoQuerySet, Model as DjangoModel, Manager as DjangoManager from django.db.models.manager import BaseManager from django.db.models.signals import post_save, post_delete from django.dispatch import receiver -from django.db.models import QuerySet as DjangoQuerySet, Model as 
DjangoModel from statsd.defaults.django import statsd +from .compatibility import update_returning_pk from .configuration import config from .storages import Storage from .utils import lazy_class_import - try: from django_pg_returning.manager import UpdateReturningMixin except ImportError: @@ -34,9 +33,9 @@ except ImportError: class ClickHouseSyncRegisterMixin: - def _register_ops(self, operation, result): + def _register_ops(self, operation, result, as_int: bool = False): pk_name = self.model._meta.pk.name - pk_list = [getattr(item, pk_name) for item in result] + pk_list = [getattr(item, pk_name) if isinstance(item, DjangoModel) else item for item in result] self.model.register_clickhouse_operations(operation, *pk_list, using=self.db) @@ -72,35 +71,51 @@ class ClickHouseSyncBulkUpdateQuerySetMixin(ClickHouseSyncRegisterMixin, BulkUpd return returning - def bulk_update(self, *args, **kwargs): + def _decorate_method(self, name: str, operation: str, args, kwargs): + if not hasattr(super(), name): + raise AttributeError("QuerySet has no attribute %s. Is django-pg-bulk-update library installed?" 
% name) + + func = getattr(super(), name) original_returning = kwargs.pop('returning', None) kwargs['returning'] = self._update_returning_param(original_returning) - result = super().bulk_update(*args, **kwargs) - self._register_ops('update', result) + result = func(*args, **kwargs) + self._register_ops(operation, result) return result.count() if original_returning is None else result - def bulk_update_or_create(self, *args, **kwargs): - original_returning = kwargs.pop('returning', None) - kwargs['returning'] = self._update_returning_param(original_returning) - result = super().bulk_update_or_create(*args, **kwargs) - self._register_ops('update', result) - return result.count() if original_returning is None else result + def pg_bulk_update(self, *args, **kwargs): + return self._decorate_method('pg_bulk_update', 'update', args, kwargs) + + def pg_bulk_update_or_create(self, *args, **kwargs): + return self._decorate_method('pg_bulk_update_or_create', 'update', args, kwargs) + + def pg_bulk_create(self, *args, **kwargs): + return self._decorate_method('pg_bulk_create', 'insert', args, kwargs) class ClickHouseSyncQuerySetMixin(ClickHouseSyncRegisterMixin): def update(self, **kwargs): - # BUG I use update_returning method here. But it is not suitable for databases other then PostgreSQL - # and requires django-pg-update-returning installed - pk_name = self.model._meta.pk.name - res = self.only(pk_name).update_returning(**kwargs) - self._register_ops('update', res) - return len(res) + pks = update_returning_pk(self, kwargs) + self._register_ops('update', pks) + return len(pks) def bulk_create(self, objs, batch_size=None): objs = super().bulk_create(objs, batch_size=batch_size) self._register_ops('insert', objs) return objs + def bulk_update(self, objs, *args, **kwargs): + objs = list(objs) + + # No need to register anything, if there are no objects. 
+ # If objects are not models, django-pg-bulk-update method is called and pg_bulk_update will register items + if len(objs) == 0 or not isinstance(objs[0], DjangoModel): + return super().bulk_update(objs, *args, **kwargs) + + # native django bulk_update requires each object to have a primary key + res = super().bulk_update(objs, *args, **kwargs) + self._register_ops('update', objs) + return res + # I add library dependant mixins to base classes only if libraries are installed qs_bases = [ClickHouseSyncQuerySetMixin] @@ -131,7 +146,7 @@ class ClickHouseSyncModel(DjangoModel): abstract = True @classmethod - def get_clickhouse_storage(cls): # type: () -> Storage + def get_clickhouse_storage(cls) -> Storage: """ Returns Storage instance to save clickhouse sync data to :return: @@ -140,8 +155,7 @@ class ClickHouseSyncModel(DjangoModel): return storage_cls() @classmethod - def register_clickhouse_sync_model(cls, model_cls): - # type: (Type['django_clickhouse.clickhouse_models.ClickHouseModel']) -> None + def register_clickhouse_sync_model(cls, model_cls: Type['ClickHouseModel']) -> None: """ Registers ClickHouse model to listen to this model updates :param model_cls: Model class to register @@ -153,7 +167,7 @@ class ClickHouseSyncModel(DjangoModel): cls._clickhouse_sync_models.add(model_cls) @classmethod - def get_clickhouse_sync_models(cls): # type: () -> Set['django_clickhouse.clickhouse_models.ClickHouseModel'] + def get_clickhouse_sync_models(cls) -> Set['ClickHouseModel']: """ Returns all clickhouse models, listening to this class :return: A set of model classes to sync @@ -161,8 +175,7 @@ class ClickHouseSyncModel(DjangoModel): return getattr(cls, '_clickhouse_sync_models', set()) @classmethod - def register_clickhouse_operations(cls, operation, *model_pks, using=None): - # type: (str, *Any, Optional[str]) -> None + def register_clickhouse_operations(cls, operation: str, *model_pks: Any, using: Optional[str] = None) -> None: """ Registers model operation in storage 
:param operation: Operation type - one of [insert, update, delete) @@ -170,7 +183,7 @@ class ClickHouseSyncModel(DjangoModel): :param using: Database alias registered instances are from :return: None """ - model_pks = ['%s.%d' % (using or config.DEFAULT_DB_ALIAS, pk) for pk in model_pks] + model_pks = ['%s.%s' % (using or config.DEFAULT_DB_ALIAS, pk) for pk in model_pks] def _on_commit(): for model_cls in cls.get_clickhouse_sync_models(): @@ -181,16 +194,16 @@ class ClickHouseSyncModel(DjangoModel): storage = cls.get_clickhouse_storage() transaction.on_commit(_on_commit, using=using) - def post_save(self, created, using=None): # type: (bool, Optional[str]) -> None + def post_save(self, created: bool, using: Optional[str] = None) -> None: self.register_clickhouse_operations('insert' if created else 'update', self.pk, using=using) - def post_delete(self, using=None): # type: (Optional[str]) -> None + def post_delete(self, using: Optional[str] = None) -> None: self.register_clickhouse_operations('delete', self.pk, using=using) @receiver(post_save) def post_save(sender, instance, **kwargs): - statsd.incr('clickhouse.sync.post_save'.format('post_save'), 1) + statsd.incr('%s.sync.post_save' % config.STATSD_PREFIX, 1) if issubclass(sender, ClickHouseSyncModel): instance.post_save(kwargs.get('created', False), using=kwargs.get('using')) diff --git a/src/django_clickhouse/query.py b/src/django_clickhouse/query.py index 9cccc40..c31ba91 100644 --- a/src/django_clickhouse/query.py +++ b/src/django_clickhouse/query.py @@ -1,4 +1,4 @@ -from typing import Optional, Iterable, List +from typing import Optional, Iterable, List, Type from copy import copy from infi.clickhouse_orm.database import Database @@ -13,22 +13,22 @@ class QuerySet(InfiQuerySet): Basic QuerySet to use """ - def __init__(self, model_cls, database=None): # type: (Type[InfiModel], Optional[Database]) -> None + def __init__(self, model_cls: Type[InfiModel], database: Optional[Database] = None) -> None: 
super(QuerySet, self).__init__(model_cls, database) self._db_alias = None @property - def _database(self): # type: () -> Database + def _database(self) -> Database: # HACK for correct work of all infi.clickhouse-orm methods # There are no write QuerySet methods now, so I use for_write=False by default return self.get_database(for_write=False) @_database.setter - def _database(self, database): # type: (Database) -> None + def _database(self, database: Database) -> None: # HACK for correct work of all infi.clickhouse-orm methods self._db = database - def get_database(self, for_write=False): # type: (bool) -> Database + def get_database(self, for_write: bool = False) -> Database: """ Gets database to execute query on. Looks for constructor or using() method. If nothing was set tries to get database from model class using router. @@ -43,7 +43,7 @@ class QuerySet(InfiQuerySet): return self._db - def using(self, db_alias): # type: (str) -> QuerySet + def using(self, db_alias: str) -> 'QuerySet': """ Sets database alias to use for this query :param db_alias: Database alias name from CLICKHOUSE_DATABASES config option @@ -54,7 +54,7 @@ class QuerySet(InfiQuerySet): qs._db = None # Previous database should be forgotten return qs - def all(self): # type: () -> QuerySet + def all(self) -> 'QuerySet': """ Returns all items of queryset :return: QuerySet @@ -70,7 +70,7 @@ class QuerySet(InfiQuerySet): self.get_database(for_write=True).insert([instance]) return instance - def bulk_create(self, model_instances, batch_size=1000): # type: (Iterable[InfiModel], int) -> List[InfiModel] + def bulk_create(self, model_instances: Iterable[InfiModel], batch_size: int = 1000) -> List[InfiModel]: self.get_database(for_write=True).insert(model_instances=model_instances, batch_size=batch_size) return list(model_instances) diff --git a/src/django_clickhouse/routers.py b/src/django_clickhouse/routers.py index f56de90..9fcfe54 100644 --- a/src/django_clickhouse/routers.py +++ 
b/src/django_clickhouse/routers.py @@ -1,7 +1,7 @@ """ This file defines router to find appropriate database """ -from typing import Optional +from typing import Type import random import six @@ -13,8 +13,7 @@ from .utils import lazy_class_import class DefaultRouter: - def db_for_read(self, model, **hints): - # type: (ClickHouseModel, **dict) -> str + def db_for_read(self, model: Type[ClickHouseModel], **hints) -> str: """ Gets database to read from for model :param model: Model to decide for @@ -23,8 +22,7 @@ class DefaultRouter: """ return random.choice(model.read_db_aliases) - def db_for_write(self, model, **hints): - # type: (ClickHouseModel, **dict) -> str + def db_for_write(self, model: Type[ClickHouseModel], **hints) -> str: """ Gets database to write to for model :param model: Model to decide for @@ -33,8 +31,8 @@ class DefaultRouter: """ return random.choice(model.write_db_aliases) - def allow_migrate(self, db_alias, app_label, operation, model=None, **hints): - # type: (str, str, Operation, Optional[ClickHouseModel], **dict) -> bool + def allow_migrate(self, db_alias: str, app_label: str, operation: Operation, + model=None, **hints) -> bool: """ Checks if migration can be applied to given database :param db_alias: Database alias to check diff --git a/src/django_clickhouse/serializers.py b/src/django_clickhouse/serializers.py index bfb2a76..4b3907e 100644 --- a/src/django_clickhouse/serializers.py +++ b/src/django_clickhouse/serializers.py @@ -1,4 +1,4 @@ -from typing import NamedTuple +from typing import NamedTuple, Optional, Iterable, Type import pytz from django.db.models import Model as DjangoModel @@ -7,7 +7,19 @@ from .utils import model_to_dict class Django2ClickHouseModelSerializer: - def __init__(self, model_cls, fields=None, exclude_fields=None, writable=False, defaults=None): + def __init__(self, model_cls: Type['ClickHouseModel'], fields: Optional[Iterable[str]] = None, + exclude_fields: Optional[Iterable[str]] = None, writable: bool = False, + 
defaults: Optional[dict] = None) -> None: + """ + Initializes serializer + :param model_cls: ClickHouseModel subclass to serialize to + :param fields: Optional. A list of fields to add into result tuple + :param exclude_fields: Fields to exclude from result tuple + :param writable: If fields parameter is not set directly, + this flag determines if only writable or all fields should be taken from model_cls + :param defaults: A dictionary of field: value which are taken as default values for model_cls instances + :return: None + """ self._model_cls = model_cls if fields is not None: self.serialize_fields = fields @@ -18,7 +30,7 @@ class Django2ClickHouseModelSerializer: self._result_class = self._model_cls.get_tuple_class(defaults=defaults) self._fields = self._model_cls.fields(writable=False) - def _get_serialize_kwargs(self, obj): + def _get_serialize_kwargs(self, obj: DjangoModel) -> dict: data = model_to_dict(obj, fields=self.serialize_fields, exclude_fields=self.exclude_serialize_fields) # Remove None values, they should be initialized as defaults @@ -29,5 +41,5 @@ class Django2ClickHouseModelSerializer: return result - def serialize(self, obj): # type: (DjangoModel) -> NamedTuple + def serialize(self, obj: DjangoModel) -> NamedTuple: return self._result_class(**self._get_serialize_kwargs(obj)) diff --git a/src/django_clickhouse/storages.py b/src/django_clickhouse/storages.py index 84720a4..dcdfceb 100644 --- a/src/django_clickhouse/storages.py +++ b/src/django_clickhouse/storages.py @@ -39,7 +39,7 @@ class Storage: But ClickHouse is idempotent to duplicate inserts. So we can insert one batch twice correctly.
""" - def pre_sync(self, import_key, **kwargs): # type: (str, **dict) -> None + def pre_sync(self, import_key: str, **kwargs) -> None: """ This method is called before import process starts :param import_key: A key, returned by ClickHouseModel.get_import_key() method @@ -48,7 +48,7 @@ class Storage: """ pass - def post_sync(self, import_key, **kwargs): # type: (str, **dict) -> None + def post_sync(self, import_key: str, **kwargs) -> None: """ This method is called after import process has finished. :param import_key: A key, returned by ClickHouseModel.get_import_key() method @@ -57,7 +57,7 @@ class Storage: """ pass - def post_sync_failed(self, import_key, **kwargs): # type: (str, **dict) -> None + def post_sync_failed(self, import_key: str, **kwargs) -> None: """ This method is called after import process has finished with exception. :param import_key: A key, returned by ClickHouseModel.get_import_key() method @@ -66,7 +66,7 @@ class Storage: """ pass - def post_batch_removed(self, import_key, batch_size): # type: (str, int) -> None + def post_batch_removed(self, import_key: str, batch_size: int) -> None: """ This method marks that batch has been removed in statsd :param import_key: A key, returned by ClickHouseModel.get_import_key() method @@ -76,8 +76,7 @@ class Storage: key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key) statsd.gauge(key, self.operations_count(import_key)) - def operations_count(self, import_key, **kwargs): - # type: (str, **dict) -> int + def operations_count(self, import_key: str, **kwargs) -> int: """ Returns sync queue size :param import_key: A key, returned by ClickHouseModel.get_import_key() method @@ -86,8 +85,7 @@ class Storage: """ raise NotImplemented() - def get_operations(self, import_key, count, **kwargs): - # type: (str, int, **dict) -> List[Tuple[str, str]] + def get_operations(self, import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]: """ Must return a list of operations on the model. 
Method should be error safe - if something goes wrong, import data should not be lost. @@ -98,7 +96,7 @@ class Storage: """ raise NotImplemented() - def register_operations(self, import_key, operation, *pks): # type: (str, str, *Any) -> int + def register_operations(self, import_key: str, operation: str, *pks: Any) -> int: """ Registers new incoming operation :param import_key: A key, returned by ClickHouseModel.get_import_key() method @@ -108,8 +106,7 @@ class Storage: """ raise NotImplementedError() - def register_operations_wrapped(self, import_key, operation, *pks): - # type: (str, str, *Any) -> int + def register_operations_wrapped(self, import_key: str, operation: str, *pks: Any) -> int: """ This is a wrapper for register_operation method, checking main parameters. This method should be called from inner functions. @@ -140,14 +137,14 @@ class Storage: """ raise NotImplemented() - def get_last_sync_time(self, import_key): # type: (str) -> Optional[datetime.datetime] + def get_last_sync_time(self, import_key: str) -> Optional[datetime.datetime]: """ Gets the last time, sync has been executed :return: datetime.datetime if last sync has been. Otherwise - None. 
""" raise NotImplemented() - def set_last_sync_time(self, import_key, dt): # type: (str, datetime.datetime) -> None + def set_last_sync_time(self, import_key: str, dt: datetime.datetime) -> None: """ Sets successful sync time :return: None diff --git a/src/django_clickhouse/tasks.py b/src/django_clickhouse/tasks.py index 562fbdc..b23baed 100644 --- a/src/django_clickhouse/tasks.py +++ b/src/django_clickhouse/tasks.py @@ -11,14 +11,14 @@ from .utils import get_subclasses @shared_task(queue=config.CELERY_QUEUE) -def sync_clickhouse_model(cls): # type: (ClickHouseModel) -> None +def sync_clickhouse_model(model_cls) -> None: """ Syncs one batch of given ClickHouseModel - :param cls: ClickHouseModel subclass + :param model_cls: ClickHouseModel subclass :return: None """ - cls.get_storage().set_last_sync_time(cls.get_import_key(), datetime.datetime.now()) - cls.sync_batch_from_storage() + model_cls.get_storage().set_last_sync_time(model_cls.get_import_key(), datetime.datetime.now()) + model_cls.sync_batch_from_storage() @shared_task(queue=config.CELERY_QUEUE) diff --git a/src/django_clickhouse/utils.py b/src/django_clickhouse/utils.py index a279d3a..1ab1e90 100644 --- a/src/django_clickhouse/utils.py +++ b/src/django_clickhouse/utils.py @@ -18,7 +18,7 @@ from .database import connections T = TypeVar('T') -def get_tz_offset(db_alias=None): # type: (Optional[str]) -> int +def get_tz_offset(db_alias: Optional[str] = None) -> int: """ Returns ClickHouse server timezone offset in minutes :param db_alias: The database alias used @@ -28,8 +28,8 @@ def get_tz_offset(db_alias=None): # type: (Optional[str]) -> int return int(db.server_timezone.utcoffset(datetime.datetime.utcnow()).total_seconds() / 60) -def format_datetime(dt, timezone_offset=0, day_end=False, db_alias=None): - # type: (Union[datetime.date, datetime.datetime], int, bool, Optional[str]) -> str +def format_datetime(dt: Union[datetime.date, datetime.datetime], timezone_offset: int = 0, day_end: bool = False, + 
db_alias: Optional[str] = None) -> str: """ Formats datetime and date objects to format that can be used in WHERE conditions of query :param dt: datetime.datetime or datetime.date object @@ -58,9 +58,9 @@ def format_datetime(dt, timezone_offset=0, day_end=False, db_alias=None): return server_dt.strftime("%Y-%m-%d %H:%M:%S") -def module_exists(module_name): # type: (str) -> bool +def module_exists(module_name: str) -> bool: """ - Checks if moudle exists + Checks if module exists :param module_name: Dot-separated module name :return: Boolean """ @@ -69,7 +69,7 @@ def module_exists(module_name): # type: (str) -> bool return spam_spec is not None -def lazy_class_import(obj): # type: (Union[str, Any]) -> Any +def lazy_class_import(obj: Union[str, Any]) -> Any: """ If string is given, imports object by given module path. Otherwise returns the object @@ -88,7 +88,7 @@ def lazy_class_import(obj): # type: (Union[str, Any]) -> Any return obj -def get_subclasses(cls, recursive=False): # type: (T, bool) -> Set[T] +def get_subclasses(cls: T, recursive: bool = False) -> Set[T]: """ Gets all subclasses of given class Attention!!! 
Classes would be found only if they were imported before using this function @@ -105,8 +105,8 @@ def get_subclasses(cls, recursive=False): # type: (T, bool) -> Set[T] return subclasses -def model_to_dict(instance, fields=None, exclude_fields=None): - # type: (DjangoModel, Optional[Iterable[str]], Optional[Iterable[str]]) -> Dict[str, Any] +def model_to_dict(instance: DjangoModel, fields: Optional[Iterable[str]] = None, + exclude_fields: Optional[Iterable[str]] = None) -> Dict[str, Any]: """ Standard model_to_dict ignores some fields if they have invalid naming :param instance: Object to convert to dictionary diff --git a/tests/models.py b/tests/models.py index a0de1ec..e915252 100644 --- a/tests/models.py +++ b/tests/models.py @@ -2,10 +2,15 @@ This file contains sample models to use in tests """ from django.db import models +from django.db.models import QuerySet from django.db.models.manager import BaseManager from django_pg_returning import UpdateReturningModel -from django_clickhouse.models import ClickHouseSyncModel, ClickHouseSyncQuerySet +from django_clickhouse.models import ClickHouseSyncModel, ClickHouseSyncQuerySet, ClickHouseSyncQuerySetMixin + + +class NativeQuerySet(ClickHouseSyncQuerySetMixin, QuerySet): + pass class TestQuerySet(ClickHouseSyncQuerySet): @@ -16,8 +21,13 @@ class TestManager(BaseManager.from_queryset(TestQuerySet)): pass +class NativeManager(BaseManager.from_queryset(NativeQuerySet)): + pass + + class TestModel(UpdateReturningModel, ClickHouseSyncModel): objects = TestManager() + native_objects = NativeManager() value = models.IntegerField() created_date = models.DateField() @@ -26,6 +36,7 @@ class TestModel(UpdateReturningModel, ClickHouseSyncModel): class SecondaryTestModel(UpdateReturningModel, ClickHouseSyncModel): objects = TestManager() + native_objects = NativeManager() value = models.IntegerField() created_date = models.DateField() diff --git a/tests/test_compatibility.py b/tests/test_compatibility.py index 0c0cd30..c369277 
100644 --- a/tests/test_compatibility.py +++ b/tests/test_compatibility.py @@ -1,3 +1,6 @@ +import sys +from unittest import skipIf + from django.test import TestCase from django_clickhouse.compatibility import namedtuple @@ -10,12 +13,16 @@ class NamedTupleTest(TestCase): self.assertTupleEqual((1, 2, 4), tuple(TestTuple(1, 2, 4))) self.assertTupleEqual((1, 2, 4), tuple(TestTuple(a=1, b=2, c=4))) - def test_exceptions(self): + @skipIf(sys.version_info < (3, 7), + "On python < 3.7 this error is not raised, as not given defaults are filled by None") + def test_no_required_value(self): TestTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults=[3]) - # BUG On python < 3.7 this error is not raised, as not given defaults are filled by None - # with self.assertRaises(TypeError): - # TestTuple(b=1, c=4) + with self.assertRaises(TypeError): + TestTuple(b=1, c=4) + + def test_duplicate_value(self): + TestTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults=[3]) with self.assertRaises(TypeError): TestTuple(1, 2, 3, c=4) diff --git a/tests/test_models.py b/tests/test_models.py index 978cdc2..c9b39be 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,6 +1,9 @@ import datetime +from unittest import skipIf +import django from django.test import TransactionTestCase +from django.utils.timezone import now from tests.clickhouse_models import ClickHouseTestModel, ClickHouseSecondTestModel, ClickHouseCollapseTestModel, \ ClickHouseMultiTestModel @@ -60,6 +63,76 @@ class TestOperations(TransactionTestCase): self.assertSetEqual({('insert', "%s.%d" % (self.db_alias, instance.pk)) for instance in items}, set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))) + @skipIf(django.VERSION < (2, 2), "bulk_update method has been introduced in django 2.2") + def test_native_bulk_update(self): + items = list(self.django_model.objects.filter(pk__in={1, 2})) + for instance in items: + instance.value = instance.pk * 10 + + 
self.django_model.native_objects.bulk_update(items, ['value']) + + items = list(self.django_model.objects.filter(pk__in={1, 2})) + self.assertEqual(2, len(items)) + for instance in items: + self.assertEqual(instance.value, instance.pk * 10) + + self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items}, + set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))) + + def test_pg_bulk_create(self): + now_dt = now() + res = self.django_model.objects.pg_bulk_create([ + {'value': i, 'created': now_dt, 'created_date': now_dt.date()} + for i in range(5) + ]) + self.assertEqual(5, res) + + items = list(self.django_model.objects.filter(value__lt=100).order_by('value')) + self.assertEqual(5, len(items)) + for i, instance in enumerate(items): + self.assertEqual(instance.created, now_dt) + self.assertEqual(instance.created_date, now_dt.date()) + self.assertEqual(i, instance.value) + + self.assertSetEqual({('insert', "%s.%d" % (self.db_alias, instance.pk)) for instance in items}, + set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))) + + def test_pg_bulk_update(self): + items = list(self.django_model.objects.filter(pk__in={1, 2})) + + self.django_model.objects.pg_bulk_update([ + {'id': instance.pk, 'value': instance.pk * 10} + for instance in items + ]) + + items = list(self.django_model.objects.filter(pk__in={1, 2})) + self.assertEqual(2, len(items)) + for instance in items: + self.assertEqual(instance.value, instance.pk * 10) + + self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items}, + set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))) + + def test_pg_bulk_update_or_create(self): + items = list(self.django_model.objects.filter(pk__in={1, 2})) + + data = [{ + 'id': instance.pk, + 'value': instance.pk * 10, + 'created_date': instance.created_date, + 'created': instance.created + } for instance in items] + [{'id': 11, 'value': 110, 
'created_date': datetime.date.today(), 'created': now()}] + + self.django_model.objects.pg_bulk_update_or_create(data) + + items = list(self.django_model.objects.filter(pk__in={1, 2, 11})) + self.assertEqual(3, len(items)) + for instance in items: + self.assertEqual(instance.value, instance.pk * 10) + + self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items}, + set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))) + def test_get_or_create(self): instance, created = self.django_model.objects. \ get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'created': datetime.datetime.now(),