mirror of
https://github.com/django/daphne.git
synced 2025-07-10 16:02:18 +03:00
Add demultiplexer for class-based consumers (#383)
Avoid coupling between the demultiplexer and consumers.
This commit is contained in:
parent
28666f26cf
commit
21b08b01b8
|
@ -225,3 +225,77 @@ class WebsocketDemultiplexer(JsonWebsocketConsumer):
|
||||||
"stream": stream,
|
"stream": stream,
|
||||||
"payload": payload,
|
"payload": payload,
|
||||||
}, cls=DjangoJSONEncoder)}
|
}, cls=DjangoJSONEncoder)}
|
||||||
|
|
||||||
|
|
||||||
|
class WebsocketConsumerDemultiplexer(WebsocketDemultiplexer):
|
||||||
|
"""
|
||||||
|
Demultiplexer but for consumer classes.
|
||||||
|
|
||||||
|
Set a mapping of streams to consumer classes in the dict "consumers".
|
||||||
|
|
||||||
|
The demultiplexer dispatch the payload of incoming messages to the corresponding
|
||||||
|
consumers. The demultiplexer is forwarded to the consumer as a kwargs "demultiplexer".
|
||||||
|
This allows the consumer to answer with a multiplexed message using a send method
|
||||||
|
from the demultiplexer.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Put your JSON consumers here: {stream_name : consumer}
|
||||||
|
consumers = {}
|
||||||
|
|
||||||
|
def receive(self, content, **kwargs):
|
||||||
|
"""Forward messages to all consumers."""
|
||||||
|
# Check the frame looks good
|
||||||
|
if isinstance(content, dict) and "stream" in content and "payload" in content:
|
||||||
|
# Match it to a channel
|
||||||
|
for stream, consumer in self.consumers.items():
|
||||||
|
if stream == content['stream']:
|
||||||
|
# Extract payload and add in reply_channel
|
||||||
|
payload = content['payload']
|
||||||
|
if not isinstance(payload, dict):
|
||||||
|
raise ValueError("Multiplexed frame payload is not a dict")
|
||||||
|
# The json consumer expects serialized JSON
|
||||||
|
self.message.content['text'] = json.dumps(payload)
|
||||||
|
# Send demultiplexer to the consumer, to be able to answer
|
||||||
|
kwargs['multiplexer'] = Multiplexer(stream, self)
|
||||||
|
consumer(self.message, **kwargs)
|
||||||
|
return
|
||||||
|
|
||||||
|
raise ValueError("Invalid multiplexed frame received (stream not mapped)")
|
||||||
|
else:
|
||||||
|
raise ValueError("Invalid multiplexed **frame received (no channel/payload key)")
|
||||||
|
|
||||||
|
def connect(self, message, **kwargs):
|
||||||
|
"""Forward connection to all consumers."""
|
||||||
|
for stream, consumer in self.consumers.items():
|
||||||
|
kwargs['multiplexer'] = Multiplexer(stream, self)
|
||||||
|
consumer(message, **kwargs)
|
||||||
|
|
||||||
|
def disconnect(self, message, **kwargs):
|
||||||
|
"""Forward disconnection to all consumers."""
|
||||||
|
for stream, consumer in self.consumers.items():
|
||||||
|
kwargs['multiplexer'] = Multiplexer(stream, self)
|
||||||
|
consumer(message, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class Multiplexer(object):
|
||||||
|
"""
|
||||||
|
The opposite of the demultiplexer, to send a message though a multiplexed channel.
|
||||||
|
|
||||||
|
The demultiplexer holds the mapping and the basic send function.
|
||||||
|
The multiplexer allows the consumer class to be independant of the stream name.
|
||||||
|
"""
|
||||||
|
|
||||||
|
stream = None
|
||||||
|
demultiplexer = None
|
||||||
|
|
||||||
|
def __init__(self, stream, demultiplexer):
|
||||||
|
self.stream = stream
|
||||||
|
self.demultiplexer = demultiplexer
|
||||||
|
|
||||||
|
def send(self, payload):
|
||||||
|
"""Multiplex the payload using the stream name and send it."""
|
||||||
|
self.demultiplexer.send(self.stream, payload)
|
||||||
|
|
||||||
|
def group_send(self, name, payload, close=False):
|
||||||
|
"""Proxy that abstracts the stream name"""
|
||||||
|
self.demultiplexer.group_send(name, self.stream, payload, close)
|
||||||
|
|
|
@ -105,6 +105,13 @@ class Client(object):
|
||||||
return
|
return
|
||||||
return Message(content, recv_channel, channel_layers[self.alias])
|
return Message(content, recv_channel, channel_layers[self.alias])
|
||||||
|
|
||||||
|
def get_consumer_by_channel(self, channel):
|
||||||
|
message = Message({'text': ''}, channel, self.channel_layer)
|
||||||
|
match = self.channel_layer.router.match(message)
|
||||||
|
if match:
|
||||||
|
consumer, kwargs = match
|
||||||
|
return consumer
|
||||||
|
|
||||||
def send(self, to, content={}):
|
def send(self, to, content={}):
|
||||||
"""
|
"""
|
||||||
Send a message to a channel.
|
Send a message to a channel.
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
from django.test import override_settings
|
from django.test import override_settings
|
||||||
|
|
||||||
from channels import route_class
|
from channels import route_class
|
||||||
|
@ -129,3 +131,57 @@ class GenericTests(ChannelTestCase):
|
||||||
|
|
||||||
client.send_and_consume('mychannel', {'name': 'filter'})
|
client.send_and_consume('mychannel', {'name': 'filter'})
|
||||||
self.assertEqual(client.receive(), {'trigger': 'from_as_route'})
|
self.assertEqual(client.receive(), {'trigger': 'from_as_route'})
|
||||||
|
|
||||||
|
def test_websockets_demultiplexer(self):
|
||||||
|
|
||||||
|
class MyWebsocketConsumer(websockets.JsonWebsocketConsumer):
|
||||||
|
def connect(self, message, multiplexer=None, **kwargs):
|
||||||
|
multiplexer.send(kwargs)
|
||||||
|
|
||||||
|
def disconnect(self, message, multiplexer=None, **kwargs):
|
||||||
|
multiplexer.send(kwargs)
|
||||||
|
|
||||||
|
def receive(self, content, multiplexer=None, **kwargs):
|
||||||
|
multiplexer.send(content)
|
||||||
|
|
||||||
|
class Demultiplexer(websockets.WebsocketConsumerDemultiplexer):
|
||||||
|
|
||||||
|
consumers = {
|
||||||
|
"mystream": MyWebsocketConsumer
|
||||||
|
}
|
||||||
|
|
||||||
|
with apply_routes([
|
||||||
|
route_class(Demultiplexer, path='/path/(?P<id>\d+)'),
|
||||||
|
route_class(MyWebsocketConsumer),
|
||||||
|
]):
|
||||||
|
client = Client()
|
||||||
|
|
||||||
|
client.send_and_consume('websocket.connect', {'path': '/path/1'})
|
||||||
|
self.assertEqual(client.receive(), {
|
||||||
|
"text": json.dumps({
|
||||||
|
"stream": "mystream",
|
||||||
|
"payload": {"id": "1"},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
client.send_and_consume('websocket.receive', {
|
||||||
|
'path': '/path/1',
|
||||||
|
'text': json.dumps({
|
||||||
|
"stream": "mystream",
|
||||||
|
"payload": {"text_field": "mytext"}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
self.assertEqual(client.receive(), {
|
||||||
|
"text": json.dumps({
|
||||||
|
"stream": "mystream",
|
||||||
|
"payload": {"text_field": "mytext"},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
client.send_and_consume('websocket.disconnect', {'path': '/path/1'})
|
||||||
|
self.assertEqual(client.receive(), {
|
||||||
|
"text": json.dumps({
|
||||||
|
"stream": "mystream",
|
||||||
|
"payload": {"id": "1"},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
|
@ -198,11 +198,31 @@ channel name. It will then forward the message onto that channel while
|
||||||
preserving ``reply_channel``, so you can hook consumers up to them directly
|
preserving ``reply_channel``, so you can hook consumers up to them directly
|
||||||
in the ``routing.py`` file, and use authentication decorators as you wish.
|
in the ``routing.py`` file, and use authentication decorators as you wish.
|
||||||
|
|
||||||
You cannot use class-based consumers this way as the messages are no
|
|
||||||
longer in WebSocket format, though. If you need to do operations on
|
Example using class-based consumer::
|
||||||
``connect`` or ``disconnect``, override those methods on the ``Demultiplexer``
|
|
||||||
itself (you can also provide a ``connection_groups`` method, as it's just
|
from channels.generic.websockets import WebsocketConsumerDemultiplexer, JsonWebsocketConsumer
|
||||||
based on the JSON WebSocket generic consumer).
|
|
||||||
|
class MyWebsocketConsumer(JsonWebsocketConsumer):
|
||||||
|
def connect(self, message, multiplexer=None, **kwargs):
|
||||||
|
multiplexer.send({"status": "I just connected!"})
|
||||||
|
|
||||||
|
def disconnect(self, message, multiplexer=None, **kwargs):
|
||||||
|
print(multiplexer.stream)
|
||||||
|
|
||||||
|
def receive(self, content, multiplexer=None, **kwargs):
|
||||||
|
# simple echo
|
||||||
|
multiplexer.send(content)
|
||||||
|
|
||||||
|
class Demultiplexer(WebsocketConsumerDemultiplexer):
|
||||||
|
|
||||||
|
# Put your JSON consumers here: {stream_name : consumer}
|
||||||
|
consumers = {
|
||||||
|
"mystream": MyWebsocketConsumer
|
||||||
|
}
|
||||||
|
|
||||||
|
The ``multiplexer`` allows the consumer class to be independant of the stream name.
|
||||||
|
It holds the stream name and the demultiplexer on the attributes ``stream`` and ``demultiplexer``.
|
||||||
|
|
||||||
The :doc:`data binding <binding>` code will also send out messages to clients
|
The :doc:`data binding <binding>` code will also send out messages to clients
|
||||||
in the same format, and you can encode things in this format yourself by
|
in the same format, and you can encode things in this format yourself by
|
||||||
|
|
Loading…
Reference in New Issue
Block a user