Files
piano-plan/app/routes/plans.py
T

994 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 方案生成路由
import json
import os
from flask import (
request,
jsonify,
render_template,
send_file,
current_app,
Response,
stream_with_context,
session,
)
from app.routes import main_bp
from app.models import db, Student, PracticePlan, StudentProblem, StudentGoal, Class
from app.services.plan_generator import generate_practice_plan, generate_ai_report
from app.services.pdf_generator import generate_pdf
from app.routes.auth import login_required_json, admin_required
def sse_format(data):
"""格式化SSE数据"""
return f"data: {json.dumps(data)}\n\n"
@main_bp.route("/api/students/<int:student_id>/plans", methods=["GET"])
@login_required_json
def get_student_plans(student_id):
"""获取学员的练习方案列表"""
student = Student.query.get_or_404(student_id)
plans = student.plans.order_by(PracticePlan.created_at.desc()).all()
return jsonify([p.to_dict() for p in plans])
@main_bp.route("/api/plans", methods=["GET"])
@login_required_json
def get_all_plans():
"""获取所有方案(支持多条件筛选)
查询参数:
- class_id: 班级ID
- problem_ids: Problem.id 列表,逗号分隔
- template_id: 模板ID
- is_typical: 是否典型 (true/false)
- student_name: 学员姓名(模糊匹配)
- mine: true/false(我的学员的方案)
"""
import json as json_module
from app.models import Class
from sqlalchemy.orm import joinedload
from sqlalchemy import exists
query = PracticePlan.query
# 检查是否需要 join Student
needs_student_join = False
needs_class_join = False
class_id = request.args.get('class_id', type=int)
student_name = request.args.get('student_name')
problem_ids = request.args.get('problem_ids')
mine = request.args.get('mine')
if class_id or student_name or problem_ids or (mine and mine.lower() == 'true'):
needs_student_join = True
query = query.join(Student)
if mine and mine.lower() == 'true':
needs_class_join = True
query = query.join(Class)
# 按班级筛选
if class_id:
query = query.filter(Student.class_id == class_id)
# 按模板筛选
template_id = request.args.get('template_id', type=int)
if template_id:
query = query.filter(PracticePlan.template_id == template_id)
# 按典型状态筛选
is_typical = request.args.get('is_typical')
if is_typical and is_typical.lower() == 'true':
query = query.filter(PracticePlan.is_typical == True)
# 按学员姓名模糊筛选
if student_name:
query = query.filter(Student.name.like(f'%%{student_name}%%'))
# 按问题筛选(通过 problem_id 关联到学员的问题)
if problem_ids:
problem_id_list = [int(pid.strip()) for pid in problem_ids.split(',') if pid.strip()]
if problem_id_list:
query = query.filter(
exists().where(
(StudentProblem.student_id == Student.id) &
(StudentProblem.problem_id.in_(problem_id_list))
)
)
# 我的学员筛选(所在班级的老师是当前用户)
if needs_class_join:
user_id = session.get('user_id')
if user_id:
query = query.filter(Class.teacher_id == user_id)
plans = query.order_by(PracticePlan.created_at.desc()).all()
return jsonify([p.to_dict() for p in plans])
@main_bp.route("/api/plans/<int:plan_id>/typical", methods=["POST"])
@login_required_json
def toggle_plan_typical(plan_id):
"""切换方案的典型状态"""
plan = PracticePlan.query.get_or_404(plan_id)
plan.is_typical = not plan.is_typical
db.session.commit()
return jsonify({"success": True, "is_typical": plan.is_typical})
@main_bp.route("/api/students/<int:student_id>/recommended-plans", methods=["GET"])
@login_required_json
def get_recommended_plans(student_id):
"""获取推荐方案 - 当前学员问题与典型方案问题有交集的方案
查询参数:
- mine: true/false(我的学员的典型方案)
返回字段:
- can_adopt: 问题集合是否完全一致,可采纳
"""
try:
student = Student.query.get_or_404(student_id)
# 获取当前学员的问题名称集合
student_problems = StudentProblem.query.filter_by(student_id=student_id).all()
# 通过 Problem 关联获取问题名称
student_problem_names = set()
for sp in student_problems:
if sp.problem:
student_problem_names.add(sp.problem.name)
# 获取所有典型方案,排除当前学员自己的方案
query = PracticePlan.query.filter(
PracticePlan.is_typical == True,
PracticePlan.student_id != student_id
)
# 我的筛选:只显示当前用户创建的学员的典型方案
mine = request.args.get('mine')
if mine and mine.lower() == 'true':
user_id = session.get('user_id')
if user_id:
query = query.join(Student).join(Class).filter(Class.teacher_id == user_id)
typical_plans = query.order_by(PracticePlan.created_at.desc()).all()
# 筛选:方案的问题与当前学员的问题有交集
import json as json_module
recommended = []
for plan in typical_plans:
try:
content = json_module.loads(plan.content) if plan.content else {}
except:
continue
plan_problems = content.get('problems', [])
# 提取方案中的问题名称
plan_problem_names = set()
for p in plan_problems:
name = p.get('name') or p.get('problem_name', '')
if name:
plan_problem_names.add(name)
# 检查交集
if student_problem_names & plan_problem_names:
# 计算交集问题
matched_problems = student_problem_names & plan_problem_names
plan_dict = plan.to_dict()
plan_dict['matched_problems'] = list(matched_problems)
plan_dict['matched_count'] = len(matched_problems)
# 检查问题集合是否完全一致(可采纳)
plan_dict['can_adopt'] = (student_problem_names == plan_problem_names)
recommended.append(plan_dict)
# 按匹配数量降序排序
recommended.sort(key=lambda x: x['matched_count'], reverse=True)
return jsonify(recommended)
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"error": str(e)}), 500
@main_bp.route("/api/students/<int:student_id>/plans/from-typical/<int:plan_id>", methods=["POST"])
@login_required_json
def adopt_typical_plan(student_id, plan_id):
"""采纳典型方案 - 复制该方案给当前学员
前端已判断 can_adopt,后端直接采纳并记录来源
"""
student = Student.query.get_or_404(student_id)
typical_plan = PracticePlan.query.get_or_404(plan_id)
if not typical_plan.is_typical:
return jsonify({"error": "只能采纳典型方案"}), 400
# 获取典型方案的问题名称集合并验证一致性
try:
content = json.loads(typical_plan.content) if typical_plan.content else {}
except:
content = {}
plan_problems = content.get('problems', [])
plan_problem_names = set()
for p in plan_problems:
name = p.get('name') or p.get('problem_name', '')
if name:
plan_problem_names.add(name)
# 获取当前学员的问题名称集合并验证一致性
student_problems = StudentProblem.query.filter_by(student_id=student_id).all()
student_problem_names = set()
for sp in student_problems:
if sp.problem:
student_problem_names.add(sp.problem.name)
# 检查问题名称集合是否完全一致
if student_problem_names != plan_problem_names:
return jsonify({
"error": "采纳失败:方案的问题与当前学员的问题不一致",
"student_problems": list(student_problem_names),
"plan_problems": list(plan_problem_names)
}), 400
# 替换内容中的原学员姓名为当前学员姓名
old_name = typical_plan.student.name if typical_plan.student else ""
if old_name and old_name in str(content):
content_str = json.dumps(content, ensure_ascii=False)
content_str = content_str.replace(old_name, student.name)
content = json.loads(content_str)
# 添加采纳来源信息
from datetime import datetime
content['adopted_from'] = {
'student_name': old_name,
'plan_id': typical_plan.id,
'adopted_at': datetime.now().strftime('%Y-%m-%d')
}
# 创建新方案
new_plan = PracticePlan(
student_id=student_id,
template_id=typical_plan.template_id,
content=json.dumps(content, ensure_ascii=False),
is_typical=False, # 采纳的方案不再是典型
created_by=session.get('user_id')
)
db.session.add(new_plan)
db.session.commit()
return jsonify({
"message": "方案已采纳",
"plan_id": new_plan.id,
"plan": new_plan.to_dict()
}), 201
@main_bp.route("/plans")
@login_required_json
def plans_page():
"""方案管理页面"""
return render_template("plans.html", active_nav="plans")
@main_bp.route("/api/admin/fix-plan-templates", methods=["GET"])
@admin_required
def fix_plan_templates():
"""临时接口:修复所有方案的模板关联"""
from app.models import Template, PracticePlan
simple_template = Template.query.filter_by(name='简单文字版').first()
formal_template = Template.query.filter_by(name='正式报告版').first()
if not simple_template or not formal_template:
return jsonify({"error": "模板没找到"}), 400
plans = PracticePlan.query.order_by(PracticePlan.created_at.desc()).all()
if plans:
plans[0].template_id = simple_template.id
for plan in plans[1:]:
plan.template_id = formal_template.id
db.session.commit()
return jsonify({"ok": True, "updated": len(plans)})
@main_bp.route("/plan/<int:plan_id>")
@login_required_json
def plan_detail_page(plan_id):
"""方案详情页面"""
return render_template("plan_detail.html", active_nav="plans")
@main_bp.route("/plan/<int:plan_id>/edit")
@login_required_json
def plan_edit_page(plan_id):
"""方案编辑页面"""
return render_template("plan_edit.html", active_nav="plans", plan_id=plan_id)
@main_bp.route("/api/generate-plan", methods=["POST"])
@login_required_json
def generate_plan():
"""生成练习方案 - 使用SSE实时推送进度"""
data = request.get_json()
student_id = data.get("student_id")
use_ai = data.get("use_ai", True)
template_id = data.get("template_id") # AI提示词模板ID
student = Student.query.get_or_404(student_id)
problems = student.problems.all()
# 获取学员的目标(只获取未完成的)
goals = StudentGoal.query.filter_by(student_id=student_id).all()
goal_data = [g.to_dict() for g in goals]
if not problems:
return jsonify({"error": "请先记录学员的问题"}), 400
# 预先收集所有数据,避免在generator中访问数据库
# 学员的统一练习时间
practice_time = student.practice_time or "30-60分钟"
problem_data = []
for p in problems:
# 使用 Problem 关联获取问题信息
problem_obj = p.problem
if problem_obj:
problem_data.append(
{
"problem_id": problem_obj.id, # 使用 Problem.id
"problem_name": problem_obj.name,
"problem_no": problem_obj.no,
"severity": p.severity,
"level": p.level,
"content": problem_obj.content or "",
}
)
time_mapping = {
"15分钟": {"total": 15, "basic": 10, "tech": 2, "piece": 3},
"30分钟": {"total": 30, "basic": 15, "tech": 5, "piece": 10},
"45分钟": {"total": 45, "basic": 15, "tech": 10, "piece": 20},
"60分钟": {"total": 60, "basic": 20, "tech": 15, "piece": 25},
"90分钟": {"total": 90, "basic": 25, "tech": 25, "piece": 40},
"120分钟": {"total": 120, "basic": 30, "tech": 35, "piece": 55},
"150分钟以上": {"total": 150, "basic": 35, "tech": 45, "piece": 70},
}
time_config = time_mapping.get(practice_time, time_mapping["30分钟"])
# 加载API配置
from app.config import load_api_config
api_config = load_api_config(current_app.config)
@stream_with_context
def generate():
# Step 1: 收集问题数据
yield sse_format(
{"step": "collecting", "message": "正在收集问题数据...", "progress": 10}
)
# Step 2: 生成基础方案
yield sse_format(
{"step": "basic", "message": "正在生成基础练习方案...", "progress": 30}
)
plan_content = generate_practice_plan(
student_name=student.name,
problems=problem_data,
practice_time=practice_time,
goals=goal_data,
)
yield sse_format(
{"step": "basic_done", "message": "基础方案生成完成", "progress": 50}
)
# Step 3: 如果启用AI,生成AI报告
ai_report = None
if use_ai:
# 显示AI配置信息
yield sse_format(
{
"step": "ai_config",
"message": f"AI配置: {api_config.get('model', 'N/A')} @ {api_config.get('provider', 'N/A')}",
"progress": 55,
"detail": f"Endpoint: {api_config.get('base_url', '')}",
}
)
yield sse_format(
{
"step": "ai_start",
"message": "正在调用AI生成个性化报告...",
"progress": 60,
}
)
# 构建问题摘要用于显示
problems_summary = []
for p in problem_data[:3]: # 只显示前3个问题
level = p.get("level", "入门")
severity = p.get("severity", "中等")
problems_summary.append(f"- {p['problem_name']} [{level} | {severity}]")
yield sse_format(
{
"step": "ai_problems",
"message": "发送问题给AI:",
"progress": 62,
"detail": "\n".join(problems_summary),
}
)
yield sse_format(
{"step": "ai_request", "message": "等待AI响应...", "progress": 65}
)
# 先用dry_run模式获取提示词并显示给用户
prompt, _, error, extra_info = generate_ai_report(
student_name=student.name,
wechat_nickname=student.wechat_nickname or "",
problems=problem_data,
practice_time=practice_time,
time_config=time_config,
template_id=template_id,
dry_run=True,
goals=goal_data
)
# 发送提示词给前端显示(只发长度,不发完整内容避免SSE缓冲问题)
prompt_length = len(prompt) if prompt else 0
yield sse_format({
"step": "ai_prompt",
"message": "AI提示词已生成",
"progress": 60,
"prompt_length": prompt_length,
"student_problems_length": extra_info.get("student_problems_length", 0) if extra_info else 0,
"problems_length": extra_info.get("problems_length", 0) if extra_info else 0,
"student_goals_length": extra_info.get("student_goals_length", 0) if extra_info else 0,
})
if error:
yield sse_format(
{
"step": "ai_error",
"message": f"获取提示词失败: {error}",
"progress": 80,
"error": error,
}
)
return
yield sse_format(
{"step": "ai_generating", "message": "正在生成AI报告...", "progress": 70}
)
# 真正调用API生成报告
_, ai_report, error, _ = generate_ai_report(
student_name=student.name,
wechat_nickname=student.wechat_nickname or "",
problems=problem_data,
practice_time=practice_time,
time_config=time_config,
template_id=template_id,
dry_run=False,
goals=goal_data
)
if error:
yield sse_format(
{
"step": "ai_error",
"message": f"AI生成失败: {error}",
"progress": 80,
"error": error,
}
)
plan_content["ai_report_error"] = error
else:
# 显示AI返回的报告长度
report_lines = len(ai_report.split("\n")) if ai_report else 0
yield sse_format(
{
"step": "ai_response",
"message": f"AI报告已生成",
"progress": 75,
"detail": f"报告长度: {len(ai_report) if ai_report else 0} 字符, {report_lines}",
}
)
yield sse_format(
{
"step": "ai_done",
"message": "AI报告生成完成",
"progress": 80,
"has_ai": True,
}
)
plan_content["ai_report"] = ai_report
else:
yield sse_format(
{"step": "ai_skip", "message": "跳过AI生成", "progress": 80}
)
# Step 4: 保存方案
yield sse_format(
{"step": "saving", "message": "正在保存方案...", "progress": 90}
)
plan = None
try:
plan = PracticePlan(
student_id=student_id,
template_id=template_id,
content=json.dumps(plan_content, ensure_ascii=False),
created_by=session.get('user_id')
)
db.session.add(plan)
db.session.commit()
yield sse_format(
{
"step": "saved",
"message": f"方案已保存 (ID: {plan.id})",
"progress": 95,
}
)
except Exception as e:
db.session.rollback()
yield sse_format(
{
"step": "save_error",
"message": f"保存失败: {str(e)}",
"progress": 90,
"error": str(e),
}
)
# 确保 complete 消息被发送
try:
yield sse_format(
{
"step": "complete",
"message": "方案生成完成!",
"progress": 100,
"plan_id": plan.id if plan and hasattr(plan, 'id') else None,
"content": plan_content,
"ai_report": ai_report,
"prompt_length": prompt_length,
"ai_report_length": len(ai_report) if ai_report else 0,
}
)
except Exception as e:
# 最后一道防护 - 即使出错也要发送 complete
yield sse_format(
{
"step": "complete",
"message": f"方案生成完成!(部分错误: {str(e)[:50]}",
"progress": 100,
"plan_id": None,
"error": str(e),
}
)
return Response(
generate(),
mimetype="text/event-stream",
headers={
"X-Accel-Buffering": "no",
"Cache-Control": "no-cache",
}
)
@main_bp.route("/api/plans/<int:plan_id>", methods=["GET"])
@login_required_json
def get_plan(plan_id):
"""获取单个方案详情"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
return jsonify(
{
"id": plan.id,
"student_id": plan.student_id,
"student_name": plan.student.name if plan.student else "",
"created_at": plan.created_at.strftime("%Y-%m-%d %H:%M"),
"updated_at": plan.updated_at.strftime("%Y-%m-%d %H:%M") if plan.updated_at else None,
"updated_by_name": plan.updater.name if plan.updated_by and plan.updater else None,
"is_typical": plan.is_typical,
"content": content,
}
)
@main_bp.route("/api/plans/<int:plan_id>/pdf", methods=["GET"])
@login_required_json
def export_pdf(plan_id):
"""导出PDF - 使用模板"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
student_name = plan.student.name if plan.student else "未知学员"
template_id = request.args.get('template_id', type=int)
# 获取报告模板
report_template = None
try:
from app.models import Template
if template_id:
tmpl = Template.query.get(template_id)
if tmpl and tmpl.type == "report":
report_template = tmpl.content
else:
tmpl = Template.query.filter_by(type="report").order_by(Template.sort_order.asc()).first()
if tmpl:
report_template = tmpl.content
except:
pass
# 获取API配置(水印文本等)
from app.config import load_api_config
api_config = load_api_config(current_app.config)
# 如果有模板,先渲染
rendered_report = None
if report_template:
rendered_report = report_template
rendered_report = rendered_report.replace("{student_name}", student_name)
rendered_report = rendered_report.replace("{practice_time}", content.get('practice_time', 'N/A'))
rendered_report = rendered_report.replace("{total_minutes}", str(content.get('total_daily_minutes', 0)))
rendered_report = rendered_report.replace("{generated_at}", content.get('generated_at', ''))
# 获取当前用户姓名
from app.models import User
user_id = session.get('user_id')
user_name = '未知'
if user_id:
user = User.query.get(user_id)
if user and user.name:
user_name = user.name
rendered_report = rendered_report.replace("{generated_by}", user_name)
if content.get('ai_report'):
rendered_report = rendered_report.replace("{ai_report}", content['ai_report'])
else:
rendered_report = rendered_report.replace("{ai_report}", "(未生成AI报告)")
problem_tags = ""
for problem in content.get('problems', []):
problem_tags += f"- **{problem.get('name', '')}** ({problem.get('severity', '')})\n"
rendered_report = rendered_report.replace("{problem_tags}", problem_tags or "(无)")
# 替换学员目标
student_goals_list = StudentGoal.query.filter_by(student_id=plan.student_id).all() if plan.student_id else []
goals_text_parts = []
for g in student_goals_list:
if g.status != "已完成":
goals_text_parts.append(f"- **{g.goal.name}**\n {g.goal.content if g.goal else '未提供具体内容'}")
goals_text = "\n".join(goals_text_parts) if goals_text_parts else "(无)"
rendered_report = rendered_report.replace("{student_goals}", goals_text)
pdf_path = generate_pdf(
plan_id=plan_id,
student_name=student_name,
content=content,
output_dir=current_app.config["PDF_OUTPUT_DIR"],
rendered_report=rendered_report, # 传递渲染后的报告
watermark_text=api_config.get("watermark_text"),
)
return send_file(
pdf_path, as_attachment=True, download_name=f"{student_name}_练习方案.pdf"
)
@main_bp.route("/api/plans/<int:plan_id>/md", methods=["GET"])
@login_required_json
def export_md(plan_id):
"""导出Markdown"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
student_name = plan.student.name if plan.student else "未知学员"
template_id = request.args.get('template_id', type=int)
# 获取报告模板
report_template = None
try:
from app.models import Template
if template_id:
tmpl = Template.query.get(template_id)
if tmpl and tmpl.type == "report":
report_template = tmpl.content
else:
tmpl = Template.query.filter_by(type="report").order_by(Template.sort_order.asc()).first()
if tmpl:
report_template = tmpl.content
except:
pass
if report_template:
# 使用模板渲染
# 替换变量
rendered = report_template
rendered = rendered.replace("{student_name}", student_name)
rendered = rendered.replace("{practice_time}", content.get('practice_time', 'N/A'))
rendered = rendered.replace("{total_minutes}", str(content.get('total_daily_minutes', 0)))
rendered = rendered.replace("{generated_at}", content.get('generated_at', ''))
# 获取当前用户姓名
from app.models import User
user_id = session.get('user_id')
user_name = '未知'
if user_id:
user = User.query.get(user_id)
if user and user.name:
user_name = user.name
rendered = rendered.replace("{generated_by}", user_name)
# AI报告
if content.get('ai_report'):
rendered = rendered.replace("{ai_report}", content['ai_report'])
else:
rendered = rendered.replace("{ai_report}", "(未生成AI报告)")
# 问题诊断标签
problem_tags = ""
for problem in content.get('problems', []):
problem_tags += f"- **{problem.get('name', '')}** ({problem.get('severity', '')})\n"
rendered = rendered.replace("{problem_tags}", problem_tags or "(无)")
md_content = rendered
else:
# 兜底:生成完整内容(AI报告已包含所有内容)
md_lines = [
f"# {student_name} - 个性化练习方案\n",
f"**练习时间**: {content.get('practice_time', 'N/A')} (共{content.get('total_daily_minutes', 0)}分钟)\n",
f"**生成时间**: {content.get('generated_at', '')}\n",
"\n---\n",
]
if content.get('ai_report'):
md_lines.append(content['ai_report'])
md_content = ''.join(md_lines)
from urllib.parse import quote
filename = quote(f"{student_name}_练习方案.md")
return Response(
md_content,
mimetype='text/markdown; charset=utf-8',
headers={
'Content-Disposition': f"attachment; filename*=UTF-8''{filename}"
}
)
@main_bp.route("/api/plans/<int:plan_id>/preview", methods=["GET"])
@login_required_json
def preview_report(plan_id):
"""预览报告模板渲染结果 - 返回HTML片段"""
import markdown
from app.models import Template
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
student_name = plan.student.name if plan.student else "未知学员"
# 获取选中的报告模板
template_id = request.args.get('template_id', type=int)
report_template = None
try:
if template_id:
tmpl = Template.query.get(template_id)
if tmpl and tmpl.type == "report":
report_template = tmpl.content
else:
tmpl = Template.query.filter_by(type="report").order_by(Template.sort_order.asc()).first()
if tmpl:
report_template = tmpl.content
except:
pass
if not report_template:
return "<div class='text-muted'>无可用模板</div>", 200
# 占位符替换(复用 export_pdf 逻辑)
rendered = report_template
rendered = rendered.replace("{student_name}", student_name)
rendered = rendered.replace("{practice_time}", content.get('practice_time', 'N/A'))
rendered = rendered.replace("{total_minutes}", str(content.get('total_daily_minutes', 0)))
rendered = rendered.replace("{generated_at}", content.get('generated_at', ''))
from app.models import User
user_id = session.get('user_id')
user_name = '未知'
if user_id:
user = User.query.get(user_id)
if user and user.name:
user_name = user.name
rendered = rendered.replace("{generated_by}", user_name)
if content.get('ai_report'):
rendered = rendered.replace("{ai_report}", content['ai_report'])
else:
rendered = rendered.replace("{ai_report}", "(未生成AI报告)")
problem_tags = ""
for problem in content.get('problems', []):
problem_tags += f"- **{problem.get('name', '')}** ({problem.get('severity', '')})\n"
rendered = rendered.replace("{problem_tags}", problem_tags or "(无)")
# 学员目标
from app.models import StudentGoal
student_goals_list = StudentGoal.query.filter_by(student_id=plan.student_id).all() if plan.student_id else []
goals_text_parts = []
for g in student_goals_list:
if g.status != "已完成":
goal_content = (g.goal.content if g.goal else '未提供具体内容').replace('\n', '<br>')
goals_text_parts.append(f"- **{g.goal.name}**<br>{goal_content}")
goals_text = "<br>".join(goals_text_parts) if goals_text_parts else "(无)"
rendered = rendered.replace("{student_goals}", goals_text)
# Markdown 转 HTML
html_content = markdown.markdown(rendered, extensions=['tables', 'fenced_code'])
# URL 转二维码图片(预览用)
import re
import qrcode
import base64
from io import BytesIO
def make_qr_base64(url):
qr = qrcode.make(url, box_size=5)
buf = BytesIO()
qr.save(buf, format='PNG')
return base64.b64encode(buf.getvalue()).decode()
def replace_url_with_qr(match):
url = match.group(0)
try:
qr_b64 = make_qr_base64(url)
return f'<br><img src="data:image/png;base64,{qr_b64}" alt="QR" style="width:80px;display:block;margin:8px auto;">'
except:
return url
url_pattern = re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+')
html_content = url_pattern.sub(replace_url_with_qr, html_content)
# 支持 ReportLab <para alignment="center"> 语法,转为 HTML
import re
html_content = re.sub(
r'<para\s+alignment="center">(.+?)</para>',
r'<div style="text-align:center">\1</div>',
html_content
)
return html_content, 200, {'Content-Type': 'text/html; charset=utf-8'}
@main_bp.route("/plans/<int:plan_id>/wechat", methods=["GET"])
@login_required_json
def wechat_card(plan_id):
"""微信卡片展示页"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
return render_template(
"wechat_card.html",
plan_id=plan_id,
student_name=plan.student.name if plan.student else "",
content=content,
)
@main_bp.route("/api/plans/<int:plan_id>", methods=["DELETE"])
@login_required_json
def delete_plan(plan_id):
"""删除方案"""
plan = PracticePlan.query.get_or_404(plan_id)
db.session.delete(plan)
db.session.commit()
return jsonify({"message": "删除成功"})
@main_bp.route("/api/plans/<int:plan_id>/content", methods=["PUT"])
@login_required_json
def update_plan_content(plan_id):
"""更新方案内容(用于编辑)"""
from datetime import datetime
plan = PracticePlan.query.get_or_404(plan_id)
data = request.get_json()
# 合并content字段 - 保留原有字段,只更新ai_report和daily_schedule
if "content" in data:
new_content_str = data["content"]
existing_content = json.loads(plan.content) if plan.content else {}
# 解析新content(可能是字符串或对象)
if isinstance(new_content_str, str):
new_content = json.loads(new_content_str)
else:
new_content = new_content_str
# 合并:保留existing中的所有字段,用new_content覆盖ai_report和daily_schedule
merged = existing_content.copy()
merged.update({
"ai_report": new_content.get("ai_report", ""),
"daily_schedule": new_content.get("daily_schedule", [])
})
plan.content = json.dumps(merged, ensure_ascii=False)
plan.updated_by = session.get('user_id')
plan.updated_at = datetime.now()
db.session.commit()
return jsonify({"message": "保存成功"})
@main_bp.route("/api/generate-plan/preview", methods=["POST"])
@login_required_json
def generate_plan_preview():
"""预览AI提示词 - 返回完整提示词内容"""
data = request.get_json()
student_id = data.get("student_id")
template_id = data.get("template_id") # AI提示词模板ID
student = Student.query.get_or_404(student_id)
problems = student.problems.all()
if not problems:
return jsonify({"error": "请先记录学员的问题"}), 400
# 获取学员的目标
goals = StudentGoal.query.filter_by(student_id=student_id).all()
goal_data = [g.to_dict() for g in goals]
# 学员的统一练习时间
practice_time = student.practice_time or "30-60分钟"
problem_data = []
for p in problems:
problem_obj = p.problem
if problem_obj:
problem_data.append(
{
"problem_id": problem_obj.id,
"problem_name": problem_obj.name,
"problem_no": problem_obj.no,
"severity": p.severity,
"level": p.level,
"content": problem_obj.content or "",
}
)
time_mapping = {
"15分钟": {"total": 15, "basic": 10, "tech": 2, "piece": 3},
"30分钟": {"total": 30, "basic": 15, "tech": 5, "piece": 10},
"45分钟": {"total": 45, "basic": 15, "tech": 10, "piece": 20},
"60分钟": {"total": 60, "basic": 20, "tech": 15, "piece": 25},
"90分钟": {"total": 90, "basic": 25, "tech": 25, "piece": 40},
"120分钟": {"total": 120, "basic": 30, "tech": 35, "piece": 55},
"150分钟以上": {"total": 150, "basic": 35, "tech": 45, "piece": 70},
}
time_config = time_mapping.get(practice_time, time_mapping["30分钟"])
# 调用生成器获取提示词(dry_run模式)
from app.services.plan_generator import generate_ai_report
prompt, _, _, extra_info = generate_ai_report(
student_name=student.name,
wechat_nickname=student.wechat_nickname or "",
problems=problem_data,
practice_time=practice_time,
time_config=time_config,
template_id=template_id,
dry_run=True,
goals=goal_data
)
return jsonify({
"prompt": prompt,
"prompt_length": len(prompt) if prompt else 0,
"student_problems_length": extra_info.get("student_problems_length", 0) if extra_info else 0,
"problems_length": extra_info.get("problems_length", 0) if extra_info else 0,
"student_goals_length": extra_info.get("student_goals_length", 0) if extra_info else 0,
})