Run server constructor in a threadpool as it's synchronous

This commit is contained in:
Andrew Godwin 2018-04-18 10:57:58 -07:00
parent cc6af549a6
commit a7ccfab495
3 changed files with 26 additions and 6 deletions

View File

@ -3,6 +3,7 @@ import time
import traceback import traceback
from urllib.parse import unquote from urllib.parse import unquote
from twisted.internet.defer import inlineCallbacks
from twisted.internet.interfaces import IProtocolNegotiationFactory from twisted.internet.interfaces import IProtocolNegotiationFactory
from twisted.protocols.policies import ProtocolWrapper from twisted.protocols.policies import ProtocolWrapper
from twisted.web import http from twisted.web import http
@ -55,6 +56,7 @@ class WebRequest(http.Request):
### Twisted progress callbacks ### Twisted progress callbacks
@inlineCallbacks
def process(self): def process(self):
try: try:
self.request_start = time.time() self.request_start = time.time()
@ -144,7 +146,7 @@ class WebRequest(http.Request):
logger.debug("HTTP %s request for %s", self.method, self.client_addr) logger.debug("HTTP %s request for %s", self.method, self.client_addr)
self.content.seek(0, 0) self.content.seek(0, 0)
# Work out the application scope and create application # Work out the application scope and create application
self.application_queue = self.server.create_application(self, { self.application_queue = yield self.server.create_application(self, {
"type": "http", "type": "http",
# TODO: Correctly say if it's 1.1 or 1.0 # TODO: Correctly say if it's 1.1 or 1.0
"http_version": self.clientproto.split(b"/")[-1].decode("ascii"), "http_version": self.clientproto.split(b"/")[-1].decode("ascii"),

View File

@ -24,6 +24,7 @@ from concurrent.futures import CancelledError
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.internet.endpoints import serverFromString from twisted.internet.endpoints import serverFromString
from twisted.internet.threads import deferToThread
from twisted.logger import STDLibLogObserver, globalLogBeginner from twisted.logger import STDLibLogObserver, globalLogBeginner
from twisted.web import http from twisted.web import http
@ -170,6 +171,7 @@ class Server(object):
### Internal event/message handling ### Internal event/message handling
@defer.inlineCallbacks
def create_application(self, protocol, scope): def create_application(self, protocol, scope):
""" """
Creates a new application instance that fronts a Protocol instance Creates a new application instance that fronts a Protocol instance
@ -181,7 +183,7 @@ class Server(object):
assert "application_instance" not in self.connections[protocol] assert "application_instance" not in self.connections[protocol]
# Make an instance of the application # Make an instance of the application
input_queue = asyncio.Queue() input_queue = asyncio.Queue()
application_instance = self.application(scope=scope) application_instance = yield deferToThread(self.application, scope=scope)
# Run it, and stash the future for later checking # Run it, and stash the future for later checking
self.connections[protocol]["application_instance"] = asyncio.ensure_future(application_instance( self.connections[protocol]["application_instance"] = asyncio.ensure_future(application_instance(
receive=input_queue.get, receive=input_queue.get,

View File

@ -66,7 +66,7 @@ class WebSocketProtocol(WebSocketServerProtocol):
] ]
# Make new application instance with scope # Make new application instance with scope
self.path = request.path.encode("ascii") self.path = request.path.encode("ascii")
self.application_queue = self.server.create_application(self, { self.application_deferred = self.server.create_application(self, {
"type": "websocket", "type": "websocket",
"path": unquote(self.path.decode("ascii")), "path": unquote(self.path.decode("ascii")),
"headers": self.clean_headers, "headers": self.clean_headers,
@ -75,12 +75,25 @@ class WebSocketProtocol(WebSocketServerProtocol):
"server": self.server_addr, "server": self.server_addr,
"subprotocols": subprotocols, "subprotocols": subprotocols,
}) })
self.application_deferred.addCallback(self.applicationCreateWorked)
self.application_deferred.addErrback(self.applicationCreateFailed)
except Exception as e: except Exception as e:
# Exceptions here are not displayed right, just 500. # Exceptions here are not displayed right, just 500.
# Turn them into an ERROR log. # Turn them into an ERROR log.
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
raise raise
# Make a deferred and return it - we'll either call it or err it later on
self.handshake_deferred = defer.Deferred()
return self.handshake_deferred
def applicationCreateWorked(self, application_queue):
"""
Called when the background thread has successfully made the application
instance.
"""
# Store the application's queue
self.application_queue = application_queue
# Send over the connect message # Send over the connect message
self.application_queue.put_nowait({"type": "websocket.connect"}) self.application_queue.put_nowait({"type": "websocket.connect"})
self.server.log_action("websocket", "connecting", { self.server.log_action("websocket", "connecting", {
@ -88,9 +101,12 @@ class WebSocketProtocol(WebSocketServerProtocol):
"client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None, "client": "%s:%s" % tuple(self.client_addr) if self.client_addr else None,
}) })
# Make a deferred and return it - we'll either call it or err it later on def applicationCreateFailed(self, failure):
self.handshake_deferred = defer.Deferred() """
return self.handshake_deferred Called when application creation fails.
"""
logger.error(failure)
return failure
### Twisted event handling ### Twisted event handling