add appversion & test case

This commit is contained in:
2024-12-08 22:01:27 +09:00
parent 62b6d7a878
commit 8c4aa3119a
10 changed files with 277 additions and 52 deletions

View File

@@ -11,7 +11,7 @@ from app.core.auth import get_current_active_user,get_current_user
from app.core.apiexception import APIException
from app.core.common import ApiReturnModel,ApiReturnPage
#from fastapi_pagination import Page
from app.db.cruddb import dbdomain
from app.db.cruddb import domainService,appService
import httpx
import app.core.config as config
@@ -27,7 +27,7 @@ async def test(
user = Depends(get_current_active_user),
db=Depends(get_db),
):
dbdomain.select(db,{"tenantid":1,"name":["b","c"]})
domainService.select(db,{"tenantid":1,"name":["b","c"]})
@r.get(
@@ -42,11 +42,11 @@ async def apps_list(
):
try:
domain = dbdomain.get_default_domain(db,user.id) #get_activedomain(db, user.id)
domain = domainService.get_default_domain(db,user.id) #get_activedomain(db, user.id)
if not domain:
return ApiReturnModel(data = None)
filtered_apps = []
platformapps = get_apps(db,domain.url)
platformapps = appService.get_apps(db,domain.url)
kintoneevn = config.KINTONE_ENV(domain)
headers={config.API_V1_AUTH_KEY:kintoneevn.API_V1_AUTH_VALUE}
url = f"{kintoneevn.BASE_URL}{config.API_V1_STR}/apps.json"
@@ -72,8 +72,9 @@ async def apps_list(
except Exception as e:
raise APIException('platform:apps',request.url._url,f"Error occurred while get apps:",e)
@r.post("/apps", response_model=AppList, response_model_exclude_none=True)
@r.post("/apps", tags=["App"],
response_model=ApiReturnModel[AppList|None],
response_model_exclude_none=True)
async def apps_update(
request: Request,
app: AppVersion,
@@ -81,24 +82,47 @@ async def apps_update(
db=Depends(get_db),
):
try:
return update_appversion(db, app,user.id)
return ApiReturnModel(data =appService.update_appversion(db, app,user.id))
except Exception as e:
raise APIException('platform:apps',request.url._url,f"Error occurred while get create app :",e)
@r.delete(
"/apps/{domainurl}/{appid}", response_model_exclude_none=True
"/apps/{appid}",tags=["App"],
response_model=ApiReturnModel[AppList|None],
response_model_exclude_none=True
)
async def apps_delete(
async def delete_app(
request: Request,
domainurl:str,
appid: str,
user=Depends(get_current_active_user),
db=Depends(get_db),
):
try:
return delete_apps(db, domainurl,appid)
domain = domainService.get_default_domain(db, user.id) #get_activedomain(db, user.id)
if not domain:
return ApiReturnModel(data = None)
return ApiReturnModel(data =appService.delete_app(db, domain.url,appid))
except Exception as e:
raise APIException('platform:apps',request.url._url,f"Error occurred while delete apps({domainurl}:{appid}):",e)
raise APIException('platform:apps',request.url._url,f"Error occurred while delete app({appid}):",e)
@r.get(
"/appversions/{appid}",tags=["App"],
response_model=ApiReturnPage[AppVersion|None],
response_model_exclude_none=True,
)
async def appversions_list(
request: Request,
appid: str,
user = Depends(get_current_active_user),
db=Depends(get_db),
):
try:
domain = domainService.get_default_domain(db, user.id) #get_activedomain(db, user.id)
if not domain:
return ApiReturnPage(data = None)
return appService.get_appversions(db,domain.url,appid)
except Exception as e:
raise APIException('platform:appversions',request.url._url,f"Error occurred while get app({appid}) version :",e)
@r.get(
"/appsettings/{id}",
@@ -218,7 +242,7 @@ async def flow_list(
db=Depends(get_db),
):
try:
domain = dbdomain.get_default_domain(db, user.id) #get_activedomain(db, user.id)
domain = domainService.get_default_domain(db, user.id) #get_activedomain(db, user.id)
if not domain:
return []
print("domain=>",domain)
@@ -228,7 +252,9 @@ async def flow_list(
raise APIException('platform:flow',request.url._url,f"Error occurred while get flow by appid:",e)
@r.post("/flow", response_model=Flow|None, response_model_exclude_none=True)
@r.post("/flow", tags=["App"],
response_model=ApiReturnModel[Flow|None],
response_model_exclude_none=True)
async def flow_create(
request: Request,
flow: FlowIn,
@@ -236,10 +262,10 @@ async def flow_create(
db=Depends(get_db),
):
try:
domain = dbdomain.get_default_domain(db, user.id) #get_activedomain(db, user.id)
domain = domainService.get_default_domain(db, user.id) #get_activedomain(db, user.id)
if not domain:
return None
return create_flow(db, domain.url, flow)
return ApiReturnModel(data = None)
return ApiReturnModel(data = appService.create_flow(db, domain.url, flow,user.id))
except Exception as e:
raise APIException('platform:flow',request.url._url,f"Error occurred while create flow:",e)
@@ -255,7 +281,7 @@ async def flow_edit(
db=Depends(get_db),
):
try:
domain = dbdomain.get_default_domain(db, user.id) #get_activedomain(db, user.id)
domain = domainService.get_default_domain(db, user.id) #get_activedomain(db, user.id)
if not domain:
return None
return edit_flow(db,domain.url, flow,user.id)
@@ -288,9 +314,9 @@ async def domain_list(
):
try:
if user.is_superuser:
domains = dbdomain.get_domains(db)
domains = domainService.get_domains(db)
else:
domains = dbdomain.get_domains_by_owner(db,user.id)
domains = domainService.get_domains_by_owner(db,user.id)
return domains
except Exception as e:
raise APIException('platform:domains',request.url._url,f"Error occurred while get domains:",e)
@@ -307,7 +333,7 @@ async def domain_detail(
db=Depends(get_db),
):
try:
return ApiReturnModel(data = dbdomain.get(db,domain_id))
return ApiReturnModel(data = domainService.get(db,domain_id))
except Exception as e:
raise APIException('platform:domain',request.url._url,f"Error occurred while get domain detail:",e)
@@ -321,7 +347,7 @@ async def domain_create(
db=Depends(get_db),
):
try:
return ApiReturnModel(data = dbdomain.create_domain(db, domain,user.id))
return ApiReturnModel(data = domainService.create_domain(db, domain,user.id))
except Exception as e:
raise APIException('platform:domain',request.url._url,f"Error occurred while create domain:",e)
@@ -338,7 +364,7 @@ async def domain_edit(
db=Depends(get_db),
):
try:
return ApiReturnModel(data = dbdomain.edit_domain(db, domain,user.id))
return ApiReturnModel(data = domainService.edit_domain(db, domain,user.id))
except Exception as e:
raise APIException('platform:domain',request.url._url,f"Error occurred while edit domain:",e)
@@ -354,7 +380,7 @@ async def domain_delete(
db=Depends(get_db),
):
try:
return ApiReturnModel(data = dbdomain.delete_domain(db,id))
return ApiReturnModel(data = domainService.delete_domain(db,id))
except Exception as e:
raise APIException('platform:domain',request.url._url,f"Error occurred while delete domain:",e)
@@ -389,9 +415,9 @@ async def create_userdomain(
):
try:
if user.is_superuser:
domain = dbdomain.add_userdomain(db,user.id,userid,domainid)
domain = domainService.add_userdomain(db,user.id,userid,domainid)
else:
domain = dbdomain.add_userdomain_by_owner(db,user.id,userid,domainid)
domain = domainService.add_userdomain_by_owner(db,user.id,userid,domainid)
return ApiReturnModel(data = domain)
except Exception as e:
raise APIException('platform:domain',request.url._url,f"Error occurred while add user({userid}) domain:",e)
@@ -409,7 +435,7 @@ async def delete_userdomain(
db=Depends(get_db),
):
try:
return ApiReturnModel(data = dbdomain.delete_userdomain(db,userid,domainid))
return ApiReturnModel(data = domainService.delete_userdomain(db,userid,domainid))
except Exception as e:
raise APIException('platform:delete',request.url._url,f"Error occurred while delete user({userid}) domain:",e)
@@ -425,7 +451,7 @@ async def get_defaultuserdomain(
db=Depends(get_db),
):
try:
return ApiReturnModel(data =dbdomain.get_default_domain(db, user.id))
return ApiReturnModel(data =domainService.get_default_domain(db, user.id))
except Exception as e:
raise APIException('platform:defaultdomain',request.url._url,f"Error occurred while get user({user.id}) defaultdomain:",e)
@@ -441,7 +467,7 @@ async def set_defualtuserdomain(
db=Depends(get_db),
):
try:
domain = dbdomain.set_default_domain(db,user.id,domainid)
domain = domainService.set_default_domain(db,user.id,domainid)
return ApiReturnModel(data= domain)
except Exception as e:
raise APIException('platform:defaultdomain',request.url._url,f"Error occurred while update user({user.id}) defaultdomain:",e)
@@ -459,7 +485,7 @@ async def get_domainshareduser(
db=Depends(get_db),
):
try:
return dbdomain.get_shareddomain_users(db,user.id,domainid)
return domainService.get_shareddomain_users(db,user.id,domainid)
except Exception as e:
raise APIException('platform:sharedomain',request.url._url,f"Error occurred while get user({user.id}) sharedomain:",e)

View File

@@ -15,7 +15,7 @@ from app.db.crud import (
)
from app.db.schemas import UserCreate, UserEdit, User, UserOut,RoleBase,Permission
from app.core.auth import get_current_user,get_current_active_user, get_current_active_superuser
from app.db.cruddb import dbuser
from app.db.cruddb import userService
users_router = r = APIRouter()
@@ -32,9 +32,9 @@ async def users_list(
):
try:
if current_user.is_superuser:
users = dbuser.get_users(db)
users = userService.get_users(db)
else:
users = dbuser.get_users_not_admin(db)
users = userService.get_users_not_admin(db)
return users
except Exception as e:
raise APIException('user:users',request.url._url,f"Error occurred while get user list",e)
@@ -59,7 +59,7 @@ async def user_details(
current_user=Depends(get_current_active_user),
):
try:
user = dbuser.get(db, user_id)
user = userService.get(db, user_id)
if user:
if user.is_superuser and not current_user.is_superuser:
user = None
@@ -81,7 +81,7 @@ async def user_create(
try:
if user.is_superuser and not current_user.is_superuser:
return ApiReturnModel(data = None)
return ApiReturnModel(data =dbuser.create_user(db, user,current_user.id))
return ApiReturnModel(data =userService.create_user(db, user,current_user.id))
except Exception as e:
raise APIException('user:users',request.url._url,f"Error occurred while create user({user.email}):",e)
@@ -101,7 +101,7 @@ async def user_edit(
try:
if user.is_superuser and not current_user.is_superuser:
return ApiReturnModel(data = None)
return ApiReturnModel(data = dbuser.edit_user(db,user_id,user,current_user.id))
return ApiReturnModel(data = userService.edit_user(db,user_id,user,current_user.id))
except Exception as e:
raise APIException('user:users',request.url._url,f"Error occurred while edit user({user_id}):",e)
@@ -117,10 +117,10 @@ async def user_delete(
current_user=Depends(get_current_active_user),
):
try:
user = dbuser.get(db,user_id)
user = userService.get(db,user_id)
if user.is_superuser and not current_user.is_superuser:
return ApiReturnModel(data = None)
return ApiReturnModel(data = dbuser.delete_user(db, user_id))
return ApiReturnModel(data = userService.delete_user(db, user_id))
except Exception as e:
raise APIException('user:users',request.url._url,f"Error occurred while delete user({user_id}):",e)
@@ -135,7 +135,7 @@ async def assign_role(
db=Depends(get_db)
):
try:
return ApiReturnModel(data = dbuser.assign_userrole(db,user_id,roles))
return ApiReturnModel(data = userService.assign_userrole(db,user_id,roles))
except Exception as e:
raise APIException('user:userrole',request.url._url,f"Error occurred while assign user({user_id}) roles({roles}):",e)
@@ -152,10 +152,10 @@ async def roles_list(
):
try:
if current_user.is_superuser:
roles = dbuser.get_roles(db)
roles = userService.get_roles(db)
else:
if len(current_user.roles)>0:
roles = dbuser.get_roles_by_level(db,current_user.roles[0].level)
roles = userService.get_roles_by_level(db,current_user.roles[0].level)
else:
roles = []
return ApiReturnModel(data = roles)
@@ -175,10 +175,10 @@ async def permssions_list(
):
try:
if current_user.is_superuser:
permissions = dbuser.get_permissions(db)
permissions = userService.get_permissions(db)
else:
if len(current_user.roles)>0:
permissions = dbuser.get_user_permissions(db,current_user.id)
permissions = userService.get_user_permissions(db,current_user.id)
else:
permissions = []
return ApiReturnModel(data = permissions)

View File

@@ -6,7 +6,7 @@ from jwt import PyJWTError
from app.db import models, schemas, session
from app.db.crud import get_user_by_email, create_user,get_user
from app.core import security
from app.db.cruddb import dbuser
from app.db.cruddb import userService
async def get_current_user(security_scopes: SecurityScopes,
db=Depends(session.get_db), token: str = Depends(security.oauth2_scheme)
@@ -35,7 +35,7 @@ async def get_current_user(security_scopes: SecurityScopes,
token_data = schemas.TokenData(id = id, permissions=permissions)
except PyJWTError:
raise credentials_exception
user = dbuser.get_user(db, token_data.id)
user = userService.get_user(db, token_data.id)
if user is None:
raise credentials_exception
return user

View File

@@ -1,2 +1,3 @@
from app.db.cruddb.dbuser import dbuser
from app.db.cruddb.dbdomain import dbdomain
from app.db.cruddb.dbuser import userService
from app.db.cruddb.dbdomain import domainService
from app.db.cruddb.dbapp import appService

View File

@@ -0,0 +1,129 @@
from datetime import datetime
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import select,and_
import typing as t
from app.db.cruddb.crudbase import crudbase
from fastapi_pagination.ext.sqlalchemy import paginate
from app.core.common import ApiReturnPage
from app.db import models, schemas
from app.core.security import chacha20Decrypt, get_password_hash
class dbflow(crudbase):
def __init__(self):
super().__init__(model=models.Flow)
def get_domain_apps(self):
return None
def get_domain_app_by_appid(self,db: Session,domainurl:str,appid:str):
return db.execute(select(models.Flow).filter(and_(models.Flow.domainurl == domainurl,models.Flow.appid == appid))).scalars().all()
def create_flow(self,db: Session, domainurl: str, flow: schemas.FlowIn,userid:int):
db_flow = models.Flow(
flowid=flow.flowid,
appid=flow.appid,
eventid=flow.eventid,
domainurl=domainurl,
name=flow.name,
content=flow.content,
createuserid = userid,
updateuserid = userid
)
db.add(db_flow)
db_app = db.execute(select(models.App).filter(and_(models.App.domainurl == domainurl,models.App.appid == flow.appid))).scalars().first()
if not db_app:
db_app = models.App(
domainurl = domainurl,
appid=flow.appid,
appname=flow.appname,
version = 0,
createuserid= userid,
updateuserid = userid
)
db.add(db_app)
db.commit()
db.refresh(db_flow)
return db_flow
dbflow = dbflow()
class dbappversion(crudbase):
def __init__(self):
super().__init__(model=models.AppVersion)
def get_appversions(self,domainurl:str,appid:str):
return super().get_by_conditions({"domainurl":domainurl,"appid":appid})
dbappversion = dbappversion()
class dbapp(crudbase):
def __init__(self):
super().__init__(model=models.App)
def get_app(self,db: Session,domainurl:str,appid:str):
return db.execute(super().get_by_conditions({"domainurl":domainurl,"appid":appid})).scalars().first()
def get_apps(self,db: Session,domainurl:str):
return db.execute(super().get_by_conditions({"domainurl":domainurl})).scalars().all()
def update_appversion(self,db: Session, appedit: schemas.AppVersion,userid:int):
db_app = db.execute(select(models.App).filter(and_(models.App.domainurl == appedit.domainurl,models.App.appid == appedit.appid))).scalars().first()
if not db_app:
return db_app
db_app.version = db_app.version + 1
appversion = models.AppVersion(
domainurl = appedit.domainurl,
appid=appedit.appid,
appname=db_app.appname,
version = db_app.version,
versionname = appedit.versionname,
comment = appedit.comment,
updateuserid = userid,
createuserid = userid
)
db.add(appversion)
db.add(db_app)
flows = dbflow.get_domain_app_by_appid(db,appedit.domainurl,appedit.appid) #select(models.Flow).filter(and_(models.Flow.domainurl == appedit.domainurl,models.App.appid == appedit.appid))
for flow in flows:
db_flowhistory = models.FlowHistory(
flowid = flow.flowid,
appid = flow.appid,
eventid = flow.eventid,
domainurl = flow.domainurl,
name = flow.name,
content = flow.content,
version = db_app.version,
updateuserid = userid,
createuserid = userid
)
db.add(db_flowhistory)
db.commit()
db.refresh(db_app)
return db_app
def delete_app(self,db: Session, domainurl: str,appid: str ):
db_app =self.get_app(db,domainurl,appid)
if db_app:
db.delete(db_app)
db_flows = dbflow.get_domain_app_by_appid(db,domainurl,appid)
for flow in db_flows:
db.delete(flow)
db.commit()
return db_app
return None
def get_appversions(self,db: Session, domainurl:str,appid:str):
return paginate(db,dbappversion.get_appversions(domainurl,appid))
def create_flow(self,db: Session, domainurl: str, flow: schemas.FlowIn,userid:int):
return dbflow.create_flow(db,domainurl,flow,userid)
appService = dbapp()

View File

@@ -145,4 +145,4 @@ class dbdomain(crudbase):
users = select(models.User).join(models.UserDomain,models.UserDomain.userid == models.User.id).filter(models.UserDomain.domainid ==domainid)
return paginate(db,users)
dbdomain = dbdomain()
domainService = dbdomain()

View File

@@ -89,4 +89,4 @@ class dbuser(crudbase):
permissions += role.permissions
return list(set(permissions))
dbuser = dbuser()
userService = dbuser()

View File

@@ -28,6 +28,7 @@ def test_db():
yield test_session
test_session.close()
transaction.rollback()
#transaction.commit()
connection.close()
@pytest.fixture(scope="session")
@@ -122,8 +123,8 @@ def test_domain(test_db,login_user_id):
domain = models.Domain(
tenantid = "1",
name = "テスト環境",
url = "https://alicorn.cybozu.com",
kintoneuser = "maxz",
url = "https://mfu07rkgnb7c.cybozu.com",
kintoneuser = "MXZ",
kintonepwd = security.chacha20Encrypt("maxz1205"),
is_active = True,
createuserid =login_user_id,
@@ -135,6 +136,9 @@ def test_domain(test_db,login_user_id):
test_db.refresh(domain)
return domain
@pytest.fixture(scope="session")
def test_app_id():
return "132"
# @pytest.fixture

View File

@@ -11,6 +11,7 @@ def test_users_list_for_admin(test_client,login_admin):
response = test_client.get("/api/v1/users", headers={"Authorization": "Bearer " + login_admin})
assert response.status_code == 200
data = response.json()
assert "data" in data
assert len(data["data"]) == 3
def test_user_create(test_client,login_user):

View File

@@ -0,0 +1,64 @@
import json
def test_create_flow(test_client,test_domain,test_app_id,login_user):
test_flow={
"flowid": "73e82bee-76a2-4347-a069-e21bf5e21111",
"appid": test_app_id,
"appname": "test_app",
"eventid": "a",
"name": "保存をクリックしたとき",
"content": ""
}
response = test_client.post("/api/flow", json=test_flow,headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
data = response.json()
assert "data" in data
assert data["data"] is not None
assert data["data"]["domainurl"] == test_domain.url
assert data["data"]["flowid"] == test_flow["flowid"]
assert data["data"]["appid"] == test_flow["appid"]
assert data["data"]["eventid"] == test_flow["eventid"]
assert data["data"]["content"] == test_flow["content"]
def test_apps_update(test_client,test_domain,test_app_id,login_user):
app_version ={
"domainurl": test_domain.url,
"appname": "test_app",
"versionname": "testversion",
"comment": "test",
"appid": test_app_id
}
response = test_client.post("/api/apps", json=app_version,headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
data = response.json()
assert "data" in data
assert data["data"] is not None
assert data["data"]["domainurl"] == test_domain.url
assert data["data"]["appname"] == app_version["appname"]
#assert data["data"]["version"] == app_version["versionname"]
assert data["data"]["appid"] == app_version["appid"]
def test_apps_list(test_client,login_user):
response = test_client.get("/api/apps", headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
data = response.json()
assert "data" in data
assert data["data"] is not None
assert len(data["data"]) == 1
def test_appversions_list(test_client,test_domain,test_app_id,login_user):
response = test_client.get("/api/appversions/" + test_app_id , headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
data = response.json()
assert "data" in data
assert data["data"] is not None
assert len(data["data"]) == 1
def test_delete_app(test_client,test_app_id,login_user):
response = test_client.delete("/api/apps/"+ test_app_id, headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
data = response.json()
assert "data" in data
assert data["data"] is not None