import logging import multiprocessing import os import pickle import tempfile import traceback from concurrent.futures import CancelledError from twisted.internet import reactor from .endpoints import build_endpoint_description_strings from .server import Server class DaphneTestingInstance: """ Launches an instance of Daphne in a subprocess, with a host and port attribute allowing you to call it. Works as a context manager. """ startup_timeout = 2 def __init__(self, xff=False, http_timeout=None): self.xff = xff self.http_timeout = http_timeout self.host = "127.0.0.1" def __enter__(self): # Clear result storage TestApplication.delete_setup() TestApplication.delete_result() # Option Daphne features kwargs = {} # Optionally enable X-Forwarded-For support. if self.xff: kwargs["proxy_forwarded_address_header"] = "X-Forwarded-For" kwargs["proxy_forwarded_port_header"] = "X-Forwarded-Port" if self.http_timeout: kwargs["http_timeout"] = self.http_timeout # Start up process self.process = DaphneProcess( host=self.host, application=TestApplication, kwargs=kwargs, setup=self.process_setup, teardown=self.process_teardown, ) self.process.start() # Wait for the port if self.process.ready.wait(self.startup_timeout): self.port = self.process.port.value return self else: if self.process.errors.empty(): raise RuntimeError("Daphne did not start up, no error caught") else: error, traceback = self.process.errors.get(False) raise RuntimeError("Daphne did not start up:\n%s" % traceback) def __exit__(self, exc_type, exc_value, traceback): # Shut down the process self.process.terminate() del self.process def process_setup(self): """ Called by the process just before it starts serving. """ pass def process_teardown(self): """ Called by the process just after it stops serving """ pass def get_received(self): """ Returns the scope and messages the test application has received so far. Note you'll get all messages since scope start, not just any new ones since the last call. Also checks for any exceptions in the application. If there are, raises them. """ try: inner_result = TestApplication.load_result() except FileNotFoundError: raise ValueError("No results available yet.") # Check for exception if "exception" in inner_result: raise inner_result["exception"] return inner_result["scope"], inner_result["messages"] def add_send_messages(self, messages): """ Adds messages for the application to send back. The next time it receives an incoming message, it will reply with these. """ TestApplication.save_setup( response_messages=messages, ) class DaphneProcess(multiprocessing.Process): """ Process subclass that launches and runs a Daphne instance, communicating the port it ends up listening on back to the parent process. """ def __init__(self, host, application, kwargs=None, setup=None, teardown=None): super().__init__() self.host = host self.application = application self.kwargs = kwargs or {} self.setup = setup or (lambda: None) self.teardown = teardown or (lambda: None) self.port = multiprocessing.Value("i") self.ready = multiprocessing.Event() self.errors = multiprocessing.Queue() def run(self): try: # Create the server class endpoints = build_endpoint_description_strings(host=self.host, port=0) self.server = Server( application=self.application, endpoints=endpoints, signal_handlers=False, **self.kwargs ) # Set up a poller to look for the port reactor.callLater(0.1, self.resolve_port) # Run with setup/teardown self.setup() try: self.server.run() finally: self.teardown() except Exception as e: # Put the error on our queue so the parent gets it self.errors.put((e, traceback.format_exc())) def resolve_port(self): if self.server.listening_addresses: self.port.value = self.server.listening_addresses[0][1] self.ready.set() else: reactor.callLater(0.1, self.resolve_port) class TestApplication: """ An application that receives one or more messages, sends a response, and then quits the server. For testing. """ setup_storage = os.path.join(tempfile.gettempdir(), "setup.testio") result_storage = os.path.join(tempfile.gettempdir(), "result.testio") def __init__(self, scope): self.scope = scope self.messages = [] async def __call__(self, send, receive): # Receive input and send output logging.debug("test app coroutine alive") try: while True: # Receive a message and save it into the result store self.messages.append(await receive()) logging.debug("test app received %r", self.messages[-1]) self.save_result(self.scope, self.messages) # See if there are any messages to send back setup = self.load_setup() self.delete_setup() for message in setup["response_messages"]: await send(message) logging.debug("test app sent %r", message) except Exception as e: if isinstance(e, CancelledError): # Don't catch task-cancelled errors! raise else: self.save_exception(e) @classmethod def save_setup(cls, response_messages): """ Stores setup information. """ with open(cls.setup_storage, "wb") as fh: pickle.dump( { "response_messages": response_messages, }, fh, ) @classmethod def load_setup(cls): """ Returns setup details. """ try: with open(cls.setup_storage, "rb") as fh: return pickle.load(fh) except FileNotFoundError: return {"response_messages": []} @classmethod def save_result(cls, scope, messages): """ Saves details of what happened to the result storage. We could use pickle here, but that seems wrong, still, somehow. """ with open(cls.result_storage, "wb") as fh: pickle.dump( { "scope": scope, "messages": messages, }, fh, ) @classmethod def save_exception(cls, exception): """ Saves details of what happened to the result storage. We could use pickle here, but that seems wrong, still, somehow. """ with open(cls.result_storage, "wb") as fh: pickle.dump( { "exception": exception, }, fh, ) @classmethod def load_result(cls): """ Returns result details. """ with open(cls.result_storage, "rb") as fh: return pickle.load(fh) @classmethod def delete_setup(cls): """ Clears setup storage files. """ try: os.unlink(cls.setup_storage) except OSError: pass @classmethod def delete_result(cls): """ Clears result storage files. """ try: os.unlink(cls.result_storage) except OSError: pass