From 3aec0759274f15f9532cf7c18c6a13a62b44e561 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=20=E6=9F=8F?= Date: Sat, 7 Dec 2024 21:37:33 +0900 Subject: [PATCH] SQLAlchemy 1.0->SQLAlchemy 2.x --- backend/app/api/api_v1/routers/platform.py | 18 +++++- backend/app/db/cruddb/crudbase.py | 24 ++++---- backend/app/db/cruddb/dbdomain.py | 16 ++--- backend/app/db/cruddb/dbuser.py | 8 +-- backend/app/tests/conftest.py | 37 ++++++++++++ backend/app/tests/test_domain.py | 70 ++++++++++++++++++++++ backend/app/tests/test_user.py | 68 ++++++++++++++++----- 7 files changed, 200 insertions(+), 41 deletions(-) diff --git a/backend/app/api/api_v1/routers/platform.py b/backend/app/api/api_v1/routers/platform.py index 1d712fd..9c3a76e 100644 --- a/backend/app/api/api_v1/routers/platform.py +++ b/backend/app/api/api_v1/routers/platform.py @@ -281,7 +281,7 @@ async def flow_delete( response_model=ApiReturnPage[Domain], response_model_exclude_none=True, ) -async def domain_details( +async def domain_list( request: Request, user=Depends(get_current_active_user), db=Depends(get_db), @@ -295,6 +295,22 @@ async def domain_details( except Exception as e: raise APIException('platform:domains',request.url._url,f"Error occurred while get domains:",e) +@r.get( + "/domain/{domain_id}",tags=["Domain"], + response_model=ApiReturnModel[Domain|None], + response_model_exclude_none=True, +) +async def domain_detail( + request: Request, + domain_id:int, + user=Depends(get_current_active_user), + db=Depends(get_db), +): + try: + return ApiReturnModel(data = dbdomain.get(db,domain_id)) + except Exception as e: + raise APIException('platform:domain',request.url._url,f"Error occurred while get domain detail:",e) + @r.post("/domain", tags=["Domain"], response_model=ApiReturnModel[Domain], response_model_exclude_none=True) diff --git a/backend/app/db/cruddb/crudbase.py b/backend/app/db/cruddb/crudbase.py index f7b05ce..d3f1ecb 100644 --- a/backend/app/db/cruddb/crudbase.py +++ b/backend/app/db/cruddb/crudbase.py @@ -1,4 +1,4 @@ -from sqlalchemy import asc, desc +from sqlalchemy import asc, desc, select from sqlalchemy.orm import Session from sqlalchemy.orm.query import Query from typing import Type, List, Optional @@ -46,11 +46,12 @@ class crudbase: and_conditions.append(column == value) if and_conditions: - query = query.filter(*and_conditions) + query = query.where(and_(*and_conditions)) if or_conditions: - query = query.filter(or_(*or_conditions)) - return query + query = query.where(or_(*or_conditions)) + return query + def _apply_sorting(self, query: Query, sort_by: Optional[str], sort_order: Optional[str]) -> Query: if sort_by: column = getattr(self.model, sort_by, None) @@ -61,12 +62,11 @@ class crudbase: query = query.order_by(asc(column)) return query - def get_all(self, db: Session) -> Query: - return db.query(self.model) + def get_all(self) -> Query: + return select(self.model) - def get(self, db: Session, item_id: int) -> Optional[models.Base]: - return db.query(self.model).get(item_id) + return db.execute(select(self.model).filter(self.model.id == item_id)).scalar_one_or_none() def create(self, db: Session, obj_in: BaseModel) -> models.Base: db_obj = self.model(**obj_in.model_dump()) @@ -76,7 +76,7 @@ class crudbase: return db_obj def update(self, db: Session, item_id: int, obj_in: BaseModel) -> Optional[models.Base]: - db_obj = db.query(self.model).filter(self.model.id == item_id).first() + db_obj = self.get(db,item_id) if db_obj: for key, value in obj_in.model_dump(exclude_unset=True).items(): setattr(db_obj, key, value) @@ -86,16 +86,16 @@ class crudbase: return None def delete(self, db: Session, item_id: int) -> Optional[models.Base]: - db_obj = db.query(self.model).get(item_id) + db_obj = self.get(db,item_id) if db_obj: db.delete(db_obj) db.commit() return db_obj return None - def get_by_conditions(self, db: Session, filters: Optional[dict] = None, sort_by: Optional[str] = None, + def get_by_conditions(self, filters: Optional[dict] = None, sort_by: Optional[str] = None, sort_order: Optional[str] = "asc") -> Query: - query = db.query(self.model) + query = select(self.model) if filters: query = self._apply_filters(query, filters) if sort_by: diff --git a/backend/app/db/cruddb/dbdomain.py b/backend/app/db/cruddb/dbdomain.py index 2acf05f..b6f017c 100644 --- a/backend/app/db/cruddb/dbdomain.py +++ b/backend/app/db/cruddb/dbdomain.py @@ -16,17 +16,17 @@ class dbuserdomain(crudbase): super().__init__(model=models.UserDomain) def get_userdomain(self,db: Session,userid:int,domainid:int): - return super().get_by_conditions(db,{"userid":userid,"domainid":domainid}).first() + return db.execute(super().get_by_conditions({"userid":userid,"domainid":domainid})).scalars().first() def get_userdomain_by_domainid(self,db: Session,ownerid:int,domainid:int): - return super().get_by_conditions(db,{"domainid":domainid}) + return super().get_by_conditions({"domainid":domainid}) def get_default_domains(self,db: Session,domainid:int): - return super().get_by_conditions(db,{"domainid":domainid,"is_default":True}).all() + return db.execute(super().get_by_conditions({"domainid":domainid,"is_default":True})).scalars().all() def get_user_default_domain(self,db: Session,userid:int): - return super().get_by_conditions(db,{"userid":userid,"is_default":True}).first() + return db.execute(super().get_by_conditions({"userid":userid,"is_default":True})).scalars().first() dbuserdomain = dbuserdomain() @@ -36,10 +36,10 @@ class dbdomain(crudbase): super().__init__(model=models.Domain) def get_domains(self,db: Session)-> ApiReturnPage[models.Base]: - return paginate(super().get_all(db)) + return paginate(db,super().get_all()) def get_domains_by_owner(self,db: Session,ownerid:int)-> ApiReturnPage[models.Base]: - return paginate( super().get_by_conditions(db,{"ownerid":ownerid})) + return paginate(db,super().get_by_conditions({"ownerid":ownerid})) def create_domain(self,db: Session, domain: schemas.DomainIn,userid:int): #db_domain = super().get_by_conditions(db,{"url":domain.url,"kintoneuser":domain.kintoneuser,"onwerid":userid}).first() @@ -79,8 +79,8 @@ class dbdomain(crudbase): return None def add_userdomain(self,db: Session,ownerid:int,userid:int,domainid:int) -> schemas.DomainOut: - db_domain = super().get_by_conditions(db,{"id":domainid,"is_active":True}).first() - if db_domain: + db_domain = super().get(db,domainid) + if db_domain and db_domain.is_active: db_userdomain = dbuserdomain.get_userdomain(db,userid,domainid) if not db_userdomain: user_domain = models.UserDomain(userid = userid, domainid = domainid ,createuserid = ownerid,updateuserid = ownerid) diff --git a/backend/app/db/cruddb/dbuser.py b/backend/app/db/cruddb/dbuser.py index 41e128b..848eb81 100644 --- a/backend/app/db/cruddb/dbuser.py +++ b/backend/app/db/cruddb/dbuser.py @@ -32,13 +32,13 @@ class dbuser(crudbase): return super().get(db,user_id) def get_user_by_email(self,db: Session, email: str) -> schemas.User: - return super().get_by_conditions(db,{"email":email}).first() + return db.execute(super().get_by_conditions({"email":email})).scalars().first() def get_users(self,db: Session) -> ApiReturnPage[models.Base]: - return paginate(super().get_all(db)) + return paginate(db,super().get_all()) def get_users_not_admin(self,db: Session) -> ApiReturnPage[models.Base]: - return paginate(super().get_by_conditions(db,{"is_superuser":False})) + return paginate(db,super().get_by_conditions({"is_superuser":False})) def create_user(self,db: Session, user: schemas.UserCreate,userid:int): hashed_password = get_password_hash(user.password) @@ -63,7 +63,7 @@ class dbuser(crudbase): return dbrole.get_all(db).all() def get_roles_by_level(self,db: Session,level:int) -> t.List[schemas.RoleBase]: - return dbrole.get_by_conditions(db,{"level":{"operator":">=","value":level}}).all() + return db.execute(dbrole.get_by_conditions({"level":{"operator":">=","value":level}})).scalars().all() def assign_userrole(self,db: Session, user_id: int, roles: t.List[int]): db_user = super().get(db,user_id) diff --git a/backend/app/tests/conftest.py b/backend/app/tests/conftest.py index 1e053d1..cc909f9 100644 --- a/backend/app/tests/conftest.py +++ b/backend/app/tests/conftest.py @@ -9,6 +9,11 @@ from app.db.session import Base, get_db from app.db import models,schemas from app.main import app + +from app.core import security +import jwt + + SQLALCHEMY_DATABASE_URI = f"postgresql+psycopg2://kabAdmin:P%40ssw0rd!@kintonetooldb.postgres.database.azure.com/test" engine = create_engine(SQLALCHEMY_DATABASE_URI,echo=True) @@ -100,6 +105,38 @@ def login_admin(test_db,test_client,admin,password): response = test_client.post("/api/token", data={"username": admin.email, "password":password }) return response.json()["access_token"] +@pytest.fixture(scope="session") +def login_user_id(login_user): + payload = jwt.decode(login_user, security.SECRET_KEY, algorithms=[security.ALGORITHM]) + id = payload.get("sub") + return id + +@pytest.fixture(scope="session") +def login_admin_id(login_admin): + payload = jwt.decode(login_admin, security.SECRET_KEY, algorithms=[security.ALGORITHM]) + id = payload.get("sub") + return id + +@pytest.fixture(scope="session") +def test_domain(test_db,login_user_id): + domain = models.Domain( + tenantid = "1", + name = "テスト環境", + url = "https://alicorn.cybozu.com", + kintoneuser = "maxz", + kintonepwd = security.chacha20Encrypt("maxz1205"), + is_active = True, + createuserid =login_user_id, + updateuserid =login_user_id, + ownerid = login_user_id + ) + test_db.add(domain) + test_db.commit() + test_db.refresh(domain) + return domain + + + # @pytest.fixture # def test_password() -> str: # return "securepassword" diff --git a/backend/app/tests/test_domain.py b/backend/app/tests/test_domain.py index e69de29..17b317e 100644 --- a/backend/app/tests/test_domain.py +++ b/backend/app/tests/test_domain.py @@ -0,0 +1,70 @@ + +def test_get_domains(test_client,test_domain,login_user): + response = test_client.get("/api/domains",headers={"Authorization": "Bearer " + login_user}) + assert response.status_code == 200 + data = response.json() + assert len(data["data"]) == 1 + assert data["data"][0]["name"] == test_domain.name + +def test_create_domain(test_client, login_user,login_user_id): + create_domain ={ + "id": 0, + "tenantid": "1", + "name": "abc", + "url": "efg", + "kintoneuser": "eee", + "kintonepwd": "fff", + "is_active": True, + } + response = test_client.post("/api/domain", json=create_domain,headers={"Authorization": "Bearer " + login_user}) + assert response.status_code == 200 + data = response.json() + assert "data" in data + assert data["data"] is not None + assert data["data"]["name"] == create_domain["name"] + assert data["data"]["url"] == create_domain["url"] + assert data["data"]["kintoneuser"] == create_domain["kintoneuser"] + assert data["data"]["is_active"] == create_domain["is_active"] + assert data["data"]["owner"]["id"] == login_user_id + +def test_edit_domain(test_client, test_domain, login_user): + update_domain ={ + "id": test_domain.id, + "tenantid": "1", + "name": "テスト環境abc", + "url": test_domain.url, + "kintoneuser": test_domain.kintoneuser, + "is_active": True + } + response = test_client.put("/api/domain", json=update_domain,headers={"Authorization": "Bearer " + login_user}) + assert response.status_code == 200 + data = response.json() + assert data["data"]["name"] == update_domain["name"] + assert data["data"]["name"] == update_domain["name"] + assert data["data"]["url"] == update_domain["url"] + assert data["data"]["kintoneuser"] == update_domain["kintoneuser"] + assert data["data"]["is_active"] == update_domain["is_active"] + + +def test_delete_domain(test_client, login_user): + delete_domain ={ + "id": 0, + "tenantid": "1", + "name": "delete", + "url": "delete", + "kintoneuser": "delete", + "kintonepwd": "delete", + "is_active": True, + } + response = test_client.post("/api/domain", json=delete_domain,headers={"Authorization": "Bearer " + login_user}) + assert response.status_code == 200 + data = response.json() + assert "data" in data + assert data["data"] is not None + id = data["data"]["id"] + response = test_client.delete("/api/domain/"+ str(id),headers={"Authorization": "Bearer " + login_user}) + assert response.status_code == 200 + assert response.json()["data"]["name"] == delete_domain["name"] + response = test_client.get("/api/domain/"+ str(id), headers={"Authorization": "Bearer " + login_user}) + assert response.status_code == 200 + assert "data" not in response.json() \ No newline at end of file diff --git a/backend/app/tests/test_user.py b/backend/app/tests/test_user.py index ec608d6..e7fa2a4 100644 --- a/backend/app/tests/test_user.py +++ b/backend/app/tests/test_user.py @@ -1,9 +1,4 @@ - -from fastapi import security -import jwt - - def test_users_list(test_client,login_user): response = test_client.get("/api/v1/users", headers={"Authorization": "Bearer " + login_user}) @@ -32,6 +27,7 @@ def test_user_create(test_client,login_user): data = response.json() assert "data" in data assert data["data"] is not None + assert data["data"]["id"] > 0 assert data["data"]["email"] == user_data["email"] assert data["data"]["first_name"] == user_data["first_name"] assert data["data"]["last_name"] == user_data["last_name"] @@ -66,20 +62,60 @@ def test_admin_create_for_admin(test_client,login_admin): data = response.json() assert "data" in data assert data["data"] is not None + assert data["data"]["id"] > 0 assert data["data"]["email"] == user_data["email"] assert data["data"]["first_name"] == user_data["first_name"] assert data["data"]["last_name"] == user_data["last_name"] assert data["data"]["is_active"] == user_data["is_active"] assert data["data"]["is_superuser"] == user_data["is_superuser"] - def test_user_details(test_client, login_user,user): - payload = jwt.decode(login_user, security.SECRET_KEY, algorithms=[security.ALGORITHM]) - id = payload.get("sub") - response = test_client.get("/api/v1/users/"+ id) - assert response.status_code == 200 - assert response.json()["data"]["email"] ==user["email"] - assert data["data"]["first_name"] == user["first_name"] - assert data["data"]["last_name"] == user["last_name"] - assert data["data"]["is_active"] == user["is_active"] - assert data["data"]["is_superuser"] == user["is_superuser"] - assert response.json()["data"]["id"] == id \ No newline at end of file +def test_user_details(test_client, login_user_id, login_user,user): + id = login_user_id + response = test_client.get("/api/v1/users/"+ str(id), headers={"Authorization": "Bearer " + login_user}) + assert response.status_code == 200 + data = response.json() + assert data["data"]["email"] ==user.email + assert data["data"]["first_name"] == user.first_name + assert data["data"]["last_name"] == user.last_name + assert data["data"]["is_active"] == user.is_active + assert data["data"]["is_superuser"] == user.is_superuser + assert data["data"]["id"] == id + +def test_user_edit(test_client, login_user_id,login_user,user): + id = login_user_id + user_data = { + "email": user.email, + "first_name": "Updated", + "last_name": "test", + "is_active": True, + "is_superuser": False + } + response = test_client.put("/api/v1/users/" + str(id), json=user_data, headers={"Authorization": "Bearer " + login_user}) + + assert response.status_code == 200 + data = response.json() + assert data["data"]["email"] == user.email + assert data["data"]["first_name"] == user_data["first_name"] + assert data["data"]["last_name"] == user_data["last_name"] + assert data["data"]["is_active"] == user.is_active + assert data["data"]["id"] == id + +def test_user_delete(test_client, login_user): + user_data = { + "email": "delete@example.com", + "password": "password123", + "first_name": "delete", + "last_name": "User", + "is_active": True, + "is_superuser": False + } + response = test_client.post("/api/v1/users", json=user_data, headers={"Authorization": "Bearer " + login_user}) + assert response.status_code == 200 + data = response.json() + id = data["data"]["id"] + response = test_client.delete("/api/v1/users/"+ str(id),headers={"Authorization": "Bearer " + login_user}) + assert response.status_code == 200 + assert response.json()["data"]["email"] == "delete@example.com" + response = test_client.get("/api/v1/users/"+ str(id), headers={"Authorization": "Bearer " + login_user}) + assert response.status_code == 200 + assert "data" not in response.json() \ No newline at end of file