182 lines
4.9 KiB
Python
182 lines
4.9 KiB
Python
from fastapi import HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
import typing as t
|
|
|
|
from . import models, schemas
|
|
from app.core.security import get_password_hash
|
|
|
|
|
|
def get_user(db: Session, user_id: int):
|
|
user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
return user
|
|
|
|
|
|
def get_user_by_email(db: Session, email: str) -> schemas.UserBase:
|
|
return db.query(models.User).filter(models.User.email == email).first()
|
|
|
|
|
|
def get_users(
|
|
db: Session, skip: int = 0, limit: int = 100
|
|
) -> t.List[schemas.UserOut]:
|
|
return db.query(models.User).offset(skip).limit(limit).all()
|
|
|
|
|
|
def create_user(db: Session, user: schemas.UserCreate):
|
|
hashed_password = get_password_hash(user.password)
|
|
db_user = models.User(
|
|
first_name=user.first_name,
|
|
last_name=user.last_name,
|
|
email=user.email,
|
|
is_active=user.is_active,
|
|
is_superuser=user.is_superuser,
|
|
hashed_password=hashed_password,
|
|
)
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user
|
|
|
|
|
|
def delete_user(db: Session, user_id: int):
|
|
user = get_user(db, user_id)
|
|
if not user:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
db.delete(user)
|
|
db.commit()
|
|
return user
|
|
|
|
|
|
def edit_user(
|
|
db: Session, user_id: int, user: schemas.UserEdit
|
|
) -> schemas.User:
|
|
db_user = get_user(db, user_id)
|
|
if not db_user:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
update_data = user.dict(exclude_unset=True)
|
|
|
|
if "password" in update_data:
|
|
update_data["hashed_password"] = get_password_hash(user.password)
|
|
del update_data["password"]
|
|
|
|
for key, value in update_data.items():
|
|
setattr(db_user, key, value)
|
|
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user
|
|
|
|
|
|
def get_appsetting(db: Session, id: int):
|
|
app = db.query(models.AppSetting).get(id)
|
|
if not app:
|
|
raise HTTPException(status_code=404, detail="App not found")
|
|
return app
|
|
|
|
def create_appsetting(db: Session, app: schemas.AppBase):
|
|
db_app = models.AppSetting(
|
|
appid=app.appid,
|
|
setting=app.setting,
|
|
)
|
|
db.add(db_app)
|
|
db.commit()
|
|
db.refresh(db_app)
|
|
return db_app
|
|
|
|
def delete_appsetting(db: Session, id: int):
|
|
app = get_appsetting(db, id)
|
|
if not app:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="App not found")
|
|
db.delete(app)
|
|
db.commit()
|
|
return app
|
|
|
|
|
|
def edit_appsetting(
|
|
db: Session, id: int, app: schemas.AppBase
|
|
) -> schemas.App:
|
|
db_app = get_appsetting(db, id)
|
|
if not db_app:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="App not found")
|
|
update_data = app.dict(exclude_unset=True)
|
|
|
|
for key, value in update_data.items():
|
|
setattr(db_app, key, value)
|
|
|
|
db.add(db_app)
|
|
db.commit()
|
|
db.refresh(db_app)
|
|
return db_app
|
|
|
|
|
|
def get_kintones(db: Session, type: int):
|
|
kintones = db.query(models.Kintone).filter(models.Kintone.type == type).all()
|
|
if not kintones:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return kintones
|
|
|
|
def get_actions(db: Session):
|
|
actions = db.query(models.Action).all()
|
|
if not actions:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return actions
|
|
|
|
|
|
def create_flow(db: Session, flow: schemas.FlowBase):
|
|
db_flow = models.Flow(
|
|
flowid=flow.flowid,
|
|
appid=flow.appid,
|
|
eventid=flow.eventid,
|
|
name=flow.name,
|
|
content=flow.content
|
|
)
|
|
db.add(db_flow)
|
|
db.commit()
|
|
db.refresh(db_flow)
|
|
return db_flow
|
|
|
|
def delete_flow(db: Session, flowid: str):
|
|
flow = get_flow(db, flowid)
|
|
if not flow:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow not found")
|
|
db.delete(flow)
|
|
db.commit()
|
|
return flow
|
|
|
|
|
|
def edit_flow(
|
|
db: Session, flow: schemas.FlowBase
|
|
) -> schemas.Flow:
|
|
db_flow = get_flow(db, flow.flowid)
|
|
if not db_flow:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow not found")
|
|
update_data = flow.dict(exclude_unset=True)
|
|
|
|
for key, value in update_data.items():
|
|
setattr(db_flow, key, value)
|
|
|
|
db.add(db_flow)
|
|
db.commit()
|
|
db.refresh(db_flow)
|
|
return db_flow
|
|
|
|
|
|
def get_flows(db: Session, flowid: str):
|
|
flows = db.query(models.Flow).all()
|
|
if not flows:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return flows
|
|
|
|
def get_flow(db: Session, flowid: str):
|
|
flow = db.query(models.Flow).filter(models.Flow.flowid == flowid).first()
|
|
if not flow:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return flow
|
|
|
|
def get_flows_by_app(db: Session, appid: str):
|
|
flows = db.query(models.Flow).filter(models.Flow.appid == appid).all()
|
|
if not flows:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return flows |