1
0
Fork 0
mirror of https://github.com/mealie-recipes/mealie.git synced 2025-07-24 15:49:42 +02:00
mealie/mealie/routes/auth/auth.py
Hayden 82dc586bac
chores: updates-and-linters (#1868)
* switch to ruff

* add ruff

* run ruff --fix

* update ruff

* resolve ruff errors

* drop isort from CI

* fix decorator order
2022-11-30 20:20:28 -09:00

75 lines
2.6 KiB
Python

from datetime import timedelta
from fastapi import APIRouter, Depends, Form, status
from fastapi.exceptions import HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.orm.session import Session
from mealie.core import security
from mealie.core.dependencies import get_current_user
from mealie.core.security import authenticate_user
from mealie.core.security.security import UserLockedOut
from mealie.db.db_setup import generate_session
from mealie.routes._base.routers import UserAPIRouter
from mealie.schema.user import PrivateUser
public_router = APIRouter(tags=["Users: Authentication"])
user_router = UserAPIRouter(tags=["Users: Authentication"])
class CustomOAuth2Form(OAuth2PasswordRequestForm):
def __init__(
self,
grant_type: str = Form(None, regex="password"),
username: str = Form(...),
password: str = Form(...),
remember_me: bool = Form(False),
scope: str = Form(""),
client_id: str | None = Form(None),
client_secret: str | None = Form(None),
):
self.grant_type = grant_type
self.username = username
self.password = password
self.remember_me = remember_me
self.scopes = scope.split()
self.client_id = client_id
self.client_secret = client_secret
class MealieAuthToken(BaseModel):
access_token: str
token_type: str = "bearer"
@classmethod
def respond(cls, token: str, token_type: str = "bearer") -> dict:
return cls(access_token=token, token_type=token_type).dict()
@public_router.post("/token")
def get_token(data: CustomOAuth2Form = Depends(), session: Session = Depends(generate_session)):
email = data.username
password = data.password
try:
user = authenticate_user(session, email, password) # type: ignore
except UserLockedOut as e:
raise HTTPException(status_code=status.HTTP_423_LOCKED, detail="User is locked out") from e
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
)
duration = timedelta(days=14) if data.remember_me else None
access_token = security.create_access_token(dict(sub=str(user.id)), duration) # type: ignore
return MealieAuthToken.respond(access_token)
@user_router.get("/refresh")
async def refresh_token(current_user: PrivateUser = Depends(get_current_user)):
"""Use a valid token to get another token"""
access_token = security.create_access_token(data=dict(sub=str(current_user.id)))
return MealieAuthToken.respond(access_token)