Files
piano-plan/app/routes/students.py
T
hmo 18351212e8 feat: 问题数据迁移到数据库;学员详情页URL导航改造;侧边栏统一
- 问题从文件系统迁移到数据库 problems 表
- 移除 PROBLEMS_DIR 配置和文件读取逻辑
- student.html 完整重写:编辑/添加/删除问题,生成方案进度显示
- 学员详情页支持独立URL访问 (/student/<id>)
- 统一侧边栏到 base.html
- 更新文档:DEPLOYMENT_SOP, MODELS, STRUCTURE, FRONTEND_ARCH
- 部署到生产环境 v1.2.0
2026-04-23 06:35:32 +08:00

362 lines
11 KiB
Python

# 学员管理路由
import csv
import io
from flask import (
request,
jsonify,
render_template,
session,
redirect,
url_for,
Response,
)
from app.routes import main_bp
from app.models import (
db,
Student,
Class,
PracticePlan,
PROBLEM_LIST,
SEVERITY_LEVELS,
PRACTICE_TIME_OPTIONS,
User,
)
from app.routes.auth import login_required_json, check_login
@main_bp.route("/")
def index():
    """Render the home page with student, class and practice-plan totals.

    Visitors for whom ``check_login()`` is false are redirected to ``/login``.
    """
    if not check_login():
        return redirect("/login")

    # The dict keys are the variable names handed to the home.html template.
    totals = {
        "student_count": Student.query.count(),
        "class_count": Class.query.count(),
        "plan_count": PracticePlan.query.count(),
    }
    return render_template("home.html", active_nav="home", **totals)
@main_bp.route("/student/<int:student_id>")
def student_detail_page(student_id):
    """Render the detail page for one student.

    This is an HTML page, so it uses the same guard as the other pages in
    this module (``check_login()`` followed by a redirect to ``/login``)
    instead of the ``login_required_json`` decorator used by the ``/api``
    endpoints.

    Args:
        student_id: Primary key of the student, taken from the URL.

    Returns:
        The rendered ``student.html`` template, or a redirect to ``/login``
        when the visitor is not logged in. Responds with 404 when no student
        has the given id.
    """
    if not check_login():
        return redirect("/login")
    student = Student.query.get_or_404(student_id)
    return render_template("student.html", student=student, active_nav="students")
@main_bp.route("/students")
def students_page():
    """Render the student list page, or redirect to ``/login`` when not logged in."""
    if check_login():
        # The option lists are passed through to the index.html template.
        return render_template(
            "index.html",
            problem_list=PROBLEM_LIST,
            severity_levels=SEVERITY_LEVELS,
            practice_time_options=PRACTICE_TIME_OPTIONS,
            active_nav="students",
        )
    return redirect("/login")
@main_bp.route("/api/students", methods=["GET"])
@login_required_json
def get_students():
    """Return the student list as JSON, newest first.

    Query parameters (both optional):
        class_id: Only return students whose ``class_id`` equals this integer.
        name: Only return students whose name contains this substring.

    Returns:
        A JSON array of ``Student.to_dict()`` results, or a 400 JSON error
        when ``class_id`` is present but is not an integer.
    """
    raw_class_id = request.args.get("class_id")
    name = request.args.get("name")

    query = Student.query
    if raw_class_id:
        # A non-numeric value used to escape as ValueError and produce a 500.
        try:
            class_id = int(raw_class_id)
        except ValueError:
            return jsonify({"error": "class_id 必须是整数"}), 400
        query = query.filter(Student.class_id == class_id)
    if name:
        query = query.filter(Student.name.contains(name))

    students = query.order_by(Student.created_at.desc()).all()
    return jsonify([s.to_dict() for s in students])
@main_bp.route("/api/students", methods=["POST"])
@login_required_json
def create_student():
    """Create a student from a JSON object in the request body.

    Recognised keys: ``name`` (required), ``phone``, ``wechat_nickname``,
    ``practice_time`` (defaults to "30分钟"), ``notes`` and ``class_id``.

    Returns:
        The new student's ``to_dict()`` as JSON, or a 400 JSON error when the
        body is not a JSON object or ``name`` is missing or blank.
    """
    # silent=True yields None instead of raising on a missing or invalid body,
    # so every malformed request gets the same JSON-shaped 400.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须是JSON对象"}), 400

    # The CSV import skips rows without a name; apply the same rule here
    # rather than persisting a nameless record.
    name = data.get("name")
    if not isinstance(name, str) or not name.strip():
        return jsonify({"error": "姓名不能为空"}), 400

    student = Student(
        name=name,
        phone=data.get("phone", ""),
        wechat_nickname=data.get("wechat_nickname", ""),
        practice_time=data.get("practice_time", "30分钟"),
        notes=data.get("notes", ""),
        class_id=data.get("class_id"),
    )
    db.session.add(student)
    db.session.commit()
    return jsonify(student.to_dict())
@main_bp.route("/api/students/<int:student_id>", methods=["GET"])
@login_required_json
def get_student(student_id):
    """Return one student with their problems and plans as JSON.

    The response object has three keys: ``student`` (the student's
    ``to_dict()`` plus a ``class_name`` entry, ``None`` when the student has
    no class or the class row is missing), ``problems``, and ``plans``
    (ordered by ``created_at`` descending). Responds with 404 when the
    student does not exist.
    """
    record = Student.query.get_or_404(student_id)
    problem_rows = record.problems.all()
    plan_rows = record.plans.order_by(db.desc("created_at")).all()

    # Resolve the class name only when the student is assigned to a class.
    assigned_class = Class.query.get(record.class_id) if record.class_id else None

    student_payload = record.to_dict()
    student_payload["class_name"] = assigned_class.name if assigned_class else None

    return jsonify(
        {
            "student": student_payload,
            "problems": [problem.to_dict() for problem in problem_rows],
            "plans": [plan.to_dict() for plan in plan_rows],
        }
    )
@main_bp.route("/api/students/<int:student_id>", methods=["PUT"])
@login_required_json
def update_student(student_id):
    """Partially update a student from a JSON object in the request body.

    Only the keys present in the body are applied; recognised keys are
    ``name``, ``phone``, ``wechat_nickname``, ``practice_time``, ``notes``
    and ``class_id``.

    Returns:
        The updated student's ``to_dict()`` as JSON, a 400 JSON error when
        the body is not a JSON object, or 404 when the student does not exist.
    """
    student = Student.query.get_or_404(student_id)

    # A body of JSON ``null`` (or any non-object) used to crash on the
    # ``"name" in data`` membership test; reject it explicitly instead.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "请求体必须是JSON对象"}), 400

    updatable_fields = (
        "name",
        "phone",
        "wechat_nickname",
        "practice_time",
        "notes",
        "class_id",
    )
    for field in updatable_fields:
        if field in data:
            setattr(student, field, data[field])

    db.session.commit()
    return jsonify(student.to_dict())
@main_bp.route("/api/students/<int:student_id>", methods=["DELETE"])
@login_required_json
def delete_student(student_id):
    """Delete the student with the given id and confirm with a JSON message.

    Responds with 404 when the student does not exist.
    """
    doomed = Student.query.get_or_404(student_id)
    db.session.delete(doomed)
    db.session.commit()
    return jsonify(message="删除成功")
@main_bp.route("/api/students/template")
@login_required_json
def download_student_template():
    """Return the CSV import template as a file download.

    Layout: header row, one example row (ID left empty, meaning "create"),
    22 empty rows, then two hint rows listing the allowed practice times and
    the names of the currently active classes.
    """
    buffer = io.StringIO()
    sheet = csv.writer(buffer)

    sheet.writerow(["ID", "姓名", "电话", "微信昵称", "每日练习时间", "班级", "备注"])
    sheet.writerow(["", "张三", "13800138000", "昵称", "30分钟", "班级A", "备注信息"])

    # Pad with 22 empty rows so the hint rows land on rows 25-26.
    sheet.writerows([] for _ in range(22))

    sheet.writerow(["可选练习时间:", *PRACTICE_TIME_OPTIONS])

    active_names = [cls.name for cls in Class.query.filter_by(active=True).all()]
    if active_names:
        sheet.writerow(["可选班级:", *active_names])
    else:
        sheet.writerow(["可选班级:无进行中班级"])

    # Encode with a BOM ("utf-8-sig").
    # NOTE(review): "utf-8-sig" in the Content-Type is a Python codec name, not
    # a registered charset label — confirm clients tolerate it.
    payload = buffer.getvalue().encode("utf-8-sig")
    return Response(
        payload,
        mimetype="text/csv; charset=utf-8-sig",
        headers={"Content-Disposition": "attachment; filename=student_template.csv"},
    )
@main_bp.route("/api/students/export")
@login_required_json
def export_students():
    """Export every student, newest first, as a CSV file download.

    Columns: ID, name, phone, WeChat nickname, daily practice time, class
    name, notes. The class column is empty when the student has no class or
    the class id is unknown.
    """
    all_students = Student.query.order_by(Student.created_at.desc()).all()
    # Map class id -> class name once, rather than querying per student.
    class_names = {cls.id: cls.name for cls in Class.query.all()}

    def as_row(student):
        """Build the CSV cells for one student."""
        if student.class_id:
            class_label = class_names.get(student.class_id, "")
        else:
            class_label = ""
        return [
            student.id,
            student.name,
            student.phone or "",
            student.wechat_nickname or "",
            student.practice_time,
            class_label,
            student.notes or "",
        ]

    buffer = io.StringIO()
    sheet = csv.writer(buffer)
    sheet.writerow(["ID", "姓名", "电话", "微信昵称", "每日练习时间", "班级", "备注"])
    sheet.writerows(as_row(student) for student in all_students)

    # NOTE(review): "utf-8-sig" in the Content-Type is a Python codec name, not
    # a registered charset label — confirm clients tolerate it.
    payload = buffer.getvalue().encode("utf-8-sig")
    return Response(
        payload,
        mimetype="text/csv; charset=utf-8-sig",
        headers={"Content-Disposition": "attachment; filename=students_export.csv"},
    )
# First-column labels of the hint rows that the CSV template appends below
# the data area; such rows must never be imported as students.
_TEMPLATE_HINT_PREFIXES = ("可选练习时间:", "可选班级:")


def _decode_csv_bytes(raw):
    """Decode uploaded CSV bytes: UTF-8 (with or without BOM), then GBK, then lossy UTF-8."""
    for encoding in ("utf-8-sig", "gbk"):
        try:
            return raw.decode(encoding)
        except UnicodeDecodeError:
            continue
    return raw.decode("utf-8", errors="ignore")


def _csv_cell(row, column, default=""):
    """Return the stripped text of ``row[column]``, or ``default`` when the cell is absent.

    ``csv.DictReader`` fills the cells of short rows with ``None`` and stores
    surplus cells as a list under the key ``None``; neither is a string, so
    both are treated as absent.
    """
    value = row.get(column)
    if not isinstance(value, str):
        return default
    return value.strip()


def _row_is_blank(row):
    """Return True when every cell of ``row`` (surplus cells included) is empty or whitespace."""
    for value in row.values():
        cells = value if isinstance(value, list) else [value]
        if any(isinstance(cell, str) and cell.strip() for cell in cells):
            return False
    return True


def _find_existing_student(raw_id, name, phone):
    """Look up the student a CSV row refers to: by ID first, then by name + phone."""
    if raw_id:
        try:
            found = Student.query.get(int(raw_id))
        except ValueError:
            found = None
        if found:
            return found

    query = Student.query.filter_by(name=name)
    if phone:
        query = query.filter_by(phone=phone)
    else:
        # Records are saved with phone "" when none is given, so an empty CSV
        # cell has to match both "" and NULL; matching NULL alone never found
        # them and every re-import created duplicates.
        query = query.filter(db.or_(Student.phone == "", Student.phone.is_(None)))
    return query.first()


@main_bp.route("/api/students/import", methods=["POST"])
@login_required_json
def import_students():
    """Import students from an uploaded CSV file (form field ``file``).

    Each data row updates the matching student (matched by ID, otherwise by
    name + phone) or creates a new one. Rows without a name are skipped, an
    unknown practice time falls back to "30分钟", and an unknown class name
    leaves the student without a class. Reading stops at the first row whose
    cells are all empty; template hint rows are skipped.

    Returns:
        JSON ``{"message": ...}`` carrying the number of imported rows, plus
        ``error_details`` (at most 10 entries) when individual rows failed.
        Responds with 400 for a missing or non-CSV upload and 500 when the
        import as a whole fails.
    """
    upload = request.files.get("file")
    if upload is None or not upload.filename:
        return jsonify({"error": "请选择文件"}), 400
    # Compare case-insensitively so "STUDENTS.CSV" is accepted too.
    if not upload.filename.lower().endswith(".csv"):
        return jsonify({"error": "请上传CSV文件"}), 400

    try:
        reader = csv.DictReader(io.StringIO(_decode_csv_bytes(upload.read())))
        first_column = reader.fieldnames[0] if reader.fieldnames else None

        # Class name -> id, loaded once for the whole file.
        class_map = {c.name: c.id for c in Class.query.all()}

        success_count = 0
        errors = []

        for row in reader:
            # DictReader silently drops fully blank lines, so a running counter
            # would drift; line_num reports the line actually read.
            row_num = reader.line_num

            # A row of empty cells (e.g. ",,,,,," after a spreadsheet
            # round-trip) marks the end of the data area.
            if _row_is_blank(row):
                break

            # Hint rows carry their label in the FIRST column, not in the name
            # column. Because blank lines are dropped by DictReader, an
            # unedited template reaches this point with its hint rows intact.
            if _csv_cell(row, first_column).startswith(_TEMPLATE_HINT_PREFIXES):
                continue

            try:
                name = _csv_cell(row, "姓名")
                if not name:
                    continue

                phone = _csv_cell(row, "电话")
                wechat_nickname = _csv_cell(row, "微信昵称")
                class_name = _csv_cell(row, "班级")
                notes = _csv_cell(row, "备注")

                practice_time = _csv_cell(row, "每日练习时间", "30分钟")
                if practice_time not in PRACTICE_TIME_OPTIONS:
                    practice_time = "30分钟"

                class_id = class_map.get(class_name) if class_name else None

                existing = _find_existing_student(_csv_cell(row, "ID"), name, phone)
                if existing:
                    # Every field of an existing student may be overwritten.
                    existing.name = name
                    existing.phone = phone
                    existing.wechat_nickname = wechat_nickname
                    existing.practice_time = practice_time
                    existing.class_id = class_id
                    existing.notes = notes
                else:
                    db.session.add(
                        Student(
                            name=name,
                            phone=phone,
                            wechat_nickname=wechat_nickname,
                            practice_time=practice_time,
                            notes=notes,
                            class_id=class_id,
                        )
                    )
                success_count += 1
            except Exception as e:
                # Deliberately best-effort: one bad row must not abort the
                # rest of the file; it is reported back to the caller instead.
                errors.append(f"{row_num}行: {str(e)}")

        db.session.commit()

        result = {"message": f"导入成功: {success_count}"}
        if errors:
            result["error_details"] = errors[:10]  # report only the first 10 errors
        return jsonify(result)
    except Exception as e:
        # Discard any partially applied changes so the session stays usable.
        db.session.rollback()
        return jsonify({"error": f"导入失败: {str(e)}"}), 500