import pytest from sqlalchemy import create_engine, event from sqlalchemy.orm import sessionmaker from fastapi.testclient import TestClient import typing as t from app.core import config, security from app.core.dbmanager import get_db from app.db import models,schemas from app.main import app from app.core import security import jwt SQLALCHEMY_DATABASE_URI = f"postgresql+psycopg2://kabAdmin:P%40ssw0rd!@kintonetooldb.postgres.database.azure.com/test" engine = create_engine(SQLALCHEMY_DATABASE_URI,echo=True) test_session_maker = sessionmaker(autocommit=False, autoflush=False, bind=engine) @pytest.fixture(scope="session") def test_db(): connection = engine.connect() transaction = connection.begin() test_session = test_session_maker(bind=connection) yield test_session test_session.close() transaction.rollback() #transaction.commit() connection.close() @pytest.fixture(scope="session") def test_client(test_db): def get_test_db(): try: yield test_db finally: test_db.close() app.dependency_overrides[get_db] = get_test_db with TestClient(app) as test_client: yield test_client @pytest.fixture(scope="session") def test_tenant_id(): return "1" @pytest.fixture(scope="session") def test_user(test_db,test_tenant_id): password ="test" user = models.User( email = "test@test.com", first_name = "test", last_name = "abc", hashed_password = security.get_password_hash(password), is_active = True, is_superuser = False, tenantid = test_tenant_id ) test_db.add(user) test_db.commit() test_db.refresh(user) dicUser = user.__dict__ dicUser["password"] = password return dicUser @pytest.fixture(scope="session") def password(): return "password" @pytest.fixture(scope="session") def user(test_db,password,test_tenant_id): user = models.User( email = "user@test.com", first_name = "user", last_name = "abc", hashed_password = security.get_password_hash(password), is_active = True, is_superuser = False, tenantid = test_tenant_id ) test_db.add(user) test_db.commit() test_db.refresh(user) return user.__dict__ @pytest.fixture(scope="session") def admin(test_db,password,test_tenant_id): user = models.User( email = "admin@test.com", first_name = "admin", last_name = "abc", hashed_password = security.get_password_hash(password), is_active = True, is_superuser = True, tenantid =test_tenant_id ) test_db.add(user) test_db.commit() test_db.refresh(user) return user.__dict__ @pytest.fixture(scope="session") def login_user(test_db,test_client,user,password): # test_db.add(user) # test_db.commit() #test_db.refresh(user) response = test_client.post("/api/token", data={"username": user["email"], "password":password }) return response.json()["access_token"] @pytest.fixture(scope="session") def login_admin(test_db,test_client,admin,password): # test_db.add(admin) # test_db.commit() #test_db.refresh(admin) response = test_client.post("/api/token", data={"username": admin["email"], "password":password }) return response.json()["access_token"] @pytest.fixture(scope="session") def login_user_id(login_user): payload = jwt.decode(login_user, security.SECRET_KEY, algorithms=[security.ALGORITHM]) id = payload.get("sub") return id @pytest.fixture(scope="session") def login_admin_id(login_admin): payload = jwt.decode(login_admin, security.SECRET_KEY, algorithms=[security.ALGORITHM]) id = payload.get("sub") return id @pytest.fixture(scope="session") def test_role(test_db): role = models.Role( name = "test", description = "test", level = 1 ) test_db.add(role) test_db.commit() test_db.refresh(role) return role.__dict__ @pytest.fixture(scope="session") def test_domain(test_db,login_user_id): domain = models.Domain( tenantid = "1", name = "テスト環境", url = "https://mfu07rkgnb7c.cybozu.com", kintoneuser = "MXZ", kintonepwd = security.chacha20Encrypt("maxz1205"), is_active = True, createuserid =login_user_id, updateuserid =login_user_id, ownerid = login_user_id ) test_db.add(domain) test_db.flush() user_domain = models.UserDomain(userid = login_user_id, domainid = domain.id ,createuserid = login_user_id,updateuserid = login_user_id) test_db.add(user_domain) manage_domain = models.ManageDomain(userid = login_user_id, domainid = domain.id ,createuserid = login_user_id,updateuserid = login_user_id) test_db.add(manage_domain) test_db.commit() test_db.refresh(domain) return domain @pytest.fixture(scope="session") def test_app_id(): return "132" # @pytest.fixture # def test_password() -> str: # return "securepassword" # def get_password_hash() -> str: # """ # Password hashing can be expensive so a mock will be much faster # """ # return "supersecrethash" # @pytest.fixture # def test_user(test_db) -> models.User: # """ # Make a test user in the database # """ # user = models.User( # email="fake@email.com", # hashed_password=get_password_hash(), # is_active=True, # ) # test_db.add(user) # test_db.commit() # return user # @pytest.fixture # def test_superuser(test_db) -> models.User: # """ # Superuser for testing # """ # user = models.User( # email="fakeadmin@email.com", # hashed_password=get_password_hash(), # is_superuser=True, # ) # test_db.add(user) # test_db.commit() # return user # def verify_password_mock(first: str, second: str) -> bool: # return True # @pytest.fixture # def user_token_headers( # client: TestClient, test_user, test_password, monkeypatch # ) -> t.Dict[str, str]: # monkeypatch.setattr(security, "verify_password", verify_password_mock) # login_data = { # "username": test_user.email, # "password": test_password, # } # r = client.post("/api/token", data=login_data) # tokens = r.json() # a_token = tokens["access_token"] # headers = {"Authorization": f"Bearer {a_token}"} # return headers # @pytest.fixture # def superuser_token_headers( # client: TestClient, test_superuser, test_password, monkeypatch # ) -> t.Dict[str, str]: # monkeypatch.setattr(security, "verify_password", verify_password_mock) # login_data = { # "username": test_superuser.email, # "password": test_password, # } # r = client.post("/api/token", data=login_data) # tokens = r.json() # a_token = tokens["access_token"] # headers = {"Authorization": f"Bearer {a_token}"} # return headers