mirror of
				https://github.com/carrotquest/django-clickhouse.git
				synced 2025-10-25 05:01:15 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			283 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			283 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import datetime
 | |
| import logging
 | |
| from subprocess import Popen
 | |
| from time import sleep
 | |
| from unittest import expectedFailure, skip, mock
 | |
| 
 | |
| import os
 | |
| from django.test import TransactionTestCase
 | |
| from django.test.testcases import TestCase
 | |
| from django.utils.timezone import now
 | |
| from random import randint
 | |
| 
 | |
| from django_clickhouse.database import connections
 | |
| from django_clickhouse.migrations import migrate_app
 | |
| from django_clickhouse.storages import RedisStorage
 | |
| from django_clickhouse.tasks import sync_clickhouse_model, clickhouse_auto_sync
 | |
| from django_clickhouse.utils import int_ranges
 | |
| from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel, ClickHouseMultiTestModel
 | |
| from tests.models import TestModel
 | |
| 
 | |
| logger = logging.getLogger('django-clickhouse')  # module-level logger used by the tests below, registered under the 'django-clickhouse' name
 | |
| 
 | |
| 
 | |
class SyncTest(TransactionTestCase):
    """Checks that changes made to ``TestModel`` end up in the ClickHouse test models."""

    def setUp(self):
        """Recreate the ClickHouse database, apply migrations and flush the storage."""
        self.db = ClickHouseCollapseTestModel.get_database()
        self.db.drop_database()
        self.db.create_database()
        migrate_app('tests', 'default')
        ClickHouseTestModel.get_storage().flush()

    def test_simple(self):
        """A single created object is synced with the same id, value and date."""
        obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
        ClickHouseTestModel.sync_batch_from_storage()

        synced_data = list(ClickHouseTestModel.objects.all())
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.created_date, synced_data[0].created_date)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

    def test_collapsing_update_by_final(self):
        """Updates of a collapsing model are visible when selecting with FINAL."""
        obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
        obj.value = 2
        obj.save()
        ClickHouseCollapseTestModel.sync_batch_from_storage()

        # insert and update came before sync. Only one item will be inserted
        synced_data = list(ClickHouseCollapseTestModel.objects.all())
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

        obj.value = 3
        obj.save()
        ClickHouseCollapseTestModel.sync_batch_from_storage()

        synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
        self.assertGreaterEqual(len(synced_data), 1)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

    def test_collapsing_update_by_version(self):
        """Same scenario as the FINAL test, but with the engine's ``version_col`` set."""
        ClickHouseCollapseTestModel.engine.version_col = 'version'

        # The engine object is shared at class level, so the override must be
        # reset even when an assertion fails; otherwise it leaks into other tests.
        try:
            obj = TestModel.objects.create(value=1, created=datetime.datetime.now(),
                                           created_date=datetime.date.today())
            obj.value = 2
            obj.save()
            ClickHouseCollapseTestModel.sync_batch_from_storage()

            # insert and update came before sync. Only one item will be inserted
            synced_data = list(ClickHouseCollapseTestModel.objects.all())
            self.assertEqual(1, len(synced_data))
            self.assertEqual(obj.value, synced_data[0].value)
            self.assertEqual(obj.id, synced_data[0].id)

            obj.value = 3
            obj.save()
            ClickHouseCollapseTestModel.sync_batch_from_storage()

            synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
            self.assertGreaterEqual(len(synced_data), 1)
            self.assertEqual(obj.value, synced_data[0].value)
            self.assertEqual(obj.id, synced_data[0].id)
        finally:
            ClickHouseCollapseTestModel.engine.version_col = None

    @expectedFailure
    def test_collapsing_delete(self):
        """Deleting the object should leave no rows after sync (currently expected to fail)."""
        obj = TestModel.objects.create(value=1, created_date=datetime.date.today())
        ClickHouseCollapseTestModel.sync_batch_from_storage()
        obj.delete()
        ClickHouseCollapseTestModel.sync_batch_from_storage()

        # sync_batch_from_storage uses FINAL, so data would be collapsed by now
        synced_data = list(ClickHouseCollapseTestModel.objects.all())
        self.assertEqual(0, len(synced_data))

    def test_multi_model(self):
        """Syncing through the multi model fills both the plain and the collapsing model."""
        obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
        obj.value = 2
        obj.save()
        ClickHouseMultiTestModel.sync_batch_from_storage()

        synced_data = list(ClickHouseTestModel.objects.all())
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.created_date, synced_data[0].created_date)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

        # sync_batch_from_storage uses FINAL, so data would be collapsed by now
        synced_data = list(ClickHouseCollapseTestModel.objects.all())
        self.assertEqual(1, len(synced_data))
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)

        obj.value = 3
        obj.save()
        ClickHouseMultiTestModel.sync_batch_from_storage()

        synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
        self.assertGreaterEqual(len(synced_data), 1)
        self.assertEqual(obj.value, synced_data[0].value)
        self.assertEqual(obj.id, synced_data[0].id)
 | |
| 
 | |
| 
 | |
class KillTest(TransactionTestCase):
    """Runs create/update/sync subprocesses, killing the sync ones at random moments.

    After ``TEST_TIME`` seconds the remaining operations are synced and the
    ClickHouse rows are compared with the ``TestModel`` rows.
    """
    TEST_TIME = 60
    maxDiff = None

    def setUp(self):
        """Flush the storage, recreate the ClickHouse database and apply migrations."""
        ClickHouseTestModel.get_storage().flush()
        connections['default'].drop_database()
        connections['default'].create_database()
        migrate_app('tests', 'default')

        # Disable sync for not interesting models
        ClickHouseMultiTestModel.sync_enabled = False
        ClickHouseTestModel.sync_enabled = False

    def tearDown(self):
        """Undo the ``sync_enabled`` overrides made in ``setUp``."""
        # Re-enable sync for the models disabled in setUp
        ClickHouseMultiTestModel.sync_enabled = True
        ClickHouseTestModel.sync_enabled = True

    def _check_data(self):
        """Sync whatever is left in the storage, then compare ClickHouse and Django data."""
        logger.debug('django-clickhouse: syncing left test data')

        # Sync all data that is not synced
        # Data is expected to be in test_db, not default. So we need to call subprocess
        # in order everything works correctly
        import_key = ClickHouseCollapseTestModel.get_import_key()
        storage = ClickHouseCollapseTestModel.get_storage()
        sync_left = storage.operations_count(import_key)
        while sync_left:
            logger.debug('django-clickhouse: final sync (%d left)', sync_left)
            self.sync_iteration(False)
            sync_left = storage.operations_count(import_key)

        logger.debug('django_clickhouse: sync finished')

        ch_data = list(connections['default'].select_tuples('SELECT * FROM $table FINAL ORDER BY id',
                                                            model_class=ClickHouseCollapseTestModel))
        logger.debug('django_clickhouse: got clickhouse data')

        pg_data = list(TestModel.objects.all().order_by('id'))
        logger.debug('django_clickhouse: got postgres data')

        if len(pg_data) != len(ch_data):
            pg_ids = [item.id for item in pg_data]
            # min()/max() raise ValueError on an empty sequence; skip the diagnostic
            # in that case so the length assertion below reports the mismatch.
            if pg_ids:
                absent_ids = set(pg_ids) - {item.id for item in ch_data}
                absent_ranges = ','.join(('(%d, %d)' % r) for r in int_ranges(absent_ids))
                logger.debug('django_clickhouse: absent ranges: %s (min: %d, max: %d)',
                             absent_ranges, min(pg_ids), max(pg_ids))

        self.assertEqual(len(pg_data), len(ch_data))
        for pg_item, ch_item in zip(pg_data, ch_data):
            self.assertEqual(ch_item.id, pg_item.id)
            self.assertEqual(ch_item.value, pg_item.value)

    @classmethod
    def sync_iteration(cls, kill=True):
        """Run the ``sync`` command of ``kill_test_sub_process.py`` in a subprocess.

        :param kill: when True, start the sync for ``TEST_TIME`` seconds and kill
            it after a random 0-5 second pause; when False, run it with
            ``--once true`` and wait for it to finish.
        """
        test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
        if kill:
            args = ['--test-time', str(cls.TEST_TIME)]
        else:
            args = ['--once', 'true']
        p_sync = Popen(['python3', test_script, 'sync'] + args)

        if kill:
            sleep(randint(0, 5))
            logger.debug('django-clickhouse: test killing: %d', p_sync.pid)
            p_sync.kill()

        # Always reap the child: without wait() every killed process would stay
        # around as a zombie until the Popen object is garbage collected.
        p_sync.wait()

    def test_kills(self):
        """Create and update data in subprocesses while sync processes are repeatedly killed."""
        test_script = os.path.join(os.path.dirname(__file__), 'kill_test_sub_process.py')
        p_create = Popen(['python3', test_script, 'create', '--test-time', str(self.TEST_TIME)])

        # Updates must be slower than inserts, or they will do nothing
        p_update = Popen(['python3', test_script, 'update', '--test-time', str(self.TEST_TIME), '--batch-size', '500'])

        start = now()
        while (now() - start).total_seconds() < self.TEST_TIME:
            self.sync_iteration()

        p_create.wait()
        p_update.wait()

        self._check_data()
 | |
| 
 | |
| 
 | |
@mock.patch.object(ClickHouseTestModel, 'sync_batch_from_storage')
class SyncClickHouseModelTest(TestCase):
    """Tests for the ``sync_clickhouse_model`` task with the actual sync mocked out."""

    def test_model_as_class(self, sync_mock):
        """The task accepts the model class itself."""
        sync_clickhouse_model(ClickHouseTestModel)
        sync_mock.assert_called()

    def test_model_as_string(self, sync_mock):
        """The task accepts the dotted import path of the model."""
        sync_clickhouse_model('tests.clickhouse_models.ClickHouseTestModel')
        sync_mock.assert_called()

    @mock.patch.object(RedisStorage, 'set_last_sync_time')
    def test_last_sync_time_called(self, storage_mock, _):
        """The task records the last sync time as (import key, datetime)."""
        sync_clickhouse_model(ClickHouseTestModel)
        storage_mock.assert_called()

        # call_args is an (args, kwargs) pair, so its own length is always 2.
        # Check the positional arguments instead, which is what is indexed below.
        positional_args = storage_mock.call_args[0]
        self.assertEqual(2, len(positional_args))
        self.assertEqual(positional_args[0], 'ClickHouseTestModel')
        self.assertIsInstance(positional_args[1], datetime.datetime)
 | |
| 
 | |
| 
 | |
@mock.patch.object(sync_clickhouse_model, 'delay')
class ClickHouseAutoSyncTest(TestCase):
    """Tests for ``clickhouse_auto_sync`` with ``sync_clickhouse_model.delay`` mocked."""

    # Patch target that decides which models clickhouse_auto_sync iterates over.
    GET_SUBCLASSES_PATH = 'django_clickhouse.tasks.get_subclasses'

    @mock.patch(GET_SUBCLASSES_PATH, return_value=[ClickHouseTestModel])
    @mock.patch.object(ClickHouseTestModel, 'need_sync', return_value=True)
    def test_needs_sync_enabled(self, need_sync_mock, get_subclasses_mock, sync_delay_mock):
        """A model reporting need_sync() == True gets a sync task scheduled by import path."""
        clickhouse_auto_sync()

        expected_path = 'tests.clickhouse_models.ClickHouseTestModel'
        sync_delay_mock.assert_called_with(expected_path)

    @mock.patch(GET_SUBCLASSES_PATH, return_value=[ClickHouseTestModel])
    @mock.patch.object(ClickHouseTestModel, 'need_sync', return_value=False)
    def test_does_not_need_sync(self, need_sync_mock, get_subclasses_mock, sync_delay_mock):
        """A model reporting need_sync() == False gets no sync task scheduled."""
        clickhouse_auto_sync()

        sync_delay_mock.assert_not_called()

    @mock.patch(GET_SUBCLASSES_PATH, return_value=[ClickHouseTestModel, ClickHouseCollapseTestModel])
    @mock.patch.object(ClickHouseTestModel, 'need_sync', return_value=True)
    @mock.patch.object(ClickHouseCollapseTestModel, 'need_sync', return_value=True)
    def test_multiple_models(self, collapse_need_sync_mock, plain_need_sync_mock, get_subclasses_mock,
                             sync_delay_mock):
        """Two models that both need sync produce two scheduled tasks."""
        clickhouse_auto_sync()

        scheduled = sync_delay_mock.call_count
        self.assertEqual(2, scheduled)
 | |
| 
 | |
| 
 | |
# Used to profile sync execution time. Disabled by default.
# unittest.skip must be called with a reason: a bare ``@skip`` on a class treats
# the class as the reason and replaces it with the inner decorator function.
@skip('Used only to profile sync execution time')
class ProfileTest(TransactionTestCase):
    """Syncs one large batch so that the sync execution time can be profiled."""
    BATCH_SIZE = 10000

    def setUp(self):
        """Reset storage and database, then bulk-create ``BATCH_SIZE`` ``TestModel`` rows."""
        ClickHouseTestModel.get_storage().flush()
        connections['default'].drop_database()
        connections['default'].create_database()
        migrate_app('tests', 'default')

        # Disable sync for not interesting models
        ClickHouseMultiTestModel.sync_enabled = False
        ClickHouseTestModel.sync_enabled = False

        TestModel.objects.bulk_create([
            TestModel(created=datetime.datetime.now(), created_date='2018-01-01', value=i)
            for i in range(self.BATCH_SIZE)
        ])

    def tearDown(self):
        """Undo the ``sync_enabled`` overrides made in ``setUp``."""
        # Re-enable sync for the models disabled in setUp
        ClickHouseMultiTestModel.sync_enabled = True
        ClickHouseTestModel.sync_enabled = True

    def test_sync(self):
        """Sync the whole batch in one ``sync_batch_from_storage`` call."""
        ClickHouseCollapseTestModel.sync_batch_size = self.BATCH_SIZE
        ClickHouseCollapseTestModel.sync_batch_from_storage()
 |