feat: v1.4.0 - 典型方案采纳、推荐方案列表、审计字段、导航优化

- 添加典型方案采纳功能 (POST /api/plans/<id>/adopt)
- 添加推荐方案列表 (GET /api/students/<id>/recommended-plans)
- PracticePlan 新增 created_by/updated_by/updated_at 审计字段
- 方案编辑/详情页导航优化 (bfcache 处理、pageshow 事件)
- 方案列表支持删除功能
- 学员列表'暂无方案/问题'样式统一
- 更新文档:问题文件已废弃(迁移到数据库)
- 更新部署脚本和验证清单
This commit is contained in:
hmo
2026-04-27 02:01:22 +08:00
parent 6abdd49c04
commit e50a9207b4
20 changed files with 873 additions and 88 deletions
+198 -15
View File
@@ -13,7 +13,7 @@ from flask import (
session,
)
from app.routes import main_bp
from app.models import db, Student, PracticePlan, StudentProblem, StudentGoal
from app.models import db, Student, PracticePlan, StudentProblem, StudentGoal, Class
from app.services.plan_generator import generate_practice_plan, generate_ai_report
from app.services.pdf_generator import generate_pdf
from app.routes.auth import login_required_json, admin_required
@@ -48,13 +48,30 @@ def get_all_plans():
"""
import json as json_module
from app.models import Class
from sqlalchemy.orm import joinedload
from sqlalchemy import exists
query = PracticePlan.query
# 按班级筛选
# 检查是否需要 join Student
needs_student_join = False
needs_class_join = False
class_id = request.args.get('class_id', type=int)
student_name = request.args.get('student_name')
problem_ids = request.args.get('problem_ids')
mine = request.args.get('mine')
if class_id or student_name or problem_ids or (mine and mine.lower() == 'true'):
needs_student_join = True
query = query.join(Student)
if mine and mine.lower() == 'true':
needs_class_join = True
query = query.join(Class)
# 按班级筛选
if class_id:
query = query.join(Student).filter(Student.class_id == class_id)
query = query.filter(Student.class_id == class_id)
# 按模板筛选
template_id = request.args.get('template_id', type=int)
@@ -67,19 +84,14 @@ def get_all_plans():
query = query.filter(PracticePlan.is_typical == True)
# 按学员姓名模糊筛选
student_name = request.args.get('student_name')
if student_name:
query = query.join(Student).filter(Student.name.like(f'%%{student_name}%%'))
query = query.filter(Student.name.like(f'%%{student_name}%%'))
# 按问题筛选(通过 problem_id 关联到学员的问题)
problem_ids = request.args.get('problem_ids')
if problem_ids:
problem_id_list = [int(pid.strip()) for pid in problem_ids.split(',') if pid.strip()]
if problem_id_list:
# 筛选:方案对应的学员有指定问题之一的
# 使用子查询避免笛卡尔积导致的重复
from sqlalchemy import exists
query = query.join(Student).filter(
query = query.filter(
exists().where(
(StudentProblem.student_id == Student.id) &
(StudentProblem.problem_id.in_(problem_id_list))
@@ -87,11 +99,10 @@ def get_all_plans():
)
# 我的学员筛选(所在班级的老师是当前用户)
mine = request.args.get('mine')
if mine and mine.lower() == 'true':
if needs_class_join:
user_id = session.get('user_id')
if user_id:
query = query.join(Student).join(Class).filter(Class.teacher_id == user_id)
query = query.filter(Class.teacher_id == user_id)
plans = query.order_by(PracticePlan.created_at.desc()).all()
return jsonify([p.to_dict() for p in plans])
@@ -107,6 +118,155 @@ def toggle_plan_typical(plan_id):
return jsonify({"success": True, "is_typical": plan.is_typical})
@main_bp.route("/api/students/<int:student_id>/recommended-plans", methods=["GET"])
@login_required_json
def get_recommended_plans(student_id):
"""获取推荐方案 - 当前学员问题与典型方案问题有交集的方案
查询参数:
- mine: true/false(我的学员的典型方案)
返回字段:
- can_adopt: 问题集合是否完全一致,可采纳
"""
try:
student = Student.query.get_or_404(student_id)
# 获取当前学员的问题名称集合
student_problems = StudentProblem.query.filter_by(student_id=student_id).all()
# 通过 Problem 关联获取问题名称
student_problem_names = set()
for sp in student_problems:
if sp.problem:
student_problem_names.add(sp.problem.name)
# 获取所有典型方案,排除当前学员自己的方案
query = PracticePlan.query.filter(
PracticePlan.is_typical == True,
PracticePlan.student_id != student_id
)
# 我的筛选:只显示当前用户创建的学员的典型方案
mine = request.args.get('mine')
if mine and mine.lower() == 'true':
user_id = session.get('user_id')
if user_id:
query = query.join(Student).join(Class).filter(Class.teacher_id == user_id)
typical_plans = query.order_by(PracticePlan.created_at.desc()).all()
# 筛选:方案的问题与当前学员的问题有交集
import json as json_module
recommended = []
for plan in typical_plans:
try:
content = json_module.loads(plan.content) if plan.content else {}
except:
continue
plan_problems = content.get('problems', [])
# 提取方案中的问题名称
plan_problem_names = set()
for p in plan_problems:
name = p.get('name') or p.get('problem_name', '')
if name:
plan_problem_names.add(name)
# 检查交集
if student_problem_names & plan_problem_names:
# 计算交集问题
matched_problems = student_problem_names & plan_problem_names
plan_dict = plan.to_dict()
plan_dict['matched_problems'] = list(matched_problems)
plan_dict['matched_count'] = len(matched_problems)
# 检查问题集合是否完全一致(可采纳)
plan_dict['can_adopt'] = (student_problem_names == plan_problem_names)
recommended.append(plan_dict)
# 按匹配数量降序排序
recommended.sort(key=lambda x: x['matched_count'], reverse=True)
return jsonify(recommended)
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"error": str(e)}), 500
@main_bp.route("/api/students/<int:student_id>/plans/from-typical/<int:plan_id>", methods=["POST"])
@login_required_json
def adopt_typical_plan(student_id, plan_id):
"""采纳典型方案 - 复制该方案给当前学员
前端已判断 can_adopt,后端直接采纳并记录来源
"""
student = Student.query.get_or_404(student_id)
typical_plan = PracticePlan.query.get_or_404(plan_id)
if not typical_plan.is_typical:
return jsonify({"error": "只能采纳典型方案"}), 400
# 获取典型方案的问题名称集合并验证一致性
try:
content = json.loads(typical_plan.content) if typical_plan.content else {}
except:
content = {}
plan_problems = content.get('problems', [])
plan_problem_names = set()
for p in plan_problems:
name = p.get('name') or p.get('problem_name', '')
if name:
plan_problem_names.add(name)
# 获取当前学员的问题名称集合并验证一致性
student_problems = StudentProblem.query.filter_by(student_id=student_id).all()
student_problem_names = set()
for sp in student_problems:
if sp.problem:
student_problem_names.add(sp.problem.name)
# 检查问题名称集合是否完全一致
if student_problem_names != plan_problem_names:
return jsonify({
"error": "采纳失败:方案的问题与当前学员的问题不一致",
"student_problems": list(student_problem_names),
"plan_problems": list(plan_problem_names)
}), 400
# 替换内容中的原学员姓名为当前学员姓名
old_name = typical_plan.student.name if typical_plan.student else ""
if old_name and old_name in str(content):
content_str = json.dumps(content, ensure_ascii=False)
content_str = content_str.replace(old_name, student.name)
content = json.loads(content_str)
# 添加采纳来源信息
from datetime import datetime
content['adopted_from'] = {
'student_name': old_name,
'plan_id': typical_plan.id,
'adopted_at': datetime.now().strftime('%Y-%m-%d')
}
# 创建新方案
new_plan = PracticePlan(
student_id=student_id,
template_id=typical_plan.template_id,
content=json.dumps(content, ensure_ascii=False),
is_typical=False, # 采纳的方案不再是典型
created_by=session.get('user_id')
)
db.session.add(new_plan)
db.session.commit()
return jsonify({
"message": "方案已采纳",
"plan_id": new_plan.id,
"plan": new_plan.to_dict()
}), 201
@main_bp.route("/plans")
@login_required_json
def plans_page():
@@ -364,6 +524,7 @@ def generate_plan():
student_id=student_id,
template_id=template_id,
content=json.dumps(plan_content, ensure_ascii=False),
created_by=session.get('user_id')
)
db.session.add(plan)
db.session.commit()
@@ -433,6 +594,8 @@ def get_plan(plan_id):
"student_id": plan.student_id,
"student_name": plan.student.name if plan.student else "",
"created_at": plan.created_at.strftime("%Y-%m-%d %H:%M"),
"updated_at": plan.updated_at.strftime("%Y-%m-%d %H:%M") if plan.updated_at else None,
"updated_by_name": plan.updater.name if plan.updated_by and plan.updater else None,
"is_typical": plan.is_typical,
"content": content,
}
@@ -613,12 +776,32 @@ def delete_plan(plan_id):
@login_required_json
def update_plan_content(plan_id):
"""更新方案内容(用于编辑)"""
from datetime import datetime
plan = PracticePlan.query.get_or_404(plan_id)
data = request.get_json()
# 更新content字段
# 合并content字段 - 保留原有字段,只更新ai_report和daily_schedule
if "content" in data:
plan.content = data["content"]
new_content_str = data["content"]
existing_content = json.loads(plan.content) if plan.content else {}
# 解析新content(可能是字符串或对象)
if isinstance(new_content_str, str):
new_content = json.loads(new_content_str)
else:
new_content = new_content_str
# 合并:保留existing中的所有字段,用new_content覆盖ai_report和daily_schedule
merged = existing_content.copy()
merged.update({
"ai_report": new_content.get("ai_report", ""),
"daily_schedule": new_content.get("daily_schedule", [])
})
plan.content = json.dumps(merged, ensure_ascii=False)
plan.updated_by = session.get('user_id')
plan.updated_at = datetime.now()
db.session.commit()
return jsonify({"message": "保存成功"})