ai_member_xiaoyan/scripts/write_audit_results_v3.py

143 lines
5.0 KiB
Python

#!/usr/bin/env python3
"""写审校结果到单元挑战听力表"""
import json
import os
import sys
import time

import requests
# Feishu (Lark) credentials and Bitable identifiers.
#
# Each credential can be overridden through an environment variable; the
# literal fallbacks keep the script's previous default behaviour.
# NOTE(review): the fallback literals are live-looking credentials committed to
# source control. Rotate them and drop the fallbacks once the environment
# variables are provisioned wherever this script runs.
APP_TOKEN = os.environ.get("FEISHU_APP_TOKEN", "CMHSbUUjka3TrUsaxxEc297ongf")
APP_ID = os.environ.get("FEISHU_APP_ID", "cli_a931175d41799cc7")
APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "Iw2vEfbjT6GtV0GhbxbZqfQ4nAPtbR14")

# Root of the Bitable v1 REST API; table/record paths are appended to it.
BASE = "https://open.feishu.cn/open-apis/bitable/v1"

# (display name, table id) pairs processed in order by main().
TABLES = [
    ("听力-P1-图片选择题", "tbliZAhcc9C43B23"),
    ("听力-P2-表格填空题", "tblzTLNH7f13uWQN"),
    ("听力-P4-短对话选择题", "tblVmeDtBDKsAEfz"),
    ("听力-P5-信息匹配题", "tblDssVmhGzc3UKd"),
    ("听力-P6-听力选图", "tbloiMcD0sBtGSTq"),
    ("听力-P7-听力拖拽", "tbly9SvPEa44k3yX"),
]
def get_token():
    """Request a tenant access token for the configured internal app.

    Posts ``APP_ID`` / ``APP_SECRET`` to the Feishu auth endpoint and returns
    the ``tenant_access_token`` value from the JSON response.

    Returns:
        The tenant access token string.

    Raises:
        RuntimeError: If the response carries no ``tenant_access_token``; the
            message includes the ``code`` and ``msg`` reported by the API.
        requests.RequestException: On network failure or timeout.
    """
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": APP_ID, "app_secret": APP_SECRET},
        timeout=10,
    )
    payload = resp.json()
    token = payload.get("tenant_access_token")
    if not token:
        # Surface the API's own diagnosis instead of a bare KeyError.
        raise RuntimeError(
            "failed to obtain tenant_access_token: "
            f"code={payload.get('code')} msg={payload.get('msg')}"
        )
    return token
def fetch_all(token, table_id):
    """Fetch every record of one Bitable table, following pagination.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        table_id: Id of the table under ``APP_TOKEN`` to read.

    Returns:
        A list of the record dicts found under ``data.items`` across all
        pages. If the API reports a non-zero ``code``, the error message is
        printed to stderr and the records collected so far are returned.
    """
    url = f"{BASE}/apps/{APP_TOKEN}/tables/{table_id}/records"
    headers = {"Authorization": f"Bearer {token}"}
    # Passing the query via ``params`` lets requests URL-encode the page token.
    params = {"page_size": 200}
    records = []
    while True:
        resp = requests.get(url, headers=headers, params=params, timeout=30)
        payload = resp.json()
        if payload.get("code") != 0:
            print(f" fetch error: {payload.get('msg')}", file=sys.stderr)
            break
        data = payload.get("data") or {}
        # ``items`` may be null/absent (e.g. an empty table); treat as no rows.
        records.extend(data.get("items") or [])
        next_token = data.get("page_token")
        # Without a token the next request would repeat page one forever.
        if not data.get("has_more") or not next_token:
            break
        params["page_token"] = next_token
    return records
def _as_dict(value):
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _question_count(block):
    """Return the length of ``block["questionSet"]``, or 0 when it is not a list."""
    questions = block.get("questionSet")
    return len(questions) if isinstance(questions, list) else 0


def _audit_question_set(name, block):
    """Return the issue lines for every question in ``block["questionSet"]``."""
    questions = block.get("questionSet", [])
    if not isinstance(questions, list):
        return []
    found = []
    for idx, raw_question in enumerate(questions):
        # A non-dict entry is audited as an empty question so it gets reported
        # instead of crashing the run.
        question = _as_dict(raw_question)
        explanation = question.get("explanation", "")
        if not isinstance(explanation, str) or not explanation.strip():
            found.append(f"{name}[{idx}]: explanation空")
        elif len(explanation) < 20:
            found.append(f" 🟡 {name}[{idx}]: explanation短({len(explanation)}字)")
        abilities = question.get("ability", [])
        if not abilities:
            found.append(f"{name}[{idx}]: ability空")
        elif any(isinstance(a, str) and "¥¥" in a for a in abilities):
            # One report per question, however many entries contain "¥¥".
            found.append(f"{name}[{idx}]: ability¥¥分隔")
    return found


def audit(rec):
    """Audit one Bitable record and build the text for its result field.

    The record's ``jsonData`` field is parsed as JSON and its ``first`` and
    ``second`` blocks are checked against the record's other fields.

    Checks performed:
      * ``questionSetID`` of each block matches the "题目集合 ID" field.
      * ``first.questionSetID`` is not the value "000001".
      * "题目集合 ID" consists of digits (hyphens allowed).
      * Every question has a non-blank ``explanation`` (warned when shorter
        than 20 characters) and a non-empty ``ability`` without "¥¥".
      * A question-1 config field is filled, and a question-2 config field is
        filled when ``second`` has questions.

    Args:
        rec: Record dict with a ``fields`` mapping.

    Returns:
        ``None`` when the record has no ``jsonData``; otherwise a tuple
        ``(text, has_error)`` where ``text`` is the report and ``has_error``
        is ``True`` if any issue (or a parse failure) was found.
    """
    fields = rec.get("fields", {})
    raw_json = fields.get("jsonData")
    if not raw_json:
        return None
    try:
        payload = json.loads(raw_json)
    except (TypeError, ValueError):
        # TypeError: jsonData is not str/bytes; ValueError: malformed JSON.
        return ("❌ jsonData解析失败", True)
    if not isinstance(payload, dict):
        # Valid JSON but not an object, so there is nothing to audit.
        return ("❌ jsonData解析失败", True)

    # ``"first": null`` / ``"second": null`` are treated as empty blocks.
    first = _as_dict(payload.get("first"))
    second = _as_dict(payload.get("second"))
    qsid = fields.get("题目集合 ID", "")
    qtype = first.get("type", "?")
    first_qsid = first.get("questionSetID", "")
    second_qsid = second.get("questionSetID", "")

    issues = []
    if qsid and first_qsid and first_qsid != qsid:
        issues.append(f" ❌ first QSID({first_qsid})≠字段({qsid})")
    if qsid and second_qsid and second_qsid != qsid:
        issues.append(f" ❌ second QSID({second_qsid})≠字段({qsid})")
    if first_qsid == "000001":
        issues.append(" ❌ QSID=000001")
    # A non-string id cannot be digit-checked, so it is reported as abnormal.
    if qsid and not (isinstance(qsid, str) and qsid.replace("-", "").isdigit()):
        issues.append(f" ❌ QSID异常:'{qsid}'")

    for name, block in (("first", first), ("second", second)):
        issues.extend(_audit_question_set(name, block))

    config_1 = fields.get("题目1 完整配置") or fields.get("题目1") or fields.get("题目完整配置")
    config_2 = fields.get("题目2 完整配置") or fields.get("题目2")
    if not config_1:
        issues.append(" ❌ 题目1字段空")
    if second.get("questionSet") and not config_2:
        issues.append(" 🟡 题目2字段空")

    counts = f"题型:{qtype} | first={_question_count(first)}题 second={_question_count(second)}"
    if not issues:
        return (f"✅ 审校通过\n{counts}", False)
    header = f"❌ 审校发现问题({len(issues)}项)\n{counts}\n"
    return (header + "\n".join(issues), True)
def _write_audit_result(token, table_id, record_id, text):
    """Write *text* into the record's "审校结果" field; return ``(ok, message)``."""
    try:
        resp = requests.put(
            f"{BASE}/apps/{APP_TOKEN}/tables/{table_id}/records/{record_id}",
            headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
            json={"fields": {"审校结果": text}},
            timeout=15,
        )
        payload = resp.json()
    except (requests.RequestException, ValueError) as exc:
        # Network error/timeout, or a body that is not JSON: report it for
        # this record only so the rest of the batch can continue.
        return (False, f"{type(exc).__name__}: {exc}")
    if payload.get("code") == 0:
        return (True, None)
    return (False, payload.get("msg"))


def main():
    """Audit every eligible record in ``TABLES`` and write the result back.

    A record is skipped when its ``dataStatus`` field is not the string "0",
    when it has no ``jsonData``, or when ``audit`` returns ``None``. For each
    remaining record the audit text is written to the "审校结果" field. A
    failed write is logged and counted without stopping the run. A summary of
    the counters is printed at the end.
    """
    token = get_token()
    print("✅ Token OK")
    err = ok = skip = fail = 0
    for tname, tid in TABLES:
        print(f"\n📋 {tname}", flush=True)
        recs = fetch_all(token, tid)
        print(f" 记录:{len(recs)}", flush=True)
        for rec in recs:
            rid = rec["record_id"]
            fld = rec.get("fields", {})
            if fld.get("dataStatus") != "0" or not fld.get("jsonData"):
                skip += 1
                continue
            result = audit(rec)
            if result is None:
                skip += 1
                continue
            txt, has_err = result
            written, msg = _write_audit_result(token, tid, rid, txt)
            if written:
                tag = "🔴" if has_err else ""
                print(f" {tag} {rid}", flush=True)
                if has_err:
                    err += 1
                else:
                    ok += 1
            else:
                fail += 1
                print(f"{rid} write fail: {msg}", flush=True)
            time.sleep(0.08)  # rate limit
    print(f"\n✅={ok} 🔴={err} ⏭️={skip} ❌={fail}")


if __name__ == "__main__":
    main()