From 3dddefa8453e4e9595a52a182ddcee316ffd6980 Mon Sep 17 00:00:00 2001 From: Sam Bolgert Date: Thu, 24 Nov 2016 12:54:03 -0600 Subject: [PATCH] Delay Protocol Server (#401) * Add Delay Protocol Server Add a process that listens to a specific channel and delays incoming messages by a given time. * Add custom django command rundelay * Add test suite * Implements #115 * Add channels.delay app * Add AppConfig * Move rundelay command to channels.delay app * Refactor DelayedMessage into model Move login into a database backed model. * Update Worker * Add migration * Add delay docs page * Add to TOC * Fix import sorting * Add ASGI spec document for Delay Protocol * Update channels.delay doc with new channel name * remove interval docs * Refactor Delay to use milliseconds instead of seconds Use milliseconds as the default unit. Gives more control to developers. * Remove interval logic from DelayedMessage * Remove interval tests * Tweak test logic to use milliseconds --- channels/delay/__init__.py | 1 + channels/delay/apps.py | 8 ++ channels/delay/management/__init__.py | 0 .../delay/management/commands/__init__.py | 0 .../delay/management/commands/rundelay.py | 39 +++++++ channels/delay/migrations/0001_initial.py | 25 +++++ channels/delay/migrations/__init__.py | 0 channels/delay/models.py | 44 ++++++++ channels/delay/worker.py | 82 ++++++++++++++ channels/tests/settings.py | 1 + channels/tests/test_delay.py | 102 ++++++++++++++++++ docs/asgi.rst | 1 + docs/asgi/delay.rst | 26 +++++ docs/delay.rst | 46 ++++++++ docs/index.rst | 1 + 15 files changed, 376 insertions(+) create mode 100644 channels/delay/__init__.py create mode 100644 channels/delay/apps.py create mode 100644 channels/delay/management/__init__.py create mode 100644 channels/delay/management/commands/__init__.py create mode 100644 channels/delay/management/commands/rundelay.py create mode 100644 channels/delay/migrations/0001_initial.py create mode 100644 channels/delay/migrations/__init__.py create mode 100644 channels/delay/models.py create mode 100644 channels/delay/worker.py create mode 100644 channels/tests/test_delay.py create mode 100644 docs/asgi/delay.rst create mode 100644 docs/delay.rst diff --git a/channels/delay/__init__.py b/channels/delay/__init__.py new file mode 100644 index 0000000..389cd5b --- /dev/null +++ b/channels/delay/__init__.py @@ -0,0 +1 @@ +default_app_config = 'channels.delay.apps.DelayConfig' diff --git a/channels/delay/apps.py b/channels/delay/apps.py new file mode 100644 index 0000000..f68802b --- /dev/null +++ b/channels/delay/apps.py @@ -0,0 +1,8 @@ +from django.apps import AppConfig + + +class DelayConfig(AppConfig): + + name = "channels.delay" + label = "channels.delay" + verbose_name = "Channels Delay" diff --git a/channels/delay/management/__init__.py b/channels/delay/management/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/channels/delay/management/commands/__init__.py b/channels/delay/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/channels/delay/management/commands/rundelay.py b/channels/delay/management/commands/rundelay.py new file mode 100644 index 0000000..0a3e719 --- /dev/null +++ b/channels/delay/management/commands/rundelay.py @@ -0,0 +1,39 @@ +from __future__ import unicode_literals + +from django.core.management import BaseCommand, CommandError + +from channels import DEFAULT_CHANNEL_LAYER, channel_layers +from channels.delay.worker import Worker +from channels.log import setup_logger + + +class Command(BaseCommand): + + leave_locale_alone = True + + def add_arguments(self, parser): + super(Command, self).add_arguments(parser) + parser.add_argument( + '--layer', action='store', dest='layer', default=DEFAULT_CHANNEL_LAYER, + help='Channel layer alias to use, if not the default.', + ) + + def handle(self, *args, **options): + self.verbosity = options.get("verbosity", 1) + self.logger = setup_logger('django.channels', self.verbosity) + self.channel_layer = channel_layers[options.get("layer", DEFAULT_CHANNEL_LAYER)] + # Check that handler isn't inmemory + if self.channel_layer.local_only(): + raise CommandError( + "You cannot span multiple processes with the in-memory layer. " + + "Change your settings to use a cross-process channel layer." + ) + self.options = options + self.logger.info("Running delay against channel layer %s", self.channel_layer) + try: + worker = Worker( + channel_layer=self.channel_layer, + ) + worker.run() + except KeyboardInterrupt: + pass diff --git a/channels/delay/migrations/0001_initial.py b/channels/delay/migrations/0001_initial.py new file mode 100644 index 0000000..82e85f9 --- /dev/null +++ b/channels/delay/migrations/0001_initial.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.9.7 on 2016-10-21 01:14 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ] + + operations = [ + migrations.CreateModel( + name='DelayedMessage', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('due_date', models.DateTimeField(db_index=True)), + ('channel_name', models.CharField(max_length=512)), + ('content', models.TextField()), + ], + ), + ] diff --git a/channels/delay/migrations/__init__.py b/channels/delay/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/channels/delay/models.py b/channels/delay/models.py new file mode 100644 index 0000000..4bff090 --- /dev/null +++ b/channels/delay/models.py @@ -0,0 +1,44 @@ +import json +from datetime import timedelta + +from django.db import models +from django.utils import timezone + +from channels import DEFAULT_CHANNEL_LAYER, Channel, channel_layers + + +class DelayedMessageQuerySet(models.QuerySet): + + def is_due(self): + return self.filter(due_date__lte=timezone.now()) + + +class DelayedMessage(models.Model): + + due_date = models.DateTimeField(db_index=True) + channel_name = models.CharField(max_length=512) + content = models.TextField() + + objects = DelayedMessageQuerySet.as_manager() + + @property + def delay(self): + return self._delay + + @delay.setter + def delay(self, milliseconds): + self._delay = milliseconds + self.due_date = timezone.now() + timedelta(milliseconds=milliseconds) + + def send(self, channel_layer=None): + """ + Sends the message on the configured channel with the stored content. + + Deletes the DelayedMessage record. + + Args: + channel_layer: optional channel_layer to use + """ + channel_layer = channel_layer or channel_layers[DEFAULT_CHANNEL_LAYER] + Channel(self.channel_name, channel_layer=channel_layer).send(json.loads(self.content), immediately=True) + self.delete() diff --git a/channels/delay/worker.py b/channels/delay/worker.py new file mode 100644 index 0000000..c2e554b --- /dev/null +++ b/channels/delay/worker.py @@ -0,0 +1,82 @@ +from __future__ import unicode_literals + +import json +import logging +import signal +import sys +import time + +from django.core.exceptions import ValidationError + +from .models import DelayedMessage + +logger = logging.getLogger('django.channels') + + +class Worker(object): + """Worker class that listens to channels.delay messages and dispatches messages""" + + def __init__( + self, + channel_layer, + signal_handlers=True, + ): + self.channel_layer = channel_layer + self.signal_handlers = signal_handlers + self.termed = False + self.in_job = False + + def install_signal_handler(self): + signal.signal(signal.SIGTERM, self.sigterm_handler) + signal.signal(signal.SIGINT, self.sigterm_handler) + + def sigterm_handler(self, signo, stack_frame): + self.termed = True + if self.in_job: + logger.info("Shutdown signal received while busy, waiting for loop termination") + else: + logger.info("Shutdown signal received while idle, terminating immediately") + sys.exit(0) + + def run(self): + if self.signal_handlers: + self.install_signal_handler() + + logger.info("Listening on asgi.delay") + + while not self.termed: + self.in_job = False + channel, content = self.channel_layer.receive_many(['asgi.delay']) + self.in_job = True + + if channel is not None: + logger.debug("Got message on asgi.delay") + + if 'channel' not in content or \ + 'content' not in content or \ + 'delay' not in content: + logger.error("Invalid message received, it must contain keys 'channel', 'content', " + "and 'delay'.") + break + + message = DelayedMessage( + content=json.dumps(content['content']), + channel_name=content['channel'], + delay=content['delay'] + ) + + try: + message.full_clean() + except ValidationError as err: + logger.error("Invalid message received: %s:%s", err.error_dict.keys(), err.messages) + break + message.save() + # check for messages to send + if not DelayedMessage.objects.is_due().count(): + logger.debug("No delayed messages waiting.") + time.sleep(0.01) + continue + + for message in DelayedMessage.objects.is_due().all(): + logger.info("Delayed message due. Sending message to channel %s", message.channel_name) + message.send(channel_layer=self.channel_layer) diff --git a/channels/tests/settings.py b/channels/tests/settings.py index d720f33..2ddf8ac 100644 --- a/channels/tests/settings.py +++ b/channels/tests/settings.py @@ -6,6 +6,7 @@ INSTALLED_APPS = ( 'django.contrib.sessions', 'django.contrib.admin', 'channels', + 'channels.delay' ) DATABASES = { diff --git a/channels/tests/test_delay.py b/channels/tests/test_delay.py new file mode 100644 index 0000000..8d22d0f --- /dev/null +++ b/channels/tests/test_delay.py @@ -0,0 +1,102 @@ +from __future__ import unicode_literals + +import json +from datetime import timedelta + +from django.utils import timezone + +from channels import DEFAULT_CHANNEL_LAYER, Channel, channel_layers +from channels.delay.models import DelayedMessage +from channels.delay.worker import Worker +from channels.tests import ChannelTestCase + +try: + from unittest import mock +except ImportError: + import mock + + +class PatchedWorker(Worker): + """Worker with specific numbers of loops""" + def get_termed(self): + if not self.__iters: + return True + self.__iters -= 1 + return False + + def set_termed(self, value): + self.__iters = value + + termed = property(get_termed, set_termed) + + +class WorkerTests(ChannelTestCase): + + def test_invalid_message(self): + """ + Tests the worker won't delay an invalid message + """ + Channel('asgi.delay').send({'test': 'value'}, immediately=True) + + worker = PatchedWorker(channel_layers[DEFAULT_CHANNEL_LAYER]) + worker.termed = 1 + + worker.run() + + self.assertEqual(DelayedMessage.objects.count(), 0) + + def test_delay_message(self): + """ + Tests the message is delayed and dispatched when due + """ + Channel('asgi.delay').send({ + 'channel': 'test', + 'delay': 1000, + 'content': {'test': 'value'} + }, immediately=True) + + worker = PatchedWorker(channel_layers[DEFAULT_CHANNEL_LAYER]) + worker.termed = 1 + + worker.run() + + self.assertEqual(DelayedMessage.objects.count(), 1) + + with mock.patch('django.utils.timezone.now', return_value=timezone.now() + timedelta(milliseconds=1001)): + worker.termed = 1 + worker.run() + + self.assertEqual(DelayedMessage.objects.count(), 0) + + message = self.get_next_message('test', require=True) + self.assertEqual(message.content, {'test': 'value'}) + + +class DelayedMessageTests(ChannelTestCase): + + def _create_message(self): + kwargs = { + 'content': json.dumps({'test': 'data'}), + 'channel_name': 'test', + 'delay': 1000 * 5 + } + delayed_message = DelayedMessage(**kwargs) + delayed_message.save() + + return delayed_message + + def test_is_due(self): + message = self._create_message() + + self.assertEqual(DelayedMessage.objects.is_due().count(), 0) + + with mock.patch('django.utils.timezone.now', return_value=message.due_date + timedelta(milliseconds=1)): + self.assertEqual(DelayedMessage.objects.is_due().count(), 1) + + def test_send(self): + message = self._create_message() + message.send(channel_layer=channel_layers[DEFAULT_CHANNEL_LAYER]) + + self.get_next_message(message.channel_name, require=True) + + self.assertEqual(DelayedMessage.objects.count(), 0) diff --git a/docs/asgi.rst b/docs/asgi.rst index 3ea981b..dcd57d2 100644 --- a/docs/asgi.rst +++ b/docs/asgi.rst @@ -1050,3 +1050,4 @@ Protocol Definitions /asgi/email /asgi/udp + /asgi/delay diff --git a/docs/asgi/delay.rst b/docs/asgi/delay.rst new file mode 100644 index 0000000..6edc19a --- /dev/null +++ b/docs/asgi/delay.rst @@ -0,0 +1,26 @@ +=============================================== +Delay Protocol ASGI Message Format (Draft Spec) +=============================================== + +Protocol that allows any ASGI message to be delayed for a given number of milliseconds. + +This simple protocol enables developers to schedule ASGI messages to be sent at a time in the future. +It can be used in conjunction with any other channel. This allows you do simple tasks +like scheduling an email to be sent later, to more complex tasks like testing latency in protocols. + + +Delay +''''' + +Send a message to this channel to delay a message. + +Channel: ``asgi.delay`` + +Keys: + + * ``channel``: Unicode string specifying the final destination channel for the message after the delay. + + * ``delay``: Positive integer specifying the number of milliseconds to delay the message. + + * ``content``: Dictionary of unicode string keys for the message content. This should meet the +content specifications for the specified destination channel. diff --git a/docs/delay.rst b/docs/delay.rst new file mode 100644 index 0000000..1539fd6 --- /dev/null +++ b/docs/delay.rst @@ -0,0 +1,46 @@ +Delay Server +============ + +Channels has an optional app ``channels.delay`` that implements the :doc:`ASGI Delay Protocol `. + +The server is exposed through a custom management command ``rundelay`` which listens to +the `asgi.delay` channel for messages to delay. + + +Getting Started with Delay +========================== + +To Install the app add `channels.delay` to `INSTALLED_APPS`:: + + INSTALLED_APPS = ( + ... + 'channels', + 'channels.delay' + ) + +Run `migrate` to create the tables + +`python manage.py migrate` + +Run the delay process to start processing messages + +`python manage.py rundelay` + +Now you're ready to start delaying messages. + +Delaying Messages +================= + +To delay a message by a fixed number of milliseconds use the `delay` parameter. + +Here's an example:: + + from channels import Channel + + delayed_message = { + 'channel': 'example_channel', + 'content': {'x': 1}, + 'delay': 10 * 1000 + } + # The message will be delayed 10 seconds by the server and then sent + Channel('asgi.delay').send(delayed_message, immediately=True) diff --git a/docs/index.rst b/docs/index.rst index 9125e55..b4c12cd 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -50,6 +50,7 @@ Topics routing binding backends + delay testing reference faqs