From 21e4bb3487876b2b04687fde4997000b3416fd0f Mon Sep 17 00:00:00 2001 From: kuratowski Date: Sun, 27 Oct 2019 16:52:07 +0900 Subject: [PATCH] Change resetting event loop to reinstalling reactor --- daphne/testing.py | 51 +++++++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/daphne/testing.py b/daphne/testing.py index 5a61221..0da03d0 100644 --- a/daphne/testing.py +++ b/daphne/testing.py @@ -6,11 +6,6 @@ import tempfile import traceback from concurrent.futures import CancelledError -from twisted.internet import reactor - -from .endpoints import build_endpoint_description_strings -from .server import Server - class DaphneTestingInstance: """ @@ -121,19 +116,18 @@ class DaphneProcess(multiprocessing.Process): self.errors = multiprocessing.Queue() def run(self): + # OK, now we are in a forked child process, and want to use the reactor. + # However, FreeBSD systems like MacOS do not fork the underlying Kqueue, + # which asyncio (hence asyncioreactor) is built on. + # Therefore, we should uninstall the broken reactor and install a new one. + _reinstall_reactor() + + from twisted.internet import reactor + + from .server import Server + from .endpoints import build_endpoint_description_strings + try: - # Renew the asyncio event loop without anyone knowing. - # This is necessary because asyncio behaves badly with multiprocessing. - from twisted.internet import asyncioreactor - import sys - - current_reactor = sys.modules.get("twisted.internet.reactor") - if isinstance(current_reactor, asyncioreactor.AsyncioSelectorReactor): - import asyncio - - current_reactor._asyncioEventloop.close() - asyncio.set_event_loop(asyncio.new_event_loop()) - current_reactor._asyncioEventloop = asyncio.get_event_loop() # Create the server class endpoints = build_endpoint_description_strings(host=self.host, port=0) self.server = Server( @@ -155,6 +149,8 @@ class DaphneProcess(multiprocessing.Process): self.errors.put((e, traceback.format_exc())) def resolve_port(self): + from twisted.internet import reactor + if self.server.listening_addresses: self.port.value = self.server.listening_addresses[0][1] self.ready.set() @@ -261,3 +257,24 @@ class TestApplication: os.unlink(cls.result_storage) except OSError: pass + + +def _reinstall_reactor(): + import sys + import asyncio + + from twisted.internet import asyncioreactor + + # Uninstall the reactor. + if "twisted.internet.reactor" in sys.modules: + del sys.modules["twisted.internet.reactor"] + + # The daphne.server module may have already installed the reactor. + # If so, using this module will use uninstalled one, thus we should + # reimport this module too. + if "daphne.server" in sys.modules: + del sys.modules["daphne.server"] + + event_loop = asyncio.new_event_loop() + asyncioreactor.install(event_loop) + asyncio.set_event_loop(event_loop)