Files
hmo 04db423416 Initial commit: skills library
- 70 skills with code and documentation
- Add .gitignore (ignore __pycache__, output/, temp/, venv/)
- Clean up test intermediates and caches
2026-04-26 19:27:40 +08:00

446 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
视频片段特征提取工具
通过语音转录识别字幕文本特征,自动定位并剪辑出符合特征的片段。
用法:python scripts/extract_by_text_pattern.py --config 配置文件路径.yaml
"""
import subprocess
import os
import json
import yaml
import argparse
import re
import zhconv
from pypinyin import pinyin, Style
def run_cmd(cmd, capture=True):
    """Run a shell command and report whether it exited successfully.

    Args:
        cmd: Command line, passed to the shell as a single string.
        capture: When true, the command's output is captured and, on failure,
            the first 200 characters of stderr are printed. When false, the
            command is handed to ``os.system`` and its output is not captured.

    Returns:
        True if the command's exit status was zero, otherwise False.
    """
    # Only the first 100 characters are echoed to keep the log readable.
    print(f"[CMD] {cmd[:100]}...")

    if not capture:
        return os.system(cmd) == 0

    completed = subprocess.run(
        cmd,
        shell=True,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="ignore",
    )
    succeeded = completed.returncode == 0
    if not succeeded:
        detail = completed.stderr[:200] if completed.stderr else 'unknown'
        print(f"[ERR] {detail}")
    return succeeded
def to_srt_time(t):
    """Convert a time offset in seconds to an SRT timestamp (``HH:MM:SS,mmm``).

    The value is rounded to the nearest millisecond exactly once and then
    split with integer arithmetic. Truncating the fractional part separately
    loses a millisecond to floating-point error (e.g. 1.001 -> ``,000``) and
    cannot carry into the seconds field.

    Args:
        t: Time offset in seconds (int or float). Negative values are clamped
            to zero because SRT timestamps cannot be negative.

    Returns:
        The formatted timestamp string.
    """
    total_ms = max(0, int(round(t * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
def transcribe_full_video(
    video_path,
    output_dir,
    chunk_size=300,
    model_path="D:/AI/LM-Models/faster-whisper/large-v3",
):
    """Transcribe a whole video in fixed-length chunks and cache the result.

    If ``<output_dir>/intermediates/full_transcript.json`` already exists it is
    loaded and returned without running ffprobe, ffmpeg or the speech model.
    Otherwise the video is cut into chunks with ffmpeg, each chunk is
    transcribed with faster-whisper (language ``zh``), and the segment times
    are shifted by the chunk offset so they refer to the full video.

    Args:
        video_path: Path of the source video.
        output_dir: Output directory; intermediates go in ``intermediates/``.
        chunk_size: Chunk length in seconds (default 300, i.e. five minutes).
        model_path: Location of the preferred faster-whisper model. If loading
            it fails, the ``"base"`` model is tried instead.

    Returns:
        A list of dicts with keys ``start``, ``end`` (seconds) and ``text``.

    Raises:
        ValueError: If ``chunk_size`` is not positive, or if ffprobe did not
            return a parsable duration for ``video_path``.
    """
    print("\n[步骤1] 全视频转录...")
    inter_dir = os.path.join(output_dir, "intermediates")
    os.makedirs(inter_dir, exist_ok=True)

    # Reuse a previous transcript if one exists; transcription is the slow step.
    transcript_path = os.path.join(inter_dir, "full_transcript.json")
    if os.path.exists(transcript_path):
        print(" 发现已有转录文件,跳过转录")
        with open(transcript_path, "r", encoding="utf-8") as f:
            return json.load(f)

    if chunk_size <= 0:
        # A non-positive step would never advance the chunk loop below.
        raise ValueError(f"chunk_size must be positive, got {chunk_size!r}")

    # Ask ffprobe for the container duration; output looks like "duration=123.45".
    probe = subprocess.run(
        f'ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1 "{video_path}"',
        shell=True,
        capture_output=True,
        text=True,
    )
    raw_duration = probe.stdout.strip().split("=")[-1]
    try:
        duration = float(raw_duration)
    except ValueError as err:
        reason = probe.stderr.strip() if probe.stderr else "no output"
        raise ValueError(
            f"ffprobe could not determine the duration of {video_path!r}: {reason}"
        ) from err
    print(f" 视频时长: {duration:.0f}s ({duration / 60:.1f}分钟)")

    # Imported lazily so the cached path above works without the dependency loaded.
    from faster_whisper import WhisperModel

    try:
        model = WhisperModel(model_path, device="cuda", compute_type="float16")
        print(" [INFO] 使用CUDA GPU加速转录")
    except Exception as err:
        # Not a bare except: Ctrl-C and SystemExit must still propagate.
        print(f" [WARN] 加载模型失败: {err}")
        model = WhisperModel("base", device="cuda", compute_type="float16")
        print(" [INFO] 使用base模型转录")

    all_segments = []
    chunk_idx = 0
    offset = 0
    while offset < duration:
        end = min(offset + chunk_size, duration)
        print(f" 转录 {int(offset // 60)}min-{int(end // 60)}min...")
        chunk_path = os.path.join(inter_dir, f"chunk_{chunk_idx}.mp4")
        subprocess.run(
            f'ffmpeg -y -ss {offset} -t {end - offset} -i "{video_path}" -c:v copy -c:a copy "{chunk_path}" -hide_banner -loglevel error',
            shell=True,
        )
        try:
            segments, _info = model.transcribe(chunk_path, language="zh", beam_size=5)
            # Consume the segments before deleting the chunk: the returned
            # iterable may be evaluated lazily.
            all_segments.extend(
                {
                    "start": offset + seg.start,
                    "end": offset + seg.end,
                    "text": seg.text,
                }
                for seg in segments
            )
        finally:
            # Always clean up the temporary chunk, even if transcription fails.
            if os.path.exists(chunk_path):
                os.remove(chunk_path)
        offset += chunk_size
        chunk_idx += 1

    # Persist the full transcript so later runs can skip transcription.
    with open(transcript_path, "w", encoding="utf-8") as f:
        json.dump(all_segments, f, ensure_ascii=False, indent=2)
    print(f" 转录完成: {len(all_segments)} 个片段")
    return all_segments
def find_pattern_clips(segments, pattern_config):
    """Find time ranges whose transcript looks like spoken beat counting.

    Each segment is converted to Simplified Chinese and scored on four
    features of the keyword: repeated runs ("大大大"), keyword followed by a
    digit ("大2"), keyword standing alone between non-letter characters, and
    keyword followed by a solfege syllable. Segments containing an ordinary
    word from the exclusion list are ignored. A sliding window then keeps
    segments that start a sufficiently dense region, nearby regions are
    merged, and the result is adjusted to the configured clip length limits.

    Args:
        segments: Transcript segments, dicts with ``start``, ``end``, ``text``.
            Assumed to be ordered by start time, as the transcriber emits them.
        pattern_config: Dict with optional keys ``keyword`` (falls back to
            "大" when missing or empty), ``window_size`` (10), ``threshold``
            (3), ``min_clip_duration`` (5), ``max_clip_duration`` (30) and
            ``merge_gap`` (5); times are in seconds.

    Returns:
        A list of dicts with ``start``, ``end`` and ``score``; empty when
        nothing matches.
    """
    print("\n[步骤2] 文本特征匹配...")
    # An empty keyword would make str.count() match at every position and
    # select every segment, so fall back to the beat syllable.
    keyword = pattern_config.get("keyword") or "大"
    window_size = pattern_config.get("window_size", 10)
    threshold = pattern_config.get("threshold", 3)
    min_duration = pattern_config.get("min_clip_duration", 5)
    max_duration = pattern_config.get("max_clip_duration", 30)
    merge_gap = pattern_config.get("merge_gap", 5)

    # Ordinary words that contain "大" as a morpheme rather than as a spoken
    # beat; any segment containing one of them is skipped. This list is
    # specific to the "大" keyword.
    exclude_phrases = (
        "大拇指",
        "大家",
        "大小",
        "大概",
        "大量",
        "大学",
        "大陆",
        "大胆",
        "大约",
        "大师",
        "大片",
        "大厅",
        "大桥",
        "大道",
        "大海",
        "大气",
        "大赛",
        "大楼",
        "大脑",
        "大多",
        "大致",
        "大幅",
        "大奖",
        "大家好",
        "大的",
        "大部分",
        "大人",
        "大型",
        "大规模",
        "大幅度",
        "大门",
        "大碗",
        "大自然",
    )

    if not segments:
        return []

    # Build the feature patterns once, from the configured keyword.
    kw = re.escape(keyword)
    word_chars = r"a-zA-Z\u4e00-\u9fff"
    # Feature 1: the keyword repeated back to back, e.g. "大大大大".
    consecutive_re = re.compile(rf"(?:{kw}){{2,}}")
    # Feature 2: the keyword followed by a digit, e.g. "大2大3".
    number_re = re.compile(rf"{kw}\d")
    # Feature 3: the keyword with no letter or CJK character on either side.
    # Lookarounds are used so adjacent hits such as "大,大,大" are all counted;
    # consuming the neighbours would skip every second occurrence.
    standalone_re = re.compile(rf"(?<![{word_chars}]){kw}(?![{word_chars}])")
    # Feature 4: the keyword followed by a solfege syllable or one of its letters.
    solfege_re = re.compile(rf"{kw}[哆来咪发嗦啦西doremi]", re.IGNORECASE)

    # Score every segment that mentions the keyword.
    beat_segments = []
    for seg in segments:
        text = zhconv.convert(seg["text"], "zh-cn")
        count = text.count(keyword)
        if count == 0:
            continue
        if any(phrase in text for phrase in exclude_phrases):
            continue
        beat_score = (
            len(consecutive_re.findall(text)) * 3
            + len(number_re.findall(text)) * 2
            + len(standalone_re.findall(text))
            + len(solfege_re.findall(text)) * 2
        )
        if beat_score > 0 or count >= 2:
            beat_segments.append(
                {
                    "start": seg["start"],
                    "end": seg["end"],
                    "count": count,
                    "beat_score": beat_score,
                    "text": text[:80],
                }
            )
    if not beat_segments:
        print(" 未找到节拍特征片段")
        return []

    # Sliding window: keep a segment when the segments starting within
    # window_size seconds of it reach the score threshold together.
    matched_regions = []
    for bs in beat_segments:
        window_start = bs["start"]
        window_end = window_start + window_size
        window_score = sum(
            t["beat_score"] + t["count"]
            for t in beat_segments
            if window_start <= t["start"] < window_end
        )
        if window_score >= threshold:
            matched_regions.append(
                {
                    "start": bs["start"],
                    "end": bs["end"],
                    "score": window_score,
                }
            )
    if not matched_regions:
        print(f" 未达到阈值(score < {threshold})")
        return []

    # Merge regions separated by less than merge_gap seconds. The merged
    # region keeps the score of its first member.
    merged = []
    for region in matched_regions:
        if merged and region["start"] - merged[-1]["end"] < merge_gap:
            merged[-1]["end"] = max(merged[-1]["end"], region["end"])
        else:
            merged.append(dict(region))

    # Enforce the clip length limits: pad short regions around their centre,
    # truncate long ones at the end.
    clips = []
    for region in merged:
        duration = region["end"] - region["start"]
        if duration < min_duration:
            center = (region["start"] + region["end"]) / 2
            region["start"] = max(0, center - min_duration / 2)
            region["end"] = region["start"] + min_duration
        elif duration > max_duration:
            region["end"] = region["start"] + max_duration
        clips.append(region)

    # Padding can make neighbours overlap: trim the earlier clip so it ends
    # where the next begins, and drop clips left with no length.
    filtered = []
    for clip in clips:
        if filtered and clip["start"] < filtered[-1]["end"]:
            filtered[-1]["end"] = clip["start"]
        if clip["end"] - clip["start"] > 0:
            filtered.append(clip)

    print(f" 找到 {len(filtered)} 个匹配片段:")
    for i, clip in enumerate(filtered):
        duration = clip["end"] - clip["start"]
        print(
            f" 片段{i + 1}: {clip['start']:.0f}s-{clip['end']:.0f}s ({duration:.0f}s)"
        )
    return filtered
def extract_clips(video_path, clips, output_dir, fade_duration=1):
    """Cut each clip out of the source video and add fade-in/fade-out.

    Every clip is re-encoded to ``<output_dir>/intermediates/clip<N>.mp4`` and
    then passed through a second ffmpeg run that applies video and audio
    fades, producing ``clip<N>_fade.mp4``.

    Args:
        video_path: Path of the source video.
        clips: Dicts with ``start`` and ``end`` in seconds.
        output_dir: Output directory; files go in ``intermediates/``.
        fade_duration: Fade length in seconds. It is capped at half the clip
            length so the fade-out never starts before the clip does.

    Returns:
        Paths of the clips that were produced, in order. A clip whose
        extraction failed is omitted; a clip whose fade pass failed (or whose
        effective fade length is zero) is returned unfaded.
    """
    print("\n[步骤3] 提取视频片段...")
    inter_dir = os.path.join(output_dir, "intermediates")
    os.makedirs(inter_dir, exist_ok=True)

    clip_paths = []
    for idx, clip in enumerate(clips, start=1):
        start = clip["start"]
        duration = clip["end"] - start
        out_path = os.path.join(inter_dir, f"clip{idx}.mp4")
        cmd = f'ffmpeg -y -ss {start} -i "{video_path}" -t {duration} -c:v libx264 -preset fast -crf 20 -c:a aac -y "{out_path}"'
        if not run_cmd(cmd):
            print(f" clip{idx}: FAILED")
            continue

        # Keep the fade inside the clip: a fade longer than half the clip
        # would give the fade-out a negative start time.
        fade = max(0, min(fade_duration, duration / 2))
        if fade <= 0:
            clip_paths.append(out_path)
            print(f" clip{idx}: {duration:.0f}s OK")
            continue

        fade_out_start = duration - fade
        faded_path = os.path.join(inter_dir, f"clip{idx}_fade.mp4")
        cmd = f'ffmpeg -y -i "{out_path}" -vf "fade=t=in:st=0:d={fade},fade=t=out:st={fade_out_start}:d={fade}" -af "afade=t=in:st=0:d={fade},afade=t=out:st={fade_out_start}:d={fade}" -c:v libx264 -crf 20 -c:a aac -y "{faded_path}"'
        if run_cmd(cmd):
            clip_paths.append(faded_path)
            print(f" clip{idx}: {duration:.0f}s OK")
        else:
            # Never hand a non-existent file to the merge step: fall back to
            # the clip without fades.
            clip_paths.append(out_path)
            print(f" clip{idx}: {duration:.0f}s OK (fade failed, using unfaded clip)")
    return clip_paths
def generate_subtitles(segments, clips, output_dir, video_params):
    """Write an SRT file for the concatenated clips.

    A transcript segment belongs to a clip when it starts inside that clip.
    Its times are re-based onto the timeline of the concatenated output,
    where each clip starts where the previous one ended. Text is converted
    to Simplified Chinese and empty lines are skipped.

    Args:
        segments: Transcript segments, dicts with ``start``, ``end``, ``text``.
        clips: Selected clips, dicts with ``start`` and ``end`` in seconds, in
            the order they will be concatenated.
        output_dir: Output directory; the file is ``subs/v1_ai.srt``.
        video_params: Accepted for interface compatibility; not used here.

    Returns:
        Path of the written SRT file.
    """
    print("\n[步骤4] 生成字幕...")
    subs_dir = os.path.join(output_dir, "subs")
    os.makedirs(subs_dir, exist_ok=True)

    srt_lines = []
    sub_idx = 1
    timeline_offset = 0  # start of the current clip in the merged video
    for clip in clips:
        clip_start = clip["start"]
        clip_end = clip["end"]
        for seg in segments:
            if not (clip_start <= seg["start"] < clip_end):
                continue
            text = zhconv.convert(seg["text"].strip(), "zh-cn")
            if not text:
                continue
            abs_start = timeline_offset + (seg["start"] - clip_start)
            # Clamp to the clip end: a segment that outlasts its clip must
            # not stay on screen into the next clip.
            abs_end = timeline_offset + (min(seg["end"], clip_end) - clip_start)
            srt_lines.append(f"{sub_idx}")
            srt_lines.append(f"{to_srt_time(abs_start)} --> {to_srt_time(abs_end)}")
            srt_lines.append(text)
            srt_lines.append("")
            sub_idx += 1
        timeline_offset += clip_end - clip_start

    out_path = os.path.join(subs_dir, "v1_ai.srt")
    with open(out_path, "w", encoding="utf-8") as f:
        f.write("\n".join(srt_lines))
    print(f" 生成字幕: {sub_idx - 1}")
    return out_path
def merge_and_burn(clip_paths, subtitle_path, output_dir, video_params, title_text=""):
    """Concatenate the clips and burn the subtitles (and optional title) in.

    The clips are joined with ffmpeg's concat demuxer into
    ``intermediates/concated.mp4``, then re-encoded with a ``subtitles``
    filter and, when ``title_text`` is given, a ``drawtext`` title shown at
    the start. The result is written to the first unused
    ``v<N>_final.mp4`` in ``output_dir``.

    Args:
        clip_paths: Paths of the clips to join, in order.
        subtitle_path: Path of the SRT file to burn in.
        output_dir: Output directory.
        video_params: Dict with optional keys ``title_duration`` (3),
            ``title_fontsize`` (90), ``title_color`` ("FFFF00"),
            ``subtitle_fontsize`` (24) and ``subtitle_color`` ("FFFFFF").
        title_text: Title to overlay; no title is drawn when empty.

    Returns:
        Path of the final video. The path is returned even if ffmpeg failed;
        failures are reported on stdout.
    """
    print("\n[步骤5] 合并片段并烧录字幕...")
    inter_dir = os.path.join(output_dir, "intermediates")
    os.makedirs(inter_dir, exist_ok=True)

    # The concat demuxer resolves relative entries against the directory of
    # the list file, not the working directory, so write absolute paths.
    list_path = os.path.join(inter_dir, "concat_list.txt")
    with open(list_path, "w", encoding="utf-8") as f:
        for p in clip_paths:
            entry = os.path.abspath(p).replace("\\", "/")
            # Inside a single-quoted entry a quote is written as '\''.
            entry = entry.replace("'", "'\\''")
            f.write(f"file '{entry}'\n")
    concat_path = os.path.join(inter_dir, "concated.mp4")
    cmd = f'ffmpeg -y -f concat -safe 0 -i "{list_path}" -c copy -y "{concat_path}"'
    if not run_cmd(cmd):
        print("[ERR] 合并片段失败")

    # Optional title card drawn over the first seconds of the video.
    # NOTE(review): title_text is inserted unescaped; quotes or colons in it
    # will break the filter expression.
    title_filters = []
    if title_text:
        title_dur = video_params.get("title_duration", 3)
        title_fs = video_params.get("title_fontsize", 90)
        title_color = str(video_params.get("title_color", "FFFF00"))
        # Bare hex digits need the 0x prefix; anything else is passed through
        # on the assumption that it is a colour name — TODO confirm.
        if re.fullmatch(r"[0-9A-Fa-f]{6}(?:[0-9A-Fa-f]{2})?", title_color):
            font_color = f"0x{title_color}"
        else:
            font_color = title_color
        filter_str = f"drawtext=text='{title_text}':fontfile='C\\:/Windows/Fonts/msyh.ttc':fontsize={title_fs}:fontcolor={font_color}:x=(w-text_w)/2:y=(h-text_h)/2:enable='between(t,0,{title_dur})':borderw=4:bordercolor=black"
        title_filters.append(filter_str)

    # Subtitle style. The drive-letter colon in the path is escaped so that
    # it is not read as an option separator by the filter parser.
    sub_fs = video_params.get("subtitle_fontsize", 24)
    sub_color = video_params.get("subtitle_color", "FFFFFF")
    sub_path_fixed = subtitle_path.replace("\\", "/").replace(":", "\\\\:")
    # NOTE(review): ASS colours are written blue-green-red; confirm that
    # subtitle_color is meant to be given in that order.
    sub_style = f"FontSize={sub_fs},PrimaryColour=&H{sub_color},OutlineColour=&H000000,BorderStyle=3,Outline=1,MarginV=30"
    all_filters = title_filters + [
        f"subtitles={sub_path_fixed}:force_style='{sub_style}'"
    ]
    vf_str = ",".join(all_filters)

    # Pick the first version number that does not overwrite an earlier result.
    version = 1
    while os.path.exists(os.path.join(output_dir, f"v{version}_final.mp4")):
        version += 1
    final_path = os.path.join(output_dir, f"v{version}_final.mp4")
    cmd = f'ffmpeg -y -i "{concat_path}" -vf "{vf_str}" -c:v libx264 -crf 20 -c:a aac -y "{final_path}"'
    if run_cmd(cmd):
        print(f"\n完成!输出: {final_path}")
    else:
        print(f"\n[ERR] 烧录字幕失败: {final_path}")
    return final_path
def main():
    """Run the pipeline described by a YAML config file.

    Reads ``--config``, then transcribes the video, finds matching clips,
    extracts them, generates subtitles and produces the final video. The
    config must be a mapping with a ``video_src`` entry; ``output_dir``
    (default ``./output``), ``text_pattern`` and ``video_params`` are
    optional.
    """
    parser = argparse.ArgumentParser(description="视频片段特征提取工具")
    parser.add_argument("--config", required=True, help="配置文件路径")
    args = parser.parse_args()

    with open(args.config, "r", encoding="utf-8") as f:
        # safe_load returns None for an empty file.
        config = yaml.safe_load(f) or {}
    if not isinstance(config, dict) or "video_src" not in config:
        parser.error("config must be a mapping that defines 'video_src'")

    video_path = config["video_src"]
    output_dir = config.get("output_dir", "./output")
    os.makedirs(output_dir, exist_ok=True)
    # "or {}" also covers sections that are present but left empty (null).
    pattern_config = config.get("text_pattern") or {}
    video_params = config.get("video_params") or {}
    title_text = video_params.get("title_text", "")

    # 1. Transcribe the whole video (or load the cached transcript).
    segments = transcribe_full_video(video_path, output_dir)

    # 2. Locate clips by text pattern.
    clips = find_pattern_clips(segments, pattern_config)
    if not clips:
        print("未找到匹配的片段")
        return

    # 3. Extract the clips.
    fade_dur = video_params.get("fade_duration", 1)
    clip_paths = extract_clips(video_path, clips, output_dir, fade_dur)
    if not clip_paths:
        # Nothing to concatenate; merging an empty list would only fail later.
        print("[ERR] 所有片段提取失败")
        return
    if len(clip_paths) != len(clips):
        # Subtitle offsets are computed from every selected clip, so a
        # missing clip shifts the timing of the subtitles that follow it.
        print("[WARN] 部分片段提取失败,字幕时间可能不同步")

    # 4. Generate subtitles.
    subtitle_path = generate_subtitles(segments, clips, output_dir, video_params)

    # 5. Merge the clips and burn the subtitles in.
    final_path = merge_and_burn(
        clip_paths, subtitle_path, output_dir, video_params, title_text
    )
    print("\n=== 生成完成 ===")
    print(f"视频文件: {final_path}")


if __name__ == "__main__":
    main()