""" Provides various authentication policies. """ import base64 import binascii from django.contrib.auth import authenticate, get_user_model from django.middleware.csrf import CsrfViewMiddleware from django.utils.translation import gettext_lazy as _ from rest_framework import HTTP_HEADER_ENCODING, exceptions def get_authorization_header(request): """ Return request's 'Authorization:' header, as a bytestring. Hide some test client ickyness where the header can be unicode. """ auth = request.META.get('HTTP_AUTHORIZATION', b'') if isinstance(auth, str): # Work around django test client oddness auth = auth.encode(HTTP_HEADER_ENCODING) return auth class CSRFCheck(CsrfViewMiddleware): def _reject(self, request, reason): # Return the failure reason instead of an HttpResponse return reason class BaseAuthentication: """ All authentication classes should extend BaseAuthentication. """ def authenticate(self, request): """ Authenticate the request and return a two-tuple of (user, token). """ raise NotImplementedError(".authenticate() must be overridden.") def authenticate_header(self, request): """ Return a string to be used as the value of the `WWW-Authenticate` header in a `401 Unauthenticated` response, or `None` if the authentication scheme should return `403 Permission Denied` responses. """ pass class BasicAuthentication(BaseAuthentication): """ HTTP Basic authentication against username/password. """ www_authenticate_realm = 'api' def authenticate(self, request): """ Returns a `User` if a correct username and password have been supplied using HTTP Basic authentication. Otherwise returns `None`. """ auth = get_authorization_header(request).split() if not auth or auth[0].lower() != b'basic': return None if len(auth) == 1: msg = _('Invalid basic header. No credentials provided.') raise exceptions.AuthenticationFailed(msg) elif len(auth) > 2: msg = _('Invalid basic header. Credentials string should not contain spaces.') raise exceptions.AuthenticationFailed(msg) try: try: auth_decoded = base64.b64decode(auth[1]).decode('utf-8') except UnicodeDecodeError: auth_decoded = base64.b64decode(auth[1]).decode('latin-1') auth_parts = auth_decoded.partition(':') except (TypeError, UnicodeDecodeError, binascii.Error): msg = _('Invalid basic header. Credentials not correctly base64 encoded.') raise exceptions.AuthenticationFailed(msg) userid, password = auth_parts[0], auth_parts[2] return self.authenticate_credentials(userid, password, request) def authenticate_credentials(self, userid, password, request=None): """ Authenticate the userid and password against username and password with optional request for context. """ credentials = { get_user_model().USERNAME_FIELD: userid, 'password': password } user = authenticate(request=request, **credentials) if user is None: raise exceptions.AuthenticationFailed(_('Invalid username/password.')) if not user.is_active: raise exceptions.AuthenticationFailed(_('User inactive or deleted.')) return (user, None) def authenticate_header(self, request): return 'Basic realm="%s"' % self.www_authenticate_realm class SessionAuthentication(BaseAuthentication): """ Use Django's session framework for authentication. """ def authenticate(self, request): """ Returns a `User` if the request session currently has a logged in user. Otherwise returns `None`. """ # Get the session-based user from the underlying HttpRequest object user = getattr(request._request, 'user', None) # Unauthenticated, CSRF validation not required if not user or not user.is_active: return None self.enforce_csrf(request) # CSRF passed with authenticated user return (user, None) def enforce_csrf(self, request): """ Enforce CSRF validation for session based authentication. """ def dummy_get_response(request): # pragma: no cover return None check = CSRFCheck(dummy_get_response) # populates request.META['CSRF_COOKIE'], which is used in process_view() check.process_request(request) reason = check.process_view(request, None, (), {}) if reason: # CSRF failed, bail with explicit error message raise exceptions.PermissionDenied('CSRF Failed: %s' % reason) class TokenAuthentication(BaseAuthentication): """ Simple token based authentication. Clients should authenticate by passing the token key in the "Authorization" HTTP header, prepended with the string "Token ". For example: Authorization: Token 401f7ac837da42b97f613d789819ff93537bee6a """ keyword = 'Token' model = None def get_model(self): if self.model is not None: return self.model from rest_framework.authtoken.models import Token return Token """ A custom token model may be used, but must have the following properties. * key -- The string identifying the token * user -- The user to which the token belongs """ def authenticate(self, request): auth = get_authorization_header(request).split() if not auth or auth[0].lower() != self.keyword.lower().encode(): return None if len(auth) == 1: msg = _('Invalid token header. No credentials provided.') raise exceptions.AuthenticationFailed(msg) elif len(auth) > 2: msg = _('Invalid token header. Token string should not contain spaces.') raise exceptions.AuthenticationFailed(msg) try: token = auth[1].decode() except UnicodeError: msg = _('Invalid token header. Token string should not contain invalid characters.') raise exceptions.AuthenticationFailed(msg) return self.authenticate_credentials(token) def authenticate_credentials(self, key): model = self.get_model() try: token = model.objects.select_related('user').get(key=key) except model.DoesNotExist: raise exceptions.AuthenticationFailed(_('Invalid token.')) if not token.user.is_active: raise exceptions.AuthenticationFailed(_('User inactive or deleted.')) return (token.user, token) def authenticate_header(self, request): return self.keyword class RemoteUserAuthentication(BaseAuthentication): """ REMOTE_USER authentication. To use this, set up your web server to perform authentication, which will set the REMOTE_USER environment variable. You will need to have 'django.contrib.auth.backends.RemoteUserBackend in your AUTHENTICATION_BACKENDS setting """ # Name of request header to grab username from. This will be the key as # used in the request.META dictionary, i.e. the normalization of headers to # all uppercase and the addition of "HTTP_" prefix apply. header = "REMOTE_USER" def authenticate(self, request): user = authenticate(request=request, remote_user=request.META.get(self.header)) if user and user.is_active: return (user, None)