Files
KintoneAppBuilder/backend/app/core/operation.py

132 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from urllib.parse import parse_qs, urlencode
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from sqlalchemy.orm import Session
from app.db.models import OperationLog,User
from app.core.apiexception import APIException
from app.core.dbmanager import get_log_db
from app.db.crud import create_log
import json
class LoggingMiddleware(BaseHTTPMiddleware):
    """Middleware that writes an operation-log row for identified requests.

    For every request it captures the body (for mutating HTTP methods),
    forwards the request to the next handler, and, when ``request.state``
    carries both ``user`` and ``tenant``, stores an ``OperationLog`` entry
    whose parameters have had ``password`` fields removed.
    """

    async def dispatch(self, request: Request, call_next):
        """Run the downstream handler and log the request afterwards.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request to the next handler.

        Returns:
            The downstream response, or a JSON 500 response when the
            downstream handler raised an exception.
        """
        if request.method in ("POST", "PUT", "PATCH", "DELETE"):
            content_type = request.headers.get('content-type', '')
            if content_type.startswith('multipart/form-data'):
                # Multipart payloads (file uploads) are not captured.
                request.state.body = None
            else:
                try:
                    request.state.body = await request.json()
                except ValueError:
                    # ValueError covers both json.JSONDecodeError and the
                    # UnicodeDecodeError raised for non-UTF-8 payloads; fall
                    # back to the raw bytes in either case.
                    request.state.body = await request.body()
        else:
            request.state.body = None

        # Bound before call_next so it is defined even when the handler
        # raises and the error path below still reaches log_request.
        state = request.state
        try:
            response = await call_next(request)
        except Exception as e:
            await self.log_error(request, e)
            response = JSONResponse(
                content={"detail": "Internal Server Error"},
                status_code=500
            )
        if hasattr(request.state, "user") and hasattr(request.state, "tenant"):
            await self.log_request(request, response, state)
        return response

    def _strip_password_keys(self, value):
        """Return a copy of *value* without any ``password`` dict keys.

        Only dicts and lists are traversed; every other value (including
        strings) is returned unchanged so that ordinary text is never
        re-parsed or altered.
        """
        if isinstance(value, dict):
            return {
                key: self._strip_password_keys(item)
                for key, item in value.items()
                if key != 'password'
            }
        if isinstance(value, list):
            return [self._strip_password_keys(item) for item in value]
        return value

    def sanitize_password(self, data):
        """Remove ``password`` parameters from *data*.

        Supports dicts and lists (traversed recursively), JSON strings,
        URL-encoded strings, bytes, and mapping-like objects exposing
        ``items()`` such as ``request.query_params``. The input object is
        never modified; a sanitized copy is returned.

        Args:
            data: The value to sanitize; ``None`` is returned as is.

        Returns:
            The sanitized value. JSON text is returned as compact JSON text,
            URL-encoded text is returned re-encoded, mappings are returned
            as plain dicts, and unsupported types are returned unchanged.
        """
        if data is None:
            return data
        if isinstance(data, (dict, list)):
            return self._strip_password_keys(data)
        if isinstance(data, (str, bytes)):
            if isinstance(data, bytes):
                # Undecodable bytes are replaced rather than raising, so a
                # binary body cannot make sanitizing fail.
                data = data.decode('utf-8', errors='replace')
            # First interpretation: JSON text.
            try:
                parsed_json = json.loads(data)
            except json.JSONDecodeError:
                pass
            else:
                return json.dumps(
                    self._strip_password_keys(parsed_json),
                    separators=(',', ':'),
                )
            # Second interpretation: URL-encoded key/value pairs.
            try:
                parsed_dict = parse_qs(data)
            except ValueError:
                # parse_qs rejected the text; drop "password=..." parts by hand.
                return '&'.join(
                    part for part in data.split('&')
                    if not part.startswith('password=')
                )
            parsed_dict.pop('password', None)
            return urlencode(parsed_dict, doseq=True)
        # Mapping-like objects, e.g. FastAPI's request.query_params.
        if hasattr(data, 'items'):
            return {k: v for k, v in data.items() if k != 'password'}
        return data

    async def log_request(self, request: Request, response, state):
        """Build an ``OperationLog`` row for the request and store it.

        Failures are reported with ``print`` and never propagate, so a
        logging problem cannot break the response.

        Args:
            request: The handled request; ``request.state`` supplies
                ``tenant``, ``user``, ``body`` and, optionally, ``db``.
            response: The response whose status code is recorded.
            state: Accepted for interface compatibility; the values are
                read from ``request.state`` directly.
        """
        try:
            route = request.scope.get("route")
            # Prefer the route template over the concrete URL path.
            path_template = route.path if route else request.url.path
            # Pass the mapping itself: an items() view matches no branch of
            # sanitize_password and would be logged with the password intact.
            safe_query = self.sanitize_password(request.query_params)
            safe_body = self.sanitize_password(request.state.body)
            db_operation = OperationLog(
                tenantid=request.state.tenant,
                clientip=request.client.host if request.client else None,
                useragent=request.headers.get("user-agent", ""),
                userid=request.state.user,
                operation=request.method,
                function=path_template,
                parameters=str({
                    "path": request.path_params,
                    "query": safe_query,
                    "body": safe_body
                }),
                response=f"status_code:{response.status_code }",
            )
            # No session on the request state means there is nowhere to write.
            db = getattr(request.state, "db", None)
            if db:
                await self.write_log_to_db(db_operation, db)
        except Exception as e:
            print(f"Logging failed: {str(e)}")

    async def log_error(self, request: Request, e: Exception):
        """Record an exception raised by the downstream handler.

        Wraps *e* in an ``APIException`` and stores its ``error`` through
        ``create_log`` using a session from ``get_log_db()``, which is
        always closed afterwards.

        Args:
            request: The request being handled when the error occurred.
            e: The exception raised by the downstream handler.
        """
        exc = APIException(
            'operation:dispatch',
            str(request.url),
            "Error occurred while writting operation log:",
            e,
        )
        db = get_log_db()
        try:
            create_log(db, exc.error)
        finally:
            db.close()

    async def write_log_to_db(self, db_operation, db):
        """Add *db_operation* to the session *db* and commit it.

        Raises:
            Exception: Whatever ``commit`` raised, after the session has
                been rolled back.
        """
        db.add(db_operation)
        try:
            db.commit()
        except Exception:
            # Roll back so the session is not left in a failed state.
            db.rollback()
            raise