Detect if you forget to decorate things with enforce_ordering

This commit is contained in:
Andrew Godwin 2016-03-02 10:42:18 -08:00
parent 54dc80e9a5
commit 8fba5220d9
2 changed files with 22 additions and 3 deletions

View File

@ -1,5 +1,6 @@
import functools import functools
import hashlib import hashlib
import warnings
from importlib import import_module from importlib import import_module
from django.conf import settings from django.conf import settings
@ -93,8 +94,9 @@ def enforce_ordering(func=None, slight=False):
# Run consumer # Run consumer
return func(message, *args, **kwargs) return func(message, *args, **kwargs)
else: else:
# Bad ordering # Bad ordering - warn if we're getting close to the limit
print("Bad ordering detected: next %s, us %s, %s" % (next_order, order, message.reply_channel)) if getattr(message, "__doomed__", False):
warnings.warn("Enforce ordering consumer reached retry limit, message being dropped. Did you decorate all protocol consumers correctly?")
raise ConsumeLater() raise ConsumeLater()
return inner return inner
if func is not None: if func is not None:

View File

@ -16,9 +16,10 @@ class Worker(object):
and runs their consumers. and runs their consumers.
""" """
def __init__(self, channel_layer, callback=None): def __init__(self, channel_layer, callback=None, message_retries=10):
self.channel_layer = channel_layer self.channel_layer = channel_layer
self.callback = callback self.callback = callback
self.message_retries = message_retries
def run(self): def run(self):
""" """
@ -38,6 +39,11 @@ class Worker(object):
channel_name=channel, channel_name=channel,
channel_layer=self.channel_layer, channel_layer=self.channel_layer,
) )
# Add attribute to message if it's been retried almost too many times,
# and would be thrown away this time if it's requeued. Used for helpful
# warnings in decorators and such - don't rely on this as public API.
if content.get("__retries__", 0) == self.message_retries:
message.__doomed__ = True
# Handle the message # Handle the message
consumer = self.channel_layer.registry.consumer_for_channel(channel) consumer = self.channel_layer.registry.consumer_for_channel(channel)
if self.callback: if self.callback:
@ -45,6 +51,17 @@ class Worker(object):
try: try:
consumer(message) consumer(message)
except ConsumeLater: except ConsumeLater:
# They want to not handle it yet. Re-inject it with a number-of-tries marker.
content['__retries__'] = content.get("__retries__", 0) + 1
# If we retried too many times, quit and error rather than
# spinning forever
if content['__retries__'] > self.message_retries:
logger.warning(
"Exceeded number of retries for message on channel %s: %s",
channel,
repr(content)[:100],
)
continue
self.channel_layer.send(channel, content) self.channel_layer.send(channel, content)
except: except:
logger.exception("Error processing message with consumer %s:", name_that_thing(consumer)) logger.exception("Error processing message with consumer %s:", name_that_thing(consumer))