From 75ee13ff9c25d82166b74d5d37bb21124e9a0fb4 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Wed, 10 Jun 2015 23:02:53 -0700 Subject: [PATCH] Add redis backend --- channels/backends/database.py | 1 - channels/backends/redis_py.py | 56 +++++++++++++++++++++ channels/interfaces/websocket_twisted.py | 14 ++++-- channels/management/commands/runwsserver.py | 15 +++--- 4 files changed, 75 insertions(+), 11 deletions(-) create mode 100644 channels/backends/redis_py.py diff --git a/channels/backends/database.py b/channels/backends/database.py index 1553216..bf8e842 100644 --- a/channels/backends/database.py +++ b/channels/backends/database.py @@ -9,7 +9,6 @@ from django.utils.timezone import now from .base import BaseChannelBackend -queues = {} class DatabaseChannelBackend(BaseChannelBackend): """ diff --git a/channels/backends/redis_py.py b/channels/backends/redis_py.py new file mode 100644 index 0000000..13ea81d --- /dev/null +++ b/channels/backends/redis_py.py @@ -0,0 +1,56 @@ +import time +import json +import datetime +import redis +import uuid + +from .base import BaseChannelBackend + + +class RedisChannelBackend(BaseChannelBackend): + """ + ORM-backed channel environment. For development use only; it will span + multiple processes fine, but it's going to be pretty bad at throughput. + """ + + def __init__(self, expiry=60, host="localhost", port=6379, prefix="django-channels:"): + super(RedisChannelBackend, self).__init__(expiry) + self.host = host + self.port = port + self.prefix = prefix + + @property + def connection(self): + """ + Returns the correct connection for the current thread. + """ + return redis.Redis(host=self.host, port=self.port) + + def send(self, channel, message): + key = uuid.uuid4() + self.connection.set( + key, + json.dumps(message), + ex = self.expiry + 10, + ) + self.connection.rpush( + self.prefix + channel, + key, + ) + + def receive_many(self, channels): + if not channels: + raise ValueError("Cannot receive on empty channel list!") + # Get a message from one of our channels + while True: + result = self.connection.blpop([self.prefix + channel for channel in channels], timeout=1) + if result: + content = self.connection.get(result[1]) + if content is None: + continue + return result[0][len(self.prefix):], json.loads(content) + else: + return None, None + + def __str__(self): + return "%s(host=%s, port=%s)" % (self.__class__.__name__, self.host, self.port) diff --git a/channels/interfaces/websocket_twisted.py b/channels/interfaces/websocket_twisted.py index 6650c82..a24f233 100644 --- a/channels/interfaces/websocket_twisted.py +++ b/channels/interfaces/websocket_twisted.py @@ -40,7 +40,10 @@ class InterfaceProtocol(WebSocketServerProtocol): ) def onChannelSend(self, content, binary=False, **kwargs): - self.sendMessage(content, binary) + if binary: + self.sendMessage(content, binary) + else: + self.sendMessage(content.encode("utf8"), binary) def onClose(self, wasClean, code, reason): del self.factory.protocols[self.send_channel] @@ -71,8 +74,8 @@ class InterfaceFactory(WebSocketServerFactory): class WebsocketTwistedInterface(object): """ Easy API to run a WebSocket interface server using Twisted. - Integrates the channel backend by running it in a separate thread, as we don't - know if the backend is Twisted-compliant. + Integrates the channel backend by running it in a separate thread, using + the always-compatible polling style. """ def __init__(self, channel_backend, port=9000): @@ -80,7 +83,7 @@ class WebsocketTwistedInterface(object): self.port = port def run(self): - self.factory = InterfaceFactory("ws://localhost:%i" % self.port, debug=False) + self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False) self.factory.protocol = InterfaceProtocol reactor.listenTCP(self.port, self.factory) reactor.callInThread(self.backend_reader) @@ -92,6 +95,9 @@ class WebsocketTwistedInterface(object): """ while True: channels = self.factory.send_channels() + # Quit if reactor is stopping + if not reactor.running: + return # Don't do anything if there's no channels to listen on if channels: channel, message = self.channel_backend.receive_many(channels) diff --git a/channels/management/commands/runwsserver.py b/channels/management/commands/runwsserver.py index d5d072c..9434be0 100644 --- a/channels/management/commands/runwsserver.py +++ b/channels/management/commands/runwsserver.py @@ -7,6 +7,10 @@ from channels.utils import auto_import_consumers class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument('port', nargs='?', + help='Optional port number') + def handle(self, *args, **options): # Get the backend to use channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] @@ -16,10 +20,9 @@ class Command(BaseCommand): "You have a process-local channel backend configured, and so cannot run separate interface servers.\n" "Configure a network-based backend in CHANNEL_BACKENDS to use this command." ) - # Launch a worker - self.stdout.write("Running Twisted/Autobahn WebSocket interface against backend %s" % channel_backend) # Run the interface - try: - WebsocketTwistedInterface(channel_backend=channel_backend).run() - except KeyboardInterrupt: - pass + port = options.get("port", None) or 9000 + self.stdout.write("Running Twisted/Autobahn WebSocket interface server") + self.stdout.write(" Channel backend: %s" % channel_backend) + self.stdout.write(" Listening on: ws://0.0.0.0:%i" % port) + WebsocketTwistedInterface(channel_backend=channel_backend, port=port).run()