Try fix 3.6 tests

This commit is contained in:
Roman Mogylatov 2020-07-27 22:45:48 -04:00
parent 9161b01e97
commit ff4803d6a5

View File

@ -1,7 +1,7 @@
"""Dependency injector coroutine providers unit tests."""
import asyncio
from asyncio import coroutines, events, tasks
from asyncio import coroutines, events
import unittest2 as unittest
@ -34,35 +34,12 @@ def _run(coro, debug=False):
return loop.run_until_complete(coro)
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
events.set_event_loop(None)
loop.close()
def _cancel_all_tasks(loop):
to_cancel = tasks.all_tasks(loop)
if not to_cancel:
return
for task in to_cancel:
task.cancel()
loop.run_until_complete(
tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
for task in to_cancel:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler({
'message': 'unhandled exception during asyncio.run() shutdown',
'exception': task.exception(),
'task': task,
})
class CoroutineTests(unittest.TestCase):
def test_init_with_coroutine(self):