""" 完整审校脚本:单元挑战 阅读-P1/P3/P4/P5 """ import json, subprocess, copy APP_TOKEN = "CMHSbUUjka3TrUsaxxEc297ongf" SKILL_SCRIPT = "/root/.openclaw/workspace-xiaoyan/skills/lark_bitable_operate_as_bot/scripts/operate_bitable.sh" # ===== Standard Reading Ability Tags ===== STANDARD_TAGS_L1 = { "显性信息定位|关键词识别", "基础语境理解|场景/行为理解", "图文判断|句图一致性", "扫读定位|信息匹配", "主旨理解|段落/文本大意", "释义选词|定义匹配", "对话理解|问答匹配", "标识与通知理解|Signs & Notices", } STANDARD_TAGS_L2 = { "细节理解|事实信息提取", "词义理解|语境义判断", "同义替换|词/短语级", "语法结构识别|完形填空", "推理判断|原因/结果", } ALL_STANDARD = STANDARD_TAGS_L1 | STANDARD_TAGS_L2 # Non-standard → standard mapping TAG_MAP = { "信息提取": "细节理解|事实信息提取", "细节理解": "细节理解|事实信息提取", "信息定位": "扫读定位|信息匹配", # depends on context, default for reading "信息定位与提取": "扫读定位|信息匹配", "因果推断": "推理判断|原因/结果", "主旨归纳": "主旨理解|段落/文本大意", "主旨概括": "主旨理解|段落/文本大意", "推理判断": "推理判断|原因/结果", "信息匹配": "扫读定位|信息匹配", } def exec_bash(cmd): result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) return json.loads(result.stdout) if result.stdout else {} def audit_ability_tags(abilities, qtype, qcat): """Check and map ability tags""" issues = [] mapped = [] for a in abilities: if a in ALL_STANDARD: mapped.append(a) elif a in TAG_MAP: mapped.append(TAG_MAP[a]) issues.append(f"能力标签 '{a}' → '{TAG_MAP[a]}'(非标准标签,已建议映射)") else: mapped.append(a) issues.append(f"能力标签 '{a}' 不在标准标签库中,需人工确认") return mapped, issues def audit_record(fields, record_id, table_name): """Deep audit of a single record""" jd_str = fields.get('jsonData', '{}') sid = fields.get('题目集合 ID', '') try: jd = json.loads(jd_str) except: return {"errors": [f"jsonData 解析失败"], "warnings": [], "notes": []} errors = [] warnings = [] notes = [] first = jd.get('first', {}) second = jd.get('second', {}) qs1 = first.get('questionSet', []) qs2 = second.get('questionSet', []) ftype = first.get('type', '') fcat = first.get('category', '') # 1. Check question set count if not qs1 and not qs2: errors.append("first和second题组均为空") elif qs1 and not qs2: notes.append("只有一道题组(first),缺少second题组") elif qs2 and not qs1: notes.append("只有一道题组(second),缺少first题组") # 2. Check each question set all_ability_issues = [] for set_name, qset in [('first', qs1), ('second', qs2)]: if not qset: continue for qi, q in enumerate(qset): prefix = f"{set_name}[{qi}]" # Check required fields for req in ['question', 'options', 'answer', 'ability', 'explanation']: if req not in q or not q[req]: errors.append(f"{prefix}: 缺少必填字段 '{req}'") # Check answer bounds answer = q.get('answer', []) options = q.get('options', []) if isinstance(answer, list): for ai in answer: if isinstance(ai, int) and (ai < 0 or ai >= len(options)): errors.append(f"{prefix}: answer索引{ai}超出options范围(0-{len(options)-1})") elif isinstance(answer, int): if answer < 0 or answer >= len(options): errors.append(f"{prefix}: answer索引{answer}超出options范围(0-{len(options)-1})") # Check ability tags abilities = q.get('ability', []) # Hearing tags in reading if any('听觉' in str(a) or '听力' in str(a) for a in abilities): errors.append(f"{prefix}: 能力标签含'听觉/听力'但题型为{ftype}") # Standard check mapped, tag_issues = audit_ability_tags(abilities, ftype, fcat) for ti in tag_issues: warnings.append(f"{prefix}: {ti}") all_ability_issues.extend(tag_issues) # Check explanation quality expl = q.get('explanation', '') if len(expl) < 10: warnings.append(f"{prefix}: 解析过短({len(expl)}字)") # Check questionImage naming qimg = q.get('questionImage', '') if qimg and not qimg.startswith(sid): warnings.append(f"{prefix}: questionImage '{qimg}' 与题目集合ID '{sid}' 不匹配") # 3. Check type consistency second_has_type = second and second.get('type') if second_has_type and second.get('type') != ftype: errors.append(f"first type={ftype} 与 second type={second.get('type')} 不一致") # 4. Check dataStatus ds = fields.get('dataStatus', '') if ds and ds != '1' and ds != '0': warnings.append(f"dataStatus='{ds}' 非标准值") return { "sid": sid, "table_name": table_name, "record_id": record_id, "type": f"{fcat}/{ftype}", "dataStatus": ds, "first_count": len(qs1), "second_count": len(qs2), "errors": errors, "warnings": warnings, "notes": notes, "existing_audit": (fields.get('审校结果', '') or ''), } def format_audit_result(audit): """Format audit result as a structured string for backfill""" lines = [] # Title line has_errors = len(audit['errors']) > 0 has_warnings = len(audit['warnings']) > 0 has_notes = len(audit['notes']) > 0 if has_errors: lines.append(f"❌ 审校发现问题({len(audit['errors'])}项错误)") elif has_warnings: lines.append(f"⚠️ 审校通过({len(audit['warnings'])}项建议)") else: lines.append("✅ 审校通过(无问题)") lines.append(f"题型:{audit['type']} | 题组:first={audit['first_count']}题 second={audit['second_count']}题") if has_notes: for n in audit['notes']: lines.append(f"📝 备注:{n}") if has_errors: lines.append(f"\n🔴 必须修改:") for e in audit['errors']: lines.append(f" - {e}") if has_warnings: lines.append(f"\n🟡 建议修改:") for w in audit['warnings']: lines.append(f" - {w}") return '\n'.join(lines) # ===== Main ===== targets = { "阅读-P1": {"table_id": "tblCgfYDnnqwLfgH", "filter_ids": ["032501"]}, "阅读-P3": {"table_id": "tbl4q0ZUV3HB54t1", "filter_ids": None}, "阅读-P4": {"table_id": "tblzKVm1FEukPgnN", "filter_ids": None}, "阅读-P5": {"table_id": "tblLmUxzzUDe0QAJ", "filter_ids": None}, } all_results = [] for name, config in targets.items(): cmd = f"bash {SKILL_SCRIPT} list_records {APP_TOKEN} {config['table_id']} 500" data = exec_bash(cmd) if data.get('code') != 0: continue for item in data['data']['items']: fields = item.get('fields', {}) sid = fields.get('题目集合 ID', '') or '' if '010199' in str(sid): continue if config['filter_ids'] and sid not in config['filter_ids']: continue if not sid: jd = fields.get('jsonData', '') if jd and jd != '{}': # Record with data but no ID - skip, not valid pass continue audit = audit_record(fields, item['record_id'], name) result_text = format_audit_result(audit) print(f"\n{'='*60}") print(f"{name} | ID={sid}") print(f"{'='*60}") print(result_text) all_results.append({ "table_name": name, "table_id": config['table_id'], "record_id": item['record_id'], "sid": sid, "audit_text": result_text, "has_errors": len(audit['errors']) > 0, }) # Output backfill JSON print(f"\n\n===== BACKFILL DATA ({len(all_results)} records) =====") print(json.dumps(all_results, ensure_ascii=False, indent=2))