mirror of
https://github.com/encode/django-rest-framework.git
synced 2024-11-10 19:56:59 +03:00
first pass at object level permissions and tests
This commit is contained in:
parent
57d6b5fb7c
commit
118645e480
|
@ -7,6 +7,7 @@ import warnings
|
|||
|
||||
SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS']
|
||||
|
||||
from django.http import Http404
|
||||
from rest_framework.compat import oauth2_provider_scope, oauth2_constants, guardian
|
||||
|
||||
|
||||
|
@ -152,9 +153,54 @@ class DjangoModelPermissionsOrAnonReadOnly(DjangoModelPermissions):
|
|||
|
||||
|
||||
class DjangoObjectLevelModelPermissions(DjangoModelPermissions):
|
||||
"""
|
||||
Basic object level permissions utilizing django-guardian.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
assert guardian, 'Using DjangoObjectLevelModelPermissions, but django-guardian is not installed'
|
||||
|
||||
action_perm_map = {
|
||||
'list': 'read',
|
||||
'retrieve': 'read',
|
||||
'create': 'add',
|
||||
'partial_update': 'change',
|
||||
'update': 'change',
|
||||
'destroy': 'delete',
|
||||
}
|
||||
|
||||
def _get_names(self, view):
|
||||
model_cls = getattr(view, 'model', None)
|
||||
queryset = getattr(view, 'queryset', None)
|
||||
|
||||
if model_cls is None and queryset is not None:
|
||||
model_cls = queryset.model
|
||||
if not model_cls: # no model, no model based permissions
|
||||
return None
|
||||
model_name = model_cls._meta.module_name
|
||||
return model_name
|
||||
|
||||
def has_permission(self, request, view):
|
||||
if view.action == 'list':
|
||||
user = request.user
|
||||
queryset = view.get_queryset()
|
||||
model_name = self._get_names(view)
|
||||
view.queryset = guardian.shortcuts.get_objects_for_user(user, 'read_' + model_name, queryset) #TODO: move to filter
|
||||
return super(DjangoObjectLevelModelPermissions, self).has_permission(request, view)
|
||||
|
||||
def has_object_permission(self, request, view, obj):
|
||||
user = request.user
|
||||
model_name = self._get_names(view)
|
||||
action = self.action_perm_map.get(view.action)
|
||||
|
||||
assert action, "Tried to determine object permissions but no action specified in view"
|
||||
|
||||
perm = "{action}_{model_name}".format(action=action, model_name=model_name)
|
||||
check = user.has_perm(perm, obj)
|
||||
if not check:
|
||||
raise Http404
|
||||
return user.has_perm(perm, obj)
|
||||
|
||||
|
||||
class TokenHasReadWriteScope(BasePermission):
|
||||
"""
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
from __future__ import unicode_literals
|
||||
from django.contrib.auth.models import User, Permission
|
||||
from django.contrib.auth.models import User, Permission, Group
|
||||
from django.db import models
|
||||
from django.test import TestCase
|
||||
from rest_framework import generics, status, permissions, authentication, HTTP_HEADER_ENCODING
|
||||
from rest_framework.compat import guardian
|
||||
from rest_framework.test import APIRequestFactory
|
||||
from rest_framework.tests.models import BasicModel
|
||||
from rest_framework.settings import api_settings
|
||||
import base64
|
||||
|
||||
factory = APIRequestFactory()
|
||||
|
@ -142,67 +141,142 @@ class ModelPermissionsIntegrationTests(TestCase):
|
|||
self.assertEqual(list(response.data['actions'].keys()), ['PUT'])
|
||||
|
||||
|
||||
class BasicPermModel(BasicModel):
|
||||
class BasicPermModel(models.Model):
|
||||
text = models.CharField(max_length=100)
|
||||
|
||||
class Meta:
|
||||
app_label = 'tests'
|
||||
permissions = (
|
||||
('read_basicpermmodel', "Can view basic perm model"),
|
||||
('read_basicpermmodel', 'Can view basic perm model'),
|
||||
# add, change, delete built in to django
|
||||
)
|
||||
|
||||
class ObjectPermissionInstanceView(generics.RetrieveUpdateDestroyAPIView):
|
||||
model = BasicModel
|
||||
model = BasicPermModel
|
||||
authentication_classes = [authentication.BasicAuthentication]
|
||||
permission_classes = [permissions.DjangoObjectLevelModelPermissions]
|
||||
|
||||
|
||||
object_permissions_view = ObjectPermissionInstanceView.as_view()
|
||||
|
||||
class ObjectPermissionListView(generics.ListAPIView):
|
||||
model = BasicPermModel
|
||||
authentication_classes = [authentication.BasicAuthentication]
|
||||
permission_classes = [permissions.DjangoObjectLevelModelPermissions]
|
||||
|
||||
object_permissions_list_view = ObjectPermissionListView.as_view()
|
||||
|
||||
if guardian:
|
||||
from guardian.shortcuts import assign_perm
|
||||
|
||||
class ObjectPermissionsIntegrationTests(TestCase):
|
||||
"""
|
||||
Integration tests for the object level permissions API.
|
||||
"""
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
# create users
|
||||
create = User.objects.create_user
|
||||
users = {
|
||||
'fullaccess': create('fullaccess', 'fullaccess@example.com', 'password'),
|
||||
'readonly': create('readonly', 'readonly@example.com', 'password'),
|
||||
'writeonly': create('writeonly', 'writeonly@example.com', 'password'),
|
||||
'deleteonly': create('deleteonly', 'deleteonly@example.com', 'password'),
|
||||
}
|
||||
|
||||
# give everyone model level permissions, as we are not testing those
|
||||
everyone = Group.objects.create(name='everyone')
|
||||
model_name = BasicPermModel._meta.module_name
|
||||
app_label = BasicPermModel._meta.app_label
|
||||
f = '{0}_{1}'.format
|
||||
perms = {
|
||||
'read': f('read', model_name),
|
||||
'change': f('change', model_name),
|
||||
'delete': f('delete', model_name)
|
||||
}
|
||||
for perm in perms.values():
|
||||
perm = '{0}.{1}'.format(app_label, perm)
|
||||
assign_perm(perm, everyone)
|
||||
everyone.user_set.add(*users.values())
|
||||
|
||||
cls.perms = perms
|
||||
cls.users = users
|
||||
|
||||
def setUp(self):
|
||||
# create users
|
||||
User.objects.create_user('no_permission', 'no_permission@example.com', 'password')
|
||||
reader = User.objects.create_user('reader', 'reader@example.com', 'password')
|
||||
writer = User.objects.create_user('writer', 'writer@example.com', 'password')
|
||||
full_access = User.objects.create_user('full_access', 'full_access@example.com', 'password')
|
||||
|
||||
perms = self.perms
|
||||
users = self.users
|
||||
|
||||
# appropriate object level permissions
|
||||
readers = Group.objects.create(name='readers')
|
||||
writers = Group.objects.create(name='writers')
|
||||
deleters = Group.objects.create(name='deleters')
|
||||
|
||||
model = BasicPermModel.objects.create(text='foo')
|
||||
|
||||
assign_perm(perms['read'], readers, model)
|
||||
assign_perm(perms['change'], writers, model)
|
||||
assign_perm(perms['delete'], deleters, model)
|
||||
|
||||
# assign permissions appropriately
|
||||
from guardian.shortcuts import assign_perm
|
||||
readers.user_set.add(users['fullaccess'], users['readonly'])
|
||||
writers.user_set.add(users['fullaccess'], users['writeonly'])
|
||||
deleters.user_set.add(users['fullaccess'], users['deleteonly'])
|
||||
|
||||
read = "read_basicpermmodel"
|
||||
write = "change_basicpermmodel"
|
||||
delete = "delete_basicpermmodel"
|
||||
app_label = 'tests.'
|
||||
# model level permissions
|
||||
assign_perm(app_label + delete, full_access, obj=model)
|
||||
(assign_perm(app_label + write, user, obj=model) for user in (writer, full_access))
|
||||
(assign_perm(app_label + read, user, obj=model) for user in (reader, writer, full_access))
|
||||
self.credentials = {}
|
||||
for user in users.values():
|
||||
self.credentials[user.username] = basic_auth_header(user.username, 'password')
|
||||
|
||||
# object level permissions
|
||||
assign_perm(delete, full_access, obj=model)
|
||||
(assign_perm(write, user, obj=model) for user in (writer, full_access))
|
||||
(assign_perm(read, user, obj=model) for user in (reader, writer, full_access))
|
||||
|
||||
self.no_permission_credentials = basic_auth_header('no_permission', 'password')
|
||||
self.reader_credentials = basic_auth_header('reader', 'password')
|
||||
self.writer_credentials = basic_auth_header('writer', 'password')
|
||||
self.full_access_credentials = basic_auth_header('full_access', 'password')
|
||||
|
||||
|
||||
def test_has_delete_permissions(self):
|
||||
request = factory.delete('/1', HTTP_AUTHORIZATION=self.full_access_credentials)
|
||||
# Delete
|
||||
def test_can_delete_permissions(self):
|
||||
request = factory.delete('/1', HTTP_AUTHORIZATION=self.credentials['deleteonly'])
|
||||
object_permissions_view.cls.action = 'destroy'
|
||||
response = object_permissions_view(request, pk='1')
|
||||
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
|
||||
|
||||
def test_no_delete_permissions(self):
|
||||
request = factory.delete('/1', HTTP_AUTHORIZATION=self.writer_credentials)
|
||||
def test_cannot_delete_permissions(self):
|
||||
request = factory.delete('/1', HTTP_AUTHORIZATION=self.credentials['readonly'])
|
||||
object_permissions_view.cls.action = 'destroy'
|
||||
response = object_permissions_view(request, pk='1')
|
||||
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
|
||||
|
||||
# Update
|
||||
def test_can_update_permissions(self):
|
||||
request = factory.patch('/1', {'text': 'foobar'}, format='json',
|
||||
HTTP_AUTHORIZATION=self.credentials['writeonly'])
|
||||
object_permissions_view.cls.action = 'partial_update'
|
||||
response = object_permissions_view(request, pk='1')
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data.get('text'), 'foobar')
|
||||
|
||||
def test_cannot_update_permissions(self):
|
||||
request = factory.patch('/1', {'text': 'foobar'}, format='json',
|
||||
HTTP_AUTHORIZATION=self.credentials['deleteonly'])
|
||||
object_permissions_view.cls.action = 'partial_update'
|
||||
response = object_permissions_view(request, pk='1')
|
||||
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
|
||||
|
||||
# Read
|
||||
def test_can_read_permissions(self):
|
||||
request = factory.get('/1', HTTP_AUTHORIZATION=self.credentials['readonly'])
|
||||
object_permissions_view.cls.action = 'retrieve'
|
||||
response = object_permissions_view(request, pk='1')
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_cannot_read_permissions(self):
|
||||
request = factory.get('/1', HTTP_AUTHORIZATION=self.credentials['writeonly'])
|
||||
object_permissions_view.cls.action = 'retrieve'
|
||||
response = object_permissions_view(request, pk='1')
|
||||
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
|
||||
|
||||
# Read list
|
||||
def test_can_read_list_permissions(self):
|
||||
request = factory.get('/', HTTP_AUTHORIZATION=self.credentials['readonly'])
|
||||
object_permissions_list_view.cls.action = 'list'
|
||||
response = object_permissions_list_view(request)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(response.data[0].get('id'), 1)
|
||||
|
||||
def test_cannot_read_list_permissions(self):
|
||||
request = factory.get('/', HTTP_AUTHORIZATION=self.credentials['writeonly'])
|
||||
object_permissions_list_view.cls.action = 'list'
|
||||
response = object_permissions_list_view(request)
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
self.assertListEqual(response.data, [])
|
Loading…
Reference in New Issue
Block a user