Files
workspace/send_dingtalk_image.py
aitest 15c4480db1 Add OpenClaw workspace configuration and tools
- Add agent configuration files (AGENTS.md, USER.md, IDENTITY.md, SOUL.md)
- Add git configuration and skills management scripts
- Add frontend/backend analysis tools and reports
- Add DingTalk media sender utilities and documentation
- Fix OpenClaw runtime environment (Node.js and Python)
- Configure git remotes and push scripts
2026-03-05 13:56:59 +09:00

169 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
发送图片到钉钉群聊
"""
import requests
import json
import os
import sys
# 钉钉配置
APP_KEY = "ding4ursdp0l2giat4bj"
APP_SECRET = "J0gBicjKiIHoKla7WfKKhRs1Tv8L6Xd5UhW3EVQByF16G7Vn7UUcRhP6u-PBCQNo"
# 群聊和机器人配置
OPEN_CONVERSATION_ID = "cidcjYshXVtKck5LfOO9AqOJg=="
ROBOT_CODE = "4293382733"
# 图片路径
IMAGE_PATH = r"C:\Users\ALC\.openclaw\media\browser\1ad3cfc9-2dd5-496b-9fa5-26d23b973f76.jpg"
def get_access_token():
"""获取 access_token"""
url = "https://api.dingtalk.com/v1.0/oauth2/accessToken"
headers = {"Content-Type": "application/json"}
data = {
"appKey": APP_KEY,
"appSecret": APP_SECRET
}
try:
response = requests.post(url, headers=headers, json=data)
result = response.json()
if "accessToken" in result:
print(f"✅ Access Token 获取成功")
return result["accessToken"]
else:
print(f"❌ 获取 Access Token 失败: {result}")
return None
except Exception as e:
print(f"❌ 获取 Access Token 异常: {e}")
return None
def upload_media(access_token, file_path, media_type="image"):
"""上传媒体文件"""
url = "https://api.dingtalk.com/v1.0/media/upload"
if not os.path.exists(file_path):
print(f"❌ 文件不存在: {file_path}")
return None
file_size = os.path.getsize(file_path)
print(f"📁 文件大小: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)")
headers = {
"x-acs-dingtalk-access-token": access_token
}
try:
with open(file_path, 'rb') as f:
files = {"file": f}
params = {"type": media_type}
response = requests.post(url, headers=headers, files=files, params=params)
result = response.json()
if "mediaId" in result:
print(f"✅ 文件上传成功mediaId: {result['mediaId']}")
return result['mediaId']
else:
print(f"❌ 上传失败: {result}")
return None
except Exception as e:
print(f"❌ 上传异常: {e}")
return None
def send_media_message(access_token, conversation_id, robot_code, media_id, media_type="image"):
"""发送媒体消息"""
url = "https://api.dingtalk.com/v1.0/robot/orgGroupSend"
headers = {
"x-acs-dingtalk-access-token": access_token,
"Content-Type": "application/json"
}
# 根据媒体类型构建消息
if media_type == "image":
msg_key = "sampleImage"
msg_param = json.dumps({"mediaId": media_id, "altText": "日本 Yahoo 首页截图"})
elif media_type == "file":
msg_key = "sampleFile"
msg_param = json.dumps({"mediaId": media_id, "fileName": "文件"})
elif media_type == "video":
msg_key = "sampleVideo"
msg_param = json.dumps({"mediaId": media_id, "videoTitle": "视频"})
else:
print(f"❌ 不支持的媒体类型: {media_type}")
return False
data = {
"openConversationId": conversation_id,
"robotCode": robot_code,
"msgKey": msg_key,
"msgParam": msg_param
}
try:
response = requests.post(url, headers=headers, json=data)
result = response.json()
if "processQueryKey" in result:
print(f"✅ 消息发送成功processQueryKey: {result['processQueryKey']}")
return True
else:
print(f"❌ 发送失败: {result}")
return False
except Exception as e:
print(f"❌ 发送异常: {e}")
return False
def main():
"""主函数"""
print("=" * 60)
print("开始发送图片到钉钉群聊")
print("=" * 60)
# 步骤 1: 获取 access_token
print("\n步骤 1: 获取 Access Token...")
access_token = get_access_token()
if not access_token:
print("❌ 无法继续,退出")
return 1
# 步骤 2: 上传媒体文件
print("\n步骤 2: 上传媒体文件...")
media_id = upload_media(access_token, IMAGE_PATH, "image")
if not media_id:
print("❌ 无法继续,退出")
return 1
# 步骤 3: 发送媒体消息
print("\n步骤 3: 发送媒体消息到群聊...")
success = send_media_message(
access_token,
OPEN_CONVERSATION_ID,
ROBOT_CODE,
media_id,
"image"
)
if success:
print("\n" + "=" * 60)
print("✅ 发送成功!")
print("=" * 60)
return 0
else:
print("\n" + "=" * 60)
print("❌ 发送失败")
print("=" * 60)
return 1
if __name__ == "__main__":
sys.exit(main())