1) Tests for ClickHouseSyncModel

2) Bugfixes in ClickHouseSyncModel
M1ha 2018-11-15 13:02:05 +05:00
parent 0967614318
commit c6da379be7
11 changed files with 1213 additions and 949 deletions

File diff suppressed because it is too large


@@ -13,7 +13,7 @@ PREFIX = getattr(settings, 'CLICKHOUSE_SETTINGS_PREFIX', 'CLICKHOUSE_')
 DEFAULTS = {
     'DATABASES': {},
     'SYNC_BATCH_SIZE': 10000,
-    'SYNC_STORAGE': 'django_clickhouse.storage.DBStorage',
+    'SYNC_STORAGE': 'django_clickhouse.storages.RedisStorage',
     'SYNC_DELAY': 5,
     'REDIS_CONFIG': None,
     'STATSD_PREFIX': 'clickhouse',
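This hunk switches the default SYNC_STORAGE backend to RedisStorage. Judging by the PREFIX/DEFAULTS pattern shown above, a project overrides these defaults with CLICKHOUSE_-prefixed names in its Django settings; a minimal sketch, with illustrative values that are not part of the commit:

# settings.py (sketch; assumes settings are resolved as PREFIX + key name)
CLICKHOUSE_SYNC_STORAGE = 'django_clickhouse.storages.RedisStorage'
CLICKHOUSE_SYNC_BATCH_SIZE = 5000
CLICKHOUSE_REDIS_CONFIG = {'host': '127.0.0.1', 'port': 6379, 'db': 8}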


@@ -3,7 +3,7 @@ This file contains base django model to be synced with ClickHouse.
 It saves all operations to storage in order to write them to ClickHouse later.
 """
-from typing import Optional, Any, List
+from typing import Optional, Any, List, Type

 import six
 from django.db import transaction
@@ -12,7 +12,7 @@ from django.dispatch import receiver
 from django.db.models import QuerySet as DjangoQuerySet, Manager as DjangoManager, Model as DjangoModel

 from .configuration import config
-from .storage import Storage
+from .storages import Storage
 from .utils import lazy_class_import
@@ -20,14 +20,14 @@ try:
     from django_pg_returning.manager import UpdateReturningMixin
 except ImportError:
     class UpdateReturningMixin:
-        pass
+        fake = True

 try:
     from django_pg_bulk_update.manager import BulkUpdateManagerMixin
 except ImportError:
     class BulkUpdateManagerMixin:
-        pass
+        fake = True


 class ClickHouseSyncUpdateReturningQuerySetMixin(UpdateReturningMixin):
@@ -35,23 +35,27 @@ class ClickHouseSyncUpdateReturningQuerySetMixin(UpdateReturningMixin):
     This mixin adopts methods of django-pg-returning library
     """

-    def _register_ops(self, result):
+    def _register_ops(self, operation, result):
         pk_name = self.model._meta.pk.name
         pk_list = result.values_list(pk_name, flat=True)
-        self.model.register_clickhouse_operations('update', *pk_list, using=self.db)
+        self.model.register_clickhouse_operations(operation, *pk_list, using=self.db)

     def update_returning(self, **updates):
         result = super().update_returning(**updates)
-        self._register_ops(result)
+        self._register_ops('update', result)
         return result

     def delete_returning(self):
         result = super().delete_returning()
-        self._register_ops(result)
+        self._register_ops('delete', result)
         return result


 class ClickHouseSyncBulkUpdateManagerMixin(BulkUpdateManagerMixin):
+    """
+    This mixin adopts methods of django-pg-bulk-update library
+    """

     def _update_returning_param(self, returning):
         pk_name = self.model._meta.pk.name
         if returning is None:
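With the operation name threaded through _register_ops, delete_returning is no longer registered as an update. Based on the tests added below in this commit, the observable effect is roughly:

# Sketch of the behaviour exercised by test_qs_delete_returning below
TestModel.objects.filter(pk=1).delete_returning()
# the storage now holds ('delete', '1') for the synced ClickHouse model;
# before this fix the operation was registered as ('update', '1')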
@@ -85,13 +89,10 @@ class ClickHouseSyncBulkUpdateManagerMixin(BulkUpdateManagerMixin):

 class ClickHouseSyncQuerySetMixin:
     def update(self, **kwargs):
-        if self.model.clickhouse_sync_type == 'redis':
-            pk_name = self.model._meta.pk.name
-            res = self.only(pk_name).update_returning(**kwargs).values_list(pk_name, flat=True)
-            self.model.register_clickhouse_operations('update', *res, usint=self.db)
-            return len(res)
-        else:
-            return super().update(**kwargs)
+        pk_name = self.model._meta.pk.name
+        res = self.only(pk_name).update_returning(**kwargs).values_list(pk_name, flat=True)
+        self.model.register_clickhouse_operations('update', *res, using=self.db)
+        return len(res)

     def bulk_create(self, objs, batch_size=None):
         objs = super().bulk_create(objs, batch_size=batch_size)
@@ -100,15 +101,23 @@ class ClickHouseSyncQuerySetMixin:
         return objs


+# I add library dependant mixins to base classes only if libraries are installed
+qs_bases = [ClickHouseSyncQuerySetMixin, DjangoQuerySet]
+
+if not getattr(UpdateReturningMixin, 'fake', False):
+    qs_bases.append(ClickHouseSyncUpdateReturningQuerySetMixin)
+
+if not getattr(BulkUpdateManagerMixin, 'fake', False):
+    qs_bases.append(ClickHouseSyncBulkUpdateManagerMixin)
+
+ClickHouseSyncModelQuerySet = type('ClickHouseSyncModelQuerySet', tuple(qs_bases), {})
+
+
 class ClickHouseSyncModelMixin:
     def get_queryset(self):
         return ClickHouseSyncModelQuerySet(model=self.model, using=self._db)


-class ClickHouseSyncModelQuerySet(ClickHouseSyncQuerySetMixin, DjangoQuerySet):
-    pass
-
-
 class ClickHouseSyncModelManager(ClickHouseSyncModelMixin, DjangoManager):
     pass
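The added block composes the queryset class at import time, so the optional django-pg-returning and django-pg-bulk-update mixins are only mixed in when the libraries are installed (their stub fallbacks carry fake = True). A minimal, self-contained sketch of the same pattern; the names here are illustrative and not part of django-clickhouse:

class BaseQuerySet:
    def update(self, **kwargs):
        return 0

try:
    from some_optional_lib import ExtraMixin  # hypothetical optional dependency
except ImportError:
    class ExtraMixin:
        fake = True  # stub marker: library not installed

bases = [BaseQuerySet]
if not getattr(ExtraMixin, 'fake', False):
    bases.append(ExtraMixin)

# type(name, bases, namespace) builds the final class dynamically
CombinedQuerySet = type('CombinedQuerySet', tuple(bases), {})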
@ -133,7 +142,8 @@ class ClickHouseSyncModel(DjangoModel):
return storage_cls() return storage_cls()
@classmethod @classmethod
def register_clickhouse_sync_model(cls, model_cls): # type: (Type[ClickHouseModel]) -> None def register_clickhouse_sync_model(cls, model_cls):
# type: (Type['django_clickhouse.clickhouse_models.ClickHouseModel']) -> None
""" """
Registers ClickHouse model to listen to this model updates Registers ClickHouse model to listen to this model updates
:param model_cls: Model class to register :param model_cls: Model class to register
@@ -142,7 +152,7 @@ class ClickHouseSyncModel(DjangoModel):
         cls._clickhouse_sync_models.append(model_cls)

     @classmethod
-    def get_clickhouse_sync_models(cls):  # type: () -> List[ClickHouseModel]
+    def get_clickhouse_sync_models(cls):  # type: () -> List['django_clickhouse.clickhouse_models.ClickHouseModel']
         """
         Returns all clickhouse models, listening to this class
         :return:
@@ -159,13 +169,12 @@ class ClickHouseSyncModel(DjangoModel):
         :param using: Database alias registered instances are from
         :return: None
         """
-        def _on_commit():
-            for model_cls in cls.get_clickhouse_sync_models():
-                storage.register_operations_wrapped(model_cls.get_import_key(), operation, *model_pks)
-
         if len(model_pks) > 0:
             storage = cls.get_clickhouse_storage()
+
+            def _on_commit():
+                for model_cls in cls.get_clickhouse_sync_models():
+                    storage.register_operations_wrapped(model_cls.get_import_key(), operation, *model_pks)
+
             transaction.on_commit(_on_commit, using=using)

     def post_save(self, created, using=None):  # type: (bool, Optional[str]) -> None
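register_clickhouse_operations defers the actual storage write with transaction.on_commit, so operations queued inside an atomic block are only recorded if the surrounding transaction commits. A usage sketch based on the test models added in this commit (the comments describe the intended behaviour, not library output):

import datetime
from django.db import transaction
from tests.models import TestModel

with transaction.atomic():
    TestModel.objects.create(created_date=datetime.date.today(), value=1)
    # nothing has reached the storage yet; the callback registered via
    # transaction.on_commit() runs only after this block commits
# after the commit, an 'insert' operation is registered for every
# ClickHouseModel synced with TestModel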


@@ -82,7 +82,7 @@ class Storage:
         """
         raise NotImplemented()

-    def register_operations(self, import_key, operation, *pks):  # type: (str, str, *Iterable[Any]) -> None
+    def register_operations(self, import_key, operation, *pks):  # type: (str, str, *Any) -> None
         """
         Registers new incoming operation
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -93,7 +93,7 @@ class Storage:
         raise NotImplementedError()

     def register_operations_wrapped(self, import_key, operation, *pks):
-        # type: (str, str, *Iterable[Any]) -> None
+        # type: (str, str, *Any) -> None
         """
         This is a wrapper for register_operation method, checking main parameters.
         This method should be called from inner functions.
@@ -107,6 +107,13 @@ class Storage:
         return self.register_operations(import_key, operation, *pks)

+    def flush(self):
+        """
+        This method is used in tests to drop all storage data
+        :return: None
+        """
+        raise NotImplemented()
+

 class RedisStorage(Storage):
     """
@@ -171,3 +178,15 @@ class RedisStorage(Storage):
             .zremrangebyscore(ops_key, '-inf', score)\
             .delete(batch_key)\
             .execute()
+
+    def flush(self):
+        key_tpls = [
+            self.REDIS_KEY_TS_TEMPLATE.format(import_key='*'),
+            self.REDIS_KEY_OPS_TEMPLATE.format(import_key='*'),
+            self.REDIS_KEY_BATCH_TEMPLATE.format(import_key='*')
+        ]
+        for tpl in key_tpls:
+            keys = self._redis.keys(tpl)
+            if keys:
+                self._redis.delete(*keys)
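Storage gains a flush() hook and RedisStorage implements it by deleting its keys by template. For context, a minimal in-memory storage honouring the same contract the new tests rely on (register_operations collecting (operation, str(pk)) pairs, get_operations, flush) could look like this; it is an illustrative sketch, not part of the library:

from collections import defaultdict

class InMemoryStorage:
    def __init__(self):
        self._ops = defaultdict(list)

    def register_operations(self, import_key, operation, *pks):
        # store operations as (operation, pk-as-string) pairs per import key
        self._ops[import_key].extend((operation, str(pk)) for pk in pks)

    def get_operations(self, import_key, count, **kwargs):
        return self._ops[import_key][:count]

    def flush(self):
        self._ops.clear()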


@@ -0,0 +1,7 @@
+from django_clickhouse.clickhouse_models import ClickHouseModel
+from tests.models import TestModel
+
+
+class TestClickHouseModel(ClickHouseModel):
+    django_model = TestModel
+    sync_delay = 5

tests/fixtures/test_model.json (new file, 18 lines)

@@ -0,0 +1,18 @@
+[
+  {
+    "model": "tests.TestModel",
+    "pk": 1,
+    "fields": {
+      "value": 100,
+      "created_date": "2018-01-01"
+    }
+  },
+  {
+    "model": "tests.TestModel",
+    "pk": 2,
+    "fields": {
+      "value": 200,
+      "created_date": "2018-02-01"
+    }
+  }
+]


@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+# Generated by Django 1.10.7 on 2017-12-26 11:00
+from __future__ import unicode_literals
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    initial = True
+
+    dependencies = []
+
+    operations = [
+        migrations.CreateModel(
+            name='TestModel',
+            fields=[
+                ('id', models.AutoField()),
+                ('value', models.IntegerField()),
+                ('created_date', models.DateField()),
+            ],
+            options={
+                'abstract': False,
+            },
+        ),
+    ]


@@ -3,3 +3,9 @@ This file contains sample models to use in tests
 """
 from django.db import models

+from django_clickhouse.models import ClickHouseSyncModel
+
+
+class TestModel(ClickHouseSyncModel):
+    value = models.IntegerField()
+    created_date = models.DateField()


@@ -11,14 +11,6 @@ DATABASES = {
         'PASSWORD': 'test',
         'HOST': '127.0.0.1',
         'PORT': '5432'
-    },
-    'secondary': {
-        'ENGINE': 'django.db.backends.postgresql_psycopg2',
-        'NAME': 'test2',
-        'USER': 'test',
-        'PASSWORD': 'test',
-        'HOST': '127.0.0.1',
-        'PORT': '5432'
     }
 }

tests/test_models.py (new file, 112 lines)

@@ -0,0 +1,112 @@
+import datetime
+
+from django.test import TransactionTestCase
+
+from tests.clickhouse_models import TestClickHouseModel
+from tests.models import TestModel
+
+
+# TestCase can't be used here:
+# 1) TestCase creates transaction for inner usage
+# 2) I call transaction.on_commit(), expecting no transaction at the moment
+# 3) TestCase rollbacks transaction, on_commit not called
+class ClickHouseDjangoModelTest(TransactionTestCase):
+    fixtures = ['test_model']
+
+    def setUp(self):
+        self.storage = TestModel.get_clickhouse_storage()
+        self.storage.flush()
+
+    def tearDown(self):
+        self.storage.flush()
+
+    def test_save(self):
+        # INSERT operation
+        instance = TestModel(created_date=datetime.date.today(), value=2)
+        instance.save()
+        self.assertListEqual([('insert', str(instance.pk))],
+                             self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+        # UPDATE operation
+        instance.save()
+        self.assertListEqual([('insert', str(instance.pk)), ('update', str(instance.pk))],
+                             self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+    def test_create(self):
+        instance = TestModel.objects.create(pk=100555, created_date=datetime.date.today(), value=2)
+        self.assertListEqual([('insert', str(instance.pk))],
+                             self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+    def test_bulk_create(self):
+        items = [TestModel(created_date=datetime.date.today(), value=i) for i in range(5)]
+        items = TestModel.objects.bulk_create(items)
+        self.assertEqual(5, len(items))
+        self.assertListEqual([('insert', str(instance.pk)) for instance in items],
+                             self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+    def test_get_or_create(self):
+        instance, created = TestModel.objects. \
+            get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2})
+
+        self.assertTrue(created)
+        self.assertListEqual([('insert', str(instance.pk))],
+                             self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+        instance, created = TestModel.objects. \
+            get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2})
+
+        self.assertFalse(created)
+        self.assertListEqual([('insert', str(instance.pk))],
+                             self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+    def test_update_or_create(self):
+        instance, created = TestModel.objects. \
+            update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2})
+
+        self.assertTrue(created)
+        self.assertListEqual([('insert', str(instance.pk))],
+                             self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+        instance, created = TestModel.objects. \
+            update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'value': 2})
+
+        self.assertFalse(created)
+        self.assertListEqual([('insert', str(instance.pk)), ('update', str(instance.pk))],
+                             self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+    def test_qs_update(self):
+        TestModel.objects.filter(pk=1).update(created_date=datetime.date.today())
+        self.assertListEqual([('update', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+        # Update, after which updated element will not suit update conditions
+        TestModel.objects.filter(created_date__lt=datetime.date.today()). \
+            update(created_date=datetime.date.today())
+        self.assertListEqual([('update', '1'), ('update', '2')],
+                             self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+    def test_qs_update_returning(self):
+        TestModel.objects.filter(pk=1).update_returning(created_date=datetime.date.today())
+        self.assertListEqual([('update', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+        # Update, after which updated element will not suit update conditions
+        TestModel.objects.filter(created_date__lt=datetime.date.today()). \
+            update_returning(created_date=datetime.date.today())
+        self.assertListEqual([('update', '1'), ('update', '2')],
+                             self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+    def test_qs_delete_returning(self):
+        TestModel.objects.filter(pk=1).delete_returning()
+        self.assertListEqual([('delete', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+        # Update, after which the original filter will no longer match
+        TestModel.objects.filter(created_date__lt=datetime.date.today()).delete_returning()
+        self.assertListEqual([('delete', '1'), ('delete', '2')],
+                             self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+    def test_delete(self):
+        instance = TestModel.objects.get(pk=1)
+        instance.delete()
+        self.assertListEqual([('delete', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
+
+    def test_qs_delete(self):
+        TestModel.objects.filter(pk=1).delete()
+        self.assertListEqual([('delete', '1')], self.storage.get_operations(TestClickHouseModel.get_import_key(), 10))
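The comment at the top of this test module is the reason TransactionTestCase is used: Django's TestCase wraps every test in an outer atomic() block that is rolled back, so transaction.on_commit callbacks never fire. A minimal illustration of the difference (not part of the commit):

from django.db import transaction

def on_commit_demo():
    fired = []
    with transaction.atomic():
        transaction.on_commit(lambda: fired.append(True))
    # Under TransactionTestCase the atomic block above really commits,
    # so the callback runs and fired == [True].
    # Under TestCase the whole test already runs inside an outer atomic()
    # that is rolled back at the end, so the callback is discarded.
    return fired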


@@ -1,18 +1,16 @@
 from django.test import TestCase

-from django_clickhouse.storage import RedisStorage
+from django_clickhouse.storages import RedisStorage


 class StorageTest(TestCase):
     storage = RedisStorage()

     def setUp(self):
-        # Clean storage
-        redis = self.storage._redis
-        keys = redis.keys('clickhouse_sync*')
-        if keys:
-            redis.delete(*keys)
+        self.storage.flush()
+
+    def tearDown(self):
+        self.storage.flush()

     def test_operation_pks(self):
         self.storage.register_operations_wrapped('test', 'insert', 100500)
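With flush() available, the storage test resets state symmetrically in setUp and tearDown. A hedged sketch of the round trip these tests exercise (key name and the expected tuple are illustrative, inferred from the model tests above):

storage = RedisStorage()
storage.flush()
storage.register_operations_wrapped('test', 'insert', 100500)
# operations come back as (operation, pk-as-string) pairs
assert storage.get_operations('test', 10) == [('insert', '100500')]
storage.flush()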