From 8fba5220d905ec7fe10ddf228a30bbcbf2aa7e5d Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Wed, 2 Mar 2016 10:42:18 -0800 Subject: [PATCH] Detect if you forget to decorate things with enforce_ordering --- channels/sessions.py | 6 ++++-- channels/worker.py | 19 ++++++++++++++++++- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/channels/sessions.py b/channels/sessions.py index a9e847e..a622819 100644 --- a/channels/sessions.py +++ b/channels/sessions.py @@ -1,5 +1,6 @@ import functools import hashlib +import warnings from importlib import import_module from django.conf import settings @@ -93,8 +94,9 @@ def enforce_ordering(func=None, slight=False): # Run consumer return func(message, *args, **kwargs) else: - # Bad ordering - print("Bad ordering detected: next %s, us %s, %s" % (next_order, order, message.reply_channel)) + # Bad ordering - warn if we're getting close to the limit + if getattr(message, "__doomed__", False): + warnings.warn("Enforce ordering consumer reached retry limit, message being dropped. Did you decorate all protocol consumers correctly?") raise ConsumeLater() return inner if func is not None: diff --git a/channels/worker.py b/channels/worker.py index 927b980..8bf4ab8 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -16,9 +16,10 @@ class Worker(object): and runs their consumers. """ - def __init__(self, channel_layer, callback=None): + def __init__(self, channel_layer, callback=None, message_retries=10): self.channel_layer = channel_layer self.callback = callback + self.message_retries = message_retries def run(self): """ @@ -38,6 +39,11 @@ class Worker(object): channel_name=channel, channel_layer=self.channel_layer, ) + # Add attribute to message if it's been retried almost too many times, + # and would be thrown away this time if it's requeued. Used for helpful + # warnings in decorators and such - don't rely on this as public API. + if content.get("__retries__", 0) == self.message_retries: + message.__doomed__ = True # Handle the message consumer = self.channel_layer.registry.consumer_for_channel(channel) if self.callback: @@ -45,6 +51,17 @@ class Worker(object): try: consumer(message) except ConsumeLater: + # They want to not handle it yet. Re-inject it with a number-of-tries marker. + content['__retries__'] = content.get("__retries__", 0) + 1 + # If we retried too many times, quit and error rather than + # spinning forever + if content['__retries__'] > self.message_retries: + logger.warning( + "Exceeded number of retries for message on channel %s: %s", + channel, + repr(content)[:100], + ) + continue self.channel_layer.send(channel, content) except: logger.exception("Error processing message with consumer %s:", name_that_thing(consumer))