Fix unique together validator doesn't respect condition's fields (#9360)

This commit is contained in:
Konstantin Alekseev 2025-02-17 10:01:32 +02:00 committed by GitHub
parent f30c0e2eed
commit 17e95604f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 140 additions and 40 deletions

View File

@ -3,6 +3,9 @@ The `compat` module provides support for backwards compatibility with older
versions of Django/Python, and compatibility wrappers around optional packages. versions of Django/Python, and compatibility wrappers around optional packages.
""" """
import django import django
from django.db import models
from django.db.models.constants import LOOKUP_SEP
from django.db.models.sql.query import Node
from django.views.generic import View from django.views.generic import View
@ -157,6 +160,10 @@ if django.VERSION >= (5, 1):
# 1) the list of validators and 2) the error message. Starting from # 1) the list of validators and 2) the error message. Starting from
# Django 5.1 ip_address_validators only returns the list of validators # Django 5.1 ip_address_validators only returns the list of validators
from django.core.validators import ip_address_validators from django.core.validators import ip_address_validators
def get_referenced_base_fields_from_q(q):
return q.referenced_base_fields
else: else:
# Django <= 5.1: create a compatibility shim for ip_address_validators # Django <= 5.1: create a compatibility shim for ip_address_validators
from django.core.validators import \ from django.core.validators import \
@ -165,6 +172,35 @@ else:
def ip_address_validators(protocol, unpack_ipv4): def ip_address_validators(protocol, unpack_ipv4):
return _ip_address_validators(protocol, unpack_ipv4)[0] return _ip_address_validators(protocol, unpack_ipv4)[0]
# Django < 5.1: create a compatibility shim for Q.referenced_base_fields
# https://github.com/django/django/blob/5.1a1/django/db/models/query_utils.py#L179
def _get_paths_from_expression(expr):
if isinstance(expr, models.F):
yield expr.name
elif hasattr(expr, 'flatten'):
for child in expr.flatten():
if isinstance(child, models.F):
yield child.name
elif isinstance(child, models.Q):
yield from _get_children_from_q(child)
def _get_children_from_q(q):
for child in q.children:
if isinstance(child, Node):
yield from _get_children_from_q(child)
elif isinstance(child, tuple):
lhs, rhs = child
yield lhs
if hasattr(rhs, 'resolve_expression'):
yield from _get_paths_from_expression(rhs)
elif hasattr(child, 'resolve_expression'):
yield from _get_paths_from_expression(child)
def get_referenced_base_fields_from_q(q):
return {
child.split(LOOKUP_SEP, 1)[0] for child in _get_children_from_q(q)
}
# `separators` argument to `json.dumps()` differs between 2.x and 3.x # `separators` argument to `json.dumps()` differs between 2.x and 3.x
# See: https://bugs.python.org/issue22767 # See: https://bugs.python.org/issue22767

View File

@ -26,7 +26,9 @@ from django.utils import timezone
from django.utils.functional import cached_property from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from rest_framework.compat import postgres_fields from rest_framework.compat import (
get_referenced_base_fields_from_q, postgres_fields
)
from rest_framework.exceptions import ErrorDetail, ValidationError from rest_framework.exceptions import ErrorDetail, ValidationError
from rest_framework.fields import get_error_detail from rest_framework.fields import get_error_detail
from rest_framework.settings import api_settings from rest_framework.settings import api_settings
@ -1425,20 +1427,20 @@ class ModelSerializer(Serializer):
def get_unique_together_constraints(self, model): def get_unique_together_constraints(self, model):
""" """
Returns iterator of (fields, queryset), each entry describes an unique together Returns iterator of (fields, queryset, condition_fields, condition),
constraint on `fields` in `queryset`. each entry describes an unique together constraint on `fields` in `queryset`
with respect of constraint's `condition`.
""" """
for parent_class in [model] + list(model._meta.parents): for parent_class in [model] + list(model._meta.parents):
for unique_together in parent_class._meta.unique_together: for unique_together in parent_class._meta.unique_together:
yield unique_together, model._default_manager yield unique_together, model._default_manager, [], None
for constraint in parent_class._meta.constraints: for constraint in parent_class._meta.constraints:
if isinstance(constraint, models.UniqueConstraint) and len(constraint.fields) > 1: if isinstance(constraint, models.UniqueConstraint) and len(constraint.fields) > 1:
yield ( if constraint.condition is None:
constraint.fields, condition_fields = []
model._default_manager else:
if constraint.condition is None condition_fields = list(get_referenced_base_fields_from_q(constraint.condition))
else model._default_manager.filter(constraint.condition) yield (constraint.fields, model._default_manager, condition_fields, constraint.condition)
)
def get_uniqueness_extra_kwargs(self, field_names, declared_fields, extra_kwargs): def get_uniqueness_extra_kwargs(self, field_names, declared_fields, extra_kwargs):
""" """
@ -1470,9 +1472,10 @@ class ModelSerializer(Serializer):
# Include each of the `unique_together` and `UniqueConstraint` field names, # Include each of the `unique_together` and `UniqueConstraint` field names,
# so long as all the field names are included on the serializer. # so long as all the field names are included on the serializer.
for unique_together_list, queryset in self.get_unique_together_constraints(model): for unique_together_list, queryset, condition_fields, condition in self.get_unique_together_constraints(model):
if set(field_names).issuperset(unique_together_list): unique_together_list_and_condition_fields = set(unique_together_list) | set(condition_fields)
unique_constraint_names |= set(unique_together_list) if set(field_names).issuperset(unique_together_list_and_condition_fields):
unique_constraint_names |= unique_together_list_and_condition_fields
# Now we have all the field names that have uniqueness constraints # Now we have all the field names that have uniqueness constraints
# applied, we can add the extra 'required=...' or 'default=...' # applied, we can add the extra 'required=...' or 'default=...'
@ -1594,12 +1597,13 @@ class ModelSerializer(Serializer):
# Note that we make sure to check `unique_together` both on the # Note that we make sure to check `unique_together` both on the
# base model class, but also on any parent classes. # base model class, but also on any parent classes.
validators = [] validators = []
for unique_together, queryset in self.get_unique_together_constraints(self.Meta.model): for unique_together, queryset, condition_fields, condition in self.get_unique_together_constraints(self.Meta.model):
# Skip if serializer does not map to all unique together sources # Skip if serializer does not map to all unique together sources
if not set(source_map).issuperset(unique_together): unique_together_and_condition_fields = set(unique_together) | set(condition_fields)
if not set(source_map).issuperset(unique_together_and_condition_fields):
continue continue
for source in unique_together: for source in unique_together_and_condition_fields:
assert len(source_map[source]) == 1, ( assert len(source_map[source]) == 1, (
"Unable to create `UniqueTogetherValidator` for " "Unable to create `UniqueTogetherValidator` for "
"`{model}.{field}` as `{serializer}` has multiple " "`{model}.{field}` as `{serializer}` has multiple "
@ -1618,7 +1622,9 @@ class ModelSerializer(Serializer):
field_names = tuple(source_map[f][0] for f in unique_together) field_names = tuple(source_map[f][0] for f in unique_together)
validator = UniqueTogetherValidator( validator = UniqueTogetherValidator(
queryset=queryset, queryset=queryset,
fields=field_names fields=field_names,
condition_fields=tuple(source_map[f][0] for f in condition_fields),
condition=condition,
) )
validators.append(validator) validators.append(validator)
return validators return validators

View File

@ -6,7 +6,9 @@ This gives us better separation of concerns, allows us to use single-step
object creation, and makes it possible to switch between using the implicit object creation, and makes it possible to switch between using the implicit
`ModelSerializer` class and an equivalent explicit `Serializer` class. `ModelSerializer` class and an equivalent explicit `Serializer` class.
""" """
from django.core.exceptions import FieldError
from django.db import DataError from django.db import DataError
from django.db.models import Exists
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError from rest_framework.exceptions import ValidationError
@ -23,6 +25,17 @@ def qs_exists(queryset):
return False return False
def qs_exists_with_condition(queryset, condition, against):
if condition is None:
return qs_exists(queryset)
try:
# use the same query as UniqueConstraint.validate
# https://github.com/django/django/blob/7ba2a0db20c37a5b1500434ca4ed48022311c171/django/db/models/constraints.py#L672
return (condition & Exists(queryset.filter(condition))).check(against)
except (TypeError, ValueError, DataError, FieldError):
return False
def qs_filter(queryset, **kwargs): def qs_filter(queryset, **kwargs):
try: try:
return queryset.filter(**kwargs) return queryset.filter(**kwargs)
@ -99,10 +112,12 @@ class UniqueTogetherValidator:
missing_message = _('This field is required.') missing_message = _('This field is required.')
requires_context = True requires_context = True
def __init__(self, queryset, fields, message=None): def __init__(self, queryset, fields, message=None, condition_fields=None, condition=None):
self.queryset = queryset self.queryset = queryset
self.fields = fields self.fields = fields
self.message = message or self.message self.message = message or self.message
self.condition_fields = [] if condition_fields is None else condition_fields
self.condition = condition
def enforce_required_fields(self, attrs, serializer): def enforce_required_fields(self, attrs, serializer):
""" """
@ -114,7 +129,7 @@ class UniqueTogetherValidator:
missing_items = { missing_items = {
field_name: self.missing_message field_name: self.missing_message
for field_name in self.fields for field_name in (*self.fields, *self.condition_fields)
if serializer.fields[field_name].source not in attrs if serializer.fields[field_name].source not in attrs
} }
if missing_items: if missing_items:
@ -173,16 +188,19 @@ class UniqueTogetherValidator:
if attrs[field_name] != getattr(serializer.instance, field_name) if attrs[field_name] != getattr(serializer.instance, field_name)
] ]
if checked_values and None not in checked_values and qs_exists(queryset): condition_kwargs = {source: attrs[source] for source in self.condition_fields}
if checked_values and None not in checked_values and qs_exists_with_condition(queryset, self.condition, condition_kwargs):
field_names = ', '.join(self.fields) field_names = ', '.join(self.fields)
message = self.message.format(field_names=field_names) message = self.message.format(field_names=field_names)
raise ValidationError(message, code='unique') raise ValidationError(message, code='unique')
def __repr__(self): def __repr__(self):
return '<%s(queryset=%s, fields=%s)>' % ( return '<{}({})>'.format(
self.__class__.__name__, self.__class__.__name__,
smart_repr(self.queryset), ', '.join(
smart_repr(self.fields) f'{attr}={smart_repr(getattr(self, attr))}'
for attr in ('queryset', 'fields', 'condition')
if getattr(self, attr) is not None)
) )
def __eq__(self, other): def __eq__(self, other):

View File

@ -521,7 +521,7 @@ class UniqueConstraintModel(models.Model):
race_name = models.CharField(max_length=100) race_name = models.CharField(max_length=100)
position = models.IntegerField() position = models.IntegerField()
global_id = models.IntegerField() global_id = models.IntegerField()
fancy_conditions = models.IntegerField(null=True) fancy_conditions = models.IntegerField()
class Meta: class Meta:
constraints = [ constraints = [
@ -543,7 +543,12 @@ class UniqueConstraintModel(models.Model):
name="unique_constraint_model_together_uniq", name="unique_constraint_model_together_uniq",
fields=('race_name', 'position'), fields=('race_name', 'position'),
condition=models.Q(race_name='example'), condition=models.Q(race_name='example'),
) ),
models.UniqueConstraint(
name='unique_constraint_model_together_uniq2',
fields=('race_name', 'position'),
condition=models.Q(fancy_conditions__gte=10),
),
] ]
@ -576,17 +581,20 @@ class TestUniqueConstraintValidation(TestCase):
self.instance = UniqueConstraintModel.objects.create( self.instance = UniqueConstraintModel.objects.create(
race_name='example', race_name='example',
position=1, position=1,
global_id=1 global_id=1,
fancy_conditions=1
) )
UniqueConstraintModel.objects.create( UniqueConstraintModel.objects.create(
race_name='example', race_name='example',
position=2, position=2,
global_id=2 global_id=2,
fancy_conditions=1
) )
UniqueConstraintModel.objects.create( UniqueConstraintModel.objects.create(
race_name='other', race_name='other',
position=1, position=1,
global_id=3 global_id=3,
fancy_conditions=1
) )
def test_repr(self): def test_repr(self):
@ -601,22 +609,55 @@ class TestUniqueConstraintValidation(TestCase):
position = IntegerField\(.*required=True\) position = IntegerField\(.*required=True\)
global_id = IntegerField\(.*validators=\[<UniqueValidator\(queryset=UniqueConstraintModel.objects.all\(\)\)>\]\) global_id = IntegerField\(.*validators=\[<UniqueValidator\(queryset=UniqueConstraintModel.objects.all\(\)\)>\]\)
class Meta: class Meta:
validators = \[<UniqueTogetherValidator\(queryset=<QuerySet \[<UniqueConstraintModel: UniqueConstraintModel object \(1\)>, <UniqueConstraintModel: UniqueConstraintModel object \(2\)>\]>, fields=\('race_name', 'position'\)\)>\] validators = \[<UniqueTogetherValidator\(queryset=UniqueConstraintModel.objects.all\(\), fields=\('race_name', 'position'\), condition=<Q: \(AND: \('race_name', 'example'\)\)>\)>\]
""") """)
assert re.search(expected, repr(serializer)) is not None assert re.search(expected, repr(serializer)) is not None
def test_unique_together_field(self): def test_unique_together_condition(self):
""" """
UniqueConstraint fields and condition attributes must be passed Fields used in UniqueConstraint's condition must be included
to UniqueTogetherValidator as fields and queryset into queryset existence check
""" """
serializer = UniqueConstraintSerializer() UniqueConstraintModel.objects.create(
assert len(serializer.validators) == 1 race_name='condition',
validator = serializer.validators[0] position=1,
assert validator.fields == ('race_name', 'position') global_id=10,
assert set(validator.queryset.values_list(flat=True)) == set( fancy_conditions=10,
UniqueConstraintModel.objects.filter(race_name='example').values_list(flat=True)
) )
serializer = UniqueConstraintSerializer(data={
'race_name': 'condition',
'position': 1,
'global_id': 11,
'fancy_conditions': 9,
})
assert serializer.is_valid()
serializer = UniqueConstraintSerializer(data={
'race_name': 'condition',
'position': 1,
'global_id': 11,
'fancy_conditions': 11,
})
assert not serializer.is_valid()
def test_unique_together_condition_fields_required(self):
"""
Fields used in UniqueConstraint's condition must be present in serializer
"""
serializer = UniqueConstraintSerializer(data={
'race_name': 'condition',
'position': 1,
'global_id': 11,
})
assert not serializer.is_valid()
assert serializer.errors == {'fancy_conditions': ['This field is required.']}
class NoFieldsSerializer(serializers.ModelSerializer):
class Meta:
model = UniqueConstraintModel
fields = ('race_name', 'position', 'global_id')
serializer = NoFieldsSerializer()
assert len(serializer.validators) == 1
def test_single_field_uniq_validators(self): def test_single_field_uniq_validators(self):
""" """
@ -625,9 +666,8 @@ class TestUniqueConstraintValidation(TestCase):
""" """
# Django 5 includes Max and Min values validators for IntergerField # Django 5 includes Max and Min values validators for IntergerField
extra_validators_qty = 2 if django_version[0] >= 5 else 0 extra_validators_qty = 2 if django_version[0] >= 5 else 0
#
serializer = UniqueConstraintSerializer() serializer = UniqueConstraintSerializer()
assert len(serializer.validators) == 1 assert len(serializer.validators) == 2
validators = serializer.fields['global_id'].validators validators = serializer.fields['global_id'].validators
assert len(validators) == 1 + extra_validators_qty assert len(validators) == 1 + extra_validators_qty
assert validators[0].queryset == UniqueConstraintModel.objects assert validators[0].queryset == UniqueConstraintModel.objects