Files
workspace/send_dingtalk_file.py
aitest 15c4480db1 Add OpenClaw workspace configuration and tools
- Add agent configuration files (AGENTS.md, USER.md, IDENTITY.md, SOUL.md)
- Add git configuration and skills management scripts
- Add frontend/backend analysis tools and reports
- Add DingTalk media sender utilities and documentation
- Fix OpenClaw runtime environment (Node.js and Python)
- Configure git remotes and push scripts
2026-03-05 13:56:59 +09:00

128 lines
3.9 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""直接发送钉钉文件到群聊"""
import requests
import json
import os
# Configuration.
# Every value can be overridden through the environment variable named in the
# corresponding os.environ.get() call; the literals are fallbacks that keep the
# script's previous behaviour when no override is set.
# NOTE(security): the DINGTALK_APP_SECRET fallback looks like a real credential
# committed to source control. Prefer supplying it through the environment and
# rotating the committed value.
DINGTALK_APP_KEY = os.environ.get("DINGTALK_APP_KEY", "ding4ursdp0l2giat4bj")
DINGTALK_APP_SECRET = os.environ.get(
    "DINGTALK_APP_SECRET",
    "J0gBicjKiIHoKla7WfKKhRs1Tv8L6Xd5UhW3EVQByF16G7Vn7UUcRhP6u-PBCQNo",
)
ROBOT_CODE = os.environ.get("DINGTALK_ROBOT_CODE", "ding4ursdp0l2giat4bj")
OPEN_CONVERSATION_ID = os.environ.get(
    "DINGTALK_OPEN_CONVERSATION_ID",
    "cidcjYshXVtKck5LfOO9AqOJg==",
)
# Local file to upload, and the display name sent with the group message.
FILE_PATH = os.environ.get(
    "DINGTALK_FILE_PATH",
    r"C:\Users\ALC\.openclaw\workspace\前后端功能与开源可修改性分析报告.docx",
)
FILE_NAME = os.environ.get(
    "DINGTALK_FILE_NAME",
    "前后端功能与开源可修改性分析报告.docx",
)
def get_access_token(app_key, app_secret):
    """Request a DingTalk access token for the given app credentials.

    Args:
        app_key: Sent as ``appKey`` in the JSON request body.
        app_secret: Sent as ``appSecret`` in the JSON request body.

    Returns:
        The ``accessToken`` value from the JSON response.

    Raises:
        Exception: If the response JSON has no ``accessToken`` key; the
            message includes the decoded response.
    """
    credentials = {
        "appKey": app_key,
        "appSecret": app_secret,
    }
    response = requests.post(
        "https://api.dingtalk.com/v1.0/oauth2/accessToken",
        headers={"Content-Type": "application/json"},
        json=credentials,
        timeout=10,
    )
    body = response.json()

    # Guard clause: anything without a token is treated as a failure.
    if "accessToken" not in body:
        raise Exception(f"Failed to get access_token: {body}")
    return body["accessToken"]
def upload_media(access_token, file_path, file_type="file"):
    """Upload a local file to the DingTalk media endpoint.

    Args:
        access_token: Sent in the ``x-acs-dingtalk-access-token`` header.
        file_path: Path of the local file to upload.
        file_type: Value of the ``type`` query parameter.

    Returns:
        The ``mediaId`` value from the JSON response.

    Raises:
        Exception: If ``file_path`` does not exist, or if the response JSON
            has no ``mediaId`` key.
    """
    url = "https://api.dingtalk.com/v1.0/media/upload"
    if not os.path.exists(file_path):
        raise Exception(f"File not found: {file_path}")

    params = {'type': file_type}
    headers = {'x-acs-dingtalk-access-token': access_token}

    # Keep the file open only for the duration of the request so the handle
    # is always closed, including when the request raises.
    with open(file_path, 'rb') as media_file:
        files = {
            'media': (
                os.path.basename(file_path),
                media_file,
                # NOTE(review): the .docx content type is sent for every
                # file, whatever its extension.
                'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            )
        }
        response = requests.post(url, params=params, headers=headers, files=files, timeout=30)

    result = response.json()
    if "mediaId" in result:
        print(f"Upload success! mediaId: {result['mediaId']}")
        return result["mediaId"]
    raise Exception(f"Failed to upload media: {result}")
def send_group_media_message(access_token, open_conversation_id, robot_code, media_id, file_type="file", file_name=None):
    """Send a previously uploaded file to a group chat as a ``sampleFile`` message.

    Args:
        access_token: Sent in the ``x-acs-dingtalk-access-token`` header.
        open_conversation_id: Sent as ``openConversationId``.
        robot_code: Sent as ``robotCode``.
        media_id: Media id placed in the message parameters.
        file_type: Accepted but not read anywhere in this function.
        file_name: Display name for the file; "文件" is used when falsy.

    Returns:
        The ``processQueryKey`` value from the JSON response.

    Raises:
        Exception: If the response JSON has no ``processQueryKey`` key.
    """
    endpoint = "https://api.dingtalk.com/v1.0/robot/orgGroup/send"

    # msgParam is sent as a JSON-encoded string nested inside the JSON body.
    file_params = {
        "mediaId": media_id,
        "fileName": file_name or "文件"
    }
    payload = {
        "openConversationId": open_conversation_id,
        "robotCode": robot_code,
        "msgKey": "sampleFile",
        "msgParam": json.dumps(file_params),
    }
    request_headers = {
        'x-acs-dingtalk-access-token': access_token,
        'Content-Type': 'application/json',
    }

    response = requests.post(endpoint, headers=request_headers, json=payload, timeout=30)
    reply = response.json()
    print(f"Send result: {json.dumps(reply, ensure_ascii=False)}")

    if "processQueryKey" not in reply:
        raise Exception(f"Failed to send message: {reply}")

    print(f"Success! processQueryKey: {reply['processQueryKey']}")
    return reply['processQueryKey']
def main():
    """Run the token -> upload -> send workflow with the module-level settings.

    Progress is printed for each step. Any exception raised along the way is
    reported with its traceback instead of being propagated.

    Returns:
        int: 0 when the file was sent, 1 when any step raised.
    """
    try:
        print("=== DingTalk File Sender ===")
        # The path is printed as its UTF-8 bytes repr; presumably to avoid
        # console encoding errors on the non-ASCII file name (confirm).
        print(f"File: {FILE_PATH.encode('utf-8')}")
        print(f"Upload size: {os.path.getsize(FILE_PATH) / 1024:.2f} KB")

        # Step 1: Get access token
        print("\nStep 1: Getting access token...")
        access_token = get_access_token(DINGTALK_APP_KEY, DINGTALK_APP_SECRET)
        print("Access token obtained")

        # Step 2: Upload file
        print("\nStep 2: Uploading file...")
        media_id = upload_media(access_token, FILE_PATH, "file")

        # Step 3: Send to group
        print("\nStep 3: Sending to group...")
        process_query_key = send_group_media_message(access_token, OPEN_CONVERSATION_ID, ROBOT_CODE, media_id, "file", FILE_NAME)

        print("\n=== Success! ===")
        print("File sent successfully!")
        print(f"processQueryKey: {process_query_key}")
    except Exception as e:
        print("\n=== Error ===")
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        # Signal the failure to the caller instead of silently succeeding.
        return 1
    return 0


if __name__ == "__main__":
    # Use main()'s result as the process exit status so shells and schedulers
    # can tell a failed send from a successful one.
    raise SystemExit(main())