django-rest-framework/djangorestframework/tests/oauthentication.py

213 lines
9.8 KiB
Python
Raw Normal View History

import time
from django.conf.urls.defaults import patterns, url, include
from django.contrib.auth.models import User
from django.test import Client, TestCase
from djangorestframework.views import View
2011-06-28 11:53:53 +04:00
# Since oauth2 / django-oauth-plus are optional dependencies, we don't want to
# always run these tests.
# Unfortunately we can't skip tests easily until 2.7, so we'll just do this for now.
try:
import oauth2 as oauth
from oauth_provider.decorators import oauth_required
from oauth_provider.models import Resource, Consumer, Token
2011-06-30 11:52:55 +04:00
except ImportError:
2011-06-28 11:53:53 +04:00
pass
else:
# Alrighty, we're good to go here.
class ClientView(View):
    """Minimal view that serves as the protected resource for the OAuth tests."""

    def get(self, request):
        """Return the fixed payload served for GET requests."""
        payload = {'resource': 'Protected!'}
        return payload
2011-12-29 17:31:12 +04:00
2011-06-28 11:53:53 +04:00
# URL configuration used by the tests in this module:
#   ^$              -> ClientView wrapped in oauth_required
#   ^oauth/         -> the oauth_provider URLs
#   ^restframework/ -> djangorestframework's own URLs (namespaced)
urlpatterns = patterns(
    '',
    url(r'^$', oauth_required(ClientView.as_view())),
    url(r'^oauth/', include('oauth_provider.urls')),
    url(
        r'^restframework/',
        include('djangorestframework.urls', namespace='djangorestframework')
    ),
)
2011-12-29 17:31:12 +04:00
2011-06-28 11:53:53 +04:00
class OAuthTests(TestCase):
    """
    OAuth authentication:
    * the user would like to access his API data from a third-party website
    * the third-party website proposes a link to get that API data
    * the user is redirected to the API and must log in if not authenticated
    * the API displays a webpage to confirm that the user trusts the third-party website
    * if confirmed, the user is redirected to the third-party website through the callback view
    * the third-party website is able to retrieve data from the API
    """
    urls = 'djangorestframework.tests.oauthentication'

    def setUp(self):
        """
        Create the test user together with the OAuth resource and consumer
        that every test in this class relies on.
        """
        self.client = Client()
        self.username = 'john'
        self.email = 'lennon@thebeatles.com'
        self.password = 'password'
        self.user = User.objects.create_user(self.username, self.email, self.password)

        # OAuth requirements
        self.resource = Resource(name='data', url='/')
        self.resource.save()
        self.CONSUMER_KEY = 'dpf43f3p2l4k3l03'
        self.CONSUMER_SECRET = 'kd94hf93k423kf44'
        self.consumer = Consumer(key=self.CONSUMER_KEY, secret=self.CONSUMER_SECRET,
                                 name='api.example.com', user=self.user)
        self.consumer.save()

    def _approved_request_token(self):
        """
        Run the shared setup flow: fetch a request token, log the test user
        in, display the authorization page and confirm the authorization.

        Returns the request token re-read from the database after the
        authorization POST.
        """
        self.client.get("/oauth/request_token/",
                        self._create_request_token_parameters())
        # Querysets do not support negative indexing, hence the list().
        token = list(Token.objects.all())[-1]
        self.client.login(username=self.username, password=self.password)
        parameters = {'oauth_token': token.key}
        self.client.get("/oauth/authorize/", parameters)
        parameters['authorize_access'] = 1  # fake authorization by the user
        self.client.post("/oauth/authorize/", parameters)
        return Token.objects.get(key=token.key)

    def test_oauth_invalid_and_anonymous_access(self):
        """
        Verify that the resource is protected and that the OAuth authorization
        view requires the user to be logged in.
        """
        response = self.client.get('/')
        self.assertEqual(response.content, 'Invalid request parameters.')
        self.assertEqual(response.status_code, 401)
        response = self.client.get('/oauth/authorize/', follow=True)
        self.assertRedirects(response, '/accounts/login/?next=/oauth/authorize/')

    def test_oauth_authorize_access(self):
        """
        Verify that once logged in, the user can access the authorization page
        but can't display the page because the request token is not specified.
        """
        self.client.login(username=self.username, password=self.password)
        response = self.client.get('/oauth/authorize/', follow=True)
        self.assertEqual(response.content, 'No request token specified.')

    def _create_request_token_parameters(self):
        """
        A shortcut to create the request token's parameters.

        Returns a dict of PLAINTEXT-signed OAuth parameters for the test
        consumer, with a fresh timestamp on every call.
        """
        return {
            'oauth_consumer_key': self.CONSUMER_KEY,
            'oauth_signature_method': 'PLAINTEXT',
            'oauth_signature': '%s&' % self.CONSUMER_SECRET,
            'oauth_timestamp': str(int(time.time())),
            'oauth_nonce': 'requestnonce',
            'oauth_version': '1.0',
            'oauth_callback': 'http://api.example.com/request_token_ready',
            'scope': 'data',
        }

    def test_oauth_request_token_retrieval(self):
        """
        Verify that the request token can be retrieved by the server.
        """
        response = self.client.get("/oauth/request_token/",
                                   self._create_request_token_parameters())
        self.assertEqual(response.status_code, 200)
        token = list(Token.objects.all())[-1]
        self.assertTrue(token.key in response.content)
        self.assertTrue(token.secret in response.content)

    def test_oauth_user_request_authorization(self):
        """
        Verify that the user can access the authorization page once logged in
        and the request token has been retrieved.
        """
        # Setup
        self.client.get("/oauth/request_token/",
                        self._create_request_token_parameters())
        token = list(Token.objects.all())[-1]

        # Starting the test here
        self.client.login(username=self.username, password=self.password)
        parameters = {'oauth_token': token.key}
        response = self.client.get("/oauth/authorize/", parameters)
        self.assertEqual(response.status_code, 200)
        self.assertTrue(response.content.startswith(
            'Fake authorize view for api.example.com with params: oauth_token='))
        self.assertEqual(token.is_approved, 0)
        parameters['authorize_access'] = 1  # fake authorization by the user
        response = self.client.post("/oauth/authorize/", parameters)
        self.assertEqual(response.status_code, 302)
        self.assertTrue(response['Location'].startswith(
            'http://api.example.com/request_token_ready?oauth_verifier='))
        # Re-read the token: the instance fetched above predates the POST.
        token = Token.objects.get(key=token.key)
        self.assertTrue(token.key in response['Location'])
        self.assertEqual(token.is_approved, 1)

    def _create_access_token_parameters(self, token):
        """
        A shortcut to create the access token's parameters.

        `token` supplies the key, secret and verifier; the result is a dict
        of PLAINTEXT-signed OAuth parameters.
        """
        return {
            'oauth_consumer_key': self.CONSUMER_KEY,
            'oauth_token': token.key,
            'oauth_signature_method': 'PLAINTEXT',
            'oauth_signature': '%s&%s' % (self.CONSUMER_SECRET, token.secret),
            'oauth_timestamp': str(int(time.time())),
            'oauth_nonce': 'accessnonce',
            'oauth_version': '1.0',
            'oauth_verifier': token.verifier,
            'scope': 'data',
        }

    def test_oauth_access_token_retrieval(self):
        """
        Verify that the access token can be retrieved once the request token
        has been authorized by the user.
        """
        # Setup
        token = self._approved_request_token()

        # Starting the test here
        response = self.client.get("/oauth/access_token/",
                                   self._create_access_token_parameters(token))
        self.assertEqual(response.status_code, 200)
        self.assertTrue(response.content.startswith('oauth_token_secret='))
        access_token = list(Token.objects.filter(token_type=Token.ACCESS))[-1]
        self.assertTrue(access_token.key in response.content)
        self.assertTrue(access_token.secret in response.content)
        self.assertEqual(access_token.user.username, 'john')

    def _create_access_parameters(self, access_token):
        """
        A shortcut to create the parameters for accessing a resource.

        Builds the OAuth parameters for `access_token`, signs them with
        HMAC-SHA1 against 'http://testserver/' and returns the dict including
        the 'oauth_signature' entry.

        Not called by any test in this class.
        """
        parameters = {
            'oauth_consumer_key': self.CONSUMER_KEY,
            'oauth_token': access_token.key,
            'oauth_signature_method': 'HMAC-SHA1',
            'oauth_timestamp': str(int(time.time())),
            'oauth_nonce': 'accessresourcenonce',
            'oauth_version': '1.0',
        }
        oauth_request = oauth.Request.from_token_and_callback(
            access_token, http_url='http://testserver/', parameters=parameters)
        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        signature = signature_method.sign(oauth_request, self.consumer, access_token)
        parameters['oauth_signature'] = signature
        return parameters

    def test_oauth_protected_resource_access(self):
        """
        Verify that the protected resource can be accessed with the
        parameters built from a valid access token.
        """
        # Setup
        token = self._approved_request_token()
        self.client.get("/oauth/access_token/",
                        self._create_access_token_parameters(token))
        access_token = list(Token.objects.filter(token_type=Token.ACCESS))[-1]

        # Starting the test here
        # NOTE(review): this reuses the PLAINTEXT access-token parameters
        # rather than _create_access_parameters (HMAC-SHA1) -- confirm which
        # one is intended before changing it.
        response = self.client.get("/", self._create_access_token_parameters(access_token))
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.content, '{"resource": "Protected!"}')