Fix multiprocessing spawn mode.

This commit is contained in:
Bryce Durham 2021-03-22 10:18:37 -07:00
parent aac4708a61
commit 14f61884e9

View File

@ -1,3 +1,4 @@
import importlib
import logging import logging
import multiprocessing import multiprocessing
import os import os
@ -128,14 +129,27 @@ class DaphneProcess(multiprocessing.Process):
self.host = host self.host = host
self.application = application self.application = application
self.kwargs = kwargs or {} self.kwargs = kwargs or {}
self.setup = setup or (lambda: None) # prefer `type(None)` over `lambda x: None` for serialization in spawn mode
self.teardown = teardown or (lambda: None) self.setup = setup or type(None)
self.port = multiprocessing.Value("i") self.teardown = teardown or type(None)
# ready event isn't really needed when blocking on Queue.get() but Channels calls into it
self.ready = multiprocessing.Event() self.ready = multiprocessing.Event()
self.errors = multiprocessing.Queue() self.errors = multiprocessing.Queue()
self.__port = None
self.__portqueue = multiprocessing.Queue()
@property
def port(self):
# lazy load the port so we don't block the main process waiting
if self.__port is None:
self.__port = self.__portqueue.get()
return self.__port
def run(self): def run(self):
# OK, now we are in a forked child process, and want to use the reactor. # Note: MacOS uses spawn by default since Python 3.8.
# Forked processes in MacOS are either pre-3.8 or explicitly requested
# OK, now we may be in a forked child process, and want to use the reactor.
# However, FreeBSD systems like MacOS do not fork the underlying Kqueue, # However, FreeBSD systems like MacOS do not fork the underlying Kqueue,
# which asyncio (hence asyncioreactor) is built on. # which asyncio (hence asyncioreactor) is built on.
# Therefore, we should uninstall the broken reactor and install a new one. # Therefore, we should uninstall the broken reactor and install a new one.
@ -147,10 +161,17 @@ class DaphneProcess(multiprocessing.Process):
from .server import Server from .server import Server
try: try:
if isinstance(self.application, str):
mod_name, _, app_name = self.application.rpartition(':')
module = importlib.import_module(mod_name)
application = getattr(module, app_name)
else:
application = self.application
# Create the server class # Create the server class
endpoints = build_endpoint_description_strings(host=self.host, port=0) endpoints = build_endpoint_description_strings(host=self.host, port=0)
self.server = Server( self.server = Server(
application=self.application, application=application,
endpoints=endpoints, endpoints=endpoints,
signal_handlers=False, signal_handlers=False,
**self.kwargs **self.kwargs
@ -171,7 +192,9 @@ class DaphneProcess(multiprocessing.Process):
from twisted.internet import reactor from twisted.internet import reactor
if self.server.listening_addresses: if self.server.listening_addresses:
self.port.value = self.server.listening_addresses[0][1] port = self.server.listening_addresses[0][1]
self.__portqueue.put(port)
# ready event isn't really needed when blocking on queue.get() but Channels calls into it
self.ready.set() self.ready.set()
else: else:
reactor.callLater(0.1, self.resolve_port) reactor.callLater(0.1, self.resolve_port)