diff --git a/channels/delay/management/commands/rundelay.py b/channels/delay/management/commands/rundelay.py index 0a3e719..b47b1e2 100644 --- a/channels/delay/management/commands/rundelay.py +++ b/channels/delay/management/commands/rundelay.py @@ -17,6 +17,10 @@ class Command(BaseCommand): '--layer', action='store', dest='layer', default=DEFAULT_CHANNEL_LAYER, help='Channel layer alias to use, if not the default.', ) + parser.add_argument( + '--sleep', action='store', dest='sleep', default=1, type=float, + help='Amount of time to sleep between checks, in seconds.', + ) def handle(self, *args, **options): self.verbosity = options.get("verbosity", 1) @@ -33,6 +37,7 @@ class Command(BaseCommand): try: worker = Worker( channel_layer=self.channel_layer, + database_sleep_duration=options['sleep'], ) worker.run() except KeyboardInterrupt: diff --git a/channels/delay/worker.py b/channels/delay/worker.py index c2e554b..689a588 100644 --- a/channels/delay/worker.py +++ b/channels/delay/worker.py @@ -20,11 +20,13 @@ class Worker(object): self, channel_layer, signal_handlers=True, + database_sleep_duration=1, ): self.channel_layer = channel_layer self.signal_handlers = signal_handlers self.termed = False self.in_job = False + self.database_sleep_duration = database_sleep_duration def install_signal_handler(self): signal.signal(signal.SIGTERM, self.sigterm_handler) @@ -44,9 +46,11 @@ class Worker(object): logger.info("Listening on asgi.delay") + last_delay_check = 0 + while not self.termed: self.in_job = False - channel, content = self.channel_layer.receive_many(['asgi.delay']) + channel, content = self.channel_layer.receive(['asgi.delay'], block=False) self.in_job = True if channel is not None: @@ -71,12 +75,17 @@ class Worker(object): logger.error("Invalid message received: %s:%s", err.error_dict.keys(), err.messages) break message.save() - # check for messages to send - if not DelayedMessage.objects.is_due().count(): - logger.debug("No delayed messages waiting.") - time.sleep(0.01) - continue - for message in DelayedMessage.objects.is_due().all(): - logger.info("Delayed message due. Sending message to channel %s", message.channel_name) - message.send(channel_layer=self.channel_layer) + else: + # Sleep for a short interval so we don't idle hot. + time.sleep(0.1) + + # check for messages to send + if time.time() - last_delay_check > self.database_sleep_duration: + if DelayedMessage.objects.is_due().exists(): + for message in DelayedMessage.objects.is_due().all(): + logger.info("Sending delayed message to channel %s", message.channel_name) + message.send(channel_layer=self.channel_layer) + else: + logger.debug("No delayed messages waiting.") + last_delay_check = time.time()