Move testing to use multiprocessing for better reliability

We can also hopefully reuse this for LiveServerTestCase
Andrew Godwin 2018-02-19 20:58:47 -08:00
parent 173617ad3b
commit 853771ec95
4 changed files with 270 additions and 200 deletions
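
For orientation, the new daphne.testing module introduced below is intended to be driven from a test roughly like this. This is a minimal sketch based only on the API visible in this diff; the ASGI response messages and the assertions are illustrative, not part of the commit:

from http.client import HTTPConnection

from daphne.testing import DaphneTestingInstance

with DaphneTestingInstance() as test_app:
    # Queue the ASGI messages the test application should send back.
    test_app.add_send_messages([
        {"type": "http.response.start", "status": 200, "headers": []},
        {"type": "http.response.body", "body": b"hello"},
    ])
    # Talk to the live Daphne subprocess over plain HTTP.
    conn = HTTPConnection(test_app.host, test_app.port, timeout=5)
    conn.request("GET", "/")
    assert conn.getresponse().status == 200
    # Inspect what the application actually received.
    scope, messages = test_app.get_received()
    assert scope["method"] == "GET"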

daphne/test_application.py (deleted)
@@ -1,122 +0,0 @@
import logging
import os
import pickle
import tempfile
from concurrent.futures import CancelledError
class TestApplication:
"""
An application that receives one or more messages, sends a response,
and then quits the server. For testing.
"""
setup_storage = os.path.join(tempfile.gettempdir(), "setup.testio")
result_storage = os.path.join(tempfile.gettempdir(), "result.testio")
def __init__(self, scope):
self.scope = scope
self.messages = []
async def __call__(self, send, receive):
# Receive input and send output
logging.debug("test app coroutine alive")
try:
while True:
# Receive a message and save it into the result store
self.messages.append(await receive())
logging.debug("test app received %r", self.messages[-1])
self.save_result(self.scope, self.messages)
# See if there are any messages to send back
setup = self.load_setup()
self.delete_setup()
for message in setup["response_messages"]:
await send(message)
logging.debug("test app sent %r", message)
except Exception as e:
if isinstance(e, CancelledError):
# Don't catch task-cancelled errors!
raise
else:
self.save_exception(e)
@classmethod
def save_setup(cls, response_messages):
"""
Stores setup information.
"""
with open(cls.setup_storage, "wb") as fh:
pickle.dump(
{
"response_messages": response_messages,
},
fh,
)
@classmethod
def load_setup(cls):
"""
Returns setup details.
"""
try:
with open(cls.setup_storage, "rb") as fh:
return pickle.load(fh)
except FileNotFoundError:
return {"response_messages": []}
@classmethod
def save_result(cls, scope, messages):
"""
Saves details of what happened to the result storage.
We could use pickle here, but that seems wrong, still, somehow.
"""
with open(cls.result_storage, "wb") as fh:
pickle.dump(
{
"scope": scope,
"messages": messages,
},
fh,
)
@classmethod
def save_exception(cls, exception):
"""
Saves details of what happened to the result storage.
We could use pickle here, but that seems wrong, still, somehow.
"""
with open(cls.result_storage, "wb") as fh:
pickle.dump(
{
"exception": exception,
},
fh,
)
@classmethod
def load_result(cls):
"""
Returns result details.
"""
with open(cls.result_storage, "rb") as fh:
return pickle.load(fh)
@classmethod
def delete_setup(cls):
"""
Clears setup storage files.
"""
try:
os.unlink(cls.setup_storage)
except OSError:
pass
@classmethod
def delete_result(cls):
"""
Clears result storage files.
"""
try:
os.unlink(cls.result_storage)
except OSError:
pass

daphne/testing.py (new file, 268 lines)
@@ -0,0 +1,268 @@
import logging
import multiprocessing
import os
import pickle
import tempfile
import traceback
from concurrent.futures import CancelledError
from twisted.internet import reactor
from .endpoints import build_endpoint_description_strings
from .server import Server
class DaphneTestingInstance:
"""
Launches an instance of Daphne in a subprocess, with host and port
attributes allowing you to connect to it.
Works as a context manager.
"""
startup_timeout = 2
def __init__(self, xff=False, http_timeout=None):
self.xff = xff
self.http_timeout = http_timeout
self.host = "127.0.0.1"
def __enter__(self):
# Clear result storage
TestApplication.delete_setup()
TestApplication.delete_result()
# Optional Daphne features
kwargs = {}
# Optionally enable X-Forwarded-For support.
if self.xff:
kwargs["proxy_forwarded_address_header"] = "X-Forwarded-For"
kwargs["proxy_forwarded_port_header"] = "X-Forwarded-Port"
if self.http_timeout:
kwargs["http_timeout"] = self.http_timeout
# Start up process
self.process = DaphneProcess(
host=self.host,
application=TestApplication,
kwargs=kwargs,
setup=self.process_setup,
teardown=self.process_teardown,
)
self.process.start()
# Wait for the port
if self.process.ready.wait(self.startup_timeout):
self.port = self.process.port.value
return self
else:
if self.process.errors.empty():
raise RuntimeError("Daphne did not start up, no error caught")
else:
error, traceback = self.process.errors.get(False)
raise RuntimeError("Daphne did not start up:\n%s" % traceback)
def __exit__(self, exc_type, exc_value, traceback):
# Shut down the process
self.process.terminate()
del self.process
def process_setup(self):
"""
Called by the process just before it starts serving.
"""
pass
def process_teardown(self):
"""
Called by the process just after it stops serving.
"""
pass
def get_received(self):
"""
Returns the scope and messages the test application has received
so far. Note you'll get all messages since scope start, not just any
new ones since the last call.
Also checks for any exceptions in the application. If there are,
raises them.
"""
try:
inner_result = TestApplication.load_result()
except FileNotFoundError:
raise ValueError("No results available yet.")
# Check for exception
if "exception" in inner_result:
raise inner_result["exception"]
return inner_result["scope"], inner_result["messages"]
def add_send_messages(self, messages):
"""
Adds messages for the application to send back.
The next time it receives an incoming message, it will reply with these.
"""
TestApplication.save_setup(
response_messages=messages,
)
class DaphneProcess(multiprocessing.Process):
"""
Process subclass that launches and runs a Daphne instance, communicating the
port it ends up listening on back to the parent process.
"""
def __init__(self, host, application, kwargs=None, setup=None, teardown=None):
super().__init__()
self.host = host
self.application = application
self.kwargs = kwargs or {}
self.setup = setup or (lambda: None)
self.teardown = teardown or (lambda: None)
self.port = multiprocessing.Value("i")
self.ready = multiprocessing.Event()
self.errors = multiprocessing.Queue()
def run(self):
try:
# Create the server class
endpoints = build_endpoint_description_strings(host=self.host, port=0)
self.server = Server(
application=self.application,
endpoints=endpoints,
signal_handlers=False,
**self.kwargs
)
# Set up a poller to look for the port
reactor.callLater(0.1, self.resolve_port)
# Run with setup/teardown
self.setup()
try:
self.server.run()
finally:
self.teardown()
except Exception as e:
# Put the error on our queue so the parent gets it
self.errors.put((e, traceback.format_exc()))
def resolve_port(self):
if self.server.listening_addresses:
self.port.value = self.server.listening_addresses[0][1]
self.ready.set()
else:
reactor.callLater(0.1, self.resolve_port)
class TestApplication:
"""
An application that receives one or more messages, sends a response,
and then quits the server. For testing.
"""
setup_storage = os.path.join(tempfile.gettempdir(), "setup.testio")
result_storage = os.path.join(tempfile.gettempdir(), "result.testio")
def __init__(self, scope):
self.scope = scope
self.messages = []
async def __call__(self, send, receive):
# Receive input and send output
logging.debug("test app coroutine alive")
try:
while True:
# Receive a message and save it into the result store
self.messages.append(await receive())
logging.debug("test app received %r", self.messages[-1])
self.save_result(self.scope, self.messages)
# See if there are any messages to send back
setup = self.load_setup()
self.delete_setup()
for message in setup["response_messages"]:
await send(message)
logging.debug("test app sent %r", message)
except Exception as e:
if isinstance(e, CancelledError):
# Don't catch task-cancelled errors!
raise
else:
self.save_exception(e)
@classmethod
def save_setup(cls, response_messages):
"""
Stores setup information.
"""
with open(cls.setup_storage, "wb") as fh:
pickle.dump(
{
"response_messages": response_messages,
},
fh,
)
@classmethod
def load_setup(cls):
"""
Returns setup details.
"""
try:
with open(cls.setup_storage, "rb") as fh:
return pickle.load(fh)
except FileNotFoundError:
return {"response_messages": []}
@classmethod
def save_result(cls, scope, messages):
"""
Saves details of what happened to the result storage.
We could use pickle here, but that seems wrong, still, somehow.
"""
with open(cls.result_storage, "wb") as fh:
pickle.dump(
{
"scope": scope,
"messages": messages,
},
fh,
)
@classmethod
def save_exception(cls, exception):
"""
Saves details of what happened to the result storage.
We could use pickle here, but that seems wrong, still, somehow.
"""
with open(cls.result_storage, "wb") as fh:
pickle.dump(
{
"exception": exception,
},
fh,
)
@classmethod
def load_result(cls):
"""
Returns result details.
"""
with open(cls.result_storage, "rb") as fh:
return pickle.load(fh)
@classmethod
def delete_setup(cls):
"""
Clears setup storage files.
"""
try:
os.unlink(cls.setup_storage)
except OSError:
pass
@classmethod
def delete_result(cls):
"""
Clears result storage files.
"""
try:
os.unlink(cls.result_storage)
except OSError:
pass

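The commit message hopes this can also back LiveServerTestCase, and DaphneProcess itself is not tied to TestApplication: it can be handed any ASGI application. A rough sketch of that reuse, assuming a hypothetical myproject.asgi:application and noting that the application object must be importable/picklable in the child process:

from daphne.testing import DaphneProcess

from myproject.asgi import application  # hypothetical app, for illustration only

process = DaphneProcess(host="127.0.0.1", application=application)
process.start()
try:
    # The child binds to port 0 and reports the real port back via shared memory.
    if not process.ready.wait(2):
        raise RuntimeError("Daphne subprocess never reported a port")
    port = process.port.value
    # ... issue requests against 127.0.0.1:<port> here ...
finally:
    process.terminate()
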
@@ -1,87 +1,11 @@
import socket
import struct
-import subprocess
import time
import unittest
from http.client import HTTPConnection
from urllib import parse
-from daphne.test_application import TestApplication
+from daphne.testing import DaphneTestingInstance, TestApplication
class DaphneTestingInstance:
"""
Launches an instance of Daphne to test against, with an application
object you can read messages from and feed messages to.
Works as a context manager.
"""
def __init__(self, xff=False, http_timeout=None):
self.xff = xff
self.http_timeout = http_timeout
self.host = "127.0.0.1"
def __enter__(self):
# Clear result storage
TestApplication.delete_setup()
TestApplication.delete_result()
# Tell Daphne to use port 0 so the OS gives it a free port
daphne_args = ["daphne", "-p", "0", "-v", "1"]
# Optionally enable X-Forwarded-For support.
if self.xff:
daphne_args += ["--proxy-headers"]
if self.http_timeout:
daphne_args += ["--http-timeout=%i" % self.http_timeout]
# Start up process
self.process = subprocess.Popen(
daphne_args + ["daphne.test_application:TestApplication"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
# Read the port from its stdout
stdout = b""
for line in self.process.stdout:
stdout += line
if b"Listening on TCP address " in line:
self.port = int(line.split(b"TCP address ")[1].split(b":")[1].strip())
return self
else:
# Daphne didn't start up right :(
raise RuntimeError("Daphne never listened on a port. Output: \n%s" % stdout)
def __exit__(self, exc_type, exc_value, traceback):
# Shut down the process
self.process.terminate()
del self.process
def get_received(self):
"""
Returns the scope and messages the test application has received
so far. Note you'll get all messages since scope start, not just any
new ones since the last call.
Also checks for any exceptions in the application. If there are,
raises them.
"""
try:
inner_result = TestApplication.load_result()
except FileNotFoundError:
raise ValueError("No results available yet.")
# Check for exception
if "exception" in inner_result:
raise inner_result["exception"]
return inner_result["scope"], inner_result["messages"]
def add_send_messages(self, messages):
"""
Adds messages for the application to send back.
The next time it receives an incoming message, it will reply with these.
"""
TestApplication.save_setup(
response_messages=messages,
)
class DaphneTestCase(unittest.TestCase):

@@ -161,7 +161,7 @@ class TestHTTPRequest(DaphneTestCase):
request_headers=http_strategies.headers(),
request_body=http_strategies.http_body(),
)
-@settings(max_examples=5, deadline=5000)
+@settings(max_examples=2, deadline=5000)
def test_kitchen_sink(
self,
request_method,