mirror of
https://github.com/encode/django-rest-framework.git
synced 2024-11-25 19:14:01 +03:00
ffdac0d936
This allows subclassing TokenAuthentication and setting custom keyword, thus allowing the Authorization header to be for example: Bearer 956e252a-513c-48c5-92dd-bfddc364e812 It doesn't change the behavior of TokenAuthentication itself, it simply allows to reuse the logic of TokenAuthentication without the need of copy pasting the class and changing one hardcoded string. Related: #4080
204 lines
6.4 KiB
Python
204 lines
6.4 KiB
Python
"""
|
|
Provides various authentication policies.
|
|
"""
|
|
from __future__ import unicode_literals
|
|
|
|
import base64
|
|
import binascii
|
|
|
|
from django.contrib.auth import authenticate, get_user_model
|
|
from django.middleware.csrf import CsrfViewMiddleware
|
|
from django.utils.six import text_type
|
|
from django.utils.translation import ugettext_lazy as _
|
|
|
|
from rest_framework import HTTP_HEADER_ENCODING, exceptions
|
|
|
|
|
|
def get_authorization_header(request):
|
|
"""
|
|
Return request's 'Authorization:' header, as a bytestring.
|
|
|
|
Hide some test client ickyness where the header can be unicode.
|
|
"""
|
|
auth = request.META.get('HTTP_AUTHORIZATION', b'')
|
|
if isinstance(auth, text_type):
|
|
# Work around django test client oddness
|
|
auth = auth.encode(HTTP_HEADER_ENCODING)
|
|
return auth
|
|
|
|
|
|
class CSRFCheck(CsrfViewMiddleware):
|
|
def _reject(self, request, reason):
|
|
# Return the failure reason instead of an HttpResponse
|
|
return reason
|
|
|
|
|
|
class BaseAuthentication(object):
|
|
"""
|
|
All authentication classes should extend BaseAuthentication.
|
|
"""
|
|
|
|
def authenticate(self, request):
|
|
"""
|
|
Authenticate the request and return a two-tuple of (user, token).
|
|
"""
|
|
raise NotImplementedError(".authenticate() must be overridden.")
|
|
|
|
def authenticate_header(self, request):
|
|
"""
|
|
Return a string to be used as the value of the `WWW-Authenticate`
|
|
header in a `401 Unauthenticated` response, or `None` if the
|
|
authentication scheme should return `403 Permission Denied` responses.
|
|
"""
|
|
pass
|
|
|
|
|
|
class BasicAuthentication(BaseAuthentication):
|
|
"""
|
|
HTTP Basic authentication against username/password.
|
|
"""
|
|
www_authenticate_realm = 'api'
|
|
|
|
def authenticate(self, request):
|
|
"""
|
|
Returns a `User` if a correct username and password have been supplied
|
|
using HTTP Basic authentication. Otherwise returns `None`.
|
|
"""
|
|
auth = get_authorization_header(request).split()
|
|
|
|
if not auth or auth[0].lower() != b'basic':
|
|
return None
|
|
|
|
if len(auth) == 1:
|
|
msg = _('Invalid basic header. No credentials provided.')
|
|
raise exceptions.AuthenticationFailed(msg)
|
|
elif len(auth) > 2:
|
|
msg = _('Invalid basic header. Credentials string should not contain spaces.')
|
|
raise exceptions.AuthenticationFailed(msg)
|
|
|
|
try:
|
|
auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
|
|
except (TypeError, UnicodeDecodeError, binascii.Error):
|
|
msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
|
|
raise exceptions.AuthenticationFailed(msg)
|
|
|
|
userid, password = auth_parts[0], auth_parts[2]
|
|
return self.authenticate_credentials(userid, password)
|
|
|
|
def authenticate_credentials(self, userid, password):
|
|
"""
|
|
Authenticate the userid and password against username and password.
|
|
"""
|
|
credentials = {
|
|
get_user_model().USERNAME_FIELD: userid,
|
|
'password': password
|
|
}
|
|
user = authenticate(**credentials)
|
|
|
|
if user is None:
|
|
raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
|
|
|
|
if not user.is_active:
|
|
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
|
|
|
|
return (user, None)
|
|
|
|
def authenticate_header(self, request):
|
|
return 'Basic realm="%s"' % self.www_authenticate_realm
|
|
|
|
|
|
class SessionAuthentication(BaseAuthentication):
|
|
"""
|
|
Use Django's session framework for authentication.
|
|
"""
|
|
|
|
def authenticate(self, request):
|
|
"""
|
|
Returns a `User` if the request session currently has a logged in user.
|
|
Otherwise returns `None`.
|
|
"""
|
|
|
|
# Get the session-based user from the underlying HttpRequest object
|
|
user = getattr(request._request, 'user', None)
|
|
|
|
# Unauthenticated, CSRF validation not required
|
|
if not user or not user.is_active:
|
|
return None
|
|
|
|
self.enforce_csrf(request)
|
|
|
|
# CSRF passed with authenticated user
|
|
return (user, None)
|
|
|
|
def enforce_csrf(self, request):
|
|
"""
|
|
Enforce CSRF validation for session based authentication.
|
|
"""
|
|
reason = CSRFCheck().process_view(request, None, (), {})
|
|
if reason:
|
|
# CSRF failed, bail with explicit error message
|
|
raise exceptions.PermissionDenied('CSRF Failed: %s' % reason)
|
|
|
|
|
|
class TokenAuthentication(BaseAuthentication):
|
|
"""
|
|
Simple token based authentication.
|
|
|
|
Clients should authenticate by passing the token key in the "Authorization"
|
|
HTTP header, prepended with the string "Token ". For example:
|
|
|
|
Authorization: Token 401f7ac837da42b97f613d789819ff93537bee6a
|
|
"""
|
|
|
|
keyword = 'Token'
|
|
model = None
|
|
|
|
def get_model(self):
|
|
if self.model is not None:
|
|
return self.model
|
|
from rest_framework.authtoken.models import Token
|
|
return Token
|
|
|
|
"""
|
|
A custom token model may be used, but must have the following properties.
|
|
|
|
* key -- The string identifying the token
|
|
* user -- The user to which the token belongs
|
|
"""
|
|
|
|
def authenticate(self, request):
|
|
auth = get_authorization_header(request).split()
|
|
|
|
if not auth or auth[0].lower() != self.keyword.lower().encode():
|
|
return None
|
|
|
|
if len(auth) == 1:
|
|
msg = _('Invalid token header. No credentials provided.')
|
|
raise exceptions.AuthenticationFailed(msg)
|
|
elif len(auth) > 2:
|
|
msg = _('Invalid token header. Token string should not contain spaces.')
|
|
raise exceptions.AuthenticationFailed(msg)
|
|
|
|
try:
|
|
token = auth[1].decode()
|
|
except UnicodeError:
|
|
msg = _('Invalid token header. Token string should not contain invalid characters.')
|
|
raise exceptions.AuthenticationFailed(msg)
|
|
|
|
return self.authenticate_credentials(token)
|
|
|
|
def authenticate_credentials(self, key):
|
|
model = self.get_model()
|
|
try:
|
|
token = model.objects.select_related('user').get(key=key)
|
|
except model.DoesNotExist:
|
|
raise exceptions.AuthenticationFailed(_('Invalid token.'))
|
|
|
|
if not token.user.is_active:
|
|
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
|
|
|
|
return (token.user, token)
|
|
|
|
def authenticate_header(self, request):
|
|
return self.keyword
|