daphne/channels/interfaces/websocket_twisted.py
2015-09-10 16:00:31 -05:00

60 lines
2.1 KiB
Python

import time
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
from twisted.internet import reactor
from .websocket_autobahn import get_protocol, get_factory
class WebsocketTwistedInterface(object):
"""
Easy API to run a WebSocket interface server using Twisted.
Integrates the channel backend by running it in a separate thread, using
the always-compatible polling style.
"""
def __init__(self, channel_backend, port=9000):
self.channel_backend = channel_backend
self.port = port
def run(self):
self.factory = get_factory(WebSocketServerFactory)("ws://0.0.0.0:%i" % self.port, debug=False)
self.factory.protocol = get_protocol(WebSocketServerProtocol)
reactor.listenTCP(self.port, self.factory)
reactor.callInThread(self.backend_reader)
reactor.callLater(1, self.keepalive_sender)
reactor.run()
def backend_reader(self):
"""
Run in a separate thread; reads messages from the backend.
"""
while True:
channels = self.factory.reply_channels()
# Quit if reactor is stopping
if not reactor.running:
return
# Don't do anything if there's no channels to listen on
if channels:
channel, message = self.channel_backend.receive_many(channels)
else:
time.sleep(0.1)
continue
# Wait around if there's nothing received
if channel is None:
time.sleep(0.05)
continue
# Deal with the message
self.factory.dispatch_send(channel, message)
def keepalive_sender(self):
"""
Sends keepalive messages for open WebSockets every
(channel_backend expiry / 2) seconds.
"""
expiry_window = int(self.channel_backend.expiry / 2)
for protocol in self.factory.protocols.values():
if time.time() - protocol.last_keepalive > expiry_window:
protocol.sendKeepalive()
reactor.callLater(1, self.keepalive_sender)