209 lines
7.5 KiB
Python
209 lines
7.5 KiB
Python
from datetime import datetime
|
|
from fastapi import HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import select,and_
|
|
import typing as t
|
|
|
|
from app.db.cruddb.crudbase import crudbase
|
|
from fastapi_pagination.ext.sqlalchemy import paginate
|
|
from app.core.common import ApiReturnPage
|
|
|
|
from app.db import models, schemas
|
|
from app.core.security import chacha20Decrypt, get_password_hash
|
|
|
|
|
|
class dbflowhistory(crudbase):
    """Query helper bound to ``models.FlowHistory``.

    NOTE(review): ``get_by_conditions`` is inherited from ``crudbase``, which
    is defined elsewhere; it presumably turns a ``{column: value}`` mapping
    into a SELECT statement -- confirm against ``crudbase``.
    """

    def __init__(self):
        super().__init__(model=models.FlowHistory)

    def get_flows_by_appid_version(self, db: Session, domainurl: str, appid: str, version: int):
        """Return every flow-history row of one app at one version.

        Args:
            db: Session used to execute the statement.
            domainurl: Value matched against the ``domainurl`` condition.
            appid: Value matched against the ``appid`` condition.
            version: Value matched against the ``version`` condition.

        Returns:
            A list with all matching rows (empty when nothing matches).
        """
        conditions = {"domainurl": domainurl, "appid": appid, "version": version}
        statement = super().get_by_conditions(conditions)
        rows = db.execute(statement).scalars()
        return rows.all()


# The module exposes a ready-made instance under the class's own name; after
# this line the name ``dbflowhistory`` refers to the instance, not the class.
dbflowhistory = dbflowhistory()
|
|
|
|
class dbflow(crudbase):
    """Query and creation helper bound to ``models.Flow``.

    NOTE(review): ``get_by_conditions`` is inherited from ``crudbase``, which
    is defined elsewhere; it presumably turns a ``{column: value}`` mapping
    into a SELECT statement -- confirm against ``crudbase``.
    """

    def __init__(self):
        super().__init__(model=models.Flow)

    def get_domain_apps(self):
        """Placeholder: currently always returns ``None``."""
        return None

    def get_flow_by_flowid(self, db: Session, flowid: str):
        """Return the first flow whose ``flowid`` matches, or ``None``."""
        statement = super().get_by_conditions({"flowid": flowid})
        return db.execute(statement).scalars().first()

    def get_flows_by_appid(self, db: Session, domainurl: str, appid: str):
        """Return all flows belonging to ``appid`` within ``domainurl``."""
        same_domain = models.Flow.domainurl == domainurl
        same_app = models.Flow.appid == appid
        statement = select(models.Flow).filter(and_(same_domain, same_app))
        return db.execute(statement).scalars().all()

    def create_flow(self, db: Session, domainurl: str, flow: schemas.FlowIn, userid: int):
        """Persist a new flow and make sure its owning app row exists.

        Args:
            db: Session used for the inserts and the final commit.
            domainurl: Domain the flow (and, if created, the app) is stored under.
            flow: Incoming payload; ``flowid``, ``appid``, ``eventid``, ``name``,
                ``content`` and ``appname`` are read from it.
            userid: Recorded as both creator and last updater.

        Returns:
            The newly created ``models.Flow`` instance, refreshed after commit.
        """
        new_flow = models.Flow(
            flowid=flow.flowid,
            appid=flow.appid,
            eventid=flow.eventid,
            domainurl=domainurl,
            name=flow.name,
            content=flow.content,
            createuserid=userid,
            updateuserid=userid,
        )
        db.add(new_flow)

        # Look for the app this flow belongs to; when it is missing, a fresh
        # app row is created with version 0 in the same transaction.
        app_lookup = select(models.App).where(
            and_(models.App.domainurl == domainurl, models.App.appid == flow.appid)
        )
        owning_app = db.execute(app_lookup).scalars().first()
        if not owning_app:
            owning_app = models.App(
                domainurl=domainurl,
                appid=flow.appid,
                appname=flow.appname,
                version=0,
                createuserid=userid,
                updateuserid=userid,
            )
            db.add(owning_app)

        db.commit()
        db.refresh(new_flow)
        return new_flow


# The module exposes a ready-made instance under the class's own name; after
# this line the name ``dbflow`` refers to the instance, not the class.
dbflow = dbflow()
|
|
|
|
|
|
class dbappversion(crudbase):
    """Query helper bound to ``models.AppVersion``.

    NOTE(review): ``get_by_conditions`` is inherited from ``crudbase``, which
    is defined elsewhere; it presumably turns a ``{column: value}`` mapping
    (plus an optional order column and direction) into a SELECT statement --
    confirm against ``crudbase``.
    """

    def __init__(self):
        super().__init__(model=models.AppVersion)

    def get_appversions(self, domainurl: str, appid: str):
        """Return the unexecuted query for all versions of one app.

        Unlike the other getters, this one takes no session and does not run
        anything; it hands back whatever ``get_by_conditions`` produces.
        """
        conditions = {"domainurl": domainurl, "appid": appid}
        return super().get_by_conditions(conditions)

    def get_app_by_version(self, db: Session, domainurl: str, appid: str, version: int):
        """Return the first version record matching all three keys, or ``None``."""
        conditions = {"domainurl": domainurl, "appid": appid, "version": version}
        statement = super().get_by_conditions(conditions)
        return db.execute(statement).scalars().first()

    def get_app_latestversion(self, db: Session, domainurl: str, appid: str):
        """Return the ``version`` of the first record ordered by "version"/"desc".

        Returns:
            The ``version`` attribute of the first row found, or ``0`` when the
            query yields no (truthy) row.
        """
        statement = super().get_by_conditions(
            {"domainurl": domainurl, "appid": appid}, "version", "desc"
        )
        newest = db.execute(statement).scalars().first()
        return newest.version if newest else 0


# The module exposes a ready-made instance under the class's own name; after
# this line the name ``dbappversion`` refers to the instance, not the class.
dbappversion = dbappversion()
|
|
|
|
class dbapp(crudbase):
    """Service object for ``models.App`` and the app-level version/flow operations.

    It relies on the module-level helpers ``dbflow``, ``dbflowhistory`` and
    ``dbappversion`` for access to the related tables.

    NOTE(review): ``get_by_conditions`` and ``delete`` are inherited from
    ``crudbase``, which is defined elsewhere; their exact behavior should be
    confirmed there.
    """

    def __init__(self):
        super().__init__(model=models.App)

    def get_app(self, db: Session, domainurl: str, appid: str):
        """Return the first app matching ``domainurl`` and ``appid``, or ``None``."""
        statement = super().get_by_conditions({"domainurl": domainurl, "appid": appid})
        return db.execute(statement).scalars().first()

    def get_apps(self, db: Session, domainurl: str):
        """Return all apps stored under ``domainurl``."""
        statement = super().get_by_conditions({"domainurl": domainurl})
        return db.execute(statement).scalars().all()

    def update_appversion(self, db: Session, domainurl: str, newversion: schemas.VersionUpdate, userid: int):
        """Publish a new version of an app and snapshot its current flows.

        The app's version becomes "latest recorded version + 1", an
        ``AppVersion`` row is written for it, and every current flow of the
        app is copied into ``FlowHistory`` tagged with that version. All
        changes are committed together.

        Args:
            db: Session used for all reads, inserts and the commit.
            domainurl: Domain the app lives under.
            newversion: Payload; ``appid``, ``versionname`` and ``comment``
                are read from it.
            userid: Recorded as creator/updater on the touched rows.

        Returns:
            The refreshed app, or ``None`` when no such app exists.
        """
        db_app = self.get_app(db, domainurl, newversion.appid)
        if not db_app:
            return None

        db_app.version = dbappversion.get_app_latestversion(db, domainurl, newversion.appid) + 1
        # NOTE: no trailing comma here -- one would store the tuple
        # ``(userid,)`` instead of the integer id.
        db_app.updateuserid = userid

        appversion = models.AppVersion(
            domainurl=db_app.domainurl,
            appid=db_app.appid,
            appname=db_app.appname,
            version=db_app.version,
            versionname=newversion.versionname,
            comment=newversion.comment,
            updateuserid=userid,
            createuserid=userid,
        )
        db.add(appversion)
        db.add(db_app)

        # Snapshot each live flow exactly once under the new version number.
        # Nothing outside the loop touches the snapshot object, so an app
        # without flows is handled without error.
        for flow in dbflow.get_flows_by_appid(db, domainurl, newversion.appid):
            db.add(
                models.FlowHistory(
                    flowid=flow.flowid,
                    appid=flow.appid,
                    eventid=flow.eventid,
                    domainurl=flow.domainurl,
                    name=flow.name,
                    content=flow.content,
                    version=db_app.version,
                    updateuserid=userid,
                    createuserid=userid,
                )
            )

        db.commit()
        db.refresh(db_app)
        return db_app

    def change_appversion(self, db: Session, domainurl: str, appid: str, version: int, userid: int):
        """Switch an app to a previously recorded version.

        The app's current flows are deleted and replaced by copies of the
        ``FlowHistory`` rows stored for ``version``.

        Args:
            db: Session used for all reads, writes and the commit.
            domainurl: Domain the app lives under.
            appid: Identifier of the app to switch.
            version: Version number to switch to.
            userid: Recorded as creator/updater on the touched rows.

        Returns:
            The refreshed app, or ``None`` when either the app or the
            requested version record does not exist.
        """
        db_app = self.get_app(db, domainurl, appid)
        if not db_app:
            return None

        db_appversion = dbappversion.get_app_by_version(db, domainurl, appid, version)
        if not db_appversion:
            return None

        db_app.version = version
        db_app.versionname = db_appversion.versionname
        db_app.updateuserid = userid
        db.add(db_app)

        for flow in dbflow.get_flows_by_appid(db, domainurl, appid):
            db.delete(flow)
        # Push the deletes to the database before inserting the replacements;
        # presumably this keeps re-created rows with the same flowid from
        # colliding with rows still pending deletion -- confirm against the
        # table constraints.
        db.flush()

        for flow in dbflowhistory.get_flows_by_appid_version(db, domainurl, appid, version):
            db.add(
                models.Flow(
                    flowid=flow.flowid,
                    appid=flow.appid,
                    eventid=flow.eventid,
                    domainurl=flow.domainurl,
                    name=flow.name,
                    content=flow.content,
                    updateuserid=userid,
                    createuserid=userid,
                )
            )

        db.commit()
        db.refresh(db_app)
        return db_app

    def delete_app(self, db: Session, domainurl: str, appid: str):
        """Delete an app together with all of its current flows.

        Returns:
            The deleted app object, or ``None`` when no such app exists.
        """
        db_app = self.get_app(db, domainurl, appid)
        if not db_app:
            return None

        for flow in dbflow.get_flows_by_appid(db, domainurl, appid):
            db.delete(flow)
        db.delete(db_app)
        db.commit()
        return db_app

    def get_appversions(self, db: Session, domainurl: str, appid: str):
        """Return the app's version records via fastapi-pagination's ``paginate``."""
        return paginate(db, dbappversion.get_appversions(domainurl, appid))

    def get_flow(self, db: Session, domainurl: str, appid: str, userid: int):
        """Return all current flows of an app (``userid`` is accepted but unused)."""
        return dbflow.get_flows_by_appid(db, domainurl, appid)

    def create_flow(self, db: Session, domainurl: str, flow: schemas.FlowIn, userid: int):
        """Create a flow by delegating to ``dbflow.create_flow``."""
        return dbflow.create_flow(db, domainurl, flow, userid)

    def edit_flow(self, db: Session, domainurl: str, flow: schemas.FlowIn, userid: int):
        """Update the flow identified by ``flow.flowid``, creating it if absent.

        NOTE(review): the lookup is by ``flowid`` only and the stored
        ``domainurl`` is overwritten with the caller's value; verify that
        flow ids cannot be shared across domains.

        Returns:
            The refreshed (or newly created) flow.
        """
        db_flow = dbflow.get_flow_by_flowid(db, flow.flowid)
        if not db_flow:
            return dbflow.create_flow(db, domainurl, flow, userid)

        db_flow.appid = flow.appid
        db_flow.eventid = flow.eventid
        db_flow.domainurl = domainurl
        db_flow.name = flow.name
        db_flow.content = flow.content
        db_flow.updateuserid = userid
        db.add(db_flow)
        db.commit()
        db.refresh(db_flow)
        return db_flow

    def delete_flow(self, db: Session, flowid: str):
        """Delete the flow with the given ``flowid``.

        Returns:
            Whatever ``dbflow.delete`` returns for the flow's primary key, or
            ``None`` when no flow has that ``flowid``.
        """
        db_flow = dbflow.get_flow_by_flowid(db, flowid)
        if db_flow:
            return dbflow.delete(db, db_flow.id)
        return None


# Shared service instance used by the rest of the application.
appService = dbapp()