From 4480c1ae1dccfc0cc05d9a8660cff2c7190a4fea Mon Sep 17 00:00:00 2001 From: M1ha Date: Fri, 16 Nov 2018 13:14:40 +0500 Subject: [PATCH] Added migration system. --- src/django_clickhouse/clickhouse_models.py | 24 +++++- src/django_clickhouse/configuration.py | 4 + .../{db_clients.py => database.py} | 19 ++++- src/django_clickhouse/engines.py | 8 +- src/django_clickhouse/migration.py | 17 ---- src/django_clickhouse/migrations.py | 82 +++++++++++++++++++ src/django_clickhouse/routers.py | 57 +++++++++++++ src/django_clickhouse/utils.py | 3 +- tests/clickhouse_migrations/0001_initial.py | 10 +++ tests/clickhouse_migrations/__init__.py | 0 tests/clickhouse_models.py | 2 +- tests/settings.py | 4 + tests/test_config.py | 2 +- tests/test_migrations.py | 47 +++++++++++ 14 files changed, 246 insertions(+), 33 deletions(-) rename src/django_clickhouse/{db_clients.py => database.py} (50%) delete mode 100644 src/django_clickhouse/migration.py create mode 100644 src/django_clickhouse/migrations.py create mode 100644 src/django_clickhouse/routers.py create mode 100644 tests/clickhouse_migrations/0001_initial.py create mode 100644 tests/clickhouse_migrations/__init__.py create mode 100644 tests/test_migrations.py diff --git a/src/django_clickhouse/clickhouse_models.py b/src/django_clickhouse/clickhouse_models.py index 0a7d86d..df1774f 100644 --- a/src/django_clickhouse/clickhouse_models.py +++ b/src/django_clickhouse/clickhouse_models.py @@ -7,15 +7,15 @@ from itertools import chain from typing import List, Tuple from django.db.models import Model as DjangoModel +from infi.clickhouse_orm.database import Database from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase - from six import with_metaclass -from .db_clients import connections -from .serializers import Django2ClickHouseModelSerializer +from .configuration import config +from .database import connections, DEFAULT_DB_ALIAS from .models import ClickHouseSyncModel +from .serializers import Django2ClickHouseModelSerializer from .utils import lazy_class_import -from . import config class ClickHouseModelMeta(InfiModelBase): @@ -38,11 +38,27 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)): django_model = None django_model_serializer = Django2ClickHouseModelSerializer + read_db_aliases = (DEFAULT_DB_ALIAS,) + write_db_aliases = (DEFAULT_DB_ALIAS,) + sync_batch_size = None sync_storage = None sync_delay = None sync_database_alias = None + def get_database(self, for_write=False): + # type: (bool) -> Database + """ + Gets database for read or write purposes + :param for_write: Boolean flag if database is neede for read or for write + :return: infi.clickhouse_orm.Database instance + """ + db_router = lazy_class_import(config.DATABASE_ROUTER)() + if for_write: + return db_router.db_for_write(self.__class__, instance=self) + else: + return db_router.db_for_read(self.__class__, instance=self) + @classmethod def get_django_model_serializer(cls): serializer_cls = lazy_class_import(cls.django_model_serializer) diff --git a/src/django_clickhouse/configuration.py b/src/django_clickhouse/configuration.py index 9c05ac6..ff00fa6 100644 --- a/src/django_clickhouse/configuration.py +++ b/src/django_clickhouse/configuration.py @@ -16,7 +16,11 @@ DEFAULTS = { 'SYNC_STORAGE': 'django_clickhouse.storages.RedisStorage', 'SYNC_DELAY': 5, 'REDIS_CONFIG': None, + 'MODELS_MODULE': 'clickhouse_models', + 'DATABASE_ROUTER': 'django_clickhouse.routers.DefaultRouter', 'STATSD_PREFIX': 'clickhouse', + 'MIGRATIONS_PACKAGE': 'clickhouse_migrations', + 'MIGRATE_WITH_DEFAULT_DB': True, } diff --git a/src/django_clickhouse/db_clients.py b/src/django_clickhouse/database.py similarity index 50% rename from src/django_clickhouse/db_clients.py rename to src/django_clickhouse/database.py index 523e2d5..5d1fc05 100644 --- a/src/django_clickhouse/db_clients.py +++ b/src/django_clickhouse/database.py @@ -1,12 +1,25 @@ -from infi.clickhouse_orm.database import Database +from infi.clickhouse_orm.database import Database as InfiDatabase -from .exceptions import DBAliasError from .configuration import config - +from .exceptions import DBAliasError DEFAULT_DB_ALIAS = 'default' +class Database(InfiDatabase): + def __init__(self, **kwargs): + infi_kwargs = { + k: kwargs[k] + for k in ('db_name', 'db_url', 'username', 'password', 'readonly', 'autocreate') + if k in kwargs + } + super(Database, self).__init__(**infi_kwargs) + + def migrate(self, migrations_package_name, up_to=9999): + raise NotImplementedError('This method is not supported by django-clickhouse.' + ' Use django_clickhouse.migrations module instead.') + + class ConnectionProxy: _connections = {} diff --git a/src/django_clickhouse/engines.py b/src/django_clickhouse/engines.py index 27ec230..db0a931 100644 --- a/src/django_clickhouse/engines.py +++ b/src/django_clickhouse/engines.py @@ -1,11 +1,9 @@ """ This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse """ -from collections import defaultdict -from itertools import chain -from typing import List, Tuple, TypeVar, Type, Optional -from django.db.models import Model as DjangoModel +from typing import List, TypeVar, Type +from django.db.models import Model as DjangoModel from infi.clickhouse_orm import engines as infi_engines from infi.clickhouse_orm.database import Database @@ -55,7 +53,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre query = "SELECT * FROM $table FINAL WHERE `%s` >= '%s' AND `%s` <= '%s', id IN (%s)" \ % (self.date_col, min_date.isoformat(), self.date_col, max_date.isoformat(), ', '.join(obj_ids)) - qs = model_cls.select(query, model_class=model_cls) + qs = model_cls.get_database().select(query, model_class=model_cls) return list(qs) def get_insert_batch(self, model_cls, database, objects): diff --git a/src/django_clickhouse/migration.py b/src/django_clickhouse/migration.py deleted file mode 100644 index 20d9433..0000000 --- a/src/django_clickhouse/migration.py +++ /dev/null @@ -1,17 +0,0 @@ -@receiver(post_migrate) -def clickhouse_migrate(sender, **kwargs): - if getattr(settings, 'UNIT_TEST', False): - # Не надо мигрировать ClickHouse при каждом UnitTest - # Это сделает один раз система тестирования - return - - if kwargs.get('using', 'default') != 'default': - # Не надо выполнять синхронизацию для каждого шарда. Только один раз. - return - - app_name = kwargs['app_config'].name - - package_name = "%s.%s" % (app_name, 'ch_migrations') - if importlib.util.find_spec(package_name): - settings.CLICKHOUSE_DB.migrate(package_name) - print('\033[94mMigrated ClickHouse models for app "%s"\033[0m' % app_name) \ No newline at end of file diff --git a/src/django_clickhouse/migrations.py b/src/django_clickhouse/migrations.py new file mode 100644 index 0000000..e9a274f --- /dev/null +++ b/src/django_clickhouse/migrations.py @@ -0,0 +1,82 @@ +""" +Migrating database +""" +import datetime + +from django.db.models.signals import post_migrate +from django.dispatch import receiver +from django.db import DEFAULT_DB_ALIAS as DJANGO_DEFAULT_DB_ALIAS + +from infi.clickhouse_orm.migrations import MigrationHistory +from infi.clickhouse_orm.utils import import_submodules + +from django_clickhouse.utils import lazy_class_import +from .configuration import config +from .database import connections + + +class Migration: + """ + Base class for migrations + """ + operations = [] + + def apply(self, db_alias): # type: (str) -> None + """ + Applies migration to given database + :param db_alias: Database alias to apply migration to + :return: None + """ + db_router = lazy_class_import(config.DATABASE_ROUTER)() + + for op in self.operations: + model_class = getattr(op, 'model_class', None) + hints = getattr(op, 'hints', {}) + + if db_router.allow_migrate(db_alias, self.__module__, model=model_class, **hints): + op.apply(connections[db_alias]) + + +def migrate_app(app_label, db_alias, up_to=9999): + # type: (str, str, int) -> None + """ + Migrates given django app + :param app_label: App label to migrate + :param db_alias: Database alias to migrate + :param up_to: Migration number to migrate to + :return: None + """ + db = connections[db_alias] + migrations_package = "%s.%s" % (app_label, config.MIGRATIONS_PACKAGE) + + applied_migrations = db._get_applied_migrations(migrations_package) + modules = import_submodules(migrations_package) + + unapplied_migrations = set(modules.keys()) - applied_migrations + + for name in sorted(unapplied_migrations): + migration = modules[name].Migration() + migration.apply(db_alias) + + db.insert([ + MigrationHistory(package_name=migrations_package, module_name=name, applied=datetime.date.today()) + ]) + + if int(name[:4]) >= up_to: + break + + +@receiver(post_migrate) +def clickhouse_migrate(sender, **kwargs): + if not config.MIGRATE_WITH_DEFAULT_DB: + # If auto migration is enabled + return + + if kwargs.get('using', DJANGO_DEFAULT_DB_ALIAS) != DJANGO_DEFAULT_DB_ALIAS: + # Не надо выполнять синхронизацию для каждого шарда. Только один раз. + return + + app_name = kwargs['app_config'].name + + for db_alias in config.DATABASES: + migrate_app(app_name, db_alias) diff --git a/src/django_clickhouse/routers.py b/src/django_clickhouse/routers.py new file mode 100644 index 0000000..85e7496 --- /dev/null +++ b/src/django_clickhouse/routers.py @@ -0,0 +1,57 @@ +""" +This file defines router to find appropriate database +""" +import random +from typing import Optional + +import six + +from .clickhouse_models import ClickHouseModel +from .configuration import config +from .database import connections +from .utils import lazy_class_import + + +class DefaultRouter: + def db_for_read(self, model, **hints): + # type: (ClickHouseModel, **dict) -> str + """ + Gets database to read from for model + :param model: Model to decide for + :param hints: Some hints to make correct choice + :return: Database alias + """ + return random.choice(model.read_db_aliases) + + def db_for_write(self, model, **hints): + # type: (ClickHouseModel, **dict) -> str + """ + Gets database to write to for model + :param model: Model to decide for + :param hints: Some hints to make correct choice + :return: Database alias + """ + return random.choice(model.write_db_aliases) + + def allow_migrate(self, db_alias, app_label, model=None, **hints): + # type: (str, str, Optional[ClickHouseModel], **dict) -> bool + """ + Checks if migration can be applied to given database + :param db_alias: Database alias to check + :param app_label: App from which migration is got + :param model: Model migration is applied to + :param hints: Hints to make correct decision + :return: boolean + """ + if connections[db_alias].readonly: + return False + + if hints.get("force_migrate_on_databases", None): + return db_alias in hints["force_migrate_on_databases"] + + if hints.get('model'): + model = '%s.%s.%s' % (app_label, config.MODELS_MODULE, hints['model']) \ + if isinstance(hints['model'], six.string_types) else hints['model'] + + model = lazy_class_import(model) + return db_alias in model.write_db_aliases diff --git a/src/django_clickhouse/utils.py b/src/django_clickhouse/utils.py index 34468de..2aeca4d 100644 --- a/src/django_clickhouse/utils.py +++ b/src/django_clickhouse/utils.py @@ -7,7 +7,7 @@ from importlib import import_module from infi.clickhouse_orm.database import Database -from .db_clients import connections +from .database import connections def get_tz_offset(db_alias=None): # type: (Optional[str]) -> int @@ -62,7 +62,6 @@ def lazy_class_import(obj): # type: (Union[str, Any]) -> Any module = import_module(module_name) try: - print(module, obj_name) return getattr(module, obj_name) except AttributeError: raise ImportError('Invalid import path `%s`' % obj) diff --git a/tests/clickhouse_migrations/0001_initial.py b/tests/clickhouse_migrations/0001_initial.py new file mode 100644 index 0000000..007111c --- /dev/null +++ b/tests/clickhouse_migrations/0001_initial.py @@ -0,0 +1,10 @@ + +from infi.clickhouse_orm.migrations import CreateTable +from django_clickhouse import migrations +from tests.clickhouse_models import TestClickHouseModel + + +class Migration(migrations.Migration): + operations = [ + CreateTable(TestClickHouseModel) + ] diff --git a/tests/clickhouse_migrations/__init__.py b/tests/clickhouse_migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/clickhouse_models.py b/tests/clickhouse_models.py index fb92d08..a2af22d 100644 --- a/tests/clickhouse_models.py +++ b/tests/clickhouse_models.py @@ -13,4 +13,4 @@ class TestClickHouseModel(ClickHouseModel): created_date = fields.DateField() value = fields.Int32Field() - engine = MergeTree('created_Date') + engine = MergeTree('created_date', ('id',)) diff --git a/tests/settings.py b/tests/settings.py index 562278f..0ede8a0 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -25,6 +25,10 @@ LOGGING = { 'django-clickhouse': { 'handlers': ['console'], 'level': 'DEBUG' + }, + 'infi.clickhouse-orm': { + 'handlers': ['console'], + 'level': 'INFO' } } } diff --git a/tests/test_config.py b/tests/test_config.py index 9750beb..e3f8163 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -12,4 +12,4 @@ class ConfigTest(TestCase): def test_not_lib_prop(self): with self.assertRaises(AttributeError): - config.SECRET_KEY \ No newline at end of file + config.SECRET_KEY diff --git a/tests/test_migrations.py b/tests/test_migrations.py new file mode 100644 index 0000000..5e81fca --- /dev/null +++ b/tests/test_migrations.py @@ -0,0 +1,47 @@ +from django.test import TestCase, override_settings +from infi.clickhouse_orm.migrations import MigrationHistory + +from django_clickhouse.database import connections +from django_clickhouse.migrations import migrate_app +from django_clickhouse.routers import DefaultRouter +from tests.clickhouse_models import TestClickHouseModel + + +class NoMigrateRouter(DefaultRouter): + def allow_migrate(self, db_alias, app_label, model=None, **hints): + return False + + +def table_exists(db, model_class): + res = db.select( + "SELECT * FROM system.tables WHERE `database`='%s' AND `name`='%s'" + % (db.db_name, model_class.table_name()) + ) + res = list(res) + return bool(res) + + +@override_settings(CLICKHOUSE_MIGRATE_WITH_DEFAULT_DB=False) +class MigrateAppTest(TestCase): + def setUp(self): + self.db = connections['default'] + + # Clean all database data + self.db.drop_database() + self.db.db_exists = False + self.db.create_database() + + def test_migrate_app(self): + migrate_app('tests', 'default') + self.assertTrue(table_exists(self.db, TestClickHouseModel)) + + self.assertEqual(1, self.db.count(MigrationHistory)) + + # Migrations are already applied no actions should be done + migrate_app('tests', 'default') + self.assertEqual(1, self.db.count(MigrationHistory)) + + @override_settings(CLICKHOUSE_DATABASE_ROUTER=NoMigrateRouter) + def test_router_not_allowed(self): + migrate_app('tests', 'default') + self.assertFalse(table_exists(self.db, TestClickHouseModel))