#!/usr/bin/env python3
"""Sync WeChat user feedback to a Feishu (Lark) collection spreadsheet every minute."""

import os
import json
import pymysql
import subprocess
from datetime import datetime

# --- Configuration ---
LOG_FILE = "/var/log/sync_wechat_feedback.log"  # append-only run log
LAST_SYNC_ID_FILE = "/root/.openclaw/workspace-xiaokui/data/last_wechat_sync_id"  # watermark: highest synced message id
TARGET_SPREADSHEET_TOKEN = "AOxbsifk3hybRZteGowcMxNnnqc"  # Feishu spreadsheet token
TARGET_SHEET_ID = "f17380"  # sheet (tab) id inside the spreadsheet
LARK_CLI_CONFIG = "/root/.openclaw/credentials/xiaoyan"  # lark-cli credential directory

# Database connection settings (read-only account).
# NOTE(review): credentials are hard-coded in source — move to environment
# variables or a secret store.
DB_CONFIG = {
    "host": "bj-cdb-8frbdwju.sql.tencentcdb.com",
    "port": 25413,
    "user": "read_only",
    "password": "fdsfiidier^$*hjfdijjd232",
    "database": "vala_test",
    "charset": "utf8mb4"
}
|
|
|
|
def log(message):
    """Append a timestamped message to LOG_FILE and echo it to stdout."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{stamp}] {message}"
    with open(LOG_FILE, "a", encoding="utf-8") as fh:
        fh.write(line + "\n")
    print(line)
|
|
|
|
def get_last_sync_id():
    """Return the highest message id synced so far, or 0 when unknown.

    Reads the watermark from LAST_SYNC_ID_FILE. An absent, empty, or
    corrupt file is treated as "never synced" instead of crashing the
    cron run with a ValueError (the original parsed the file contents
    unguarded).
    """
    try:
        with open(LAST_SYNC_ID_FILE, "r") as f:
            return int(f.read().strip())
    except (OSError, ValueError):
        # Missing or unreadable watermark -> start from the beginning.
        return 0
|
|
|
|
def save_last_sync_id(last_id):
    """Persist *last_id* as the sync watermark for the next run."""
    with open(LAST_SYNC_ID_FILE, "w") as fh:
        fh.write(f"{last_id}")
|
|
|
|
def fetch_new_wechat_data(last_id):
    """Fetch WeChat feedback rows with id greater than *last_id* from MySQL.

    Returns a sequence of (id, msg_time, sender_name, msg_type, content)
    rows ordered by msg_time descending. The connection and cursor are
    released even when the query raises (the original leaked both on
    failure because close() was never reached).
    """
    sql = """
        SELECT id, msg_time, sender_name, msg_type, content
        FROM wechat_group_message
        WHERE id > %s
        ORDER BY msg_time DESC
    """
    conn = pymysql.connect(**DB_CONFIG)
    try:
        # Cursor context manager closes the cursor; finally closes the
        # connection on both the success and the error path.
        with conn.cursor() as cursor:
            cursor.execute(sql, (last_id,))
            return cursor.fetchall()
    finally:
        conn.close()
|
|
|
|
def convert_to_sheet_format(rows):
    """Convert DB rows into Feishu sheet rows.

    Each input row is (id, msg_time, sender_name, msg_type, content),
    where msg_time is a datetime. Returns (sheet_rows, max_id):
    sheet_rows are 7-column lists [serial, channel, time, reporter,
    content, remarks, reply]; max_id is the largest message id seen
    (0 for empty input). Fixes the original's shadowing of the builtin
    ``id`` inside the loop.
    """
    # Display labels for known non-text message types; anything else
    # falls back to the raw msg_type as the bracket label.
    type_labels = {"image": "图片", "video": "视频"}

    sheet_rows = []
    max_id = 0
    for row_id, msg_time, sender_name, msg_type, content in rows:
        max_id = max(max_id, row_id)

        if msg_type == "text":
            feedback = content
        else:
            feedback = f"[{type_labels.get(msg_type, msg_type)}] {content}"

        # Flatten newlines and escape double quotes so the cell survives
        # the JSON round-trip through lark-cli.
        feedback = feedback.replace("\n", " ").replace("\r", "").replace('"', '\\"')

        sheet_rows.append([
            "",                                      # A serial number, regenerated later
            "微信-用户火线救火",                      # B feedback channel
            msg_time.strftime("%Y-%m-%d %H:%M:%S"),  # C feedback time
            sender_name,                             # D reporter
            feedback,                                # E feedback content
            "",                                      # F remarks (left blank)
            ""                                       # G reply (left blank)
        ])
    return sheet_rows, max_id
|
|
|
|
def get_existing_sheet_data():
    """Read the current data rows (A2:O) from the target Feishu sheet.

    Returns a list of row value lists; rows whose first cell (the serial
    number) is blank are dropped. Returns [] on any CLI or parse failure
    so a transient Feishu error never kills the cron run.
    """
    cmd = [
        "lark-cli", "sheets", "+read",
        "--spreadsheet-token", TARGET_SPREADSHEET_TOKEN,
        "--range", f"{TARGET_SHEET_ID}!A2:O",
        "--as", "bot"
    ]
    env = os.environ.copy()
    env["LARKSUITE_CLI_CONFIG_DIR"] = LARK_CLI_CONFIG
    env["PATH"] = "/root/.nvm/versions/node/v24.14.0/bin:" + env.get("PATH", "")

    result = subprocess.run(cmd, env=env, capture_output=True, text=True)
    if result.returncode != 0:
        log(f"读取表格失败: {result.stderr}")
        return []

    try:
        data = json.loads(result.stdout)
        values = data.get("data", {}).get("valueRange", {}).get("values", [])
        # Keep rows with a non-blank serial cell. Cells may come back as
        # numbers rather than strings, so coerce with str() before
        # strip() — the original's row[0].strip() raised AttributeError
        # on numeric cells, which the except below silently swallowed,
        # discarding ALL existing rows.
        return [
            row for row in values
            if row and row[0] is not None and str(row[0]).strip()
        ]
    except Exception as e:
        # Boundary catch: log and degrade to "no existing data".
        log(f"解析表格数据失败: {str(e)}")
        return []
|
|
|
|
def merge_and_sort_data(new_rows, existing_rows):
    """Merge new and existing sheet rows, dedupe, and sort newest-first.

    Rows shorter than 5 cells are discarded. Duplicates are detected by
    the (time, reporter, content) triple, with new rows winning. After
    sorting by feedback time (column C) descending, each kept row is
    mutated in place to receive a fresh 1-based serial in column A.
    Returns the merged list.
    """
    merged = []
    seen_keys = set()
    for candidate in list(new_rows) + list(existing_rows):
        if not candidate or len(candidate) < 5:
            continue  # malformed or blank row
        dedupe_key = f'{candidate[2]}_{candidate[3]}_{candidate[4]}'
        if dedupe_key not in seen_keys:
            seen_keys.add(dedupe_key)
            merged.append(candidate)

    # ISO-style timestamps sort correctly as strings; newest first.
    merged.sort(key=lambda r: r[2], reverse=True)

    # Renumber column A from 1.
    for serial, candidate in enumerate(merged, start=1):
        candidate[0] = str(serial)
    return merged
|
|
|
|
def write_to_sheet(all_rows):
    """Clear the target sheet range and write *all_rows* into it.

    Returns True on success, False when there is nothing to write or the
    write command fails. The clear step is best-effort: its failure is
    now logged (the original discarded the result entirely) but does not
    abort the write, since the fresh data overwrites the top rows anyway.
    """
    if not all_rows:
        log("无数据需要写入")
        return False  # explicit falsy instead of the original implicit None

    env = os.environ.copy()
    env["LARKSUITE_CLI_CONFIG_DIR"] = LARK_CLI_CONFIG
    env["PATH"] = "/root/.nvm/versions/node/v24.14.0/bin:" + env.get("PATH", "")

    # Step 1: blank out the existing data area (A2:O201) so stale rows
    # below the new data set do not survive.
    empty_data = [["" for _ in range(15)] for _ in range(200)]
    cmd_clear = [
        "lark-cli", "sheets", "+write",
        "--spreadsheet-token", TARGET_SPREADSHEET_TOKEN,
        "--range", f"{TARGET_SHEET_ID}!A2:O201",
        "--values", json.dumps(empty_data, ensure_ascii=False),
        "--as", "bot"
    ]
    clear_result = subprocess.run(cmd_clear, env=env, capture_output=True, text=True)
    if clear_result.returncode != 0:
        log(f"清空表格失败(继续写入): {clear_result.stderr}")

    # Step 2: write the merged rows starting at row 2.
    end_row = len(all_rows) + 1
    cmd_write = [
        "lark-cli", "sheets", "+write",
        "--spreadsheet-token", TARGET_SPREADSHEET_TOKEN,
        "--range", f"{TARGET_SHEET_ID}!A2:O{end_row}",
        "--values", json.dumps(all_rows, ensure_ascii=False),
        "--as", "bot"
    ]
    result = subprocess.run(cmd_write, env=env, capture_output=True, text=True)
    if result.returncode != 0:
        log(f"写入表格失败: {result.stderr}")
        return False
    return True
|
|
|
|
def main():
    """Run one incremental sync pass: MySQL -> merge -> Feishu sheet."""
    log("=== 微信反馈同步任务开始 ===")

    last_sync_id = get_last_sync_id()
    log(f"上次同步最大ID: {last_sync_id}")

    new_rows = fetch_new_wechat_data(last_sync_id)
    if not new_rows:
        # Nothing new since the last watermark — bail out early.
        log("无新增微信反馈数据,退出")
        return

    new_count = len(new_rows)
    log(f"发现 {new_count} 条新增微信反馈数据")

    sheet_rows, new_max_id = convert_to_sheet_format(new_rows)

    existing_rows = get_existing_sheet_data()
    log(f"现有表格数据量: {len(existing_rows)} 条")

    all_rows = merge_and_sort_data(sheet_rows, existing_rows)
    log(f"合并后总数据量: {len(all_rows)} 条")

    # Only advance the watermark after a confirmed successful write, so
    # a failed run retries the same rows next minute.
    if write_to_sheet(all_rows):
        save_last_sync_id(new_max_id)
        log(f"✅ 同步完成,新增 {new_count} 条,总 {len(all_rows)} 条,已按时间倒序排列")
    else:
        log("❌ 同步失败")

    log("=== 同步任务完成 ===")
|
|
|
|
# Script entry point: run a single sync pass when executed directly.
if __name__ == "__main__":
    main()
|