Compare commits

..

No commits in common. "master" and "v1.1.1" have entirely different histories.

24 changed files with 96 additions and 368 deletions

View File

@ -11,56 +11,12 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
postgres-version: ["9.6", "10", "11", "12", "13", "14", "15", "16"]
django-version: ["3.2", "4.0", "4.1", "4.2", "5.0", "5.1"]
python-version: ["3.6", "3.7", "3.8", "3.9"]
postgres-version: ["9.6", "10", "11", "12"]
django-version: ["2.1", "2.2", "3.0", "3.1", "3.2"]
clickhouse-version: ["latest"]
redis-version: ["latest"]
exclude:
# Django 4.0+ doesn't support PostgreSQL 9.6
- django-version: "4.0"
postgres-version: "9.6"
- django-version: "4.1"
postgres-version: "9.6"
- django-version: "4.2"
postgres-version: "9.6"
- django-version: "5.0"
postgres-version: "9.6"
- django-version: "5.1"
postgres-version: "9.6"
# Django 4.1+ doesn't support PostgreSQL 10
- django-version: "4.1"
postgres-version: "10"
- django-version: "4.2"
postgres-version: "10"
- django-version: "5.0"
postgres-version: "10"
- django-version: "5.1"
postgres-version: "10"
# Django 4.2+ doesn't support PostgreSQL 11
- django-version: "4.2"
postgres-version: "11"
- django-version: "5.0"
postgres-version: "11"
- django-version: "5.1"
postgres-version: "11"
# Django 5.1+ doesn't support PostgreSQL 12
- django-version: "5.1"
postgres-version: "12"
# Django 5.0+ does not support python 3.8, 3.9
- django-version: "5.0"
python-version: "3.8"
- django-version: "5.0"
python-version: "3.9"
- django-version: "5.1"
python-version: "3.8"
- django-version: "5.1"
python-version: "3.9"
services:
postgres:
image: postgres:${{ matrix.postgres-version }}

View File

@ -24,7 +24,7 @@ services:
build:
context: .
args:
- PYTHON_IMAGE_TAG=latest
- PYTHON_VER=latest
environment:
- REDIS_HOST=redis_db
- PGHOST=postgres_db

View File

@ -75,12 +75,6 @@ Parameters
* `--verbosity: Optional[int] = 1` - Level of debug output. See [here](https://docs.djangoproject.com/en/3.2/ref/django-admin/#cmdoption-verbosity) for more details.
* `--help` - Print help
## Migration operations enhancements
* `RunSQL`, `RunPython`
Can accept `hints: dict = {}` parameter in order to set migration database alias (`force_migrate_on_databases: List[str]` key) or model (`model: Union[str, Type[ClickHouseModel]]` key)
## Migration algorithm
- Get a list of databases from `CLICKHOUSE_DATABASES` setting. Migrate them one by one.
- Find all django apps from `INSTALLED_APPS` setting, which have no `readonly=True` attribute and have `migrate=True` attribute. Migrate them one by one.

View File

@ -96,19 +96,6 @@ class ClickHouseUser(ClickHouseModel):
engine = MergeTree('birthday', ('birthday',))
```
**Important note**: `clickhouse_model.py` file is not anyhow imported by django initialization code. So if your models are not used anywhere excluding this file, you should import it somewhere in your code if you want synchronization working correctly. For instance, you can customise [AppConfig](https://docs.djangoproject.com/en/5.0/ref/applications/#django.apps.AppConfig.ready) like:
```python
from django.apps import AppConfig
class MyAppConfig(AppConfig):
name = 'my_app'
def ready(self):
from my_app.clickhouse_models import ClickHouseUser
```
## Migration to create table in ClickHouse
1. Read [migrations](migrations.md) section
2. Create `clickhouse_migrations` package in your django app
@ -125,7 +112,7 @@ class MyAppConfig(AppConfig):
4. Add content to file `0001_initial.py`:
```python
from django_clickhouse import migrations
from my_app.clickhouse_models import ClickHouseUser
from my_app.cilckhouse_models import ClickHouseUser
class Migration(migrations.Migration):
operations = [

View File

@ -25,17 +25,12 @@ Router is a class, defining 3 methods:
Returns `database alias` to use for given `model` for `SELECT` queries.
* `def db_for_write(self, model: ClickHouseModel, **hints) -> str`
Returns `database alias` to use for given `model` for `INSERT` queries.
* `def allow_migrate(self, db_alias: str, app_label: str, operation: Operation, **hints: dict) -> bool`
* `def allow_migrate(self, db_alias: str, app_label: str, operation: Operation, model: Optional[ClickHouseModel] = None, **hints: dict) -> bool`
Checks if migration `operation` should be applied in django application `app_label` on database `db_alias`.
Optional `hints` help to pass additional info which can be used to test migrations availability on concrete model.
Optional `model` field can be used to determine migrations on concrete model.
By default [CLICKHOUSE_DATABASE_ROUTER](configuration.md#clickhouse_database_router) is used.
It gets routing information from model fields, described below.
It also gives ability to use 2 kinds of hints:
* `force_migrate_on_databases: Iterable[str]` - concrete database aliases where migration should be applied
* `model: Type[ClickHouseModel]` - a model class, to read routing attributes from.
Can be set as class or its string name.
If name is set, class is searched in current `<app_label>.<config.MODELS_MODULE>` package.
## ClickHouseModel routing attributes
Default database router reads routing settings from model attributes.

View File

@ -13,7 +13,7 @@ with open('requirements.txt') as f:
setup(
name='django-clickhouse',
version='1.2.2',
version='1.1.1',
packages=['django_clickhouse', 'django_clickhouse.management.commands'],
package_dir={'': 'src'},
url='https://github.com/carrotquest/django-clickhouse',

View File

@ -9,7 +9,6 @@ from itertools import chain
from typing import List, Tuple, Iterable, Set, Any, Optional
from django.db.models import Model as DjangoModel, QuerySet as DjangoQuerySet
from django.utils.timezone import now
from infi.clickhouse_orm.engines import CollapsingMergeTree
from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase
from statsd.defaults.django import statsd
@ -291,7 +290,7 @@ class ClickHouseModel(InfiModel, metaclass=ClickHouseModelMeta):
res = (datetime.datetime.now() - last_sync_time).total_seconds() >= cls.get_sync_delay()
logger.debug('django-clickhouse: need_sync returned %s for class %s as no last sync found'
' (now: %s, last: %s, delay: %d)'
% (res, cls.__name__, now().isoformat(), last_sync_time.isoformat(),
% (res, cls.__name__, datetime.datetime.now().isoformat(), last_sync_time.isoformat(),
cls.get_sync_delay()))
return res

View File

@ -2,8 +2,7 @@
This file contains wrappers for infi.clickhouse_orm engines to use in django-clickhouse
"""
import datetime
import logging
from typing import List, Type, Union, Iterable, Optional, Tuple, NamedTuple
from typing import List, Type, Union, Iterable, Optional
from django.db.models import Model as DjangoModel
from infi.clickhouse_orm import engines as infi_engines
@ -15,9 +14,6 @@ from .database import connections
from .utils import format_datetime
logger = logging.getLogger('django-clickhouse')
class InsertOnlyEngineMixin:
def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]:
"""
@ -49,97 +45,30 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
self.version_col = kwargs.pop('version_col', None)
super(CollapsingMergeTree, self).__init__(*args, **kwargs)
def _get_final_versions_by_version(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str],
columns: str, date_range_filter: str = '') -> List[NamedTuple]:
"""
Performs request to ClickHouse in order to fetch latest version for each object pk
:param db_alias: ClickHouse database alias used
:param model_cls: Model class for which data is fetched
:param object_pks: Objects primary keys to filter by
:param columns: Columns to fetch
:param date_range_filter: Optional date_range_filter which speeds up query if date_col is set
:return: List of named tuples with requested columns
"""
if date_range_filter:
date_range_filter = 'PREWHERE {}'.format(date_range_filter)
def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
query = """
SELECT {columns}
FROM $table
{date_range_filter}
WHERE `{pk_column}` IN ({object_pks})
ORDER BY `{pk_column}`, `{version_col}` DESC
LIMIT 1 BY `{pk_column}`
""".format(columns=','.join(columns), version_col=self.version_col, pk_column=self.pk_column,
date_range_filter=date_range_filter, object_pks=','.join(object_pks), sign_col=self.sign_col)
SELECT {columns} FROM $table WHERE (`{pk_column}`, `{version_col}`) IN (
SELECT `{pk_column}`, MAX(`{version_col}`)
FROM $table
PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
AND `{pk_column}` IN ({object_pks})
GROUP BY `{pk_column}`
)
""".format(columns=','.join(columns), version_col=self.version_col, date_col=date_col, pk_column=self.pk_column,
min_date=min_date, max_date=max_date, object_pks=','.join(object_pks))
return connections[db_alias].select_tuples(query, model_cls)
def _get_final_versions_by_final(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str],
columns: str, date_range_filter: str = '') -> List[NamedTuple]:
"""
Performs request to ClickHouse in order to fetch latest version for each object pk
:param db_alias: ClickHouse database alias used
:param model_cls: Model class for which data is fetched
:param object_pks: Objects primary keys to filter by
:param columns: Columns to fetch
:param date_range_filter: Optional date_range_filter which speeds up query if date_col is set
:return: List of named tuples with requested columns
"""
if date_range_filter:
date_range_filter += ' AND'
def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
query = """
SELECT {columns} FROM $table FINAL
WHERE {date_range_filter} `{pk_column}` IN ({object_pks})
WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
AND `{pk_column}` IN ({object_pks})
"""
query = query.format(columns=','.join(columns), pk_column=self.pk_column, date_range_filter=date_range_filter,
object_pks=','.join(object_pks))
query = query.format(columns=','.join(columns), date_col=date_col, pk_column=self.pk_column, min_date=min_date,
max_date=max_date, object_pks=','.join(object_pks))
return connections[db_alias].select_tuples(query, model_cls)
def _get_date_rate_filter(self, objects, model_cls: Type[ClickHouseModel], db_alias: str,
date_col: Optional[str]) -> str:
"""
Generates datetime filter to speed up final queries, if date_col is present
:param objects: Objects, which are inserted
:param model_cls: Model class for which data is fetched
:param db_alias: ClickHouse database alias used
:param date_col: Optional column name, where partition date is hold. Defaults to self.date_col
:return: String to add to WHERE or PREWHERE query section
"""
def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
if isinstance(dt, datetime.datetime):
return format_datetime(dt, 0, db_alias=db_alias)
elif isinstance(dt, datetime.date):
return dt.isoformat()
else:
raise Exception('Invalid date or datetime object: `%s`' % dt)
date_col = date_col or self.date_col
if not date_col:
logger.warning('django-clickhouse: date_col is not provided for model %s.'
' This can cause significant performance problems while fetching data.'
' It is worth inheriting CollapsingMergeTree engine with custom get_final_versions() method,'
' based on your partition_key' % model_cls)
return ''
min_date, max_date = None, None
for obj in objects:
obj_date = getattr(obj, date_col)
if min_date is None or min_date > obj_date:
min_date = obj_date
if max_date is None or max_date < obj_date:
max_date = obj_date
min_date = _dt_to_str(min_date)
max_date = _dt_to_str(max_date)
return "`{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'".\
format(min_date=min_date, max_date=max_date, date_col=date_col)
def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel],
date_col: Optional[str] = None) -> Iterable[tuple]:
"""
@ -149,24 +78,45 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
It also supposes primary key to by self.pk_column
:param model_cls: ClickHouseModel subclass to import
:param objects: Objects for which final versions are searched
:param date_col: Optional column name, where partition date is hold. Defaults to self.date_col
:param date_col: Optional column name, where partiion date is hold. Defaults to self.date_col
:return: A generator of named tuples, representing previous state
"""
def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
if isinstance(dt, datetime.datetime):
return format_datetime(dt, 0, db_alias=db_alias)
elif isinstance(dt, datetime.date):
return dt.isoformat()
else:
raise Exception('Invalid date or datetime object: `%s`' % dt)
if not objects:
raise StopIteration()
date_col = date_col or self.date_col
min_date, max_date = None, None
for obj in objects:
obj_date = getattr(obj, date_col)
if min_date is None or min_date > obj_date:
min_date = obj_date
if max_date is None or max_date < obj_date:
max_date = obj_date
object_pks = [str(getattr(obj, self.pk_column)) for obj in objects]
db_alias = model_cls.get_database_alias()
date_range_filter = self._get_date_rate_filter(objects, model_cls, db_alias, date_col)
min_date = _dt_to_str(min_date)
max_date = _dt_to_str(max_date)
# Get fields. Sign is replaced to negative for further processing
columns = list(model_cls.fields(writable=True).keys())
columns.remove(self.sign_col)
columns.append('-1 AS sign')
params = (db_alias, model_cls, object_pks, columns, date_range_filter)
params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns)
if self.version_col:
return self._get_final_versions_by_version(*params)

View File

@ -23,7 +23,7 @@ class Command(BaseCommand):
' By default all available migrations are applied.'
' Note that library currently have no ability rollback migrations')
parser.add_argument('--database', '-d', nargs='?', type=str, required=False, choices=list(config.DATABASES.keys()),
parser.add_argument('--database', '-d', nargs='?', type=str, required=False, choices=config.DATABASES.keys(),
help='ClickHouse database alias key from CLICKHOUSE_DATABASES django setting.'
' By default migrations are applied to all databases.')

View File

@ -10,7 +10,6 @@ from django.dispatch import receiver
# In order to support all operations import here
from infi.clickhouse_orm.migrations import * # noqa F401, F403
from infi.clickhouse_orm.migrations import RunSQL as LibRunSQL, RunPython as LibRunPython
from infi.clickhouse_orm.database import ServerError, DatabaseException
from infi.clickhouse_orm.fields import StringField, DateField
@ -40,9 +39,10 @@ class Migration:
database = database or connections[db_alias]
for op in self.operations:
model_class = getattr(op, 'model_class', None)
hints = getattr(op, 'hints', {})
if db_router.allow_migrate(db_alias, self.__module__, op, **hints):
if db_router.allow_migrate(db_alias, self.__module__, op, model_class, **hints):
op.apply(database)
@ -176,15 +176,3 @@ class MigrationHistory(ClickHouseModel):
@classmethod
def table_name(cls):
return 'django_clickhouse_migrations'
class RunSQL(LibRunSQL):
def __init__(self, *args, hints: Optional[dict] = None, **kwargs):
super().__init__(*args, **kwargs)
self.hints = hints or {}
class RunPython(LibRunPython):
def __init__(self, *args, hints: Optional[dict] = None, **kwargs):
super().__init__(*args, **kwargs)
self.hints = hints or {}

View File

@ -64,7 +64,7 @@ class ClickHouseSyncBulkUpdateQuerySetMixin(ClickHouseSyncRegisterMixin, BulkUpd
if returning is None:
returning = pk_name
elif isinstance(returning, str):
returning = [pk_name, returning] if returning != '*' else '*'
returning = [pk_name, returning]
else:
returning = list(returning) + [pk_name]

View File

@ -30,26 +30,24 @@ class DefaultRouter:
"""
return random.choice(model.write_db_aliases)
def allow_migrate(self, db_alias: str, app_label: str, operation: Operation, **hints) -> bool:
def allow_migrate(self, db_alias: str, app_label: str, operation: Operation,
model=None, **hints) -> bool:
"""
Checks if migration can be applied to given database
:param db_alias: Database alias to check
:param app_label: App from which migration is got
:param operation: Operation object to perform
:param model: Model migration is applied to
:param hints: Hints to make correct decision
:return: boolean
"""
if hints.get("force_migrate_on_databases", None):
return db_alias in hints["force_migrate_on_databases"]
model = hints.get('model') or getattr(operation, 'model_class', None)
if model is None:
raise ValueError('"model_class" attribute is not defined for operation "%s". '
'Please provide "force_migrate_on_databases" or "model" in hints.'
% operation.__class__.__name__)
if hints.get('model'):
model = '%s.%s.%s' % (app_label, config.MODELS_MODULE, hints['model']) \
if isinstance(hints['model'], str) else hints['model']
model = '%s.%s.%s' % (app_label, config.MODELS_MODULE, model) \
if isinstance(model, str) else model
model = lazy_class_import(model)
if operation.__class__ not in {CreateTable, DropTable}:

View File

@ -11,15 +11,12 @@ import logging
from typing import Any, Optional, List, Tuple
import os
from celery.utils.nodenames import gethostname
from django.utils.timezone import now
from statsd.defaults.django import statsd
from .configuration import config
from .exceptions import ConfigurationError, RedisLockTimeoutError
from .redis import redis_zadd
from .utils import check_pid_exists, get_subclasses, SingletonMeta
from .utils import check_pid, get_subclasses, SingletonMeta
logger = logging.getLogger('django-clickhouse')
@ -189,7 +186,8 @@ class RedisStorage(Storage, metaclass=SingletonMeta):
def get_operations(self, import_key, count, **kwargs):
ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
res = self._redis.zrangebyscore(ops_key, '-inf', now().timestamp(), start=0, num=count, withscores=True)
res = self._redis.zrangebyscore(ops_key, '-inf', datetime.datetime.now().timestamp(), start=0, num=count,
withscores=True)
if res:
ops, scores = zip(*res)
@ -216,31 +214,19 @@ class RedisStorage(Storage, metaclass=SingletonMeta):
# Block process to be single threaded. Default sync delay is 10 * default sync delay.
# It can be changed for model, by passing `lock_timeout` argument to pre_sync
lock = self.get_lock(import_key, **kwargs)
current_host_name = gethostname()
lock_pid_key = self.REDIS_KEY_LOCK_PID.format(import_key=import_key)
try:
lock.acquire()
self._redis.set(lock_pid_key, '%s:%s' % (current_host_name, os.getpid()))
self._redis.set(lock_pid_key, os.getpid())
except RedisLockTimeoutError:
statsd.incr('%s.sync.%s.lock.timeout' % (config.STATSD_PREFIX, import_key))
# Lock is busy. But If the process has been killed, I don't want to wait any more.
# I assume that lock has been killed if it works on the same host (other than localhost)
# and there is no process alive.
# I also assume that there are no hosts with same hostname other than localhost.
# Note: previously value contained only pid. Let's support old value for back compatibility
active_lock_data = self._redis.get(lock_pid_key).split(b":")
active_pid = int(active_lock_data[-1] or 0)
active_host_name = active_lock_data[0] \
if len(active_lock_data) > 1 and active_lock_data[0] != "localhost" else None
if (
active_pid and active_host_name
and active_host_name == current_host_name and not check_pid_exists(active_pid)
):
# Let's check if pid exists
pid = int(self._redis.get(lock_pid_key) or 0)
if pid and not check_pid(pid):
statsd.incr('%s.sync.%s.lock.hard_release' % (config.STATSD_PREFIX, import_key))
logger.warning('django-clickhouse: hard releasing lock "%s" locked by pid %d (process is dead)'
% (import_key, active_pid))
% (import_key, pid))
self._redis.delete(lock_pid_key)
lock.hard_release()
self.pre_sync(import_key, **kwargs)

View File

@ -127,7 +127,7 @@ def model_to_dict(instance: DjangoModel, fields: Optional[Iterable[str]] = None
return data
def check_pid_exists(pid):
def check_pid(pid):
"""
Check For the existence of a unix pid.
"""

View File

@ -5,7 +5,7 @@
"fields": {
"value": 100,
"created_date": "2018-01-01",
"created": "2018-01-01 00:00:00+0000"
"created": "2018-01-01 00:00:00"
}
},
{
@ -14,7 +14,7 @@
"fields": {
"value": 200,
"created_date": "2018-02-01",
"created": "2018-02-01 00:00:00+0000"
"created": "2018-02-01 00:00:00"
}
},
{
@ -23,7 +23,7 @@
"fields": {
"value": 300,
"created_date": "2018-03-01",
"created": "2018-03-01 00:00:00+0000"
"created": "2018-03-01 00:00:00"
}
},
{
@ -32,7 +32,7 @@
"fields": {
"value": 400,
"created_date": "2018-04-01",
"created": "2018-04-01 00:00:00+0000"
"created": "2018-04-01 00:00:00"
}
},
{
@ -41,7 +41,7 @@
"fields": {
"value": 500,
"created_date": "2018-05-01",
"created": "2018-05-01 00:00:00+0000"
"created": "2018-05-01 00:00:00"
}
}
]

View File

@ -5,7 +5,7 @@
"fields": {
"value": 100,
"created_date": "2018-01-01",
"created": "2018-02-01 00:00:00+0000"
"created": "2018-02-01 00:00:00"
}
},
{
@ -14,7 +14,7 @@
"fields": {
"value": 200,
"created_date": "2018-02-01",
"created": "2018-02-01 00:00:00+0000"
"created": "2018-02-01 00:00:00"
}
}
]

View File

@ -8,7 +8,6 @@ from time import sleep
import datetime
from django.utils.timezone import now
# set Django environment
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@ -26,7 +25,7 @@ logger = logging.getLogger('django-clickhouse')
def create(batch_size=1000, test_time=60, period=1, **kwargs):
for iteration in range(int(test_time / period)):
res = TestModel.objects.db_manager('test_db').bulk_create([
TestModel(created=now(), created_date='2018-01-01', value=iteration * batch_size + i)
TestModel(created=datetime.datetime.now(), created_date='2018-01-01', value=iteration * batch_size + i)
for i in range(batch_size)
])
logger.info('django-clickhouse: test created %d records' % len(res))
@ -55,8 +54,8 @@ def sync(period=1, test_time=60, **kwargs):
if kwargs['once']:
ClickHouseCollapseTestModel.sync_batch_from_storage()
else:
start = now()
while (now() - start).total_seconds() < test_time:
start = datetime.datetime.now()
while (datetime.datetime.now() - start).total_seconds() < test_time:
ClickHouseCollapseTestModel.sync_batch_from_storage()
sleep(period)

View File

@ -4,7 +4,6 @@ This file contains django settings to run tests with runtests.py
from os import environ
SECRET_KEY = 'fake-key'
USE_TZ = True
DATABASES = {
'default': {

View File

@ -1,7 +1,6 @@
import datetime
from django.test import TestCase
from django.utils.timezone import now
from tests.clickhouse_models import ClickHouseTestModel
@ -21,11 +20,11 @@ class ClickHouseModelTest(TestCase):
self.assertTrue(ClickHouseTestModel.need_sync())
# Time hasn't passed - no sync
self.storage.set_last_sync_time(ClickHouseTestModel.get_import_key(), now())
self.storage.set_last_sync_time(ClickHouseTestModel.get_import_key(), datetime.datetime.now())
self.assertFalse(ClickHouseTestModel.need_sync())
# Time has passed
sync_delay = ClickHouseTestModel.get_sync_delay()
self.storage.set_last_sync_time(ClickHouseTestModel.get_import_key(),
now() - datetime.timedelta(seconds=sync_delay + 1))
datetime.datetime.now() - datetime.timedelta(seconds=sync_delay + 1))
self.assertTrue(ClickHouseTestModel.need_sync())

View File

@ -88,25 +88,12 @@ class CollapsingMergeTreeTest(TestCase):
self.objects, date_col='created')
self._test_final_versions(final_versions)
def test_get_final_versions_by_final_no_date_col(self):
ClickHouseCollapseTestModel.engine.date_col = None
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects)
self._test_final_versions(final_versions)
def test_get_final_versions_by_version_datetime(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects, date_col='created')
self._test_final_versions(final_versions)
def test_get_final_versions_by_version_no_date_col(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
ClickHouseCollapseTestModel.engine.date_col = None
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects)
self._test_final_versions(final_versions)
def test_versions(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects)

View File

@ -13,7 +13,7 @@ from tests.clickhouse_models import ClickHouseTestModel
class NoMigrateRouter(DefaultRouter):
def allow_migrate(self, db_alias, app_label, operation, **hints):
def allow_migrate(self, db_alias, app_label, operation, model=None, **hints):
return False

View File

@ -40,7 +40,7 @@ class TestOperations(TransactionTestCase):
def test_save(self):
# INSERT operation
instance = self.django_model(created_date=datetime.date.today(), created=now(), value=2)
instance = self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=2)
instance.save()
self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))
@ -52,13 +52,13 @@ class TestOperations(TransactionTestCase):
self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))
def test_create(self):
instance = self.django_model.objects.create(pk=100555, created_date=datetime.date.today(), created=now(),
value=2)
instance = self.django_model.objects.create(pk=100555, created_date=datetime.date.today(),
created=datetime.datetime.now(), value=2)
self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))
def test_bulk_create(self):
items = [self.django_model(created_date=datetime.date.today(), created=now(), value=i)
items = [self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=i)
for i in range(5)]
items = self.django_model.objects.bulk_create(items)
self.assertEqual(5, len(items))
@ -99,22 +99,6 @@ class TestOperations(TransactionTestCase):
self.assertSetEqual({('insert', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
def test_pg_bulk_create_returning(self):
now_dt = now()
res = self.django_model.objects.pg_bulk_create([
{'value': i, 'created': now_dt, 'created_date': now_dt.date()}
for i in range(5)
], returning='*')
self.assertEqual(5, len(res))
for i, instance in enumerate(res):
self.assertEqual(instance.created, now_dt)
self.assertEqual(instance.created_date, now_dt.date())
self.assertEqual(i, instance.value)
self.assertSetEqual({('insert', "%s.%d" % (self.db_alias, instance.pk)) for instance in res},
set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
def test_pg_bulk_update(self):
items = list(self.django_model.objects.filter(pk__in={1, 2}))
@ -131,21 +115,6 @@ class TestOperations(TransactionTestCase):
self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
def test_pg_bulk_update_returning(self):
items = list(self.django_model.objects.filter(pk__in={1, 2}))
res = self.django_model.objects.pg_bulk_update([
{'id': instance.pk, 'value': instance.pk * 10}
for instance in items
], returning='*')
self.assertEqual(2, len(res))
for instance in res:
self.assertEqual(instance.value, instance.pk * 10)
self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
def test_pg_bulk_update_or_create(self):
items = list(self.django_model.objects.filter(pk__in={1, 2}))
@ -166,28 +135,9 @@ class TestOperations(TransactionTestCase):
self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in items},
set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
def test_pg_bulk_update_or_create_returning(self):
items = list(self.django_model.objects.filter(pk__in={1, 2}))
data = [{
'id': instance.pk,
'value': instance.pk * 10,
'created_date': instance.created_date,
'created': instance.created
} for instance in items] + [{'id': 11, 'value': 110, 'created_date': datetime.date.today(), 'created': now()}]
res = self.django_model.objects.pg_bulk_update_or_create(data, returning='*')
self.assertEqual(3, len(res))
for instance in res:
self.assertEqual(instance.value, instance.pk * 10)
self.assertSetEqual({('update', "%s.%d" % (self.db_alias, instance.pk)) for instance in res},
set(self.storage.get_operations(self.clickhouse_model.get_import_key(), 10)))
def test_get_or_create(self):
instance, created = self.django_model.objects. \
get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'created': now(),
get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'created': datetime.datetime.now(),
'value': 2})
self.assertTrue(created)
@ -203,7 +153,8 @@ class TestOperations(TransactionTestCase):
def test_update_or_create(self):
instance, created = self.django_model.objects. \
update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'created': now(), 'value': 2})
update_or_create(pk=100, defaults={'created_date': datetime.date.today(),
'created': datetime.datetime.now(), 'value': 2})
self.assertTrue(created)
self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))
@ -228,7 +179,7 @@ class TestOperations(TransactionTestCase):
def test_bulk_create_returning(self):
items = [
self.django_model(created_date=datetime.date.today(), created=now(), value=i)
self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=i)
for i in range(5)
]
items = self.django_model.objects.bulk_create_returning(items)
@ -259,7 +210,7 @@ class TestOperations(TransactionTestCase):
def test_save_returning(self):
# INSERT operation
instance = self.django_model(created_date=datetime.date.today(), created=now(), value=2)
instance = self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=2)
instance.save_returning()
self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))
@ -283,7 +234,6 @@ class TestOperations(TransactionTestCase):
class TestSecondaryOperations(TestOperations):
# from django.db.models.fields import *
fixtures = ['test_secondary_model']
django_model = SecondaryTestModel
clickhouse_model = ClickHouseSecondTestModel

View File

@ -1,59 +0,0 @@
from django.test import SimpleTestCase
from django_clickhouse.migrations import RunSQL, CreateTable
from django_clickhouse.routers import DefaultRouter
from tests.clickhouse_models import ClickHouseTestModel
class DefaultRouterAllowMigrateTest(SimpleTestCase):
def setUp(self):
self.router = DefaultRouter()
self.operation = RunSQL('SELECT 1')
def test_hints_model_class(self):
hints = {'model': ClickHouseTestModel}
with self.subTest('Allow migrate'):
res = self.router.allow_migrate('default', 'tests', self.operation, **hints)
self.assertTrue(res)
with self.subTest('Reject migrate'):
res = self.router.allow_migrate('other', 'tests', self.operation, **hints)
self.assertFalse(res)
def test_hints_model_name(self):
hints = {'model': 'ClickHouseTestModel'}
with self.subTest('Allow migrate'):
res = self.router.allow_migrate('default', 'tests', self.operation, **hints)
self.assertTrue(res)
with self.subTest('Reject migrate'):
res = self.router.allow_migrate('other', 'tests', self.operation, **hints)
self.assertFalse(res)
def test_hints_force_migrate_on_databases(self):
hints = {'force_migrate_on_databases': ['secondary']}
with self.subTest('Allow migrate'):
res = self.router.allow_migrate('secondary', 'apps', self.operation, **hints)
self.assertTrue(res)
with self.subTest('Reject migrate'):
res = self.router.allow_migrate('default', 'apps', self.operation, **hints)
self.assertFalse(res)
def test_model_operation(self):
with self.subTest('Allow migrate'):
operation = CreateTable(ClickHouseTestModel)
res = self.router.allow_migrate('default', 'apps', operation)
self.assertTrue(res)
with self.subTest('Reject migrate'):
operation = CreateTable(ClickHouseTestModel)
res = self.router.allow_migrate('other', 'apps', operation)
self.assertFalse(res)
def test_no_model(self):
with self.assertRaises(ValueError):
self.router.allow_migrate('default', 'apps', self.operation)

View File

@ -30,7 +30,7 @@ class SyncTest(TransactionTestCase):
ClickHouseTestModel.get_storage().flush()
def test_simple(self):
obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
ClickHouseTestModel.sync_batch_from_storage()
synced_data = list(ClickHouseTestModel.objects.all())
@ -40,7 +40,7 @@ class SyncTest(TransactionTestCase):
self.assertEqual(obj.id, synced_data[0].id)
def test_collapsing_update_by_final(self):
obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
obj.value = 2
obj.save()
ClickHouseCollapseTestModel.sync_batch_from_storage()
@ -63,7 +63,7 @@ class SyncTest(TransactionTestCase):
def test_collapsing_update_by_version(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
obj.value = 2
obj.save()
ClickHouseCollapseTestModel.sync_batch_from_storage()
@ -97,7 +97,7 @@ class SyncTest(TransactionTestCase):
self.assertEqual(0, len(synced_data))
def test_multi_model(self):
obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
obj.value = 2
obj.save()
ClickHouseMultiTestModel.sync_batch_from_storage()
@ -268,7 +268,7 @@ class ProfileTest(TransactionTestCase):
ClickHouseTestModel.sync_enabled = False
TestModel.objects.bulk_create([
TestModel(created=now(), created_date='2018-01-01', value=i)
TestModel(created=datetime.datetime.now(), created_date='2018-01-01', value=i)
for i in range(self.BATCH_SIZE)
])