diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 36c613d..ecee426 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -1,6 +1,7 @@ import os import base64 + PROJECT_NAME = "KintoneAppBuilder" #SQLALCHEMY_DATABASE_URI = "postgres://maxz64:m@xz1205@alicornkintone.postgres.database.azure.com/postgres" @@ -18,6 +19,8 @@ KINTONE_FIELD_TYPE=["GROUP","GROUP_SELECT","CHECK_BOX","SUBTABLE","DROP_DOWN","U KINTONE_FIELD_PROPERTY=['label','code','type','required','unique','maxValue','minValue','maxLength','minLength','defaultValue','defaultNowValue','options','expression','hideExpression','digit','protocol','displayScale','unit','unitPosition'] +KINTONE_PSW_CRYPTO_KEY=bytes.fromhex("53 6c 93 bd 48 ad b5 c0 93 df a1 27 25 a1 a3 32 a2 03 3b a0 27 1f 51 dc 20 0e 6c d7 be fc fb ea") + class KINTONE_ENV: BASE_URL = "" diff --git a/backend/app/core/security.py b/backend/app/core/security.py index 6465002..8e0f70f 100644 --- a/backend/app/core/security.py +++ b/backend/app/core/security.py @@ -2,6 +2,10 @@ import jwt from fastapi.security import OAuth2PasswordBearer from passlib.context import CryptContext from datetime import datetime, timedelta +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms +import os +import base64 +from app.core import config oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/token") @@ -29,3 +33,30 @@ def create_access_token(*, data: dict, expires_delta: timedelta = None): to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt + +def chacha20Encrypt(plaintext:str, key=config.KINTONE_PSW_CRYPTO_KEY): + nonce = os.urandom(16) + algorithm = algorithms.ChaCha20(key, nonce) + cipher = Cipher(algorithm, mode=None) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(plaintext.encode('utf-8')) + encryptor.finalize() + return base64.b64encode(nonce +'𒀸'.encode('utf-8')+ ciphertext).decode('utf-8') + +def chacha20Decrypt(encoded_str:str, key=config.KINTONE_PSW_CRYPTO_KEY): + try: + decoded_data = base64.b64decode(encoded_str) + if len(decoded_data) < 18: + return encoded_str + special_char = decoded_data[16:20] + if special_char != '𒀸'.encode('utf-8'): + return encoded_str + nonce = decoded_data[:16] + ciphertext = decoded_data[20:] + except Exception as e: + print(f"An error occurred: {e}") + return encoded_str + algorithm = algorithms.ChaCha20(key, nonce) + cipher = Cipher(algorithm, mode=None) + decryptor = cipher.decryptor() + plaintext_bytes = decryptor.update(ciphertext) + decryptor.finalize() + return plaintext_bytes.decode('utf-8') \ No newline at end of file diff --git a/backend/app/db/crud.py b/backend/app/db/crud.py index 9250a54..b68beb0 100644 --- a/backend/app/db/crud.py +++ b/backend/app/db/crud.py @@ -4,7 +4,7 @@ from sqlalchemy import and_ import typing as t from . import models, schemas -from app.core.security import get_password_hash +from app.core.security import chacha20Decrypt, get_password_hash def get_user(db: Session, user_id: int): @@ -184,6 +184,7 @@ def get_flows_by_app(db: Session, domainid: int, appid: str): return flows def create_domain(db: Session, domain: schemas.DomainBase): + domain.encrypt_kintonepwd() db_domain = models.Domain( tenantid = domain.tenantid, name=domain.name, @@ -208,6 +209,7 @@ def delete_domain(db: Session,id: int): def edit_domain( db: Session, domain: schemas.DomainBase ) -> schemas.Domain: + domain.encrypt_kintonepwd() db_domain = db.query(models.Domain).get(domain.id) if not db_domain: raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found") @@ -264,12 +266,19 @@ def get_domain(db: Session, userid: str): domains = db.query(models.Domain).join(models.UserDomain,models.UserDomain.domainid == models.Domain.id ).filter(models.UserDomain.userid == userid).all() if not domains: raise HTTPException(status_code=404, detail="Data not found") + for domain in domains: + decrypted_pwd = chacha20Decrypt(domain.kintonepwd) + domain.kintonepwd = decrypted_pwd + return domains def get_domains(db: Session,tenantid:str): domains = db.query(models.Domain).filter(models.Domain.tenantid == tenantid ).all() if not domains: raise HTTPException(status_code=404, detail="Data not found") + for domain in domains: + decrypted_pwd = chacha20Decrypt(domain.kintonepwd) + domain.kintonepwd = decrypted_pwd return domains def get_events(db: Session): diff --git a/backend/app/db/schemas.py b/backend/app/db/schemas.py index c6b77b6..f896c96 100644 --- a/backend/app/db/schemas.py +++ b/backend/app/db/schemas.py @@ -2,6 +2,7 @@ from pydantic import BaseModel from datetime import datetime import typing as t +from app.core.security import chacha20Decrypt, chacha20Encrypt class Base(BaseModel): create_time: datetime @@ -119,6 +120,10 @@ class DomainBase(BaseModel): kintoneuser: str kintonepwd: str + def encrypt_kintonepwd(self): + encrypted_pwd = chacha20Encrypt(self.kintonepwd) + self.kintonepwd = encrypted_pwd + class Domain(Base): id: int tenantid: str