Replace linearize with the more useful enforce_ordering.

This commit is contained in:
Andrew Godwin 2016-02-24 00:40:38 +00:00
parent 38c6df8125
commit 69186ef7b7
7 changed files with 169 additions and 82 deletions

View File

@ -1,30 +0,0 @@
import functools
def linearize(func):
"""
Makes sure the contained consumer does not run at the same time other
consumers are running on messages with the same reply_channel.
Required if you don't want weird things like a second consumer starting
up before the first has exited and saved its session. Doesn't guarantee
ordering, just linearity.
"""
raise NotImplementedError("Not yet reimplemented")
@functools.wraps(func)
def inner(message, *args, **kwargs):
# Make sure there's a reply channel
if not message.reply_channel:
raise ValueError(
"No reply_channel in message; @linearize can only be used on messages containing it."
)
# TODO: Get lock here
pass
# OK, keep going
try:
return func(message, *args, **kwargs)
finally:
# TODO: Release lock here
pass
return inner

View File

@ -3,10 +3,26 @@ import hashlib
from importlib import import_module
from django.conf import settings
from django.contrib.sessions.backends.base import CreateError
from .exceptions import ConsumeLater
from .handler import AsgiRequest
def session_for_reply_channel(reply_channel):
"""
Returns a session object tied to the reply_channel unicode string
passed in as an argument.
"""
# We hash the whole reply channel name and add a prefix, to fit inside 32B
reply_name = reply_channel
hashed = hashlib.md5(reply_name.encode("utf8")).hexdigest()
session_key = "chn" + hashed[:29]
# Make a session storage
session_engine = import_module(settings.SESSION_ENGINE)
return session_engine.SessionStore(session_key=session_key)
def channel_session(func):
"""
Provides a session-like object called "channel_session" to consumers
@ -17,30 +33,24 @@ def channel_session(func):
"""
@functools.wraps(func)
def inner(message, *args, **kwargs):
# Make sure there's NOT a channel_session already
if hasattr(message, "channel_session"):
return func(message, *args, **kwargs)
# Make sure there's a reply_channel
if not message.reply_channel:
raise ValueError(
"No reply_channel sent to consumer; @channel_session " +
"can only be used on messages containing it."
)
# Make sure there's NOT a channel_session already
if hasattr(message, "channel_session"):
raise ValueError("channel_session decorator wrapped inside another channel_session decorator")
# Turn the reply_channel into a valid session key length thing.
# We take the last 24 bytes verbatim, as these are the random section,
# and then hash the remaining ones onto the start, and add a prefix
reply_name = message.reply_channel.name
hashed = hashlib.md5(reply_name[:-24].encode()).hexdigest()[:8]
session_key = "skt" + hashed + reply_name[-24:]
# Make a session storage
session_engine = import_module(settings.SESSION_ENGINE)
session = session_engine.SessionStore(session_key=session_key)
# If the session does not already exist, save to force our
# session key to be valid.
session = session_for_reply_channel(message.reply_channel.name)
if not session.exists(session.session_key):
session.save(must_create=True)
try:
session.save(must_create=True)
except CreateError:
# Session wasn't unique, so another consumer is doing the same thing
raise ConsumeLater()
message.channel_session = session
# Run the consumer
try:
@ -52,6 +62,47 @@ def channel_session(func):
return inner
def enforce_ordering(func=None, slight=False):
"""
Enforces either slight (order=0 comes first, everything else isn't ordered)
or strict (all messages exactly ordered) ordering against a reply_channel.
Uses sessions to track ordering.
You cannot mix slight ordering and strict ordering on a channel; slight
ordering does not write to the session after the first message to improve
performance.
"""
def decorator(func):
@channel_session
@functools.wraps(func)
def inner(message, *args, **kwargs):
# Make sure there's an order
if "order" not in message.content:
raise ValueError(
"No `order` value in message; @enforce_ordering " +
"can only be used on messages containing it."
)
order = int(message.content['order'])
# See what the current next order should be
next_order = message.channel_session.get("__channels_next_order", 0)
if order == next_order or (slight and next_order > 0):
# Message is in right order. Maybe persist next one?
if order == 0 or not slight:
message.channel_session["__channels_next_order"] = order + 1
# Run consumer
return func(message, *args, **kwargs)
else:
# Bad ordering
print("Bad ordering detected: next %s, us %s, %s" % (next_order, order, message.reply_channel))
raise ConsumeLater()
return inner
if func is not None:
return decorator(func)
else:
return decorator
def http_session(func):
"""
Wraps a HTTP or WebSocket connect consumer (or any consumer of messages
@ -69,6 +120,9 @@ def http_session(func):
"""
@functools.wraps(func)
def inner(message, *args, **kwargs):
# Make sure there's NOT a http_session already
if hasattr(message, "http_session"):
return func(message, *args, **kwargs)
try:
# We want to parse the WebSocket (or similar HTTP-lite) message
# to get cookies and GET, but we need to add in a few things that
@ -78,9 +132,6 @@ def http_session(func):
request = AsgiRequest(message)
except Exception as e:
raise ValueError("Cannot parse HTTP message - are you sure this is a HTTP consumer? %s" % e)
# Make sure there's NOT a http_session already
if hasattr(message, "http_session"):
raise ValueError("http_session decorator wrapped inside another http_session decorator")
# Make sure there's a session key
session_key = request.GET.get("session_key", None)
if session_key is None:

View File

@ -3,6 +3,7 @@ from __future__ import unicode_literals
import logging
import time
from .exceptions import ConsumeLater
from .message import Message
from .utils import name_that_thing
@ -43,5 +44,7 @@ class Worker(object):
self.callback(channel, message)
try:
consumer(message)
except ConsumeLater:
self.channel_layer.send(channel, content)
except:
logger.exception("Error processing message with consumer %s:", name_that_thing(consumer))

View File

@ -381,7 +381,12 @@ use if the key is missing). Keys are unicode strings.
The one common key across all protocols is ``reply_channel``, a way to indicate
the client-specific channel to send responses to. Protocols are generally
encouraged to have one message type and one reply channel to ensure ordering.
encouraged to have one message type and one reply channel type to ensure ordering.
A ``reply_channel`` should be unique per connection. If the protocol in question
can have any server service a response - e.g. a theoretical SMS protocol - it
should not have ``reply_channel`` attributes on messages, but instead a separate
top-level outgoing channel.
Messages are specified here along with the channel names they are expected
on; if a channel name can vary, such as with reply channels, the varying
@ -390,7 +395,7 @@ the format the ``new_channel`` callable takes.
There is no label on message types to say what they are; their type is implicit
in the channel name they are received on. Two types that are sent on the same
channel, such as HTTP responses and server pushes, are distinguished apart
channel, such as HTTP responses and response chunks, are distinguished apart
by their required fields.
@ -630,6 +635,8 @@ Keys:
for this server as a unicode string, and ``port`` is the integer listening port.
Optional, defaults to ``None``.
* ``order``: The integer value ``0``.
Receive
'''''''
@ -647,6 +654,9 @@ Keys:
* ``text``: Unicode string of frame content, if it was text mode, or ``None``.
* ``order``: Order of this frame in the WebSocket stream, starting
at 1 (``connect`` is 0).
One of ``bytes`` or ``text`` must be non-``None``.
@ -665,6 +675,9 @@ Keys:
format ``websocket.send.?``. Cannot be used to send at this point; provided
as a way to identify the connection only.
* ``order``: Order of the disconnection relative to the incoming frames'
``order`` values in ``websocket.receive``.
Send/Close
''''''''''
@ -736,6 +749,33 @@ Keys:
* ``data``: Byte string of UDP datagram payload.
Protocol Format Guidelines
--------------------------
Message formats for protocols should follow these rules, unless
a very good performance or implementation reason is present:
* ``reply_channel`` should be unique per logical connection, and not per
logical client.
* If the protocol has server-side state, entirely encapsulate that state in
the protocol server; do not require the message consumers to use an external
state store.
* If the protocol has low-level negotiation, keepalive or other features,
handle these within the protocol server and don't expose them in ASGI
messages.
* If the protocol has guaranteed ordering, ASGI messages should include an
``order`` field (0-indexed) that preserves the ordering as received by the
protocol server (or as sent by the client, if available). This ordering should
span all message types emitted by the client - for example, a connect message
might have order ``0``, and the first two frames order ``1`` and ``2``.
* If the protocol is datagram-based, one datagram should equal one ASGI message
(unless size is an issue)
Approximate Global Ordering
---------------------------

View File

@ -259,7 +259,7 @@ name in the path of your WebSocket request (we'll ignore auth for now - that's n
# In consumers.py
from channels import Group
from channels.decorators import channel_session
from channels.sessions import channel_session
# Connected to websocket.connect
@channel_session
@ -342,7 +342,7 @@ chat to people with the same first letter of their username::
# In consumers.py
from channels import Channel, Group
from channels.decorators import channel_session
from channels.sessions import channel_session
from channels.auth import http_session_user, channel_session_user, transfer_user
# Connected to websocket.connect
@ -401,7 +401,7 @@ have a ChatMessage model with ``message`` and ``room`` fields::
# In consumers.py
from channels import Channel
from channels.decorators import channel_session
from channels.sessions import channel_session
from .models import ChatMessage
# Connected to chat-messages
@ -445,14 +445,16 @@ command run via ``cron``. If we wanted to write a bot, too, we could put its
listening logic inside the ``chat-messages`` consumer, as every message would
pass through it.
Linearization
-------------
Enforcing Ordering
------------------
There's one final concept we want to introduce you to before you go on to build
sites with Channels - linearizing consumers.
sites with Channels - consmer ordering
Because Channels is a distributed system that can have many workers, by default
it's entirely feasible for a WebSocket interface server to send out a ``connect``
it just processes messages in the order the workers get them off the queue.
It's entirely feasible for a WebSocket interface server to send out a ``connect``
and a ``receive`` message close enough together that a second worker will pick
up and start processing the ``receive`` message before the first worker has
finished processing the ``connect`` worker.
@ -464,53 +466,65 @@ same effect if someone tried to request a view before the login view had finishe
processing, but there you're not expecting that page to run after the login,
whereas you'd naturally expect ``receive`` to run after ``connect``.
But, of course, Channels has a solution - the ``linearize`` decorator. Any
handler decorated with this will use locking to ensure it does not run at the
same time as any other view with ``linearize`` **on messages with the same reply channel**.
That means your site will happily multitask with lots of different people's messages,
but if two happen to try to run at the same time for the same client, they'll
be deconflicted.
Channels has a solution - the ``enforce_ordering`` decorator. All WebSocket
messages contain an ``order`` key, and this decorator uses that to make sure that
messages are consumed in the right order, in one of two modes:
There's a small cost to using ``linearize``, which is why it's an optional
decorator, but generally you'll want to use it for most session-based WebSocket
* Slight ordering: Message 0 (``websocket.connect``) is done first, all others
are unordered
* Strict ordering: All messages are consumed strictly in sequence
The decorator uses ``channel_session`` to keep track of what numbered messages
have been processed, and if a worker tries to run a consumer on an out-of-order
message, it raises the ``ConsumeLater`` exception, which puts the message
back on the channel it came from and tells the worker to work on another message.
There's a cost to using ``enforce_ordering``, which is why it's an optional
decorator, and the cost is much greater in *strict* mode than it is in
*slight* mode. Generally you'll want to use *slight* mode for most session-based WebSocket
and other "continuous protocol" things. Here's an example, improving our
first-letter-of-username chat from earlier::
# In consumers.py
from channels import Channel, Group
from channels.decorators import channel_session, linearize
from channels.auth import http_session_user, channel_session_user, transfer_user
from channels.sessions import channel_session, enforce_ordering
from channels.auth import http_session_user, channel_session_user, channel_session_user_from_http
# Connected to websocket.connect
@linearize
@channel_session
@http_session_user
@enforce_ordering(slight=True)
@channel_session_user_from_http
def ws_add(message):
# Copy user from HTTP to channel session
transfer_user(message.http_session, message.channel_session)
# Add them to the right group
Group("chat-%s" % message.user.username[0]).add(message.reply_channel)
# Connected to websocket.keepalive
# We don't linearize as we know this will happen a decent time after add
@channel_session_user
def ws_keepalive(message):
# Keep them in the right group
Group("chat-%s" % message.user.username[0]).add(message.reply_channel)
# Connected to websocket.receive
@linearize
@enforce_ordering(slight=True)
@channel_session_user
def ws_message(message):
Group("chat-%s" % message.user.username[0]).send(message.content)
# Connected to websocket.disconnect
# We don't linearize as even if this gets an empty session, the group
# will auto-discard after the expiry anyway.
@enforce_ordering(slight=True)
@channel_session_user
def ws_disconnect(message):
Group("chat-%s" % message.user.username[0]).discard(message.reply_channel)
Slight ordering does mean that it's possible for a ``disconnect`` message to
get processed before a ``receive`` message, but that's fine in this case;
the client is disconnecting anyway, they don't care about those pending messages.
Strict ordering is the default as it's the most safe; to use it, just call
the decorator without arguments::
@enforce_ordering
def ws_message(message):
...
Generally, the performance (and safety) of your ordering is tied to your
session backend's performance. Make sure you choose session backend wisely
if you're going to rely heavily on ``enforce_ordering``.
Next Steps
----------

View File

@ -1,4 +1,12 @@
from channels.sessions import enforce_ordering
#@enforce_ordering(slight=True)
def ws_connect(message):
pass
#@enforce_ordering(slight=True)
def ws_message(message):
"Echoes messages back to the client"
message.reply_channel.send(message.content)

View File

@ -4,4 +4,5 @@ urlpatterns = []
channel_routing = {
"websocket.receive": consumers.ws_message,
"websocket.connect": consumers.ws_connect,
}