ai_member_xiaokui/scripts/detect_p0_realtime.py
2026-05-28 08:10:01 +08:00

323 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
P0 问题实时检测与分发
功能:
1. 从 MySQL 读取最近一段时间的飞书群消息
2. 复用 sync_feishu_feedback.py 的聚类 + 优先级判定逻辑
3. 过滤已推送过的 P0 簇(去重)
4. 仅推送新增 P0 到「小葵小葵」群
设计:
- 每分钟由 crontab 调用一次
- 查询最近 2 小时的消息,确保聚类质量
- 用「簇签名」(排序后 message_ids 的 MD5)做精确去重,再用内容指纹做语义去重
- 每天 10:00 清空去重状态(与全量分发错开)
用法:
python3 detect_p0_realtime.py [--dry-run] [--lookback-minutes 120]
"""
import sys, os, json, urllib.request, argparse, hashlib
from datetime import datetime, timedelta
from pathlib import Path
# 将 sync_feishu_feedback.py 所在目录加入 sys.path,以便 import
SKILL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "skills", "feishu-feedback-sync", "scripts")
sys.path.insert(0, SKILL_DIR)
from sync_feishu_feedback import (
get_db_connection, query_messages, sort_threads, get_tenant_token, content_similarity,
DISPATCH_CHAT_ID, DISPATCH_CRED_DIR, P0_NOTIFY_USERS,
MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASS, MYSQL_DB,
)
from priority_classifier import compute_final_priority
# === Configuration ===
# JSON file remembering which P0 clusters were already pushed:
# <this script's directory>/../tmp/p0_dispatched_state.json
STATE_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "..",
    "tmp",
    "p0_dispatched_state.json",
)
# Default message window in minutes (2 hours).
LOOKBACK_MINUTES = 120
# Clusters with fewer messages than this are never classified as P0.
CLUSTER_MIN_SIZE = 2
def load_dispatched_state(path=None):
    """Load the record of already-dispatched P0 clusters.

    The state file maps a cluster signature to either a bare ISO-8601
    timestamp string (legacy format) or a dict ``{"time": str, "fp": dict}``
    (current format). Entries older than 24 hours, or without a usable
    string timestamp, are left out of the returned mapping; the file on
    disk is not modified.

    Args:
        path: State file to read; defaults to ``STATE_FILE``.

    Returns:
        Dict of the entries still inside the 24-hour window. A missing,
        undecodable or malformed file yields an empty dict.
    """
    if path is None:
        path = STATE_FILE
    try:
        # The file is written as UTF-8 (ensure_ascii=False), so decode it as
        # UTF-8 explicitly rather than relying on the cron job's locale.
        with open(path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
        state = {}
    if not isinstance(state, dict):
        # e.g. a hand-edited file whose top level is a list
        state = {}

    cutoff = (datetime.now() - timedelta(hours=24)).isoformat()
    cleaned = {}
    for sig, entry in state.items():
        if isinstance(entry, str):
            ts = entry
        elif isinstance(entry, dict):
            ts = entry.get("time", "")
        else:
            continue
        # Timestamps written by datetime.isoformat() in the same naive local
        # format compare chronologically as plain strings.
        if isinstance(ts, str) and ts > cutoff:
            cleaned[sig] = entry
    return cleaned
def save_dispatched_state(state, path=None):
    """Persist the dispatched-P0 state as pretty-printed UTF-8 JSON.

    The JSON is first written to ``<path>.tmp`` and then moved over the
    target with ``os.replace``. Unlike ``os.rename``, this also overwrites an
    existing target on Windows. On POSIX the swap is atomic, so readers never
    see a half-written file.

    Args:
        state: Mapping of cluster signature -> entry, as returned by
            ``load_dispatched_state``.
        path: Destination file; defaults to ``STATE_FILE``. Its parent
            directory is created when missing.
    """
    if path is None:
        path = STATE_FILE
    parent = os.path.dirname(path)
    if parent:  # a bare filename has no directory to create
        os.makedirs(parent, exist_ok=True)
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)
    os.replace(tmp, path)
def cluster_signature(cluster_msgs):
    """Return an exact-membership signature for a cluster.

    The signature is the hex MD5 digest of the cluster's message IDs
    (index 0 of each row), converted to strings, sorted and comma-joined.
    Row order inside the cluster is irrelevant. Adding or removing any
    message yields a different signature, so fuzzy matching has to be done
    separately (``main`` uses the content fingerprint for that).
    """
    member_ids = [str(row[0]) for row in cluster_msgs]
    member_ids.sort()
    digest = hashlib.md5(",".join(member_ids).encode())
    return digest.hexdigest()
def cluster_content_fingerprint(cluster_msgs):
    """Summarise a cluster's content for de-duplication across scans.

    Unlike ``cluster_signature``, the result does not depend on the exact
    set of message IDs. Row layout follows the query in ``main``: index 1 is
    the sender name, index 3 the content and index 6 the formatted time.

    Returns a dict with:
        content:   the first 5 stripped texts longer than 8 characters, each
                   cut to 300 characters, joined by " | ".
        senders:   sorted, de-duplicated non-empty sender names.
        hour:      first 13 characters of the first non-empty timestamp
                   ("YYYY-MM-DD HH" for the ``main`` query), or "unknown".
        msg_count: number of rows in the cluster.
    """
    snippets = []
    for row in cluster_msgs:
        text = str(row[3]).strip() if row[3] else ""
        if len(text) > 8:
            snippets.append(text[:300])

    stamps = [row[6] for row in cluster_msgs if row[6]]
    hour_bucket = stamps[0][:13] if stamps else "unknown"

    return {
        "content": " | ".join(snippets[:5]),
        "senders": sorted({row[1] for row in cluster_msgs if row[1]}),
        "hour": hour_bucket,
        "msg_count": len(cluster_msgs),
    }
def is_duplicate_p0(new_fp, dispatched_entries, *, similarity=None):
    """Return True if ``new_fp`` looks like a P0 that was already dispatched.

    The candidate is compared with every stored fingerprint. It counts as a
    duplicate when either:
      * it has the same hour bucket, shares at least one sender, and the
        content similarity is above 0.20; or
      * it shares at least two senders (any hour) and the similarity is
        above 0.35.

    Args:
        new_fp: Fingerprint dict with "content", "senders" and "hour" keys,
            as built by cluster_content_fingerprint.
        dispatched_entries: State mapping ``{sig: {"time": str, "fp": dict}}``.
            Legacy entries stored as a bare timestamp string, and entries
            without a fingerprint, cannot be compared and are skipped.
        similarity: Optional ``(str, str) -> float`` scorer; defaults to
            ``content_similarity``.
    """
    sim_fn = content_similarity if similarity is None else similarity
    new_senders = set(new_fp.get("senders") or ())
    new_hour = new_fp.get("hour")
    new_content = new_fp.get("content", "")

    for entry in dispatched_entries.values():
        # Legacy-format entries are plain timestamp strings with no "fp".
        if not isinstance(entry, dict):
            continue
        old_fp = entry.get("fp")
        if not old_fp:
            continue
        overlap = len(new_senders & set(old_fp.get("senders") or ()))
        same_hour = new_hour == old_fp.get("hour")
        # Neither rule can fire: skip the (potentially costly) similarity.
        if overlap == 0 or (overlap == 1 and not same_hour):
            continue
        sim = sim_fn(new_content, old_fp.get("content", ""))
        if same_hour and sim > 0.20:  # overlap >= 1 is guaranteed here
            return True
        if overlap >= 2 and sim > 0.35:
            return True
    return False
def is_probably_p0(cluster_msgs):
    """Classify a cluster and report whether it is P0.

    Returns an ``(is_p0, priority_info)`` tuple. Clusters with fewer than
    ``CLUSTER_MIN_SIZE`` messages are not classified and give
    ``(False, None)``. Otherwise ``priority_info`` is the result of
    ``compute_final_priority``, and ``is_p0`` is whether its "priority"
    entry equals "P0".
    """
    if len(cluster_msgs) >= CLUSTER_MIN_SIZE:
        priority_info = compute_final_priority(cluster_msgs)
        return priority_info["priority"] == "P0", priority_info
    # Too few messages to be a meaningful cluster; skip classification.
    return False, None
def generate_p0_alert_text(cluster_msgs, priority_info):
    """Build the short plain-text body of a P0 alert (no document links).

    Args:
        cluster_msgs: Message rows of one cluster (index 1 sender, index 3
            content, index 6 formatted time); the first row is the report.
        priority_info: Classifier result; its optional "reasoning" and
            "deadline" keys are shown, falling back to "P0" and "2小时内".
            ``None`` is treated as an empty dict.

    Returns:
        The alert as newline-separated lines.
    """
    info = priority_info or {}
    first = cluster_msgs[0] if cluster_msgs else None
    root_sender = first[1] if first is not None else ""
    root_time = first[6] if first is not None else ""

    # Summary: first message with more than 3 non-blank characters, cut to 100.
    root_text = ""
    for m in cluster_msgs:
        t = str(m[3]).strip() if m[3] else ""
        if len(t) > 3:
            root_text = t[:100]
            break

    # Distinct senders in order of first appearance. Empty/None names are
    # dropped so str.join only sees strings. At most 5 names are listed,
    # with "等" appended when there are more.
    senders = list(dict.fromkeys(str(m[1]) for m in cluster_msgs if m[1]))
    involved = "、".join(senders[:5]) + ("等" if len(senders) > 5 else "")

    lines = [
        "🚨 P0 实时告警",
        "",
        f"**报告人:** {root_sender}",
        f"**时间:** {root_time}",
        f"**涉及人员:** {involved}",
        f"**消息数:** {len(cluster_msgs)}",
        "",
        f"**摘要:** {root_text}",
        "",
        f"**判定依据:** {info.get('reasoning', 'P0')}",
        f"**修复时限:** {info.get('deadline', '2小时内')}",
    ]
    return "\n".join(lines)
def dispatch_p0_alert(alert_text):
    """Post a P0 alert to the 「小葵小葵」 group and @-mention the notify list.

    The alert is sent as a Feishu ``post`` (rich-text) message to
    ``DISPATCH_CHAT_ID``. Each non-blank line of ``alert_text`` becomes its
    own paragraph. If ``P0_NOTIFY_USERS`` is non-empty, a final paragraph
    @-mentions each user in it.

    Returns:
        True if the API replied with ``code == 0``. False on an API error,
        an HTTP/network failure of the send request, or a non-JSON reply.
        Send failures are printed instead of raised, so one failed push
        does not abort the caller's loop.
    """
    token = get_tenant_token(cred_dir=DISPATCH_CRED_DIR)

    content_parts = [
        [{"tag": "text", "text": line + "\n"}]
        for line in alert_text.split("\n")
        if line.strip()
    ]
    if P0_NOTIFY_USERS:
        at_line = [{"tag": "text", "text": "\n⚠️ 请关注: "}]
        for uid in P0_NOTIFY_USERS:
            at_line.append({"tag": "at", "user_id": uid})
            at_line.append({"tag": "text", "text": " "})
        content_parts.append(at_line)

    post_content = json.dumps(
        {"zh_cn": {"title": "🚨 P0 问题实时告警", "content": content_parts}},
        ensure_ascii=False,
    )
    body = json.dumps(
        {"receive_id": DISPATCH_CHAT_ID, "msg_type": "post", "content": post_content},
        ensure_ascii=False,
    ).encode()
    req = urllib.request.Request(
        "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id",
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    try:
        # `with` guarantees the HTTP response is closed.
        with urllib.request.urlopen(req, timeout=10) as resp:
            d = json.loads(resp.read())
    except (OSError, ValueError) as exc:
        # OSError covers URLError/HTTPError and socket timeouts; ValueError
        # covers a response body that is not valid JSON.
        print(f" ⚠️ 实时分发失败: {str(exc)[:100]}")
        return False
    if d.get("code") == 0:
        return True
    print(f" ⚠️ 实时分发失败: {str(d.get('msg', ''))[:100]}")
    return False
def should_clear_state(now=None):
    """Return True inside the daily reset window 10:00:00-10:01:59.

    With a per-minute crontab this window covers the 10:00 and 10:01 runs.
    Those runs clear the dedup state to line up with the daily full dispatch.

    Args:
        now: Time to test; defaults to the current local time. Injectable so
            the window can be checked without waiting for 10:00.
    """
    if now is None:
        now = datetime.now()
    return now.hour == 10 and now.minute <= 1
def _fetch_recent_rows(start, end):
    """Return lark_group_message rows with start <= msg_time <= end, oldest first."""
    import pymysql

    conn = pymysql.connect(
        host=MYSQL_HOST, port=MYSQL_PORT,
        user=MYSQL_USER, password=MYSQL_PASS,
        database=MYSQL_DB, charset="utf8mb4",
    )
    try:
        cursor = conn.cursor()
        # '%' inside DATE_FORMAT is doubled because pymysql applies %-style
        # parameter substitution to the statement when args are passed.
        cursor.execute("""
            SELECT message_id, sender_name, msg_type, content, media_url, quote_message_id,
                   DATE_FORMAT(msg_time, '%%Y-%%m-%%d %%H:%%i:%%s') as msg_time, msg_timestamp
            FROM lark_group_message
            WHERE msg_time >= %s AND msg_time <= %s
            ORDER BY msg_time ASC
        """, (start, end))
        return cursor.fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()


def main():
    """Run one real-time P0 scan (intended to be invoked by crontab every minute).

    Steps:
      1. Inside the 10:00-10:01 window, clear the dedup state and exit.
      2. Query messages from the last ``--lookback-minutes`` minutes.
      3. Cluster them with ``sort_threads`` and classify each cluster.
      4. Skip P0 clusters already dispatched, either by exact signature or by
         content fingerprint.
      5. Push each new P0 alert (or only print it with ``--dry-run``).

    State is saved right after every successful push, so a failure later in
    the run cannot cause an already-sent alert to be re-sent. A dry run
    never writes state.
    """
    parser = argparse.ArgumentParser(description="P0 问题实时检测与分发")
    parser.add_argument("--dry-run", action="store_true", help="仅打印不推送")
    parser.add_argument("--lookback-minutes", type=int, default=LOOKBACK_MINUTES,
                        help=f"查询最近 N 分钟的消息(默认 {LOOKBACK_MINUTES})")
    args = parser.parse_args()

    # Daily reset: empty the dedup state so the 10:00 full dispatch starts
    # clean. That flow handles this window, so exit without scanning to
    # avoid duplicate alerts.
    if should_clear_state():
        print("[P0-detect] 10:00 清空去重状态,配合全量分发")
        save_dispatched_state({})
        print("[P0-detect] 全量分发时段,跳过实时检测")
        return

    print(f"[P0-detect] 扫描最近 {args.lookback_minutes} 分钟消息...")
    # Take one timestamp so both window bounds are derived from the same instant.
    now = datetime.now()
    lookback_start = (now - timedelta(minutes=args.lookback_minutes)).strftime("%Y-%m-%d %H:%M:%S")
    now_str = now.strftime("%Y-%m-%d %H:%M:%S")

    # Query with SQL directly, bypassing query_messages' per-day granularity.
    rows = _fetch_recent_rows(lookback_start, now_str)
    print(f"[P0-detect] 查询到 {len(rows)} 条消息")
    # With fewer rows than the minimum cluster size, no cluster can be P0.
    if len(rows) < CLUSTER_MIN_SIZE:
        print("[P0-detect] 消息不足,退出")
        return

    _, clusters, cluster_order = sort_threads(rows)
    print(f"[P0-detect] 聚类完成:{len(clusters)} 个簇")

    state = load_dispatched_state()
    print(f"[P0-detect] 已记录 {len(state)} 个已推送簇")

    new_p0_count = 0
    for cid in cluster_order:
        cmsgs = clusters[cid]
        is_p0, info = is_probably_p0(cmsgs)
        if not is_p0:
            continue

        sig = cluster_signature(cmsgs)
        if sig in state:
            print(f"[P0-detect] 已推送过(精确匹配),跳过: sig={sig[:8]}...")
            continue

        # Content-based dedup (tolerates changes in cluster membership).
        fp = cluster_content_fingerprint(cmsgs)
        if is_duplicate_p0(fp, state):
            print(f"[P0-detect] 已推送过(内容匹配),跳过: senders={fp['senders'][:2]}... hour={fp['hour']}")
            continue

        print(f"[P0-detect] 🚨 发现新 P0! sig={sig[:8]}... {len(cmsgs)}条消息")
        alert = generate_p0_alert_text(cmsgs, info)
        if args.dry_run:
            print(f"[DRY-RUN] 将发送:\n{alert}")
            # Record in memory so later clusters in this run dedup against it.
            # Do not persist: a dry run must not suppress the real alert.
            state[sig] = {"time": datetime.now().isoformat(), "fp": fp}
            new_p0_count += 1
        elif dispatch_p0_alert(alert):
            print("[P0-detect] ✅ P0 已实时推送")
            state[sig] = {"time": datetime.now().isoformat(), "fp": fp}
            # Persist immediately so a crash later in this loop cannot make
            # the next run re-send this alert.
            save_dispatched_state(state)
            new_p0_count += 1
        else:
            print("[P0-detect] ❌ 推送失败")

    if new_p0_count > 0:
        print(f"[P0-detect] 共推送 {new_p0_count} 个新 P0")
    print("[P0-detect] 完成")


if __name__ == "__main__":
    main()