- Add agent configuration files (AGENTS.md, USER.md, IDENTITY.md, SOUL.md) - Add git configuration and skills management scripts - Add frontend/backend analysis tools and reports - Add DingTalk media sender utilities and documentation - Fix OpenClaw runtime environment (Node.js and Python) - Configure git remotes and push scripts
169 lines
4.6 KiB
Python
169 lines
4.6 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
发送图片到钉钉群聊
|
||
"""
|
||
import requests
|
||
import json
|
||
import os
|
||
import sys
|
||
|
||
# DingTalk application credentials.
# SECURITY NOTE: the literal fallbacks below are committed in plain text.
# Prefer supplying real values through the environment variables and rotate
# any secret that has been published in source control. The literals are
# kept only so existing invocations keep working unchanged.
APP_KEY = os.environ.get("DINGTALK_APP_KEY") or "ding4ursdp0l2giat4bj"
APP_SECRET = (
    os.environ.get("DINGTALK_APP_SECRET")
    or "J0gBicjKiIHoKla7WfKKhRs1Tv8L6Xd5UhW3EVQByF16G7Vn7UUcRhP6u-PBCQNo"
)

# Target group conversation and the robot that posts into it.
OPEN_CONVERSATION_ID = (
    os.environ.get("DINGTALK_OPEN_CONVERSATION_ID") or "cidcjYshXVtKck5LfOO9AqOJg=="
)
ROBOT_CODE = os.environ.get("DINGTALK_ROBOT_CODE") or "4293382733"

# Path of the image to send; override with DINGTALK_IMAGE_PATH to send a
# different file without editing this script.
IMAGE_PATH = (
    os.environ.get("DINGTALK_IMAGE_PATH")
    or r"C:\Users\ALC\.openclaw\media\browser\1ad3cfc9-2dd5-496b-9fa5-26d23b973f76.jpg"
)
|
||
|
||
|
||
def get_access_token(timeout=10):
    """Request a DingTalk access token with the module-level app credentials.

    Posts ``APP_KEY`` / ``APP_SECRET`` as a JSON body to the OAuth2
    ``accessToken`` endpoint and reads the ``accessToken`` field of the
    JSON reply. Progress and failures are reported on stdout.

    Args:
        timeout: Seconds to wait for the HTTP exchange before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        The access token string, or ``None`` when the request fails, the
        reply is not valid JSON, or the reply has no ``accessToken`` field.
    """
    url = "https://api.dingtalk.com/v1.0/oauth2/accessToken"
    headers = {"Content-Type": "application/json"}
    payload = {"appKey": APP_KEY, "appSecret": APP_SECRET}

    # Only the network call and JSON decoding are expected to fail here;
    # anything else is a programming error and should surface normally.
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        result = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f"❌ 获取 Access Token 异常: {e}")
        return None

    # Guard against a JSON reply that is not an object (e.g. a list or null).
    if isinstance(result, dict) and "accessToken" in result:
        print("✅ Access Token 获取成功")
        return result["accessToken"]

    print(f"❌ 获取 Access Token 失败: {result}")
    return None
|
||
|
||
|
||
def upload_media(access_token, file_path, media_type="image", timeout=60):
    """Upload a local file to DingTalk and return its media identifier.

    The file is sent as multipart form data under the ``file`` field, with
    the media type passed as the ``type`` query parameter and the access
    token in the ``x-acs-dingtalk-access-token`` header. Progress and
    failures are reported on stdout.

    Args:
        access_token: Token sent in the ``x-acs-dingtalk-access-token`` header.
        file_path: Path of the local file to upload.
        media_type: Value of the ``type`` query parameter (default "image").
        timeout: Seconds to wait for the HTTP exchange before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        The ``mediaId`` value from the JSON reply, or ``None`` when the file
        does not exist, cannot be read, the request fails, or the reply has
        no ``mediaId`` field.
    """
    url = "https://api.dingtalk.com/v1.0/media/upload"

    # Fail fast, before any network traffic, when there is nothing to send.
    if not os.path.exists(file_path):
        print(f"❌ 文件不存在: {file_path}")
        return None

    headers = {"x-acs-dingtalk-access-token": access_token}
    params = {"type": media_type}

    # OSError covers a file that vanished or is unreadable; the other two
    # cover transport failures and a reply body that is not valid JSON.
    try:
        file_size = os.path.getsize(file_path)
        print(f"📁 文件大小: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)")

        with open(file_path, "rb") as f:
            response = requests.post(
                url,
                headers=headers,
                files={"file": f},
                params=params,
                timeout=timeout,
            )
        result = response.json()
    except (OSError, requests.RequestException, ValueError) as e:
        print(f"❌ 上传异常: {e}")
        return None

    # Guard against a JSON reply that is not an object (e.g. a list or null).
    if isinstance(result, dict) and "mediaId" in result:
        print(f"✅ 文件上传成功,mediaId: {result['mediaId']}")
        return result["mediaId"]

    print(f"❌ 上传失败: {result}")
    return None
|
||
|
||
|
||
def send_media_message(access_token, conversation_id, robot_code, media_id,
                       media_type="image", caption=None, timeout=10):
    """Send a previously uploaded media item to a group conversation.

    Builds a robot message whose ``msgKey`` and ``msgParam`` depend on
    ``media_type`` and posts it as JSON to the ``orgGroupSend`` endpoint.
    Progress and failures are reported on stdout.

    Args:
        access_token: Token sent in the ``x-acs-dingtalk-access-token`` header.
        conversation_id: Value sent as ``openConversationId``.
        robot_code: Value sent as ``robotCode``.
        media_id: Media identifier placed in ``msgParam``.
        media_type: One of "image", "file" or "video".
        caption: Optional text for the type-specific label field
            (``altText`` / ``fileName`` / ``videoTitle``). When ``None`` the
            historical built-in label for that media type is used.
        timeout: Seconds to wait for the HTTP exchange before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        ``True`` when the JSON reply contains ``processQueryKey``; ``False``
        for an unsupported media type, a failed request, a non-JSON reply,
        or a reply without that field.
    """
    url = "https://api.dingtalk.com/v1.0/robot/orgGroupSend"

    # media type -> (msgKey, label field name, default label text)
    templates = {
        "image": ("sampleImage", "altText", "日本 Yahoo 首页截图"),
        "file": ("sampleFile", "fileName", "文件"),
        "video": ("sampleVideo", "videoTitle", "视频"),
    }

    # Reject unknown types before any network traffic.
    if media_type not in templates:
        print(f"❌ 不支持的媒体类型: {media_type}")
        return False

    msg_key, label_field, default_label = templates[media_type]
    label = default_label if caption is None else caption
    # msgParam is itself a JSON-encoded string inside the JSON request body.
    msg_param = json.dumps({"mediaId": media_id, label_field: label})

    headers = {
        "x-acs-dingtalk-access-token": access_token,
        "Content-Type": "application/json",
    }
    payload = {
        "openConversationId": conversation_id,
        "robotCode": robot_code,
        "msgKey": msg_key,
        "msgParam": msg_param,
    }

    # Only the network call and JSON decoding are expected to fail here.
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        result = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f"❌ 发送异常: {e}")
        return False

    # Guard against a JSON reply that is not an object (e.g. a list or null).
    if isinstance(result, dict) and "processQueryKey" in result:
        print(f"✅ 消息发送成功!processQueryKey: {result['processQueryKey']}")
        return True

    print(f"❌ 发送失败: {result}")
    return False
|
||
|
||
|
||
def main():
    """Run the token -> upload -> send flow for the configured image.

    Each step prints its progress; the flow stops at the first step that
    yields a falsy result.

    Returns:
        0 when the message was sent, 1 when any step failed.
    """
    banner = "=" * 60

    print(banner)
    print("开始发送图片到钉钉群聊")
    print(banner)

    # Step 1: obtain an access token.
    print("\n步骤 1: 获取 Access Token...")
    token = get_access_token()
    if not token:
        print("❌ 无法继续,退出")
        return 1

    # Step 2: upload the image and get its media id.
    print("\n步骤 2: 上传媒体文件...")
    uploaded_id = upload_media(token, IMAGE_PATH, "image")
    if not uploaded_id:
        print("❌ 无法继续,退出")
        return 1

    # Step 3: post the uploaded image into the group conversation.
    print("\n步骤 3: 发送媒体消息到群聊...")
    delivered = send_media_message(
        token, OPEN_CONVERSATION_ID, ROBOT_CODE, uploaded_id, "image"
    )

    if delivered:
        verdict, exit_code = "✅ 发送成功!", 0
    else:
        verdict, exit_code = "❌ 发送失败", 1

    print("\n" + banner)
    print(verdict)
    print(banner)
    return exit_code
|
||
|
||
|
||
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|