#!/usr/bin/env python3
"""
Real-time P0 issue detection and dispatch.

What it does:
1. Reads the most recent Feishu group messages from MySQL.
2. Reuses the clustering + priority logic of sync_feishu_feedback.py.
3. Filters out P0 clusters that were already pushed (de-duplication).
4. Pushes only the new P0 clusters to the "小葵小葵" group.

Design:
- Invoked once per minute by crontab.
- Queries the last 2 hours of messages so clustering has enough context.
- De-duplicates with a "cluster signature" (hash of sorted message ids),
  plus a fuzzy content fingerprint.
- Clears the de-dup state every day at 10:00 (staggered with the full
  dispatch run).

Usage:
    python3 detect_p0_realtime.py [--dry-run] [--lookback-minutes 120]
"""

import sys
import os
import re
import json
import urllib.error
import urllib.request
import argparse
import hashlib
from datetime import datetime, timedelta
from pathlib import Path

# Make the directory that holds sync_feishu_feedback.py importable.
SKILL_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "..", "skills", "feishu-feedback-sync", "scripts",
)
sys.path.insert(0, SKILL_DIR)

from sync_feishu_feedback import (
    get_db_connection,
    query_messages,
    sort_threads,
    get_tenant_token,
    content_similarity,
    DISPATCH_CHAT_ID,
    DISPATCH_CRED_DIR,
    P0_NOTIFY_USERS,
    MYSQL_HOST,
    MYSQL_PORT,
    MYSQL_USER,
    MYSQL_PASS,
    MYSQL_DB,
)
from priority_classifier import compute_final_priority

# === Configuration ===
STATE_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "..", "tmp", "p0_dispatched_state.json",
)
LOOKBACK_MINUTES = 120  # default look-back window: 2 hours of messages
CLUSTER_MIN_SIZE = 2    # a cluster needs at least 2 messages to be considered


def load_dispatched_state():
    """Load the record of already-pushed P0 clusters.

    Both state formats are accepted: the old one maps a signature to an ISO
    timestamp string, the new one maps it to ``{"time": str, "fp": dict}``.
    Entries older than 24 hours are dropped.

    Returns:
        dict: signature -> entry, containing only entries from the last 24h.
        A missing, undecodable, or malformed state file yields ``{}``.
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
        state = {}
    if not isinstance(state, dict):
        # A state file holding e.g. a JSON list is unusable; start fresh.
        state = {}

    # ISO-8601 strings produced by datetime.isoformat() sort chronologically,
    # so a plain string comparison is enough here.
    cutoff = (datetime.now() - timedelta(hours=24)).isoformat()
    cleaned = {}
    for sig, entry in state.items():
        if isinstance(entry, str):
            ts = entry
        elif isinstance(entry, dict):
            ts = entry.get("time", "")
        else:
            continue
        if isinstance(ts, str) and ts > cutoff:
            cleaned[sig] = entry
    return cleaned


def save_dispatched_state(state):
    """Persist the pushed-cluster state to STATE_FILE.

    The data is written to a sibling ``.tmp`` file first and then moved over
    the real file, so a reader never sees a half-written file.

    Args:
        state: JSON-serialisable dict of signature -> entry.
    """
    os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
    tmp = STATE_FILE + ".tmp"
    # Fingerprints contain non-ASCII text written with ensure_ascii=False, so
    # the encoding is pinned instead of relying on the (cron) locale.
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)
    os.replace(tmp, STATE_FILE)


def cluster_signature(cluster_msgs):
    """Return an exact-match signature for a cluster.

    The signature is the MD5 hex digest of the cluster's message ids
    (element 0 of each message), stringified, sorted and comma-joined.
    Any change in membership produces a different signature; fuzzy matching
    across scans is handled separately by the content fingerprint.

    Args:
        cluster_msgs: sequence of message rows.

    Returns:
        str: 32-character hex digest.
    """
    ids = sorted(str(m[0]) for m in cluster_msgs)
    joined = ",".join(ids)
    return hashlib.md5(joined.encode()).hexdigest()


def cluster_content_fingerprint(cluster_msgs):
    """Build a content-based fingerprint for cross-scan de-duplication.

    Unlike the signature, this does not depend on the exact set of message
    ids. Rows are indexed positionally: 1 = sender, 3 = content, 6 = time
    string (this matches the column order of the SELECT in ``main``).

    Args:
        cluster_msgs: sequence of message rows.

    Returns:
        dict with keys ``content`` (up to 5 snippets of at most 300 chars,
        joined by " | "), ``senders`` (sorted unique senders), ``hour``
        (first 13 chars of the first non-empty time, or "unknown") and
        ``msg_count``.
    """
    all_contents = []
    for m in cluster_msgs:
        c = str(m[3]).strip() if m[3] else ""
        # Very short messages carry too little signal for similarity.
        if len(c) > 8:
            all_contents.append(c[:300])
    aggregated = " | ".join(all_contents[:5])

    senders = sorted(set(m[1] for m in cluster_msgs if m[1]))
    times = [m[6] for m in cluster_msgs if m[6]]
    # "YYYY-MM-DD HH" when the time is formatted as in main()'s query.
    hour = times[0][:13] if times else "unknown"
    return {
        "content": aggregated,
        "senders": senders,
        "hour": hour,
        "msg_count": len(cluster_msgs),
    }


def is_duplicate_p0(new_fp, dispatched_entries):
    """Decide whether a new P0 cluster duplicates an already-pushed one.

    A stored entry counts as a duplicate when either:
    - it is from the same hour, shares at least 1 sender, and the content
      similarity is above 0.20; or
    - it shares at least 2 senders and the similarity is above 0.35.

    Args:
        new_fp: fingerprint from ``cluster_content_fingerprint``.
        dispatched_entries: ``{sig: {"time": str, "fp": dict}}``. Old-format
            entries (plain timestamp strings) carry no fingerprint and are
            skipped.

    Returns:
        bool: True if a matching pushed entry exists.
    """
    new_senders = set(new_fp["senders"])
    for entry in dispatched_entries.values():
        if not isinstance(entry, dict):
            # Old-format entry: just a timestamp string, nothing to compare.
            continue
        old_fp = entry.get("fp")
        if not old_fp:
            continue

        sender_overlap = len(new_senders & set(old_fp.get("senders", [])))
        same_hour = new_fp["hour"] == old_fp.get("hour")
        if same_hour and sender_overlap >= 1:
            threshold = 0.20
        elif sender_overlap >= 2:
            threshold = 0.35
        else:
            continue

        sim = content_similarity(new_fp["content"], old_fp.get("content", ""))
        if sim > threshold:
            return True
    return False


def is_probably_p0(cluster_msgs):
    """Quickly decide whether a cluster is a P0-level issue.

    Args:
        cluster_msgs: sequence of message rows.

    Returns:
        tuple ``(is_p0, priority_info)``. ``priority_info`` is the dict
        returned by ``compute_final_priority``, or None when the cluster has
        fewer than CLUSTER_MIN_SIZE messages.
    """
    if len(cluster_msgs) < CLUSTER_MIN_SIZE:
        return False, None
    info = compute_final_priority(cluster_msgs)
    return info["priority"] == "P0", info


def _clean_summary(text):
    """Strip user ids, courtesy suffixes and @-mentions from summary text."""
    # Drop phone numbers / user ids: 11 digits starting with 1[3-9], not
    # adjacent to other digits (they may sit directly next to CJK text).
    text = re.sub(r'(?:^|(?<=[^\d]))1[3-9]\d{9}(?=[^\d]|$)', '', text)
    # Drop a trailing courtesy phrase. Both the ASCII and the fullwidth
    # comma (U+FF0C) / exclamation mark (U+FF01) are accepted.
    text = re.sub(
        r'[,\uff0c]?\s*(老师|辛苦|麻烦|帮忙)\s*(看下|看一下|看看|看)[。!\uff01]*$',
        '',
        text,
    )
    # Drop @-mentions.
    text = re.sub(r'[,\uff0c]?\s*@\S+\s*', '', text)
    # Collapse whitespace and trim leading/trailing commas and spaces.
    text = re.sub(r'\s+', ' ', text).strip()
    text = re.sub(r'^[,\uff0c\s]+|[,\uff0c\s]+$', '', text)
    return text


def _pick_best_summary(cluster_msgs):
    """Pick the message that best represents the P0 issue.

    The first message (content longer than 3 chars) that matches any P0
    keyword pattern wins; otherwise the first such message is used.

    Returns:
        str: cleaned summary, at most 150 characters ("" if no usable text).
    """
    from priority_classifier import P0_KEYWORDS

    # Collect every P0 keyword regex. A category value may be one pattern
    # string or an iterable of pattern strings; accept both.
    p0_patterns = []
    for cat_pats in P0_KEYWORDS.values():
        if isinstance(cat_pats, str):
            p0_patterns.append(cat_pats)
        else:
            p0_patterns.extend(cat_pats)
    combined_p0 = re.compile('|'.join(p0_patterns), re.IGNORECASE)

    best = None
    for m in cluster_msgs:
        t = str(m[3]).strip() if m[3] else ""
        if not t or len(t) <= 3:
            continue
        if best is None:
            best = t  # fallback: first meaningful text
        if combined_p0.search(t):
            # A P0 keyword hit takes precedence over the fallback.
            return _clean_summary(t)[:150]
    return _clean_summary(best or "")[:150]


def generate_p0_alert_text(cluster_msgs, priority_info):
    """Build the short P0 alert text (no full document link).

    The reporter and report time are taken from the first message of the
    cluster (elements 1 and 6).

    Args:
        cluster_msgs: non-empty sequence of message rows.
        priority_info: dict; ``reasoning`` and ``deadline`` are read with
            defaults.

    Returns:
        str: newline-joined alert text.
    """
    root_sender = cluster_msgs[0][1]
    root_time = cluster_msgs[0][6]
    root_text = _pick_best_summary(cluster_msgs)
    lines = [
        "🚨 P0 实时告警",
        "",
        f"问题描述: {root_text}",
        f"报告人: {root_sender}",
        f"报告时间: {root_time}",
        f"判定依据: {priority_info.get('reasoning', 'P0')}",
        f"修复时限: {priority_info.get('deadline', '2小时内')}",
    ]
    return "\n".join(lines)


def dispatch_p0_alert(alert_text):
    """Send a P0 alert to the dispatch chat and @-mention P0_NOTIFY_USERS.

    The message is sent as a Feishu ``post`` (rich text) message to
    DISPATCH_CHAT_ID.

    Args:
        alert_text: newline-separated alert text; blank lines are skipped.

    Returns:
        bool: True when the API answers with ``code == 0``; False on an API
        error, a network/HTTP failure or an unparsable response.
    """
    token = get_tenant_token(cred_dir=DISPATCH_CRED_DIR)

    # Build the rich-text body: one paragraph per non-blank line.
    content_parts = []
    for line in alert_text.split("\n"):
        if line.strip():
            content_parts.append([{"tag": "text", "text": line + "\n"}])

    # Append the @-mention line.
    if P0_NOTIFY_USERS:
        at_line = [{"tag": "text", "text": "\n⚠️ 请关注: "}]
        for uid in P0_NOTIFY_USERS:
            at_line.append({"tag": "at", "user_id": uid})
            at_line.append({"tag": "text", "text": " "})
        content_parts.append(at_line)

    # The API expects ``content`` to be a JSON *string*, hence the nested dumps.
    post_content = json.dumps({
        "zh_cn": {
            "title": "🚨 P0 问题实时告警",
            "content": content_parts,
        }
    }, ensure_ascii=False)

    body = json.dumps({
        "receive_id": DISPATCH_CHAT_ID,
        "msg_type": "post",
        "content": post_content,
    }, ensure_ascii=False).encode()

    req = urllib.request.Request(
        "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id",
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            d = json.loads(resp.read())
    except (urllib.error.URLError, OSError, ValueError) as exc:
        # HTTPError/URLError/timeouts are OSError subclasses; ValueError
        # covers a non-JSON body. Report failure instead of killing the run.
        print(f" ⚠️ 实时分发失败: {str(exc)[:100]}")
        return False

    if d.get("code") == 0:
        return True
    print(f" ⚠️ 实时分发失败: {str(d.get('msg', ''))[:100]}")
    return False


def should_clear_state():
    """Return True during the daily clear window (10:00 and 10:01 local time)."""
    now = datetime.now()
    return now.hour == 10 and now.minute <= 1


def main():
    """Scan recent messages, detect new P0 clusters and push alerts.

    Command-line flags: ``--dry-run`` (print instead of push) and
    ``--lookback-minutes N`` (query window, default LOOKBACK_MINUTES).
    """
    parser = argparse.ArgumentParser(description="P0 问题实时检测与分发")
    parser.add_argument("--dry-run", action="store_true", help="仅打印不推送")
    parser.add_argument("--lookback-minutes", type=int, default=LOOKBACK_MINUTES,
                        help=f"查询最近 N 分钟的消息(默认 {LOOKBACK_MINUTES})")
    args = parser.parse_args()

    # Daily reset: clear the de-dup state in the 10:00-10:01 window so the
    # full dispatch run is not affected, then exit to avoid double pushes.
    # NOTE(review): this also runs under --dry-run — confirm that is intended.
    if should_clear_state():
        print("[P0-detect] 10:00 清空去重状态,配合全量分发")
        save_dispatched_state({})
        print("[P0-detect] 全量分发时段,跳过实时检测")
        return

    print(f"[P0-detect] 扫描最近 {args.lookback_minutes} 分钟消息...")

    now = datetime.now()
    lookback_start = (now - timedelta(minutes=args.lookback_minutes)).strftime("%Y-%m-%d %H:%M:%S")
    now_str = now.strftime("%Y-%m-%d %H:%M:%S")

    # Query directly with SQL to avoid query_messages' per-day restriction.
    import pymysql
    conn = pymysql.connect(
        host=MYSQL_HOST, port=MYSQL_PORT,
        user=MYSQL_USER, password=MYSQL_PASS,
        database=MYSQL_DB, charset="utf8mb4",
    )
    try:
        cursor = conn.cursor()
        # '%%' is a literal '%' because the driver %-formats the parameters.
        cursor.execute("""
            SELECT message_id, sender_name, msg_type, content, media_url,
                   quote_message_id,
                   DATE_FORMAT(msg_time, '%%Y-%%m-%%d %%H:%%i:%%s') as msg_time,
                   msg_timestamp
            FROM lark_group_message
            WHERE msg_time >= %s AND msg_time <= %s
            ORDER BY msg_time ASC
        """, (lookback_start, now_str))
        rows = cursor.fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    print(f"[P0-detect] 查询到 {len(rows)} 条消息")
    if len(rows) < 2:
        print("[P0-detect] 消息不足,退出")
        return

    # Clustering.
    sorted_msgs, clusters, cluster_order = sort_threads(rows)
    print(f"[P0-detect] 聚类完成:{len(clusters)} 个簇")

    # Load what has already been pushed.
    state = load_dispatched_state()
    print(f"[P0-detect] 已记录 {len(state)} 个已推送簇")

    # Walk the clusters and pick the P0 ones that were not pushed yet.
    new_p0_count = 0
    for cid in cluster_order:
        cmsgs = clusters[cid]
        is_p0, info = is_probably_p0(cmsgs)
        if not is_p0:
            continue

        sig = cluster_signature(cmsgs)
        if sig in state:
            print(f"[P0-detect] 已推送过(精确匹配),跳过: sig={sig[:8]}...")
            continue

        # Fuzzy, content-based de-duplication.
        fp = cluster_content_fingerprint(cmsgs)
        if is_duplicate_p0(fp, state):
            print(f"[P0-detect] 已推送过(内容匹配),跳过: senders={fp['senders'][:2]}... hour={fp['hour']}")
            continue

        print(f"[P0-detect] 🚨 发现新 P0! sig={sig[:8]}... {len(cmsgs)}条消息")
        alert = generate_p0_alert_text(cmsgs, info)

        if args.dry_run:
            # NOTE(review): a dry run still records the cluster as pushed,
            # which suppresses a later real alert — confirm this is intended.
            print(f"[DRY-RUN] 将发送:\n{alert}")
        elif dispatch_p0_alert(alert):
            print("[P0-detect] ✅ P0 已实时推送")
        else:
            print("[P0-detect] ❌ 推送失败")
            continue

        state[sig] = {"time": datetime.now().isoformat(), "fp": fp}
        # Persist immediately: if a later cluster crashes the run, alerts
        # already sent must not be pushed again on the next cron tick.
        save_dispatched_state(state)
        new_p0_count += 1

    if new_p0_count > 0:
        print(f"[P0-detect] 共推送 {new_p0_count} 个新 P0")

    print("[P0-detect] 完成")
    return


if __name__ == "__main__":
    main()