301 lines
12 KiB
Python
301 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""单元挑战全题型全面审核——检查解析(explanation)和答案(answer)"""
|
||
import json, sys, time, os
|
||
|
||
# Feishu (Lark) Bitable credentials and target app.
# NOTE(review): these values were committed in plain text. The app secret
# should be rotated and supplied through the environment; the literals are
# kept only as backward-compatible fallbacks so existing runs keep working.
APP_TOKEN = os.environ.get("FEISHU_APP_TOKEN", "CMHSbUUjka3TrUsaxxEc297ongf")
APP_ID = os.environ.get("FEISHU_APP_ID", "cli_a931175d41799cc7")
APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "Iw2vEfbjT6GtV0GhbxbZqfQ4nAPtbR14")

# The 20 tables to audit, as (table_id, display name) pairs.
TABLES = [
    ("tbliZAhcc9C43B23", "听力-P1-图片选择题"),
    ("tblzTLNH7f13uWQN", "听力-P2-表格填空题"),
    ("tblgxsDn25oSq7WS", "听力-P3-长对话选择"),
    ("tblVmeDtBDKsAEfz", "听力-P4-短对话选择题"),
    ("tblDssVmhGzc3UKd", "听力-P5-信息匹配题"),
    ("tbly9SvPEa44k3yX", "听力-P7-听力拖拽"),
    ("tblCgfYDnnqwLfgH", "阅读-P1-信息匹配题"),
    ("tblEp820dnatNYbb", "阅读-P2-段落匹配题"),
    ("tbl4q0ZUV3HB54t1", "阅读-P3-长文选择题"),
    ("tblzKVm1FEukPgnN", "阅读-P4-完形填空题"),
    ("tblLmUxzzUDe0QAJ", "阅读-P5-开放填空题"),
    ("tblJc60aO0T163MJ", "阅读-P6-看图判断题"),
    ("tblweY65jGBiwSdt", "阅读-P7-看图回答题"),
    ("tblszuk1TeToofBF", "写作-P1-邮件回复"),
    ("tblSAwlMumKoyjws", "写作-P2-看图写作"),
    ("tblFc9TVl2PeM2tg", "写作-P3-看图回答题"),
    ("tblRGv7k4WH58Jgq", "口语-P1-日常回答"),
    ("tblGoWYBmVI0IrvQ", "口语-P2-话题讨论"),
    ("tblOHgNkNer2hGEp", "口语-P3-看图回答"),
    ("tblsD2dxaRpLmkXD", "口语-P4-看图识物"),
]
|
||
|
||
def get_token():
    """Request a tenant access token for the configured Feishu app.

    Posts ``APP_ID`` / ``APP_SECRET`` to the internal-app token endpoint.

    Returns:
        The ``tenant_access_token`` string taken from the JSON response.

    Raises:
        requests.RequestException: on network failure or timeout.
        RuntimeError: if the response is not JSON or carries no token, so an
            auth failure is reported with the service's own code/message
            instead of a bare ``KeyError``.
    """
    import requests  # imported lazily, as the other functions in this file do

    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": APP_ID, "app_secret": APP_SECRET},
        timeout=30,  # never hang forever on a stalled connection
    )
    try:
        payload = resp.json()
    except ValueError as err:
        raise RuntimeError(
            f"Token endpoint returned non-JSON response (HTTP {resp.status_code})"
        ) from err

    token = payload.get("tenant_access_token") if isinstance(payload, dict) else None
    if not token:
        # Report only the service's code/msg; never echo the credentials.
        code = payload.get("code") if isinstance(payload, dict) else None
        msg = payload.get("msg") if isinstance(payload, dict) else None
        raise RuntimeError(
            f"Failed to obtain tenant_access_token: code={code} msg={msg}"
        )
    return token
|
||
|
||
def fetch_records(token, table_id, page_size=100):
    """Fetch every record of one Bitable table, following pagination.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        table_id: Id of the table inside the app identified by ``APP_TOKEN``.
        page_size: Number of records requested per page.

    Returns:
        A list of the record dicts collected from all pages. When the API
        answers with a non-zero ``code`` the error is printed to stderr and
        the records gathered so far are returned (best effort).

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    import requests

    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{APP_TOKEN}/tables/{table_id}/records"
    headers = {"Authorization": f"Bearer {token}"}

    records = []
    page_token = None
    while True:
        params = {"page_size": page_size}
        if page_token:
            params["page_token"] = page_token

        resp = requests.get(url, headers=headers, params=params, timeout=30)
        data = resp.json()
        if data.get("code") != 0:
            print(f" ERROR fetching {table_id}: {data}", file=sys.stderr)
            break

        # `data` / `items` may be present but null; treat null as empty
        # instead of crashing in `.get` / `.extend`.
        page = data.get("data") or {}
        records.extend(page.get("items") or [])
        if not page.get("has_more"):
            break

        next_token = page.get("page_token")
        if not next_token or next_token == page_token:
            # `has_more` without a fresh token would re-request the same page
            # forever; stop with what we have.
            print(
                f" ERROR fetching {table_id}: has_more set but no new page_token",
                file=sys.stderr,
            )
            break
        page_token = next_token
        time.sleep(0.3)  # small pause between page requests
    return records
|
||
|
||
def is_english_only(text):
    """Classify *text* as English-only (or nearly so) versus containing Chinese.

    Characters in the range U+4E00..U+9FFF are counted as Chinese.

    Returns:
        A ``(flag, detail)`` tuple. ``flag`` is True when the text has no
        Chinese characters, or fewer than 5 of them in a text longer than 30
        characters after stripping. Empty or whitespace-only input yields
        ``(False, "空文本")``. ``detail`` is a short description of the verdict.
    """
    stripped = text.strip() if text else ""
    if not stripped:
        return False, "空文本"

    han_count = len([ch for ch in text if '\u4e00' <= ch <= '\u9fff'])
    if han_count == 0:
        return True, "纯英文"

    length = len(stripped)
    mostly_english = han_count < 5 and length > 30
    if mostly_english:
        return True, f"几乎纯英文({han_count}个中文字/{length}总字符)"
    return False, f"含中文({han_count}个中文字)"
|
||
|
||
def check_answer(answer, question_set, qtype):
    """Validate an answer value against its question set.

    Args:
        answer: The answer value; list answers get the per-item checks below,
            scalar answers are only checked for emptiness.
        question_set: List of question dicts (may be None or empty).
        qtype: Question-type identifier string.

    Returns:
        A list of issue strings (empty when nothing is wrong). Checks made:
        missing answer on a type that requires one, answer count differing
        from the question count, option index out of range for choice types,
        and every answer being identical.
    """
    issues = []
    n_questions = len(question_set) if question_set else 0

    # Only None and empty strings/containers mean "no answer". A bare 0 or
    # False is a legitimate answer value and must not be reported as empty.
    is_empty = answer is None or (
        isinstance(answer, (str, list, tuple, dict)) and len(answer) == 0
    )
    if is_empty:
        # Writing/speaking types may legitimately have no reference answer.
        open_ended_types = (
            'writing_email', 'writing_picWrite', 'writing_pic_qa',
            'speaking_qa', 'speaking_pic_qa', 'speaking_topic',
            'speaking_pic_recognize',
        )
        if qtype not in open_ended_types:
            issues.append("🔴 answer为空但题型应有答案")
        return issues

    # All remaining checks are per-item and only make sense for list answers;
    # iterating a scalar (e.g. an int) would raise.
    if not isinstance(answer, list):
        return issues

    if n_questions > 0 and len(answer) != n_questions:
        issues.append(f"🟡 answer数量({len(answer)})与questionSet题数({n_questions})不匹配")

    choice_types = (
        'listening_choicePic', 'listening_choiceShort', 'listening_choiceLong',
        'reading_choiceLong', 'reading_matchInfo', 'reading_matchPara',
        'reading_cloze', 'reading_openCloze',
    )
    if qtype in choice_types:
        for i, ans in enumerate(answer):
            q = question_set[i] if i < n_questions else {}
            options = q.get("options", []) if isinstance(q, dict) else []
            # bool is a subclass of int but is not an option index.
            if not options or isinstance(ans, bool) or not isinstance(ans, int):
                continue
            # Negative indices are just as invalid as ones past the end.
            if ans < 0 or ans >= len(options):
                issues.append(f"🔴 第{i+1}题answer索引({ans})超出选项范围(0-{len(options)-1})")

    # Identical answers across several questions look like placeholder data.
    if len(answer) > 1 and len({str(a) for a in answer}) == 1:
        issues.append(f"🟡 所有答案相同均为{answer[0]},疑似占位数据")

    return issues
|
||
|
||
# Category labels for the two kinds of findings this audit reports:
# English-only explanations and answer problems.
EXPLANATION_ISSUES, ANSWER_ISSUES = "解析纯英文问题", "答案问题"
|
||
|
||
def _answer_index(ref):
    """Turn an answerSet option reference into a 0-based index, or -1 if unusable.

    Integers are used as-is; a single letter maps A->0, B->1, ... (case
    insensitive). Anything else (empty or multi-character strings, bools,
    other types) yields -1 so the caller skips the range check instead of
    crashing in ``ord()``.
    """
    if isinstance(ref, bool):
        return -1
    if isinstance(ref, int):
        return ref
    if isinstance(ref, str):
        letter = ref.strip().upper()
        if len(letter) == 1:
            return ord(letter) - ord('A')
    return -1


def audit_block(block, block_name, qtype, qsid):
    """Audit one ``first``/``second`` block of a record's JSON payload.

    Args:
        block: Dict holding ``questionSet`` and optionally ``answer``,
            ``answerSet`` and ``optionSetList``.
        block_name: Label used in issue messages (e.g. ``"first"``).
        qtype: Question-type identifier, forwarded to ``check_answer``.
        qsid: Question-set id; accepted for interface compatibility, unused.

    Returns:
        A list of issue strings: English-only or empty explanations per
        question, ``check_answer`` findings for the block-level answer list,
        and answerSet entries pointing past ``optionSetList``.
    """
    issues = []
    question_set = block.get("questionSet") or []

    for i, q in enumerate(question_set):
        loc = f"{block_name}[{i}]"
        explanation = q.get("explanation", "")

        is_eng, _detail = is_english_only(explanation)
        if is_eng:
            # is_english_only never returns True for blank text, so an
            # English-only explanation is always non-empty here.
            snippet = explanation[:60] + "..." if len(explanation) > 60 else explanation
            issues.append(f"🔴 {loc} explanation 纯英文: [{snippet}]")
        elif not explanation or not explanation.strip():
            issues.append(f"🟡 {loc} explanation 为空")

    # Block-level answer: prefer "answer", fall back to "answerSet".
    block_answer = block.get("answer", None)
    if block_answer is None:
        block_answer = block.get("answerSet", None)
    if isinstance(block_answer, list) and block_answer:
        issues.extend(check_answer(block_answer, question_set, qtype))

    # answerSet entries look like [question, option]; make sure the option
    # reference stays inside optionSetList.
    option_list = block.get("optionSetList", [])
    for j, match in enumerate(block.get("answerSet") or []):
        if not (isinstance(match, list) and len(match) >= 2):
            continue
        idx = _answer_index(match[1])
        if idx >= 0 and option_list and idx >= len(option_list):
            issues.append(f"🔴 answerSet[{j}]索引({match[1]})超出optionSetList范围({len(option_list)})")

    return issues
|
||
|
||
def audit_record(record, table_name):
    """Audit the ``jsonData`` payload of a single table record.

    Args:
        record: Record dict with a ``fields`` mapping.
        table_name: Display name of the table; accepted for interface
            compatibility, unused.

    Returns:
        A list of issue strings. Issues found inside a block are prefixed
        with ``[<question-set id>]``. An empty, unparsable or non-object
        ``jsonData`` yields a single issue.
    """
    fields = record.get("fields") or {}
    qsid_field = fields.get("题目集合 ID", fields.get("题目集合ID", ""))

    json_str = fields.get("jsonData", "")
    if isinstance(json_str, list):
        # presumably the rich-text segment form ([{"text": ...}, ...]) that
        # some Bitable endpoints return for text fields — TODO confirm
        json_str = "".join(
            seg.get("text", "") if isinstance(seg, dict) else str(seg)
            for seg in json_str
        )
    if not json_str or (isinstance(json_str, str) and not json_str.strip()):
        return [f"🔴 {qsid_field}: jsonData 为空"]

    try:
        jd = json.loads(json_str)
    except (json.JSONDecodeError, TypeError) as e:
        # TypeError covers a jsonData value that is not str/bytes at all.
        return [f"🔴 {qsid_field}: jsonData JSON解析失败: {e}"]

    if not isinstance(jd, dict):
        # Valid JSON, but not the object with first/second blocks we expect.
        return [f"🔴 {qsid_field}: jsonData 顶层不是JSON对象"]

    # A null or non-dict block is treated as absent rather than crashing.
    first = jd.get("first")
    second = jd.get("second")
    first = first if isinstance(first, dict) else {}
    second = second if isinstance(second, dict) else {}
    qtype = first.get("type", second.get("type", "unknown"))

    # Only blocks that actually carry questions are audited.
    auditable = [
        (name, blk)
        for name, blk in (("first", first), ("second", second))
        if blk.get("questionSet")
    ]

    all_issues = []

    # Per-question findings first (first block, then second) ...
    for name, blk in auditable:
        all_issues.extend(
            f"[{qsid_field}] {iss}"
            for iss in audit_block(blk, name, qtype, qsid_field)
        )

    # ... then block-level explanations, which some types use instead.
    for name, blk in auditable:
        block_expl = blk.get("explanation", "")
        if not block_expl:
            continue
        is_eng, _detail = is_english_only(block_expl)
        if is_eng:
            snippet = block_expl[:60] + "..." if len(block_expl) > 60 else block_expl
            all_issues.append(f"[{qsid_field}] {name}.explanation 纯英文: [{snippet}]")

    return all_issues
|
||
|
||
def main(output_path="/root/.openclaw/workspace-xiaoyan/output/unit_challenge_audit_report.json"):
    """Audit every table in ``TABLES``, print a report and save it as JSON.

    For each table the records are fetched, each record is audited with
    ``audit_record``, and the findings are printed. A per-table summary
    follows, and the detailed results are written to *output_path*.

    Args:
        output_path: Destination of the JSON report. Its parent directory is
            created when missing so the run does not fail at the very end.
    """
    print("=" * 80)
    print("单元挑战全题型审核报告")
    print("检查重点:解析(explanation)纯英文问题、答案(answer)格式问题")
    print("=" * 80)

    token = get_token()
    # Deliberately not echoing any part of the bearer token into the report.
    print("\nToken acquired.")

    total_records = 0
    total_issues = 0
    all_results = {}

    for table_id, table_name in TABLES:
        print(f"\n{'─' * 60}")
        print(f"📋 正在审核: {table_name} ({table_id})")

        records = fetch_records(token, table_id)
        print(f" 共 {len(records)} 条记录")

        table_details = []
        for rec in records:
            issues = audit_record(rec, table_name)
            if not issues:
                continue
            rec_fields = rec.get("fields") or {}
            table_details.append({
                "record_id": rec.get("record_id"),
                "qsid": rec_fields.get("题目集合 ID", rec_fields.get("题目集合ID", "")),
                "issues": issues,
            })
        table_issues = sum(len(d["issues"]) for d in table_details)

        total_records += len(records)
        total_issues += table_issues

        all_results[table_name] = {
            "table_id": table_id,
            "record_count": len(records),
            "issue_count": table_issues,
            "details": table_details,
        }

        if table_issues == 0:
            print(f" ✅ 无问题")
        else:
            print(f" ⚠️ 发现 {table_issues} 个问题")
            for d in table_details:
                print(f" [{d['qsid']}]")
                for iss in d['issues']:
                    print(f" {iss}")

    # Overall summary
    print(f"\n{'=' * 80}")
    print(f"📊 审核汇总")
    print(f"{'=' * 80}")
    print(f"表数量: {len(TABLES)}")
    print(f"总记录数: {total_records}")
    print(f"总问题数: {total_issues}")
    print()

    # Per-table summary
    print("各表问题汇总:")
    print(f"{'表名':<30} {'记录数':>6} {'问题数':>6} {'状态':<10}")
    print("-" * 56)
    for table_name, result in all_results.items():
        status = "✅ OK" if result["issue_count"] == 0 else f"⚠️ {result['issue_count']}个问题"
        print(f"{table_name:<30} {result['record_count']:>6} {result['issue_count']:>6} {status:<10}")

    # Save detailed results; create the target directory first so a missing
    # folder cannot discard the whole audit.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(all_results, f, ensure_ascii=False, indent=2)
    print(f"\n详细审核结果已保存到: {output_path}")
|
||
|
||
# Run the audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|