SQLAlchemy 1.0->SQLAlchemy 2.x

This commit is contained in:
2024-12-07 21:37:33 +09:00
parent 29501f785f
commit 3aec075927
7 changed files with 200 additions and 41 deletions

View File

@@ -281,7 +281,7 @@ async def flow_delete(
response_model=ApiReturnPage[Domain], response_model=ApiReturnPage[Domain],
response_model_exclude_none=True, response_model_exclude_none=True,
) )
async def domain_details( async def domain_list(
request: Request, request: Request,
user=Depends(get_current_active_user), user=Depends(get_current_active_user),
db=Depends(get_db), db=Depends(get_db),
@@ -295,6 +295,22 @@ async def domain_details(
except Exception as e: except Exception as e:
raise APIException('platform:domains',request.url._url,f"Error occurred while get domains:",e) raise APIException('platform:domains',request.url._url,f"Error occurred while get domains:",e)
@r.get(
"/domain/{domain_id}",tags=["Domain"],
response_model=ApiReturnModel[Domain|None],
response_model_exclude_none=True,
)
async def domain_detail(
request: Request,
domain_id:int,
user=Depends(get_current_active_user),
db=Depends(get_db),
):
try:
return ApiReturnModel(data = dbdomain.get(db,domain_id))
except Exception as e:
raise APIException('platform:domain',request.url._url,f"Error occurred while get domain detail:",e)
@r.post("/domain", tags=["Domain"], @r.post("/domain", tags=["Domain"],
response_model=ApiReturnModel[Domain], response_model=ApiReturnModel[Domain],
response_model_exclude_none=True) response_model_exclude_none=True)

View File

@@ -1,4 +1,4 @@
from sqlalchemy import asc, desc from sqlalchemy import asc, desc, select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy.orm.query import Query from sqlalchemy.orm.query import Query
from typing import Type, List, Optional from typing import Type, List, Optional
@@ -46,9 +46,10 @@ class crudbase:
and_conditions.append(column == value) and_conditions.append(column == value)
if and_conditions: if and_conditions:
query = query.filter(*and_conditions) query = query.where(and_(*and_conditions))
if or_conditions: if or_conditions:
query = query.filter(or_(*or_conditions)) query = query.where(or_(*or_conditions))
return query return query
def _apply_sorting(self, query: Query, sort_by: Optional[str], sort_order: Optional[str]) -> Query: def _apply_sorting(self, query: Query, sort_by: Optional[str], sort_order: Optional[str]) -> Query:
@@ -61,12 +62,11 @@ class crudbase:
query = query.order_by(asc(column)) query = query.order_by(asc(column))
return query return query
def get_all(self, db: Session) -> Query: def get_all(self) -> Query:
return db.query(self.model) return select(self.model)
def get(self, db: Session, item_id: int) -> Optional[models.Base]: def get(self, db: Session, item_id: int) -> Optional[models.Base]:
return db.query(self.model).get(item_id) return db.execute(select(self.model).filter(self.model.id == item_id)).scalar_one_or_none()
def create(self, db: Session, obj_in: BaseModel) -> models.Base: def create(self, db: Session, obj_in: BaseModel) -> models.Base:
db_obj = self.model(**obj_in.model_dump()) db_obj = self.model(**obj_in.model_dump())
@@ -76,7 +76,7 @@ class crudbase:
return db_obj return db_obj
def update(self, db: Session, item_id: int, obj_in: BaseModel) -> Optional[models.Base]: def update(self, db: Session, item_id: int, obj_in: BaseModel) -> Optional[models.Base]:
db_obj = db.query(self.model).filter(self.model.id == item_id).first() db_obj = self.get(db,item_id)
if db_obj: if db_obj:
for key, value in obj_in.model_dump(exclude_unset=True).items(): for key, value in obj_in.model_dump(exclude_unset=True).items():
setattr(db_obj, key, value) setattr(db_obj, key, value)
@@ -86,16 +86,16 @@ class crudbase:
return None return None
def delete(self, db: Session, item_id: int) -> Optional[models.Base]: def delete(self, db: Session, item_id: int) -> Optional[models.Base]:
db_obj = db.query(self.model).get(item_id) db_obj = self.get(db,item_id)
if db_obj: if db_obj:
db.delete(db_obj) db.delete(db_obj)
db.commit() db.commit()
return db_obj return db_obj
return None return None
def get_by_conditions(self, db: Session, filters: Optional[dict] = None, sort_by: Optional[str] = None, def get_by_conditions(self, filters: Optional[dict] = None, sort_by: Optional[str] = None,
sort_order: Optional[str] = "asc") -> Query: sort_order: Optional[str] = "asc") -> Query:
query = db.query(self.model) query = select(self.model)
if filters: if filters:
query = self._apply_filters(query, filters) query = self._apply_filters(query, filters)
if sort_by: if sort_by:

View File

@@ -16,17 +16,17 @@ class dbuserdomain(crudbase):
super().__init__(model=models.UserDomain) super().__init__(model=models.UserDomain)
def get_userdomain(self,db: Session,userid:int,domainid:int): def get_userdomain(self,db: Session,userid:int,domainid:int):
return super().get_by_conditions(db,{"userid":userid,"domainid":domainid}).first() return db.execute(super().get_by_conditions({"userid":userid,"domainid":domainid})).scalars().first()
def get_userdomain_by_domainid(self,db: Session,ownerid:int,domainid:int): def get_userdomain_by_domainid(self,db: Session,ownerid:int,domainid:int):
return super().get_by_conditions(db,{"domainid":domainid}) return super().get_by_conditions({"domainid":domainid})
def get_default_domains(self,db: Session,domainid:int): def get_default_domains(self,db: Session,domainid:int):
return super().get_by_conditions(db,{"domainid":domainid,"is_default":True}).all() return db.execute(super().get_by_conditions({"domainid":domainid,"is_default":True})).scalars().all()
def get_user_default_domain(self,db: Session,userid:int): def get_user_default_domain(self,db: Session,userid:int):
return super().get_by_conditions(db,{"userid":userid,"is_default":True}).first() return db.execute(super().get_by_conditions({"userid":userid,"is_default":True})).scalars().first()
dbuserdomain = dbuserdomain() dbuserdomain = dbuserdomain()
@@ -36,10 +36,10 @@ class dbdomain(crudbase):
super().__init__(model=models.Domain) super().__init__(model=models.Domain)
def get_domains(self,db: Session)-> ApiReturnPage[models.Base]: def get_domains(self,db: Session)-> ApiReturnPage[models.Base]:
return paginate(super().get_all(db)) return paginate(db,super().get_all())
def get_domains_by_owner(self,db: Session,ownerid:int)-> ApiReturnPage[models.Base]: def get_domains_by_owner(self,db: Session,ownerid:int)-> ApiReturnPage[models.Base]:
return paginate( super().get_by_conditions(db,{"ownerid":ownerid})) return paginate(db,super().get_by_conditions({"ownerid":ownerid}))
def create_domain(self,db: Session, domain: schemas.DomainIn,userid:int): def create_domain(self,db: Session, domain: schemas.DomainIn,userid:int):
#db_domain = super().get_by_conditions(db,{"url":domain.url,"kintoneuser":domain.kintoneuser,"onwerid":userid}).first() #db_domain = super().get_by_conditions(db,{"url":domain.url,"kintoneuser":domain.kintoneuser,"onwerid":userid}).first()
@@ -79,8 +79,8 @@ class dbdomain(crudbase):
return None return None
def add_userdomain(self,db: Session,ownerid:int,userid:int,domainid:int) -> schemas.DomainOut: def add_userdomain(self,db: Session,ownerid:int,userid:int,domainid:int) -> schemas.DomainOut:
db_domain = super().get_by_conditions(db,{"id":domainid,"is_active":True}).first() db_domain = super().get(db,domainid)
if db_domain: if db_domain and db_domain.is_active:
db_userdomain = dbuserdomain.get_userdomain(db,userid,domainid) db_userdomain = dbuserdomain.get_userdomain(db,userid,domainid)
if not db_userdomain: if not db_userdomain:
user_domain = models.UserDomain(userid = userid, domainid = domainid ,createuserid = ownerid,updateuserid = ownerid) user_domain = models.UserDomain(userid = userid, domainid = domainid ,createuserid = ownerid,updateuserid = ownerid)

View File

@@ -32,13 +32,13 @@ class dbuser(crudbase):
return super().get(db,user_id) return super().get(db,user_id)
def get_user_by_email(self,db: Session, email: str) -> schemas.User: def get_user_by_email(self,db: Session, email: str) -> schemas.User:
return super().get_by_conditions(db,{"email":email}).first() return db.execute(super().get_by_conditions({"email":email})).scalars().first()
def get_users(self,db: Session) -> ApiReturnPage[models.Base]: def get_users(self,db: Session) -> ApiReturnPage[models.Base]:
return paginate(super().get_all(db)) return paginate(db,super().get_all())
def get_users_not_admin(self,db: Session) -> ApiReturnPage[models.Base]: def get_users_not_admin(self,db: Session) -> ApiReturnPage[models.Base]:
return paginate(super().get_by_conditions(db,{"is_superuser":False})) return paginate(db,super().get_by_conditions({"is_superuser":False}))
def create_user(self,db: Session, user: schemas.UserCreate,userid:int): def create_user(self,db: Session, user: schemas.UserCreate,userid:int):
hashed_password = get_password_hash(user.password) hashed_password = get_password_hash(user.password)
@@ -63,7 +63,7 @@ class dbuser(crudbase):
return dbrole.get_all(db).all() return dbrole.get_all(db).all()
def get_roles_by_level(self,db: Session,level:int) -> t.List[schemas.RoleBase]: def get_roles_by_level(self,db: Session,level:int) -> t.List[schemas.RoleBase]:
return dbrole.get_by_conditions(db,{"level":{"operator":">=","value":level}}).all() return db.execute(dbrole.get_by_conditions({"level":{"operator":">=","value":level}})).scalars().all()
def assign_userrole(self,db: Session, user_id: int, roles: t.List[int]): def assign_userrole(self,db: Session, user_id: int, roles: t.List[int]):
db_user = super().get(db,user_id) db_user = super().get(db,user_id)

View File

@@ -9,6 +9,11 @@ from app.db.session import Base, get_db
from app.db import models,schemas from app.db import models,schemas
from app.main import app from app.main import app
from app.core import security
import jwt
SQLALCHEMY_DATABASE_URI = f"postgresql+psycopg2://kabAdmin:P%40ssw0rd!@kintonetooldb.postgres.database.azure.com/test" SQLALCHEMY_DATABASE_URI = f"postgresql+psycopg2://kabAdmin:P%40ssw0rd!@kintonetooldb.postgres.database.azure.com/test"
engine = create_engine(SQLALCHEMY_DATABASE_URI,echo=True) engine = create_engine(SQLALCHEMY_DATABASE_URI,echo=True)
@@ -100,6 +105,38 @@ def login_admin(test_db,test_client,admin,password):
response = test_client.post("/api/token", data={"username": admin.email, "password":password }) response = test_client.post("/api/token", data={"username": admin.email, "password":password })
return response.json()["access_token"] return response.json()["access_token"]
@pytest.fixture(scope="session")
def login_user_id(login_user):
payload = jwt.decode(login_user, security.SECRET_KEY, algorithms=[security.ALGORITHM])
id = payload.get("sub")
return id
@pytest.fixture(scope="session")
def login_admin_id(login_admin):
payload = jwt.decode(login_admin, security.SECRET_KEY, algorithms=[security.ALGORITHM])
id = payload.get("sub")
return id
@pytest.fixture(scope="session")
def test_domain(test_db,login_user_id):
domain = models.Domain(
tenantid = "1",
name = "テスト環境",
url = "https://alicorn.cybozu.com",
kintoneuser = "maxz",
kintonepwd = security.chacha20Encrypt("maxz1205"),
is_active = True,
createuserid =login_user_id,
updateuserid =login_user_id,
ownerid = login_user_id
)
test_db.add(domain)
test_db.commit()
test_db.refresh(domain)
return domain
# @pytest.fixture # @pytest.fixture
# def test_password() -> str: # def test_password() -> str:
# return "securepassword" # return "securepassword"

View File

@@ -0,0 +1,70 @@
def test_get_domains(test_client,test_domain,login_user):
response = test_client.get("/api/domains",headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
data = response.json()
assert len(data["data"]) == 1
assert data["data"][0]["name"] == test_domain.name
def test_create_domain(test_client, login_user,login_user_id):
create_domain ={
"id": 0,
"tenantid": "1",
"name": "abc",
"url": "efg",
"kintoneuser": "eee",
"kintonepwd": "fff",
"is_active": True,
}
response = test_client.post("/api/domain", json=create_domain,headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
data = response.json()
assert "data" in data
assert data["data"] is not None
assert data["data"]["name"] == create_domain["name"]
assert data["data"]["url"] == create_domain["url"]
assert data["data"]["kintoneuser"] == create_domain["kintoneuser"]
assert data["data"]["is_active"] == create_domain["is_active"]
assert data["data"]["owner"]["id"] == login_user_id
def test_edit_domain(test_client, test_domain, login_user):
update_domain ={
"id": test_domain.id,
"tenantid": "1",
"name": "テスト環境abc",
"url": test_domain.url,
"kintoneuser": test_domain.kintoneuser,
"is_active": True
}
response = test_client.put("/api/domain", json=update_domain,headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
data = response.json()
assert data["data"]["name"] == update_domain["name"]
assert data["data"]["name"] == update_domain["name"]
assert data["data"]["url"] == update_domain["url"]
assert data["data"]["kintoneuser"] == update_domain["kintoneuser"]
assert data["data"]["is_active"] == update_domain["is_active"]
def test_delete_domain(test_client, login_user):
delete_domain ={
"id": 0,
"tenantid": "1",
"name": "delete",
"url": "delete",
"kintoneuser": "delete",
"kintonepwd": "delete",
"is_active": True,
}
response = test_client.post("/api/domain", json=delete_domain,headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
data = response.json()
assert "data" in data
assert data["data"] is not None
id = data["data"]["id"]
response = test_client.delete("/api/domain/"+ str(id),headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
assert response.json()["data"]["name"] == delete_domain["name"]
response = test_client.get("/api/domain/"+ str(id), headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
assert "data" not in response.json()

View File

@@ -1,9 +1,4 @@
from fastapi import security
import jwt
def test_users_list(test_client,login_user): def test_users_list(test_client,login_user):
response = test_client.get("/api/v1/users", headers={"Authorization": "Bearer " + login_user}) response = test_client.get("/api/v1/users", headers={"Authorization": "Bearer " + login_user})
@@ -32,6 +27,7 @@ def test_user_create(test_client,login_user):
data = response.json() data = response.json()
assert "data" in data assert "data" in data
assert data["data"] is not None assert data["data"] is not None
assert data["data"]["id"] > 0
assert data["data"]["email"] == user_data["email"] assert data["data"]["email"] == user_data["email"]
assert data["data"]["first_name"] == user_data["first_name"] assert data["data"]["first_name"] == user_data["first_name"]
assert data["data"]["last_name"] == user_data["last_name"] assert data["data"]["last_name"] == user_data["last_name"]
@@ -66,20 +62,60 @@ def test_admin_create_for_admin(test_client,login_admin):
data = response.json() data = response.json()
assert "data" in data assert "data" in data
assert data["data"] is not None assert data["data"] is not None
assert data["data"]["id"] > 0
assert data["data"]["email"] == user_data["email"] assert data["data"]["email"] == user_data["email"]
assert data["data"]["first_name"] == user_data["first_name"] assert data["data"]["first_name"] == user_data["first_name"]
assert data["data"]["last_name"] == user_data["last_name"] assert data["data"]["last_name"] == user_data["last_name"]
assert data["data"]["is_active"] == user_data["is_active"] assert data["data"]["is_active"] == user_data["is_active"]
assert data["data"]["is_superuser"] == user_data["is_superuser"] assert data["data"]["is_superuser"] == user_data["is_superuser"]
def test_user_details(test_client, login_user,user): def test_user_details(test_client, login_user_id, login_user,user):
payload = jwt.decode(login_user, security.SECRET_KEY, algorithms=[security.ALGORITHM]) id = login_user_id
id = payload.get("sub") response = test_client.get("/api/v1/users/"+ str(id), headers={"Authorization": "Bearer " + login_user})
response = test_client.get("/api/v1/users/"+ id)
assert response.status_code == 200 assert response.status_code == 200
assert response.json()["data"]["email"] ==user["email"] data = response.json()
assert data["data"]["first_name"] == user["first_name"] assert data["data"]["email"] ==user.email
assert data["data"]["last_name"] == user["last_name"] assert data["data"]["first_name"] == user.first_name
assert data["data"]["is_active"] == user["is_active"] assert data["data"]["last_name"] == user.last_name
assert data["data"]["is_superuser"] == user["is_superuser"] assert data["data"]["is_active"] == user.is_active
assert response.json()["data"]["id"] == id assert data["data"]["is_superuser"] == user.is_superuser
assert data["data"]["id"] == id
def test_user_edit(test_client, login_user_id,login_user,user):
id = login_user_id
user_data = {
"email": user.email,
"first_name": "Updated",
"last_name": "test",
"is_active": True,
"is_superuser": False
}
response = test_client.put("/api/v1/users/" + str(id), json=user_data, headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
data = response.json()
assert data["data"]["email"] == user.email
assert data["data"]["first_name"] == user_data["first_name"]
assert data["data"]["last_name"] == user_data["last_name"]
assert data["data"]["is_active"] == user.is_active
assert data["data"]["id"] == id
def test_user_delete(test_client, login_user):
user_data = {
"email": "delete@example.com",
"password": "password123",
"first_name": "delete",
"last_name": "User",
"is_active": True,
"is_superuser": False
}
response = test_client.post("/api/v1/users", json=user_data, headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
data = response.json()
id = data["data"]["id"]
response = test_client.delete("/api/v1/users/"+ str(id),headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
assert response.json()["data"]["email"] == "delete@example.com"
response = test_client.get("/api/v1/users/"+ str(id), headers={"Authorization": "Bearer " + login_user})
assert response.status_code == 200
assert "data" not in response.json()