299 lines
12 KiB
Python
299 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""审计写作和口语题型表的 explanation 字段 - 完整版"""
|
|
import copy
import json
import os
import re
import time

import requests
|
|
|
|
# Credentials for the Feishu open platform and the Bitable app this script audits.
# SECURITY: these values were committed in plain text. Each one can now be
# overridden through an environment variable; the literals remain only as
# backward-compatible fallbacks. The secret should be rotated and then removed
# from source control.
APP_ID = os.environ.get("FEISHU_APP_ID", "cli_a931175d41799cc7")
APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "Iw2vEfbjT6GtV0GhbxbZqfQ4nAPtbR14")
APP_TOKEN = os.environ.get("FEISHU_APP_TOKEN", "CMHSbUUjka3TrUsaxxEc297ongf")

# Root URL that every REST call in this file is built from.
BASE = "https://open.feishu.cn/open-apis"
|
|
|
|
# Wording that belongs to multiple-choice / reading explanations and therefore
# should not appear in a speaking explanation. Every entry is evaluated with
# re.search, so entries such as "根据.*内容" are regular expressions.
CHOICE_TERMS_SPEAKING = [
    "材料",
    "提到",
    "选",
    "误选",
    "干扰",
    "正确答案是",
    "根据.*内容",
    "原文",
    "文中",
    "文章",
    "选项",
    "排除",
    "不符合",
    "与.*不符",
    "图中",
    "图片中",
    "文本中",
    "文中显示",
]

# Same idea for writing explanations; also evaluated with re.search.
CHOICE_TERMS_WRITING = [
    "选",
    "误选",
    "干扰项",
    "选项",
    "排除",
    "正确答案",
    "材料中",
    "文中提到",
    "原文",
    "根据文章",
]
|
|
|
|
def get_token():
    """Request a tenant access token using APP_ID / APP_SECRET.

    Returns:
        The ``tenant_access_token`` string taken from the JSON response.

    Raises:
        requests.HTTPError: the endpoint answered with an HTTP error status.
        requests.Timeout: no answer arrived within the timeout.
        RuntimeError: the response body contained no token.
    """
    resp = requests.post(
        f"{BASE}/auth/v3/tenant_access_token/internal",
        json={"app_id": APP_ID, "app_secret": APP_SECRET},
        # Without a timeout a stalled connection would hang the script forever.
        timeout=30,
    )
    resp.raise_for_status()
    payload = resp.json()
    token = payload.get("tenant_access_token")
    if not token:
        # Surface whatever diagnostic fields the body carries instead of a bare KeyError.
        raise RuntimeError(
            "tenant_access_token missing from response: "
            f"code={payload.get('code')!r}, msg={payload.get('msg')!r}"
        )
    return token
|
|
|
|
def get_all_records(token, table_id):
    """Fetch every record of one table, following pagination.

    Args:
        token: Bearer token sent in the Authorization header.
        table_id: Identifier of the table inside the app named by APP_TOKEN.

    Returns:
        A list with the ``items`` of all pages, in the order received.

    Raises:
        requests.HTTPError: a page request returned an HTTP error status.
        requests.Timeout: a page request did not answer within the timeout.
        RuntimeError: the response body reported a non-zero ``code``.
    """
    url = f"{BASE}/bitable/v1/apps/{APP_TOKEN}/tables/{table_id}/records"
    headers = {"Authorization": f"Bearer {token}"}
    records = []
    page_token = None

    while True:
        params = {"page_size": 500}
        if page_token:
            params["page_token"] = page_token

        resp = requests.get(url, headers=headers, params=params, timeout=30)
        resp.raise_for_status()
        body = resp.json()

        # NOTE(review): assumes the API signals application errors through a
        # non-zero top-level "code" - confirm against the API reference.
        # Without this check an error body would look like an empty table.
        if body.get("code", 0) != 0:
            raise RuntimeError(
                f"listing records of {table_id} failed: "
                f"code={body.get('code')!r}, msg={body.get('msg')!r}"
            )

        # "data" and "items" may be present but null; treat both as empty.
        data = body.get("data") or {}
        records.extend(data.get("items") or [])

        page_token = data.get("page_token")
        # Stop on the last page, or when no cursor is supplied for the next one.
        if not data.get("has_more") or not page_token:
            break

    return records
|
|
|
|
def update_record(token, table_id, record_id, fields):
    """Overwrite the given fields of a single record.

    Args:
        token: Bearer token sent in the Authorization header.
        table_id: Table that owns the record.
        record_id: Identifier of the record to update.
        fields: Mapping of field name to new value; sent as ``{"fields": fields}``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: the request returned an HTTP error status.
        requests.Timeout: the request did not answer within the timeout.
        RuntimeError: the response body reported a non-zero ``code``.
    """
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    url = f"{BASE}/bitable/v1/apps/{APP_TOKEN}/tables/{table_id}/records/{record_id}"
    resp = requests.put(url, headers=headers, json={"fields": fields}, timeout=30)
    resp.raise_for_status()
    body = resp.json()
    # NOTE(review): assumes a non-zero top-level "code" means the update was
    # rejected - confirm against the API reference. Ignoring it would let the
    # caller count a failed write as a successful fix.
    if isinstance(body, dict) and body.get("code", 0) != 0:
        raise RuntimeError(
            f"updating record {record_id} failed: "
            f"code={body.get('code')!r}, msg={body.get('msg')!r}"
        )
    return body
|
|
|
|
def check_explanation_problems(explanation, table_type, prev_explanations, idx_in_set):
    """Decide whether one explanation text needs to be regenerated.

    Checks run in this order and the first hit wins: placeholder text,
    choice/reading wording, material description (speaking only), verbatim
    duplicate of the previous explanation in the same questionSet.

    Args:
        explanation: The explanation value taken from the JSON data.
        table_type: "speaking" selects CHOICE_TERMS_SPEAKING; any other value
            selects CHOICE_TERMS_WRITING.
        prev_explanations: Explanations already seen in the same questionSet;
            only the last entry is compared.
        idx_in_set: Position inside the questionSet; the duplicate check is
            skipped for position 0.

    Returns:
        ``(True, description)`` when a problem is found, otherwise
        ``(False, None)``. Empty, whitespace-only and non-string values are
        reported as problem-free.
    """
    # JSON may hold null, numbers or nested objects here; there is no text to audit.
    if not isinstance(explanation, str):
        return False, None

    exp = explanation.strip()
    if not exp:
        return False, None

    # Placeholder content left over from templates.
    for ph in ("xxxx", "这是一个解析", "这是一个能力项", "N/A", "占位"):
        if ph in exp:
            return True, f"含占位符内容: {ph}"

    # Wording typical of choice/reading questions; entries are regex patterns.
    terms = CHOICE_TERMS_SPEAKING if table_type == "speaking" else CHOICE_TERMS_WRITING
    for term in terms:
        if re.search(term, exp):
            return True, f"含选择题/阅读题用语: {term}"

    # Speaking explanations must not describe the material itself.
    # With the term list as currently defined in this file, every pattern below
    # is already matched by the loop above, so this branch only becomes
    # reachable if that list is trimmed.
    if table_type == "speaking":
        for pat in (r"图片中", r"图中", r"材料中", r"文本中", r"文中显示"):
            if re.search(pat, exp):
                return True, f"描述材料内容而非学生回答: {pat}"

    # Verbatim duplicate of the previous explanation in the same set.
    if idx_in_set > 0 and prev_explanations:
        previous = prev_explanations[-1]
        # The previous entry may be null/non-string; it cannot equal real text.
        if isinstance(previous, str) and exp == previous.strip():
            return True, "与同一questionSet内前一项解析逐字相同"

    return False, None
|
|
|
|
def collect_all_explanations(parsed_json):
    """Gather every explanation value found in a parsed jsonData object.

    For each dict-valued entry, the entry's own ``explanation`` (if present)
    is collected first, followed by the ``explanation`` of each dict item in
    its ``questionSet``.

    Args:
        parsed_json: Parsed jsonData; anything that is not a dict yields [].

    Returns:
        A list of explanation values in traversal order.
    """
    collected = []
    if not isinstance(parsed_json, dict):
        return collected

    for sub_val in parsed_json.values():
        if not isinstance(sub_val, dict):
            continue

        if "explanation" in sub_val:
            collected.append(sub_val["explanation"])

        # questionSet may be absent or null; only a real list holds questions.
        question_set = sub_val.get("questionSet")
        if not isinstance(question_set, list):
            continue
        collected.extend(
            q["explanation"]
            for q in question_set
            if isinstance(q, dict) and "explanation" in q
        )

    return collected
|
|
|
|
def generate_writing_explanation(sub_data):
    """Build a replacement explanation for a writing sub-question.

    The text is assembled line by line from ``textDesc``, ``ability`` and
    ``type``; the scoring-dimension line is always present.

    Args:
        sub_data: Dict of the sub-question. Missing or null ``textDesc`` /
            ``ability`` are treated as empty.

    Returns:
        The explanation as newline-joined lines.
    """
    # `or` also covers keys that exist with a null value, which previously
    # reached the `"看图" in text_desc` test and raised TypeError.
    text_desc = sub_data.get("textDesc") or ""
    ability = sub_data.get("ability") or []
    qtype = sub_data.get("type", "")

    parts = []
    if text_desc:
        parts.append(f"写作任务: {text_desc}")

    if ability:
        if isinstance(ability, list):
            # Items are stringified so a stray number cannot break the join.
            ability_text = "、".join(str(item) for item in ability)
        else:
            ability_text = ability
        parts.append(f"能力目标: {ability_text}")

    parts.append("评分维度: 内容完整性、语言准确性、结构逻辑性、书写规范性")

    # Type-specific guidance; the picture hint is a fallback keyed on the task text.
    if qtype == "writing_emailReply":
        parts.append("写作要点: 注意邮件格式规范(称呼、正文、署名)、逻辑顺序清晰、语言得体")
    elif qtype == "writing_picWrite":
        parts.append("写作要点: 按图片顺序组织叙述、使用时间衔接词、故事完整性")
    elif "看图" in str(text_desc):
        parts.append("写作要点: 准确描述图片内容、使用恰当的衔接词、逻辑连贯")

    return "\n".join(parts)
|
|
|
|
def generate_speaking_explanation(q_data, sub_data):
    """Build a replacement explanation for one speaking question.

    Args:
        q_data: Dict of the question; ``question`` (falling back to
            ``content``), ``imageDesc`` and ``ability`` are read.
        sub_data: Enclosing sub-question; accepted for call compatibility and
            not read.

    Returns:
        The explanation as newline-joined lines. The assessment and guidance
        lines are always present.
    """
    # Template leftovers that must never be copied into a generated explanation.
    placeholder_abilities = {"这是一个能力项", "这是第二个能力项", "xxxx"}

    question = q_data.get("question", q_data.get("content", ""))
    ability = q_data.get("ability", [])
    image_desc = q_data.get("imageDesc", "")

    parts = []
    if question:
        parts.append(f"回答要点: {question}")
    if image_desc:
        parts.append(f"图片内容: {image_desc}")

    if ability:
        items = ability if isinstance(ability, list) else [ability]
        # Filter per item: comparing only the joined string let placeholders
        # through whenever the list had more than one entry.
        real_items = [str(item) for item in items if str(item) not in placeholder_abilities]
        if real_items:
            parts.append(f"考察能力: {'、'.join(real_items)}")

    parts.append("评估标准: 语音语调准确性、语言流利度、内容完整性与相关性、语法准确性")
    # The wording deliberately avoids "根据…内容": the speaking audit flags the
    # pattern "根据.*内容", so the earlier phrasing caused every generated
    # explanation to be flagged again on the next run.
    parts.append("回答指导: 鼓励学生用完整句子作答,结合图片信息组织语言,表达清晰有条理")

    return "\n".join(parts)
|
|
|
|
def generate_speaking_explanation_simple(q_data, idx):
    """Build a short speaking explanation from the question and image text.

    Args:
        q_data: Dict of the question; ``question`` and ``imageDesc`` are read.
        idx: Accepted for call compatibility; not read.

    Returns:
        Newline-joined lines: the question and image description when they are
        non-empty, then the fixed assessment line.
    """
    labelled = (
        ("提问", q_data.get("question", "")),
        ("图片描述", q_data.get("imageDesc", "")),
    )
    # Only fields that actually carry a value get a line of their own.
    lines = [f"{label}: {value}" for label, value in labelled if value]
    lines.append("评估要点: 语音语调、用词准确性、回答完整性")
    return "\n".join(lines)
|
|
|
|
|
|
def audit_record_explanations(parsed_json, table_type):
    """Audit every explanation inside one record's parsed jsonData.

    Both the explanation stored directly on each sub-question and the
    explanations of the items in its ``questionSet`` are checked with
    check_explanation_problems. Problematic ones are replaced in a deep copy;
    the input object is never modified.

    Args:
        parsed_json: Dict parsed from the record's jsonData.
        table_type: "speaking" or "writing"; selects both the term list used
            for checking and the generator used for replacement text.

    Returns:
        ``(has_problems, fixed_json, problem_descs)`` where ``fixed_json`` is
        the corrected deep copy and ``problem_descs`` lists one
        ``"<path>: <reason>"`` string per replaced explanation.
    """
    has_problems = False
    problem_descs = []
    fixed = copy.deepcopy(parsed_json)

    for sub_key, sub_val in parsed_json.items():
        if not isinstance(sub_val, dict):
            continue

        # Explanation stored directly on the sub-question.
        if "explanation" in sub_val:
            problem, desc = check_explanation_problems(
                sub_val["explanation"], table_type, [], 0
            )
            if problem:
                has_problems = True
                problem_descs.append(f"{sub_key}.explanation: {desc}")
                # Use the template that matches the table; the writing rubric
                # was previously written into speaking records as well.
                if table_type == "speaking":
                    fixed[sub_key]["explanation"] = generate_speaking_explanation(
                        sub_val, sub_val
                    )
                else:
                    fixed[sub_key]["explanation"] = generate_writing_explanation(sub_val)

        # Explanations of the individual questions.
        qset = sub_val.get("questionSet")
        if not isinstance(qset, list):
            continue

        prev_exps = []
        for i, q in enumerate(qset):
            if not isinstance(q, dict) or "explanation" not in q:
                continue

            exp = q["explanation"]
            problem, desc = check_explanation_problems(exp, table_type, prev_exps, i)
            if problem:
                has_problems = True
                problem_descs.append(f"{sub_key}.questionSet[{i}].explanation: {desc}")
                # NOTE(review): in writing tables every flagged question of a
                # set receives the same generated text, which the duplicate
                # check will flag again on a later run.
                if table_type == "speaking":
                    replacement = generate_speaking_explanation(q, sub_val)
                else:
                    replacement = generate_writing_explanation(sub_val)
                fixed[sub_key]["questionSet"][i]["explanation"] = replacement

            # Duplicates are judged against the original text. A null or
            # non-string value is stored as "" so the next comparison can
            # still call .strip() on it.
            prev_exps.append(exp if isinstance(exp, str) else "")

    return has_problems, fixed, problem_descs
|
|
|
|
|
|
def audit_table(token, table_id, table_type, audit_field_name):
    """Audit every record of one table and write fixes back.

    Records whose ``jsonData`` is empty, unparsable, or not a non-empty JSON
    object are skipped. For the rest, problematic explanations are replaced
    and the record is updated; clean records only get their audit column set.

    Args:
        token: Bearer token for the API calls.
        table_id: Table to audit.
        table_type: "speaking" or "writing", forwarded to the explanation audit.
        audit_field_name: Column that receives "修复解析" / "未改动", or None
            when the table has no such column.

    Returns:
        Dict with the counters ``total``, ``has_json``, ``problems``,
        ``fixed`` and ``skipped``.
    """
    records = get_all_records(token, table_id)
    result = {"total": len(records), "has_json": 0, "problems": 0, "fixed": 0, "skipped": 0}

    for rec in records:
        rid = rec["record_id"]
        fields = rec.get("fields") or {}
        jd_raw = fields.get("jsonData", "")

        if isinstance(jd_raw, str):
            if not jd_raw.strip():
                result["skipped"] += 1
                continue
            try:
                parsed = json.loads(jd_raw)
            except ValueError:
                # json.JSONDecodeError is a ValueError; anything else (e.g.
                # KeyboardInterrupt) must not be swallowed.
                result["skipped"] += 1
                continue
        else:
            # Already-structured values are used as they are; calling .strip()
            # on them, as before, raised AttributeError.
            parsed = jd_raw

        if not isinstance(parsed, dict) or not parsed:
            result["skipped"] += 1
            continue

        result["has_json"] += 1

        has_problems, fixed_json, descs = audit_record_explanations(parsed, table_type)

        if has_problems:
            result["problems"] += 1
            update_fields = {"jsonData": json.dumps(fixed_json, ensure_ascii=False)}
            if audit_field_name:
                update_fields[audit_field_name] = "修复解析"
            update_record(token, table_id, rid, update_fields)
            # Counted only after the write went through.
            result["fixed"] += 1
            print(f" [FIXED] {rid}: {'; '.join(descs)}")
        elif audit_field_name:
            # No problems: only the audit column is touched, and only when it
            # does not already hold the "unchanged" marker.
            if fields.get(audit_field_name, "") != "未改动":
                update_record(token, table_id, rid, {audit_field_name: "未改动"})

    return result
|
|
|
|
|
|
def main():
    """Audit all configured tables and print per-table and overall counters."""
    token = get_token()
    print(f"Token: {token[:20]}...")

    # (display name, table id, table type, audit-result column or None)
    tables = [
        ("写作-P1-邮件回复", "tblszuk1TeToofBF", "writing", "审校结果"),
        ("写作-P2-看图写作", "tblSAwlMumKoyjws", "writing", None),  # No audit field
        ("写作-P3-看图回答题", "tblFc9TVl2PeM2tg", "writing", "审核结果"),
        ("口语-P2-话题讨论", "tblGoWYBmVI0IrvQ", "speaking", "审核结果"),
        ("口语-P3-看图回答", "tblOHgNkNer2hGEp", "speaking", "审核结果"),
        ("口语-P4-看图识物", "tblsD2dxaRpLmkXD", "speaking", None),  # No audit field
    ]

    rule = "=" * 60
    dashes = "-" * 60
    all_results = {}

    for name, tid, ttype, audit_field in tables:
        print(f"\n{rule}")
        print(f"审计: {name}")
        print(rule)
        stats = audit_table(token, tid, ttype, audit_field)
        all_results[name] = stats
        print(f" 总记录: {stats['total']}, 含jsonData: {stats['has_json']}, "
              f"有问题: {stats['problems']}, 已修复: {stats['fixed']}, 跳过: {stats['skipped']}")

    # Summary
    print(f"\n{rule}")
    print("汇总")
    print(rule)

    # Column-wise sums over all audited tables.
    counters = ("total", "has_json", "problems", "fixed", "skipped")
    sums = {key: sum(stats[key] for stats in all_results.values()) for key in counters}

    print(f"{'表名':<20} {'总数':>6} {'有jsonData':>10} {'有问题':>6} {'已修复':>6} {'跳过':>6}")
    print(dashes)
    for name, stats in all_results.items():
        print(f"{name:<20} {stats['total']:>6} {stats['has_json']:>10} "
              f"{stats['problems']:>6} {stats['fixed']:>6} {stats['skipped']:>6}")
    print(dashes)
    print(f"{'合计':<20} {sums['total']:>6} {sums['has_json']:>10} "
          f"{sums['problems']:>6} {sums['fixed']:>6} {sums['skipped']:>6}")


# Run the audit only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|