Merged PR 74: feat:kintone APIのパスワードの暗号化対応
kintoneのログイン情報の暗号化を対応しました Related work items: #555
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
import os
|
import os
|
||||||
import base64
|
import base64
|
||||||
|
|
||||||
|
|
||||||
PROJECT_NAME = "KintoneAppBuilder"
|
PROJECT_NAME = "KintoneAppBuilder"
|
||||||
|
|
||||||
#SQLALCHEMY_DATABASE_URI = "postgres://maxz64:m@xz1205@alicornkintone.postgres.database.azure.com/postgres"
|
#SQLALCHEMY_DATABASE_URI = "postgres://maxz64:m@xz1205@alicornkintone.postgres.database.azure.com/postgres"
|
||||||
@@ -18,6 +19,8 @@ KINTONE_FIELD_TYPE=["GROUP","GROUP_SELECT","CHECK_BOX","SUBTABLE","DROP_DOWN","U
|
|||||||
|
|
||||||
KINTONE_FIELD_PROPERTY=['label','code','type','required','unique','maxValue','minValue','maxLength','minLength','defaultValue','defaultNowValue','options','expression','hideExpression','digit','protocol','displayScale','unit','unitPosition']
|
KINTONE_FIELD_PROPERTY=['label','code','type','required','unique','maxValue','minValue','maxLength','minLength','defaultValue','defaultNowValue','options','expression','hideExpression','digit','protocol','displayScale','unit','unitPosition']
|
||||||
|
|
||||||
|
KINTONE_PSW_CRYPTO_KEY=bytes.fromhex("53 6c 93 bd 48 ad b5 c0 93 df a1 27 25 a1 a3 32 a2 03 3b a0 27 1f 51 dc 20 0e 6c d7 be fc fb ea")
|
||||||
|
|
||||||
class KINTONE_ENV:
|
class KINTONE_ENV:
|
||||||
|
|
||||||
BASE_URL = ""
|
BASE_URL = ""
|
||||||
@@ -35,4 +38,4 @@ class KINTONE_ENV:
|
|||||||
self.DOMAIN_ID=domain.id
|
self.DOMAIN_ID=domain.id
|
||||||
self.BASE_URL = domain.url
|
self.BASE_URL = domain.url
|
||||||
self.KINTONE_USER = domain.kintoneuser
|
self.KINTONE_USER = domain.kintoneuser
|
||||||
self.API_V1_AUTH_VALUE = base64.b64encode(bytes(f"{domain.kintoneuser}:{domain.kintonepwd}","utf-8"))
|
self.API_V1_AUTH_VALUE = base64.b64encode(bytes(f"{domain.kintoneuser}:{domain.decrypt_kintonepwd()}","utf-8"))
|
||||||
@@ -2,6 +2,10 @@ import jwt
|
|||||||
from fastapi.security import OAuth2PasswordBearer
|
from fastapi.security import OAuth2PasswordBearer
|
||||||
from passlib.context import CryptContext
|
from passlib.context import CryptContext
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
|
||||||
|
import os
|
||||||
|
import base64
|
||||||
|
from app.core import config
|
||||||
|
|
||||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/token")
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/token")
|
||||||
|
|
||||||
@@ -29,3 +33,30 @@ def create_access_token(*, data: dict, expires_delta: timedelta = None):
|
|||||||
to_encode.update({"exp": expire})
|
to_encode.update({"exp": expire})
|
||||||
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||||||
return encoded_jwt
|
return encoded_jwt
|
||||||
|
|
||||||
|
def chacha20Encrypt(plaintext:str, key=config.KINTONE_PSW_CRYPTO_KEY):
|
||||||
|
nonce = os.urandom(16)
|
||||||
|
algorithm = algorithms.ChaCha20(key, nonce)
|
||||||
|
cipher = Cipher(algorithm, mode=None)
|
||||||
|
encryptor = cipher.encryptor()
|
||||||
|
ciphertext = encryptor.update(plaintext.encode('utf-8')) + encryptor.finalize()
|
||||||
|
return base64.b64encode(nonce +'𒀸'.encode('utf-8')+ ciphertext).decode('utf-8')
|
||||||
|
|
||||||
|
def chacha20Decrypt(encoded_str:str, key=config.KINTONE_PSW_CRYPTO_KEY):
|
||||||
|
try:
|
||||||
|
decoded_data = base64.b64decode(encoded_str)
|
||||||
|
if len(decoded_data) < 18:
|
||||||
|
return encoded_str
|
||||||
|
special_char = decoded_data[16:20]
|
||||||
|
if special_char != '𒀸'.encode('utf-8'):
|
||||||
|
return encoded_str
|
||||||
|
nonce = decoded_data[:16]
|
||||||
|
ciphertext = decoded_data[20:]
|
||||||
|
except Exception as e:
|
||||||
|
print(f"An error occurred: {e}")
|
||||||
|
return encoded_str
|
||||||
|
algorithm = algorithms.ChaCha20(key, nonce)
|
||||||
|
cipher = Cipher(algorithm, mode=None)
|
||||||
|
decryptor = cipher.decryptor()
|
||||||
|
plaintext_bytes = decryptor.update(ciphertext) + decryptor.finalize()
|
||||||
|
return plaintext_bytes.decode('utf-8')
|
||||||
@@ -4,7 +4,7 @@ from sqlalchemy import and_
|
|||||||
import typing as t
|
import typing as t
|
||||||
|
|
||||||
from . import models, schemas
|
from . import models, schemas
|
||||||
from app.core.security import get_password_hash
|
from app.core.security import chacha20Decrypt, get_password_hash
|
||||||
|
|
||||||
|
|
||||||
def get_user(db: Session, user_id: int):
|
def get_user(db: Session, user_id: int):
|
||||||
@@ -184,6 +184,7 @@ def get_flows_by_app(db: Session, domainid: int, appid: str):
|
|||||||
return flows
|
return flows
|
||||||
|
|
||||||
def create_domain(db: Session, domain: schemas.DomainBase):
|
def create_domain(db: Session, domain: schemas.DomainBase):
|
||||||
|
domain.encrypt_kintonepwd()
|
||||||
db_domain = models.Domain(
|
db_domain = models.Domain(
|
||||||
tenantid = domain.tenantid,
|
tenantid = domain.tenantid,
|
||||||
name=domain.name,
|
name=domain.name,
|
||||||
@@ -208,6 +209,7 @@ def delete_domain(db: Session,id: int):
|
|||||||
def edit_domain(
|
def edit_domain(
|
||||||
db: Session, domain: schemas.DomainBase
|
db: Session, domain: schemas.DomainBase
|
||||||
) -> schemas.Domain:
|
) -> schemas.Domain:
|
||||||
|
domain.encrypt_kintonepwd()
|
||||||
db_domain = db.query(models.Domain).get(domain.id)
|
db_domain = db.query(models.Domain).get(domain.id)
|
||||||
if not db_domain:
|
if not db_domain:
|
||||||
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")
|
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Domain not found")
|
||||||
@@ -264,12 +266,18 @@ def get_domain(db: Session, userid: str):
|
|||||||
domains = db.query(models.Domain).join(models.UserDomain,models.UserDomain.domainid == models.Domain.id ).filter(models.UserDomain.userid == userid).all()
|
domains = db.query(models.Domain).join(models.UserDomain,models.UserDomain.domainid == models.Domain.id ).filter(models.UserDomain.userid == userid).all()
|
||||||
if not domains:
|
if not domains:
|
||||||
raise HTTPException(status_code=404, detail="Data not found")
|
raise HTTPException(status_code=404, detail="Data not found")
|
||||||
|
# for domain in domains:
|
||||||
|
# decrypted_pwd = chacha20Decrypt(domain.kintonepwd)
|
||||||
|
# domain.kintonepwd = decrypted_pwd
|
||||||
return domains
|
return domains
|
||||||
|
|
||||||
def get_domains(db: Session,tenantid:str):
|
def get_domains(db: Session,tenantid:str):
|
||||||
domains = db.query(models.Domain).filter(models.Domain.tenantid == tenantid ).all()
|
domains = db.query(models.Domain).filter(models.Domain.tenantid == tenantid ).all()
|
||||||
if not domains:
|
if not domains:
|
||||||
raise HTTPException(status_code=404, detail="Data not found")
|
raise HTTPException(status_code=404, detail="Data not found")
|
||||||
|
# for domain in domains:
|
||||||
|
# decrypted_pwd = chacha20Decrypt(domain.kintonepwd)
|
||||||
|
# domain.kintonepwd = decrypted_pwd
|
||||||
return domains
|
return domains
|
||||||
|
|
||||||
def get_events(db: Session):
|
def get_events(db: Session):
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ from sqlalchemy import Boolean, Column, Integer, String, DateTime,ForeignKey
|
|||||||
from sqlalchemy.ext.declarative import as_declarative
|
from sqlalchemy.ext.declarative import as_declarative
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
from app.core.security import chacha20Decrypt
|
||||||
|
|
||||||
@as_declarative()
|
@as_declarative()
|
||||||
class Base:
|
class Base:
|
||||||
id = Column(Integer, primary_key=True, index=True)
|
id = Column(Integer, primary_key=True, index=True)
|
||||||
@@ -68,6 +70,9 @@ class Domain(Base):
|
|||||||
url = Column(String(200), nullable=False)
|
url = Column(String(200), nullable=False)
|
||||||
kintoneuser = Column(String(100), nullable=False)
|
kintoneuser = Column(String(100), nullable=False)
|
||||||
kintonepwd = Column(String(100), nullable=False)
|
kintonepwd = Column(String(100), nullable=False)
|
||||||
|
def decrypt_kintonepwd(self):
|
||||||
|
decrypted_pwd = chacha20Decrypt(self.kintonepwd)
|
||||||
|
return decrypted_pwd
|
||||||
|
|
||||||
|
|
||||||
class UserDomain(Base):
|
class UserDomain(Base):
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ from pydantic import BaseModel
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import typing as t
|
import typing as t
|
||||||
|
|
||||||
|
from app.core.security import chacha20Decrypt, chacha20Encrypt
|
||||||
|
|
||||||
class Base(BaseModel):
|
class Base(BaseModel):
|
||||||
create_time: datetime
|
create_time: datetime
|
||||||
@@ -119,6 +120,10 @@ class DomainBase(BaseModel):
|
|||||||
kintoneuser: str
|
kintoneuser: str
|
||||||
kintonepwd: str
|
kintonepwd: str
|
||||||
|
|
||||||
|
def encrypt_kintonepwd(self):
|
||||||
|
encrypted_pwd = chacha20Encrypt(self.kintonepwd)
|
||||||
|
self.kintonepwd = encrypted_pwd
|
||||||
|
|
||||||
class Domain(Base):
|
class Domain(Base):
|
||||||
id: int
|
id: int
|
||||||
tenantid: str
|
tenantid: str
|
||||||
@@ -126,7 +131,6 @@ class Domain(Base):
|
|||||||
url: str
|
url: str
|
||||||
kintoneuser: str
|
kintoneuser: str
|
||||||
kintonepwd: str
|
kintonepwd: str
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
orm_mode = True
|
orm_mode = True
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user