ai_member_xiaoyan/scripts/audit_l1_config.py

279 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
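Level 1 config table, sentence knowledge-point review: automated check script.
Usage: python3 scripts/audit_l1_config.py <bitable_app_token> <table_id> [record_id]
The third argument is optional and is matched against each row's "ID" column;
when it is omitted, the whole table is checked.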
"""
import json, requests, re, sys, os
# Path of the JSON credentials file opened by get_token().
CRED = "/root/.openclaw/credentials/xiaoyan/config.json"

# Subjects that take the third-person-singular verb form ("needs", not "need").
_NEED_SUBJECTS = ("He", "She", "It", "Otis", "Tom", "Mum", "Dad", "Ben")

# (regex, hint) pairs: one "<subject> ... need" pattern per subject above,
# followed by a catch-all pattern for "have".
# NOTE(review): nothing in the visible part of this file reads this table —
# confirm whether it is still needed.
THIRD_SINGULAR_RULES = [
    (rf"\b{subject}\b.*\bneed\b", "need→needs") for subject in _NEED_SUBJECTS
] + [(r"\bhave\b", "三单检查")]
# JSON column name -> keys that must be present and non-empty inside that
# column's JSON object (consumed by check_json_integrity via audit_record).
REQUIRED_JSON_FIELDS = {
    "basicInfo": [
        "type",
        "id",
        "meaning",
        "desc",
        "structure",
        "valaLevel",
    ],
    "classificationInfo": [
        "type",
        "id",
        "cambridgeLevel",
        "cefrLevel",
        "ncLevel",
    ],
    "config": ["type", "id", "title"],
    "usageInfo": ["type", "id", "usage"],
}

# Question JSON column name -> Chinese display name used in issue messages.
QUESTION_FIELDS = {
    "sentenceMeaningChooseMcq": "场景选择题",       # scenario choice
    "sentenceMeaningMatchMcq": "听句作答题",        # listen and answer
    "sentenceMeaningMeaning": "句意选择题",         # sentence-meaning choice
    "sentenceMeaningPic2SentMcq": "看图选择题",     # picture choice
    "sentencePronRead": "句子朗读",                 # read aloud
    "sentencePronRepeatSentence": "句子跟读题",     # listen and repeat
    "sentenceStructureClozeWordMcq": "句子补全题",  # sentence completion
    "sentenceStructureSort": "句型结构题",          # sentence structure (word ordering)
}
# Keys every question object is expected to carry.
Q_REQUIRED = [
    "category",
    "skill",
    "type",
    "pointId",
    "question",
    "options",
    "answer",
]

# Question "type" values for pronunciation items; these do not need
# options/answer.
PRONUNCIATION_TYPES = {
    "sentence_pron_read",
    "sentence_pron_repeat_sentence",
}
def get_token():
    """Request a Feishu tenant access token for the first app in the credentials file.

    Reads ``CRED`` as JSON, takes ``apps[0]["appId"]`` / ``apps[0]["appSecret"]``
    and posts them to the tenant_access_token/internal endpoint.

    Returns:
        The ``tenant_access_token`` string from the response.

    Raises:
        OSError / KeyError / IndexError: the credentials file is missing or
            does not have the expected ``apps[0]`` entries.
        requests.RequestException: network failure, timeout or HTTP error status.
        RuntimeError: the response carried no ``tenant_access_token``.
    """
    with open(CRED, encoding="utf-8") as f:
        creds = json.load(f)
    app = creds["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        # Without a timeout a stalled connection would hang the script forever.
        timeout=30,
    )
    resp.raise_for_status()
    payload = resp.json()
    token = payload.get("tenant_access_token")
    if not token:
        # Surface the API's own error details instead of a bare KeyError.
        raise RuntimeError(
            "Feishu did not return a tenant_access_token: "
            f"code={payload.get('code')} msg={payload.get('msg')}"
        )
    return token
def check_classification_swap(record):
    """Detect cambridgeLevel/cefrLevel values that disagree with, or are swapped
    against, the table's own level columns.

    Args:
        record: mapping of column name to value for one row. Reads the
            "classificationInfo" JSON column plus the "剑桥考试级别"
            (Cambridge level) and "欧标等级" (CEFR level) columns.

    Returns:
        A list of issue messages; empty when nothing is wrong or when there is
        nothing to compare (a missing/empty value on either side skips that
        comparison).
    """
    parse_failed = ["❌ classificationInfo JSON 解析失败"]
    try:
        ci = json.loads(record.get("classificationInfo", "{}"))
    except (TypeError, ValueError):
        # TypeError: the column is not a string (e.g. None);
        # ValueError: malformed JSON (json.JSONDecodeError subclasses it).
        return parse_failed
    if not isinstance(ci, dict):
        # Valid JSON that is not an object cannot be inspected.
        return parse_failed

    issues = []
    col_cambridge = record.get("剑桥考试级别", "")
    col_cefr = record.get("欧标等级", "")
    ci_cambridge = ci.get("cambridgeLevel")
    ci_cefr = ci.get("cefrLevel")

    swap_reported = False
    if ci_cambridge and col_cambridge and ci_cambridge != col_cambridge:
        if ci_cefr == col_cambridge:
            # The Cambridge column's value sits in cefrLevel: report as a swap.
            swap_reported = True
            issues.append(f"❌ classificationInfo 值互换: cambridgeLevel={ci_cambridge}(应为{col_cambridge}), cefrLevel={ci_cefr}(应为{col_cefr})")
        else:
            issues.append(f"⚠️ classificationInfo.cambridgeLevel={ci_cambridge} ≠ 列字段剑桥考试级别={col_cambridge}")

    if ci_cefr and col_cefr and ci_cefr != col_cefr:
        # Only stay silent when the swap message above already covers this
        # mismatch; otherwise a wrong cefrLevel would go unreported whenever
        # cambridgeLevel merely happens to equal the CEFR column.
        if not (swap_reported and ci_cambridge == col_cefr):
            issues.append(f"⚠️ classificationInfo.cefrLevel={ci_cefr} ≠ 列字段欧标等级={col_cefr}")
    return issues
def check_json_integrity(record, field, required):
    """Check that a JSON column holds an object with all required keys filled in.

    Args:
        record: mapping of column name to value for one row.
        field: name of the JSON column to inspect.
        required: iterable of keys that must be present and neither None nor "".

    Returns:
        A list of issue messages: a single parse/shape failure, or one entry per
        missing key plus an optional id-mismatch entry. Empty when all is well.
    """
    try:
        data = json.loads(record.get(field, "{}"))
    except (TypeError, ValueError):
        # TypeError: non-string column value; ValueError: malformed JSON.
        return [f"{field} JSON 解析失败"]
    if not isinstance(data, dict):
        # An array/scalar here would otherwise pass with no checks at all.
        return [f"{field} 应为 JSON 对象"]

    issues = []
    for key in required:
        value = data.get(key)
        if value is None or value == "":
            issues.append(f"{field} 缺少必填字段 {key}")

    # id consistency, ignoring surrounding whitespace on both sides; str()
    # keeps a numeric ID from crashing .strip().
    rid = str(record.get("ID") or "").strip()
    if data.get("id") and rid and str(data["id"]).strip() != rid:
        issues.append(f"{field}.id={data['id']} ≠ record ID={rid}")
    return issues
def check_question_item(q, qtype, rid):
    """Check one question object for required keys, pointId and answer indices.

    Args:
        q: the decoded question (expected to be a dict).
        qtype: label used as the prefix of every message, e.g. "<name>[<index>]".
        rid: the record's ID; an empty/None value skips the pointId comparison.

    Returns:
        A list of issue messages, empty when the question passes.
    """
    if not isinstance(q, dict):
        # A string/number/array element would crash on .get() below.
        return [f"⚠️ {qtype} 应为对象"]

    issues = []
    qtype_name = q.get("type", "")
    # Pronunciation questions are exempt from options/answer. The isinstance
    # guard keeps an unhashable "type" value from raising on the set lookup.
    is_pronunciation = isinstance(qtype_name, str) and qtype_name in PRONUNCIATION_TYPES
    skip = {"options", "answer"} if is_pronunciation else set()
    for key in Q_REQUIRED:
        if key in skip:
            continue
        value = q.get(key)
        if value is None or (isinstance(value, (list, str)) and len(value) == 0):
            issues.append(f"⚠️ {qtype} 缺少字段 {key}")

    # pointId consistency, ignoring surrounding whitespace.
    rid_stripped = rid.strip() if rid else ""
    point_id = q.get("pointId")
    if point_id and rid_stripped and str(point_id).strip() != rid_stripped:
        issues.append(f"⚠️ {qtype}.pointId={point_id} ≠ record ID={rid_stripped}")

    # Answer index validity. "options"/"answer" may be present but null, or the
    # answer may be a bare scalar; normalise so neither case raises.
    options = q.get("options")
    option_count = len(options) if hasattr(options, "__len__") else 0
    answers = q.get("answer")
    if answers is None:
        answers = []
    elif not isinstance(answers, (list, tuple)):
        answers = [answers]
    for a in answers:
        if isinstance(a, int) and (a < 0 or a >= option_count):
            issues.append(f"{qtype} answer 索引 {a} 超出 options 范围 0~{option_count - 1}")
    return issues
def check_sort_wordbank_answer(question, rid):
    """Check a sentence-structure (word ordering) question.

    Verifies that the answer indices assemble, from the word bank, the sentence
    quoted in the explanation, and applies a third-person-singular heuristic
    for "need" vs "needs".

    Args:
        question: decoded question dict; reads "options" (word bank),
            "answer" (list of indices into the word bank) and "explanation".
        rid: unused; kept so the signature matches the other checks.

    Returns:
        A list of issue messages, empty when the question passes. A non-dict
        *question* yields an empty list.
    """
    if not isinstance(question, dict):
        return []

    issues = []
    words = question.get("options") or []
    indices = question.get("answer") or []
    explanation = question.get("explanation")
    if not isinstance(explanation, str):
        # A null/missing explanation simply skips the declared-answer check.
        explanation = ""

    out_of_range = ["❌ sentenceStructureSort answer 索引超出单词库范围"]
    if not isinstance(words, list) or not isinstance(indices, list):
        return out_of_range
    for i in indices:
        # Reject negative indices explicitly: Python would otherwise wrap them
        # around to the end of the word bank and assemble the wrong sentence.
        if not isinstance(i, int) or isinstance(i, bool) or not 0 <= i < len(words):
            return out_of_range

    # Assemble the sentence from the word bank.
    assembled = " ".join(str(words[i]) for i in indices)

    # Extract the declared answer from the explanation text.
    m = re.search(r'正确答案是 "(.+?)"', explanation)
    if m:
        declared = m.group(1)

        # Normalise before comparing: drop trailing punctuation and
        # capitalise the first letter.
        def norm(s):
            s = s.rstrip(".!?")
            if s and s[0].islower():
                s = s[0].upper() + s[1:]
            return s

        if norm(assembled) != norm(declared):
            issues.append(f'❌ sentenceStructureSort 单词拼出 "{assembled}" ≠ 解释声明 "{declared}"')

    # Third-person-singular heuristic: He/She/It plus any capitalised word in
    # the first word-bank entry are treated as candidate subjects.
    first_entry = str(words[0]) if words else ""
    candidates = ["He", "She", "It"] + re.findall(r'\b([A-Z][a-z]+)\b', first_entry)
    for subj in candidates:
        if subj in ("I", "You", "We", "They"):
            continue
        # Match the subject as a whole word so "He" does not fire on
        # "Hello"/"Here", nor "It" on "Italy".
        if re.match(rf"{re.escape(subj)}\b", assembled):
            if "need " in assembled and "needs" not in words:
                issues.append(f"❌ sentenceStructureSort {subj}三单主语单词库有need无needs")
            break
    return issues
def check_question_set(record, rid):
    """Validate every question JSON column listed in QUESTION_FIELDS.

    Args:
        record: mapping of column name to value for one row.
        rid: the record's ID, forwarded to the per-question checks.

    Returns:
        A list of issue messages covering empty columns, unparseable JSON,
        non-array payloads, non-object elements and the per-question checks.
    """
    issues = []
    for field, cname in QUESTION_FIELDS.items():
        raw = record.get(field, "")
        if not raw:
            issues.append(f"⚠️ {field}({cname}) 为空")
            continue
        try:
            questions = json.loads(raw)
        except (TypeError, ValueError):
            # TypeError: non-string column value; ValueError: malformed JSON.
            issues.append(f"{field}({cname}) JSON 解析失败")
            continue
        if not isinstance(questions, list):
            issues.append(f"{field}({cname}) 应为数组")
            continue
        for i, q in enumerate(questions):
            label = f"{cname}[{i}]"
            if not isinstance(q, dict):
                # Report and skip instead of letting the per-question checks
                # crash on .get().
                issues.append(f"⚠️ {label} 应为对象")
                continue
            issues.extend(check_question_item(q, label, rid))
            # Extra checks specific to the sentence-structure question type.
            if field == "sentenceStructureSort":
                issues.extend(check_sort_wordbank_answer(q, rid))
    return issues
def check_consistency_with_chinese_fields(record):
    """Check that each Chinese description column and its JSON column are
    either both filled in or both empty.

    Args:
        record: mapping of column name to value for one row.

    Returns:
        A list of warning messages, one per column pair where exactly one side
        has content. Pairs whose JSON column cannot be parsed are skipped.
    """
    issues = []
    # (Chinese description column, JSON column) pairs.
    mapping = [
        ("句意选择题", "sentenceMeaningMeaning"),
        ("句型结构题", "sentenceStructureSort"),
        ("句子朗读", "sentencePronRead"),
        ("看图选择题", "sentenceMeaningPic2SentMcq"),
        ("听句作答题", "sentenceMeaningMatchMcq"),
        ("场景选择题", "sentenceMeaningChooseMcq"),
        ("句子补全题", "sentenceStructureClozeWordMcq"),
        ("句子跟读题", "sentencePronRepeatSentence"),
    ]
    for col, json_field in mapping:
        cn = record.get(col, "")
        # Treat a present-but-blank JSON column ("" or None) like a missing
        # one; otherwise it would fail to parse and the pair would be skipped
        # even though one side is clearly empty.
        raw = record.get(json_field) or "[]"
        try:
            jd = json.loads(raw)
        except (TypeError, ValueError):
            continue
        if cn and not jd:
            issues.append(f"⚠️ {col} 有中文内容但 {json_field} 为空")
        if not cn and jd:
            issues.append(f"⚠️ {json_field} 有数据但 {col} 为空")
    return issues
def audit_record(record):
    """Run every automated check on a single record.

    Args:
        record: mapping of column name to value for one row; may also carry a
            "record_id" entry.

    Returns:
        ``(label, issues)``: *label* is the whitespace-stripped "ID" value, or
        the "record_id" / "unknown" fallback when the row has no ID; *issues*
        is the combined list of messages from all checks.
    """
    raw_id = record.get("ID")
    # A null or blank ID counts as "no ID": fall back to record_id for the
    # label instead of printing the string "None".
    has_id = raw_id is not None and raw_id != ""
    rid = raw_id if has_id else record.get("record_id", "unknown")
    all_issues = []

    # 0. ID column: flag trailing whitespace.
    if isinstance(rid, str) and rid != rid.rstrip():
        all_issues.append(f"⚠️ ID 字段含末尾空格: [{rid}] len={len(rid)}")
    rid_clean = rid.strip() if isinstance(rid, str) else str(rid)

    # 1. Required keys in the basic-info JSON columns.
    for field, required in REQUIRED_JSON_FIELDS.items():
        all_issues.extend(check_json_integrity(record, field, required))

    # 2. classificationInfo swap detection.
    all_issues.extend(check_classification_swap(record))

    # 3. Question JSON checks. The fallback label is not a knowledge-point id,
    # so pass an empty id (which skips the pointId comparison) when the row
    # has no real ID; otherwise every question would be flagged as a mismatch.
    expected_point_id = rid_clean if has_id else ""
    all_issues.extend(check_question_set(record, expected_point_id))

    # 4. Chinese description columns vs JSON columns.
    all_issues.extend(check_consistency_with_chinese_fields(record))
    return rid_clean, all_issues
def _iter_records(app_token, table_id, token):
    """Yield each row's fields dict (with "record_id" added), page by page.

    Raises:
        requests.RequestException: network failure, timeout or HTTP error status.
        RuntimeError: the API answered with a non-zero ``code``.
    """
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records"
    headers = {"Authorization": f"Bearer {token}"}
    page_token = ""
    while True:
        resp = requests.get(
            url,
            headers=headers,
            params={"page_size": 50, "page_token": page_token},
            timeout=30,
        )
        resp.raise_for_status()
        payload = resp.json()
        if payload.get("code", 0) != 0:
            # Without this an auth/permission error looks like an empty table.
            raise RuntimeError(
                f"Feishu records request failed: code={payload.get('code')} msg={payload.get('msg')}"
            )
        data = payload.get("data") or {}
        for item in data.get("items") or []:
            fields = item["fields"]
            fields["record_id"] = item["record_id"]
            yield fields
        if not data.get("has_more"):
            return
        page_token = data.get("page_token", "")


def main():
    """Command-line entry point: audit one record or the whole table.

    Reads ``sys.argv``: app token, table id and an optional target ID. The
    first two fall back to built-in defaults when omitted. With a target ID,
    the first row whose "ID" column matches (ignoring surrounding whitespace)
    is audited and every issue printed; otherwise every row is audited and a
    one-line summary is printed per row that has issues.
    """
    app_token = sys.argv[1] if len(sys.argv) > 1 else "Nq3Zb258aae7SRs2QfXcqsQYnxJ"
    table_id = sys.argv[2] if len(sys.argv) > 2 else "tblTxGpf6GQ5c7DZ"
    target_id = sys.argv[3] if len(sys.argv) > 3 else None
    print(f"🔍 Level 1 配置表审校 | {app_token}/{table_id}")
    if target_id:
        print(f" 目标: {target_id}")
    print()

    token = get_token()
    records = _iter_records(app_token, table_id, token)

    if target_id:
        # Single-record mode. Compare stripped values so an ID stored with
        # stray whitespace can still be found; the generator stops fetching
        # pages as soon as a match is produced.
        wanted = target_id.strip()
        record = next(
            (rec for rec in records if str(rec.get("ID") or "").strip() == wanted),
            None,
        )
        if record is None:
            print(f"❌ 未找到 {target_id}")
            return
        rid, issues = audit_record(record)
        print(f"=== {rid} === 共 {len(issues)} 个问题")
        for issue in issues:
            print(f" {issue}")
        if not issues:
            print(" ✅ 自动化审校通过")
        return

    # Full-table scan.
    total_issues = 0
    total_records = 0
    for rec in records:
        rid, issues = audit_record(rec)
        if issues:
            total_issues += len(issues)
            total_records += 1
            print(f" {rid}: {len(issues)} issues | {issues[0][:80]}...")
    print(f"\n全表扫描完成: {total_records} 条记录有 {total_issues} 个问题")
# Run the audit only when executed as a script, not when imported.
if __name__ == "__main__":
    main()