diff --git a/channels/decorators.py b/channels/decorators.py deleted file mode 100644 index 279a7bd..0000000 --- a/channels/decorators.py +++ /dev/null @@ -1,30 +0,0 @@ -import functools - - -def linearize(func): - """ - Makes sure the contained consumer does not run at the same time other - consumers are running on messages with the same reply_channel. - - Required if you don't want weird things like a second consumer starting - up before the first has exited and saved its session. Doesn't guarantee - ordering, just linearity. - """ - raise NotImplementedError("Not yet reimplemented") - - @functools.wraps(func) - def inner(message, *args, **kwargs): - # Make sure there's a reply channel - if not message.reply_channel: - raise ValueError( - "No reply_channel in message; @linearize can only be used on messages containing it." - ) - # TODO: Get lock here - pass - # OK, keep going - try: - return func(message, *args, **kwargs) - finally: - # TODO: Release lock here - pass - return inner diff --git a/channels/sessions.py b/channels/sessions.py index 598b79e..7052f01 100644 --- a/channels/sessions.py +++ b/channels/sessions.py @@ -3,10 +3,26 @@ import hashlib from importlib import import_module from django.conf import settings +from django.contrib.sessions.backends.base import CreateError +from .exceptions import ConsumeLater from .handler import AsgiRequest +def session_for_reply_channel(reply_channel): + """ + Returns a session object tied to the reply_channel unicode string + passed in as an argument. 
+ """ + # We hash the whole reply channel name and add a prefix, to fit inside 32B + reply_name = reply_channel + hashed = hashlib.md5(reply_name.encode("utf8")).hexdigest() + session_key = "chn" + hashed[:29] + # Make a session storage + session_engine = import_module(settings.SESSION_ENGINE) + return session_engine.SessionStore(session_key=session_key) + + def channel_session(func): """ Provides a session-like object called "channel_session" to consumers @@ -17,30 +33,24 @@ def channel_session(func): """ @functools.wraps(func) def inner(message, *args, **kwargs): + # Make sure there's NOT a channel_session already + if hasattr(message, "channel_session"): + return func(message, *args, **kwargs) # Make sure there's a reply_channel if not message.reply_channel: raise ValueError( "No reply_channel sent to consumer; @channel_session " + "can only be used on messages containing it." ) - - # Make sure there's NOT a channel_session already - if hasattr(message, "channel_session"): - raise ValueError("channel_session decorator wrapped inside another channel_session decorator") - - # Turn the reply_channel into a valid session key length thing. - # We take the last 24 bytes verbatim, as these are the random section, - # and then hash the remaining ones onto the start, and add a prefix - reply_name = message.reply_channel.name - hashed = hashlib.md5(reply_name[:-24].encode()).hexdigest()[:8] - session_key = "skt" + hashed + reply_name[-24:] - # Make a session storage - session_engine = import_module(settings.SESSION_ENGINE) - session = session_engine.SessionStore(session_key=session_key) # If the session does not already exist, save to force our # session key to be valid. 
+ session = session_for_reply_channel(message.reply_channel.name) if not session.exists(session.session_key): - session.save(must_create=True) + try: + session.save(must_create=True) + except CreateError: + # Session wasn't unique, so another consumer is doing the same thing + raise ConsumeLater() message.channel_session = session # Run the consumer try: @@ -52,6 +62,47 @@ def channel_session(func): return inner +def enforce_ordering(func=None, slight=False): + """ + Enforces either slight (order=0 comes first, everything else isn't ordered) + or strict (all messages exactly ordered) ordering against a reply_channel. + + Uses sessions to track ordering. + + You cannot mix slight ordering and strict ordering on a channel; slight + ordering does not write to the session after the first message to improve + performance. + """ + def decorator(func): + @channel_session + @functools.wraps(func) + def inner(message, *args, **kwargs): + # Make sure there's an order + if "order" not in message.content: + raise ValueError( + "No `order` value in message; @enforce_ordering " + + "can only be used on messages containing it." + ) + order = int(message.content['order']) + # See what the current next order should be + next_order = message.channel_session.get("__channels_next_order", 0) + if order == next_order or (slight and next_order > 0): + # Message is in right order. Maybe persist next one? 
+ if order == 0 or not slight: + message.channel_session["__channels_next_order"] = order + 1 + # Run consumer + return func(message, *args, **kwargs) + else: + # Bad ordering + print("Bad ordering detected: next %s, us %s, %s" % (next_order, order, message.reply_channel)) + raise ConsumeLater() + return inner + if func is not None: + return decorator(func) + else: + return decorator + + def http_session(func): """ Wraps a HTTP or WebSocket connect consumer (or any consumer of messages @@ -69,6 +120,9 @@ def http_session(func): """ @functools.wraps(func) def inner(message, *args, **kwargs): + # Make sure there's NOT a http_session already + if hasattr(message, "http_session"): + return func(message, *args, **kwargs) try: # We want to parse the WebSocket (or similar HTTP-lite) message # to get cookies and GET, but we need to add in a few things that @@ -78,9 +132,6 @@ def http_session(func): request = AsgiRequest(message) except Exception as e: raise ValueError("Cannot parse HTTP message - are you sure this is a HTTP consumer? 
%s" % e) - # Make sure there's NOT a http_session already - if hasattr(message, "http_session"): - raise ValueError("http_session decorator wrapped inside another http_session decorator") # Make sure there's a session key session_key = request.GET.get("session_key", None) if session_key is None: diff --git a/channels/worker.py b/channels/worker.py index 01e3891..927b980 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import logging import time +from .exceptions import ConsumeLater from .message import Message from .utils import name_that_thing @@ -43,5 +44,7 @@ class Worker(object): self.callback(channel, message) try: consumer(message) + except ConsumeLater: + self.channel_layer.send(channel, content) except: logger.exception("Error processing message with consumer %s:", name_that_thing(consumer)) diff --git a/docs/asgi.rst b/docs/asgi.rst index c68611e..61c0cfa 100644 --- a/docs/asgi.rst +++ b/docs/asgi.rst @@ -381,7 +381,12 @@ use if the key is missing). Keys are unicode strings. The one common key across all protocols is ``reply_channel``, a way to indicate the client-specific channel to send responses to. Protocols are generally -encouraged to have one message type and one reply channel to ensure ordering. +encouraged to have one message type and one reply channel type to ensure ordering. + +A ``reply_channel`` should be unique per connection. If the protocol in question +can have any server service a response - e.g. a theoretical SMS protocol - it +should not have ``reply_channel`` attributes on messages, but instead a separate +top-level outgoing channel. Messages are specified here along with the channel names they are expected on; if a channel name can vary, such as with reply channels, the varying @@ -390,7 +395,7 @@ the format the ``new_channel`` callable takes. There is no label on message types to say what they are; their type is implicit in the channel name they are received on. 
Two types that are sent on the same -channel, such as HTTP responses and server pushes, are distinguished apart +channel, such as HTTP responses and response chunks, are distinguished apart by their required fields. @@ -630,6 +635,8 @@ Keys: for this server as a unicode string, and ``port`` is the integer listening port. Optional, defaults to ``None``. +* ``order``: The integer value ``0``. + Receive ''''''' @@ -647,6 +654,9 @@ Keys: * ``text``: Unicode string of frame content, if it was text mode, or ``None``. +* ``order``: Order of this frame in the WebSocket stream, starting + at 1 (``connect`` is 0). + One of ``bytes`` or ``text`` must be non-``None``. @@ -665,6 +675,9 @@ Keys: format ``websocket.send.?``. Cannot be used to send at this point; provided as a way to identify the connection only. +* ``order``: Order of the disconnection relative to the incoming frames' + ``order`` values in ``websocket.receive``. + Send/Close '''''''''' @@ -736,6 +749,33 @@ Keys: * ``data``: Byte string of UDP datagram payload. +Protocol Format Guidelines +-------------------------- + +Message formats for protocols should follow these rules, unless +a very good performance or implementation reason is present: + +* ``reply_channel`` should be unique per logical connection, and not per + logical client. + +* If the protocol has server-side state, entirely encapsulate that state in + the protocol server; do not require the message consumers to use an external + state store. + +* If the protocol has low-level negotiation, keepalive or other features, + handle these within the protocol server and don't expose them in ASGI + messages. + +* If the protocol has guaranteed ordering, ASGI messages should include an + ``order`` field (0-indexed) that preserves the ordering as received by the + protocol server (or as sent by the client, if available). 
This ordering should + span all message types emitted by the client - for example, a connect message + might have order ``0``, and the first two frames order ``1`` and ``2``. + +* If the protocol is datagram-based, one datagram should equal one ASGI message + (unless size is an issue) + + Approximate Global Ordering --------------------------- diff --git a/docs/getting-started.rst b/docs/getting-started.rst index 029666e..010b8fc 100644 --- a/docs/getting-started.rst +++ b/docs/getting-started.rst @@ -259,7 +259,7 @@ name in the path of your WebSocket request (we'll ignore auth for now - that's n # In consumers.py from channels import Group - from channels.decorators import channel_session + from channels.sessions import channel_session # Connected to websocket.connect @channel_session @@ -342,7 +342,7 @@ chat to people with the same first letter of their username:: # In consumers.py from channels import Channel, Group - from channels.decorators import channel_session + from channels.sessions import channel_session from channels.auth import http_session_user, channel_session_user, transfer_user # Connected to websocket.connect @@ -401,7 +401,7 @@ have a ChatMessage model with ``message`` and ``room`` fields:: # In consumers.py from channels import Channel - from channels.decorators import channel_session + from channels.sessions import channel_session from .models import ChatMessage # Connected to chat-messages @@ -445,14 +445,16 @@ command run via ``cron``. If we wanted to write a bot, too, we could put its listening logic inside the ``chat-messages`` consumer, as every message would pass through it. -Linearization -------------- + +Enforcing Ordering +------------------ There's one final concept we want to introduce you to before you go on to build -sites with Channels - linearizing consumers. 
+sites with Channels - consumer ordering. Because Channels is a distributed system that can have many workers, by default -it's entirely feasible for a WebSocket interface server to send out a ``connect`` +it just processes messages in the order the workers get them off the queue. +It's entirely feasible for a WebSocket interface server to send out a ``connect`` and a ``receive`` message close enough together that a second worker will pick up and start processing the ``receive`` message before the first worker has finished processing the ``connect`` worker. @@ -464,53 +466,65 @@ same effect if someone tried to request a view before the login view had finishe processing, but there you're not expecting that page to run after the login, whereas you'd naturally expect ``receive`` to run after ``connect``. -But, of course, Channels has a solution - the ``linearize`` decorator. Any -handler decorated with this will use locking to ensure it does not run at the -same time as any other view with ``linearize`` **on messages with the same reply channel**. -That means your site will happily multitask with lots of different people's messages, -but if two happen to try to run at the same time for the same client, they'll -be deconflicted. +Channels has a solution - the ``enforce_ordering`` decorator. 
All WebSocket +messages contain an ``order`` key, and this decorator uses that to make sure that +messages are consumed in the right order, in one of two modes: -There's a small cost to using ``linearize``, which is why it's an optional -decorator, but generally you'll want to use it for most session-based WebSocket +* Slight ordering: Message 0 (``websocket.connect``) is done first, all others + are unordered + +* Strict ordering: All messages are consumed strictly in sequence + +The decorator uses ``channel_session`` to keep track of what numbered messages +have been processed, and if a worker tries to run a consumer on an out-of-order +message, it raises the ``ConsumeLater`` exception, which puts the message +back on the channel it came from and tells the worker to work on another message. + +There's a cost to using ``enforce_ordering``, which is why it's an optional +decorator, and the cost is much greater in *strict* mode than it is in +*slight* mode. Generally you'll want to use *slight* mode for most session-based WebSocket and other "continuous protocol" things. 
Here's an example, improving our first-letter-of-username chat from earlier:: # In consumers.py from channels import Channel, Group - from channels.decorators import channel_session, linearize - from channels.auth import http_session_user, channel_session_user, transfer_user + from channels.sessions import channel_session, enforce_ordering + from channels.auth import http_session_user, channel_session_user, channel_session_user_from_http # Connected to websocket.connect - @linearize - @channel_session - @http_session_user + @enforce_ordering(slight=True) + @channel_session_user_from_http def ws_add(message): - # Copy user from HTTP to channel session - transfer_user(message.http_session, message.channel_session) # Add them to the right group Group("chat-%s" % message.user.username[0]).add(message.reply_channel) - # Connected to websocket.keepalive - # We don't linearize as we know this will happen a decent time after add - @channel_session_user - def ws_keepalive(message): - # Keep them in the right group - Group("chat-%s" % message.user.username[0]).add(message.reply_channel) - # Connected to websocket.receive - @linearize + @enforce_ordering(slight=True) @channel_session_user def ws_message(message): Group("chat-%s" % message.user.username[0]).send(message.content) # Connected to websocket.disconnect - # We don't linearize as even if this gets an empty session, the group - # will auto-discard after the expiry anyway. + @enforce_ordering(slight=True) @channel_session_user def ws_disconnect(message): Group("chat-%s" % message.user.username[0]).discard(message.reply_channel) +Slight ordering does mean that it's possible for a ``disconnect`` message to +get processed before a ``receive`` message, but that's fine in this case; +the client is disconnecting anyway, they don't care about those pending messages. 
+ +Strict ordering is the default as it's the safest; to use it, just call +the decorator without arguments:: + + @enforce_ordering + def ws_message(message): + ... + +Generally, the performance (and safety) of your ordering is tied to your +session backend's performance. Make sure you choose your session backend wisely +if you're going to rely heavily on ``enforce_ordering``. + Next Steps ---------- diff --git a/testproject/chtest/consumers.py b/testproject/chtest/consumers.py index ba7385c..96dbabb 100644 --- a/testproject/chtest/consumers.py +++ b/testproject/chtest/consumers.py @@ -1,4 +1,12 @@ +from channels.sessions import enforce_ordering + +#@enforce_ordering(slight=True) +def ws_connect(message): + pass + + +#@enforce_ordering(slight=True) def ws_message(message): "Echoes messages back to the client" message.reply_channel.send(message.content) diff --git a/testproject/testproject/urls.py b/testproject/testproject/urls.py index eeea7b6..42ae607 100644 --- a/testproject/testproject/urls.py +++ b/testproject/testproject/urls.py @@ -4,4 +4,5 @@ urlpatterns = [] channel_routing = { "websocket.receive": consumers.ws_message, + "websocket.connect": consumers.ws_connect, }