From 2839a4265e2d786826dbb76512dfcb6d8fd0aa55 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Wed, 9 Sep 2015 22:07:22 -0500 Subject: [PATCH 01/17] Fix tests. --- channels/tests/test_backends.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/channels/tests/test_backends.py b/channels/tests/test_backends.py index 1846bdc..a626d2b 100644 --- a/channels/tests/test_backends.py +++ b/channels/tests/test_backends.py @@ -10,7 +10,7 @@ class MemoryBackendTests(TestCase): backend_class = InMemoryChannelBackend def setUp(self): - self.backend = self.backend_class() + self.backend = self.backend_class(routing={}) def test_send_recv(self): """ @@ -33,7 +33,7 @@ class MemoryBackendTests(TestCase): self.assertEqual(message, {"value": "red"}) def test_message_expiry(self): - self.backend = self.backend_class(expiry=-100) + self.backend = self.backend_class(routing={}, expiry=-100) self.backend.send("test", {"value": "blue"}) channel, message = self.backend.receive_many(["test"]) self.assertIs(channel, None) @@ -72,7 +72,7 @@ class MemoryBackendTests(TestCase): self.assertEqual(message, {"value": "orange"}) def test_group_expiry(self): - self.backend = self.backend_class(expiry=-100) + self.backend = self.backend_class(routing={}, expiry=-100) self.backend.group_add("tgroup", "test") self.backend.group_add("tgroup", "test2") self.assertEqual( From e042da5bc14afc41671ee89bfcf568a7c3f41e24 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Wed, 9 Sep 2015 22:07:30 -0500 Subject: [PATCH 02/17] Add a bit of in-memory docs --- docs/backends.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/backends.rst b/docs/backends.rst index be9faac..e79dda9 100644 --- a/docs/backends.rst +++ b/docs/backends.rst @@ -8,6 +8,13 @@ you wish; the API is very simple and documented below. In-memory --------- +The in-memory backend is the simplest, and not really a backend as such; +it exists purely to enable Django to run in a "normal" mode where no Channels +functionality is available, just normal HTTP request processing. You should +never need to set it explicitly. + +This backend provides no network transparency or non-blocking guarantees. + Database -------- From eed6e5e607d36851ab3d5fbbf09994495abb8f00 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Wed, 9 Sep 2015 22:07:52 -0500 Subject: [PATCH 03/17] Don't waffle about ordering guarantees --- docs/message-standards.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/message-standards.rst b/docs/message-standards.rst index 722e218..8924048 100644 --- a/docs/message-standards.rst +++ b/docs/message-standards.rst @@ -11,8 +11,7 @@ In addition to the standards outlined below, each message may contain a separate connection and data receiving messages (like WebSockets) will only contain the connection and detailed client information in the first message; use the ``@channel_session`` decorator to persist this data to consumers of -the received data (the decorator will take care of handling persistence and -ordering guarantees on messages). +the received data (or something else based on ``reply_channel``). All messages must be able to be encoded as JSON; channel backends don't necessarily have to use JSON, but we consider it the lowest common denominator From 638bf260f823e333dd281f921aa8fb5f6d73e4ec Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Sep 2015 11:52:49 -0500 Subject: [PATCH 04/17] Fixed #6: Linearise decorator and better user session stuff --- channels/auth.py | 66 +++++++++++ channels/backends/base.py | 13 ++ channels/backends/database.py | 45 ++++++- channels/backends/memory.py | 20 ++++ channels/backends/redis_py.py | 15 +++ channels/decorators.py | 144 ++++++++++++----------- channels/interfaces/websocket_twisted.py | 2 +- channels/message.py | 7 ++ channels/worker.py | 4 + 9 files changed, 243 insertions(+), 73 deletions(-) create mode 100644 channels/auth.py diff --git a/channels/auth.py b/channels/auth.py new file mode 100644 index 0000000..261010c --- /dev/null +++ b/channels/auth.py @@ -0,0 +1,66 @@ +import functools + +from django.contrib import auth +from .decorators import channel_session, http_session + + +def transfer_user(from_session, to_session): + """ + Transfers user from HTTP session to channel session. + """ + to_session[auth.BACKEND_SESSION_KEY] = from_session[auth.BACKEND_SESSION_KEY] + to_session[auth.SESSION_KEY] = from_session[auth.SESSION_KEY] + to_session[auth.HASH_SESSION_KEY] = from_session[auth.HASH_SESSION_KEY] + + +def channel_session_user(func): + """ + Presents a message.user attribute obtained from a user ID in the channel + session, rather than in the http_session. Turns on channel session implicitly. + """ + @channel_session + @functools.wraps(func) + def inner(message, *args, **kwargs): + # If we didn't get a session, then we don't get a user + if not hasattr(message, "channel_session"): + raise ValueError("Did not see a channel session to get auth from") + if message.channel_session is None: + message.user = None + # Otherwise, be a bit naughty and make a fake Request with just + # a "session" attribute (later on, perhaps refactor contrib.auth to + # pass around session rather than request) + else: + fake_request = type("FakeRequest", (object, ), {"session": message.channel_session}) + message.user = auth.get_user(fake_request) + # Run the consumer + return func(message, *args, **kwargs) + return inner + + +def http_session_user(func): + """ + Wraps a HTTP or WebSocket consumer (or any consumer of messages + that provides a "COOKIES" attribute) to provide both a "session" + attribute and a "user" attibute, like AuthMiddleware does. + + This runs http_session() to get a session to hook auth off of. + If the user does not have a session cookie set, both "session" + and "user" will be None. + """ + @http_session + @functools.wraps(func) + def inner(message, *args, **kwargs): + # If we didn't get a session, then we don't get a user + if not hasattr(message, "http_session"): + raise ValueError("Did not see a http session to get auth from") + if message.http_session is None: + message.user = None + # Otherwise, be a bit naughty and make a fake Request with just + # a "session" attribute (later on, perhaps refactor contrib.auth to + # pass around session rather than request) + else: + fake_request = type("FakeRequest", (object, ), {"session": message.http_session}) + message.user = auth.get_user(fake_request) + # Run the consumer + return func(message, *args, **kwargs) + return inner diff --git a/channels/backends/base.py b/channels/backends/base.py index 84e874e..9baaf6c 100644 --- a/channels/backends/base.py +++ b/channels/backends/base.py @@ -93,3 +93,16 @@ class BaseChannelBackend(object): def __str__(self): return self.__class__.__name__ + + def lock_channel(self, channel): + """ + Attempts to get a lock on the named channel. Returns True if lock + obtained, False if lock not obtained. + """ + raise NotImplementedError() + + def unlock_channel(self, channel): + """ + Unlocks the named channel. Always succeeds. + """ + raise NotImplementedError() diff --git a/channels/backends/database.py b/channels/backends/database.py index 504f3ad..fdc866a 100644 --- a/channels/backends/database.py +++ b/channels/backends/database.py @@ -3,7 +3,7 @@ import json import datetime from django.apps.registry import Apps -from django.db import models, connections, DEFAULT_DB_ALIAS +from django.db import models, connections, DEFAULT_DB_ALIAS, IntegrityError from django.utils.functional import cached_property from django.utils.timezone import now @@ -71,6 +71,26 @@ class DatabaseChannelBackend(BaseChannelBackend): editor.create_model(Group) return Group + @cached_property + def lock_model(self): + """ + Initialises a new model to store groups; not done as part of a + models.py as we don't want to make it for most installs. + """ + # Make the model class + class Lock(models.Model): + channel = models.CharField(max_length=200, unique=True) + expiry = models.DateTimeField(db_index=True) + class Meta: + apps = Apps() + app_label = "channels" + db_table = "django_channel_locks" + # Ensure its table exists + if Lock._meta.db_table not in self.connection.introspection.table_names(self.connection.cursor()): + with self.connection.schema_editor() as editor: + editor.create_model(Lock) + return Lock + def send(self, channel, message): self.channel_model.objects.create( channel = channel, @@ -97,6 +117,7 @@ class DatabaseChannelBackend(BaseChannelBackend): # Include a 10-second grace period because that solves some clock sync self.channel_model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete() self.group_model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete() + self.lock_model.objects.filter(expiry__lt=now() - datetime.timedelta(seconds=10)).delete() def group_add(self, group, channel, expiry=None): """ @@ -123,5 +144,27 @@ class DatabaseChannelBackend(BaseChannelBackend): self._clean_expired() return list(self.group_model.objects.filter(group=group).values_list("channel", flat=True)) + def lock_channel(self, channel, expiry=None): + """ + Attempts to get a lock on the named channel. Returns True if lock + obtained, False if lock not obtained. + """ + # We rely on the UNIQUE constraint for only-one-thread-wins on locks + try: + self.lock_model.objects.create( + channel = channel, + expiry = now() + datetime.timedelta(seconds=expiry or self.expiry), + ) + except IntegrityError: + return False + else: + return True + + def unlock_channel(self, channel): + """ + Unlocks the named channel. Always succeeds. + """ + self.lock_model.objects.filter(channel=channel).delete() + def __str__(self): return "%s(alias=%s)" % (self.__class__.__name__, self.connection.alias) diff --git a/channels/backends/memory.py b/channels/backends/memory.py index c398653..1d78363 100644 --- a/channels/backends/memory.py +++ b/channels/backends/memory.py @@ -5,6 +5,7 @@ from .base import BaseChannelBackend queues = {} groups = {} +locks = set() class InMemoryChannelBackend(BaseChannelBackend): """ @@ -72,3 +73,22 @@ class InMemoryChannelBackend(BaseChannelBackend): """ self._clean_expired() return groups.get(group, {}).keys() + + def lock_channel(self, channel): + """ + Attempts to get a lock on the named channel. Returns True if lock + obtained, False if lock not obtained. + """ + # Probably not perfect for race conditions, but close enough considering + # it shouldn't be used. + if channel not in locks: + locks.add(channel) + return True + else: + return False + + def unlock_channel(self, channel): + """ + Unlocks the named channel. Always succeeds. + """ + locks.discard(channel) diff --git a/channels/backends/redis_py.py b/channels/backends/redis_py.py index 5c8671c..6af662f 100644 --- a/channels/backends/redis_py.py +++ b/channels/backends/redis_py.py @@ -101,5 +101,20 @@ class RedisChannelBackend(BaseChannelBackend): # TODO: send_group efficient implementation using Lua + def lock_channel(self, channel, expiry=None): + """ + Attempts to get a lock on the named channel. Returns True if lock + obtained, False if lock not obtained. + """ + key = "%s:lock:%s" % (self.prefix, channel) + return bool(self.connection.setnx(key, "1")) + + def unlock_channel(self, channel): + """ + Unlocks the named channel. Always succeeds. + """ + key = "%s:lock:%s" % (self.prefix, channel) + self.connection.delete(key) + def __str__(self): return "%s(host=%s, port=%s)" % (self.__class__.__name__, self.host, self.port) diff --git a/channels/decorators.py b/channels/decorators.py index 03a4ecc..280b493 100644 --- a/channels/decorators.py +++ b/channels/decorators.py @@ -3,16 +3,77 @@ import hashlib from importlib import import_module from django.conf import settings -from django.utils import six -from django.contrib import auth -from channels import channel_backends, DEFAULT_CHANNEL_BACKEND + +def linearize(func): + """ + Makes sure the contained consumer does not run at the same time other + consumers are running on messages with the same reply_channel. + + Required if you don't want weird things like a second consumer starting + up before the first has exited and saved its session. Doesn't guarantee + ordering, just linearity. + """ + @functools.wraps(func) + def inner(message, *args, **kwargs): + # Make sure there's a reply channel + if not message.reply_channel: + raise ValueError("No reply_channel sent to consumer; @no_overlap can only be used on messages containing it.") + # Get the lock, or re-queue + locked = message.channel_backend.lock_channel(message.reply_channel) + if not locked: + raise message.Requeue() + # OK, keep going + try: + return func(message, *args, **kwargs) + finally: + message.channel_backend.unlock_channel(message.reply_channel) + return inner + + +def channel_session(func): + """ + Provides a session-like object called "channel_session" to consumers + as a message attribute that will auto-persist across consumers with + the same incoming "reply_channel" value. + + Use this to persist data across the lifetime of a connection. + """ + @functools.wraps(func) + def inner(message, *args, **kwargs): + # Make sure there's a reply_channel + if not message.reply_channel: + raise ValueError("No reply_channel sent to consumer; @channel_session can only be used on messages containing it.") + # Make sure there's NOT a channel_session already + if hasattr(message, "channel_session"): + raise ValueError("channel_session decorator wrapped inside another channel_session decorator") + # Turn the reply_channel into a valid session key length thing. + # We take the last 24 bytes verbatim, as these are the random section, + # and then hash the remaining ones onto the start, and add a prefix + reply_name = message.reply_channel.name + session_key = "skt" + hashlib.md5(reply_name[:-24]).hexdigest()[:8] + reply_name[-24:] + # Make a session storage + session_engine = import_module(settings.SESSION_ENGINE) + session = session_engine.SessionStore(session_key=session_key) + # If the session does not already exist, save to force our + # session key to be valid. + if not session.exists(session.session_key): + session.save(must_create=True) + message.channel_session = session + # Run the consumer + try: + return func(message, *args, **kwargs) + finally: + # Persist session if needed + if session.modified: + session.save() + return inner def http_session(func): """ - Wraps a HTTP or WebSocket consumer (or any consumer of messages - that provides a "COOKIES" or "GET" attribute) to provide a "session" + Wraps a HTTP or WebSocket connect consumer (or any consumer of messages + that provides a "cooikies" or "get" attribute) to provide a "http_session" attribute that behaves like request.session; that is, it's hung off of a per-user session key that is saved in a cookie or passed as the "session_key" GET parameter. @@ -21,13 +82,16 @@ def http_session(func): don't have one - that's what SessionMiddleware is for, this is a simpler read-only version for more low-level code. - If a user does not have a session we can inflate, the "session" attribute will - be None, rather than an empty session you can write to. + If a message does not have a session we can inflate, the "session" attribute + will be None, rather than an empty session you can write to. """ @functools.wraps(func) def inner(message, *args, **kwargs): if "cookies" not in message.content and "get" not in message.content: - raise ValueError("No cookies or get sent to consumer; this decorator can only be used on messages containing at least one.") + raise ValueError("No cookies or get sent to consumer - cannot initialise http_session") + # Make sure there's NOT a http_session already + if hasattr(message, "http_session"): + raise ValueError("http_session decorator wrapped inside another http_session decorator") # Make sure there's a session key session_key = None if "get" in message.content: @@ -43,7 +107,7 @@ def http_session(func): session = session_engine.SessionStore(session_key=session_key) else: session = None - message.session = session + message.http_session = session # Run the consumer result = func(message, *args, **kwargs) # Persist session if needed (won't be saved if error happens) @@ -51,65 +115,3 @@ def http_session(func): session.save() return result return inner - - -def http_django_auth(func): - """ - Wraps a HTTP or WebSocket consumer (or any consumer of messages - that provides a "COOKIES" attribute) to provide both a "session" - attribute and a "user" attibute, like AuthMiddleware does. - - This runs http_session() to get a session to hook auth off of. - If the user does not have a session cookie set, both "session" - and "user" will be None. - """ - @http_session - @functools.wraps(func) - def inner(message, *args, **kwargs): - # If we didn't get a session, then we don't get a user - if not hasattr(message, "session"): - raise ValueError("Did not see a session to get auth from") - if message.session is None: - message.user = None - # Otherwise, be a bit naughty and make a fake Request with just - # a "session" attribute (later on, perhaps refactor contrib.auth to - # pass around session rather than request) - else: - fake_request = type("FakeRequest", (object, ), {"session": message.session}) - message.user = auth.get_user(fake_request) - # Run the consumer - return func(message, *args, **kwargs) - return inner - - -def channel_session(func): - """ - Provides a session-like object called "channel_session" to consumers - as a message attribute that will auto-persist across consumers with - the same incoming "reply_channel" value. - """ - @functools.wraps(func) - def inner(message, *args, **kwargs): - # Make sure there's a reply_channel in kwargs - if not message.reply_channel: - raise ValueError("No reply_channel sent to consumer; this decorator can only be used on messages containing it.") - # Turn the reply_channel into a valid session key length thing. - # We take the last 24 bytes verbatim, as these are the random section, - # and then hash the remaining ones onto the start, and add a prefix - # TODO: See if there's a better way of doing this - reply_name = message.reply_channel.name - session_key = "skt" + hashlib.md5(reply_name[:-24]).hexdigest()[:8] + reply_name[-24:] - # Make a session storage - session_engine = import_module(settings.SESSION_ENGINE) - session = session_engine.SessionStore(session_key=session_key) - # If the session does not already exist, save to force our session key to be valid - if not session.exists(session.session_key): - session.save() - message.channel_session = session - # Run the consumer - result = func(message, *args, **kwargs) - # Persist session if needed (won't be saved if error happens) - if session.modified: - session.save() - return result - return inner diff --git a/channels/interfaces/websocket_twisted.py b/channels/interfaces/websocket_twisted.py index 478d03b..ff5e947 100644 --- a/channels/interfaces/websocket_twisted.py +++ b/channels/interfaces/websocket_twisted.py @@ -18,7 +18,7 @@ class InterfaceProtocol(WebSocketServerProtocol): self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] self.request_info = { "path": request.path, - "GET": request.params, + "get": request.params, } def onOpen(self): diff --git a/channels/message.py b/channels/message.py index bc7eda5..c9a80bd 100644 --- a/channels/message.py +++ b/channels/message.py @@ -10,6 +10,13 @@ class Message(object): to use to reply to this message's end user, if that makes sense. """ + class Requeue(Exception): + """ + Raise this while processing a message to requeue it back onto the + channel. Useful if you're manually ensuring partial ordering, etc. + """ + pass + def __init__(self, content, channel, channel_backend, reply_channel=None): self.content = content self.channel = channel diff --git a/channels/worker.py b/channels/worker.py index db0a896..50de9bd 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -1,5 +1,6 @@ import traceback from .message import Message +from .utils import name_that_thing class Worker(object): @@ -31,5 +32,8 @@ class Worker(object): self.callback(channel, message) try: consumer(message) + except Message.Requeue: + self.channel_backend.send(channel, content) except: + print "Error processing message with consumer %s:" % name_that_thing(consumer) traceback.print_exc() From 15d29a0230d31aa981a11c63bba6ca9414421009 Mon Sep 17 00:00:00 2001 From: Faris Chebib Date: Thu, 10 Sep 2015 11:30:35 -0600 Subject: [PATCH 05/17] added external link to golang example --- docs/concepts.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/concepts.rst b/docs/concepts.rst index e91cc64..1f2de1e 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -46,7 +46,7 @@ The channels have capacity, so a load of producers can write lots of messages into a channel with no consumers and then a consumer can come along later and will start getting served those queued messages. -If you've used channels in Go, these are reasonably similar to those. The key +If you've used `channels in Go `_, these are reasonably similar to those. The key difference is that these channels are network-transparent; the implementations of channels we provide are all accessible across a network to consumers and producers running in different processes or on different machines. From 19130ebb05a2eea7fe4b9af942826fe58463af5c Mon Sep 17 00:00:00 2001 From: Faris Chebib Date: Thu, 10 Sep 2015 11:54:51 -0600 Subject: [PATCH 06/17] updated concepts and added six to reqs --- docs/concepts.rst | 2 +- setup.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/concepts.rst b/docs/concepts.rst index 1f2de1e..cb5febb 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -161,7 +161,7 @@ and be less than 200 characters long. It's optional for a backend implementation to understand this - after all, it's only important at scale, where you want to shard the two types differently -- but it's present nonetheless. For more on scaling, and how to handle channel +— but it's present nonetheless. For more on scaling, and how to handle channel types if you're writing a backend or interface server, read :doc:`scaling`. Groups diff --git a/setup.py b/setup.py index a5e0547..845e3c1 100644 --- a/setup.py +++ b/setup.py @@ -10,4 +10,7 @@ setup( license='BSD', packages=find_packages(), include_package_data=True, + install_requires=[ + 'six', + ] ) From bd6f61de98597463ab7b198cf3e07b4cb575a90b Mon Sep 17 00:00:00 2001 From: Faris Chebib Date: Thu, 10 Sep 2015 12:00:47 -0600 Subject: [PATCH 07/17] updated getting started to include Group ref --- docs/getting-started.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/getting-started.rst b/docs/getting-started.rst index 77ce6b0..6ed982e 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -123,6 +123,8 @@ And, even though channels will expire out, let's add an explicit ``disconnect`` handler to clean up as people disconnect (most channels will cleanly disconnect and get this called):: + from channels import Group + # Connected to websocket.disconnect def ws_disconnect(message): Group("chat").discard(message.reply_channel) From 275bfdf1e50954eef6d3349b1ecf6880c278d5b8 Mon Sep 17 00:00:00 2001 From: Faris Chebib Date: Thu, 10 Sep 2015 12:04:01 -0600 Subject: [PATCH 08/17] Group ref on keepalive --- docs/getting-started.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/getting-started.rst b/docs/getting-started.rst index 6ed982e..5d210d9 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -105,6 +105,8 @@ so we can hook that up to re-add the channel (it's safe to add the channel to a group it's already in - similarly, it's safe to discard a channel from a group it's not in):: + from channels import Group + # Connected to websocket.keepalive def ws_keepalive(message): Group("chat").add(message.reply_channel) From d4de42d3b26212f25209c302e0679df860292316 Mon Sep 17 00:00:00 2001 From: Faris Chebib Date: Thu, 10 Sep 2015 12:04:52 -0600 Subject: [PATCH 09/17] Group ref on keepalive --- docs/getting-started.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/getting-started.rst b/docs/getting-started.rst index 5d210d9..3d4c112 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -71,6 +71,8 @@ do any time. Let's try some WebSockets, and make a basic chat server! Delete that consumer and its routing - we'll want the normal Django view layer to serve HTTP requests from now on - and make this WebSocket consumer instead:: + from channels import Group + def ws_add(message): Group("chat").add(message.reply_channel) From 27f54ad23b53aad4331e4fc1efb4aa3614936528 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Sep 2015 13:08:09 -0500 Subject: [PATCH 10/17] Rework getting started --- docs/getting-started.rst | 205 ++++++++++++++++++++++----------------- 1 file changed, 115 insertions(+), 90 deletions(-) diff --git a/docs/getting-started.rst b/docs/getting-started.rst index 77ce6b0..bedf07a 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -229,105 +229,31 @@ like, so you can understand when they're called. If you run three or four copies of ``runworker`` you'll probably be able to see the tasks running on different workers. -Authentication --------------- - -Now, of course, a WebSocket solution is somewhat limited in scope without the -ability to live with the rest of your website - in particular, we want to make -sure we know what user we're talking to, in case we have things like private -chat channels (we don't want a solution where clients just ask for the right -channels, as anyone could change the code and just put in private channel names) - -It can also save you having to manually make clients ask for what they want to -see; if I see you open a WebSocket to my "updates" endpoint, and I know which -user ID, I can just auto-add that channel to all the relevant groups (mentions -of that user, for example). - -Handily, as WebSockets start off using the HTTP protocol, they have a lot of -familiar features, including a path, GET parameters, and cookies. We'd like to -use these to hook into the familiar Django session and authentication systems; -after all, WebSockets are no good unless we can identify who they belong to -and do things securely. - -In addition, we don't want the interface servers storing data or trying to run -authentication; they're meant to be simple, lean, fast processes without much -state, and so we'll need to do our authentication inside our consumer functions. - -Fortunately, because Channels has standardised WebSocket event -:doc:`message-standards`, it ships with decorators that help you with -authentication, as well as using Django's session framework (which authentication -relies on). Channels can use Django sessions either from cookies (if you're running your websocket -server on the same port as your main site, which requires a reverse proxy that -understands WebSockets), or from a ``session_key`` GET parameter, which -is much more portable, and works in development where you need to run a separate -WebSocket server (by default, on port 9000). - -All we need to do is add the ``django_http_auth`` decorator to our views, -and we'll get extra ``session`` and ``user`` keyword attributes on ``message`` we can use; -let's make one where users can only chat to people with the same first letter -of their username:: - - from channels import Channel, Group - from channels.decorators import django_http_auth - - @django_http_auth - def ws_add(message): - Group("chat-%s" % message.user.username[0]).add(message.reply_channel) - - @django_http_auth - def ws_message(message): - Group("chat-%s" % message.user.username[0]).send(message.content) - - @django_http_auth - def ws_disconnect(message): - Group("chat-%s" % message.user.username[0]).discard(message.reply_channel) - -Now, when we connect to the WebSocket we'll have to remember to provide the -Django session ID as part of the URL, like this:: - - socket = new WebSocket("ws://127.0.0.1:9000/?session_key=abcdefg"); - -You can get the current session key in a template with ``{{ request.session.session_key }}``. -Note that Channels can't work with signed cookie sessions - since only HTTP -responses can set cookies, it needs a backend it can write to separately to -store state. - Persisting Data --------------- -Doing chatrooms by username first letter is a nice simple example, but it's +Echoing messages is a nice simple example, but it's skirting around the real design pattern - persistent state for connections. -A user may open our chat site and select the chatroom to join themselves, so we -should let them send this request in the initial WebSocket connection, -check they're allowed to access it, and then remember which room a socket is -connected to when they send a message in so we know which group to send it to. +Let's consider a basic chat site where a user requests a chat room upon initial +connection, as part of the query string (e.g. ``http://host/websocket?room=abc``). -The ``reply_channel`` is our unique pointer to the open WebSocket - as you've -seen, we do all our operations on it - but it's not something we can annotate -with data; it's just a simple string, and even if we hack around and set -attributes on it that's not going to carry over to other workers. +The ``reply_channel`` attribute you've seen before is our unique pointer to the +open WebSocket - because it varies between different clients, it's how we can +keep track of "who" a message is from. Remember, Channels is network-trasparent +and can run on multiple workers, so you can't just store things locally in +global variables or similar. -Instead, the solution is to persist information keyed by the send channel in +Instead, the solution is to persist information keyed by the ``reply_channel`` in some other data store - sound familiar? This is what Django's session framework does for HTTP requests, only there it uses cookies as the lookup key rather than the ``reply_channel``. -Now, as you saw above, you can use the ``django_http_auth`` decorator to get -both a ``user`` and a ``session`` attribute on your message - and, -indeed, there is a ``http_session`` decorator that will just give you -the ``session`` attribute. - -However, that session is based on cookies, and so follows the user round the -site - it's great for information that should persist across all WebSocket and -HTTP connections, but not great for information that is specific to a single -WebSocket (such as "which chatroom should this socket be connected to"). For -this reason, Channels also provides a ``channel_session`` decorator, -which adds a ``channel_session`` attribute to the message; this works just like -the normal ``session`` attribute, and persists to the same storage, but varies -per-channel rather than per-cookie. +Channels provides a ``channel_session`` decorator for this purpose - it +provides you with an attribute called ``message.channel_session`` that acts +just like a normal Django session. Let's use it now to build a chat server that expects you to pass a chatroom -name in the path of your WebSocket request (we'll ignore auth for now):: +name in the path of your WebSocket request (we'll ignore auth for now - that's next):: from channels import Channel from channels.decorators import channel_session @@ -358,9 +284,102 @@ name in the path of your WebSocket request (we'll ignore auth for now):: If you play around with it from the console (or start building a simple JavaScript chat client that appends received messages to a div), you'll see -that you can now request which chat room you want in the initial request. We -could easily add in the auth decorator here too and do an initial check in -``connect`` that the user had permission to join that chatroom. +that you can now request which chat room you want in the initial request. + +Authentication +-------------- + +Now, of course, a WebSocket solution is somewhat limited in scope without the +ability to live with the rest of your website - in particular, we want to make +sure we know what user we're talking to, in case we have things like private +chat channels (we don't want a solution where clients just ask for the right +channels, as anyone could change the code and just put in private channel names) + +It can also save you having to manually make clients ask for what they want to +see; if I see you open a WebSocket to my "updates" endpoint, and I know which +user you are, I can just auto-add that channel to all the relevant groups (mentions +of that user, for example). + +Handily, as WebSockets start off using the HTTP protocol, they have a lot of +familiar features, including a path, GET parameters, and cookies. We'd like to +use these to hook into the familiar Django session and authentication systems; +after all, WebSockets are no good unless we can identify who they belong to +and do things securely. + +In addition, we don't want the interface servers storing data or trying to run +authentication; they're meant to be simple, lean, fast processes without much +state, and so we'll need to do our authentication inside our consumer functions. + +Fortunately, because Channels has standardised WebSocket event +:doc:`message-standards`, it ships with decorators that help you with +both authentication and getting the underlying Django session (which is what +Django authentication relies on). + +Channels can use Django sessions either from cookies (if you're running your websocket +server on the same port as your main site, which requires a reverse proxy that +understands WebSockets), or from a ``session_key`` GET parameter, which +is much more portable, and works in development where you need to run a separate +WebSocket server (by default, on port 9000). + +You get access to a user's normal Django session using the ``http_session`` +decorator - that gives you a ``message.http_session`` attribute that behaves +just like ``request.session``. You can go one further and use ``http_session_user`` +which will provide a ``message.user`` attribute as well as the session attribute. + +Now, one thing to note is that you only get the detailed HTTP information +during the ``connect`` message of a WebSocket connection (you can read more +about what you get when in :doc:`message-standards`) - this means we're not +wasting bandwidth sending the same information over the wire needlessly. + +This also means we'll have to grab the user in the connection handler and then +store it in the session; thankfully, Channels ships with both a ``channel_session_user`` +decorator that works like the ``http_session_user`` decorator you saw above but +loads the user from the *channel* session rather than the *HTTP* session, +and a function called ``transfer_user`` which replicates a user from one session +to another. + +Bringing that all together, let's make a chat server one where users can only +chat to people with the same first letter of their username:: + + from channels import Channel, Group + from channels.decorators import channel_session + from channels.auth import http_session_user, channel_session_user, transfer_user + + # Connected to websocket.connect + @channel_session + @http_session_user + def ws_add(message): + # Copy user from HTTP to channel session + transfer_user(message.http_session, message.channel_session) + # Add them to the right group + Group("chat-%s" % message.user.username[0]).add(message.reply_channel) + + # Connected to websocket.keepalive + @channel_session_user + def ws_keepalive(message): + # Keep them in the right group + Group("chat-%s" % message.user.username[0]).add(message.reply_channel) + + # Connected to websocket.receive + @channel_session_user + def ws_message(message): + Group("chat-%s" % message.user.username[0]).send(message.content) + + # Connected to websocket.disconnect + @channel_session_user + def ws_disconnect(message): + Group("chat-%s" % message.user.username[0]).discard(message.reply_channel) + +Now, when we connect to the WebSocket we'll have to remember to provide the +Django session ID as part of the URL, like this:: + + socket = new WebSocket("ws://127.0.0.1:9000/?session_key=abcdefg"); + +You can get the current session key in a template with ``{{ request.session.session_key }}``. +Note that Channels can't work with signed cookie sessions - since only HTTP +responses can set cookies, it needs a backend it can write to separately to +store state. + Models ------ @@ -434,6 +453,12 @@ command run via ``cron``. If we wanted to write a bot, too, we could put its listening logic inside the ``chat-messages`` consumer, as every message would pass through it. +Linearization +------------- + +TODO + + Next Steps ---------- From d563f7748c08c0a05f4d176fcd34617f9af80176 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Sep 2015 13:20:58 -0500 Subject: [PATCH 11/17] Linearize docs --- docs/getting-started.rst | 62 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/docs/getting-started.rst b/docs/getting-started.rst index bedf07a..2287f24 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -456,7 +456,67 @@ pass through it. Linearization ------------- -TODO +There's one final concept we want to introduce you to before you go on to build +sites with Channels - linearizing consumers. + +Because Channels is a distributed system that can have many workers, by default +it's entirely feasible for a WebSocket interface server to send out a ``connect`` +and a ``receive`` message close enough together that a second worker will pick +up and start processing the ``receive`` message before the first worker has +finished processing the ``connect`` worker. + +This is particularly annoying if you're storing things in the session in the +``connect`` consumer and trying to get them in the ``receive`` consumer - because +the ``connect`` consumer hasn't exited, its session hasn't saved. You'd get the +same effect if someone tried to request a view before the login view had finished +processing, but there you're not expecting that page to run after the login, +whereas you'd naturally expect ``receive`` to run after ``connect``. + +But, of course, Channels has a solution - the ``linearize`` decorator. Any +handler decorated with this will use locking to ensure it does not run at the +same time as any other view with ``linearize`` **on messages with the same reply channel**. +That means your site will happily mutitask with lots of different people's messages, +but if two happen to try to run at the same time for the same client, they'll +be deconflicted. + +There's a small cost to using ``linearize``, which is why it's an optional +decorator, but generally you'll want to use it for most session-based WebSocket +and other "continuous protocol" things. Here's an example, improving our +first-letter-of-username chat from earlier:: + + from channels import Channel, Group + from channels.decorators import channel_session, linearize + from channels.auth import http_session_user, channel_session_user, transfer_user + + # Connected to websocket.connect + @linearize + @channel_session + @http_session_user + def ws_add(message): + # Copy user from HTTP to channel session + transfer_user(message.http_session, message.channel_session) + # Add them to the right group + Group("chat-%s" % message.user.username[0]).add(message.reply_channel) + + # Connected to websocket.keepalive + # We don't linearize as we know this will happen a decent time after add + @channel_session_user + def ws_keepalive(message): + # Keep them in the right group + Group("chat-%s" % message.user.username[0]).add(message.reply_channel) + + # Connected to websocket.receive + @linearize + @channel_session_user + def ws_message(message): + Group("chat-%s" % message.user.username[0]).send(message.content) + + # Connected to websocket.disconnect + # We don't linearize as even if this gets an empty session, the group + # will auto-discard after the expiry anyway. + @channel_session_user + def ws_disconnect(message): + Group("chat-%s" % message.user.username[0]).discard(message.reply_channel) Next Steps From 414a4ab460893d85f82371f3af62bbeb44578e28 Mon Sep 17 00:00:00 2001 From: Faris Chebib Date: Thu, 10 Sep 2015 12:26:08 -0600 Subject: [PATCH 12/17] updated print statements --- channels/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/channels/worker.py b/channels/worker.py index 50de9bd..b54dc44 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -35,5 +35,5 @@ class Worker(object): except Message.Requeue: self.channel_backend.send(channel, content) except: - print "Error processing message with consumer %s:" % name_that_thing(consumer) + print("Error processing message with consumer {}:".format(name_that_thing(consumer))) traceback.print_exc() From f69ad33747515eb224204eee4e142bf41f371514 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Sep 2015 14:41:38 -0500 Subject: [PATCH 13/17] Fixed #9: Add asyncio websocket server --- channels/interfaces/websocket_asyncio.py | 155 ++++++++++++++++++++ channels/management/commands/runwsserver.py | 19 ++- 2 files changed, 169 insertions(+), 5 deletions(-) create mode 100644 channels/interfaces/websocket_asyncio.py diff --git a/channels/interfaces/websocket_asyncio.py b/channels/interfaces/websocket_asyncio.py new file mode 100644 index 0000000..5dc3fe3 --- /dev/null +++ b/channels/interfaces/websocket_asyncio.py @@ -0,0 +1,155 @@ +import asyncio +import django +import time + +from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory +from collections import deque + +from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND + + +class InterfaceProtocol(WebSocketServerProtocol): + """ + Protocol which supports WebSockets and forwards incoming messages to + the websocket channels. + """ + + def onConnect(self, request): + self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] + self.request_info = { + "path": request.path, + "get": request.params, + } + + def onOpen(self): + # Make sending channel + self.reply_channel = Channel.new_name("!websocket.send") + self.request_info["reply_channel"] = self.reply_channel + self.last_keepalive = time.time() + self.factory.protocols[self.reply_channel] = self + # Send news that this channel is open + Channel("websocket.connect").send(self.request_info) + + def onMessage(self, payload, isBinary): + if isBinary: + Channel("websocket.receive").send(dict( + self.request_info, + content = payload, + binary = True, + )) + else: + Channel("websocket.receive").send(dict( + self.request_info, + content = payload.decode("utf8"), + binary = False, + )) + + def serverSend(self, content, binary=False, **kwargs): + """ + Server-side channel message to send a message. + """ + if binary: + self.sendMessage(content, binary) + else: + self.sendMessage(content.encode("utf8"), binary) + + def serverClose(self): + """ + Server-side channel message to close the socket + """ + self.sendClose() + + def onClose(self, wasClean, code, reason): + if hasattr(self, "reply_channel"): + del self.factory.protocols[self.reply_channel] + Channel("websocket.disconnect").send(self.request_info) + + def sendKeepalive(self): + """ + Sends a keepalive packet on the keepalive channel. + """ + Channel("websocket.keepalive").send(self.request_info) + self.last_keepalive = time.time() + + +class InterfaceFactory(WebSocketServerFactory): + """ + Factory which keeps track of its open protocols' receive channels + and can dispatch to them. + """ + + # TODO: Clean up dead protocols if needed? + + def __init__(self, *args, **kwargs): + super(InterfaceFactory, self).__init__(*args, **kwargs) + self.protocols = {} + + def reply_channels(self): + return self.protocols.keys() + + def dispatch_send(self, channel, message): + if message.get("close", False): + self.protocols[channel].serverClose() + else: + self.protocols[channel].serverSend(**message) + + +class WebsocketAsyncioInterface(object): + """ + Easy API to run a WebSocket interface server using Twisted. + Integrates the channel backend by running it in a separate thread, using + the always-compatible polling style. + """ + + def __init__(self, channel_backend, port=9000): + self.channel_backend = channel_backend + self.port = port + + def run(self): + self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False) + self.factory.protocol = InterfaceProtocol + self.loop = asyncio.get_event_loop() + coro = self.loop.create_server(self.factory, '0.0.0.0', 9000) + server = self.loop.run_until_complete(coro) + self.loop.run_in_executor(None, self.backend_reader) + self.loop.call_later(1, self.keepalive_sender) + try: + self.loop.run_forever() + except KeyboardInterrupt: + pass + finally: + server.close() + self.loop.close() + + def backend_reader(self): + """ + Run in a separate thread; reads messages from the backend. + """ + while True: + channels = self.factory.reply_channels() + # Quit if reactor is stopping + if not self.loop.is_running(): + return + # Don't do anything if there's no channels to listen on + if channels: + channel, message = self.channel_backend.receive_many(channels) + else: + time.sleep(0.1) + continue + # Wait around if there's nothing received + if channel is None: + time.sleep(0.05) + continue + # Deal with the message + self.factory.dispatch_send(channel, message) + + def keepalive_sender(self): + """ + Sends keepalive messages for open WebSockets every + (channel_backend expiry / 2) seconds. + """ + expiry_window = int(self.channel_backend.expiry / 2) + for protocol in self.factory.protocols.values(): + if time.time() - protocol.last_keepalive > expiry_window: + protocol.sendKeepalive() + self.loop.call_later(1, self.keepalive_sender) diff --git a/channels/management/commands/runwsserver.py b/channels/management/commands/runwsserver.py index 7b4d296..b1525d9 100644 --- a/channels/management/commands/runwsserver.py +++ b/channels/management/commands/runwsserver.py @@ -1,7 +1,6 @@ import time from django.core.management import BaseCommand, CommandError from channels import channel_backends, DEFAULT_CHANNEL_BACKEND -from channels.interfaces.websocket_twisted import WebsocketTwistedInterface class Command(BaseCommand): @@ -20,7 +19,17 @@ class Command(BaseCommand): ) # Run the interface port = options.get("port", None) or 9000 - self.stdout.write("Running Twisted/Autobahn WebSocket interface server") - self.stdout.write(" Channel backend: %s" % channel_backend) - self.stdout.write(" Listening on: ws://0.0.0.0:%i" % port) - WebsocketTwistedInterface(channel_backend=channel_backend, port=port).run() + try: + import asyncio + except ImportError: + from channels.interfaces.websocket_twisted import WebsocketTwistedInterface + self.stdout.write("Running Twisted/Autobahn WebSocket interface server") + self.stdout.write(" Channel backend: %s" % channel_backend) + self.stdout.write(" Listening on: ws://0.0.0.0:%i" % port) + WebsocketTwistedInterface(channel_backend=channel_backend, port=port).run() + else: + from channels.interfaces.websocket_asyncio import WebsocketAsyncioInterface + self.stdout.write("Running asyncio/Autobahn WebSocket interface server") + self.stdout.write(" Channel backend: %s" % channel_backend) + self.stdout.write(" Listening on: ws://0.0.0.0:%i" % port) + WebsocketAsyncioInterface(channel_backend=channel_backend, port=port).run() From 73d50a46952a8b13614e4cea6dc14ca63b6b4765 Mon Sep 17 00:00:00 2001 From: Faris Chebib Date: Thu, 10 Sep 2015 14:46:24 -0600 Subject: [PATCH 14/17] updated typo in docs example --- docs/getting-started.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/getting-started.rst b/docs/getting-started.rst index 5b1d88e..f2d4a36 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -138,7 +138,7 @@ Now, that's taken care of adding and removing WebSocket send channels for the we're not going to store a history of messages or anything and just replay any message sent in to all connected clients. Here's all the code:: - from channels import Channel, Group + from channels import Group # Connected to websocket.connect and websocket.keepalive def ws_add(message): @@ -261,7 +261,7 @@ just like a normal Django session. Let's use it now to build a chat server that expects you to pass a chatroom name in the path of your WebSocket request (we'll ignore auth for now - that's next):: - from channels import Channel + from channels import Group from channels.decorators import channel_session # Connected to websocket.connect @@ -281,7 +281,7 @@ name in the path of your WebSocket request (we'll ignore auth for now - that's n # Connected to websocket.receive @channel_session def ws_message(message): - Group("chat-%s" % message.channel_session['room']).send(content) + Group("chat-%s" % message.channel_session['room']).send(message.content) # Connected to websocket.disconnect @channel_session From bd1553556d38f61d88d8cbbbde0bc50f4afc45b8 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Sep 2015 16:00:31 -0500 Subject: [PATCH 15/17] Refactor websocket servers a bit to share logic --- channels/interfaces/websocket_asyncio.py | 96 ++------------------ channels/interfaces/websocket_autobahn.py | 101 ++++++++++++++++++++++ channels/interfaces/websocket_twisted.py | 94 +------------------- 3 files changed, 109 insertions(+), 182 deletions(-) create mode 100644 channels/interfaces/websocket_autobahn.py diff --git a/channels/interfaces/websocket_asyncio.py b/channels/interfaces/websocket_asyncio.py index 5dc3fe3..55c9ea3 100644 --- a/channels/interfaces/websocket_asyncio.py +++ b/channels/interfaces/websocket_asyncio.py @@ -1,97 +1,9 @@ import asyncio -import django import time from autobahn.asyncio.websocket import WebSocketServerProtocol, WebSocketServerFactory -from collections import deque -from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND - - -class InterfaceProtocol(WebSocketServerProtocol): - """ - Protocol which supports WebSockets and forwards incoming messages to - the websocket channels. - """ - - def onConnect(self, request): - self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] - self.request_info = { - "path": request.path, - "get": request.params, - } - - def onOpen(self): - # Make sending channel - self.reply_channel = Channel.new_name("!websocket.send") - self.request_info["reply_channel"] = self.reply_channel - self.last_keepalive = time.time() - self.factory.protocols[self.reply_channel] = self - # Send news that this channel is open - Channel("websocket.connect").send(self.request_info) - - def onMessage(self, payload, isBinary): - if isBinary: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload, - binary = True, - )) - else: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload.decode("utf8"), - binary = False, - )) - - def serverSend(self, content, binary=False, **kwargs): - """ - Server-side channel message to send a message. - """ - if binary: - self.sendMessage(content, binary) - else: - self.sendMessage(content.encode("utf8"), binary) - - def serverClose(self): - """ - Server-side channel message to close the socket - """ - self.sendClose() - - def onClose(self, wasClean, code, reason): - if hasattr(self, "reply_channel"): - del self.factory.protocols[self.reply_channel] - Channel("websocket.disconnect").send(self.request_info) - - def sendKeepalive(self): - """ - Sends a keepalive packet on the keepalive channel. - """ - Channel("websocket.keepalive").send(self.request_info) - self.last_keepalive = time.time() - - -class InterfaceFactory(WebSocketServerFactory): - """ - Factory which keeps track of its open protocols' receive channels - and can dispatch to them. - """ - - # TODO: Clean up dead protocols if needed? - - def __init__(self, *args, **kwargs): - super(InterfaceFactory, self).__init__(*args, **kwargs) - self.protocols = {} - - def reply_channels(self): - return self.protocols.keys() - - def dispatch_send(self, channel, message): - if message.get("close", False): - self.protocols[channel].serverClose() - else: - self.protocols[channel].serverSend(**message) +from .websocket_autobahn import get_protocol, get_factory class WebsocketAsyncioInterface(object): @@ -106,8 +18,8 @@ class WebsocketAsyncioInterface(object): self.port = port def run(self): - self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False) - self.factory.protocol = InterfaceProtocol + self.factory = get_factory(WebSocketServerFactory)("ws://0.0.0.0:%i" % self.port, debug=False) + self.factory.protocol = get_protocol(WebSocketServerProtocol) self.loop = asyncio.get_event_loop() coro = self.loop.create_server(self.factory, '0.0.0.0', 9000) server = self.loop.run_until_complete(coro) @@ -125,6 +37,8 @@ class WebsocketAsyncioInterface(object): """ Run in a separate thread; reads messages from the backend. """ + # Wait for main loop to start + time.sleep(0.5) while True: channels = self.factory.reply_channels() # Quit if reactor is stopping diff --git a/channels/interfaces/websocket_autobahn.py b/channels/interfaces/websocket_autobahn.py new file mode 100644 index 0000000..dae31e0 --- /dev/null +++ b/channels/interfaces/websocket_autobahn.py @@ -0,0 +1,101 @@ +import time + +from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND + + +def get_protocol(base): + + class InterfaceProtocol(base): + """ + Protocol which supports WebSockets and forwards incoming messages to + the websocket channels. + """ + + def onConnect(self, request): + self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] + self.request_info = { + "path": request.path, + "get": request.params, + } + + def onOpen(self): + # Make sending channel + self.reply_channel = Channel.new_name("!websocket.send") + self.request_info["reply_channel"] = self.reply_channel + self.last_keepalive = time.time() + self.factory.protocols[self.reply_channel] = self + # Send news that this channel is open + Channel("websocket.connect").send(self.request_info) + + def onMessage(self, payload, isBinary): + if isBinary: + Channel("websocket.receive").send({ + "reply_channel": self.reply_channel, + "content": payload, + "binary": True, + }) + else: + Channel("websocket.receive").send({ + "reply_channel": self.reply_channel, + "content": payload.decode("utf8"), + "binary": False, + }) + + def serverSend(self, content, binary=False, **kwargs): + """ + Server-side channel message to send a message. + """ + if binary: + self.sendMessage(content, binary) + else: + self.sendMessage(content.encode("utf8"), binary) + + def serverClose(self): + """ + Server-side channel message to close the socket + """ + self.sendClose() + + def onClose(self, wasClean, code, reason): + if hasattr(self, "reply_channel"): + del self.factory.protocols[self.reply_channel] + Channel("websocket.disconnect").send({ + "reply_channel": self.reply_channel, + }) + + def sendKeepalive(self): + """ + Sends a keepalive packet on the keepalive channel. + """ + Channel("websocket.keepalive").send({ + "reply_channel": self.reply_channel, + }) + self.last_keepalive = time.time() + + return InterfaceProtocol + + +def get_factory(base): + + class InterfaceFactory(base): + """ + Factory which keeps track of its open protocols' receive channels + and can dispatch to them. + """ + + # TODO: Clean up dead protocols if needed? + + def __init__(self, *args, **kwargs): + super(InterfaceFactory, self).__init__(*args, **kwargs) + self.protocols = {} + + def reply_channels(self): + return self.protocols.keys() + + def dispatch_send(self, channel, message): + if message.get("close", False): + self.protocols[channel].serverClose() + else: + self.protocols[channel].serverSend(**message) + + return InterfaceFactory diff --git a/channels/interfaces/websocket_twisted.py b/channels/interfaces/websocket_twisted.py index ff5e947..e6055f1 100644 --- a/channels/interfaces/websocket_twisted.py +++ b/channels/interfaces/websocket_twisted.py @@ -1,97 +1,9 @@ -import django import time from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory -from collections import deque from twisted.internet import reactor -from channels import Channel, channel_backends, DEFAULT_CHANNEL_BACKEND - - -class InterfaceProtocol(WebSocketServerProtocol): - """ - Protocol which supports WebSockets and forwards incoming messages to - the websocket channels. - """ - - def onConnect(self, request): - self.channel_backend = channel_backends[DEFAULT_CHANNEL_BACKEND] - self.request_info = { - "path": request.path, - "get": request.params, - } - - def onOpen(self): - # Make sending channel - self.reply_channel = Channel.new_name("!websocket.send") - self.request_info["reply_channel"] = self.reply_channel - self.last_keepalive = time.time() - self.factory.protocols[self.reply_channel] = self - # Send news that this channel is open - Channel("websocket.connect").send(self.request_info) - - def onMessage(self, payload, isBinary): - if isBinary: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload, - binary = True, - )) - else: - Channel("websocket.receive").send(dict( - self.request_info, - content = payload.decode("utf8"), - binary = False, - )) - - def serverSend(self, content, binary=False, **kwargs): - """ - Server-side channel message to send a message. - """ - if binary: - self.sendMessage(content, binary) - else: - self.sendMessage(content.encode("utf8"), binary) - - def serverClose(self): - """ - Server-side channel message to close the socket - """ - self.sendClose() - - def onClose(self, wasClean, code, reason): - if hasattr(self, "reply_channel"): - del self.factory.protocols[self.reply_channel] - Channel("websocket.disconnect").send(self.request_info) - - def sendKeepalive(self): - """ - Sends a keepalive packet on the keepalive channel. - """ - Channel("websocket.keepalive").send(self.request_info) - self.last_keepalive = time.time() - - -class InterfaceFactory(WebSocketServerFactory): - """ - Factory which keeps track of its open protocols' receive channels - and can dispatch to them. - """ - - # TODO: Clean up dead protocols if needed? - - def __init__(self, *args, **kwargs): - super(InterfaceFactory, self).__init__(*args, **kwargs) - self.protocols = {} - - def reply_channels(self): - return self.protocols.keys() - - def dispatch_send(self, channel, message): - if message.get("close", False): - self.protocols[channel].serverClose() - else: - self.protocols[channel].serverSend(**message) +from .websocket_autobahn import get_protocol, get_factory class WebsocketTwistedInterface(object): @@ -106,8 +18,8 @@ class WebsocketTwistedInterface(object): self.port = port def run(self): - self.factory = InterfaceFactory("ws://0.0.0.0:%i" % self.port, debug=False) - self.factory.protocol = InterfaceProtocol + self.factory = get_factory(WebSocketServerFactory)("ws://0.0.0.0:%i" % self.port, debug=False) + self.factory.protocol = get_protocol(WebSocketServerProtocol) reactor.listenTCP(self.port, self.factory) reactor.callInThread(self.backend_reader) reactor.callLater(1, self.keepalive_sender) From 655213eff9f7d9643e264df6d3fff411434a2d73 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Sep 2015 16:00:39 -0500 Subject: [PATCH 16/17] 0.8 --- channels/__init__.py | 2 +- docs/index.rst | 1 + docs/releases/0.8.rst | 22 ++++++++++++++++++++++ docs/releases/index.rst | 7 +++++++ setup.py | 2 +- 5 files changed, 32 insertions(+), 2 deletions(-) create mode 100644 docs/releases/0.8.rst create mode 100644 docs/releases/index.rst diff --git a/channels/__init__.py b/channels/__init__.py index 2ee5420..8fd164c 100644 --- a/channels/__init__.py +++ b/channels/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.1.1" +__version__ = "0.8" # Load backends, using settings if available (else falling back to a default) DEFAULT_CHANNEL_BACKEND = "default" diff --git a/docs/index.rst b/docs/index.rst index a1c8d5f..d246cee 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -31,3 +31,4 @@ Contents: scaling backends faqs + releases/index diff --git a/docs/releases/0.8.rst b/docs/releases/0.8.rst new file mode 100644 index 0000000..ddeac26 --- /dev/null +++ b/docs/releases/0.8.rst @@ -0,0 +1,22 @@ +0.8 (2015-09-10) +---------------- + +This release reworks a few of the core concepts to make the channel layer +more efficient and user friendly: + +* Channel names now do not start with ``django``, and are instead just ``http.request``, etc. + +* HTTP headers/GET/etc are only sent with ``websocket.connect`` rather than all websocket requests, + to save a lot of bandwidth in the channel layer. + +* The session/user decorators were renamed, and a ``@channel_session_user`` and ``transfer_user`` set of functions + added to allow moving the user details from the HTTP session to the channel session in the ``connect`` consumer. + +* A ``@linearize`` decorator was added to help ensure a ``connect``/``receive`` pair are not executed + simultanously on two different workers. + +* Channel backends gained locking mechanisms to support the ``linearize`` feature. + +* ``runwsserver`` will use asyncio rather than Twisted if it's available. + +* Message formats have been made a bit more consistent. diff --git a/docs/releases/index.rst b/docs/releases/index.rst new file mode 100644 index 0000000..231771f --- /dev/null +++ b/docs/releases/index.rst @@ -0,0 +1,7 @@ +Release Notes +------------- + +.. toctree:: + :maxdepth: 1 + + 0.8 diff --git a/setup.py b/setup.py index 845e3c1..d8c5394 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import find_packages, setup setup( name='channels', - version="0.7", + version="0.8", url='http://github.com/andrewgodwin/django-channels', author='Andrew Godwin', author_email='andrew@aeracode.org', From 4a8bae272b5a8285104daef96b7f21b11129c1f0 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Thu, 10 Sep 2015 16:34:28 -0500 Subject: [PATCH 17/17] Update docs to recommend doing routing not in settings --- docs/concepts.rst | 4 ++-- docs/getting-started.rst | 48 +++++++++++++++++----------------------- 2 files changed, 22 insertions(+), 30 deletions(-) diff --git a/docs/concepts.rst b/docs/concepts.rst index cb5febb..5bbf21b 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -65,9 +65,9 @@ you can write a function to consume a channel, like so:: def my_consumer(message): pass -And then assign a channel to it like this in the channel backend settings:: +And then assign a channel to it like this in the channel routing:: - "ROUTING": { + channel_routing = { "some-channel": "myapp.consumers.my_consumer", } diff --git a/docs/getting-started.rst b/docs/getting-started.rst index 5b1d88e..6158fdf 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -44,21 +44,25 @@ For now, we want to override the *channel routing* so that, rather than going to the URL resolver and our normal view stack, all HTTP requests go to our custom consumer we wrote above. Here's what that looks like:: + # In settings.py CHANNEL_BACKENDS = { "default": { "BACKEND": "channels.backends.database.DatabaseChannelBackend", - "ROUTING": { - "http.request": "myproject.myapp.consumers.http_consumer", - }, + "ROUTING": "myproject.routing.channel_routing", }, } + # In routing.py + channel_routing = { + "http.request": "myproject.myapp.consumers.http_consumer", + } + As you can see, this is a little like Django's ``DATABASES`` setting; there are named channel backends, with a default one called ``default``. Each backend needs a class specified which powers it - we'll come to the options there later - -and a routing scheme, which can either be defined directly as a dict or as -a string pointing to a dict in another file (if you'd rather keep it outside -settings). +and a routing scheme, which points to a dict containing the routing settings. +It's recommended you call this ``routing.py`` and put it alongside ``urls.py`` +in your project. If you start up ``python manage.py runserver`` and go to ``http://localhost:8000``, you'll see that, rather than a default Django page, @@ -78,13 +82,8 @@ serve HTTP requests from now on - and make this WebSocket consumer instead:: Hook it up to the ``websocket.connect`` channel like this:: - CHANNEL_BACKENDS = { - "default": { - "BACKEND": "channels.backends.database.DatabaseChannelBackend", - "ROUTING": { - "websocket.connect": "myproject.myapp.consumers.ws_add", - }, - }, + channel_routing = { + "websocket.connect": "myproject.myapp.consumers.ws_add", } Now, let's look at what this is doing. It's tied to the @@ -116,12 +115,10 @@ group it's not in):: Of course, this is exactly the same code as the ``connect`` handler, so let's just route both channels to the same consumer:: - ... - "ROUTING": { + channel_routing = { "websocket.connect": "myproject.myapp.consumers.ws_add", "websocket.keepalive": "myproject.myapp.consumers.ws_add", - }, - ... + } And, even though channels will expire out, let's add an explicit ``disconnect`` handler to clean up as people disconnect (most channels will cleanly disconnect @@ -152,18 +149,13 @@ any message sent in to all connected clients. Here's all the code:: def ws_disconnect(message): Group("chat").discard(message.reply_channel) -And what our routing should look like in ``settings.py``:: +And what our routing should look like in ``routing.py``:: - CHANNEL_BACKENDS = { - "default": { - "BACKEND": "channels.backends.database.DatabaseChannelBackend", - "ROUTING": { - "websocket.connect": "myproject.myapp.consumers.ws_add", - "websocket.keepalive": "myproject.myapp.consumers.ws_add", - "websocket.receive": "myproject.myapp.consumers.ws_message", - "websocket.disconnect": "myproject.myapp.consumers.ws_disconnect", - }, - }, + channel_routing = { + "websocket.connect": "myproject.myapp.consumers.ws_add", + "websocket.keepalive": "myproject.myapp.consumers.ws_add", + "websocket.receive": "myproject.myapp.consumers.ws_message", + "websocket.disconnect": "myproject.myapp.consumers.ws_disconnect", } With all that code in your ``consumers.py`` file, you now have a working