Merge pull request #8 from carrotquest/docs-and-compatibility

Docs and compatibility
M1ha Shvn 2020-02-07 15:33:03 +05:00 committed by GitHub
commit c6148d0fb5
35 changed files with 1294 additions and 118 deletions

.travis.yml
View File

@@ -27,28 +27,42 @@ addons:
   - postgresql-11
   - postgresql-contrib-11
   - postgresql-client-11
+  - postgresql-12
+  - postgresql-contrib-12
+  - postgresql-client-12
   - unzip
   - rabbitmq-server
 python:
   - 3.6
   - 3.7
+  - 3.8
 env:
   - PG=9.6 DJANGO=2.1
   - PG=10 DJANGO=2.1
   - PG=11 DJANGO=2.1
+  - PG=12 DJANGO=2.1
   - PG=9.6 DJANGO=2.2
   - PG=10 DJANGO=2.2
   - PG=11 DJANGO=2.2
+  - PG=12 DJANGO=2.2
+  - PG=9.6 DJANGO=3.0
+  - PG=10 DJANGO=3.0
+  - PG=11 DJANGO=3.0
+  - PG=12 DJANGO=3.0
 before_install:
   # Use default PostgreSQL 11 port
   - sudo sed -i 's/port = 5433/port = 5432/' /etc/postgresql/11/main/postgresql.conf
   - sudo cp /etc/postgresql/{10,11}/main/pg_hba.conf
+  - sudo sed -i 's/port = 5434/port = 5432/' /etc/postgresql/12/main/postgresql.conf
+  - sudo cp /etc/postgresql/{10,12}/main/pg_hba.conf
   # Start PostgreSQL version we need
-  - sudo systemctl stop postgresql && sudo systemctl start postgresql@$PG-main
+  - sudo systemctl stop postgresql
+  - sudo systemctl start postgresql@$PG-main
   # ClickHouse sources
   - sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4
@@ -60,9 +74,8 @@ install:
   - sudo apt-get install clickhouse-client clickhouse-server clickhouse-common-static
   - sudo service clickhouse-server restart
-  - pip install -r requirements.txt
+  - pip install -r requirements-test.txt
   - pip install -q Django==$DJANGO.*
-  - pip install redis
   - python setup.py -q install
 before_script:

README.md
View File

@@ -1 +1,2 @@
 # django-clickhouse
+Documentation is [here](docs/index.md)

docs/basic_information.md Normal file

@ -0,0 +1,37 @@
# Basic information
## About
The goal of this project is to integrate the [Yandex ClickHouse](https://clickhouse.yandex/) database into [Django](https://www.djangoproject.com/) projects.
It is based on the [infi.clickhouse-orm](https://github.com/Infinidat/infi.clickhouse_orm) library.
## Features
* Multiple ClickHouse database configuration in [settings.py](https://docs.djangoproject.com/en/2.1/ref/settings/)
* ORM to create and manage ClickHouse models.
* ClickHouse migration system.
* Scalable serialization of django model instances to ClickHouse model instances.
* Efficient periodic synchronization of django models to ClickHouse without losing data.
* Synchronization process monitoring.
## Requirements
* [Python 3](https://www.python.org/downloads/)
* [Django](https://docs.djangoproject.com/) 1.7+
* [Yandex ClickHouse](https://clickhouse.yandex/)
* [infi.clickhouse-orm](https://github.com/Infinidat/infi.clickhouse_orm)
* [pytz](https://pypi.org/project/pytz/)
* [six](https://pypi.org/project/six/)
* [typing](https://pypi.org/project/typing/)
* [psycopg2](https://www.psycopg.org/)
* [celery](http://www.celeryproject.org/)
* [statsd](https://pypi.org/project/statsd/)
### Optional libraries
* [redis-py](https://redis-py.readthedocs.io/en/latest/) for [RedisStorage](storages.md#redisstorage)
* [django-pg-returning](https://github.com/M1hacka/django-pg-returning)
for optimizing registering updates in [PostgreSQL](https://www.postgresql.org/)
* [django-pg-bulk-update](https://github.com/M1hacka/django-pg-bulk-update)
for performing effective bulk update and create operations in [PostgreSQL](https://www.postgresql.org/)
## Installation
Install via pip:
`pip install django-clickhouse` ([not released yet](https://github.com/carrotquest/django-clickhouse/issues/3))
or via setup.py:
`python setup.py install`

docs/configuration.md Normal file

@ -0,0 +1,96 @@
# Configuration
The library is configured in settings.py. All parameters start with the `CLICKHOUSE_` prefix.
The prefix can be changed using the `CLICKHOUSE_SETTINGS_PREFIX` parameter.
### CLICKHOUSE_SETTINGS_PREFIX
Defaults to: `'CLICKHOUSE_'`
You can change the `CLICKHOUSE_` prefix of all settings parameters to anything you like.
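A minimal sketch (assuming the prefix parameter itself keeps its default `CLICKHOUSE_` name):
```python
# settings.py
CLICKHOUSE_SETTINGS_PREFIX = 'CH_'

# All other library parameters must then use the new prefix
CH_DATABASES = {
    'default': {'db_name': 'test', 'username': 'default', 'password': ''}
}
```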
### CLICKHOUSE_DATABASES
Defaults to: `{}`
A dictionary, defining databases in django-like style.
The key is an alias used to refer to this database in [connections](databases.md#getting-database-objects) and [using](routing.md#setting-database-in-queryset).
Value is a configuration dict with parameters:
* [infi.clickhouse_orm database parameters](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/class_reference.md#database)
* `migrate: bool` - indicates if this database should be migrated. See [migrations](migrations.md).
Example:
```python
CLICKHOUSE_DATABASES = {
    'default': {
        'db_name': 'test',
        'username': 'default',
        'password': ''
    },
    'reader': {
        'db_name': 'read_only',
        'username': 'reader',
        'readonly': True,
        'password': ''
    }
}
```
### CLICKHOUSE_DEFAULT_DB_ALIAS
Defaults to: `'default'`
A database alias to use in [QuerySets](queries.md) if no direct [using](routing.md#setting-database-in-queryset) is specified.
### CLICKHOUSE_SYNC_STORAGE
Defaults to: `'django_clickhouse.storages.RedisStorage'`
An [intermediate storage](storages.md) class to use. Can be a string or class.
### CLICKHOUSE_REDIS_CONFIG
Defaults to: `None`
Redis configuration for [RedisStorage](storages.md#redisstorage).
If given, should be a dictionary of parameters to pass to [redis-py](https://redis-py.readthedocs.io/en/latest/#redis.Redis).
Example:
```python
CLICKHOUSE_REDIS_CONFIG = {
    'host': '127.0.0.1',
    'port': 6379,
    'db': 8,
    'socket_timeout': 10
}
```
### CLICKHOUSE_SYNC_BATCH_SIZE
Defaults to: `10000`
Maximum number of operations fetched by the sync process from the [intermediate storage](storages.md) per [sync](synchronization.md) round.
### CLICKHOUSE_SYNC_DELAY
Defaults to: `5`
A delay in seconds between the starts of two [sync](synchronization.md) rounds.
### CLICKHOUSE_MODELS_MODULE
Defaults to: `'clickhouse_models'`
Module name inside [django app](https://docs.djangoproject.com/en/3.0/intro/tutorial01/),
where [ClickHouseModel](models.md#clickhousemodel) classes are searched for during migrations.
### CLICKHOUSE_DATABASE_ROUTER
Defaults to: `'django_clickhouse.routers.DefaultRouter'`
A dotted path to the class representing the [database router](routing.md#router).
### CLICKHOUSE_MIGRATIONS_PACKAGE
Defaults to: `'clickhouse_migrations'`
A python package name inside [django app](https://docs.djangoproject.com/en/3.0/intro/tutorial01/),
where migration files are searched for.
### CLICKHOUSE_MIGRATION_HISTORY_MODEL
Defaults to: `'django_clickhouse.migrations.MigrationHistory'`
A dotted name of a ClickHouseModel subclass (including module path),
representing [MigrationHistory model](migrations.md#migrationhistory-clickhousemodel).
### CLICKHOUSE_MIGRATE_WITH_DEFAULT_DB
Defaults to: `True`
A boolean flag enabling automatic ClickHouse migration
when you call [`migrate`](https://docs.djangoproject.com/en/2.2/ref/django-admin/#django-admin-migrate) on the `default` database.
### CLICKHOUSE_STATSD_PREFIX
Defaults to: `'clickhouse'`
A prefix in [statsd](https://pythonhosted.org/python-statsd/) added to each library metric. See [monitoring](monitoring.md).
### CLICKHOUSE_CELERY_QUEUE
Defaults to: `'celery'`
The name of the queue used by celery to schedule the library's sync tasks.

docs/databases.md Normal file

@ -0,0 +1,37 @@
# Databases
Direct usage of `Database` objects is not expected in this library. But in some cases, you may still need them.
This section describes `Database` objects and their usage.
`django_clickhouse.database.Database` is a class describing a ClickHouse database connection.
## Getting database objects
To get a `Database` object by its alias name in [CLICKHOUSE_DATABASES](configuration.md#clickhouse_databases)
use the `django_clickhouse.database.connections` object.
This object is a `django_clickhouse.database.ConnectionProxy` instance:
it creates `Database` objects when they are used for the first time and stores them in memory.
Example:
```python
from django_clickhouse.database import connections
# Database objects are initialized on first call
db = connections['default']
secondary = connections['secondary']
# Already initialized - the object is returned from memory
db_link = connections['default']
```
You can also get database objects from [QuerySet](queries.md) and [ClickHouseModel](models.md) instances by calling the `get_database(for_write: bool = False)` method.
The returned database may differ depending on the [routing](routing.md#router) you use.
## Database object
The Database class is based on the [infi.clickhouse_orm Database object](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/models_and_databases.md#models-and-databases),
but extends it with some extra attributes and methods:
### Database migrations are restricted
I expect this library's [migration system](migrations.md) to be used.
Migrating the database directly will lead to migration information errors.
### `insert_tuples` and `select_tuples` methods
Methods to work with [ClickHouseModel namedtuples](models.md#clickhousemodel-namedtuple-form).
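A usage sketch (the model and fields follow the [overview](overview.md) example; the lowercase table name is an assumption based on infi's defaults):
```python
from datetime import date

from django_clickhouse.database import connections
from my_app.clickhouse_models import ClickHouseUser

db = connections['default']

# Insert rows as namedtuples instead of model instances
row_class = ClickHouseUser.get_tuple_class()
db.insert_tuples(ClickHouseUser, [
    row_class(id=1, first_name='Alice', visits=1, birthday=date(2003, 6, 1))
])

# Select rows back as namedtuples ($db is replaced with the database name)
for row in db.select_tuples('SELECT * FROM $db.clickhouseuser', ClickHouseUser):
    print(row.id, row.first_name)
```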

docs/index.md Normal file

@ -0,0 +1,22 @@
# Table of contents
* [Basic information](basic_information.md)
* [About](basic_information.md#about)
* [Features](basic_information.md#features)
* [Requirements](basic_information.md#requirements)
* [Installation](basic_information.md#installation)
* [Design motivation](motivation.md)
* [Usage](overview.md)
* [Overview](overview.md)
* [Models](models.md)
* [DjangoModel](models.md#djangomodel)
* [ClickHouseModel](models.md#clickhousemodel)
* [Making queries](queries.md)
* [Databases](databases.md)
* [Routing](routing.md)
* [Migrations](migrations.md)
* [Synchronization](synchronization.md)
* [Storages](storages.md)
* [RedisStorage](storages.md#redisstorage)
* [Monitoring](monitoring.md)
* [Performance notes](performance.md)

docs/migrations.md Normal file

@ -0,0 +1,77 @@
# Migrations
The migration system allows migrating ClickHouse table schemas based on `ClickHouseModel` definitions.
Library migrations are based on [infi.clickhouse_orm migration system](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/schema_migrations.md),
but make the process a little more django-like.
## File structure
Each django app can have an optional `clickhouse_migrations` package.
This is the default package name; it can be changed with the [CLICKHOUSE_MIGRATIONS_PACKAGE](configuration.md#clickhouse_migrations_package) setting.
The package contains `.py` files whose names start with a 4-digit number.
The number defines the order in which migrations will be applied.
Example:
```
my_app
>> clickhouse_migrations
>>>> __init__.py
>>>> 0001_initial.py
>>>> 0002_add_new_field_to_my_model.py
>> clickhouse_models.py
>> urls.py
>> views.py
```
## Migration files
Each file must contain a `Migration` class, inherited from `django_clickhouse.migrations.Migration`.
The class should define an `operations` attribute - a list of operations to apply one by one.
An operation is one of the [operations supported by infi.clickhouse-orm](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/schema_migrations.md).
```python
from django_clickhouse import migrations
from my_app.clickhouse_models import ClickHouseUser


class Migration(migrations.Migration):
    operations = [
        migrations.CreateTable(ClickHouseUser)
    ]
```
## MigrationHistory ClickHouseModel
This model stores information about applied migrations.
By default, the library uses the `django_clickhouse.migrations.MigrationHistory` model,
but this can be changed using the [CLICKHOUSE_MIGRATION_HISTORY_MODEL](configuration.md#clickhouse_migration_history_model) setting.
For instance, if you want to make it replicated, you have to redeclare the table's engine.
The MigrationHistory model is stored in the `default` database.
## Automatic migrations
When the library is installed, it tries to apply migrations every time
you call [django migrate](https://docs.djangoproject.com/en/3.0/ref/django-admin/#django-admin-migrate). If you want to disable this behaviour, use the [CLICKHOUSE_MIGRATE_WITH_DEFAULT_DB](configuration.md#clickhouse_migrate_with_default_db) setting.
By default, migrations are applied to all [CLICKHOUSE_DATABASES](configuration.md#clickhouse_databases) that have none of the following flags:
* `'migrate': False`
* `'readonly': True`
Note: migrations are applied only when migrating the django `default` database.
So if you call `python manage.py migrate --database=secondary`, they won't be applied.
## Migration algorithm
- Get a list of databases from the `CLICKHOUSE_DATABASES` setting, excluding those with `readonly=True` or `migrate=False` flags. Migrate them one by one.
- For each database, iterate over `INSTALLED_APPS`, searching for a [clickhouse_migrations package](#file-structure):
  * If the package is not found, skip the app.
  * Get a list of applied migrations from the [MigrationHistory model](#migrationhistory-clickhousemodel)
  * Get a list of unapplied migrations
  * Get the [Migration class](#migration-files) from each migration file and call its `apply()` method
  * `apply()` iterates over operations, checking with the [router](routing.md) whether each one should be applied
  * If an operation should be applied, it is applied
  * Mark the migration as applied in the [MigrationHistory model](#migrationhistory-clickhousemodel)
## Security notes
1) ClickHouse has no transaction system, unlike the relational databases django works with.
As a result, a failed migration may be partially applied, and there is no correct way to roll it back.
I recommend making migrations as small as possible, so it is easier to determine and correct the result if something goes wrong.
2) Unlike django, this library is unable to unapply migrations.
This functionality may be implemented in the future.

docs/models.md Normal file

@ -0,0 +1,153 @@
# Models
A model is a pythonic class representing a database table in your code.
It also defines an interface (methods) to perform operations on this table
and describes its configuration inside the framework.
This library operates with two kinds of models:
* DjangoModel, describing tables in the source relational database (PostgreSQL, MySQL, etc.)
* ClickHouseModel, describing tables in the [ClickHouse](https://clickhouse.yandex/docs/en) database
In order to distinguish them, I will refer to them as ClickHouseModel and DjangoModel in the rest of this documentation.
## DjangoModel
Django provides a [model system](https://docs.djangoproject.com/en/3.0/topics/db/models/)
to interact with relational databases.
In order to perform [synchronization](synchronization.md) we need to "catch" all [DML operations](https://en.wikipedia.org/wiki/Data_manipulation_language)
on source django model and save information about them in [storage](storages.md).
To achieve this, the library introduces the abstract `django_clickhouse.models.ClickHouseSyncModel` class.
Each model inherited from `ClickHouseSyncModel` automatically saves the information needed for sync to the storage.
Read [synchronization](synchronization.md) section for more info.
`ClickHouseSyncModel` saves information about:
* `Model.objects.create()`, `Model.objects.bulk_create()`
* `Model.save()`, `Model.delete()`
* `QuerySet.update()`, `QuerySet.delete()`
* All queries of [django-pg-returning](https://pypi.org/project/django-pg-returning/) library
* All queries of [django-pg-bulk-update](https://pypi.org/project/django-pg-bulk-update/) library
You can also combine your custom django manager and queryset with sync functionality, using mixins from the `django_clickhouse.models` package; see the sketch after the example below.
**Important note**: Operations are saved in [transaction.on_commit()](https://docs.djangoproject.com/en/2.2/topics/db/transactions/#django.db.transaction.on_commit).
The goal is to avoid syncing operations that were not committed to the relational database.
But this may also have a bad effect: if something goes wrong during registration,
the transaction may end up committed while its operations were never registered for sync.
Example:
```python
from django_clickhouse.models import ClickHouseSyncModel
from django.db import models
from datetime import date


class User(ClickHouseSyncModel):
    first_name = models.CharField(max_length=50)
    age = models.IntegerField()
    birthday = models.DateField()


# All operations will be registered to sync with ClickHouse models:
User.objects.create(first_name='Alice', age=16, birthday=date(2003, 6, 1))
User(first_name='Bob', age=17, birthday=date(2002, 1, 1)).save()
User.objects.update(first_name='Candy')
```
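A sketch of the custom manager case (the mixin name below is an assumption; check the `django_clickhouse.models` module of your version for the exact classes it provides):
```python
from django.db import models

# Assumed mixin name from django_clickhouse.models
from django_clickhouse.models import ClickHouseSyncModel, ClickHouseSyncQuerySetMixin


class MyQuerySet(ClickHouseSyncQuerySetMixin, models.QuerySet):
    pass


class MyModel(ClickHouseSyncModel):
    objects = MyQuerySet.as_manager()

    name = models.CharField(max_length=50)
```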
## ClickHouseModel
This kind of model is based on [infi.clickhouse_orm Model](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/models_and_databases.md#defining-models)
and represents a table in the [ClickHouse database](https://clickhouse.yandex/docs/en).
You should define a `ClickHouseModel` subclass for each table you want to access and sync in ClickHouse.
Each model should be inherited from `django_clickhouse.clickhouse_models.ClickHouseModel`.
By default, models are searched for in the `clickhouse_models` module of each django app.
You can change the module name using the [CLICKHOUSE_MODELS_MODULE](configuration.md#clickhouse_models_module) setting.
You can read more about creating models and fields [here](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/models_and_databases.md#defining-models):
all capabilities are supported. At the same time, the django-clickhouse library adds:
* [routing attributes and methods](routing.md)
* [sync attributes and methods](synchronization.md)
Example:
```python
from django_clickhouse.clickhouse_models import ClickHouseModel
from django_clickhouse.engines import MergeTree
from infi.clickhouse_orm import fields
from my_app.models import User

class HeightData(ClickHouseModel):
    django_model = User

    first_name = fields.StringField()
    birthday = fields.DateField()
    height = fields.Float32Field()

    engine = MergeTree('birthday', ('first_name', 'birthday'))


class AgeData(ClickHouseModel):
    django_model = User

    first_name = fields.StringField()
    birthday = fields.DateField()
    age = fields.UInt32Field()

    engine = MergeTree('birthday', ('first_name', 'birthday'))
```
### ClickHouseMultiModel
In some cases you may need to sync a single DjangoModel to multiple ClickHouse models.
This model makes it possible to reduce the number of relational database operations.
You can read more in the [sync](synchronization.md) section.
Example:
```python
from django_clickhouse.clickhouse_models import ClickHouseMultiModel
from my_app.models import User


# AgeData and HeightData are the ClickHouseModel subclasses defined above
class MyMultiModel(ClickHouseMultiModel):
    django_model = User
    sub_models = [AgeData, HeightData]
```
## ClickHouseModel namedtuple form
[infi.clickhouse_orm](https://github.com/Infinidat/infi.clickhouse_orm) stores data rows in special Model objects.
It works well on hundreds of records.
But when you sync 100k records in a batch, initializing 100k model instances will be slow.
To optimize this process, the `ClickHouseModel` class has a `get_tuple_class()` method.
It generates a [namedtuple](https://docs.python.org/3/library/collections.html#collections.namedtuple) class
with the same data fields the model has.
Initializing such tuples takes much less time than initializing Model objects.
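A short sketch (field names assume the `ClickHouseUser` model from the [overview](overview.md)):
```python
from datetime import date

from my_app.clickhouse_models import ClickHouseUser

# Build the namedtuple class once, then create rows cheaply
ClickHouseUserTuple = ClickHouseUser.get_tuple_class()
row = ClickHouseUserTuple(id=1, first_name='Alice', visits=1, birthday=date(2003, 6, 1))
```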
## Engines
An engine is a way of storing, indexing, replicating and sorting data in ClickHouse ([docs](https://clickhouse.yandex/docs/en/operations/table_engines/)).
The engine system is based on the [infi.clickhouse_orm engine system](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/table_engines.md#table-engines).
This library extends the original engine classes, as each engine can have its own synchronization mechanics.
Engines are defined in `django_clickhouse.engines` module.
Currently supported engines (with all infi functionality, [more info](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/table_engines.md#data-replication)):
* `MergeTree`
* `ReplacingMergeTree`
* `SummingMergeTree`
* `CollapsingMergeTree`
## Serializers
A serializer is a class which translates django model instances to [namedtuples inserted into ClickHouse](#clickhousemodel-namedtuple-form).
`django_clickhouse.serializers.Django2ClickHouseModelSerializer` is used by default in all models.
All serializers must inherit this class.
A serializer must implement the following interface:
```python
from typing import Iterable, NamedTuple, Optional, Type

from django.db.models import Model as DjangoModel

from django_clickhouse.serializers import Django2ClickHouseModelSerializer


class CustomSerializer(Django2ClickHouseModelSerializer):
    def __init__(self, model_cls: Type['ClickHouseModel'], fields: Optional[Iterable[str]] = None,
                 exclude_fields: Optional[Iterable[str]] = None, writable: bool = False,
                 defaults: Optional[dict] = None) -> None:
        super().__init__(model_cls, fields=fields, exclude_fields=exclude_fields,
                         writable=writable, defaults=defaults)

    def serialize(self, obj: DjangoModel) -> NamedTuple:
        pass
```
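To use a custom serializer, point the model's `django_model_serializer` attribute at it (a sketch; the serializer's module path is hypothetical):
```python
from django_clickhouse.clickhouse_models import ClickHouseModel
from my_app.models import User
from my_app.serializers import CustomSerializer  # hypothetical location of the class above


class MyClickHouseModel(ClickHouseModel):
    django_model = User
    django_model_serializer = CustomSerializer
```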

docs/monitoring.md Normal file

@ -0,0 +1,55 @@
# Monitoring
In order to monitor the [synchronization](synchronization.md) process, [statsd](https://pypi.org/project/statsd/) is used.
Data from statsd can then be used by the [Prometheus statsd exporter](https://github.com/prometheus/statsd_exporter)
or [Graphite](https://graphite.readthedocs.io/en/latest/).
## Configuration
The library expects statsd to be configured as described in the [statsd docs for django](https://statsd.readthedocs.io/en/latest/configure.html#in-django).
You can set a common prefix for all keys in this library using the [CLICKHOUSE_STATSD_PREFIX](configuration.md#clickhouse_statsd_prefix) parameter.
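For instance (a sketch using the settings names from the statsd django integration):
```python
# settings.py
STATSD_HOST = 'localhost'
STATSD_PORT = 8125

# Library metrics will look like 'myproject.clickhouse.sync.<model_name>.queue'
CLICKHOUSE_STATSD_PREFIX = 'myproject.clickhouse'
```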
## Exported metrics
### Gauges
* `<prefix>.sync.<model_name>.queue`
Number of elements in [intermediate storage](storages.md) queue waiting for import.
The queue should not be big. It depends on the configured [sync_delay](synchronization.md#configuration) and the time needed to sync a single batch.
It is a good parameter to watch and alert on.
### Timers
All times are sent in milliseconds.
* `<prefix>.sync.<model_name>.total`
Total time of single batch task execution.
* `<prefix>.sync.<model_name>.steps.<step_name>`
`<step_name>` is one of `pre_sync`, `get_operations`, `get_sync_objects`, `get_insert_batch`, `get_final_versions`,
`insert`, `post_sync`. Read [here](synchronization.md) for more details.
Time of each sync step. Can be useful to debug the reasons for a long sync process.
* `<prefix>.inserted_tuples.<model_name>`
Time of inserting a batch of data into ClickHouse.
It excludes as much python code as possible, to distinguish real INSERT time from python data preparation.
* `<prefix>.sync.<model_name>.register_operations`
Time of inserting sync operations into storage.
### Counters
* `<prefix>.sync.<model_name>.register_operations.<op_name>`
`<op_name>` is one of `create`, `update`, `delete`.
Number of DML operations added to the sync queue by DjangoModel method calls.
* `<prefix>.sync.<model_name>.operations`
Number of operations fetched from [storage](storages.md) for sync in one batch.
* `<prefix>.sync.<model_name>.import_objects`
Number of objects fetched from the relational storage (based on operations) in order to sync with ClickHouse models.
* `<prefix>.inserted_tuples.<model_name>`
Number of rows inserted to ClickHouse.
* `<prefix>.sync.<model_name>.lock.timeout`
Number of locks in [RedisStorage](storages.md#redisstorage) that were not acquired and were skipped by timeout.
This value should be zero. If not, it means your model sync takes longer than the sync task call interval.
* `<prefix>.sync.<model_name>.lock.hard_release`
Number of locks in [RedisStorage](storages.md#redisstorage) that were force-released because the process holding the lock died.
This value should be zero. If not, it means your sync tasks are killed abruptly during the sync process (by the OOM killer, for instance).

docs/motivation.md Normal file

@ -0,0 +1,35 @@
# Design motivation
## Separate from django database setting, QuerySet and migration system
ClickHouse's SQL and DML language is close to the standard, but does not follow it exactly ([docs](https://clickhouse.tech/docs/en/introduction/distinctive_features/#sql-support)).
As a result, it cannot be easily integrated into the django query subsystem, which expects databases to support:
1. Transactions.
2. INNER/OUTER JOINS by condition.
3. Full featured updates and deletes.
4. Per database replication (ClickHouse has per table replication)
5. Other features, not supported in ClickHouse.
In order to have more functionality, [infi.clickhouse-orm](https://github.com/Infinidat/infi.clickhouse_orm)
is used as the base library for databases, querysets and migrations. Most of it is compatible and can be used without any changes.
## Sync over intermediate storage
This library has several goals which lead to intermediate storage:
1. Fail-resistant import, no matter what the failure reason is:
a ClickHouse failure, a network failure, or the import process being killed by the system (OOM, for instance).
2. ClickHouse does not like single-row inserts: [docs](https://clickhouse.tech/docs/en/introduction/performance/#performance-when-inserting-data).
So it's worth batching data somewhere before inserting it.
ClickHouse provides a Buffer engine for this, but it can lose data if ClickHouse fails, and no one will know about it.
3. Better scalability. Different intermediate storages may be implemented in the future, based on databases, queue systems or even the Buffer engine.
## Replication and routing
In simple cases, people have a single database, or a cluster with the same tables on each replica.
But as ClickHouse has per-table replication, a more complicated structure can be built:
1. Model A is stored on servers 1 and 2
2. Model B is stored on servers 2, 3 and 5
3. Model C is stored on servers 1, 3 and 4
Moreover, migration operations in ClickHouse can be auto-replicated (`ALTER TABLE`, for instance) or not (`CREATE TABLE`).
In order to make the replication scheme scalable:
1. Each model has its own read / write / migrate [routing configuration](routing.md#clickhousemodel-routing-attributes).
2. You can use a [router](routing.md#router), like django does, to set basic routing rules for all models or model groups.

docs/overview.md Normal file

@ -0,0 +1,140 @@
# Usage overview
## Requirements
To begin with, I expect that you already have:
1. [ClickHouse](https://clickhouse.tech/docs/en/) (with [ZooKeeper](https://zookeeper.apache.org/), if you use replication)
2. A relational database used with [Django](https://www.djangoproject.com/), for instance [PostgreSQL](https://www.postgresql.org/)
3. [Django database set up](https://docs.djangoproject.com/en/3.0/ref/databases/)
4. [Intermediate storage](storages.md) set up. For instance, [Redis](https://redis.io/).
## Configuration
Add required parameters to [Django settings.py](https://docs.djangoproject.com/en/3.0/topics/settings/):
1. [CLICKHOUSE_DATABASES](configuration.md#clickhouse_databases)
2. [Intermediate storage](storages.md) configuration. For instance, [RedisStorage](storages.md#redisstorage)
3. It's recommended to change [CLICKHOUSE_CELERY_QUEUE](configuration.md#clickhouse_celery_queue)
4. Add the sync task to the [celerybeat schedule](http://docs.celeryproject.org/en/v2.3.3/userguide/periodic-tasks.html).
Note that executing the planner every 2 seconds doesn't mean sync is executed every 2 seconds:
sync timing depends on the model's `sync_delay` attribute and the [CLICKHOUSE_SYNC_DELAY](configuration.md#clickhouse_sync_delay) configuration parameter.
You can read more in the [sync section](synchronization.md).
You can also change other [configuration parameters](configuration.md) depending on your project.
#### Example
```python
# django-clickhouse library setup
CLICKHOUSE_DATABASES = {
    # Connection name to refer to in the using(...) method
    'default': {
        'db_name': 'test',
        'username': 'default',
        'password': ''
    }
}

CLICKHOUSE_REDIS_CONFIG = {
    'host': '127.0.0.1',
    'port': 6379,
    'db': 8,
    'socket_timeout': 10
}

CLICKHOUSE_CELERY_QUEUE = 'clickhouse'

# If you don't have any celerybeat tasks yet, define a new dictionary
# More info: http://docs.celeryproject.org/en/v2.3.3/userguide/periodic-tasks.html
from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'clickhouse_auto_sync': {
        'task': 'django_clickhouse.tasks.clickhouse_auto_sync',
        'schedule': timedelta(seconds=2),  # Every 2 seconds
        'options': {'expires': 1, 'queue': CLICKHOUSE_CELERY_QUEUE}
    }
}
```
## Adopting django model
Read the [ClickHouseSyncModel](models.md#djangomodel) section.
Inherit all [django models](https://docs.djangoproject.com/en/3.0/topics/db/models/)
you want to sync with ClickHouse from `django_clickhouse.models.ClickHouseSyncModel` or sync mixins.
```python
from django_clickhouse.models import ClickHouseSyncModel
from django.db import models


class User(ClickHouseSyncModel):
    first_name = models.CharField(max_length=50)
    visits = models.IntegerField(default=0)
    birthday = models.DateField()
```
## Create ClickHouseModel
1. Read [ClickHouseModel section](models.md#clickhousemodel)
2. Create `clickhouse_models.py` in your django app.
3. Add `ClickHouseModel` class there:
```python
from django_clickhouse.clickhouse_models import ClickHouseModel
from django_clickhouse.engines import MergeTree
from infi.clickhouse_orm import fields
from my_app.models import User


class ClickHouseUser(ClickHouseModel):
    django_model = User

    id = fields.UInt32Field()
    first_name = fields.StringField()
    birthday = fields.DateField()
    visits = fields.UInt32Field(default=0)

    engine = MergeTree('birthday', ('birthday',))
```
## Migration to create table in ClickHouse
1. Read [migrations](migrations.md) section
2. Create `clickhouse_migrations` package in your django app
3. Create `0001_initial.py` file inside the created package. Result structure should be:
```
my_app
>> clickhouse_migrations
>>>> __init__.py
>>>> 0001_initial.py
>> clickhouse_models.py
>> models.py
```
4. Add content to file `0001_initial.py`:
```python
from django_clickhouse import migrations
from my_app.clickhouse_models import ClickHouseUser


class Migration(migrations.Migration):
    operations = [
        migrations.CreateTable(ClickHouseUser)
    ]
```
## Run migrations
Call [django migrate](https://docs.djangoproject.com/en/3.0/ref/django-admin/#django-admin-migrate)
to apply the created migration and create the table in ClickHouse.
## Set up and run celery sync process
Set up [celery worker](https://docs.celeryproject.org/en/latest/userguide/workers.html#starting-the-worker) for [CLICKHOUSE_CELERY_QUEUE](configuration.md#clickhouse_celery_queue) and [celerybeat](https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#starting-the-scheduler).
## Test sync and write analytics queries
1. Read the [monitoring section](monitoring.md) in order to set up your monitoring system.
2. Read the [query section](queries.md) to understand how to query the database.
3. Create some data in the source table with django.
4. Check if it is synced.
#### Example
```python
import datetime
import time

from my_app.models import User
from my_app.clickhouse_models import ClickHouseUser

u = User.objects.create(first_name='Alice', birthday=datetime.date(1987, 1, 1), visits=1)

# Wait for the celery task to be executed at least once
time.sleep(6)
assert ClickHouseUser.objects.filter(id=u.id).count() == 1, "Sync is not working"
```
## Congratulations
Tune your integration to achieve better performance if needed: [docs](performance.md).

docs/performance.md Normal file

@ -0,0 +1,46 @@
# Sync performance
Every real life system may have its own performance problems.
They depend on:
* Your ClickHouse server configuration
* Number of ClickHouse instances in your cluster
* Your data formats
* Import speed
* Network
* etc
I recommend using [monitoring](monitoring.md) to understand where the bottleneck is and act accordingly.
This chapter gives a list of known problems which can slow down your import.
## ClickHouse tuning
Read this [doc](https://clickhouse.tech/docs/en/introduction/performance/#performance-when-inserting-data)
and tune ClickHouse both for reads and writes.
## ClickHouse cluster
As ClickHouse is a [multimaster database](https://clickhouse.tech/docs/en/introduction/distinctive_features/#data-replication-and-data-integrity-support),
you can import and read from any node when you have a cluster.
In order to read from and import to multiple nodes, you can use [CHProxy](https://github.com/Vertamedia/chproxy)
or add multiple databases to the [routing configuration](routing.md#clickhousemodel-routing-attributes).
## CollapsingMergeTree engine and previous versions
In order to reduce the amount of data kept in the [intermediate storage](storages.md),
this library doesn't store old versions of data on update or delete.
Getting previous data versions from the relational storage would also be an expensive operation.
Engines like `CollapsingMergeTree` get old versions from ClickHouse:
1. Using `version_col` if it is set in engine's parameters.
This is a special field which stores incremental row versions and is filled by the library.
It should be of any unsigned integer type (depending on how many row versions you may have).
2. Using the `FINAL` query modifier.
This way is much slower, but doesn't require an additional column.
## Know your data
Commonly, the library user uses python types to form ClickHouse data.
The library is responsible for converting this data into the format ClickHouse expects to receive.
This leads to a great number of conversion operations when you import data in big batches.
In order to reduce this time, you can:
* Set `MyClickHouseModel.sync_formatted_tuples` to `True`
* Override the `MyClickHouseModel.get_insert_batch(cls, import_objects: Iterable[DjangoModel])` method:
it should get `cls.get_tuple_class()` and yield (as a [generator](https://wiki.python.org/moin/Generators))
tuples of string values, already prepared for insertion into ClickHouse. See the sketch after the note below.
**Important note**: `ClickHouseModel.get_insert_batch(...)` can perform additional functionality depending on the model's [engine](models.md#engines).
Be careful.
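A minimal sketch of such an override (assuming the `ClickHouseUser` fields from the [overview](overview.md); the exact string formats ClickHouse expects for each field type are an assumption here):
```python
from typing import Iterable

from django.db.models import Model as DjangoModel
from infi.clickhouse_orm import fields
from django_clickhouse.clickhouse_models import ClickHouseModel
from django_clickhouse.engines import MergeTree
from my_app.models import User


class FastClickHouseUser(ClickHouseModel):
    django_model = User
    sync_formatted_tuples = True

    id = fields.UInt32Field()
    first_name = fields.StringField()
    visits = fields.UInt32Field(default=0)
    birthday = fields.DateField()

    engine = MergeTree('birthday', ('birthday',))

    @classmethod
    def get_insert_batch(cls, import_objects: Iterable[DjangoModel]):
        # A generator of pre-formatted string tuples
        tuple_class = cls.get_tuple_class()
        for obj in import_objects:
            yield tuple_class(
                id=str(obj.pk),
                first_name=obj.first_name,
                visits=str(obj.visits),
                birthday=obj.birthday.isoformat(),  # 'YYYY-MM-DD'
            )
```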

docs/queries.md Normal file

@ -0,0 +1,66 @@
# Making queries
The QuerySet system used by this library looks very similar to django's, but is implemented separately.
You can read the reasons for this design [here](motivation.md#separate-from-django-database-setting-queryset-and-migration-system).
## Usage
The library's query system extends the [infi.clickhouse-orm QuerySet system](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/querysets.md) and supports all its features.
In most cases there is no need to create querysets explicitly: just use the `objects` attribute or the `objects_in(db)` method of `ClickHouseModel`.
At the same time, `django-clickhouse` adds some extra features to `QuerySet` and `AggregateQuerySet`.
They are available if your model inherits `django_clickhouse.clickhouse_models.ClickHouseModel`.
## Extra features
### Django-like routing system
There's no need to set the database object explicitly with the `objects_in(...)` method, as the original QuerySet expects.
The database is determined from the library configuration and the [router](routing.md#router) used.
If you want to set the database explicitly, you can use either of these approaches:
* [infi approach](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/querysets.md#querysets)
* Django like `QuerySet.using(db_alias)` method
Example:
```python
from django_clickhouse.database import connections
from my_app.clickhouse_models import ClickHouseUser
# This query will choose a database using the current router.
# By default django_clickhouse.routers.DefaultRouter is used.
# It gets one random database from ClickHouseUser.read_db_aliases for read queries
ClickHouseUser.objects.filter(id__in=[1,2,3]).count()
# These queries do the same thing, using 'secondary' connection from CLICKHOUSE_DATABASES setting
ClickHouseUser.objects_in(connections['secondary']).filter(id__in=[1,2,3]).count()
ClickHouseUser.objects.filter(id__in=[1,2,3]).using('secondary').count()
# You can get the database to use with the get_database(for_write: bool = False) method
# Note: if you have multiple databases in model settings, DefaultRouter
# can return any of them on each call, as the function is stateless
ClickHouseUser.objects.get_database(for_write=False)
```
### QuerySet create methods
This library adds methods to create objects, the way django does, without direct Database object usage.
Example:
```python
from datetime import date
from my_app.clickhouse_models import ClickHouseUser
# These queries will choose a database using the current router.
# By default django_clickhouse.routers.DefaultRouter is used.
# It gets one random database from ClickHouseUser.write_db_aliases for write queries
# You can set the database explicitly with the using(...) or objects_in(...) methods
instance = ClickHouseUser.objects.create(id=1, first_name='Alice', visits=1, birthday=date(2003, 6, 1))
objs = ClickHouseUser.objects.bulk_create([
    ClickHouseUser(id=2, first_name='Bob', visits=2, birthday=date(2001, 5, 1)),
    ClickHouseUser(id=3, first_name='John', visits=3, birthday=date(2002, 7, 11))
], batch_size=10)
```
### Getting all objects
The `QuerySet.all()` method returns a copy of the current QuerySet:
```python
from my_app.clickhouse_models import ClickHouseUser
qs = ClickHouseUser.objects.all()
```

docs/routing.md Normal file

@ -0,0 +1,62 @@
# Database routing
One of this library's goals was to create easy and extendable automatic database routing.
## Motivation
In original [infi.clickhouse-orm](https://github.com/Infinidat/infi.clickhouse_orm)
you had to explicitly create [Database](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/models_and_databases.md#inserting-to-the-database) objects
and attach a database to each query with the `objects_in(db)` method.
But most projects use quite a small number of database connections.
As a result, it's easier to set up routing once and use it the way [django](https://docs.djangoproject.com/en/2.2/topics/db/multi-db/) does.
Unlike traditional relational databases, [ClickHouse](https://clickhouse.yandex/docs/en/)
has per table replication.
This means that:
1) Each model can have its own replication scheme
2) Some migration queries are replicated automatically, while others are not
3) To make the system more extendable, we need default routing, per-model routing and a router class for complex cases.
## Introduction
All database connections are defined in [CLICKHOUSE_DATABASES](configuration.md#clickhouse_databases) setting.
Each connection has an alias name to refer to it by.
If no routing is configured, [CLICKHOUSE_DEFAULT_DB_ALIAS](configuration.md#clickhouse_default_db_alias) is used.
## Router
A router is a class defining 3 methods:
* `def db_for_read(self, model: ClickHouseModel, **hints) -> str`
Returns `database alias` to use for given `model` for `SELECT` queries.
* `def db_for_write(self, model: ClickHouseModel, **hints) -> str`
Returns `database alias` to use for given `model` for `INSERT` queries.
* `def allow_migrate(self, db_alias: str, app_label: str, operation: Operation, model: Optional[ClickHouseModel] = None, **hints: dict) -> bool`
Checks if migration `operation` should be applied in django application `app_label` on database `db_alias`.
The optional `model` parameter can be used to control migration of a concrete model.
By default, [CLICKHOUSE_DATABASE_ROUTER](configuration.md#clickhouse_database_router) is used.
It gets routing information from the model attributes described below; a custom router can also be defined, as sketched next.
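A minimal custom router sketch (the `'reader'` and `'default'` aliases are assumptions; `allow_migrate` is inherited from `DefaultRouter`):
```python
from django_clickhouse.routers import DefaultRouter


class ReadReplicaRouter(DefaultRouter):
    # Hypothetical policy: send all reads to a 'reader' connection,
    # keep writes on 'default'
    def db_for_read(self, model, **hints) -> str:
        return 'reader'

    def db_for_write(self, model, **hints) -> str:
        return 'default'
```
Point [CLICKHOUSE_DATABASE_ROUTER](configuration.md#clickhouse_database_router) at such a class to activate it.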
## ClickHouseModel routing attributes
Default database router reads routing settings from model attributes.
```python
from django_clickhouse.configuration import config
from django_clickhouse.clickhouse_models import ClickHouseModel
class MyModel(ClickHouseModel):
    # Servers, model is replicated to.
    # Router takes random database to read or write from.
    read_db_aliases = (config.DEFAULT_DB_ALIAS,)
    write_db_aliases = (config.DEFAULT_DB_ALIAS,)

    # Databases to perform replicated migration queries, such as ALTER TABLE.
    # Migration is applied to random database from the list.
    migrate_replicated_db_aliases = (config.DEFAULT_DB_ALIAS,)

    # Databases to perform non-replicated migrations (CREATE TABLE, DROP TABLE).
    # Migration is applied to all databases from the list.
    migrate_non_replicated_db_aliases = (config.DEFAULT_DB_ALIAS,)
```
## Setting database in QuerySet
The database can be set explicitly in each [QuerySet](queries.md) using one of these methods:
* With [infi approach](https://github.com/Infinidat/infi.clickhouse_orm/blob/develop/docs/querysets.md#querysets): `MyModel.objects_in(db_object).filter(id__in=[1,2,3]).count()`
* With `using()` method: `MyModel.objects.filter(id__in=[1,2,3]).using(db_alias).count()`
If no explicit database is provided, the database connection to use is determined lazily with the router's `db_for_read` or `db_for_write`
method, depending on the query type.

docs/storages.md Normal file

@ -0,0 +1,70 @@
# Storages
A storage class is a facade that stores information about operations performed on django models.
It has three main purposes:
* Inserting single records into the storage should be fast. The storage forms a batch of data, which is then inserted into ClickHouse.
* The storage guarantees that no data is lost.
Intermediate data is deleted from the storage only after the batch import finishes successfully.
If the import fails at some point, starting a new import process should import the failed data again.
* Keeping information about the sync process. For instance, the last time a model's sync has been called.
In order to distinguish models from each other, the storage uses an `import_key`.
By default, it is generated by the `ClickHouseModel.get_import_key()` method and is equal to the class name.
Each method of the abstract `Storage` class takes `kwargs` parameters, which can be used by concrete storage implementations.
A direct-usage sketch follows the method list below.
## Storage methods
* `register_operations(import_key: str, operation: str, *pks: Any) -> int`
Saves a new operation in the source database to the storage. This method should be fast.
It is called after the source database transaction is committed.
The method returns the number of operations registered.
`operation` is one of `insert`, `update` or `delete`
`pks` is an iterable of strings, sufficient to select the needed records from the source database.
* `get_last_sync_time(import_key: str) -> Optional[datetime.datetime]`
Returns the last time a model sync has been called. If no sync has been done, returns None.
* `set_last_sync_time(import_key: str, dt: datetime.datetime) -> None`
Saves the datetime when a sync process was last called.
* `register_operations_wrapped(self, import_key: str, operation: str, *pks: Any) -> int`
A wrapper for `register_operations`. Its goal is to write metrics and logs.
* `pre_sync(import_key: str, **kwargs) -> None`
Called before the import process starts. It initializes the storage for importing a new batch.
* `operations_count(import_key: str, **kwargs) -> int`
Counts how many operations are waiting for import in the storage.
* `get_operations(import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]`
Returns the next batch of operations to import. The `count` parameter gives the number of operations to return.
An operation is a tuple `(operation, primary_key)`, where `operation` is one of insert, update or delete
and `primary_key` is a string sufficient to select the record from the source database.
* `post_sync(import_key: str, **kwargs) -> None`
Called after the import process has finished. It cleans the storage up after importing a batch.
* `post_batch_removed(import_key: str, batch_size: int) -> None`
This method should be called by the `post_sync` method after data is removed from the storage.
By default, it updates the queue size metric.
* `post_sync_failed(import_key: str, exception: Exception, **kwargs) -> None`
Called if an exception occurred during the import process. It cleans the storage up after an unsuccessful import.
Note that if the import process is killed abruptly (by the OOM killer, for instance), this method is not called.
* `flush() -> None`
*Dangerous*. Drops all data kept by the storage. It is used for cleaning up between tests.
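A hedged sketch of calling the storage API directly (normally the library calls these methods for you; `RedisStorage` is assumed to read [CLICKHOUSE_REDIS_CONFIG](configuration.md#clickhouse_redis_config)):
```python
from django_clickhouse.storages import RedisStorage
from my_app.clickhouse_models import ClickHouseUser

storage = RedisStorage()
import_key = ClickHouseUser.get_import_key()  # equals the class name by default

# Register three inserted records, then inspect the queue
storage.register_operations_wrapped(import_key, 'insert', '1', '2', '3')
print(storage.operations_count(import_key))    # 3
print(storage.get_operations(import_key, 10))  # e.g. [('insert', '1'), ...]
```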
## Predefined storages
### RedisStorage
This storage uses [Redis database](https://redis.io/) as intermediate storage.
To communicate with Redis, it uses the [redis-py](https://redis-py.readthedocs.io/en/latest/) library.
It is optional, but must be installed to use RedisStorage.
In order to use RedisStorage, you must also set the [CLICKHOUSE_REDIS_CONFIG](configuration.md#clickhouse_redis_config) parameter.
Each stored operation contains:
* The django database alias where the original record can be found
* The record's primary key
* The operation performed (insert, update, delete)
This storage does not allow multi-threaded sync.

docs/synchronization.md Normal file

@ -0,0 +1,105 @@
# Synchronization
## Design motivation
Read [here](motivation.md#sync-over-intermediate-storage).
## Algorithm
1. [Celery beat](https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html) schedules the `django_clickhouse.tasks.clickhouse_auto_sync` task every second or so.
2. [Celery workers](https://docs.celeryproject.org/en/latest/userguide/workers.html) execute `clickhouse_auto_sync`.
It searches for `ClickHouseModel` subclasses which need sync (those whose `need_sync()` method returns `True`).
3. The `django_clickhouse.tasks.sync_clickhouse_model` task is scheduled for each `ClickHouseModel` which needs sync.
4. `sync_clickhouse_model` saves the sync start time in the [storage](storages.md) and calls the `ClickHouseModel.sync_batch_from_storage()` method.
5. `ClickHouseModel.sync_batch_from_storage()`:
* Gets the [storage](storages.md) the model works with, using the `ClickHouseModel.get_storage()` method
* Calls `Storage.pre_sync(import_key)` on the model's [storage](storages.md).
This may be used to prevent parallel execution with locks or for other preparations.
* Gets a list of operations to sync from the [storage](storages.md).
* Fetches objects from the relational database by calling the `ClickHouseModel.get_sync_objects(operations)` method.
* Forms a batch of tuples to insert into ClickHouse, using the `ClickHouseModel.get_insert_batch(import_objects)` method.
* Inserts the batch of tuples into ClickHouse, using the `ClickHouseModel.insert_batch(batch)` method.
* Calls the `Storage.post_sync(import_key)` method to clean up the storage after syncing the batch.
This method also removes synced operations from the storage.
* If an exception occurs during execution, the `Storage.post_sync_failed(import_key)` method is called.
Note that the process can be killed without an exception (by the OOM killer, for instance),
in which case this method will not be called.
## Configuration
Sync configuration can be set globally using django settings.py parameters, or redeclared for each `ClickHouseModel` class.
`ClickHouseModel` configuration takes priority over the settings configuration.
### Settings configuration
* [CLICKHOUSE_CELERY_QUEUE](configuration.md#clickhouse_celery_queue)
Defaults to: `'celery'`
The name of the queue used by celery to schedule the library's sync tasks.
* [CLICKHOUSE_SYNC_STORAGE](configuration.md#clickhouse_sync_storage)
Defaults to: `'django_clickhouse.storages.RedisStorage'`
An [intermediate storage](storages.md) class to use. Can be a string or class.
* [CLICKHOUSE_SYNC_BATCH_SIZE](configuration.md#clickhouse_sync_batch_size)
Defaults to: `10000`
Maximum number of operations fetched by the sync process from the [intermediate storage](storages.md) per sync round.
* [CLICKHOUSE_SYNC_DELAY](configuration.md#clickhouse_sync_delay)
Defaults to: `5`
A delay in seconds between the starts of two sync rounds.
### ClickHouseModel configuration
Each `ClickHouseModel` subclass can define sync arguments and methods:
* `django_model: django.db.models.Model`
Required.
Django model this ClickHouseModel class is synchronized with.
* `django_model_serializer: Type[Django2ClickHouseModelSerializer]`
Defaults to: `django_clickhouse.serializers.Django2ClickHouseModelSerializer`
[Serializer class](models.md#serializers) to convert DjangoModel to ClickHouseModel.
* `sync_enabled: bool`
Defaults to: `False`.
Is sync for this model enabled?
* `sync_batch_size: int`
Defaults to: [CLICKHOUSE_SYNC_BATCH_SIZE](configuration.md#clickhouse_sync_batch_size)
Maximum number of operations fetched by the sync process from the [storage](storages.md) per sync round.
* `sync_delay: float`
Defaults to: [CLICKHOUSE_SYNC_DELAY](configuration.md#clickhouse_sync_delay)
A delay in seconds between the starts of two sync rounds.
* `sync_storage: Union[str, Storage]`
Defaults to: [CLICKHOUSE_SYNC_STORAGE](configuration.md#clickhouse_sync_storage)
An [intermediate storage](storages.md) class to use. Can be a string or class.
Example:
```python
from django_clickhouse.clickhouse_models import ClickHouseModel
from django_clickhouse.engines import ReplacingMergeTree
from infi.clickhouse_orm import fields
from my_app.models import User


class ClickHouseUser(ClickHouseModel):
    django_model = User

    sync_enabled = True
    sync_delay = 5
    sync_batch_size = 1000

    id = fields.UInt32Field()
    first_name = fields.StringField()
    birthday = fields.DateField()
    visits = fields.UInt32Field(default=0)

    engine = ReplacingMergeTree('birthday', ('birthday',))
```
## Fail resistance
Fail resistance is based on several points:
1. The [storage](storages.md) should not lose data in any case. Keeping the storage itself stable is not this library's goal.
2. Data is removed from the [storage](storages.md) only if the import succeeds. Otherwise the import attempt is repeated.
3. It's recommended to use the ReplacingMergeTree or CollapsingMergeTree [engines](models.md#engines)
instead of simple MergeTree, so duplicates are removed if a batch is imported twice.
4. Each `ClickHouseModel` is synced in a separate process.
If one model fails, it should not affect other models.

requirements-test.txt Normal file

@ -0,0 +1,11 @@
Django (>=1.7)
pytz
six
typing
psycopg2
infi.clickhouse-orm
celery
statsd
django-pg-returning
django-pg-bulk-update
redis

requirements.txt
View File

@@ -5,5 +5,4 @@ typing
 psycopg2
 infi.clickhouse-orm
 celery
 statsd
-django-pg-returning

setup.py
View File

@@ -13,7 +13,7 @@ with open('requirements.txt') as f:
 setup(
     name='django-clickhouse',
-    version='0.0.1',
+    version='1.0.0',
     packages=['django_clickhouse'],
     package_dir={'': 'src'},
     url='https://github.com/carrotquest/django-clickhouse',

src/django_clickhouse/clickhouse_models.py
View File

@@ -48,9 +48,17 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
     django_model = None
     django_model_serializer = Django2ClickHouseModelSerializer

+    # Servers, model is replicated to.
+    # Router takes random database to read or write from.
     read_db_aliases = (config.DEFAULT_DB_ALIAS,)
     write_db_aliases = (config.DEFAULT_DB_ALIAS,)
+
+    # Databases to perform replicated migration queries, such as ALTER TABLE.
+    # Migration is applied to random database from the list.
     migrate_replicated_db_aliases = (config.DEFAULT_DB_ALIAS,)
+
+    # Databases to perform non-replicated migrations (CREATE TABLE, DROP TABLE).
+    # Migration is applied to all databases from the list.
     migrate_non_replicated_db_aliases = (config.DEFAULT_DB_ALIAS,)

     sync_enabled = False
@@ -86,12 +94,11 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         return namedtuple("%sTuple" % cls.__name__, field_names, defaults=default_values)

     @classmethod
-    def objects_in(cls, database):  # type: (Database) -> QuerySet
+    def objects_in(cls, database: Database) -> QuerySet:
         return QuerySet(cls, database)

     @classmethod
-    def get_database_alias(cls, for_write=False):
-        # type: (bool) -> str
+    def get_database_alias(cls, for_write: bool = False) -> str:
         """
         Gets database alias for read or write purposes
         :param for_write: Boolean flag if database is neede for read or for write
@@ -104,8 +111,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         return db_router.db_for_read(cls)

     @classmethod
-    def get_database(cls, for_write=False):
-        # type: (bool) -> Database
+    def get_database(cls, for_write: bool = False) -> Database:
         """
         Gets database alias for read or write purposes
         :param for_write: Boolean flag if database is neede for read or for write
@@ -115,8 +121,8 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         return connections[db_alias]

     @classmethod
-    def get_django_model_serializer(cls, writable=False, defaults=None):
-        # type: (bool, Optional[dict]) -> Django2ClickHouseModelSerializer
+    def get_django_model_serializer(cls, writable: bool = False, defaults: Optional[dict] = None
+                                    ) -> Django2ClickHouseModelSerializer:
         serializer_cls = lazy_class_import(cls.django_model_serializer)
         return serializer_cls(cls, writable=writable, defaults=defaults)
@@ -163,7 +169,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         return True

     @classmethod
-    def get_sync_query_set(cls, using, pk_set):  # type: (str, Set[Any]) -> DjangoQuerySet
+    def get_sync_query_set(cls, using: str, pk_set: Set[Any]) -> DjangoQuerySet:
         """
         Forms django queryset to fetch for sync
         :param using: Database to fetch from
@@ -173,7 +179,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         return cls.django_model.objects.filter(pk__in=pk_set).using(using)

     @classmethod
-    def get_sync_objects(cls, operations):  # type: (List[Tuple[str, str]]) -> List[DjangoModel]
+    def get_sync_objects(cls, operations: List[Tuple[str, str]]) -> List[DjangoModel]:
         """
         Returns objects from main database to sync
         :param operations: A list of operations to perform
@@ -195,7 +201,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         return list(chain(*objs))

     @classmethod
-    def get_insert_batch(cls, import_objects):  # type: (Iterable[DjangoModel]) -> List[ClickHouseModel]
+    def get_insert_batch(cls, import_objects: Iterable[DjangoModel]) -> List['ClickHouseModel']:
         """
         Formats django model objects to batch of ClickHouse objects
         :param import_objects: DjangoModel objects to import
@@ -259,7 +265,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         raise ex

     @classmethod
-    def need_sync(cls):  # type: () -> bool
+    def need_sync(cls) -> bool:
         """
         Checks if this model needs synchronization: sync is enabled and delay has passed
         :return: Boolean

src/django_clickhouse/utils.py
View File

@@ -1,5 +1,9 @@
 import sys
 from collections import namedtuple as basenamedtuple
+from typing import Any, Set
+
+from django.db import transaction, connections
+from django.db.models import QuerySet
 
 
 def namedtuple(*args, **kwargs):
@@ -16,3 +20,36 @@ def namedtuple(*args, **kwargs):
         return TupleClass
     else:
         return basenamedtuple(*args, **kwargs)
+
+
+def django_pg_returning_available(using: str) -> bool:
+    """
+    Checks if django-pg-returning library is installed and can be used with the given database
+    :return: Boolean
+    """
+    try:
+        import django_pg_returning
+        return connections[using].vendor == 'postgresql'
+    except ImportError:
+        return False
+
+
+def update_returning_pk(qs: QuerySet, updates: dict) -> Set[Any]:
+    """
+    Updates QuerySet items, returning primary key values.
+    This method should not depend on the database engine, though it may have engine-specific optimizations.
+    :param qs: QuerySet to update
+    :param updates: Update items as passed to QuerySet.update(**updates) method
+    :return: A set of primary keys
+    """
+    qs._for_write = True
+    if django_pg_returning_available(qs.db) and hasattr(qs, 'update_returning'):
+        pk_name = qs.model._meta.pk.name
+        qs = qs.only(pk_name).update_returning(**updates)
+        pks = set(qs.values_list(pk_name, flat=True))
+    else:
+        with transaction.atomic(using=qs.db):
+            pks = set(qs.select_for_update().values_list('pk', flat=True))
+            QuerySet.update(qs, **updates)
+
+    return pks
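
A minimal usage sketch for the new fallback helper, assuming an Article model with a published flag (both illustrative):

    from django_clickhouse.compatibility import update_returning_pk

    # On PostgreSQL with django-pg-returning this is one UPDATE ... RETURNING
    # query; on other engines it falls back to SELECT ... FOR UPDATE + UPDATE
    # inside a single transaction, so the returned pk set is consistent.
    pks = update_returning_pk(Article.objects.filter(published=False),
                              {'published': True})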

View File

@@ -28,7 +28,7 @@ DEFAULTS = {
 
 class Config:
-    def __getattr__(self, item):  # type: (str) -> Any
+    def __getattr__(self, item: str) -> Any:
         if item not in DEFAULTS:
             raise AttributeError('Unknown config parameter `%s`' % item)

View File

@@ -35,8 +35,8 @@ class Database(InfiDatabase):
     def _get_applied_migrations(self, migrations_package_name):
         raise NotImplementedError("This method is not supported by django_clickhouse.")
 
-    def select_tuples(self, query, model_class, settings=None):
-        # type: (str, Type['ClickHouseModel'], Optional[dict], Optional[dict]) -> Generator[tuple]
+    def select_tuples(self, query: str, model_class: Type['ClickHouseModel'], settings: Optional[dict] = None
+                      ) -> Iterable[tuple]:
         """
         This method selects model_class namedtuples, instead of class instances.
         Less memory consumption, greater speed
@@ -67,11 +67,11 @@ class Database(InfiDatabase):
             yield item
 
-    def insert_tuples(self, model_class, model_tuples, batch_size=None, formatted=False):
-        # type: (Type['ClickHouseModel'], Iterable[tuple], Optional[int], bool) -> None
+    def insert_tuples(self, model_class: Type['ClickHouseModel'], model_tuples: Iterable[tuple],
+                      batch_size: Optional[int] = None, formatted: bool = False) -> None:
         """
         Inserts model_class namedtuples
-        :param model_class: Clickhouse model, namedtuples are made from
+        :param model_class: ClickHouse model, namedtuples are made from
         :param model_tuples: An iterable of tuples to insert
         :param batch_size: Size of batch
         :param formatted: If flag is set, tuples are expected to be ready to insert without calling field.to_db_string
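
A short sketch of the tuple-based API, assuming a ClickHouseUser model and a 'default' alias ($db table substitution follows infi.clickhouse_orm conventions):

    from django_clickhouse.database import connections

    db = connections['default']

    # Rows come back as namedtuples - no model instance overhead
    rows = db.select_tuples('SELECT * FROM $db.clickhouseuser', ClickHouseUser)

    # The same tuples can be written back in batches
    db.insert_tuples(ClickHouseUser, rows, batch_size=1000)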

View File

@@ -2,7 +2,7 @@
 This file contains wrappers for infi.clickhouse_orm engines to use in django-clickhouse
 """
 import datetime
-from typing import List, Type, Union, Iterable, Generator
+from typing import List, Type, Union, Iterable, Generator, Optional
 
 from django.db.models import Model as DjangoModel
 from infi.clickhouse_orm import engines as infi_engines
@@ -14,8 +14,7 @@ from .utils import format_datetime
 
 class InsertOnlyEngineMixin:
-    def get_insert_batch(self, model_cls, objects):
-        # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple]
+    def get_insert_batch(self, model_cls: Type['ClickHouseModel'], objects: List[DjangoModel]) -> Iterable[tuple]:
         """
         Gets a list of model_cls instances to insert into database
         :param model_cls: ClickHouseModel subclass to import
@@ -69,8 +68,8 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
                            max_date=max_date, object_pks=','.join(object_pks))
         return connections[db_alias].select_tuples(query, model_cls)
 
-    def get_final_versions(self, model_cls, objects, date_col=None):
-        # type: (Type['ClickHouseModel'], Iterable[DjangoModel], str) -> Generator[tuple]
+    def get_final_versions(self, model_cls: Type['ClickHouseModel'], objects: Iterable[DjangoModel],
+                           date_col: Optional[str] = None) -> Iterable[tuple]:
         """
         Get objects, that are currently stored in ClickHouse.
         Depending on the partition key this can be different for different models.
@@ -82,7 +81,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
         :return: A generator of named tuples, representing previous state
         """
-        def _dt_to_str(dt):  # type: (Union[datetime.date, datetime.datetime]) -> str
+        def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
             if isinstance(dt, datetime.datetime):
                 return format_datetime(dt, 0, db_alias=db_alias)
             elif isinstance(dt, datetime.date):
@@ -123,8 +122,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
         else:
             return self._get_final_versions_by_final(*params)
 
-    def get_insert_batch(self, model_cls, objects):
-        # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple]
+    def get_insert_batch(self, model_cls: Type['ClickHouseModel'], objects: List[DjangoModel]) -> Iterable[tuple]:
         """
         Gets a list of model_cls instances to insert into database
         :param model_cls: ClickHouseModel subclass to import
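
For reference, a hedged sketch of a model wired to the CollapsingMergeTree wrapper; the field set and the sign column follow infi.clickhouse_orm conventions and are assumptions here, not taken from this diff:

    from infi.clickhouse_orm import fields
    from django_clickhouse import engines
    from django_clickhouse.clickhouse_models import ClickHouseModel


    class ClickHouseUser(ClickHouseModel):
        django_model = User  # assumed Django model

        id = fields.UInt32Field()
        registered = fields.DateField()
        sign = fields.Int8Field()  # collapsing marker used by get_final_versions()

        engine = engines.CollapsingMergeTree('registered', ('id',), 'sign')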

View File

@@ -23,7 +23,7 @@ class Migration:
     """
     operations = []
 
-    def apply(self, db_alias, database=None):  # type: (str, Optional[Database]) -> None
+    def apply(self, db_alias: str, database: Optional[Database] = None) -> None:
         """
         Applies migration to given database
         :param db_alias: Database alias to apply migration to
@@ -37,12 +37,11 @@ class Migration:
             model_class = getattr(op, 'model_class', None)
             hints = getattr(op, 'hints', {})
 
-            if db_router.allow_migrate(db_alias, self.__module__, op, model=model_class, **hints):
+            if db_router.allow_migrate(db_alias, self.__module__, op, model_class, **hints):
                 op.apply(database)
 
 
-def migrate_app(app_label, db_alias, up_to=9999, database=None):
-    # type: (str, str, int, Optional[Database]) -> None
+def migrate_app(app_label: str, db_alias: str, up_to: int = 9999, database: Optional[Database] = None) -> None:
     """
     Migrates given django app
     :param app_label: App label to migrate
@@ -110,7 +109,7 @@ class MigrationHistory(ClickHouseModel):
     engine = MergeTree('applied', ('db_alias', 'package_name', 'module_name'))
 
     @classmethod
-    def set_migration_applied(cls, db_alias, migrations_package, name):  # type: (str, str, str) -> None
+    def set_migration_applied(cls, db_alias: str, migrations_package: str, name: str) -> None:
         """
         Sets migration apply status
         :param db_alias: Database alias migration is applied to
@@ -126,7 +125,7 @@ class MigrationHistory(ClickHouseModel):
                    applied=datetime.date.today())
 
     @classmethod
-    def get_applied_migrations(cls, db_alias, migrations_package):  # type: (str, str) -> Set[str]
+    def get_applied_migrations(cls, db_alias: str, migrations_package: str) -> Set[str]:
         """
         Returns applied migrations names
         :param db_alias: Database alias, to check
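
A hedged sketch of a migration module this machinery applies; the file path and model name are illustrative, and the operation classes come from infi.clickhouse_orm:

    # myapp/clickhouse_migrations/0001_initial.py
    from infi.clickhouse_orm import migrations as infi_migrations

    from django_clickhouse.migrations import Migration as BaseMigration
    from myapp.clickhouse_models import ClickHouseUser  # assumed model


    class Migration(BaseMigration):
        operations = [
            infi_migrations.CreateTable(ClickHouseUser),
        ]

    # Applied with: migrate_app('myapp', 'default')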

View File

@@ -7,18 +7,17 @@ from typing import Optional, Any, Type, Set
 
 import six
 from django.db import transaction
-from django.db.models import Manager as DjangoManager
+from django.db.models import QuerySet as DjangoQuerySet, Model as DjangoModel, Manager as DjangoManager
 from django.db.models.manager import BaseManager
 from django.db.models.signals import post_save, post_delete
 from django.dispatch import receiver
-from django.db.models import QuerySet as DjangoQuerySet, Model as DjangoModel
 from statsd.defaults.django import statsd
 
+from .compatibility import update_returning_pk
 from .configuration import config
 from .storages import Storage
 from .utils import lazy_class_import
 
 try:
     from django_pg_returning.manager import UpdateReturningMixin
 except ImportError:
@@ -34,9 +33,9 @@ except ImportError:
 
 class ClickHouseSyncRegisterMixin:
-    def _register_ops(self, operation, result):
+    def _register_ops(self, operation, result, as_int: bool = False):
         pk_name = self.model._meta.pk.name
-        pk_list = [getattr(item, pk_name) for item in result]
+        pk_list = [getattr(item, pk_name) if isinstance(item, DjangoModel) else item for item in result]
         self.model.register_clickhouse_operations(operation, *pk_list, using=self.db)
@@ -72,35 +71,51 @@ class ClickHouseSyncBulkUpdateQuerySetMixin(ClickHouseSyncRegisterMixin, BulkUpdateManagerMixin):
         return returning
 
-    def bulk_update(self, *args, **kwargs):
+    def _decorate_method(self, name: str, operation: str, args, kwargs):
+        if not hasattr(super(), name):
+            raise AttributeError("QuerySet has no attribute %s. Is django-pg-bulk-update library installed?" % name)
+
+        func = getattr(super(), name)
         original_returning = kwargs.pop('returning', None)
         kwargs['returning'] = self._update_returning_param(original_returning)
-        result = super().bulk_update(*args, **kwargs)
-        self._register_ops('update', result)
+        result = func(*args, **kwargs)
+        self._register_ops(operation, result)
         return result.count() if original_returning is None else result
 
-    def bulk_update_or_create(self, *args, **kwargs):
-        original_returning = kwargs.pop('returning', None)
-        kwargs['returning'] = self._update_returning_param(original_returning)
-        result = super().bulk_update_or_create(*args, **kwargs)
-        self._register_ops('update', result)
-        return result.count() if original_returning is None else result
+    def pg_bulk_update(self, *args, **kwargs):
+        return self._decorate_method('pg_bulk_update', 'update', args, kwargs)
+
+    def pg_bulk_update_or_create(self, *args, **kwargs):
+        return self._decorate_method('pg_bulk_update_or_create', 'update', args, kwargs)
+
+    def pg_bulk_create(self, *args, **kwargs):
+        return self._decorate_method('pg_bulk_create', 'insert', args, kwargs)
 
 
 class ClickHouseSyncQuerySetMixin(ClickHouseSyncRegisterMixin):
     def update(self, **kwargs):
-        # BUG I use update_returning method here. But it is not suitable for databases other then PostgreSQL
-        # and requires django-pg-update-returning installed
-        pk_name = self.model._meta.pk.name
-        res = self.only(pk_name).update_returning(**kwargs)
-        self._register_ops('update', res)
-        return len(res)
+        pks = update_returning_pk(self, kwargs)
+        self._register_ops('update', pks)
+        return len(pks)
 
     def bulk_create(self, objs, batch_size=None):
         objs = super().bulk_create(objs, batch_size=batch_size)
         self._register_ops('insert', objs)
         return objs
 
+    def bulk_update(self, objs, *args, **kwargs):
+        objs = list(objs)
+
+        # No need to register anything, if there are no objects.
+        # If objects are not models, django-pg-bulk-update method is called and pg_bulk_update will register items
+        if len(objs) == 0 or not isinstance(objs[0], DjangoModel):
+            return super().bulk_update(objs, *args, **kwargs)
+
+        # Native django bulk_update requires each object to have a primary key
+        res = super().bulk_update(objs, *args, **kwargs)
+        self._register_ops('update', objs)
+        return res
+
 
 # I add library dependant mixins to base classes only if libraries are installed
 qs_bases = [ClickHouseSyncQuerySetMixin]
@@ -131,7 +146,7 @@ class ClickHouseSyncModel(DjangoModel):
         abstract = True
 
     @classmethod
-    def get_clickhouse_storage(cls):  # type: () -> Storage
+    def get_clickhouse_storage(cls) -> Storage:
         """
         Returns Storage instance to save clickhouse sync data to
         :return:
@@ -140,8 +155,7 @@ class ClickHouseSyncModel(DjangoModel):
         return storage_cls()
 
     @classmethod
-    def register_clickhouse_sync_model(cls, model_cls):
-        # type: (Type['django_clickhouse.clickhouse_models.ClickHouseModel']) -> None
+    def register_clickhouse_sync_model(cls, model_cls: Type['ClickHouseModel']) -> None:
         """
         Registers ClickHouse model to listen to this model updates
         :param model_cls: Model class to register
@@ -153,7 +167,7 @@ class ClickHouseSyncModel(DjangoModel):
         cls._clickhouse_sync_models.add(model_cls)
 
     @classmethod
-    def get_clickhouse_sync_models(cls):  # type: () -> Set['django_clickhouse.clickhouse_models.ClickHouseModel']
+    def get_clickhouse_sync_models(cls) -> Set['ClickHouseModel']:
         """
         Returns all clickhouse models, listening to this class
         :return: A set of model classes to sync
@@ -161,8 +175,7 @@ class ClickHouseSyncModel(DjangoModel):
         return getattr(cls, '_clickhouse_sync_models', set())
 
     @classmethod
-    def register_clickhouse_operations(cls, operation, *model_pks, using=None):
-        # type: (str, *Any, Optional[str]) -> None
+    def register_clickhouse_operations(cls, operation: str, *model_pks: Any, using: Optional[str] = None) -> None:
         """
         Registers model operation in storage
         :param operation: Operation type - one of [insert, update, delete]
@@ -170,7 +183,7 @@ class ClickHouseSyncModel(DjangoModel):
         :param using: Database alias registered instances are from
         :return: None
         """
-        model_pks = ['%s.%d' % (using or config.DEFAULT_DB_ALIAS, pk) for pk in model_pks]
+        model_pks = ['%s.%s' % (using or config.DEFAULT_DB_ALIAS, pk) for pk in model_pks]
 
         def _on_commit():
             for model_cls in cls.get_clickhouse_sync_models():
@@ -181,16 +194,16 @@ class ClickHouseSyncModel(DjangoModel):
         storage = cls.get_clickhouse_storage()
         transaction.on_commit(_on_commit, using=using)
 
-    def post_save(self, created, using=None):  # type: (bool, Optional[str]) -> None
+    def post_save(self, created: bool, using: Optional[str] = None) -> None:
         self.register_clickhouse_operations('insert' if created else 'update', self.pk, using=using)
 
-    def post_delete(self, using=None):  # type: (Optional[str]) -> None
+    def post_delete(self, using: Optional[str] = None) -> None:
         self.register_clickhouse_operations('delete', self.pk, using=using)
 
 
 @receiver(post_save)
 def post_save(sender, instance, **kwargs):
-    statsd.incr('clickhouse.sync.post_save'.format('post_save'), 1)
+    statsd.incr('%s.sync.post_save' % config.STATSD_PREFIX, 1)
     if issubclass(sender, ClickHouseSyncModel):
         instance.post_save(kwargs.get('created', False), using=kwargs.get('using'))
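
The net effect, sketched on an assumed model (names illustrative): every ORM write path now funnels through _register_ops, so the storage receives a queue of '<db_alias>.<pk>' operations once the surrounding transaction commits:

    from django.db import models
    from django_clickhouse.models import ClickHouseSyncModel


    class User(ClickHouseSyncModel):
        first_name = models.CharField(max_length=50)
        visits = models.IntegerField(default=0)


    user = User.objects.create(first_name='Alice')    # registers ('insert', 'default.1')
    User.objects.filter(pk=user.pk).update(visits=1)  # registers ('update', 'default.1')
    user.delete()                                     # registers ('delete', 'default.1')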

View File

@@ -1,4 +1,4 @@
-from typing import Optional, Iterable, List
+from typing import Optional, Iterable, List, Type
 from copy import copy
 
 from infi.clickhouse_orm.database import Database
@@ -13,22 +13,22 @@ class QuerySet(InfiQuerySet):
     Basic QuerySet to use
     """
 
-    def __init__(self, model_cls, database=None):  # type: (Type[InfiModel], Optional[Database]) -> None
+    def __init__(self, model_cls: Type[InfiModel], database: Optional[Database] = None) -> None:
         super(QuerySet, self).__init__(model_cls, database)
         self._db_alias = None
 
     @property
-    def _database(self):  # type: () -> Database
+    def _database(self) -> Database:
         # HACK for correct work of all infi.clickhouse-orm methods
         # There are no write QuerySet methods now, so I use for_write=False by default
         return self.get_database(for_write=False)
 
     @_database.setter
-    def _database(self, database):  # type: (Database) -> None
+    def _database(self, database: Database) -> None:
         # HACK for correct work of all infi.clickhouse-orm methods
         self._db = database
 
-    def get_database(self, for_write=False):  # type: (bool) -> Database
+    def get_database(self, for_write: bool = False) -> Database:
         """
         Gets database to execute query on. Looks for constructor or using() method.
         If nothing was set tries to get database from model class using router.
@@ -43,7 +43,7 @@ class QuerySet(InfiQuerySet):
 
         return self._db
 
-    def using(self, db_alias):  # type: (str) -> QuerySet
+    def using(self, db_alias: str) -> 'QuerySet':
         """
         Sets database alias to use for this query
         :param db_alias: Database alias name from CLICKHOUSE_DATABASES config option
@@ -54,7 +54,7 @@ class QuerySet(InfiQuerySet):
         qs._db = None  # Previous database should be forgotten
         return qs
 
-    def all(self):  # type: () -> QuerySet
+    def all(self) -> 'QuerySet':
         """
         Returns all items of queryset
         :return: QuerySet
@@ -70,7 +70,7 @@ class QuerySet(InfiQuerySet):
         self.get_database(for_write=True).insert([instance])
         return instance
 
-    def bulk_create(self, model_instances, batch_size=1000):  # type: (Iterable[InfiModel], int) -> List[InfiModel]
+    def bulk_create(self, model_instances: Iterable[InfiModel], batch_size: int = 1000) -> List[InfiModel]:
         self.get_database(for_write=True).insert(model_instances=model_instances, batch_size=batch_size)
         return list(model_instances)
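
A brief usage sketch for the annotated QuerySet (ClickHouseUser is an assumed ClickHouseModel subclass):

    qs = QuerySet(ClickHouseUser).using('default')

    # Reads resolve their database lazily via get_database(for_write=False)
    users = list(qs.all())

    # Writes ask the router for a write alias instead
    qs.bulk_create([ClickHouseUser(id=1)], batch_size=1000)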

View File

@@ -1,7 +1,7 @@
 """
 This file defines router to find appropriate database
 """
-from typing import Optional
+from typing import Type
 
 import random
 import six
@@ -13,8 +13,7 @@ from .utils import lazy_class_import
 
 class DefaultRouter:
-    def db_for_read(self, model, **hints):
-        # type: (ClickHouseModel, **dict) -> str
+    def db_for_read(self, model: Type[ClickHouseModel], **hints) -> str:
         """
         Gets database to read from for model
         :param model: Model to decide for
@@ -23,8 +22,7 @@ class DefaultRouter:
         """
         return random.choice(model.read_db_aliases)
 
-    def db_for_write(self, model, **hints):
-        # type: (ClickHouseModel, **dict) -> str
+    def db_for_write(self, model: Type[ClickHouseModel], **hints) -> str:
         """
         Gets database to write to for model
         :param model: Model to decide for
@@ -33,8 +31,8 @@ class DefaultRouter:
         """
         return random.choice(model.write_db_aliases)
 
-    def allow_migrate(self, db_alias, app_label, operation, model=None, **hints):
-        # type: (str, str, Operation, Optional[ClickHouseModel], **dict) -> bool
+    def allow_migrate(self, db_alias: str, app_label: str, operation: Operation,
+                      model=None, **hints) -> bool:
         """
         Checks if migration can be applied to given database
         :param db_alias: Database alias to check
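
Swapping in custom routing is a matter of subclassing; a hedged sketch (the 'replica' alias and the CLICKHOUSE_DATABASE_ROUTER setting name are assumptions based on the library's configuration pattern):

    class ReadReplicaRouter(DefaultRouter):
        def db_for_read(self, model, **hints):
            # Pin analytical reads to a dedicated replica alias
            return 'replica'

    # settings.py: CLICKHOUSE_DATABASE_ROUTER = 'myapp.routers.ReadReplicaRouter'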

View File

@@ -1,4 +1,4 @@
-from typing import NamedTuple
+from typing import NamedTuple, Optional, Iterable, Type
 
 import pytz
 from django.db.models import Model as DjangoModel
@@ -7,7 +7,19 @@ from .utils import model_to_dict
 
 class Django2ClickHouseModelSerializer:
-    def __init__(self, model_cls, fields=None, exclude_fields=None, writable=False, defaults=None):
+    def __init__(self, model_cls: Type['ClickHouseModel'], fields: Optional[Iterable[str]] = None,
+                 exclude_fields: Optional[Iterable[str]] = None, writable: bool = False,
+                 defaults: Optional[dict] = None) -> None:
+        """
+        Initializes serializer
+        :param model_cls: ClickHouseModel subclass to serialize to
+        :param fields: Optional. A list of fields to add into result tuple
+        :param exclude_fields: Fields to exclude from result tuple
+        :param writable: If fields parameter is not set directly,
+            this flag determines if only writable or all fields should be taken from model_cls
+        :param defaults: A dictionary of field: value which are taken as default values for model_cls instances
+        :return: None
+        """
         self._model_cls = model_cls
         if fields is not None:
             self.serialize_fields = fields
@@ -18,7 +30,7 @@ class Django2ClickHouseModelSerializer:
         self._result_class = self._model_cls.get_tuple_class(defaults=defaults)
         self._fields = self._model_cls.fields(writable=False)
 
-    def _get_serialize_kwargs(self, obj):
+    def _get_serialize_kwargs(self, obj: DjangoModel) -> dict:
         data = model_to_dict(obj, fields=self.serialize_fields, exclude_fields=self.exclude_serialize_fields)
 
         # Remove None values, they should be initialized as defaults
@@ -29,5 +41,5 @@ class Django2ClickHouseModelSerializer:
 
         return result
 
-    def serialize(self, obj):  # type: (DjangoModel) -> NamedTuple
+    def serialize(self, obj: DjangoModel) -> NamedTuple:
         return self._result_class(**self._get_serialize_kwargs(obj))
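
Custom serialization hooks in by subclassing and pointing a model's django_model_serializer attribute at the subclass; a hedged sketch with assumed field names:

    class ClickHouseUserSerializer(Django2ClickHouseModelSerializer):
        def _get_serialize_kwargs(self, obj):
            kwargs = super()._get_serialize_kwargs(obj)
            # Add a derived value that is absent on the Django model
            kwargs['full_name'] = '%s %s' % (obj.first_name, obj.last_name)
            return kwargs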

View File

@@ -39,7 +39,7 @@ class Storage:
     But ClickHouse is idempotent to duplicate inserts. So we can insert one batch twice correctly.
     """
 
-    def pre_sync(self, import_key, **kwargs):  # type: (str, **dict) -> None
+    def pre_sync(self, import_key: str, **kwargs) -> None:
         """
         This method is called before import process starts
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -48,7 +48,7 @@ class Storage:
         """
         pass
 
-    def post_sync(self, import_key, **kwargs):  # type: (str, **dict) -> None
+    def post_sync(self, import_key: str, **kwargs) -> None:
         """
         This method is called after import process has finished.
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -57,7 +57,7 @@ class Storage:
         """
         pass
 
-    def post_sync_failed(self, import_key, **kwargs):  # type: (str, **dict) -> None
+    def post_sync_failed(self, import_key: str, **kwargs) -> None:
         """
         This method is called after import process has finished with exception.
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -66,7 +66,7 @@ class Storage:
         """
         pass
 
-    def post_batch_removed(self, import_key, batch_size):  # type: (str, int) -> None
+    def post_batch_removed(self, import_key: str, batch_size: int) -> None:
         """
         This method marks that batch has been removed in statsd
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -76,8 +76,7 @@ class Storage:
         key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
         statsd.gauge(key, self.operations_count(import_key))
 
-    def operations_count(self, import_key, **kwargs):
-        # type: (str, **dict) -> int
+    def operations_count(self, import_key: str, **kwargs) -> int:
         """
         Returns sync queue size
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -86,8 +85,7 @@ class Storage:
         """
         raise NotImplementedError()
 
-    def get_operations(self, import_key, count, **kwargs):
-        # type: (str, int, **dict) -> List[Tuple[str, str]]
+    def get_operations(self, import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]:
         """
         Must return a list of operations on the model.
         Method should be error safe - if something goes wrong, import data should not be lost.
@@ -98,7 +96,7 @@ class Storage:
         """
         raise NotImplementedError()
 
-    def register_operations(self, import_key, operation, *pks):  # type: (str, str, *Any) -> int
+    def register_operations(self, import_key: str, operation: str, *pks: Any) -> int:
         """
         Registers new incoming operation
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -108,8 +106,7 @@ class Storage:
         """
         raise NotImplementedError()
 
-    def register_operations_wrapped(self, import_key, operation, *pks):
-        # type: (str, str, *Any) -> int
+    def register_operations_wrapped(self, import_key: str, operation: str, *pks: Any) -> int:
         """
         This is a wrapper for register_operation method, checking main parameters.
         This method should be called from inner functions.
@@ -140,14 +137,14 @@ class Storage:
         """
         raise NotImplementedError()
 
-    def get_last_sync_time(self, import_key):  # type: (str) -> Optional[datetime.datetime]
+    def get_last_sync_time(self, import_key: str) -> Optional[datetime.datetime]:
         """
         Gets the last time sync has been executed
         :return: datetime.datetime if a sync has happened before. Otherwise - None.
         """
         raise NotImplementedError()
 
-    def set_last_sync_time(self, import_key, dt):  # type: (str, datetime.datetime) -> None
+    def set_last_sync_time(self, import_key: str, dt: datetime.datetime) -> None:
         """
         Sets successful sync time
         :return: None
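
The contract in action, assuming the Redis-backed implementation shipped with the library (key and pks are illustrative):

    from django_clickhouse.storages import RedisStorage

    storage = RedisStorage()
    storage.register_operations_wrapped('user', 'insert', 'default.1', 'default.2')
    assert storage.operations_count('user') == 2

    # [('insert', 'default.1'), ('insert', 'default.2')]
    ops = storage.get_operations('user', 10)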

View File

@@ -11,14 +11,14 @@ from .utils import get_subclasses
 
 @shared_task(queue=config.CELERY_QUEUE)
-def sync_clickhouse_model(cls):  # type: (ClickHouseModel) -> None
+def sync_clickhouse_model(model_cls) -> None:
     """
     Syncs one batch of given ClickHouseModel
-    :param cls: ClickHouseModel subclass
+    :param model_cls: ClickHouseModel subclass
     :return: None
     """
-    cls.get_storage().set_last_sync_time(cls.get_import_key(), datetime.datetime.now())
-    cls.sync_batch_from_storage()
+    model_cls.get_storage().set_last_sync_time(model_cls.get_import_key(), datetime.datetime.now())
+    model_cls.sync_batch_from_storage()
 
 
 @shared_task(queue=config.CELERY_QUEUE)
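
A hedged sketch of scheduling the periodic entry point with celery beat; the clickhouse_auto_sync task name follows the library's docs, and the two-second interval is illustrative:

    from datetime import timedelta

    CELERYBEAT_SCHEDULE = {
        'clickhouse_auto_sync': {
            'task': 'django_clickhouse.tasks.clickhouse_auto_sync',
            'schedule': timedelta(seconds=2),  # spawns sync_clickhouse_model tasks
        }
    }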

View File

@@ -18,7 +18,7 @@ from .database import connections
 T = TypeVar('T')
 
 
-def get_tz_offset(db_alias=None):  # type: (Optional[str]) -> int
+def get_tz_offset(db_alias: Optional[str] = None) -> int:
     """
     Returns ClickHouse server timezone offset in minutes
     :param db_alias: The database alias used
@@ -28,8 +28,8 @@ def get_tz_offset(db_alias=None):
     return int(db.server_timezone.utcoffset(datetime.datetime.utcnow()).total_seconds() / 60)
 
 
-def format_datetime(dt, timezone_offset=0, day_end=False, db_alias=None):
-    # type: (Union[datetime.date, datetime.datetime], int, bool, Optional[str]) -> str
+def format_datetime(dt: Union[datetime.date, datetime.datetime], timezone_offset: int = 0, day_end: bool = False,
+                    db_alias: Optional[str] = None) -> str:
     """
     Formats datetime and date objects to format that can be used in WHERE conditions of query
     :param dt: datetime.datetime or datetime.date object
@@ -58,9 +58,9 @@ def format_datetime(dt, timezone_offset=0, day_end=False, db_alias=None):
     return server_dt.strftime("%Y-%m-%d %H:%M:%S")
 
 
-def module_exists(module_name):  # type: (str) -> bool
+def module_exists(module_name: str) -> bool:
     """
-    Checks if moudle exists
+    Checks if module exists
     :param module_name: Dot-separated module name
     :return: Boolean
     """
@@ -69,7 +69,7 @@ def module_exists(module_name):
     return spam_spec is not None
 
 
-def lazy_class_import(obj):  # type: (Union[str, Any]) -> Any
+def lazy_class_import(obj: Union[str, Any]) -> Any:
     """
     If string is given, imports object by given module path.
     Otherwise returns the object
@@ -88,7 +88,7 @@ def lazy_class_import(obj):
     return obj
 
 
-def get_subclasses(cls, recursive=False):  # type: (T, bool) -> Set[T]
+def get_subclasses(cls: T, recursive: bool = False) -> Set[T]:
     """
     Gets all subclasses of given class
     Attention!!! Classes would be found only if they were imported before using this function
@@ -105,8 +105,8 @@ def get_subclasses(cls, recursive=False):
     return subclasses
 
 
-def model_to_dict(instance, fields=None, exclude_fields=None):
-    # type: (DjangoModel, Optional[Iterable[str]], Optional[Iterable[str]]) -> Dict[str, Any]
+def model_to_dict(instance: DjangoModel, fields: Optional[Iterable[str]] = None,
+                  exclude_fields: Optional[Iterable[str]] = None) -> Dict[str, Any]:
     """
     Standard model_to_dict ignores some fields if they have invalid naming
     :param instance: Object to convert to dictionary
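
A small demonstration of lazy_class_import, which lets settings hold either a dotted path or the object itself (the storage path is illustrative):

    from django_clickhouse.utils import lazy_class_import

    storage_cls = lazy_class_import('django_clickhouse.storages.RedisStorage')
    assert storage_cls is lazy_class_import(storage_cls)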

View File

@@ -2,10 +2,15 @@
 This file contains sample models to use in tests
 """
 from django.db import models
+from django.db.models import QuerySet
 from django.db.models.manager import BaseManager
 from django_pg_returning import UpdateReturningModel
 
-from django_clickhouse.models import ClickHouseSyncModel, ClickHouseSyncQuerySet
+from django_clickhouse.models import ClickHouseSyncModel, ClickHouseSyncQuerySet, ClickHouseSyncQuerySetMixin
+
+
+class NativeQuerySet(ClickHouseSyncQuerySetMixin, QuerySet):
+    pass
 
 
 class TestQuerySet(ClickHouseSyncQuerySet):
@@ -16,8 +21,13 @@ class TestManager(BaseManager.from_queryset(TestQuerySet)):
     pass
 
 
+class NativeManager(BaseManager.from_queryset(NativeQuerySet)):
+    pass
+
+
 class TestModel(UpdateReturningModel, ClickHouseSyncModel):
     objects = TestManager()
+    native_objects = NativeManager()
 
     value = models.IntegerField()
     created_date = models.DateField()
@@ -26,6 +36,7 @@ class TestModel(UpdateReturningModel, ClickHouseSyncModel):
 
 class SecondaryTestModel(UpdateReturningModel, ClickHouseSyncModel):
     objects = TestManager()
+    native_objects = NativeManager()
 
     value = models.IntegerField()
     created_date = models.DateField()

View File

@@ -1,3 +1,6 @@
+import sys
+from unittest import skipIf
+
 from django.test import TestCase
 
 from django_clickhouse.compatibility import namedtuple
@@ -10,12 +13,16 @@ class NamedTupleTest(TestCase):
         self.assertTupleEqual((1, 2, 4), tuple(TestTuple(1, 2, 4)))
         self.assertTupleEqual((1, 2, 4), tuple(TestTuple(a=1, b=2, c=4)))
 
-    def test_exceptions(self):
+    @skipIf(sys.version_info < (3, 7),
+            "On python < 3.7 this error is not raised, as not given defaults are filled by None")
+    def test_no_required_value(self):
         TestTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults=[3])
-        # BUG On python < 3.7 this error is not raised, as not given defaults are filled by None
-        # with self.assertRaises(TypeError):
-        #     TestTuple(b=1, c=4)
+        with self.assertRaises(TypeError):
+            TestTuple(b=1, c=4)
 
+    def test_duplicate_value(self):
+        TestTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults=[3])
         with self.assertRaises(TypeError):
             TestTuple(1, 2, 3, c=4)
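
The wrapper under test gives every supported Python the 3.7 behaviour; a quick illustration:

    from django_clickhouse.compatibility import namedtuple

    Point = namedtuple('Point', ('x', 'y', 'z'), defaults=[0])
    assert Point(1, 2) == Point(x=1, y=2, z=0)  # trailing default fills z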

View File

@@ -1,6 +1,9 @@
 import datetime
+from unittest import skipIf
 
+import django
 from django.test import TransactionTestCase
+from django.utils.timezone import now
 
 from tests.clickhouse_models import ClickHouseTestModel, ClickHouseSecondTestModel, ClickHouseCollapseTestModel, \
     ClickHouseMultiTestModel
@@ -60,6 +63,76 @@ class TestOperations(TransactionTestCase):
         self.assertSetEqual({('insert', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
                             set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
 
+    @skipIf(django.VERSION < (2, 2), "bulk_update method has been introduced in django 2.2")
+    def test_native_bulk_update(self):
+        items = list(self.django_model.objects.filter(pk__in={1, 2}))
+        for instance in items:
+            instance.value = instance.pk * 10
+
+        self.django_model.native_objects.bulk_update(items, ['value'])
+
+        items = list(self.django_model.objects.filter(pk__in={1, 2}))
+        self.assertEqual(2, len(items))
+        for instance in items:
+            self.assertEqual(instance.value, instance.pk * 10)
+
+        self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
+                            set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
+
+    def test_pg_bulk_create(self):
+        now_dt = now()
+        res = self.django_model.objects.pg_bulk_create([
+            {'value': i, 'created': now_dt, 'created_date': now_dt.date()}
+            for i in range(5)
+        ])
+        self.assertEqual(5, res)
+
+        items = list(self.django_model.objects.filter(value__lt=100).order_by('value'))
+        self.assertEqual(5, len(items))
+        for i, instance in enumerate(items):
+            self.assertEqual(instance.created, now_dt)
+            self.assertEqual(instance.created_date, now_dt.date())
+            self.assertEqual(i, instance.value)
+
+        self.assertSetEqual({('insert', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
+                            set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
+
+    def test_pg_bulk_update(self):
+        items = list(self.django_model.objects.filter(pk__in={1, 2}))
+
+        self.django_model.objects.pg_bulk_update([
+            {'id': instance.pk, 'value': instance.pk * 10}
+            for instance in items
+        ])
+
+        items = list(self.django_model.objects.filter(pk__in={1, 2}))
+        self.assertEqual(2, len(items))
+        for instance in items:
+            self.assertEqual(instance.value, instance.pk * 10)
+
+        self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
+                            set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
+
+    def test_pg_bulk_update_or_create(self):
+        items = list(self.django_model.objects.filter(pk__in={1, 2}))
+
+        data = [{
+            'id': instance.pk,
+            'value': instance.pk * 10,
+            'created_date': instance.created_date,
+            'created': instance.created
+        } for instance in items] + [{'id': 11, 'value': 110, 'created_date': datetime.date.today(), 'created': now()}]
+
+        self.django_model.objects.pg_bulk_update_or_create(data)
+
+        items = list(self.django_model.objects.filter(pk__in={1, 2, 11}))
+        self.assertEqual(3, len(items))
+        for instance in items:
+            self.assertEqual(instance.value, instance.pk * 10)
+
+        self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
+                            set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
+
     def test_get_or_create(self):
         instance, created = self.django_model.objects. \
             get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'created': datetime.datetime.now(),