Add ThreadLocalSingleton, its delegated version and example of usage

This commit is contained in:
Roman Mogilatov 2016-08-17 00:03:26 +03:00
parent 1c47f73610
commit f27fa60413
3 changed files with 157 additions and 0 deletions

View File

@ -17,6 +17,8 @@ from dependency_injector.providers.creational import (
DelegatedFactory,
Singleton,
DelegatedSingleton,
ThreadLocalSingleton,
DelegatedThreadLocalSingleton
)
@ -34,6 +36,10 @@ __all__ = (
'Factory',
'DelegatedFactory',
'Singleton',
'DelegatedSingleton',
'ThreadLocalSingleton',
'DelegatedThreadLocalSingleton',
)

View File

@ -1,5 +1,7 @@
"""Dependency injector creational providers."""
import threading
import six
from dependency_injector.providers.callable import Callable
@ -247,3 +249,103 @@ class DelegatedSingleton(Singleton):
:rtype: object
"""
return self
class ThreadLocalSingleton(Factory):
""":py:class:`ThreadLocalSingleton` is singleton based on thread locals.
:py:class:`ThreadLocalSingleton` provider creates instance once for each
thread and returns it on every call. :py:class:`ThreadLocalSingleton`
extends :py:class:`Factory`, so, please follow :py:class:`Factory`
documentation for getting familiar with injections syntax.
:py:class:`ThreadLocalSingleton` is thread-safe and could be used in
multithreading environment without any negative impact.
Retrieving of provided instance can be performed via calling
:py:class:`ThreadLocalSingleton` object:
.. code-block:: python
singleton = ThreadLocalSingleton(SomeClass)
some_object = singleton()
.. py:attribute:: provided_type
If provided type is defined, provider checks that providing class is
its subclass.
:type: type | None
.. py:attribute:: cls
Class that provides object.
Alias for :py:attr:`provides`.
:type: type
"""
__slots__ = ('local_storage',)
def __init__(self, provides, *args, **kwargs):
"""Initializer.
:param provides: Class or other callable that provides object
for creation.
:type provides: type | callable
"""
self.local_storage = threading.local()
super(ThreadLocalSingleton, self).__init__(provides, *args, **kwargs)
def reset(self):
"""Reset cached instance, if any.
:rtype: None
"""
self.local_storage.instance = None
def _provide(self, *args, **kwargs):
"""Return provided instance.
:param args: Tuple of context positional arguments.
:type args: tuple[object]
:param kwargs: Dictionary of context keyword arguments.
:type kwargs: dict[str, object]
:rtype: object
"""
try:
instance = self.local_storage.instance
except AttributeError:
instance = super(ThreadLocalSingleton, self)._provide(*args,
**kwargs)
self.local_storage.instance = instance
finally:
return instance
class DelegatedThreadLocalSingleton(ThreadLocalSingleton):
""":py:class:`ThreadLocalSingleton` that is injected "as is".
.. py:attribute:: provided_type
If provided type is defined, provider checks that providing class is
its subclass.
:type: type | None
.. py:attribute:: cls
Class that provides object.
Alias for :py:attr:`provides`.
:type: type
"""
def provide_injection(self):
"""Injection strategy implementation.
:rtype: object
"""
return self

View File

@ -0,0 +1,49 @@
"""`ThreadLocalSingleton` providers example."""
import threading
import Queue
import dependency_injector.providers as providers
def example(example_object, queue):
"""Example function that puts provided object in the provided queue."""
queue.put(example_object)
# Create thread-local singleton provider for some object (main thread):
thread_local_object = providers.ThreadLocalSingleton(object)
# Create singleton provider for thread-safe queue:
queue = providers.Singleton(Queue.Queue)
# Create callable provider for example(), inject dependencies:
example = providers.DelegatedCallable(example,
example_object=thread_local_object,
queue=queue)
# Create factory provider for threads that are targeted to execute example():
thread_factory = providers.Factory(threading.Thread,
target=example)
# Create 10 threads for concurrent execution of example():
threads = []
for thread_number in xrange(10):
threads.append(thread_factory(name='Thread{0}'.format(thread_number)))
# Start execution of all create threads:
for thread in threads:
thread.start()
# Wait while threads would complete their work:
for thread in threads:
thread.join()
# Making some asserts (main thread):
all_objects = set()
while not queue().empty():
all_objects.add(queue().get())
assert len(all_objects) == len(threads)
# Queue contains same number of objects as number of threads where thread-local
# singleton provider was used.