From c2c1ffc5bd59443463456b91d85598ab25bca8a1 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Mon, 8 Jun 2015 12:40:47 -0700 Subject: [PATCH] Make everything hang off of channel_layers --- channels/__init__.py | 4 ---- channels/adapters.py | 12 ++++++++---- channels/backends/base.py | 10 +++++++++- channels/channel.py | 19 +++++++++++++++++++ channels/consumer_registry.py | 20 ++++++++------------ channels/management/commands/runserver.py | 18 ++++++++++-------- channels/worker.py | 8 +++----- 7 files changed, 57 insertions(+), 34 deletions(-) diff --git a/channels/__init__.py b/channels/__init__.py index d679bd6..8d02ab0 100644 --- a/channels/__init__.py +++ b/channels/__init__.py @@ -1,8 +1,4 @@ from .channel import Channel -from .consumer_registry import ConsumerRegistry - -# Make a site-wide registry -coreg = ConsumerRegistry() # Load a backend from .backends.memory import InMemoryChannelBackend diff --git a/channels/adapters.py b/channels/adapters.py index e078310..bad2276 100644 --- a/channels/adapters.py +++ b/channels/adapters.py @@ -1,7 +1,9 @@ import functools + from django.core.handlers.base import BaseHandler from django.http import HttpRequest, HttpResponse -from channels import Channel, coreg + +from channels import Channel, channel_layers, DEFAULT_CHANNEL_LAYER class UrlConsumer(object): @@ -33,10 +35,10 @@ def view_producer(channel_name): return producing_view -def view_consumer(channel_name): +def view_consumer(channel_name, alias=None): """ Decorates a normal Django view to be a channel consumer. - Does not run any middleware. + Does not run any middleware """ def inner(func): @functools.wraps(func) @@ -44,6 +46,8 @@ def view_consumer(channel_name): request = HttpRequest.channel_decode(kwargs) response = func(request) Channel(request.response_channel).send(**response.channel_encode()) - coreg.add_consumer(consumer, [channel_name]) + # Get the channel layer and register + channel_layer = channel_layers[alias or DEFAULT_CHANNEL_LAYER] + channel_layer.registry.add_consumer(consumer, [channel_name]) return func return inner diff --git a/channels/backends/base.py b/channels/backends/base.py index 5ff69db..a35fcdc 100644 --- a/channels/backends/base.py +++ b/channels/backends/base.py @@ -1,3 +1,6 @@ +from channels.consumer_registry import ConsumerRegistry + + class ChannelClosed(Exception): """ Raised when you try to send to a closed channel. @@ -7,9 +10,14 @@ class ChannelClosed(Exception): class BaseChannelBackend(object): """ - Base class for all channel layer implementations. + Base class for all channel layer implementations. Manages both sending + and receving messages from the backend, and each comes with its own + registry of consumers. """ + def __init__(self): + self.registry = ConsumerRegistry() + def send(self, channel, message): """ Send a message over the channel, taken from the kwargs. diff --git a/channels/channel.py b/channels/channel.py index ea4a133..a911548 100644 --- a/channels/channel.py +++ b/channels/channel.py @@ -1,6 +1,8 @@ import random import string +from django.utils import six + class Channel(object): """ @@ -46,3 +48,20 @@ class Channel(object): """ from channels.adapters import view_producer return view_producer(self.name) + + @classmethod + def consumer(self, channels, alias=None): + """ + Decorator that registers a function as a consumer. + """ + from channels import channel_layers, DEFAULT_CHANNEL_LAYER + # Upconvert if you just pass in a string + if isinstance(channels, six.string_types): + channels = [channels] + # Get the channel + channel_layer = channel_layers[alias or DEFAULT_CHANNEL_LAYER] + # Return a function that'll register whatever it wraps + def inner(func): + channel_layer.registry.add_consumer(func, channels) + return func + return inner diff --git a/channels/consumer_registry.py b/channels/consumer_registry.py index e308282..1bfe4e0 100644 --- a/channels/consumer_registry.py +++ b/channels/consumer_registry.py @@ -1,19 +1,26 @@ import functools + from django.utils import six + from .utils import name_that_thing + class ConsumerRegistry(object): """ Manages the available consumers in the project and which channels they listen to. - Generally a single project-wide instance of this is used. + Generally this is attached to a backend instance as ".registry" """ def __init__(self): self.consumers = {} def add_consumer(self, consumer, channels): + # Upconvert if you just pass in a string + if isinstance(channels, six.string_types): + channels = [channels] + # Register on each channel, checking it's unique for channel in channels: if channel in self.consumers: raise ValueError("Cannot register consumer %s - channel %r already consumed by %s" % ( @@ -23,17 +30,6 @@ class ConsumerRegistry(object): )) self.consumers[channel] = consumer - def consumer(self, channels): - """ - Decorator that registers a function as a consumer. - """ - if isinstance(channels, six.string_types): - channels = [channels] - def inner(func): - self.add_consumer(func, channels) - return func - return inner - def all_channel_names(self): return self.consumers.keys() diff --git a/channels/management/commands/runserver.py b/channels/management/commands/runserver.py index 8ed1563..f17aa95 100644 --- a/channels/management/commands/runserver.py +++ b/channels/management/commands/runserver.py @@ -3,7 +3,7 @@ import threading from django.core.management.commands.runserver import Command as RunserverCommand from django.core.handlers.wsgi import WSGIHandler from django.http import HttpResponse -from channels import Channel, coreg, channel_layers, DEFAULT_CHANNEL_LAYER +from channels import Channel, channel_layers, DEFAULT_CHANNEL_LAYER from channels.worker import Worker from channels.utils import auto_import_consumers from channels.adapters import UrlConsumer @@ -22,12 +22,13 @@ class Command(RunserverCommand): # Force disable reloader for now options['use_reloader'] = False # Check a handler is registered for http reqs + channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER] auto_import_consumers() - if not coreg.consumer_for_channel("django.wsgi.request"): + if not channel_layer.registry.consumer_for_channel("django.wsgi.request"): # Register the default one - coreg.add_consumer(UrlConsumer(), ["django.wsgi.request"]) + channel_layer.registry.add_consumer(UrlConsumer(), ["django.wsgi.request"]) # Launch a worker thread - worker = WorkerThread() + worker = WorkerThread(channel_layer) worker.daemon = True worker.start() # Run the rest @@ -51,8 +52,9 @@ class WorkerThread(threading.Thread): Class that runs a worker """ + def __init__(self, channel_layer): + super(WorkerThread, self).__init__() + self.channel_layer = channel_layer + def run(self): - Worker( - consumer_registry = coreg, - channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER], - ).run() + Worker(channel_layer=self.channel_layer).run() diff --git a/channels/worker.py b/channels/worker.py index 4f4d129..9ce0606 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -4,9 +4,7 @@ class Worker(object): and runs their consumers. """ - def __init__(self, consumer_registry, channel_layer): - from channels import channel_layers, DEFAULT_CHANNEL_LAYER - self.consumer_registry = consumer_registry + def __init__(self, channel_layer): self.channel_layer = channel_layer def run(self): @@ -14,8 +12,8 @@ class Worker(object): Tries to continually dispatch messages to consumers. """ - channels = self.consumer_registry.all_channel_names() + channels = self.channel_layer.registry.all_channel_names() while True: channel, message = self.channel_layer.receive_many(channels) - consumer = self.consumer_registry.consumer_for_channel(channel) + consumer = self.channel_layer.registry.consumer_for_channel(channel) consumer(**message)