Fixed #588: enforce_ordering failed to wait on process-specific chans

This commit is contained in:
Andrew Godwin 2017-04-03 13:01:20 +02:00
parent ba54268c19
commit 627b97c317
2 changed files with 15 additions and 6 deletions

View File

@ -83,12 +83,21 @@ def channel_session(func):
return inner return inner
def wait_channel_name(reply_channel):
"""
Given a reply_channel, returns a wait channel for it.
Replaces any ! with ? so process-specific channels become single-reader
channels.
"""
return "__wait__.%s" % (reply_channel.replace("!", "?"), )
def requeue_messages(message): def requeue_messages(message):
""" """
Requeue any pending wait channel messages for this socket connection back onto it's original channel Requeue any pending wait channel messages for this socket connection back onto it's original channel
""" """
while True: while True:
wait_channel = "__wait__.%s" % message.reply_channel.name wait_channel = wait_channel_name(message.reply_channel.name)
channel, content = message.channel_layer.receive_many([wait_channel], block=False) channel, content = message.channel_layer.receive_many([wait_channel], block=False)
if channel: if channel:
original_channel = content.pop("original_channel") original_channel = content.pop("original_channel")
@ -137,7 +146,7 @@ def enforce_ordering(func=None, slight=False):
requeue_messages(message) requeue_messages(message)
else: else:
# Since out of order, enqueue message temporarily to wait channel for this socket connection # Since out of order, enqueue message temporarily to wait channel for this socket connection
wait_channel = "__wait__.%s" % message.reply_channel.name wait_channel = wait_channel_name(message.reply_channel.name)
message.content["original_channel"] = message.channel.name message.content["original_channel"] = message.channel.name
try: try:
message.channel_layer.send(wait_channel, message.content) message.channel_layer.send(wait_channel, message.content)

View File

@ -256,17 +256,17 @@ class SessionTests(ChannelTestCase):
""" """
# Construct messages to send # Construct messages to send
message0 = Message( message0 = Message(
{"reply_channel": "test-reply-b", "order": 0}, {"reply_channel": "test-reply!b", "order": 0},
"websocket.connect", "websocket.connect",
channel_layers[DEFAULT_CHANNEL_LAYER] channel_layers[DEFAULT_CHANNEL_LAYER]
) )
message1 = Message( message1 = Message(
{"reply_channel": "test-reply-b", "order": 1}, {"reply_channel": "test-reply!b", "order": 1},
"websocket.receive", "websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER] channel_layers[DEFAULT_CHANNEL_LAYER]
) )
message2 = Message( message2 = Message(
{"reply_channel": "test-reply-b", "order": 2}, {"reply_channel": "test-reply!b", "order": 2},
"websocket.receive", "websocket.receive",
channel_layers[DEFAULT_CHANNEL_LAYER] channel_layers[DEFAULT_CHANNEL_LAYER]
) )
@ -281,7 +281,7 @@ class SessionTests(ChannelTestCase):
inner(message2) inner(message2)
# Ensure wait channel is empty # Ensure wait channel is empty
wait_channel = "__wait__.%s" % "test-reply-b" wait_channel = "__wait__.test-reply?b"
next_message = self.get_next_message(wait_channel) next_message = self.get_next_message(wait_channel)
self.assertEqual(next_message, None) self.assertEqual(next_message, None)