# 学员管理路由 import csv import io from flask import ( request, jsonify, render_template, session, redirect, url_for, Response, ) from app.routes import main_bp from app.models import ( db, Student, Class, StudentProblem, PracticePlan, PROBLEM_LIST, SEVERITY_LEVELS, PRACTICE_TIME_OPTIONS, User, ) from app.routes.auth import login_required_json, check_login @main_bp.route("/") def index(): """首页""" if not check_login(): return redirect("/login") student_count = Student.query.count() class_count = Class.query.count() plan_count = PracticePlan.query.count() return render_template( "home.html", student_count=student_count, class_count=class_count, plan_count=plan_count, active_nav="home", ) @main_bp.route("/student/") @login_required_json def student_detail_page(student_id): """学员详情页""" student = Student.query.get_or_404(student_id) return render_template("student.html", student=student, active_nav="students") @main_bp.route("/students") def students_page(): """学员列表页""" if not check_login(): return redirect("/login") return render_template( "index.html", problem_list=PROBLEM_LIST, severity_levels=SEVERITY_LEVELS, practice_time_options=PRACTICE_TIME_OPTIONS, active_nav="students", ) @main_bp.route("/statistics") def statistics_page(): """数据统计页""" if not check_login(): return redirect("/login") return render_template("statistics.html", active_nav="statistics") @main_bp.route("/api/students", methods=["GET"]) @login_required_json def get_students(): """获取学员列表""" # 支持筛选参数: class_id, name(模糊搜索), mine=true/false class_id = request.args.get("class_id") name = request.args.get("name") mine_filter = request.args.get("mine") query = Student.query if class_id: query = query.filter(Student.class_id == int(class_id)) if name: query = query.filter(Student.name.contains(name)) # 我的学员筛选(所在班级的老师是当前用户) if mine_filter and mine_filter.lower() == "true": user_id = session.get("user_id") if user_id: query = query.join(Class, Student.class_id == Class.id).filter(Class.teacher_id == user_id) students = query.order_by(Student.created_at.desc()).all() return jsonify([s.to_dict() for s in students]) @main_bp.route("/api/students", methods=["POST"]) @login_required_json def create_student(): """创建新学员""" data = request.get_json() student = Student( name=data.get("name"), phone=data.get("phone", ""), wechat_nickname=data.get("wechat_nickname", ""), practice_time=data.get("practice_time", "30分钟"), notes=data.get("notes", ""), class_id=data.get("class_id"), ) db.session.add(student) db.session.commit() return jsonify(student.to_dict()) @main_bp.route("/api/students/", methods=["GET"]) @login_required_json def get_student(student_id): """获取学员详情""" student = Student.query.get_or_404(student_id) problems = student.problems.all() plans = student.plans.order_by(db.desc("created_at")).all() # 获取班级名称 class_name = None if student.class_id: cls = Class.query.get(student.class_id) if cls: class_name = cls.name student_dict = student.to_dict() student_dict["class_name"] = class_name return jsonify( { "student": student_dict, "problems": [p.to_dict() for p in problems], "plans": [p.to_dict() for p in plans], } ) @main_bp.route("/api/students/", methods=["PUT"]) @login_required_json def update_student(student_id): """更新学员信息""" student = Student.query.get_or_404(student_id) data = request.get_json() if "name" in data: student.name = data["name"] if "phone" in data: student.phone = data["phone"] if "wechat_nickname" in data: student.wechat_nickname = data["wechat_nickname"] if "practice_time" in data: student.practice_time = data["practice_time"] if "notes" in data: student.notes = data["notes"] if "class_id" in data: student.class_id = data["class_id"] db.session.commit() return jsonify(student.to_dict()) @main_bp.route("/api/students/", methods=["DELETE"]) @login_required_json def delete_student(student_id): """删除学员""" student = Student.query.get_or_404(student_id) db.session.delete(student) db.session.commit() return jsonify({"message": "删除成功"}) @main_bp.route("/api/students/template") @login_required_json def download_student_template(): """下载CSV导入模板""" headers = ["ID", "姓名", "电话", "微信昵称", "每日练习时间", "班级", "备注"] # 练习时间选项 practice_times = PRACTICE_TIME_OPTIONS # 示例数据(ID留空,表示新增) example = ["", "张三", "13800138000", "昵称", "30分钟", "班级A", "备注信息"] output = io.StringIO() writer = csv.writer(output) writer.writerow(headers) writer.writerow(example) # 添加22行空行,让可选值出现在第25-26行 for _ in range(22): writer.writerow([]) # 获取进行中的班级列表 active_classes = Class.query.filter_by(active=True).all() class_names = [c.name for c in active_classes] if active_classes else [] # 添加可选练习时间和可选班级说明(第25-26行) writer.writerow(["可选练习时间:"] + list(practice_times)) if class_names: writer.writerow(["可选班级:"] + class_names) else: writer.writerow(["可选班级:无进行中班级"]) # 使用BytesIO处理编码 csv_content = output.getvalue().encode("utf-8-sig") return Response( csv_content, mimetype="text/csv; charset=utf-8-sig", headers={"Content-Disposition": "attachment; filename=student_template.csv"}, ) @main_bp.route("/api/students/export") @login_required_json def export_students(): """导出学员CSV""" # 获取所有学员 students = Student.query.order_by(Student.created_at.desc()).all() # 班级ID到名称映射 class_map = {c.id: c.name for c in Class.query.all()} headers = ["ID", "姓名", "电话", "微信昵称", "每日练习时间", "班级", "备注"] output = io.StringIO() writer = csv.writer(output) writer.writerow(headers) for s in students: class_name = class_map.get(s.class_id, "") if s.class_id else "" writer.writerow( [ s.id, s.name, s.phone or "", s.wechat_nickname or "", s.practice_time, class_name, s.notes or "", ] ) csv_content = output.getvalue().encode("utf-8-sig") return Response( csv_content, mimetype="text/csv; charset=utf-8-sig", headers={"Content-Disposition": "attachment; filename=students_export.csv"}, ) @main_bp.route("/api/students/import", methods=["POST"]) @login_required_json def import_students(): """导入学员CSV""" if "file" not in request.files: return jsonify({"error": "请选择文件"}), 400 file = request.files["file"] if file.filename == "": return jsonify({"error": "请选择文件"}), 400 if not file.filename.endswith(".csv"): return jsonify({"error": "请上传CSV文件"}), 400 try: raw_content = file.read() # 尝试多种编码 try: content = raw_content.decode("utf-8-sig") except UnicodeDecodeError: try: content = raw_content.decode("gbk") except UnicodeDecodeError: content = raw_content.decode("utf-8", errors="ignore") reader = csv.DictReader(io.StringIO(content)) success_count = 0 error_count = 0 errors = [] # 获取班级名称到ID的映射 class_map = {c.name: c.id for c in Class.query.all()} # 练习时间列表 practice_time_list = PRACTICE_TIME_OPTIONS for row_num, row in enumerate(reader, start=2): # 检查是否为空白行(所有字段都为空) is_empty = all(not (v and v.strip()) for v in row.values()) if is_empty: # 遇到空行后停止导入(空行后是可选值说明) break # 跳过说明行 if row.get("姓名", "").strip() in ["可选练习时间:", "可选班级:"]: continue try: name = row.get("姓名", "").strip() phone = row.get("电话", "").strip() wechat_nickname = row.get("微信昵称", "").strip() practice_time = row.get("每日练习时间", "30分钟").strip() class_name = row.get("班级", "").strip() notes = row.get("备注", "").strip() if not name: continue # 验证并转换练习时间 if practice_time not in practice_time_list: practice_time = "30分钟" # 查找班级ID class_id = None if class_name: class_id = class_map.get(class_name) # 优先用ID查找,否则用姓名+电话 existing = None if row.get("ID", "").strip(): try: student_id = int(row.get("ID", "").strip()) existing = Student.query.get(student_id) except (ValueError, TypeError): pass # 如果没找到,尝试用姓名+电话 if not existing: existing = Student.query.filter_by( name=name, phone=phone if phone else None ).first() if existing: # 更新已存在的学员(所有字段都允许更新) existing.name = name existing.phone = phone existing.wechat_nickname = wechat_nickname existing.practice_time = practice_time existing.class_id = class_id existing.notes = notes else: # 创建新学员 student = Student( name=name, phone=phone, wechat_nickname=wechat_nickname, practice_time=practice_time, notes=notes, class_id=class_id, ) db.session.add(student) success_count += 1 except Exception as e: error_count += 1 errors.append(f"第{row_num}行: {str(e)}") db.session.commit() result = {"message": f"导入成功: {success_count}条"} if error_count > 0: result["error_details"] = errors[:10] # 只返回前10条错误 return jsonify(result) except Exception as e: return jsonify({"error": f"导入失败: {str(e)}"}), 500 @main_bp.route("/api/statistics/students") @login_required_json def get_student_statistics(): """获取学员统计数据""" # 获取筛选参数 mine_filter = request.args.get("mine", "false") == "true" class_id_filter = request.args.get("class_id", type=int) current_user_id = session.get("user_id") # 基础查询:班级 class_query = Class.query.filter_by(active=True) if mine_filter and current_user_id: class_query = class_query.filter_by(teacher_id=current_user_id) if class_id_filter: class_query = class_query.filter_by(id=class_id_filter) classes = class_query.all() # 获取班级IDs class_ids = [c.id for c in classes] # 学员过滤 student_query = Student.query if class_ids: student_query = student_query.filter(Student.class_id.in_(class_ids)) else: student_query = student_query.filter(Student.id == -1) # 无班级时返回空 students = student_query.all() student_ids = [s.id for s in students] # 全体学员数量 total_students = len(students) # 问题记录过滤 problem_query = StudentProblem.query.filter(StudentProblem.student_id.in_(student_ids)) if student_ids else StudentProblem.query.filter(StudentProblem.id == -1) problems = problem_query.all() # 问题级别分布(来自 StudentProblem.level) levels = ["启蒙", "入门", "进阶", "熟练", "精通"] level_dist = {lv: 0 for lv in levels} for p in problems: if p.level in level_dist: level_dist[p.level] += 1 # 问题严重程度分布 severity_dist = {"轻微": 0, "中等": 0, "严重": 0} for p in problems: if p.severity in severity_dist: severity_dist[p.severity] += 1 # 各班级学员数量 class_student_count = [] for c in classes: class_student_count.append({ "class_id": c.id, "class_name": c.name, "level": c.level, "student_count": len(c.students) }) # 问题×级别矩阵(严重程度 × 学员级别) severities = ["轻微", "中等", "严重"] matrix = {} for sev in severities: matrix[sev] = {} for lv in levels: matrix[sev][lv] = 0 for p in problems: if p.level in levels and p.severity in severities: matrix[p.severity][p.level] += 1 # 各班级问题数量 class_problem_count = [] for c in classes: class_problems = [p for p in problems if p.student.class_id == c.id] class_problem_count.append({ "class_id": c.id, "class_name": c.name, "problem_count": len(class_problems) }) # 各问题名称分布 problem_name_dist = {} for p in problems: name = p.problem.name if p.problem else f"问题{p.problem_id}" if name not in problem_name_dist: problem_name_dist[name] = 0 problem_name_dist[name] += 1 # 排序:数量多的在前 problem_name_dist = dict(sorted(problem_name_dist.items(), key=lambda x: x[1], reverse=True)) # 问题×班级矩阵 problem_class_matrix = [] for p_name in problem_name_dist.keys(): row = {"problem_name": p_name} for c in classes: count = sum(1 for p in problems if (p.problem.name if p.problem else f"问题{p.problem_id}") == p_name and p.student.class_id == c.id) row[f"class_{c.id}"] = count problem_class_matrix.append(row) return jsonify({ "total_students": total_students, "total_problems": len(problems), "level_distribution": level_dist, "severity_distribution": severity_dist, "class_student_count": class_student_count, "class_problem_count": class_problem_count, "problem_level_matrix": matrix, "problem_name_distribution": problem_name_dist, "problem_class_matrix": problem_class_matrix, "classes": [{"id": c.id, "name": c.name} for c in classes], })