394 lines
14 KiB
Python
394 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
导出指定角色的课程巩固数据 + 原始音频。
|
||
用法: python3 export_review_audio.py <角色ID1> [角色ID2] ...
|
||
python3 export_review_audio.py 23600 23686
|
||
"""
|
||
import re, json, sys, os, subprocess
|
||
from datetime import datetime
|
||
|
||
# ── 加载 .env ───────────────────────────────────────
|
||
def load_env():
    """Read ``~/.hermes/.env`` once and return a lookup function over it.

    Returns:
        A callable ``g(k)`` that returns the stripped value of the first
        ``k=value`` assignment found in the file, or ``None`` when the key
        is absent or its value is empty.

    Raises:
        OSError: If the env file cannot be opened (for example, it is missing).
    """
    env_path = os.path.expanduser("~/.hermes/.env")
    # Decode explicitly so the result does not depend on the platform locale.
    with open(env_path, encoding="utf-8") as f:
        content = f.read()

    def g(k):
        """Return the value assigned to *k* in the env file, or ``None``."""
        # Anchor the match at the start of a line and escape the key, so a
        # lookup for "HOST" cannot be satisfied by "OLD_HOST=..." or by a
        # commented-out "# HOST=..." line, and regex metacharacters in the
        # key are taken literally. An optional "export " prefix is accepted.
        pattern = rf"^[ \t]*(?:export[ \t]+)?{re.escape(k)}=(.+)$"
        m = re.search(pattern, content, re.MULTILINE)
        return m.group(1).strip() if m else None

    return g
|
||
|
||
# Lookup function over ~/.hermes/.env, used for every credential below.
g = load_env()

# ── Arguments ───────────────────────────────────────
if len(sys.argv) < 2:
    print("用法: python3 export_review_audio.py <角色ID1> [角色ID2] ...")
    sys.exit(1)

# Role IDs from the command line, as integers in the order given.
user_ids = []
for arg in sys.argv[1:]:
    try:
        uid = int(arg)
    except ValueError:
        # A non-numeric ID is a usage error; report it instead of a traceback.
        print(f"无效的角色ID: {arg!r}")
        print("用法: python3 export_review_audio.py <角色ID1> [角色ID2] ...")
        sys.exit(1)
    # Skip repeats so a role passed twice is not listed twice in the summary.
    if uid not in user_ids:
        user_ids.append(uid)

output_dir = os.path.expanduser("~/.hermes/workspace/output")
os.makedirs(output_dir, exist_ok=True)

# The file name carries the role IDs and a timestamp so runs do not collide.
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
uid_str = "_".join(str(u) for u in user_ids)
output_path = f"{output_dir}/知识巩固_音频_{uid_str}_{ts}.xlsx"

print(f"导出角色: {user_ids}")
print(f"输出文件: {output_path}")
|
||
|
||
# ── 1. Query PG: course review records ──────────────
print("\n[1/3] 查询 PostgreSQL 课程巩固记录...")
import psycopg2
from psycopg2.extras import RealDictCursor


def _decode_question_list(raw):
    """Return the ``question_list`` column value as a list of items.

    A string value is decoded as JSON; anything that does not end up being
    a list (including undecodable text) yields an empty list.
    """
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return []
    return raw if isinstance(raw, list) else []


def _answer_text(answer):
    """Flatten a user answer into a value that fits in one spreadsheet cell."""
    if isinstance(answer, list):
        return ", ".join(str(a) for a in answer)
    if isinstance(answer, dict):
        # A raw dict cannot be written to a worksheet cell; serialize it.
        return json.dumps(answer, ensure_ascii=False)
    return answer


pg_conn = psycopg2.connect(
    host=g("VALA_PG_ONLINE_HOST"),
    port=int(g("VALA_PG_ONLINE_PORT")),
    user=g("VALA_PG_ONLINE_USER"),
    password=g("VALA_PG_ONLINE_PASSWORD"),
    dbname=g("VALA_PG_ONLINE_DB"),
    connect_timeout=10,
)
try:
    with pg_conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(
            """
            SELECT user_id, story_id, chapter_id, unique_id,
                   score, score_text, sp_value, exp, level,
                   question_list, play_time, created_at, updated_at
            FROM user_unit_review_question_result
            WHERE user_id = ANY(%s) AND deleted_at IS NULL
            ORDER BY user_id, updated_at DESC
            """,
            (user_ids,),
        )
        review_rows = cur.fetchall()
finally:
    # Close the connection even if the query fails; the rows are already in
    # memory, so nothing below needs it.
    pg_conn.close()

# Explode question_list so that each question becomes its own output row.
expanded_rows = []  # one row per question
for row in review_rows:
    questions = _decode_question_list(row["question_list"])
    base = {
        "角色ID": row["user_id"],
        "Level": row["level"],
        "Story ID": row["story_id"],
        "Chapter ID": row["chapter_id"],
        "Unique ID": row["unique_id"],
        "更新时间": str(row["updated_at"]),
    }
    for item in questions:
        if not isinstance(item, dict):
            continue
        # "question" may be missing or null; treat both as an empty question.
        q = item.get("question")
        if not isinstance(q, dict):
            q = {}
        # Only an explicit True counts as correct; None/False are "wrong".
        status = "正确" if item.get("isRight") is True else "错误"
        user_answer = item.get("userAnswer", "") or item.get("userAnswers", "")
        expanded_rows.append({
            **base,
            "题目类型": q.get("type", ""),
            "题目内容": q.get("title", ""),
            "题目对错": status,
            "音频URL": item.get("userAudio", ""),
            "用户答案": _answer_text(user_answer),
        })
    row["question_count"] = len(questions)

print(f" → 查询到 {len(review_rows)} 条课程巩固记录")
|
||
|
||
# ── 2. Query ES: audio data ─────────────────────────
print("\n[2/3] 查询 Elasticsearch 音频数据...")
es_url = f"{g('VALA_ES_ONLINE_SCHEME')}://{g('VALA_ES_ONLINE_HOST')}:{g('VALA_ES_ONLINE_PORT')}"
# NOTE(review): the credentials are passed to curl on its command line and
# "-k" disables TLS certificate verification; both are kept from the
# original script, but confirm that is acceptable for this host.
auth = f"{g('VALA_ES_ONLINE_USER')}:{g('VALA_ES_ONLINE_PASSWORD')}"


def _es_post(path, body):
    """POST *body* as JSON to ``es_url + path`` via curl and return the reply.

    Raises:
        RuntimeError: If curl exits non-zero, the reply is not a JSON object,
            or the reply carries an ``error`` field.
    """
    proc = subprocess.run([
        "curl", "-sk", "-u", auth,
        "-H", "Content-Type: application/json",
        "--connect-timeout", "10", "--max-time", "30",
        "-X", "POST", "-d", json.dumps(body),
        f"{es_url}{path}",
    ], capture_output=True, text=True, timeout=35)
    if proc.returncode != 0:
        raise RuntimeError(f"Elasticsearch 请求失败 (curl 退出码 {proc.returncode})")
    try:
        reply = json.loads(proc.stdout)
    except ValueError as err:
        raise RuntimeError(f"Elasticsearch 返回解析失败: {proc.stdout[:200]}") from err
    if not isinstance(reply, dict) or "error" in reply:
        raise RuntimeError(f"Elasticsearch 返回错误: {proc.stdout[:200]}")
    return reply


def _hit_total(reply):
    """Return ``(count, is_exact)`` for a search reply.

    ``hits.total`` is handled both as a plain integer and as an object with
    ``value``/``relation``; a relation other than "eq" means the count is
    only a lower bound.
    """
    total = reply.get("hits", {}).get("total", 0)
    if isinstance(total, dict):
        return total.get("value", 0), total.get("relation", "eq") == "eq"
    return total, True


audio_rows = []
scroll_id = None
page_size = 500

query = {
    "query": {"terms": {"userId": user_ids}},
    "sort": [{"timeInt": {"order": "desc"}}],
    "size": page_size,
}

try:
    # First page: opens the scroll context.
    resp = _es_post("/user-audio/_search?scroll=2m", query)
    scroll_id = resp.get("_scroll_id")
    total, total_is_exact = _hit_total(resp)
    print(f" → ES 总计 {total} 条音频记录,分批读取...")

    hits = resp.get("hits", {}).get("hits", [])
    audio_rows.extend(h["_source"] for h in hits)

    # Remaining pages. Stop on an empty page; only trust the reported total
    # as a stop condition when it is exact, otherwise records past a capped
    # count would be dropped.
    batch = 1
    while hits and scroll_id and (not total_is_exact or len(audio_rows) < total):
        resp = _es_post("/_search/scroll", {"scroll": "2m", "scroll_id": scroll_id})
        # Keep the previous ID if the reply omits one so cleanup still works.
        scroll_id = resp.get("_scroll_id", scroll_id)
        hits = resp.get("hits", {}).get("hits", [])
        if not hits:
            break
        audio_rows.extend(h["_source"] for h in hits)
        batch += 1
        print(f" → 批次 {batch}: 已读 {len(audio_rows)}/{total} 条")
finally:
    # Release the scroll context, also when a page request failed.
    if scroll_id:
        try:
            subprocess.run([
                "curl", "-sk", "-u", auth, "--connect-timeout", "5",
                # Without this header curl sends the body as form data,
                # which Elasticsearch does not accept as a request body.
                "-H", "Content-Type: application/json",
                "-X", "DELETE", "-d", json.dumps({"scroll_id": scroll_id}),
                f"{es_url}/_search/scroll",
            ], capture_output=True, timeout=10)
        except (subprocess.TimeoutExpired, OSError):
            # Best effort only: the context was opened with a 2m keep-alive
            # and expires on its own, so a failed cleanup must not abort
            # the export.
            pass

print(f" → 共读取 {len(audio_rows)} 条音频记录")
|
||
|
||
# ── 3. Export to Excel ──────────────────────────────
print("\n[3/3] 生成 Excel...")
import pandas as pd
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import Font, Alignment, PatternFill

wb = Workbook()

# Sheet 1: course review records (one row per question)
ws1 = wb.active
ws1.title = "课程巩固记录"

# Column order of sheet 1. Passing it to DataFrame fixes the order and,
# unlike building the frame from the rows alone, still produces a header
# row when there are no records at all.
review_columns = [
    "角色ID", "Level", "Story ID", "Chapter ID", "Unique ID",
    "题目类型", "题目内容", "题目对错", "音频URL", "用户答案", "更新时间",
]
df1 = pd.DataFrame(expanded_rows, columns=review_columns)
for sheet_row in dataframe_to_rows(df1, index=False, header=True):
    ws1.append(sheet_row)

# Header style, reused by the other sheets.
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
for cell in ws1[1]:
    cell.font = header_font
    cell.fill = header_fill
    cell.alignment = Alignment(horizontal="center")

# Column widths, keyed by column letter in the order of review_columns.
sheet1_widths = {
    "A": 10, "B": 8, "C": 12, "D": 12, "E": 36, "F": 24,
    "G": 30, "H": 10, "I": 55, "J": 30, "K": 22,
}
for letter, width in sheet1_widths.items():
    ws1.column_dimensions[letter].width = width
|
||
|
||
# Sheet 2: audio data
ws2 = wb.create_sheet("音频数据")


def _extract_makee_id(user_msg):
    """Return ``makee_id`` from a JSON-encoded ``userMsg`` string, else ``""``."""
    # Cheap substring test first so most messages are never parsed.
    if not (isinstance(user_msg, str) and "makee_id" in user_msg):
        return ""
    try:
        decoded = json.loads(user_msg)
    except ValueError:  # not valid JSON; json.JSONDecodeError is a ValueError
        return ""
    return decoded.get("makee_id", "") if isinstance(decoded, dict) else ""


# Column order of sheet 2; also guarantees a header row when ES returned nothing.
audio_columns = [
    "角色ID", "角色名", "Session ID", "组件ID", "组件类型", "音频URL",
    "LLM音频URL", "ASR状态", "发音评分(SOE)", "第几轮", "Makee ID",
    "时间", "时间戳", "数据版本",
]

audio_data = []
for a in audio_rows:
    soe = a.get("soeData")
    audio_data.append({
        "角色ID": a.get("userId"),
        "角色名": a.get("userName"),
        "Session ID": a.get("sessionId"),
        "组件ID": a.get("componentId"),
        "组件类型": a.get("componentType"),
        "音频URL": a.get("audioUrl"),
        "LLM音频URL": a.get("llmAudioUrl"),
        "ASR状态": a.get("asrStatus"),
        # Stored as a JSON string so the whole structure fits in one cell.
        "发音评分(SOE)": json.dumps(soe) if soe else "",
        "第几轮": a.get("roundNum"),
        "Makee ID": _extract_makee_id(a.get("userMsg", "")),
        "时间": a.get("timeStr"),
        "时间戳": a.get("timeInt"),
        "数据版本": a.get("dataVersion"),
    })

df2 = pd.DataFrame(audio_data, columns=audio_columns)
for r_idx, sheet_row in enumerate(dataframe_to_rows(df2, index=False, header=True), 1):
    for c_idx, value in enumerate(sheet_row, 1):
        ws2.cell(row=r_idx, column=c_idx, value=value)

for cell in ws2[1]:
    cell.font = header_font
    cell.fill = header_fill
    cell.alignment = Alignment(horizontal="center")

# NOTE(review): with the column order above, G/H/I/K/M are LLM音频URL,
# ASR状态, 发音评分(SOE), Makee ID and 时间戳. Some of these widths look as if
# they were meant for a neighbouring column (e.g. 音频URL in F has none);
# values are kept as they were. Confirm the intended mapping.
for letter, width in {"G": 50, "H": 50, "I": 15, "K": 40, "M": 22}.items():
    ws2.column_dimensions[letter].width = width
|
||
|
||
# Sheet 3: summary
ws3 = wb.create_sheet("汇总")
ws3["A1"] = "导出信息"
ws3["A1"].font = Font(bold=True, size=14)

# Label/value pairs written to rows 3-7, columns A and B.
summary_lines = [
    ("导出时间", datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
    ("角色ID", ", ".join(str(u) for u in user_ids)),
    ("课程巩固记录数(题目拆分行)", len(expanded_rows)),
    ("原始巩固回合数", len(review_rows)),
    ("音频记录数", len(audio_rows)),
]
for line_no, (label, value) in enumerate(summary_lines, start=3):
    ws3[f"A{line_no}"] = label
    ws3[f"B{line_no}"] = value

# Per-role breakdown
row_offset = 10
ws3[f"A{row_offset}"] = "按角色统计"
ws3[f"A{row_offset}"].font = Font(bold=True)
row_offset += 1
for letter, title in zip("ABCD", ("角色ID", "巩固记录", "音频记录", "最新巩固时间")):
    ws3[f"{letter}{row_offset}"] = title
for cell in ws3[row_offset]:
    cell.fill = header_fill
    cell.font = Font(bold=True, color="FFFFFF")
row_offset += 1

# Aggregate once per data set instead of rescanning every list for every
# role. Keys are compared in string form so a role still matches when one
# source yields the ID as text and the other as a number.
review_count = {}
latest_review = {}
for rec in review_rows:
    key = str(rec["user_id"])
    review_count[key] = review_count.get(key, 0) + 1
    stamp = str(rec["updated_at"])
    if key not in latest_review or stamp > latest_review[key]:
        latest_review[key] = stamp

audio_count = {}
for rec in audio_rows:
    key = str(rec.get("userId"))
    audio_count[key] = audio_count.get(key, 0) + 1

for uid in user_ids:
    key = str(uid)
    ws3[f"A{row_offset}"] = uid
    ws3[f"B{row_offset}"] = review_count.get(key, 0)
    ws3[f"C{row_offset}"] = audio_count.get(key, 0)
    ws3[f"D{row_offset}"] = latest_review.get(key, "无")
    row_offset += 1

for letter, width in {"A": 18, "B": 22, "C": 18, "D": 28}.items():
    ws3.column_dimensions[letter].width = width

wb.save(output_path)
print(f"\n✅ 导出完成: {output_path}")
print(f" Sheet 1 — 课程巩固记录(每题一行): {len(expanded_rows)} 行")
print(f" Sheet 2 — 音频数据: {len(audio_rows)} 行")
print(" Sheet 3 — 汇总")
|
||
|
||
# ── 4. Send the file through Feishu automatically ───
# By default the file goes to Li Ruosong (user_id: 4aagb443).
# Set the FEISHU_SEND_TO environment variable to target another user_id;
# set FEISHU_SEND_ENABLED to anything other than "1" to skip sending.
SEND_TO = os.getenv("FEISHU_SEND_TO", "4aagb443")
SEND_ENABLED = os.getenv("FEISHU_SEND_ENABLED", "1")

if SEND_ENABLED != "1":
    print("\n⏭️ 飞书自动发送已禁用 (FEISHU_SEND_ENABLED != 1)")
    sys.exit(0)

print(f"\n[4/4] 通过飞书发送文件到 user_id={SEND_TO}...")

# App credentials come from the .env file, not from the process environment.
FEISHU_APP_ID = g("FEISHU_APP_ID")
FEISHU_APP_SECRET = g("FEISHU_APP_SECRET")

# Missing credentials are not an error: the export itself already succeeded.
if not (FEISHU_APP_ID and FEISHU_APP_SECRET):
    print(" ⚠️ 未找到 FEISHU_APP_ID/FEISHU_APP_SECRET,跳过发送")
    sys.exit(0)
|
||
|
||
def feishu_api(method, path, **kwargs):
    """Call the Feishu Open API through curl and return the decoded JSON reply.

    Args:
        method: HTTP method name, passed to curl with ``-X``.
        path: API path appended to ``https://open.feishu.cn``.
        **kwargs: Optional ``token`` (sent as a Bearer token), ``json_data``
            (sent as a JSON body) and ``form_fields`` (a dict, each entry
            sent as a multipart ``-F key=value`` field; a value starting
            with ``@`` makes curl upload that file).

    Returns:
        The reply as a dict. Any failure (curl could not run or timed out,
        or the reply is not a JSON object) is reported on stdout and
        returned as ``{"code": -1, "msg": ...}`` instead of raising.
    """
    # NOTE(review): "-k" turns off TLS certificate verification, kept from
    # the original script; confirm that is intended for a public endpoint
    # that receives the app secret. "-S" makes curl report its own errors
    # on stderr despite "-s".
    cmd = [
        "curl", "-sSk", "--connect-timeout", "10", "--max-time", "60",
        "-X", str(method).upper(),
    ]
    token = kwargs.get("token")
    if token:
        # No header at all when there is no token (the token request itself).
        cmd += ["-H", f"Authorization: Bearer {token}"]
    if "json_data" in kwargs:
        cmd += ["-H", "Content-Type: application/json", "-d", json.dumps(kwargs["json_data"])]
    if "form_fields" in kwargs:
        for key, val in kwargs["form_fields"].items():
            cmd += ["-F", f"{key}={val}"]
    cmd.append(f"https://open.feishu.cn{path}")

    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=65)
    except (subprocess.TimeoutExpired, OSError) as err:
        # Report only the exception type: its message embeds the full
        # command line, which contains the token or app secret.
        reason = type(err).__name__
        print(f" ❌ API 请求失败: {reason}")
        return {"code": -1, "msg": reason}

    # Fall back to curl's own error text when it produced no response body.
    raw = r.stdout or r.stderr
    try:
        reply = json.loads(r.stdout)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        reply = None
    if not isinstance(reply, dict):
        # Callers use .get() on the result, so anything else is a failure.
        print(f" ❌ API 返回解析失败: {raw[:200]}")
        return {"code": -1, "msg": raw[:200]}
    return reply
|
||
|
||
# Step 1: obtain a tenant_access_token
print(" → 获取 tenant_access_token...")
app_credentials = {"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET}
token_resp = feishu_api(
    "POST",
    "/open-apis/auth/v3/tenant_access_token/internal",
    json_data=app_credentials,
)
token = token_resp.get("tenant_access_token")
if not token:
    print(f" ❌ 获取 token 失败: {token_resp}")
    sys.exit(1)
print(" → Token 获取成功")

# Step 2: upload the file
file_name = os.path.basename(output_path)
file_ext = os.path.splitext(output_path)[1].lower()
# Maps a file extension to the upload "file_type"; unknown ones use "stream".
file_type_map = {".xlsx": "xls", ".xls": "xls", ".pdf": "pdf", ".docx": "doc", ".pptx": "ppt"}
file_type = file_type_map.get(file_ext, "stream")

print(f" → 上传文件: {file_name} (type={file_type})...")
upload_fields = {
    "file_type": file_type,
    "file_name": file_name,
    # The leading "@" makes curl send the file's contents for this field.
    "file": f"@{output_path}",
}
upload_resp = feishu_api(
    "POST",
    "/open-apis/im/v1/files",
    token=token,
    form_fields=upload_fields,
)
file_key = upload_resp.get("data", {}).get("file_key")
if not file_key:
    print(f" ❌ 文件上传失败: {upload_resp}")
    sys.exit(1)
print(f" → 上传成功: {file_key}")

# Step 3: send the uploaded file as a message
print(f" → 发送文件到 user_id={SEND_TO}...")
message = {
    "receive_id": SEND_TO,
    "msg_type": "file",
    # "content" is itself a JSON-encoded string inside the JSON body.
    "content": json.dumps({"file_key": file_key}),
}
send_resp = feishu_api(
    "POST",
    "/open-apis/im/v1/messages?receive_id_type=user_id",
    token=token,
    json_data=message,
)
msg_id = send_resp.get("data", {}).get("message_id")
if not msg_id:
    print(f" ⚠️ 发送可能失败: {send_resp}")
else:
    print(f" ✅ 文件已发送,message_id={msg_id}")
|