diff --git a/channels/backends/base.py b/channels/backends/base.py index c2d8f7c..1cc27e2 100644 --- a/channels/backends/base.py +++ b/channels/backends/base.py @@ -1,3 +1,4 @@ +import time from channels.consumer_registry import ConsumerRegistry @@ -31,10 +32,34 @@ class BaseChannelBackend(object): def receive_many(self, channels): """ - Block and return the first message available on one of the - channels passed, as a (channel, message) tuple. + Return the first message available on one of the + channels passed, as a (channel, message) tuple, or return (None, None) + if no channels are available. + + Should not block, but is allowed to be moderately slow/have a short + timeout - it needs to return so we can refresh the list of channels, + not because the rest of the process is waiting on it. + + Better performance can be achieved for interface servers by directly + integrating the server and the backend code; this is merely for a + generic support-everything pattern. """ raise NotImplementedError() + def receive_many_blocking(self, channels): + """ + Blocking version of receive_many, if the calling context knows it + doesn't ever want to change the channels list until something happens. + + This base class provides a default implementation; can be overridden + to be more efficient by subclasses. + """ + while True: + channel, message = self.receive_many(channels) + if channel is None: + time.sleep(0.05) + continue + return channel, message + def __str__(self): return self.__class__.__name__ diff --git a/channels/backends/database.py b/channels/backends/database.py index eb48a19..1553216 100644 --- a/channels/backends/database.py +++ b/channels/backends/database.py @@ -60,16 +60,15 @@ class DatabaseChannelBackend(BaseChannelBackend): def receive_many(self, channels): if not channels: raise ValueError("Cannot receive on empty channel list!") - while True: - # Delete all expired messages (add 10 second grace period for clock sync) - self.model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete() - # Get a message from one of our channels - message = self.model.objects.filter(channel__in=channels).order_by("id").first() - if message: - self.model.objects.filter(pk=message.pk).delete() - return message.channel, json.loads(message.content) - # If all empty, sleep for a little bit - time.sleep(0.1) + # Delete all expired messages (add 10 second grace period for clock sync) + self.model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete() + # Get a message from one of our channels + message = self.model.objects.filter(channel__in=channels).order_by("id").first() + if message: + self.model.objects.filter(pk=message.pk).delete() + return message.channel, json.loads(message.content) + else: + return None, None def __str__(self): return "%s(alias=%s)" % (self.__class__.__name__, self.connection.alias) diff --git a/channels/backends/memory.py b/channels/backends/memory.py index 3b7baca..1d53d45 100644 --- a/channels/backends/memory.py +++ b/channels/backends/memory.py @@ -22,15 +22,13 @@ class InMemoryChannelBackend(BaseChannelBackend): def receive_many(self, channels): if not channels: raise ValueError("Cannot receive on empty channel list!") - while True: - # Try to pop a message from each channel - for channel in channels: - try: - # This doesn't clean up empty channels - OK for testing. - # For later versions, have cleanup w/lock. - return channel, queues[channel].popleft() - except (IndexError, KeyError): - pass - # If all empty, sleep for a little bit - time.sleep(0.01) + # Try to pop a message from each channel + for channel in channels: + try: + # This doesn't clean up empty channels - OK for testing. + # For later versions, have cleanup w/lock. + return channel, queues[channel].popleft() + except (IndexError, KeyError): + pass + return None, None diff --git a/channels/channel.py b/channels/channel.py index c1b8330..9572525 100644 --- a/channels/channel.py +++ b/channels/channel.py @@ -18,12 +18,15 @@ class Channel(object): "default" one by default. """ - def __init__(self, name, alias=DEFAULT_CHANNEL_BACKEND): + def __init__(self, name, alias=DEFAULT_CHANNEL_BACKEND, channel_backend=None): """ Create an instance for the channel named "name" """ self.name = name - self.channel_backend = channel_backends[alias] + if channel_backend: + self.channel_backend = channel_backend + else: + self.channel_backend = channel_backends[alias] def send(self, **kwargs): """ diff --git a/channels/interfaces/__init__.py b/channels/interfaces/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/channels/interfaces/websocket_twisted.py b/channels/interfaces/websocket_twisted.py new file mode 100644 index 0000000..6650c82 --- /dev/null +++ b/channels/interfaces/websocket_twisted.py @@ -0,0 +1,106 @@ +import django +import time +from collections import deque +from twisted.internet import reactor +from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND +from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory + + +class InterfaceProtocol(WebSocketServerProtocol): + """ + Protocol which supports WebSockets and forwards incoming messages to + the django.websocket channels. + """ + + def onConnect(self, request): + self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] + self.request = request + + def onOpen(self): + # Make sending channel + self.send_channel = Channel.new_name("django.websocket.send") + self.factory.protocols[self.send_channel] = self + # Send news that this channel is open + Channel("django.websocket.connect").send( + send_channel = self.send_channel, + ) + + def onMessage(self, payload, isBinary): + if isBinary: + Channel("django.websocket.receive").send( + send_channel = self.send_channel, + content = payload, + binary = True, + ) + else: + Channel("django.websocket.receive").send( + send_channel = self.send_channel, + content = payload.decode("utf8"), + binary = False, + ) + + def onChannelSend(self, content, binary=False, **kwargs): + self.sendMessage(content, binary) + + def onClose(self, wasClean, code, reason): + del self.factory.protocols[self.send_channel] + Channel("django.websocket.disconnect").send( + send_channel = self.send_channel, + ) + + +class InterfaceFactory(WebSocketServerFactory): + """ + Factory which keeps track of its open protocols' receive channels + and can dispatch to them. + """ + + # TODO: Clean up dead protocols if needed? + + def __init__(self, *args, **kwargs): + super(InterfaceFactory, self).__init__(*args, **kwargs) + self.protocols = {} + + def send_channels(self): + return self.protocols.keys() + + def dispatch_send(self, channel, message): + self.protocols[channel].onChannelSend(**message) + + +class WebsocketTwistedInterface(object): + """ + Easy API to run a WebSocket interface server using Twisted. + Integrates the channel backend by running it in a separate thread, as we don't + know if the backend is Twisted-compliant. + """ + + def __init__(self, channel_backend, port=9000): + self.channel_backend = channel_backend + self.port = port + + def run(self): + self.factory = InterfaceFactory("ws://localhost:%i" % self.port, debug=False) + self.factory.protocol = InterfaceProtocol + reactor.listenTCP(self.port, self.factory) + reactor.callInThread(self.backend_reader) + reactor.run() + + def backend_reader(self): + """ + Run in a separate thread; reads messages from the backend. + """ + while True: + channels = self.factory.send_channels() + # Don't do anything if there's no channels to listen on + if channels: + channel, message = self.channel_backend.receive_many(channels) + else: + time.sleep(0.1) + continue + # Wait around if there's nothing received + if channel is None: + time.sleep(0.05) + continue + # Deal with the message + self.factory.dispatch_send(channel, message) diff --git a/channels/interfaces/wsgi.py b/channels/interfaces/wsgi.py new file mode 100644 index 0000000..60b19c2 --- /dev/null +++ b/channels/interfaces/wsgi.py @@ -0,0 +1,21 @@ +import django +from django.core.handlers.wsgi import WSGIHandler +from django.http import HttpResponse +from channels import Channel + + +class WSGIInterface(WSGIHandler): + """ + WSGI application that pushes requests to channels. + """ + + def __init__(self, channel_backend, *args, **kwargs): + self.channel_backend = channel_backend + django.setup() + super(WSGIInterface, self).__init__(*args, **kwargs) + + def get_response(self, request): + request.response_channel = Channel.new_name("django.wsgi.response") + Channel("django.wsgi.request", channel_backend=self.channel_backend).send(**request.channel_encode()) + channel, message = self.channel_backend.receive_many_blocking([request.response_channel]) + return HttpResponse.channel_decode(message) diff --git a/channels/management/commands/runserver.py b/channels/management/commands/runserver.py index 0179b65..7a795bc 100644 --- a/channels/management/commands/runserver.py +++ b/channels/management/commands/runserver.py @@ -2,12 +2,11 @@ import django import threading from django.core.management.commands.runserver import Command as RunserverCommand from django.core.management import CommandError -from django.core.handlers.wsgi import WSGIHandler -from django.http import HttpResponse -from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND +from channels import channel_backends, DEFAULT_CHANNEL_BACKEND from channels.worker import Worker from channels.utils import auto_import_consumers from channels.adapters import UrlConsumer +from channels.interfaces.wsgi import WSGIInterface class Command(RunserverCommand): @@ -16,38 +15,25 @@ class Command(RunserverCommand): """ Returns the default WSGI handler for the runner. """ - django.setup() - return WSGIInterfaceHandler() + return WSGIInterface(self.channel_backend) def run(self, *args, **options): # Force disable reloader for now options['use_reloader'] = False # Check a handler is registered for http reqs - channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] + self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] auto_import_consumers() - if not channel_backend.registry.consumer_for_channel("django.wsgi.request"): + if not self.channel_backend.registry.consumer_for_channel("django.wsgi.request"): # Register the default one - channel_backend.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"]) + self.channel_backend.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"]) # Launch a worker thread - worker = WorkerThread(channel_backend) + worker = WorkerThread(self.channel_backend) worker.daemon = True worker.start() # Run the rest return super(Command, self).run(*args, **options) -class WSGIInterfaceHandler(WSGIHandler): - """ - New WSGI handler that pushes requests to channels. - """ - - def get_response(self, request): - request.response_channel = Channel.new_name("django.wsgi.response") - Channel("django.wsgi.request").send(**request.channel_encode()) - channel, message = channel_backends[DEFAULT_CHANNEL_BACKEND].receive_many([request.response_channel]) - return HttpResponse.channel_decode(message) - - class WorkerThread(threading.Thread): """ Class that runs a worker diff --git a/channels/management/commands/runwsserver.py b/channels/management/commands/runwsserver.py new file mode 100644 index 0000000..d5d072c --- /dev/null +++ b/channels/management/commands/runwsserver.py @@ -0,0 +1,25 @@ +import time +from django.core.management import BaseCommand, CommandError +from channels import channel_backends, DEFAULT_CHANNEL_BACKEND +from channels.interfaces.websocket_twisted import WebsocketTwistedInterface +from channels.utils import auto_import_consumers + + +class Command(BaseCommand): + + def handle(self, *args, **options): + # Get the backend to use + channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] + auto_import_consumers() + if channel_backend.local_only: + raise CommandError( + "You have a process-local channel backend configured, and so cannot run separate interface servers.\n" + "Configure a network-based backend in CHANNEL_BACKENDS to use this command." + ) + # Launch a worker + self.stdout.write("Running Twisted/Autobahn WebSocket interface against backend %s" % channel_backend) + # Run the interface + try: + WebsocketTwistedInterface(channel_backend=channel_backend).run() + except KeyboardInterrupt: + pass diff --git a/channels/worker.py b/channels/worker.py index 44522cd..f39327a 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -1,3 +1,6 @@ +import time + + class Worker(object): """ A "worker" process that continually looks for available messages to run @@ -12,10 +15,10 @@ class Worker(object): """ Tries to continually dispatch messages to consumers. """ - channels = self.channel_backend.registry.all_channel_names() while True: - channel, message = self.channel_backend.receive_many(channels) + channel, message = self.channel_backend.receive_many_blocking(channels) + # Handle the message consumer = self.channel_backend.registry.consumer_for_channel(channel) if self.callback: self.callback(channel, message)