From e7a354e03c50780474b77b6fc8b0bfe6012829b6 Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Tue, 12 Jul 2016 15:01:19 -0700 Subject: [PATCH] Fixed #148: Close database connections when consumers finish. --- channels/signals.py | 9 +++++++++ channels/worker.py | 7 +++++++ 2 files changed, 16 insertions(+) create mode 100644 channels/signals.py diff --git a/channels/signals.py b/channels/signals.py new file mode 100644 index 0000000..fbc6f43 --- /dev/null +++ b/channels/signals.py @@ -0,0 +1,9 @@ +from django.db import close_old_connections +from django.dispatch import Signal + + +consumer_started = Signal(providing_args=["environ"]) +consumer_finished = Signal() + +# Connect connection closer to consumer finished as well +consumer_finished.connect(close_old_connections) diff --git a/channels/worker.py b/channels/worker.py index 32f5edb..f6d93d7 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -6,6 +6,7 @@ import signal import sys import time +from .signals import consumer_started, consumer_finished from .exceptions import ConsumeLater from .message import Message from .utils import name_that_thing @@ -104,6 +105,9 @@ class Worker(object): self.callback(channel, message) try: logger.debug("Dispatching message on %s to %s", channel, name_that_thing(consumer)) + # Send consumer started to manage lifecycle stuff + consumer_started.send(sender=self.__class__, environ={}) + # Run consumer consumer(message, **kwargs) except ConsumeLater: # They want to not handle it yet. Re-inject it with a number-of-tries marker. @@ -127,3 +131,6 @@ class Worker(object): break except: logger.exception("Error processing message with consumer %s:", name_that_thing(consumer)) + else: + # Send consumer finished so DB conns close etc. + consumer_finished.send(sender=self.__class__)