#!/usr/bin/env python3
"""
Main script for generating piano-lesson highlight videos (generic, config-driven).

Pipeline (see ``main``):
    1. ``extract_clips``      - cut the configured knowledge-point clips with ffmpeg.
    2. ``transcribe_clips``   - obtain a transcript per clip, either by slicing a
                                pre-computed ``full_transcript.json`` or by running
                                faster-whisper, and validate it against the title.
    3. ``generate_subtitles`` - write three SRT variants (original / terms / ai).
    4. ``merge_and_burn``     - concatenate the clips, draw title cards and burn
                                the AI-corrected subtitles into the final video.

GPU resource management:
    - The Whisper model is only loaded when a clip really needs transcribing.
    - The model is released explicitly once transcription is finished.
    NOTE(review): the original header also mentioned killing leftover Python
    processes before transcription; nothing in this file does that.
"""

import argparse
import gc
import json
import os
import re
import subprocess

import torch
import yaml
import zhconv
from pypinyin import pinyin, Style

from correction_dict import (
    DIRECT_FIXES,
    SONG_NAME_FIXES,
    ANOMALY_WORDS,
    MUSIC_TERMS,
    ANOMALY_PATTERNS,
)

# NOTE(review): DIRECT_FIXES and SONG_NAME_FIXES are imported but this file has
# always used its own built-in tables below; their contents are not visible
# here, so they are deliberately left unused rather than guessed at.

# Strips a leading "知识点<N>:" / "知识点<N>:" prefix from a clip title.
_TITLE_PREFIX_RE = re.compile(r"^知识点\d+[::]\s*")

# Safety-net fixes for mis-heard "rest" (休止) terminology.
_REST_FIXES = {
    "羞耻": "休止",
    "休指": "休止",
    "修止": "休止",
    "八分羞耻": "八分休止",
    "四分羞耻": "四分休止",
    "十六分羞耻": "十六分休止",
    "二分羞耻": "二分休止",
    "全羞耻": "全休止",
    "分羞耻": "分休止",
}

# Built-in homophone fixes for music terminology (applied in dict order).
_TERM_FIXES = {
    "负点": "附点",
    "副点": "附点",
    "付点": "附点",
    "实质": "时值",
    "实值": "时值",
    "演音": "延音",
    "言音": "延音",
    "阅历": "乐理",
    "月理": "乐理",
    "音苻": "音符",
    "调苻": "调号",
    "拍苻": "拍符",
    "谱苻": "谱号",
    "首位": "手位",
    "守位": "手位",
    "只发": "指法",
    "织法": "指法",
    "台指": "抬指",
    "抬纸": "抬指",
    "只撑": "支撑",
    "肢撑": "支撑",
    "反服": "反复",
    "反副": "反复",
    "搞八度": "高八度",
    "搞八渡": "高八度",
    "底八度": "低八度",
    "联音": "连音",
    "连因": "连音",
    "挑音": "跳音",
    "还原记好": "还原记号",
    "缓原记号": "还原记号",
    "节牌": "节拍",
    "节凑": "节奏",
    "分首": "分手",
    "分守": "分手",
    "漫练": "慢练",
    "曼练": "慢练",
    "强若": "强弱",
    "强落": "强弱",
    "八分音苻": "八分音符",
    "十六分音苻": "十六分音符",
    "负其实": "附其实",
    "负加": "附加",
    "一数排": "一组排",
}

# Song-title fragments and the canonical bracketed title they stand for.
# The unbracketed correct title is listed too, so that it is wrapped as a
# whole instead of having only its "盖头来" tail wrapped.
_SONG_TITLES = {
    "掀起你的盖头来": "《掀起你的盖头来》",
    "掀起我的盖头来": "《掀起你的盖头来》",
    "盖头来": "《掀起你的盖头来》",
    "小星星": "《小星星》",
    "两只老虎": "《两只老虎》",
    "欢乐颂": "《欢乐颂》",
    "献给爱丽丝": "《献给爱丽丝》",
    "土耳其进行曲": "《土耳其进行曲》",
    "小步舞曲": "《小步舞曲》",
}

# Longest fragment first so "掀起我的盖头来" wins over its suffix "盖头来".
# Optional surrounding brackets are consumed so an already-wrapped title is
# rewritten to itself, which makes the substitution idempotent.
_SONG_PATTERN = re.compile(
    "《?("
    + "|".join(re.escape(frag) for frag in sorted(_SONG_TITLES, key=len, reverse=True))
    + ")》?"
)

# Extra corrections applied to a transcript before title/keyword validation
# (Whisper may hear "延音" as "演音"/"言音", etc.).
_VALIDATION_FIXES = {
    "言音": "延音",
    "演音": "延音",
    "副点": "附点",
    "负点": "附点",
    "付点": "附点",
}


def _replace_all(text, mapping):
    """Apply every ``wrong -> correct`` pair of *mapping* to *text*, in dict order."""
    for wrong, correct in mapping.items():
        text = text.replace(wrong, correct)
    return text


def _normalize_song_titles(text):
    """Wrap known song-title fragments in 《》 exactly once (idempotent)."""
    return _SONG_PATTERN.sub(lambda m: _SONG_TITLES[m.group(1)], text)


def _clean_title(title):
    """Return *title* without the "知识点N:" prefix and without 《》 brackets."""
    return re.sub(r"[《》]", "", _TITLE_PREFIX_RE.sub("", title))


def _console_safe(text):
    """Drop characters (e.g. emoji) that a GBK console cannot print."""
    return text.encode("gbk", errors="ignore").decode("gbk")


def get_pinyin(text):
    """Return the toneless pinyin of *text*, concatenated into one string."""
    return "".join([item[0] for item in pinyin(text, style=Style.NORMAL)])


def pinyin_similarity(word1, word2):
    """
    Return a pinyin similarity score in [0, 1] for two words.

    The score is the number of positions at which the two pinyin strings carry
    the same character, divided by the longer length. It is a cheap positional
    overlap, not a true edit distance. Identical pinyin yields 1.0.
    """
    py1 = get_pinyin(word1)
    py2 = get_pinyin(word2)
    if py1 == py2:
        return 1.0
    # py1 != py2 here, so at least one of them is non-empty and max_len > 0.
    max_len = max(len(py1), len(py2))
    common = sum(1 for c1, c2 in zip(py1, py2) if c1 == c2)
    return common / max_len


def detect_anomalies_in_text(text, knowledge_terms=None):
    """
    Detect semantically out-of-place words in a transcript line.

    Args:
        text: the transcript text to inspect.
        knowledge_terms: optional collection of extra domain terms (knowledge
            point titles) used as context evidence and replacement candidates.

    Returns:
        A list of ``(anomalous_text, suggested_replacement, reason)`` tuples.
    """
    if knowledge_terms is None:
        knowledge_terms = set()
    # Sorted so that ties in similarity resolve the same way on every run
    # (iteration order of a set of str is not stable across processes).
    extra_terms = sorted(knowledge_terms)

    anomalies = []

    # Step 1: regex rules. Each rule is a dict with "pattern", "replace"
    # (a template expanded against the match) and "reason".
    seen = set()
    for rule in ANOMALY_PATTERNS:
        # finditer gives every occurrence its own match object; the previous
        # findall + search combination reported the first match repeatedly.
        for match in re.finditer(rule["pattern"], text):
            original = match.group(0)
            replacement = match.expand(rule["replace"])
            if (original, replacement) in seen:
                continue
            seen.add((original, replacement))
            anomalies.append((original, replacement, rule["reason"]))

    # Step 2: stand-alone anomaly words, confirmed by their surrounding context.
    for anomaly in ANOMALY_WORDS:
        if anomaly not in text:
            continue

        # Context window: 10 characters either side of the first occurrence.
        idx = text.find(anomaly)
        context = text[max(0, idx - 10):min(len(text), idx + len(anomaly) + 10)]

        has_music_context = any(term in context for term in MUSIC_TERMS) or any(
            term in context for term in extra_terms
        )
        # "<number>分" patterns such as "八分", "四分", "十六分" hint at note values.
        has_note_context = bool(
            re.search(r"[一二三四五六七八九十百千万\d]+分", context)
        )
        if not (has_music_context or has_note_context):
            continue

        # Pick the most pinyin-similar term (music vocabulary first, then the
        # knowledge-point list); candidates below 0.5 are ignored.
        best_match = None
        best_score = 0
        for term in list(MUSIC_TERMS) + extra_terms:
            score = pinyin_similarity(anomaly, term)
            if score > best_score and score >= 0.5:
                best_score = score
                best_match = term

        if best_match:
            reason = (
                f"'{anomaly}'在音乐教学语境中语义异常,"
                f"上下文包含音乐术语,"
                f"拼音相似度{best_score:.2f},推断为'{best_match}'"
            )
            anomalies.append((anomaly, best_match, reason))

    return anomalies


def ai_context_correct(text, clip_title="", all_clips=None):
    """
    Context-aware correction of one transcript line.

    Workflow:
        0. Replace known fixed mis-hearings (safety net) and normalize song titles.
        1. Apply the built-in terminology table.
        2. Detect semantic anomalies and replace them with the suggestion from
           ``detect_anomalies_in_text`` (context + pinyin similarity).
        3. Normalize song titles again, in case step 2 introduced one.

    The function is idempotent with respect to song titles: running it on
    already corrected text does not add a second pair of 《》.

    Args:
        text: the transcript text.
        clip_title: title of the clip the text belongs to (currently unused,
            kept for interface compatibility).
        all_clips: list of clip dicts; their "title" values feed the
            knowledge-term list.

    Returns:
        The corrected text.
    """
    if all_clips is None:
        all_clips = []

    # Step 0: safety-net replacements.
    text = _replace_all(text, _REST_FIXES)
    text = _normalize_song_titles(text)

    # Collect knowledge-point names plus any music terms contained in them.
    knowledge_terms = set()
    for clip in all_clips:
        title = _TITLE_PREFIX_RE.sub("", clip.get("title", ""))
        if not title:
            continue
        knowledge_terms.add(title)
        knowledge_terms.update(kw for kw in MUSIC_TERMS if kw in title)

    # Step 1: built-in terminology table.
    text = _replace_all(text, _TERM_FIXES)

    # Step 2: anomaly detection + context inference.
    for original, replacement, _reason in detect_anomalies_in_text(
        text, knowledge_terms
    ):
        text = text.replace(original, replacement)

    # Step 3: song-title completion.
    return _normalize_song_titles(text)


def load_config(config_path):
    """Load the YAML configuration file at *config_path* and return its content."""
    with open(config_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


def run_cmd(cmd, capture=True):
    """
    Run an external command and report whether it succeeded.

    Args:
        cmd: either a shell command string (run through the shell, as before)
            or a list of arguments (run directly, so no quoting is needed).
        capture: when True, capture output and print the tail of stderr on
            failure; when False, let the command write to the console.

    Returns:
        True if the command exited with status 0, otherwise False. A missing
        executable is reported as a failure instead of raising.
    """
    use_shell = isinstance(cmd, str)
    shown = cmd if use_shell else " ".join(str(arg) for arg in cmd)
    print(f"[CMD] {shown[:100]}...")
    try:
        if capture:
            result = subprocess.run(
                cmd,
                shell=use_shell,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="ignore",
            )
            if result.returncode != 0:
                # ffmpeg prints its banner first; the actual error is at the end.
                print(f"[ERR] {result.stderr[-200:] if result.stderr else 'unknown'}")
            return result.returncode == 0
        if use_shell:
            return os.system(cmd) == 0
        return subprocess.run(cmd).returncode == 0
    except OSError as exc:
        print(f"[ERR] {exc}")
        return False


def to_srt_time(t):
    """
    Convert seconds to the SRT timestamp format ``HH:MM:SS,mmm``.

    The value is rounded to the nearest millisecond (truncating the float
    remainder turned e.g. 1.001 into ",000"); negative input is clamped to 0.
    """
    total_ms = max(0, round(t * 1000))
    total_s, ms = divmod(total_ms, 1000)
    total_m, s = divmod(total_s, 60)
    h, m = divmod(total_m, 60)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


def _probe_duration(path, fallback):
    """Return the container duration of *path* in seconds, or *fallback* if ffprobe fails."""
    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                # nokey=1 is required: without it ffprobe prints "duration=12.3",
                # which float() cannot parse.
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                path,
            ],
            capture_output=True,
            text=True,
        )
        return float(result.stdout.strip())
    except (OSError, ValueError):
        return fallback


def _resolve_overlaps(clips):
    """
    Return copies of *clips* with overlaps removed.

    When a clip starts before the previous one ends, the previous clip's end
    is pulled back to the new start. Clips whose duration is no longer
    positive are dropped. Assumes *clips* is ordered by start time.
    """
    adjusted = []
    for clip in clips:
        current = dict(clip)  # never mutate the caller's config
        if adjusted and current["start"] < adjusted[-1]["end"]:
            previous = adjusted[-1]
            old_end = previous["end"]
            previous["end"] = current["start"]
            print(
                f" [FIX] 重叠修复: {previous['title']} end {old_end}s -> {current['start']}s"
            )
        adjusted.append(current)

    kept = []
    for clip in adjusted:
        if clip["end"] - clip["start"] > 0:
            kept.append(clip)
        else:
            print(f" [SKIP] {clip['title']} 时长为0,跳过")
    return kept


def extract_clips(config, output_dir):
    """
    Cut the configured knowledge-point clips out of the source video.

    Each clip is re-encoded and then given a fade-in/fade-out. Files are
    written to ``<output_dir>/intermediates``.

    Returns:
        ``(clip_paths, clips)`` - two lists of equal length: the path of each
        successfully extracted clip and the (overlap-adjusted) clip dict it
        belongs to. Clips whose extraction failed are left out of both lists
        so that later stages can pair them by position.
    """
    print("\n[步骤1] 提取视频片段...")
    inter_dir = os.path.join(output_dir, "intermediates")
    os.makedirs(inter_dir, exist_ok=True)

    fade_dur = config.get("fade_duration", 1)
    video_src = str(config["video_src"])

    clip_paths = []
    extracted = []
    for clip in _resolve_overlaps(config["clips"]):
        idx = len(clip_paths) + 1
        start = clip["start"]
        duration = clip["end"] - start
        # Strip emoji etc. from the title to avoid console encoding errors.
        clean_title = _console_safe(clip["title"])

        out_path = os.path.join(inter_dir, f"clip{idx}.mp4")
        cut_cmd = [
            "ffmpeg", "-y", "-ss", str(start), "-i", video_src,
            "-t", str(duration),
            "-c:v", "libx264", "-preset", "fast", "-crf", "20", "-c:a", "aac",
            out_path,
        ]
        if not run_cmd(cut_cmd):
            print(f" clip{idx}: {clean_title} FAILED")
            continue

        # Fade in/out; clamp the fade-out start for clips shorter than the fade.
        fade_out_start = max(0, duration - fade_dur)
        faded_path = os.path.join(inter_dir, f"clip{idx}_fade.mp4")
        fade_cmd = [
            "ffmpeg", "-y", "-i", out_path,
            "-vf",
            f"fade=t=in:st=0:d={fade_dur},fade=t=out:st={fade_out_start}:d={fade_dur}",
            "-af",
            f"afade=t=in:st=0:d={fade_dur},afade=t=out:st={fade_out_start}:d={fade_dur}",
            "-c:v", "libx264", "-crf", "20", "-c:a", "aac",
            faded_path,
        ]
        if run_cmd(fade_cmd):
            clip_paths.append(faded_path)
        else:
            # Keep the clip rather than referencing a file that was never written.
            print(f" [WARN] clip{idx} 淡入淡出失败,使用未处理片段")
            clip_paths.append(out_path)
        extracted.append(clip)
        print(f" clip{idx}: {clean_title} ({duration}s) OK")

    return clip_paths, extracted


def _load_full_transcript(output_dir):
    """
    Load ``full_transcript.json`` if it exists.

    It is looked up in ``<output_dir>/intermediates`` first and then in the
    ``intermediates`` directory next to *output_dir*. Returns the parsed JSON
    (used below as a list of dicts with "start", "end" and "text"), or None.
    """
    candidates = [
        os.path.join(output_dir, "intermediates", "full_transcript.json"),
        os.path.join(os.path.dirname(output_dir), "intermediates", "full_transcript.json"),
    ]
    for candidate in candidates:
        if os.path.exists(candidate):
            with open(candidate, "r", encoding="utf-8") as f:
                full_transcript = json.load(f)
            print(
                f" [INFO] 加载完整转录文件: {len(full_transcript)} 个片段 ({candidate})"
            )
            return full_transcript
    return None


def _slice_full_transcript(full_transcript, clip):
    """
    Build a per-clip transcript from the full transcript.

    Every segment that overlaps the clip is included (a sentence may straddle
    the clip boundary), with timestamps made relative to the clip start and
    clamped to the clip duration. Returns None when nothing overlaps.
    """
    clip_start = clip["start"]
    clip_end = clip["end"]
    overlapping = [
        seg
        for seg in full_transcript
        if seg["end"] > clip_start and seg["start"] < clip_end
    ]
    if not overlapping:
        return None

    clip_duration = clip_end - clip_start
    texts = []
    segments = []
    for seg in overlapping:
        adj_start = max(0, seg["start"] - clip_start)
        if adj_start >= clip_duration:
            continue
        adj_end = min(seg["end"] - clip_start, clip_duration)
        if adj_end <= adj_start:
            adj_end = adj_start + 0.1
        texts.append(seg["text"])
        segments.append({"start": adj_start, "end": adj_end, "text": seg["text"]})
    return {"text": "".join(texts), "segments": segments}


def _chunk_keywords(clean_title):
    """Keywords for validating a sliced transcript: title, title without particles, 2-4 char chunks."""
    keywords = [clean_title]
    # Drop connective particles such as "的", "与", "和", "及".
    shorter = re.sub(r"[的与和及]", "", clean_title)
    if shorter != clean_title:
        keywords.append(shorter)
    for length in (2, 3, 4):
        keywords.extend(
            re.findall(r"[\u4e00-\u9fff]{" + str(length) + r"}", clean_title)
        )
    return list(dict.fromkeys(keywords))


def _suffix_keywords(clean_title):
    """Keywords for validating a fresh Whisper transcript: title plus its 6..3 char suffixes."""
    keywords = [clean_title]
    if len(clean_title) > 6:
        keywords.extend(clean_title[-length:] for length in (6, 5, 4, 3))
    return list(dict.fromkeys(keywords))


def _load_whisper_model(model_path):
    """Load faster-whisper from *model_path*, preferring CUDA and falling back to CPU."""
    from faster_whisper import WhisperModel

    try:
        model = WhisperModel(model_path, device="cuda", compute_type="float16")
        print(" [INFO] 使用CUDA GPU加速转录")
    except Exception as e:  # any GPU/driver problem must fall back to CPU
        print(f" [WARNING] GPU不可用,使用CPU转录: {str(e)[:50]}")
        model = WhisperModel(model_path, device="cpu", compute_type="int8")
    return model


def _whisper_transcribe(model, path):
    """Transcribe *path* (Chinese, beam size 5) into the ``{"text", "segments"}`` layout."""
    segments, _info = model.transcribe(path, language="zh", beam_size=5)
    texts = []
    out_segments = []
    for seg in segments:
        texts.append(seg.text)
        out_segments.append({"start": seg.start, "end": seg.end, "text": seg.text})
    return {"text": "".join(texts), "segments": out_segments}


def transcribe_clips(clip_paths, config, output_dir):
    """
    Produce one transcript JSON per clip (local model, GPU first, CPU fallback).

    For each clip the transcript is sliced from ``full_transcript.json`` when
    that file exists and overlaps the clip; otherwise the clip is transcribed
    with faster-whisper. A transcript is kept only if at least one keyword
    derived from the clip title occurs in it.

    Args:
        clip_paths: clip video paths, positionally aligned with ``config["clips"]``.
        config: the loaded configuration.
        output_dir: output directory; JSON goes to ``<output_dir>/intermediates``.

    Returns:
        A list aligned with the processed clips: the path of ``clip{i}.json``,
        or None for clips whose content did not match their title.

    Raises:
        RuntimeError: if a clip needs transcribing but
            ``video_params.use_fast_whisper`` is false (no other backend exists).
    """
    print("\n[步骤2] 转录片段...")
    video_params = config.get("video_params") or {}
    model_path = video_params.get(
        "whisper_model_path", "D:/AI/LM-Models/faster-whisper/large-v3"
    )
    use_fast_whisper = video_params.get("use_fast_whisper", True)
    inter_dir = os.path.join(output_dir, "intermediates")
    os.makedirs(inter_dir, exist_ok=True)

    # Produced by extract_terms_from_ppt.py according to the original comment.
    full_transcript = _load_full_transcript(output_dir)

    # Corrections applied only to the text used for validation of sliced transcripts.
    validation_fixes = dict(config.get("term_corrections") or {})
    validation_fixes.update(_VALIDATION_FIXES)

    json_paths = []
    model = None  # loaded lazily: not needed when the full transcript covers every clip
    try:
        for i, (path, clip) in enumerate(zip(clip_paths, config["clips"]), 1):
            print(f" 转录 clip{i} ({clip['title']})...")
            clean_title = _clean_title(clip.get("title", ""))

            result = None
            if full_transcript:
                result = _slice_full_transcript(full_transcript, clip)

            if result is not None:
                keywords = _chunk_keywords(clean_title)
                transcript_text = _replace_all(result["text"], validation_fixes)
            else:
                if model is None:
                    if not use_fast_whisper:
                        raise RuntimeError(
                            "clip needs transcription but use_fast_whisper is "
                            "disabled and no other backend is implemented"
                        )
                    model = _load_whisper_model(model_path)
                result = _whisper_transcribe(model, path)
                keywords = _suffix_keywords(clean_title)
                transcript_text = result["text"]

            json_path = os.path.join(inter_dir, f"clip{i}.json")
            matched = [kw for kw in keywords if kw in transcript_text]
            if keywords and not matched:
                print(
                    f" [SKIP] 内容不匹配: 标题'{clean_title}',关键词{keywords},转录中未找到"
                )
                print(f" 转录内容: {transcript_text[:100]}...")
                # A file left over from an earlier run must not make this clip
                # look valid to code that checks for the file's existence.
                if os.path.exists(json_path):
                    os.remove(json_path)
                json_paths.append(None)
                continue

            with open(json_path, "w", encoding="utf-8") as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
            json_paths.append(json_path)
            print(
                f" clip{i}完成 ({len(matched)}/{len(keywords)} 关键词匹配: {matched})"
            )
    finally:
        # Release the model even if transcription failed half-way.
        print(" [GPU] 释放模型资源...")
        model = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        print(" [GPU] 资源已释放")

    return json_paths


def _srt_block(index, start, end, text):
    """Return the four lines of one SRT cue (index, time range, text, blank)."""
    return [f"{index}", f"{to_srt_time(start)} --> {to_srt_time(end)}", text, ""]


def generate_subtitles(clip_paths, json_paths, config, output_dir):
    """
    Write the three subtitle variants to ``<output_dir>/subs``.

    Variants: ``v1_original.srt`` (simplified-Chinese conversion only),
    ``v1_terms.srt`` (plus the configured ``term_corrections``) and
    ``v1_ai.srt`` (plus ``ai_context_correct``). Each file holds one title cue
    per clip followed by the dialogue cues.

    The time offset of a clip is the sum of the *video* durations of the
    preceding kept clips, because that is where the clip sits in the
    concatenated video. Clips without a transcript JSON are skipped.

    Args:
        clip_paths: clip video paths aligned with ``config["clips"]``.
        json_paths: transcript paths (or None) aligned with ``config["clips"]``.
        config: the loaded configuration.
        output_dir: output directory.

    Returns:
        The path of ``v1_ai.srt``.
    """
    print("\n[步骤3] 生成字幕...")
    subs_dir = os.path.join(output_dir, "subs")
    os.makedirs(subs_dir, exist_ok=True)

    # One entry per kept clip: (clip, segments, offset). Keeping the three
    # together avoids indexing json_paths with a position in the filtered list.
    entries = []
    current = 0
    for idx, (clip, jp) in enumerate(zip(config["clips"], json_paths)):
        if not (jp and os.path.exists(jp)):
            print(f" [SKIP] 字幕跳过: {clip['title']} (内容不匹配)")
            continue
        with open(jp, "r", encoding="utf-8") as f:
            segments = json.load(f).get("segments", [])
        entries.append((clip, segments, current))

        nominal = clip["end"] - clip["start"]
        if idx < len(clip_paths):
            current += _probe_duration(clip_paths[idx], nominal)
        else:
            current += nominal

    term_corrections = config.get("term_corrections") or {}
    title_dur = config.get("title_duration", 3)
    all_clips = config.get("clips", [])

    for version in ("original", "terms", "ai"):
        srt_lines = []
        sub_idx = 1

        # Title cues.
        for clip, _segments, offset in entries:
            srt_lines.extend(
                _srt_block(
                    sub_idx, offset, min(offset + title_dur, offset + 25), clip["title"]
                )
            )
            sub_idx += 1

        # Dialogue cues.
        for clip, segments, offset in entries:
            for seg in segments:
                text = seg["text"].strip()
                if not text:
                    continue

                # Traditional -> simplified first, so every later table lookup
                # works on simplified text.
                text = zhconv.convert(text, "zh-cn")

                if version in ("terms", "ai"):
                    text = _replace_all(text, term_corrections)

                if version == "ai":
                    # ai_context_correct applies the built-in fix tables itself,
                    # so they are not repeated here.
                    original_text = text
                    text = ai_context_correct(text, clip.get("title", ""), all_clips)
                    if original_text != text:
                        print(f' [AI纠正] "{original_text}" -> "{text}"')

                srt_lines.extend(
                    _srt_block(
                        sub_idx, offset + seg["start"], offset + seg["end"], text
                    )
                )
                sub_idx += 1

        out_path = os.path.join(subs_dir, f"v1_{version}.srt")
        with open(out_path, "w", encoding="utf-8") as f:
            f.write("\n".join(srt_lines))
        print(f" 生成v1_{version}.srt: {sub_idx - 1}条")

    return os.path.join(subs_dir, "v1_ai.srt")


def _concat_entry(path):
    """Format *path* for an ffmpeg concat list: absolute, forward slashes, quotes escaped."""
    # Relative entries are resolved against the list file's directory, not the
    # working directory, so always write absolute paths.
    return os.path.abspath(path).replace("\\", "/").replace("'", "'\\''")


def merge_and_burn(clip_paths, subtitle_path, config, output_dir, json_paths=None):
    """
    Concatenate the kept clips, draw title cards and burn in the subtitles.

    Args:
        clip_paths: clip video paths aligned with ``config["clips"]``.
        subtitle_path: SRT file to burn in.
        config: the loaded configuration.
        output_dir: output directory; the result is ``v<N>_final.mp4`` with the
            first unused N.
        json_paths: optional transcript paths aligned with *clip_paths*; a clip
            is kept when its entry exists. When omitted, the presence of
            ``intermediates/clip{i+1}.json`` decides (previous behaviour).

    Returns:
        The path of the final video (returned even if ffmpeg reported an error,
        as before; check the ``[ERR]`` output).
    """
    print("\n[步骤4] 合并片段、添加标题卡并烧录字幕...")
    inter_dir = os.path.join(output_dir, "intermediates")

    # Only clips whose content matched their title take part.
    included = []
    for i, clip_path in enumerate(clip_paths):
        if json_paths is not None:
            json_path = json_paths[i] if i < len(json_paths) else None
        else:
            json_path = os.path.join(inter_dir, f"clip{i + 1}.json")
        if json_path and os.path.exists(json_path):
            included.append((config["clips"][i], clip_path))

    list_path = os.path.join(inter_dir, "concat_list.txt")
    with open(list_path, "w", encoding="utf-8") as f:
        for _clip, clip_path in included:
            f.write(f"file '{_concat_entry(clip_path)}'\n")

    concat_path = os.path.join(inter_dir, "concated.mp4")
    run_cmd(
        [
            "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path,
            "-c", "copy", concat_path,
        ]
    )

    # Subtitle path escaped for the ffmpeg filter syntax (Windows drive colon).
    sub_path_fixed = subtitle_path.replace("\\", "/").replace(":", "\\\\:")
    sub_style = f"FontSize={config.get('subtitle_fontsize', 24)},PrimaryColour={config.get('subtitle_color', '&HFFFFFF')},OutlineColour=&H000000,BorderStyle=3,Outline=1,MarginV=30"

    # Title cards: large yellow centred text at the start of every kept clip.
    # Offsets are based on the real duration of each extracted clip.
    title_dur = config.get("title_duration", 3)
    title_filters = []
    current_offset = 0
    for clip, clip_path in included:
        title_text = _TITLE_PREFIX_RE.sub("", clip["title"])
        title_text_escaped = title_text.replace("'", "\\'").replace(":", "\\:")
        actual_duration = _probe_duration(clip_path, clip["end"] - clip["start"])

        filter_str = f"drawtext=text='{title_text_escaped}':fontfile='C\\:/Windows/Fonts/msyh.ttc':fontsize={config.get('title_fontsize', 90)}:fontcolor=yellow:x=(w-text_w)/2:y=(h-text_h)/2:enable='between(t,{current_offset},{current_offset + min(title_dur, actual_duration)})':borderw=4:bordercolor=black"
        title_filters.append(filter_str)
        current_offset += actual_duration

    all_filters = title_filters + [
        f"subtitles={sub_path_fixed}:force_style='{sub_style}'"
    ]
    vf_str = ",".join(all_filters)

    # Pick the next free version number.
    version = 1
    while os.path.exists(os.path.join(output_dir, f"v{version}_final.mp4")):
        version += 1
    final_path = os.path.join(output_dir, f"v{version}_final.mp4")

    run_cmd(
        [
            "ffmpeg", "-y", "-i", concat_path, "-vf", vf_str,
            "-c:v", "libx264", "-crf", "20", "-c:a", "aac",
            final_path,
        ]
    )

    print(f"\n完成!输出: {final_path}")
    return final_path


def main():
    """Command-line entry point: parse arguments and run the four pipeline steps."""
    parser = argparse.ArgumentParser(description="钢琴课精华视频生成工具")
    parser.add_argument("--config", required=True, help="配置文件路径")
    parser.add_argument("--output", default=None, help="输出目录")
    args = parser.parse_args()

    config = load_config(args.config)
    # Use the config's output_dir if --output is not specified.
    output_dir = args.output or config.get("output_dir", "./output")
    os.makedirs(output_dir, exist_ok=True)

    clip_paths, filtered_clips = extract_clips(config, output_dir)
    # Later steps pair clips with clip_paths by position, so they must see the
    # same overlap-adjusted, successfully extracted list.
    config["clips"] = filtered_clips

    json_paths = transcribe_clips(clip_paths, config, output_dir)
    subtitle_path = generate_subtitles(clip_paths, json_paths, config, output_dir)
    final_path = merge_and_burn(
        clip_paths, subtitle_path, config, output_dir, json_paths=json_paths
    )

    print(f"\n=== 生成完成 ===")
    print(f"视频文件: {final_path}")
    print(f"字幕文件: {os.path.join(output_dir, 'subs/')}")


if __name__ == "__main__":
    main()