mirror of
https://github.com/django/daphne.git
synced 2025-07-22 13:39:44 +03:00
improve @enforce_ordering to leverage a wait channel to avoid spinlocks (#144)
* improved @enforce_ordering to leverage a wait channel to avoid spinlocks * addressed pyflake issues * renamed wait channel to __wait__.<reply channel> * handled potential ChannelFull exception * updated sessions unit tests * updated enforce_ordering tests to reflect new approach of leveraging wait channels * addressed pyflake issues * more pyflake fixes * removed close_on_error handling on enforce_ordering since only worked on websockets
This commit is contained in:
parent
c9497e74dd
commit
363b5a09e9
|
@ -1,6 +1,5 @@
|
|||
import functools
|
||||
import hashlib
|
||||
import warnings
|
||||
from importlib import import_module
|
||||
|
||||
from django.conf import settings
|
||||
|
@ -75,7 +74,7 @@ def enforce_ordering(func=None, slight=False):
|
|||
Enforces either slight (order=0 comes first, everything else isn't ordered)
|
||||
or strict (all messages exactly ordered) ordering against a reply_channel.
|
||||
|
||||
Uses sessions to track ordering.
|
||||
Uses sessions to track ordering and socket-specific wait channels for unordered messages.
|
||||
|
||||
You cannot mix slight ordering and strict ordering on a channel; slight
|
||||
ordering does not write to the session after the first message to improve
|
||||
|
@ -95,19 +94,38 @@ def enforce_ordering(func=None, slight=False):
|
|||
# See what the current next order should be
|
||||
next_order = message.channel_session.get("__channels_next_order", 0)
|
||||
if order == next_order or (slight and next_order > 0):
|
||||
# Message is in right order. Maybe persist next one?
|
||||
# Run consumer
|
||||
func(message, *args, **kwargs)
|
||||
# Mark next message order as available for running
|
||||
if order == 0 or not slight:
|
||||
message.channel_session["__channels_next_order"] = order + 1
|
||||
# Run consumer
|
||||
return func(message, *args, **kwargs)
|
||||
message.channel_session.save()
|
||||
# Requeue any pending wait channel messages for this socket connection back onto it's original channel
|
||||
while True:
|
||||
wait_channel = "__wait__.%s" % message.reply_channel.name
|
||||
channel, content = message.channel_layer.receive_many([wait_channel], block=False)
|
||||
if channel:
|
||||
original_channel = content.pop("original_channel")
|
||||
try:
|
||||
message.channel_layer.send(original_channel, content)
|
||||
except message.channel_layer.ChannelFull:
|
||||
raise message.channel_layer.ChannelFull(
|
||||
"Cannot requeue pending __wait__ channel message " +
|
||||
"back on to already full channel %s" % original_channel
|
||||
)
|
||||
else:
|
||||
break
|
||||
else:
|
||||
# Bad ordering - warn if we're getting close to the limit
|
||||
if getattr(message, "__doomed__", False):
|
||||
warnings.warn(
|
||||
"Enforce ordering consumer reached retry limit, message " +
|
||||
"being dropped. Did you decorate all protocol consumers correctly?"
|
||||
# Since out of order, enqueue message temporarily to wait channel for this socket connection
|
||||
wait_channel = "__wait__.%s" % message.reply_channel.name
|
||||
message.content["original_channel"] = message.channel.name
|
||||
try:
|
||||
message.channel_layer.send(wait_channel, message.content)
|
||||
except message.channel_layer.ChannelFull:
|
||||
raise message.channel_layer.ChannelFull(
|
||||
"Cannot add unordered message to already " +
|
||||
"full __wait__ channel for socket %s" % message.reply_channel.name
|
||||
)
|
||||
raise ConsumeLater()
|
||||
return inner
|
||||
if func is not None:
|
||||
return decorator(func)
|
||||
|
|
|
@ -2,10 +2,10 @@ from __future__ import unicode_literals
|
|||
|
||||
from django.conf import settings
|
||||
from django.test import override_settings
|
||||
from channels.exceptions import ConsumeLater
|
||||
from channels.message import Message
|
||||
from channels.sessions import channel_session, http_session, enforce_ordering, session_for_reply_channel
|
||||
from channels.tests import ChannelTestCase
|
||||
from channels import DEFAULT_CHANNEL_LAYER, channel_layers
|
||||
|
||||
|
||||
@override_settings(SESSION_ENGINE="django.contrib.sessions.backends.cache")
|
||||
|
@ -110,9 +110,21 @@ class SessionTests(ChannelTestCase):
|
|||
Tests that slight mode of enforce_ordering works
|
||||
"""
|
||||
# Construct messages to send
|
||||
message0 = Message({"reply_channel": "test-reply-a", "order": 0}, None, None)
|
||||
message1 = Message({"reply_channel": "test-reply-a", "order": 1}, None, None)
|
||||
message2 = Message({"reply_channel": "test-reply-a", "order": 2}, None, None)
|
||||
message0 = Message(
|
||||
{"reply_channel": "test-reply-a", "order": 0},
|
||||
"websocket.connect",
|
||||
channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
)
|
||||
message1 = Message(
|
||||
{"reply_channel": "test-reply-a", "order": 1},
|
||||
"websocket.receive",
|
||||
channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
)
|
||||
message2 = Message(
|
||||
{"reply_channel": "test-reply-a", "order": 2},
|
||||
"websocket.receive",
|
||||
channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
)
|
||||
|
||||
# Run them in an acceptable slight order
|
||||
@enforce_ordering(slight=True)
|
||||
|
@ -123,29 +135,54 @@ class SessionTests(ChannelTestCase):
|
|||
inner(message2)
|
||||
inner(message1)
|
||||
|
||||
# Ensure wait channel is empty
|
||||
wait_channel = "__wait__.%s" % "test-reply-a"
|
||||
next_message = self.get_next_message(wait_channel)
|
||||
self.assertEqual(next_message, None)
|
||||
|
||||
def test_enforce_ordering_slight_fail(self):
|
||||
"""
|
||||
Tests that slight mode of enforce_ordering fails on bad ordering
|
||||
"""
|
||||
# Construct messages to send
|
||||
message2 = Message({"reply_channel": "test-reply-e", "order": 2}, None, None)
|
||||
message2 = Message(
|
||||
{"reply_channel": "test-reply-e", "order": 2},
|
||||
"websocket.receive",
|
||||
channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
)
|
||||
|
||||
# Run them in an acceptable strict order
|
||||
@enforce_ordering(slight=True)
|
||||
def inner(message):
|
||||
pass
|
||||
|
||||
with self.assertRaises(ConsumeLater):
|
||||
inner(message2)
|
||||
inner(message2)
|
||||
|
||||
# Ensure wait channel is not empty
|
||||
wait_channel = "__wait__.%s" % "test-reply-e"
|
||||
next_message = self.get_next_message(wait_channel)
|
||||
self.assertNotEqual(next_message, None)
|
||||
|
||||
def test_enforce_ordering_strict(self):
|
||||
"""
|
||||
Tests that strict mode of enforce_ordering works
|
||||
"""
|
||||
# Construct messages to send
|
||||
message0 = Message({"reply_channel": "test-reply-b", "order": 0}, None, None)
|
||||
message1 = Message({"reply_channel": "test-reply-b", "order": 1}, None, None)
|
||||
message2 = Message({"reply_channel": "test-reply-b", "order": 2}, None, None)
|
||||
message0 = Message(
|
||||
{"reply_channel": "test-reply-b", "order": 0},
|
||||
"websocket.connect",
|
||||
channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
)
|
||||
message1 = Message(
|
||||
{"reply_channel": "test-reply-b", "order": 1},
|
||||
"websocket.receive",
|
||||
channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
)
|
||||
message2 = Message(
|
||||
{"reply_channel": "test-reply-b", "order": 2},
|
||||
"websocket.receive",
|
||||
channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
)
|
||||
|
||||
# Run them in an acceptable strict order
|
||||
@enforce_ordering
|
||||
|
@ -156,13 +193,26 @@ class SessionTests(ChannelTestCase):
|
|||
inner(message1)
|
||||
inner(message2)
|
||||
|
||||
# Ensure wait channel is empty
|
||||
wait_channel = "__wait__.%s" % "test-reply-b"
|
||||
next_message = self.get_next_message(wait_channel)
|
||||
self.assertEqual(next_message, None)
|
||||
|
||||
def test_enforce_ordering_strict_fail(self):
|
||||
"""
|
||||
Tests that strict mode of enforce_ordering fails on bad ordering
|
||||
"""
|
||||
# Construct messages to send
|
||||
message0 = Message({"reply_channel": "test-reply-c", "order": 0}, None, None)
|
||||
message2 = Message({"reply_channel": "test-reply-c", "order": 2}, None, None)
|
||||
message0 = Message(
|
||||
{"reply_channel": "test-reply-c", "order": 0},
|
||||
"websocket.connect",
|
||||
channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
)
|
||||
message2 = Message(
|
||||
{"reply_channel": "test-reply-c", "order": 2},
|
||||
"websocket.receive",
|
||||
channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
)
|
||||
|
||||
# Run them in an acceptable strict order
|
||||
@enforce_ordering
|
||||
|
@ -170,14 +220,22 @@ class SessionTests(ChannelTestCase):
|
|||
pass
|
||||
|
||||
inner(message0)
|
||||
with self.assertRaises(ConsumeLater):
|
||||
inner(message2)
|
||||
inner(message2)
|
||||
|
||||
# Ensure wait channel is not empty
|
||||
wait_channel = "__wait__.%s" % "test-reply-c"
|
||||
next_message = self.get_next_message(wait_channel)
|
||||
self.assertNotEqual(next_message, None)
|
||||
|
||||
def test_enforce_ordering_fail_no_order(self):
|
||||
"""
|
||||
Makes sure messages with no "order" key fail
|
||||
"""
|
||||
message0 = Message({"reply_channel": "test-reply-d"}, None, None)
|
||||
message0 = Message(
|
||||
{"reply_channel": "test-reply-d"},
|
||||
None,
|
||||
channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
)
|
||||
|
||||
@enforce_ordering(slight=True)
|
||||
def inner(message):
|
||||
|
|
Loading…
Reference in New Issue
Block a user