"""Database access helpers (CRUD) for users, roles, apps, flows and domains.

Every function takes an open SQLAlchemy ``Session`` as its first argument.
Functions that write commit the session themselves before returning.
"""
from datetime import datetime
import typing as t

from fastapi import HTTPException, status
from sqlalchemy import and_
from sqlalchemy.orm import Session

from app.core.security import chacha20Decrypt, get_password_hash
from . import models, schemas


# ---------------------------------------------------------------------------
# Users and roles
# ---------------------------------------------------------------------------

def get_user(db: Session, user_id: int):
    """Return the user whose primary key is ``user_id``.

    Raises:
        HTTPException: 404 ("User not found") when no row matches.
    """
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user


def get_user_by_email(db: Session, email: str) -> schemas.UserBase:
    """Return the first user with the given e-mail, or ``None`` if absent."""
    return db.query(models.User).filter(models.User.email == email).first()


def get_allusers(
    db: Session
) -> t.List[schemas.UserOut]:
    """Return every user row."""
    return db.query(models.User).all()


def get_users(
    db: Session, super: bool
) -> t.List[schemas.UserOut]:
    """Return a query selecting the users that are not superusers.

    NOTE(review): ``super`` is accepted but never read, and the value
    returned is a lazy ``Query`` (no ``.all()``), not the list the
    annotation promises. Left as-is because callers may rely on the query
    object; confirm the intent before changing it.
    """
    # ``== False`` is required here: it builds a SQL expression.
    return db.query(models.User).filter(models.User.is_superuser == False)  # noqa: E712


def create_user(db: Session, user: schemas.UserCreate):
    """Insert a new user, storing a hash of ``user.password``.

    Returns:
        The refreshed ``models.User`` instance.
    """
    hashed_password = get_password_hash(user.password)
    db_user = models.User(
        first_name=user.first_name,
        last_name=user.last_name,
        email=user.email,
        is_active=user.is_active,
        is_superuser=user.is_superuser,
        hashed_password=hashed_password,
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def delete_user(db: Session, user_id: int):
    """Delete the user with ``user_id`` and return the deleted instance.

    Raises:
        HTTPException: 404 when the user does not exist (raised by
            ``get_user``).
    """
    user = get_user(db, user_id)
    db.delete(user)
    db.commit()
    return user


def edit_user(
    db: Session, user_id: int, user: schemas.UserEdit
) -> schemas.User:
    """Apply the explicitly-set fields of ``user`` to an existing user.

    A supplied ``password`` is replaced by ``hashed_password`` before the
    attributes are copied onto the row.

    Raises:
        HTTPException: 404 when the user does not exist (raised by
            ``get_user``).
    """
    db_user = get_user(db, user_id)

    update_data = user.dict(exclude_unset=True)
    if "password" in update_data:
        # Never store the plain password; swap it for its hash.
        update_data["hashed_password"] = get_password_hash(user.password)
        del update_data["password"]

    for key, value in update_data.items():
        setattr(db_user, key, value)

    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user


def get_roles(
    db: Session
) -> t.List[schemas.Role]:
    """Return every role row."""
    return db.query(models.Role).all()


def assign_userrole(db: Session, user_id: int, roles: t.List[int]):
    """Replace the roles of a user with the roles identified by ``roles``.

    Role ids that do not resolve to a row are ignored.

    Returns:
        The refreshed user, or ``None`` when ``user_id`` does not exist.
    """
    db_user = db.query(models.User).get(user_id)
    if db_user:
        # Iterate over a copy: removing from the collection while iterating
        # it directly skips every other element.
        for role in list(db_user.roles):
            db_user.roles.remove(role)
        for roleid in roles:
            role = db.query(models.Role).get(roleid)
            if role:
                db_user.roles.append(role)
        db.commit()
        db.refresh(db_user)
    return db_user


# ---------------------------------------------------------------------------
# Apps and app versions
# ---------------------------------------------------------------------------

def get_apps(
    db: Session, domainurl: str
) -> t.List[schemas.AppList]:
    """Return all apps registered under ``domainurl``."""
    return db.query(models.App).filter(models.App.domainurl == domainurl).all()


def update_appversion(db: Session, appedit: schemas.AppVersion, userid: int):
    """Bump an app's version and snapshot its flows into the history table.

    Increments ``App.version``, records an ``AppVersion`` row and writes one
    ``FlowHistory`` row per flow of the app, all stamped with the new
    version number.

    Raises:
        HTTPException: 404 when the app is not found.
    """
    db_app = (
        db.query(models.App)
        .filter(and_(models.App.domainurl == appedit.domainurl,
                     models.App.appid == appedit.appid))
        .first()
    )
    if not db_app:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="App not found")

    db_app.version = db_app.version + 1
    appversion = models.AppVersion(
        domainurl=appedit.domainurl,
        appid=appedit.appid,
        appname=db_app.appname,
        version=db_app.version,
        versionname=appedit.versionname,
        comment=appedit.comment,
        updateuserid=userid,
        createuserid=userid,
    )
    db.add(appversion)
    db.add(db_app)

    # Filter on Flow.appid (not App.appid) so that only this app's flows are
    # selected and no unrelated table is dragged into the query.
    flows = db.query(models.Flow).filter(
        and_(models.Flow.domainurl == appedit.domainurl,
             models.Flow.appid == appedit.appid)
    )
    for flow in flows:
        db_flowhistory = models.FlowHistory(
            flowid=flow.flowid,
            appid=flow.appid,
            eventid=flow.eventid,
            domainurl=flow.domainurl,
            name=flow.name,
            content=flow.content,
            createuser=userid,
            version=db_app.version,
            updateuserid=userid,
            createuserid=userid,
        )
        db.add(db_flowhistory)

    db.commit()
    db.refresh(db_app)
    return db_app


def delete_apps(db: Session, domainurl: str, appid: str):
    """Delete an app together with all flows that belong to it.

    Raises:
        HTTPException: 404 ("App not found") when the app does not exist.
    """
    db_app = (
        db.query(models.App)
        .filter(and_(models.App.domainurl == domainurl,
                     models.App.appid == appid))
        .first()
    )
    if not db_app:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="App not found")
    db.delete(db_app)

    db_flows = db.query(models.Flow).filter(
        and_(models.Flow.domainurl == domainurl, models.Flow.appid == appid)
    )
    for flow in db_flows:
        db.delete(flow)
    db.commit()
    return db_app


# ---------------------------------------------------------------------------
# App settings
# ---------------------------------------------------------------------------

def get_appsetting(db: Session, id: int):
    """Return the app setting with primary key ``id``.

    Raises:
        HTTPException: 404 ("App not found") when no row matches.
    """
    app = db.query(models.AppSetting).get(id)
    if not app:
        raise HTTPException(status_code=404, detail="App not found")
    return app


def create_appsetting(db: Session, app: schemas.AppBase):
    """Insert an app setting built from ``app.appid`` and ``app.setting``."""
    db_app = models.AppSetting(
        appid=app.appid,
        setting=app.setting,
    )
    db.add(db_app)
    db.commit()
    db.refresh(db_app)
    return db_app


def delete_appsetting(db: Session, id: int):
    """Delete the app setting ``id`` and return the deleted instance.

    Raises:
        HTTPException: 404 when the setting does not exist (raised by
            ``get_appsetting``).
    """
    app = get_appsetting(db, id)
    db.delete(app)
    db.commit()
    return app


def edit_appsetting(
    db: Session, id: int, app: schemas.AppBase
) -> schemas.App:
    """Apply the explicitly-set fields of ``app`` to app setting ``id``.

    Raises:
        HTTPException: 404 when the setting does not exist (raised by
            ``get_appsetting``).
    """
    db_app = get_appsetting(db, id)

    update_data = app.dict(exclude_unset=True)
    for key, value in update_data.items():
        setattr(db_app, key, value)

    db.add(db_app)
    db.commit()
    db.refresh(db_app)
    return db_app


# ---------------------------------------------------------------------------
# Kintone master data and actions
# ---------------------------------------------------------------------------

def get_kintones(db: Session, type: int):
    """Return the ``Kintone`` rows whose ``type`` column equals ``type``.

    Raises:
        HTTPException: 404 ("Data not found") when the result is empty.
    """
    kintones = db.query(models.Kintone).filter(models.Kintone.type == type).all()
    if not kintones:
        raise HTTPException(status_code=404, detail="Data not found")
    return kintones


def get_actions(db: Session):
    """Return every action row.

    Raises:
        HTTPException: 404 ("Data not found") when the table is empty.
    """
    actions = db.query(models.Action).all()
    if not actions:
        raise HTTPException(status_code=404, detail="Data not found")
    return actions


# ---------------------------------------------------------------------------
# Flows
# ---------------------------------------------------------------------------

def create_flow(db: Session, domainurl: str, flow: schemas.FlowIn, userid: int):
    """Insert a flow and make sure its owning app row exists.

    When no ``App`` matches ``(domainurl, flow.appid)`` a new one is created
    with ``version = 0``.

    Returns:
        The refreshed ``models.Flow`` instance.
    """
    db_flow = models.Flow(
        flowid=flow.flowid,
        appid=flow.appid,
        eventid=flow.eventid,
        domainurl=domainurl,
        name=flow.name,
        content=flow.content,
        createuserid=userid,
        updateuserid=userid,
    )
    db.add(db_flow)

    db_app = (
        db.query(models.App)
        .filter(and_(models.App.domainurl == domainurl,
                     models.App.appid == flow.appid))
        .first()
    )
    if not db_app:
        db_app = models.App(
            domainurl=domainurl,
            appid=flow.appid,
            appname=flow.appname,
            version=0,
            createuserid=userid,
            updateuserid=userid,
        )
        # Without this the freshly built app was never persisted.
        db.add(db_app)

    db.commit()
    db.refresh(db_flow)
    return db_flow


def delete_flow(db: Session, flowid: str):
    """Delete the flow ``flowid`` and return the deleted instance.

    Raises:
        HTTPException: 404 ("Flow not found") when the flow does not exist.
    """
    flow = get_flow(db, flowid)
    if not flow:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow not found")
    db.delete(flow)
    db.commit()
    return flow


def edit_flow(
    db: Session, domainurl: str, flow: schemas.FlowIn, userid: int
) -> schemas.Flow:
    """Update the flow identified by ``flow.flowid``, creating it if missing."""
    db_flow = get_flow(db, flow.flowid)
    if not db_flow:
        # Create a new flow when none is found.
        return create_flow(db, domainurl, flow, userid)

    db_flow.appid = flow.appid
    db_flow.eventid = flow.eventid
    db_flow.domainurl = domainurl
    db_flow.name = flow.name
    db_flow.content = flow.content
    db_flow.updateuserid = userid

    db.add(db_flow)
    db.commit()
    db.refresh(db_flow)
    return db_flow


def get_flows(db: Session, flowid: str):
    """Return every flow row.

    NOTE(review): ``flowid`` is accepted but not used for filtering;
    confirm whether that is intended.

    Raises:
        HTTPException: 404 ("Data not found") when the table is empty.
    """
    flows = db.query(models.Flow).all()
    if not flows:
        raise HTTPException(status_code=404, detail="Data not found")
    return flows


def get_flow(db: Session, flowid: str):
    """Return the first flow whose ``flowid`` matches, or ``None``."""
    flow = db.query(models.Flow).filter(models.Flow.flowid == flowid).first()
    return flow


def get_flows_by_app(db: Session, domainurl: str, appid: str):
    """Return all flows of the app ``(domainurl, appid)``.

    Raises:
        Exception: ("Data not found") when the result is empty.
            NOTE(review): unlike the sibling getters this is a plain
            ``Exception`` rather than an ``HTTPException``; kept so existing
            callers observe the same exception type and message.
    """
    flows = (
        db.query(models.Flow)
        .filter(and_(models.Flow.domainurl == domainurl,
                     models.Flow.appid == appid))
        .all()
    )
    if not flows:
        raise Exception("Data not found")
    return flows


# ---------------------------------------------------------------------------
# Domains and user/domain links
# ---------------------------------------------------------------------------

def create_domain(db: Session, domain: schemas.DomainIn, userid: int):
    """Insert a domain after calling ``domain.encrypt_kintonepwd()``.

    Returns:
        The refreshed ``models.Domain`` instance.
    """
    domain.encrypt_kintonepwd()
    db_domain = models.Domain(
        tenantid=domain.tenantid,
        name=domain.name,
        url=domain.url,
        is_active=domain.is_active,
        kintoneuser=domain.kintoneuser,
        kintonepwd=domain.kintonepwd,
        createuserid=userid,
        updateuserid=userid,
        ownerid=domain.ownerid,
    )
    db.add(db_domain)
    db.commit()
    db.refresh(db_domain)
    return db_domain


def delete_domain(db: Session, id: int):
    """Delete the domain ``id`` if it exists; always return ``True``."""
    db_domain = db.query(models.Domain).get(id)
    if db_domain:
        db.delete(db_domain)
        db.commit()
    return True


def edit_domain(
    db: Session, domain: schemas.DomainIn, userid: int
) -> schemas.Domain:
    """Update the domain identified by ``domain.id``.

    An empty ``domain.kintonepwd`` leaves the stored password untouched.
    When the domain switches from active to inactive, every active
    ``UserDomain`` link pointing at it is deactivated as well.

    Raises:
        HTTPException: 404 ("Domain not found") when the domain is missing.
    """
    if domain.kintonepwd != "":
        domain.encrypt_kintonepwd()

    db_domain = db.query(models.Domain).get(domain.id)
    if not db_domain:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")

    db_domain.tenantid = domain.tenantid
    db_domain.name = domain.name
    db_domain.url = domain.url

    # Explicit comparisons kept on purpose: a ``None`` flag must not count
    # as a deactivation.
    if db_domain.is_active == True and domain.is_active == False:  # noqa: E712
        db_userdomains = (
            db.query(models.UserDomain)
            .filter(and_(models.UserDomain.domainid == db_domain.id,
                         models.UserDomain.active == True))  # noqa: E712
            .all()
        )
        for userdomain in db_userdomains:
            userdomain.active = False
            db.add(userdomain)

    db_domain.is_active = domain.is_active
    db_domain.kintoneuser = domain.kintoneuser
    if domain.kintonepwd != "":
        db_domain.kintonepwd = domain.kintonepwd
    db_domain.updateuserid = userid
    db_domain.ownerid = domain.ownerid

    db.add(db_domain)
    db.commit()
    db.refresh(db_domain)
    return db_domain


def add_admindomain(db: Session, userid: int, domainid: int):
    """Link ``userid`` to the active domain ``domainid``.

    Returns:
        The domain, or ``None`` when it does not exist or is inactive (in
        which case nothing is written).
    """
    db_domain = (
        db.query(models.Domain)
        .filter(and_(models.Domain.id == domainid, models.Domain.is_active))
        .first()
    )
    if db_domain:
        user_domain = models.UserDomain(userid=userid, domainid=domainid)
        db.add(user_domain)
        db.commit()
    return db_domain


def add_userdomain(db: Session, ownerid: int, userid: int, domainid: int):
    """Link ``userid`` to the active domain ``domainid`` owned by ``ownerid``.

    Returns:
        The domain, or ``None`` when no active domain with that id and owner
        exists (in which case nothing is written).
    """
    db_domain = (
        db.query(models.Domain)
        .filter(and_(models.Domain.id == domainid,
                     models.Domain.ownerid == ownerid,
                     models.Domain.is_active))
        .first()
    )
    if db_domain:
        user_domain = models.UserDomain(userid=userid, domainid=domainid)
        db.add(user_domain)
        db.commit()
    return db_domain


def add_userdomains(db: Session, userid: int, domainids: list[str]):
    """Bulk-insert one ``UserDomain`` link per id in ``domainids``.

    Returns:
        The list of ``UserDomain`` objects handed to ``bulk_save_objects``.
    """
    dbCommits = [
        models.UserDomain(userid=userid, domainid=domainid)
        for domainid in domainids
    ]
    db.bulk_save_objects(dbCommits)
    db.commit()
    return dbCommits


def delete_userdomain(db: Session, userid: int, domainid: int):
    """Remove the link between ``userid`` and ``domainid`` if present.

    Always returns ``True``.
    """
    db_domain = (
        db.query(models.UserDomain)
        .filter(and_(models.UserDomain.userid == userid,
                     models.UserDomain.domainid == domainid))
        .first()
    )
    if db_domain:
        db.delete(db_domain)
        db.commit()
    return True


def active_userdomain(db: Session, userid: int, domainid: int):
    """Mark ``domainid`` as the only active domain link of ``userid``.

    Every ``UserDomain`` row of the user is updated: the one matching
    ``domainid`` becomes active, all others inactive.

    Returns:
        The domain, or ``None`` when it does not exist or is inactive (in
        which case nothing is written).
    """
    db_domain = (
        db.query(models.Domain)
        .filter(and_(models.Domain.id == domainid, models.Domain.is_active))
        .first()
    )
    if db_domain:
        db_userdomains = (
            db.query(models.UserDomain)
            .filter(models.UserDomain.userid == userid)
            .all()
        )
        for domain in db_userdomains:
            domain.active = domain.domainid == domainid
            db.add(domain)
        db.commit()
    return db_domain


def get_activedomain(db: Session, userid: int):
    """Return the active domain that ``userid`` has an active link to.

    Returns ``None`` when there is no such domain.
    """
    db_domain = (
        db.query(models.Domain)
        .filter(models.Domain.is_active)
        .join(models.UserDomain, models.UserDomain.domainid == models.Domain.id)
        .filter(and_(models.UserDomain.active,
                     models.UserDomain.userid == userid))
        .first()
    )
    return db_domain


def get_domain(db: Session, userid: str):
    """Return every domain linked to ``userid`` through ``UserDomain``.

    NOTE(review): ``userid`` is annotated ``str`` here but ``int`` in the
    sibling functions; confirm which is correct.
    """
    domains = (
        db.query(models.Domain)
        .join(models.UserDomain, models.UserDomain.domainid == models.Domain.id)
        .filter(models.UserDomain.userid == userid)
        .all()
    )
    return domains


def get_alldomains(db: Session):
    """Return every domain row."""
    domains = db.query(models.Domain).all()
    return domains


def get_domains(db: Session, userid: int):
    """Return the domains whose ``ownerid`` equals ``userid``."""
    domains = db.query(models.Domain).filter(models.Domain.ownerid == userid).all()
    return domains


# ---------------------------------------------------------------------------
# Events, categories, logs and formats
# ---------------------------------------------------------------------------

def get_events(db: Session):
    """Return every event row.

    Raises:
        HTTPException: 404 ("Data not found") when the table is empty.
    """
    events = db.query(models.Event).all()
    if not events:
        raise HTTPException(status_code=404, detail="Data not found")
    return events


def get_category(db: Session):
    """Return every category row."""
    categorys = db.query(models.Category).all()
    return categorys


def get_eventactions(db: Session, eventid: str):
    """Return the actions that are NOT listed in ``EventAction`` for an event.

    ``EventAction`` rows for ``eventid`` act as an exclusion list. The
    remaining actions are joined with their category and ordered by
    ``Category.nosort`` then ``Action.nosort``.

    Raises:
        HTTPException: 404 ("Data not found") when nothing remains.
    """
    # Ids of the actions excluded for this event.
    blackactions = (
        db.query(models.EventAction.actionid)
        .filter(models.EventAction.eventid == eventid)
        .subquery()
    )

    eveactions = (
        db.query(
            models.Action.id,
            models.Action.name,
            models.Action.title,
            models.Action.subtitle,
            models.Action.outputpoints,
            models.Action.property,
            models.Action.categoryid,
            models.Action.nosort,
            models.Category.categoryname)
        .join(models.Category, models.Category.id == models.Action.categoryid)
        .filter(models.Action.id.notin_(blackactions))
        .order_by(models.Category.nosort, models.Action.nosort)
        .all()
    )

    if not eveactions:
        raise HTTPException(status_code=404, detail="Data not found")
    return eveactions


def create_log(db: Session, error: schemas.ErrorCreate):
    """Insert an ``ErrorLog`` row from ``error`` and return it refreshed."""
    db_log = models.ErrorLog(
        title=error.title,
        location=error.location,
        content=error.content,
    )
    db.add(db_log)
    db.commit()
    db.refresh(db_log)
    return db_log


def get_kintoneformat(db: Session):
    """Return every ``KintoneFormat`` row ordered by ``id``."""
    formats = db.query(models.KintoneFormat).order_by(models.KintoneFormat.id).all()
    return formats