From 2823364148bacb796974ffee9c391c71b4e0b59b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=B9=20=E6=9F=8F?= Date: Sat, 14 Dec 2024 10:32:32 +0900 Subject: [PATCH] add manage domain function --- backend/app/api/api_v1/routers/platform.py | 78 +++++++++++++++++--- backend/app/db/cruddb/dbdomain.py | 86 ++++++++++++++++++++-- backend/app/db/models.py | 12 +++ backend/app/db/schemas.py | 4 + backend/app/tests/conftest.py | 7 +- backend/app/tests/test_domain.py | 78 ++++++++++++++++++-- backend/app/tests/test_user.py | 4 +- 7 files changed, 242 insertions(+), 27 deletions(-) diff --git a/backend/app/api/api_v1/routers/platform.py b/backend/app/api/api_v1/routers/platform.py index aa28460..fdc7d0f 100644 --- a/backend/app/api/api_v1/routers/platform.py +++ b/backend/app/api/api_v1/routers/platform.py @@ -347,7 +347,7 @@ async def domain_list( if user.is_superuser: domains = domainService.get_domains(db) else: - domains = domainService.get_domains_by_owner(db,user.id) + domains = domainService.get_domains_by_manage(db,user.id) return domains except Exception as e: raise APIException('platform:domains',request.url._url,f"Error occurred while get domains:",e) @@ -416,7 +416,7 @@ async def domain_delete( try: return ApiReturnModel(data = domainService.delete_domain(db,id)) except Exception as e: - raise APIException('platform:domain',request.url._url,f"Error occurred while delete domain:",e) + raise APIException('platform:domain',request.url._url,f"Error occurred while delete domain({id}):",e) @r.get( "/domain", @@ -433,28 +433,29 @@ async def userdomain_details( domains = get_domain(db, userId if userId is not None else user.id) return domains except Exception as e: - raise APIException('platform:domain',request.url._url,f"Error occurred while get user({user.id}) domain:",e) + raise APIException('platform:userdomain',request.url._url,f"Error occurred while get user({user.id}) domain:",e) @r.post( - "/domain/{userid}",tags=["Domain"], + "/userdomain",tags=["Domain"], response_model=ApiReturnModel[DomainOut|None], response_model_exclude_none=True, ) async def create_userdomain( request: Request, - userid: int, - domainid:int , + userdomain:UserDomainParam, user=Depends(get_current_active_user), db=Depends(get_db), ): try: + userid = userdomain.userid + domainid = userdomain.domainid if user.is_superuser: domain = domainService.add_userdomain(db,user.id,userid,domainid) else: domain = domainService.add_userdomain_by_owner(db,user.id,userid,domainid) return ApiReturnModel(data = domain) except Exception as e: - raise APIException('platform:domain',request.url._url,f"Error occurred while add user({userid}) domain:",e) + raise APIException('platform:userdomain',request.url._url,f"Error occurred while add user({userid}) domain({domainid}):",e) @r.delete( "/domain/{domainid}/{userid}",tags=["Domain"], @@ -471,9 +472,68 @@ async def delete_userdomain( try: return ApiReturnModel(data = domainService.delete_userdomain(db,userid,domainid)) except Exception as e: - raise APIException('platform:delete',request.url._url,f"Error occurred while delete user({userid}) domain:",e) + raise APIException('platform:userdomain',request.url._url,f"Error occurred while delete user({userid}) domain({domainid}):",e) + +@r.get( + "/managedomainuser/{domainid}",tags=["Domain"], + response_model=ApiReturnPage[UserOut|None], + response_model_exclude_none=True, +) +async def get_managedomainuser( + request: Request, + domainid:int, + user=Depends(get_current_active_user), + db=Depends(get_db), +): + try: + return domainService.get_managedomain_users(db,domainid) + except Exception as e: + raise APIException('platform:managedomain',request.url._url,f"Error occurred while get managedomain({user.id}) user:",e) + + + + +@r.post( + "/managedomain",tags=["Domain"], + response_model=ApiReturnModel[DomainOut|None], + response_model_exclude_none=True, +) +async def create_managedomain( + request: Request, + userdomain:UserDomainParam, + user=Depends(get_current_active_user), + db=Depends(get_db), +): + try: + userid = userdomain.userid + domainid = userdomain.domainid + if user.is_superuser: + domain = domainService.add_managedomain(db,user.id,userid,domainid) + else: + domain = domainService.add_managedomain_by_owner(db,user.id,userid,domainid) + return ApiReturnModel(data = domain) + except Exception as e: + raise APIException('platform:managedomain',request.url._url,f"Error occurred while add manage({userid}) domain({domainid}):",e) + +@r.delete( + "/managedomain/{domainid}/{userid}",tags=["Domain"], + response_model=ApiReturnModel[DomainOut|None], + response_model_exclude_none=True, +) +async def delete_managedomain( + request: Request, + domainid:int, + userid: int, + user=Depends(get_current_active_user), + db=Depends(get_db), +): + try: + return ApiReturnModel(data = domainService.delete_managedomain(db,userid,domainid)) + except Exception as e: + raise APIException('platform:managedomain',request.url._url,f"Error occurred while delete managedomain({userid}) domain({domainid}):",e) + @r.get( "/defaultdomain",tags=["Domain"], response_model=ApiReturnModel[DomainOut|None], @@ -519,7 +579,7 @@ async def get_domainshareduser( db=Depends(get_db), ): try: - return domainService.get_shareddomain_users(db,user.id,domainid) + return domainService.get_shareddomain_users(db,domainid) except Exception as e: raise APIException('platform:sharedomain',request.url._url,f"Error occurred while get user({user.id}) sharedomain:",e) diff --git a/backend/app/db/cruddb/dbdomain.py b/backend/app/db/cruddb/dbdomain.py index 36e969d..b16ff04 100644 --- a/backend/app/db/cruddb/dbdomain.py +++ b/backend/app/db/cruddb/dbdomain.py @@ -31,6 +31,18 @@ class dbuserdomain(crudbase): dbuserdomain = dbuserdomain() +class dbmanagedomain(crudbase): + def __init__(self): + super().__init__(model=models.ManageDomain) + + def get_managedomain(self,db: Session,userid:int,domainid:int): + return db.execute(super().get_by_conditions({"userid":userid,"domainid":domainid})).scalars().first() + + def get_managedomain_by_domain(self,db: Session,domainid:int): + return db.execute(super().get_by_conditions({"domainid":domainid})).scalars().all() + +dbmanagedomain = dbmanagedomain() + class dbdomain(crudbase): def __init__(self): super().__init__(model=models.Domain) @@ -38,6 +50,10 @@ class dbdomain(crudbase): def get_domains(self,db: Session)-> ApiReturnPage[models.Base]: return paginate(db,super().get_all()) + def get_domains_by_manage(self,db: Session,userid:int)-> ApiReturnPage[models.Base]: + query = select(models.Domain).join(models.ManageDomain,models.ManageDomain.domainid == models.Domain.id).where(models.ManageDomain.userid == userid) + return paginate(db,query) + def get_domains_by_owner(self,db: Session,ownerid:int)-> ApiReturnPage[models.Base]: return paginate(db,super().get_by_conditions({"ownerid":ownerid})) @@ -45,14 +61,31 @@ class dbdomain(crudbase): #db_domain = super().get_by_conditions(db,{"url":domain.url,"kintoneuser":domain.kintoneuser,"onwerid":userid}).first() #if not db_domain: domain.encrypt_kintonepwd() - domain.id = None - domain.createuserid = userid - domain.updateuserid = userid - domain.ownerid = userid - return super().create(db,domain) - #return db_domain + db_domain = models.Domain( + tenantid = domain.tenantid, + name = domain.name, + url = domain.url, + kintoneuser = domain.kintoneuser, + kintonepwd = domain.kintonepwd, + is_active = domain.is_active, + createuserid = userid, + updateuserid = userid, + ownerid = userid + ) + db.add(db_domain) + db.flush() + user_domain = models.UserDomain(userid = userid, domainid = db_domain.id ,createuserid = userid,updateuserid = userid) + db.add(user_domain) + manage_domain = models.ManageDomain(userid = userid, domainid = db_domain.id ,createuserid = userid,updateuserid = userid) + db.add(manage_domain) + db.commit() + db.refresh(db_domain) + return db_domain def delete_domain(self,db: Session,id: int): + db_managedomains = dbmanagedomain.get_managedomain_by_domain(db,id) + for manage in db_managedomains: + db.delete(manage) return super().delete(db,id) def edit_domain(self,db: Session, domain: schemas.DomainIn,userid:int) -> schemas.DomainOut: @@ -90,7 +123,7 @@ class dbdomain(crudbase): return None def add_userdomain_by_owner(self,db: Session,ownerid:int, userid:int,domainid:int) -> schemas.DomainOut: - db_domain = super().get_by_conditions(db,{"id":domainid,"ownerid":ownerid,"is_active":True}).first() + db_domain = db.execute(super().get_by_conditions({"id":domainid,"is_active":True})).scalars().first() if db_domain: db_userdomain = dbuserdomain.get_userdomain(db,userid,domainid) if not db_userdomain: @@ -141,8 +174,45 @@ class dbdomain(crudbase): return None - def get_shareddomain_users(self,db: Session,ownerid:int,domainid: int) -> ApiReturnPage[models.Base]: + def get_shareddomain_users(self,db: Session,domainid: int) -> ApiReturnPage[models.Base]: users = select(models.User).join(models.UserDomain,models.UserDomain.userid == models.User.id).filter(models.UserDomain.domainid ==domainid) return paginate(db,users) + + def add_managedomain(self,db: Session,ownerid:int,userid:int,domainid:int) -> schemas.DomainOut: + db_domain = self.get_domains_by_owner + if db_domain: + db_managedomain = dbmanagedomain.get_managedomain(db,userid,domainid) + if not db_managedomain: + manage_domain = models.ManageDomain(userid = userid, domainid = domainid ,createuserid = ownerid,updateuserid = ownerid) + db.add(manage_domain) + db.commit() + return db_domain + return None + + def add_managedomain_by_owner(self,db: Session,ownerid:int, userid:int,domainid:int) -> schemas.DomainOut: + db_domain = db.execute(super().get_by_conditions({"id":domainid,"ownerid":ownerid,})).scalars().first() + if db_domain: + db_managedomain = dbmanagedomain.get_managedomain(db,userid,domainid) + if not db_managedomain: + manage_domain = models.ManageDomain(userid = userid, domainid = domainid ,createuserid =ownerid,updateuserid = ownerid) + db.add(manage_domain) + db.commit() + return db_domain + return None + + def delete_managedomain(self,db: Session, userid: int,domainid: int) -> schemas.DomainOut: + db_managedomain = dbmanagedomain.get_managedomain(db,userid,domainid) + if db_managedomain: + domain = db_managedomain.domain + if domain.ownerid != userid: + db.delete(db_managedomain) + db.commit() + return domain + return None + + def get_managedomain_users(self,db: Session,domainid: int) -> ApiReturnPage[models.Base]: + users = select(models.User).join(models.ManageDomain,models.ManageDomain.userid == models.User.id).where(models.ManageDomain.domainid ==domainid) + return paginate(db,users) + domainService = dbdomain() \ No newline at end of file diff --git a/backend/app/db/models.py b/backend/app/db/models.py index f53a6a6..7b997ff 100644 --- a/backend/app/db/models.py +++ b/backend/app/db/models.py @@ -183,6 +183,18 @@ class UserDomain(Base): createuser = relationship('User',foreign_keys=[createuserid]) updateuser = relationship('User',foreign_keys=[updateuserid]) +class ManageDomain(Base): + __tablename__ = "managedomain" + + userid = mapped_column(Integer,ForeignKey("user.id")) + domainid = mapped_column(Integer,ForeignKey("domain.id")) + createuserid = mapped_column(Integer,ForeignKey("user.id")) + updateuserid = mapped_column(Integer,ForeignKey("user.id")) + domain = relationship("Domain") + user = relationship("User",foreign_keys=[userid]) + createuser = relationship('User',foreign_keys=[createuserid]) + updateuser = relationship('User',foreign_keys=[updateuserid]) + class Event(Base): __tablename__ = "event" diff --git a/backend/app/db/schemas.py b/backend/app/db/schemas.py index 36f47cf..ca75ae1 100644 --- a/backend/app/db/schemas.py +++ b/backend/app/db/schemas.py @@ -192,6 +192,10 @@ class DomainOut(BaseModel): class ConfigDict: orm_mode = True +class UserDomainParam(BaseModel): + userid:int + domainid:int + class UserDomain(BaseModel): id: int is_default: bool diff --git a/backend/app/tests/conftest.py b/backend/app/tests/conftest.py index 030e51d..a989443 100644 --- a/backend/app/tests/conftest.py +++ b/backend/app/tests/conftest.py @@ -43,7 +43,7 @@ def test_client(test_db): yield test_client -@pytest.fixture(scope="function") +@pytest.fixture(scope="session") def test_user(test_db): password ="test" user = models.User( @@ -131,6 +131,11 @@ def test_domain(test_db,login_user_id): ownerid = login_user_id ) test_db.add(domain) + test_db.flush() + user_domain = models.UserDomain(userid = login_user_id, domainid = domain.id ,createuserid = login_user_id,updateuserid = login_user_id) + test_db.add(user_domain) + manage_domain = models.ManageDomain(userid = login_user_id, domainid = domain.id ,createuserid = login_user_id,updateuserid = login_user_id) + test_db.add(manage_domain) test_db.commit() test_db.refresh(domain) return domain diff --git a/backend/app/tests/test_domain.py b/backend/app/tests/test_domain.py index f923f3d..2562009 100644 --- a/backend/app/tests/test_domain.py +++ b/backend/app/tests/test_domain.py @@ -20,8 +20,9 @@ def test_create_domain(test_client, login_user,login_user_id): "is_active": True, } response = test_client.post("/api/domain", json=create_domain,headers={"Authorization": "Bearer " + login_user}) - assert response.status_code == 200 data = response.json() + logging.error(data) + assert response.status_code == 200 assert "data" in data assert data["data"] is not None assert data["data"]["name"] == create_domain["name"] @@ -29,8 +30,63 @@ def test_create_domain(test_client, login_user,login_user_id): assert data["data"]["kintoneuser"] == create_domain["kintoneuser"] assert data["data"]["is_active"] == create_domain["is_active"] assert data["data"]["owner"]["id"] == login_user_id - -def test_delete_domain(test_client, login_user): + + +def test_get_managedomainuser(test_client,test_domain,login_user,login_user_id): + response = test_client.get("/api/managedomainuser/" + str(test_domain.id),headers={"Authorization": "Bearer " + login_user}) + data = response.json() + logging.error(data) + assert response.status_code == 200 + assert "data" in data + assert len(data["data"]) == 1 + assert data["data"][0]["id"] == login_user_id + + +def test_add_delete_userdomain(test_client,test_domain,test_user,login_user,login_user_id): + userdomain ={ + "userid":test_user["id"], + "domainid":test_domain.id + } + response = test_client.post("/api/userdomain" , json=userdomain,headers={"Authorization": "Bearer " + login_user}) + data = response.json() + logging.error(data) + assert response.status_code == 200 + assert "data" in data + assert data["data"] is not None + assert data["data"]["name"] == test_domain.name + assert data["data"]["url"] == test_domain.url + response = test_client.delete(f"/api/domain/{test_domain.id}/{test_user["id"]}" , headers={"Authorization": "Bearer " + login_user}) + data = response.json() + logging.error(data) + assert "data" in data + assert data["data"] is not None + assert data["data"]["name"] == test_domain.name + assert data["data"]["url"] == test_domain.url + + +def test_add_delete_managedomain(test_client,test_domain,test_user,login_user,login_user_id): + userdomain ={ + "userid":test_user["id"], + "domainid":test_domain.id + } + response = test_client.post("/api/managedomain" , json=userdomain,headers={"Authorization": "Bearer " + login_user}) + data = response.json() + logging.error(data) + assert response.status_code == 200 + assert "data" in data + assert data["data"] is not None + assert data["data"]["name"] == test_domain.name + assert data["data"]["url"] == test_domain.url + response = test_client.delete(f"/api/managedomain/{test_domain.id}/{test_user["id"]}" , headers={"Authorization": "Bearer " + login_user}) + data = response.json() + logging.error(data) + assert "data" in data + assert data["data"] is not None + assert data["data"]["name"] == test_domain.name + assert data["data"]["url"] == test_domain.url + + +def test_delete_domain(test_client, login_user,login_user_id): delete_domain ={ "id": 0, "tenantid": "1", @@ -41,15 +97,22 @@ def test_delete_domain(test_client, login_user): "is_active": True, } response = test_client.post("/api/domain", json=delete_domain,headers={"Authorization": "Bearer " + login_user}) - assert response.status_code == 200 data = response.json() + assert response.status_code == 200 assert "data" in data assert data["data"] is not None id = data["data"]["id"] - response = test_client.delete("/api/domain/"+ str(id),headers={"Authorization": "Bearer " + login_user}) + + response = test_client.delete(f"/api/domain/{id}/{login_user_id}",headers={"Authorization": "Bearer " + login_user}) + data = response.json() + logging.error(data) + + response = test_client.delete(f"/api/domain/{id}",headers={"Authorization": "Bearer " + login_user}) + data = response.json() + logging.error(data) assert response.status_code == 200 assert response.json()["data"]["name"] == delete_domain["name"] - response = test_client.get("/api/domain/"+ str(id), headers={"Authorization": "Bearer " + login_user}) + response = test_client.get(f"/api/domain/{id}", headers={"Authorization": "Bearer " + login_user}) assert response.status_code == 200 assert "data" not in response.json() @@ -83,8 +146,9 @@ def test_get_defaultuserdomain(test_client, test_domain,login_user): def test_get_domainshareduser(test_client, test_domain,login_user,login_user_id): response = test_client.get("/api/domainshareduser/"+str(test_domain.id), headers={"Authorization": "Bearer " + login_user}) - assert response.status_code == 200 data = response.json() + logging.error(data) + assert response.status_code == 200 assert "data" in data assert data["data"] is not None assert len(data["data"]) == 1 diff --git a/backend/app/tests/test_user.py b/backend/app/tests/test_user.py index 83bdff0..2b2c142 100644 --- a/backend/app/tests/test_user.py +++ b/backend/app/tests/test_user.py @@ -16,7 +16,7 @@ def test_users_list_for_admin(test_client,login_admin): def test_user_create(test_client,login_user): user_data = { - "email": "newuser@example.com", + "email": "newuser1@example.com", "password": "password123", "first_name": "New", "last_name": "User", @@ -37,7 +37,7 @@ def test_user_create(test_client,login_user): def test_admin_create(test_client,login_user): user_data = { - "email": "newuser@example.com", + "email": "newuser2@example.com", "password": "password123", "first_name": "New", "last_name": "User",