Files
lesson-highlights/src/core/pipeline.py
T

848 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Pipeline - 核心业务逻辑
统一管理从视频提取到最终输出的完整流程
UI和CLI共用同一套逻辑
"""
import os
import json
import logging
from typing import Callable, Optional, List, Dict, Any
from .video import extract_clip, merge_clips, burn_dual_subtitles
from .subtitle import SubtitlePipeline, correct_subtitles_llm
from .llm import LLMClient
from .corrections import apply_all_corrections, load_term_corrections_from_config
from .utils import ensure_dir
logger = logging.getLogger(__name__)
class Pipeline:
    """
    Highlight-video generation pipeline.

    Usage:
        # CLI mode
        pipeline = Pipeline(config)
        pipeline.run()

        # UI mode (with callbacks)
        pipeline = Pipeline(config, progress_callback=my_callback)
        pipeline.run()
    """
def __init__(
    self,
    config: dict,
    progress_callback: Optional[Callable[[str, int, str], None]] = None,
    step_callback: Optional[Callable[[str], None]] = None,
):
    """
    Initialise the pipeline from a configuration dictionary.

    Args:
        config: Configuration dictionary, containing:
            - video_src: path of the source video
            - clips: [{title, start, end}, ...]
            - output_dir: output directory (defaults to './output')
            - api_key: LLM API key
            - api_host: LLM API host
            - whisper_model_path: path of the Whisper model
            - term_corrections: terminology-correction dictionary
            - video_params: video parameters
        progress_callback: Called as (step, percent, message). When omitted,
            progress is written to the module logger instead.
        step_callback: Called as (step_name) when a step starts and again
            when it finishes. Defaults to a no-op.
    """
    self.config = config
    self.progress_callback = progress_callback or (
        lambda s, p, m: logger.info(f"[{s}] {p}%: {m}")
    )
    self.step_callback = step_callback or (lambda s: None)

    # A key that is present but left empty in YAML loads as None, which
    # bypasses dict.get() defaults. Fall back explicitly so that later code
    # can rely on a str / list / dict here.
    self.output_dir = config.get('output_dir') or './output'
    self.inter_dir = ensure_dir(os.path.join(self.output_dir, 'intermediates'))
    self.subs_dir = ensure_dir(os.path.join(self.output_dir, 'subs'))

    # Keep the caller's own list/dict objects when they were supplied (even
    # if empty) so in-place edits stay visible through ``config``.
    clips = config.get('clips')
    self.clips = clips if clips is not None else []
    self.video_src = config.get('video_src')
    video_params = config.get('video_params')
    self.video_params = video_params if video_params is not None else {}
    self.fade_duration = self.video_params.get('fade_duration', 1)

    # The LLM client and the subtitle pipeline are created lazily by the
    # properties of the same name.
    self._llm_client = None
    self._subtitle_pipeline = None

    # Terminology corrections applied to generated subtitle text.
    self.term_corrections = load_term_corrections_from_config(config)
@property
def llm_client(self) -> LLMClient:
    """Return the LLM client, building it from the config on first access."""
    client = self._llm_client
    if client is None:
        settings = self.config
        client = LLMClient(
            api_key=settings.get('api_key'),
            api_host=settings.get('api_host'),
        )
        self._llm_client = client
    return client
@property
def subtitle_pipeline(self) -> SubtitlePipeline:
    """Return the subtitle pipeline, creating it on first access."""
    if self._subtitle_pipeline is not None:
        return self._subtitle_pipeline
    created = SubtitlePipeline(self.config, self.output_dir)
    self._subtitle_pipeline = created
    return created
# ==================== 步骤方法 ====================
def step_extract(self) -> List[str]:
    """
    Step 1: cut every configured clip out of the source video.

    Each clip is first extracted without fades. When ``self.fade_duration``
    is positive, a second ffmpeg pass adds fade-in/fade-out and the faded
    file is used if that pass succeeds.

    Returns:
        Paths of the clips that were extracted successfully, in order.

    Raises:
        ValueError: no clips are configured, or the source video is missing.
    """
    self.step_callback('extracting')
    self.progress_callback('extracting', 0, "开始提取片段...")

    if not self.clips:
        raise ValueError("No clips configured")
    source_missing = not self.video_src or not os.path.exists(self.video_src)
    if source_missing:
        raise ValueError(f"Video file not found: {self.video_src}")

    extracted = []
    clip_count = len(self.clips)
    for number, spec in enumerate(self.clips, 1):
        raw_path = os.path.join(self.inter_dir, f"clip{number}.mp4")
        faded_path = os.path.join(self.inter_dir, f"clip{number}_fade.mp4")

        # First pass: plain cut, fades are added separately below.
        extracted_ok = extract_clip(
            self.video_src,
            spec['start'],
            spec['end'],
            raw_path,
            fade_duration=0,
        )
        if not extracted_ok:
            # NOTE(review): a skipped clip makes the returned list shorter
            # than self.clips, so later positional numbering may shift.
            logger.warning(f"Failed to extract clip {number}")
            continue

        chosen_path = raw_path
        if self.fade_duration > 0:
            clip_length = spec['end'] - spec['start']
            fade_out_at = max(0, clip_length - self.fade_duration)
            from .constants import FFMPEG_CMD
            from .utils import run_cmd
            ffmpeg_cmd = ''.join([
                f'"{FFMPEG_CMD}" -y -i "{raw_path}" ',
                f'-vf "fade=t=in:st=0:d={self.fade_duration},fade=t=out:st={fade_out_at}:d={self.fade_duration}" ',
                f'-c:v libx264 -crf 20 -c:a aac -y "{faded_path}"',
            ])
            # Fall back to the un-faded clip when the fade pass fails.
            if run_cmd(ffmpeg_cmd):
                chosen_path = faded_path
        extracted.append(chosen_path)

        done_percent = int((number / clip_count) * 100)
        self.progress_callback('extracting', done_percent, f"提取片段 {number}/{clip_count}")

    self.progress_callback('extracting', 100, f"提取完成,共 {len(extracted)} 个片段")
    self.step_callback('extracting')
    return extracted
def step_transcribe(self, clip_paths: List[str]) -> List[str]:
    """
    Step 2: transcribe every clip with faster-whisper.

    One ``clip{i}.json`` (``{'segments': [{start, end, text}, ...]}``) is
    written to the intermediates directory per clip. Existing JSON files are
    reused unless the clip configuration changed since the previous run.

    Optional config keys (both default to the previous hard-coded values):
        - whisper_compute_type: passed to ``WhisperModel`` (default 'float16')
        - whisper_language: passed to ``transcribe`` (default 'zh')

    Args:
        clip_paths: Paths of the extracted clips.

    Returns:
        One JSON path per input clip, in order. A path is returned even when
        its transcription failed, so callers must check that it exists.
        An empty list is returned when faster-whisper is not installed.
    """
    self.step_callback('transcribing')
    self.progress_callback('transcribing', 0, "开始转录...")

    # Imported lazily so the module can be imported without faster-whisper.
    try:
        from faster_whisper import WhisperModel
    except ImportError:
        logger.warning("faster-whisper not available, skipping transcription")
        self.progress_callback('transcribing', 100, "faster-whisper未安装,跳过转录")
        self.step_callback('transcribing')
        return []

    model_path = self.config.get('whisper_model_path')
    model_name = self.config.get('whisper_model', 'large')
    compute_type = self.config.get('whisper_compute_type', 'float16')
    language = self.config.get('whisper_language', 'zh')

    self.progress_callback('transcribing', 5, "加载Whisper模型...")
    model = WhisperModel(model_path or model_name, compute_type=compute_type)

    self._drop_transcripts_if_clips_changed()

    json_paths = []
    total = len(clip_paths)
    for i, clip_path in enumerate(clip_paths, 1):
        json_path = os.path.join(self.inter_dir, f"clip{i}.json")
        json_paths.append(json_path)

        if os.path.exists(json_path):
            logger.info(f"Clip {i}: JSON exists, skipping")
            self.progress_callback('transcribing', int((i/total)*100), f"跳过片段 {i} (已存在)")
            continue

        self.progress_callback('transcribing', int((i/total)*90), f"转录片段 {i}/{total}")
        try:
            segments, _ = model.transcribe(
                clip_path, language=language, beam_size=5, word_timestamps=True
            )
            segments_data = []
            for seg in segments:
                segments_data.extend(self._split_segment_at_sentence_ends(seg))

            # Write to a temporary file and rename it into place, so that an
            # interrupted write never leaves a truncated JSON that the next
            # run would mistake for a finished transcript.
            tmp_path = os.path.join(self.inter_dir, f".clip{i}.json.tmp")
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump({'segments': segments_data}, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, json_path)
            logger.info(f"Transcribed clip {i}: {json_path}")
        except Exception as e:
            # Best effort: one failed clip must not abort the other clips.
            logger.exception(f"Failed to transcribe clip {i}: {e}")

    # The model is deliberately not deleted here: per the original author's
    # note, tearing down the CUDA context on Windows tends to trigger an
    # Access Violation (0xC0000005), so it is left to process exit.
    self.progress_callback('transcribing', 100, "转录完成")
    self.step_callback('transcribing')
    return json_paths

def _drop_transcripts_if_clips_changed(self) -> None:
    """Delete cached clip*.json files when the clip list differs from the last run."""
    import hashlib

    # The fingerprint covers each clip's start, end and title.
    config_str = str([(c['start'], c['end'], c.get('title', '')) for c in self.clips])
    config_hash = hashlib.md5(config_str.encode()).hexdigest()
    hash_file = os.path.join(self.inter_dir, '.config_hash')

    old_hash = None
    if os.path.exists(hash_file):
        with open(hash_file, 'r', encoding='utf-8') as f:
            old_hash = f.read().strip()
    if old_hash == config_hash:
        return

    for name in os.listdir(self.inter_dir):
        if name.startswith('clip') and name.endswith('.json'):
            os.remove(os.path.join(self.inter_dir, name))
            logger.info(f"清理旧JSON: {name} (配置已改变)")
    with open(hash_file, 'w', encoding='utf-8') as f:
        f.write(config_hash)
    logger.info("配置已更新,清除所有旧JSON,重新转录")

@staticmethod
def _split_segment_at_sentence_ends(seg) -> List[Dict[str, Any]]:
    """Split one Whisper segment into sentence-sized {start, end, text} dicts."""
    # Written as escapes because the fullwidth and ASCII forms are visually
    # confusable: ideographic full stop, fullwidth/ASCII '!' and '?'.
    sentence_end_marks = ('\u3002', '\uff01', '!', '\uff1f', '?')
    # Fullwidth comma, ideographic comma, ASCII comma: these may trail a
    # sentence-final mark inside the same token.
    soft_trailing = '\uff0c\u3001,'

    words = getattr(seg, 'words', None) or []
    if not words:
        # No word-level timestamps: keep the segment as it is.
        return [{'start': seg.start, 'end': seg.end, 'text': seg.text.strip()}]

    pieces = []
    piece_start = words[0].start
    buffered = []
    for word in words:
        buffered.append(word.word)
        # Tokens may carry surrounding whitespace and trailing commas.
        token = word.word.strip().rstrip(soft_trailing)
        if token.endswith(sentence_end_marks):
            text = ''.join(buffered).strip()
            if text:
                pieces.append({'start': piece_start, 'end': word.end, 'text': text})
            piece_start = word.end
            buffered = []

    # Text after the last sentence-final mark.
    remaining = ''.join(buffered).strip()
    if remaining:
        pieces.append({'start': piece_start, 'end': words[-1].end, 'text': remaining})
    return pieces
def _recalculate_title_segments_from_transcript(
self,
clips: List[Dict],
json_paths: List[str]
) -> None:
"""
用 transcript 数据重新计算重叠片段的 title_segments 切分点。
重叠片段的 switch_offset 应该按 transcript 中第二个标题关键词
首次出现的时间来算,而不是按 clip 边界。
"""
for i, clip in enumerate(clips):
ts = clip.get('title_segments')
if not ts or len(ts) < 2:
continue
# 取第二个标题段 [title, offset]
second_title, old_offset = ts[1]
json_path = json_paths[i] if i < len(json_paths) else None
if not json_path or not os.path.exists(json_path):
continue
try:
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
except Exception:
continue
# 在 transcript 中搜索 second_title 的首次出现时间
first_time = None
for seg in data.get('segments', []):
for word_info in seg.get('words', []):
w = word_info.get('word', '')
# 关键词匹配(标题可能含多字符,取子串)
if second_title and second_title in w:
first_time = word_info['start']
break
if first_time is not None:
break
if first_time is not None:
new_offset = first_time
clip['title_segments'][1][1] = new_offset
logger.info(
f" clip{i+1} title_segments: "
f"'{second_title}'{old_offset:.2f}s → {new_offset:.2f}s"
)
else:
logger.warning(
f" clip{i+1} title_segments: "
f"未在 transcript 中找到 '{second_title}',保留原 offset {old_offset:.2f}s"
)
def step_generate_subtitles(self, corrected_clips: List[Dict], json_paths: List[str]) -> tuple:
    """
    Step 4: build the title and content subtitle tracks.

    Clips and transcript paths are paired positionally. Every pair yields a
    clip config; its transcript is used only if the given path exists, or
    failing that, if ``clip{n}.json`` exists in the intermediates directory.

    Args:
        corrected_clips: Clip dictionaries (after any title correction).
        json_paths: Transcript JSON paths, positionally matching the clips.

    Returns:
        (title_path, content_path): paths of the generated subtitle files.

    Raises:
        ValueError: no transcript JSON could be found for any clip.
    """
    self.step_callback('generating_subtitles')
    self.progress_callback('generating_subtitles', 0, "开始生成字幕...")

    clip_configs = []
    valid_json_paths = []
    for position, (clip, candidate) in enumerate(zip(corrected_clips, json_paths)):
        clip_configs.append({
            'index': position,
            'start': clip['start'],
            'end': clip['end'],
            'title': clip.get('title', clip.get('original_title', '')),
            'title_segments': clip.get('title_segments'),  # may be None
        })
        if not (candidate and os.path.exists(candidate)):
            # Fall back to the conventional 1-based file name.
            candidate = os.path.join(self.inter_dir, f"clip{position + 1}.json")
            if not os.path.exists(candidate):
                continue
        valid_json_paths.append(candidate)

    if not valid_json_paths:
        raise ValueError("No valid JSON files for subtitle generation")

    # Text hook handed to the subtitle pipeline: applies term corrections.
    def correct(text):
        return apply_all_corrections(text, self.term_corrections)

    self.progress_callback('generating_subtitles', 50, "生成字幕轨道...")
    generated = self.subtitle_pipeline.generate_from_clips(
        clip_configs,
        valid_json_paths,
        apply_corrections=correct
    )
    _, _, title_path, content_path = generated

    self.progress_callback('generating_subtitles', 100, "字幕生成完成")
    self.step_callback('generating_subtitles')
    return title_path, content_path
def step_correct_subtitles(self, title_path: str, content_path: str) -> str:
    """
    Step 4.5: let the LLM proof-read the content subtitles.

    The title track and the PPT source text (``config['ppt_text']``) are
    passed to ``correct_subtitles_llm`` together with the content track.
    When no PPT text is configured the step is skipped without firing any
    callbacks.

    Args:
        title_path: Path of the title subtitle file.
        content_path: Path of the content subtitle file.

    Returns:
        Path of the corrected content subtitles, or ``content_path``
        unchanged when the step is skipped.
    """
    ppt_text = self.config.get('ppt_text', '')
    if ppt_text:
        step = 'correcting_subtitles'
        self.step_callback(step)
        self.progress_callback(step, 0, "开始纠正字幕...")
        corrected_path = correct_subtitles_llm(
            title_path=title_path,
            content_path=content_path,
            ppt_text=ppt_text,
            llm_client=self.llm_client,
        )
        self.progress_callback(step, 100, "字幕纠正完成")
        self.step_callback(step)
        return corrected_path

    logger.warning("PPT原文为空,跳过字幕纠正步骤")
    return content_path
def step_merge(self, clip_paths: List[str]) -> str:
    """
    Step 5: concatenate the clips into one video.

    Args:
        clip_paths: Paths of the clips to join, in playback order.

    Returns:
        Path of the merged video (``concat_merged.mp4`` in the output dir).

    Raises:
        ValueError: ``clip_paths`` is empty.
        RuntimeError: ``merge_clips`` reported a failure.
    """
    step = 'merging'
    self.step_callback(step)
    self.progress_callback(step, 0, "开始合并视频...")

    if not clip_paths:
        raise ValueError("No clips to merge")

    merged_path = os.path.join(self.output_dir, "concat_merged.mp4")
    if not merge_clips(clip_paths, merged_path, self.inter_dir):
        raise RuntimeError("Failed to merge clips")

    self.progress_callback(step, 100, f"合并完成: {merged_path}")
    self.step_callback(step)
    return merged_path
def step_burn(self, merged_path: str, title_path: str, content_path: str) -> str:
    """
    Step 6: burn the title and content subtitles into the merged video.

    Font sizes and colours are read from ``config['video_params']``
    (``title_fontsize``, ``title_color``, ``subtitle_fontsize``,
    ``subtitle_color``), each with a built-in default.

    Args:
        merged_path: Path of the merged video.
        title_path: Path of the title subtitle file.
        content_path: Path of the content subtitle file.

    Returns:
        Path of the final video (``final.mp4`` in the output directory).

    Raises:
        ValueError: the merged video does not exist.
        RuntimeError: ``burn_dual_subtitles`` reported a failure.
    """
    self.step_callback('burning')
    self.progress_callback('burning', 0, "开始烧录字幕...")

    if not os.path.exists(merged_path):
        raise ValueError(f"Merged video not found: {merged_path}")

    final_path = os.path.join(self.output_dir, "final.mp4")
    # ``video_params:`` left empty in YAML loads as None; treat it as {} so
    # the defaults below still apply instead of failing on ``None.get``.
    video_params = self.config.get('video_params') or {}
    success = burn_dual_subtitles(
        merged_path,
        title_path,
        content_path,
        final_path,
        title_fontsize=video_params.get('title_fontsize', 60),
        title_color=video_params.get('title_color', 'FFFF00'),
        subtitle_fontsize=video_params.get('subtitle_fontsize', 24),
        subtitle_color=video_params.get('subtitle_color', 'FFFFFF')
    )
    if not success:
        raise RuntimeError("Failed to burn subtitles")

    self.progress_callback('burning', 100, f"完成: {final_path}")
    self.step_callback('burning')
    return final_path
# ==================== 辅助方法 ====================
def _save_config(self) -> None:
    """
    Write ``self.clips`` back to ``generated_config.yaml`` in the output dir.

    Existing keys of the file are preserved. ``clips`` is always replaced;
    ``video_params`` is only filled in from ``self.config`` when the file
    does not already define it. If the existing file cannot be read or does
    not contain a mapping, a warning is logged and the file is rebuilt from
    scratch (previously this happened silently).
    """
    import yaml

    config_path = os.path.join(self.output_dir, 'generated_config.yaml')

    saved_config = {}
    if os.path.exists(config_path):
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                loaded = yaml.safe_load(f)
        except (OSError, UnicodeDecodeError, yaml.YAMLError) as exc:
            # Best effort: saving must still succeed, but do not hide the
            # fact that the previous contents are being discarded.
            logger.warning(f"Could not read existing config {config_path}, rewriting it: {exc}")
        else:
            if isinstance(loaded, dict):
                saved_config = loaded
            elif loaded is not None:
                logger.warning(
                    f"Existing config {config_path} is not a mapping, rewriting it"
                )

    saved_config['clips'] = self.clips
    if 'video_params' not in saved_config:
        saved_config['video_params'] = self.config.get('video_params', {})

    with open(config_path, 'w', encoding='utf-8') as f:
        yaml.dump(saved_config, f, allow_unicode=True, default_flow_style=False)
    logger.info(f"配置已保存: {config_path}")
def reextract_clip(self, clip_index: int, new_title: str) -> bool:
    """
    Re-match one clip's time range using a new title.

    The title is looked up in ``corrected_transcript.json`` through
    ``PPTParser._find_title_in_transcript``. On a match the clip's title,
    start and end are updated and overlapping clips are re-merged; without
    a match the clip only gets the new title and ``matched = False``. In
    both cases the clip's cached transcript JSON is removed and the config
    is saved.

    Args:
        clip_index: Index of the clip in ``self.clips`` (0-based).
        new_title: New title text.

    Returns:
        True when the title was matched. False when it was not matched, the
        transcript file is missing, or ``clip_index`` is out of range.
    """
    # Reject bad indices up front (same rule as delete_clip). A negative
    # index would otherwise address the wrong clip{n}.json file.
    if not 0 <= clip_index < len(self.clips):
        logger.warning(f"reextract_clip: clip index out of range: {clip_index}")
        return False

    # 1. Load corrected_transcript.json
    transcript_path = os.path.join(self.inter_dir, 'corrected_transcript.json')
    if not os.path.exists(transcript_path):
        logger.warning(f"corrected_transcript.json not found: {transcript_path}")
        return False
    with open(transcript_path, 'r', encoding='utf-8') as f:
        corrected_segments = json.load(f)

    # 2. Match the new title. __new__ bypasses PPTParser.__init__; only
    #    inter_dir is set on the bare instance.
    from .ppt_parser import PPTParser
    parser = PPTParser.__new__(PPTParser)
    parser.inter_dir = self.inter_dir
    result = parser._find_title_in_transcript(new_title, corrected_segments)

    matched = result is not None
    clip = self.clips[clip_index]
    if matched:
        start, end = result
        clip['title'] = new_title
        clip['start'] = start
        clip['end'] = end
        clip['matched'] = True
    else:
        # Unmatched clips are flagged so they can be excluded from burning.
        clip['matched'] = False
        clip['title'] = new_title

    # 3. Drop the cached transcript so it gets regenerated.
    clip_json = os.path.join(self.inter_dir, f'clip{clip_index + 1}.json')
    if os.path.exists(clip_json):
        os.remove(clip_json)

    # 4. A moved clip may now overlap its neighbours.
    if matched:
        self.clips = parser._merge_overlapping_clips(self.clips)

    # 5. Persist the updated clip list.
    self._save_config()
    return matched
def delete_clip(self, clip_index):
    """
    Remove one clip and its intermediate files.

    Out-of-range indices are ignored. The clip is removed from
    ``self.clips``; ``clip{n}.json``, ``clip{n}.mp4`` and
    ``clip{n}_fade.mp4`` (n = clip_index + 1) are deleted when present;
    then the config is saved.

    Args:
        clip_index: Index of the clip in ``self.clips`` (0-based).
    """
    if not (0 <= clip_index < len(self.clips)):
        return

    del self.clips[clip_index]

    # Intermediate files are numbered from 1.
    file_number = clip_index + 1
    for suffix in ('.json', '.mp4', '_fade.mp4'):
        stale_file = os.path.join(self.inter_dir, f'clip{file_number}{suffix}')
        if os.path.exists(stale_file):
            os.remove(stale_file)

    # Files of the following clips are intentionally not renumbered; per the
    # original author, the GUI reads all clip*.* files in order.
    # NOTE(review): code that derives clip{i+1} from a list index may then
    # no longer line up with the remaining files — confirm against callers.
    self._save_config()
def add_clip_by_title(self, new_title):
    """
    Match a new title in the transcript and add it as a clip.

    The title is looked up in ``corrected_transcript.json``. When the
    matched range overlaps an existing (matched) clip, the clip list is
    passed through ``PPTParser._merge_overlapping_clips``; otherwise the
    new clip is added and the list is re-sorted by start time.

    Args:
        new_title: Title of the new knowledge point.

    Returns:
        tuple: (clip_index, matched). ``clip_index`` is the position in
        ``self.clips`` of the new clip, or of the clip it was merged into.
        ``(None, False)`` is returned when the transcript is missing or the
        title is not found.
    """
    # 1. Load corrected_transcript.json
    transcript_path = os.path.join(self.inter_dir, 'corrected_transcript.json')
    if not os.path.exists(transcript_path):
        logger.warning("corrected_transcript.json not found")
        return None, False
    with open(transcript_path, 'r', encoding='utf-8') as f:
        corrected_segments = json.load(f)

    # 2. Match the title (bare PPTParser instance, __init__ bypassed)
    from .ppt_parser import PPTParser
    parser = PPTParser.__new__(PPTParser)
    parser.inter_dir = self.inter_dir
    result = parser._find_title_in_transcript(new_title, corrected_segments)
    if result is None:
        # No match: nothing is added; the caller shows the unmatched state.
        logger.info(f"标题 '{new_title}' 在转录中未找到匹配")
        return None, False
    start, end = result

    # 3. Build the new clip
    new_clip = {
        'title': new_title,
        'start': start,
        'end': end,
        'matched': True,
    }

    def is_active(clip):
        # Unmatched clips may carry stale times and are ignored.
        return clip.get('matched', True) is not False

    # 4. Merge when the range overlaps an existing clip, otherwise sort
    overlaps_existing = any(
        is_active(clip) and start < clip['end'] and end > clip['start']
        for clip in self.clips
    )
    self.clips.append(new_clip)
    if overlaps_existing:
        self.clips = parser._merge_overlapping_clips(self.clips)
    else:
        self.clips = sorted(self.clips, key=lambda c: c['start'])

    # Locate the clip that now represents the new title. After a merge the
    # exact (title, start) pair may no longer exist, so fall back to the
    # clip covering the matched range, then to any overlapping clip,
    # instead of letting next() raise StopIteration.
    locators = (
        lambda c: c.get('title') == new_title and c.get('start') == start,
        lambda c: is_active(c) and c['start'] <= start and end <= c['end'],
        lambda c: is_active(c) and start < c['end'] and end > c['start'],
    )
    new_clip_index = None
    for accepts in locators:
        new_clip_index = next(
            (i for i, c in enumerate(self.clips) if accepts(c)),
            None,
        )
        if new_clip_index is not None:
            break
    if new_clip_index is None:
        logger.warning(f"add_clip_by_title: could not locate clip for '{new_title}' after merge")

    # 5. Persist the updated clip list
    self._save_config()
    return new_clip_index, True
def reburn_titles(self) -> str:
    """
    Re-burn with a regenerated title track only.

    Subtitles are regenerated from the existing ``clip*.json`` transcripts,
    but a user-edited ``v1_content.srt`` is backed up first and restored
    afterwards (also when generation fails), so manual edits survive.

    Returns:
        Path of the final video.

    Raises:
        ValueError: no clip video or no clip transcript JSON is available.
    """
    import shutil

    merged_path = os.path.join(self.output_dir, 'concat_merged.mp4')
    if not os.path.exists(merged_path):
        merged_path = self._merge_extracted_clips()

    # One path per clip, kept positionally aligned with self.clips:
    # step_generate_subtitles pairs them by position and checks existence
    # itself, so missing files must not be filtered out here.
    json_paths = [
        os.path.join(self.inter_dir, f'clip{number}.json')
        for number in range(1, len(self.clips) + 1)
    ]
    if not any(os.path.exists(p) for p in json_paths):
        raise ValueError("No clip JSON files found")

    # Back up the user's content subtitles, if any.
    content_path = os.path.join(self.subs_dir, 'v1_content.srt')
    content_backup = content_path + '.reburn_titles.bak'
    if os.path.exists(content_path):
        shutil.copy(content_path, content_backup)

    try:
        # Generates both tracks; only the title track is kept.
        title_path, _ = self.step_generate_subtitles(self.clips, json_paths)
    finally:
        # Restore the user's subtitles even if generation raised.
        if os.path.exists(content_backup):
            shutil.move(content_backup, content_path)

    return self.step_burn(merged_path, title_path, content_path)

def reburn_subtitles(self, user_texts=None) -> str:
    """
    Re-burn with a new content subtitle track only.

    The LLM correction step is skipped. The existing ``v1_title.srt`` is
    reused; it is regenerated only when missing and transcripts exist.

    Args:
        user_texts: Optional SRT-formatted subtitle text supplied by the
            user. When None, the content track is regenerated from the
            clip transcript JSON files.

    Returns:
        Path of the final video.

    Raises:
        ValueError: no clip video is available, or ``user_texts`` is None
            and no clip transcript JSON exists.
    """
    merged_path = os.path.join(self.output_dir, 'concat_merged.mp4')
    if not os.path.exists(merged_path):
        merged_path = self._merge_extracted_clips()

    # Positionally aligned with self.clips (see reburn_titles).
    json_paths = [
        os.path.join(self.inter_dir, f'clip{number}.json')
        for number in range(1, len(self.clips) + 1)
    ]
    have_json = any(os.path.exists(p) for p in json_paths)

    # Title track: reuse the existing one, regenerate only if missing.
    title_path = os.path.join(self.subs_dir, 'v1_title.srt')
    if not os.path.exists(title_path) and have_json:
        title_path, _ = self.step_generate_subtitles(self.clips, json_paths)

    if user_texts is not None:
        # The user supplied the subtitle text directly.
        content_path = os.path.join(self.subs_dir, 'v1_content.srt')
        with open(content_path, 'w', encoding='utf-8') as f:
            f.write(user_texts)
    else:
        if not have_json:
            raise ValueError("No clip JSON files found")
        _, content_path = self.step_generate_subtitles(self.clips, json_paths)

    return self.step_burn(merged_path, title_path, content_path)

def _merge_extracted_clips(self) -> str:
    """Merge the extracted clip videos (one per clip number) and return the merged path."""
    import re

    # Group files by clip number so clip10 sorts after clip2 and a clip's
    # raw and faded variants are never both merged.
    name_pattern = re.compile(r'^clip(\d+)(_fade)?\.mp4$')
    variants = {}
    for name in os.listdir(self.inter_dir):
        found = name_pattern.match(name)
        if found:
            kind = 'fade' if found.group(2) else 'raw'
            variants.setdefault(int(found.group(1)), {})[kind] = os.path.join(self.inter_dir, name)

    # Mirror step_extract: the faded file is used when fading is enabled.
    preferred, fallback = ('fade', 'raw') if self.fade_duration > 0 else ('raw', 'fade')
    clip_files = [
        variants[number].get(preferred) or variants[number][fallback]
        for number in sorted(variants)
    ]
    if not clip_files:
        raise ValueError("No clips found")
    return self.step_merge(clip_files)
# ==================== 主流程 ====================
def run(self) -> str:
    """
    Run the whole pipeline from extraction to the final burned video.

    Order: extract -> transcribe -> recompute title switch points ->
    generate subtitles -> LLM correction -> merge -> burn.

    Returns:
        Path of the final video.

    Raises:
        ValueError: configuration error raised by one of the steps.
        RuntimeError: no clip could be extracted, or a step failed.
    """
    logger.info(f"Pipeline starting: {len(self.clips)} clips, output: {self.output_dir}")

    # Step 1: extract
    extracted = self.step_extract()
    if not extracted:
        raise RuntimeError("No clips extracted")

    # Step 2: transcribe
    transcripts = self.step_transcribe(extracted)

    # Step 2.5: re-derive title switch points of merged clips
    self._recalculate_title_segments_from_transcript(self.clips, transcripts)

    # Steps 3-6: subtitles, correction, merge, burn
    title_track, content_track = self.step_generate_subtitles(self.clips, transcripts)
    content_track = self.step_correct_subtitles(title_track, content_track)
    merged_video = self.step_merge(extracted)
    final_video = self.step_burn(merged_video, title_track, content_track)

    logger.info(f"Pipeline completed: {final_video}")
    return final_video
def run_with_user_confirm(self, confirmed_titles: List[Dict[str, Any]]) -> str:
    """
    Run the pipeline using titles the user has confirmed.

    Same flow as ``run``; after transcription the titles in
    ``confirmed_titles`` are copied onto the clips positionally before the
    subtitles are generated.

    Args:
        confirmed_titles: User-confirmed title entries, one per clip. An
            entry without a 'title' key leaves that clip's title as it is.

    Returns:
        Path of the final video.

    Raises:
        RuntimeError: no clip could be extracted.
    """
    logger.info(f"Pipeline starting with user confirmation: {len(self.clips)} clips")

    # Steps 1-2: extract and transcribe
    extracted = self.step_extract()
    if not extracted:
        raise RuntimeError("No clips extracted")
    transcripts = self.step_transcribe(extracted)

    # Step 2.5: re-derive title switch points of merged clips
    self._recalculate_title_segments_from_transcript(self.clips, transcripts)

    # Apply the confirmed titles; surplus entries on either side are ignored.
    for clip, confirmed in zip(self.clips, confirmed_titles):
        clip['title'] = confirmed.get('title', clip.get('title', ''))

    # Steps 3-6: subtitles, correction, merge, burn
    title_track, content_track = self.step_generate_subtitles(self.clips, transcripts)
    content_track = self.step_correct_subtitles(title_track, content_track)
    merged_video = self.step_merge(extracted)
    final_video = self.step_burn(merged_video, title_track, content_track)

    logger.info(f"Pipeline completed: {final_video}")
    return final_video
def create_pipeline_from_yaml(config_path: str, **kwargs) -> Pipeline:
    """
    Create a Pipeline from a YAML configuration file.

    Args:
        config_path: Path of the YAML configuration file.
        **kwargs: Extra settings. All of them are merged into the loaded
            config (overriding values from the file). Only
            ``progress_callback`` and ``step_callback`` are additionally
            forwarded to the ``Pipeline`` constructor, which accepts no
            other keyword arguments.

    Returns:
        The Pipeline instance.

    Raises:
        ValueError: the YAML document is not a mapping.
    """
    import yaml

    with open(config_path, 'r', encoding='utf-8') as f:
        # An empty file loads as None; treat it as an empty config.
        config = yaml.safe_load(f) or {}
    if not isinstance(config, dict):
        raise ValueError(f"Config file must contain a mapping: {config_path}")

    # Merge the extra settings (unchanged behaviour).
    config.update(kwargs)

    # Forwarding every kwarg to Pipeline() raised TypeError for any plain
    # config override such as output_dir=...; pass on the callbacks only.
    callbacks = {
        name: kwargs[name]
        for name in ('progress_callback', 'step_callback')
        if name in kwargs
    }
    return Pipeline(config, **callbacks)