Graceful shutdown for workers on SIGTERM and SIGINT

This commit is contained in:
Andrew Godwin 2016-03-02 17:21:58 -08:00
parent 44568dab5b
commit 08d1f9a14d

View File

@ -1,6 +1,8 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import logging import logging
import signal
import sys
import time import time
from .exceptions import ConsumeLater from .exceptions import ConsumeLater
@ -16,18 +18,37 @@ class Worker(object):
and runs their consumers. and runs their consumers.
""" """
def __init__(self, channel_layer, callback=None, message_retries=10): def __init__(self, channel_layer, callback=None, message_retries=10, signal_handlers=True):
self.channel_layer = channel_layer self.channel_layer = channel_layer
self.callback = callback self.callback = callback
self.message_retries = message_retries self.message_retries = message_retries
self.signal_handlers = signal_handlers
self.termed = False
self.in_job = False
def install_signal_handler(self):
signal.signal(signal.SIGTERM, self.sigterm_handler)
signal.signal(signal.SIGINT, self.sigterm_handler)
def sigterm_handler(self, signo, stack_frame):
self.termed = True
if self.in_job:
logger.info("Shutdown signal received while busy, waiting for loop termination")
else:
logger.info("Shutdown signal received while idle, terminating immediately")
sys.exit(0)
def run(self): def run(self):
""" """
Tries to continually dispatch messages to consumers. Tries to continually dispatch messages to consumers.
""" """
if self.signal_handlers:
self.install_signal_handler()
channels = self.channel_layer.registry.all_channel_names() channels = self.channel_layer.registry.all_channel_names()
while True: while not self.termed:
self.in_job = False
channel, content = self.channel_layer.receive_many(channels, block=True) channel, content = self.channel_layer.receive_many(channels, block=True)
self.in_job = True
# If no message, stall a little to avoid busy-looping then continue # If no message, stall a little to avoid busy-looping then continue
if channel is None: if channel is None:
time.sleep(0.01) time.sleep(0.01)