backend/med_backend/db/utils.py

45 lines
1.5 KiB
Python

from sqlalchemy import text
from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import create_async_engine
from med_backend.settings import settings
async def create_database() -> None:
"""Create a databse."""
db_url = make_url(str(settings.db_url.with_path("/postgres")))
engine = create_async_engine(db_url, isolation_level="AUTOCOMMIT")
async with engine.connect() as conn:
database_existance = await conn.execute(
text(
f"SELECT 1 FROM pg_database WHERE datname='{settings.db_base}'", # noqa: E501, S608
),
)
database_exists = database_existance.scalar() == 1
if database_exists:
await drop_database()
async with engine.connect() as conn: # noqa: WPS440
await conn.execute(
text(
f'CREATE DATABASE "{settings.db_base}" ENCODING "utf8" TEMPLATE template1', # noqa: E501
),
)
async def drop_database() -> None:
"""Drop current database."""
db_url = make_url(str(settings.db_url.with_path("/postgres")))
engine = create_async_engine(db_url, isolation_level="AUTOCOMMIT")
async with engine.connect() as conn:
disc_users = (
"SELECT pg_terminate_backend(pg_stat_activity.pid) " # noqa: S608
"FROM pg_stat_activity "
f"WHERE pg_stat_activity.datname = '{settings.db_base}' "
"AND pid <> pg_backend_pid();"
)
await conn.execute(text(disc_users))
await conn.execute(text(f'DROP DATABASE "{settings.db_base}"'))