# 学员管理路由 import csv import io from flask import ( request, jsonify, render_template, session, redirect, url_for, Response, ) from app.routes import main_bp from app.models import ( db, Student, Class, PracticePlan, PROBLEM_LIST, SEVERITY_LEVELS, PRACTICE_TIME_OPTIONS, User, ) from app.routes.auth import login_required_json, check_login @main_bp.route("/") def index(): """首页""" if not check_login(): return redirect("/login") student_count = Student.query.count() class_count = Class.query.count() plan_count = PracticePlan.query.count() return render_template( "home.html", student_count=student_count, class_count=class_count, plan_count=plan_count, active_nav="home", ) @main_bp.route("/student/") @login_required_json def student_detail_page(student_id): """学员详情页""" student = Student.query.get_or_404(student_id) return render_template("student.html", student=student, active_nav="students") @main_bp.route("/students") def students_page(): """学员列表页""" if not check_login(): return redirect("/login") return render_template( "index.html", problem_list=PROBLEM_LIST, severity_levels=SEVERITY_LEVELS, practice_time_options=PRACTICE_TIME_OPTIONS, active_nav="students", ) @main_bp.route("/api/students", methods=["GET"]) @login_required_json def get_students(): """获取学员列表""" # 支持筛选参数: class_id, name(模糊搜索) class_id = request.args.get("class_id") name = request.args.get("name") query = Student.query if class_id: query = query.filter(Student.class_id == int(class_id)) if name: query = query.filter(Student.name.contains(name)) students = query.order_by(Student.created_at.desc()).all() return jsonify([s.to_dict() for s in students]) @main_bp.route("/api/students", methods=["POST"]) @login_required_json def create_student(): """创建新学员""" data = request.get_json() student = Student( name=data.get("name"), phone=data.get("phone", ""), wechat_nickname=data.get("wechat_nickname", ""), practice_time=data.get("practice_time", "30分钟"), notes=data.get("notes", ""), class_id=data.get("class_id"), ) db.session.add(student) db.session.commit() return jsonify(student.to_dict()) @main_bp.route("/api/students/", methods=["GET"]) @login_required_json def get_student(student_id): """获取学员详情""" student = Student.query.get_or_404(student_id) problems = student.problems.all() plans = student.plans.order_by(db.desc("created_at")).all() # 获取班级名称 class_name = None if student.class_id: cls = Class.query.get(student.class_id) if cls: class_name = cls.name student_dict = student.to_dict() student_dict["class_name"] = class_name return jsonify( { "student": student_dict, "problems": [p.to_dict() for p in problems], "plans": [p.to_dict() for p in plans], } ) @main_bp.route("/api/students/", methods=["PUT"]) @login_required_json def update_student(student_id): """更新学员信息""" student = Student.query.get_or_404(student_id) data = request.get_json() if "name" in data: student.name = data["name"] if "phone" in data: student.phone = data["phone"] if "wechat_nickname" in data: student.wechat_nickname = data["wechat_nickname"] if "practice_time" in data: student.practice_time = data["practice_time"] if "notes" in data: student.notes = data["notes"] if "class_id" in data: student.class_id = data["class_id"] db.session.commit() return jsonify(student.to_dict()) @main_bp.route("/api/students/", methods=["DELETE"]) @login_required_json def delete_student(student_id): """删除学员""" student = Student.query.get_or_404(student_id) db.session.delete(student) db.session.commit() return jsonify({"message": "删除成功"}) @main_bp.route("/api/students/template") @login_required_json def download_student_template(): """下载CSV导入模板""" headers = ["ID", "姓名", "电话", "微信昵称", "每日练习时间", "班级", "备注"] # 练习时间选项 practice_times = PRACTICE_TIME_OPTIONS # 示例数据(ID留空,表示新增) example = ["", "张三", "13800138000", "昵称", "30分钟", "班级A", "备注信息"] output = io.StringIO() writer = csv.writer(output) writer.writerow(headers) writer.writerow(example) # 添加22行空行,让可选值出现在第25-26行 for _ in range(22): writer.writerow([]) # 获取进行中的班级列表 active_classes = Class.query.filter_by(active=True).all() class_names = [c.name for c in active_classes] if active_classes else [] # 添加可选练习时间和可选班级说明(第25-26行) writer.writerow(["可选练习时间:"] + list(practice_times)) if class_names: writer.writerow(["可选班级:"] + class_names) else: writer.writerow(["可选班级:无进行中班级"]) # 使用BytesIO处理编码 csv_content = output.getvalue().encode("utf-8-sig") return Response( csv_content, mimetype="text/csv; charset=utf-8-sig", headers={"Content-Disposition": "attachment; filename=student_template.csv"}, ) @main_bp.route("/api/students/export") @login_required_json def export_students(): """导出学员CSV""" # 获取所有学员 students = Student.query.order_by(Student.created_at.desc()).all() # 班级ID到名称映射 class_map = {c.id: c.name for c in Class.query.all()} headers = ["ID", "姓名", "电话", "微信昵称", "每日练习时间", "班级", "备注"] output = io.StringIO() writer = csv.writer(output) writer.writerow(headers) for s in students: class_name = class_map.get(s.class_id, "") if s.class_id else "" writer.writerow( [ s.id, s.name, s.phone or "", s.wechat_nickname or "", s.practice_time, class_name, s.notes or "", ] ) csv_content = output.getvalue().encode("utf-8-sig") return Response( csv_content, mimetype="text/csv; charset=utf-8-sig", headers={"Content-Disposition": "attachment; filename=students_export.csv"}, ) @main_bp.route("/api/students/import", methods=["POST"]) @login_required_json def import_students(): """导入学员CSV""" if "file" not in request.files: return jsonify({"error": "请选择文件"}), 400 file = request.files["file"] if file.filename == "": return jsonify({"error": "请选择文件"}), 400 if not file.filename.endswith(".csv"): return jsonify({"error": "请上传CSV文件"}), 400 try: raw_content = file.read() # 尝试多种编码 try: content = raw_content.decode("utf-8-sig") except UnicodeDecodeError: try: content = raw_content.decode("gbk") except UnicodeDecodeError: content = raw_content.decode("utf-8", errors="ignore") reader = csv.DictReader(io.StringIO(content)) success_count = 0 error_count = 0 errors = [] # 获取班级名称到ID的映射 class_map = {c.name: c.id for c in Class.query.all()} # 练习时间列表 practice_time_list = PRACTICE_TIME_OPTIONS for row_num, row in enumerate(reader, start=2): # 检查是否为空白行(所有字段都为空) is_empty = all(not (v and v.strip()) for v in row.values()) if is_empty: # 遇到空行后停止导入(空行后是可选值说明) break # 跳过说明行 if row.get("姓名", "").strip() in ["可选练习时间:", "可选班级:"]: continue try: name = row.get("姓名", "").strip() phone = row.get("电话", "").strip() wechat_nickname = row.get("微信昵称", "").strip() practice_time = row.get("每日练习时间", "30分钟").strip() class_name = row.get("班级", "").strip() notes = row.get("备注", "").strip() if not name: continue # 验证并转换练习时间 if practice_time not in practice_time_list: practice_time = "30分钟" # 查找班级ID class_id = None if class_name: class_id = class_map.get(class_name) # 优先用ID查找,否则用姓名+电话 existing = None if row.get("ID", "").strip(): try: student_id = int(row.get("ID", "").strip()) existing = Student.query.get(student_id) except (ValueError, TypeError): pass # 如果没找到,尝试用姓名+电话 if not existing: existing = Student.query.filter_by( name=name, phone=phone if phone else None ).first() if existing: # 更新已存在的学员(所有字段都允许更新) existing.name = name existing.phone = phone existing.wechat_nickname = wechat_nickname existing.practice_time = practice_time existing.class_id = class_id existing.notes = notes else: # 创建新学员 student = Student( name=name, phone=phone, wechat_nickname=wechat_nickname, practice_time=practice_time, notes=notes, class_id=class_id, ) db.session.add(student) success_count += 1 except Exception as e: error_count += 1 errors.append(f"第{row_num}行: {str(e)}") db.session.commit() result = {"message": f"导入成功: {success_count}条"} if error_count > 0: result["error_details"] = errors[:10] # 只返回前10条错误 return jsonify(result) except Exception as e: return jsonify({"error": f"导入失败: {str(e)}"}), 500