Fixed #512: Give rundelay a configurable sleep interval

Also reduced the default interval to 1s.
This commit is contained in:
Andrew Godwin 2017-01-30 17:07:48 -08:00
parent 5fc5267d2a
commit 558d66a6b2
2 changed files with 23 additions and 9 deletions

View File

@ -17,6 +17,10 @@ class Command(BaseCommand):
'--layer', action='store', dest='layer', default=DEFAULT_CHANNEL_LAYER,
help='Channel layer alias to use, if not the default.',
)
parser.add_argument(
'--sleep', action='store', dest='sleep', default=1, type=float,
help='Amount of time to sleep between checks, in seconds.',
)
def handle(self, *args, **options):
self.verbosity = options.get("verbosity", 1)
@ -33,6 +37,7 @@ class Command(BaseCommand):
try:
worker = Worker(
channel_layer=self.channel_layer,
database_sleep_duration=options['sleep'],
)
worker.run()
except KeyboardInterrupt:

View File

@ -20,11 +20,13 @@ class Worker(object):
self,
channel_layer,
signal_handlers=True,
database_sleep_duration=1,
):
self.channel_layer = channel_layer
self.signal_handlers = signal_handlers
self.termed = False
self.in_job = False
self.database_sleep_duration = database_sleep_duration
def install_signal_handler(self):
signal.signal(signal.SIGTERM, self.sigterm_handler)
@ -44,9 +46,11 @@ class Worker(object):
logger.info("Listening on asgi.delay")
last_delay_check = 0
while not self.termed:
self.in_job = False
channel, content = self.channel_layer.receive_many(['asgi.delay'])
channel, content = self.channel_layer.receive(['asgi.delay'], block=False)
self.in_job = True
if channel is not None:
@ -71,12 +75,17 @@ class Worker(object):
logger.error("Invalid message received: %s:%s", err.error_dict.keys(), err.messages)
break
message.save()
# check for messages to send
if not DelayedMessage.objects.is_due().count():
logger.debug("No delayed messages waiting.")
time.sleep(0.01)
continue
else:
# Sleep for a short interval so we don't idle hot.
time.sleep(0.1)
# check for messages to send
if time.time() - last_delay_check > self.database_sleep_duration:
if DelayedMessage.objects.is_due().exists():
for message in DelayedMessage.objects.is_due().all():
logger.info("Delayed message due. Sending message to channel %s", message.channel_name)
logger.info("Sending delayed message to channel %s", message.channel_name)
message.send(channel_layer=self.channel_layer)
else:
logger.debug("No delayed messages waiting.")
last_delay_check = time.time()