backend/med_backend/web/lifetime.py

91 lines
2.4 KiB
Python

from asyncio import current_task
from typing import Awaitable, Callable
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_scoped_session,
create_async_engine,
)
from sqlalchemy.orm import sessionmaker
from med_backend.db.meta import meta
from med_backend.db.models import load_all_models
from med_backend.services.redis.lifetime import init_redis, shutdown_redis
from med_backend.settings import settings
def _setup_db(app: FastAPI) -> None: # pragma: no cover
"""
Creates connection to the database.
This function creates SQLAlchemy engine instance,
session_factory for creating sessions
and stores them in the application's state property.
:param app: fastAPI application.
"""
engine = create_async_engine(str(settings.db_url), echo=settings.db_echo)
session_factory = async_scoped_session(
sessionmaker(
engine,
expire_on_commit=False,
class_=AsyncSession,
),
scopefunc=current_task,
)
app.state.db_engine = engine
app.state.db_session_factory = session_factory
async def _create_tables() -> None: # pragma: no cover
"""Populates tables in the database."""
load_all_models()
engine = create_async_engine(str(settings.db_url))
async with engine.begin() as connection:
await connection.run_sync(meta.create_all)
await engine.dispose()
def register_startup_event(
app: FastAPI,
) -> Callable[[], Awaitable[None]]: # pragma: no cover
"""
Actions to run on application startup.
This function uses fastAPI app to store data
inthe state, such as db_engine.
:param app: the fastAPI application.
:return: function that actually performs actions.
"""
@app.on_event("startup")
async def _startup() -> None: # noqa: WPS430
_setup_db(app)
await _create_tables()
init_redis(app)
pass # noqa: WPS420
return _startup
def register_shutdown_event(
app: FastAPI,
) -> Callable[[], Awaitable[None]]: # pragma: no cover
"""
Actions to run on application's shutdown.
:param app: fastAPI application.
:return: function that actually performs actions.
"""
@app.on_event("shutdown")
async def _shutdown() -> None: # noqa: WPS430
await app.state.db_engine.dispose()
await shutdown_redis(app)
pass # noqa: WPS420
return _shutdown