284 lines
8.6 KiB
Python
284 lines
8.6 KiB
Python
from fastapi import HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import and_
|
|
import typing as t
|
|
|
|
from . import models, schemas
|
|
from app.core.security import get_password_hash
|
|
|
|
|
|
def get_user(db: Session, user_id: int):
|
|
user = db.query(models.User).filter(models.User.id == user_id).first()
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
return user
|
|
|
|
|
|
def get_user_by_email(db: Session, email: str) -> schemas.UserBase:
|
|
return db.query(models.User).filter(models.User.email == email).first()
|
|
|
|
|
|
def get_users(
|
|
db: Session, skip: int = 0, limit: int = 100
|
|
) -> t.List[schemas.UserOut]:
|
|
return db.query(models.User).offset(skip).limit(limit).all()
|
|
|
|
|
|
def create_user(db: Session, user: schemas.UserCreate):
|
|
hashed_password = get_password_hash(user.password)
|
|
db_user = models.User(
|
|
first_name=user.first_name,
|
|
last_name=user.last_name,
|
|
email=user.email,
|
|
is_active=user.is_active,
|
|
is_superuser=user.is_superuser,
|
|
hashed_password=hashed_password,
|
|
)
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user
|
|
|
|
|
|
def delete_user(db: Session, user_id: int):
|
|
user = get_user(db, user_id)
|
|
if not user:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
db.delete(user)
|
|
db.commit()
|
|
return user
|
|
|
|
|
|
def edit_user(
|
|
db: Session, user_id: int, user: schemas.UserEdit
|
|
) -> schemas.User:
|
|
db_user = get_user(db, user_id)
|
|
if not db_user:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="User not found")
|
|
update_data = user.dict(exclude_unset=True)
|
|
|
|
if "password" in update_data:
|
|
update_data["hashed_password"] = get_password_hash(user.password)
|
|
del update_data["password"]
|
|
|
|
for key, value in update_data.items():
|
|
setattr(db_user, key, value)
|
|
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
return db_user
|
|
|
|
|
|
def get_appsetting(db: Session, id: int):
|
|
app = db.query(models.AppSetting).get(id)
|
|
if not app:
|
|
raise HTTPException(status_code=404, detail="App not found")
|
|
return app
|
|
|
|
def create_appsetting(db: Session, app: schemas.AppBase):
|
|
db_app = models.AppSetting(
|
|
appid=app.appid,
|
|
setting=app.setting,
|
|
)
|
|
db.add(db_app)
|
|
db.commit()
|
|
db.refresh(db_app)
|
|
return db_app
|
|
|
|
def delete_appsetting(db: Session, id: int):
|
|
app = get_appsetting(db, id)
|
|
if not app:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="App not found")
|
|
db.delete(app)
|
|
db.commit()
|
|
return app
|
|
|
|
|
|
def edit_appsetting(
|
|
db: Session, id: int, app: schemas.AppBase
|
|
) -> schemas.App:
|
|
db_app = get_appsetting(db, id)
|
|
if not db_app:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="App not found")
|
|
update_data = app.dict(exclude_unset=True)
|
|
|
|
for key, value in update_data.items():
|
|
setattr(db_app, key, value)
|
|
|
|
db.add(db_app)
|
|
db.commit()
|
|
db.refresh(db_app)
|
|
return db_app
|
|
|
|
|
|
def get_kintones(db: Session, type: int):
|
|
kintones = db.query(models.Kintone).filter(models.Kintone.type == type).all()
|
|
if not kintones:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return kintones
|
|
|
|
def get_actions(db: Session):
|
|
actions = db.query(models.Action).all()
|
|
if not actions:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return actions
|
|
|
|
|
|
def create_flow(db: Session, flow: schemas.FlowBase):
|
|
db_flow = models.Flow(
|
|
flowid=flow.flowid,
|
|
appid=flow.appid,
|
|
eventid=flow.eventid,
|
|
name=flow.name,
|
|
content=flow.content
|
|
)
|
|
db.add(db_flow)
|
|
db.commit()
|
|
db.refresh(db_flow)
|
|
return db_flow
|
|
|
|
def delete_flow(db: Session, flowid: str):
|
|
flow = get_flow(db, flowid)
|
|
if not flow:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow not found")
|
|
db.delete(flow)
|
|
db.commit()
|
|
return flow
|
|
|
|
|
|
def edit_flow(
|
|
db: Session, flow: schemas.FlowBase
|
|
) -> schemas.Flow:
|
|
db_flow = get_flow(db, flow.flowid)
|
|
if not db_flow:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow not found")
|
|
update_data = flow.dict(exclude_unset=True)
|
|
|
|
for key, value in update_data.items():
|
|
setattr(db_flow, key, value)
|
|
|
|
db.add(db_flow)
|
|
db.commit()
|
|
db.refresh(db_flow)
|
|
return db_flow
|
|
|
|
|
|
def get_flows(db: Session, flowid: str):
|
|
flows = db.query(models.Flow).all()
|
|
if not flows:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return flows
|
|
|
|
def get_flow(db: Session, flowid: str):
|
|
flow = db.query(models.Flow).filter(models.Flow.flowid == flowid).first()
|
|
if not flow:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return flow
|
|
|
|
def get_flows_by_app(db: Session, appid: str):
|
|
flows = db.query(models.Flow).filter(models.Flow.appid == appid).all()
|
|
if not flows:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return flows
|
|
|
|
def create_domain(db: Session, domain: schemas.DomainBase):
|
|
db_domain = models.Domain(
|
|
tenantid = domain.tenantid,
|
|
name=domain.name,
|
|
url=domain.url,
|
|
kintoneuser=domain.kintoneuser,
|
|
kintonepwd=domain.kintonepwd
|
|
)
|
|
db.add(db_domain)
|
|
db.commit()
|
|
db.refresh(db_domain)
|
|
return db_domain
|
|
|
|
def delete_domain(db: Session,id: int):
|
|
db_domain = db.query(models.Domain).get(id)
|
|
if not db_domain:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")
|
|
db.delete(db_domain)
|
|
db.commit()
|
|
return db_domain
|
|
|
|
|
|
def edit_domain(
|
|
db: Session, domain: schemas.DomainBase
|
|
) -> schemas.Domain:
|
|
db_domain = db.query(models.Domain).get(domain.id)
|
|
if not db_domain:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")
|
|
update_data = domain.dict(exclude_unset=True)
|
|
|
|
for key, value in update_data.items():
|
|
if(key != "id"):
|
|
setattr(db_domain, key, value)
|
|
|
|
db.add(db_domain)
|
|
db.commit()
|
|
db.refresh(db_domain)
|
|
return db_domain
|
|
|
|
def add_userdomain(db: Session, userid:int,domainids:list):
|
|
for domainid in domainids:
|
|
db_domain = models.UserDomain(
|
|
userid = userid,
|
|
domainid = domainid
|
|
)
|
|
db.add(db_domain)
|
|
db.commit()
|
|
db.refresh(db_domain)
|
|
return db_domain
|
|
|
|
def delete_userdomain(db: Session, userid: int,domainid: int):
|
|
db_domain = db.query(models.UserDomain).filter(and_(models.UserDomain.userid == userid,models.UserDomain.domainid == domainid)).first()
|
|
if not db_domain:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")
|
|
db.delete(db_domain)
|
|
db.commit()
|
|
return db_domain
|
|
|
|
def active_userdomain(db: Session, userid: int,domainid: int):
|
|
db_userdomains = db.query(models.UserDomain).filter(models.UserDomain.userid == userid).all()
|
|
if not db_userdomains:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")
|
|
for domain in db_userdomains:
|
|
if domain.domainid == domainid:
|
|
domain.active = True
|
|
else:
|
|
domain.active = False
|
|
db.add(domain)
|
|
db.commit()
|
|
return db_userdomains
|
|
|
|
def get_activedomain(db: Session, userid: int):
|
|
db_domain = db.query(models.Domain).join(models.UserDomain,models.UserDomain.domainid == models.Domain.id ).filter(and_(models.UserDomain.userid == userid,models.UserDomain.active == True)).first()
|
|
if not db_domain:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")
|
|
return db_domain
|
|
|
|
def get_domain(db: Session, userid: str):
|
|
domains = db.query(models.Domain).join(models.UserDomain,models.UserDomain.domainid == models.Domain.id ).filter(models.UserDomain.userid == userid).all()
|
|
if not domains:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return domains
|
|
|
|
def get_domains(db: Session,tenantid:str):
|
|
domains = db.query(models.Domain).filter(models.Domain.tenantid == tenantid ).all()
|
|
if not domains:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return domains
|
|
|
|
def get_events(db: Session):
|
|
events = db.query(models.Event).all()
|
|
if not events:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return events
|
|
|
|
def get_eventactions(db: Session,eventid: str):
|
|
eveactions = db.query(models.Action).join(models.EventAction,models.EventAction.actionid == models.Action.id ).join(models.Event,models.Event.id == models.EventAction.eventid).filter(models.Event.eventid == eventid).all()
|
|
if not eveactions:
|
|
raise HTTPException(status_code=404, detail="Data not found")
|
|
return eveactions |