ai_member_xiaoxi/export_user_id_data.py
小溪 339001c1df Update: add user learning-behavior data export skill
- Added user_export_skill.md with the full export-skill description
- Supports exporting the complete learning-behavior data for a given account ID or character ID
- Produces 6 sheets: audio data, interaction components, lesson review, unit challenge, unit summary, and summary statistics
- Verified by successfully exporting data for two users; works as expected
2026-03-02 23:21:58 +08:00

"""
初版需求v1.0: 2025.11.18
导出 一个userId的多表数据 最终按照不同sheet输出到一个 excel文件中。
1. 第一个sheet:"全部音频数据"
es相关配置通过以下环境变量
ES_HOST=xxx
ES_PORT=9200
ES_SCHEME=https
ES_USER=elastic
ES_PASSWORD=xxx
index: user-audio
脚本思路:
过滤字段:
userId == xxxx
输出该userId的全部记录 按时间倒序排序
包含以下字段内容:
userId
userMsg
userName
soeData
audioUrl
asrStatus
componentId
componentType
dataVersion
2. Sheet 2: "互动组件学习记录" (interaction-component play records)
In PostgreSQL, select the records for the given user_id, sorted by updated_at descending.
Database settings are read from .env:
PG_DB_HOST = xxx
PG_DB_PORT = xxx
PG_DB_USER = xxx
PG_DB_PASSWORD = xxx
PG_DB_DATABASE = xxx
Tables to read:
user_component_play_record_0 ~ user_component_play_record_7
Output fields:
user_id,
component_unique_code,
session_id,
c_type,
c_id,
play_result,
user_behavior_info,
updated_at
3. Sheet 3: "课程巩固记录" (lesson review records)
In PostgreSQL, select the records for the given user_id, sorted by updated_at descending.
Table: user_unit_review_question_result
Output fields:
user_id
story_id
chapter_id
question_list
updated_at
4. Sheet 4: "单元挑战记录" (unit challenge records)
In PostgreSQL, select the records for the given user_id, sorted by updated_at descending.
Table: user_unit_challenge_question_result
Output fields:
user_id
story_id
category
score_text
question_list
updated_at
------------
Addendum v1.1:
"全部音频数据" sheet:
add a timeStr output field and sort by time descending, newest record first.
------------
Addendum v1.2:
"全部音频数据" sheet:
if the userMsg field contains "makee_id", handle it as follows.
Extract the concrete makee_id from userMsg; a sample field value looks like:
```
asr msg信息为{
"time_ms": 358,
"time_ms_api": 357,
"hot_words_str": "{\n \"context_type\": \"dialog_ctx\",\n \"context_data\": [\n {\n \"text\": \"planet Walla\"\n },\n {\n \"text\": \"Walla\"\n }\n ]\n}",
"makee_id": "d208c617-902f-4f81-8255-b5fb73599546",
"volcano_fast_x_tt_logid": "202511151541355DF72BE5EBFE73795BFD",
"api_name": "volcano-fast"
}
```
Then use the makee_id to look up the record in another index: llm_asr_log.
Backfill the matched record's result_text field into userMsg,
and output its source field into the source column.
If userMsg does not contain "makee_id", keep the previous logic unchanged.
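The extraction step above can be sketched as follows. This is a minimal, self-contained sketch (the helper name `extract_makee_id` is illustrative); note that inside the JSON payload the key is quoted, so a fallback regex has to allow for the surrounding quotes:

```python
import json
import re

def extract_makee_id(user_msg: str):
    # Prefer parsing the brace-delimited JSON fragment embedded in userMsg.
    start, end = user_msg.find("{"), user_msg.rfind("}")
    if start != -1 and end > start:
        try:
            return json.loads(user_msg[start:end + 1]).get("makee_id")
        except json.JSONDecodeError:
            pass
    # Fallback regex: the JSON key is quoted, so match optional quotes.
    m = re.search(r'"?makee_id"?\s*:\s*"([^"]+)"', user_msg)
    return m.group(1) if m else None

msg = 'asr msg信息为{"time_ms": 358, "makee_id": "d208c617-902f-4f81-8255-b5fb73599546", "api_name": "volcano-fast"}'
print(extract_makee_id(msg))  # d208c617-902f-4f81-8255-b5fb73599546
```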
---------------
Addendum v1.3:
The input currently supports only a single userId (called "character ID" on the business side).
Extend it to the following:
1. Accept a list of character IDs and export one Excel file per ID, named 角色id_{}_导出时间_{}.xlsx.
2. Accept a list of account IDs and export one Excel file per resolved character ID, named 账户id_{}_角色id_{}_导出时间_{}.xlsx.
Account-ID-to-character-ID mapping:
read the MySQL table vala_app_character and select the records whose account_id equals the account ID;
each matched record's id is a character ID, and one account ID can map to multiple character IDs.
This change only touches the input side; the data extraction and aggregation logic stays the same.
---------------
Addendum v1.4:
Add a "单元总结记录" (unit summary records) sheet exporting the unit summary records for each character ID.
Follow the raw-data extraction approach in export_unit_summary.py (its statistics part can be ignored).
All other existing logic stays unchanged.
----------------
Addendum v1.5:
1. Add the following columns to the "互动组件学习记录" sheet:
"互动组件名称" (component name), "组件标题" (component title), "组件配置摘要" (config summary), "知识点" (knowledge points).
Column values are derived from c_type plus the component config fetched from MySQL:
```
1. If c_type starts with "mid":
read table middle_interaction_component and take these fields:
title (the component title)
component_config (the full component config): its question field becomes the config summary;
kp_relation_info becomes the knowledge points
"互动组件名称" mapping:
"物品互动": "mid_vocab_item",
"图片互动": "mid_vocab_image",
"填词互动": "mid_vocab_fillBlank",
"指令互动": "mid_vocab_instruction",
"对话互动-表达": "mid_sentence_dialogue" with component_config->question->mode == "express",
"对话互动-朗读": "mid_sentence_dialogue" with component_config->question->mode == "read",
"语音互动": "mid_sentence_voice",
"材料互动": "mid_sentence_material",
"造句互动": "mid_sentence_makeSentence",
"挖空互动": "mid_grammar_cloze",
"组句互动": "mid_grammar_sentence",
"发音互动": "mid_pron_pron"
2. If c_type starts with "core":
read table core_interaction_component and take these fields:
title (the component title)
component_config (the full component config): its taskInfo field becomes the config summary;
kp_relation_info becomes the knowledge points
"互动组件名称" mapping:
"口语快答": "core_speaking_reply",
"口语妙问": "core_speaking_inquiry",
"口语探讨": "core_speaking_explore",
"口语独白": "core_speaking_monologue",
"合作阅读": "core_reading_order",
"合作听力": "core_listening_order",
"看图组句": "core_writing_imgMakeSentence",
"看图撰写": "core_writing_imgWrite",
"问题组句": "core_writing_questionMakeSentence",
"问题撰写": "core_writing_questionWrite"
```
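The naming rules above boil down to a dictionary lookup plus one special case for the dialogue component. A minimal sketch with a trimmed mapping (the helper name `component_name` is illustrative; see the full table above for all entries):

```python
# Trimmed c_type -> display-name mapping (subset of the full table above).
MID_NAMES = {
    "mid_vocab_item": "物品互动",
    "mid_vocab_image": "图片互动",
    "mid_sentence_voice": "语音互动",
}

def component_name(c_type: str, component_config: dict) -> str:
    # 对话互动 splits by question.mode: "express" vs "read".
    if c_type == "mid_sentence_dialogue":
        mode = (component_config.get("question") or {}).get("mode", "")
        return {"express": "对话互动-表达", "read": "对话互动-朗读"}.get(mode, "对话互动")
    return MID_NAMES.get(c_type, "")

print(component_name("mid_sentence_dialogue", {"question": {"mode": "express"}}))  # 对话互动-表达
```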
2."课程巩固记录" sheet 增加以下字段
"正确率": 参考 export_lesson_review.py 中的计算逻辑
3. 新增一个"汇总统计"sheet
统计并展示以下内容 请以 可读性 比较好的方式排列、展示
a. "所有互动-按互动组件类型-通过情况统计"
以每种"互动组件名称"进行聚合
统计play_result的取值分布情况算以下指标:
总数量、Perfect数量、Good数量、Failed数量、Pass数量、Perfect比例、Good比例、Failed比例、Pass比例
b. "中互动组件-按知识点-通过情况统计"
以每个知识点进行聚合
其中 知识点配置格式如下:
```
[{"kpId":"0000004","kpType":"sentence","kpTitle":"My name is ...","kpSkill":"sentence_pron","kpSkillName":"语音"},{"kpId":"0000004","kpType":"sentence","kpTitle":"My name is ...","kpSkill":"sentence_meaning","kpSkillName":"语义"},{"kpId":"0000005","kpType":"sentence","kpTitle":"I'm… years old.","kpSkill":"sentence_pron","kpSkillName":"语音"},{"kpId":"0000005","kpType":"sentence","kpTitle":"I'm… years old.","kpSkill":"sentence_meaning","kpSkillName":"语义"},{"kpId":"0000014","kpType":"sentence","kpTitle":"Nice to meet you.","kpSkill":"sentence_pron","kpSkillName":"语音"},{"kpId":"0000014","kpType":"sentence","kpTitle":"Nice to meet you.","kpSkill":"sentence_meaning","kpSkillName":"语义"}]
```
一个组件可以绑定多个知识点,以每个知识点的 kpId + kpType + kpTitle 进行 展示及聚合
对所有绑定了某个知识点的中互动组件(c_type以mid开头)
统计play_result的取值分布情况算以下指标:
总数量、Perfect数量、Good数量、Failed数量、Pass数量、Perfect比例、Good比例、Failed比例、Pass比例
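Aggregation (b) can be sketched as below, assuming each record carries its "知识点" config as a JSON string and a plain-string play_result (the helper name `kp_stats` is illustrative):

```python
import json
from collections import defaultdict

def kp_stats(records):
    # key: (kpId, kpType, kpTitle) -> play_result distribution
    stats = defaultdict(lambda: {"total": 0, "Perfect": 0, "Good": 0, "Failed": 0, "Pass": 0})
    for rec in records:
        if not str(rec.get("c_type", "")).startswith("mid"):
            continue  # only mid interaction components count here
        for kp in json.loads(rec.get("知识点") or "[]"):
            key = (kp["kpId"], kp["kpType"], kp["kpTitle"])
            s = stats[key]
            s["total"] += 1
            result = rec.get("play_result", "")
            if result in ("Perfect", "Good", "Failed", "Pass"):
                s[result] += 1
    # Derive the percentage columns.
    for s in stats.values():
        for k in ("Perfect", "Good", "Failed", "Pass"):
            s[k + "比例"] = round(s[k] / s["total"] * 100, 2) if s["total"] else 0.0
    return dict(stats)
```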
c. "单元总结-按单元统计时长"
"单元总结记录"中的"play_time_seconds"字段值 以每个单元id 进行聚合 进行 累加 统计,并增加一列 转换为分钟为单位 取整数
"""
# ==== Script variables you can edit directly (no command-line arguments) ====
# The three modes are mutually exclusive; configure exactly one:
# Mode 1: single character ID
USER_ID = None  # single character ID, e.g. 2911
# Mode 2: list of character IDs, batch export
USER_ID_LIST = None  # character ID list, e.g. [2911, 2912, 2913]
# Mode 3: list of account IDs; each is resolved to its character IDs, then batch exported
ACCOUNT_ID_LIST = [9343]  # account ID list, e.g. [100, 101, 102]
OUTPUT_DIR = "output/"  # output directory, defaults to the output folder
# ==== End of variables ====
import os
import json
import re
from typing import Any, Dict, List, Optional
import datetime
try:
import requests
except Exception:
requests = None
try:
import psycopg2
from psycopg2.extras import RealDictCursor
except Exception:
psycopg2 = None
RealDictCursor = None
try:
import pymysql
import pymysql.cursors
except Exception:
pymysql = None
try:
import pandas as pd
except Exception:
pd = None
try:
import urllib3
except Exception:
urllib3 = None
SHEET1_COLUMNS = [
"userId",
"userMsg",
"source",
"userName",
"soeData",
"audioUrl",
"asrStatus",
"componentId",
"componentType",
"dataVersion",
"timeStr",
]
SHEET2_COLUMNS = [
"user_id",
"component_unique_code",
"session_id",
"c_type",
"c_id",
"互动组件名称",
"组件标题",
"组件配置摘要",
"知识点",
"play_result",
"user_behavior_info",
"updated_at",
]
SHEET3_COLUMNS = [
"user_id",
"unit_id",
"lesson_id",
"question_list",
"正确率",
"updated_at",
]
SHEET4_COLUMNS = [
"user_id",
"unit_id",
"category",
"score_text",
"question_list",
"updated_at",
]
SHEET5_COLUMNS = [
"id",
"user_id",
"unit_id",
"updated_at",
"km_id",
"km_type",
"play_time_seconds",
]
def _load_env_file(path: str) -> None:
if not os.path.exists(path):
return
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
k, v = line.split("=", 1)
k = k.strip()
v = v.strip().strip('"').strip("'")
if k and (os.getenv(k) is None):
os.environ[k] = v
except Exception:
pass
def load_env() -> None:
_load_env_file(os.path.join(os.getcwd(), ".env"))
_load_env_file(os.path.join(os.getcwd(), ".env.local"))
def to_json_str(v: Any) -> Any:
if isinstance(v, (dict, list)):
try:
return json.dumps(v, ensure_ascii=False)
except Exception:
return str(v)
return v
def parse_time(value: Any) -> Optional[datetime.datetime]:
if value is None:
return None
if isinstance(value, (int, float)):
try:
v = float(value)
            # Accept millisecond-precision epoch timestamps as well
if v > 1e11:
v = v / 1000.0
return datetime.datetime.fromtimestamp(v)
except Exception:
return None
if isinstance(value, str):
fmts = [
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y-%m-%dT%H:%M:%S.%f%z",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d",
]
for fmt in fmts:
try:
return datetime.datetime.strptime(value, fmt)
except Exception:
continue
try:
return datetime.datetime.fromisoformat(value)
except Exception:
return None
return None
def pick_time(source: Dict[str, Any]) -> Optional[datetime.datetime]:
candidates = [
"updated_at",
"created_at",
"@timestamp",
"timestamp",
"updatedAt",
"createdAt",
"time",
"ts",
"timeStr",
"update_time",
"create_time",
]
for key in candidates:
if key in source:
t = parse_time(source.get(key))
if t is not None:
return t
    # Loose match: scan any field whose name looks time-related
for k, v in source.items():
lk = str(k).lower()
if any(s in lk for s in ["time", "date", "_at", "timestamp"]):
t = parse_time(v)
if t is not None:
return t
return None
def extract_makee_id_from_user_msg(user_msg: Any) -> Optional[str]:
    # Accept both dict and string forms of userMsg
    if isinstance(user_msg, dict):
        mk = user_msg.get("makee_id")
        if isinstance(mk, str) and mk:
            return mk
    if isinstance(user_msg, str) and user_msg:
        # 1) Try parsing the whole string as JSON
        try:
            obj = json.loads(user_msg)
            mk = obj.get("makee_id")
            if isinstance(mk, str) and mk:
                return mk
        except Exception:
            pass
        # 2) Try the JSON fragment between the outermost braces
        try:
            start = user_msg.find("{")
            end = user_msg.rfind("}")
            if start != -1 and end != -1 and end > start:
                candidate = user_msg[start : end + 1]
                obj = json.loads(candidate)
                mk = obj.get("makee_id")
                if isinstance(mk, str) and mk:
                    return mk
        except Exception:
            pass
        # 3) Regex fallback; the key is quoted in JSON, so match optional quotes
        m = re.search(r'"?makee_id"?\s*:\s*"([^"]+)"', user_msg)
        if m:
            return m.group(1)
    return None
def fetch_es_asr_log(makee_id: str, es_cfg: Dict[str, Any]) -> Optional[Dict[str, Any]]:
if requests is None:
raise RuntimeError("缺少requests依赖请安装后再运行。")
host = es_cfg.get("host")
port = es_cfg.get("port")
scheme = es_cfg.get("scheme", "http")
user = es_cfg.get("user")
password = es_cfg.get("password")
index = "llm_asr_log"
if not host:
return None
base = f"{scheme}://{host}:{port}"
url = f"{base}/{index}/_search"
headers = {"Content-Type": "application/json"}
body = {
"query": {
"bool": {
"should": [
{"term": {"makee_id": {"value": str(makee_id)}}},
{"term": {"makee_id.keyword": {"value": str(makee_id)}}},
],
"minimum_should_match": 1,
}
},
"size": 10,
"_source": [
"makee_id",
"result_text",
"source",
"updated_at",
"created_at",
"@timestamp",
"timestamp",
"updatedAt",
"createdAt",
"time",
"ts",
"timeStr",
"update_time",
"create_time",
],
}
auth = (user, password) if user and password else None
try:
if scheme == "https" and urllib3 is not None:
try:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except Exception:
pass
resp = requests.post(url, headers=headers, json=body, auth=auth, timeout=20, verify=False if scheme == "https" else True)
resp.raise_for_status()
data = resp.json()
except Exception:
return None
hits = data.get("hits", {}).get("hits", [])
if not hits:
return None
    # Prefer the most recent hit
chosen = None
best_t = None
for h in hits:
src = h.get("_source", {}) or {}
t = pick_time(src)
if t is None:
continue
if best_t is None or t > best_t:
best_t = t
chosen = src
if chosen is None:
        # If no hit carries a timestamp, take the first one
chosen = (hits[0].get("_source", {}) or {})
return chosen
def get_es_config() -> Dict[str, Any]:
return {
"host": os.getenv("ES_HOST"),
"port": os.getenv("ES_PORT", "9200"),
"scheme": os.getenv("ES_SCHEME", "http"),
"user": os.getenv("ES_USER"),
"password": os.getenv("ES_PASSWORD"),
"index": "user-audio",
}
def fetch_es_user_audio(user_id: str, es_cfg: Dict[str, Any]) -> List[Dict[str, Any]]:
if requests is None:
raise RuntimeError("缺少requests依赖请安装后再运行。")
print(f" [ES] 开始查询user-audio索引...")
start_time = datetime.datetime.now()
host = es_cfg.get("host")
port = es_cfg.get("port")
scheme = es_cfg.get("scheme", "http")
user = es_cfg.get("user")
password = es_cfg.get("password")
index = es_cfg.get("index", "user-audio")
if not host:
return []
base = f"{scheme}://{host}:{port}"
url = f"{base}/{index}/_search"
headers = {"Content-Type": "application/json"}
body = {
"query": {
"bool": {
"should": [
{"term": {"userId": {"value": str(user_id)}}},
{"term": {"userId.keyword": {"value": str(user_id)}}},
],
"minimum_should_match": 1,
}
},
"size": 10000,
"_source": [
"userId",
"userMsg",
"userName",
"soeData",
"audioUrl",
"asrStatus",
"componentId",
"componentType",
"dataVersion",
"updated_at",
"created_at",
"@timestamp",
"timestamp",
"updatedAt",
"createdAt",
"time",
"ts",
"timeStr",
"update_time",
"create_time",
],
}
auth = (user, password) if user and password else None
try:
        # Suppress insecure-HTTPS warnings for self-signed certificates
if scheme == "https" and urllib3 is not None:
try:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except Exception:
pass
resp = requests.post(url, headers=headers, json=body, auth=auth, timeout=30, verify=False if scheme == "https" else True)
resp.raise_for_status()
data = resp.json()
except Exception as e:
raise RuntimeError(f"ES查询失败: {e}")
hits = data.get("hits", {}).get("hits", [])
print(f" [ES] 查询完成,获得{len(hits)}条记录,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}")
if not hits:
return []
print(f" [ES] 开始处理音频数据...")
process_start = datetime.datetime.now()
rows: List[Dict[str, Any]] = []
asr_cache: Dict[str, Dict[str, Any]] = {}
makee_id_count = 0
for idx, h in enumerate(hits, 1):
        # Report progress every 100 records
if idx % 100 == 0 or idx == len(hits):
print(f" [ES] 处理进度: {idx}/{len(hits)} ({idx*100//len(hits)}%)")
src = h.get("_source", {}) or {}
row = {
"userId": src.get("userId"),
"userMsg": src.get("userMsg"),
"source": None,
"userName": src.get("userName"),
"soeData": to_json_str(src.get("soeData")),
"audioUrl": src.get("audioUrl"),
"asrStatus": src.get("asrStatus"),
"componentId": src.get("componentId"),
"componentType": src.get("componentType"),
"dataVersion": src.get("dataVersion"),
}
t = pick_time(src)
row["_time"] = t.isoformat() if t else None
row["timeStr"] = t.strftime("%Y-%m-%d %H:%M:%S") if t else None
        # v1.2: when userMsg carries a makee_id, query llm_asr_log and backfill
mk = extract_makee_id_from_user_msg(row.get("userMsg"))
if mk:
makee_id_count += 1
asr_doc = asr_cache.get(mk)
if asr_doc is None:
asr_doc = fetch_es_asr_log(mk, es_cfg)
if asr_doc is not None:
asr_cache[mk] = asr_doc
if asr_doc is not None:
rt = asr_doc.get("result_text")
if rt:
row["userMsg"] = rt
row["source"] = to_json_str(asr_doc.get("source"))
rows.append(row)
print(f" [ES] 数据处理完成,发现{makee_id_count}条包含makee_id的记录耗时{(datetime.datetime.now() - process_start).total_seconds():.2f}")
print(f" [ES] 开始排序...")
rows.sort(key=lambda x: parse_time(x.get("_time")) or datetime.datetime.min, reverse=True)
print(f" [ES] 音频数据处理完成,总耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}")
return rows
def get_pg_conn() -> Any:
if psycopg2 is None:
raise RuntimeError("缺少psycopg2依赖请安装后再运行。")
host = os.getenv("PG_DB_HOST")
port = int(os.getenv("PG_DB_PORT", "5432"))
user = os.getenv("PG_DB_USER")
password = os.getenv("PG_DB_PASSWORD")
dbname = os.getenv("PG_DB_DATABASE")
if not host or not dbname:
raise RuntimeError("PG数据库环境变量未配置完整")
conn = psycopg2.connect(host=host, port=port, user=user, password=password, dbname=dbname)
return conn
def get_mysql_conn(database: str) -> Any:
"""
获取MySQL数据库连接
Args:
database: 数据库名,可选值:'vala_user''vala_test'
vala_user 使用 online 配置(环境变量后缀 _online
vala_test 使用默认配置
Returns:
MySQL连接对象
"""
if pymysql is None:
raise RuntimeError("缺少pymysql依赖请安装后再运行。")
    # Pick the environment-variable set based on the target database
    if database == "vala_user":
        # the vala_user database uses the online config
host = os.getenv("MYSQL_HOST_online")
port = int(os.getenv("MYSQL_PORT_online", "3306"))
user = os.getenv("MYSQL_USERNAME_online")
password = os.getenv("MYSQL_PASSWORD_online")
if not host:
raise RuntimeError("MySQL数据库环境变量未配置完整缺少MYSQL_HOST_online")
else:
        # vala_test and other databases use the default config
host = os.getenv("MYSQL_HOST")
port = int(os.getenv("MYSQL_PORT", "3306"))
user = os.getenv("MYSQL_USERNAME")
password = os.getenv("MYSQL_PASSWORD")
if not host:
raise RuntimeError("MySQL数据库环境变量未配置完整缺少MYSQL_HOST")
conn = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
        database=database,  # use the database name passed in
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
)
return conn
def get_id_2_unit_index(conn: Any) -> Dict[int, int]:
"""
从MySQL获取 story_id 到 unit_id 的映射关系
Args:
conn: MySQL数据库连接
Returns:
映射字典 {story_id: unit_id}
"""
sql = """
SELECT *
FROM `vala_game_info`
WHERE id > 0
AND `vala_game_info`.`deleted_at` IS NULL
ORDER BY season_package_id asc, `index` asc
"""
try:
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall() or []
            # Build the mapping: a row's position in the ordered result set is its unit_id
id_2_unit_index = {}
for index, row in enumerate(rows):
id_2_unit_index[row["id"]] = index
return id_2_unit_index
except Exception as e:
print(f"[ERROR] 获取story_id到unit_id映射失败: {e}")
return {}
def get_chapter_id_to_lesson_id(conn: Any) -> Dict[int, int]:
"""
从MySQL获取 chapter_id 到 lesson_id 的映射关系
Args:
conn: MySQL数据库连接
Returns:
映射字典 {chapter_id: lesson_id}
"""
sql = """
SELECT id, `index`
FROM `vala_game_chapter`
WHERE deleted_at IS NULL
"""
try:
with conn.cursor() as cur:
cur.execute(sql)
rows = cur.fetchall() or []
            # Build the mapping: a chapter's index field is its lesson_id
chapter_id_to_lesson_id = {}
for row in rows:
chapter_id_to_lesson_id[row["id"]] = row["index"]
return chapter_id_to_lesson_id
except Exception as e:
print(f"[ERROR] 获取chapter_id到lesson_id映射失败: {e}")
return {}
# Map component c_type to its display name
COMPONENT_TYPE_NAMES = {
"mid_vocab_item": "物品互动",
"mid_vocab_image": "图片互动",
"mid_vocab_fillBlank": "填词互动",
"mid_vocab_instruction": "指令互动",
"mid_sentence_dialogue": "对话互动", # 需要根据mode进一步判断
"mid_sentence_voice": "语音互动",
"mid_sentence_material": "材料互动",
"mid_sentence_makeSentence": "造句互动",
"mid_grammar_cloze": "挖空互动",
"mid_grammar_sentence": "组句互动",
"mid_pron_pron": "发音互动",
"core_speaking_reply": "口语快答",
"core_speaking_inquiry": "口语妙问",
"core_speaking_explore": "口语探讨",
"core_speaking_monologue": "口语独白",
"core_reading_order": "合作阅读",
"core_listening_order": "合作听力",
"core_writing_imgMakeSentence": "看图组句",
"core_writing_imgWrite": "看图撰写",
"core_writing_questionMakeSentence": "问题组句",
"core_writing_questionWrite": "问题撰写",
}
def get_component_name(c_type: str, component_config: Optional[Dict[str, Any]]) -> str:
"""
根据c_type和组件配置获取组件名称
Args:
c_type: 组件类型
component_config: 组件配置用于判断对话互动的mode
Returns:
组件名称
"""
if not c_type:
return ""
    # Special case: dialogue components split by their question mode
if c_type == "mid_sentence_dialogue" and component_config:
try:
question = component_config.get("question", {})
mode = question.get("mode", "")
if mode == "express":
return "对话互动-表达"
elif mode == "read":
return "对话互动-朗读"
except Exception:
pass
return COMPONENT_TYPE_NAMES.get(c_type, "")
def batch_fetch_component_configs(play_records: List[Dict[str, Any]], mysql_conn: Any) -> Dict[str, Dict[str, Any]]:
"""
批量查询组件配置信息
Args:
play_records: 播放记录列表
mysql_conn: MySQL连接
Returns:
组件配置映射 {c_type_c_id: {title, component_config, kp_relation_info}}
"""
print(f" [MySQL] 开始批量查询组件配置...")
start_time = datetime.datetime.now()
    # Collect the c_type / c_id pairs that need a config lookup
    mid_c_ids = set()
    core_c_ids = set()
    mid_type_id_pairs = []  # for debug logging
    core_type_id_pairs = []
for record in play_records:
c_type = record.get("c_type", "")
c_id = record.get("c_id")
if c_type and c_id:
if c_type.startswith("mid"):
mid_c_ids.add(c_id)
mid_type_id_pairs.append((c_type, c_id))
elif c_type.startswith("core"):
core_c_ids.add(c_id)
core_type_id_pairs.append((c_type, c_id))
print(f" [MySQL] 需要查询中互动组件: {len(mid_c_ids)}个, 核心互动组件: {len(core_c_ids)}")
if mid_c_ids:
print(f" [MySQL] 中互动组件ID列表前10个: {sorted(list(mid_c_ids))[:10]}")
if core_c_ids:
print(f" [MySQL] 核心互动组件ID列表前10个: {sorted(list(core_c_ids))[:10]}")
config_map = {}
    # Batch-query middle_interaction_component
if mid_c_ids:
try:
with mysql_conn.cursor() as cur:
placeholders = ','.join(['%s'] * len(mid_c_ids))
sql = f"""
SELECT c_id, c_type, title, component_config, kp_relation_info
FROM middle_interaction_component
WHERE c_id IN ({placeholders}) AND deleted_at IS NULL
"""
print(f" [MySQL] 执行中互动组件查询,查询条件: c_id IN ({len(mid_c_ids)}个ID)")
cur.execute(sql, tuple(mid_c_ids))
rows = cur.fetchall() or []
print(f" [MySQL] 查询到{len(rows)}条中互动组件配置")
if len(rows) == 0 and len(mid_c_ids) > 0:
print(f" [MySQL] [警告] 查询结果为空!可能的原因:")
print(f" [MySQL] - 数据库中没有匹配的c_id记录")
print(f" [MySQL] - deleted_at字段不为NULL")
print(f" [MySQL] - c_id不存在")
for idx, row in enumerate(rows):
c_type = row.get("c_type", "")
c_id = row.get("c_id")
key = f"{c_type}_{c_id}"
                    if idx < 3:  # print details for the first 3 rows
print(f" [MySQL] [样例{idx+1}] id={c_id}, c_type={c_type}, key={key}")
print(f" [MySQL] [样例{idx+1}] title={row.get('title', '')[:50]}")
                    # Parse component_config
component_config = row.get("component_config")
if isinstance(component_config, str):
try:
component_config = json.loads(component_config)
except Exception as e:
print(f" [MySQL] [警告] 解析component_config失败 (id={c_id}): {e}")
component_config = {}
                    # Extract the question field as the config summary
summary = ""
if isinstance(component_config, dict):
question = component_config.get("question")
summary = to_json_str(question) if question else ""
if idx < 3 and question:
print(f" [MySQL] [样例{idx+1}] 提取到question字段长度: {len(summary)}")
                    # Parse kp_relation_info
kp_relation_info = row.get("kp_relation_info")
if isinstance(kp_relation_info, str):
try:
kp_relation_info = json.loads(kp_relation_info)
except Exception:
kp_relation_info = []
config_map[key] = {
"title": row.get("title", ""),
"component_config": component_config,
"summary": summary,
"kp_relation_info": to_json_str(kp_relation_info),
}
print(f" [MySQL] 中互动组件配置已加入config_map当前map大小: {len(config_map)}")
except Exception as e:
print(f" [MySQL] [错误] 查询中互动组件配置失败: {e}")
import traceback
traceback.print_exc()
    # Batch-query core_interaction_component
if core_c_ids:
try:
with mysql_conn.cursor() as cur:
placeholders = ','.join(['%s'] * len(core_c_ids))
sql = f"""
SELECT c_id, c_type, title, component_config, kp_relation_info
FROM core_interaction_component
WHERE c_id IN ({placeholders}) AND deleted_at IS NULL
"""
print(f" [MySQL] 执行核心互动组件查询,查询条件: c_id IN ({len(core_c_ids)}个ID)")
cur.execute(sql, tuple(core_c_ids))
rows = cur.fetchall() or []
print(f" [MySQL] 查询到{len(rows)}条核心互动组件配置")
if len(rows) == 0 and len(core_c_ids) > 0:
print(f" [MySQL] [警告] 查询结果为空!可能的原因:")
print(f" [MySQL] - 数据库中没有匹配的c_id记录")
print(f" [MySQL] - deleted_at字段不为NULL")
print(f" [MySQL] - c_id不存在")
for idx, row in enumerate(rows):
c_type = row.get("c_type", "")
c_id = row.get("c_id")
key = f"{c_type}_{c_id}"
                    if idx < 3:  # print details for the first 3 rows
print(f" [MySQL] [样例{idx+1}] id={c_id}, c_type={c_type}, key={key}")
print(f" [MySQL] [样例{idx+1}] title={row.get('title', '')[:50]}")
                    # Parse component_config
component_config = row.get("component_config")
if isinstance(component_config, str):
try:
component_config = json.loads(component_config)
except Exception as e:
print(f" [MySQL] [警告] 解析component_config失败 (id={c_id}): {e}")
component_config = {}
                    # Extract the taskInfo field as the config summary
summary = ""
if isinstance(component_config, dict):
task_info = component_config.get("taskInfo")
summary = to_json_str(task_info) if task_info else ""
if idx < 3 and task_info:
print(f" [MySQL] [样例{idx+1}] 提取到taskInfo字段长度: {len(summary)}")
                    # Parse kp_relation_info
kp_relation_info = row.get("kp_relation_info")
if isinstance(kp_relation_info, str):
try:
kp_relation_info = json.loads(kp_relation_info)
except Exception:
kp_relation_info = []
config_map[key] = {
"title": row.get("title", ""),
"component_config": component_config,
"summary": summary,
"kp_relation_info": to_json_str(kp_relation_info),
}
print(f" [MySQL] 核心互动组件配置已加入config_map当前map大小: {len(config_map)}")
except Exception as e:
print(f" [MySQL] [错误] 查询核心互动组件配置失败: {e}")
import traceback
traceback.print_exc()
print(f" [MySQL] 组件配置查询完成,共{len(config_map)}条,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}")
return config_map
def calculate_accuracy(question_list: Any) -> float:
    """
    Compute the accuracy over a question list.
    Args:
        question_list: the question list, as a JSON string or a list
    Returns:
        Accuracy percentage, rounded to 2 decimal places
    """
    try:
        if isinstance(question_list, str):
            question_list = json.loads(question_list)
        if not isinstance(question_list, list) or len(question_list) == 0:
            return 0.0
        total = len(question_list)
        correct = sum(1 for q in question_list if isinstance(q, dict) and q.get("isRight") is True)
        return round(correct / total * 100, 2)
    except Exception:
        return 0.0
def fetch_character_ids_by_account(account_id: str, conn: Any) -> List[str]:
"""根据账户id查询对应的角色id列表"""
sql = "SELECT id FROM vala_app_character WHERE account_id = %s"
try:
with conn.cursor() as cur:
cur.execute(sql, (account_id,))
rows = cur.fetchall() or []
return [str(row["id"]) for row in rows if row.get("id")]
except Exception as e:
print(f"[ERROR] 查询账户id={account_id}的角色id失败: {e}")
return []
def fetch_pg_play_records(user_id: str, conn: Any, mysql_conn: Any) -> List[Dict[str, Any]]:
"""
查询互动组件学习记录并补充组件配置信息
Args:
user_id: 用户ID角色ID
conn: PostgreSQL数据库连接
mysql_conn: MySQL数据库连接
Returns:
互动组件学习记录列表
"""
print(f" [PG] 开始查询互动组件学习记录8张分表...")
start_time = datetime.datetime.now()
tables = [f"user_component_play_record_{i}" for i in range(8)]
rows: List[Dict[str, Any]] = []
with conn.cursor(cursor_factory=RealDictCursor) as cur:
for t in tables:
try:
cur.execute(
f"""
SELECT user_id, component_unique_code, session_id, c_type, c_id,
play_result, user_behavior_info, updated_at
FROM {t}
WHERE user_id = %s
ORDER BY updated_at DESC
""",
(user_id,),
)
part = cur.fetchall() or []
if part:
print(f" [PG] 表{t}查到{len(part)}条记录")
for r in part:
r = dict(r)
r["play_result"] = to_json_str(r.get("play_result"))
r["user_behavior_info"] = to_json_str(r.get("user_behavior_info"))
                    # Drop tzinfo: Excel writers reject timezone-aware datetimes
upd = r.get("updated_at")
if isinstance(upd, datetime.datetime):
try:
if upd.tzinfo is not None and upd.tzinfo.utcoffset(upd) is not None:
r["updated_at"] = upd.replace(tzinfo=None)
except Exception:
                            # fall back to a string
r["updated_at"] = str(upd)
rows.append(r)
except Exception as e:
print(f" [PG] 表{t}查询失败: {e}")
continue
rows.sort(key=lambda x: parse_time(x.get("updated_at")) or datetime.datetime.min, reverse=True)
print(f" [PG] 互动组件学习记录查询完成,共{len(rows)}条,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}")
    # Batch-fetch the component configs
if rows and mysql_conn:
config_map = batch_fetch_component_configs(rows, mysql_conn)
        # Enrich each row with its component info
print(f" [PG] 开始补充组件配置信息...")
        filled_count = 0
        empty_count = 0
        sample_keys = []
        sample_mode_check = []  # spot-check the dialogue components' mode
for r in rows:
c_type = r.get("c_type", "")
c_id = r.get("c_id")
key = f"{c_type}_{c_id}" if c_type and c_id else ""
config = config_map.get(key, {})
component_config = config.get("component_config", {})
component_name = get_component_name(c_type, component_config)
r["互动组件名称"] = component_name
r["组件标题"] = config.get("title", "")
r["组件配置摘要"] = config.get("summary", "")
r["知识点"] = config.get("kp_relation_info", "")
            # Track how many rows got a config match
if config:
filled_count += 1
if len(sample_keys) < 3:
sample_keys.append((key, component_name, r["组件标题"][:30] if r["组件标题"] else ""))
                # Check the dialogue component's mode
if c_type == "mid_sentence_dialogue" and len(sample_mode_check) < 3:
mode = ""
if isinstance(component_config, dict):
question = component_config.get("question", {})
if isinstance(question, dict):
mode = question.get("mode", "")
sample_mode_check.append({
"key": key,
"mode": mode,
"component_name": component_name
})
else:
empty_count += 1
                if empty_count <= 5:  # print the first 5 unmatched keys
print(f" [PG] [警告] 未找到组件配置: key={key}")
print(f" [PG] 组件配置信息补充完成")
print(f" [PG] 匹配到配置: {filled_count}条, 未匹配: {empty_count}")
if sample_keys:
print(f" [PG] 样例数据前3条:")
for key, name, title in sample_keys:
print(f" [PG] - key={key}, 名称={name}, 标题={title}")
if sample_mode_check:
print(f" [PG] 对话互动mode检查前3条:")
for s in sample_mode_check:
print(f" [PG] - key={s['key']}, mode={s['mode']}, 最终名称={s['component_name']}")
return rows
def fetch_pg_unit_review(user_id: str, conn: Any, id_2_unit_index: Dict[int, int], chapter_id_to_lesson_id: Dict[int, int]) -> List[Dict[str, Any]]:
"""
查询课程巩固记录
Args:
user_id: 用户ID角色ID
conn: PostgreSQL数据库连接
id_2_unit_index: story_id到unit_id的映射字典
chapter_id_to_lesson_id: chapter_id到lesson_id的映射字典
Returns:
课程巩固记录列表
"""
print(f" [PG] 开始查询课程巩固记录...")
start_time = datetime.datetime.now()
sql = (
"SELECT user_id, story_id, chapter_id, question_list, updated_at "
"FROM user_unit_review_question_result WHERE user_id = %s ORDER BY updated_at DESC"
)
with conn.cursor(cursor_factory=RealDictCursor) as cur:
try:
cur.execute(sql, (user_id,))
rows = cur.fetchall() or []
except Exception as e:
print(f" [PG] 课程巩固记录查询失败: {e}")
rows = []
out: List[Dict[str, Any]] = []
for r in rows:
d = dict(r)
        # Map story_id to unit_id
story_id = d.get("story_id")
unit_id = id_2_unit_index.get(story_id) if story_id else None
d["unit_id"] = unit_id
        # Map chapter_id to lesson_id
chapter_id = d.get("chapter_id")
lesson_id = chapter_id_to_lesson_id.get(chapter_id) if chapter_id else None
d["lesson_id"] = lesson_id
        # Compute the accuracy
question_list = d.get("question_list")
d["正确率"] = calculate_accuracy(question_list)
d["question_list"] = to_json_str(question_list)
upd = d.get("updated_at")
if isinstance(upd, datetime.datetime):
try:
if upd.tzinfo is not None and upd.tzinfo.utcoffset(upd) is not None:
d["updated_at"] = upd.replace(tzinfo=None)
except Exception:
d["updated_at"] = str(upd)
out.append(d)
print(f" [PG] 课程巩固记录查询完成,共{len(out)}条,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}")
return out
def fetch_pg_unit_challenge(user_id: str, conn: Any, id_2_unit_index: Dict[int, int]) -> List[Dict[str, Any]]:
"""
查询单元挑战记录
Args:
user_id: 用户ID角色ID
conn: PostgreSQL数据库连接
id_2_unit_index: story_id到unit_id的映射字典
Returns:
单元挑战记录列表
"""
print(f" [PG] 开始查询单元挑战记录...")
start_time = datetime.datetime.now()
sql = (
"SELECT user_id, story_id, category, score_text, question_list, updated_at "
"FROM user_unit_challenge_question_result WHERE user_id = %s ORDER BY updated_at DESC"
)
with conn.cursor(cursor_factory=RealDictCursor) as cur:
try:
cur.execute(sql, (user_id,))
rows = cur.fetchall() or []
except Exception as e:
print(f" [PG] 单元挑战记录查询失败: {e}")
rows = []
out: List[Dict[str, Any]] = []
for r in rows:
d = dict(r)
        # Map story_id to unit_id
story_id = d.get("story_id")
unit_id = id_2_unit_index.get(story_id) if story_id else None
d["unit_id"] = unit_id
d["question_list"] = to_json_str(d.get("question_list"))
upd = d.get("updated_at")
if isinstance(upd, datetime.datetime):
try:
if upd.tzinfo is not None and upd.tzinfo.utcoffset(upd) is not None:
d["updated_at"] = upd.replace(tzinfo=None)
except Exception:
d["updated_at"] = str(upd)
out.append(d)
print(f" [PG] 单元挑战记录查询完成,共{len(out)}条,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}")
return out
def fetch_pg_unit_summary(user_id: str, conn: Any, id_2_unit_index: Dict[int, int]) -> List[Dict[str, Any]]:
"""
查询单元总结知识点结果数据
Args:
user_id: 用户ID角色ID
conn: PostgreSQL数据库连接
id_2_unit_index: story_id到unit_id的映射字典
Returns:
单元总结记录列表
"""
print(f" [PG] 开始查询单元总结记录...")
start_time = datetime.datetime.now()
sql = (
"SELECT id, user_id, story_id, updated_at, km_id, km_type, play_time "
"FROM user_unit_summary_km_result WHERE user_id = %s AND deleted_at IS NULL ORDER BY updated_at DESC"
)
with conn.cursor(cursor_factory=RealDictCursor) as cur:
try:
cur.execute(sql, (user_id,))
rows = cur.fetchall() or []
except Exception as e:
print(f" [PG] 单元总结记录查询失败: {e}")
rows = []
out: List[Dict[str, Any]] = []
for r in rows:
d = dict(r)
        # Map story_id to unit_id
story_id = d.get("story_id")
unit_id = id_2_unit_index.get(story_id) if story_id else None
d["unit_id"] = unit_id
        # Convert play_time (milliseconds) to whole seconds
play_time = d.get("play_time")
d["play_time_seconds"] = play_time // 1000 if play_time else 0
        # Strip timezone info
upd = d.get("updated_at")
if isinstance(upd, datetime.datetime):
try:
if upd.tzinfo is not None and upd.tzinfo.utcoffset(upd) is not None:
d["updated_at"] = upd.replace(tzinfo=None)
except Exception:
d["updated_at"] = str(upd)
out.append(d)
print(f" [PG] 单元总结记录查询完成,共{len(out)}条,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}")
return out
def generate_statistics(sheet2_rows: List[Dict[str, Any]], sheet5_rows: List[Dict[str, Any]]) -> tuple:
"""
生成汇总统计数据
Args:
sheet2_rows: 互动组件学习记录
sheet5_rows: 单元总结记录
Returns:
(组件统计DataFrame, 知识点统计DataFrame, 单元时长统计DataFrame)
"""
if pd is None:
raise RuntimeError("缺少pandas依赖请安装后再运行。")
print(f" [统计] 开始生成汇总统计数据...")
start_time = datetime.datetime.now()
from collections import defaultdict
    # ===== a. All interactions: pass-rate stats per component type =====
component_stats_data = []
component_stats = defaultdict(lambda: {"Perfect": 0, "Good": 0, "Failed": 0, "Pass": 0, "Oops": 0, "total": 0})
    # for debugging
sample_results = []
parse_error_count = 0
for idx, record in enumerate(sheet2_rows):
component_name = record.get("互动组件名称", "")
if not component_name:
continue
play_result_str = record.get("play_result", "")
# 解析play_result
result = ""
try:
# 先判断是否是简单的字符串Perfect/Good/Failed/Pass/Oops
if isinstance(play_result_str, str):
# 去除空格后检查
stripped = play_result_str.strip()
if stripped in ["Perfect", "Good", "Failed", "Pass", "Oops"]:
# 直接使用
result = stripped
else:
# 尝试JSON解析
try:
play_result = json.loads(play_result_str)
if isinstance(play_result, dict):
result = play_result.get("result", "")
else:
result = ""
                        except (ValueError, TypeError):
                            result = ""
else:
# 如果不是字符串尝试当dict处理
if isinstance(play_result_str, dict):
result = play_result_str.get("result", "")
else:
result = ""
# 收集前3个样例
if idx < 3:
sample_results.append({
"component": component_name,
"raw": str(play_result_str)[:100],
"result": result
})
except Exception as e:
parse_error_count += 1
if parse_error_count <= 3:
print(f" [统计] [警告] 解析play_result失败 (第{idx+1}条): {e}, 原始值: {str(play_result_str)[:100]}")
result = ""
component_stats[component_name]["total"] += 1
if result in ["Perfect", "Good", "Failed", "Pass", "Oops"]:
component_stats[component_name][result] += 1
    if sample_results:
        print(" [统计] play_result解析样例前3条:")
        for s in sample_results:
            print(f" [统计] - 组件: {s['component']}, 结果: {s['result']}, 原始: {s['raw']}")
if parse_error_count > 0:
print(f" [统计] play_result解析失败总数: {parse_error_count}")
# 生成统计数据行
for component_name in sorted(component_stats.keys()):
stats = component_stats[component_name]
total = stats["total"]
perfect = stats["Perfect"]
good = stats["Good"]
failed = stats["Failed"]
pass_count = stats["Pass"]
oops = stats["Oops"]
perfect_ratio = round(perfect / total * 100, 2) if total > 0 else 0
good_ratio = round(good / total * 100, 2) if total > 0 else 0
failed_ratio = round(failed / total * 100, 2) if total > 0 else 0
pass_ratio = round(pass_count / total * 100, 2) if total > 0 else 0
oops_ratio = round(oops / total * 100, 2) if total > 0 else 0
component_stats_data.append({
"互动组件名称": component_name,
"总数量": total,
"Perfect数量": perfect,
"Good数量": good,
"Failed数量": failed,
"Pass数量": pass_count,
"Oops数量": oops,
"Perfect比例(%)": perfect_ratio,
"Good比例(%)": good_ratio,
"Failed比例(%)": failed_ratio,
"Pass比例(%)": pass_ratio,
"Oops比例(%)": oops_ratio,
})
# ============ b. 中互动组件-按知识点-通过情况统计 ============
kp_stats_data = []
kp_stats = defaultdict(lambda: {"Perfect": 0, "Good": 0, "Failed": 0, "Pass": 0, "Oops": 0, "total": 0})
# 调试信息
mid_count = 0
has_kp_count = 0
sample_kp_records = []
for idx, record in enumerate(sheet2_rows):
c_type = record.get("c_type", "")
        if not isinstance(c_type, str) or not c_type.startswith("mid"):  # c_type 非字符串None等直接跳过
continue
mid_count += 1
kp_relation_info_str = record.get("知识点", "")
if not kp_relation_info_str:
continue
has_kp_count += 1
# 解析知识点
try:
if isinstance(kp_relation_info_str, str):
kp_relation_info = json.loads(kp_relation_info_str)
else:
kp_relation_info = kp_relation_info_str
if not isinstance(kp_relation_info, list):
continue
# 收集样例
if len(sample_kp_records) < 3:
sample_kp_records.append({
"c_type": c_type,
"kp_count": len(kp_relation_info),
"kp_info": str(kp_relation_info)[:200]
})
# 解析play_result使用相同的逻辑
play_result_str = record.get("play_result", "")
result = ""
if isinstance(play_result_str, str):
stripped = play_result_str.strip()
if stripped in ["Perfect", "Good", "Failed", "Pass", "Oops"]:
result = stripped
else:
try:
play_result = json.loads(play_result_str)
if isinstance(play_result, dict):
result = play_result.get("result", "")
                    except (ValueError, TypeError):
                        pass
elif isinstance(play_result_str, dict):
result = play_result_str.get("result", "")
# 为每个知识点统计
for kp in kp_relation_info:
if not isinstance(kp, dict):
continue
kp_id = kp.get("kpId", "")
kp_type = kp.get("kpType", "")
kp_title = kp.get("kpTitle", "")
if not kp_id:
continue
kp_key = f"{kp_id}|{kp_type}|{kp_title}"
kp_stats[kp_key]["total"] += 1
if result in ["Perfect", "Good", "Failed", "Pass", "Oops"]:
kp_stats[kp_key][result] += 1
except Exception as e:
if len(sample_kp_records) < 5:
print(f" [统计] [警告] 解析知识点失败: {e}, 原始值: {str(kp_relation_info_str)[:100]}")
continue
print(f" [统计] 中互动组件统计: 总数={mid_count}, 有知识点={has_kp_count}, 知识点条目数={len(kp_stats)}")
if sample_kp_records:
print(f" [统计] 知识点样例前3条:")
for s in sample_kp_records:
print(f" [统计] - c_type={s['c_type']}, 知识点数量={s['kp_count']}, 内容={s['kp_info']}")
# 生成知识点统计数据行
for kp_key in sorted(kp_stats.keys()):
        parts = kp_key.split("|", 2)  # maxsplit=2避免 kpTitle 中含"|"时拆分错位
if len(parts) != 3:
continue
kp_id, kp_type, kp_title = parts
stats = kp_stats[kp_key]
total = stats["total"]
perfect = stats["Perfect"]
good = stats["Good"]
failed = stats["Failed"]
pass_count = stats["Pass"]
oops = stats["Oops"]
perfect_ratio = round(perfect / total * 100, 2) if total > 0 else 0
good_ratio = round(good / total * 100, 2) if total > 0 else 0
failed_ratio = round(failed / total * 100, 2) if total > 0 else 0
pass_ratio = round(pass_count / total * 100, 2) if total > 0 else 0
oops_ratio = round(oops / total * 100, 2) if total > 0 else 0
kp_stats_data.append({
"知识点ID": kp_id,
"知识点类型": kp_type,
"知识点标题": kp_title,
"总数量": total,
"Perfect数量": perfect,
"Good数量": good,
"Failed数量": failed,
"Pass数量": pass_count,
"Oops数量": oops,
"Perfect比例(%)": perfect_ratio,
"Good比例(%)": good_ratio,
"Failed比例(%)": failed_ratio,
"Pass比例(%)": pass_ratio,
"Oops比例(%)": oops_ratio,
})
# ============ c. 单元总结-按单元统计时长 ============
unit_time_stats_data = []
unit_time_stats = defaultdict(int)
for record in sheet5_rows:
unit_id = record.get("unit_id")
play_time_seconds = record.get("play_time_seconds", 0)
if unit_id is not None:
unit_time_stats[unit_id] += play_time_seconds
# 生成单元时长统计数据行
for unit_id in sorted(unit_time_stats.keys()):
total_seconds = unit_time_stats[unit_id]
        total_minutes = total_seconds // 60
unit_time_stats_data.append({
"单元ID": f"unit_{unit_id}",
"总时长(秒)": total_seconds,
"总时长(分钟)": total_minutes,
})
print(f" [统计] 汇总统计数据生成完成,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}")
print(f" [统计] 生成了{len(component_stats_data)}条组件统计, {len(kp_stats_data)}条知识点统计, {len(unit_time_stats_data)}条单元时长统计")
return (
pd.DataFrame(component_stats_data),
pd.DataFrame(kp_stats_data),
pd.DataFrame(unit_time_stats_data)
)
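上面 generate_statistics 中两处几乎相同的 play_result 解析逻辑,可以抽取为如下独立函数(示意代码,未接入脚本;合法结果集合与上文保持一致):

```python
import json

def parse_play_result(raw):
    """示意:解析 play_result返回 Perfect/Good/Failed/Pass/Oops 之一,无法识别时返回空串。
    兼容三种形态纯字符串结果、JSON 字符串、已反序列化的 dict。"""
    valid = {"Perfect", "Good", "Failed", "Pass", "Oops"}
    if isinstance(raw, dict):
        result = raw.get("result", "")
        return result if result in valid else ""
    if isinstance(raw, str):
        stripped = raw.strip()
        if stripped in valid:
            return stripped
        try:
            obj = json.loads(stripped)
        except (ValueError, TypeError):
            return ""
        if isinstance(obj, dict):
            result = obj.get("result", "")
            return result if result in valid else ""
    return ""
```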
def write_excel(path: str, sheet1_rows: List[Dict[str, Any]], sheet2_rows: List[Dict[str, Any]], sheet3_rows: List[Dict[str, Any]], sheet4_rows: List[Dict[str, Any]], sheet5_rows: List[Dict[str, Any]], stats_component_df: Any, stats_kp_df: Any, stats_unit_time_df: Any) -> None:
if pd is None:
raise RuntimeError("缺少pandas依赖请安装后再运行。")
print(f" [Excel] 开始写入Excel文件: {path}")
start_time = datetime.datetime.now()
out_dir = os.path.dirname(path) or "."
os.makedirs(out_dir, exist_ok=True)
with pd.ExcelWriter(path, engine="openpyxl") as writer:
pd.DataFrame(sheet1_rows, columns=SHEET1_COLUMNS).to_excel(writer, sheet_name="全部音频数据", index=False)
pd.DataFrame(sheet2_rows, columns=SHEET2_COLUMNS).to_excel(writer, sheet_name="互动组件学习记录", index=False)
pd.DataFrame(sheet3_rows, columns=SHEET3_COLUMNS).to_excel(writer, sheet_name="课程巩固记录", index=False)
pd.DataFrame(sheet4_rows, columns=SHEET4_COLUMNS).to_excel(writer, sheet_name="单元挑战记录", index=False)
pd.DataFrame(sheet5_rows, columns=SHEET5_COLUMNS).to_excel(writer, sheet_name="单元总结记录", index=False)
stats_component_df.to_excel(writer, sheet_name="统计-互动组件通过情况", index=False)
stats_kp_df.to_excel(writer, sheet_name="统计-知识点通过情况", index=False)
stats_unit_time_df.to_excel(writer, sheet_name="统计-单元总结时长", index=False)
print(f" [Excel] 写入完成,耗时{(datetime.datetime.now() - start_time).total_seconds():.2f}")
def get_date_str() -> str:
"""获取当前日期字符串 格式YYYYMMDD"""
return datetime.datetime.now().strftime("%Y%m%d")
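main() 中按运行模式拼接导出文件名的逻辑,可以抽取为如下独立函数(示意代码,未接入脚本,函数名为示例命名;命名规则与 main() 中一致account_id 为空对应模式1/2否则对应模式3

```python
import datetime

def build_export_filename(user_id, account_id=None, date_str=None):
    """示意按运行模式生成导出文件名date_str 缺省时取当前日期 YYYYMMDD。"""
    date_str = date_str or datetime.datetime.now().strftime("%Y%m%d")
    if account_id is None:
        # 模式1/2角色id_{}_导出时间_{}.xlsx
        return f"角色id_{user_id}_导出时间_{date_str}.xlsx"
    # 模式3账户id_{}_角色id_{}_导出时间_{}.xlsx
    return f"账户id_{account_id}_角色id_{user_id}_导出时间_{date_str}.xlsx"
```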
def export_single_user(user_id: str, es_cfg: Dict[str, Any], pg_conn: Any, mysql_conn: Any, output_path: str, id_2_unit_index: Dict[int, int], chapter_id_to_lesson_id: Dict[int, int]) -> bool:
"""
导出单个角色id的数据
Args:
user_id: 角色ID
es_cfg: ES配置
pg_conn: PostgreSQL连接
mysql_conn: MySQL连接
output_path: 输出路径
id_2_unit_index: story_id到unit_id的映射字典
chapter_id_to_lesson_id: chapter_id到lesson_id的映射字典
Returns:
True表示成功False表示失败
"""
try:
print(f"\n[INFO] ========== 开始导出角色id={user_id} ==========")
total_start_time = datetime.datetime.now()
# 查询ES数据
sheet1_rows = fetch_es_user_audio(user_id, es_cfg)
# 查询PG数据
sheet2_rows = fetch_pg_play_records(user_id, pg_conn, mysql_conn)
sheet3_rows = fetch_pg_unit_review(user_id, pg_conn, id_2_unit_index, chapter_id_to_lesson_id)
sheet4_rows = fetch_pg_unit_challenge(user_id, pg_conn, id_2_unit_index)
sheet5_rows = fetch_pg_unit_summary(user_id, pg_conn, id_2_unit_index)
# 检查是否有有效数据
total_records = len(sheet1_rows) + len(sheet2_rows) + len(sheet3_rows) + len(sheet4_rows) + len(sheet5_rows)
print(f" [统计] 数据汇总:")
print(f" - 全部音频数据: {len(sheet1_rows)}")
print(f" - 互动组件学习记录: {len(sheet2_rows)}")
print(f" - 课程巩固记录: {len(sheet3_rows)}")
print(f" - 单元挑战记录: {len(sheet4_rows)}")
print(f" - 单元总结记录: {len(sheet5_rows)}")
print(f" - 总计: {total_records}")
if total_records == 0:
print(f"[WARN] 角色id={user_id} 没有找到任何有效记录,跳过导出")
return False
# 生成汇总统计数据
stats_component_df, stats_kp_df, stats_unit_time_df = generate_statistics(sheet2_rows, sheet5_rows)
# 写入Excel
write_excel(output_path, sheet1_rows, sheet2_rows, sheet3_rows, sheet4_rows, sheet5_rows, stats_component_df, stats_kp_df, stats_unit_time_df)
total_time = (datetime.datetime.now() - total_start_time).total_seconds()
print(f"[INFO] 角色id={user_id} 导出成功")
print(f"[INFO] 文件路径: {output_path}")
print(f"[INFO] 总耗时: {total_time:.2f}")
print(f"[INFO] ========== 完成 ==========\n")
return True
except Exception as e:
print(f"[ERROR] 角色id={user_id} 导出失败: {e}")
import traceback
traceback.print_exc()
return False
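generate_statistics 中组件统计与知识点统计两段重复的"计数 + 比例"行构造,可以抽取为如下独立函数(示意代码,未接入脚本;列名与上文一致,先数量列、后比例列):

```python
def make_result_stat_row(stats):
    """示意:把 {"Perfect": n, ..., "total": n} 统计字典转成输出行。
    total 为 0 时各比例记为 0避免除零。"""
    results = ("Perfect", "Good", "Failed", "Pass", "Oops")
    total = stats.get("total", 0)
    row = {"总数量": total}
    for name in results:
        row[f"{name}数量"] = stats.get(name, 0)
    for name in results:
        row[f"{name}比例(%)"] = round(stats.get(name, 0) / total * 100, 2) if total > 0 else 0
    return row
```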
def main():
load_env()
# 确定运行模式并收集需要导出的角色id列表
user_id_list: List[tuple] = [] # [(user_id, account_id or None), ...]
date_str = get_date_str()
# 检查三种模式的配置
has_user_id = USER_ID is not None
has_user_id_list = USER_ID_LIST is not None and len(USER_ID_LIST) > 0
has_account_id_list = ACCOUNT_ID_LIST is not None and len(ACCOUNT_ID_LIST) > 0
# 验证只能配置一种模式
mode_count = sum([has_user_id, has_user_id_list, has_account_id_list])
if mode_count == 0:
raise RuntimeError("请配置 USER_ID、USER_ID_LIST 或 ACCOUNT_ID_LIST 中的一个")
if mode_count > 1:
raise RuntimeError("USER_ID、USER_ID_LIST、ACCOUNT_ID_LIST 只能配置一个,请检查配置")
# 模式1单个角色id
if has_user_id:
user_id_list = [(str(USER_ID), None)]
print(f"[INFO] 运行模式单个角色id")
# 模式2角色id列表
elif has_user_id_list:
user_id_list = [(str(uid), None) for uid in USER_ID_LIST]
print(f"[INFO] 运行模式角色id列表{len(user_id_list)}个角色")
# 模式3账户id列表
elif has_account_id_list:
print(f"[INFO] 运行模式账户id列表{len(ACCOUNT_ID_LIST)}个账户")
mysql_conn = None
try:
mysql_conn = get_mysql_conn("vala_user") # 查询用户表,使用 vala_user 数据库
for account_id in ACCOUNT_ID_LIST:
account_id_str = str(account_id)
print(f"[INFO] 查询账户id={account_id_str}对应的角色id...")
character_ids = fetch_character_ids_by_account(account_id_str, mysql_conn)
if not character_ids:
print(f"[WARN] 账户id={account_id_str} 未找到关联的角色id跳过")
continue
print(f"[INFO] 账户id={account_id_str} 找到{len(character_ids)}个角色id: {character_ids}")
for cid in character_ids:
user_id_list.append((cid, account_id_str))
finally:
if mysql_conn:
try:
mysql_conn.close()
except Exception:
pass
if not user_id_list:
print("[WARN] 没有需要导出的角色id程序退出")
return
# 初始化连接
es_cfg = get_es_config()
pg_conn = get_pg_conn()
# 获取映射表(只需要查询一次,所有角色共用)
print(f"\n[INFO] ===== 准备工作:获取映射表 =====")
mysql_conn = None
id_2_unit_index = {}
chapter_id_to_lesson_id = {}
try:
print(f"[INFO] 正在连接MySQL数据库vala_test...")
mysql_conn = get_mysql_conn("vala_test") # 查询游戏配置表,使用 vala_test 数据库
print(f"[INFO] 正在获取 story_id 到 unit_id 的映射...")
id_2_unit_index = get_id_2_unit_index(mysql_conn)
print(f"[INFO] 成功获取 {len(id_2_unit_index)} 个 story_id 映射")
print(f"[INFO] 正在获取 chapter_id 到 lesson_id 的映射...")
chapter_id_to_lesson_id = get_chapter_id_to_lesson_id(mysql_conn)
print(f"[INFO] 成功获取 {len(chapter_id_to_lesson_id)} 个 chapter_id 映射")
except Exception as e:
print(f"[ERROR] 获取映射表失败: {e}")
import traceback
traceback.print_exc()
if pg_conn:
try:
pg_conn.close()
except Exception:
pass
if mysql_conn:
try:
mysql_conn.close()
except Exception:
pass
return
try:
# 统计信息
success_count = 0
skip_count = 0
print(f"\n[INFO] ===== 开始批量导出 =====")
print(f"[INFO] 共需导出{len(user_id_list)}个角色\n")
batch_start_time = datetime.datetime.now()
# 循环处理每个角色id
for idx, (user_id, account_id) in enumerate(user_id_list, 1):
print(f"\n{'='*60}")
print(f"[INFO] 进度: {idx}/{len(user_id_list)} ({idx*100//len(user_id_list)}%)")
print(f"{'='*60}")
# 生成输出文件名
if account_id is None:
# 模式1和模式2角色id_{}_导出时间_{}.xlsx
filename = f"角色id_{user_id}_导出时间_{date_str}.xlsx"
else:
# 模式3账户id_{}_角色id_{}_导出时间_{}.xlsx
filename = f"账户id_{account_id}_角色id_{user_id}_导出时间_{date_str}.xlsx"
output_path = os.path.join(OUTPUT_DIR, filename)
# 导出单个角色的数据
result = export_single_user(user_id, es_cfg, pg_conn, mysql_conn, output_path, id_2_unit_index, chapter_id_to_lesson_id)
if result:
success_count += 1
else:
skip_count += 1
# 输出统计信息
batch_total_time = (datetime.datetime.now() - batch_start_time).total_seconds()
print(f"\n{'='*60}")
print(f"[INFO] ===== 全部导出完成 =====")
print(f"[INFO] 总计: {len(user_id_list)}个角色")
print(f"[INFO] 成功: {success_count}")
print(f"[INFO] 跳过: {skip_count}")
print(f"[INFO] 总耗时: {batch_total_time:.2f}秒 ({batch_total_time/60:.2f}分钟)")
if success_count > 0:
print(f"[INFO] 平均每个角色: {batch_total_time/success_count:.2f}")
print(f"{'='*60}\n")
finally:
if pg_conn:
try:
pg_conn.close()
except Exception:
pass
if mysql_conn:
try:
mysql_conn.close()
except Exception:
pass
if __name__ == "__main__":
main()