diff --git a/channels/delay/models.py b/channels/delay/models.py index 4bff090..932d6d3 100644 --- a/channels/delay/models.py +++ b/channels/delay/models.py @@ -30,15 +30,20 @@ class DelayedMessage(models.Model): self._delay = milliseconds self.due_date = timezone.now() + timedelta(milliseconds=milliseconds) - def send(self, channel_layer=None): + def send(self, channel_layer=None, requeue_delay=1000): """ Sends the message on the configured channel with the stored content. - Deletes the DelayedMessage record. + Deletes the DelayedMessage record if successfully sent. Args: channel_layer: optional channel_layer to use + requeue_delay: if the channel is full, milliseconds to wait before requeue """ channel_layer = channel_layer or channel_layers[DEFAULT_CHANNEL_LAYER] - Channel(self.channel_name, channel_layer=channel_layer).send(json.loads(self.content), immediately=True) - self.delete() + try: + Channel(self.channel_name, channel_layer=channel_layer).send(json.loads(self.content), immediately=True) + self.delete() + except channel_layer.ChannelFull: + self.delay = requeue_delay + self.save() diff --git a/tests/test_delay.py b/tests/test_delay.py index 08cb194..a6de9dd 100644 --- a/tests/test_delay.py +++ b/tests/test_delay.py @@ -71,6 +71,41 @@ class WorkerTests(ChannelTestCase): message = self.get_next_message('test', require=True) self.assertEqual(message.content, {'test': 'value'}) + def test_channel_full(self): + """ + Tests that when channel capacity is hit when processing due messages, + message is requeued instead of dropped + """ + for i in range(10): + Channel('asgi.delay').send({ + 'channel': 'test', + 'delay': 1000, + 'content': {'test': 'value'} + }, immediately=True) + + worker = PatchedWorker(channel_layers[DEFAULT_CHANNEL_LAYER]) + worker.termed = 10 + worker.run() + + for i in range(1): + Channel('asgi.delay').send({ + 'channel': 'test', + 'delay': 1000, + 'content': {'test': 'value'} + }, immediately=True) + + worker = PatchedWorker(channel_layers[DEFAULT_CHANNEL_LAYER]) + worker.termed = 1 + worker.run() + + self.assertEqual(DelayedMessage.objects.count(), 11) + + with mock.patch('django.utils.timezone.now', return_value=timezone.now() + timedelta(milliseconds=2000)): + worker.termed = 1 + worker.run() + + self.assertEqual(DelayedMessage.objects.count(), 1) + class DelayedMessageTests(ChannelTestCase):