- Add agent configuration files (AGENTS.md, USER.md, IDENTITY.md, SOUL.md) - Add git configuration and skills management scripts - Add frontend/backend analysis tools and reports - Add DingTalk media sender utilities and documentation - Fix OpenClaw runtime environment (Node.js and Python) - Configure git remotes and push scripts
128 lines
3.9 KiB
Python
128 lines
3.9 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""直接发送钉钉文件到群聊"""
|
|
|
|
import requests
|
|
import json
|
|
import os
|
|
|
|
# 配置信息
|
|
DINGTALK_APP_KEY = "ding4ursdp0l2giat4bj"
|
|
DINGTALK_APP_SECRET = "J0gBicjKiIHoKla7WfKKhRs1Tv8L6Xd5UhW3EVQByF16G7Vn7UUcRhP6u-PBCQNo"
|
|
ROBOT_CODE = "ding4ursdp0l2giat4bj"
|
|
OPEN_CONVERSATION_ID = "cidcjYshXVtKck5LfOO9AqOJg=="
|
|
|
|
FILE_PATH = r"C:\Users\ALC\.openclaw\workspace\前后端功能与开源可修改性分析报告.docx"
|
|
FILE_NAME = "前后端功能与开源可修改性分析报告.docx"
|
|
|
|
def get_access_token(app_key, app_secret):
|
|
"""获取钉钉access_token"""
|
|
url = "https://api.dingtalk.com/v1.0/oauth2/accessToken"
|
|
headers = {"Content-Type": "application/json"}
|
|
data = {
|
|
"appKey": app_key,
|
|
"appSecret": app_secret
|
|
}
|
|
|
|
response = requests.post(url, headers=headers, json=data, timeout=10)
|
|
result = response.json()
|
|
|
|
if "accessToken" in result:
|
|
return result["accessToken"]
|
|
else:
|
|
raise Exception(f"Failed to get access_token: {result}")
|
|
|
|
def upload_media(access_token, file_path, file_type="file"):
|
|
"""上传媒体文件"""
|
|
url = "https://api.dingtalk.com/v1.0/media/upload"
|
|
|
|
if not os.path.exists(file_path):
|
|
raise Exception(f"File not found: {file_path}")
|
|
|
|
files = {
|
|
'media': (os.path.basename(file_path), open(file_path, 'rb'), 'application/vnd.openxmlformats-officedocument.wordprocessingml.document')
|
|
}
|
|
|
|
params = {
|
|
'type': file_type
|
|
}
|
|
|
|
headers = {
|
|
'x-acs-dingtalk-access-token': access_token
|
|
}
|
|
|
|
response = requests.post(url, params=params, headers=headers, files=files, timeout=30)
|
|
result = response.json()
|
|
|
|
if "mediaId" in result:
|
|
print(f"Upload success! mediaId: {result['mediaId']}")
|
|
return result["mediaId"]
|
|
else:
|
|
raise Exception(f"Failed to upload media: {result}")
|
|
|
|
def send_group_media_message(access_token, open_conversation_id, robot_code, media_id, file_type="file", file_name=None):
|
|
"""发送群聊媒体消息"""
|
|
url = "https://api.dingtalk.com/v1.0/robot/orgGroup/send"
|
|
|
|
# 构建消息参数
|
|
msg_key = "sampleFile"
|
|
msg_param = json.dumps({
|
|
"mediaId": media_id,
|
|
"fileName": file_name or "文件"
|
|
})
|
|
|
|
headers = {
|
|
'x-acs-dingtalk-access-token': access_token,
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
data = {
|
|
"openConversationId": open_conversation_id,
|
|
"robotCode": robot_code,
|
|
"msgKey": msg_key,
|
|
"msgParam": msg_param
|
|
}
|
|
|
|
response = requests.post(url, headers=headers, json=data, timeout=30)
|
|
result = response.json()
|
|
|
|
print(f"Send result: {json.dumps(result, ensure_ascii=False)}")
|
|
|
|
if "processQueryKey" in result:
|
|
print(f"Success! processQueryKey: {result['processQueryKey']}")
|
|
return result['processQueryKey']
|
|
else:
|
|
raise Exception(f"Failed to send message: {result}")
|
|
|
|
def main():
|
|
try:
|
|
print("=== DingTalk File Sender ===")
|
|
print(f"File: {FILE_PATH.encode('utf-8')}")
|
|
print(f"Upload size: {os.path.getsize(FILE_PATH) / 1024:.2f} KB")
|
|
|
|
# Step 1: Get access token
|
|
print("\nStep 1: Getting access token...")
|
|
access_token = get_access_token(DINGTALK_APP_KEY, DINGTALK_APP_SECRET)
|
|
print("Access token obtained")
|
|
|
|
# Step 2: Upload file
|
|
print("\nStep 2: Uploading file...")
|
|
media_id = upload_media(access_token, FILE_PATH, "file")
|
|
|
|
# Step 3: Send to group
|
|
print("\nStep 3: Sending to group...")
|
|
process_query_key = send_group_media_message(access_token, OPEN_CONVERSATION_ID, ROBOT_CODE, media_id, "file", FILE_NAME)
|
|
|
|
print(f"\n=== Success! ===")
|
|
print(f"File sent successfully!")
|
|
print(f"processQueryKey: {process_query_key}")
|
|
|
|
except Exception as e:
|
|
print(f"\n=== Error ===")
|
|
print(f"Error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|