Files
KintoneAppBuilder/backend/app/db/cruddb/dbdomain.py
2024-12-16 15:49:00 +09:00

218 lines
9.6 KiB
Python

from datetime import datetime
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy import select,and_
import typing as t
from app.db.cruddb.crudbase import crudbase
from fastapi_pagination.ext.sqlalchemy import paginate
from app.core.common import ApiReturnPage
from app.db import models, schemas
from app.core.security import chacha20Decrypt, get_password_hash
class dbuserdomain(crudbase):
    """Lookup helpers for ``models.UserDomain`` rows.

    Each row carries at least ``userid``, ``domainid`` and ``is_default``
    (the columns filtered on below). Filtering is delegated to
    ``crudbase.get_by_conditions``, whose result is handed to ``db.execute``.
    """

    def __init__(self):
        super().__init__(model=models.UserDomain)

    def get_userdomain(self, db: Session, userid: int, domainid: int):
        """Return the first row matching ``userid`` and ``domainid``, or None."""
        stmt = self.get_by_conditions({"userid": userid, "domainid": domainid})
        return db.execute(stmt).scalars().first()

    def get_userdomain_by_domainid(self, db: Session, ownerid: int, domainid: int):
        """Return the ``get_by_conditions`` result filtered on ``domainid``.

        Unlike the sibling helpers this does not execute anything: ``db`` and
        ``ownerid`` are accepted but never used.
        """
        # NOTE(review): every other helper in this class runs the statement
        # through db.execute(); confirm callers really expect the raw
        # get_by_conditions result and that ignoring ownerid is intended.
        return self.get_by_conditions({"domainid": domainid})

    def get_default_domains(self, db: Session, domainid: int):
        """Return all rows for ``domainid`` that are flagged ``is_default``."""
        stmt = self.get_by_conditions({"domainid": domainid, "is_default": True})
        return db.execute(stmt).scalars().all()

    def get_user_default_domain(self, db: Session, userid: int):
        """Return the first ``is_default`` row of ``userid``, or None."""
        stmt = self.get_by_conditions({"userid": userid, "is_default": True})
        return db.execute(stmt).scalars().first()


# The class name is deliberately rebound to a shared instance; from here on
# ``dbuserdomain`` refers to the instance, not the class.
dbuserdomain = dbuserdomain()
class dbmanagedomain(crudbase):
    """Lookup helpers for ``models.ManageDomain`` rows.

    Filtering is delegated to ``crudbase.get_by_conditions``; the result is
    executed on the session passed in by the caller.
    """

    def __init__(self):
        super().__init__(model=models.ManageDomain)

    def get_managedomain(self, db: Session, userid: int, domainid: int):
        """Return the first row matching ``userid`` and ``domainid``, or None."""
        stmt = self.get_by_conditions({"userid": userid, "domainid": domainid})
        return db.execute(stmt).scalars().first()

    def get_managedomain_by_domain(self, db: Session, domainid: int):
        """Return every row whose ``domainid`` matches."""
        stmt = self.get_by_conditions({"domainid": domainid})
        return db.execute(stmt).scalars().all()


# The class name is deliberately rebound to a shared instance; from here on
# ``dbmanagedomain`` refers to the instance, not the class.
dbmanagedomain = dbmanagedomain()
class dbdomain(crudbase):
    """Service object for ``models.Domain`` and its user / manager link rows.

    Lookup-style methods return ``None`` when the requested domain or link
    row cannot be found. Methods that write call ``db.commit()`` on the
    session they are given.
    """

    def __init__(self):
        super().__init__(model=models.Domain)

    def _link_user_once(self, db: Session, link_model, existing_link,
                        ownerid: int, userid: int, domainid: int) -> None:
        """Insert and commit a ``link_model`` row unless one already exists.

        Args:
            db: Session used for the insert and commit.
            link_model: ``models.UserDomain`` or ``models.ManageDomain``.
            existing_link: Result of the caller's lookup; a truthy value
                means the link is already present and nothing is written.
            ownerid: Stored as ``createuserid`` / ``updateuserid``.
            userid: User being linked.
            domainid: Domain being linked.
        """
        if existing_link:
            return
        db.add(link_model(userid=userid, domainid=domainid,
                          createuserid=ownerid, updateuserid=ownerid))
        db.commit()

    def _first_domain_where(self, db: Session, conditions: dict):
        """Return the first Domain matching ``conditions``, or None."""
        return db.execute(self.get_by_conditions(conditions)).scalars().first()

    def get_domains(self, db: Session) -> ApiReturnPage[models.Base]:
        """Return a page over whatever ``crudbase.get_all`` selects."""
        return paginate(db, self.get_all())

    def get_domains_by_manage(self, db: Session, userid: int) -> ApiReturnPage[models.Base]:
        """Return a page of domains that have a ManageDomain row for ``userid``."""
        query = (
            select(models.Domain)
            .join(models.ManageDomain, models.ManageDomain.domainid == models.Domain.id)
            .where(models.ManageDomain.userid == userid)
        )
        return paginate(db, query)

    def get_domains_by_owner(self, db: Session, ownerid: int) -> ApiReturnPage[models.Base]:
        """Return a page of domains whose ``ownerid`` equals ``ownerid``."""
        return paginate(db, self.get_by_conditions({"ownerid": ownerid}))

    def create_domain(self, db: Session, domain: schemas.DomainIn, userid: int):
        """Create a domain owned by ``userid`` and link that user to it.

        Besides the Domain row, one UserDomain and one ManageDomain row are
        inserted for ``userid``; everything is committed together.

        Returns:
            The refreshed ``models.Domain`` instance.
        """
        # NOTE(review): no duplicate check is made here. An earlier,
        # disabled draft looked up an existing row by url / kintoneuser /
        # owner first; decide whether that check should be restored.
        domain.encrypt_kintonepwd()  # must run before kintonepwd is copied below
        db_domain = models.Domain(
            tenantid=domain.tenantid,
            name=domain.name,
            url=domain.url,
            kintoneuser=domain.kintoneuser,
            kintonepwd=domain.kintonepwd,
            is_active=domain.is_active,
            createuserid=userid,
            updateuserid=userid,
            ownerid=userid,
        )
        db.add(db_domain)
        # Flush before building the link rows: they reference db_domain.id.
        db.flush()
        db.add(models.UserDomain(userid=userid, domainid=db_domain.id,
                                 createuserid=userid, updateuserid=userid))
        db.add(models.ManageDomain(userid=userid, domainid=db_domain.id,
                                   createuserid=userid, updateuserid=userid))
        db.commit()
        db.refresh(db_domain)
        return db_domain

    def delete_domain(self, db: Session, id: int):
        """Delete the domain's ManageDomain rows, then the domain itself.

        Returns:
            Whatever ``crudbase.delete`` returns for ``id``.
        """
        # NOTE(review): UserDomain rows for this domain are not removed
        # here; confirm they are covered by a cascade or handled elsewhere.
        for manage in dbmanagedomain.get_managedomain_by_domain(db, id):
            db.delete(manage)
        return super().delete(db, id)

    def edit_domain(self, db: Session, domain: schemas.DomainIn,
                    userid: int) -> t.Optional[schemas.DomainOut]:
        """Update the domain identified by ``domain.id``.

        When an active domain is being deactivated, every UserDomain row
        that marks it as default has ``is_default`` cleared. The stored
        password is replaced only when a non-empty one is supplied.

        Returns:
            The refreshed domain, or ``None`` if ``domain.id`` is unknown.
        """
        db_domain = self.get(db, domain.id)
        if not db_domain:
            return None

        db_domain.tenantid = domain.tenantid
        db_domain.name = domain.name
        db_domain.url = domain.url

        # Active -> inactive transition: nobody may keep it as default.
        if db_domain.is_active and domain.is_active is False:
            for userdomain in dbuserdomain.get_default_domains(db, domain.id):
                userdomain.is_default = False
                db.add(userdomain)
        db_domain.is_active = domain.is_active
        db_domain.kintoneuser = domain.kintoneuser

        # An empty or missing password means "keep the current one".
        if domain.kintonepwd:
            domain.encrypt_kintonepwd()
            db_domain.kintonepwd = domain.kintonepwd

        db_domain.updateuserid = userid
        db.add(db_domain)
        db.commit()
        db.refresh(db_domain)
        return db_domain

    def add_userdomain(self, db: Session, ownerid: int, userid: int,
                       domainid: int) -> t.Optional[schemas.DomainOut]:
        """Share an active domain with ``userid``.

        Returns:
            The domain, or ``None`` if it does not exist or is inactive.
        """
        db_domain = self.get(db, domainid)
        if not (db_domain and db_domain.is_active):
            return None
        # NOTE(review): the domain is returned even when the link already
        # existed; confirm callers expect that rather than None.
        self._link_user_once(db, models.UserDomain,
                             dbuserdomain.get_userdomain(db, userid, domainid),
                             ownerid, userid, domainid)
        return db_domain

    def add_userdomain_by_owner(self, db: Session, ownerid: int, userid: int,
                                domainid: int) -> t.Optional[schemas.DomainOut]:
        """Share an active domain with ``userid`` on behalf of ``ownerid``.

        Returns:
            The domain, or ``None`` if no active domain has this id.
        """
        # NOTE(review): unlike add_managedomain_by_owner, the lookup does
        # not filter on ownerid; confirm whether ownership should be checked.
        db_domain = self._first_domain_where(db, {"id": domainid, "is_active": True})
        if not db_domain:
            return None
        self._link_user_once(db, models.UserDomain,
                             dbuserdomain.get_userdomain(db, userid, domainid),
                             ownerid, userid, domainid)
        return db_domain

    def delete_userdomain(self, db: Session, userid: int,
                          domainid: int) -> t.Optional[schemas.DomainOut]:
        """Remove the UserDomain link between ``userid`` and ``domainid``.

        Returns:
            The ``domain`` attribute of the removed link, or ``None`` if no
            such link exists.
        """
        db_userdomain = dbuserdomain.get_userdomain(db, userid, domainid)
        if not db_userdomain:
            return None
        domain = db_userdomain.domain  # read before the row is deleted
        db.delete(db_userdomain)
        db.commit()
        return domain

    def get_default_domain(self, db: Session,
                           userid: int) -> t.Optional[schemas.DomainOut]:
        """Return the domain of the user's default link, or ``None``."""
        userdomain = dbuserdomain.get_user_default_domain(db, userid)
        return userdomain.domain if userdomain else None

    def set_default_domain(self, db: Session, userid: int, domainid: int):
        """Make ``domainid`` the default domain of ``userid``.

        Any other link currently flagged as default is cleared first. If the
        user has no link to the domain yet, one is created via
        ``dbuserdomain.create``.

        Returns:
            The domain, or ``None`` if no active domain has this id.
        """
        db_domain = self._first_domain_where(db, {"id": domainid, "is_active": True})
        if not db_domain:
            return None

        current_default = dbuserdomain.get_user_default_domain(db, userid)
        target_link = dbuserdomain.get_userdomain(db, userid, domainid)

        if current_default:
            if current_default.domainid == domainid:
                # Already the default: nothing to write.
                return db_domain
            current_default.is_default = False
            current_default.updateuserid = userid
            db.add(current_default)

        if target_link:
            target_link.is_default = True
            target_link.updateuserid = userid
            db.add(target_link)
        else:
            # NOTE(review): crudbase.create is not visible here; if it
            # commits on its own, the db.add below is redundant -- verify.
            target_link = dbuserdomain.create(
                db, schemas.UserDomainIn(domainid=domainid, userid=userid, is_default=True))
            db.add(target_link)
        db.commit()
        return db_domain

    def get_shareddomain_users(self, db: Session, domainid: int) -> ApiReturnPage[models.Base]:
        """Return a page of users that have a UserDomain row for ``domainid``."""
        users = (
            select(models.User)
            .join(models.UserDomain, models.UserDomain.userid == models.User.id)
            .where(models.UserDomain.domainid == domainid)
        )
        return paginate(db, users)

    def add_managedomain(self, db: Session, ownerid: int, userid: int,
                         domainid: int) -> t.Optional[schemas.DomainOut]:
        """Grant ``userid`` a ManageDomain link to ``domainid``.

        Returns:
            The domain, or ``None`` if it does not exist.
        """
        db_domain = self.get(db, domainid)
        if not db_domain:
            return None
        # NOTE(review): the domain is returned even when the link already
        # existed; confirm callers expect that rather than None.
        self._link_user_once(db, models.ManageDomain,
                             dbmanagedomain.get_managedomain(db, userid, domainid),
                             ownerid, userid, domainid)
        return db_domain

    def add_managedomain_by_owner(self, db: Session, ownerid: int, userid: int,
                                  domainid: int) -> t.Optional[schemas.DomainOut]:
        """Grant ``userid`` a ManageDomain link to a domain owned by ``ownerid``.

        Returns:
            The domain, or ``None`` if ``ownerid`` owns no domain with this id.
        """
        db_domain = self._first_domain_where(db, {"id": domainid, "ownerid": ownerid})
        if not db_domain:
            return None
        self._link_user_once(db, models.ManageDomain,
                             dbmanagedomain.get_managedomain(db, userid, domainid),
                             ownerid, userid, domainid)
        return db_domain

    def delete_managedomain(self, db: Session, userid: int,
                            domainid: int) -> t.Optional[schemas.DomainOut]:
        """Remove the ManageDomain link of ``userid``, except for the owner.

        The link is left in place when ``userid`` is the domain's
        ``ownerid``.

        Returns:
            The ``domain`` attribute of the link, or ``None`` if no such
            link exists.
        """
        db_managedomain = dbmanagedomain.get_managedomain(db, userid, domainid)
        if not db_managedomain:
            return None
        domain = db_managedomain.domain
        if domain.ownerid != userid:
            db.delete(db_managedomain)
            db.commit()
        # NOTE(review): the domain is returned even when the owner's link
        # was kept; confirm callers do not read this as "deleted".
        return domain

    def get_managedomain_users(self, db: Session, domainid: int) -> ApiReturnPage[models.Base]:
        """Return a page of users that have a ManageDomain row for ``domainid``."""
        users = (
            select(models.User)
            .join(models.ManageDomain, models.ManageDomain.userid == models.User.id)
            .where(models.ManageDomain.domainid == domainid)
        )
        return paginate(db, users)


# Shared instance of the domain service.
domainService = dbdomain()