Merge pull request #709 from tomchristie/oauth

OAuth support
This commit is contained in:
Tom Christie 2013-03-12 12:27:05 -07:00
commit 40e3fc0eee
11 changed files with 777 additions and 35 deletions

View File

@ -14,6 +14,9 @@ env:
install:
- pip install $DJANGO
- pip install defusedxml==0.3
- "if [[ ${TRAVIS_PYTHON_VERSION::1} != '3' ]]; then pip install oauth2==1.5.211 --use-mirrors; fi"
- "if [[ ${TRAVIS_PYTHON_VERSION::1} != '3' ]]; then pip install django-oauth-plus==2.0 --use-mirrors; fi"
- "if [[ ${TRAVIS_PYTHON_VERSION::1} != '3' ]]; then pip install django-oauth2-provider==0.2.3 --use-mirrors; fi"
- "if [[ ${TRAVIS_PYTHON_VERSION::1} != '3' ]]; then pip install django-filter==0.5.4 --use-mirrors; fi"
- "if [[ ${TRAVIS_PYTHON_VERSION::1} == '3' ]]; then pip install https://github.com/alex/django-filter/tarball/master; fi"
- export PYTHONPATH=.

View File

@ -111,7 +111,7 @@ Unauthenticated responses that are denied permission will result in an `HTTP 401
## TokenAuthentication
This authentication scheme uses a simple token-based HTTP Authentication scheme. Token authentication is appropriate for client-server setups, such as native desktop and mobile clients.
This authentication scheme uses a simple token-based HTTP Authentication scheme. Token authentication is appropriate for client-server setups, such as native desktop and mobile clients.
To use the `TokenAuthentication` scheme, include `rest_framework.authtoken` in your `INSTALLED_APPS` setting:
@ -207,6 +207,97 @@ Unauthenticated responses that are denied permission will result in an `HTTP 403
If you're using an AJAX style API with SessionAuthentication, you'll need to make sure you include a valid CSRF token for any "unsafe" HTTP method calls, such as `PUT`, `PATCH`, `POST` or `DELETE` requests. See the [Django CSRF documentation][csrf-ajax] for more details.
## OAuthAuthentication
This authentication uses [OAuth 1.0a][oauth-1.0a] authentication scheme. OAuth 1.0a provides signature validation which provides a reasonable level of security over plain non-HTTPS connections. However, it may also be considered more complicated than OAuth2, as it requires clients to sign their requests.
This authentication class depends on the optional `django-oauth-plus` and `oauth2` packages. In order to make it work you must install these packages and add `oauth_provider` to your `INSTALLED_APPS`:
INSTALLED_APPS = (
...
`oauth_provider`,
)
Don't forget to run `syncdb` once you've added the package.
python manage.py syncdb
#### Getting started with django-oauth-plus
The OAuthAuthentication class only provides token verification and signature validation for requests. It doesn't provide authorization flow for your clients. You still need to implement your own views for accessing and authorizing tokens.
The `django-oauth-plus` package provides simple foundation for classic 'three-legged' oauth flow. Please refer to [the documentation][django-oauth-plus] for more details.
## OAuth2Authentication
This authentication uses [OAuth 2.0][rfc6749] authentication scheme. OAuth2 is more simple to work with than OAuth1, and provides much better security than simple token authentication. It is an unauthenticated scheme, and requires you to use an HTTPS connection.
This authentication class depends on the optional [django-oauth2-provider][django-oauth2-provider] project. In order to make it work you must install this package and add `provider` and `provider.oauth2` to your `INSTALLED_APPS`:
INSTALLED_APPS = (
...
'provider',
'provider.oauth2',
)
You must also include the following in your root `urls.py` module:
url(r'^oauth2/', include('provider.oauth2.urls', namespace='oauth2')),
Note that the `namespace='oauth2'` argument is required.
Finally, sync your database.
python manage.py syncdb
python manage.py migrate
---
**Note:** If you use `OAuth2Authentication` in production you must ensure that your API is only available over `https` only.
---
#### Getting started with django-oauth2-provider
The `OAuth2Authentication` class only provides token verification for requests. It doesn't provide authorization flow for your clients.
The OAuth 2 authorization flow is taken care by the [django-oauth2-provider][django-oauth2-provider] dependency. A walkthrough is given here, but for more details you should refer to [the documentation][django-oauth2-provider-docs].
To get started:
##### 1. Create a client
You can create a client, either through the shell, or by using the Django admin.
Go to the admin panel and create a new `Provider.Client` entry. It will create the `client_id` and `client_secret` properties for you.
##### 2. Request an access token
To request an access token, submit a `POST` request to the url `/oauth2/access_token` with the following fields:
* `client_id` the client id you've just configured at the previous step.
* `client_secret` again configured at the previous step.
* `username` the username with which you want to log in.
* `password` well, that speaks for itself.
You can use the command line to test that your local configuration is working:
curl -X POST -d "client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&grant_type=password&username=YOUR_USERNAME&password=YOUR_PASSWORD" http://localhost:8000/oauth2/access_token/
You should get a response that looks something like this:
{"access_token": "<your-access-token>", "scope": "read", "expires_in": 86399, "refresh_token": "<your-refresh-token>"}
##### 3. Access the API
The only thing needed to make the `OAuth2Authentication` class work is to insert the `access_token` you've received in the `Authorization` request header.
The command line to test the authentication looks like:
curl -H "Authorization: Bearer <your-access-token>" http://localhost:8000/api/?client_id=YOUR_CLIENT_ID\&client_secret=YOUR_CLIENT_SECRET
---
# Custom authentication
To implement a custom authentication scheme, subclass `BaseAuthentication` and override the `.authenticate(self, request)` method. The method should return a two-tuple of `(user, auth)` if authentication succeeds, or `None` otherwise.
@ -262,3 +353,8 @@ HTTP digest authentication is a widely implemented scheme that was intended to r
[south-dependencies]: http://south.readthedocs.org/en/latest/dependencies.html
[juanriaza]: https://github.com/juanriaza
[djangorestframework-digestauth]: https://github.com/juanriaza/django-rest-framework-digestauth
[oauth-1.0a]: http://oauth.net/core/1.0a
[django-oauth-plus]: http://code.larlet.fr/django-oauth-plus
[django-oauth2-provider]: https://github.com/caffeinehit/django-oauth2-provider
[django-oauth2-provider-docs]: https://django-oauth2-provider.readthedocs.org/en/latest/
[rfc6749]: http://tools.ietf.org/html/rfc6749

View File

@ -36,6 +36,10 @@ The following packages are optional:
* [PyYAML][yaml] (3.10+) - YAML content-type support.
* [defusedxml][defusedxml] (0.3+) - XML content-type support.
* [django-filter][django-filter] (0.5.4+) - Filtering support.
* [django-oauth-plus][django-oauth-plus] (2.0+) and [oauth2][oauth2] (1.5.211+) - OAuth 1.0a support.
* [django-oauth2-provider][django-oauth2-provider] (0.2.3+) - OAuth 2.0 support.
**Note**: The `oauth2` python package is badly misnamed, and actually provides OAuth 1.0a support. Also note that packages required for both OAuth 1.0a, and OAuth 2.0 are not yet Python 3 compatible.
## Installation
@ -180,6 +184,9 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
[yaml]: http://pypi.python.org/pypi/PyYAML
[defusedxml]: https://pypi.python.org/pypi/defusedxml
[django-filter]: http://pypi.python.org/pypi/django-filter
[oauth2]: https://github.com/simplegeo/python-oauth2
[django-oauth-plus]: https://bitbucket.org/david/django-oauth-plus/wiki/Home
[django-oauth2-provider]: https://github.com/caffeinehit/django-oauth2-provider
[0.4]: https://github.com/tomchristie/django-rest-framework/tree/0.4.X
[image]: img/quickstart.png
[sandbox]: http://restframework.herokuapp.com/

View File

@ -110,6 +110,7 @@ The following people have helped make REST framework great.
* Jonas Braun - [iekadou]
* Ian Dash - [bitmonkey]
* Bouke Haarsma - [bouke]
* Pierre Dulac - [dulaccc]
Many thanks to everyone who's contributed to the project.
@ -254,4 +255,4 @@ You can also contact [@_tomchristie][twitter] directly on twitter.
[iekadou]: https://github.com/iekadou
[bitmonkey]: https://github.com/bitmonkey
[bouke]: https://github.com/bouke
[dulaccc]: https://github.com/dulaccc

View File

@ -2,3 +2,6 @@ markdown>=2.1.0
PyYAML>=3.10
defusedxml>=0.3
django-filter>=0.5.4
django-oauth-plus>=2.0
oauth2>=1.5.211
django-oauth2-provider>=0.2.3

View File

@ -3,13 +3,28 @@ Provides a set of pluggable authentication policies.
"""
from __future__ import unicode_literals
from django.contrib.auth import authenticate
from django.utils.encoding import DjangoUnicodeDecodeError
from django.core.exceptions import ImproperlyConfigured
from rest_framework import exceptions, HTTP_HEADER_ENCODING
from rest_framework.compat import CsrfViewMiddleware
from rest_framework.compat import oauth, oauth_provider, oauth_provider_store
from rest_framework.compat import oauth2_provider, oauth2_provider_forms, oauth2_provider_backends
from rest_framework.authtoken.models import Token
import base64
def get_authorization_header(request):
"""
Return request's 'Authorization:' header, as a bytestring.
Hide some test client ickyness where the header can be unicode.
"""
auth = request.META.get('HTTP_AUTHORIZATION', b'')
if type(auth) == type(''):
# Work around django test client oddness
auth = auth.encode(HTTP_HEADER_ENCODING)
return auth
class BaseAuthentication(object):
"""
All authentication classes should extend BaseAuthentication.
@ -41,28 +56,25 @@ class BasicAuthentication(BaseAuthentication):
Returns a `User` if a correct username and password have been supplied
using HTTP Basic authentication. Otherwise returns `None`.
"""
auth = request.META.get('HTTP_AUTHORIZATION', b'')
if type(auth) == type(''):
# Work around django test client oddness
auth = auth.encode(HTTP_HEADER_ENCODING)
auth = auth.split()
auth = get_authorization_header(request).split()
if not auth or auth[0].lower() != b'basic':
return None
if len(auth) != 2:
raise exceptions.AuthenticationFailed('Invalid basic header')
if len(auth) == 1:
msg = 'Invalid basic header. No credentials provided.'
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = 'Invalid basic header. Credentials string should not contain spaces.'
raise exceptions.AuthenticationFailed(msg)
try:
auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
except (TypeError, UnicodeDecodeError):
raise exceptions.AuthenticationFailed('Invalid basic header')
try:
userid, password = auth_parts[0], auth_parts[2]
except DjangoUnicodeDecodeError:
raise exceptions.AuthenticationFailed('Invalid basic header')
msg = 'Invalid basic header. Credentials not correctly base64 encoded'
raise exceptions.AuthenticationFailed(msg)
userid, password = auth_parts[0], auth_parts[2]
return self.authenticate_credentials(userid, password)
def authenticate_credentials(self, userid, password):
@ -70,9 +82,9 @@ class BasicAuthentication(BaseAuthentication):
Authenticate the userid and password against username and password.
"""
user = authenticate(username=userid, password=password)
if user is not None and user.is_active:
return (user, None)
raise exceptions.AuthenticationFailed('Invalid username/password')
if user is None or not user.is_active:
raise exceptions.AuthenticationFailed('Invalid username/password')
return (user, None)
def authenticate_header(self, request):
return 'Basic realm="%s"' % self.www_authenticate_realm
@ -131,13 +143,17 @@ class TokenAuthentication(BaseAuthentication):
"""
def authenticate(self, request):
auth = request.META.get('HTTP_AUTHORIZATION', '').split()
auth = get_authorization_header(request).split()
if not auth or auth[0].lower() != "token":
if not auth or auth[0].lower() != b'token':
return None
if len(auth) != 2:
raise exceptions.AuthenticationFailed('Invalid token header')
if len(auth) == 1:
msg = 'Invalid token header. No credentials provided.'
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = 'Invalid token header. Token string should not contain spaces.'
raise exceptions.AuthenticationFailed(msg)
return self.authenticate_credentials(auth[1])
@ -147,12 +163,179 @@ class TokenAuthentication(BaseAuthentication):
except self.model.DoesNotExist:
raise exceptions.AuthenticationFailed('Invalid token')
if token.user.is_active:
return (token.user, token)
raise exceptions.AuthenticationFailed('User inactive or deleted')
if not token.user.is_active:
raise exceptions.AuthenticationFailed('User inactive or deleted')
return (token.user, token)
def authenticate_header(self, request):
return 'Token'
# TODO: OAuthAuthentication
class OAuthAuthentication(BaseAuthentication):
"""
OAuth 1.0a authentication backend using `django-oauth-plus` and `oauth2`.
Note: The `oauth2` package actually provides oauth1.0a support. Urg.
We import it from the `compat` module as `oauth`.
"""
www_authenticate_realm = 'api'
def __init__(self, *args, **kwargs):
super(OAuthAuthentication, self).__init__(*args, **kwargs)
if oauth is None:
raise ImproperlyConfigured(
"The 'oauth2' package could not be imported."
"It is required for use with the 'OAuthAuthentication' class.")
if oauth_provider is None:
raise ImproperlyConfigured(
"The 'django-oauth-plus' package could not be imported."
"It is required for use with the 'OAuthAuthentication' class.")
def authenticate(self, request):
"""
Returns two-tuple of (user, token) if authentication succeeds,
or None otherwise.
"""
try:
oauth_request = oauth_provider.utils.get_oauth_request(request)
except oauth.Error as err:
raise exceptions.AuthenticationFailed(err.message)
oauth_params = oauth_provider.consts.OAUTH_PARAMETERS_NAMES
found = any(param for param in oauth_params if param in oauth_request)
missing = list(param for param in oauth_params if param not in oauth_request)
if not found:
# OAuth authentication was not attempted.
return None
if missing:
# OAuth was attempted but missing parameters.
msg = 'Missing parameters: %s' % (', '.join(missing))
raise exceptions.AuthenticationFailed(msg)
if not self.check_nonce(request, oauth_request):
msg = 'Nonce check failed'
raise exceptions.AuthenticationFailed(msg)
try:
consumer_key = oauth_request.get_parameter('oauth_consumer_key')
consumer = oauth_provider_store.get_consumer(request, oauth_request, consumer_key)
except oauth_provider_store.InvalidConsumerError as err:
raise exceptions.AuthenticationFailed(err)
if consumer.status != oauth_provider.consts.ACCEPTED:
msg = 'Invalid consumer key status: %s' % consumer.get_status_display()
raise exceptions.AuthenticationFailed(msg)
try:
token_param = oauth_request.get_parameter('oauth_token')
token = oauth_provider_store.get_access_token(request, oauth_request, consumer, token_param)
except oauth_provider_store.InvalidTokenError:
msg = 'Invalid access token: %s' % oauth_request.get_parameter('oauth_token')
raise exceptions.AuthenticationFailed(msg)
try:
self.validate_token(request, consumer, token)
except oauth.Error as err:
raise exceptions.AuthenticationFailed(err.message)
user = token.user
if not user.is_active:
msg = 'User inactive or deleted: %s' % user.username
raise exceptions.AuthenticationFailed(msg)
return (token.user, token)
def authenticate_header(self, request):
"""
If permission is denied, return a '401 Unauthorized' response,
with an appropraite 'WWW-Authenticate' header.
"""
return 'OAuth realm="%s"' % self.www_authenticate_realm
def validate_token(self, request, consumer, token):
"""
Check the token and raise an `oauth.Error` exception if invalid.
"""
oauth_server, oauth_request = oauth_provider.utils.initialize_server_request(request)
oauth_server.verify_request(oauth_request, consumer, token)
def check_nonce(self, request, oauth_request):
"""
Checks nonce of request, and return True if valid.
"""
return oauth_provider_store.check_nonce(request, oauth_request, oauth_request['oauth_nonce'])
class OAuth2Authentication(BaseAuthentication):
"""
OAuth 2 authentication backend using `django-oauth2-provider`
"""
www_authenticate_realm = 'api'
def __init__(self, *args, **kwargs):
super(OAuth2Authentication, self).__init__(*args, **kwargs)
if oauth2_provider is None:
raise ImproperlyConfigured(
"The 'django-oauth2-provider' package could not be imported. "
"It is required for use with the 'OAuth2Authentication' class.")
def authenticate(self, request):
"""
Returns two-tuple of (user, token) if authentication succeeds,
or None otherwise.
"""
auth = get_authorization_header(request).split()
if not auth or auth[0].lower() != b'bearer':
return None
if len(auth) == 1:
msg = 'Invalid bearer header. No credentials provided.'
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = 'Invalid bearer header. Token string should not contain spaces.'
raise exceptions.AuthenticationFailed(msg)
return self.authenticate_credentials(request, auth[1])
def authenticate_credentials(self, request, access_token):
"""
Authenticate the request, given the access token.
"""
# Authenticate the client
oauth2_client_form = oauth2_provider_forms.ClientAuthForm(request.REQUEST)
if not oauth2_client_form.is_valid():
raise exceptions.AuthenticationFailed('Client could not be validated')
client = oauth2_client_form.cleaned_data.get('client')
# Retrieve the `OAuth2AccessToken` instance from the access_token
auth_backend = oauth2_provider_backends.AccessTokenBackend()
token = auth_backend.authenticate(access_token, client)
if token is None:
raise exceptions.AuthenticationFailed('Invalid token')
user = token.user
if not user.is_active:
msg = 'User inactive or deleted: %s' % user.username
raise exceptions.AuthenticationFailed(msg)
return (token.user, token)
def authenticate_header(self, request):
"""
Bearer is the only finalized type currently
Check details on the `OAuth2Authentication.authenticate` method
"""
return 'Bearer realm="%s"' % self.www_authenticate_realm

View File

@ -426,3 +426,34 @@ try:
import defusedxml.ElementTree as etree
except ImportError:
etree = None
# OAuth is optional
try:
# Note: The `oauth2` package actually provides oauth1.0a support. Urg.
import oauth2 as oauth
except ImportError:
oauth = None
# OAuth is optional
try:
import oauth_provider
from oauth_provider.store import store as oauth_provider_store
except ImportError:
oauth_provider = None
oauth_provider_store = None
# OAuth 2 support is optional
try:
import provider.oauth2 as oauth2_provider
from provider.oauth2 import backends as oauth2_provider_backends
from provider.oauth2 import models as oauth2_provider_models
from provider.oauth2 import forms as oauth2_provider_forms
from provider import scope as oauth2_provider_scope
from provider import constants as oauth2_constants
except ImportError:
oauth2_provider = None
oauth2_provider_backends = None
oauth2_provider_models = None
oauth2_provider_forms = None
oauth2_provider_scope = None
oauth2_constants = None

View File

@ -7,6 +7,8 @@ import warnings
SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS']
from rest_framework.compat import oauth2_provider_scope, oauth2_constants
class BasePermission(object):
"""
@ -132,3 +134,26 @@ class DjangoModelPermissions(BasePermission):
request.user.has_perms(perms)):
return True
return False
class TokenHasReadWriteScope(BasePermission):
"""
The request is authenticated as a user and the token used has the right scope
"""
def has_permission(self, request, view):
token = request.auth
read_only = request.method in SAFE_METHODS
if not token:
return False
if hasattr(token, 'resource'): # OAuth 1
return read_only or not request.auth.resource.is_readonly
elif hasattr(token, 'scope'): # OAuth 2
required = oauth2_constants.READ if read_only else oauth2_constants.WRITE
return oauth2_provider_scope.check(required, request.auth.scope)
else:
assert False, ('TokenHasReadWriteScope requires either the'
'`OAuthAuthentication` or `OAuth2Authentication` authentication '
'class to be used.')

View File

@ -97,9 +97,30 @@ INSTALLED_APPS = (
# 'django.contrib.admindocs',
'rest_framework',
'rest_framework.authtoken',
'rest_framework.tests'
'rest_framework.tests',
)
# OAuth is optional and won't work if there is no oauth_provider & oauth2
try:
import oauth_provider
import oauth2
except ImportError:
pass
else:
INSTALLED_APPS += (
'oauth_provider',
)
try:
import provider
except ImportError:
pass
else:
INSTALLED_APPS += (
'provider',
'provider.oauth2',
)
STATIC_URL = '/static/'
PASSWORD_HASHERS = (

View File

@ -2,23 +2,29 @@ from __future__ import unicode_literals
from django.contrib.auth.models import User
from django.http import HttpResponse
from django.test import Client, TestCase
from django.utils import unittest
from rest_framework import HTTP_HEADER_ENCODING
from rest_framework import exceptions
from rest_framework import permissions
from rest_framework import status
from rest_framework.authtoken.models import Token
from rest_framework.authentication import (
BaseAuthentication,
TokenAuthentication,
BasicAuthentication,
SessionAuthentication
SessionAuthentication,
OAuthAuthentication,
OAuth2Authentication
)
from rest_framework.compat import patterns
from rest_framework.authtoken.models import Token
from rest_framework.compat import patterns, url, include
from rest_framework.compat import oauth2_provider, oauth2_provider_models, oauth2_provider_scope
from rest_framework.compat import oauth, oauth_provider
from rest_framework.tests.utils import RequestFactory
from rest_framework.views import APIView
import json
import base64
import time
import datetime
factory = RequestFactory()
@ -41,8 +47,19 @@ urlpatterns = patterns('',
(r'^basic/$', MockView.as_view(authentication_classes=[BasicAuthentication])),
(r'^token/$', MockView.as_view(authentication_classes=[TokenAuthentication])),
(r'^auth-token/$', 'rest_framework.authtoken.views.obtain_auth_token'),
(r'^oauth/$', MockView.as_view(authentication_classes=[OAuthAuthentication])),
(r'^oauth-with-scope/$', MockView.as_view(authentication_classes=[OAuthAuthentication],
permission_classes=[permissions.TokenHasReadWriteScope]))
)
if oauth2_provider is not None:
urlpatterns += patterns('',
url(r'^oauth2/', include('provider.oauth2.urls', namespace='oauth2')),
url(r'^oauth2-test/$', MockView.as_view(authentication_classes=[OAuth2Authentication])),
url(r'^oauth2-with-scope-test/$', MockView.as_view(authentication_classes=[OAuth2Authentication],
permission_classes=[permissions.TokenHasReadWriteScope])),
)
class BasicAuthTests(TestCase):
"""Basic authentication"""
@ -146,7 +163,7 @@ class TokenAuthTests(TestCase):
def test_post_form_passing_token_auth(self):
"""Ensure POSTing json over token auth with correct credentials passes and does not require CSRF"""
auth = "Token " + self.key
auth = 'Token ' + self.key
response = self.csrf_client.post('/token/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, status.HTTP_200_OK)
@ -222,3 +239,339 @@ class IncorrectCredentialsTests(TestCase):
response = view(request)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertEqual(response.data, {'detail': 'Bad credentials'})
class OAuthTests(TestCase):
"""OAuth 1.0a authentication"""
urls = 'rest_framework.tests.authentication'
def setUp(self):
# these imports are here because oauth is optional and hiding them in try..except block or compat
# could obscure problems if something breaks
from oauth_provider.models import Consumer, Resource
from oauth_provider.models import Token as OAuthToken
from oauth_provider import consts
self.consts = consts
self.csrf_client = Client(enforce_csrf_checks=True)
self.username = 'john'
self.email = 'lennon@thebeatles.com'
self.password = 'password'
self.user = User.objects.create_user(self.username, self.email, self.password)
self.CONSUMER_KEY = 'consumer_key'
self.CONSUMER_SECRET = 'consumer_secret'
self.TOKEN_KEY = "token_key"
self.TOKEN_SECRET = "token_secret"
self.consumer = Consumer.objects.create(key=self.CONSUMER_KEY, secret=self.CONSUMER_SECRET,
name='example', user=self.user, status=self.consts.ACCEPTED)
self.resource = Resource.objects.create(name="resource name", url="api/")
self.token = OAuthToken.objects.create(user=self.user, consumer=self.consumer, resource=self.resource,
token_type=OAuthToken.ACCESS, key=self.TOKEN_KEY, secret=self.TOKEN_SECRET, is_approved=True
)
def _create_authorization_header(self):
params = {
'oauth_version': "1.0",
'oauth_nonce': oauth.generate_nonce(),
'oauth_timestamp': int(time.time()),
'oauth_token': self.token.key,
'oauth_consumer_key': self.consumer.key
}
req = oauth.Request(method="GET", url="http://example.com", parameters=params)
signature_method = oauth.SignatureMethod_PLAINTEXT()
req.sign_request(signature_method, self.consumer, self.token)
return req.to_header()["Authorization"]
def _create_authorization_url_parameters(self):
params = {
'oauth_version': "1.0",
'oauth_nonce': oauth.generate_nonce(),
'oauth_timestamp': int(time.time()),
'oauth_token': self.token.key,
'oauth_consumer_key': self.consumer.key
}
req = oauth.Request(method="GET", url="http://example.com", parameters=params)
signature_method = oauth.SignatureMethod_PLAINTEXT()
req.sign_request(signature_method, self.consumer, self.token)
return dict(req)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_passing_oauth(self):
"""Ensure POSTing form over OAuth with correct credentials passes and does not require CSRF"""
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_repeated_nonce_failing_oauth(self):
"""Ensure POSTing form over OAuth with repeated auth (same nonces and timestamp) credentials fails"""
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
# simulate reply attack auth header containes already used (nonce, timestamp) pair
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_token_removed_failing_oauth(self):
"""Ensure POSTing when there is no OAuth access token in db fails"""
self.token.delete()
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_consumer_status_not_accepted_failing_oauth(self):
"""Ensure POSTing when consumer status is anything other than ACCEPTED fails"""
for consumer_status in (self.consts.CANCELED, self.consts.PENDING, self.consts.REJECTED):
self.consumer.status = consumer_status
self.consumer.save()
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_request_token_failing_oauth(self):
"""Ensure POSTing with unauthorized request token instead of access token fails"""
self.token.token_type = self.token.REQUEST
self.token.save()
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_urlencoded_parameters(self):
"""Ensure POSTing with x-www-form-urlencoded auth parameters passes"""
params = self._create_authorization_url_parameters()
response = self.csrf_client.post('/oauth/', params)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_get_form_with_url_parameters(self):
"""Ensure GETing with auth in url parameters passes"""
params = self._create_authorization_url_parameters()
response = self.csrf_client.get('/oauth/', params)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_hmac_sha1_signature_passes(self):
"""Ensure POSTing using HMAC_SHA1 signature method passes"""
params = {
'oauth_version': "1.0",
'oauth_nonce': oauth.generate_nonce(),
'oauth_timestamp': int(time.time()),
'oauth_token': self.token.key,
'oauth_consumer_key': self.consumer.key
}
req = oauth.Request(method="POST", url="http://testserver/oauth/", parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
req.sign_request(signature_method, self.consumer, self.token)
auth = req.to_header()["Authorization"]
response = self.csrf_client.post('/oauth/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_get_form_with_readonly_resource_passing_auth(self):
"""Ensure POSTing with a readonly resource instead of a write scope fails"""
read_only_access_token = self.token
read_only_access_token.resource.is_readonly = True
read_only_access_token.resource.save()
params = self._create_authorization_url_parameters()
response = self.csrf_client.get('/oauth-with-scope/', params)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_readonly_resource_failing_auth(self):
"""Ensure POSTing with a readonly resource instead of a write scope fails"""
read_only_access_token = self.token
read_only_access_token.resource.is_readonly = True
read_only_access_token.resource.save()
params = self._create_authorization_url_parameters()
response = self.csrf_client.post('/oauth-with-scope/', params)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_write_resource_passing_auth(self):
"""Ensure POSTing with a write resource succeed"""
read_write_access_token = self.token
read_write_access_token.resource.is_readonly = False
read_write_access_token.resource.save()
params = self._create_authorization_url_parameters()
response = self.csrf_client.post('/oauth-with-scope/', params)
self.assertEqual(response.status_code, 200)
class OAuth2Tests(TestCase):
"""OAuth 2.0 authentication"""
urls = 'rest_framework.tests.authentication'
def setUp(self):
self.csrf_client = Client(enforce_csrf_checks=True)
self.username = 'john'
self.email = 'lennon@thebeatles.com'
self.password = 'password'
self.user = User.objects.create_user(self.username, self.email, self.password)
self.CLIENT_ID = 'client_key'
self.CLIENT_SECRET = 'client_secret'
self.ACCESS_TOKEN = "access_token"
self.REFRESH_TOKEN = "refresh_token"
self.oauth2_client = oauth2_provider_models.Client.objects.create(
client_id=self.CLIENT_ID,
client_secret=self.CLIENT_SECRET,
redirect_uri='',
client_type=0,
name='example',
user=None,
)
self.access_token = oauth2_provider_models.AccessToken.objects.create(
token=self.ACCESS_TOKEN,
client=self.oauth2_client,
user=self.user,
)
self.refresh_token = oauth2_provider_models.RefreshToken.objects.create(
user=self.user,
access_token=self.access_token,
client=self.oauth2_client
)
def _create_authorization_header(self, token=None):
return "Bearer {0}".format(token or self.access_token.token)
def _client_credentials_params(self):
return {'client_id': self.CLIENT_ID, 'client_secret': self.CLIENT_SECRET}
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_with_wrong_authorization_header_token_type_failing(self):
"""Ensure that a wrong token type lead to the correct HTTP error status code"""
auth = "Wrong token-type-obsviously"
response = self.csrf_client.get('/oauth2-test/', {}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
params = self._client_credentials_params()
response = self.csrf_client.get('/oauth2-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_with_wrong_authorization_header_token_format_failing(self):
"""Ensure that a wrong token format lead to the correct HTTP error status code"""
auth = "Bearer wrong token format"
response = self.csrf_client.get('/oauth2-test/', {}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
params = self._client_credentials_params()
response = self.csrf_client.get('/oauth2-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_with_wrong_authorization_header_token_failing(self):
"""Ensure that a wrong token lead to the correct HTTP error status code"""
auth = "Bearer wrong-token"
response = self.csrf_client.get('/oauth2-test/', {}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
params = self._client_credentials_params()
response = self.csrf_client.get('/oauth2-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_with_wrong_client_data_failing_auth(self):
"""Ensure GETing form over OAuth with incorrect client credentials fails"""
auth = self._create_authorization_header()
params = self._client_credentials_params()
params['client_id'] += 'a'
response = self.csrf_client.get('/oauth2-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_passing_auth(self):
"""Ensure GETing form over OAuth with correct client credentials succeed"""
auth = self._create_authorization_header()
params = self._client_credentials_params()
response = self.csrf_client.get('/oauth2-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_passing_auth(self):
"""Ensure POSTing form over OAuth with correct credentials passes and does not require CSRF"""
auth = self._create_authorization_header()
params = self._client_credentials_params()
response = self.csrf_client.post('/oauth2-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_token_removed_failing_auth(self):
"""Ensure POSTing when there is no OAuth access token in db fails"""
self.access_token.delete()
auth = self._create_authorization_header()
params = self._client_credentials_params()
response = self.csrf_client.post('/oauth2-test/', params, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_refresh_token_failing_auth(self):
"""Ensure POSTing with refresh token instead of access token fails"""
auth = self._create_authorization_header(token=self.refresh_token.token)
params = self._client_credentials_params()
response = self.csrf_client.post('/oauth2-test/', params, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_expired_access_token_failing_auth(self):
"""Ensure POSTing with expired access token fails with an 'Invalid token' error"""
self.access_token.expires = datetime.datetime.now() - datetime.timedelta(seconds=10) # 10 seconds late
self.access_token.save()
auth = self._create_authorization_header()
params = self._client_credentials_params()
response = self.csrf_client.post('/oauth2-test/', params, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
self.assertIn('Invalid token', response.content)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_invalid_scope_failing_auth(self):
"""Ensure POSTing with a readonly scope instead of a write scope fails"""
read_only_access_token = self.access_token
read_only_access_token.scope = oauth2_provider_scope.SCOPE_NAME_DICT['read']
read_only_access_token.save()
auth = self._create_authorization_header(token=read_only_access_token.token)
params = self._client_credentials_params()
response = self.csrf_client.get('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
response = self.csrf_client.post('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_valid_scope_passing_auth(self):
"""Ensure POSTing with a write scope succeed"""
read_write_access_token = self.access_token
read_write_access_token.scope = oauth2_provider_scope.SCOPE_NAME_DICT['write']
read_write_access_token.save()
auth = self._create_authorization_header(token=read_write_access_token.token)
params = self._client_credentials_params()
response = self.csrf_client.post('/oauth2-with-scope-test/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)

23
tox.ini
View File

@ -8,46 +8,65 @@ commands = {envpython} rest_framework/runtests/runtests.py
[testenv:py3.3-django1.5]
basepython = python3.3
deps = django==1.5
https://github.com/alex/django-filter/archive/master.tar.gz
-egit+git://github.com/alex/django-filter.git#egg=django_filter
defusedxml==0.3
[testenv:py3.2-django1.5]
basepython = python3.2
deps = django==1.5
https://github.com/alex/django-filter/archive/master.tar.gz
-egit+git://github.com/alex/django-filter.git#egg=django_filter
defusedxml==0.3
[testenv:py2.7-django1.5]
basepython = python2.7
deps = django==1.5
django-filter==0.5.4
defusedxml==0.3
django-oauth-plus==2.0
oauth2==1.5.211
django-oauth2-provider==0.2.3
[testenv:py2.6-django1.5]
basepython = python2.6
deps = django==1.5
django-filter==0.5.4
defusedxml==0.3
django-oauth-plus==2.0
oauth2==1.5.211
django-oauth2-provider==0.2.3
[testenv:py2.7-django1.4]
basepython = python2.7
deps = django==1.4.3
django-filter==0.5.4
defusedxml==0.3
django-oauth-plus==2.0
oauth2==1.5.211
django-oauth2-provider==0.2.3
[testenv:py2.6-django1.4]
basepython = python2.6
deps = django==1.4.3
django-filter==0.5.4
defusedxml==0.3
django-oauth-plus==2.0
oauth2==1.5.211
django-oauth2-provider==0.2.3
[testenv:py2.7-django1.3]
basepython = python2.7
deps = django==1.3.5
django-filter==0.5.4
defusedxml==0.3
django-oauth-plus==2.0
oauth2==1.5.211
django-oauth2-provider==0.2.3
[testenv:py2.6-django1.3]
basepython = python2.6
deps = django==1.3.5
django-filter==0.5.4
defusedxml==0.3
django-oauth-plus==2.0
oauth2==1.5.211
django-oauth2-provider==0.2.3