Initial commit: skills library

- 70 skills with code and documentation
- Add .gitignore (ignore __pycache__, output/, temp/, venv/)
- Clean up test intermediates and caches
This commit is contained in:
hmo
2026-04-26 19:27:40 +08:00
commit 04db423416
861 changed files with 210414 additions and 0 deletions
@@ -0,0 +1,445 @@
#!/usr/bin/env python3
"""
视频片段特征提取工具
通过语音转录识别字幕文本特征,自动定位并剪辑出符合特征的片段。
用法:python scripts/extract_by_text_pattern.py --config 配置文件路径.yaml
"""
import subprocess
import os
import json
import yaml
import argparse
import re
import zhconv
from pypinyin import pinyin, Style
def run_cmd(cmd, capture=True):
"""执行命令"""
print(f"[CMD] {cmd[:100]}...")
if capture:
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True,
encoding="utf-8",
errors="ignore",
)
if result.returncode != 0:
print(f"[ERR] {result.stderr[:200] if result.stderr else 'unknown'}")
return result.returncode == 0
return os.system(cmd) == 0
def to_srt_time(t):
"""秒转SRT时间格式"""
h = int(t // 3600)
m = int((t % 3600) // 60)
s = int(t % 60)
ms = int((t % 1) * 1000)
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
def transcribe_full_video(video_path, output_dir):
"""全视频转录(分段处理)"""
print("\n[步骤1] 全视频转录...")
inter_dir = os.path.join(output_dir, "intermediates")
os.makedirs(inter_dir, exist_ok=True)
# 检查是否已有转录文件
transcript_path = os.path.join(inter_dir, "full_transcript.json")
if os.path.exists(transcript_path):
print(" 发现已有转录文件,跳过转录")
with open(transcript_path, "r", encoding="utf-8") as f:
return json.load(f)
# 获取视频时长
result = subprocess.run(
f'ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1 "{video_path}"',
shell=True,
capture_output=True,
text=True,
)
duration = float(result.stdout.strip().split("=")[-1])
print(f" 视频时长: {duration:.0f}s ({duration / 60:.1f}分钟)")
# 分段转录(每5分钟一段)
chunk_size = 300
all_segments = []
chunk_idx = 0
offset = 0
from faster_whisper import WhisperModel
model_path = "D:/AI/LM-Models/faster-whisper/large-v3"
try:
model = WhisperModel(model_path, device="cuda", compute_type="float16")
print(" [INFO] 使用CUDA GPU加速转录")
except:
model = WhisperModel("base", device="cuda", compute_type="float16")
print(" [INFO] 使用base模型转录")
while offset < duration:
end = min(offset + chunk_size, duration)
print(f" 转录 {offset // 60}min-{end // 60}min...")
chunk_path = os.path.join(inter_dir, f"chunk_{chunk_idx}.mp4")
subprocess.run(
f'ffmpeg -y -ss {offset} -t {end - offset} -i "{video_path}" -c:v copy -c:a copy "{chunk_path}" -hide_banner -loglevel error',
shell=True,
)
segments, info = model.transcribe(chunk_path, language="zh", beam_size=5)
os.remove(chunk_path)
for seg in segments:
all_segments.append(
{
"start": offset + seg.start,
"end": offset + seg.end,
"text": seg.text,
}
)
offset += chunk_size
chunk_idx += 1
# 保存完整转录
transcript_path = os.path.join(inter_dir, "full_transcript.json")
with open(transcript_path, "w", encoding="utf-8") as f:
json.dump(all_segments, f, ensure_ascii=False, indent=2)
print(f" 转录完成: {len(all_segments)} 个片段")
return all_segments
def find_pattern_clips(segments, pattern_config):
"""根据文本特征查找匹配的片段(节拍检测专用)"""
print("\n[步骤2] 文本特征匹配...")
keyword = pattern_config.get("keyword", "")
window_size = pattern_config.get("window_size", 10)
threshold = pattern_config.get("threshold", 3)
min_duration = pattern_config.get("min_clip_duration", 5)
max_duration = pattern_config.get("max_clip_duration", 30)
merge_gap = pattern_config.get("merge_gap", 5)
# 排除词列表:包含关键词但不是节拍用法的常见词
# 打拍子的"大"是发音,不是词义,所以包含"大"的正常词汇都要排除
exclude_phrases = [
"大拇指",
"大家",
"大小",
"大概",
"大量",
"大学",
"大陆",
"大胆",
"大约",
"大师",
"大片",
"大厅",
"大桥",
"大道",
"大海",
"大气",
"大赛",
"大楼",
"大脑",
"大多",
"大致",
"大幅",
"大奖",
"大家好",
"大的",
"大部分",
"大人",
"大型",
"大规模",
"大幅度",
"大门",
"大碗",
"大自然",
"大部分",
"大部分",
"大部分",
"大部分",
"大部分",
"大部分",
]
# 去重
exclude_phrases = list(set(exclude_phrases))
if not segments:
return []
# 扫描每个片段,判断是否为节拍用法
beat_segments = []
for seg in segments:
text = zhconv.convert(seg["text"], "zh-cn")
count = text.count(keyword)
if count == 0:
continue
# 排除包含排除词的片段
has_exclude = any(phrase in text for phrase in exclude_phrases)
if has_exclude:
continue
# 检测节拍特征
# 特征1: 连续"大"字(如"大大大大"
consecutive_da = len(re.findall(r"{2,}", text))
# 特征2: "大"+数字(如"大2大3"
da_number = len(re.findall(r"\d", text))
# 特征3: "大"单独出现(前后无其他汉字构成词)
standalone_da = len(
re.findall(
r"[^a-zA-Z\u4e00-\u9fff]大[^a-zA-Z\u4e00-\u9fff]", " " + text + " "
)
)
# 特征4: 大+唱名(do re mi fa sol la si
da_solfege = len(re.findall(r"大[哆来咪发嗦啦西doremi]", text, re.IGNORECASE))
beat_score = consecutive_da * 3 + da_number * 2 + standalone_da + da_solfege * 2
if beat_score > 0 or count >= 2:
beat_segments.append(
{
"start": seg["start"],
"end": seg["end"],
"count": count,
"beat_score": beat_score,
"text": text[:80],
}
)
if not beat_segments:
print(" 未找到节拍特征片段")
return []
# 滑动窗口检测密集区域(使用节拍分数)
matched_regions = []
for i, bs in enumerate(beat_segments):
window_start = bs["start"]
window_end = window_start + window_size
window_score = sum(
t["beat_score"] + t["count"]
for t in beat_segments
if t["start"] >= window_start and t["start"] < window_end
)
if window_score >= threshold:
matched_regions.append(
{
"start": bs["start"],
"end": bs["end"],
"score": window_score,
}
)
if not matched_regions:
print(f" 未达到阈值(score < {threshold}")
return []
# 合并相邻区域
merged = []
for region in matched_regions:
if merged and region["start"] - merged[-1]["end"] < merge_gap:
merged[-1]["end"] = max(merged[-1]["end"], region["end"])
else:
merged.append(dict(region))
# 调整片段时长
clips = []
for region in merged:
duration = region["end"] - region["start"]
if duration < min_duration:
center = (region["start"] + region["end"]) / 2
region["start"] = max(0, center - min_duration / 2)
region["end"] = region["start"] + min_duration
elif duration > max_duration:
region["end"] = region["start"] + max_duration
clips.append(region)
# 移除重叠
filtered = []
for clip in clips:
if filtered and clip["start"] < filtered[-1]["end"]:
filtered[-1]["end"] = clip["start"]
if clip["end"] - clip["start"] > 0:
filtered.append(clip)
print(f" 找到 {len(filtered)} 个匹配片段:")
for i, clip in enumerate(filtered):
duration = clip["end"] - clip["start"]
print(
f" 片段{i + 1}: {clip['start']:.0f}s-{clip['end']:.0f}s ({duration:.0f}s)"
)
return filtered
def extract_clips(video_path, clips, output_dir, fade_duration=1):
"""提取视频片段"""
print("\n[步骤3] 提取视频片段...")
inter_dir = os.path.join(output_dir, "intermediates")
os.makedirs(inter_dir, exist_ok=True)
clip_paths = []
for i, clip in enumerate(clips):
idx = i + 1
start = clip["start"]
end = clip["end"]
duration = end - start
out_path = os.path.join(inter_dir, f"clip{idx}.mp4")
cmd = f'ffmpeg -y -ss {start} -i "{video_path}" -t {duration} -c:v libx264 -preset fast -crf 20 -c:a aac -y "{out_path}"'
if run_cmd(cmd):
# 添加淡入淡出
faded_path = os.path.join(inter_dir, f"clip{idx}_fade.mp4")
cmd = f'ffmpeg -y -i "{out_path}" -vf "fade=t=in:st=0:d={fade_duration},fade=t=out:st={duration - fade_duration}:d={fade_duration}" -af "afade=t=in:st=0:d={fade_duration},afade=t=out:st={duration - fade_duration}:d={fade_duration}" -c:v libx264 -crf 20 -c:a aac -y "{faded_path}"'
run_cmd(cmd)
clip_paths.append(faded_path)
print(f" clip{idx}: {duration:.0f}s OK")
else:
print(f" clip{idx}: FAILED")
return clip_paths
def generate_subtitles(segments, clips, output_dir, video_params):
"""生成字幕"""
print("\n[步骤4] 生成字幕...")
subs_dir = os.path.join(output_dir, "subs")
os.makedirs(subs_dir, exist_ok=True)
# 计算偏移
offsets = []
current = 0
for clip in clips:
offsets.append(current)
current += clip["end"] - clip["start"]
srt_lines = []
sub_idx = 1
# 对白字幕
for i, clip in enumerate(clips):
offset = offsets[i]
for seg in segments:
if clip["start"] <= seg["start"] < clip["end"]:
text = zhconv.convert(seg["text"].strip(), "zh-cn")
if not text:
continue
abs_start = offset + (seg["start"] - clip["start"])
abs_end = offset + (seg["end"] - clip["start"])
srt_lines.append(f"{sub_idx}")
srt_lines.append(f"{to_srt_time(abs_start)} --> {to_srt_time(abs_end)}")
srt_lines.append(text)
srt_lines.append("")
sub_idx += 1
out_path = os.path.join(subs_dir, "v1_ai.srt")
with open(out_path, "w", encoding="utf-8") as f:
f.write("\n".join(srt_lines))
print(f" 生成字幕: {sub_idx - 1}")
return out_path
def merge_and_burn(clip_paths, subtitle_path, output_dir, video_params, title_text=""):
"""合并片段并烧录字幕"""
print("\n[步骤5] 合并片段并烧录字幕...")
inter_dir = os.path.join(output_dir, "intermediates")
# 合并片段
list_path = os.path.join(inter_dir, "concat_list.txt")
with open(list_path, "w", encoding="utf-8") as f:
for p in clip_paths:
f.write(f"file '{p}'\n")
concat_path = os.path.join(inter_dir, "concated.mp4")
cmd = f'ffmpeg -y -f concat -safe 0 -i "{list_path}" -c copy -y "{concat_path}"'
run_cmd(cmd)
# 构建标题卡滤镜
title_filters = []
if title_text:
title_dur = video_params.get("title_duration", 3)
title_fs = video_params.get("title_fontsize", 90)
title_color = video_params.get("title_color", "FFFF00")
filter_str = f"drawtext=text='{title_text}':fontfile='C\\:/Windows/Fonts/msyh.ttc':fontsize={title_fs}:fontcolor=yellow:x=(w-text_w)/2:y=(h-text_h)/2:enable='between(t,0,{title_dur})':borderw=4:bordercolor=black"
title_filters.append(filter_str)
# 字幕样式
sub_fs = video_params.get("subtitle_fontsize", 24)
sub_color = video_params.get("subtitle_color", "FFFFFF")
sub_path_fixed = subtitle_path.replace("\\", "/").replace(":", "\\\\:")
sub_style = f"FontSize={sub_fs},PrimaryColour=&H{sub_color},OutlineColour=&H000000,BorderStyle=3,Outline=1,MarginV=30"
all_filters = title_filters + [
f"subtitles={sub_path_fixed}:force_style='{sub_style}'"
]
vf_str = ",".join(all_filters)
# 输出
version = 1
while os.path.exists(os.path.join(output_dir, f"v{version}_final.mp4")):
version += 1
final_path = os.path.join(output_dir, f"v{version}_final.mp4")
cmd = f'ffmpeg -y -i "{concat_path}" -vf "{vf_str}" -c:v libx264 -crf 20 -c:a aac -y "{final_path}"'
run_cmd(cmd)
print(f"\n完成!输出: {final_path}")
return final_path
def main():
parser = argparse.ArgumentParser(description="视频片段特征提取工具")
parser.add_argument("--config", required=True, help="配置文件路径")
args = parser.parse_args()
with open(args.config, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
video_path = config["video_src"]
output_dir = config.get("output_dir", "./output")
os.makedirs(output_dir, exist_ok=True)
pattern_config = config.get("text_pattern", {})
video_params = config.get("video_params", {})
title_text = video_params.get("title_text", "")
# 1. 全视频转录
segments = transcribe_full_video(video_path, output_dir)
# 2. 文本特征匹配
clips = find_pattern_clips(segments, pattern_config)
if not clips:
print("未找到匹配的片段")
return
# 3. 提取片段
fade_dur = video_params.get("fade_duration", 1)
clip_paths = extract_clips(video_path, clips, output_dir, fade_dur)
# 4. 生成字幕
subtitle_path = generate_subtitles(segments, clips, output_dir, video_params)
# 5. 合并并烧录
final_path = merge_and_burn(
clip_paths, subtitle_path, output_dir, video_params, title_text
)
print(f"\n=== 生成完成 ===")
print(f"视频文件: {final_path}")
if __name__ == "__main__":
main()