Added QuerySet that determines db with router help.

Added using(), create(), bulk_create(), all() methods to QuerySet
This commit is contained in:
M1ha 2018-11-28 15:30:19 +05:00
parent ce1a206b04
commit abe0ae45c5
12 changed files with 179 additions and 23 deletions

View File

@ -1 +0,0 @@
from .configuration import config

View File

@ -14,8 +14,9 @@ from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiMode
from six import with_metaclass from six import with_metaclass
from statsd.defaults.django import statsd from statsd.defaults.django import statsd
from .query import QuerySet
from .configuration import config from .configuration import config
from .database import connections, DEFAULT_DB_ALIAS from .database import connections
from .models import ClickHouseSyncModel from .models import ClickHouseSyncModel
from .serializers import Django2ClickHouseModelSerializer from .serializers import Django2ClickHouseModelSerializer
from .utils import lazy_class_import from .utils import lazy_class_import
@ -31,6 +32,8 @@ class ClickHouseModelMeta(InfiModelBase):
if res.django_model and res.get_sync_delay(): if res.django_model and res.get_sync_delay():
res.django_model.register_clickhouse_sync_model(res) res.django_model.register_clickhouse_sync_model(res)
res.objects = QuerySet(res)
return res return res
@ -41,8 +44,9 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
django_model = None django_model = None
django_model_serializer = Django2ClickHouseModelSerializer django_model_serializer = Django2ClickHouseModelSerializer
read_db_aliases = (DEFAULT_DB_ALIAS,) read_db_aliases = (config.DEFAULT_DB_ALIAS,)
write_db_aliases = (DEFAULT_DB_ALIAS,) write_db_aliases = (config.DEFAULT_DB_ALIAS,)
migrate_db_aliases = write_db_aliases
sync_enabled = False sync_enabled = False
sync_batch_size = None sync_batch_size = None
@ -52,12 +56,16 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
sync_lock_timeout = None sync_lock_timeout = None
@classmethod @classmethod
def get_database(cls, for_write=False): def objects_in(cls, database): # type: (Database) -> QuerySet
# type: (bool) -> Database return QuerySet(cls, database)
@classmethod
def get_database_alias(cls, for_write=False):
# type: (bool) -> str
""" """
Gets database for read or write purposes Gets database alias for read or write purposes
:param for_write: Boolean flag if database is neede for read or for write :param for_write: Boolean flag if database is neede for read or for write
:return: infi.clickhouse_orm.Database instance :return: Database alias to use
""" """
db_router = lazy_class_import(config.DATABASE_ROUTER)() db_router = lazy_class_import(config.DATABASE_ROUTER)()
if for_write: if for_write:
@ -65,6 +73,17 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
else: else:
return db_router.db_for_read(cls) return db_router.db_for_read(cls)
@classmethod
def get_database(cls, for_write=False):
# type: (bool) -> Database
"""
Gets database alias for read or write purposes
:param for_write: Boolean flag if database is neede for read or for write
:return: infi.clickhouse_orm.Database instance
"""
db_alias = cls.get_database_alias(for_write=for_write)
return connections[db_alias]
@classmethod @classmethod
def get_django_model_serializer(cls, writable=False): # type: (bool) -> Django2ClickHouseModelSerializer def get_django_model_serializer(cls, writable=False): # type: (bool) -> Django2ClickHouseModelSerializer
serializer_cls = lazy_class_import(cls.django_model_serializer) serializer_cls = lazy_class_import(cls.django_model_serializer)

View File

@ -21,7 +21,8 @@ DEFAULTS = {
'STATSD_PREFIX': 'clickhouse', 'STATSD_PREFIX': 'clickhouse',
'MIGRATIONS_PACKAGE': 'clickhouse_migrations', 'MIGRATIONS_PACKAGE': 'clickhouse_migrations',
'MIGRATE_WITH_DEFAULT_DB': True, 'MIGRATE_WITH_DEFAULT_DB': True,
'CELERY_QUEUE': 'celery' 'CELERY_QUEUE': 'celery',
'DEFAULT_DB_ALIAS': 'default'
} }

View File

@ -3,8 +3,6 @@ from infi.clickhouse_orm.database import Database as InfiDatabase
from .configuration import config from .configuration import config
from .exceptions import DBAliasError from .exceptions import DBAliasError
DEFAULT_DB_ALIAS = 'default'
class Database(InfiDatabase): class Database(InfiDatabase):
def __init__(self, **kwargs): def __init__(self, **kwargs):
@ -30,7 +28,7 @@ class ConnectionProxy:
def get_connection(self, alias): def get_connection(self, alias):
if alias is None: if alias is None:
alias = DEFAULT_DB_ALIAS alias = config.DEFAULT_DB_ALIAS
if alias not in self._connections: if alias not in self._connections:
if alias not in config.DATABASES: if alias not in config.DATABASES:

View File

@ -6,7 +6,7 @@ It saves all operations to storage in order to write them to ClickHouse later.
from typing import Optional, Any, List, Type from typing import Optional, Any, List, Type
import six import six
from django.db import transaction, DEFAULT_DB_ALIAS from django.db import transaction
from django.db.models.signals import post_save, post_delete from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver from django.dispatch import receiver
from django.db.models import QuerySet as DjangoQuerySet, Manager as DjangoManager, Model as DjangoModel from django.db.models import QuerySet as DjangoQuerySet, Manager as DjangoManager, Model as DjangoModel
@ -171,7 +171,7 @@ class ClickHouseSyncModel(DjangoModel):
:param using: Database alias registered instances are from :param using: Database alias registered instances are from
:return: None :return: None
""" """
model_pks = ['%s.%d' % (using or DEFAULT_DB_ALIAS, pk) for pk in model_pks] model_pks = ['%s.%d' % (using or config.DEFAULT_DB_ALIAS, pk) for pk in model_pks]
def _on_commit(): def _on_commit():
for model_cls in cls.get_clickhouse_sync_models(): for model_cls in cls.get_clickhouse_sync_models():

View File

@ -1,9 +1,79 @@
from typing import Optional, Iterable
from copy import copy
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.models import Model as InfiModel
from infi.clickhouse_orm.query import QuerySet as InfiQuerySet, AggregateQuerySet as InfiAggregateQuerySet from infi.clickhouse_orm.query import QuerySet as InfiQuerySet, AggregateQuerySet as InfiAggregateQuerySet
from .database import connections
class QuerySet(InfiQuerySet): class QuerySet(InfiQuerySet):
pass """
Basic QuerySet to use
"""
def __init__(self, model_cls, database=None): # type: (InfiModel, Optional[Database]) -> None
super(QuerySet, self).__init__(model_cls, database)
self._db_alias = None
@property
def _database(self): # type: () -> Database
# HACK for correct work of all infi.clickhouse-orm methods
# There are no write QuerySet methods now, so I use for_write=False by default
return self.get_database(for_write=False)
@_database.setter
def _database(self, database): # type: (Database) -> None
# HACK for correct work of all infi.clickhouse-orm methods
self._db = database
def get_database(self, for_write=False): # type: (bool) -> Database
"""
Gets database to execute query on. Looks for constructor or using() method.
If nothing was set tries to get database from model class using router.
:param for_write: Return QuerySet for read or for write.
:return: Database instance
"""
if not self._db:
if self._db_alias:
self._db = connections[self._db_alias]
else:
self._db = self._model_cls.get_database(for_write=for_write)
return self._db
def using(self, db_alias): # type: (str) -> QuerySet
"""
Sets database alias to use for this query
:param db_alias: Database alias name from CLICKHOUSE_DATABASES config option
:return: None
"""
qs = copy(self)
qs._db_alias = db_alias
qs._db = None # Previous database should be forgotten
return qs
def all(self): # type: () -> QuerySet
"""
Returns all items of queryset
:return: QuerySet
"""
return copy(self)
def create(self, **kwargs):
"""
Create single item in database
:return: Created instance
"""
instance = self._model_cls(**kwargs)
self.get_database(for_write=True).insert([instance])
return instance
def bulk_create(self, model_instances, batch_size=1000): # type: (Iterable[InfiModel], int)
self.get_database(for_write=True).insert(model_instances=model_instances, batch_size=batch_size)
return model_instances
class AggregateQuerySet(InfiAggregateQuerySet): class AggregateQuerySet(QuerySet, InfiAggregateQuerySet):
pass pass

View File

@ -50,4 +50,4 @@ class DefaultRouter:
if isinstance(hints['model'], six.string_types) else hints['model'] if isinstance(hints['model'], six.string_types) else hints['model']
model = lazy_class_import(model) model = lazy_class_import(model)
return db_alias in model.write_db_aliases return db_alias in model.migrate_db_aliases

View File

@ -14,6 +14,7 @@ class ClickHouseTestModel(ClickHouseModel):
value = fields.Int32Field() value = fields.Int32Field()
engine = ReplacingMergeTree('created_date', ('id',)) engine = ReplacingMergeTree('created_date', ('id',))
migrate_db_aliases = ('default', 'secondary')
class ClickHouseCollapseTestModel(ClickHouseModel): class ClickHouseCollapseTestModel(ClickHouseModel):

View File

@ -69,6 +69,11 @@ CLICKHOUSE_DATABASES = {
'db_name': 'test', 'db_name': 'test',
'username': 'default', 'username': 'default',
'password': '' 'password': ''
},
'secondary': {
'db_name': 'test_2',
'username': 'default',
'password': ''
} }
} }

View File

@ -1,6 +1,6 @@
from django.test import TestCase from django.test import TestCase
from django_clickhouse import config from django_clickhouse.configuration import config
class ConfigTest(TestCase): class ConfigTest(TestCase):

66
tests/test_query.py Normal file
View File

@ -0,0 +1,66 @@
from unittest import TestCase
import datetime
from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app
from django_clickhouse.query import QuerySet
from tests.clickhouse_models import ClickHouseTestModel
class TestQuerySet(TestCase):
def _recreate_db(self, db_alias):
db = connections[db_alias]
db.drop_database()
db.db_exists = False
db.create_database()
migrate_app('tests', db_alias)
return db
def setUp(self):
self.db = self._recreate_db('default')
self._recreate_db('secondary')
def test_all(self):
self.db.insert([ClickHouseTestModel(id=i, created_date=datetime.date.today(), value=i) for i in range(1, 4)])
qs = ClickHouseTestModel.objects.all()
print(qs.get_database(for_write=True).db_name)
self.assertIsInstance(qs, QuerySet)
self.assertEqual(3, qs.count())
def test_create(self):
ClickHouseTestModel.objects.create(id=1, created_date=datetime.date.today(), value=2)
res = list(self.db.select('SELECT * FROM $table', model_class=ClickHouseTestModel))
self.assertEqual(1, len(res))
self.assertEqual(1, res[0].id)
self.assertEqual(datetime.date.today(), res[0].created_date)
self.assertEqual(2, res[0].value)
def test_bulk_create(self):
ClickHouseTestModel.objects.bulk_create([
ClickHouseTestModel(id=i, created_date=datetime.date.today(), value=i) for i in range(1, 4)
])
res = list(self.db.select('SELECT * FROM $table ORDER BY id', model_class=ClickHouseTestModel))
self.assertEqual(3, len(res))
for i in range(0, 3):
self.assertEqual(i + 1, res[i].id)
self.assertEqual(datetime.date.today(), res[0].created_date)
self.assertEqual(i + 1, res[i].value)
def test_using(self):
self.db.insert(
[ClickHouseTestModel(id=i, created_date=datetime.date.today(), value=i) for i in range(1, 4)]
)
connections['secondary'].insert([
ClickHouseTestModel(id=i, created_date=datetime.date.today(), value=i) for i in range(10, 12)
])
self.assertEqual(3, ClickHouseTestModel.objects.count())
self.assertEqual(3, ClickHouseTestModel.objects_in(self.db).count())
self.assertEqual(2, ClickHouseTestModel.objects_in(self.db).using('secondary').count())
self.assertEqual(2, ClickHouseTestModel.objects.using('secondary').count())
self.assertEqual(3, ClickHouseTestModel.objects.using('default').count())

View File

@ -1,17 +1,14 @@
import datetime import datetime
import signal import signal
import os
from multiprocessing import Process from multiprocessing import Process
from time import sleep from time import sleep
from unittest import skip, expectedFailure from unittest import skip, expectedFailure
import random import os
from django.db import connections as django_connections from django.db import connections as django_connections
from django.db.models import F from django.db.models import F
from django.test import TransactionTestCase, override_settings from django.test import TransactionTestCase
from django_clickhouse import config
from django_clickhouse.database import connections from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app from django_clickhouse.migrations import migrate_app
from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel, ClickHouseMultiTestModel from tests.clickhouse_models import ClickHouseTestModel, ClickHouseCollapseTestModel, ClickHouseMultiTestModel