From bd1553556d38f61d88d8cbbbde0bc50f4afc45b8 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Sep 2015 16:00:31 -0500 Subject: [PATCH 1/3] Refactor websocket servers a bit to share logic --- channels/interfaces/websocket_asyncio.py | 96 ++------------------ channels/interfaces/websocket_autobahn.py | 101 ++++++++++++++++++++++ channels/interfaces/websocket_twisted.py | 94 +------------------- 3 files changed, 109 insertions(+), 182 deletions(-) create mode 100644 channels/interfaces/websocket_autobahn.py diff --git a/channels/interfaces/websocket_asyncio.py b/channels/interfaces/websocket_asyncio.py index 5dc3fe3..55c9ea3 100644 --- a/channels/interfaces/websocket_asyncio.py +++ b/channels/interfaces/websocket_asyncio.py @@ -1,97 +1,9 @@ import asyncio -import django import time from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory -from collections import deque -from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND - - -class InterfaceProtocol(WebSocketServerProtocol): - """ - Protocol which supports WebSockets and forwards incoming messages to - the websocket channels. - """ - - def onConnect(self, request): - self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] - self.request_info = { - "path": request.path, - "get": request.params, - } - - def onOpen(self): - # Make sending channel - self.reply_channel = Channel.new_name("!websocket.send") - self.request_info["reply_channel"] = self.reply_channel - self.last_keepalive = time.time() - self.factory.protocols[self.reply_channel] = self - # Send news that this channel is open - Channel("websocket.connect").send(self.request_info) - - def onMessage(self, payload, isBinary): - if isBinary: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload, - binary = True, - )) - else: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload.decode("utf8"), - binary = False, - )) - - def serverSend(self, content, binary=False, **kwargs): - """ - Server-side channel message to send a message. - """ - if binary: - self.sendMessage(content, binary) - else: - self.sendMessage(content.encode("utf8"), binary) - - def serverClose(self): - """ - Server-side channel message to close the socket - """ - self.sendClose() - - def onClose(self, wasClean, code, reason): - if hasattr(self, "reply_channel"): - del self.factory.protocols[self.reply_channel] - Channel("websocket.disconnect").send(self.request_info) - - def sendKeepalive(self): - """ - Sends a keepalive packet on the keepalive channel. - """ - Channel("websocket.keepalive").send(self.request_info) - self.last_keepalive = time.time() - - -class InterfaceFactory(WebSocketServerFactory): - """ - Factory which keeps track of its open protocols' receive channels - and can dispatch to them. - """ - - # TODO: Clean up dead protocols if needed? - - def __init__(self, *args, **kwargs): - super(InterfaceFactory, self).__init__(*args, **kwargs) - self.protocols = {} - - def reply_channels(self): - return self.protocols.keys() - - def dispatch_send(self, channel, message): - if message.get("close", False): - self.protocols[channel].serverClose() - else: - self.protocols[channel].serverSend(**message) +from .websocket_autobahn import get_protocol, get_factory class WebsocketAsyncioInterface(object): @@ -106,8 +18,8 @@ class WebsocketAsyncioInterface(object): self.port = port def run(self): - self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False) - self.factory.protocol = InterfaceProtocol + self.factory = get_factory(WebSocketServerFactory)("ws://0.0.0.0:%i" % self.port, debug=False) + self.factory.protocol = get_protocol(WebSocketServerProtocol) self.loop = asyncio.get_event_loop() coro = self.loop.create_server(self.factory, '0.0.0.0', 9000) server = self.loop.run_until_complete(coro) @@ -125,6 +37,8 @@ class WebsocketAsyncioInterface(object): """ Run in a separate thread; reads messages from the backend. """ + # Wait for main loop to start + time.sleep(0.5) while True: channels = self.factory.reply_channels() # Quit if reactor is stopping diff --git a/channels/interfaces/websocket_autobahn.py b/channels/interfaces/websocket_autobahn.py new file mode 100644 index 0000000..dae31e0 --- /dev/null +++ b/channels/interfaces/websocket_autobahn.py @@ -0,0 +1,101 @@ +import time + +from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND + + +def get_protocol(base): + + class InterfaceProtocol(base): + """ + Protocol which supports WebSockets and forwards incoming messages to + the websocket channels. + """ + + def onConnect(self, request): + self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] + self.request_info = { + "path": request.path, + "get": request.params, + } + + def onOpen(self): + # Make sending channel + self.reply_channel = Channel.new_name("!websocket.send") + self.request_info["reply_channel"] = self.reply_channel + self.last_keepalive = time.time() + self.factory.protocols[self.reply_channel] = self + # Send news that this channel is open + Channel("websocket.connect").send(self.request_info) + + def onMessage(self, payload, isBinary): + if isBinary: + Channel("websocket.receive").send({ + "reply_channel": self.reply_channel, + "content": payload, + "binary": True, + }) + else: + Channel("websocket.receive").send({ + "reply_channel": self.reply_channel, + "content": payload.decode("utf8"), + "binary": False, + }) + + def serverSend(self, content, binary=False, **kwargs): + """ + Server-side channel message to send a message. + """ + if binary: + self.sendMessage(content, binary) + else: + self.sendMessage(content.encode("utf8"), binary) + + def serverClose(self): + """ + Server-side channel message to close the socket + """ + self.sendClose() + + def onClose(self, wasClean, code, reason): + if hasattr(self, "reply_channel"): + del self.factory.protocols[self.reply_channel] + Channel("websocket.disconnect").send({ + "reply_channel": self.reply_channel, + }) + + def sendKeepalive(self): + """ + Sends a keepalive packet on the keepalive channel. + """ + Channel("websocket.keepalive").send({ + "reply_channel": self.reply_channel, + }) + self.last_keepalive = time.time() + + return InterfaceProtocol + + +def get_factory(base): + + class InterfaceFactory(base): + """ + Factory which keeps track of its open protocols' receive channels + and can dispatch to them. + """ + + # TODO: Clean up dead protocols if needed? + + def __init__(self, *args, **kwargs): + super(InterfaceFactory, self).__init__(*args, **kwargs) + self.protocols = {} + + def reply_channels(self): + return self.protocols.keys() + + def dispatch_send(self, channel, message): + if message.get("close", False): + self.protocols[channel].serverClose() + else: + self.protocols[channel].serverSend(**message) + + return InterfaceFactory diff --git a/channels/interfaces/websocket_twisted.py b/channels/interfaces/websocket_twisted.py index ff5e947..e6055f1 100644 --- a/channels/interfaces/websocket_twisted.py +++ b/channels/interfaces/websocket_twisted.py @@ -1,97 +1,9 @@ -import django import time from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory -from collections import deque from twisted.internet import reactor -from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND - - -class InterfaceProtocol(WebSocketServerProtocol): - """ - Protocol which supports WebSockets and forwards incoming messages to - the websocket channels. - """ - - def onConnect(self, request): - self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] - self.request_info = { - "path": request.path, - "get": request.params, - } - - def onOpen(self): - # Make sending channel - self.reply_channel = Channel.new_name("!websocket.send") - self.request_info["reply_channel"] = self.reply_channel - self.last_keepalive = time.time() - self.factory.protocols[self.reply_channel] = self - # Send news that this channel is open - Channel("websocket.connect").send(self.request_info) - - def onMessage(self, payload, isBinary): - if isBinary: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload, - binary = True, - )) - else: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload.decode("utf8"), - binary = False, - )) - - def serverSend(self, content, binary=False, **kwargs): - """ - Server-side channel message to send a message. - """ - if binary: - self.sendMessage(content, binary) - else: - self.sendMessage(content.encode("utf8"), binary) - - def serverClose(self): - """ - Server-side channel message to close the socket - """ - self.sendClose() - - def onClose(self, wasClean, code, reason): - if hasattr(self, "reply_channel"): - del self.factory.protocols[self.reply_channel] - Channel("websocket.disconnect").send(self.request_info) - - def sendKeepalive(self): - """ - Sends a keepalive packet on the keepalive channel. - """ - Channel("websocket.keepalive").send(self.request_info) - self.last_keepalive = time.time() - - -class InterfaceFactory(WebSocketServerFactory): - """ - Factory which keeps track of its open protocols' receive channels - and can dispatch to them. - """ - - # TODO: Clean up dead protocols if needed? - - def __init__(self, *args, **kwargs): - super(InterfaceFactory, self).__init__(*args, **kwargs) - self.protocols = {} - - def reply_channels(self): - return self.protocols.keys() - - def dispatch_send(self, channel, message): - if message.get("close", False): - self.protocols[channel].serverClose() - else: - self.protocols[channel].serverSend(**message) +from .websocket_autobahn import get_protocol, get_factory class WebsocketTwistedInterface(object): @@ -106,8 +18,8 @@ class WebsocketTwistedInterface(object): self.port = port def run(self): - self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False) - self.factory.protocol = InterfaceProtocol + self.factory = get_factory(WebSocketServerFactory)("ws://0.0.0.0:%i" % self.port, debug=False) + self.factory.protocol = get_protocol(WebSocketServerProtocol) reactor.listenTCP(self.port, self.factory) reactor.callInThread(self.backend_reader) reactor.callLater(1, self.keepalive_sender) From 655213eff9f7d9643e264df6d3fff411434a2d73 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Sep 2015 16:00:39 -0500 Subject: [PATCH 2/3] 0.8 --- channels/__init__.py | 2 +- docs/index.rst | 1 + docs/releases/0.8.rst | 22 ++++++++++++++++++++++ docs/releases/index.rst | 7 +++++++ setup.py | 2 +- 5 files changed, 32 insertions(+), 2 deletions(-) create mode 100644 docs/releases/0.8.rst create mode 100644 docs/releases/index.rst diff --git a/channels/__init__.py b/channels/__init__.py index 2ee5420..8fd164c 100644 --- a/channels/__init__.py +++ b/channels/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.1.1" +__version__ = "0.8" # Load backends, using settings if available (else falling back to a default) DEFAULT_CHANNEL_BACKEND = "default" diff --git a/docs/index.rst b/docs/index.rst index a1c8d5f..d246cee 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -31,3 +31,4 @@ Contents: scaling backends faqs + releases/index diff --git a/docs/releases/0.8.rst b/docs/releases/0.8.rst new file mode 100644 index 0000000..ddeac26 --- /dev/null +++ b/docs/releases/0.8.rst @@ -0,0 +1,22 @@ +0.8 (2015-09-10) +---------------- + +This release reworks a few of the core concepts to make the channel layer +more efficient and user friendly: + +* Channel names now do not start with ``django``, and are instead just ``http.request``, etc. + +* HTTP headers/GET/etc are only sent with ``websocket.connect`` rather than all websocket requests, + to save a lot of bandwidth in the channel layer. + +* The session/user decorators were renamed, and a ``@channel_session_user`` and ``transfer_user`` set of functions + added to allow moving the user details from the HTTP session to the channel session in the ``connect`` consumer. + +* A ``@linearize`` decorator was added to help ensure a ``connect``/``receive`` pair are not executed + simultanously on two different workers. + +* Channel backends gained locking mechanisms to support the ``linearize`` feature. + +* ``runwsserver`` will use asyncio rather than Twisted if it's available. + +* Message formats have been made a bit more consistent. diff --git a/docs/releases/index.rst b/docs/releases/index.rst new file mode 100644 index 0000000..231771f --- /dev/null +++ b/docs/releases/index.rst @@ -0,0 +1,7 @@ +Release Notes +------------- + +.. toctree:: + :maxdepth: 1 + + 0.8 diff --git a/setup.py b/setup.py index 845e3c1..d8c5394 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import find_packages, setup setup( name='channels', - version="0.7", + version="0.8", url='http://github.com/andrewgodwin/django-channels', author='Andrew Godwin', author_email='andrew@aeracode.org', From 4a8bae272b5a8285104daef96b7f21b11129c1f0 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Sep 2015 16:34:28 -0500 Subject: [PATCH 3/3] Update docs to recommend doing routing not in settings --- docs/concepts.rst | 4 ++-- docs/getting-started.rst | 48 +++++++++++++++++----------------------- 2 files changed, 22 insertions(+), 30 deletions(-) diff --git a/docs/concepts.rst b/docs/concepts.rst index cb5febb..5bbf21b 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -65,9 +65,9 @@ you can write a function to consume a channel, like so:: def my_consumer(message): pass -And then assign a channel to it like this in the channel backend settings:: +And then assign a channel to it like this in the channel routing:: - "ROUTING": { + channel_routing = { "some-channel": "myapp.consumers.my_consumer", } diff --git a/docs/getting-started.rst b/docs/getting-started.rst index 5b1d88e..6158fdf 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -44,21 +44,25 @@ For now, we want to override the *channel routing* so that, rather than going to the URL resolver and our normal view stack, all HTTP requests go to our custom consumer we wrote above. Here's what that looks like:: + # In settings.py CHANNEL_BACKENDS = { "default": { "BACKEND": "channels.backends.database.DatabaseChannelBackend", - "ROUTING": { - "http.request": "myproject.myapp.consumers.http_consumer", - }, + "ROUTING": "myproject.routing.channel_routing", }, } + # In routing.py + channel_routing = { + "http.request": "myproject.myapp.consumers.http_consumer", + } + As you can see, this is a little like Django's ``DATABASES`` setting; there are named channel backends, with a default one called ``default``. Each backend needs a class specified which powers it - we'll come to the options there later - -and a routing scheme, which can either be defined directly as a dict or as -a string pointing to a dict in another file (if you'd rather keep it outside -settings). +and a routing scheme, which points to a dict containing the routing settings. +It's recommended you call this ``routing.py`` and put it alongside ``urls.py`` +in your project. If you start up ``python manage.py runserver`` and go to ``http://localhost:8000``, you'll see that, rather than a default Django page, @@ -78,13 +82,8 @@ serve HTTP requests from now on - and make this WebSocket consumer instead:: Hook it up to the ``websocket.connect`` channel like this:: - CHANNEL_BACKENDS = { - "default": { - "BACKEND": "channels.backends.database.DatabaseChannelBackend", - "ROUTING": { - "websocket.connect": "myproject.myapp.consumers.ws_add", - }, - }, + channel_routing = { + "websocket.connect": "myproject.myapp.consumers.ws_add", } Now, let's look at what this is doing. It's tied to the @@ -116,12 +115,10 @@ group it's not in):: Of course, this is exactly the same code as the ``connect`` handler, so let's just route both channels to the same consumer:: - ... - "ROUTING": { + channel_routing = { "websocket.connect": "myproject.myapp.consumers.ws_add", "websocket.keepalive": "myproject.myapp.consumers.ws_add", - }, - ... + } And, even though channels will expire out, let's add an explicit ``disconnect`` handler to clean up as people disconnect (most channels will cleanly disconnect @@ -152,18 +149,13 @@ any message sent in to all connected clients. Here's all the code:: def ws_disconnect(message): Group("chat").discard(message.reply_channel) -And what our routing should look like in ``settings.py``:: +And what our routing should look like in ``routing.py``:: - CHANNEL_BACKENDS = { - "default": { - "BACKEND": "channels.backends.database.DatabaseChannelBackend", - "ROUTING": { - "websocket.connect": "myproject.myapp.consumers.ws_add", - "websocket.keepalive": "myproject.myapp.consumers.ws_add", - "websocket.receive": "myproject.myapp.consumers.ws_message", - "websocket.disconnect": "myproject.myapp.consumers.ws_disconnect", - }, - }, + channel_routing = { + "websocket.connect": "myproject.myapp.consumers.ws_add", + "websocket.keepalive": "myproject.myapp.consumers.ws_add", + "websocket.receive": "myproject.myapp.consumers.ws_message", + "websocket.disconnect": "myproject.myapp.consumers.ws_disconnect", } With all that code in your ``consumers.py`` file, you now have a working