django-clickhouse/tests/test_sync.py

182 lines
7.3 KiB
Python
Raw Normal View History

import datetime
2018-11-29 17:15:27 +03:00
import logging
from subprocess import Popen
from time import sleep
from unittest import expectedFailure, skip
import os
from django.test import TransactionTestCase
from django.utils.timezone import now
from random import randint
from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app
from django_clickhouse.utils import int_ranges
2018-11-26 17:17:58 +03:00
from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel, ClickHouseMultiTestModel
from tests.models import TestModel
2018-11-29 17:15:27 +03:00
logger = logging.getLogger('django-clickhouse')
class SyncTest(TransactionTestCase):
def setUp(self):
self.db = connections['default']
self.db.drop_database()
self.db.db_exists = False
self.db.create_database()
migrate_app('tests', 'default')
ClickHouseTestModel.get_storage().flush()
def test_simple(self):
obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
ClickHouseTestModel.sync_batch_from_storage()
2018-11-28 13:33:01 +03:00
synced_data = list(ClickHouseTestModel.objects.all())
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
def test_collapsing_update(self):
obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
obj.value = 2
obj.save()
ClickHouseCollapseTestModel.sync_batch_from_storage()
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
2018-11-28 13:33:01 +03:00
synced_data = list(ClickHouseCollapseTestModel.objects.all())
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
obj.value = 3
obj.save()
ClickHouseCollapseTestModel.sync_batch_from_storage()
synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
2018-11-16 15:41:54 +03:00
self.assertGreaterEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
@expectedFailure
def test_collapsing_delete(self):
obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
ClickHouseCollapseTestModel.sync_batch_from_storage()
obj.delete()
ClickHouseCollapseTestModel.sync_batch_from_storage()
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
2018-11-28 13:33:01 +03:00
synced_data = list(ClickHouseCollapseTestModel.objects.all())
self.assertEqual(0, len(synced_data))
2018-11-26 17:17:58 +03:00
def test_multi_model(self):
obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
obj.value = 2
obj.save()
ClickHouseMultiTestModel.sync_batch_from_storage()
2018-11-28 13:33:01 +03:00
synced_data = list(ClickHouseTestModel.objects.all())
2018-11-26 17:17:58 +03:00
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
2018-11-28 13:33:01 +03:00
synced_data = list(ClickHouseCollapseTestModel.objects.all())
2018-11-26 17:17:58 +03:00
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
obj.value = 3
obj.save()
ClickHouseMultiTestModel.sync_batch_from_storage()
synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
self.assertGreaterEqual(1, len(synced_data))
self.assertEqual(obj.created_date, synced_data[0].created_date)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
class KillTest(TransactionTestCase):
TEST_TIME = 60
maxDiff = None
def setUp(self):
ClickHouseTestModel.get_storage().flush()
connections['default'].drop_database()
connections['default'].create_database()
migrate_app('tests', 'default')
# Disable sync for not interesting models
ClickHouseMultiTestModel.sync_enabled = False
ClickHouseTestModel.sync_enabled = False
def tearDown(self):
# Disable sync for not interesting models
ClickHouseMultiTestModel.sync_enabled = True
ClickHouseTestModel.sync_enabled = True
def _check_data(self):
logger.debug('django-clickhouse: syncing left test data')
# Sync all data that is not synced
# Data is expected to be in test_db, not default. So we need to call subprocess
# in order everything works correctly
import_key = ClickHouseCollapseTestModel.get_import_key()
storage = ClickHouseCollapseTestModel.get_storage()
sync_left = storage.operations_count(import_key)
while sync_left:
logger.debug('django-clickhouse: final sync (%d left)' % sync_left)
self.sync_iteration(False)
sync_left = storage.operations_count(import_key)
ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id',
model_class=ClickHouseCollapseTestModel))
pg_data = list(TestModel.objects.all().order_by('id'))
if len(pg_data) != len(ch_data):
absent_ids = set(item.id for item in pg_data) - set(item.id for item in ch_data)
logger.debug('django_clickhouse: absent ranges: %s (min: %d, max: %d)'
% (','.join(('(%d, %d)' % r) for r in int_ranges(absent_ids)),
min(item.id for item in pg_data), max(item.id for item in pg_data)))
self.assertEqual(len(pg_data), len(ch_data))
serializer = ClickHouseCollapseTestModel.get_django_model_serializer()
self.assertListEqual(ch_data, [serializer.serialize(item) for item in pg_data])
@classmethod
def sync_iteration(cls, kill=True):
test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
if kill:
args = ['--test-time', str(cls.TEST_TIME)]
else:
args = ['--once', 'true']
p_sync = Popen(['python3', test_script, 'sync'] + args)
if kill:
sleep(randint(0, 5))
logger.debug('django-clickhouse: test killing: %d' % p_sync.pid)
p_sync.kill()
else:
p_sync.wait()
def test_kills(self):
test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
p_create = Popen(['python3', test_script, 'create', '--test-time', str(self.TEST_TIME)])
# Updates must be slower than inserts, or they will do nothing
p_update = Popen(['python3', test_script, 'update', '--test-time', str(self.TEST_TIME), '--batch-size', '500'])
start = now()
while (now() - start).total_seconds() < self.TEST_TIME:
self.sync_iteration()
p_create.wait()
2018-11-29 17:15:27 +03:00
p_update.wait()
self._check_data()