refactor: dbcrud->dbuser
This commit is contained in:
@@ -30,8 +30,12 @@ class crudbase:
|
||||
and_conditions.append(column == filter_value)
|
||||
elif operator == '>':
|
||||
and_conditions.append(column > filter_value)
|
||||
elif operator == '>=':
|
||||
and_conditions.append(column >= filter_value)
|
||||
elif operator == '<':
|
||||
and_conditions.append(column < filter_value)
|
||||
elif operator == '<=':
|
||||
and_conditions.append(column <= filter_value)
|
||||
elif operator == 'in':
|
||||
if isinstance(filter_value, list):
|
||||
or_conditions.append(column.in_(filter_value))
|
||||
|
||||
92
backend/app/db/cruddb/dbuser.py
Normal file
92
backend/app/db/cruddb/dbuser.py
Normal file
@@ -0,0 +1,92 @@
|
||||
from datetime import datetime
|
||||
from fastapi import HTTPException, status
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import and_
|
||||
import typing as t
|
||||
|
||||
from app.db.cruddb.crudbase import crudbase
|
||||
from fastapi_pagination.ext.sqlalchemy import paginate
|
||||
from app.core.common import ApiReturnPage
|
||||
|
||||
from app.db import models, schemas
|
||||
from app.core.security import chacha20Decrypt, get_password_hash
|
||||
|
||||
|
||||
class dbpermission(crudbase):
|
||||
def __init__(self):
|
||||
super().__init__(model=models.Permission)
|
||||
|
||||
dbpermission = dbpermission()
|
||||
|
||||
class dbrole(crudbase):
|
||||
def __init__(self):
|
||||
super().__init__(model=models.Role)
|
||||
|
||||
dbrole = dbrole()
|
||||
|
||||
class dbuser(crudbase):
|
||||
def __init__(self):
|
||||
super().__init__(model=models.User)
|
||||
|
||||
def get_user(self,db: Session, user_id: int) -> schemas.User:
|
||||
return super().get(db,user_id)
|
||||
|
||||
def get_user_by_email(self,db: Session, email: str) -> schemas.User:
|
||||
return super().get_by_conditions(db,{"email":email}).first()
|
||||
|
||||
def get_users(self,db: Session) -> ApiReturnPage[models.Base]:
|
||||
return paginate(super().get_all())
|
||||
|
||||
def get_users_not_admin(self,db: Session) -> ApiReturnPage[models.Base]:
|
||||
return paginate(super().get_by_conditions(db,{"is_superuser":False}))
|
||||
|
||||
def create_user(self,db: Session, user: schemas.UserCreate,userid:int):
|
||||
hashed_password = get_password_hash(user.password)
|
||||
user.hashed_password = hashed_password
|
||||
user.createuserid = userid
|
||||
user.updateuserid = userid
|
||||
del user.password
|
||||
return super().create(db,user)
|
||||
|
||||
def delete_user(self,db: Session, user_id: int):
|
||||
return super().delete(db,user_id)
|
||||
|
||||
|
||||
def edit_user(self,db: Session, user_id:int,user: schemas.UserEdit,userid: int) -> schemas.User:
|
||||
if not user.password is None and user.password != "":
|
||||
user.hashed_password = get_password_hash(user.password)
|
||||
del user.password
|
||||
user.updateuserid = userid
|
||||
return super().update(db,user_id,user)
|
||||
|
||||
def get_roles(self,db: Session) -> t.List[schemas.RoleBase]:
|
||||
return dbrole.get_all(db).all()
|
||||
|
||||
def get_roles_by_level(self,db: Session,level:int) -> t.List[schemas.RoleBase]:
|
||||
return dbrole.get_by_conditions(db,{"level":{"operator":">=","value":level}}).all()
|
||||
|
||||
def assign_userrole(self,db: Session, user_id: int, roles: t.List[int]):
|
||||
db_user = super().get(db,user_id)
|
||||
if db_user:
|
||||
for role in db_user.roles:
|
||||
db_user.roles.remove(role)
|
||||
for roleid in roles:
|
||||
role = dbrole.get(db,roleid)
|
||||
if role:
|
||||
db_user.roles.append(role)
|
||||
db.commit()
|
||||
db.refresh(db_user)
|
||||
return db_user
|
||||
|
||||
def get_permissions(self,db: Session,user_id: int) -> t.List[schemas.Permission]:
|
||||
return dbpermission.get_by_conditions(db).all()
|
||||
|
||||
def get_user_permissions(self,db: Session,user_id: int) -> t.List[schemas.Permission]:
|
||||
permissions =[]
|
||||
db_user = super().get(db,user_id)
|
||||
if db_user:
|
||||
for role in db_user.roles:
|
||||
permissions += role.permissions
|
||||
return list(set(permissions))
|
||||
|
||||
dbuser = dbuser()
|
||||
Reference in New Issue
Block a user