M1ha 2020-02-06 18:57:49 +05:00
parent 951c13ad7d
commit 818704989b
2 changed files with 44 additions and 9 deletions

View File

@@ -1,5 +1,9 @@
 import sys
 from collections import namedtuple as basenamedtuple
+from typing import Any, Set
+
+from django.db import transaction, connections
+from django.db.models import QuerySet


 def namedtuple(*args, **kwargs):
@@ -16,3 +20,36 @@ def namedtuple(*args, **kwargs):
         return TupleClass
     else:
         return basenamedtuple(*args, **kwargs)
+
+
+def django_pg_returning_available(using: str) -> bool:
+    """
+    Checks if the django-pg-returning library is installed and can be used with the given database
+    :return: Boolean
+    """
+    try:
+        import django_pg_returning
+        return connections[using].vendor == 'postgresql'
+    except ImportError:
+        return False
+
+
+def update_returning_pk(qs: QuerySet, updates: dict) -> Set[Any]:
+    """
+    Updates QuerySet items, returning primary key values.
+    This method should not depend on the database engine, though it can include performance optimizations for some engines.
+    :param qs: QuerySet to update
+    :param updates: Update items as passed to the QuerySet.update(**updates) method
+    :return: A set of primary keys
+    """
+    qs._for_write = True
+    if django_pg_returning_available(qs.db):
+        pk_name = qs.model._meta.pk.name
+        qs = qs.only(pk_name).update_returning(**updates)
+        pks = set(qs.values_list(pk_name, flat=True))
+    else:
+        with transaction.atomic(using=qs.db):
+            pks = set(qs.select_for_update().values_list('pk', flat=True))
+            QuerySet.update(qs, **updates)
+    return pks
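For context, a minimal usage sketch of the new helper (the model name and the absolute import path below are assumptions, not part of this commit):

    # Hypothetical usage; `MyModel` and the import path are illustrative only.
    from django_clickhouse.compatibility import update_returning_pk

    qs = MyModel.objects.filter(active=False)
    pks = update_returning_pk(qs, {'active': True})
    # `pks` is a set of primary keys of the updated rows.
    # On PostgreSQL with django-pg-returning installed this goes through UPDATE ... RETURNING;
    # otherwise it falls back to SELECT ... FOR UPDATE plus a plain UPDATE inside one transaction.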

View File

@@ -13,6 +13,7 @@ from django.db.models.signals import post_save, post_delete
 from django.dispatch import receiver
 from statsd.defaults.django import statsd

+from .compatibility import update_returning_pk
 from .configuration import config
 from .storages import Storage
 from .utils import lazy_class_import
@@ -32,9 +33,9 @@ except ImportError:


 class ClickHouseSyncRegisterMixin:
-    def _register_ops(self, operation, result):
+    def _register_ops(self, operation, result, as_int: bool = False):
         pk_name = self.model._meta.pk.name
-        pk_list = [getattr(item, pk_name) for item in result]
+        pk_list = [getattr(item, pk_name) if isinstance(item, DjangoModel) else item for item in result]
         self.model.register_clickhouse_operations(operation, *pk_list, using=self.db)
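The widened `_register_ops` now accepts bare primary key values as well as model instances; the new `as_int` flag is not exercised in this hunk. A short illustration, where `qs` stands for a hypothetical queryset that mixes in `ClickHouseSyncRegisterMixin`:

    qs._register_ops('insert', created_objects)   # model instances, as before
    qs._register_ops('update', {3, 5, 8})         # bare primary keys, e.g. from update_returning_pk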
@@ -87,12 +88,9 @@ class ClickHouseSyncBulkUpdateQuerySetMixin(ClickHouseSyncRegisterMixin, BulkUpd

 class ClickHouseSyncQuerySetMixin(ClickHouseSyncRegisterMixin):
     def update(self, **kwargs):
-        # BUG I use update_returning method here. But it is not suitable for databases other than PostgreSQL
-        #  and requires django-pg-update-returning installed
-        pk_name = self.model._meta.pk.name
-        res = self.only(pk_name).update_returning(**kwargs)
-        self._register_ops('update', res)
-        return len(res)
+        pks = update_returning_pk(self, kwargs)
+        self._register_ops('update', pks)
+        return len(pks)

     def bulk_create(self, objs, batch_size=None):
         objs = super().bulk_create(objs, batch_size=batch_size)
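Seen from calling code, the rewritten update() both returns the number of affected rows and registers their keys for ClickHouse sync; a sketch with a hypothetical model:

    # `MyModel` stands for any model whose queryset uses ClickHouseSyncQuerySetMixin.
    updated = MyModel.objects.filter(status='pending').update(status='done')
    # `updated` equals the number of changed rows; their pks were passed to _register_ops('update', ...).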
@@ -168,7 +166,7 @@ class ClickHouseSyncModel(DjangoModel):
         :param using: Database alias registered instances are from
         :return: None
         """
-        model_pks = ['%s.%d' % (using or config.DEFAULT_DB_ALIAS, pk) for pk in model_pks]
+        model_pks = ['%s.%s' % (using or config.DEFAULT_DB_ALIAS, pk) for pk in model_pks]

         def _on_commit():
             for model_cls in cls.get_clickhouse_sync_models():
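The '%s.%d' to '%s.%s' change matters as soon as primary keys are not plain integers (UUID or string pks, for instance); a quick illustration of the failure mode it removes:

    import uuid

    pk = uuid.uuid4()
    '%s.%s' % ('default', pk)   # works, e.g. 'default.6f1c64ae-...'
    '%s.%d' % ('default', pk)   # raises TypeError: %d requires a number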