#!/usr/bin/env python3
"""
剧本文档解析器
从飞书文档中提取末尾的组件类型表格和内嵌sheet中的组件配置
支持两种数据源:
1. 文档 markdown 中的 lark-table(备选)
2. 内嵌 sheet 的二维数组数据(主要)
"""
import re
import json
import html
import logging
# Module-level logger. The stream handler is attached only when the logger has
# no handler yet, so importing the module more than once does not duplicate
# log lines.
logger = logging.getLogger("parse_script")
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s"
        )
    )
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
def parse_lark_table(table_html: str) -> list:
    """Parse a Feishu lark-table HTML fragment into a 2-D list of cell texts.

    Rows are taken from ``<lark-tr>`` elements and cells from ``<lark-td>``
    elements; the plain ``<tr>`` / ``<td>`` / ``<th>`` spellings are accepted
    too. Tag attributes are ignored.

    Args:
        table_html: HTML text containing the table rows.

    Returns:
        One list per row, each holding the cell texts with inner markup
        removed and runs of whitespace collapsed to a single space. Returns
        an empty list when the input is empty or contains no rows.
    """
    if not table_html:
        return []

    # The tag name must be followed by whitespace or ">" so that e.g.
    # "<track>" is not mistaken for a "<tr>".
    row_pattern = re.compile(
        r'<(?:lark-)?tr(?:\s[^>]*)?>(.*?)</(?:lark-)?tr\s*>',
        re.DOTALL | re.IGNORECASE,
    )
    cell_pattern = re.compile(
        r'<(?:lark-)?t[dh](?:\s[^>]*)?>(.*?)</(?:lark-)?t[dh]\s*>',
        re.DOTALL | re.IGNORECASE,
    )

    rows = []
    for row_match in row_pattern.finditer(table_html):
        cells = []
        for cell_match in cell_pattern.finditer(row_match.group(1)):
            # Drop nested markup but keep its text content.
            cell_text = re.sub(r'<[^>]+>', '', cell_match.group(1))
            # Collapse newlines / repeated blanks left behind by the markup.
            cell_text = re.sub(r'\s+', ' ', cell_text).strip()
            cells.append(cell_text)
        rows.append(cells)
    return rows
def extract_component_table(markdown: str) -> list:
    """Extract the component-type table closest to the end of a script document.

    Tables are scanned from last to first; the first one that has more than
    one row and whose header row has a cell containing "类型" is used. Both
    ``<lark-table>`` and plain ``<table>`` elements are recognised.

    Header cells are mapped to columns as follows: a cell equal to "类型" is
    the type column, a cell containing "知识点" is the knowledge-point column,
    an empty first cell is the index column, and a cell containing "组件" is
    the component-config column.

    Args:
        markdown: The script document text.

    Returns:
        A list of dicts, one per data row with a non-empty type, e.g.
        ``{"index": 1, "type": "对话朗读", "knowledge_points": "school 1"}``.
        ``index`` defaults to the running position and is replaced by the
        index cell when that cell is all digits. ``knowledge_points`` is
        present when the table has that column and the row is long enough;
        ``component_config`` only when the corresponding cell is non-empty.

    Raises:
        ValueError: If the document has no table, no table has a header cell
            containing "类型", or the chosen header has no cell equal to
            "类型".
    """
    table_pattern = re.compile(
        r'<(?:lark-)?table(?:\s[^>]*)?>(.*?)</(?:lark-)?table\s*>',
        re.DOTALL | re.IGNORECASE,
    )
    tables = list(table_pattern.finditer(markdown))
    if not tables:
        raise ValueError("文档中未找到lark-table表格")

    # Walk backwards: the component table is expected near the end.
    component_table = None
    for table_match in reversed(tables):
        rows = parse_lark_table(table_match.group(0))
        if len(rows) > 1 and any("类型" in cell for cell in rows[0]):
            component_table = rows
            break
    if component_table is None:
        raise ValueError("未找到包含'类型'列头的组件表格")

    # Resolve column positions from the header row.
    header = component_table[0]
    type_col = None
    kp_col = None
    index_col = None
    component_col = None
    for i, cell in enumerate(header):
        label = cell.strip()
        if label == "类型":
            type_col = i
        elif "知识点" in label:
            kp_col = i
        elif label == "" and i == 0:
            # An unlabeled first column is treated as the row number.
            index_col = i
        elif "组件" in label:
            component_col = i
    if type_col is None:
        raise ValueError(f"表头中未找到'类型'列: {header}")

    components = []
    for row in component_table[1:]:
        if len(row) <= type_col:
            continue
        type_text = row[type_col].strip()
        if not type_text:
            continue

        entry = {
            "index": len(components) + 1,
            "type": type_text,
        }
        if index_col is not None and len(row) > index_col:
            idx_text = row[index_col].strip()
            if idx_text.isdigit():
                entry["index"] = int(idx_text)
        if kp_col is not None and len(row) > kp_col:
            entry["knowledge_points"] = row[kp_col].strip()
        if component_col is not None and len(row) > component_col:
            comp_text = row[component_col].strip()
            if comp_text:
                entry["component_config"] = comp_text
        components.append(entry)
    return components
def extract_sheet_token(markdown: str) -> str:
    """Return the token of the first embedded sheet tag in the document.

    The token is read from the ``token`` attribute of a ``<sheet ...>`` tag,
    e.g. ``<sheet token="xxx_yyy"/>``.

    Args:
        markdown: The script document text.

    Returns:
        The token string, or ``None`` when the document is empty or contains
        no sheet tag with a ``token`` attribute.
    """
    if not markdown:
        return None
    # NOTE(review): the tag shape is reconstructed from the documented
    # "xxx_yyy" token format - confirm against a real document export.
    match = re.search(
        r'<sheet\b[^>]*?\btoken\s*=\s*["\']([^"\']+)["\']',
        markdown,
        re.IGNORECASE,
    )
    if match:
        return match.group(1)
    return None
def extract_script_metadata(markdown: str) -> dict:
    """Read level/season/unit/lesson/title from the script title line.

    Only the first 500 characters of the document are searched for a title
    shaped like ``L1-S2-U14-L1 <title text>``.

    Args:
        markdown: The script document text.

    Returns:
        A dict with keys ``level`` (str), ``season``, ``unit``, ``lesson``
        (ints) and ``title`` (str), or an empty dict when no such title is
        found.
    """
    found = re.search(r'(L\d+)-S(\d+)-U(\d+)-L(\d+)\s+(.+)', markdown[:500])
    if found is None:
        return {}

    level, season, unit, lesson, title = found.groups()
    return {
        "level": level,
        "season": int(season),
        "unit": int(unit),
        "lesson": int(lesson),
        "title": title.strip(),
    }
def parse_script_document(markdown: str) -> dict:
    """Parse a whole script document from its markdown text.

    Args:
        markdown: The script document text.

    Returns:
        A dict with three keys: ``metadata`` (from
        ``extract_script_metadata``), ``sheet_token`` (from
        ``extract_sheet_token``) and ``components`` (from
        ``extract_component_table``).

    Raises:
        ValueError: Propagated from ``extract_component_table`` when no
            usable component table is found.
    """
    # Evaluated in this order so that failures surface exactly as before.
    metadata = extract_script_metadata(markdown)
    sheet_token = extract_sheet_token(markdown)
    components = extract_component_table(markdown)
    return {
        "metadata": metadata,
        "sheet_token": sheet_token,
        "components": components,
    }
# ============ Sheet 数据解析(主要入口) ============
def extract_component_id(type_cell):
    r"""Split a sheet "type" cell into component type name, ID and image flag.

    Recognised cell layouts (``\n`` marks a line break inside the cell):

    - ``"对话朗读+图片\n1214101"`` -> type "对话朗读", cId "1214101", has_image True
    - ``"对话挖空\n1214102"`` -> type "对话挖空", cId "1214102", has_image False
    - ``"合作阅读 0000800"`` -> type "合作阅读", cId "0000800" (single line)
    - ``"核心互动- 囗语\n听力选择\n0000810"`` -> type "听力选择", cId "0000810"
    - ``"TL"``, ``"场景"`` and other marker rows -> ``None``

    Args:
        type_cell: Raw cell value; anything that is not a non-empty string
            yields ``None``.

    Returns:
        ``{"type_name": str, "cId": str, "has_image": bool}``, or ``None``
        when the cell is a non-component marker or carries no ID.
    """
    if not isinstance(type_cell, str) or not type_cell:
        return None

    lines = [part.strip() for part in type_cell.strip().split("\n") if part.strip()]
    if not lines:
        return None

    non_component_markers = {
        "TL", "场景", "角色", "AI动画", "场景变换", "画面",
        "BGM", "SE", "类型",
    }
    first = lines[0]
    name = None
    comp_id = None

    # 1) A later line that is a pure ID of 5+ digits; the type name is the
    #    line directly above it. (No-op for single-line cells.)
    for pos in range(1, len(lines)):
        if re.match(r'^\d{5,}$', lines[pos]):
            comp_id, name = lines[pos], lines[pos - 1]
            break

    # 2) "name<space>ID" at the end of the first line.
    if comp_id is None:
        tail = re.search(r'\s+(\d{5,})$', first)
        if tail:
            comp_id = tail.group(1)
            name = first[:tail.start()].strip()

    # 3) Second line made of digits of any length (covers IDs shorter than 5).
    if comp_id is None and len(lines) >= 2 and re.match(r'^\d+$', lines[1]):
        comp_id, name = lines[1], first

    if name is None:
        name = first

    # A "核心互动..." line is only a prefix: take the first line that is
    # neither that prefix, nor all digits, nor another "核心互动" line.
    if name and re.match(r'^核心互动', name):
        for candidate in lines:
            if candidate == name or re.match(r'^\d+$', candidate):
                continue
            if re.match(r'^核心互动', candidate):
                continue
            name = candidate
            break

    if not name:
        return None
    bare_name = name.replace("+图片", "").replace("-配图", "").strip()
    if not bare_name or bare_name in non_component_markers:
        return None
    # A component row must carry an ID.
    if comp_id is None:
        return None

    # Image variant: the first matching suffix is stripped from the name.
    for suffix in ("+图片", "-配图"):
        if suffix in name:
            return {
                "type_name": name.replace(suffix, "").strip(),
                "cId": comp_id,
                "has_image": True,
            }
    return {
        "type_name": name,
        "cId": comp_id,
        "has_image": False,
    }
# Memo of LLM fallback outcomes keyed by the stripped cell text, so the same
# cell never triggers a second LLM call. Failed lookups are stored as None.
_llm_fallback_cache = {}
def extract_component_id_with_llm_fallback(type_cell, llm_client=None):
    """Extract component info by regex first, falling back to an LLM.

    The LLM is consulted only when ``extract_component_id`` returns ``None``,
    the cell is a non-empty string containing both a CJK character and a run
    of 5+ digits, and an ``llm_client`` is supplied. Outcomes, including
    failures, are memoised in ``_llm_fallback_cache``.

    Args:
        type_cell: Raw value of the sheet "type" cell.
        llm_client: Optional client exposing
            ``call_for_json(system_prompt, text, max_tokens=..., temperature=...)``
            whose result unpacks into ``(parsed, _)``.

    Returns:
        ``{"type_name": str, "cId": str, "has_image": bool}`` or ``None``.
    """
    regex_result = extract_component_id(type_cell)
    if regex_result is not None:
        return regex_result

    if not type_cell or not isinstance(type_cell, str):
        return None
    text = type_cell.strip()

    # Cheap pre-filter: only cells with both CJK text and a long digit run
    # are worth an LLM call.
    if not re.search(r'[一-鿿]', text) or not re.search(r'\d{5,}', text):
        return None
    if not llm_client:
        return None

    if text in _llm_fallback_cache:
        return _llm_fallback_cache[text]

    logger.info(f"正则无法解析,尝试 LLM 兜底: {repr(text[:80])}")
    system_prompt = """你是组件类型解析器。用户给你一个表格单元格的文本内容,你需要提取:
1. type_name: 组件类型名(如 "对话朗读"、"合作阅读"、"听力选择" 等)
2. cId: 组件ID(纯数字字符串,通常5-7位)
3. has_image: 是否配图(文本中含"+图片"或"-配图"则为true)
注意:
- 忽略"核心互动"等前缀
- type_name 只保留最终的类型名
- 如果无法确定,返回 null
返回 JSON 格式:{"type_name": "...", "cId": "...", "has_image": false}
如果不是组件行,返回 null"""

    result = None
    try:
        parsed, _ = llm_client.call_for_json(system_prompt, text, max_tokens=200, temperature=0)
        usable = (
            parsed
            and isinstance(parsed, dict)
            and parsed.get("type_name")
            and parsed.get("cId")
        )
        if usable:
            result = {
                "type_name": str(parsed["type_name"]).strip(),
                "cId": str(parsed["cId"]).strip(),
                "has_image": bool(parsed.get("has_image", False)),
            }
    except Exception as e:
        # Best-effort fallback: any client failure is logged and treated as
        # "not a component".
        logger.warning(f"LLM 兜底失败: {e}")
        result = None

    _llm_fallback_cache[text] = result
    if result is not None:
        logger.info(f"LLM 兜底成功: {result}")
    return result
def extract_section_character_map(markdown):
    """Extract the "角色-section对应" (character-per-section) table.

    The table is looked up in the text following a ``# 角色-section对应``
    heading, up to the next heading or the end of the document. Each row has
    at least two cells: a section cell (``S15-S16``, ``S3``, or empty) and a
    character cell (``738-eva``). For example, the rows
    ``S15-S16 | 738-eva``, ``S1-S14 | 663-EVA`` and ``(empty) | 653-peter``
    yield Eva=738 for S15..S16, Eva=663 for S1..S14 and Peter=653 for
    S1..S14.

    A row whose section cell is empty, or contains no parsable section,
    inherits the sections of the previous row. Rows seen before any section
    get an empty ``sections`` list.

    Args:
        markdown: The script document text; may be empty or ``None``.

    Returns:
        A list of ``{"sections": ["S15", "S16"], "characters": {"Eva": 738}}``
        entries, one per row with a parsable character cell. Empty when the
        heading or table is missing.
    """
    if not markdown:
        return []

    match = re.search(r'#\s*角色-section对应\s*\n(.*?)(?=\n#|$)', markdown, re.DOTALL)
    if not match:
        logger.info("文档中未找到 '角色-section对应' 章节")
        return []
    table_html = match.group(1)

    # Accept both the lark-* and the plain HTML spellings of row/cell tags.
    row_pattern = re.compile(
        r'<(?:lark-)?tr(?:\s[^>]*)?>(.*?)</(?:lark-)?tr\s*>',
        re.DOTALL | re.IGNORECASE,
    )
    cell_pattern = re.compile(
        r'<(?:lark-)?t[dh](?:\s[^>]*)?>(.*?)</(?:lark-)?t[dh]\s*>',
        re.DOTALL | re.IGNORECASE,
    )
    char_pattern = re.compile(r'(\d{2,})[-_]([A-Za-z]{2,})')
    section_range_pattern = re.compile(r'S(\d+)(?:\s*-\s*S(\d+))?', re.IGNORECASE)

    entries = []
    current_sections = []  # carried forward while section cells are empty
    for row_html in row_pattern.findall(table_html):
        # Strip inline markup so tag attributes cannot be mistaken for data.
        cells = [
            re.sub(r'<[^>]+>', '', raw).strip()
            for raw in cell_pattern.findall(row_html)
        ]
        if len(cells) < 2:
            continue
        section_cell, char_cell = cells[0], cells[1]

        if section_cell:
            sections = []
            for sm in section_range_pattern.finditer(section_cell):
                start = int(sm.group(1))
                end = int(sm.group(2)) if sm.group(2) else start
                sections.extend(f"S{s}" for s in range(start, end + 1))
            if sections:
                current_sections = sections

        char_match = char_pattern.search(char_cell)
        if char_match:
            char_id = int(char_match.group(1))
            char_name = char_match.group(2).capitalize()
            entries.append({
                "sections": list(current_sections),
                "characters": {char_name: char_id},
            })

    logger.info(f"提取角色-section对应: {len(entries)} 条记录")
    return entries
def resolve_resource_mapping(section_char_map, config_info, fallback_char_map=None):
    """Resolve the character -> ID mapping that applies to one component.

    The section is read from the first ``S<digits>`` in ``config_info``
    (case-insensitive), e.g. ``"S1主线"`` -> ``S1``. Zero-padded numbers such
    as ``S01`` are matched against both ``S01`` and ``S1``, because
    ``extract_section_character_map`` generates unpadded keys.

    Args:
        section_char_map: List of ``{"sections": [...], "characters": {...}}``
            entries as returned by ``extract_section_character_map``.
        config_info: Text of the component's "配置信息" cell.
        fallback_char_map: Mapping returned when nothing can be resolved.

    Returns:
        The merged ``characters`` of every entry whose ``sections`` is empty
        (applies to all sections) or contains the section; later entries
        override earlier ones. ``fallback_char_map`` (or ``{}``) when either
        input is empty, no section is found, or no entry matches.
    """
    if not config_info or not section_char_map:
        return fallback_char_map or {}

    sm = re.search(r'S(\d+)', config_info, re.IGNORECASE)
    if not sm:
        return fallback_char_map or {}

    # Normalise "S01" -> "S1" but keep the literal spelling as a candidate so
    # maps that do use padded keys still match.
    section_key = f"S{int(sm.group(1))}"
    candidate_keys = {section_key, f"S{sm.group(1)}"}

    result = {}
    for entry in section_char_map:
        sections = entry["sections"]
        # An entry without sections applies to every section.
        if not sections or any(key in sections for key in candidate_keys):
            result.update(entry["characters"])

    if not result:
        return fallback_char_map or {}

    logger.debug(f"Section {section_key} → resourceMapping: {result}")
    return result
def extract_character_map(sheet_rows):
    """Collect the character name -> character ID table from sheet cells.

    Every string cell in the first 30 rows is scanned for ``<digits>-<name>``
    or ``<digits>_<name>`` tokens such as ``653-peter`` or ``663-EVA``. Names
    need at least two letters; two-letter all-uppercase names (e.g. ``TA``)
    are skipped. Names are capitalised and the first ID seen for a name wins.

    Args:
        sheet_rows: 2-D list of sheet cell values.

    Returns:
        A dict such as ``{"Eva": 663, "Peter": 653, "Vicky": 658}``.
    """
    id_name_pattern = re.compile(r'(\d{2,})[-_]([A-Za-z]{2,})')
    char_map = {}

    text_cells = (
        cell
        for row in sheet_rows[:30]
        for cell in row
        if cell and isinstance(cell, str)
    )
    for cell in text_cells:
        for raw_id, raw_name in id_name_pattern.findall(cell):
            # Short all-caps tokens are markers, not character names.
            if len(raw_name) <= 2 and raw_name.isupper():
                continue
            char_map.setdefault(raw_name.capitalize(), int(raw_id))

    logger.info(f"提取角色映射: {char_map}")
    return char_map
def parse_sheet_rows(sheet_rows, llm_client=None):
    """Identify and extract every component row from a sheet's 2-D cell array.

    The first row is the header. Columns are located by label: "类型",
    "配置信息", "ID", any label containing both "组件" and "配置" (several
    allowed), and any label containing "知识点". When a label is missing a
    warning is logged and a default index is used if the header is wide
    enough (component config 6, knowledge 7, config info 1); the type column
    defaults to 0.

    A data row is kept when a component ID can be determined (from the type
    cell, or from the separate ID column holding 5+ digits) and a teaching
    config is available (first component-config column whose cell is truthy,
    else a config-info cell containing 5+ digits).

    Args:
        sheet_rows: 2-D list of cell values, header first.
        llm_client: Optional LLM client passed on to
            ``extract_component_id_with_llm_fallback``.

    Returns:
        A list of dicts with keys ``row_index`` (position in ``sheet_rows``;
        the first data row is 1), ``type_name``, ``cId``, ``has_image``,
        ``teaching_config``, ``knowledge_text`` and ``config_info``. Empty
        when there are fewer than two rows.
    """
    if not sheet_rows or len(sheet_rows) < 2:
        return []

    def cell_text(row, col):
        """Return the stripped string at row[col], or "" if absent/falsy."""
        if col is None or len(row) <= col or not row[col]:
            return ""
        return str(row[col]).strip()

    # --- Locate columns from the header labels ---
    header = sheet_rows[0]
    col_map = {}
    component_cols = []
    for i, cell in enumerate(header):
        if not cell:
            continue
        label = str(cell).strip()
        if label == "类型":
            col_map["type"] = i
        elif label == "配置信息":
            col_map["config_info"] = i
        elif label == "ID":
            col_map["id"] = i
        elif "组件" in label and "配置" in label:
            component_cols.append(i)
        elif "知识点" in label:
            col_map["knowledge"] = i

    type_col = col_map.get("type", 0)
    kp_col = col_map.get("knowledge")
    config_info_col = col_map.get("config_info")
    id_col = col_map.get("id")
    # The first component-config column is primary; the rest are fallbacks.
    comp_col = component_cols[0] if component_cols else None
    alt_comp_cols = component_cols[1:]

    # --- Positional defaults for labels that were not found ---
    header_width = len(header)
    if comp_col is None:
        logger.warning("sheet表头中未找到'组件配置'列,回退到默认列索引6")
        comp_col = 6 if header_width > 6 else None
    if kp_col is None:
        logger.warning("sheet表头中未找到'知识点'列,回退到默认列索引7")
        kp_col = 7 if header_width > 7 else None
    if config_info_col is None:
        logger.warning("sheet表头中未找到'配置信息'列,回退到默认列索引1")
        config_info_col = 1 if header_width > 1 else None
    logger.info(f"列映射: 类型={type_col}, 配置信息={config_info_col}, 组件配置={comp_col}, 知识点={kp_col}, ID={id_col}, alt_comp={alt_comp_cols}")

    # Type-cell values that mark non-component rows (ID-column path only).
    non_component_markers = {
        "TL", "场景", "角色", "AI动画", "场景变换", "画面",
        "BGM", "SE", "类型", "测试类型",
    }
    teaching_cols = [comp_col] + alt_comp_cols

    components = []
    for row_idx, row in enumerate(sheet_rows[1:], start=1):
        type_cell = row[type_col] if len(row) > type_col else None
        comp_info = extract_component_id_with_llm_fallback(type_cell, llm_client)

        # No ID embedded in the type cell: try the dedicated ID column.
        if comp_info is None and id_col is not None:
            raw_cell = cell_text(row, id_col)
            if re.match(r'^\d{5,}$', raw_cell):
                type_text = str(type_cell).strip() if type_cell else ""
                base_type = type_text.split('\n')[0].strip()
                if base_type and base_type not in non_component_markers:
                    comp_info = {
                        "type_name": type_text.replace("+图片", "").replace("-配图", "").strip(),
                        "cId": raw_cell,
                        "has_image": "+图片" in type_text or "-配图" in type_text,
                    }
        if comp_info is None:
            continue

        config_info = cell_text(row, config_info_col)

        # Teaching config: the first truthy cell among the config columns
        # decides, even if it is blank after stripping.
        teaching_config = ""
        for col in teaching_cols:
            if col is not None and len(row) > col and row[col]:
                teaching_config = str(row[col]).strip()
                break
        # Fall back to the config-info cell, but only when it carries an ID.
        if not teaching_config and re.search(r'\d{5,}', config_info):
            teaching_config = config_info
        # Rows without any teaching config are not valid component rows.
        if not teaching_config:
            continue

        components.append({
            "row_index": row_idx,
            "type_name": comp_info["type_name"],
            "cId": comp_info["cId"],
            "has_image": comp_info["has_image"],
            "teaching_config": teaching_config,
            "knowledge_text": cell_text(row, kp_col),
            "config_info": config_info,
        })

    logger.info(f"从 sheet 中识别到 {len(components)} 个组件行")
    return components
def parse_script_from_sheet(sheet_rows, markdown="", llm_client=None):
    """Assemble the full script description from sheet data.

    Args:
        sheet_rows: 2-D list of sheet cell values.
        markdown: Optional document text; when non-empty it supplies the
            metadata and the "角色-section对应" map.
        llm_client: Optional LLM client forwarded to ``parse_sheet_rows``.

    Returns:
        A dict with keys ``metadata`` (``{}`` without markdown),
        ``character_map``, ``section_char_map`` (``[]`` without markdown) and
        ``components``.
    """
    metadata = extract_script_metadata(markdown) if markdown else {}
    section_char_map = extract_section_character_map(markdown) if markdown else []
    return {
        "metadata": metadata,
        "character_map": extract_character_map(sheet_rows),
        "section_char_map": section_char_map,
        "components": parse_sheet_rows(sheet_rows, llm_client=llm_client),
    }
# ============ CLI ============
if __name__ == "__main__":
    # Usage: pass a markdown file path to parse it; with no argument a small
    # built-in sample document is parsed instead. The result is printed as
    # JSON in both cases.
    import sys

    if len(sys.argv) > 1:
        # Read as UTF-8 explicitly: the documents contain Chinese text and
        # the platform default encoding is not guaranteed to handle it.
        with open(sys.argv[1], 'r', encoding='utf-8') as f:
            markdown = f.read()
        result = parse_script_document(markdown)
        print(json.dumps(result, ensure_ascii=False, indent=2))
    else:
        # Hard-coded sample: a component table whose unlabeled first column
        # holds the row number.
        test_md = """
<lark-table rows="4" cols="3">
<lark-tr>
<lark-td></lark-td>
<lark-td>类型</lark-td>
<lark-td>知识点</lark-td>
</lark-tr>
<lark-tr>
<lark-td>1</lark-td>
<lark-td>对话朗读</lark-td>
<lark-td>school 1</lark-td>
</lark-tr>
<lark-tr>
<lark-td>2</lark-td>
<lark-td>对话挖空</lark-td>
<lark-td>school 2</lark-td>
</lark-tr>
<lark-tr>
<lark-td>3</lark-td>
<lark-td>听力选择</lark-td>
<lark-td>school 3 study 2</lark-td>
</lark-tr>
</lark-table>
"""
        result = parse_script_document(test_md)
        print(json.dumps(result, ensure_ascii=False, indent=2))