KintoneAppBuilder created

This commit is contained in:
2023-07-10 09:40:49 +09:00
commit 80446a3860
94 changed files with 13622 additions and 0 deletions

View File

@@ -0,0 +1,63 @@
from fastapi.security import OAuth2PasswordRequestForm
from fastapi import APIRouter, Depends, HTTPException, status
from datetime import timedelta
from app.db.session import get_db
from app.core import security
from app.core.auth import authenticate_user, sign_up_new_user
auth_router = r = APIRouter()
@r.post("/token")
async def login(
db=Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(
minutes=security.ACCESS_TOKEN_EXPIRE_MINUTES
)
if user.is_superuser:
permissions = "admin"
else:
permissions = "user"
access_token = security.create_access_token(
data={"sub": user.email, "permissions": permissions},
expires_delta=access_token_expires,
)
return {"access_token": access_token, "token_type": "bearer"}
@r.post("/signup")
async def signup(
db=Depends(get_db), form_data: OAuth2PasswordRequestForm = Depends()
):
user = sign_up_new_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Account already exists",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(
minutes=security.ACCESS_TOKEN_EXPIRE_MINUTES
)
if user.is_superuser:
permissions = "admin"
else:
permissions = "user"
access_token = security.create_access_token(
data={"sub": user.email, "permissions": permissions},
expires_delta=access_token_expires,
)
return {"access_token": access_token, "token_type": "bearer"}

View File

@@ -0,0 +1,66 @@
from app.core import security
# Monkey patch function we can use to shave a second off our tests by skipping the password hashing check
def verify_password_mock(first: str, second: str):
return True
def test_login(client, test_user, monkeypatch):
# Patch the test to skip password hashing check for speed
monkeypatch.setattr(security, "verify_password", verify_password_mock)
response = client.post(
"/api/token",
data={"username": test_user.email, "password": "nottheactualpass"},
)
assert response.status_code == 200
def test_signup(client, monkeypatch):
def get_password_hash_mock(first: str, second: str):
return True
monkeypatch.setattr(security, "get_password_hash", get_password_hash_mock)
response = client.post(
"/api/signup",
data={"username": "some@email.com", "password": "randompassword"},
)
assert response.status_code == 200
def test_resignup(client, test_user, monkeypatch):
# Patch the test to skip password hashing check for speed
monkeypatch.setattr(security, "verify_password", verify_password_mock)
response = client.post(
"/api/signup",
data={
"username": test_user.email,
"password": "password_hashing_is_skipped_via_monkey_patch",
},
)
assert response.status_code == 409
def test_wrong_password(
client, test_db, test_user, test_password, monkeypatch
):
def verify_password_failed_mock(first: str, second: str):
return False
monkeypatch.setattr(
security, "verify_password", verify_password_failed_mock
)
response = client.post(
"/api/token", data={"username": test_user.email, "password": "wrong"}
)
assert response.status_code == 401
def test_wrong_login(client, test_db, test_user, test_password):
response = client.post(
"/api/token", data={"username": "fakeuser", "password": test_password}
)
assert response.status_code == 401

View File

@@ -0,0 +1,110 @@
from app.db import models
def test_get_users(client, test_superuser, superuser_token_headers):
response = client.get("/api/v1/users", headers=superuser_token_headers)
assert response.status_code == 200
assert response.json() == [
{
"id": test_superuser.id,
"email": test_superuser.email,
"is_active": test_superuser.is_active,
"is_superuser": test_superuser.is_superuser,
}
]
def test_delete_user(client, test_superuser, test_db, superuser_token_headers):
response = client.delete(
f"/api/v1/users/{test_superuser.id}", headers=superuser_token_headers
)
assert response.status_code == 200
assert test_db.query(models.User).all() == []
def test_delete_user_not_found(client, superuser_token_headers):
response = client.delete(
"/api/v1/users/4321", headers=superuser_token_headers
)
assert response.status_code == 404
def test_edit_user(client, test_superuser, superuser_token_headers):
new_user = {
"email": "newemail@email.com",
"is_active": False,
"is_superuser": True,
"first_name": "Joe",
"last_name": "Smith",
"password": "new_password",
}
response = client.put(
f"/api/v1/users/{test_superuser.id}",
json=new_user,
headers=superuser_token_headers,
)
assert response.status_code == 200
new_user["id"] = test_superuser.id
new_user.pop("password")
assert response.json() == new_user
def test_edit_user_not_found(client, test_db, superuser_token_headers):
new_user = {
"email": "newemail@email.com",
"is_active": False,
"is_superuser": False,
"password": "new_password",
}
response = client.put(
"/api/v1/users/1234", json=new_user, headers=superuser_token_headers
)
assert response.status_code == 404
def test_get_user(
client,
test_user,
superuser_token_headers,
):
response = client.get(
f"/api/v1/users/{test_user.id}", headers=superuser_token_headers
)
assert response.status_code == 200
assert response.json() == {
"id": test_user.id,
"email": test_user.email,
"is_active": bool(test_user.is_active),
"is_superuser": test_user.is_superuser,
}
def test_user_not_found(client, superuser_token_headers):
response = client.get("/api/v1/users/123", headers=superuser_token_headers)
assert response.status_code == 404
def test_authenticated_user_me(client, user_token_headers):
response = client.get("/api/v1/users/me", headers=user_token_headers)
assert response.status_code == 200
def test_unauthenticated_routes(client):
response = client.get("/api/v1/users/me")
assert response.status_code == 401
response = client.get("/api/v1/users")
assert response.status_code == 401
response = client.get("/api/v1/users/123")
assert response.status_code == 401
response = client.put("/api/v1/users/123")
assert response.status_code == 401
response = client.delete("/api/v1/users/123")
assert response.status_code == 401
def test_unauthorized_routes(client, user_token_headers):
response = client.get("/api/v1/users", headers=user_token_headers)
assert response.status_code == 403
response = client.get("/api/v1/users/123", headers=user_token_headers)
assert response.status_code == 403

View File

@@ -0,0 +1,107 @@
from fastapi import APIRouter, Request, Depends, Response, encoders
import typing as t
from app.db.session import get_db
from app.db.crud import (
get_users,
get_user,
create_user,
delete_user,
edit_user,
)
from app.db.schemas import UserCreate, UserEdit, User, UserOut
from app.core.auth import get_current_active_user, get_current_active_superuser
users_router = r = APIRouter()
@r.get(
"/users",
response_model=t.List[User],
response_model_exclude_none=True,
)
async def users_list(
response: Response,
db=Depends(get_db),
current_user=Depends(get_current_active_superuser),
):
"""
Get all users
"""
users = get_users(db)
# This is necessary for react-admin to work
response.headers["Content-Range"] = f"0-9/{len(users)}"
return users
@r.get("/users/me", response_model=User, response_model_exclude_none=True)
async def user_me(current_user=Depends(get_current_active_user)):
"""
Get own user
"""
return current_user
@r.get(
"/users/{user_id}",
response_model=User,
response_model_exclude_none=True,
)
async def user_details(
request: Request,
user_id: int,
db=Depends(get_db),
current_user=Depends(get_current_active_superuser),
):
"""
Get any user details
"""
user = get_user(db, user_id)
return user
# return encoders.jsonable_encoder(
# user, skip_defaults=True, exclude_none=True,
# )
@r.post("/users", response_model=User, response_model_exclude_none=True)
async def user_create(
request: Request,
user: UserCreate,
db=Depends(get_db),
current_user=Depends(get_current_active_superuser),
):
"""
Create a new user
"""
return create_user(db, user)
@r.put(
"/users/{user_id}", response_model=User, response_model_exclude_none=True
)
async def user_edit(
request: Request,
user_id: int,
user: UserEdit,
db=Depends(get_db),
current_user=Depends(get_current_active_superuser),
):
"""
Update existing user
"""
return edit_user(db, user_id, user)
@r.delete(
"/users/{user_id}", response_model=User, response_model_exclude_none=True
)
async def user_delete(
request: Request,
user_id: int,
db=Depends(get_db),
current_user=Depends(get_current_active_superuser),
):
"""
Delete existing user
"""
return delete_user(db, user_id)