diff --git a/channels/__init__.py b/channels/__init__.py index 1aa77ce..d679bd6 100644 --- a/channels/__init__.py +++ b/channels/__init__.py @@ -1,10 +1,15 @@ +from .channel import Channel from .consumer_registry import ConsumerRegistry # Make a site-wide registry coreg = ConsumerRegistry() -# Load an implementation of Channel -from .backends import InMemoryChannel as Channel +# Load a backend +from .backends.memory import InMemoryChannelBackend +DEFAULT_CHANNEL_LAYER = "default" +channel_layers = { + DEFAULT_CHANNEL_LAYER: InMemoryChannelBackend(), +} # Ensure monkeypatching from .hacks import monkeypatch_django diff --git a/channels/adapters.py b/channels/adapters.py index 03c02c9..e078310 100644 --- a/channels/adapters.py +++ b/channels/adapters.py @@ -1,3 +1,4 @@ +import functools from django.core.handlers.base import BaseHandler from django.http import HttpRequest, HttpResponse from channels import Channel, coreg @@ -38,6 +39,7 @@ def view_consumer(channel_name): Does not run any middleware. """ def inner(func): + @functools.wraps(func) def consumer(**kwargs): request = HttpRequest.channel_decode(kwargs) response = func(request) diff --git a/channels/backends/__init__.py b/channels/backends/__init__.py index 0627c0a..e69de29 100644 --- a/channels/backends/__init__.py +++ b/channels/backends/__init__.py @@ -1,2 +0,0 @@ -from .base import BaseChannel -from .memory import InMemoryChannel diff --git a/channels/backends/base.py b/channels/backends/base.py index 5341e0c..5ff69db 100644 --- a/channels/backends/base.py +++ b/channels/backends/base.py @@ -1,62 +1,24 @@ -class BaseChannel(object): +class ChannelClosed(Exception): + """ + Raised when you try to send to a closed channel. + """ + pass + + +class BaseChannelBackend(object): """ Base class for all channel layer implementations. """ - class ClosedError(Exception): - """ - Raised when you try to send to a closed channel. - """ - pass - - def __init__(self, name): - """ - Create an instance for the channel named "name" - """ - self.name = name - - def send(self, **kwargs): + def send(self, channel, message): """ Send a message over the channel, taken from the kwargs. """ raise NotImplementedError() - def close(self): - """ - Closes the channel, allowing no more messages to be sent over it. - """ - raise NotImplementedError() - - @property - def closed(self): - """ - Says if the channel is closed. - """ - raise NotImplementedError() - - @classmethod - def receive_many(self, channel_names): + def receive_many(self, channels): """ Block and return the first message available on one of the - channels passed, as a (channel_name, message) tuple. + channels passed, as a (channel, message) tuple. """ raise NotImplementedError() - - @classmethod - def new_name(self, prefix): - """ - Returns a new channel name that's unique and not closed - with the given prefix. Does not need to be called before sending - on a channel name - just provides a way to avoid clashing for - response channels. - """ - raise NotImplementedError() - - def as_view(self): - """ - Returns a view version of this channel - one that takes - the request passed in and dispatches it to our channel, - serialized. - """ - from channels.adapters import view_producer - return view_producer(self.name) diff --git a/channels/backends/memory.py b/channels/backends/memory.py index 91bd1a2..8b4bd14 100644 --- a/channels/backends/memory.py +++ b/channels/backends/memory.py @@ -2,47 +2,30 @@ import time import string import random from collections import deque -from .base import BaseChannel +from .base import BaseChannelBackend queues = {} -closed = set() -class InMemoryChannel(BaseChannel): +class InMemoryChannelBackend(BaseChannelBackend): """ In-memory channel implementation. Intended only for use with threading, in low-throughput development environments. """ - def send(self, **kwargs): - # Don't allow if closed - if self.name in closed: - raise Channel.ClosedError("%s is closed" % self.name) + def send(self, channel, message): # Add to the deque, making it if needs be - queues.setdefault(self.name, deque()).append(kwargs) + queues.setdefault(channel, deque()).append(message) - @property - def closed(self): - # Check closed set - return self.name in closed - - def close(self): - # Add to closed set - closed.add(self.name) - - @classmethod - def receive_many(self, channel_names): + def receive_many(self, channels): while True: # Try to pop a message from each channel - for channel_name in channel_names: + for channel in channels: try: # This doesn't clean up empty channels - OK for testing. # For later versions, have cleanup w/lock. - return channel_name, queues[channel_name].popleft() + return channel, queues[channel].popleft() except (IndexError, KeyError): pass # If all empty, sleep for a little bit time.sleep(0.01) - @classmethod - def new_name(self, prefix): - return "%s.%s" % (prefix, "".join(random.choice(string.ascii_letters) for i in range(16))) diff --git a/channels/channel.py b/channels/channel.py new file mode 100644 index 0000000..ea4a133 --- /dev/null +++ b/channels/channel.py @@ -0,0 +1,48 @@ +import random +import string + + +class Channel(object): + """ + Public interaction class for the channel layer. + + This is separate to the backends so we can: + a) Hide receive_many from end-users, as it is only for interface servers + b) Keep a stable-ish backend interface for third parties + + You can pass an alternate Channel Layer alias in, but it will use the + "default" one by default. + """ + + def __init__(self, name, alias=None): + """ + Create an instance for the channel named "name" + """ + from channels import channel_layers, DEFAULT_CHANNEL_LAYER + self.name = name + self.channel_layer = channel_layers[alias or DEFAULT_CHANNEL_LAYER] + + def send(self, **kwargs): + """ + Send a message over the channel, taken from the kwargs. + """ + self.channel_layer.send(self.name, kwargs) + + @classmethod + def new_name(self, prefix): + """ + Returns a new channel name that's unique and not closed + with the given prefix. Does not need to be called before sending + on a channel name - just provides a way to avoid clashing for + response channels. + """ + return "%s.%s" % (prefix, "".join(random.choice(string.ascii_letters) for i in range(32))) + + def as_view(self): + """ + Returns a view version of this channel - one that takes + the request passed in and dispatches it to our channel, + serialized. + """ + from channels.adapters import view_producer + return view_producer(self.name) diff --git a/channels/consumer_registry.py b/channels/consumer_registry.py index 0e6baaa..e308282 100644 --- a/channels/consumer_registry.py +++ b/channels/consumer_registry.py @@ -1,4 +1,6 @@ import functools +from django.utils import six +from .utils import name_that_thing class ConsumerRegistry(object): """ @@ -14,10 +16,10 @@ class ConsumerRegistry(object): def add_consumer(self, consumer, channels): for channel in channels: if channel in self.consumers: - raise ValueError("Cannot register consumer %s - channel %s already consumed by %s" % ( - consumer, + raise ValueError("Cannot register consumer %s - channel %r already consumed by %s" % ( + name_that_thing(consumer), channel, - self.consumers[channel], + name_that_thing(self.consumers[channel]), )) self.consumers[channel] = consumer @@ -25,6 +27,8 @@ class ConsumerRegistry(object): """ Decorator that registers a function as a consumer. """ + if isinstance(channels, six.string_types): + channels = [channels] def inner(func): self.add_consumer(func, channels) return func diff --git a/channels/docs/integration-changes.rst b/channels/docs/integration-changes.rst index 2f6d204..c59fa38 100644 --- a/channels/docs/integration-changes.rst +++ b/channels/docs/integration-changes.rst @@ -1,23 +1,24 @@ -Message Standards +Integration Notes ================= -Some standardised message formats are used for common message types - they -are detailed below. +Django Channels is intended to be merged into Django itself; these are the +planned changes the codebase will need to undertake in that transition. -HTTP Request ------------- +* The ``channels`` package will become ``django.channels``. The expected way + of interacting with the system will be via the ``Channel`` object, -Represents a full-fledged, single HTTP request coming in from a client. -Contains the following keys: +* Obviously, the monkeypatches in ``channels.hacks`` will be replaced by + placing methods onto the objects themselves. The ``request`` and ``response`` + modules will thus no longer exist separately. -* request: An encoded Django HTTP request -* response_channel: The channel name to write responses to +Things to ponder +---------------- +* The mismatch between signals (broadcast) and channels (single-worker) means + we should probably leave patching signals into channels for the end developer. + This would also ensure the speedup improvements for empty signals keep working. -HTTP Response -------------- - -Sends a whole response to a client. -Contains the following keys: - -* response: An encoded Django HTTP response +* It's likely that the decorator-based approach of consumer registration will + mean extending Django's auto-module-loading beyond ``models`` and + ``admin`` app modules to include ``views`` and ``consumers``. There may be + a better unified approach to this. diff --git a/channels/management/commands/runserver.py b/channels/management/commands/runserver.py index 910bac9..8ed1563 100644 --- a/channels/management/commands/runserver.py +++ b/channels/management/commands/runserver.py @@ -3,7 +3,7 @@ import threading from django.core.management.commands.runserver import Command as RunserverCommand from django.core.handlers.wsgi import WSGIHandler from django.http import HttpResponse -from channels import Channel, coreg +from channels import Channel, coreg, channel_layers, DEFAULT_CHANNEL_LAYER from channels.worker import Worker from channels.utils import auto_import_consumers from channels.adapters import UrlConsumer @@ -42,7 +42,7 @@ class WSGIInterfaceHandler(WSGIHandler): def get_response(self, request): request.response_channel = Channel.new_name("django.wsgi.response") Channel("django.wsgi.request").send(**request.channel_encode()) - channel, message = Channel.receive_many([request.response_channel]) + channel, message = channel_layers[DEFAULT_CHANNEL_LAYER].receive_many([request.response_channel]) return HttpResponse.channel_decode(message) @@ -54,5 +54,5 @@ class WorkerThread(threading.Thread): def run(self): Worker( consumer_registry = coreg, - channel_class = Channel, + channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER], ).run() diff --git a/channels/utils.py b/channels/utils.py index 35f56d6..b110e7a 100644 --- a/channels/utils.py +++ b/channels/utils.py @@ -1,3 +1,4 @@ +import types from django.apps import apps @@ -6,9 +7,23 @@ def auto_import_consumers(): Auto-import consumers modules in apps """ for app_config in apps.get_app_configs(): - consumer_module_name = "%s.consumers" % (app_config.name,) - try: - __import__(consumer_module_name) - except ImportError as e: - if "no module named consumers" not in str(e).lower(): - raise + for submodule in ["consumers", "views"]: + module_name = "%s.%s" % (app_config.name, submodule) + try: + __import__(module_name) + except ImportError as e: + if "no module named %s" % submodule not in str(e).lower(): + raise + + +def name_that_thing(thing): + """ + Returns either the function/class path or just the object's repr + """ + if hasattr(thing, "__name__"): + if hasattr(thing, "__class__") and not isinstance(thing, types.FunctionType): + if thing.__class__ is not type: + return name_that_thing(thing.__class__) + if hasattr(thing, "__module__"): + return "%s.%s" % (thing.__module__, thing.__name__) + return repr(thing) diff --git a/channels/worker.py b/channels/worker.py index 73a2c1f..4f4d129 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -4,16 +4,18 @@ class Worker(object): and runs their consumers. """ - def __init__(self, consumer_registry, channel_class): + def __init__(self, consumer_registry, channel_layer): + from channels import channel_layers, DEFAULT_CHANNEL_LAYER self.consumer_registry = consumer_registry - self.channel_class = channel_class + self.channel_layer = channel_layer def run(self): """ Tries to continually dispatch messages to consumers. """ + channels = self.consumer_registry.all_channel_names() while True: - channel, message = self.channel_class.receive_many(channels) + channel, message = self.channel_layer.receive_many(channels) consumer = self.consumer_registry.consumer_for_channel(channel) consumer(**message)