django-rest-framework/tests/test_permissions.py
Xavier Ordoquy b41a6cfa38 permissions: Allow permissions to be composed (#5753)
* permissions: Allow permissions to be composed

Implement a system to compose permissions with and / or.
This is performed by returning an `OperationHolder` instance that keeps the
permission classes and type of composition (and / or).
When called it will return a AND/OR instance that will then delegate the
permission check to the operands.

* permissions: Add documentation about composed permissions

* Fix documentation typo in permissions
2018-10-03 15:36:24 +01:00

585 lines
23 KiB
Python

from __future__ import unicode_literals
import base64
import unittest
import warnings
import django
from django.contrib.auth.models import Group, Permission, User
from django.db import models
from django.test import TestCase
from django.urls import ResolverMatch
from rest_framework import (
HTTP_HEADER_ENCODING, authentication, generics, permissions, serializers,
status, views
)
from rest_framework.compat import is_guardian_installed
from rest_framework.filters import DjangoObjectPermissionsFilter
from rest_framework.routers import DefaultRouter
from rest_framework.test import APIRequestFactory
from tests.models import BasicModel
factory = APIRequestFactory()
class BasicSerializer(serializers.ModelSerializer):
class Meta:
model = BasicModel
fields = '__all__'
class RootView(generics.ListCreateAPIView):
queryset = BasicModel.objects.all()
serializer_class = BasicSerializer
authentication_classes = [authentication.BasicAuthentication]
permission_classes = [permissions.DjangoModelPermissions]
class InstanceView(generics.RetrieveUpdateDestroyAPIView):
queryset = BasicModel.objects.all()
serializer_class = BasicSerializer
authentication_classes = [authentication.BasicAuthentication]
permission_classes = [permissions.DjangoModelPermissions]
class GetQuerySetListView(generics.ListCreateAPIView):
serializer_class = BasicSerializer
authentication_classes = [authentication.BasicAuthentication]
permission_classes = [permissions.DjangoModelPermissions]
def get_queryset(self):
return BasicModel.objects.all()
class EmptyListView(generics.ListCreateAPIView):
queryset = BasicModel.objects.none()
serializer_class = BasicSerializer
authentication_classes = [authentication.BasicAuthentication]
permission_classes = [permissions.DjangoModelPermissions]
root_view = RootView.as_view()
api_root_view = DefaultRouter().get_api_root_view()
instance_view = InstanceView.as_view()
get_queryset_list_view = GetQuerySetListView.as_view()
empty_list_view = EmptyListView.as_view()
def basic_auth_header(username, password):
credentials = ('%s:%s' % (username, password))
base64_credentials = base64.b64encode(credentials.encode(HTTP_HEADER_ENCODING)).decode(HTTP_HEADER_ENCODING)
return 'Basic %s' % base64_credentials
class ModelPermissionsIntegrationTests(TestCase):
def setUp(self):
User.objects.create_user('disallowed', 'disallowed@example.com', 'password')
user = User.objects.create_user('permitted', 'permitted@example.com', 'password')
user.user_permissions.set([
Permission.objects.get(codename='add_basicmodel'),
Permission.objects.get(codename='change_basicmodel'),
Permission.objects.get(codename='delete_basicmodel')
])
user = User.objects.create_user('updateonly', 'updateonly@example.com', 'password')
user.user_permissions.set([
Permission.objects.get(codename='change_basicmodel'),
])
self.permitted_credentials = basic_auth_header('permitted', 'password')
self.disallowed_credentials = basic_auth_header('disallowed', 'password')
self.updateonly_credentials = basic_auth_header('updateonly', 'password')
BasicModel(text='foo').save()
def test_has_create_permissions(self):
request = factory.post('/', {'text': 'foobar'}, format='json',
HTTP_AUTHORIZATION=self.permitted_credentials)
response = root_view(request, pk=1)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
def test_api_root_view_discard_default_django_model_permission(self):
"""
We check that DEFAULT_PERMISSION_CLASSES can
apply to APIRoot view. More specifically we check expected behavior of
``_ignore_model_permissions`` attribute support.
"""
request = factory.get('/', format='json',
HTTP_AUTHORIZATION=self.permitted_credentials)
request.resolver_match = ResolverMatch('get', (), {})
response = api_root_view(request)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_get_queryset_has_create_permissions(self):
request = factory.post('/', {'text': 'foobar'}, format='json',
HTTP_AUTHORIZATION=self.permitted_credentials)
response = get_queryset_list_view(request, pk=1)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
def test_has_put_permissions(self):
request = factory.put('/1', {'text': 'foobar'}, format='json',
HTTP_AUTHORIZATION=self.permitted_credentials)
response = instance_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_has_delete_permissions(self):
request = factory.delete('/1', HTTP_AUTHORIZATION=self.permitted_credentials)
response = instance_view(request, pk=1)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
def test_does_not_have_create_permissions(self):
request = factory.post('/', {'text': 'foobar'}, format='json',
HTTP_AUTHORIZATION=self.disallowed_credentials)
response = root_view(request, pk=1)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_does_not_have_put_permissions(self):
request = factory.put('/1', {'text': 'foobar'}, format='json',
HTTP_AUTHORIZATION=self.disallowed_credentials)
response = instance_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_does_not_have_delete_permissions(self):
request = factory.delete('/1', HTTP_AUTHORIZATION=self.disallowed_credentials)
response = instance_view(request, pk=1)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_options_permitted(self):
request = factory.options(
'/',
HTTP_AUTHORIZATION=self.permitted_credentials
)
response = root_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('actions', response.data)
self.assertEqual(list(response.data['actions']), ['POST'])
request = factory.options(
'/1',
HTTP_AUTHORIZATION=self.permitted_credentials
)
response = instance_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('actions', response.data)
self.assertEqual(list(response.data['actions']), ['PUT'])
def test_options_disallowed(self):
request = factory.options(
'/',
HTTP_AUTHORIZATION=self.disallowed_credentials
)
response = root_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertNotIn('actions', response.data)
request = factory.options(
'/1',
HTTP_AUTHORIZATION=self.disallowed_credentials
)
response = instance_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertNotIn('actions', response.data)
def test_options_updateonly(self):
request = factory.options(
'/',
HTTP_AUTHORIZATION=self.updateonly_credentials
)
response = root_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertNotIn('actions', response.data)
request = factory.options(
'/1',
HTTP_AUTHORIZATION=self.updateonly_credentials
)
response = instance_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertIn('actions', response.data)
self.assertEqual(list(response.data['actions']), ['PUT'])
def test_empty_view_does_not_assert(self):
request = factory.get('/1', HTTP_AUTHORIZATION=self.permitted_credentials)
response = empty_list_view(request, pk=1)
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_calling_method_not_allowed(self):
request = factory.generic('METHOD_NOT_ALLOWED', '/', HTTP_AUTHORIZATION=self.permitted_credentials)
response = root_view(request)
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
request = factory.generic('METHOD_NOT_ALLOWED', '/1', HTTP_AUTHORIZATION=self.permitted_credentials)
response = instance_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
def test_check_auth_before_queryset_call(self):
class View(RootView):
def get_queryset(_):
self.fail('should not reach due to auth check')
view = View.as_view()
request = factory.get('/', HTTP_AUTHORIZATION='')
response = view(request)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_queryset_assertions(self):
class View(views.APIView):
authentication_classes = [authentication.BasicAuthentication]
permission_classes = [permissions.DjangoModelPermissions]
view = View.as_view()
request = factory.get('/', HTTP_AUTHORIZATION=self.permitted_credentials)
msg = 'Cannot apply DjangoModelPermissions on a view that does not set `.queryset` or have a `.get_queryset()` method.'
with self.assertRaisesMessage(AssertionError, msg):
view(request)
# Faulty `get_queryset()` methods should trigger the above "view does not have a queryset" assertion.
class View(RootView):
def get_queryset(self):
return None
view = View.as_view()
request = factory.get('/', HTTP_AUTHORIZATION=self.permitted_credentials)
with self.assertRaisesMessage(AssertionError, 'View.get_queryset() returned None'):
view(request)
class BasicPermModel(models.Model):
text = models.CharField(max_length=100)
class Meta:
app_label = 'tests'
if django.VERSION < (2, 1):
permissions = (
('view_basicpermmodel', 'Can view basic perm model'),
# add, change, delete built in to django
)
class BasicPermSerializer(serializers.ModelSerializer):
class Meta:
model = BasicPermModel
fields = '__all__'
# Custom object-level permission, that includes 'view' permissions
class ViewObjectPermissions(permissions.DjangoObjectPermissions):
perms_map = {
'GET': ['%(app_label)s.view_%(model_name)s'],
'OPTIONS': ['%(app_label)s.view_%(model_name)s'],
'HEAD': ['%(app_label)s.view_%(model_name)s'],
'POST': ['%(app_label)s.add_%(model_name)s'],
'PUT': ['%(app_label)s.change_%(model_name)s'],
'PATCH': ['%(app_label)s.change_%(model_name)s'],
'DELETE': ['%(app_label)s.delete_%(model_name)s'],
}
class ObjectPermissionInstanceView(generics.RetrieveUpdateDestroyAPIView):
queryset = BasicPermModel.objects.all()
serializer_class = BasicPermSerializer
authentication_classes = [authentication.BasicAuthentication]
permission_classes = [ViewObjectPermissions]
object_permissions_view = ObjectPermissionInstanceView.as_view()
class ObjectPermissionListView(generics.ListAPIView):
queryset = BasicPermModel.objects.all()
serializer_class = BasicPermSerializer
authentication_classes = [authentication.BasicAuthentication]
permission_classes = [ViewObjectPermissions]
object_permissions_list_view = ObjectPermissionListView.as_view()
class GetQuerysetObjectPermissionInstanceView(generics.RetrieveUpdateDestroyAPIView):
serializer_class = BasicPermSerializer
authentication_classes = [authentication.BasicAuthentication]
permission_classes = [ViewObjectPermissions]
def get_queryset(self):
return BasicPermModel.objects.all()
get_queryset_object_permissions_view = GetQuerysetObjectPermissionInstanceView.as_view()
@unittest.skipUnless(is_guardian_installed(), 'django-guardian not installed')
class ObjectPermissionsIntegrationTests(TestCase):
"""
Integration tests for the object level permissions API.
"""
def setUp(self):
from guardian.shortcuts import assign_perm
# create users
create = User.objects.create_user
users = {
'fullaccess': create('fullaccess', 'fullaccess@example.com', 'password'),
'readonly': create('readonly', 'readonly@example.com', 'password'),
'writeonly': create('writeonly', 'writeonly@example.com', 'password'),
'deleteonly': create('deleteonly', 'deleteonly@example.com', 'password'),
}
# give everyone model level permissions, as we are not testing those
everyone = Group.objects.create(name='everyone')
model_name = BasicPermModel._meta.model_name
app_label = BasicPermModel._meta.app_label
f = '{0}_{1}'.format
perms = {
'view': f('view', model_name),
'change': f('change', model_name),
'delete': f('delete', model_name)
}
for perm in perms.values():
perm = '{0}.{1}'.format(app_label, perm)
assign_perm(perm, everyone)
everyone.user_set.add(*users.values())
# appropriate object level permissions
readers = Group.objects.create(name='readers')
writers = Group.objects.create(name='writers')
deleters = Group.objects.create(name='deleters')
model = BasicPermModel.objects.create(text='foo')
assign_perm(perms['view'], readers, model)
assign_perm(perms['change'], writers, model)
assign_perm(perms['delete'], deleters, model)
readers.user_set.add(users['fullaccess'], users['readonly'])
writers.user_set.add(users['fullaccess'], users['writeonly'])
deleters.user_set.add(users['fullaccess'], users['deleteonly'])
self.credentials = {}
for user in users.values():
self.credentials[user.username] = basic_auth_header(user.username, 'password')
# Delete
def test_can_delete_permissions(self):
request = factory.delete('/1', HTTP_AUTHORIZATION=self.credentials['deleteonly'])
response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
def test_cannot_delete_permissions(self):
request = factory.delete('/1', HTTP_AUTHORIZATION=self.credentials['readonly'])
response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
# Update
def test_can_update_permissions(self):
request = factory.patch(
'/1', {'text': 'foobar'}, format='json',
HTTP_AUTHORIZATION=self.credentials['writeonly']
)
response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data.get('text'), 'foobar')
def test_cannot_update_permissions(self):
request = factory.patch(
'/1', {'text': 'foobar'}, format='json',
HTTP_AUTHORIZATION=self.credentials['deleteonly']
)
response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_cannot_update_permissions_non_existing(self):
request = factory.patch(
'/999', {'text': 'foobar'}, format='json',
HTTP_AUTHORIZATION=self.credentials['deleteonly']
)
response = object_permissions_view(request, pk='999')
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
# Read
def test_can_read_permissions(self):
request = factory.get('/1', HTTP_AUTHORIZATION=self.credentials['readonly'])
response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_cannot_read_permissions(self):
request = factory.get('/1', HTTP_AUTHORIZATION=self.credentials['writeonly'])
response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_can_read_get_queryset_permissions(self):
"""
same as ``test_can_read_permissions`` but with a view
that rely on ``.get_queryset()`` instead of ``.queryset``.
"""
request = factory.get('/1', HTTP_AUTHORIZATION=self.credentials['readonly'])
response = get_queryset_object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK)
# Read list
def test_django_object_permissions_filter_deprecated(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
DjangoObjectPermissionsFilter()
message = ("`DjangoObjectPermissionsFilter` has been deprecated and moved "
"to the 3rd-party django-rest-framework-guardian package.")
self.assertEqual(len(w), 1)
self.assertIs(w[-1].category, DeprecationWarning)
self.assertEqual(str(w[-1].message), message)
def test_can_read_list_permissions(self):
request = factory.get('/', HTTP_AUTHORIZATION=self.credentials['readonly'])
object_permissions_list_view.cls.filter_backends = (DjangoObjectPermissionsFilter,)
# TODO: remove in version 3.10
with warnings.catch_warnings(record=True):
warnings.simplefilter("always")
response = object_permissions_list_view(request)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data[0].get('id'), 1)
def test_cannot_read_list_permissions(self):
request = factory.get('/', HTTP_AUTHORIZATION=self.credentials['writeonly'])
object_permissions_list_view.cls.filter_backends = (DjangoObjectPermissionsFilter,)
# TODO: remove in version 3.10
with warnings.catch_warnings(record=True):
warnings.simplefilter("always")
response = object_permissions_list_view(request)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertListEqual(response.data, [])
def test_cannot_method_not_allowed(self):
request = factory.generic('METHOD_NOT_ALLOWED', '/', HTTP_AUTHORIZATION=self.credentials['readonly'])
response = object_permissions_list_view(request)
self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
class BasicPerm(permissions.BasePermission):
def has_permission(self, request, view):
return False
class BasicPermWithDetail(permissions.BasePermission):
message = 'Custom: You cannot access this resource'
def has_permission(self, request, view):
return False
class BasicObjectPerm(permissions.BasePermission):
def has_object_permission(self, request, view, obj):
return False
class BasicObjectPermWithDetail(permissions.BasePermission):
message = 'Custom: You cannot access this resource'
def has_object_permission(self, request, view, obj):
return False
class PermissionInstanceView(generics.RetrieveUpdateDestroyAPIView):
queryset = BasicModel.objects.all()
serializer_class = BasicSerializer
class DeniedView(PermissionInstanceView):
permission_classes = (BasicPerm,)
class DeniedViewWithDetail(PermissionInstanceView):
permission_classes = (BasicPermWithDetail,)
class DeniedObjectView(PermissionInstanceView):
permission_classes = (BasicObjectPerm,)
class DeniedObjectViewWithDetail(PermissionInstanceView):
permission_classes = (BasicObjectPermWithDetail,)
denied_view = DeniedView.as_view()
denied_view_with_detail = DeniedViewWithDetail.as_view()
denied_object_view = DeniedObjectView.as_view()
denied_object_view_with_detail = DeniedObjectViewWithDetail.as_view()
class CustomPermissionsTests(TestCase):
def setUp(self):
BasicModel(text='foo').save()
User.objects.create_user('username', 'username@example.com', 'password')
credentials = basic_auth_header('username', 'password')
self.request = factory.get('/1', format='json', HTTP_AUTHORIZATION=credentials)
self.custom_message = 'Custom: You cannot access this resource'
def test_permission_denied(self):
response = denied_view(self.request, pk=1)
detail = response.data.get('detail')
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertNotEqual(detail, self.custom_message)
def test_permission_denied_with_custom_detail(self):
response = denied_view_with_detail(self.request, pk=1)
detail = response.data.get('detail')
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertEqual(detail, self.custom_message)
def test_permission_denied_for_object(self):
response = denied_object_view(self.request, pk=1)
detail = response.data.get('detail')
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertNotEqual(detail, self.custom_message)
def test_permission_denied_for_object_with_custom_detail(self):
response = denied_object_view_with_detail(self.request, pk=1)
detail = response.data.get('detail')
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertEqual(detail, self.custom_message)
class FakeUser:
def __init__(self, auth=False):
self.is_authenticated = auth
class PermissionsCompositionTests(TestCase):
def test_and_false(self):
request = factory.get('/1', format='json')
request.user = FakeUser(auth=False)
composed_perm = permissions.IsAuthenticated & permissions.AllowAny
assert composed_perm().has_permission(request, None) is False
def test_and_true(self):
request = factory.get('/1', format='json')
request.user = FakeUser(auth=True)
composed_perm = permissions.IsAuthenticated & permissions.AllowAny
assert composed_perm().has_permission(request, None) is True
def test_or_false(self):
request = factory.get('/1', format='json')
request.user = FakeUser(auth=False)
composed_perm = permissions.IsAuthenticated | permissions.AllowAny
assert composed_perm().has_permission(request, None) is True
def test_or_true(self):
request = factory.get('/1', format='json')
request.user = FakeUser(auth=True)
composed_perm = permissions.IsAuthenticated | permissions.AllowAny
assert composed_perm().has_permission(request, None) is True
def test_several_levels(self):
request = factory.get('/1', format='json')
request.user = FakeUser(auth=True)
composed_perm = (
permissions.IsAuthenticated &
permissions.IsAuthenticated &
permissions.IsAuthenticated &
permissions.IsAuthenticated
)
assert composed_perm().has_permission(request, None) is True