Make everything hang off of channel_layers

This commit is contained in:
Andrew Godwin 2015-06-08 12:40:47 -07:00
parent 2cc1d00e18
commit c2c1ffc5bd
7 changed files with 57 additions and 34 deletions

View File

@ -1,8 +1,4 @@
from .channel import Channel from .channel import Channel
from .consumer_registry import ConsumerRegistry
# Make a site-wide registry
coreg = ConsumerRegistry()
# Load a backend # Load a backend
from .backends.memory import InMemoryChannelBackend from .backends.memory import InMemoryChannelBackend

View File

@ -1,7 +1,9 @@
import functools import functools
from django.core.handlers.base import BaseHandler from django.core.handlers.base import BaseHandler
from django.http import HttpRequest, HttpResponse from django.http import HttpRequest, HttpResponse
from channels import Channel, coreg
from channels import Channel, channel_layers, DEFAULT_CHANNEL_LAYER
class UrlConsumer(object): class UrlConsumer(object):
@ -33,10 +35,10 @@ def view_producer(channel_name):
return producing_view return producing_view
def view_consumer(channel_name): def view_consumer(channel_name, alias=None):
""" """
Decorates a normal Django view to be a channel consumer. Decorates a normal Django view to be a channel consumer.
Does not run any middleware. Does not run any middleware
""" """
def inner(func): def inner(func):
@functools.wraps(func) @functools.wraps(func)
@ -44,6 +46,8 @@ def view_consumer(channel_name):
request = HttpRequest.channel_decode(kwargs) request = HttpRequest.channel_decode(kwargs)
response = func(request) response = func(request)
Channel(request.response_channel).send(**response.channel_encode()) Channel(request.response_channel).send(**response.channel_encode())
coreg.add_consumer(consumer, [channel_name]) # Get the channel layer and register
channel_layer = channel_layers[alias or DEFAULT_CHANNEL_LAYER]
channel_layer.registry.add_consumer(consumer, [channel_name])
return func return func
return inner return inner

View File

@ -1,3 +1,6 @@
from channels.consumer_registry import ConsumerRegistry
class ChannelClosed(Exception): class ChannelClosed(Exception):
""" """
Raised when you try to send to a closed channel. Raised when you try to send to a closed channel.
@ -7,9 +10,14 @@ class ChannelClosed(Exception):
class BaseChannelBackend(object): class BaseChannelBackend(object):
""" """
Base class for all channel layer implementations. Base class for all channel layer implementations. Manages both sending
and receving messages from the backend, and each comes with its own
registry of consumers.
""" """
def __init__(self):
self.registry = ConsumerRegistry()
def send(self, channel, message): def send(self, channel, message):
""" """
Send a message over the channel, taken from the kwargs. Send a message over the channel, taken from the kwargs.

View File

@ -1,6 +1,8 @@
import random import random
import string import string
from django.utils import six
class Channel(object): class Channel(object):
""" """
@ -46,3 +48,20 @@ class Channel(object):
""" """
from channels.adapters import view_producer from channels.adapters import view_producer
return view_producer(self.name) return view_producer(self.name)
@classmethod
def consumer(self, channels, alias=None):
"""
Decorator that registers a function as a consumer.
"""
from channels import channel_layers, DEFAULT_CHANNEL_LAYER
# Upconvert if you just pass in a string
if isinstance(channels, six.string_types):
channels = [channels]
# Get the channel
channel_layer = channel_layers[alias or DEFAULT_CHANNEL_LAYER]
# Return a function that'll register whatever it wraps
def inner(func):
channel_layer.registry.add_consumer(func, channels)
return func
return inner

View File

@ -1,19 +1,26 @@
import functools import functools
from django.utils import six from django.utils import six
from .utils import name_that_thing from .utils import name_that_thing
class ConsumerRegistry(object): class ConsumerRegistry(object):
""" """
Manages the available consumers in the project and which channels they Manages the available consumers in the project and which channels they
listen to. listen to.
Generally a single project-wide instance of this is used. Generally this is attached to a backend instance as ".registry"
""" """
def __init__(self): def __init__(self):
self.consumers = {} self.consumers = {}
def add_consumer(self, consumer, channels): def add_consumer(self, consumer, channels):
# Upconvert if you just pass in a string
if isinstance(channels, six.string_types):
channels = [channels]
# Register on each channel, checking it's unique
for channel in channels: for channel in channels:
if channel in self.consumers: if channel in self.consumers:
raise ValueError("Cannot register consumer %s - channel %r already consumed by %s" % ( raise ValueError("Cannot register consumer %s - channel %r already consumed by %s" % (
@ -23,17 +30,6 @@ class ConsumerRegistry(object):
)) ))
self.consumers[channel] = consumer self.consumers[channel] = consumer
def consumer(self, channels):
"""
Decorator that registers a function as a consumer.
"""
if isinstance(channels, six.string_types):
channels = [channels]
def inner(func):
self.add_consumer(func, channels)
return func
return inner
def all_channel_names(self): def all_channel_names(self):
return self.consumers.keys() return self.consumers.keys()

View File

@ -3,7 +3,7 @@ import threading
from django.core.management.commands.runserver import Command as RunserverCommand from django.core.management.commands.runserver import Command as RunserverCommand
from django.core.handlers.wsgi import WSGIHandler from django.core.handlers.wsgi import WSGIHandler
from django.http import HttpResponse from django.http import HttpResponse
from channels import Channel, coreg, channel_layers, DEFAULT_CHANNEL_LAYER from channels import Channel, channel_layers, DEFAULT_CHANNEL_LAYER
from channels.worker import Worker from channels.worker import Worker
from channels.utils import auto_import_consumers from channels.utils import auto_import_consumers
from channels.adapters import UrlConsumer from channels.adapters import UrlConsumer
@ -22,12 +22,13 @@ class Command(RunserverCommand):
# Force disable reloader for now # Force disable reloader for now
options['use_reloader'] = False options['use_reloader'] = False
# Check a handler is registered for http reqs # Check a handler is registered for http reqs
channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER]
auto_import_consumers() auto_import_consumers()
if not coreg.consumer_for_channel("django.wsgi.request"): if not channel_layer.registry.consumer_for_channel("django.wsgi.request"):
# Register the default one # Register the default one
coreg.add_consumer(UrlConsumer(), ["django.wsgi.request"]) channel_layer.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"])
# Launch a worker thread # Launch a worker thread
worker = WorkerThread() worker = WorkerThread(channel_layer)
worker.daemon = True worker.daemon = True
worker.start() worker.start()
# Run the rest # Run the rest
@ -51,8 +52,9 @@ class WorkerThread(threading.Thread):
Class that runs a worker Class that runs a worker
""" """
def __init__(self, channel_layer):
super(WorkerThread, self).__init__()
self.channel_layer = channel_layer
def run(self): def run(self):
Worker( Worker(channel_layer=self.channel_layer).run()
consumer_registry = coreg,
channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER],
).run()

View File

@ -4,9 +4,7 @@ class Worker(object):
and runs their consumers. and runs their consumers.
""" """
def __init__(self, consumer_registry, channel_layer): def __init__(self, channel_layer):
from channels import channel_layers, DEFAULT_CHANNEL_LAYER
self.consumer_registry = consumer_registry
self.channel_layer = channel_layer self.channel_layer = channel_layer
def run(self): def run(self):
@ -14,8 +12,8 @@ class Worker(object):
Tries to continually dispatch messages to consumers. Tries to continually dispatch messages to consumers.
""" """
channels = self.consumer_registry.all_channel_names() channels = self.channel_layer.registry.all_channel_names()
while True: while True:
channel, message = self.channel_layer.receive_many(channels) channel, message = self.channel_layer.receive_many(channels)
consumer = self.consumer_registry.consumer_for_channel(channel) consumer = self.channel_layer.registry.consumer_for_channel(channel)
consumer(**message) consumer(**message)