#!/usr/bin/env python3 """ 将审校结果写回单元挑战多维表格的"审校结果"列 """ import json, subprocess, os, sys SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) WORKSPACE = os.path.dirname(SCRIPT_DIR) BITABLE_SCRIPT = os.path.join(WORKSPACE, "skills/lark_bitable_operate_as_bot/scripts/operate_bitable.sh") APP_TOKEN = "CMHSbUUjka3TrUsaxxEc297ongf" TABLES = { "听力-P1-图片选择题": "tbliZAhcc9C43B23", "听力-P2-表格填空题": "tblzTLNH7f13uWQN", "听力-P4-短对话选择题": "tblVmeDtBDKsAEfz", "听力-P5-信息匹配题": "tblDssVmhGzc3UKd", "听力-P6-听力选图": "tbloiMcD0sBtGSTq", "听力-P7-听力拖拽": "tbly9SvPEa44k3yX", } KNOWN_ABILITY_LABELS = { "显性事实理解|关键词识别", "显性事实理解|单句信息点抓取", "显性细节理解|数字/时间/地点", "多特征整合", "语用推断", "干扰抑制|多信息筛选", "多句保持|信息整合", "语用推断|否定与纠错", "听觉抓取关键信息", "问题意图识别", "关键细节听辨", "图像语义对齐", "近义改写", "否定与纠错", } def fetch_all(): all_recs = {} for tname, tid in TABLES.items(): result = subprocess.run( ["bash", BITABLE_SCRIPT, "list_records", APP_TOKEN, tid, "200"], capture_output=True, text=True, timeout=60 ) try: data = json.loads(result.stdout) if data.get("code") == 0: all_recs[tname] = data["data"]["items"] except: all_recs[tname] = [] return all_recs def audit_record(rec): """Returns (result_text, has_errors)""" issues = [] fields = rec.get("fields", {}) jd_raw = fields.get("jsonData") qs_id = fields.get("题目集合 ID", "") if not jd_raw: return None, False # empty record, skip try: parsed = json.loads(jd_raw) except json.JSONDecodeError as e: return f"❌ jsonData JSON解析失败: {e}", True first = parsed.get("first", {}) second = parsed.get("second", {}) qtype = first.get("type", "unknown") f_qsid = first.get("questionSetID", "") s_qsid = second.get("questionSetID", "") # Check questionSetID consistency if qs_id and f_qsid and f_qsid != qs_id: issues.append(f" ❌ first questionSetID({f_qsid})与字段'题目集合 ID'({qs_id})不一致") if qs_id and s_qsid and s_qsid != qs_id: issues.append(f" ❌ second questionSetID({s_qsid})与字段'题目集合 ID'({qs_id})不一致") if f_qsid == "000001": issues.append(f" ❌ questionSetID为000001(占位数据)") if qs_id and not qs_id.isdigit(): issues.append(f" ❌ 题目集合 ID异常: '{qs_id}'") # Check each block for bname, block in [("first", first), ("second", second)]: qs = block.get("questionSet", []) if not isinstance(qs, list) or len(qs) == 0: if block: # non-empty block issues.append(f" ❌ {bname}.questionSet为空") continue for i, q in enumerate(qs): # explanation expl = q.get("explanation", "") if not expl or expl.strip() == "": issues.append(f" ❌ {bname}[{i}]: explanation为空") elif len(expl) < 20: issues.append(f" 🟡 {bname}[{i}]: explanation过短({len(expl)}字)") # ability ability = q.get("ability", []) if not ability: issues.append(f" ❌ {bname}[{i}]: ability为空") else: for a in ability: if isinstance(a, str) and "¥¥" in a: issues.append(f" ❌ {bname}[{i}]: ability使用¥¥分隔符: '{a[:60]}...'") break if isinstance(a, str) and a not in KNOWN_ABILITY_LABELS: if "|" not in a and len(a) > 5: issues.append(f" 🟡 {bname}[{i}]: ability非标准: '{a}'") # Check text fields text1 = None text2 = None for k in ["题目1 完整配置", "题目1", "题目完整配置"]: if fields.get(k): text1 = fields[k] break for k in ["题目2 完整配置", "题目2"]: if fields.get(k): text2 = fields[k] break if not text1: issues.append(f" ❌ 题目1文本字段为空") if second and second.get("questionSet") and not text2: issues.append(f" 🟡 题目2文本字段为空(但jsonData有second块)") if not issues: return f"✅ 审校通过\n题型:{qtype} | 题组:first={len(first.get('questionSet',[]))}题 second={len(second.get('questionSet',[]))}题", False else: header = f"❌ 审校发现问题({len(issues)}项)\n题型:{qtype} | 题组:first={len(first.get('questionSet',[]))}题 second={len(second.get('questionSet',[]))}题\n" return header + "\n".join(issues), True def write_result(table_id, record_id, result_text): """Write audit result to bitable""" payload = json.dumps({"审校结果": result_text}) result = subprocess.run( ["bash", BITABLE_SCRIPT, "update_record", APP_TOKEN, table_id, record_id, payload], capture_output=True, text=True, timeout=30 ) return "success" in result.stdout def main(): all_recs = fetch_all() total_err = 0 total_ok = 0 total_skipped = 0 for tname, records in all_recs.items(): tid = TABLES[tname] print(f"\n--- {tname} ---") for rec in records: rid = rec["record_id"] fields = rec.get("fields", {}) ds = fields.get("dataStatus") if ds != "0" or not fields.get("jsonData"): total_skipped += 1 continue result_text, has_err = audit_record(rec) if result_text is None: total_skipped += 1 continue # Write result ok = write_result(tid, rid, result_text) status = "✅" if ok else "❌写入失败" label = "🔴" if has_err else "✅" print(f" {label} {rid}: {status}") if has_err: total_err += 1 else: total_ok += 1 print(f"\n{'='*40}") print(f"汇总: ✅通过={total_ok}, 🔴问题={total_err}, ⏭️跳过={total_skipped}") if __name__ == "__main__": main()