Moved OAuth support out of DRF and into a separate package, per #1767

This commit is contained in:
Jharrod LaFon 2014-09-05 15:30:01 -07:00
parent f4e02446f9
commit baa518cd89
12 changed files with 6 additions and 862 deletions

View File

@ -20,9 +20,6 @@ install:
- pip install django-guardian==1.2.3
- pip install pytest-django==2.6.1
- pip install flake8==2.2.2
- "if [[ ${TRAVIS_PYTHON_VERSION::1} != '3' ]]; then pip install oauth2==1.5.211; fi"
- "if [[ ${TRAVIS_PYTHON_VERSION::1} != '3' ]]; then pip install django-oauth-plus==2.2.4; fi"
- "if [[ ${TRAVIS_PYTHON_VERSION::1} != '3' ]]; then pip install django-oauth2-provider==0.2.4; fi"
- "if [[ ${DJANGO::11} == 'django==1.3' ]]; then pip install django-filter==0.5.4; fi"
- "if [[ ${DJANGO::11} != 'django==1.3' ]]; then pip install django-filter==0.7; fi"
- "if [[ ${DJANGO} == 'django==1.7' ]]; then pip install -e git+https://github.com/linovia/django-guardian.git@feature/django_1_7#egg=django-guardian-1.2.0; fi"

View File

@ -13,7 +13,7 @@ Django REST framework is a powerful and flexible toolkit for building Web APIs.
Some reasons you might want to use REST framework:
* The [Web browseable API][sandbox] is a huge useability win for your developers.
* [Authentication policies][authentication] including [OAuth1a][oauth1-section] and [OAuth2][oauth2-section] out of the box.
* [Authentication policies][authentication] including [OAuth1a][oauth1-section] and [OAuth2][oauth2-section] through the rest-framework-oauth package.
* [Serialization][serializers] that supports both [ORM][modelserializer-section] and [non-ORM][serializer-section] data sources.
* Customizable all the way down - just use [regular function-based views][functionview-section] if you don't need the [more][generic-views] [powerful][viewsets] [features][routers].
* [Extensive documentation][index], and [great community support][group].

View File

@ -247,105 +247,6 @@ Unauthenticated responses that are denied permission will result in an `HTTP 403
If you're using an AJAX style API with SessionAuthentication, you'll need to make sure you include a valid CSRF token for any "unsafe" HTTP method calls, such as `PUT`, `PATCH`, `POST` or `DELETE` requests. See the [Django CSRF documentation][csrf-ajax] for more details.
## OAuthAuthentication
This authentication uses [OAuth 1.0a][oauth-1.0a] authentication scheme. OAuth 1.0a provides signature validation which provides a reasonable level of security over plain non-HTTPS connections. However, it may also be considered more complicated than OAuth2, as it requires clients to sign their requests.
This authentication class depends on the optional `django-oauth-plus` and `oauth2` packages. In order to make it work you must install these packages and add `oauth_provider` to your `INSTALLED_APPS`:
INSTALLED_APPS = (
...
`oauth_provider`,
)
Don't forget to run `syncdb` once you've added the package.
python manage.py syncdb
#### Getting started with django-oauth-plus
The OAuthAuthentication class only provides token verification and signature validation for requests. It doesn't provide authorization flow for your clients. You still need to implement your own views for accessing and authorizing tokens.
The `django-oauth-plus` package provides simple foundation for classic 'three-legged' oauth flow. Please refer to [the documentation][django-oauth-plus] for more details.
## OAuth2Authentication
This authentication uses [OAuth 2.0][rfc6749] authentication scheme. OAuth2 is more simple to work with than OAuth1, and provides much better security than simple token authentication. It is an unauthenticated scheme, and requires you to use an HTTPS connection.
This authentication class depends on the optional [django-oauth2-provider][django-oauth2-provider] project. In order to make it work you must install this package and add `provider` and `provider.oauth2` to your `INSTALLED_APPS`:
INSTALLED_APPS = (
...
'provider',
'provider.oauth2',
)
Then add `OAuth2Authentication` to your global `DEFAULT_AUTHENTICATION` setting:
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework.authentication.OAuth2Authentication',
),
You must also include the following in your root `urls.py` module:
url(r'^oauth2/', include('provider.oauth2.urls', namespace='oauth2')),
Note that the `namespace='oauth2'` argument is required.
Finally, sync your database.
python manage.py syncdb
python manage.py migrate
---
**Note:** If you use `OAuth2Authentication` in production you must ensure that your API is only available over `https`.
---
#### Getting started with django-oauth2-provider
The `OAuth2Authentication` class only provides token verification for requests. It doesn't provide authorization flow for your clients.
The OAuth 2 authorization flow is taken care by the [django-oauth2-provider][django-oauth2-provider] dependency. A walkthrough is given here, but for more details you should refer to [the documentation][django-oauth2-provider-docs].
To get started:
##### 1. Create a client
You can create a client, either through the shell, or by using the Django admin.
Go to the admin panel and create a new `Provider.Client` entry. It will create the `client_id` and `client_secret` properties for you.
##### 2. Request an access token
To request an access token, submit a `POST` request to the url `/oauth2/access_token` with the following fields:
* `client_id` the client id you've just configured at the previous step.
* `client_secret` again configured at the previous step.
* `username` the username with which you want to log in.
* `password` well, that speaks for itself.
You can use the command line to test that your local configuration is working:
curl -X POST -d "client_id=YOUR_CLIENT_ID&client_secret=YOUR_CLIENT_SECRET&grant_type=password&username=YOUR_USERNAME&password=YOUR_PASSWORD" http://localhost:8000/oauth2/access_token/
You should get a response that looks something like this:
{"access_token": "<your-access-token>", "scope": "read", "expires_in": 86399, "refresh_token": "<your-refresh-token>"}
##### 3. Access the API
The only thing needed to make the `OAuth2Authentication` class work is to insert the `access_token` you've received in the `Authorization` request header.
The command line to test the authentication looks like:
curl -H "Authorization: Bearer <your-access-token>" http://localhost:8000/api/
### Alternative OAuth 2 implementations
Note that [Django OAuth Toolkit][django-oauth-toolkit] is an alternative external package that also includes OAuth 2.0 support for REST framework.
---
# Custom authentication

View File

@ -148,21 +148,6 @@ Note that `DjangoObjectPermissions` **does not** require the `django-guardian` p
As with `DjangoModelPermissions` you can use custom model permissions by overriding `DjangoModelPermissions` and setting the `.perms_map` property. Refer to the source code for details. Note that if you add a custom `view` permission for `GET`, `HEAD` and `OPTIONS` requests, you'll probably also want to consider adding the `DjangoObjectPermissionsFilter` class to ensure that list endpoints only return results including objects for which the user has appropriate view permissions.
## TokenHasReadWriteScope
This permission class is intended for use with either of the `OAuthAuthentication` and `OAuth2Authentication` classes, and ties into the scoping that their backends provide.
Requests with a safe methods of `GET`, `OPTIONS` or `HEAD` will be allowed if the authenticated token has read permission.
Requests for `POST`, `PUT`, `PATCH` and `DELETE` will be allowed if the authenticated token has write permission.
This permission class relies on the implementations of the [django-oauth-plus][django-oauth-plus] and [django-oauth2-provider][django-oauth2-provider] libraries, which both provide limited support for controlling the scope of access tokens:
* `django-oauth-plus`: Tokens are associated with a `Resource` class which has a `name`, `url` and `is_readonly` properties.
* `django-oauth2-provider`: Tokens are associated with a bitwise `scope` attribute, that defaults to providing bitwise values for `read` and/or `write`.
If you require more advanced scoping for your API, such as restricting tokens to accessing a subset of functionality of your API then you will need to provide a custom permission class. See the source of the `django-oauth-plus` or `django-oauth2-provider` package for more details on scoping token access.
---
# Custom permissions
@ -254,8 +239,6 @@ The [REST Condition][rest-condition] package is another extension for building c
[objectpermissions]: https://docs.djangoproject.com/en/dev/topics/auth/customizing/#handling-object-permissions
[guardian]: https://github.com/lukaszb/django-guardian
[get_objects_for_user]: http://pythonhosted.org/django-guardian/api/guardian.shortcuts.html#get-objects-for-user
[django-oauth-plus]: http://code.larlet.fr/django-oauth-plus
[django-oauth2-provider]: https://github.com/caffeinehit/django-oauth2-provider
[2.2-announcement]: ../topics/2.2-announcement.md
[filtering]: filtering.md
[drf-any-permissions]: https://github.com/kevin-brown/drf-any-permissions

View File

@ -31,7 +31,7 @@ Django REST framework is a powerful and flexible toolkit that makes it easy to b
Some reasons you might want to use REST framework:
* The [Web browseable API][sandbox] is a huge usability win for your developers.
* [Authentication policies][authentication] including [OAuth1a][oauth1-section] and [OAuth2][oauth2-section] out of the box.
* [Authentication policies][authentication] including [OAuth1a][oauth1-section] and [OAuth2][oauth2-section] through the rest-framework-oauth package.
* [Serialization][serializers] that supports both [ORM][modelserializer-section] and [non-ORM][serializer-section] data sources.
* Customizable all the way down - just use [regular function-based views][functionview-section] if you don't need the [more][generic-views] [powerful][viewsets] [features][routers].
* [Extensive documentation][index], and [great community support][group].
@ -58,12 +58,9 @@ The following packages are optional:
* [PyYAML][yaml] (3.10+) - YAML content-type support.
* [defusedxml][defusedxml] (0.3+) - XML content-type support.
* [django-filter][django-filter] (0.5.4+) - Filtering support.
* [django-oauth-plus][django-oauth-plus] (2.0+) and [oauth2][oauth2] (1.5.211+) - OAuth 1.0a support.
* [django-oauth2-provider][django-oauth2-provider] (0.2.3+) - OAuth 2.0 support.
* [django-restframework-oauth][django-restframework-oauth] package for OAuth 1.0a and 2.0 support.
* [django-guardian][django-guardian] (1.1.1+) - Object level permissions support.
**Note**: The `oauth2` Python package is badly misnamed, and actually provides OAuth 1.0a support. Also note that packages required for both OAuth 1.0a, and OAuth 2.0 are not yet Python 3 compatible.
## Installation
Install using `pip`, including any optional packages you want...
@ -260,9 +257,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
[yaml]: http://pypi.python.org/pypi/PyYAML
[defusedxml]: https://pypi.python.org/pypi/defusedxml
[django-filter]: http://pypi.python.org/pypi/django-filter
[oauth2]: https://github.com/simplegeo/python-oauth2
[django-oauth-plus]: https://bitbucket.org/david/django-oauth-plus/wiki/Home
[django-oauth2-provider]: https://github.com/caffeinehit/django-oauth2-provider
[django-restframework-oauth]: https://github.com/jlafon/django-rest-framework-oauth
[django-guardian]: https://github.com/lukaszb/django-guardian
[0.4]: https://github.com/tomchristie/django-rest-framework/tree/0.4.X
[image]: img/quickstart.png

View File

@ -9,7 +9,4 @@ markdown>=2.1.0
PyYAML>=3.10
defusedxml>=0.3
django-filter>=0.5.4
django-oauth-plus>=2.2.1
oauth2>=1.5.211
django-oauth2-provider>=0.2.4
Pillow==2.3.0

View File

@ -3,14 +3,9 @@ Provides various authentication policies.
"""
from __future__ import unicode_literals
import base64
from django.contrib.auth import authenticate
from django.core.exceptions import ImproperlyConfigured
from django.middleware.csrf import CsrfViewMiddleware
from django.conf import settings
from rest_framework import exceptions, HTTP_HEADER_ENCODING
from rest_framework.compat import oauth, oauth_provider, oauth_provider_store
from rest_framework.compat import oauth2_provider, provider_now, check_nonce
from rest_framework.authtoken.models import Token
@ -178,181 +173,3 @@ class TokenAuthentication(BaseAuthentication):
def authenticate_header(self, request):
return 'Token'
class OAuthAuthentication(BaseAuthentication):
"""
OAuth 1.0a authentication backend using `django-oauth-plus` and `oauth2`.
Note: The `oauth2` package actually provides oauth1.0a support. Urg.
We import it from the `compat` module as `oauth`.
"""
www_authenticate_realm = 'api'
def __init__(self, *args, **kwargs):
super(OAuthAuthentication, self).__init__(*args, **kwargs)
if oauth is None:
raise ImproperlyConfigured(
"The 'oauth2' package could not be imported."
"It is required for use with the 'OAuthAuthentication' class.")
if oauth_provider is None:
raise ImproperlyConfigured(
"The 'django-oauth-plus' package could not be imported."
"It is required for use with the 'OAuthAuthentication' class.")
def authenticate(self, request):
"""
Returns two-tuple of (user, token) if authentication succeeds,
or None otherwise.
"""
try:
oauth_request = oauth_provider.utils.get_oauth_request(request)
except oauth.Error as err:
raise exceptions.AuthenticationFailed(err.message)
if not oauth_request:
return None
oauth_params = oauth_provider.consts.OAUTH_PARAMETERS_NAMES
found = any(param for param in oauth_params if param in oauth_request)
missing = list(param for param in oauth_params if param not in oauth_request)
if not found:
# OAuth authentication was not attempted.
return None
if missing:
# OAuth was attempted but missing parameters.
msg = 'Missing parameters: %s' % (', '.join(missing))
raise exceptions.AuthenticationFailed(msg)
if not self.check_nonce(request, oauth_request):
msg = 'Nonce check failed'
raise exceptions.AuthenticationFailed(msg)
try:
consumer_key = oauth_request.get_parameter('oauth_consumer_key')
consumer = oauth_provider_store.get_consumer(request, oauth_request, consumer_key)
except oauth_provider.store.InvalidConsumerError:
msg = 'Invalid consumer token: %s' % oauth_request.get_parameter('oauth_consumer_key')
raise exceptions.AuthenticationFailed(msg)
if consumer.status != oauth_provider.consts.ACCEPTED:
msg = 'Invalid consumer key status: %s' % consumer.get_status_display()
raise exceptions.AuthenticationFailed(msg)
try:
token_param = oauth_request.get_parameter('oauth_token')
token = oauth_provider_store.get_access_token(request, oauth_request, consumer, token_param)
except oauth_provider.store.InvalidTokenError:
msg = 'Invalid access token: %s' % oauth_request.get_parameter('oauth_token')
raise exceptions.AuthenticationFailed(msg)
try:
self.validate_token(request, consumer, token)
except oauth.Error as err:
raise exceptions.AuthenticationFailed(err.message)
user = token.user
if not user.is_active:
msg = 'User inactive or deleted: %s' % user.username
raise exceptions.AuthenticationFailed(msg)
return (token.user, token)
def authenticate_header(self, request):
"""
If permission is denied, return a '401 Unauthorized' response,
with an appropraite 'WWW-Authenticate' header.
"""
return 'OAuth realm="%s"' % self.www_authenticate_realm
def validate_token(self, request, consumer, token):
"""
Check the token and raise an `oauth.Error` exception if invalid.
"""
oauth_server, oauth_request = oauth_provider.utils.initialize_server_request(request)
oauth_server.verify_request(oauth_request, consumer, token)
def check_nonce(self, request, oauth_request):
"""
Checks nonce of request, and return True if valid.
"""
oauth_nonce = oauth_request['oauth_nonce']
oauth_timestamp = oauth_request['oauth_timestamp']
return check_nonce(request, oauth_request, oauth_nonce, oauth_timestamp)
class OAuth2Authentication(BaseAuthentication):
"""
OAuth 2 authentication backend using `django-oauth2-provider`
"""
www_authenticate_realm = 'api'
allow_query_params_token = settings.DEBUG
def __init__(self, *args, **kwargs):
super(OAuth2Authentication, self).__init__(*args, **kwargs)
if oauth2_provider is None:
raise ImproperlyConfigured(
"The 'django-oauth2-provider' package could not be imported. "
"It is required for use with the 'OAuth2Authentication' class.")
def authenticate(self, request):
"""
Returns two-tuple of (user, token) if authentication succeeds,
or None otherwise.
"""
auth = get_authorization_header(request).split()
if len(auth) == 1:
msg = 'Invalid bearer header. No credentials provided.'
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = 'Invalid bearer header. Token string should not contain spaces.'
raise exceptions.AuthenticationFailed(msg)
if auth and auth[0].lower() == b'bearer':
access_token = auth[1]
elif 'access_token' in request.POST:
access_token = request.POST['access_token']
elif 'access_token' in request.GET and self.allow_query_params_token:
access_token = request.GET['access_token']
else:
return None
return self.authenticate_credentials(request, access_token)
def authenticate_credentials(self, request, access_token):
"""
Authenticate the request, given the access token.
"""
try:
token = oauth2_provider.oauth2.models.AccessToken.objects.select_related('user')
# provider_now switches to timezone aware datetime when
# the oauth2_provider version supports to it.
token = token.get(token=access_token, expires__gt=provider_now())
except oauth2_provider.oauth2.models.AccessToken.DoesNotExist:
raise exceptions.AuthenticationFailed('Invalid token')
user = token.user
if not user.is_active:
msg = 'User inactive or deleted: %s' % user.get_username()
raise exceptions.AuthenticationFailed(msg)
return (user, token)
def authenticate_header(self, request):
"""
Bearer is the only finalized type currently
Check details on the `OAuth2Authentication.authenticate` method
"""
return 'Bearer realm="%s"' % self.www_authenticate_realm

View File

@ -182,57 +182,6 @@ except ImportError:
etree = None
# OAuth2 is optional
try:
# Note: The `oauth2` package actually provides oauth1.0a support. Urg.
import oauth2 as oauth
except ImportError:
oauth = None
# OAuthProvider is optional
try:
import oauth_provider
from oauth_provider.store import store as oauth_provider_store
# check_nonce's calling signature in django-oauth-plus changes sometime
# between versions 2.0 and 2.2.1
def check_nonce(request, oauth_request, oauth_nonce, oauth_timestamp):
check_nonce_args = inspect.getargspec(oauth_provider_store.check_nonce).args
if 'timestamp' in check_nonce_args:
return oauth_provider_store.check_nonce(
request, oauth_request, oauth_nonce, oauth_timestamp
)
return oauth_provider_store.check_nonce(
request, oauth_request, oauth_nonce
)
except (ImportError, ImproperlyConfigured):
oauth_provider = None
oauth_provider_store = None
check_nonce = None
# OAuth 2 support is optional
try:
import provider as oauth2_provider
from provider import scope as oauth2_provider_scope
from provider import constants as oauth2_constants
if oauth2_provider.__version__ in ('0.2.3', '0.2.4'):
# 0.2.3 and 0.2.4 are supported version that do not support
# timezone aware datetimes
import datetime
provider_now = datetime.datetime.now
else:
# Any other supported version does use timezone aware datetimes
from django.utils.timezone import now as provider_now
except ImportError:
oauth2_provider = None
oauth2_provider_scope = None
oauth2_constants = None
provider_now = None
# Handle lazy strings across Py2/Py3
from django.utils.functional import Promise

View File

@ -3,8 +3,7 @@ Provides a set of pluggable permission policies.
"""
from __future__ import unicode_literals
from django.http import Http404
from rest_framework.compat import (get_model_name, oauth2_provider_scope,
oauth2_constants)
from rest_framework.compat import get_model_name
SAFE_METHODS = ['GET', 'HEAD', 'OPTIONS']
@ -199,28 +198,3 @@ class DjangoObjectPermissions(DjangoModelPermissions):
return False
return True
class TokenHasReadWriteScope(BasePermission):
"""
The request is authenticated as a user and the token used has the right scope
"""
def has_permission(self, request, view):
token = request.auth
read_only = request.method in SAFE_METHODS
if not token:
return False
if hasattr(token, 'resource'): # OAuth 1
return read_only or not request.auth.resource.is_readonly
elif hasattr(token, 'scope'): # OAuth 2
required = oauth2_constants.READ if read_only else oauth2_constants.WRITE
return oauth2_provider_scope.check(required, request.auth.scope)
assert False, (
'TokenHasReadWriteScope requires either the'
'`OAuthAuthentication` or `OAuth2Authentication` authentication '
'class to be used.'
)

View File

@ -47,26 +47,6 @@ def pytest_configure():
),
)
try:
import oauth_provider # NOQA
import oauth2 # NOQA
except ImportError:
pass
else:
settings.INSTALLED_APPS += (
'oauth_provider',
)
try:
import provider # NOQA
except ImportError:
pass
else:
settings.INSTALLED_APPS += (
'provider',
'provider.oauth2',
)
# guardian is optional
try:
import guardian # NOQA

View File

@ -101,27 +101,6 @@ INSTALLED_APPS = (
'tests.users',
)
# OAuth is optional and won't work if there is no oauth_provider & oauth2
try:
import oauth_provider # NOQA
import oauth2 # NOQA
except ImportError:
pass
else:
INSTALLED_APPS += (
'oauth_provider',
)
try:
import provider # NOQA
except ImportError:
pass
else:
INSTALLED_APPS += (
'provider',
'provider.oauth2',
)
# guardian is optional
try:
import guardian # NOQA

View File

@ -3,8 +3,7 @@ from django.conf.urls import patterns, url, include
from django.contrib.auth.models import User
from django.http import HttpResponse
from django.test import TestCase
from django.utils import six, unittest
from django.utils.http import urlencode
from django.utils import six
from rest_framework import HTTP_HEADER_ENCODING
from rest_framework import exceptions
from rest_framework import permissions
@ -16,17 +15,11 @@ from rest_framework.authentication import (
TokenAuthentication,
BasicAuthentication,
SessionAuthentication,
OAuthAuthentication,
OAuth2Authentication
)
from rest_framework.authtoken.models import Token
from rest_framework.compat import oauth2_provider, oauth2_provider_scope
from rest_framework.compat import oauth, oauth_provider
from rest_framework.test import APIRequestFactory, APIClient
from rest_framework.views import APIView
import base64
import time
import datetime
factory = APIRequestFactory()
@ -50,37 +43,10 @@ urlpatterns = patterns(
(r'^basic/$', MockView.as_view(authentication_classes=[BasicAuthentication])),
(r'^token/$', MockView.as_view(authentication_classes=[TokenAuthentication])),
(r'^auth-token/$', 'rest_framework.authtoken.views.obtain_auth_token'),
(r'^oauth/$', MockView.as_view(authentication_classes=[OAuthAuthentication])),
(
r'^oauth-with-scope/$',
MockView.as_view(
authentication_classes=[OAuthAuthentication],
permission_classes=[permissions.TokenHasReadWriteScope]
)
),
url(r'^auth/', include('rest_framework.urls', namespace='rest_framework'))
)
class OAuth2AuthenticationDebug(OAuth2Authentication):
allow_query_params_token = True
if oauth2_provider is not None:
urlpatterns += patterns(
'',
url(r'^oauth2/', include('provider.oauth2.urls', namespace='oauth2')),
url(r'^oauth2-test/$', MockView.as_view(authentication_classes=[OAuth2Authentication])),
url(r'^oauth2-test-debug/$', MockView.as_view(authentication_classes=[OAuth2AuthenticationDebug])),
url(
r'^oauth2-with-scope-test/$',
MockView.as_view(
authentication_classes=[OAuth2Authentication],
permission_classes=[permissions.TokenHasReadWriteScope]
)
)
)
class BasicAuthTests(TestCase):
"""Basic authentication"""
urls = 'tests.test_authentication'
@ -276,400 +242,6 @@ class IncorrectCredentialsTests(TestCase):
self.assertEqual(response.data, {'detail': 'Bad credentials'})
class OAuthTests(TestCase):
"""OAuth 1.0a authentication"""
urls = 'tests.test_authentication'
def setUp(self):
# these imports are here because oauth is optional and hiding them in try..except block or compat
# could obscure problems if something breaks
from oauth_provider.models import Consumer, Scope
from oauth_provider.models import Token as OAuthToken
from oauth_provider import consts
self.consts = consts
self.csrf_client = APIClient(enforce_csrf_checks=True)
self.username = 'john'
self.email = 'lennon@thebeatles.com'
self.password = 'password'
self.user = User.objects.create_user(self.username, self.email, self.password)
self.CONSUMER_KEY = 'consumer_key'
self.CONSUMER_SECRET = 'consumer_secret'
self.TOKEN_KEY = "token_key"
self.TOKEN_SECRET = "token_secret"
self.consumer = Consumer.objects.create(
key=self.CONSUMER_KEY, secret=self.CONSUMER_SECRET,
name='example', user=self.user, status=self.consts.ACCEPTED
)
self.scope = Scope.objects.create(name="resource name", url="api/")
self.token = OAuthToken.objects.create(
user=self.user, consumer=self.consumer, scope=self.scope,
token_type=OAuthToken.ACCESS, key=self.TOKEN_KEY, secret=self.TOKEN_SECRET,
is_approved=True
)
def _create_authorization_header(self):
params = {
'oauth_version': "1.0",
'oauth_nonce': oauth.generate_nonce(),
'oauth_timestamp': int(time.time()),
'oauth_token': self.token.key,
'oauth_consumer_key': self.consumer.key
}
req = oauth.Request(method="GET", url="http://example.com", parameters=params)
signature_method = oauth.SignatureMethod_PLAINTEXT()
req.sign_request(signature_method, self.consumer, self.token)
return req.to_header()["Authorization"]
def _create_authorization_url_parameters(self):
params = {
'oauth_version': "1.0",
'oauth_nonce': oauth.generate_nonce(),
'oauth_timestamp': int(time.time()),
'oauth_token': self.token.key,
'oauth_consumer_key': self.consumer.key
}
req = oauth.Request(method="GET", url="http://example.com", parameters=params)
signature_method = oauth.SignatureMethod_PLAINTEXT()
req.sign_request(signature_method, self.consumer, self.token)
return dict(req)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_passing_oauth(self):
"""Ensure POSTing form over OAuth with correct credentials passes and does not require CSRF"""
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_repeated_nonce_failing_oauth(self):
"""Ensure POSTing form over OAuth with repeated auth (same nonces and timestamp) credentials fails"""
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
# simulate reply attack auth header containes already used (nonce, timestamp) pair
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_token_removed_failing_oauth(self):
"""Ensure POSTing when there is no OAuth access token in db fails"""
self.token.delete()
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_consumer_status_not_accepted_failing_oauth(self):
"""Ensure POSTing when consumer status is anything other than ACCEPTED fails"""
for consumer_status in (self.consts.CANCELED, self.consts.PENDING, self.consts.REJECTED):
self.consumer.status = consumer_status
self.consumer.save()
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_request_token_failing_oauth(self):
"""Ensure POSTing with unauthorized request token instead of access token fails"""
self.token.token_type = self.token.REQUEST
self.token.save()
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth/', {'example': 'example'}, HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_urlencoded_parameters(self):
"""Ensure POSTing with x-www-form-urlencoded auth parameters passes"""
params = self._create_authorization_url_parameters()
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_get_form_with_url_parameters(self):
"""Ensure GETing with auth in url parameters passes"""
params = self._create_authorization_url_parameters()
response = self.csrf_client.get('/oauth/', params)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_hmac_sha1_signature_passes(self):
"""Ensure POSTing using HMAC_SHA1 signature method passes"""
params = {
'oauth_version': "1.0",
'oauth_nonce': oauth.generate_nonce(),
'oauth_timestamp': int(time.time()),
'oauth_token': self.token.key,
'oauth_consumer_key': self.consumer.key
}
req = oauth.Request(method="POST", url="http://testserver/oauth/", parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
req.sign_request(signature_method, self.consumer, self.token)
auth = req.to_header()["Authorization"]
response = self.csrf_client.post('/oauth/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_get_form_with_readonly_resource_passing_auth(self):
"""Ensure POSTing with a readonly scope instead of a write scope fails"""
read_only_access_token = self.token
read_only_access_token.scope.is_readonly = True
read_only_access_token.scope.save()
params = self._create_authorization_url_parameters()
response = self.csrf_client.get('/oauth-with-scope/', params)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_readonly_resource_failing_auth(self):
"""Ensure POSTing with a readonly resource instead of a write scope fails"""
read_only_access_token = self.token
read_only_access_token.scope.is_readonly = True
read_only_access_token.scope.save()
params = self._create_authorization_url_parameters()
response = self.csrf_client.post('/oauth-with-scope/', params)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_post_form_with_write_resource_passing_auth(self):
"""Ensure POSTing with a write resource succeed"""
read_write_access_token = self.token
read_write_access_token.scope.is_readonly = False
read_write_access_token.scope.save()
params = self._create_authorization_url_parameters()
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth-with-scope/', params, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_bad_consumer_key(self):
"""Ensure POSTing using HMAC_SHA1 signature method passes"""
params = {
'oauth_version': "1.0",
'oauth_nonce': oauth.generate_nonce(),
'oauth_timestamp': int(time.time()),
'oauth_token': self.token.key,
'oauth_consumer_key': 'badconsumerkey'
}
req = oauth.Request(method="POST", url="http://testserver/oauth/", parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
req.sign_request(signature_method, self.consumer, self.token)
auth = req.to_header()["Authorization"]
response = self.csrf_client.post('/oauth/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
@unittest.skipUnless(oauth_provider, 'django-oauth-plus not installed')
@unittest.skipUnless(oauth, 'oauth2 not installed')
def test_bad_token_key(self):
"""Ensure POSTing using HMAC_SHA1 signature method passes"""
params = {
'oauth_version': "1.0",
'oauth_nonce': oauth.generate_nonce(),
'oauth_timestamp': int(time.time()),
'oauth_token': 'badtokenkey',
'oauth_consumer_key': self.consumer.key
}
req = oauth.Request(method="POST", url="http://testserver/oauth/", parameters=params)
signature_method = oauth.SignatureMethod_HMAC_SHA1()
req.sign_request(signature_method, self.consumer, self.token)
auth = req.to_header()["Authorization"]
response = self.csrf_client.post('/oauth/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
class OAuth2Tests(TestCase):
"""OAuth 2.0 authentication"""
urls = 'tests.test_authentication'
def setUp(self):
self.csrf_client = APIClient(enforce_csrf_checks=True)
self.username = 'john'
self.email = 'lennon@thebeatles.com'
self.password = 'password'
self.user = User.objects.create_user(self.username, self.email, self.password)
self.CLIENT_ID = 'client_key'
self.CLIENT_SECRET = 'client_secret'
self.ACCESS_TOKEN = "access_token"
self.REFRESH_TOKEN = "refresh_token"
self.oauth2_client = oauth2_provider.oauth2.models.Client.objects.create(
client_id=self.CLIENT_ID,
client_secret=self.CLIENT_SECRET,
redirect_uri='',
client_type=0,
name='example',
user=None,
)
self.access_token = oauth2_provider.oauth2.models.AccessToken.objects.create(
token=self.ACCESS_TOKEN,
client=self.oauth2_client,
user=self.user,
)
self.refresh_token = oauth2_provider.oauth2.models.RefreshToken.objects.create(
user=self.user,
access_token=self.access_token,
client=self.oauth2_client
)
def _create_authorization_header(self, token=None):
return "Bearer {0}".format(token or self.access_token.token)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_with_wrong_authorization_header_token_type_failing(self):
"""Ensure that a wrong token type lead to the correct HTTP error status code"""
auth = "Wrong token-type-obsviously"
response = self.csrf_client.get('/oauth2-test/', {}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
response = self.csrf_client.get('/oauth2-test/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_with_wrong_authorization_header_token_format_failing(self):
"""Ensure that a wrong token format lead to the correct HTTP error status code"""
auth = "Bearer wrong token format"
response = self.csrf_client.get('/oauth2-test/', {}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
response = self.csrf_client.get('/oauth2-test/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_with_wrong_authorization_header_token_failing(self):
"""Ensure that a wrong token lead to the correct HTTP error status code"""
auth = "Bearer wrong-token"
response = self.csrf_client.get('/oauth2-test/', {}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
response = self.csrf_client.get('/oauth2-test/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_with_wrong_authorization_header_token_missing(self):
"""Ensure that a missing token lead to the correct HTTP error status code"""
auth = "Bearer"
response = self.csrf_client.get('/oauth2-test/', {}, HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
response = self.csrf_client.get('/oauth2-test/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 401)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_passing_auth(self):
"""Ensure GETing form over OAuth with correct client credentials succeed"""
auth = self._create_authorization_header()
response = self.csrf_client.get('/oauth2-test/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_passing_auth_url_transport(self):
"""Ensure GETing form over OAuth with correct client credentials in form data succeed"""
response = self.csrf_client.post(
'/oauth2-test/',
data={'access_token': self.access_token.token}
)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_passing_auth_url_transport(self):
"""Ensure GETing form over OAuth with correct client credentials in query succeed when DEBUG is True"""
query = urlencode({'access_token': self.access_token.token})
response = self.csrf_client.get('/oauth2-test-debug/?%s' % query)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_get_form_failing_auth_url_transport(self):
"""Ensure GETing form over OAuth with correct client credentials in query fails when DEBUG is False"""
query = urlencode({'access_token': self.access_token.token})
response = self.csrf_client.get('/oauth2-test/?%s' % query)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_passing_auth(self):
"""Ensure POSTing form over OAuth with correct credentials passes and does not require CSRF"""
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth2-test/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_token_removed_failing_auth(self):
"""Ensure POSTing when there is no OAuth access token in db fails"""
self.access_token.delete()
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth2-test/', HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_refresh_token_failing_auth(self):
"""Ensure POSTing with refresh token instead of access token fails"""
auth = self._create_authorization_header(token=self.refresh_token.token)
response = self.csrf_client.post('/oauth2-test/', HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_expired_access_token_failing_auth(self):
"""Ensure POSTing with expired access token fails with an 'Invalid token' error"""
self.access_token.expires = datetime.datetime.now() - datetime.timedelta(seconds=10) # 10 seconds late
self.access_token.save()
auth = self._create_authorization_header()
response = self.csrf_client.post('/oauth2-test/', HTTP_AUTHORIZATION=auth)
self.assertIn(response.status_code, (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN))
self.assertIn('Invalid token', response.content)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_invalid_scope_failing_auth(self):
"""Ensure POSTing with a readonly scope instead of a write scope fails"""
read_only_access_token = self.access_token
read_only_access_token.scope = oauth2_provider_scope.SCOPE_NAME_DICT['read']
read_only_access_token.save()
auth = self._create_authorization_header(token=read_only_access_token.token)
response = self.csrf_client.get('/oauth2-with-scope-test/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
response = self.csrf_client.post('/oauth2-with-scope-test/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
@unittest.skipUnless(oauth2_provider, 'django-oauth2-provider not installed')
def test_post_form_with_valid_scope_passing_auth(self):
"""Ensure POSTing with a write scope succeed"""
read_write_access_token = self.access_token
read_write_access_token.scope = oauth2_provider_scope.SCOPE_NAME_DICT['write']
read_write_access_token.save()
auth = self._create_authorization_header(token=read_write_access_token.token)
response = self.csrf_client.post('/oauth2-with-scope-test/', HTTP_AUTHORIZATION=auth)
self.assertEqual(response.status_code, 200)
class FailingAuthAccessedInRenderer(TestCase):
def setUp(self):
class AuthAccessingRenderer(renderers.BaseRenderer):