#!/usr/bin/env python3 """ 发送图片到钉钉群聊 """ import requests import json import os import sys # 钉钉配置 APP_KEY = "ding4ursdp0l2giat4bj" APP_SECRET = "J0gBicjKiIHoKla7WfKKhRs1Tv8L6Xd5UhW3EVQByF16G7Vn7UUcRhP6u-PBCQNo" # 群聊和机器人配置 OPEN_CONVERSATION_ID = "cidcjYshXVtKck5LfOO9AqOJg==" ROBOT_CODE = "4293382733" # 图片路径 IMAGE_PATH = r"C:\Users\ALC\.openclaw\media\browser\1ad3cfc9-2dd5-496b-9fa5-26d23b973f76.jpg" def get_access_token(): """获取 access_token""" url = "https://api.dingtalk.com/v1.0/oauth2/accessToken" headers = {"Content-Type": "application/json"} data = { "appKey": APP_KEY, "appSecret": APP_SECRET } try: response = requests.post(url, headers=headers, json=data) result = response.json() if "accessToken" in result: print(f"✅ Access Token 获取成功") return result["accessToken"] else: print(f"❌ 获取 Access Token 失败: {result}") return None except Exception as e: print(f"❌ 获取 Access Token 异常: {e}") return None def upload_media(access_token, file_path, media_type="image"): """上传媒体文件""" url = "https://api.dingtalk.com/v1.0/media/upload" if not os.path.exists(file_path): print(f"❌ 文件不存在: {file_path}") return None file_size = os.path.getsize(file_path) print(f"📁 文件大小: {file_size} bytes ({file_size / 1024 / 1024:.2f} MB)") headers = { "x-acs-dingtalk-access-token": access_token } try: with open(file_path, 'rb') as f: files = {"file": f} params = {"type": media_type} response = requests.post(url, headers=headers, files=files, params=params) result = response.json() if "mediaId" in result: print(f"✅ 文件上传成功,mediaId: {result['mediaId']}") return result['mediaId'] else: print(f"❌ 上传失败: {result}") return None except Exception as e: print(f"❌ 上传异常: {e}") return None def send_media_message(access_token, conversation_id, robot_code, media_id, media_type="image"): """发送媒体消息""" url = "https://api.dingtalk.com/v1.0/robot/orgGroupSend" headers = { "x-acs-dingtalk-access-token": access_token, "Content-Type": "application/json" } # 根据媒体类型构建消息 if media_type == "image": msg_key = "sampleImage" msg_param = json.dumps({"mediaId": media_id, "altText": "日本 Yahoo 首页截图"}) elif media_type == "file": msg_key = "sampleFile" msg_param = json.dumps({"mediaId": media_id, "fileName": "文件"}) elif media_type == "video": msg_key = "sampleVideo" msg_param = json.dumps({"mediaId": media_id, "videoTitle": "视频"}) else: print(f"❌ 不支持的媒体类型: {media_type}") return False data = { "openConversationId": conversation_id, "robotCode": robot_code, "msgKey": msg_key, "msgParam": msg_param } try: response = requests.post(url, headers=headers, json=data) result = response.json() if "processQueryKey" in result: print(f"✅ 消息发送成功!processQueryKey: {result['processQueryKey']}") return True else: print(f"❌ 发送失败: {result}") return False except Exception as e: print(f"❌ 发送异常: {e}") return False def main(): """主函数""" print("=" * 60) print("开始发送图片到钉钉群聊") print("=" * 60) # 步骤 1: 获取 access_token print("\n步骤 1: 获取 Access Token...") access_token = get_access_token() if not access_token: print("❌ 无法继续,退出") return 1 # 步骤 2: 上传媒体文件 print("\n步骤 2: 上传媒体文件...") media_id = upload_media(access_token, IMAGE_PATH, "image") if not media_id: print("❌ 无法继续,退出") return 1 # 步骤 3: 发送媒体消息 print("\n步骤 3: 发送媒体消息到群聊...") success = send_media_message( access_token, OPEN_CONVERSATION_ID, ROBOT_CODE, media_id, "image" ) if success: print("\n" + "=" * 60) print("✅ 发送成功!") print("=" * 60) return 0 else: print("\n" + "=" * 60) print("❌ 发送失败") print("=" * 60) return 1 if __name__ == "__main__": sys.exit(main())