diff --git a/channels/binding/websockets.py b/channels/binding/websockets.py index b3e4dfe..c3043f0 100644 --- a/channels/binding/websockets.py +++ b/channels/binding/websockets.py @@ -3,7 +3,7 @@ import json from django.core import serializers from django.core.serializers.json import DjangoJSONEncoder -from ..generic.websockets import WebsocketDemultiplexer +from ..generic.websockets import WebsocketMultiplexer from ..sessions import enforce_ordering from .base import Binding @@ -40,7 +40,7 @@ class WebsocketBinding(Binding): # Outbound @classmethod def encode(cls, stream, payload): - return WebsocketDemultiplexer.encode(stream, payload) + return WebsocketMultiplexer.encode(stream, payload) def serialize(self, instance, action): payload = { diff --git a/channels/exceptions.py b/channels/exceptions.py index d81af8b..afadf9e 100644 --- a/channels/exceptions.py +++ b/channels/exceptions.py @@ -33,7 +33,14 @@ class RequestAborted(Exception): class DenyConnection(Exception): """ - Raise during a websocket.connect (or other supported connection) handler + Raised during a websocket.connect (or other supported connection) handler to deny the connection. """ pass + + +class SendNotAvailableOnDemultiplexer(Exception): + """ + Raised when trying to send with a WebsocketDemultiplexer. Use the multiplexer instead. + """ + pass diff --git a/channels/generic/websockets.py b/channels/generic/websockets.py index 3a7a037..e3a2742 100644 --- a/channels/generic/websockets.py +++ b/channels/generic/websockets.py @@ -1,8 +1,9 @@ from django.core.serializers.json import DjangoJSONEncoder, json from ..auth import channel_session_user_from_http -from ..channel import Channel, Group +from ..channel import Group from ..sessions import enforce_ordering +from ..exceptions import SendNotAvailableOnDemultiplexer from .base import BaseConsumer @@ -176,67 +177,16 @@ class WebsocketDemultiplexer(JsonWebsocketConsumer): in a sub-dict called "payload". This lets you run multiple streams over a single WebSocket connection in a standardised way. - Incoming messages on streams are mapped into a custom channel so you can + Incoming messages on streams are dispatched to consumers so you can just tie in consumers the normal way. The reply_channels are kept so sessions/auth continue to work. Payloads must be a dict at the top level, so they fulfill the Channels message spec. - Set a mapping from streams to channels in the "mapping" key. We make you - whitelist channels like this to allow different namespaces and for security - reasons (imagine if someone could inject straight into websocket.receive). - """ + To answer with a multiplexed message, a multiplexer object + with "send" and "group_send" methods is forwarded to the consumer as a kwargs + "multiplexer". - mapping = {} - - def receive(self, content, **kwargs): - # Check the frame looks good - if isinstance(content, dict) and "stream" in content and "payload" in content: - # Match it to a channel - stream = content['stream'] - if stream in self.mapping: - # Extract payload and add in reply_channel - payload = content['payload'] - if not isinstance(payload, dict): - raise ValueError("Multiplexed frame payload is not a dict") - payload['reply_channel'] = self.message['reply_channel'] - # Send it onto the new channel - Channel(self.mapping[stream]).send(payload) - else: - raise ValueError("Invalid multiplexed frame received (stream not mapped)") - else: - raise ValueError("Invalid multiplexed frame received (no channel/payload key)") - - def send(self, stream, payload): - self.message.reply_channel.send(self.encode(stream, payload)) - - @classmethod - def group_send(cls, name, stream, payload, close=False): - message = cls.encode(stream, payload) - if close: - message["close"] = True - Group(name).send(message) - - @classmethod - def encode(cls, stream, payload): - """ - Encodes stream + payload for outbound sending. - """ - return {"text": json.dumps({ - "stream": stream, - "payload": payload, - }, cls=DjangoJSONEncoder)} - - -class WebsocketConsumerDemultiplexer(WebsocketDemultiplexer): - """ - Demultiplexer but for consumer classes. - - Set a mapping of streams to consumer classes in the dict "consumers". - - The demultiplexer dispatch the payload of incoming messages to the corresponding - consumers. The demultiplexer is forwarded to the consumer as a kwargs "demultiplexer". - This allows the consumer to answer with a multiplexed message using a send method - from the demultiplexer. + Set a mapping of streams to consumer classes in the "consumers" keyword. """ # Put your JSON consumers here: {stream_name : consumer} @@ -256,7 +206,10 @@ class WebsocketConsumerDemultiplexer(WebsocketDemultiplexer): # The json consumer expects serialized JSON self.message.content['text'] = json.dumps(payload) # Send demultiplexer to the consumer, to be able to answer - kwargs['multiplexer'] = Multiplexer(stream, self) + kwargs['multiplexer'] = WebsocketMultiplexer(stream, self.message.reply_channel) + # Patch send to avoid sending not formated messages from the consumer + consumer.send = self.send + # Dispatch message consumer(self.message, **kwargs) return @@ -267,35 +220,55 @@ class WebsocketConsumerDemultiplexer(WebsocketDemultiplexer): def connect(self, message, **kwargs): """Forward connection to all consumers.""" for stream, consumer in self.consumers.items(): - kwargs['multiplexer'] = Multiplexer(stream, self) + kwargs['multiplexer'] = WebsocketMultiplexer(stream, self.message.reply_channel) consumer(message, **kwargs) def disconnect(self, message, **kwargs): """Forward disconnection to all consumers.""" for stream, consumer in self.consumers.items(): - kwargs['multiplexer'] = Multiplexer(stream, self) + kwargs['multiplexer'] = WebsocketMultiplexer(stream, self.message.reply_channel) consumer(message, **kwargs) + def send(self, *args): + raise SendNotAvailableOnDemultiplexer("Use multiplexer.send of the multiplexer kwarg.") -class Multiplexer(object): + @classmethod + def group_send(cls, name, stream, payload, close=False): + raise SendNotAvailableOnDemultiplexer("Use WebsocketMultiplexer.group_send") + + +class WebsocketMultiplexer(object): """ The opposite of the demultiplexer, to send a message though a multiplexed channel. - The demultiplexer holds the mapping and the basic send function. - The multiplexer allows the consumer class to be independant of the stream name. + The multiplexer object is passed as a kwargs to the consumer when the message is dispatched. + This pattern allows the consumer class to be independant of the stream name. """ stream = None - demultiplexer = None + reply_channel = None - def __init__(self, stream, demultiplexer): + def __init__(self, stream, reply_channel): self.stream = stream - self.demultiplexer = demultiplexer + self.reply_channel = reply_channel def send(self, payload): """Multiplex the payload using the stream name and send it.""" - self.demultiplexer.send(self.stream, payload) + self.reply_channel.send(self.encode(self.stream, payload)) - def group_send(self, name, payload, close=False): - """Proxy that abstracts the stream name""" - self.demultiplexer.group_send(name, self.stream, payload, close) + @classmethod + def encode(cls, stream, payload): + """ + Encodes stream + payload for outbound sending. + """ + return {"text": json.dumps({ + "stream": stream, + "payload": payload, + }, cls=DjangoJSONEncoder)} + + @classmethod + def group_send(cls, name, stream, payload, close=False): + message = WebsocketMultiplexer.encode(stream, payload) + if close: + message["close"] = True + Group(name).send(message) diff --git a/channels/tests/test_generic.py b/channels/tests/test_generic.py index ba33cc6..6aabbdd 100644 --- a/channels/tests/test_generic.py +++ b/channels/tests/test_generic.py @@ -5,6 +5,7 @@ import json from django.test import override_settings from channels import route_class +from channels.exceptions import SendNotAvailableOnDemultiplexer from channels.generic import BaseConsumer, websockets from channels.tests import ChannelTestCase, Client, apply_routes @@ -144,7 +145,7 @@ class GenericTests(ChannelTestCase): def receive(self, content, multiplexer=None, **kwargs): multiplexer.send(content) - class Demultiplexer(websockets.WebsocketConsumerDemultiplexer): + class Demultiplexer(websockets.WebsocketDemultiplexer): consumers = { "mystream": MyWebsocketConsumer @@ -185,3 +186,34 @@ class GenericTests(ChannelTestCase): "payload": {"id": "1"}, }) }) + + def test_websocket_demultiplexer_send(self): + + class MyWebsocketConsumer(websockets.JsonWebsocketConsumer): + def receive(self, content, multiplexer=None, **kwargs): + import pdb; pdb.set_trace() # breakpoint 69f2473b // + + self.send(content) + + class Demultiplexer(websockets.WebsocketDemultiplexer): + + consumers = { + "mystream": MyWebsocketConsumer + } + + with apply_routes([ + route_class(Demultiplexer, path='/path/(?P\d+)'), + route_class(MyWebsocketConsumer), + ]): + client = Client() + + with self.assertRaises(SendNotAvailableOnDemultiplexer): + client.send_and_consume('websocket.receive', { + 'path': '/path/1', + 'text': json.dumps({ + "stream": "mystream", + "payload": {"text_field": "mytext"} + }) + }) + + client.receive() diff --git a/docs/generics.rst b/docs/generics.rst index 4e9c18f..b871015 100644 --- a/docs/generics.rst +++ b/docs/generics.rst @@ -181,16 +181,7 @@ WebSocket Multiplexing ---------------------- Channels provides a standard way to multiplex different data streams over -a single WebSocket, called a ``Demultiplexer``. You use it like this:: - - from channels.generic.websockets import WebsocketDemultiplexer - - class Demultiplexer(WebsocketDemultiplexer): - - mapping = { - "intval": "binding.intval", - "stats": "internal.stats", - } +a single WebSocket, called a ``Demultiplexer``. It expects JSON-formatted WebSocket frames with two keys, ``stream`` and ``payload``, and will match the ``stream`` against the mapping to find a @@ -201,26 +192,36 @@ in the ``routing.py`` file, and use authentication decorators as you wish. Example using class-based consumer:: - from channels.generic.websockets import WebsocketConsumerDemultiplexer, JsonWebsocketConsumer + from channels.generic.websockets import WebsocketDemultiplexer, JsonWebsocketConsumer - class MyWebsocketConsumer(JsonWebsocketConsumer): + class EchoConsumer(websockets.JsonWebsocketConsumer): def connect(self, message, multiplexer=None, **kwargs): + # Send data with the multiplexer multiplexer.send({"status": "I just connected!"}) def disconnect(self, message, multiplexer=None, **kwargs): - print(multiplexer.stream) + print("Stream %s is closed" % multiplexer.stream) def receive(self, content, multiplexer=None, **kwargs): - # simple echo - multiplexer.send(content) + # Simple echo + multiplexer.send({"original_message": content}) - class Demultiplexer(WebsocketConsumerDemultiplexer): - # Put your JSON consumers here: {stream_name : consumer} + class AnotherConsumer(websockets.JsonWebsocketConsumer): + def receive(self, content, multiplexer=None, **kwargs): + # Some other actions here + pass + + + class Demultiplexer(WebsocketDemultiplexer): + + # Wire your JSON consumers here: {stream_name : consumer} consumers = { - "mystream": MyWebsocketConsumer + "echo": EchoConsumer, + "other": AnotherConsumer, } + The ``multiplexer`` allows the consumer class to be independant of the stream name. It holds the stream name and the demultiplexer on the attributes ``stream`` and ``demultiplexer``.