# -- coding: utf-8 -- # Note that we import as `DjangoRequestFactory` and `DjangoClient` in order # to make it harder for the user to import the wrong thing without realizing. from __future__ import unicode_literals import io from django.conf import settings from django.core.exceptions import ImproperlyConfigured from django.core.handlers.wsgi import WSGIHandler from django.test import testcases from django.test.client import Client as DjangoClient from django.test.client import RequestFactory as DjangoRequestFactory from django.test.client import ClientHandler from django.utils import six from django.utils.encoding import force_bytes from django.utils.http import urlencode from rest_framework.compat import coreapi, requests from rest_framework.settings import api_settings def force_authenticate(request, user=None, token=None): request._force_auth_user = user request._force_auth_token = token if requests is not None: class HeaderDict(requests.packages.urllib3._collections.HTTPHeaderDict): def get_all(self, key, default): return self.getheaders(key) class MockOriginalResponse(object): def __init__(self, headers): self.msg = HeaderDict(headers) self.closed = False def isclosed(self): return self.closed def close(self): self.closed = True class DjangoTestAdapter(requests.adapters.HTTPAdapter): """ A transport adapter for `requests`, that makes requests via the Django WSGI app, rather than making actual HTTP requests over the network. """ def __init__(self): self.app = WSGIHandler() self.factory = DjangoRequestFactory() def get_environ(self, request): """ Given a `requests.PreparedRequest` instance, return a WSGI environ dict. """ method = request.method url = request.url kwargs = {} # Set request content, if any exists. if request.body is not None: if hasattr(request.body, 'read'): kwargs['data'] = request.body.read() else: kwargs['data'] = request.body if 'content-type' in request.headers: kwargs['content_type'] = request.headers['content-type'] # Set request headers. for key, value in request.headers.items(): key = key.upper() if key in ('CONNECTION', 'CONTENT-LENGTH', 'CONTENT-TYPE'): continue kwargs['HTTP_%s' % key.replace('-', '_')] = value return self.factory.generic(method, url, **kwargs).environ def send(self, request, *args, **kwargs): """ Make an outgoing request to the Django WSGI application. """ raw_kwargs = {} def start_response(wsgi_status, wsgi_headers): status, _, reason = wsgi_status.partition(' ') raw_kwargs['status'] = int(status) raw_kwargs['reason'] = reason raw_kwargs['headers'] = wsgi_headers raw_kwargs['version'] = 11 raw_kwargs['preload_content'] = False raw_kwargs['original_response'] = MockOriginalResponse(wsgi_headers) # Make the outgoing request via WSGI. environ = self.get_environ(request) wsgi_response = self.app(environ, start_response) # Build the underlying urllib3.HTTPResponse raw_kwargs['body'] = io.BytesIO(b''.join(wsgi_response)) raw = requests.packages.urllib3.HTTPResponse(**raw_kwargs) # Build the requests.Response return self.build_response(request, raw) def close(self): pass class RequestsClient(requests.Session): def __init__(self, *args, **kwargs): super(RequestsClient, self).__init__(*args, **kwargs) adapter = DjangoTestAdapter() self.mount('http://', adapter) self.mount('https://', adapter) def request(self, method, url, *args, **kwargs): if ':' not in url: raise ValueError('Missing "http:" or "https:". Use a fully qualified URL, eg "http://testserver%s"' % url) return super(RequestsClient, self).request(method, url, *args, **kwargs) else: def RequestsClient(*args, **kwargs): raise ImproperlyConfigured('requests must be installed in order to use RequestsClient.') if coreapi is not None: class CoreAPIClient(coreapi.Client): def __init__(self, *args, **kwargs): self._session = RequestsClient() kwargs['transports'] = [coreapi.transports.HTTPTransport(session=self.session)] return super(CoreAPIClient, self).__init__(*args, **kwargs) @property def session(self): return self._session else: def CoreAPIClient(*args, **kwargs): raise ImproperlyConfigured('coreapi must be installed in order to use CoreAPIClient.') class APIRequestFactory(DjangoRequestFactory): renderer_classes_list = api_settings.TEST_REQUEST_RENDERER_CLASSES default_format = api_settings.TEST_REQUEST_DEFAULT_FORMAT def __init__(self, enforce_csrf_checks=False, **defaults): self.enforce_csrf_checks = enforce_csrf_checks self.renderer_classes = {} for cls in self.renderer_classes_list: self.renderer_classes[cls.format] = cls super(APIRequestFactory, self).__init__(**defaults) def _encode_data(self, data, format=None, content_type=None): """ Encode the data returning a two tuple of (bytes, content_type) """ if data is None: return ('', content_type) assert format is None or content_type is None, ( 'You may not set both `format` and `content_type`.' ) if content_type: # Content type specified explicitly, treat data as a raw bytestring ret = force_bytes(data, settings.DEFAULT_CHARSET) else: format = format or self.default_format assert format in self.renderer_classes, ( "Invalid format '{0}'. Available formats are {1}. " "Set TEST_REQUEST_RENDERER_CLASSES to enable " "extra request formats.".format( format, ', '.join(["'" + fmt + "'" for fmt in self.renderer_classes.keys()]) ) ) # Use format and render the data into a bytestring renderer = self.renderer_classes[format]() ret = renderer.render(data) # Determine the content-type header from the renderer content_type = "{0}; charset={1}".format( renderer.media_type, renderer.charset ) # Coerce text to bytes if required. if isinstance(ret, six.text_type): ret = bytes(ret.encode(renderer.charset)) return ret, content_type def get(self, path, data=None, **extra): r = { 'QUERY_STRING': urlencode(data or {}, doseq=True), } if not data and '?' in path: # Fix to support old behavior where you have the arguments in the # url. See #1461. query_string = force_bytes(path.split('?')[1]) if six.PY3: query_string = query_string.decode('iso-8859-1') r['QUERY_STRING'] = query_string r.update(extra) return self.generic('GET', path, **r) def post(self, path, data=None, format=None, content_type=None, **extra): data, content_type = self._encode_data(data, format, content_type) return self.generic('POST', path, data, content_type, **extra) def put(self, path, data=None, format=None, content_type=None, **extra): data, content_type = self._encode_data(data, format, content_type) return self.generic('PUT', path, data, content_type, **extra) def patch(self, path, data=None, format=None, content_type=None, **extra): data, content_type = self._encode_data(data, format, content_type) return self.generic('PATCH', path, data, content_type, **extra) def delete(self, path, data=None, format=None, content_type=None, **extra): data, content_type = self._encode_data(data, format, content_type) return self.generic('DELETE', path, data, content_type, **extra) def options(self, path, data=None, format=None, content_type=None, **extra): data, content_type = self._encode_data(data, format, content_type) return self.generic('OPTIONS', path, data, content_type, **extra) def request(self, **kwargs): request = super(APIRequestFactory, self).request(**kwargs) request._dont_enforce_csrf_checks = not self.enforce_csrf_checks return request class ForceAuthClientHandler(ClientHandler): """ A patched version of ClientHandler that can enforce authentication on the outgoing requests. """ def __init__(self, *args, **kwargs): self._force_user = None self._force_token = None super(ForceAuthClientHandler, self).__init__(*args, **kwargs) def get_response(self, request): # This is the simplest place we can hook into to patch the # request object. force_authenticate(request, self._force_user, self._force_token) return super(ForceAuthClientHandler, self).get_response(request) class APIClient(APIRequestFactory, DjangoClient): def __init__(self, enforce_csrf_checks=False, **defaults): super(APIClient, self).__init__(**defaults) self.handler = ForceAuthClientHandler(enforce_csrf_checks) self._credentials = {} def credentials(self, **kwargs): """ Sets headers that will be used on every outgoing request. """ self._credentials = kwargs def force_authenticate(self, user=None, token=None): """ Forcibly authenticates outgoing requests with the given user and/or token. """ self.handler._force_user = user self.handler._force_token = token if user is None: self.logout() # Also clear any possible session info if required def request(self, **kwargs): # Ensure that any credentials set get added to every request. kwargs.update(self._credentials) return super(APIClient, self).request(**kwargs) def get(self, path, data=None, follow=False, **extra): response = super(APIClient, self).get(path, data=data, **extra) if follow: response = self._handle_redirects(response, **extra) return response def post(self, path, data=None, format=None, content_type=None, follow=False, **extra): response = super(APIClient, self).post( path, data=data, format=format, content_type=content_type, **extra) if follow: response = self._handle_redirects(response, **extra) return response def put(self, path, data=None, format=None, content_type=None, follow=False, **extra): response = super(APIClient, self).put( path, data=data, format=format, content_type=content_type, **extra) if follow: response = self._handle_redirects(response, **extra) return response def patch(self, path, data=None, format=None, content_type=None, follow=False, **extra): response = super(APIClient, self).patch( path, data=data, format=format, content_type=content_type, **extra) if follow: response = self._handle_redirects(response, **extra) return response def delete(self, path, data=None, format=None, content_type=None, follow=False, **extra): response = super(APIClient, self).delete( path, data=data, format=format, content_type=content_type, **extra) if follow: response = self._handle_redirects(response, **extra) return response def options(self, path, data=None, format=None, content_type=None, follow=False, **extra): response = super(APIClient, self).options( path, data=data, format=format, content_type=content_type, **extra) if follow: response = self._handle_redirects(response, **extra) return response def logout(self): self._credentials = {} # Also clear any `force_authenticate` self.handler._force_user = None self.handler._force_token = None if self.session: super(APIClient, self).logout() class APITransactionTestCase(testcases.TransactionTestCase): client_class = APIClient class APITestCase(testcases.TestCase): client_class = APIClient class APISimpleTestCase(testcases.SimpleTestCase): client_class = APIClient class APILiveServerTestCase(testcases.LiveServerTestCase): client_class = APIClient