django-clickhouse/src/django_clickhouse/database.py

48 lines
1.4 KiB
Python
Raw Normal View History

2018-11-16 11:14:40 +03:00
from infi.clickhouse_orm.database import Database as InfiDatabase
from .configuration import config
2018-11-16 11:14:40 +03:00
from .exceptions import DBAliasError
2018-11-15 15:37:58 +03:00
# Alias that ConnectionProxy.get_connection() falls back to when it is
# called with alias=None.
DEFAULT_DB_ALIAS = 'default'
2018-11-16 11:14:40 +03:00
class Database(InfiDatabase):
    """ClickHouse database wrapper built on top of infi.clickhouse_orm's Database."""

    def __init__(self, **kwargs):
        """Create the database object from a keyword-argument mapping.

        Only the keys ``db_name``, ``db_url``, ``username``, ``password``,
        ``readonly`` and ``autocreate`` are forwarded to the parent
        constructor; any other keyword is silently dropped.
        """
        # NOTE(review): the filter presumably exists because the settings
        # entries passed in may carry extra keys the parent constructor
        # does not accept -- confirm against the configuration module.
        infi_kwargs = {
            k: kwargs[k]
            for k in ('db_name', 'db_url', 'username', 'password', 'readonly', 'autocreate')
            if k in kwargs
        }
        super(Database, self).__init__(**infi_kwargs)

    def drop_database(self):
        """Drop the database and mark it as no longer existing on this object."""
        # BUG fix https://github.com/Infinidat/infi.clickhouse_orm/issues/89
        super(Database, self).drop_database()
        # Reset the existence flag ourselves after the parent call
        # (workaround for the upstream issue linked above).
        self.db_exists = False

    def migrate(self, migrations_package_name, up_to=9999):
        """Always raise: migrations are handled by django_clickhouse.migrations.

        :raises NotImplementedError: unconditionally; both arguments are ignored.
        """
        raise NotImplementedError('This method is not supported by django-clickhouse.'
                                  ' Use django_clickhouse.migrations module instead.')
class ConnectionProxy:
    """Lazily creates and caches one Database object per configured alias.

    Supports both ``proxy.get_connection(alias)`` and ``proxy[alias]``.
    """

    # Cache of alias -> Database. Defined on the class, so it is shared by
    # every ConnectionProxy instance.
    _connections = {}

    def get_connection(self, alias):
        """Return the Database for *alias*, creating it on first use.

        :param alias: Key in ``config.DATABASES``; ``None`` selects
            ``DEFAULT_DB_ALIAS``.
        :return: The cached Database instance for the alias.
        :raises DBAliasError: If the alias is not cached and is not present
            in ``config.DATABASES``.
        """
        if alias is None:
            alias = DEFAULT_DB_ALIAS

        cache = self._connections
        if alias in cache:
            return cache[alias]

        if alias not in config.DATABASES:
            raise DBAliasError(alias)

        # First request for this alias: build the connection from its
        # settings entry and remember it for later calls.
        cache[alias] = Database(**config.DATABASES[alias])
        return cache[alias]

    def __getitem__(self, item):
        """Subscript access: ``proxy[item]`` is ``proxy.get_connection(item)``."""
        return self.get_connection(item)
# Module-level proxy instance; connections[alias] is equivalent to
# connections.get_connection(alias).
connections = ConnectionProxy()