python-dependency-injector/examples/providers/singleton_thread_locals.py
Roman Mogilatov b0d507b8bf Release 3.1.3
2016-11-15 14:32:23 +02:00

52 lines
1.6 KiB
Python

"""`ThreadLocalSingleton` providers example."""
import threading
import Queue
import dependency_injector.providers as providers
def example(example_object, queue):
"""Example function that puts provided object in the provided queue."""
queue.put(example_object)
# Create thread-local singleton provider for some object (main thread):
thread_local_object = providers.ThreadLocalSingleton(object)
# Create singleton provider for thread-safe queue:
queue = providers.Singleton(Queue.Queue)
# Create callable provider for example(), inject dependencies:
example = providers.DelegatedCallable(example,
example_object=thread_local_object,
queue=queue)
# Create factory provider for threads that are targeted to execute example():
thread_factory = providers.Factory(threading.Thread,
target=example)
if __name__ == '__main__':
# Create 10 threads for concurrent execution of example():
threads = []
for thread_number in range(10):
threads.append(thread_factory(name='Thread{0}'.format(thread_number)))
# Start execution of all created threads:
for thread in threads:
thread.start()
# Wait while threads would complete their work:
for thread in threads:
thread.join()
# Making some asserts (main thread):
all_objects = set()
while not queue().empty():
all_objects.add(queue().get())
assert len(all_objects) == len(threads)
# Queue contains same number of objects as number of threads where
# thread-local singleton provider was used.