147 lines
4.9 KiB
Python
Executable File
147 lines
4.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
import os
|
||
import json
|
||
import time
|
||
import subprocess
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
# 配置区域 - 修改为你的实际配置
|
||
CHAT_ID = "oc_fabff7672e62a9ced7b326ee4a286c26" # 内容测试问题反馈群ID
|
||
SPREADSHEET_TOKEN = "E8vFsCmPBhT4SCtNmnJchqeJnJe" # 目标电子表格token
|
||
SHEET_ID = "7bce8f" # sheet页ID
|
||
LARK_CLI_CONFIG = "/root/.openclaw/credentials/xiaoyan" # Bot凭证目录
|
||
LAST_SYNC_FILE = "/tmp/last_feedback_sync_time"
|
||
COS_BASE_PATH = "feedback" # COS存储路径前缀
|
||
|
||
def run_cmd(cmd):
|
||
"""执行shell命令并返回结果"""
|
||
env = os.environ.copy()
|
||
env["LARKSUITE_CLI_CONFIG_DIR"] = LARK_CLI_CONFIG
|
||
result = subprocess.run(cmd, shell=True, env=env, capture_output=True, text=True)
|
||
if result.returncode != 0:
|
||
print(f"命令执行失败: {cmd}\n错误: {result.stderr}")
|
||
return None
|
||
return json.loads(result.stdout)
|
||
|
||
def get_last_sync_time():
|
||
"""获取上次同步时间,首次运行返回1小时前的时间戳"""
|
||
if not os.path.exists(LAST_SYNC_FILE):
|
||
return int(time.time()) - 3600
|
||
with open(LAST_SYNC_FILE, 'r') as f:
|
||
return int(f.read().strip())
|
||
|
||
def save_sync_time(timestamp):
|
||
"""保存本次同步时间"""
|
||
with open(LAST_SYNC_FILE, 'w') as f:
|
||
f.write(str(timestamp))
|
||
|
||
def get_group_messages(start_time, end_time):
|
||
"""获取指定时间范围内的群消息"""
|
||
# 转换时间戳为ISO 8601格式(带时区)
|
||
start_iso = datetime.fromtimestamp(start_time, tz=timezone(timedelta(hours=8))).isoformat()
|
||
end_iso = datetime.fromtimestamp(end_time, tz=timezone(timedelta(hours=8))).isoformat()
|
||
cmd = f'lark-cli im +chat-messages-list --chat-id {CHAT_ID} --start "{start_iso}" --end "{end_iso}" --page-size 50 --as bot'
|
||
result = run_cmd(cmd)
|
||
if not result or result.get("code") != 0:
|
||
print(f"获取群消息失败: {result}")
|
||
return []
|
||
return result.get("data", {}).get("items", [])
|
||
|
||
def process_message(msg):
|
||
"""处理单条消息,返回要写入表格的行数据"""
|
||
msg_type = msg.get("msg_type")
|
||
sender = msg.get("sender", {}).get("name", "未知用户")
|
||
create_time = datetime.fromtimestamp(int(msg.get("create_time"))).strftime("%Y-%m-%d %H:%M")
|
||
|
||
# 跳过系统消息和已删除消息
|
||
if msg_type in ["system", "deleted"]:
|
||
return None
|
||
|
||
content = json.loads(msg.get("content", "{}"))
|
||
info_type = "文本"
|
||
info_content = ""
|
||
|
||
if msg_type == "text":
|
||
info_content = content.get("text", "")
|
||
elif msg_type == "image":
|
||
info_type = "图片"
|
||
# 后续可添加上传COS逻辑,这里先记录image_key
|
||
info_content = f"image_key: {content.get('image_key', '')}"
|
||
elif msg_type == "media":
|
||
info_type = "视频"
|
||
info_content = f"file_key: {content.get('file_key', '')}"
|
||
elif msg_type == "audio":
|
||
info_type = "语音"
|
||
info_content = f"file_key: {content.get('file_key', '')}"
|
||
elif msg_type == "file":
|
||
info_type = "文件"
|
||
info_content = f"file_key: {content.get('file_key', '')}, 文件名: {content.get('file_name', '')}"
|
||
elif msg_type == "sticker":
|
||
info_type = "表情包"
|
||
info_content = "表情包消息"
|
||
else:
|
||
info_type = "其他"
|
||
info_content = f"未支持的消息类型: {msg_type}"
|
||
|
||
# 空内容跳过
|
||
if not info_content.strip():
|
||
return None
|
||
|
||
return [create_time, sender, info_type, info_content]
|
||
|
||
def append_to_sheet(rows):
|
||
"""批量追加行到电子表格"""
|
||
if not rows:
|
||
print("没有需要同步的消息")
|
||
return True
|
||
|
||
# 构造写入数据
|
||
values = []
|
||
for row in rows:
|
||
values.append({"values": [{"type": "text", "text": str(cell)} for cell in row]})
|
||
|
||
data = {
|
||
"range": f"{SHEET_ID}!A:D",
|
||
"values": values
|
||
}
|
||
|
||
cmd = f'lark-cli sheets values append --params \'{json.dumps(data)}\' --spreadsheet-token {SPREADSHEET_TOKEN} --as bot'
|
||
result = run_cmd(cmd)
|
||
if not result or result.get("code") != 0:
|
||
print(f"写入表格失败: {result}")
|
||
return False
|
||
print(f"成功写入{len(rows)}条记录到表格")
|
||
return True
|
||
|
||
def main():
|
||
print(f"开始同步群消息,时间范围: {datetime.fromtimestamp(get_last_sync_time())} 到 {datetime.now()}")
|
||
|
||
start_time = get_last_sync_time()
|
||
end_time = int(time.time())
|
||
|
||
# 获取消息
|
||
messages = get_group_messages(start_time, end_time)
|
||
if not messages:
|
||
print("没有获取到新消息")
|
||
return
|
||
|
||
# 处理消息
|
||
rows = []
|
||
for msg in messages:
|
||
row = process_message(msg)
|
||
if row:
|
||
rows.append(row)
|
||
|
||
# 倒序,最新的在后面
|
||
rows.reverse()
|
||
|
||
# 写入表格
|
||
if append_to_sheet(rows):
|
||
save_sync_time(end_time)
|
||
print("同步完成")
|
||
else:
|
||
print("同步失败")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|