python-dependency-injector/examples/miniapps/fastapi-sqlalchemy/webapp/endpoints.py
Roman Mogylatov d45d98e300
FastAPI + SQLAlchemy example (#389)
* Add application

* Dockerize the app

* Fix 204 content-length error

* Rename database file

* Add tests

* Add README

* Fix a typo in FastAPI example

* Add docs on FastAPI + SQLAlchemy example

* Update changelog

* Add link to the example to README and other docs pages

* Add EOF to the config.yml
2021-02-04 18:18:25 -05:00

58 lines
1.4 KiB
Python

"""Endpoints module."""
from fastapi import APIRouter, Depends, Response, status
from dependency_injector.wiring import inject, Provide
from .containers import Container
from .services import UserService
from .repositories import NotFoundError
# Router on which every endpoint in this module is registered via its decorators.
router = APIRouter()
@router.get('/users')
@inject
def get_list(
        user_service: UserService = Depends(Provide[Container.user_service]),
):
    # GET /users
    # The service is supplied through Container.user_service; the handler
    # hands back whatever get_users() produces, untouched.
    users = user_service.get_users()
    return users
@router.get('/users/{user_id}')
@inject
def get_by_id(
        user_id: int,
        user_service: UserService = Depends(Provide[Container.user_service]),
):
    # GET /users/{user_id}
    # Looks the user up through the injected service. A NotFoundError from
    # the lookup is translated into an empty 404 response; any other
    # exception propagates unchanged.
    try:
        user = user_service.get_user_by_id(user_id)
    except NotFoundError:
        return Response(status_code=status.HTTP_404_NOT_FOUND)
    return user
@router.post('/users', status_code=status.HTTP_201_CREATED)
@inject
def add(
        user_service: UserService = Depends(Provide[Container.user_service]),
):
    # POST /users (declared status: 201 Created)
    # Takes no request payload; delegates to create_user() on the injected
    # service and returns its result as the response.
    created = user_service.create_user()
    return created
@router.delete('/users/{user_id}', status_code=status.HTTP_204_NO_CONTENT)
@inject
def remove(
        user_id: int,
        user_service: UserService = Depends(Provide[Container.user_service]),
):
    # DELETE /users/{user_id}
    # Asks the injected service to delete the user, then answers with a
    # body-less Response: 404 when the service raises NotFoundError, 204
    # otherwise.
    # NOTE(review): the explicit Response on success presumably avoids a
    # content-length problem with 204 replies -- confirm before simplifying.
    try:
        user_service.delete_user_by_id(user_id)
    except NotFoundError:
        outcome = status.HTTP_404_NOT_FOUND
    else:
        outcome = status.HTTP_204_NO_CONTENT
    return Response(status_code=outcome)
@router.get('/status')
def get_status():
    # GET /status
    # Needs no injected dependencies; always answers with the same fixed
    # payload.
    payload = {'status': 'OK'}
    return payload