#!/usr/bin/env python3
"""
Level 1 config table, sentence knowledge-point review: automated audit script.

Usage: python3 scripts/audit_l1_config.py <bitable_app_token> <table_id> [record_id]
       record_id is optional; when omitted the whole table is checked.
       bitable_app_token and table_id fall back to built-in defaults when omitted.
"""
import json
import os
import re
import sys

import requests

# Path of the JSON credentials file opened by get_token().
CRED = "/root/.openclaw/credentials/xiaoyan/config.json"

# (regex, label) pairs: one "need→needs" rule per listed subject, followed by
# a generic rule for "have".
THIRD_SINGULAR_RULES = [
    (rf"\b{subject}\b.*\bneed\b", "need→needs")
    for subject in ("He", "She", "It", "Otis", "Tom", "Mum", "Dad", "Ben")
] + [(r"\bhave\b", "三单检查")]

# JSON column name -> keys that must be present and non-empty in that object.
# Every section requires "type" and "id" plus its own extra keys.
REQUIRED_JSON_FIELDS = {
    section: ["type", "id", *extra]
    for section, extra in (
        ("basicInfo", ("meaning", "desc", "structure", "valaLevel")),
        ("classificationInfo", ("cambridgeLevel", "cefrLevel", "ncLevel")),
        ("config", ("title",)),
        ("usageInfo", ("usage",)),
    )
}

# Question JSON column name -> Chinese label used in audit messages.
QUESTION_FIELDS = dict(
    (
        ("sentenceMeaningChooseMcq", "场景选择题"),
        ("sentenceMeaningMatchMcq", "听句作答题"),
        ("sentenceMeaningMeaning", "句意选择题"),
        ("sentenceMeaningPic2SentMcq", "看图选择题"),
        ("sentencePronRead", "句子朗读"),
        ("sentencePronRepeatSentence", "句子跟读题"),
        ("sentenceStructureClozeWordMcq", "句子补全题"),
        ("sentenceStructureSort", "句型结构题"),
    )
)

# Keys every question object is expected to carry.
Q_REQUIRED = [
    "category",
    "skill",
    "type",
    "pointId",
    "question",
    "options",
    "answer",
]

# Pronunciation question types do not need options/answer.
PRONUNCIATION_TYPES = {
    "sentence_pron_read",
    "sentence_pron_repeat_sentence",
}


def get_token():
    """Fetch a Feishu tenant access token for the first app listed in ``CRED``.

    Reads the JSON credentials file at ``CRED``, takes ``appId`` and
    ``appSecret`` from ``apps[0]`` and posts them to the
    ``tenant_access_token/internal`` endpoint.

    Returns:
        The ``tenant_access_token`` value of the JSON response.

    Raises:
        OSError: the credentials file cannot be opened.
        KeyError: the credentials or the response lack an expected key; for the
            response the message carries the ``code``/``msg`` it contained.
        requests.RequestException: network failure or timeout.
    """
    with open(CRED, encoding="utf-8") as f:
        creds = json.load(f)
    app = creds["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        # Without a timeout requests waits indefinitely on a stalled connection.
        timeout=30,
    )
    payload = resp.json()
    try:
        return payload["tenant_access_token"]
    except KeyError:
        # Same exception type as before, but say why the token is missing.
        raise KeyError(
            "tenant_access_token missing from response "
            f"(code={payload.get('code')}, msg={payload.get('msg')})"
        ) from None


def check_classification_swap(record):
    """Compare classificationInfo levels against the matching table columns.

    ``classificationInfo.cambridgeLevel`` is compared with the ``剑桥考试级别``
    column and ``classificationInfo.cefrLevel`` with the ``欧标等级`` column.
    A comparison is only made when both sides are non-empty.

    Args:
        record: mapping of column name to value for one table row.

    Returns:
        A list of issue strings: a "swapped" error when cefrLevel holds the
        Cambridge column value, otherwise one warning per mismatching level.
        A single-element list is returned when the JSON cannot be parsed, and
        an empty list when the JSON is not an object.
    """
    try:
        info = json.loads(record.get("classificationInfo", "{}"))
    except (TypeError, ValueError):
        # TypeError: value is not str/bytes (e.g. None); ValueError: bad JSON.
        return ["❌ classificationInfo JSON 解析失败"]
    if not isinstance(info, dict):
        # Arrays/scalars have no level keys to compare.
        return []

    issues = []
    col_cambridge = record.get("剑桥考试级别", "")
    col_cefr = record.get("欧标等级", "")
    json_cambridge = info.get("cambridgeLevel")
    json_cefr = info.get("cefrLevel")

    swap_reported = False
    if json_cambridge and col_cambridge and json_cambridge != col_cambridge:
        if json_cefr == col_cambridge:
            issues.append(f"❌ classificationInfo 值互换: cambridgeLevel={json_cambridge}(应为{col_cambridge}), cefrLevel={json_cefr}(应为{col_cefr})")
            swap_reported = True
        else:
            issues.append(f"⚠️ classificationInfo.cambridgeLevel={json_cambridge} ≠ 列字段剑桥考试级别={col_cambridge}")

    if json_cefr and col_cefr and json_cefr != col_cefr:
        # Skip the cefr warning only when the swap error above already covers
        # it; otherwise the mismatch would go unreported.
        already_covered = swap_reported and json_cambridge == col_cefr
        if not already_covered:
            issues.append(f"⚠️ classificationInfo.cefrLevel={json_cefr} ≠ 列字段欧标等级={col_cefr}")
    return issues


def check_json_integrity(record, field, required):
    """Check that a JSON column holds every required key and the right id.

    Args:
        record: mapping of column name to value for one table row.
        field: name of the column whose value is a JSON object string.
        required: keys that must be present with a value other than
            ``None`` or ``""``.

    Returns:
        A list of issue strings: one per missing/empty required key, plus one
        when the object's ``id`` differs from the record's ``ID`` column
        (both compared as stripped strings). A single-element list is returned
        when the JSON cannot be parsed, and an empty list when the JSON is not
        an object.
    """
    try:
        data = json.loads(record.get(field, "{}"))
    except (TypeError, ValueError):
        # TypeError: value is not str/bytes (e.g. None); ValueError: bad JSON.
        return [f"❌ {field} JSON 解析失败"]
    if not isinstance(data, dict):
        return []

    issues = [
        f"❌ {field} 缺少必填字段 {key}"
        for key in required
        if data.get(key) is None or data[key] == ""
    ]

    # id consistency: compare as stripped strings so a non-string ID column
    # value (e.g. a number) does not crash on .strip().
    raw_id = record.get("ID", "")
    rid = str(raw_id).strip() if raw_id else ""
    if data.get("id") and rid and str(data["id"]).strip() != rid:
        issues.append(f"❌ {field}.id={data['id']} ≠ record ID={rid}")
    return issues


def check_question_item(q, qtype, rid):
    """Check one question object for required keys, pointId and answer range.

    Args:
        q: the decoded question; expected to be a dict.
        qtype: label used as the prefix of every issue message.
        rid: record ID the question's ``pointId`` must equal (compared as
            stripped strings); an empty/None value disables that comparison.

    Returns:
        A list of issue strings, empty when nothing is wrong.
    """
    if not isinstance(q, dict):
        # A non-object entry has no keys to inspect; report it instead of
        # failing on q.get().
        return [f"❌ {qtype} 应为对象"]

    issues = []

    # Pronunciation question types are exempt from options/answer.
    qtype_name = q.get("type", "")
    is_pronunciation = isinstance(qtype_name, str) and qtype_name in PRONUNCIATION_TYPES
    exempt = {"options", "answer"} if is_pronunciation else set()

    for key in Q_REQUIRED:
        if key in exempt:
            continue
        value = q.get(key)
        if value is None or (isinstance(value, (list, str)) and not value):
            issues.append(f"⚠️ {qtype} 缺少字段 {key}")

    # pointId consistency (surrounding spaces ignored on both sides).
    expected = str(rid).strip() if rid else ""
    point_id = q.get("pointId")
    if point_id and expected and str(point_id).strip() != expected:
        issues.append(f"⚠️ {qtype}.pointId={point_id} ≠ record ID={expected}")

    # Answer index validity. A missing/null options value counts as an empty
    # list, and a scalar answer is treated as a one-element list.
    options = q.get("options")
    if not isinstance(options, list):
        options = []
    answers = q.get("answer")
    if answers is None:
        answers = []
    elif not isinstance(answers, (list, tuple)):
        answers = [answers]
    for a in answers:
        if isinstance(a, int) and (a < 0 or a >= len(options)):
            issues.append(f"❌ {qtype} answer 索引 {a} 超出 options 范围 0~{len(options)-1}")
    return issues


def _normalize_sentence(s):
    """Drop trailing ``.``/``!``/``?`` and upper-case a lower-case first letter."""
    s = s.rstrip(".!?")
    if s and s[0].islower():
        s = s[0].upper() + s[1:]
    return s


def check_sort_wordbank_answer(question, rid):
    """Check a sentence-structure (sort) question.

    Two checks are made on the sentence assembled by joining
    ``options[i]`` for each ``i`` in ``answer`` with spaces:

    1. If ``explanation`` contains ``正确答案是 "<sentence>"``, the assembled
       sentence must equal it, ignoring trailing ``.!?`` and the case of the
       first letter.
    2. If the sentence starts with the whole word He/She/It, or with a
       capitalised word found in the first word-bank entry, and contains
       ``"need "``, the word bank must contain ``"needs"``.

    Args:
        question: the decoded question; expected to be a dict.
        rid: record ID; not used by this check.

    Returns:
        A list of issue strings, empty when nothing is wrong or when
        ``question`` is not a dict.
    """
    if not isinstance(question, dict):
        return []

    words = question.get("options", [])
    indices = question.get("answer", [])
    explanation = question.get("explanation")
    if not isinstance(explanation, str):
        # null / non-text explanation: nothing to compare against.
        explanation = ""

    # Assemble the sentence from the word bank.
    try:
        assembled = " ".join(words[i] for i in indices)
    except (LookupError, TypeError):
        # Bad index, non-iterable answer, or non-string word-bank entries.
        return ["❌ sentenceStructureSort answer 索引超出单词库范围"]

    issues = []

    # Compare with the answer declared in the explanation text.
    m = re.search(r'正确答案是 "(.+?)"', explanation)
    if m:
        declared = m.group(1)
        if _normalize_sentence(assembled) != _normalize_sentence(declared):
            issues.append(f'❌ sentenceStructureSort 单词拼出 "{assembled}" ≠ 解释声明 "{declared}"')

    # Third-person-singular check.
    bank_head = [w for w in words[:1] if isinstance(w, str)] if isinstance(words, list) else []
    candidates = ["He", "She", "It"] + re.findall(r'\b([A-Z][a-z]+)\b', " ".join(bank_head))
    for subj in candidates:
        # Require a word boundary after the subject so that "He" does not
        # match "Help" and "It" does not match "Italy".
        starts_with_subject = re.match(re.escape(subj) + r"\b", assembled) is not None
        if starts_with_subject and subj not in ["I", "You", "We", "They"]:
            if "need " in assembled and "needs" not in words:
                issues.append(f"❌ sentenceStructureSort {subj}三单主语,单词库有need无needs")
            break
    return issues


def check_question_set(record, rid):
    """Check every question JSON column listed in ``QUESTION_FIELDS``.

    Each column must hold a non-empty JSON array. Every element is passed to
    ``check_question_item``; elements of the ``sentenceStructureSort`` column
    are additionally passed to ``check_sort_wordbank_answer``.

    Args:
        record: mapping of column name to value for one table row.
        rid: record ID forwarded to the per-question checks.

    Returns:
        A list of issue strings collected over all columns.
    """
    issues = []
    for field, cname in QUESTION_FIELDS.items():
        raw = record.get(field, "")
        if not raw:
            issues.append(f"⚠️ {field}({cname}) 为空")
            continue
        try:
            questions = json.loads(raw)
        except (TypeError, ValueError):
            # TypeError: value is not str/bytes; ValueError: malformed JSON.
            issues.append(f"❌ {field}({cname}) JSON 解析失败")
            continue
        if not isinstance(questions, list):
            issues.append(f"❌ {field}({cname}) 应为数组")
            continue
        for i, q in enumerate(questions):
            if not isinstance(q, dict):
                # Report a malformed entry rather than aborting the audit.
                issues.append(f"❌ {cname}[{i}] 应为对象")
                continue
            issues.extend(check_question_item(q, f"{cname}[{i}]", rid))
            # Extra checks for sentence-structure questions only.
            if field == "sentenceStructureSort":
                issues.extend(check_sort_wordbank_answer(q, rid))
    return issues


def check_consistency_with_chinese_fields(record):
    """Check that each Chinese description column and its JSON column agree
    on being filled or empty.

    For every (Chinese column, JSON column) pair a warning is produced when
    exactly one side is empty. A JSON column that is absent counts as an empty
    array; one that cannot be parsed is skipped.

    Args:
        record: mapping of column name to value for one table row.

    Returns:
        A list of warning strings, in the order of the pairs below.
    """
    pairs = (
        ("句意选择题", "sentenceMeaningMeaning"),
        ("句型结构题", "sentenceStructureSort"),
        ("句子朗读", "sentencePronRead"),
        ("看图选择题", "sentenceMeaningPic2SentMcq"),
        ("听句作答题", "sentenceMeaningMatchMcq"),
        ("场景选择题", "sentenceMeaningChooseMcq"),
        ("句子补全题", "sentenceStructureClozeWordMcq"),
        ("句子跟读题", "sentencePronRepeatSentence"),
    )
    issues = []
    for col, json_field in pairs:
        chinese_text = record.get(col, "")
        try:
            parsed = json.loads(record.get(json_field, "[]"))
        except (TypeError, ValueError):
            # Unparseable JSON is skipped here; only decoding errors are
            # caught so interrupts and real bugs still propagate.
            continue
        if chinese_text and not parsed:
            issues.append(f"⚠️ {col} 有中文内容但 {json_field} 为空")
        if not chinese_text and parsed:
            issues.append(f"⚠️ {json_field} 有数据但 {col} 为空")
    return issues


def audit_record(record):
    """Run every automated check on one record.

    Args:
        record: mapping of column name to value for one table row; may also
            carry a ``record_id`` key.

    Returns:
        A ``(rid, issues)`` tuple. ``rid`` is the stripped ``ID`` column value
        or, when that column is missing/empty, the ``record_id`` (``"unknown"``
        if that is missing too). ``issues`` is the list of issue strings from
        all checks.
    """
    raw_id = record.get("ID")
    has_business_id = raw_id is not None and raw_id != ""
    rid = raw_id if has_business_id else record.get("record_id", "unknown")
    all_issues = []

    # 0. ID column check (trailing whitespace).
    if isinstance(rid, str) and rid != rid.rstrip():
        all_issues.append(f"⚠️ ID 字段含末尾空格: [{rid}] len={len(rid)}")
    rid_clean = rid.strip() if isinstance(rid, str) else str(rid)

    # 1. Required keys of the basic JSON columns.
    for field, required in REQUIRED_JSON_FIELDS.items():
        all_issues.extend(check_json_integrity(record, field, required))

    # 2. classificationInfo swap detection.
    all_issues.extend(check_classification_swap(record))

    # 3. Question JSON checks. Only a real ID column value is used as the
    # expected pointId: the record_id fallback is a display label, and
    # comparing pointId against it would flag every question.
    expected_point_id = rid_clean if has_business_id else ""
    all_issues.extend(check_question_set(record, expected_point_id))

    # 4. Chinese column / JSON column consistency.
    all_issues.extend(check_consistency_with_chinese_fields(record))

    return rid_clean, all_issues


class _FeishuApiError(RuntimeError):
    """Raised when a Feishu response carries a non-zero ``code``."""


def _iter_records(app_token, table_id, token, page_size=50):
    """Yield each record's ``fields`` dict (with ``record_id`` added), page by page."""
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records"
    headers = {"Authorization": f"Bearer {token}"}
    page_token = ""
    while True:
        resp = requests.get(
            url,
            headers=headers,
            params={"page_size": page_size, "page_token": page_token},
            timeout=30,
        )
        body = resp.json()
        # An error response has no usable "data"; without this check it would
        # look exactly like an empty table.
        if body.get("code", 0) != 0:
            raise _FeishuApiError(f"code={body.get('code')} msg={body.get('msg')}")
        data = body.get("data") or {}
        for item in (data.get("items") or []):
            fields = item["fields"]
            fields["record_id"] = item["record_id"]
            yield fields
        if not data.get("has_more"):
            return
        page_token = data.get("page_token", "")
        if not page_token:
            # has_more without a token would re-request the first page forever.
            return


def main():
    """Command-line entry point: audit one record or the whole table.

    Positional arguments (all optional): bitable app token, table id, and the
    ``ID`` column value of a single record to audit. With a record ID the full
    issue list of that record is printed; without it every record is audited
    and one summary line is printed per record that has issues. The process
    exits with status 1 when fetching records fails.
    """
    app_token = sys.argv[1] if len(sys.argv) > 1 else "Nq3Zb258aae7SRs2QfXcqsQYnxJ"
    table_id = sys.argv[2] if len(sys.argv) > 2 else "tblTxGpf6GQ5c7DZ"
    target_id = sys.argv[3] if len(sys.argv) > 3 else None

    print(f"🔍 Level 1 配置表审校 | {app_token}/{table_id}")
    if target_id:
        print(f" 目标: {target_id}")
    print()

    token = get_token()
    records = _iter_records(app_token, table_id, token)

    try:
        if target_id:
            # Look up one record. Surrounding whitespace is ignored so that a
            # record whose ID has a trailing space can still be found.
            wanted = target_id.strip()
            record = next(
                (rec for rec in records if str(rec.get("ID", "")).strip() == wanted),
                None,
            )
            if not record:
                print(f"❌ 未找到 {target_id}")
                return
            rid, issues = audit_record(record)
            print(f"=== {rid} === 共 {len(issues)} 个问题")
            for i in issues:
                print(f" {i}")
            if not issues:
                print(" ✅ 自动化审校通过")
        else:
            # Full-table scan.
            total_issues = 0
            total_records = 0
            for rec in records:
                rid, issues = audit_record(rec)
                if issues:
                    total_issues += len(issues)
                    total_records += 1
                    print(f" {rid}: {len(issues)} issues | {issues[0][:80]}...")
            print(f"\n全表扫描完成: {total_records} 条记录有 {total_issues} 个问题")
    except (requests.RequestException, _FeishuApiError) as err:
        print(f"❌ 拉取记录失败: {err}")
        sys.exit(1)


if __name__ == "__main__":
    main()