refactor the crud & paginate
This commit is contained in:
101
backend/app/db/cruddb/crudbase.py
Normal file
101
backend/app/db/cruddb/crudbase.py
Normal file
@@ -0,0 +1,101 @@
|
||||
from sqlalchemy import asc, desc
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.orm.query import Query
|
||||
from typing import Type, List, Optional
|
||||
from app.core.common import ApiReturnPage
|
||||
from fastapi_pagination.ext.sqlalchemy import paginate
|
||||
from sqlalchemy import and_ ,or_
|
||||
from pydantic import BaseModel
|
||||
from .. import models, schemas
|
||||
|
||||
class crudbase:
|
||||
def __init__(self, model: Type[models.Base]):
|
||||
self.model = model
|
||||
|
||||
def _apply_filters(self, query: Query, filters: dict) -> Query:
|
||||
and_conditions = []
|
||||
or_conditions = []
|
||||
for column_name, value in filters.items():
|
||||
column = getattr(self.model, column_name, None)
|
||||
if column:
|
||||
if isinstance(value, dict):
|
||||
if 'operator' in value:
|
||||
operator = value['operator']
|
||||
filter_value = value['value']
|
||||
if operator == '!=':
|
||||
and_conditions.append(column != filter_value)
|
||||
elif operator == 'like':
|
||||
and_conditions.append(column.like(f"%{filter_value}%"))
|
||||
elif operator == '=':
|
||||
and_conditions.append(column == filter_value)
|
||||
elif operator == '>':
|
||||
and_conditions.append(column > filter_value)
|
||||
elif operator == '<':
|
||||
and_conditions.append(column < filter_value)
|
||||
elif operator == 'in':
|
||||
if isinstance(filter_value, list):
|
||||
or_conditions.append(column.in_(filter_value))
|
||||
else:
|
||||
and_conditions.append(column == filter_value)
|
||||
else:
|
||||
and_conditions.append(column == value)
|
||||
else:
|
||||
and_conditions.append(column == value)
|
||||
|
||||
if and_conditions:
|
||||
query = query.filter(*and_conditions)
|
||||
if or_conditions:
|
||||
query = query.filter(or_(*or_conditions))
|
||||
return query
|
||||
|
||||
def _apply_sorting(self, query: Query, sort_by: Optional[str], sort_order: Optional[str]) -> Query:
|
||||
if sort_by:
|
||||
column = getattr(self.model, sort_by, None)
|
||||
if column:
|
||||
if sort_order == "desc":
|
||||
query = query.order_by(desc(column))
|
||||
else:
|
||||
query = query.order_by(asc(column))
|
||||
return query
|
||||
|
||||
def get_all(self, db: Session) -> ApiReturnPage[models.Base]:
|
||||
return paginate(db.query(self.model))
|
||||
|
||||
|
||||
def get(self, db: Session, item_id: int) -> Optional[models.Base]:
|
||||
return db.query(self.model).get(item_id)
|
||||
|
||||
def create(self, db: Session, obj_in: BaseModel) -> models.Base:
|
||||
db_obj = self.model(**obj_in.model_dump())
|
||||
db.add(db_obj)
|
||||
db.commit()
|
||||
db.refresh(db_obj)
|
||||
return db_obj
|
||||
|
||||
def update(self, db: Session, item_id: int, obj_in: BaseModel) -> Optional[models.Base]:
|
||||
db_obj = db.query(self.model).filter(self.model.id == item_id).first()
|
||||
if db_obj:
|
||||
for key, value in obj_in.model_dump(exclude_unset=True).items():
|
||||
setattr(db_obj, key, value)
|
||||
db.commit()
|
||||
db.refresh(db_obj)
|
||||
return db_obj
|
||||
return None
|
||||
|
||||
def delete(self, db: Session, item_id: int) -> Optional[models.Base]:
|
||||
db_obj = db.query(self.model).get(item_id)
|
||||
if db_obj:
|
||||
db.delete(db_obj)
|
||||
db.commit()
|
||||
return db_obj
|
||||
return None
|
||||
|
||||
def get_by_conditions(self, db: Session, filters: Optional[dict] = None, sort_by: Optional[str] = None,
|
||||
sort_order: Optional[str] = "asc") -> Query:
|
||||
query = db.query(self.model)
|
||||
if filters:
|
||||
query = self._apply_filters(query, filters)
|
||||
if sort_by:
|
||||
query = self._apply_sorting(query, sort_by, sort_order)
|
||||
print(str(query))
|
||||
return query
|
||||
135
backend/app/db/cruddb/dbdomain.py
Normal file
135
backend/app/db/cruddb/dbdomain.py
Normal file
@@ -0,0 +1,135 @@
|
||||
from datetime import datetime
|
||||
from fastapi import HTTPException, status
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import and_
|
||||
import typing as t
|
||||
|
||||
from app.db.cruddb.crudbase import crudbase
|
||||
from fastapi_pagination.ext.sqlalchemy import paginate
|
||||
from app.core.common import ApiReturnPage
|
||||
|
||||
from app.db import models, schemas
|
||||
from app.core.security import chacha20Decrypt, get_password_hash
|
||||
|
||||
class dbuserdomain(crudbase):
|
||||
def __init__(self):
|
||||
super().__init__(model=models.UserDomain)
|
||||
|
||||
def get_userdomain(self,db: Session,userid:int,domainid:int):
|
||||
return super().get_by_conditions(db,{"userid":userid,"domainid":domainid}).first()
|
||||
|
||||
def get_userdomain_by_domainid(self,db: Session,ownerid:int,domainid:int):
|
||||
return super().get_by_conditions(db,{"domainid":domainid})
|
||||
|
||||
|
||||
def get_default_domains(self,db: Session,domainid:int):
|
||||
return super().get_by_conditions(db,{"domainid":domainid,"is_default":True}).all()
|
||||
|
||||
def get_user_default_domain(self,db: Session,userid:int):
|
||||
return super().get_by_conditions(db,{"userid":userid,"is_default":True}).first()
|
||||
|
||||
|
||||
dbuserdomain = dbuserdomain()
|
||||
|
||||
class dbdomain(crudbase):
|
||||
def __init__(self):
|
||||
super().__init__(model=models.Domain)
|
||||
|
||||
def get_domains(self,db: Session)-> ApiReturnPage[models.Base]:
|
||||
return super().get_all(db)
|
||||
|
||||
def get_domains_by_owner(self,db: Session,ownerid:int)-> ApiReturnPage[models.Base]:
|
||||
return paginate( super().get_by_conditions(db,{"ownerid":ownerid}))
|
||||
|
||||
def create_domain(self,db: Session, domain: schemas.DomainIn,userid:int):
|
||||
db_domain = super().get_by_conditions(db,{"url":domain.url,"kintoneuser":domain.kintoneuser,"onwerid":userid}).first()
|
||||
if not db_domain:
|
||||
domain.encrypt_kintonepwd()
|
||||
domain.id = None
|
||||
domain.createuserid = userid
|
||||
domain.updateuserid = userid
|
||||
domain.ownerid = userid
|
||||
return super().create(db,domain)
|
||||
return db_domain
|
||||
|
||||
def delete_domain(self,db: Session,id: int):
|
||||
return super().delete(db,id)
|
||||
|
||||
def edit_domain(self,db: Session, domain: schemas.DomainIn,userid:int) -> schemas.DomainOut:
|
||||
db_domain = super().get(db,domain.id)
|
||||
if db_domain:
|
||||
db_domain.tenantid = domain.tenantid
|
||||
db_domain.name=domain.name
|
||||
db_domain.url=domain.url
|
||||
if db_domain.is_active == True and domain.is_active == False:
|
||||
db_userdomains = dbuserdomain.get_default_domains(db,domain.id)
|
||||
for userdomain in db_userdomains:
|
||||
userdomain.active = False
|
||||
db.add(userdomain)
|
||||
db_domain.is_active=domain.is_active
|
||||
db_domain.kintoneuser=domain.kintoneuser
|
||||
if domain.kintonepwd != "" and domain.kintonepwd != None:
|
||||
domain.encrypt_kintonepwd()
|
||||
db_domain.kintonepwd = domain.kintonepwd
|
||||
db_domain.updateuserid = userid
|
||||
db.add(db_domain)
|
||||
db.commit()
|
||||
db.refresh(db_domain)
|
||||
return db_domain
|
||||
return None
|
||||
|
||||
def add_userdomain(self,db: Session,ownerid:int,userid:int,domainid:int) -> schemas.DomainOut:
|
||||
db_domain = super().get_by_conditions(db,{"id":domainid,"is_active":True}).first()
|
||||
if db_domain:
|
||||
db_userdomain = dbuserdomain.get_userdomain(db,userid,domainid)
|
||||
if not db_userdomain:
|
||||
user_domain = models.UserDomain(userid = userid, domainid = domainid ,createuserid = ownerid,updateuserid = ownerid)
|
||||
db.add(user_domain)
|
||||
db.commit()
|
||||
return db_domain
|
||||
return None
|
||||
|
||||
def add_userdomain_by_owner(self,db: Session,ownerid:int, userid:int,domainid:int) -> schemas.DomainOut:
|
||||
db_domain = super().get_by_conditions(db,{"id":domainid,"ownerid":ownerid,"is_active":True}).first()
|
||||
if db_domain:
|
||||
db_userdomain = dbuserdomain.get_userdomain(db,userid,domainid)
|
||||
if not db_userdomain:
|
||||
user_domain = models.UserDomain(userid = userid, domainid = domainid ,createuserid =ownerid,updateuserid = ownerid)
|
||||
db.add(user_domain)
|
||||
db.commit()
|
||||
return db_domain
|
||||
return None
|
||||
|
||||
def delete_userdomain(self,db: Session, userid: int,domainid: int) -> schemas.DomainOut:
|
||||
db_userdomain = dbuserdomain.get_userdomain(db,userid,domainid)
|
||||
if db_userdomain:
|
||||
domain = db_userdomain.domain
|
||||
db.delete(db_userdomain)
|
||||
db.commit()
|
||||
return domain
|
||||
return None
|
||||
|
||||
def get_default_domain(self,db: Session, userid: int) -> schemas.UserDomain:
|
||||
return dbuserdomain.get_user_default_domain(db,userid)
|
||||
|
||||
def set_default_domain(self,db: Session, userid: int,domainid: int):
|
||||
db_domain =super().get_by_conditions(db,{"id":domainid,"is_active":True}).first()
|
||||
if db_domain:
|
||||
db_default_domain = dbuserdomain.get_user_default_domain(db,userid)
|
||||
db_userdomain =dbuserdomain.get_userdomain(db,userid,domainid)
|
||||
if db_default_domain:
|
||||
db_default_domain.is_default = False
|
||||
db_default_domain.updateuserid = userid
|
||||
db.add(db_default_domain)
|
||||
if db_userdomain:
|
||||
db_userdomain.is_default = True
|
||||
db_userdomain.updateuserid = userid
|
||||
db.add(db_userdomain)
|
||||
db.commit()
|
||||
return db_domain
|
||||
|
||||
def get_shareddomain_users(self,db: Session,ownerid:int,domainid: int) -> ApiReturnPage[models.Base]:
|
||||
users = db.query(models.User).join(models.UserDomain,models.UserDomain.userid == models.User.id).filter(models.UserDomain.domainid ==domainid)
|
||||
return paginate(users)
|
||||
|
||||
dbdomain = dbdomain()
|
||||
@@ -145,6 +145,7 @@ class Tenant(Base):
|
||||
startdate = Column(DateTime)
|
||||
enddate = Column(DateTime)
|
||||
|
||||
|
||||
class Domain(Base):
|
||||
__tablename__ = "domain"
|
||||
|
||||
@@ -153,6 +154,7 @@ class Domain(Base):
|
||||
url = Column(String(200), nullable=False)
|
||||
kintoneuser = Column(String(100), nullable=False)
|
||||
kintonepwd = Column(String(100), nullable=False)
|
||||
is_active = Column(Boolean, default=True)
|
||||
def decrypt_kintonepwd(self):
|
||||
decrypted_pwd = chacha20Decrypt(self.kintonepwd)
|
||||
return decrypted_pwd
|
||||
@@ -162,14 +164,20 @@ class Domain(Base):
|
||||
createuser = relationship('User',foreign_keys=[createuserid])
|
||||
updateuser = relationship('User',foreign_keys=[updateuserid])
|
||||
owner = relationship('User',foreign_keys=[ownerid])
|
||||
is_active = Column(Boolean, default=True)
|
||||
|
||||
|
||||
class UserDomain(Base):
|
||||
__tablename__ = "userdomain"
|
||||
|
||||
userid = Column(Integer,ForeignKey("user.id"))
|
||||
domainid = Column(Integer,ForeignKey("domain.id"))
|
||||
active = Column(Boolean, default=False)
|
||||
is_default = Column(Boolean, default=False)
|
||||
createuserid = Column(Integer,ForeignKey("user.id"))
|
||||
updateuserid = Column(Integer,ForeignKey("user.id"))
|
||||
domain = relationship("Domain")
|
||||
user = relationship("User",foreign_keys=[userid])
|
||||
createuser = relationship('User',foreign_keys=[createuserid])
|
||||
updateuser = relationship('User',foreign_keys=[updateuserid])
|
||||
|
||||
class Event(Base):
|
||||
__tablename__ = "event"
|
||||
|
||||
@@ -28,9 +28,14 @@ class UserBase(BaseModel):
|
||||
first_name: str = None
|
||||
last_name: str = None
|
||||
roles:t.List[Role] = []
|
||||
|
||||
|
||||
class UserOut(UserBase):
|
||||
pass
|
||||
class UserOut(BaseModel):
|
||||
email: str
|
||||
is_active: bool = True
|
||||
is_superuser: bool = False
|
||||
first_name: str = None
|
||||
last_name: str = None
|
||||
|
||||
|
||||
class UserCreate(UserBase):
|
||||
@@ -149,9 +154,11 @@ class DomainIn(BaseModel):
|
||||
name: str
|
||||
url: str
|
||||
kintoneuser: str
|
||||
kintonepwd: str
|
||||
kintonepwd: t.Optional[str] = None
|
||||
is_active: bool
|
||||
ownerid:int
|
||||
createuserid:t.Optional[int] = None
|
||||
updateuserid:t.Optional[int] = None
|
||||
ownerid:t.Optional[int] = None
|
||||
|
||||
def encrypt_kintonepwd(self):
|
||||
encrypted_pwd = chacha20Encrypt(self.kintonepwd)
|
||||
@@ -162,16 +169,18 @@ class DomainOut(BaseModel):
|
||||
tenantid: str
|
||||
name: str
|
||||
url: str
|
||||
kintoneuser: str
|
||||
is_active: bool
|
||||
ownerid:int
|
||||
|
||||
class ConfigDict:
|
||||
orm_mode = True
|
||||
|
||||
class ActiveDomain(BaseModel):
|
||||
class UserDomain(BaseModel):
|
||||
id: int
|
||||
tenantid: str
|
||||
name: str
|
||||
url: str
|
||||
is_active: bool
|
||||
is_default: bool
|
||||
domain:DomainOut
|
||||
user:UserOut
|
||||
|
||||
class Domain(Base):
|
||||
id: int
|
||||
@@ -179,12 +188,14 @@ class Domain(Base):
|
||||
name: str
|
||||
url: str
|
||||
kintoneuser: str
|
||||
kintonepwd: str
|
||||
is_active: bool
|
||||
updateuser:UserOut
|
||||
owner:UserOut
|
||||
|
||||
class ConfigDict:
|
||||
orm_mode = True
|
||||
|
||||
|
||||
class Event(Base):
|
||||
id: int
|
||||
category: str
|
||||
|
||||
Reference in New Issue
Block a user