diff --git a/channels/interfaces/websocket_asyncio.py b/channels/interfaces/websocket_asyncio.py index 5dc3fe3..55c9ea3 100644 --- a/channels/interfaces/websocket_asyncio.py +++ b/channels/interfaces/websocket_asyncio.py @@ -1,97 +1,9 @@ import asyncio -import django import time from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory -from collections import deque -from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND - - -class InterfaceProtocol(WebSocketServerProtocol): - """ - Protocol which supports WebSockets and forwards incoming messages to - the websocket channels. - """ - - def onConnect(self, request): - self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] - self.request_info = { - "path": request.path, - "get": request.params, - } - - def onOpen(self): - # Make sending channel - self.reply_channel = Channel.new_name("!websocket.send") - self.request_info["reply_channel"] = self.reply_channel - self.last_keepalive = time.time() - self.factory.protocols[self.reply_channel] = self - # Send news that this channel is open - Channel("websocket.connect").send(self.request_info) - - def onMessage(self, payload, isBinary): - if isBinary: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload, - binary = True, - )) - else: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload.decode("utf8"), - binary = False, - )) - - def serverSend(self, content, binary=False, **kwargs): - """ - Server-side channel message to send a message. - """ - if binary: - self.sendMessage(content, binary) - else: - self.sendMessage(content.encode("utf8"), binary) - - def serverClose(self): - """ - Server-side channel message to close the socket - """ - self.sendClose() - - def onClose(self, wasClean, code, reason): - if hasattr(self, "reply_channel"): - del self.factory.protocols[self.reply_channel] - Channel("websocket.disconnect").send(self.request_info) - - def sendKeepalive(self): - """ - Sends a keepalive packet on the keepalive channel. - """ - Channel("websocket.keepalive").send(self.request_info) - self.last_keepalive = time.time() - - -class InterfaceFactory(WebSocketServerFactory): - """ - Factory which keeps track of its open protocols' receive channels - and can dispatch to them. - """ - - # TODO: Clean up dead protocols if needed? - - def __init__(self, *args, **kwargs): - super(InterfaceFactory, self).__init__(*args, **kwargs) - self.protocols = {} - - def reply_channels(self): - return self.protocols.keys() - - def dispatch_send(self, channel, message): - if message.get("close", False): - self.protocols[channel].serverClose() - else: - self.protocols[channel].serverSend(**message) +from .websocket_autobahn import get_protocol, get_factory class WebsocketAsyncioInterface(object): @@ -106,8 +18,8 @@ class WebsocketAsyncioInterface(object): self.port = port def run(self): - self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False) - self.factory.protocol = InterfaceProtocol + self.factory = get_factory(WebSocketServerFactory)("ws://0.0.0.0:%i" % self.port, debug=False) + self.factory.protocol = get_protocol(WebSocketServerProtocol) self.loop = asyncio.get_event_loop() coro = self.loop.create_server(self.factory, '0.0.0.0', 9000) server = self.loop.run_until_complete(coro) @@ -125,6 +37,8 @@ class WebsocketAsyncioInterface(object): """ Run in a separate thread; reads messages from the backend. """ + # Wait for main loop to start + time.sleep(0.5) while True: channels = self.factory.reply_channels() # Quit if reactor is stopping diff --git a/channels/interfaces/websocket_autobahn.py b/channels/interfaces/websocket_autobahn.py new file mode 100644 index 0000000..dae31e0 --- /dev/null +++ b/channels/interfaces/websocket_autobahn.py @@ -0,0 +1,101 @@ +import time + +from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND + + +def get_protocol(base): + + class InterfaceProtocol(base): + """ + Protocol which supports WebSockets and forwards incoming messages to + the websocket channels. + """ + + def onConnect(self, request): + self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] + self.request_info = { + "path": request.path, + "get": request.params, + } + + def onOpen(self): + # Make sending channel + self.reply_channel = Channel.new_name("!websocket.send") + self.request_info["reply_channel"] = self.reply_channel + self.last_keepalive = time.time() + self.factory.protocols[self.reply_channel] = self + # Send news that this channel is open + Channel("websocket.connect").send(self.request_info) + + def onMessage(self, payload, isBinary): + if isBinary: + Channel("websocket.receive").send({ + "reply_channel": self.reply_channel, + "content": payload, + "binary": True, + }) + else: + Channel("websocket.receive").send({ + "reply_channel": self.reply_channel, + "content": payload.decode("utf8"), + "binary": False, + }) + + def serverSend(self, content, binary=False, **kwargs): + """ + Server-side channel message to send a message. + """ + if binary: + self.sendMessage(content, binary) + else: + self.sendMessage(content.encode("utf8"), binary) + + def serverClose(self): + """ + Server-side channel message to close the socket + """ + self.sendClose() + + def onClose(self, wasClean, code, reason): + if hasattr(self, "reply_channel"): + del self.factory.protocols[self.reply_channel] + Channel("websocket.disconnect").send({ + "reply_channel": self.reply_channel, + }) + + def sendKeepalive(self): + """ + Sends a keepalive packet on the keepalive channel. + """ + Channel("websocket.keepalive").send({ + "reply_channel": self.reply_channel, + }) + self.last_keepalive = time.time() + + return InterfaceProtocol + + +def get_factory(base): + + class InterfaceFactory(base): + """ + Factory which keeps track of its open protocols' receive channels + and can dispatch to them. + """ + + # TODO: Clean up dead protocols if needed? + + def __init__(self, *args, **kwargs): + super(InterfaceFactory, self).__init__(*args, **kwargs) + self.protocols = {} + + def reply_channels(self): + return self.protocols.keys() + + def dispatch_send(self, channel, message): + if message.get("close", False): + self.protocols[channel].serverClose() + else: + self.protocols[channel].serverSend(**message) + + return InterfaceFactory diff --git a/channels/interfaces/websocket_twisted.py b/channels/interfaces/websocket_twisted.py index ff5e947..e6055f1 100644 --- a/channels/interfaces/websocket_twisted.py +++ b/channels/interfaces/websocket_twisted.py @@ -1,97 +1,9 @@ -import django import time from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory -from collections import deque from twisted.internet import reactor -from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND - - -class InterfaceProtocol(WebSocketServerProtocol): - """ - Protocol which supports WebSockets and forwards incoming messages to - the websocket channels. - """ - - def onConnect(self, request): - self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] - self.request_info = { - "path": request.path, - "get": request.params, - } - - def onOpen(self): - # Make sending channel - self.reply_channel = Channel.new_name("!websocket.send") - self.request_info["reply_channel"] = self.reply_channel - self.last_keepalive = time.time() - self.factory.protocols[self.reply_channel] = self - # Send news that this channel is open - Channel("websocket.connect").send(self.request_info) - - def onMessage(self, payload, isBinary): - if isBinary: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload, - binary = True, - )) - else: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload.decode("utf8"), - binary = False, - )) - - def serverSend(self, content, binary=False, **kwargs): - """ - Server-side channel message to send a message. - """ - if binary: - self.sendMessage(content, binary) - else: - self.sendMessage(content.encode("utf8"), binary) - - def serverClose(self): - """ - Server-side channel message to close the socket - """ - self.sendClose() - - def onClose(self, wasClean, code, reason): - if hasattr(self, "reply_channel"): - del self.factory.protocols[self.reply_channel] - Channel("websocket.disconnect").send(self.request_info) - - def sendKeepalive(self): - """ - Sends a keepalive packet on the keepalive channel. - """ - Channel("websocket.keepalive").send(self.request_info) - self.last_keepalive = time.time() - - -class InterfaceFactory(WebSocketServerFactory): - """ - Factory which keeps track of its open protocols' receive channels - and can dispatch to them. - """ - - # TODO: Clean up dead protocols if needed? - - def __init__(self, *args, **kwargs): - super(InterfaceFactory, self).__init__(*args, **kwargs) - self.protocols = {} - - def reply_channels(self): - return self.protocols.keys() - - def dispatch_send(self, channel, message): - if message.get("close", False): - self.protocols[channel].serverClose() - else: - self.protocols[channel].serverSend(**message) +from .websocket_autobahn import get_protocol, get_factory class WebsocketTwistedInterface(object): @@ -106,8 +18,8 @@ class WebsocketTwistedInterface(object): self.port = port def run(self): - self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False) - self.factory.protocol = InterfaceProtocol + self.factory = get_factory(WebSocketServerFactory)("ws://0.0.0.0:%i" % self.port, debug=False) + self.factory.protocol = get_protocol(WebSocketServerProtocol) reactor.listenTCP(self.port, self.factory) reactor.callInThread(self.backend_reader) reactor.callLater(1, self.keepalive_sender)