#!/usr/bin/env python3
"""
微信 P0 问题实时检测与分发
功能:
1. 从 MySQL 读取最近一段时间的微信用户火线救火群消息
2. 复用 sync_feishu_feedback.py 的聚类 + 优先级判定逻辑
3. 过滤已推送过的 P0 簇(去重)
4. 仅推送新增 P0 到「小葵小葵」群
设计:
- 每分钟由 crontab 调用一次
- 查询最近 2 小时的消息,确保聚类质量
- 用「簇签名」(sorted message_ids)做去重
- 每天 10:00 清空去重状态(与全量分发错开)
用法:
python3 detect_p0_wechat.py [--dry-run] [--lookback-minutes 120]
"""
import sys, os, re, json, hashlib, argparse, pymysql
from datetime import datetime, timedelta
SKILL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "skills", "feishu-feedback-sync", "scripts")
sys.path.insert(0, SKILL_DIR)
from sync_feishu_feedback import (
sort_threads, get_tenant_token, content_similarity,
DISPATCH_CHAT_ID, DISPATCH_CRED_DIR, P0_NOTIFY_USERS,
MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASS, MYSQL_DB,
)
from priority_classifier import compute_final_priority
# === 微信专用配置 ===
STATE_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "tmp", "p0_dispatched_state_wechat.json")
LOOKBACK_MINUTES = 120
CLUSTER_MIN_SIZE = 2
def load_dispatched_state():
try:
with open(STATE_FILE, "r") as f:
state = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
state = {}
cutoff = (datetime.now() - timedelta(hours=24)).isoformat()
# 兼容新旧格式:新格式 value 是 {"time": ..., "fp": ...},旧格式是纯时间字符串
cleaned = {}
for k, v in state.items():
ts = v if isinstance(v, str) else v.get("time", "")
if ts > cutoff:
cleaned[k] = v
return cleaned
def save_dispatched_state(state):
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
tmp = STATE_FILE + ".tmp"
with open(tmp, "w") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
os.rename(tmp, STATE_FILE)
def cluster_signature(cluster_msgs):
ids = sorted(str(m[0]) for m in cluster_msgs)
return hashlib.md5(",".join(ids).encode()).hexdigest()
def cluster_content_fingerprint(cluster_msgs):
"""生成基于内容语义的簇指纹,用于跨扫描去重(不依赖消息ID集合)"""
# 拼接簇内所有有意义的消息内容(跳过纯图片/文件/表情)
all_contents = []
for m in cluster_msgs:
c = str(m[3]).strip() if m[3] else ""
if c and len(c) > 8:
all_contents.append(c[:300])
# 取前5条聚合,保证核心问题描述稳定
aggregated = " | ".join(all_contents[:5])
# 提取发送人集合(排序保证一致性)
senders = sorted(set(m[1] for m in cluster_msgs if m[1]))
# 提取小时粒度的时间窗口
times = [m[6] for m in cluster_msgs if m[6]]
hour = times[0][:13] if times else "unknown"
return {
"content": aggregated,
"senders": senders,
"hour": hour,
"msg_count": len(cluster_msgs),
}
def is_duplicate_p0(new_fp, dispatched_entries):
"""
基于内容语义判断新 P0 是否与已推送 P0 重复。
dispatched_entries: {sig: {"time": str, "fp": dict}}
"""
for entry in dispatched_entries.values():
old_fp = entry.get("fp")
if not old_fp:
continue
same_hour = new_fp["hour"] == old_fp["hour"]
sender_overlap = len(set(new_fp["senders"]) & set(old_fp["senders"]))
# 条件1: 同一小时 + 发送人有交集 + 内容相似度 > 0.20(聚合内容稳定,宽松阈值足够区分)
if same_hour and sender_overlap >= 1:
sim = content_similarity(new_fp["content"], old_fp["content"])
if sim > 0.20:
return True
# 条件2: 发送人高度重叠 + 内容相似度 > 0.35(跨小时场景)
if sender_overlap >= 2:
sim = content_similarity(new_fp["content"], old_fp["content"])
if sim > 0.35:
return True
return False
def is_probably_p0(cluster_msgs):
if len(cluster_msgs) < CLUSTER_MIN_SIZE:
return False, None
info = compute_final_priority(cluster_msgs)
return info["priority"] == "P0", info
# 内部技术讨论特征词(用于检测团队内部讨论而非用户反馈)
_INTERNAL_DISCUSSION_PATTERNS = [
# 技术术语
r'hotfix', r'\w+_hf\b', r'分支', r'打包', r'构建', r'部署', r'预发布',
r'测试环境', r'灰度', r'发版', r'上线', r'回滚', r'版本号',
r'master\b', r'develop\b', r'release\b',
# 讨论话术(否定/反问/建议)
r'^(?:不是|不对|不,)', r'我觉得', r'应该是', r'你们(?:用|试|改)',
r'想不改除非', r'需要再改回来', r'是一样的', r'除非你们',
r'这周要更新', r'更新之前', r'改回来',
# 技术操作描述
r'hotfix打的包', r'分支的内容', r'测试后面的关卡',
]
_INTERNAL_DISCUSSION_RE = re.compile('|'.join(_INTERNAL_DISCUSSION_PATTERNS), re.IGNORECASE)
def _is_internal_discussion(text):
"""判断消息是否为团队内部技术讨论,而非用户反馈。"""
if not text or len(text) < 5:
return False
# 检查是否匹配内部讨论特征
if _INTERNAL_DISCUSSION_RE.search(text):
return True
# 消息长度超过 80 字且包含多个技术/讨论特征 → 很可能是内部讨论
if len(text) > 80:
tech_count = len(re.findall(r'(?:hotfix|分支|打包|构建|部署|测试|版本|上线|发版|回滚|灰度|预发布)', text, re.IGNORECASE))
discuss_count = len(re.findall(r'(?:不是|不对|我觉得|应该是|你们|我们|改回来|除非|一样的)', text))
if tech_count >= 2 or discuss_count >= 2 or (tech_count + discuss_count) >= 3:
return True
return False
def _extract_problem_from_discussion(text, cluster_msgs=None):
"""从内部技术讨论中提取核心问题描述。
尝试从讨论中识别用户实际遇到的问题,而非团队的技术分析。
无法提取时,归纳讨论主题作为参考。
"""
# 先尝试从簇中找非内部讨论的消息
if cluster_msgs:
for m in cluster_msgs:
t = str(m[3]).strip() if m[3] else ""
if t and len(t) > 3 and not _is_internal_discussion(t):
return _clean_summary(t)
# 从讨论文本中提取问题关键词
problem_indicators = [
(r'(?:用户|玩家|有人).{0,10}(?:反馈|说|遇到|出现|发现).{0,30}(?:闪退|崩溃|卡死|卡顿|白屏|黑屏|无法|不能|打不开|进不去|报错|异常)', ''),
(r'(?:闪退|崩溃|卡死|卡顿|白屏|黑屏|无法登录|进不去|打不开|报错|异常|bug|BUG)', ''),
]
for pat, _ in problem_indicators:
m = re.search(pat, text, re.IGNORECASE)
if m:
extracted = m.group(0)
if len(extracted) > 5:
return f"用户反馈{extracted},需确认影响范围"
# 无法提取具体问题 → 归纳讨论主题
topic_map = [
(r'关卡|解锁|单元|U\d+', '关卡/解锁'),
(r'打包|构建|hotfix|分支|发版|上线|更新', '打包/发版'),
(r'测试|灰度|预发布|交叉测', '测试/灰度'),
(r'配置|后台|服务端|接口', '配置/后台'),
(r'闪退|崩溃|卡死|卡顿|白屏|黑屏', '崩溃/异常'),
(r'加载|转圈|进不去|打不开|无法', '加载/访问'),
]
topics = []
for pat, label in topic_map:
if re.search(pat, text, re.IGNORECASE):
topics.append(label)
topic_str = '/'.join(topics[:3]) if topics else '相关'
return f"团队内部讨论{topic_str}问题,需人工确认具体用户反馈内容"
def _clean_summary(text):
"""清洗摘要文本,提取核心问题描述(处理转发消息、内部讨论等噪音)。"""
# 去掉 [聊天记录] 等转发标记
text = re.sub(r'^\[聊天记录\]\s*', '', text)
# 去掉完整XML块
text = re.sub(r'.*?', '', text, flags=re.DOTALL)
# 去掉所有XML标签(含不完整的)
text = re.sub(r'?[a-zA-Z][a-zA-Z0-9]*(?:\s[^>]*)?>?', '', text)
# 去掉残留的XML属性片段
text = re.sub(r'\b[a-zA-Z]+="[^"]*"', '', text)
# 截断 "↳ 回复 xxx:" 及之后的所有内容(内部讨论引用,非用户原始反馈)
text = re.split(r'↳\s*回复', text)[0].strip()
# 去掉媒体标记
text = re.sub(r'\[视频\]|\[图片\]|\[语音\]|\[文件\]|\[表情\]', '', text)
# 拆分发送人标记(emoji/符号 + 名字 + emoji/符号 + :),取最后一条用户消息
sender_pat = r'(?:[^\u4e00-\u9fff\w\d\s]+\s*)+[\u4e00-\u9fff\s]{1,20}\s*(?:[^\u4e00-\u9fff\w\d\s]+\s*)*:\s*'
parts = re.split(sender_pat, text)
meaningful = [p.strip() for p in parts if p.strip() and len(p.strip()) > 3]
if meaningful:
text = meaningful[-1]
# 去掉手机号/用户ID
text = re.sub(r'(?:^|(?<=[^\d]))1[3-9]\d{9}(?=[^\d]|$)', '', text)
# 去掉话术后缀
text = re.sub(r'[,,]?\s*(老师|辛苦|麻烦|帮忙)\s*(看下|看一下|看看|看)[。!!]*$', '', text)
text = re.sub(r'[,,]?\s*@\S+\s*', '', text)
# 去掉残留的数字+XML碎片
text = re.sub(r'\d+[a-zA-Z<>/]+$', '', text)
# 清理多余空格和标点
text = re.sub(r'\s+', ' ', text).strip()
text = re.sub(r'^[,,\s]+|[,,\s]+$', '', text)
# === 内部讨论话术改写:疑问句 → 陈述句 ===
# 去掉"这个反馈可以跟用户确认下..."等讨论话术前缀
text = re.sub(r'^(?:这个|该)?反馈可以(?:跟|和)用户确认下?', '', text)
text = re.sub(r'^(?:这个|该)?问题可以(?:跟|和)用户确认下?', '', text)
text = re.sub(r'^(?:这个|该)?(?:反馈|问题)?(?:可以)?(?:跟|和)用户确认下?', '', text)
text = re.sub(r'是在进行什么操作时', '', text)
text = re.sub(r'具体是什么操作时', '', text)
text = re.sub(r'是在什么情况下', '', text)
# 去掉句末疑问词
text = re.sub(r'[??!!。.]+$', '', text)
text = re.sub(r'[吗呢吧啊呀]$', '', text)
# 碎片化症状词 → 补全为陈述句
symptom_map = {
r'^闪退的?$': '用户反馈闪退,需确认操作场景',
r'^崩溃的?$': '用户反馈崩溃,需确认操作场景',
r'^卡退的?$': '用户反馈卡退,需确认操作场景',
r'^卡死的?$': '用户反馈卡死,需确认操作场景',
r'^打不开的?$': '用户反馈打不开,需确认操作场景',
}
for pat, repl in symptom_map.items():
if re.match(pat, text):
text = repl
break
# 再次清理
text = re.sub(r'\s+', ' ', text).strip()
text = re.sub(r'^[,,\s]+|[,,\s]+$', '', text)
return text
def _pick_best_summary(cluster_msgs):
"""从簇中选出最能代表 P0 问题的摘要消息。
优先选择匹配 P0 关键词且非转发/非内部讨论的消息。
内部技术讨论消息会被跳过,优先使用用户原始反馈。"""
from priority_classifier import P0_KEYWORDS
# 收集所有 P0 关键词正则
p0_patterns = []
for cat_pats in P0_KEYWORDS.values():
p0_patterns.append(cat_pats)
combined_p0 = re.compile('|'.join(p0_patterns), re.IGNORECASE)
# 判断是否为转发消息(包含 [聊天记录]、↳ 回复、XML 标记)
def _is_forward(t):
return bool(re.search(r'^\[聊天记录\]|↳\s*回复||= %s AND msg_time <= %s
ORDER BY msg_time ASC
""", (lookback_start, now_str))
raw_rows = cursor.fetchall()
conn.close()
# 映射为统一元组格式 (message_id, sender_name, msg_type, content, media_url, quote_message_id, msg_time, msg_timestamp)
rows = []
for row in raw_rows:
svr_id, sname, mtype, content, murl, ref_id, mtime, mts = row
rows.append((
str(svr_id) if svr_id else "",
sname or "",
mtype or "text",
content or "",
murl or "",
str(ref_id) if ref_id else "",
mtime or "",
int(mts) if mts else 0,
))
print(f"[P0-wechat] 查询到 {len(rows)} 条微信消息")
if len(rows) < 2:
print("[P0-wechat] 消息不足,退出")
return
sorted_msgs, clusters, cluster_order = sort_threads(rows)
print(f"[P0-wechat] 聚类完成:{len(clusters)} 个簇")
state = load_dispatched_state()
print(f"[P0-wechat] 已记录 {len(state)} 个已推送簇")
new_p0_count = 0
for cid in cluster_order:
cmsgs = clusters[cid]
is_p0, info = is_probably_p0(cmsgs)
if not is_p0:
continue
sig = cluster_signature(cmsgs)
if sig in state:
print(f"[P0-wechat] 已推送过(精确匹配),跳过: sig={sig[:8]}...")
continue
# 内容语义去重
fp = cluster_content_fingerprint(cmsgs)
if is_duplicate_p0(fp, state):
print(f"[P0-wechat] 已推送过(内容匹配),跳过: senders={fp['senders'][:2]}... hour={fp['hour']}")
continue
print(f"[P0-wechat] 🚨 发现新 P0! sig={sig[:8]}... {len(cmsgs)}条消息")
# 如果簇中所有消息都是内部技术讨论,跳过不推送
all_internal = True
for m in cmsgs:
t = str(m[3]).strip() if m[3] else ""
if t and len(t) > 3 and not _is_internal_discussion(t):
all_internal = False
break
if all_internal:
print(f"[P0-wechat] ⏭️ 全部为内部技术讨论,跳过推送")
state[sig] = {"time": datetime.now().isoformat(), "fp": fp}
continue
if args.dry_run:
alert = generate_p0_alert_text(cmsgs, info)
print(f"[DRY-RUN] 将发送:\n{alert}")
state[sig] = {"time": datetime.now().isoformat(), "fp": fp}
new_p0_count += 1
else:
alert = generate_p0_alert_text(cmsgs, info)
if dispatch_p0_alert(alert):
print(f"[P0-wechat] ✅ P0 已实时推送")
state[sig] = {"time": datetime.now().isoformat(), "fp": fp}
new_p0_count += 1
else:
print(f"[P0-wechat] ❌ 推送失败")
if new_p0_count > 0:
save_dispatched_state(state)
print(f"[P0-wechat] 共推送 {new_p0_count} 个新 P0")
print("[P0-wechat] 完成")
if __name__ == "__main__":
main()