python-dependency-injector/examples/miniapps/monitoring-daemon-asyncio/monitoringdaemon/dispatcher.py
Roman Mogylatov 3a61457be7
Asyncio daemon tutorial (#278)
* Fix a typo in the docblock of the Configuration provider

* Update the changelog

* Add tutorial sections

* Switch to use https for httpbin.org requests

* Add what we are going to build section

* Fix ``Makefile`` to run ``aiohttp`` integration tests on Python 3.5+

* Add prerequisites and prepare the env sections

* Add logging, config and the dispatcher sections

* Change logging

* Fix multiple typos in the ``flask`` and ``aiohttp`` tutorials

* Add the initial and dirty version

* Fix multiple typos in the ``flask`` and ``aiohttp`` tutorials

* Fix the 3.27.0 changelog

* Finish all the parts before the dispatcher

* Finish dispatcher section

* Update http monitor logging format

* Finish the tutorial

* Fix docblock in the dispatcher module

* Update changelog

* Add meta keywords and description
2020-08-08 14:48:05 -04:00

66 lines
1.8 KiB
Python

"""Dispatcher module."""
import asyncio
import logging
import signal
import time
from typing import List
from .monitors import Monitor
class Dispatcher:
    """Run a group of monitors concurrently inside one asyncio event loop.

    Every monitor gets its own task that awaits ``monitor.check()`` in a
    loop and then sleeps so that consecutive checks start roughly
    ``monitor.check_every`` seconds apart. ``SIGTERM`` and ``SIGINT`` are
    routed to :meth:`stop`, which cancels all monitor tasks.
    """

    # ``Monitor`` is written as a forward reference in the annotations below,
    # so it is not evaluated while the class body is being executed.

    def __init__(self, monitors: List['Monitor']) -> None:
        """Remember the monitors and initialise the dispatcher state.

        :param monitors: Objects to run; the dispatcher uses their
            ``check()`` coroutine, ``check_every`` value and ``logger``.
        """
        self._monitors = monitors
        self._monitor_tasks: List[asyncio.Task] = []
        self._logger = logging.getLogger(self.__class__.__name__)
        # Guards stop() so that a signal followed by the regular exit path
        # (or two signals in a row) shuts things down only once.
        self._stopping = False

    def run(self) -> None:
        """Start the dispatcher in a fresh event loop and block until done."""
        asyncio.run(self.start())

    async def start(self) -> None:
        """Spawn one task per monitor and wait until all of them finish.

        Signal handlers for ``SIGTERM`` and ``SIGINT`` are installed on the
        running loop so that either signal triggers :meth:`stop`.
        """
        self._logger.info('Starting up')

        for monitor in self._monitors:
            self._monitor_tasks.append(
                asyncio.create_task(self._run_monitor(monitor)),
            )

        # We are inside a coroutine, so the running loop is the one to use.
        loop = asyncio.get_running_loop()
        for signum in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(signum, self.stop)

        # return_exceptions=True: a failed or cancelled task must not abort
        # the wait for the remaining ones.
        await asyncio.gather(*self._monitor_tasks, return_exceptions=True)

        self.stop()

    def stop(self) -> None:
        """Request cancellation of every monitor task; later calls are no-ops."""
        if self._stopping:
            return

        self._stopping = True

        self._logger.info('Shutting down')
        # Iterate over the tasks themselves so each tracked task is cancelled.
        for task in self._monitor_tasks:
            task.cancel()
        self._logger.info('Shutdown finished successfully')

    @staticmethod
    async def _run_monitor(monitor: 'Monitor') -> None:
        """Call ``monitor.check()`` repeatedly until the check is cancelled.

        Errors raised by a check are logged through ``monitor.logger`` and
        the loop carries on. After each check the coroutine sleeps for the
        remainder of ``monitor.check_every`` (not at all if the check took
        longer than that).

        :param monitor: Monitor to drive.
        """
        while True:
            # Monotonic clock: interval maths must not be affected by
            # wall-clock adjustments.
            started_at = time.monotonic()

            try:
                await monitor.check()
            except asyncio.CancelledError:
                break
            except Exception:
                monitor.logger.exception('Error executing monitor check')

            elapsed = time.monotonic() - started_at
            await asyncio.sleep(max(0.0, monitor.check_every - elapsed))