mirror of
https://github.com/encode/django-rest-framework.git
synced 2024-11-10 19:56:59 +03:00
352 lines
13 KiB
Python
352 lines
13 KiB
Python
# -- coding: utf-8 --
|
|
|
|
# Note that we import as `DjangoRequestFactory` and `DjangoClient` in order
|
|
# to make it harder for the user to import the wrong thing without realizing.
|
|
from __future__ import unicode_literals
|
|
|
|
import io
|
|
|
|
from django.conf import settings
|
|
from django.core.exceptions import ImproperlyConfigured
|
|
from django.core.handlers.wsgi import WSGIHandler
|
|
from django.test import testcases
|
|
from django.test.client import Client as DjangoClient
|
|
from django.test.client import RequestFactory as DjangoRequestFactory
|
|
from django.test.client import ClientHandler
|
|
from django.utils import six
|
|
from django.utils.encoding import force_bytes
|
|
from django.utils.http import urlencode
|
|
|
|
from rest_framework.compat import coreapi, requests
|
|
from rest_framework.settings import api_settings
|
|
|
|
|
|
def force_authenticate(request, user=None, token=None):
|
|
request._force_auth_user = user
|
|
request._force_auth_token = token
|
|
|
|
|
|
if requests is not None:
|
|
class HeaderDict(requests.packages.urllib3._collections.HTTPHeaderDict):
|
|
def get_all(self, key, default):
|
|
return self.getheaders(key)
|
|
|
|
class MockOriginalResponse(object):
|
|
def __init__(self, headers):
|
|
self.msg = HeaderDict(headers)
|
|
self.closed = False
|
|
|
|
def isclosed(self):
|
|
return self.closed
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
class DjangoTestAdapter(requests.adapters.HTTPAdapter):
|
|
"""
|
|
A transport adapter for `requests`, that makes requests via the
|
|
Django WSGI app, rather than making actual HTTP requests over the network.
|
|
"""
|
|
def __init__(self):
|
|
self.app = WSGIHandler()
|
|
self.factory = DjangoRequestFactory()
|
|
|
|
def get_environ(self, request):
|
|
"""
|
|
Given a `requests.PreparedRequest` instance, return a WSGI environ dict.
|
|
"""
|
|
method = request.method
|
|
url = request.url
|
|
kwargs = {}
|
|
|
|
# Set request content, if any exists.
|
|
if request.body is not None:
|
|
if hasattr(request.body, 'read'):
|
|
kwargs['data'] = request.body.read()
|
|
else:
|
|
kwargs['data'] = request.body
|
|
if 'content-type' in request.headers:
|
|
kwargs['content_type'] = request.headers['content-type']
|
|
|
|
# Set request headers.
|
|
for key, value in request.headers.items():
|
|
key = key.upper()
|
|
if key in ('CONNECTION', 'CONTENT-LENGTH', 'CONTENT-TYPE'):
|
|
continue
|
|
kwargs['HTTP_%s' % key.replace('-', '_')] = value
|
|
|
|
return self.factory.generic(method, url, **kwargs).environ
|
|
|
|
def send(self, request, *args, **kwargs):
|
|
"""
|
|
Make an outgoing request to the Django WSGI application.
|
|
"""
|
|
raw_kwargs = {}
|
|
|
|
def start_response(wsgi_status, wsgi_headers):
|
|
status, _, reason = wsgi_status.partition(' ')
|
|
raw_kwargs['status'] = int(status)
|
|
raw_kwargs['reason'] = reason
|
|
raw_kwargs['headers'] = wsgi_headers
|
|
raw_kwargs['version'] = 11
|
|
raw_kwargs['preload_content'] = False
|
|
raw_kwargs['original_response'] = MockOriginalResponse(wsgi_headers)
|
|
|
|
# Make the outgoing request via WSGI.
|
|
environ = self.get_environ(request)
|
|
wsgi_response = self.app(environ, start_response)
|
|
|
|
# Build the underlying urllib3.HTTPResponse
|
|
raw_kwargs['body'] = io.BytesIO(b''.join(wsgi_response))
|
|
raw = requests.packages.urllib3.HTTPResponse(**raw_kwargs)
|
|
|
|
# Build the requests.Response
|
|
return self.build_response(request, raw)
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
class RequestsClient(requests.Session):
|
|
def __init__(self, *args, **kwargs):
|
|
super(RequestsClient, self).__init__(*args, **kwargs)
|
|
adapter = DjangoTestAdapter()
|
|
self.mount('http://', adapter)
|
|
self.mount('https://', adapter)
|
|
|
|
def request(self, method, url, *args, **kwargs):
|
|
if not url.startswith('http'):
|
|
raise ValueError('Missing "http:" or "https:". Use a fully qualified URL, eg "http://testserver%s"' % url)
|
|
return super(RequestsClient, self).request(method, url, *args, **kwargs)
|
|
|
|
else:
|
|
def RequestsClient(*args, **kwargs):
|
|
raise ImproperlyConfigured('requests must be installed in order to use RequestsClient.')
|
|
|
|
|
|
if coreapi is not None:
|
|
class CoreAPIClient(coreapi.Client):
|
|
def __init__(self, *args, **kwargs):
|
|
self._session = RequestsClient()
|
|
kwargs['transports'] = [coreapi.transports.HTTPTransport(session=self.session)]
|
|
return super(CoreAPIClient, self).__init__(*args, **kwargs)
|
|
|
|
@property
|
|
def session(self):
|
|
return self._session
|
|
|
|
else:
|
|
def CoreAPIClient(*args, **kwargs):
|
|
raise ImproperlyConfigured('coreapi must be installed in order to use CoreAPIClient.')
|
|
|
|
|
|
class APIRequestFactory(DjangoRequestFactory):
|
|
renderer_classes_list = api_settings.TEST_REQUEST_RENDERER_CLASSES
|
|
default_format = api_settings.TEST_REQUEST_DEFAULT_FORMAT
|
|
|
|
def __init__(self, enforce_csrf_checks=False, **defaults):
|
|
self.enforce_csrf_checks = enforce_csrf_checks
|
|
self.renderer_classes = {}
|
|
for cls in self.renderer_classes_list:
|
|
self.renderer_classes[cls.format] = cls
|
|
super(APIRequestFactory, self).__init__(**defaults)
|
|
|
|
def _encode_data(self, data, format=None, content_type=None):
|
|
"""
|
|
Encode the data returning a two tuple of (bytes, content_type)
|
|
"""
|
|
|
|
if data is None:
|
|
return ('', content_type)
|
|
|
|
assert format is None or content_type is None, (
|
|
'You may not set both `format` and `content_type`.'
|
|
)
|
|
|
|
if content_type:
|
|
# Content type specified explicitly, treat data as a raw bytestring
|
|
ret = force_bytes(data, settings.DEFAULT_CHARSET)
|
|
|
|
else:
|
|
format = format or self.default_format
|
|
|
|
assert format in self.renderer_classes, (
|
|
"Invalid format '{0}'. Available formats are {1}. "
|
|
"Set TEST_REQUEST_RENDERER_CLASSES to enable "
|
|
"extra request formats.".format(
|
|
format,
|
|
', '.join(["'" + fmt + "'" for fmt in self.renderer_classes.keys()])
|
|
)
|
|
)
|
|
|
|
# Use format and render the data into a bytestring
|
|
renderer = self.renderer_classes[format]()
|
|
ret = renderer.render(data)
|
|
|
|
# Determine the content-type header from the renderer
|
|
content_type = "{0}; charset={1}".format(
|
|
renderer.media_type, renderer.charset
|
|
)
|
|
|
|
# Coerce text to bytes if required.
|
|
if isinstance(ret, six.text_type):
|
|
ret = bytes(ret.encode(renderer.charset))
|
|
|
|
return ret, content_type
|
|
|
|
def get(self, path, data=None, **extra):
|
|
r = {
|
|
'QUERY_STRING': urlencode(data or {}, doseq=True),
|
|
}
|
|
if not data and '?' in path:
|
|
# Fix to support old behavior where you have the arguments in the
|
|
# url. See #1461.
|
|
query_string = force_bytes(path.split('?')[1])
|
|
if six.PY3:
|
|
query_string = query_string.decode('iso-8859-1')
|
|
r['QUERY_STRING'] = query_string
|
|
r.update(extra)
|
|
return self.generic('GET', path, **r)
|
|
|
|
def post(self, path, data=None, format=None, content_type=None, **extra):
|
|
data, content_type = self._encode_data(data, format, content_type)
|
|
return self.generic('POST', path, data, content_type, **extra)
|
|
|
|
def put(self, path, data=None, format=None, content_type=None, **extra):
|
|
data, content_type = self._encode_data(data, format, content_type)
|
|
return self.generic('PUT', path, data, content_type, **extra)
|
|
|
|
def patch(self, path, data=None, format=None, content_type=None, **extra):
|
|
data, content_type = self._encode_data(data, format, content_type)
|
|
return self.generic('PATCH', path, data, content_type, **extra)
|
|
|
|
def delete(self, path, data=None, format=None, content_type=None, **extra):
|
|
data, content_type = self._encode_data(data, format, content_type)
|
|
return self.generic('DELETE', path, data, content_type, **extra)
|
|
|
|
def options(self, path, data=None, format=None, content_type=None, **extra):
|
|
data, content_type = self._encode_data(data, format, content_type)
|
|
return self.generic('OPTIONS', path, data, content_type, **extra)
|
|
|
|
def request(self, **kwargs):
|
|
request = super(APIRequestFactory, self).request(**kwargs)
|
|
request._dont_enforce_csrf_checks = not self.enforce_csrf_checks
|
|
return request
|
|
|
|
|
|
class ForceAuthClientHandler(ClientHandler):
|
|
"""
|
|
A patched version of ClientHandler that can enforce authentication
|
|
on the outgoing requests.
|
|
"""
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
self._force_user = None
|
|
self._force_token = None
|
|
super(ForceAuthClientHandler, self).__init__(*args, **kwargs)
|
|
|
|
def get_response(self, request):
|
|
# This is the simplest place we can hook into to patch the
|
|
# request object.
|
|
force_authenticate(request, self._force_user, self._force_token)
|
|
return super(ForceAuthClientHandler, self).get_response(request)
|
|
|
|
|
|
class APIClient(APIRequestFactory, DjangoClient):
|
|
def __init__(self, enforce_csrf_checks=False, **defaults):
|
|
super(APIClient, self).__init__(**defaults)
|
|
self.handler = ForceAuthClientHandler(enforce_csrf_checks)
|
|
self._credentials = {}
|
|
|
|
def credentials(self, **kwargs):
|
|
"""
|
|
Sets headers that will be used on every outgoing request.
|
|
"""
|
|
self._credentials = kwargs
|
|
|
|
def force_authenticate(self, user=None, token=None):
|
|
"""
|
|
Forcibly authenticates outgoing requests with the given
|
|
user and/or token.
|
|
"""
|
|
self.handler._force_user = user
|
|
self.handler._force_token = token
|
|
if user is None:
|
|
self.logout() # Also clear any possible session info if required
|
|
|
|
def request(self, **kwargs):
|
|
# Ensure that any credentials set get added to every request.
|
|
kwargs.update(self._credentials)
|
|
return super(APIClient, self).request(**kwargs)
|
|
|
|
def get(self, path, data=None, follow=False, **extra):
|
|
response = super(APIClient, self).get(path, data=data, **extra)
|
|
if follow:
|
|
response = self._handle_redirects(response, **extra)
|
|
return response
|
|
|
|
def post(self, path, data=None, format=None, content_type=None,
|
|
follow=False, **extra):
|
|
response = super(APIClient, self).post(
|
|
path, data=data, format=format, content_type=content_type, **extra)
|
|
if follow:
|
|
response = self._handle_redirects(response, **extra)
|
|
return response
|
|
|
|
def put(self, path, data=None, format=None, content_type=None,
|
|
follow=False, **extra):
|
|
response = super(APIClient, self).put(
|
|
path, data=data, format=format, content_type=content_type, **extra)
|
|
if follow:
|
|
response = self._handle_redirects(response, **extra)
|
|
return response
|
|
|
|
def patch(self, path, data=None, format=None, content_type=None,
|
|
follow=False, **extra):
|
|
response = super(APIClient, self).patch(
|
|
path, data=data, format=format, content_type=content_type, **extra)
|
|
if follow:
|
|
response = self._handle_redirects(response, **extra)
|
|
return response
|
|
|
|
def delete(self, path, data=None, format=None, content_type=None,
|
|
follow=False, **extra):
|
|
response = super(APIClient, self).delete(
|
|
path, data=data, format=format, content_type=content_type, **extra)
|
|
if follow:
|
|
response = self._handle_redirects(response, **extra)
|
|
return response
|
|
|
|
def options(self, path, data=None, format=None, content_type=None,
|
|
follow=False, **extra):
|
|
response = super(APIClient, self).options(
|
|
path, data=data, format=format, content_type=content_type, **extra)
|
|
if follow:
|
|
response = self._handle_redirects(response, **extra)
|
|
return response
|
|
|
|
def logout(self):
|
|
self._credentials = {}
|
|
|
|
# Also clear any `force_authenticate`
|
|
self.handler._force_user = None
|
|
self.handler._force_token = None
|
|
|
|
if self.session:
|
|
super(APIClient, self).logout()
|
|
|
|
|
|
class APITransactionTestCase(testcases.TransactionTestCase):
|
|
client_class = APIClient
|
|
|
|
|
|
class APITestCase(testcases.TestCase):
|
|
client_class = APIClient
|
|
|
|
|
|
class APISimpleTestCase(testcases.SimpleTestCase):
|
|
client_class = APIClient
|
|
|
|
|
|
class APILiveServerTestCase(testcases.LiveServerTestCase):
|
|
client_class = APIClient
|