add manage domain function
This commit is contained in:
@@ -347,7 +347,7 @@ async def domain_list(
|
|||||||
if user.is_superuser:
|
if user.is_superuser:
|
||||||
domains = domainService.get_domains(db)
|
domains = domainService.get_domains(db)
|
||||||
else:
|
else:
|
||||||
domains = domainService.get_domains_by_owner(db,user.id)
|
domains = domainService.get_domains_by_manage(db,user.id)
|
||||||
return domains
|
return domains
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise APIException('platform:domains',request.url._url,f"Error occurred while get domains:",e)
|
raise APIException('platform:domains',request.url._url,f"Error occurred while get domains:",e)
|
||||||
@@ -416,7 +416,7 @@ async def domain_delete(
|
|||||||
try:
|
try:
|
||||||
return ApiReturnModel(data = domainService.delete_domain(db,id))
|
return ApiReturnModel(data = domainService.delete_domain(db,id))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise APIException('platform:domain',request.url._url,f"Error occurred while delete domain:",e)
|
raise APIException('platform:domain',request.url._url,f"Error occurred while delete domain({id}):",e)
|
||||||
|
|
||||||
@r.get(
|
@r.get(
|
||||||
"/domain",
|
"/domain",
|
||||||
@@ -433,28 +433,29 @@ async def userdomain_details(
|
|||||||
domains = get_domain(db, userId if userId is not None else user.id)
|
domains = get_domain(db, userId if userId is not None else user.id)
|
||||||
return domains
|
return domains
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise APIException('platform:domain',request.url._url,f"Error occurred while get user({user.id}) domain:",e)
|
raise APIException('platform:userdomain',request.url._url,f"Error occurred while get user({user.id}) domain:",e)
|
||||||
|
|
||||||
@r.post(
|
@r.post(
|
||||||
"/domain/{userid}",tags=["Domain"],
|
"/userdomain",tags=["Domain"],
|
||||||
response_model=ApiReturnModel[DomainOut|None],
|
response_model=ApiReturnModel[DomainOut|None],
|
||||||
response_model_exclude_none=True,
|
response_model_exclude_none=True,
|
||||||
)
|
)
|
||||||
async def create_userdomain(
|
async def create_userdomain(
|
||||||
request: Request,
|
request: Request,
|
||||||
userid: int,
|
userdomain:UserDomainParam,
|
||||||
domainid:int ,
|
|
||||||
user=Depends(get_current_active_user),
|
user=Depends(get_current_active_user),
|
||||||
db=Depends(get_db),
|
db=Depends(get_db),
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
|
userid = userdomain.userid
|
||||||
|
domainid = userdomain.domainid
|
||||||
if user.is_superuser:
|
if user.is_superuser:
|
||||||
domain = domainService.add_userdomain(db,user.id,userid,domainid)
|
domain = domainService.add_userdomain(db,user.id,userid,domainid)
|
||||||
else:
|
else:
|
||||||
domain = domainService.add_userdomain_by_owner(db,user.id,userid,domainid)
|
domain = domainService.add_userdomain_by_owner(db,user.id,userid,domainid)
|
||||||
return ApiReturnModel(data = domain)
|
return ApiReturnModel(data = domain)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise APIException('platform:domain',request.url._url,f"Error occurred while add user({userid}) domain:",e)
|
raise APIException('platform:userdomain',request.url._url,f"Error occurred while add user({userid}) domain({domainid}):",e)
|
||||||
|
|
||||||
@r.delete(
|
@r.delete(
|
||||||
"/domain/{domainid}/{userid}",tags=["Domain"],
|
"/domain/{domainid}/{userid}",tags=["Domain"],
|
||||||
@@ -471,9 +472,68 @@ async def delete_userdomain(
|
|||||||
try:
|
try:
|
||||||
return ApiReturnModel(data = domainService.delete_userdomain(db,userid,domainid))
|
return ApiReturnModel(data = domainService.delete_userdomain(db,userid,domainid))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise APIException('platform:delete',request.url._url,f"Error occurred while delete user({userid}) domain:",e)
|
raise APIException('platform:userdomain',request.url._url,f"Error occurred while delete user({userid}) domain({domainid}):",e)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@r.get(
|
||||||
|
"/managedomainuser/{domainid}",tags=["Domain"],
|
||||||
|
response_model=ApiReturnPage[UserOut|None],
|
||||||
|
response_model_exclude_none=True,
|
||||||
|
)
|
||||||
|
async def get_managedomainuser(
|
||||||
|
request: Request,
|
||||||
|
domainid:int,
|
||||||
|
user=Depends(get_current_active_user),
|
||||||
|
db=Depends(get_db),
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
return domainService.get_managedomain_users(db,domainid)
|
||||||
|
except Exception as e:
|
||||||
|
raise APIException('platform:managedomain',request.url._url,f"Error occurred while get managedomain({user.id}) user:",e)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@r.post(
|
||||||
|
"/managedomain",tags=["Domain"],
|
||||||
|
response_model=ApiReturnModel[DomainOut|None],
|
||||||
|
response_model_exclude_none=True,
|
||||||
|
)
|
||||||
|
async def create_managedomain(
|
||||||
|
request: Request,
|
||||||
|
userdomain:UserDomainParam,
|
||||||
|
user=Depends(get_current_active_user),
|
||||||
|
db=Depends(get_db),
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
userid = userdomain.userid
|
||||||
|
domainid = userdomain.domainid
|
||||||
|
if user.is_superuser:
|
||||||
|
domain = domainService.add_managedomain(db,user.id,userid,domainid)
|
||||||
|
else:
|
||||||
|
domain = domainService.add_managedomain_by_owner(db,user.id,userid,domainid)
|
||||||
|
return ApiReturnModel(data = domain)
|
||||||
|
except Exception as e:
|
||||||
|
raise APIException('platform:managedomain',request.url._url,f"Error occurred while add manage({userid}) domain({domainid}):",e)
|
||||||
|
|
||||||
|
@r.delete(
|
||||||
|
"/managedomain/{domainid}/{userid}",tags=["Domain"],
|
||||||
|
response_model=ApiReturnModel[DomainOut|None],
|
||||||
|
response_model_exclude_none=True,
|
||||||
|
)
|
||||||
|
async def delete_managedomain(
|
||||||
|
request: Request,
|
||||||
|
domainid:int,
|
||||||
|
userid: int,
|
||||||
|
user=Depends(get_current_active_user),
|
||||||
|
db=Depends(get_db),
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
return ApiReturnModel(data = domainService.delete_managedomain(db,userid,domainid))
|
||||||
|
except Exception as e:
|
||||||
|
raise APIException('platform:managedomain',request.url._url,f"Error occurred while delete managedomain({userid}) domain({domainid}):",e)
|
||||||
|
|
||||||
@r.get(
|
@r.get(
|
||||||
"/defaultdomain",tags=["Domain"],
|
"/defaultdomain",tags=["Domain"],
|
||||||
response_model=ApiReturnModel[DomainOut|None],
|
response_model=ApiReturnModel[DomainOut|None],
|
||||||
@@ -519,7 +579,7 @@ async def get_domainshareduser(
|
|||||||
db=Depends(get_db),
|
db=Depends(get_db),
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
return domainService.get_shareddomain_users(db,user.id,domainid)
|
return domainService.get_shareddomain_users(db,domainid)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise APIException('platform:sharedomain',request.url._url,f"Error occurred while get user({user.id}) sharedomain:",e)
|
raise APIException('platform:sharedomain',request.url._url,f"Error occurred while get user({user.id}) sharedomain:",e)
|
||||||
|
|
||||||
|
|||||||
@@ -31,6 +31,18 @@ class dbuserdomain(crudbase):
|
|||||||
|
|
||||||
dbuserdomain = dbuserdomain()
|
dbuserdomain = dbuserdomain()
|
||||||
|
|
||||||
|
class dbmanagedomain(crudbase):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__(model=models.ManageDomain)
|
||||||
|
|
||||||
|
def get_managedomain(self,db: Session,userid:int,domainid:int):
|
||||||
|
return db.execute(super().get_by_conditions({"userid":userid,"domainid":domainid})).scalars().first()
|
||||||
|
|
||||||
|
def get_managedomain_by_domain(self,db: Session,domainid:int):
|
||||||
|
return db.execute(super().get_by_conditions({"domainid":domainid})).scalars().all()
|
||||||
|
|
||||||
|
dbmanagedomain = dbmanagedomain()
|
||||||
|
|
||||||
class dbdomain(crudbase):
|
class dbdomain(crudbase):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__(model=models.Domain)
|
super().__init__(model=models.Domain)
|
||||||
@@ -38,6 +50,10 @@ class dbdomain(crudbase):
|
|||||||
def get_domains(self,db: Session)-> ApiReturnPage[models.Base]:
|
def get_domains(self,db: Session)-> ApiReturnPage[models.Base]:
|
||||||
return paginate(db,super().get_all())
|
return paginate(db,super().get_all())
|
||||||
|
|
||||||
|
def get_domains_by_manage(self,db: Session,userid:int)-> ApiReturnPage[models.Base]:
|
||||||
|
query = select(models.Domain).join(models.ManageDomain,models.ManageDomain.domainid == models.Domain.id).where(models.ManageDomain.userid == userid)
|
||||||
|
return paginate(db,query)
|
||||||
|
|
||||||
def get_domains_by_owner(self,db: Session,ownerid:int)-> ApiReturnPage[models.Base]:
|
def get_domains_by_owner(self,db: Session,ownerid:int)-> ApiReturnPage[models.Base]:
|
||||||
return paginate(db,super().get_by_conditions({"ownerid":ownerid}))
|
return paginate(db,super().get_by_conditions({"ownerid":ownerid}))
|
||||||
|
|
||||||
@@ -45,14 +61,31 @@ class dbdomain(crudbase):
|
|||||||
#db_domain = super().get_by_conditions(db,{"url":domain.url,"kintoneuser":domain.kintoneuser,"onwerid":userid}).first()
|
#db_domain = super().get_by_conditions(db,{"url":domain.url,"kintoneuser":domain.kintoneuser,"onwerid":userid}).first()
|
||||||
#if not db_domain:
|
#if not db_domain:
|
||||||
domain.encrypt_kintonepwd()
|
domain.encrypt_kintonepwd()
|
||||||
domain.id = None
|
db_domain = models.Domain(
|
||||||
domain.createuserid = userid
|
tenantid = domain.tenantid,
|
||||||
domain.updateuserid = userid
|
name = domain.name,
|
||||||
domain.ownerid = userid
|
url = domain.url,
|
||||||
return super().create(db,domain)
|
kintoneuser = domain.kintoneuser,
|
||||||
#return db_domain
|
kintonepwd = domain.kintonepwd,
|
||||||
|
is_active = domain.is_active,
|
||||||
|
createuserid = userid,
|
||||||
|
updateuserid = userid,
|
||||||
|
ownerid = userid
|
||||||
|
)
|
||||||
|
db.add(db_domain)
|
||||||
|
db.flush()
|
||||||
|
user_domain = models.UserDomain(userid = userid, domainid = db_domain.id ,createuserid = userid,updateuserid = userid)
|
||||||
|
db.add(user_domain)
|
||||||
|
manage_domain = models.ManageDomain(userid = userid, domainid = db_domain.id ,createuserid = userid,updateuserid = userid)
|
||||||
|
db.add(manage_domain)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(db_domain)
|
||||||
|
return db_domain
|
||||||
|
|
||||||
def delete_domain(self,db: Session,id: int):
|
def delete_domain(self,db: Session,id: int):
|
||||||
|
db_managedomains = dbmanagedomain.get_managedomain_by_domain(db,id)
|
||||||
|
for manage in db_managedomains:
|
||||||
|
db.delete(manage)
|
||||||
return super().delete(db,id)
|
return super().delete(db,id)
|
||||||
|
|
||||||
def edit_domain(self,db: Session, domain: schemas.DomainIn,userid:int) -> schemas.DomainOut:
|
def edit_domain(self,db: Session, domain: schemas.DomainIn,userid:int) -> schemas.DomainOut:
|
||||||
@@ -90,7 +123,7 @@ class dbdomain(crudbase):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def add_userdomain_by_owner(self,db: Session,ownerid:int, userid:int,domainid:int) -> schemas.DomainOut:
|
def add_userdomain_by_owner(self,db: Session,ownerid:int, userid:int,domainid:int) -> schemas.DomainOut:
|
||||||
db_domain = super().get_by_conditions(db,{"id":domainid,"ownerid":ownerid,"is_active":True}).first()
|
db_domain = db.execute(super().get_by_conditions({"id":domainid,"is_active":True})).scalars().first()
|
||||||
if db_domain:
|
if db_domain:
|
||||||
db_userdomain = dbuserdomain.get_userdomain(db,userid,domainid)
|
db_userdomain = dbuserdomain.get_userdomain(db,userid,domainid)
|
||||||
if not db_userdomain:
|
if not db_userdomain:
|
||||||
@@ -141,8 +174,45 @@ class dbdomain(crudbase):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_shareddomain_users(self,db: Session,ownerid:int,domainid: int) -> ApiReturnPage[models.Base]:
|
def get_shareddomain_users(self,db: Session,domainid: int) -> ApiReturnPage[models.Base]:
|
||||||
users = select(models.User).join(models.UserDomain,models.UserDomain.userid == models.User.id).filter(models.UserDomain.domainid ==domainid)
|
users = select(models.User).join(models.UserDomain,models.UserDomain.userid == models.User.id).filter(models.UserDomain.domainid ==domainid)
|
||||||
return paginate(db,users)
|
return paginate(db,users)
|
||||||
|
|
||||||
|
|
||||||
|
def add_managedomain(self,db: Session,ownerid:int,userid:int,domainid:int) -> schemas.DomainOut:
|
||||||
|
db_domain = self.get_domains_by_owner
|
||||||
|
if db_domain:
|
||||||
|
db_managedomain = dbmanagedomain.get_managedomain(db,userid,domainid)
|
||||||
|
if not db_managedomain:
|
||||||
|
manage_domain = models.ManageDomain(userid = userid, domainid = domainid ,createuserid = ownerid,updateuserid = ownerid)
|
||||||
|
db.add(manage_domain)
|
||||||
|
db.commit()
|
||||||
|
return db_domain
|
||||||
|
return None
|
||||||
|
|
||||||
|
def add_managedomain_by_owner(self,db: Session,ownerid:int, userid:int,domainid:int) -> schemas.DomainOut:
|
||||||
|
db_domain = db.execute(super().get_by_conditions({"id":domainid,"ownerid":ownerid,})).scalars().first()
|
||||||
|
if db_domain:
|
||||||
|
db_managedomain = dbmanagedomain.get_managedomain(db,userid,domainid)
|
||||||
|
if not db_managedomain:
|
||||||
|
manage_domain = models.ManageDomain(userid = userid, domainid = domainid ,createuserid =ownerid,updateuserid = ownerid)
|
||||||
|
db.add(manage_domain)
|
||||||
|
db.commit()
|
||||||
|
return db_domain
|
||||||
|
return None
|
||||||
|
|
||||||
|
def delete_managedomain(self,db: Session, userid: int,domainid: int) -> schemas.DomainOut:
|
||||||
|
db_managedomain = dbmanagedomain.get_managedomain(db,userid,domainid)
|
||||||
|
if db_managedomain:
|
||||||
|
domain = db_managedomain.domain
|
||||||
|
if domain.ownerid != userid:
|
||||||
|
db.delete(db_managedomain)
|
||||||
|
db.commit()
|
||||||
|
return domain
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_managedomain_users(self,db: Session,domainid: int) -> ApiReturnPage[models.Base]:
|
||||||
|
users = select(models.User).join(models.ManageDomain,models.ManageDomain.userid == models.User.id).where(models.ManageDomain.domainid ==domainid)
|
||||||
|
return paginate(db,users)
|
||||||
|
|
||||||
domainService = dbdomain()
|
domainService = dbdomain()
|
||||||
@@ -183,6 +183,18 @@ class UserDomain(Base):
|
|||||||
createuser = relationship('User',foreign_keys=[createuserid])
|
createuser = relationship('User',foreign_keys=[createuserid])
|
||||||
updateuser = relationship('User',foreign_keys=[updateuserid])
|
updateuser = relationship('User',foreign_keys=[updateuserid])
|
||||||
|
|
||||||
|
class ManageDomain(Base):
|
||||||
|
__tablename__ = "managedomain"
|
||||||
|
|
||||||
|
userid = mapped_column(Integer,ForeignKey("user.id"))
|
||||||
|
domainid = mapped_column(Integer,ForeignKey("domain.id"))
|
||||||
|
createuserid = mapped_column(Integer,ForeignKey("user.id"))
|
||||||
|
updateuserid = mapped_column(Integer,ForeignKey("user.id"))
|
||||||
|
domain = relationship("Domain")
|
||||||
|
user = relationship("User",foreign_keys=[userid])
|
||||||
|
createuser = relationship('User',foreign_keys=[createuserid])
|
||||||
|
updateuser = relationship('User',foreign_keys=[updateuserid])
|
||||||
|
|
||||||
class Event(Base):
|
class Event(Base):
|
||||||
__tablename__ = "event"
|
__tablename__ = "event"
|
||||||
|
|
||||||
|
|||||||
@@ -192,6 +192,10 @@ class DomainOut(BaseModel):
|
|||||||
class ConfigDict:
|
class ConfigDict:
|
||||||
orm_mode = True
|
orm_mode = True
|
||||||
|
|
||||||
|
class UserDomainParam(BaseModel):
|
||||||
|
userid:int
|
||||||
|
domainid:int
|
||||||
|
|
||||||
class UserDomain(BaseModel):
|
class UserDomain(BaseModel):
|
||||||
id: int
|
id: int
|
||||||
is_default: bool
|
is_default: bool
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ def test_client(test_db):
|
|||||||
yield test_client
|
yield test_client
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="session")
|
||||||
def test_user(test_db):
|
def test_user(test_db):
|
||||||
password ="test"
|
password ="test"
|
||||||
user = models.User(
|
user = models.User(
|
||||||
@@ -131,6 +131,11 @@ def test_domain(test_db,login_user_id):
|
|||||||
ownerid = login_user_id
|
ownerid = login_user_id
|
||||||
)
|
)
|
||||||
test_db.add(domain)
|
test_db.add(domain)
|
||||||
|
test_db.flush()
|
||||||
|
user_domain = models.UserDomain(userid = login_user_id, domainid = domain.id ,createuserid = login_user_id,updateuserid = login_user_id)
|
||||||
|
test_db.add(user_domain)
|
||||||
|
manage_domain = models.ManageDomain(userid = login_user_id, domainid = domain.id ,createuserid = login_user_id,updateuserid = login_user_id)
|
||||||
|
test_db.add(manage_domain)
|
||||||
test_db.commit()
|
test_db.commit()
|
||||||
test_db.refresh(domain)
|
test_db.refresh(domain)
|
||||||
return domain
|
return domain
|
||||||
|
|||||||
@@ -20,8 +20,9 @@ def test_create_domain(test_client, login_user,login_user_id):
|
|||||||
"is_active": True,
|
"is_active": True,
|
||||||
}
|
}
|
||||||
response = test_client.post("/api/domain", json=create_domain,headers={"Authorization": "Bearer " + login_user})
|
response = test_client.post("/api/domain", json=create_domain,headers={"Authorization": "Bearer " + login_user})
|
||||||
assert response.status_code == 200
|
|
||||||
data = response.json()
|
data = response.json()
|
||||||
|
logging.error(data)
|
||||||
|
assert response.status_code == 200
|
||||||
assert "data" in data
|
assert "data" in data
|
||||||
assert data["data"] is not None
|
assert data["data"] is not None
|
||||||
assert data["data"]["name"] == create_domain["name"]
|
assert data["data"]["name"] == create_domain["name"]
|
||||||
@@ -29,8 +30,63 @@ def test_create_domain(test_client, login_user,login_user_id):
|
|||||||
assert data["data"]["kintoneuser"] == create_domain["kintoneuser"]
|
assert data["data"]["kintoneuser"] == create_domain["kintoneuser"]
|
||||||
assert data["data"]["is_active"] == create_domain["is_active"]
|
assert data["data"]["is_active"] == create_domain["is_active"]
|
||||||
assert data["data"]["owner"]["id"] == login_user_id
|
assert data["data"]["owner"]["id"] == login_user_id
|
||||||
|
|
||||||
def test_delete_domain(test_client, login_user):
|
|
||||||
|
def test_get_managedomainuser(test_client,test_domain,login_user,login_user_id):
|
||||||
|
response = test_client.get("/api/managedomainuser/" + str(test_domain.id),headers={"Authorization": "Bearer " + login_user})
|
||||||
|
data = response.json()
|
||||||
|
logging.error(data)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "data" in data
|
||||||
|
assert len(data["data"]) == 1
|
||||||
|
assert data["data"][0]["id"] == login_user_id
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_delete_userdomain(test_client,test_domain,test_user,login_user,login_user_id):
|
||||||
|
userdomain ={
|
||||||
|
"userid":test_user["id"],
|
||||||
|
"domainid":test_domain.id
|
||||||
|
}
|
||||||
|
response = test_client.post("/api/userdomain" , json=userdomain,headers={"Authorization": "Bearer " + login_user})
|
||||||
|
data = response.json()
|
||||||
|
logging.error(data)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "data" in data
|
||||||
|
assert data["data"] is not None
|
||||||
|
assert data["data"]["name"] == test_domain.name
|
||||||
|
assert data["data"]["url"] == test_domain.url
|
||||||
|
response = test_client.delete(f"/api/domain/{test_domain.id}/{test_user["id"]}" , headers={"Authorization": "Bearer " + login_user})
|
||||||
|
data = response.json()
|
||||||
|
logging.error(data)
|
||||||
|
assert "data" in data
|
||||||
|
assert data["data"] is not None
|
||||||
|
assert data["data"]["name"] == test_domain.name
|
||||||
|
assert data["data"]["url"] == test_domain.url
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_delete_managedomain(test_client,test_domain,test_user,login_user,login_user_id):
|
||||||
|
userdomain ={
|
||||||
|
"userid":test_user["id"],
|
||||||
|
"domainid":test_domain.id
|
||||||
|
}
|
||||||
|
response = test_client.post("/api/managedomain" , json=userdomain,headers={"Authorization": "Bearer " + login_user})
|
||||||
|
data = response.json()
|
||||||
|
logging.error(data)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "data" in data
|
||||||
|
assert data["data"] is not None
|
||||||
|
assert data["data"]["name"] == test_domain.name
|
||||||
|
assert data["data"]["url"] == test_domain.url
|
||||||
|
response = test_client.delete(f"/api/managedomain/{test_domain.id}/{test_user["id"]}" , headers={"Authorization": "Bearer " + login_user})
|
||||||
|
data = response.json()
|
||||||
|
logging.error(data)
|
||||||
|
assert "data" in data
|
||||||
|
assert data["data"] is not None
|
||||||
|
assert data["data"]["name"] == test_domain.name
|
||||||
|
assert data["data"]["url"] == test_domain.url
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_domain(test_client, login_user,login_user_id):
|
||||||
delete_domain ={
|
delete_domain ={
|
||||||
"id": 0,
|
"id": 0,
|
||||||
"tenantid": "1",
|
"tenantid": "1",
|
||||||
@@ -41,15 +97,22 @@ def test_delete_domain(test_client, login_user):
|
|||||||
"is_active": True,
|
"is_active": True,
|
||||||
}
|
}
|
||||||
response = test_client.post("/api/domain", json=delete_domain,headers={"Authorization": "Bearer " + login_user})
|
response = test_client.post("/api/domain", json=delete_domain,headers={"Authorization": "Bearer " + login_user})
|
||||||
assert response.status_code == 200
|
|
||||||
data = response.json()
|
data = response.json()
|
||||||
|
assert response.status_code == 200
|
||||||
assert "data" in data
|
assert "data" in data
|
||||||
assert data["data"] is not None
|
assert data["data"] is not None
|
||||||
id = data["data"]["id"]
|
id = data["data"]["id"]
|
||||||
response = test_client.delete("/api/domain/"+ str(id),headers={"Authorization": "Bearer " + login_user})
|
|
||||||
|
response = test_client.delete(f"/api/domain/{id}/{login_user_id}",headers={"Authorization": "Bearer " + login_user})
|
||||||
|
data = response.json()
|
||||||
|
logging.error(data)
|
||||||
|
|
||||||
|
response = test_client.delete(f"/api/domain/{id}",headers={"Authorization": "Bearer " + login_user})
|
||||||
|
data = response.json()
|
||||||
|
logging.error(data)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert response.json()["data"]["name"] == delete_domain["name"]
|
assert response.json()["data"]["name"] == delete_domain["name"]
|
||||||
response = test_client.get("/api/domain/"+ str(id), headers={"Authorization": "Bearer " + login_user})
|
response = test_client.get(f"/api/domain/{id}", headers={"Authorization": "Bearer " + login_user})
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert "data" not in response.json()
|
assert "data" not in response.json()
|
||||||
|
|
||||||
@@ -83,8 +146,9 @@ def test_get_defaultuserdomain(test_client, test_domain,login_user):
|
|||||||
|
|
||||||
def test_get_domainshareduser(test_client, test_domain,login_user,login_user_id):
|
def test_get_domainshareduser(test_client, test_domain,login_user,login_user_id):
|
||||||
response = test_client.get("/api/domainshareduser/"+str(test_domain.id), headers={"Authorization": "Bearer " + login_user})
|
response = test_client.get("/api/domainshareduser/"+str(test_domain.id), headers={"Authorization": "Bearer " + login_user})
|
||||||
assert response.status_code == 200
|
|
||||||
data = response.json()
|
data = response.json()
|
||||||
|
logging.error(data)
|
||||||
|
assert response.status_code == 200
|
||||||
assert "data" in data
|
assert "data" in data
|
||||||
assert data["data"] is not None
|
assert data["data"] is not None
|
||||||
assert len(data["data"]) == 1
|
assert len(data["data"]) == 1
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ def test_users_list_for_admin(test_client,login_admin):
|
|||||||
|
|
||||||
def test_user_create(test_client,login_user):
|
def test_user_create(test_client,login_user):
|
||||||
user_data = {
|
user_data = {
|
||||||
"email": "newuser@example.com",
|
"email": "newuser1@example.com",
|
||||||
"password": "password123",
|
"password": "password123",
|
||||||
"first_name": "New",
|
"first_name": "New",
|
||||||
"last_name": "User",
|
"last_name": "User",
|
||||||
@@ -37,7 +37,7 @@ def test_user_create(test_client,login_user):
|
|||||||
|
|
||||||
def test_admin_create(test_client,login_user):
|
def test_admin_create(test_client,login_user):
|
||||||
user_data = {
|
user_data = {
|
||||||
"email": "newuser@example.com",
|
"email": "newuser2@example.com",
|
||||||
"password": "password123",
|
"password": "password123",
|
||||||
"first_name": "New",
|
"first_name": "New",
|
||||||
"last_name": "User",
|
"last_name": "User",
|
||||||
|
|||||||
Reference in New Issue
Block a user