"""Schema module.""" import builtins import importlib from typing import Dict, Any, Type, Optional from . import containers, providers ContainerSchema = Dict[Any, Any] ProviderSchema = Dict[Any, Any] class SchemaProcessorV1: def __init__(self, schema: ContainerSchema) -> None: self._schema = schema self._container = containers.DynamicContainer() def process(self): """Process schema.""" self._create_providers(self._schema['container']) self._setup_injections(self._schema['container']) def get_providers(self): """Return providers.""" return self._container.providers def _create_providers( self, provider_schema: ProviderSchema, container: Optional[containers.Container] = None, ) -> None: if container is None: container = self._container for provider_name, data in provider_schema.items(): provider = None if 'provider' in data: provider_type = _get_provider_cls(data['provider']) args = [] # provides = data.get('provides') # if provides: # provides = _import_string(provides) # if provides: # args.append(provides) provider = provider_type(*args) if provider is None: provider = providers.Container(containers.DynamicContainer) container.set_provider(provider_name, provider) if isinstance(provider, providers.Container): self._create_providers(provider_schema=data, container=provider) def _setup_injections( # noqa: C901 self, provider_schema: ProviderSchema, container: Optional[containers.Container] = None, ) -> None: if container is None: container = self._container for provider_name, data in provider_schema.items(): provider = getattr(container, provider_name) args = [] kwargs = {} provides = data.get('provides') if provides: if isinstance(provides, str) and provides.startswith('container.'): provides = self._resolve_provider(provides[len('container.'):]) else: provides = _import_string(provides) provider.set_provides(provides) arg_injections = data.get('args') if arg_injections: for arg in arg_injections: injection = None if isinstance(arg, str) and arg.startswith('container.'): injection = self._resolve_provider(arg[len('container.'):]) # TODO: refactoring if isinstance(arg, dict): provider_args = [] provider_type = _get_provider_cls(arg.get('provider')) provides = arg.get('provides') if provides: if isinstance(provides, str) and provides.startswith('container.'): provides = self._resolve_provider(provides[len('container.'):]) else: provides = _import_string(provides) provider_args.append(provides) for provider_arg in arg.get('args', []): if isinstance(provider_arg, str) \ and provider_arg.startswith('container.'): provider_args.append( self._resolve_provider(provider_arg[len('container.'):]), ) injection = provider_type(*provider_args) if not injection: injection = arg args.append(injection) if args: provider.add_args(*args) kwarg_injections = data.get('kwargs') if kwarg_injections: for name, arg in kwarg_injections.items(): injection = None if isinstance(arg, str) and arg.startswith('container.'): injection = self._resolve_provider(arg[len('container.'):]) # TODO: refactoring if isinstance(arg, dict): provider_args = [] provider_type = _get_provider_cls(arg.get('provider')) provides = arg.get('provides') if provides: if isinstance(provides, str) and provides.startswith('container.'): provides = self._resolve_provider(provides[len('container.'):]) else: provides = _import_string(provides) provider_args.append(provides) for provider_arg in arg.get('args', []): if isinstance(provider_arg, str) \ and provider_arg.startswith('container.'): provider_args.append( self._resolve_provider(provider_arg[len('container.'):]), ) injection = provider_type(*provider_args) if not injection: injection = arg kwargs[name] = injection if kwargs: provider.add_kwargs(**kwargs) if isinstance(provider, providers.Container): self._setup_injections(provider_schema=data, container=provider) def _resolve_provider(self, name: str) -> Optional[providers.Provider]: segments = name.split('.') try: provider = getattr(self._container, segments[0]) except AttributeError: return None for segment in segments[1:]: parentheses = '' if '(' in segment and ')' in segment: parentheses = segment[segment.find('('):segment.rfind(')')+1] segment = segment.replace(parentheses, '') try: provider = getattr(provider, segment) except AttributeError: # TODO return None if parentheses: # TODO provider = provider() return provider def build_schema(schema: ContainerSchema) -> Dict[str, providers.Provider]: """Build provider schema.""" schema_processor = SchemaProcessorV1(schema) schema_processor.process() return schema_processor.get_providers() def _get_provider_cls(provider_cls_name: str) -> Type[providers.Provider]: std_provider_type = _fetch_provider_cls_from_std(provider_cls_name) if std_provider_type: return std_provider_type custom_provider_type = _import_provider_cls(provider_cls_name) if custom_provider_type: return custom_provider_type raise SchemaError(f'Undefined provider class "{provider_cls_name}"') def _fetch_provider_cls_from_std(provider_cls_name: str) -> Optional[Type[providers.Provider]]: return getattr(providers, provider_cls_name, None) def _import_provider_cls(provider_cls_name: str) -> Optional[Type[providers.Provider]]: try: cls = _import_string(provider_cls_name) except (ImportError, ValueError) as exception: raise SchemaError(f'Can not import provider "{provider_cls_name}"') from exception except AttributeError: return None else: if isinstance(cls, type) and not issubclass(cls, providers.Provider): raise SchemaError(f'Provider class "{cls}" is not a subclass of providers base class') return cls def _import_string(string_name: str) -> Optional[object]: segments = string_name.split('.') if len(segments) == 1: member = getattr(builtins, segments[0], None) if member: return member module_name = '.'.join(segments[:-1]) if not module_name: return None member = segments[-1] module = importlib.import_module(module_name) return getattr(module, member, None) class SchemaError(Exception): """Schema-related error."""