From aa921b16598c74d2da7061abdc906aace4ab3719 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sun, 12 Jul 2015 20:19:10 -0500 Subject: [PATCH] Channel groups, tests and docs --- channels/backends/base.py | 2 +- channels/backends/database.py | 68 ++++++++++++++-- channels/backends/memory.py | 44 +++++++++- channels/backends/redis_py.py | 38 ++++++++- channels/channel.py | 28 ++++++- channels/tests/__init__.py | 0 channels/tests/test_backends.py | 91 +++++++++++++++++++++ docs/getting-started.rst | 138 +++++++++++++++++++++++++++++++- 8 files changed, 395 insertions(+), 14 deletions(-) create mode 100644 channels/tests/__init__.py create mode 100644 channels/tests/test_backends.py diff --git a/channels/backends/base.py b/channels/backends/base.py index e7dd139..7a99568 100644 --- a/channels/backends/base.py +++ b/channels/backends/base.py @@ -88,7 +88,7 @@ class BaseChannelBackend(object): This base class provides a default implementation; can be overridden to be more efficient by subclasses. """ - for channel in self.group_channels(): + for channel in self.group_channels(group): self.send(channel, message) def __str__(self): diff --git a/channels/backends/database.py b/channels/backends/database.py index bf8e842..ed59a9e 100644 --- a/channels/backends/database.py +++ b/channels/backends/database.py @@ -27,8 +27,8 @@ class DatabaseChannelBackend(BaseChannelBackend): """ return connections[self.db_alias] - @property - def model(self): + @cached_property + def channel_model(self): """ Initialises a new model to store messages; not done as part of a models.py as we don't want to make it for most installs. @@ -49,8 +49,30 @@ class DatabaseChannelBackend(BaseChannelBackend): editor.create_model(Message) return Message + @cached_property + def group_model(self): + """ + Initialises a new model to store groups; not done as part of a + models.py as we don't want to make it for most installs. + """ + # Make the model class + class Group(models.Model): + group = models.CharField(max_length=200) + channel = models.CharField(max_length=200) + expiry = models.DateTimeField(db_index=True) + class Meta: + apps = Apps() + app_label = "channels" + db_table = "django_channel_groups" + unique_together = [["group", "channel"]] + # Ensure its table exists + if Group._meta.db_table not in self.connection.introspection.table_names(self.connection.cursor()): + with self.connection.schema_editor() as editor: + editor.create_model(Group) + return Group + def send(self, channel, message): - self.model.objects.create( + self.channel_model.objects.create( channel = channel, content = json.dumps(message), expiry = now() + datetime.timedelta(seconds=self.expiry) @@ -59,15 +81,47 @@ class DatabaseChannelBackend(BaseChannelBackend): def receive_many(self, channels): if not channels: raise ValueError("Cannot receive on empty channel list!") - # Delete all expired messages (add 10 second grace period for clock sync) - self.model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete() + self._clean_expired() # Get a message from one of our channels - message = self.model.objects.filter(channel__in=channels).order_by("id").first() + message = self.channel_model.objects.filter(channel__in=channels).order_by("id").first() if message: - self.model.objects.filter(pk=message.pk).delete() + self.channel_model.objects.filter(pk=message.pk).delete() return message.channel, json.loads(message.content) else: return None, None + def _clean_expired(self): + """ + Cleans out expired groups and messages. + """ + # Include a 10-second grace period because that solves some clock sync + self.channel_model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete() + self.group_model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete() + + def group_add(self, group, channel, expiry=None): + """ + Adds the channel to the named group for at least 'expiry' + seconds (expiry defaults to message expiry if not provided). + """ + self.group_model.objects.update_or_create( + group = group, + channel = channel, + defaults = {"expiry": now() + datetime.timedelta(seconds=expiry or self.expiry)}, + ) + + def group_discard(self, group, channel): + """ + Removes the channel from the named group if it is in the group; + does nothing otherwise (does not error) + """ + self.group_model.objects.filter(group=group, channel=channel).delete() + + def group_channels(self, group): + """ + Returns an iterable of all channels in the group. + """ + self._clean_expired() + return list(self.group_model.objects.filter(group=group).values_list("channel", flat=True)) + def __str__(self): return "%s(alias=%s)" % (self.__class__.__name__, self.connection.alias) diff --git a/channels/backends/memory.py b/channels/backends/memory.py index 1d53d45..c398653 100644 --- a/channels/backends/memory.py +++ b/channels/backends/memory.py @@ -4,6 +4,7 @@ from collections import deque from .base import BaseChannelBackend queues = {} +groups = {} class InMemoryChannelBackend(BaseChannelBackend): """ @@ -17,18 +18,57 @@ class InMemoryChannelBackend(BaseChannelBackend): # Try JSON encoding it to make sure it would, but store the native version json.dumps(message) # Add to the deque, making it if needs be - queues.setdefault(channel, deque()).append(message) + queues.setdefault(channel, deque()).append((message, time.time() + self.expiry)) def receive_many(self, channels): if not channels: raise ValueError("Cannot receive on empty channel list!") # Try to pop a message from each channel + self._clean_expired() for channel in channels: try: # This doesn't clean up empty channels - OK for testing. # For later versions, have cleanup w/lock. - return channel, queues[channel].popleft() + return channel, queues[channel].popleft()[0] except (IndexError, KeyError): pass return None, None + def _clean_expired(self): + # Handle expired messages + for channel, messages in queues.items(): + while len(messages) and messages[0][1] < time.time(): + messages.popleft() + # Handle expired groups + for group, channels in list(groups.items()): + for channel, expiry in list(channels.items()): + if expiry < (time.time() - 10): + try: + del groups[group][channel] + except KeyError: + # Another thread might have got there first + pass + + def group_add(self, group, channel, expiry=None): + """ + Adds the channel to the named group for at least 'expiry' + seconds (expiry defaults to message expiry if not provided). + """ + groups.setdefault(group, {})[channel] = time.time() + (expiry or self.expiry) + + def group_discard(self, group, channel): + """ + Removes the channel from the named group if it is in the group; + does nothing otherwise (does not error) + """ + try: + del groups[group][channel] + except KeyError: + pass + + def group_channels(self, group): + """ + Returns an iterable of all channels in the group. + """ + self._clean_expired() + return groups.get(group, {}).keys() diff --git a/channels/backends/redis_py.py b/channels/backends/redis_py.py index 602e749..0557fc1 100644 --- a/channels/backends/redis_py.py +++ b/channels/backends/redis_py.py @@ -28,7 +28,7 @@ class RedisChannelBackend(BaseChannelBackend): def send(self, channel, message): # Write out message into expiring key (avoids big items in list) - key = uuid.uuid4() + key = self.prefix + uuid.uuid4().get_hex() self.connection.set( key, json.dumps(message), @@ -63,5 +63,41 @@ class RedisChannelBackend(BaseChannelBackend): else: return None, None + def group_add(self, group, channel, expiry=None): + """ + Adds the channel to the named group for at least 'expiry' + seconds (expiry defaults to message expiry if not provided). + """ + key = "%s:group:%s" % (self.prefix, group) + self.connection.zadd( + key, + **{channel: time.time() + (expiry or self.expiry)} + ) + + def group_discard(self, group, channel): + """ + Removes the channel from the named group if it is in the group; + does nothing otherwise (does not error) + """ + key = "%s:group:%s" % (self.prefix, group) + self.connection.zrem( + key, + channel, + ) + + def group_channels(self, group): + """ + Returns an iterable of all channels in the group. + """ + key = "%s:group:%s" % (self.prefix, group) + # Discard old channels + self.connection.zremrangebyscore(key, 0, int(time.time()) - 10) + # Return current lot + return self.connection.zrange( + key, + 0, + -1, + ) + def __str__(self): return "%s(host=%s, port=%s)" % (self.__class__.__name__, self.host, self.port) diff --git a/channels/channel.py b/channels/channel.py index 9572525..ac24c4b 100644 --- a/channels/channel.py +++ b/channels/channel.py @@ -54,7 +54,7 @@ class Channel(object): return view_producer(self.name) @classmethod - def consumer(self, channels, alias=DEFAULT_CHANNEL_BACKEND): + def consumer(self, *channels, alias=DEFAULT_CHANNEL_BACKEND): """ Decorator that registers a function as a consumer. """ @@ -68,3 +68,29 @@ class Channel(object): channel_backend.registry.add_consumer(func, channels) return func return inner + + +class Group(object): + """ + A group of channels that can be messaged at once, and that expire out + of the group after an expiry time (keep re-adding to keep them in). + """ + + def __init__(self, alias=DEFAULT_CHANNEL_BACKEND, channel_backend=None): + self.name = name + if channel_backend: + self.channel_backend = channel_backend + else: + self.channel_backend = channel_backends[alias] + + def add(self, channel): + self.channel_backend.add(self.name, channel) + + def discard(self, channel): + self.channel_backend.discard(self.name, channel) + + def channels(self): + self.channel_backend.channels(self.name) + + def send(self, **kwargs): + self.channel_backend.send_group(self, self.name, kwargs) diff --git a/channels/tests/__init__.py b/channels/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/channels/tests/test_backends.py b/channels/tests/test_backends.py new file mode 100644 index 0000000..1846bdc --- /dev/null +++ b/channels/tests/test_backends.py @@ -0,0 +1,91 @@ +from django.test import TestCase +from ..channel import Channel +from ..backends.database import DatabaseChannelBackend +from ..backends.redis_py import RedisChannelBackend +from ..backends.memory import InMemoryChannelBackend + + +class MemoryBackendTests(TestCase): + + backend_class = InMemoryChannelBackend + + def setUp(self): + self.backend = self.backend_class() + + def test_send_recv(self): + """ + Tests that channels can send and receive messages. + """ + self.backend.send("test", {"value": "blue"}) + self.backend.send("test", {"value": "green"}) + self.backend.send("test2", {"value": "red"}) + # Get just one first + channel, message = self.backend.receive_many(["test"]) + self.assertEqual(channel, "test") + self.assertEqual(message, {"value": "blue"}) + # And the second + channel, message = self.backend.receive_many(["test"]) + self.assertEqual(channel, "test") + self.assertEqual(message, {"value": "green"}) + # And the other channel with multi select + channel, message = self.backend.receive_many(["test", "test2"]) + self.assertEqual(channel, "test2") + self.assertEqual(message, {"value": "red"}) + + def test_message_expiry(self): + self.backend = self.backend_class(expiry=-100) + self.backend.send("test", {"value": "blue"}) + channel, message = self.backend.receive_many(["test"]) + self.assertIs(channel, None) + self.assertIs(message, None) + + def test_groups(self): + """ + Tests that group addition and removal and listing works + """ + self.backend.group_add("tgroup", "test") + self.backend.group_add("tgroup", "test2") + self.backend.group_add("tgroup2", "test3") + self.assertEqual( + set(self.backend.group_channels("tgroup")), + {"test", "test2"}, + ) + self.backend.group_discard("tgroup", "test2") + self.backend.group_discard("tgroup", "test2") + self.assertEqual( + self.backend.group_channels("tgroup"), + ["test"], + ) + + def test_group_send(self): + """ + Tests sending to groups. + """ + self.backend.group_add("tgroup", "test") + self.backend.group_add("tgroup", "test2") + self.backend.send_group("tgroup", {"value": "orange"}) + channel, message = self.backend.receive_many(["test"]) + self.assertEqual(channel, "test") + self.assertEqual(message, {"value": "orange"}) + channel, message = self.backend.receive_many(["test2"]) + self.assertEqual(channel, "test2") + self.assertEqual(message, {"value": "orange"}) + + def test_group_expiry(self): + self.backend = self.backend_class(expiry=-100) + self.backend.group_add("tgroup", "test") + self.backend.group_add("tgroup", "test2") + self.assertEqual( + self.backend.group_channels("tgroup"), + [], + ) + + +class RedisBackendTests(MemoryBackendTests): + + backend_class = RedisChannelBackend + + +class DatabaseBackendTests(MemoryBackendTests): + + backend_class = DatabaseChannelBackend diff --git a/docs/getting-started.rst b/docs/getting-started.rst index 67054cc..9313844 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -41,9 +41,143 @@ you get the Hello World response, so things are working. If you don't see a response, check you :doc:`installed Channels correctly `. Now, that's not very exciting - raw HTTP responses are something Django can -do any time. Let's try some WebSockets! +do any time. Let's try some WebSockets, and make a basic chat server! Delete that consumer from above - we'll need the normal Django view layer to serve templates later - and make this WebSocket consumer instead:: - # todo + @Channel.consumer("django.websocket.connect") + def ws_connect(channel, send_channel, **kwargs): + Group("chat").add(send_channel) + +Now, let's look at what this is doing. It's tied to the +``django.websocket.connect`` channel, which means that it'll get a message +whenever a new WebSocket connection is opened by a client. + +When it gets that message, it takes the ``send_channel`` key from it, which +is the unique response channel for that client, and adds it to the ``chat`` +group, which means we can send messages to all connected chat clients. + +Of course, if you've read through :doc:`concepts`, you'll know that channels +added to groups expire out after a while unless you keep renewing their +membership. This is because Channels is stateless; the worker processes +don't keep track of the open/close states of the potentially thousands of +connections you have open at any one time. + +The solution to this is that the WebSocket interface servers will send +periodic "keepalive" messages on the ``django.websocket.keepalive`` channel, +so we can hook that up to re-add the channel (it's safe to add the channel to +a group it's already in - similarly, it's safe to discard a channel from a +group it's not in):: + + @Channel.consumer("django.websocket.keepalive") + def ws_keepalive(channel, send_channel, **kwargs): + Group("chat").add(send_channel) + +Of course, this is exactly the same code as the ``connect`` handler, so let's +just combine them:: + + @Channel.consumer("django.websocket.connect", "django.websocket.keepalive") + def ws_add(channel, send_channel, **kwargs): + Group("chat").add(send_channel) + +And, even though channels will expire out, let's add an explicit ``disconnect`` +handler to clean up as people disconnect (most channels will cleanly disconnect +and get this called):: + + @Channel.consumer("django.websocket.disconnect") + def ws_disconnect(channel, send_channel, **kwargs): + Group("chat").discard(send_channel) + +Now, that's taken care of adding and removing WebSocket send channels for the +``chat`` group; all we need to do now is take care of message sending. For now, +we're not going to store a history of messages or anything and just replay +any message sent in to all connected clients. Here's all the code:: + + @Channel.consumer("django.websocket.connect", "django.websocket.keepalive") + def ws_add(channel, send_channel, **kwargs): + Group("chat").add(send_channel) + + @Channel.consumer("django.websocket.receive") + def ws_message(channel, send_channel, content, **kwargs): + Group("chat").send(content=content) + + @Channel.consumer("django.websocket.disconnect") + def ws_disconnect(channel, send_channel, **kwargs): + Group("chat").discard(send_channel) + +With all that code in your ``consumers.py`` file, you now have a working +set of a logic for a chat server. All you need to do now is get it deployed, +and as we'll see, that's not too hard. + +Running with Channels +--------------------- + +Because Channels takes Django into a multi-process model, you can no longer +just run one process if you want to serve more than one protocol type. + +There are multiple kinds of "interface server", and each one will service a +different type of request - one might do WSGI requests, one might handle +WebSockets, or you might have one that handles both. + +These are separate from the "worker servers" where Django will run actual logic, +though, and so you'll need to configure a channel backend to allow the +channels to run over the network. By default, when you're using Django out of +the box, the channel backend is set to an in-memory one that only works in +process; this is enough to serve normal WSGI style requests (``runserver`` is +just running a WSGI interface and a worker in two threads), but now we want +WebSocket support we'll need a separate process to keep things clean. + +For simplicity, we'll configure the database backend - this uses two tables +in the database to do message handling, and isn't particularly fast but +requires no extra dependencies. Put this in your ``settings.py`` file:: + + CHANNEL_BACKENDS = { + "default": { + "BACKEND": "channels.backends.database.DatabaseChannelBackend", + }, + } + +As you can see, the format is quite similar to the ``DATABASES`` setting in +Django, but for this case much simpler, as it just uses the default database +(you can set which alias it uses with the ``DB_ALIAS`` key). + +In production, we'd recommend you use something like the Redis channel backend; +you can :doc:`read about the backends ` and see how to set them up +and their performance considerations if you wish. + +The second thing, once we have a networked channel backend set up, is to make +sure we're running the WebSocket interface server. Even in development, we need +to do this; ``runserver`` will take care of normal Web requests and running +a worker for us, but WebSockets require an in-process async solution. + +The easiest way to do this is to use the ``runwsserver`` management command +that ships with Django; just make sure you've installed the latest release +of ``autobahn`` first:: + + pip install -U autobahn + python manage.py runwsserver + +Run that alongside ``runserver`` and you'll have two interface servers, a +worker thread, and the channel backend all connected and running. You can +even launch separate worker processes with ``runworker`` if you like (you'll +need at least one of those if you're not also running ``runserver``). + +Now, just open a browser and put the following into the JavaScript console +to test your new code:: + + socket = new WebSocket("ws://127.0.0.1:9000"); + socket.onmessage = function(e) { + alert(e.data); + } + socket.send("hello world"); + +You should see an alert come back immediately saying "hello world" - your +message has round-tripped through the server and come back to trigger the alert. +You can open another tab and do the same there if you like, and both tabs will +receive the message and show an alert. + +Feel free to put some calls to ``print`` in your handler functions too, if you +like, so you can understand when they're called. If you run three or four +copies of ``runworker``, too, you will probably be able to see the tasks running +on different workers.