#!/usr/bin/env python3
"""
Extract video clips whose transcript matches a text pattern.

The script transcribes the source video with faster-whisper, scores each
transcript segment for a "beat counting" pattern built around a keyword,
cuts the dense regions out with ffmpeg, and joins them into one video with
burned-in subtitles.

Usage: python scripts/extract_by_text_pattern.py --config path/to/config.yaml
"""

import argparse
import json
import os
import re
import subprocess

import yaml
import zhconv
from pypinyin import pinyin, Style  # noqa: F401  (not referenced in this file)

# Defaults that used to be hard-coded inside transcribe_full_video().
DEFAULT_WHISPER_MODEL = "D:/AI/LM-Models/faster-whisper/large-v3"
DEFAULT_CHUNK_SECONDS = 300

# Font used for the title card unless video_params["title_fontfile"] is set.
DEFAULT_TITLE_FONT = "C:/Windows/Fonts/msyh.ttc"

# Ordinary words that contain the default keyword; a segment containing any of
# them is not treated as beat counting.
DEFAULT_EXCLUDE_PHRASES = frozenset(
    {
        "大拇指",
        "大家",
        "大小",
        "大概",
        "大量",
        "大学",
        "大陆",
        "大胆",
        "大约",
        "大师",
        "大片",
        "大厅",
        "大桥",
        "大道",
        "大海",
        "大气",
        "大赛",
        "大楼",
        "大脑",
        "大多",
        "大致",
        "大幅",
        "大奖",
        "大家好",
        "大的",
        "大部分",
        "大人",
        "大型",
        "大规模",
        "大幅度",
        "大门",
        "大碗",
        "大自然",
    }
)

# Characters that count as "part of a word" around the keyword.
_WORD_CHARS = r"a-zA-Z\u4e00-\u9fff"


def run_cmd(cmd, capture=True):
    """Run an external command and report whether it exited with status 0.

    Args:
        cmd: An argument list (executed directly, without a shell) or a
            command string (executed through the shell, as before).
        capture: When true, output is captured and the tail of stderr is
            printed if the command fails; when false, output goes to the
            console.

    Returns:
        True if the command exited with status 0, otherwise False.
    """
    use_shell = isinstance(cmd, str)
    if not use_shell:
        cmd = [str(part) for part in cmd]
    shown = cmd if use_shell else " ".join(cmd)
    print(f"[CMD] {shown[:100]}...")

    run_kwargs = {"shell": use_shell}
    if capture:
        run_kwargs.update(
            capture_output=True, text=True, encoding="utf-8", errors="ignore"
        )
    try:
        result = subprocess.run(cmd, **run_kwargs)
    except OSError as exc:
        # Without a shell a missing executable raises instead of returning
        # a non-zero status; report it the same way as any other failure.
        print(f"[ERR] {exc}")
        return False

    if capture and result.returncode != 0:
        # The end of stderr carries the actual error; the start is usually
        # the tool's banner.
        tail = result.stderr.strip()[-300:] if result.stderr else "unknown"
        print(f"[ERR] {tail}")
    return result.returncode == 0


def to_srt_time(t):
    """Convert seconds to an SRT timestamp ``HH:MM:SS,mmm``.

    The value is rounded to whole milliseconds once, so binary float error
    (e.g. ``0.29 * 1000 == 289.999...``) cannot drop a millisecond. Negative
    inputs are clamped to zero.
    """
    total_ms = max(0, int(round(t * 1000)))
    total_s, ms = divmod(total_ms, 1000)
    total_m, s = divmod(total_s, 60)
    h, m = divmod(total_m, 60)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


def _probe_duration(video_path):
    """Return the duration of *video_path* in seconds as reported by ffprobe."""
    result = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1",
            str(video_path),
        ],
        capture_output=True,
        text=True,
    )
    raw = result.stdout.strip().split("=")[-1]
    try:
        return float(raw)
    except ValueError:
        detail = result.stderr.strip() or f"unexpected output {raw!r}"
        raise RuntimeError(
            f"ffprobe could not read the duration of {video_path!r}: {detail}"
        ) from None


def _load_whisper_model(model_path):
    """Load a faster-whisper model, trying progressively safer setups."""
    from faster_whisper import WhisperModel

    attempts = (
        (model_path, "cuda", "float16", " [INFO] 使用CUDA GPU加速转录"),
        ("base", "cuda", "float16", " [INFO] 使用base模型转录"),
        # Last resort so a machine without a usable GPU still works.
        ("base", "cpu", "int8", " [INFO] 使用base模型转录(CPU)"),
    )
    last_error = None
    for name, device, compute_type, message in attempts:
        try:
            model = WhisperModel(name, device=device, compute_type=compute_type)
        except Exception as exc:  # loader errors vary by backend; report and retry
            print(f" [WARN] 模型加载失败 ({name}, {device}): {exc}")
            last_error = exc
            continue
        print(message)
        return model
    raise RuntimeError("no faster-whisper model could be loaded") from last_error


def transcribe_full_video(
    video_path,
    output_dir,
    model_path=DEFAULT_WHISPER_MODEL,
    chunk_size=DEFAULT_CHUNK_SECONDS,
):
    """Transcribe the whole video chunk by chunk and cache the result.

    If ``<output_dir>/intermediates/full_transcript.json`` already exists it
    is loaded and returned without transcribing again.

    Args:
        video_path: Path of the source video.
        output_dir: Directory that receives the ``intermediates`` folder.
        model_path: faster-whisper model to try first.
        chunk_size: Length in seconds of each chunk handed to the model.

    Returns:
        A list of dicts with ``start``/``end`` (seconds from the start of the
        video) and ``text``.

    Raises:
        ValueError: If *chunk_size* is not positive.
        RuntimeError: If the duration cannot be read, a chunk cannot be cut,
            or no model can be loaded.
    """
    print("\n[步骤1] 全视频转录...")
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")

    inter_dir = os.path.join(output_dir, "intermediates")
    os.makedirs(inter_dir, exist_ok=True)

    # Reuse an existing transcript if there is one.
    transcript_path = os.path.join(inter_dir, "full_transcript.json")
    if os.path.exists(transcript_path):
        print(" 发现已有转录文件,跳过转录")
        with open(transcript_path, "r", encoding="utf-8") as f:
            return json.load(f)

    duration = _probe_duration(video_path)
    print(f" 视频时长: {duration:.0f}s ({duration / 60:.1f}分钟)")

    model = _load_whisper_model(model_path)

    all_segments = []
    chunk_idx = 0
    offset = 0
    while offset < duration:
        end = min(offset + chunk_size, duration)
        print(f" 转录 {offset // 60}min-{end // 60}min...")

        chunk_path = os.path.join(inter_dir, f"chunk_{chunk_idx}.mp4")
        cut = subprocess.run(
            [
                "ffmpeg",
                "-hide_banner",
                "-loglevel",
                "error",
                "-y",
                "-ss",
                str(offset),
                "-t",
                str(end - offset),
                "-i",
                str(video_path),
                "-c:v",
                "copy",
                "-c:a",
                "copy",
                chunk_path,
            ]
        )
        try:
            if cut.returncode != 0 or not os.path.exists(chunk_path):
                # Failing here keeps a transcript with a silent gap from
                # being cached and reused on later runs.
                raise RuntimeError(
                    f"ffmpeg could not cut chunk {chunk_idx} at {offset}s"
                )
            segments, _info = model.transcribe(
                chunk_path, language="zh", beam_size=5
            )
            for seg in segments:
                all_segments.append(
                    {
                        "start": offset + seg.start,
                        "end": offset + seg.end,
                        "text": seg.text,
                    }
                )
        finally:
            # Remove the chunk only after the segment iterator has been
            # consumed, and also when something above failed.
            if os.path.exists(chunk_path):
                os.remove(chunk_path)

        offset += chunk_size
        chunk_idx += 1

    # Save the full transcript.
    with open(transcript_path, "w", encoding="utf-8") as f:
        json.dump(all_segments, f, ensure_ascii=False, indent=2)

    print(f" 转录完成: {len(all_segments)} 个片段")
    return all_segments


def _compile_beat_patterns(keyword):
    """Return ``(compiled_pattern, weight)`` pairs that score beat usage."""
    kw = re.escape(keyword)
    return (
        # Keyword repeated back to back, e.g. "大大大大".
        (re.compile(rf"(?:{kw}){{2,}}"), 3),
        # Keyword followed by a digit, e.g. "大2大3".
        (re.compile(rf"{kw}\d"), 2),
        # Keyword with no letter or CJK character on either side. Lookarounds
        # leave the neighbours unconsumed, so every keyword in "大 大 大"
        # is counted.
        (re.compile(rf"(?<![{_WORD_CHARS}]){kw}(?![{_WORD_CHARS}])"), 1),
        # Keyword followed by a solfege syllable (do re mi fa sol la si).
        (
            re.compile(rf"{kw}(?:[哆来咪发嗦啦西]|(?i:do|re|mi|fa|sol?|la|si))"),
            2,
        ),
    )


def find_pattern_clips(segments, pattern_config):
    """Find time ranges where the transcript shows dense beat counting.

    Args:
        segments: Transcript segments with ``start``, ``end`` and ``text``.
        pattern_config: Optional settings: ``keyword`` (default "大"),
            ``window_size`` seconds (10), ``threshold`` score (3),
            ``min_clip_duration`` (5), ``max_clip_duration`` (30),
            ``merge_gap`` seconds (5) and ``exclude_phrases`` (a built-in list
            of ordinary words containing the default keyword).

    Returns:
        A list of dicts with ``start``, ``end`` and ``score``, ordered by
        start time and not overlapping. Empty if nothing matches.
    """
    print("\n[步骤2] 文本特征匹配...")

    keyword = pattern_config.get("keyword", "大")
    window_size = pattern_config.get("window_size", 10)
    threshold = pattern_config.get("threshold", 3)
    min_duration = pattern_config.get("min_clip_duration", 5)
    max_duration = pattern_config.get("max_clip_duration", 30)
    merge_gap = pattern_config.get("merge_gap", 5)
    exclude_phrases = pattern_config.get("exclude_phrases")
    if exclude_phrases is None:
        exclude_phrases = DEFAULT_EXCLUDE_PHRASES

    # An empty keyword would "occur" at every position of every text.
    if not segments or not keyword:
        return []

    patterns = _compile_beat_patterns(keyword)

    # Score every segment that contains the keyword outside ordinary words.
    beat_segments = []
    for seg in segments:
        text = zhconv.convert(seg["text"], "zh-cn")
        count = text.count(keyword)
        if count == 0:
            continue
        if any(phrase in text for phrase in exclude_phrases):
            continue

        beat_score = sum(
            weight * len(pattern.findall(text)) for pattern, weight in patterns
        )
        if beat_score > 0 or count >= 2:
            beat_segments.append(
                {
                    "start": seg["start"],
                    "end": seg["end"],
                    "count": count,
                    "beat_score": beat_score,
                    "text": text[:80],
                }
            )

    if not beat_segments:
        print(" 未找到节拍特征片段")
        return []

    # The merge and overlap steps below rely on ascending start times.
    beat_segments.sort(key=lambda item: item["start"])

    # Sliding window: keep segments that open a sufficiently dense window.
    matched_regions = []
    for bs in beat_segments:
        window_start = bs["start"]
        window_end = window_start + window_size
        window_score = sum(
            other["beat_score"] + other["count"]
            for other in beat_segments
            if window_start <= other["start"] < window_end
        )
        if window_score >= threshold:
            matched_regions.append(
                {"start": bs["start"], "end": bs["end"], "score": window_score}
            )

    if not matched_regions:
        print(f" 未达到阈值(score < {threshold})")
        return []

    # Merge regions separated by less than merge_gap seconds.
    merged = []
    for region in matched_regions:
        if merged and region["start"] - merged[-1]["end"] < merge_gap:
            merged[-1]["end"] = max(merged[-1]["end"], region["end"])
        else:
            merged.append(dict(region))

    # Stretch short regions around their centre, cut long ones at the end.
    clips = []
    for region in merged:
        duration = region["end"] - region["start"]
        if duration < min_duration:
            center = (region["start"] + region["end"]) / 2
            region["start"] = max(0, center - min_duration / 2)
            region["end"] = region["start"] + min_duration
        elif duration > max_duration:
            region["end"] = region["start"] + max_duration
        clips.append(region)

    # Remove overlaps by trimming the earlier clip; drop a clip that is
    # trimmed down to nothing instead of keeping a zero-length entry.
    filtered = []
    for clip in clips:
        while filtered and clip["start"] < filtered[-1]["end"]:
            filtered[-1]["end"] = clip["start"]
            if filtered[-1]["end"] > filtered[-1]["start"]:
                break
            filtered.pop()
        if clip["end"] - clip["start"] > 0:
            filtered.append(clip)

    print(f" 找到 {len(filtered)} 个匹配片段:")
    for i, clip in enumerate(filtered):
        duration = clip["end"] - clip["start"]
        print(
            f" 片段{i + 1}: {clip['start']:.0f}s-{clip['end']:.0f}s ({duration:.0f}s)"
        )

    return filtered


def extract_clips(video_path, clips, output_dir, fade_duration=1, keep_failed=False):
    """Cut each clip out of the video and add fade in/out.

    Args:
        video_path: Path of the source video.
        clips: Dicts with ``start`` and ``end`` in seconds.
        output_dir: Directory that receives the ``intermediates`` folder.
        fade_duration: Fade length in seconds; limited to half the clip so
            the fade-out never starts before the clip does.
        keep_failed: When true, the result has one entry per clip, with
            ``None`` for clips that could not be cut, so callers can keep
            clips and paths aligned. When false, failed clips are omitted.

    Returns:
        A list of paths to the extracted clip files.
    """
    print("\n[步骤3] 提取视频片段...")
    inter_dir = os.path.join(output_dir, "intermediates")
    os.makedirs(inter_dir, exist_ok=True)

    clip_paths = []
    for idx, clip in enumerate(clips, start=1):
        start = clip["start"]
        duration = clip["end"] - start

        out_path = os.path.join(inter_dir, f"clip{idx}.mp4")
        cut_cmd = [
            "ffmpeg",
            "-y",
            "-ss",
            start,
            "-i",
            video_path,
            "-t",
            duration,
            "-c:v",
            "libx264",
            "-preset",
            "fast",
            "-crf",
            "20",
            "-c:a",
            "aac",
            out_path,
        ]
        if not run_cmd(cut_cmd):
            print(f" clip{idx}: FAILED")
            if keep_failed:
                clip_paths.append(None)
            continue

        result_path = out_path
        fade = max(0, min(fade_duration, duration / 2))
        if fade > 0:
            faded_path = os.path.join(inter_dir, f"clip{idx}_fade.mp4")
            fade_out_at = duration - fade
            fade_cmd = [
                "ffmpeg",
                "-y",
                "-i",
                out_path,
                "-vf",
                f"fade=t=in:st=0:d={fade},fade=t=out:st={fade_out_at}:d={fade}",
                "-af",
                f"afade=t=in:st=0:d={fade},afade=t=out:st={fade_out_at}:d={fade}",
                "-c:v",
                "libx264",
                "-crf",
                "20",
                "-c:a",
                "aac",
                faded_path,
            ]
            if run_cmd(fade_cmd):
                result_path = faded_path
            else:
                # Keep the plain cut rather than returning a path to a file
                # that was never written.
                print(f" clip{idx}: 淡入淡出失败,使用未处理片段")

        clip_paths.append(result_path)
        print(f" clip{idx}: {duration:.0f}s OK")

    return clip_paths


def generate_subtitles(segments, clips, output_dir, video_params):
    """Write an SRT file for the concatenated clips.

    Transcript segments that start inside a clip are shifted to the clip's
    position in the joined video. A cue's end is limited to the end of its
    clip so it cannot run into the following clip.

    Args:
        segments: Transcript segments with ``start``, ``end`` and ``text``.
        clips: The clips in the order they will be concatenated.
        output_dir: Directory that receives the ``subs`` folder.
        video_params: Accepted for interface compatibility; not used here.

    Returns:
        Path of the written ``v1_ai.srt`` file.
    """
    print("\n[步骤4] 生成字幕...")
    subs_dir = os.path.join(output_dir, "subs")
    os.makedirs(subs_dir, exist_ok=True)

    srt_lines = []
    sub_idx = 1
    offset = 0  # start of the current clip on the joined timeline
    for clip in clips:
        clip_len = clip["end"] - clip["start"]
        for seg in segments:
            if not clip["start"] <= seg["start"] < clip["end"]:
                continue
            text = zhconv.convert(seg["text"].strip(), "zh-cn")
            if not text:
                continue

            abs_start = offset + (seg["start"] - clip["start"])
            abs_end = offset + min(seg["end"] - clip["start"], clip_len)

            srt_lines.append(f"{sub_idx}")
            srt_lines.append(f"{to_srt_time(abs_start)} --> {to_srt_time(abs_end)}")
            srt_lines.append(text)
            srt_lines.append("")
            sub_idx += 1
        offset += clip_len

    out_path = os.path.join(subs_dir, "v1_ai.srt")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write("\n".join(srt_lines))

    print(f" 生成字幕: {sub_idx - 1}条")
    return out_path


def _concat_entry(path):
    """Return one ``file '...'`` line for an ffmpeg concat list."""
    # ffmpeg resolves relative entries against the list file's directory,
    # not the working directory, so always write absolute paths.
    absolute = os.path.abspath(path).replace("\\", "/")
    escaped = absolute.replace("'", "'\\''")
    return f"file '{escaped}'\n"


def _ffmpeg_color(value):
    """Turn a bare RRGGBB[AA] hex value into ffmpeg's ``0x`` colour syntax."""
    text = str(value).strip()
    if re.fullmatch(r"[0-9a-fA-F]{6}(?:[0-9a-fA-F]{2})?", text):
        return f"0x{text}"
    return text  # colour names and already-prefixed values pass through


def merge_and_burn(clip_paths, subtitle_path, output_dir, video_params, title_text=""):
    """Concatenate the clips and burn the subtitles (and title) into them.

    Args:
        clip_paths: Paths of the clips to join, in order.
        subtitle_path: SRT file to burn in.
        output_dir: Directory for the final video; its ``intermediates``
            folder holds the concat list and the joined file.
        video_params: Optional settings: ``title_duration``,
            ``title_fontsize``, ``title_color``, ``title_fontfile``,
            ``subtitle_fontsize`` and ``subtitle_color``.
        title_text: Text drawn centred at the start; empty for no title.

    Returns:
        Path of the final video, or ``None`` if there were no clips or an
        ffmpeg step failed.
    """
    print("\n[步骤5] 合并片段并烧录字幕...")
    if not clip_paths:
        print("[ERR] 没有可合并的片段")
        return None

    inter_dir = os.path.join(output_dir, "intermediates")
    os.makedirs(inter_dir, exist_ok=True)

    # Join the clips.
    list_path = os.path.join(inter_dir, "concat_list.txt")
    with open(list_path, "w", encoding="utf-8") as f:
        f.writelines(_concat_entry(p) for p in clip_paths)

    concat_path = os.path.join(inter_dir, "concated.mp4")
    concat_cmd = [
        "ffmpeg",
        "-y",
        "-f",
        "concat",
        "-safe",
        "0",
        "-i",
        list_path,
        "-c",
        "copy",
        concat_path,
    ]
    if not run_cmd(concat_cmd):
        print("[ERR] 片段合并失败")
        return None

    # Title card filter.
    title_filters = []
    if title_text:
        title_dur = video_params.get("title_duration", 3)
        title_fs = video_params.get("title_fontsize", 90)
        title_color = _ffmpeg_color(video_params.get("title_color", "FFFF00"))
        title_font = (
            str(video_params.get("title_fontfile", DEFAULT_TITLE_FONT))
            .replace("\\", "/")
            .replace(":", "\\:")
        )
        # NOTE(review): title_text is inserted as-is; a title containing
        # "'" would end the quoted value early.
        title_filters.append(
            f"drawtext=text='{title_text}':fontfile='{title_font}'"
            f":fontsize={title_fs}:fontcolor={title_color}"
            f":x=(w-text_w)/2:y=(h-text_h)/2"
            f":enable='between(t,0,{title_dur})':borderw=4:bordercolor=black"
        )

    # Subtitle style.
    sub_fs = video_params.get("subtitle_fontsize", 24)
    sub_color = video_params.get("subtitle_color", "FFFFFF")
    # The unquoted path passes two filtergraph parsing levels, so the
    # drive-letter colon needs two backslashes.
    sub_path_fixed = subtitle_path.replace("\\", "/").replace(":", "\\\\:")
    sub_style = f"FontSize={sub_fs},PrimaryColour=&H{sub_color},OutlineColour=&H000000,BorderStyle=3,Outline=1,MarginV=30"

    all_filters = title_filters + [
        f"subtitles={sub_path_fixed}:force_style='{sub_style}'"
    ]
    vf_str = ",".join(all_filters)

    # Pick the first unused version number for the output.
    version = 1
    while os.path.exists(os.path.join(output_dir, f"v{version}_final.mp4")):
        version += 1
    final_path = os.path.join(output_dir, f"v{version}_final.mp4")

    burn_cmd = [
        "ffmpeg",
        "-y",
        "-i",
        concat_path,
        "-vf",
        vf_str,
        "-c:v",
        "libx264",
        "-crf",
        "20",
        "-c:a",
        "aac",
        final_path,
    ]
    if not run_cmd(burn_cmd):
        print("[ERR] 字幕烧录失败")
        return None

    print(f"\n完成!输出: {final_path}")
    return final_path


def main():
    """Run the pipeline described by the YAML file given with ``--config``."""
    parser = argparse.ArgumentParser(description="视频片段特征提取工具")
    parser.add_argument("--config", required=True, help="配置文件路径")
    args = parser.parse_args()

    with open(args.config, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # An empty file loads as None, and a "key:" with no value loads as None.
    if not isinstance(config, dict) or not config.get("video_src"):
        parser.error("配置文件缺少 video_src")

    video_path = config["video_src"]
    output_dir = config.get("output_dir") or "./output"
    os.makedirs(output_dir, exist_ok=True)

    pattern_config = config.get("text_pattern") or {}
    video_params = config.get("video_params") or {}
    title_text = video_params.get("title_text", "")

    # 1. Transcribe the whole video.
    segments = transcribe_full_video(video_path, output_dir)

    # 2. Match the text pattern.
    clips = find_pattern_clips(segments, pattern_config)
    if not clips:
        print("未找到匹配的片段")
        return

    # 3. Extract the clips. Keep clips and files aligned so that a failed
    #    clip does not shift the subtitle timing of the clips after it.
    fade_dur = video_params.get("fade_duration", 1)
    extracted = extract_clips(
        video_path, clips, output_dir, fade_dur, keep_failed=True
    )
    kept = [(clip, path) for clip, path in zip(clips, extracted) if path]
    if not kept:
        print("[ERR] 所有片段提取失败")
        raise SystemExit(1)
    clips = [clip for clip, _ in kept]
    clip_paths = [path for _, path in kept]

    # 4. Generate subtitles.
    subtitle_path = generate_subtitles(segments, clips, output_dir, video_params)

    # 5. Join and burn in.
    final_path = merge_and_burn(
        clip_paths, subtitle_path, output_dir, video_params, title_text
    )
    if final_path is None:
        raise SystemExit(1)

    print("\n=== 生成完成 ===")
    print(f"视频文件: {final_path}")


if __name__ == "__main__":
    main()