Start working on a WebSocket interface server

This commit is contained in:
Andrew Godwin 2015-06-10 20:39:36 -07:00
parent 80627d8e37
commit 433625da1e
10 changed files with 214 additions and 48 deletions

View File

@ -1,3 +1,4 @@
import time
from channels.consumer_registry import ConsumerRegistry from channels.consumer_registry import ConsumerRegistry
@ -31,10 +32,34 @@ class BaseChannelBackend(object):
def receive_many(self, channels): def receive_many(self, channels):
""" """
Block and return the first message available on one of the Return the first message available on one of the
channels passed, as a (channel, message) tuple. channels passed, as a (channel, message) tuple, or return (None, None)
if no channels are available.
Should not block, but is allowed to be moderately slow/have a short
timeout - it needs to return so we can refresh the list of channels,
not because the rest of the process is waiting on it.
Better performance can be achieved for interface servers by directly
integrating the server and the backend code; this is merely for a
generic support-everything pattern.
""" """
raise NotImplementedError() raise NotImplementedError()
def receive_many_blocking(self, channels):
"""
Blocking version of receive_many, if the calling context knows it
doesn't ever want to change the channels list until something happens.
This base class provides a default implementation; can be overridden
to be more efficient by subclasses.
"""
while True:
channel, message = self.receive_many(channels)
if channel is None:
time.sleep(0.05)
continue
return channel, message
def __str__(self): def __str__(self):
return self.__class__.__name__ return self.__class__.__name__

View File

@ -60,16 +60,15 @@ class DatabaseChannelBackend(BaseChannelBackend):
def receive_many(self, channels): def receive_many(self, channels):
if not channels: if not channels:
raise ValueError("Cannot receive on empty channel list!") raise ValueError("Cannot receive on empty channel list!")
while True: # Delete all expired messages (add 10 second grace period for clock sync)
# Delete all expired messages (add 10 second grace period for clock sync) self.model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete()
self.model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete() # Get a message from one of our channels
# Get a message from one of our channels message = self.model.objects.filter(channel__in=channels).order_by("id").first()
message = self.model.objects.filter(channel__in=channels).order_by("id").first() if message:
if message: self.model.objects.filter(pk=message.pk).delete()
self.model.objects.filter(pk=message.pk).delete() return message.channel, json.loads(message.content)
return message.channel, json.loads(message.content) else:
# If all empty, sleep for a little bit return None, None
time.sleep(0.1)
def __str__(self): def __str__(self):
return "%s(alias=%s)" % (self.__class__.__name__, self.connection.alias) return "%s(alias=%s)" % (self.__class__.__name__, self.connection.alias)

View File

@ -22,15 +22,13 @@ class InMemoryChannelBackend(BaseChannelBackend):
def receive_many(self, channels): def receive_many(self, channels):
if not channels: if not channels:
raise ValueError("Cannot receive on empty channel list!") raise ValueError("Cannot receive on empty channel list!")
while True: # Try to pop a message from each channel
# Try to pop a message from each channel for channel in channels:
for channel in channels: try:
try: # This doesn't clean up empty channels - OK for testing.
# This doesn't clean up empty channels - OK for testing. # For later versions, have cleanup w/lock.
# For later versions, have cleanup w/lock. return channel, queues[channel].popleft()
return channel, queues[channel].popleft() except (IndexError, KeyError):
except (IndexError, KeyError): pass
pass return None, None
# If all empty, sleep for a little bit
time.sleep(0.01)

View File

@ -18,12 +18,15 @@ class Channel(object):
"default" one by default. "default" one by default.
""" """
def __init__(self, name, alias=DEFAULT_CHANNEL_BACKEND): def __init__(self, name, alias=DEFAULT_CHANNEL_BACKEND, channel_backend=None):
""" """
Create an instance for the channel named "name" Create an instance for the channel named "name"
""" """
self.name = name self.name = name
self.channel_backend = channel_backends[alias] if channel_backend:
self.channel_backend = channel_backend
else:
self.channel_backend = channel_backends[alias]
def send(self, **kwargs): def send(self, **kwargs):
""" """

View File

View File

@ -0,0 +1,106 @@
import django
import time
from collections import deque
from twisted.internet import reactor
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
class InterfaceProtocol(WebSocketServerProtocol):
"""
Protocol which supports WebSockets and forwards incoming messages to
the django.websocket channels.
"""
def onConnect(self, request):
self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
self.request = request
def onOpen(self):
# Make sending channel
self.send_channel = Channel.new_name("django.websocket.send")
self.factory.protocols[self.send_channel] = self
# Send news that this channel is open
Channel("django.websocket.connect").send(
send_channel = self.send_channel,
)
def onMessage(self, payload, isBinary):
if isBinary:
Channel("django.websocket.receive").send(
send_channel = self.send_channel,
content = payload,
binary = True,
)
else:
Channel("django.websocket.receive").send(
send_channel = self.send_channel,
content = payload.decode("utf8"),
binary = False,
)
def onChannelSend(self, content, binary=False, **kwargs):
self.sendMessage(content, binary)
def onClose(self, wasClean, code, reason):
del self.factory.protocols[self.send_channel]
Channel("django.websocket.disconnect").send(
send_channel = self.send_channel,
)
class InterfaceFactory(WebSocketServerFactory):
"""
Factory which keeps track of its open protocols' receive channels
and can dispatch to them.
"""
# TODO: Clean up dead protocols if needed?
def __init__(self, *args, **kwargs):
super(InterfaceFactory, self).__init__(*args, **kwargs)
self.protocols = {}
def send_channels(self):
return self.protocols.keys()
def dispatch_send(self, channel, message):
self.protocols[channel].onChannelSend(**message)
class WebsocketTwistedInterface(object):
"""
Easy API to run a WebSocket interface server using Twisted.
Integrates the channel backend by running it in a separate thread, as we don't
know if the backend is Twisted-compliant.
"""
def __init__(self, channel_backend, port=9000):
self.channel_backend = channel_backend
self.port = port
def run(self):
self.factory = InterfaceFactory("ws://localhost:%i" % self.port, debug=False)
self.factory.protocol = InterfaceProtocol
reactor.listenTCP(self.port, self.factory)
reactor.callInThread(self.backend_reader)
reactor.run()
def backend_reader(self):
"""
Run in a separate thread; reads messages from the backend.
"""
while True:
channels = self.factory.send_channels()
# Don't do anything if there's no channels to listen on
if channels:
channel, message = self.channel_backend.receive_many(channels)
else:
time.sleep(0.1)
continue
# Wait around if there's nothing received
if channel is None:
time.sleep(0.05)
continue
# Deal with the message
self.factory.dispatch_send(channel, message)

View File

@ -0,0 +1,21 @@
import django
from django.core.handlers.wsgi import WSGIHandler
from django.http import HttpResponse
from channels import Channel
class WSGIInterface(WSGIHandler):
"""
WSGI application that pushes requests to channels.
"""
def __init__(self, channel_backend, *args, **kwargs):
self.channel_backend = channel_backend
django.setup()
super(WSGIInterface, self).__init__(*args, **kwargs)
def get_response(self, request):
request.response_channel = Channel.new_name("django.wsgi.response")
Channel("django.wsgi.request", channel_backend=self.channel_backend).send(**request.channel_encode())
channel, message = self.channel_backend.receive_many_blocking([request.response_channel])
return HttpResponse.channel_decode(message)

View File

@ -2,12 +2,11 @@ import django
import threading import threading
from django.core.management.commands.runserver import Command as RunserverCommand from django.core.management.commands.runserver import Command as RunserverCommand
from django.core.management import CommandError from django.core.management import CommandError
from django.core.handlers.wsgi import WSGIHandler from channels import channel_backends, DEFAULT_CHANNEL_BACKEND
from django.http import HttpResponse
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
from channels.worker import Worker from channels.worker import Worker
from channels.utils import auto_import_consumers from channels.utils import auto_import_consumers
from channels.adapters import UrlConsumer from channels.adapters import UrlConsumer
from channels.interfaces.wsgi import WSGIInterface
class Command(RunserverCommand): class Command(RunserverCommand):
@ -16,38 +15,25 @@ class Command(RunserverCommand):
""" """
Returns the default WSGI handler for the runner. Returns the default WSGI handler for the runner.
""" """
django.setup() return WSGIInterface(self.channel_backend)
return WSGIInterfaceHandler()
def run(self, *args, **options): def run(self, *args, **options):
# Force disable reloader for now # Force disable reloader for now
options['use_reloader'] = False options['use_reloader'] = False
# Check a handler is registered for http reqs # Check a handler is registered for http reqs
channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
auto_import_consumers() auto_import_consumers()
if not channel_backend.registry.consumer_for_channel("django.wsgi.request"): if not self.channel_backend.registry.consumer_for_channel("django.wsgi.request"):
# Register the default one # Register the default one
channel_backend.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"]) self.channel_backend.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"])
# Launch a worker thread # Launch a worker thread
worker = WorkerThread(channel_backend) worker = WorkerThread(self.channel_backend)
worker.daemon = True worker.daemon = True
worker.start() worker.start()
# Run the rest # Run the rest
return super(Command, self).run(*args, **options) return super(Command, self).run(*args, **options)
class WSGIInterfaceHandler(WSGIHandler):
"""
New WSGI handler that pushes requests to channels.
"""
def get_response(self, request):
request.response_channel = Channel.new_name("django.wsgi.response")
Channel("django.wsgi.request").send(**request.channel_encode())
channel, message = channel_backends[DEFAULT_CHANNEL_BACKEND].receive_many([request.response_channel])
return HttpResponse.channel_decode(message)
class WorkerThread(threading.Thread): class WorkerThread(threading.Thread):
""" """
Class that runs a worker Class that runs a worker

View File

@ -0,0 +1,25 @@
import time
from django.core.management import BaseCommand, CommandError
from channels import channel_backends, DEFAULT_CHANNEL_BACKEND
from channels.interfaces.websocket_twisted import WebsocketTwistedInterface
from channels.utils import auto_import_consumers
class Command(BaseCommand):
def handle(self, *args, **options):
# Get the backend to use
channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
auto_import_consumers()
if channel_backend.local_only:
raise CommandError(
"You have a process-local channel backend configured, and so cannot run separate interface servers.\n"
"Configure a network-based backend in CHANNEL_BACKENDS to use this command."
)
# Launch a worker
self.stdout.write("Running Twisted/Autobahn WebSocket interface against backend %s" % channel_backend)
# Run the interface
try:
WebsocketTwistedInterface(channel_backend=channel_backend).run()
except KeyboardInterrupt:
pass

View File

@ -1,3 +1,6 @@
import time
class Worker(object): class Worker(object):
""" """
A "worker" process that continually looks for available messages to run A "worker" process that continually looks for available messages to run
@ -12,10 +15,10 @@ class Worker(object):
""" """
Tries to continually dispatch messages to consumers. Tries to continually dispatch messages to consumers.
""" """
channels = self.channel_backend.registry.all_channel_names() channels = self.channel_backend.registry.all_channel_names()
while True: while True:
channel, message = self.channel_backend.receive_many(channels) channel, message = self.channel_backend.receive_many_blocking(channels)
# Handle the message
consumer = self.channel_backend.registry.consumer_for_channel(channel) consumer = self.channel_backend.registry.consumer_for_channel(channel)
if self.callback: if self.callback:
self.callback(channel, message) self.callback(channel, message)