From cba54f974908777287aecb0db3e0bd84e8dae83f Mon Sep 17 00:00:00 2001 From: Andrew Godwin Date: Sun, 8 Jan 2017 17:56:37 -0800 Subject: [PATCH] Fix up new demultiplexer/databinding interactions --- channels/binding/websockets.py | 14 ++- channels/generic/websockets.py | 3 +- channels/tests/test_binding.py | 159 ++++++--------------------------- channels/tests/test_generic.py | 9 +- docs/binding.rst | 13 +-- 5 files changed, 54 insertions(+), 144 deletions(-) diff --git a/channels/binding/websockets.py b/channels/binding/websockets.py index c3043f0..cd910ca 100644 --- a/channels/binding/websockets.py +++ b/channels/binding/websockets.py @@ -26,11 +26,9 @@ class WebsocketBinding(Binding): """ # Mark as abstract - model = None # Stream multiplexing name - stream = None # Decorators @@ -81,6 +79,18 @@ class WebsocketBinding(Binding): else: return handler + @classmethod + def trigger_inbound(cls, message, **kwargs): + """ + Overrides base trigger_inbound to ignore connect/disconnect. + """ + # Only allow received packets through further. + if message.channel.name != "websocket.receive": + return + # Call superclass, unpacking the payload in the process + payload = json.loads(message['text']) + super(WebsocketBinding, cls).trigger_inbound(payload, **kwargs) + def deserialize(self, message): """ You must hook this up behind a Deserializer, so we expect the JSON diff --git a/channels/generic/websockets.py b/channels/generic/websockets.py index e3a2742..256978c 100644 --- a/channels/generic/websockets.py +++ b/channels/generic/websockets.py @@ -208,7 +208,8 @@ class WebsocketDemultiplexer(JsonWebsocketConsumer): # Send demultiplexer to the consumer, to be able to answer kwargs['multiplexer'] = WebsocketMultiplexer(stream, self.message.reply_channel) # Patch send to avoid sending not formated messages from the consumer - consumer.send = self.send + if hasattr(consumer, "send"): + consumer.send = self.send # Dispatch message consumer(self.message, **kwargs) return diff --git a/channels/tests/test_binding.py b/channels/tests/test_binding.py index 391baff..b2975c2 100644 --- a/channels/tests/test_binding.py +++ b/channels/tests/test_binding.py @@ -204,115 +204,9 @@ class TestsBinding(ChannelTestCase): received = client.receive() self.assertIsNone(received) - def test_demultiplexer(self): - class Demultiplexer(WebsocketDemultiplexer): - mapping = { - 'users': 'binding.users', - } - - groups = ['inbound'] - - with apply_routes([Demultiplexer.as_route(path='/')]): - client = HttpClient() - client.send_and_consume('websocket.connect', path='/') - - # assert in group - Group('inbound').send({'text': json.dumps({'test': 'yes'})}, immediately=True) - self.assertEqual(client.receive(), {'test': 'yes'}) - - # assert that demultiplexer stream message - client.send_and_consume('websocket.receive', path='/', - text={'stream': 'users', 'payload': {'test': 'yes'}}) - message = client.get_next_message('binding.users') - self.assertIsNotNone(message) - self.assertEqual(message.content['test'], 'yes') - - def test_demultiplexer_with_wrong_stream(self): - class Demultiplexer(WebsocketDemultiplexer): - mapping = { - 'users': 'binding.users', - } - - groups = ['inbound'] - - with apply_routes([Demultiplexer.as_route(path='/')]): - client = HttpClient() - client.send_and_consume('websocket.connect', path='/') - - with self.assertRaises(ValueError) as value_error: - client.send_and_consume('websocket.receive', path='/', text={ - 'stream': 'wrong', 'payload': {'test': 'yes'} - }) - - self.assertIn('stream not mapped', value_error.exception.args[0]) - - message = client.get_next_message('binding.users') - self.assertIsNone(message) - - def test_demultiplexer_with_wrong_payload(self): - class Demultiplexer(WebsocketDemultiplexer): - mapping = { - 'users': 'binding.users', - } - - groups = ['inbound'] - - with apply_routes([Demultiplexer.as_route(path='/')]): - client = HttpClient() - client.send_and_consume('websocket.connect', path='/') - - with self.assertRaises(ValueError) as value_error: - client.send_and_consume('websocket.receive', path='/', text={ - 'stream': 'users', 'payload': 'test', - }) - - self.assertEqual(value_error.exception.args[0], 'Multiplexed frame payload is not a dict') - - message = client.get_next_message('binding.users') - self.assertIsNone(message) - - def test_demultiplexer_without_payload_and_steam(self): - class Demultiplexer(WebsocketDemultiplexer): - mapping = { - 'users': 'binding.users', - } - - groups = ['inbound'] - - with apply_routes([Demultiplexer.as_route(path='/')]): - client = HttpClient() - client.send_and_consume('websocket.connect', path='/') - - with self.assertRaises(ValueError) as value_error: - client.send_and_consume('websocket.receive', path='/', text={ - 'nostream': 'users', 'payload': 'test', - }) - - self.assertIn('no channel/payload key', value_error.exception.args[0]) - - message = client.get_next_message('binding.users') - self.assertIsNone(message) - - with self.assertRaises(ValueError) as value_error: - client.send_and_consume('websocket.receive', path='/', text={ - 'stream': 'users', - }) - - self.assertIn('no channel/payload key', value_error.exception.args[0]) - - message = client.get_next_message('binding.users') - self.assertIsNone(message) - def test_inbound_create(self): self.assertEqual(User.objects.all().count(), 0) - class Demultiplexer(WebsocketDemultiplexer): - mapping = { - 'users': 'binding.users', - } - - groups = ['inbound'] - class UserBinding(WebsocketBinding): model = User stream = 'users' @@ -325,15 +219,23 @@ class TestsBinding(ChannelTestCase): def has_permission(self, user, action, pk): return True - with apply_routes([Demultiplexer.as_route(path='/'), route('binding.users', UserBinding.consumer)]): + class Demultiplexer(WebsocketDemultiplexer): + consumers = { + 'users': UserBinding.consumer, + } + + groups = ['inbound'] + + with apply_routes([Demultiplexer.as_route(path='/')]): client = HttpClient() client.send_and_consume('websocket.connect', path='/') client.send_and_consume('websocket.receive', path='/', text={ 'stream': 'users', - 'payload': {'action': CREATE, 'data': {'username': 'test_inbound', 'email': 'test@user_steam.com'}} + 'payload': { + 'action': CREATE, + 'data': {'username': 'test_inbound', 'email': 'test@user_steam.com'}, + }, }) - # our Demultiplexer route message to the inbound consumer, so call Demultiplexer consumer - client.consume('binding.users') self.assertEqual(User.objects.all().count(), 1) user = User.objects.all().first() @@ -345,13 +247,6 @@ class TestsBinding(ChannelTestCase): def test_inbound_update(self): user = User.objects.create(username='test', email='test@channels.com') - class Demultiplexer(WebsocketDemultiplexer): - mapping = { - 'users': 'binding.users', - } - - groups = ['inbound'] - class UserBinding(WebsocketBinding): model = User stream = 'users' @@ -364,15 +259,20 @@ class TestsBinding(ChannelTestCase): def has_permission(self, user, action, pk): return True - with apply_routes([Demultiplexer.as_route(path='/'), route('binding.users', UserBinding.consumer)]): + class Demultiplexer(WebsocketDemultiplexer): + consumers = { + 'users': UserBinding.consumer, + } + + groups = ['inbound'] + + with apply_routes([Demultiplexer.as_route(path='/')]): client = HttpClient() client.send_and_consume('websocket.connect', path='/') client.send_and_consume('websocket.receive', path='/', text={ 'stream': 'users', 'payload': {'action': UPDATE, 'pk': user.pk, 'data': {'username': 'test_inbound'}} }) - # our Demultiplexer route message to the inbound consumer, so call Demultiplexer consumer - client.consume('binding.users') user = User.objects.get(pk=user.pk) self.assertEqual(user.username, 'test_inbound') @@ -383,7 +283,6 @@ class TestsBinding(ChannelTestCase): 'stream': 'users', 'payload': {'action': UPDATE, 'pk': user.pk, 'data': {'email': 'new@test.com'}} }) - client.consume('binding.users') user = User.objects.get(pk=user.pk) self.assertEqual(user.username, 'test_inbound') @@ -394,13 +293,6 @@ class TestsBinding(ChannelTestCase): def test_inbound_delete(self): user = User.objects.create(username='test', email='test@channels.com') - class Demultiplexer(WebsocketDemultiplexer): - mapping = { - 'users': 'binding.users', - } - - groups = ['inbound'] - class UserBinding(WebsocketBinding): model = User stream = 'users' @@ -413,15 +305,20 @@ class TestsBinding(ChannelTestCase): def has_permission(self, user, action, pk): return True - with apply_routes([Demultiplexer.as_route(path='/'), route('binding.users', UserBinding.consumer)]): + class Demultiplexer(WebsocketDemultiplexer): + consumers = { + 'users': UserBinding.consumer, + } + + groups = ['inbound'] + + with apply_routes([Demultiplexer.as_route(path='/')]): client = HttpClient() client.send_and_consume('websocket.connect', path='/') client.send_and_consume('websocket.receive', path='/', text={ 'stream': 'users', 'payload': {'action': DELETE, 'pk': user.pk} }) - # our Demultiplexer route message to the inbound consumer, so call Demultiplexer consumer - client.consume('binding.users') self.assertIsNone(User.objects.filter(pk=user.pk).first()) self.assertIsNone(client.receive()) diff --git a/channels/tests/test_generic.py b/channels/tests/test_generic.py index 6aabbdd..40ade79 100644 --- a/channels/tests/test_generic.py +++ b/channels/tests/test_generic.py @@ -125,9 +125,12 @@ class GenericTests(ChannelTestCase): method_mapping = {'mychannel': 'test'} - with apply_routes([WebsocketConsumer.as_route( + with apply_routes([ + WebsocketConsumer.as_route( {'method_mapping': method_mapping, 'trigger': 'from_as_route'}, - name='filter')]): + name='filter', + ), + ]): client = Client() client.send_and_consume('mychannel', {'name': 'filter'}) @@ -191,8 +194,6 @@ class GenericTests(ChannelTestCase): class MyWebsocketConsumer(websockets.JsonWebsocketConsumer): def receive(self, content, multiplexer=None, **kwargs): - import pdb; pdb.set_trace() # breakpoint 69f2473b // - self.send(content) class Demultiplexer(websockets.WebsocketDemultiplexer): diff --git a/docs/binding.rst b/docs/binding.rst index 6ee77d4..089ddbb 100644 --- a/docs/binding.rst +++ b/docs/binding.rst @@ -107,23 +107,25 @@ connect. The WebSocket binding classes use the standard :ref:`multiplexing`, so you just need to use that:: from channels.generic.websockets import WebsocketDemultiplexer + from .binding import IntegerValueBinding class Demultiplexer(WebsocketDemultiplexer): mapping = { - "intval": "binding.intval", + "intval": IntegerValueBinding.consumer, } def connection_groups(self): return ["intval-updates"] -As well as the standard stream-to-channel mapping, you also need to set +As well as the standard stream-to-consumer mapping, you also need to set ``connection_groups``, a list of groups to put people in when they connect. This should match the logic of ``group_names`` on your binding - we've used -our fixed group name again. +our fixed group name again. Notice that the binding has a ``.consumer`` attribute; +this is a standard WebSocket-JSON consumer, that the demultiplexer can pass +demultiplexed ``websocket.receive`` messages to. -Tie that into your routing, and tie each demultiplexed channel into the -``.consumer`` attribute of the Binding, and you're ready to go:: +Tie that into your routing, and you're ready to go:: from channels import route_class, route from .consumers import Demultiplexer @@ -131,7 +133,6 @@ Tie that into your routing, and tie each demultiplexed channel into the channel_routing = [ route_class(Demultiplexer, path="^/binding/"), - route("binding.intval", IntegerValueBinding.consumer), ]