ai_member_xiaoyan/scripts/audit_explanation.py

299 lines
12 KiB
Python

#!/usr/bin/env python3
"""审计写作和口语题型表的 explanation 字段 - 完整版"""
import copy
import json
import os
import re
import time

import requests
# Feishu Open Platform credentials and the Bitable app token.
# SECURITY: the literals below are kept only as backward-compatible fallbacks
# so existing invocations keep working. Secrets committed to source control
# should be rotated; prefer supplying FEISHU_APP_ID / FEISHU_APP_SECRET /
# FEISHU_APP_TOKEN through the environment.
APP_ID = os.environ.get("FEISHU_APP_ID", "cli_a931175d41799cc7")
APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "Iw2vEfbjT6GtV0GhbxbZqfQ4nAPtbR14")
APP_TOKEN = os.environ.get("FEISHU_APP_TOKEN", "CMHSbUUjka3TrUsaxxEc297ongf")
BASE = "https://open.feishu.cn/open-apis"

# Each entry is used as a regular expression (via re.search) against an
# explanation; a match marks the explanation as written in multiple-choice /
# reading-comprehension wording.
# The lists must not contain an empty string: an empty pattern matches every
# input, which would flag every non-empty explanation as a problem.
CHOICE_TERMS_SPEAKING = [
    "材料", "提到", "误选", "干扰", "正确答案是", "根据.*内容",
    "原文", "文中", "文章", "选项", "排除", "不符合", "与.*不符",
    "图中", "图片中", "文本中", "文中显示",
]
CHOICE_TERMS_WRITING = [
    "误选", "干扰项", "选项", "排除", "正确答案",
    "材料中", "文中提到", "原文", "根据文章",
]
def get_token(timeout=30):
    """Request a tenant access token for the configured app.

    Args:
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` may block indefinitely on a stalled connection.

    Returns:
        The ``tenant_access_token`` value from the JSON response body.

    Raises:
        requests.HTTPError: If the server answers with an HTTP error status.
        KeyError: If the response body carries no token. The message includes
            the ``code`` / ``msg`` fields of the body when present, so the
            API-level failure reason is visible.
    """
    r = requests.post(
        f"{BASE}/auth/v3/tenant_access_token/internal",
        json={"app_id": APP_ID, "app_secret": APP_SECRET},
        timeout=timeout,
    )
    r.raise_for_status()
    payload = r.json()
    token = payload.get("tenant_access_token")
    if not token:
        # KeyError is kept as the exception type for backward compatibility
        # with the previous plain dictionary lookup.
        raise KeyError(
            "tenant_access_token missing from response "
            f"(code={payload.get('code')!r}, msg={payload.get('msg')!r})"
        )
    return token
def get_all_records(token, table_id, timeout=30):
    """Fetch every record of a Bitable table, following pagination.

    Pages of up to 500 records are requested until the response reports
    ``has_more`` as false or stops returning a ``page_token``.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        table_id: Identifier of the table inside the app ``APP_TOKEN``.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        A list with the ``items`` of all pages, in the order received.

    Raises:
        requests.HTTPError: If any page request returns an HTTP error status.
    """
    url = f"{BASE}/bitable/v1/apps/{APP_TOKEN}/tables/{table_id}/records"
    headers = {"Authorization": f"Bearer {token}"}
    all_recs = []
    page_token = None
    while True:
        params = {"page_size": 500}
        if page_token:
            params["page_token"] = page_token
        r = requests.get(url, headers=headers, params=params, timeout=timeout)
        r.raise_for_status()
        # "data" and "items" may be present but null (e.g. an empty table);
        # normalise both so the loop never calls methods on None.
        data = r.json().get("data") or {}
        all_recs.extend(data.get("items") or [])
        if not data.get("has_more", False):
            break
        page_token = data.get("page_token", "")
        if not page_token:
            # has_more without a continuation token: stop rather than
            # re-request the first page forever.
            break
    return all_recs
def update_record(token, table_id, record_id, fields, timeout=30):
    """Overwrite the given fields of one Bitable record.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        table_id: Identifier of the table inside the app ``APP_TOKEN``.
        record_id: Identifier of the record to update.
        fields: Mapping of field name to new value; sent as ``{"fields": ...}``.
        timeout: Seconds to wait for the HTTP request, so a stalled
            connection cannot hang the whole audit run.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: If the server answers with an HTTP error status.
    """
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    r = requests.put(
        f"{BASE}/bitable/v1/apps/{APP_TOKEN}/tables/{table_id}/records/{record_id}",
        headers=headers,
        json={"fields": fields},
        timeout=timeout,
    )
    r.raise_for_status()
    return r.json()
def check_explanation_problems(explanation, table_type, prev_explanations, idx_in_set):
    """Decide whether an explanation needs to be rewritten.

    Checks are applied in order and the first hit wins: placeholder text,
    multiple-choice / reading wording, material-description wording (speaking
    only), and verbatim repetition of the previous explanation in the same
    questionSet.

    Args:
        explanation: The explanation value to audit. Missing, blank and
            non-string values are treated as "nothing to audit".
        table_type: ``"speaking"`` selects the speaking term list; any other
            value selects the writing term list.
        prev_explanations: Explanations seen earlier in the same questionSet;
            only the last one is compared.
        idx_in_set: Position of this item in its questionSet; the duplicate
            check is skipped for position 0.

    Returns:
        ``(True, description)`` when a problem is found, otherwise
        ``(False, None)``.
    """
    # JSON data may hold null or non-text values here; calling .strip() on
    # them would raise, so they are simply not audited.
    if not isinstance(explanation, str):
        return False, None
    exp = explanation.strip()
    if not exp:
        return False, None

    # Placeholder content left over from templates.
    for ph in ("xxxx", "这是一个解析", "这是一个能力项", "N/A", "占位"):
        if ph in exp:
            return True, f"含占位符内容: {ph}"

    # Multiple-choice / reading-comprehension wording.
    terms = CHOICE_TERMS_SPEAKING if table_type == "speaking" else CHOICE_TERMS_WRITING
    for term in terms:
        # An empty pattern matches every string and would flag everything,
        # so empty entries in the term list are ignored.
        if term and re.search(term, exp):
            return True, f"含选择题/阅读题用语: {term}"

    # Wording that describes the material instead of the student's answer.
    # NOTE: every pattern here is currently also matched by an entry of
    # CHOICE_TERMS_SPEAKING, so this branch only fires if that list is trimmed.
    if table_type == "speaking":
        for pat in (r"图片中", r"图中", r"材料中", r"文本中", r"文中显示"):
            if re.search(pat, exp):
                return True, f"描述材料内容而非学生回答: {pat}"

    # Verbatim copy of the previous explanation in the same questionSet.
    if idx_in_set > 0 and prev_explanations:
        prev = prev_explanations[-1]
        # The previous value is stored unvalidated and may be None/non-text.
        if isinstance(prev, str) and exp == prev.strip():
            return True, "与同一questionSet内前一项解析逐字相同"
    return False, None
def collect_all_explanations(parsed_json):
    """Gather every ``explanation`` value found in a parsed jsonData mapping.

    For each dict-valued entry, the entry's own ``explanation`` (if present)
    is collected first, followed by the ``explanation`` of each dict item in
    its ``questionSet`` list.

    Args:
        parsed_json: Mapping of sub-question key to sub-question data.
            Non-dict input yields an empty list.

    Returns:
        A list of explanation values in traversal order (values are returned
        as stored, without validation).
    """
    if not isinstance(parsed_json, dict):
        return []
    exps = []
    for sub_val in parsed_json.values():
        if not isinstance(sub_val, dict):
            continue
        if "explanation" in sub_val:
            exps.append(sub_val["explanation"])
        # "questionSet" may be present but null; treat anything that is not
        # a list the same as an absent questionSet instead of raising.
        qset = sub_val.get("questionSet")
        if not isinstance(qset, list):
            continue
        exps.extend(
            q["explanation"]
            for q in qset
            if isinstance(q, dict) and "explanation" in q
        )
    return exps
def generate_writing_explanation(sub_data):
    """Build a replacement explanation for a writing sub-question.

    The text is assembled line by line from the task description
    (``textDesc``), the ability targets (``ability``), a fixed scoring-rubric
    line, and a type-specific hint chosen from ``type`` or, failing that,
    from the task description.

    Args:
        sub_data: Sub-question dict; ``textDesc``, ``ability`` and ``type``
            are read, all optional. Null values are treated as missing.

    Returns:
        The explanation as a newline-joined string. The rubric line is always
        present.
    """
    # A null textDesc must not reach the substring test below.
    text_desc = sub_data.get("textDesc") or ""
    if not isinstance(text_desc, str):
        text_desc = str(text_desc)
    ability = sub_data.get("ability") or []
    qtype = sub_data.get("type", "")

    parts = []
    if text_desc:
        parts.append(f"写作任务: {text_desc}")
    if isinstance(ability, list):
        # Separate the individual abilities so they do not run together, and
        # tolerate empty or non-string items.
        ability_items = [str(item) for item in ability if item]
        if ability_items:
            parts.append(f"能力目标: {'、'.join(ability_items)}")
    elif ability:
        parts.append(f"能力目标: {ability}")
    parts.append("评分维度: 内容完整性、语言准确性、结构逻辑性、书写规范性")

    # Type-specific guidance; the description is only consulted when the
    # type is not one of the two known writing types.
    if qtype == "writing_emailReply":
        parts.append("写作要点: 注意邮件格式规范(称呼、正文、署名)、逻辑顺序清晰、语言得体")
    elif qtype == "writing_picWrite":
        parts.append("写作要点: 按图片顺序组织叙述、使用时间衔接词、故事完整性")
    elif "看图" in text_desc:
        parts.append("写作要点: 准确描述图片内容、使用恰当的衔接词、逻辑连贯")
    return "\n".join(parts)
def generate_speaking_explanation(q_data, sub_data):
    """Build a replacement explanation for a speaking question.

    The text is assembled line by line from the question prompt, the image
    description, the non-placeholder ability targets, and two fixed lines
    (assessment criteria and answering guidance).

    Args:
        q_data: Question dict; ``question`` (falling back to ``content``),
            ``imageDesc`` and ``ability`` are read, all optional.
        sub_data: Enclosing sub-question data. Currently unused; kept so the
            call signature stays stable.

    Returns:
        The explanation as a newline-joined string.
    """
    # Fall back to "content" when "question" is absent *or* empty.
    question = q_data.get("question") or q_data.get("content") or ""
    image_desc = q_data.get("imageDesc", "")
    ability = q_data.get("ability") or []

    parts = []
    if question:
        parts.append(f"回答要点: {question}")
    if image_desc:
        parts.append(f"图片内容: {image_desc}")

    # Drop template placeholders item by item; checking only the joined
    # string would let a list of several placeholders slip through.
    placeholders = ("这是一个能力项", "这是第二个能力项", "xxxx")
    raw_items = ability if isinstance(ability, list) else [ability]
    abilities = [
        str(item) for item in raw_items
        if item and str(item) not in placeholders
    ]
    if abilities:
        parts.append(f"考察能力: {'、'.join(abilities)}")

    parts.append("评估标准: 语音语调准确性、语言流利度、内容完整性与相关性、语法准确性")
    # The guidance deliberately avoids the wording "根据...内容": the auditor
    # treats that pattern as reading-comprehension language for speaking
    # tables, so using it here would get every generated explanation flagged
    # again on the next run.
    parts.append("回答指导: 鼓励学生用完整句子作答,结合图片组织语言,表达清晰有条理")
    return "\n".join(parts)
def generate_speaking_explanation_simple(q_data, idx):
    """Compose a short speaking explanation from a question dict.

    Emits one line for the question prompt and one for the image description
    (each only when non-empty), followed by a fixed assessment line.

    Args:
        q_data: Question dict; ``question`` and ``imageDesc`` are read.
        idx: Not used by the function body.

    Returns:
        The lines joined with newlines.
    """
    question = q_data.get("question", "")
    image_desc = q_data.get("imageDesc", "")
    # (include?, line) pairs in output order; the final line is unconditional.
    candidates = [
        (question, f"提问: {question}"),
        (image_desc, f"图片描述: {image_desc}"),
        (True, "评估要点: 语音语调、用词准确性、回答完整性"),
    ]
    return "\n".join(line for keep, line in candidates if keep)
def audit_record_explanations(parsed_json, table_type):
    """Audit every explanation in one record and build a repaired copy.

    Both the sub-question-level ``explanation`` and the ``explanation`` of
    each dict item in ``questionSet`` are checked. Problematic explanations
    are regenerated in a deep copy; the input is never modified.

    Args:
        parsed_json: Mapping of sub-question key to sub-question data.
        table_type: ``"speaking"`` or anything else (treated as writing);
            selects both the audit terms and the generator used for repairs.

    Returns:
        ``(has_problems, fixed_json, problem_descs)`` where ``fixed_json`` is
        the repaired deep copy and ``problem_descs`` lists one
        ``"<path>: <reason>"`` string per problem found.
    """
    has_problems = False
    problem_descs = []
    fixed = copy.deepcopy(parsed_json)
    is_speaking = table_type == "speaking"

    for sub_key, sub_val in parsed_json.items():
        if not isinstance(sub_val, dict):
            continue

        # Explanation attached directly to the sub-question.
        if "explanation" in sub_val:
            problem, desc = check_explanation_problems(
                sub_val["explanation"], table_type, [], 0
            )
            if problem:
                has_problems = True
                problem_descs.append(f"{sub_key}.explanation: {desc}")
                # Pick the generator by table type, as the questionSet branch
                # does; a speaking item must not receive the writing rubric.
                if is_speaking:
                    fixed[sub_key]["explanation"] = \
                        generate_speaking_explanation(sub_val, sub_val)
                else:
                    fixed[sub_key]["explanation"] = \
                        generate_writing_explanation(sub_val)

        # Explanations inside questionSet.
        qset = sub_val.get("questionSet")
        if not isinstance(qset, list):
            continue
        # Original (unrepaired) explanations seen so far in this set, used
        # for the "identical to previous item" check.
        prev_exps = []
        for i, q in enumerate(qset):
            if not isinstance(q, dict) or "explanation" not in q:
                continue
            exp = q["explanation"]
            problem, desc = check_explanation_problems(exp, table_type, prev_exps, i)
            if problem:
                has_problems = True
                problem_descs.append(f"{sub_key}.questionSet[{i}].explanation: {desc}")
                if is_speaking:
                    fixed[sub_key]["questionSet"][i]["explanation"] = \
                        generate_speaking_explanation(q, sub_val)
                else:
                    fixed[sub_key]["questionSet"][i]["explanation"] = \
                        generate_writing_explanation(sub_val)
            prev_exps.append(exp)
    return has_problems, fixed, problem_descs
def audit_table(token, table_id, table_type, audit_field_name):
    """Audit every record of one table and write repairs back.

    For each record, the ``jsonData`` field is parsed and audited. Records
    with problems get the repaired JSON written back (and, if an audit field
    is configured, the marker "修复解析"). Clean records get the marker
    "未改动" written unless it is already set.

    Args:
        token: Bearer token for the API calls.
        table_id: Identifier of the table to audit.
        table_type: ``"speaking"`` or ``"writing"``; forwarded to the audit.
        audit_field_name: Name of the field receiving the audit marker, or a
            falsy value when the table has no such field.

    Returns:
        A dict of counters: ``total``, ``has_json``, ``problems``, ``fixed``
        and ``skipped``.

    Raises:
        requests.HTTPError: Propagated from the list/update requests; a
            failed update aborts the run.
    """
    records = get_all_records(token, table_id)
    result = {"total": len(records), "has_json": 0, "problems": 0, "fixed": 0, "skipped": 0}
    for rec in records:
        rid = rec["record_id"]
        fields = rec.get("fields") or {}
        jd_raw = fields.get("jsonData", "")

        # jsonData may arrive as a JSON string or as an already-decoded
        # value; only strings are stripped and parsed.
        if isinstance(jd_raw, str):
            if not jd_raw.strip():
                result["skipped"] += 1
                continue
            try:
                parsed = json.loads(jd_raw)
            except ValueError:
                # json.JSONDecodeError is a ValueError: not valid JSON.
                result["skipped"] += 1
                continue
        else:
            parsed = jd_raw

        if not isinstance(parsed, dict) or not parsed:
            result["skipped"] += 1
            continue
        result["has_json"] += 1

        has_problems, fixed_json, descs = audit_record_explanations(parsed, table_type)
        if has_problems:
            result["problems"] += 1
            update_fields = {"jsonData": json.dumps(fixed_json, ensure_ascii=False)}
            if audit_field_name:
                update_fields[audit_field_name] = "修复解析"
            update_record(token, table_id, rid, update_fields)
            # Count the fix only once the write has succeeded.
            result["fixed"] += 1
            print(f" [FIXED] {rid}: {'; '.join(descs)}")
        elif audit_field_name:
            # No problems: only refresh the audit marker, and only if needed.
            current = fields.get(audit_field_name, "")
            if current != "未改动":
                update_record(token, table_id, rid, {audit_field_name: "未改动"})
    return result
def main():
    """Audit all configured tables and print per-table and overall counts."""
    token = get_token()
    # Do not echo any part of the bearer token: console output tends to end
    # up in logs and shared terminals.
    print("Token: acquired")

    # (display name, table id, table type, audit-marker field or None)
    tables = [
        # NOTE(review): "审校结果" differs from the "审核结果" used by the other
        # tables - confirm it matches this table's actual column name.
        ("写作-P1-邮件回复", "tblszuk1TeToofBF", "writing", "审校结果"),
        ("写作-P2-看图写作", "tblSAwlMumKoyjws", "writing", None),  # No audit field
        ("写作-P3-看图回答题", "tblFc9TVl2PeM2tg", "writing", "审核结果"),
        ("口语-P2-话题讨论", "tblGoWYBmVI0IrvQ", "speaking", "审核结果"),
        ("口语-P3-看图回答", "tblOHgNkNer2hGEp", "speaking", "审核结果"),
        ("口语-P4-看图识物", "tblsD2dxaRpLmkXD", "speaking", None),  # No audit field
    ]

    all_results = {}
    for name, tid, ttype, audit_field in tables:
        print(f"\n{'='*60}")
        print(f"审计: {name}")
        print(f"{'='*60}")
        r = audit_table(token, tid, ttype, audit_field)
        all_results[name] = r
        print(f" 总记录: {r['total']}, 含jsonData: {r['has_json']}, "
              f"有问题: {r['problems']}, 已修复: {r['fixed']}, 跳过: {r['skipped']}")

    # Summary table.
    print(f"\n{'='*60}")
    print("汇总")
    print(f"{'='*60}")
    counters = ("total", "has_json", "problems", "fixed", "skipped")
    totals = {
        key: sum(r[key] for r in all_results.values())
        for key in counters
    }
    print(f"{'表名':<20} {'总数':>6} {'有jsonData':>10} {'有问题':>6} {'已修复':>6} {'跳过':>6}")
    print("-" * 60)
    for name, r in all_results.items():
        print(f"{name:<20} {r['total']:>6} {r['has_json']:>10} "
              f"{r['problems']:>6} {r['fixed']:>6} {r['skipped']:>6}")
    print("-" * 60)
    print(f"{'合计':<20} {totals['total']:>6} {totals['has_json']:>10} "
          f"{totals['problems']:>6} {totals['fixed']:>6} {totals['skipped']:>6}")


if __name__ == "__main__":
    main()