70 lines
2.4 KiB
Python
70 lines
2.4 KiB
Python
#!/usr/bin/env python3
|
||
import os
|
||
import json
|
||
import requests
|
||
|
||
# 读取环境变量里的飞书凭证(需要提前配置FEISHU_APP_ID和FEISHU_APP_SECRET)
|
||
FEISHU_APP_ID = os.getenv("FEISHU_APP_ID", "cli_a4d9e0f56e7a8b9c")
|
||
FEISHU_APP_SECRET = os.getenv("FEISHU_APP_SECRET", "your_app_secret_here")
|
||
TARGET_USER_OPEN_ID = "ou_d0474502fe89122e69d0e13123c7bb45"
|
||
FILE_PATH = "/root/.openclaw/workspace-xiaoban/output/260126/账户id_2148_角色id_2895_导出时间_20260303.xlsx"
|
||
FILE_NAME = "账户id_2148_角色id_2895_学习行为数据.xlsx"
|
||
|
||
def get_tenant_access_token():
|
||
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
|
||
payload = json.dumps({
|
||
"app_id": FEISHU_APP_ID,
|
||
"app_secret": FEISHU_APP_SECRET
|
||
})
|
||
headers = {
|
||
'Content-Type': 'application/json'
|
||
}
|
||
response = requests.request("POST", url, headers=headers, data=payload)
|
||
return response.json()["tenant_access_token"]
|
||
|
||
def upload_file(token):
|
||
url = "https://open.feishu.cn/open-apis/im/v1/files"
|
||
params = {
|
||
"file_type": "xls",
|
||
"file_name": FILE_NAME
|
||
}
|
||
payload = {}
|
||
files=[
|
||
('file',(FILE_NAME,open(FILE_PATH,'rb'),'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'))
|
||
]
|
||
headers = {
|
||
'Authorization': f'Bearer {token}'
|
||
}
|
||
response = requests.request("POST", url, headers=headers, data=payload, files=files, params=params)
|
||
return response.json()["data"]["file_key"]
|
||
|
||
def send_file_message(token, file_key):
|
||
url = "https://open.feishu.cn/open-apis/im/v1/messages"
|
||
params = {
|
||
"receive_id_type": "open_id"
|
||
}
|
||
payload = json.dumps({
|
||
"receive_id": TARGET_USER_OPEN_ID,
|
||
"msg_type": "file",
|
||
"content": json.dumps({
|
||
"file_key": file_key
|
||
})
|
||
})
|
||
headers = {
|
||
'Content-Type': 'application/json',
|
||
'Authorization': f'Bearer {token}'
|
||
}
|
||
response = requests.request("POST", url, headers=headers, data=payload, params=params)
|
||
return response.json()
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
token = get_tenant_access_token()
|
||
print(f"获取token成功: {token[:10]}...")
|
||
file_key = upload_file(token)
|
||
print(f"上传文件成功,file_key: {file_key}")
|
||
res = send_file_message(token, file_key)
|
||
print(f"发送消息结果: {json.dumps(res, indent=2, ensure_ascii=False)}")
|
||
except Exception as e:
|
||
print(f"出错了: {e}")
|