98 lines
3.4 KiB
Python
98 lines
3.4 KiB
Python
from datetime import datetime
|
|
from fastapi import HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import and_
|
|
import typing as t
|
|
|
|
from app.db.cruddb.crudbase import crudbase
|
|
from fastapi_pagination.ext.sqlalchemy import paginate
|
|
from app.core.common import ApiReturnPage
|
|
|
|
from app.db import models, schemas
|
|
from app.core.security import chacha20Decrypt, get_password_hash
|
|
|
|
|
|
class dbpermission(crudbase):
    """CRUD accessor whose base class is initialised with ``models.Permission``."""

    def __init__(self):
        permission_model = models.Permission
        super().__init__(model=permission_model)


# Rebinds the class name to one instance: from this line on, ``dbpermission``
# refers to that object rather than to the class.
dbpermission = dbpermission()
|
|
|
|
class dbrole(crudbase):
    """CRUD accessor whose base class is initialised with ``models.Role``."""

    def __init__(self):
        role_model = models.Role
        super().__init__(model=role_model)


# Rebinds the class name to one instance: from this line on, ``dbrole``
# refers to that object rather than to the class.
dbrole = dbrole()
|
|
|
|
class dbuser(crudbase):
    """User CRUD operations plus role and permission lookups.

    The base class is initialised with ``models.User``. Role and permission
    queries are delegated to the module-level ``dbrole`` and ``dbpermission``
    objects.
    """

    # Starting value for the "lowest level" search in ``get_roles_by_level``;
    # it is also the level used when no roles are supplied.
    _LEVEL_CEILING = 99999

    def __init__(self):
        super().__init__(model=models.User)

    def get_user(self, db: Session, user_id: int) -> schemas.User:
        """Return whatever the base-class ``get`` yields for ``user_id``."""
        return super().get(db, user_id)

    def get_user_by_email(self, db: Session, email: str) -> schemas.User:
        """Return the first user matching ``email`` (``None`` if no row matches)."""
        statement = super().get_by_conditions({"email": email})
        return db.execute(statement).scalars().first()

    def get_users(self, db: Session) -> ApiReturnPage[models.Base]:
        """Paginate the base-class ``get_all`` statement."""
        return paginate(db, super().get_all())

    def get_users_not_admin(self, db: Session) -> ApiReturnPage[models.Base]:
        """Paginate the users selected with ``is_superuser`` set to ``False``."""
        statement = super().get_by_conditions({"is_superuser": False})
        return paginate(db, statement)

    def create_user(self, db: Session, user: schemas.UserCreate, userid: int):
        """Create a user from ``user``, storing a hash instead of the password.

        ``user`` is modified in place: ``hashed_password``, ``createuserid``
        and ``updateuserid`` are set and the plain-text ``password`` attribute
        is removed before the object is handed to the base-class ``create``.

        Args:
            db: Active database session.
            user: Incoming user data; must carry a ``password`` attribute.
            userid: Id recorded as both creator and last updater.

        Returns:
            The result of the base-class ``create`` call.
        """
        user.hashed_password = get_password_hash(user.password)
        user.createuserid = userid
        user.updateuserid = userid
        # The plain-text password is dropped before the object is persisted.
        del user.password
        return super().create(db, user)

    def delete_user(self, db: Session, user_id: int):
        """Delete the user identified by ``user_id`` via the base-class ``delete``."""
        return super().delete(db, user_id)

    def edit_user(
        self, db: Session, user_id: int, user: schemas.UserEdit, userid: int
    ) -> schemas.User:
        """Update a user, re-hashing the password only when a new one is given.

        ``user`` is modified in place in the same way as in ``create_user``.

        Args:
            db: Active database session.
            user_id: Id of the user being edited.
            user: Incoming changes; ``password`` may be ``None`` or ``""`` to
                leave the stored hash untouched.
            userid: Id recorded as the last updater.

        Returns:
            The result of the base-class ``update`` call.
        """
        if user.password is not None and user.password != "":
            user.hashed_password = get_password_hash(user.password)
        # NOTE(review): the original indentation was lost, so it is unclear
        # whether this deletion was conditional. It is done unconditionally
        # here so the plain-text field never reaches ``update`` - confirm.
        del user.password
        user.updateuserid = userid
        return super().update(db, user_id, user)

    def get_roles(self, db: Session) -> t.List[schemas.RoleBase]:
        """Return every role produced by ``dbrole.get_all()``."""
        return db.execute(dbrole.get_all()).scalars().all()

    def get_roles_by_level(
        self, db: Session, roles: t.List[models.Role]
    ) -> t.List[schemas.RoleBase]:
        """Return the roles selected by a ``level > lowest`` condition.

        ``lowest`` is the smallest ``level`` among ``roles``, capped at
        ``_LEVEL_CEILING`` (which is also the value used for an empty list).
        """
        lowest = min([self._LEVEL_CEILING, *(role.level for role in roles)])
        statement = dbrole.get_by_conditions(
            {"level": {"operator": ">", "value": lowest}}
        )
        return db.execute(statement).scalars().all()

    def assign_userrole(self, db: Session, user_id: int, roles: t.List[int]):
        """Make the user's role collection match the given role ids.

        Roles whose id is not in ``roles`` are removed, roles that are listed
        but not yet attached are appended, and the change is committed.

        Args:
            db: Active database session.
            user_id: Id of the user whose roles are being replaced.
            roles: Ids of the roles the user should end up with.

        Returns:
            The refreshed user object, or the falsy lookup result when the
            user does not exist (nothing is committed in that case).
        """
        db_user = super().get(db, user_id)
        if not db_user:
            # NOTE(review): the original nesting of commit/refresh was lost
            # with the indentation; skipping them for a missing user avoids
            # refreshing a non-existent object - confirm.
            return db_user

        wanted_ids = set(roles)
        # Iterate over a snapshot: removing from the list being iterated
        # makes the loop skip the element that follows each removal.
        for current_role in list(db_user.roles):
            if current_role.id not in wanted_ids:
                db_user.roles.remove(current_role)

        for role_id in roles:
            role = dbrole.get(db, role_id)
            # Presumably ``get`` returns None for an unknown id (verify against
            # crudbase); such ids are skipped instead of appending None.
            if role is not None and role not in db_user.roles:
                db_user.roles.append(role)

        db.commit()
        db.refresh(db_user)
        return db_user

    def get_permissions(self, db: Session) -> t.List[schemas.Permission]:
        """Return every permission produced by ``dbpermission.get_all()``."""
        return db.execute(dbpermission.get_all()).scalars().all()

    def get_user_permissions(
        self, db: Session, user_id: int
    ) -> t.List[schemas.Permission]:
        """Return the distinct permissions granted through the user's roles.

        Returns an empty list when the user lookup yields nothing. Duplicates
        are dropped while keeping first-seen order, so the result is
        deterministic for a given role/permission ordering.
        """
        permissions = []
        db_user = super().get(db, user_id)
        if db_user:
            for role in db_user.roles:
                permissions.extend(role.permissions)
        # dict.fromkeys de-duplicates like set() but preserves insertion order.
        return list(dict.fromkeys(permissions))


# Module-level service object built from the class above.
userService = dbuser()