Files

492 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 学员管理路由
import csv
import io
from flask import (
request,
jsonify,
render_template,
session,
redirect,
url_for,
Response,
)
from app.routes import main_bp
from app.models import (
db,
Student,
Class,
StudentProblem,
PracticePlan,
PROBLEM_LIST,
SEVERITY_LEVELS,
PRACTICE_TIME_OPTIONS,
User,
)
from app.routes.auth import login_required_json, check_login
@main_bp.route("/")
def index():
"""首页"""
if not check_login():
return redirect("/login")
student_count = Student.query.count()
class_count = Class.query.count()
plan_count = PracticePlan.query.count()
return render_template(
"home.html",
student_count=student_count,
class_count=class_count,
plan_count=plan_count,
active_nav="home",
)
@main_bp.route("/student/<int:student_id>")
@login_required_json
def student_detail_page(student_id):
"""学员详情页"""
student = Student.query.get_or_404(student_id)
return render_template("student.html", student=student, active_nav="students")
@main_bp.route("/students")
def students_page():
"""学员列表页"""
if not check_login():
return redirect("/login")
return render_template(
"index.html",
problem_list=PROBLEM_LIST,
severity_levels=SEVERITY_LEVELS,
practice_time_options=PRACTICE_TIME_OPTIONS,
active_nav="students",
)
@main_bp.route("/statistics")
def statistics_page():
"""数据统计页"""
if not check_login():
return redirect("/login")
return render_template("statistics.html", active_nav="statistics")
@main_bp.route("/api/students", methods=["GET"])
@login_required_json
def get_students():
"""获取学员列表"""
# 支持筛选参数: class_id, name(模糊搜索), mine=true/false
class_id = request.args.get("class_id")
name = request.args.get("name")
mine_filter = request.args.get("mine")
query = Student.query
if class_id:
query = query.filter(Student.class_id == int(class_id))
if name:
query = query.filter(Student.name.contains(name))
# 我的学员筛选(所在班级的老师是当前用户)
if mine_filter and mine_filter.lower() == "true":
user_id = session.get("user_id")
if user_id:
query = query.join(Class, Student.class_id == Class.id).filter(Class.teacher_id == user_id)
students = query.order_by(Student.created_at.desc()).all()
return jsonify([s.to_dict() for s in students])
@main_bp.route("/api/students", methods=["POST"])
@login_required_json
def create_student():
"""创建新学员"""
data = request.get_json()
student = Student(
name=data.get("name"),
phone=data.get("phone", ""),
wechat_nickname=data.get("wechat_nickname", ""),
practice_time=data.get("practice_time", "30分钟"),
notes=data.get("notes", ""),
class_id=data.get("class_id"),
)
db.session.add(student)
db.session.commit()
return jsonify(student.to_dict())
@main_bp.route("/api/students/<int:student_id>", methods=["GET"])
@login_required_json
def get_student(student_id):
"""获取学员详情"""
student = Student.query.get_or_404(student_id)
problems = student.problems.all()
plans = student.plans.order_by(db.desc("created_at")).all()
# 获取班级名称
class_name = None
if student.class_id:
cls = Class.query.get(student.class_id)
if cls:
class_name = cls.name
student_dict = student.to_dict()
student_dict["class_name"] = class_name
return jsonify(
{
"student": student_dict,
"problems": [p.to_dict() for p in problems],
"plans": [p.to_dict() for p in plans],
}
)
@main_bp.route("/api/students/<int:student_id>", methods=["PUT"])
@login_required_json
def update_student(student_id):
"""更新学员信息"""
student = Student.query.get_or_404(student_id)
data = request.get_json()
if "name" in data:
student.name = data["name"]
if "phone" in data:
student.phone = data["phone"]
if "wechat_nickname" in data:
student.wechat_nickname = data["wechat_nickname"]
if "practice_time" in data:
student.practice_time = data["practice_time"]
if "notes" in data:
student.notes = data["notes"]
if "class_id" in data:
student.class_id = data["class_id"]
db.session.commit()
return jsonify(student.to_dict())
@main_bp.route("/api/students/<int:student_id>", methods=["DELETE"])
@login_required_json
def delete_student(student_id):
"""删除学员"""
student = Student.query.get_or_404(student_id)
db.session.delete(student)
db.session.commit()
return jsonify({"message": "删除成功"})
@main_bp.route("/api/students/template")
@login_required_json
def download_student_template():
"""下载CSV导入模板"""
headers = ["ID", "姓名", "电话", "微信昵称", "每日练习时间", "班级", "备注"]
# 练习时间选项
practice_times = PRACTICE_TIME_OPTIONS
# 示例数据(ID留空,表示新增)
example = ["", "张三", "13800138000", "昵称", "30分钟", "班级A", "备注信息"]
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(headers)
writer.writerow(example)
# 添加22行空行,让可选值出现在第25-26行
for _ in range(22):
writer.writerow([])
# 获取进行中的班级列表
active_classes = Class.query.filter_by(active=True).all()
class_names = [c.name for c in active_classes] if active_classes else []
# 添加可选练习时间和可选班级说明(第25-26行)
writer.writerow(["可选练习时间:"] + list(practice_times))
if class_names:
writer.writerow(["可选班级:"] + class_names)
else:
writer.writerow(["可选班级:无进行中班级"])
# 使用BytesIO处理编码
csv_content = output.getvalue().encode("utf-8-sig")
return Response(
csv_content,
mimetype="text/csv; charset=utf-8-sig",
headers={"Content-Disposition": "attachment; filename=student_template.csv"},
)
@main_bp.route("/api/students/export")
@login_required_json
def export_students():
"""导出学员CSV"""
# 获取所有学员
students = Student.query.order_by(Student.created_at.desc()).all()
# 班级ID到名称映射
class_map = {c.id: c.name for c in Class.query.all()}
headers = ["ID", "姓名", "电话", "微信昵称", "每日练习时间", "班级", "备注"]
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(headers)
for s in students:
class_name = class_map.get(s.class_id, "") if s.class_id else ""
writer.writerow(
[
s.id,
s.name,
s.phone or "",
s.wechat_nickname or "",
s.practice_time,
class_name,
s.notes or "",
]
)
csv_content = output.getvalue().encode("utf-8-sig")
return Response(
csv_content,
mimetype="text/csv; charset=utf-8-sig",
headers={"Content-Disposition": "attachment; filename=students_export.csv"},
)
@main_bp.route("/api/students/import", methods=["POST"])
@login_required_json
def import_students():
"""导入学员CSV"""
if "file" not in request.files:
return jsonify({"error": "请选择文件"}), 400
file = request.files["file"]
if file.filename == "":
return jsonify({"error": "请选择文件"}), 400
if not file.filename.endswith(".csv"):
return jsonify({"error": "请上传CSV文件"}), 400
try:
raw_content = file.read()
# 尝试多种编码
try:
content = raw_content.decode("utf-8-sig")
except UnicodeDecodeError:
try:
content = raw_content.decode("gbk")
except UnicodeDecodeError:
content = raw_content.decode("utf-8", errors="ignore")
reader = csv.DictReader(io.StringIO(content))
success_count = 0
error_count = 0
errors = []
# 获取班级名称到ID的映射
class_map = {c.name: c.id for c in Class.query.all()}
# 练习时间列表
practice_time_list = PRACTICE_TIME_OPTIONS
for row_num, row in enumerate(reader, start=2):
# 检查是否为空白行(所有字段都为空)
is_empty = all(not (v and v.strip()) for v in row.values())
if is_empty:
# 遇到空行后停止导入(空行后是可选值说明)
break
# 跳过说明行
if row.get("姓名", "").strip() in ["可选练习时间:", "可选班级:"]:
continue
try:
name = row.get("姓名", "").strip()
phone = row.get("电话", "").strip()
wechat_nickname = row.get("微信昵称", "").strip()
practice_time = row.get("每日练习时间", "30分钟").strip()
class_name = row.get("班级", "").strip()
notes = row.get("备注", "").strip()
if not name:
continue
# 验证并转换练习时间
if practice_time not in practice_time_list:
practice_time = "30分钟"
# 查找班级ID
class_id = None
if class_name:
class_id = class_map.get(class_name)
# 优先用ID查找,否则用姓名+电话
existing = None
if row.get("ID", "").strip():
try:
student_id = int(row.get("ID", "").strip())
existing = Student.query.get(student_id)
except (ValueError, TypeError):
pass
# 如果没找到,尝试用姓名+电话
if not existing:
existing = Student.query.filter_by(
name=name, phone=phone if phone else None
).first()
if existing:
# 更新已存在的学员(所有字段都允许更新)
existing.name = name
existing.phone = phone
existing.wechat_nickname = wechat_nickname
existing.practice_time = practice_time
existing.class_id = class_id
existing.notes = notes
else:
# 创建新学员
student = Student(
name=name,
phone=phone,
wechat_nickname=wechat_nickname,
practice_time=practice_time,
notes=notes,
class_id=class_id,
)
db.session.add(student)
success_count += 1
except Exception as e:
error_count += 1
errors.append(f"{row_num}行: {str(e)}")
db.session.commit()
result = {"message": f"导入成功: {success_count}"}
if error_count > 0:
result["error_details"] = errors[:10] # 只返回前10条错误
return jsonify(result)
except Exception as e:
return jsonify({"error": f"导入失败: {str(e)}"}), 500
@main_bp.route("/api/statistics/students")
@login_required_json
def get_student_statistics():
"""获取学员统计数据"""
# 获取筛选参数
mine_filter = request.args.get("mine", "false") == "true"
class_id_filter = request.args.get("class_id", type=int)
current_user_id = session.get("user_id")
# 基础查询:班级
class_query = Class.query.filter_by(active=True)
if mine_filter and current_user_id:
class_query = class_query.filter_by(teacher_id=current_user_id)
if class_id_filter:
class_query = class_query.filter_by(id=class_id_filter)
classes = class_query.all()
# 获取班级IDs
class_ids = [c.id for c in classes]
# 学员过滤
student_query = Student.query
if class_ids:
student_query = student_query.filter(Student.class_id.in_(class_ids))
else:
student_query = student_query.filter(Student.id == -1) # 无班级时返回空
students = student_query.all()
student_ids = [s.id for s in students]
# 全体学员数量
total_students = len(students)
# 问题记录过滤
problem_query = StudentProblem.query.filter(StudentProblem.student_id.in_(student_ids)) if student_ids else StudentProblem.query.filter(StudentProblem.id == -1)
problems = problem_query.all()
# 问题级别分布(来自 StudentProblem.level
levels = ["启蒙", "入门", "进阶", "熟练", "精通"]
level_dist = {lv: 0 for lv in levels}
for p in problems:
if p.level in level_dist:
level_dist[p.level] += 1
# 问题严重程度分布
severity_dist = {"轻微": 0, "中等": 0, "严重": 0}
for p in problems:
if p.severity in severity_dist:
severity_dist[p.severity] += 1
# 各班级学员数量
class_student_count = []
for c in classes:
class_student_count.append({
"class_id": c.id,
"class_name": c.name,
"level": c.level,
"student_count": len(c.students)
})
# 问题×级别矩阵(严重程度 × 学员级别)
severities = ["轻微", "中等", "严重"]
matrix = {}
for sev in severities:
matrix[sev] = {}
for lv in levels:
matrix[sev][lv] = 0
for p in problems:
if p.level in levels and p.severity in severities:
matrix[p.severity][p.level] += 1
# 各班级问题数量
class_problem_count = []
for c in classes:
class_problems = [p for p in problems if p.student.class_id == c.id]
class_problem_count.append({
"class_id": c.id,
"class_name": c.name,
"problem_count": len(class_problems)
})
# 各问题名称分布
problem_name_dist = {}
for p in problems:
name = p.problem.name if p.problem else f"问题{p.problem_id}"
if name not in problem_name_dist:
problem_name_dist[name] = 0
problem_name_dist[name] += 1
# 排序:数量多的在前
problem_name_dist = dict(sorted(problem_name_dist.items(), key=lambda x: x[1], reverse=True))
# 问题×班级矩阵
problem_class_matrix = []
for p_name in problem_name_dist.keys():
row = {"problem_name": p_name}
for c in classes:
count = sum(1 for p in problems if (p.problem.name if p.problem else f"问题{p.problem_id}") == p_name and p.student.class_id == c.id)
row[f"class_{c.id}"] = count
problem_class_matrix.append(row)
return jsonify({
"total_students": total_students,
"total_problems": len(problems),
"level_distribution": level_dist,
"severity_distribution": severity_dist,
"class_student_count": class_student_count,
"class_problem_count": class_problem_count,
"problem_level_matrix": matrix,
"problem_name_distribution": problem_name_dist,
"problem_class_matrix": problem_class_matrix,
"classes": [{"id": c.id, "name": c.name} for c in classes],
})