#!/usr/bin/env python3
"""
Real-time detection and dispatch of WeChat P0 issues.

What it does:
1. Reads recent messages of the WeChat user firefighting group from MySQL.
2. Reuses the clustering + priority logic of sync_feishu_feedback.py.
3. Filters out P0 clusters that were already pushed (de-duplication).
4. Pushes only newly found P0 clusters to the "小葵小葵" chat.

Design:
- Invoked once per minute by crontab.
- Queries the last 2 hours of messages so clustering has enough context.
- De-duplicates with a "cluster signature" (hash of the sorted message ids)
  plus a content-based fingerprint.
- Clears the de-duplication state every day at 10:00 (staggered from the
  full dispatch run).

Usage:
    python3 detect_p0_wechat.py [--dry-run] [--lookback-minutes 120]
"""

import argparse
import hashlib
import json
import os
import sys
import urllib.error
import urllib.request
from datetime import datetime, timedelta

import pymysql

SKILL_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "..", "skills", "feishu-feedback-sync", "scripts",
)
# The shared helpers live in a sibling skill directory, so it has to be put on
# sys.path before the project-local imports below.
sys.path.insert(0, SKILL_DIR)

from sync_feishu_feedback import (  # noqa: E402
    sort_threads, get_tenant_token, content_similarity,
    DISPATCH_CHAT_ID, DISPATCH_CRED_DIR, P0_NOTIFY_USERS,
    MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASS, MYSQL_DB,
)
from priority_classifier import compute_final_priority  # noqa: E402

# === WeChat-specific configuration ===
STATE_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "..", "tmp", "p0_dispatched_state_wechat.json",
)
LOOKBACK_MINUTES = 120
CLUSTER_MIN_SIZE = 2

# Message tuples used throughout this module have the layout:
# (message_id, sender_name, msg_type, content, media_url,
#  quote_message_id, msg_time, msg_timestamp)


def load_dispatched_state():
    """Load the de-duplication state, dropping entries older than 24 hours.

    Returns:
        dict mapping cluster signature -> entry. An entry is either the new
        format ``{"time": <iso string>, "fp": <fingerprint dict>}`` or the
        legacy format (a bare ISO time string). A missing, unreadable or
        malformed state file yields an empty dict.
    """
    try:
        # Explicit UTF-8: the file is written with ensure_ascii=False and
        # contains non-ASCII sender names / message content.
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (FileNotFoundError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        state = {}
    if not isinstance(state, dict):
        state = {}

    cutoff = (datetime.now() - timedelta(hours=24)).isoformat()
    cleaned = {}
    for sig, entry in state.items():
        # Support both formats: new entries are dicts, legacy ones are plain
        # time strings. Anything else is treated as corrupt and dropped.
        if isinstance(entry, str):
            ts = entry
        elif isinstance(entry, dict):
            ts = entry.get("time", "")
        else:
            continue
        # ISO-8601 strings produced by datetime.isoformat() sort
        # lexicographically in chronological order.
        if isinstance(ts, str) and ts > cutoff:
            cleaned[sig] = entry
    return cleaned


def save_dispatched_state(state):
    """Persist the de-duplication state to STATE_FILE.

    The data is written to a temporary sibling file first and then moved over
    the target, so a concurrent reader never sees a half-written file.
    """
    os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
    tmp = STATE_FILE + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)
    # os.replace overwrites the destination on every platform.
    os.replace(tmp, STATE_FILE)


def cluster_signature(cluster_msgs):
    """Return an MD5 hex digest of the cluster's sorted message ids.

    Used as an exact-match de-duplication key, not for security.
    """
    ids = sorted(str(m[0]) for m in cluster_msgs)
    return hashlib.md5(",".join(ids).encode()).hexdigest()


def cluster_content_fingerprint(cluster_msgs):
    """Build a content-based fingerprint for de-duplication across scans.

    Unlike cluster_signature, this does not depend on the exact set of
    message ids in the cluster.

    Returns:
        dict with keys ``content`` (first 5 meaningful messages joined by
        " | "), ``senders`` (sorted unique sender names), ``hour`` (first 13
        characters of the first message time, i.e. "YYYY-MM-DD HH", or
        "unknown") and ``msg_count``.
    """
    # Keep only messages longer than 8 characters, each truncated to 300;
    # per the original author this skips pure image/file/emoji messages.
    all_contents = []
    for m in cluster_msgs:
        c = str(m[3]).strip() if m[3] else ""
        if len(c) > 8:
            all_contents.append(c[:300])

    # Aggregate only the first 5 so the core problem description stays stable.
    aggregated = " | ".join(all_contents[:5])

    # Sorted so the sender list is order-independent.
    senders = sorted({m[1] for m in cluster_msgs if m[1]})

    # Hour-granularity time window taken from the first timestamped message.
    times = [m[6] for m in cluster_msgs if m[6]]
    hour = times[0][:13] if times else "unknown"

    return {
        "content": aggregated,
        "senders": senders,
        "hour": hour,
        "msg_count": len(cluster_msgs),
    }


def is_duplicate_p0(new_fp, dispatched_entries):
    """Decide whether a new P0 duplicates an already dispatched one.

    Args:
        new_fp: fingerprint from cluster_content_fingerprint.
        dispatched_entries: ``{sig: {"time": str, "fp": dict}}``; legacy
            entries (bare time strings) carry no fingerprint and are skipped.

    Returns:
        True when any stored fingerprint matches one of:
        1. same hour, at least one shared sender, similarity > 0.20
        2. at least two shared senders, similarity > 0.35 (cross-hour case)
    """
    new_senders = set(new_fp["senders"])
    for entry in dispatched_entries.values():
        # Legacy-format entries are plain strings and have no fingerprint;
        # calling .get() on them would raise AttributeError.
        if not isinstance(entry, dict):
            continue
        old_fp = entry.get("fp")
        if not old_fp:
            continue

        sender_overlap = len(new_senders & set(old_fp.get("senders", [])))
        if sender_overlap < 1:
            # Both rules require at least one shared sender.
            continue

        same_hour = new_fp["hour"] == old_fp.get("hour")
        sim = content_similarity(new_fp["content"], old_fp.get("content", ""))

        # Rule 1: the aggregated content is stable, so a loose threshold is
        # enough to tell clusters apart within the same hour.
        if same_hour and sim > 0.20:
            return True
        # Rule 2: stricter threshold when the hour does not have to match.
        if sender_overlap >= 2 and sim > 0.35:
            return True
    return False


def is_probably_p0(cluster_msgs):
    """Classify a cluster.

    Returns:
        ``(is_p0, info)``. Clusters smaller than CLUSTER_MIN_SIZE return
        ``(False, None)`` without being classified; otherwise ``info`` is the
        result of compute_final_priority.
    """
    if len(cluster_msgs) < CLUSTER_MIN_SIZE:
        return False, None
    info = compute_final_priority(cluster_msgs)
    return info["priority"] == "P0", info


def generate_p0_alert_text(cluster_msgs, priority_info):
    """Render the alert text for a P0 cluster.

    The first message supplies the reporter and time; the summary is the
    first message content longer than 3 characters, truncated to 100.
    """
    root_sender = cluster_msgs[0][1]
    root_time = cluster_msgs[0][6]

    root_text = ""
    for m in cluster_msgs:
        t = str(m[3]).strip() if m[3] else ""
        if len(t) > 3:
            root_text = t[:100]
            break

    # Unique senders in first-seen order; blank names are dropped so the
    # joined list never contains empty items (and join never sees None).
    senders = list(dict.fromkeys(m[1] for m in cluster_msgs if m[1]))
    involved = "、".join(senders[:5]) + ("等" if len(senders) > 5 else "")

    return "\n".join([
        "🚨 微信 P0 实时告警",
        "",
        f"**报告人:** {root_sender}",
        f"**时间:** {root_time}",
        f"**涉及人员:** {involved}",
        f"**消息数:** {len(cluster_msgs)} 条",
        "",
        f"**摘要:** {root_text}",
        "",
        f"**判定依据:** {priority_info.get('reasoning', 'P0')}",
        f"**修复时限:** {priority_info.get('deadline', '2小时内')}",
    ])


def dispatch_p0_alert(alert_text):
    """Send the alert to DISPATCH_CHAT_ID as a Feishu "post" message.

    Returns:
        True when the API responds with ``code == 0``; False on an API error,
        a network/HTTP failure or an unparsable response (the reason is
        printed). Failures are reported rather than raised so one bad request
        does not abort the rest of the scan.
    """
    token = get_tenant_token(cred_dir=DISPATCH_CRED_DIR)

    # One paragraph per non-blank line of the alert text.
    content_parts = [
        [{"tag": "text", "text": line + "\n"}]
        for line in alert_text.split("\n")
        if line.strip()
    ]

    if P0_NOTIFY_USERS:
        # Final paragraph that @-mentions every configured user.
        at_line = [{"tag": "text", "text": "\n⚠️ 请关注: "}]
        for uid in P0_NOTIFY_USERS:
            at_line.append({"tag": "at", "user_id": uid})
            at_line.append({"tag": "text", "text": " "})
        content_parts.append(at_line)

    # The "content" field is itself a JSON-encoded string, hence the nested
    # json.dumps.
    post_content = json.dumps({
        "zh_cn": {
            "title": "🚨 微信 P0 问题实时告警",
            "content": content_parts,
        }
    }, ensure_ascii=False)

    body = json.dumps({
        "receive_id": DISPATCH_CHAT_ID,
        "msg_type": "post",
        "content": post_content,
    }, ensure_ascii=False).encode()

    req = urllib.request.Request(
        "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id",
        data=body,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="POST",
    )

    try:
        # The context manager closes the HTTP response on every path.
        with urllib.request.urlopen(req, timeout=10) as resp:
            d = json.loads(resp.read())
    except (OSError, ValueError) as exc:
        # OSError covers URLError/HTTPError and socket timeouts; ValueError
        # covers a response body that is not valid JSON.
        print(f"  ⚠️ 实时分发失败: {str(exc)[:100]}")
        return False

    if d.get("code") == 0:
        return True
    print(f"  ⚠️ 实时分发失败: {str(d.get('msg', ''))[:100]}")
    return False


def should_clear_state():
    """Return True during 10:00-10:01 local time (the daily reset window)."""
    now = datetime.now()
    return now.hour == 10 and now.minute <= 1


def _fetch_recent_messages(lookback_minutes):
    """Query messages from the last ``lookback_minutes`` minutes.

    Returns:
        list of normalized tuples (message_id, sender_name, msg_type, content,
        media_url, quote_message_id, msg_time, msg_timestamp), ordered by
        msg_time ascending. NULL columns are replaced with defaults.
    """
    now = datetime.now()
    lookback_start = (now - timedelta(minutes=lookback_minutes)).strftime("%Y-%m-%d %H:%M:%S")
    now_str = now.strftime("%Y-%m-%d %H:%M:%S")

    conn = pymysql.connect(
        host=MYSQL_HOST, port=MYSQL_PORT,
        user=MYSQL_USER, password=MYSQL_PASS,
        database=MYSQL_DB, charset="utf8mb4",
    )
    try:
        with conn.cursor() as cursor:
            # '%%' is a literal '%' because the statement is parameterized.
            cursor.execute("""
                SELECT svr_msg_id, sender_name, msg_type, content, media_url,
                       refer_msg_svrid,
                       DATE_FORMAT(msg_time, '%%Y-%%m-%%d %%H:%%i:%%s') as msg_time,
                       msg_timestamp
                FROM wechat_group_message
                WHERE msg_time >= %s AND msg_time <= %s
                ORDER BY msg_time ASC
            """, (lookback_start, now_str))
            raw_rows = cursor.fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    rows = []
    for svr_id, sname, mtype, content, murl, ref_id, mtime, mts in raw_rows:
        rows.append((
            str(svr_id) if svr_id else "",
            sname or "",
            mtype or "text",
            content or "",
            murl or "",
            str(ref_id) if ref_id else "",
            mtime or "",
            int(mts) if mts else 0,
        ))
    return rows


def main():
    """CLI entry point: scan recent messages and push newly found P0 clusters.

    With ``--dry-run`` the alerts are only printed and the persisted
    de-duplication state is left untouched, so a preview can never suppress
    a later real alert.
    """
    parser = argparse.ArgumentParser(description="微信 P0 问题实时检测与分发")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--lookback-minutes", type=int, default=LOOKBACK_MINUTES)
    args = parser.parse_args()

    if should_clear_state():
        print("[P0-wechat] 10:00 清空去重状态")
        if not args.dry_run:
            save_dispatched_state({})
        print("[P0-wechat] 全量分发时段,跳过实时检测")
        return

    print(f"[P0-wechat] 扫描最近 {args.lookback_minutes} 分钟微信消息...")
    rows = _fetch_recent_messages(args.lookback_minutes)

    print(f"[P0-wechat] 查询到 {len(rows)} 条微信消息")
    if len(rows) < 2:
        print("[P0-wechat] 消息不足,退出")
        return

    _sorted_msgs, clusters, cluster_order = sort_threads(rows)
    print(f"[P0-wechat] 聚类完成:{len(clusters)} 个簇")

    state = load_dispatched_state()
    print(f"[P0-wechat] 已记录 {len(state)} 个已推送簇")

    new_p0_count = 0
    try:
        for cid in cluster_order:
            cmsgs = clusters[cid]
            is_p0, info = is_probably_p0(cmsgs)
            if not is_p0:
                continue

            sig = cluster_signature(cmsgs)
            if sig in state:
                print(f"[P0-wechat] 已推送过(精确匹配),跳过: sig={sig[:8]}...")
                continue

            # Content-based de-duplication (also matches clusters recorded
            # earlier in this same scan).
            fp = cluster_content_fingerprint(cmsgs)
            if is_duplicate_p0(fp, state):
                print(f"[P0-wechat] 已推送过(内容匹配),跳过: senders={fp['senders'][:2]}... hour={fp['hour']}")
                continue

            print(f"[P0-wechat] 🚨 发现新 P0! sig={sig[:8]}... {len(cmsgs)}条消息")
            alert = generate_p0_alert_text(cmsgs, info)

            if args.dry_run:
                print(f"[DRY-RUN] 将发送:\n{alert}")
            elif dispatch_p0_alert(alert):
                print("[P0-wechat] ✅ P0 已实时推送")
            else:
                print("[P0-wechat] ❌ 推送失败")
                continue

            state[sig] = {"time": datetime.now().isoformat(), "fp": fp}
            new_p0_count += 1
    finally:
        # Persist even if a later cluster raised, otherwise alerts already
        # sent in this run would be sent again on the next cron tick.
        if new_p0_count > 0 and not args.dry_run:
            save_dispatched_state(state)

    if new_p0_count > 0:
        print(f"[P0-wechat] 共推送 {new_p0_count} 个新 P0")
    print("[P0-wechat] 完成")


if __name__ == "__main__":
    main()