Refactor websocket servers a bit to share logic

This commit is contained in:
Andrew Godwin 2015-09-10 16:00:31 -05:00
parent f69ad33747
commit bd1553556d
3 changed files with 109 additions and 182 deletions

View File

@ -1,97 +1,9 @@
import asyncio import asyncio
import django
import time import time
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory
from collections import deque
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND from .websocket_autobahn import get_protocol, get_factory
class InterfaceProtocol(WebSocketServerProtocol):
"""
Protocol which supports WebSockets and forwards incoming messages to
the websocket channels.
"""
def onConnect(self, request):
self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
self.request_info = {
"path": request.path,
"get": request.params,
}
def onOpen(self):
# Make sending channel
self.reply_channel = Channel.new_name("!websocket.send")
self.request_info["reply_channel"] = self.reply_channel
self.last_keepalive = time.time()
self.factory.protocols[self.reply_channel] = self
# Send news that this channel is open
Channel("websocket.connect").send(self.request_info)
def onMessage(self, payload, isBinary):
if isBinary:
Channel("websocket.receive").send(dict(
self.request_info,
content = payload,
binary = True,
))
else:
Channel("websocket.receive").send(dict(
self.request_info,
content = payload.decode("utf8"),
binary = False,
))
def serverSend(self, content, binary=False, **kwargs):
"""
Server-side channel message to send a message.
"""
if binary:
self.sendMessage(content, binary)
else:
self.sendMessage(content.encode("utf8"), binary)
def serverClose(self):
"""
Server-side channel message to close the socket
"""
self.sendClose()
def onClose(self, wasClean, code, reason):
if hasattr(self, "reply_channel"):
del self.factory.protocols[self.reply_channel]
Channel("websocket.disconnect").send(self.request_info)
def sendKeepalive(self):
"""
Sends a keepalive packet on the keepalive channel.
"""
Channel("websocket.keepalive").send(self.request_info)
self.last_keepalive = time.time()
class InterfaceFactory(WebSocketServerFactory):
"""
Factory which keeps track of its open protocols' receive channels
and can dispatch to them.
"""
# TODO: Clean up dead protocols if needed?
def __init__(self, *args, **kwargs):
super(InterfaceFactory, self).__init__(*args, **kwargs)
self.protocols = {}
def reply_channels(self):
return self.protocols.keys()
def dispatch_send(self, channel, message):
if message.get("close", False):
self.protocols[channel].serverClose()
else:
self.protocols[channel].serverSend(**message)
class WebsocketAsyncioInterface(object): class WebsocketAsyncioInterface(object):
@ -106,8 +18,8 @@ class WebsocketAsyncioInterface(object):
self.port = port self.port = port
def run(self): def run(self):
self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False) self.factory = get_factory(WebSocketServerFactory)("ws://0.0.0.0:%i" % self.port, debug=False)
self.factory.protocol = InterfaceProtocol self.factory.protocol = get_protocol(WebSocketServerProtocol)
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
coro = self.loop.create_server(self.factory, '0.0.0.0', 9000) coro = self.loop.create_server(self.factory, '0.0.0.0', 9000)
server = self.loop.run_until_complete(coro) server = self.loop.run_until_complete(coro)
@ -125,6 +37,8 @@ class WebsocketAsyncioInterface(object):
""" """
Run in a separate thread; reads messages from the backend. Run in a separate thread; reads messages from the backend.
""" """
# Wait for main loop to start
time.sleep(0.5)
while True: while True:
channels = self.factory.reply_channels() channels = self.factory.reply_channels()
# Quit if reactor is stopping # Quit if reactor is stopping

View File

@ -0,0 +1,101 @@
import time
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
def get_protocol(base):
class InterfaceProtocol(base):
"""
Protocol which supports WebSockets and forwards incoming messages to
the websocket channels.
"""
def onConnect(self, request):
self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
self.request_info = {
"path": request.path,
"get": request.params,
}
def onOpen(self):
# Make sending channel
self.reply_channel = Channel.new_name("!websocket.send")
self.request_info["reply_channel"] = self.reply_channel
self.last_keepalive = time.time()
self.factory.protocols[self.reply_channel] = self
# Send news that this channel is open
Channel("websocket.connect").send(self.request_info)
def onMessage(self, payload, isBinary):
if isBinary:
Channel("websocket.receive").send({
"reply_channel": self.reply_channel,
"content": payload,
"binary": True,
})
else:
Channel("websocket.receive").send({
"reply_channel": self.reply_channel,
"content": payload.decode("utf8"),
"binary": False,
})
def serverSend(self, content, binary=False, **kwargs):
"""
Server-side channel message to send a message.
"""
if binary:
self.sendMessage(content, binary)
else:
self.sendMessage(content.encode("utf8"), binary)
def serverClose(self):
"""
Server-side channel message to close the socket
"""
self.sendClose()
def onClose(self, wasClean, code, reason):
if hasattr(self, "reply_channel"):
del self.factory.protocols[self.reply_channel]
Channel("websocket.disconnect").send({
"reply_channel": self.reply_channel,
})
def sendKeepalive(self):
"""
Sends a keepalive packet on the keepalive channel.
"""
Channel("websocket.keepalive").send({
"reply_channel": self.reply_channel,
})
self.last_keepalive = time.time()
return InterfaceProtocol
def get_factory(base):
class InterfaceFactory(base):
"""
Factory which keeps track of its open protocols' receive channels
and can dispatch to them.
"""
# TODO: Clean up dead protocols if needed?
def __init__(self, *args, **kwargs):
super(InterfaceFactory, self).__init__(*args, **kwargs)
self.protocols = {}
def reply_channels(self):
return self.protocols.keys()
def dispatch_send(self, channel, message):
if message.get("close", False):
self.protocols[channel].serverClose()
else:
self.protocols[channel].serverSend(**message)
return InterfaceFactory

View File

@ -1,97 +1,9 @@
import django
import time import time
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
from collections import deque
from twisted.internet import reactor from twisted.internet import reactor
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND from .websocket_autobahn import get_protocol, get_factory
class InterfaceProtocol(WebSocketServerProtocol):
"""
Protocol which supports WebSockets and forwards incoming messages to
the websocket channels.
"""
def onConnect(self, request):
self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
self.request_info = {
"path": request.path,
"get": request.params,
}
def onOpen(self):
# Make sending channel
self.reply_channel = Channel.new_name("!websocket.send")
self.request_info["reply_channel"] = self.reply_channel
self.last_keepalive = time.time()
self.factory.protocols[self.reply_channel] = self
# Send news that this channel is open
Channel("websocket.connect").send(self.request_info)
def onMessage(self, payload, isBinary):
if isBinary:
Channel("websocket.receive").send(dict(
self.request_info,
content = payload,
binary = True,
))
else:
Channel("websocket.receive").send(dict(
self.request_info,
content = payload.decode("utf8"),
binary = False,
))
def serverSend(self, content, binary=False, **kwargs):
"""
Server-side channel message to send a message.
"""
if binary:
self.sendMessage(content, binary)
else:
self.sendMessage(content.encode("utf8"), binary)
def serverClose(self):
"""
Server-side channel message to close the socket
"""
self.sendClose()
def onClose(self, wasClean, code, reason):
if hasattr(self, "reply_channel"):
del self.factory.protocols[self.reply_channel]
Channel("websocket.disconnect").send(self.request_info)
def sendKeepalive(self):
"""
Sends a keepalive packet on the keepalive channel.
"""
Channel("websocket.keepalive").send(self.request_info)
self.last_keepalive = time.time()
class InterfaceFactory(WebSocketServerFactory):
"""
Factory which keeps track of its open protocols' receive channels
and can dispatch to them.
"""
# TODO: Clean up dead protocols if needed?
def __init__(self, *args, **kwargs):
super(InterfaceFactory, self).__init__(*args, **kwargs)
self.protocols = {}
def reply_channels(self):
return self.protocols.keys()
def dispatch_send(self, channel, message):
if message.get("close", False):
self.protocols[channel].serverClose()
else:
self.protocols[channel].serverSend(**message)
class WebsocketTwistedInterface(object): class WebsocketTwistedInterface(object):
@ -106,8 +18,8 @@ class WebsocketTwistedInterface(object):
self.port = port self.port = port
def run(self): def run(self):
self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False) self.factory = get_factory(WebSocketServerFactory)("ws://0.0.0.0:%i" % self.port, debug=False)
self.factory.protocol = InterfaceProtocol self.factory.protocol = get_protocol(WebSocketServerProtocol)
reactor.listenTCP(self.port, self.factory) reactor.listenTCP(self.port, self.factory)
reactor.callInThread(self.backend_reader) reactor.callInThread(self.backend_reader)
reactor.callLater(1, self.keepalive_sender) reactor.callLater(1, self.keepalive_sender)