Files
workspace/send_dingtalk_file_v2.py
aitest 15c4480db1 Add OpenClaw workspace configuration and tools
- Add agent configuration files (AGENTS.md, USER.md, IDENTITY.md, SOUL.md)
- Add git configuration and skills management scripts
- Add frontend/backend analysis tools and reports
- Add DingTalk media sender utilities and documentation
- Fix OpenClaw runtime environment (Node.js and Python)
- Configure git remotes and push scripts
2026-03-05 13:56:59 +09:00

116 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import json
import os
# DingTalk configuration.
# The credential and conversation values can be overridden through the
# environment variables named in each lookup; the literals are the fallbacks
# used when the corresponding variable is unset.
# NOTE(review): credentials committed to source control should be treated as
# exposed -- rotate them and supply the replacements via the environment.
DINGTALK_APP_KEY = os.environ.get("DINGTALK_APP_KEY", "ding4ursdp0l2giat4bj")
DINGTALK_APP_SECRET = os.environ.get(
    "DINGTALK_APP_SECRET",
    "J0gBicjKiIHoKla7WfKKhRs1Tv8L6Xd5UhW3EVQByF16G7Vn7UUcRhP6u-PBCQNo",
)
ROBOT_CODE = os.environ.get("DINGTALK_ROBOT_CODE", "ding4ursdp0l2giat4bj")
OPEN_CONVERSATION_ID = os.environ.get(
    "DINGTALK_OPEN_CONVERSATION_ID", "cidcjYshXVtKck5LfOO9AqOJg=="
)
# Local document to upload and the file name passed along with the message.
FILE_PATH = r"C:\Users\ALC\.openclaw\workspace\前后端功能与开源可修改性分析报告.docx"
FILE_NAME = "前后端功能与开源可修改性分析报告.docx"
def upload_v2(access_token, file_path, file_type="file"):
    """Upload a local file to DingTalk and return its media id.

    The file is posted as multipart form data under the ``media`` field to
    the ``/v2.0/media/upload`` endpoint, with the access token carried in the
    ``x-acs-dingtalk-access-token`` header.

    Args:
        access_token: Token sent in the ``x-acs-dingtalk-access-token`` header.
        file_path: Path of the local file to upload.
        file_type: Value of the ``type`` query parameter (default ``"file"``).

    Returns:
        The ``mediaId`` value from the JSON response.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing regular file.
        Exception: If the JSON response does not contain ``mediaId``.
    """
    url = "https://api.dingtalk.com/v2.0/media/upload"
    # isfile (rather than exists) also rejects directories, which cannot be opened for upload.
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    params = {'type': file_type}
    headers = {'x-acs-dingtalk-access-token': access_token}
    # Keep the handle in a context manager so it is closed even when the
    # request raises (timeout, connection error, ...).
    with open(file_path, 'rb') as media:
        files = {'media': (os.path.basename(file_path), media)}
        response = requests.post(url, params=params, headers=headers, files=files, timeout=30)
    result = response.json()
    if "mediaId" in result:
        print(f"Upload success! mediaId: {result['mediaId']}")
        return result["mediaId"]
    raise Exception(f"Failed to upload media: {result}")
def send_group_v2(access_token, open_conversation_id, robot_code, media_id, file_type="file", file_name=None):
    """Send a previously uploaded file to a group conversation as a robot message.

    Posts a ``sampleFile`` message to the ``/v2.0/robot/orgGroup/send``
    endpoint. ``msgParam`` is a JSON string holding ``mediaId`` and
    ``fileName``; when the file name has an extension it is also sent as
    ``fileType``, the extension field DingTalk's message-type reference
    lists for ``sampleFile``.

    Args:
        access_token: Token sent in the ``x-acs-dingtalk-access-token`` header.
        open_conversation_id: Value of the ``openConversationId`` payload field.
        robot_code: Value of the ``robotCode`` payload field.
        media_id: Media id of the uploaded file.
        file_type: Accepted for backward compatibility; not used in the request.
        file_name: File name placed in the message; falls back to ``"file"``.

    Returns:
        The ``processQueryKey`` value from the JSON response.

    Raises:
        Exception: If the JSON response does not contain ``processQueryKey``.
    """
    url = "https://api.dingtalk.com/v2.0/robot/orgGroup/send"
    msg_key = "sampleFile"
    display_name = file_name or "file"
    msg_fields = {
        "mediaId": media_id,
        "fileName": display_name,
    }
    # Only add fileType when the name actually carries an extension, so an
    # empty value is never sent.
    extension = os.path.splitext(display_name)[1].lstrip(".")
    if extension:
        msg_fields["fileType"] = extension
    # msgParam must be a JSON *string*, not a nested object.
    msg_param = json.dumps(msg_fields, ensure_ascii=False)
    headers = {
        'x-acs-dingtalk-access-token': access_token,
        'Content-Type': 'application/json; charset=utf-8',
    }
    payload = {
        "openConversationId": open_conversation_id,
        "robotCode": robot_code,
        "msgKey": msg_key,
        "msgParam": msg_param,
    }
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    result = response.json()
    print(f"Send result: {json.dumps(result, ensure_ascii=False)}")
    if "processQueryKey" in result:
        print(f"Success! processQueryKey: {result['processQueryKey']}")
        return result['processQueryKey']
    raise Exception(f"Failed to send message: {result}")
def get_token_v2(app_key, app_secret):
    """Request an access token for the given app key and secret.

    Posts ``appKey``/``appSecret`` as JSON to the
    ``/v1.0/oauth2/getAccessToken`` endpoint.

    Args:
        app_key: Value of the ``appKey`` request field.
        app_secret: Value of the ``appSecret`` request field.

    Returns:
        The ``accessToken`` value from the JSON response.

    Raises:
        Exception: If the JSON response does not contain ``accessToken``.
    """
    endpoint = "https://api.dingtalk.com/v1.0/oauth2/getAccessToken"
    reply = requests.post(
        endpoint,
        headers={"Content-Type": "application/json"},
        json={"appKey": app_key, "appSecret": app_secret},
        timeout=10,
    )
    body = reply.json()
    # Guard clause: anything without a token is treated as a failure.
    if "accessToken" not in body:
        raise Exception(f"Failed to get token: {body}")
    return body["accessToken"]
def main():
    """Run the three-step flow: get a token, upload the file, send it to the group.

    Progress is printed for each step. Any exception is caught, reported
    together with its traceback, and not re-raised.
    """
    try:
        print("=== DingTalk V2 File Sender ===")
        # The path is printed in its UTF-8 encoded (bytes) form.
        encoded_path = FILE_PATH.encode('utf-8')
        print(f"File: {encoded_path}")
        size_kb = os.path.getsize(FILE_PATH) / 1024
        print(f"Size: {size_kb:.2f} KB")

        print("\nStep 1: Getting token...")
        token = get_token_v2(DINGTALK_APP_KEY, DINGTALK_APP_SECRET)
        print("Token obtained")

        print("\nStep 2: Upload file...")
        uploaded_media_id = upload_v2(token, FILE_PATH, "file")

        print("\nStep 3: Send to group...")
        query_key = send_group_v2(
            token,
            OPEN_CONVERSATION_ID,
            ROBOT_CODE,
            uploaded_media_id,
            "file",
            FILE_NAME,
        )

        print("\n=== Success! ===")
        print(f"File sent! processQueryKey: {query_key}")
    except Exception as err:
        import traceback

        print("\n=== Error ===")
        print(f"Error: {err}")
        traceback.print_exc()


if __name__ == "__main__":
    main()