#!/usr/bin/env python3 """ 导出指定角色的课程巩固数据 + 原始音频。 用法: python3 export_review_audio.py <角色ID1> [角色ID2] ... python3 export_review_audio.py 23600 23686 """ import re, json, sys, os, subprocess from datetime import datetime # ── 加载 .env ─────────────────────────────────────── def load_env(): env_path = os.path.expanduser("~/.hermes/.env") with open(env_path) as f: content = f.read() def g(k): m = re.search(rf"{k}=(.+)", content) return m.group(1).strip() if m else None return g g = load_env() # ── 参数 ──────────────────────────────────────────── if len(sys.argv) < 2: print("用法: python3 export_review_audio.py <角色ID1> [角色ID2] ...") sys.exit(1) user_ids = [int(x) for x in sys.argv[1:]] output_dir = os.path.expanduser("~/.hermes/workspace/output") os.makedirs(output_dir, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S") uid_str = "_".join(str(u) for u in user_ids) output_path = f"{output_dir}/知识巩固_音频_{uid_str}_{ts}.xlsx" print(f"导出角色: {user_ids}") print(f"输出文件: {output_path}") # ── 1. 查询 PG: 课程巩固记录 ─────────────────────── print("\n[1/3] 查询 PostgreSQL 课程巩固记录...") import psycopg2 from psycopg2.extras import RealDictCursor pg_conn = psycopg2.connect( host=g("VALA_PG_ONLINE_HOST"), port=int(g("VALA_PG_ONLINE_PORT")), user=g("VALA_PG_ONLINE_USER"), password=g("VALA_PG_ONLINE_PASSWORD"), dbname=g("VALA_PG_ONLINE_DB"), connect_timeout=10, ) with pg_conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT user_id, story_id, chapter_id, unique_id, score, score_text, sp_value, exp, level, question_list, play_time, created_at, updated_at FROM user_unit_review_question_result WHERE user_id = ANY(%s) AND deleted_at IS NULL ORDER BY user_id, updated_at DESC """, (user_ids,)) review_rows = cur.fetchall() # Parse question_list JSON — explode each question into its own row expanded_rows = [] # one row per question for row in review_rows: ql = row["question_list"] if isinstance(ql, str): try: ql = json.loads(ql) except: pass base = { "角色ID": row["user_id"], "Level": row["level"], "Story ID": row["story_id"], "Chapter ID": row["chapter_id"], "Unique ID": row["unique_id"], "更新时间": str(row["updated_at"]), } if isinstance(ql, list): for item in ql: if isinstance(item, dict): q = item.get("question", {}) qtype = q.get("type", "") qtitle = q.get("title", "") is_right = item.get("isRight") status = "正确" if is_right is True else "错误" audio_url = item.get("userAudio", "") user_answer = item.get("userAnswer", "") or item.get("userAnswers", "") if isinstance(user_answer, list): user_answer = ", ".join(str(a) for a in user_answer) expanded_rows.append({ **base, "题目类型": qtype, "题目内容": qtitle, "题目对错": status, "音频URL": audio_url, "用户答案": user_answer, }) row["question_count"] = len(ql) if isinstance(ql, list) else 0 pg_conn.close() print(f" → 查询到 {len(review_rows)} 条课程巩固记录") # ── 2. 查询 ES: 音频数据 ──────────────────────────── print("\n[2/3] 查询 Elasticsearch 音频数据...") es_url = f"{g('VALA_ES_ONLINE_SCHEME')}://{g('VALA_ES_ONLINE_HOST')}:{g('VALA_ES_ONLINE_PORT')}" auth = f"{g('VALA_ES_ONLINE_USER')}:{g('VALA_ES_ONLINE_PASSWORD')}" audio_rows = [] scroll_id = None page_size = 500 # First page query = { "query": {"terms": {"userId": user_ids}}, "sort": [{"timeInt": {"order": "desc"}}], "size": page_size, } r = subprocess.run([ "curl", "-sk", "-u", auth, "-H", "Content-Type: application/json", "--connect-timeout", "10", "--max-time", "30", "-X", "POST", "-d", json.dumps(query), f"{es_url}/user-audio/_search?scroll=2m" ], capture_output=True, text=True, timeout=35) resp = json.loads(r.stdout) scroll_id = resp.get("_scroll_id") total = resp.get("hits", {}).get("total", {}).get("value", 0) print(f" → ES 总计 {total} 条音频记录,分批读取...") hits = resp.get("hits", {}).get("hits", []) for h in hits: audio_rows.append(h["_source"]) # Scroll remaining batch = 1 while len(audio_rows) < total: r = subprocess.run([ "curl", "-sk", "-u", auth, "-H", "Content-Type: application/json", "--connect-timeout", "10", "--max-time", "30", "-X", "POST", "-d", json.dumps({"scroll": "2m", "scroll_id": scroll_id}), f"{es_url}/_search/scroll" ], capture_output=True, text=True, timeout=35) resp = json.loads(r.stdout) scroll_id = resp.get("_scroll_id") hits = resp.get("hits", {}).get("hits", []) if not hits: break for h in hits: audio_rows.append(h["_source"]) batch += 1 print(f" → 批次 {batch}: 已读 {len(audio_rows)}/{total} 条") # Clean up scroll subprocess.run([ "curl", "-sk", "-u", auth, "--connect-timeout", "5", "-X", "DELETE", "-d", json.dumps({"scroll_id": scroll_id}), f"{es_url}/_search/scroll" ], capture_output=True, timeout=10) print(f" → 共读取 {len(audio_rows)} 条音频记录") # ── 3. 导出 Excel ──────────────────────────────────── print("\n[3/3] 生成 Excel...") import pandas as pd from openpyxl import Workbook from openpyxl.utils.dataframe import dataframe_to_rows from openpyxl.styles import Font, Alignment, PatternFill wb = Workbook() # Sheet 1: 课程巩固记录(每题一行) ws1 = wb.active ws1.title = "课程巩固记录" # Build DataFrame from expanded_rows review_data = [] for row in expanded_rows: review_data.append({ "角色ID": row["角色ID"], "Level": row["Level"], "Story ID": row["Story ID"], "Chapter ID": row["Chapter ID"], "Unique ID": row["Unique ID"], "题目类型": row["题目类型"], "题目内容": row["题目内容"], "题目对错": row["题目对错"], "音频URL": row["音频URL"], "用户答案": row["用户答案"], "更新时间": row["更新时间"], }) df1 = pd.DataFrame(review_data) for r_idx, row in enumerate(dataframe_to_rows(df1, index=False, header=True), 1): for c_idx, value in enumerate(row, 1): ws1.cell(row=r_idx, column=c_idx, value=value) # Style header header_font = Font(bold=True, color="FFFFFF") header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") for cell in ws1[1]: cell.font = header_font cell.fill = header_fill cell.alignment = Alignment(horizontal="center") # Column widths ws1.column_dimensions["A"].width = 10 ws1.column_dimensions["B"].width = 8 ws1.column_dimensions["C"].width = 12 ws1.column_dimensions["D"].width = 12 ws1.column_dimensions["E"].width = 36 ws1.column_dimensions["F"].width = 24 ws1.column_dimensions["G"].width = 30 ws1.column_dimensions["H"].width = 10 ws1.column_dimensions["I"].width = 55 ws1.column_dimensions["J"].width = 30 ws1.column_dimensions["K"].width = 22 # Sheet 2: 音频数据 ws2 = wb.create_sheet("音频数据") audio_data = [] for a in audio_rows: # Extract makee_id from userMsg if present makee_id = "" user_msg = a.get("userMsg", "") if isinstance(user_msg, str) and "makee_id" in user_msg: try: um = json.loads(user_msg) makee_id = um.get("makee_id", "") except: pass audio_data.append({ "角色ID": a.get("userId"), "角色名": a.get("userName"), "Session ID": a.get("sessionId"), "组件ID": a.get("componentId"), "组件类型": a.get("componentType"), "音频URL": a.get("audioUrl"), "LLM音频URL": a.get("llmAudioUrl"), "ASR状态": a.get("asrStatus"), "发音评分(SOE)": json.dumps(a.get("soeData")) if a.get("soeData") else "", "第几轮": a.get("roundNum"), "Makee ID": makee_id, "时间": a.get("timeStr"), "时间戳": a.get("timeInt"), "数据版本": a.get("dataVersion"), }) df2 = pd.DataFrame(audio_data) for r_idx, row in enumerate(dataframe_to_rows(df2, index=False, header=True), 1): for c_idx, value in enumerate(row, 1): ws2.cell(row=r_idx, column=c_idx, value=value) for cell in ws2[1]: cell.font = header_font cell.fill = header_fill cell.alignment = Alignment(horizontal="center") ws2.column_dimensions["G"].width = 50 ws2.column_dimensions["H"].width = 50 ws2.column_dimensions["I"].width = 15 ws2.column_dimensions["K"].width = 40 ws2.column_dimensions["M"].width = 22 # Sheet 3: 汇总 ws3 = wb.create_sheet("汇总") ws3["A1"] = "导出信息" ws3["A1"].font = Font(bold=True, size=14) ws3["A3"] = "导出时间" ws3["B3"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") ws3["A4"] = "角色ID" ws3["B4"] = ", ".join(str(u) for u in user_ids) ws3["A5"] = "课程巩固记录数(题目拆分行)" ws3["B5"] = len(expanded_rows) ws3["A6"] = "原始巩固回合数" ws3["B6"] = len(review_rows) ws3["A7"] = "音频记录数" ws3["B7"] = len(audio_rows) # Per-user breakdown row_offset = 10 ws3[f"A{row_offset}"] = "按角色统计" ws3[f"A{row_offset}"].font = Font(bold=True) row_offset += 1 ws3[f"A{row_offset}"] = "角色ID" ws3[f"B{row_offset}"] = "巩固记录" ws3[f"C{row_offset}"] = "音频记录" ws3[f"D{row_offset}"] = "最新巩固时间" for cell in ws3[row_offset]: cell.font = Font(bold=True) cell.fill = header_fill cell.font = Font(bold=True, color="FFFFFF") row_offset += 1 for uid in user_ids: r_cnt = sum(1 for r in review_rows if r["user_id"] == uid) a_cnt = sum(1 for a in audio_rows if a.get("userId") == uid) latest = max( (str(r["updated_at"]) for r in review_rows if r["user_id"] == uid), default="无" ) ws3[f"A{row_offset}"] = uid ws3[f"B{row_offset}"] = r_cnt ws3[f"C{row_offset}"] = a_cnt ws3[f"D{row_offset}"] = latest row_offset += 1 ws3.column_dimensions["A"].width = 18 ws3.column_dimensions["B"].width = 22 ws3.column_dimensions["C"].width = 18 ws3.column_dimensions["D"].width = 28 wb.save(output_path) print(f"\n✅ 导出完成: {output_path}") print(f" Sheet 1 — 课程巩固记录(每题一行): {len(expanded_rows)} 行") print(f" Sheet 2 — 音频数据: {len(audio_rows)} 行") print(f" Sheet 3 — 汇总") # ── 4. 自动通过飞书发送文件 ────────────────────────── # 默认发送给李若松 (user_id: 4aagb443) # 可通过环境变量 FEISHU_SEND_TO 覆盖目标 user_id SEND_TO = os.environ.get("FEISHU_SEND_TO", "4aagb443") SEND_ENABLED = os.environ.get("FEISHU_SEND_ENABLED", "1") if SEND_ENABLED != "1": print("\n⏭️ 飞书自动发送已禁用 (FEISHU_SEND_ENABLED != 1)") sys.exit(0) print(f"\n[4/4] 通过飞书发送文件到 user_id={SEND_TO}...") FEISHU_APP_ID = g("FEISHU_APP_ID") FEISHU_APP_SECRET = g("FEISHU_APP_SECRET") if not FEISHU_APP_ID or not FEISHU_APP_SECRET: print(" ⚠️ 未找到 FEISHU_APP_ID/FEISHU_APP_SECRET,跳过发送") sys.exit(0) def feishu_api(method, path, **kwargs): """调用飞书 Open API,返回解析后的 JSON""" cmd = ["curl", "-sk", "--connect-timeout", "10", "--max-time", "60"] if method == "POST": cmd += ["-X", "POST"] cmd += ["-H", f"Authorization: Bearer {kwargs.get('token', '')}"] if "json_data" in kwargs: cmd += ["-H", "Content-Type: application/json", "-d", json.dumps(kwargs["json_data"])] if "form_fields" in kwargs: for key, val in kwargs["form_fields"].items(): cmd += ["-F", f"{key}={val}"] cmd.append(f"https://open.feishu.cn{path}") r = subprocess.run(cmd, capture_output=True, text=True, timeout=65) try: return json.loads(r.stdout) except: print(f" ❌ API 返回解析失败: {r.stdout[:200]}") return {"code": -1, "msg": r.stdout[:200]} # Step 1: 获取 tenant_access_token print(" → 获取 tenant_access_token...") token_resp = feishu_api("POST", "/open-apis/auth/v3/tenant_access_token/internal", json_data={"app_id": FEISHU_APP_ID, "app_secret": FEISHU_APP_SECRET}) token = token_resp.get("tenant_access_token") if not token: print(f" ❌ 获取 token 失败: {token_resp}") sys.exit(1) print(f" → Token 获取成功") # Step 2: 上传文件 file_name = os.path.basename(output_path) file_ext = os.path.splitext(output_path)[1].lower() file_type_map = {".xlsx": "xls", ".xls": "xls", ".pdf": "pdf", ".docx": "doc", ".pptx": "ppt"} file_type = file_type_map.get(file_ext, "stream") print(f" → 上传文件: {file_name} (type={file_type})...") upload_resp = feishu_api("POST", "/open-apis/im/v1/files", token=token, form_fields={"file_type": file_type, "file_name": file_name, "file": f"@{output_path}"}) file_key = upload_resp.get("data", {}).get("file_key") if not file_key: print(f" ❌ 文件上传失败: {upload_resp}") sys.exit(1) print(f" → 上传成功: {file_key}") # Step 3: 发送文件消息 print(f" → 发送文件到 user_id={SEND_TO}...") send_resp = feishu_api("POST", "/open-apis/im/v1/messages?receive_id_type=user_id", token=token, json_data={ "receive_id": SEND_TO, "msg_type": "file", "content": json.dumps({"file_key": file_key}) }) msg_id = send_resp.get("data", {}).get("message_id") if msg_id: print(f" ✅ 文件已发送,message_id={msg_id}") else: print(f" ⚠️ 发送可能失败: {send_resp}")