""" 初版需求v1.0: 2025.11.18 导出 一个userId的多表数据, 最终按照不同sheet,输出到一个 excel文件中。 1. 第一个sheet:"全部音频数据" es相关配置通过以下环境变量 ES_HOST=xxx ES_PORT=9200 ES_SCHEME=https ES_USER=elastic ES_PASSWORD=xxx index: user-audio 脚本思路: 过滤字段: userId == xxxx 输出该userId的全部记录 按时间倒序排序 包含以下字段内容: userId userMsg userName soeData audioUrl asrStatus componentId componentType dataVersion 2. 第二个sheet:"互动组件学习记录" 在 PGsql数据库中 筛选出 user_id 对应的记录 按时间(updated_at)倒序排列。 数据库相关配置 从.env中读取: PG_DB_HOST = xxx PG_DB_PORT = xxx PG_DB_USER = xxx PG_DB_PASSWORD = xxx PG_DB_DATABASE = xxx 读取以下数据表: user_component_play_record_0 ~ user_component_play_record_7 输出以下字段: user_id, component_unique_code, session_id, c_type, c_id, play_result, user_behavior_info, updated_at 3.第三个sheet:"课程巩固记录" 在 PGsql数据库中 筛选出 user_id 对应的记录 按时间(updated_at)倒序排列。 数据表:user_unit_review_question_result 输出以下字段: user_id story_id chapter_id question_list updated_at 4.第四个sheet:"单元挑战记录" 在 PGsql数据库中 筛选出 user_id 对应的记录 按时间(updated_at)倒序排列。 数据表:user_unit_challenge_question_result 输出以下字段: user_id story_id category score_text, question_list updated_at ------------ 需求补充v1.1: "全部音频数据"这个sheet 输出字段 添加timeStr 并按时间倒序排列 最新的记录 在最上面 ------------ 需求补充v1.2: "全部音频数据"这个sheet 如果userMsg字段内容 包含 ”makee_id“ 要进行以下处理: 从userMsg字段中提取出具体的makee_id: 此时的字段样例: ``` asr msg信息为:{ "time_ms": 358, "time_ms_api": 357, "hot_words_str": "{\n \"context_type\": \"dialog_ctx\",\n \"context_data\": [\n {\n \"text\": \"planet Walla\"\n },\n {\n \"text\": \"Walla\"\n }\n ]\n}", "makee_id": "d208c617-902f-4f81-8255-b5fb73599546", "volcano_fast_x_tt_logid": "202511151541355DF72BE5EBFE73795BFD", "api_name": "volcano-fast" } ``` 然后基于makee_id 去另一个表里查记录: index:llm_asr_log 将查询到的记录的 result_text 字段内容 回填到 userMsg。 将source字段内容 输出 到 source。 如果userMsg字段内容 不包含 ”makee_id“ 保持之前的逻辑。 -------------- 需求补充 v1.3 当前输入 只支持配置单个 userId (业务侧名称为角色id) 期望扩展为以下逻辑: 1. 改为配置 角色id list , 分别 导出 多份excel文件。命名格式为 角色id_{}_导出时间_{}.xlsx 2. 
改为配置 账户id list , 分别 导出 多份excel文件。命名格式为 账户id_{}_角色id_{}_导出时间_{}.xlsx 关于 账户 id 到角色id 的映射逻辑, 首先 读取 mysql 表 vala_app_character 筛选 account_id字段值 == 账户id 的 记录, 其中 该记录 的 id值,则为角色id 一个 账户id 可以对应多个角色id 本次需求只针对输入侧调整, 数据抽取聚合逻辑部分和之前保持一致 --------------- 需求补充 v1.4 增加一个sheet "单元总结记录", 导出对应角色id的单元总结记录。 参考 export_unit_summary.py 中的原始数据提取方案即可(不必关注其中的数据统计部分)。 其他已有逻辑保持不动哦。 ---------------- 需求补充 v1.5 1."互动组件学习记录"sheet 增加以下字段 "互动组件名称"、"组件标题"、"组件配置摘要"、"知识点": 字段取值规则: 根据 c_type 及组件配置(从mysql表获取) 进行映射和处理: ``` 1).如果 c_type 开头为"mid" 则读取下表:表名:middle_interaction_component 获取以下字段值: title (作为组件标题) component_config (完整的组件配置) 获取其中 的 question 字段值 作为 组件配置摘要; kp_relation_info 字段值 作为 知识点 "互动组件名称"规则: "物品互动": "mid_vocab_item", "图片互动": "mid_vocab_image", "填词互动": "mid_vocab_fillBlank", "指令互动": "mid_vocab_instruction" "对话互动-表达": "mid_sentence_dialogue", 且 component_config->question->mode == "express" "对话互动-朗读": "mid_sentence_dialogue", 且 component_config->question->mode == "read" "语音互动": "mid_sentence_voice", "材料互动": "mid_sentence_material", "造句互动": "mid_sentence_makeSentence" "挖空互动": "mid_grammar_cloze", "组句互动": "mid_grammar_sentence" "发音互动": "mid_pron_pron" 2). 如果 c_type 开头为"core" 则读取下表:表名:core_interaction_component 获取以下字段值: title (作为组件标题) component_config (完整的组件配置) 获取其中 的 taskInfo 字段值 作为 组件配置摘要 kp_relation_info 字段值 作为 知识点 "互动组件名称"规则: "口语快答": "core_speaking_reply", "口语妙问": "core_speaking_inquiry", "口语探讨": "core_speaking_explore", "口语独白": "core_speaking_monologue" "合作阅读": "core_reading_order", "合作听力": "core_listening_order", "看图组句": "core_writing_imgMakeSentence", "看图撰写": "core_writing_imgWrite", "问题组句": "core_writing_questionMakeSentence", "问题撰写": "core_writing_questionWrite", ``` 2."课程巩固记录" sheet 增加以下字段 "正确率": 参考 export_lesson_review.py 中的计算逻辑 3. 新增一个"汇总统计"sheet 统计并展示以下内容 请以 可读性 比较好的方式排列、展示 a. "所有互动-按互动组件类型-通过情况统计" 以每种"互动组件名称"进行聚合 统计play_result的取值分布情况,算以下指标: 总数量、Perfect数量、Good数量、Failed数量、Pass数量、Perfect比例、Good比例、Failed比例、Pass比例 b. 
"中互动组件-按知识点-通过情况统计" 以每个知识点进行聚合 其中 知识点配置格式如下: ``` [{"kpId":"0000004","kpType":"sentence","kpTitle":"My name is ...","kpSkill":"sentence_pron","kpSkillName":"语音"},{"kpId":"0000004","kpType":"sentence","kpTitle":"My name is ...","kpSkill":"sentence_meaning","kpSkillName":"语义"},{"kpId":"0000005","kpType":"sentence","kpTitle":"I'm… years old.","kpSkill":"sentence_pron","kpSkillName":"语音"},{"kpId":"0000005","kpType":"sentence","kpTitle":"I'm… years old.","kpSkill":"sentence_meaning","kpSkillName":"语义"},{"kpId":"0000014","kpType":"sentence","kpTitle":"Nice to meet you.","kpSkill":"sentence_pron","kpSkillName":"语音"},{"kpId":"0000014","kpType":"sentence","kpTitle":"Nice to meet you.","kpSkill":"sentence_meaning","kpSkillName":"语义"}] ``` 一个组件可以绑定多个知识点,以每个知识点的 kpId + kpType + kpTitle 进行 展示及聚合 对所有绑定了某个知识点的中互动组件(c_type以mid开头) 统计play_result的取值分布情况,算以下指标: 总数量、Perfect数量、Good数量、Failed数量、Pass数量、Perfect比例、Good比例、Failed比例、Pass比例 c. "单元总结-按单元统计时长" 将"单元总结记录"中的"play_time_seconds"字段值 以每个单元id 进行聚合 进行 累加 统计,并增加一列 转换为分钟为单位 取整数 """ # ==== 可直接修改的脚本变量(不使用命令行传参) ==== # 三种模式互斥,只能配置一个: # 模式1:单个角色id USER_ID = None # 单个角色ID,示例:2911 # 模式2:角色id列表(多个角色id批量导出) USER_ID_LIST = None # 角色ID列表,示例:[2911, 2912, 2913] # 模式3:账户id列表(通过账户id查询对应的角色id后批量导出) ACCOUNT_ID_LIST = [9343] # 账户ID列表,示例:[100, 101, 102] OUTPUT_DIR = "output/" # 输出目录,默认为output文件夹 # ==== 变量结束 ==== import os import json import re from typing import Any, Dict, List, Optional import datetime try: import requests except Exception: requests = None try: import psycopg2 from psycopg2.extras import RealDictCursor except Exception: psycopg2 = None RealDictCursor = None try: import pymysql import pymysql.cursors except Exception: pymysql = None try: import pandas as pd except Exception: pd = None try: import urllib3 except Exception: urllib3 = None SHEET1_COLUMNS = [ "userId", "userMsg", "source", "userName", "soeData", "audioUrl", "asrStatus", "componentId", "componentType", "dataVersion", "timeStr", ] SHEET2_COLUMNS = [ "user_id", 
"component_unique_code", "session_id", "c_type", "c_id", "互动组件名称", "组件标题", "组件配置摘要", "知识点", "play_result", "user_behavior_info", "updated_at", ] SHEET3_COLUMNS = [ "user_id", "unit_id", "lesson_id", "question_list", "正确率", "updated_at", ] SHEET4_COLUMNS = [ "user_id", "unit_id", "category", "score_text", "question_list", "updated_at", ] SHEET5_COLUMNS = [ "id", "user_id", "unit_id", "updated_at", "km_id", "km_type", "play_time_seconds", ] def _load_env_file(path: str) -> None: if not os.path.exists(path): return try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" not in line: continue k, v = line.split("=", 1) k = k.strip() v = v.strip().strip('"').strip("'") if k and (os.getenv(k) is None): os.environ[k] = v except Exception: pass def load_env() -> None: _load_env_file(os.path.join(os.getcwd(), ".env")) _load_env_file(os.path.join(os.getcwd(), ".env.local")) def to_json_str(v: Any) -> Any: if isinstance(v, (dict, list)): try: return json.dumps(v, ensure_ascii=False) except Exception: return str(v) return v def parse_time(value: Any) -> Optional[datetime.datetime]: if value is None: return None if isinstance(value, (int, float)): try: v = float(value) # 兼容毫秒级时间戳 if v > 1e11: v = v / 1000.0 return datetime.datetime.fromtimestamp(v) except Exception: return None if isinstance(value, str): fmts = [ "%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d", ] for fmt in fmts: try: return datetime.datetime.strptime(value, fmt) except Exception: continue try: return datetime.datetime.fromisoformat(value) except Exception: return None return None def pick_time(source: Dict[str, Any]) -> Optional[datetime.datetime]: candidates = [ "updated_at", "created_at", "@timestamp", "timestamp", "updatedAt", "createdAt", "time", "ts", "timeStr", "update_time", "create_time", ] for key in candidates: if key in source: t = parse_time(source.get(key)) if 
t is not None: return t # 宽松匹配:尝试扫描所有可能的时间相关字段 for k, v in source.items(): lk = str(k).lower() if any(s in lk for s in ["time", "date", "_at", "timestamp"]): t = parse_time(v) if t is not None: return t return None def extract_makee_id_from_user_msg(user_msg: Any) -> Optional[str]: # 支持dict或字符串形式 if isinstance(user_msg, dict): mk = user_msg.get("makee_id") if isinstance(mk, str) and mk: return mk if isinstance(user_msg, str) and user_msg: # 1) 尝试整体解析为JSON try: obj = json.loads(user_msg) mk = obj.get("makee_id") if isinstance(mk, str) and mk: return mk except Exception: pass # 2) 尝试截取大括号中的JSON try: start = user_msg.find("{") end = user_msg.rfind("}") if start != -1 and end != -1 and end > start: candidate = user_msg[start : end + 1] obj = json.loads(candidate) mk = obj.get("makee_id") if isinstance(mk, str) and mk: return mk except Exception: pass # 3) 正则匹配 makee_id m = re.search(r"\bmakee_id\b\s*:\s*\"([^\"]+)\"", user_msg) if m: return m.group(1) return None def fetch_es_asr_log(makee_id: str, es_cfg: Dict[str, Any]) -> Optional[Dict[str, Any]]: if requests is None: raise RuntimeError("缺少requests依赖,请安装后再运行。") host = es_cfg.get("host") port = es_cfg.get("port") scheme = es_cfg.get("scheme", "http") user = es_cfg.get("user") password = es_cfg.get("password") index = "llm_asr_log" if not host: return None base = f"{scheme}://{host}:{port}" url = f"{base}/{index}/_search" headers = {"Content-Type": "application/json"} body = { "query": { "bool": { "should": [ {"term": {"makee_id": {"value": str(makee_id)}}}, {"term": {"makee_id.keyword": {"value": str(makee_id)}}}, ], "minimum_should_match": 1, } }, "size": 10, "_source": [ "makee_id", "result_text", "source", "updated_at", "created_at", "@timestamp", "timestamp", "updatedAt", "createdAt", "time", "ts", "timeStr", "update_time", "create_time", ], } auth = (user, password) if user and password else None try: if scheme == "https" and urllib3 is not None: try: 
                urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
            except Exception:
                pass
        resp = requests.post(
            url,
            headers=headers,
            json=body,
            auth=auth,
            timeout=20,
            verify=False if scheme == "https" else True,
        )
        resp.raise_for_status()
        data = resp.json()
    except Exception:
        return None
    hits = data.get("hits", {}).get("hits", [])
    if not hits:
        return None
    # Pick the most recent hit
    chosen = None
    best_t = None
    for h in hits:
        src = h.get("_source", {}) or {}
        t = pick_time(src)
        if t is None:
            continue
        if best_t is None or t > best_t:
            best_t = t
            chosen = src
    if chosen is None:
        # No hit carries a usable time field; fall back to the first one
        chosen = (hits[0].get("_source", {}) or {})
    return chosen


def get_es_config() -> Dict[str, Any]:
    return {
        "host": os.getenv("ES_HOST"),
        "port": os.getenv("ES_PORT", "9200"),
        "scheme": os.getenv("ES_SCHEME", "http"),
        "user": os.getenv("ES_USER"),
        "password": os.getenv("ES_PASSWORD"),
        "index": "user-audio",
    }


def fetch_es_user_audio(user_id: str, es_cfg: Dict[str, Any]) -> List[Dict[str, Any]]:
    if requests is None:
        raise RuntimeError("Missing dependency: requests. Install it and retry.")
    print(" [ES] Querying the user-audio index...")
    start_time = datetime.datetime.now()
    host = es_cfg.get("host")
    port = es_cfg.get("port")
    scheme = es_cfg.get("scheme", "http")
    user = es_cfg.get("user")
    password = es_cfg.get("password")
    index = es_cfg.get("index", "user-audio")
    if not host:
        return []
    base = f"{scheme}://{host}:{port}"
    url = f"{base}/{index}/_search"
    headers = {"Content-Type": "application/json"}
    body = {
        "query": {
            "bool": {
                "should": [
                    {"term": {"userId": {"value": str(user_id)}}},
                    {"term": {"userId.keyword": {"value": str(user_id)}}},
                ],
                "minimum_should_match": 1,
            }
        },
        "size": 10000,
        "_source": [
            "userId",
            "userMsg",
            "userName",
            "soeData",
            "audioUrl",
            "asrStatus",
            "componentId",
            "componentType",
            "dataVersion",
            "updated_at",
            "created_at",
            "@timestamp",
            "timestamp",
            "updatedAt",
            "createdAt",
            "time",
            "ts",
            "timeStr",
            "update_time",
            "create_time",
        ],
    }
    auth = (user, password) if user and password else None
    try:
        # Suppress the insecure-HTTPS warning for self-signed certificates
        if scheme == "https" and urllib3 is not None:
            try:
                urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
            except Exception:
                pass
        resp = requests.post(
            url,
            headers=headers,
            json=body,
            auth=auth,
            timeout=30,
            verify=False if scheme == "https" else True,
        )
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        raise RuntimeError(f"ES query failed: {e}")
    hits = data.get("hits", {}).get("hits", [])
    print(f" [ES] Query done: {len(hits)} records in {(datetime.datetime.now() - start_time).total_seconds():.2f}s")
    if not hits:
        return []
    print(" [ES] Processing audio records...")
    process_start = datetime.datetime.now()
    rows: List[Dict[str, Any]] = []
    asr_cache: Dict[str, Dict[str, Any]] = {}
    makee_id_count = 0
    for idx, h in enumerate(hits, 1):
        # Report progress every 100 records
        if idx % 100 == 0 or idx == len(hits):
            print(f" [ES] Progress: {idx}/{len(hits)} ({idx*100//len(hits)}%)")
        src = h.get("_source", {}) or {}
        row = {
            "userId": src.get("userId"),
            "userMsg": src.get("userMsg"),
            "source": None,
            "userName": src.get("userName"),
            "soeData": to_json_str(src.get("soeData")),
            "audioUrl": src.get("audioUrl"),
            "asrStatus": src.get("asrStatus"),
            "componentId": src.get("componentId"),
            "componentType": src.get("componentType"),
            "dataVersion": src.get("dataVersion"),
        }
        t = pick_time(src)
        row["_time"] = t.isoformat() if t else None
        row["timeStr"] = t.strftime("%Y-%m-%d %H:%M:%S") if t else None
        # v1.2: when userMsg carries a makee_id, look it up in llm_asr_log
        # and backfill userMsg/source from the matched record
        mk = extract_makee_id_from_user_msg(row.get("userMsg"))
        if mk:
            makee_id_count += 1
            asr_doc = asr_cache.get(mk)
            if asr_doc is None:
                asr_doc = fetch_es_asr_log(mk, es_cfg)
                if asr_doc is not None:
                    asr_cache[mk] = asr_doc
            if asr_doc is not None:
                rt = asr_doc.get("result_text")
                if rt:
                    row["userMsg"] = rt
                row["source"] = to_json_str(asr_doc.get("source"))
        rows.append(row)
    print(f" [ES] Processing done: {makee_id_count} records contained makee_id, took {(datetime.datetime.now() - process_start).total_seconds():.2f}s")
    print(" [ES] Sorting...")
    rows.sort(key=lambda x: parse_time(x.get("_time")) or datetime.datetime.min, reverse=True)
print(f" [ES] 音频数据处理完成,总耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}秒") return rows def get_pg_conn() -> Any: if psycopg2 is None: raise RuntimeError("缺少psycopg2依赖,请安装后再运行。") host = os.getenv("PG_DB_HOST") port = int(os.getenv("PG_DB_PORT", "5432")) user = os.getenv("PG_DB_USER") password = os.getenv("PG_DB_PASSWORD") dbname = os.getenv("PG_DB_DATABASE") if not host or not dbname: raise RuntimeError("PG数据库环境变量未配置完整") conn = psycopg2.connect(host=host, port=port, user=user, password=password, dbname=dbname) return conn def get_mysql_conn(database: str) -> Any: """ 获取MySQL数据库连接 Args: database: 数据库名,可选值:'vala_user' 或 'vala_test' vala_user 使用 online 配置(环境变量后缀 _online) vala_test 使用默认配置 Returns: MySQL连接对象 """ if pymysql is None: raise RuntimeError("缺少pymysql依赖,请安装后再运行。") # 根据数据库选择不同的环境变量配置 if database == "vala_user": # vala_user 数据库使用 online 配置 host = os.getenv("MYSQL_HOST_online") port = int(os.getenv("MYSQL_PORT_online", "3306")) user = os.getenv("MYSQL_USERNAME_online") password = os.getenv("MYSQL_PASSWORD_online") if not host: raise RuntimeError("MySQL数据库环境变量未配置完整(缺少MYSQL_HOST_online)") else: # vala_test 等其他数据库使用默认配置 host = os.getenv("MYSQL_HOST") port = int(os.getenv("MYSQL_PORT", "3306")) user = os.getenv("MYSQL_USERNAME") password = os.getenv("MYSQL_PASSWORD") if not host: raise RuntimeError("MySQL数据库环境变量未配置完整(缺少MYSQL_HOST)") conn = pymysql.connect( host=host, port=port, user=user, password=password, database=database, # 直接使用传入的数据库名 charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor, ) return conn def get_id_2_unit_index(conn: Any) -> Dict[int, int]: """ 从MySQL获取 story_id 到 unit_id 的映射关系 Args: conn: MySQL数据库连接 Returns: 映射字典 {story_id: unit_id} """ sql = """ SELECT * FROM `vala_game_info` WHERE id > 0 AND `vala_game_info`.`deleted_at` IS NULL ORDER BY season_package_id asc, `index` asc """ try: with conn.cursor() as cur: cur.execute(sql) rows = cur.fetchall() or [] # 构建映射表:按查询结果的顺序,索引即为unit_id id_2_unit_index = {} for index, row in 
enumerate(rows): id_2_unit_index[row["id"]] = index return id_2_unit_index except Exception as e: print(f"[ERROR] 获取story_id到unit_id映射失败: {e}") return {} def get_chapter_id_to_lesson_id(conn: Any) -> Dict[int, int]: """ 从MySQL获取 chapter_id 到 lesson_id 的映射关系 Args: conn: MySQL数据库连接 Returns: 映射字典 {chapter_id: lesson_id} """ sql = """ SELECT id, `index` FROM `vala_game_chapter` WHERE deleted_at IS NULL """ try: with conn.cursor() as cur: cur.execute(sql) rows = cur.fetchall() or [] # 构建映射表:chapter的index字段即为lesson_id chapter_id_to_lesson_id = {} for row in rows: chapter_id_to_lesson_id[row["id"]] = row["index"] return chapter_id_to_lesson_id except Exception as e: print(f"[ERROR] 获取chapter_id到lesson_id映射失败: {e}") return {} # 组件类型到组件名称的映射 COMPONENT_TYPE_NAMES = { "mid_vocab_item": "物品互动", "mid_vocab_image": "图片互动", "mid_vocab_fillBlank": "填词互动", "mid_vocab_instruction": "指令互动", "mid_sentence_dialogue": "对话互动", # 需要根据mode进一步判断 "mid_sentence_voice": "语音互动", "mid_sentence_material": "材料互动", "mid_sentence_makeSentence": "造句互动", "mid_grammar_cloze": "挖空互动", "mid_grammar_sentence": "组句互动", "mid_pron_pron": "发音互动", "core_speaking_reply": "口语快答", "core_speaking_inquiry": "口语妙问", "core_speaking_explore": "口语探讨", "core_speaking_monologue": "口语独白", "core_reading_order": "合作阅读", "core_listening_order": "合作听力", "core_writing_imgMakeSentence": "看图组句", "core_writing_imgWrite": "看图撰写", "core_writing_questionMakeSentence": "问题组句", "core_writing_questionWrite": "问题撰写", } def get_component_name(c_type: str, component_config: Optional[Dict[str, Any]]) -> str: """ 根据c_type和组件配置获取组件名称 Args: c_type: 组件类型 component_config: 组件配置(用于判断对话互动的mode) Returns: 组件名称 """ if not c_type: return "" # 特殊处理:对话互动需要根据mode判断 if c_type == "mid_sentence_dialogue" and component_config: try: question = component_config.get("question", {}) mode = question.get("mode", "") if mode == "express": return "对话互动-表达" elif mode == "read": return "对话互动-朗读" except Exception: pass return COMPONENT_TYPE_NAMES.get(c_type, "") def 
batch_fetch_component_configs(play_records: List[Dict[str, Any]], mysql_conn: Any) -> Dict[str, Dict[str, Any]]: """ 批量查询组件配置信息 Args: play_records: 播放记录列表 mysql_conn: MySQL连接 Returns: 组件配置映射 {c_type_c_id: {title, component_config, kp_relation_info}} """ print(f" [MySQL] 开始批量查询组件配置...") start_time = datetime.datetime.now() # 收集需要查询的c_type和c_id mid_c_ids = set() core_c_ids = set() mid_type_id_pairs = [] # 用于调试日志 core_type_id_pairs = [] for record in play_records: c_type = record.get("c_type", "") c_id = record.get("c_id") if c_type and c_id: if c_type.startswith("mid"): mid_c_ids.add(c_id) mid_type_id_pairs.append((c_type, c_id)) elif c_type.startswith("core"): core_c_ids.add(c_id) core_type_id_pairs.append((c_type, c_id)) print(f" [MySQL] 需要查询中互动组件: {len(mid_c_ids)}个, 核心互动组件: {len(core_c_ids)}个") if mid_c_ids: print(f" [MySQL] 中互动组件ID列表(前10个): {sorted(list(mid_c_ids))[:10]}") if core_c_ids: print(f" [MySQL] 核心互动组件ID列表(前10个): {sorted(list(core_c_ids))[:10]}") config_map = {} # 批量查询middle_interaction_component if mid_c_ids: try: with mysql_conn.cursor() as cur: placeholders = ','.join(['%s'] * len(mid_c_ids)) sql = f""" SELECT c_id, c_type, title, component_config, kp_relation_info FROM middle_interaction_component WHERE c_id IN ({placeholders}) AND deleted_at IS NULL """ print(f" [MySQL] 执行中互动组件查询,查询条件: c_id IN ({len(mid_c_ids)}个ID)") cur.execute(sql, tuple(mid_c_ids)) rows = cur.fetchall() or [] print(f" [MySQL] 查询到{len(rows)}条中互动组件配置") if len(rows) == 0 and len(mid_c_ids) > 0: print(f" [MySQL] [警告] 查询结果为空!可能的原因:") print(f" [MySQL] - 数据库中没有匹配的c_id记录") print(f" [MySQL] - deleted_at字段不为NULL") print(f" [MySQL] - c_id不存在") for idx, row in enumerate(rows): c_type = row.get("c_type", "") c_id = row.get("c_id") key = f"{c_type}_{c_id}" if idx < 3: # 输出前3条的详细信息 print(f" [MySQL] [样例{idx+1}] id={c_id}, c_type={c_type}, key={key}") print(f" [MySQL] [样例{idx+1}] title={row.get('title', '')[:50]}") # 解析component_config component_config = row.get("component_config") if 
isinstance(component_config, str): try: component_config = json.loads(component_config) except Exception as e: print(f" [MySQL] [警告] 解析component_config失败 (id={c_id}): {e}") component_config = {} # 提取question字段作为摘要 summary = "" if isinstance(component_config, dict): question = component_config.get("question") summary = to_json_str(question) if question else "" if idx < 3 and question: print(f" [MySQL] [样例{idx+1}] 提取到question字段,长度: {len(summary)}") # 解析kp_relation_info kp_relation_info = row.get("kp_relation_info") if isinstance(kp_relation_info, str): try: kp_relation_info = json.loads(kp_relation_info) except Exception: kp_relation_info = [] config_map[key] = { "title": row.get("title", ""), "component_config": component_config, "summary": summary, "kp_relation_info": to_json_str(kp_relation_info), } print(f" [MySQL] 中互动组件配置已加入config_map,当前map大小: {len(config_map)}") except Exception as e: print(f" [MySQL] [错误] 查询中互动组件配置失败: {e}") import traceback traceback.print_exc() # 批量查询core_interaction_component if core_c_ids: try: with mysql_conn.cursor() as cur: placeholders = ','.join(['%s'] * len(core_c_ids)) sql = f""" SELECT c_id, c_type, title, component_config, kp_relation_info FROM core_interaction_component WHERE c_id IN ({placeholders}) AND deleted_at IS NULL """ print(f" [MySQL] 执行核心互动组件查询,查询条件: c_id IN ({len(core_c_ids)}个ID)") cur.execute(sql, tuple(core_c_ids)) rows = cur.fetchall() or [] print(f" [MySQL] 查询到{len(rows)}条核心互动组件配置") if len(rows) == 0 and len(core_c_ids) > 0: print(f" [MySQL] [警告] 查询结果为空!可能的原因:") print(f" [MySQL] - 数据库中没有匹配的c_id记录") print(f" [MySQL] - deleted_at字段不为NULL") print(f" [MySQL] - c_id不存在") for idx, row in enumerate(rows): c_type = row.get("c_type", "") c_id = row.get("c_id") key = f"{c_type}_{c_id}" if idx < 3: # 输出前3条的详细信息 print(f" [MySQL] [样例{idx+1}] id={c_id}, c_type={c_type}, key={key}") print(f" [MySQL] [样例{idx+1}] title={row.get('title', '')[:50]}") # 解析component_config component_config = row.get("component_config") if 
isinstance(component_config, str): try: component_config = json.loads(component_config) except Exception as e: print(f" [MySQL] [警告] 解析component_config失败 (id={c_id}): {e}") component_config = {} # 提取taskInfo字段作为摘要 summary = "" if isinstance(component_config, dict): task_info = component_config.get("taskInfo") summary = to_json_str(task_info) if task_info else "" if idx < 3 and task_info: print(f" [MySQL] [样例{idx+1}] 提取到taskInfo字段,长度: {len(summary)}") # 解析kp_relation_info kp_relation_info = row.get("kp_relation_info") if isinstance(kp_relation_info, str): try: kp_relation_info = json.loads(kp_relation_info) except Exception: kp_relation_info = [] config_map[key] = { "title": row.get("title", ""), "component_config": component_config, "summary": summary, "kp_relation_info": to_json_str(kp_relation_info), } print(f" [MySQL] 核心互动组件配置已加入config_map,当前map大小: {len(config_map)}") except Exception as e: print(f" [MySQL] [错误] 查询核心互动组件配置失败: {e}") import traceback traceback.print_exc() print(f" [MySQL] 组件配置查询完成,共{len(config_map)}条,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}秒") return config_map def calculate_accuracy(question_list: Any) -> float: """ 计算问题列表的正确率 Args: question_list: 问题列表(可能是JSON字符串或list) Returns: 正确率(百分比,保留2位小数) """ try: if isinstance(question_list, str): question_list = json.loads(question_list) if not isinstance(question_list, list) or len(question_list) == 0: return 0.0 total = len(question_list) correct = sum(1 for q in question_list if q.get('isRight') == True) accuracy = round(correct / total * 100, 2) if total > 0 else 0.0 return accuracy except Exception: return 0.0 def fetch_character_ids_by_account(account_id: str, conn: Any) -> List[str]: """根据账户id查询对应的角色id列表""" sql = "SELECT id FROM vala_app_character WHERE account_id = %s" try: with conn.cursor() as cur: cur.execute(sql, (account_id,)) rows = cur.fetchall() or [] return [str(row["id"]) for row in rows if row.get("id")] except Exception as e: print(f"[ERROR] 
查询账户id={account_id}的角色id失败: {e}") return [] def fetch_pg_play_records(user_id: str, conn: Any, mysql_conn: Any) -> List[Dict[str, Any]]: """ 查询互动组件学习记录并补充组件配置信息 Args: user_id: 用户ID(角色ID) conn: PostgreSQL数据库连接 mysql_conn: MySQL数据库连接 Returns: 互动组件学习记录列表 """ print(f" [PG] 开始查询互动组件学习记录(8张分表)...") start_time = datetime.datetime.now() tables = [f"user_component_play_record_{i}" for i in range(8)] rows: List[Dict[str, Any]] = [] with conn.cursor(cursor_factory=RealDictCursor) as cur: for t in tables: try: cur.execute( f""" SELECT user_id, component_unique_code, session_id, c_type, c_id, play_result, user_behavior_info, updated_at FROM {t} WHERE user_id = %s ORDER BY updated_at DESC """, (user_id,), ) part = cur.fetchall() or [] if part: print(f" [PG] 表{t}查到{len(part)}条记录") for r in part: r = dict(r) r["play_result"] = to_json_str(r.get("play_result")) r["user_behavior_info"] = to_json_str(r.get("user_behavior_info")) # 将带时区的时间转换为无时区,避免Excel写入报错 upd = r.get("updated_at") if isinstance(upd, datetime.datetime): try: if upd.tzinfo is not None and upd.tzinfo.utcoffset(upd) is not None: r["updated_at"] = upd.replace(tzinfo=None) except Exception: # 回退为字符串 r["updated_at"] = str(upd) rows.append(r) except Exception as e: print(f" [PG] 表{t}查询失败: {e}") continue rows.sort(key=lambda x: parse_time(x.get("updated_at")) or datetime.datetime.min, reverse=True) print(f" [PG] 互动组件学习记录查询完成,共{len(rows)}条,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}秒") # 批量查询组件配置 if rows and mysql_conn: config_map = batch_fetch_component_configs(rows, mysql_conn) # 补充组件信息 print(f" [PG] 开始补充组件配置信息...") filled_count = 0 empty_count = 0 sample_keys = [] sample_mode_check = [] # 检查对话互动的mode for r in rows: c_type = r.get("c_type", "") c_id = r.get("c_id") key = f"{c_type}_{c_id}" if c_type and c_id else "" config = config_map.get(key, {}) component_config = config.get("component_config", {}) component_name = get_component_name(c_type, component_config) r["互动组件名称"] = component_name r["组件标题"] = 
config.get("title", "") r["组件配置摘要"] = config.get("summary", "") r["知识点"] = config.get("kp_relation_info", "") # 统计填充情况 if config: filled_count += 1 if len(sample_keys) < 3: sample_keys.append((key, component_name, r["组件标题"][:30] if r["组件标题"] else "")) # 检查对话互动的mode if c_type == "mid_sentence_dialogue" and len(sample_mode_check) < 3: mode = "" if isinstance(component_config, dict): question = component_config.get("question", {}) if isinstance(question, dict): mode = question.get("mode", "") sample_mode_check.append({ "key": key, "mode": mode, "component_name": component_name }) else: empty_count += 1 if empty_count <= 5: # 输出前5个未匹配的key print(f" [PG] [警告] 未找到组件配置: key={key}") print(f" [PG] 组件配置信息补充完成") print(f" [PG] 匹配到配置: {filled_count}条, 未匹配: {empty_count}条") if sample_keys: print(f" [PG] 样例数据(前3条):") for key, name, title in sample_keys: print(f" [PG] - key={key}, 名称={name}, 标题={title}") if sample_mode_check: print(f" [PG] 对话互动mode检查(前3条):") for s in sample_mode_check: print(f" [PG] - key={s['key']}, mode={s['mode']}, 最终名称={s['component_name']}") return rows def fetch_pg_unit_review(user_id: str, conn: Any, id_2_unit_index: Dict[int, int], chapter_id_to_lesson_id: Dict[int, int]) -> List[Dict[str, Any]]: """ 查询课程巩固记录 Args: user_id: 用户ID(角色ID) conn: PostgreSQL数据库连接 id_2_unit_index: story_id到unit_id的映射字典 chapter_id_to_lesson_id: chapter_id到lesson_id的映射字典 Returns: 课程巩固记录列表 """ print(f" [PG] 开始查询课程巩固记录...") start_time = datetime.datetime.now() sql = ( "SELECT user_id, story_id, chapter_id, question_list, updated_at " "FROM user_unit_review_question_result WHERE user_id = %s ORDER BY updated_at DESC" ) with conn.cursor(cursor_factory=RealDictCursor) as cur: try: cur.execute(sql, (user_id,)) rows = cur.fetchall() or [] except Exception as e: print(f" [PG] 课程巩固记录查询失败: {e}") rows = [] out: List[Dict[str, Any]] = [] for r in rows: d = dict(r) # 映射 story_id 到 unit_id story_id = d.get("story_id") unit_id = id_2_unit_index.get(story_id) if story_id else None d["unit_id"] = 
unit_id # 映射 chapter_id 到 lesson_id chapter_id = d.get("chapter_id") lesson_id = chapter_id_to_lesson_id.get(chapter_id) if chapter_id else None d["lesson_id"] = lesson_id # 计算正确率 question_list = d.get("question_list") d["正确率"] = calculate_accuracy(question_list) d["question_list"] = to_json_str(question_list) upd = d.get("updated_at") if isinstance(upd, datetime.datetime): try: if upd.tzinfo is not None and upd.tzinfo.utcoffset(upd) is not None: d["updated_at"] = upd.replace(tzinfo=None) except Exception: d["updated_at"] = str(upd) out.append(d) print(f" [PG] 课程巩固记录查询完成,共{len(out)}条,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}秒") return out def fetch_pg_unit_challenge(user_id: str, conn: Any, id_2_unit_index: Dict[int, int]) -> List[Dict[str, Any]]: """ 查询单元挑战记录 Args: user_id: 用户ID(角色ID) conn: PostgreSQL数据库连接 id_2_unit_index: story_id到unit_id的映射字典 Returns: 单元挑战记录列表 """ print(f" [PG] 开始查询单元挑战记录...") start_time = datetime.datetime.now() sql = ( "SELECT user_id, story_id, category, score_text, question_list, updated_at " "FROM user_unit_challenge_question_result WHERE user_id = %s ORDER BY updated_at DESC" ) with conn.cursor(cursor_factory=RealDictCursor) as cur: try: cur.execute(sql, (user_id,)) rows = cur.fetchall() or [] except Exception as e: print(f" [PG] 单元挑战记录查询失败: {e}") rows = [] out: List[Dict[str, Any]] = [] for r in rows: d = dict(r) # 映射 story_id 到 unit_id story_id = d.get("story_id") unit_id = id_2_unit_index.get(story_id) if story_id else None d["unit_id"] = unit_id d["question_list"] = to_json_str(d.get("question_list")) upd = d.get("updated_at") if isinstance(upd, datetime.datetime): try: if upd.tzinfo is not None and upd.tzinfo.utcoffset(upd) is not None: d["updated_at"] = upd.replace(tzinfo=None) except Exception: d["updated_at"] = str(upd) out.append(d) print(f" [PG] 单元挑战记录查询完成,共{len(out)}条,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}秒") return out def fetch_pg_unit_summary(user_id: str, conn: Any, 
id_2_unit_index: Dict[int, int]) -> List[Dict[str, Any]]: """ 查询单元总结知识点结果数据 Args: user_id: 用户ID(角色ID) conn: PostgreSQL数据库连接 id_2_unit_index: story_id到unit_id的映射字典 Returns: 单元总结记录列表 """ print(f" [PG] 开始查询单元总结记录...") start_time = datetime.datetime.now() sql = ( "SELECT id, user_id, story_id, updated_at, km_id, km_type, play_time " "FROM user_unit_summary_km_result WHERE user_id = %s AND deleted_at IS NULL ORDER BY updated_at DESC" ) with conn.cursor(cursor_factory=RealDictCursor) as cur: try: cur.execute(sql, (user_id,)) rows = cur.fetchall() or [] except Exception as e: print(f" [PG] 单元总结记录查询失败: {e}") rows = [] out: List[Dict[str, Any]] = [] for r in rows: d = dict(r) # 映射 story_id 到 unit_id story_id = d.get("story_id") unit_id = id_2_unit_index.get(story_id) if story_id else None d["unit_id"] = unit_id # 转换 play_time (毫秒) 为秒 (整数) play_time = d.get("play_time") d["play_time_seconds"] = play_time // 1000 if play_time else 0 # 移除时区信息 upd = d.get("updated_at") if isinstance(upd, datetime.datetime): try: if upd.tzinfo is not None and upd.tzinfo.utcoffset(upd) is not None: d["updated_at"] = upd.replace(tzinfo=None) except Exception: d["updated_at"] = str(upd) out.append(d) print(f" [PG] 单元总结记录查询完成,共{len(out)}条,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}秒") return out def generate_statistics(sheet2_rows: List[Dict[str, Any]], sheet5_rows: List[Dict[str, Any]]) -> tuple: """ 生成汇总统计数据 Args: sheet2_rows: 互动组件学习记录 sheet5_rows: 单元总结记录 Returns: (组件统计DataFrame, 知识点统计DataFrame, 单元时长统计DataFrame) """ if pd is None: raise RuntimeError("缺少pandas依赖,请安装后再运行。") print(f" [统计] 开始生成汇总统计数据...") start_time = datetime.datetime.now() from collections import defaultdict # ============ a. 
    # ============ a. 所有互动-按互动组件类型-通过情况统计 ============
    component_stats_data = []
    component_stats = defaultdict(lambda: {"Perfect": 0, "Good": 0, "Failed": 0, "Pass": 0, "Oops": 0, "total": 0})
    # Debug helpers
    sample_results = []
    parse_error_count = 0
    for idx, record in enumerate(sheet2_rows):
        component_name = record.get("互动组件名称", "")
        if not component_name:
            continue
        play_result_str = record.get("play_result", "")
        # Parse play_result
        result = ""
        try:
            # play_result may be a bare string (Perfect/Good/Failed/Pass/Oops)
            if isinstance(play_result_str, str):
                stripped = play_result_str.strip()
                if stripped in ["Perfect", "Good", "Failed", "Pass", "Oops"]:
                    # Use it as-is
                    result = stripped
                else:
                    # Otherwise try to parse it as JSON
                    try:
                        play_result = json.loads(play_result_str)
                        if isinstance(play_result, dict):
                            result = play_result.get("result", "")
                        else:
                            result = ""
                    except Exception:
                        result = ""
            else:
                # Not a string; treat it as a dict if possible
                if isinstance(play_result_str, dict):
                    result = play_result_str.get("result", "")
                else:
                    result = ""
            # Collect the first 3 samples
            if idx < 3:
                sample_results.append({
                    "component": component_name,
                    "raw": str(play_result_str)[:100],
                    "result": result
                })
        except Exception as e:
            parse_error_count += 1
            if parse_error_count <= 3:
                print(f" [stats] [WARN] Failed to parse play_result (record {idx+1}): {e}, raw: {str(play_result_str)[:100]}")
            result = ""
        component_stats[component_name]["total"] += 1
        if result in ["Perfect", "Good", "Failed", "Pass", "Oops"]:
            component_stats[component_name][result] += 1
    print(" [stats] play_result parse samples (first 3):")
    for s in sample_results:
        print(f" [stats] - component: {s['component']}, result: {s['result']}, raw: {s['raw']}")
    if parse_error_count > 0:
        print(f" [stats] Total play_result parse failures: {parse_error_count}")
    # Emit one stats row per component
    for component_name in sorted(component_stats.keys()):
        stats = component_stats[component_name]
        total = stats["total"]
        perfect = stats["Perfect"]
        good = stats["Good"]
        failed = stats["Failed"]
        pass_count = stats["Pass"]
        oops = stats["Oops"]
        perfect_ratio = round(perfect / total * 100, 2) if total > 0 else 0
        good_ratio = round(good / total * 100, 2) if total > 0 else 0
        failed_ratio = round(failed / total * 100, 2) if total > 0 else 0
        pass_ratio = round(pass_count / total * 100, 2) if total > 0 else 0
        oops_ratio = round(oops / total * 100, 2) if total > 0 else 0
        component_stats_data.append({
            "互动组件名称": component_name,
            "总数量": total,
            "Perfect数量": perfect,
            "Good数量": good,
            "Failed数量": failed,
            "Pass数量": pass_count,
            "Oops数量": oops,
            "Perfect比例(%)": perfect_ratio,
            "Good比例(%)": good_ratio,
            "Failed比例(%)": failed_ratio,
            "Pass比例(%)": pass_ratio,
            "Oops比例(%)": oops_ratio,
        })
    # ============ b. 中互动组件-按知识点-通过情况统计 ============
    kp_stats_data = []
    kp_stats = defaultdict(lambda: {"Perfect": 0, "Good": 0, "Failed": 0, "Pass": 0, "Oops": 0, "total": 0})
    # Debug helpers
    mid_count = 0
    has_kp_count = 0
    sample_kp_records = []
    for idx, record in enumerate(sheet2_rows):
        c_type = record.get("c_type", "")
        if not c_type or not c_type.startswith("mid"):
            continue
        mid_count += 1
        kp_relation_info_str = record.get("知识点", "")
        if not kp_relation_info_str:
            continue
        has_kp_count += 1
        # Parse the knowledge points
        try:
            if isinstance(kp_relation_info_str, str):
                kp_relation_info = json.loads(kp_relation_info_str)
            else:
                kp_relation_info = kp_relation_info_str
            if not isinstance(kp_relation_info, list):
                continue
            # Collect samples
            if len(sample_kp_records) < 3:
                sample_kp_records.append({
                    "c_type": c_type,
                    "kp_count": len(kp_relation_info),
                    "kp_info": str(kp_relation_info)[:200]
                })
            # Parse play_result (same logic as in section a)
            play_result_str = record.get("play_result", "")
            result = ""
            if isinstance(play_result_str, str):
                stripped = play_result_str.strip()
                if stripped in ["Perfect", "Good", "Failed", "Pass", "Oops"]:
                    result = stripped
                else:
                    try:
                        play_result = json.loads(play_result_str)
                        if isinstance(play_result, dict):
                            result = play_result.get("result", "")
                    except Exception:
                        pass
            elif isinstance(play_result_str, dict):
                result = play_result_str.get("result", "")
            # Count the record once per bound knowledge point
            for kp in kp_relation_info:
                if not isinstance(kp, dict):
                    continue
                kp_id = kp.get("kpId", "")
                kp_type = kp.get("kpType", "")
                kp_title = kp.get("kpTitle", "")
                if not kp_id:
                    continue
                kp_key = f"{kp_id}|{kp_type}|{kp_title}"
                kp_stats[kp_key]["total"] += 1
                if result in ["Perfect", "Good", "Failed", "Pass", "Oops"]:
                    kp_stats[kp_key][result] += 1
        except Exception as e:
            if len(sample_kp_records) < 5:
                print(f" [stats] [WARN] Failed to parse knowledge points: {e}, raw: {str(kp_relation_info_str)[:100]}")
            continue
    print(f" [stats] mid components: total={mid_count}, with knowledge points={has_kp_count}, knowledge-point entries={len(kp_stats)}")
    if sample_kp_records:
        print(" [stats] Knowledge-point samples (first 3):")
        for s in sample_kp_records:
            print(f" [stats] - c_type={s['c_type']}, kp count={s['kp_count']}, content={s['kp_info']}")
    # Emit one stats row per knowledge point
    for kp_key in sorted(kp_stats.keys()):
        parts = kp_key.split("|")
        if len(parts) != 3:
            continue
        kp_id, kp_type, kp_title = parts
        stats = kp_stats[kp_key]
        total = stats["total"]
        perfect = stats["Perfect"]
        good = stats["Good"]
        failed = stats["Failed"]
        pass_count = stats["Pass"]
        oops = stats["Oops"]
        perfect_ratio = round(perfect / total * 100, 2) if total > 0 else 0
        good_ratio = round(good / total * 100, 2) if total > 0 else 0
        failed_ratio = round(failed / total * 100, 2) if total > 0 else 0
        pass_ratio = round(pass_count / total * 100, 2) if total > 0 else 0
        oops_ratio = round(oops / total * 100, 2) if total > 0 else 0
        kp_stats_data.append({
            "知识点ID": kp_id,
            "知识点类型": kp_type,
            "知识点标题": kp_title,
            "总数量": total,
            "Perfect数量": perfect,
            "Good数量": good,
            "Failed数量": failed,
            "Pass数量": pass_count,
            "Oops数量": oops,
            "Perfect比例(%)": perfect_ratio,
            "Good比例(%)": good_ratio,
            "Failed比例(%)": failed_ratio,
            "Pass比例(%)": pass_ratio,
            "Oops比例(%)": oops_ratio,
        })
    # ============ c. 
单元总结-按单元统计时长 ============ unit_time_stats_data = [] unit_time_stats = defaultdict(int) for record in sheet5_rows: unit_id = record.get("unit_id") play_time_seconds = record.get("play_time_seconds", 0) if unit_id is not None: unit_time_stats[unit_id] += play_time_seconds # 生成单元时长统计数据行 for unit_id in sorted(unit_time_stats.keys()): total_seconds = unit_time_stats[unit_id] total_minutes = int(total_seconds / 60) unit_time_stats_data.append({ "单元ID": f"unit_{unit_id}", "总时长(秒)": total_seconds, "总时长(分钟)": total_minutes, }) print(f" [统计] 汇总统计数据生成完成,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}秒") print(f" [统计] 生成了{len(component_stats_data)}条组件统计, {len(kp_stats_data)}条知识点统计, {len(unit_time_stats_data)}条单元时长统计") return ( pd.DataFrame(component_stats_data), pd.DataFrame(kp_stats_data), pd.DataFrame(unit_time_stats_data) ) def write_excel(path: str, sheet1_rows: List[Dict[str, Any]], sheet2_rows: List[Dict[str, Any]], sheet3_rows: List[Dict[str, Any]], sheet4_rows: List[Dict[str, Any]], sheet5_rows: List[Dict[str, Any]], stats_component_df: Any, stats_kp_df: Any, stats_unit_time_df: Any) -> None: if pd is None: raise RuntimeError("缺少pandas依赖,请安装后再运行。") print(f" [Excel] 开始写入Excel文件: {path}") start_time = datetime.datetime.now() out_dir = os.path.dirname(path) or "." 
    os.makedirs(out_dir, exist_ok=True)

    with pd.ExcelWriter(path, engine="openpyxl") as writer:
        pd.DataFrame(sheet1_rows, columns=SHEET1_COLUMNS).to_excel(writer, sheet_name="全部音频数据", index=False)
        pd.DataFrame(sheet2_rows, columns=SHEET2_COLUMNS).to_excel(writer, sheet_name="互动组件学习记录", index=False)
        pd.DataFrame(sheet3_rows, columns=SHEET3_COLUMNS).to_excel(writer, sheet_name="课程巩固记录", index=False)
        pd.DataFrame(sheet4_rows, columns=SHEET4_COLUMNS).to_excel(writer, sheet_name="单元挑战记录", index=False)
        pd.DataFrame(sheet5_rows, columns=SHEET5_COLUMNS).to_excel(writer, sheet_name="单元总结记录", index=False)
        stats_component_df.to_excel(writer, sheet_name="统计-互动组件通过情况", index=False)
        stats_kp_df.to_excel(writer, sheet_name="统计-知识点通过情况", index=False)
        stats_unit_time_df.to_excel(writer, sheet_name="统计-单元总结时长", index=False)

    print(f" [Excel] 写入完成,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}秒")


def get_date_str() -> str:
    """获取当前日期字符串 格式:YYYYMMDD"""
    return datetime.datetime.now().strftime("%Y%m%d")


def export_single_user(user_id: str,
                       es_cfg: Dict[str, Any],
                       pg_conn: Any,
                       mysql_conn: Any,
                       output_path: str,
                       id_2_unit_index: Dict[int, int],
                       chapter_id_to_lesson_id: Dict[int, int]) -> bool:
    """
    导出单个角色id的数据

    Args:
        user_id: 角色ID
        es_cfg: ES配置
        pg_conn: PostgreSQL连接
        mysql_conn: MySQL连接
        output_path: 输出路径
        id_2_unit_index: story_id到unit_id的映射字典
        chapter_id_to_lesson_id: chapter_id到lesson_id的映射字典

    Returns:
        True表示成功,False表示失败
    """
    try:
        print(f"\n[INFO] ========== 开始导出角色id={user_id} ==========")
        total_start_time = datetime.datetime.now()

        # 查询ES数据
        sheet1_rows = fetch_es_user_audio(user_id, es_cfg)

        # 查询PG数据
        sheet2_rows = fetch_pg_play_records(user_id, pg_conn, mysql_conn)
        sheet3_rows = fetch_pg_unit_review(user_id, pg_conn, id_2_unit_index, chapter_id_to_lesson_id)
        sheet4_rows = fetch_pg_unit_challenge(user_id, pg_conn, id_2_unit_index)
        sheet5_rows = fetch_pg_unit_summary(user_id, pg_conn, id_2_unit_index)

        # 检查是否有有效数据
        total_records = len(sheet1_rows) + len(sheet2_rows) + len(sheet3_rows) + len(sheet4_rows) + len(sheet5_rows)
        print(f" [统计] 数据汇总:")
        print(f" - 全部音频数据: {len(sheet1_rows)}条")
        print(f" - 互动组件学习记录: {len(sheet2_rows)}条")
        print(f" - 课程巩固记录: {len(sheet3_rows)}条")
        print(f" - 单元挑战记录: {len(sheet4_rows)}条")
        print(f" - 单元总结记录: {len(sheet5_rows)}条")
        print(f" - 总计: {total_records}条")

        if total_records == 0:
            print(f"[WARN] 角色id={user_id} 没有找到任何有效记录,跳过导出")
            return False

        # 生成汇总统计数据
        stats_component_df, stats_kp_df, stats_unit_time_df = generate_statistics(sheet2_rows, sheet5_rows)

        # 写入Excel
        write_excel(output_path, sheet1_rows, sheet2_rows, sheet3_rows, sheet4_rows, sheet5_rows,
                    stats_component_df, stats_kp_df, stats_unit_time_df)

        total_time = (datetime.datetime.now() - total_start_time).total_seconds()
        print(f"[INFO] 角色id={user_id} 导出成功")
        print(f"[INFO] 文件路径: {output_path}")
        print(f"[INFO] 总耗时: {total_time:.2f}秒")
        print(f"[INFO] ========== 完成 ==========\n")
        return True
    except Exception as e:
        print(f"[ERROR] 角色id={user_id} 导出失败: {e}")
        import traceback
        traceback.print_exc()
        return False


def main():
    load_env()

    # 确定运行模式并收集需要导出的角色id列表
    user_id_list: List[tuple] = []  # [(user_id, account_id or None), ...]
    date_str = get_date_str()

    # 检查三种模式的配置
    has_user_id = USER_ID is not None
    has_user_id_list = USER_ID_LIST is not None and len(USER_ID_LIST) > 0
    has_account_id_list = ACCOUNT_ID_LIST is not None and len(ACCOUNT_ID_LIST) > 0

    # 验证只能配置一种模式
    mode_count = sum([has_user_id, has_user_id_list, has_account_id_list])
    if mode_count == 0:
        raise RuntimeError("请配置 USER_ID、USER_ID_LIST 或 ACCOUNT_ID_LIST 中的一个")
    if mode_count > 1:
        raise RuntimeError("USER_ID、USER_ID_LIST、ACCOUNT_ID_LIST 只能配置一个,请检查配置")

    # 模式1:单个角色id
    if has_user_id:
        user_id_list = [(str(USER_ID), None)]
        print(f"[INFO] 运行模式:单个角色id")
    # 模式2:角色id列表
    elif has_user_id_list:
        user_id_list = [(str(uid), None) for uid in USER_ID_LIST]
        print(f"[INFO] 运行模式:角色id列表,共{len(user_id_list)}个角色")
    # 模式3:账户id列表
    elif has_account_id_list:
        print(f"[INFO] 运行模式:账户id列表,共{len(ACCOUNT_ID_LIST)}个账户")
        mysql_conn = None
        try:
            mysql_conn = get_mysql_conn("vala_user")  # 查询用户表,使用 vala_user 数据库
            for account_id in ACCOUNT_ID_LIST:
                account_id_str = str(account_id)
                print(f"[INFO] 查询账户id={account_id_str}对应的角色id...")
                character_ids = fetch_character_ids_by_account(account_id_str, mysql_conn)
                if not character_ids:
                    print(f"[WARN] 账户id={account_id_str} 未找到关联的角色id,跳过")
                    continue
                print(f"[INFO] 账户id={account_id_str} 找到{len(character_ids)}个角色id: {character_ids}")
                for cid in character_ids:
                    user_id_list.append((cid, account_id_str))
        finally:
            if mysql_conn:
                try:
                    mysql_conn.close()
                except Exception:
                    pass

    if not user_id_list:
        print("[WARN] 没有需要导出的角色id,程序退出")
        return

    # 初始化连接
    es_cfg = get_es_config()
    pg_conn = get_pg_conn()

    # 获取映射表(只需要查询一次,所有角色共用)
    print(f"\n[INFO] ===== 准备工作:获取映射表 =====")
    mysql_conn = None
    id_2_unit_index = {}
    chapter_id_to_lesson_id = {}
    try:
        print(f"[INFO] 正在连接MySQL数据库(vala_test)...")
        mysql_conn = get_mysql_conn("vala_test")  # 查询游戏配置表,使用 vala_test 数据库
        print(f"[INFO] 正在获取 story_id 到 unit_id 的映射...")
        id_2_unit_index = get_id_2_unit_index(mysql_conn)
        print(f"[INFO] 成功获取 {len(id_2_unit_index)} 个 story_id 映射")
        print(f"[INFO] 正在获取 chapter_id 到 lesson_id 的映射...")
        chapter_id_to_lesson_id = get_chapter_id_to_lesson_id(mysql_conn)
        print(f"[INFO] 成功获取 {len(chapter_id_to_lesson_id)} 个 chapter_id 映射")
    except Exception as e:
        print(f"[ERROR] 获取映射表失败: {e}")
        import traceback
        traceback.print_exc()
        if pg_conn:
            try:
                pg_conn.close()
            except Exception:
                pass
        if mysql_conn:
            try:
                mysql_conn.close()
            except Exception:
                pass
        return

    try:
        # 统计信息
        success_count = 0
        skip_count = 0

        print(f"\n[INFO] ===== 开始批量导出 =====")
        print(f"[INFO] 共需导出{len(user_id_list)}个角色\n")
        batch_start_time = datetime.datetime.now()

        # 循环处理每个角色id
        for idx, (user_id, account_id) in enumerate(user_id_list, 1):
            print(f"\n{'='*60}")
            print(f"[INFO] 进度: {idx}/{len(user_id_list)} ({idx*100//len(user_id_list)}%)")
            print(f"{'='*60}")

            # 生成输出文件名
            if account_id is None:
                # 模式1和模式2:角色id_{}_导出时间_{}.xlsx
                filename = f"角色id_{user_id}_导出时间_{date_str}.xlsx"
            else:
                # 模式3:账户id_{}_角色id_{}_导出时间_{}.xlsx
                filename = f"账户id_{account_id}_角色id_{user_id}_导出时间_{date_str}.xlsx"
            output_path = os.path.join(OUTPUT_DIR, filename)

            # 导出单个角色的数据
            result = export_single_user(user_id, es_cfg, pg_conn, mysql_conn, output_path,
                                        id_2_unit_index, chapter_id_to_lesson_id)
            if result:
                success_count += 1
            else:
                skip_count += 1

        # 输出统计信息
        batch_total_time = (datetime.datetime.now() - batch_start_time).total_seconds()
        print(f"\n{'='*60}")
        print(f"[INFO] ===== 全部导出完成 =====")
        print(f"[INFO] 总计: {len(user_id_list)}个角色")
        print(f"[INFO] 成功: {success_count}个")
        print(f"[INFO] 跳过: {skip_count}个")
        print(f"[INFO] 总耗时: {batch_total_time:.2f}秒 ({batch_total_time/60:.2f}分钟)")
        if success_count > 0:
            print(f"[INFO] 平均每个角色: {batch_total_time/success_count:.2f}秒")
        print(f"{'='*60}\n")
    finally:
        if pg_conn:
            try:
                pg_conn.close()
            except Exception:
                pass
        if mysql_conn:
            try:
                mysql_conn.close()
            except Exception:
                pass


if __name__ == "__main__":
    main()