mirror of
https://github.com/django/daphne.git
synced 2025-07-16 10:52:17 +03:00
Inbound tests (#351)
* Added groups as Binding attr for easy groups_names definition * Binding: inbound - updating fields that only in fields attribute * Added tests for inbound part of binding #343 * Fix for flake8 checker * Revert "Added groups as Binding attr for easy groups_names definition" This reverts commit 009b4adbee534d4ccbea191ce2523a0edb09d407. * Using group_names at inbound tests
This commit is contained in:
parent
e176e913d8
commit
075897d910
|
@ -110,7 +110,8 @@ class WebsocketBinding(Binding):
|
|||
instance = self.model.objects.get(pk=pk)
|
||||
hydrated = self._hydrate(pk, data)
|
||||
for name in data.keys():
|
||||
setattr(instance, name, getattr(hydrated.object, name))
|
||||
if name in self.fields or self.fields == ['__all__']:
|
||||
setattr(instance, name, getattr(hydrated.object, name))
|
||||
instance.save()
|
||||
|
||||
|
||||
|
|
|
@ -1,9 +1,12 @@
|
|||
from __future__ import unicode_literals
|
||||
import json
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from channels.binding.base import CREATE, UPDATE, DELETE
|
||||
from channels.binding.websockets import WebsocketBinding
|
||||
from channels.generic.websockets import WebsocketDemultiplexer
|
||||
from channels.tests import ChannelTestCase, apply_routes, HttpClient
|
||||
from channels import route
|
||||
from channels import route, Group
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
|
@ -134,3 +137,220 @@ class TestsBinding(ChannelTestCase):
|
|||
|
||||
received = client.receive()
|
||||
self.assertIsNone(received)
|
||||
|
||||
def test_demultiplexer(self):
|
||||
class Demultiplexer(WebsocketDemultiplexer):
|
||||
mapping = {
|
||||
'users': 'binding.users',
|
||||
}
|
||||
|
||||
groups = ['inbound']
|
||||
|
||||
with apply_routes([Demultiplexer.as_route(path='/')]):
|
||||
client = HttpClient()
|
||||
client.send_and_consume('websocket.connect', path='/')
|
||||
|
||||
# assert in group
|
||||
Group('inbound').send({'text': json.dumps({'test': 'yes'})})
|
||||
self.assertEqual(client.receive(), {'test': 'yes'})
|
||||
|
||||
# assert that demultiplexer stream message
|
||||
client.send_and_consume('websocket.receive', path='/',
|
||||
text={'stream': 'users', 'payload': {'test': 'yes'}})
|
||||
message = client.get_next_message('binding.users')
|
||||
self.assertIsNotNone(message)
|
||||
self.assertEqual(message.content['test'], 'yes')
|
||||
|
||||
def test_demultiplexer_with_wrong_stream(self):
|
||||
class Demultiplexer(WebsocketDemultiplexer):
|
||||
mapping = {
|
||||
'users': 'binding.users',
|
||||
}
|
||||
|
||||
groups = ['inbound']
|
||||
|
||||
with apply_routes([Demultiplexer.as_route(path='/')]):
|
||||
client = HttpClient()
|
||||
client.send_and_consume('websocket.connect', path='/')
|
||||
|
||||
with self.assertRaises(ValueError) as value_error:
|
||||
client.send_and_consume('websocket.receive', path='/', text={
|
||||
'stream': 'wrong', 'payload': {'test': 'yes'}
|
||||
})
|
||||
|
||||
self.assertIn('stream not mapped', value_error.exception.args[0])
|
||||
|
||||
message = client.get_next_message('binding.users')
|
||||
self.assertIsNone(message)
|
||||
|
||||
def test_demultiplexer_with_wrong_payload(self):
|
||||
class Demultiplexer(WebsocketDemultiplexer):
|
||||
mapping = {
|
||||
'users': 'binding.users',
|
||||
}
|
||||
|
||||
groups = ['inbound']
|
||||
|
||||
with apply_routes([Demultiplexer.as_route(path='/')]):
|
||||
client = HttpClient()
|
||||
client.send_and_consume('websocket.connect', path='/')
|
||||
|
||||
with self.assertRaises(ValueError) as value_error:
|
||||
client.send_and_consume('websocket.receive', path='/', text={
|
||||
'stream': 'users', 'payload': 'test',
|
||||
})
|
||||
|
||||
self.assertEqual(value_error.exception.args[0], 'Multiplexed frame payload is not a dict')
|
||||
|
||||
message = client.get_next_message('binding.users')
|
||||
self.assertIsNone(message)
|
||||
|
||||
def test_demultiplexer_without_payload_and_steam(self):
|
||||
class Demultiplexer(WebsocketDemultiplexer):
|
||||
mapping = {
|
||||
'users': 'binding.users',
|
||||
}
|
||||
|
||||
groups = ['inbound']
|
||||
|
||||
with apply_routes([Demultiplexer.as_route(path='/')]):
|
||||
client = HttpClient()
|
||||
client.send_and_consume('websocket.connect', path='/')
|
||||
|
||||
with self.assertRaises(ValueError) as value_error:
|
||||
client.send_and_consume('websocket.receive', path='/', text={
|
||||
'nostream': 'users', 'payload': 'test',
|
||||
})
|
||||
|
||||
self.assertIn('no channel/payload key', value_error.exception.args[0])
|
||||
|
||||
message = client.get_next_message('binding.users')
|
||||
self.assertIsNone(message)
|
||||
|
||||
with self.assertRaises(ValueError) as value_error:
|
||||
client.send_and_consume('websocket.receive', path='/', text={
|
||||
'stream': 'users',
|
||||
})
|
||||
|
||||
self.assertIn('no channel/payload key', value_error.exception.args[0])
|
||||
|
||||
message = client.get_next_message('binding.users')
|
||||
self.assertIsNone(message)
|
||||
|
||||
def test_inbound_create(self):
|
||||
self.assertEqual(User.objects.all().count(), 0)
|
||||
|
||||
class Demultiplexer(WebsocketDemultiplexer):
|
||||
mapping = {
|
||||
'users': 'binding.users',
|
||||
}
|
||||
|
||||
groups = ['inbound']
|
||||
|
||||
class UserBinding(WebsocketBinding):
|
||||
model = User
|
||||
stream = 'users'
|
||||
fields = ['username', 'email', 'password', 'last_name']
|
||||
|
||||
@classmethod
|
||||
def group_names(cls, instance, action):
|
||||
return ['users_outbound']
|
||||
|
||||
def has_permission(self, user, action, pk):
|
||||
return True
|
||||
|
||||
with apply_routes([Demultiplexer.as_route(path='/'), route('binding.users', UserBinding.consumer)]):
|
||||
client = HttpClient()
|
||||
client.send_and_consume('websocket.connect', path='/')
|
||||
client.send_and_consume('websocket.receive', path='/', text={
|
||||
'stream': 'users',
|
||||
'payload': {'action': CREATE, 'data': {'username': 'test_inbound', 'email': 'test@user_steam.com'}}
|
||||
})
|
||||
# our Demultiplexer route message to the inbound consumer, so call Demultiplexer consumer
|
||||
client.consume('binding.users')
|
||||
|
||||
self.assertEqual(User.objects.all().count(), 1)
|
||||
user = User.objects.all().first()
|
||||
self.assertEqual(user.username, 'test_inbound')
|
||||
self.assertEqual(user.email, 'test@user_steam.com')
|
||||
|
||||
def test_inbound_update(self):
|
||||
user = User.objects.create(username='test', email='test@channels.com')
|
||||
|
||||
class Demultiplexer(WebsocketDemultiplexer):
|
||||
mapping = {
|
||||
'users': 'binding.users',
|
||||
}
|
||||
|
||||
groups = ['inbound']
|
||||
|
||||
class UserBinding(WebsocketBinding):
|
||||
model = User
|
||||
stream = 'users'
|
||||
fields = ['username', ]
|
||||
|
||||
@classmethod
|
||||
def group_names(cls, instance, action):
|
||||
return ['users_outbound']
|
||||
|
||||
def has_permission(self, user, action, pk):
|
||||
return True
|
||||
|
||||
with apply_routes([Demultiplexer.as_route(path='/'), route('binding.users', UserBinding.consumer)]):
|
||||
client = HttpClient()
|
||||
client.send_and_consume('websocket.connect', path='/')
|
||||
client.send_and_consume('websocket.receive', path='/', text={
|
||||
'stream': 'users',
|
||||
'payload': {'action': UPDATE, 'pk': user.pk, 'data': {'username': 'test_inbound'}}
|
||||
})
|
||||
# our Demultiplexer route message to the inbound consumer, so call Demultiplexer consumer
|
||||
client.consume('binding.users')
|
||||
|
||||
user = User.objects.get(pk=user.pk)
|
||||
self.assertEqual(user.username, 'test_inbound')
|
||||
self.assertEqual(user.email, 'test@channels.com')
|
||||
|
||||
# trying change field that not in binding fields
|
||||
client.send_and_consume('websocket.receive', path='/', text={
|
||||
'stream': 'users',
|
||||
'payload': {'action': UPDATE, 'pk': user.pk, 'data': {'email': 'new@test.com'}}
|
||||
})
|
||||
client.consume('binding.users')
|
||||
|
||||
user = User.objects.get(pk=user.pk)
|
||||
self.assertEqual(user.username, 'test_inbound')
|
||||
self.assertEqual(user.email, 'test@channels.com')
|
||||
|
||||
def test_inbound_delete(self):
|
||||
user = User.objects.create(username='test', email='test@channels.com')
|
||||
|
||||
class Demultiplexer(WebsocketDemultiplexer):
|
||||
mapping = {
|
||||
'users': 'binding.users',
|
||||
}
|
||||
|
||||
groups = ['inbound']
|
||||
|
||||
class UserBinding(WebsocketBinding):
|
||||
model = User
|
||||
stream = 'users'
|
||||
fields = ['username', ]
|
||||
|
||||
@classmethod
|
||||
def group_names(cls, instance, action):
|
||||
return ['users_outbound']
|
||||
|
||||
def has_permission(self, user, action, pk):
|
||||
return True
|
||||
|
||||
with apply_routes([Demultiplexer.as_route(path='/'), route('binding.users', UserBinding.consumer)]):
|
||||
client = HttpClient()
|
||||
client.send_and_consume('websocket.connect', path='/')
|
||||
client.send_and_consume('websocket.receive', path='/', text={
|
||||
'stream': 'users',
|
||||
'payload': {'action': DELETE, 'pk': user.pk}
|
||||
})
|
||||
# our Demultiplexer route message to the inbound consumer, so call Demultiplexer consumer
|
||||
client.consume('binding.users')
|
||||
|
||||
self.assertIsNone(User.objects.filter(pk=user.pk).first())
|
||||
|
|
Loading…
Reference in New Issue
Block a user