bugfix get roles

This commit is contained in:
2024-12-22 15:00:44 +09:00
parent 1420773548
commit a5f5b3fccf
8 changed files with 78 additions and 18 deletions

View File

@@ -256,7 +256,7 @@ async def flow_details(
raise APIException('platform:flow',request.url._url,f"Error occurred while get flow by flowid:",e)
@r.get(
"/flows/{appid}",
"/flows/{appid}", tags=["App"],
response_model=List[Flow|None],
response_model_exclude_none=True,
)
@@ -270,7 +270,8 @@ async def flow_list(
domainurl = domainCacheService.get_default_domainurl(db,user.id) #get_activedomain(db, user.id)
if not domainurl:
return []
flows = get_flows_by_app(db, domainurl, appid)
#flows = get_flows_by_app(db, domainurl, appid)
flows = appService.get_flow(db,domainurl,appid,user.id)
return flows
except Exception as e:
raise APIException('platform:flow',request.url._url,f"Error occurred while get flow by appid:",e)

View File

@@ -13,7 +13,7 @@ from app.db.crud import (
assign_userrole,
get_roles,
)
from app.db.schemas import UserCreate, UserEdit, User, UserOut,RoleBase,Permission
from app.db.schemas import UserCreate, UserEdit, User, UserOut,RoleBase,AssignUserRoles,Permission
from app.core.auth import get_current_user,get_current_active_user, get_current_active_superuser
from app.db.cruddb import userService
from app.core import tenantCacheService
@@ -131,14 +131,13 @@ async def user_delete(
response_model_exclude_none=True,)
async def assign_role(
request: Request,
user_id:int,
roles:t.List[int],
userroles:AssignUserRoles,
db=Depends(get_db)
):
try:
return ApiReturnModel(data = userService.assign_userrole(db,user_id,roles))
return ApiReturnModel(data = userService.assign_userrole(db,userroles.userid,userroles.roleids))
except Exception as e:
raise APIException('user:userrole',request.url._url,f"Error occurred while assign user({user_id}) roles({roles}):",e)
raise APIException('user:userrole',request.url._url,f"Error occurred while assign user({userroles.userid}) roles({userroles.roleids}):",e)
@r.get(
"/roles",tags=["User"],
@@ -152,11 +151,12 @@ async def roles_list(
#current_user=Security(get_current_active_user, scopes=["role_list"]),
):
try:
if current_user.is_superuser:
roles = userService.get_roles(db)
else:
if len(current_user.roles)>0:
roles = userService.get_roles_by_level(db,current_user.roles[0].level)
roles = userService.get_roles_by_level(db,current_user.roles)
else:
roles = []
return ApiReturnModel(data = roles)

View File

@@ -1,7 +1,7 @@
from fastapi import HTTPException, status,Depends
import httpx
from app.db.schemas import ErrorCreate
from app.db.session import get_tenant_db
from app.core.dbmanager import get_log_db
from app.db.crud import create_log
class APIException(Exception):
@@ -31,9 +31,10 @@ class APIException(Exception):
self.error = ErrorCreate(location=location, title=title, content=content)
super().__init__(self.error)
def writedblog(exc: APIException,db = Depends(get_tenant_db())):
def writedblog(exc: APIException,):
#db = SessionLocal()
#try:
db = get_log_db()
try:
create_log(db,exc.error)
#finally:
#db.close()
finally:
db.close()

View File

@@ -2,6 +2,7 @@
from fastapi import Depends
from app.db.session import get_tenant_db,get_user_db
from app.core import tenantCacheService
from app.db.session import tenantdb
def get_db(tenant:str = "1",tenantdb = Depends(get_tenant_db)):
db_url = tenantCacheService.get_tenant_db(tenantdb,tenant)
@@ -10,3 +11,8 @@ def get_db(tenant:str = "1",tenantdb = Depends(get_tenant_db)):
yield db
finally:
db.close()
def get_log_db():
db = tenantdb.get_db()
return db

View File

@@ -60,10 +60,15 @@ class dbuser(crudbase):
return super().update(db,user_id,user)
def get_roles(self,db: Session) -> t.List[schemas.RoleBase]:
return dbrole.get_all(db).all()
return db.execute(dbrole.get_all()).scalars().all()
#return dbrole.get_all().all()
def get_roles_by_level(self,db: Session,level:int) -> t.List[schemas.RoleBase]:
return db.execute(dbrole.get_by_conditions({"level":{"operator":">=","value":level}})).scalars().all()
def get_roles_by_level(self,db: Session,roles:t.List[models.Role]) -> t.List[schemas.RoleBase]:
level = 99999
for role in roles:
if role.level < level:
level = role.level
return db.execute(dbrole.get_by_conditions({"level":{"operator":">","value":level}})).scalars().all()
def assign_userrole(self,db: Session, user_id: int, roles: t.List[int]):
db_user = super().get(db,user_id)

View File

@@ -25,6 +25,10 @@ class RoleBase(BaseModel):
class RoleWithPermission(RoleBase):
permissions:t.List[Permission] = []
class AssignUserRoles(BaseModel):
userid:int
roleids:t.List[int]
class UserBase(BaseModel):
email: str
is_active: bool = True

View File

@@ -129,6 +129,19 @@ def login_admin_id(login_admin):
id = payload.get("sub")
return id
@pytest.fixture(scope="session")
def test_role(test_db):
role = models.Role(
name = "test",
description = "test",
level = 1
)
test_db.add(role)
test_db.commit()
test_db.refresh(role)
return role.__dict__
@pytest.fixture(scope="session")
def test_domain(test_db,login_user_id):
domain = models.Domain(

View File

@@ -124,3 +124,33 @@ def test_user_delete(test_client, login_user):
response = test_client.get("/api/v1/users/"+ str(id), headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
assert "data" not in response.json()
def test_role_assign(test_client, login_user_id,login_user,test_role):
userroles ={
"userid":login_user_id,
"roleids":[test_role["id"]]
}
response = test_client.post("/api/v1/userrole", json=userroles, headers={"Authorization": "Bearer " + login_user})
data = response.json()
logging.error(data)
assert response.status_code == 200
response = test_client.get("/api/v1/users/"+ str(login_user_id), headers={"Authorization": "Bearer " + login_user})
data = response.json()
logging.error(data)
assert response.status_code == 200
assert "data" in data
assert len(data["data"]["roles"]) == 1
def test_roles_get(test_client,login_user):
response = test_client.get("/api/v1/roles", headers={"Authorization": "Bearer " + login_user})
data = response.json()
logging.error(data)
assert response.status_code == 200
assert len(data["data"]) == 0
def test_roles_admin_get(test_client,login_admin):
response = test_client.get("/api/v1/roles", headers={"Authorization": "Bearer " + login_admin})
data = response.json()
logging.error(data)
assert response.status_code == 200
assert len(data["data"]) == 1