import datetime
import logging
import os
from random import randint
from subprocess import Popen
from time import sleep
from unittest import expectedFailure, skip, mock

from django.test import TransactionTestCase
from django.test.testcases import TestCase
from django.utils.timezone import now

from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app
from django_clickhouse.storages import RedisStorage
from django_clickhouse.tasks import sync_clickhouse_model, clickhouse_auto_sync
from django_clickhouse.utils import int_ranges

from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel, ClickHouseMultiTestModel
from tests.models import TestModel

logger = logging.getLogger('django-clickhouse')


class SyncTest(TransactionTestCase):
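    """
    Checks that sync_batch_from_storage() moves data registered in the intermediate
    storage into ClickHouse for a plain model, a collapsing model and a multi-model setup.
    """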
    def setUp(self):
        self.db = ClickHouseCollapseTestModel.get_database()
        self.db.drop_database()
        self.db.create_database()
        migrate_app('tests', 'default')
        ClickHouseTestModel.get_storage().flush()

    def test_simple(self):
        obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
        ClickHouseTestModel.sync_batch_from_storage()

        synced_data = list(ClickHouseTestModel.objects.all())
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.created_date, synced_data[0].created_date)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)
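
    # Note: ClickHouseCollapseTestModel is presumably backed by a CollapsingMergeTree-style
    # engine, where an update is stored as a pair of rows (the previous state with a
    # cancelling sign plus the new state). `SELECT ... FINAL` below forces ClickHouse to
    # collapse those pairs at query time instead of waiting for background merges.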

    def test_collapsing_update_by_final(self):
        obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
        obj.value = 2
        obj.save()
        ClickHouseCollapseTestModel.sync_batch_from_storage()

        # Insert and update came before the sync, so only one item will be inserted
        synced_data = list(ClickHouseCollapseTestModel.objects.all())
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

        obj.value = 3
        obj.save()
        ClickHouseCollapseTestModel.sync_batch_from_storage()

        synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
        self.assertGreaterEqual(len(synced_data), 1)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)
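
    # Note: setting engine.version_col presumably switches the engine to versioned
    # collapsing behaviour (as in ClickHouse's VersionedCollapsingMergeTree): each row
    # carries a growing version value, so state/cancel pairs collapse correctly even if
    # they are written out of order.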

    def test_collapsing_update_by_version(self):
        ClickHouseCollapseTestModel.engine.version_col = 'version'

        obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
        obj.value = 2
        obj.save()
        ClickHouseCollapseTestModel.sync_batch_from_storage()

        # Insert and update came before the sync, so only one item will be inserted
        synced_data = list(ClickHouseCollapseTestModel.objects.all())
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

        obj.value = 3
        obj.save()
        ClickHouseCollapseTestModel.sync_batch_from_storage()

        synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
        self.assertGreaterEqual(len(synced_data), 1)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

        ClickHouseCollapseTestModel.engine.version_col = None

    @expectedFailure
    def test_collapsing_delete(self):
        obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
        ClickHouseCollapseTestModel.sync_batch_from_storage()
        obj.delete()
        ClickHouseCollapseTestModel.sync_batch_from_storage()

        # sync_batch_from_storage() uses FINAL, so the data should be collapsed by now
        synced_data = list(ClickHouseCollapseTestModel.objects.all())
        self.assertEqual(0, len(synced_data))

    def test_multi_model(self):
        obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
        obj.value = 2
        obj.save()
        ClickHouseMultiTestModel.sync_batch_from_storage()

        synced_data = list(ClickHouseTestModel.objects.all())
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.created_date, synced_data[0].created_date)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

        # sync_batch_from_storage() uses FINAL, so the data should be collapsed by now
        synced_data = list(ClickHouseCollapseTestModel.objects.all())
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

        obj.value = 3
        obj.save()
        ClickHouseMultiTestModel.sync_batch_from_storage()

        synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
        self.assertGreaterEqual(len(synced_data), 1)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)


class KillTest(TransactionTestCase):
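    """
    Resilience test: subprocesses keep creating and updating TestModel rows for
    TEST_TIME seconds while the sync subprocess is repeatedly killed at random moments.
    Afterwards the remaining operations are drained and the ClickHouse data is compared
    row by row with the relational data.
    """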
    TEST_TIME = 60
    maxDiff = None

    def setUp(self):
        ClickHouseTestModel.get_storage().flush()
        connections['default'].drop_database()
        connections['default'].create_database()
        migrate_app('tests', 'default')

        # Disable sync for models that are not under test
        ClickHouseMultiTestModel.sync_enabled = False
        ClickHouseTestModel.sync_enabled = False

    def tearDown(self):
        # Re-enable sync for models that are not under test
        ClickHouseMultiTestModel.sync_enabled = True
        ClickHouseTestModel.sync_enabled = True

    def _check_data(self):
        logger.debug('django-clickhouse: syncing remaining test data')

        # Sync everything that has not been synced yet.
        # The data lives in the test database, not the default one,
        # so syncing is done in a subprocess for everything to work correctly.
        import_key = ClickHouseCollapseTestModel.get_import_key()
        storage = ClickHouseCollapseTestModel.get_storage()
        sync_left = storage.operations_count(import_key)
        while sync_left:
            logger.debug('django-clickhouse: final sync (%d left)' % sync_left)
            self.sync_iteration(False)
            sync_left = storage.operations_count(import_key)

        logger.debug('django_clickhouse: sync finished')

        ch_data = list(connections['default'].select_tuples('SELECT * FROM $table FINAL ORDER BY id',
                                                            model_class=ClickHouseCollapseTestModel))
        logger.debug('django_clickhouse: got clickhouse data')

        pg_data = list(TestModel.objects.all().order_by('id'))
        logger.debug('django_clickhouse: got postgres data')

        if len(pg_data) != len(ch_data):
            absent_ids = set(item.id for item in pg_data) - set(item.id for item in ch_data)
            logger.debug('django_clickhouse: absent ranges: %s (min: %d, max: %d)'
                         % (','.join(('(%d, %d)' % r) for r in int_ranges(absent_ids)),
                            min(item.id for item in pg_data), max(item.id for item in pg_data)))

        self.assertEqual(len(pg_data), len(ch_data))
        for pg_item, ch_item in zip(pg_data, ch_data):
            self.assertEqual(ch_item.id, pg_item.id)
            self.assertEqual(ch_item.value, pg_item.value)
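
    # sync_iteration() runs kill_test_sub_process.py in 'sync' mode: either killed
    # after a random delay (kill=True) to simulate a crash, or run once to completion
    # (kill=False) when draining the operations left in storage.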

    @classmethod
    def sync_iteration(cls, kill=True):
        test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
        if kill:
            args = ['--test-time', str(cls.TEST_TIME)]
        else:
            args = ['--once', 'true']
        p_sync = Popen(['python3', test_script, 'sync'] + args)

        if kill:
            sleep(randint(0, 5))
            logger.debug('django-clickhouse: test killing: %d' % p_sync.pid)
            p_sync.kill()
        else:
            p_sync.wait()

    def test_kills(self):
        test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
        p_create = Popen(['python3', test_script, 'create', '--test-time', str(self.TEST_TIME)])

        # Updates must be slower than inserts, or they will do nothing
        p_update = Popen(['python3', test_script, 'update', '--test-time', str(self.TEST_TIME), '--batch-size', '500'])

        start = now()
        while (now() - start).total_seconds() < self.TEST_TIME:
            self.sync_iteration()

        p_create.wait()
        p_update.wait()

        self._check_data()


@mock.patch.object(ClickHouseTestModel, 'sync_batch_from_storage')
class SyncClickHouseModelTest(TestCase):
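    """
    sync_clickhouse_model() should accept both a ClickHouseModel subclass and its dotted
    import path, call sync_batch_from_storage() and save the last successful sync time
    in the configured storage.
    """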
    def test_model_as_class(self, sync_mock):
        sync_clickhouse_model(ClickHouseTestModel)
        sync_mock.assert_called()

    def test_model_as_string(self, sync_mock):
        sync_clickhouse_model('tests.clickhouse_models.ClickHouseTestModel')
        sync_mock.assert_called()

    @mock.patch.object(RedisStorage, 'set_last_sync_time')
    def test_last_sync_time_called(self, storage_mock, _):
        sync_clickhouse_model(ClickHouseTestModel)
        storage_mock.assert_called()
        self.assertEqual(2, len(storage_mock.call_args))
        self.assertEqual(storage_mock.call_args[0][0], 'ClickHouseTestModel')
        self.assertIsInstance(storage_mock.call_args[0][1], datetime.datetime)


@mock.patch.object(sync_clickhouse_model, 'delay')
class ClickHouseAutoSyncTest(TestCase):
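    """
    clickhouse_auto_sync() is expected to iterate over ClickHouseModel subclasses and
    schedule a sync_clickhouse_model task (by import string) for every model whose
    need_sync() returns True. In a real project this task would typically be triggered
    periodically, e.g. from a celery beat schedule.
    """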
    @mock.patch('django_clickhouse.tasks.get_subclasses', return_value=[ClickHouseTestModel])
    @mock.patch.object(ClickHouseTestModel, 'need_sync', return_value=True)
    def test_needs_sync_enabled(self, need_sync_mock, get_subclasses_mock, sync_delay_mock):
        clickhouse_auto_sync()
        sync_delay_mock.assert_called_with('tests.clickhouse_models.ClickHouseTestModel')

    @mock.patch('django_clickhouse.tasks.get_subclasses', return_value=[ClickHouseTestModel])
    @mock.patch.object(ClickHouseTestModel, 'need_sync', return_value=False)
    def test_does_not_need_sync(self, need_sync_mock, get_subclasses_mock, sync_delay_mock):
        clickhouse_auto_sync()
        sync_delay_mock.assert_not_called()

    @mock.patch('django_clickhouse.tasks.get_subclasses',
                return_value=[ClickHouseTestModel, ClickHouseCollapseTestModel])
    @mock.patch.object(ClickHouseTestModel, 'need_sync', return_value=True)
    @mock.patch.object(ClickHouseCollapseTestModel, 'need_sync', return_value=True)
    def test_multiple_models(self, need_sync_1_mock, need_sync_2_mock, get_subclasses_mock, sync_delay_mock):
        clickhouse_auto_sync()
        self.assertEqual(2, sync_delay_mock.call_count)


@skip('Used to profile sync execution time. Disabled by default')
class ProfileTest(TransactionTestCase):
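    """
    Measures a single sync_batch_from_storage() call over BATCH_SIZE pre-created
    TestModel rows. Skipped by default; enable it manually when profiling.
    """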
    BATCH_SIZE = 10000

    def setUp(self):
        ClickHouseTestModel.get_storage().flush()
        connections['default'].drop_database()
        connections['default'].create_database()
        migrate_app('tests', 'default')

        # Disable sync for models that are not under test
        ClickHouseMultiTestModel.sync_enabled = False
        ClickHouseTestModel.sync_enabled = False

        TestModel.objects.bulk_create([
            TestModel(created=datetime.datetime.now(), created_date='2018-01-01', value=i)
            for i in range(self.BATCH_SIZE)
        ])

    def tearDown(self):
        # Re-enable sync for models that are not under test
        ClickHouseMultiTestModel.sync_enabled = True
        ClickHouseTestModel.sync_enabled = True

    def test_sync(self):
        ClickHouseCollapseTestModel.sync_batch_size = self.BATCH_SIZE
        ClickHouseCollapseTestModel.sync_batch_from_storage()