Separate channel backend from user-facing class

This commit is contained in:
Andrew Godwin 2015-06-08 12:22:23 -07:00
parent 6cd01e2bc1
commit 2cc1d00e18
11 changed files with 128 additions and 108 deletions

View File

@ -1,10 +1,15 @@
from .channel import Channel
from .consumer_registry import ConsumerRegistry
# Make a site-wide registry
coreg = ConsumerRegistry()
# Load an implementation of Channel
from .backends import InMemoryChannel as Channel
# Load a backend
from .backends.memory import InMemoryChannelBackend
DEFAULT_CHANNEL_LAYER = "default"
channel_layers = {
DEFAULT_CHANNEL_LAYER: InMemoryChannelBackend(),
}
# Ensure monkeypatching
from .hacks import monkeypatch_django

View File

@ -1,3 +1,4 @@
import functools
from django.core.handlers.base import BaseHandler
from django.http import HttpRequest, HttpResponse
from channels import Channel, coreg
@ -38,6 +39,7 @@ def view_consumer(channel_name):
Does not run any middleware.
"""
def inner(func):
@functools.wraps(func)
def consumer(**kwargs):
request = HttpRequest.channel_decode(kwargs)
response = func(request)

View File

@ -1,2 +0,0 @@
from .base import BaseChannel
from .memory import InMemoryChannel

View File

@ -1,62 +1,24 @@
class BaseChannel(object):
class ChannelClosed(Exception):
"""
Raised when you try to send to a closed channel.
"""
pass
class BaseChannelBackend(object):
"""
Base class for all channel layer implementations.
"""
class ClosedError(Exception):
"""
Raised when you try to send to a closed channel.
"""
pass
def __init__(self, name):
"""
Create an instance for the channel named "name"
"""
self.name = name
def send(self, **kwargs):
def send(self, channel, message):
"""
Send a message over the channel, taken from the kwargs.
"""
raise NotImplementedError()
def close(self):
"""
Closes the channel, allowing no more messages to be sent over it.
"""
raise NotImplementedError()
@property
def closed(self):
"""
Says if the channel is closed.
"""
raise NotImplementedError()
@classmethod
def receive_many(self, channel_names):
def receive_many(self, channels):
"""
Block and return the first message available on one of the
channels passed, as a (channel_name, message) tuple.
channels passed, as a (channel, message) tuple.
"""
raise NotImplementedError()
@classmethod
def new_name(self, prefix):
"""
Returns a new channel name that's unique and not closed
with the given prefix. Does not need to be called before sending
on a channel name - just provides a way to avoid clashing for
response channels.
"""
raise NotImplementedError()
def as_view(self):
"""
Returns a view version of this channel - one that takes
the request passed in and dispatches it to our channel,
serialized.
"""
from channels.adapters import view_producer
return view_producer(self.name)

View File

@ -2,47 +2,30 @@ import time
import string
import random
from collections import deque
from .base import BaseChannel
from .base import BaseChannelBackend
queues = {}
closed = set()
class InMemoryChannel(BaseChannel):
class InMemoryChannelBackend(BaseChannelBackend):
"""
In-memory channel implementation. Intended only for use with threading,
in low-throughput development environments.
"""
def send(self, **kwargs):
# Don't allow if closed
if self.name in closed:
raise Channel.ClosedError("%s is closed" % self.name)
def send(self, channel, message):
# Add to the deque, making it if needs be
queues.setdefault(self.name, deque()).append(kwargs)
queues.setdefault(channel, deque()).append(message)
@property
def closed(self):
# Check closed set
return self.name in closed
def close(self):
# Add to closed set
closed.add(self.name)
@classmethod
def receive_many(self, channel_names):
def receive_many(self, channels):
while True:
# Try to pop a message from each channel
for channel_name in channel_names:
for channel in channels:
try:
# This doesn't clean up empty channels - OK for testing.
# For later versions, have cleanup w/lock.
return channel_name, queues[channel_name].popleft()
return channel, queues[channel].popleft()
except (IndexError, KeyError):
pass
# If all empty, sleep for a little bit
time.sleep(0.01)
@classmethod
def new_name(self, prefix):
return "%s.%s" % (prefix, "".join(random.choice(string.ascii_letters) for i in range(16)))

48
channels/channel.py Normal file
View File

@ -0,0 +1,48 @@
import random
import string
class Channel(object):
"""
Public interaction class for the channel layer.
This is separate to the backends so we can:
a) Hide receive_many from end-users, as it is only for interface servers
b) Keep a stable-ish backend interface for third parties
You can pass an alternate Channel Layer alias in, but it will use the
"default" one by default.
"""
def __init__(self, name, alias=None):
"""
Create an instance for the channel named "name"
"""
from channels import channel_layers, DEFAULT_CHANNEL_LAYER
self.name = name
self.channel_layer = channel_layers[alias or DEFAULT_CHANNEL_LAYER]
def send(self, **kwargs):
"""
Send a message over the channel, taken from the kwargs.
"""
self.channel_layer.send(self.name, kwargs)
@classmethod
def new_name(self, prefix):
"""
Returns a new channel name that's unique and not closed
with the given prefix. Does not need to be called before sending
on a channel name - just provides a way to avoid clashing for
response channels.
"""
return "%s.%s" % (prefix, "".join(random.choice(string.ascii_letters) for i in range(32)))
def as_view(self):
"""
Returns a view version of this channel - one that takes
the request passed in and dispatches it to our channel,
serialized.
"""
from channels.adapters import view_producer
return view_producer(self.name)

View File

@ -1,4 +1,6 @@
import functools
from django.utils import six
from .utils import name_that_thing
class ConsumerRegistry(object):
"""
@ -14,10 +16,10 @@ class ConsumerRegistry(object):
def add_consumer(self, consumer, channels):
for channel in channels:
if channel in self.consumers:
raise ValueError("Cannot register consumer %s - channel %s already consumed by %s" % (
consumer,
raise ValueError("Cannot register consumer %s - channel %r already consumed by %s" % (
name_that_thing(consumer),
channel,
self.consumers[channel],
name_that_thing(self.consumers[channel]),
))
self.consumers[channel] = consumer
@ -25,6 +27,8 @@ class ConsumerRegistry(object):
"""
Decorator that registers a function as a consumer.
"""
if isinstance(channels, six.string_types):
channels = [channels]
def inner(func):
self.add_consumer(func, channels)
return func

View File

@ -1,23 +1,24 @@
Message Standards
Integration Notes
=================
Some standardised message formats are used for common message types - they
are detailed below.
Django Channels is intended to be merged into Django itself; these are the
planned changes the codebase will need to undertake in that transition.
HTTP Request
------------
* The ``channels`` package will become ``django.channels``. The expected way
of interacting with the system will be via the ``Channel`` object,
Represents a full-fledged, single HTTP request coming in from a client.
Contains the following keys:
* Obviously, the monkeypatches in ``channels.hacks`` will be replaced by
placing methods onto the objects themselves. The ``request`` and ``response``
modules will thus no longer exist separately.
* request: An encoded Django HTTP request
* response_channel: The channel name to write responses to
Things to ponder
----------------
* The mismatch between signals (broadcast) and channels (single-worker) means
we should probably leave patching signals into channels for the end developer.
This would also ensure the speedup improvements for empty signals keep working.
HTTP Response
-------------
Sends a whole response to a client.
Contains the following keys:
* response: An encoded Django HTTP response
* It's likely that the decorator-based approach of consumer registration will
mean extending Django's auto-module-loading beyond ``models`` and
``admin`` app modules to include ``views`` and ``consumers``. There may be
a better unified approach to this.

View File

@ -3,7 +3,7 @@ import threading
from django.core.management.commands.runserver import Command as RunserverCommand
from django.core.handlers.wsgi import WSGIHandler
from django.http import HttpResponse
from channels import Channel, coreg
from channels import Channel, coreg, channel_layers, DEFAULT_CHANNEL_LAYER
from channels.worker import Worker
from channels.utils import auto_import_consumers
from channels.adapters import UrlConsumer
@ -42,7 +42,7 @@ class WSGIInterfaceHandler(WSGIHandler):
def get_response(self, request):
request.response_channel = Channel.new_name("django.wsgi.response")
Channel("django.wsgi.request").send(**request.channel_encode())
channel, message = Channel.receive_many([request.response_channel])
channel, message = channel_layers[DEFAULT_CHANNEL_LAYER].receive_many([request.response_channel])
return HttpResponse.channel_decode(message)
@ -54,5 +54,5 @@ class WorkerThread(threading.Thread):
def run(self):
Worker(
consumer_registry = coreg,
channel_class = Channel,
channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER],
).run()

View File

@ -1,3 +1,4 @@
import types
from django.apps import apps
@ -6,9 +7,23 @@ def auto_import_consumers():
Auto-import consumers modules in apps
"""
for app_config in apps.get_app_configs():
consumer_module_name = "%s.consumers" % (app_config.name,)
try:
__import__(consumer_module_name)
except ImportError as e:
if "no module named consumers" not in str(e).lower():
raise
for submodule in ["consumers", "views"]:
module_name = "%s.%s" % (app_config.name, submodule)
try:
__import__(module_name)
except ImportError as e:
if "no module named %s" % submodule not in str(e).lower():
raise
def name_that_thing(thing):
"""
Returns either the function/class path or just the object's repr
"""
if hasattr(thing, "__name__"):
if hasattr(thing, "__class__") and not isinstance(thing, types.FunctionType):
if thing.__class__ is not type:
return name_that_thing(thing.__class__)
if hasattr(thing, "__module__"):
return "%s.%s" % (thing.__module__, thing.__name__)
return repr(thing)

View File

@ -4,16 +4,18 @@ class Worker(object):
and runs their consumers.
"""
def __init__(self, consumer_registry, channel_class):
def __init__(self, consumer_registry, channel_layer):
from channels import channel_layers, DEFAULT_CHANNEL_LAYER
self.consumer_registry = consumer_registry
self.channel_class = channel_class
self.channel_layer = channel_layer
def run(self):
"""
Tries to continually dispatch messages to consumers.
"""
channels = self.consumer_registry.all_channel_names()
while True:
channel, message = self.channel_class.receive_many(channels)
channel, message = self.channel_layer.receive_many(channels)
consumer = self.consumer_registry.consumer_for_channel(channel)
consumer(**message)