removed unnecessary guardian req and view.action parsing

This commit is contained in:
bwreilly 2013-09-09 08:39:09 -07:00
parent 9ff0f6d3bf
commit 0183c69538
2 changed files with 26 additions and 37 deletions

View File

@ -8,8 +8,7 @@ import warnings
SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS'] SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS']
from django.http import Http404 from django.http import Http404
from rest_framework.compat import oauth2_provider_scope, oauth2_constants, guardian from rest_framework.compat import oauth2_provider_scope, oauth2_constants
from rest_framework.filters import ObjectPermissionReaderFilter
class BasePermission(object): class BasePermission(object):
@ -158,47 +157,42 @@ class DjangoObjectLevelModelPermissions(DjangoModelPermissions):
Basic object level permissions utilizing django-guardian. Basic object level permissions utilizing django-guardian.
""" """
def __init__(self): actions_map = {
assert guardian, 'Using DjangoObjectLevelModelPermissions, but django-guardian is not installed' 'GET': ['read_%(model_name)s'],
'OPTIONS': ['read_%(model_name)s'],
action_perm_map = { 'HEAD': ['read_%(model_name)s'],
'list': 'read', 'POST': ['add_%(model_name)s'],
'retrieve': 'read', 'PUT': ['change_%(model_name)s'],
'create': 'add', 'PATCH': ['change_%(model_name)s'],
'partial_update': 'change', 'DELETE': ['delete_%(model_name)s'],
'update': 'change',
'destroy': 'delete',
} }
def _get_model_name(self, view): def get_required_object_permissions(self, method, model_cls):
model_cls = getattr(view, 'model', None) kwargs = {
queryset = getattr(view, 'queryset', None) 'model_name': model_cls._meta.module_name
}
if model_cls is None and queryset is not None: return [perm % kwargs for perm in self.actions_map[method]]
model_cls = queryset.model
if not model_cls: # no model, no model based permissions
return None
model_name = model_cls._meta.module_name
return model_name
def has_permission(self, request, view): def has_permission(self, request, view):
if view.action == 'list': if getattr(view, 'action', None) == 'list':
queryset = view.get_queryset() queryset = view.get_queryset()
view.queryset = ObjectPermissionReaderFilter().filter_queryset(request, queryset, view) view.queryset = ObjectPermissionReaderFilter().filter_queryset(request, queryset, view)
return super(DjangoObjectLevelModelPermissions, self).has_permission(request, view) return super(DjangoObjectLevelModelPermissions, self).has_permission(request, view)
def has_object_permission(self, request, view, obj): def has_object_permission(self, request, view, obj):
action = self.action_perm_map.get(view.action) model_cls = getattr(view, 'model', None)
assert action, "Tried to determine object permissions but no action specified in view" queryset = getattr(view, 'queryset', None)
if model_cls is None and queryset is not None:
model_cls = queryset.model
perms = self.get_required_object_permissions(request.method, model_cls)
user = request.user user = request.user
model_name = self._get_model_name(view)
perm = "{action}_{model_name}".format(action=action, model_name=model_name) check = user.has_perms(perms, obj)
check = user.has_perm(perm, obj)
if not check: if not check:
raise Http404 raise Http404
return user.has_perm(perm, obj) return user.has_perms(perms, obj)
class TokenHasReadWriteScope(BasePermission): class TokenHasReadWriteScope(BasePermission):

View File

@ -4,6 +4,7 @@ from django.db import models
from django.test import TestCase from django.test import TestCase
from rest_framework import generics, status, permissions, authentication, HTTP_HEADER_ENCODING from rest_framework import generics, status, permissions, authentication, HTTP_HEADER_ENCODING
from rest_framework.compat import guardian from rest_framework.compat import guardian
from rest_framework.filters import ObjectPermissionReaderFilter
from rest_framework.test import APIRequestFactory from rest_framework.test import APIRequestFactory
from rest_framework.tests.models import BasicModel from rest_framework.tests.models import BasicModel
import base64 import base64
@ -227,13 +228,11 @@ if guardian:
# Delete # Delete
def test_can_delete_permissions(self): def test_can_delete_permissions(self):
request = factory.delete('/1', HTTP_AUTHORIZATION=self.credentials['deleteonly']) request = factory.delete('/1', HTTP_AUTHORIZATION=self.credentials['deleteonly'])
object_permissions_view.cls.action = 'destroy'
response = object_permissions_view(request, pk='1') response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
def test_cannot_delete_permissions(self): def test_cannot_delete_permissions(self):
request = factory.delete('/1', HTTP_AUTHORIZATION=self.credentials['readonly']) request = factory.delete('/1', HTTP_AUTHORIZATION=self.credentials['readonly'])
object_permissions_view.cls.action = 'destroy'
response = object_permissions_view(request, pk='1') response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
@ -241,7 +240,6 @@ if guardian:
def test_can_update_permissions(self): def test_can_update_permissions(self):
request = factory.patch('/1', {'text': 'foobar'}, format='json', request = factory.patch('/1', {'text': 'foobar'}, format='json',
HTTP_AUTHORIZATION=self.credentials['writeonly']) HTTP_AUTHORIZATION=self.credentials['writeonly'])
object_permissions_view.cls.action = 'partial_update'
response = object_permissions_view(request, pk='1') response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data.get('text'), 'foobar') self.assertEqual(response.data.get('text'), 'foobar')
@ -249,34 +247,31 @@ if guardian:
def test_cannot_update_permissions(self): def test_cannot_update_permissions(self):
request = factory.patch('/1', {'text': 'foobar'}, format='json', request = factory.patch('/1', {'text': 'foobar'}, format='json',
HTTP_AUTHORIZATION=self.credentials['deleteonly']) HTTP_AUTHORIZATION=self.credentials['deleteonly'])
object_permissions_view.cls.action = 'partial_update'
response = object_permissions_view(request, pk='1') response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
# Read # Read
def test_can_read_permissions(self): def test_can_read_permissions(self):
request = factory.get('/1', HTTP_AUTHORIZATION=self.credentials['readonly']) request = factory.get('/1', HTTP_AUTHORIZATION=self.credentials['readonly'])
object_permissions_view.cls.action = 'retrieve'
response = object_permissions_view(request, pk='1') response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.status_code, status.HTTP_200_OK)
def test_cannot_read_permissions(self): def test_cannot_read_permissions(self):
request = factory.get('/1', HTTP_AUTHORIZATION=self.credentials['writeonly']) request = factory.get('/1', HTTP_AUTHORIZATION=self.credentials['writeonly'])
object_permissions_view.cls.action = 'retrieve'
response = object_permissions_view(request, pk='1') response = object_permissions_view(request, pk='1')
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
# Read list # Read list
def test_can_read_list_permissions(self): def test_can_read_list_permissions(self):
request = factory.get('/', HTTP_AUTHORIZATION=self.credentials['readonly']) request = factory.get('/', HTTP_AUTHORIZATION=self.credentials['readonly'])
object_permissions_list_view.cls.action = 'list' object_permissions_list_view.cls.filter_backends = (ObjectPermissionReaderFilter,)
response = object_permissions_list_view(request) response = object_permissions_list_view(request)
self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.data[0].get('id'), 1) self.assertEqual(response.data[0].get('id'), 1)
def test_cannot_read_list_permissions(self): def test_cannot_read_list_permissions(self):
request = factory.get('/', HTTP_AUTHORIZATION=self.credentials['writeonly']) request = factory.get('/', HTTP_AUTHORIZATION=self.credentials['writeonly'])
object_permissions_list_view.cls.action = 'list' object_permissions_list_view.cls.filter_backends = (ObjectPermissionReaderFilter,)
response = object_permissions_list_view(request) response = object_permissions_list_view(request)
self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertListEqual(response.data, []) self.assertListEqual(response.data, [])