mirror of
https://github.com/django/daphne.git
synced 2025-07-12 17:02:18 +03:00
Fix re-inserting, with tests (#146)
* Stopping re-inserting at first success * Added a few tests for worker running * Coping routes in channels layers at the ChannelTestCase * Remake worker test with less mocking
This commit is contained in:
parent
619aed9be2
commit
9505906b42
|
@ -31,7 +31,7 @@ class ChannelTestCase(TestCase):
|
|||
ChannelLayerWrapper(
|
||||
InMemoryChannelLayer(),
|
||||
alias,
|
||||
channel_layers[alias].routing,
|
||||
channel_layers[alias].routing[:],
|
||||
)
|
||||
)
|
||||
|
||||
|
|
|
@ -1,10 +1,32 @@
|
|||
from __future__ import unicode_literals
|
||||
from django.test import SimpleTestCase
|
||||
|
||||
try:
|
||||
from unittest import mock
|
||||
except ImportError:
|
||||
import mock
|
||||
|
||||
from channels import Channel, route, DEFAULT_CHANNEL_LAYER
|
||||
from channels.asgi import channel_layers
|
||||
from channels.tests import ChannelTestCase
|
||||
from channels.worker import Worker
|
||||
from channels.exceptions import ConsumeLater
|
||||
|
||||
|
||||
class WorkerTests(SimpleTestCase):
|
||||
class PatchedWorker(Worker):
|
||||
"""Worker with specific numbers of loops"""
|
||||
def get_termed(self):
|
||||
if not self.__iters:
|
||||
return True
|
||||
self.__iters -= 1
|
||||
return False
|
||||
|
||||
def set_termed(self, value):
|
||||
self.__iters = value
|
||||
|
||||
termed = property(get_termed, set_termed)
|
||||
|
||||
|
||||
class WorkerTests(ChannelTestCase):
|
||||
"""
|
||||
Tests that the router's routing code works correctly.
|
||||
"""
|
||||
|
@ -35,3 +57,39 @@ class WorkerTests(SimpleTestCase):
|
|||
worker.apply_channel_filters(["yes.1", "no.1", "maybe.2", "yes"]),
|
||||
["yes.1"],
|
||||
)
|
||||
|
||||
def test_run_with_consume_later_error(self):
|
||||
|
||||
# consumer with ConsumeLater error at first call
|
||||
def _consumer(message, **kwargs):
|
||||
_consumer._call_count = getattr(_consumer, '_call_count', 0) + 1
|
||||
if _consumer._call_count == 1:
|
||||
raise ConsumeLater()
|
||||
|
||||
Channel('test').send({'test': 'test'})
|
||||
channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
channel_layer.router.add_route(route('test', _consumer))
|
||||
old_send = channel_layer.send
|
||||
channel_layer.send = mock.Mock(side_effect=old_send) # proxy 'send' for counting
|
||||
|
||||
worker = PatchedWorker(channel_layer)
|
||||
worker.termed = 2 # first loop with error, second with sending
|
||||
|
||||
worker.run()
|
||||
self.assertEqual(getattr(_consumer, '_call_count', None), 2)
|
||||
self.assertEqual(channel_layer.send.call_count, 1)
|
||||
|
||||
def test_normal_run(self):
|
||||
consumer = mock.Mock()
|
||||
Channel('test').send({'test': 'test'})
|
||||
channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER]
|
||||
channel_layer.router.add_route(route('test', consumer))
|
||||
old_send = channel_layer.send
|
||||
channel_layer.send = mock.Mock(side_effect=old_send) # proxy 'send' for counting
|
||||
|
||||
worker = PatchedWorker(channel_layer)
|
||||
worker.termed = 2
|
||||
|
||||
worker.run()
|
||||
self.assertEqual(consumer.call_count, 1)
|
||||
self.assertEqual(channel_layer.send.call_count, 0)
|
||||
|
|
|
@ -122,5 +122,7 @@ class Worker(object):
|
|||
self.channel_layer.send(channel, content)
|
||||
except self.channel_layer.ChannelFull:
|
||||
time.sleep(0.05)
|
||||
else:
|
||||
break
|
||||
except:
|
||||
logger.exception("Error processing message with consumer %s:", name_that_thing(consumer))
|
||||
|
|
Loading…
Reference in New Issue
Block a user