448 lines
15 KiB
Python
448 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
知识点覆盖率分析 + 回填脚本

用法: python3 knowledge_coverage_analyzer.py <飞书文档ID或URL>

功能:
1. 解析剧本主表,区分 TL 行(输入)与组件行(输出)
2. 读取「知识点现状」表格中的单词和句型
3. 统计每个知识点在输入/输出中的出现次数
4. 回填到「知识点现状」表格
5. 输出用法质量检查报告

依赖: lark-cli(用于读取文档 markdown),Python3(用于 API 回填)
|
||
"""
|
||
|
||
import json
import re
import subprocess
import sys
import time
import urllib.parse
import urllib.request
from pathlib import Path
|
||
|
||
# ── 配置 ──────────────────────────────────────────────
|
||
# Credentials file; get_token() reads the first entry of its "apps" list.
CONFIG_PATH = Path("/root/.openclaw/credentials/xiaoyan/config.json")
# Environment passed as `env=` to the lark-cli subprocess in fetch_markdown().
# It replaces the inherited environment, so PATH has to be spelled out here.
LARK_CLI_ENV = {
    "LARKSUITE_CLI_CONFIG_DIR": "/root/.openclaw/credentials/xiaoyan",
    "PATH": "/root/.nvm/versions/node/v24.14.0/bin:" + "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin",
}
|
||
|
||
|
||
# ── 工具函数 ──────────────────────────────────────────
|
||
|
||
def parse_doc_id(url_or_id):
    """Return the document ID found in a docx URL, or the input itself.

    When the argument contains ``docx/<alphanumerics>``, that alphanumeric run
    is returned. Otherwise the argument is treated as a bare ID and returned
    with surrounding whitespace stripped.
    """
    found = re.search(r"docx/([A-Za-z0-9]+)", url_or_id)
    return found.group(1) if found else url_or_id.strip()
|
||
|
||
|
||
def fetch_markdown(doc_id):
    """Fetch the document as markdown by shelling out to lark-cli.

    Args:
        doc_id: Document ID passed to ``lark-cli docs +fetch --doc``.

    Returns:
        The string stored under ``data.markdown`` in the CLI's JSON output.

    Raises:
        RuntimeError: lark-cli exited with a non-zero status, or its stdout
            was not the expected JSON payload.
        subprocess.TimeoutExpired: the CLI did not finish within 30 seconds.
    """
    result = subprocess.run(
        ["/root/.nvm/versions/node/v24.14.0/bin/lark-cli", "docs", "+fetch", "--doc", doc_id, "--as", "bot"],
        capture_output=True, text=True, env=LARK_CLI_ENV, timeout=30,
    )
    # Surface the CLI's own error text instead of failing later on empty stdout.
    if result.returncode != 0:
        raise RuntimeError(
            f"lark-cli exited with code {result.returncode}: {result.stderr.strip()}"
        )
    try:
        return json.loads(result.stdout)["data"]["markdown"]
    except (json.JSONDecodeError, KeyError, TypeError) as err:
        raise RuntimeError(
            f"unexpected lark-cli output: {result.stdout[:200]!r}"
        ) from err
|
||
|
||
|
||
def get_token():
    """Obtain a tenant_access_token for the first app in the credentials file.

    Reads ``appId`` / ``appSecret`` from ``cfg["apps"][0]`` in CONFIG_PATH and
    posts them to the tenant_access_token/internal endpoint.

    Returns:
        The ``tenant_access_token`` string from the response.

    Raises:
        RuntimeError: the response carried no ``tenant_access_token``.
        OSError: the credentials file could not be read, or the request failed
            (urllib errors and timeouts are OSError subclasses).
    """
    with open(CONFIG_PATH, encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    body = json.dumps({"app_id": app["appId"], "app_secret": app["appSecret"]}).encode()
    req = urllib.request.Request(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    # Bound the wait and close the connection deterministically.
    with urllib.request.urlopen(req, timeout=30) as http_resp:
        resp = json.loads(http_resp.read())
    if "tenant_access_token" not in resp:
        # Report the API's own code/msg rather than a bare KeyError; the
        # request body (which holds the secret) is deliberately not echoed.
        raise RuntimeError(
            f"tenant_access_token request failed: code={resp.get('code')} msg={resp.get('msg')}"
        )
    return resp["tenant_access_token"]
|
||
|
||
|
||
def api_get(token, path):
    """Send an authenticated GET to the Feishu Open API and decode the JSON body.

    Args:
        token: Bearer token placed in the Authorization header.
        path: Path plus query string appended to
            ``https://open.feishu.cn/open-apis``.

    Returns:
        The decoded JSON response.

    Raises:
        urllib.error.URLError: connection failure or (as HTTPError) an HTTP
            error status.
        TimeoutError: the server did not answer within 30 seconds.
    """
    req = urllib.request.Request(
        f"https://open.feishu.cn/open-apis{path}",
        headers={"Authorization": f"Bearer {token}"},
    )
    # Same 30 s bound as the lark-cli call; the context manager closes the
    # connection even if decoding fails.
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())
|
||
|
||
|
||
def api_patch(token, path, body):
    """Send an authenticated JSON PATCH to the Feishu Open API.

    Args:
        token: Bearer token placed in the Authorization header.
        path: Path plus query string appended to
            ``https://open.feishu.cn/open-apis``.
        body: JSON-serialisable request payload.

    Returns:
        The decoded JSON response.

    Raises:
        urllib.error.URLError: connection failure or (as HTTPError) an HTTP
            error status.
        TimeoutError: the server did not answer within 30 seconds.
    """
    data = json.dumps(body).encode()
    req = urllib.request.Request(
        f"https://open.feishu.cn/open-apis{path}",
        data=data,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="PATCH",
    )
    # Bound the wait and close the connection deterministically.
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())
|
||
|
||
|
||
def api_get_all_children(token, doc_id, block_id):
    """Collect every child of a block, following pagination.

    Args:
        token: Bearer token forwarded to api_get().
        doc_id: Document that owns the block.
        block_id: Block whose children are listed.

    Returns:
        A list with the ``items`` of every page, in the order returned.
    """
    all_items = []
    page_token = None
    while True:
        path = f"/docx/v1/documents/{doc_id}/blocks/{block_id}/children?page_size=50"
        if page_token:
            # Tokens are opaque; quote them so characters such as "+", "/" or
            # "=" cannot corrupt the query string.
            path += "&page_token=" + urllib.parse.quote(page_token, safe="")
        data = api_get(token, path).get("data") or {}
        all_items.extend(data.get("items") or [])
        page_token = data.get("page_token")
        # Stop when the API says so, and also when it claims more pages but
        # supplies no token (otherwise the same page would be re-requested).
        if not data.get("has_more") or not page_token:
            break
    return all_items
|
||
|
||
|
||
# ── Markdown 解析 ─────────────────────────────────────
|
||
|
||
def parse_markdown_table(md, heading_text):
    """Parse the first lark-table that follows ``heading_text`` in ``md``.

    Returns a list of rows, each a list of cell strings with tags, ``**`` and
    ``{...}`` groups removed and whitespace trimmed. Returns None when the
    heading text or a following ``<lark-table>`` cannot be found.
    """
    start = md.find(heading_text)
    if start < 0:
        return None

    found = re.search(r"<lark-table.*?</lark-table>", md[start:], re.DOTALL)
    if found is None:
        return None

    grid = []
    for row_markup in re.findall(r"<lark-tr>(.*?)</lark-tr>", found.group(0), re.DOTALL):
        raw_cells = re.findall(r"<lark-td[^>]*>(.*?)</lark-td>", row_markup, re.DOTALL)
        # Reduce each cell to plain text: drop tags, bold markers, {attr} groups.
        grid.append(
            [re.sub(r"<[^>]+>|\*\*|\{[^}]*\}", "", raw).strip() for raw in raw_cells]
        )
    return grid
|
||
|
||
|
||
def find_table_by_first_cell_md(md, first_cell_text):
    """Return the parsed lark-table whose top-left cell equals ``first_cell_text``.

    Tables are examined in document order; the first whose cleaned first cell
    matches exactly is parsed with parse_markdown_table_from_string(). Returns
    None when no table matches.
    """
    for table in re.finditer(r"<lark-table.*?</lark-table>", md, re.DOTALL):
        table_md = table.group(0)
        first_row = re.search(r"<lark-tr>(.*?)</lark-tr>", table_md, re.DOTALL)
        if first_row is None:
            continue
        first_cell = re.search(
            r"<lark-td[^>]*>(.*?)</lark-td>", first_row.group(1), re.DOTALL
        )
        if first_cell is None:
            continue
        # Same clean-up as the table parsers: tags, bold markers, {attr} groups.
        plain = re.sub(r"<[^>]+>|\*\*|\{[^}]*\}", "", first_cell.group(1)).strip()
        if plain == first_cell_text:
            return parse_markdown_table_from_string(table_md)
    return None
|
||
|
||
|
||
def parse_markdown_table_from_string(table_md):
    """Convert one ``<lark-table>`` markup string into a list of rows.

    Each row is a list of cell strings with tags, ``**`` and ``{...}`` groups
    removed and surrounding whitespace trimmed.
    """
    row_re = re.compile(r"<lark-tr>(.*?)</lark-tr>", re.DOTALL)
    cell_re = re.compile(r"<lark-td[^>]*>(.*?)</lark-td>", re.DOTALL)
    noise_re = re.compile(r"<[^>]+>|\*\*|\{[^}]*\}")

    grid = []
    for row_markup in row_re.findall(table_md):
        cleaned = []
        for raw in cell_re.findall(row_markup):
            cleaned.append(noise_re.sub("", raw).strip())
        grid.append(cleaned)
    return grid
|
||
|
||
|
||
# ── 统计逻辑 ──────────────────────────────────────────
|
||
|
||
def count_occurrences(word, texts):
    """Count case-insensitive whole-word occurrences of ``word`` in ``texts``.

    Args:
        word: The word to look for; an empty value yields 0.
        texts: Iterable of strings to search.

    Returns:
        Total number of matches across all texts.
    """
    if not word:
        return 0
    # Lookarounds instead of \b: identical for ordinary words, but they also
    # work when the word starts or ends with a non-word character (e.g. "hi!"),
    # where \b could never match at that edge.
    pattern = re.compile(r"(?<!\w)" + re.escape(word) + r"(?!\w)", re.IGNORECASE)
    return sum(len(pattern.findall(t)) for t in texts)
|
||
|
||
|
||
def count_pattern_occurrences(pattern_text, texts):
    """Count case-insensitive substring occurrences of a sentence pattern.

    The pattern is trimmed and any trailing run of ``.``, ``!`` and ``?`` is
    removed before matching, so "How are you?" also matches "how are you".

    Args:
        pattern_text: The sentence pattern; an empty value yields 0.
        texts: Iterable of strings to search.

    Returns:
        Total number of non-overlapping occurrences across all texts.
    """
    if not pattern_text:
        return 0
    # One rstrip over the whole character set: chained single-character
    # rstrips are order-dependent and leave "Really!?" as "Really!".
    key_lower = pattern_text.strip().rstrip(".!?").lower()
    if not key_lower:
        # A punctuation-only pattern would otherwise become "", and
        # str.count("") reports len(text) + 1 for every text.
        return 0
    return sum(t.lower().count(key_lower) for t in texts)
|
||
|
||
|
||
# ── 回填逻辑(使用 Block API)─────────────────────────
|
||
|
||
def _block_plain_text(block):
    """Concatenate the text_run contents of whichever field holds ``elements``."""
    parts = []
    for value in block.values():
        # The original code only looked at the "text" field. Heading blocks
        # keep their elements under a per-type field (heading1..heading9 in
        # the Feishu block schema), so inspect any field shaped like that.
        if isinstance(value, dict) and isinstance(value.get("elements"), list):
            for element in value["elements"]:
                parts.append((element.get("text_run") or {}).get("content", ""))
    return "".join(parts)


def find_table_block_id(token, doc_id, heading_text):
    """Return the block_id of the first table that follows a heading.

    Args:
        token: Bearer token forwarded to api_get().
        doc_id: Document to scan.
        heading_text: Text of the block that precedes the table; compared
            after trimming surrounding whitespace.

    Returns:
        The ``block_id`` of the first block with ``block_type == 31`` after
        the matching block, or None when the heading or a later table is
        not found.
    """
    # Read every page of the block list, not just the first 500 blocks.
    items = []
    page_token = None
    while True:
        path = f"/docx/v1/documents/{doc_id}/blocks?page_size=500"
        if page_token:
            path += "&page_token=" + urllib.parse.quote(page_token, safe="")
        data = api_get(token, path).get("data") or {}
        items.extend(data.get("items") or [])
        page_token = data.get("page_token")
        if not data.get("has_more") or not page_token:
            break

    wanted = heading_text.strip()
    heading_seen = False
    for block in items:
        if heading_seen:
            if block.get("block_type") == 31:  # 31 is the table block type used by this script
                return block["block_id"]
        elif _block_plain_text(block).strip() == wanted:
            heading_seen = True
    return None
|
||
|
||
|
||
def get_cell_block_ids(token, doc_id, table_block_id):
    """Return one writable block_id per table cell, as a flat list.

    For every direct child of the table block, the child's own children are
    fetched and the first one's ``block_id`` is kept. Callers index the result
    as ``row * col_count + col``.
    NOTE(review): presumably the table's direct children are its cells in
    row-major order and the first grandchild is the cell's text block —
    confirm against the Feishu block schema.

    Args:
        token: Bearer token forwarded to api_get_all_children().
        doc_id: Document that owns the table.
        table_block_id: Block ID of the table.

    Returns:
        A list with exactly one entry per table child. An entry is None when
        that child's content could not be fetched or was empty, so later
        entries keep their positions.
    """
    cell_ids = []
    for child in api_get_all_children(token, doc_id, table_block_id):
        try:
            grandchildren = api_get_all_children(token, doc_id, child["block_id"])
        except Exception as err:
            # Best effort, but do not drop the slot: skipping it would shift
            # every later index and send counts to the wrong cells.
            print(f" ⚠️ 读取单元格失败 {child.get('block_id')}: {err}")
            grandchildren = []
        cell_ids.append(grandchildren[0]["block_id"] if grandchildren else None)
    return cell_ids
|
||
|
||
|
||
def _write_count(token, doc_id, cell_ids, index, value):
    """Patch ``value`` into ``cell_ids[index]``; return True only on success."""
    if index >= len(cell_ids) or not cell_ids[index]:
        # Out of range or no usable block for this cell: nothing to send.
        return False
    written = patch_cell(token, doc_id, cell_ids[index], str(value))
    time.sleep(0.3)  # pause after every request, as the original loop did
    return bool(written)


def fill_cells(token, doc_id, cell_ids, col_count, knowledge_items, tl_rows, comp_rows):
    """Write the input/output counts of every knowledge item into the table.

    Item ``i`` maps to table row ``i + 1`` (row 0 is the header). Within a
    row, columns 1 and 2 receive the word's input/output counts and columns
    4 and 5 receive the sentence pattern's input/output counts.

    Args:
        token: Bearer token forwarded to patch_cell().
        doc_id: Document that owns the table.
        cell_ids: Flat, row-major list of cell block IDs.
        col_count: Number of columns per table row.
        knowledge_items: Dicts with optional ``word`` and ``pattern`` keys.
        tl_rows: Input-side texts.
        comp_rows: Output-side texts.

    Returns:
        ``(ok, fail)``: the number of cells written and the number that
        could not be written (missing cell or rejected patch).
    """
    ok = 0
    fail = 0

    for item_idx, item in enumerate(knowledge_items):
        word = item.get("word", "")
        pattern = item.get("pattern", "")

        # Offset of this item's row in the flat list; +1 skips the header row.
        row_start = (item_idx + 1) * col_count

        writes = []
        if word:
            # Only the first token is counted, so annotations after the word
            # (e.g. a part of speech) do not prevent a match.
            tokens = word.split()
            word_only = tokens[0] if tokens else ""
            writes.append((row_start + 1, count_occurrences(word_only, tl_rows)))
            writes.append((row_start + 2, count_occurrences(word_only, comp_rows)))
        if pattern:
            writes.append((row_start + 4, count_pattern_occurrences(pattern, tl_rows)))
            writes.append((row_start + 5, count_pattern_occurrences(pattern, comp_rows)))

        for index, value in writes:
            # A rejected patch is a failure too; previously it was counted
            # as neither success nor failure.
            if _write_count(token, doc_id, cell_ids, index, value):
                ok += 1
            else:
                fail += 1

    print(f" 回填完成: {ok} 成功, {fail} 失败")
    return ok, fail
|
||
|
||
|
||
def patch_cell(token, doc_id, block_id, content):
    """Replace the text of one block with ``content``.

    Args:
        token: Bearer token forwarded to api_patch().
        doc_id: Document that owns the block.
        block_id: Block to update.
        content: New text, sent as a single text_run element.

    Returns:
        True when the API answered with ``code == 0``; False when the request
        raised or the API reported another code. Both failure modes print a
        message; nothing is raised.
    """
    body = {
        "update_text_elements": {
            "elements": [{"text_run": {"content": content}}]
        }
    }
    try:
        resp = api_patch(token, f"/docx/v1/documents/{doc_id}/blocks/{block_id}", body)
        code = resp.get("code")
        msg = resp.get("msg")
    except Exception as e:
        print(f" ❌ 回填失败 {block_id}: {e}")
        return False
    if code != 0:
        # An API-level rejection used to return False without any output.
        print(f" ❌ 回填失败 {block_id}: code={code} msg={msg}")
        return False
    return True
|
||
|
||
|
||
# ── 报告 ──────────────────────────────────────────────
|
||
|
||
def print_report(knowledge_items, tl_rows, comp_rows):
    """Print the coverage table followed by a quick exposure check.

    The first section lists, for every item, how often its word (first token
    only) and its sentence pattern occur in the input and output texts. The
    second section flags words seen fewer than 2 times or more than 8 times
    across both sides.
    """
    rule = "=" * 60

    # ── Section 1: coverage counts ──
    print("\n" + rule)
    print("📊 知识点覆盖率统计")
    print(rule)

    print(f"\nTL 行(输入): {len(tl_rows)} 行")
    print(f"组件行(输出): {len(comp_rows)} 行")

    print(f"\n{'知识点':<30} {'输入':>4} {'输出':>4}")
    print("-" * 42)

    for entry in knowledge_items:
        vocab = entry.get("word", "")
        sentence = entry.get("pattern", "")

        if vocab:
            head = vocab.split()[0]
            seen_in = count_occurrences(head, tl_rows)
            seen_out = count_occurrences(head, comp_rows)
            print(f"{vocab:<30} {seen_in:>4} {seen_out:>4}")

        if sentence:
            pat_in = count_pattern_occurrences(sentence, tl_rows)
            pat_out = count_pattern_occurrences(sentence, comp_rows)
            # Long patterns are cut to 28 characters plus ".." to fit the column.
            if len(sentence) > 30:
                label = sentence[:28] + ".."
            else:
                label = sentence
            print(f" {label:<28} {pat_in:>4} {pat_out:>4}")

    # ── Section 2: exposure check ──
    print("\n" + rule)
    print("🔍 用法质量快速检查")
    print(rule)

    for entry in knowledge_items:
        vocab = entry.get("word", "")
        if vocab:
            head = vocab.split()[0]
            total = count_occurrences(head, tl_rows) + count_occurrences(head, comp_rows)

            if total < 2:
                print(f"\n⚠️ {vocab} — 仅出现 {total} 次,建议增加曝光")
            elif total > 8:
                print(f"\n💡 {vocab} — 出现 {total} 次,可适当精简")
|
||
|
||
|
||
# ── 主入口 ────────────────────────────────────────────
|
||
|
||
def _collect_knowledge_items(knowledge_data):
    """Build one item per data row of the knowledge table, header excluded."""
    items = []
    for row in knowledge_data[1:]:
        if len(row) < 6:
            # Keep a placeholder so list index i still means table row i + 1.
            items.append({"word": "", "pattern": ""})
            continue
        # Strip a leading markdown list marker from the word cell.
        word = re.sub(r'^[-*+]\s+', '', row[0]).strip()
        pattern = row[3].strip()
        items.append({"word": word, "pattern": pattern})
    return items


def _split_script_rows(script_data):
    """Split script rows (header excluded) into (tl_rows, comp_rows) texts."""
    component_keywords = ("对话朗读", "对话挖空", "剧情任务", "对话", "挖空", "互动")
    tl_rows = []
    comp_rows = []
    last_was_tl = True
    for row in script_data[1:]:
        if not row:
            continue
        type_text = row[0].strip()
        # Leave out column index 5 (the knowledge-point column) so the listed
        # words do not count themselves.
        row_text = " ".join(row[1:5] + row[6:])
        if not type_text:
            is_tl = last_was_tl  # empty type inherits the previous row's side
        elif type_text.startswith("TL"):
            is_tl = True
        else:
            # Interactive component types are output; any other type (scene
            # descriptions etc.) is treated as input.
            is_tl = not any(kw in type_text for kw in component_keywords)
        (tl_rows if is_tl else comp_rows).append(row_text)
        last_was_tl = is_tl
    return tl_rows, comp_rows


def main():
    """Run the analysis for the document given as the first CLI argument.

    Steps: fetch the markdown, parse the knowledge table and the script table,
    count occurrences, write the counts back through the block API, then
    print the report. Exits with status 1 on a usage error or when a required
    table cannot be found.
    """
    if len(sys.argv) < 2:
        print("用法: python3 knowledge_coverage_analyzer.py <飞书文档ID或URL>")
        sys.exit(1)

    doc_id = parse_doc_id(sys.argv[1])
    print(f"📄 文档 ID: {doc_id}")

    # Step 1: fetch the markdown through lark-cli.
    print("📥 获取文档内容...")
    md = fetch_markdown(doc_id)

    # Step 2: parse the knowledge table (columns: word, in, out, pattern, in, out).
    print("📋 解析「知识点现状」表格...")
    knowledge_data = parse_markdown_table(md, "知识点现状")
    if not knowledge_data:
        print("❌ 未找到「知识点现状」表格")
        sys.exit(1)

    # fill_cells() derives the table row from the list index, so empty or
    # short rows stay in the list as placeholders instead of being dropped;
    # dropping them shifted every later item's counts onto the wrong row.
    knowledge_items = _collect_knowledge_items(knowledge_data)
    listed = [item for item in knowledge_items if item["word"] or item["pattern"]]

    print(f" 知识点: {len(listed)} 条")
    for item in listed:
        w = item["word"] or "(无)"
        p = item["pattern"] or "(无)"
        print(f" - {w} | {p}")

    # Step 3: parse the script table (top-left cell is "类型").
    print("\n📋 解析剧本主表...")
    script_data = find_table_by_first_cell_md(md, "类型")
    if not script_data:
        print("❌ 未找到剧本主表(第一列第一行为「类型」)")
        sys.exit(1)

    tl_rows, comp_rows = _split_script_rows(script_data)
    print(f" TL 行: {len(tl_rows)}, 组件行: {len(comp_rows)}")

    # Step 4: get a token and write the counts back.
    print("\n🔑 获取 API Token...")
    token = get_token()

    print("📝 查找表格 block ID...")
    knowledge_table_id = find_table_block_id(token, doc_id, "知识点现状")
    if not knowledge_table_id:
        print("❌ 未找到「知识点现状」表格 block")
        sys.exit(1)

    cell_ids = get_cell_block_ids(token, doc_id, knowledge_table_id)
    col_count = 6  # the knowledge table has a fixed 6 columns
    print(f" 表格 block: {knowledge_table_id}, 共 {len(cell_ids)} 个单元格")

    print("\n📝 回填统计结果...")
    fill_cells(token, doc_id, cell_ids, col_count, knowledge_items, tl_rows, comp_rows)

    # Step 5: print the report.
    print_report(knowledge_items, tl_rows, comp_rows)

    print("\n✅ 完成!")
|
||
|
||
|
||
# Run the analysis only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|