#!/usr/bin/env python3 """ AI 问题归纳脚本 读取 cluster_context_{date}.json,调用 LLM 为每个问题簇生成精炼的问题描述, 输出 ai_descriptions_{date}.json,然后回写到飞书知识库文档。 用法: python3 ai_summarize_feedback.py [--date YYYY-MM-DD] [--dry-run] crontab: 5 10 * * * python3 .../ai_summarize_feedback.py >> /var/log/xiaokui_ai_summarize.log 2>&1 """ import sys, os, json, argparse, re, subprocess, urllib.request from datetime import datetime, date, timedelta # === 配置 === DEEPSEEK_API_KEY = "sk-7cf94305fb12473b956fd2ed2a6db05b" DEEPSEEK_BASE_URL = "https://api.deepseek.com/v1" DEEPSEEK_MODEL = "deepseek-v4-pro" CONTEXT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "output", "daily_feedback") SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) SKILL_SCRIPT_DIR = os.path.join(SCRIPT_DIR, "..", "skills", "feishu-feedback-sync", "scripts") sys.path.insert(0, SKILL_SCRIPT_DIR) import sync_feishu_feedback # noqa: E402 — 用于 fallback 关键词规则 SYSTEM_PROMPT = """你是一个游戏产品的问题归纳助手。你的任务是: 阅读一段来自测试群的多人对话(可能包含多个发言人、多轮讨论), 从中提炼出他们正在讨论的「具体问题是什么」,用一句中文描述清楚。 要求: 1. 只描述问题本身,不要评价或建议 2. 包含关键要素:在哪个端、哪个环节、什么表现 3. 如果对话中有多种说法,优先采用最后确认的描述 4. 输出仅一句中文,不要加任何前缀、编号、引号或换行 5. 如果对话全是无实质内容的闲聊(如"好的""收到"),输出"无明确问题" 6. **严禁**在问题描述中出现任何员工姓名(如江涛、张骜等),人名用"相关人员"替代 输出格式(严格):直接输出问题描述,无任何额外文字。""" def load_context(date_str, channel="feishu"): """加载指定日期的 cluster_context JSON""" prefix = "wechat_cluster_context" if channel == "wechat" else "cluster_context" path = os.path.join(CONTEXT_DIR, f"{prefix}_{date_str}.json") if not os.path.exists(path): print(f" ⚠️ 无上下文文件: {path}") return None with open(path, "r", encoding="utf-8") as f: return json.load(f) def build_user_prompt(cluster): """为单个问题簇构建 LLM prompt""" lines = [] lines.append(f"优先级: {cluster.get('priority', '?')}") lines.append(f"分类: {cluster.get('category', '?')}") lines.append(f"当前排查结论: {cluster.get('conclusion', '无')}") lines.append("") lines.append("--- 对话记录 ---") for msg in cluster.get("messages", []): sender = msg.get("sender", "?") content = msg.get("content", "") mtype = msg.get("msg_type", "text") time = msg.get("time", "") # 跳过纯媒体消息(无有效文本) if mtype in ("image", "post_image", "media", "file", "sticker") and not content.strip(): continue if not content.strip(): continue # 截断过长内容 if len(content) > 200: content = content[:197] + "..." lines.append(f"[{time}] {sender}: {content}") return "\n".join(lines) def call_deepseek(system_prompt, user_prompt, max_retries=2): """调用 DeepSeek API 生成问题描述""" body = json.dumps({ "model": DEEPSEEK_MODEL, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], "temperature": 0.3, "max_tokens": 1024, }).encode() for attempt in range(max_retries + 1): try: req = urllib.request.Request( f"{DEEPSEEK_BASE_URL}/chat/completions", data=body, headers={ "Authorization": f"Bearer {DEEPSEEK_API_KEY}", "Content-Type": "application/json", }, method="POST", ) resp = urllib.request.urlopen(req, timeout=60) data = json.loads(resp.read()) content = data["choices"][0]["message"]["content"].strip() # 清理常见的引号/前缀 content = content.strip('"\'""'' \n') return content except Exception as e: if attempt < max_retries: print(f" ⚠️ API 调用重试 {attempt + 1}: {e}") import time time.sleep(2) else: raise def generate_fallback_description(cluster): """AI 返回空描述时的回退:调用 sync_feishu_feedback.py 的关键词规则生成""" # 将 context JSON 消息格式转换为 sync_feishu_feedback 期望的数据库行格式 # 数据库行: (msg_id, sender, msg_type, content, media_url, quote_id, time, timestamp) converted = [] for m in cluster.get("messages", []): converted.append(( m.get("message_id", ""), m.get("sender", ""), m.get("msg_type", "text"), m.get("content", ""), m.get("media_url", ""), m.get("quote_message_id", ""), m.get("time", ""), 0, )) idx = cluster.get("index", 0) location = sync_feishu_feedback.extract_location_elements(converted) root_text = converted[0][3] if converted else "" return sync_feishu_feedback.generate_problem_description(converted, location, root_text, ai_placeholder=False, placeholder_idx=idx) def strip_names(text, cluster=None): """移除问题描述中的员工姓名(后处理兜底)。 1. 优先使用簇中实际发送者姓名做精确替换 2. 然后对常见姓氏+1字做保守匹配(排除已知内容词) """ import re if not text: return text # 1. 精确替换:簇中出现的发送者姓名 if cluster: sender_names = set() for m in cluster.get("messages", []): name = m.get("sender", "").strip() if name and len(name) >= 2: sender_names.add(name) for name in sorted(sender_names, key=len, reverse=True): text = text.replace(name, '相关人员') # 2. 保守模式:姓氏 + 1个中文字符(两字名),排除已知内容词 surnames = '李王张刘陈杨赵黄周吴徐孙胡朱高林何郭马罗梁宋郑谢韩唐冯于董萧程曹袁邓许傅沈曾彭吕苏卢蒋蔡贾丁魏薛叶阎余潘杜戴夏钟汪田任姜范方石姚谭廖邹熊金陆郝孔白崔康毛邱秦江史顾侯邵孟龙万段雷钱汤尹黎易常武乔贺赖龚文' pattern = '[' + surnames + '][一-鿿]' # 需要排除的已知内容词 content_words = { '文件','资源','存在','动画','角色','设计','问题','音频','显示', '界面','关卡','课程','内容','配置','重点','引导','模型', '测试','环境','部署','灰度','版本','组件','数据','命名', '图片','视频','格式','选项','处理','结果','玩家','游戏', '开发','项目','报告','任务','状态','进度','确认','反馈', '功能','系统','后台','前端','服务','需要','可能','正常', '异常','错误','修复','解决','检查','查看','说明','登录', '注册','打开','关闭','更新','调试','运行','启动','停止', '通过','失败','成功','完成','开始','结束','使用','操作', '调整','优化','修改','增加','删除','添加','移除','切换', '程序','方式','相关','进入','平板','第四','单元','原生', '声音','断断','续续','后台','托管','无法','熏听','加载', '消耗','容器','时候','较多','知识','巩固','环节','第一', '播放','警报','正确','系统','操作','权限','人员','内核', } def _replace(m): name = m.group(0) return '相关人员' if name not in content_words else name text = re.sub(pattern, _replace, text) return text def generate_descriptions(context_data, dry_run=False): """为所有问题簇生成 AI 描述""" clusters = context_data.get("clusters", []) if not clusters: print(" ⚠️ 无问题簇数据") return None descriptions = [] for cluster in clusters: idx = cluster.get("index", 0) print(f" 🤖 处理簇 #{idx}...") user_prompt = build_user_prompt(cluster) if dry_run: print(f" [DRY-RUN] Prompt 长度: {len(user_prompt)} chars") # 输出前 200 字符预览 print(f" [DRY-RUN] 对话预览: {user_prompt[:200]}...") description = f"[DRY-RUN] 问题{idx}" else: try: description = call_deepseek(SYSTEM_PROMPT, user_prompt) except Exception as e: print(f" ❌ 簇 #{idx} API 调用失败: {e}") description = f"[API调用失败: {str(e)[:50]}]" # AI 返回空描述时回退 if not description or not description.strip(): description = generate_fallback_description(cluster) print(f" ⚠️ AI 返回空,回退: {description}") else: print(f" 📝 描述: {description}") # 脱敏:移除员工姓名 description = strip_names(description, cluster=cluster) descriptions.append({"index": idx, "description": description}) return descriptions def apply_descriptions(date_str, descriptions, channel="feishu"): """调用 sync_*_feedback.py --apply-ai 回写文档 channel: "feishu" 或 "wechat" """ sys.path.insert(0, SKILL_SCRIPT_DIR) # 渠道前缀 prefix = "wechat_" if channel == "wechat" else "" # 先保存描述 JSON desc_path = os.path.join(CONTEXT_DIR, f"ai_descriptions_{channel}_{date_str}.json") payload = {"date": date_str, "descriptions": descriptions} with open(desc_path, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) print(f" 💾 描述已保存: {desc_path}") # 调用 --apply-ai if channel == "wechat": sync_script = os.path.join(SCRIPT_DIR, "sync_wechat_feedback.py") else: sync_script = os.path.join(SKILL_SCRIPT_DIR, "sync_feishu_feedback.py") env = os.environ.copy() env["LARKSUITE_CLI_CONFIG_DIR"] = "/root/.openclaw/credentials/xiaokui" env["HOME"] = "/root" env["PATH"] = "/root/.nvm/versions/node/v24.14.0/bin:" + env.get("PATH", "") cmd = ["python3", sync_script, "--date", date_str, "--apply-ai", desc_path] result = subprocess.run( cmd, capture_output=True, text=True, timeout=60, env=env ) if "AI 描述已应用" in result.stdout or "✅" in result.stdout: print(f" ✅ AI 描述已回写到知识库文档") # 回写成功后清理上下文文件,避免心跳重复处理 ctx_prefix = "wechat_cluster_context" if channel == "wechat" else "cluster_context" context_path = os.path.join(CONTEXT_DIR, f"{ctx_prefix}_{date_str}.json") if os.path.exists(context_path): os.remove(context_path) print(f" 🗑️ 已清理上下文文件: {context_path}") return True else: print(f" ❌ 回写失败: {result.stdout[:300]}") if result.stderr: print(f" stderr: {result.stderr[:300]}") return False def main(): parser = argparse.ArgumentParser(description="AI 问题归纳") parser.add_argument("--date", help="日期 YYYY-MM-DD,默认昨天") parser.add_argument("--dry-run", action="store_true", help="仅预览不实际调用 API") parser.add_argument("--channel", default="feishu", choices=["feishu", "wechat"], help="数据渠道(默认 feishu)") args = parser.parse_args() if args.date: date_str = args.date else: date_str = (date.today() - timedelta(days=1)).strftime("%Y-%m-%d") channel = args.channel label = "微信" if channel == "wechat" else "飞书" print(f"📋 AI 问题归纳 - {date_str} [{label}]") os.makedirs(CONTEXT_DIR, exist_ok=True) context = load_context(date_str, channel=channel) if not context: print(" ℹ️ 无待处理数据,退出") return descriptions = generate_descriptions(context, dry_run=args.dry_run) if not descriptions: return if args.dry_run: desc_path = os.path.join(CONTEXT_DIR, f"ai_descriptions_{channel}_{date_str}.json") payload = {"date": date_str, "descriptions": descriptions} with open(desc_path, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) print(f"[DRY-RUN] 描述已保存到 {desc_path},未回写文档") return apply_descriptions(date_str, descriptions, channel=channel) if __name__ == "__main__": main()