mirror of
https://github.com/django/daphne.git
synced 2025-07-10 16:02:18 +03:00
Remove middleware approach, change to simpler one
This commit is contained in:
parent
0fcb93acc2
commit
db0d2975a0
|
@ -3,7 +3,6 @@ from __future__ import unicode_literals
|
||||||
from django.utils import six
|
from django.utils import six
|
||||||
|
|
||||||
from channels import DEFAULT_CHANNEL_LAYER, channel_layers
|
from channels import DEFAULT_CHANNEL_LAYER, channel_layers
|
||||||
from .signals import message_sent
|
|
||||||
|
|
||||||
|
|
||||||
class Channel(object):
|
class Channel(object):
|
||||||
|
@ -37,7 +36,6 @@ class Channel(object):
|
||||||
if not isinstance(content, dict):
|
if not isinstance(content, dict):
|
||||||
raise TypeError("You can only send dicts as content on channels.")
|
raise TypeError("You can only send dicts as content on channels.")
|
||||||
self.channel_layer.send(self.name, content)
|
self.channel_layer.send(self.name, content)
|
||||||
message_sent.send(sender=self.__class__, channel=self.name, keys=list(content.keys()))
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.name
|
return self.name
|
||||||
|
|
|
@ -1,91 +0,0 @@
|
||||||
from __future__ import unicode_literals
|
|
||||||
|
|
||||||
import importlib
|
|
||||||
import threading
|
|
||||||
import warnings
|
|
||||||
from django.conf import settings
|
|
||||||
|
|
||||||
from .exceptions import DenyConnection
|
|
||||||
from .signals import consumer_started, consumer_finished, message_sent
|
|
||||||
|
|
||||||
|
|
||||||
class ConsumerMiddlewareRegistry(object):
|
|
||||||
"""
|
|
||||||
Handles registration (via settings object) and generation of consumer
|
|
||||||
middleware stacks
|
|
||||||
"""
|
|
||||||
|
|
||||||
fixed_middleware = ["channels.consumer_middleware.ConvenienceMiddleware"]
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
# Load middleware callables from settings
|
|
||||||
middleware_paths = self.fixed_middleware + getattr(settings, "CONSUMER_MIDDLEWARE", [])
|
|
||||||
self.middleware_instances = []
|
|
||||||
for path in middleware_paths:
|
|
||||||
module_name, variable_name = path.rsplit(".", 1)
|
|
||||||
try:
|
|
||||||
self.middleware_instances.append(getattr(importlib.import_module(module_name), variable_name))
|
|
||||||
except (ImportError, AttributeError) as e:
|
|
||||||
raise ImproperlyConfigured("Cannot import consumer middleware %r: %s" % (path, e))
|
|
||||||
|
|
||||||
def make_chain(self, consumer, kwargs):
|
|
||||||
"""
|
|
||||||
Returns an instantiated chain of middleware around a final consumer.
|
|
||||||
"""
|
|
||||||
next_layer = lambda message: consumer(message, **kwargs)
|
|
||||||
for middleware_instance in reversed(self.middleware_instances):
|
|
||||||
next_layer = middleware_instance(next_layer)
|
|
||||||
return next_layer
|
|
||||||
|
|
||||||
|
|
||||||
class ConvenienceMiddleware(object):
|
|
||||||
"""
|
|
||||||
Standard middleware which papers over some more explicit parts of ASGI.
|
|
||||||
"""
|
|
||||||
|
|
||||||
runtime_data = threading.local()
|
|
||||||
|
|
||||||
def __init__(self, consumer):
|
|
||||||
self.consumer = consumer
|
|
||||||
|
|
||||||
def __call__(self, message):
|
|
||||||
if message.channel.name == "websocket.connect":
|
|
||||||
# Websocket connect acceptance helper
|
|
||||||
try:
|
|
||||||
self.consumer(message)
|
|
||||||
except DenyConnection:
|
|
||||||
message.reply_channel.send({"accept": False})
|
|
||||||
else:
|
|
||||||
replies_sent = [msg for chan, msg in self.get_messages() if chan == message.reply_channel.name]
|
|
||||||
# If they sent no replies, send implicit acceptance
|
|
||||||
if not replies_sent:
|
|
||||||
warnings.warn("AAAAAAAAAAA", RuntimeWarning)
|
|
||||||
message.reply_channel.send({"accept": True})
|
|
||||||
else:
|
|
||||||
# General path
|
|
||||||
return self.consumer(message)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def reset_messages(cls, **kwargs):
|
|
||||||
"""
|
|
||||||
Tied to the consumer started/ended signal to reset the messages list.
|
|
||||||
"""
|
|
||||||
cls.runtime_data.sent_messages = []
|
|
||||||
|
|
||||||
consumer_started.connect(lambda **kwargs: ConvenienceMiddleware.reset_messages(), weak=False)
|
|
||||||
consumer_finished.connect(lambda **kwargs: ConvenienceMiddleware.reset_messages(), weak=False)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def sent_message(cls, channel, keys, **kwargs):
|
|
||||||
"""
|
|
||||||
Called by message sending interfaces when messages are sent,
|
|
||||||
for convenience errors only. Should not be relied upon to get
|
|
||||||
all messages.
|
|
||||||
"""
|
|
||||||
cls.runtime_data.sent_messages = getattr(cls.runtime_data, "sent_messages", []) + [(channel, keys)]
|
|
||||||
|
|
||||||
message_sent.connect(lambda channel, keys, **kwargs: ConvenienceMiddleware.sent_message(channel, keys), weak=False)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_messages(cls):
|
|
||||||
return getattr(cls.runtime_data, "sent_messages", [])
|
|
|
@ -7,9 +7,5 @@ consumer_finished = Signal()
|
||||||
worker_ready = Signal()
|
worker_ready = Signal()
|
||||||
worker_process_ready = Signal()
|
worker_process_ready = Signal()
|
||||||
|
|
||||||
# Called when a message is sent directly to a channel. Not called for group
|
|
||||||
# sends or direct ASGI usage. For convenience/nicer errors only.
|
|
||||||
message_sent = Signal(providing_args=["channel", "keys"])
|
|
||||||
|
|
||||||
# Connect connection closer to consumer finished as well
|
# Connect connection closer to consumer finished as well
|
||||||
consumer_finished.connect(close_old_connections)
|
consumer_finished.connect(close_old_connections)
|
||||||
|
|
|
@ -9,11 +9,10 @@ import multiprocessing
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
from .signals import consumer_started, consumer_finished
|
from .signals import consumer_started, consumer_finished
|
||||||
from .exceptions import ConsumeLater
|
from .exceptions import ConsumeLater, DenyConnection
|
||||||
from .message import Message
|
from .message import Message
|
||||||
from .utils import name_that_thing
|
from .utils import name_that_thing
|
||||||
from .signals import worker_ready
|
from .signals import worker_ready
|
||||||
from .consumer_middleware import ConsumerMiddlewareRegistry
|
|
||||||
|
|
||||||
logger = logging.getLogger('django.channels')
|
logger = logging.getLogger('django.channels')
|
||||||
|
|
||||||
|
@ -41,7 +40,6 @@ class Worker(object):
|
||||||
self.exclude_channels = exclude_channels
|
self.exclude_channels = exclude_channels
|
||||||
self.termed = False
|
self.termed = False
|
||||||
self.in_job = False
|
self.in_job = False
|
||||||
self.middleware_registry = ConsumerMiddlewareRegistry()
|
|
||||||
|
|
||||||
def install_signal_handler(self):
|
def install_signal_handler(self):
|
||||||
signal.signal(signal.SIGTERM, self.sigterm_handler)
|
signal.signal(signal.SIGTERM, self.sigterm_handler)
|
||||||
|
@ -119,8 +117,12 @@ class Worker(object):
|
||||||
# Send consumer started to manage lifecycle stuff
|
# Send consumer started to manage lifecycle stuff
|
||||||
consumer_started.send(sender=self.__class__, environ={})
|
consumer_started.send(sender=self.__class__, environ={})
|
||||||
# Run consumer
|
# Run consumer
|
||||||
chain = self.middleware_registry.make_chain(consumer, kwargs)
|
consumer(message, **kwargs)
|
||||||
chain(message)
|
except DenyConnection:
|
||||||
|
# They want to deny a WebSocket connection.
|
||||||
|
if message.channel.name != "websocket.connect":
|
||||||
|
raise ValueError("You cannot DenyConnection from a non-websocket.connect handler.")
|
||||||
|
message.reply_channel.send({"accept": False})
|
||||||
except ConsumeLater:
|
except ConsumeLater:
|
||||||
# They want to not handle it yet. Re-inject it with a number-of-tries marker.
|
# They want to not handle it yet. Re-inject it with a number-of-tries marker.
|
||||||
content['__retries__'] = content.get("__retries__", 0) + 1
|
content['__retries__'] = content.get("__retries__", 0) + 1
|
||||||
|
|
|
@ -790,6 +790,9 @@ is received to say if the connection should be accepted or dropped.
|
||||||
|
|
||||||
Behaviour on WebSocket rejection is defined in the Connection section above.
|
Behaviour on WebSocket rejection is defined in the Connection section above.
|
||||||
|
|
||||||
|
If received while the socket is already accepted, the protocol server should
|
||||||
|
log an error, but not do anything.
|
||||||
|
|
||||||
Channel: ``websocket.send!``
|
Channel: ``websocket.send!``
|
||||||
|
|
||||||
Keys:
|
Keys:
|
||||||
|
|
Loading…
Reference in New Issue
Block a user