1) RedisStorage and tests for it

2) Some fixes
commit bac333d03a
parent 2a9567d27f
Author: M1ha, 2018-11-14 12:52:01 +05:00
11 changed files with 252 additions and 34 deletions

requirements.txt
@@ -5,3 +5,4 @@ typing
 psycopg2
 infi.clickhouse-orm
 celery
+statsd

runtests.py Normal file

@@ -0,0 +1,23 @@
#!/usr/bin/env python
"""
This suite runs tests in a Django environment. See:
https://docs.djangoproject.com/en/1.11/topics/testing/advanced/#using-the-django-test-runner-to-test-reusable-applications
"""
import os
import sys

import django
from django.conf import settings
from django.test.utils import get_runner

if __name__ == "__main__":
    print('Django: ', django.VERSION)
    print('Python: ', sys.version)

    os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.settings'
    django.setup()
    TestRunner = get_runner(settings)
    test_runner = TestRunner()
    failures = test_runner.run_tests(["tests"])
    sys.exit(bool(failures))
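With the tests/settings.py module from this commit in place, the whole suite is started as python runtests.py; the script sets DJANGO_SETTINGS_MODULE itself, so no extra environment setup is needed.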

setup.cfg Normal file

@@ -0,0 +1,5 @@
[metadata]
description-file = README.md
[bdist_wheel]
universal = 1

setup.py Normal file

@@ -0,0 +1,27 @@
from setuptools import setup

with open("README.md", "r") as fh:
    long_description = fh.read()

requires = []
with open('requirements.txt') as f:
    for line in f.readlines():
        line = line.strip()  # Remove spaces
        line = line.split('#')[0]  # Remove comments
        if line:  # Remove empty lines
            requires.append(line)

setup(
    name='django-clickhouse',
    version='0.0.1',
    packages=['django_clickhouse'],
    package_dir={'': 'src'},
    url='https://github.com/carrotquest/django-clickhouse',
    license='BSD 3-clause "New" or "Revised" License',
    author='Mikhail Shvein',
    author_email='work_shvein_mihail@mail.ru',
    description='Django extension to integrate with ClickHouse database',
    long_description=long_description,
    long_description_content_type="text/markdown",
    # requires=requires
)

src/django_clickhouse/configuration.py
@@ -14,7 +14,9 @@ DEFAULTS = {
     'DATABASES': {},
     'SYNC_BATCH_SIZE': 10000,
     'SYNC_STORAGE': 'django_clickhouse.storage.DBStorage',
-    'SYNC_DELAY': 5
+    'SYNC_DELAY': 5,
+    'REDIS_CONFIG': None,
+    'STATSD_PREFIX': 'clickhouse'
 }
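For context, the configuration module itself is not part of this diff. Judging by PREFIX, these DEFAULTS and the ConfigTest expectations below, its lookup logic is roughly the following sketch; the Config class body here is an assumption, not the library's actual code.

# Sketch only: attribute lookup implied by PREFIX, DEFAULTS and ConfigTest.
# The real configuration.py is not shown in this commit.
from django.conf import settings

PREFIX = 'CLICKHOUSE_'


class Config(object):
    def __getattr__(self, name):
        if name not in DEFAULTS:
            # config.NOT_LIB_PROP raises AttributeError (see test_not_lib_prop)
            raise AttributeError("Unknown django-clickhouse config parameter: '%s'" % name)

        # A CLICKHOUSE_-prefixed Django setting overrides the library default,
        # e.g. CLICKHOUSE_SYNC_BATCH_SIZE = 5000 is read as config.SYNC_BATCH_SIZE
        return getattr(settings, PREFIX + name, DEFAULTS[name])


config = Config()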

src/django_clickhouse/exceptions.py
@@ -1,4 +1,10 @@
+from .configuration import PREFIX
+
+
 class ClickHouseError(Exception):
     pass
+
+
+class ConfigurationError(Exception):
+    def __init__(self, param_name):
+        param_name = PREFIX + param_name
+        super(ConfigurationError, self).__init__("Config parameter '%s' is not set properly" % param_name)

src/django_clickhouse/models.py
@@ -1,8 +1,8 @@
 from django.db.models.signals import post_save, post_delete
 from django.dispatch import receiver
-from django.db.models import QuerySet as DjangoBaseQuerySet, Manager as DjangoBaseManager, Model as DjangoBaseModel
+from django.db.models import QuerySet as DjangoQuerySet, Manager as DjangoManager, Model as DjangoModel
 
-class ClickHouseDjangoModelQuerySet(DjangoBaseQuerySet):
+class ClickHouseDjangoModelQuerySet(DjangoQuerySet):
     """
     Overrides update() so that it generates data for updating ClickHouse
     """
@@ -36,7 +36,7 @@ class ClickHouseDjangoModelQuerySet(DjangoBaseQuerySet):
         return result
 
-class ClickHouseDjangoModelManager(DjangoBaseManager):
+class ClickHouseDjangoModelManager(DjangoManager):
     def get_queryset(self):
         """
         Initializes the custom QuerySet
@@ -51,7 +51,7 @@ class ClickHouseDjangoModelManager(DjangoBaseManager):
         return objs
 
-class ClickHouseDjangoModel(DjangoBaseModel):
+class ClickHouseDjangoModel(DjangoModel):
     """
     Defines the base abstract model synchronized with ClickHouse
     """
src/django_clickhouse/storage.py
@@ -6,59 +6,159 @@ This data is periodically fetched from storage and applied to ClickHouse tables.
 Important:
     Storage should be able to restore current importing batch, if something goes wrong.
 """
+import datetime
+from typing import Any, Optional, List, Tuple, Iterable
+
+from .exceptions import ConfigurationError
+from .configuration import config
 
 
 class Storage:
+    """
+    Base abstract storage class, defining the interface for all storages.
+    The storage work algorithm:
+        1) pre_sync()
+        2) get_import_batch(). If a batch is present, go to 5)
+        3) If the batch is None, call get_operations()
+        4) Transform operations to a batch and call write_import_batch()
+        5) Import the batch to ClickHouse
+        6) Call post_sync(). On success it should remove the batch and its data from the sync queue.
+
+    If anything goes wrong before write_import_batch(), it is guaranteed that the ClickHouse import
+    has not started yet, and we can repeat the procedure from the beginning.
+    If anything goes wrong after write_import_batch(), we don't know whether the batch has been
+    imported to ClickHouse. But ClickHouse is idempotent to duplicate inserts, so one batch can
+    safely be inserted twice.
+    """
+
-    def pre_sync(self):  # type: () -> None
+    def pre_sync(self, import_key, **kwargs):  # type: (str, **dict) -> None
         """
         This method is called before the import process starts
+        :param import_key: A key, returned by the ClickHouseModel.get_import_key() method
+        :param kwargs: Storage-dependent arguments
         :return: None
         """
         pass
 
-    def post_sync(self, success):  # type: (bool) -> None
+    def post_sync(self, import_key, **kwargs):  # type: (str, **dict) -> None
         """
         This method is called after the import process has finished.
-        :param success: A flag, if process ended with success or error
+        :param import_key: A key, returned by the ClickHouseModel.get_import_key() method
+        :param kwargs: Storage-dependent arguments
         :return: None
         """
         pass
 
-    def get_sync_ids(self, **kwargs):  # type: (**dict) -> Tuple[Set[Any], Set[Any], Set[Any]]
-        """
-        Must return 3 sets of ids: to insert, update and delete records.
-        Method should be error safe - if something goes wrong, import data should not be lost
-        :param kwargs: Storage-dependent arguments
-        :return: 3 sets of primary keys
-        """
-        raise NotImplemented()
+    def get_import_batch(self, import_key, **kwargs):
+        # type: (str, **dict) -> Optional[Tuple[str]]
+        """
+        Returns a saved batch for ClickHouse import, or None if it was not found
+        :param import_key: A key, returned by the ClickHouseModel.get_import_key() method
+        :param kwargs: Storage-dependent arguments
+        :return: None if no batch has been formed; otherwise a tuple of the strings saved by write_import_batch()
+        """
+        raise NotImplementedError()
+
+    def write_import_batch(self, import_key, batch, **kwargs):
+        # type: (str, Iterable[str], **dict) -> None
+        """
+        Saves a batch for ClickHouse import
+        :param import_key: A key, returned by the ClickHouseModel.get_import_key() method
+        :param batch: An iterable of strings to save as a batch
+        :param kwargs: Storage-dependent arguments
+        :return: None
+        """
+        raise NotImplementedError()
+
+    def get_operations(self, import_key, count, **kwargs):
+        # type: (str, int, **dict) -> List[Tuple[str, str]]
+        """
+        Must return a list of operations on the model.
+        The method should be error safe: if something goes wrong, import data should not be lost.
+        :param import_key: A key, returned by the ClickHouseModel.get_import_key() method
+        :param count: A batch size to get
+        :param kwargs: Storage-dependent arguments
+        :return: A list of (operation, pk) tuples in incoming order
+        """
+        raise NotImplementedError()
+
+    def register_operation(self, import_key, operation, pk):  # type: (str, str, Any) -> None
+        """
+        Registers a new incoming operation
+        :param import_key: A key, returned by the ClickHouseModel.get_import_key() method
+        :param operation: One of insert, update, delete
+        :param pk: Primary key to find records in the main database. Should be string-serializable with str().
+        :return: None
+        """
+        raise NotImplementedError()
+
+    def register_operation_wrapped(self, import_key, operation, pk):
+        # type: (str, str, Any) -> None
+        """
+        A wrapper for the register_operation method, validating the main parameters.
+        This method should be called from inner functions.
+        :param import_key: A key, returned by the ClickHouseModel.get_import_key() method
+        :param operation: One of insert, update, delete
+        :param pk: Primary key to find records in the main database. Should be string-serializable with str().
+        :return: None
+        """
+        if operation not in {'insert', 'update', 'delete'}:
+            raise ValueError('operation must be one of [insert, update, delete]')
+
+        return self.register_operation(import_key, operation, pk)
 
 
 class RedisStorage(Storage):
+    """
+    Fast in-memory storage built on Redis and the redis-py library.
+    Requires:
+        1) a Redis database
+        2) the CLICKHOUSE_REDIS_CONFIG parameter: a dict of kwargs for redis.StrictRedis(**kwargs)
+    """
+    REDIS_KEY_OPS_TEMPLATE = 'clickhouse_sync:operations:{import_key}'
+    REDIS_KEY_TS_TEMPLATE = 'clickhouse_sync:timestamp:{import_key}'
+    REDIS_KEY_BATCH_TEMPLATE = 'clickhouse_sync:batch:{import_key}'
+
     def __init__(self):
+        # Create the redis-py connection. If Redis is not configured properly, errors should be raised here
+        if config.REDIS_CONFIG is None:
+            raise ConfigurationError('REDIS_CONFIG')
+
+        from redis import StrictRedis
+        self._redis = StrictRedis(**config.REDIS_CONFIG)
 
-    @classmethod
-    def get_sync_ids(cls, **kwargs):
-        # Sharding key format
-        key = 'clickhouse_sync:{using}:{table}:{operation}'.format(table=cls.django_model._meta.db_table,
-                                                                   operation='*', using=(using or 'default'))
-        # Sets of ids to insert, update and delete
-        insert_model_ids, update_model_ids, delete_model_ids = set(), set(), set()
-        for key in settings.REDIS.keys(key):
-            model_ids = settings.REDIS.pipeline().smembers(key).delete(key).execute()[0]
-            model_ids = {int(mid) for mid in model_ids}
-            op = key.decode('utf-8').split(':')[-1]
-            if op == 'INSERT':
-                insert_model_ids = set(model_ids)
-            elif op == 'UPDATE':
-                update_model_ids = set(model_ids)
-            else:  # if op == 'DELETE'
-                delete_model_ids = set(model_ids)
-        return insert_model_ids, update_model_ids, delete_model_ids
+    def register_operation(self, import_key, operation, pk):
+        key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
+
+        # zadd(key, score, value): the score is the registration timestamp, so that
+        # get_operations() can fetch operations in incoming order by score range
+        self._redis.zadd(key, datetime.datetime.now().timestamp(), '%s:%s' % (operation, str(pk)))
+
+    def get_operations(self, import_key, count, **kwargs):
+        ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
+        res = self._redis.zrangebyscore(ops_key, '-inf', datetime.datetime.now().timestamp(),
+                                        start=0, num=count, withscores=True)
+        if not res:
+            return []
+        ops, scores = zip(*res)
+
+        # Remember the highest fetched score, so post_sync() removes exactly the operations taken here
+        ts_key = self.REDIS_KEY_TS_TEMPLATE.format(import_key=import_key)
+        self._redis.set(ts_key, max(scores))
+
+        return list(tuple(op.decode().split(':')) for op in ops)
+
+    def get_import_batch(self, import_key, **kwargs):
+        batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key)
+        batch = self._redis.lrange(batch_key, 0, -1)
+        return tuple(item.decode() for item in batch) if batch else None
+
+    def write_import_batch(self, import_key, batch, **kwargs):
+        batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key)
+
+        # Elements are pushed to the head, so the batch is reversed to preserve its order
+        self._redis.lpush(batch_key, *reversed(batch))
+
+    def post_sync(self, import_key, **kwargs):
+        ts_key = self.REDIS_KEY_TS_TEMPLATE.format(import_key=import_key)
+        ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
+        batch_key = self.REDIS_KEY_BATCH_TEMPLATE.format(import_key=import_key)
+
+        score = float(self._redis.get(ts_key))
+        self._redis.pipeline()\
+            .zremrangebyscore(ops_key, '-inf', score)\
+            .delete(batch_key)\
+            .execute()
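Putting the docstring's six steps together, a sync driver over this interface would look roughly like the sketch below. form_batch() and insert_batch() are hypothetical placeholders for ClickHouseModel serialization and the actual ClickHouse insert, neither of which is part of this commit.

# Sketch of the Storage docstring's algorithm, not library code.
def sync_import_key(storage, import_key, batch_size=1000):
    storage.pre_sync(import_key)                                      # step 1

    batch = storage.get_import_batch(import_key)                      # step 2
    if batch is None:
        operations = storage.get_operations(import_key, batch_size)   # step 3
        batch = form_batch(operations)         # step 4: hypothetical serializer
        storage.write_import_batch(import_key, batch)

    # Step 5: a crash after this point may import the batch twice,
    # which the docstring argues is safe since the insert is idempotent
    insert_batch(batch)                        # hypothetical ClickHouse insert

    storage.post_sync(import_key)              # step 6: clean up operations and batch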

tests/settings.py
@@ -38,8 +38,14 @@ LOGGING = {
 }
 
 INSTALLED_APPS = [
-    "src.django_clickhouse",
+    "src",
     "tests"
 ]
 
-CLICKHOUSE_BATCH_SIZE = 5000
+CLICKHOUSE_SYNC_BATCH_SIZE = 5000
+
+CLICKHOUSE_REDIS_CONFIG = {
+    'host': '127.0.0.1',
+    'port': 6379,
+    'db': 8,
+    'socket_timeout': 10
+}

tests/test_configuration.py
@@ -8,7 +8,7 @@ class ConfigTest(TestCase):
         self.assertEqual(5, config.SYNC_DELAY)
 
     def test_value(self):
-        self.assertEqual(5000, config.BATCH_SIZE)
+        self.assertEqual(5000, config.SYNC_BATCH_SIZE)
 
     def test_not_lib_prop(self):
         with self.assertRaises(AttributeError):

tests/test_storages.py Normal file

@@ -0,0 +1,48 @@
from django.test import TestCase

from django_clickhouse.storage import RedisStorage


class StorageTest(TestCase):
    storage = RedisStorage()

    def setUp(self):
        # Clean the storage; redis delete() requires at least one key, so check first
        redis = self.storage._redis
        keys = redis.keys('clickhouse_sync*')
        if keys:
            redis.delete(*keys)

    def test_operation_pks(self):
        self.storage.register_operation_wrapped('test', 'insert', 100500)
        self.storage.register_operation_wrapped('test', 'insert', 100501)
        self.storage.register_operation_wrapped('test', 'insert', 100502)
        self.assertListEqual([
            ('insert', '100500'),
            ('insert', '100501'),
            ('insert', '100502'),
        ], self.storage.get_operations('test', 10))

    def test_operation_types(self):
        self.storage.register_operation_wrapped('test', 'insert', 100500)
        self.storage.register_operation_wrapped('test', 'update', 100500)
        self.storage.register_operation_wrapped('test', 'delete', 100500)
        self.assertListEqual([
            ('insert', '100500'),
            ('update', '100500'),
            ('delete', '100500'),
        ], self.storage.get_operations('test', 10))

    def test_operation_import_keys(self):
        self.storage.register_operation_wrapped('test1', 'insert', 100500)
        self.storage.register_operation_wrapped('test2', 'insert', 100500)
        self.storage.register_operation_wrapped('test2', 'insert', 100501)
        self.assertListEqual([
            ('insert', '100500')
        ], self.storage.get_operations('test1', 10))
        self.assertListEqual([
            ('insert', '100500'),
            ('insert', '100501'),
        ], self.storage.get_operations('test2', 10))

    def test_import_batch(self):
        self.storage.write_import_batch('test', [str(i) for i in range(10)])
        self.assertTupleEqual(tuple(str(i) for i in range(10)), self.storage.get_import_batch('test'))
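Outside the test suite, the same round trip looks roughly like this; the 'user' import key is arbitrary, and a running Redis matching CLICKHOUSE_REDIS_CONFIG is assumed:

from django_clickhouse.storage import RedisStorage

storage = RedisStorage()  # raises ConfigurationError if CLICKHOUSE_REDIS_CONFIG is not set
storage.register_operation_wrapped('user', 'insert', 1)
storage.register_operation_wrapped('user', 'update', 1)
print(storage.get_operations('user', 10))  # [('insert', '1'), ('update', '1')]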