Start making channels work to ASGI spec.

This commit is contained in:
Andrew Godwin 2016-01-02 10:17:45 -08:00
parent e78f75288d
commit b9464ca149
14 changed files with 405 additions and 260 deletions

View File

@ -1,21 +1,7 @@
__version__ = "0.8" __version__ = "0.8"
# Load backends, using settings if available (else falling back to a default)
DEFAULT_CHANNEL_BACKEND = "default"
from .backends import BackendManager # isort:skip
from django.conf import settings # isort:skip
channel_backends = BackendManager(
getattr(settings, "CHANNEL_BACKENDS", {
DEFAULT_CHANNEL_BACKEND: {
"BACKEND": "channels.backends.memory.InMemoryChannelBackend",
"ROUTING": {},
}
})
)
default_app_config = 'channels.apps.ChannelsConfig' default_app_config = 'channels.apps.ChannelsConfig'
DEFAULT_CHANNEL_LAYER = 'default'
# Promote channel to top-level (down here to avoid circular import errs) from .asgi import channel_layers # NOQA isort:skip
from .channel import Channel, Group # NOQA isort:skip from .channel import Channel, Group # NOQA isort:skip

View File

@ -1,29 +1,9 @@
import functools import functools
from django.core.handlers.base import BaseHandler
from django.http import HttpRequest, HttpResponse from django.http import HttpRequest, HttpResponse
from channels import Channel from channels import Channel
class UrlConsumer(object):
"""
Dispatches channel HTTP requests into django's URL system.
"""
def __init__(self):
self.handler = BaseHandler()
self.handler.load_middleware()
def __call__(self, message):
request = HttpRequest.channel_decode(message.content)
try:
response = self.handler.get_response(request)
except HttpResponse.ResponseLater:
return
message.reply_channel.send(response.channel_encode())
def view_producer(channel_name): def view_producer(channel_name):
""" """
Returns a new view function that actually writes the request to a channel Returns a new view function that actually writes the request to a channel

81
channels/asgi.py Normal file
View File

@ -0,0 +1,81 @@
from __future__ import unicode_literals
import django
from django.conf import settings
from django.utils.module_loading import import_string
from .consumer_registry import ConsumerRegistry
class InvalidChannelLayerError(ValueError):
pass
class ChannelLayerManager(object):
"""
Takes a settings dictionary of backends and initialises them on request.
"""
def __init__(self):
self.backends = {}
@property
def configs(self):
# Lazy load settings so we can be imported
return getattr(settings, "CHANNEL_LAYERS", {})
def make_backend(self, name):
# Load the backend class
try:
backend_class = import_string(self.configs[name]['BACKEND'])
except KeyError:
raise InvalidChannelLayerError("No BACKEND specified for %s" % name)
except ImportError:
raise InvalidChannelLayerError(
"Cannot import BACKEND %r specified for %s" % (self.configs[name]['BACKEND'], name)
)
# Get routing
try:
routing = self.configs[name]['ROUTING']
except KeyError:
raise InvalidChannelLayerError("No ROUTING specified for %s" % name)
# Initialise and pass config
asgi_layer = backend_class(**self.configs[name].get("CONFIG", {}))
return ChannelLayerWrapper(
channel_layer=asgi_layer,
alias=name,
routing=routing,
)
def __getitem__(self, key):
if key not in self.backends:
self.backends[key] = self.make_backend(key)
return self.backends[key]
class ChannelLayerWrapper(object):
"""
Top level channel layer wrapper, which contains both the ASGI channel
layer object as well as alias and routing information specific to Django.
"""
def __init__(self, channel_layer, alias, routing):
self.channel_layer = channel_layer
self.alias = alias
self.routing = routing
self.registry = ConsumerRegistry(self.routing)
def __getattr__(self, name):
return getattr(self.channel_layer, name)
def get_channel_layer(alias="default"):
"""
Returns the raw ASGI channel layer for this project.
"""
django.setup(set_prefix=False)
return channel_layers[alias].channel_layer
# Default global instance of the channel layer manager
channel_layers = ChannelLayerManager()

View File

@ -1,7 +1,7 @@
import random from __future__ import unicode_literals
import string
from channels import DEFAULT_CHANNEL_BACKEND, channel_backends from django.utils import six
from channels import DEFAULT_CHANNEL_LAYER, channel_layers
class Channel(object): class Channel(object):
@ -16,15 +16,17 @@ class Channel(object):
"default" one by default. "default" one by default.
""" """
def __init__(self, name, alias=DEFAULT_CHANNEL_BACKEND, channel_backend=None): def __init__(self, name, alias=DEFAULT_CHANNEL_LAYER, channel_layer=None):
""" """
Create an instance for the channel named "name" Create an instance for the channel named "name"
""" """
if isinstance(name, six.binary_type):
name = name.decode("ascii")
self.name = name self.name = name
if channel_backend: if channel_layer:
self.channel_backend = channel_backend self.channel_layer = channel_layer
else: else:
self.channel_backend = channel_backends[alias] self.channel_layer = channel_layers[alias]
def send(self, content): def send(self, content):
""" """
@ -32,17 +34,7 @@ class Channel(object):
""" """
if not isinstance(content, dict): if not isinstance(content, dict):
raise ValueError("You can only send dicts as content on channels.") raise ValueError("You can only send dicts as content on channels.")
self.channel_backend.send(self.name, content) self.channel_layer.send(self.name, content)
@classmethod
def new_name(self, prefix):
"""
Returns a new channel name that's unique and not closed
with the given prefix. Does not need to be called before sending
on a channel name - just provides a way to avoid clashing for
response channels.
"""
return "%s.%s" % (prefix, "".join(random.choice(string.ascii_letters) for i in range(32)))
def as_view(self): def as_view(self):
""" """
@ -63,27 +55,29 @@ class Group(object):
of the group after an expiry time (keep re-adding to keep them in). of the group after an expiry time (keep re-adding to keep them in).
""" """
def __init__(self, name, alias=DEFAULT_CHANNEL_BACKEND, channel_backend=None): def __init__(self, name, alias=DEFAULT_CHANNEL_LAYER, channel_layer=None):
if isinstance(name, six.binary_type):
name = name.decode("ascii")
self.name = name self.name = name
if channel_backend: if channel_layer:
self.channel_backend = channel_backend self.channel_layer = channel_layer
else: else:
self.channel_backend = channel_backends[alias] self.channel_layer = channel_layers[alias]
def add(self, channel): def add(self, channel):
if isinstance(channel, Channel): if isinstance(channel, Channel):
channel = channel.name channel = channel.name
self.channel_backend.group_add(self.name, channel) self.channel_layer.group_add(self.name, channel)
def discard(self, channel): def discard(self, channel):
if isinstance(channel, Channel): if isinstance(channel, Channel):
channel = channel.name channel = channel.name
self.channel_backend.group_discard(self.name, channel) self.channel_layer.group_discard(self.name, channel)
def channels(self): def channels(self):
return self.channel_backend.group_channels(self.name) return self.channel_layer.group_channels(self.name)
def send(self, content): def send(self, content):
if not isinstance(content, dict): if not isinstance(content, dict):
raise ValueError("You can only send dicts as content on channels.") raise ValueError("You can only send dicts as content on channels.")
self.channel_backend.send_group(self.name, content) self.channel_layer.send_group(self.name, content)

View File

@ -1,3 +1,5 @@
from __future__ import unicode_literals
import importlib import importlib
from django.core.exceptions import ImproperlyConfigured from django.core.exceptions import ImproperlyConfigured
@ -35,6 +37,11 @@ class ConsumerRegistry(object):
# Upconvert if you just pass in a string for channels # Upconvert if you just pass in a string for channels
if isinstance(channels, six.string_types): if isinstance(channels, six.string_types):
channels = [channels] channels = [channels]
# Make sure all channels are byte strings
channels = [
channel.decode("ascii") if isinstance(channel, six.binary_type) else channel
for channel in channels
]
# Import any consumer referenced as string # Import any consumer referenced as string
if isinstance(consumer, six.string_types): if isinstance(consumer, six.string_types):
module_name, variable_name = consumer.rsplit(".", 1) module_name, variable_name = consumer.rsplit(".", 1)

View File

@ -1,33 +1,11 @@
from django.core.handlers.base import BaseHandler
from django.http.request import HttpRequest
from django.http.response import HttpResponseBase
from .request import decode_request, encode_request
from .response import ResponseLater, decode_response, encode_response
def monkeypatch_django(): def monkeypatch_django():
""" """
Monkeypatches support for us into parts of Django. Monkeypatches support for us into parts of Django.
""" """
# Request encode/decode
HttpRequest.channel_encode = encode_request
HttpRequest.channel_decode = staticmethod(decode_request)
# Response encode/decode
HttpResponseBase.channel_encode = encode_response
HttpResponseBase.channel_decode = staticmethod(decode_response)
HttpResponseBase.ResponseLater = ResponseLater
# Allow ResponseLater to propagate above handler
BaseHandler.old_handle_uncaught_exception = BaseHandler.handle_uncaught_exception
BaseHandler.handle_uncaught_exception = new_handle_uncaught_exception
# Ensure that the staticfiles version of runserver bows down to us # Ensure that the staticfiles version of runserver bows down to us
# This one is particularly horrible # This one is particularly horrible
from django.contrib.staticfiles.management.commands.runserver import Command as StaticRunserverCommand from django.contrib.staticfiles.management.commands.runserver import Command as StaticRunserverCommand
from .management.commands.runserver import Command as RunserverCommand from .management.commands.runserver import Command as RunserverCommand
StaticRunserverCommand.__bases__ = (RunserverCommand, ) StaticRunserverCommand.__bases__ = (RunserverCommand, )
def new_handle_uncaught_exception(self, request, resolver, exc_info):
if exc_info[0] is ResponseLater:
raise
return BaseHandler.old_handle_uncaught_exception(self, request, resolver, exc_info)

191
channels/handler.py Normal file
View File

@ -0,0 +1,191 @@
from __future__ import unicode_literals
import sys
import logging
from threading import Lock
from django import http
from django.core.handlers import base
from django.core import signals
from django.core.urlresolvers import set_script_prefix
from django.utils.functional import cached_property
logger = logging.getLogger('django.request')
class AsgiRequest(http.HttpRequest):
"""
Custom request subclass that decodes from an ASGI-standard request
dict, and wraps request body handling.
"""
def __init__(self, message):
self.message = message
self.reply_channel = self.message['reply_channel']
self._content_length = 0
# Path info
self.path = self.message['path']
self.script_name = self.message.get('root_path', '')
if self.script_name:
# TODO: Better is-prefix checking, slash handling?
self.path_info = self.path[len(self.script_name):]
else:
self.path_info = self.path
# HTTP basics
self.method = self.message['method'].upper()
self.META = {
"REQUEST_METHOD": self.method,
"QUERY_STRING": self.message.get('query_string', ''),
}
if self.message.get('client', None):
self.META['REMOTE_ADDR'] = self.message['client'][0]
self.META['REMOTE_HOST'] = self.META['REMOTE_ADDR']
self.META['REMOTE_PORT'] = self.message['client'][1]
if self.message.get('server', None):
self.META['SERVER_NAME'] = self.message['server'][0]
self.META['SERVER_PORT'] = self.message['server'][1]
# Headers go into META
for name, value in self.message.get('headers', {}).items():
if name == "content_length":
corrected_name = "CONTENT_LENGTH"
elif name == "content_type":
corrected_name = "CONTENT_TYPE"
else:
corrected_name = 'HTTP_%s' % name.upper().replace("-", "_")
self.META[corrected_name] = value
# Pull out content length info
if self.META.get('CONTENT_LENGTH', None):
try:
self._content_length = int(self.META['CONTENT_LENGTH'])
except (ValueError, TypeError):
pass
# TODO: body handling
self._body = ""
# Other bits
self.resolver_match = None
@cached_property
def GET(self):
return http.QueryDict(
self.message.get('query_string', ''),
encoding=self._encoding,
)
def _get_post(self):
if not hasattr(self, '_post'):
self._load_post_and_files()
return self._post
def _set_post(self, post):
self._post = post
POST = property(_get_post, _set_post)
@cached_property
def COOKIES(self):
return http.parse_cookie(self.META.get('HTTP_COOKIE', ''))
class AsgiHandler(base.BaseHandler):
"""
Handler for ASGI requests for the view system only (it will have got here
after traversing the dispatch-by-channel-name system, which decides it's
a HTTP request)
"""
initLock = Lock()
request_class = AsgiRequest
def __call__(self, message):
# Set up middleware if needed. We couldn't do this earlier, because
# settings weren't available.
if self._request_middleware is None:
with self.initLock:
# Check that middleware is still uninitialized.
if self._request_middleware is None:
self.load_middleware()
# Set script prefix from message root_path
set_script_prefix(message.get('root_path', ''))
signals.request_started.send(sender=self.__class__, message=message)
# Run request through view system
try:
request = self.request_class(message)
except UnicodeDecodeError:
logger.warning(
'Bad Request (UnicodeDecodeError)',
exc_info=sys.exc_info(),
extra={
'status_code': 400,
}
)
response = http.HttpResponseBadRequest()
else:
response = self.get_response(request)
# Transform response into messages, which we yield back to caller
for message in self.encode_response(response):
# TODO: file_to_stream
yield message
def encode_response(self, response):
"""
Encodes a Django HTTP response into an ASGI http.response message(s).
"""
# Collect cookies into headers
response_headers = [(str(k), str(v)) for k, v in response.items()]
for c in response.cookies.values():
response_headers.append((str('Set-Cookie'), str(c.output(header=''))))
# Make initial response message
message = {
"status": response.status_code,
"status_text": response.reason_phrase,
"headers": response_headers,
}
# Streaming responses need to be pinned to their iterator
if response.streaming:
for part in response.streaming_content:
for chunk in self.chunk_bytes(part):
message['content'] = chunk
message['more_content'] = True
yield message
message = {}
# Final closing message
yield {
"more_content": False,
}
# Other responses just need chunking
else:
# Yield chunks of response
for chunk, last in self.chunk_bytes(response.content):
message['content'] = chunk
message['more_content'] = not last
yield message
message = {}
def chunk_bytes(self, data):
"""
Chunks some data into chunks based on the current ASGI channel layer's
message size and reasonable defaults.
Yields (chunk, last_chunk) tuples.
"""
CHUNK_SIZE = 512 * 1024
position = 0
while position < len(data):
yield (
data[position:position+CHUNK_SIZE],
(position + CHUNK_SIZE) >= len(data),
)
position += CHUNK_SIZE
class ViewConsumer(object):
"""
Dispatches channel HTTP requests into django's URL/View system.
"""
def __init__(self):
self.handler = AsgiHandler()
def __call__(self, message):
for reply_message in self.handler(message.content):
message.reply_channel.send(reply_message)

View File

@ -3,8 +3,8 @@ import threading
from django.core.management.commands.runserver import \ from django.core.management.commands.runserver import \
Command as RunserverCommand Command as RunserverCommand
from channels import DEFAULT_CHANNEL_BACKEND, channel_backends from channels import DEFAULT_CHANNEL_LAYER, channel_layers
from channels.adapters import UrlConsumer from channels.handler import ViewConsumer
from channels.interfaces.wsgi import WSGIInterface from channels.interfaces.wsgi import WSGIInterface
from channels.log import setup_logger from channels.log import setup_logger
from channels.worker import Worker from channels.worker import Worker
@ -21,7 +21,7 @@ class Command(RunserverCommand):
""" """
Returns the default WSGI handler for the runner. Returns the default WSGI handler for the runner.
""" """
return WSGIInterface(self.channel_backend) return WSGIInterface(self.channel_layer)
def run(self, *args, **options): def run(self, *args, **options):
# Run the rest # Run the rest
@ -29,16 +29,16 @@ class Command(RunserverCommand):
def inner_run(self, *args, **options): def inner_run(self, *args, **options):
# Check a handler is registered for http reqs # Check a handler is registered for http reqs
self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] self.channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER]
if not self.channel_backend.registry.consumer_for_channel("http.request"): if not self.channel_layer.registry.consumer_for_channel("http.request"):
# Register the default one # Register the default one
self.channel_backend.registry.add_consumer(UrlConsumer(), ["http.request"]) self.channel_layer.registry.add_consumer(ViewConsumer(), ["http.request"])
# Note that this is the right one on the console # Note that this is the right one on the console
self.logger.info("Worker thread running, channels enabled") self.logger.info("Worker thread running, channels enabled")
if self.channel_backend.local_only: if self.channel_layer.local_only:
self.logger.info("Local channel backend detected, no remote channels support") self.logger.info("Local channel backend detected, no remote channels support")
# Launch a worker thread # Launch a worker thread
worker = WorkerThread(self.channel_backend) worker = WorkerThread(self.channel_layer)
worker.daemon = True worker.daemon = True
worker.start() worker.start()
# Run rest of inner run # Run rest of inner run
@ -50,9 +50,9 @@ class WorkerThread(threading.Thread):
Class that runs a worker Class that runs a worker
""" """
def __init__(self, channel_backend): def __init__(self, channel_layer):
super(WorkerThread, self).__init__() super(WorkerThread, self).__init__()
self.channel_backend = channel_backend self.channel_layer = channel_layer
def run(self): def run(self):
Worker(channel_backend=self.channel_backend).run() Worker(channel_layer=self.channel_layer).run()

View File

@ -1,8 +1,9 @@
from __future__ import unicode_literals
from django.core.management import BaseCommand, CommandError from django.core.management import BaseCommand, CommandError
from channels import channel_backends, DEFAULT_CHANNEL_BACKEND from channels import channel_layers, DEFAULT_CHANNEL_LAYER
from channels.log import setup_logger from channels.log import setup_logger
from channels.adapters import UrlConsumer from channels.handler import ViewConsumer
from channels.worker import Worker from channels.worker import Worker
@ -12,25 +13,20 @@ class Command(BaseCommand):
# Get the backend to use # Get the backend to use
self.verbosity = options.get("verbosity", 1) self.verbosity = options.get("verbosity", 1)
self.logger = setup_logger('django.channels', self.verbosity) self.logger = setup_logger('django.channels', self.verbosity)
channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER]
if channel_backend.local_only:
raise CommandError(
"You have a process-local channel backend configured, and so cannot run separate workers.\n"
"Configure a network-based backend in CHANNEL_BACKENDS to use this command."
)
# Check a handler is registered for http reqs # Check a handler is registered for http reqs
if not channel_backend.registry.consumer_for_channel("http.request"): if not channel_layer.registry.consumer_for_channel("http.request"):
# Register the default one # Register the default one
channel_backend.registry.add_consumer(UrlConsumer(), ["http.request"]) channel_layer.registry.add_consumer(ViewConsumer(), ["http.request"])
# Launch a worker # Launch a worker
self.logger.info("Running worker against backend %s", channel_backend) self.logger.info("Running worker against backend %s", channel_layer.alias)
# Optionally provide an output callback # Optionally provide an output callback
callback = None callback = None
if self.verbosity > 1: if self.verbosity > 1:
callback = self.consumer_called callback = self.consumer_called
# Run the worker # Run the worker
try: try:
Worker(channel_backend=channel_backend, callback=callback).run() Worker(channel_layer=channel_layer, callback=callback).run()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass

View File

@ -1,3 +1,5 @@
from __future__ import unicode_literals
from .channel import Channel from .channel import Channel
@ -17,9 +19,9 @@ class Message(object):
""" """
pass pass
def __init__(self, content, channel, channel_backend, reply_channel=None): def __init__(self, content, channel, channel_layer, reply_channel=None):
self.content = content self.content = content
self.channel = channel self.channel = channel
self.channel_backend = channel_backend self.channel_layer = channel_layer
if reply_channel: if reply_channel:
self.reply_channel = Channel(reply_channel, channel_backend=self.channel_backend) self.reply_channel = Channel(reply_channel, channel_layer=self.channel_layer)

View File

@ -1,73 +0,0 @@
from django.http import HttpRequest
from django.http.request import QueryDict
from django.utils.datastructures import MultiValueDict
def encode_request(request):
"""
Encodes a request to JSON-compatible datastructures
"""
# TODO: More stuff
value = {
"get": dict(request.GET.lists()),
"post": dict(request.POST.lists()),
"cookies": request.COOKIES,
"headers": {
k[5:].lower(): v
for k, v in request.META.items()
if k.lower().startswith("http_")
},
"path": request.path,
"root_path": request.META.get("SCRIPT_NAME", ""),
"method": request.method,
"reply_channel": request.reply_channel,
"server": [
request.META.get("SERVER_NAME", None),
request.META.get("SERVER_PORT", None),
],
"client": [
request.META.get("REMOTE_ADDR", None),
request.META.get("REMOTE_PORT", None),
],
}
return value
def decode_request(value):
"""
Decodes a request JSONish value to a HttpRequest object.
"""
request = HttpRequest()
request.GET = CustomQueryDict(value['get'])
request.POST = CustomQueryDict(value['post'])
request.COOKIES = value['cookies']
request.path = value['path']
request.method = value['method']
request.reply_channel = value['reply_channel']
# Channels requests are more high-level than the dumping ground that is
# META; re-combine back into it
request.META = {
"REQUEST_METHOD": value["method"],
"SERVER_NAME": value["server"][0],
"SERVER_PORT": value["server"][1],
"REMOTE_ADDR": value["client"][0],
"REMOTE_HOST": value["client"][0], # Not the DNS name, hopefully fine.
"SCRIPT_NAME": value["root_path"],
}
for header, header_value in value.get("headers", {}).items():
request.META["HTTP_%s" % header.upper()] = header_value
# Derive path_info from script root
request.path_info = request.path
if request.META.get("SCRIPT_NAME", ""):
request.path_info = request.path_info[len(request.META["SCRIPT_NAME"]):]
return request
class CustomQueryDict(QueryDict):
"""
Custom override of QueryDict that sets things directly.
"""
def __init__(self, values, mutable=False, encoding=None):
""" mutable and encoding are ignored :( """
MultiValueDict.__init__(self, values)

View File

@ -1,43 +0,0 @@
from django.http import HttpResponse
from django.utils.six import PY3
def encode_response(response):
"""
Encodes a response to JSON-compatible datastructures
"""
value = {
"content_type": getattr(response, "content_type", None),
"content": response.content,
"status": response.status_code,
"headers": list(response._headers.values()),
"cookies": [v.output(header="") for _, v in response.cookies.items()]
}
if PY3:
value["content"] = value["content"].decode('utf8')
response.close()
return value
def decode_response(value):
"""
Decodes a response JSONish value to a HttpResponse object.
"""
response = HttpResponse(
content=value['content'],
content_type=value['content_type'],
status=value['status'],
)
for cookie in value['cookies']:
response.cookies.load(cookie)
response._headers = {k.lower(): (k, v) for k, v in value['headers']}
return response
class ResponseLater(Exception):
"""
Class that represents a response which will be sent down the response
channel later. Used to move a django view-based segment onto the next
task, as otherwise we'd need to write some kind of fake response.
"""
pass

View File

@ -1,4 +1,7 @@
from __future__ import unicode_literals
import logging import logging
import time
from .message import Message from .message import Message
from .utils import name_that_thing from .utils import name_that_thing
@ -12,30 +15,35 @@ class Worker(object):
and runs their consumers. and runs their consumers.
""" """
def __init__(self, channel_backend, callback=None): def __init__(self, channel_layer, callback=None):
self.channel_backend = channel_backend self.channel_layer = channel_layer
self.callback = callback self.callback = callback
def run(self): def run(self):
""" """
Tries to continually dispatch messages to consumers. Tries to continually dispatch messages to consumers.
""" """
channels = self.channel_backend.registry.all_channel_names() channels = self.channel_layer.registry.all_channel_names()
while True: while True:
channel, content = self.channel_backend.receive_many_blocking(channels) channel, content = self.channel_layer.receive_many(channels, block=True)
# If no message, stall a little to avoid busy-looping then continue
if channel is None:
time.sleep(0.01)
continue
# Create message wrapper
message = Message( message = Message(
content=content, content=content,
channel=channel, channel=channel,
channel_backend=self.channel_backend, channel_layer=self.channel_layer,
reply_channel=content.get("reply_channel", None), reply_channel=content.get("reply_channel", None),
) )
# Handle the message # Handle the message
consumer = self.channel_backend.registry.consumer_for_channel(channel) consumer = self.channel_layer.registry.consumer_for_channel(channel)
if self.callback: if self.callback:
self.callback(channel, message) self.callback(channel, message)
try: try:
consumer(message) consumer(message)
except Message.Requeue: except Message.Requeue:
self.channel_backend.send(channel, content) self.channel_layer.send(channel, content)
except: except:
logger.exception("Error processing message with consumer %s:", name_that_thing(consumer)) logger.exception("Error processing message with consumer %s:", name_that_thing(consumer))

View File

@ -101,12 +101,18 @@ Channels and Messages
--------------------- ---------------------
All communication in an ASGI stack uses *messages* sent over *channels*. All communication in an ASGI stack uses *messages* sent over *channels*.
All messages must be a ``dict`` at the top level of the object, and be All messages must be a ``dict`` at the top level of the object, and
serializable by the built-in ``json`` serializer module (though the contain only the following types to ensure serializability:
actual serialization a channel layer uses is up to the implementation;
we use ``json`` as the lowest common denominator).
Channels are identified by a byte string name consisting only of ASCII * Byte strings
* Unicode strings
* Integers (no longs)
* Lists (tuples should be treated as lists)
* Dicts (keys must be unicode strings)
* Booleans
* None
Channels are identified by a unicode string name consisting only of ASCII
letters, numbers, numerical digits, periods (``.``), dashes (``-``) letters, numbers, numerical digits, periods (``.``), dashes (``-``)
and underscores (``_``), plus an optional prefix character (see below). and underscores (``_``), plus an optional prefix character (see below).
@ -270,20 +276,20 @@ A *channel layer* should provide an object with these attributes
(all function arguments are positional): (all function arguments are positional):
* ``send(channel, message)``, a callable that takes two arguments; the * ``send(channel, message)``, a callable that takes two arguments; the
channel to send on, as a byte string, and the message channel to send on, as a unicode string, and the message
to send, as a serializable ``dict``. to send, as a serializable ``dict``.
* ``receive_many(channels, block=False)``, a callable that takes a list of channel * ``receive_many(channels, block=False)``, a callable that takes a list of channel
names as byte strings, and returns with either ``(None, None)`` names as unicode strings, and returns with either ``(None, None)``
or ``(channel, message)`` if a message is available. If ``block`` is True, then or ``(channel, message)`` if a message is available. If ``block`` is True, then
it will not return until after a built-in timeout or a message arrives; if it will not return until after a built-in timeout or a message arrives; if
``block`` is false, it will always return immediately. It is perfectly ``block`` is false, it will always return immediately. It is perfectly
valid to ignore ``block`` and always return immediately. valid to ignore ``block`` and always return immediately.
* ``new_channel(pattern)``, a callable that takes a byte string pattern, * ``new_channel(pattern)``, a callable that takes a unicode string pattern,
and returns a new valid channel name that does not already exist, by and returns a new valid channel name that does not already exist, by
substituting any occurrences of the question mark character ``?`` in substituting any occurrences of the question mark character ``?`` in
``pattern`` with a single random byte string and checking for ``pattern`` with a single random unicode string and checking for
existence of that name in the channel layer. This is NOT called prior to existence of that name in the channel layer. This is NOT called prior to
a message being sent on a channel, and should not be used for channel a message being sent on a channel, and should not be used for channel
initialization. initialization.
@ -298,14 +304,14 @@ A *channel layer* should provide an object with these attributes
A channel layer implementing the ``groups`` extension must also provide: A channel layer implementing the ``groups`` extension must also provide:
* ``group_add(group, channel)``, a callable that takes a ``channel`` and adds * ``group_add(group, channel)``, a callable that takes a ``channel`` and adds
it to the group given by ``group``. Both are byte strings. If the channel it to the group given by ``group``. Both are unicode strings. If the channel
is already in the group, the function should return normally. is already in the group, the function should return normally.
* ``group_discard(group, channel)``, a callable that removes the ``channel`` * ``group_discard(group, channel)``, a callable that removes the ``channel``
from the ``group`` if it is in it, and does nothing otherwise. from the ``group`` if it is in it, and does nothing otherwise.
* ``send_group(group, message)``, a callable that takes two positional * ``send_group(group, message)``, a callable that takes two positional
arguments; the group to send to, as a byte string, and the message arguments; the group to send to, as a unicode string, and the message
to send, as a serializable ``dict``. to send, as a serializable ``dict``.
A channel layer implementing the ``statistics`` extension must also provide: A channel layer implementing the ``statistics`` extension must also provide:
@ -423,11 +429,11 @@ Keys:
* ``reply_channel``: Channel name for responses and server pushes, in * ``reply_channel``: Channel name for responses and server pushes, in
format ``http.response.?`` format ``http.response.?``
* ``http_version``: Byte string, one of ``1.0``, ``1.1`` or ``2``. * ``http_version``: Unicode string, one of ``1.0``, ``1.1`` or ``2``.
* ``method``: Byte string HTTP method name, uppercased. * ``method``: Unicode string HTTP method name, uppercased.
* ``scheme``: Byte string URL scheme portion (likely ``http`` or ``https``). * ``scheme``: Unicode string URL scheme portion (likely ``http`` or ``https``).
Optional (but must not be empty), default is ``http``. Optional (but must not be empty), default is ``http``.
* ``path``: Byte string HTTP path from URL. * ``path``: Byte string HTTP path from URL.
@ -442,20 +448,46 @@ Keys:
* ``headers``: Dict of ``{name: value}``, where ``name`` is the lowercased * ``headers``: Dict of ``{name: value}``, where ``name`` is the lowercased
HTTP header name as byte string and ``value`` is the header value as a byte HTTP header name as byte string and ``value`` is the header value as a byte
string. If multiple headers with the same name are received, they should string. If multiple headers with the same name are received, they should
be concatenated into a single header as per . be concatenated into a single header as per RFC 2616.
* ``body``: Body of the request, as a byte string. Optional, defaults to empty * ``body``: Body of the request, as a byte string. Optional, defaults to empty
string. string. If ``body_channel`` is set, treat as start of body and concatenate
on further chunks.
* ``client``: List of ``[host, port]`` where ``host`` is a byte string of the * ``body_channel``: Single-reader unicode string channel name that contains
Request Body Chunk messages representing a large request body.
Optional, defaults to None. Chunks append to ``body`` if set. Presence of
a channel indicates at least one Request Body Chunk message needs to be read,
and then further consumption keyed off of the ``more_content`` key in those
messages.
* ``client``: List of ``[host, port]`` where ``host`` is a unicode string of the
remote host's IPv4 or IPv6 address, and ``port`` is the remote port as an remote host's IPv4 or IPv6 address, and ``port`` is the remote port as an
integer. Optional, defaults to ``None``. integer. Optional, defaults to ``None``.
* ``server``: List of ``[host, port]`` where ``host`` is the listening address * ``server``: List of ``[host, port]`` where ``host`` is the listening address
for this server as a byte string, and ``port`` is the integer listening port. for this server as a unicode string, and ``port`` is the integer listening port.
Optional, defaults to ``None``. Optional, defaults to ``None``.
Request Body Chunk
''''''''''''''''''
Must be sent after an initial Response.
Channel: ``http.request.body.?``
Keys:
* ``content``: Byte string of HTTP body content, will be concatenated onto
previously received ``content`` values and ``body`` key in Request.
* ``more_content``: Boolean value signifying if there is additional content
to come (as part of a Request Body Chunk message). If ``False``, request will
be taken as complete, and any further messages on the channel
will be ignored. Optional, defaults to ``False``.
Response Response
'''''''' ''''''''
@ -475,7 +507,8 @@ Keys:
string header name, and ``value`` is the byte string header value. Order string header name, and ``value`` is the byte string header value. Order
should be preserved in the HTTP response. should be preserved in the HTTP response.
* ``content``: Byte string of HTTP body content * ``content``: Byte string of HTTP body content.
Optional, defaults to empty string.
* ``more_content``: Boolean value signifying if there is additional content * ``more_content``: Boolean value signifying if there is additional content
to come (as part of a Response Chunk message). If ``False``, response will to come (as part of a Response Chunk message). If ``False``, response will
@ -534,7 +567,7 @@ Keys:
* ``reply_channel``: Channel name for sending data, in * ``reply_channel``: Channel name for sending data, in
format ``websocket.send.?`` format ``websocket.send.?``
* ``scheme``: Byte string URL scheme portion (likely ``ws`` or ``wss``). * ``scheme``: Unicode string URL scheme portion (likely ``ws`` or ``wss``).
Optional (but must not be empty), default is ``ws``. Optional (but must not be empty), default is ``ws``.
* ``path``: Byte string HTTP path from URL. * ``path``: Byte string HTTP path from URL.
@ -551,12 +584,12 @@ Keys:
string. If multiple headers with the same name are received, they should string. If multiple headers with the same name are received, they should
be concatenated into a single header as per . be concatenated into a single header as per .
* ``client``: List of ``[host, port]`` where ``host`` is a byte string of the * ``client``: List of ``[host, port]`` where ``host`` is a unicode string of the
remote host's IPv4 or IPv6 address, and ``port`` is the remote port as an remote host's IPv4 or IPv6 address, and ``port`` is the remote port as an
integer. Optional, defaults to ``None``. integer. Optional, defaults to ``None``.
* ``server``: List of ``[host, port]`` where ``host`` is the listening address * ``server``: List of ``[host, port]`` where ``host`` is the listening address
for this server as a byte string, and ``port`` is the integer listening port. for this server as a unicode string, and ``port`` is the integer listening port.
Optional, defaults to ``None``. Optional, defaults to ``None``.
@ -644,12 +677,12 @@ Keys:
* ``data``: Byte string of UDP datagram payload. * ``data``: Byte string of UDP datagram payload.
* ``client``: List of ``[host, port]`` where ``host`` is a byte string of the * ``client``: List of ``[host, port]`` where ``host`` is a unicode string of the
remote host's IPv4 or IPv6 address, and ``port`` is the remote port as an remote host's IPv4 or IPv6 address, and ``port`` is the remote port as an
integer. integer.
* ``server``: List of ``[host, port]`` where ``host`` is the listening address * ``server``: List of ``[host, port]`` where ``host`` is the listening address
for this server as a byte string, and ``port`` is the integer listening port. for this server as a unicode string, and ``port`` is the integer listening port.
Optional, defaults to ``None``. Optional, defaults to ``None``.
@ -700,8 +733,13 @@ underlying implementation, then any values should be kept within the lower
This document will never specify just *string* - all strings are one of the This document will never specify just *string* - all strings are one of the
two types. two types.
Channel and group names are always byte strings, with the additional limitation Some serializers, such as ``json``, cannot differentiate between byte
that they only use the following characters: strings and unicode strings; these should include logic to box one type as
the other (for example, encoding byte strings as base64 unicode strings with
a preceding special character, e.g. U+FFFF).
Channel and group names are always unicode strings, with the additional
limitation that they only use the following characters:
* ASCII letters * ASCII letters
* The digits ``0`` through ``9`` * The digits ``0`` through ``9``
@ -747,7 +785,7 @@ WSGI's ``environ`` variable to the Request message:
* ``CONTENT_TYPE`` can be extracted from ``headers`` * ``CONTENT_TYPE`` can be extracted from ``headers``
* ``CONTENT_LENGTH`` can be extracted from ``headers`` * ``CONTENT_LENGTH`` can be extracted from ``headers``
* ``SERVER_NAME`` and ``SERVER_PORT`` are in ``server`` * ``SERVER_NAME`` and ``SERVER_PORT`` are in ``server``
* ``REMOTE_HOST`` and ``REMOTE_PORT`` are in ``client`` * ``REMOTE_HOST``/``REMOTE_ADDR`` and ``REMOTE_PORT`` are in ``client``
* ``SERVER_PROTOCOL`` is encoded in ``http_version`` * ``SERVER_PROTOCOL`` is encoded in ``http_version``
* ``wsgi.url_scheme`` is ``scheme`` * ``wsgi.url_scheme`` is ``scheme``
* ``wsgi.input`` is a StringIO around ``body`` * ``wsgi.input`` is a StringIO around ``body``