Refactoring and sync + async test

This commit is contained in:
Roman Mogylatov 2021-07-20 18:35:06 -04:00
parent f1073958d4
commit 5a4fc3eaba
3 changed files with 3298 additions and 3087 deletions

File diff suppressed because it is too large Load Diff

View File

@ -290,53 +290,46 @@ class DynamicContainer(Container):
def shutdown_resources(self): def shutdown_resources(self):
"""Shutdown all container resources.""" """Shutdown all container resources."""
def _is_safe_to_shutdown(resource): def _no_initialized_dependencies(resource):
for related in resource.related: for related in resource.related:
if isinstance(related, providers.Resource) and related.initialized: if isinstance(related, providers.Resource) and related.initialized:
return False return False
return True return True
def _resources_without_dependencies(resources): def _without_initialized_dependencies(resources):
for resource in resources: return list(filter(_no_initialized_dependencies, resources))
if _is_safe_to_shutdown(resource):
yield resource
async def _async_shutdown(initialized_resources): def _any_initialized(resources):
initialized_resources = set(initialized_resources) return any(resource.initialized for resource in resources)
while initialized_resources:
resources_to_shutdown = set(_resources_without_dependencies(initialized_resources)) def _any_in_async_mode(resources):
return any(resource.is_async_mode_enabled() for resource in resources)
async def _async_ordered_shutdown(resources):
while _any_initialized(resources):
resources_to_shutdown = _without_initialized_dependencies(resources)
if not resources_to_shutdown: if not resources_to_shutdown:
raise RuntimeError('Unable to resolve resources shutdown order') raise RuntimeError('Unable to resolve resources shutdown order')
await asyncio.gather( futures = []
*( for resource in resources_to_shutdown:
resource.shutdown() result = resource.shutdown()
for resource in resources_to_shutdown if __is_future_or_coroutine(result):
), futures.append(result)
) await asyncio.gather(*futures)
initialized_resources -= resources_to_shutdown
def _sync_shutdown(initialized_resources): def _sync_ordered_shutdown(resources):
initialized_resources = set(initialized_resources) while _any_initialized(resources):
while initialized_resources: resources_to_shutdown = _without_initialized_dependencies(resources)
resources_to_shutdown = set(_resources_without_dependencies(initialized_resources))
if not resources_to_shutdown: if not resources_to_shutdown:
raise RuntimeError('Unable to resolve resources shutdown order') raise RuntimeError('Unable to resolve resources shutdown order')
for resource in resources_to_shutdown: for resource in resources_to_shutdown:
resource.shutdown() resource.shutdown()
initialized_resources -= resources_to_shutdown
initialized_resources = list( resources = list(self.traverse(types=[providers.Resource]))
filter( if _any_in_async_mode(resources):
lambda resource: resource.initialized, return _async_ordered_shutdown(resources)
self.traverse(types=[providers.Resource])
),
)
is_async = any((resource.is_async_mode_enabled() for resource in initialized_resources))
if is_async:
return _async_shutdown(initialized_resources)
else: else:
return _sync_shutdown(initialized_resources) return _sync_ordered_shutdown(resources)
def apply_container_providers_overridings(self): def apply_container_providers_overridings(self):
"""Apply container providers' overridings.""" """Apply container providers' overridings."""

View File

@ -109,3 +109,51 @@ class AsyncResourcesTest(AsyncTestCase):
with self.assertRaises(RuntimeError) as context: with self.assertRaises(RuntimeError) as context:
self._run(container.shutdown_resources()) self._run(container.shutdown_resources())
self.assertEqual(str(context.exception), 'Unable to resolve resources shutdown order') self.assertEqual(str(context.exception), 'Unable to resolve resources shutdown order')
def test_shutdown_sync_and_async_ordering(self):
initialized_resources = []
shutdown_resources = []
def _sync_resource(name, **_):
initialized_resources.append(name)
yield name
shutdown_resources.append(name)
async def _async_resource(name, **_):
initialized_resources.append(name)
yield name
shutdown_resources.append(name)
class Container(containers.DeclarativeContainer):
resource1 = providers.Resource(
_sync_resource,
name='r1',
)
resource2 = providers.Resource(
_sync_resource,
name='r2',
r1=resource1,
)
resource3 = providers.Resource(
_async_resource,
name='r3',
r2=resource2,
)
container = Container()
self._run(container.init_resources())
self.assertEqual(initialized_resources, ['r1', 'r2', 'r3'])
self.assertEqual(shutdown_resources, [])
self._run(container.shutdown_resources())
self.assertEqual(initialized_resources, ['r1', 'r2', 'r3'])
self.assertEqual(shutdown_resources, ['r1', 'r2', 'r3'])
self._run(container.init_resources())
self.assertEqual(initialized_resources, ['r1', 'r2', 'r3', 'r1', 'r2', 'r3'])
self.assertEqual(shutdown_resources, ['r1', 'r2', 'r3'])
self._run(container.shutdown_resources())
self.assertEqual(initialized_resources, ['r1', 'r2', 'r3', 'r1', 'r2', 'r3'])
self.assertEqual(shutdown_resources, ['r1', 'r2', 'r3', 'r1', 'r2', 'r3'])