Files
KintoneAppBuilder/backend/app/core/auth.py

86 lines
2.6 KiB
Python

from fastapi.security import SecurityScopes
import jwt
from fastapi import Depends, HTTPException, Request, Security, status
from jwt import PyJWTError
from app.db import models, schemas, session
from app.db.crud import get_user_by_email, create_user,get_user
from app.core import security
async def get_current_user(security_scopes: SecurityScopes,
                           db=Depends(session.get_db),
                           token: str = Depends(security.oauth2_scheme)):
    """Resolve the bearer token to a user record, enforcing required scopes.

    The token is decoded with ``security.SECRET_KEY`` / ``security.ALGORITHM``.
    Its ``sub`` claim is used as the user id and its ``permissions`` claim is
    either the literal ``"ALL"`` or a ``;``-separated list of granted scopes.

    Args:
        security_scopes: Scopes required by the route being accessed.
        db: Database session handed to ``get_user``.
        token: Encoded JWT taken from the request.

    Returns:
        The object returned by ``get_user`` for the id stored in the token.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or refers to a user that ``get_user`` cannot find; 403
            when a required scope is not granted by the token.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode step can raise PyJWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(
            token, security.SECRET_KEY, algorithms=[security.ALGORITHM]
        )
    except PyJWTError:
        raise credentials_exception

    user_id = payload.get("sub")
    if user_id is None:
        raise credentials_exception

    permissions = payload.get("permissions")
    if permissions != "ALL":
        # A token without a usable "permissions" claim grants no scopes.
        # Previously a missing claim crashed on ``None.split`` (HTTP 500)
        # instead of being rejected with 403.
        if isinstance(permissions, str):
            granted = set(permissions.split(";"))
        else:
            granted = set()
        if any(scope not in granted for scope in security_scopes.scopes):
            raise HTTPException(
                status_code=403, detail="The user doesn't have enough privileges"
            )

    token_data = schemas.TokenData(id=user_id, permissions=permissions)
    user = get_user(db, token_data.id)
    if user is None:
        raise credentials_exception
    return user
async def get_current_active_user(
    current_user: models.User = Depends(get_current_user),
):
    """Return the authenticated user, rejecting accounts flagged inactive.

    Raises:
        HTTPException: 400 when ``current_user.is_active`` is falsy.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
async def get_current_active_superuser(
    current_user: models.User = Depends(get_current_user),
) -> models.User:
    """Return the authenticated user only if it is flagged as a superuser.

    Raises:
        HTTPException: 403 when ``current_user.is_superuser`` is falsy.
    """
    # NOTE(review): this depends on get_current_user, so ``is_active`` is not
    # checked on this path despite the function name -- confirm intentional.
    if current_user.is_superuser:
        return current_user
    raise HTTPException(
        status_code=403, detail="The user doesn't have enough privileges"
    )
def authenticate_user(db, email: str, password: str):
    """Look up a user by email and check the supplied password.

    Returns:
        The user found by ``get_user_by_email`` when one exists and
        ``security.verify_password`` accepts ``password`` against its
        ``hashed_password``; ``False`` otherwise.
    """
    account = get_user_by_email(db, email)
    # Short-circuit keeps the password check from running for unknown emails.
    if account and security.verify_password(password, account.hashed_password):
        return account
    return False
def sign_up_new_user(db, email: str, password: str, firstname: str, lastname: str):
    """Create a new active, non-superuser account unless the email is taken.

    Returns:
        ``False`` when ``get_user_by_email`` already finds a user for
        ``email``; otherwise whatever ``create_user`` returns.
    """
    if get_user_by_email(db, email):
        # User already exists
        return False

    registration = schemas.UserCreate(
        email=email,
        password=password,
        first_name=firstname,
        last_name=lastname,
        is_active=True,
        is_superuser=False,
    )
    return create_user(db, registration)