#!/usr/bin/env python3
"""
Export course-review records plus the raw audio records for the given role IDs.

Usage: python3 export_review_audio.py <role_id_1> [role_id_2] ...
       python3 export_review_audio.py 23600 23686
"""
import json
import os
import re
import subprocess
import sys
from collections import Counter
from datetime import datetime

# ── Load .env ───────────────────────────────────────
def load_env():
    """Read ``~/.hermes/.env`` and return a lookup function for its values.

    The file is read once; the returned callable only searches the text that
    is already in memory.

    Returns:
        A function ``g(k)`` giving the stripped text that follows ``k=`` on
        the first line defining ``k``, or ``None`` when no line defines it.

    Raises:
        OSError: if the env file cannot be opened.
    """
    env_path = os.path.expanduser("~/.hermes/.env")
    with open(env_path, encoding="utf-8") as f:
        content = f.read()

    def g(k):
        """Return the value assigned to key ``k``, or ``None`` if absent."""
        # Anchor the key at the start of a line and escape it, so "HOST" does
        # not match inside "MY_HOST=..." or a commented-out "# HOST=..." line,
        # and regex metacharacters in the key are taken literally.
        m = re.search(rf"^[ \t]*{re.escape(k)}=(.+)$", content, re.MULTILINE)
        return m.group(1).strip() if m else None

    return g


g = load_env()

# ── Arguments ───────────────────────────────────────
# Every positional argument is a role ID; at least one is required.
usage = "用法: python3 export_review_audio.py <角色ID1> [角色ID2] ..."
if len(sys.argv) < 2:
    print(usage)
    sys.exit(1)

try:
    # dict.fromkeys drops repeated IDs while keeping first-seen order, so a
    # role passed twice is not repeated in the file name or the summary sheet.
    user_ids = list(dict.fromkeys(int(x) for x in sys.argv[1:]))
except ValueError:
    # A non-numeric argument gets the usage text instead of a traceback.
    print(usage)
    sys.exit(1)

output_dir = os.path.expanduser("~/.hermes/workspace/output")
os.makedirs(output_dir, exist_ok=True)
# The second-resolution timestamp distinguishes output files of separate runs.
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
uid_str = "_".join(str(u) for u in user_ids)
output_path = f"{output_dir}/知识巩固_音频_{uid_str}_{ts}.xlsx"

print(f"导出角色: {user_ids}")
print(f"输出文件: {output_path}")

# ── 1. Query PG: course-review records ──────────────
print("\n[1/3] 查询 PostgreSQL 课程巩固记录...")
import psycopg2
from psycopg2.extras import RealDictCursor

pg_conn = psycopg2.connect(
    host=g("VALA_PG_ONLINE_HOST"), port=int(g("VALA_PG_ONLINE_PORT")),
    user=g("VALA_PG_ONLINE_USER"), password=g("VALA_PG_ONLINE_PASSWORD"),
    dbname=g("VALA_PG_ONLINE_DB"), connect_timeout=10,
)

# Close the connection even when the query raises. The rows are fetched into
# memory in full, so nothing after this statement needs the connection.
try:
    with pg_conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("""
            SELECT user_id, story_id, chapter_id, unique_id,
                   score, score_text, sp_value, exp, level,
                   question_list, play_time, created_at, updated_at
            FROM user_unit_review_question_result
            WHERE user_id = ANY(%s) AND deleted_at IS NULL
            ORDER BY user_id, updated_at DESC
        """, (user_ids,))
        review_rows = cur.fetchall()
finally:
    pg_conn.close()

# Add two derived fields to every row: a one-line-per-question text summary
# ("question_summary") and the number of questions ("question_count").
for row in review_rows:
    ql = row["question_list"]
    if isinstance(ql, str):
        try:
            ql = json.loads(ql)
        except ValueError:
            # Not valid JSON: ql stays a string, so the summary below is empty
            # and the count is 0.
            pass

    questions = []
    if isinstance(ql, list):
        for item in ql:
            if not isinstance(item, dict):
                continue
            q = item.get("question")
            if not isinstance(q, dict):
                # Covers both a missing key and an explicit null/non-object
                # value, either of which would break the .get() calls below.
                q = {}
            qtype = q.get("type", "")
            qtitle = q.get("title", "")
            user_answer = item.get("userAnswer", "")
            score = item.get("score", "")
            questions.append(f"[{qtype}] {qtitle} | 回答: {user_answer} | 得分: {score}")

    row["question_summary"] = "\n".join(questions)
    row["question_count"] = len(ql) if isinstance(ql, list) else 0

print(f" → 查询到 {len(review_rows)} 条课程巩固记录")

# ── 2. Query ES: audio records ──────────────────────
print("\n[2/3] 查询 Elasticsearch 音频数据...")
es_url = f"{g('VALA_ES_ONLINE_SCHEME')}://{g('VALA_ES_ONLINE_HOST')}:{g('VALA_ES_ONLINE_PORT')}"
auth = f"{g('VALA_ES_ONLINE_USER')}:{g('VALA_ES_ONLINE_PASSWORD')}"


def _es_request(method, path, body, max_time=30, check=True):
    """Send one JSON request to Elasticsearch through curl and decode the reply.

    Args:
        method: HTTP method passed to ``curl -X``.
        path: URL path (and query string) appended to ``es_url``.
        body: JSON-serialisable request body.
        max_time: value for curl's ``--max-time``, in seconds.
        check: when true, a reply containing a top-level ``"error"`` key is
            treated as a failure.

    Returns:
        The decoded JSON reply.

    Raises:
        RuntimeError: curl exited non-zero, the reply was not JSON, or
            (with ``check``) the reply carried an ``"error"`` key.
        subprocess.TimeoutExpired: curl outlived ``max_time`` plus a margin.
    """
    # NOTE(review): "-k" turns off TLS certificate verification and "-u" puts
    # the credentials on curl's command line, where other local users can see
    # them in the process list. Kept as-is; confirm this is acceptable.
    r = subprocess.run([
        "curl", "-sSk", "-u", auth,
        "-H", "Content-Type: application/json",
        "--connect-timeout", "10", "--max-time", str(max_time),
        "-X", method, "-d", json.dumps(body),
        f"{es_url}{path}",
    ], capture_output=True, text=True, timeout=max_time + 5)
    if r.returncode != 0:
        raise RuntimeError(
            f"curl exited with {r.returncode} for {method} {path}: {r.stderr.strip()}"
        )
    try:
        reply = json.loads(r.stdout)
    except ValueError as err:
        raise RuntimeError(
            f"non-JSON reply for {method} {path}: {r.stdout[:200]!r}"
        ) from err
    if check and isinstance(reply, dict) and "error" in reply:
        raise RuntimeError(f"Elasticsearch error for {method} {path}: {reply['error']}")
    return reply


audio_rows = []
page_size = 500

# First page: opens a scroll context that is kept alive for 2 minutes.
query = {
    "query": {"terms": {"userId": user_ids}},
    "sort": [{"timeInt": {"order": "desc"}}],
    "size": page_size,
}
resp = _es_request("POST", "/user-audio/_search?scroll=2m", query)
scroll_id = resp.get("_scroll_id")

try:
    # hits.total is an object with a "value" key on newer Elasticsearch
    # versions and a bare integer on older ones; accept both.
    total_info = resp.get("hits", {}).get("total", 0)
    total = total_info.get("value", 0) if isinstance(total_info, dict) else total_info
    print(f" → ES 总计 {total} 条音频记录,分批读取...")

    hits = resp.get("hits", {}).get("hits", [])
    audio_rows.extend(h["_source"] for h in hits)

    # Scroll until a page comes back empty, so completeness does not depend
    # on the reported total being exact. `total` is used for progress only.
    batch = 1
    while hits and scroll_id:
        resp = _es_request(
            "POST", "/_search/scroll", {"scroll": "2m", "scroll_id": scroll_id}
        )
        scroll_id = resp.get("_scroll_id", scroll_id)
        hits = resp.get("hits", {}).get("hits", [])
        if not hits:
            break
        audio_rows.extend(h["_source"] for h in hits)
        batch += 1
        print(f" → 批次 {batch}: 已读 {len(audio_rows)}/{total} 条")
finally:
    # Best-effort release of the scroll context. It goes through the same
    # helper so the request carries the JSON Content-Type header; curl's -d
    # otherwise defaults to a form content type.
    if scroll_id:
        try:
            _es_request(
                "DELETE", "/_search/scroll", {"scroll_id": scroll_id},
                max_time=10, check=False,
            )
        except (RuntimeError, subprocess.TimeoutExpired) as err:
            print(f" → 清理 scroll 上下文失败(可忽略): {err}")

print(f" → 共读取 {len(audio_rows)} 条音频记录")

# ── 3. Export Excel ─────────────────────────────────
print("\n[3/3] 生成 Excel...")
import pandas as pd
from openpyxl import Workbook
from openpyxl.utils import get_column_letter
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import Font, Alignment, PatternFill

wb = Workbook()

# Sheet 1: course-review records, one spreadsheet row per database row.
ws1 = wb.active
ws1.title = "课程巩固记录"
review_columns = [
    "角色ID", "Level", "Story ID", "Chapter ID", "Unique ID",
    "得分", "评级", "SP值", "经验值", "题目数",
    "耗时(秒)", "题目详情", "更新时间", "创建时间",
]
review_data = []
for row in review_rows:
    review_data.append({
        "角色ID": row["user_id"],
        "Level": row["level"],
        "Story ID": row["story_id"],
        "Chapter ID": row["chapter_id"],
        "Unique ID": row["unique_id"],
        "得分": row["score"],
        "评级": row["score_text"],
        "SP值": row["sp_value"],
        "经验值": row["exp"],
        "题目数": row["question_count"],
        "耗时(秒)": row["play_time"],
        "题目详情": row["question_summary"],
        "更新时间": str(row["updated_at"]),
        "创建时间": str(row["created_at"]),
    })

# Passing columns= keeps the header row when the query returned no rows;
# a DataFrame built from an empty list alone has no columns at all.
df1 = pd.DataFrame(review_data, columns=review_columns)
for r_idx, row in enumerate(dataframe_to_rows(df1, index=False, header=True), 1):
    for c_idx, value in enumerate(row, 1):
        ws1.cell(row=r_idx, column=c_idx, value=value)

# Header style: bold white text on a blue fill, centred.
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
for cell in ws1[1]:
    cell.font = header_font
    cell.fill = header_fill
    cell.alignment = Alignment(horizontal="center")

# Column widths are keyed by header text and resolved to a letter, so they
# stay attached to the intended column if columns are added or reordered.
# The wide setting belongs to the multi-line question details column.
review_widths = {
    "角色ID": 10,
    "耗时(秒)": 12,
    "题目详情": 60,
    "更新时间": 22,
    "创建时间": 22,
}
for header, width in review_widths.items():
    letter = get_column_letter(review_columns.index(header) + 1)
    ws1.column_dimensions[letter].width = width

# Sheet 2: audio records, one spreadsheet row per Elasticsearch document.
from openpyxl.utils import get_column_letter

ws2 = wb.create_sheet("音频数据")
audio_columns = [
    "角色ID", "角色名", "Session ID", "组件ID", "组件类型",
    "音频URL", "LLM音频URL", "ASR状态", "发音评分(SOE)", "第几轮",
    "Makee ID", "时间", "时间戳", "数据版本",
]
audio_data = []
for a in audio_rows:
    # Pull makee_id out of userMsg when userMsg is a JSON object that has it
    # as a top-level key; otherwise leave it blank.
    makee_id = ""
    user_msg = a.get("userMsg", "")
    if isinstance(user_msg, str) and "makee_id" in user_msg:
        try:
            um = json.loads(user_msg)
        except ValueError:
            # userMsg mentions makee_id but is not valid JSON.
            um = None
        if isinstance(um, dict):
            makee_id = um.get("makee_id", "")

    audio_data.append({
        "角色ID": a.get("userId"),
        "角色名": a.get("userName"),
        "Session ID": a.get("sessionId"),
        "组件ID": a.get("componentId"),
        "组件类型": a.get("componentType"),
        "音频URL": a.get("audioUrl"),
        "LLM音频URL": a.get("llmAudioUrl"),
        "ASR状态": a.get("asrStatus"),
        "发音评分(SOE)": json.dumps(a.get("soeData")) if a.get("soeData") else "",
        "第几轮": a.get("roundNum"),
        "Makee ID": makee_id,
        "时间": a.get("timeStr"),
        "时间戳": a.get("timeInt"),
        "数据版本": a.get("dataVersion"),
    })

# Passing columns= keeps the header row when there are no audio records.
df2 = pd.DataFrame(audio_data, columns=audio_columns)
for r_idx, row in enumerate(dataframe_to_rows(df2, index=False, header=True), 1):
    for c_idx, value in enumerate(row, 1):
        ws2.cell(row=r_idx, column=c_idx, value=value)

for cell in ws2[1]:
    cell.font = header_font
    cell.fill = header_fill
    cell.alignment = Alignment(horizontal="center")

# Widths are keyed by header text so each setting lands on the column it is
# meant for: the wide ones on the two URL columns, narrow on the status.
# NOTE(review): the per-column values are a judgement call — confirm.
audio_widths = {
    "音频URL": 50,
    "LLM音频URL": 50,
    "ASR状态": 15,
    "发音评分(SOE)": 15,
    "Makee ID": 40,
    "时间": 22,
    "时间戳": 22,
}
for header, width in audio_widths.items():
    letter = get_column_letter(audio_columns.index(header) + 1)
    ws2.column_dimensions[letter].width = width

# Sheet 3: summary — export metadata followed by a per-role breakdown.
ws3 = wb.create_sheet("汇总")
ws3["A1"] = "导出信息"
ws3["A1"].font = Font(bold=True, size=14)
ws3["A3"] = "导出时间"
ws3["B3"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ws3["A4"] = "角色ID"
ws3["B4"] = ", ".join(str(u) for u in user_ids)
ws3["A5"] = "课程巩固记录数"
ws3["B5"] = len(review_rows)
ws3["A6"] = "音频记录数"
ws3["B6"] = len(audio_rows)

# Per-role breakdown: a title row, a styled header row, then one row per role.
row_offset = 8
ws3[f"A{row_offset}"] = "按角色统计"
ws3[f"A{row_offset}"].font = Font(bold=True)
row_offset += 1
ws3[f"A{row_offset}"] = "角色ID"
ws3[f"B{row_offset}"] = "巩固记录"
ws3[f"C{row_offset}"] = "音频记录"
ws3[f"D{row_offset}"] = "最新巩固时间"
breakdown_header_font = Font(bold=True, color="FFFFFF")
for cell in ws3[row_offset]:
    cell.fill = header_fill
    cell.font = breakdown_header_font


def _uid_key(value):
    """Normalise a role ID to ``int`` where possible, else return it unchanged.

    Lets IDs that arrive as strings be matched against the integer IDs taken
    from the command line.
    """
    try:
        return int(value)
    except (TypeError, ValueError):
        return value


# Tally everything in one pass per data set rather than rescanning all rows
# for every role.
review_counts = Counter(_uid_key(r["user_id"]) for r in review_rows)
audio_counts = Counter(_uid_key(a.get("userId")) for a in audio_rows)
latest_by_role = {}
for r in review_rows:
    stamp = r["updated_at"]
    if stamp is None:
        # A missing timestamp must not be reported as the latest one.
        continue
    key = _uid_key(r["user_id"])
    if key not in latest_by_role or stamp > latest_by_role[key]:
        latest_by_role[key] = stamp

row_offset += 1
for uid in user_ids:
    ws3[f"A{row_offset}"] = uid
    ws3[f"B{row_offset}"] = review_counts[uid]
    ws3[f"C{row_offset}"] = audio_counts[uid]
    ws3[f"D{row_offset}"] = str(latest_by_role[uid]) if uid in latest_by_role else "无"
    row_offset += 1

ws3.column_dimensions["A"].width = 18
ws3.column_dimensions["B"].width = 22
ws3.column_dimensions["C"].width = 18
ws3.column_dimensions["D"].width = 28

wb.save(output_path)
print(f"\n✅ 导出完成: {output_path}")
print(f" Sheet 1 — 课程巩固记录: {len(review_rows)} 行")
print(f" Sheet 2 — 音频数据: {len(audio_rows)} 行")
print(f" Sheet 3 — 汇总")