#!/usr/bin/env python3
"""
Real-time detection and dispatch of WeChat P0 issues.

What it does:
  1. Reads the recent messages of the WeChat user fire-fighting group from MySQL.
  2. Reuses the clustering + priority logic of sync_feishu_feedback.py.
  3. Filters out P0 clusters that were already pushed (de-duplication).
  4. Pushes only newly found P0 clusters to the "小葵小葵" group.

Design:
  - Invoked once per minute by crontab.
  - Queries the last 2 hours of messages to keep clustering quality.
  - De-duplicates by "cluster signature" (sorted message_ids).
  - Clears the de-duplication state every day at 10:00 (staggered from the
    full dispatch).

Usage:
  python3 detect_p0_wechat.py [--dry-run] [--lookback-minutes 120]
"""

import argparse
import hashlib
import json
import os
import sys
import urllib.error
import urllib.request
from datetime import datetime, timedelta

import pymysql

# The shared clustering / priority code lives in a sibling skill directory;
# it must be on sys.path before the project imports below.
SKILL_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "..", "skills", "feishu-feedback-sync", "scripts",
)
sys.path.insert(0, SKILL_DIR)

from sync_feishu_feedback import (
    sort_threads, get_tenant_token,
    DISPATCH_CHAT_ID, DISPATCH_CRED_DIR, P0_NOTIFY_USERS,
    MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASS, MYSQL_DB,
)
from priority_classifier import compute_final_priority

# === WeChat-specific configuration ===
STATE_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "..", "tmp", "p0_dispatched_state_wechat.json",
)
LOOKBACK_MINUTES = 120
CLUSTER_MIN_SIZE = 2


def load_dispatched_state():
    """Load the de-duplication state, dropping entries older than 24 hours.

    Returns:
        dict mapping cluster signature -> ISO-8601 timestamp of the push.
        A missing, unparsable, or non-dict state file yields an empty dict.
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        state = {}
    if not isinstance(state, dict):
        # Valid JSON but not the expected mapping: treat as empty rather
        # than crash on .items().
        state = {}
    # Timestamps are written with datetime.isoformat(), so plain string
    # comparison orders them chronologically.
    cutoff = (datetime.now() - timedelta(hours=24)).isoformat()
    return {
        sig: ts for sig, ts in state.items()
        if isinstance(ts, str) and ts > cutoff
    }


def save_dispatched_state(state):
    """Persist the de-duplication state via a temp file + replace.

    Args:
        state: dict mapping cluster signature -> ISO-8601 timestamp.
    """
    os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
    tmp = STATE_FILE + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)
    # os.replace overwrites an existing destination on every platform.
    os.replace(tmp, STATE_FILE)


def cluster_signature(cluster_msgs):
    """Return an MD5 hex digest identifying a cluster by its message ids.

    Args:
        cluster_msgs: sequence of message tuples; element 0 is the message id.

    NOTE(review): the signature covers the cluster's full id set, so it
    presumably changes whenever a message joins the cluster or ages out of
    the lookback window, which would re-alert the same incident — confirm
    whether that is intended.
    """
    ids = sorted(str(m[0]) for m in cluster_msgs)
    return hashlib.md5(",".join(ids).encode()).hexdigest()


def is_probably_p0(cluster_msgs):
    """Decide whether a cluster is P0.

    Args:
        cluster_msgs: sequence of message tuples forming one cluster.

    Returns:
        (is_p0, info): ``info`` is the dict returned by
        ``compute_final_priority``, or ``None`` when the cluster has fewer
        than ``CLUSTER_MIN_SIZE`` messages and is not evaluated at all.
    """
    if len(cluster_msgs) < CLUSTER_MIN_SIZE:
        return False, None
    info = compute_final_priority(cluster_msgs)
    return info["priority"] == "P0", info


def generate_p0_alert_text(cluster_msgs, priority_info):
    """Build the alert text for one P0 cluster.

    Args:
        cluster_msgs: non-empty sequence of message tuples laid out as
            (message_id, sender_name, msg_type, content, media_url,
            quote_message_id, msg_time, msg_timestamp).
        priority_info: dict; ``reasoning`` and ``deadline`` are read with
            fallbacks.

    Returns:
        The alert as a newline-joined string.
    """
    root_sender = cluster_msgs[0][1]
    root_time = cluster_msgs[0][6]

    # Summary = first message whose stripped content is longer than 3
    # characters, truncated to 100 characters.
    root_text = ""
    for m in cluster_msgs:
        text = str(m[3]).strip() if m[3] else ""
        if len(text) > 3:
            root_text = text[:100]
            break

    # Unique senders, in order of first appearance.
    senders = list(dict.fromkeys(m[1] for m in cluster_msgs))
    return "\n".join([
        "🚨 微信 P0 实时告警",
        "",
        f"**报告人:** {root_sender}",
        f"**时间:** {root_time}",
        f"**涉及人员:** {'、'.join(senders[:5])}" + ("等" if len(senders) > 5 else ""),
        f"**消息数:** {len(cluster_msgs)} 条",
        "",
        f"**摘要:** {root_text}",
        "",
        f"**判定依据:** {priority_info.get('reasoning', 'P0')}",
        f"**修复时限:** {priority_info.get('deadline', '2小时内')}",
    ])


def dispatch_p0_alert(alert_text):
    """Send the alert to the dispatch chat as a Feishu "post" message.

    Args:
        alert_text: text produced by ``generate_p0_alert_text``.

    Returns:
        True when the API answers with ``code == 0``; False on an API error,
        a network/HTTP failure, or an unparsable response. Errors raised by
        ``get_tenant_token`` are not caught here.
    """
    token = get_tenant_token(cred_dir=DISPATCH_CRED_DIR)

    # One paragraph per non-blank line of the alert.
    content_parts = [
        [{"tag": "text", "text": line + "\n"}]
        for line in alert_text.split("\n")
        if line.strip()
    ]
    if P0_NOTIFY_USERS:
        at_line = [{"tag": "text", "text": "\n⚠️ 请关注: "}]
        for uid in P0_NOTIFY_USERS:
            at_line.append({"tag": "at", "user_id": uid})
            at_line.append({"tag": "text", "text": " "})
        content_parts.append(at_line)

    # "content" is itself a JSON-encoded string nested inside the JSON body.
    post_content = json.dumps({
        "zh_cn": {
            "title": "🚨 微信 P0 问题实时告警",
            "content": content_parts,
        }
    }, ensure_ascii=False)
    body = json.dumps({
        "receive_id": DISPATCH_CHAT_ID,
        "msg_type": "post",
        "content": post_content,
    }, ensure_ascii=False).encode()
    req = urllib.request.Request(
        "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id",
        data=body,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            d = json.loads(resp.read())
    except (OSError, ValueError) as exc:
        # OSError covers URLError/HTTPError/timeouts; ValueError covers a
        # non-JSON body. Report failure instead of aborting the whole run.
        print(f"  ⚠️ 实时分发失败: {str(exc)[:100]}")
        return False

    if isinstance(d, dict) and d.get("code") == 0:
        return True
    msg = d.get("msg") if isinstance(d, dict) else d
    print(f"  ⚠️ 实时分发失败: {str(msg or '')[:100]}")
    return False


def should_clear_state():
    """Return True during 10:00-10:01 local time (the daily reset window)."""
    now = datetime.now()
    return now.hour == 10 and now.minute <= 1


def _fetch_recent_messages(lookback_minutes):
    """Read messages from the last ``lookback_minutes`` minutes out of MySQL.

    Returns:
        List of tuples (message_id, sender_name, msg_type, content,
        media_url, quote_message_id, msg_time, msg_timestamp), ordered by
        msg_time ascending, with NULL columns replaced by defaults.
    """
    time_fmt = "%Y-%m-%d %H:%M:%S"
    # One clock reading for both bounds of the window.
    now = datetime.now()
    window_start = (now - timedelta(minutes=lookback_minutes)).strftime(time_fmt)
    window_end = now.strftime(time_fmt)

    conn = pymysql.connect(
        host=MYSQL_HOST, port=MYSQL_PORT,
        user=MYSQL_USER, password=MYSQL_PASS,
        database=MYSQL_DB, charset="utf8mb4",
    )
    try:
        with conn.cursor() as cursor:
            # "%%" escapes a literal "%" because query parameters are passed.
            cursor.execute("""
                SELECT svr_msg_id, sender_name, msg_type, content, media_url,
                       refer_msg_svrid,
                       DATE_FORMAT(msg_time, '%%Y-%%m-%%d %%H:%%i:%%s') as msg_time,
                       msg_timestamp
                FROM wechat_group_message
                WHERE msg_time >= %s AND msg_time <= %s
                ORDER BY msg_time ASC
            """, (window_start, window_end))
            raw_rows = cursor.fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()

    # Map to the unified tuple layout (message_id, sender_name, msg_type,
    # content, media_url, quote_message_id, msg_time, msg_timestamp).
    rows = []
    for svr_id, sname, mtype, content, murl, ref_id, mtime, mts in raw_rows:
        rows.append((
            str(svr_id) if svr_id else "",
            sname or "",
            mtype or "text",
            content or "",
            murl or "",
            str(ref_id) if ref_id else "",
            mtime or "",
            int(mts) if mts else 0,
        ))
    return rows


def main():
    """Entry point: scan recent messages and push newly found P0 clusters.

    With ``--dry-run`` the alerts are only printed; nothing is sent and the
    de-duplication state is not modified, so a dry run cannot suppress a
    later real alert.
    """
    parser = argparse.ArgumentParser(description="微信 P0 问题实时检测与分发")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--lookback-minutes", type=int, default=LOOKBACK_MINUTES)
    args = parser.parse_args()

    if should_clear_state():
        print("[P0-wechat] 10:00 清空去重状态")
        save_dispatched_state({})
        print("[P0-wechat] 全量分发时段,跳过实时检测")
        return

    print(f"[P0-wechat] 扫描最近 {args.lookback_minutes} 分钟微信消息...")
    rows = _fetch_recent_messages(args.lookback_minutes)
    print(f"[P0-wechat] 查询到 {len(rows)} 条微信消息")

    # Fewer messages than the minimum cluster size cannot form a P0 cluster.
    if len(rows) < CLUSTER_MIN_SIZE:
        print("[P0-wechat] 消息不足,退出")
        return

    sorted_msgs, clusters, cluster_order = sort_threads(rows)
    print(f"[P0-wechat] 聚类完成:{len(clusters)} 个簇")

    state = load_dispatched_state()
    print(f"[P0-wechat] 已记录 {len(state)} 个已推送簇签名")

    new_p0_count = 0
    for cid in cluster_order:
        cmsgs = clusters[cid]
        is_p0, info = is_probably_p0(cmsgs)
        if not is_p0:
            continue

        sig = cluster_signature(cmsgs)
        if sig in state:
            print(f"[P0-wechat] 已推送过,跳过: sig={sig[:8]}...")
            continue

        print(f"[P0-wechat] 🚨 发现新 P0! sig={sig[:8]}... {len(cmsgs)}条消息")
        alert = generate_p0_alert_text(cmsgs, info)

        if args.dry_run:
            # Print only: recording the signature here would make the next
            # real run skip this cluster without ever alerting.
            print(f"[DRY-RUN] 将发送:\n{alert}")
            new_p0_count += 1
            continue

        if dispatch_p0_alert(alert):
            print("[P0-wechat] ✅ P0 已实时推送")
            state[sig] = datetime.now().isoformat()
            # Persist right away so a failure on a later cluster cannot
            # cause this alert to be sent again on the next run.
            save_dispatched_state(state)
            new_p0_count += 1
        else:
            print("[P0-wechat] ❌ 推送失败")

    if new_p0_count > 0:
        print(f"[P0-wechat] 共推送 {new_p0_count} 个新 P0")
    print("[P0-wechat] 完成")


if __name__ == "__main__":
    main()