python-dependency-injector/examples/miniapps/asyncio-daemon/monitoringdaemon/tests.py
Roman Mogylatov 7d160cb4a5
Wiring with string module names (#515)
* Update main example

* Updating wiring module

* Update wiring test case name

* Implement string imports for wiring

* Update example

* Refactor implementation

* Update front example

* Fix a typo in README

* Update wiring docs

* Update single container example

* Update multiple containers example

* Update quotes in multiple containers example

* Update quotes in single container example

* Update decoupled-packages example

* Update single and multiple containers example

* Update quotes

* Update fastapi+redis example

* Update resource docs

* Update quotes in CLI tutorial

* Update CLI application (movie lister) tutorial

* Update monitoring daemon example

* Update python version in asyncio daemon example

* Update asyncio daemon tutorial

* Update quotes in wiring docs

* Update wiring docs
2021-09-30 15:03:19 -04:00

80 lines
2.0 KiB
Python

"""Tests module."""
import asyncio
import dataclasses
from unittest import mock
import pytest
from .containers import Container
@dataclasses.dataclass
class RequestStub:
status: int
content_length: int
@pytest.fixture
def container():
container = Container()
container.config.from_dict({
"log": {
"level": "INFO",
"formant": "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s",
},
"monitors": {
"example": {
"method": "GET",
"url": "http://fake-example.com",
"timeout": 1,
"check_every": 1,
},
"httpbin": {
"method": "GET",
"url": "https://fake-httpbin.org/get",
"timeout": 1,
"check_every": 1,
},
},
})
return container
@pytest.mark.asyncio
async def test_example_monitor(container, caplog):
caplog.set_level("INFO")
http_client_mock = mock.AsyncMock()
http_client_mock.request.return_value = RequestStub(
status=200,
content_length=635,
)
with container.http_client.override(http_client_mock):
example_monitor = container.example_monitor()
await example_monitor.check()
assert "http://fake-example.com" in caplog.text
assert "response code: 200" in caplog.text
assert "content length: 635" in caplog.text
@pytest.mark.asyncio
async def test_dispatcher(container, caplog, event_loop):
caplog.set_level("INFO")
example_monitor_mock = mock.AsyncMock()
httpbin_monitor_mock = mock.AsyncMock()
with container.example_monitor.override(example_monitor_mock), \
container.httpbin_monitor.override(httpbin_monitor_mock):
dispatcher = container.dispatcher()
event_loop.create_task(dispatcher.start())
await asyncio.sleep(0.1)
dispatcher.stop()
assert example_monitor_mock.check.called
assert httpbin_monitor_mock.check.called