python-dependency-injector/examples/providers/singleton_thread_locals.py
Roman Mogylatov 0bb30f91ef
Singleton docs update (#288)
* Update docblocks of factory provider examples

* Edit singleton docs
2020-09-01 16:04:48 -04:00

44 lines
1.1 KiB
Python

"""`ThreadLocalSingleton` provider example."""
import threading
import queue
from dependency_injector import providers
def put_in_queue(example_object, queue_object):
    """Put ``example_object`` into ``queue_object``.

    :param example_object: Object to enqueue.
    :param queue_object: Queue-like object exposing a ``put()`` method.
    """
    queue_object.put(example_object)
# Single queue shared by every thread; collects the objects they produce.
queue_provider = providers.ThreadSafeSingleton(queue.Queue)

# Provider under test: per the dependency_injector docs it keeps a separate
# singleton instance for each thread that requests it.
thread_local_object = providers.ThreadLocalSingleton(object)

# NOTE: this rebinds the module-level name ``put_in_queue`` from the plain
# function to a provider that wraps it with both arguments injected.
put_in_queue = providers.Callable(put_in_queue,
                                  example_object=thread_local_object,
                                  queue_object=queue_provider)

# ``.provider`` hands the provider object itself to ``Thread`` as the target
# (rather than a value obtained by calling it while the thread is built).
thread_factory = providers.Factory(threading.Thread,
                                   target=put_in_queue.provider)
if __name__ == '__main__':
    # Build ten named threads; each one targets the ``put_in_queue`` provider.
    threads = [
        thread_factory(name='Thread{0}'.format(thread_number))
        for thread_number in range(10)
    ]

    for thread in threads:
        thread.start()

    # Wait for every thread to finish before draining the queue, so that
    # ``empty()`` below reflects the final state.
    for thread in threads:
        thread.join()

    # Resolve the singleton queue once instead of on every loop iteration.
    results_queue = queue_provider()

    all_objects = set()
    while not results_queue.empty():
        all_objects.add(results_queue.get())

    # Queue contains same number of objects as number of threads where
    # thread-local singleton provider was used.
    assert len(all_objects) == len(threads)