Files
KintoneAppBuilder/backend/app/db/crud.py

367 lines
11 KiB
Python

from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import and_
import typing as t
from . import models, schemas
from app.core.security import chacha20Decrypt, get_password_hash
def get_user(db: Session, user_id: int):
    """Fetch a single user by primary key.

    Args:
        db: Active database session.
        user_id: Value matched against ``models.User.id``.

    Returns:
        The first matching ``models.User`` row.

    Raises:
        HTTPException: 404 when no user has the given id.
    """
    matching = db.query(models.User).filter(models.User.id == user_id)
    found = matching.first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")
def get_user_by_email(db: Session, email: str) -> schemas.UserBase:
    """Return the first user whose email equals *email*.

    Unlike ``get_user`` this does not raise; the result of ``first()`` is
    returned as-is, which is ``None`` when no row matches.
    """
    by_email = db.query(models.User).filter(models.User.email == email)
    return by_email.first()
def get_users(
    db: Session, skip: int = 0, limit: int = 100
) -> t.List[schemas.UserOut]:
    """Return one page of users.

    Args:
        db: Active database session.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).
    """
    page = db.query(models.User).offset(skip).limit(limit)
    return page.all()
def create_user(db: Session, user: schemas.UserCreate):
    """Insert a new user and return the persisted row.

    The plain-text ``user.password`` is never stored; it is passed through
    ``get_password_hash`` and only the hash is written.

    Args:
        db: Active database session.
        user: Creation payload.

    Returns:
        The newly created ``models.User``, refreshed from the database.
    """
    fields = {
        "first_name": user.first_name,
        "last_name": user.last_name,
        "email": user.email,
        "is_active": user.is_active,
        "is_superuser": user.is_superuser,
        "hashed_password": get_password_hash(user.password),
    }
    new_user = models.User(**fields)
    db.add(new_user)
    db.commit()
    # Reload so database-generated columns are populated on the instance.
    db.refresh(new_user)
    return new_user
def delete_user(db: Session, user_id: int):
    """Delete the user with the given id and return the deleted row.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    target = get_user(db, user_id)
    # get_user already raises 404 on a miss; this guard is kept as a
    # defensive second check.
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )
    db.delete(target)
    db.commit()
    return target
def edit_user(
    db: Session, user_id: int, user: schemas.UserEdit
) -> schemas.User:
    """Apply a partial update to an existing user.

    Only fields explicitly set on *user* are written. A supplied
    ``password`` is replaced by its hash under ``hashed_password``.

    Args:
        db: Active database session.
        user_id: Id of the user to modify.
        user: Edit payload.

    Returns:
        The updated ``models.User``, refreshed from the database.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    existing = get_user(db, user_id)
    # get_user already raises 404 on a miss; kept as a defensive check.
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
        )

    changes = user.dict(exclude_unset=True)
    if "password" in changes:
        # Swap the plain-text password for its hash before persisting.
        changes["hashed_password"] = get_password_hash(user.password)
        changes.pop("password")

    for field_name, new_value in changes.items():
        setattr(existing, field_name, new_value)

    db.add(existing)
    db.commit()
    db.refresh(existing)
    return existing
def get_apps(
    db: Session
) -> t.List[schemas.AppList]:
    """Return every row of ``models.App`` (an empty list when there are none)."""
    all_apps = db.query(models.App)
    return all_apps.all()
def update_appversion(db: Session, appedit: schemas.AppVersion, userid: int):
    """Bump (or create) an app's version and snapshot its flows.

    If an ``App`` row exists for ``(appedit.domainurl, appedit.appid)`` its
    version is incremented; otherwise a new row is created at version 1.
    Every ``Flow`` belonging to that same domain/app is then copied into
    ``FlowHistory`` tagged with the resulting version.

    Args:
        db: Active database session.
        appedit: Identifies the app (domainurl, appid, appname).
        userid: Recorded as ``updateuser`` on a new app and as
            ``createuser`` on each history row.

    Returns:
        The created or updated ``models.App``, refreshed from the database.
    """
    db_app = (
        db.query(models.App)
        .filter(
            and_(
                models.App.domainurl == appedit.domainurl,
                models.App.appid == appedit.appid,
            )
        )
        .first()
    )
    if db_app:
        db_app.version = db_app.version + 1
        appver = db_app.version
    else:
        appver = 1
        db_app = models.App(
            domainurl=appedit.domainurl,
            appid=appedit.appid,
            appname=appedit.appname,
            version=1,
            updateuser=userid,
        )
    db.add(db_app)

    # Filter on Flow.appid: the previous code compared App.appid here, which
    # pulled the App table into the query and did not restrict flows by
    # their own appid.
    flows = db.query(models.Flow).filter(
        and_(
            models.Flow.domainurl == appedit.domainurl,
            models.Flow.appid == appedit.appid,
        )
    )
    for flow in flows:
        db.add(
            models.FlowHistory(
                flowid=flow.flowid,
                appid=flow.appid,
                eventid=flow.eventid,
                domainurl=flow.domainurl,
                name=flow.name,
                content=flow.content,
                createuser=userid,
                version=appver,
            )
        )

    # App change and history rows are committed together.
    db.commit()
    db.refresh(db_app)
    return db_app
def get_appsetting(db: Session, id: int):
    """Look up an ``AppSetting`` row by primary key.

    Raises:
        HTTPException: 404 when no row has the given id.
    """
    setting_row = db.query(models.AppSetting).get(id)
    if setting_row:
        return setting_row
    raise HTTPException(status_code=404, detail="App not found")
def create_appsetting(db: Session, app: schemas.AppBase):
    """Insert an ``AppSetting`` built from ``app.appid`` and ``app.setting``.

    Returns:
        The new row, refreshed from the database.
    """
    new_setting = models.AppSetting(appid=app.appid, setting=app.setting)
    db.add(new_setting)
    db.commit()
    db.refresh(new_setting)
    return new_setting
def delete_appsetting(db: Session, id: int):
    """Delete the ``AppSetting`` with the given id and return the deleted row.

    Raises:
        HTTPException: 404 when the row does not exist.
    """
    target = get_appsetting(db, id)
    # get_appsetting already raises 404 on a miss; kept as a defensive check.
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="App not found"
        )
    db.delete(target)
    db.commit()
    return target
def edit_appsetting(
    db: Session, id: int, app: schemas.AppBase
) -> schemas.App:
    """Apply a partial update to an existing ``AppSetting``.

    Only fields explicitly set on *app* are copied onto the row.

    Returns:
        The updated row, refreshed from the database.

    Raises:
        HTTPException: 404 when the row does not exist.
    """
    existing = get_appsetting(db, id)
    # get_appsetting already raises 404 on a miss; kept as a defensive check.
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="App not found"
        )

    changes = app.dict(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(existing, field_name, new_value)

    db.add(existing)
    db.commit()
    db.refresh(existing)
    return existing
def get_kintones(db: Session, type: int):
    """Return all ``Kintone`` rows whose ``type`` column equals *type*.

    Raises:
        HTTPException: 404 when no row matches.
    """
    of_type = db.query(models.Kintone).filter(models.Kintone.type == type)
    rows = of_type.all()
    if rows:
        return rows
    raise HTTPException(status_code=404, detail="Data not found")
def get_actions(db: Session):
    """Return every ``Action`` row.

    Raises:
        HTTPException: 404 when the table is empty.
    """
    rows = db.query(models.Action).all()
    if rows:
        return rows
    raise HTTPException(status_code=404, detail="Data not found")
def create_flow(db: Session, domainurl: str, flow: schemas.FlowBase):
    """Insert a new ``Flow`` for the given domain.

    Args:
        db: Active database session.
        domainurl: Stored on the row; taken from this argument rather than
            from *flow*.
        flow: Supplies flowid, appid, eventid, name and content.

    Returns:
        The new row, refreshed from the database.
    """
    columns = {
        "flowid": flow.flowid,
        "appid": flow.appid,
        "eventid": flow.eventid,
        "domainurl": domainurl,
        "name": flow.name,
        "content": flow.content,
    }
    new_flow = models.Flow(**columns)
    db.add(new_flow)
    db.commit()
    db.refresh(new_flow)
    return new_flow
def delete_flow(db: Session, flowid: str):
    """Delete the flow identified by *flowid* and return the deleted row.

    Raises:
        HTTPException: 404 when the flow does not exist.
    """
    target = get_flow(db, flowid)
    # get_flow already raises 404 on a miss; kept as a defensive check.
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )
    db.delete(target)
    db.commit()
    return target
def edit_flow(
    db: Session, flow: schemas.FlowBase
) -> schemas.Flow:
    """Apply a partial update to the flow identified by ``flow.flowid``.

    Only fields explicitly set on *flow* are copied onto the row.

    Returns:
        The updated row, refreshed from the database.

    Raises:
        HTTPException: 404 when the flow does not exist.
    """
    existing = get_flow(db, flow.flowid)
    # get_flow already raises 404 on a miss; kept as a defensive check.
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
        )

    changes = flow.dict(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(existing, field_name, new_value)

    db.add(existing)
    db.commit()
    db.refresh(existing)
    return existing
def get_flows(db: Session, flowid: str):
    """Return every ``Flow`` row.

    NOTE(review): *flowid* is accepted but not used by the query; all flows
    are returned regardless of its value.

    Raises:
        HTTPException: 404 when the table is empty.
    """
    rows = db.query(models.Flow).all()
    if rows:
        return rows
    raise HTTPException(status_code=404, detail="Data not found")
def get_flow(db: Session, flowid: str):
    """Return the first ``Flow`` whose ``flowid`` equals *flowid*.

    Raises:
        HTTPException: 404 when no flow matches.
    """
    matching = db.query(models.Flow).filter(models.Flow.flowid == flowid)
    found = matching.first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Data not found")
def get_flows_by_app(db: Session, domainurl: str, appid: str):
    """Return all flows belonging to one app on one domain.

    Args:
        db: Active database session.
        domainurl: Matched against ``Flow.domainurl``.
        appid: Matched against ``Flow.appid``.

    Returns:
        A non-empty list of ``models.Flow`` rows.

    Raises:
        HTTPException: 404 when no flow matches. This used to be a bare
            ``Exception``; ``HTTPException`` is still an ``Exception``
            subclass, so existing ``except Exception`` handlers keep
            working, and the error now matches the other getters here.
    """
    flows = (
        db.query(models.Flow)
        .filter(
            and_(
                models.Flow.domainurl == domainurl,
                models.Flow.appid == appid,
            )
        )
        .all()
    )
    if not flows:
        raise HTTPException(status_code=404, detail="Data not found")
    return flows
def create_domain(db: Session, domain: schemas.DomainBase):
    """Insert a new ``Domain`` row from *domain*.

    ``domain.encrypt_kintonepwd()`` is invoked first, so the value read from
    ``domain.kintonepwd`` afterwards is whatever that method leaves there
    (presumably the encrypted password - confirm against the schema).

    Returns:
        The new row, refreshed from the database.
    """
    domain.encrypt_kintonepwd()
    columns = {
        "tenantid": domain.tenantid,
        "name": domain.name,
        "url": domain.url,
        "kintoneuser": domain.kintoneuser,
        "kintonepwd": domain.kintonepwd,
    }
    new_domain = models.Domain(**columns)
    db.add(new_domain)
    db.commit()
    db.refresh(new_domain)
    return new_domain
def delete_domain(db: Session, id: int):
    """Delete the ``Domain`` with the given primary key and return it.

    Raises:
        HTTPException: 404 when no domain has that id.
    """
    target = db.query(models.Domain).get(id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Domain not found"
        )
    db.delete(target)
    db.commit()
    return target
def edit_domain(
    db: Session, domain: schemas.DomainBase
) -> schemas.Domain:
    """Apply a partial update to the domain identified by ``domain.id``.

    ``domain.encrypt_kintonepwd()`` is invoked before the fields are read.
    Only explicitly-set fields are copied, with two exceptions: ``id`` is
    never overwritten, and an empty or ``None`` ``kintonepwd`` is skipped so
    the stored password is kept.

    Returns:
        The updated ``models.Domain``, refreshed from the database.

    Raises:
        HTTPException: 404 when no domain has ``domain.id``.
    """
    domain.encrypt_kintonepwd()
    db_domain = db.query(models.Domain).get(domain.id)
    if not db_domain:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")

    update_data = domain.dict(exclude_unset=True)
    for key, value in update_data.items():
        if key == "id":
            # The primary key identifies the row; never rewrite it.
            continue
        if key == "kintonepwd" and (value is None or value == ""):
            # Blank password means "leave the stored one unchanged".
            continue
        setattr(db_domain, key, value)

    # The stray debug print of the row that used to sit here was removed.
    db.add(db_domain)
    db.commit()
    db.refresh(db_domain)
    return db_domain
def add_userdomain(db: Session, userid:int,domainids:list[str]):
    """Link one user to several domains in a single bulk insert.

    Args:
        db: Active database session.
        userid: User to link.
        domainids: Domain ids; one ``UserDomain`` row is built per entry.

    Returns:
        The list of ``UserDomain`` objects handed to ``bulk_save_objects``.
    """
    links = [
        models.UserDomain(userid=userid, domainid=domain_id)
        for domain_id in domainids
    ]
    db.bulk_save_objects(links)
    db.commit()
    return links
def delete_userdomain(db: Session, userid: int,domainid: int):
    """Remove the link between one user and one domain.

    Returns:
        The deleted ``UserDomain`` row.

    Raises:
        HTTPException: 404 when no such link exists.
    """
    same_pair = and_(
        models.UserDomain.userid == userid,
        models.UserDomain.domainid == domainid,
    )
    link = db.query(models.UserDomain).filter(same_pair).first()
    if not link:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Domain not found"
        )
    db.delete(link)
    db.commit()
    return link
def active_userdomain(db: Session, userid: int,domainid: int):
    """Mark one of a user's domains as active and all the others inactive.

    Every ``UserDomain`` row of the user is visited: the one whose
    ``domainid`` equals *domainid* gets ``active = True``, the rest get
    ``active = False``.

    Returns:
        All ``UserDomain`` rows of the user, after the update.

    Raises:
        HTTPException: 404 when the user has no domain links at all.
    """
    links = (
        db.query(models.UserDomain)
        .filter(models.UserDomain.userid == userid)
        .all()
    )
    if not links:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Domain not found"
        )
    for link in links:
        link.active = True if link.domainid == domainid else False
        db.add(link)
    db.commit()
    return links
def get_activedomain(db: Session, userid: int):
    """Return the user's active ``Domain``, or ``None`` when there is none.

    Joins ``Domain`` to ``UserDomain`` and picks the first row linked to
    *userid* with ``active == True``. No 404 is raised here; a miss is
    returned as ``None``.
    """
    joined = db.query(models.Domain).join(
        models.UserDomain, models.UserDomain.domainid == models.Domain.id
    )
    is_active_for_user = and_(
        models.UserDomain.userid == userid,
        models.UserDomain.active == True,
    )
    return joined.filter(is_active_for_user).first()
def get_domain(db: Session, userid: str):
    """Return every ``Domain`` linked to *userid* through ``UserDomain``.

    Rows are returned exactly as loaded; ``kintonepwd`` is not decrypted
    here.

    Raises:
        HTTPException: 404 when the user has no linked domains.
    """
    joined = db.query(models.Domain).join(
        models.UserDomain, models.UserDomain.domainid == models.Domain.id
    )
    linked = joined.filter(models.UserDomain.userid == userid).all()
    if linked:
        return linked
    raise HTTPException(status_code=404, detail="Data not found")
def get_domains(db: Session,tenantid:str):
    """Return every ``Domain`` whose ``tenantid`` equals *tenantid*.

    Rows are returned exactly as loaded; ``kintonepwd`` is not decrypted
    here.

    Raises:
        HTTPException: 404 when the tenant has no domains.
    """
    for_tenant = db.query(models.Domain).filter(
        models.Domain.tenantid == tenantid
    )
    rows = for_tenant.all()
    if rows:
        return rows
    raise HTTPException(status_code=404, detail="Data not found")
def get_events(db: Session):
    """Return every ``Event`` row.

    Raises:
        HTTPException: 404 when the table is empty.
    """
    rows = db.query(models.Event).all()
    if rows:
        return rows
    raise HTTPException(status_code=404, detail="Data not found")
def get_category(db:Session):
    """Return every ``Category`` row (an empty list when there are none)."""
    return db.query(models.Category).all()
def get_eventactions(db: Session,eventid: str):
    """Return the actions that are NOT listed against *eventid*.

    ``EventAction`` is used as an exclusion list: any action id that has an
    ``EventAction`` row for *eventid* is filtered out. The remaining actions
    are joined to their category and ordered by ``Category.nosort`` then
    ``Action.nosort``.

    Returns:
        A list of result rows with the columns id, name, title, subtitle,
        outputpoints, property, categoryid, nosort and categoryname.

    Raises:
        HTTPException: 404 when no action remains after exclusion.
    """
    # Ids of actions excluded for this event.
    excluded_ids = (
        db.query(models.EventAction.actionid)
        .filter(models.EventAction.eventid == eventid)
        .subquery()
    )

    selected_columns = (
        models.Action.id,
        models.Action.name,
        models.Action.title,
        models.Action.subtitle,
        models.Action.outputpoints,
        models.Action.property,
        models.Action.categoryid,
        models.Action.nosort,
        models.Category.categoryname,
    )
    with_category = db.query(*selected_columns).join(
        models.Category, models.Category.id == models.Action.categoryid
    )
    allowed = with_category.filter(models.Action.id.notin_(excluded_ids))
    ordered = allowed.order_by(models.Category.nosort, models.Action.nosort)

    rows = ordered.all()
    if rows:
        return rows
    raise HTTPException(status_code=404, detail="Data not found")
def create_log(db: Session, error:schemas.ErrorCreate):
    """Persist an ``ErrorLog`` entry built from *error*.

    Copies ``title``, ``location`` and ``content`` from the payload.

    Returns:
        The new row, refreshed from the database.
    """
    entry = models.ErrorLog(
        title=error.title,
        location=error.location,
        content=error.content,
    )
    db.add(entry)
    db.commit()
    db.refresh(entry)
    return entry
def get_kintoneformat(db: Session):
    """Return every ``KintoneFormat`` row ordered by ascending id."""
    by_id = db.query(models.KintoneFormat).order_by(models.KintoneFormat.id)
    return by_id.all()