BUG555: Chacha20アルゴリズムで暗号化。注:このコミットは全員の開発環境に存在する必要があります。その後、/#/domainページにアクセスし、暗号化されていないアカウントの「編集」をクリックして直接保存し、暗号化されていないアカウントを暗号化します。
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import os
|
||||
import base64
|
||||
|
||||
|
||||
PROJECT_NAME = "KintoneAppBuilder"
|
||||
|
||||
#SQLALCHEMY_DATABASE_URI = "postgres://maxz64:m@xz1205@alicornkintone.postgres.database.azure.com/postgres"
|
||||
@@ -18,6 +19,8 @@ KINTONE_FIELD_TYPE=["GROUP","GROUP_SELECT","CHECK_BOX","SUBTABLE","DROP_DOWN","U
|
||||
|
||||
KINTONE_FIELD_PROPERTY=['label','code','type','required','unique','maxValue','minValue','maxLength','minLength','defaultValue','defaultNowValue','options','expression','hideExpression','digit','protocol','displayScale','unit','unitPosition']
|
||||
|
||||
KINTONE_PSW_CRYPTO_KEY=bytes.fromhex("53 6c 93 bd 48 ad b5 c0 93 df a1 27 25 a1 a3 32 a2 03 3b a0 27 1f 51 dc 20 0e 6c d7 be fc fb ea")
|
||||
|
||||
class KINTONE_ENV:
|
||||
|
||||
BASE_URL = ""
|
||||
|
||||
@@ -2,6 +2,10 @@ import jwt
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from passlib.context import CryptContext
|
||||
from datetime import datetime, timedelta
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
|
||||
import os
|
||||
import base64
|
||||
from app.core import config
|
||||
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/token")
|
||||
|
||||
@@ -29,3 +33,30 @@ def create_access_token(*, data: dict, expires_delta: timedelta = None):
|
||||
to_encode.update({"exp": expire})
|
||||
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||||
return encoded_jwt
|
||||
|
||||
def chacha20Encrypt(plaintext:str, key=config.KINTONE_PSW_CRYPTO_KEY):
|
||||
nonce = os.urandom(16)
|
||||
algorithm = algorithms.ChaCha20(key, nonce)
|
||||
cipher = Cipher(algorithm, mode=None)
|
||||
encryptor = cipher.encryptor()
|
||||
ciphertext = encryptor.update(plaintext.encode('utf-8')) + encryptor.finalize()
|
||||
return base64.b64encode(nonce +'𒀸'.encode('utf-8')+ ciphertext).decode('utf-8')
|
||||
|
||||
def chacha20Decrypt(encoded_str:str, key=config.KINTONE_PSW_CRYPTO_KEY):
|
||||
try:
|
||||
decoded_data = base64.b64decode(encoded_str)
|
||||
if len(decoded_data) < 18:
|
||||
return encoded_str
|
||||
special_char = decoded_data[16:20]
|
||||
if special_char != '𒀸'.encode('utf-8'):
|
||||
return encoded_str
|
||||
nonce = decoded_data[:16]
|
||||
ciphertext = decoded_data[20:]
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
return encoded_str
|
||||
algorithm = algorithms.ChaCha20(key, nonce)
|
||||
cipher = Cipher(algorithm, mode=None)
|
||||
decryptor = cipher.decryptor()
|
||||
plaintext_bytes = decryptor.update(ciphertext) + decryptor.finalize()
|
||||
return plaintext_bytes.decode('utf-8')
|
||||
@@ -4,7 +4,7 @@ from sqlalchemy import and_
|
||||
import typing as t
|
||||
|
||||
from . import models, schemas
|
||||
from app.core.security import get_password_hash
|
||||
from app.core.security import chacha20Decrypt, get_password_hash
|
||||
|
||||
|
||||
def get_user(db: Session, user_id: int):
|
||||
@@ -184,6 +184,7 @@ def get_flows_by_app(db: Session, domainid: int, appid: str):
|
||||
return flows
|
||||
|
||||
def create_domain(db: Session, domain: schemas.DomainBase):
|
||||
domain.encrypt_kintonepwd()
|
||||
db_domain = models.Domain(
|
||||
tenantid = domain.tenantid,
|
||||
name=domain.name,
|
||||
@@ -208,6 +209,7 @@ def delete_domain(db: Session,id: int):
|
||||
def edit_domain(
|
||||
db: Session, domain: schemas.DomainBase
|
||||
) -> schemas.Domain:
|
||||
domain.encrypt_kintonepwd()
|
||||
db_domain = db.query(models.Domain).get(domain.id)
|
||||
if not db_domain:
|
||||
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")
|
||||
@@ -264,12 +266,19 @@ def get_domain(db: Session, userid: str):
|
||||
domains = db.query(models.Domain).join(models.UserDomain,models.UserDomain.domainid == models.Domain.id ).filter(models.UserDomain.userid == userid).all()
|
||||
if not domains:
|
||||
raise HTTPException(status_code=404, detail="Data not found")
|
||||
for domain in domains:
|
||||
decrypted_pwd = chacha20Decrypt(domain.kintonepwd)
|
||||
domain.kintonepwd = decrypted_pwd
|
||||
|
||||
return domains
|
||||
|
||||
def get_domains(db: Session,tenantid:str):
|
||||
domains = db.query(models.Domain).filter(models.Domain.tenantid == tenantid ).all()
|
||||
if not domains:
|
||||
raise HTTPException(status_code=404, detail="Data not found")
|
||||
for domain in domains:
|
||||
decrypted_pwd = chacha20Decrypt(domain.kintonepwd)
|
||||
domain.kintonepwd = decrypted_pwd
|
||||
return domains
|
||||
|
||||
def get_events(db: Session):
|
||||
|
||||
@@ -2,6 +2,7 @@ from pydantic import BaseModel
|
||||
from datetime import datetime
|
||||
import typing as t
|
||||
|
||||
from app.core.security import chacha20Decrypt, chacha20Encrypt
|
||||
|
||||
class Base(BaseModel):
|
||||
create_time: datetime
|
||||
@@ -119,6 +120,10 @@ class DomainBase(BaseModel):
|
||||
kintoneuser: str
|
||||
kintonepwd: str
|
||||
|
||||
def encrypt_kintonepwd(self):
|
||||
encrypted_pwd = chacha20Encrypt(self.kintonepwd)
|
||||
self.kintonepwd = encrypted_pwd
|
||||
|
||||
class Domain(Base):
|
||||
id: int
|
||||
tenantid: str
|
||||
|
||||
Reference in New Issue
Block a user