410 lines
15 KiB
Python
410 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
P0 问题实时检测与分发
|
||
|
||
功能:
|
||
1. 从 MySQL 读取最近一段时间的飞书群消息
|
||
2. 复用 sync_feishu_feedback.py 的聚类 + 优先级判定逻辑
|
||
3. 过滤已推送过的 P0 簇(去重)
|
||
4. 仅推送新增 P0 到「小葵小葵」群
|
||
|
||
设计:
|
||
- 每分钟由 crontab 调用一次
|
||
- 查询最近 2 小时的消息,确保聚类质量
|
||
- 用「簇签名」(sorted message_ids)做去重
|
||
- 每天 10:00 清空去重状态(与全量分发错开)
|
||
|
||
用法:
|
||
python3 detect_p0_realtime.py [--dry-run] [--lookback-minutes 120]
|
||
"""
|
||
|
||
import sys, os, re, json, urllib.request, argparse, hashlib
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
|
||
# 将 sync_feishu_feedback.py 所在目录加入 sys.path,以便 import
|
||
SKILL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "skills", "feishu-feedback-sync", "scripts")
|
||
sys.path.insert(0, SKILL_DIR)
|
||
|
||
from sync_feishu_feedback import (
|
||
get_db_connection, query_messages, sort_threads, get_tenant_token, content_similarity,
|
||
DISPATCH_CHAT_ID, DISPATCH_CRED_DIR, P0_NOTIFY_USERS,
|
||
MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASS, MYSQL_DB,
|
||
)
|
||
from priority_classifier import compute_final_priority
|
||
|
||
# === Configuration ===
# Directory that contains this script; the state file lives in ../tmp
# relative to it.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# JSON file recording which P0 clusters have already been pushed.
STATE_FILE = os.path.join(_SCRIPT_DIR, "..", "tmp", "p0_dispatched_state.json")

# Default look-back window, in minutes (2 hours).
LOOKBACK_MINUTES = 120

# A cluster needs at least this many messages to be considered at all.
CLUSTER_MIN_SIZE = 2
|
||
|
||
|
||
def load_dispatched_state():
    """Load the record of already-dispatched P0 clusters.

    Two entry formats are accepted:

    * legacy: ``{signature: "<iso timestamp>"}``
    * current: ``{signature: {"time": "<iso timestamp>", "fp": {...}}}``

    Entries whose timestamp is older than 24 hours are dropped, as are
    entries that are malformed (neither a string nor a dict, or lacking a
    string timestamp).

    Returns:
        dict: the surviving entries, keyed by cluster signature. An empty
        dict is returned when the state file is missing, unreadable as
        UTF-8, not valid JSON, or does not contain a JSON object.
    """
    try:
        # The file is written with ensure_ascii=False, so read it as UTF-8
        # explicitly instead of relying on the process locale.
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
        return {}

    if not isinstance(state, dict):
        return {}

    # ISO-8601 strings produced by datetime.isoformat() order
    # lexicographically, so a plain string comparison is sufficient.
    cutoff = (datetime.now() - timedelta(hours=24)).isoformat()
    cleaned = {}
    for sig, entry in state.items():
        if isinstance(entry, str):
            ts = entry
        elif isinstance(entry, dict):
            ts = entry.get("time", "")
        else:
            continue  # unknown entry shape: ignore it rather than crash
        if isinstance(ts, str) and ts > cutoff:
            cleaned[sig] = entry
    return cleaned
|
||
|
||
|
||
def save_dispatched_state(state):
    """Persist the dispatched-cluster state to ``STATE_FILE``.

    The JSON is first written to a sibling ``.tmp`` file and then moved
    over the real file, so a reader never sees a half-written file.

    Args:
        state: JSON-serialisable dict keyed by cluster signature.
    """
    state_dir = os.path.dirname(STATE_FILE)
    if state_dir:
        os.makedirs(state_dir, exist_ok=True)

    tmp = STATE_FILE + ".tmp"
    # ensure_ascii=False emits raw non-ASCII text, so the encoding must be
    # explicit; the locale default is not reliable under cron.
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)
    # os.replace overwrites an existing destination on every platform,
    # whereas os.rename fails on Windows when the target exists.
    os.replace(tmp, STATE_FILE)
|
||
|
||
|
||
def cluster_signature(cluster_msgs):
    """Return an exact-membership signature for a cluster.

    The first field of every message (the message id) is stringified, the
    ids are sorted, joined with commas and hashed with MD5. The result is
    independent of message order, but any change in membership yields a
    different signature.

    Args:
        cluster_msgs: iterable of message rows whose element 0 is the id.

    Returns:
        str: 32-character hexadecimal MD5 digest.
    """
    message_ids = [str(row[0]) for row in cluster_msgs]
    message_ids.sort()
    digest = hashlib.md5(",".join(message_ids).encode())
    return digest.hexdigest()
|
||
|
||
|
||
def cluster_content_fingerprint(cluster_msgs):
    """Build a content-based fingerprint of a cluster for cross-scan dedup.

    Unlike the id-based signature, this fingerprint does not depend on the
    exact set of message ids.

    Args:
        cluster_msgs: sequence of message rows; index 1 is the sender,
            index 3 the content and index 6 the message time string.

    Returns:
        dict with keys:
            ``content``   - up to five content snippets (each stripped,
                            longer than 8 characters, truncated to 300)
                            joined by ``" | "``;
            ``senders``   - sorted list of distinct non-empty senders;
            ``hour``      - first 13 characters of the first non-empty
                            time value, or ``"unknown"``;
            ``msg_count`` - number of messages in the cluster.
    """
    snippets = []
    for row in cluster_msgs:
        raw = row[3]
        body = str(raw).strip() if raw else ""
        if len(body) > 8:
            snippets.append(body[:300])

    distinct_senders = {row[1] for row in cluster_msgs if row[1]}
    first_time = next((row[6] for row in cluster_msgs if row[6]), None)

    fingerprint = {}
    fingerprint["content"] = " | ".join(snippets[:5])
    fingerprint["senders"] = sorted(distinct_senders)
    fingerprint["hour"] = "unknown" if first_time is None else first_time[:13]
    fingerprint["msg_count"] = len(cluster_msgs)
    return fingerprint
|
||
|
||
|
||
def is_duplicate_p0(new_fp, dispatched_entries):
    """Decide whether a new P0 cluster duplicates one already dispatched.

    A previously dispatched entry counts as a duplicate when either:

    * it falls in the same hour, shares at least one sender, and the
      content similarity is above 0.20; or
    * it shares at least two senders and the content similarity is above
      0.35.

    Args:
        new_fp: fingerprint dict of the candidate cluster, with the keys
            ``content``, ``senders`` and ``hour``.
        dispatched_entries: mapping ``{sig: {"time": str, "fp": dict}}``.
            Legacy entries stored as a bare timestamp string, and entries
            without a usable ``fp`` dict, carry no content information and
            are skipped.

    Returns:
        bool: True when a matching dispatched entry is found.
    """
    new_senders = set(new_fp["senders"])
    for entry in dispatched_entries.values():
        # Legacy state stores plain timestamp strings; they have no
        # fingerprint, so content-based matching cannot apply to them.
        if not isinstance(entry, dict):
            continue
        old_fp = entry.get("fp")
        if not old_fp or not isinstance(old_fp, dict):
            continue

        sender_overlap = len(new_senders & set(old_fp.get("senders") or []))
        if sender_overlap < 1:
            # Both rules require at least one shared sender.
            continue

        same_hour = new_fp["hour"] == old_fp.get("hour")
        # Compute the similarity once and apply both thresholds to it.
        sim = content_similarity(new_fp["content"], old_fp.get("content", ""))
        if same_hour and sim > 0.20:
            return True
        if sender_overlap >= 2 and sim > 0.35:
            return True
    return False
|
||
|
||
|
||
def is_probably_p0(cluster_msgs):
    """Check whether a cluster is classified as a P0 problem.

    Clusters with fewer than ``CLUSTER_MIN_SIZE`` messages are rejected
    without being classified.

    Args:
        cluster_msgs: sequence of message rows belonging to one cluster.

    Returns:
        tuple: ``(is_p0, priority_info)``. ``priority_info`` is the dict
        returned by ``compute_final_priority``, or ``None`` when the
        cluster is too small to be classified.
    """
    too_small = len(cluster_msgs) < CLUSTER_MIN_SIZE
    if too_small:
        return False, None

    priority_info = compute_final_priority(cluster_msgs)
    is_p0 = priority_info["priority"] == "P0"
    return is_p0, priority_info
|
||
|
||
|
||
def _clean_summary(text):
    """Reduce a raw message to a short description of the core problem.

    The cleaning runs as a fixed sequence of steps; the order matters
    because later steps operate on the output of earlier ones:

    1. strip forwarding markers, XML blocks/tags and attribute leftovers;
    2. cut everything from the first quoted-reply marker onward;
    3. drop media placeholders;
    4. split on sender headers and keep the last meaningful segment;
    5. remove phone numbers, polite trailers, @-mentions and trailing
       digit+markup debris;
    6. strip internal-discussion phrasing and trailing question
       punctuation/particles;
    7. expand a bare symptom word into a full statement.

    Args:
        text: raw message content (str).

    Returns:
        str: the cleaned summary; may be empty.
    """

    def _tidy(value):
        # Collapse whitespace runs, then trim commas/whitespace at both ends.
        value = re.sub(r'\s+', ' ', value).strip()
        return re.sub(r'^[,,\s]+|[,,\s]+$', '', value)

    # Forwarding marker, complete XML blocks, any (possibly unterminated)
    # XML tag, and leftover attribute fragments.
    markup_steps = (
        (r'^\[聊天记录\]\s*', 0),
        (r'<msg>.*?</msg>', re.DOTALL),
        (r'</?[a-zA-Z][a-zA-Z0-9]*(?:\s[^>]*)?>?', 0),
        (r'\b[a-zA-Z]+="[^"]*"', 0),
    )
    for markup_pat, markup_flags in markup_steps:
        text = re.sub(markup_pat, '', text, flags=markup_flags)

    # Everything from the quoted-reply marker onward is internal
    # discussion, not the user's original feedback.
    text = re.split(r'↳\s*回复', text)[0].strip()

    # Media placeholders.
    text = re.sub(r'\[视频\]|\[图片\]|\[语音\]|\[文件\]|\[表情\]', '', text)

    # Sender header: symbols/emoji + name + optional symbols + colon.
    # Keep the last segment that is longer than 3 characters.
    sender_pat = r'(?:[^\u4e00-\u9fff\w\d\s]+\s*)+[\u4e00-\u9fff\s]{1,20}\s*(?:[^\u4e00-\u9fff\w\d\s]+\s*)*:\s*'
    segments = [seg.strip() for seg in re.split(sender_pat, text)]
    segments = [seg for seg in segments if len(seg) > 3]
    if segments:
        text = segments[-1]

    # Phone numbers / user ids, polite trailers, @-mentions, and trailing
    # digit+markup debris - in this order.
    for noise_pat in (
        r'(?:^|(?<=[^\d]))1[3-9]\d{9}(?=[^\d]|$)',
        r'[,,]?\s*(老师|辛苦|麻烦|帮忙)\s*(看下|看一下|看看|看)[。!!]*$',
        r'[,,]?\s*@\S+\s*',
        r'\d+[a-zA-Z<>/]+$',
    ):
        text = re.sub(noise_pat, '', text)

    text = _tidy(text)

    # Internal-discussion phrasing: leading "confirm with the user"
    # prefixes, then clarifying-question fragments, then trailing question
    # punctuation and sentence-final particles.
    for discussion_pat in (
        r'^(?:这个|该)?反馈可以(?:跟|和)用户确认下?',
        r'^(?:这个|该)?问题可以(?:跟|和)用户确认下?',
        r'^(?:这个|该)?(?:反馈|问题)?(?:可以)?(?:跟|和)用户确认下?',
        r'是在进行什么操作时',
        r'具体是什么操作时',
        r'是在什么情况下',
        r'[??!!。.]+$',
        r'[吗呢吧啊呀]$',
    ):
        text = re.sub(discussion_pat, '', text)

    # A bare symptom word is expanded into a complete statement; only the
    # first matching rule applies.
    symptom_rules = (
        (r'^闪退的?$', '用户反馈闪退,需确认操作场景'),
        (r'^崩溃的?$', '用户反馈崩溃,需确认操作场景'),
        (r'^卡退的?$', '用户反馈卡退,需确认操作场景'),
        (r'^卡死的?$', '用户反馈卡死,需确认操作场景'),
        (r'^打不开的?$', '用户反馈打不开,需确认操作场景'),
    )
    for symptom_pat, statement in symptom_rules:
        if re.match(symptom_pat, text):
            text = statement
            break

    return _tidy(text)
|
||
|
||
|
||
def _pick_best_summary(cluster_msgs):
    """Choose the message that best represents the cluster's P0 problem.

    Only messages whose stripped content is longer than 3 characters are
    considered. Selection order:

    1. the first message that is neither forwarded nor internal
       discussion and matches a P0 keyword;
    2. otherwise the first message that is neither forwarded nor internal
       discussion;
    3. if every message is forwarded/discussion, the first one matching a
       P0 keyword, otherwise simply the first one.

    Args:
        cluster_msgs: sequence of message rows; index 3 is the content.

    Returns:
        str: the cleaned summary, truncated to 150 characters (empty when
        the cluster has no usable message).
    """
    from priority_classifier import P0_KEYWORDS

    # One case-insensitive regex that is the alternation of every
    # category's P0 keyword pattern.
    p0_regex = re.compile('|'.join(list(P0_KEYWORDS.values())), re.IGNORECASE)

    # Markers of forwarded chat records, quoted replies and embedded XML.
    noise_regex = re.compile(r'^\[聊天记录\]|↳\s*回复|<msg>|<appmsg')

    stripped = (str(row[3]).strip() if row[3] else "" for row in cluster_msgs)
    candidates = [body for body in stripped if len(body) > 3]
    originals = [body for body in candidates if not noise_regex.search(body)]

    # Prefer original user feedback; use the noisy messages only when
    # there is nothing else.
    pool = originals or candidates
    chosen = next((body for body in pool if p0_regex.search(body)), None)
    if chosen is None:
        chosen = pool[0] if pool else ""
    return _clean_summary(chosen)[:150]
|
||
|
||
|
||
def generate_p0_alert_text(cluster_msgs, priority_info):
    """Render the short plain-text P0 alert (no document links).

    The reporter and report time are taken from the first message of the
    cluster; the problem description comes from ``_pick_best_summary``.

    Args:
        cluster_msgs: non-empty sequence of message rows; index 1 is the
            sender and index 6 the message time.
        priority_info: dict that may carry ``reasoning`` and ``deadline``.

    Returns:
        str: the alert text, one field per line.
    """
    first_msg = cluster_msgs[0]
    reporter = first_msg[1]
    reported_at = first_msg[6]
    summary = _pick_best_summary(cluster_msgs)
    reasoning = priority_info.get('reasoning', 'P0')
    deadline = priority_info.get('deadline', '2小时内')

    return "\n".join((
        "🚨 P0 实时告警",
        "",
        f"问题描述: {summary}",
        f"报告人: {reporter}",
        f"报告时间: {reported_at}",
        f"判定依据: {reasoning}",
        f"修复时限: {deadline}",
    ))
|
||
|
||
|
||
def dispatch_p0_alert(alert_text):
    """Send a P0 alert to the dispatch chat as a Feishu ``post`` message.

    Every non-blank line of ``alert_text`` becomes one paragraph of the
    rich-text body. When ``P0_NOTIFY_USERS`` is non-empty, a final
    paragraph @-mentions each listed user.

    Args:
        alert_text: the plain-text alert, lines separated by ``"\\n"``.

    Returns:
        bool: True when the API answers with ``code == 0``. False when the
        API reports an error, or when the request fails at the transport
        level (HTTP error status, network error, timeout) or the response
        is not valid JSON. Failures are printed, not raised, so a caller
        looping over several alerts can continue and keep its state.
    """
    import urllib.request

    token = get_tenant_token(cred_dir=DISPATCH_CRED_DIR)

    # Rich-text body: one paragraph per non-blank line.
    content_parts = [
        [{"tag": "text", "text": line + "\n"}]
        for line in alert_text.split("\n")
        if line.strip()
    ]

    # Trailing paragraph with the @-mentions.
    if P0_NOTIFY_USERS:
        at_line = [{"tag": "text", "text": "\n⚠️ 请关注: "}]
        for uid in P0_NOTIFY_USERS:
            at_line.append({"tag": "at", "user_id": uid})
            at_line.append({"tag": "text", "text": " "})
        content_parts.append(at_line)

    # The API expects "content" to be a JSON string nested inside the
    # JSON request body, hence the two json.dumps calls.
    post_content = json.dumps({
        "zh_cn": {
            "title": "🚨 P0 问题实时告警",
            "content": content_parts
        }
    }, ensure_ascii=False)

    body = json.dumps({
        "receive_id": DISPATCH_CHAT_ID,
        "msg_type": "post",
        "content": post_content
    }, ensure_ascii=False).encode()

    req = urllib.request.Request(
        "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id",
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        },
        method="POST"
    )

    try:
        # The context manager closes the HTTP response even if read() fails.
        with urllib.request.urlopen(req, timeout=10) as resp:
            d = json.loads(resp.read())
    except (OSError, ValueError) as exc:
        # OSError covers URLError/HTTPError, socket timeouts and connection
        # errors; ValueError covers an undecodable or non-JSON body.
        print(f" ⚠️ 实时分发失败: {str(exc)[:100]}")
        return False

    if isinstance(d, dict) and d.get("code") == 0:
        return True

    msg = d.get("msg") if isinstance(d, dict) else d
    print(f" ⚠️ 实时分发失败: {str(msg or '')[:100]}")
    return False
|
||
|
||
|
||
def should_clear_state():
    """Tell whether the daily dedup-state reset window is open.

    The window covers local time 10:00 and 10:01 (hour 10, minute 0 or 1).

    Returns:
        bool: True while the current local time is inside the window.
    """
    current = datetime.now()
    if current.hour != 10:
        return False
    return current.minute <= 1
|
||
|
||
|
||
def _fetch_recent_messages(start_str, end_str):
    """Return the group-message rows with ``msg_time`` in [start_str, end_str].

    Queries the table directly rather than going through
    ``query_messages``, which is limited to per-day queries. The
    connection is closed even when the query raises.
    """
    import pymysql

    conn = pymysql.connect(
        host=MYSQL_HOST, port=MYSQL_PORT,
        user=MYSQL_USER, password=MYSQL_PASS,
        database=MYSQL_DB, charset="utf8mb4"
    )
    try:
        cursor = conn.cursor()
        # '%%' is a literal '%' for DATE_FORMAT; the driver applies
        # %-style parameter substitution to the statement.
        cursor.execute("""
            SELECT message_id, sender_name, msg_type, content, media_url, quote_message_id,
                   DATE_FORMAT(msg_time, '%%Y-%%m-%%d %%H:%%i:%%s') as msg_time, msg_timestamp
            FROM lark_group_message
            WHERE msg_time >= %s AND msg_time <= %s
            ORDER BY msg_time ASC
        """, (start_str, end_str))
        return cursor.fetchall()
    finally:
        conn.close()


def main():
    """Scan recent group messages and push newly detected P0 clusters.

    Steps:
        1. During the daily reset window, clear the dedup state and exit.
        2. Load the messages of the last ``--lookback-minutes`` minutes.
        3. Cluster them and classify every cluster.
        4. Skip P0 clusters already dispatched (exact signature match or
           content-fingerprint match).
        5. Dispatch each remaining P0 cluster and record it.

    With ``--dry-run`` the alerts are only printed. Dedup marks are kept in
    memory for the duration of the run but are not written to the state
    file, so a dry run cannot suppress a later real alert.
    """
    parser = argparse.ArgumentParser(description="P0 问题实时检测与分发")
    parser.add_argument("--dry-run", action="store_true", help="仅打印不推送")
    parser.add_argument("--lookback-minutes", type=int, default=LOOKBACK_MINUTES,
                        help=f"查询最近 N 分钟的消息(默认 {LOOKBACK_MINUTES})")
    args = parser.parse_args()

    # Daily reset: clear the dedup state and leave this time slot to the
    # full dispatch run.
    if should_clear_state():
        print("[P0-detect] 10:00 清空去重状态,配合全量分发")
        save_dispatched_state({})
        print("[P0-detect] 全量分发时段,跳过实时检测")
        return

    print(f"[P0-detect] 扫描最近 {args.lookback_minutes} 分钟消息...")
    # Take a single timestamp so both window bounds are consistent.
    now = datetime.now()
    lookback_start = (now - timedelta(minutes=args.lookback_minutes)).strftime("%Y-%m-%d %H:%M:%S")
    now_str = now.strftime("%Y-%m-%d %H:%M:%S")

    rows = _fetch_recent_messages(lookback_start, now_str)
    print(f"[P0-detect] 查询到 {len(rows)} 条消息")

    if len(rows) < 2:
        print("[P0-detect] 消息不足,退出")
        return

    # Clustering.
    _sorted_msgs, clusters, cluster_order = sort_threads(rows)
    print(f"[P0-detect] 聚类完成:{len(clusters)} 个簇")

    # Already-dispatched clusters.
    state = load_dispatched_state()
    print(f"[P0-detect] 已记录 {len(state)} 个已推送簇")

    new_p0_count = 0
    for cid in cluster_order:
        cmsgs = clusters[cid]
        is_p0, info = is_probably_p0(cmsgs)
        if not is_p0:
            continue

        sig = cluster_signature(cmsgs)
        if sig in state:
            print(f"[P0-detect] 已推送过(精确匹配),跳过: sig={sig[:8]}...")
            continue

        # Content-based dedup across scans.
        fp = cluster_content_fingerprint(cmsgs)
        if is_duplicate_p0(fp, state):
            print(f"[P0-detect] 已推送过(内容匹配),跳过: senders={fp['senders'][:2]}... hour={fp['hour']}")
            continue

        print(f"[P0-detect] 🚨 发现新 P0! sig={sig[:8]}... {len(cmsgs)}条消息")
        alert = generate_p0_alert_text(cmsgs, info)

        if args.dry_run:
            print(f"[DRY-RUN] 将发送:\n{alert}")
            # In-memory mark only: dedups within this run, never persisted.
            state[sig] = {"time": datetime.now().isoformat(), "fp": fp}
            new_p0_count += 1
            continue

        if dispatch_p0_alert(alert):
            print(f"[P0-detect] ✅ P0 已实时推送")
            state[sig] = {"time": datetime.now().isoformat(), "fp": fp}
            # Persist right away so a failure later in the loop cannot
            # cause this alert to be pushed again on the next run.
            save_dispatched_state(state)
            new_p0_count += 1
        else:
            print(f"[P0-detect] ❌ 推送失败")

    if new_p0_count > 0:
        print(f"[P0-detect] 共推送 {new_p0_count} 个新 P0")

    print("[P0-detect] 完成")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|