python-dependency-injector/src/dependency_injector/providers.pyx
Roman Mogylatov 4cc39fc6eb
369 Add required argument to config from_* methods (#376)
* Update typing stubs

* Update from_yaml() method

* Update from_ini() method

* Update from_dict() method

* Update from_env() method

* Update documentation

* Update changelog

* Update changelog

* Make doc block fix

* Add extra test for from_ini()
2021-01-24 10:27:45 -05:00

3881 lines
112 KiB
Cython

"""Providers module."""
from __future__ import absolute_import
import copy
import errno
import functools
import inspect
import os
import re
import sys
import types
import threading
import warnings
import weakref
try:
import asyncio
except ImportError:
asyncio = None
_is_coroutine_marker = None
else:
if sys.version_info >= (3, 5, 3):
import asyncio.coroutines
_is_coroutine_marker = asyncio.coroutines._is_coroutine
else:
_is_coroutine_marker = True
try:
import ConfigParser as iniconfigparser
except ImportError:
import configparser as iniconfigparser
try:
import yaml
except ImportError:
yaml = None
from .errors import (
Error,
NoSuchProviderError,
)
cimport cython
if sys.version_info[0] == 3: # pragma: no cover
CLASS_TYPES = (type,)
else: # pragma: no cover
CLASS_TYPES = (type, types.ClassType)
copy._deepcopy_dispatch[types.MethodType] = \
lambda obj, memo: type(obj)(obj.im_func,
copy.deepcopy(obj.im_self, memo),
obj.im_class)
if sys.version_info[0] == 3:
class EnvInterpolation(iniconfigparser.BasicInterpolation):
"""Interpolation which expands environment variables in values."""
def before_get(self, parser, section, option, value, defaults):
value = super().before_get(parser, section, option, value, defaults)
return os.path.expandvars(value)
def _parse_ini_file(filepath):
parser = iniconfigparser.ConfigParser(interpolation=EnvInterpolation())
with open(filepath) as config_file:
parser.read_file(config_file)
return parser
else:
import StringIO
def _parse_ini_file(filepath):
parser = iniconfigparser.ConfigParser()
with open(filepath) as config_file:
config_string = os.path.expandvars(config_file.read())
parser.readfp(StringIO.StringIO(config_string))
return parser
if yaml:
# TODO: use SafeLoader without env interpolation by default in version 5.*
yaml_env_marker_pattern = re.compile(r'\$\{([^}^{]+)\}')
def yaml_env_marker_constructor(_, node):
""""Replace environment variable marker with its value."""
return os.path.expandvars(node.value)
yaml.add_implicit_resolver('!path', yaml_env_marker_pattern)
yaml.add_constructor('!path', yaml_env_marker_constructor)
class YamlLoader(yaml.SafeLoader):
"""Custom YAML loader.
Inherits ``yaml.SafeLoader`` and add environment variables interpolation.
"""
YamlLoader.add_implicit_resolver('!path', yaml_env_marker_pattern, None)
YamlLoader.add_constructor('!path', yaml_env_marker_constructor)
else:
class YamlLoader:
"""Custom YAML loader.
Inherits ``yaml.SafeLoader`` and add environment variables interpolation.
"""
UNDEFINED = object()
cdef int ASYNC_MODE_UNDEFINED = 0
cdef int ASYNC_MODE_ENABLED = 1
cdef int ASYNC_MODE_DISABLED = 2
cdef class Provider(object):
"""Base provider class.
:py:class:`Provider` is callable (implements ``__call__`` method). Every
call to provider object returns provided result, according to the providing
strategy of particular provider. This ``callable`` functionality is a
regular part of providers API and it should be the same for all provider's
subclasses.
Implementation of particular providing strategy should be done in
:py:meth:`Provider._provide` of :py:class:`Provider` subclass. Current
method is called every time when not overridden provider is called.
:py:class:`Provider` implements provider overriding logic that should be
also common for all providers:
.. code-block:: python
provider1 = Factory(SomeClass)
provider2 = Factory(ChildSomeClass)
provider1.override(provider2)
some_instance = provider1()
assert isinstance(some_instance, ChildSomeClass)
Also :py:class:`Provider` implements helper function for creating its
delegates:
.. code-block:: python
provider = Factory(object)
delegate = provider.delegate()
delegated = delegate()
assert provider is delegated
All providers should extend this class.
.. py:attribute:: overridden
:noindex:
Tuple of overriding providers, if any.
:type: tuple[:py:class:`Provider`] | None
"""
__IS_PROVIDER__ = True
overriding_lock = threading.RLock()
"""Overriding reentrant lock.
:type: :py:class:`threading.RLock`
"""
def __init__(self):
"""Initializer."""
self.__overridden = tuple()
self.__last_overriding = None
self.__async_mode = ASYNC_MODE_UNDEFINED
super(Provider, self).__init__()
def __call__(self, *args, **kwargs):
"""Return provided object.
Callable interface implementation.
"""
if self.__last_overriding is not None:
result = self.__last_overriding(*args, **kwargs)
else:
result = self._provide(args, kwargs)
if self.is_async_mode_disabled():
return result
elif self.is_async_mode_enabled():
if not __isawaitable(result):
future_result = asyncio.Future()
future_result.set_result(result)
return future_result
return result
elif self.is_async_mode_undefined():
if __isawaitable(result):
self.enable_async_mode()
else:
self.disable_async_mode()
return result
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__()
self._copy_overridings(copied, memo)
return copied
@classmethod
def __class_getitem__(cls, item):
return cls
def __str__(self):
"""Return string representation of provider.
:rtype: str
"""
return represent_provider(provider=self, provides=None)
def __repr__(self):
"""Return string representation of provider.
:rtype: str
"""
return self.__str__()
@property
def overridden(self):
"""Return tuple of overriding providers."""
with self.overriding_lock:
return self.__overridden
@property
def last_overriding(self):
"""Return last overriding provider.
If provider is not overridden, then None is returned.
"""
return self.__last_overriding
def override(self, provider):
"""Override provider with another provider.
:param provider: Overriding provider.
:type provider: :py:class:`Provider`
:raise: :py:exc:`dependency_injector.errors.Error`
:return: Overriding context.
:rtype: :py:class:`OverridingContext`
"""
if provider is self:
raise Error('Provider {0} could not be overridden '
'with itself'.format(self))
if not is_provider(provider):
provider = Object(provider)
with self.overriding_lock:
self.__overridden += (provider,)
self.__last_overriding = provider
return OverridingContext(self, provider)
def reset_last_overriding(self):
"""Reset last overriding provider.
:raise: :py:exc:`dependency_injector.errors.Error` if provider is not
overridden.
:rtype: None
"""
with self.overriding_lock:
if len(self.__overridden) == 0:
raise Error('Provider {0} is not overridden'.format(str(self)))
self.__overridden = self.__overridden[:-1]
try:
self.__last_overriding = self.__overridden[-1]
except IndexError:
self.__last_overriding = None
def reset_override(self):
"""Reset all overriding providers.
:rtype: None
"""
with self.overriding_lock:
self.__overridden = tuple()
self.__last_overriding = None
def async_(self, *args, **kwargs):
"""Return provided object asynchronously.
This method is a synonym of __call__().
It provides typing stubs for correct type checking with
`await` expression:
.. code-block:: python
database_provider: Provider[DatabaseConnection] = Resource(init_db_async)
async def main():
db: DatabaseConnection = await database_provider.async_()
...
"""
return self.__call__(*args, **kwargs)
def delegate(self):
"""Return provider's delegate.
:rtype: :py:class:`Delegate`
"""
warnings.warn(
'Method ".delegate()" is deprecated since version 4.0.0. '
'Use ".provider" attribute instead.',
category=DeprecationWarning,
)
return Delegate(self)
@property
def provider(self):
"""Return provider's delegate.
:rtype: :py:class:`Delegate`
"""
return Delegate(self)
@property
def provided(self):
"""Return :py:class:`ProvidedInstance` provider."""
return ProvidedInstance(self)
def enable_async_mode(self):
"""Enable async mode."""
self.__async_mode = ASYNC_MODE_ENABLED
def disable_async_mode(self):
"""Disable async mode."""
self.__async_mode = ASYNC_MODE_DISABLED
def reset_async_mode(self):
"""Reset async mode.
Provider will automatically set the mode on the next call.
"""
self.__async_mode = ASYNC_MODE_UNDEFINED
def is_async_mode_enabled(self):
"""Check if async mode is enabled."""
return self.__async_mode == ASYNC_MODE_ENABLED
def is_async_mode_disabled(self):
"""Check if async mode is disabled."""
return self.__async_mode == ASYNC_MODE_DISABLED
def is_async_mode_undefined(self):
"""Check if async mode is undefined."""
return self.__async_mode == ASYNC_MODE_UNDEFINED
cpdef object _provide(self, tuple args, dict kwargs):
"""Providing strategy implementation.
Abstract protected method that implements providing strategy of
particular provider. Current method is called every time when not
overridden provider is called. Need to be overridden in subclasses.
"""
raise NotImplementedError()
cpdef void _copy_overridings(self, Provider copied, dict memo):
"""Copy provider overridings to a newly copied provider."""
copied.__overridden = deepcopy(self.__overridden, memo)
copied.__last_overriding = deepcopy(self.__last_overriding, memo)
cdef class Object(Provider):
"""Object provider returns provided instance "as is".
.. py:attribute:: provides
Value that have to be provided.
:type: object
"""
def __init__(self, provides):
"""Initializer.
:param provides: Value that have to be provided.
:type provides: object
"""
self.__provides = provides
super(Object, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__(self.__provides)
self._copy_overridings(copied, memo)
return copied
def __str__(self):
"""Return string representation of provider.
:rtype: str
"""
return represent_provider(provider=self, provides=self.__provides)
def __repr__(self):
"""Return string representation of provider.
:rtype: str
"""
return self.__str__()
cpdef object _provide(self, tuple args, dict kwargs):
"""Return provided instance.
:param args: Tuple of context positional arguments.
:type args: tuple[object]
:param kwargs: Dictionary of context keyword arguments.
:type kwargs: dict[str, object]
:rtype: object
"""
return self.__provides
cdef class Delegate(Provider):
"""Delegate provider returns provider "as is".
.. py:attribute:: provides
Value that have to be provided.
:type: object
"""
def __init__(self, provides):
"""Initializer.
:param provides: Value that have to be provided.
:type provides: :py:class:`Provider`
"""
self.__provides = ensure_is_provider(provides)
super(Delegate, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__(deepcopy(self.__provides, memo))
self._copy_overridings(copied, memo)
return copied
def __str__(self):
"""Return string representation of provider.
:rtype: str
"""
return represent_provider(provider=self, provides=self.__provides)
def __repr__(self):
"""Return string representation of provider.
:rtype: str
"""
return self.__str__()
@property
def provides(self):
"""Return provider."""
return self.__provides
cpdef object _provide(self, tuple args, dict kwargs):
"""Return provided instance.
:param args: Tuple of context positional arguments.
:type args: tuple[object]
:param kwargs: Dictionary of context keyword arguments.
:type kwargs: dict[str, object]
:rtype: object
"""
return self.__provides
cdef class Dependency(Provider):
""":py:class:`Dependency` provider describes dependency interface.
This provider is used for description of dependency interface. That might
be useful when dependency could be provided in the client's code only,
but it's interface is known. Such situations could happen when required
dependency has non-deterministic list of dependencies itself.
.. code-block:: python
database_provider = Dependency(sqlite3.dbapi2.Connection)
database_provider.override(Factory(sqlite3.connect, ':memory:'))
database = database_provider()
.. py:attribute:: instance_of
:noindex:
Class of required dependency.
:type: type
"""
def __init__(self, object instance_of=object):
"""Initializer."""
if not isinstance(instance_of, CLASS_TYPES):
raise TypeError(
'Argument \'instance_of\' has incorrect type (expected {0}, got {1}))'.format(
CLASS_TYPES,
instance_of,
)
)
self.__instance_of = instance_of
super(Dependency, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__(self.__instance_of)
self._copy_overridings(copied, memo)
return copied
def __call__(self, *args, **kwargs):
"""Return provided instance.
:raise: :py:exc:`dependency_injector.errors.Error`
:rtype: object
"""
if self.__last_overriding is None:
raise Error('Dependency is not defined')
result = self.__last_overriding(*args, **kwargs)
if self.is_async_mode_disabled():
self._check_instance_type(result)
return result
elif self.is_async_mode_enabled():
if __isawaitable(result):
future_result = asyncio.Future()
result = asyncio.ensure_future(result)
result.add_done_callback(functools.partial(self._async_provide, future_result))
return future_result
else:
self._check_instance_type(result)
future_result = asyncio.Future()
future_result.set_result(result)
return future_result
elif self.is_async_mode_undefined():
if __isawaitable(result):
self.enable_async_mode()
future_result = asyncio.Future()
result = asyncio.ensure_future(result)
result.add_done_callback(functools.partial(self._async_provide, future_result))
return future_result
else:
self.disable_async_mode()
self._check_instance_type(result)
return result
def __str__(self):
"""Return string representation of provider.
:rtype: str
"""
return represent_provider(provider=self, provides=self.__instance_of)
def __repr__(self):
"""Return string representation of provider.
:rtype: str
"""
return self.__str__()
@property
def instance_of(self):
"""Return class of required dependency."""
return self.__instance_of
def provided_by(self, provider):
"""Set external dependency provider.
:param provider: Provider that provides required dependency.
:type provider: :py:class:`Provider`
:rtype: None
"""
return self.override(provider)
def _async_provide(self, future_result, future):
instance = future.result()
try:
self._check_instance_type(instance)
except Error as exception:
future_result.set_exception(exception)
else:
future_result.set_result(instance)
def _check_instance_type(self, instance):
if not isinstance(instance, self.instance_of):
raise Error('{0} is not an instance of {1}'.format(instance, self.instance_of))
cdef class ExternalDependency(Dependency):
""":py:class:`ExternalDependency` provider describes dependency interface.
This provider is used for description of dependency interface. That might
be useful when dependency could be provided in the client's code only,
but it's interface is known. Such situations could happen when required
dependency has non-deterministic list of dependencies itself.
.. code-block:: python
database_provider = ExternalDependency(sqlite3.dbapi2.Connection)
database_provider.override(Factory(sqlite3.connect, ':memory:'))
database = database_provider()
.. deprecated:: 3.9
Use :py:class:`Dependency` instead.
.. py:attribute:: instance_of
:noindex:
Class of required dependency.
:type: type
"""
cdef class DependenciesContainer(Object):
""":py:class:`DependenciesContainer` provider provides set of dependencies.
Dependencies container provider is used to implement late static binding
for a set of providers of a particular container.
Example code:
.. code-block:: python
class Adapters(containers.DeclarativeContainer):
email_sender = providers.Singleton(SmtpEmailSender)
class TestAdapters(containers.DeclarativeContainer):
email_sender = providers.Singleton(EchoEmailSender)
class UseCases(containers.DeclarativeContainer):
adapters = providers.DependenciesContainer()
signup = providers.Factory(SignupUseCase,
email_sender=adapters.email_sender)
use_cases = UseCases(adapters=Adapters)
# or
use_cases = UseCases(adapters=TestAdapters)
# Another file
from .containers import use_cases
use_case = use_cases.signup()
use_case.execute()
"""
def __init__(self, **dependencies):
"""Initializer."""
self.__providers = dependencies
super(DependenciesContainer, self).__init__(None)
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
cdef DependenciesContainer copied
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__()
copied.__provides = deepcopy(self.__provides, memo)
copied.__providers = deepcopy(self.__providers, memo)
self._copy_overridings(copied, memo)
return copied
def __getattr__(self, name):
"""Return dependency provider."""
if name.startswith('__') and name.endswith('__'):
raise AttributeError(
'\'{cls}\' object has no attribute '
'\'{attribute_name}\''.format(cls=self.__class__.__name__,
attribute_name=name))
provider = self.__providers.get(name)
if not provider:
provider = Dependency()
self.__providers[name] = provider
container = self.__call__()
if container:
dependency_provider = container.providers.get(name)
if dependency_provider:
provider.override(dependency_provider)
return provider
@property
def providers(self):
"""Read-only dictionary of dependency providers."""
return self.__providers
def override(self, provider):
"""Override provider with another provider.
:param provider: Overriding provider.
:type provider: :py:class:`Provider`
:raise: :py:exc:`dependency_injector.errors.Error`
:return: Overriding context.
:rtype: :py:class:`OverridingContext`
"""
self._override_providers(container=provider)
return super(DependenciesContainer, self).override(provider)
def reset_last_overriding(self):
"""Reset last overriding provider.
:raise: :py:exc:`dependency_injector.errors.Error` if provider is not
overridden.
:rtype: None
"""
for child in self.__providers.values():
try:
child.reset_last_overriding()
except Error:
pass
super(DependenciesContainer, self).reset_last_overriding()
def reset_override(self):
"""Reset all overriding providers.
:rtype: None
"""
for child in self.__providers.values():
child.reset_override()
super(DependenciesContainer, self).reset_override()
cpdef object _override_providers(self, object container):
"""Override providers with providers from provided container."""
for name, dependency_provider in container.providers.items():
provider = getattr(self, name)
if provider.last_overriding is dependency_provider:
continue
provider.override(dependency_provider)
cdef class OverridingContext(object):
"""Provider overriding context.
:py:class:`OverridingContext` is used by :py:meth:`Provider.override` for
implementing ``with`` contexts. When :py:class:`OverridingContext` is
closed, overriding that was created in this context is dropped also.
.. code-block:: python
with provider.override(another_provider):
assert provider.overridden
assert not provider.overridden
"""
def __init__(self, Provider overridden, Provider overriding):
"""Initializer.
:param overridden: Overridden provider.
:type overridden: :py:class:`Provider`
:param overriding: Overriding provider.
:type overriding: :py:class:`Provider`
"""
self.__overridden = overridden
self.__overriding = overriding
super(OverridingContext, self).__init__()
def __enter__(self):
"""Do nothing."""
return self.__overriding
def __exit__(self, *_):
"""Exit overriding context."""
self.__overridden.reset_last_overriding()
cdef class Callable(Provider):
r"""Callable provider calls wrapped callable on every call.
Callable supports positional and keyword argument injections:
.. code-block:: python
some_function = Callable(some_function,
'positional_arg1', 'positional_arg2',
keyword_argument1=3, keyword_argument=4)
# or
some_function = Callable(some_function) \
.add_args('positional_arg1', 'positional_arg2') \
.add_kwargs(keyword_argument1=3, keyword_argument=4)
# or
some_function = Callable(some_function)
some_function.add_args('positional_arg1', 'positional_arg2')
some_function.add_kwargs(keyword_argument1=3, keyword_argument=4)
"""
def __init__(self, provides, *args, **kwargs):
"""Initializer.
:param provides: Wrapped callable.
:type provides: callable
"""
if not callable(provides):
raise Error('Provider {0} expected to get callable, '
'got {0}'.format('.'.join((self.__class__.__module__,
self.__class__.__name__)),
provides))
self.__provides = provides
self.__args = tuple()
self.__args_len = 0
self.set_args(*args)
self.__kwargs = tuple()
self.__kwargs_len = 0
self.set_kwargs(**kwargs)
super(Callable, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
provides = self.provides
if isinstance(provides, Provider):
provides = deepcopy(provides, memo)
copied = self.__class__(provides,
*deepcopy(self.args, memo),
**deepcopy(self.kwargs, memo))
self._copy_overridings(copied, memo)
return copied
def __str__(self):
"""Return string representation of provider.
:rtype: str
"""
return represent_provider(provider=self, provides=self.__provides)
@property
def provides(self):
"""Return wrapped callable."""
return self.__provides
@property
def args(self):
"""Return positional argument injections."""
cdef int index
cdef PositionalInjection arg
cdef list args
args = list()
for index in range(self.__args_len):
arg = self.__args[index]
args.append(arg.__value)
return tuple(args)
def add_args(self, *args):
"""Add positional argument injections.
:return: Reference ``self``
"""
self.__args += parse_positional_injections(args)
self.__args_len = len(self.__args)
return self
def set_args(self, *args):
"""Set positional argument injections.
Existing positional argument injections are dropped.
:return: Reference ``self``
"""
self.__args = parse_positional_injections(args)
self.__args_len = len(self.__args)
return self
def clear_args(self):
"""Drop positional argument injections.
:return: Reference ``self``
"""
self.__args = tuple()
self.__args_len = len(self.__args)
return self
@property
def kwargs(self):
"""Return keyword argument injections."""
cdef int index
cdef NamedInjection kwarg
cdef dict kwargs
kwargs = dict()
for index in range(self.__kwargs_len):
kwarg = self.__kwargs[index]
kwargs[kwarg.__name] = kwarg.__value
return kwargs
def add_kwargs(self, **kwargs):
"""Add keyword argument injections.
:return: Reference ``self``
"""
self.__kwargs += parse_named_injections(kwargs)
self.__kwargs_len = len(self.__kwargs)
return self
def set_kwargs(self, **kwargs):
"""Set keyword argument injections.
Existing keyword argument injections are dropped.
:return: Reference ``self``
"""
self.__kwargs = parse_named_injections(kwargs)
self.__kwargs_len = len(self.__kwargs)
return self
def clear_kwargs(self):
"""Drop keyword argument injections.
:return: Reference ``self``
"""
self.__kwargs = tuple()
self.__kwargs_len = len(self.__kwargs)
return self
cpdef object _provide(self, tuple args, dict kwargs):
"""Return result of provided callable's call."""
return __callable_call(self, args, kwargs)
cdef class DelegatedCallable(Callable):
"""Callable that is injected "as is".
DelegatedCallable is a :py:class:`Callable`, that is injected "as is".
"""
__IS_DELEGATED__ = True
cdef class AbstractCallable(Callable):
"""Abstract callable provider.
:py:class:`AbstractCallable` is a :py:class:`Callable` provider that must
be explicitly overridden before calling.
Overriding of :py:class:`AbstractCallable` is possible only by another
:py:class:`Callable` provider.
"""
def __call__(self, *args, **kwargs):
"""Return provided object.
Callable interface implementation.
"""
if self.__last_overriding is None:
raise Error('{0} must be overridden before calling'.format(self))
return super().__call__(*args, **kwargs)
def override(self, provider):
"""Override provider with another provider.
:param provider: Overriding provider.
:type provider: :py:class:`Provider`
:raise: :py:exc:`dependency_injector.errors.Error`
:return: Overriding context.
:rtype: :py:class:`OverridingContext`
"""
if not isinstance(provider, Callable):
raise Error('{0} must be overridden only by '
'{1} providers'.format(self, Callable))
return super(AbstractCallable, self).override(provider)
cpdef object _provide(self, tuple args, dict kwargs):
"""Return result of provided callable's call."""
raise NotImplementedError('Abstract provider forward providing logic '
'to overriding provider')
cdef class CallableDelegate(Delegate):
"""Callable delegate injects delegating callable "as is".
.. py:attribute:: provides
Value that have to be provided.
:type: object
"""
def __init__(self, callable):
"""Initializer.
:param callable: Value that have to be provided.
:type callable: object
"""
if isinstance(callable, Callable) is False:
raise Error('{0} can wrap only {1} providers'.format(
self.__class__, Callable))
super(CallableDelegate, self).__init__(callable)
cdef class Coroutine(Callable):
r"""Coroutine provider creates wrapped coroutine on every call.
Coroutine supports positional and keyword argument injections:
.. code-block:: python
some_coroutine = Coroutine(some_coroutine,
'positional_arg1', 'positional_arg2',
keyword_argument1=3, keyword_argument=4)
# or
some_coroutine = Coroutine(some_coroutine) \
.add_args('positional_arg1', 'positional_arg2') \
.add_kwargs(keyword_argument1=3, keyword_argument=4)
# or
some_coroutine = Coroutine(some_coroutine)
some_coroutine.add_args('positional_arg1', 'positional_arg2')
some_coroutine.add_kwargs(keyword_argument1=3, keyword_argument=4)
"""
_is_coroutine = _is_coroutine_marker
def __init__(self, provides, *args, **kwargs):
"""Initializer.
:param provides: Wrapped callable.
:type provides: callable
"""
if not asyncio:
raise Error('Package asyncio is not available')
if not asyncio.iscoroutinefunction(provides):
raise Error('Provider {0} expected to get coroutine function, '
'got {1}'.format('.'.join((self.__class__.__module__,
self.__class__.__name__)),
provides))
super(Coroutine, self).__init__(provides, *args, **kwargs)
cdef class DelegatedCoroutine(Coroutine):
"""Coroutine provider that is injected "as is".
DelegatedCoroutine is a :py:class:`Coroutine`, that is injected "as is".
"""
__IS_DELEGATED__ = True
cdef class AbstractCoroutine(Coroutine):
"""Abstract coroutine provider.
:py:class:`AbstractCoroutine` is a :py:class:`Coroutine` provider that must
be explicitly overridden before calling.
Overriding of :py:class:`AbstractCoroutine` is possible only by another
:py:class:`Coroutine` provider.
"""
def __call__(self, *args, **kwargs):
"""Return provided object.
Callable interface implementation.
"""
if self.__last_overriding is None:
raise Error('{0} must be overridden before calling'.format(self))
return super().__call__(*args, **kwargs)
def override(self, provider):
"""Override provider with another provider.
:param provider: Overriding provider.
:type provider: :py:class:`Provider`
:raise: :py:exc:`dependency_injector.errors.Error`
:return: Overriding context.
:rtype: :py:class:`OverridingContext`
"""
if not isinstance(provider, Coroutine):
raise Error('{0} must be overridden only by '
'{1} providers'.format(self, Coroutine))
return super(AbstractCoroutine, self).override(provider)
cpdef object _provide(self, tuple args, dict kwargs):
"""Return result of provided callable's call."""
raise NotImplementedError('Abstract provider forward providing logic '
'to overriding provider')
cdef class CoroutineDelegate(Delegate):
"""Coroutine delegate injects delegating coroutine "as is".
.. py:attribute:: provides
Value that have to be provided.
:type: object
"""
def __init__(self, coroutine):
"""Initializer.
:param coroutine: Value that have to be provided.
:type coroutine: object
"""
if isinstance(coroutine, Coroutine) is False:
raise Error('{0} can wrap only {1} providers'.format(
self.__class__, Callable))
super(CoroutineDelegate, self).__init__(coroutine)
cdef class ConfigurationOption(Provider):
"""Child configuration option provider.
This provider should not be used directly. It is a part of the
:py:class:`Configuration` provider.
"""
def __init__(self, name, root, required=False):
self.__name = name
self.__root_ref = weakref.ref(root)
self.__children = {}
self.__required = required
self.__cache = UNDEFINED
super().__init__()
def __deepcopy__(self, memo):
cdef ConfigurationOption copied
copied = memo.get(id(self))
if copied is not None:
return copied
copied_name = deepcopy(self.__name, memo)
root = self.__root_ref()
copied_root = memo.get(id(root))
if copied_root is None:
copied_root = deepcopy(root, memo)
copied = self.__class__(copied_name, copied_root, self.__required)
copied.__children = deepcopy(self.__children, memo)
return copied
def __str__(self):
return represent_provider(provider=self, provides=self.get_name())
def __getattr__(self, item):
if item.startswith('__') and item.endswith('__'):
raise AttributeError(
'\'{cls}\' object has no attribute '
'\'{attribute_name}\''.format(cls=self.__class__.__name__,
attribute_name=item))
child = self.__children.get(item)
if child is None:
child_name = self.__name + (item,)
child = ConfigurationOption(child_name, self.__root_ref())
self.__children[item] = child
return child
def __getitem__(self, item):
child = self.__children.get(item)
if child is None:
child_name = self.__name + (item,)
child = ConfigurationOption(child_name, self.__root_ref())
self.__children[item] = child
return child
cpdef object _provide(self, tuple args, dict kwargs):
"""Return new instance."""
if self.__cache is not UNDEFINED:
return self.__cache
root = self.__root_ref()
value = root.get(self._get_self_name(), self.__required)
self.__cache = value
return value
def _get_self_name(self):
return '.'.join(
segment() if is_provider(segment) else segment for segment in self.__name
)
@property
def root(self):
return self.__root_ref()
def get_name(self):
root = self.__root_ref()
return '.'.join((root.get_name(), self._get_self_name()))
def get_name_segments(self):
return self.__name
def as_int(self):
return TypedConfigurationOption(int, self)
def as_float(self):
return TypedConfigurationOption(float, self)
def as_(self, callback, *args, **kwargs):
return TypedConfigurationOption(callback, self, *args, **kwargs)
def required(self):
return self.__class__(self.__name, self.__root_ref(), required=True)
def is_required(self):
return self.__required
def override(self, value):
if isinstance(value, Provider):
raise Error('Configuration option can only be overridden by a value')
root = self.__root_ref()
return root.set(self._get_self_name(), value)
def reset_last_overriding(self):
raise Error('Configuration option does not support this method')
def reset_override(self):
raise Error('Configuration option does not support this method')
def reset_cache(self):
self.__cache = UNDEFINED
for child in self.__children.values():
child.reset_cache()
def update(self, value):
"""Set configuration options.
.. deprecated:: 3.11
Use :py:meth:`Configuration.override` instead.
:param value: Value of configuration option.
:type value: object | dict
:rtype: None
"""
self.override(value)
def from_ini(self, filepath, required=UNDEFINED):
"""Load configuration from the ini file.
Loaded configuration is merged recursively over existing configuration.
:param filepath: Path to the configuration file.
:type filepath: str
:param required: When required is True, raise an exception if file does not exist.
:type required: bool
:rtype: None
"""
try:
parser = _parse_ini_file(filepath)
except IOError as exception:
if required is not False \
and (self._is_strict_mode_enabled() or required is True) \
and exception.errno in (errno.ENOENT, errno.EISDIR):
exception.strerror = 'Unable to load configuration file {0}'.format(exception.strerror)
raise
return
config = {}
for section in parser.sections():
config[section] = dict(parser.items(section))
current_config = self.__call__()
if not current_config:
current_config = {}
self.override(merge_dicts(current_config, config))
def from_yaml(self, filepath, required=UNDEFINED, loader=None):
"""Load configuration from the yaml file.
Loaded configuration is merged recursively over existing configuration.
:param filepath: Path to the configuration file.
:type filepath: str
:param required: When required is True, raise an exception if file does not exist.
:type required: bool
:param loader: YAML loader, :py:class:`YamlLoader` is used if not specified.
:type loader: ``yaml.Loader``
:rtype: None
"""
if yaml is None:
raise Error(
'Unable to load yaml configuration - PyYAML is not installed. '
'Install PyYAML or install Dependency Injector with yaml extras: '
'"pip install dependency-injector[yaml]"'
)
if loader is None:
loader = YamlLoader
try:
with open(filepath) as opened_file:
config = yaml.load(opened_file, loader)
except IOError as exception:
if required is not False \
and (self._is_strict_mode_enabled() or required is True) \
and exception.errno in (errno.ENOENT, errno.EISDIR):
exception.strerror = 'Unable to load configuration file {0}'.format(exception.strerror)
raise
return
current_config = self.__call__()
if not current_config:
current_config = {}
self.override(merge_dicts(current_config, config))
def from_dict(self, options, required=UNDEFINED):
"""Load configuration from the dictionary.
Loaded configuration is merged recursively over existing configuration.
:param options: Configuration options.
:type options: dict
:param required: When required is True, raise an exception if dictionary is empty.
:type required: bool
:rtype: None
"""
if required is not False \
and (self._is_strict_mode_enabled() or required is True) \
and not options:
raise ValueError('Can not use empty dictionary')
try:
current_config = self.__call__()
except Error:
current_config = {}
else:
if not current_config:
current_config = {}
self.override(merge_dicts(current_config, options))
def from_env(self, name, default=UNDEFINED, required=UNDEFINED):
"""Load configuration value from the environment variable.
:param name: Name of the environment variable.
:type name: str
:param default: Default value that is used if environment variable does not exist.
:type default: object
:param required: When required is True, raise an exception if environment variable is undefined.
:type required: bool
:rtype: None
"""
value = os.environ.get(name, default)
if value is UNDEFINED:
if required is not False \
and (self._is_strict_mode_enabled() or required is True):
raise ValueError('Environment variable "{0}" is undefined'.format(name))
value = None
self.override(value)
def _is_strict_mode_enabled(self):
cdef Configuration root
root = self.__root_ref()
if not root:
return False
return root.__strict
cdef class TypedConfigurationOption(Callable):
@property
def option(self):
return self.args[0]
cdef class Configuration(Object):
"""Configuration provider provides configuration options to the other providers.
.. code-block:: python
config = Configuration('config')
print(config.section1.option1()) # None
print(config.section1.option2()) # None
config.from_dict(
{
'section1': {
'option1': 1,
'option2': 2,
},
},
)
print(config.section1.option1()) # 1
print(config.section1.option2()) # 2
"""
DEFAULT_NAME = 'config'
def __init__(self, name=DEFAULT_NAME, default=None, strict=False):
self.__name = name
self.__strict = strict
value = {}
if default is not None:
assert isinstance(default, dict), default
value = default.copy()
self.__children = {}
super().__init__(value)
def __deepcopy__(self, memo):
cdef Configuration copied
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__(self.__name, self.__provides, self.__strict)
memo[id(self)] = copied
copied.__children = deepcopy(self.__children, memo)
self._copy_overridings(copied, memo)
return copied
def __str__(self):
return represent_provider(provider=self, provides=self.__name)
def __getattr__(self, item):
if item.startswith('__') and item.endswith('__'):
raise AttributeError(
'\'{cls}\' object has no attribute '
'\'{attribute_name}\''.format(cls=self.__class__.__name__,
attribute_name=item))
child = self.__children.get(item)
if child is None:
child = ConfigurationOption((item,), self)
self.__children[item] = child
return child
def __getitem__(self, item):
child = self.__children.get(item)
if child is None:
child = ConfigurationOption(item, self)
self.__children[item] = child
return child
def get_name(self):
return self.__name
def get(self, selector, required=False):
"""Return configuration option.
:param selector: Selector string, e.g. "option1.option2"
:type selector: str
:param required: Required flag, raise error if required option is missing
:type required: bool
:return: Option value.
:rtype: Any
"""
value = self.__call__()
if value is None:
if self._is_strict_mode_enabled() or required:
raise Error('Undefined configuration option "{0}.{1}"'.format(self.__name, selector))
return None
keys = selector.split('.')
while len(keys) > 0:
key = keys.pop(0)
value = value.get(key, UNDEFINED)
if value is UNDEFINED:
if self._is_strict_mode_enabled() or required:
raise Error('Undefined configuration option "{0}.{1}"'.format(self.__name, selector))
return None
return value
def set(self, selector, value):
"""Override configuration option.
:param selector: Selector string, e.g. "option1.option2"
:type selector: str
:param value: Overriding value
:type value: Any
:return: Overriding context.
:rtype: :py:class:`OverridingContext`
"""
original_value = current_value = deepcopy(self.__call__())
keys = selector.split('.')
while len(keys) > 0:
key = keys.pop(0)
if len(keys) == 0:
current_value[key] = value
break
temp_value = current_value.get(key, {})
current_value[key] = temp_value
current_value = temp_value
return self.override(original_value)
def override(self, provider):
"""Override provider with another provider.
:param provider: Overriding provider.
:type provider: :py:class:`Provider`
:raise: :py:exc:`dependency_injector.errors.Error`
:return: Overriding context.
:rtype: :py:class:`OverridingContext`
"""
context = super().override(provider)
self.reset_cache()
return context
def reset_last_overriding(self):
"""Reset last overriding provider.
:raise: :py:exc:`dependency_injector.errors.Error` if provider is not
overridden.
:rtype: None
"""
super().reset_last_overriding()
self.reset_cache()
def reset_override(self):
"""Reset all overriding providers.
:rtype: None
"""
super().reset_override()
self.reset_cache()
def reset_cache(self):
"""Reset children providers cache.
:rtype: None
"""
for child in self.__children.values():
child.reset_cache()
def update(self, value):
"""Set configuration options.
.. deprecated:: 3.11
Use :py:meth:`Configuration.override` instead.
:param value: Value of configuration option.
:type value: object | dict
:rtype: None
"""
self.override(value)
def from_ini(self, filepath, required=UNDEFINED):
"""Load configuration from the ini file.
Loaded configuration is merged recursively over existing configuration.
:param filepath: Path to the configuration file.
:type filepath: str
:param required: When required is True, raise an exception if file does not exist.
:type required: bool
:rtype: None
"""
try:
parser = _parse_ini_file(filepath)
except IOError as exception:
if required is not False \
and (self._is_strict_mode_enabled() or required is True) \
and exception.errno in (errno.ENOENT, errno.EISDIR):
exception.strerror = 'Unable to load configuration file {0}'.format(exception.strerror)
raise
return
config = {}
for section in parser.sections():
config[section] = dict(parser.items(section))
current_config = self.__call__()
if not current_config:
current_config = {}
self.override(merge_dicts(current_config, config))
def from_yaml(self, filepath, required=UNDEFINED, loader=None):
"""Load configuration from the yaml file.
Loaded configuration is merged recursively over existing configuration.
:param filepath: Path to the configuration file.
:type filepath: str
:param required: When required is True, raise an exception if file does not exist.
:type required: bool
:param loader: YAML loader, :py:class:`YamlLoader` is used if not specified.
:type loader: ``yaml.Loader``
:rtype: None
"""
if yaml is None:
raise Error(
'Unable to load yaml configuration - PyYAML is not installed. '
'Install PyYAML or install Dependency Injector with yaml extras: '
'"pip install dependency-injector[yaml]"'
)
if loader is None:
loader = YamlLoader
try:
with open(filepath) as opened_file:
config = yaml.load(opened_file, loader)
except IOError as exception:
if required is not False \
and (self._is_strict_mode_enabled() or required is True) \
and exception.errno in (errno.ENOENT, errno.EISDIR):
exception.strerror = 'Unable to load configuration file {0}'.format(exception.strerror)
raise
return
current_config = self.__call__()
if not current_config:
current_config = {}
self.override(merge_dicts(current_config, config))
def from_dict(self, options, required=UNDEFINED):
"""Load configuration from the dictionary.
Loaded configuration is merged recursively over existing configuration.
:param options: Configuration options.
:type options: dict
:param required: When required is True, raise an exception if dictionary is empty.
:type required: bool
:rtype: None
"""
if required is not False \
and (self._is_strict_mode_enabled() or required is True) \
and not options:
raise ValueError('Can not use empty dictionary')
current_config = self.__call__()
if not current_config:
current_config = {}
self.override(merge_dicts(current_config, options))
def from_env(self, name, default=UNDEFINED, required=UNDEFINED):
"""Load configuration value from the environment variable.
:param name: Name of the environment variable.
:type name: str
:param default: Default value that is used if environment variable does not exist.
:type default: object
:param required: When required is True, raise an exception if environment variable is undefined.
:type required: bool
:rtype: None
"""
value = os.environ.get(name, default)
if value is UNDEFINED:
if required is not False \
and (self._is_strict_mode_enabled() or required is True):
raise ValueError('Environment variable "{0}" is undefined'.format(name))
value = None
self.override(value)
def _is_strict_mode_enabled(self):
return self.__strict
cdef class Factory(Provider):
r"""Factory provider creates new instance on every call.
:py:class:`Factory` supports positional & keyword argument injections,
as well as attribute injections.
Positional and keyword argument injections could be defined like this:
.. code-block:: python
factory = Factory(SomeClass,
'positional_arg1', 'positional_arg2',
keyword_argument1=3, keyword_argument=4)
# or
factory = Factory(SomeClass) \
.add_args('positional_arg1', 'positional_arg2') \
.add_kwargs(keyword_argument1=3, keyword_argument=4)
# or
factory = Factory(SomeClass)
factory.add_args('positional_arg1', 'positional_arg2')
factory.add_kwargs(keyword_argument1=3, keyword_argument=4)
Attribute injections are defined by using
:py:meth:`Factory.add_attributes`:
.. code-block:: python
factory = Factory(SomeClass) \
.add_attributes(attribute1=1, attribute2=2)
Retrieving of provided instance can be performed via calling
:py:class:`Factory` object:
.. code-block:: python
factory = Factory(SomeClass)
some_object = factory()
.. py:attribute:: provided_type
If provided type is defined, provider checks that providing class is
its subclass.
:type: type | None
"""
provided_type = None
def __init__(self, provides, *args, **kwargs):
"""Initializer.
:param provides: Provided type.
:type provides: type
"""
if (self.__class__.provided_type and
not issubclass(provides, self.__class__.provided_type)):
raise Error('{0} can provide only {1} instances'.format(
self.__class__, self.__class__.provided_type))
self.__instantiator = Callable(provides, *args, **kwargs)
self.__attributes = tuple()
self.__attributes_len = 0
super(Factory, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
cls = self.cls
if isinstance(cls, Provider):
cls = deepcopy(cls, memo)
copied = self.__class__(cls,
*deepcopy(self.args, memo),
**deepcopy(self.kwargs, memo))
copied.set_attributes(**deepcopy(self.attributes, memo))
self._copy_overridings(copied, memo)
return copied
def __str__(self):
"""Return string representation of provider.
:rtype: str
"""
return represent_provider(provider=self,
provides=self.__instantiator.provides)
@property
def cls(self):
"""Return provided type."""
return self.provides
@property
def provides(self):
"""Return provided type."""
return self.__instantiator.provides
@property
def args(self):
"""Return positional argument injections."""
return self.__instantiator.args
def add_args(self, *args):
"""Add __init__ positional argument injections.
:return: Reference ``self``
"""
self.__instantiator.add_args(*args)
return self
def set_args(self, *args):
"""Set __init__ positional argument injections.
Existing __init__ positional argument injections are dropped.
:return: Reference ``self``
"""
self.__instantiator.set_args(*args)
return self
def clear_args(self):
"""Drop __init__ positional argument injections.
:return: Reference ``self``
"""
self.__instantiator.clear_args()
return self
@property
def kwargs(self):
"""Return keyword argument injections."""
return self.__instantiator.kwargs
def add_kwargs(self, **kwargs):
"""Add __init__ keyword argument injections.
:return: Reference ``self``
"""
self.__instantiator.add_kwargs(**kwargs)
return self
def set_kwargs(self, **kwargs):
"""Set __init__ keyword argument injections.
Existing __init__ keyword argument injections are dropped.
:return: Reference ``self``
"""
self.__instantiator.set_kwargs(**kwargs)
return self
def clear_kwargs(self):
"""Drop __init__ keyword argument injections.
:return: Reference ``self``
"""
self.__instantiator.clear_kwargs()
return self
@property
def attributes(self):
"""Return attribute injections."""
cdef int index
cdef NamedInjection attribute
cdef dict attributes
attributes = dict()
for index in range(self.__attributes_len):
attribute = self.__attributes[index]
attributes[attribute.__name] = attribute.__value
return attributes
def add_attributes(self, **kwargs):
"""Add attribute injections.
:return: Reference ``self``
"""
self.__attributes += parse_named_injections(kwargs)
self.__attributes_len = len(self.__attributes)
return self
def set_attributes(self, **kwargs):
"""Set attribute injections.
Existing attribute injections are dropped.
:return: Reference ``self``
"""
self.__attributes = parse_named_injections(kwargs)
self.__attributes_len = len(self.__attributes)
return self
def clear_attributes(self):
"""Drop attribute injections.
:return: Reference ``self``
"""
self.__attributes = tuple()
self.__attributes_len = len(self.__attributes)
return self
cpdef object _provide(self, tuple args, dict kwargs):
"""Return new instance."""
return __factory_call(self, args, kwargs)
cdef class DelegatedFactory(Factory):
"""Factory that is injected "as is".
.. py:attribute:: provided_type
If provided type is defined, provider checks that providing class is
its subclass.
:type: type | None
.. py:attribute:: cls
:noindex:
Class that provides object.
Alias for :py:attr:`provides`.
:type: type
"""
__IS_DELEGATED__ = True
cdef class AbstractFactory(Factory):
"""Abstract factory provider.
:py:class:`AbstractFactory` is a :py:class:`Factory` provider that must
be explicitly overridden before calling.
Overriding of :py:class:`AbstractFactory` is possible only by another
:py:class:`Factory` provider.
"""
def __call__(self, *args, **kwargs):
"""Return provided object.
Callable interface implementation.
"""
if self.__last_overriding is None:
raise Error('{0} must be overridden before calling'.format(self))
return super().__call__(*args, **kwargs)
def override(self, provider):
"""Override provider with another provider.
:param provider: Overriding provider.
:type provider: :py:class:`Provider`
:raise: :py:exc:`dependency_injector.errors.Error`
:return: Overriding context.
:rtype: :py:class:`OverridingContext`
"""
if not isinstance(provider, Factory):
raise Error('{0} must be overridden only by '
'{1} providers'.format(self, Factory))
return super(AbstractFactory, self).override(provider)
cpdef object _provide(self, tuple args, dict kwargs):
"""Return result of provided callable's call."""
raise NotImplementedError('Abstract provider forward providing logic '
'to overriding provider')
cdef class FactoryDelegate(Delegate):
"""Factory delegate injects delegating factory "as is".
.. py:attribute:: provides
Value that have to be provided.
:type: object
"""
def __init__(self, factory):
"""Initializer.
:param factory: Value that have to be provided.
:type factory: object
"""
if isinstance(factory, Factory) is False:
raise Error('{0} can wrap only {1} providers'.format(
self.__class__, Factory))
super(FactoryDelegate, self).__init__(factory)
cdef class FactoryAggregate(Provider):
"""Factory providers aggregate.
:py:class:`FactoryAggregate` is an aggregate of :py:class:`Factory`
providers.
:py:class:`FactoryAggregate` is a delegated provider, meaning that it is
injected "as is".
All aggregated factories could be retrieved as a read-only
dictionary :py:attr:`FactoryAggregate.factories` or just as an attribute of
:py:class:`FactoryAggregate`.
"""
__IS_DELEGATED__ = True
def __init__(self, **factories):
"""Initializer.
:param factories: Dictionary of aggregate factories.
:type factories: dict[str, :py:class:`Factory`]
"""
for factory in factories.values():
if isinstance(factory, Factory) is False:
raise Error(
'{0} can aggregate only instances of {1}, given - {2}'
.format(self.__class__, Factory, factory))
self.__factories = factories
super(FactoryAggregate, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
cdef FactoryAggregate copied
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__()
copied.__factories = deepcopy(self.__factories, memo)
self._copy_overridings(copied, memo)
return copied
def __getattr__(self, factory_name):
"""Return aggregated factory."""
return self.__get_factory(factory_name)
def __str__(self):
"""Return string representation of provider.
:rtype: str
"""
return represent_provider(provider=self, provides=self.factories)
@property
def factories(self):
"""Return dictionary of factories, read-only."""
return self.__factories
def override(self, _):
"""Override provider with another provider.
:raise: :py:exc:`dependency_injector.errors.Error`
:return: Overriding context.
:rtype: :py:class:`OverridingContext`
"""
raise Error(
'{0} providers could not be overridden'.format(self.__class__))
cpdef object _provide(self, tuple args, dict kwargs):
try:
factory_name = args[0]
except IndexError:
try:
factory_name = kwargs.pop('factory_name')
except KeyError:
raise TypeError('Factory missing 1 required positional argument: \'factory_name\'')
else:
args = args[1:]
return self.__get_factory(factory_name)(*args, **kwargs)
cdef Factory __get_factory(self, str factory_name):
if factory_name not in self.__factories:
raise NoSuchProviderError(
'{0} does not contain factory with name {1}'.format(
self, factory_name))
return <Factory> self.__factories[factory_name]
cdef class BaseSingleton(Provider):
"""Base class of singleton providers."""
provided_type = None
def __init__(self, provides, *args, **kwargs):
"""Initializer.
:param provides: Provided type.
:type provides: type
"""
if (self.__class__.provided_type and
not issubclass(provides, self.__class__.provided_type)):
raise Error('{0} can provide only {1} instances'.format(
self.__class__, self.__class__.provided_type))
self.__instantiator = Factory(provides, *args, **kwargs)
super(BaseSingleton, self).__init__()
def __str__(self):
"""Return string representation of provider.
:rtype: str
"""
return represent_provider(provider=self,
provides=self.__instantiator.cls)
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
cls = self.cls
if isinstance(cls, Provider):
cls = deepcopy(cls, memo)
copied = self.__class__(cls,
*deepcopy(self.args, memo),
**deepcopy(self.kwargs, memo))
copied.set_attributes(**deepcopy(self.attributes, memo))
self._copy_overridings(copied, memo)
return copied
@property
def cls(self):
"""Return provided type."""
return self.__instantiator.cls
@property
def args(self):
"""Return positional argument injections."""
return self.__instantiator.args
def add_args(self, *args):
"""Add __init__ positional argument injections.
:return: Reference ``self``
"""
self.__instantiator.add_args(*args)
return self
def set_args(self, *args):
"""Set __init__ positional argument injections.
Existing __init__ positional argument injections are dropped.
:return: Reference ``self``
"""
self.__instantiator.set_args(*args)
return self
def clear_args(self):
"""Drop __init__ positional argument injections.
:return: Reference ``self``
"""
self.__instantiator.clear_args()
return self
@property
def kwargs(self):
"""Return keyword argument injections."""
return self.__instantiator.kwargs
def add_kwargs(self, **kwargs):
"""Add __init__ keyword argument injections.
:return: Reference ``self``
"""
self.__instantiator.add_kwargs(**kwargs)
return self
def set_kwargs(self, **kwargs):
"""Set __init__ keyword argument injections.
Existing __init__ keyword argument injections are dropped.
:return: Reference ``self``
"""
self.__instantiator.set_kwargs(**kwargs)
return self
def clear_kwargs(self):
"""Drop __init__ keyword argument injections.
:return: Reference ``self``
"""
self.__instantiator.clear_kwargs()
return self
@property
def attributes(self):
"""Return attribute injections."""
return self.__instantiator.attributes
def add_attributes(self, **kwargs):
"""Add attribute injections.
:return: Reference ``self``
"""
self.__instantiator.add_attributes(**kwargs)
return self
def set_attributes(self, **kwargs):
"""Set attribute injections.
Existing attribute injections are dropped.
:return: Reference ``self``
"""
self.__instantiator.set_attributes(**kwargs)
return self
def clear_attributes(self):
"""Drop attribute injections.
:return: Reference ``self``
"""
self.__instantiator.clear_attributes()
return self
def reset(self):
"""Reset cached instance, if any.
:rtype: None
"""
raise NotImplementedError()
def _async_init_instance(self, future_result, result):
try:
instance = result.result()
except Exception as exception:
self.__storage = None
future_result.set_exception(exception)
else:
self.__storage = instance
future_result.set_result(instance)
cdef class Singleton(BaseSingleton):
"""Singleton provider returns same instance on every call.
:py:class:`Singleton` provider creates instance once and returns it on
every call. :py:class:`Singleton` extends :py:class:`Factory`, so, please
follow :py:class:`Factory` documentation for getting familiar with
injections syntax.
Retrieving of provided instance can be performed via calling
:py:class:`Singleton` object:
.. code-block:: python
singleton = Singleton(SomeClass)
some_object = singleton()
.. py:attribute:: provided_type
If provided type is defined, provider checks that providing class is
its subclass.
:type: type | None
.. py:attribute:: cls
:noindex:
Class that provides object.
Alias for :py:attr:`provides`.
:type: type
"""
def __init__(self, provides, *args, **kwargs):
"""Initializer.
:param provides: Provided type.
:type provides: type
"""
self.__storage = None
super(Singleton, self).__init__(provides, *args, **kwargs)
def reset(self):
"""Reset cached instance, if any.
:rtype: None
"""
if __isawaitable(self.__storage):
asyncio.ensure_future(self.__storage).cancel()
self.__storage = None
cpdef object _provide(self, tuple args, dict kwargs):
"""Return single instance."""
if self.__storage is None:
instance = __factory_call(self.__instantiator, args, kwargs)
if __isawaitable(instance):
future_result = asyncio.Future()
instance = asyncio.ensure_future(instance)
instance.add_done_callback(functools.partial(self._async_init_instance, future_result))
self.__storage = future_result
return future_result
self.__storage = instance
return self.__storage
cdef class DelegatedSingleton(Singleton):
"""Delegated singleton is injected "as is".
.. py:attribute:: provided_type
If provided type is defined, provider checks that providing class is
its subclass.
:type: type | None
.. py:attribute:: cls
:noindex:
Class that provides object.
Alias for :py:attr:`provides`.
:type: type
"""
__IS_DELEGATED__ = True
cdef class ThreadSafeSingleton(BaseSingleton):
"""Thread-safe singleton provider."""
storage_lock = threading.RLock()
"""Storage reentrant lock.
:type: :py:class:`threading.RLock`
"""
def __init__(self, provides, *args, **kwargs):
"""Initializer.
:param provides: Provided type.
:type provides: type
"""
self.__storage = None
self.__storage_lock = self.__class__.storage_lock
super(ThreadSafeSingleton, self).__init__(provides, *args, **kwargs)
def reset(self):
"""Reset cached instance, if any.
:rtype: None
"""
with self.__storage_lock:
if __isawaitable(self.__storage):
asyncio.ensure_future(self.__storage).cancel()
self.__storage = None
cpdef object _provide(self, tuple args, dict kwargs):
"""Return single instance."""
instance = self.__storage
if instance is None:
with self.__storage_lock:
if self.__storage is None:
instance = __factory_call(self.__instantiator, args, kwargs)
if __isawaitable(instance):
future_result = asyncio.Future()
instance = asyncio.ensure_future(instance)
instance.add_done_callback(functools.partial(self._async_init_instance, future_result))
self.__storage = future_result
return future_result
self.__storage = instance
return instance
cdef class DelegatedThreadSafeSingleton(ThreadSafeSingleton):
"""Delegated thread-safe singleton is injected "as is".
.. py:attribute:: provided_type
If provided type is defined, provider checks that providing class is
its subclass.
:type: type | None
.. py:attribute:: cls
:noindex:
Class that provides object.
Alias for :py:attr:`provides`.
:type: type
"""
__IS_DELEGATED__ = True
cdef class ThreadLocalSingleton(BaseSingleton):
"""Thread-local singleton provides single objects in scope of thread.
.. py:attribute:: provided_type
If provided type is defined, provider checks that providing class is
its subclass.
:type: type | None
.. py:attribute:: cls
:noindex:
Class that provides object.
Alias for :py:attr:`provides`.
:type: type
"""
def __init__(self, provides, *args, **kwargs):
"""Initializer.
:param provides: Provided type.
:type provides: type
"""
self.__storage = threading.local()
super(ThreadLocalSingleton, self).__init__(provides, *args, **kwargs)
def reset(self):
"""Reset cached instance, if any.
:rtype: None
"""
if __isawaitable(self.__storage.instance):
asyncio.ensure_future(self.__storage.instance).cancel()
del self.__storage.instance
cpdef object _provide(self, tuple args, dict kwargs):
"""Return single instance."""
cdef object instance
try:
instance = self.__storage.instance
except AttributeError:
instance = __factory_call(self.__instantiator, args, kwargs)
if __isawaitable(instance):
future_result = asyncio.Future()
instance = asyncio.ensure_future(instance)
instance.add_done_callback(functools.partial(self._async_init_instance, future_result))
self.__storage.instance = future_result
return future_result
self.__storage.instance = instance
finally:
return instance
def _async_init_instance(self, future_result, result):
try:
instance = result.result()
except Exception as exception:
del self.__storage.instance
future_result.set_exception(exception)
else:
self.__storage.instance = instance
future_result.set_result(instance)
cdef class DelegatedThreadLocalSingleton(ThreadLocalSingleton):
"""Delegated thread-local singleton is injected "as is".
.. py:attribute:: provided_type
If provided type is defined, provider checks that providing class is
its subclass.
:type: type | None
.. py:attribute:: cls
:noindex:
Class that provides object.
Alias for :py:attr:`provides`.
:type: type
"""
__IS_DELEGATED__ = True
cdef class AbstractSingleton(BaseSingleton):
"""Abstract singleton provider.
:py:class:`AbstractSingleton` is a :py:class:`Singleton` provider that must
be explicitly overridden before calling.
Overriding of :py:class:`AbstractSingleton` is possible only by another
:py:class:`BaseSingleton` provider.
"""
def __call__(self, *args, **kwargs):
"""Return provided object.
Callable interface implementation.
"""
if self.__last_overriding is None:
raise Error('{0} must be overridden before calling'.format(self))
return super().__call__(*args, **kwargs)
def override(self, provider):
"""Override provider with another provider.
:param provider: Overriding provider.
:type provider: :py:class:`Provider`
:raise: :py:exc:`dependency_injector.errors.Error`
:return: Overriding context.
:rtype: :py:class:`OverridingContext`
"""
if not isinstance(provider, BaseSingleton):
raise Error('{0} must be overridden only by '
'{1} providers'.format(self, BaseSingleton))
return super(AbstractSingleton, self).override(provider)
def reset(self):
"""Reset cached instance, if any.
:rtype: None
"""
if self.__last_overriding is None:
raise Error('{0} must be overridden before calling'.format(self))
return self.__last_overriding.reset()
cdef class SingletonDelegate(Delegate):
"""Singleton delegate injects delegating singleton "as is".
.. py:attribute:: provides
Value that have to be provided.
:type: object
"""
def __init__(self, singleton):
"""Initializer.
:param singleton: Value that have to be provided.
:type singleton: py:class:`BaseSingleton`
"""
if isinstance(singleton, BaseSingleton) is False:
raise Error('{0} can wrap only {1} providers'.format(
self.__class__, BaseSingleton))
super(SingletonDelegate, self).__init__(singleton)
cdef class List(Provider):
"""List provider provides a list of values.
:py:class:`List` provider is needed for injecting a list of dependencies. It handles
positional argument injections the same way as :py:class:`Factory` provider.
Keyword argument injections are not supported.
.. code-block:: python
dispatcher_factory = Factory(
Dispatcher,
modules=List(
Factory(ModuleA, dependency_a),
Factory(ModuleB, dependency_b),
),
)
dispatcher = dispatcher_factory()
# is equivalent to:
dispatcher = Dispatcher(
modules=[
ModuleA(dependency_a),
ModuleB(dependency_b),
],
)
"""
def __init__(self, *args):
"""Initializer."""
self.__args = tuple()
self.__args_len = 0
self.set_args(*args)
super(List, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__(*deepcopy(self.args, memo))
self._copy_overridings(copied, memo)
return copied
def __str__(self):
"""Return string representation of provider.
:rtype: str
"""
return represent_provider(provider=self, provides=list(self.args))
@property
def args(self):
"""Return positional argument injections."""
cdef int index
cdef PositionalInjection arg
cdef list args
args = list()
for index in range(self.__args_len):
arg = self.__args[index]
args.append(arg.__value)
return tuple(args)
def add_args(self, *args):
"""Add positional argument injections.
:return: Reference ``self``
"""
self.__args += parse_positional_injections(args)
self.__args_len = len(self.__args)
return self
def set_args(self, *args):
"""Set positional argument injections.
Existing positional argument injections are dropped.
:return: Reference ``self``
"""
self.__args = parse_positional_injections(args)
self.__args_len = len(self.__args)
return self
def clear_args(self):
"""Drop positional argument injections.
:return: Reference ``self``
"""
self.__args = tuple()
self.__args_len = len(self.__args)
return self
cpdef object _provide(self, tuple args, dict kwargs):
"""Return result of provided callable's call."""
return list(__provide_positional_args(args, self.__args, self.__args_len))
cdef class Dict(Provider):
"""Dict provider provides a dictionary of values.
:py:class:`Dict` provider is needed for injecting a dictionary of dependencies. It handles
keyword argument injections the same way as :py:class:`Factory` provider.
Positional argument injections are not supported.
.. code-block:: python
dispatcher_factory = Factory(
Dispatcher,
modules=Dict(
module1=Factory(ModuleA, dependency_a),
module2=Factory(ModuleB, dependency_b),
),
)
dispatcher = dispatcher_factory()
# is equivalent to:
dispatcher = Dispatcher(
modules={
'module1': ModuleA(dependency_a),
'module2': ModuleB(dependency_b),
},
)
"""
def __init__(self, dict_=None, **kwargs):
"""Initializer."""
self.__kwargs = tuple()
self.__kwargs_len = 0
self.add_kwargs(dict_, **kwargs)
super(Dict, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__(deepcopy(self.kwargs, memo))
self._copy_overridings(copied, memo)
return copied
def __str__(self):
"""Return string representation of provider.
:rtype: str
"""
return represent_provider(provider=self, provides=self.kwargs)
@property
def kwargs(self):
"""Return keyword argument injections."""
cdef int index
cdef NamedInjection kwarg
cdef dict kwargs
kwargs = dict()
for index in range(self.__kwargs_len):
kwarg = self.__kwargs[index]
kwargs[kwarg.__name] = kwarg.__value
return kwargs
def add_kwargs(self, dict_=None, **kwargs):
"""Add keyword argument injections.
:return: Reference ``self``
"""
if dict_ is None:
dict_ = {}
self.__kwargs += parse_named_injections(dict_)
self.__kwargs += parse_named_injections(kwargs)
self.__kwargs_len = len(self.__kwargs)
return self
def set_kwargs(self, dict_=None, **kwargs):
"""Set keyword argument injections.
Existing keyword argument injections are dropped.
:return: Reference ``self``
"""
if dict_ is None:
dict_ = {}
self.__kwargs = parse_named_injections(dict_)
self.__kwargs += parse_named_injections(kwargs)
self.__kwargs_len = len(self.__kwargs)
return self
def clear_kwargs(self):
"""Drop keyword argument injections.
:return: Reference ``self``
"""
self.__kwargs = tuple()
self.__kwargs_len = len(self.__kwargs)
return self
cpdef object _provide(self, tuple args, dict kwargs):
"""Return result of provided callable's call."""
return __provide_keyword_args(kwargs, self.__kwargs, self.__kwargs_len)
cdef class Resource(Provider):
"""Resource provider provides a component with initialization and shutdown."""
def __init__(self, initializer, *args, **kwargs):
self.__initializer = initializer
self.__initialized = False
self.__resource = None
self.__shutdowner = None
self.__args = tuple()
self.__args_len = 0
self.set_args(*args)
self.__kwargs = tuple()
self.__kwargs_len = 0
self.set_kwargs(**kwargs)
super().__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
if self.__initialized:
raise Error('Can not copy initialized resource')
copied = self.__class__(
self.__initializer,
*deepcopy(self.args, memo),
**deepcopy(self.kwargs, memo),
)
self._copy_overridings(copied, memo)
return copied
def __repr__(self):
return (
f'{self.__class__.__name__}({self.__initializer}, '
f'initialized={self.__initialized})'
)
@property
def args(self):
"""Return positional argument injections."""
cdef int index
cdef PositionalInjection arg
cdef list args
args = list()
for index in range(self.__args_len):
arg = self.__args[index]
args.append(arg.__value)
return tuple(args)
def add_args(self, *args):
"""Add positional argument injections.
:return: Reference ``self``
"""
self.__args += parse_positional_injections(args)
self.__args_len = len(self.__args)
return self
def set_args(self, *args):
"""Set positional argument injections.
Existing positional argument injections are dropped.
:return: Reference ``self``
"""
self.__args = parse_positional_injections(args)
self.__args_len = len(self.__args)
return self
def clear_args(self):
"""Drop positional argument injections.
:return: Reference ``self``
"""
self.__args = tuple()
self.__args_len = len(self.__args)
return self
@property
def kwargs(self):
"""Return keyword argument injections."""
cdef int index
cdef NamedInjection kwarg
cdef dict kwargs
kwargs = dict()
for index in range(self.__kwargs_len):
kwarg = self.__kwargs[index]
kwargs[kwarg.__name] = kwarg.__value
return kwargs
def add_kwargs(self, **kwargs):
"""Add keyword argument injections.
:return: Reference ``self``
"""
self.__kwargs += parse_named_injections(kwargs)
self.__kwargs_len = len(self.__kwargs)
return self
def set_kwargs(self, **kwargs):
"""Set keyword argument injections.
Existing keyword argument injections are dropped.
:return: Reference ``self``
"""
self.__kwargs = parse_named_injections(kwargs)
self.__kwargs_len = len(self.__kwargs)
return self
def clear_kwargs(self):
"""Drop keyword argument injections.
:return: Reference ``self``
"""
self.__kwargs = tuple()
self.__kwargs_len = len(self.__kwargs)
return self
@property
def initialized(self):
"""Check if resource is initialized."""
return self.__initialized
def init(self):
"""Initialize resource."""
return self.__call__()
def shutdown(self):
"""Shutdown resource."""
if not self.__initialized:
if self.is_async_mode_enabled():
result = asyncio.Future()
result.set_result(None)
return result
return
if self.__shutdowner:
try:
shutdown = self.__shutdowner(self.__resource)
except StopIteration:
pass
else:
if inspect.isawaitable(shutdown):
return self._create_shutdown_future(shutdown)
self.__resource = None
self.__initialized = False
self.__shutdowner = None
if self.is_async_mode_enabled():
result = asyncio.Future()
result.set_result(None)
return result
cpdef object _provide(self, tuple args, dict kwargs):
if self.__initialized:
return self.__resource
if self._is_resource_subclass(self.__initializer):
initializer = self.__initializer()
self.__resource = __call(
initializer.init,
args,
self.__args,
self.__args_len,
kwargs,
self.__kwargs,
self.__kwargs_len,
)
self.__shutdowner = initializer.shutdown
elif self._is_async_resource_subclass(self.__initializer):
initializer = self.__initializer()
async_init = __call(
initializer.init,
args,
self.__args,
self.__args_len,
kwargs,
self.__kwargs,
self.__kwargs_len,
)
self.__initialized = True
return self._create_init_future(async_init, initializer.shutdown)
elif inspect.isgeneratorfunction(self.__initializer):
initializer = __call(
self.__initializer,
args,
self.__args,
self.__args_len,
kwargs,
self.__kwargs,
self.__kwargs_len,
)
self.__resource = next(initializer)
self.__shutdowner = initializer.send
elif iscoroutinefunction(self.__initializer):
initializer = __call(
self.__initializer,
args,
self.__args,
self.__args_len,
kwargs,
self.__kwargs,
self.__kwargs_len,
)
self.__initialized = True
return self._create_init_future(initializer)
elif isasyncgenfunction(self.__initializer):
initializer = __call(
self.__initializer,
args,
self.__args,
self.__args_len,
kwargs,
self.__kwargs,
self.__kwargs_len,
)
self.__initialized = True
return self._create_async_gen_init_future(initializer)
elif callable(self.__initializer):
self.__resource = __call(
self.__initializer,
args,
self.__args,
self.__args_len,
kwargs,
self.__kwargs,
self.__kwargs_len,
)
else:
raise Error('Unknown type of resource initializer')
self.__initialized = True
return self.__resource
def _create_init_future(self, future, shutdowner=None):
callback = self._async_init_callback
if shutdowner:
callback = functools.partial(callback, shutdowner=shutdowner)
future = asyncio.ensure_future(future)
future.add_done_callback(callback)
self.__resource = future
return future
def _create_async_gen_init_future(self, initializer):
if inspect.isasyncgen(initializer):
return self._create_init_future(initializer.__anext__(), initializer.asend)
future = asyncio.Future()
create_initializer = asyncio.ensure_future(initializer)
create_initializer.add_done_callback(functools.partial(self._async_create_gen_callback, future))
self.__resource = future
return future
def _async_init_callback(self, initializer, shutdowner=None):
try:
resource = initializer.result()
except Exception:
self.__initialized = False
raise
else:
self.__resource = resource
self.__shutdowner = shutdowner
def _async_create_gen_callback(self, future, initializer_future):
initializer = initializer_future.result()
init_future = self._create_init_future(initializer.__anext__(), initializer.asend)
init_future.add_done_callback(functools.partial(self._async_trigger_result, future))
def _async_trigger_result(self, future, future_result):
future.set_result(future_result.result())
def _create_shutdown_future(self, shutdown_future):
future = asyncio.Future()
shutdown_future = asyncio.ensure_future(shutdown_future)
shutdown_future.add_done_callback(functools.partial(self._async_shutdown_callback, future))
return future
def _async_shutdown_callback(self, future_result, shutdowner):
try:
shutdowner.result()
except StopAsyncIteration:
pass
self.__resource = None
self.__initialized = False
self.__shutdowner = None
future_result.set_result(None)
@staticmethod
def _is_resource_subclass(instance):
if sys.version_info < (3, 5):
return False
if not isinstance(instance, CLASS_TYPES):
return
from . import resources
return issubclass(instance, resources.Resource)
@staticmethod
def _is_async_resource_subclass(instance):
if sys.version_info < (3, 5):
return False
if not isinstance(instance, CLASS_TYPES):
return
from . import resources
return issubclass(instance, resources.AsyncResource)
cdef class Container(Provider):
"""Container provider provides an instance of declarative container.
.. warning::
Provider is experimental. Its interface may change.
"""
def __init__(self, container_cls, container=None, **overriding_providers):
"""Initialize provider."""
self.__container_cls = container_cls
self.__overriding_providers = overriding_providers
if container is None:
container = container_cls()
self.__container = container
super(Container, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__(
self.__container_cls,
deepcopy(self.__container, memo),
**deepcopy(self.__overriding_providers, memo),
)
return copied
def __getattr__(self, name):
"""Return dependency provider."""
if name.startswith('__') and name.endswith('__'):
raise AttributeError(
'\'{cls}\' object has no attribute '
'\'{attribute_name}\''.format(cls=self.__class__.__name__,
attribute_name=name))
return getattr(self.__container, name)
@property
def providers(self):
return self.__container.providers
@property
def container(self):
return self.__container
def override(self, provider):
"""Override provider with another provider."""
if not hasattr(provider, 'providers'):
raise Error('Container provider {0} can be overridden only by providers container'.format(self))
self.__container.override_providers(**provider.providers)
super().override(provider)
def apply_overridings(self):
"""Apply container overriding.
This method should not be called directly. It is called on
declarative container initialization."""
self.__container.override_providers(**self.__overriding_providers)
cpdef object _provide(self, tuple args, dict kwargs):
"""Return single instance."""
return self.__container
cdef class Selector(Provider):
"""Selector provider selects provider based on the configuration value or other callable.
:py:class:`Selector` provider has a callable called ``selector`` and a dictionary of providers.
The ``selector`` callable is provided as a first positional argument. It can be
:py:class:`Configuration` provider or any other callable. It has to return a string value.
That value is used as a key for selecting the provider from the dictionary of providers.
The providers are provided as keyword arguments. Argument name is used as a key for
selecting the provider.
.. code-block:: python
config = Configuration()
selector = Selector(
config.one_or_another,
one=providers.Factory(SomeClass),
another=providers.Factory(SomeOtherClass),
)
config.override({'one_or_another': 'one'})
instance_1 = selector()
assert isinstance(instance_1, SomeClass)
config.override({'one_or_another': 'another'})
instance_2 = selector()
assert isinstance(instance_2, SomeOtherClass)
"""
def __init__(self, selector, **providers):
"""Initialize provider."""
self.__selector = selector
self.__providers = providers
super(Selector, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__(
deepcopy(self.__selector, memo),
**deepcopy(self.__providers, memo),
)
self._copy_overridings(copied, memo)
return copied
def __getattr__(self, name):
"""Return provider."""
if name.startswith('__') and name.endswith('__'):
raise AttributeError(
'\'{cls}\' object has no attribute '
'\'{attribute_name}\''.format(cls=self.__class__.__name__,
attribute_name=name))
if name not in self.__providers:
raise AttributeError('Selector has no "{0}" provider'.format(name))
return self.__providers[name]
def __str__(self):
"""Return string representation of provider.
:rtype: str
"""
return '<{provider}({selector}, {providers}) at {address}>'.format(
provider='.'.join(( self.__class__.__module__, self.__class__.__name__)),
selector=self.__selector,
providers=', '.join((
'{0}={1}'.format(name, provider)
for name, provider in self.__providers.items()
)),
address=hex(id(self)),
)
@property
def providers(self):
"""Return providers."""
return dict(self.__providers)
cpdef object _provide(self, tuple args, dict kwargs):
"""Return single instance."""
selector_value = self.__selector()
if selector_value is None:
raise Error('Selector value is undefined')
if selector_value not in self.__providers:
raise Error('Selector has no "{0}" provider'.format(selector_value))
return self.__providers[selector_value](*args, **kwargs)
cdef class ProvidedInstance(Provider):
"""Provider that helps to inject attributes and items of the injected instance.
You can use it like that:
.. code-block:: python
service = providers.Singleton(Service)
client_factory = providers.Factory(
Client,
value1=service.provided[0],
value2=service.provided.value,
value3=service.provided.values[0],
value4=service.provided.get_value.call(),
)
You should not create this provider directly. Get it from the ``.provided`` attribute of the
injected provider. This attribute returns the :py:class:`ProvidedInstance` for that provider.
Providers that have ``.provided`` attribute:
- :py:class:`Callable` and its subclasses
- :py:class:`Factory` and its subclasses
- :py:class:`Singleton` and its subclasses
- :py:class:`Object`
- :py:class:`List`
- :py:class:`Selector`
- :py:class:`Dependency`
"""
def __init__(self, provider):
self.__provider = provider
super().__init__()
def __repr__(self):
return f'{self.__class__.__name__}(\'{self.__provider}\')'
def __deepcopy__(self, memo=None):
cdef ProvidedInstance copied
copied = memo.get(id(self))
if copied is not None:
return copied
return self.__class__(
deepcopy(self.__provider, memo),
)
def __getattr__(self, item):
return AttributeGetter(self, item)
def __getitem__(self, item):
return ItemGetter(self, item)
@property
def provides(self):
"""Return provider."""
return self.__provider
def call(self, *args, **kwargs):
return MethodCaller(self, *args, **kwargs)
cpdef object _provide(self, tuple args, dict kwargs):
return self.__provider(*args, **kwargs)
cdef class AttributeGetter(Provider):
"""Provider that returns the attribute of the injected instance.
You should not create this provider directly. See :py:class:`ProvidedInstance` instead.
"""
def __init__(self, provider, attribute):
self.__provider = provider
self.__attribute = attribute
super().__init__()
def __repr__(self):
return f'{self.__class__.__name__}(\'{self.__attribute}\')'
def __deepcopy__(self, memo=None):
cdef AttributeGetter copied
copied = memo.get(id(self))
if copied is not None:
return copied
return self.__class__(
deepcopy(self.__provider, memo),
self.__attribute,
)
def __getattr__(self, item):
return AttributeGetter(self, item)
def __getitem__(self, item):
return ItemGetter(self, item)
@property
def provides(self):
"""Return provider."""
return self.__provider
@property
def name(self):
"""Return name of the attribute."""
return self.__attribute
def call(self, *args, **kwargs):
return MethodCaller(self, *args, **kwargs)
cpdef object _provide(self, tuple args, dict kwargs):
provided = self.__provider(*args, **kwargs)
if __isawaitable(provided):
future_result = asyncio.Future()
provided = asyncio.ensure_future(provided)
provided.add_done_callback(functools.partial(self._async_provide, future_result))
return future_result
return getattr(provided, self.__attribute)
def _async_provide(self, future_result, future):
provided = future.result()
result = getattr(provided, self.__attribute)
future_result.set_result(result)
cdef class ItemGetter(Provider):
"""Provider that returns the item of the injected instance.
You should not create this provider directly. See :py:class:`ProvidedInstance` instead.
"""
def __init__(self, Provider provider, object item):
self.__provider = provider
self.__item = item
super().__init__()
def __repr__(self):
return f'{self.__class__.__name__}(\'{self.__item}\')'
def __deepcopy__(self, memo=None):
cdef ItemGetter copied
copied = memo.get(id(self))
if copied is not None:
return copied
return self.__class__(
deepcopy(self.__provider, memo),
self.__item,
)
def __getattr__(self, item):
return AttributeGetter(self, item)
def __getitem__(self, item):
return ItemGetter(self, item)
@property
def provides(self):
"""Return provider."""
return self.__provider
@property
def name(self):
"""Return name of the item."""
return self.__item
def call(self, *args, **kwargs):
return MethodCaller(self, *args, **kwargs)
cpdef object _provide(self, tuple args, dict kwargs):
provided = self.__provider(*args, **kwargs)
if __isawaitable(provided):
future_result = asyncio.Future()
provided = asyncio.ensure_future(provided)
provided.add_done_callback(functools.partial(self._async_provide, future_result))
return future_result
return provided[self.__item]
def _async_provide(self, future_result, future):
provided = future.result()
result = provided[self.__item]
future_result.set_result(result)
cdef class MethodCaller(Provider):
"""Provider that calls the method of the injected instance.
You should not create this provider directly. See :py:class:`ProvidedInstance` instead.
"""
def __init__(self, provider, *args, **kwargs):
self.__provider = provider
self.__args = parse_positional_injections(args)
self.__args_len = len(self.__args)
self.__kwargs = parse_named_injections(kwargs)
self.__kwargs_len = len(self.__kwargs)
super().__init__()
def __repr__(self):
return f'{self.__class__.__name__}({self.__provider})'
def __deepcopy__(self, memo=None):
cdef MethodCaller copied
copied = memo.get(id(self))
if copied is not None:
return copied
copied = self.__class__(deepcopy(self.__provider, memo))
copied.__args = deepcopy(self.__args, memo)
copied.__args_len = self.__args_len
copied.__kwargs = deepcopy(self.__kwargs, memo)
copied.__kwargs_len = self.__kwargs_len
self._copy_overridings(copied, memo)
return copied
def __getattr__(self, item):
return AttributeGetter(self, item)
def __getitem__(self, item):
return ItemGetter(self, item)
@property
def provides(self):
"""Return provider."""
return self.__provider
@property
def args(self):
"""Return positional argument injections."""
cdef int index
cdef PositionalInjection arg
cdef list args
args = list()
for index in range(self.__args_len):
arg = self.__args[index]
args.append(arg.__value)
return tuple(args)
@property
def kwargs(self):
"""Return keyword argument injections."""
cdef int index
cdef NamedInjection kwarg
cdef dict kwargs
kwargs = dict()
for index in range(self.__kwargs_len):
kwarg = self.__kwargs[index]
kwargs[kwarg.__name] = kwarg.__value
return kwargs
def call(self, *args, **kwargs):
return MethodCaller(self, *args, **kwargs)
cpdef object _provide(self, tuple args, dict kwargs):
call = self.__provider()
if __isawaitable(call):
future_result = asyncio.Future()
call = asyncio.ensure_future(call)
call.add_done_callback(functools.partial(self._async_provide, future_result, args, kwargs))
return future_result
return __call(
call,
args,
self.__args,
self.__args_len,
kwargs,
self.__kwargs,
self.__kwargs_len,
)
def _async_provide(self, future_result, args, kwargs, future):
call = future.result()
result = __call(
call,
args,
self.__args,
self.__args_len,
kwargs,
self.__kwargs,
self.__kwargs_len,
)
future_result.set_result(result)
cdef class Injection(object):
"""Abstract injection class."""
cdef class PositionalInjection(Injection):
"""Positional injection class."""
def __init__(self, value):
"""Initializer."""
self.__value = value
self.__is_provider = <int>is_provider(value)
self.__is_delegated = <int>is_delegated(value)
self.__call = <int>(self.__is_provider == 1 and
self.__is_delegated == 0)
super(PositionalInjection, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
return self.__class__(deepcopy(self.__value, memo))
def get_value(self):
"""Return injection value."""
return __get_value(self)
def get_original_value(self):
"""Return original value."""
return self.__value
cdef class NamedInjection(Injection):
"""Keyword injection class."""
def __init__(self, name, value):
"""Initializer."""
self.__name = name
self.__value = value
self.__is_provider = <int>is_provider(value)
self.__is_delegated = <int>is_delegated(value)
self.__call = <int>(self.__is_provider == 1 and
self.__is_delegated == 0)
super(NamedInjection, self).__init__()
def __deepcopy__(self, memo):
"""Create and return full copy of provider."""
copied = memo.get(id(self))
if copied is not None:
return copied
return self.__class__(deepcopy(self.__name, memo),
deepcopy(self.__value, memo))
def get_name(self):
"""Return injection value."""
return __get_name(self)
def get_value(self):
"""Return injection value."""
return __get_value(self)
def get_original_value(self):
"""Return original value."""
return self.__value
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef tuple parse_positional_injections(tuple args):
"""Parse positional injections."""
cdef list injections = list()
cdef int args_len = len(args)
cdef int index
cdef object arg
cdef PositionalInjection injection
for index in range(args_len):
arg = args[index]
injection = PositionalInjection(arg)
injections.append(injection)
return tuple(injections)
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef tuple parse_named_injections(dict kwargs):
"""Parse named injections."""
cdef list injections = list()
cdef object name
cdef object arg
cdef NamedInjection injection
for name, arg in kwargs.items():
injection = NamedInjection(name, arg)
injections.append(injection)
return tuple(injections)
cpdef bint is_provider(object instance):
"""Check if instance is provider instance.
:param instance: Instance to be checked.
:type instance: object
:rtype: bool
"""
return (not isinstance(instance, CLASS_TYPES) and
getattr(instance, '__IS_PROVIDER__', False) is True)
cpdef object ensure_is_provider(object instance):
"""Check if instance is provider instance and return it.
:param instance: Instance to be checked.
:type instance: object
:raise: :py:exc:`dependency_injector.errors.Error` if provided instance is
not provider.
:rtype: :py:class:`dependency_injector.providers.Provider`
"""
if not is_provider(instance):
raise Error('Expected provider instance, '
'got {0}'.format(str(instance)))
return instance
cpdef bint is_delegated(object instance):
"""Check if instance is delegated provider.
:param instance: Instance to be checked.
:type instance: object
:rtype: bool
"""
return (not isinstance(instance, CLASS_TYPES) and
getattr(instance, '__IS_DELEGATED__', False) is True)
cpdef str represent_provider(object provider, object provides):
"""Return string representation of provider.
:param provider: Provider object
:type provider: :py:class:`dependency_injector.providers.Provider`
:param provides: Object that provider provides
:type provider: object
:return: String representation of provider
:rtype: str
"""
return '<{provider}({provides}) at {address}>'.format(
provider='.'.join((provider.__class__.__module__,
provider.__class__.__name__)),
provides=repr(provides) if provides is not None else '',
address=hex(id(provider)))
cpdef object deepcopy(object instance, dict memo=None):
"""Return full copy of provider or container with providers."""
if memo is None:
memo = dict()
__add_sys_streams(memo)
return copy.deepcopy(instance, memo)
def __add_sys_streams(memo):
"""Add system streams to memo dictionary.
This helps to avoid copying of system streams while making a deepcopy of
objects graph.
"""
memo[id(sys.stdin)] = sys.stdin
memo[id(sys.stdout)] = sys.stdout
memo[id(sys.stderr)] = sys.stderr
def merge_dicts(dict1, dict2):
"""Merge dictionaries recursively.
:param dict1: Dictionary 1
:type dict1: dict
:param dict2: Dictionary 2
:type dict2: dict
:return: New resulting dictionary
:rtype: dict
"""
for key, value in dict1.items():
if key in dict2:
if isinstance(value, dict) and isinstance(dict2[key], dict):
dict2[key] = merge_dicts(value, dict2[key])
result = dict1.copy()
result.update(dict2)
return result
def isawaitable(obj):
"""Check if object is a coroutine function.
Return False for any object in Python 3.4 or below.
"""
try:
return inspect.isawaitable(obj)
except AttributeError:
return False
def iscoroutinefunction(obj):
"""Check if object is a coroutine function.
Return False for any object in Python 3.4 or below.
"""
try:
return inspect.iscoroutinefunction(obj)
except AttributeError:
return False
def isasyncgenfunction(obj):
"""Check if object is an asynchronous generator function.
Return False for any object in Python 3.4 or below.
"""
try:
return inspect.isasyncgenfunction(obj)
except AttributeError:
return False