#!/usr/bin/env python3
"""
Backfill AI-summarised problem descriptions for 2026-05-11 ~ 2026-05-19.

For each date:
1. Read the messages from MySQL, cluster them, and build problem clusters.
2. Call the DeepSeek API to produce a concise description for each cluster.
3. Regenerate the full summary from the AI descriptions (instead of the
   sync script's default generate_problem_description).
4. Overwrite the child document for that date in the Feishu knowledge base.
"""

import sys
import os
import json
import urllib.request
import subprocess
import time
import re
from datetime import datetime, date, timedelta
from collections import defaultdict

# The sync scripts live in a sibling skill directory; make them importable.
sys.path.insert(
    0,
    os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "..",
        "skills",
        "feishu-feedback-sync",
        "scripts",
    ),
)

from sync_feishu_feedback import (
    get_db_connection,
    sort_threads,
    sort_cluster_msgs,
    extract_location_elements,
    extract_conclusion,
    classify_problem,
    get_tenant_token,
    list_child_nodes,
    create_child_doc,
    SUMMARY_PARENT_NODE,
    SUMMARY_SPACE_ID,
    DISPATCH_CRED_DIR,
    XIAOKUI_BOT_OPEN_ID,
    CLI,
    get_env,
)
from priority_classifier import compute_final_priority, sort_by_priority

# === Configuration ===
# SECURITY(review): an API key is committed in source. Prefer supplying it via
# the DEEPSEEK_API_KEY environment variable; the literal below is kept only so
# existing invocations keep working. Rotate this key and delete the fallback.
DEEPSEEK_API_KEY = os.environ.get(
    "DEEPSEEK_API_KEY", "sk-7cf94305fb12473b956fd2ed2a6db05b"
)
DEEPSEEK_BASE_URL = "https://api.deepseek.com/v1"
DEEPSEEK_MODEL = "deepseek-v4-pro"
START_DATE = "2026-05-11"
END_DATE = "2026-05-19"

SYSTEM_PROMPT = """你是一个游戏产品的问题归纳助手。阅读来自测试群的多人对话,用一句中文描述他们讨论的具体问题。

要求:
1. 只描述问题本身,不评价、不建议
2. 包含关键要素:在哪个端/环节、什么表现
3. 有频率信息(偶现/频繁/必现)要体现
4. 仅输出一句中文,不加任何前缀、编号、引号或换行
5. 如果对话全是无实质内容的闲聊,输出"无明确问题"
6. 如果是打包/热更类问题,说清楚是哪个版本/分支的包
7. 如果是语音识别问题,说清楚识别什么内容、识别成了什么"""

# Characters trimmed from both ends of the model output: ASCII and curly
# quotes (the prompt forbids quoting, but models add them anyway), spaces and
# newlines.
_QUOTE_CHARS = "\"'\u201c\u201d\u2018\u2019 \n"

# Outputs that are syntactically a reply but carry no problem description.
_GARBAGE_PATTERNS = [
    re.compile(r'^(没有回答|没有回复|不知道|不确定|无法判断|不太清楚)[。!]?$'),
    re.compile(r'^(没有回答。|没有回复。){3,}'),  # repeated "no answer"
    re.compile(r'^[。!,\s]+$'),  # punctuation only
    re.compile(r'^[??]+$'),  # question marks only
]

# File extensions rendered as an image attachment rather than a generic file.
_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".gif", ".webp")


def query_date_messages(date_str):
    """Return all group messages of one calendar day, oldest first.

    Args:
        date_str: Day to read, formatted as ``YYYY-MM-DD``.

    Returns:
        A list of row tuples in SELECT column order: message_id, sender_name,
        msg_type, content, media_url, quote_message_id, msg_time,
        msg_timestamp. ``msg_time`` is converted to a
        ``YYYY-MM-DD HH:MM:SS`` string when the driver returns an object
        that has ``strftime``.

    Raises:
        ValueError: If ``date_str`` does not parse as ``YYYY-MM-DD``.
    """
    # Parse before connecting so a bad date cannot leak a connection.
    day_start = datetime.strptime(date_str, "%Y-%m-%d")
    next_date = (day_start + timedelta(days=1)).strftime("%Y-%m-%d")

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            """SELECT message_id, sender_name, msg_type, content, media_url,
                      quote_message_id, msg_time, msg_timestamp
               FROM lark_group_message
               WHERE msg_time >= %s AND msg_time < %s
               ORDER BY msg_time ASC""",
            (f"{date_str} 00:00:00", f"{next_date} 00:00:00"),
        )
        rows = cursor.fetchall()
    finally:
        conn.close()

    # Normalise the timestamp column to a string.
    formatted_rows = []
    for row in rows:
        fields = list(row)
        if fields[6] and hasattr(fields[6], "strftime"):
            fields[6] = fields[6].strftime("%Y-%m-%d %H:%M:%S")
        formatted_rows.append(tuple(fields))
    return formatted_rows


def build_ai_prompt(cluster):
    """Build the user prompt for one problem cluster.

    Args:
        cluster: Dict describing the cluster. Reads ``priority`` (falling
            back to ``priority_info["priority"]``, which is where
            ``process_date`` stores it), ``category``, ``conclusion`` and
            ``messages`` (a list of dicts with ``sender``, ``content`` and
            ``time``).

    Returns:
        A newline-joined string: three header lines, a separator, then one
        line per message that has text content. Message text longer than 200
        characters is truncated to 197 characters plus ``...``.
    """
    priority = cluster.get("priority")
    if not priority:
        info = cluster.get("priority_info")
        priority = info.get("priority") if isinstance(info, dict) else None

    lines = [
        f"优先级: {priority or '?'}",
        f"分类: {cluster.get('category', '?')}",
        f"排查结论: {cluster.get('conclusion', '无')}",
        "--- 对话 ---",
    ]
    for msg in cluster.get("messages", []):
        content = msg.get("content", "").strip()
        if not content:
            # Attachments without text give the model nothing to summarise.
            continue
        if len(content) > 200:
            content = content[:197] + "..."
        lines.append(f"[{msg.get('time', '')}] {msg.get('sender', '?')}: {content}")
    return "\n".join(lines)


def call_deepseek(user_prompt, max_retries=2):
    """Ask DeepSeek for a one-sentence problem description.

    Args:
        user_prompt: Prompt text sent as the user message.
        max_retries: Number of additional attempts after the first failure.

    Returns:
        The reply content with surrounding quotes and whitespace removed.

    Raises:
        Exception: Whatever the final attempt raised (network error, HTTP
            error, malformed response).
    """
    body = json.dumps(
        {
            "model": DEEPSEEK_MODEL,
            "messages": [
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            "temperature": 0.3,
            "max_tokens": 256,
        }
    ).encode()

    for attempt in range(max_retries + 1):
        try:
            req = urllib.request.Request(
                f"{DEEPSEEK_BASE_URL}/chat/completions",
                data=body,
                headers={
                    "Authorization": f"Bearer {DEEPSEEK_API_KEY}",
                    "Content-Type": "application/json",
                },
                method="POST",
            )
            # The context manager closes the HTTP response on every path.
            with urllib.request.urlopen(req, timeout=60) as resp:
                data = json.loads(resp.read())
            content = data["choices"][0]["message"]["content"].strip()
            return content.strip(_QUOTE_CHARS)
        except Exception as e:
            if attempt < max_retries:
                print(f" ⚠️ 重试 {attempt + 1}: {e}")
                time.sleep(3)
            else:
                raise


def is_valid_description(desc):
    """Return True when an AI-generated description looks usable.

    A description is rejected when it is empty, matches one of the known
    non-answer patterns, or contains fewer than three CJK characters.
    """
    if not desc or not desc.strip():
        return False

    stripped = desc.strip()
    if any(pattern.search(stripped) for pattern in _GARBAGE_PATTERNS):
        return False

    # Too short to be a description (< 3 Chinese characters).
    chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', desc))
    return chinese_chars >= 3


def _keyword_fallback(cluster_msgs):
    """Produce a description with the sync script's keyword-based generator."""
    # Imported lazily, as in the original, so the name is only required when
    # the fallback is actually needed.
    from sync_feishu_feedback import generate_problem_description

    loc = extract_location_elements(cluster_msgs)
    return generate_problem_description(cluster_msgs, loc, "")


def generate_ai_description(cluster_data, cluster_msgs):
    """Describe one cluster with the AI, falling back to keyword generation.

    Args:
        cluster_data: Cluster dict passed to ``build_ai_prompt``.
        cluster_msgs: Raw message rows of the cluster, used by the fallback.

    Returns:
        The AI description when the call succeeds and the output passes
        ``is_valid_description``; otherwise the keyword-based description.
    """
    prompt = build_ai_prompt(cluster_data)
    try:
        desc = call_deepseek(prompt)
    except Exception as e:  # best effort: any API failure uses the fallback
        print(f" ❌ AI 调用失败: {e},使用关键词 fallback")
        return _keyword_fallback(cluster_msgs)

    if is_valid_description(desc):
        return desc
    print(" ⚠️ AI 输出无效,使用关键词 fallback")
    return _keyword_fallback(cluster_msgs)


def _cluster_description(vc):
    """Return the description to display for a cluster, never empty."""
    return vc.get("ai_description") or vc.get("fallback_description") or "未知问题"


def _dialogue_cell(msg):
    """Render one message row as the content of a markdown table cell."""
    text = str(msg[3]).replace("\n", " ").replace("\r", " ").strip() if msg[3] else ""
    text = re.sub(r"\[Image:[^\]]+\]", "", text)
    text = re.sub(r"https?://\S+", "", text)
    # Strip again: removing tags/URLs can leave only whitespace behind.
    text = re.sub(r"\s+", " ", text).strip()
    media_url = str(msg[4]) if msg[4] else ""

    info_parts = []
    if text:
        if len(text) > 80:
            text = text[:77] + "..."
        info_parts.append(text)
    if media_url:
        label = "图片" if media_url.lower().endswith(_IMAGE_EXTENSIONS) else "文件"
        info_parts.append(f"📎 [{label}]({media_url})")
    if not info_parts:
        info_parts.append("[图片]")

    # A table row must stay on one physical line, so parts are separated with
    # an HTML line break instead of a newline.
    return "<br>".join(info_parts)


def build_summary_markdown(valid_clusters):
    """Build the day's summary markdown from AI descriptions.

    Replaces the sync script's ``generate_summary``.

    Args:
        valid_clusters: List of dicts with ``msgs``, ``priority_info``,
            ``category`` and ``ai_description`` (as built by
            ``process_date``). Each ``msgs`` list must be non-empty.

    Returns:
        Markdown text with two sections: an overview grouped by priority and
        category, and a per-cluster breakdown containing the description, the
        extracted conclusion, and a speaker/dialogue table.
    """
    lines = ["## 今日问题归纳\n"]

    # Group by priority, then by category.
    grouped = defaultdict(list)
    for vc in valid_clusters:
        grouped[vc.get("priority_info", {}).get("priority", "P2")].append(vc)

    priority_headers = {
        "P0": "⚠️ P0级核心问题(需优先处理)",
        "P1": "⚡ P1级重要问题",
        "P2": "📌 P2级一般问题",
        "P3": "📝 P3级低优先级",
    }
    for p_level in ["P0", "P1", "P2", "P3"]:
        items = grouped.get(p_level, [])
        if not items:
            continue
        lines.append(f"**{priority_headers[p_level]}**")
        by_category = defaultdict(list)
        for vc in items:
            by_category[vc["category"]].append(vc)
        for cat_idx, (cat_name, cat_items) in enumerate(by_category.items(), start=1):
            lines.append(f"{cat_idx}. **{cat_name}**")
            for vc in cat_items:
                # Indented so the bullet nests under the numbered category.
                lines.append(f"   - {_cluster_description(vc)}")
        lines.append("")

    # Per-cluster breakdown.
    lines.append("## 今日问题拆解\n")
    for idx, vc in enumerate(valid_clusters, start=1):
        pi = vc.get("priority_info", {})
        priority_label = pi.get("priority", "P2")
        emoji = pi.get("emoji", "📌")
        sorted_msgs = sort_cluster_msgs(vc["msgs"])

        lines.append(f"### {emoji} {priority_label}")
        lines.append("")
        lines.append(f"**{idx},问题描述:** {_cluster_description(vc)}")
        lines.append("")
        lines.append(extract_conclusion(sorted_msgs))
        lines.append("")
        lines.append("| 发言人 | 对话信息 |")
        lines.append("|--------|---------|")

        first_speaker = sorted_msgs[0][1]
        last_speaker = sorted_msgs[-1][1]
        last_index = len(sorted_msgs) - 1
        seen_speakers = set()
        for i, m in enumerate(sorted_msgs):
            name = m[1]
            dialogue_info = _dialogue_cell(m)

            # Tag the reporter's first message and the closing message.
            role_tag = ""
            if name == first_speaker and name not in seen_speakers:
                role_tag = "🚩 报告:"
            elif name == last_speaker and i == last_index:
                role_tag = "✅ "
            seen_speakers.add(name)

            lines.append(f"| {name} | {role_tag}{dialogue_info} |")
        lines.append("")
        lines.append("---")
        lines.append("")

    return "\n".join(lines)


def write_to_kb(date_str, markdown_content):
    """Overwrite the knowledge-base child document for one date.

    The document titled ``"<date_str> 问题反馈"`` is looked up among the
    child nodes and created when missing, then overwritten through the CLI.

    Args:
        date_str: Date used in the document title and temp file name.
        markdown_content: Markdown to write.

    Returns:
        True when the CLI reports ``ok``; False when the document cannot be
        found/created, the CLI reports failure, or its output is not JSON.

    Raises:
        subprocess.TimeoutExpired: If the CLI does not finish in 30 seconds.
    """
    title = f"{date_str} 问题反馈"
    children = list_child_nodes()
    if title in children:
        obj_token = children[title]["obj_token"]
        print(f" 📝 更新: {title}")
    else:
        obj_token = create_child_doc(title)
        if not obj_token:
            # Creation returned no token; re-list in case the node exists now.
            children = list_child_nodes()
            if title in children:
                obj_token = children[title]["obj_token"]
            else:
                print(f" ❌ 无法创建/找到子文档: {title}")
                return False
        print(f" ➕ 新建: {title}")

    # lark-cli's ``--markdown @file`` requires a relative path.
    os.makedirs("tmp", exist_ok=True)
    tmp_md = f"tmp/_xiaokui_backfill_{date_str}.txt"
    with open(tmp_md, "w", encoding="utf-8") as f:
        f.write(markdown_content)

    try:
        result = subprocess.run(
            [
                CLI, "docs", "+update",
                "--doc", obj_token,
                "--as", "bot",
                "--mode", "overwrite",
                "--markdown", f"@{tmp_md}",
            ],
            env=get_env(),
            capture_output=True,
            text=True,
            timeout=30,
        )
    finally:
        # Remove the temp file even when the CLI times out or fails to start.
        if os.path.exists(tmp_md):
            os.unlink(tmp_md)

    # lark-cli may print its response on stdout or stderr.
    output = (result.stdout + result.stderr).strip()
    try:
        d = json.loads(output)
    except json.JSONDecodeError:
        print(f" ❌ 响应解析失败: {output[:200]}")
        return False

    if isinstance(d, dict) and d.get("ok"):
        print(" ✅ 写入成功")
        return True

    error = d.get("error") if isinstance(d, dict) else None
    message = error.get("message") if isinstance(error, dict) else None
    print(f" ❌ 写入失败: {str(message or output)[:200]}")
    return False


def process_date(date_str, dry_run=False):
    """Summarise one date and write the result to the knowledge base.

    Args:
        date_str: Day to process, formatted as ``YYYY-MM-DD``.
        dry_run: When True, skip the AI calls and the knowledge-base write
            and print a preview of the generated markdown instead.

    Returns:
        The generated markdown, or None when the day has no messages or no
        cluster with at least two messages.

    Raises:
        RuntimeError: If the knowledge-base write fails (not in dry-run).
    """
    print(f"\n{'=' * 60}")
    print(f"📅 {date_str}")
    print(f"{'=' * 60}")

    # 1. Read messages.
    rows = query_date_messages(date_str)
    if not rows:
        print(" ⚠️ 无消息")
        return None
    print(f" 📊 {len(rows)} 条消息")

    # 2. Cluster.
    sorted_msgs, clusters, cluster_order = sort_threads(rows)
    print(f" 🔗 {len(clusters)} 个簇")

    # 3. Collect valid clusters (at least two messages).
    valid_clusters = []
    for cid in cluster_order:
        cmsgs = clusters[cid]
        if len(cmsgs) < 2:
            continue
        valid_clusters.append(
            {
                "cluster_id": cid,
                "msgs": cmsgs,
                "earliest_time": min(m[6] for m in cmsgs),
                "priority_info": compute_final_priority(cmsgs),
                "category": classify_problem(cmsgs),
                "conclusion": extract_conclusion(sort_cluster_msgs(cmsgs)),
                # Message digest consumed by build_ai_prompt.
                "messages": [
                    {
                        "sender": m[1],
                        "content": str(m[3]) if m[3] else "",
                        "msg_type": str(m[2]),
                        "media_url": str(m[4]) if m[4] else "",
                        "time": str(m[6]),
                    }
                    for m in cmsgs
                ],
            }
        )

    if not valid_clusters:
        print(" ⚠️ 无有效问题簇(需≥2条消息)")
        return None

    # Order by priority.
    valid_clusters = sort_by_priority(valid_clusters)

    # 4. Generate AI descriptions.
    total = len(valid_clusters)
    for idx, vc in enumerate(valid_clusters, start=1):
        print(f" 🤖 簇 #{idx}/{total}...")
        if dry_run:
            vc["ai_description"] = "[DRY-RUN] 待AI归纳"
            print(" [DRY-RUN]")
            continue

        desc = generate_ai_description(vc, vc["msgs"])
        if desc and desc != "(待归纳)":
            vc["ai_description"] = desc
            print(f" ✅ {desc[:80]}...")
        else:
            print(" ⚠️ AI 失败,使用空描述")
            vc["ai_description"] = "(待归纳)"
        # Stay under the API rate limit.
        time.sleep(0.5)

    # 5. Build the full markdown.
    markdown = build_summary_markdown(valid_clusters)

    if dry_run:
        print("\n --- 预览前 500 字符 ---")
        print(markdown[:500])
        return markdown

    # 6. Write to the knowledge base. A failed write must not be reported as
    # success by the caller's summary, so surface it as an exception.
    if not write_to_kb(date_str, markdown):
        raise RuntimeError(f"写入知识库失败: {date_str}")
    return markdown


def main():
    """Parse the command line and process each date in the range."""
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--date", help="仅处理指定日期")
    args = parser.parse_args()

    if args.date:
        dates = [args.date]
    else:
        start = datetime.strptime(START_DATE, "%Y-%m-%d")
        end = datetime.strptime(END_DATE, "%Y-%m-%d")
        dates = [
            (start + timedelta(days=i)).strftime("%Y-%m-%d")
            for i in range((end - start).days + 1)
        ]

    print(f"🚀 批量 AI 归纳回写 {'[DRY-RUN]' if args.dry_run else ''}")
    print(f" 日期范围: {dates[0]} ~ {dates[-1]} ({len(dates)} 天)")

    results = {}
    for d in dates:
        try:
            r = process_date(d, dry_run=args.dry_run)
            results[d] = "✅" if r else "⏭️"
        except Exception as e:  # top-level boundary: record and continue
            print(f" ❌ 异常: {e}")
            results[d] = f"❌ {e}"
        if not args.dry_run:
            # Avoid hitting the API rate limit between dates.
            time.sleep(1)

    print(f"\n{'=' * 60}")
    print("📊 汇总:")
    for d, status in results.items():
        print(f" {d}: {status}")


if __name__ == "__main__":
    main()