ai_member_xiaoyan/scripts/audit_all_unit_challenge.py

301 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""单元挑战全题型全面审核——检查解析explanation和答案answer"""
import json, sys, time, os
# Feishu credentials and the Bitable app token used by every request below.
# Each value can be overridden through an environment variable; the literal
# defaults keep the script working unchanged when nothing is set.
# NOTE(review): secrets committed to source control should be rotated and the
# literal defaults removed once the environment variables are provisioned.
APP_TOKEN = os.environ.get("FEISHU_BITABLE_APP_TOKEN", "CMHSbUUjka3TrUsaxxEc297ongf")
APP_ID = os.environ.get("FEISHU_APP_ID", "cli_a931175d41799cc7")
APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "Iw2vEfbjT6GtV0GhbxbZqfQ4nAPtbR14")
# The 20 tables to audit, as (table_id, display_name) pairs.
# Display names are printed in the report and used as keys of the JSON output.
TABLES = [
    # Listening
    ("tbliZAhcc9C43B23", "听力-P1-图片选择题"),
    ("tblzTLNH7f13uWQN", "听力-P2-表格填空题"),
    ("tblgxsDn25oSq7WS", "听力-P3-长对话选择"),
    ("tblVmeDtBDKsAEfz", "听力-P4-短对话选择题"),
    ("tblDssVmhGzc3UKd", "听力-P5-信息匹配题"),
    ("tbly9SvPEa44k3yX", "听力-P7-听力拖拽"),
    # Reading
    ("tblCgfYDnnqwLfgH", "阅读-P1-信息匹配题"),
    ("tblEp820dnatNYbb", "阅读-P2-段落匹配题"),
    ("tbl4q0ZUV3HB54t1", "阅读-P3-长文选择题"),
    ("tblzKVm1FEukPgnN", "阅读-P4-完形填空题"),
    ("tblLmUxzzUDe0QAJ", "阅读-P5-开放填空题"),
    ("tblJc60aO0T163MJ", "阅读-P6-看图判断题"),
    ("tblweY65jGBiwSdt", "阅读-P7-看图回答题"),
    # Writing
    ("tblszuk1TeToofBF", "写作-P1-邮件回复"),
    ("tblSAwlMumKoyjws", "写作-P2-看图写作"),
    ("tblFc9TVl2PeM2tg", "写作-P3-看图回答题"),
    # Speaking
    ("tblRGv7k4WH58Jgq", "口语-P1-日常回答"),
    ("tblGoWYBmVI0IrvQ", "口语-P2-话题讨论"),
    ("tblOHgNkNer2hGEp", "口语-P3-看图回答"),
    ("tblsD2dxaRpLmkXD", "口语-P4-看图识物"),
]
def get_token():
    """Request a tenant access token using APP_ID and APP_SECRET.

    Returns:
        The ``tenant_access_token`` string from the JSON response.

    Raises:
        RuntimeError: If the response carries no ``tenant_access_token``
            (for example because the credentials were rejected).
        requests.exceptions.RequestException: On network failure or timeout.
    """
    # Imported locally, like fetch_records does, so the module can be
    # imported without the third-party dependency installed.
    import requests

    r = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": APP_ID, "app_secret": APP_SECRET},
        # Without a timeout a stalled connection would hang the audit forever.
        timeout=30,
    )
    data = r.json()
    token = data.get("tenant_access_token") if isinstance(data, dict) else None
    if not token:
        # Surface the service's own diagnostics instead of a bare KeyError.
        code = data.get("code") if isinstance(data, dict) else None
        msg = data.get("msg") if isinstance(data, dict) else data
        raise RuntimeError(
            f"Failed to obtain tenant_access_token (code={code}, msg={msg})"
        )
    return token
def fetch_records(token, table_id, page_size=100):
    """Fetch all records of one table, following pagination.

    Args:
        token: Access token sent as ``Authorization: Bearer <token>``.
        table_id: ID of the table inside the app identified by APP_TOKEN.
        page_size: Number of records requested per page.

    Returns:
        A list of the record items collected. If the API answers with a
        non-zero ``code``, or pagination cannot advance, a message is printed
        to stderr and the records gathered so far are returned (best effort).

    Raises:
        requests.exceptions.RequestException: On network failure or timeout.
    """
    import requests

    url = (
        f"https://open.feishu.cn/open-apis/bitable/v1/apps/{APP_TOKEN}"
        f"/tables/{table_id}/records"
    )
    headers = {"Authorization": f"Bearer {token}"}
    records = []
    page_token = None
    while True:
        params = {"page_size": page_size}
        if page_token:
            params["page_token"] = page_token
        # Without a timeout a stalled connection would hang the audit forever.
        r = requests.get(url, headers=headers, params=params, timeout=30)
        data = r.json()
        if data.get("code") != 0:
            print(f" ERROR fetching {table_id}: {data}", file=sys.stderr)
            break
        # Tolerate explicit nulls for "data" / "items" in the response body.
        d = data.get("data") or {}
        records.extend(d.get("items") or [])
        if not d.get("has_more"):
            break
        next_token = d.get("page_token")
        if not next_token or next_token == page_token:
            # has_more without a new cursor would re-request the same page
            # forever; stop and keep what has been collected.
            print(
                f" ERROR fetching {table_id}: has_more set but page_token "
                f"did not advance",
                file=sys.stderr,
            )
            break
        page_token = next_token
        # Short pause between pages.
        time.sleep(0.3)
    return records
def is_english_only(text):
    """Classify *text* by the number of CJK ideographs it contains.

    Characters in the range U+4E00..U+9FFF are counted as Chinese.

    Returns:
        A ``(flag, detail)`` tuple. ``flag`` is True when the text has no
        Chinese characters, or fewer than 5 of them in a text longer than
        30 characters (after stripping). Empty or whitespace-only input
        yields ``(False, "空文本")``. ``detail`` is a short description.
    """
    stripped = text.strip() if text else ""
    if not stripped:
        return False, "空文本"

    han_total = 0
    for ch in text:
        if '\u4e00' <= ch <= '\u9fff':
            han_total += 1
    length = len(stripped)

    if han_total == 0:
        verdict = (True, "纯英文")
    elif han_total < 5 and length > 30:
        # A handful of Chinese characters in a long text still counts as English.
        verdict = (True, f"几乎纯英文({han_total}个中文字/{length}总字符)")
    else:
        verdict = (False, f"含中文({han_total}个中文字)")
    return verdict
def check_answer(answer, question_set, qtype):
    """Validate a block-level answer list against its question set.

    Args:
        answer: The answer value; list-specific checks run only for lists.
        question_set: List of question dicts (may be empty or None).
        qtype: Question-type identifier string.

    Returns:
        A list of human-readable issue strings (empty when nothing is wrong).
    """
    issues = []
    n_questions = len(question_set) if question_set else 0

    if not answer:
        # Writing/speaking types may legitimately have no answer.
        subjective_types = (
            'writing_email', 'writing_picWrite', 'writing_pic_qa',
            'speaking_qa', 'speaking_pic_qa', 'speaking_topic',
            'speaking_pic_recognize',
        )
        if qtype not in subjective_types:
            issues.append("🔴 answer为空但题型应有答案")
        return issues

    if not isinstance(answer, list):
        # Only list answers have further checks.
        return issues

    # Answer count must match the number of questions (when there are any).
    if len(answer) != n_questions and n_questions > 0:
        issues.append(f"🟡 answer数量({len(answer)})与questionSet题数({n_questions})不匹配")

    # For choice types, an integer answer must be a valid option index.
    choice_types = (
        'listening_choicePic', 'listening_choiceShort', 'listening_choiceLong',
        'reading_choiceLong', 'reading_matchInfo', 'reading_matchPara',
        'reading_cloze', 'reading_openCloze',
    )
    if qtype in choice_types:
        for i, ans in enumerate(answer):
            q = question_set[i] if i < n_questions else {}
            # A malformed (non-dict) question entry has no options to check.
            options = q.get("options", []) if isinstance(q, dict) else []
            # Negative indices are out of range too, not only those past the end.
            if options and isinstance(ans, int) and (ans < 0 or ans >= len(options)):
                issues.append(f"🔴 第{i+1}题answer索引({ans})超出选项范围(0-{len(options)-1})")

    # Identical answers across several questions look like placeholder data.
    if len(answer) > 1 and len({str(a) for a in answer}) == 1:
        issues.append(f"🟡 所有答案相同均为{answer[0]},疑似占位数据")
    return issues
# Category labels for the two kinds of findings (explanation language, answer
# validity). Neither name is referenced by the code visible in this file.
EXPLANATION_ISSUES = "解析纯英文问题"
ANSWER_ISSUES = "答案问题"
def _answer_set_index(label):
    """Convert an answerSet option label to a zero-based index, or -1.

    Integers are returned unchanged. A string that is exactly one character
    after stripping and upper-casing maps to its offset from 'A'. Anything
    else (empty or multi-character strings, other types) yields -1.
    """
    if isinstance(label, int):
        return label
    if isinstance(label, str):
        upper = label.strip().upper()
        # ord() only accepts a single character; longer or empty labels
        # cannot be mapped to an option position.
        if len(upper) == 1:
            return ord(upper) - ord('A')
    return -1


def audit_block(block, block_name, qtype, qsid):
    """Audit one ``first``/``second`` block of a record.

    Args:
        block: Dict holding ``questionSet`` and optionally ``answer``,
            ``answerSet`` and ``optionSetList``.
        block_name: Label used in issue messages (e.g. "first").
        qtype: Question-type identifier passed on to check_answer.
        qsid: Question-set ID; accepted for interface compatibility, unused.

    Returns:
        A list of issue strings found in the block.
    """
    issues = []
    question_set = block.get("questionSet") or []

    # Per-question explanation checks.
    for i, q in enumerate(question_set):
        loc = f"{block_name}[{i}]"
        explanation = q.get("explanation", "")
        is_eng, _detail = is_english_only(explanation)
        if is_eng:
            # is_english_only never flags empty text, so the explanation is
            # non-empty here. Truncate it for the report.
            snippet = explanation[:60] + "..." if len(explanation) > 60 else explanation
            issues.append(f"🔴 {loc} explanation 纯英文: [{snippet}]")
        if not explanation or not explanation.strip():
            issues.append(f"🟡 {loc} explanation 为空")

    # Block-level answer: prefer "answer", fall back to "answerSet".
    # NOTE(review): check_answer is only called for non-empty lists, so its
    # "answer is empty" branch is never reached from here - confirm intended.
    block_answer = block.get("answer", None)
    if block_answer is None:
        block_answer = block.get("answerSet", None)
    if isinstance(block_answer, list) and block_answer:
        issues.extend(check_answer(block_answer, question_set, qtype))

    # Each answerSet entry with at least two elements carries an option
    # reference in its second element; it must fall inside optionSetList.
    answer_set = block.get("answerSet", [])
    if answer_set:
        option_list = block.get("optionSetList", [])
        for j, match in enumerate(answer_set):
            if isinstance(match, list) and len(match) >= 2:
                idx = _answer_set_index(match[1])
                if idx >= 0 and option_list and idx >= len(option_list):
                    issues.append(f"🔴 answerSet[{j}]索引({match[1]})超出optionSetList范围({len(option_list)})")
    return issues
def audit_record(record, table_name):
    """Audit the ``jsonData`` field of a single record.

    Args:
        record: Record dict with a ``fields`` mapping.
        table_name: Display name of the table; accepted for interface
            compatibility, unused.

    Returns:
        A list of issue strings. Issues found inside the JSON are prefixed
        with the record's question-set ID in square brackets.
    """
    fields = record.get("fields", {})
    qsid_field = fields.get("题目集合 ID", fields.get("题目集合ID", ""))
    json_str = fields.get("jsonData", "")

    if not json_str or (isinstance(json_str, str) and not json_str.strip()):
        return [f"🔴 {qsid_field}: jsonData 为空"]
    if not isinstance(json_str, str):
        # Report an unexpected field type instead of crashing on .strip().
        return [f"🔴 {qsid_field}: jsonData 不是字符串(实际类型: {type(json_str).__name__})"]

    try:
        jd = json.loads(json_str)
    except json.JSONDecodeError as e:
        return [f"🔴 {qsid_field}: jsonData JSON解析失败: {e}"]
    if not isinstance(jd, dict):
        # Valid JSON whose top level is not an object has no blocks to audit.
        return [f"🔴 {qsid_field}: jsonData 顶层不是JSON对象"]

    # A block that is null or otherwise not an object is treated as absent,
    # so one malformed record cannot abort the whole audit.
    first = jd.get("first")
    if not isinstance(first, dict):
        first = {}
    second = jd.get("second")
    if not isinstance(second, dict):
        second = {}
    blocks = [("first", first), ("second", second)]

    # The question type comes from the first block, else the second.
    qtype = first.get("type", second.get("type", "unknown"))

    all_issues = []
    # Per-question and answer checks; blocks without questions are skipped.
    for block_name, block in blocks:
        if block.get("questionSet"):
            for iss in audit_block(block, block_name, qtype, qsid_field):
                all_issues.append(f"[{qsid_field}] {iss}")

    # Some types put the explanation at block level; check that as well.
    for block_name, block in blocks:
        if not block.get("questionSet"):
            continue
        block_expl = block.get("explanation", "")
        if block_expl and isinstance(block_expl, str):
            is_eng, _detail = is_english_only(block_expl)
            if is_eng:
                snippet = block_expl[:60] + "..." if len(block_expl) > 60 else block_expl
                all_issues.append(f"[{qsid_field}] {block_name}.explanation 纯英文: [{snippet}]")
    return all_issues
def main():
    """Audit every table in TABLES, print a report and save it as JSON.

    For each table the records are fetched and audited. Per-table findings
    are printed, followed by an overall summary, and the detailed results
    are written to a fixed output path.
    """
    print("=" * 80)
    print("单元挑战全题型审核报告")
    print("检查重点解析explanation纯英文问题、答案answer格式问题")
    print("=" * 80)

    token = get_token()
    # Do not echo any part of the access token to stdout or logs.
    print("\nToken acquired.")

    total_records = 0
    total_issues = 0
    all_results = {}

    for table_id, table_name in TABLES:
        # Visible separator between tables.
        print(f"\n{'-' * 60}")
        print(f"📋 正在审核: {table_name} ({table_id})")
        records = fetch_records(token, table_id)
        print(f"{len(records)} 条记录")

        table_issues = 0
        table_details = []
        for rec in records:
            issues = audit_record(rec, table_name)
            if issues:
                table_issues += len(issues)
                rec_fields = rec.get("fields", {})
                table_details.append({
                    "record_id": rec.get("record_id"),
                    "qsid": rec_fields.get("题目集合 ID", rec_fields.get("题目集合ID", "")),
                    "issues": issues
                })

        total_records += len(records)
        total_issues += table_issues
        all_results[table_name] = {
            "table_id": table_id,
            "record_count": len(records),
            "issue_count": table_issues,
            "details": table_details
        }

        if table_issues == 0:
            print(" ✅ 无问题")
        else:
            print(f" ⚠️ 发现 {table_issues} 个问题")
            for d in table_details:
                print(f" [{d['qsid']}]")
                for iss in d['issues']:
                    print(f" {iss}")

    # Summary
    print(f"\n{'=' * 80}")
    print("📊 审核汇总")
    print(f"{'=' * 80}")
    print(f"表数量: {len(TABLES)}")
    print(f"总记录数: {total_records}")
    print(f"总问题数: {total_issues}")
    print()

    # Table-level summary
    print("各表问题汇总:")
    print(f"{'表名':<30} {'记录数':>6} {'问题数':>6} {'状态':<10}")
    print("-" * 56)
    for table_name, result in all_results.items():
        status = "✅ OK" if result["issue_count"] == 0 else f"⚠️ {result['issue_count']}个问题"
        print(f"{table_name:<30} {result['record_count']:>6} {result['issue_count']:>6} {status:<10}")

    # Save detailed results
    output_path = "/root/.openclaw/workspace-xiaoyan/output/unit_challenge_audit_report.json"
    # Create the directory first so a missing folder cannot discard the
    # results of the whole audit at the very last step.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(all_results, f, ensure_ascii=False, indent=2)
    print(f"\n详细审核结果已保存到: {output_path}")
# Run the audit only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()