From 073cbca16ddda9bb194bd04eeba0bfc6eecd437b Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sat, 30 Apr 2016 19:09:46 -0700 Subject: [PATCH] Fixed #116: Allow configuration of worker listening --- channels/management/commands/runworker.py | 11 ++++++- channels/tests/test_worker.py | 37 +++++++++++++++++++++++ channels/worker.py | 25 +++++++++++++-- docs/deploying.rst | 16 +++++++++- 4 files changed, 85 insertions(+), 4 deletions(-) create mode 100644 channels/tests/test_worker.py diff --git a/channels/management/commands/runworker.py b/channels/management/commands/runworker.py index 0cf4498..7f812e5 100644 --- a/channels/management/commands/runworker.py +++ b/channels/management/commands/runworker.py @@ -15,6 +15,10 @@ class Command(BaseCommand): super(Command, self).add_arguments(parser) parser.add_argument('--layer', action='store', dest='layer', default=DEFAULT_CHANNEL_LAYER, help='Channel layer alias to use, if not the default.') + parser.add_argument('--only-channels', action='append', dest='only_channels', + help='Limits this worker to only listening on the provided channels (supports globbing).') + parser.add_argument('--exclude-channels', action='append', dest='exclude_channels', + help='Prevents this worker from listening on the provided channels (supports globbing).') def handle(self, *args, **options): # Get the backend to use @@ -37,7 +41,12 @@ class Command(BaseCommand): callback = self.consumer_called # Run the worker try: - Worker(channel_layer=self.channel_layer, callback=callback).run() + Worker( + channel_layer=self.channel_layer, + callback=callback, + only_channels=options.get("only_channels", None), + exclude_channels=options.get("exclude_channels", None), + ).run() except KeyboardInterrupt: pass diff --git a/channels/tests/test_worker.py b/channels/tests/test_worker.py new file mode 100644 index 0000000..53f8d40 --- /dev/null +++ b/channels/tests/test_worker.py @@ -0,0 +1,37 @@ +from __future__ import unicode_literals +from django.test import SimpleTestCase + +from channels.worker import Worker + + +class WorkerTests(SimpleTestCase): + """ + Tests that the router's routing code works correctly. + """ + + def test_channel_filters(self): + """ + Tests that the include/exclude logic works + """ + # Include + worker = Worker(None, only_channels=["yes.*", "maybe.*"]) + self.assertEqual( + worker.apply_channel_filters(["yes.1", "no.1"]), + ["yes.1"], + ) + self.assertEqual( + worker.apply_channel_filters(["yes.1", "no.1", "maybe.2", "yes"]), + ["yes.1", "maybe.2"], + ) + # Exclude + worker = Worker(None, exclude_channels=["no.*", "maybe.*"]) + self.assertEqual( + worker.apply_channel_filters(["yes.1", "no.1", "maybe.2", "yes"]), + ["yes.1", "yes"], + ) + # Both + worker = Worker(None, exclude_channels=["no.*"], only_channels=["yes.*"]) + self.assertEqual( + worker.apply_channel_filters(["yes.1", "no.1", "maybe.2", "yes"]), + ["yes.1"], + ) diff --git a/channels/worker.py b/channels/worker.py index 4f8278f..1e970e1 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals +import fnmatch import logging import signal import sys @@ -18,11 +19,14 @@ class Worker(object): and runs their consumers. """ - def __init__(self, channel_layer, callback=None, message_retries=10, signal_handlers=True): + def __init__(self, channel_layer, callback=None, message_retries=10, signal_handlers=True, + only_channels=None, exclude_channels=None): self.channel_layer = channel_layer self.callback = callback self.message_retries = message_retries self.signal_handlers = signal_handlers + self.only_channels = only_channels + self.exclude_channels = exclude_channels self.termed = False self.in_job = False @@ -38,13 +42,30 @@ class Worker(object): logger.info("Shutdown signal received while idle, terminating immediately") sys.exit(0) + def apply_channel_filters(self, channels): + """ + Applies our include and exclude filters to the channel list and returns it + """ + if self.only_channels: + channels = [ + channel for channel in channels + if any(fnmatch.fnmatchcase(channel, pattern) for pattern in self.only_channels) + ] + if self.exclude_channels: + channels = [ + channel for channel in channels + if not any(fnmatch.fnmatchcase(channel, pattern) for pattern in self.exclude_channels) + ] + return channels + def run(self): """ Tries to continually dispatch messages to consumers. """ if self.signal_handlers: self.install_signal_handler() - channels = self.channel_layer.router.channels + channels = self.apply_channel_filters(self.channel_layer.router.channels) + logger.info("Listening on channels %s", ", ".join(channels)) while not self.termed: self.in_job = False channel, content = self.channel_layer.receive_many(channels, block=True) diff --git a/docs/deploying.rst b/docs/deploying.rst index f2298d1..e5d16cc 100644 --- a/docs/deploying.rst +++ b/docs/deploying.rst @@ -80,7 +80,21 @@ requests will take longer and longer to return as the messages queue up (until the expiry limit is reached, at which point HTTP connections will start dropping). -TODO: We should probably ship some kind of latency measuring tooling. +In a more complex project, you won't want all your channels being served by the +same workers, especially if you have long-running tasks (if you serve them from +the same workers as HTTP requests, there's a chance long-running tasks could +block up all the workers and delay responding to HTTP requests). + +To manage this, it's possible to tell workers to either limit themselves to +just certain channel names or ignore sepcific channels using the +``--only-channels`` and ``--exclude-channels`` options. Here's an example +of configuring a worker to only serve HTTP and WebSocket requests:: + + python manage.py runworker --only-channels=http.* --only-channels=websocket.* + +Or telling a worker to ignore all messages on the "thumbnail" channel:: + + python manage.py runworker --exclude-channels=thumbnail Run interface servers