#!/usr/bin/env python3 """ 知识点覆盖率分析 + 回填脚本 用法: python3 knowledge_coverage_analyzer.py <飞书文档ID或URL> 功能: 1. 解析剧本主表,区分 TL 行(输入)与组件行(输出) 2. 读取「知识点现状」表格中的单词和句型 3. 统计每个知识点在输入/输出中的出现次数 4. 回填到「知识点现状」表格 5. 输出用法质量检查报告 依赖: lark-cli(用于读取文档 markdown),Python3(用于 API 回填) """ import json import re import subprocess import sys import time import urllib.request from pathlib import Path # ── 配置 ────────────────────────────────────────────── CONFIG_PATH = Path("/root/.openclaw/credentials/xiaoyan/config.json") LARK_CLI_ENV = { "LARKSUITE_CLI_CONFIG_DIR": "/root/.openclaw/credentials/xiaoyan", "PATH": "/root/.nvm/versions/node/v24.14.0/bin:" + "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", } # ── 工具函数 ────────────────────────────────────────── def parse_doc_id(url_or_id): """从 URL 或直接 ID 中提取文档 ID""" m = re.search(r"docx/([A-Za-z0-9]+)", url_or_id) if m: return m.group(1) return url_or_id.strip() def fetch_markdown(doc_id): """用 lark-cli 获取文档 markdown""" result = subprocess.run( ["/root/.nvm/versions/node/v24.14.0/bin/lark-cli", "docs", "+fetch", "--doc", doc_id, "--as", "bot"], capture_output=True, text=True, env=LARK_CLI_ENV, timeout=30, ) data = json.loads(result.stdout) return data["data"]["markdown"] def get_token(): """获取 tenant_access_token""" with open(CONFIG_PATH) as f: cfg = json.load(f) app = cfg["apps"][0] body = json.dumps({"app_id": app["appId"], "app_secret": app["appSecret"]}).encode() req = urllib.request.Request( "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", data=body, headers={"Content-Type": "application/json"}, ) resp = json.loads(urllib.request.urlopen(req).read()) return resp["tenant_access_token"] def api_get(token, path): """GET 请求飞书 API""" req = urllib.request.Request( f"https://open.feishu.cn/open-apis{path}", headers={"Authorization": f"Bearer {token}"}, ) return json.loads(urllib.request.urlopen(req).read()) def api_patch(token, path, body): """PATCH 请求飞书 API""" data = json.dumps(body).encode() req = urllib.request.Request( f"https://open.feishu.cn/open-apis{path}", data=data, headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"}, method="PATCH", ) return json.loads(urllib.request.urlopen(req).read()) def api_get_all_children(token, doc_id, block_id): """分页获取 block 的所有子块""" all_items = [] page_token = None while True: path = f"/docx/v1/documents/{doc_id}/blocks/{block_id}/children?page_size=50" if page_token: path += f"&page_token={page_token}" resp = api_get(token, path) items = resp.get("data", {}).get("items", []) all_items.extend(items) if not resp.get("data", {}).get("has_more"): break page_token = resp["data"]["page_token"] return all_items # ── Markdown 解析 ───────────────────────────────────── def parse_markdown_table(md, heading_text): """找到标题下方的 lark-table,返回二维数组 [row][col]""" idx = md.find(heading_text) if idx == -1: return None table_match = re.search(r"", md[idx:], re.DOTALL) if not table_match: return None table_md = table_match.group(0) rows = re.findall(r"(.*?)", table_md, re.DOTALL) result = [] for row in rows: cells = re.findall(r"]*>(.*?)", row, re.DOTALL) clean_cells = [] for c in cells: # 去掉所有 HTML 标签,提取纯文本 text = re.sub(r"<[^>]+>|\*\*|\{[^}]*\}", "", c).strip() clean_cells.append(text) result.append(clean_cells) return result def find_table_by_first_cell_md(md, first_cell_text): """在 markdown 中找到第一列第一行为指定文本的表格""" tables = re.findall(r"", md, re.DOTALL) for table_md in tables: rows = re.findall(r"(.*?)", table_md, re.DOTALL) if not rows: continue cells = re.findall(r"]*>(.*?)", rows[0], re.DOTALL) if cells: text = re.sub(r"<[^>]+>|\*\*|\{[^}]*\}", "", cells[0]).strip() if text == first_cell_text: return parse_markdown_table_from_string(table_md) return None def parse_markdown_table_from_string(table_md): """从 lark-table markdown 字符串解析为二维数组""" rows = re.findall(r"(.*?)", table_md, re.DOTALL) result = [] for row in rows: cells = re.findall(r"]*>(.*?)", row, re.DOTALL) clean_cells = [re.sub(r"<[^>]+>|\*\*|\{[^}]*\}", "", c).strip() for c in cells] result.append(clean_cells) return result # ── 统计逻辑 ────────────────────────────────────────── def count_occurrences(word, texts): """统计单词在文本列表中的出现次数(不区分大小写,整词匹配)""" if not word: return 0 count = 0 pattern = re.compile(r"\b" + re.escape(word) + r"\b", re.IGNORECASE) for t in texts: count += len(pattern.findall(t)) return count def count_pattern_occurrences(pattern_text, texts): """统计句型在文本列表中的出现次数(子串匹配,不区分大小写)""" if not pattern_text: return 0 key = pattern_text.strip().rstrip(".").rstrip("!").rstrip("?") key_lower = key.lower() count = 0 for t in texts: count += t.lower().count(key_lower) return count # ── 回填逻辑(使用 Block API)───────────────────────── def find_table_block_id(token, doc_id, heading_text): """找到标题下方的第一个表格 block_id""" blocks = api_get(token, f"/docx/v1/documents/{doc_id}/blocks?page_size=500") items = blocks.get("data", {}).get("items", []) heading_index = -1 for i, b in enumerate(items): text = "" for e in b.get("text", {}).get("elements", []): text += e.get("text_run", {}).get("content", "") if text == heading_text: heading_index = i break if heading_index == -1: return None for b in items[heading_index + 1:]: if b.get("block_type") == 31: return b["block_id"] return None def get_cell_block_ids(token, doc_id, table_block_id): """ 获取表格中所有单元格的 block_id,返回二维数组 [row][col] 飞书表格结构:table > row > cell,每个 row 只有一个 cell 需要按列数分组。 """ rows = api_get_all_children(token, doc_id, table_block_id) all_cells = [] for row in rows: try: cell_items = api_get_all_children(token, doc_id, row["block_id"]) except Exception: continue if cell_items: all_cells.append(cell_items[0]["block_id"]) return all_cells def fill_cells(token, doc_id, cell_ids, col_count, knowledge_items, tl_rows, comp_rows): """回填统计结果到表格""" ok = 0 fail = 0 for item_idx, item in enumerate(knowledge_items): word = item.get("word", "") pattern = item.get("pattern", "") # 计算该行在 cell_ids 中的起始位置 # 跳过表头行 row_start = (item_idx + 1) * col_count if word: word_only = word.split()[0] if word else "" w_in = count_occurrences(word_only, tl_rows) w_out = count_occurrences(word_only, comp_rows) # col 1 = 输入(单词) if row_start + 1 < len(cell_ids): ok += patch_cell(token, doc_id, cell_ids[row_start + 1], str(w_in)) time.sleep(0.3) else: fail += 1 # col 2 = 输出(单词) if row_start + 2 < len(cell_ids): ok += patch_cell(token, doc_id, cell_ids[row_start + 2], str(w_out)) time.sleep(0.3) else: fail += 1 if pattern: p_in = count_pattern_occurrences(pattern, tl_rows) p_out = count_pattern_occurrences(pattern, comp_rows) # col 4 = 输入(句型) if row_start + 4 < len(cell_ids): ok += patch_cell(token, doc_id, cell_ids[row_start + 4], str(p_in)) time.sleep(0.3) else: fail += 1 # col 5 = 输出(句型) if row_start + 5 < len(cell_ids): ok += patch_cell(token, doc_id, cell_ids[row_start + 5], str(p_out)) time.sleep(0.3) else: fail += 1 print(f" 回填完成: {ok} 成功, {fail} 失败") return ok, fail def patch_cell(token, doc_id, block_id, content): """更新单个文本块的内容""" body = { "update_text_elements": { "elements": [{"text_run": {"content": content}}] } } try: resp = api_patch(token, f"/docx/v1/documents/{doc_id}/blocks/{block_id}", body) return resp.get("code") == 0 except Exception as e: print(f" ❌ 回填失败 {block_id}: {e}") return False # ── 报告 ────────────────────────────────────────────── def print_report(knowledge_items, tl_rows, comp_rows): """打印统计报告""" print("\n" + "=" * 60) print("📊 知识点覆盖率统计") print("=" * 60) print(f"\nTL 行(输入): {len(tl_rows)} 行") print(f"组件行(输出): {len(comp_rows)} 行") print(f"\n{'知识点':<30} {'输入':>4} {'输出':>4}") print("-" * 42) for item in knowledge_items: word = item.get("word", "") pattern = item.get("pattern", "") if word: word_only = word.split()[0] w_in = count_occurrences(word_only, tl_rows) w_out = count_occurrences(word_only, comp_rows) print(f"{word:<30} {w_in:>4} {w_out:>4}") if pattern: p_in = count_pattern_occurrences(pattern, tl_rows) p_out = count_pattern_occurrences(pattern, comp_rows) label = pattern[:28] + ".." if len(pattern) > 30 else pattern print(f" {label:<28} {p_in:>4} {p_out:>4}") # 质量检查 print("\n" + "=" * 60) print("🔍 用法质量快速检查") print("=" * 60) for item in knowledge_items: word = item.get("word", "") if not word: continue word_only = word.split()[0] w_out = count_occurrences(word_only, comp_rows) w_total = count_occurrences(word_only, tl_rows) + w_out if w_total < 2: print(f"\n⚠️ {word} — 仅出现 {w_total} 次,建议增加曝光") elif w_total > 8: print(f"\n💡 {word} — 出现 {w_total} 次,可适当精简") # ── 主入口 ──────────────────────────────────────────── def main(): if len(sys.argv) < 2: print("用法: python3 knowledge_coverage_analyzer.py <飞书文档ID或URL>") sys.exit(1) doc_id = parse_doc_id(sys.argv[1]) print(f"📄 文档 ID: {doc_id}") # Step 1: 用 lark-cli 获取 markdown print("📥 获取文档内容...") md = fetch_markdown(doc_id) # Step 2: 解析知识点现状表格 print("📋 解析「知识点现状」表格...") knowledge_data = parse_markdown_table(md, "知识点现状") if not knowledge_data: print("❌ 未找到「知识点现状」表格") sys.exit(1) # 表头: 单词, 输入, 输出, 句型, 输入, 输出 knowledge_items = [] for row in knowledge_data[1:]: # 跳过表头 if len(row) < 6: continue word = row[0] # 清理 markdown 格式:去掉列表符号 word = re.sub(r'^[-*+]\s+', '', word).strip() pattern = row[3].strip() if not word and not pattern: continue knowledge_items.append({"word": word, "pattern": pattern}) print(f" 知识点: {len(knowledge_items)} 条") for item in knowledge_items: w = item["word"] or "(无)" p = item["pattern"] or "(无)" print(f" - {w} | {p}") # Step 3: 解析剧本主表 print("\n📋 解析剧本主表...") script_data = find_table_by_first_cell_md(md, "类型") if not script_data: print("❌ 未找到剧本主表(第一列第一行为「类型」)") sys.exit(1) tl_rows = [] comp_rows = [] last_was_tl = True for row in script_data[1:]: # 跳过表头 if not row: continue type_text = row[0].strip() # 排除「知识点」列(第6列,index 5)避免单词自计数 row_text = " ".join(row[1:5] + row[6:] if len(row) > 6 else row[1:5]) # 分类:类型列明确标记为 TL → 输入; # 类型列包含交互关键词 → 输出; # 空类型 → 继承上一行 if type_text.startswith("TL"): tl_rows.append(row_text) last_was_tl = True elif not type_text: if last_was_tl: tl_rows.append(row_text) else: comp_rows.append(row_text) elif any(kw in type_text for kw in ("对话朗读", "对话挖空", "剧情任务", "对话", "挖空", "互动")): comp_rows.append(row_text) last_was_tl = False else: # 其他(场景描述等)归为 TL tl_rows.append(row_text) last_was_tl = True print(f" TL 行: {len(tl_rows)}, 组件行: {len(comp_rows)}") # Step 4: 获取 token + 回填 print("\n🔑 获取 API Token...") token = get_token() print("📝 查找表格 block ID...") knowledge_table_id = find_table_block_id(token, doc_id, "知识点现状") if not knowledge_table_id: print("❌ 未找到「知识点现状」表格 block") sys.exit(1) cell_ids = get_cell_block_ids(token, doc_id, knowledge_table_id) col_count = 6 # 知识点现状表格固定6列 print(f" 表格 block: {knowledge_table_id}, 共 {len(cell_ids)} 个单元格") print("\n📝 回填统计结果...") fill_cells(token, doc_id, cell_ids, col_count, knowledge_items, tl_rows, comp_rows) # Step 5: 打印报告 print_report(knowledge_items, tl_rows, comp_rows) print("\n✅ 完成!") if __name__ == "__main__": main()