diff --git a/channels/generic/websockets.py b/channels/generic/websockets.py index d82c9fe..3a7a037 100644 --- a/channels/generic/websockets.py +++ b/channels/generic/websockets.py @@ -225,3 +225,77 @@ class WebsocketDemultiplexer(JsonWebsocketConsumer): "stream": stream, "payload": payload, }, cls=DjangoJSONEncoder)} + + +class WebsocketConsumerDemultiplexer(WebsocketDemultiplexer): + """ + Demultiplexer but for consumer classes. + + Set a mapping of streams to consumer classes in the dict "consumers". + + The demultiplexer dispatch the payload of incoming messages to the corresponding + consumers. The demultiplexer is forwarded to the consumer as a kwargs "demultiplexer". + This allows the consumer to answer with a multiplexed message using a send method + from the demultiplexer. + """ + + # Put your JSON consumers here: {stream_name : consumer} + consumers = {} + + def receive(self, content, **kwargs): + """Forward messages to all consumers.""" + # Check the frame looks good + if isinstance(content, dict) and "stream" in content and "payload" in content: + # Match it to a channel + for stream, consumer in self.consumers.items(): + if stream == content['stream']: + # Extract payload and add in reply_channel + payload = content['payload'] + if not isinstance(payload, dict): + raise ValueError("Multiplexed frame payload is not a dict") + # The json consumer expects serialized JSON + self.message.content['text'] = json.dumps(payload) + # Send demultiplexer to the consumer, to be able to answer + kwargs['multiplexer'] = Multiplexer(stream, self) + consumer(self.message, **kwargs) + return + + raise ValueError("Invalid multiplexed frame received (stream not mapped)") + else: + raise ValueError("Invalid multiplexed **frame received (no channel/payload key)") + + def connect(self, message, **kwargs): + """Forward connection to all consumers.""" + for stream, consumer in self.consumers.items(): + kwargs['multiplexer'] = Multiplexer(stream, self) + consumer(message, **kwargs) + + def disconnect(self, message, **kwargs): + """Forward disconnection to all consumers.""" + for stream, consumer in self.consumers.items(): + kwargs['multiplexer'] = Multiplexer(stream, self) + consumer(message, **kwargs) + + +class Multiplexer(object): + """ + The opposite of the demultiplexer, to send a message though a multiplexed channel. + + The demultiplexer holds the mapping and the basic send function. + The multiplexer allows the consumer class to be independant of the stream name. + """ + + stream = None + demultiplexer = None + + def __init__(self, stream, demultiplexer): + self.stream = stream + self.demultiplexer = demultiplexer + + def send(self, payload): + """Multiplex the payload using the stream name and send it.""" + self.demultiplexer.send(self.stream, payload) + + def group_send(self, name, payload, close=False): + """Proxy that abstracts the stream name""" + self.demultiplexer.group_send(name, self.stream, payload, close) diff --git a/channels/tests/base.py b/channels/tests/base.py index d3b3d76..7e7ddc9 100644 --- a/channels/tests/base.py +++ b/channels/tests/base.py @@ -105,6 +105,13 @@ class Client(object): return return Message(content, recv_channel, channel_layers[self.alias]) + def get_consumer_by_channel(self, channel): + message = Message({'text': ''}, channel, self.channel_layer) + match = self.channel_layer.router.match(message) + if match: + consumer, kwargs = match + return consumer + def send(self, to, content={}): """ Send a message to a channel. diff --git a/channels/tests/test_generic.py b/channels/tests/test_generic.py index aaac316..ba33cc6 100644 --- a/channels/tests/test_generic.py +++ b/channels/tests/test_generic.py @@ -1,5 +1,7 @@ from __future__ import unicode_literals +import json + from django.test import override_settings from channels import route_class @@ -129,3 +131,57 @@ class GenericTests(ChannelTestCase): client.send_and_consume('mychannel', {'name': 'filter'}) self.assertEqual(client.receive(), {'trigger': 'from_as_route'}) + + def test_websockets_demultiplexer(self): + + class MyWebsocketConsumer(websockets.JsonWebsocketConsumer): + def connect(self, message, multiplexer=None, **kwargs): + multiplexer.send(kwargs) + + def disconnect(self, message, multiplexer=None, **kwargs): + multiplexer.send(kwargs) + + def receive(self, content, multiplexer=None, **kwargs): + multiplexer.send(content) + + class Demultiplexer(websockets.WebsocketConsumerDemultiplexer): + + consumers = { + "mystream": MyWebsocketConsumer + } + + with apply_routes([ + route_class(Demultiplexer, path='/path/(?P\d+)'), + route_class(MyWebsocketConsumer), + ]): + client = Client() + + client.send_and_consume('websocket.connect', {'path': '/path/1'}) + self.assertEqual(client.receive(), { + "text": json.dumps({ + "stream": "mystream", + "payload": {"id": "1"}, + }) + }) + + client.send_and_consume('websocket.receive', { + 'path': '/path/1', + 'text': json.dumps({ + "stream": "mystream", + "payload": {"text_field": "mytext"} + }) + }) + self.assertEqual(client.receive(), { + "text": json.dumps({ + "stream": "mystream", + "payload": {"text_field": "mytext"}, + }) + }) + + client.send_and_consume('websocket.disconnect', {'path': '/path/1'}) + self.assertEqual(client.receive(), { + "text": json.dumps({ + "stream": "mystream", + "payload": {"id": "1"}, + }) + }) diff --git a/docs/generics.rst b/docs/generics.rst index c7caeee..4e9c18f 100644 --- a/docs/generics.rst +++ b/docs/generics.rst @@ -198,11 +198,31 @@ channel name. It will then forward the message onto that channel while preserving ``reply_channel``, so you can hook consumers up to them directly in the ``routing.py`` file, and use authentication decorators as you wish. -You cannot use class-based consumers this way as the messages are no -longer in WebSocket format, though. If you need to do operations on -``connect`` or ``disconnect``, override those methods on the ``Demultiplexer`` -itself (you can also provide a ``connection_groups`` method, as it's just -based on the JSON WebSocket generic consumer). + +Example using class-based consumer:: + + from channels.generic.websockets import WebsocketConsumerDemultiplexer, JsonWebsocketConsumer + + class MyWebsocketConsumer(JsonWebsocketConsumer): + def connect(self, message, multiplexer=None, **kwargs): + multiplexer.send({"status": "I just connected!"}) + + def disconnect(self, message, multiplexer=None, **kwargs): + print(multiplexer.stream) + + def receive(self, content, multiplexer=None, **kwargs): + # simple echo + multiplexer.send(content) + + class Demultiplexer(WebsocketConsumerDemultiplexer): + + # Put your JSON consumers here: {stream_name : consumer} + consumers = { + "mystream": MyWebsocketConsumer + } + +The ``multiplexer`` allows the consumer class to be independant of the stream name. +It holds the stream name and the demultiplexer on the attributes ``stream`` and ``demultiplexer``. The :doc:`data binding ` code will also send out messages to clients in the same format, and you can encode things in this format yourself by