455 lines
15 KiB
Python
455 lines
15 KiB
Python
from datetime import datetime
|
|
from fastapi import HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import and_
|
|
import typing as t
|
|
|
|
from . import models, schemas
|
|
from app.core.security import chacha20Decrypt, get_password_hash
|
|
|
|
|
|
def get_user(db: Session, user_id: int):
    """Fetch a single user by primary key.

    Args:
        db: Active SQLAlchemy session.
        user_id: Value matched against ``models.User.id``.

    Returns:
        The first matching ``models.User`` row.

    Raises:
        HTTPException: 404 when no user has that id.
    """
    matches = db.query(models.User).filter(models.User.id == user_id)
    found = matches.first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
|
|
def get_user_by_email(db: Session, email: str) -> schemas.UserBase:
    """Look up a user by e-mail address.

    Args:
        db: Active SQLAlchemy session.
        email: Value matched against ``models.User.email``.

    Returns:
        The first matching ``models.User`` row, or ``None`` when there is
        no match (this function does not raise).
    """
    by_email = db.query(models.User).filter(models.User.email == email)
    return by_email.first()
|
|
|
|
|
|
def get_allusers(db: Session) -> t.List[schemas.UserOut]:
    """Return every row of ``models.User``, superusers included."""
    everyone = db.query(models.User)
    return everyone.all()
|
|
|
|
def get_users(
    db: Session, super: bool
) -> t.List[schemas.UserOut]:
    """Return every user that is not a superuser.

    Args:
        db: Active SQLAlchemy session.
        super: Currently unused by this function; kept so existing callers
            keep working. NOTE(review): the name shadows the ``super``
            builtin, and the intended meaning of the flag is not visible
            here - confirm against the callers.

    Returns:
        A list of ``models.User`` rows whose ``is_superuser`` is false.
    """
    # ``== False`` is intentional: it builds a SQL expression, not a
    # Python-side comparison.
    non_admins = db.query(models.User).filter(models.User.is_superuser == False)
    # Materialize the result so the function really returns the list its
    # annotation promises instead of a lazy Query object.
    return non_admins.all()
|
|
|
|
|
|
def create_user(db: Session, user: schemas.UserCreate):
    """Insert a new user, storing only the hash of its password.

    Args:
        db: Active SQLAlchemy session.
        user: Incoming payload; its ``password`` is passed through
            ``get_password_hash`` before being stored.

    Returns:
        The freshly committed and refreshed ``models.User`` row.
    """
    password_digest = get_password_hash(user.password)
    columns = {
        "first_name": user.first_name,
        "last_name": user.last_name,
        "email": user.email,
        "is_active": user.is_active,
        "is_superuser": user.is_superuser,
        "hashed_password": password_digest,
    }
    new_user = models.User(**columns)
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user
|
|
|
|
|
|
def delete_user(db: Session, user_id: int):
    """Delete the user with the given id and commit.

    Args:
        db: Active SQLAlchemy session.
        user_id: Id of the user to remove.

    Returns:
        The deleted ``models.User`` object.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    doomed = get_user(db, user_id)
    if not doomed:
        # Defensive guard; get_user already raises 404 on a miss.
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="User not found")
    db.delete(doomed)
    db.commit()
    return doomed
|
|
|
|
|
|
def edit_user(
    db: Session, user_id: int, user: schemas.UserEdit
) -> schemas.User:
    """Apply a partial update to an existing user.

    Only fields explicitly set on ``user`` are written. A supplied
    ``password`` is replaced by its hash under ``hashed_password``.

    Args:
        db: Active SQLAlchemy session.
        user_id: Id of the user to modify.
        user: Payload holding the fields to change.

    Returns:
        The committed and refreshed ``models.User`` row.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    target = get_user(db, user_id)
    if not target:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="User not found")

    changes = user.dict(exclude_unset=True)
    if "password" in changes:
        # Never persist the plain-text password; swap it for its hash.
        del changes["password"]
        changes["hashed_password"] = get_password_hash(user.password)

    for field in changes:
        setattr(target, field, changes[field])

    db.add(target)
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
def get_roles(db: Session) -> t.List[schemas.Role]:
    """Return every row of ``models.Role``."""
    all_roles = db.query(models.Role)
    return all_roles.all()
|
|
|
|
def assign_userrole(db: Session, user_id: int, roles: t.List[int]):
    """Replace a user's roles with the roles identified by ``roles``.

    Args:
        db: Active SQLAlchemy session.
        user_id: Primary key of the user whose roles are replaced.
        roles: Primary keys of the roles to assign. Ids that do not
            resolve to a ``models.Role`` row are silently skipped.

    Returns:
        The refreshed ``models.User`` row, or ``None`` when the user does
        not exist (nothing is committed in that case).
    """
    db_user = db.query(models.User).get(user_id)
    if db_user:
        # Iterate over a snapshot: removing from the collection while
        # iterating it directly skips every other element and would leave
        # stale roles attached to the user.
        for role in list(db_user.roles):
            db_user.roles.remove(role)
        for roleid in roles:
            role = db.query(models.Role).get(roleid)
            if role:
                db_user.roles.append(role)
        db.commit()
        db.refresh(db_user)
    return db_user
|
|
|
|
def get_apps(
    db: Session,
    domainurl: str,
) -> t.List[schemas.AppList]:
    """Return all ``models.App`` rows whose ``domainurl`` matches."""
    apps_for_domain = db.query(models.App).filter(
        models.App.domainurl == domainurl
    )
    return apps_for_domain.all()
|
|
|
|
def update_appversion(db: Session, appedit: schemas.AppVersion, userid: int):
    """Bump an app's version and snapshot its flows into the history table.

    The app's ``version`` is incremented by one, a ``models.AppVersion``
    row is recorded for the new version, and every flow of the app is
    copied into ``models.FlowHistory`` tagged with that version. All of
    it is committed in a single transaction.

    Args:
        db: Active SQLAlchemy session.
        appedit: Identifies the app (``domainurl`` + ``appid``) and carries
            the ``versionname`` and ``comment`` for the new version.
        userid: Id recorded as creator/updater of the new rows.

    Returns:
        The refreshed ``models.App`` row with the incremented version.

    Raises:
        HTTPException: 404 when no app matches ``domainurl`` / ``appid``.
    """
    db_app = (
        db.query(models.App)
        .filter(
            and_(
                models.App.domainurl == appedit.domainurl,
                models.App.appid == appedit.appid,
            )
        )
        .first()
    )
    if not db_app:
        # The missing entity is the app, not a user.
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="App not found")

    db_app.version = db_app.version + 1
    appversion = models.AppVersion(
        domainurl=appedit.domainurl,
        appid=appedit.appid,
        appname=db_app.appname,
        version=db_app.version,
        versionname=appedit.versionname,
        comment=appedit.comment,
        updateuserid=userid,
        createuserid=userid,
    )
    db.add(appversion)
    db.add(db_app)

    # Filter on Flow.appid: filtering on App.appid would pull the App table
    # into the query without a join condition and select flows of other
    # apps in the same domain.
    flows = (
        db.query(models.Flow)
        .filter(
            and_(
                models.Flow.domainurl == appedit.domainurl,
                models.Flow.appid == appedit.appid,
            )
        )
        .all()
    )
    for flow in flows:
        db_flowhistory = models.FlowHistory(
            flowid=flow.flowid,
            appid=flow.appid,
            eventid=flow.eventid,
            domainurl=flow.domainurl,
            name=flow.name,
            content=flow.content,
            createuser=userid,
            version=db_app.version,
            updateuserid=userid,
            createuserid=userid,
        )
        db.add(db_flowhistory)

    db.commit()
    db.refresh(db_app)
    return db_app
|
|
|
|
def delete_apps(db: Session, domainurl: str, appid: str):
    """Delete an app together with all of its flows.

    Args:
        db: Active SQLAlchemy session.
        domainurl: Domain URL the app belongs to.
        appid: Identifier of the app inside that domain.

    Returns:
        The deleted ``models.App`` object.

    Raises:
        HTTPException: 404 when no such app exists.
    """
    same_app = and_(models.App.domainurl == domainurl, models.App.appid == appid)
    target_app = db.query(models.App).filter(same_app).first()
    if not target_app:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="App not found")
    db.delete(target_app)

    owned_flows = db.query(models.Flow).filter(
        and_(models.Flow.domainurl == domainurl, models.Flow.appid == appid)
    )
    for owned in owned_flows:
        db.delete(owned)
    db.commit()
    return target_app
|
|
|
|
def get_appsetting(db: Session, id: int):
    """Fetch an app setting by primary key.

    Args:
        db: Active SQLAlchemy session.
        id: Primary key of the ``models.AppSetting`` row.

    Returns:
        The matching ``models.AppSetting`` row.

    Raises:
        HTTPException: 404 when the row does not exist.
    """
    setting_row = db.query(models.AppSetting).get(id)
    if setting_row:
        return setting_row
    raise HTTPException(status_code=404, detail="App not found")
|
|
|
|
def create_appsetting(db: Session, app: schemas.AppBase):
    """Insert a new ``models.AppSetting`` row from ``app``.

    Args:
        db: Active SQLAlchemy session.
        app: Payload supplying ``appid`` and ``setting``.

    Returns:
        The committed and refreshed ``models.AppSetting`` row.
    """
    new_setting = models.AppSetting(appid=app.appid, setting=app.setting)
    db.add(new_setting)
    db.commit()
    db.refresh(new_setting)
    return new_setting
|
|
|
|
def delete_appsetting(db: Session, id: int):
    """Delete an app setting by primary key and commit.

    Args:
        db: Active SQLAlchemy session.
        id: Primary key of the ``models.AppSetting`` row.

    Returns:
        The deleted ``models.AppSetting`` object.

    Raises:
        HTTPException: 404 when the row does not exist.
    """
    doomed = get_appsetting(db, id)
    if not doomed:
        # Defensive guard; get_appsetting already raises 404 on a miss.
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="App not found")
    db.delete(doomed)
    db.commit()
    return doomed
|
|
|
|
|
|
def edit_appsetting(
    db: Session, id: int, app: schemas.AppBase
) -> schemas.App:
    """Apply a partial update to an existing app setting.

    Only fields explicitly set on ``app`` are written.

    Args:
        db: Active SQLAlchemy session.
        id: Primary key of the ``models.AppSetting`` row.
        app: Payload holding the fields to change.

    Returns:
        The committed and refreshed ``models.AppSetting`` row.

    Raises:
        HTTPException: 404 when the row does not exist.
    """
    target = get_appsetting(db, id)
    if not target:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="App not found")

    changes = app.dict(exclude_unset=True)
    for field in changes:
        setattr(target, field, changes[field])

    db.add(target)
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
def get_kintones(db: Session, type: int):
    """Return all ``models.Kintone`` rows of the given type.

    Args:
        db: Active SQLAlchemy session.
        type: Value matched against ``models.Kintone.type``.

    Raises:
        HTTPException: 404 when no row matches.
    """
    rows = db.query(models.Kintone).filter(models.Kintone.type == type).all()
    if rows:
        return rows
    raise HTTPException(status_code=404, detail="Data not found")
|
|
|
|
def get_actions(db: Session):
    """Return every ``models.Action`` row.

    Raises:
        HTTPException: 404 when the table is empty.
    """
    rows = db.query(models.Action).all()
    if rows:
        return rows
    raise HTTPException(status_code=404, detail="Data not found")
|
|
|
|
|
|
def create_flow(db: Session, domainurl: str, flow: schemas.FlowIn, userid: int):
    """Insert a new flow and make sure its owning app row exists.

    Args:
        db: Active SQLAlchemy session.
        domainurl: Domain URL the flow (and its app) belongs to.
        flow: Payload describing the flow; ``flow.appname`` is used only
            when the app row has to be created.
        userid: Id recorded as creator/updater of the new rows.

    Returns:
        The committed and refreshed ``models.Flow`` row.
    """
    db_flow = models.Flow(
        flowid=flow.flowid,
        appid=flow.appid,
        eventid=flow.eventid,
        domainurl=domainurl,
        name=flow.name,
        content=flow.content,
        createuserid=userid,
        updateuserid=userid,
    )
    db.add(db_flow)

    db_app = (
        db.query(models.App)
        .filter(
            and_(
                models.App.domainurl == domainurl,
                models.App.appid == flow.appid,
            )
        )
        .first()
    )
    if not db_app:
        db_app = models.App(
            domainurl=domainurl,
            appid=flow.appid,
            appname=flow.appname,
            version=0,
            createuserid=userid,
            updateuserid=userid,
        )
        # Without this the new App object is discarded at commit time and
        # the flow ends up with no app row.
        db.add(db_app)

    db.commit()
    db.refresh(db_flow)
    return db_flow
|
|
|
|
def delete_flow(db: Session, flowid: str):
    """Delete the flow with the given ``flowid`` and commit.

    Args:
        db: Active SQLAlchemy session.
        flowid: Identifier of the flow to remove.

    Returns:
        The deleted ``models.Flow`` object.

    Raises:
        HTTPException: 404 when no such flow exists.
    """
    doomed = get_flow(db, flowid)
    if not doomed:
        # get_flow returns None on a miss, so the 404 is raised here.
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow not found")
    db.delete(doomed)
    db.commit()
    return doomed
|
|
|
|
|
|
def edit_flow(
    db: Session, domainurl: str, flow: schemas.FlowIn, userid: int
) -> schemas.Flow:
    """Update an existing flow, or create it when it does not exist yet.

    Args:
        db: Active SQLAlchemy session.
        domainurl: Domain URL to store on the flow.
        flow: Payload with the new flow values; ``flow.flowid`` selects
            the row to update.
        userid: Id recorded as the updater (or creator, on insert).

    Returns:
        The committed and refreshed ``models.Flow`` row.
    """
    existing = get_flow(db, flow.flowid)
    if not existing:
        # Not found: fall back to creating a new flow.
        return create_flow(db, domainurl, flow, userid)

    new_values = {
        "appid": flow.appid,
        "eventid": flow.eventid,
        "domainurl": domainurl,
        "name": flow.name,
        "content": flow.content,
        "updateuserid": userid,
    }
    for column, value in new_values.items():
        setattr(existing, column, value)

    db.add(existing)
    db.commit()
    db.refresh(existing)
    return existing
|
|
|
|
|
|
def get_flows(db: Session, flowid: str):
    """Return every ``models.Flow`` row.

    Args:
        db: Active SQLAlchemy session.
        flowid: Not used by the query; all flows are returned regardless.

    Raises:
        HTTPException: 404 when the table is empty.
    """
    rows = db.query(models.Flow).all()
    if rows:
        return rows
    raise HTTPException(status_code=404, detail="Data not found")
|
|
|
|
def get_flow(db: Session, flowid: str):
    """Fetch a single flow by ``flowid``.

    Returns:
        The first matching ``models.Flow`` row, or ``None`` on a miss.
        This function does not raise; ``edit_flow`` and ``delete_flow``
        both check the result for emptiness themselves.
    """
    by_flowid = db.query(models.Flow).filter(models.Flow.flowid == flowid)
    return by_flowid.first()
|
|
|
|
def get_flows_by_app(db: Session, domainurl: str, appid: str):
    """Return all flows that belong to one app of one domain.

    Args:
        db: Active SQLAlchemy session.
        domainurl: Domain URL the app belongs to.
        appid: Identifier of the app inside that domain.

    Returns:
        A non-empty list of ``models.Flow`` rows.

    Raises:
        HTTPException: 404 when the app has no flows. ``HTTPException``
            derives from ``Exception``, so callers that catch ``Exception``
            keep working.
    """
    flows = (
        db.query(models.Flow)
        .filter(and_(models.Flow.domainurl == domainurl, models.Flow.appid == appid))
        .all()
    )
    if not flows:
        # Same 404 contract as the other "list" helpers in this module.
        raise HTTPException(status_code=404, detail="Data not found")
    return flows
|
|
|
|
def create_domain(db: Session, domain: schemas.DomainIn, userid: int):
    """Insert a new domain.

    ``domain.encrypt_kintonepwd()`` is called before the row is built, so
    the value read from ``domain.kintonepwd`` is whatever that method
    leaves there.

    Args:
        db: Active SQLAlchemy session.
        domain: Payload describing the domain.
        userid: Id recorded as creator/updater of the row.

    Returns:
        The committed and refreshed ``models.Domain`` row.
    """
    domain.encrypt_kintonepwd()
    columns = {
        "tenantid": domain.tenantid,
        "name": domain.name,
        "url": domain.url,
        "is_active": domain.is_active,
        "kintoneuser": domain.kintoneuser,
        "kintonepwd": domain.kintonepwd,
        "createuserid": userid,
        "updateuserid": userid,
        "ownerid": domain.ownerid,
    }
    new_domain = models.Domain(**columns)
    db.add(new_domain)
    # NOTE: the creating user is not linked to the domain here; a call to
    # add_userdomain used to sit at this point but is disabled.
    db.commit()
    db.refresh(new_domain)
    return new_domain
|
|
|
|
def delete_domain(db: Session, id: int):
    """Delete a domain by primary key if it exists.

    A missing domain is not treated as an error.

    Args:
        db: Active SQLAlchemy session.
        id: Primary key of the ``models.Domain`` row.

    Returns:
        Always ``True``, whether or not a row was deleted.
    """
    doomed = db.query(models.Domain).get(id)
    if not doomed:
        return True
    db.delete(doomed)
    db.commit()
    return True
|
|
|
|
|
|
def edit_domain(
    db: Session, domain: schemas.DomainIn, userid: int
) -> schemas.Domain:
    """Update an existing domain from ``domain``.

    An empty-string ``kintonepwd`` leaves the stored password untouched.
    When the domain switches from active to inactive, every active
    ``models.UserDomain`` link to it is deactivated as well.

    Args:
        db: Active SQLAlchemy session.
        domain: Payload with the new values; ``domain.id`` selects the row.
        userid: Id recorded as the updater.

    Returns:
        The committed and refreshed ``models.Domain`` row.

    Raises:
        HTTPException: 404 when the domain does not exist.
    """
    password_supplied = domain.kintonepwd != ""
    if password_supplied:
        domain.encrypt_kintonepwd()

    target = db.query(models.Domain).get(domain.id)
    if not target:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")

    target.tenantid = domain.tenantid
    target.name = domain.name
    target.url = domain.url

    being_deactivated = target.is_active == True and domain.is_active == False
    if being_deactivated:
        # Users must not keep an inactive domain selected as their active one.
        active_links = (
            db.query(models.UserDomain)
            .filter(
                and_(
                    models.UserDomain.domainid == target.id,
                    models.UserDomain.active == True,
                )
            )
            .all()
        )
        for link in active_links:
            link.active = False
            db.add(link)

    target.is_active = domain.is_active
    target.kintoneuser = domain.kintoneuser
    if domain.kintonepwd != "":
        target.kintonepwd = domain.kintonepwd
    target.updateuserid = userid
    target.ownerid = domain.ownerid

    db.add(target)
    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
def add_userdomain(db: Session, userid: int, domainid: int):
    """Link a user to a domain, provided the domain exists.

    Args:
        db: Active SQLAlchemy session.
        userid: Id stored on the new ``models.UserDomain`` row.
        domainid: Primary key of the domain to link.

    Returns:
        The ``models.Domain`` row, or ``None`` when it does not exist (in
        which case nothing is inserted).
    """
    domain_row = db.query(models.Domain).get(domainid)
    if not domain_row:
        return domain_row
    link = models.UserDomain(userid=userid, domainid=domainid)
    db.add(link)
    db.commit()
    return domain_row
|
|
|
|
def add_userdomains(db: Session, userid: int, domainids: list[str]):
    """Link a user to several domains in one bulk insert.

    The domain ids are not checked for existence.

    Args:
        db: Active SQLAlchemy session.
        userid: Id stored on every new ``models.UserDomain`` row.
        domainids: Domain ids to link the user to.

    Returns:
        The list of ``models.UserDomain`` objects that were saved.
    """
    links = [
        models.UserDomain(userid=userid, domainid=domainid)
        for domainid in domainids
    ]
    db.bulk_save_objects(links)
    db.commit()
    return links
|
|
|
|
def delete_userdomain(db: Session, userid: int, domainid: int):
    """Remove the link between a user and a domain if it exists.

    A missing link is not treated as an error.

    Args:
        db: Active SQLAlchemy session.
        userid: Id of the user side of the link.
        domainid: Id of the domain side of the link.

    Returns:
        Always ``True``, whether or not a row was deleted.
    """
    same_link = and_(
        models.UserDomain.userid == userid,
        models.UserDomain.domainid == domainid,
    )
    link = db.query(models.UserDomain).filter(same_link).first()
    if not link:
        return True
    db.delete(link)
    db.commit()
    return True
|
|
|
|
def active_userdomain(db: Session, userid: int, domainid: int):
    """Mark one of a user's domains as active and all others inactive.

    Args:
        db: Active SQLAlchemy session.
        userid: User whose ``models.UserDomain`` links are updated.
        domainid: Domain id whose link becomes the active one.

    Returns:
        All of the user's ``models.UserDomain`` rows after the update.

    Raises:
        HTTPException: 404 when the user has no domain links at all.
    """
    links = (
        db.query(models.UserDomain)
        .filter(models.UserDomain.userid == userid)
        .all()
    )
    if not links:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")
    for link in links:
        # Exactly the requested domain ends up active.
        link.active = True if link.domainid == domainid else False
        db.add(link)
    db.commit()
    return links
|
|
|
|
def get_activedomain(db: Session, userid: int):
    """Return the domain the user currently has selected as active.

    A domain qualifies when ``Domain.is_active`` is set and the user's
    ``UserDomain`` link to it has ``active`` set.

    Args:
        db: Active SQLAlchemy session.
        userid: User whose active domain is looked up.

    Returns:
        The first matching ``models.Domain`` row, or ``None`` when there
        is none (this function does not raise).
    """
    enabled_domains = db.query(models.Domain).filter(models.Domain.is_active)
    with_links = enabled_domains.join(
        models.UserDomain, models.UserDomain.domainid == models.Domain.id
    )
    selected = with_links.filter(
        and_(models.UserDomain.active, models.UserDomain.userid == userid)
    )
    return selected.first()
|
|
|
|
def get_domain(db: Session, userid: str):
    """Return every domain the user is linked to through ``UserDomain``.

    The rows are returned as stored; ``kintonepwd`` is not decrypted here.

    Args:
        db: Active SQLAlchemy session.
        userid: Value matched against ``models.UserDomain.userid``.

    Returns:
        A possibly empty list of ``models.Domain`` rows.
    """
    linked = db.query(models.Domain).join(
        models.UserDomain, models.UserDomain.domainid == models.Domain.id
    )
    return linked.filter(models.UserDomain.userid == userid).all()
|
|
|
|
def get_alldomains(db: Session):
    """Return every ``models.Domain`` row."""
    return db.query(models.Domain).all()
|
|
|
|
def get_domains(db: Session, userid: int):
    """Return the domains whose ``ownerid`` equals ``userid``."""
    owned = db.query(models.Domain).filter(models.Domain.ownerid == userid)
    return owned.all()
|
|
|
|
def get_events(db: Session):
    """Return every ``models.Event`` row.

    Raises:
        HTTPException: 404 when the table is empty.
    """
    rows = db.query(models.Event).all()
    if rows:
        return rows
    raise HTTPException(status_code=404, detail="Data not found")
|
|
|
|
def get_category(db: Session):
    """Return every ``models.Category`` row (possibly an empty list)."""
    return db.query(models.Category).all()
|
|
|
|
def get_eventactions(db: Session, eventid: str):
    """List the actions that are not excluded for an event.

    ``models.EventAction`` is used as an exclusion list: any action id
    recorded there for ``eventid`` is filtered out. The remaining actions
    are joined with their category and ordered by ``Category.nosort``,
    then ``Action.nosort``.

    Args:
        db: Active SQLAlchemy session.
        eventid: Value matched against ``models.EventAction.eventid``.

    Returns:
        A non-empty list of result rows carrying the selected ``Action``
        columns plus ``Category.categoryname``.

    Raises:
        HTTPException: 404 when no action remains.
    """
    excluded_ids = (
        db.query(models.EventAction.actionid)
        .filter(models.EventAction.eventid == eventid)
        .subquery()
    )
    selected_columns = (
        models.Action.id,
        models.Action.name,
        models.Action.title,
        models.Action.subtitle,
        models.Action.outputpoints,
        models.Action.property,
        models.Action.categoryid,
        models.Action.nosort,
        models.Category.categoryname,
    )
    rows = (
        db.query(*selected_columns)
        .join(models.Category, models.Category.id == models.Action.categoryid)
        .filter(models.Action.id.notin_(excluded_ids))
        .order_by(models.Category.nosort, models.Action.nosort)
        .all()
    )

    if rows:
        return rows
    raise HTTPException(status_code=404, detail="Data not found")
|
|
|
|
|
|
def create_log(db: Session, error: schemas.ErrorCreate):
    """Persist an error report as a ``models.ErrorLog`` row.

    Args:
        db: Active SQLAlchemy session.
        error: Payload supplying ``title``, ``location`` and ``content``.

    Returns:
        The committed and refreshed ``models.ErrorLog`` row.
    """
    entry = models.ErrorLog(
        title=error.title,
        location=error.location,
        content=error.content,
    )
    db.add(entry)
    db.commit()
    db.refresh(entry)
    return entry
|
|
|
|
def get_kintoneformat(db: Session):
    """Return every ``models.KintoneFormat`` row ordered by ``id``."""
    ordered = db.query(models.KintoneFormat).order_by(models.KintoneFormat.id)
    return ordered.all()