Rework async resources callbacks to .add_done_callback() style (fixes pypy3 issue)

This commit is contained in:
Roman Mogylatov 2020-12-02 21:24:14 -05:00
parent 31b03243a4
commit 3ab2b1f573
2 changed files with 7311 additions and 7862 deletions

File diff suppressed because it is too large Load Diff

View File

@ -3,6 +3,7 @@
from __future__ import absolute_import
import copy
import functools
import inspect
import os
import re
@ -2719,7 +2720,7 @@ cdef class Resource(Provider):
pass
else:
if inspect.isawaitable(shutdown):
return __async_resource_shutdown(self, shutdown)
return self._create_shutdown_future(shutdown)
self.__resource = None
self.__initialized = False
@ -2764,7 +2765,7 @@ cdef class Resource(Provider):
)
self.__initialized = True
self.__async = True
return __async_resource_init(self, async_init, initializer.shutdown)
return self._create_init_future(async_init, initializer.shutdown)
elif inspect.isgeneratorfunction(self.__initializer):
initializer = __call(
self.__initializer,
@ -2789,7 +2790,7 @@ cdef class Resource(Provider):
)
self.__initialized = True
self.__async = True
return __async_resource_init(self, initializer)
return self._create_init_future(initializer)
elif isasyncgenfunction(self.__initializer):
initializer = __call(
self.__initializer,
@ -2802,7 +2803,7 @@ cdef class Resource(Provider):
)
self.__initialized = True
self.__async = True
return __async_resource_init(self, initializer.__anext__(), initializer.asend)
return self._create_init_future(initializer.__anext__(), initializer.asend)
elif callable(self.__initializer):
self.__resource = __call(
self.__initializer,
@ -2819,6 +2820,43 @@ cdef class Resource(Provider):
self.__initialized = True
return self.__resource
def _create_init_future(self, future, shutdowner=None):
callback = self._async_init_callback
if shutdowner:
callback = functools.partial(callback, shutdowner=shutdowner)
future = asyncio.ensure_future(future)
future.add_done_callback(callback)
return future
def _async_init_callback(self, initializer, shutdowner=None):
try:
resource = initializer.result()
except Exception:
self.__initialized = False
raise
else:
self.__resource = resource
self.__shutdowner = shutdowner
def _create_shutdown_future(self, shutdown_future):
future = asyncio.Future()
shutdown_future = asyncio.ensure_future(shutdown_future)
shutdown_future.add_done_callback(functools.partial(self._async_shutdown_callback, future))
return future
def _async_shutdown_callback(self, future_result, shutdowner):
try:
shutdowner.result()
except StopAsyncIteration:
pass
self.__resource = None
self.__initialized = False
self.__shutdowner = None
future_result.set_result(None)
@staticmethod
def _is_resource_subclass(instance):
if sys.version_info < (3, 5):
@ -3468,26 +3506,3 @@ def isasyncgenfunction(obj):
return inspect.isasyncgenfunction(obj)
except AttributeError:
return False
async def __async_resource_init(self: Resource, initializer: object, shutdowner: object = None) -> None:
try:
resource = await initializer
except Exception:
self.__initialized = False
raise
else:
self.__resource = resource
self.__shutdowner = shutdowner
return self.__resource
async def __async_resource_shutdown(self: Resource, shutdowner: object) -> None:
try:
await shutdowner
except StopAsyncIteration:
pass
self.__resource = None
self.__initialized = False
self.__shutdowner = None