Fixed #643: Add retry and limit to PendingMessageStore.

This won't fix all backlog issues, but will be sufficient to smooth
over bumps.
This commit is contained in:
Andrew Godwin 2017-05-29 11:07:03 -07:00
parent f2b39c33e6
commit 34a047a6ff

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import copy
import time
import threading
from .channel import Channel
@ -67,10 +68,17 @@ class PendingMessageStore(object):
"""
Singleton object used for storing pending messages that should be sent
to a channel or group when a consumer finishes.
Will retry when it sees ChannelFull up to a limit; if you want more control
over this, change to `immediately=True` in your send method and handle it
yourself.
"""
threadlocal = threading.local()
retry_time = 2 # seconds
retry_interval = 0.2 # seconds
def prepare(self, **kwargs):
"""
Sets the message store up to receive messages.
@ -90,7 +98,24 @@ class PendingMessageStore(object):
def send_and_flush(self, **kwargs):
for sender, message in getattr(self.threadlocal, "messages", []):
sender.send(message, immediately=True)
# Loop until the retry time limit is hit
started = time.time()
while time.time() - started < self.retry_time:
try:
sender.send(message, immediately=True)
except sender.channel_layer.ChannelFull:
time.sleep(self.retry_interval)
continue
else:
break
# If we didn't break out, we failed to send, so do a nice exception
else:
raise RuntimeError(
"Failed to send queued message to %s after retrying for %.2fs.\n"
"You need to increase the consumption rate on this channel, its capacity,\n"
"or handle the ChannelFull exception yourself after adding\n"
"immediately=True to send()." % (sender, self.retry_time)
)
delattr(self.threadlocal, "messages")