backend/med_backend/users/crud.py

73 lines
2.1 KiB
Python
Raw Normal View History

from typing import List
from fastapi import HTTPException
2022-12-09 14:40:57 +03:00
from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
2022-12-09 02:33:22 +03:00
from med_backend.auth import schemas, services
2022-12-09 14:40:57 +03:00
from med_backend.auth.schemas import UpdateUserProfile
2022-12-09 02:33:22 +03:00
from med_backend.db.models.users import UserScheme
2022-12-09 02:33:22 +03:00
async def get_user_by_email(session: AsyncSession, email: str) -> schemas.User | None:
    """Look up a user by email address.

    Runs a SELECT on ``UserScheme`` filtered by ``email`` and returns the
    first matching row, or ``None`` when nothing matches.
    """
    query = select(UserScheme).where(UserScheme.email == email)
    result = await session.execute(query)
    return result.scalars().first()
2022-12-09 02:33:22 +03:00
async def get_user(session: AsyncSession, pk: int) -> schemas.User | None:
    """Look up a user by primary key.

    Runs a SELECT on ``UserScheme`` filtered by ``id == pk`` and returns the
    first matching row, or ``None`` when nothing matches.
    """
    query = select(UserScheme).where(UserScheme.id == pk)
    result = await session.execute(query)
    return result.scalars().first()
async def get_users(
    session: AsyncSession,
    skip: int = 0,
    limit: int = 100,
) -> List[schemas.User] | None:
    """Return one page of users whose ``is_manager`` flag is false.

    Args:
        session: Async database session used to run the query.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.

    Returns:
        The ``UserScheme`` rows of the requested page (possibly empty).
    """
    query = (
        select(UserScheme)
        # ``== False`` is intentional: it builds a SQL comparison expression,
        # it is not a Python truthiness test.
        .where(UserScheme.is_manager == False)  # noqa: E712
        # OFFSET/LIMIT without ORDER BY leaves row order up to the database,
        # so pages could overlap or skip rows; order by primary key to make
        # paging deterministic.
        .order_by(UserScheme.id)
        .offset(skip)
        .limit(limit)
    )
    result = await session.execute(query)
    return result.scalars().all()
async def create_user(session: AsyncSession, user: schemas.UserCreate) -> UserScheme:
    """Create and persist a new user.

    Raises:
        HTTPException: 422 when a user with ``user.email`` already exists.

    Returns:
        The newly stored ``UserScheme`` row, refreshed from the database
        after the commit.
    """
    existing = await get_user_by_email(session, user.email)
    if existing:
        raise HTTPException(status_code=422, detail="Email already taken")

    # Only the hash produced by the auth service is stored.
    password_hash = services.get_password_hash(user.password)
    new_user = UserScheme(
        email=user.email,
        fullname=user.fullname,
        gender=user.gender,
        born=user.born.date(),
        hashed_password=password_hash,
        disabled=False,
    )

    session.add(new_user)
    await session.commit()
    # Reload the row after the commit before handing it back.
    await session.refresh(new_user)
    return new_user
2022-12-09 14:40:57 +03:00
async def update_user(session: AsyncSession, user_id: int, data: UpdateUserProfile):
    """Update the profile fields of the user with id ``user_id``.

    Args:
        session: Async database session used to run the statements.
        user_id: Primary key of the user being updated.
        data: New profile values; every field of ``data`` is written to the
            row via ``dict(data)``.

    Raises:
        HTTPException: 422 when ``data.email`` already belongs to a
            *different* user.
    """
    email_owner = await get_user_by_email(session, data.email)
    # A user who keeps their current email matches their own row; that is not
    # a conflict. Only reject when another account holds the address.
    if email_owner is not None and email_owner.id != user_id:
        raise HTTPException(status_code=422, detail="Email already taken")

    await session.execute(
        update(UserScheme).where(UserScheme.id == user_id).values(**dict(data)),
    )
    await session.commit()
async def delete_user(session: AsyncSession, user_id: int):
    """Delete the user row whose id equals ``user_id`` and commit."""
    statement = delete(UserScheme).where(UserScheme.id == user_id)
    await session.execute(statement)
    await session.commit()