feat: add Pipeline.reburn_titles and reburn_subtitles
This commit is contained in:
@@ -653,6 +653,108 @@ class Pipeline:
|
|||||||
self._save_config()
|
self._save_config()
|
||||||
return new_clip_index, True
|
return new_clip_index, True
|
||||||
|
|
||||||
|
def reburn_titles(self) -> str:
|
||||||
|
"""
|
||||||
|
只重烧标题轨。
|
||||||
|
|
||||||
|
用已有 intermediates/clip*.json 重新生成标题轨并烧录,
|
||||||
|
保留用户修改过的字幕内容。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
final_path: 最终视频路径
|
||||||
|
"""
|
||||||
|
import glob
|
||||||
|
import shutil
|
||||||
|
|
||||||
|
# 确保有合并视频
|
||||||
|
merged_path = os.path.join(self.output_dir, 'concat_merged.mp4')
|
||||||
|
if not os.path.exists(merged_path):
|
||||||
|
clip_files = sorted(glob.glob(os.path.join(self.inter_dir, 'clip*.mp4')))
|
||||||
|
if not clip_files:
|
||||||
|
raise ValueError("No clips found")
|
||||||
|
merged_path = self.step_merge(clip_files)
|
||||||
|
|
||||||
|
# 收集 json paths
|
||||||
|
json_paths = [
|
||||||
|
os.path.join(self.inter_dir, f'clip{i+1}.json')
|
||||||
|
for i in range(len(self.clips))
|
||||||
|
]
|
||||||
|
json_paths = [p for p in json_paths if os.path.exists(p)]
|
||||||
|
if not json_paths:
|
||||||
|
raise ValueError("No clip JSON files found")
|
||||||
|
|
||||||
|
# 备份用户字幕(如果存在)
|
||||||
|
content_path = os.path.join(self.subs_dir, 'v1_content.srt')
|
||||||
|
content_backup = content_path + '.reburn_titles.bak'
|
||||||
|
if os.path.exists(content_path):
|
||||||
|
shutil.copy(content_path, content_backup)
|
||||||
|
|
||||||
|
# 生成字幕(标题+内容)
|
||||||
|
title_path, _ = self.step_generate_subtitles(self.clips, json_paths)
|
||||||
|
|
||||||
|
# 恢复用户字幕
|
||||||
|
if os.path.exists(content_backup):
|
||||||
|
shutil.move(content_backup, content_path)
|
||||||
|
|
||||||
|
# 烧录
|
||||||
|
final_path = self.step_burn(merged_path, title_path, content_path)
|
||||||
|
return final_path
|
||||||
|
|
||||||
|
def reburn_subtitles(self, user_texts=None) -> str:
|
||||||
|
"""
|
||||||
|
只重烧字幕轨。
|
||||||
|
|
||||||
|
跳过 LLM 校正步骤,直接从 json 生成或使用用户提供的文本。
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_texts: 可选,用户直接提供的 SRT 格式字幕文本。
|
||||||
|
如果为 None,从 json 重新生成(跳过 LLM 校正)。
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
final_path: 最终视频路径
|
||||||
|
"""
|
||||||
|
import glob
|
||||||
|
|
||||||
|
# 确保有合并视频
|
||||||
|
merged_path = os.path.join(self.output_dir, 'concat_merged.mp4')
|
||||||
|
if not os.path.exists(merged_path):
|
||||||
|
clip_files = sorted(glob.glob(os.path.join(self.inter_dir, 'clip*.mp4')))
|
||||||
|
if not clip_files:
|
||||||
|
raise ValueError("No clips found")
|
||||||
|
merged_path = self.step_merge(clip_files)
|
||||||
|
|
||||||
|
# 获取标题轨(用已有的,不重新生成标题)
|
||||||
|
title_path = os.path.join(self.subs_dir, 'v1_title.srt')
|
||||||
|
if not os.path.exists(title_path):
|
||||||
|
json_paths = [
|
||||||
|
os.path.join(self.inter_dir, f'clip{i+1}.json')
|
||||||
|
for i in range(len(self.clips))
|
||||||
|
]
|
||||||
|
json_paths = [p for p in json_paths if os.path.exists(p)]
|
||||||
|
if json_paths:
|
||||||
|
title_path, _ = self.step_generate_subtitles(self.clips, json_paths)
|
||||||
|
|
||||||
|
# 处理字幕内容
|
||||||
|
if user_texts is not None:
|
||||||
|
# 用户直接提供字幕文本,写入文件
|
||||||
|
content_path = os.path.join(self.subs_dir, 'v1_content.srt')
|
||||||
|
with open(content_path, 'w', encoding='utf-8') as f:
|
||||||
|
f.write(user_texts)
|
||||||
|
else:
|
||||||
|
# 从 json 重新生成字幕(跳过 LLM 校正)
|
||||||
|
json_paths = [
|
||||||
|
os.path.join(self.inter_dir, f'clip{i+1}.json')
|
||||||
|
for i in range(len(self.clips))
|
||||||
|
]
|
||||||
|
json_paths = [p for p in json_paths if os.path.exists(p)]
|
||||||
|
if not json_paths:
|
||||||
|
raise ValueError("No clip JSON files found")
|
||||||
|
_, content_path = self.step_generate_subtitles(self.clips, json_paths)
|
||||||
|
|
||||||
|
# 烧录
|
||||||
|
final_path = self.step_burn(merged_path, title_path, content_path)
|
||||||
|
return final_path
|
||||||
|
|
||||||
# ==================== 主流程 ====================
|
# ==================== 主流程 ====================
|
||||||
|
|
||||||
def run(self) -> str:
|
def run(self) -> str:
|
||||||
|
|||||||
Reference in New Issue
Block a user