ai_member_xiaoyan/scripts/audit_unit_challenge_v3.py

207 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
修正版审校脚本:按题型区分字段要求
"""
import json, subprocess, sys
APP_TOKEN = "CMHSbUUjka3TrUsaxxEc297ongf"
SKILL_SCRIPT = "/root/.openclaw/workspace-xiaoyan/skills/lark_bitable_operate_as_bot/scripts/operate_bitable.sh"
# Standard ability tags
TAG_L1 = {"显性信息定位|关键词识别","基础语境理解|场景/行为理解","图文判断|句图一致性",
"扫读定位|信息匹配","主旨理解|段落/文本大意","释义选词|定义匹配",
"对话理解|问答匹配","标识与通知理解Signs & Notices"}
TAG_L2 = {"细节理解|事实信息提取","词义理解|语境义判断","同义替换|词/短语级",
"语法结构识别|完形填空","推理判断|原因/结果"}
ALL_TAGS = TAG_L1 | TAG_L2
TAG_MAP = {
"信息提取":"细节理解|事实信息提取", "细节理解":"细节理解|事实信息提取",
"信息定位":"扫读定位|信息匹配", "信息定位与提取":"扫读定位|信息匹配",
"因果推断":"推理判断|原因/结果", "主旨归纳":"主旨理解|段落/文本大意",
"主旨概括":"主旨理解|段落/文本大意", "推理判断":"推理判断|原因/结果",
"信息匹配":"扫读定位|信息匹配",
}
# Per-type required question fields
# reading_cloze and reading_openCloze don't need individual 'question' field
TYPE_FIELDS = {
"reading_matchInfo": ["question", "options", "answer", "ability", "explanation"],
"reading_choiceLong": ["question", "options", "answer", "ability", "explanation"],
"reading_cloze": ["options", "answer", "ability", "explanation"], # no question
"reading_openCloze": ["options", "answer", "ability", "explanation"], # no question
}
def exec_bash(cmd):
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
return json.loads(result.stdout) if result.stdout else {}
def audit_question(q, req_fields, type_name):
"""Audit a single question, returns (errors, warnings, tag_issues)"""
errors = []
warnings = []
tag_issues = []
for req in req_fields:
if req not in q:
errors.append(f"缺少必填字段 '{req}'")
elif q[req] is None:
errors.append(f"必填字段 '{req}' 为 null")
# answer bounds
answer = q.get('answer', [])
options = q.get('options', [])
if isinstance(answer, list) and options:
for ai in answer:
if isinstance(ai, int) and (ai < 0 or ai >= len(options)):
errors.append(f"answer索引{ai}超出options范围(0-{len(options)-1})")
# ability tags
abilities = q.get('ability', [])
if not abilities:
warnings.append("能力标签为空,需补充")
for a in abilities:
if a in ALL_TAGS:
continue
if a in TAG_MAP:
tag_issues.append(f"'{a}''{TAG_MAP[a]}'")
else:
tag_issues.append(f"'{a}' 不在标准标签库中")
if any('听觉' in str(a) or '听力' in str(a) for a in abilities):
errors.append(f"能力标签含'听觉/听力'但题型为{type_name}")
# explanation
expl = q.get('explanation', '')
if expl is None or expl == '':
warnings.append("解析为空,需补充")
elif isinstance(expl, str) and len(expl) < 10:
warnings.append(f"解析过短({len(expl)}字)")
return errors, warnings, tag_issues
def audit_record(fields, record_id, table_name):
jd_str = fields.get('jsonData', '{}')
sid = fields.get('题目集合 ID', '')
try:
jd = json.loads(jd_str)
except:
return [f"jsonData 解析失败"], [], [f"只有一道题组缺少second题组"]
first = jd.get('first', {})
second = jd.get('second', {})
qs1 = first.get('questionSet', [])
qs2 = second.get('questionSet', [])
ftype = first.get('type', 'unknown')
req_fields = TYPE_FIELDS.get(ftype, ["question", "options", "answer", "ability", "explanation"])
errors = []
warnings = []
notes = []
if not qs1 and not qs2:
errors.append("first和second题组均为空")
elif qs1 and not qs2:
notes.append("只有一道题组first缺少second题组")
elif qs2 and not qs1:
notes.append("只有一道题组second缺少first题组")
all_tag_issues = []
for set_name, qset in [('first', qs1), ('second', qs2)]:
for qi, q in enumerate(qset):
prefix = f"{set_name}[{qi}]"
q_errors, q_warnings, q_tag_issues = audit_question(q, req_fields, ftype)
for e in q_errors:
errors.append(f"{prefix}: {e}")
for w in q_warnings:
warnings.append(f"{prefix}: {w}")
all_tag_issues.extend(q_tag_issues)
# Consolidate tag issues
tag_summary = {}
for ti in all_tag_issues:
tag_summary[ti] = tag_summary.get(ti, 0) + 1
return errors, warnings, notes, tag_summary, ftype, len(qs1), len(qs2), sid
def format_result(errors, warnings, notes, tag_summary, ftype, n1, n2, sid):
lines = []
if errors:
lines.append(f"❌ 审校发现问题({len(errors)}项错误)")
elif warnings or notes:
lines.append(f"⚠️ 审校通过({len(warnings)}项建议 + {len(notes)}项备注)")
else:
lines.append("✅ 审校通过(无问题)")
lines.append(f"题型:{ftype} | 题组first={n1}题 second={n2}")
for n in notes:
lines.append(f"📝 备注:{n}")
if errors:
lines.append(f"\n🔴 必须修改:")
for e in errors:
lines.append(f" - {e}")
if tag_summary:
lines.append(f"\n🟡 能力标签映射建议:")
for tag, count in sorted(tag_summary.items()):
lines.append(f" - {tag}{count}处)")
if warnings:
lines.append(f"\n🟡 其他建议:")
for w in warnings:
lines.append(f" - {w}")
return '\n'.join(lines)
# ===== Main =====
targets = {
"阅读-P1": {"table_id": "tblCgfYDnnqwLfgH", "filter_ids": ["032501"]},
"阅读-P3": {"table_id": "tbl4q0ZUV3HB54t1", "filter_ids": None},
"阅读-P4": {"table_id": "tblzKVm1FEukPgnN", "filter_ids": None},
"阅读-P5": {"table_id": "tblLmUxzzUDe0QAJ", "filter_ids": None},
}
all_results = []
for name, config in targets.items():
cmd = f"bash {SKILL_SCRIPT} list_records {APP_TOKEN} {config['table_id']} 500"
data = exec_bash(cmd)
if data.get('code') != 0:
continue
for item in data['data']['items']:
fields = item.get('fields', {})
sid = fields.get('题目集合 ID', '') or ''
if '010199' in str(sid):
continue
if config['filter_ids'] and sid not in config['filter_ids']:
continue
if not sid:
continue
errors, warnings, notes, tag_summary, ftype, n1, n2, sid2 = audit_record(fields, item['record_id'], name)
result_text = format_result(errors, warnings, notes, tag_summary, ftype, n1, n2, sid)
print(f"\n{'='*60}")
print(f"{name} | ID={sid}")
print(f"{'='*60}")
print(result_text)
all_results.append({
"table_name": name,
"table_id": config['table_id'],
"record_id": item['record_id'],
"sid": sid,
"audit_text": result_text,
"has_errors": len(errors) > 0,
})
# Output backfill data
print(f"\n\n===== BACKFILL ({len(all_results)} records) =====")
print(json.dumps(all_results, ensure_ascii=False, indent=2))