Added a test for sync process kill safety.

Test is not passing =)
M1ha 2018-11-29 15:19:25 +05:00
parent 166d23ca7c
commit 9742abbaec
8 changed files with 241 additions and 137 deletions

View File

@@ -14,10 +14,11 @@ from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiMode
 from six import with_metaclass
 from statsd.defaults.django import statsd
 
-from .query import QuerySet
 from .configuration import config
 from .database import connections
+from .exceptions import RedisLockTimeoutError
 from .models import ClickHouseSyncModel
+from .query import QuerySet
 from .serializers import Django2ClickHouseModelSerializer
 from .utils import lazy_class_import
@@ -178,35 +179,38 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         Gets one batch from storage and syncs it.
         :return:
         """
-        statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, cls.__name__)
-        with statsd.timer(statsd_key.format('total')):
-            storage = cls.get_storage()
-            import_key = cls.get_import_key()
+        try:
+            statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, cls.__name__)
+            with statsd.timer(statsd_key.format('total')):
+                storage = cls.get_storage()
+                import_key = cls.get_import_key()
 
-            with statsd.timer(statsd_key.format('pre_sync')):
-                storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
+                with statsd.timer(statsd_key.format('pre_sync')):
+                    storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
 
-            with statsd.timer(statsd_key.format('get_operations')):
-                operations = storage.get_operations(import_key, cls.get_sync_batch_size())
+                with statsd.timer(statsd_key.format('get_operations')):
+                    operations = storage.get_operations(import_key, cls.get_sync_batch_size())
 
-            if operations:
-                with statsd.timer(statsd_key.format('get_sync_objects')):
-                    import_objects = cls.get_sync_objects(operations)
-            else:
-                import_objects = []
+                if operations:
+                    with statsd.timer(statsd_key.format('get_sync_objects')):
+                        import_objects = cls.get_sync_objects(operations)
+                else:
+                    import_objects = []
 
-            if import_objects:
-                with statsd.timer(statsd_key.format('get_insert_batch')):
-                    batch = cls.get_insert_batch(import_objects)
+                if import_objects:
+                    with statsd.timer(statsd_key.format('get_insert_batch')):
+                        batch = cls.get_insert_batch(import_objects)
 
-                with statsd.timer(statsd_key.format('insert')):
-                    cls.insert_batch(batch)
+                    with statsd.timer(statsd_key.format('insert')):
+                        cls.insert_batch(batch)
 
-            with statsd.timer(statsd_key.format('post_sync')):
-                storage.post_sync(import_key)
+                with statsd.timer(statsd_key.format('post_sync')):
+                    storage.post_sync(import_key)
 
-            storage.set_last_sync_time(import_key, now())
+                storage.set_last_sync_time(import_key, now())
+        except RedisLockTimeoutError:
+            pass  # skip this sync round if lock is acquired by another thread
 
     @classmethod
     def need_sync(cls):  # type: () -> bool
@@ -237,34 +241,38 @@ class ClickHouseMultiModel(ClickHouseModel):
         Gets one batch from storage and syncs it.
         :return:
         """
-        statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, cls.__name__)
-        with statsd.timer(statsd_key.format('total')):
-            storage = cls.get_storage()
-            import_key = cls.get_import_key()
+        try:
+            statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, cls.__name__)
+            with statsd.timer(statsd_key.format('total')):
+                storage = cls.get_storage()
+                import_key = cls.get_import_key()
 
-            with statsd.timer(statsd_key.format('pre_sync')):
-                storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
+                with statsd.timer(statsd_key.format('pre_sync')):
+                    storage.pre_sync(import_key, lock_timeout=cls.get_lock_timeout())
 
-            with statsd.timer(statsd_key.format('get_operations')):
-                operations = storage.get_operations(import_key, cls.get_sync_batch_size())
+                with statsd.timer(statsd_key.format('get_operations')):
+                    operations = storage.get_operations(import_key, cls.get_sync_batch_size())
 
-            if operations:
-                with statsd.timer(statsd_key.format('get_sync_objects')):
-                    import_objects = cls.get_sync_objects(operations)
-            else:
-                import_objects = []
+                if operations:
+                    with statsd.timer(statsd_key.format('get_sync_objects')):
+                        import_objects = cls.get_sync_objects(operations)
+                else:
+                    import_objects = []
 
-            if import_objects:
-                batches = {}
-                with statsd.timer(statsd_key.format('get_insert_batch')):
-                    for model_cls in cls.sub_models:
-                        batches[model_cls] = model_cls.get_insert_batch(import_objects)
+                if import_objects:
+                    batches = {}
+                    with statsd.timer(statsd_key.format('get_insert_batch')):
+                        for model_cls in cls.sub_models:
+                            batches[model_cls] = model_cls.get_insert_batch(import_objects)
 
-                with statsd.timer(statsd_key.format('insert')):
-                    for model_cls, batch in batches.items():
-                        model_cls.insert_batch(batch)
+                    with statsd.timer(statsd_key.format('insert')):
+                        for model_cls, batch in batches.items():
+                            model_cls.insert_batch(batch)
 
-            with statsd.timer(statsd_key.format('post_sync')):
-                storage.post_sync(import_key)
-            storage.set_last_sync_time(import_key, now())
+                with statsd.timer(statsd_key.format('post_sync')):
+                    storage.post_sync(import_key)
+                storage.set_last_sync_time(import_key, now())
+        except RedisLockTimeoutError:
+            pass  # skip this sync round if lock is acquired by another thread
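For context: with the two hunks above, a concurrent sync attempt no longer propagates an error out of the caller; when the Redis lock is held, the round is simply skipped. A minimal sketch of a periodic worker loop that relies on this behaviour (the worker function, the round count, and the cadence are illustrative, not part of the commit; the model comes from the test suite below):

# Hypothetical worker loop, assuming the commit's behaviour of swallowing RedisLockTimeoutError.
from time import sleep
from tests.clickhouse_models import ClickHouseCollapseTestModel

def run_sync_worker(rounds=10, period=1):
    for _ in range(rounds):
        # If another process holds the Redis lock right now, this call is a no-op for the round.
        ClickHouseCollapseTestModel.sync_batch_from_storage()
        sleep(period)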

View File

@@ -1,9 +1,16 @@
 """
 Contains additional components for redis-py to use in RedisStorage
 """
+import os
+from itertools import chain
+import logging
+
 from .exceptions import RedisLockTimeoutError
 
+logger = logging.getLogger('django-clickhouse')
+
 
 class RedisLock:
     """
     Fixes issue of https://github.com/andymccurdy/redis-py/issues/621
@@ -13,18 +20,22 @@ class RedisLock:
         self.lock = redis_client.lock(*args, **kwargs)
 
     def __enter__(self):
-        if self.lock.acquire():
-            return self
-        else:
-            raise RedisLockTimeoutError()
+        return self.lock.acquire()
 
     def __exit__(self, type, value, tb):
         self.lock.release()
 
     def acquire(self):
-        self.lock.acquire()
+        logger.debug('Acquiring lock "%s" with pid %d' % (self.lock.name, os.getpid()))
+        if self.lock.acquire():
+            logger.debug('Acquired lock "%s" with pid %d' % (self.lock.name, os.getpid()))
+            return self
+        else:
+            logger.debug('Timeout lock "%s" with pid %d' % (self.lock.name, os.getpid()))
+            raise RedisLockTimeoutError()
 
     def release(self):
+        logger.debug('Releasing lock "%s" with pid %d' % (self.lock.name, os.getpid()))
        self.lock.release()
 
     def hard_release(self) -> bool:
@@ -32,4 +43,20 @@ class RedisLock:
         Drops the lock, not looking if it is acquired by anyone.
         :return: Boolean - if lock has been acquired before releasing or not
         """
+        logger.debug('Hard releasing lock "%s" with pid %d' % (self.lock.name, os.getpid()))
         return bool(self.lock.redis.delete(self.lock.name))
+
+
+def redis_zadd(redis_client, key, mapping, **kwargs):
+    """
+    In redis-py 3.* interface of zadd changed to mapping
+    :return:
+    """
+    import redis
+    if int(redis.__version__.split('.', 1)[0]) < 3:
+        # key, score1, value1, score2, value2, ...
+        items = chain(*((score, key) for key, score in mapping.items()))
+    else:
+        items = [mapping]
+
+    return redis_client.zadd(key, *items, **kwargs)
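For reference, the wrapper above lets callers pass a single member-to-score mapping regardless of the installed redis-py major version. A small usage sketch (the Redis connection details are assumptions, not part of the diff; the key follows REDIS_KEY_OPS_TEMPLATE from storage.py below):

# Minimal sketch of the intended call, assuming a local Redis on the default port.
from redis import StrictRedis
from django_clickhouse.redis import redis_zadd

client = StrictRedis()
members = {'insert:1': 1543400000.0, 'insert:2': 1543400000.0}

# redis-py < 3 receives flat args:    zadd(key, score1, member1, score2, member2, ...)
# redis-py >= 3 receives the mapping: zadd(key, {member: score, ...})
redis_zadd(client, 'clickhouse_sync:operations:test', members)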

View File

@@ -7,13 +7,18 @@ Important:
 Storage should be able to restore current importing batch, if something goes wrong.
 """
 import datetime
-from itertools import chain
+import logging
+import os
 from typing import Any, Optional, List, Tuple
 
 from statsd.defaults.django import statsd
 
+from django_clickhouse.redis import redis_zadd
+from django_clickhouse.utils import check_pid
 from .configuration import config
-from .exceptions import ConfigurationError
+from .exceptions import ConfigurationError, RedisLockTimeoutError
+
+logger = logging.getLogger('django-clickhouse')
 
 
 class Storage:
@@ -60,6 +65,17 @@ class Storage:
         """
         key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
         statsd.gauge(key, -batch_size, delta=True)
+        logger.debug('Removed %d items (%s) from storage' % (batch_size, import_key))
+
+    def operations_count(self, import_key, **kwargs):
+        # type: (str, **dict) -> int
+        """
+        Returns sync queue size
+        :param import_key: A key, returned by ClickHouseModel.get_import_key() method
+        :param kwargs: Storage dependant arguments
+        :return: Number of records in queue
+        """
+        raise NotImplemented()
 
     def get_operations(self, import_key, count, **kwargs):
         # type: (str, int, **dict) -> List[Tuple[str, str]]
@@ -98,6 +114,7 @@ class Storage:
         statsd_key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
         statsd.gauge(statsd_key, len(pks), delta=True)
+        logger.debug('Registered %d items (%s) to storage' % (len(pks), import_key))
 
         return self.register_operations(import_key, operation, *pks)
@@ -133,6 +150,7 @@ class RedisStorage(Storage):
     REDIS_KEY_OPS_TEMPLATE = 'clickhouse_sync:operations:{import_key}'
     REDIS_KEY_TS_TEMPLATE = 'clickhouse_sync:timstamp:{import_key}'
     REDIS_KEY_LOCK = 'clickhouse_sync:lock:{import_key}'
+    REDIS_KEY_LOCK_PID = 'clickhouse_sync:lock_pid:{import_key}'
     REDIS_KEY_LAST_SYNC_TS = 'clickhouse_sync:last_sync:{import_key}'
 
     def __init__(self):
@@ -148,10 +166,12 @@ class RedisStorage(Storage):
         key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
         score = datetime.datetime.now().timestamp()
 
-        items = chain(*((score, '%s:%s' % (operation, str(pk))) for pk in pks))
-
-        # key, score1, value1, score2, value2, ...
-        self._redis.zadd(key, *items)
+        items = {'%s:%s' % (operation, str(pk)): score for pk in pks}
+        redis_zadd(self._redis, key, items)
+
+    def operations_count(self, import_key, **kwargs):
+        ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
+        return self._redis.zcard(ops_key)
 
     def get_operations(self, import_key, count, **kwargs):
         ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
@@ -173,7 +193,8 @@ class RedisStorage(Storage):
             from .redis import RedisLock
             lock_key = self.REDIS_KEY_LOCK.format(import_key=import_key)
             lock_timeout = kwargs.get('lock_timeout', config.SYNC_DELAY * 10)
-            self._lock = RedisLock(self._redis, lock_key, timeout=lock_timeout, blocking_timeout=0)
+            self._lock = RedisLock(self._redis, lock_key, timeout=lock_timeout, blocking_timeout=0.1,
+                                   thread_local=False)
 
         return self._lock
@@ -181,7 +202,20 @@ class RedisStorage(Storage):
         # Block process to be single threaded. Default sync delay is 10 * default sync delay.
         # It can be changed for model, by passing `lock_timeout` argument to pre_sync
         lock = self.get_lock(import_key, **kwargs)
-        lock.acquire()
+        lock_pid_key = self.REDIS_KEY_LOCK_PID.format(import_key=import_key)
+        try:
+            lock.acquire()
+            self._redis.set(lock_pid_key, os.getpid())
+        except RedisLockTimeoutError:
+            # Lock is busy. But If the process has been killed, I don't want to wait any more.
+            # Let's check if pid exists
+            pid = int(self._redis.get(lock_pid_key))
+            if not check_pid(pid):
+                logger.debug('Hard releasing lock "%s" locked by pid %d' % (import_key, pid))
+                lock.hard_release()
+                self.pre_sync(import_key, **kwargs)
+            else:
+                raise
 
     def post_sync(self, import_key, **kwargs):
         ts_key = self.REDIS_KEY_TS_TEMPLATE.format(import_key=import_key)
@@ -197,7 +231,7 @@ class RedisStorage(Storage):
         self.post_batch_removed(import_key, batch_size)
 
         # unblock lock after sync completed
-        self._lock.release()
+        self.get_lock(import_key, **kwargs).release()
 
     def flush(self):
         key_tpls = [

View File

@@ -1,4 +1,5 @@
 import datetime
+import os
 from itertools import chain
 from typing import Union, Any, Optional, TypeVar, Set, Dict, Iterable
@@ -121,3 +122,15 @@ def model_to_dict(instance, fields=None, exclude_fields=None):
         data[name] = val
 
     return data
+
+
+def check_pid(pid):
+    """
+    Check For the existence of a unix pid.
+    """
+    try:
+        os.kill(pid, 0)
+    except OSError:
+        return False
+    else:
+        return True
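As a side note, check_pid() is what RedisStorage.pre_sync() above uses to decide whether the recorded lock holder is still alive. A minimal Unix-only sketch of that behaviour (not part of the commit; fork/SIGKILL are only used here to simulate a killed sync process):

import os
import signal
from django_clickhouse.utils import check_pid

pid = os.fork()
if pid == 0:
    signal.pause()               # child: sit idle until killed
else:
    assert check_pid(pid)        # child process exists
    os.kill(pid, signal.SIGKILL)
    os.waitpid(pid, 0)           # reap the child so its pid disappears
    assert not check_pid(pid)    # stale pid: pre_sync() would hard-release the lock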

View File

@@ -0,0 +1,61 @@
+import sys
+import argparse
+import django
+import os
+from time import sleep
+import datetime
+
+# set Django environment
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'tests.settings')
+django.setup()
+
+# This imports must be after django activation
+from django.db.models import F
+from tests.clickhouse_models import ClickHouseCollapseTestModel
+from tests.models import TestModel
+
+
+def create(batch_size=1000, test_time=60, period=1, **kwargs):
+    for iteration in range(int(test_time / period)):
+        TestModel.objects.db_manager('test_db').bulk_create([
+            TestModel(created_date='2018-01-01', value=iteration * batch_size + i) for i in range(batch_size)
+        ])
+        sleep(period)
+
+
+def update(batch_size=1000, test_time=60, period=1, **kwargs):
+    for iteration in range(int(test_time / period)):
+        TestModel.objects.db_manager('test_db').filter(id__gte=iteration * batch_size).annotate(idmod10=F('id') % 10). \
+            filter(idmod10=0).update(value=-1)
+        sleep(period)
+
+
+def delete(batch_size=1000, test_time=60, period=1, **kwargs):
+    for iteration in range(int(test_time / period)):
+        TestModel.objects.db_manager('test_db').filter(id__gte=iteration * batch_size).annotate(idmod10=F('id') % 10). \
+            filter(idmod10=1).delete()
+        sleep(period)
+
+
+def sync(period=1, test_time=60, **kwargs):
+    start = datetime.datetime.now()
+    while (datetime.datetime.now() - start).total_seconds() < test_time:
+        ClickHouseCollapseTestModel.sync_batch_from_storage()
+        sleep(period)
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('process', type=str, choices=('sync', 'create', 'update', 'delete'))
+    parser.add_argument('--test-time', type=int, required=False, default=60)
+    parser.add_argument('--batch-size', type=str, required=False, default=1000)
+    parser.add_argument('--period', type=str, required=False, default=1)
+    params = vars(parser.parse_args())
+
+    func_name = params['process']
+    method = locals()[func_name]
+    method(**params)

View File

@@ -14,25 +14,9 @@ DATABASES = {
     },
     # I need separate connections for multiprocessing tests
-    'create': {
+    'test_db': {
         'ENGINE': 'django.db.backends.postgresql_psycopg2',
-        'NAME': 'test',
-        'USER': 'test',
-        'PASSWORD': 'test',
-        'HOST': '127.0.0.1',
-        'PORT': '5432'
-    },
-    'update': {
-        'ENGINE': 'django.db.backends.postgresql_psycopg2',
-        'NAME': 'test',
-        'USER': 'test',
-        'PASSWORD': 'test',
-        'HOST': '127.0.0.1',
-        'PORT': '5432'
-    },
-    'delete': {
-        'ENGINE': 'django.db.backends.postgresql_psycopg2',
-        'NAME': 'test',
+        'NAME': 'test_test',
         'USER': 'test',
         'PASSWORD': 'test',
         'HOST': '127.0.0.1',
         'PORT': '5432'
View File

@@ -62,3 +62,10 @@ class StorageTest(TestCase):
         dt = datetime.datetime.now()
         self.storage.set_last_sync_time('test', dt)
         self.assertEqual(dt, self.storage.get_last_sync_time('test'))
+
+    def test_operations_count(self):
+        self.storage.register_operations_wrapped('test', 'insert', 100500)
+        self.storage.register_operations_wrapped('test', 'insert', 100501)
+        self.assertEqual(2, self.storage.operations_count('test'))
+        self.storage.register_operations_wrapped('test', 'insert', 100502)
+        self.assertEqual(3, self.storage.operations_count('test'))

View File

@@ -1,13 +1,15 @@
 import datetime
-import signal
-from multiprocessing import Process
+from subprocess import Popen
 from time import sleep
 from unittest import skip, expectedFailure
 import os
 
 from django.db import connections as django_connections
 from django.db.models import F
 from django.test import TransactionTestCase
 from django.utils.timezone import now
+from random import randint
 
 from django_clickhouse.database import connections
 from django_clickhouse.migrations import migrate_app
@@ -98,80 +100,48 @@ class SyncTest(TransactionTestCase):
         self.assertEqual(obj.id, synced_data[0].id)
 
 
-@skip("This doesn't work due to different threads connection problems")
+# @skip("This doesn't work due to different threads connection problems")
 class KillTest(TransactionTestCase):
     TEST_TIME = 30
-    start = datetime.datetime.now()
+    maxDiff = None
 
     def setUp(self):
         ClickHouseTestModel.get_storage().flush()
-
-    @staticmethod
-    def _create_process(count=1000, test_time=60, period=1):
-        for iteration in range(test_time):
-            TestModel.objects.using('create').bulk_create([
-                TestModel(created_date='2018-01-01', value=iteration * count + i) for i in range(count)])
-            django_connections['create'].close()
-            sleep(period)
-
-    @staticmethod
-    def _update_process(count=1000, test_time=60, period=1):
-        for iteration in range(test_time):
-            TestModel.objects.using('update').filter(id__gte=iteration * count).annotate(idmod10=F('id') % 10). \
-                filter(idmod10=0).update(value=-1)
-            django_connections['update'].close()
-            sleep(period)
-
-    @staticmethod
-    def _delete_process(count=1000, test_time=60, period=1):
-        for iteration in range(test_time):
-            TestModel.objects.using('delete').filter(id__gte=iteration * count).annotate(idmod10=F('id') % 10). \
-                filter(idmod10=1).delete()
-            django_connections['delete'].close()
-            sleep(period)
-
-    @classmethod
-    def _sync_process(cls, period=1):
-        while (datetime.datetime.now() - cls.start).total_seconds() < cls.TEST_TIME:
-            ClickHouseCollapseTestModel.sync_batch_from_storage()
-            sleep(period)
-
-    def _kill_process(self, p):
-        # https://stackoverflow.com/questions/47553120/kill-a-multiprocessing-pool-with-sigkill-instead-of-sigterm-i-think
-        os.kill(p.pid, signal.SIGKILL)
-        p.terminate()
+        connections['default'].drop_database()
+        connections['default'].create_database()
+        migrate_app('tests', 'default')
 
     def _check_data(self):
-        ClickHouseCollapseTestModel.sync_batch_from_storage()
+        # Sync all data that is not synced
+        while ClickHouseCollapseTestModel.get_storage().operations_count(ClickHouseCollapseTestModel.get_import_key()):
+            ClickHouseCollapseTestModel.sync_batch_from_storage()
 
         ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id',
                                                      model_class=ClickHouseCollapseTestModel))
         pg_data = list(TestModel.objects.all().order_by('id'))
 
         self.assertEqual(len(pg_data), len(ch_data))
-        serizlier = ClickHouseCollapseTestModel.get_django_model_serializer()
-        self.assertListEqual(ch_data, [serizlier.serialize(item) for item in pg_data])
+        serializer = ClickHouseCollapseTestModel.get_django_model_serializer()
+        self.assertListEqual(ch_data, [serializer.serialize(item) for item in pg_data])
+
+    @classmethod
+    def sync_iteration(cls):
+        test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
+        p_sync = Popen(['python3', test_script, 'sync', '--test-time', str(cls.TEST_TIME)])
+        sleep(randint(0, 5))
+        print('Killing: %d' % p_sync.pid)
+        p_sync.kill()
 
     def test_kills(self):
-        p_create = Process(target=self._create_process, kwargs={'test_time': 5})
-        p_update = Process(target=self._update_process, kwargs={'test_time': 5})
-        p_delete = Process(target=self._delete_process, kwargs={'test_time': 5})
-        p_sync = Process(target=self._sync_process)
+        test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
+        p_create = Popen(['python3', test_script, 'create', '--test-time', str(self.TEST_TIME)])
+        p_update = Popen(['python3', test_script, 'update', '--test-time', str(self.TEST_TIME)])
+        self.start = datetime.datetime.now()
 
-        p_create.start()
-        p_update.start()
-        p_delete.start()
-        p_sync.start()
+        start = now()
+        while (now() - start).total_seconds() < self.TEST_TIME:
+            self.sync_iteration()
 
-        # while (datetime.datetime.now() - start).total_seconds() < self.TEST_TIME:
-        #     self._kill_process(p_sync)
-        #     p_sync.start()
-        #     sleep(random.randint(0, 5))
+        p_create.wait()
+        p_update.wait()
 
-        p_create.join()
-        p_update.join()
-        p_delete.join()
-        p_sync.join()
-
-        # self._check_data()
+        self._check_data()