ai_member_xiaoban/scripts/whisper_batch.py
2026-06-01 08:00:01 +08:00

143 lines
5.1 KiB
Python

#!/usr/bin/env python3
"""
批量 Whisper 转录脚本
- 并行度: 4 workers (medium模型, 31GB RAM安全)
- 输出: 每个视频的 .txt 逐字稿 + batch_summary.json
"""
import subprocess
import json
import os
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
# Directory scanned for the source *.mp4 videos.
VIDEO_DIR = "/root/.openclaw/workspace-xiaoban/tmp/Lina先生_视频/Lina先生_作品下载"
# Directory that receives the Whisper .txt transcripts and _progress.json.
OUTPUT_DIR = "/root/.openclaw/workspace-xiaoban/tmp/whisper_output"
# Whisper model name, passed to the CLI via --model.
MODEL = "medium"
# Spoken language, passed to the CLI via --language.
LANGUAGE = "Chinese"
WORKERS = 4 # conservative parallelism
# Create the output directory up front (at import time) so later writes can
# assume it exists.
os.makedirs(OUTPUT_DIR, exist_ok=True)
def _probe_duration(path):
    """Return the duration of *path* in seconds via ffprobe, or 0.0 if unknown."""
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "csv=p=0", str(path)],
            capture_output=True, text=True, timeout=10,
        )
        raw = probe.stdout.strip()
        return float(raw) if raw else 0.0
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: ffprobe missing or not executable.
        # SubprocessError: covers the 10 s timeout.
        # ValueError: non-numeric or undecodable ffprobe output.
        # Duration is only used for estimates, so an unknown value is
        # reported as 0 rather than aborting the scan. Ctrl-C and SystemExit
        # are deliberately NOT swallowed here.
        return 0.0


def get_videos(video_dir=None, output_dir=None):
    """Collect the videos that still need to be transcribed.

    Args:
        video_dir: Directory to scan for ``*.mp4`` files. Defaults to the
            module-level ``VIDEO_DIR``.
        output_dir: Directory holding finished transcripts. Defaults to the
            module-level ``OUTPUT_DIR``.

    Returns:
        A list of dicts, sorted by file path, each with the keys ``path``
        (str), ``name`` (file name) and ``duration`` (seconds, 0 when it
        could not be probed). Videos that already have a non-empty
        ``<stem>.txt`` in *output_dir* are omitted.
    """
    if video_dir is None:
        video_dir = VIDEO_DIR
    if output_dir is None:
        output_dir = OUTPUT_DIR

    videos = []
    for f in sorted(Path(video_dir).glob("*.mp4")):
        # Skip videos that already have a non-empty transcript.
        out_txt = Path(output_dir) / (f.stem + ".txt")
        if out_txt.exists() and out_txt.stat().st_size > 0:
            continue
        videos.append({
            "path": str(f),
            "name": f.name,
            "duration": _probe_duration(f),
        })
    return videos
def transcribe(video_info, output_dir=None):
    """Transcribe a single video with the Whisper CLI.

    Args:
        video_info: Dict with the keys ``path``, ``name`` and ``duration``,
            as produced by ``get_videos``.
        output_dir: Directory where Whisper writes ``<stem>.txt``. Defaults
            to the module-level ``OUTPUT_DIR``.

    Returns:
        A dict with the keys ``name``, ``duration``, ``text``, ``elapsed``
        (wall-clock seconds spent in Whisper, 0 for a cache hit) and
        ``cached``. On failure ``text`` holds a bracketed marker
        ("[转录失败]", "[超时]" or "[错误: ...]") instead of a transcript.
    """
    if output_dir is None:
        output_dir = OUTPUT_DIR

    src = video_info["path"]
    name = video_info["name"]
    duration = video_info["duration"]
    out_txt = os.path.join(output_dir, Path(src).stem + ".txt")

    # Reuse an existing transcript when it looks valid (more than 10 bytes).
    if os.path.exists(out_txt) and os.path.getsize(out_txt) > 10:
        # Read as UTF-8 explicitly: the transcripts are Chinese text and must
        # not depend on the locale of the worker process.
        with open(out_txt, "r", encoding="utf-8") as fh:
            content = fh.read()
        return {"name": name, "duration": duration, "text": content, "elapsed": 0, "cached": True}

    start = time.time()
    try:
        subprocess.run(
            ["/usr/local/bin/whisper", src,
             "--model", MODEL,
             "--language", LANGUAGE,
             "--output_dir", output_dir,
             "--output_format", "txt"],
            capture_output=True, text=True, timeout=1800,  # 30 min max per video
        )
        elapsed = time.time() - start
        # Success is judged by the presence of the output file.
        if os.path.exists(out_txt):
            with open(out_txt, "r", encoding="utf-8") as fh:
                content = fh.read()
        else:
            content = "[转录失败]"
        return {"name": name, "duration": duration, "text": content, "elapsed": elapsed, "cached": False}
    except subprocess.TimeoutExpired:
        elapsed = time.time() - start
        return {"name": name, "duration": duration, "text": "[超时]", "elapsed": elapsed, "cached": False}
    except Exception as e:
        # Worker boundary: report the error as data so one bad video does not
        # abort the whole batch.
        elapsed = time.time() - start
        return {"name": name, "duration": duration, "text": f"[错误: {e}]", "elapsed": elapsed, "cached": False}
def _write_progress(path, payload):
    """Overwrite the JSON progress file at *path* with *payload*."""
    with open(path, "w", encoding="utf-8") as fp:
        json.dump(payload, fp, indent=2)


def main():
    """Transcribe every pending video in parallel and report progress.

    Prints one line per finished video, rewrites ``_progress.json`` in
    ``OUTPUT_DIR`` after every 20 videos, and writes a final progress record
    when the batch ends. Failed videos (worker exception, timeout, or no
    output file) are counted and reported instead of being logged as
    successes.
    """
    videos = get_videos()
    total = len(videos)
    total_duration = sum(v["duration"] for v in videos)
    print(f"待处理: {total} 个视频, 总时长: {total_duration/3600:.1f} 小时")
    print(f"模型: {MODEL}, 并行: {WORKERS} workers")
    # NOTE(review): 2.6 looks like an empirical "processing seconds per second
    # of audio" factor for one worker -- confirm.
    print(f"预计耗时: {total_duration * 2.6 / WORKERS / 3600:.1f} 小时")
    print("=" * 60)

    progress_path = os.path.join(OUTPUT_DIR, "_progress.json")
    # Prefixes of the markers transcribe() returns in place of a transcript
    # when a job fails.
    failure_markers = ("[转录失败]", "[超时]", "[错误: ")

    done = 0
    failed = 0
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=WORKERS) as executor:
        futures = {executor.submit(transcribe, v): v for v in videos}
        for future in as_completed(futures):
            v = futures[future]
            done += 1
            pct = done / total * 100
            elapsed_all = time.time() - start_time
            # Linear extrapolation from the average time per finished video.
            eta = elapsed_all / done * (total - done)
            try:
                result = future.result()
            except Exception as e:
                failed += 1
                print(f"[{done}/{total}] ❌ 错误: {v['name'][:50]}... => {e}")
            else:
                elapsed = result.get("elapsed", 0)
                if str(result.get("text", "")).startswith(failure_markers):
                    failed += 1
                    tag = f"❌失败 {elapsed:.0f}s"
                elif result.get("cached", False):
                    tag = "♻️缓存"
                else:
                    tag = f"{elapsed:.0f}s"
                print(f"[{done}/{total} {pct:.1f}%] {tag} | ETA: {eta/60:.0f}min | {result['name'][:50]}...")

            # Refresh the progress file every 20 videos. This runs outside the
            # try block so a failing job cannot skip the update.
            if done % 20 == 0:
                _write_progress(progress_path, {
                    "done": done, "total": total,
                    "failed": failed,
                    "elapsed_total": elapsed_all,
                    "eta_seconds": eta,
                    "pct": pct,
                })

    total_time = time.time() - start_time
    print(f"\n✅ 全部完成! 总耗时: {total_time/3600:.1f} 小时")
    if failed:
        print(f"⚠️ 失败: {failed} 个视频")
    # Write the final progress record.
    _write_progress(progress_path, {
        "done": total, "total": total, "failed": failed, "status": "complete",
    })


# Run the batch only when executed as a script, not when imported.
if __name__ == "__main__":
    main()