Added request body chunking (#335)

The entire body was previously read in memory which would lead
the server to be killed by the scheduler.
This change allows 8Kb chunks to be read until the entire body is
consummed.

Co-authored-by: Samori Gorse <samori@codeinstyle.io>
This commit is contained in:
Samori Gorse 2020-10-21 16:38:03 +02:00 committed by GitHub
parent b96720390f
commit e1b77e930b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 68 additions and 9 deletions

View File

@ -112,7 +112,7 @@ should start with a slash, but not end with one; for example::
Python Support
--------------
Daphne requires Python 3.5 or later.
Daphne requires Python 3.6 or later.
Contributing

View File

@ -185,9 +185,19 @@ class WebRequest(http.Request):
# Not much we can do, the request is prematurely abandoned.
return
# Run application against request
self.application_queue.put_nowait(
{"type": "http.request", "body": self.content.read()}
)
buffer_size = self.server.request_buffer_size
while True:
chunk = self.content.read(buffer_size)
more_body = not (len(chunk) < buffer_size)
payload = {
"type": "http.request",
"body": chunk,
"more_body": more_body,
}
self.application_queue.put_nowait(payload)
if not more_body:
break
except Exception:
logger.error(traceback.format_exc())
self.basic_error(

View File

@ -42,6 +42,7 @@ class Server(object):
signal_handlers=True,
action_logger=None,
http_timeout=None,
request_buffer_size=8192,
websocket_timeout=86400,
websocket_connect_timeout=20,
ping_interval=20,
@ -67,6 +68,7 @@ class Server(object):
self.http_timeout = http_timeout
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.request_buffer_size = request_buffer_size
self.proxy_forwarded_address_header = proxy_forwarded_address_header
self.proxy_forwarded_port_header = proxy_forwarded_port_header
self.proxy_forwarded_proto_header = proxy_forwarded_proto_header

View File

@ -18,11 +18,12 @@ class DaphneTestingInstance:
startup_timeout = 2
def __init__(self, xff=False, http_timeout=None):
def __init__(self, xff=False, http_timeout=None, request_buffer_size=None):
self.xff = xff
self.http_timeout = http_timeout
self.host = "127.0.0.1"
self.lock = multiprocessing.Lock()
self.request_buffer_size = request_buffer_size
def __enter__(self):
# Clear result storage
@ -30,6 +31,8 @@ class DaphneTestingInstance:
TestApplication.delete_result()
# Option Daphne features
kwargs = {}
if self.request_buffer_size:
kwargs["request_buffer_size"] = self.request_buffer_size
# Optionally enable X-Forwarded-For support.
if self.xff:
kwargs["proxy_forwarded_address_header"] = "X-Forwarded-For"

View File

@ -23,6 +23,7 @@ setup(
packages=find_packages() + ["twisted.plugins"],
include_package_data=True,
install_requires=["twisted[tls]>=18.7", "autobahn>=0.18", "asgiref~=3.2"],
python_requires='>=3.6',
setup_requires=["pytest-runner"],
extras_require={
"tests": ["hypothesis==4.23", "pytest~=3.10", "pytest-asyncio~=0.8"]
@ -38,10 +39,10 @@ setup(
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Topic :: Internet :: WWW/HTTP",
],
)

View File

@ -20,13 +20,24 @@ class DaphneTestCase(unittest.TestCase):
### Plain HTTP helpers
def run_daphne_http(
self, method, path, params, body, responses, headers=None, timeout=1, xff=False
self,
method,
path,
params,
body,
responses,
headers=None,
timeout=1,
xff=False,
request_buffer_size=None,
):
"""
Runs Daphne with the given request callback (given the base URL)
and response messages.
"""
with DaphneTestingInstance(xff=xff) as test_app:
with DaphneTestingInstance(
xff=xff, request_buffer_size=request_buffer_size
) as test_app:
# Add the response messages
test_app.add_send_messages(responses)
# Send it the request. We have to do this the long way to allow
@ -79,7 +90,14 @@ class DaphneTestCase(unittest.TestCase):
)
def run_daphne_request(
self, method, path, params=None, body=None, headers=None, xff=False
self,
method,
path,
params=None,
body=None,
headers=None,
xff=False,
request_buffer_size=None,
):
"""
Convenience method for just testing request handling.
@ -92,6 +110,7 @@ class DaphneTestCase(unittest.TestCase):
body=body,
headers=headers,
xff=xff,
request_buffer_size=request_buffer_size,
responses=[
{"type": "http.response.start", "status": 200},
{"type": "http.response.body", "body": b"OK"},

View File

@ -6,6 +6,7 @@ from urllib import parse
import http_strategies
from http_base import DaphneTestCase
from hypothesis import assume, given, settings
from hypothesis.strategies import integers
class TestHTTPRequest(DaphneTestCase):
@ -119,6 +120,29 @@ class TestHTTPRequest(DaphneTestCase):
self.assert_valid_http_scope(scope, "GET", request_path, params=request_params)
self.assert_valid_http_request_message(messages[0], body=b"")
@given(
request_path=http_strategies.http_path(),
chunk_size=integers(min_value=1),
)
@settings(max_examples=5, deadline=5000)
def test_request_body_chunking(self, request_path, chunk_size):
"""
Tests request body chunking logic.
"""
body = b"The quick brown fox jumps over the lazy dog"
_, messages = self.run_daphne_request(
"POST",
request_path,
body=body,
request_buffer_size=chunk_size,
)
# Avoid running those asserts when there's a single "http.disconnect"
if len(messages) > 1:
assert messages[0]["body"].decode() == body.decode()[:chunk_size]
assert not messages[-2]["more_body"]
assert messages[-1] == {"type": "http.disconnect"}
@given(
request_path=http_strategies.http_path(),
request_body=http_strategies.http_body(),