Delay server will now requeue messages for later when gets ChannelFull exception (#600)

It re-delays them for one second each time rather than dropping them.
This commit is contained in:
Sachin Rekhi 2017-04-11 02:25:56 -07:00 committed by Andrew Godwin
parent db1b3ba951
commit 31cd68c89b
2 changed files with 44 additions and 4 deletions

View File

@ -30,15 +30,20 @@ class DelayedMessage(models.Model):
self._delay = milliseconds self._delay = milliseconds
self.due_date = timezone.now() + timedelta(milliseconds=milliseconds) self.due_date = timezone.now() + timedelta(milliseconds=milliseconds)
def send(self, channel_layer=None): def send(self, channel_layer=None, requeue_delay=1000):
""" """
Sends the message on the configured channel with the stored content. Sends the message on the configured channel with the stored content.
Deletes the DelayedMessage record. Deletes the DelayedMessage record if successfully sent.
Args: Args:
channel_layer: optional channel_layer to use channel_layer: optional channel_layer to use
requeue_delay: if the channel is full, milliseconds to wait before requeue
""" """
channel_layer = channel_layer or channel_layers[DEFAULT_CHANNEL_LAYER] channel_layer = channel_layer or channel_layers[DEFAULT_CHANNEL_LAYER]
Channel(self.channel_name, channel_layer=channel_layer).send(json.loads(self.content), immediately=True) try:
self.delete() Channel(self.channel_name, channel_layer=channel_layer).send(json.loads(self.content), immediately=True)
self.delete()
except channel_layer.ChannelFull:
self.delay = requeue_delay
self.save()

View File

@ -71,6 +71,41 @@ class WorkerTests(ChannelTestCase):
message = self.get_next_message('test', require=True) message = self.get_next_message('test', require=True)
self.assertEqual(message.content, {'test': 'value'}) self.assertEqual(message.content, {'test': 'value'})
def test_channel_full(self):
"""
Tests that when channel capacity is hit when processing due messages,
message is requeued instead of dropped
"""
for i in range(10):
Channel('asgi.delay').send({
'channel': 'test',
'delay': 1000,
'content': {'test': 'value'}
}, immediately=True)
worker = PatchedWorker(channel_layers[DEFAULT_CHANNEL_LAYER])
worker.termed = 10
worker.run()
for i in range(1):
Channel('asgi.delay').send({
'channel': 'test',
'delay': 1000,
'content': {'test': 'value'}
}, immediately=True)
worker = PatchedWorker(channel_layers[DEFAULT_CHANNEL_LAYER])
worker.termed = 1
worker.run()
self.assertEqual(DelayedMessage.objects.count(), 11)
with mock.patch('django.utils.timezone.now', return_value=timezone.now() + timedelta(milliseconds=2000)):
worker.termed = 1
worker.run()
self.assertEqual(DelayedMessage.objects.count(), 1)
class DelayedMessageTests(ChannelTestCase): class DelayedMessageTests(ChannelTestCase):