Files
piano-plan/app/routes/plans.py
T
hmo 18351212e8 feat: 问题数据迁移到数据库;学员详情页URL导航改造;侧边栏统一
- 问题从文件系统迁移到数据库 problems 表
- 移除 PROBLEMS_DIR 配置和文件读取逻辑
- student.html 完整重写:编辑/添加/删除问题,生成方案进度显示
- 学员详情页支持独立URL访问 (/student/<id>)
- 统一侧边栏到 base.html
- 更新文档:DEPLOYMENT_SOP, MODELS, STRUCTURE, FRONTEND_ARCH
- 部署到生产环境 v1.2.0
2026-04-23 06:35:32 +08:00

605 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 方案生成路由
import json
import os
from flask import (
request,
jsonify,
render_template,
send_file,
current_app,
Response,
stream_with_context,
session,
)
from app.routes import main_bp
from app.models import db, Student, PracticePlan, StudentProblem
from app.services.plan_generator import generate_practice_plan, generate_ai_report
from app.services.pdf_generator import generate_pdf
from app.routes.auth import login_required_json, admin_required
def sse_format(data):
"""格式化SSE数据"""
return f"data: {json.dumps(data)}\n\n"
@main_bp.route("/api/students/<int:student_id>/plans", methods=["GET"])
@login_required_json
def get_student_plans(student_id):
"""获取学员的练习方案列表"""
student = Student.query.get_or_404(student_id)
plans = student.plans.order_by(PracticePlan.created_at.desc()).all()
return jsonify([p.to_dict() for p in plans])
@main_bp.route("/api/plans", methods=["GET"])
@login_required_json
def get_all_plans():
"""获取所有方案(支持多条件筛选)
查询参数:
- class_id: 班级ID
- problem_ids: Problem.id 列表,逗号分隔
- template_id: 模板ID
- is_typical: 是否典型 (true/false)
- student_name: 学员姓名(模糊匹配)
"""
import json as json_module
query = PracticePlan.query
# 按班级筛选
class_id = request.args.get('class_id', type=int)
if class_id:
query = query.join(Student).filter(Student.class_id == class_id)
# 按模板筛选
template_id = request.args.get('template_id', type=int)
if template_id:
query = query.filter(PracticePlan.template_id == template_id)
# 按典型状态筛选
is_typical = request.args.get('is_typical')
if is_typical and is_typical.lower() == 'true':
query = query.filter(PracticePlan.is_typical == True)
# 按学员姓名模糊筛选
student_name = request.args.get('student_name')
if student_name:
query = query.join(Student).filter(Student.name.like(f'%%{student_name}%%'))
# 按问题筛选(通过 problem_id 关联到学员的问题)
problem_ids = request.args.get('problem_ids')
if problem_ids:
problem_id_list = [int(pid.strip()) for pid in problem_ids.split(',') if pid.strip()]
if problem_id_list:
# 筛选:方案对应的学员有指定问题之一的
# 使用子查询避免笛卡尔积导致的重复
from sqlalchemy import exists
query = query.join(Student).filter(
exists().where(
(StudentProblem.student_id == Student.id) &
(StudentProblem.problem_id.in_(problem_id_list))
)
)
plans = query.order_by(PracticePlan.created_at.desc()).all()
return jsonify([p.to_dict() for p in plans])
@main_bp.route("/api/plans/<int:plan_id>/typical", methods=["POST"])
@login_required_json
def toggle_plan_typical(plan_id):
"""切换方案的典型状态"""
plan = PracticePlan.query.get_or_404(plan_id)
plan.is_typical = not plan.is_typical
db.session.commit()
return jsonify({"success": True, "is_typical": plan.is_typical})
@main_bp.route("/plans")
@login_required_json
def plans_page():
"""方案管理页面"""
return render_template("plans.html", active_nav="plans")
@main_bp.route("/api/admin/fix-plan-templates", methods=["GET"])
@admin_required
def fix_plan_templates():
"""临时接口:修复所有方案的模板关联"""
from app.models import Template, PracticePlan
simple_template = Template.query.filter_by(name='简单文字版').first()
formal_template = Template.query.filter_by(name='正式报告版').first()
if not simple_template or not formal_template:
return jsonify({"error": "模板没找到"}), 400
plans = PracticePlan.query.order_by(PracticePlan.created_at.desc()).all()
if plans:
plans[0].template_id = simple_template.id
for plan in plans[1:]:
plan.template_id = formal_template.id
db.session.commit()
return jsonify({"ok": True, "updated": len(plans)})
@main_bp.route("/plan/<int:plan_id>")
@login_required_json
def plan_detail_page(plan_id):
"""方案详情页面"""
return render_template("plan_detail.html", active_nav="plans")
@main_bp.route("/plan/<int:plan_id>/edit")
@login_required_json
def plan_edit_page(plan_id):
"""方案编辑页面"""
return render_template("plan_edit.html", active_nav="plans", plan_id=plan_id)
@main_bp.route("/api/generate-plan", methods=["POST"])
@login_required_json
def generate_plan():
"""生成练习方案 - 使用SSE实时推送进度"""
data = request.get_json()
student_id = data.get("student_id")
use_ai = data.get("use_ai", True)
template_id = data.get("template_id") # AI提示词模板ID
student = Student.query.get_or_404(student_id)
problems = student.problems.all()
if not problems:
return jsonify({"error": "请先记录学员的问题"}), 400
# 预先收集所有数据,避免在generator中访问数据库
# 学员的统一练习时间
practice_time = student.practice_time or "30-60分钟"
problem_data = []
for p in problems:
# 使用 Problem 关联获取问题信息
problem_obj = p.problem
if problem_obj:
problem_data.append(
{
"problem_id": problem_obj.id, # 使用 Problem.id
"problem_name": problem_obj.name,
"problem_no": problem_obj.no,
"severity": p.severity,
"level": p.level,
"content": problem_obj.content or "",
}
)
time_mapping = {
"15分钟": {"total": 15, "basic": 10, "tech": 2, "piece": 3},
"30分钟": {"total": 30, "basic": 15, "tech": 5, "piece": 10},
"45分钟": {"total": 45, "basic": 15, "tech": 10, "piece": 20},
"60分钟": {"total": 60, "basic": 20, "tech": 15, "piece": 25},
"90分钟": {"total": 90, "basic": 25, "tech": 25, "piece": 40},
"120分钟": {"total": 120, "basic": 30, "tech": 35, "piece": 55},
"150分钟以上": {"total": 150, "basic": 35, "tech": 45, "piece": 70},
}
time_config = time_mapping.get(practice_time, time_mapping["30分钟"])
# 加载API配置
from app.config import load_api_config
api_config = load_api_config(current_app.config)
@stream_with_context
def generate():
# Step 1: 收集问题数据
yield sse_format(
{"step": "collecting", "message": "正在收集问题数据...", "progress": 10}
)
# Step 2: 生成基础方案
yield sse_format(
{"step": "basic", "message": "正在生成基础练习方案...", "progress": 30}
)
plan_content = generate_practice_plan(
student_name=student.name,
problems=problem_data,
practice_time=practice_time,
)
yield sse_format(
{"step": "basic_done", "message": "基础方案生成完成", "progress": 50}
)
# Step 3: 如果启用AI,生成AI报告
ai_report = None
if use_ai:
# 显示AI配置信息
yield sse_format(
{
"step": "ai_config",
"message": f"AI配置: {api_config.get('model', 'N/A')} @ {api_config.get('provider', 'N/A')}",
"progress": 55,
"detail": f"Endpoint: {api_config.get('base_url', '')}",
}
)
yield sse_format(
{
"step": "ai_start",
"message": "正在调用AI生成个性化报告...",
"progress": 60,
}
)
# 构建问题摘要用于显示
problems_summary = []
for p in problem_data[:3]: # 只显示前3个问题
level = p.get("level", "入门")
severity = p.get("severity", "中等")
problems_summary.append(f"- {p['problem_name']} [{level} | {severity}]")
yield sse_format(
{
"step": "ai_problems",
"message": "发送问题给AI:",
"progress": 62,
"detail": "\n".join(problems_summary),
}
)
yield sse_format(
{"step": "ai_request", "message": "等待AI响应...", "progress": 65}
)
# 先用dry_run模式获取提示词并显示给用户
prompt, _, error = generate_ai_report(
student_name=student.name,
wechat_nickname=student.wechat_nickname or "",
problems=problem_data,
practice_time=practice_time,
time_config=time_config,
template_id=template_id,
dry_run=True
)
# 发送提示词给前端显示
yield sse_format({
"step": "ai_prompt",
"message": "发送给AI的提示词:",
"progress": 60,
"detail": prompt if prompt else ""
})
if error:
yield sse_format(
{
"step": "ai_error",
"message": f"获取提示词失败: {error}",
"progress": 80,
"error": error,
}
)
return
yield sse_format(
{"step": "ai_generating", "message": "正在生成AI报告...", "progress": 70}
)
# 真正调用API生成报告
_, ai_report, error = generate_ai_report(
student_name=student.name,
wechat_nickname=student.wechat_nickname or "",
problems=problem_data,
practice_time=practice_time,
time_config=time_config,
template_id=template_id,
dry_run=False
)
if error:
yield sse_format(
{
"step": "ai_error",
"message": f"AI生成失败: {error}",
"progress": 80,
"error": error,
}
)
plan_content["ai_report_error"] = error
else:
# 显示AI返回的报告长度
report_lines = len(ai_report.split("\n")) if ai_report else 0
yield sse_format(
{
"step": "ai_response",
"message": f"AI报告已生成",
"progress": 75,
"detail": f"报告长度: {len(ai_report) if ai_report else 0} 字符, {report_lines}",
}
)
yield sse_format(
{
"step": "ai_done",
"message": "AI报告生成完成",
"progress": 80,
"has_ai": True,
}
)
plan_content["ai_report"] = ai_report
else:
yield sse_format(
{"step": "ai_skip", "message": "跳过AI生成", "progress": 80}
)
# Step 4: 保存方案
yield sse_format(
{"step": "saving", "message": "正在保存方案...", "progress": 90}
)
plan = None
try:
plan = PracticePlan(
student_id=student_id,
template_id=template_id,
content=json.dumps(plan_content, ensure_ascii=False),
)
db.session.add(plan)
db.session.commit()
yield sse_format(
{
"step": "saved",
"message": f"方案已保存 (ID: {plan.id})",
"progress": 95,
}
)
except Exception as e:
db.session.rollback()
yield sse_format(
{
"step": "save_error",
"message": f"保存失败: {str(e)}",
"progress": 90,
"error": str(e),
}
)
# 确保 complete 消息被发送
try:
yield sse_format(
{
"step": "complete",
"message": "方案生成完成!",
"progress": 100,
"plan_id": plan.id if plan and hasattr(plan, 'id') else None,
"content": plan_content,
"ai_report": ai_report,
}
)
except Exception as e:
# 最后一道防护 - 即使出错也要发送 complete
yield sse_format(
{
"step": "complete",
"message": f"方案生成完成!(部分错误: {str(e)[:50]}",
"progress": 100,
"plan_id": None,
"error": str(e),
}
)
return Response(
generate(),
mimetype="text/event-stream",
headers={
"X-Accel-Buffering": "no",
"Cache-Control": "no-cache",
}
)
@main_bp.route("/api/plans/<int:plan_id>", methods=["GET"])
@login_required_json
def get_plan(plan_id):
"""获取单个方案详情"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
return jsonify(
{
"id": plan.id,
"student_id": plan.student_id,
"student_name": plan.student.name if plan.student else "",
"created_at": plan.created_at.strftime("%Y-%m-%d %H:%M"),
"content": content,
}
)
@main_bp.route("/api/plans/<int:plan_id>/pdf", methods=["GET"])
@login_required_json
def export_pdf(plan_id):
"""导出PDF - 使用模板"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
student_name = plan.student.name if plan.student else "未知学员"
# 尝试使用数据库中的报告模板
report_template = None
try:
from app.models import Template
tmpl = Template.query.filter_by(type="report").first()
if tmpl:
report_template = tmpl.content
except:
pass
# 如果有模板,先渲染
rendered_report = None
if report_template:
rendered_report = report_template
rendered_report = rendered_report.replace("{student_name}", student_name)
rendered_report = rendered_report.replace("{practice_time}", content.get('practice_time', 'N/A'))
rendered_report = rendered_report.replace("{total_minutes}", str(content.get('total_daily_minutes', 0)))
rendered_report = rendered_report.replace("{generated_at}", content.get('generated_at', ''))
if content.get('ai_report'):
rendered_report = rendered_report.replace("{ai_report}", content['ai_report'])
else:
rendered_report = rendered_report.replace("{ai_report}", "(未生成AI报告)")
problem_tags = ""
for problem in content.get('problems', []):
problem_tags += f"- **{problem.get('name', '')}** ({problem.get('severity', '')})\n"
rendered_report = rendered_report.replace("{problem_tags}", problem_tags or "(无)")
schedule_table = "| 阶段 | 时长 | 内容 | 目的 |\n|------|------|------|------|\n"
for item in content.get('daily_schedule', []):
schedule_table += f"| {item.get('phase', '')} | {item.get('duration', '')} | {item.get('content', '')} | {item.get('purpose', '')} |\n"
rendered_report = rendered_report.replace("{schedule_table}", schedule_table)
pdf_path = generate_pdf(
plan_id=plan_id,
student_name=student_name,
content=content,
output_dir=current_app.config["PDF_OUTPUT_DIR"],
rendered_report=rendered_report, # 传递渲染后的报告
)
return send_file(
pdf_path, as_attachment=True, download_name=f"{student_name}_练习方案.pdf"
)
@main_bp.route("/api/plans/<int:plan_id>/md", methods=["GET"])
@login_required_json
def export_md(plan_id):
"""导出Markdown"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
student_name = plan.student.name if plan.student else "未知学员"
template_id = request.args.get('template_id', type=int)
# 获取报告模板
report_template = None
try:
from app.models import Template
if template_id:
tmpl = Template.query.get(template_id)
if tmpl and tmpl.type == "report":
report_template = tmpl.content
else:
tmpl = Template.query.filter_by(type="report").order_by(Template.sort_order.asc()).first()
if tmpl:
report_template = tmpl.content
except:
pass
if report_template:
# 使用模板渲染
# 替换变量
rendered = report_template
rendered = rendered.replace("{student_name}", student_name)
rendered = rendered.replace("{practice_time}", content.get('practice_time', 'N/A'))
rendered = rendered.replace("{total_minutes}", str(content.get('total_daily_minutes', 0)))
rendered = rendered.replace("{generated_at}", content.get('generated_at', ''))
# AI报告
if content.get('ai_report'):
rendered = rendered.replace("{ai_report}", content['ai_report'])
else:
rendered = rendered.replace("{ai_report}", "(未生成AI报告)")
# 问题诊断标签
problem_tags = ""
for problem in content.get('problems', []):
problem_tags += f"- **{problem.get('name', '')}** ({problem.get('severity', '')})\n"
rendered = rendered.replace("{problem_tags}", problem_tags or "(无)")
# 每日计划表格
schedule_table = "| 阶段 | 时长 | 内容 | 目的 |\n|------|------|------|------|\n"
for item in content.get('daily_schedule', []):
schedule_table += f"| {item.get('phase', '')} | {item.get('duration', '')} | {item.get('content', '')} | {item.get('purpose', '')} |\n"
rendered = rendered.replace("{schedule_table}", schedule_table)
md_content = rendered
else:
# 兜底:生成完整内容
md_lines = [
f"# {student_name} - 个性化练习方案\n",
f"**练习时间**: {content.get('practice_time', 'N/A')} (共{content.get('total_daily_minutes', 0)}分钟)\n",
f"**生成时间**: {content.get('generated_at', '')}\n",
"\n---\n",
]
if content.get('ai_report'):
md_lines.append("## 📝 AI个性化报告\n")
md_lines.append(content['ai_report'])
md_lines.append("\n---\n")
md_lines.append("## 🔍 问题诊断\n")
for problem in content.get('problems', []):
md_lines.append(f"- **{problem.get('name', '')}** ({problem.get('severity', '')})\n")
md_lines.append("\n")
if content.get('problem_details'):
md_lines.append("## 📚 问题详解\n")
for detail in content.get('problem_details', []):
md_lines.append(f"### {detail.get('name', '')} ({detail.get('severity', '')})\n")
md_lines.append(f"{detail.get('content', '')}\n\n")
md_lines.append("## 📅 每日练习计划\n")
md_lines.append("| 阶段 | 时长 | 内容 | 目的 |\n|------|------|------|------|\n")
for item in content.get('daily_schedule', []):
md_lines.append(f"| {item.get('phase', '')} | {item.get('duration', '')} | {item.get('content', '')} | {item.get('purpose', '')} |\n")
md_content = ''.join(md_lines)
from urllib.parse import quote
filename = quote(f"{student_name}_练习方案.md")
return Response(
md_content,
mimetype='text/markdown; charset=utf-8',
headers={
'Content-Disposition': f"attachment; filename*=UTF-8''{filename}"
}
)
@main_bp.route("/plans/<int:plan_id>/wechat", methods=["GET"])
@login_required_json
def wechat_card(plan_id):
"""微信卡片展示页"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
return render_template(
"wechat_card.html",
plan_id=plan_id,
student_name=plan.student.name if plan.student else "",
content=content,
)
@main_bp.route("/api/plans/<int:plan_id>", methods=["DELETE"])
@login_required_json
def delete_plan(plan_id):
"""删除方案"""
plan = PracticePlan.query.get_or_404(plan_id)
db.session.delete(plan)
db.session.commit()
return jsonify({"message": "删除成功"})
@main_bp.route("/api/plans/<int:plan_id>/content", methods=["PUT"])
@login_required_json
def update_plan_content(plan_id):
"""更新方案内容(用于编辑)"""
plan = PracticePlan.query.get_or_404(plan_id)
data = request.get_json()
# 更新content字段
if "content" in data:
plan.content = data["content"]
db.session.commit()
return jsonify({"message": "保存成功"})