Send messages after the end of consumers

This commit is contained in:
Andrew Godwin 2016-10-05 14:49:46 -07:00
parent db0d2975a0
commit 0826b7997f
5 changed files with 56 additions and 17 deletions

View File

@ -29,13 +29,20 @@ class Channel(object):
else:
self.channel_layer = channel_layers[alias]
def send(self, content):
def send(self, content, immediately=False):
"""
Send a message over the channel - messages are always dicts.
Sends are delayed until consumer completion. To override this, you
may pass immediately=True.
"""
if not isinstance(content, dict):
raise TypeError("You can only send dicts as content on channels.")
self.channel_layer.send(self.name, content)
if immediately:
self.channel_layer.send(self.name, content)
else:
from .message import pending_message_store
pending_message_store.append(self, content)
def __str__(self):
return self.name
@ -66,7 +73,17 @@ class Group(object):
channel = channel.name
self.channel_layer.group_discard(self.name, channel)
def send(self, content):
def send(self, content, immediately=False):
"""
Send a message to all channels in the group.
Sends are delayed until consumer completion. To override this, you
may pass immediately=True.
"""
if not isinstance(content, dict):
raise ValueError("You can only send dicts as content on channels.")
self.channel_layer.send_group(self.name, content)
if immediately:
self.channel_layer.send_group(self.name, content)
else:
from .message import pending_message_store
pending_message_store.append(self, content)

View File

@ -23,7 +23,7 @@ class WebsocketConsumer(BaseConsumer):
# implies channel_session_user
http_user = False
# Set one to True if you want the class to enforce ordering for you
# Set to True if you want the class to enforce ordering for you
slight_ordering = False
strict_ordering = False
@ -47,7 +47,7 @@ class WebsocketConsumer(BaseConsumer):
if self.strict_ordering:
return enforce_ordering(handler, slight=False)
elif self.slight_ordering:
return enforce_ordering(handler, slight=True)
raise ValueError("Slight ordering is now always on. Please remove `slight_ordering=True`.")
else:
return handler

View File

@ -1,7 +1,9 @@
from __future__ import unicode_literals
import copy
import threading
from .channel import Channel
from .signals import consumer_finished
class Message(object):
@ -58,3 +60,25 @@ class Message(object):
self.channel.name,
self.channel_layer,
)
class PendingMessageStore(object):
"""
Singleton object used for storing pending messages that should be sent
to a channel or group when a consumer finishes.
"""
threadlocal = threading.local()
def append(self, sender, message):
if not hasattr(self.threadlocal, "messages"):
self.threadlocal.messages = []
self.threadlocal.messages.append((sender, message))
def send_and_flush(self, **kwargs):
for sender, message in getattr(self.threadlocal, "messages", []):
sender.send(message, immediately=True)
self.threadlocal.messages = []
pending_message_store = PendingMessageStore()
consumer_finished.connect(pending_message_store.send_and_flush)

View File

@ -71,15 +71,14 @@ def channel_session(func):
def enforce_ordering(func=None, slight=False):
"""
Enforces either slight (order=0 comes first, everything else isn't ordered)
or strict (all messages exactly ordered) ordering against a reply_channel.
Enforces strict (all messages exactly ordered) ordering against a reply_channel.
Uses sessions to track ordering and socket-specific wait channels for unordered messages.
You cannot mix slight ordering and strict ordering on a channel; slight
ordering does not write to the session after the first message to improve
performance.
"""
# Slight is deprecated
if slight:
raise ValueError("Slight ordering is now always on due to Channels changes. Please remove the decorator.")
# Main decorator
def decorator(func):
@channel_session
@functools.wraps(func)
@ -93,13 +92,12 @@ def enforce_ordering(func=None, slight=False):
order = int(message.content['order'])
# See what the current next order should be
next_order = message.channel_session.get("__channels_next_order", 0)
if order == next_order or (slight and next_order > 0):
if order == next_order:
# Run consumer
func(message, *args, **kwargs)
# Mark next message order as available for running
if order == 0 or not slight:
message.channel_session["__channels_next_order"] = order + 1
message.channel_session.save()
message.channel_session["__channels_next_order"] = order + 1
message.channel_session.save()
# Requeue any pending wait channel messages for this socket connection back onto it's original channel
while True:
wait_channel = "__wait__.%s" % message.reply_channel.name

View File

@ -145,7 +145,7 @@ class Worker(object):
break
except:
logger.exception("Error processing message with consumer %s:", name_that_thing(consumer))
else:
finally:
# Send consumer finished so DB conns close etc.
consumer_finished.send(sender=self.__class__)