diff --git a/channels/channel.py b/channels/channel.py index b65d65a..03a2db6 100644 --- a/channels/channel.py +++ b/channels/channel.py @@ -29,13 +29,20 @@ class Channel(object): else: self.channel_layer = channel_layers[alias] - def send(self, content): + def send(self, content, immediately=False): """ Send a message over the channel - messages are always dicts. + + Sends are delayed until consumer completion. To override this, you + may pass immediately=True. """ if not isinstance(content, dict): raise TypeError("You can only send dicts as content on channels.") - self.channel_layer.send(self.name, content) + if immediately: + self.channel_layer.send(self.name, content) + else: + from .message import pending_message_store + pending_message_store.append(self, content) def __str__(self): return self.name @@ -66,7 +73,17 @@ class Group(object): channel = channel.name self.channel_layer.group_discard(self.name, channel) - def send(self, content): + def send(self, content, immediately=False): + """ + Send a message to all channels in the group. + + Sends are delayed until consumer completion. To override this, you + may pass immediately=True. + """ if not isinstance(content, dict): raise ValueError("You can only send dicts as content on channels.") - self.channel_layer.send_group(self.name, content) + if immediately: + self.channel_layer.send_group(self.name, content) + else: + from .message import pending_message_store + pending_message_store.append(self, content) diff --git a/channels/generic/websockets.py b/channels/generic/websockets.py index 272b938..4890f3d 100644 --- a/channels/generic/websockets.py +++ b/channels/generic/websockets.py @@ -23,7 +23,7 @@ class WebsocketConsumer(BaseConsumer): # implies channel_session_user http_user = False - # Set one to True if you want the class to enforce ordering for you + # Set to True if you want the class to enforce ordering for you slight_ordering = False strict_ordering = False @@ -47,7 +47,7 @@ class WebsocketConsumer(BaseConsumer): if self.strict_ordering: return enforce_ordering(handler, slight=False) elif self.slight_ordering: - return enforce_ordering(handler, slight=True) + raise ValueError("Slight ordering is now always on. Please remove `slight_ordering=True`.") else: return handler diff --git a/channels/message.py b/channels/message.py index a44ecc7..b9c5ca1 100644 --- a/channels/message.py +++ b/channels/message.py @@ -1,7 +1,9 @@ from __future__ import unicode_literals import copy +import threading from .channel import Channel +from .signals import consumer_finished class Message(object): @@ -58,3 +60,25 @@ class Message(object): self.channel.name, self.channel_layer, ) + + +class PendingMessageStore(object): + """ + Singleton object used for storing pending messages that should be sent + to a channel or group when a consumer finishes. + """ + + threadlocal = threading.local() + + def append(self, sender, message): + if not hasattr(self.threadlocal, "messages"): + self.threadlocal.messages = [] + self.threadlocal.messages.append((sender, message)) + + def send_and_flush(self, **kwargs): + for sender, message in getattr(self.threadlocal, "messages", []): + sender.send(message, immediately=True) + self.threadlocal.messages = [] + +pending_message_store = PendingMessageStore() +consumer_finished.connect(pending_message_store.send_and_flush) diff --git a/channels/sessions.py b/channels/sessions.py index 8e3c254..10607f4 100644 --- a/channels/sessions.py +++ b/channels/sessions.py @@ -71,15 +71,14 @@ def channel_session(func): def enforce_ordering(func=None, slight=False): """ - Enforces either slight (order=0 comes first, everything else isn't ordered) - or strict (all messages exactly ordered) ordering against a reply_channel. + Enforces strict (all messages exactly ordered) ordering against a reply_channel. Uses sessions to track ordering and socket-specific wait channels for unordered messages. - - You cannot mix slight ordering and strict ordering on a channel; slight - ordering does not write to the session after the first message to improve - performance. """ + # Slight is deprecated + if slight: + raise ValueError("Slight ordering is now always on due to Channels changes. Please remove the decorator.") + # Main decorator def decorator(func): @channel_session @functools.wraps(func) @@ -93,13 +92,12 @@ def enforce_ordering(func=None, slight=False): order = int(message.content['order']) # See what the current next order should be next_order = message.channel_session.get("__channels_next_order", 0) - if order == next_order or (slight and next_order > 0): + if order == next_order: # Run consumer func(message, *args, **kwargs) # Mark next message order as available for running - if order == 0 or not slight: - message.channel_session["__channels_next_order"] = order + 1 - message.channel_session.save() + message.channel_session["__channels_next_order"] = order + 1 + message.channel_session.save() # Requeue any pending wait channel messages for this socket connection back onto it's original channel while True: wait_channel = "__wait__.%s" % message.reply_channel.name diff --git a/channels/worker.py b/channels/worker.py index bad22df..658f370 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -145,7 +145,7 @@ class Worker(object): break except: logger.exception("Error processing message with consumer %s:", name_that_thing(consumer)) - else: + finally: # Send consumer finished so DB conns close etc. consumer_finished.send(sender=self.__class__)