Channel groups, tests and docs

This commit is contained in:
Andrew Godwin 2015-07-12 20:19:10 -05:00
parent 60f0680ec2
commit aa921b1659
8 changed files with 395 additions and 14 deletions

View File

@ -88,7 +88,7 @@ class BaseChannelBackend(object):
This base class provides a default implementation; can be overridden
to be more efficient by subclasses.
"""
for channel in self.group_channels():
for channel in self.group_channels(group):
self.send(channel, message)
def __str__(self):

View File

@ -27,8 +27,8 @@ class DatabaseChannelBackend(BaseChannelBackend):
"""
return connections[self.db_alias]
@property
def model(self):
@cached_property
def channel_model(self):
"""
Initialises a new model to store messages; not done as part of a
models.py as we don't want to make it for most installs.
@ -49,8 +49,30 @@ class DatabaseChannelBackend(BaseChannelBackend):
editor.create_model(Message)
return Message
@cached_property
def group_model(self):
"""
Initialises a new model to store groups; not done as part of a
models.py as we don't want to make it for most installs.
"""
# Make the model class
class Group(models.Model):
group = models.CharField(max_length=200)
channel = models.CharField(max_length=200)
expiry = models.DateTimeField(db_index=True)
class Meta:
apps = Apps()
app_label = "channels"
db_table = "django_channel_groups"
unique_together = [["group", "channel"]]
# Ensure its table exists
if Group._meta.db_table not in self.connection.introspection.table_names(self.connection.cursor()):
with self.connection.schema_editor() as editor:
editor.create_model(Group)
return Group
def send(self, channel, message):
self.model.objects.create(
self.channel_model.objects.create(
channel = channel,
content = json.dumps(message),
expiry = now() + datetime.timedelta(seconds=self.expiry)
@ -59,15 +81,47 @@ class DatabaseChannelBackend(BaseChannelBackend):
def receive_many(self, channels):
if not channels:
raise ValueError("Cannot receive on empty channel list!")
# Delete all expired messages (add 10 second grace period for clock sync)
self.model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete()
self._clean_expired()
# Get a message from one of our channels
message = self.model.objects.filter(channel__in=channels).order_by("id").first()
message = self.channel_model.objects.filter(channel__in=channels).order_by("id").first()
if message:
self.model.objects.filter(pk=message.pk).delete()
self.channel_model.objects.filter(pk=message.pk).delete()
return message.channel, json.loads(message.content)
else:
return None, None
def _clean_expired(self):
"""
Cleans out expired groups and messages.
"""
# Include a 10-second grace period because that solves some clock sync
self.channel_model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete()
self.group_model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete()
def group_add(self, group, channel, expiry=None):
"""
Adds the channel to the named group for at least 'expiry'
seconds (expiry defaults to message expiry if not provided).
"""
self.group_model.objects.update_or_create(
group = group,
channel = channel,
defaults = {"expiry": now() + datetime.timedelta(seconds=expiry or self.expiry)},
)
def group_discard(self, group, channel):
"""
Removes the channel from the named group if it is in the group;
does nothing otherwise (does not error)
"""
self.group_model.objects.filter(group=group, channel=channel).delete()
def group_channels(self, group):
"""
Returns an iterable of all channels in the group.
"""
self._clean_expired()
return list(self.group_model.objects.filter(group=group).values_list("channel", flat=True))
def __str__(self):
return "%s(alias=%s)" % (self.__class__.__name__, self.connection.alias)

View File

@ -4,6 +4,7 @@ from collections import deque
from .base import BaseChannelBackend
queues = {}
groups = {}
class InMemoryChannelBackend(BaseChannelBackend):
"""
@ -17,18 +18,57 @@ class InMemoryChannelBackend(BaseChannelBackend):
# Try JSON encoding it to make sure it would, but store the native version
json.dumps(message)
# Add to the deque, making it if needs be
queues.setdefault(channel, deque()).append(message)
queues.setdefault(channel, deque()).append((message, time.time() + self.expiry))
def receive_many(self, channels):
if not channels:
raise ValueError("Cannot receive on empty channel list!")
# Try to pop a message from each channel
self._clean_expired()
for channel in channels:
try:
# This doesn't clean up empty channels - OK for testing.
# For later versions, have cleanup w/lock.
return channel, queues[channel].popleft()
return channel, queues[channel].popleft()[0]
except (IndexError, KeyError):
pass
return None, None
def _clean_expired(self):
# Handle expired messages
for channel, messages in queues.items():
while len(messages) and messages[0][1] < time.time():
messages.popleft()
# Handle expired groups
for group, channels in list(groups.items()):
for channel, expiry in list(channels.items()):
if expiry < (time.time() - 10):
try:
del groups[group][channel]
except KeyError:
# Another thread might have got there first
pass
def group_add(self, group, channel, expiry=None):
"""
Adds the channel to the named group for at least 'expiry'
seconds (expiry defaults to message expiry if not provided).
"""
groups.setdefault(group, {})[channel] = time.time() + (expiry or self.expiry)
def group_discard(self, group, channel):
"""
Removes the channel from the named group if it is in the group;
does nothing otherwise (does not error)
"""
try:
del groups[group][channel]
except KeyError:
pass
def group_channels(self, group):
"""
Returns an iterable of all channels in the group.
"""
self._clean_expired()
return groups.get(group, {}).keys()

View File

@ -28,7 +28,7 @@ class RedisChannelBackend(BaseChannelBackend):
def send(self, channel, message):
# Write out message into expiring key (avoids big items in list)
key = uuid.uuid4()
key = self.prefix + uuid.uuid4().get_hex()
self.connection.set(
key,
json.dumps(message),
@ -63,5 +63,41 @@ class RedisChannelBackend(BaseChannelBackend):
else:
return None, None
def group_add(self, group, channel, expiry=None):
"""
Adds the channel to the named group for at least 'expiry'
seconds (expiry defaults to message expiry if not provided).
"""
key = "%s:group:%s" % (self.prefix, group)
self.connection.zadd(
key,
**{channel: time.time() + (expiry or self.expiry)}
)
def group_discard(self, group, channel):
"""
Removes the channel from the named group if it is in the group;
does nothing otherwise (does not error)
"""
key = "%s:group:%s" % (self.prefix, group)
self.connection.zrem(
key,
channel,
)
def group_channels(self, group):
"""
Returns an iterable of all channels in the group.
"""
key = "%s:group:%s" % (self.prefix, group)
# Discard old channels
self.connection.zremrangebyscore(key, 0, int(time.time()) - 10)
# Return current lot
return self.connection.zrange(
key,
0,
-1,
)
def __str__(self):
return "%s(host=%s, port=%s)" % (self.__class__.__name__, self.host, self.port)

View File

@ -54,7 +54,7 @@ class Channel(object):
return view_producer(self.name)
@classmethod
def consumer(self, channels, alias=DEFAULT_CHANNEL_BACKEND):
def consumer(self, *channels, alias=DEFAULT_CHANNEL_BACKEND):
"""
Decorator that registers a function as a consumer.
"""
@ -68,3 +68,29 @@ class Channel(object):
channel_backend.registry.add_consumer(func, channels)
return func
return inner
class Group(object):
"""
A group of channels that can be messaged at once, and that expire out
of the group after an expiry time (keep re-adding to keep them in).
"""
def __init__(self, alias=DEFAULT_CHANNEL_BACKEND, channel_backend=None):
self.name = name
if channel_backend:
self.channel_backend = channel_backend
else:
self.channel_backend = channel_backends[alias]
def add(self, channel):
self.channel_backend.add(self.name, channel)
def discard(self, channel):
self.channel_backend.discard(self.name, channel)
def channels(self):
self.channel_backend.channels(self.name)
def send(self, **kwargs):
self.channel_backend.send_group(self, self.name, kwargs)

View File

View File

@ -0,0 +1,91 @@
from django.test import TestCase
from ..channel import Channel
from ..backends.database import DatabaseChannelBackend
from ..backends.redis_py import RedisChannelBackend
from ..backends.memory import InMemoryChannelBackend
class MemoryBackendTests(TestCase):
backend_class = InMemoryChannelBackend
def setUp(self):
self.backend = self.backend_class()
def test_send_recv(self):
"""
Tests that channels can send and receive messages.
"""
self.backend.send("test", {"value": "blue"})
self.backend.send("test", {"value": "green"})
self.backend.send("test2", {"value": "red"})
# Get just one first
channel, message = self.backend.receive_many(["test"])
self.assertEqual(channel, "test")
self.assertEqual(message, {"value": "blue"})
# And the second
channel, message = self.backend.receive_many(["test"])
self.assertEqual(channel, "test")
self.assertEqual(message, {"value": "green"})
# And the other channel with multi select
channel, message = self.backend.receive_many(["test", "test2"])
self.assertEqual(channel, "test2")
self.assertEqual(message, {"value": "red"})
def test_message_expiry(self):
self.backend = self.backend_class(expiry=-100)
self.backend.send("test", {"value": "blue"})
channel, message = self.backend.receive_many(["test"])
self.assertIs(channel, None)
self.assertIs(message, None)
def test_groups(self):
"""
Tests that group addition and removal and listing works
"""
self.backend.group_add("tgroup", "test")
self.backend.group_add("tgroup", "test2")
self.backend.group_add("tgroup2", "test3")
self.assertEqual(
set(self.backend.group_channels("tgroup")),
{"test", "test2"},
)
self.backend.group_discard("tgroup", "test2")
self.backend.group_discard("tgroup", "test2")
self.assertEqual(
self.backend.group_channels("tgroup"),
["test"],
)
def test_group_send(self):
"""
Tests sending to groups.
"""
self.backend.group_add("tgroup", "test")
self.backend.group_add("tgroup", "test2")
self.backend.send_group("tgroup", {"value": "orange"})
channel, message = self.backend.receive_many(["test"])
self.assertEqual(channel, "test")
self.assertEqual(message, {"value": "orange"})
channel, message = self.backend.receive_many(["test2"])
self.assertEqual(channel, "test2")
self.assertEqual(message, {"value": "orange"})
def test_group_expiry(self):
self.backend = self.backend_class(expiry=-100)
self.backend.group_add("tgroup", "test")
self.backend.group_add("tgroup", "test2")
self.assertEqual(
self.backend.group_channels("tgroup"),
[],
)
class RedisBackendTests(MemoryBackendTests):
backend_class = RedisChannelBackend
class DatabaseBackendTests(MemoryBackendTests):
backend_class = DatabaseChannelBackend

View File

@ -41,9 +41,143 @@ you get the Hello World response, so things are working. If you don't see
a response, check you :doc:`installed Channels correctly <installation>`.
Now, that's not very exciting - raw HTTP responses are something Django can
do any time. Let's try some WebSockets!
do any time. Let's try some WebSockets, and make a basic chat server!
Delete that consumer from above - we'll need the normal Django view layer to
serve templates later - and make this WebSocket consumer instead::
# todo
@Channel.consumer("django.websocket.connect")
def ws_connect(channel, send_channel, **kwargs):
Group("chat").add(send_channel)
Now, let's look at what this is doing. It's tied to the
``django.websocket.connect`` channel, which means that it'll get a message
whenever a new WebSocket connection is opened by a client.
When it gets that message, it takes the ``send_channel`` key from it, which
is the unique response channel for that client, and adds it to the ``chat``
group, which means we can send messages to all connected chat clients.
Of course, if you've read through :doc:`concepts`, you'll know that channels
added to groups expire out after a while unless you keep renewing their
membership. This is because Channels is stateless; the worker processes
don't keep track of the open/close states of the potentially thousands of
connections you have open at any one time.
The solution to this is that the WebSocket interface servers will send
periodic "keepalive" messages on the ``django.websocket.keepalive`` channel,
so we can hook that up to re-add the channel (it's safe to add the channel to
a group it's already in - similarly, it's safe to discard a channel from a
group it's not in)::
@Channel.consumer("django.websocket.keepalive")
def ws_keepalive(channel, send_channel, **kwargs):
Group("chat").add(send_channel)
Of course, this is exactly the same code as the ``connect`` handler, so let's
just combine them::
@Channel.consumer("django.websocket.connect", "django.websocket.keepalive")
def ws_add(channel, send_channel, **kwargs):
Group("chat").add(send_channel)
And, even though channels will expire out, let's add an explicit ``disconnect``
handler to clean up as people disconnect (most channels will cleanly disconnect
and get this called)::
@Channel.consumer("django.websocket.disconnect")
def ws_disconnect(channel, send_channel, **kwargs):
Group("chat").discard(send_channel)
Now, that's taken care of adding and removing WebSocket send channels for the
``chat`` group; all we need to do now is take care of message sending. For now,
we're not going to store a history of messages or anything and just replay
any message sent in to all connected clients. Here's all the code::
@Channel.consumer("django.websocket.connect", "django.websocket.keepalive")
def ws_add(channel, send_channel, **kwargs):
Group("chat").add(send_channel)
@Channel.consumer("django.websocket.receive")
def ws_message(channel, send_channel, content, **kwargs):
Group("chat").send(content=content)
@Channel.consumer("django.websocket.disconnect")
def ws_disconnect(channel, send_channel, **kwargs):
Group("chat").discard(send_channel)
With all that code in your ``consumers.py`` file, you now have a working
set of a logic for a chat server. All you need to do now is get it deployed,
and as we'll see, that's not too hard.
Running with Channels
---------------------
Because Channels takes Django into a multi-process model, you can no longer
just run one process if you want to serve more than one protocol type.
There are multiple kinds of "interface server", and each one will service a
different type of request - one might do WSGI requests, one might handle
WebSockets, or you might have one that handles both.
These are separate from the "worker servers" where Django will run actual logic,
though, and so you'll need to configure a channel backend to allow the
channels to run over the network. By default, when you're using Django out of
the box, the channel backend is set to an in-memory one that only works in
process; this is enough to serve normal WSGI style requests (``runserver`` is
just running a WSGI interface and a worker in two threads), but now we want
WebSocket support we'll need a separate process to keep things clean.
For simplicity, we'll configure the database backend - this uses two tables
in the database to do message handling, and isn't particularly fast but
requires no extra dependencies. Put this in your ``settings.py`` file::
CHANNEL_BACKENDS = {
"default": {
"BACKEND": "channels.backends.database.DatabaseChannelBackend",
},
}
As you can see, the format is quite similar to the ``DATABASES`` setting in
Django, but for this case much simpler, as it just uses the default database
(you can set which alias it uses with the ``DB_ALIAS`` key).
In production, we'd recommend you use something like the Redis channel backend;
you can :doc:`read about the backends <backends>` and see how to set them up
and their performance considerations if you wish.
The second thing, once we have a networked channel backend set up, is to make
sure we're running the WebSocket interface server. Even in development, we need
to do this; ``runserver`` will take care of normal Web requests and running
a worker for us, but WebSockets require an in-process async solution.
The easiest way to do this is to use the ``runwsserver`` management command
that ships with Django; just make sure you've installed the latest release
of ``autobahn`` first::
pip install -U autobahn
python manage.py runwsserver
Run that alongside ``runserver`` and you'll have two interface servers, a
worker thread, and the channel backend all connected and running. You can
even launch separate worker processes with ``runworker`` if you like (you'll
need at least one of those if you're not also running ``runserver``).
Now, just open a browser and put the following into the JavaScript console
to test your new code::
socket = new WebSocket("ws://127.0.0.1:9000");
socket.onmessage = function(e) {
alert(e.data);
}
socket.send("hello world");
You should see an alert come back immediately saying "hello world" - your
message has round-tripped through the server and come back to trigger the alert.
You can open another tab and do the same there if you like, and both tabs will
receive the message and show an alert.
Feel free to put some calls to ``print`` in your handler functions too, if you
like, so you can understand when they're called. If you run three or four
copies of ``runworker``, too, you will probably be able to see the tasks running
on different workers.