#!/usr/bin/env python3
"""Batch Whisper transcription script.

- Parallelism: 4 workers (the original author notes this is safe for the
  medium model with 31 GB of RAM).
- Output: one .txt transcript per video in OUTPUT_DIR, plus a
  _progress.json status file in the same directory.

NOTE(review): the original header also listed batch_summary.json as an
output, but nothing in this script writes that file.
"""
import subprocess
import json
import os
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

VIDEO_DIR = "/root/.openclaw/workspace-xiaoban/tmp/Lina先生_视频/Lina先生_作品下载"
OUTPUT_DIR = "/root/.openclaw/workspace-xiaoban/tmp/whisper_output"
MODEL = "medium"
LANGUAGE = "Chinese"
WORKERS = 4  # conservative degree of parallelism

# Created at import time so later writes into OUTPUT_DIR can assume it exists.
os.makedirs(OUTPUT_DIR, exist_ok=True)


def _transcript_path(video_path):
    """Return the .txt path in OUTPUT_DIR that corresponds to *video_path*."""
    return os.path.join(OUTPUT_DIR, Path(video_path).stem + ".txt")


def _has_transcript(txt_path):
    """Return True if *txt_path* exists and is non-empty.

    Single source of truth for "already transcribed", so that get_videos()
    and transcribe() cannot disagree about which files count as done.
    """
    try:
        return os.path.getsize(txt_path) > 0
    except OSError:
        # Missing or unreadable file: treat as not transcribed yet.
        return False


def _read_transcript(txt_path):
    """Read a transcript as UTF-8 instead of the locale's default encoding.

    The transcripts are Chinese text; decoding them with a non-UTF-8 locale
    encoding would fail or garble the content.
    """
    with open(txt_path, "r", encoding="utf-8") as fh:
        return fh.read()


def _probe_duration(video_path):
    """Return the duration in seconds reported by ffprobe, or 0 if unknown."""
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "csv=p=0", str(video_path)],
            capture_output=True, text=True, timeout=10
        )
        raw = probe.stdout.strip()
        return float(raw) if raw else 0
    except (subprocess.SubprocessError, OSError, ValueError):
        # Timeout, ffprobe missing/not executable, or unparsable output.
        # Deliberately narrower than a bare ``except`` so that Ctrl-C and
        # SystemExit still interrupt the scan.
        return 0


def get_videos():
    """Collect the videos that still need to be transcribed.

    Scans VIDEO_DIR for ``*.mp4`` files in sorted order and skips every file
    that already has a non-empty transcript in OUTPUT_DIR.

    Returns:
        A list of dicts with keys ``path`` (str), ``name`` (file name) and
        ``duration`` (seconds from ffprobe, 0 when it cannot be determined).
    """
    videos = []
    for f in sorted(Path(VIDEO_DIR).glob("*.mp4")):
        # Skip videos that were already processed.
        if _has_transcript(_transcript_path(f)):
            continue
        videos.append({
            "path": str(f),
            "name": f.name,
            "duration": _probe_duration(f),
        })
    return videos


def transcribe(video_info):
    """Transcribe a single video with the Whisper command-line tool.

    Args:
        video_info: A dict as produced by get_videos() with the keys
            ``path``, ``name`` and ``duration``.

    Returns:
        A dict with keys ``name``, ``duration``, ``text``, ``elapsed``
        (seconds spent in this call) and ``cached`` (True when an existing
        transcript was reused). On failure ``text`` holds a bracketed
        marker: "[转录失败]", "[超时]" or "[错误: ...]".
    """
    f = video_info["path"]
    name = video_info["name"]
    duration = video_info["duration"]

    out_txt = _transcript_path(f)

    # Reuse an existing non-empty transcript instead of running Whisper again.
    if _has_transcript(out_txt):
        return {"name": name, "duration": duration,
                "text": _read_transcript(out_txt),
                "elapsed": 0, "cached": True}

    start = time.time()
    try:
        subprocess.run(
            ["/usr/local/bin/whisper", f,
             "--model", MODEL,
             "--language", LANGUAGE,
             "--output_dir", OUTPUT_DIR,
             "--output_format", "txt"],
            capture_output=True, text=True, timeout=1800  # 30 min max per video
        )
        elapsed = time.time() - start

        # Success is judged by the presence of the output file, not by the
        # process exit status.
        if os.path.exists(out_txt):
            content = _read_transcript(out_txt)
        else:
            content = "[转录失败]"

        return {"name": name, "duration": duration, "text": content,
                "elapsed": elapsed, "cached": False}
    except subprocess.TimeoutExpired:
        elapsed = time.time() - start
        return {"name": name, "duration": duration, "text": "[超时]",
                "elapsed": elapsed, "cached": False}
    except Exception as e:
        # Worker boundary: report the failure as a result instead of letting
        # one bad video raise out of the process pool.
        elapsed = time.time() - start
        return {"name": name, "duration": duration, "text": f"[错误: {e}]",
                "elapsed": elapsed, "cached": False}


def main():
    """Transcribe all pending videos in parallel and report progress.

    Prints one line per finished video, rewrites ``_progress.json`` in
    OUTPUT_DIR after every 20th completion, and writes a final
    ``_progress.json`` with status "complete" at the end.
    """
    videos = get_videos()
    total = len(videos)
    total_duration = sum(v["duration"] for v in videos)
    progress_path = os.path.join(OUTPUT_DIR, "_progress.json")

    print(f"待处理: {total} 个视频, 总时长: {total_duration/3600:.1f} 小时")
    print(f"模型: {MODEL}, 并行: {WORKERS} workers")
    print(f"预计耗时: {total_duration * 2.6 / WORKERS / 3600:.1f} 小时")
    print("=" * 60)

    done = 0
    total_elapsed = 0
    start_time = time.time()

    with ProcessPoolExecutor(max_workers=WORKERS) as executor:
        futures = {executor.submit(transcribe, v): v for v in videos}

        for future in as_completed(futures):
            v = futures[future]
            done += 1
            # The whole per-video report is best-effort: a failure while
            # fetching the result or writing the progress file is printed
            # and the loop moves on to the next video.
            try:
                result = future.result()
                cached = result.get("cached", False)
                elapsed = result.get("elapsed", 0)
                total_elapsed += elapsed

                pct = done / total * 100
                elapsed_all = time.time() - start_time
                # done >= 1 here, so the average time per video is defined.
                eta = elapsed_all / done * (total - done)

                tag = "♻️缓存" if cached else f"⏱{elapsed:.0f}s"
                print(f"[{done}/{total} {pct:.1f}%] {tag} | ETA: {eta/60:.0f}min | {result['name'][:50]}...")

                # Refresh the progress file after every 20th video.
                if done % 20 == 0:
                    progress = {
                        "done": done,
                        "total": total,
                        "elapsed_total": elapsed_all,
                        "eta_seconds": eta,
                        "pct": pct
                    }
                    with open(progress_path, "w", encoding="utf-8") as fp:
                        json.dump(progress, fp, indent=2)

            except Exception as e:
                print(f"[{done}/{total}] ❌ 错误: {v['name'][:50]}... => {e}")

    total_time = time.time() - start_time
    print(f"\n✅ 全部完成! 总耗时: {total_time/3600:.1f} 小时")

    # Write the final progress record.
    with open(progress_path, "w", encoding="utf-8") as fp:
        json.dump({"done": total, "total": total, "status": "complete"}, fp, indent=2)


if __name__ == "__main__":
    main()