Async mode awaitable fix (#400)

* Fix mistakenly processed awaitable objects

* Update changelog

* Replace __isawaitable() with __is_future_or_coroutine()

* Refactor async mode
This commit is contained in:
Roman Mogylatov 2021-02-17 09:56:39 -05:00 committed by GitHub
parent 6e59b4ab6f
commit 27d0e07718
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 7097 additions and 7266 deletions

View File

@ -7,6 +7,14 @@ that were made in every particular version.
From version 0.7.6 *Dependency Injector* framework strictly
follows `Semantic versioning`_
Development version
-------------------
- Fix mistakenly processed awaitable objects in async mode. This bug has corrupted
``fastapi-redis`` example causing pool exhaustion.
Thanks to Ilya Miroshnichenko and Valery Komarov for finding and reporting
the issue.
- Refactor async mode.
4.23.2
------
- Improve async mode exceptions handling.

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,6 @@ except ImportError:
asyncio = None
import functools
import inspect
cimport cython
@ -362,7 +361,7 @@ cdef inline object __provide_positional_args(
):
cdef int index
cdef list positional_args = []
cdef list awaitables = []
cdef list future_args = []
cdef PositionalInjection injection
if inj_args_len == 0:
@ -373,13 +372,13 @@ cdef inline object __provide_positional_args(
value = __get_value(injection)
positional_args.append(value)
if __isawaitable(value):
awaitables.append((index, value))
if __is_future_or_coroutine(value):
future_args.append((index, value))
positional_args.extend(args)
if awaitables:
return __awaitable_args_kwargs_future(positional_args, awaitables)
if future_args:
return __combine_future_injections(positional_args, future_args)
return positional_args
@ -395,7 +394,7 @@ cdef inline object __provide_keyword_args(
cdef object name
cdef object value
cdef dict prefixed = {}
cdef list awaitables = []
cdef list future_kwargs = []
cdef NamedInjection kw_injection
if len(kwargs) == 0:
@ -404,8 +403,8 @@ cdef inline object __provide_keyword_args(
name = __get_name(kw_injection)
value = __get_value(kw_injection)
kwargs[name] = value
if __isawaitable(value):
awaitables.append((name, value))
if __is_future_or_coroutine(value):
future_kwargs.append((name, value))
else:
kwargs, prefixed = __separate_prefixed_kwargs(kwargs)
@ -423,28 +422,28 @@ cdef inline object __provide_keyword_args(
value = __get_value(kw_injection)
kwargs[name] = value
if __isawaitable(value):
awaitables.append((name, value))
if __is_future_or_coroutine(value):
future_kwargs.append((name, value))
if awaitables:
return __awaitable_args_kwargs_future(kwargs, awaitables)
if future_kwargs:
return __combine_future_injections(kwargs, future_kwargs)
return kwargs
cdef inline object __awaitable_args_kwargs_future(object args, list awaitables):
cdef inline object __combine_future_injections(object injections, list future_injections):
future_result = asyncio.Future()
args_ready = asyncio.gather(*[value for _, value in awaitables])
args_ready.add_done_callback(
injections_ready = asyncio.gather(*[value for _, value in future_injections])
injections_ready.add_done_callback(
functools.partial(
__async_prepare_args_kwargs_callback,
future_result,
args,
awaitables,
injections,
future_injections,
),
)
asyncio.ensure_future(args_ready)
asyncio.ensure_future(injections_ready)
return future_result
@ -452,13 +451,12 @@ cdef inline object __awaitable_args_kwargs_future(object args, list awaitables):
cdef inline void __async_prepare_args_kwargs_callback(
object future_result,
object args,
object awaitables,
object future_args_kwargs,
object future,
):
try:
awaited = future.result()
for value, (key, _) in zip(awaited, awaitables):
result = future.result()
for value, (key, _) in zip(result, future_args_kwargs):
args[key] = value
except Exception as exception:
future_result.set_exception(exception)
@ -471,18 +469,18 @@ cdef inline void __async_prepare_args_kwargs_callback(
cdef inline object __provide_attributes(tuple attributes, int attributes_len):
cdef NamedInjection attr_injection
cdef dict attribute_injections = {}
cdef list awaitables = []
cdef list future_attributes = []
for index in range(attributes_len):
attr_injection = <NamedInjection>attributes[index]
name = __get_name(attr_injection)
value = __get_value(attr_injection)
attribute_injections[name] = value
if __isawaitable(value):
awaitables.append((name, value))
if __is_future_or_coroutine(value):
future_attributes.append((name, value))
if awaitables:
return __awaitable_args_kwargs_future(attribute_injections, awaitables)
if future_attributes:
return __combine_future_injections(attribute_injections, future_attributes)
return attribute_injections
@ -539,23 +537,16 @@ cdef inline object __call(
injection_kwargs_len,
)
args_awaitable = __isawaitable(args)
kwargs_awaitable = __isawaitable(kwargs)
is_future_args = __is_future_or_coroutine(args)
is_future_kwargs = __is_future_or_coroutine(kwargs)
if args_awaitable or kwargs_awaitable:
if not args_awaitable:
future = asyncio.Future()
future.set_result(args)
args = future
if not kwargs_awaitable:
future = asyncio.Future()
future.set_result(kwargs)
kwargs = future
if is_future_args or is_future_kwargs:
future_args = args if is_future_args else __future_result(args)
future_kwargs = kwargs if is_future_kwargs else __future_result(kwargs)
future_result = asyncio.Future()
args_kwargs_ready = asyncio.gather(args, kwargs)
args_kwargs_ready = asyncio.gather(future_args, future_kwargs)
args_kwargs_ready.add_done_callback(
functools.partial(
__async_call_callback,
@ -577,7 +568,7 @@ cdef inline void __async_call_callback(object future_result, object call, object
except Exception as exception:
future_result.set_exception(exception)
else:
if __isawaitable(result):
if __is_future_or_coroutine(result):
result = asyncio.ensure_future(result)
result.add_done_callback(functools.partial(__async_result_callback, future_result))
return
@ -613,38 +604,26 @@ cdef inline object __factory_call(Factory self, tuple args, dict kwargs):
if self.__attributes_len > 0:
attributes = __provide_attributes(self.__attributes, self.__attributes_len)
instance_awaitable = __isawaitable(instance)
attributes_awaitable = __isawaitable(attributes)
is_future_instance = __is_future_or_coroutine(instance)
is_future_attributes = __is_future_or_coroutine(attributes)
if instance_awaitable or attributes_awaitable:
if not instance_awaitable:
future = asyncio.Future()
future.set_result(instance)
instance = future
if not attributes_awaitable:
future = asyncio.Future()
future.set_result(attributes)
attributes = future
return __async_inject_attributes(instance, attributes)
if is_future_instance or is_future_attributes:
future_instance = instance if is_future_instance else __future_result(instance)
future_attributes = attributes if is_future_attributes else __future_result(attributes)
return __async_inject_attributes(future_instance, future_attributes)
__inject_attributes(instance, attributes)
return instance
cdef bint __has_isawaitable = False
cdef inline bint __is_future_or_coroutine(object instance):
if asyncio is None:
return False
return asyncio.isfuture(instance) or asyncio.iscoroutine(instance)
cdef inline bint __isawaitable(object instance):
global __has_isawaitable
if __has_isawaitable is True:
return inspect.isawaitable(instance)
if hasattr(inspect, 'isawaitable'):
__has_isawaitable = True
return inspect.isawaitable(instance)
return False
cdef inline object __future_result(object instance):
future_result = asyncio.Future()
future_result.set_result(instance)
return future_result

View File

@ -192,13 +192,11 @@ cdef class Provider(object):
if self.is_async_mode_disabled():
return result
elif self.is_async_mode_enabled():
if not __isawaitable(result):
future_result = asyncio.Future()
future_result.set_result(result)
return future_result
return result
if __is_future_or_coroutine(result):
return result
return __future_result(result)
elif self.is_async_mode_undefined():
if __isawaitable(result):
if __is_future_or_coroutine(result):
self.enable_async_mode()
else:
self.disable_async_mode()
@ -661,18 +659,16 @@ cdef class Dependency(Provider):
self._check_instance_type(result)
return result
elif self.is_async_mode_enabled():
if __isawaitable(result):
if __is_future_or_coroutine(result):
future_result = asyncio.Future()
result = asyncio.ensure_future(result)
result.add_done_callback(functools.partial(self._async_provide, future_result))
return future_result
else:
self._check_instance_type(result)
future_result = asyncio.Future()
future_result.set_result(result)
return future_result
return __future_result(result)
elif self.is_async_mode_undefined():
if __isawaitable(result):
if __is_future_or_coroutine(result):
self.enable_async_mode()
future_result = asyncio.Future()
@ -2701,7 +2697,7 @@ cdef class Singleton(BaseSingleton):
:rtype: None
"""
if __isawaitable(self.__storage):
if __is_future_or_coroutine(self.__storage):
asyncio.ensure_future(self.__storage).cancel()
self.__storage = None
@ -2710,7 +2706,7 @@ cdef class Singleton(BaseSingleton):
if self.__storage is None:
instance = __factory_call(self.__instantiator, args, kwargs)
if __isawaitable(instance):
if __is_future_or_coroutine(instance):
future_result = asyncio.Future()
instance = asyncio.ensure_future(instance)
instance.add_done_callback(functools.partial(self._async_init_instance, future_result))
@ -2769,7 +2765,7 @@ cdef class ThreadSafeSingleton(BaseSingleton):
:rtype: None
"""
with self.__storage_lock:
if __isawaitable(self.__storage):
if __is_future_or_coroutine(self.__storage):
asyncio.ensure_future(self.__storage).cancel()
self.__storage = None
@ -2783,7 +2779,7 @@ cdef class ThreadSafeSingleton(BaseSingleton):
if self.__storage is None:
instance = __factory_call(self.__instantiator, args, kwargs)
if __isawaitable(instance):
if __is_future_or_coroutine(instance):
future_result = asyncio.Future()
instance = asyncio.ensure_future(instance)
instance.add_done_callback(functools.partial(self._async_init_instance, future_result))
@ -2850,7 +2846,7 @@ cdef class ThreadLocalSingleton(BaseSingleton):
:rtype: None
"""
if __isawaitable(self.__storage.instance):
if __is_future_or_coroutine(self.__storage.instance):
asyncio.ensure_future(self.__storage.instance).cancel()
del self.__storage.instance
@ -2863,7 +2859,7 @@ cdef class ThreadLocalSingleton(BaseSingleton):
except AttributeError:
instance = __factory_call(self.__instantiator, args, kwargs)
if __isawaitable(instance):
if __is_future_or_coroutine(instance):
future_result = asyncio.Future()
instance = asyncio.ensure_future(instance)
instance.add_done_callback(functools.partial(self._async_init_instance, future_result))
@ -3884,7 +3880,7 @@ cdef class AttributeGetter(Provider):
cpdef object _provide(self, tuple args, dict kwargs):
provided = self.__provider(*args, **kwargs)
if __isawaitable(provided):
if __is_future_or_coroutine(provided):
future_result = asyncio.Future()
provided = asyncio.ensure_future(provided)
provided.add_done_callback(functools.partial(self._async_provide, future_result))
@ -3892,9 +3888,13 @@ cdef class AttributeGetter(Provider):
return getattr(provided, self.__attribute)
def _async_provide(self, future_result, future):
provided = future.result()
result = getattr(provided, self.__attribute)
future_result.set_result(result)
try:
provided = future.result()
result = getattr(provided, self.__attribute)
except Exception:
pass
else:
future_result.set_result(result)
cdef class ItemGetter(Provider):
@ -3950,7 +3950,7 @@ cdef class ItemGetter(Provider):
cpdef object _provide(self, tuple args, dict kwargs):
provided = self.__provider(*args, **kwargs)
if __isawaitable(provided):
if __is_future_or_coroutine(provided):
future_result = asyncio.Future()
provided = asyncio.ensure_future(provided)
provided.add_done_callback(functools.partial(self._async_provide, future_result))
@ -4050,7 +4050,7 @@ cdef class MethodCaller(Provider):
cpdef object _provide(self, tuple args, dict kwargs):
call = self.__provider()
if __isawaitable(call):
if __is_future_or_coroutine(call):
future_result = asyncio.Future()
call = asyncio.ensure_future(call)
call.add_done_callback(functools.partial(self._async_provide, future_result, args, kwargs))

View File

@ -987,3 +987,65 @@ class AsyncProvidersWithAsyncDependenciesTests(AsyncTestCase):
service = self._run(container.service())
self.assertEquals(service, {'service': 'ok', 'db': {'db': 'ok'}})
class AsyncProviderWithAwaitableObjectTests(AsyncTestCase):
def test(self):
class SomeResource:
def __await__(self):
raise RuntimeError('Should never happen')
async def init_resource():
pool = SomeResource()
yield pool
class Service:
def __init__(self, resource) -> None:
self.resource = resource
class Container(containers.DeclarativeContainer):
resource = providers.Resource(init_resource)
service = providers.Singleton(Service, resource=resource)
container = Container()
self._run(container.init_resources())
self.assertIsInstance(container.service(), asyncio.Future)
self.assertIsInstance(container.resource(), asyncio.Future)
resource = self._run(container.resource())
service = self._run(container.service())
self.assertIsInstance(resource, SomeResource)
self.assertIsInstance(service.resource, SomeResource)
self.assertIs(service.resource, resource)
def test_without_init_resources(self):
class SomeResource:
def __await__(self):
raise RuntimeError('Should never happen')
async def init_resource():
pool = SomeResource()
yield pool
class Service:
def __init__(self, resource) -> None:
self.resource = resource
class Container(containers.DeclarativeContainer):
resource = providers.Resource(init_resource)
service = providers.Singleton(Service, resource=resource)
container = Container()
self.assertIsInstance(container.service(), asyncio.Future)
self.assertIsInstance(container.resource(), asyncio.Future)
resource = self._run(container.resource())
service = self._run(container.service())
self.assertIsInstance(resource, SomeResource)
self.assertIsInstance(service.resource, SomeResource)
self.assertIs(service.resource, resource)