#!/usr/bin/env python3
"""Automated audit of sentence knowledge points in the Level 1 config table.

Usage:
    python3 scripts/audit_l1_config.py [app_token] [table_id] [record_id]

All positional arguments are optional. ``app_token`` and ``table_id`` fall
back to the defaults hard-coded in ``main``. When ``record_id`` is omitted the
whole table is scanned; otherwise only the record whose ``ID`` column equals
``record_id`` is audited.
"""
import json
import os
import re
import sys

try:
    import requests
except ImportError:
    # Keep the pure validation helpers importable without the HTTP dependency;
    # the network entry points raise a clear error instead (see _require_requests).
    requests = None

CRED = "/root/.openclaw/credentials/xiaoyan/config.json"

# Seconds to wait for any single HTTP call before giving up.
_HTTP_TIMEOUT = 30

# NOTE(review): not referenced by any check visible in this file; kept verbatim.
THIRD_SINGULAR_RULES = [
    (r"\bHe\b.*\bneed\b", "need→needs"),
    (r"\bShe\b.*\bneed\b", "need→needs"),
    (r"\bIt\b.*\bneed\b", "need→needs"),
    (r"\bOtis\b.*\bneed\b", "need→needs"),
    (r"\bTom\b.*\bneed\b", "need→needs"),
    (r"\bMum\b.*\bneed\b", "need→needs"),
    (r"\bDad\b.*\bneed\b", "need→needs"),
    (r"\bBen\b.*\bneed\b", "need→needs"),
    (r"\bhave\b", "三单检查"),
]

# JSON column name -> keys that must be present and non-empty inside it.
REQUIRED_JSON_FIELDS = {
    "basicInfo": ["type", "id", "meaning", "desc", "structure", "valaLevel"],
    "classificationInfo": ["type", "id", "cambridgeLevel", "cefrLevel", "ncLevel"],
    "config": ["type", "id", "title"],
    "usageInfo": ["type", "id", "usage"],
}

# Question JSON column name -> display name used in issue messages.
QUESTION_FIELDS = {
    "sentenceMeaningChooseMcq": "场景选择题",
    "sentenceMeaningMatchMcq": "听句作答题",
    "sentenceMeaningMeaning": "句意选择题",
    "sentenceMeaningPic2SentMcq": "看图选择题",
    "sentencePronRead": "句子朗读",
    "sentencePronRepeatSentence": "句子跟读题",
    "sentenceStructureClozeWordMcq": "句子补全题",
    "sentenceStructureSort": "句型结构题",
}

Q_REQUIRED = ["category", "skill", "type", "pointId", "question", "options", "answer"]

# Pronunciation question types do not need options/answer.
PRONUNCIATION_TYPES = {"sentence_pron_read", "sentence_pron_repeat_sentence"}


def _require_requests():
    """Return the ``requests`` module, or raise if it is not installed."""
    if requests is None:
        raise RuntimeError("the 'requests' package is required for Feishu API calls")
    return requests


def get_token():
    """Return a Feishu tenant access token for the first app listed in CRED.

    Raises:
        RuntimeError: if the response carries no ``tenant_access_token``.
    """
    http = _require_requests()
    with open(CRED, encoding="utf-8") as f:
        c = json.load(f)
    app = c["apps"][0]
    r = http.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=_HTTP_TIMEOUT,
    )
    payload = r.json()
    token = payload.get("tenant_access_token")
    if not token:
        raise RuntimeError(
            f"failed to obtain tenant_access_token: "
            f"code={payload.get('code')} msg={payload.get('msg')}"
        )
    return token


def _iter_records(app_token, table_id, token):
    """Yield every raw record item of the table, following pagination.

    Raises:
        RuntimeError: if the API answers with a non-zero ``code``; otherwise an
            auth or permission failure would look like an empty table.
    """
    http = _require_requests()
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records"
    page_token = ""
    while True:
        r = http.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            params={"page_size": 50, "page_token": page_token},
            timeout=_HTTP_TIMEOUT,
        )
        payload = r.json()
        if payload.get("code", 0) != 0:
            raise RuntimeError(
                f"list records failed: code={payload.get('code')} msg={payload.get('msg')}"
            )
        d = payload.get("data") or {}
        yield from d.get("items") or []
        page_token = d.get("page_token", "")
        # Stop as well when has_more is set without a token, to avoid
        # requesting the first page forever.
        if not d.get("has_more") or not page_token:
            break


def check_classification_swap(record):
    """Compare classificationInfo levels against the plain level columns.

    Reports a swap when ``cefrLevel`` holds the value of the 剑桥考试级别
    column, and plain mismatches otherwise. Returns a list of issue strings.
    """
    issues = []
    try:
        ci = json.loads(record.get("classificationInfo", "{}"))
    except (TypeError, ValueError):
        return ["❌ classificationInfo JSON 解析失败"]
    if not isinstance(ci, dict):
        # Same policy as check_json_integrity: nothing to compare in a non-object.
        return issues

    col_cambridge = record.get("剑桥考试级别", "")
    col_cefr = record.get("欧标等级", "")
    json_cambridge = ci.get("cambridgeLevel")
    json_cefr = ci.get("cefrLevel")

    swap_reported = False
    if json_cambridge and col_cambridge and json_cambridge != col_cambridge:
        if json_cefr == col_cambridge:
            swap_reported = True
            issues.append(
                f"❌ classificationInfo 值互换: cambridgeLevel={json_cambridge}(应为{col_cambridge}), "
                f"cefrLevel={json_cefr}(应为{col_cefr})"
            )
        else:
            issues.append(
                f"⚠️ classificationInfo.cambridgeLevel={json_cambridge} ≠ 列字段剑桥考试级别={col_cambridge}"
            )

    if json_cefr and col_cefr and json_cefr != col_cefr:
        # Only stay silent when the swap message above already covers this
        # mismatch; otherwise a cefrLevel error would go unreported.
        if not (swap_reported and json_cambridge == col_cefr):
            issues.append(f"⚠️ classificationInfo.cefrLevel={json_cefr} ≠ 列字段欧标等级={col_cefr}")
    return issues


def check_json_integrity(record, field, required):
    """Check that the JSON column ``field`` contains every key in ``required``.

    Also checks that its ``id`` matches the record's ``ID`` column (both
    stripped). Returns a list of issue strings; a non-object JSON value yields
    no issues.
    """
    try:
        data = json.loads(record.get(field, "{}"))
    except (TypeError, ValueError):
        return [f"❌ {field} JSON 解析失败"]
    if not isinstance(data, dict):
        return []

    issues = []
    for key in required:
        value = data.get(key)
        if value is None or value == "":
            issues.append(f"❌ {field} 缺少必填字段 {key}")

    # id consistency (whitespace stripped on both sides)
    rid = str(record.get("ID") or "").strip()
    if data.get("id") and rid and str(data["id"]).strip() != rid:
        issues.append(f"❌ {field}.id={data['id']} ≠ record ID={rid}")
    return issues


def check_question_item(q, qtype, rid):
    """Check one question object for required fields, pointId and answer range.

    Args:
        q: the decoded question (expected to be a dict).
        qtype: label used as a prefix in issue messages.
        rid: record ID that ``pointId`` must equal.
    Returns a list of issue strings.
    """
    if not isinstance(q, dict):
        return [f"❌ {qtype} 不是 JSON 对象"]

    issues = []
    # Pronunciation question types are exempt from options/answer.
    skip = {"options", "answer"} if q.get("type", "") in PRONUNCIATION_TYPES else set()
    for key in Q_REQUIRED:
        if key in skip:
            continue
        value = q.get(key)
        if value is None or (isinstance(value, (list, str)) and len(value) == 0):
            issues.append(f"⚠️ {qtype} 缺少字段 {key}")

    # pointId consistency (whitespace stripped)
    rid_stripped = rid.strip() if rid else ""
    if q.get("pointId") and rid_stripped and str(q["pointId"]).strip() != rid_stripped:
        issues.append(f"⚠️ {qtype}.pointId={q['pointId']} ≠ record ID={rid_stripped}")

    # Answer index validity. Null or non-list values are already reported (or
    # exempted) above, so here they just mean there is nothing to range-check.
    options = q.get("options")
    answers = q.get("answer")
    if not isinstance(options, list):
        options = []
    if not isinstance(answers, list):
        answers = []
    for a in answers:
        if isinstance(a, int) and (a < 0 or a >= len(options)):
            issues.append(f"❌ {qtype} answer 索引 {a} 超出 options 范围 0~{len(options)-1}")
    return issues


def check_sort_wordbank_answer(question, rid):
    """Check a sentence-ordering question against its word bank.

    Verifies that the answer indices assemble a sentence from the word bank,
    that the sentence matches the one quoted in ``explanation`` (if any), and
    applies a heuristic third-person-singular check for "need"/"needs".
    ``rid`` is accepted for signature parity and is not used.
    Returns a list of issue strings.
    """
    if not isinstance(question, dict):
        # check_question_item reports non-object questions; nothing to add here.
        return []

    issues = []
    words = question.get("options")
    indices = question.get("answer")
    explanation = question.get("explanation")
    if not isinstance(words, list):
        words = []
    if not isinstance(indices, list):
        indices = []
    if not isinstance(explanation, str):
        explanation = ""

    # Assemble the sentence. Negative indices are rejected explicitly: Python
    # would otherwise wrap them around and hide a bad answer.
    if not all(isinstance(i, int) and 0 <= i < len(words) for i in indices):
        return ["❌ sentenceStructureSort answer 索引超出单词库范围"]
    try:
        assembled = " ".join(words[i] for i in indices)
    except (IndexError, TypeError):
        return ["❌ sentenceStructureSort answer 索引超出单词库范围"]

    # Extract the answer declared in the explanation text.
    m = re.search(r'正确答案是 "(.+?)"', explanation)
    if m:
        declared = m.group(1)

        def norm(s):
            # Normalize: drop trailing punctuation, upper-case the first letter.
            s = s.rstrip(".!?")
            if s and s[0].islower():
                s = s[0].upper() + s[1:]
            return s

        if norm(assembled) != norm(declared):
            issues.append(f"❌ sentenceStructureSort 单词拼出 \"{assembled}\" ≠ 解释声明 \"{declared}\"")

    # Third-person-singular check.
    # NOTE(review): besides He/She/It, only capitalized words in the FIRST bank
    # entry are treated as possible subjects — confirm this is intended.
    first_entry = words[0] if words and isinstance(words[0], str) else ""
    third_singular_subjects = ["He", "She", "It"] + re.findall(r'\b([A-Z][a-z]+)\b', first_entry)
    for subj in third_singular_subjects:
        # Whole-word match so that "He" does not match "Here", nor "It" "Italy".
        starts_with_subject = re.match(re.escape(subj) + r"\b", assembled) is not None
        if starts_with_subject and subj not in ["I", "You", "We", "They"]:
            if "need " in assembled and "needs" not in words:
                issues.append(f"❌ sentenceStructureSort {subj}三单主语,单词库有need无needs")
            break
    return issues


def check_question_set(record, rid):
    """Run the per-question checks over every question JSON column.

    Returns a list of issue strings, including one for each column that is
    empty, unparsable, or not a JSON array.
    """
    issues = []
    for field, cname in QUESTION_FIELDS.items():
        raw = record.get(field, "")
        if not raw:
            issues.append(f"⚠️ {field}({cname}) 为空")
            continue
        try:
            questions = json.loads(raw)
        except (TypeError, ValueError):
            issues.append(f"❌ {field}({cname}) JSON 解析失败")
            continue
        if not isinstance(questions, list):
            issues.append(f"❌ {field}({cname}) 应为数组")
            continue
        for i, q in enumerate(questions):
            issues.extend(check_question_item(q, f"{cname}[{i}]", rid))
            # Extra checks specific to the sentence-ordering question type.
            if field == "sentenceStructureSort":
                issues.extend(check_sort_wordbank_answer(q, rid))
    return issues


def check_consistency_with_chinese_fields(record):
    """Check that each Chinese description column and its JSON column agree.

    An issue is reported when exactly one of the two is populated. A JSON
    column that fails to parse is skipped here. Returns a list of issue strings.
    """
    issues = []
    mapping = [
        ("句意选择题", "sentenceMeaningMeaning"),
        ("句型结构题", "sentenceStructureSort"),
        ("句子朗读", "sentencePronRead"),
        ("看图选择题", "sentenceMeaningPic2SentMcq"),
        ("听句作答题", "sentenceMeaningMatchMcq"),
        ("场景选择题", "sentenceMeaningChooseMcq"),
        ("句子补全题", "sentenceStructureClozeWordMcq"),
        ("句子跟读题", "sentencePronRepeatSentence"),
    ]
    for col, json_field in mapping:
        cn = record.get(col, "")
        raw = record.get(json_field)
        if not raw:
            # A missing, null or empty-string column counts as "no data";
            # json.loads("") would raise and skip the check entirely.
            jd = []
        else:
            try:
                jd = json.loads(raw)
            except (TypeError, ValueError):
                continue
        if cn and not jd:
            issues.append(f"⚠️ {col} 有中文内容但 {json_field} 为空")
        if not cn and jd:
            issues.append(f"⚠️ {json_field} 有数据但 {col} 为空")
    return issues


def audit_record(record):
    """Run every automated check on one record.

    Returns:
        A ``(record_id, issues)`` tuple, where ``record_id`` is the stripped
        ``ID`` column (falling back to ``record_id``, then ``"unknown"``).
    """
    rid = record.get("ID") or record.get("record_id") or "unknown"
    all_issues = []

    # 0. ID column check (trailing whitespace)
    if isinstance(rid, str) and rid != rid.rstrip():
        all_issues.append(f"⚠️ ID 字段含末尾空格: [{rid}] len={len(rid)}")
    rid_clean = rid.strip() if isinstance(rid, str) else str(rid)

    # 1. Required keys of the basic JSON columns
    for field, required in REQUIRED_JSON_FIELDS.items():
        all_issues.extend(check_json_integrity(record, field, required))

    # 2. classificationInfo swap detection
    all_issues.extend(check_classification_swap(record))

    # 3. Question JSON checks
    all_issues.extend(check_question_set(record, rid_clean))

    # 4. Chinese column vs JSON column consistency
    all_issues.extend(check_consistency_with_chinese_fields(record))

    return rid_clean, all_issues


def _matches_target(value, target_id):
    """Return True if an ID cell equals ``target_id``, ignoring outer whitespace."""
    if isinstance(value, str):
        return value.strip() == target_id.strip()
    return value == target_id


def main():
    """Command-line entry point: audit a single record or the whole table."""
    app_token = sys.argv[1] if len(sys.argv) > 1 else "Nq3Zb258aae7SRs2QfXcqsQYnxJ"
    table_id = sys.argv[2] if len(sys.argv) > 2 else "tblTxGpf6GQ5c7DZ"
    target_id = sys.argv[3] if len(sys.argv) > 3 else None

    print(f"🔍 Level 1 配置表审校 | {app_token}/{table_id}")
    if target_id:
        print(f" 目标: {target_id}")
    print()

    token = get_token()

    if target_id:
        # Look up the one requested record.
        record = None
        for item in _iter_records(app_token, table_id, token):
            if _matches_target(item["fields"].get("ID"), target_id):
                record = item["fields"]
                record["record_id"] = item["record_id"]
                break
        if record is None:
            print(f"❌ 未找到 {target_id}")
            return
        rid, issues = audit_record(record)
        print(f"=== {rid} === 共 {len(issues)} 个问题")
        for i in issues:
            print(f" {i}")
        if not issues:
            print(" ✅ 自动化审校通过")
    else:
        # Full-table scan: print one summary line per record that has issues.
        total_issues = 0
        total_records = 0
        for item in _iter_records(app_token, table_id, token):
            rec = item["fields"]
            rec["record_id"] = item["record_id"]
            rid, issues = audit_record(rec)
            if issues:
                total_issues += len(issues)
                total_records += 1
                print(f" {rid}: {len(issues)} issues | {issues[0][:80]}...")
        print(f"\n全表扫描完成: {total_records} 条记录有 {total_issues} 个问题")


if __name__ == "__main__":
    main()