Use twisted variant of receive_many if available.

This commit is contained in:
Andrew Godwin 2016-07-26 20:00:11 +01:00
parent a249d6a69c
commit dd3bf9b0b0

View File

@ -1,7 +1,7 @@
import logging
import socket
from twisted.internet import reactor
from twisted.internet import reactor, defer
from twisted.logger import globalLogBeginner
from .http_protocol import HTTPFactory
@ -63,11 +63,16 @@ class Server(object):
else:
reactor.listenTCP(self.port, self.factory, interface=self.host)
reactor.callLater(0, self.backend_reader)
if "twisted" in self.channel_layer.extensions:
logging.info("Using native Twisted mode on channel layer")
reactor.callLater(0, self.backend_reader_twisted)
else:
logging.info("Using busy-loop synchronous mode on channel layer")
reactor.callLater(0, self.backend_reader_sync)
reactor.callLater(2, self.timeout_checker)
reactor.run(installSignalHandlers=self.signal_handlers)
def backend_reader(self):
def backend_reader_sync(self):
"""
Runs as an-often-as-possible task with the reactor, unless there was
no result previously in which case we add a small delay.
@ -83,10 +88,36 @@ class Server(object):
delay = 0.01
channel, message = self.channel_layer.receive_many(channels, block=False)
if channel:
delay = 0
delay = 0.00
# Deal with the message
self.factory.dispatch_reply(channel, message)
reactor.callLater(delay, self.backend_reader)
reactor.callLater(delay, self.backend_reader_sync)
@defer.inlineCallbacks
def backend_reader_twisted(self):
"""
Runs as an-often-as-possible task with the reactor, unless there was
no result previously in which case we add a small delay.
"""
while True:
if not reactor.running:
logging.debug("Backend reader quitting due to reactor stop")
return
channels = self.factory.reply_channels()
if channels:
channel, message = yield self.channel_layer.receive_many_twisted(channels)
# Deal with the message
if channel:
self.factory.dispatch_reply(channel, message)
else:
yield self.sleep(0.01)
else:
yield self.sleep(0.05)
def sleep(self, delay):
d = defer.Deferred()
reactor.callLater(delay, d.callback, None)
return d
def timeout_checker(self):
"""