This commit is contained in:
Roman Mogylatov 2021-07-16 20:03:56 -04:00
parent 54de3a9d2c
commit f5e76aef28
2 changed files with 4898 additions and 2220 deletions

File diff suppressed because it is too large Load Diff

View File

@ -290,16 +290,53 @@ class DynamicContainer(Container):
def shutdown_resources(self): def shutdown_resources(self):
"""Shutdown all container resources.""" """Shutdown all container resources."""
futures = [] def _is_safe_to_shutdown(resource):
for related in resource.related:
if isinstance(related, providers.Resource) and related.initialized:
return False
return True
for provider in self.traverse(types=[providers.Resource]): def _resources_without_dependencies(resources):
shutdown = provider.shutdown() for resource in resources:
if _is_safe_to_shutdown(resource):
yield resource
if __is_future_or_coroutine(shutdown): async def _async_shutdown(initialized_resources):
futures.append(shutdown) initialized_resources = set(initialized_resources)
while initialized_resources:
resources_to_shutdown = set(_resources_without_dependencies(initialized_resources))
if not resources_to_shutdown:
raise RuntimeError('Unable to resolve resources shutdown order')
await asyncio.gather(
*(
resource.shutdown()
for resource in resources_to_shutdown
),
)
initialized_resources -= resources_to_shutdown
if futures: def _sync_shutdown(initialized_resources):
return asyncio.gather(*futures) initialized_resources = set(initialized_resources)
while initialized_resources:
resources_to_shutdown = set(_resources_without_dependencies(initialized_resources))
if not resources_to_shutdown:
raise RuntimeError('Unable to resolve resources shutdown order')
for resource in resources_to_shutdown:
resource.shutdown()
initialized_resources -= resources_to_shutdown
initialized_resources = list(
filter(
lambda resource: resource.initialized,
self.traverse(types=[providers.Resource])
),
)
is_async = any((resource.is_async_mode_enabled() for resource in initialized_resources))
if is_async:
return _async_shutdown(initialized_resources)
else:
return _sync_shutdown(initialized_resources)
def apply_container_providers_overridings(self): def apply_container_providers_overridings(self):
"""Apply container providers' overridings.""" """Apply container providers' overridings."""