ai_member_xiaoai/scripts/export_review_audio.py

394 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
导出指定角色的课程巩固数据 + 原始音频。
用法: python3 export_review_audio.py <角色ID1> [角色ID2] ...
python3 export_review_audio.py 23600 23686
"""
import re, json, sys, os, subprocess
from datetime import datetime
# ── 加载 .env ───────────────────────────────────────
def load_env():
    """Read ``~/.hermes/.env`` and return a lookup function over its contents.

    Returns:
        A callable ``g(key)`` that returns the whitespace-stripped value
        assigned to ``key`` in the file, or ``None`` when no line assigns it.

    Raises:
        OSError: if the .env file cannot be opened.
    """
    env_path = os.path.expanduser("~/.hermes/.env")
    with open(env_path, encoding="utf-8") as f:
        content = f.read()

    def g(k):
        """Return the value assigned to variable ``k``, or ``None`` if absent."""
        # Anchor the key at the start of a line (optionally after ``export``)
        # so that commented-out entries and longer names ending in ``k``
        # (e.g. OLD_<k>) are not picked up, and escape the key so characters
        # such as "." are matched literally.
        pattern = rf"^[ \t]*(?:export[ \t]+)?{re.escape(k)}[ \t]*=(.+)$"
        m = re.search(pattern, content, flags=re.MULTILINE)
        return m.group(1).strip() if m else None

    return g
g = load_env()

# ── Arguments ───────────────────────────────────────
if len(sys.argv) < 2:
    print("用法: python3 export_review_audio.py <角色ID1> [角色ID2] ...")
    sys.exit(1)

try:
    # dict.fromkeys drops repeated IDs while keeping the order given on the
    # command line, so a duplicated ID does not yield duplicate summary rows.
    user_ids = list(dict.fromkeys(int(x) for x in sys.argv[1:]))
except ValueError as exc:
    # Report a non-numeric ID as a usage error instead of a raw traceback.
    print(f"角色ID 必须是整数: {exc}")
    print("用法: python3 export_review_audio.py <角色ID1> [角色ID2] ...")
    sys.exit(1)

# The workbook is written under ~/.hermes/workspace/output; its name carries
# the requested IDs and a timestamp.
output_dir = os.path.expanduser("~/.hermes/workspace/output")
os.makedirs(output_dir, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
uid_str = "_".join(str(u) for u in user_ids)
output_path = f"{output_dir}/知识巩固_音频_{uid_str}_{ts}.xlsx"
print(f"导出角色: {user_ids}")
print(f"输出文件: {output_path}")
# ── 1. Query PostgreSQL: course-review records ──────
print("\n[1/3] 查询 PostgreSQL 课程巩固记录...")
import psycopg2
from psycopg2.extras import RealDictCursor


def _expand_review_row(row):
    """Explode one review record into one export row per question.

    ``row["question_list"]`` may be a JSON string or an already-decoded value.
    Anything that is not (or does not decode to) a list yields no rows.
    As a side effect ``row["question_count"]`` is set to the list length
    (0 when there is no list).
    """
    ql = row["question_list"]
    if isinstance(ql, str):
        try:
            ql = json.loads(ql)
        except ValueError:
            # Undecodable JSON: leave ql as a string so it is skipped below.
            pass

    questions = ql if isinstance(ql, list) else []
    row["question_count"] = len(questions)

    base = {
        "角色ID": row["user_id"],
        "Level": row["level"],
        "Story ID": row["story_id"],
        "Chapter ID": row["chapter_id"],
        "Unique ID": row["unique_id"],
        "更新时间": str(row["updated_at"]),
    }

    out = []
    for item in questions:
        if not isinstance(item, dict):
            continue
        # "question" may be missing, null, or not an object; treat all of
        # those as an empty question instead of failing on .get().
        q = item.get("question")
        if not isinstance(q, dict):
            q = {}
        # Only an explicit True counts as correct.
        status = "正确" if item.get("isRight") is True else "错误"
        user_answer = item.get("userAnswer", "") or item.get("userAnswers", "")
        if isinstance(user_answer, list):
            user_answer = ", ".join(str(a) for a in user_answer)
        out.append({
            **base,
            "题目类型": q.get("type", ""),
            "题目内容": q.get("title", ""),
            "题目对错": status,
            "音频URL": item.get("userAudio", ""),
            "用户答案": user_answer,
        })
    return out


pg_conn = psycopg2.connect(
    host=g("VALA_PG_ONLINE_HOST"), port=int(g("VALA_PG_ONLINE_PORT")),
    user=g("VALA_PG_ONLINE_USER"), password=g("VALA_PG_ONLINE_PASSWORD"),
    dbname=g("VALA_PG_ONLINE_DB"), connect_timeout=10,
)
try:
    with pg_conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("""
            SELECT user_id, story_id, chapter_id, unique_id,
                   score, score_text, sp_value, exp, level,
                   question_list, play_time, created_at, updated_at
            FROM user_unit_review_question_result
            WHERE user_id = ANY(%s) AND deleted_at IS NULL
            ORDER BY user_id, updated_at DESC
        """, (user_ids,))
        review_rows = cur.fetchall()
finally:
    # Close the connection even when the query fails; all rows are already
    # fetched into memory at this point.
    pg_conn.close()

# One output row per question.
expanded_rows = []
for row in review_rows:
    expanded_rows.extend(_expand_review_row(row))

print(f" → 查询到 {len(review_rows)} 条课程巩固记录")
# ── 2. Query Elasticsearch: audio records ───────────
print("\n[2/3] 查询 Elasticsearch 音频数据...")
es_url = f"{g('VALA_ES_ONLINE_SCHEME')}://{g('VALA_ES_ONLINE_HOST')}:{g('VALA_ES_ONLINE_PORT')}"
auth = f"{g('VALA_ES_ONLINE_USER')}:{g('VALA_ES_ONLINE_PASSWORD')}"


def _es_request(method, path, body):
    """Send one JSON request to Elasticsearch via curl and return the reply.

    Args:
        method: HTTP method passed to ``curl -X``.
        path: request path (with query string) appended to ``es_url``.
        body: JSON-serialisable request body.

    Returns:
        The decoded JSON object.

    Raises:
        RuntimeError: if curl exits non-zero, the output is not a JSON
            object, or the reply contains an ``error`` key.
        subprocess.TimeoutExpired: if curl exceeds the 35 s process timeout.
    """
    r = subprocess.run([
        "curl", "-sk", "-u", auth,
        "-H", "Content-Type: application/json",
        "--connect-timeout", "10", "--max-time", "30",
        "-X", method, "-d", json.dumps(body),
        f"{es_url}{path}",
    ], capture_output=True, text=True, timeout=35)
    if r.returncode != 0:
        raise RuntimeError(
            f"curl exited with code {r.returncode} for {path}: {r.stderr.strip()}"
        )
    try:
        resp = json.loads(r.stdout)
    except ValueError as exc:
        raise RuntimeError(
            f"Elasticsearch returned non-JSON output for {path}: {r.stdout[:200]!r}"
        ) from exc
    if not isinstance(resp, dict) or "error" in resp:
        raise RuntimeError(f"Elasticsearch request failed for {path}: {r.stdout[:200]}")
    return resp


audio_rows = []
scroll_id = None
page_size = 500

query = {
    "query": {"terms": {"userId": user_ids}},
    "sort": [{"timeInt": {"order": "desc"}}],
    "size": page_size,
}

try:
    # First page: opens the scroll context.
    resp = _es_request("POST", "/user-audio/_search?scroll=2m", query)
    scroll_id = resp.get("_scroll_id")

    # hits.total is either an object with a "value" field or a plain integer,
    # depending on the server; accept both.
    total = resp.get("hits", {}).get("total", 0)
    if isinstance(total, dict):
        total = total.get("value", 0)
    print(f" → ES 总计 {total} 条音频记录,分批读取...")

    hits = resp.get("hits", {}).get("hits", [])
    audio_rows.extend(h["_source"] for h in hits)

    # Remaining pages.
    batch = 1
    while scroll_id and len(audio_rows) < total:
        resp = _es_request("POST", "/_search/scroll",
                           {"scroll": "2m", "scroll_id": scroll_id})
        scroll_id = resp.get("_scroll_id") or scroll_id
        hits = resp.get("hits", {}).get("hits", [])
        if not hits:
            break
        audio_rows.extend(h["_source"] for h in hits)
        batch += 1
        print(f" → 批次 {batch}: 已读 {len(audio_rows)}/{total}")
finally:
    # Release the scroll context even when reading failed part-way. This is
    # best effort: a cleanup failure must not discard data already read.
    if scroll_id:
        try:
            subprocess.run([
                "curl", "-sk", "-u", auth, "--connect-timeout", "5",
                # The JSON body needs an explicit content type; curl's default
                # for -d is application/x-www-form-urlencoded.
                "-H", "Content-Type: application/json",
                "-X", "DELETE", "-d", json.dumps({"scroll_id": scroll_id}),
                f"{es_url}/_search/scroll",
            ], capture_output=True, timeout=10)
        except (subprocess.SubprocessError, OSError):
            pass

print(f" → 共读取 {len(audio_rows)} 条音频记录")
# ── 3. Export to Excel ──────────────────────────────
print("\n[3/3] 生成 Excel...")
from collections import Counter

from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill

# Column order of each data sheet. Declaring it explicitly means the header
# row is written even when a query returned no rows.
REVIEW_COLUMNS = [
    "角色ID", "Level", "Story ID", "Chapter ID", "Unique ID",
    "题目类型", "题目内容", "题目对错", "音频URL", "用户答案", "更新时间",
]
AUDIO_COLUMNS = [
    "角色ID", "角色名", "Session ID", "组件ID", "组件类型",
    "音频URL", "LLM音频URL", "ASR状态", "发音评分(SOE)", "第几轮",
    "Makee ID", "时间", "时间戳", "数据版本",
]

header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")


def _cell_value(value):
    """Return ``value`` in a form that can be stored in a worksheet cell.

    Containers (dict/list/tuple) are serialised to a JSON string; every other
    value is passed through unchanged.
    """
    if isinstance(value, (dict, list, tuple)):
        return json.dumps(value, ensure_ascii=False, default=str)
    return value


def _write_table(ws, columns, records):
    """Write a styled header row, then one row per record dict, to ``ws``.

    Values are written as-is (no DataFrame round trip), so integers stay
    integers and missing values stay empty cells.
    """
    ws.append(columns)
    for rec in records:
        ws.append([_cell_value(rec.get(col)) for col in columns])
    for cell in ws[1]:
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = Alignment(horizontal="center")


def _extract_makee_id(user_msg):
    """Return the ``makee_id`` field of a JSON ``userMsg`` string, or ""."""
    if not (isinstance(user_msg, str) and "makee_id" in user_msg):
        return ""
    try:
        decoded = json.loads(user_msg)
    except ValueError:
        return ""
    if not isinstance(decoded, dict):
        return ""
    return decoded.get("makee_id", "")


wb = Workbook()

# Sheet 1: course-review records, one row per question.
ws1 = wb.active
ws1.title = "课程巩固记录"
_write_table(ws1, REVIEW_COLUMNS, expanded_rows)
review_widths = {
    "A": 10, "B": 8, "C": 12, "D": 12, "E": 36, "F": 24,
    "G": 30, "H": 10, "I": 55, "J": 30, "K": 22,
}
for letter, width in review_widths.items():
    ws1.column_dimensions[letter].width = width

# Sheet 2: audio records.
ws2 = wb.create_sheet("音频数据")
audio_data = [
    {
        "角色ID": a.get("userId"),
        "角色名": a.get("userName"),
        "Session ID": a.get("sessionId"),
        "组件ID": a.get("componentId"),
        "组件类型": a.get("componentType"),
        "音频URL": a.get("audioUrl"),
        "LLM音频URL": a.get("llmAudioUrl"),
        "ASR状态": a.get("asrStatus"),
        "发音评分(SOE)": json.dumps(a.get("soeData")) if a.get("soeData") else "",
        "第几轮": a.get("roundNum"),
        "Makee ID": _extract_makee_id(a.get("userMsg", "")),
        "时间": a.get("timeStr"),
        "时间戳": a.get("timeInt"),
        "数据版本": a.get("dataVersion"),
    }
    for a in audio_rows
]
_write_table(ws2, AUDIO_COLUMNS, audio_data)
for letter, width in {"G": 50, "H": 50, "I": 15, "K": 40, "M": 22}.items():
    ws2.column_dimensions[letter].width = width

# Sheet 3: summary.
ws3 = wb.create_sheet("汇总")
ws3["A1"] = "导出信息"
ws3["A1"].font = Font(bold=True, size=14)
ws3["A3"] = "导出时间"
ws3["B3"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ws3["A4"] = "角色ID"
ws3["B4"] = ", ".join(str(u) for u in user_ids)
ws3["A5"] = "课程巩固记录数(题目拆分行)"
ws3["B5"] = len(expanded_rows)
ws3["A6"] = "原始巩固回合数"
ws3["B6"] = len(review_rows)
ws3["A7"] = "音频记录数"
ws3["B7"] = len(audio_rows)

# Per-user breakdown. IDs are compared as strings so that a source returning
# the ID as text still matches the integer given on the command line.
review_counts = Counter(str(r["user_id"]) for r in review_rows)
audio_counts = Counter(str(a.get("userId")) for a in audio_rows)
latest_by_user = {}
for r in review_rows:
    key = str(r["user_id"])
    stamp = str(r["updated_at"])
    if stamp > latest_by_user.get(key, ""):
        latest_by_user[key] = stamp

row_offset = 10
ws3[f"A{row_offset}"] = "按角色统计"
ws3[f"A{row_offset}"].font = Font(bold=True)
row_offset += 1
ws3[f"A{row_offset}"] = "角色ID"
ws3[f"B{row_offset}"] = "巩固记录"
ws3[f"C{row_offset}"] = "音频记录"
ws3[f"D{row_offset}"] = "最新巩固时间"
for cell in ws3[row_offset]:
    cell.fill = header_fill
    cell.font = Font(bold=True, color="FFFFFF")
row_offset += 1
for uid in user_ids:
    key = str(uid)
    ws3[f"A{row_offset}"] = uid
    ws3[f"B{row_offset}"] = review_counts.get(key, 0)
    ws3[f"C{row_offset}"] = audio_counts.get(key, 0)
    ws3[f"D{row_offset}"] = latest_by_user.get(key, "")
    row_offset += 1
for letter, width in {"A": 18, "B": 22, "C": 18, "D": 28}.items():
    ws3.column_dimensions[letter].width = width

wb.save(output_path)
print(f"\n✅ 导出完成: {output_path}")
print(f" Sheet 1 — 课程巩固记录(每题一行): {len(expanded_rows)}")
print(f" Sheet 2 — 音频数据: {len(audio_rows)}")
print(f" Sheet 3 — 汇总")
# ── 4. Send the file automatically through Feishu ───
# The recipient defaults to user_id 4aagb443 and can be overridden with the
# FEISHU_SEND_TO environment variable. Sending is skipped entirely unless
# FEISHU_SEND_ENABLED is "1" (the default).
SEND_TO = os.getenv("FEISHU_SEND_TO", "4aagb443")
SEND_ENABLED = os.getenv("FEISHU_SEND_ENABLED", "1")

if not SEND_ENABLED == "1":
    print("\n⏭️ 飞书自动发送已禁用 (FEISHU_SEND_ENABLED != 1)")
    sys.exit(0)

print(f"\n[4/4] 通过飞书发送文件到 user_id={SEND_TO}...")

# App credentials come from the .env lookup; without both of them the send
# step is skipped and the script still exits successfully.
FEISHU_APP_ID = g("FEISHU_APP_ID")
FEISHU_APP_SECRET = g("FEISHU_APP_SECRET")
if not (FEISHU_APP_ID and FEISHU_APP_SECRET):
    print(" ⚠️ 未找到 FEISHU_APP_ID/FEISHU_APP_SECRET跳过发送")
    sys.exit(0)
def feishu_api(method, path, **kwargs):
    """Call the Feishu Open API through curl and return the decoded JSON reply.

    Args:
        method: HTTP method name such as "POST" or "GET".
        path: API path (starting with "/") appended to https://open.feishu.cn.
        **kwargs: optional request parts:
            token: bearer token; the Authorization header is sent only when
                it is non-empty.
            json_data: object sent as a JSON request body.
            form_fields: mapping sent as multipart form fields
                (``curl -F key=value``; a value starting with "@" uploads
                the named file).

    Returns:
        The decoded JSON object. When curl cannot be run, times out, or the
        output is not a JSON object, ``{"code": -1, "msg": <detail>}`` is
        returned instead of raising.
    """
    token = kwargs.get("token")
    json_data = kwargs.get("json_data")
    form_fields = kwargs.get("form_fields")

    # NOTE(review): "-k" disables TLS certificate verification although the
    # app secret and bearer token travel over this connection — confirm
    # whether it is still required. "-S" keeps curl's own error message on
    # stderr despite "-s".
    cmd = ["curl", "-sSk", "--connect-timeout", "10", "--max-time", "60",
           "-X", str(method).upper()]
    if token:
        cmd += ["-H", f"Authorization: Bearer {token}"]
    if json_data is not None:
        cmd += ["-H", "Content-Type: application/json", "-d", json.dumps(json_data)]
    if form_fields:
        for key, val in form_fields.items():
            cmd += ["-F", f"{key}={val}"]
    cmd.append(f"https://open.feishu.cn{path}")

    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=65)
    except (subprocess.TimeoutExpired, OSError) as exc:
        # curl missing or hung: report through the same error shape callers
        # already handle for unparsable replies.
        print(f" ❌ API 返回解析失败: {exc}")
        return {"code": -1, "msg": str(exc)}

    try:
        resp = json.loads(r.stdout)
    except ValueError:
        resp = None
    if not isinstance(resp, dict):
        # Fall back to curl's stderr when there is no body to show.
        detail = r.stdout[:200] or r.stderr[:200]
        print(f" ❌ API 返回解析失败: {detail}")
        return {"code": -1, "msg": detail}
    return resp
# Step 1: obtain a tenant_access_token.
print(" → 获取 tenant_access_token...")
token_resp = feishu_api("POST", "/open-apis/auth/v3/tenant_access_token/internal",
                        json_data={"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET})
token = token_resp.get("tenant_access_token")
if not token:
    print(f" ❌ 获取 token 失败: {token_resp}")
    sys.exit(1)
print(f" → Token 获取成功")

# Step 2: upload the file.
file_name = os.path.basename(output_path)
file_ext = os.path.splitext(output_path)[1].lower()
# Map the file extension to the file_type value sent with the upload; any
# other extension is uploaded as a generic "stream".
file_type_map = {".xlsx": "xls", ".xls": "xls", ".pdf": "pdf", ".docx": "doc", ".pptx": "ppt"}
file_type = file_type_map.get(file_ext, "stream")
print(f" → 上传文件: {file_name} (type={file_type})...")
upload_resp = feishu_api("POST", "/open-apis/im/v1/files", token=token,
                         form_fields={"file_type": file_type, "file_name": file_name,
                                      "file": f"@{output_path}"})
# "data" may be absent or null in an error reply; treat both as empty.
file_key = (upload_resp.get("data") or {}).get("file_key")
if not file_key:
    print(f" ❌ 文件上传失败: {upload_resp}")
    sys.exit(1)
print(f" → 上传成功: {file_key}")

# Step 3: send the uploaded file as a message.
print(f" → 发送文件到 user_id={SEND_TO}...")
send_resp = feishu_api("POST", "/open-apis/im/v1/messages?receive_id_type=user_id", token=token,
                       json_data={
                           "receive_id": SEND_TO,
                           "msg_type": "file",
                           # The message content is itself a JSON-encoded string.
                           "content": json.dumps({"file_key": file_key}),
                       })
msg_id = (send_resp.get("data") or {}).get("message_id")
if msg_id:
    print(f" ✅ 文件已发送message_id={msg_id}")
else:
    print(f" ⚠️ 发送可能失败: {send_resp}")