ai_member_xiaoyan/skills/interactive-component-json/scripts/pipeline.py

647 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
端到端流水线
飞书wiki URL → 读取文档 → 解析sheet → 类型匹配 → LLM生成jsonData/kpInfo → 写入SQLite
"""
import os
import sys
import json
import time
import logging
import re
import threading
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
CURRENT_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, CURRENT_PATH)
PROJECT_ROOT = os.path.dirname(CURRENT_PATH)
CONFIG_PATH = os.path.join(PROJECT_ROOT, 'config.json')
from feishu_client import read_wiki_doc_with_sheet
from parse_script import parse_script_from_sheet
from match_component import match_component_type
from generate_json import generate_component
from llm_client import get_client
from db_manager import get_connection, init_db, insert_component, update_component_field
from html_report import generate_html_report
# Module-wide logger. The console handler is installed only when the logger has
# no handlers yet, so importing this module twice does not duplicate output.
_CONSOLE_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s"

logger = logging.getLogger("pipeline")
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(_CONSOLE_LOG_FORMAT))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
def setup_file_logging(title=""):
    """Attach a per-run log file to the ``pipeline`` and ``llm_client`` loggers.

    The file is created under ``<PROJECT_ROOT>/outputs`` and named
    ``<sanitized title>_<YYYYmmdd_HHMMSS>.log``. When *title* is empty the
    prefix ``pipeline`` is used instead.

    File handlers installed by a previous call are detached and closed first,
    so calling this function once per run does not leak file descriptors or
    duplicate later runs into earlier log files.

    Args:
        title: Optional document title used as the file-name prefix. Every
            character outside word characters, CJK ideographs and ``-`` is
            replaced by ``_`` and the result is truncated to 40 characters.

    Returns:
        str: Path of the newly created log file.
    """
    from datetime import datetime

    outputs_dir = os.path.join(PROJECT_ROOT, "outputs")
    os.makedirs(outputs_dir, exist_ok=True)

    # Build the log file name.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_title = re.sub(r'[^\w\u4e00-\u9fff-]', '_', title)[:40] if title else "pipeline"
    log_path = os.path.join(outputs_dir, f"{safe_title}_{timestamp}.log")

    targets = (logger, logging.getLogger("llm_client"))

    # Drop handlers left behind by earlier calls. They are recognised by a
    # private marker attribute so handlers installed by other code are untouched.
    for target in targets:
        for stale in list(target.handlers):
            if getattr(stale, "_pipeline_run_log", False):
                target.removeHandler(stale)
                stale.close()

    # One shared file handler receives records from both loggers.
    file_handler = logging.FileHandler(log_path, encoding="utf-8")
    file_handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(levelname)s - %(name)s - %(filename)s:%(lineno)d - %(message)s"
    ))
    file_handler.setLevel(logging.DEBUG)
    file_handler._pipeline_run_log = True
    for target in targets:
        target.addHandler(file_handler)

    logger.info(f"日志文件: {log_path}")
    return log_path
def _load_proxy_config():
    """Return the ``proxy`` entry of ``config.json``, or ``None``.

    ``None`` is returned when the file is missing or unreadable, is not valid
    UTF-8 JSON, does not hold a JSON object at the top level, or has no
    ``proxy`` key. The lookup is best-effort and never raises for those cases.
    """
    try:
        # Explicit encoding: the locale default may not be UTF-8.
        with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # malformed JSON and undecodable bytes.
        return None
    if not isinstance(config, dict):
        return None
    return config.get('proxy')
def _ensure_proxy_running():
    """Make sure the local proxy service is up and return its push URL.

    The proxy is probed through ``http://127.0.0.1:<port>/health``. If it does
    not answer with HTTP 200, ``proxy_server.py`` (located next to this file)
    is launched as a detached process and the probe is retried once per second,
    up to five times.

    Returns:
        str | None: ``http://<external_ip>:<port>/api/push`` when the proxy is
        reachable; ``None`` when there is no usable proxy configuration or the
        service could not be started.
    """
    import requests as _req

    proxy_cfg = _load_proxy_config()
    if not proxy_cfg:
        logger.warning("未找到 config.json 或 proxy 配置,跳过代理")
        return None

    # A partial or malformed proxy section must not crash the whole pipeline.
    port = proxy_cfg.get('port') if isinstance(proxy_cfg, dict) else None
    if port is None:
        logger.warning("proxy 配置缺少 port,跳过代理")
        return None

    external_ip = proxy_cfg.get('external_ip', '127.0.0.1')
    health_url = f'http://127.0.0.1:{port}/health'
    push_url = f'http://{external_ip}:{port}/api/push'

    def _is_healthy():
        """Best-effort health probe; any error counts as 'not running'."""
        try:
            return _req.get(health_url, timeout=2).status_code == 200
        except Exception:
            return False

    # Health check.
    if _is_healthy():
        logger.info(f"代理服务已运行 (port {port})")
        return push_url

    # Try to start the proxy in its own session so it outlives this process.
    logger.info("代理服务未运行,正在启动...")
    proxy_script = os.path.join(CURRENT_PATH, 'proxy_server.py')
    subprocess.Popen(
        [sys.executable, proxy_script],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,
    )

    # Wait for it to come up.
    for _ in range(5):
        time.sleep(1)
        if _is_healthy():
            logger.info(f"代理服务启动成功 (port {port})")
            return push_url

    logger.warning("代理服务启动失败HTML报告将使用直连地址")
    return None
def _wiki_token_from_input(wiki_url_or_token):
    """Return the bare wiki token for a wiki URL, or the input itself for a token."""
    token = wiki_url_or_token
    if "/" in token:
        # Ignore any query string / fragment before taking the last path segment.
        token = token.split("?", 1)[0].split("#", 1)[0]
        token = token.rstrip("/").split("/")[-1]
    return token


def process_script(wiki_url_or_token, db_path=None, dry_run=False):
    """Process one script document end to end.

    Steps: read the Feishu wiki document, parse every embedded sheet and keep
    the one yielding the most components, generate each component in a thread
    pool (at most 4 workers), optionally persist the results, print a summary
    and produce the interactive HTML report.

    Args:
        wiki_url_or_token: Feishu wiki URL or wiki token.
        db_path: SQLite database path. NOTE(review): this value is not
            forwarded to any database helper in this function; confirm whether
            ``init_db`` / ``insert_component`` should receive it.
        dry_run: When true, components are generated but nothing is written
            to the database.

    Returns:
        dict: Processing report with the keys ``wiki_url``, ``title``,
        ``total_components``, ``success``, ``failed``, ``skipped``,
        ``results``, ``errors`` and ``log_path``; ``html_path`` is added when
        the HTML report was generated.
    """
    report = {
        "wiki_url": wiki_url_or_token,
        "title": "",
        "total_components": 0,
        "success": 0,
        "failed": 0,
        "skipped": 0,
        "results": [],
        "errors": [],
    }

    # File logging is set up before the title is known, so the log file uses
    # the default name. Record the path right away so that reports returned
    # from the early-exit paths below still point at the log.
    log_path = setup_file_logging()
    report["log_path"] = log_path

    # Step 1: read the Feishu document.
    logger.info("=== Step 1: 读取飞书文档 ===")
    try:
        doc_data = read_wiki_doc_with_sheet(wiki_url_or_token)
        report["title"] = doc_data["title"]
        logger.info(f"文档: {doc_data['title']}, obj_token={doc_data['obj_token']}")
    except Exception as e:
        report["errors"].append(f"读取文档失败: {e}")
        logger.error(f"读取文档失败: {e}")
        return report

    if not doc_data["sheet_rows"] and not doc_data.get("all_sheets"):
        report["errors"].append("文档中未找到内嵌sheet数据")
        logger.error("文档中未找到内嵌sheet数据")
        return report

    # Step 2: parse the script; try every sheet and keep the richest result.
    logger.info("=== Step 2: 解析剧本 ===")
    llm_client = get_client()

    # MySQL connectivity probe. A failure is only logged; the run continues.
    try:
        from kp_matcher import _get_connection
        _get_connection()
        logger.info("MySQL 连接成功kpId 匹配可用)")
    except Exception as e:
        logger.error(f"MySQL 连接失败: {e} — kpId 匹配将全部为空")

    # Collect the rows of every sheet and try to parse them one by one.
    all_sheets = doc_data.get("all_sheets", [])
    if not all_sheets and doc_data["sheet_rows"]:
        all_sheets = [doc_data["sheet_rows"]]

    best_parsed = None
    best_sheet_rows = None
    best_count = 0
    for idx, sheet_rows in enumerate(all_sheets):
        if not sheet_rows or len(sheet_rows) < 2:
            continue
        try:
            parsed = parse_script_from_sheet(sheet_rows, doc_data["markdown"], llm_client=llm_client)
            count = len(parsed.get("components", []))
            logger.info(f" Sheet[{idx}]: 识别到 {count} 个组件")
            if count > best_count:
                best_count = count
                best_parsed = parsed
                best_sheet_rows = sheet_rows
        except Exception as e:
            logger.warning(f" Sheet[{idx}] 解析失败: {e}")

    if best_parsed is None or best_count == 0:
        report["errors"].append("未识别到任何组件行已尝试所有sheet")
        logger.error("未识别到任何组件行已尝试所有sheet")
        return report

    parsed = best_parsed
    character_map = parsed["character_map"]
    section_char_map = parsed.get("section_char_map", [])
    components = parsed["components"]

    # Point doc_data at the sheet that was actually selected.
    if best_sheet_rows is not None:
        doc_data["sheet_rows"] = best_sheet_rows

    report["total_components"] = len(components)
    logger.info(f"最终识别到 {len(components)} 个组件, 角色映射: {character_map}, section映射: {len(section_char_map)}")

    # Step 3: initialise the database.
    if not dry_run:
        init_db()
        logger.info("数据库已初始化")

    # Step 4: generate the components in parallel.
    logger.info("=== Step 3: 并行组件生成 jsonData/kpInfo (workers=4) ===")

    script_id = doc_data["obj_token"]
    script_title = doc_data["title"]

    # Level and unit are derived from the document title.
    level = _extract_level(script_title)
    unit_id = _extract_unit(script_title)

    # Warm the examples cache once on the main thread before workers start.
    try:
        from generate_json import _load_examples
        _load_examples()
    except Exception as e:
        # Non-fatal: record it and carry on without the warm cache.
        logger.warning(f"examples 预加载失败: {e}")

    # (DB column, result key) pairs persisted for components of category
    # "core"; the report exposes the same result keys plus "intermediate".
    core_db_fields = (
        ("task_info_json", "taskInfo"),
        ("material_info_json", "materialInfo"),
        ("flow_info_json", "flowInfo"),
        ("study_info_json", "studyInfo"),
        ("question_group_json", "questionGroup"),
        ("dialog_setting_json", "dialogSetting"),
        ("dialog_config_json", "dialogConfig"),
        ("image_info_json", "imageInfo"),
        ("option_list_json", "optionList"),
        ("question_list_json", "questionList"),
        ("pre_dialog_json", "preDialog"),
        ("dialog_list_json", "dialogList"),
        ("text_info_json", "textInfo"),
        ("eval_info_json", "evalInfo"),
    )
    core_report_keys = tuple(key for _, key in core_db_fields) + ("intermediate",)

    # Progress counters shared by the workers, guarded by a lock.
    _progress_lock = threading.Lock()
    _progress = {"success": 0, "failed": 0, "skipped": 0}

    def _process_one(i, comp):
        """Process a single component (thread worker) and return its result dict."""
        from llm_client import LLMClient
        from kp_matcher import _close_connection

        # Every worker call uses its own LLM client.
        worker_llm = LLMClient()
        cId = comp["cId"]
        type_name = comp["type_name"]

        try:
            # Type matching.
            type_info = match_component_type(type_name)
            cType = type_info["cType"]
        except ValueError as e:
            with _progress_lock:
                _progress["skipped"] += 1
            logger.warning(f"跳过: {e}")
            return {"index": i, "cId": cId, "type_name": type_name,
                    "status": "skipped", "reason": str(e),
                    "raw_config": comp.get("teaching_config", "")}

        if cType is None:
            with _progress_lock:
                _progress["skipped"] += 1
            logger.warning(f"跳过: {type_name} 尚未实现")
            return {"index": i, "cId": cId, "type_name": type_name,
                    "status": "skipped", "reason": f"类型 '{type_name}' 尚未实现生成器",
                    "raw_config": comp.get("teaching_config", "")}

        try:
            # LLM generation with the worker-local client.
            result = generate_component(comp, character_map, section_char_map, worker_llm, level=level)
            category = result.get("category", "mid")
            entry = {
                "index": i,
                "cId": cId,
                "cType": result["cType"],
                "type_name": type_name,
                "category": category,
                "result": result,
            }

            if dry_run:
                entry["status"] = "generated"
            else:
                # Persist to the database.
                component_id = insert_component(
                    script_id=script_id,
                    cType=result["cType"],
                    type_name=type_name,
                    category=category,
                    has_image=result["has_image"],
                    level=level,
                    unit_id=unit_id,
                    knowledge_points_raw=comp.get("knowledge_text", ""),
                    raw_config=comp["teaching_config"],
                    component_index=i,
                    script_title=script_title,
                    bitable_token=result.get("bitable"),
                    db_table=result.get("db_table"),
                )
                update_component_field(component_id, "final_config_json",
                                       json.dumps(result["jsonData"], ensure_ascii=False))
                if result.get("kpInfo"):
                    update_component_field(component_id, "kp_info_json",
                                           json.dumps(result["kpInfo"], ensure_ascii=False))
                if category == "core":
                    for field, key in core_db_fields:
                        data = result.get(key)
                        if data:
                            update_component_field(component_id, field,
                                                   json.dumps(data, ensure_ascii=False))
                    if result.get("intermediate"):
                        update_component_field(component_id, "intermediate_json",
                                               json.dumps(result["intermediate"], ensure_ascii=False))
                update_component_field(component_id, "status", "generated")
                entry["status"] = "saved"
                entry["component_id"] = component_id

            with _progress_lock:
                _progress["success"] += 1
                done = _progress["success"] + _progress["failed"] + _progress["skipped"]
            logger.info(f"成功: {cType} ({cId})")
            print(f" ✓ [{done}/{len(components)}] {type_name} ({cType})", flush=True)
            return entry

        except Exception as e:
            import traceback as _tb
            with _progress_lock:
                _progress["failed"] += 1
                done = _progress["success"] + _progress["failed"] + _progress["skipped"]
            # Detailed log entry: teaching_config excerpt plus the full traceback.
            logger.error(
                f"组件生成失败!\n"
                f"── 组件信息 ──\n"
                f" index={i}, cId={cId}, type_name={type_name}\n"
                f"── teaching_config ({len(comp.get('teaching_config', ''))} chars) ──\n"
                f"{comp.get('teaching_config', '')[:2000]}\n"
                f"── 错误堆栈 ──\n{_tb.format_exc()}"
            )
            print(f" ✗ [{done}/{len(components)}] {type_name}: {str(e)[:60]}", flush=True)
            return {"index": i, "cId": cId, "type_name": type_name,
                    "status": "failed", "error": str(e),
                    "raw_config": comp.get("teaching_config", "")}
        finally:
            _close_connection()

    # Run the workers.
    results_map = {}
    max_workers = min(4, len(components))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(_process_one, i, comp): (i, comp)
            for i, comp in enumerate(components, 1)
        }
        for future in as_completed(futures):
            i, comp = futures[future]
            try:
                res = future.result()
            except Exception as e:
                # The worker raised outside its own guarded section (e.g. while
                # building its LLM client). Count that component as failed
                # instead of aborting the run and losing the report.
                with _progress_lock:
                    _progress["failed"] += 1
                logger.exception(f"组件处理异常: index={i}, error={e}")
                res = {"index": i, "cId": comp.get("cId", ""),
                       "type_name": comp.get("type_name", ""),
                       "status": "failed", "error": str(e),
                       "raw_config": comp.get("teaching_config", "")}
            results_map[res["index"]] = res

    # Assemble the report in the original component order.
    for i in sorted(results_map):
        res = results_map[i]
        status = res["status"]
        if status in ("generated", "saved"):
            result_data = res.get("result", {})
            category = res.get("category", "mid")
            entry = {
                "cId": res["cId"],
                "cType": res.get("cType", ""),
                "type_name": res["type_name"],
                "category": category,
                "status": status,
                "jsonData": result_data.get("jsonData"),
                "kpInfo": result_data.get("kpInfo"),
            }
            if status == "saved":
                entry["component_id"] = res.get("component_id")
            if category == "core":
                for k in core_report_keys:
                    if result_data.get(k) is not None:
                        entry[k] = result_data[k]
            report["results"].append(entry)
        elif status == "skipped":
            report["results"].append({
                "cId": res["cId"], "type_name": res["type_name"],
                "status": "skipped", "reason": res.get("reason", ""),
                "raw_config": res.get("raw_config", ""),
            })
        elif status == "failed":
            report["results"].append({
                "cId": res["cId"], "type_name": res["type_name"],
                "status": "failed", "error": res.get("error", ""),
                "raw_config": res.get("raw_config", ""),
            })

    report["success"] = _progress["success"]
    report["failed"] = _progress["failed"]
    report["skipped"] = _progress["skipped"]

    # Final progress line.
    print(f"\n[完成] 总计:{len(components)} 成功:{report['success']} 失败:{report['failed']} 跳过:{report['skipped']}", flush=True)

    # Print the textual report.
    _print_report(report)

    # Make sure the proxy service is running.
    proxy_url = _ensure_proxy_running()

    # The interactive HTML report is generated only when at least one
    # component succeeded.
    if report["success"] > 0:
        try:
            from llm_client import DEFAULT_MODEL
            wiki_token = _wiki_token_from_input(wiki_url_or_token)
            html_path = generate_html_report(report, model_name=DEFAULT_MODEL, wiki_token=wiki_token, api_url=proxy_url)
            report["html_path"] = html_path
            logger.info(f"HTML报告已生成: {html_path}")
        except Exception as e:
            logger.error(f"HTML报告生成失败: {e}")

    return report
def _extract_level(title):
"""从标题提取级别(如 L1, L2"""
m = re.search(r'\b(L\d+)\b', title, re.IGNORECASE)
return m.group(1).upper() if m else "L1"
def _extract_unit(title):
"""从标题提取单元(如 U14"""
m = re.search(r'\b(U\d+)\b', title, re.IGNORECASE)
return m.group(1).upper() if m else None
def _print_report(report):
"""打印处理报告"""
print("\n" + "=" * 60)
print(f"处理报告: {report['title']}")
print("=" * 60)
print(f"总组件数: {report['total_components']}")
print(f"成功: {report['success']}")
print(f"失败: {report['failed']}")
print(f"跳过: {report['skipped']}")
if report["errors"]:
print(f"\n全局错误:")
for err in report["errors"]:
print(f" - {err}")
print(f"\n详细结果:")
for r in report["results"]:
status = r["status"]
if status in ("generated", "saved"):
print(f" OK [{r['cId']}] {r['type_name']}{r['cType']}")
elif status == "skipped":
print(f" SKIP [{r['cId']}] {r['type_name']}: {r.get('reason', '')}")
elif status == "failed":
print(f" FAIL [{r['cId']}] {r['type_name']}: {r.get('error', '')[:80]}")
print("=" * 60)
def send_report_via_bot(html_path, receive_id, receive_id_type="user_id", agent_name="xiaoyan"):
    """Send an HTML report file through the Feishu bot identity.

    The bot credentials are read from
    ``/root/.openclaw/credentials/<agent_name>/config.json`` (first entry of
    ``apps``). The function then obtains a tenant access token, uploads the
    file and sends it as a ``file`` message.

    Every failure — missing files, malformed credentials, network errors,
    non-JSON responses or a non-zero API ``code`` — is reported through the
    returned dict rather than raised.

    Args:
        html_path: Local path of the HTML report file.
        receive_id: Receiver ID (a user_id or a chat_id).
        receive_id_type: Kind of *receive_id* (``"user_id"`` or ``"chat_id"``).
        agent_name: Bot agent name used to locate the credentials file.

    Returns:
        dict: ``{"success": bool, "message_id": str or None, "error": str or None}``
    """
    def _failure(message):
        return {"success": False, "message_id": None, "error": message}

    cred_path = f"/root/.openclaw/credentials/{agent_name}/config.json"
    if not os.path.exists(cred_path):
        return _failure(f"凭证文件不存在: {cred_path}")
    if not os.path.exists(html_path):
        return _failure(f"HTML文件不存在: {html_path}")

    # Imported only once there is something to send.
    import requests

    try:
        # Read the credentials.
        with open(cred_path, 'r', encoding='utf-8') as f:
            cred = json.load(f)
        app_id = cred["apps"][0]["appId"]
        app_secret = cred["apps"][0]["appSecret"]

        # Obtain a tenant_access_token.
        token_resp = requests.post(
            "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
            json={"app_id": app_id, "app_secret": app_secret},
            timeout=10,
        )
        token_data = token_resp.json()
        if token_data.get("code") != 0:
            return _failure(f"获取token失败: {token_data}")
        token = token_data["tenant_access_token"]

        # Upload the file.
        file_name = os.path.basename(html_path)
        with open(html_path, 'rb') as f:
            upload_resp = requests.post(
                "https://open.feishu.cn/open-apis/im/v1/files",
                headers={"Authorization": f"Bearer {token}"},
                data={"file_type": "stream", "file_name": file_name},
                files={"file": (file_name, f, "text/html")},
                timeout=30,
            )
        upload_data = upload_resp.json()
        if upload_data.get("code") != 0:
            return _failure(f"文件上传失败: {upload_data}")
        file_key = upload_data["data"]["file_key"]
        logger.info(f"文件上传成功: file_key={file_key}")

        # Send the file message. The ID type goes through ``params`` so that it
        # is URL-encoded properly.
        send_resp = requests.post(
            "https://open.feishu.cn/open-apis/im/v1/messages",
            params={"receive_id_type": receive_id_type},
            headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
            json={
                "receive_id": receive_id,
                "msg_type": "file",
                "content": json.dumps({"file_key": file_key}),
            },
            timeout=10,
        )
        send_data = send_resp.json()
        if send_data.get("code") != 0:
            return _failure(f"消息发送失败: {send_data}")
    except (OSError, ValueError, KeyError, IndexError, TypeError,
            requests.RequestException) as e:
        # Unreadable/malformed credentials, network failures, non-JSON bodies
        # and unexpected response shapes all end up here.
        logger.error(f"发送HTML报告异常: {e}")
        return _failure(f"发送HTML报告异常: {e}")

    message_id = send_data.get("data", {}).get("message_id")
    logger.info(f"HTML报告已发送: message_id={message_id}, receiver={receive_id}")
    return {"success": True, "message_id": message_id, "error": None}
# ============ CLI ============
# Usage: pipeline.py <wiki_url_or_token> [--dry-run] [--limit N]
#                    [--send-to <user_id>] [--send-to-chat <chat_id>]
# Exit status is 1 when usage is wrong, any component failed, the report holds
# global errors (e.g. the document could not be read), or sending failed.
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("用法:")
        print(" python3 pipeline.py <wiki_url_or_token> # 正常运行写入DB + 生成HTML")
        print(" python3 pipeline.py <wiki_url_or_token> --dry-run # 仅生成不写入DB")
        print(" python3 pipeline.py <wiki_url_or_token> --dry-run --limit 3 # 只处理前3个")
        print(" python3 pipeline.py <wiki_url_or_token> --dry-run --send-to <user_id> # 生成后发送")
        print(" python3 pipeline.py <wiki_url_or_token> --dry-run --send-to-chat <chat_id> # 发到群")
        sys.exit(1)

    def _option_value(flag):
        """Return the argument following *flag* on the command line, or None."""
        if flag in sys.argv:
            pos = sys.argv.index(flag)
            if pos + 1 < len(sys.argv):
                return sys.argv[pos + 1]
        return None

    wiki_input = sys.argv[1]
    dry_run = "--dry-run" in sys.argv

    # --limit N: only process the first N components.
    limit = None
    raw_limit = _option_value("--limit")
    if raw_limit is not None:
        try:
            limit = int(raw_limit)
        except ValueError:
            # Report a bad value clearly instead of dying with a traceback.
            print(f"--limit 需要整数参数: {raw_limit}")
            sys.exit(1)

    # --send-to <user_id> / --send-to-chat <chat_id>: where to send the report.
    send_to_user = _option_value("--send-to")
    send_to_chat = _option_value("--send-to-chat")

    if limit:
        # Wrap the parser so that only the first `limit` components are kept.
        # Both the parse_script module attribute and this module's global are
        # rebound, because process_script looks the name up in this module.
        original_parse = parse_script_from_sheet

        def limited_parse(sheet_rows, markdown="", llm_client=None):
            result = original_parse(sheet_rows, markdown, llm_client=llm_client)
            result["components"] = result["components"][:limit]
            return result

        import parse_script
        parse_script.parse_script_from_sheet = limited_parse
        globals()["parse_script_from_sheet"] = limited_parse

    report = process_script(wiki_input, dry_run=dry_run)

    # Show where the log and the HTML report were written.
    if report.get("log_path"):
        print(f"\n详细日志: {report['log_path']}")
    if report.get("html_path"):
        print(f"HTML报告: {report['html_path']}")

    # Send the HTML report file; a user target takes precedence over a chat.
    if report.get("html_path") and (send_to_user or send_to_chat):
        if send_to_user:
            result = send_report_via_bot(report["html_path"], send_to_user, "user_id")
        else:
            result = send_report_via_bot(report["html_path"], send_to_chat, "chat_id")
        if result["success"]:
            print(f"\n✅ HTML报告已发送: {result['message_id']}")
        else:
            print(f"\n❌ 发送失败: {result['error']}")
            sys.exit(1)

    # Exit status: component failures and run-level errors are both non-zero,
    # so callers do not mistake an unreadable document for a successful run.
    if report["failed"] > 0 or report["errors"]:
        sys.exit(1)