Fixed #481: Sends from outside consumers send immediately

This commit is contained in:
Andrew Godwin 2017-01-11 15:34:23 -08:00
parent ee4aa9b292
commit 69c59ee8b4
2 changed files with 23 additions and 9 deletions

View File

@ -34,14 +34,15 @@ class Channel(object):
Send a message over the channel - messages are always dicts.
Sends are delayed until consumer completion. To override this, you
may pass immediately=True.
may pass immediately=True. If you are outside a consumer, things are
always sent immediately.
"""
from .message import pending_message_store
if not isinstance(content, dict):
raise TypeError("You can only send dicts as content on channels.")
if immediately:
if immediately or not pending_message_store.active:
self.channel_layer.send(self.name, content)
else:
from .message import pending_message_store
pending_message_store.append(self, content)
def __str__(self):
@ -80,10 +81,10 @@ class Group(object):
Sends are delayed until consumer completion. To override this, you
may pass immediately=True.
"""
from .message import pending_message_store
if not isinstance(content, dict):
raise ValueError("You can only send dicts as content on channels.")
if immediately:
if immediately or not pending_message_store.active:
self.channel_layer.send_group(self.name, content)
else:
from .message import pending_message_store
pending_message_store.append(self, content)

View File

@ -4,7 +4,7 @@ import copy
import threading
from .channel import Channel
from .signals import consumer_finished
from .signals import consumer_finished, consumer_started
class Message(object):
@ -71,16 +71,29 @@ class PendingMessageStore(object):
threadlocal = threading.local()
def prepare(self, **kwargs):
"""
Sets the message store up to receive messages.
"""
self.threadlocal.messages = []
@property
def active(self):
"""
Returns if the pending message store can be used or not
(it can only be used inside consumers)
"""
return hasattr(self.threadlocal, "messages")
def append(self, sender, message):
if not hasattr(self.threadlocal, "messages"):
self.threadlocal.messages = []
self.threadlocal.messages.append((sender, message))
def send_and_flush(self, **kwargs):
for sender, message in getattr(self.threadlocal, "messages", []):
sender.send(message, immediately=True)
self.threadlocal.messages = []
delattr(self.threadlocal, "messages")
pending_message_store = PendingMessageStore()
consumer_started.connect(pending_message_store.prepare)
consumer_finished.connect(pending_message_store.send_and_flush)