#!/usr/bin/env python3
"""
Script-document parser.

Extracts the trailing component-type table from a Feishu document and the
component configuration held in its embedded sheet.

Two data sources are supported:
1. the lark-table markup inside the document markdown (fallback)
2. the embedded sheet, as a 2-D array of cell values (primary)
"""

import re
import json
import html
import logging
from typing import Optional

logger = logging.getLogger("parse_script")
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s"
    ))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)


# NOTE(review): the tag names inside these patterns were missing from the
# source as received (only the capture groups survived). They are rebuilt
# from the surrounding comments ("lark-table") and the surviving pattern
# fragments; both "lark-" prefixed and plain row/cell tags are accepted.
# Confirm against a real Feishu markdown export.
_TABLE_RE = re.compile(r'<lark-table[^>]*>(.*?)</lark-table>', re.DOTALL)
_ROW_RE = re.compile(
    r'<(?:lark-)?tr(?:\s[^>]*)?>(.*?)</(?:lark-)?tr>', re.DOTALL
)
_CELL_RE = re.compile(
    r'<(?:lark-)?td(?:\s[^>]*)?>(.*?)</(?:lark-)?td>', re.DOTALL
)
# NOTE(review): presumably the embedded sheet is exported as
# <sheet token="..."/> -- the original pattern was lost entirely; verify.
_SHEET_TOKEN_RE = re.compile(r'<sheet\b[^>]*?\btoken="([^"]+)"')

# "<digits>-<name>" or "<digits>_<name>". The name needs at least two letters
# so that script markers such as "L1_S02" are not picked up.
_CHARACTER_RE = re.compile(r'(\d{2,})[-_]([A-Za-z]{2,})')
_SECTION_RANGE_RE = re.compile(r'S(\d+)(?:\s*-\s*S(\d+))?', re.IGNORECASE)

# Values of the type column that mark a non-component row.
_NON_COMPONENT_MARKERS = frozenset({
    "TL", "场景", "角色", "AI动画", "场景变换", "画面", "BGM", "SE", "类型",
})
# The separate-ID-column path of parse_sheet_rows filters one extra marker.
_SHEET_NON_COMPONENT_MARKERS = _NON_COMPONENT_MARKERS | {"测试类型"}


def parse_lark_table(table_html: str) -> list:
    """
    Parse Feishu lark-table markup into a 2-D list of cell texts.

    Each row element becomes one inner list. Inside a cell, nested tags are
    removed and runs of whitespace are collapsed to a single space.
    """
    rows = []
    for row_match in _ROW_RE.finditer(table_html):
        cells = []
        for cell_match in _CELL_RE.finditer(row_match.group(1)):
            # Drop nested markup but keep the text it wraps.
            text = re.sub(r'<[^>]+>', '', cell_match.group(1).strip()).strip()
            cells.append(re.sub(r'\s+', ' ', text).strip())
        rows.append(cells)
    return rows


def extract_component_table(markdown: str) -> list:
    """
    Extract the component-type table found near the end of a script document.

    The table is recognised by a header row containing a "类型" cell; tables
    are scanned from the last one backwards and the first match wins.

    Returns:
        [
            {"index": 1, "type": "对话朗读", "knowledge_points": "school 1"},
            {"index": 2, "type": "对话挖空", "knowledge_points": "school 2"},
            ...
        ]
        "knowledge_points" is present only when the header has a "知识点"
        column; "component_config" only when a "组件" column holds text.

    Raises:
        ValueError: no lark-table exists, no table has a "类型" header cell,
            or the chosen header has no cell that is exactly "类型".
    """
    tables = list(_TABLE_RE.finditer(markdown))
    if not tables:
        raise ValueError("文档中未找到lark-table表格")

    component_table = None
    for table_match in reversed(tables):
        rows = parse_lark_table(table_match.group(0))
        # A header alone is not enough: at least one data row is required.
        if len(rows) > 1 and any("类型" in cell for cell in rows[0]):
            component_table = rows
            break

    if component_table is None:
        raise ValueError("未找到包含'类型'列头的组件表格")

    # Work out which column plays which role.
    header = component_table[0]
    type_col = None
    kp_col = None
    index_col = None
    component_col = None
    for i, cell in enumerate(header):
        label = cell.strip()
        if label == "类型":
            type_col = i
        elif "知识点" in label:
            kp_col = i
        elif label == "" and i == 0:
            # An unlabeled first column is treated as the running number.
            index_col = i
        elif "组件" in label:
            component_col = i

    if type_col is None:
        raise ValueError(f"表头中未找到'类型'列: {header}")

    components = []
    for row in component_table[1:]:
        if len(row) <= type_col:
            continue
        type_text = row[type_col].strip()
        if not type_text:
            continue

        # Default index is the position among accepted rows; an explicit
        # numeric value in the index column overrides it.
        entry = {
            "index": len(components) + 1,
            "type": type_text,
        }

        if index_col is not None and len(row) > index_col:
            idx_text = row[index_col].strip()
            if idx_text.isdigit():
                entry["index"] = int(idx_text)

        if kp_col is not None and len(row) > kp_col:
            entry["knowledge_points"] = row[kp_col].strip()

        if component_col is not None and len(row) > component_col:
            comp_text = row[component_col].strip()
            if comp_text:
                entry["component_config"] = comp_text

        components.append(entry)

    return components


def extract_sheet_token(markdown: str) -> Optional[str]:
    """Return the token of the first embedded sheet, or None if there is none."""
    match = _SHEET_TOKEN_RE.search(markdown)
    if match:
        return match.group(1)
    return None


def extract_script_metadata(markdown: str) -> dict:
    """
    Extract script metadata from the document title.

    Only the first 500 characters are searched for a title such as
    "L1-S2-U14-L1 到你上学啦". On a match the result holds "level" (str),
    "season", "unit", "lesson" (int) and "title" (str); otherwise it is an
    empty dict.
    """
    metadata = {}
    title_match = re.search(
        r'(L\d+)-S(\d+)-U(\d+)-L(\d+)\s+(.+)', markdown[:500]
    )
    if title_match:
        metadata["level"] = title_match.group(1)
        metadata["season"] = int(title_match.group(2))
        metadata["unit"] = int(title_match.group(3))
        metadata["lesson"] = int(title_match.group(4))
        metadata["title"] = title_match.group(5).strip()
    return metadata


def parse_script_document(markdown: str) -> dict:
    """
    Parse a complete script document from its markdown.

    Returns:
        {
            "metadata": {...},
            "sheet_token": "xxx_yyy",
            "components": [...],
        }

    Raises:
        ValueError: propagated from extract_component_table when the
            component table cannot be found.
    """
    return {
        "metadata": extract_script_metadata(markdown),
        "sheet_token": extract_sheet_token(markdown),
        "components": extract_component_table(markdown),
    }


# ============ Sheet data parsing (primary entry points) ============

def extract_component_id(type_cell):
    """
    Extract the component type name, component ID and image flag from a
    type-column cell of the sheet.

    Cell formats handled:
    - "对话朗读+图片\\n1214101" -> type_name="对话朗读", cId="1214101", has_image=True
    - "对话挖空\\n1214102" -> type_name="对话挖空", cId="1214102", has_image=False
    - "合作阅读 0000800" -> type_name="合作阅读", cId="0000800" (single line)
    - "核心互动- 囗语\\n听力选择\\n0000810" -> type_name="听力选择", cId="0000810"
    - "TL", "场景" -> None (not a component row)

    Returns:
        dict: {"type_name": str, "cId": str, "has_image": bool}, or None when
        the cell is not a non-empty string, names a non-component marker, or
        carries no ID.
    """
    if not type_cell or not isinstance(type_cell, str):
        return None

    lines = [ln.strip() for ln in type_cell.strip().split("\n") if ln.strip()]

    type_part = None
    c_id = None

    # --- Strategy 1: "name\nID" -- the first later line that is an ID of
    # five or more digits; the type name is the line just before it.
    if len(lines) >= 2:
        first_line = lines[0]
        for i in range(1, len(lines)):
            if re.match(r'^\d{5,}$', lines[i]):
                c_id = lines[i]
                type_part = lines[i - 1]
                break
        # No stand-alone ID line: try the tail of the first line.
        if c_id is None:
            m = re.search(r'\s+(\d{5,})$', first_line)
            if m:
                c_id = m.group(1)
                type_part = first_line[:m.start()].strip()

    # --- Strategy 2: single line "name ID", separated by whitespace.
    if c_id is None and len(lines) == 1:
        m = re.search(r'\s+(\d{5,})$', lines[0])
        if m:
            c_id = m.group(1)
            type_part = lines[0][:m.start()].strip()

    # --- Strategy 3: second line is all digits but shorter than five, which
    # strategy 1 rejects.
    if c_id is None and len(lines) >= 2:
        if re.match(r'^\d+$', lines[1]):
            c_id = lines[1]
            type_part = lines[0]

    if type_part is None and lines:
        type_part = lines[0]

    # When the chosen line is only a "核心互动" prefix, use the first line
    # that is neither such a prefix nor a digit-only line.
    if type_part and re.match(r'^核心互动', type_part):
        for candidate in lines:
            if candidate == type_part or re.match(r'^\d+$', candidate):
                continue
            if re.match(r'^核心互动', candidate):
                continue
            type_part = candidate
            break

    if not type_part:
        return None

    base_type = type_part.replace("+图片", "").replace("-配图", "").strip()
    if not base_type or base_type in _NON_COMPONENT_MARKERS:
        return None

    # A component row must carry an ID.
    if c_id is None:
        return None

    # Image variants are flagged by a "+图片" or "-配图" suffix; only the
    # first marker found is removed from the name.
    has_image = False
    if "+图片" in type_part:
        has_image = True
        type_part = type_part.replace("+图片", "").strip()
    elif "-配图" in type_part:
        has_image = True
        type_part = type_part.replace("-配图", "").strip()

    return {
        "type_name": type_part,
        "cId": c_id,
        "has_image": has_image,
    }


# Cache for the LLM fallback, keyed by stripped cell text (avoids repeat calls).
_llm_fallback_cache = {}


def extract_component_id_with_llm_fallback(type_cell, llm_client=None):
    """
    Extract component info with the regex rules, falling back to an LLM.

    The LLM is consulted only when extract_component_id() returns None, the
    cell contains both a CJK character and a run of five or more digits, and
    an llm_client is supplied. Results, including failures, are cached per
    stripped cell text in the module-level cache.

    Args:
        type_cell: raw value of the type cell.
        llm_client: optional client; the code calls
            call_for_json(system_prompt, text, max_tokens=..., temperature=...)
            on it and unpacks a 2-tuple whose first item is the parsed JSON.

    Returns:
        dict: {"type_name": str, "cId": str, "has_image": bool}, or None.
    """
    result = extract_component_id(type_cell)
    if result is not None:
        return result

    if not type_cell or not isinstance(type_cell, str):
        return None

    # Cheap pre-filter: only cells with both CJK text and an ID-like number
    # are worth an LLM call.
    text = type_cell.strip()
    has_chinese = bool(re.search(r'[\u4e00-\u9fff]', text))
    has_digits = bool(re.search(r'\d{5,}', text))
    if not has_chinese or not has_digits:
        return None

    if not llm_client:
        return None

    if text in _llm_fallback_cache:
        return _llm_fallback_cache[text]

    logger.info(f"正则无法解析,尝试 LLM 兜底: {repr(text[:80])}")

    try:
        # NOTE(review): the line breaks inside this prompt were lost in the
        # source as received and are restored at the list items -- confirm.
        system_prompt = """你是组件类型解析器。用户给你一个表格单元格的文本内容,你需要提取:
1. type_name: 组件类型名(如 "对话朗读"、"合作阅读"、"听力选择" 等)
2. cId: 组件ID(纯数字字符串,通常5-7位)
3. has_image: 是否配图(文本中含"+图片"或"-配图"则为true)

注意:
- 忽略"核心互动"等前缀
- type_name 只保留最终的类型名
- 如果无法确定,返回 null

返回 JSON 格式:{"type_name": "...", "cId": "...", "has_image": false}
如果不是组件行,返回 null"""

        parsed, _ = llm_client.call_for_json(
            system_prompt, text, max_tokens=200, temperature=0
        )

        if (parsed and isinstance(parsed, dict)
                and parsed.get("type_name") and parsed.get("cId")):
            result = {
                "type_name": str(parsed["type_name"]).strip(),
                "cId": str(parsed["cId"]).strip(),
                "has_image": bool(parsed.get("has_image", False)),
            }
            _llm_fallback_cache[text] = result
            logger.info(f"LLM 兜底成功: {result}")
            return result

        _llm_fallback_cache[text] = None
        return None

    except Exception as e:
        # Best effort: a failing LLM call must not break sheet parsing. The
        # failure is cached so the same cell is not retried.
        logger.warning(f"LLM 兜底失败: {e}")
        _llm_fallback_cache[text] = None
        return None


def extract_section_character_map(markdown):
    """
    Extract the "角色-section对应" table from the document markdown.

    The section runs from a heading "# 角色-section对应" to the next line
    starting with "#" (or the end of the text). Within it, each table row
    with at least two cells is read as (section range, character), e.g.

        S15-S16 | 738-eva
        S1-S14  | 663-EVA
                | 653-peter

    A row whose section cell yields no range reuses the sections of the
    previous row; rows before any range get an empty list.

    Returns:
        list[dict]: [
            {"sections": ["S15", "S16"], "characters": {"Eva": 738}},
            ...
        ]
        An empty list when the markdown is empty or has no such section.
    """
    if not markdown:
        return []

    match = re.search(
        r'#\s*角色-section对应\s*\n(.*?)(?=\n#|$)', markdown, re.DOTALL
    )
    if not match:
        logger.info("文档中未找到 '角色-section对应' 章节")
        return []

    entries = []
    current_sections = []  # carried forward while section cells stay empty

    for row_html in _ROW_RE.findall(match.group(1)):
        cells = _CELL_RE.findall(row_html)
        if len(cells) < 2:
            continue

        section_cell = cells[0].strip()
        char_cell = cells[1].strip()

        if section_cell:
            sections = []
            for sm in _SECTION_RANGE_RE.finditer(section_cell):
                start = int(sm.group(1))
                end = int(sm.group(2)) if sm.group(2) else start
                sections.extend(f"S{s}" for s in range(start, end + 1))
            if sections:
                current_sections = sections

        char_match = _CHARACTER_RE.search(char_cell)
        if char_match:
            char_id = int(char_match.group(1))
            char_name = char_match.group(2).capitalize()
            entries.append({
                "sections": list(current_sections),
                "characters": {char_name: char_id},
            })

    logger.info(f"提取角色-section对应: {len(entries)} 条记录")
    return entries


def resolve_resource_mapping(section_char_map, config_info, fallback_char_map=None):
    """
    Resolve the resourceMapping for a component from its "配置信息" value.

    Args:
        section_char_map: list returned by extract_section_character_map().
        config_info: str such as "S1主线"; the first "S<number>" in it
            selects the section.
        fallback_char_map: dict returned (or {} when falsy) if nothing can
            be resolved.

    Returns:
        dict: character name -> ID, merged from every entry that lists the
        section or has an empty section list, e.g.
        {"Eva": 663, "Peter": 653, "Vicky": 658}.
    """
    if not config_info or not section_char_map:
        return fallback_char_map or {}

    sm = re.search(r'S(\d+)', config_info, re.IGNORECASE)
    if not sm:
        return fallback_char_map or {}

    section_key = f"S{sm.group(1)}"

    result = {}
    for entry in section_char_map:
        # An entry without sections applies to every section.
        if not entry["sections"] or section_key in entry["sections"]:
            result.update(entry["characters"])

    if not result:
        return fallback_char_map or {}

    logger.debug(f"Section {section_key} → resourceMapping: {result}")
    return result


def extract_character_map(sheet_rows):
    """
    Extract the character table (name -> character ID) from sheet data.

    Every string cell in the first 30 rows is scanned for "<digits>-<name>"
    or "<digits>_<name>", e.g. "653-peter", "663-EVA", "658-Vicky". Names of
    at most two letters in all upper case (such as "TA") are ignored, names
    are normalised with str.capitalize(), and the first ID seen for a name
    wins.

    Returns:
        dict: {"Eva": 663, "Peter": 653, "Vicky": 658, ...}
    """
    char_map = {}

    for row in sheet_rows[:30]:
        for cell in row:
            if not cell or not isinstance(cell, str):
                continue
            for match in _CHARACTER_RE.finditer(cell):
                char_name = match.group(2)
                if len(char_name) <= 2 and char_name.isupper():
                    continue
                char_map.setdefault(
                    char_name.capitalize(), int(match.group(1))
                )

    logger.info(f"提取角色映射: {char_map}")
    return char_map


def parse_sheet_rows(sheet_rows, llm_client=None):
    """
    Identify and extract every component row from the sheet's 2-D array.

    Columns are located by the header row (row 0): "类型", "配置信息", "ID",
    any header containing both "组件" and "配置", and any header containing
    "知识点". Missing columns fall back to fixed indexes (6, 7 and 1) when
    the header is wide enough; the type column defaults to 0.

    Args:
        sheet_rows: list[list] of cell values.
        llm_client: optional LLM client used when the regex rules fail on a
            type cell.

    Returns:
        list[dict]: [
            {
                "row_index": int,        # position in sheet_rows (header is 0)
                "type_name": str,        # e.g. "对话朗读"
                "cId": str,              # e.g. "1214101"
                "has_image": bool,       # image variant
                "teaching_config": str,  # text of the component-config column
                "knowledge_text": str,   # text of the knowledge column
                "config_info": str,      # e.g. "S1主线"
            },
            ...
        ]
        Rows without a resolvable component or without any teaching config
        are skipped.
    """
    if not sheet_rows or len(sheet_rows) < 2:
        return []

    header = sheet_rows[0]
    col_map = {}
    component_cols = []  # there may be several "组件配置" columns

    for i, cell in enumerate(header):
        if not cell:
            continue
        cell_str = str(cell).strip()
        if cell_str == "类型":
            col_map["type"] = i
        elif cell_str == "配置信息":
            col_map["config_info"] = i
        elif cell_str == "ID":
            col_map["id"] = i
        elif "组件" in cell_str and "配置" in cell_str:
            component_cols.append(i)
        elif "知识点" in cell_str:
            col_map["knowledge"] = i

    type_col = col_map.get("type", 0)
    kp_col = col_map.get("knowledge")
    config_info_col = col_map.get("config_info")
    id_col = col_map.get("id")

    # The first component-config column is preferred; the rest are fallbacks.
    comp_col = component_cols[0] if component_cols else None
    alt_comp_cols = component_cols[1:]

    # Positional defaults in case the header wording changes.
    if comp_col is None:
        logger.warning("sheet表头中未找到'组件配置'列,回退到默认列索引6")
        comp_col = 6 if len(header) > 6 else None
    if kp_col is None:
        logger.warning("sheet表头中未找到'知识点'列,回退到默认列索引7")
        kp_col = 7 if len(header) > 7 else None
    if config_info_col is None:
        logger.warning("sheet表头中未找到'配置信息'列,回退到默认列索引1")
        config_info_col = 1 if len(header) > 1 else None

    logger.info(f"列映射: 类型={type_col}, 配置信息={config_info_col}, 组件配置={comp_col}, 知识点={kp_col}, ID={id_col}, alt_comp={alt_comp_cols}")

    components = []
    for row_idx, row in enumerate(sheet_rows[1:], start=1):
        if not row:
            # Empty or missing row: nothing to read.
            continue

        type_cell = row[type_col] if len(row) > type_col else None
        comp_info = extract_component_id_with_llm_fallback(type_cell, llm_client)

        # No ID embedded in the type cell: use the separate ID column.
        if comp_info is None and id_col is not None:
            raw_id = ""
            if len(row) > id_col and row[id_col]:
                raw_id = str(row[id_col]).strip()
            if raw_id and re.match(r'^\d{5,}$', raw_id):
                type_text = str(type_cell).strip() if type_cell else ""
                # Only the first line is checked against the marker list.
                base_type = type_text.split('\n')[0].strip()
                if base_type and base_type not in _SHEET_NON_COMPONENT_MARKERS:
                    comp_info = {
                        "type_name": type_text.replace("+图片", "").replace("-配图", "").strip(),
                        "cId": raw_id,
                        "has_image": "+图片" in type_text or "-配图" in type_text,
                    }

        if comp_info is None:
            continue

        # Teaching config: first non-empty cell among the config columns.
        teaching_config = ""
        for col in [comp_col] + alt_comp_cols:
            if col is not None and len(row) > col and row[col]:
                teaching_config = str(row[col]).strip()
                break

        # Otherwise fall back to the config-info cell, but only when it holds
        # an ID-like number (this excludes plain TL markers).
        if not teaching_config and config_info_col is not None:
            if len(row) > config_info_col and row[config_info_col]:
                cfg = str(row[config_info_col]).strip()
                if re.search(r'\d{5,}', cfg):
                    teaching_config = cfg

        # A valid component row must have a teaching config.
        if not teaching_config:
            continue

        knowledge_text = ""
        if kp_col is not None and len(row) > kp_col and row[kp_col]:
            knowledge_text = str(row[kp_col]).strip()

        config_info = ""
        if (config_info_col is not None and len(row) > config_info_col
                and row[config_info_col]):
            config_info = str(row[config_info_col]).strip()

        components.append({
            "row_index": row_idx,
            "type_name": comp_info["type_name"],
            "cId": comp_info["cId"],
            "has_image": comp_info["has_image"],
            "teaching_config": teaching_config,
            "knowledge_text": knowledge_text,
            "config_info": config_info,
        })

    logger.info(f"从 sheet 中识别到 {len(components)} 个组件行")
    return components


def parse_script_from_sheet(sheet_rows, markdown="", llm_client=None):
    """
    Parse the complete script information from sheet data.

    Args:
        sheet_rows: list[list] of sheet cell values.
        markdown: optional document markdown; when given it supplies the
            metadata and the "角色-section对应" mapping.
        llm_client: optional LLM client passed on to parse_sheet_rows().

    Returns:
        dict: {
            "metadata": {...},
            "character_map": {"Eva": 663, ...},
            "section_char_map": [...],
            "components": [...],
        }
    """
    metadata = {}
    section_char_map = []
    if markdown:
        metadata = extract_script_metadata(markdown)
        section_char_map = extract_section_character_map(markdown)

    return {
        "metadata": metadata,
        "character_map": extract_character_map(sheet_rows),
        "section_char_map": section_char_map,
        "components": parse_sheet_rows(sheet_rows, llm_client=llm_client),
    }


# ============ CLI ============

if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        # Script documents are Chinese text: do not rely on the locale
        # default encoding.
        with open(sys.argv[1], 'r', encoding="utf-8") as f:
            markdown = f.read()
        result = parse_script_document(markdown)
        print(json.dumps(result, ensure_ascii=False, indent=2))
    else:
        # Hard-coded markdown fragment for a quick self-test.
        # NOTE(review): the table markup of this fixture was missing from the
        # source as received and is rebuilt here -- confirm.
        test_md = """
<lark-table>
<lark-tr><lark-td></lark-td><lark-td>类型</lark-td><lark-td>知识点</lark-td></lark-tr>
<lark-tr><lark-td>1</lark-td><lark-td>对话朗读</lark-td><lark-td>school 1</lark-td></lark-tr>
<lark-tr><lark-td>2</lark-td><lark-td>对话挖空</lark-td><lark-td>school 2</lark-td></lark-tr>
<lark-tr><lark-td>3</lark-td><lark-td>听力选择</lark-td><lark-td>school 3
study 2</lark-td></lark-tr>
</lark-table>
"""
        result = parse_script_document(test_md)
        print(json.dumps(result, ensure_ascii=False, indent=2))