ai_member_xiaokui/scripts/detect_p0_wechat.py
2026-06-25 08:10:01 +08:00

491 lines
19 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
微信 P0 问题实时检测与分发
功能:
1. 从 MySQL 读取最近一段时间的微信用户火线救火群消息
2. 复用 sync_feishu_feedback.py 的聚类逻辑与 priority_classifier 的优先级判定
3. 过滤已推送过的 P0 簇(去重)
4. 仅推送新增 P0 到「小葵小葵」群
设计:
- 每分钟由 crontab 调用一次
- 查询最近 2 小时的消息,确保聚类质量
- 用「簇签名」(排序后 message_ids 的 MD5)做精确去重,并用内容指纹做语义去重
- 每天 10:00 清空去重状态(与全量分发错开)
用法:
python3 detect_p0_wechat.py [--dry-run] [--lookback-minutes 120]
"""
import sys, os, re, json, hashlib, argparse, pymysql
from datetime import datetime, timedelta
SKILL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "skills", "feishu-feedback-sync", "scripts")
sys.path.insert(0, SKILL_DIR)
from sync_feishu_feedback import (
sort_threads, get_tenant_token, content_similarity,
DISPATCH_CHAT_ID, DISPATCH_CRED_DIR, P0_NOTIFY_USERS,
MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASS, MYSQL_DB,
)
from priority_classifier import compute_final_priority
# === WeChat-specific configuration ===
_THIS_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# JSON file holding already-dispatched P0 clusters (signature -> entry) for dedup.
STATE_FILE = os.path.join(_THIS_SCRIPT_DIR, "..", "tmp", "p0_dispatched_state_wechat.json")
# Default scan window in minutes (overridable via --lookback-minutes).
LOOKBACK_MINUTES = 120
# Clusters with fewer messages than this are never classified as P0.
CLUSTER_MIN_SIZE = 2
def load_dispatched_state(path=None):
    """Load the dispatched-P0 dedup state, dropping entries older than 24 hours.

    Args:
        path: State file to read; defaults to STATE_FILE.

    Returns:
        dict mapping cluster signature -> entry. Two entry formats are kept:
        the current ``{"time": <isoformat>, "fp": <fingerprint dict>}`` and the
        legacy bare ``<isoformat>`` string. A missing, undecodable or non-object
        file yields ``{}``. Entries in any other shape, or whose timestamp is not
        a string, are dropped instead of crashing the scan.
    """
    path = STATE_FILE if path is None else path
    try:
        with open(path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (FileNotFoundError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        state = {}
    if not isinstance(state, dict):
        state = {}

    # Naive isoformat() strings in the same format sort chronologically, so a
    # plain string comparison against the cutoff is enough.
    cutoff = (datetime.now() - timedelta(hours=24)).isoformat()
    cleaned = {}
    for sig, entry in state.items():
        if isinstance(entry, str):
            ts = entry  # legacy format: the value is the timestamp itself
        elif isinstance(entry, dict):
            ts = entry.get("time", "")
        else:
            continue
        if isinstance(ts, str) and ts > cutoff:
            cleaned[sig] = entry
    return cleaned
def save_dispatched_state(state, path=None):
    """Write the dedup *state* to disk as UTF-8 JSON, replacing the old file.

    The JSON is written to ``<path>.tmp`` first and then moved over *path*
    with os.replace, which overwrites an existing file on every platform
    (os.rename fails on Windows when the target exists) and is atomic on POSIX
    because both files share a directory. If writing fails, the temp file is
    removed and the error is re-raised.

    Args:
        state: JSON-serializable mapping of cluster signature -> entry.
        path: Destination file; defaults to STATE_FILE. Missing parent
            directories are created.
    """
    path = STATE_FILE if path is None else path
    parent = os.path.dirname(path)
    if parent:  # a bare filename has no directory to create
        os.makedirs(parent, exist_ok=True)
    tmp = path + ".tmp"
    try:
        # Explicit UTF-8: ensure_ascii=False emits raw CJK text.
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2)
        os.replace(tmp, path)
    except BaseException:
        try:
            os.remove(tmp)
        except OSError:
            pass  # best-effort cleanup; the original error is re-raised below
        raise
def cluster_signature(cluster_msgs):
    """Return an order-independent MD5 hex digest of the cluster's message IDs.

    The ID is field 0 of each message tuple. IDs are converted to str, sorted
    as strings and comma-joined before hashing.
    """
    joined_ids = ",".join(sorted(map(lambda msg: str(msg[0]), cluster_msgs)))
    digest = hashlib.md5(joined_ids.encode())
    return digest.hexdigest()
def cluster_content_fingerprint(cluster_msgs):
    """Build a content-based fingerprint of a cluster for cross-scan dedup.

    Unlike cluster_signature, it does not depend on the exact set of message
    IDs, so a cluster that gains or loses messages between scans can still be
    recognised.

    Returns a dict with:
        content:   up to the first 5 message texts (field 3) longer than 8 chars,
                   each cut to 300 chars, joined with " | ".
        senders:   sorted unique non-empty sender names (field 1).
        hour:      first 13 chars of the first non-empty time (field 6), i.e.
                   "YYYY-MM-DD HH" for "YYYY-MM-DD HH:MM:SS" strings; "unknown"
                   if none.
        msg_count: number of messages in the cluster.
    """
    snippets = []
    for msg in cluster_msgs:
        raw = msg[3]
        body = str(raw).strip() if raw else ""
        # Skip media placeholders / stickers / very short chatter.
        if len(body) > 8:
            snippets.append(body[:300])

    first_time = next((msg[6] for msg in cluster_msgs if msg[6]), None)

    return {
        "content": " | ".join(snippets[:5]),
        "senders": sorted({msg[1] for msg in cluster_msgs if msg[1]}),
        "hour": first_time[:13] if first_time is not None else "unknown",
        "msg_count": len(cluster_msgs),
    }
def is_duplicate_p0(new_fp, dispatched_entries):
    """Return True if *new_fp* matches the fingerprint of an already-dispatched P0.

    Args:
        new_fp: Fingerprint dict as built by cluster_content_fingerprint
            (uses keys "content", "senders", "hour").
        dispatched_entries: Dedup state, ``{signature: {"time": str, "fp": dict}}``.
            Legacy entries stored as bare timestamp strings (still returned by
            load_dispatched_state) and entries without a fingerprint carry no
            content, so they are skipped rather than raising AttributeError.

    An entry matches when either
      1. same hour + at least one shared sender + content_similarity > 0.20
         (aggregated content is stable, so a loose threshold suffices), or
      2. at least two shared senders (any hour) + content_similarity > 0.35.
    """
    new_senders = set(new_fp["senders"])
    for entry in dispatched_entries.values():
        if not isinstance(entry, dict):
            continue
        old_fp = entry.get("fp")
        if not isinstance(old_fp, dict) or not old_fp:
            continue

        shared = len(new_senders & set(old_fp.get("senders") or ()))
        same_hour_match = shared >= 1 and new_fp["hour"] == old_fp.get("hour")
        cross_hour_match = shared >= 2
        if not (same_hour_match or cross_hour_match):
            continue

        # Compute the (potentially costly) similarity at most once per entry.
        sim = content_similarity(new_fp["content"], old_fp.get("content", ""))
        if (same_hour_match and sim > 0.20) or (cross_hour_match and sim > 0.35):
            return True
    return False
def is_probably_p0(cluster_msgs):
    """Classify a cluster and return ``(is_p0, priority_info)``.

    Clusters with fewer than CLUSTER_MIN_SIZE messages are rejected without
    classification and yield ``(False, None)``. Otherwise the dict from
    compute_final_priority is returned together with whether its
    ``"priority"`` equals ``"P0"``.
    """
    too_small = len(cluster_msgs) < CLUSTER_MIN_SIZE
    if too_small:
        return False, None
    verdict = compute_final_priority(cluster_msgs)
    return verdict["priority"] == "P0", verdict
# 内部技术讨论特征词(用于检测团队内部讨论而非用户反馈)
_INTERNAL_DISCUSSION_PATTERNS = [
# 技术术语
r'hotfix', r'\w+_hf\b', r'分支', r'打包', r'构建', r'部署', r'预发布',
r'测试环境', r'灰度', r'发版', r'上线', r'回滚', r'版本号',
r'master\b', r'develop\b', r'release\b',
# 讨论话术(否定/反问/建议)
r'^(?:不是|不对|不,)', r'我觉得', r'应该是', r'你们(?:用|试|改)',
r'想不改除非', r'需要再改回来', r'是一样的', r'除非你们',
r'这周要更新', r'更新之前', r'改回来',
# 技术操作描述
r'hotfix打的包', r'分支的内容', r'测试后面的关卡',
]
_INTERNAL_DISCUSSION_RE = re.compile('|'.join(_INTERNAL_DISCUSSION_PATTERNS), re.IGNORECASE)
def _is_internal_discussion(text):
"""判断消息是否为团队内部技术讨论,而非用户反馈。"""
if not text or len(text) < 5:
return False
# 检查是否匹配内部讨论特征
if _INTERNAL_DISCUSSION_RE.search(text):
return True
# 消息长度超过 80 字且包含多个技术/讨论特征 → 很可能是内部讨论
if len(text) > 80:
tech_count = len(re.findall(r'(?:hotfix|分支|打包|构建|部署|测试|版本|上线|发版|回滚|灰度|预发布)', text, re.IGNORECASE))
discuss_count = len(re.findall(r'(?:不是|不对|我觉得|应该是|你们|我们|改回来|除非|一样的)', text))
if tech_count >= 2 or discuss_count >= 2 or (tech_count + discuss_count) >= 3:
return True
return False
def _extract_problem_from_discussion(text, cluster_msgs=None):
"""从内部技术讨论中提取核心问题描述。
尝试从讨论中识别用户实际遇到的问题,而非团队的技术分析。
无法提取时,归纳讨论主题作为参考。
"""
# 先尝试从簇中找非内部讨论的消息
if cluster_msgs:
for m in cluster_msgs:
t = str(m[3]).strip() if m[3] else ""
if t and len(t) > 3 and not _is_internal_discussion(t):
return _clean_summary(t)
# 从讨论文本中提取问题关键词
problem_indicators = [
(r'(?:用户|玩家|有人).{0,10}(?:反馈|说|遇到|出现|发现).{0,30}(?:闪退|崩溃|卡死|卡顿|白屏|黑屏|无法|不能|打不开|进不去|报错|异常)', ''),
(r'(?:闪退|崩溃|卡死|卡顿|白屏|黑屏|无法登录|进不去|打不开|报错|异常|bug|BUG)', ''),
]
for pat, _ in problem_indicators:
m = re.search(pat, text, re.IGNORECASE)
if m:
extracted = m.group(0)
if len(extracted) > 5:
return f"用户反馈{extracted},需确认影响范围"
# 无法提取具体问题 → 归纳讨论主题
topic_map = [
(r'关卡|解锁|单元|U\d+', '关卡/解锁'),
(r'打包|构建|hotfix|分支|发版|上线|更新', '打包/发版'),
(r'测试|灰度|预发布|交叉测', '测试/灰度'),
(r'配置|后台|服务端|接口', '配置/后台'),
(r'闪退|崩溃|卡死|卡顿|白屏|黑屏', '崩溃/异常'),
(r'加载|转圈|进不去|打不开|无法', '加载/访问'),
]
topics = []
for pat, label in topic_map:
if re.search(pat, text, re.IGNORECASE):
topics.append(label)
topic_str = '/'.join(topics[:3]) if topics else '相关'
return f"团队内部讨论{topic_str}问题,需人工确认具体用户反馈内容"
def _clean_summary(text):
    """Reduce a raw message text to a short, readable problem description.

    Applies an ordered sequence of regex clean-ups (order matters: later steps
    assume earlier ones already ran):
    strips chat-forward markers, XML payloads, reply quotes, media placeholders
    and sender prefixes; removes phone-number-like digits, polite request
    suffixes and @mentions; drops discussion-style question phrasing and a
    trailing question particle; and expands a bare one-word symptom such as
    "闪退" into a full sentence.

    Args:
        text: Raw message content (str).

    Returns:
        The cleaned string, possibly empty.
    """
    # Drop a leading "[聊天记录]" (forwarded chat history) marker.
    text = re.sub(r'^\[聊天记录\]\s*', '', text)
    # Remove complete <msg>...</msg> XML blocks.
    text = re.sub(r'<msg>.*?</msg>', '', text, flags=re.DOTALL)
    # Remove all XML tags, including unterminated ones (the closing ">" is optional).
    text = re.sub(r'</?[a-zA-Z][a-zA-Z0-9]*(?:\s[^>]*)?>?', '', text)
    # Remove leftover XML attribute fragments such as key="value".
    text = re.sub(r'\b[a-zA-Z]+="[^"]*"', '', text)
    # Cut everything from a reply quote ("↳ 回复 xxx:") onward: that part is
    # quoted internal discussion, not the user's original feedback.
    # NOTE(review): the regex splits on ANY "回复", not only "↳ 回复", so user
    # text such as "客服没回复" is truncated too; confirm whether "↳" should be required.
    text = re.split(r'\s*回复', text)[0].strip()
    # Remove media placeholders.
    text = re.sub(r'\[视频\]|\[图片\]|\[语音\]|\[文件\]|\[表情\]', '', text)
    # Split on sender prefixes (symbols/emoji + CJK name + optional symbols + ":")
    # and keep the last fragment longer than 3 chars, i.e. the last user message.
    sender_pat = r'(?:[^\u4e00-\u9fff\w\d\s]+\s*)+[\u4e00-\u9fff\s]{1,20}\s*(?:[^\u4e00-\u9fff\w\d\s]+\s*)*:\s*'
    parts = re.split(sender_pat, text)
    meaningful = [p.strip() for p in parts if p.strip() and len(p.strip()) > 3]
    if meaningful:
        text = meaningful[-1]
    # Remove phone-number-like 11-digit runs starting with 13-19 that are not
    # part of a longer digit sequence.
    text = re.sub(r'(?:^|(?<=[^\d]))1[3-9]\d{9}(?=[^\d]|$)', '', text)
    # Remove trailing polite request suffixes such as "老师看下".
    # NOTE(review): "[。!!]" lists "!" twice; a full-width "!" may have been
    # intended (same for "[,]") — verify against the original file.
    text = re.sub(r'[,]?\s*(老师|辛苦|麻烦|帮忙)\s*(看下|看一下|看看|看)[。!!]*$', '', text)
    # Remove @mentions.
    text = re.sub(r'[,]?\s*@\S+\s*', '', text)
    # Remove trailing digits followed by XML debris.
    text = re.sub(r'\d+[a-zA-Z<>/]+$', '', text)
    # Collapse whitespace and trim commas/whitespace at both ends.
    text = re.sub(r'\s+', ' ', text).strip()
    text = re.sub(r'^[,\s]+|[,\s]+$', '', text)
    # === Rewrite internal-discussion phrasing: question -> statement ===
    # Drop "ask the user to confirm" prefixes such as "这个反馈可以跟用户确认下...".
    text = re.sub(r'^(?:这个|该)?反馈可以(?:跟|和)用户确认下?', '', text)
    text = re.sub(r'^(?:这个|该)?问题可以(?:跟|和)用户确认下?', '', text)
    text = re.sub(r'^(?:这个|该)?(?:反馈|问题)?(?:可以)?(?:跟|和)用户确认下?', '', text)
    # Drop "under what operation/circumstance" question fragments.
    text = re.sub(r'是在进行什么操作时', '', text)
    text = re.sub(r'具体是什么操作时', '', text)
    text = re.sub(r'是在什么情况下', '', text)
    # Drop trailing punctuation, then a single trailing question particle.
    text = re.sub(r'[?!。.]+$', '', text)
    text = re.sub(r'[吗呢吧啊呀]$', '', text)
    # Expand a bare symptom word into a full statement (first match wins).
    symptom_map = {
        r'^闪退的?$': '用户反馈闪退,需确认操作场景',
        r'^崩溃的?$': '用户反馈崩溃,需确认操作场景',
        r'^卡退的?$': '用户反馈卡退,需确认操作场景',
        r'^卡死的?$': '用户反馈卡死,需确认操作场景',
        r'^打不开的?$': '用户反馈打不开,需确认操作场景',
    }
    for pat, repl in symptom_map.items():
        if re.match(pat, text):
            text = repl
            break
    # Final whitespace/comma cleanup after the rewrites above.
    text = re.sub(r'\s+', ' ', text).strip()
    text = re.sub(r'^[,\s]+|[,\s]+$', '', text)
    return text
def _pick_best_summary(cluster_msgs):
    """Pick the text that best describes the cluster's P0 problem.

    Messages whose stripped text (field 3) has 3 characters or fewer are
    ignored. Selection order:

    1. The first message that is neither a forward (``_is_forward``) nor
       internal discussion (``_is_internal_discussion``) and matches a P0
       keyword regex.
    2. If no message is both non-forward and non-internal: the first message of
       any kind that matches a P0 keyword regex; failing that, the first
       non-trivial message is handed to ``_extract_problem_from_discussion``,
       whose non-empty result is returned (truncated).
    3. Otherwise: the first non-forward, non-internal message.

    Texts from steps 1-3 (except the extracted result of step 2) go through
    ``_clean_summary``. Every result is truncated to 150 characters.
    """
    from priority_classifier import P0_KEYWORDS
    # Combine all P0 keyword regexes into one alternation.
    # NOTE(review): assumes every value of P0_KEYWORDS is a regex string
    # ('|'.join would fail on lists) — TODO confirm in priority_classifier.
    p0_patterns = []
    for cat_pats in P0_KEYWORDS.values():
        p0_patterns.append(cat_pats)
    combined_p0 = re.compile('|'.join(p0_patterns), re.IGNORECASE)
    # A forwarded message: chat-history marker, "↳ 回复" quote, or XML payload.
    def _is_forward(t):
        return bool(re.search(r'^\[聊天记录\]|↳\s*回复|<msg>|<appmsg', t))
    best = None
    best_noise = None  # fallback: first non-trivial message, noise included
    # Stays True while every non-trivial message seen is a forward or internal
    # discussion; it flips to False together with `best` being set.
    all_internal = True
    for m in cluster_msgs:
        t = str(m[3]).strip() if m[3] else ""
        if not t or len(t) <= 3:
            continue
        if best_noise is None:
            best_noise = t
        # Skip forwarded messages.
        if _is_forward(t):
            continue
        # Skip internal technical discussion.
        if _is_internal_discussion(t):
            continue
        all_internal = False
        if best is None:
            best = t
        if combined_p0.search(t):
            return _clean_summary(t)[:150]
    # No clean (non-forward, non-internal) message: look for P0 keywords in the
    # noisy ones before falling back to the first non-trivial message.
    if best is None:
        for m in cluster_msgs:
            t = str(m[3]).strip() if m[3] else ""
            if not t or len(t) <= 3:
                continue
            if combined_p0.search(t):
                return _clean_summary(t)[:150]
        best = best_noise
    # Only noisy messages exist: try to extract a problem description from them.
    if all_internal and best:
        extracted = _extract_problem_from_discussion(best, cluster_msgs)
        if extracted:
            return extracted[:150]
    return _clean_summary(best or "")[:150]
def generate_p0_alert_text(cluster_msgs, priority_info):
    """Render the plain-text P0 alert for a cluster.

    The reporter and report time come from the first message (fields 1 and 6),
    and the description comes from _pick_best_summary. "reasoning" and
    "deadline" are read from *priority_info*, defaulting to 'P0' and '2小时内'.
    Lines are joined with "\\n".
    """
    first_msg = cluster_msgs[0]
    sender, sent_at = first_msg[1], first_msg[6]
    summary = _pick_best_summary(cluster_msgs)
    lines = (
        "🚨 微信 P0 实时告警",
        "",
        f"问题描述: {summary}",
        f"报告人: {sender}",
        f"报告时间: {sent_at}",
        f"判定依据: {priority_info.get('reasoning', 'P0')}",
        f"修复时限: {priority_info.get('deadline', '2小时内')}",
    )
    return "\n".join(lines)
def dispatch_p0_alert(alert_text):
    """Push *alert_text* to the dispatch Feishu chat as a rich-text ("post") message.

    Each non-blank line of *alert_text* becomes its own paragraph. When
    P0_NOTIFY_USERS is non-empty, a trailing paragraph @-mentions every listed
    user id.

    Returns:
        True if the Feishu API replies with ``code == 0``, False otherwise.
        Token, network, HTTP and timeout failures, plus non-JSON replies, are
        reported and turned into False instead of propagating. One failed push
        therefore cannot abort the caller's loop before it records the alerts
        already sent.
    """
    import http.client
    import urllib.error
    import urllib.request

    content_parts = [
        [{"tag": "text", "text": line + "\n"}]
        for line in alert_text.split("\n")
        if line.strip()
    ]
    if P0_NOTIFY_USERS:
        at_line = [{"tag": "text", "text": "\n⚠️ 请关注: "}]
        for uid in P0_NOTIFY_USERS:
            at_line.append({"tag": "at", "user_id": uid})
            at_line.append({"tag": "text", "text": " "})
        content_parts.append(at_line)
    post_content = json.dumps({
        "zh_cn": {
            "title": "🚨 微信 P0 问题实时告警",
            "content": content_parts
        }
    }, ensure_ascii=False)
    body = json.dumps({
        "receive_id": DISPATCH_CHAT_ID,
        "msg_type": "post",
        "content": post_content
    }, ensure_ascii=False).encode()

    try:
        token = get_tenant_token(cred_dir=DISPATCH_CRED_DIR)
        req = urllib.request.Request(
            "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id",
            data=body,
            headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
            method="POST"
        )
        # `with` guarantees the HTTP response is closed.
        with urllib.request.urlopen(req, timeout=10) as resp:
            d = json.loads(resp.read())
    except (OSError, ValueError, http.client.HTTPException) as exc:
        # URLError/HTTPError and socket timeouts are OSError subclasses;
        # json.JSONDecodeError is a ValueError.
        print(f" ⚠️ 实时分发失败: {str(exc)[:100]}")
        return False

    if isinstance(d, dict) and d.get("code") == 0:
        return True
    msg = d.get("msg", "") if isinstance(d, dict) else d
    print(f" ⚠️ 实时分发失败: {str(msg)[:100]}")
    return False
def should_clear_state():
    """Return True during the daily reset window (local time 10:00 and 10:01).

    The window spans two minutes, presumably so a once-a-minute cron run
    still hits it despite scheduling jitter.
    """
    current = datetime.now()
    return current.hour == 10 and current.minute in (0, 1)
def _fetch_recent_rows(lookback_minutes):
    """Return WeChat group messages from the last *lookback_minutes* minutes.

    Rows are ordered by msg_time ascending and normalized to the tuple layout
    the clustering code expects: (message_id, sender_name, msg_type, content,
    media_url, quote_message_id, msg_time, msg_timestamp). NULL columns become
    "" ("text" for msg_type, 0 for msg_timestamp). The DB connection is closed
    even if the query fails.
    """
    now = datetime.now()  # single clock read so both window ends agree
    lookback_start = (now - timedelta(minutes=lookback_minutes)).strftime("%Y-%m-%d %H:%M:%S")
    now_str = now.strftime("%Y-%m-%d %H:%M:%S")
    conn = pymysql.connect(
        host=MYSQL_HOST, port=MYSQL_PORT,
        user=MYSQL_USER, password=MYSQL_PASS,
        database=MYSQL_DB, charset="utf8mb4"
    )
    try:
        with conn.cursor() as cursor:
            # "%%" escapes literal percent signs because query args are bound.
            cursor.execute("""
                SELECT svr_msg_id, sender_name, msg_type, content, media_url, refer_msg_svrid,
                       DATE_FORMAT(msg_time, '%%Y-%%m-%%d %%H:%%i:%%s') as msg_time, msg_timestamp
                FROM wechat_group_message
                WHERE msg_time >= %s AND msg_time <= %s
                ORDER BY msg_time ASC
            """, (lookback_start, now_str))
            raw_rows = cursor.fetchall()
    finally:
        conn.close()

    rows = []
    for svr_id, sname, mtype, content, murl, ref_id, mtime, mts in raw_rows:
        rows.append((
            str(svr_id) if svr_id else "",
            sname or "",
            mtype or "text",
            content or "",
            murl or "",
            str(ref_id) if ref_id else "",
            mtime or "",
            int(mts) if mts else 0,
        ))
    return rows


def _cluster_is_all_internal(cluster_msgs):
    """Return True if no message in the cluster looks like user feedback.

    A message counts as user feedback when its stripped text (field 3) is
    longer than 3 characters and _is_internal_discussion() is False for it.
    """
    for m in cluster_msgs:
        t = str(m[3]).strip() if m[3] else ""
        if t and len(t) > 3 and not _is_internal_discussion(t):
            return False
    return True


def main():
    """Scan recent WeChat messages, detect new P0 clusters and push alerts.

    Flow: load recent messages -> cluster them -> keep P0 clusters -> drop
    clusters already dispatched (exact signature, then content fingerprint) ->
    skip clusters made only of internal discussion -> push (or print, with
    --dry-run) the alert.

    Dedup state is never written in --dry-run mode. Otherwise a preview would
    mark P0s as dispatched and silence them in the next real run.
    """
    parser = argparse.ArgumentParser(description="微信 P0 问题实时检测与分发")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--lookback-minutes", type=int, default=LOOKBACK_MINUTES)
    args = parser.parse_args()

    if should_clear_state():
        print("[P0-wechat] 10:00 清空去重状态")
        if args.dry_run:
            print("[DRY-RUN] 跳过清空去重状态")
        else:
            save_dispatched_state({})
        print("[P0-wechat] 全量分发时段,跳过实时检测")
        return

    print(f"[P0-wechat] 扫描最近 {args.lookback_minutes} 分钟微信消息...")
    rows = _fetch_recent_rows(args.lookback_minutes)
    print(f"[P0-wechat] 查询到 {len(rows)} 条微信消息")
    if len(rows) < 2:
        print("[P0-wechat] 消息不足,退出")
        return

    _sorted_msgs, clusters, cluster_order = sort_threads(rows)
    print(f"[P0-wechat] 聚类完成:{len(clusters)} 个簇")

    state = load_dispatched_state()
    print(f"[P0-wechat] 已记录 {len(state)} 个已推送簇")

    new_p0_count = 0
    try:
        for cid in cluster_order:
            cmsgs = clusters[cid]
            is_p0, info = is_probably_p0(cmsgs)
            if not is_p0:
                continue

            sig = cluster_signature(cmsgs)
            if sig in state:
                print(f"[P0-wechat] 已推送过(精确匹配),跳过: sig={sig[:8]}...")
                continue

            # Content-based dedup for clusters whose message set changed.
            fp = cluster_content_fingerprint(cmsgs)
            if is_duplicate_p0(fp, state):
                print(f"[P0-wechat] 已推送过(内容匹配),跳过: senders={fp['senders'][:2]}... hour={fp['hour']}")
                continue

            print(f"[P0-wechat] 🚨 发现新 P0! sig={sig[:8]}... {len(cmsgs)}条消息")

            if _cluster_is_all_internal(cmsgs):
                print("[P0-wechat] ⏭️ 全部为内部技术讨论,跳过推送")
                state[sig] = {"time": datetime.now().isoformat(), "fp": fp}
                continue

            alert = generate_p0_alert_text(cmsgs, info)
            if args.dry_run:
                print(f"[DRY-RUN] 将发送:\n{alert}")
            elif dispatch_p0_alert(alert):
                print("[P0-wechat] ✅ P0 已实时推送")
            else:
                print("[P0-wechat] ❌ 推送失败")
                continue
            # Recorded in memory for dry runs too, so in-run dedup matches a real run.
            state[sig] = {"time": datetime.now().isoformat(), "fp": fp}
            new_p0_count += 1
    finally:
        # Saved in `finally` so alerts already pushed stay recorded even if a
        # later cluster raises; otherwise the next cron run would push them
        # again.
        # NOTE(review): entries for all-internal clusters are only persisted
        # when at least one P0 was pushed in the same run.
        if new_p0_count > 0 and not args.dry_run:
            save_dispatched_state(state)

    if new_p0_count > 0:
        print(f"[P0-wechat] 共推送 {new_p0_count} 个新 P0")
    print("[P0-wechat] 完成")
# Run the detector only when executed as a script (e.g. from cron), not on import.
if __name__ == "__main__":
    main()