Mirror of https://github.com/carrotquest/django-clickhouse.git
Synced 2025-04-15 14:22:01 +03:00

Compare commits (22 commits)
Commit SHA1s:
3b79ad697a, 95f1bdb26a, 85d27a65f9, 43a1821408, 1b247e95fe, 244b4fbd37,
0ac4615649, ee723fca2a, b70ec45edf, fc362e852f, b2cb098349, 9fb93060ef,
ac8d34c261, 01cec7c999, 2beb449b67, 404584fc01, 0e65f15333, 12069db14e,
0a7f0c1219, 25b7d26f84, 612239d586, 5a8e2ec7a4
.github/workflows/python-tests.yml (vendored, 52 lines changed)
@@ -11,12 +11,56 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ["3.6", "3.7", "3.8", "3.9"]
-        postgres-version: ["9.6", "10", "11", "12"]
-        django-version: ["2.1", "2.2", "3.0", "3.1", "3.2"]
+        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
+        postgres-version: ["9.6", "10", "11", "12", "13", "14", "15", "16"]
+        django-version: ["3.2", "4.0", "4.1", "4.2", "5.0", "5.1"]
+        clickhouse-version: ["latest"]
+        redis-version: ["latest"]
+
+        exclude:
+          # Django 4.0+ doesn't support PostgreSQL 9.6
+          - django-version: "4.0"
+            postgres-version: "9.6"
+          - django-version: "4.1"
+            postgres-version: "9.6"
+          - django-version: "4.2"
+            postgres-version: "9.6"
+          - django-version: "5.0"
+            postgres-version: "9.6"
+          - django-version: "5.1"
+            postgres-version: "9.6"
+
+          # Django 4.1+ doesn't support PostgreSQL 10
+          - django-version: "4.1"
+            postgres-version: "10"
+          - django-version: "4.2"
+            postgres-version: "10"
+          - django-version: "5.0"
+            postgres-version: "10"
+          - django-version: "5.1"
+            postgres-version: "10"
+
+          # Django 4.2+ doesn't support PostgreSQL 11
+          - django-version: "4.2"
+            postgres-version: "11"
+          - django-version: "5.0"
+            postgres-version: "11"
+          - django-version: "5.1"
+            postgres-version: "11"
+
+          # Django 5.1+ doesn't support PostgreSQL 12
+          - django-version: "5.1"
+            postgres-version: "12"
+
+          # Django 5.0+ does not support python 3.8, 3.9
+          - django-version: "5.0"
+            python-version: "3.8"
+          - django-version: "5.0"
+            python-version: "3.9"
+          - django-version: "5.1"
+            python-version: "3.8"
+          - django-version: "5.1"
+            python-version: "3.9"
 
     services:
       postgres:
         image: postgres:${{ matrix.postgres-version }}
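The `exclude` entries above prune unsupported combinations from the test matrix before any jobs are launched. A rough sketch of the semantics (the axes and values here are illustrative, not read from the workflow):

```python
# Sketch of how a CI build matrix with excludes expands: the cartesian
# product of all axes, minus every combination matched by an exclude entry.
from itertools import product


def build_matrix(axes, excludes):
    combos = [dict(zip(axes, values)) for values in product(*axes.values())]
    return [c for c in combos
            if not any(all(c.get(k) == v for k, v in e.items()) for e in excludes)]


axes = {'django-version': ['4.2', '5.1'], 'postgres-version': ['9.6', '16']}
excludes = [{'django-version': '4.2', 'postgres-version': '9.6'},
            {'django-version': '5.1', 'postgres-version': '9.6'}]

# Both PostgreSQL 9.6 combinations are excluded, leaving the two "16" jobs.
matrix = build_matrix(axes, excludes)
```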
@@ -1,2 +1,4 @@
+[](https://github.com/carrotquest/django-clickhouse/actions/workflows/python-tests.yml) [](https://github.com/carrotquest/django-clickhouse/actions/workflows/python-publish.yml) [](https://pepy.tech/project/django-clickhouse)
+
 # django-clickhouse
 Documentation is [here](docs/index.md)
@@ -24,7 +24,7 @@ services:
     build:
       context: .
       args:
-        - PYTHON_VER=latest
+        - PYTHON_IMAGE_TAG=latest
     environment:
       - REDIS_HOST=redis_db
       - PGHOST=postgres_db
@@ -57,6 +57,30 @@ By default migrations are applied to all [CLICKHOUSE_DATABASES](configuration.md
 Note: migrations are only applied with the django `default` database.
 So if you call `python manage.py migrate --database=secondary` they won't be applied.
 
+## Admin migration command
+In order to make migrations separately from django's `manage.py migrate` command,
+this library implements a custom `manage.py` command `clickhouse_migrate`.
+
+Usage:
+```bash
+python manage.py clickhouse_migrate [--help] [--database <db_alias>] [--verbosity {0,1,2,3}] [app_label] [migration_number]
+```
+
+Parameters
+* `app_label: Optional[str]` - If set, migrates only the given django application
+* `migration_number: Optional[int]` - If set, migrates the django app with `app_label` to the migration with this number.
+  **Important note**: the library currently does not support unapplying migrations.
+  If an already applied migration is given, it will do nothing.
+* `--database: Optional[str]` - If set, migrates only this database alias from the [CLICKHOUSE_DATABASES config parameter](configuration.md#clickhouse_databases)
+* `--verbosity: Optional[int] = 1` - Level of debug output. See [here](https://docs.djangoproject.com/en/3.2/ref/django-admin/#cmdoption-verbosity) for more details.
+* `--help` - Print help
+
+
+## Migration operations enhancements
+* `RunSQL`, `RunPython`
+  Can accept a `hints: dict = {}` parameter in order to set the migration database alias (`force_migrate_on_databases: List[str]` key) or model (`model: Union[str, Type[ClickHouseModel]]` key)
+
+
+## Migration algorithm
+- Get a list of databases from the `CLICKHOUSE_DATABASES` setting. Migrate them one by one.
+- Find all django apps from the `INSTALLED_APPS` setting which have no `readonly=True` attribute and have `migrate=True` attribute. Migrate them one by one.
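The `migration_number` parameter above relies on the usual `0001_xxx` module naming convention. A minimal sketch of the selection rule (the convention and function name are assumptions for illustration):

```python
# Sketch: pending migration modules are sorted by name and applied while
# their 4-digit numeric prefix does not exceed the requested number.
def select_migrations(module_names, applied, up_to=9999):
    pending = sorted(set(module_names) - set(applied))
    return [name for name in pending if int(name[:4]) <= up_to]


pending = select_migrations(['0001_initial', '0002_add_col', '0003_drop'],
                            ['0001_initial'], up_to=2)
```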
@@ -1,6 +1,6 @@
 # Usage overview
 ## Requirements
-At the begging I expect, that you already have:
+At the beginning I expect, that you already have:
 1. [ClickHouse](https://clickhouse.tech/docs/en/) (with [ZooKeeper](https://zookeeper.apache.org/), if you use replication)
 2. Relational database used with [Django](https://www.djangoproject.com/). For instance, [PostgreSQL](https://www.postgresql.org/)
 3. [Django database set up](https://docs.djangoproject.com/en/3.0/ref/databases/)
@@ -96,6 +96,19 @@ class ClickHouseUser(ClickHouseModel):
     engine = MergeTree('birthday', ('birthday',))
 ```
 
+**Important note**: the `clickhouse_model.py` file is not imported by django initialization code in any way. So if your models are not used anywhere except this file, you should import it somewhere in your code if you want synchronization to work correctly. For instance, you can customise [AppConfig](https://docs.djangoproject.com/en/5.0/ref/applications/#django.apps.AppConfig.ready) like:
+
+```python
+from django.apps import AppConfig
+
+
+class MyAppConfig(AppConfig):
+    name = 'my_app'
+
+    def ready(self):
+        from my_app.clickhouse_models import ClickHouseUser
+```
+
 ## Migration to create table in ClickHouse
 1. Read [migrations](migrations.md) section
 2. Create `clickhouse_migrations` package in your django app
@@ -112,7 +125,7 @@ class ClickHouseUser(ClickHouseModel):
 4. Add content to file `0001_initial.py`:
 ```python
 from django_clickhouse import migrations
-from my_app.cilckhouse_models import ClickHouseUser
+from my_app.clickhouse_models import ClickHouseUser
 
 class Migration(migrations.Migration):
     operations = [
@@ -25,12 +25,17 @@ Router is a class, defining 3 methods:
   Returns `database alias` to use for given `model` for `SELECT` queries.
 * `def db_for_write(self, model: ClickHouseModel, **hints) -> str`
   Returns `database alias` to use for given `model` for `INSERT` queries.
-* `def allow_migrate(self, db_alias: str, app_label: str, operation: Operation, model: Optional[ClickHouseModel] = None, **hints: dict) -> bool`
+* `def allow_migrate(self, db_alias: str, app_label: str, operation: Operation, **hints: dict) -> bool`
   Checks if migration `operation` should be applied in django application `app_label` on database `db_alias`.
-  Optional `model` field can be used to determine migrations on concrete model.
+  Optional `hints` help to pass additional info which can be used to test migrations availability on a concrete model.
 
 By default [CLICKHOUSE_DATABASE_ROUTER](configuration.md#clickhouse_database_router) is used.
 It gets routing information from model fields, described below.
+It also gives the ability to use 2 kinds of hints:
+* `force_migrate_on_databases: Iterable[str]` - concrete database aliases where migration should be applied
+* `model: Type[ClickHouseModel]` - a model class to read routing attributes from.
+  Can be set as a class or its string name.
+  If a name is set, the class is searched for in the current `<app_label>.<config.MODELS_MODULE>` package.
 
 ## ClickHouseModel routing attributes
 Default database router reads routing settings from model attributes.
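The string form of the `model` hint described above is resolved against the app's models module before the class is imported. A minimal sketch of that lookup rule (module name and classes are illustrative assumptions):

```python
# Sketch of resolving a `model` hint that is either a class or a string name:
# strings are expanded to "<app_label>.<MODELS_MODULE>.<name>", classes pass through.
MODELS_MODULE = 'clickhouse_models'  # assumed config value for illustration


def resolve_model_hint(app_label, hint):
    """Return a dotted import path for string hints, or the class itself."""
    if isinstance(hint, str):
        return '%s.%s.%s' % (app_label, MODELS_MODULE, hint)
    return hint


class DummyModel:  # stand-in for a ClickHouseModel subclass
    pass
```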
setup.py (4 lines changed)

@@ -13,8 +13,8 @@ with open('requirements.txt') as f:
 
 setup(
     name='django-clickhouse',
-    version='1.0.4',
-    packages=['django_clickhouse'],
+    version='1.2.2',
+    packages=['django_clickhouse', 'django_clickhouse.management.commands'],
     package_dir={'': 'src'},
     url='https://github.com/carrotquest/django-clickhouse',
     license='BSD 3-clause "New" or "Revised" License',
@@ -9,6 +9,7 @@ from itertools import chain
 from typing import List, Tuple, Iterable, Set, Any, Optional
 
 from django.db.models import Model as DjangoModel, QuerySet as DjangoQuerySet
+from django.utils.timezone import now
 from infi.clickhouse_orm.engines import CollapsingMergeTree
 from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase
 from statsd.defaults.django import statsd
@@ -290,7 +291,7 @@ class ClickHouseModel(InfiModel, metaclass=ClickHouseModelMeta):
         res = (datetime.datetime.now() - last_sync_time).total_seconds() >= cls.get_sync_delay()
         logger.debug('django-clickhouse: need_sync returned %s for class %s as no last sync found'
                      ' (now: %s, last: %s, delay: %d)'
-                     % (res, cls.__name__, datetime.datetime.now().isoformat(), last_sync_time.isoformat(),
+                     % (res, cls.__name__, now().isoformat(), last_sync_time.isoformat(),
                         cls.get_sync_delay()))
 
         return res
@@ -2,7 +2,8 @@
 This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse
 """
 import datetime
-from typing import List, Type, Union, Iterable, Optional
+import logging
+from typing import List, Type, Union, Iterable, Optional, Tuple, NamedTuple
 
 from django.db.models import Model as DjangoModel
 from infi.clickhouse_orm import engines as infi_engines
@@ -14,6 +15,9 @@ from .database import connections
 from .utils import format_datetime
 
 
+logger = logging.getLogger('django-clickhouse')
+
+
 class InsertOnlyEngineMixin:
     def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]:
         """
@@ -45,43 +49,64 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
         self.version_col = kwargs.pop('version_col', None)
         super(CollapsingMergeTree, self).__init__(*args, **kwargs)
 
-    def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
+    def _get_final_versions_by_version(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str],
+                                       columns: str, date_range_filter: str = '') -> List[NamedTuple]:
+        """
+        Performs request to ClickHouse in order to fetch latest version for each object pk
+        :param db_alias: ClickHouse database alias used
+        :param model_cls: Model class for which data is fetched
+        :param object_pks: Objects primary keys to filter by
+        :param columns: Columns to fetch
+        :param date_range_filter: Optional date_range_filter which speeds up query if date_col is set
+        :return: List of named tuples with requested columns
+        """
+        if date_range_filter:
+            date_range_filter = 'PREWHERE {}'.format(date_range_filter)
+
         query = """
-            SELECT {columns} FROM $table WHERE (`{pk_column}`, `{version_col}`) IN (
-                SELECT `{pk_column}`, MAX(`{version_col}`)
-                FROM $table
-                PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
-                    AND `{pk_column}` IN ({object_pks})
-                GROUP BY `{pk_column}`
-            )
-        """.format(columns=','.join(columns), version_col=self.version_col, date_col=date_col, pk_column=self.pk_column,
-                   min_date=min_date, max_date=max_date, object_pks=','.join(object_pks))
+            SELECT {columns}
+            FROM $table
+            {date_range_filter}
+            WHERE `{pk_column}` IN ({object_pks})
+            ORDER BY `{pk_column}`, `{version_col}` DESC
+            LIMIT 1 BY `{pk_column}`
+        """.format(columns=','.join(columns), version_col=self.version_col, pk_column=self.pk_column,
+                   date_range_filter=date_range_filter, object_pks=','.join(object_pks), sign_col=self.sign_col)
 
         return connections[db_alias].select_tuples(query, model_cls)
 
-    def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
+    def _get_final_versions_by_final(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str],
+                                     columns: str, date_range_filter: str = '') -> List[NamedTuple]:
+        """
+        Performs request to ClickHouse in order to fetch latest version for each object pk
+        :param db_alias: ClickHouse database alias used
+        :param model_cls: Model class for which data is fetched
+        :param object_pks: Objects primary keys to filter by
+        :param columns: Columns to fetch
+        :param date_range_filter: Optional date_range_filter which speeds up query if date_col is set
+        :return: List of named tuples with requested columns
+        """
+        if date_range_filter:
+            date_range_filter += ' AND'
+
         query = """
             SELECT {columns} FROM $table FINAL
-            WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
-                AND `{pk_column}` IN ({object_pks})
+            WHERE {date_range_filter} `{pk_column}` IN ({object_pks})
         """
-        query = query.format(columns=','.join(columns), date_col=date_col, pk_column=self.pk_column, min_date=min_date,
-                             max_date=max_date, object_pks=','.join(object_pks))
+        query = query.format(columns=','.join(columns), pk_column=self.pk_column, date_range_filter=date_range_filter,
+                             object_pks=','.join(object_pks))
         return connections[db_alias].select_tuples(query, model_cls)
 
-    def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel],
-                           date_col: Optional[str] = None) -> Iterable[tuple]:
+    def _get_date_rate_filter(self, objects, model_cls: Type[ClickHouseModel], db_alias: str,
+                              date_col: Optional[str]) -> str:
         """
-        Get objects, that are currently stored in ClickHouse.
-        Depending on the partition key this can be different for different models.
-        In common case, this method is optimized for date field that doesn't change.
-        It also supposes primary key to by self.pk_column
-        :param model_cls: ClickHouseModel subclass to import
-        :param objects: Objects for which final versions are searched
-        :param date_col: Optional column name, where partiion date is hold. Defaults to self.date_col
-        :return: A generator of named tuples, representing previous state
+        Generates datetime filter to speed up final queries, if date_col is present
+        :param objects: Objects, which are inserted
+        :param model_cls: Model class for which data is fetched
+        :param db_alias: ClickHouse database alias used
+        :param date_col: Optional column name, where partition date is hold. Defaults to self.date_col
+        :return: String to add to WHERE or PREWHERE query section
         """
 
         def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
             if isinstance(dt, datetime.datetime):
                 return format_datetime(dt, 0, db_alias=db_alias)
@@ -90,10 +115,15 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
             else:
                 raise Exception('Invalid date or datetime object: `%s`' % dt)
 
-        if not objects:
-            raise StopIteration()
-
         date_col = date_col or self.date_col
+
+        if not date_col:
+            logger.warning('django-clickhouse: date_col is not provided for model %s.'
+                           ' This can cause significant performance problems while fetching data.'
+                           ' It is worth inheriting CollapsingMergeTree engine with custom get_final_versions() method,'
+                           ' based on your partition_key' % model_cls)
+            return ''
+
         min_date, max_date = None, None
         for obj in objects:
             obj_date = getattr(obj, date_col)
@@ -104,19 +134,39 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
             if max_date is None or max_date < obj_date:
                 max_date = obj_date
 
         min_date = _dt_to_str(min_date)
         max_date = _dt_to_str(max_date)
 
+        return "`{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'".\
+            format(min_date=min_date, max_date=max_date, date_col=date_col)
+
+    def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel],
+                           date_col: Optional[str] = None) -> Iterable[tuple]:
+        """
+        Get objects, that are currently stored in ClickHouse.
+        Depending on the partition key this can be different for different models.
+        In common case, this method is optimized for date field that doesn't change.
+        It also supposes primary key to by self.pk_column
+        :param model_cls: ClickHouseModel subclass to import
+        :param objects: Objects for which final versions are searched
+        :param date_col: Optional column name, where partition date is hold. Defaults to self.date_col
+        :return: A generator of named tuples, representing previous state
+        """
+        if not objects:
+            raise StopIteration()
+
+        object_pks = [str(getattr(obj, self.pk_column)) for obj in objects]
+
+        db_alias = model_cls.get_database_alias()
+
-        min_date = _dt_to_str(min_date)
-        max_date = _dt_to_str(max_date)
+        date_range_filter = self._get_date_rate_filter(objects, model_cls, db_alias, date_col)
 
         # Get fields. Sign is replaced to negative for further processing
         columns = list(model_cls.fields(writable=True).keys())
         columns.remove(self.sign_col)
         columns.append('-1 AS sign')
 
-        params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns)
+        params = (db_alias, model_cls, object_pks, columns, date_range_filter)
 
         if self.version_col:
             return self._get_final_versions_by_version(*params)
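The rewritten query replaces the `GROUP BY`/`MAX()` subselect with ClickHouse's `ORDER BY ... LIMIT 1 BY`. Both compute the same thing: the row with the highest version for each primary key. A plain-Python sketch of that semantics (data and function name are illustrative):

```python
# Sketch of what "ORDER BY pk, version DESC LIMIT 1 BY pk" computes:
# for each primary key, keep only the row with the maximum version.
def latest_versions(rows):
    """rows: iterable of (pk, version, payload); returns max-version row per pk."""
    best = {}
    for pk, version, payload in rows:
        if pk not in best or version > best[pk][1]:
            best[pk] = (pk, version, payload)
    return sorted(best.values())


rows = [(1, 1, 'a'), (1, 3, 'c'), (2, 2, 'b'), (1, 2, 'b')]
result = latest_versions(rows)
```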
src/django_clickhouse/management/__init__.py (new file, 0 lines)
@@ -0,0 +1,45 @@
+"""
+Django command that applies migrations for ClickHouse database
+"""
+import json
+
+from django.apps import apps as django_apps
+from django.core.management import BaseCommand, CommandParser
+
+from ...configuration import config
+from ...migrations import migrate_app
+
+
+class Command(BaseCommand):
+    help = 'Migrates ClickHouse databases'
+    requires_migrations_checks = False
+
+    def add_arguments(self, parser: CommandParser) -> None:
+        parser.add_argument('app_label', nargs='?', type=str,
+                            help='Django App name to migrate. By default all found apps are migrated.')
+
+        parser.add_argument('migration_number', nargs='?', type=int,
+                            help='Migration number in selected django app to migrate to.'
+                                 ' By default all available migrations are applied.'
+                                 ' Note that library currently have no ability rollback migrations')
+
+        parser.add_argument('--database', '-d', nargs='?', type=str, required=False,
+                            choices=list(config.DATABASES.keys()),
+                            help='ClickHouse database alias key from CLICKHOUSE_DATABASES django setting.'
+                                 ' By default migrations are applied to all databases.')
+
+    def handle(self, *args, **options) -> None:
+        apps = [options['app_label']] if options['app_label'] else [app.name for app in django_apps.get_app_configs()]
+        databases = [options['database']] if options['database'] else list(config.DATABASES.keys())
+        kwargs = {'up_to': options['migration_number']} if options['migration_number'] else {}
+
+        self.stdout.write(self.style.MIGRATE_HEADING(
+            "Applying ClickHouse migrations for apps %s in databases %s" % (json.dumps(apps), json.dumps(databases))))
+
+        any_migrations_applied = False
+        for app_label in apps:
+            for db_alias in databases:
+                res = migrate_app(app_label, db_alias, verbosity=options['verbosity'], **kwargs)
+                any_migrations_applied = any_migrations_applied or res
+
+        if not any_migrations_applied:
+            self.stdout.write("No migrations to apply")
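The `handle()` method above defaults missing arguments to every installed app and every configured database. A standalone sketch of that defaulting rule (names are illustrative, detached from django):

```python
# Sketch of the command's target resolution: explicit app/database options
# narrow the run to one item, otherwise all known items are used.
def resolve_targets(options, all_apps, all_dbs):
    apps = [options['app_label']] if options.get('app_label') else list(all_apps)
    dbs = [options['database']] if options.get('database') else list(all_dbs)
    return apps, dbs


one_app = resolve_targets({'app_label': 'my_app', 'database': None},
                          ['a', 'b'], ['default'])
everything = resolve_targets({}, ['a'], ['default', 'secondary'])
```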
@@ -10,6 +10,7 @@ from django.dispatch import receiver
 
 # In order to support all operations import here
 from infi.clickhouse_orm.migrations import *  # noqa F401, F403
+from infi.clickhouse_orm.migrations import RunSQL as LibRunSQL, RunPython as LibRunPython
 
 from infi.clickhouse_orm.database import ServerError, DatabaseException
 from infi.clickhouse_orm.fields import StringField, DateField
@@ -39,49 +40,69 @@ class Migration:
         database = database or connections[db_alias]
 
         for op in self.operations:
-            model_class = getattr(op, 'model_class', None)
             hints = getattr(op, 'hints', {})
 
-            if db_router.allow_migrate(db_alias, self.__module__, op, model_class, **hints):
+            if db_router.allow_migrate(db_alias, self.__module__, op, **hints):
                 op.apply(database)
 
 
-def migrate_app(app_label: str, db_alias: str, up_to: int = 9999, database: Optional[Database] = None) -> None:
+def migrate_app(app_label: str, db_alias: str, up_to: int = 9999, database: Optional[Database] = None,
+                verbosity: int = 1) -> bool:
     """
     Migrates given django app
     :param app_label: App label to migrate
     :param db_alias: Database alias to migrate
     :param up_to: Migration number to migrate to
     :param database: Sometimes I want to pass db object directly for testing purposes
-    :return: None
+    :param verbosity: 0-4, verbosity level of the output
+    :return: True if any migration has been applied
     """
     # Can't migrate such connection, just skip it
     if config.DATABASES[db_alias].get('readonly', False):
-        return
+        if verbosity > 1:
+            print('Skipping database "%s": marked as readonly' % db_alias)
+        return False
 
     # Ignore force not migrated databases
     if not config.DATABASES[db_alias].get('migrate', True):
-        return
+        if verbosity > 1:
+            print('Skipping database "%s": migrations are restricted in configuration' % db_alias)
+        return False
 
     migrations_package = "%s.%s" % (app_label, config.MIGRATIONS_PACKAGE)
 
-    if module_exists(migrations_package):
-        database = database or connections[db_alias]
-        migration_history_model = lazy_class_import(config.MIGRATION_HISTORY_MODEL)
-
-        applied_migrations = migration_history_model.get_applied_migrations(db_alias, migrations_package)
-        modules = import_submodules(migrations_package)
-        unapplied_migrations = set(modules.keys()) - applied_migrations
-
-        for name in sorted(unapplied_migrations):
-            migration = modules[name].Migration()
-            migration.apply(db_alias, database=database)
-
-            migration_history_model.set_migration_applied(db_alias, migrations_package, name)
-
-            if int(name[:4]) >= up_to:
-                break
+    if not module_exists(migrations_package):
+        if verbosity > 1:
+            print('Skipping migrations for app "%s": no migration_package "%s"' % (app_label, migrations_package))
+        return False
+
+    database = database or connections[db_alias]
+    migration_history_model = lazy_class_import(config.MIGRATION_HISTORY_MODEL)
+
+    applied_migrations = migration_history_model.get_applied_migrations(db_alias, migrations_package)
+    modules = import_submodules(migrations_package)
+    unapplied_migrations = set(modules.keys()) - applied_migrations
+
+    any_applied = False
+    for name in sorted(unapplied_migrations):
+        if int(name[:4]) > up_to:
+            break
+
+        if verbosity > 0:
+            print('Applying ClickHouse migration %s for app %s in database %s' % (name, app_label, db_alias))
+        migration = modules[name].Migration()
+        migration.apply(db_alias, database=database)
+
+        migration_history_model.set_migration_applied(db_alias, migrations_package, name)
+        any_applied = True
+
+    if not any_applied:
+        if verbosity > 1:
+            print('No migrations to apply for app "%s"' % app_label)
+        return False
+
+    return True
 
 
 @receiver(post_migrate)
@@ -91,21 +112,19 @@ def clickhouse_migrate(sender, **kwargs):
         return
 
     if kwargs.get('using', DJANGO_DEFAULT_DB_ALIAS) != DJANGO_DEFAULT_DB_ALIAS:
-        # Не надо выполнять синхронизацию для каждого шарда. Только один раз.
+        # Don't call sync for every database. Just once.
         return
 
     app_name = kwargs['app_config'].name
 
     for db_alias in config.DATABASES:
-        migrate_app(app_name, db_alias)
+        migrate_app(app_name, db_alias, verbosity=kwargs.get('verbosity', 1))
 
 
 class MigrationHistory(ClickHouseModel):
     """
     A model for storing which migrations were already applied to database.
-    This
     """
 
     db_alias = StringField()
     package_name = StringField()
     module_name = StringField()
@@ -157,3 +176,15 @@ class MigrationHistory(ClickHouseModel):
     @classmethod
     def table_name(cls):
         return 'django_clickhouse_migrations'
+
+
+class RunSQL(LibRunSQL):
+    def __init__(self, *args, hints: Optional[dict] = None, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.hints = hints or {}
+
+
+class RunPython(LibRunPython):
+    def __init__(self, *args, hints: Optional[dict] = None, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.hints = hints or {}
@@ -64,7 +64,7 @@ class ClickHouseSyncBulkUpdateQuerySetMixin(ClickHouseSyncRegisterMixin, BulkUpdateManagerMixin):
         if returning is None:
             returning = pk_name
         elif isinstance(returning, str):
-            returning = [pk_name, returning]
+            returning = [pk_name, returning] if returning != '*' else '*'
         else:
             returning = list(returning) + [pk_name]
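The fixed branch above special-cases `'*'` so it is passed through instead of being wrapped into a list with the pk name. A standalone sketch of the resulting normalization (the pk name is an illustrative assumption):

```python
# Sketch of the `returning` normalization after the fix: the pk column is
# always included, and the wildcard '*' passes through unchanged.
def normalize_returning(returning, pk_name='id'):
    if returning is None:
        return pk_name
    elif isinstance(returning, str):
        return [pk_name, returning] if returning != '*' else '*'
    else:
        return list(returning) + [pk_name]
```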
|
@ -30,24 +30,26 @@ class DefaultRouter:
|
|||
"""
|
||||
return random.choice(model.write_db_aliases)
|
||||
|
||||
def allow_migrate(self, db_alias: str, app_label: str, operation: Operation,
|
||||
model=None, **hints) -> bool:
|
||||
def allow_migrate(self, db_alias: str, app_label: str, operation: Operation, **hints) -> bool:
|
||||
"""
|
||||
Checks if migration can be applied to given database
|
||||
:param db_alias: Database alias to check
|
||||
:param app_label: App from which migration is got
|
||||
:param operation: Operation object to perform
|
||||
:param model: Model migration is applied to
|
||||
:param hints: Hints to make correct decision
|
||||
:return: boolean
|
||||
"""
|
||||
if hints.get("force_migrate_on_databases", None):
|
||||
return db_alias in hints["force_migrate_on_databases"]
|
||||
|
||||
if hints.get('model'):
|
||||
model = '%s.%s.%s' % (app_label, config.MODELS_MODULE, hints['model']) \
|
||||
if isinstance(hints['model'], str) else hints['model']
|
||||
model = hints.get('model') or getattr(operation, 'model_class', None)
|
||||
if model is None:
|
||||
raise ValueError('"model_class" attribute is not defined for operation "%s". '
|
||||
'Please provide "force_migrate_on_databases" or "model" in hints.'
|
||||
% operation.__class__.__name__)
|
||||
|
||||
model = '%s.%s.%s' % (app_label, config.MODELS_MODULE, model) \
|
||||
if isinstance(model, str) else model
|
||||
model = lazy_class_import(model)
|
||||
|
||||
if operation.__class__ not in {CreateTable, DropTable}:
|
||||
|
|
|
@@ -11,12 +11,15 @@ import logging
 from typing import Any, Optional, List, Tuple
 
 import os
 
+from celery.utils.nodenames import gethostname
+from django.utils.timezone import now
 from statsd.defaults.django import statsd
 
 from .configuration import config
 from .exceptions import ConfigurationError, RedisLockTimeoutError
 from .redis import redis_zadd
-from .utils import check_pid, get_subclasses, SingletonMeta
+from .utils import check_pid_exists, get_subclasses, SingletonMeta
 
 logger = logging.getLogger('django-clickhouse')
@@ -186,8 +189,7 @@ class RedisStorage(Storage, metaclass=SingletonMeta):
 
     def get_operations(self, import_key, count, **kwargs):
         ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
-        res = self._redis.zrangebyscore(ops_key, '-inf', datetime.datetime.now().timestamp(), start=0, num=count,
-                                        withscores=True)
+        res = self._redis.zrangebyscore(ops_key, '-inf', now().timestamp(), start=0, num=count, withscores=True)
 
         if res:
             ops, scores = zip(*res)
@@ -214,19 +216,31 @@ class RedisStorage(Storage, metaclass=SingletonMeta):
         # Block process to be single threaded. Default sync delay is 10 * default sync delay.
         # It can be changed for model, by passing `lock_timeout` argument to pre_sync
         lock = self.get_lock(import_key, **kwargs)
+        current_host_name = gethostname()
         lock_pid_key = self.REDIS_KEY_LOCK_PID.format(import_key=import_key)
         try:
             lock.acquire()
-            self._redis.set(lock_pid_key, os.getpid())
+            self._redis.set(lock_pid_key, '%s:%s' % (current_host_name, os.getpid()))
         except RedisLockTimeoutError:
             statsd.incr('%s.sync.%s.lock.timeout' % (config.STATSD_PREFIX, import_key))
 
             # Lock is busy. But If the process has been killed, I don't want to wait any more.
-            # Let's check if pid exists
-            pid = int(self._redis.get(lock_pid_key) or 0)
-            if pid and not check_pid(pid):
+            # I assume that lock has been killed if it works on the same host (other than localhost)
+            # and there is no process alive.
+            # I also assume that there are no hosts with same hostname other than localhost.
+            # Note: previously value contained only pid. Let's support old value for back compatibility
+            active_lock_data = self._redis.get(lock_pid_key).split(b":")
+            active_pid = int(active_lock_data[-1] or 0)
+            active_host_name = active_lock_data[0] \
+                if len(active_lock_data) > 1 and active_lock_data[0] != "localhost" else None
+
+            if (
+                active_pid and active_host_name
+                and active_host_name == current_host_name and not check_pid_exists(active_pid)
+            ):
                 statsd.incr('%s.sync.%s.lock.hard_release' % (config.STATSD_PREFIX, import_key))
                 logger.warning('django-clickhouse: hard releasing lock "%s" locked by pid %d (process is dead)'
-                               % (import_key, pid))
+                               % (import_key, active_pid))
                 self._redis.delete(lock_pid_key)
                 lock.hard_release()
                 self.pre_sync(import_key, **kwargs)
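The new lock value is `host:pid`, while values written by older versions held only the pid. A standalone sketch of parsing both formats the way the code above does (function name and data are illustrative):

```python
# Sketch of the backward-compatible lock-value parsing: new values look like
# b"host:pid", old values contain only the pid, and "localhost" is not trusted
# as a distinguishing host name.
def parse_lock_value(raw: bytes):
    parts = raw.split(b":")
    pid = int(parts[-1] or 0)
    host = parts[0].decode() if len(parts) > 1 and parts[0] != b"localhost" else None
    return host, pid
```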
@@ -3,7 +3,7 @@ import importlib
 from typing import Type, Union

 from celery import shared_task
-from django.conf import settings
+from django.apps import apps as django_apps
 from infi.clickhouse_orm.utils import import_submodules

 from django_clickhouse.clickhouse_models import ClickHouseModel

@@ -32,8 +32,8 @@ def clickhouse_auto_sync() -> None:
     :return: None
     """
     # Import all model modules
-    for app in settings.INSTALLED_APPS:
-        package_name = "%s.%s" % (app, config.MODELS_MODULE)
+    for app in django_apps.get_app_configs():
+        package_name = "%s.%s" % (app.name, config.MODELS_MODULE)
         try:
             module = importlib.import_module(package_name)
             if hasattr(module, '__path__'):
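The change above switches from iterating `INSTALLED_APPS` strings to `django_apps.get_app_configs()`, so entries that reference an `AppConfig` class (like the `tests.apps.UnitTestAppConfig` added further down) resolve to their real package name. A rough stdlib-only sketch of the discovery loop, assuming a configurable module suffix (the helper name and default suffix are illustrative):

```python
import importlib


def discover_model_modules(app_names, module_suffix="clickhouse_models"):
    """Try to import '<app>.<suffix>' for every app, skipping apps without one."""
    found = []
    for app in app_names:
        package_name = "%s.%s" % (app, module_suffix)
        try:
            found.append(importlib.import_module(package_name))
        except ImportError:
            continue  # app has no such module: nothing to sync for it
    return found
```

For example, `discover_model_modules(["json", "no_such_app"], module_suffix="decoder")` imports `json.decoder` and silently skips the missing app.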
@@ -127,7 +127,7 @@ def model_to_dict(instance: DjangoModel, fields: Optional[Iterable[str]] = None
     return data


-def check_pid(pid):
+def check_pid_exists(pid):
    """
    Check For the existence of a unix pid.
    """
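The renamed `check_pid_exists` helper is typically implemented with the Unix "signal 0" trick; a sketch of how such a check is commonly written (an illustration, not necessarily the library's exact body — note that a permission error would also read as "not alive" here):

```python
import os


def check_pid_exists(pid: int) -> bool:
    """Check for the existence of a unix pid by sending signal 0."""
    try:
        os.kill(pid, 0)  # signal 0 performs error checking only, delivers nothing
    except OSError:
        return False
    return True
```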
tests/apps.py (new file)
@@ -0,0 +1,6 @@
+from django.apps import AppConfig
+
+
+class UnitTestAppConfig(AppConfig):
+    name = 'tests'
+    verbose_name = "Unit test app"
tests/fixtures/test_model.json (vendored)
@@ -5,7 +5,7 @@
   "fields": {
     "value": 100,
     "created_date": "2018-01-01",
-    "created": "2018-01-01 00:00:00"
+    "created": "2018-01-01 00:00:00+0000"
   }
 },
 {
@@ -14,7 +14,7 @@
   "fields": {
     "value": 200,
     "created_date": "2018-02-01",
-    "created": "2018-02-01 00:00:00"
+    "created": "2018-02-01 00:00:00+0000"
   }
 },
 {
@@ -23,7 +23,7 @@
   "fields": {
     "value": 300,
     "created_date": "2018-03-01",
-    "created": "2018-03-01 00:00:00"
+    "created": "2018-03-01 00:00:00+0000"
   }
 },
 {
@@ -32,7 +32,7 @@
   "fields": {
     "value": 400,
     "created_date": "2018-04-01",
-    "created": "2018-04-01 00:00:00"
+    "created": "2018-04-01 00:00:00+0000"
   }
 },
 {
@@ -41,7 +41,7 @@
   "fields": {
     "value": 500,
     "created_date": "2018-05-01",
-    "created": "2018-05-01 00:00:00"
+    "created": "2018-05-01 00:00:00+0000"
   }
 }
]
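The fixture changes append an explicit `+0000` offset so the timestamps remain valid once `USE_TZ = True` is enabled in the test settings below (Django treats offset-less fixture datetimes as naive). A quick stdlib check of what the two literal forms parse to:

```python
from datetime import datetime, timedelta

# The fixture now stores an offset-aware timestamp
aware = datetime.strptime("2018-01-01 00:00:00+0000", "%Y-%m-%d %H:%M:%S%z")
# The old literal parses to a naive datetime with no tzinfo
naive = datetime.strptime("2018-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")

assert aware.tzinfo is not None
assert aware.utcoffset() == timedelta(0)
assert naive.tzinfo is None
```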
tests/fixtures/test_secondary_model.json (vendored)
@@ -5,7 +5,7 @@
   "fields": {
     "value": 100,
     "created_date": "2018-01-01",
-    "created": "2018-02-01 00:00:00"
+    "created": "2018-02-01 00:00:00+0000"
   }
 },
 {
@@ -14,7 +14,7 @@
   "fields": {
     "value": 200,
     "created_date": "2018-02-01",
-    "created": "2018-02-01 00:00:00"
+    "created": "2018-02-01 00:00:00+0000"
   }
 }
]
@@ -8,6 +8,7 @@ from time import sleep

 import datetime

+from django.utils.timezone import now

 # set Django environment
 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

@@ -25,7 +26,7 @@ logger = logging.getLogger('django-clickhouse')
 def create(batch_size=1000, test_time=60, period=1, **kwargs):
     for iteration in range(int(test_time / period)):
         res = TestModel.objects.db_manager('test_db').bulk_create([
-            TestModel(created=datetime.datetime.now(), created_date='2018-01-01', value=iteration * batch_size + i)
+            TestModel(created=now(), created_date='2018-01-01', value=iteration * batch_size + i)
             for i in range(batch_size)
         ])
         logger.info('django-clickhouse: test created %d records' % len(res))

@@ -54,8 +55,8 @@ def sync(period=1, test_time=60, **kwargs):
     if kwargs['once']:
         ClickHouseCollapseTestModel.sync_batch_from_storage()
     else:
-        start = datetime.datetime.now()
-        while (datetime.datetime.now() - start).total_seconds() < test_time:
+        start = now()
+        while (now() - start).total_seconds() < test_time:
             ClickHouseCollapseTestModel.sync_batch_from_storage()
             sleep(period)
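Throughout these hunks `datetime.datetime.now()` is replaced with `django.utils.timezone.now()`, which returns an offset-aware UTC datetime when `USE_TZ = True`. Without Django installed, the equivalent behavior can be sketched with the stdlib (the function name is a placeholder, not Django's implementation):

```python
from datetime import datetime, timezone


def aware_now() -> datetime:
    """Rough stand-in for django.utils.timezone.now() under USE_TZ = True:
    an offset-aware datetime in UTC rather than a naive local time."""
    return datetime.now(tz=timezone.utc)


# Naive and aware datetimes cannot be compared or subtracted directly,
# which is why mixing the two styles in tests causes TypeError.
assert aware_now().tzinfo is timezone.utc
assert datetime.now().tzinfo is None
```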
@@ -4,6 +4,7 @@ This file contains django settings to run tests with runtests.py
 from os import environ

 SECRET_KEY = 'fake-key'
+USE_TZ = True

 DATABASES = {
     'default': {

@@ -56,7 +57,9 @@ LOGGING = {

 INSTALLED_APPS = [
     "src",
-    "tests"
+
+    # This app is included with config in order to test all is working fine here
+    "tests.apps.UnitTestAppConfig"
 ]

 CLICKHOUSE_DATABASES = {
@@ -1,6 +1,7 @@
 import datetime

 from django.test import TestCase
+from django.utils.timezone import now

 from tests.clickhouse_models import ClickHouseTestModel

@@ -20,11 +21,11 @@ class ClickHouseModelTest(TestCase):
         self.assertTrue(ClickHouseTestModel.need_sync())

         # Time hasn't passed - no sync
-        self.storage.set_last_sync_time(ClickHouseTestModel.get_import_key(), datetime.datetime.now())
+        self.storage.set_last_sync_time(ClickHouseTestModel.get_import_key(), now())
         self.assertFalse(ClickHouseTestModel.need_sync())

         # Time has passed
         sync_delay = ClickHouseTestModel.get_sync_delay()
         self.storage.set_last_sync_time(ClickHouseTestModel.get_import_key(),
-                                        datetime.datetime.now() - datetime.timedelta(seconds=sync_delay + 1))
+                                        now() - datetime.timedelta(seconds=sync_delay + 1))
         self.assertTrue(ClickHouseTestModel.need_sync())
@@ -88,12 +88,25 @@ class CollapsingMergeTreeTest(TestCase):
                                                                                self.objects, date_col='created')
         self._test_final_versions(final_versions)

+    def test_get_final_versions_by_final_no_date_col(self):
+        ClickHouseCollapseTestModel.engine.date_col = None
+        final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
+                                                                               self.objects)
+        self._test_final_versions(final_versions)
+
     def test_get_final_versions_by_version_datetime(self):
         ClickHouseCollapseTestModel.engine.version_col = 'version'
         final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
                                                                                self.objects, date_col='created')
         self._test_final_versions(final_versions)

+    def test_get_final_versions_by_version_no_date_col(self):
+        ClickHouseCollapseTestModel.engine.version_col = 'version'
+        ClickHouseCollapseTestModel.engine.date_col = None
+        final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
+                                                                               self.objects)
+        self._test_final_versions(final_versions)
+
     def test_versions(self):
         ClickHouseCollapseTestModel.engine.version_col = 'version'
         batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects)
@@ -1,14 +1,19 @@
-from django.test import TestCase, override_settings
-from django_clickhouse.migrations import MigrationHistory
+from typing import List, Dict, Any
+from unittest import mock
+
+from django.conf import settings
+from django.test import TestCase, override_settings

 from django_clickhouse.configuration import config
 from django_clickhouse.database import connections
-from django_clickhouse.migrations import migrate_app
+from django_clickhouse.management.commands.clickhouse_migrate import Command
+from django_clickhouse.migrations import MigrationHistory, migrate_app
 from django_clickhouse.routers import DefaultRouter
 from tests.clickhouse_models import ClickHouseTestModel


 class NoMigrateRouter(DefaultRouter):
-    def allow_migrate(self, db_alias, app_label, operation, model=None, **hints):
+    def allow_migrate(self, db_alias, app_label, operation, **hints):
         return False
@@ -53,3 +58,104 @@ class MigrateAppTest(TestCase):
     def test_readonly_connections(self):
         migrate_app('tests', 'readonly')
         self.assertFalse(table_exists(connections['readonly'], ClickHouseTestModel))
+
+
+@override_settings(CLICKHOUSE_MIGRATE_WITH_DEFAULT_DB=False)
+@mock.patch('django_clickhouse.management.commands.clickhouse_migrate.migrate_app', return_value=True)
+class MigrateDjangoCommandTest(TestCase):
+    APP_LABELS = ('src', 'tests')
+
+    def setUp(self) -> None:
+        self.cmd = Command()
+
+    def test_handle_all(self, migrate_app_mock):
+        self.cmd.handle(verbosity=3, app_label=None, database=None, migration_number=None)
+
+        self.assertEqual(len(config.DATABASES.keys()) * len(self.APP_LABELS), migrate_app_mock.call_count)
+        for db_alias in config.DATABASES.keys():
+            for app_label in self.APP_LABELS:
+                migrate_app_mock.assert_any_call(app_label, db_alias, verbosity=3)
+
+    def test_handle_app(self, migrate_app_mock):
+        self.cmd.handle(verbosity=3, app_label='tests', database=None, migration_number=None)
+
+        self.assertEqual(len(config.DATABASES.keys()), migrate_app_mock.call_count)
+        for db_alias in config.DATABASES.keys():
+            migrate_app_mock.assert_any_call('tests', db_alias, verbosity=3)
+
+    def test_handle_database(self, migrate_app_mock):
+        self.cmd.handle(verbosity=3, database='default', app_label=None, migration_number=None)
+
+        self.assertEqual(len(settings.INSTALLED_APPS), migrate_app_mock.call_count)
+        for app_label in self.APP_LABELS:
+            migrate_app_mock.assert_any_call(app_label, 'default', verbosity=3)
+
+    def test_handle_app_and_database(self, migrate_app_mock):
+        self.cmd.handle(verbosity=3, app_label='tests', database='default', migration_number=None)
+
+        migrate_app_mock.assert_called_with('tests', 'default', verbosity=3)
+
+    def test_handle_migration_number(self, migrate_app_mock):
+        self.cmd.handle(verbosity=3, database='default', app_label='tests', migration_number=1)
+
+        migrate_app_mock.assert_called_with('tests', 'default', up_to=1, verbosity=3)
+
+    def _test_parser_results(self, argv: List[str], expected: Dict[str, Any]) -> None:
+        """
+        Tests if parser processes input correctly.
+        Checks only expected parameters, ignores others.
+        :param argv: List of string arguments from command line
+        :param expected: Dictionary of expected results
+        :return: None
+        :raises AssertionError: If expected result is incorrect
+        """
+        parser = self.cmd.create_parser('./manage.py', 'clickhouse_migrate')
+
+        options = parser.parse_args(argv)
+
+        # Copied from django.core.management.base.BaseCommand.run_from_argv('...')
+        cmd_options = vars(options)
+        cmd_options.pop('args', ())
+
+        self.assertDictEqual(expected, {opt: cmd_options[opt] for opt in expected.keys()})
+
+    def test_parser(self, _):
+        with self.subTest('Simple'):
+            self._test_parser_results([], {
+                'app_label': None,
+                'database': None,
+                'migration_number': None,
+                'verbosity': 1
+            })
+
+        with self.subTest('App label'):
+            self._test_parser_results(['tests'], {
+                'app_label': 'tests',
+                'database': None,
+                'migration_number': None,
+                'verbosity': 1
+            })
+
+        with self.subTest('App label and migration number'):
+            self._test_parser_results(['tests', '123'], {
+                'app_label': 'tests',
+                'database': None,
+                'migration_number': 123,
+                'verbosity': 1
+            })
+
+        with self.subTest('Database'):
+            self._test_parser_results(['--database', 'default'], {
+                'app_label': None,
+                'database': 'default',
+                'migration_number': None,
+                'verbosity': 1
+            })
+
+        with self.subTest('Verbosity'):
+            self._test_parser_results(['--verbosity', '2'], {
+                'app_label': None,
+                'database': None,
+                'migration_number': None,
+                'verbosity': 2
+            })
@@ -40,7 +40,7 @@ class TestOperations(TransactionTestCase):

     def test_save(self):
         # INSERT operation
-        instance = self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=2)
+        instance = self.django_model(created_date=datetime.date.today(), created=now(), value=2)
         instance.save()
         self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
                              self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))

@@ -52,13 +52,13 @@ class TestOperations(TransactionTestCase):
                              self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))

     def test_create(self):
-        instance = self.django_model.objects.create(pk=100555, created_date=datetime.date.today(),
-                                                    created=datetime.datetime.now(), value=2)
+        instance = self.django_model.objects.create(pk=100555, created_date=datetime.date.today(), created=now(),
+                                                    value=2)
         self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
                              self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))

     def test_bulk_create(self):
-        items = [self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=i)
+        items = [self.django_model(created_date=datetime.date.today(), created=now(), value=i)
                  for i in range(5)]
         items = self.django_model.objects.bulk_create(items)
         self.assertEqual(5, len(items))

@@ -99,6 +99,22 @@ class TestOperations(TransactionTestCase):
         self.assertSetEqual({('insert', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
                             set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))

+    def test_pg_bulk_create_returning(self):
+        now_dt = now()
+        res = self.django_model.objects.pg_bulk_create([
+            {'value': i, 'created': now_dt, 'created_date': now_dt.date()}
+            for i in range(5)
+        ], returning='*')
+
+        self.assertEqual(5, len(res))
+        for i, instance in enumerate(res):
+            self.assertEqual(instance.created, now_dt)
+            self.assertEqual(instance.created_date, now_dt.date())
+            self.assertEqual(i, instance.value)
+
+        self.assertSetEqual({('insert', "%s.%d" % (self.db_alias, instance.pk)) for instance in res},
+                            set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
+
     def test_pg_bulk_update(self):
         items = list(self.django_model.objects.filter(pk__in={1, 2}))

@@ -115,6 +131,21 @@ class TestOperations(TransactionTestCase):
         self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
                             set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))

+    def test_pg_bulk_update_returning(self):
+        items = list(self.django_model.objects.filter(pk__in={1, 2}))
+
+        res = self.django_model.objects.pg_bulk_update([
+            {'id': instance.pk, 'value': instance.pk * 10}
+            for instance in items
+        ], returning='*')
+
+        self.assertEqual(2, len(res))
+        for instance in res:
+            self.assertEqual(instance.value, instance.pk * 10)
+
+        self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
+                            set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
+
     def test_pg_bulk_update_or_create(self):
         items = list(self.django_model.objects.filter(pk__in={1, 2}))

@@ -135,9 +166,28 @@ class TestOperations(TransactionTestCase):
         self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
                             set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))

+    def test_pg_bulk_update_or_create_returning(self):
+        items = list(self.django_model.objects.filter(pk__in={1, 2}))
+
+        data = [{
+            'id': instance.pk,
+            'value': instance.pk * 10,
+            'created_date': instance.created_date,
+            'created': instance.created
+        } for instance in items] + [{'id': 11, 'value': 110, 'created_date': datetime.date.today(), 'created': now()}]
+
+        res = self.django_model.objects.pg_bulk_update_or_create(data, returning='*')
+
+        self.assertEqual(3, len(res))
+        for instance in res:
+            self.assertEqual(instance.value, instance.pk * 10)
+
+        self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in res},
+                            set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
+
     def test_get_or_create(self):
         instance, created = self.django_model.objects. \
-            get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'created': datetime.datetime.now(),
+            get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'created': now(),
                                             'value': 2})

         self.assertTrue(created)

@@ -153,8 +203,7 @@ class TestOperations(TransactionTestCase):

     def test_update_or_create(self):
         instance, created = self.django_model.objects. \
-            update_or_create(pk=100, defaults={'created_date': datetime.date.today(),
-                                               'created': datetime.datetime.now(), 'value': 2})
+            update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'created': now(), 'value': 2})
         self.assertTrue(created)
         self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
                              self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))

@@ -179,7 +228,7 @@ class TestOperations(TransactionTestCase):

     def test_bulk_create_returning(self):
         items = [
-            self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=i)
+            self.django_model(created_date=datetime.date.today(), created=now(), value=i)
             for i in range(5)
         ]
         items = self.django_model.objects.bulk_create_returning(items)

@@ -210,7 +259,7 @@ class TestOperations(TransactionTestCase):

     def test_save_returning(self):
         # INSERT operation
-        instance = self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=2)
+        instance = self.django_model(created_date=datetime.date.today(), created=now(), value=2)
         instance.save_returning()
         self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
                              self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))

@@ -234,6 +283,7 @@ class TestOperations(TransactionTestCase):


 class TestSecondaryOperations(TestOperations):
+    # from django.db.models.fields import *
     fixtures = ['test_secondary_model']
     django_model = SecondaryTestModel
     clickhouse_model = ClickHouseSecondTestModel
tests/test_routers.py (new file)
@@ -0,0 +1,59 @@
+from django.test import SimpleTestCase
+
+from django_clickhouse.migrations import RunSQL, CreateTable
+from django_clickhouse.routers import DefaultRouter
+from tests.clickhouse_models import ClickHouseTestModel
+
+
+class DefaultRouterAllowMigrateTest(SimpleTestCase):
+    def setUp(self):
+        self.router = DefaultRouter()
+        self.operation = RunSQL('SELECT 1')
+
+    def test_hints_model_class(self):
+        hints = {'model': ClickHouseTestModel}
+
+        with self.subTest('Allow migrate'):
+            res = self.router.allow_migrate('default', 'tests', self.operation, **hints)
+            self.assertTrue(res)
+
+        with self.subTest('Reject migrate'):
+            res = self.router.allow_migrate('other', 'tests', self.operation, **hints)
+            self.assertFalse(res)
+
+    def test_hints_model_name(self):
+        hints = {'model': 'ClickHouseTestModel'}
+
+        with self.subTest('Allow migrate'):
+            res = self.router.allow_migrate('default', 'tests', self.operation, **hints)
+            self.assertTrue(res)
+
+        with self.subTest('Reject migrate'):
+            res = self.router.allow_migrate('other', 'tests', self.operation, **hints)
+            self.assertFalse(res)
+
+    def test_hints_force_migrate_on_databases(self):
+        hints = {'force_migrate_on_databases': ['secondary']}
+
+        with self.subTest('Allow migrate'):
+            res = self.router.allow_migrate('secondary', 'apps', self.operation, **hints)
+            self.assertTrue(res)
+
+        with self.subTest('Reject migrate'):
+            res = self.router.allow_migrate('default', 'apps', self.operation, **hints)
+            self.assertFalse(res)
+
+    def test_model_operation(self):
+        with self.subTest('Allow migrate'):
+            operation = CreateTable(ClickHouseTestModel)
+            res = self.router.allow_migrate('default', 'apps', operation)
+            self.assertTrue(res)
+
+        with self.subTest('Reject migrate'):
+            operation = CreateTable(ClickHouseTestModel)
+            res = self.router.allow_migrate('other', 'apps', operation)
+            self.assertFalse(res)
+
+    def test_no_model(self):
+        with self.assertRaises(ValueError):
+            self.router.allow_migrate('default', 'apps', self.operation)
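The new tests/test_routers.py exercises `allow_migrate` with three ways of identifying the target: a `model` hint given as a class, a `model` hint given as a name, and a `force_migrate_on_databases` list that overrides model routing. A simplified, framework-free sketch of the decision logic those tests imply (the class and its db-resolution rule are assumptions for illustration, not the library's `DefaultRouter`):

```python
class SketchRouter:
    """Toy router: a model migrates only on its own database, unless the
    operation is forced onto an explicit list of databases."""

    def __init__(self, model_databases):
        # Map of model name -> database alias it lives on (illustrative)
        self.model_databases = model_databases

    def allow_migrate(self, db_alias, app_label, operation, **hints):
        # An explicit database list wins over model-based routing
        if 'force_migrate_on_databases' in hints:
            return db_alias in hints['force_migrate_on_databases']

        # The model may come from a hint (class or name) or from the operation itself
        model = hints.get('model') or getattr(operation, 'model', None)
        if model is None:
            raise ValueError('Model for operation is not defined')

        name = model if isinstance(model, str) else model.__name__
        return self.model_databases.get(name) == db_alias
```

This mirrors the shape of the tests: allow on the model's own database, reject elsewhere, honor the forced list, and raise `ValueError` when no model can be determined.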
@@ -30,7 +30,7 @@ class SyncTest(TransactionTestCase):
         ClickHouseTestModel.get_storage().flush()

     def test_simple(self):
-        obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
+        obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
         ClickHouseTestModel.sync_batch_from_storage()

         synced_data = list(ClickHouseTestModel.objects.all())

@@ -40,7 +40,7 @@ class SyncTest(TransactionTestCase):
         self.assertEqual(obj.id, synced_data[0].id)

     def test_collapsing_update_by_final(self):
-        obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
+        obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
         obj.value = 2
         obj.save()
         ClickHouseCollapseTestModel.sync_batch_from_storage()

@@ -63,7 +63,7 @@ class SyncTest(TransactionTestCase):
     def test_collapsing_update_by_version(self):
         ClickHouseCollapseTestModel.engine.version_col = 'version'

-        obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
+        obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
         obj.value = 2
         obj.save()
         ClickHouseCollapseTestModel.sync_batch_from_storage()

@@ -97,7 +97,7 @@ class SyncTest(TransactionTestCase):
         self.assertEqual(0, len(synced_data))

     def test_multi_model(self):
-        obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
+        obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
         obj.value = 2
         obj.save()
         ClickHouseMultiTestModel.sync_batch_from_storage()

@@ -268,7 +268,7 @@ class ProfileTest(TransactionTestCase):
         ClickHouseTestModel.sync_enabled = False

         TestModel.objects.bulk_create([
-            TestModel(created=datetime.datetime.now(), created_date='2018-01-01', value=i)
+            TestModel(created=now(), created_date='2018-01-01', value=i)
             for i in range(self.BATCH_SIZE)
         ])