# 方案生成路由 import json import os from flask import ( request, jsonify, render_template, send_file, current_app, Response, stream_with_context, session, ) from app.routes import main_bp from app.models import db, Student, PracticePlan, StudentProblem, StudentGoal, Class from app.services.plan_generator import generate_practice_plan, generate_ai_report from app.services.pdf_generator import generate_pdf from app.routes.auth import login_required_json, admin_required def sse_format(data): """格式化SSE数据""" return f"data: {json.dumps(data)}\n\n" @main_bp.route("/api/students//plans", methods=["GET"]) @login_required_json def get_student_plans(student_id): """获取学员的练习方案列表""" student = Student.query.get_or_404(student_id) plans = student.plans.order_by(PracticePlan.created_at.desc()).all() return jsonify([p.to_dict() for p in plans]) @main_bp.route("/api/plans", methods=["GET"]) @login_required_json def get_all_plans(): """获取所有方案(支持多条件筛选) 查询参数: - class_id: 班级ID - problem_ids: Problem.id 列表,逗号分隔 - template_id: 模板ID - is_typical: 是否典型 (true/false) - student_name: 学员姓名(模糊匹配) - mine: true/false(我的学员的方案) """ import json as json_module from app.models import Class from sqlalchemy.orm import joinedload from sqlalchemy import exists query = PracticePlan.query # 检查是否需要 join Student needs_student_join = False needs_class_join = False class_id = request.args.get('class_id', type=int) student_name = request.args.get('student_name') problem_ids = request.args.get('problem_ids') mine = request.args.get('mine') if class_id or student_name or problem_ids or (mine and mine.lower() == 'true'): needs_student_join = True query = query.join(Student) if mine and mine.lower() == 'true': needs_class_join = True query = query.join(Class) # 按班级筛选 if class_id: query = query.filter(Student.class_id == class_id) # 按模板筛选 template_id = request.args.get('template_id', type=int) if template_id: query = query.filter(PracticePlan.template_id == template_id) # 按典型状态筛选 is_typical = request.args.get('is_typical') if is_typical and is_typical.lower() == 'true': query = query.filter(PracticePlan.is_typical == True) # 按学员姓名模糊筛选 if student_name: query = query.filter(Student.name.like(f'%%{student_name}%%')) # 按问题筛选(通过 problem_id 关联到学员的问题) if problem_ids: problem_id_list = [int(pid.strip()) for pid in problem_ids.split(',') if pid.strip()] if problem_id_list: query = query.filter( exists().where( (StudentProblem.student_id == Student.id) & (StudentProblem.problem_id.in_(problem_id_list)) ) ) # 我的学员筛选(所在班级的老师是当前用户) if needs_class_join: user_id = session.get('user_id') if user_id: query = query.filter(Class.teacher_id == user_id) plans = query.order_by(PracticePlan.created_at.desc()).all() return jsonify([p.to_dict() for p in plans]) @main_bp.route("/api/plans//typical", methods=["POST"]) @login_required_json def toggle_plan_typical(plan_id): """切换方案的典型状态""" plan = PracticePlan.query.get_or_404(plan_id) plan.is_typical = not plan.is_typical db.session.commit() return jsonify({"success": True, "is_typical": plan.is_typical}) @main_bp.route("/api/students//recommended-plans", methods=["GET"]) @login_required_json def get_recommended_plans(student_id): """获取推荐方案 - 当前学员问题与典型方案问题有交集的方案 查询参数: - mine: true/false(我的学员的典型方案) 返回字段: - can_adopt: 问题集合是否完全一致,可采纳 """ try: student = Student.query.get_or_404(student_id) # 获取当前学员的问题名称集合 student_problems = StudentProblem.query.filter_by(student_id=student_id).all() # 通过 Problem 关联获取问题名称 student_problem_names = set() for sp in student_problems: if sp.problem: student_problem_names.add(sp.problem.name) # 获取所有典型方案,排除当前学员自己的方案 query = PracticePlan.query.filter( PracticePlan.is_typical == True, PracticePlan.student_id != student_id ) # 我的筛选:只显示当前用户创建的学员的典型方案 mine = request.args.get('mine') if mine and mine.lower() == 'true': user_id = session.get('user_id') if user_id: query = query.join(Student).join(Class).filter(Class.teacher_id == user_id) typical_plans = query.order_by(PracticePlan.created_at.desc()).all() # 筛选:方案的问题与当前学员的问题有交集 import json as json_module recommended = [] for plan in typical_plans: try: content = json_module.loads(plan.content) if plan.content else {} except: continue plan_problems = content.get('problems', []) # 提取方案中的问题名称 plan_problem_names = set() for p in plan_problems: name = p.get('name') or p.get('problem_name', '') if name: plan_problem_names.add(name) # 检查交集 if student_problem_names & plan_problem_names: # 计算交集问题 matched_problems = student_problem_names & plan_problem_names plan_dict = plan.to_dict() plan_dict['matched_problems'] = list(matched_problems) plan_dict['matched_count'] = len(matched_problems) # 检查问题集合是否完全一致(可采纳) plan_dict['can_adopt'] = (student_problem_names == plan_problem_names) recommended.append(plan_dict) # 按匹配数量降序排序 recommended.sort(key=lambda x: x['matched_count'], reverse=True) return jsonify(recommended) except Exception as e: import traceback traceback.print_exc() return jsonify({"error": str(e)}), 500 @main_bp.route("/api/students//plans/from-typical/", methods=["POST"]) @login_required_json def adopt_typical_plan(student_id, plan_id): """采纳典型方案 - 复制该方案给当前学员 前端已判断 can_adopt,后端直接采纳并记录来源 """ student = Student.query.get_or_404(student_id) typical_plan = PracticePlan.query.get_or_404(plan_id) if not typical_plan.is_typical: return jsonify({"error": "只能采纳典型方案"}), 400 # 获取典型方案的问题名称集合并验证一致性 try: content = json.loads(typical_plan.content) if typical_plan.content else {} except: content = {} plan_problems = content.get('problems', []) plan_problem_names = set() for p in plan_problems: name = p.get('name') or p.get('problem_name', '') if name: plan_problem_names.add(name) # 获取当前学员的问题名称集合并验证一致性 student_problems = StudentProblem.query.filter_by(student_id=student_id).all() student_problem_names = set() for sp in student_problems: if sp.problem: student_problem_names.add(sp.problem.name) # 检查问题名称集合是否完全一致 if student_problem_names != plan_problem_names: return jsonify({ "error": "采纳失败:方案的问题与当前学员的问题不一致", "student_problems": list(student_problem_names), "plan_problems": list(plan_problem_names) }), 400 # 替换内容中的原学员姓名为当前学员姓名 old_name = typical_plan.student.name if typical_plan.student else "" if old_name and old_name in str(content): content_str = json.dumps(content, ensure_ascii=False) content_str = content_str.replace(old_name, student.name) content = json.loads(content_str) # 添加采纳来源信息 from datetime import datetime content['adopted_from'] = { 'student_name': old_name, 'plan_id': typical_plan.id, 'adopted_at': datetime.now().strftime('%Y-%m-%d') } # 创建新方案 new_plan = PracticePlan( student_id=student_id, template_id=typical_plan.template_id, content=json.dumps(content, ensure_ascii=False), is_typical=False, # 采纳的方案不再是典型 created_by=session.get('user_id') ) db.session.add(new_plan) db.session.commit() return jsonify({ "message": "方案已采纳", "plan_id": new_plan.id, "plan": new_plan.to_dict() }), 201 @main_bp.route("/plans") @login_required_json def plans_page(): """方案管理页面""" return render_template("plans.html", active_nav="plans") @main_bp.route("/api/admin/fix-plan-templates", methods=["GET"]) @admin_required def fix_plan_templates(): """临时接口:修复所有方案的模板关联""" from app.models import Template, PracticePlan simple_template = Template.query.filter_by(name='简单文字版').first() formal_template = Template.query.filter_by(name='正式报告版').first() if not simple_template or not formal_template: return jsonify({"error": "模板没找到"}), 400 plans = PracticePlan.query.order_by(PracticePlan.created_at.desc()).all() if plans: plans[0].template_id = simple_template.id for plan in plans[1:]: plan.template_id = formal_template.id db.session.commit() return jsonify({"ok": True, "updated": len(plans)}) @main_bp.route("/plan/") @login_required_json def plan_detail_page(plan_id): """方案详情页面""" return render_template("plan_detail.html", active_nav="plans") @main_bp.route("/plan//edit") @login_required_json def plan_edit_page(plan_id): """方案编辑页面""" return render_template("plan_edit.html", active_nav="plans", plan_id=plan_id) @main_bp.route("/api/generate-plan", methods=["POST"]) @login_required_json def generate_plan(): """生成练习方案 - 使用SSE实时推送进度""" data = request.get_json() student_id = data.get("student_id") use_ai = data.get("use_ai", True) template_id = data.get("template_id") # AI提示词模板ID student = Student.query.get_or_404(student_id) problems = student.problems.all() # 获取学员的目标(只获取未完成的) goals = StudentGoal.query.filter_by(student_id=student_id).all() goal_data = [g.to_dict() for g in goals] if not problems: return jsonify({"error": "请先记录学员的问题"}), 400 # 预先收集所有数据,避免在generator中访问数据库 # 学员的统一练习时间 practice_time = student.practice_time or "30-60分钟" problem_data = [] for p in problems: # 使用 Problem 关联获取问题信息 problem_obj = p.problem if problem_obj: problem_data.append( { "problem_id": problem_obj.id, # 使用 Problem.id "problem_name": problem_obj.name, "problem_no": problem_obj.no, "severity": p.severity, "level": p.level, "content": problem_obj.content or "", } ) time_mapping = { "15分钟": {"total": 15, "basic": 10, "tech": 2, "piece": 3}, "30分钟": {"total": 30, "basic": 15, "tech": 5, "piece": 10}, "45分钟": {"total": 45, "basic": 15, "tech": 10, "piece": 20}, "60分钟": {"total": 60, "basic": 20, "tech": 15, "piece": 25}, "90分钟": {"total": 90, "basic": 25, "tech": 25, "piece": 40}, "120分钟": {"total": 120, "basic": 30, "tech": 35, "piece": 55}, "150分钟以上": {"total": 150, "basic": 35, "tech": 45, "piece": 70}, } time_config = time_mapping.get(practice_time, time_mapping["30分钟"]) # 加载API配置 from app.config import load_api_config api_config = load_api_config(current_app.config) @stream_with_context def generate(): # Step 1: 收集问题数据 yield sse_format( {"step": "collecting", "message": "正在收集问题数据...", "progress": 10} ) # Step 2: 生成基础方案 yield sse_format( {"step": "basic", "message": "正在生成基础练习方案...", "progress": 30} ) plan_content = generate_practice_plan( student_name=student.name, problems=problem_data, practice_time=practice_time, goals=goal_data, ) yield sse_format( {"step": "basic_done", "message": "基础方案生成完成", "progress": 50} ) # Step 3: 如果启用AI,生成AI报告 ai_report = None if use_ai: # 显示AI配置信息 yield sse_format( { "step": "ai_config", "message": f"AI配置: {api_config.get('model', 'N/A')} @ {api_config.get('provider', 'N/A')}", "progress": 55, "detail": f"Endpoint: {api_config.get('base_url', '')}", } ) yield sse_format( { "step": "ai_start", "message": "正在调用AI生成个性化报告...", "progress": 60, } ) # 构建问题摘要用于显示 problems_summary = [] for p in problem_data[:3]: # 只显示前3个问题 level = p.get("level", "入门") severity = p.get("severity", "中等") problems_summary.append(f"- {p['problem_name']} [{level} | {severity}]") yield sse_format( { "step": "ai_problems", "message": "发送问题给AI:", "progress": 62, "detail": "\n".join(problems_summary), } ) yield sse_format( {"step": "ai_request", "message": "等待AI响应...", "progress": 65} ) # 先用dry_run模式获取提示词并显示给用户 prompt, _, error, extra_info = generate_ai_report( student_name=student.name, wechat_nickname=student.wechat_nickname or "", problems=problem_data, practice_time=practice_time, time_config=time_config, template_id=template_id, dry_run=True, goals=goal_data ) # 发送提示词给前端显示(只发长度,不发完整内容避免SSE缓冲问题) prompt_length = len(prompt) if prompt else 0 yield sse_format({ "step": "ai_prompt", "message": "AI提示词已生成", "progress": 60, "prompt_length": prompt_length, "student_problems_length": extra_info.get("student_problems_length", 0) if extra_info else 0, "problems_length": extra_info.get("problems_length", 0) if extra_info else 0, "student_goals_length": extra_info.get("student_goals_length", 0) if extra_info else 0, }) if error: yield sse_format( { "step": "ai_error", "message": f"获取提示词失败: {error}", "progress": 80, "error": error, } ) return yield sse_format( {"step": "ai_generating", "message": "正在生成AI报告...", "progress": 70} ) # 真正调用API生成报告 _, ai_report, error, _ = generate_ai_report( student_name=student.name, wechat_nickname=student.wechat_nickname or "", problems=problem_data, practice_time=practice_time, time_config=time_config, template_id=template_id, dry_run=False, goals=goal_data ) if error: yield sse_format( { "step": "ai_error", "message": f"AI生成失败: {error}", "progress": 80, "error": error, } ) plan_content["ai_report_error"] = error else: # 显示AI返回的报告长度 report_lines = len(ai_report.split("\n")) if ai_report else 0 yield sse_format( { "step": "ai_response", "message": f"AI报告已生成", "progress": 75, "detail": f"报告长度: {len(ai_report) if ai_report else 0} 字符, {report_lines} 行", } ) yield sse_format( { "step": "ai_done", "message": "AI报告生成完成", "progress": 80, "has_ai": True, } ) plan_content["ai_report"] = ai_report else: yield sse_format( {"step": "ai_skip", "message": "跳过AI生成", "progress": 80} ) # Step 4: 保存方案 yield sse_format( {"step": "saving", "message": "正在保存方案...", "progress": 90} ) plan = None try: plan = PracticePlan( student_id=student_id, template_id=template_id, content=json.dumps(plan_content, ensure_ascii=False), created_by=session.get('user_id') ) db.session.add(plan) db.session.commit() yield sse_format( { "step": "saved", "message": f"方案已保存 (ID: {plan.id})", "progress": 95, } ) except Exception as e: db.session.rollback() yield sse_format( { "step": "save_error", "message": f"保存失败: {str(e)}", "progress": 90, "error": str(e), } ) # 确保 complete 消息被发送 try: yield sse_format( { "step": "complete", "message": "方案生成完成!", "progress": 100, "plan_id": plan.id if plan and hasattr(plan, 'id') else None, "content": plan_content, "ai_report": ai_report, "prompt_length": prompt_length, "ai_report_length": len(ai_report) if ai_report else 0, } ) except Exception as e: # 最后一道防护 - 即使出错也要发送 complete yield sse_format( { "step": "complete", "message": f"方案生成完成!(部分错误: {str(e)[:50]})", "progress": 100, "plan_id": None, "error": str(e), } ) return Response( generate(), mimetype="text/event-stream", headers={ "X-Accel-Buffering": "no", "Cache-Control": "no-cache", } ) @main_bp.route("/api/plans/", methods=["GET"]) @login_required_json def get_plan(plan_id): """获取单个方案详情""" plan = PracticePlan.query.get_or_404(plan_id) content = json.loads(plan.content) return jsonify( { "id": plan.id, "student_id": plan.student_id, "student_name": plan.student.name if plan.student else "", "created_at": plan.created_at.strftime("%Y-%m-%d %H:%M"), "updated_at": plan.updated_at.strftime("%Y-%m-%d %H:%M") if plan.updated_at else None, "updated_by_name": plan.updater.name if plan.updated_by and plan.updater else None, "is_typical": plan.is_typical, "content": content, } ) @main_bp.route("/api/plans//pdf", methods=["GET"]) @login_required_json def export_pdf(plan_id): """导出PDF - 使用模板""" plan = PracticePlan.query.get_or_404(plan_id) content = json.loads(plan.content) student_name = plan.student.name if plan.student else "未知学员" template_id = request.args.get('template_id', type=int) # 获取报告模板 report_template = None try: from app.models import Template if template_id: tmpl = Template.query.get(template_id) if tmpl and tmpl.type == "report": report_template = tmpl.content else: tmpl = Template.query.filter_by(type="report").order_by(Template.sort_order.asc()).first() if tmpl: report_template = tmpl.content except: pass # 获取API配置(水印文本等) from app.config import load_api_config api_config = load_api_config(current_app.config) # 如果有模板,先渲染 rendered_report = None if report_template: rendered_report = report_template rendered_report = rendered_report.replace("{student_name}", student_name) rendered_report = rendered_report.replace("{practice_time}", content.get('practice_time', 'N/A')) rendered_report = rendered_report.replace("{total_minutes}", str(content.get('total_daily_minutes', 0))) rendered_report = rendered_report.replace("{generated_at}", content.get('generated_at', '')) # 获取当前用户姓名 from app.models import User user_id = session.get('user_id') user_name = '未知' if user_id: user = User.query.get(user_id) if user and user.name: user_name = user.name rendered_report = rendered_report.replace("{generated_by}", user_name) if content.get('ai_report'): rendered_report = rendered_report.replace("{ai_report}", content['ai_report']) else: rendered_report = rendered_report.replace("{ai_report}", "(未生成AI报告)") problem_tags = "" for problem in content.get('problems', []): problem_tags += f"- **{problem.get('name', '')}** ({problem.get('severity', '')})\n" rendered_report = rendered_report.replace("{problem_tags}", problem_tags or "(无)") pdf_path = generate_pdf( plan_id=plan_id, student_name=student_name, content=content, output_dir=current_app.config["PDF_OUTPUT_DIR"], rendered_report=rendered_report, # 传递渲染后的报告 watermark_text=api_config.get("watermark_text"), ) return send_file( pdf_path, as_attachment=True, download_name=f"{student_name}_练习方案.pdf" ) @main_bp.route("/api/plans//md", methods=["GET"]) @login_required_json def export_md(plan_id): """导出Markdown""" plan = PracticePlan.query.get_or_404(plan_id) content = json.loads(plan.content) student_name = plan.student.name if plan.student else "未知学员" template_id = request.args.get('template_id', type=int) # 获取报告模板 report_template = None try: from app.models import Template if template_id: tmpl = Template.query.get(template_id) if tmpl and tmpl.type == "report": report_template = tmpl.content else: tmpl = Template.query.filter_by(type="report").order_by(Template.sort_order.asc()).first() if tmpl: report_template = tmpl.content except: pass if report_template: # 使用模板渲染 # 替换变量 rendered = report_template rendered = rendered.replace("{student_name}", student_name) rendered = rendered.replace("{practice_time}", content.get('practice_time', 'N/A')) rendered = rendered.replace("{total_minutes}", str(content.get('total_daily_minutes', 0))) rendered = rendered.replace("{generated_at}", content.get('generated_at', '')) # 获取当前用户姓名 from app.models import User user_id = session.get('user_id') user_name = '未知' if user_id: user = User.query.get(user_id) if user and user.name: user_name = user.name rendered = rendered.replace("{generated_by}", user_name) # AI报告 if content.get('ai_report'): rendered = rendered.replace("{ai_report}", content['ai_report']) else: rendered = rendered.replace("{ai_report}", "(未生成AI报告)") # 问题诊断标签 problem_tags = "" for problem in content.get('problems', []): problem_tags += f"- **{problem.get('name', '')}** ({problem.get('severity', '')})\n" rendered = rendered.replace("{problem_tags}", problem_tags or "(无)") md_content = rendered else: # 兜底:生成完整内容(AI报告已包含所有内容) md_lines = [ f"# {student_name} - 个性化练习方案\n", f"**练习时间**: {content.get('practice_time', 'N/A')} (共{content.get('total_daily_minutes', 0)}分钟)\n", f"**生成时间**: {content.get('generated_at', '')}\n", "\n---\n", ] if content.get('ai_report'): md_lines.append(content['ai_report']) md_content = ''.join(md_lines) from urllib.parse import quote filename = quote(f"{student_name}_练习方案.md") return Response( md_content, mimetype='text/markdown; charset=utf-8', headers={ 'Content-Disposition': f"attachment; filename*=UTF-8''{filename}" } ) @main_bp.route("/plans//wechat", methods=["GET"]) @login_required_json def wechat_card(plan_id): """微信卡片展示页""" plan = PracticePlan.query.get_or_404(plan_id) content = json.loads(plan.content) return render_template( "wechat_card.html", plan_id=plan_id, student_name=plan.student.name if plan.student else "", content=content, ) @main_bp.route("/api/plans/", methods=["DELETE"]) @login_required_json def delete_plan(plan_id): """删除方案""" plan = PracticePlan.query.get_or_404(plan_id) db.session.delete(plan) db.session.commit() return jsonify({"message": "删除成功"}) @main_bp.route("/api/plans//content", methods=["PUT"]) @login_required_json def update_plan_content(plan_id): """更新方案内容(用于编辑)""" from datetime import datetime plan = PracticePlan.query.get_or_404(plan_id) data = request.get_json() # 合并content字段 - 保留原有字段,只更新ai_report和daily_schedule if "content" in data: new_content_str = data["content"] existing_content = json.loads(plan.content) if plan.content else {} # 解析新content(可能是字符串或对象) if isinstance(new_content_str, str): new_content = json.loads(new_content_str) else: new_content = new_content_str # 合并:保留existing中的所有字段,用new_content覆盖ai_report和daily_schedule merged = existing_content.copy() merged.update({ "ai_report": new_content.get("ai_report", ""), "daily_schedule": new_content.get("daily_schedule", []) }) plan.content = json.dumps(merged, ensure_ascii=False) plan.updated_by = session.get('user_id') plan.updated_at = datetime.now() db.session.commit() return jsonify({"message": "保存成功"}) @main_bp.route("/api/generate-plan/preview", methods=["POST"]) @login_required_json def generate_plan_preview(): """预览AI提示词 - 返回完整提示词内容""" data = request.get_json() student_id = data.get("student_id") template_id = data.get("template_id") # AI提示词模板ID student = Student.query.get_or_404(student_id) problems = student.problems.all() if not problems: return jsonify({"error": "请先记录学员的问题"}), 400 # 获取学员的目标 goals = StudentGoal.query.filter_by(student_id=student_id).all() goal_data = [g.to_dict() for g in goals] # 学员的统一练习时间 practice_time = student.practice_time or "30-60分钟" problem_data = [] for p in problems: problem_obj = p.problem if problem_obj: problem_data.append( { "problem_id": problem_obj.id, "problem_name": problem_obj.name, "problem_no": problem_obj.no, "severity": p.severity, "level": p.level, "content": problem_obj.content or "", } ) time_mapping = { "15分钟": {"total": 15, "basic": 10, "tech": 2, "piece": 3}, "30分钟": {"total": 30, "basic": 15, "tech": 5, "piece": 10}, "45分钟": {"total": 45, "basic": 15, "tech": 10, "piece": 20}, "60分钟": {"total": 60, "basic": 20, "tech": 15, "piece": 25}, "90分钟": {"total": 90, "basic": 25, "tech": 25, "piece": 40}, "120分钟": {"total": 120, "basic": 30, "tech": 35, "piece": 55}, "150分钟以上": {"total": 150, "basic": 35, "tech": 45, "piece": 70}, } time_config = time_mapping.get(practice_time, time_mapping["30分钟"]) # 调用生成器获取提示词(dry_run模式) from app.services.plan_generator import generate_ai_report prompt, _, _, extra_info = generate_ai_report( student_name=student.name, wechat_nickname=student.wechat_nickname or "", problems=problem_data, practice_time=practice_time, time_config=time_config, template_id=template_id, dry_run=True, goals=goal_data ) return jsonify({ "prompt": prompt, "prompt_length": len(prompt) if prompt else 0, "student_problems_length": extra_info.get("student_problems_length", 0) if extra_info else 0, "problems_length": extra_info.get("problems_length", 0) if extra_info else 0, "student_goals_length": extra_info.get("student_goals_length", 0) if extra_info else 0, })