#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Send a local file directly to a DingTalk group chat.

The script runs three steps: obtain an access token, upload the file as
media, then post a ``sampleFile`` robot message to the group conversation.
"""

import json
import mimetypes
import os
import sys
import traceback
from typing import Optional

import requests

# Configuration.
# SECURITY NOTE(review): these credentials are committed in source. They can
# be overridden through environment variables of the same name; the literals
# are kept only so existing invocations keep working. Consider rotating the
# secret and removing the literal defaults.
DINGTALK_APP_KEY = os.environ.get("DINGTALK_APP_KEY", "ding4ursdp0l2giat4bj")
DINGTALK_APP_SECRET = os.environ.get(
    "DINGTALK_APP_SECRET",
    "J0gBicjKiIHoKla7WfKKhRs1Tv8L6Xd5UhW3EVQByF16G7Vn7UUcRhP6u-PBCQNo",
)
ROBOT_CODE = os.environ.get("DINGTALK_ROBOT_CODE", "ding4ursdp0l2giat4bj")
OPEN_CONVERSATION_ID = "cidcjYshXVtKck5LfOO9AqOJg=="
FILE_PATH = r"C:\Users\ALC\.openclaw\workspace\前后端功能与开源可修改性分析报告.docx"
FILE_NAME = "前后端功能与开源可修改性分析报告.docx"

# MIME type the script has always sent for Word documents.
_DOCX_MIME_TYPE = (
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
)


def _decode_json(response, action):
    """Return the JSON body of *response*, or raise a descriptive Exception.

    Without this guard a non-JSON reply (for example an HTML error page)
    surfaces as a bare decode error with no hint about which call failed.
    """
    try:
        return response.json()
    except ValueError as err:
        raise Exception(
            f"Failed to {action}: HTTP {response.status_code}, "
            f"non-JSON response: {response.text[:200]!r}"
        ) from err


def _guess_mime_type(file_path):
    """Pick the Content-Type for the uploaded file part."""
    # Keep the historical value for .docx regardless of the platform's MIME
    # table, so the request is unchanged for the file this script was written
    # for.
    if file_path.lower().endswith(".docx"):
        return _DOCX_MIME_TYPE
    guessed, _encoding = mimetypes.guess_type(file_path)
    return guessed or "application/octet-stream"


def get_access_token(app_key: str, app_secret: str) -> str:
    """Fetch a DingTalk access token.

    Args:
        app_key: Application key, sent as ``appKey``.
        app_secret: Application secret, sent as ``appSecret``.

    Returns:
        The value of ``accessToken`` from the response.

    Raises:
        Exception: If the response is not JSON or has no ``accessToken``.
    """
    url = "https://api.dingtalk.com/v1.0/oauth2/accessToken"
    headers = {"Content-Type": "application/json"}
    data = {
        "appKey": app_key,
        "appSecret": app_secret,
    }

    response = requests.post(url, headers=headers, json=data, timeout=10)
    result = _decode_json(response, "get access_token")

    if "accessToken" in result:
        return result["accessToken"]
    raise Exception(f"Failed to get access_token: {result}")


def upload_media(access_token: str, file_path: str, file_type: str = "file") -> str:
    """Upload a local file as media and return its media id.

    Args:
        access_token: Token sent in the ``x-acs-dingtalk-access-token`` header.
        file_path: Path of the file to upload.
        file_type: Value of the ``type`` query parameter.

    Returns:
        The value of ``mediaId`` from the response.

    Raises:
        Exception: If the file does not exist, the response is not JSON, or
            the response has no ``mediaId``.
    """
    # NOTE(review): endpoint and the "mediaId" response key are kept exactly
    # as the original script had them; confirm against the DingTalk media
    # upload documentation.
    url = "https://api.dingtalk.com/v1.0/media/upload"

    if not os.path.exists(file_path):
        raise Exception(f"File not found: {file_path}")

    params = {"type": file_type}
    headers = {"x-acs-dingtalk-access-token": access_token}

    # The context manager closes the handle even when the request raises;
    # the original left the file open.
    with open(file_path, "rb") as file_handle:
        files = {
            "media": (
                os.path.basename(file_path),
                file_handle,
                _guess_mime_type(file_path),
            ),
        }
        response = requests.post(
            url, params=params, headers=headers, files=files, timeout=30
        )

    result = _decode_json(response, "upload media")

    if "mediaId" in result:
        print(f"Upload success! mediaId: {result['mediaId']}")
        return result["mediaId"]
    raise Exception(f"Failed to upload media: {result}")


def send_group_media_message(
    access_token: str,
    open_conversation_id: str,
    robot_code: str,
    media_id: str,
    file_type: str = "file",
    file_name: Optional[str] = None,
) -> str:
    """Send an uploaded file to a group chat as a robot message.

    Args:
        access_token: Token sent in the ``x-acs-dingtalk-access-token`` header.
        open_conversation_id: Target conversation, sent as
            ``openConversationId``.
        robot_code: Robot identifier, sent as ``robotCode``.
        media_id: Media id returned by the upload step.
        file_type: Currently unused; kept so existing callers keep working.
        file_name: Name shown for the file; defaults to ``"文件"`` when falsy.

    Returns:
        The value of ``processQueryKey`` from the response.

    Raises:
        Exception: If the response is not JSON or has no ``processQueryKey``.
    """
    url = "https://api.dingtalk.com/v1.0/robot/orgGroup/send"

    # Build the message parameters. msgParam is itself a JSON-encoded string
    # nested inside the JSON request body.
    # NOTE(review): file_type is not forwarded; check whether the sampleFile
    # template expects an additional field for it.
    msg_key = "sampleFile"
    msg_param = json.dumps({
        "mediaId": media_id,
        "fileName": file_name or "文件",
    })

    headers = {
        "x-acs-dingtalk-access-token": access_token,
        "Content-Type": "application/json",
    }
    data = {
        "openConversationId": open_conversation_id,
        "robotCode": robot_code,
        "msgKey": msg_key,
        "msgParam": msg_param,
    }

    response = requests.post(url, headers=headers, json=data, timeout=30)
    result = _decode_json(response, "send message")

    print(f"Send result: {json.dumps(result, ensure_ascii=False)}")

    if "processQueryKey" in result:
        print(f"Success! processQueryKey: {result['processQueryKey']}")
        return result["processQueryKey"]
    raise Exception(f"Failed to send message: {result}")


def main() -> int:
    """Run the token, upload and send steps using the module configuration.

    Returns:
        0 when the file was sent, 1 when any step failed. Failures are
        printed with a traceback rather than propagated.
    """
    try:
        print("=== DingTalk File Sender ===")
        # Prints the repr of the UTF-8 bytes rather than the text itself;
        # presumably a workaround for consoles that cannot encode the
        # Chinese file name -- kept as is.
        print(f"File: {FILE_PATH.encode('utf-8')}")
        print(f"Upload size: {os.path.getsize(FILE_PATH) / 1024:.2f} KB")

        # Step 1: Get access token
        print("\nStep 1: Getting access token...")
        access_token = get_access_token(DINGTALK_APP_KEY, DINGTALK_APP_SECRET)
        print("Access token obtained")

        # Step 2: Upload file
        print("\nStep 2: Uploading file...")
        media_id = upload_media(access_token, FILE_PATH, "file")

        # Step 3: Send to group
        print("\nStep 3: Sending to group...")
        process_query_key = send_group_media_message(
            access_token,
            OPEN_CONVERSATION_ID,
            ROBOT_CODE,
            media_id,
            "file",
            FILE_NAME,
        )

        print("\n=== Success! ===")
        print("File sent successfully!")
        print(f"processQueryKey: {process_query_key}")
        return 0

    except Exception as e:
        # Top-level boundary: report the failure and signal it through the
        # exit status instead of exiting 0 as the original did.
        print("\n=== Error ===")
        print(f"Error: {e}")
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    sys.exit(main())