mirror of
https://github.com/django/daphne.git
synced 2025-07-15 02:12:17 +03:00
Allow user to pass in twisted reactor to Daphne Server.
This commit is contained in:
parent
59b57a9f4b
commit
b7f19291d6
|
@ -23,7 +23,8 @@ import logging
|
||||||
import time
|
import time
|
||||||
from concurrent.futures import CancelledError
|
from concurrent.futures import CancelledError
|
||||||
|
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer
|
||||||
|
from twisted.internet import reactor as twisted_reactor
|
||||||
from twisted.internet.endpoints import serverFromString
|
from twisted.internet.endpoints import serverFromString
|
||||||
from twisted.logger import STDLibLogObserver, globalLogBeginner
|
from twisted.logger import STDLibLogObserver, globalLogBeginner
|
||||||
from twisted.web import http
|
from twisted.web import http
|
||||||
|
@ -55,6 +56,7 @@ class Server(object):
|
||||||
application_close_timeout=10,
|
application_close_timeout=10,
|
||||||
ready_callable=None,
|
ready_callable=None,
|
||||||
server_name="Daphne",
|
server_name="Daphne",
|
||||||
|
reactor=None,
|
||||||
# Deprecated and does not work, remove in version 2.2
|
# Deprecated and does not work, remove in version 2.2
|
||||||
ws_protocols=None,
|
ws_protocols=None,
|
||||||
):
|
):
|
||||||
|
@ -79,6 +81,7 @@ class Server(object):
|
||||||
self.abort_start = False
|
self.abort_start = False
|
||||||
self.ready_callable = ready_callable
|
self.ready_callable = ready_callable
|
||||||
self.server_name = server_name
|
self.server_name = server_name
|
||||||
|
self.reactor = reactor or twisted_reactor
|
||||||
# Check our construction is actually sensible
|
# Check our construction is actually sensible
|
||||||
if not self.endpoints:
|
if not self.endpoints:
|
||||||
logger.error("No endpoints. This server will not listen on anything.")
|
logger.error("No endpoints. This server will not listen on anything.")
|
||||||
|
@ -112,12 +115,12 @@ class Server(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Kick off the timeout loop
|
# Kick off the timeout loop
|
||||||
reactor.callLater(1, self.application_checker)
|
self.reactor.callLater(1, self.application_checker)
|
||||||
reactor.callLater(2, self.timeout_checker)
|
self.reactor.callLater(2, self.timeout_checker)
|
||||||
|
|
||||||
for socket_description in self.endpoints:
|
for socket_description in self.endpoints:
|
||||||
logger.info("Configuring endpoint %s", socket_description)
|
logger.info("Configuring endpoint %s", socket_description)
|
||||||
ep = serverFromString(reactor, str(socket_description))
|
ep = serverFromString(self.reactor, str(socket_description))
|
||||||
listener = ep.listen(self.http_factory)
|
listener = ep.listen(self.http_factory)
|
||||||
listener.addCallback(self.listen_success)
|
listener.addCallback(self.listen_success)
|
||||||
listener.addErrback(self.listen_error)
|
listener.addErrback(self.listen_error)
|
||||||
|
@ -125,19 +128,21 @@ class Server(object):
|
||||||
|
|
||||||
# Set the asyncio reactor's event loop as global
|
# Set the asyncio reactor's event loop as global
|
||||||
# TODO: Should we instead pass the global one into the reactor?
|
# TODO: Should we instead pass the global one into the reactor?
|
||||||
asyncio.set_event_loop(reactor._asyncioEventloop)
|
asyncio.set_event_loop(self.reactor._asyncioEventloop)
|
||||||
|
|
||||||
# Verbosity 3 turns on asyncio debug to find those blocking yields
|
# Verbosity 3 turns on asyncio debug to find those blocking yields
|
||||||
if self.verbosity >= 3:
|
if self.verbosity >= 3:
|
||||||
asyncio.get_event_loop().set_debug(True)
|
asyncio.get_event_loop().set_debug(True)
|
||||||
|
|
||||||
reactor.addSystemEventTrigger("before", "shutdown", self.kill_all_applications)
|
self.reactor.addSystemEventTrigger(
|
||||||
|
"before", "shutdown", self.kill_all_applications
|
||||||
|
)
|
||||||
if not self.abort_start:
|
if not self.abort_start:
|
||||||
# Trigger the ready flag if we had one
|
# Trigger the ready flag if we had one
|
||||||
if self.ready_callable:
|
if self.ready_callable:
|
||||||
self.ready_callable()
|
self.ready_callable()
|
||||||
# Run the reactor
|
# Run the reactor
|
||||||
reactor.run(installSignalHandlers=self.signal_handlers)
|
self.reactor.run(installSignalHandlers=self.signal_handlers)
|
||||||
|
|
||||||
def listen_success(self, port):
|
def listen_success(self, port):
|
||||||
"""
|
"""
|
||||||
|
@ -161,8 +166,8 @@ class Server(object):
|
||||||
"""
|
"""
|
||||||
Force-stops the server.
|
Force-stops the server.
|
||||||
"""
|
"""
|
||||||
if reactor.running:
|
if self.reactor.running:
|
||||||
reactor.stop()
|
self.reactor.stop()
|
||||||
else:
|
else:
|
||||||
self.abort_start = True
|
self.abort_start = True
|
||||||
|
|
||||||
|
@ -294,7 +299,7 @@ class Server(object):
|
||||||
# Check to see if protocol is closed and app is closed so we can remove it
|
# Check to see if protocol is closed and app is closed so we can remove it
|
||||||
if not application_instance and disconnected:
|
if not application_instance and disconnected:
|
||||||
del self.connections[protocol]
|
del self.connections[protocol]
|
||||||
reactor.callLater(1, self.application_checker)
|
self.reactor.callLater(1, self.application_checker)
|
||||||
|
|
||||||
def kill_all_applications(self):
|
def kill_all_applications(self):
|
||||||
"""
|
"""
|
||||||
|
@ -320,7 +325,7 @@ class Server(object):
|
||||||
"""
|
"""
|
||||||
for protocol in list(self.connections.keys()):
|
for protocol in list(self.connections.keys()):
|
||||||
protocol.check_timeouts()
|
protocol.check_timeouts()
|
||||||
reactor.callLater(2, self.timeout_checker)
|
self.reactor.callLater(2, self.timeout_checker)
|
||||||
|
|
||||||
def log_action(self, protocol, action, details):
|
def log_action(self, protocol, action, details):
|
||||||
"""
|
"""
|
||||||
|
|
Loading…
Reference in New Issue
Block a user