django-clickhouse/tests/test_sync.py

283 lines
12 KiB
Python
Raw Permalink Normal View History

import datetime
2018-11-29 17:15:27 +03:00
import logging
from subprocess import Popen
from time import sleep
from unittest import expectedFailure, skip, mock
import os
from django.test import TransactionTestCase
from django.test.testcases import TestCase
from django.utils.timezone import now
from random import randint
from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app
from django_clickhouse.storages import RedisStorage
from django_clickhouse.tasks import sync_clickhouse_model, clickhouse_auto_sync
from django_clickhouse.utils import int_ranges
2018-11-26 17:17:58 +03:00
from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel, ClickHouseMultiTestModel
from tests.models import TestModel
2018-11-29 17:15:27 +03:00
logger = logging.getLogger('django-clickhouse')
class SyncTest(TransactionTestCase):
def setUp(self):
2019-02-03 21:05:54 +03:00
self.db = ClickHouseCollapseTestModel.get_database()
self.db.drop_database()
self.db.create_database()
migrate_app('tests', 'default')
ClickHouseTestModel.get_storage().flush()
def test_simple(self):
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
ClickHouseTestModel.sync_batch_from_storage()
2018-11-28 13:33:01 +03:00
synced_data = list(ClickHouseTestModel.objects.all())
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
def test_collapsing_update_by_final(self):
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
obj.value = 2
obj.save()
ClickHouseCollapseTestModel.sync_batch_from_storage()
2019-02-03 21:05:54 +03:00
# insert and update came before sync. Only one item will be inserted
2018-11-28 13:33:01 +03:00
synced_data = list(ClickHouseCollapseTestModel.objects.all())
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
obj.value = 3
obj.save()
ClickHouseCollapseTestModel.sync_batch_from_storage()
synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
self.assertGreaterEqual(len(synced_data), 1)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
def test_collapsing_update_by_version(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
obj.value = 2
obj.save()
ClickHouseCollapseTestModel.sync_batch_from_storage()
2019-02-03 21:05:54 +03:00
# insert and update came before sync. Only one item will be inserted
synced_data = list(ClickHouseCollapseTestModel.objects.all())
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
obj.value = 3
obj.save()
ClickHouseCollapseTestModel.sync_batch_from_storage()
synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
self.assertGreaterEqual(len(synced_data), 1)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
ClickHouseCollapseTestModel.engine.version_col = None
@expectedFailure
def test_collapsing_delete(self):
obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
ClickHouseCollapseTestModel.sync_batch_from_storage()
obj.delete()
ClickHouseCollapseTestModel.sync_batch_from_storage()
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
2018-11-28 13:33:01 +03:00
synced_data = list(ClickHouseCollapseTestModel.objects.all())
self.assertEqual(0, len(synced_data))
2018-11-26 17:17:58 +03:00
def test_multi_model(self):
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
2018-11-26 17:17:58 +03:00
obj.value = 2
obj.save()
ClickHouseMultiTestModel.sync_batch_from_storage()
2018-11-28 13:33:01 +03:00
synced_data = list(ClickHouseTestModel.objects.all())
2018-11-26 17:17:58 +03:00
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
2018-11-28 13:33:01 +03:00
synced_data = list(ClickHouseCollapseTestModel.objects.all())
2018-11-26 17:17:58 +03:00
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
obj.value = 3
obj.save()
ClickHouseMultiTestModel.sync_batch_from_storage()
synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
self.assertGreaterEqual(len(synced_data), 1)
2018-11-26 17:17:58 +03:00
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
class KillTest(TransactionTestCase):
TEST_TIME = 60
maxDiff = None
def setUp(self):
ClickHouseTestModel.get_storage().flush()
connections['default'].drop_database()
connections['default'].create_database()
migrate_app('tests', 'default')
# Disable sync for not interesting models
ClickHouseMultiTestModel.sync_enabled = False
ClickHouseTestModel.sync_enabled = False
def tearDown(self):
# Disable sync for not interesting models
ClickHouseMultiTestModel.sync_enabled = True
ClickHouseTestModel.sync_enabled = True
def _check_data(self):
logger.debug('django-clickhouse: syncing left test data')
# Sync all data that is not synced
# Data is expected to be in test_db, not default. So we need to call subprocess
# in order everything works correctly
import_key = ClickHouseCollapseTestModel.get_import_key()
storage = ClickHouseCollapseTestModel.get_storage()
sync_left = storage.operations_count(import_key)
while sync_left:
logger.debug('django-clickhouse: final sync (%d left)' % sync_left)
self.sync_iteration(False)
sync_left = storage.operations_count(import_key)
logger.debug('django_clickhouse: sync finished')
ch_data = list(connections['default'].select_tuples('SELECT * FROM $table FINAL ORDER BY id',
model_class=ClickHouseCollapseTestModel))
logger.debug('django_clickhouse: got clickhouse data')
pg_data = list(TestModel.objects.all().order_by('id'))
logger.debug('django_clickhouse: got postgres data')
if len(pg_data) != len(ch_data):
absent_ids = set(item.id for item in pg_data) - set(item.id for item in ch_data)
logger.debug('django_clickhouse: absent ranges: %s (min: %d, max: %d)'
% (','.join(('(%d, %d)' % r) for r in int_ranges(absent_ids)),
min(item.id for item in pg_data), max(item.id for item in pg_data)))
self.assertEqual(len(pg_data), len(ch_data))
for pg_item, ch_item in zip(pg_data, ch_data):
2019-02-03 19:05:01 +03:00
self.assertEqual(ch_item.id, pg_item.id)
self.assertEqual(ch_item.value, pg_item.value)
@classmethod
def sync_iteration(cls, kill=True):
test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
if kill:
args = ['--test-time', str(cls.TEST_TIME)]
else:
args = ['--once', 'true']
p_sync = Popen(['python3', test_script, 'sync'] + args)
if kill:
sleep(randint(0, 5))
logger.debug('django-clickhouse: test killing: %d' % p_sync.pid)
p_sync.kill()
else:
p_sync.wait()
def test_kills(self):
test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
p_create = Popen(['python3', test_script, 'create', '--test-time', str(self.TEST_TIME)])
# Updates must be slower than inserts, or they will do nothing
p_update = Popen(['python3', test_script, 'update', '--test-time', str(self.TEST_TIME), '--batch-size', '500'])
start = now()
while (now() - start).total_seconds() < self.TEST_TIME:
self.sync_iteration()
p_create.wait()
2018-11-29 17:15:27 +03:00
p_update.wait()
self._check_data()
2019-02-03 19:19:34 +03:00
@mock.patch.object(ClickHouseTestModel, 'sync_batch_from_storage')
class SyncClickHouseModelTest(TestCase):
def test_model_as_class(self, sync_mock):
sync_clickhouse_model(ClickHouseTestModel)
sync_mock.assert_called()
def test_model_as_string(self, sync_mock):
sync_clickhouse_model('tests.clickhouse_models.ClickHouseTestModel')
sync_mock.assert_called()
@mock.patch.object(RedisStorage, 'set_last_sync_time')
def test_last_sync_time_called(self, storage_mock, _):
sync_clickhouse_model(ClickHouseTestModel)
storage_mock.assert_called()
self.assertEqual(2, len(storage_mock.call_args))
self.assertEqual(storage_mock.call_args[0][0], 'ClickHouseTestModel')
self.assertIsInstance(storage_mock.call_args[0][1], datetime.datetime)
@mock.patch.object(sync_clickhouse_model, 'delay')
class ClickHouseAutoSyncTest(TestCase):
@mock.patch('django_clickhouse.tasks.get_subclasses', return_value=[ClickHouseTestModel])
@mock.patch.object(ClickHouseTestModel, 'need_sync', return_value=True)
def test_needs_sync_enabled(self, need_sync_mock, get_subclasses_mock, sync_delay_mock):
clickhouse_auto_sync()
sync_delay_mock.assert_called_with('tests.clickhouse_models.ClickHouseTestModel')
@mock.patch('django_clickhouse.tasks.get_subclasses', return_value=[ClickHouseTestModel])
@mock.patch.object(ClickHouseTestModel, 'need_sync', return_value=False)
def test_does_not_need_sync(self, need_sync_mock, get_subclasses_mock, sync_delay_mock):
clickhouse_auto_sync()
sync_delay_mock.assert_not_called()
@mock.patch('django_clickhouse.tasks.get_subclasses',
return_value=[ClickHouseTestModel, ClickHouseCollapseTestModel])
@mock.patch.object(ClickHouseTestModel, 'need_sync', return_value=True)
@mock.patch.object(ClickHouseCollapseTestModel, 'need_sync', return_value=True)
def test_multiple_models(self, need_sync_1_mock, need_sync_2_mock, get_subclasses_mock, sync_delay_mock):
clickhouse_auto_sync()
self.assertEqual(2, sync_delay_mock.call_count)
2019-02-03 19:19:34 +03:00
# Used to profile sync execution time. Disabled by default
@skip
class ProfileTest(TransactionTestCase):
BATCH_SIZE = 10000
def setUp(self):
ClickHouseTestModel.get_storage().flush()
connections['default'].drop_database()
connections['default'].create_database()
migrate_app('tests', 'default')
# Disable sync for not interesting models
ClickHouseMultiTestModel.sync_enabled = False
ClickHouseTestModel.sync_enabled = False
TestModel.objects.bulk_create([
TestModel(created=datetime.datetime.now(), created_date='2018-01-01', value=i)
for i in range(self.BATCH_SIZE)
])
def tearDown(self):
# Disable sync for not interesting models
ClickHouseMultiTestModel.sync_enabled = True
ClickHouseTestModel.sync_enabled = True
def test_sync(self):
ClickHouseCollapseTestModel.sync_batch_size = self.BATCH_SIZE
ClickHouseCollapseTestModel.sync_batch_from_storage()