ai_member_xiaoyan/skills/interactive-component-json/scripts/parse_script.py

710 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
剧本文档解析器
从飞书文档中提取末尾的组件类型表格和内嵌sheet中的组件配置
支持两种数据源:
1. 文档 markdown 中的 lark-table(备选)
2. 内嵌 sheet 的二维数组数据(主要)
"""
import re
import json
import html
import logging
# Module-level logger shared by every function in this file.
logger = logging.getLogger("parse_script")
if not logger.handlers:
    # Configure the logger only when nothing has been attached yet, so that
    # importing this module repeatedly does not stack duplicate handlers.
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s"
        )
    )
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
def parse_lark_table(table_html: str) -> list:
    """Parse a Feishu ``lark-table`` HTML fragment into a 2-D list of cell texts.

    Every ``<lark-tr>`` element becomes one row and every ``<lark-td>`` inside
    it becomes one cell. For each cell, nested markup is removed, HTML entities
    are decoded and runs of whitespace are collapsed to a single space.

    Args:
        table_html: HTML text containing zero or more ``<lark-tr>`` rows.

    Returns:
        A list of rows, each row being a list of cleaned cell strings. The
        list is empty when no ``<lark-tr>`` element is found.
    """
    # Row and cell tags may both carry attributes, so accept an optional
    # attribute list on each of them.
    tr_pattern = re.compile(r'<lark-tr(?:\s+[^>]*)?>(.*?)</lark-tr>', re.DOTALL)
    td_pattern = re.compile(r'<lark-td(?:\s+[^>]*)?>(.*?)</lark-td>', re.DOTALL)

    rows = []
    for tr_match in tr_pattern.finditer(table_html):
        cells = []
        for td_match in td_pattern.finditer(tr_match.group(1)):
            # Drop nested tags but keep their text content.
            cell_text = re.sub(r'<[^>]+>', '', td_match.group(1))
            # Decode entities only after the tags are gone, so an escaped
            # "&lt;b&gt;" stays visible as literal text.
            cell_text = html.unescape(cell_text)
            # Collapse whitespace last: decoding may introduce new whitespace
            # characters (e.g. from "&nbsp;").
            cell_text = re.sub(r'\s+', ' ', cell_text).strip()
            cells.append(cell_text)
        rows.append(cells)
    return rows
def extract_component_table(markdown: str) -> list:
    """Extract the component-type table from a script document's markdown.

    All ``<lark-table>`` elements are scanned from the last one backwards. The
    first table that has more than one row and whose header row has a cell
    containing "类型" is used.

    Header cells are mapped to columns as follows: a cell equal to "类型" is
    the type column, a cell containing "知识点" is the knowledge-point column,
    an empty first cell is the index column, and a cell containing "组件" is
    the component-config column.

    Returns:
        A list of dicts such as
        ``{"index": 1, "type": "对话朗读", "knowledge_points": "school 1"}``.
        ``index`` defaults to the running position and is replaced by the
        index cell when that cell is all digits. ``knowledge_points`` is
        present when the knowledge column exists in the row, and
        ``component_config`` only when its cell is non-empty. Rows with a
        missing or empty type cell are skipped.

    Raises:
        ValueError: if the document has no lark-table, no table with a "类型"
            header, or the chosen header has no cell exactly equal to "类型".
    """
    table_htmls = [
        found.group(0)
        for found in re.finditer(r'<lark-table[^>]*>(.*?)</lark-table>', markdown, re.DOTALL)
    ]
    if not table_htmls:
        raise ValueError("文档中未找到lark-table表格")

    # Walk backwards lazily and stop at the first table that qualifies.
    component_table = next(
        (
            rows
            for rows in map(parse_lark_table, reversed(table_htmls))
            if len(rows) > 1 and any("类型" in cell for cell in rows[0])
        ),
        None,
    )
    if component_table is None:
        raise ValueError("未找到包含'类型'列头的组件表格")

    # Resolve column positions from the header; a later match overrides an
    # earlier one for the same role.
    header = component_table[0]
    type_col = kp_col = index_col = component_col = None
    for position, raw_label in enumerate(header):
        label = raw_label.strip()
        if label == "类型":
            type_col = position
        elif "知识点" in label:
            kp_col = position
        elif position == 0 and label == "":
            index_col = position  # an unlabeled first column holds the row number
        elif "组件" in label:
            component_col = position
    if type_col is None:
        raise ValueError(f"表头中未找到'类型'列: {header}")

    def cell_at(row, col):
        """Return the stripped cell at ``col``; None if the column is unknown or the row is too short."""
        if col is None or len(row) <= col:
            return None
        return row[col].strip()

    components = []
    for row in component_table[1:]:
        type_text = cell_at(row, type_col)
        if not type_text:
            continue

        entry = {"index": len(components) + 1, "type": type_text}

        idx_text = cell_at(row, index_col)
        if idx_text is not None and idx_text.isdigit():
            entry["index"] = int(idx_text)

        kp_text = cell_at(row, kp_col)
        if kp_text is not None:
            entry["knowledge_points"] = kp_text

        comp_text = cell_at(row, component_col)
        if comp_text:
            entry["component_config"] = comp_text

        components.append(entry)
    return components
def extract_sheet_token(markdown: str) -> str:
    """Return the token of the first embedded ``<sheet token="..."/>`` tag.

    Returns None when the markdown contains no such self-closing tag.
    """
    found = re.search(r'<sheet\s+token="([^"]+)"\s*/>', markdown)
    return found.group(1) if found else None
def extract_script_metadata(markdown: str) -> dict:
    """Extract script metadata from the document title.

    Only the first 500 characters are searched for a title shaped like
    ``L1-S2-U14-L1 <title text>``.

    Returns:
        ``{"level": "L1", "season": 2, "unit": 14, "lesson": 1, "title": "..."}``
        when such a title is found, otherwise an empty dict.
    """
    title_match = re.search(r'(L\d+)-S(\d+)-U(\d+)-L(\d+)\s+(.+)', markdown[:500])
    if not title_match:
        return {}

    level, season, unit, lesson, title = title_match.groups()
    return {
        "level": level,
        "season": int(season),
        "unit": int(unit),
        "lesson": int(lesson),
        "title": title.strip(),
    }
def parse_script_document(markdown: str) -> dict:
    """Parse a complete script document from its markdown.

    Returns:
        ``{"metadata": {...}, "sheet_token": "xxx_yyy" or None, "components": [...]}``

    Raises:
        ValueError: propagated from ``extract_component_table`` when no
            usable component table is found.
    """
    metadata = extract_script_metadata(markdown)
    sheet_token = extract_sheet_token(markdown)
    components = extract_component_table(markdown)
    return {
        "metadata": metadata,
        "sheet_token": sheet_token,
        "components": components,
    }
# ============ Sheet 数据解析(主要入口) ============
def extract_component_id(type_cell):
    """Extract the component type name, component ID and image flag from a sheet type cell.

    Recognised cell layouts:
        - "对话朗读+图片\\n1214101"            -> type_name="对话朗读", cId="1214101", has_image=True
        - "对话挖空\\n1214102"                 -> type_name="对话挖空", cId="1214102", has_image=False
        - "合作阅读 0000800"                   -> type_name="合作阅读", cId="0000800" (one line, space separated)
        - "核心互动- 囗语\\n听力选择\\n0000810" -> type_name="听力选择", cId="0000810" (prefix line first)
        - "TL", "场景", ...                    -> None (not a component row)

    Args:
        type_cell: Raw cell value; anything that is not a non-empty string yields None.

    Returns:
        ``{"type_name": str, "cId": str, "has_image": bool}``, or None when
        the cell is a non-component marker or carries no ID.
    """
    if not type_cell or not isinstance(type_cell, str):
        return None

    lines = [part.strip() for part in type_cell.strip().split("\n") if part.strip()]
    if not lines:
        return None

    # Labels that mark structural rows of the sheet rather than components.
    non_component_markers = {
        "TL", "场景", "角色", "AI动画", "场景变换", "画面",
        "BGM", "SE", "类型",
    }

    type_part = None
    component_id = None

    # Strategy 1: a line after the first is a bare ID of 5+ digits; the type
    # name is the line directly above it.
    for pos in range(1, len(lines)):
        if re.fullmatch(r'\d{5,}', lines[pos]):
            component_id = lines[pos]
            type_part = lines[pos - 1]
            break

    # Strategy 2: the ID trails the first line, separated by whitespace
    # ("类型名 ID"). Applies to single-line cells and as a multi-line fallback.
    if component_id is None:
        tail = re.search(r'\s+(\d{5,})$', lines[0])
        if tail:
            component_id = tail.group(1)
            type_part = lines[0][:tail.start()].strip()

    # Strategy 3: the second line is all digits but shorter than 5 characters.
    if component_id is None and len(lines) >= 2 and re.fullmatch(r'\d+', lines[1]):
        component_id = lines[1]
        type_part = lines[0]

    if type_part is None:
        type_part = lines[0]

    # When the chosen line is only a "核心互动" prefix, the real type name is
    # the first other line that is neither another prefix nor a number.
    if type_part.startswith("核心互动"):
        for candidate in lines:
            if candidate == type_part or re.fullmatch(r'\d+', candidate):
                continue
            if candidate.startswith("核心互动"):
                continue
            type_part = candidate
            break

    # Strip every image marker so that the marker check and the returned
    # name agree, even when a cell carries both markers.
    base_type = type_part.replace("+图片", "").replace("-配图", "").strip()
    if not base_type or base_type in non_component_markers:
        return None

    # A component row must carry an ID.
    if component_id is None:
        return None

    return {
        "type_name": base_type,
        "cId": component_id,
        "has_image": "+图片" in type_part or "-配图" in type_part,
    }
# Cache of LLM fallback answers keyed by the stripped cell text, so the same
# cell text is not sent to the LLM again once it has been answered.
_llm_fallback_cache = {}


def extract_component_id_with_llm_fallback(type_cell, llm_client=None):
    """Extract component info with the regex parser, falling back to an LLM.

    ``extract_component_id`` is tried first. The LLM is consulted only when
    that fails, an ``llm_client`` is supplied, and the cell contains both a
    CJK character and a run of at least five digits.

    Answers are cached per cell text, including "not a component" answers
    (cached as None). A call that raises is NOT cached, so a transient
    failure does not permanently mark the cell as a non-component.

    Args:
        type_cell: Raw cell value from the sheet's type column.
        llm_client: Optional client exposing
            ``call_for_json(system_prompt, text, max_tokens=..., temperature=...)``
            that returns a 2-tuple whose first item is the parsed JSON.

    Returns:
        ``{"type_name": str, "cId": str, "has_image": bool}`` or None.
    """
    result = extract_component_id(type_cell)
    if result is not None:
        return result

    # Cheap pre-filter: only text with both Chinese characters and a
    # plausible ID is worth an LLM call.
    if not type_cell or not isinstance(type_cell, str):
        return None
    text = type_cell.strip()
    has_chinese = bool(re.search(r'[一-鿿]', text))
    has_digits = bool(re.search(r'\d{5,}', text))
    if not has_chinese or not has_digits:
        return None
    if not llm_client:
        return None

    if text in _llm_fallback_cache:
        return _llm_fallback_cache[text]

    logger.info(f"正则无法解析,尝试 LLM 兜底: {repr(text[:80])}")
    system_prompt = """你是组件类型解析器。用户给你一个表格单元格的文本内容,你需要提取:
1. type_name: 组件类型名(如 "对话朗读""合作阅读""听力选择" 等)
2. cId: 组件ID纯数字字符串通常5-7位
3. has_image: 是否配图(文本中含"+图片""-配图"则为true
注意:
- 忽略"核心互动"等前缀
- type_name 只保留最终的类型名
- 如果无法确定,返回 null
返回 JSON 格式:{"type_name": "...", "cId": "...", "has_image": false}
如果不是组件行,返回 null"""
    try:
        parsed, _ = llm_client.call_for_json(system_prompt, text, max_tokens=200, temperature=0)
    except Exception as e:
        # Best-effort fallback: log and give up for this call only. The
        # failure is deliberately not cached so a later call can retry.
        logger.warning(f"LLM 兜底失败: {e}")
        return None

    if parsed and isinstance(parsed, dict) and parsed.get("type_name") and parsed.get("cId"):
        result = {
            "type_name": str(parsed["type_name"]).strip(),
            "cId": str(parsed["cId"]).strip(),
            "has_image": bool(parsed.get("has_image", False)),
        }
        _llm_fallback_cache[text] = result
        logger.info(f"LLM 兜底成功: {result}")
        return result

    # The LLM answered but did not recognise a component: remember that.
    _llm_fallback_cache[text] = None
    return None
def extract_section_character_map(markdown):
    """Extract the "角色-section对应" (character-to-section) table from the markdown.

    Expected document shape::

        # 角色-section对应
        <lark-table ...>
        <lark-tr><lark-td>S15-S16</lark-td><lark-td>738-eva</lark-td></lark-tr>
        <lark-tr><lark-td>S1-S14</lark-td><lark-td>663-EVA</lark-td></lark-tr>
        <lark-tr><lark-td></lark-td><lark-td>653-peter</lark-td></lark-tr>

    The first cell of a row holds a section or section range ("S3",
    "S1-S14"); the second holds one or more "<id>-<name>" pairs. A row whose
    first cell is empty, or has no parseable section, reuses the sections of
    the most recent row that had one. Before any section has been seen the
    section list is empty. Rows without a character pair are skipped.

    Args:
        markdown: Document markdown; a falsy value yields an empty list.

    Returns:
        A list of dicts such as
        ``{"sections": ["S15", "S16"], "characters": {"Eva": 738}}``.
        Character names are capitalised.
    """
    if not markdown:
        return []

    # The section body runs from the heading to the next heading or the end.
    match = re.search(r'#\s*角色-section对应\s*\n(.*?)(?=\n#|$)', markdown, re.DOTALL)
    if not match:
        logger.info("文档中未找到 '角色-section对应' 章节")
        return []
    section_html = match.group(1)

    # Accept optional attributes on row/cell tags, as parse_lark_table does.
    tr_pattern = re.compile(r'<lark-tr(?:\s+[^>]*)?>(.*?)</lark-tr>', re.DOTALL)
    td_pattern = re.compile(r'<lark-td(?:\s+[^>]*)?>(.*?)</lark-td>', re.DOTALL)
    tag_pattern = re.compile(r'<[^>]+>')
    char_pattern = re.compile(r'(\d{2,})[-_]([A-Za-z]{2,})')
    section_range_pattern = re.compile(r'S(\d+)(?:\s*-\s*S(\d+))?', re.IGNORECASE)

    entries = []
    current_sections = []  # carried forward while the section cell is empty
    for row_html in tr_pattern.findall(section_html):
        cells = td_pattern.findall(row_html)
        if len(cells) < 2:
            continue
        # Remove nested markup so attribute values cannot be mistaken for
        # section numbers or character pairs.
        section_cell = tag_pattern.sub('', cells[0]).strip()
        char_cell = tag_pattern.sub('', cells[1]).strip()

        if section_cell:
            sections = []
            for sm in section_range_pattern.finditer(section_cell):
                start = int(sm.group(1))
                end = int(sm.group(2)) if sm.group(2) else start
                sections.extend(f"S{number}" for number in range(start, end + 1))
            if sections:
                current_sections = sections

        # Keep every "<id>-<name>" pair in the cell, not only the first.
        characters = {
            found.group(2).capitalize(): int(found.group(1))
            for found in char_pattern.finditer(char_cell)
        }
        if characters:
            entries.append({
                "sections": list(current_sections),
                "characters": characters,
            })

    logger.info(f"提取角色-section对应: {len(entries)} 条记录")
    return entries
def resolve_resource_mapping(section_char_map, config_info, fallback_char_map=None):
    """Resolve the resourceMapping for a component from its "配置信息" value.

    The first ``S<number>`` found in ``config_info`` (case-insensitive)
    selects the section. Every entry of ``section_char_map`` whose section
    list is empty or contains that section contributes its characters, with
    later entries overriding earlier ones on name clashes.

    The section number is matched both as written and without leading zeros,
    so "S01主线" matches an entry listing "S1".

    Args:
        section_char_map: List produced by ``extract_section_character_map()``.
        config_info: Text such as "S1主线".
        fallback_char_map: Mapping returned when nothing can be resolved.

    Returns:
        A dict such as ``{"Eva": 663, "Peter": 653}``. Falls back to
        ``fallback_char_map`` (or ``{}``) when ``config_info`` or
        ``section_char_map`` is empty, when no section number is found, or
        when no entry applies.
    """
    fallback = fallback_char_map or {}
    if not config_info or not section_char_map:
        return fallback

    # "S1主线" -> "1"
    sm = re.search(r'S(\d+)', config_info, re.IGNORECASE)
    if not sm:
        return fallback

    raw_number = sm.group(1)
    section_key = f"S{int(raw_number)}"
    # Also accept the number as written, for entries that kept leading zeros.
    candidate_keys = {section_key, f"S{raw_number}"}

    result = {}
    for entry in section_char_map:
        sections = entry["sections"]
        # An entry without sections applies to every section.
        if not sections or any(key in sections for key in candidate_keys):
            result.update(entry["characters"])

    if not result:
        return fallback
    logger.debug(f"Section {section_key} → resourceMapping: {result}")
    return result
def extract_character_map(sheet_rows):
    """Extract the character table (name -> character ID) from sheet data.

    Scans the string cells of the first 30 rows for "<digits>-<name>" or
    "<digits>_<name>" pairs such as "653-peter", "663-EVA", "658-Vicky".
    Names need at least two letters so that script tags like "L1_S02" are
    not picked up. Two-letter all-uppercase names (e.g. "TA") are ignored.
    Names are capitalised, and the first ID seen for a name wins.

    Args:
        sheet_rows: 2-D list of sheet cells. None or an empty list yields an
            empty mapping, and a None row is skipped.

    Returns:
        A dict such as ``{"Eva": 663, "Peter": 653, "Vicky": 658}``.
    """
    char_map = {}
    pattern = re.compile(r'(\d{2,})[-_]([A-Za-z]{2,})')

    # Tolerate a missing sheet the same way parse_sheet_rows does.
    for row in (sheet_rows or [])[:30]:
        for cell in row or ():
            if not cell or not isinstance(cell, str):
                continue
            for match in pattern.finditer(cell):
                char_name = match.group(2)
                # Short all-caps tokens are markers, not character names.
                if len(char_name) <= 2 and char_name.isupper():
                    continue
                char_map.setdefault(char_name.capitalize(), int(match.group(1)))

    logger.info(f"提取角色映射: {char_map}")
    return char_map
def parse_sheet_rows(sheet_rows, llm_client=None):
    """Identify and extract every component row from a sheet's 2-D array.

    The first row is the header. Columns are located by label ("类型",
    "配置信息", "ID", labels containing both "组件" and "配置", labels
    containing "知识点"). Missing columns fall back to fixed positions
    (type=0, config info=1, component config=6, knowledge=7) when the header
    is wide enough.

    A data row is kept only when a component ID can be determined (from the
    type cell, or from a separate "ID" column) and a teaching-config text is
    available.

    Args:
        sheet_rows: list[list] of sheet cells, header first.
        llm_client: Optional LLM client passed to
            ``extract_component_id_with_llm_fallback``.

    Returns:
        list[dict], each with:
            "row_index": int        position of the row in ``sheet_rows`` (0-based)
            "type_name": str        component type name, e.g. "对话朗读"
            "cId": str              component ID, e.g. "1214101"
            "has_image": bool       whether this is the with-image variant
            "teaching_config": str  teaching configuration text
            "knowledge_text": str   knowledge-point text ("" when absent)
            "config_info": str      config-info text such as "S1主线" ("" when absent)
        An empty list is returned when there are fewer than two rows.
    """
    if not sheet_rows or len(sheet_rows) < 2:
        return []

    # ---- locate columns from the header ----
    header = sheet_rows[0]
    type_col = 0
    kp_col = None
    config_info_col = None
    id_col = None
    component_cols = []  # there may be several "组件配置" columns
    for position, raw_label in enumerate(header):
        if not raw_label:
            continue
        label = str(raw_label).strip()
        if label == "类型":
            type_col = position
        elif label == "配置信息":
            config_info_col = position
        elif label == "ID":
            id_col = position
        elif "组件" in label and "配置" in label:
            component_cols.append(position)
        elif "知识点" in label:
            kp_col = position

    # The first component-config column is primary; the rest are fallbacks.
    comp_col = component_cols[0] if component_cols else None
    alt_comp_cols = component_cols[1:]

    # Fixed-position defaults in case the header wording changed.
    header_width = len(header)
    if comp_col is None:
        logger.warning("sheet表头中未找到'组件配置'回退到默认列索引6")
        comp_col = 6 if header_width > 6 else None
    if kp_col is None:
        logger.warning("sheet表头中未找到'知识点'回退到默认列索引7")
        kp_col = 7 if header_width > 7 else None
    if config_info_col is None:
        logger.warning("sheet表头中未找到'配置信息'回退到默认列索引1")
        config_info_col = 1 if header_width > 1 else None
    logger.info(f"列映射: 类型={type_col}, 配置信息={config_info_col}, 组件配置={comp_col}, 知识点={kp_col}, ID={id_col}, alt_comp={alt_comp_cols}")

    def cell_value(row, col):
        """Return ``row[col]``, or None when the column is unknown or the row is too short."""
        if col is None or len(row) <= col:
            return None
        return row[col]

    def cell_text(row, col):
        """Return the stripped string form of a truthy cell, otherwise ""."""
        value = cell_value(row, col)
        return str(value).strip() if value else ""

    # Labels that mark structural rows rather than components.
    non_component_markers = {
        "TL", "场景", "角色", "AI动画", "场景变换", "画面",
        "BGM", "SE", "类型", "测试类型",
    }
    teaching_cols = [comp_col] + alt_comp_cols

    components = []
    for row_idx, row in enumerate(sheet_rows[1:], start=1):
        type_cell = cell_value(row, type_col)
        comp_info = extract_component_id_with_llm_fallback(type_cell, llm_client)

        # No ID embedded in the type cell: try the dedicated ID column.
        if comp_info is None and id_col is not None:
            raw_id = cell_text(row, id_col)
            if raw_id and re.match(r'^\d{5,}$', raw_id):
                type_text = str(type_cell).strip() if type_cell else ""
                # Only the first line is checked against the marker list.
                first_line = type_text.split('\n')[0].strip()
                if first_line and first_line not in non_component_markers:
                    comp_info = {
                        "type_name": type_text.replace("+图片", "").replace("-配图", "").strip(),
                        "cId": raw_id,
                        "has_image": "+图片" in type_text or "-配图" in type_text,
                    }

        if comp_info is None:
            continue

        # Teaching config: the first non-empty cell among the component columns.
        teaching_config = ""
        for col in teaching_cols:
            value = cell_value(row, col)
            if value:
                teaching_config = str(value).strip()
                break

        config_info = cell_text(row, config_info_col)

        # Fall back to the config-info cell, but only when it carries a
        # numeric ID (this excludes plain TL markers).
        if not teaching_config and re.search(r'\d{5,}', config_info):
            teaching_config = config_info

        # A row without teaching config is not a valid component row.
        if not teaching_config:
            continue

        components.append({
            "row_index": row_idx,
            "type_name": comp_info["type_name"],
            "cId": comp_info["cId"],
            "has_image": comp_info["has_image"],
            "teaching_config": teaching_config,
            "knowledge_text": cell_text(row, kp_col),
            "config_info": config_info,
        })

    logger.info(f"从 sheet 中识别到 {len(components)} 个组件行")
    return components
def parse_script_from_sheet(sheet_rows, markdown="", llm_client=None):
    """Parse the complete script information from sheet data.

    Args:
        sheet_rows: 2-D list of sheet cells, header first.
        markdown: Optional document markdown. When non-empty it supplies the
            metadata and the "角色-section对应" table.
        llm_client: Optional LLM client forwarded to ``parse_sheet_rows``.

    Returns:
        ``{"metadata": {...}, "character_map": {"Eva": 663, ...},
        "section_char_map": [...], "components": [...]}``
    """
    metadata = extract_script_metadata(markdown) if markdown else {}
    section_char_map = extract_section_character_map(markdown) if markdown else []
    return {
        "metadata": metadata,
        "character_map": extract_character_map(sheet_rows),
        "section_char_map": section_char_map,
        "components": parse_sheet_rows(sheet_rows, llm_client=llm_client),
    }
# ============ CLI ============
if __name__ == "__main__":
    # CLI: parse the markdown file given as the first argument, or a built-in
    # sample when no argument is given, and print the result as JSON.
    import sys

    if len(sys.argv) > 1:
        # Script documents contain Chinese text; read them as UTF-8 instead
        # of relying on the platform's default encoding.
        with open(sys.argv[1], 'r', encoding='utf-8') as f:
            markdown = f.read()
    else:
        # Hard-coded markdown fragment for a quick manual check.
        test_md = """
<sheet token="SlFGsyYkPh33kZtDQtecxc7vn4c_m8vOBk"/>
<lark-table rows="4" cols="3" column-widths="100,100,100">
<lark-tr>
<lark-td></lark-td>
<lark-td>类型</lark-td>
<lark-td>知识点</lark-td>
</lark-tr>
<lark-tr>
<lark-td>1</lark-td>
<lark-td>对话朗读</lark-td>
<lark-td>school 1</lark-td>
</lark-tr>
<lark-tr>
<lark-td>2</lark-td>
<lark-td>对话挖空</lark-td>
<lark-td>school 2</lark-td>
</lark-tr>
<lark-tr>
<lark-td>3</lark-td>
<lark-td>听力选择</lark-td>
<lark-td>school 3 study 2</lark-td>
</lark-tr>
</lark-table>
"""
        markdown = test_md

    result = parse_script_document(markdown)
    print(json.dumps(result, ensure_ascii=False, indent=2))