mirror of
https://github.com/encode/django-rest-framework.git
synced 2024-11-24 18:44:00 +03:00
7921e9af43
Fixes #7417. Fixes all these issues seen with `tox -e py38-django31`: ``` /Users/chainz/Documents/Projects/django-rest-framework/tests/test_request.py:208: RemovedInDjango40Warning: Passing None for the middleware get_response argument is deprecated. SessionMiddleware().process_request(self.wrapped_request) tests/test_requests_client.py: 1 test with warning tests/test_testing.py: 4 tests with warnings tests/test_throttling.py: 1 test with warning tests/authentication/test_authentication.py: 4 tests with warnings tests/browsable_api/test_browsable_api.py: 4 tests with warnings /Users/chainz/Documents/Projects/django-rest-framework/rest_framework/authentication.py:139: RemovedInDjango40Warning: Passing None for the middleware get_response argument is deprecated. check = CSRFCheck() ```
233 lines
7.5 KiB
Python
233 lines
7.5 KiB
Python
"""
|
|
Provides various authentication policies.
|
|
"""
|
|
import base64
|
|
import binascii
|
|
|
|
from django.contrib.auth import authenticate, get_user_model
|
|
from django.middleware.csrf import CsrfViewMiddleware
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from rest_framework import HTTP_HEADER_ENCODING, exceptions
|
|
|
|
|
|
def get_authorization_header(request):
    """
    Fetch the request's 'Authorization:' header as a bytestring.

    The value is read from ``request.META['HTTP_AUTHORIZATION']`` and
    defaults to ``b''`` when the key is missing. The Django test client may
    supply the header as ``str``; in that case it is encoded with
    ``HTTP_HEADER_ENCODING`` so callers always deal with bytes.
    """
    header = request.META.get('HTTP_AUTHORIZATION', b'')
    # Only text values need converting; anything else is returned untouched.
    return header.encode(HTTP_HEADER_ENCODING) if isinstance(header, str) else header
|
|
|
|
|
|
class CSRFCheck(CsrfViewMiddleware):
    """
    `CsrfViewMiddleware` subclass whose rejection hook returns the failure
    reason itself.
    """

    def _reject(self, request, reason):
        """
        Hand the failure `reason` straight back to the caller.
        """
        # The caller wants the reason text, not an HttpResponse.
        return reason
|
|
|
|
|
|
class BaseAuthentication:
    """
    Common base class that every authentication class should extend.
    """

    def authenticate(self, request):
        """
        Authenticate `request`, returning a two-tuple of (user, token).

        This base implementation always raises `NotImplementedError`;
        subclasses have to supply their own version.
        """
        raise NotImplementedError(".authenticate() must be overridden.")

    def authenticate_header(self, request):
        """
        Return the string to use as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied`
        responses.

        The base implementation returns `None`.
        """
        return None
|
|
|
|
|
|
class BasicAuthentication(BaseAuthentication):
    """
    HTTP Basic authentication against username/password.
    """
    # Realm advertised by `authenticate_header()`.
    www_authenticate_realm = 'api'

    def authenticate(self, request):
        """
        Return a `(user, None)` pair when the request carries a correct
        username and password via HTTP Basic authentication.

        Returns `None` when the Authorization header is empty or its first
        word is not "basic" (compared case-insensitively). Raises
        `AuthenticationFailed` when the header is malformed or cannot be
        decoded.
        """
        header_parts = get_authorization_header(request).split()

        # Empty header, or a scheme other than Basic: nothing to check here.
        if not header_parts or header_parts[0].lower() != b'basic':
            return None

        part_count = len(header_parts)
        if part_count == 1:
            raise exceptions.AuthenticationFailed(
                _('Invalid basic header. No credentials provided.')
            )
        if part_count > 2:
            raise exceptions.AuthenticationFailed(
                _('Invalid basic header. Credentials string should not contain spaces.')
            )

        try:
            raw_credentials = base64.b64decode(header_parts[1])
            try:
                decoded = raw_credentials.decode('utf-8')
            except UnicodeDecodeError:
                # Not valid UTF-8: fall back to latin-1 for the same bytes.
                decoded = raw_credentials.decode('latin-1')
            # Split on the first ':' only; any later colons stay in the
            # password. With no ':' at all the password is the empty string.
            userid, _separator, password = decoded.partition(':')
        except (TypeError, UnicodeDecodeError, binascii.Error):
            raise exceptions.AuthenticationFailed(
                _('Invalid basic header. Credentials not correctly base64 encoded.')
            )

        return self.authenticate_credentials(userid, password, request)

    def authenticate_credentials(self, userid, password, request=None):
        """
        Check `userid` and `password` via `authenticate()`, forwarding the
        optional `request` for context.

        The userid is passed under the user model's `USERNAME_FIELD` name.
        Returns `(user, None)`; raises `AuthenticationFailed` when no user
        comes back or the user is not active.
        """
        username_field = get_user_model().USERNAME_FIELD
        user = authenticate(
            request=request,
            **{username_field: userid, 'password': password}
        )

        if user is None:
            raise exceptions.AuthenticationFailed(_('Invalid username/password.'))

        if not user.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

        return (user, None)

    def authenticate_header(self, request):
        """
        Return the `Basic realm="..."` string built from
        `www_authenticate_realm`.
        """
        realm = self.www_authenticate_realm
        return 'Basic realm="%s"' % realm
|
|
|
|
|
|
class SessionAuthentication(BaseAuthentication):
    """
    Use Django's session framework for authentication.
    """

    def authenticate(self, request):
        """
        Return `(user, None)` if the request session currently has a
        logged in, active user; otherwise return `None`.

        CSRF validation is enforced before an authenticated user is
        returned.
        """
        # The session-based user is read from the underlying HttpRequest.
        session_user = getattr(request._request, 'user', None)

        # No user, or an inactive one: unauthenticated, so CSRF validation
        # is not required.
        if not (session_user and session_user.is_active):
            return None

        self.enforce_csrf(request)

        # CSRF passed with authenticated user
        return (session_user, None)

    def enforce_csrf(self, request):
        """
        Enforce CSRF validation for session based authentication.

        Raises `PermissionDenied` carrying the failure reason when the
        check does not pass.
        """
        def dummy_get_response(request):  # pragma: no cover
            return None

        csrf_middleware = CSRFCheck(dummy_get_response)
        # process_request() populates request.META['CSRF_COOKIE'], which
        # process_view() relies on, so it has to run first.
        csrf_middleware.process_request(request)
        failure = csrf_middleware.process_view(request, None, (), {})
        if failure:
            # CSRF failed, bail with explicit error message
            raise exceptions.PermissionDenied('CSRF Failed: %s' % failure)
|
|
|
|
|
|
class TokenAuthentication(BaseAuthentication):
    """
    Simple token based authentication.

    Clients should authenticate by passing the token key in the
    "Authorization" HTTP header, prepended with the string "Token ".
    For example:

        Authorization: Token 401f7ac837da42b97f613d789819ff93537bee6a
    """

    # Word expected at the start of the Authorization header
    # (matched case-insensitively).
    keyword = 'Token'
    # Optional custom token model; when None the bundled Token model is used.
    model = None

    def get_model(self):
        """
        Return the token model: `self.model` when set, otherwise the
        `Token` model from `rest_framework.authtoken.models`.
        """
        if self.model is None:
            # Imported here rather than at module level.
            from rest_framework.authtoken.models import Token
            return Token
        return self.model

    # A custom token model may be used, but must have the following
    # properties.
    #
    # * key -- The string identifying the token
    # * user -- The user to which the token belongs

    def authenticate(self, request):
        """
        Return `(user, token)` for a valid token header.

        Returns `None` when the Authorization header is empty or does not
        start with `keyword`. Raises `AuthenticationFailed` when the header
        is malformed.
        """
        header_parts = get_authorization_header(request).split()

        # Empty header, or a different keyword: nothing to check here.
        if not header_parts or header_parts[0].lower() != self.keyword.lower().encode():
            return None

        part_count = len(header_parts)
        if part_count == 1:
            raise exceptions.AuthenticationFailed(
                _('Invalid token header. No credentials provided.')
            )
        if part_count > 2:
            raise exceptions.AuthenticationFailed(
                _('Invalid token header. Token string should not contain spaces.')
            )

        try:
            key = header_parts[1].decode()
        except UnicodeError:
            raise exceptions.AuthenticationFailed(
                _('Invalid token header. Token string should not contain invalid characters.')
            )

        return self.authenticate_credentials(key)

    def authenticate_credentials(self, key):
        """
        Look up the token identified by `key` and return `(user, token)`.

        Raises `AuthenticationFailed` when no such token exists or its user
        is not active.
        """
        token_model = self.get_model()
        try:
            # select_related('user') loads the user along with the token.
            token = token_model.objects.select_related('user').get(key=key)
        except token_model.DoesNotExist:
            raise exceptions.AuthenticationFailed(_('Invalid token.'))

        owner = token.user
        if not owner.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

        return (owner, token)

    def authenticate_header(self, request):
        """
        Return `keyword`.
        """
        return self.keyword
|
|
|
|
|
|
class RemoteUserAuthentication(BaseAuthentication):
    """
    REMOTE_USER authentication.

    To use this, set up your web server to perform authentication, which will
    set the REMOTE_USER environment variable. You will need to have
    'django.contrib.auth.backends.RemoteUserBackend' in your
    AUTHENTICATION_BACKENDS setting.
    """

    # Name of request header to grab username from. This will be the key as
    # used in the request.META dictionary, i.e. the normalization of headers to
    # all uppercase and the addition of "HTTP_" prefix apply.
    header = "REMOTE_USER"

    def authenticate(self, request):
        """
        Authenticate using the username found in `request.META[self.header]`.

        Returns `(user, None)` when `authenticate()` yields an active user,
        otherwise `None`.
        """
        # Forward the request so authentication backends receive it, the
        # same way BasicAuthentication.authenticate_credentials() does.
        user = authenticate(
            request=request,
            remote_user=request.META.get(self.header),
        )
        if user and user.is_active:
            return (user, None)
        return None
|