Merge branch 'doc-update'

This commit is contained in:
Faris Chebib 2015-09-10 16:21:25 -06:00
commit 38800d24eb
22 changed files with 683 additions and 300 deletions

View File

@ -1,4 +1,4 @@
__version__ = "0.1.1"
__version__ = "0.8"
# Load backends, using settings if available (else falling back to a default)
DEFAULT_CHANNEL_BACKEND = "default"

66
channels/auth.py Normal file
View File

@ -0,0 +1,66 @@
import functools
from django.contrib import auth
from .decorators import channel_session, http_session
def transfer_user(from_session, to_session):
"""
Transfers user from HTTP session to channel session.
"""
to_session[auth.BACKEND_SESSION_KEY] = from_session[auth.BACKEND_SESSION_KEY]
to_session[auth.SESSION_KEY] = from_session[auth.SESSION_KEY]
to_session[auth.HASH_SESSION_KEY] = from_session[auth.HASH_SESSION_KEY]
def channel_session_user(func):
"""
Presents a message.user attribute obtained from a user ID in the channel
session, rather than in the http_session. Turns on channel session implicitly.
"""
@channel_session
@functools.wraps(func)
def inner(message, *args, **kwargs):
# If we didn't get a session, then we don't get a user
if not hasattr(message, "channel_session"):
raise ValueError("Did not see a channel session to get auth from")
if message.channel_session is None:
message.user = None
# Otherwise, be a bit naughty and make a fake Request with just
# a "session" attribute (later on, perhaps refactor contrib.auth to
# pass around session rather than request)
else:
fake_request = type("FakeRequest", (object, ), {"session": message.channel_session})
message.user = auth.get_user(fake_request)
# Run the consumer
return func(message, *args, **kwargs)
return inner
def http_session_user(func):
"""
Wraps a HTTP or WebSocket consumer (or any consumer of messages
that provides a "COOKIES" attribute) to provide both a "session"
attribute and a "user" attibute, like AuthMiddleware does.
This runs http_session() to get a session to hook auth off of.
If the user does not have a session cookie set, both "session"
and "user" will be None.
"""
@http_session
@functools.wraps(func)
def inner(message, *args, **kwargs):
# If we didn't get a session, then we don't get a user
if not hasattr(message, "http_session"):
raise ValueError("Did not see a http session to get auth from")
if message.http_session is None:
message.user = None
# Otherwise, be a bit naughty and make a fake Request with just
# a "session" attribute (later on, perhaps refactor contrib.auth to
# pass around session rather than request)
else:
fake_request = type("FakeRequest", (object, ), {"session": message.http_session})
message.user = auth.get_user(fake_request)
# Run the consumer
return func(message, *args, **kwargs)
return inner

View File

@ -93,3 +93,16 @@ class BaseChannelBackend(object):
def __str__(self):
return self.__class__.__name__
def lock_channel(self, channel):
"""
Attempts to get a lock on the named channel. Returns True if lock
obtained, False if lock not obtained.
"""
raise NotImplementedError()
def unlock_channel(self, channel):
"""
Unlocks the named channel. Always succeeds.
"""
raise NotImplementedError()

View File

@ -3,7 +3,7 @@ import json
import datetime
from django.apps.registry import Apps
from django.db import models, connections, DEFAULT_DB_ALIAS
from django.db import models, connections, DEFAULT_DB_ALIAS, IntegrityError
from django.utils.functional import cached_property
from django.utils.timezone import now
@ -71,6 +71,26 @@ class DatabaseChannelBackend(BaseChannelBackend):
editor.create_model(Group)
return Group
@cached_property
def lock_model(self):
"""
Initialises a new model to store groups; not done as part of a
models.py as we don't want to make it for most installs.
"""
# Make the model class
class Lock(models.Model):
channel = models.CharField(max_length=200, unique=True)
expiry = models.DateTimeField(db_index=True)
class Meta:
apps = Apps()
app_label = "channels"
db_table = "django_channel_locks"
# Ensure its table exists
if Lock._meta.db_table not in self.connection.introspection.table_names(self.connection.cursor()):
with self.connection.schema_editor() as editor:
editor.create_model(Lock)
return Lock
def send(self, channel, message):
self.channel_model.objects.create(
channel = channel,
@ -97,6 +117,7 @@ class DatabaseChannelBackend(BaseChannelBackend):
# Include a 10-second grace period because that solves some clock sync
self.channel_model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete()
self.group_model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete()
self.lock_model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete()
def group_add(self, group, channel, expiry=None):
"""
@ -123,5 +144,27 @@ class DatabaseChannelBackend(BaseChannelBackend):
self._clean_expired()
return list(self.group_model.objects.filter(group=group).values_list("channel", flat=True))
def lock_channel(self, channel, expiry=None):
"""
Attempts to get a lock on the named channel. Returns True if lock
obtained, False if lock not obtained.
"""
# We rely on the UNIQUE constraint for only-one-thread-wins on locks
try:
self.lock_model.objects.create(
channel = channel,
expiry = now() + datetime.timedelta(seconds=expiry or self.expiry),
)
except IntegrityError:
return False
else:
return True
def unlock_channel(self, channel):
"""
Unlocks the named channel. Always succeeds.
"""
self.lock_model.objects.filter(channel=channel).delete()
def __str__(self):
return "%s(alias=%s)" % (self.__class__.__name__, self.connection.alias)

View File

@ -5,6 +5,7 @@ from .base import BaseChannelBackend
queues = {}
groups = {}
locks = set()
class InMemoryChannelBackend(BaseChannelBackend):
"""
@ -72,3 +73,22 @@ class InMemoryChannelBackend(BaseChannelBackend):
"""
self._clean_expired()
return groups.get(group, {}).keys()
def lock_channel(self, channel):
"""
Attempts to get a lock on the named channel. Returns True if lock
obtained, False if lock not obtained.
"""
# Probably not perfect for race conditions, but close enough considering
# it shouldn't be used.
if channel not in locks:
locks.add(channel)
return True
else:
return False
def unlock_channel(self, channel):
"""
Unlocks the named channel. Always succeeds.
"""
locks.discard(channel)

View File

@ -101,5 +101,20 @@ class RedisChannelBackend(BaseChannelBackend):
# TODO: send_group efficient implementation using Lua
def lock_channel(self, channel, expiry=None):
"""
Attempts to get a lock on the named channel. Returns True if lock
obtained, False if lock not obtained.
"""
key = "%s:lock:%s" % (self.prefix, channel)
return bool(self.connection.setnx(key, "1"))
def unlock_channel(self, channel):
"""
Unlocks the named channel. Always succeeds.
"""
key = "%s:lock:%s" % (self.prefix, channel)
self.connection.delete(key)
def __str__(self):
return "%s(host=%s, port=%s)" % (self.__class__.__name__, self.host, self.port)

View File

@ -3,16 +3,77 @@ import hashlib
from importlib import import_module
from django.conf import settings
from django.utils import six
from django.contrib import auth
from channels import channel_backends, DEFAULT_CHANNEL_BACKEND
def linearize(func):
"""
Makes sure the contained consumer does not run at the same time other
consumers are running on messages with the same reply_channel.
Required if you don't want weird things like a second consumer starting
up before the first has exited and saved its session. Doesn't guarantee
ordering, just linearity.
"""
@functools.wraps(func)
def inner(message, *args, **kwargs):
# Make sure there's a reply channel
if not message.reply_channel:
raise ValueError("No reply_channel sent to consumer; @no_overlap can only be used on messages containing it.")
# Get the lock, or re-queue
locked = message.channel_backend.lock_channel(message.reply_channel)
if not locked:
raise message.Requeue()
# OK, keep going
try:
return func(message, *args, **kwargs)
finally:
message.channel_backend.unlock_channel(message.reply_channel)
return inner
def channel_session(func):
"""
Provides a session-like object called "channel_session" to consumers
as a message attribute that will auto-persist across consumers with
the same incoming "reply_channel" value.
Use this to persist data across the lifetime of a connection.
"""
@functools.wraps(func)
def inner(message, *args, **kwargs):
# Make sure there's a reply_channel
if not message.reply_channel:
raise ValueError("No reply_channel sent to consumer; @channel_session can only be used on messages containing it.")
# Make sure there's NOT a channel_session already
if hasattr(message, "channel_session"):
raise ValueError("channel_session decorator wrapped inside another channel_session decorator")
# Turn the reply_channel into a valid session key length thing.
# We take the last 24 bytes verbatim, as these are the random section,
# and then hash the remaining ones onto the start, and add a prefix
reply_name = message.reply_channel.name
session_key = "skt" + hashlib.md5(reply_name[:-24]).hexdigest()[:8] + reply_name[-24:]
# Make a session storage
session_engine = import_module(settings.SESSION_ENGINE)
session = session_engine.SessionStore(session_key=session_key)
# If the session does not already exist, save to force our
# session key to be valid.
if not session.exists(session.session_key):
session.save(must_create=True)
message.channel_session = session
# Run the consumer
try:
return func(message, *args, **kwargs)
finally:
# Persist session if needed
if session.modified:
session.save()
return inner
def http_session(func):
"""
Wraps a HTTP or WebSocket consumer (or any consumer of messages
that provides a "COOKIES" or "GET" attribute) to provide a "session"
Wraps a HTTP or WebSocket connect consumer (or any consumer of messages
that provides a "cooikies" or "get" attribute) to provide a "http_session"
attribute that behaves like request.session; that is, it's hung off of
a per-user session key that is saved in a cookie or passed as the
"session_key" GET parameter.
@ -21,13 +82,16 @@ def http_session(func):
don't have one - that's what SessionMiddleware is for, this is a simpler
read-only version for more low-level code.
If a user does not have a session we can inflate, the "session" attribute will
be None, rather than an empty session you can write to.
If a message does not have a session we can inflate, the "session" attribute
will be None, rather than an empty session you can write to.
"""
@functools.wraps(func)
def inner(message, *args, **kwargs):
if "cookies" not in message.content and "get" not in message.content:
raise ValueError("No cookies or get sent to consumer; this decorator can only be used on messages containing at least one.")
raise ValueError("No cookies or get sent to consumer - cannot initialise http_session")
# Make sure there's NOT a http_session already
if hasattr(message, "http_session"):
raise ValueError("http_session decorator wrapped inside another http_session decorator")
# Make sure there's a session key
session_key = None
if "get" in message.content:
@ -43,7 +107,7 @@ def http_session(func):
session = session_engine.SessionStore(session_key=session_key)
else:
session = None
message.session = session
message.http_session = session
# Run the consumer
result = func(message, *args, **kwargs)
# Persist session if needed (won't be saved if error happens)
@ -51,65 +115,3 @@ def http_session(func):
session.save()
return result
return inner
def http_django_auth(func):
"""
Wraps a HTTP or WebSocket consumer (or any consumer of messages
that provides a "COOKIES" attribute) to provide both a "session"
attribute and a "user" attibute, like AuthMiddleware does.
This runs http_session() to get a session to hook auth off of.
If the user does not have a session cookie set, both "session"
and "user" will be None.
"""
@http_session
@functools.wraps(func)
def inner(message, *args, **kwargs):
# If we didn't get a session, then we don't get a user
if not hasattr(message, "session"):
raise ValueError("Did not see a session to get auth from")
if message.session is None:
message.user = None
# Otherwise, be a bit naughty and make a fake Request with just
# a "session" attribute (later on, perhaps refactor contrib.auth to
# pass around session rather than request)
else:
fake_request = type("FakeRequest", (object, ), {"session": message.session})
message.user = auth.get_user(fake_request)
# Run the consumer
return func(message, *args, **kwargs)
return inner
def channel_session(func):
"""
Provides a session-like object called "channel_session" to consumers
as a message attribute that will auto-persist across consumers with
the same incoming "reply_channel" value.
"""
@functools.wraps(func)
def inner(message, *args, **kwargs):
# Make sure there's a reply_channel in kwargs
if not message.reply_channel:
raise ValueError("No reply_channel sent to consumer; this decorator can only be used on messages containing it.")
# Turn the reply_channel into a valid session key length thing.
# We take the last 24 bytes verbatim, as these are the random section,
# and then hash the remaining ones onto the start, and add a prefix
# TODO: See if there's a better way of doing this
reply_name = message.reply_channel.name
session_key = "skt" + hashlib.md5(reply_name[:-24]).hexdigest()[:8] + reply_name[-24:]
# Make a session storage
session_engine = import_module(settings.SESSION_ENGINE)
session = session_engine.SessionStore(session_key=session_key)
# If the session does not already exist, save to force our session key to be valid
if not session.exists(session.session_key):
session.save()
message.channel_session = session
# Run the consumer
result = func(message, *args, **kwargs)
# Persist session if needed (won't be saved if error happens)
if session.modified:
session.save()
return result
return inner

View File

@ -0,0 +1,69 @@
import asyncio
import time
from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory
from .websocket_autobahn import get_protocol, get_factory
class WebsocketAsyncioInterface(object):
"""
Easy API to run a WebSocket interface server using Twisted.
Integrates the channel backend by running it in a separate thread, using
the always-compatible polling style.
"""
def __init__(self, channel_backend, port=9000):
self.channel_backend = channel_backend
self.port = port
def run(self):
self.factory = get_factory(WebSocketServerFactory)("ws://0.0.0.0:%i" % self.port, debug=False)
self.factory.protocol = get_protocol(WebSocketServerProtocol)
self.loop = asyncio.get_event_loop()
coro = self.loop.create_server(self.factory, '0.0.0.0', 9000)
server = self.loop.run_until_complete(coro)
self.loop.run_in_executor(None, self.backend_reader)
self.loop.call_later(1, self.keepalive_sender)
try:
self.loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server.close()
self.loop.close()
def backend_reader(self):
"""
Run in a separate thread; reads messages from the backend.
"""
# Wait for main loop to start
time.sleep(0.5)
while True:
channels = self.factory.reply_channels()
# Quit if reactor is stopping
if not self.loop.is_running():
return
# Don't do anything if there's no channels to listen on
if channels:
channel, message = self.channel_backend.receive_many(channels)
else:
time.sleep(0.1)
continue
# Wait around if there's nothing received
if channel is None:
time.sleep(0.05)
continue
# Deal with the message
self.factory.dispatch_send(channel, message)
def keepalive_sender(self):
"""
Sends keepalive messages for open WebSockets every
(channel_backend expiry / 2) seconds.
"""
expiry_window = int(self.channel_backend.expiry / 2)
for protocol in self.factory.protocols.values():
if time.time() - protocol.last_keepalive > expiry_window:
protocol.sendKeepalive()
self.loop.call_later(1, self.keepalive_sender)

View File

@ -0,0 +1,101 @@
import time
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
def get_protocol(base):
class InterfaceProtocol(base):
"""
Protocol which supports WebSockets and forwards incoming messages to
the websocket channels.
"""
def onConnect(self, request):
self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
self.request_info = {
"path": request.path,
"get": request.params,
}
def onOpen(self):
# Make sending channel
self.reply_channel = Channel.new_name("!websocket.send")
self.request_info["reply_channel"] = self.reply_channel
self.last_keepalive = time.time()
self.factory.protocols[self.reply_channel] = self
# Send news that this channel is open
Channel("websocket.connect").send(self.request_info)
def onMessage(self, payload, isBinary):
if isBinary:
Channel("websocket.receive").send({
"reply_channel": self.reply_channel,
"content": payload,
"binary": True,
})
else:
Channel("websocket.receive").send({
"reply_channel": self.reply_channel,
"content": payload.decode("utf8"),
"binary": False,
})
def serverSend(self, content, binary=False, **kwargs):
"""
Server-side channel message to send a message.
"""
if binary:
self.sendMessage(content, binary)
else:
self.sendMessage(content.encode("utf8"), binary)
def serverClose(self):
"""
Server-side channel message to close the socket
"""
self.sendClose()
def onClose(self, wasClean, code, reason):
if hasattr(self, "reply_channel"):
del self.factory.protocols[self.reply_channel]
Channel("websocket.disconnect").send({
"reply_channel": self.reply_channel,
})
def sendKeepalive(self):
"""
Sends a keepalive packet on the keepalive channel.
"""
Channel("websocket.keepalive").send({
"reply_channel": self.reply_channel,
})
self.last_keepalive = time.time()
return InterfaceProtocol
def get_factory(base):
class InterfaceFactory(base):
"""
Factory which keeps track of its open protocols' receive channels
and can dispatch to them.
"""
# TODO: Clean up dead protocols if needed?
def __init__(self, *args, **kwargs):
super(InterfaceFactory, self).__init__(*args, **kwargs)
self.protocols = {}
def reply_channels(self):
return self.protocols.keys()
def dispatch_send(self, channel, message):
if message.get("close", False):
self.protocols[channel].serverClose()
else:
self.protocols[channel].serverSend(**message)
return InterfaceFactory

View File

@ -1,97 +1,9 @@
import django
import time
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
from collections import deque
from twisted.internet import reactor
from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND
class InterfaceProtocol(WebSocketServerProtocol):
"""
Protocol which supports WebSockets and forwards incoming messages to
the websocket channels.
"""
def onConnect(self, request):
self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND]
self.request_info = {
"path": request.path,
"GET": request.params,
}
def onOpen(self):
# Make sending channel
self.reply_channel = Channel.new_name("!websocket.send")
self.request_info["reply_channel"] = self.reply_channel
self.last_keepalive = time.time()
self.factory.protocols[self.reply_channel] = self
# Send news that this channel is open
Channel("websocket.connect").send(self.request_info)
def onMessage(self, payload, isBinary):
if isBinary:
Channel("websocket.receive").send(dict(
self.request_info,
content = payload,
binary = True,
))
else:
Channel("websocket.receive").send(dict(
self.request_info,
content = payload.decode("utf8"),
binary = False,
))
def serverSend(self, content, binary=False, **kwargs):
"""
Server-side channel message to send a message.
"""
if binary:
self.sendMessage(content, binary)
else:
self.sendMessage(content.encode("utf8"), binary)
def serverClose(self):
"""
Server-side channel message to close the socket
"""
self.sendClose()
def onClose(self, wasClean, code, reason):
if hasattr(self, "reply_channel"):
del self.factory.protocols[self.reply_channel]
Channel("websocket.disconnect").send(self.request_info)
def sendKeepalive(self):
"""
Sends a keepalive packet on the keepalive channel.
"""
Channel("websocket.keepalive").send(self.request_info)
self.last_keepalive = time.time()
class InterfaceFactory(WebSocketServerFactory):
"""
Factory which keeps track of its open protocols' receive channels
and can dispatch to them.
"""
# TODO: Clean up dead protocols if needed?
def __init__(self, *args, **kwargs):
super(InterfaceFactory, self).__init__(*args, **kwargs)
self.protocols = {}
def reply_channels(self):
return self.protocols.keys()
def dispatch_send(self, channel, message):
if message.get("close", False):
self.protocols[channel].serverClose()
else:
self.protocols[channel].serverSend(**message)
from .websocket_autobahn import get_protocol, get_factory
class WebsocketTwistedInterface(object):
@ -106,8 +18,8 @@ class WebsocketTwistedInterface(object):
self.port = port
def run(self):
self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False)
self.factory.protocol = InterfaceProtocol
self.factory = get_factory(WebSocketServerFactory)("ws://0.0.0.0:%i" % self.port, debug=False)
self.factory.protocol = get_protocol(WebSocketServerProtocol)
reactor.listenTCP(self.port, self.factory)
reactor.callInThread(self.backend_reader)
reactor.callLater(1, self.keepalive_sender)

View File

@ -1,7 +1,6 @@
import time
from django.core.management import BaseCommand, CommandError
from channels import channel_backends, DEFAULT_CHANNEL_BACKEND
from channels.interfaces.websocket_twisted import WebsocketTwistedInterface
class Command(BaseCommand):
@ -20,7 +19,17 @@ class Command(BaseCommand):
)
# Run the interface
port = options.get("port", None) or 9000
self.stdout.write("Running Twisted/Autobahn WebSocket interface server")
self.stdout.write(" Channel backend: %s" % channel_backend)
self.stdout.write(" Listening on: ws://0.0.0.0:%i" % port)
WebsocketTwistedInterface(channel_backend=channel_backend, port=port).run()
try:
import asyncio
except ImportError:
from channels.interfaces.websocket_twisted import WebsocketTwistedInterface
self.stdout.write("Running Twisted/Autobahn WebSocket interface server")
self.stdout.write(" Channel backend: %s" % channel_backend)
self.stdout.write(" Listening on: ws://0.0.0.0:%i" % port)
WebsocketTwistedInterface(channel_backend=channel_backend, port=port).run()
else:
from channels.interfaces.websocket_asyncio import WebsocketAsyncioInterface
self.stdout.write("Running asyncio/Autobahn WebSocket interface server")
self.stdout.write(" Channel backend: %s" % channel_backend)
self.stdout.write(" Listening on: ws://0.0.0.0:%i" % port)
WebsocketAsyncioInterface(channel_backend=channel_backend, port=port).run()

View File

@ -10,6 +10,13 @@ class Message(object):
to use to reply to this message's end user, if that makes sense.
"""
class Requeue(Exception):
"""
Raise this while processing a message to requeue it back onto the
channel. Useful if you're manually ensuring partial ordering, etc.
"""
pass
def __init__(self, content, channel, channel_backend, reply_channel=None):
self.content = content
self.channel = channel

View File

@ -10,7 +10,7 @@ class MemoryBackendTests(TestCase):
backend_class = InMemoryChannelBackend
def setUp(self):
self.backend = self.backend_class()
self.backend = self.backend_class(routing={})
def test_send_recv(self):
"""
@ -33,7 +33,7 @@ class MemoryBackendTests(TestCase):
self.assertEqual(message, {"value": "red"})
def test_message_expiry(self):
self.backend = self.backend_class(expiry=-100)
self.backend = self.backend_class(routing={}, expiry=-100)
self.backend.send("test", {"value": "blue"})
channel, message = self.backend.receive_many(["test"])
self.assertIs(channel, None)
@ -72,7 +72,7 @@ class MemoryBackendTests(TestCase):
self.assertEqual(message, {"value": "orange"})
def test_group_expiry(self):
self.backend = self.backend_class(expiry=-100)
self.backend = self.backend_class(routing={}, expiry=-100)
self.backend.group_add("tgroup", "test")
self.backend.group_add("tgroup", "test2")
self.assertEqual(

View File

@ -1,5 +1,6 @@
import traceback
from .message import Message
from .utils import name_that_thing
class Worker(object):
@ -31,5 +32,8 @@ class Worker(object):
self.callback(channel, message)
try:
consumer(message)
except Message.Requeue:
self.channel_backend.send(channel, content)
except:
print("Error processing message with consumer {}:".format(name_that_thing(consumer)))
traceback.print_exc()

View File

@ -8,6 +8,13 @@ you wish; the API is very simple and documented below.
In-memory
---------
The in-memory backend is the simplest, and not really a backend as such;
it exists purely to enable Django to run in a "normal" mode where no Channels
functionality is available, just normal HTTP request processing. You should
never need to set it explicitly.
This backend provides no network transparency or non-blocking guarantees.
Database
--------

View File

@ -46,7 +46,7 @@ The channels have capacity, so a load of producers can write lots of messages
into a channel with no consumers and then a consumer can come along later and
will start getting served those queued messages.
If you've used channels in Go, these are reasonably similar to those. The key
If you've used `channels in Go <https://gobyexample.com/channels>`_, these are reasonably similar to those. The key
difference is that these channels are network-transparent; the implementations
of channels we provide are all accessible across a network to consumers
and producers running in different processes or on different machines.
@ -65,9 +65,9 @@ you can write a function to consume a channel, like so::
def my_consumer(message):
pass
And then assign a channel to it like this in the channel backend settings::
And then assign a channel to it like this in the channel routing::
"ROUTING": {
channel_routing = {
"some-channel": "myapp.consumers.my_consumer",
}
@ -161,7 +161,7 @@ and be less than 200 characters long.
It's optional for a backend implementation to understand this - after all,
it's only important at scale, where you want to shard the two types differently
- but it's present nonetheless. For more on scaling, and how to handle channel
but it's present nonetheless. For more on scaling, and how to handle channel
types if you're writing a backend or interface server, read :doc:`scaling`.
Groups

View File

@ -44,21 +44,25 @@ For now, we want to override the *channel routing* so that, rather than going
to the URL resolver and our normal view stack, all HTTP requests go to our
custom consumer we wrote above. Here's what that looks like::
# In settings.py
CHANNEL_BACKENDS = {
"default": {
"BACKEND": "channels.backends.database.DatabaseChannelBackend",
"ROUTING": {
"http.request": "myproject.myapp.consumers.http_consumer",
},
"ROUTING": "myproject.routing.channel_routing",
},
}
# In routing.py
channel_routing = {
"http.request": "myproject.myapp.consumers.http_consumer",
}
As you can see, this is a little like Django's ``DATABASES`` setting; there are
named channel backends, with a default one called ``default``. Each backend
needs a class specified which powers it - we'll come to the options there later -
and a routing scheme, which can either be defined directly as a dict or as
a string pointing to a dict in another file (if you'd rather keep it outside
settings).
and a routing scheme, which points to a dict containing the routing settings.
It's recommended you call this ``routing.py`` and put it alongside ``urls.py``
in your project.
If you start up ``python manage.py runserver`` and go to
``http://localhost:8000``, you'll see that, rather than a default Django page,
@ -71,18 +75,15 @@ do any time. Let's try some WebSockets, and make a basic chat server!
Delete that consumer and its routing - we'll want the normal Django view layer to
serve HTTP requests from now on - and make this WebSocket consumer instead::
from channels import Group
def ws_add(message):
Group("chat").add(message.reply_channel)
Hook it up to the ``websocket.connect`` channel like this::
CHANNEL_BACKENDS = {
"default": {
"BACKEND": "channels.backends.database.DatabaseChannelBackend",
"ROUTING": {
"websocket.connect": "myproject.myapp.consumers.ws_add",
},
},
channel_routing = {
"websocket.connect": "myproject.myapp.consumers.ws_add",
}
Now, let's look at what this is doing. It's tied to the
@ -105,6 +106,8 @@ so we can hook that up to re-add the channel (it's safe to add the channel to
a group it's already in - similarly, it's safe to discard a channel from a
group it's not in)::
from channels import Group
# Connected to websocket.keepalive
def ws_keepalive(message):
Group("chat").add(message.reply_channel)
@ -112,17 +115,17 @@ group it's not in)::
Of course, this is exactly the same code as the ``connect`` handler, so let's
just route both channels to the same consumer::
...
"ROUTING": {
channel_routing = {
"websocket.connect": "myproject.myapp.consumers.ws_add",
"websocket.keepalive": "myproject.myapp.consumers.ws_add",
},
...
}
And, even though channels will expire out, let's add an explicit ``disconnect``
handler to clean up as people disconnect (most channels will cleanly disconnect
and get this called)::
from channels import Group
# Connected to websocket.disconnect
def ws_disconnect(message):
Group("chat").discard(message.reply_channel)
@ -132,7 +135,7 @@ Now, that's taken care of adding and removing WebSocket send channels for the
we're not going to store a history of messages or anything and just replay
any message sent in to all connected clients. Here's all the code::
from channels import Channel, Group
from channels import Group
# Connected to websocket.connect and websocket.keepalive
def ws_add(message):
@ -146,18 +149,13 @@ any message sent in to all connected clients. Here's all the code::
def ws_disconnect(message):
Group("chat").discard(message.reply_channel)
And what our routing should look like in ``settings.py``::
And what our routing should look like in ``routing.py``::
CHANNEL_BACKENDS = {
"default": {
"BACKEND": "channels.backends.database.DatabaseChannelBackend",
"ROUTING": {
"websocket.connect": "myproject.myapp.consumers.ws_add",
"websocket.keepalive": "myproject.myapp.consumers.ws_add",
"websocket.receive": "myproject.myapp.consumers.ws_message",
"websocket.disconnect": "myproject.myapp.consumers.ws_disconnect",
},
},
channel_routing = {
"websocket.connect": "myproject.myapp.consumers.ws_add",
"websocket.keepalive": "myproject.myapp.consumers.ws_add",
"websocket.receive": "myproject.myapp.consumers.ws_message",
"websocket.disconnect": "myproject.myapp.consumers.ws_disconnect",
}
With all that code in your ``consumers.py`` file, you now have a working
@ -229,107 +227,33 @@ like, so you can understand when they're called. If you run three or four
copies of ``runworker`` you'll probably be able to see the tasks running
on different workers.
Authentication
--------------
Now, of course, a WebSocket solution is somewhat limited in scope without the
ability to live with the rest of your website - in particular, we want to make
sure we know what user we're talking to, in case we have things like private
chat channels (we don't want a solution where clients just ask for the right
channels, as anyone could change the code and just put in private channel names)
It can also save you having to manually make clients ask for what they want to
see; if I see you open a WebSocket to my "updates" endpoint, and I know which
user ID, I can just auto-add that channel to all the relevant groups (mentions
of that user, for example).
Handily, as WebSockets start off using the HTTP protocol, they have a lot of
familiar features, including a path, GET parameters, and cookies. We'd like to
use these to hook into the familiar Django session and authentication systems;
after all, WebSockets are no good unless we can identify who they belong to
and do things securely.
In addition, we don't want the interface servers storing data or trying to run
authentication; they're meant to be simple, lean, fast processes without much
state, and so we'll need to do our authentication inside our consumer functions.
Fortunately, because Channels has standardised WebSocket event
:doc:`message-standards`, it ships with decorators that help you with
authentication, as well as using Django's session framework (which authentication
relies on). Channels can use Django sessions either from cookies (if you're running your websocket
server on the same port as your main site, which requires a reverse proxy that
understands WebSockets), or from a ``session_key`` GET parameter, which
is much more portable, and works in development where you need to run a separate
WebSocket server (by default, on port 9000).
All we need to do is add the ``django_http_auth`` decorator to our views,
and we'll get extra ``session`` and ``user`` keyword attributes on ``message`` we can use;
let's make one where users can only chat to people with the same first letter
of their username::
from channels import Channel, Group
from channels.decorators import django_http_auth
@django_http_auth
def ws_add(message):
Group("chat-%s" % message.user.username[0]).add(message.reply_channel)
@django_http_auth
def ws_message(message):
Group("chat-%s" % message.user.username[0]).send(message.content)
@django_http_auth
def ws_disconnect(message):
Group("chat-%s" % message.user.username[0]).discard(message.reply_channel)
Now, when we connect to the WebSocket we'll have to remember to provide the
Django session ID as part of the URL, like this::
socket = new WebSocket("ws://127.0.0.1:9000/?session_key=abcdefg");
You can get the current session key in a template with ``{{ request.session.session_key }}``.
Note that Channels can't work with signed cookie sessions - since only HTTP
responses can set cookies, it needs a backend it can write to separately to
store state.
Persisting Data
---------------
Doing chatrooms by username first letter is a nice simple example, but it's
Echoing messages is a nice simple example, but it's
skirting around the real design pattern - persistent state for connections.
A user may open our chat site and select the chatroom to join themselves, so we
should let them send this request in the initial WebSocket connection,
check they're allowed to access it, and then remember which room a socket is
connected to when they send a message in so we know which group to send it to.
Let's consider a basic chat site where a user requests a chat room upon initial
connection, as part of the query string (e.g. ``http://host/websocket?room=abc``).
The ``reply_channel`` is our unique pointer to the open WebSocket - as you've
seen, we do all our operations on it - but it's not something we can annotate
with data; it's just a simple string, and even if we hack around and set
attributes on it that's not going to carry over to other workers.
The ``reply_channel`` attribute you've seen before is our unique pointer to the
open WebSocket - because it varies between different clients, it's how we can
keep track of "who" a message is from. Remember, Channels is network-trasparent
and can run on multiple workers, so you can't just store things locally in
global variables or similar.
Instead, the solution is to persist information keyed by the send channel in
Instead, the solution is to persist information keyed by the ``reply_channel`` in
some other data store - sound familiar? This is what Django's session framework
does for HTTP requests, only there it uses cookies as the lookup key rather
than the ``reply_channel``.
Now, as you saw above, you can use the ``django_http_auth`` decorator to get
both a ``user`` and a ``session`` attribute on your message - and,
indeed, there is a ``http_session`` decorator that will just give you
the ``session`` attribute.
However, that session is based on cookies, and so follows the user round the
site - it's great for information that should persist across all WebSocket and
HTTP connections, but not great for information that is specific to a single
WebSocket (such as "which chatroom should this socket be connected to"). For
this reason, Channels also provides a ``channel_session`` decorator,
which adds a ``channel_session`` attribute to the message; this works just like
the normal ``session`` attribute, and persists to the same storage, but varies
per-channel rather than per-cookie.
Channels provides a ``channel_session`` decorator for this purpose - it
provides you with an attribute called ``message.channel_session`` that acts
just like a normal Django session.
Let's use it now to build a chat server that expects you to pass a chatroom
name in the path of your WebSocket request (we'll ignore auth for now)::
name in the path of your WebSocket request (we'll ignore auth for now - that's next)::
from channels import Channel
from channels import Group
from channels.decorators import channel_session
# Connected to websocket.connect
@ -349,7 +273,7 @@ name in the path of your WebSocket request (we'll ignore auth for now)::
# Connected to websocket.receive
@channel_session
def ws_message(message):
Group("chat-%s" % message.channel_session['room']).send(content)
Group("chat-%s" % message.channel_session['room']).send(message.content)
# Connected to websocket.disconnect
@channel_session
@ -358,9 +282,102 @@ name in the path of your WebSocket request (we'll ignore auth for now)::
If you play around with it from the console (or start building a simple
JavaScript chat client that appends received messages to a div), you'll see
that you can now request which chat room you want in the initial request. We
could easily add in the auth decorator here too and do an initial check in
``connect`` that the user had permission to join that chatroom.
that you can now request which chat room you want in the initial request.
Authentication
--------------
Now, of course, a WebSocket solution is somewhat limited in scope without the
ability to live with the rest of your website - in particular, we want to make
sure we know what user we're talking to, in case we have things like private
chat channels (we don't want a solution where clients just ask for the right
channels, as anyone could change the code and just put in private channel names)
It can also save you having to manually make clients ask for what they want to
see; if I see you open a WebSocket to my "updates" endpoint, and I know which
user you are, I can just auto-add that channel to all the relevant groups (mentions
of that user, for example).
Handily, as WebSockets start off using the HTTP protocol, they have a lot of
familiar features, including a path, GET parameters, and cookies. We'd like to
use these to hook into the familiar Django session and authentication systems;
after all, WebSockets are no good unless we can identify who they belong to
and do things securely.
In addition, we don't want the interface servers storing data or trying to run
authentication; they're meant to be simple, lean, fast processes without much
state, and so we'll need to do our authentication inside our consumer functions.
Fortunately, because Channels has standardised WebSocket event
:doc:`message-standards`, it ships with decorators that help you with
both authentication and getting the underlying Django session (which is what
Django authentication relies on).
Channels can use Django sessions either from cookies (if you're running your websocket
server on the same port as your main site, which requires a reverse proxy that
understands WebSockets), or from a ``session_key`` GET parameter, which
is much more portable, and works in development where you need to run a separate
WebSocket server (by default, on port 9000).
You get access to a user's normal Django session using the ``http_session``
decorator - that gives you a ``message.http_session`` attribute that behaves
just like ``request.session``. You can go one further and use ``http_session_user``
which will provide a ``message.user`` attribute as well as the session attribute.
Now, one thing to note is that you only get the detailed HTTP information
during the ``connect`` message of a WebSocket connection (you can read more
about what you get when in :doc:`message-standards`) - this means we're not
wasting bandwidth sending the same information over the wire needlessly.
This also means we'll have to grab the user in the connection handler and then
store it in the session; thankfully, Channels ships with both a ``channel_session_user``
decorator that works like the ``http_session_user`` decorator you saw above but
loads the user from the *channel* session rather than the *HTTP* session,
and a function called ``transfer_user`` which replicates a user from one session
to another.
Bringing that all together, let's make a chat server one where users can only
chat to people with the same first letter of their username::
from channels import Channel, Group
from channels.decorators import channel_session
from channels.auth import http_session_user, channel_session_user, transfer_user
# Connected to websocket.connect
@channel_session
@http_session_user
def ws_add(message):
# Copy user from HTTP to channel session
transfer_user(message.http_session, message.channel_session)
# Add them to the right group
Group("chat-%s" % message.user.username[0]).add(message.reply_channel)
# Connected to websocket.keepalive
@channel_session_user
def ws_keepalive(message):
# Keep them in the right group
Group("chat-%s" % message.user.username[0]).add(message.reply_channel)
# Connected to websocket.receive
@channel_session_user
def ws_message(message):
Group("chat-%s" % message.user.username[0]).send(message.content)
# Connected to websocket.disconnect
@channel_session_user
def ws_disconnect(message):
Group("chat-%s" % message.user.username[0]).discard(message.reply_channel)
Now, when we connect to the WebSocket we'll have to remember to provide the
Django session ID as part of the URL, like this::
socket = new WebSocket("ws://127.0.0.1:9000/?session_key=abcdefg");
You can get the current session key in a template with ``{{ request.session.session_key }}``.
Note that Channels can't work with signed cookie sessions - since only HTTP
responses can set cookies, it needs a backend it can write to separately to
store state.
Models
------
@ -434,6 +451,72 @@ command run via ``cron``. If we wanted to write a bot, too, we could put its
listening logic inside the ``chat-messages`` consumer, as every message would
pass through it.
Linearization
-------------
There's one final concept we want to introduce you to before you go on to build
sites with Channels - linearizing consumers.
Because Channels is a distributed system that can have many workers, by default
it's entirely feasible for a WebSocket interface server to send out a ``connect``
and a ``receive`` message close enough together that a second worker will pick
up and start processing the ``receive`` message before the first worker has
finished processing the ``connect`` worker.
This is particularly annoying if you're storing things in the session in the
``connect`` consumer and trying to get them in the ``receive`` consumer - because
the ``connect`` consumer hasn't exited, its session hasn't saved. You'd get the
same effect if someone tried to request a view before the login view had finished
processing, but there you're not expecting that page to run after the login,
whereas you'd naturally expect ``receive`` to run after ``connect``.
But, of course, Channels has a solution - the ``linearize`` decorator. Any
handler decorated with this will use locking to ensure it does not run at the
same time as any other view with ``linearize`` **on messages with the same reply channel**.
That means your site will happily mutitask with lots of different people's messages,
but if two happen to try to run at the same time for the same client, they'll
be deconflicted.
There's a small cost to using ``linearize``, which is why it's an optional
decorator, but generally you'll want to use it for most session-based WebSocket
and other "continuous protocol" things. Here's an example, improving our
first-letter-of-username chat from earlier::
from channels import Channel, Group
from channels.decorators import channel_session, linearize
from channels.auth import http_session_user, channel_session_user, transfer_user
# Connected to websocket.connect
@linearize
@channel_session
@http_session_user
def ws_add(message):
# Copy user from HTTP to channel session
transfer_user(message.http_session, message.channel_session)
# Add them to the right group
Group("chat-%s" % message.user.username[0]).add(message.reply_channel)
# Connected to websocket.keepalive
# We don't linearize as we know this will happen a decent time after add
@channel_session_user
def ws_keepalive(message):
# Keep them in the right group
Group("chat-%s" % message.user.username[0]).add(message.reply_channel)
# Connected to websocket.receive
@linearize
@channel_session_user
def ws_message(message):
Group("chat-%s" % message.user.username[0]).send(message.content)
# Connected to websocket.disconnect
# We don't linearize as even if this gets an empty session, the group
# will auto-discard after the expiry anyway.
@channel_session_user
def ws_disconnect(message):
Group("chat-%s" % message.user.username[0]).discard(message.reply_channel)
Next Steps
----------

View File

@ -31,3 +31,4 @@ Contents:
scaling
backends
faqs
releases/index

View File

@ -11,8 +11,7 @@ In addition to the standards outlined below, each message may contain a
separate connection and data receiving messages (like WebSockets) will only
contain the connection and detailed client information in the first message;
use the ``@channel_session`` decorator to persist this data to consumers of
the received data (the decorator will take care of handling persistence and
ordering guarantees on messages).
the received data (or something else based on ``reply_channel``).
All messages must be able to be encoded as JSON; channel backends don't
necessarily have to use JSON, but we consider it the lowest common denominator

22
docs/releases/0.8.rst Normal file
View File

@ -0,0 +1,22 @@
0.8 (2015-09-10)
----------------
This release reworks a few of the core concepts to make the channel layer
more efficient and user friendly:
* Channel names now do not start with ``django``, and are instead just ``http.request``, etc.
* HTTP headers/GET/etc are only sent with ``websocket.connect`` rather than all websocket requests,
to save a lot of bandwidth in the channel layer.
* The session/user decorators were renamed, and a ``@channel_session_user`` and ``transfer_user`` set of functions
added to allow moving the user details from the HTTP session to the channel session in the ``connect`` consumer.
* A ``@linearize`` decorator was added to help ensure a ``connect``/``receive`` pair are not executed
simultanously on two different workers.
* Channel backends gained locking mechanisms to support the ``linearize`` feature.
* ``runwsserver`` will use asyncio rather than Twisted if it's available.
* Message formats have been made a bit more consistent.

7
docs/releases/index.rst Normal file
View File

@ -0,0 +1,7 @@
Release Notes
-------------
.. toctree::
:maxdepth: 1
0.8

View File

@ -2,7 +2,7 @@ from setuptools import find_packages, setup
setup(
name='channels',
version="0.7",
version="0.8",
url='http://github.com/andrewgodwin/django-channels',
author='Andrew Godwin',
author_email='andrew@aeracode.org',
@ -10,4 +10,7 @@ setup(
license='BSD',
packages=find_packages(),
include_package_data=True,
install_requires=[
'six',
]
)