#!/usr/bin/env python3 """ 互动组件配置 SQLite 数据库管理器 负责建表、CRUD操作、查询、导出 """ import sqlite3 import json import os from datetime import datetime DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "db", "components.db") def get_connection(): """获取数据库连接""" os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA foreign_keys=ON") return conn def init_db(): """初始化数据库表结构""" conn = get_connection() cursor = conn.cursor() # 组件主表 cursor.execute(""" CREATE TABLE IF NOT EXISTS components ( component_id INTEGER PRIMARY KEY AUTOINCREMENT, script_id TEXT NOT NULL, -- 剧本标识(如 S01E01、文档token等) component_index INTEGER DEFAULT 0, -- 组件在剧本中的顺序号 component_type TEXT NOT NULL, -- 组件类型标识(dialogue_reading等) component_subtype TEXT DEFAULT 'basic', -- 子类型(basic/with_image/核心互动/导览配置) level TEXT DEFAULT 'L1', -- 级别(L1/L2) unit_id TEXT, -- 单元标识 raw_text TEXT, -- 原始剧本文本 parsed_data TEXT, -- JSON: 解析后的结构化中间数据 knowledge_points TEXT, -- JSON: 匹配到的知识点 ai_derived_fields TEXT, -- JSON: AI衍生字段 final_config_json TEXT, -- JSON: 最终配置 status TEXT DEFAULT 'draft' CHECK(status IN ('draft','parsed','matched','generated','validated','exported')), created_at TEXT DEFAULT (datetime('now','localtime')), updated_at TEXT DEFAULT (datetime('now','localtime')) ) """) # 生成日志表 cursor.execute(""" CREATE TABLE IF NOT EXISTS generation_logs ( log_id INTEGER PRIMARY KEY AUTOINCREMENT, component_id INTEGER NOT NULL, step TEXT NOT NULL, -- parse/match_type/match_knowledge/ai_derive/generate_json/validate input_summary TEXT, -- 输入摘要 output_summary TEXT, -- 输出摘要 model_used TEXT, -- 使用的AI模型(如有) success INTEGER DEFAULT 1, -- 1=成功 0=失败 error_message TEXT, -- 失败时的错误信息 duration_ms INTEGER, -- 耗时(毫秒) created_at TEXT DEFAULT (datetime('now','localtime')), FOREIGN KEY (component_id) REFERENCES components(component_id) ON DELETE CASCADE ) """) # 索引 cursor.execute("CREATE INDEX IF NOT EXISTS idx_components_script ON components(script_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_components_type ON components(component_type)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_components_status ON components(status)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_logs_component ON generation_logs(component_id)") conn.commit() conn.close() print(f"✅ 数据库初始化完成: {DB_PATH}") # ============ CRUD 操作 ============ def insert_component(script_id, component_type, component_subtype="basic", level="L1", unit_id=None, raw_text=None, component_index=0): """插入一条新的组件记录,返回 component_id""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO components (script_id, component_index, component_type, component_subtype, level, unit_id, raw_text) VALUES (?, ?, ?, ?, ?, ?, ?) """, (script_id, component_index, component_type, component_subtype, level, unit_id, raw_text)) component_id = cursor.lastrowid conn.commit() conn.close() return component_id def update_component_field(component_id, field, value): """更新组件的某个字段(支持 parsed_data, knowledge_points, ai_derived_fields, final_config_json, status)""" allowed_fields = {'parsed_data', 'knowledge_points', 'ai_derived_fields', 'final_config_json', 'status', 'raw_text'} if field not in allowed_fields: raise ValueError(f"不允许更新的字段: {field}") conn = get_connection() cursor = conn.cursor() cursor.execute(f""" UPDATE components SET {field} = ?, updated_at = datetime('now','localtime') WHERE component_id = ? """, (value, component_id)) conn.commit() conn.close() def get_component(component_id): """获取单个组件""" conn = get_connection() row = conn.execute("SELECT * FROM components WHERE component_id = ?", (component_id,)).fetchone() conn.close() return dict(row) if row else None def list_components(script_id=None, component_type=None, status=None, limit=100): """查询组件列表""" conn = get_connection() query = "SELECT * FROM components WHERE 1=1" params = [] if script_id: query += " AND script_id = ?" params.append(script_id) if component_type: query += " AND component_type = ?" params.append(component_type) if status: query += " AND status = ?" params.append(status) query += " ORDER BY script_id, component_index LIMIT ?" params.append(limit) rows = conn.execute(query, params).fetchall() conn.close() return [dict(r) for r in rows] def insert_log(component_id, step, input_summary=None, output_summary=None, model_used=None, success=True, error_message=None, duration_ms=None): """插入一条生成日志""" conn = get_connection() conn.execute(""" INSERT INTO generation_logs (component_id, step, input_summary, output_summary, model_used, success, error_message, duration_ms) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (component_id, step, input_summary, output_summary, model_used, 1 if success else 0, error_message, duration_ms)) conn.commit() conn.close() def get_logs(component_id): """获取某组件的所有日志""" conn = get_connection() rows = conn.execute(""" SELECT * FROM generation_logs WHERE component_id = ? ORDER BY created_at """, (component_id,)).fetchall() conn.close() return [dict(r) for r in rows] def get_stats(): """获取数据库统计信息""" conn = get_connection() stats = {} stats['total'] = conn.execute("SELECT COUNT(*) FROM components").fetchone()[0] for status in ('draft', 'parsed', 'matched', 'generated', 'validated', 'exported'): stats[status] = conn.execute( "SELECT COUNT(*) FROM components WHERE status = ?", (status,) ).fetchone()[0] stats['by_type'] = {} rows = conn.execute( "SELECT component_type, COUNT(*) as cnt FROM components GROUP BY component_type" ).fetchall() for r in rows: stats['by_type'][r['component_type']] = r['cnt'] conn.close() return stats def export_final_json(script_id=None, status='validated'): """导出最终配置JSON""" components = list_components(script_id=script_id, status=status, limit=10000) result = [] for c in components: if c['final_config_json']: entry = { 'component_id': c['component_id'], 'script_id': c['script_id'], 'component_index': c['component_index'], 'component_type': c['component_type'], 'component_subtype': c['component_subtype'], 'config': json.loads(c['final_config_json']) } result.append(entry) return result # ============ CLI入口 ============ if __name__ == "__main__": import sys if len(sys.argv) < 2: print("用法: python3 db_manager.py ") print(" init - 初始化数据库") print(" stats - 查看统计信息") print(" list - 列出所有组件") sys.exit(1) cmd = sys.argv[1] if cmd == "init": init_db() elif cmd == "stats": init_db() # 确保表存在 stats = get_stats() print(json.dumps(stats, indent=2, ensure_ascii=False)) elif cmd == "list": init_db() components = list_components() for c in components: print(f"[{c['component_id']}] {c['script_id']} | {c['component_type']} | {c['status']}") else: print(f"未知命令: {cmd}")