From 5ee120419de714a62dd02a777a119089feae8a8d Mon Sep 17 00:00:00 2001 From: Roman Mogylatov Date: Tue, 28 Jul 2020 15:47:59 -0400 Subject: [PATCH] Try fix Python 3.4, 3.5 tests --- tests/unit/providers/test_coroutines_py35.py | 79 ++++++++++++++++---- 1 file changed, 64 insertions(+), 15 deletions(-) diff --git a/tests/unit/providers/test_coroutines_py35.py b/tests/unit/providers/test_coroutines_py35.py index 72a3683d..c9109f26 100644 --- a/tests/unit/providers/test_coroutines_py35.py +++ b/tests/unit/providers/test_coroutines_py35.py @@ -1,6 +1,9 @@ """Dependency injector coroutine providers unit tests.""" import asyncio +import contextlib +import sys +import gc import unittest2 as unittest @@ -10,19 +13,65 @@ from dependency_injector import ( ) -@asyncio.coroutine -def _example(arg1, arg2, arg3, arg4): +async def _example(arg1, arg2, arg3, arg4): future = asyncio.Future() future.set_result(None) - yield from future + await future return arg1, arg2, arg3, arg4 -def run(*args, **kwargs): - return asyncio.run(*args, **kwargs) +def run(main): + loop = asyncio.get_event_loop() + return loop.run_until_complete(main) -class CoroutineTests(unittest.TestCase): +def setup_test_loop( + loop_factory=asyncio.new_event_loop +) -> asyncio.AbstractEventLoop: + loop = loop_factory() + try: + module = loop.__class__.__module__ + skip_watcher = 'uvloop' in module + except AttributeError: # pragma: no cover + # Just in case + skip_watcher = True + asyncio.set_event_loop(loop) + if sys.platform != "win32" and not skip_watcher: + policy = asyncio.get_event_loop_policy() + watcher = asyncio.SafeChildWatcher() # type: ignore + watcher.attach_loop(loop) + with contextlib.suppress(NotImplementedError): + policy.set_child_watcher(watcher) + return loop + + +def teardown_test_loop(loop: asyncio.AbstractEventLoop, + fast: bool=False) -> None: + closed = loop.is_closed() + if not closed: + loop.call_soon(loop.stop) + loop.run_forever() + loop.close() + + if not fast: + gc.collect() + + asyncio.set_event_loop(None) + + +class AsyncTestCase(unittest.TestCase): + + def setUp(self): + self.loop = setup_test_loop() + + def tearDown(self): + teardown_test_loop(self.loop) + + def _run(self, f): + return self.loop.run_until_complete(f) + + +class CoroutineTests(AsyncTestCase): def test_init_with_coroutine(self): self.assertTrue(providers.Coroutine(_example)) @@ -32,12 +81,12 @@ class CoroutineTests(unittest.TestCase): def test_call_with_positional_args(self): provider = providers.Coroutine(_example, 1, 2, 3, 4) - self.assertTupleEqual(run(provider()), (1, 2, 3, 4)) + self.assertTupleEqual(self._run(provider()), (1, 2, 3, 4)) def test_call_with_keyword_args(self): provider = providers.Coroutine(_example, arg1=1, arg2=2, arg3=3, arg4=4) - self.assertTupleEqual(run(provider()), (1, 2, 3, 4)) + self.assertTupleEqual(self._run(provider()), (1, 2, 3, 4)) def test_call_with_positional_and_keyword_args(self): provider = providers.Coroutine(_example, @@ -47,19 +96,19 @@ class CoroutineTests(unittest.TestCase): def test_call_with_context_args(self): provider = providers.Coroutine(_example, 1, 2) - self.assertTupleEqual(run(provider(3, 4)), (1, 2, 3, 4)) + self.assertTupleEqual(self._run(provider(3, 4)), (1, 2, 3, 4)) def test_call_with_context_kwargs(self): provider = providers.Coroutine(_example, arg1=1) self.assertTupleEqual( - run(provider(arg2=2, arg3=3, arg4=4)), + self._run(provider(arg2=2, arg3=3, arg4=4)), (1, 2, 3, 4), ) def test_call_with_context_args_and_kwargs(self): provider = providers.Coroutine(_example, 1) self.assertTupleEqual( - run(provider(2, arg3=3, arg4=4)), + self._run(provider(2, arg3=3, arg4=4)), (1, 2, 3, 4), ) @@ -68,7 +117,7 @@ class CoroutineTests(unittest.TestCase): .add_args(1, 2) \ .add_kwargs(arg3=3, arg4=4) - self.assertTupleEqual(run(provider()), (1, 2, 3, 4)) + self.assertTupleEqual(self._run(provider()), (1, 2, 3, 4)) def test_set_args(self): provider = providers.Coroutine(_example) \ @@ -212,7 +261,7 @@ class DelegatedCoroutineTests(unittest.TestCase): hex(id(provider)))) -class AbstractCoroutineTests(unittest.TestCase): +class AbstractCoroutineTests(AsyncTestCase): def test_inheritance(self): self.assertIsInstance(providers.AbstractCoroutine(_example), @@ -226,7 +275,7 @@ class AbstractCoroutineTests(unittest.TestCase): provider = providers.AbstractCoroutine(_abstract_example) provider.override(providers.Coroutine(_example)) - self.assertTrue(run(provider(1, 2, 3, 4)), (1, 2, 3, 4)) + self.assertTrue(self._run(provider(1, 2, 3, 4)), (1, 2, 3, 4)) def test_call_overridden_by_delegated_coroutine(self): @asyncio.coroutine @@ -236,7 +285,7 @@ class AbstractCoroutineTests(unittest.TestCase): provider = providers.AbstractCoroutine(_abstract_example) provider.override(providers.DelegatedCoroutine(_example)) - self.assertTrue(run(provider(1, 2, 3, 4)), (1, 2, 3, 4)) + self.assertTrue(self._run(provider(1, 2, 3, 4)), (1, 2, 3, 4)) def test_call_not_overridden(self): provider = providers.AbstractCoroutine(_example)