04db423416
- 70 skills with code and documentation - Add .gitignore (ignore __pycache__, output/, temp/, venv/) - Clean up test intermediates and caches
1152 lines
40 KiB
Python
1152 lines
40 KiB
Python
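#!/usr/bin/env python3
"""
Extract knowledge points from a PPT deck, transcribe the lesson video, match
each knowledge point to timestamps automatically, and generate a config file.

Usage: python scripts/extract_terms_from_ppt.py <pptx_path> <video_path> <output_config.yaml>

GPU resource management:
- Before transcribing, make sure no leftover Python processes are holding GPU
  memory (this script does not terminate other processes itself).
- After transcription the model is released explicitly to avoid leaking GPU
  memory.
"""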
|
||
|
||
import gc
import html
import json
import os
import re
import subprocess
import sys
import zipfile

import yaml
import zhconv
|
||
|
||
|
||
def extract_ppt_text(pptx_path):
    """Extract the text of every slide in a PPTX file.

    The archive is opened as a plain zip and the ``<a:t>`` text runs are
    pulled out of each slide's XML with a regex, so Chinese text is read
    without any PPTX library.

    Args:
        pptx_path: Path (or binary file-like object) of the ``.pptx`` archive.

    Returns:
        One dict per slide that has usable text, ordered by slide number:
        ``{"slide": int, "texts": list[str], "full_text": str}``.
        Text runs of at most one character are dropped, and slides left with
        no text are omitted.
    """
    texts_by_slide = []
    with zipfile.ZipFile(pptx_path, "r") as z:
        numbered_slides = []
        for name in z.namelist():
            if not (name.startswith("ppt/slides/slide") and name.endswith(".xml")):
                continue
            slide_match = re.search(r"slide(\d+)", name)
            slide_num = int(slide_match.group(1)) if slide_match else 0
            numbered_slides.append((slide_num, name))

        # Sort numerically: a plain string sort puts slide10 before slide2.
        numbered_slides.sort()

        for slide_num, slide_file in numbered_slides:
            content = z.read(slide_file).decode("utf-8", errors="replace")
            raw_runs = re.findall(r"<a:t>([^<]*)</a:t>", content)
            # Text inside XML is entity-escaped (&amp;, &lt;, ...); decode it
            # before filtering so the length check sees the real text.
            runs = [html.unescape(run).strip() for run in raw_runs]
            meaningful = [run for run in runs if len(run) > 1]
            if meaningful:
                texts_by_slide.append(
                    {
                        "slide": slide_num,
                        "texts": meaningful,
                        "full_text": " ".join(meaningful),
                    }
                )
    return texts_by_slide
|
||
|
||
|
||
def find_main_knowledge_slide(ppt_texts):
    """Locate the "main knowledge points" slide and extract its points.

    Slides are scanned in order. A slide is a candidate when its text contains
    one of the title keywords. The first candidate that actually yields
    knowledge points wins; if none does, the first candidate is returned with
    an empty list so callers can still tell that a title slide existed.

    Args:
        ppt_texts: Slide dicts as produced by ``extract_ppt_text``.

    Returns:
        ``(slide, knowledge_points)``, or ``(None, [])`` when no slide
        contains a title keyword.
    """
    title_keywords = (
        "本课主要知识点",
        "本节课重要知识点",
        "本课知识点",
        "主要知识点",
        "本课内容",
    )
    first_candidate = None
    for slide in ppt_texts:
        text = slide["full_text"]
        if not any(kw in text for kw in title_keywords):
            continue
        knowledge_points = extract_knowledge_points_from_slide(slide)
        if knowledge_points:
            return slide, knowledge_points
        # A title-only hit (e.g. an agenda slide) must not hide a later slide
        # that really lists the knowledge points, so keep looking.
        if first_candidate is None:
            first_candidate = slide
    return first_candidate, []
|
||
|
||
|
||
def extract_knowledge_points_from_slide(slide):
    """Extract the individual knowledge points listed on a title slide.

    The slide's text runs are joined, the title and category prefixes are
    stripped, and the remainder is split on list punctuation and on the
    conjunctions "与" / "和". Musical terms that merely contain "和"
    (such as "和弦" and "和声") are kept intact.

    Args:
        slide: Slide dict with a ``"texts"`` list of strings.

    Returns:
        Knowledge points in order of appearance, without duplicates. Empty
        when the slide does not carry a knowledge-point title.
    """
    knowledge_points = []
    seen = set()

    # Merge all text runs first, then process the slide as one string.
    full_text = " ".join(slide["texts"])

    # Only slides that carry a knowledge-point title are parsed.
    if not any(
        kw in full_text
        for kw in ["本课主要知识点", "本节课重要知识点", "本课知识点", "主要知识点"]
    ):
        return knowledge_points

    # Strip the title. The "本课"/"本节课" prefix is optional so that a bare
    # "主要知识点" title (accepted by the check above) is removed too, together
    # with a colon that directly follows the title.
    full_text = re.sub(r"(?:本课|本节课)?(?:重要|主要)?知识点\s*[::]?", "", full_text)
    # Strip category prefixes such as "乐理:" or "演奏:".
    full_text = re.sub(r"(乐理|演奏|弹奏|视奏|节奏训练)\s*[::]\s*", "", full_text)
    # Strip the "的组合" suffix.
    full_text = re.sub(r"的组合", "", full_text)

    # Split on list punctuation and whitespace.
    for part in re.split(r"[、,,;;\s]+", full_text):
        part = part.strip()
        if not part or len(part) < 2:
            continue

        # Split on conjunctions, but not on the "和" that starts the terms
        # "和弦" / "和声" (a plain split would reduce them to a single
        # character and drop them).
        for sub in re.split(r"与|和(?![弦声])", part):
            # Drop book-title marks around piece names.
            sub = re.sub(r"[《》]", "", sub.strip())
            if sub and len(sub) >= 2 and sub not in seen:
                seen.add(sub)
                knowledge_points.append(sub)

    return knowledge_points
|
||
|
||
|
||
def find_homework_pages(ppt_texts):
    """Return the slides whose full text mentions homework.

    Args:
        ppt_texts: Slide dicts carrying a ``"full_text"`` string.

    Returns:
        The matching slide dicts, in their original order.
    """
    homework_markers = ("作业", "课后练习", "课后作业", "今天的作业", "布置作业")
    return [
        slide
        for slide in ppt_texts
        if any(marker in slide["full_text"] for marker in homework_markers)
    ]
|
||
|
||
|
||
def transcribe_video(video_path, output_dir):
    """Transcribe the whole video and return timestamped segments.

    The video is cut into 5-minute chunks with ffmpeg and each chunk is
    transcribed with faster-whisper. The result is cached in
    ``<output_dir>/intermediates/full_transcript.json``; when that file
    already exists it is loaded and returned without transcribing again.

    GPU resource management: the model is released explicitly once
    transcription ends (or fails), and the CUDA cache is emptied when torch
    is installed.

    Args:
        video_path: Path of the lesson video.
        output_dir: Directory under which ``intermediates/`` is created.

    Returns:
        A list of ``{"start": float, "end": float, "text": str}`` dicts with
        times in seconds relative to the start of the video.

    Raises:
        ValueError: ffprobe did not report a usable duration.
        RuntimeError: ffmpeg did not produce a chunk file.
    """
    print("\n[步骤2] 转录视频...")
    inter_dir = os.path.join(output_dir, "intermediates")
    os.makedirs(inter_dir, exist_ok=True)

    # Reuse a previous transcript if one exists.
    transcript_path = os.path.join(inter_dir, "full_transcript.json")
    if os.path.exists(transcript_path):
        print(" 发现已有转录文件,跳过转录")
        with open(transcript_path, "r", encoding="utf-8") as f:
            return json.load(f)

    # Query the duration. An argument list (no shell) keeps paths containing
    # quotes or shell metacharacters from breaking or altering the command.
    probe = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1",
            video_path,
        ],
        capture_output=True,
        text=True,
    )
    try:
        duration = float(probe.stdout.strip().split("=")[-1])
    except ValueError as err:
        raise ValueError(
            f"ffprobe could not read the duration of {video_path!r}: "
            f"{probe.stderr.strip()}"
        ) from err
    print(f" 视频时长: {duration:.0f}s ({duration / 60:.1f}分钟)")

    # Transcribe in 5-minute chunks.
    chunk_size = 300
    all_segments = []
    chunk_idx = 0
    offset = 0

    from faster_whisper import WhisperModel

    model_path = "D:/AI/LM-Models/faster-whisper/large-v3"
    model = None
    try:
        model = WhisperModel(model_path, device="cuda", compute_type="float16")
        print(" [INFO] 使用CUDA GPU加速转录")
    except Exception as e:
        # Deliberate fallback: any failure to load the local large-v3 model
        # falls back to the stock "base" model.
        print(f" [WARN] large-v3 加载失败: {e},尝试 base 模型")
        model = WhisperModel("base", device="cuda", compute_type="float16")
        print(" [INFO] 使用base模型转录")

    try:
        while offset < duration:
            end = min(offset + chunk_size, duration)
            print(f" 转录 {offset // 60}min-{end // 60}min...")

            chunk_path = os.path.join(inter_dir, f"chunk_{chunk_idx}.mp4")
            subprocess.run(
                [
                    "ffmpeg",
                    "-hide_banner",
                    "-loglevel",
                    "error",
                    "-y",
                    "-ss",
                    str(offset),
                    "-t",
                    str(end - offset),
                    "-i",
                    video_path,
                    "-c:v",
                    "copy",
                    "-c:a",
                    "copy",
                    chunk_path,
                ]
            )
            if not os.path.exists(chunk_path):
                raise RuntimeError(
                    f"ffmpeg failed to extract chunk {chunk_idx} from {video_path!r}"
                )

            try:
                segments, _info = model.transcribe(
                    chunk_path, language="zh", beam_size=5
                )
                # Consume the segments before the chunk file is deleted, and
                # shift chunk-relative times to whole-video times.
                for seg in segments:
                    all_segments.append(
                        {
                            "start": offset + seg.start,
                            "end": offset + seg.end,
                            "text": seg.text,
                        }
                    )
            finally:
                # Do not leave chunk files behind, even when transcription fails.
                if os.path.exists(chunk_path):
                    os.remove(chunk_path)

            offset += chunk_size
            chunk_idx += 1

        # Cache the transcript for later runs.
        with open(transcript_path, "w", encoding="utf-8") as f:
            json.dump(all_segments, f, ensure_ascii=False, indent=2)

        print(f" 转录完成: {len(all_segments)} 个片段")
    finally:
        # Release GPU resources.
        print(" [GPU] 释放模型资源...")
        if model is not None:
            del model
        gc.collect()
        # torch is treated as optional here; a missing torch must not turn a
        # finished transcription into an ImportError.
        try:
            import torch
        except ImportError:
            torch = None
        if torch is not None and torch.cuda.is_available():
            torch.cuda.empty_cache()
        print(" [GPU] 资源已释放")

    return all_segments
|
||
|
||
|
||
def find_anchor_time(segments, knowledge_points):
    """Locate the time at which teaching of the knowledge points starts.

    Two signals are combined:
      * the earliest segment containing a lead-in phrase ("本课主要知识点" ...);
      * the earliest segment mentioning any knowledge point, either in full,
        with connective characters removed, or as a 2-4 character CJK chunk.

    Args:
        segments: Transcript segments (``start`` / ``end`` / ``text``).
        knowledge_points: Knowledge-point strings extracted from the PPT.

    Returns:
        The later of the lead-in time and the first knowledge-point mention
        when any knowledge point is found; otherwise the lead-in time if it
        is positive; otherwise 10% of the transcript end (0 for an empty
        transcript).
    """
    print("\n[步骤3] 定位知识点锚点时间...")

    # Lead-in phrases a teacher uses to announce the lesson content.
    guide_phrases = [
        "本课主要知识点",
        "今天我们要学",
        "这节课我们讲",
        "本节课我们",
        "今天我们学习",
        "这节课主要",
        "本课内容",
        "今天主要",
    ]

    # Convert every segment to Simplified Chinese once; both scans reuse it.
    seg_texts = [zhconv.convert(seg["text"], "zh-cn") for seg in segments]

    anchor_candidates = [
        seg["start"]
        for seg, text in zip(segments, seg_texts)
        if any(phrase in text for phrase in guide_phrases)
    ]

    if anchor_candidates:
        anchor_time = min(anchor_candidates)
        print(f" 引导语锚点: {anchor_time:.0f}s ({anchor_time // 60}min)")
    else:
        anchor_time = 0
        print(" [WARN] 未找到引导语锚点")

    # The search terms depend only on the keyword, so build them once instead
    # of once per segment.
    search_terms_per_kw = []
    for kw in knowledge_points:
        kw_simple = zhconv.convert(kw, "zh-cn")
        search_terms = [kw_simple]
        # Also try the keyword without connective characters.
        shorter = re.sub(r"[的与和及]", "", kw_simple)
        if shorter != kw_simple:
            search_terms.append(shorter)
        search_terms.extend(re.findall(r"[\u4e00-\u9fff]{2,4}", kw_simple))
        search_terms_per_kw.append(search_terms)

    # First time each search term appears in the transcript.
    kw_first_appearances = {}
    for seg, text in zip(segments, seg_texts):
        for search_terms in search_terms_per_kw:
            for term in search_terms:
                if (
                    term not in kw_first_appearances
                    and len(term) >= 2
                    and term in text
                ):
                    kw_first_appearances[term] = seg["start"]
                    break

    if kw_first_appearances:
        sorted_kws = sorted(kw_first_appearances.items(), key=lambda x: x[1])
        print(" 知识点首次出现:")
        for kw, first_seen in sorted_kws[:5]:
            print(f" {kw}: {first_seen:.0f}s ({first_seen // 60}min)")

        # Teaching starts at the earliest knowledge-point mention, but never
        # before the lead-in phrase. No fixed skip is applied;
        # match_knowledge_points clusters the mentions afterwards.
        first_kw_time = sorted_kws[0][1]
        final_anchor = max(anchor_time, first_kw_time)
        print(
            f" 教学开始锚点: {final_anchor:.0f}s ({final_anchor // 60}min) (最早知识点: {sorted_kws[0][0]})"
        )
        return final_anchor

    if anchor_time > 0:
        return anchor_time

    print(" [WARN] 未找到明确锚点,使用视频前10%作为排除区")
    return segments[-1]["end"] * 0.1 if segments else 0
|
||
|
||
|
||
def find_homework_anchor(segments, knowledge_anchor):
    """Locate the start of the homework part from the density of "作业".

    Segments after ``knowledge_anchor`` are bucketed into 30-second windows
    by start time. The window with the most occurrences of "作业" wins
    (the earliest such window on a tie), and the anchor is the start of the
    first segment in that window that mentions "作业".

    Args:
        segments: Transcript segments (``start`` / ``end`` / ``text``).
        knowledge_anchor: Time in seconds at which teaching starts.

    Returns:
        The homework anchor in seconds. When no segment follows
        ``knowledge_anchor`` or none mentions "作业", 85% of the transcript
        end is returned (``knowledge_anchor + 600`` for an empty transcript).
    """
    print("\n[步骤4] 定位作业部分锚点时间...")

    # Only look at what comes after the teaching anchor.
    late_segments = [s for s in segments if s["start"] > knowledge_anchor]
    if not late_segments:
        print(" [WARN] 知识点后无内容,使用视频85%位置")
        return segments[-1]["end"] * 0.85 if segments else knowledge_anchor + 600

    window_size = 30  # seconds per window
    window_totals = {}  # window start -> number of "作业" occurrences
    first_hit = {}  # window start -> start of the first matching segment

    for seg in late_segments:
        count = zhconv.convert(seg["text"], "zh-cn").count("作业")
        if count > 0:
            window_start = int(seg["start"] // window_size) * window_size
            window_totals[window_start] = window_totals.get(window_start, 0) + count
            first_hit.setdefault(window_start, seg["start"])

    if not window_totals:
        print(" [WARN] 未找到'作业'关键词,使用视频85%位置")
        return segments[-1]["end"] * 0.85

    # Densest window; max() keeps the first (earliest inserted) one on a tie.
    best_window_start, best_window_count = max(
        window_totals.items(), key=lambda item: item[1]
    )

    anchor = first_hit[best_window_start]
    print(
        f" 作业锚点: {anchor:.0f}s ({anchor // 60}min) [窗口密度: {best_window_count}次]"
    )
    return anchor
|
||
|
||
|
||
def detect_gap_cutoff(segments_in_cluster, max_gap=10):
    """Return the time at which a run of segments should be cut off.

    Consecutive segments are compared pairwise; the cut is made at the end
    of the first segment that is followed by a silence longer than
    ``max_gap`` seconds.

    Args:
        segments_in_cluster: Segments (``start`` / ``end``) in time order.
        max_gap: Longest tolerated silence between two segments, in seconds.

    Returns:
        The end time of the segment before the first over-long gap, the end
        of the last segment when there is no such gap, or 0 for an empty list.
    """
    if not segments_in_cluster:
        return 0

    for current, following in zip(segments_in_cluster, segments_in_cluster[1:]):
        if following["start"] - current["end"] > max_gap:
            return current["end"]

    return segments_in_cluster[-1]["end"]
|
||
|
||
|
||
def find_homework_end(segments, homework_start):
    """Find the time at which the homework explanation ends.

    Checked in order:
      1. the start of the first segment at or after ``homework_start`` that
         contains a closing phrase;
      2. the first silence longer than 15 seconds after ``homework_start``;
      3. 30 seconds before the end of the transcript, or
         ``homework_start + 60`` when the transcript is empty.

    Args:
        segments: Transcript segments (``start`` / ``end`` / ``text``).
        homework_start: Start of the homework part, in seconds.

    Returns:
        The end time in seconds.
    """
    closing_phrases = (
        "今天就到这里",
        "下课",
        "作业讲完",
        "今天的课就上到这里",
        "我们下课",
        "好今天",
        "今天就上到",
        "今天的作业就",
    )

    homework_segments = [s for s in segments if s["start"] >= homework_start]

    for seg in homework_segments:
        simplified = zhconv.convert(seg["text"], "zh-cn")
        if any(phrase in simplified for phrase in closing_phrases):
            return seg["start"]

    if homework_segments:
        cutoff = detect_gap_cutoff(homework_segments, max_gap=15)
        if cutoff > homework_start:
            return cutoff

    if not segments:
        return homework_start + 60
    return segments[-1]["end"] - 30
|
||
|
||
|
||
def match_knowledge_points(segments, knowledge_points, anchor_time, homework_anchor):
    """Match each knowledge point to the video span where it is taught.

    Strategy:
      1. Keep the segments between ``anchor_time - 5`` and ``homework_anchor``
         and normalise their text (Simplified Chinese plus corrections of
         common ASR mistakes).
      2. For each knowledge point, score every segment's relevance and group
         the relevant segments into clusters; a gap of 90 seconds or more
         starts a new cluster.
      3. Score each cluster on amount of relevant speech, full-keyword hits
         and teaching language, with penalties for deferral / preview /
         review language, for other knowledge points mentioned alongside it
         and for closeness to the homework part.
      4. Keep the best cluster per knowledge point as a 30-60 second clip,
         order the clips by start time and adjust boundaries where they
         overlap.

    Args:
        segments: Transcript segments (``start`` / ``end`` / ``text``).
        knowledge_points: Knowledge-point strings from the PPT.
        anchor_time: Time in seconds at which teaching starts.
        homework_anchor: Time in seconds at which the homework part starts.

    Returns:
        Clip dicts sorted by ``start``. Each has ``title``, ``keyword``,
        ``start``, ``end``, ``score`` and the statistics behind the score.
    """
    print("\n[步骤5] 匹配知识点到视频片段...")

    valid_segments = [
        s for s in segments if anchor_time - 5 <= s["start"] < homework_anchor
    ]

    # Corrections for terms the speech recogniser commonly gets wrong.
    term_corrections = {
        "副点": "附点",
        "负点": "附点",
        "付点": "附点",
        "黑剑": "黑键",
        "实质": "时值",
        "演音": "延音",
        "阅历": "乐理",
        "音苻": "音符",
        "调苻": "调号",
        "拍苻": "拍符",
        "谱苻": "谱号",
        "首位": "手位",
        "守位": "手位",
        "只发": "指法",
        "织法": "指法",
        "台指": "抬指",
        "抬纸": "抬指",
        "只撑": "支撑",
        "肢撑": "支撑",
        "反服": "反复",
        "反副": "反复",
        "搞八度": "高八度",
        "搞八渡": "高八度",
        "底八度": "低八度",
        "联音": "连音",
        "连因": "连音",
        "挑音": "跳音",
        "还原记好": "还原记号",
        "缓原记号": "还原记号",
        "节牌": "节拍",
        "节凑": "节奏",
        "分首": "分手",
        "分守": "分手",
        "漫练": "慢练",
        "曼练": "慢练",
        "强若": "强弱",
        "强落": "强弱",
        "言音": "延音",
    }

    def correct_text(text):
        """Apply every ASR term correction to ``text``."""
        for wrong, correct in term_corrections.items():
            text = text.replace(wrong, correct)
        return text

    # Pre-compute the corrected text of every candidate segment.
    enriched_segments = [
        {**seg, "text_corrected": correct_text(zhconv.convert(seg["text"], "zh-cn"))}
        for seg in valid_segments
    ]

    # Simplified form of every knowledge point, computed once.
    kw_simple_list = [zhconv.convert(kw, "zh-cn") for kw in knowledge_points]

    # Chinese numeral -> Arabic numeral, for "十六分音符" <-> "16分音符".
    chinese_to_num = {
        "一": "1",
        "二": "2",
        "三": "3",
        "四": "4",
        "五": "5",
        "六": "6",
        "七": "7",
        "八": "8",
        "九": "9",
        "十": "10",
        "十六": "16",
        "十五": "15",
        "十四": "14",
        "十三": "13",
        "十二": "12",
        "十一": "11",
        "二十": "20",
        "三十": "30",
    }
    # Replace multi-character numerals first; otherwise "十六" is rewritten
    # character by character into "106" and never matches "16".
    numeral_replacements = sorted(
        chinese_to_num.items(), key=lambda item: len(item[0]), reverse=True
    )

    # Variant expressions a teacher may use for a knowledge point.
    related_terms = {
        "升降记号": ["升号", "降号", "升记", "降记", "升降", "升半", "降半"],
        "还原记号": ["还原"],
        "附点音符": ["附点"],
        "延音线": ["延音", "同音连线"],
        "双音的支撑": ["双音", "支撑"],
        "婚礼进行曲": ["婚礼"],
        "掀起你的盖头来": ["盖头来", "盖头", "掀起"],
        "十六分音符": ["16分", "十六分"],
        "八分音符": ["8分", "八分"],
    }

    # Suffixes too generic to count as a match on their own.
    generic_suffixes = ["记号", "符号", "音符", "练习", "曲子", "部分"]

    # Phrases that postpone a topic ("we'll talk about it later").
    defer_phrases = [
        "等下再说",
        "等下讲",
        "等一下再说",
        "等一下讲",
        "后面再说",
        "后面讲",
        "稍后再说",
        "稍后讲",
        "先不说",
        "先不讲",
        "先不讲了",
        "待会儿说",
        "待会儿讲",
        "一会儿说",
        "一会儿讲",
    ]
    # Phrases that announce or merely mention a topic.
    preview_phrases = [
        "先说一下",
        "先讲一下",
        "先说",
        "先讲",
        "第一先说",
        "首先说",
        "首先讲",
        "我先说",
        "我先讲",
        "提一下",
        "提到",
    ]
    # Phrases typical of actual explanation or demonstration.
    teaching_phrases = [
        "因为",
        "所以",
        "就是",
        "意思是",
        "什么叫",
        "什么意思",
        "为什么",
        "怎么",
        "如何",
        "比如说",
        "例如",
        "比如",
        "像",
        "大家看",
        "看一下",
        "看到",
        "弹",
        "按",
        "练",
        "练习",
        "注意",
        "要",
        "需要",
        "必须",
    ]
    # Phrases typical of a recap.
    review_phrases = [
        "刚才",
        "刚刚",
        "今天学",
        "今天讲",
        "回顾",
        "练习一下",
        "复习",
        "我们学",
        "我们讲",
    ]

    def normalize_numerals(text):
        """Rewrite the known Chinese numerals in ``text`` as Arabic digits."""
        for cn, num in numeral_replacements:
            text = text.replace(cn, num)
        return text

    def get_relevance_score(seg_text, keyword_simple):
        """Score how strongly ``seg_text`` relates to ``keyword_simple``.

        Returns 3.0 for a full match, 2.5 for a match after numeral
        normalisation, 2.0 for a match without connective characters, 1.5
        for a related term or the keyword minus a generic suffix, 1.0 for a
        3-4 character chunk of the keyword, and 0.0 otherwise.
        """
        # Full keyword.
        if keyword_simple in seg_text:
            return 3.0

        # Keyword without connective characters.
        shorter = re.sub(r"[的与和及]", "", keyword_simple)
        if shorter != keyword_simple and len(shorter) >= 3 and shorter in seg_text:
            return 2.0

        # Same keyword written with the other numeral system.
        normalized_keyword = normalize_numerals(keyword_simple)
        if (
            normalized_keyword in normalize_numerals(seg_text)
            and len(normalized_keyword) >= 3
        ):
            return 2.5

        # Known variant expressions.
        if keyword_simple in related_terms:
            for term in related_terms[keyword_simple]:
                if term in seg_text:
                    return 1.5

        # Compound terms: match the specific prefix, never the generic suffix.
        for suffix in generic_suffixes:
            if keyword_simple.endswith(suffix) and len(keyword_simple) > len(suffix):
                prefix = keyword_simple[: -len(suffix)]
                if len(prefix) >= 2 and prefix in seg_text:
                    return 1.5
                break

        # 4- and 3-character chunks of the keyword (2-character chunks are
        # too generic to be used).
        for length in [4, 3]:
            words = re.findall(r"[\u4e00-\u9fff]{" + str(length) + r"}", keyword_simple)
            for word in words:
                if word in generic_suffixes:
                    continue
                if word in seg_text:
                    return 1.0

        return 0.0

    def find_teaching_regions(keyword_simple, all_segs):
        """Group the segments relevant to a keyword into time clusters.

        Segments with a positive relevance score are sorted by start time;
        a segment joins the current cluster when it starts less than 90
        seconds after the cluster's last segment ends. Each returned segment
        carries its score under ``"relevance"``.
        """
        if not all_segs:
            return []

        scored_segs = []
        for s in all_segs:
            rel = get_relevance_score(s["text_corrected"], keyword_simple)
            if rel > 0:
                scored_segs.append({**s, "relevance": rel})

        if not scored_segs:
            return []

        scored_segs.sort(key=lambda x: x["start"])
        clusters = []
        current = [scored_segs[0]]
        for s in scored_segs[1:]:
            if s["start"] - current[-1]["end"] < 90:
                current.append(s)
            else:
                clusters.append(current)
                current = [s]
        clusters.append(current)

        return clusters

    def score_cluster(cluster, keyword_simple, homework_anchor):
        """Score a cluster as a candidate teaching region for a keyword.

        Raises the score: many relevant segments, full-keyword hits, a lot of
        text, teaching language. Lowers it: other knowledge points in the
        same segments (a listing rather than teaching), deferral language,
        preview-only language, starting within 300 s of the homework part,
        and review language. A cluster of at most two segments without any
        teaching language is treated as a passing mention and scores 0.

        Returns a dict with ``score`` and the statistics used to compute it.
        """
        total_count = len(cluster)
        total_text_len = sum(len(s["text_corrected"]) for s in cluster)
        time_span = max(cluster[-1]["end"] - cluster[0]["start"], 1)
        cluster_start = cluster[0]["start"]

        # Segments containing the full keyword.
        full_count = sum(1 for s in cluster if keyword_simple in s["text_corrected"])

        # Average relevance.
        avg_rel = sum(s.get("relevance", 0) for s in cluster) / max(total_count, 1)

        # Segments that also mention another knowledge point.
        other_kw_count = sum(
            1
            for s in cluster
            if any(
                other_kw != keyword_simple and other_kw in s["text_corrected"]
                for other_kw in kw_simple_list
            )
        )

        defer_count = sum(
            1
            for s in cluster
            if any(phrase in s["text_corrected"] for phrase in defer_phrases)
        )
        defer_ratio = defer_count / max(total_count, 1)

        preview_count = sum(
            1
            for s in cluster
            if any(phrase in s["text_corrected"] for phrase in preview_phrases)
        )
        teaching_count = sum(
            1
            for s in cluster
            if any(phrase in s["text_corrected"] for phrase in teaching_phrases)
        )

        # Preview language only counts against a cluster that has no teaching
        # language at all.
        if teaching_count == 0 and preview_count > 0:
            preview_ratio = preview_count / max(total_count, 1)
        else:
            preview_ratio = 0

        time_to_homework = homework_anchor - cluster_start

        stats = {
            "total_count": total_count,
            "full_count": full_count,
            "time_span": round(time_span, 1),
            "total_text_len": total_text_len,
            "avg_rel": round(avg_rel, 2),
            "other_kw_count": other_kw_count,
            "time_to_homework": round(time_to_homework, 0),
            "defer_count": defer_count,
            "defer_ratio": round(defer_ratio, 2),
            "teaching_count": teaching_count,
            "preview_count": preview_count,
            "preview_ratio": round(preview_ratio, 2),
        }

        # Lead-in filter: a short cluster without teaching language is only a
        # mention of the topic, not the teaching itself.
        if teaching_count == 0 and total_count <= 2:
            return {**stats, "score": 0, "has_review_language": False}

        # Base score.
        base_score = total_count * avg_rel
        text_bonus = min(total_text_len / 30, 5.0)
        full_bonus = full_count * 2.0
        isolation_penalty = 1.0 / (1.0 + other_kw_count * 0.5)

        score = (base_score + full_bonus) * text_bonus * isolation_penalty

        # Deferral penalty.
        if defer_ratio > 0.1:
            score *= max(0.1, 1.0 - defer_ratio * 2.0)

        # Preview-only penalty.
        if preview_ratio > 0.2:
            score *= max(0.1, 1.0 - preview_ratio * 2.0)

        # Teaching-density bonus.
        teaching_density = teaching_count / max(total_count, 1)
        score *= 1.0 + teaching_density * 2.0

        # Position penalty: regions close to the homework part are usually a
        # recap.
        if time_to_homework < 300:
            score *= max(0.1, time_to_homework / 300)

        # Review-language penalty.
        has_review_language = any(
            any(phrase in s["text_corrected"] for phrase in review_phrases)
            for s in cluster
        )
        if has_review_language:
            score *= 0.3

        return {
            **stats,
            "score": round(score, 2),
            "has_review_language": has_review_language,
        }

    all_candidates = []
    for keyword, keyword_simple in zip(knowledge_points, kw_simple_list):
        clusters = find_teaching_regions(keyword_simple, enriched_segments)

        if not clusters:
            print(f" [SKIP] '{keyword}' - 转录中未找到")
            all_candidates.append([])
            continue

        # Score every cluster of this knowledge point.
        candidates = []
        for cluster in clusters:
            score_info = score_cluster(cluster, keyword_simple, homework_anchor)

            if score_info["score"] == 0:
                continue

            # Cut at the first long silence, then clamp the clip to 30-60 s.
            cutoff_time = detect_gap_cutoff(cluster, max_gap=15)
            clip_duration = min(cutoff_time - cluster[0]["start"], 60)
            clip_duration = max(clip_duration, 30)
            clip_end = cluster[0]["start"] + clip_duration

            candidates.append(
                {
                    "title": keyword_simple,
                    "keyword": keyword_simple,
                    "start": int(cluster[0]["start"]),
                    "end": int(clip_end),
                    "density": round(
                        score_info["total_count"] / max(score_info["time_span"], 1), 4
                    ),
                    "score": score_info["score"],
                    "total_count": score_info["total_count"],
                    "full_count": score_info["full_count"],
                    "time_span": score_info["time_span"],
                    "total_text_len": score_info["total_text_len"],
                    "avg_rel": score_info["avg_rel"],
                    "other_kw_count": score_info["other_kw_count"],
                    # Carried along so the summary line below reports the
                    # real counts instead of always printing 0.
                    "preview_count": score_info["preview_count"],
                    "teaching_count": score_info["teaching_count"],
                    "preview": cluster[0]["text_corrected"][:60],
                }
            )

        all_candidates.append(candidates)

        if candidates:
            best = max(candidates, key=lambda x: x["score"])
            print(
                f" [OK] '{keyword_simple}' -> {best['start']}s-{best['end']}s "
                f"(score={best['score']:.1f}, 相关{best['total_count']}次/完整{best['full_count']}次, "
                f"跨度{best['time_span']:.0f}s, 文本{best['total_text_len']}字, "
                f"其他知识点{best['other_kw_count']}次, 预告{best.get('preview_count', 0)}/讲解{best.get('teaching_count', 0)})"
            )
            print(f" 预览: {best['preview']}")
        else:
            print(f" [SKIP] '{keyword}' - 无有效候选簇")

    # Order the best clips by time and adjust boundaries where they overlap.
    print("\n [步骤6] 顺序约束匹配(按视频时间顺序)...")

    all_best = [
        max(candidates, key=lambda x: x["score"])
        for candidates in all_candidates
        if candidates
    ]
    all_best.sort(key=lambda x: x["start"])

    filtered = []
    for clip in all_best:
        existing = next(
            (
                kept
                for kept in filtered
                if clip["start"] < kept["end"] and clip["end"] > kept["start"]
            ),
            None,
        )

        if existing is None:
            filtered.append(clip)
            print(
                f" [MATCH] '{clip['title']}' -> {clip['start']}s-{clip['end']}s (score={clip['score']:.1f})"
            )
            continue

        # NOTE(review): only the lower-scoring clip is trimmed, and only to
        # the midpoint, so part of the overlap can remain. Confirm whether
        # the clips are meant to end up fully disjoint.
        mid_point = (existing["end"] + clip["start"]) // 2
        if clip["score"] > existing["score"]:
            old_end = existing["end"]
            existing["end"] = mid_point
            print(
                f" [ADJUST] '{existing['title']}' end {old_end}s -> {mid_point}s (让位给 '{clip['title']}')"
            )
        else:
            old_start = clip["start"]
            clip["start"] = mid_point
            print(
                f" [ADJUST] '{clip['title']}' start {old_start}s -> {mid_point}s (让位给 '{existing['title']}')"
            )
        filtered.append(clip)

    filtered.sort(key=lambda x: x["start"])
    return filtered
|
||
|
||
|
||
def match_homework(segments, homework_anchor, video_end):
    """Build the homework clip by locating where the homework talk ends.

    End markers, matched as regular expressions so they cover spoken
    variants:
      1. explicit goodbyes: "下课", "拜拜", "再见";
      2. "作业" followed by a completion phrase (就这样 / 讲完了 / 到这里 ...);
      3. generic closings: 就到这里 / 就这样 / 说完了 / 讲完了 ...;
      4. notices about posting to the group chat: "发群", "到时候...发";
      5. short fillers: "好那", "好了".

    The last segment carrying a marker is used. The clip ends at the first
    silence longer than 30 s after that marker when the silence is
    meaningful, otherwise 30 s after the marker. Without any marker, the
    first silence longer than 45 s is used, and failing that, two minutes
    before the end of the video.

    Args:
        segments: Transcript segments (``start`` / ``end`` / ``text``).
        homework_anchor: Start of the homework part, in seconds.
        video_end: End of the video, in seconds.

    Returns:
        A clip dict (``title``, ``keyword``, ``start``, ``end``, ``density``,
        ``score``, ``preview``), or ``None`` when no segment starts at or
        after ``homework_anchor`` or the clip would be shorter than 10 s.
    """
    print("\n[步骤7] 匹配作业片段...")

    hw_segments = [s for s in segments if s["start"] >= homework_anchor]

    if not hw_segments:
        print(" [SKIP] 未找到作业片段")
        return None

    # (pattern, display name); the first pattern that matches a segment
    # decides the name reported for it.
    end_patterns = [
        # 1. Explicit goodbyes.
        (r"下课", "下课"),
        (r"拜拜", "拜拜"),
        (r"再见", "再见"),
        # 2. "作业" followed by a completion phrase.
        (r"作业.*就这样", "作业就这样"),
        (r"作业.*就这些", "作业就这些"),
        (r"作业.*就是这些", "作业就是这些"),
        (r"作业.*讲到这里", "作业讲到这里"),
        (r"作业.*讲到这", "作业讲到这"),
        (r"作业.*说完了", "作业说完了"),
        (r"作业.*讲完了", "作业讲完了"),
        (r"作业.*布置完了", "作业布置完了"),
        (r"作业.*就这么多", "作业就这么多"),
        (r"作业.*到这儿", "作业到这儿"),
        (r"作业.*到这里", "作业到这里"),
        (r"作业.*完了", "作业完了"),
        (r"作业.*结束", "作业结束"),
        # 3. Generic closings.
        (r"就到这里", "就到这里"),
        (r"就到这", "就到这"),
        (r"就这样吧", "就这样吧"),
        (r"就这样了", "就这样了"),
        (r"就这些了", "就这些了"),
        (r"就这些", "就这些"),
        (r"说完了", "说完了"),
        (r"讲完了", "讲完了"),
        (r"没什么.*说的", "没什么说的"),
        (r"没什么.*讲", "没什么讲的"),
        (r"没别的", "没别的"),
        (r"今天就到", "今天就到"),
        (r"今天就这样", "今天就这样"),
        (r"那就这样", "那就这样"),
        (r"OK.*那就", "OK那就"),
        # 4. Notices about posting to the group chat.
        (r"发群", "发群"),
        (r"到时候.*发", "到时候发"),
        # 5. Short fillers.
        (r"好那", "好那"),
        (r"好了", "好了"),
    ]

    # Scan in time order and collect every segment that carries a marker.
    end_markers = []  # (time, pattern name, text preview)
    for seg in hw_segments:
        text = zhconv.convert(seg["text"], "zh-cn")
        for pattern, name in end_patterns:
            if re.search(pattern, text):
                end_markers.append((seg["start"], name, text[:60]))
                break  # one marker per segment

    if end_markers:
        # The last marker is taken as the real end of the homework talk.
        last_end_marker_time, last_pattern_name, _ = end_markers[-1]
        print(f' 检测到结束标记: "{last_pattern_name}" @ {last_end_marker_time:.0f}s')
    else:
        last_end_marker_time = None
        last_pattern_name = ""

    # Compare against None: a marker at 0 s is still a marker.
    if last_end_marker_time is not None:
        # Look for the first long silence after the marker.
        after_end = [s for s in hw_segments if s["start"] >= last_end_marker_time]
        gap_after_end = None
        if len(after_end) > 1:
            gap_after_end = detect_gap_cutoff(after_end, max_gap=30)

        # Only use the silence when it is well after the marker and not
        # simply the end of the video.
        is_meaningful_gap = (
            gap_after_end is not None
            and gap_after_end > last_end_marker_time + 45
            and gap_after_end < video_end - 10
        )
        if is_meaningful_gap:
            clip_end = min(gap_after_end, video_end)
            print(
                f' 作业结束: {last_end_marker_time:.0f}s ("{last_pattern_name}"),间隔截断: {clip_end:.0f}s'
            )
        else:
            # Otherwise end 30 s after the marker.
            clip_end = min(last_end_marker_time + 30, video_end)
            print(
                f' 作业结束: {last_end_marker_time:.0f}s ("{last_pattern_name}"),+30s兜底: {clip_end:.0f}s'
            )
    else:
        # No marker: fall back to the first long pause in the homework part.
        gap_cutoff = detect_gap_cutoff(hw_segments, max_gap=45)
        if gap_cutoff > homework_anchor + 30:
            clip_end = min(gap_cutoff, video_end)
            print(f" 作业结束: 间隔截断 {clip_end:.0f}s")
        else:
            # Last resort: two minutes before the end of the video.
            clip_end = video_end - 120
            print(f" 作业结束: 兜底到视频末尾前2分钟 {clip_end:.0f}s")

    clip_end = min(clip_end, video_end)
    duration = clip_end - homework_anchor

    if duration < 10:
        print(f" [SKIP] 作业片段太短: {duration:.0f}s")
        return None

    print(
        f" [MATCH] '作业' -> {homework_anchor:.0f}s-{clip_end:.0f}s ({duration:.0f}s)"
    )

    return {
        "title": "作业",
        "keyword": "作业",
        "start": int(homework_anchor),
        "end": int(clip_end),
        "density": 0,
        "score": 0,
        "preview": hw_segments[0]["text"][:60],
    }
|
||
|
||
|
||
def generate_config(video_path, clips, output_path):
    """Write the highlight-generation YAML config and print a summary.

    Args:
        video_path: Path of the source video, stored as ``video_src``.
        clips: Clip dicts carrying ``title``, ``start`` and ``end``.
        output_path: Destination of the YAML file; ``output_dir`` in the
            config is the ``output`` folder next to it.
    """
    clip_entries = []
    for clip in clips:
        clip_entries.append(
            {"title": clip["title"], "start": clip["start"], "end": clip["end"]}
        )

    # ASR corrections passed on to the rendering step.
    corrections = {
        "黑剑": "黑键",
        "负点": "附点",
        "副点": "附点",
        "实质": "时值",
        "演音": "延音",
        "阅历": "乐理",
        "音苻": "音符",
        "调苻": "调号",
        "拍苻": "拍符",
        "谱苻": "谱号",
        "首位": "手位",
    }

    render_params = {
        "fade_duration": 1,
        "title_duration": 3,
        "title_fontsize": 90,
        "title_color": "FFFF00",
        "subtitle_fontsize": 24,
        "subtitle_color": "FFFFFF",
        "whisper_model": "large",
        "use_fast_whisper": True,
        "whisper_model_path": "D:/AI/LM-Models/faster-whisper/large-v3",
    }

    config = {
        "video_src": video_path,
        "output_dir": os.path.join(os.path.dirname(output_path), "output"),
        "clips": clip_entries,
        "term_corrections": corrections,
        "video_params": render_params,
    }

    with open(output_path, "w", encoding="utf-8") as config_file:
        yaml.dump(config, config_file, allow_unicode=True, default_flow_style=False)

    print(f"\nOK: 配置文件已生成: {output_path}")
    print(f" 知识点数量: {len(clips)}")

    total_duration = sum(clip["end"] - clip["start"] for clip in clips)
    print(f" 总时长: {total_duration}s ({total_duration / 60:.1f}分钟)")

    index = 1
    for clip in clips:
        print(f" {index}. {clip['title']} ({clip['start']}s-{clip['end']}s)")
        index += 1

    print("\n完成!使用以下命令生成精华视频:")
    print(" cd .opencode/skills/piano-lesson-highlight-generator")
    print(f" python scripts/generate_highlights.py --config {output_path}")
|
||
|
||
|
||
def main():
    """Run the pipeline: PPT -> transcript -> anchors -> clips -> config.

    Expects three command-line arguments: the PPTX path, the video path and
    the output YAML path. Exits with status 1 on wrong usage or when no clip
    could be matched.
    """
    if len(sys.argv) < 4:
        print(
            "用法: python extract_terms_from_ppt.py <pptx_path> <video_path> <output_config.yaml>"
        )
        sys.exit(1)

    pptx_path, video_path, output_path = sys.argv[1:4]

    # Step 1: extract the knowledge points from the PPT.
    print("[步骤1] 从PPT提取知识点...")
    ppt_texts = extract_ppt_text(pptx_path)
    print(f" 提取到 {len(ppt_texts)} 页幻灯片内容")

    _knowledge_slide, knowledge_points = find_main_knowledge_slide(ppt_texts)
    homework_pages = find_homework_pages(ppt_texts)

    if not knowledge_points:
        print(" [WARN] 未找到'本课主要知识点'页面")
    else:
        print(f" 找到 {len(knowledge_points)} 个知识点: {', '.join(knowledge_points)}")

    if homework_pages:
        print(f" 找到 {len(homework_pages)} 个作业页面")

    # Step 2: transcribe the video next to the output config.
    output_dir = os.path.dirname(output_path) or "."
    os.makedirs(output_dir, exist_ok=True)
    segments = transcribe_video(video_path, output_dir)

    # Step 3: locate the start of teaching.
    anchor_time = find_anchor_time(segments, knowledge_points)

    # Step 4: locate the start of the homework part.
    homework_anchor = find_homework_anchor(segments, anchor_time)
    video_end = 0
    if segments:
        video_end = segments[-1]["end"]

    # Step 5: match the knowledge points to clips.
    clips = match_knowledge_points(
        segments, knowledge_points, anchor_time, homework_anchor
    )

    # Step 6: add the homework clip, if any.
    homework_clip = match_homework(segments, homework_anchor, video_end)
    if homework_clip:
        clips.append(homework_clip)

    if not clips:
        print("[WARN] 未找到任何匹配的知识点,请检查PPT内容或视频")
        sys.exit(1)

    # Step 7: write the config.
    generate_config(video_path, clips, output_path)
|
||
|
||
|
||
# Run the pipeline only when the file is executed as a script.
if __name__ == "__main__":
    main()
|