"""Database access helpers for apps, app versions, flows and flow history."""
from datetime import datetime
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import select,and_
import typing as t
from app.db.cruddb.crudbase import crudbase
from fastapi_pagination.ext.sqlalchemy import paginate
from app.core.common import ApiReturnPage
from app.db import models, schemas
from app.core.security import chacha20Decrypt, get_password_hash


class dbflowhistory(crudbase):
    """Queries against ``models.FlowHistory``."""

    def __init__(self):
        super().__init__(model=models.FlowHistory)

    def get_flows_by_appid_version(self, db: Session, domainurl: str, appid: str, version: int):
        """Return all flow-history rows matching domain, app id and version.

        NOTE(review): relies on ``crudbase.get_by_conditions`` returning an
        executable statement; that helper is not visible here -- confirm.
        """
        stmt = super().get_by_conditions(
            {"domainurl": domainurl, "appid": appid, "version": version}
        )
        return db.execute(stmt).scalars().all()


# The module-level singleton deliberately reuses the class name; callers use
# the instance, so the class itself is no longer reachable by this name.
dbflowhistory = dbflowhistory()


class dbflow(crudbase):
    """Queries and creation logic for ``models.Flow``."""

    def __init__(self):
        super().__init__(model=models.Flow)

    def get_domain_apps(self):
        """Placeholder; always returns ``None``."""
        return None

    def get_flow_by_flowid(self, db: Session, flowid: str):
        """Return the first flow with the given ``flowid``, or ``None``."""
        stmt = super().get_by_conditions({"flowid": flowid})
        return db.execute(stmt).scalars().first()

    def get_flows_by_appid(self, db: Session, domainurl: str, appid: str):
        """Return all flows belonging to ``appid`` within ``domainurl``."""
        stmt = select(models.Flow).filter(
            and_(models.Flow.domainurl == domainurl, models.Flow.appid == appid)
        )
        return db.execute(stmt).scalars().all()

    def create_flow(self, db: Session, domainurl: str, flow: schemas.FlowIn, userid: int):
        """Insert a flow and, if its app does not exist yet, the app as well.

        A missing app is created with ``version = 0``. Both inserts are
        committed together and the refreshed flow row is returned.
        """
        db_flow = models.Flow(
            flowid=flow.flowid,
            appid=flow.appid,
            eventid=flow.eventid,
            domainurl=domainurl,
            name=flow.name,
            content=flow.content,
            createuserid=userid,
            updateuserid=userid,
        )
        db.add(db_flow)

        app_stmt = select(models.App).where(
            and_(models.App.domainurl == domainurl, models.App.appid == flow.appid)
        )
        db_app = db.execute(app_stmt).scalars().first()
        if not db_app:
            db_app = models.App(
                domainurl=domainurl,
                appid=flow.appid,
                appname=flow.appname,
                version=0,
                createuserid=userid,
                updateuserid=userid,
            )
            db.add(db_app)

        db.commit()
        db.refresh(db_flow)
        return db_flow


dbflow = dbflow()


class dbappversion(crudbase):
    """Queries against ``models.AppVersion``."""

    def __init__(self):
        super().__init__(model=models.AppVersion)

    def get_appversions(self, domainurl: str, appid: str):
        """Return the (unexecuted) result of ``get_by_conditions`` for an app.

        The value is handed to ``paginate`` by ``dbapp.get_appversions``.
        """
        return super().get_by_conditions({"domainurl": domainurl, "appid": appid})

    def get_app_by_version(self, db: Session, domainurl: str, appid: str, version: int):
        """Return the first app-version row for the given version, or ``None``."""
        stmt = super().get_by_conditions(
            {"domainurl": domainurl, "appid": appid, "version": version}
        )
        return db.execute(stmt).scalars().first()

    def get_app_latestversion(self, db: Session, domainurl: str, appid: str):
        """Return the ``version`` of the first matching row, or ``0`` if none.

        NOTE(review): the extra ``"version", "desc"`` arguments presumably make
        ``get_by_conditions`` order by version descending -- confirm in crudbase.
        """
        stmt = super().get_by_conditions(
            {"domainurl": domainurl, "appid": appid}, "version", "desc"
        )
        appversion = db.execute(stmt).scalars().first()
        if appversion:
            return appversion.version
        return 0


dbappversion = dbappversion()


class dbapp(crudbase):
    """App-level operations: versioning, deletion and flow editing."""

    def __init__(self):
        super().__init__(model=models.App)

    def get_app(self, db: Session, domainurl: str, appid: str):
        """Return the first app matching domain and app id, or ``None``."""
        stmt = super().get_by_conditions({"domainurl": domainurl, "appid": appid})
        return db.execute(stmt).scalars().first()

    def get_apps(self, db: Session, domainurl: str):
        """Return all apps of a domain."""
        stmt = super().get_by_conditions({"domainurl": domainurl})
        return db.execute(stmt).scalars().all()

    def update_appversion(self, db: Session, domainurl, newversion: schemas.VersionUpdate, userid: int):
        """Publish a new version of an app and snapshot its current flows.

        The new version number is the latest recorded version plus one. An
        ``AppVersion`` row is added, and every current flow of the app is
        copied into ``FlowHistory`` under that version number.

        Returns the refreshed app row, or ``None`` when the app is not found.
        """
        db_app = self.get_app(db, domainurl, newversion.appid)
        if not db_app:
            return None

        db_app.version = dbappversion.get_app_latestversion(db, domainurl, newversion.appid) + 1
        # Plain scalar assignments: the previous trailing commas stored
        # one-element tuples in these columns.
        db_app.updateuserid = userid
        db_app.versionname = newversion.versionname

        appversion = models.AppVersion(
            domainurl=db_app.domainurl,
            appid=db_app.appid,
            appname=db_app.appname,
            version=db_app.version,
            versionname=newversion.versionname,
            comment=newversion.comment,
            updateuserid=userid,
            createuserid=userid,
        )
        db.add(appversion)
        db.add(db_app)

        # Each history row is added exactly once; nothing is touched when the
        # app has no flows.
        for flow in dbflow.get_flows_by_appid(db, domainurl, newversion.appid):
            db.add(
                models.FlowHistory(
                    flowid=flow.flowid,
                    appid=flow.appid,
                    eventid=flow.eventid,
                    domainurl=flow.domainurl,
                    name=flow.name,
                    content=flow.content,
                    version=db_app.version,
                    updateuserid=userid,
                    createuserid=userid,
                )
            )

        db.commit()
        db.refresh(db_app)
        return db_app

    def change_appversion(self, db: Session, domainurl: str, appid: str, version: int, userid: int):
        """Switch an app to an existing version and restore that version's flows.

        Current flows are deleted and replaced by copies of the flow-history
        rows stored for ``version``.

        Returns the refreshed app row, or ``None`` when either the app or the
        requested version does not exist.
        """
        db_app = self.get_app(db, domainurl, appid)
        if not db_app:
            return None

        db_appversion = dbappversion.get_app_by_version(db, domainurl, appid, version)
        if not db_appversion:
            return None

        db_app.version = version
        db_app.versionname = db_appversion.versionname
        db_app.updateuserid = userid
        db.add(db_app)

        for flow in dbflow.get_flows_by_appid(db, domainurl, appid):
            db.delete(flow)
        # Flush the deletes before re-inserting flows that reuse the same
        # flowid values.
        db.flush()

        flowhistorys = dbflowhistory.get_flows_by_appid_version(db, domainurl, appid, version)
        for flow in flowhistorys:
            db.add(
                models.Flow(
                    flowid=flow.flowid,
                    appid=flow.appid,
                    eventid=flow.eventid,
                    domainurl=flow.domainurl,
                    name=flow.name,
                    content=flow.content,
                    updateuserid=userid,
                    createuserid=userid,
                )
            )

        db.commit()
        db.refresh(db_app)
        return db_app

    def delete_app(self, db: Session, domainurl: str, appid: str):
        """Delete an app together with its current flows.

        Returns the deleted app object, or ``None`` when the app is not found.
        """
        db_app = self.get_app(db, domainurl, appid)
        if not db_app:
            return None

        for flow in dbflow.get_flows_by_appid(db, domainurl, appid):
            db.delete(flow)
        db.delete(db_app)
        db.commit()
        return db_app

    def get_appversions(self, db: Session, domainurl: str, appid: str):
        """Return a paginated listing of an app's versions."""
        return paginate(db, dbappversion.get_appversions(domainurl, appid))

    def get_flow(self, db: Session, domainurl: str, appid: str, userid: int):
        """Return all flows of an app; ``userid`` is accepted but not used."""
        return dbflow.get_flows_by_appid(db, domainurl, appid)

    def create_flow(self, db: Session, domainurl: str, flow: schemas.FlowIn, userid: int):
        """Delegate to ``dbflow.create_flow``."""
        return dbflow.create_flow(db, domainurl, flow, userid)

    def edit_flow(self, db: Session, domainurl: str, flow: schemas.FlowIn, userid: int):
        """Update the flow identified by ``flow.flowid``, creating it if absent.

        Returns the refreshed (or newly created) flow row.
        """
        db_flow = dbflow.get_flow_by_flowid(db, flow.flowid)
        if not db_flow:
            return dbflow.create_flow(db, domainurl, flow, userid)

        db_flow.appid = flow.appid
        db_flow.eventid = flow.eventid
        db_flow.domainurl = domainurl
        db_flow.name = flow.name
        db_flow.content = flow.content
        db_flow.updateuserid = userid
        db.add(db_flow)
        db.commit()
        db.refresh(db_flow)
        return db_flow

    def delete_flow(self, db: Session, flowid: str):
        """Delete a flow by ``flowid`` via the inherited ``delete`` helper.

        Returns whatever ``dbflow.delete`` returns, or ``None`` when no flow
        with that id exists.
        """
        db_flow = dbflow.get_flow_by_flowid(db, flowid)
        if db_flow:
            return dbflow.delete(db, db_flow.id)
        return None


appService = dbapp()