ai_member_xiaoyan/scripts/knowledge_coverage_analyzer.py

448 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
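"""
Knowledge-point coverage analysis and backfill script.
Usage: python3 knowledge_coverage_analyzer.py <Feishu document ID or URL>
What it does:
1. Parses the main script table, separating TL rows (input) from component rows (output)
2. Reads the words and sentence patterns from the 「知识点现状」 table
3. Counts how often each knowledge point appears in the input and in the output
4. Writes the counts back into the 「知识点现状」 table
5. Prints a usage-quality check report
Dependencies: lark-cli (reads the document as markdown), Python 3 (backfill through the API)
"""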
import json
import re
import subprocess
import sys
import time
import urllib.request
from pathlib import Path
# ── Configuration ─────────────────────────────────────
# Credentials file; get_token() reads the first entry of its "apps" list.
CONFIG_PATH = Path("/root/.openclaw/credentials/xiaoyan/config.json")

# Environment handed to the lark-cli subprocess: the CLI config directory and
# a PATH whose first entry is the nvm-managed Node.js bin directory.
LARK_CLI_ENV = {
    "LARKSUITE_CLI_CONFIG_DIR": "/root/.openclaw/credentials/xiaoyan",
    "PATH": ":".join(
        (
            "/root/.nvm/versions/node/v24.14.0/bin",
            "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
        )
    ),
}
# ── 工具函数 ──────────────────────────────────────────
def parse_doc_id(url_or_id):
    """Return the document ID found in a docx URL, or the input itself.

    Args:
        url_or_id: Either a URL containing ``docx/<id>`` or a bare document ID.

    Returns:
        The run of ASCII letters and digits following ``docx/`` when that
        marker is present; otherwise the argument with surrounding whitespace
        removed.
    """
    found = re.search(r"docx/([A-Za-z0-9]+)", url_or_id)
    return found.group(1) if found else url_or_id.strip()
def fetch_markdown(doc_id):
    """Fetch a document's markdown by running ``lark-cli docs +fetch``.

    The CLI is invoked as the bot identity with ``LARK_CLI_ENV`` as its
    environment and a 30-second timeout. Its stdout is parsed as JSON and the
    ``data.markdown`` field is returned.

    Args:
        doc_id: Document ID passed to ``--doc``.

    Returns:
        The markdown string reported by the CLI.

    Raises:
        RuntimeError: If stdout is not JSON or has no ``data.markdown`` field;
            the message carries the exit code and the CLI's stderr/stdout.
        subprocess.TimeoutExpired: If the CLI does not finish within 30 s.
    """
    result = subprocess.run(
        ["/root/.nvm/versions/node/v24.14.0/bin/lark-cli", "docs", "+fetch", "--doc", doc_id, "--as", "bot"],
        capture_output=True, text=True, env=LARK_CLI_ENV, timeout=30,
    )
    try:
        return json.loads(result.stdout)["data"]["markdown"]
    except (json.JSONDecodeError, KeyError, TypeError) as err:
        # Surface what the CLI actually said instead of a bare parse error.
        detail = (result.stderr or result.stdout or "").strip()
        raise RuntimeError(
            f"lark-cli did not return markdown for {doc_id} "
            f"(exit code {result.returncode}): {detail[:500]}"
        ) from err
def get_token():
    """Obtain a ``tenant_access_token`` for the first app in the config file.

    Reads ``CONFIG_PATH`` as JSON, takes ``apps[0]``'s ``appId`` and
    ``appSecret``, and POSTs them to the internal tenant-token endpoint.

    Returns:
        The ``tenant_access_token`` string from the response.

    Raises:
        RuntimeError: If the response carries no token; the message includes
            the response's ``code`` and ``msg`` fields.
        OSError: If the config file cannot be read or the request fails
            (``urllib.error.URLError`` is an ``OSError``).
    """
    with open(CONFIG_PATH, encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    body = json.dumps({"app_id": app["appId"], "app_secret": app["appSecret"]}).encode()
    req = urllib.request.Request(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    # The context manager closes the connection; the timeout mirrors the one
    # used for the lark-cli subprocess so a stalled request cannot hang the run.
    with urllib.request.urlopen(req, timeout=30) as http_resp:
        resp = json.loads(http_resp.read())
    token = resp.get("tenant_access_token")
    if not token:
        raise RuntimeError(
            f"tenant_access_token request failed: code={resp.get('code')} msg={resp.get('msg')}"
        )
    return token
def api_get(token, path):
    """Send an authenticated GET to the Feishu Open API and decode the JSON.

    Args:
        token: Bearer token placed in the ``Authorization`` header.
        path: Path (with any query string) appended to
            ``https://open.feishu.cn/open-apis``.

    Returns:
        The decoded JSON response body.

    Raises:
        urllib.error.URLError: On network failure, timeout, or an HTTP error
            status.
    """
    req = urllib.request.Request(
        f"https://open.feishu.cn/open-apis{path}",
        headers={"Authorization": f"Bearer {token}"},
    )
    # Close the response deterministically and bound the wait to 30 s, the
    # same limit the script applies to the lark-cli subprocess.
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())
def api_patch(token, path, body):
    """Send an authenticated JSON PATCH to the Feishu Open API.

    Args:
        token: Bearer token placed in the ``Authorization`` header.
        path: Path appended to ``https://open.feishu.cn/open-apis``.
        body: JSON-serialisable request payload.

    Returns:
        The decoded JSON response body.

    Raises:
        urllib.error.URLError: On network failure, timeout, or an HTTP error
            status.
    """
    data = json.dumps(body).encode()
    req = urllib.request.Request(
        f"https://open.feishu.cn/open-apis{path}",
        data=data,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="PATCH",
    )
    # Close the response deterministically and bound the wait to 30 s, the
    # same limit the script applies to the lark-cli subprocess.
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())
def api_get_all_children(token, doc_id, block_id):
    """Collect every child of a block, following pagination to the end.

    Requests ``/docx/v1/documents/{doc_id}/blocks/{block_id}/children`` in
    pages of 50 and keeps going while the response reports ``has_more``.

    Args:
        token: Bearer token for the API.
        doc_id: Document that owns the block.
        block_id: Block whose children are listed.

    Returns:
        A list with the ``items`` of all pages, in the order received.

    Raises:
        KeyError: If a page reports ``has_more`` without a ``page_token``.
    """
    # Local import keeps this block self-contained; the file's top-level
    # imports only name urllib.request.
    from urllib.parse import urlencode

    base = f"/docx/v1/documents/{doc_id}/blocks/{block_id}/children"
    all_items = []
    page_token = None
    while True:
        query = {"page_size": 50}
        if page_token:
            # urlencode escapes characters such as "+", "/" or "=" that would
            # otherwise be misread inside a query string.
            query["page_token"] = page_token
        data = api_get(token, f"{base}?{urlencode(query)}").get("data") or {}
        all_items.extend(data.get("items") or [])
        if not data.get("has_more"):
            break
        page_token = data["page_token"]
    return all_items
# ── Markdown 解析 ─────────────────────────────────────
def parse_markdown_table(md, heading_text):
    """Parse the first ``<lark-table>`` that follows ``heading_text``.

    Args:
        md: Document markdown containing ``<lark-table>`` markup.
        heading_text: Text to locate; the search for a table starts at its
            first occurrence anywhere in ``md``.

    Returns:
        A list of rows, each a list of cell strings with tags, ``**`` markers
        and ``{...}`` groups removed and whitespace trimmed; ``None`` when the
        text or a following table is not found.
    """
    start = md.find(heading_text)
    if start < 0:
        return None
    found = re.search(r"<lark-table.*?</lark-table>", md[start:], re.DOTALL)
    if found is None:
        return None

    def plain(cell_md):
        # Drop tags, bold markers and brace groups, keeping only the text.
        return re.sub(r"<[^>]+>|\*\*|\{[^}]*\}", "", cell_md).strip()

    return [
        [plain(cell_md) for cell_md in re.findall(r"<lark-td[^>]*>(.*?)</lark-td>", row_md, re.DOTALL)]
        for row_md in re.findall(r"<lark-tr>(.*?)</lark-tr>", found.group(0), re.DOTALL)
    ]
def find_table_by_first_cell_md(md, first_cell_text):
    """Find the table whose top-left cell equals ``first_cell_text``.

    Tables are examined in document order. The first cell of the first row is
    stripped of tags, ``**`` markers and ``{...}`` groups before comparison.

    Args:
        md: Document markdown containing ``<lark-table>`` markup.
        first_cell_text: Exact text expected in the top-left cell.

    Returns:
        The matching table parsed by ``parse_markdown_table_from_string``, or
        ``None`` when no table matches.
    """
    for table_md in re.findall(r"<lark-table.*?</lark-table>", md, re.DOTALL):
        first_row = re.search(r"<lark-tr>(.*?)</lark-tr>", table_md, re.DOTALL)
        if first_row is None:
            continue
        first_cell = re.search(r"<lark-td[^>]*>(.*?)</lark-td>", first_row.group(1), re.DOTALL)
        if first_cell is None:
            continue
        text = re.sub(r"<[^>]+>|\*\*|\{[^}]*\}", "", first_cell.group(1)).strip()
        if text == first_cell_text:
            return parse_markdown_table_from_string(table_md)
    return None
def parse_markdown_table_from_string(table_md):
    """Convert one ``<lark-table>`` markup string into rows of plain text.

    Args:
        table_md: Markup of a single table.

    Returns:
        A list of rows; each row is a list of cell strings with tags, ``**``
        markers and ``{...}`` groups removed and whitespace trimmed.
    """
    table = []
    for row_md in re.findall(r"<lark-tr>(.*?)</lark-tr>", table_md, re.DOTALL):
        row = []
        for cell_md in re.findall(r"<lark-td[^>]*>(.*?)</lark-td>", row_md, re.DOTALL):
            row.append(re.sub(r"<[^>]+>|\*\*|\{[^}]*\}", "", cell_md).strip())
        table.append(row)
    return table
# ── 统计逻辑 ──────────────────────────────────────────
def count_occurrences(word, texts):
    """Count case-insensitive whole-word occurrences of ``word`` in ``texts``.

    A match must not be directly preceded or followed by a word character.
    Lookarounds are used instead of ``\\b`` so that a word which itself starts
    or ends with punctuation (for example ``hi!``) can still match; for words
    made of word characters the result is the same as with ``\\b``.

    Args:
        word: Word to look for; surrounding whitespace is ignored.
        texts: Iterable of strings to search.

    Returns:
        Total number of matches across all texts; 0 for an empty word.
    """
    word = word.strip() if word else ""
    if not word:
        return 0
    # NOTE(review): under Unicode matching CJK characters are word characters,
    # so an English word written directly next to Chinese text (no space) is
    # not counted — confirm whether that is intended.
    pattern = re.compile(r"(?<!\w)" + re.escape(word) + r"(?!\w)", re.IGNORECASE)
    return sum(len(pattern.findall(t)) for t in texts)
def count_pattern_occurrences(pattern_text, texts):
    """Count case-insensitive substring occurrences of a sentence pattern.

    The pattern is trimmed and any run of trailing ``.``, ``!`` or ``?`` is
    removed before matching, so ``"What is it?"`` also matches
    ``"what is it!"``.

    Args:
        pattern_text: Sentence pattern to look for.
        texts: Iterable of strings to search.

    Returns:
        Total number of non-overlapping occurrences across all texts; 0 when
        nothing but whitespace or punctuation remains after trimming.
    """
    if not pattern_text:
        return 0
    # One rstrip with the full character set removes mixed runs such as "!?"
    # regardless of their order.
    key = pattern_text.strip().rstrip(".!?").lower()
    if not key.strip():
        # str.count("") would report len(text) + 1 matches for every text.
        return 0
    return sum(t.lower().count(key) for t in texts)
# ── Backfill logic (via the Block API) ─────────────────────────
def find_table_block_id(token, doc_id, heading_text):
    """Return the block ID of the first table listed after a given heading.

    All blocks of the document are fetched (following pagination), the first
    block whose concatenated text-run content equals ``heading_text`` is
    located, and the first later block with ``block_type == 31`` is returned.

    Args:
        token: Bearer token for the API.
        doc_id: Document to search.
        heading_text: Exact text of the block that precedes the table.

    Returns:
        The table's ``block_id``, or ``None`` when the heading or a following
        table is not found.
    """
    # Local import keeps this block self-contained; the file's top-level
    # imports only name urllib.request.
    from urllib.parse import urlencode

    items = []
    page_token = None
    while True:
        query = {"page_size": 500}
        if page_token:
            query["page_token"] = page_token
        resp = api_get(token, f"/docx/v1/documents/{doc_id}/blocks?{urlencode(query)}")
        data = resp.get("data") or {}
        items.extend(data.get("items") or [])
        page_token = data.get("page_token")
        if not data.get("has_more") or not page_token:
            break

    # Plain paragraphs keep their runs under "text". Heading blocks are
    # presumed to keep theirs under "heading1".."heading9" per the docx block
    # schema — TODO confirm against the API reference.
    text_keys = ("text",) + tuple(f"heading{level}" for level in range(1, 10))

    heading_index = -1
    for i, block in enumerate(items):
        text = "".join(
            element.get("text_run", {}).get("content", "")
            for key in text_keys
            for element in (block.get(key) or {}).get("elements", [])
        )
        if text == heading_text:
            heading_index = i
            break
    if heading_index == -1:
        return None

    for block in items[heading_index + 1:]:
        if block.get("block_type") == 31:  # 31 is the value this script treats as "table"
            return block["block_id"]
    return None
def get_cell_block_ids(token, doc_id, table_block_id):
    """Return one writable block ID per direct child of a table block.

    For every direct child of ``table_block_id`` (in the order the API lists
    them) the ID of that child's first own child is collected. The result is a
    FLAT list; ``fill_cells`` addresses it as ``row * col_count + col``.

    A child that cannot be read, or that has no children, contributes ``None``
    rather than being dropped, so the positions of all later entries stay
    aligned with the table.

    NOTE(review): the previous docstring described the hierarchy as
    table > row > cell; the table's direct children are presumably the cells
    themselves and the collected IDs their first content block — confirm
    against the docx API reference.

    Args:
        token: Bearer token for the API.
        doc_id: Document that owns the table.
        table_block_id: Block ID of the table.

    Returns:
        A list of block IDs (``str``) or ``None`` placeholders.
    """
    table_children = api_get_all_children(token, doc_id, table_block_id)
    first_child_ids = []
    for child in table_children:
        try:
            grandchildren = api_get_all_children(token, doc_id, child["block_id"])
        except Exception as err:
            # Best effort: report the problem and keep the slot so later cells
            # are not shifted onto the wrong position.
            print(f" ⚠️ 读取单元格失败 {child.get('block_id')}: {err}")
            grandchildren = []
        first_child_ids.append(grandchildren[0]["block_id"] if grandchildren else None)
    return first_child_ids
def fill_cells(token, doc_id, cell_ids, col_count, knowledge_items, tl_rows, comp_rows):
    """Write the input/output counts of every knowledge item into the table.

    Item ``i`` is written to table row ``i + 1`` (row 0 is the header). Within
    a row, column 1 receives the word's input count, column 2 its output
    count, column 4 the pattern's input count and column 5 its output count.
    Only the first whitespace-separated token of ``word`` is counted.

    Args:
        token: Bearer token for the API.
        doc_id: Document that owns the table.
        cell_ids: Flat, row-major list of block IDs; falsy entries are treated
            as unavailable cells.
        col_count: Number of columns per table row.
        knowledge_items: Dicts with optional ``"word"`` and ``"pattern"`` keys.
        tl_rows: Texts counted as input.
        comp_rows: Texts counted as output.

    Returns:
        ``(ok, fail)``: the number of successful writes, and the number of
        writes that failed or had no target cell.
    """
    ok = 0
    fail = 0

    def write(index, value):
        """PATCH one cell and record the outcome."""
        nonlocal ok, fail
        block_id = cell_ids[index] if index < len(cell_ids) else None
        if not block_id:
            fail += 1
            return
        if patch_cell(token, doc_id, block_id, str(value)):
            ok += 1
        else:
            # A rejected PATCH is a failure too, not just a missing cell.
            fail += 1
        time.sleep(0.3)  # pause between consecutive PATCH calls

    for item_idx, item in enumerate(knowledge_items):
        word = item.get("word", "")
        pattern = item.get("pattern", "")
        # First cell of this item's row; "+ 1" skips the header row.
        row_start = (item_idx + 1) * col_count

        tokens = word.split() if word else []
        if tokens:
            word_only = tokens[0]
            write(row_start + 1, count_occurrences(word_only, tl_rows))
            write(row_start + 2, count_occurrences(word_only, comp_rows))
        if pattern:
            write(row_start + 4, count_pattern_occurrences(pattern, tl_rows))
            write(row_start + 5, count_pattern_occurrences(pattern, comp_rows))

    print(f" 回填完成: {ok} 成功, {fail} 失败")
    return ok, fail
def patch_cell(token, doc_id, block_id, content):
    """Replace the text of one block with ``content``.

    Sends an ``update_text_elements`` PATCH containing a single text run.

    Args:
        token: Bearer token for the API.
        doc_id: Document that owns the block.
        block_id: Block to update.
        content: New text for the block.

    Returns:
        ``True`` when the API answers with ``code == 0``; ``False`` when it
        answers with another code or the request raises. Both failure paths
        print a diagnostic line.
    """
    body = {
        "update_text_elements": {
            "elements": [{"text_run": {"content": content}}]
        }
    }
    try:
        resp = api_patch(token, f"/docx/v1/documents/{doc_id}/blocks/{block_id}", body)
        if resp.get("code") == 0:
            return True
        # Report API-level rejections as well, not only transport errors.
        print(f" ❌ 回填失败 {block_id}: code={resp.get('code')} msg={resp.get('msg')}")
        return False
    except Exception as e:
        print(f" ❌ 回填失败 {block_id}: {e}")
        return False
# ── 报告 ──────────────────────────────────────────────
def print_report(knowledge_items, tl_rows, comp_rows):
    """Print the coverage table followed by a quick exposure check.

    The first section lists, per knowledge item, how often its word (first
    whitespace-separated token only) and its pattern occur in the input and
    output texts. The second section flags words whose combined count is
    below 2 or above 8.

    Args:
        knowledge_items: Dicts with optional ``"word"`` and ``"pattern"`` keys.
        tl_rows: Texts counted as input.
        comp_rows: Texts counted as output.
    """
    rule = "=" * 60

    # Section 1: per-item counts.
    print("\n" + rule)
    print("📊 知识点覆盖率统计")
    print(rule)
    print(f"\nTL 行(输入): {len(tl_rows)}")
    print(f"组件行(输出): {len(comp_rows)}")
    print(f"\n{'知识点':<30} {'输入':>4} {'输出':>4}")
    print("-" * 42)
    for entry in knowledge_items:
        word = entry.get("word", "")
        pattern = entry.get("pattern", "")
        if word:
            head = word.split()[0]
            w_in = count_occurrences(head, tl_rows)
            w_out = count_occurrences(head, comp_rows)
            print(f"{word:<30} {w_in:>4} {w_out:>4}")
        if pattern:
            p_in = count_pattern_occurrences(pattern, tl_rows)
            p_out = count_pattern_occurrences(pattern, comp_rows)
            # Patterns longer than 30 characters are cut to 28 plus "..".
            if len(pattern) > 30:
                label = pattern[:28] + ".."
            else:
                label = pattern
            print(f" {label:<28} {p_in:>4} {p_out:>4}")

    # Section 2: exposure check on words only.
    print("\n" + rule)
    print("🔍 用法质量快速检查")
    print(rule)
    for entry in knowledge_items:
        word = entry.get("word", "")
        if word:
            head = word.split()[0]
            w_out = count_occurrences(head, comp_rows)
            w_total = count_occurrences(head, tl_rows) + w_out
            if w_total < 2:
                print(f"\n⚠️ {word} — 仅出现 {w_total} 次,建议增加曝光")
            elif w_total > 8:
                print(f"\n💡 {word} — 出现 {w_total} 次,可适当精简")
# ── 主入口 ────────────────────────────────────────────
def _collect_knowledge_items(knowledge_data):
    """Build one knowledge item per body row of the parsed knowledge table.

    Expected columns: word, input, output, pattern, input, output. Rows with
    fewer than six cells, or with neither a word nor a pattern, become empty
    placeholder items instead of being dropped: ``fill_cells`` derives the
    target table row from an item's position in the list, so dropping a row
    would shift every later write onto the wrong row.
    """
    items = []
    for row in knowledge_data[1:]:  # skip the header row
        if len(row) < 6:
            items.append({"word": "", "pattern": ""})
            continue
        # Remove a leading markdown list marker ("- ", "* ", "+ ") from the word.
        word = re.sub(r'^[-*+]\s+', '', row[0]).strip()
        pattern = row[3].strip()
        items.append({"word": word, "pattern": pattern})
    return items


def _split_script_rows(script_data):
    """Split the body rows of the script table into (input, output) texts.

    A row whose type cell starts with "TL" is input; a row whose type cell
    contains one of the interaction keywords is output; a row with an empty
    type cell inherits the class of the previous classified row (input at the
    start); any other type is treated as input.
    """
    tl_rows = []
    comp_rows = []
    last_was_tl = True
    for row in script_data[1:]:  # skip the header row
        if not row:
            continue
        type_text = row[0].strip()
        # Leave out column index 5 (the knowledge-point column) so listed
        # words do not count themselves.
        row_text = " ".join(row[1:5] + row[6:] if len(row) > 6 else row[1:5])
        if type_text.startswith("TL"):
            tl_rows.append(row_text)
            last_was_tl = True
        elif not type_text:
            if last_was_tl:
                tl_rows.append(row_text)
            else:
                comp_rows.append(row_text)
        elif any(kw in type_text for kw in ("对话朗读", "对话挖空", "剧情任务", "对话", "挖空", "互动")):
            comp_rows.append(row_text)
            last_was_tl = False
        else:
            # Anything else (scene descriptions and the like) counts as input.
            tl_rows.append(row_text)
            last_was_tl = True
    return tl_rows, comp_rows


def main():
    """Run the full pipeline for the document named on the command line.

    Steps: fetch the markdown, parse the 「知识点现状」 table and the script
    table, count occurrences, backfill the counts through the Block API, and
    print the report. Exits with status 1 when the argument is missing or a
    required table cannot be found.
    """
    if len(sys.argv) < 2:
        print("用法: python3 knowledge_coverage_analyzer.py <飞书文档ID或URL>")
        sys.exit(1)
    doc_id = parse_doc_id(sys.argv[1])
    print(f"📄 文档 ID: {doc_id}")

    # Step 1: fetch the markdown through lark-cli.
    print("📥 获取文档内容...")
    md = fetch_markdown(doc_id)

    # Step 2: parse the knowledge-point table.
    print("📋 解析「知识点现状」表格...")
    knowledge_data = parse_markdown_table(md, "知识点现状")
    if not knowledge_data:
        print("❌ 未找到「知识点现状」表格")
        sys.exit(1)
    knowledge_items = _collect_knowledge_items(knowledge_data)
    # Placeholders exist only to keep row positions; do not list or count them.
    listed = [item for item in knowledge_items if item["word"] or item["pattern"]]
    print(f" 知识点: {len(listed)}")
    for item in listed:
        w = item["word"] or "(无)"
        p = item["pattern"] or "(无)"
        print(f" - {w} | {p}")

    # Step 3: parse the main script table.
    print("\n📋 解析剧本主表...")
    script_data = find_table_by_first_cell_md(md, "类型")
    if not script_data:
        print("❌ 未找到剧本主表(第一列第一行为「类型」)")
        sys.exit(1)
    tl_rows, comp_rows = _split_script_rows(script_data)
    print(f" TL 行: {len(tl_rows)}, 组件行: {len(comp_rows)}")

    # Step 4: obtain a token and backfill.
    print("\n🔑 获取 API Token...")
    token = get_token()
    print("📝 查找表格 block ID...")
    knowledge_table_id = find_table_block_id(token, doc_id, "知识点现状")
    if not knowledge_table_id:
        print("❌ 未找到「知识点现状」表格 block")
        sys.exit(1)
    cell_ids = get_cell_block_ids(token, doc_id, knowledge_table_id)
    col_count = 6  # the knowledge-point table is fixed at six columns
    print(f" 表格 block: {knowledge_table_id}, 共 {len(cell_ids)} 个单元格")
    print("\n📝 回填统计结果...")
    fill_cells(token, doc_id, cell_ids, col_count, knowledge_items, tl_rows, comp_rows)

    # Step 5: print the report.
    print_report(knowledge_items, tl_rows, comp_rows)
    print("\n✅ 完成!")
# Run the pipeline only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()