Delay Protocol Server (#401)

* Add Delay Protocol Server

Add a process that listens to a specific channel
and delays incoming messages by a given time.

* Add custom django command rundelay
* Add test suite
* Implements #115

* Add channels.delay app

* Add AppConfig

* Move rundelay command to channels.delay app

* Refactor DelayedMessage into model

Move login into a database backed model.
* Update Worker
* Add migration

* Add delay docs page

* Add to TOC

* Fix import sorting

* Add ASGI spec document for Delay Protocol

* Update channels.delay doc with new channel name
* remove interval docs

* Refactor Delay to use milliseconds instead of seconds

Use milliseconds as the default unit. Gives more control to developers.

* Remove interval logic from DelayedMessage
* Remove interval tests
* Tweak test logic to use milliseconds
This commit is contained in:
Sam Bolgert 2016-11-24 12:54:03 -06:00 committed by Andrew Godwin
parent fdc80cb269
commit 3dddefa845
15 changed files with 376 additions and 0 deletions

View File

@ -0,0 +1 @@
default_app_config = 'channels.delay.apps.DelayConfig'

8
channels/delay/apps.py Normal file
View File

@ -0,0 +1,8 @@
from django.apps import AppConfig
class DelayConfig(AppConfig):
name = "channels.delay"
label = "channels.delay"
verbose_name = "Channels Delay"

View File

View File

@ -0,0 +1,39 @@
from __future__ import unicode_literals
from django.core.management import BaseCommand, CommandError
from channels import DEFAULT_CHANNEL_LAYER, channel_layers
from channels.delay.worker import Worker
from channels.log import setup_logger
class Command(BaseCommand):
leave_locale_alone = True
def add_arguments(self, parser):
super(Command, self).add_arguments(parser)
parser.add_argument(
'--layer', action='store', dest='layer', default=DEFAULT_CHANNEL_LAYER,
help='Channel layer alias to use, if not the default.',
)
def handle(self, *args, **options):
self.verbosity = options.get("verbosity", 1)
self.logger = setup_logger('django.channels', self.verbosity)
self.channel_layer = channel_layers[options.get("layer", DEFAULT_CHANNEL_LAYER)]
# Check that handler isn't inmemory
if self.channel_layer.local_only():
raise CommandError(
"You cannot span multiple processes with the in-memory layer. " +
"Change your settings to use a cross-process channel layer."
)
self.options = options
self.logger.info("Running delay against channel layer %s", self.channel_layer)
try:
worker = Worker(
channel_layer=self.channel_layer,
)
worker.run()
except KeyboardInterrupt:
pass

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.7 on 2016-10-21 01:14
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='DelayedMessage',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('due_date', models.DateTimeField(db_index=True)),
('channel_name', models.CharField(max_length=512)),
('content', models.TextField()),
],
),
]

View File

44
channels/delay/models.py Normal file
View File

@ -0,0 +1,44 @@
import json
from datetime import timedelta
from django.db import models
from django.utils import timezone
from channels import DEFAULT_CHANNEL_LAYER, Channel, channel_layers
class DelayedMessageQuerySet(models.QuerySet):
def is_due(self):
return self.filter(due_date__lte=timezone.now())
class DelayedMessage(models.Model):
due_date = models.DateTimeField(db_index=True)
channel_name = models.CharField(max_length=512)
content = models.TextField()
objects = DelayedMessageQuerySet.as_manager()
@property
def delay(self):
return self._delay
@delay.setter
def delay(self, milliseconds):
self._delay = milliseconds
self.due_date = timezone.now() + timedelta(milliseconds=milliseconds)
def send(self, channel_layer=None):
"""
Sends the message on the configured channel with the stored content.
Deletes the DelayedMessage record.
Args:
channel_layer: optional channel_layer to use
"""
channel_layer = channel_layer or channel_layers[DEFAULT_CHANNEL_LAYER]
Channel(self.channel_name, channel_layer=channel_layer).send(json.loads(self.content), immediately=True)
self.delete()

82
channels/delay/worker.py Normal file
View File

@ -0,0 +1,82 @@
from __future__ import unicode_literals
import json
import logging
import signal
import sys
import time
from django.core.exceptions import ValidationError
from .models import DelayedMessage
logger = logging.getLogger('django.channels')
class Worker(object):
"""Worker class that listens to channels.delay messages and dispatches messages"""
def __init__(
self,
channel_layer,
signal_handlers=True,
):
self.channel_layer = channel_layer
self.signal_handlers = signal_handlers
self.termed = False
self.in_job = False
def install_signal_handler(self):
signal.signal(signal.SIGTERM, self.sigterm_handler)
signal.signal(signal.SIGINT, self.sigterm_handler)
def sigterm_handler(self, signo, stack_frame):
self.termed = True
if self.in_job:
logger.info("Shutdown signal received while busy, waiting for loop termination")
else:
logger.info("Shutdown signal received while idle, terminating immediately")
sys.exit(0)
def run(self):
if self.signal_handlers:
self.install_signal_handler()
logger.info("Listening on asgi.delay")
while not self.termed:
self.in_job = False
channel, content = self.channel_layer.receive_many(['asgi.delay'])
self.in_job = True
if channel is not None:
logger.debug("Got message on asgi.delay")
if 'channel' not in content or \
'content' not in content or \
'delay' not in content:
logger.error("Invalid message received, it must contain keys 'channel', 'content', "
"and 'delay'.")
break
message = DelayedMessage(
content=json.dumps(content['content']),
channel_name=content['channel'],
delay=content['delay']
)
try:
message.full_clean()
except ValidationError as err:
logger.error("Invalid message received: %s:%s", err.error_dict.keys(), err.messages)
break
message.save()
# check for messages to send
if not DelayedMessage.objects.is_due().count():
logger.debug("No delayed messages waiting.")
time.sleep(0.01)
continue
for message in DelayedMessage.objects.is_due().all():
logger.info("Delayed message due. Sending message to channel %s", message.channel_name)
message.send(channel_layer=self.channel_layer)

View File

@ -6,6 +6,7 @@ INSTALLED_APPS = (
'django.contrib.sessions',
'django.contrib.admin',
'channels',
'channels.delay'
)
DATABASES = {

View File

@ -0,0 +1,102 @@
from __future__ import unicode_literals
import json
from datetime import timedelta
from django.utils import timezone
from channels import DEFAULT_CHANNEL_LAYER, Channel, channel_layers
from channels.delay.models import DelayedMessage
from channels.delay.worker import Worker
from channels.tests import ChannelTestCase
try:
from unittest import mock
except ImportError:
import mock
class PatchedWorker(Worker):
"""Worker with specific numbers of loops"""
def get_termed(self):
if not self.__iters:
return True
self.__iters -= 1
return False
def set_termed(self, value):
self.__iters = value
termed = property(get_termed, set_termed)
class WorkerTests(ChannelTestCase):
def test_invalid_message(self):
"""
Tests the worker won't delay an invalid message
"""
Channel('asgi.delay').send({'test': 'value'}, immediately=True)
worker = PatchedWorker(channel_layers[DEFAULT_CHANNEL_LAYER])
worker.termed = 1
worker.run()
self.assertEqual(DelayedMessage.objects.count(), 0)
def test_delay_message(self):
"""
Tests the message is delayed and dispatched when due
"""
Channel('asgi.delay').send({
'channel': 'test',
'delay': 1000,
'content': {'test': 'value'}
}, immediately=True)
worker = PatchedWorker(channel_layers[DEFAULT_CHANNEL_LAYER])
worker.termed = 1
worker.run()
self.assertEqual(DelayedMessage.objects.count(), 1)
with mock.patch('django.utils.timezone.now', return_value=timezone.now() + timedelta(milliseconds=1001)):
worker.termed = 1
worker.run()
self.assertEqual(DelayedMessage.objects.count(), 0)
message = self.get_next_message('test', require=True)
self.assertEqual(message.content, {'test': 'value'})
class DelayedMessageTests(ChannelTestCase):
def _create_message(self):
kwargs = {
'content': json.dumps({'test': 'data'}),
'channel_name': 'test',
'delay': 1000 * 5
}
delayed_message = DelayedMessage(**kwargs)
delayed_message.save()
return delayed_message
def test_is_due(self):
message = self._create_message()
self.assertEqual(DelayedMessage.objects.is_due().count(), 0)
with mock.patch('django.utils.timezone.now', return_value=message.due_date + timedelta(milliseconds=1)):
self.assertEqual(DelayedMessage.objects.is_due().count(), 1)
def test_send(self):
message = self._create_message()
message.send(channel_layer=channel_layers[DEFAULT_CHANNEL_LAYER])
self.get_next_message(message.channel_name, require=True)
self.assertEqual(DelayedMessage.objects.count(), 0)

View File

@ -1050,3 +1050,4 @@ Protocol Definitions
/asgi/email
/asgi/udp
/asgi/delay

26
docs/asgi/delay.rst Normal file
View File

@ -0,0 +1,26 @@
===============================================
Delay Protocol ASGI Message Format (Draft Spec)
===============================================
Protocol that allows any ASGI message to be delayed for a given number of milliseconds.
This simple protocol enables developers to schedule ASGI messages to be sent at a time in the future.
It can be used in conjunction with any other channel. This allows you do simple tasks
like scheduling an email to be sent later, to more complex tasks like testing latency in protocols.
Delay
'''''
Send a message to this channel to delay a message.
Channel: ``asgi.delay``
Keys:
* ``channel``: Unicode string specifying the final destination channel for the message after the delay.
* ``delay``: Positive integer specifying the number of milliseconds to delay the message.
* ``content``: Dictionary of unicode string keys for the message content. This should meet the
content specifications for the specified destination channel.

46
docs/delay.rst Normal file
View File

@ -0,0 +1,46 @@
Delay Server
============
Channels has an optional app ``channels.delay`` that implements the :doc:`ASGI Delay Protocol <asgi/delay>`.
The server is exposed through a custom management command ``rundelay`` which listens to
the `asgi.delay` channel for messages to delay.
Getting Started with Delay
==========================
To Install the app add `channels.delay` to `INSTALLED_APPS`::
INSTALLED_APPS = (
...
'channels',
'channels.delay'
)
Run `migrate` to create the tables
`python manage.py migrate`
Run the delay process to start processing messages
`python manage.py rundelay`
Now you're ready to start delaying messages.
Delaying Messages
=================
To delay a message by a fixed number of milliseconds use the `delay` parameter.
Here's an example::
from channels import Channel
delayed_message = {
'channel': 'example_channel',
'content': {'x': 1},
'delay': 10 * 1000
}
# The message will be delayed 10 seconds by the server and then sent
Channel('asgi.delay').send(delayed_message, immediately=True)

View File

@ -50,6 +50,7 @@ Topics
routing
binding
backends
delay
testing
reference
faqs