Initial commit: lesson-highlights generator
This commit is contained in:
@@ -0,0 +1,323 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
字幕处理模块
|
||||
|
||||
包含字幕生成、SRT格式转换、纠错等功能
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from .utils import to_srt_time, to_ass_time, ensure_dir
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SubtitleSegment:
|
||||
"""字幕片段"""
|
||||
|
||||
def __init__(self, start, end, text, style=None):
|
||||
self.start = start
|
||||
self.end = end
|
||||
self.text = text
|
||||
self.style = style # 'title' or 'content'
|
||||
|
||||
def to_srt_line(self):
|
||||
"""转换为SRT格式"""
|
||||
return f"{to_srt_time(self.start)} --> {to_srt_time(self.end)}"
|
||||
|
||||
def to_ass_line(self):
|
||||
"""转换为ASS格式"""
|
||||
# ASS format: Start --> End
|
||||
return f"{to_ass_time(self.start)} --> {to_ass_time(self.end)}"
|
||||
|
||||
|
||||
class SubtitleTrack:
|
||||
"""字幕轨道"""
|
||||
|
||||
def __init__(self, style=None):
|
||||
self.segments = []
|
||||
self.style = style # 可以是 'title' 或 'content'
|
||||
|
||||
def add(self, start, end, text, style=None):
|
||||
"""添加字幕段"""
|
||||
seg = SubtitleSegment(start, end, text)
|
||||
seg.style = style or self.style
|
||||
self.segments.append(seg)
|
||||
|
||||
def to_srt(self, with_index=True):
|
||||
"""
|
||||
转换为SRT格式
|
||||
|
||||
Args:
|
||||
with_index: 是否包含序号
|
||||
|
||||
Returns:
|
||||
SRT格式字符串
|
||||
"""
|
||||
lines = []
|
||||
for i, seg in enumerate(self.segments, 1):
|
||||
if with_index:
|
||||
lines.append(str(i))
|
||||
lines.append(seg.to_srt_line())
|
||||
lines.append(seg.text)
|
||||
lines.append('')
|
||||
|
||||
return '\n'.join(lines)
|
||||
|
||||
def to_ass(self, style_name="Default", font_size=24, primary_color="FFFFFF", alignment=2):
|
||||
"""
|
||||
转换为ASS格式
|
||||
|
||||
Args:
|
||||
style_name: 样式名称
|
||||
font_size: 字体大小
|
||||
primary_color: 颜色(HTML格式)
|
||||
alignment: 对齐方式 (5=正中, 2=底部居中)
|
||||
|
||||
Returns:
|
||||
ASS格式字符串
|
||||
"""
|
||||
# ASS header
|
||||
ass_lines = [
|
||||
"[Script Info]",
|
||||
"Title: Generated by piano-lesson-highlight-generator",
|
||||
"ScriptType: v4.00+",
|
||||
"PlayResX: 1920",
|
||||
"PlayResY: 1080",
|
||||
"WrapStyle: 0",
|
||||
"",
|
||||
"[V4+ Styles]",
|
||||
f"Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding",
|
||||
]
|
||||
|
||||
# 转换HTML颜色到ASS格式 (BGR with &H prefix)
|
||||
def html_to_ass_bgr(color):
|
||||
if color.startswith('&H'):
|
||||
return color
|
||||
r = int(color[0:2], 16)
|
||||
g = int(color[2:4], 16)
|
||||
b = int(color[4:6], 16)
|
||||
return f"&H{b:02X}{g:02X}{r:02X}"
|
||||
|
||||
primary_bgr = html_to_ass_bgr(primary_color)
|
||||
# Outline颜色为黑色
|
||||
outline_bgr = "&H000000"
|
||||
# BackColour (阴影)为半透明黑色
|
||||
shadow_bgr = "&H80000000"
|
||||
|
||||
# Style行
|
||||
style_line = (
|
||||
f"Style: {style_name},微软雅黑,{font_size},{primary_bgr},"
|
||||
f"{primary_bgr},{outline_bgr},{shadow_bgr},0,0,0,0,100,100,0,0,1,2,2,"
|
||||
f"{alignment},10,10,30,1"
|
||||
)
|
||||
ass_lines.append(style_line)
|
||||
ass_lines.append("")
|
||||
ass_lines.append("[Events]")
|
||||
ass_lines.append("Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text")
|
||||
|
||||
# 添加字幕段
|
||||
for seg in self.segments:
|
||||
# Layer=0, Style=name, Name=, Margins=0, Effect=
|
||||
line = (
|
||||
f"Dialogue: 0,{seg.to_ass_line()},{style_name},"
|
||||
f"0,0,0,0,," # Margins and Effect
|
||||
f"{seg.text.replace(chr(10), '\\N')}"
|
||||
)
|
||||
ass_lines.append(line)
|
||||
|
||||
return '\n'.join(ass_lines)
|
||||
|
||||
def save(self, path):
|
||||
"""保存到文件"""
|
||||
with open(path, 'w', encoding='utf-8') as f:
|
||||
f.write(self.to_srt())
|
||||
logger.info(f"Saved subtitles: {path}")
|
||||
|
||||
def save_ass(self, path, style_name="Default", font_size=24, primary_color="FFFFFF", alignment=2):
|
||||
"""保存为ASS格式"""
|
||||
with open(path, 'w', encoding='utf-8') as f:
|
||||
f.write(self.to_ass(style_name, font_size, primary_color, alignment))
|
||||
logger.info(f"Saved ASS subtitles: {path}")
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_path, title=None):
|
||||
"""
|
||||
从JSON文件加载字幕
|
||||
|
||||
Args:
|
||||
json_path: JSON文件路径
|
||||
title: 可选的标题
|
||||
|
||||
Returns:
|
||||
SubtitleTrack对象
|
||||
"""
|
||||
with open(json_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
track = cls()
|
||||
|
||||
# 添加标题
|
||||
if title:
|
||||
track.add(0, 3, title)
|
||||
|
||||
# 添加字幕段
|
||||
for seg in data.get('segments', []):
|
||||
track.add(
|
||||
seg.get('start', 0),
|
||||
seg.get('end', 0),
|
||||
seg.get('text', '')
|
||||
)
|
||||
|
||||
return track
|
||||
|
||||
|
||||
class SubtitlePipeline:
|
||||
"""字幕处理流水线"""
|
||||
|
||||
def __init__(self, config, output_dir):
|
||||
self.config = config
|
||||
self.output_dir = output_dir
|
||||
self.subs_dir = ensure_dir(os.path.join(output_dir, 'subs'))
|
||||
|
||||
def load_clip_json(self, clip_num, inter_dir):
|
||||
"""
|
||||
加载clip的JSON
|
||||
|
||||
Args:
|
||||
clip_num: clip编号
|
||||
inter_dir: 中间目录
|
||||
|
||||
Returns:
|
||||
JSON数据
|
||||
"""
|
||||
json_path = os.path.join(inter_dir, f"clip{clip_num}.json")
|
||||
with open(json_path, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
|
||||
def generate_from_clips(self, clip_configs, json_paths, apply_corrections=None):
|
||||
"""
|
||||
从clips生成字幕(分离标题和正文轨道)
|
||||
|
||||
Args:
|
||||
clip_configs: clip配置列表
|
||||
json_paths: JSON文件路径列表
|
||||
apply_corrections: 纠错函数
|
||||
|
||||
Returns:
|
||||
(title_track, content_track, title_path, content_path)
|
||||
"""
|
||||
title_track = SubtitleTrack(style='title')
|
||||
content_track = SubtitleTrack(style='content')
|
||||
current_time = 0
|
||||
|
||||
# 计算每个clip的偏移
|
||||
# 必须用 clip_configs 里的实际时长,而不是 Whisper 检测的语音结束时间
|
||||
# 因为 Whisper 只检测有语音的部分,无语音的间隙会被忽略,导致偏移累积偏差
|
||||
offsets = []
|
||||
for i, json_path in enumerate(json_paths):
|
||||
offsets.append(current_time)
|
||||
clip = clip_configs[i]
|
||||
clip_duration = clip['end'] - clip['start']
|
||||
current_time += clip_duration
|
||||
|
||||
# 重新遍历生成字幕
|
||||
current_time = 0
|
||||
for i, (clip, json_path) in enumerate(zip(clip_configs, json_paths)):
|
||||
offset = offsets[i]
|
||||
clip_duration = offsets[i+1] - offsets[i] if i+1 < len(offsets) else 3
|
||||
|
||||
# 添加标题(使用title样式)- 标题显示3秒后正文才显示,避免重叠
|
||||
title_duration = min(3, clip_duration)
|
||||
title_track.add(offset, offset + title_duration, clip['title'], style='title')
|
||||
|
||||
# 添加正文字幕 - 从标题结束后开始,避免重叠
|
||||
with open(json_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
content_start = offset + title_duration # 正文从标题结束后开始
|
||||
for seg in data.get('segments', []):
|
||||
text = seg.get('text', '').strip()
|
||||
if not text:
|
||||
continue
|
||||
|
||||
# 应用纠错
|
||||
if apply_corrections:
|
||||
text = apply_corrections(text)
|
||||
|
||||
# 计算相对偏移(正文时间从标题结束后开始)
|
||||
seg_start = offset + seg['start']
|
||||
seg_end = offset + seg['end']
|
||||
|
||||
# 只添加在clip时间范围内的字幕
|
||||
clip_end = clip['end'] - clip['start'] + offset
|
||||
if seg_start < clip_end and seg_end <= clip_end:
|
||||
content_track.add(
|
||||
seg_start,
|
||||
seg_end,
|
||||
text,
|
||||
style='content'
|
||||
)
|
||||
|
||||
# 保存两个轨道 - 标题使用SRT格式
|
||||
version = self._get_next_version()
|
||||
title_path = os.path.join(self.subs_dir, f"v{version}_title.srt")
|
||||
content_path = os.path.join(self.subs_dir, f"v{version}_content.srt")
|
||||
|
||||
title_track.save(title_path)
|
||||
content_track.save(content_path)
|
||||
|
||||
return title_track, content_track, title_path, content_path
|
||||
|
||||
def _get_next_version(self):
|
||||
"""获取下一个版本号"""
|
||||
existing = [f for f in os.listdir(self.subs_dir) if f.startswith('v') and f.endswith('_terms.srt')]
|
||||
|
||||
if not existing:
|
||||
return 1
|
||||
|
||||
# 提取版本号
|
||||
versions = []
|
||||
for f in existing:
|
||||
try:
|
||||
v = int(f.split('_')[0][1:])
|
||||
versions.append(v)
|
||||
except:
|
||||
pass
|
||||
|
||||
return max(versions) + 1 if versions else 1
|
||||
|
||||
def generate_v1(self, clip_configs, json_paths, apply_corrections=None):
|
||||
"""
|
||||
生成V1版本字幕(原版+纠错)
|
||||
|
||||
Args:
|
||||
clip_configs: clip配置
|
||||
json_paths: JSON路径
|
||||
apply_corrections: 纠错函数
|
||||
|
||||
Returns:
|
||||
字幕路径
|
||||
"""
|
||||
return self.generate_from_clips(clip_configs, json_paths, apply_corrections)[1]
|
||||
|
||||
|
||||
def load_clip_subtitles(inter_dir, clip_nums):
|
||||
"""
|
||||
批量加载多个clip的字幕
|
||||
|
||||
Args:
|
||||
inter_dir: 中间目录
|
||||
clip_nums: clip编号列表
|
||||
|
||||
Returns:
|
||||
{clip_num: json_data}
|
||||
"""
|
||||
clips = {}
|
||||
for num in clip_nums:
|
||||
json_path = os.path.join(inter_dir, f"clip{num}.json")
|
||||
if os.path.exists(json_path):
|
||||
with open(json_path, 'r', encoding='utf-8') as f:
|
||||
clips[num] = json.load(f)
|
||||
return clips
|
||||
Reference in New Issue
Block a user