From 34a047a6ffb65ea7277466ab4a1a2c6e4ed67847 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Mon, 29 May 2017 11:07:03 -0700 Subject: [PATCH] Fixed #643: Add retry and limit to PendingMessageStore. This won't fix all backlog issues, but will be sufficient to smooth over bumps. --- channels/message.py | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/channels/message.py b/channels/message.py index f8001d8..1564977 100644 --- a/channels/message.py +++ b/channels/message.py @@ -1,6 +1,7 @@ from __future__ import unicode_literals import copy +import time import threading from .channel import Channel @@ -67,10 +68,17 @@ class PendingMessageStore(object): """ Singleton object used for storing pending messages that should be sent to a channel or group when a consumer finishes. + + Will retry when it sees ChannelFull up to a limit; if you want more control + over this, change to `immediately=True` in your send method and handle it + yourself. """ threadlocal = threading.local() + retry_time = 2 # seconds + retry_interval = 0.2 # seconds + def prepare(self, **kwargs): """ Sets the message store up to receive messages. @@ -90,7 +98,24 @@ class PendingMessageStore(object): def send_and_flush(self, **kwargs): for sender, message in getattr(self.threadlocal, "messages", []): - sender.send(message, immediately=True) + # Loop until the retry time limit is hit + started = time.time() + while time.time() - started < self.retry_time: + try: + sender.send(message, immediately=True) + except sender.channel_layer.ChannelFull: + time.sleep(self.retry_interval) + continue + else: + break + # If we didn't break out, we failed to send, so do a nice exception + else: + raise RuntimeError( + "Failed to send queued message to %s after retrying for %.2fs.\n" + "You need to increase the consumption rate on this channel, its capacity,\n" + "or handle the ChannelFull exception yourself after adding\n" + "immediately=True to send()." % (sender, self.retry_time) + ) delattr(self.threadlocal, "messages")