ai_member_xiaokui/scripts/backfill_ai_descriptions.py
2026-05-22 08:10:01 +08:00

450 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
批量回写 2026-05-11 ~ 2026-05-19 的问题描述（AI 归纳版）
对每个日期:
1. 从 MySQL 读取消息 → 聚类 → 生成问题簇
2. 调用 DeepSeek API 为每个簇生成精炼问题描述
3. 用 AI 描述重新生成完整归纳内容（替代脚本默认的 generate_problem_description）
4. 覆盖写入飞书知识库对应日期的子文档
"""
import sys, os, json, urllib.request, subprocess, time, re
from datetime import datetime, date, timedelta
from collections import defaultdict
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "skills", "feishu-feedback-sync", "scripts"))
from sync_feishu_feedback import (
get_db_connection, sort_threads, sort_cluster_msgs,
extract_location_elements, extract_conclusion, classify_problem,
get_tenant_token, list_child_nodes, create_child_doc,
SUMMARY_PARENT_NODE, SUMMARY_SPACE_ID, DISPATCH_CRED_DIR, XIAOKUI_BOT_OPEN_ID,
CLI, get_env,
)
from priority_classifier import compute_final_priority, sort_by_priority
# === Configuration ===
# SECURITY: credentials do not belong in source control. The key is read from
# the DEEPSEEK_API_KEY environment variable first; the literal below is kept
# only so existing invocations keep working.
# TODO: rotate this key and delete the literal fallback.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY") or "sk-7cf94305fb12473b956fd2ed2a6db05b"
DEEPSEEK_BASE_URL = "https://api.deepseek.com/v1"
DEEPSEEK_MODEL = "deepseek-v4-pro"
# Inclusive date range processed by main() when --date is not given.
START_DATE = "2026-05-11"
END_DATE = "2026-05-19"
# System prompt sent with every request in call_deepseek(). It asks the model
# for exactly one Chinese sentence describing the problem under discussion.
SYSTEM_PROMPT = """你是一个游戏产品的问题归纳助手。阅读来自测试群的多人对话,用一句中文描述他们讨论的具体问题。
要求:
1. 只描述问题本身,不评价、不建议
2. 包含关键要素:在哪个端/环节、什么表现
3. 有频率信息(偶现/频繁/必现)要体现
4. 仅输出一句中文,不加任何前缀、编号、引号或换行
5. 如果对话全是无实质内容的闲聊,输出"无明确问题"
6. 如果是打包/热更类问题,说清楚是哪个版本/分支的包
7. 如果是语音识别问题,说清楚识别什么内容、识别成了什么"""
def query_date_messages(date_str):
    """Read every row of ``lark_group_message`` whose ``msg_time`` is on one day.

    Args:
        date_str: Day to load, formatted as ``YYYY-MM-DD``.

    Returns:
        A list of tuples in SELECT order: (message_id, sender_name, msg_type,
        content, media_url, quote_message_id, msg_time, msg_timestamp),
        ordered by ``msg_time`` ascending. ``msg_time`` (index 6) is converted
        to a ``'%Y-%m-%d %H:%M:%S'`` string when the driver returned an object
        with ``strftime``.

    Raises:
        ValueError: If ``date_str`` is not a valid ``YYYY-MM-DD`` date.
    """
    # Parse the date before opening the connection so a malformed argument
    # cannot leave a connection behind.
    day_start = datetime.strptime(date_str, "%Y-%m-%d")
    next_date = (day_start + timedelta(days=1)).strftime("%Y-%m-%d")

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        # Half-open interval [date 00:00:00, next day 00:00:00).
        cursor.execute(
            """SELECT message_id, sender_name, msg_type, content, media_url, quote_message_id,
                      msg_time, msg_timestamp
               FROM lark_group_message
               WHERE msg_time >= %s AND msg_time < %s
               ORDER BY msg_time ASC""",
            (f"{date_str} 00:00:00", f"{next_date} 00:00:00"),
        )
        rows = cursor.fetchall()
    finally:
        # Always release the connection, even when the query fails.
        conn.close()

    # Normalise msg_time (column 6) to a string.
    formatted_rows = []
    for row in rows:
        fields = list(row)
        if fields[6] and hasattr(fields[6], "strftime"):
            fields[6] = fields[6].strftime("%Y-%m-%d %H:%M:%S")
        formatted_rows.append(tuple(fields))
    return formatted_rows
def build_ai_prompt(cluster):
    """Build the user prompt describing one problem cluster for the LLM.

    Args:
        cluster: Mapping with optional keys ``priority`` (or ``priority_info``,
            a mapping holding ``priority``), ``category``, ``conclusion`` and
            ``messages``. Each message is a mapping with ``sender``,
            ``content`` and ``time``.

    Returns:
        A newline-joined string: three header lines, a separator, then one
        ``[time] sender: content`` line per message that has text. Message
        text longer than 200 characters is cut to 197 characters plus ``...``.
    """
    # process_date() stores the priority under "priority_info", so fall back
    # to it; reading only "priority" always produced "?".
    priority = cluster.get("priority")
    if priority is None:
        priority = (cluster.get("priority_info") or {}).get("priority", "?")

    lines = [
        f"优先级: {priority}",
        f"分类: {cluster.get('category', '?')}",
        f"排查结论: {cluster.get('conclusion', '')}",
        "--- 对话 ---",
    ]
    for msg in cluster.get("messages", []):
        raw = msg.get("content")
        content = str(raw).strip() if raw else ""
        # Messages without text (including media-only ones) add nothing.
        if not content:
            continue
        if len(content) > 200:
            content = content[:197] + "..."
        lines.append(f"[{msg.get('time', '')}] {msg.get('sender', '?')}: {content}")
    return "\n".join(lines)
def call_deepseek(user_prompt, max_retries=2):
    """Ask the DeepSeek chat-completions endpoint for a problem description.

    Args:
        user_prompt: Text sent as the user message; SYSTEM_PROMPT is sent as
            the system message.
        max_retries: Number of extra attempts after the first failure.

    Returns:
        The first choice's message content with surrounding whitespace and
        quote characters removed.

    Raises:
        Exception: Whatever the last attempt raised (network error, HTTP
            error, malformed JSON, unexpected response layout).
    """
    body = json.dumps(
        {
            "model": DEEPSEEK_MODEL,
            "messages": [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            "temperature": 0.3,
            "max_tokens": 256,
        }
    ).encode()
    # ASCII quotes plus the typographic quotes “ ” ‘ ’, spelled as escapes so
    # they cannot be silently normalised to ASCII by an editor or viewer.
    strip_chars = "\"'\u201c\u201d\u2018\u2019 \n"

    for attempt in range(max_retries + 1):
        try:
            req = urllib.request.Request(
                f"{DEEPSEEK_BASE_URL}/chat/completions",
                data=body,
                headers={"Authorization": f"Bearer {DEEPSEEK_API_KEY}", "Content-Type": "application/json"},
                method="POST",
            )
            # The context manager closes the HTTP response on every path.
            with urllib.request.urlopen(req, timeout=60) as resp:
                data = json.loads(resp.read())
            content = data["choices"][0]["message"]["content"].strip()
            return content.strip(strip_chars)
        except Exception as e:
            if attempt >= max_retries:
                raise
            print(f" ⚠️ 重试 {attempt + 1}: {e}")
            time.sleep(3)
# Outputs treated as garbage. Both half-width and full-width punctuation are
# accepted (\uff01 = "！", \uff0c = "，", \uff1f = "？").
_GARBAGE_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r'^(没有回答|没有回复|不知道|不确定|无法判断|不太清楚)[。!\uff01]?$',
        r'^(没有回答。|没有回复。){3,}',  # the same non-answer repeated
        r'^[。!\uff01,\uff0c\s]+$',  # punctuation only
        r'^[?\uff1f]+$',  # question marks only
    )
)
_CJK_CHAR = re.compile(r'[\u4e00-\u9fff]')


def is_valid_description(desc):
    """Return True when an AI-generated description looks usable.

    Args:
        desc: Candidate description; may be ``None`` or empty.

    Returns:
        ``False`` when ``desc`` is empty or whitespace, matches one of the
        garbage patterns, or contains fewer than three characters in the
        U+4E00-U+9FFF range; ``True`` otherwise.
    """
    if not desc or not desc.strip():
        return False
    stripped = desc.strip()
    if any(pattern.search(stripped) for pattern in _GARBAGE_PATTERNS):
        return False
    # Require at least three CJK characters.
    return len(_CJK_CHAR.findall(desc)) >= 3
def _keyword_fallback_description(cluster_msgs):
    """Build a description with the keyword-based generator from sync_feishu_feedback."""
    # Imported here because generate_problem_description is not part of this
    # file's top-level imports.
    from sync_feishu_feedback import generate_problem_description, extract_location_elements

    loc = extract_location_elements(cluster_msgs)
    return generate_problem_description(cluster_msgs, loc, "")


def generate_ai_description(cluster_data, cluster_msgs):
    """Describe one cluster with the LLM, falling back to keyword generation.

    Args:
        cluster_data: Cluster mapping passed to build_ai_prompt().
        cluster_msgs: The cluster's messages, passed to the keyword fallback.

    Returns:
        The AI description when the call succeeds and is_valid_description()
        accepts it; otherwise the keyword-based description.
    """
    prompt = build_ai_prompt(cluster_data)
    # Only the AI call and its validation are guarded. An error in the
    # fallback itself now propagates once instead of being reported as an AI
    # failure and run a second time.
    try:
        desc = call_deepseek(prompt)
        usable = is_valid_description(desc)
    except Exception as e:
        print(f" ❌ AI 调用失败: {e},使用关键词 fallback")
        return _keyword_fallback_description(cluster_msgs)

    if usable:
        return desc
    print(" ⚠️ AI 输出无效,使用关键词 fallback")
    return _keyword_fallback_description(cluster_msgs)
def _cluster_description(vc):
    """Return the description shown for a cluster: AI text, else fallback, else a placeholder."""
    return vc.get("ai_description", "") or vc.get("fallback_description", "未知问题")


def _escape_table_cell(value):
    """Escape pipes so chat text cannot end a Markdown table cell early."""
    return value.replace("|", "\\|")


def _render_dialogue_cell(m):
    """Render one message row (content at index 3, media URL at index 4) as a table cell."""
    text = str(m[3]).replace("\n", " ").replace("\r", " ").strip() if m[3] else ""
    text = re.sub(r"\[Image:[^\]]+\]", "", text)
    text = re.sub(r"https?://\S+", "", text)
    # Strip again: removing placeholders/URLs can leave only whitespace, which
    # would otherwise render as a blank cell instead of the image marker.
    text = re.sub(r"\s+", " ", text).strip()
    media_url = str(m[4]) if m[4] else ""

    info_parts = []
    if text:
        if len(text) > 80:
            text = text[:77] + "..."
        info_parts.append(_escape_table_cell(text))
    if media_url:
        is_image = media_url.lower().endswith((".png", ".jpg", ".jpeg", ".gif", ".webp"))
        label = "图片" if is_image else "文件"
        info_parts.append(f"📎 [{label}]({media_url})")
    if not info_parts:
        info_parts.append("[图片]")
    return "<br>".join(info_parts)


def build_summary_markdown(valid_clusters):
    """Render the daily summary and per-cluster breakdown as Markdown.

    Args:
        valid_clusters: List of cluster mappings. Keys read here: ``msgs``,
            ``category``, ``priority_info`` (mapping with ``priority`` and
            ``emoji``), ``ai_description`` and ``fallback_description``.

    Returns:
        The Markdown document: a summary grouped by priority (P0-P3) and
        category, followed by one section per cluster containing its
        description, the text from extract_conclusion(), and a table of the
        cluster's messages.
    """
    lines = ["## 今日问题归纳\n"]

    # Group clusters by priority level; clusters without one count as P2.
    grouped = defaultdict(list)
    for vc in valid_clusters:
        grouped[vc.get("priority_info", {}).get("priority", "P2")].append(vc)

    priority_headers = {
        "P0": "⚠️ P0级核心问题需优先处理",
        "P1": "⚡ P1级重要问题",
        "P2": "📌 P2级一般问题",
        "P3": "📝 P3级低优先级",
    }
    for p_level in ("P0", "P1", "P2", "P3"):
        items = grouped.get(p_level)
        if not items:
            continue
        lines.append(f"**{priority_headers[p_level]}**")
        by_category = defaultdict(list)
        for vc in items:
            by_category[vc["category"]].append(vc)
        for cat_idx, (cat_name, cat_items) in enumerate(by_category.items(), 1):
            lines.append(f"{cat_idx}. **{cat_name}**")
            lines.extend(f" - {_cluster_description(vc)}" for vc in cat_items)
        lines.append("")

    # Per-cluster breakdown.
    lines.append("## 今日问题拆解\n")
    for idx, vc in enumerate(valid_clusters, 1):
        pi = vc.get("priority_info", {})
        priority_label = pi.get("priority", "P2")
        emoji = pi.get("emoji", "📌")
        sorted_msgs = sort_cluster_msgs(vc["msgs"])

        lines.append(f"### {emoji} {priority_label}")
        lines.append("")
        lines.append(f"**{idx},问题描述:** {_cluster_description(vc)}")
        lines.append("")
        lines.append(extract_conclusion(sorted_msgs))
        lines.append("")
        lines.append("| 发言人 | 对话信息 |")
        lines.append("|--------|---------|")

        # Only the first message of the first speaker carries the reporter
        # tag; later messages from the same person are left untagged.
        first_speaker = sorted_msgs[0][1] if sorted_msgs else None
        seen_speakers = set()
        for m in sorted_msgs:
            name = m[1]
            role_tag = "🚩 报告:" if name == first_speaker and name not in seen_speakers else ""
            seen_speakers.add(name)
            lines.append(f"| {_escape_table_cell(str(name))} | {role_tag}{_render_dialogue_cell(m)} |")
        lines.append("")
        lines.append("---")
        lines.append("")
    return "\n".join(lines)
def write_to_kb(date_str, markdown_content):
    """Overwrite the knowledge-base child document for one date.

    The document titled ``"<date_str> 问题反馈"`` is looked up through
    list_child_nodes() and created with create_child_doc() when missing. The
    Markdown is written to a temporary file and handed to the CLI in
    overwrite mode.

    Args:
        date_str: Date used in the document title and the temp file name.
        markdown_content: Markdown text that replaces the document body.

    Returns:
        ``True`` when the CLI response is JSON with a truthy ``ok`` field;
        ``False`` when the document cannot be found or created, the CLI
        reports an error, or its output is not JSON.

    Raises:
        subprocess.TimeoutExpired: If the CLI does not finish in 30 seconds.
    """
    title = f"{date_str} 问题反馈"
    children = list_child_nodes()
    if title in children:
        obj_token = children[title]["obj_token"]
        print(f" 📝 更新: {title}")
    else:
        obj_token = create_child_doc(title)
        if not obj_token:
            # Creation returned nothing; list again in case the document
            # exists after all.
            children = list_child_nodes()
            if title not in children:
                print(f" ❌ 无法创建/找到子文档: {title}")
                return False
            obj_token = children[title]["obj_token"]
        print(f" 新建: {title}")

    # The original note says lark-cli's "--markdown @file" needs a relative
    # path, hence the file under ./tmp.
    os.makedirs("tmp", exist_ok=True)
    tmp_md = f"tmp/_xiaokui_backfill_{date_str}.txt"
    try:
        with open(tmp_md, "w", encoding="utf-8") as f:
            f.write(markdown_content)
        result = subprocess.run(
            [CLI, "docs", "+update", "--doc", obj_token, "--as", "bot", "--mode", "overwrite", "--markdown", f"@{tmp_md}"],
            env=get_env(),
            capture_output=True,
            text=True,
            timeout=30,
        )
    finally:
        # Remove the temp file even when the CLI times out or fails to start.
        if os.path.exists(tmp_md):
            os.unlink(tmp_md)

    # The CLI may answer on stdout or on stderr.
    output = (result.stdout + result.stderr).strip()
    try:
        d = json.loads(output)
    except json.JSONDecodeError:
        print(f" ❌ 响应解析失败: {output[:200]}")
        return False

    if isinstance(d, dict) and d.get("ok"):
        print(" ✅ 写入成功")
        return True
    # "error" is not guaranteed to be a mapping; fall back to the raw output.
    error = d.get("error") if isinstance(d, dict) else None
    message = error.get("message", output) if isinstance(error, dict) else output
    print(f" ❌ 写入失败: {str(message)[:200]}")
    return False
def process_date(date_str, dry_run=False):
    """Run the full pipeline for one date: load, cluster, describe, publish.

    Args:
        date_str: Date to process, ``YYYY-MM-DD``.
        dry_run: When true, skip the AI calls and the knowledge-base write and
            print a preview of the generated Markdown instead.

    Returns:
        The generated Markdown, or ``None`` when the date has no messages or
        no cluster with at least two messages.
    """
    print(f"\n{'=' * 60}")
    print(f"📅 {date_str}")
    print(f"{'=' * 60}")

    # 1. Load messages.
    rows = query_date_messages(date_str)
    if not rows:
        print(" ⚠️ 无消息")
        return None
    print(f" 📊 {len(rows)} 条消息")

    # 2. Cluster.
    sorted_msgs, clusters, cluster_order = sort_threads(rows)
    print(f" 🔗 {len(clusters)} 个簇")

    # 3. Keep clusters with at least two messages.
    # Message fields are read by position, which assumes sort_threads() keeps
    # the row layout from query_date_messages(): 1 sender_name, 2 msg_type,
    # 3 content, 4 media_url, 6 msg_time. TODO: confirm against sort_threads.
    valid_clusters = []
    for cid in cluster_order:
        cmsgs = clusters[cid]
        if len(cmsgs) < 2:
            continue
        valid_clusters.append(
            {
                "cluster_id": cid,
                "msgs": cmsgs,
                "earliest_time": min(m[6] for m in cmsgs),
                "priority_info": compute_final_priority(cmsgs),
                "category": classify_problem(cmsgs),
                "conclusion": extract_conclusion(sort_cluster_msgs(cmsgs)),
                # Plain-dict view of the messages, used for the AI prompt.
                "messages": [
                    {
                        "sender": m[1],
                        "content": str(m[3]) if m[3] else "",
                        "msg_type": str(m[2]),
                        "media_url": str(m[4]) if m[4] else "",
                        "time": str(m[6]),
                    }
                    for m in cmsgs
                ],
            }
        )
    if not valid_clusters:
        print(" ⚠️ 无有效问题簇需≥2条消息")
        return None

    valid_clusters = sort_by_priority(valid_clusters)

    # 4. Generate a description per cluster.
    # enumerate() replaces list.index(), which rescanned the list each
    # iteration and compared whole dicts for equality.
    total = len(valid_clusters)
    for idx, vc in enumerate(valid_clusters, 1):
        print(f" 🤖 簇 #{idx}/{total}...")
        if dry_run:
            vc["ai_description"] = "[DRY-RUN] 待AI归纳"
            print(" [DRY-RUN]")
            continue
        desc = generate_ai_description(vc, vc["msgs"])
        if desc and desc != "(待归纳)":
            vc["ai_description"] = desc
            print(f"{desc[:80]}...")
        else:
            print(" ⚠️ AI 失败,使用空描述")
            vc["ai_description"] = "(待归纳)"
        # Pause between API calls.
        time.sleep(0.5)

    # 5. Render the Markdown.
    markdown = build_summary_markdown(valid_clusters)
    if dry_run:
        print("\n --- 预览前 500 字符 ---")
        print(markdown[:500])
        return markdown

    # 6. Publish. write_to_kb() prints its own success/failure line; its
    # result does not change the return value.
    write_to_kb(date_str, markdown)
    return markdown
def main():
    """Command-line entry point.

    Options:
        --dry-run: forwarded to process_date(); also skips the pause between
            dates.
        --date: process only this date instead of START_DATE..END_DATE
            (inclusive).

    Prints a per-date status summary once every date has been attempted.
    """
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("--dry-run", action="store_true")
    cli.add_argument("--date", help="仅处理指定日期")
    opts = cli.parse_args()

    if opts.date:
        dates = [opts.date]
    else:
        # Walk day by day from START_DATE to END_DATE, both included.
        day = datetime.strptime(START_DATE, "%Y-%m-%d")
        last_day = datetime.strptime(END_DATE, "%Y-%m-%d")
        dates = []
        while day <= last_day:
            dates.append(day.strftime("%Y-%m-%d"))
            day += timedelta(days=1)

    mode_tag = '[DRY-RUN]' if opts.dry_run else ''
    print(f"🚀 批量 AI 归纳回写 {mode_tag}")
    print(f" 日期范围: {dates[0]} ~ {dates[-1]} ({len(dates)} 天)")

    results = {}
    for current in dates:
        # One failing date must not stop the rest of the batch.
        try:
            outcome = process_date(current, dry_run=opts.dry_run)
        except Exception as e:
            print(f" ❌ 异常: {e}")
            results[current] = f"{e}"
        else:
            results[current] = "" if outcome else "⏭️"
        if not opts.dry_run:
            # Pause between dates.
            time.sleep(1)

    print(f"\n{'=' * 60}")
    print("📊 汇总:")
    for current, status in results.items():
        print(f" {current}: {status}")


if __name__ == "__main__":
    main()