Merge pull request #721 from dulaccc/token-scope-permission

Token scope permission class
This commit is contained in:
Tom Christie 2013-03-12 11:47:35 -07:00
commit 12ac357559
3 changed files with 97 additions and 2 deletions

View File

@ -453,9 +453,11 @@ try:
from provider.oauth2 import backends as oauth2_provider_backends from provider.oauth2 import backends as oauth2_provider_backends
from provider.oauth2 import models as oauth2_provider_models from provider.oauth2 import models as oauth2_provider_models
from provider.oauth2 import forms as oauth2_provider_forms from provider.oauth2 import forms as oauth2_provider_forms
from provider import scope as oauth2_provider_scope
except ImportError: except ImportError:
oauth2_provider = None oauth2_provider = None
oauth2_provider_backends = None oauth2_provider_backends = None
oauth2_provider_models = None oauth2_provider_models = None
oauth2_provider_forms = None oauth2_provider_forms = None
oauth2_provider_scope = None

View File

@ -7,6 +7,8 @@ import warnings
SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS'] SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS']
from rest_framework.compat import oauth2_provider_scope
class BasePermission(object): class BasePermission(object):
""" """
@ -125,3 +127,33 @@ class DjangoModelPermissions(BasePermission):
request.user.has_perms(perms)): request.user.has_perms(perms)):
return True return True
return False return False
class TokenHasReadWriteScope(BasePermission):
"""
The request is authenticated as a user and the token used has the right scope
"""
def has_permission(self, request, view):
if not request.auth:
return False
read_only = request.method in SAFE_METHODS
if hasattr(request.auth, 'resource'): # oauth 1
if read_only:
return True
elif request.auth.resource.is_readonly is False:
return True
return False
elif hasattr(request.auth, 'scope'): # oauth 2
scope_valid = lambda scope_wanted_key, scope_had: oauth2_provider_scope.check(
oauth2_provider_scope.SCOPE_NAME_DICT[scope_wanted_key], scope_had)
if read_only and scope_valid('read', request.auth.scope):
return True
elif scope_valid('write', request.auth.scope):
return True
return False
else:
# Improperly configured!
pass

View File

@ -17,7 +17,7 @@ from rest_framework.authentication import (
) )
from rest_framework.authtoken.models import Token from rest_framework.authtoken.models import Token
from rest_framework.compat import patterns, url, include from rest_framework.compat import patterns, url, include
from rest_framework.compat import oauth2_provider, oauth2_provider_models from rest_framework.compat import oauth2_provider, oauth2_provider_models, oauth2_provider_scope
from rest_framework.compat import oauth, oauth_provider from rest_framework.compat import oauth, oauth_provider
from rest_framework.tests.utils import RequestFactory from rest_framework.tests.utils import RequestFactory
from rest_framework.views import APIView from rest_framework.views import APIView
@ -47,13 +47,17 @@ urlpatterns = patterns('',
(r'^basic/$', MockView.as_view(authentication_classes=[BasicAuthentication])), (r'^basic/$', MockView.as_view(authentication_classes=[BasicAuthentication])),
(r'^token/$', MockView.as_view(authentication_classes=[TokenAuthentication])), (r'^token/$', MockView.as_view(authentication_classes=[TokenAuthentication])),
(r'^auth-token/$', 'rest_framework.authtoken.views.obtain_auth_token'), (r'^auth-token/$', 'rest_framework.authtoken.views.obtain_auth_token'),
(r'^oauth/$', MockView.as_view(authentication_classes=[OAuthAuthentication])) (r'^oauth/$', MockView.as_view(authentication_classes=[OAuthAuthentication])),
(r'^oauth-with-scope/$', MockView.as_view(authentication_classes=[OAuthAuthentication],
permission_classes=[permissions.TokenHasReadWriteScope]))
) )
if oauth2_provider is not None: if oauth2_provider is not None:
urlpatterns += patterns('', urlpatterns += patterns('',
url(r'^oauth2/', include('provider.oauth2.urls', namespace='oauth2')), url(r'^oauth2/', include('provider.oauth2.urls', namespace='oauth2')),
url(r'^oauth2-test/$', MockView.as_view(authentication_classes=[OAuth2Authentication])), url(r'^oauth2-test/$', MockView.as_view(authentication_classes=[OAuth2Authentication])),
url(r'^oauth2-with-scope-test/$', MockView.as_view(authentication_classes=[OAuth2Authentication],
permission_classes=[permissions.TokenHasReadWriteScope])),
) )
@ -389,6 +393,39 @@ class OAuthTests(TestCase):
response = self.csrf_client.post('/oauth/', HTTP_AUTHORIZATION=auth) response = self.csrf_client.post('/oauth/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_get_form_with_readonly_resource_passing_auth(self):
"""Ensure POSTing with a readonly resource instead of a write scope fails"""
read_only_access_token = self.token
read_only_access_token.resource.is_readonly = True
read_only_access_token.resource.save()
params = self._create_authorization_url_parameters()
response = self.csrf_client.get('/oauth-with-scope/', params)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_readonly_resource_failing_auth(self):
"""Ensure POSTing with a readonly resource instead of a write scope fails"""
read_only_access_token = self.token
read_only_access_token.resource.is_readonly = True
read_only_access_token.resource.save()
params = self._create_authorization_url_parameters()
response = self.csrf_client.post('/oauth-with-scope/', params)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_write_resource_passing_auth(self):
"""Ensure POSTing with a write resource succeed"""
read_write_access_token = self.token
read_write_access_token.resource.is_readonly = False
read_write_access_token.resource.save()
params = self._create_authorization_url_parameters()
response = self.csrf_client.post('/oauth-with-scope/', params)
self.assertEqual(response.status_code, 200)
class OAuth2Tests(TestCase): class OAuth2Tests(TestCase):
"""OAuth 2.0 authentication""" """OAuth 2.0 authentication"""
@ -514,3 +551,27 @@ class OAuth2Tests(TestCase):
response = self.csrf_client.post('/oauth2-test/', params, HTTP_AUTHORIZATION=auth) response = self.csrf_client.post('/oauth2-test/', params, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN)) self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
self.assertIn('Invalid token', response.content) self.assertIn('Invalid token', response.content)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_invalid_scope_failing_auth(self):
"""Ensure POSTing with a readonly scope instead of a write scope fails"""
read_only_access_token = self.access_token
read_only_access_token.scope = oauth2_provider_scope.SCOPE_NAME_DICT['read']
read_only_access_token.save()
auth = self._create_authorization_header(token=read_only_access_token.token)
params = self._client_credentials_params()
response = self.csrf_client.get('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
response = self.csrf_client.post('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_valid_scope_passing_auth(self):
"""Ensure POSTing with a write scope succeed"""
read_write_access_token = self.access_token
read_write_access_token.scope = oauth2_provider_scope.SCOPE_NAME_DICT['write']
read_write_access_token.save()
auth = self._create_authorization_header(token=read_write_access_token.token)
params = self._client_credentials_params()
response = self.csrf_client.post('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)