#!/usr/bin/env python3
"""Audit the ``explanation`` fields of the writing and speaking tables (full version).

For every record of the configured Feishu Bitable tables the script parses the
``jsonData`` field, checks each ``explanation`` it contains against a set of
heuristics (placeholder text, multiple-choice / reading-comprehension wording,
descriptions of the prompt material, verbatim duplicates inside one
``questionSet``), regenerates the explanations that fail, and writes the
result back to the table.

Credentials default to the values embedded below but can be overridden with
the ``FEISHU_APP_ID``, ``FEISHU_APP_SECRET`` and ``FEISHU_APP_TOKEN``
environment variables.
"""
import copy
import json
import os
import re
import time  # kept from the original import list (not referenced below)

import requests

# SECURITY: these credentials were committed in source. They remain only as
# backward-compatible defaults; rotate them and supply the real values through
# the environment instead.
APP_ID = os.environ.get("FEISHU_APP_ID", "cli_a931175d41799cc7")
APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "Iw2vEfbjT6GtV0GhbxbZqfQ4nAPtbR14")
APP_TOKEN = os.environ.get("FEISHU_APP_TOKEN", "CMHSbUUjka3TrUsaxxEc297ongf")
BASE = "https://open.feishu.cn/open-apis"

# Seconds to wait for any single HTTP call; without it a stalled connection
# would block the whole audit indefinitely.
REQUEST_TIMEOUT = 30

# Regex fragments that indicate multiple-choice / reading-comprehension wording.
CHOICE_TERMS_SPEAKING = [
    "材料", "提到", "选", "误选", "干扰", "正确答案是", "根据.*内容",
    "原文", "文中", "文章", "选项", "排除", "不符合", "与.*不符",
    "图中", "图片中", "文本中", "文中显示",
]
CHOICE_TERMS_WRITING = [
    "选", "误选", "干扰项", "选项", "排除", "正确答案",
    "材料中", "文中提到", "原文", "根据文章",
]

# Substrings that mark an explanation as unedited placeholder content.
_PLACEHOLDERS = ("xxxx", "这是一个解析", "这是一个能力项", "N/A", "占位")

# Patterns showing that a speaking explanation describes the prompt material
# rather than the expected student answer.
_MATERIAL_PATTERNS = (r"图片中", r"图中", r"材料中", r"文本中", r"文中显示")

# Ability values that are template filler and must not be copied into a
# regenerated explanation.
_ABILITY_PLACEHOLDERS = frozenset({"这是一个能力项", "这是第二个能力项", "xxxx"})


def _raise_for_api_error(payload, action):
    """Raise ``RuntimeError`` if a Feishu response body reports a failure.

    Feishu signals application-level errors with HTTP 200 and a non-zero
    ``code`` in the JSON body, which ``raise_for_status()`` cannot detect.
    """
    if not isinstance(payload, dict):
        raise RuntimeError(f"Unexpected Feishu response during {action}: {payload!r}")
    code = payload.get("code", 0)
    if code != 0:
        raise RuntimeError(
            f"Feishu API error during {action}: code={code}, msg={payload.get('msg')}"
        )


def get_token():
    """Fetch a tenant access token for the configured app.

    Returns:
        The ``tenant_access_token`` string from the auth endpoint.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        RuntimeError: if the response body reports a non-zero ``code``.
    """
    r = requests.post(
        f"{BASE}/auth/v3/tenant_access_token/internal",
        json={"app_id": APP_ID, "app_secret": APP_SECRET},
        timeout=REQUEST_TIMEOUT,
    )
    r.raise_for_status()
    payload = r.json()
    _raise_for_api_error(payload, "get_token")
    return payload["tenant_access_token"]


def get_all_records(token, table_id):
    """Return every record of *table_id*, following pagination.

    Args:
        token: tenant access token used as the bearer credential.
        table_id: Bitable table identifier.

    Returns:
        A list with the ``items`` of every page, in the order received.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        RuntimeError: if a page reports a non-zero ``code``.
    """
    all_recs = []
    page_token = None
    while True:
        params = {"page_size": 500}
        if page_token:
            params["page_token"] = page_token
        r = requests.get(
            f"{BASE}/bitable/v1/apps/{APP_TOKEN}/tables/{table_id}/records",
            headers={"Authorization": f"Bearer {token}"},
            params=params,
            timeout=REQUEST_TIMEOUT,
        )
        r.raise_for_status()
        payload = r.json()
        _raise_for_api_error(payload, "get_all_records")
        # ``data`` / ``items`` may be JSON null; treat that as empty.
        data = payload.get("data") or {}
        all_recs.extend(data.get("items") or [])
        if not data.get("has_more", False):
            break
        page_token = data.get("page_token", "")
        if not page_token:
            # Defensive: has_more without a token would loop forever.
            break
    return all_recs


def update_record(token, table_id, record_id, fields):
    """Overwrite *fields* on one record and return the decoded response body.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        RuntimeError: if the response body reports a non-zero ``code``.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    r = requests.put(
        f"{BASE}/bitable/v1/apps/{APP_TOKEN}/tables/{table_id}/records/{record_id}",
        headers=headers,
        json={"fields": fields},
        timeout=REQUEST_TIMEOUT,
    )
    r.raise_for_status()
    payload = r.json()
    _raise_for_api_error(payload, "update_record")
    return payload


def check_explanation_problems(explanation, table_type, prev_explanations, idx_in_set):
    """Check whether an explanation has problems.

    Args:
        explanation: the explanation text; empty, whitespace-only and
            non-string values are treated as "nothing to audit".
        table_type: ``"speaking"`` selects the speaking term list and the
            material-description check; anything else uses the writing list.
        prev_explanations: explanations seen earlier in the same questionSet;
            only the last one is compared.
        idx_in_set: position inside the questionSet; the duplicate check is
            skipped for index 0.

    Returns:
        ``(has_problem, description)``; *description* is ``None`` when there
        is no problem.
    """
    if not isinstance(explanation, str):
        return False, None
    exp = explanation.strip()
    if not exp:
        return False, None

    # Placeholder content.
    for ph in _PLACEHOLDERS:
        if ph in exp:
            return True, f"含占位符内容: {ph}"

    # Material description instead of student response. This runs before the
    # generic term check because every pattern here is also matched by
    # CHOICE_TERMS_SPEAKING, which previously made this branch unreachable.
    if table_type == "speaking":
        for pat in _MATERIAL_PATTERNS:
            if re.search(pat, exp):
                return True, f"描述材料内容而非学生回答: {pat}"

    # Multiple-choice / reading-comprehension wording.
    terms = CHOICE_TERMS_SPEAKING if table_type == "speaking" else CHOICE_TERMS_WRITING
    for term in terms:
        if re.search(term, exp):
            return True, f"含选择题/阅读题用语: {term}"

    # Identical to the previous explanation in the same set.
    if idx_in_set > 0 and prev_explanations:
        previous = prev_explanations[-1]
        if isinstance(previous, str) and exp == previous.strip():
            return True, "与同一questionSet内前一项解析逐字相同"

    return False, None


def collect_all_explanations(parsed_json):
    """Collect every explanation from a parsed ``jsonData`` dict.

    Looks at each dict-valued entry for a direct ``explanation`` key and for
    ``explanation`` keys on the dict items of its ``questionSet``.
    """
    exps = []
    for sub_val in parsed_json.values():
        if not isinstance(sub_val, dict):
            continue
        # Direct explanation.
        if "explanation" in sub_val:
            exps.append(sub_val["explanation"])
        # questionSet explanations (the key may hold JSON null).
        for q in sub_val.get("questionSet") or []:
            if isinstance(q, dict) and "explanation" in q:
                exps.append(q["explanation"])
    return exps


def _format_abilities(ability):
    """Join ability values with ``、``, dropping empty and placeholder entries."""
    items = ability if isinstance(ability, list) else [ability]
    kept = []
    for item in items:
        if item is None:
            continue
        text = str(item)
        if text.strip() and text.strip() not in _ABILITY_PLACEHOLDERS:
            kept.append(text)
    return "、".join(kept)


def generate_writing_explanation(sub_data):
    """Generate a writing explanation from sub-question data.

    Reads ``textDesc``, ``ability`` and ``type`` from *sub_data* and returns
    the assembled lines joined with newlines.
    """
    text_desc = sub_data.get("textDesc") or ""
    if not isinstance(text_desc, str):
        text_desc = str(text_desc)
    ability = sub_data.get("ability", [])
    qtype = sub_data.get("type", "")

    parts = []
    if text_desc:
        parts.append(f"写作任务: {text_desc}")
    if ability:
        ability_str = _format_abilities(ability)
        if ability_str:
            parts.append(f"能力目标: {ability_str}")
    parts.append("评分维度: 内容完整性、语言准确性、结构逻辑性、书写规范性")

    # Type-specific guidance.
    if qtype == "writing_emailReply":
        parts.append("写作要点: 注意邮件格式规范(称呼、正文、署名)、逻辑顺序清晰、语言得体")
    elif qtype == "writing_picWrite":
        parts.append("写作要点: 按图片顺序组织叙述、使用时间衔接词、故事完整性")
    elif "看图" in text_desc:
        parts.append("写作要点: 准确描述图片内容、使用恰当的衔接词、逻辑连贯")
    return "\n".join(parts)


def generate_speaking_explanation(q_data, sub_data):
    """Generate a speaking explanation from question data.

    Reads ``question`` (falling back to ``content``), ``ability`` and
    ``imageDesc`` from *q_data*. *sub_data* is accepted for interface
    compatibility and is not read.
    """
    question = q_data.get("question", q_data.get("content", ""))
    ability = q_data.get("ability", [])
    image_desc = q_data.get("imageDesc", "")

    parts = []
    if question:
        parts.append(f"回答要点: {question}")
    if image_desc:
        parts.append(f"图片内容: {image_desc}")
    if ability:
        ability_str = _format_abilities(ability)
        if ability_str:
            parts.append(f"考察能力: {ability_str}")
    parts.append("评估标准: 语音语调准确性、语言流利度、内容完整性与相关性、语法准确性")
    # NOTE(review): "根据图片内容" below matches the speaking audit pattern
    # "根据.*内容", so a regenerated explanation is flagged again on the next
    # run. Wording left unchanged pending a content decision.
    parts.append("回答指导: 鼓励学生用完整句子作答,根据图片内容组织语言,表达清晰有条理")
    return "\n".join(parts)


def generate_speaking_explanation_simple(q_data, idx):
    """Generate a short speaking explanation (intended for the 看图识物 type).

    *idx* is accepted for interface compatibility and is not read.
    """
    question = q_data.get("question", "")
    image_desc = q_data.get("imageDesc", "")
    parts = []
    if question:
        parts.append(f"提问: {question}")
    if image_desc:
        parts.append(f"图片描述: {image_desc}")
    parts.append("评估要点: 语音语调、用词准确性、回答完整性")
    return "\n".join(parts)


def _regenerate_explanation(table_type, q_data, sub_data):
    """Build the replacement explanation that matches the table type."""
    if table_type == "speaking":
        return generate_speaking_explanation(q_data, sub_data)
    return generate_writing_explanation(sub_data)


def audit_record_explanations(parsed_json, table_type):
    """Audit all explanations in one record.

    Args:
        parsed_json: the decoded ``jsonData`` dict; it is not modified.
        table_type: ``"speaking"`` or ``"writing"``.

    Returns:
        ``(has_problems, fixed_json, problem_descs)`` where *fixed_json* is a
        deep copy of *parsed_json* with every flagged explanation regenerated.
    """
    has_problems = False
    problem_descs = []
    fixed = copy.deepcopy(parsed_json)

    for sub_key, sub_val in parsed_json.items():
        if not isinstance(sub_val, dict):
            continue

        # Explanation stored directly on the sub-question.
        if "explanation" in sub_val:
            problem, desc = check_explanation_problems(
                sub_val["explanation"], table_type, [], 0
            )
            if problem:
                has_problems = True
                problem_descs.append(f"{sub_key}.explanation: {desc}")
                # Dispatch on table type so speaking records do not receive
                # the writing rubric.
                fixed[sub_key]["explanation"] = _regenerate_explanation(
                    table_type, sub_val, sub_val
                )

        # Explanations inside questionSet (the key may hold JSON null).
        qset = sub_val.get("questionSet") or []
        if not isinstance(qset, list):
            continue
        prev_exps = []
        for i, q in enumerate(qset):
            if not isinstance(q, dict) or "explanation" not in q:
                continue
            exp = q["explanation"]
            problem, desc = check_explanation_problems(exp, table_type, prev_exps, i)
            if problem:
                has_problems = True
                problem_descs.append(f"{sub_key}.questionSet[{i}].explanation: {desc}")
                # NOTE(review): writing items of one set all receive the same
                # regenerated text, which the duplicate check will flag on the
                # next run.
                fixed[sub_key]["questionSet"][i]["explanation"] = \
                    _regenerate_explanation(table_type, q, sub_val)
            # Duplicates are judged against the ORIGINAL previous text.
            prev_exps.append(exp)

    return has_problems, fixed, problem_descs


def audit_table(token, table_id, table_type, audit_field_name):
    """Audit all records in a table and write fixes back.

    Args:
        token: tenant access token.
        table_id: Bitable table identifier.
        table_type: ``"speaking"`` or ``"writing"``.
        audit_field_name: name of the audit-status column, or ``None`` when
            the table has none.

    Returns:
        A dict with the counters ``total``, ``has_json``, ``problems``,
        ``fixed`` and ``skipped``.
    """
    records = get_all_records(token, table_id)
    result = {"total": len(records), "has_json": 0, "problems": 0, "fixed": 0, "skipped": 0}

    for rec in records:
        rid = rec["record_id"]
        fields = rec.get("fields") or {}
        jd_raw = fields.get("jsonData", "")

        if isinstance(jd_raw, str):
            if not jd_raw.strip():
                result["skipped"] += 1
                continue
            try:
                parsed = json.loads(jd_raw)
            except ValueError:  # json.JSONDecodeError is a ValueError
                result["skipped"] += 1
                continue
        else:
            # Already-decoded value; validated by the dict check below.
            parsed = jd_raw

        if not isinstance(parsed, dict) or not parsed:
            result["skipped"] += 1
            continue

        result["has_json"] += 1
        has_problems, fixed_json, descs = audit_record_explanations(parsed, table_type)

        if has_problems:
            result["problems"] += 1
            update_fields = {"jsonData": json.dumps(fixed_json, ensure_ascii=False)}
            if audit_field_name:
                update_fields[audit_field_name] = "修复解析"
            update_record(token, table_id, rid, update_fields)
            # Count the fix only after the write succeeded.
            result["fixed"] += 1
            print(f" [FIXED] {rid}: {'; '.join(descs)}")
        elif audit_field_name:
            # No problems: only refresh the audit status, and only if needed.
            current = fields.get(audit_field_name, "")
            if current != "未改动":
                update_record(token, table_id, rid, {audit_field_name: "未改动"})

    return result


def main():
    """Audit every configured table and print per-table and total counters."""
    token = get_token()
    print(f"Token: {token[:20]}...")

    # (display name, table id, table type, audit field name or None)
    tables = [
        ("写作-P1-邮件回复", "tblszuk1TeToofBF", "writing", "审校结果"),
        ("写作-P2-看图写作", "tblSAwlMumKoyjws", "writing", None),  # No audit field
        ("写作-P3-看图回答题", "tblFc9TVl2PeM2tg", "writing", "审核结果"),
        ("口语-P2-话题讨论", "tblGoWYBmVI0IrvQ", "speaking", "审核结果"),
        ("口语-P3-看图回答", "tblOHgNkNer2hGEp", "speaking", "审核结果"),
        ("口语-P4-看图识物", "tblsD2dxaRpLmkXD", "speaking", None),  # No audit field
    ]

    all_results = {}
    for name, tid, ttype, audit_field in tables:
        print(f"\n{'='*60}")
        print(f"审计: {name}")
        print(f"{'='*60}")
        r = audit_table(token, tid, ttype, audit_field)
        all_results[name] = r
        print(f" 总记录: {r['total']}, 含jsonData: {r['has_json']}, "
              f"有问题: {r['problems']}, 已修复: {r['fixed']}, 跳过: {r['skipped']}")

    # Summary
    print(f"\n{'='*60}")
    print("汇总")
    print(f"{'='*60}")
    total = sum(r["total"] for r in all_results.values())
    t_json = sum(r["has_json"] for r in all_results.values())
    t_prob = sum(r["problems"] for r in all_results.values())
    t_fix = sum(r["fixed"] for r in all_results.values())
    t_skip = sum(r["skipped"] for r in all_results.values())
    print(f"{'表名':<20} {'总数':>6} {'有jsonData':>10} {'有问题':>6} {'已修复':>6} {'跳过':>6}")
    print("-" * 60)
    for name, r in all_results.items():
        print(f"{name:<20} {r['total']:>6} {r['has_json']:>10} "
              f"{r['problems']:>6} {r['fixed']:>6} {r['skipped']:>6}")
    print("-" * 60)
    print(f"{'合计':<20} {total:>6} {t_json:>10} {t_prob:>6} {t_fix:>6} {t_skip:>6}")


if __name__ == "__main__":
    main()