import requests import json import os from requests_toolbelt.multipart.encoder import MultipartEncoder DINGTALK_APP_KEY = "ding4ursdp0l2giat4bj" DINGTALK_APP_SECRET = "J0gBicjKiIHoKla7WfKKhRs1Tv8L6Xd5UhW3EVQByF16G7Vn7UUcRhP6u-PBCQNo" ROBOT_CODE = "ding4ursdp0l2giat4bj" OPEN_CONVERSATION_ID = "cidcjYshXVtKck5LfOO9AqOJg==" FILE_PATH = r"C:\Users\ALC\.openclaw\workspace\前后端功能与开源可修改性分析报告.docx" FILE_NAME = "前后端功能与开源可修改性分析报告.docx" def get_token(): url = "https://api.dingtalk.com/v1.0/oauth2/accessToken" headers = {"Content-Type": "application/json"} data = {"appKey": DINGTALK_APP_KEY, "appSecret": DINGTALK_APP_SECRET} r = requests.post(url, headers=headers, json=data, timeout=10) return r.json()["accessToken"] def upload_file_with_toolbelt(access_token, file_path): url = "https://oapi.dingtalk.com/media/upload" with open(file_path, 'rb') as f: fields = { 'type': 'file', 'media': (FILE_NAME, f, 'application/vnd.openxmlformats-officedocument.wordprocessingml.document') } m = MultipartEncoder(fields=fields) headers = { 'Content-Type': m.content_type, 'x-acs-dingtalk-access-token': access_token } r = requests.post(url, headers=headers, data=m, timeout=30) return r.json() def send_sample_file(access_token, media_id): url = "https://oapi.dingtalk.com/robot/send" payload = { "msgtype": "file", "file": { "media_id": media_id }, "openConversationId": OPEN_CONVERSATION_ID } r = requests.post(url, json=payload, timeout=30) return r.json() def main(): try: token = get_token() result_path = r"C:\Users\ALC\.openclaw\workspace\final_send_result.txt" with open(result_path, "w", encoding="utf-8") as f: f.write(f"=== DingTalk Final Send Attempt ===\n\n") f.write("Step 1: Get Token\n") f.write(f"Token obtained successfully\n\n") f.write("Step 2: Upload File\n") upload_result = upload_file_with_toolbelt(token, FILE_PATH) f.write(f"{json.dumps(upload_result, ensure_ascii=False)}\n\n") if 'media_id' in upload_result: media_id = upload_result['media_id'] f.write(f"Media ID: {media_id}\n\n") f.write("Step 3: Send sampleFile message\n") send_result = send_sample_file(token, media_id) f.write(f"{json.dumps(send_result, ensure_ascii=False)}\n\n") if send_result.get('errcode') == 0 and send_result.get('processQueryKey'): f.write("=== SUCCESS ===\n") f.write(f"ProcessQueryKey: {send_result['processQueryKey']}\n") else: f.write(f"=== FAILED ===\n") f.write(f"Error code: {send_result.get('errcode')}\n") f.write(f"Message: {send_result.get('errmsg', '')}\n") else: f.write("=== UPLOAD FAILED ===\n") f.write(f"{upload_result}\n") print("Done. Check result.txt") except Exception as e: with open(r"C:\Users\ALC\.openclaw\workspace\final_send_error.txt", "w", encoding="utf-8") as f: f.write(f"Error: {e}\n") import traceback f.write(traceback.format_exc()) print(f"Error: {e}") if __name__ == "__main__": main()