Files
hmo 04db423416 Initial commit: skills library
- 70 skills with code and documentation
- Add .gitignore (ignore __pycache__, output/, temp/, venv/)
- Clean up test intermediates and caches
2026-04-26 19:27:40 +08:00

1152 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
从PPT提取知识点,转录视频,自动匹配时间戳,生成配置文件。
用法:python scripts/extract_terms_from_ppt.py <pptx_path> <video_path> <output_config.yaml>
GPU 资源管理:
- 转录前清理残留 Python 进程,释放 GPU 显存
- 转录完成后显式释放模型,避免显存泄漏
"""
import subprocess
import os
import sys
import json
import gc
import re
import yaml
import zipfile
import zhconv
def extract_ppt_text(pptx_path):
    """Extract the visible text runs of every slide in a PPTX file.

    The PPTX is opened as a zip archive and each ``ppt/slides/slideN.xml``
    member is scanned for ``<a:t>`` text runs (plain regex, no XML parser).

    Args:
        pptx_path: Path to the ``.pptx`` file.

    Returns:
        A list of dicts, one per slide that has at least one text run longer
        than a single character, ordered by slide number. Each dict has:
        ``slide`` (int slide number, 0 if it cannot be parsed),
        ``texts`` (list of stripped text runs) and
        ``full_text`` (the runs joined with a single space).
    """

    def slide_number(member_name):
        found = re.search(r"slide(\d+)", member_name)
        return int(found.group(1)) if found else 0

    texts_by_slide = []
    with zipfile.ZipFile(pptx_path, "r") as archive:
        # Sort numerically: a plain string sort would put slide10 before slide2.
        slide_files = sorted(
            (
                name
                for name in archive.namelist()
                if name.startswith("ppt/slides/slide") and name.endswith(".xml")
            ),
            key=lambda name: (slide_number(name), name),
        )
        for slide_file in slide_files:
            content = archive.read(slide_file).decode("utf-8", errors="replace")
            runs = re.findall(r"<a:t>([^<]*)</a:t>", content)
            # Single-character runs (bullets, stray punctuation) are ignored.
            meaningful = [run.strip() for run in runs if len(run.strip()) > 1]
            if not meaningful:
                continue
            texts_by_slide.append(
                {
                    "slide": slide_number(slide_file),
                    "texts": meaningful,
                    "full_text": " ".join(meaningful),
                }
            )
    return texts_by_slide
def find_main_knowledge_slide(ppt_texts):
    """Locate the "main knowledge points" slide and extract its points.

    Slides are scanned in order. A slide is a candidate when its ``full_text``
    contains one of the title keywords below. The first candidate that
    actually yields knowledge points wins.

    Some title keywords (e.g. "本课内容") are accepted here but are not
    recognised by ``extract_knowledge_points_from_slide``; such a slide yields
    an empty list. Rather than stopping there, the search continues so that a
    later, real knowledge-point slide is still found.

    Args:
        ppt_texts: Slide dicts as produced by ``extract_ppt_text``.

    Returns:
        ``(slide, knowledge_points)``. If no candidate yields any point, the
        first candidate slide is returned with an empty list; if there is no
        candidate at all, ``(None, [])``.
    """
    title_keywords = (
        "本课主要知识点",
        "本节课重要知识点",
        "本课知识点",
        "主要知识点",
        "本课内容",
    )
    first_candidate = None
    for slide in ppt_texts:
        text = slide["full_text"]
        if not any(kw in text for kw in title_keywords):
            continue
        knowledge_points = extract_knowledge_points_from_slide(slide)
        if knowledge_points:
            return slide, knowledge_points
        if first_candidate is None:
            first_candidate = slide
    return first_candidate, []
def extract_knowledge_points_from_slide(slide):
    """Parse the individual knowledge points out of a knowledge-point slide.

    Processing steps:
      1. Join all text runs and confirm the slide carries a knowledge-point
         title; otherwise return an empty list.
      2. Remove the title, category prefixes (e.g. "乐理:") and the
         "的组合" suffix.
      3. Split on list punctuation (ASCII and full-width) and whitespace.
      4. Split on the conjunctions "与" / "和", strip book-title marks, and
         de-duplicate while preserving order.

    "和" is only treated as a conjunction when every resulting fragment has
    at least two characters, so that terms containing it (e.g. "和弦") are
    kept intact instead of being shredded and dropped.

    Args:
        slide: Slide dict with a ``texts`` list of strings.

    Returns:
        Ordered list of unique knowledge-point strings (each >= 2 characters).
    """
    full_text = " ".join(slide["texts"])
    title_keywords = ("本课主要知识点", "本节课重要知识点", "本课知识点", "主要知识点")
    if not any(kw in full_text for kw in title_keywords):
        return []

    # Strip the title, then category prefixes (ASCII or full-width colon).
    full_text = re.sub(r"(本课|本节课)(重要|主要)?知识点", "", full_text)
    full_text = re.sub(r"(乐理|演奏|弹奏|视奏|节奏训练)\s*[::]\s*", "", full_text)
    full_text = re.sub(r"的组合", "", full_text)

    def split_conjunctions(part):
        pieces = []
        for chunk in part.split("与"):
            fragments = chunk.split("和")
            if len(fragments) > 1 and all(len(f.strip()) >= 2 for f in fragments):
                pieces.extend(fragments)
            else:
                # "和" is part of the term itself (e.g. "和弦"): keep it whole.
                pieces.append(chunk)
        return pieces

    knowledge_points = []
    seen = set()
    for part in re.split(r"[、,,;;\s]+", full_text):
        # A colon left behind by the removed title must not stick to a term.
        part = part.strip().strip("::")
        if len(part) < 2:
            continue
        for sub in split_conjunctions(part):
            sub = re.sub(r"[《》]", "", sub.strip())
            if len(sub) >= 2 and sub not in seen:
                seen.add(sub)
                knowledge_points.append(sub)
    return knowledge_points
def find_homework_pages(ppt_texts):
    """Return the slides whose ``full_text`` mentions homework.

    Args:
        ppt_texts: Slide dicts, each with a ``full_text`` string.

    Returns:
        The matching slide dicts, in their original order.
    """
    homework_keywords = ("作业", "课后练习", "课后作业", "今天的作业", "布置作业")

    def mentions_homework(slide):
        full_text = slide["full_text"]
        return any(keyword in full_text for keyword in homework_keywords)

    return [slide for slide in ppt_texts if mentions_homework(slide)]
def transcribe_video(video_path, output_dir):
    """Transcribe a whole video and return timestamped segments.

    The result is cached in ``<output_dir>/intermediates/full_transcript.json``;
    when that file already exists it is loaded and returned without touching
    the video. Otherwise the video is cut into 5-minute chunks with ffmpeg and
    each chunk is transcribed with faster-whisper (Chinese, beam size 5).

    GPU resource handling: the model reference is dropped and, when torch is
    installed, the CUDA cache is emptied once transcription ends, whether it
    succeeded or failed.

    Args:
        video_path: Path to the source video.
        output_dir: Directory under which ``intermediates/`` is created.

    Returns:
        List of dicts with ``start``/``end`` (seconds from the start of the
        full video) and ``text``.

    Raises:
        ValueError: If ffprobe does not report a parsable duration.
    """
    print("\n[步骤2] 转录视频...")
    inter_dir = os.path.join(output_dir, "intermediates")
    os.makedirs(inter_dir, exist_ok=True)

    # Reuse a previous transcript when available.
    transcript_path = os.path.join(inter_dir, "full_transcript.json")
    if os.path.exists(transcript_path):
        print(" 发现已有转录文件,跳过转录")
        with open(transcript_path, "r", encoding="utf-8") as f:
            return json.load(f)

    # Probe the video duration. Arguments are passed as a list (no shell) so
    # paths containing quotes or shell metacharacters are handled safely.
    probe = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1",
            video_path,
        ],
        capture_output=True,
        text=True,
    )
    try:
        # ffprobe prints a line of the form "duration=123.456".
        duration = float(probe.stdout.strip().split("=")[-1])
    except ValueError as err:
        raise ValueError(
            f"ffprobe could not determine the duration of {video_path!r}: "
            f"{probe.stderr.strip()}"
        ) from err
    print(f" 视频时长: {duration:.0f}s ({duration / 60:.1f}分钟)")

    # Transcribe in 5-minute chunks.
    chunk_size = 300
    all_segments = []
    chunk_idx = 0
    offset = 0

    from faster_whisper import WhisperModel

    model_path = "D:/AI/LM-Models/faster-whisper/large-v3"
    model = None
    try:
        model = WhisperModel(model_path, device="cuda", compute_type="float16")
        print(" [INFO] 使用CUDA GPU加速转录")
    except Exception as e:
        print(f" [WARN] large-v3 加载失败: {e},尝试 base 模型")
        model = WhisperModel("base", device="cuda", compute_type="float16")
        print(" [INFO] 使用base模型转录")

    try:
        while offset < duration:
            end = min(offset + chunk_size, duration)
            print(f" 转录 {offset // 60}min-{end // 60}min...")
            chunk_path = os.path.join(inter_dir, f"chunk_{chunk_idx}.mp4")
            subprocess.run(
                [
                    "ffmpeg",
                    "-hide_banner",
                    "-loglevel",
                    "error",
                    "-y",
                    "-ss",
                    str(offset),
                    "-t",
                    str(end - offset),
                    "-i",
                    video_path,
                    "-c:v",
                    "copy",
                    "-c:a",
                    "copy",
                    chunk_path,
                ],
            )
            segments, info = model.transcribe(chunk_path, language="zh", beam_size=5)
            # ``segments`` is a lazy generator: consume it fully before the
            # temporary chunk file is deleted.
            for seg in segments:
                all_segments.append(
                    {
                        "start": offset + seg.start,
                        "end": offset + seg.end,
                        "text": seg.text,
                    }
                )
            os.remove(chunk_path)
            offset += chunk_size
            chunk_idx += 1

        # Persist the transcript so later runs can skip transcription.
        with open(transcript_path, "w", encoding="utf-8") as f:
            json.dump(all_segments, f, ensure_ascii=False, indent=2)
        print(f" 转录完成: {len(all_segments)} 个片段")
    finally:
        # Release GPU resources.
        print(" [GPU] 释放模型资源...")
        if model is not None:
            del model
        gc.collect()
        try:
            import torch
        except ImportError:
            # faster-whisper does not require torch; without it there is no
            # torch CUDA cache to empty, and cleanup must not fail the run.
            torch = None
        if torch is not None and torch.cuda.is_available():
            torch.cuda.empty_cache()
        print(" [GPU] 资源已释放")
    return all_segments
def find_anchor_time(segments, knowledge_points):
    """Locate the time (seconds) at which the actual teaching starts.

    Two signals are combined:
      * the earliest segment containing one of the introductory guide
        phrases (0 when none is found);
      * the earliest first appearance of any knowledge-point search term
        (the point itself, the point without connective characters, and its
        2-4 character CJK runs).

    When at least one search term is found the result is the later of the
    two signals. Otherwise the guide-phrase time is used if positive, and as
    a last resort 10% of the transcript length (0 for an empty transcript).

    Args:
        segments: Transcript dicts with ``start``, ``end`` and ``text``.
        knowledge_points: Knowledge-point strings from the slides.

    Returns:
        The anchor time in seconds.
    """
    print("\n[步骤3] 定位知识点锚点时间...")

    # Introductory phrases that announce the lesson content.
    guide_phrases = (
        "本课主要知识点",
        "今天我们要学",
        "这节课我们讲",
        "本节课我们",
        "今天我们学习",
        "这节课主要",
        "本课内容",
        "今天主要",
    )
    guide_hits = [
        seg["start"]
        for seg in segments
        if any(
            phrase in zhconv.convert(seg["text"], "zh-cn") for phrase in guide_phrases
        )
    ]
    if guide_hits:
        anchor_time = min(guide_hits)
        print(f" 引导语锚点: {anchor_time:.0f}s ({anchor_time // 60}min)")
    else:
        anchor_time = 0
        print(" [WARN] 未找到引导语锚点")

    # The search terms of a knowledge point never change, so build them once
    # (on first use) instead of once per segment.
    terms_by_point = {}

    def search_terms_for(point):
        if point not in terms_by_point:
            simplified = zhconv.convert(point, "zh-cn")
            terms = [simplified]
            without_connectives = re.sub(r"[的与和及]", "", simplified)
            if without_connectives != simplified:
                terms.append(without_connectives)
            terms.extend(re.findall(r"[\u4e00-\u9fff]{2,4}", simplified))
            terms_by_point[point] = terms
        return terms_by_point[point]

    # First appearance of each search term; at most one new term is recorded
    # per (segment, knowledge point) pair.
    first_seen = {}
    for seg in segments:
        text = zhconv.convert(seg["text"], "zh-cn")
        for point in knowledge_points:
            for term in search_terms_for(point):
                if term in first_seen or len(term) < 2 or term not in text:
                    continue
                first_seen[term] = seg["start"]
                break

    if first_seen:
        by_time = sorted(first_seen.items(), key=lambda item: item[1])
        print(f" 知识点首次出现:")
        for term, seen_at in by_time[:5]:
            print(f" {term}: {seen_at:.0f}s ({seen_at // 60}min)")
        # Teaching starts at the earliest knowledge-point mention, but never
        # before the guide phrase.
        earliest_term, earliest_time = by_time[0]
        final_anchor = max(anchor_time, earliest_time)
        print(
            f" 教学开始锚点: {final_anchor:.0f}s ({final_anchor // 60}min) (最早知识点: {earliest_term})"
        )
        return final_anchor

    if anchor_time > 0:
        return anchor_time
    print(" [WARN] 未找到明确锚点,使用视频前10%作为排除区")
    if segments:
        return segments[-1]["end"] * 0.1
    return 0
def find_homework_anchor(segments, knowledge_anchor):
    """Locate the start of the homework part via the density of "作业".

    Only segments starting after ``knowledge_anchor`` are considered. Mentions
    of "作业" are counted per 30-second window; the anchor is the first
    mentioning segment inside the densest window.

    Args:
        segments: Transcript dicts with ``start``, ``end`` and ``text``.
        knowledge_anchor: Teaching start time in seconds.

    Returns:
        The homework anchor in seconds. Falls back to 85% of the transcript
        length when nothing follows the knowledge anchor or "作业" is never
        mentioned (``knowledge_anchor + 600`` for an empty transcript).
    """
    print("\n[步骤4] 定位作业部分锚点时间...")

    late_segments = [s for s in segments if s["start"] > knowledge_anchor]
    if not late_segments:
        print(" [WARN] 知识点后无内容,使用视频85%位置")
        if segments:
            return segments[-1]["end"] * 0.85
        return knowledge_anchor + 600

    # Total "作业" mentions per 30-second window, keyed by window start.
    window_size = 30
    window_totals = {}
    for seg in late_segments:
        mentions = zhconv.convert(seg["text"], "zh-cn").count("作业")
        if mentions > 0:
            window_start = int(seg["start"] // window_size) * window_size
            window_totals[window_start] = window_totals.get(window_start, 0) + mentions

    if not window_totals:
        print(" [WARN] 未找到'作业'关键词,使用视频85%位置")
        return segments[-1]["end"] * 0.85

    # Densest window; ties go to the earliest-inserted window.
    best_window_start, best_window_count = max(
        window_totals.items(), key=lambda item: item[1]
    )

    # First segment inside that window that actually mentions homework.
    window_stop = best_window_start + window_size
    for seg in late_segments:
        if not (best_window_start <= seg["start"] < window_stop):
            continue
        if "作业" in zhconv.convert(seg["text"], "zh-cn"):
            print(
                f" 作业锚点: {seg['start']:.0f}s ({seg['start'] // 60}min) [窗口密度: {best_window_count}次]"
            )
            return seg["start"]

    # Safety net.
    print(f" [WARN] 未找到精确锚点,使用密度窗口起点: {best_window_start}s")
    return best_window_start
def detect_gap_cutoff(segments_in_cluster, max_gap=10):
    """Return the time at which a run of segments should be cut off.

    Consecutive segments are compared pairwise; the first silence longer than
    ``max_gap`` seconds ends the run at the ``end`` of the segment before it.

    Args:
        segments_in_cluster: Dicts with ``start`` and ``end``, in time order.
        max_gap: Longest tolerated silence between two segments, in seconds.

    Returns:
        The cutoff time; the last segment's ``end`` when no gap exceeds
        ``max_gap``; 0 for an empty list.
    """
    if not segments_in_cluster:
        return 0
    for current, following in zip(segments_in_cluster, segments_in_cluster[1:]):
        if following["start"] - current["end"] > max_gap:
            return current["end"]
    return segments_in_cluster[-1]["end"]
def find_homework_end(segments, homework_start):
    """Find the time at which the homework explanation ends.

    Strategy, in order:
      1. the first segment at or after ``homework_start`` that contains a
         closing phrase;
      2. the first subtitle gap longer than 15 seconds in the homework part,
         if it lies after ``homework_start``;
      3. 30 seconds before the end of the transcript;
      4. ``homework_start + 60`` when the transcript is empty.

    Args:
        segments: Transcript dicts with ``start``, ``end`` and ``text``.
        homework_start: Start of the homework part in seconds.

    Returns:
        The end time in seconds.
    """
    end_phrases = (
        "今天就到这里",
        "下课",
        "作业讲完",
        "今天的课就上到这里",
        "我们下课",
        "好今天",
        "今天就上到",
        "今天的作业就",
    )
    for seg in segments:
        if seg["start"] >= homework_start:
            text = zhconv.convert(seg["text"], "zh-cn")
            if any(phrase in text for phrase in end_phrases):
                return seg["start"]

    hw_segments = [s for s in segments if s["start"] >= homework_start]
    if hw_segments:
        cutoff = detect_gap_cutoff(hw_segments, max_gap=15)
        if cutoff > homework_start:
            return cutoff

    if not segments:
        return homework_start + 60
    return segments[-1]["end"] - 30
def match_knowledge_points(segments, knowledge_points, anchor_time, homework_anchor):
    """Match every knowledge point to the video clip where it is taught.

    Core strategy:
      1. For each knowledge point, score every transcript segment for
         relevance (full keyword, keyword without connectives, numeral-
         normalised keyword, related terms, prefix before a generic suffix,
         3-4 character sub-words).
      2. Cluster the relevant segments (segments less than 90 s apart belong
         to the same cluster).
      3. Score each cluster for "teaching intensity" and penalise intro
         listings, deferrals ("later"), previews and reviews near the
         homework part.
      4. Keep the best cluster per knowledge point, order the clips by time
         and move boundaries where two clips overlap.

    Args:
        segments: Transcript dicts with ``start``, ``end`` and ``text``.
        knowledge_points: Knowledge-point strings from the slides.
        anchor_time: Teaching start in seconds; segments starting more than
            5 s earlier are ignored.
        homework_anchor: Homework start in seconds; segments starting at or
            after it are ignored.

    Returns:
        List of clip dicts sorted by ``start``. Each has ``title``,
        ``keyword``, ``start``, ``end`` (int seconds), ``score`` and the
        diagnostic counters computed while scoring.
    """
    print("\n[步骤5] 匹配知识点到视频片段...")
    valid_segments = [
        s for s in segments if anchor_time - 5 <= s["start"] < homework_anchor
    ]

    # Corrections for frequent speech-recognition mistakes in music terms.
    term_corrections = {
        "副点": "附点",
        "负点": "附点",
        "付点": "附点",
        "黑剑": "黑键",
        "实质": "时值",
        "演音": "延音",
        "阅历": "乐理",
        "音苻": "音符",
        "调苻": "调号",
        "拍苻": "拍符",
        "谱苻": "谱号",
        "首位": "手位",
        "守位": "手位",
        "只发": "指法",
        "织法": "指法",
        "台指": "抬指",
        "抬纸": "抬指",
        "只撑": "支撑",
        "肢撑": "支撑",
        "反服": "反复",
        "反副": "反复",
        "搞八度": "高八度",
        "搞八渡": "高八度",
        "底八度": "低八度",
        "联音": "连音",
        "连因": "连音",
        "挑音": "跳音",
        "还原记好": "还原记号",
        "缓原记号": "还原记号",
        "节牌": "节拍",
        "节凑": "节奏",
        "分首": "分手",
        "分守": "分手",
        "漫练": "慢练",
        "曼练": "慢练",
        "强若": "强弱",
        "强落": "强弱",
        "言音": "延音",
    }

    def correct_text(text):
        for wrong, correct in term_corrections.items():
            text = text.replace(wrong, correct)
        return text

    # Pre-compute the simplified + corrected text of every candidate segment.
    enriched_segments = [
        {**seg, "text_corrected": correct_text(zhconv.convert(seg["text"], "zh-cn"))}
        for seg in valid_segments
    ]

    # Chinese numeral -> Arabic digits, e.g. "十六分音符" <-> "16分音符".
    chinese_to_num = {
        "一": "1",
        "二": "2",
        "三": "3",
        "四": "4",
        "五": "5",
        "六": "6",
        "七": "7",
        "八": "8",
        "九": "9",
        "十": "10",
        "十六": "16",
        "十五": "15",
        "十四": "14",
        "十三": "13",
        "十二": "12",
        "十一": "11",
        "二十": "20",
        "三十": "30",
    }
    # Longest numerals first; otherwise "十六" would be rewritten digit by
    # digit ("106") before the two-character entry is ever tried.
    numeral_replacements = sorted(
        chinese_to_num.items(), key=lambda pair: len(pair[0]), reverse=True
    )

    def normalize_numerals(text):
        for cn, num in numeral_replacements:
            text = text.replace(cn, num)
        return text

    # Variant expressions teachers use for a knowledge point.
    related_terms = {
        "升降记号": ["升号", "降号", "升记", "降记", "升降", "升半", "降半"],
        "还原记号": ["还原"],
        "附点音符": ["附点"],
        "延音线": ["延音", "同音连线"],
        "双音的支撑": ["双音", "支撑"],
        "婚礼进行曲": ["婚礼"],
        "掀起你的盖头来": ["盖头来", "盖头", "掀起"],
        "十六分音符": ["16分", "十六分"],
        "八分音符": ["8分", "八分"],
    }
    # Suffixes too generic to identify a knowledge point on their own.
    generic_suffixes = ["记号", "符号", "音符", "练习", "曲子", "部分"]

    def get_relevance_score(seg_text, keyword_simple):
        """Return how strongly ``seg_text`` relates to ``keyword_simple``.

        3.0 full keyword; 2.0 keyword without connectives (>= 3 chars);
        2.5 match after numeral normalisation; 1.5 related term or the prefix
        before a generic suffix; 1.0 a 4- or 3-character sub-word; 0.0 none.
        The checks run in the order of the code below and the first hit wins.
        """
        if keyword_simple in seg_text:
            return 3.0

        shorter = re.sub(r"[的与和及]", "", keyword_simple)
        if shorter != keyword_simple and len(shorter) >= 3 and shorter in seg_text:
            return 2.0

        normalized_keyword = normalize_numerals(keyword_simple)
        if (
            normalized_keyword in normalize_numerals(seg_text)
            and len(normalized_keyword) >= 3
        ):
            return 2.5

        if any(term in seg_text for term in related_terms.get(keyword_simple, ())):
            return 1.5

        # For compound terms match only the specific prefix, never the
        # generic suffix; only the first applicable suffix is examined.
        for suffix in generic_suffixes:
            if keyword_simple.endswith(suffix) and len(keyword_simple) > len(suffix):
                prefix = keyword_simple[: -len(suffix)]
                if len(prefix) >= 2 and prefix in seg_text:
                    return 1.5
                break

        # 4- then 3-character core words (2-character words are too generic).
        for length in (4, 3):
            words = re.findall(r"[\u4e00-\u9fff]{" + str(length) + r"}", keyword_simple)
            for word in words:
                if word in generic_suffixes:
                    continue
                if word in seg_text:
                    return 1.0
        return 0.0

    def find_teaching_regions(keyword_simple, all_segs):
        """Cluster the segments relevant to one knowledge point.

        Relevant segments whose start is less than 90 s after the end of the
        previous relevant segment are put into the same cluster.
        """
        scored_segs = []
        for s in all_segs:
            rel = get_relevance_score(s["text_corrected"], keyword_simple)
            if rel > 0:
                scored_segs.append({**s, "relevance": rel})
        if not scored_segs:
            return []

        scored_segs.sort(key=lambda x: x["start"])
        clusters = []
        current = [scored_segs[0]]
        for s in scored_segs[1:]:
            if s["start"] - current[-1]["end"] < 90:
                current.append(s)
            else:
                clusters.append(current)
                current = [s]
        clusters.append(current)
        return clusters

    # Simplified form of every knowledge point, computed once.
    kw_simple_list = [zhconv.convert(kw, "zh-cn") for kw in knowledge_points]

    # "Will talk about it later" phrases.
    defer_phrases = [
        "等下再说",
        "等下讲",
        "等一下再说",
        "等一下讲",
        "后面再说",
        "后面讲",
        "稍后再说",
        "稍后讲",
        "先不说",
        "先不讲",
        "先不讲了",
        "待会儿说",
        "待会儿讲",
        "一会儿说",
        "一会儿讲",
    ]
    # Preview / mention phrases: the point is named but not explained.
    preview_phrases = [
        "先说一下",
        "先讲一下",
        "先说",
        "先讲",
        "第一先说",
        "首先说",
        "首先讲",
        "我先说",
        "我先讲",
        "提一下",
        "提到",
    ]
    # Explanation markers (cause, definition, demonstration). This list must
    # not contain empty strings: "" is a substring of every text and would
    # mark every segment as teaching, disabling the preview filter.
    teaching_phrases = [
        "因为",
        "所以",
        "就是",
        "意思是",
        "什么叫",
        "什么意思",
        "为什么",
        "怎么",
        "如何",
        "比如说",
        "例如",
        "比如",
        "大家看",
        "看一下",
        "看到",
        "练习",
        "注意",
        "需要",
        "必须",
    ]
    # Retrospective phrases typical of a recap.
    review_phrases = [
        "刚才",
        "刚刚",
        "今天学",
        "今天讲",
        "回顾",
        "练习一下",
        "复习",
        "我们学",
        "我们讲",
    ]

    def count_segments_with(cluster, phrases):
        return sum(
            1 for s in cluster if any(p in s["text_corrected"] for p in phrases)
        )

    def score_cluster(cluster, keyword_simple, homework_anchor):
        """Score one cluster for how likely it is the actual teaching part.

        Rewards many relevant segments, much text, full-keyword hits and
        explanation markers; penalises co-occurrence with other knowledge
        points (listing), deferral phrases, preview-only clusters, closeness
        to the homework part and retrospective language.
        Returns a dict with ``score`` and the diagnostic counters.
        """
        total_count = len(cluster)
        total_text_len = sum(len(s["text_corrected"]) for s in cluster)
        time_span = max(cluster[-1]["end"] - cluster[0]["start"], 1)
        cluster_start = cluster[0]["start"]

        full_count = sum(1 for s in cluster if keyword_simple in s["text_corrected"])
        avg_rel = sum(s.get("relevance", 0) for s in cluster) / max(total_count, 1)

        # Segments that also mention a different knowledge point.
        other_kw_count = sum(
            1
            for s in cluster
            if any(
                other != keyword_simple and other in s["text_corrected"]
                for other in kw_simple_list
            )
        )

        defer_count = count_segments_with(cluster, defer_phrases)
        defer_ratio = defer_count / max(total_count, 1)
        preview_count = count_segments_with(cluster, preview_phrases)
        teaching_count = count_segments_with(cluster, teaching_phrases)

        # Previews only count against a cluster that has no explanation.
        if teaching_count == 0 and preview_count > 0:
            preview_ratio = preview_count / max(total_count, 1)
        else:
            preview_ratio = 0

        base_score = total_count * avg_rel
        text_bonus = min(total_text_len / 30, 5.0)
        full_bonus = full_count * 2.0
        isolation_penalty = 1.0 / (1.0 + other_kw_count * 0.5)
        score = (base_score + full_bonus) * text_bonus * isolation_penalty

        if defer_ratio > 0.1:
            score *= max(0.1, 1.0 - defer_ratio * 2.0)
        if preview_ratio > 0.2:
            score *= max(0.1, 1.0 - preview_ratio * 2.0)

        # The larger the share of explaining segments, the more it looks
        # like real teaching.
        teaching_density = teaching_count / max(total_count, 1)
        score *= 1.0 + teaching_density * 2.0

        time_to_homework = homework_anchor - cluster_start
        has_review_language = False

        if teaching_count == 0 and total_count <= 2:
            # Intro filter: a couple of mentions without any explanation is
            # just a listing, not teaching.
            score = 0
        else:
            # Clusters within 5 minutes of the homework are usually a recap.
            if time_to_homework < 300:
                score *= max(0.1, time_to_homework / 300)
            has_review_language = count_segments_with(cluster, review_phrases) > 0
            if has_review_language:
                score *= 0.3
            score = round(score, 2)

        return {
            "score": score,
            "total_count": total_count,
            "full_count": full_count,
            "time_span": round(time_span, 1),
            "total_text_len": total_text_len,
            "avg_rel": round(avg_rel, 2),
            "other_kw_count": other_kw_count,
            "has_review_language": has_review_language,
            "time_to_homework": round(time_to_homework, 0),
            "defer_count": defer_count,
            "defer_ratio": round(defer_ratio, 2),
            "teaching_count": teaching_count,
            "preview_count": preview_count,
            "preview_ratio": round(preview_ratio, 2),
        }

    all_candidates = []
    for keyword, keyword_simple in zip(knowledge_points, kw_simple_list):
        clusters = find_teaching_regions(keyword_simple, enriched_segments)
        if not clusters:
            print(f" [SKIP] '{keyword}' - 转录中未找到")
            all_candidates.append([])
            continue

        candidates = []
        for cluster in clusters:
            score_info = score_cluster(cluster, keyword_simple, homework_anchor)
            if score_info["score"] == 0:
                continue
            # Cut at the first long subtitle gap; clip length is 30-60 s.
            cutoff_time = detect_gap_cutoff(cluster, max_gap=15)
            clip_duration = min(cutoff_time - cluster[0]["start"], 60)
            clip_duration = max(clip_duration, 30)
            clip_end = cluster[0]["start"] + clip_duration
            candidates.append(
                {
                    "title": keyword_simple,
                    "keyword": keyword_simple,
                    "start": int(cluster[0]["start"]),
                    "end": int(clip_end),
                    "density": round(
                        score_info["total_count"] / max(score_info["time_span"], 1), 4
                    ),
                    "score": score_info["score"],
                    "total_count": score_info["total_count"],
                    "full_count": score_info["full_count"],
                    "time_span": score_info["time_span"],
                    "total_text_len": score_info["total_text_len"],
                    "avg_rel": score_info["avg_rel"],
                    "other_kw_count": score_info["other_kw_count"],
                    # Carried along so the log line below reports real
                    # numbers instead of the .get() default of 0.
                    "preview_count": score_info["preview_count"],
                    "teaching_count": score_info["teaching_count"],
                    "preview": cluster[0]["text_corrected"][:60],
                }
            )
        all_candidates.append(candidates)

        if candidates:
            best = max(candidates, key=lambda x: x["score"])
            print(
                f" [OK] '{keyword_simple}' -> {best['start']}s-{best['end']}s "
                f"(score={best['score']:.1f}, 相关{best['total_count']}次/完整{best['full_count']}次, "
                f"跨度{best['time_span']:.0f}s, 文本{best['total_text_len']}字, "
                f"其他知识点{best['other_kw_count']}次, 预告{best.get('preview_count', 0)}/讲解{best.get('teaching_count', 0)})"
            )
            print(f" 预览: {best['preview']}")
        else:
            print(f" [SKIP] '{keyword}' - 无有效候选簇")

    # Order the best clip of every knowledge point by time and move the
    # boundaries of overlapping clips.
    print("\n [步骤6] 顺序约束匹配(按视频时间顺序)...")
    all_best = [
        max(candidates, key=lambda x: x["score"])
        for candidates in all_candidates
        if candidates
    ]
    all_best.sort(key=lambda x: x["start"])

    filtered = []
    for clip in all_best:
        overlaps = False
        for existing in filtered:
            if clip["start"] < existing["end"] and clip["end"] > existing["start"]:
                overlaps = True
                mid_point = (existing["end"] + clip["start"]) // 2
                if clip["score"] > existing["score"]:
                    # The stronger newcomer keeps its start; the earlier clip
                    # is shortened.
                    old_end = existing["end"]
                    existing["end"] = mid_point
                    print(
                        f" [ADJUST] '{existing['title']}' end {old_end}s -> {mid_point}s (让位给 '{clip['title']}')"
                    )
                else:
                    clip["start"] = mid_point
                    print(
                        f" [ADJUST] '{clip['title']}' start {clip['start']}s (让位给 '{existing['title']}')"
                    )
                filtered.append(clip)
                break
        if not overlaps:
            filtered.append(clip)
            print(
                f" [MATCH] '{clip['title']}' -> {clip['start']}s-{clip['end']}s (score={clip['score']:.1f})"
            )
    filtered.sort(key=lambda x: x["start"])
    return filtered
def match_homework(segments, homework_anchor, video_end):
    """Build the homework clip by locating where the homework talk ends.

    End-of-homework signals (regex, so several spoken variants match):
      1. explicit goodbye: "下课", "拜拜", "再见";
      2. "作业" followed by a completion expression;
      3. generic closing phrases;
      4. "will post it in the group chat" announcements;
      5. a long pause (more than 45 s) when no phrase is found.

    The last segment carrying any end marker is used. If a subtitle gap of
    more than 30 s follows it (and that gap is not simply the end of the
    video), the clip ends at the gap; otherwise 30 s after the marker.

    Args:
        segments: Transcript dicts with ``start``, ``end`` and ``text``.
        homework_anchor: Start of the homework part in seconds.
        video_end: End of the video in seconds.

    Returns:
        A clip dict (``title``, ``keyword``, ``start``, ``end``, ``density``,
        ``score``, ``preview``), or ``None`` when there is no homework
        segment or the clip would be shorter than 10 s.
    """
    print("\n[步骤7] 匹配作业片段...")
    hw_segments = [s for s in segments if s["start"] >= homework_anchor]
    if not hw_segments:
        print(" [SKIP] 未找到作业片段")
        return None

    # (pattern, label) pairs, highest priority first; compiled once here
    # instead of on every segment.
    end_patterns = [
        (re.compile(pattern), name)
        for pattern, name in (
            # 1. Explicit end of class (highest priority)
            (r"下课", "下课"),
            (r"拜拜", "拜拜"),
            (r"再见", "再见"),
            # 2. "作业" + completion expression
            (r"作业.*就这样", "作业就这样"),
            (r"作业.*就这些", "作业就这些"),
            (r"作业.*就是这些", "作业就是这些"),
            (r"作业.*讲到这里", "作业讲到这里"),
            (r"作业.*讲到这", "作业讲到这"),
            (r"作业.*说完了", "作业说完了"),
            (r"作业.*讲完了", "作业讲完了"),
            (r"作业.*布置完了", "作业布置完了"),
            (r"作业.*就这么多", "作业就这么多"),
            (r"作业.*到这儿", "作业到这儿"),
            (r"作业.*到这里", "作业到这里"),
            (r"作业.*完了", "作业完了"),
            (r"作业.*结束", "作业结束"),
            # 3. Generic closing phrases (medium priority)
            (r"就到这里", "就到这里"),
            (r"就到这", "就到这"),
            (r"就这样吧", "就这样吧"),
            (r"就这样了", "就这样了"),
            (r"就这些了", "就这些了"),
            (r"就这些", "就这些"),
            (r"说完了", "说完了"),
            (r"讲完了", "讲完了"),
            (r"没什么.*说的", "没什么说的"),
            (r"没什么.*讲", "没什么讲的"),
            (r"没别的", "没别的"),
            (r"今天就到", "今天就到"),
            (r"今天就这样", "今天就这样"),
            (r"那就这样", "那就这样"),
            (r"OK.*那就", "OK那就"),
            # 4. Group-chat announcements
            (r"发群", "发群"),
            (r"到时候.*发", "到时候发"),
            # 5. Other closing fillers
            (r"好那", "好那"),
            (r"好了", "好了"),
        )
    ]

    # Scan in time order and remember the LAST segment with an end marker;
    # each segment contributes at most one (its highest-priority) match.
    last_end_marker_time = None
    last_pattern_name = ""
    for seg in hw_segments:
        text = zhconv.convert(seg["text"], "zh-cn")
        for pattern, name in end_patterns:
            if pattern.search(text):
                last_end_marker_time = seg["start"]
                last_pattern_name = name
                break
    if last_end_marker_time is not None:
        print(f' 检测到结束标记: "{last_pattern_name}" @ {last_end_marker_time:.0f}s')

    # Compare against None explicitly: a marker at 0.0 s is a valid time.
    if last_end_marker_time is not None:
        clip_end = None
        after_end = [s for s in hw_segments if s["start"] >= last_end_marker_time]
        if len(after_end) > 1:
            gap_after_end = detect_gap_cutoff(after_end, max_gap=30)
            # Use the gap only if it is a real pause, not the end of video.
            is_meaningful_gap = (
                gap_after_end > last_end_marker_time + 45
                and gap_after_end < video_end - 10
            )
            if is_meaningful_gap:
                clip_end = min(gap_after_end, video_end)
                print(
                    f' 作业结束: {last_end_marker_time:.0f}s ("{last_pattern_name}"),间隔截断: {clip_end:.0f}s'
                )
        if clip_end is None:
            # Fallback: 30 s after the end marker.
            clip_end = min(last_end_marker_time + 30, video_end)
            print(
                f' 作业结束: {last_end_marker_time:.0f}s ("{last_pattern_name}")+30s兜底: {clip_end:.0f}s'
            )
    else:
        # No spoken end marker: look for the teacher's long pause.
        gap_cutoff = detect_gap_cutoff(hw_segments, max_gap=45)
        if gap_cutoff > homework_anchor + 30:
            clip_end = min(gap_cutoff, video_end)
            print(f" 作业结束: 间隔截断 {clip_end:.0f}s")
        else:
            # Last resort: two minutes before the end of the video.
            clip_end = video_end - 120
            print(f" 作业结束: 兜底到视频末尾前2分钟 {clip_end:.0f}s")

    clip_end = min(clip_end, video_end)
    duration = clip_end - homework_anchor
    if duration < 10:
        print(f" [SKIP] 作业片段太短: {duration:.0f}s")
        return None
    print(
        f" [MATCH] '作业' -> {homework_anchor:.0f}s-{clip_end:.0f}s ({duration:.0f}s)"
    )
    return {
        "title": "作业",
        "keyword": "作业",
        "start": int(homework_anchor),
        "end": int(clip_end),
        "density": 0,
        "score": 0,
        "preview": hw_segments[0]["text"][:60] if hw_segments else "",
    }
def generate_config(video_path, clips, output_path):
    """Write the highlight-generator YAML config and print a summary.

    Args:
        video_path: Path of the source video, stored as ``video_src``.
        clips: Clip dicts; only ``title``, ``start`` and ``end`` are written.
        output_path: Destination YAML file. The ``output_dir`` entry of the
            config is the ``output`` folder next to this file.
    """
    clip_entries = []
    for clip in clips:
        clip_entries.append(
            {"title": clip["title"], "start": clip["start"], "end": clip["end"]}
        )

    # Speech-recognition corrections handed on to the highlight generator.
    term_corrections = {
        "黑剑": "黑键",
        "负点": "附点",
        "副点": "附点",
        "实质": "时值",
        "演音": "延音",
        "阅历": "乐理",
        "音苻": "音符",
        "调苻": "调号",
        "拍苻": "拍符",
        "谱苻": "谱号",
        "首位": "手位",
    }
    video_params = {
        "fade_duration": 1,
        "title_duration": 3,
        "title_fontsize": 90,
        "title_color": "FFFF00",
        "subtitle_fontsize": 24,
        "subtitle_color": "FFFFFF",
        "whisper_model": "large",
        "use_fast_whisper": True,
        "whisper_model_path": "D:/AI/LM-Models/faster-whisper/large-v3",
    }
    config = {
        "video_src": video_path,
        "output_dir": os.path.join(os.path.dirname(output_path), "output"),
        "clips": clip_entries,
        "term_corrections": term_corrections,
        "video_params": video_params,
    }

    with open(output_path, "w", encoding="utf-8") as f:
        yaml.dump(config, f, allow_unicode=True, default_flow_style=False)

    # Human-readable summary.
    print(f"\nOK: 配置文件已生成: {output_path}")
    print(f" 知识点数量: {len(clips)}")
    total_duration = sum(clip["end"] - clip["start"] for clip in clips)
    print(f" 总时长: {total_duration}s ({total_duration / 60:.1f}分钟)")
    for index, clip in enumerate(clips, 1):
        print(f" {index}. {clip['title']} ({clip['start']}s-{clip['end']}s)")
    print("\n完成!使用以下命令生成精华视频:")
    print(f" cd .opencode/skills/piano-lesson-highlight-generator")
    print(f" python scripts/generate_highlights.py --config {output_path}")
def main():
    """Command-line entry point: slides + video -> highlight config.

    Expects three arguments: the PPTX path, the video path and the output
    YAML path. Exits with status 1 on wrong usage or when no clip at all
    could be matched.
    """
    if len(sys.argv) < 4:
        print(
            "用法: python extract_terms_from_ppt.py <pptx_path> <video_path> <output_config.yaml>"
        )
        sys.exit(1)
    pptx_path, video_path, output_path = sys.argv[1:4]

    # Step 1: knowledge points and homework pages from the slides.
    print("[步骤1] 从PPT提取知识点...")
    ppt_texts = extract_ppt_text(pptx_path)
    print(f" 提取到 {len(ppt_texts)} 页幻灯片内容")
    knowledge_slide, knowledge_points = find_main_knowledge_slide(ppt_texts)
    homework_pages = find_homework_pages(ppt_texts)
    if not knowledge_points:
        print(" [WARN] 未找到'本课主要知识点'页面")
    else:
        print(f" 找到 {len(knowledge_points)} 个知识点: {', '.join(knowledge_points)}")
    if homework_pages:
        print(f" 找到 {len(homework_pages)} 个作业页面")

    # Step 2: transcribe the video (intermediates go next to the config).
    output_dir = os.path.dirname(output_path) or "."
    os.makedirs(output_dir, exist_ok=True)
    segments = transcribe_video(video_path, output_dir)

    # Steps 3-4: teaching anchor, then homework anchor.
    anchor_time = find_anchor_time(segments, knowledge_points)
    homework_anchor = find_homework_anchor(segments, anchor_time)
    video_end = segments[-1]["end"] if segments else 0

    # Step 5: one clip per knowledge point.
    clips = match_knowledge_points(
        segments, knowledge_points, anchor_time, homework_anchor
    )

    # Step 6: append the homework clip, if any.
    homework_clip = match_homework(segments, homework_anchor, video_end)
    if homework_clip:
        clips.append(homework_clip)

    if not clips:
        print("[WARN] 未找到任何匹配的知识点,请检查PPT内容或视频")
        sys.exit(1)

    # Step 7: write the config file.
    generate_config(video_path, clips, output_path)
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()