mirror of
https://github.com/django/daphne.git
synced 2025-07-10 16:02:18 +03:00
Fixed #116: Allow configuration of worker listening
This commit is contained in:
parent
c579f27f6d
commit
073cbca16d
|
@ -15,6 +15,10 @@ class Command(BaseCommand):
|
||||||
super(Command, self).add_arguments(parser)
|
super(Command, self).add_arguments(parser)
|
||||||
parser.add_argument('--layer', action='store', dest='layer', default=DEFAULT_CHANNEL_LAYER,
|
parser.add_argument('--layer', action='store', dest='layer', default=DEFAULT_CHANNEL_LAYER,
|
||||||
help='Channel layer alias to use, if not the default.')
|
help='Channel layer alias to use, if not the default.')
|
||||||
|
parser.add_argument('--only-channels', action='append', dest='only_channels',
|
||||||
|
help='Limits this worker to only listening on the provided channels (supports globbing).')
|
||||||
|
parser.add_argument('--exclude-channels', action='append', dest='exclude_channels',
|
||||||
|
help='Prevents this worker from listening on the provided channels (supports globbing).')
|
||||||
|
|
||||||
def handle(self, *args, **options):
|
def handle(self, *args, **options):
|
||||||
# Get the backend to use
|
# Get the backend to use
|
||||||
|
@ -37,7 +41,12 @@ class Command(BaseCommand):
|
||||||
callback = self.consumer_called
|
callback = self.consumer_called
|
||||||
# Run the worker
|
# Run the worker
|
||||||
try:
|
try:
|
||||||
Worker(channel_layer=self.channel_layer, callback=callback).run()
|
Worker(
|
||||||
|
channel_layer=self.channel_layer,
|
||||||
|
callback=callback,
|
||||||
|
only_channels=options.get("only_channels", None),
|
||||||
|
exclude_channels=options.get("exclude_channels", None),
|
||||||
|
).run()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
37
channels/tests/test_worker.py
Normal file
37
channels/tests/test_worker.py
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
from django.test import SimpleTestCase
|
||||||
|
|
||||||
|
from channels.worker import Worker
|
||||||
|
|
||||||
|
|
||||||
|
class WorkerTests(SimpleTestCase):
|
||||||
|
"""
|
||||||
|
Tests that the router's routing code works correctly.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_channel_filters(self):
|
||||||
|
"""
|
||||||
|
Tests that the include/exclude logic works
|
||||||
|
"""
|
||||||
|
# Include
|
||||||
|
worker = Worker(None, only_channels=["yes.*", "maybe.*"])
|
||||||
|
self.assertEqual(
|
||||||
|
worker.apply_channel_filters(["yes.1", "no.1"]),
|
||||||
|
["yes.1"],
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
worker.apply_channel_filters(["yes.1", "no.1", "maybe.2", "yes"]),
|
||||||
|
["yes.1", "maybe.2"],
|
||||||
|
)
|
||||||
|
# Exclude
|
||||||
|
worker = Worker(None, exclude_channels=["no.*", "maybe.*"])
|
||||||
|
self.assertEqual(
|
||||||
|
worker.apply_channel_filters(["yes.1", "no.1", "maybe.2", "yes"]),
|
||||||
|
["yes.1", "yes"],
|
||||||
|
)
|
||||||
|
# Both
|
||||||
|
worker = Worker(None, exclude_channels=["no.*"], only_channels=["yes.*"])
|
||||||
|
self.assertEqual(
|
||||||
|
worker.apply_channel_filters(["yes.1", "no.1", "maybe.2", "yes"]),
|
||||||
|
["yes.1"],
|
||||||
|
)
|
|
@ -1,5 +1,6 @@
|
||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import fnmatch
|
||||||
import logging
|
import logging
|
||||||
import signal
|
import signal
|
||||||
import sys
|
import sys
|
||||||
|
@ -18,11 +19,14 @@ class Worker(object):
|
||||||
and runs their consumers.
|
and runs their consumers.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, channel_layer, callback=None, message_retries=10, signal_handlers=True):
|
def __init__(self, channel_layer, callback=None, message_retries=10, signal_handlers=True,
|
||||||
|
only_channels=None, exclude_channels=None):
|
||||||
self.channel_layer = channel_layer
|
self.channel_layer = channel_layer
|
||||||
self.callback = callback
|
self.callback = callback
|
||||||
self.message_retries = message_retries
|
self.message_retries = message_retries
|
||||||
self.signal_handlers = signal_handlers
|
self.signal_handlers = signal_handlers
|
||||||
|
self.only_channels = only_channels
|
||||||
|
self.exclude_channels = exclude_channels
|
||||||
self.termed = False
|
self.termed = False
|
||||||
self.in_job = False
|
self.in_job = False
|
||||||
|
|
||||||
|
@ -38,13 +42,30 @@ class Worker(object):
|
||||||
logger.info("Shutdown signal received while idle, terminating immediately")
|
logger.info("Shutdown signal received while idle, terminating immediately")
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
|
def apply_channel_filters(self, channels):
|
||||||
|
"""
|
||||||
|
Applies our include and exclude filters to the channel list and returns it
|
||||||
|
"""
|
||||||
|
if self.only_channels:
|
||||||
|
channels = [
|
||||||
|
channel for channel in channels
|
||||||
|
if any(fnmatch.fnmatchcase(channel, pattern) for pattern in self.only_channels)
|
||||||
|
]
|
||||||
|
if self.exclude_channels:
|
||||||
|
channels = [
|
||||||
|
channel for channel in channels
|
||||||
|
if not any(fnmatch.fnmatchcase(channel, pattern) for pattern in self.exclude_channels)
|
||||||
|
]
|
||||||
|
return channels
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""
|
"""
|
||||||
Tries to continually dispatch messages to consumers.
|
Tries to continually dispatch messages to consumers.
|
||||||
"""
|
"""
|
||||||
if self.signal_handlers:
|
if self.signal_handlers:
|
||||||
self.install_signal_handler()
|
self.install_signal_handler()
|
||||||
channels = self.channel_layer.router.channels
|
channels = self.apply_channel_filters(self.channel_layer.router.channels)
|
||||||
|
logger.info("Listening on channels %s", ", ".join(channels))
|
||||||
while not self.termed:
|
while not self.termed:
|
||||||
self.in_job = False
|
self.in_job = False
|
||||||
channel, content = self.channel_layer.receive_many(channels, block=True)
|
channel, content = self.channel_layer.receive_many(channels, block=True)
|
||||||
|
|
|
@ -80,7 +80,21 @@ requests will take longer and longer to return as the messages queue up
|
||||||
(until the expiry limit is reached, at which point HTTP connections will
|
(until the expiry limit is reached, at which point HTTP connections will
|
||||||
start dropping).
|
start dropping).
|
||||||
|
|
||||||
TODO: We should probably ship some kind of latency measuring tooling.
|
In a more complex project, you won't want all your channels being served by the
|
||||||
|
same workers, especially if you have long-running tasks (if you serve them from
|
||||||
|
the same workers as HTTP requests, there's a chance long-running tasks could
|
||||||
|
block up all the workers and delay responding to HTTP requests).
|
||||||
|
|
||||||
|
To manage this, it's possible to tell workers to either limit themselves to
|
||||||
|
just certain channel names or ignore sepcific channels using the
|
||||||
|
``--only-channels`` and ``--exclude-channels`` options. Here's an example
|
||||||
|
of configuring a worker to only serve HTTP and WebSocket requests::
|
||||||
|
|
||||||
|
python manage.py runworker --only-channels=http.* --only-channels=websocket.*
|
||||||
|
|
||||||
|
Or telling a worker to ignore all messages on the "thumbnail" channel::
|
||||||
|
|
||||||
|
python manage.py runworker --exclude-channels=thumbnail
|
||||||
|
|
||||||
|
|
||||||
Run interface servers
|
Run interface servers
|
||||||
|
|
Loading…
Reference in New Issue
Block a user