ai_member_xiaokui/scripts/detect_p0_wechat.py
2026-05-23 08:10:01 +08:00

242 lines
7.8 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
微信 P0 问题实时检测与分发
功能:
1. 从 MySQL 读取最近一段时间的微信用户火线救火群消息
2. 复用 sync_feishu_feedback.py 的聚类 + 优先级判定逻辑
3. 过滤已推送过的 P0 簇(去重)
4. 仅推送新增 P0 到「小葵小葵」群
设计:
- 每分钟由 crontab 调用一次
- 查询最近 2 小时的消息,确保聚类质量
- 用「簇签名」(排序后的 message_ids 的 MD5)做去重
- 每天 10:00 清空去重状态(与全量分发错开)
用法:
python3 detect_p0_wechat.py [--dry-run] [--lookback-minutes 120]
"""
import sys, os, json, hashlib, argparse, pymysql
from datetime import datetime, timedelta
SKILL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "skills", "feishu-feedback-sync", "scripts")
sys.path.insert(0, SKILL_DIR)
from sync_feishu_feedback import (
sort_threads, get_tenant_token,
DISPATCH_CHAT_ID, DISPATCH_CRED_DIR, P0_NOTIFY_USERS,
MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASS, MYSQL_DB,
)
from priority_classifier import compute_final_priority
# === WeChat-specific configuration ===
# Directory containing this script; the state file path is built relative to it.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# JSON file recording which P0 cluster signatures have already been pushed.
STATE_FILE = os.path.join(_SCRIPT_DIR, "..", "tmp", "p0_dispatched_state_wechat.json")
# Default look-back window, in minutes, for the --lookback-minutes option.
LOOKBACK_MINUTES = 120
# Clusters with fewer messages than this are never classified as P0.
CLUSTER_MIN_SIZE = 2
def load_dispatched_state():
    """Load the de-duplication state and drop entries older than 24 hours.

    ``STATE_FILE`` is expected to hold a JSON object that maps a cluster
    signature to the ISO-8601 timestamp string recorded when its alert was
    pushed.

    Returns:
        dict: signature -> timestamp string, limited to entries whose
        timestamp is newer than 24 hours ago. An empty dict is returned
        when the file is missing, unreadable as UTF-8, not valid JSON, or
        does not contain a JSON object.
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
        state = {}
    # A syntactically valid but wrongly shaped file (e.g. a JSON list) must
    # not crash the run; treat it like an empty state.
    if not isinstance(state, dict):
        return {}
    cutoff = (datetime.now() - timedelta(hours=24)).isoformat()
    # Timestamps are compared as ISO-8601 strings, so anything that is not
    # a string is dropped instead of raising TypeError on the comparison.
    return {
        sig: stamp
        for sig, stamp in state.items()
        if isinstance(stamp, str) and stamp > cutoff
    }
def save_dispatched_state(state):
    """Persist the de-duplication state to ``STATE_FILE``.

    The JSON is written to a sibling ``.tmp`` file first and then moved
    over the real file, so a reader never observes a half-written state.

    Args:
        state: JSON-serialisable dict (signature -> timestamp string).
    """
    parent = os.path.dirname(STATE_FILE)
    if parent:
        os.makedirs(parent, exist_ok=True)
    tmp = STATE_FILE + ".tmp"
    # ensure_ascii=False may emit non-ASCII characters, so the encoding is
    # pinned instead of depending on the process locale.
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)
    # os.replace overwrites an existing destination on every platform;
    # os.rename raises on Windows when the target already exists.
    os.replace(tmp, STATE_FILE)
def cluster_signature(cluster_msgs):
    """Return a fingerprint of a cluster derived from its message ids.

    The first element of every message is stringified, the strings are
    sorted, joined with commas and hashed with MD5. The result therefore
    does not depend on message order, but it changes whenever a message
    is added to or removed from the cluster.

    Args:
        cluster_msgs: iterable of message tuples whose element 0 is the id.

    Returns:
        str: hexadecimal MD5 digest.
    """
    message_ids = [str(msg[0]) for msg in cluster_msgs]
    message_ids.sort()
    joined = ",".join(message_ids)
    digest = hashlib.md5(joined.encode())
    return digest.hexdigest()
def is_probably_p0(cluster_msgs):
    """Decide whether a cluster should be treated as a P0 incident.

    Clusters with fewer than ``CLUSTER_MIN_SIZE`` messages are rejected
    without consulting the classifier. Otherwise the verdict comes from
    ``compute_final_priority``.

    Args:
        cluster_msgs: list of message tuples belonging to one cluster.

    Returns:
        tuple: ``(False, None)`` for an undersized cluster, otherwise
        ``(priority == "P0", info)`` where ``info`` is the classifier's
        result.
    """
    too_small = len(cluster_msgs) < CLUSTER_MIN_SIZE
    if too_small:
        return False, None
    priority_info = compute_final_priority(cluster_msgs)
    verdict = priority_info["priority"] == "P0"
    return verdict, priority_info
def generate_p0_alert_text(cluster_msgs, priority_info):
    """Build the multi-line alert text for one P0 cluster.

    Message tuples are read by position: index 1 is the sender name,
    index 3 the text content and index 6 the formatted message time.

    Args:
        cluster_msgs: non-empty list of message tuples; the first message
            supplies the reporter and the time shown in the alert.
        priority_info: dict from the classifier; the optional keys
            ``reasoning`` and ``deadline`` are rendered, with defaults.

    Returns:
        str: alert lines joined with newlines.

    Raises:
        IndexError: if ``cluster_msgs`` is empty.
    """
    first = cluster_msgs[0]
    root_sender = first[1]
    root_time = first[6]

    # Summary: the first message whose stripped text is longer than three
    # characters, truncated to 100 characters.
    root_text = ""
    for m in cluster_msgs:
        text = str(m[3]).strip() if m[3] else ""
        if len(text) > 3:
            root_text = text[:100]
            break

    # Distinct senders in first-seen order; unnamed senders are skipped so
    # they do not leave empty slots in the list.
    senders = list(dict.fromkeys(m[1] for m in cluster_msgs if m[1]))
    # Fix: names used to be concatenated without any separator, and the
    # "more than five" suffix was an empty string in both branches.
    involved = "、".join(str(name) for name in senders[:5])
    if len(senders) > 5:
        involved += f" 等{len(senders)}人"

    return "\n".join([
        "🚨 微信 P0 实时告警",
        "",
        f"**报告人:** {root_sender}",
        f"**时间:** {root_time}",
        f"**涉及人员:** {involved}",
        f"**消息数:** {len(cluster_msgs)}",
        "",
        f"**摘要:** {root_text}",
        "",
        f"**判定依据:** {priority_info.get('reasoning', 'P0')}",
        f"**修复时限:** {priority_info.get('deadline', '2小时内')}",
    ])
def dispatch_p0_alert(alert_text):
    """Send an alert as a Feishu rich-text ("post") message.

    The message goes to the chat identified by ``DISPATCH_CHAT_ID``. Every
    non-blank line of ``alert_text`` becomes one paragraph; when
    ``P0_NOTIFY_USERS`` is non-empty, a final paragraph @-mentions each
    listed user.

    Args:
        alert_text: newline-separated alert text.

    Returns:
        bool: True when the API answers with ``code == 0``. False on a
        non-zero code, an HTTP or network error, a timeout, or a response
        body that is not valid JSON.
    """
    import urllib.error
    import urllib.request

    token = get_tenant_token(cred_dir=DISPATCH_CRED_DIR)

    # One paragraph per non-blank line of the alert.
    content_parts = [
        [{"tag": "text", "text": line + "\n"}]
        for line in alert_text.split("\n")
        if line.strip()
    ]
    if P0_NOTIFY_USERS:
        at_line = [{"tag": "text", "text": "\n⚠️ 请关注: "}]
        for uid in P0_NOTIFY_USERS:
            at_line.append({"tag": "at", "user_id": uid})
            at_line.append({"tag": "text", "text": " "})
        content_parts.append(at_line)

    # The API expects "content" to be a JSON *string*, hence the nested dumps.
    post_content = json.dumps({
        "zh_cn": {
            "title": "🚨 微信 P0 问题实时告警",
            "content": content_parts
        }
    }, ensure_ascii=False)
    body = json.dumps({
        "receive_id": DISPATCH_CHAT_ID,
        "msg_type": "post",
        "content": post_content
    }, ensure_ascii=False).encode()
    req = urllib.request.Request(
        "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id",
        data=body,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="POST"
    )

    try:
        # The context manager closes the HTTP response even if reading fails.
        with urllib.request.urlopen(req, timeout=10) as resp:
            d = json.loads(resp.read())
    except (urllib.error.URLError, OSError, ValueError) as exc:
        # HTTPError/URLError and socket timeouts are OSError subclasses and
        # JSONDecodeError is a ValueError. Report them as a failed dispatch
        # so the caller's True/False handling stays in control.
        print(f" ⚠️ 实时分发失败: {str(exc)[:100]}")
        return False

    if isinstance(d, dict) and d.get("code") == 0:
        return True
    # "msg" may be absent or null; coerce it before slicing.
    msg = d.get("msg") if isinstance(d, dict) else d
    print(f" ⚠️ 实时分发失败: {str(msg or '')[:100]}")
    return False
def should_clear_state(now=None):
    """Tell whether the de-duplication state should be cleared right now.

    The window is 10:00:00 through 10:01:59 local time. It spans two
    minutes, so a once-per-minute invocation can see it more than once.

    Args:
        now: optional ``datetime`` to evaluate instead of the current local
            time; lets callers and tests check the window deterministically.

    Returns:
        bool: True inside the window, False otherwise.
    """
    if now is None:
        now = datetime.now()
    return now.hour == 10 and now.minute <= 1
def _fetch_wechat_rows(start_str, end_str):
    """Fetch messages with start_str <= msg_time <= end_str as unified tuples."""
    conn = pymysql.connect(
        host=MYSQL_HOST, port=MYSQL_PORT,
        user=MYSQL_USER, password=MYSQL_PASS,
        database=MYSQL_DB, charset="utf8mb4"
    )
    try:
        cursor = conn.cursor()
        # '%%' is a literal '%' because the query carries bound parameters.
        cursor.execute("""
            SELECT svr_msg_id, sender_name, msg_type, content, media_url, refer_msg_svrid,
            DATE_FORMAT(msg_time, '%%Y-%%m-%%d %%H:%%i:%%s') as msg_time, msg_timestamp
            FROM wechat_group_message
            WHERE msg_time >= %s AND msg_time <= %s
            ORDER BY msg_time ASC
        """, (start_str, end_str))
        raw_rows = cursor.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    # Unified tuple layout: (message_id, sender_name, msg_type, content,
    # media_url, quote_message_id, msg_time, msg_timestamp)
    rows = []
    for svr_id, sname, mtype, content, murl, ref_id, mtime, mts in raw_rows:
        rows.append((
            str(svr_id) if svr_id else "",
            sname or "",
            mtype or "text",
            content or "",
            murl or "",
            str(ref_id) if ref_id else "",
            mtime or "",
            int(mts) if mts else 0,
        ))
    return rows


def main():
    """Detect new P0 clusters in recent WeChat messages and push alerts.

    Steps: parse the CLI options, skip the run (clearing the state) while
    ``should_clear_state()`` is true, query the look-back window, cluster
    the messages with ``sort_threads``, then alert on every P0 cluster
    whose signature has not been recorded yet.

    With ``--dry-run`` the alerts are only printed and the de-duplication
    state is never written. Persisting it would make the next real run
    treat those clusters as already pushed and stay silent.
    """
    parser = argparse.ArgumentParser(description="微信 P0 问题实时检测与分发")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--lookback-minutes", type=int, default=LOOKBACK_MINUTES)
    args = parser.parse_args()

    if should_clear_state():
        print("[P0-wechat] 10:00 清空去重状态")
        if not args.dry_run:
            save_dispatched_state({})
        print("[P0-wechat] 全量分发时段,跳过实时检测")
        return

    print(f"[P0-wechat] 扫描最近 {args.lookback_minutes} 分钟微信消息...")
    # Take one timestamp so both ends of the window come from the same instant.
    now = datetime.now()
    lookback_start = (now - timedelta(minutes=args.lookback_minutes)).strftime("%Y-%m-%d %H:%M:%S")
    now_str = now.strftime("%Y-%m-%d %H:%M:%S")

    rows = _fetch_wechat_rows(lookback_start, now_str)
    print(f"[P0-wechat] 查询到 {len(rows)} 条微信消息")
    if len(rows) < 2:
        print("[P0-wechat] 消息不足,退出")
        return

    sorted_msgs, clusters, cluster_order = sort_threads(rows)
    print(f"[P0-wechat] 聚类完成:{len(clusters)} 个簇")
    state = load_dispatched_state()
    print(f"[P0-wechat] 已记录 {len(state)} 个已推送簇签名")

    new_p0_count = 0
    for cid in cluster_order:
        cmsgs = clusters[cid]
        is_p0, info = is_probably_p0(cmsgs)
        if not is_p0:
            continue
        sig = cluster_signature(cmsgs)
        if sig in state:
            print(f"[P0-wechat] 已推送过,跳过: sig={sig[:8]}...")
            continue
        print(f"[P0-wechat] 🚨 发现新 P0! sig={sig[:8]}... {len(cmsgs)}条消息")
        alert = generate_p0_alert_text(cmsgs, info)
        if args.dry_run:
            print(f"[DRY-RUN] 将发送:\n{alert}")
            new_p0_count += 1
            continue
        if dispatch_p0_alert(alert):
            print("[P0-wechat] ✅ P0 已实时推送")
            state[sig] = datetime.now().isoformat()
            # Save right away: if a later cluster crashes the run, alerts
            # already sent must not be pushed again a minute later.
            save_dispatched_state(state)
            new_p0_count += 1
        else:
            print("[P0-wechat] ❌ 推送失败")

    if new_p0_count > 0:
        print(f"[P0-wechat] 共推送 {new_p0_count} 个新 P0")
    print("[P0-wechat] 完成")
if __name__ == "__main__":
main()