ai_member_xiaoyan/scripts/fix_p4_dialogue.py

153 lines
5.3 KiB
Python

#!/usr/bin/env python3
"""
Fix P4 题目1/题目2 fields: re-add dialogue text
The original script's pop("dialogue") corrupted module-level data during dry run.
This script extracts dialogues from the rewrite script source and writes correct 题目1/题目2.
"""
import json, subprocess, os, re
# Feishu Bitable app token; interpolated into the bitable request URLs built below.
APP_TOKEN = "CMHSbUUjka3TrUsaxxEc297ongf"
# ID of the Bitable table whose records this script lists and updates.
TABLE_ID = "tblVmeDtBDKsAEfz"
def get_token():
    """Fetch a Feishu tenant access token with the locally stored app credentials.

    Reads ``appId`` / ``appSecret`` from the first entry under ``apps`` in the
    credentials file and posts them to the tenant_access_token endpoint via curl.

    Returns:
        The ``tenant_access_token`` string taken from the JSON response.

    Raises:
        RuntimeError: if curl returns no JSON object or the response carries
            no ``tenant_access_token``.
    """
    CRED_FILE = "/root/.openclaw/credentials/xiaoyan/config.json"
    with open(CRED_FILE, encoding="utf-8") as f:
        cred = json.load(f)
    app = cred['apps'][0]
    payload = json.dumps({"app_id": app['appId'], "app_secret": app['appSecret']})
    result = subprocess.run([
        "curl", "-s", "-X", "POST",
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        "-H", "Content-Type: application/json",
        # "@-" makes curl read the request body from stdin, so the app secret
        # never appears in the process argument list.
        "-d", "@-",
    ], input=payload, capture_output=True, text=True)
    try:
        response = json.loads(result.stdout)
    except ValueError as err:
        raise RuntimeError(
            f"token request returned no JSON (curl exit code {result.returncode})"
        ) from err
    token = response.get('tenant_access_token') if isinstance(response, dict) else None
    if not token:
        # Report only the API's own status fields; never echo the credentials.
        code = response.get('code') if isinstance(response, dict) else None
        msg = response.get('msg') if isinstance(response, dict) else None
        raise RuntimeError(f"token request failed: code={code} msg={msg}")
    return token
def get_records(token):
    """List every record of the Bitable table, following pagination.

    Args:
        token: tenant access token sent as the ``Authorization: Bearer`` header.

    Returns:
        A list with the ``items`` of every page, in the order returned.
        Empty when the response has no ``data`` / ``items``.
    """
    from urllib.parse import quote  # local import: only needed for page tokens

    base_url = (
        f"https://open.feishu.cn/open-apis/bitable/v1/apps/{APP_TOKEN}"
        f"/tables/{TABLE_ID}/records?page_size=50"
    )
    records = []
    page_token = None
    while True:
        url = base_url
        if page_token:
            url = f"{base_url}&page_token={quote(page_token, safe='')}"
        result = subprocess.run([
            "curl", "-s", "-X", "GET",
            url,
            "-H", f"Authorization: Bearer {token}",
        ], capture_output=True, text=True)
        # "data" and "items" may be absent or null (error response, empty table).
        data = json.loads(result.stdout).get('data') or {}
        records.extend(data.get('items') or [])
        next_token = data.get('page_token')
        # Stop on the last page, and also if the server hands back the same
        # token again, so a misbehaving response cannot loop forever.
        if not data.get('has_more') or not next_token or next_token == page_token:
            break
        page_token = next_token
    return records
def update_record(token, record_id, fields):
    """Overwrite the given fields of one Bitable record via a curl PUT.

    Args:
        token: tenant access token sent as the ``Authorization: Bearer`` header.
        record_id: ID of the record to update; placed in the request URL.
        fields: mapping of field name to new value, sent as ``{"fields": ...}``.

    Returns:
        The parsed JSON response dict. If curl produced no JSON object (for
        example a network failure), a ``{"code": -1, "msg": ...}`` dict is
        returned instead so a batch caller can report the failure and go on.
    """
    # ensure_ascii=False keeps the Chinese text readable in the request body.
    body = json.dumps({"fields": fields}, ensure_ascii=False)
    result = subprocess.run([
        "curl", "-s", "-X", "PUT",
        f"https://open.feishu.cn/open-apis/bitable/v1/apps/{APP_TOKEN}/tables/{TABLE_ID}/records/{record_id}",
        "-H", f"Authorization: Bearer {token}",
        "-H", "Content-Type: application/json; charset=utf-8",
        "-d", body,
    ], capture_output=True, text=True)
    try:
        response = json.loads(result.stdout)
    except ValueError:
        response = None
    if isinstance(response, dict):
        return response
    return {
        "code": -1,
        "msg": f"no JSON object in response (curl exit code {result.returncode})",
    }
def make_text_with_dialogue(questions):
    """Render a question list as the plain-text 题目 field, dialogue included.

    Args:
        questions: iterable of question dicts. Each must provide
            ``questionDesc``, ``question`` and ``options``. ``dialogue`` is
            optional. ``answer`` is read as a sequence whose first element is
            the 0-based index of the correct option; when it is missing or
            empty, no option is marked.

    Returns:
        A single string: questions numbered from 1, each followed by a blank
        line, lines joined with ``"\\n"``. Empty string for an empty list.
    """
    lines = []
    for number, q in enumerate(questions, 1):
        # Tolerate a missing/empty answer instead of raising on answer[0].
        answer = q.get('answer')
        correct = answer[0] if answer else None

        lines.append(f"{number}.")
        lines.append(f"【描述】{q['questionDesc']}")
        # The listening-text header is always emitted, even without a dialogue.
        lines.append("【听力文本】")
        if 'dialogue' in q:
            lines.append(q['dialogue'])
        lines.append("【题目】")
        lines.append(q['question'])
        for idx, opt in enumerate(q['options']):
            marker = "(正确)" if idx == correct else ""
            # chr(65) is "A": options are lettered A, B, C, ...
            lines.append(f"{chr(65 + idx)}. {opt}{marker}")
        lines.append("")
    return "\n".join(lines)
# Step 1: Recover the dialogue texts from the rewrite script's source code.
# The script is read as plain text and scanned; it is never imported or executed.
# Expected shape in that file:
#   QSID_XXXXXX_FIRST_DIALOGUES = [...] and QSID_XXXXXX_SECOND_DIALOGUES = [...]
# where every dialogue inside the list is a triple-double-quoted string.
_DIALOGUE_HEADER_RE = re.compile(r"QSID_(\d{6})_(FIRST|SECOND)_DIALOGUES\s*=\s*\[")
_TRIPLE_QUOTE = '"""'


def _extract_dialogues(source):
    """Map "<qsid>_first" / "<qsid>_second" to the list of dialogue strings.

    Each list is scanned string by string, so a "]" inside a dialogue does not
    end the list early; only a "]" outside the triple-quoted strings does.
    The text between the quotes is taken verbatim from the source (escape
    sequences are not interpreted) and stripped of surrounding whitespace.
    A list with no closing "]" is skipped.
    """
    found = {}
    for header in _DIALOGUE_HEADER_RE.finditer(source):
        qsid = header.group(1)
        block_type = header.group(2).lower()
        dialogues = []
        pos = header.end()
        while True:
            close_at = source.find("]", pos)
            if close_at == -1:
                dialogues = None  # unterminated list: nothing reliable to take
                break
            quote_at = source.find(_TRIPLE_QUOTE, pos)
            if quote_at == -1 or close_at < quote_at:
                break  # reached the bracket that closes the list
            end_quote = source.find(_TRIPLE_QUOTE, quote_at + len(_TRIPLE_QUOTE))
            if end_quote == -1:
                break  # unterminated string: keep what was collected so far
            dialogues.append(source[quote_at + len(_TRIPLE_QUOTE):end_quote].strip())
            pos = end_quote + len(_TRIPLE_QUOTE)
        if dialogues is None:
            continue
        found[f"{qsid}_{block_type}"] = dialogues
        print(f"Extracted {len(dialogues)} dialogues for {qsid} {block_type}")
    return found


script_path = os.path.dirname(os.path.abspath(__file__))
rewrite_script = os.path.join(script_path, 'rewrite_p4_all.py')
# Read explicitly as UTF-8 so the result does not depend on the locale.
with open(rewrite_script, encoding="utf-8") as f:
    script_content = f.read()
qsid_dialogues = _extract_dialogues(script_content)
# Step 2: attach the recovered dialogues to each record's questions, regenerate
# the 题目1 / 题目2 text and write both fields back to the table.
token = get_token()
records = get_records(token)
for rec in records:
    fields = rec.get('fields', {})
    qsid = fields.get('题目集合 ID')
    if not qsid:
        continue
    jd = fields.get('jsonData')
    if not jd:
        continue
    try:
        parsed = json.loads(jd)
    except (TypeError, ValueError):
        # TypeError: jsonData is not a str; ValueError: it is not valid JSON.
        parsed = None
    if not isinstance(parsed, dict):
        print(f"⚠️ {qsid}: jsonData is not a JSON object, skipped")
        continue
    # "first"/"second" or their "questionSet" may be absent or null.
    first_qs = (parsed.get('first') or {}).get('questionSet') or []
    second_qs = (parsed.get('second') or {}).get('questionSet') or []
    # Assign dialogues by position; zip stops at the shorter of the two lists.
    first_dials = qsid_dialogues.get(f"{qsid}_first", [])
    second_dials = qsid_dialogues.get(f"{qsid}_second", [])
    for question, dialogue in zip(first_qs, first_dials):
        question['dialogue'] = dialogue
    for question, dialogue in zip(second_qs, second_dials):
        question['dialogue'] = dialogue
    # Generate text
    q1_text = make_text_with_dialogue(first_qs) if first_qs else ""
    q2_text = make_text_with_dialogue(second_qs) if second_qs else ""
    # Only write when 题目1 really carries a dialogue. Checking the questions
    # themselves avoids depending on particular speaker names in the text.
    has_dialogue = any(question.get('dialogue') for question in first_qs)
    if not has_dialogue:
        print(f"⚠️ {qsid}: 题目1 still missing dialogue!")
        print(f" first_dials count: {len(first_dials)}, first_qs count: {len(first_qs)}")
        continue
    # Write
    result = update_record(token, rec['record_id'], {
        "题目1": q1_text,
        "题目2": q2_text,
    })
    code = result.get('code', -1)
    if code == 0:
        # Report UTF-8 byte sizes so the "bytes" label is accurate.
        q1_size = len(q1_text.encode('utf-8'))
        q2_size = len(q2_text.encode('utf-8'))
        print(f"{qsid}: 题目1={q1_size}bytes 题目2={q2_size}bytes")
    else:
        print(f"{qsid}: {result.get('msg', 'unknown')}")
print("\nDone!")