mirror of
https://github.com/django/daphne.git
synced 2025-07-14 18:02:17 +03:00
Fix up new demultiplexer/databinding interactions
This commit is contained in:
parent
5a539659a3
commit
cba54f9749
|
@ -26,11 +26,9 @@ class WebsocketBinding(Binding):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Mark as abstract
|
# Mark as abstract
|
||||||
|
|
||||||
model = None
|
model = None
|
||||||
|
|
||||||
# Stream multiplexing name
|
# Stream multiplexing name
|
||||||
|
|
||||||
stream = None
|
stream = None
|
||||||
|
|
||||||
# Decorators
|
# Decorators
|
||||||
|
@ -81,6 +79,18 @@ class WebsocketBinding(Binding):
|
||||||
else:
|
else:
|
||||||
return handler
|
return handler
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def trigger_inbound(cls, message, **kwargs):
|
||||||
|
"""
|
||||||
|
Overrides base trigger_inbound to ignore connect/disconnect.
|
||||||
|
"""
|
||||||
|
# Only allow received packets through further.
|
||||||
|
if message.channel.name != "websocket.receive":
|
||||||
|
return
|
||||||
|
# Call superclass, unpacking the payload in the process
|
||||||
|
payload = json.loads(message['text'])
|
||||||
|
super(WebsocketBinding, cls).trigger_inbound(payload, **kwargs)
|
||||||
|
|
||||||
def deserialize(self, message):
|
def deserialize(self, message):
|
||||||
"""
|
"""
|
||||||
You must hook this up behind a Deserializer, so we expect the JSON
|
You must hook this up behind a Deserializer, so we expect the JSON
|
||||||
|
|
|
@ -208,6 +208,7 @@ class WebsocketDemultiplexer(JsonWebsocketConsumer):
|
||||||
# Send demultiplexer to the consumer, to be able to answer
|
# Send demultiplexer to the consumer, to be able to answer
|
||||||
kwargs['multiplexer'] = WebsocketMultiplexer(stream, self.message.reply_channel)
|
kwargs['multiplexer'] = WebsocketMultiplexer(stream, self.message.reply_channel)
|
||||||
# Patch send to avoid sending not formated messages from the consumer
|
# Patch send to avoid sending not formated messages from the consumer
|
||||||
|
if hasattr(consumer, "send"):
|
||||||
consumer.send = self.send
|
consumer.send = self.send
|
||||||
# Dispatch message
|
# Dispatch message
|
||||||
consumer(self.message, **kwargs)
|
consumer(self.message, **kwargs)
|
||||||
|
|
|
@ -204,115 +204,9 @@ class TestsBinding(ChannelTestCase):
|
||||||
received = client.receive()
|
received = client.receive()
|
||||||
self.assertIsNone(received)
|
self.assertIsNone(received)
|
||||||
|
|
||||||
def test_demultiplexer(self):
|
|
||||||
class Demultiplexer(WebsocketDemultiplexer):
|
|
||||||
mapping = {
|
|
||||||
'users': 'binding.users',
|
|
||||||
}
|
|
||||||
|
|
||||||
groups = ['inbound']
|
|
||||||
|
|
||||||
with apply_routes([Demultiplexer.as_route(path='/')]):
|
|
||||||
client = HttpClient()
|
|
||||||
client.send_and_consume('websocket.connect', path='/')
|
|
||||||
|
|
||||||
# assert in group
|
|
||||||
Group('inbound').send({'text': json.dumps({'test': 'yes'})}, immediately=True)
|
|
||||||
self.assertEqual(client.receive(), {'test': 'yes'})
|
|
||||||
|
|
||||||
# assert that demultiplexer stream message
|
|
||||||
client.send_and_consume('websocket.receive', path='/',
|
|
||||||
text={'stream': 'users', 'payload': {'test': 'yes'}})
|
|
||||||
message = client.get_next_message('binding.users')
|
|
||||||
self.assertIsNotNone(message)
|
|
||||||
self.assertEqual(message.content['test'], 'yes')
|
|
||||||
|
|
||||||
def test_demultiplexer_with_wrong_stream(self):
|
|
||||||
class Demultiplexer(WebsocketDemultiplexer):
|
|
||||||
mapping = {
|
|
||||||
'users': 'binding.users',
|
|
||||||
}
|
|
||||||
|
|
||||||
groups = ['inbound']
|
|
||||||
|
|
||||||
with apply_routes([Demultiplexer.as_route(path='/')]):
|
|
||||||
client = HttpClient()
|
|
||||||
client.send_and_consume('websocket.connect', path='/')
|
|
||||||
|
|
||||||
with self.assertRaises(ValueError) as value_error:
|
|
||||||
client.send_and_consume('websocket.receive', path='/', text={
|
|
||||||
'stream': 'wrong', 'payload': {'test': 'yes'}
|
|
||||||
})
|
|
||||||
|
|
||||||
self.assertIn('stream not mapped', value_error.exception.args[0])
|
|
||||||
|
|
||||||
message = client.get_next_message('binding.users')
|
|
||||||
self.assertIsNone(message)
|
|
||||||
|
|
||||||
def test_demultiplexer_with_wrong_payload(self):
|
|
||||||
class Demultiplexer(WebsocketDemultiplexer):
|
|
||||||
mapping = {
|
|
||||||
'users': 'binding.users',
|
|
||||||
}
|
|
||||||
|
|
||||||
groups = ['inbound']
|
|
||||||
|
|
||||||
with apply_routes([Demultiplexer.as_route(path='/')]):
|
|
||||||
client = HttpClient()
|
|
||||||
client.send_and_consume('websocket.connect', path='/')
|
|
||||||
|
|
||||||
with self.assertRaises(ValueError) as value_error:
|
|
||||||
client.send_and_consume('websocket.receive', path='/', text={
|
|
||||||
'stream': 'users', 'payload': 'test',
|
|
||||||
})
|
|
||||||
|
|
||||||
self.assertEqual(value_error.exception.args[0], 'Multiplexed frame payload is not a dict')
|
|
||||||
|
|
||||||
message = client.get_next_message('binding.users')
|
|
||||||
self.assertIsNone(message)
|
|
||||||
|
|
||||||
def test_demultiplexer_without_payload_and_steam(self):
|
|
||||||
class Demultiplexer(WebsocketDemultiplexer):
|
|
||||||
mapping = {
|
|
||||||
'users': 'binding.users',
|
|
||||||
}
|
|
||||||
|
|
||||||
groups = ['inbound']
|
|
||||||
|
|
||||||
with apply_routes([Demultiplexer.as_route(path='/')]):
|
|
||||||
client = HttpClient()
|
|
||||||
client.send_and_consume('websocket.connect', path='/')
|
|
||||||
|
|
||||||
with self.assertRaises(ValueError) as value_error:
|
|
||||||
client.send_and_consume('websocket.receive', path='/', text={
|
|
||||||
'nostream': 'users', 'payload': 'test',
|
|
||||||
})
|
|
||||||
|
|
||||||
self.assertIn('no channel/payload key', value_error.exception.args[0])
|
|
||||||
|
|
||||||
message = client.get_next_message('binding.users')
|
|
||||||
self.assertIsNone(message)
|
|
||||||
|
|
||||||
with self.assertRaises(ValueError) as value_error:
|
|
||||||
client.send_and_consume('websocket.receive', path='/', text={
|
|
||||||
'stream': 'users',
|
|
||||||
})
|
|
||||||
|
|
||||||
self.assertIn('no channel/payload key', value_error.exception.args[0])
|
|
||||||
|
|
||||||
message = client.get_next_message('binding.users')
|
|
||||||
self.assertIsNone(message)
|
|
||||||
|
|
||||||
def test_inbound_create(self):
|
def test_inbound_create(self):
|
||||||
self.assertEqual(User.objects.all().count(), 0)
|
self.assertEqual(User.objects.all().count(), 0)
|
||||||
|
|
||||||
class Demultiplexer(WebsocketDemultiplexer):
|
|
||||||
mapping = {
|
|
||||||
'users': 'binding.users',
|
|
||||||
}
|
|
||||||
|
|
||||||
groups = ['inbound']
|
|
||||||
|
|
||||||
class UserBinding(WebsocketBinding):
|
class UserBinding(WebsocketBinding):
|
||||||
model = User
|
model = User
|
||||||
stream = 'users'
|
stream = 'users'
|
||||||
|
@ -325,15 +219,23 @@ class TestsBinding(ChannelTestCase):
|
||||||
def has_permission(self, user, action, pk):
|
def has_permission(self, user, action, pk):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
with apply_routes([Demultiplexer.as_route(path='/'), route('binding.users', UserBinding.consumer)]):
|
class Demultiplexer(WebsocketDemultiplexer):
|
||||||
|
consumers = {
|
||||||
|
'users': UserBinding.consumer,
|
||||||
|
}
|
||||||
|
|
||||||
|
groups = ['inbound']
|
||||||
|
|
||||||
|
with apply_routes([Demultiplexer.as_route(path='/')]):
|
||||||
client = HttpClient()
|
client = HttpClient()
|
||||||
client.send_and_consume('websocket.connect', path='/')
|
client.send_and_consume('websocket.connect', path='/')
|
||||||
client.send_and_consume('websocket.receive', path='/', text={
|
client.send_and_consume('websocket.receive', path='/', text={
|
||||||
'stream': 'users',
|
'stream': 'users',
|
||||||
'payload': {'action': CREATE, 'data': {'username': 'test_inbound', 'email': 'test@user_steam.com'}}
|
'payload': {
|
||||||
|
'action': CREATE,
|
||||||
|
'data': {'username': 'test_inbound', 'email': 'test@user_steam.com'},
|
||||||
|
},
|
||||||
})
|
})
|
||||||
# our Demultiplexer route message to the inbound consumer, so call Demultiplexer consumer
|
|
||||||
client.consume('binding.users')
|
|
||||||
|
|
||||||
self.assertEqual(User.objects.all().count(), 1)
|
self.assertEqual(User.objects.all().count(), 1)
|
||||||
user = User.objects.all().first()
|
user = User.objects.all().first()
|
||||||
|
@ -345,13 +247,6 @@ class TestsBinding(ChannelTestCase):
|
||||||
def test_inbound_update(self):
|
def test_inbound_update(self):
|
||||||
user = User.objects.create(username='test', email='test@channels.com')
|
user = User.objects.create(username='test', email='test@channels.com')
|
||||||
|
|
||||||
class Demultiplexer(WebsocketDemultiplexer):
|
|
||||||
mapping = {
|
|
||||||
'users': 'binding.users',
|
|
||||||
}
|
|
||||||
|
|
||||||
groups = ['inbound']
|
|
||||||
|
|
||||||
class UserBinding(WebsocketBinding):
|
class UserBinding(WebsocketBinding):
|
||||||
model = User
|
model = User
|
||||||
stream = 'users'
|
stream = 'users'
|
||||||
|
@ -364,15 +259,20 @@ class TestsBinding(ChannelTestCase):
|
||||||
def has_permission(self, user, action, pk):
|
def has_permission(self, user, action, pk):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
with apply_routes([Demultiplexer.as_route(path='/'), route('binding.users', UserBinding.consumer)]):
|
class Demultiplexer(WebsocketDemultiplexer):
|
||||||
|
consumers = {
|
||||||
|
'users': UserBinding.consumer,
|
||||||
|
}
|
||||||
|
|
||||||
|
groups = ['inbound']
|
||||||
|
|
||||||
|
with apply_routes([Demultiplexer.as_route(path='/')]):
|
||||||
client = HttpClient()
|
client = HttpClient()
|
||||||
client.send_and_consume('websocket.connect', path='/')
|
client.send_and_consume('websocket.connect', path='/')
|
||||||
client.send_and_consume('websocket.receive', path='/', text={
|
client.send_and_consume('websocket.receive', path='/', text={
|
||||||
'stream': 'users',
|
'stream': 'users',
|
||||||
'payload': {'action': UPDATE, 'pk': user.pk, 'data': {'username': 'test_inbound'}}
|
'payload': {'action': UPDATE, 'pk': user.pk, 'data': {'username': 'test_inbound'}}
|
||||||
})
|
})
|
||||||
# our Demultiplexer route message to the inbound consumer, so call Demultiplexer consumer
|
|
||||||
client.consume('binding.users')
|
|
||||||
|
|
||||||
user = User.objects.get(pk=user.pk)
|
user = User.objects.get(pk=user.pk)
|
||||||
self.assertEqual(user.username, 'test_inbound')
|
self.assertEqual(user.username, 'test_inbound')
|
||||||
|
@ -383,7 +283,6 @@ class TestsBinding(ChannelTestCase):
|
||||||
'stream': 'users',
|
'stream': 'users',
|
||||||
'payload': {'action': UPDATE, 'pk': user.pk, 'data': {'email': 'new@test.com'}}
|
'payload': {'action': UPDATE, 'pk': user.pk, 'data': {'email': 'new@test.com'}}
|
||||||
})
|
})
|
||||||
client.consume('binding.users')
|
|
||||||
|
|
||||||
user = User.objects.get(pk=user.pk)
|
user = User.objects.get(pk=user.pk)
|
||||||
self.assertEqual(user.username, 'test_inbound')
|
self.assertEqual(user.username, 'test_inbound')
|
||||||
|
@ -394,13 +293,6 @@ class TestsBinding(ChannelTestCase):
|
||||||
def test_inbound_delete(self):
|
def test_inbound_delete(self):
|
||||||
user = User.objects.create(username='test', email='test@channels.com')
|
user = User.objects.create(username='test', email='test@channels.com')
|
||||||
|
|
||||||
class Demultiplexer(WebsocketDemultiplexer):
|
|
||||||
mapping = {
|
|
||||||
'users': 'binding.users',
|
|
||||||
}
|
|
||||||
|
|
||||||
groups = ['inbound']
|
|
||||||
|
|
||||||
class UserBinding(WebsocketBinding):
|
class UserBinding(WebsocketBinding):
|
||||||
model = User
|
model = User
|
||||||
stream = 'users'
|
stream = 'users'
|
||||||
|
@ -413,15 +305,20 @@ class TestsBinding(ChannelTestCase):
|
||||||
def has_permission(self, user, action, pk):
|
def has_permission(self, user, action, pk):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
with apply_routes([Demultiplexer.as_route(path='/'), route('binding.users', UserBinding.consumer)]):
|
class Demultiplexer(WebsocketDemultiplexer):
|
||||||
|
consumers = {
|
||||||
|
'users': UserBinding.consumer,
|
||||||
|
}
|
||||||
|
|
||||||
|
groups = ['inbound']
|
||||||
|
|
||||||
|
with apply_routes([Demultiplexer.as_route(path='/')]):
|
||||||
client = HttpClient()
|
client = HttpClient()
|
||||||
client.send_and_consume('websocket.connect', path='/')
|
client.send_and_consume('websocket.connect', path='/')
|
||||||
client.send_and_consume('websocket.receive', path='/', text={
|
client.send_and_consume('websocket.receive', path='/', text={
|
||||||
'stream': 'users',
|
'stream': 'users',
|
||||||
'payload': {'action': DELETE, 'pk': user.pk}
|
'payload': {'action': DELETE, 'pk': user.pk}
|
||||||
})
|
})
|
||||||
# our Demultiplexer route message to the inbound consumer, so call Demultiplexer consumer
|
|
||||||
client.consume('binding.users')
|
|
||||||
|
|
||||||
self.assertIsNone(User.objects.filter(pk=user.pk).first())
|
self.assertIsNone(User.objects.filter(pk=user.pk).first())
|
||||||
self.assertIsNone(client.receive())
|
self.assertIsNone(client.receive())
|
||||||
|
|
|
@ -125,9 +125,12 @@ class GenericTests(ChannelTestCase):
|
||||||
|
|
||||||
method_mapping = {'mychannel': 'test'}
|
method_mapping = {'mychannel': 'test'}
|
||||||
|
|
||||||
with apply_routes([WebsocketConsumer.as_route(
|
with apply_routes([
|
||||||
|
WebsocketConsumer.as_route(
|
||||||
{'method_mapping': method_mapping, 'trigger': 'from_as_route'},
|
{'method_mapping': method_mapping, 'trigger': 'from_as_route'},
|
||||||
name='filter')]):
|
name='filter',
|
||||||
|
),
|
||||||
|
]):
|
||||||
client = Client()
|
client = Client()
|
||||||
|
|
||||||
client.send_and_consume('mychannel', {'name': 'filter'})
|
client.send_and_consume('mychannel', {'name': 'filter'})
|
||||||
|
@ -191,8 +194,6 @@ class GenericTests(ChannelTestCase):
|
||||||
|
|
||||||
class MyWebsocketConsumer(websockets.JsonWebsocketConsumer):
|
class MyWebsocketConsumer(websockets.JsonWebsocketConsumer):
|
||||||
def receive(self, content, multiplexer=None, **kwargs):
|
def receive(self, content, multiplexer=None, **kwargs):
|
||||||
import pdb; pdb.set_trace() # breakpoint 69f2473b //
|
|
||||||
|
|
||||||
self.send(content)
|
self.send(content)
|
||||||
|
|
||||||
class Demultiplexer(websockets.WebsocketDemultiplexer):
|
class Demultiplexer(websockets.WebsocketDemultiplexer):
|
||||||
|
|
|
@ -107,23 +107,25 @@ connect. The WebSocket binding classes use the standard :ref:`multiplexing`,
|
||||||
so you just need to use that::
|
so you just need to use that::
|
||||||
|
|
||||||
from channels.generic.websockets import WebsocketDemultiplexer
|
from channels.generic.websockets import WebsocketDemultiplexer
|
||||||
|
from .binding import IntegerValueBinding
|
||||||
|
|
||||||
class Demultiplexer(WebsocketDemultiplexer):
|
class Demultiplexer(WebsocketDemultiplexer):
|
||||||
|
|
||||||
mapping = {
|
mapping = {
|
||||||
"intval": "binding.intval",
|
"intval": IntegerValueBinding.consumer,
|
||||||
}
|
}
|
||||||
|
|
||||||
def connection_groups(self):
|
def connection_groups(self):
|
||||||
return ["intval-updates"]
|
return ["intval-updates"]
|
||||||
|
|
||||||
As well as the standard stream-to-channel mapping, you also need to set
|
As well as the standard stream-to-consumer mapping, you also need to set
|
||||||
``connection_groups``, a list of groups to put people in when they connect.
|
``connection_groups``, a list of groups to put people in when they connect.
|
||||||
This should match the logic of ``group_names`` on your binding - we've used
|
This should match the logic of ``group_names`` on your binding - we've used
|
||||||
our fixed group name again.
|
our fixed group name again. Notice that the binding has a ``.consumer`` attribute;
|
||||||
|
this is a standard WebSocket-JSON consumer, that the demultiplexer can pass
|
||||||
|
demultiplexed ``websocket.receive`` messages to.
|
||||||
|
|
||||||
Tie that into your routing, and tie each demultiplexed channel into the
|
Tie that into your routing, and you're ready to go::
|
||||||
``.consumer`` attribute of the Binding, and you're ready to go::
|
|
||||||
|
|
||||||
from channels import route_class, route
|
from channels import route_class, route
|
||||||
from .consumers import Demultiplexer
|
from .consumers import Demultiplexer
|
||||||
|
@ -131,7 +133,6 @@ Tie that into your routing, and tie each demultiplexed channel into the
|
||||||
|
|
||||||
channel_routing = [
|
channel_routing = [
|
||||||
route_class(Demultiplexer, path="^/binding/"),
|
route_class(Demultiplexer, path="^/binding/"),
|
||||||
route("binding.intval", IntegerValueBinding.consumer),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user