Make _stream work on AsgiRequest

This commit is contained in:
Andrew Godwin 2016-02-10 19:15:00 +00:00
parent 899e180c21
commit 1a7010aa2c
2 changed files with 23 additions and 0 deletions

View File

@ -2,6 +2,7 @@ from __future__ import unicode_literals
import sys
import logging
from io import BytesIO
from threading import Lock
from django import http
@ -89,6 +90,8 @@ class AsgiRequest(http.HttpRequest):
if not chunk.get("more_content", False):
break
assert isinstance(self._body, six.binary_type), "Body is not bytes"
# Add a stream-a-like for the body
self._stream = BytesIO(self._body)
# Other bits
self.resolver_match = None

View File

@ -176,3 +176,23 @@ class RequestTests(SimpleTestCase):
self.assertFalse(request._post_parse_error)
self.assertEqual(request.POST["title"], "My First Book")
self.assertEqual(request.FILES["pdf"].read(), b"FAKEPDFBYTESGOHERE")
def test_stream(self):
"""
Tests the body stream is emulated correctly.
"""
message = self.make_message({
"reply_channel": "test",
"http_version": "1.1",
"method": "PUT",
"path": b"/",
"body": b"onetwothree",
"headers": {
"host": b"example.com",
"content-length": b"11",
},
}, "test")
request = AsgiRequest(message)
self.assertEqual(request.method, "PUT")
self.assertEqual(request.read(3), b"one")
self.assertEqual(request.read(), b"twothree")