ai_member_xiaokui/scripts/sync_wechat_feedback_minutely.py
2026-04-18 08:10:01 +08:00

220 lines
6.7 KiB
Python

#!/usr/bin/env python3
"""
微信用户反馈每分钟同步到飞书收集表格
"""
import os
import json
import pymysql
import subprocess
from datetime import datetime
# --- Configuration ---------------------------------------------------------
# Path of the append-only run log (also echoed to stdout by log()).
LOG_FILE = "/var/log/sync_wechat_feedback.log"
# State file holding the max message id synced so far, so each run only
# picks up rows with a larger id.
LAST_SYNC_ID_FILE = "/root/.openclaw/workspace-xiaokui/data/last_wechat_sync_id"
# Target Feishu (Lark) spreadsheet token and sheet id that collect feedback.
TARGET_SPREADSHEET_TOKEN = "AOxbsifk3hybRZteGowcMxNnnqc"
TARGET_SHEET_ID = "f17380"
# Directory holding lark-cli credentials (exported as LARKSUITE_CLI_CONFIG_DIR).
LARK_CLI_CONFIG = "/root/.openclaw/credentials/xiaoyan"
# MySQL connection settings for the read-only account.
# NOTE(review): the password is hard-coded here; consider loading it from an
# environment variable or a secrets file — confirm with ops before changing.
DB_CONFIG = {
    "host": "bj-cdb-8frbdwju.sql.tencentcdb.com",
    "port": 25413,
    "user": "read_only",
    "password": "fdsfiidier^$*hjfdijjd232",
    "database": "vala_test",
    "charset": "utf8mb4"
}
def log(message):
    """Append a timestamped message to LOG_FILE and echo it to stdout."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{stamp}] {message}"
    with open(LOG_FILE, "a", encoding="utf-8") as handle:
        handle.write(line + "\n")
    print(line)
def get_last_sync_id():
    """Return the highest message id synced so far.

    Reads LAST_SYNC_ID_FILE and returns 0 when the file is missing, empty,
    unreadable, or does not contain an integer. The original raised
    ValueError on an empty/corrupt state file, which would break every
    subsequent cron run until the file was fixed by hand.
    """
    if not os.path.exists(LAST_SYNC_ID_FILE):
        return 0
    try:
        with open(LAST_SYNC_ID_FILE, "r", encoding="utf-8") as f:
            return int(f.read().strip())
    except (ValueError, OSError):
        # Corrupt or unreadable state file: fall back to a full resync.
        return 0
def save_last_sync_id(last_id):
    """Persist *last_id* to LAST_SYNC_ID_FILE.

    Creates the parent directory if needed and writes via a temp file +
    os.replace so a crash mid-write can never leave a truncated state
    file (get_last_sync_id would then silently resync from 0).
    """
    directory = os.path.dirname(LAST_SYNC_ID_FILE)
    if directory:
        os.makedirs(directory, exist_ok=True)
    tmp_path = LAST_SYNC_ID_FILE + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(str(last_id))
    # Atomic on POSIX: readers see either the old id or the new one.
    os.replace(tmp_path, LAST_SYNC_ID_FILE)
def fetch_new_wechat_data(last_id):
    """Fetch WeChat group messages with id > *last_id* from MySQL.

    Returns rows of (id, msg_time, sender_name, msg_type, content),
    ordered newest-first by msg_time. The connection is closed in a
    finally block — the original leaked it whenever the query raised.
    """
    conn = pymysql.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            sql = """
                SELECT id, msg_time, sender_name, msg_type, content
                FROM wechat_group_message
                WHERE id > %s
                ORDER BY msg_time DESC
            """
            cursor.execute(sql, (last_id,))
            return cursor.fetchall()
    finally:
        conn.close()
def convert_to_sheet_format(rows):
    """Convert DB message rows into Feishu sheet rows.

    Parameters:
        rows: iterable of (id, msg_time, sender_name, msg_type, content)
              tuples; msg_time must be a datetime.

    Returns:
        (sheet_rows, max_id) — sheet_rows is a list of 7-column rows
        [seq, channel, time, sender, content, note, reply]; max_id is the
        largest message id seen (0 for empty input).
    """
    sheet_rows = []
    max_id = 0
    # 'msg_id' instead of the original 'id', which shadowed the builtin.
    for msg_id, msg_time, sender_name, msg_type, content in rows:
        max_id = max(max_id, msg_id)
        # Prefix non-text message types so they stay readable in the sheet.
        if msg_type == "text":
            feedback_content = content
        elif msg_type == "image":
            feedback_content = f"[图片] {content}"
        elif msg_type == "video":
            feedback_content = f"[视频] {content}"
        else:
            feedback_content = f"[{msg_type}] {content}"
        # Flatten newlines and escape double quotes.
        # NOTE(review): json.dumps in write_to_sheet escapes quotes again,
        # so this pre-escape leaves a literal backslash in the cell —
        # confirm that is intended before changing it.
        feedback_content = feedback_content.replace("\n", " ").replace("\r", "").replace('"', '\\"')
        sheet_rows.append([
            "",                                      # A: sequence number, regenerated during merge
            "微信-用户火线救火",                        # B: feedback channel
            msg_time.strftime("%Y-%m-%d %H:%M:%S"),  # C: feedback time
            sender_name,                             # D: reporter
            feedback_content,                        # E: feedback content
            "",                                      # F: note, left blank
            ""                                       # G: reply, left blank
        ])
    return sheet_rows, max_id
def get_existing_sheet_data():
    """Read the current data rows (A2:O) from the target Feishu sheet.

    Returns the row value-lists with blank-first-cell rows filtered out,
    or [] on any read/parse failure so the sync can still proceed with
    the new rows alone (best effort).
    """
    cmd = [
        "lark-cli", "sheets", "+read",
        "--spreadsheet-token", TARGET_SPREADSHEET_TOKEN,
        "--range", f"{TARGET_SHEET_ID}!A2:O",
        "--as", "bot"
    ]
    env = os.environ.copy()
    env["LARKSUITE_CLI_CONFIG_DIR"] = LARK_CLI_CONFIG
    env["PATH"] = "/root/.nvm/versions/node/v24.14.0/bin:" + env.get("PATH", "")
    result = subprocess.run(cmd, env=env, capture_output=True, text=True)
    if result.returncode != 0:
        log(f"读取表格失败: {result.stderr}")
        return []
    try:
        data = json.loads(result.stdout)
        values = data.get("data", {}).get("valueRange", {}).get("values", [])
        # Keep only rows whose first cell (the sequence number) is truthy.
        # str() guards against non-string cells (the API may return
        # numbers), which crashed the original's row[0].strip() and made
        # the broad except drop ALL existing rows.
        return [row for row in values if row and row[0] and str(row[0]).strip()]
    except (json.JSONDecodeError, AttributeError, TypeError) as e:
        # Narrowed from `except Exception` to the parse/shape failures
        # this path can actually produce.
        log(f"解析表格数据失败: {str(e)}")
        return []
def merge_and_sort_data(new_rows, existing_rows):
    """Combine freshly fetched rows with rows already in the sheet.

    De-duplicates on the (time, sender, content) columns, orders the
    result newest-first by the time string in column C, and rewrites
    column A with fresh 1-based sequence numbers (rows are renumbered
    in place).
    """
    seen_keys = set()
    merged = []
    for row in list(new_rows) + list(existing_rows):
        # Rows too short to carry time/sender/content cannot be keyed.
        if not row or len(row) < 5:
            continue
        dedup_key = "_".join(map(str, (row[2], row[3], row[4])))
        if dedup_key not in seen_keys:
            seen_keys.add(dedup_key)
            merged.append(row)
    # The fixed "%Y-%m-%d %H:%M:%S" format sorts correctly as a string.
    merged.sort(key=lambda r: r[2], reverse=True)
    for index, row in enumerate(merged, start=1):
        row[0] = str(index)
    return merged
def write_to_sheet(all_rows):
    """Clear the sheet's data region and write *all_rows* into it.

    Returns True on success (including the no-op empty-input case),
    False when either the clear or the write CLI call fails.
    """
    if not all_rows:
        log("无数据需要写入")
        # Nothing to write is still a successful sync. The original did a
        # bare `return` (None), which main() treated as failure and so
        # never advanced the last-synced id.
        return True
    env = os.environ.copy()
    env["LARKSUITE_CLI_CONFIG_DIR"] = LARK_CLI_CONFIG
    env["PATH"] = "/root/.nvm/versions/node/v24.14.0/bin:" + env.get("PATH", "")
    # Blank out the old region first so rows removed by de-duplication do
    # not linger below the freshly written block.
    empty_data = [["" for _ in range(15)] for _ in range(200)]
    cmd_clear = [
        "lark-cli", "sheets", "+write",
        "--spreadsheet-token", TARGET_SPREADSHEET_TOKEN,
        "--range", f"{TARGET_SHEET_ID}!A2:O201",
        "--values", json.dumps(empty_data, ensure_ascii=False),
        "--as", "bot"
    ]
    clear_result = subprocess.run(cmd_clear, env=env, capture_output=True, text=True)
    if clear_result.returncode != 0:
        # The original ignored clear failures; writing on top of a failed
        # clear would mix stale rows with new ones, so abort instead.
        log(f"清空表格失败: {clear_result.stderr}")
        return False
    # Data starts at row 2, so the last row index is len + 1.
    end_row = len(all_rows) + 1
    cmd_write = [
        "lark-cli", "sheets", "+write",
        "--spreadsheet-token", TARGET_SPREADSHEET_TOKEN,
        "--range", f"{TARGET_SHEET_ID}!A2:O{end_row}",
        "--values", json.dumps(all_rows, ensure_ascii=False),
        "--as", "bot"
    ]
    result = subprocess.run(cmd_write, env=env, capture_output=True, text=True)
    if result.returncode != 0:
        log(f"写入表格失败: {result.stderr}")
        return False
    return True
def main():
    """Run one sync cycle: fetch new rows, merge with the sheet, write back."""
    log("=== 微信反馈同步任务开始 ===")
    last_sync_id = get_last_sync_id()
    log(f"上次同步最大ID: {last_sync_id}")
    new_rows = fetch_new_wechat_data(last_sync_id)
    if not new_rows:
        log("无新增微信反馈数据,退出")
        return
    new_count = len(new_rows)
    log(f"发现 {new_count} 条新增微信反馈数据")
    sheet_rows, new_max_id = convert_to_sheet_format(new_rows)
    existing_rows = get_existing_sheet_data()
    log(f"现有表格数据量: {len(existing_rows)}")
    all_rows = merge_and_sort_data(sheet_rows, existing_rows)
    log(f"合并后总数据量: {len(all_rows)}")
    if write_to_sheet(all_rows):
        # Only advance the checkpoint after a successful write, so a
        # failed run retries the same rows on the next cron tick.
        save_last_sync_id(new_max_id)
        log(f"✅ 同步完成,新增 {new_count} 条,总 {len(all_rows)} 条,已按时间倒序排列")
    else:
        log("❌ 同步失败")
    log("=== 同步任务完成 ===")


if __name__ == "__main__":
    main()