refactor: extract config.py, add burn_only, fix title_segments and font size

- Extract all path/API config to config.py (single source of truth)
- Add run.py / burn_only.py / run.bat / burn.bat entry points
- burn_only: skip transcription/subtitle gen, fast reburn existing SRTs
- Fix title_segments: use transcript keyword time for split point
- Fix subtitle: in overlapping (multi-title) clips, each title is shown for at most title_duration seconds (not for the full clip)
- Fix burn_only font size: change the default title_fontsize from 90 to 60
- Delete old run_lesson1.bat/py, temp debug scripts
- Update README, ARCHITECTURE, CHANGELOG, add USAGE.md
This commit is contained in:
hmo
2026-05-03 23:22:10 +08:00
parent cf5004cf6a
commit aad1548348
39 changed files with 826 additions and 556 deletions
+48 -1
View File
@@ -60,8 +60,12 @@ def parse_args():
help='LLM API地址')
parser.add_argument('--whisper-model', type=str, default='large',
help='Whisper模型 (默认: large)')
parser.add_argument('--max-total-duration', type=int, default=300,
help='精华片段总时长上限(秒),默认300')
parser.add_argument('--verbose', '-V', action='store_true',
help='详细输出')
parser.add_argument('--resume-from-burn', action='store_true',
help='快速模式:跳过所有步骤,直接用已有片段和字幕文件合并烧录(用于手动修改SRT后快速重生成)')
return parser.parse_args()
@@ -77,7 +81,7 @@ def load_config_from_args(args) -> dict:
'whisper_model': args.whisper_model,
'video_params': {
'fade_duration': 1,
'title_fontsize': 90,
'title_fontsize': 60,
'title_color': 'FFFF00',
'subtitle_fontsize': 24,
'subtitle_color': 'FFFFFF',
@@ -137,8 +141,15 @@ def generate_config_from_ppt(args) -> dict:
progress_callback=progress_callback,
api_key=args.api_key,
api_host=args.api_host,
max_total_duration=args.max_total_duration,
)
# 补充API配置(parse_ppt_to_config不返回这些)
if args.api_key:
config['api_key'] = args.api_key
if args.api_host:
config['api_host'] = args.api_host
# 保存生成的配置
config_path = os.path.join(args.output, 'generated_config.yaml')
import yaml
@@ -207,6 +218,42 @@ def main():
pipeline = Pipeline(config)
# 快速模式:跳过所有步骤,直接用已有片段和字幕合并烧录
if args.resume_from_burn:
import glob
import shutil
output_dir = config.get('output_dir')
clips_dir = os.path.join(output_dir, 'clips')
merged_dir = os.path.join(output_dir, 'merged')
merged_path = os.path.join(merged_dir, 'merged.mp4')
title_path = os.path.join(output_dir, 'title.srt')
content_path = os.path.join(output_dir, 'content.srt')
# 检查必要文件
if not os.path.exists(title_path):
logger.error(f"找不到 title.srt: {title_path}")
return 1
if not os.path.exists(content_path):
logger.error(f"找不到 content.srt: {content_path}")
return 1
# 已有合并视频则直接烧录;否则先合并
if os.path.exists(merged_path):
logger.info(f"找到已有合并视频: {merged_path}")
else:
logger.info("开始合并片段...")
clip_files = sorted(glob.glob(os.path.join(clips_dir, 'clip*.mp4')))
if not clip_files:
logger.error(f"找不到片段视频: {clips_dir}/clip*.mp4")
return 1
merged_path = pipeline.step_merge(clip_files)
logger.info(f"合并完成: {merged_path}")
logger.info("开始烧录...")
final_path = pipeline.step_burn(merged_path, title_path, content_path)
logger.info(f"完成! 最终视频: {final_path}")
return 0
logger.info("开始处理...")
final_path = pipeline.run()
+1 -1
View File
@@ -79,7 +79,7 @@ DEFAULT_OUTPUT_DIR = os.path.join(PROJECT_ROOT, "output")
DEFAULT_VIDEO_PARAMS = {
"fade_duration": 1,
"title_duration": 3,
"title_fontsize": 90,
"title_fontsize": 60,
"title_color": "FFFF00",
"subtitle_fontsize": 24,
"subtitle_color": "FFFFFF",
+3 -100
View File
@@ -56,6 +56,8 @@ class LLMClient:
"max_tokens": max_tokens
}
logger.info(f"[LLM] request chars={len(prompt)}, max_tokens={max_tokens}")
for attempt in range(LLM_MAX_RETRIES):
try:
response = requests.post(url, headers=headers, json=payload, timeout=timeout)
@@ -73,6 +75,7 @@ class LLMClient:
content = choices[0].get("message", {}).get("content", "").strip()
if content:
logger.info(f"[LLM] response chars={len(content)}")
return content
logger.warning(f"LLM: Empty content (attempt {attempt+1})")
@@ -88,106 +91,6 @@ class LLMClient:
return None
def correct_title(self, transcript_text, original_title, all_titles=None):
"""
使用LLM纠正标题
Args:
transcript_text: 字幕文本
original_title: 原始标题
all_titles: 所有标题列表
Returns:
纠正后的标题
"""
titles_str = ", ".join(all_titles[:20]) if all_titles else ""
prompt = f"""你是一个钢琴教学视频的标题验证专家。
PPT提取的标题:{original_title}
视频字幕内容:{transcript_text[:500] if transcript_text else ""}
本节课所有标题:{titles_str}
【重要规则】
- 只有当你有90%以上把握认为原标题错误时,才输出纠正后的标题
- 如果原标题基本正确,即使不完美,也必须输出原标题
- 绝对不能输出与原标题完全不同概念的词
- 如果不确定,输出原标题
请直接输出标题,不要添加任何解释。"""
result = self.chat(prompt, max_tokens=50, timeout=LLM_TITLE_TIMEOUT)
return result if result else original_title
def validate_content(self, transcript_text, title):
"""
使用LLM验证内容是否与标题相关
Args:
transcript_text: 字幕文本
title: 标题
Returns:
(is_valid: bool, reason: str)
"""
prompt = f"""判断视频字幕内容是否与标题相关。
标题:{title}
字幕内容:{transcript_text[:300] if transcript_text else ""}
判断标准:
- 内容讨论的主题与标题概念相关 = 相关
- 内容与标题无关(如广告、闲聊、无关话题)= 无关
- 无法判断 = 不确定
请直接输出:相关/无关/不确定"""
result = self.chat(prompt, max_tokens=20, timeout=LLM_VALIDATE_TIMEOUT)
if not result:
return True, "error"
if "无关" in result:
return False, result
elif "不确定" in result:
return True, "uncertain"
return True, result
def full_text_correction(self, text, clip_title, knowledge_terms=None):
"""
使用LLM进行全文字幕纠错
Args:
text: 原始字幕
clip_title: 片段标题
knowledge_terms: 知识点列表
Returns:
纠错后的字幕
"""
knowledge_str = ", ".join(knowledge_terms[:20]) if knowledge_terms else ""
prompt = f"""你是一个钢琴教学视频的字幕纠错专家。
原始字幕:{text}
本节课片段标题:{clip_title}
本节课知识点:{knowledge_str}
请进行字幕纠错:
1. 修复语音识别错误(如"羞耻""休止""副点""附点""负点""附点"
2. 修复同音字错误
3. 保留原文的专业术语和表达方式
4. 不要改变原文的语气和意思
请直接输出纠错后的字幕,不要添加任何解释。"""
result = self.chat(prompt, max_tokens=500, timeout=LLM_TIMEOUT)
return result if result else text
# 全局LLM客户端实例
_llm_client = None
+127 -70
View File
@@ -12,7 +12,7 @@ import logging
from typing import Callable, Optional, List, Dict, Any
from .video import extract_clip, merge_clips, burn_dual_subtitles
from .subtitle import SubtitlePipeline
from .subtitle import SubtitlePipeline, correct_subtitles_llm
from .llm import LLMClient
from .corrections import apply_all_corrections, load_term_corrections_from_config
from .utils import ensure_dir
@@ -223,16 +223,41 @@ class Pipeline:
self.progress_callback('transcribing', int((i/total)*90), f"转录片段 {i}/{total}")
try:
segments, _ = model.transcribe(clip_path, language='zh', beam_size=5)
segments, _ = model.transcribe(clip_path, language='zh', beam_size=5, word_timestamps=True)
# 保存转录结果
# 保存转录结果(按句末标点进一步切分)
segments_data = []
for seg in segments:
segments_data.append({
'start': seg.start,
'end': seg.end,
'text': seg.text.strip()
})
words = seg.words if hasattr(seg, 'words') else []
if words:
# 用 word-level 时间戳在句末标点处切分
# 注意:标点可能附着在词后(如"吗?"、"奏,"),需 strip 后判断
_END_MARKS = '。!??'
sub_start = words[0].start
sub_text_parts = []
for word in words:
sub_text_parts.append(word.word)
# 剥离标点后判断是否为句末标记
stripped = word.word.rstrip(',、,')
if any(stripped.endswith(m) for m in _END_MARKS):
sub_end = word.end
sub_text = ''.join(sub_text_parts).strip()
if sub_text:
segments_data.append({'start': sub_start, 'end': sub_end, 'text': sub_text})
sub_start = word.end
sub_text_parts = []
# 剩余未到句末的文本
if sub_text_parts:
remaining = ''.join(sub_text_parts).strip()
if remaining:
segments_data.append({'start': sub_start, 'end': words[-1].end, 'text': remaining})
else:
# fallback:无 word timestamps,直接用原 segment
segments_data.append({
'start': seg.start,
'end': seg.end,
'text': seg.text.strip()
})
with open(json_path, 'w', encoding='utf-8') as f:
json.dump({'segments': segments_data}, f, ensure_ascii=False, indent=2)
@@ -249,59 +274,58 @@ class Pipeline:
self.step_callback('transcribing')
return json_paths
def step_correct_titles(self, json_paths: List[str]) -> List[Dict[str, Any]]:
def _recalculate_title_segments_from_transcript(
    self,
    clips: List[Dict],
    json_paths: List[str]
) -> None:
    """
    Recompute the title switch point of overlapping clips from the transcript.

    For every clip that carries at least two ``title_segments`` entries
    (``[title, offset]`` pairs), the offset of the second entry is replaced by
    the time at which the second title is first mentioned in that clip's
    transcript, instead of the offset derived from the clip boundaries.

    The keyword is looked up in two ways, per transcript segment:

    1. word-level entries (``seg['words']``), when the JSON contains them;
    2. otherwise the segment ``text``; the time is then estimated by linear
       interpolation of the keyword's character position between the
       segment's ``start`` and ``end``.

    The second lookup is required because the transcript JSON written by the
    transcription step only stores ``start``/``end``/``text`` per segment, and
    because a multi-character title is rarely contained in a single word token.

    Args:
        clips: Clip dicts; modified in place.
        json_paths: Transcript JSON paths, index-aligned with ``clips``.

    Returns:
        None. Clips whose transcript is missing, unreadable, or does not
        mention the second title keep their original offset.
    """
    for i, clip in enumerate(clips):
        ts = clip.get('title_segments')
        if not ts or len(ts) < 2:
            continue

        # Second title segment: [title, offset]
        second_title, old_offset = ts[1]
        json_path = json_paths[i] if i < len(json_paths) else None
        if not json_path or not os.path.exists(json_path):
            continue

        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # Unreadable or corrupt transcript: keep the boundary-based offset,
            # but leave a trace instead of failing silently.
            logger.warning(f" clip{i+1} title_segments: 无法读取 transcript {json_path}: {e}")
            continue

        segments = data.get('segments', []) if isinstance(data, dict) else []

        # Find the first mention of second_title in the transcript.
        first_time = None
        if second_title:
            for seg in segments:
                # Preferred source: word-level timestamps, if present.
                for word_info in seg.get('words') or []:
                    if second_title in word_info.get('word', ''):
                        first_time = word_info.get('start')
                        break
                if first_time is not None:
                    break

                # Fallback: sentence-level text with an interpolated time.
                text = seg.get('text', '')
                pos = text.find(second_title)
                if pos < 0:
                    continue
                seg_start = float(seg.get('start', 0.0))
                seg_end = float(seg.get('end', seg_start))
                first_time = seg_start + max(0.0, seg_end - seg_start) * pos / len(text)
                break

        if first_time is not None:
            new_offset = first_time
            # Replace the whole pair so this also works if it was stored as a tuple.
            clip['title_segments'][1] = [second_title, new_offset]
            logger.info(
                f" clip{i+1} title_segments: "
                f"'{second_title}'{old_offset:.2f}s → {new_offset:.2f}s"
            )
        else:
            logger.warning(
                f" clip{i+1} title_segments: "
                f"未在 transcript 中找到 '{second_title}',保留原 offset {old_offset:.2f}s"
            )
def step_generate_subtitles(self, corrected_clips: List[Dict], json_paths: List[str]) -> tuple:
"""
@@ -327,6 +351,7 @@ class Pipeline:
'start': clip['start'],
'end': clip['end'],
'title': clip.get('title', clip.get('original_title', '')),
'title_segments': clip.get('title_segments'), # 可能为None
}
clip_configs.append(clip_config)
@@ -357,6 +382,39 @@ class Pipeline:
self.step_callback('generating_subtitles')
return title_path, content_path
def step_correct_subtitles(self, title_path: str, content_path: str) -> str:
    """
    Step 4.5: correct the content subtitles with the LLM.

    Delegates to ``correct_subtitles_llm``, passing title.srt and the PPT text
    stored under ``config['ppt_text']`` as reference material. When no PPT
    text is configured the step is skipped and no callbacks are fired.

    Args:
        title_path: Path of the title subtitle file.
        content_path: Path of the content subtitle file to correct.

    Returns:
        The path returned by ``correct_subtitles_llm``, or ``content_path``
        unchanged when the step is skipped.
    """
    stage = 'correcting_subtitles'
    reference_text = self.config.get('ppt_text', '')

    if reference_text:
        self.step_callback(stage)
        self.progress_callback(stage, 0, "开始纠正字幕...")
        result_path = correct_subtitles_llm(
            title_path=title_path,
            content_path=content_path,
            ppt_text=reference_text,
            llm_client=self.llm_client,
        )
        self.progress_callback(stage, 100, "字幕纠正完成")
        self.step_callback(stage)
        return result_path

    logger.warning("PPT原文为空,跳过字幕纠正步骤")
    return content_path
def step_merge(self, clip_paths: List[str]) -> str:
"""
Step 5: 合并视频
@@ -411,7 +469,7 @@ class Pipeline:
title_path,
content_path,
final_path,
title_fontsize=video_params.get('title_fontsize', 90),
title_fontsize=video_params.get('title_fontsize', 60),
title_color=video_params.get('title_color', 'FFFF00'),
subtitle_fontsize=video_params.get('subtitle_fontsize', 24),
subtitle_color=video_params.get('subtitle_color', 'FFFFFF')
@@ -447,17 +505,14 @@ class Pipeline:
# Step 2: 转录
json_paths = self.step_transcribe(clip_paths)
# Step 3: 标题纠正
corrected_clips = self.step_correct_titles(json_paths)
# Step 2.5: 用 transcript 重新计算重叠片段的 title_segments 切分点
self._recalculate_title_segments_from_transcript(self.clips, json_paths)
# Step 4: 生成字幕
title_path, content_path = self.step_generate_subtitles(corrected_clips, json_paths)
# Step 5: 合并
# Step 3-6: 生成字幕、纠正、合并、烧录
title_path, content_path = self.step_generate_subtitles(self.clips, json_paths)
corrected_content_path = self.step_correct_subtitles(title_path, content_path)
merged_path = self.step_merge(clip_paths)
# Step 6: 烧录
final_path = self.step_burn(merged_path, title_path, content_path)
final_path = self.step_burn(merged_path, title_path, corrected_content_path)
logger.info(f"Pipeline completed: {final_path}")
return final_path
@@ -474,23 +529,25 @@ class Pipeline:
"""
logger.info(f"Pipeline starting with user confirmation: {len(self.clips)} clips")
# Step 1-3: 同上
# Step 1-2: 提取+转录
clip_paths = self.step_extract()
if not clip_paths:
raise RuntimeError("No clips extracted")
json_paths = self.step_transcribe(clip_paths)
corrected_clips = self.step_correct_titles(json_paths)
# Step 2.5: 用 transcript 重新计算重叠片段的 title_segments 切分点
self._recalculate_title_segments_from_transcript(self.clips, json_paths)
# 应用用户确认的标题
for i, confirmed in enumerate(confirmed_titles):
if i < len(corrected_clips):
corrected_clips[i]['title'] = confirmed.get('title', corrected_clips[i]['title'])
if i < len(self.clips):
self.clips[i]['title'] = confirmed.get('title', self.clips[i].get('title', ''))
# Step 4-6: 同上
title_path, content_path = self.step_generate_subtitles(corrected_clips, json_paths)
# Step 3-6: 生成字幕、纠正、合并、烧录
title_path, content_path = self.step_generate_subtitles(self.clips, json_paths)
corrected_content_path = self.step_correct_subtitles(title_path, content_path)
merged_path = self.step_merge(clip_paths)
final_path = self.step_burn(merged_path, title_path, content_path)
final_path = self.step_burn(merged_path, title_path, corrected_content_path)
logger.info(f"Pipeline completed: {final_path}")
return final_path
+68 -62
View File
@@ -17,6 +17,8 @@ import zipfile
import logging
from typing import List, Dict, Any, Optional, Callable, Tuple
from .llm import LLMClient
logger = logging.getLogger(__name__)
@@ -36,6 +38,7 @@ class PPTParser:
api_key: Optional[str] = None,
api_host: Optional[str] = None,
max_clip_duration: int = 30,
max_total_duration: int = 300,
):
"""
初始化PPT解析器
@@ -48,6 +51,7 @@ class PPTParser:
api_key: LLM API密钥
api_host: LLM API地址
max_clip_duration: 每个精华片段的最大时长(秒),默认30秒
max_total_duration: 所有精华片段的总时长上限(秒),默认300秒(5分钟)
"""
self.video_path = video_path
self.ppt_path = ppt_path
@@ -56,6 +60,7 @@ class PPTParser:
self.api_key = api_key
self.api_host = api_host
self.max_clip_duration = max_clip_duration
self.max_total_duration = max_total_duration
self.inter_dir = os.path.join(output_dir, 'intermediates')
os.makedirs(self.inter_dir, exist_ok=True)
@@ -284,50 +289,19 @@ class PPTParser:
def _call_llm(self, prompt: str, max_tokens: int = 4096, timeout: int = 300, retries: int = 3) -> Optional[str]:
    """
    Send a prompt to the LLM through a freshly created ``LLMClient``.

    The client is built from this instance's ``api_key`` / ``api_host`` and
    its ``chat`` method is called.

    Args:
        prompt: Prompt text sent to the LLM.
        max_tokens: Maximum number of tokens requested.
        timeout: Per-request timeout in seconds.
        retries: Accepted for backward compatibility only; it is not forwarded,
            since ``chat()`` runs its own retry loop.

    Returns:
        Whatever ``LLMClient.chat`` returns (the content string, or None on failure).
    """
    return LLMClient(
        api_key=self.api_key,
        api_host=self.api_host,
    ).chat(
        prompt=prompt,
        max_tokens=max_tokens,
        timeout=timeout,
    )
def llm_extract_knowledge_points_from_ppt(self) -> Tuple[Optional[List[Dict[str, Any]]], Optional[str]]:
"""
@@ -415,7 +389,7 @@ class PPTParser:
- 一种方法:如"放松练习""分手练习""慢速练习""唱谱法"
- 一个专题:如"乐理基础""手型要求""课后作业"
【文本清理规则】(以不影响原文意思表达为前提):
【文本清理规则】(用于 cleaned_text,不影响知识点提取):
- 合并连续的空行(超过1个空行的压缩为1个)
- 去除行首行尾多余空格
- 保留页面之间的自然分段(每页独立段落)
@@ -423,12 +397,16 @@ class PPTParser:
- 无标点的长句子:如果一行文字超过50字且无标点,才合并到下一行
- 保留专有名词、术语的原始写法
重要规则】:
知识点提取规则】:
1. 扫描全部页面:不要只找"知识点汇总页",每页都要看
2. 原文保留:知识点原文是什么就写什么,不要解释、概括、翻译或扩展
3. 拆分合并:被拆分的片段(如"的三"+"种方法""谱号、"+"大谱表、"等)要合并为完整知识词
4. 标题过滤:忽略"本课主要知识点""课程回顾""本节课重要知识点"等纯导航/目录类标题
5. 分类项处理:格式如"XX:子项1、子项2、子项3"时,冒号后的每个子项各自独立成知识点;但如果冒号后是完整句子或定义(如"XX:这是指……"),则整句描述的对象本身才是知识点
5. 列表/定义项拆分:
- 格式为"XX:子项1,子项2,子项3"时,冒号后的每个子项各自独立成知识点
- 格式为多行列表(如"重复:xxx\n级进:xxx\n跳进:xxx"),每行各自独立成知识点
- 如果冒号后是完整句子或定义(如"XX:这是指……"),则整句描述的对象本身才是知识点
- **知识点标题不得包含括号、冒号、引号等任何标点符号**,只保留核心词(如"重复(旋律进行方式)"应输出为"重复""音高、和弦"应输出为"音高""和弦"
6. 列表项过滤:只保留有独立含义的知识点,忽略序号、标点符号、无意义的装饰词
7. 内容页优先:如果一个知识点在教学内容页展开讲解了,比仅出现在列表中更重要
8. 最小粒度:宁可多输出几个独立的知识词,也不要合并成一个大而笼统的标题
@@ -668,13 +646,24 @@ class PPTParser:
for clip in sorted_clips[1:]:
prev = merged[-1]
if clip['start'] < prev['end']:
# 重叠:prev延伸到clip的end保留clip的标题(标题在clip原start处切换
# 重叠:prev延伸到clip的end检测标题切换
if clip['title'] != prev['title']:
# 标题切换点 = clip['start'] 相对于 prev 起点的时间
switch_offset = clip['start'] - prev['start']
# 建立 title_segments
prev['title_segments'] = [
[prev['title'], 0],
[clip['title'], switch_offset],
]
prev['title'] = prev['title'] # 保留第一个标题作主标题
prev['end'] = clip['end']
logger.info(f" 合并重叠: '{prev['title']}' 延伸至 {prev['end']}s"
f"标题在 {clip['start']}s 切换为 '{clip['title']}'")
else:
# 不重叠:直接添加
merged.append(dict(clip))
# 不重叠:直接添加,清除 title_segments(由系统默认处理)
c = dict(clip)
c.pop('title_segments', None)
merged.append(c)
return merged
@@ -855,7 +844,11 @@ class PPTParser:
# PPT参考(完整文本 + 知识点列表)
if ppt_full_text or ppt_knowledge:
knowledge_lines = "\n".join([f" - {kp['title']}" for kp in (ppt_knowledge or [])])
knowledge_list = ppt_knowledge or []
# 带序号的列表,LLM 用序号引用,不许自由发挥
knowledge_lines = "\n".join(
[f" [{i}] {kp['title']}" for i, kp in enumerate(knowledge_list)]
)
knowledge_section = f"""
【PPT参考文本(权威背景)】
以下是与本节课配套的PPT完整内容,请以此为权威参考:
@@ -887,14 +880,13 @@ class PPTParser:
【重要规则】
1. 逐条处理:必须为列表中的**每一个知识点**搜索视频转录文本,找到讲解最集中的片段
2. **title 必须完全等于知识点列表中的原名**,不许改写、不许概括、不许扩展
- ✅ 正确:knowledge_point 是"弹琴的手型"title 就用"弹琴的手型"
- ❌ 错误:title 用"手型支撑与放松的核心要求"(自己发挥
3. **knowledge_point 字段也必须用知识点列表中的原名**
4. 时间必须精确:使用转录文本中的实际时间戳
5. 时长控制:每个片段约5-15秒,重要内容可以稍长(最长不超过20秒)
6. 总时长不超过180秒:如果知识点太多导致总时长超标,优先保留最重要的知识点,其余在not_found中说明
7. 只输出JSON,不要添加任何解释
2. **输出序号而非名称**:kp_idx 必须是列表中的序号(如 0、3、7),不许自己发挥名称
- ✅ 正确:"kp_idx": 3 对应列表中第 4 项
- ❌ 错误:"kp_idx": "重复(旋律进行方式)"(这是自由发挥,不是序号
3. 时间必须精确:使用转录文本中的实际时间戳
4. 时长控制:每个片段约5-15秒,重要内容可以稍长(最长不超过20秒)
5. 时长不超过{self.max_total_duration}秒:如果知识点太多导致总时长超标,优先保留最重要的知识点,其余在not_found中说明
6. 只输出JSON,不要添加任何解释
【视频转录文本(带时间戳)】
{transcript_text}
@@ -902,10 +894,10 @@ class PPTParser:
请以以下JSON格式输出(不要输出其他内容):
{{
"clips": [
{{"title": "知识点原名(不许改写)", "start": 开始秒数, "end": 结束秒数, "knowledge_point": "知识点原名"}},
{{"title": "知识点原名", "start": 开始秒数, "end": 结束秒数, "knowledge_point": "知识点原名"}}
{{"kp_idx": 序号, "start": 开始秒数, "end": 结束秒数}},
{{"kp_idx": 序号, "start": 开始秒数, "end": 结束秒数}}
],
"not_found": ["知识点原名(必须与列表中的名称完全一致)"]
"not_found": [序号, 序号]
}}"""
try:
@@ -929,31 +921,41 @@ class PPTParser:
return None
clips = parsed.get("clips", [])
not_found = parsed.get("not_found", [])
not_found_idxs = parsed.get("not_found", [])
if not clips and not not_found:
if not clips and not not_found_idxs:
return None
# 验证和清理
# 通过序号映射回原始名称(序号 → 原始知识点名称)
knowledge_list = ppt_knowledge or []
title_map = {i: kp['title'] for i, kp in enumerate(knowledge_list)}
# 验证和清理:序号 → 原始名称
validated = []
for clip in clips:
title = clip.get("title", "")
kp_idx = int(clip.get("kp_idx", -1))
if kp_idx not in title_map:
logger.warning(f" 跳过无效序号 kp_idx={kp_idx}(超出范围 0-{len(title_map)-1}")
continue
title = title_map[kp_idx]
start = max(0, float(clip.get("start", 0)))
raw_end = float(clip.get("end", 0))
end = min(raw_end, start + self.max_clip_duration)
kp = clip.get("knowledge_point", "")
validated.append({
"title": title,
"start": int(start),
"end": int(end),
"knowledge_point": kp,
"knowledge_point": title,
})
logger.info(f"LLM提取成功: {len(validated)} 个片段,{len(not_found)} 个未找到")
# not_found 中的序号也映射回名称
not_found_names = [title_map[i] for i in not_found_idxs if i in title_map]
logger.info(f"LLM提取成功: {len(validated)} 个片段,{len(not_found_names)} 个未找到")
for c in validated:
logger.info(f" [{c['knowledge_point']}] {c['title']}: {c['start']}s - {c['end']}s")
if not_found:
logger.info(f" 未找到知识点: {not_found}")
if not_found_names:
logger.info(f" 未找到知识点: {not_found_names}")
return validated
@@ -1007,6 +1009,9 @@ class PPTParser:
}, f, ensure_ascii=False)
logger.info(f"已保存PPT知识点到checkpoint")
# 保存PPT原文供后续步骤使用
self.ppt_text = ppt_cleaned_text or ""
# Step 3: LLM校正文本(以PPT全文为参考)- 带checkpoint复用
self._report('parse', 30, "LLM校正文本...")
corrected_checkpoint = os.path.join(self.inter_dir, "corrected_transcript.json")
@@ -1052,6 +1057,7 @@ class PPTParser:
"clips": clips,
"output_dir": self.output_dir,
"term_corrections": self.term_corrections,
"ppt_text": getattr(self, 'ppt_text', ''),
"video_params": {
"fade_duration": 1,
"title_fontsize": 48,
+241 -11
View File
@@ -228,15 +228,32 @@ class SubtitlePipeline:
offset = offsets[i]
clip_duration = offsets[i+1] - offsets[i] if i+1 < len(offsets) else 3
# 添加标题(使用title样式)- 标题显示3秒后正文才显示,避免重叠
title_duration = min(3, clip_duration)
title_track.add(offset, offset + title_duration, clip['title'], style='title')
# 添加标题(使用title样式)
if clip.get('title_segments'):
# 多标题片段:遍历 title_segments [(title, start_offset), ...]
# 每个标题最多显示 title_duration 秒
segs = clip['title_segments']
for j, (title, seg_start) in enumerate(segs):
next_start = segs[j+1][1] if j+1 < len(segs) else clip_duration
seg_end = min(seg_start + title_duration, next_start)
title_track.add(
offset + seg_start,
offset + seg_end,
title,
style='title'
)
# 正文字幕从最后一个标题段结束后开始
content_start = offset + segs[-1][1]
else:
# 单标题:标题显示3秒后正文才显示,避免重叠
title_duration = min(3, clip_duration)
title_track.add(offset, offset + title_duration, clip['title'], style='title')
content_start = offset + title_duration
# 添加正文字幕 - 从标题结束后开始,避免重叠
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
content_start = offset + title_duration # 正文从标题结束后开始
for seg in data.get('segments', []):
text = seg.get('text', '').strip()
if not text:
@@ -253,12 +270,37 @@ class SubtitlePipeline:
# 只添加在clip时间范围内的字幕
clip_end = clip['end'] - clip['start'] + offset
if seg_start < clip_end and seg_end <= clip_end:
content_track.add(
seg_start,
seg_end,
text,
style='content'
)
# pipeline.py 已按标点拆分,此处只处理意外超长segment(无标点且>8秒)
duration = seg_end - seg_start
if duration > 8.0:
# 按标点拆分
import re
parts = re.split(r'(?<=[。!??])', text)
if len(parts) > 1:
total_len = sum(len(p) for p in parts)
if total_len > 0:
cum_len = 0
s_start = seg_start
for part in parts:
part = part.strip()
if not part:
continue
cum_len += len(part)
s_end = seg_start + duration * cum_len / total_len
content_track.add(s_start, s_end, part, style='content')
s_start = s_end
continue
# 无标点则平均拆分
num_splits = max(2, int(duration / 8.0) + 1)
chunk_len = len(text) // num_splits
for i in range(num_splits):
t_start = seg_start + duration * i / num_splits
t_end = seg_start + duration * (i + 1) / num_splits
chunk_text = text[i * chunk_len:(i + 1) * chunk_len].strip()
if chunk_text:
content_track.add(t_start, t_end, chunk_text, style='content')
else:
content_track.add(seg_start, seg_end, text, style='content')
# 保存两个轨道 - 标题使用SRT格式
version = self._get_next_version()
@@ -320,4 +362,192 @@ def load_clip_subtitles(inter_dir, clip_nums):
if os.path.exists(json_path):
with open(json_path, 'r', encoding='utf-8') as f:
clips[num] = json.load(f)
return clips
return clips
def parse_srt(content: str) -> list:
    """
    Parse SRT text into a list of subtitle cues.

    Tolerates files saved by Windows editors: a leading UTF-8 BOM is removed
    (otherwise the index of the first cue fails ``int()`` and the cue is
    silently dropped) and CRLF / CR line endings are normalised. Cues may be
    separated by one or more blank (or whitespace-only) lines, and the
    timestamp arrow may have any spacing around it.

    Args:
        content: Full text of an SRT file.

    Returns:
        ``[(index, start, end, text), ...]`` where ``index`` is an int,
        ``start`` / ``end`` are the timestamp strings with ``,`` replaced by
        ``.``, and ``text`` keeps its internal line breaks. Blocks with fewer
        than three lines, a non-numeric index, or a malformed timestamp line
        are skipped.
    """
    import re

    normalized = content.lstrip('\ufeff').replace('\r\n', '\n').replace('\r', '\n')

    segments = []
    for block in re.split(r'\n\s*\n', normalized.strip()):
        lines = block.strip().split('\n')
        if len(lines) < 3:
            continue

        try:
            idx = int(lines[0])
        except ValueError:
            continue

        start, arrow, end = lines[1].partition('-->')
        start = start.strip().replace(',', '.')
        end = end.strip().replace(',', '.')
        if not arrow or not start or not end:
            continue

        segments.append((idx, start, end, '\n'.join(lines[2:])))
    return segments
def format_srt(segments: list) -> str:
    """
    Render a list of subtitle cues as SRT text.

    Args:
        segments: ``[(index, start, end, text), ...]``; ``start`` and ``end``
            are timestamp strings whose ``.`` is turned back into ``,``.

    Returns:
        The cues joined by one blank line, terminated by a single newline.
    """
    rendered = (
        f"{idx}\n{start.replace('.', ',')} --> {end.replace('.', ',')}\n{text}"
        for idx, start, end, text in segments
    )
    return '\n\n'.join(rendered) + '\n'
def correct_subtitles_llm(
    title_path: str,
    content_path: str,
    ppt_text: str,
    llm_client,
    output_path: str = None,
) -> str:
    """
    Correct the text of the content subtitles with an LLM, keeping all timestamps.

    Only ``idx|text`` lines are sent to the model; timestamps never leave this
    function and are re-attached by cue index. The first 2000 characters of
    title.srt and the first 3000 characters of the PPT text are included in the
    prompt as reference material.

    The function is best-effort: if the content file yields no cues, the model
    returns nothing, or its answer cannot be parsed, the original file is left
    untouched and ``content_path`` is returned. Individual malformed correction
    items are ignored rather than discarding the whole answer.

    Args:
        title_path: Path of the title SRT file (reference).
        content_path: Path of the content SRT file to correct.
        ppt_text: PPT text used as terminology reference.
        llm_client: Object exposing ``chat(prompt=..., max_tokens=...)``.
        output_path: Where to write the corrected SRT; defaults to
            overwriting ``content_path``.

    Returns:
        Path of the corrected subtitle file, or ``content_path`` when nothing
        was corrected.
    """
    import json
    import re

    # Read both subtitle files.
    with open(title_path, 'r', encoding='utf-8') as f:
        title_srt = f.read()
    with open(content_path, 'r', encoding='utf-8') as f:
        content_srt = f.read()

    # Parse the SRT; the full timestamps stay in content_segments.
    content_segments = parse_srt(content_srt)
    if not content_segments:
        # Rewriting from an empty cue list would replace the file with a lone
        # newline, destroying content that merely failed to parse.
        logger.warning("未解析到任何字幕条目,保留原字幕")
        return content_path

    # Plain "idx|text" lines for the model.
    transcript_text = '\n'.join(f"{idx}|{text}" for idx, _start, _end, text in content_segments)

    prompt = f"""你是一个钢琴教学视频的字幕纠错专家。
## 参考信息
标题字幕(title.srt)- 权威知识点参考:
{title_srt[:2000]}
PPT原文(ppt- 术语权威参考:
{ppt_text[:3000]}
## 任务
修正以下转录文本中的错字、漏字、术语错误(如""改为"sol""拿两个音速"改为"拿两个因素"等)。
每行格式:序号|原始文字
## 待纠正文本({len(content_segments)}条):
{transcript_text}
## 输出要求
- 以JSON格式输出,只输出JSON,不要有任何其他解释
- 用原始序号匹配,不要改变结构
{{
"corrected": [
{{"idx": 序号, "text": "修正后的文字"}},
{{"idx": 序号, "text": "修正后的文字"}}
]
}}"""

    response = llm_client.chat(
        prompt=prompt,
        max_tokens=8192,
    )
    if not response:
        logger.warning("LLM返回为空,保留原字幕")
        return content_path

    # Extract the JSON object. The greedy match spans the first "{" to the last
    # "}", so surrounding markdown fences or prose need no separate stripping
    # (line-based fence stripping could drop the closing brace line).
    try:
        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        if not json_match:
            raise ValueError("No JSON found in response")
        result = json.loads(json_match.group())
        corrected_list = result.get('corrected', [])
    except (ValueError, AttributeError) as e:
        logger.warning(f"字幕纠正JSON解析失败,保留原字幕: {e}")
        return content_path

    # Build idx -> corrected text. Models often return the index as a string,
    # so it is coerced to int to match the keys produced by parse_srt.
    corrected_map = {}
    for item in corrected_list if isinstance(corrected_list, list) else []:
        if not isinstance(item, dict):
            continue
        try:
            key = int(item.get('idx'))
        except (TypeError, ValueError):
            continue
        text = item.get('text')
        if not isinstance(text, str) or not text.strip():
            # Never replace a cue with empty text.
            continue
        # A blank line inside a cue would split it into two SRT blocks.
        corrected_map[key] = '\n'.join(ln for ln in text.strip().splitlines() if ln.strip())

    # Re-attach the original timestamps and collect the differences.
    changed = []
    new_segments = []
    for idx, start, end, orig_text in content_segments:
        new_text = corrected_map.get(idx, orig_text)
        new_segments.append((idx, start, end, new_text))
        if new_text != orig_text:
            changed.append((idx, orig_text, new_text))
    corrected_srt = format_srt(new_segments)

    if output_path is None:
        output_path = content_path
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(corrected_srt)

    # Diff log.
    if changed:
        logger.info(f"字幕纠正,共 {len(changed)} 处修改:")
        for idx, old, new in changed:
            old_s = old[:50] + ('...' if len(old) > 50 else '')
            new_s = new[:50] + ('...' if len(new) > 50 else '')
            logger.info(f" [{idx:3d}] \"{old_s}\"\"{new_s}\"")
    else:
        logger.info("字幕纠正,无修改")
    logger.info(f"字幕已修正: {output_path}")
    return output_path
+4 -11
View File
@@ -146,7 +146,7 @@ def burn_subtitles(video_path, srt_path, output_path):
return success
def burn_dual_subtitles(video_path, title_srt_path, content_srt_path, output_path, title_fontsize=90, title_color="FFFF00", subtitle_fontsize=24, subtitle_color="FFFFFF"):
def burn_dual_subtitles(video_path, title_srt_path, content_srt_path, output_path, title_fontsize=60, title_color="FFFF00", subtitle_fontsize=24, subtitle_color="FFFFFF"):
"""
烧录两层字幕到视频(标题在屏幕正中,正文在下方)
@@ -163,7 +163,7 @@ def burn_dual_subtitles(video_path, title_srt_path, content_srt_path, output_pat
Returns:
True if success
"""
# Windows路径转义
# Windows路径转义D:/ 需要双反斜杠转义
title_escaped = title_srt_path.replace('\\', '/').replace('D:/', 'D\\:/')
content_escaped = content_srt_path.replace('\\', '/').replace('D:/', 'D\\:/')
@@ -180,19 +180,12 @@ def burn_dual_subtitles(video_path, title_srt_path, content_srt_path, output_pat
title_bgr = html_to_bgr(title_color)
subtitle_bgr = html_to_bgr(subtitle_color)
# 标题样式:使用SRT+force_styleAlignment=5水平居中,垂直位置由MarginV控制
# 标题样式:使用SRT+force_styleAlignment=2水平居中,MarginV=150使其位于屏幕上偏下区域(36%高度)
# 正文字样式:底部居中,24字号,白色,带描边
content_style = f"FontName=微软雅黑,FontSize={subtitle_fontsize},PrimaryColour={subtitle_bgr},Alignment=2,MarginV=20,Outline=1,Shadow=1"
# 使用两个独立字幕滤镜分别渲染,然后叠加
# 标题使用Alignment=5,MarginV=0(正中)
title_style = f"FontName=微软雅黑,FontSize={title_fontsize},PrimaryColour={title_bgr},Alignment=5,MarginV=0,Outline=3,Shadow=2"
title_style = f"FontName=微软雅黑,FontSize={title_fontsize},PrimaryColour={title_bgr},Alignment=2,MarginV=150,Outline=3,Shadow=2"
# 使用两个字幕滤镜叠加,然后映射视频+原始音频
# 标题使用Alignment=5,MarginV=0(正中)
title_style = f"FontName=微软雅黑,FontSize={title_fontsize},PrimaryColour={title_bgr},Alignment=5,MarginV=0,Outline=3,Shadow=2"
# 使用两个字幕滤镜叠加
filter_str = f"[0:v]subtitles='{title_escaped}':force_style='{title_style}',subtitles='{content_escaped}':force_style='{content_style}'[out]"
# 保留原始音频 - 映射视频输出和原始音频