From 69c59ee8b4924cd346e0a2b1724979375ad9924c Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Wed, 11 Jan 2017 15:34:23 -0800 Subject: [PATCH] Fixed #481: Sends from outside consumers send immediately --- channels/channel.py | 11 ++++++----- channels/message.py | 21 +++++++++++++++++---- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/channels/channel.py b/channels/channel.py index 03a2db6..e4bef7b 100644 --- a/channels/channel.py +++ b/channels/channel.py @@ -34,14 +34,15 @@ class Channel(object): Send a message over the channel - messages are always dicts. Sends are delayed until consumer completion. To override this, you - may pass immediately=True. + may pass immediately=True. If you are outside a consumer, things are + always sent immediately. """ + from .message import pending_message_store if not isinstance(content, dict): raise TypeError("You can only send dicts as content on channels.") - if immediately: + if immediately or not pending_message_store.active: self.channel_layer.send(self.name, content) else: - from .message import pending_message_store pending_message_store.append(self, content) def __str__(self): @@ -80,10 +81,10 @@ class Group(object): Sends are delayed until consumer completion. To override this, you may pass immediately=True. """ + from .message import pending_message_store if not isinstance(content, dict): raise ValueError("You can only send dicts as content on channels.") - if immediately: + if immediately or not pending_message_store.active: self.channel_layer.send_group(self.name, content) else: - from .message import pending_message_store pending_message_store.append(self, content) diff --git a/channels/message.py b/channels/message.py index 6a4d3f3..f8001d8 100644 --- a/channels/message.py +++ b/channels/message.py @@ -4,7 +4,7 @@ import copy import threading from .channel import Channel -from .signals import consumer_finished +from .signals import consumer_finished, consumer_started class Message(object): @@ -71,16 +71,29 @@ class PendingMessageStore(object): threadlocal = threading.local() + def prepare(self, **kwargs): + """ + Sets the message store up to receive messages. + """ + self.threadlocal.messages = [] + + @property + def active(self): + """ + Returns if the pending message store can be used or not + (it can only be used inside consumers) + """ + return hasattr(self.threadlocal, "messages") + def append(self, sender, message): - if not hasattr(self.threadlocal, "messages"): - self.threadlocal.messages = [] self.threadlocal.messages.append((sender, message)) def send_and_flush(self, **kwargs): for sender, message in getattr(self.threadlocal, "messages", []): sender.send(message, immediately=True) - self.threadlocal.messages = [] + delattr(self.threadlocal, "messages") pending_message_store = PendingMessageStore() +consumer_started.connect(pending_message_store.prepare) consumer_finished.connect(pending_message_store.send_and_flush)