From 9505906b4254144749445126b6a398661bd1063c Mon Sep 17 00:00:00 2001 From: Krukov D Date: Sun, 8 May 2016 22:21:58 +0300 Subject: [PATCH] Fix re-inserting, with tests (#146) * Stopping re-inserting at first success * Added a few tests for worker running * Coping routes in channels layers at the ChannelTestCase * Remake worker test with less mocking --- channels/tests/base.py | 2 +- channels/tests/test_worker.py | 62 +++++++++++++++++++++++++++++++++-- channels/worker.py | 2 ++ 3 files changed, 63 insertions(+), 3 deletions(-) diff --git a/channels/tests/base.py b/channels/tests/base.py index 09ce66f..95bd9af 100644 --- a/channels/tests/base.py +++ b/channels/tests/base.py @@ -31,7 +31,7 @@ class ChannelTestCase(TestCase): ChannelLayerWrapper( InMemoryChannelLayer(), alias, - channel_layers[alias].routing, + channel_layers[alias].routing[:], ) ) diff --git a/channels/tests/test_worker.py b/channels/tests/test_worker.py index 53f8d40..5cff5b7 100644 --- a/channels/tests/test_worker.py +++ b/channels/tests/test_worker.py @@ -1,10 +1,32 @@ from __future__ import unicode_literals -from django.test import SimpleTestCase +try: + from unittest import mock +except ImportError: + import mock + +from channels import Channel, route, DEFAULT_CHANNEL_LAYER +from channels.asgi import channel_layers +from channels.tests import ChannelTestCase from channels.worker import Worker +from channels.exceptions import ConsumeLater -class WorkerTests(SimpleTestCase): +class PatchedWorker(Worker): + """Worker with specific numbers of loops""" + def get_termed(self): + if not self.__iters: + return True + self.__iters -= 1 + return False + + def set_termed(self, value): + self.__iters = value + + termed = property(get_termed, set_termed) + + +class WorkerTests(ChannelTestCase): """ Tests that the router's routing code works correctly. """ @@ -35,3 +57,39 @@ class WorkerTests(SimpleTestCase): worker.apply_channel_filters(["yes.1", "no.1", "maybe.2", "yes"]), ["yes.1"], ) + + def test_run_with_consume_later_error(self): + + # consumer with ConsumeLater error at first call + def _consumer(message, **kwargs): + _consumer._call_count = getattr(_consumer, '_call_count', 0) + 1 + if _consumer._call_count == 1: + raise ConsumeLater() + + Channel('test').send({'test': 'test'}) + channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER] + channel_layer.router.add_route(route('test', _consumer)) + old_send = channel_layer.send + channel_layer.send = mock.Mock(side_effect=old_send) # proxy 'send' for counting + + worker = PatchedWorker(channel_layer) + worker.termed = 2 # first loop with error, second with sending + + worker.run() + self.assertEqual(getattr(_consumer, '_call_count', None), 2) + self.assertEqual(channel_layer.send.call_count, 1) + + def test_normal_run(self): + consumer = mock.Mock() + Channel('test').send({'test': 'test'}) + channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER] + channel_layer.router.add_route(route('test', consumer)) + old_send = channel_layer.send + channel_layer.send = mock.Mock(side_effect=old_send) # proxy 'send' for counting + + worker = PatchedWorker(channel_layer) + worker.termed = 2 + + worker.run() + self.assertEqual(consumer.call_count, 1) + self.assertEqual(channel_layer.send.call_count, 0) diff --git a/channels/worker.py b/channels/worker.py index 6e65492..935ed80 100644 --- a/channels/worker.py +++ b/channels/worker.py @@ -122,5 +122,7 @@ class Worker(object): self.channel_layer.send(channel, content) except self.channel_layer.ChannelFull: time.sleep(0.05) + else: + break except: logger.exception("Error processing message with consumer %s:", name_that_thing(consumer))