#!/usr/bin/env python3
"""Sync WeChat user feedback from MySQL into a Feishu (Lark) collection sheet.

Each run reads the rows of ``wechat_group_message`` whose id is greater than
the last synced id, merges them with the rows already in the target sheet,
sorts everything by feedback time (newest first), renumbers the rows and
writes the result back through ``lark-cli``.
"""

import json
import os
import subprocess
from datetime import datetime

try:
    import pymysql
except ImportError:
    # Keep the module importable (e.g. for the pure formatting/merging
    # helpers) when the MySQL driver is missing; fetch_new_wechat_data()
    # raises a clear error in that case.
    pymysql = None

# Configuration
LOG_FILE = "/var/log/sync_wechat_feedback.log"
LAST_SYNC_ID_FILE = "/root/.openclaw/workspace-xiaokui/data/last_wechat_sync_id"
TARGET_SPREADSHEET_TOKEN = "AOxbsifk3hybRZteGowcMxNnnqc"
TARGET_SHEET_ID = "f17380"
LARK_CLI_CONFIG = "/root/.openclaw/credentials/xiaoyan"

# Directory prepended to PATH so that lark-cli (a Node program) can be found.
NODE_BIN_DIR = "/root/.nvm/versions/node/v24.14.0/bin"
# The sheet range used by this script is A..O, i.e. 15 columns.
SHEET_COLUMN_COUNT = 15
# Minimum number of data rows (starting at row 2) kept free of stale content.
DEFAULT_CLEAR_ROWS = 200
# Upper bound for a single lark-cli invocation, in seconds.
LARK_CLI_TIMEOUT_SECONDS = 60

# Database configuration
# NOTE(review): credentials are hard-coded in source. The password can be
# overridden with the WECHAT_SYNC_DB_PASSWORD environment variable; consider
# removing the in-source default entirely.
DB_CONFIG = {
    "host": "bj-cdb-8frbdwju.sql.tencentcdb.com",
    "port": 25413,
    "user": "read_only",
    "password": os.environ.get(
        "WECHAT_SYNC_DB_PASSWORD", "fdsfiidier^$*hjfdijjd232"
    ),
    "database": "vala_test",
    "charset": "utf8mb4",
}

# Prefix put in front of the content for non-text message types.
_TYPE_LABELS = {
    "image": "[图片]",
    "video": "[视频]",
}


def log(message):
    """Append a timestamped line to LOG_FILE and echo it to stdout.

    Writing the log file is best effort: if it cannot be opened or written,
    the problem is reported on stdout and the message is still printed, so a
    logging problem never aborts the sync itself.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{timestamp}] {message}"
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except OSError as exc:
        print(f"[{timestamp}] 写日志失败: {exc}")
    print(line)


def get_last_sync_id():
    """Return the largest message id synced so far, or 0 if none is stored.

    Raises:
        ValueError: if the state file exists but does not contain an integer.
    """
    if not os.path.exists(LAST_SYNC_ID_FILE):
        return 0
    with open(LAST_SYNC_ID_FILE, "r") as f:
        return int(f.read().strip())


def save_last_sync_id(last_id):
    """Persist ``last_id`` as the largest synced message id.

    The value is written to a temporary file and moved into place with
    os.replace(), so an interrupted run cannot leave a truncated state file.
    The parent directory is created if it does not exist yet.
    """
    directory = os.path.dirname(LAST_SYNC_ID_FILE)
    if directory:
        os.makedirs(directory, exist_ok=True)
    tmp_path = LAST_SYNC_ID_FILE + ".tmp"
    with open(tmp_path, "w") as f:
        f.write(str(last_id))
    os.replace(tmp_path, LAST_SYNC_ID_FILE)


def fetch_new_wechat_data(last_id):
    """Read the WeChat feedback rows whose id is greater than ``last_id``.

    Returns:
        The rows returned by the cursor, each being
        (id, msg_time, sender_name, msg_type, content), newest first.

    Raises:
        RuntimeError: if the pymysql driver is not installed.
    """
    if pymysql is None:
        raise RuntimeError("pymysql is not installed; cannot query MySQL")

    sql = """
        SELECT id, msg_time, sender_name, msg_type, content
        FROM wechat_group_message
        WHERE id > %s
        ORDER BY msg_time DESC
    """
    conn = pymysql.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            cursor.execute(sql, (last_id,))
            return cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()


def convert_to_sheet_format(rows):
    """Convert database rows into sheet rows (columns A..G).

    Args:
        rows: iterable of (id, msg_time, sender_name, msg_type, content).

    Returns:
        A tuple ``(sheet_rows, max_id)``: the list of 7-cell rows and the
        largest id seen (0 when ``rows`` is empty).
    """
    sheet_rows = []
    max_id = 0
    for msg_id, msg_time, sender_name, msg_type, content in rows:
        max_id = max(max_id, msg_id)

        # A missing content becomes an empty string instead of crashing on
        # .replace() or rendering as the literal text "None".
        body = "" if content is None else content
        if msg_type == "text":
            feedback_content = body
        else:
            label = _TYPE_LABELS.get(msg_type, f"[{msg_type}]")
            feedback_content = f"{label} {body}"

        # Flatten line breaks so the feedback stays on one line in the cell.
        # NOTE(review): the quote escaping is kept for compatibility with rows
        # already in the sheet (the dedup key includes this text), although
        # json.dumps() already escapes quotes when the rows are serialized.
        feedback_content = (
            feedback_content.replace("\n", " ")
            .replace("\r", "")
            .replace('"', '\\"')
        )

        if msg_time is None:
            feedback_time = ""
        elif hasattr(msg_time, "strftime"):
            feedback_time = msg_time.strftime("%Y-%m-%d %H:%M:%S")
        else:
            feedback_time = str(msg_time)

        sheet_rows.append([
            "",                  # A serial number, generated after merging
            "微信-用户火线救火",  # B feedback channel
            feedback_time,       # C feedback time
            sender_name,         # D reporter
            feedback_content,    # E feedback content
            "",                  # F remarks, left empty
            "",                  # G reply, left empty
        ])
    return sheet_rows, max_id


def _lark_env():
    """Return the environment used for every lark-cli invocation."""
    env = os.environ.copy()
    env["LARKSUITE_CLI_CONFIG_DIR"] = LARK_CLI_CONFIG
    env["PATH"] = NODE_BIN_DIR + ":" + env.get("PATH", "")
    return env


def _run_lark_cli(cmd):
    """Run a lark-cli command; return the CompletedProcess, or None.

    None is returned (after logging) when the command cannot be started or
    does not finish within LARK_CLI_TIMEOUT_SECONDS.
    """
    try:
        return subprocess.run(
            cmd,
            env=_lark_env(),
            capture_output=True,
            text=True,
            timeout=LARK_CLI_TIMEOUT_SECONDS,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        log(f"执行 lark-cli 失败: {exc}")
        return None


def _read_existing_sheet_rows():
    """Read the data rows of the target sheet.

    Returns:
        The list of rows whose first cell is non-blank, or None when the
        sheet could not be read or its output could not be parsed. The None
        result lets the caller tell "empty sheet" apart from "read failed".
    """
    cmd = [
        "lark-cli", "sheets", "+read",
        "--spreadsheet-token", TARGET_SPREADSHEET_TOKEN,
        "--range", f"{TARGET_SHEET_ID}!A2:O",
        "--as", "bot",
    ]
    result = _run_lark_cli(cmd)
    if result is None:
        return None
    if result.returncode != 0:
        log(f"读取表格失败: {result.stderr}")
        return None

    try:
        data = json.loads(result.stdout)
        values = data.get("data", {}).get("valueRange", {}).get("values", [])
    except (ValueError, AttributeError, TypeError) as e:
        log(f"解析表格数据失败: {str(e)}")
        return None

    # Drop empty rows. The first cell is converted with str() because the
    # serial number may come back as a number rather than a string.
    return [
        row for row in (values or [])
        if row and row[0] is not None and str(row[0]).strip()
    ]


def get_existing_sheet_data():
    """Return the existing data rows of the target sheet.

    Returns an empty list when the sheet is empty or cannot be read (the
    failure is logged).
    """
    rows = _read_existing_sheet_rows()
    return [] if rows is None else rows


def merge_and_sort_data(new_rows, existing_rows):
    """Merge new and existing rows, dedupe, sort newest first and renumber.

    Rows with fewer than 5 cells are skipped. Two rows are duplicates when
    their time, reporter and content (columns C, D, E) are equal; the first
    occurrence wins, and new rows are considered before existing ones.

    Returns:
        A new list of rows (copies; the inputs are not modified) whose first
        cell is the 1-based serial number as a string.
    """
    seen = set()
    all_rows = []
    for row in list(new_rows) + list(existing_rows):
        if not row or len(row) < 5:
            continue
        # Compare as strings so values of differing cell types still match.
        key = (str(row[2]), str(row[3]), str(row[4]))
        if key in seen:
            continue
        seen.add(key)
        all_rows.append(list(row))

    # Timestamps are "YYYY-MM-DD HH:MM:SS" strings, so they sort
    # lexicographically; str() guards against non-string time cells.
    all_rows.sort(key=lambda r: str(r[2]), reverse=True)

    for index, row in enumerate(all_rows, start=1):
        row[0] = str(index)
    return all_rows


def _pad_row(row):
    """Return a copy of ``row`` padded with "" up to SHEET_COLUMN_COUNT."""
    cells = list(row)
    missing = SHEET_COLUMN_COUNT - len(cells)
    if missing > 0:
        cells.extend([""] * missing)
    return cells


def write_to_sheet(all_rows, clear_rows=DEFAULT_CLEAR_ROWS):
    """Write ``all_rows`` to the sheet starting at row 2.

    The data is written first and only the leftover rows below it are blanked
    afterwards, so a failed write never leaves the sheet emptied. Every row is
    padded to the full A..O width so that cells of a previous occupant of the
    same sheet row are overwritten.

    Args:
        all_rows: rows to write.
        clear_rows: number of data rows (from row 2) guaranteed to hold no
            stale content after the write.

    Returns:
        True when the data was written, False otherwise (including when there
        is nothing to write).
    """
    if not all_rows:
        log("无数据需要写入")
        return False

    padded_rows = [_pad_row(row) for row in all_rows]
    end_row = len(padded_rows) + 1
    cmd_write = [
        "lark-cli", "sheets", "+write",
        "--spreadsheet-token", TARGET_SPREADSHEET_TOKEN,
        "--range", f"{TARGET_SHEET_ID}!A2:O{end_row}",
        "--values", json.dumps(padded_rows, ensure_ascii=False),
        "--as", "bot",
    ]
    result = _run_lark_cli(cmd_write)
    if result is None:
        return False
    if result.returncode != 0:
        log(f"写入表格失败: {result.stderr}")
        return False

    # Blank whatever may remain below the freshly written data.
    stale_count = clear_rows - len(padded_rows)
    if stale_count > 0:
        blank_rows = [[""] * SHEET_COLUMN_COUNT for _ in range(stale_count)]
        cmd_clear = [
            "lark-cli", "sheets", "+write",
            "--spreadsheet-token", TARGET_SPREADSHEET_TOKEN,
            "--range", f"{TARGET_SHEET_ID}!A{end_row + 1}:O{clear_rows + 1}",
            "--values", json.dumps(blank_rows, ensure_ascii=False),
            "--as", "bot",
        ]
        cleared = _run_lark_cli(cmd_clear)
        if cleared is None or cleared.returncode != 0:
            # Leftover rows duplicate data written above; the next run
            # dedupes them, so this is only a warning.
            detail = "" if cleared is None else cleared.stderr
            log(f"清空多余行失败: {detail}")
    return True


def main():
    """Run one sync pass: fetch, convert, merge, write, persist the last id."""
    log("=== 微信反馈同步任务开始 ===")

    last_sync_id = get_last_sync_id()
    log(f"上次同步最大ID: {last_sync_id}")

    # Fetch new data
    new_rows = fetch_new_wechat_data(last_sync_id)
    if not new_rows:
        log("无新增微信反馈数据,退出")
        return

    new_count = len(new_rows)
    log(f"发现 {new_count} 条新增微信反馈数据")

    # Convert to sheet format
    sheet_rows, new_max_id = convert_to_sheet_format(new_rows)

    # Read existing data; abort when the read failed, because writing only
    # the new rows would wipe everything already in the sheet.
    existing_rows = _read_existing_sheet_rows()
    if existing_rows is None:
        log("❌ 同步失败")
        log("=== 同步任务完成 ===")
        return
    log(f"现有表格数据量: {len(existing_rows)} 条")

    # Merge and sort
    all_rows = merge_and_sort_data(sheet_rows, existing_rows)
    log(f"合并后总数据量: {len(all_rows)} 条")

    # Write to the sheet; make sure every previously used row is covered.
    success = write_to_sheet(
        all_rows,
        clear_rows=max(DEFAULT_CLEAR_ROWS, len(existing_rows)),
    )
    if success:
        save_last_sync_id(new_max_id)
        log(f"✅ 同步完成,新增 {new_count} 条,总 {len(all_rows)} 条,已按时间倒序排列")
    else:
        log("❌ 同步失败")

    log("=== 同步任务完成 ===")


if __name__ == "__main__":
    main()