From 627b97c317a6c3ceafa1c8f1192633d1c85c93ff Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Mon, 3 Apr 2017 13:01:20 +0200 Subject: [PATCH] Fixed #588: enforce_ordering failed to wait on process-specific chans --- channels/sessions.py | 13 +++++++++++-- tests/test_sessions.py | 8 ++++---- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/channels/sessions.py b/channels/sessions.py index 3e43f80..8bf716b 100644 --- a/channels/sessions.py +++ b/channels/sessions.py @@ -83,12 +83,21 @@ def channel_session(func): return inner +def wait_channel_name(reply_channel): + """ + Given a reply_channel, returns a wait channel for it. + Replaces any ! with ? so process-specific channels become single-reader + channels. + """ + return "__wait__.%s" % (reply_channel.replace("!", "?"), ) + + def requeue_messages(message): """ Requeue any pending wait channel messages for this socket connection back onto it's original channel """ while True: - wait_channel = "__wait__.%s" % message.reply_channel.name + wait_channel = wait_channel_name(message.reply_channel.name) channel, content = message.channel_layer.receive_many([wait_channel], block=False) if channel: original_channel = content.pop("original_channel") @@ -137,7 +146,7 @@ def enforce_ordering(func=None, slight=False): requeue_messages(message) else: # Since out of order, enqueue message temporarily to wait channel for this socket connection - wait_channel = "__wait__.%s" % message.reply_channel.name + wait_channel = wait_channel_name(message.reply_channel.name) message.content["original_channel"] = message.channel.name try: message.channel_layer.send(wait_channel, message.content) diff --git a/tests/test_sessions.py b/tests/test_sessions.py index d1d507a..007ae98 100644 --- a/tests/test_sessions.py +++ b/tests/test_sessions.py @@ -256,17 +256,17 @@ class SessionTests(ChannelTestCase): """ # Construct messages to send message0 = Message( - {"reply_channel": "test-reply-b", "order": 0}, + {"reply_channel": "test-reply!b", "order": 0}, "websocket.connect", channel_layers[DEFAULT_CHANNEL_LAYER] ) message1 = Message( - {"reply_channel": "test-reply-b", "order": 1}, + {"reply_channel": "test-reply!b", "order": 1}, "websocket.receive", channel_layers[DEFAULT_CHANNEL_LAYER] ) message2 = Message( - {"reply_channel": "test-reply-b", "order": 2}, + {"reply_channel": "test-reply!b", "order": 2}, "websocket.receive", channel_layers[DEFAULT_CHANNEL_LAYER] ) @@ -281,7 +281,7 @@ class SessionTests(ChannelTestCase): inner(message2) # Ensure wait channel is empty - wait_channel = "__wait__.%s" % "test-reply-b" + wait_channel = "__wait__.test-reply?b" next_message = self.get_next_message(wait_channel) self.assertEqual(next_message, None)