Replace multiplexer with class demultiplexer

Update documentation
Ensure send is not available on demultiplexed consumer classes
Data binding needs fixing
This commit is contained in:
raphael.boucher 2016-12-11 14:21:59 +01:00 committed by Andrew Godwin
parent 21b08b01b8
commit 33dbc4a184
5 changed files with 105 additions and 92 deletions

View File

@ -3,7 +3,7 @@ import json
from django.core import serializers from django.core import serializers
from django.core.serializers.json import DjangoJSONEncoder from django.core.serializers.json import DjangoJSONEncoder
from ..generic.websockets import WebsocketDemultiplexer from ..generic.websockets import WebsocketMultiplexer
from ..sessions import enforce_ordering from ..sessions import enforce_ordering
from .base import Binding from .base import Binding
@ -40,7 +40,7 @@ class WebsocketBinding(Binding):
# Outbound # Outbound
@classmethod @classmethod
def encode(cls, stream, payload): def encode(cls, stream, payload):
return WebsocketDemultiplexer.encode(stream, payload) return WebsocketMultiplexer.encode(stream, payload)
def serialize(self, instance, action): def serialize(self, instance, action):
payload = { payload = {

View File

@ -33,7 +33,14 @@ class RequestAborted(Exception):
class DenyConnection(Exception): class DenyConnection(Exception):
""" """
Raise during a websocket.connect (or other supported connection) handler Raised during a websocket.connect (or other supported connection) handler
to deny the connection. to deny the connection.
""" """
pass pass
class SendNotAvailableOnDemultiplexer(Exception):
"""
Raised when trying to send with a WebsocketDemultiplexer. Use the multiplexer instead.
"""
pass

View File

@ -1,8 +1,9 @@
from django.core.serializers.json import DjangoJSONEncoder, json from django.core.serializers.json import DjangoJSONEncoder, json
from ..auth import channel_session_user_from_http from ..auth import channel_session_user_from_http
from ..channel import Channel, Group from ..channel import Group
from ..sessions import enforce_ordering from ..sessions import enforce_ordering
from ..exceptions import SendNotAvailableOnDemultiplexer
from .base import BaseConsumer from .base import BaseConsumer
@ -176,67 +177,16 @@ class WebsocketDemultiplexer(JsonWebsocketConsumer):
in a sub-dict called "payload". This lets you run multiple streams over in a sub-dict called "payload". This lets you run multiple streams over
a single WebSocket connection in a standardised way. a single WebSocket connection in a standardised way.
Incoming messages on streams are mapped into a custom channel so you can Incoming messages on streams are dispatched to consumers so you can
just tie in consumers the normal way. The reply_channels are kept so just tie in consumers the normal way. The reply_channels are kept so
sessions/auth continue to work. Payloads must be a dict at the top level, sessions/auth continue to work. Payloads must be a dict at the top level,
so they fulfill the Channels message spec. so they fulfill the Channels message spec.
Set a mapping from streams to channels in the "mapping" key. We make you To answer with a multiplexed message, a multiplexer object
whitelist channels like this to allow different namespaces and for security with "send" and "group_send" methods is forwarded to the consumer as a kwargs
reasons (imagine if someone could inject straight into websocket.receive). "multiplexer".
"""
mapping = {} Set a mapping of streams to consumer classes in the "consumers" keyword.
def receive(self, content, **kwargs):
# Check the frame looks good
if isinstance(content, dict) and "stream" in content and "payload" in content:
# Match it to a channel
stream = content['stream']
if stream in self.mapping:
# Extract payload and add in reply_channel
payload = content['payload']
if not isinstance(payload, dict):
raise ValueError("Multiplexed frame payload is not a dict")
payload['reply_channel'] = self.message['reply_channel']
# Send it onto the new channel
Channel(self.mapping[stream]).send(payload)
else:
raise ValueError("Invalid multiplexed frame received (stream not mapped)")
else:
raise ValueError("Invalid multiplexed frame received (no channel/payload key)")
def send(self, stream, payload):
self.message.reply_channel.send(self.encode(stream, payload))
@classmethod
def group_send(cls, name, stream, payload, close=False):
message = cls.encode(stream, payload)
if close:
message["close"] = True
Group(name).send(message)
@classmethod
def encode(cls, stream, payload):
"""
Encodes stream + payload for outbound sending.
"""
return {"text": json.dumps({
"stream": stream,
"payload": payload,
}, cls=DjangoJSONEncoder)}
class WebsocketConsumerDemultiplexer(WebsocketDemultiplexer):
"""
Demultiplexer but for consumer classes.
Set a mapping of streams to consumer classes in the dict "consumers".
The demultiplexer dispatch the payload of incoming messages to the corresponding
consumers. The demultiplexer is forwarded to the consumer as a kwargs "demultiplexer".
This allows the consumer to answer with a multiplexed message using a send method
from the demultiplexer.
""" """
# Put your JSON consumers here: {stream_name : consumer} # Put your JSON consumers here: {stream_name : consumer}
@ -256,7 +206,10 @@ class WebsocketConsumerDemultiplexer(WebsocketDemultiplexer):
# The json consumer expects serialized JSON # The json consumer expects serialized JSON
self.message.content['text'] = json.dumps(payload) self.message.content['text'] = json.dumps(payload)
# Send demultiplexer to the consumer, to be able to answer # Send demultiplexer to the consumer, to be able to answer
kwargs['multiplexer'] = Multiplexer(stream, self) kwargs['multiplexer'] = WebsocketMultiplexer(stream, self.message.reply_channel)
# Patch send to avoid sending not formated messages from the consumer
consumer.send = self.send
# Dispatch message
consumer(self.message, **kwargs) consumer(self.message, **kwargs)
return return
@ -267,35 +220,55 @@ class WebsocketConsumerDemultiplexer(WebsocketDemultiplexer):
def connect(self, message, **kwargs): def connect(self, message, **kwargs):
"""Forward connection to all consumers.""" """Forward connection to all consumers."""
for stream, consumer in self.consumers.items(): for stream, consumer in self.consumers.items():
kwargs['multiplexer'] = Multiplexer(stream, self) kwargs['multiplexer'] = WebsocketMultiplexer(stream, self.message.reply_channel)
consumer(message, **kwargs) consumer(message, **kwargs)
def disconnect(self, message, **kwargs): def disconnect(self, message, **kwargs):
"""Forward disconnection to all consumers.""" """Forward disconnection to all consumers."""
for stream, consumer in self.consumers.items(): for stream, consumer in self.consumers.items():
kwargs['multiplexer'] = Multiplexer(stream, self) kwargs['multiplexer'] = WebsocketMultiplexer(stream, self.message.reply_channel)
consumer(message, **kwargs) consumer(message, **kwargs)
def send(self, *args):
raise SendNotAvailableOnDemultiplexer("Use multiplexer.send of the multiplexer kwarg.")
class Multiplexer(object): @classmethod
def group_send(cls, name, stream, payload, close=False):
raise SendNotAvailableOnDemultiplexer("Use WebsocketMultiplexer.group_send")
class WebsocketMultiplexer(object):
""" """
The opposite of the demultiplexer, to send a message though a multiplexed channel. The opposite of the demultiplexer, to send a message though a multiplexed channel.
The demultiplexer holds the mapping and the basic send function. The multiplexer object is passed as a kwargs to the consumer when the message is dispatched.
The multiplexer allows the consumer class to be independant of the stream name. This pattern allows the consumer class to be independant of the stream name.
""" """
stream = None stream = None
demultiplexer = None reply_channel = None
def __init__(self, stream, demultiplexer): def __init__(self, stream, reply_channel):
self.stream = stream self.stream = stream
self.demultiplexer = demultiplexer self.reply_channel = reply_channel
def send(self, payload): def send(self, payload):
"""Multiplex the payload using the stream name and send it.""" """Multiplex the payload using the stream name and send it."""
self.demultiplexer.send(self.stream, payload) self.reply_channel.send(self.encode(self.stream, payload))
def group_send(self, name, payload, close=False): @classmethod
"""Proxy that abstracts the stream name""" def encode(cls, stream, payload):
self.demultiplexer.group_send(name, self.stream, payload, close) """
Encodes stream + payload for outbound sending.
"""
return {"text": json.dumps({
"stream": stream,
"payload": payload,
}, cls=DjangoJSONEncoder)}
@classmethod
def group_send(cls, name, stream, payload, close=False):
message = WebsocketMultiplexer.encode(stream, payload)
if close:
message["close"] = True
Group(name).send(message)

View File

@ -5,6 +5,7 @@ import json
from django.test import override_settings from django.test import override_settings
from channels import route_class from channels import route_class
from channels.exceptions import SendNotAvailableOnDemultiplexer
from channels.generic import BaseConsumer, websockets from channels.generic import BaseConsumer, websockets
from channels.tests import ChannelTestCase, Client, apply_routes from channels.tests import ChannelTestCase, Client, apply_routes
@ -144,7 +145,7 @@ class GenericTests(ChannelTestCase):
def receive(self, content, multiplexer=None, **kwargs): def receive(self, content, multiplexer=None, **kwargs):
multiplexer.send(content) multiplexer.send(content)
class Demultiplexer(websockets.WebsocketConsumerDemultiplexer): class Demultiplexer(websockets.WebsocketDemultiplexer):
consumers = { consumers = {
"mystream": MyWebsocketConsumer "mystream": MyWebsocketConsumer
@ -185,3 +186,34 @@ class GenericTests(ChannelTestCase):
"payload": {"id": "1"}, "payload": {"id": "1"},
}) })
}) })
def test_websocket_demultiplexer_send(self):
class MyWebsocketConsumer(websockets.JsonWebsocketConsumer):
def receive(self, content, multiplexer=None, **kwargs):
import pdb; pdb.set_trace() # breakpoint 69f2473b //
self.send(content)
class Demultiplexer(websockets.WebsocketDemultiplexer):
consumers = {
"mystream": MyWebsocketConsumer
}
with apply_routes([
route_class(Demultiplexer, path='/path/(?P<id>\d+)'),
route_class(MyWebsocketConsumer),
]):
client = Client()
with self.assertRaises(SendNotAvailableOnDemultiplexer):
client.send_and_consume('websocket.receive', {
'path': '/path/1',
'text': json.dumps({
"stream": "mystream",
"payload": {"text_field": "mytext"}
})
})
client.receive()

View File

@ -181,16 +181,7 @@ WebSocket Multiplexing
---------------------- ----------------------
Channels provides a standard way to multiplex different data streams over Channels provides a standard way to multiplex different data streams over
a single WebSocket, called a ``Demultiplexer``. You use it like this:: a single WebSocket, called a ``Demultiplexer``.
from channels.generic.websockets import WebsocketDemultiplexer
class Demultiplexer(WebsocketDemultiplexer):
mapping = {
"intval": "binding.intval",
"stats": "internal.stats",
}
It expects JSON-formatted WebSocket frames with two keys, ``stream`` and It expects JSON-formatted WebSocket frames with two keys, ``stream`` and
``payload``, and will match the ``stream`` against the mapping to find a ``payload``, and will match the ``stream`` against the mapping to find a
@ -201,26 +192,36 @@ in the ``routing.py`` file, and use authentication decorators as you wish.
Example using class-based consumer:: Example using class-based consumer::
from channels.generic.websockets import WebsocketConsumerDemultiplexer, JsonWebsocketConsumer from channels.generic.websockets import WebsocketDemultiplexer, JsonWebsocketConsumer
class MyWebsocketConsumer(JsonWebsocketConsumer): class EchoConsumer(websockets.JsonWebsocketConsumer):
def connect(self, message, multiplexer=None, **kwargs): def connect(self, message, multiplexer=None, **kwargs):
# Send data with the multiplexer
multiplexer.send({"status": "I just connected!"}) multiplexer.send({"status": "I just connected!"})
def disconnect(self, message, multiplexer=None, **kwargs): def disconnect(self, message, multiplexer=None, **kwargs):
print(multiplexer.stream) print("Stream %s is closed" % multiplexer.stream)
def receive(self, content, multiplexer=None, **kwargs): def receive(self, content, multiplexer=None, **kwargs):
# simple echo # Simple echo
multiplexer.send(content) multiplexer.send({"original_message": content})
class Demultiplexer(WebsocketConsumerDemultiplexer):
# Put your JSON consumers here: {stream_name : consumer} class AnotherConsumer(websockets.JsonWebsocketConsumer):
def receive(self, content, multiplexer=None, **kwargs):
# Some other actions here
pass
class Demultiplexer(WebsocketDemultiplexer):
# Wire your JSON consumers here: {stream_name : consumer}
consumers = { consumers = {
"mystream": MyWebsocketConsumer "echo": EchoConsumer,
"other": AnotherConsumer,
} }
The ``multiplexer`` allows the consumer class to be independant of the stream name. The ``multiplexer`` allows the consumer class to be independant of the stream name.
It holds the stream name and the demultiplexer on the attributes ``stream`` and ``demultiplexer``. It holds the stream name and the demultiplexer on the attributes ``stream`` and ``demultiplexer``.