Added migration system.

This commit is contained in:
M1ha 2018-11-16 13:14:40 +05:00
parent cdc8a2cd26
commit 4480c1ae1d
14 changed files with 246 additions and 33 deletions

View File

@ -7,15 +7,15 @@ from itertools import chain
from typing import List, Tuple
from django.db.models import Model as DjangoModel
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase
from six import with_metaclass
from .db_clients import connections
from .serializers import Django2ClickHouseModelSerializer
from .configuration import config
from .database import connections, DEFAULT_DB_ALIAS
from .models import ClickHouseSyncModel
from .serializers import Django2ClickHouseModelSerializer
from .utils import lazy_class_import
from . import config
class ClickHouseModelMeta(InfiModelBase):
@ -38,11 +38,27 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
django_model = None
django_model_serializer = Django2ClickHouseModelSerializer
read_db_aliases = (DEFAULT_DB_ALIAS,)
write_db_aliases = (DEFAULT_DB_ALIAS,)
sync_batch_size = None
sync_storage = None
sync_delay = None
sync_database_alias = None
def get_database(self, for_write=False):
# type: (bool) -> Database
"""
Gets database for read or write purposes
:param for_write: Boolean flag if database is neede for read or for write
:return: infi.clickhouse_orm.Database instance
"""
db_router = lazy_class_import(config.DATABASE_ROUTER)()
if for_write:
return db_router.db_for_write(self.__class__, instance=self)
else:
return db_router.db_for_read(self.__class__, instance=self)
@classmethod
def get_django_model_serializer(cls):
serializer_cls = lazy_class_import(cls.django_model_serializer)

View File

@ -16,7 +16,11 @@ DEFAULTS = {
'SYNC_STORAGE': 'django_clickhouse.storages.RedisStorage',
'SYNC_DELAY': 5,
'REDIS_CONFIG': None,
'MODELS_MODULE': 'clickhouse_models',
'DATABASE_ROUTER': 'django_clickhouse.routers.DefaultRouter',
'STATSD_PREFIX': 'clickhouse',
'MIGRATIONS_PACKAGE': 'clickhouse_migrations',
'MIGRATE_WITH_DEFAULT_DB': True,
}

View File

@ -1,12 +1,25 @@
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.database import Database as InfiDatabase
from .exceptions import DBAliasError
from .configuration import config
from .exceptions import DBAliasError
DEFAULT_DB_ALIAS = 'default'
class Database(InfiDatabase):
def __init__(self, **kwargs):
infi_kwargs = {
k: kwargs[k]
for k in ('db_name', 'db_url', 'username', 'password', 'readonly', 'autocreate')
if k in kwargs
}
super(Database, self).__init__(**infi_kwargs)
def migrate(self, migrations_package_name, up_to=9999):
raise NotImplementedError('This method is not supported by django-clickhouse.'
' Use django_clickhouse.migrations module instead.')
class ConnectionProxy:
_connections = {}

View File

@ -1,11 +1,9 @@
"""
This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse
"""
from collections import defaultdict
from itertools import chain
from typing import List, Tuple, TypeVar, Type, Optional
from django.db.models import Model as DjangoModel
from typing import List, TypeVar, Type
from django.db.models import Model as DjangoModel
from infi.clickhouse_orm import engines as infi_engines
from infi.clickhouse_orm.database import Database
@ -55,7 +53,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
query = "SELECT * FROM $table FINAL WHERE `%s` >= '%s' AND `%s` <= '%s', id IN (%s)" \
% (self.date_col, min_date.isoformat(), self.date_col, max_date.isoformat(), ', '.join(obj_ids))
qs = model_cls.select(query, model_class=model_cls)
qs = model_cls.get_database().select(query, model_class=model_cls)
return list(qs)
def get_insert_batch(self, model_cls, database, objects):

View File

@ -1,17 +0,0 @@
@receiver(post_migrate)
def clickhouse_migrate(sender, **kwargs):
if getattr(settings, 'UNIT_TEST', False):
# Не надо мигрировать ClickHouse при каждом UnitTest
# Это сделает один раз система тестирования
return
if kwargs.get('using', 'default') != 'default':
# Не надо выполнять синхронизацию для каждого шарда. Только один раз.
return
app_name = kwargs['app_config'].name
package_name = "%s.%s" % (app_name, 'ch_migrations')
if importlib.util.find_spec(package_name):
settings.CLICKHOUSE_DB.migrate(package_name)
print('\033[94mMigrated ClickHouse models for app "%s"\033[0m' % app_name)

View File

@ -0,0 +1,82 @@
"""
Migrating database
"""
import datetime
from django.db.models.signals import post_migrate
from django.dispatch import receiver
from django.db import DEFAULT_DB_ALIAS as DJANGO_DEFAULT_DB_ALIAS
from infi.clickhouse_orm.migrations import MigrationHistory
from infi.clickhouse_orm.utils import import_submodules
from django_clickhouse.utils import lazy_class_import
from .configuration import config
from .database import connections
class Migration:
"""
Base class for migrations
"""
operations = []
def apply(self, db_alias): # type: (str) -> None
"""
Applies migration to given database
:param db_alias: Database alias to apply migration to
:return: None
"""
db_router = lazy_class_import(config.DATABASE_ROUTER)()
for op in self.operations:
model_class = getattr(op, 'model_class', None)
hints = getattr(op, 'hints', {})
if db_router.allow_migrate(db_alias, self.__module__, model=model_class, **hints):
op.apply(connections[db_alias])
def migrate_app(app_label, db_alias, up_to=9999):
# type: (str, str, int) -> None
"""
Migrates given django app
:param app_label: App label to migrate
:param db_alias: Database alias to migrate
:param up_to: Migration number to migrate to
:return: None
"""
db = connections[db_alias]
migrations_package = "%s.%s" % (app_label, config.MIGRATIONS_PACKAGE)
applied_migrations = db._get_applied_migrations(migrations_package)
modules = import_submodules(migrations_package)
unapplied_migrations = set(modules.keys()) - applied_migrations
for name in sorted(unapplied_migrations):
migration = modules[name].Migration()
migration.apply(db_alias)
db.insert([
MigrationHistory(package_name=migrations_package, module_name=name, applied=datetime.date.today())
])
if int(name[:4]) >= up_to:
break
@receiver(post_migrate)
def clickhouse_migrate(sender, **kwargs):
if not config.MIGRATE_WITH_DEFAULT_DB:
# If auto migration is enabled
return
if kwargs.get('using', DJANGO_DEFAULT_DB_ALIAS) != DJANGO_DEFAULT_DB_ALIAS:
# Не надо выполнять синхронизацию для каждого шарда. Только один раз.
return
app_name = kwargs['app_config'].name
for db_alias in config.DATABASES:
migrate_app(app_name, db_alias)

View File

@ -0,0 +1,57 @@
"""
This file defines router to find appropriate database
"""
import random
from typing import Optional
import six
from .clickhouse_models import ClickHouseModel
from .configuration import config
from .database import connections
from .utils import lazy_class_import
class DefaultRouter:
def db_for_read(self, model, **hints):
# type: (ClickHouseModel, **dict) -> str
"""
Gets database to read from for model
:param model: Model to decide for
:param hints: Some hints to make correct choice
:return: Database alias
"""
return random.choice(model.read_db_aliases)
def db_for_write(self, model, **hints):
# type: (ClickHouseModel, **dict) -> str
"""
Gets database to write to for model
:param model: Model to decide for
:param hints: Some hints to make correct choice
:return: Database alias
"""
return random.choice(model.write_db_aliases)
def allow_migrate(self, db_alias, app_label, model=None, **hints):
# type: (str, str, Optional[ClickHouseModel], **dict) -> bool
"""
Checks if migration can be applied to given database
:param db_alias: Database alias to check
:param app_label: App from which migration is got
:param model: Model migration is applied to
:param hints: Hints to make correct decision
:return: boolean
"""
if connections[db_alias].readonly:
return False
if hints.get("force_migrate_on_databases", None):
return db_alias in hints["force_migrate_on_databases"]
if hints.get('model'):
model = '%s.%s.%s' % (app_label, config.MODELS_MODULE, hints['model']) \
if isinstance(hints['model'], six.string_types) else hints['model']
model = lazy_class_import(model)
return db_alias in model.write_db_aliases

View File

@ -7,7 +7,7 @@ from importlib import import_module
from infi.clickhouse_orm.database import Database
from .db_clients import connections
from .database import connections
def get_tz_offset(db_alias=None): # type: (Optional[str]) -> int
@ -62,7 +62,6 @@ def lazy_class_import(obj): # type: (Union[str, Any]) -> Any
module = import_module(module_name)
try:
print(module, obj_name)
return getattr(module, obj_name)
except AttributeError:
raise ImportError('Invalid import path `%s`' % obj)

View File

@ -0,0 +1,10 @@
from infi.clickhouse_orm.migrations import CreateTable
from django_clickhouse import migrations
from tests.clickhouse_models import TestClickHouseModel
class Migration(migrations.Migration):
operations = [
CreateTable(TestClickHouseModel)
]

View File

View File

@ -13,4 +13,4 @@ class TestClickHouseModel(ClickHouseModel):
created_date = fields.DateField()
value = fields.Int32Field()
engine = MergeTree('created_Date')
engine = MergeTree('created_date', ('id',))

View File

@ -25,6 +25,10 @@ LOGGING = {
'django-clickhouse': {
'handlers': ['console'],
'level': 'DEBUG'
},
'infi.clickhouse-orm': {
'handlers': ['console'],
'level': 'INFO'
}
}
}

View File

@ -12,4 +12,4 @@ class ConfigTest(TestCase):
def test_not_lib_prop(self):
with self.assertRaises(AttributeError):
config.SECRET_KEY
config.SECRET_KEY

47
tests/test_migrations.py Normal file
View File

@ -0,0 +1,47 @@
from django.test import TestCase, override_settings
from infi.clickhouse_orm.migrations import MigrationHistory
from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app
from django_clickhouse.routers import DefaultRouter
from tests.clickhouse_models import TestClickHouseModel
class NoMigrateRouter(DefaultRouter):
def allow_migrate(self, db_alias, app_label, model=None, **hints):
return False
def table_exists(db, model_class):
res = db.select(
"SELECT * FROM system.tables WHERE `database`='%s' AND `name`='%s'"
% (db.db_name, model_class.table_name())
)
res = list(res)
return bool(res)
@override_settings(CLICKHOUSE_MIGRATE_WITH_DEFAULT_DB=False)
class MigrateAppTest(TestCase):
def setUp(self):
self.db = connections['default']
# Clean all database data
self.db.drop_database()
self.db.db_exists = False
self.db.create_database()
def test_migrate_app(self):
migrate_app('tests', 'default')
self.assertTrue(table_exists(self.db, TestClickHouseModel))
self.assertEqual(1, self.db.count(MigrationHistory))
# Migrations are already applied no actions should be done
migrate_app('tests', 'default')
self.assertEqual(1, self.db.count(MigrationHistory))
@override_settings(CLICKHOUSE_DATABASE_ROUTER=NoMigrateRouter)
def test_router_not_allowed(self):
migrate_app('tests', 'default')
self.assertFalse(table_exists(self.db, TestClickHouseModel))