ai_member_xiaokui/scripts/sync_wechat_feedback.py
2026-05-23 08:10:01 +08:00

305 lines
12 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
微信用户反馈同步脚本 — 复用飞书聚类/归纳/优先级/分发逻辑
数据格式与飞书 sync_feishu_feedback.py 完全一致:
(message_id, sender_name, msg_type, content, media_url, quote_message_id, msg_time, msg_timestamp)
用法:
python3 sync_wechat_feedback.py --date 2026-05-21 --steps 7 --ai-placeholders --skip-dispatch
python3 sync_wechat_feedback.py --date 2026-05-21 --apply-ai /path/to/ai_descriptions.json
"""
import sys
import os
# Locate the shared Feishu feedback-sync scripts relative to this file and put
# that directory first on sys.path so `sync_feishu_feedback` can be imported below.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SKILL_DIR = os.path.join(SCRIPT_DIR, "..", "skills", "feishu-feedback-sync", "scripts")
sys.path.insert(0, SKILL_DIR)
import argparse
import json
import pymysql
import subprocess
from datetime import datetime, timedelta
# ── Import the shared module, then override its constants with WeChat-specific values ──
# (the override below runs after the import, as an attribute assignment on the module)
import sync_feishu_feedback as fsf
# Parent document of the WeChat knowledge base
fsf.SUMMARY_PARENT_NODE = "XhtGwjitFizzCNkw8Xzc2IXsnuf" # WeChat user feedback issue summary
# ── Database ──
# Keyword arguments passed verbatim to pymysql.connect() in fetch_wechat_data().
# NOTE(review): host and password are hard-coded in source; consider loading them
# from the environment or a secrets store instead of committing them.
DB_CONFIG = {
"host": "bj-cdb-8frbdwju.sql.tencentcdb.com",
"port": 25413,
"user": "read_only",
"password": "fdsfiidier^$*hjfdijjd232",
"database": "vala_test",
"charset": "utf8mb4",
}
# Executable name used as argv[0] when main() shells out to update the document.
CLI = "lark-cli"
# Exported to the CLI subprocess as LARKSUITE_CLI_CONFIG_DIR.
CRED_DIR = "/root/.openclaw/credentials/xiaokui"
# Directory where gen_context_json() writes the per-day cluster context JSON.
CONTEXT_DIR = os.path.join(SCRIPT_DIR, "..", "output", "daily_feedback")
def fetch_wechat_data(date_str):
    """Load one day of rows from ``wechat_group_message``.

    Rows are mapped to the tuple layout shared with the Feishu ``fetch_data``:
    ``(message_id, sender_name, msg_type, content, media_url,
    quote_message_id, msg_time, msg_timestamp)``.

    Args:
        date_str: Day to load, formatted ``YYYY-MM-DD``.

    Returns:
        A list of tuples ordered by ``msg_time`` ascending. NULL/falsy
        columns are normalised to ``""`` (``"text"`` for ``msg_type`` and
        ``0`` for ``msg_timestamp``); both message ids are returned as strings.

    Raises:
        ValueError: If ``date_str`` is not a valid ``YYYY-MM-DD`` date.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    day_start = datetime.strptime(date_str, "%Y-%m-%d")
    # Half-open window [day 00:00:00, next day 00:00:00). An upper bound of
    # "< 23:59:59" would silently drop messages stamped in the day's last second.
    next_day_start = day_start + timedelta(days=1)

    conn = pymysql.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            # "%%" is required because the driver %-formats the query with the
            # bound parameters; MySQL receives a single "%" in DATE_FORMAT.
            cursor.execute(
                """
                SELECT svr_msg_id, sender_name, msg_type, content, media_url,
                       refer_msg_svrid,
                       DATE_FORMAT(msg_time, '%%Y-%%m-%%d %%H:%%i:%%s') as msg_time,
                       msg_timestamp
                FROM wechat_group_message
                WHERE msg_time >= %s AND msg_time < %s
                ORDER BY msg_time ASC
                """,
                (
                    day_start.strftime(time_format),
                    next_day_start.strftime(time_format),
                ),
            )
            rows = cursor.fetchall()
    finally:
        # Always release the connection, even when the query fails.
        conn.close()

    return [
        (
            str(svr_id) if svr_id else "",  # message_id
            sname or "",                    # sender_name
            mtype or "text",                # msg_type
            content or "",                  # content
            murl or "",                     # media_url
            str(ref_id) if ref_id else "",  # quote_message_id
            mtime or "",                    # msg_time
            int(mts) if mts else 0,         # msg_timestamp
        )
        for svr_id, sname, mtype, content, murl, ref_id, mtime, mts in rows
    ]
def gen_context_json(date_str, clusters, cluster_order):
    """Write the cluster context for one day to a JSON file for the AI step.

    Args:
        date_str: Day being processed; stored in the payload and used in the
            output file name.
        clusters: Mapping of cluster id to its list of message tuples in the
            ``(message_id, sender_name, msg_type, content, media_url,
            quote_message_id, msg_time, msg_timestamp)`` layout.
        cluster_order: Cluster ids in the order they should be numbered
            (1-based ``index`` in the output).

    Returns:
        Path of the JSON file that was written inside ``CONTEXT_DIR``.
    """
    os.makedirs(CONTEXT_DIR, exist_ok=True)

    def _describe(position, cluster_id):
        # One JSON entry per cluster; only the fields the AI step needs are kept.
        members = clusters[cluster_id]
        return {
            "index": position,
            "cluster_id": cluster_id,
            "message_count": len(members),
            "messages": [
                {
                    "sender": msg[1],
                    "content": msg[3],
                    "msg_type": msg[2],
                    "time": msg[6],
                    "message_id": msg[0],
                    "quote_message_id": msg[5],
                }
                for msg in members
            ],
        }

    payload = {
        "date": date_str,
        "clusters": [
            _describe(position, cluster_id)
            for position, cluster_id in enumerate(cluster_order, start=1)
        ],
    }

    out_path = os.path.join(CONTEXT_DIR, f"wechat_cluster_context_{date_str}.json")
    with open(out_path, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)
    print(f" 📝 微信 AI 上下文已保存: {out_path}")
    return out_path
def _build_arg_parser():
    """Create the command-line parser for the WeChat feedback sync."""
    parser = argparse.ArgumentParser(description="微信问题反馈同步")
    parser.add_argument("--date", type=str, required=True, help="处理日期 YYYY-MM-DD")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--skip-priority", action="store_true")
    parser.add_argument("--skip-dispatch", action="store_true")
    parser.add_argument("--ai-placeholders", action="store_true",
                        help="使用 [待AI归纳:#N] 占位符")
    parser.add_argument("--apply-ai", type=str, default=None,
                        help="应用 AI 描述 JSON")
    parser.add_argument("--steps", type=str, default="1-7")
    return parser


def _clusters_from_context(ctx):
    """Rebuild clusters from a context document written by gen_context_json.

    Returns ``(clusters, cluster_order, original_index_map)`` where
    ``original_index_map`` maps each cluster id to the ``index`` recorded in
    the context file.
    """
    clusters = {}
    cluster_order = []
    original_index_map = {}
    for entry in ctx["clusters"]:
        cid = entry["cluster_id"]
        original_index_map[cid] = entry["index"]
        msgs = [
            (
                m.get("message_id", cid),
                m["sender"],
                m.get("msg_type", "text"),
                m["content"],
                "",  # media_url is not stored in the context file
                m.get("quote_message_id", ""),
                m.get("time", ""),
                0,   # msg_timestamp is not stored in the context file
            )
            for m in entry["messages"]
        ]
        if cid not in clusters:
            clusters[cid] = []
            cluster_order.append(cid)
        clusters[cid].extend(msgs)
    return clusters, cluster_order, original_index_map


def _fill_placeholders(summary_md, descriptions, clusters, cluster_order,
                       original_index_map):
    """Replace ``[待AI归纳:#N]`` placeholders with the AI descriptions.

    The summary numbers only clusters with at least two messages, so the
    context-file index of each description is first translated to the
    placeholder number used in the summary.
    """
    placeholder_number = 0
    index_mapping = {}  # context-file index -> placeholder number
    for cid in cluster_order:
        if len(clusters[cid]) >= 2:
            placeholder_number += 1
            original_idx = original_index_map.get(cid, placeholder_number)
            index_mapping[original_idx] = placeholder_number

    for item in descriptions:
        old_idx = item["index"]
        desc = item["description"]
        new_idx = index_mapping.get(old_idx)
        if new_idx is None:
            # Cluster has no placeholder in the summary (single message); ignore.
            continue
        placeholder = f"[待AI归纳:#{new_idx}]"
        summary_md = summary_md.replace(placeholder, desc)
        print(f" 🔄 微信 #{old_idx}→#{new_idx}: {placeholder}{desc[:50]}...")
    return summary_md


def _overwrite_doc(obj_token, markdown):
    """Overwrite document ``obj_token`` with ``markdown`` through the CLI.

    Returns the ``subprocess.CompletedProcess``. The temporary markdown file
    is removed even when the CLI call raises (e.g. on timeout).
    """
    tmp_md = "tmp/wechat_ai_summary.md"
    # The relative tmp/ directory may not exist in the current working directory.
    os.makedirs(os.path.dirname(tmp_md), exist_ok=True)
    with open(tmp_md, "w", encoding="utf-8") as f:
        f.write(markdown)
    env = os.environ.copy()
    env["LARKSUITE_CLI_CONFIG_DIR"] = CRED_DIR
    try:
        return subprocess.run(
            [CLI, "docs", "+update", "--doc", obj_token, "--as", "bot",
             "--mode", "overwrite", "--markdown", f"@{tmp_md}"],
            env=env, capture_output=True, text=True, timeout=15)
    finally:
        os.unlink(tmp_md)


def _dispatch_summary(date_str, summary_md, node_info):
    """Send the summary to the group chat, linking to the day's wiki node.

    Falls back to the parent node when ``node_info`` has no ``node_token``.
    """
    child_nt = node_info.get("node_token", fsf.SUMMARY_PARENT_NODE)
    child_url = f"https://makee-interactive.feishu.cn/wiki/{child_nt}"
    fsf.dispatch_summary_to_chat(
        f"微信-{date_str}", summary_md,
        p0_only=False, doc_url=child_url
    )


def _apply_ai_descriptions(args, date_str):
    """--apply-ai mode: merge AI descriptions into the day's summary document.

    Reads the descriptions JSON and the saved cluster context, regenerates the
    placeholder summary, substitutes the descriptions, overwrites the wiki
    child document and (unless ``--skip-dispatch``) dispatches it to the chat.
    Exits with status 1 when the context file or document is missing, or when
    writing/dispatching fails.
    """
    with open(args.apply_ai, "r", encoding="utf-8") as f:
        ai_data = json.load(f)
    descriptions = ai_data.get("descriptions", [])
    print(f"📋 加载 {len(descriptions)} 条微信 AI 描述,日期: {date_str}")

    # Rebuild the clustering from the saved context file.
    ctx_path = os.path.join(CONTEXT_DIR, f"wechat_cluster_context_{date_str}.json")
    if not os.path.exists(ctx_path):
        print(f"❌ 上下文文件不存在: {ctx_path}")
        sys.exit(1)
    with open(ctx_path, "r", encoding="utf-8") as f:
        ctx = json.load(f)
    clusters, cluster_order, original_index_map = _clusters_from_context(ctx)

    # Regenerate the summary with placeholders (reuses the Feishu function).
    summary_md = fsf.generate_summary(
        clusters, cluster_order,
        skip_priority=True, ai_placeholders=True
    )
    if isinstance(summary_md, tuple):
        summary_md = summary_md[0]
    summary_md = _fill_placeholders(
        summary_md, descriptions, clusters, cluster_order, original_index_map)

    # Find (or create) the knowledge-base child document for this day.
    title = f"微信-{date_str} 问题反馈"
    node_info = fsf.list_child_nodes().get(title, {})
    obj_token = node_info.get("obj_token")
    if not obj_token:
        print(f" 📝 创建新文档: {title}")
        fsf.update_summary_doc_as_children({date_str: summary_md}, title_prefix="微信-")
        node_info = fsf.list_child_nodes().get(title, {})
        obj_token = node_info.get("obj_token")
    if not obj_token:
        print("❌ 无法创建/找到文档")
        sys.exit(1)

    try:
        result = _overwrite_doc(obj_token, summary_md)
        resp = json.loads(result.stdout)
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if not resp.get("ok"):
            raise RuntimeError(f"写入失败: {result.stdout[:300]}")
        print(f" ✅ AI 描述已应用到微信文档: {title}")
        # Dispatch to the group chat.
        if not args.skip_dispatch:
            print(f" 📨 分发微信归纳到群聊...")
            _dispatch_summary(date_str, summary_md, node_info)
            print(f" ✅ 已分发")
    except Exception as e:  # CLI boundary: report the failure and exit non-zero
        print(f"{e}")
        sys.exit(1)

    if os.path.exists(ctx_path):
        os.unlink(ctx_path)
        print(f" 🗑️ 已清理上下文文件")


def _run_sync(args, date_str):
    """Normal mode: fetch, cluster, summarise, write and dispatch one day.

    Summarising only runs when the last number in ``--steps`` is at least 3.
    With ``--dry-run`` nothing is written to the knowledge base and nothing is
    dispatched to the chat. With ``--ai-placeholders`` dispatch is deferred to
    a later ``--apply-ai`` run.
    """
    print(f"\n📊 查询微信 {date_str} 数据...")
    rows = fetch_wechat_data(date_str)
    if not rows:
        print(f" ⚠️ {date_str} 无微信数据")
        return
    print(f" 📋 共 {len(rows)} 条消息")

    do_summary = int(args.steps.split("-")[-1]) >= 3
    total = len(rows)
    summary_md = None
    if do_summary:
        # Step 3: clustering, reusing the Feishu sort_threads directly.
        _sorted_rows, clusters, cluster_order = fsf.sort_threads(rows)
        if not cluster_order:
            print(f" 无有效问题簇需要≥2条消息")
            return
        print(f" 聚类完成:{len(cluster_order)} 个问题")
        for cid in cluster_order:
            cmsgs = clusters[cid]
            earliest = min(m[6] for m in cmsgs)
            print(f"{cid}: {len(cmsgs)} 条消息,始于 {earliest}")

        # Save the context the AI step needs to write its descriptions.
        if args.ai_placeholders:
            gen_context_json(date_str, clusters, cluster_order)

        # Steps 4-6: generate summary + priority + write to the knowledge base.
        summary_md = fsf.generate_summary(
            clusters, cluster_order,
            skip_priority=args.skip_priority,
            ai_placeholders=args.ai_placeholders
        )
        if isinstance(summary_md, tuple):
            summary_md = summary_md[0]
        print(summary_md)
        if not args.dry_run:
            fsf.update_summary_doc_as_children({date_str: summary_md}, title_prefix="微信-")
            print(f" ✅ 微信-{date_str} 问题反馈 写入成功")

    # Step 7: dispatch immediately unless placeholders are pending (--apply-ai
    # dispatches later). A dry run must not post to the group chat either.
    if (do_summary and not args.dry_run and not args.skip_dispatch
            and not args.ai_placeholders):
        print(f"\n📨 微信步骤7问题分发...")
        child_info = fsf.list_child_nodes().get(f"微信-{date_str} 问题反馈", {})
        _dispatch_summary(date_str, summary_md, child_info)
        print(f" ✅ 已分发")

    print(f"\n🎉 微信同步完成,总计处理 {total}")


def main():
    """CLI entry point: parse arguments and run the selected mode.

    ``--apply-ai PATH`` merges AI descriptions into an existing day's summary;
    otherwise the normal fetch/cluster/summarise/dispatch flow runs for
    ``--date``.
    """
    args = _build_arg_parser().parse_args()
    if args.apply_ai:
        _apply_ai_descriptions(args, args.date)
    else:
        _run_sync(args, args.date)


if __name__ == "__main__":
    main()