Refactor async mode

This commit is contained in:
Roman Mogylatov 2021-02-17 09:44:41 -05:00
parent adf1bc7f79
commit f2f89419c0
4 changed files with 7444 additions and 7638 deletions

View File

@ -13,6 +13,7 @@ Development version
``fastapi-redis`` example causing pool exhaustion.
Thanks to Ilya Miroshnichenko and Valery Komarov for finding and reporting
the issue.
- Refactor async mode.
4.23.2
------

File diff suppressed because it is too large Load Diff

View File

@ -17,7 +17,6 @@ cdef class Provider(object):
cdef int __async_mode
cpdef object _provide(self, tuple args, dict kwargs)
cpdef object _process_result(self, object result)
cpdef void _copy_overridings(self, Provider copied, dict memo)
@ -362,7 +361,7 @@ cdef inline object __provide_positional_args(
):
cdef int index
cdef list positional_args = []
cdef list awaitables = []
cdef list future_args = []
cdef PositionalInjection injection
if inj_args_len == 0:
@ -374,12 +373,12 @@ cdef inline object __provide_positional_args(
positional_args.append(value)
if __is_future_or_coroutine(value):
awaitables.append((index, value))
future_args.append((index, value))
positional_args.extend(args)
if awaitables:
return __awaitable_args_kwargs_future(positional_args, awaitables)
if future_args:
return __combine_future_injections(positional_args, future_args)
return positional_args
@ -395,7 +394,7 @@ cdef inline object __provide_keyword_args(
cdef object name
cdef object value
cdef dict prefixed = {}
cdef list awaitables = []
cdef list future_kwargs = []
cdef NamedInjection kw_injection
if len(kwargs) == 0:
@ -405,7 +404,7 @@ cdef inline object __provide_keyword_args(
value = __get_value(kw_injection)
kwargs[name] = value
if __is_future_or_coroutine(value):
awaitables.append((name, value))
future_kwargs.append((name, value))
else:
kwargs, prefixed = __separate_prefixed_kwargs(kwargs)
@ -424,27 +423,27 @@ cdef inline object __provide_keyword_args(
kwargs[name] = value
if __is_future_or_coroutine(value):
awaitables.append((name, value))
future_kwargs.append((name, value))
if awaitables:
return __awaitable_args_kwargs_future(kwargs, awaitables)
if future_kwargs:
return __combine_future_injections(kwargs, future_kwargs)
return kwargs
cdef inline object __awaitable_args_kwargs_future(object args, list awaitables):
cdef inline object __combine_future_injections(object injections, list future_injections):
future_result = asyncio.Future()
args_ready = asyncio.gather(*[value for _, value in awaitables])
args_ready.add_done_callback(
injections_ready = asyncio.gather(*[value for _, value in future_injections])
injections_ready.add_done_callback(
functools.partial(
__async_prepare_args_kwargs_callback,
future_result,
args,
awaitables,
injections,
future_injections,
),
)
asyncio.ensure_future(args_ready)
asyncio.ensure_future(injections_ready)
return future_result
@ -452,13 +451,12 @@ cdef inline object __awaitable_args_kwargs_future(object args, list awaitables):
cdef inline void __async_prepare_args_kwargs_callback(
object future_result,
object args,
object awaitables,
object future_args_kwargs,
object future,
):
try:
awaited = future.result()
for value, (key, _) in zip(awaited, awaitables):
result = future.result()
for value, (key, _) in zip(result, future_args_kwargs):
args[key] = value
except Exception as exception:
future_result.set_exception(exception)
@ -471,7 +469,7 @@ cdef inline void __async_prepare_args_kwargs_callback(
cdef inline object __provide_attributes(tuple attributes, int attributes_len):
cdef NamedInjection attr_injection
cdef dict attribute_injections = {}
cdef list awaitables = []
cdef list future_attributes = []
for index in range(attributes_len):
attr_injection = <NamedInjection>attributes[index]
@ -479,10 +477,10 @@ cdef inline object __provide_attributes(tuple attributes, int attributes_len):
value = __get_value(attr_injection)
attribute_injections[name] = value
if __is_future_or_coroutine(value):
awaitables.append((name, value))
future_attributes.append((name, value))
if awaitables:
return __awaitable_args_kwargs_future(attribute_injections, awaitables)
if future_attributes:
return __combine_future_injections(attribute_injections, future_attributes)
return attribute_injections
@ -539,18 +537,16 @@ cdef inline object __call(
injection_kwargs_len,
)
args_awaitable = __is_future_or_coroutine(args)
kwargs_awaitable = __is_future_or_coroutine(kwargs)
is_future_args = __is_future_or_coroutine(args)
is_future_kwargs = __is_future_or_coroutine(kwargs)
if args_awaitable or kwargs_awaitable:
if not args_awaitable:
args = __future_result(args)
if not kwargs_awaitable:
kwargs = __future_result(kwargs)
if is_future_args or is_future_kwargs:
future_args = args if is_future_args else __future_result(args)
future_kwargs = kwargs if is_future_kwargs else __future_result(kwargs)
future_result = asyncio.Future()
args_kwargs_ready = asyncio.gather(args, kwargs)
args_kwargs_ready = asyncio.gather(future_args, future_kwargs)
args_kwargs_ready.add_done_callback(
functools.partial(
__async_call_callback,
@ -608,15 +604,13 @@ cdef inline object __factory_call(Factory self, tuple args, dict kwargs):
if self.__attributes_len > 0:
attributes = __provide_attributes(self.__attributes, self.__attributes_len)
instance_awaitable = __is_future_or_coroutine(instance)
attributes_awaitable = __is_future_or_coroutine(attributes)
is_future_instance = __is_future_or_coroutine(instance)
is_future_attributes = __is_future_or_coroutine(attributes)
if instance_awaitable or attributes_awaitable:
if not instance_awaitable:
instance = __future_result(instance)
if not attributes_awaitable:
attributes = __future_result(attributes)
return __async_inject_attributes(instance, attributes)
if is_future_instance or is_future_attributes:
future_instance = instance if is_future_instance else __future_result(instance)
future_attributes = attributes if is_future_attributes else __future_result(attributes)
return __async_inject_attributes(future_instance, future_attributes)
__inject_attributes(instance, attributes)

View File

@ -188,7 +188,19 @@ cdef class Provider(object):
result = self.__last_overriding(*args, **kwargs)
else:
result = self._provide(args, kwargs)
return self._process_result(result)
if self.is_async_mode_disabled():
return result
elif self.is_async_mode_enabled():
if __is_future_or_coroutine(result):
return result
return __future_result(result)
elif self.is_async_mode_undefined():
if __is_future_or_coroutine(result):
self.enable_async_mode()
else:
self.disable_async_mode()
return result
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
@ -372,20 +384,6 @@ cdef class Provider(object):
"""
raise NotImplementedError()
cpdef object _process_result(self, object result):
if self.is_async_mode_disabled():
return result
elif self.is_async_mode_enabled():
if __is_future_or_coroutine(result):
return result
return __future_result(result)
elif self.is_async_mode_undefined():
if __is_future_or_coroutine(result):
self.enable_async_mode()
else:
self.disable_async_mode()
return result
cpdef void _copy_overridings(self, Provider copied, dict memo):
"""Copy provider overridings to a newly copied provider."""
copied.__overridden = deepcopy(self.__overridden, memo)