feat: 初始提交 v1.2.0 - 钢琴练习方案生成系统

This commit is contained in:
hmo
2026-04-21 20:00:33 +08:00
commit fd593bddf4
44 changed files with 10936 additions and 0 deletions
+8
View File
@@ -0,0 +1,8 @@
# 路由蓝图
from flask import Blueprint
main_bp = Blueprint("main", __name__)
# 导入各路由模块
from app.routes import students, plans, problems, settings, auth, classes, users, backup
+160
View File
@@ -0,0 +1,160 @@
# 登录认证路由
from flask import request, jsonify, render_template, session, redirect, url_for
from app.routes import main_bp
from app.models import db, User
@main_bp.route("/login")
def login_page():
"""登录页面"""
if session.get("user_id"):
return redirect("/")
return render_template("login.html")
@main_bp.route("/api/login", methods=["POST"])
def api_login():
"""登录API"""
data = request.get_json()
username = data.get("username", "").strip()
password = data.get("password", "")
if not username or not password:
return jsonify({"error": "请输入用户名和密码"}), 400
user = User.query.filter_by(username=username).first()
if not user or not user.check_password(password):
return jsonify({"error": "用户名或密码错误"}), 401
session["user_id"] = user.id
session["username"] = user.username
return jsonify({"message": "登录成功"})
@main_bp.route("/api/logout", methods=["POST"])
def api_logout():
"""登出API"""
session.clear()
return jsonify({"message": "登出成功"})
@main_bp.route("/api/check-login", methods=["GET"])
def check_login():
"""检查登录状态"""
if session.get("user_id"):
user = User.query.get(session.get("user_id"))
return jsonify(
{
"logged_in": True,
"username": session.get("username"),
"role": user.role if user else "user",
}
)
return jsonify({"logged_in": False})
@main_bp.route("/setup")
def setup_page():
"""初始设置页面(首次访问)"""
# 检查是否已有用户
user_count = User.query.count()
if user_count > 0:
return redirect("/login")
return render_template("setup.html")
@main_bp.route("/api/setup", methods=["POST"])
def api_setup():
"""初始设置API"""
data = request.get_json()
username = data.get("username", "").strip()
password = data.get("password", "")
confirm_password = data.get("confirm_password", "")
# 检查是否已有用户
if User.query.count() > 0:
return jsonify({"error": "系统已设置,请登录"}), 400
if not username or not password:
return jsonify({"error": "请输入用户名和密码"}), 400
if password != confirm_password:
return jsonify({"error": "两次密码输入不一致"}), 400
try:
user = User(username=username, role="admin")
user.set_password(password)
db.session.add(user)
db.session.commit()
session["user_id"] = user.id
session["username"] = user.username
return jsonify({"message": "设置成功"})
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
db.session.rollback()
return jsonify({"error": "设置失败: " + str(e)}), 500
# 登录_required装饰器
def login_required(f):
"""登录验证装饰器"""
from functools import wraps
@wraps(f)
def decorated(*args, **kwargs):
if not session.get("user_id"):
return redirect("/login")
return f(*args, **kwargs)
return decorated
def admin_required(f):
"""管理员权限装饰器"""
from functools import wraps
@wraps(f)
def decorated(*args, **kwargs):
if not session.get("user_id"):
return jsonify({"error": "请先登录"}), 401
user = User.query.get(session.get("user_id"))
if not user or user.role != "admin":
return jsonify({"error": "权限不足"}), 403
return f(*args, **kwargs)
return decorated
def login_required_json(f):
"""登录验证装饰器(返回JSON"""
from functools import wraps
@wraps(f)
def decorated(*args, **kwargs):
if not session.get("user_id"):
return jsonify({"error": "请先登录"}), 401
return f(*args, **kwargs)
return decorated
def get_current_user():
"""获取当前登录用户"""
user_id = session.get("user_id")
if user_id:
return User.query.get(user_id)
return None
def is_admin():
"""判断当前用户是否是管理员"""
user = get_current_user()
return user and user.role == "admin"
def check_login():
"""检查登录状态(返回布尔值)"""
return session.get("user_id") is not None
+313
View File
@@ -0,0 +1,313 @@
# 备份与操作日志管理
import os
import json
import shutil
from datetime import datetime
from flask import request, jsonify, current_app, session
from app.routes import main_bp
from app.models import db, Student, Class, User, PROBLEM_LIST
from app.routes.auth import login_required_json, admin_required
import logging
logger = logging.getLogger(__name__)
# 备份目录
BACKUP_DIR = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "bk"
)
LOGS_DIR = os.path.join(BACKUP_DIR, "logs")
PROBLEMS_DIR = os.path.join(BACKUP_DIR, "problems")
def ensure_dirs():
"""确保备份目录存在"""
os.makedirs(LOGS_DIR, exist_ok=True)
os.makedirs(PROBLEMS_DIR, exist_ok=True)
def log_operation(action, operator, target_type, target_id, detail=""):
"""记录操作日志"""
ensure_dirs()
log_entry = {
"timestamp": datetime.now().isoformat(),
"action": action,
"operator": operator,
"target_type": target_type,
"target_id": target_id,
"detail": detail,
"ip": request.remote_addr if request else None,
}
log_file = os.path.join(LOGS_DIR, f"{datetime.now().strftime('%Y-%m-%d')}.jsonl")
with open(log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
return log_entry
def get_backup_list():
"""获取备份列表"""
ensure_dirs()
backups = []
if os.path.exists(BACKUP_DIR):
for name in os.listdir(BACKUP_DIR):
path = os.path.join(BACKUP_DIR, name)
if os.path.isdir(path) and name.startswith("backup_"):
meta_file = os.path.join(path, "meta.json")
meta = {}
if os.path.exists(meta_file):
with open(meta_file, encoding="utf-8") as f:
meta = json.load(f)
backups.append(
{
"id": name,
"created_at": meta.get("created_at", ""),
"operator": meta.get("operator", ""),
"description": meta.get("description", ""),
"includes": meta.get("includes", []),
}
)
return sorted(backups, key=lambda x: x["created_at"], reverse=True)
@main_bp.route("/api/backup", methods=["POST"])
@login_required_json
@admin_required
def create_backup():
"""创建数据备份"""
ensure_dirs()
data = request.get_json() or {}
description = data.get("description", "")
# 创建备份目录
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_id = f"backup_{timestamp}"
backup_path = os.path.join(BACKUP_DIR, backup_id)
os.makedirs(backup_path, exist_ok=True)
operator = session.get("username", "unknown")
# 备份各类数据
includes = []
# 1. 备份班级
classes = Class.query.all()
classes_data = [c.to_dict() for c in classes]
with open(os.path.join(backup_path, "classes.json"), "w", encoding="utf-8") as f:
json.dump(classes_data, f, ensure_ascii=False, indent=2)
includes.append("classes")
# 2. 备份学员
students = Student.query.all()
students_data = []
for s in students:
students_data.append(
{
"id": s.id,
"name": s.name,
"phone": s.phone,
"wechat_nickname": s.wechat_nickname,
"practice_time": s.practice_time,
"class_id": s.class_id,
"notes": s.notes,
"created_at": s.created_at.isoformat() if s.created_at else None,
}
)
with open(os.path.join(backup_path, "students.json"), "w", encoding="utf-8") as f:
json.dump(students_data, f, ensure_ascii=False, indent=2)
includes.append("students")
# 3. 备份问题配置
problems_backup_dir = os.path.join(backup_path, "problems")
os.makedirs(problems_backup_dir, exist_ok=True)
# 读取问题文件
problems_dir = current_app.config.get("PROBLEMS_DIR")
if problems_dir and os.path.exists(problems_dir):
for filename in os.listdir(problems_dir):
if filename.endswith(".md"):
src = os.path.join(problems_dir, filename)
dest = os.path.join(problems_backup_dir, filename)
shutil.copy2(src, dest)
includes.append("problems")
# 4. 备份配置文件
config_path = os.path.join(
os.path.dirname(os.path.dirname(__file__)), "config", "api_config.json"
)
if os.path.exists(config_path):
shutil.copy2(config_path, os.path.join(backup_path, "api_config.json"))
includes.append("config")
# 写入元数据
meta = {
"backup_id": backup_id,
"created_at": datetime.now().isoformat(),
"operator": operator,
"description": description,
"includes": includes,
}
with open(os.path.join(backup_path, "meta.json"), "w", encoding="utf-8") as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
# 记录操作日志
log_operation(
"backup_create", operator, "system", backup_id, f"创建备份: {description}"
)
return jsonify(
{"message": "备份成功", "backup_id": backup_id, "includes": includes}
)
@main_bp.route("/api/backup", methods=["GET"])
@login_required_json
def list_backups():
"""获取备份列表"""
backups = get_backup_list()
return jsonify(backups)
@main_bp.route("/api/backup/<backup_id>", methods=["GET"])
@login_required_json
def get_backup(backup_id):
"""查看备份内容"""
backup_path = os.path.join(BACKUP_DIR, backup_id)
if not os.path.exists(backup_path):
return jsonify({"error": "备份不存在"}), 404
# 读取元数据
meta_file = os.path.join(backup_path, "meta.json")
meta = {}
if os.path.exists(meta_file):
with open(meta_file, encoding="utf-8") as f:
meta = json.load(f)
# 读取各类数据
data = {"meta": meta}
for fname in ["classes.json", "students.json", "api_config.json"]:
fpath = os.path.join(backup_path, fname)
if os.path.exists(fpath):
with open(fpath, encoding="utf-8") as f:
data[fname.replace(".json", "")] = json.load(f)
# 问题文件列表
problems_dir = os.path.join(backup_path, "problems")
if os.path.exists(problems_dir):
data["problems"] = os.listdir(problems_dir)
return jsonify(data)
@main_bp.route("/api/backup/<backup_id>/restore", methods=["POST"])
@login_required_json
@admin_required
def restore_backup(backup_id):
"""恢复备份(危险操作)"""
data = request.get_json() or {}
confirm_text = data.get("confirm", "")
# 安全确认
if confirm_text != "确认恢复":
return jsonify({"error": "请输入 '确认恢复' 以确认此操作"}), 400
backup_path = os.path.join(BACKUP_DIR, backup_id)
if not os.path.exists(backup_path):
return jsonify({"error": "备份不存在"}), 404
operator = session.get("username", "unknown")
# 先创建当前数据的自动备份
auto_backup_id = f"auto_before_restore_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
auto_backup_path = os.path.join(BACKUP_DIR, auto_backup_id)
shutil.copytree(backup_path, auto_backup_path)
# 恢复各类数据
restored = []
# 1. 恢复班级
classes_file = os.path.join(backup_path, "classes.json")
if os.path.exists(classes_file):
with open(classes_file, encoding="utf-8") as f:
classes_data = json.load(f)
# 清空并重建班级
Class.query.delete()
for c in classes_data:
cls = Class(
id=c.get("id"),
name=c.get("name"),
description=c.get("description"),
active=c.get("active", True),
)
db.session.add(cls)
db.session.commit()
restored.append("classes")
# 2. 恢复学员
students_file = os.path.join(backup_path, "students.json")
if os.path.exists(students_file):
with open(students_file, encoding="utf-8") as f:
students_data = json.load(f)
# 清空并重建学员
Student.query.delete()
for s in students_data:
student = Student(
id=s.get("id"),
name=s.get("name"),
phone=s.get("phone"),
wechat_nickname=s.get("wechat_nickname"),
practice_time=s.get("practice_time", "30分钟"),
class_id=s.get("class_id"),
notes=s.get("notes"),
)
db.session.add(student)
db.session.commit()
restored.append("students")
# 3. 恢复问题配置
problems_src = os.path.join(backup_path, "problems")
if os.path.exists(problems_src):
# 获取目标问题目录
problem_dir = current_app.config.get("PROBLEMS_DIR")
if problem_dir and os.path.exists(problem_dir):
# 清空目标目录
for f in os.listdir(problem_dir):
if f.endswith(".md"):
os.remove(os.path.join(problem_dir, f))
# 复制备份的问题文件
for f in os.listdir(problems_src):
if f.endswith(".md"):
shutil.copy2(os.path.join(problems_src, f), problem_dir)
restored.append("problems")
# 记录操作日志
log_operation(
"backup_restore",
operator,
"system",
backup_id,
f"恢复备份: {backup_id}, 已自动备份当前数据到: {auto_backup_id}",
)
return jsonify(
{"message": "恢复成功", "restored": restored, "auto_backup": auto_backup_id}
)
@main_bp.route("/api/logs", methods=["GET"])
@login_required_json
@admin_required
def get_logs():
"""获取操作日志"""
date = request.args.get("date", datetime.now().strftime("%Y-%m-%d"))
log_file = os.path.join(LOGS_DIR, f"{date}.jsonl")
if not os.path.exists(log_file):
return jsonify([])
logs = []
with open(log_file, encoding="utf-8") as f:
for line in f:
if line.strip():
logs.append(json.loads(line))
return jsonify(logs)
+151
View File
@@ -0,0 +1,151 @@
# 班级管理路由
from flask import request, jsonify, render_template, session
from app.routes import main_bp
from app.models import db, Class, Student
from app.routes.auth import login_required_json, admin_required
import logging
logger = logging.getLogger(__name__)
@main_bp.route("/classes")
@login_required_json
def classes_page():
"""班级管理页面"""
return render_template("classes.html")
@main_bp.route("/api/classes", methods=["GET"])
@login_required_json
def api_classes_list():
"""班级列表"""
# 支持筛选参数: active=true, active=false, 不传则返回全部
active_filter = request.args.get("active")
query = Class.query
if active_filter is not None:
if active_filter.lower() == "true":
query = query.filter(Class.active == True)
elif active_filter.lower() == "false":
query = query.filter(Class.active == False)
classes = query.order_by(Class.created_at.desc()).all()
return jsonify([c.to_dict() for c in classes])
@main_bp.route("/api/classes", methods=["POST"])
@admin_required
def api_classes_create():
"""新增班级"""
data = request.get_json()
name = data.get("name", "").strip()
description = data.get("description", "")
active = data.get("active", True) # 默认进行中
if not name:
return jsonify({"error": "请输入班级名称", "code": "VALIDATION_ERROR"}), 400
if Class.query.filter_by(name=name).first():
return jsonify({"error": "班级名称已存在", "code": "DUPLICATE_NAME"}), 400
try:
cls = Class(name=name, description=description, active=active)
db.session.add(cls)
db.session.commit()
return jsonify(cls.to_dict())
except Exception as e:
db.session.rollback()
logger.error(f"创建班级失败: {str(e)}")
return jsonify({"error": "操作失败,请稍后重试", "code": "SERVER_ERROR"}), 500
@main_bp.route("/api/classes/<int:class_id>", methods=["PUT"])
@admin_required
def api_classes_update(class_id):
"""编辑班级"""
cls = Class.query.get_or_404(class_id)
data = request.get_json()
if "name" in data and data["name"]:
# 检查名称是否与其他班级冲突
existing = Class.query.filter_by(name=data["name"]).first()
if existing and existing.id != class_id:
return jsonify({"error": "班级名称已存在", "code": "DUPLICATE_NAME"}), 400
cls.name = data["name"]
if "description" in data:
cls.description = data["description"]
if "active" in data:
cls.active = data["active"]
try:
db.session.commit()
return jsonify(cls.to_dict())
except Exception as e:
db.session.rollback()
logger.error(f"更新班级失败: {str(e)}")
return jsonify({"error": "操作失败,请稍后重试", "code": "SERVER_ERROR"}), 500
@main_bp.route("/api/classes/<int:class_id>", methods=["DELETE"])
@admin_required
def api_classes_delete(class_id):
"""删除班级(不删除学员,仅解除关联)"""
cls = Class.query.get_or_404(class_id)
try:
# 解除学员与班级的关联
Student.query.filter_by(class_id=class_id).update({"class_id": None})
db.session.delete(cls)
db.session.commit()
return jsonify({"message": "删除成功"})
except Exception as e:
db.session.rollback()
logger.error(f"删除班级失败: {str(e)}")
return jsonify({"error": "操作失败,请稍后重试", "code": "SERVER_ERROR"}), 500
@main_bp.route("/api/classes/<int:class_id>/students", methods=["GET"])
@login_required_json
def api_classes_students(class_id):
"""班级学员列表"""
cls = Class.query.get_or_404(class_id)
# 直接查询而非使用relationship
students = Student.query.filter_by(class_id=class_id).all()
return jsonify(
[
{
"id": s.id,
"name": s.name,
"phone": s.phone,
"practice_time": s.practice_time,
}
for s in students
]
)
@main_bp.route("/api/classes/<int:class_id>/assign", methods=["POST"])
@login_required_json
def api_classes_assign(class_id):
"""分配学员到班级"""
cls = Class.query.get_or_404(class_id)
data = request.get_json()
student_ids = data.get("student_ids", [])
try:
# 1. 先清除这个班级的所有学员关联
Student.query.filter_by(class_id=class_id).update({"class_id": None})
# 2. 再分配选中的学员到这个班级
if student_ids:
Student.query.filter(Student.id.in_(student_ids)).update(
{"class_id": class_id}
)
db.session.commit()
return jsonify({"message": "分配成功"})
except Exception as e:
db.session.rollback()
logger.error(f"分配学员失败: {str(e)}")
return jsonify({"error": "操作失败,请稍后重试", "code": "SERVER_ERROR"}), 500
+502
View File
@@ -0,0 +1,502 @@
# 方案生成路由
import json
import os
from flask import (
request,
jsonify,
render_template,
send_file,
current_app,
Response,
stream_with_context,
session,
)
from app.routes import main_bp
from app.models import db, Student, PracticePlan
from app.services.plan_generator import generate_practice_plan, generate_ai_report
from app.services.pdf_generator import generate_pdf
from app.routes.auth import login_required_json
def sse_format(data):
"""格式化SSE数据"""
return f"data: {json.dumps(data)}\n\n"
@main_bp.route("/api/students/<int:student_id>/plans", methods=["GET"])
@login_required_json
def get_student_plans(student_id):
"""获取学员的练习方案列表"""
student = Student.query.get_or_404(student_id)
plans = student.plans.order_by(PracticePlan.created_at.desc()).all()
return jsonify([p.to_dict() for p in plans])
@main_bp.route("/api/generate-plan", methods=["POST"])
@login_required_json
def generate_plan():
"""生成练习方案 - 使用SSE实时推送进度"""
data = request.get_json()
student_id = data.get("student_id")
use_ai = data.get("use_ai", True)
template_id = data.get("template_id") # AI提示词模板ID
student = Student.query.get_or_404(student_id)
problems = student.problems.all()
if not problems:
return jsonify({"error": "请先记录学员的问题"}), 400
# 预先收集所有数据,避免在generator中访问数据库
problems_dir = current_app.config["PROBLEMS_DIR"]
# 学员的统一练习时间
practice_time = student.practice_time or "30-60分钟"
problem_data = []
for p in problems:
# problem_id 已经是完整标识(如 "01_手小"),直接用作文件名
problem_file = os.path.join(problems_dir, f"{p.problem_id}.md")
content = ""
if os.path.exists(problem_file):
with open(problem_file, "r", encoding="utf-8") as f:
content = f.read()
problem_data.append(
{
"problem_id": p.problem_id,
"problem_name": p.problem_name,
"severity": p.severity,
"level": p.level,
"content": content,
}
)
time_mapping = {
"15分钟": {"total": 15, "basic": 10, "tech": 2, "piece": 3},
"30分钟": {"total": 30, "basic": 15, "tech": 5, "piece": 10},
"45分钟": {"total": 45, "basic": 15, "tech": 10, "piece": 20},
"60分钟": {"total": 60, "basic": 20, "tech": 15, "piece": 25},
"90分钟": {"total": 90, "basic": 25, "tech": 25, "piece": 40},
"120分钟": {"total": 120, "basic": 30, "tech": 35, "piece": 55},
"150分钟以上": {"total": 150, "basic": 35, "tech": 45, "piece": 70},
}
time_config = time_mapping.get(practice_time, time_mapping["30分钟"])
# 加载API配置
from app.config import load_api_config
api_config = load_api_config(current_app.config)
@stream_with_context
def generate():
# Step 1: 收集问题数据
yield sse_format(
{"step": "collecting", "message": "正在收集问题数据...", "progress": 10}
)
# Step 2: 生成基础方案
yield sse_format(
{"step": "basic", "message": "正在生成基础练习方案...", "progress": 30}
)
plan_content = generate_practice_plan(
student_name=student.name,
problems=problem_data,
problems_dir=problems_dir,
practice_time=practice_time,
)
yield sse_format(
{"step": "basic_done", "message": "基础方案生成完成", "progress": 50}
)
# Step 3: 如果启用AI,生成AI报告
ai_report = None
if use_ai:
# 显示AI配置信息
yield sse_format(
{
"step": "ai_config",
"message": f"AI配置: {api_config.get('model', 'N/A')} @ {api_config.get('provider', 'N/A')}",
"progress": 55,
"detail": f"Endpoint: {api_config.get('base_url', '')}",
}
)
yield sse_format(
{
"step": "ai_start",
"message": "正在调用AI生成个性化报告...",
"progress": 60,
}
)
# 构建问题摘要用于显示
problems_summary = []
for p in problem_data[:3]: # 只显示前3个问题
level = p.get("level", "入门")
severity = p.get("severity", "中等")
problems_summary.append(f"- {p['problem_name']} [{level} | {severity}]")
yield sse_format(
{
"step": "ai_problems",
"message": "发送问题给AI:",
"progress": 62,
"detail": "\n".join(problems_summary),
}
)
yield sse_format(
{"step": "ai_request", "message": "等待AI响应...", "progress": 65}
)
# 先用dry_run模式获取提示词并显示给用户
prompt, _, error = generate_ai_report(
student_name=student.name,
wechat_nickname=student.wechat_nickname or "",
problems=problem_data,
practice_time=practice_time,
time_config=time_config,
template_id=template_id,
dry_run=True
)
# 发送提示词给前端显示
yield sse_format({
"step": "ai_prompt",
"message": "发送给AI的提示词:",
"progress": 60,
"detail": prompt if prompt else ""
})
if error:
yield sse_format(
{
"step": "ai_error",
"message": f"获取提示词失败: {error}",
"progress": 80,
"error": error,
}
)
return
yield sse_format(
{"step": "ai_generating", "message": "正在生成AI报告...", "progress": 70}
)
# 真正调用API生成报告
_, ai_report, error = generate_ai_report(
student_name=student.name,
wechat_nickname=student.wechat_nickname or "",
problems=problem_data,
practice_time=practice_time,
time_config=time_config,
template_id=template_id,
dry_run=False
)
if error:
yield sse_format(
{
"step": "ai_error",
"message": f"AI生成失败: {error}",
"progress": 80,
"error": error,
}
)
plan_content["ai_report_error"] = error
else:
# 显示AI返回的报告长度
report_lines = len(ai_report.split("\n")) if ai_report else 0
yield sse_format(
{
"step": "ai_response",
"message": f"AI报告已生成",
"progress": 75,
"detail": f"报告长度: {len(ai_report) if ai_report else 0} 字符, {report_lines}",
}
)
yield sse_format(
{
"step": "ai_done",
"message": "AI报告生成完成",
"progress": 80,
"has_ai": True,
}
)
plan_content["ai_report"] = ai_report
else:
yield sse_format(
{"step": "ai_skip", "message": "跳过AI生成", "progress": 80}
)
# Step 4: 保存方案
yield sse_format(
{"step": "saving", "message": "正在保存方案...", "progress": 90}
)
plan = None
try:
plan = PracticePlan(
student_id=student_id,
content=json.dumps(plan_content, ensure_ascii=False),
)
db.session.add(plan)
db.session.commit()
yield sse_format(
{
"step": "saved",
"message": f"方案已保存 (ID: {plan.id})",
"progress": 95,
}
)
except Exception as e:
db.session.rollback()
yield sse_format(
{
"step": "save_error",
"message": f"保存失败: {str(e)}",
"progress": 90,
"error": str(e),
}
)
# 确保 complete 消息被发送
try:
yield sse_format(
{
"step": "complete",
"message": "方案生成完成!",
"progress": 100,
"plan_id": plan.id if plan and hasattr(plan, 'id') else None,
"content": plan_content,
"ai_report": ai_report,
}
)
except Exception as e:
# 最后一道防护 - 即使出错也要发送 complete
yield sse_format(
{
"step": "complete",
"message": f"方案生成完成!(部分错误: {str(e)[:50]}",
"progress": 100,
"plan_id": None,
"error": str(e),
}
)
return Response(
generate(),
mimetype="text/event-stream",
headers={
"X-Accel-Buffering": "no",
"Cache-Control": "no-cache",
}
)
@main_bp.route("/api/plans/<int:plan_id>", methods=["GET"])
@login_required_json
def get_plan(plan_id):
"""获取单个方案详情"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
return jsonify(
{
"id": plan.id,
"student_id": plan.student_id,
"student_name": plan.student.name if plan.student else "",
"created_at": plan.created_at.strftime("%Y-%m-%d %H:%M"),
"content": content,
}
)
@main_bp.route("/api/plans/<int:plan_id>/pdf", methods=["GET"])
@login_required_json
def export_pdf(plan_id):
"""导出PDF - 使用模板"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
student_name = plan.student.name if plan.student else "未知学员"
# 尝试使用数据库中的报告模板
report_template = None
try:
from app.models import Template
tmpl = Template.query.filter_by(type="report").first()
if tmpl:
report_template = tmpl.content
except:
pass
# 如果有模板,先渲染
rendered_report = None
if report_template:
rendered_report = report_template
rendered_report = rendered_report.replace("{student_name}", student_name)
rendered_report = rendered_report.replace("{practice_time}", content.get('practice_time', 'N/A'))
rendered_report = rendered_report.replace("{total_minutes}", str(content.get('total_daily_minutes', 0)))
rendered_report = rendered_report.replace("{generated_at}", content.get('generated_at', ''))
if content.get('ai_report'):
rendered_report = rendered_report.replace("{ai_report}", content['ai_report'])
else:
rendered_report = rendered_report.replace("{ai_report}", "(未生成AI报告)")
problem_tags = ""
for problem in content.get('problems', []):
problem_tags += f"- **{problem.get('name', '')}** ({problem.get('severity', '')})\n"
rendered_report = rendered_report.replace("{problem_tags}", problem_tags or "(无)")
schedule_table = "| 阶段 | 时长 | 内容 | 目的 |\n|------|------|------|------|\n"
for item in content.get('daily_schedule', []):
schedule_table += f"| {item.get('phase', '')} | {item.get('duration', '')} | {item.get('content', '')} | {item.get('purpose', '')} |\n"
rendered_report = rendered_report.replace("{schedule_table}", schedule_table)
pdf_path = generate_pdf(
plan_id=plan_id,
student_name=student_name,
content=content,
output_dir=current_app.config["PDF_OUTPUT_DIR"],
rendered_report=rendered_report, # 传递渲染后的报告
)
return send_file(
pdf_path, as_attachment=True, download_name=f"{student_name}_练习方案.pdf"
)
@main_bp.route("/api/plans/<int:plan_id>/md", methods=["GET"])
@login_required_json
def export_md(plan_id):
"""导出Markdown"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
student_name = plan.student.name if plan.student else "未知学员"
template_id = request.args.get('template_id', type=int)
# 获取报告模板
report_template = None
try:
from app.models import Template
if template_id:
tmpl = Template.query.get(template_id)
if tmpl and tmpl.type == "report":
report_template = tmpl.content
else:
tmpl = Template.query.filter_by(type="report").order_by(Template.sort_order.asc()).first()
if tmpl:
report_template = tmpl.content
except:
pass
if report_template:
# 使用模板渲染
# 替换变量
rendered = report_template
rendered = rendered.replace("{student_name}", student_name)
rendered = rendered.replace("{practice_time}", content.get('practice_time', 'N/A'))
rendered = rendered.replace("{total_minutes}", str(content.get('total_daily_minutes', 0)))
rendered = rendered.replace("{generated_at}", content.get('generated_at', ''))
# AI报告
if content.get('ai_report'):
rendered = rendered.replace("{ai_report}", content['ai_report'])
else:
rendered = rendered.replace("{ai_report}", "(未生成AI报告)")
# 问题诊断标签
problem_tags = ""
for problem in content.get('problems', []):
problem_tags += f"- **{problem.get('name', '')}** ({problem.get('severity', '')})\n"
rendered = rendered.replace("{problem_tags}", problem_tags or "(无)")
# 每日计划表格
schedule_table = "| 阶段 | 时长 | 内容 | 目的 |\n|------|------|------|------|\n"
for item in content.get('daily_schedule', []):
schedule_table += f"| {item.get('phase', '')} | {item.get('duration', '')} | {item.get('content', '')} | {item.get('purpose', '')} |\n"
rendered = rendered.replace("{schedule_table}", schedule_table)
md_content = rendered
else:
# 兜底:生成完整内容
md_lines = [
f"# {student_name} - 个性化练习方案\n",
f"**练习时间**: {content.get('practice_time', 'N/A')} (共{content.get('total_daily_minutes', 0)}分钟)\n",
f"**生成时间**: {content.get('generated_at', '')}\n",
"\n---\n",
]
if content.get('ai_report'):
md_lines.append("## 📝 AI个性化报告\n")
md_lines.append(content['ai_report'])
md_lines.append("\n---\n")
md_lines.append("## 🔍 问题诊断\n")
for problem in content.get('problems', []):
md_lines.append(f"- **{problem.get('name', '')}** ({problem.get('severity', '')})\n")
md_lines.append("\n")
if content.get('problem_details'):
md_lines.append("## 📚 问题详解\n")
for detail in content.get('problem_details', []):
md_lines.append(f"### {detail.get('name', '')} ({detail.get('severity', '')})\n")
md_lines.append(f"{detail.get('content', '')}\n\n")
md_lines.append("## 📅 每日练习计划\n")
md_lines.append("| 阶段 | 时长 | 内容 | 目的 |\n|------|------|------|------|\n")
for item in content.get('daily_schedule', []):
md_lines.append(f"| {item.get('phase', '')} | {item.get('duration', '')} | {item.get('content', '')} | {item.get('purpose', '')} |\n")
md_content = ''.join(md_lines)
from urllib.parse import quote
filename = quote(f"{student_name}_练习方案.md")
return Response(
md_content,
mimetype='text/markdown; charset=utf-8',
headers={
'Content-Disposition': f"attachment; filename*=UTF-8''{filename}"
}
)
@main_bp.route("/plans/<int:plan_id>/wechat", methods=["GET"])
@login_required_json
def wechat_card(plan_id):
"""微信卡片展示页"""
plan = PracticePlan.query.get_or_404(plan_id)
content = json.loads(plan.content)
return render_template(
"wechat_card.html",
plan_id=plan_id,
student_name=plan.student.name if plan.student else "",
content=content,
)
@main_bp.route("/api/plans/<int:plan_id>", methods=["DELETE"])
@login_required_json
def delete_plan(plan_id):
"""删除方案"""
plan = PracticePlan.query.get_or_404(plan_id)
db.session.delete(plan)
db.session.commit()
return jsonify({"message": "删除成功"})
@main_bp.route("/api/plans/<int:plan_id>/content", methods=["PUT"])
@login_required_json
def update_plan_content(plan_id):
"""更新方案内容(用于编辑)"""
plan = PracticePlan.query.get_or_404(plan_id)
data = request.get_json()
# 更新content字段
if "content" in data:
plan.content = data["content"]
db.session.commit()
return jsonify({"message": "保存成功"})
+82
View File
@@ -0,0 +1,82 @@
# 问题记录路由
from flask import request, jsonify
from app.routes import main_bp
from app.models import db, Student, StudentProblem
from app.routes.auth import login_required_json
@main_bp.route("/api/students/<int:student_id>/problems", methods=["GET"])
@login_required_json
def get_student_problems(student_id):
"""获取学员的问题列表"""
student = Student.query.get_or_404(student_id)
problems = student.problems.all()
return jsonify([p.to_dict() for p in problems])
@main_bp.route("/api/students/<int:student_id>/problems", methods=["POST"])
@login_required_json
def add_student_problem(student_id):
"""为学员添加问题(同步模式:先删再加)"""
student = Student.query.get_or_404(student_id)
data = request.get_json()
problems = data.get("problems", [])
submitted_ids = set()
# 删除旧的问题(如果勾选被取消)
existing = StudentProblem.query.filter_by(student_id=student_id).all()
for e in existing:
if e.problem_id not in [p.get("problem_id") for p in problems]:
db.session.delete(e)
# 添加或更新问题
for p in problems:
problem_id = p.get("problem_id")
submitted_ids.add(problem_id)
# 检查是否已存在
existing = StudentProblem.query.filter_by(
student_id=student_id, problem_id=problem_id
).first()
if existing:
# 更新现有记录
existing.severity = p.get("severity")
existing.level = p.get("level")
else:
# 添加新记录
problem = StudentProblem(
student_id=student_id,
problem_id=problem_id,
problem_name=p.get("problem_name"),
severity=p.get("severity"),
level=p.get("level"),
)
db.session.add(problem)
db.session.commit()
return jsonify({"message": "问题记录成功"})
@main_bp.route("/api/students/<int:student_id>/problems", methods=["DELETE"])
@login_required_json
def clear_student_problems(student_id):
"""清除学员的问题记录"""
StudentProblem.query.filter_by(student_id=student_id).delete()
db.session.commit()
return jsonify({"message": "清除成功"})
@main_bp.route(
"/api/students/<int:student_id>/problems/<problem_id>", methods=["DELETE"]
)
@login_required_json
def delete_single_problem(student_id, problem_id):
"""删除学员的单个问题"""
StudentProblem.query.filter_by(
student_id=student_id, problem_id=problem_id
).delete()
db.session.commit()
return jsonify({"message": "删除成功"})
+368
View File
@@ -0,0 +1,368 @@
# 问题配置路由 - 完整CRUD
import os
import shutil
from datetime import datetime
from flask import request, jsonify, render_template, current_app, session, redirect
from app.routes import main_bp
from app.config import load_api_config, save_api_config
from app.routes.auth import login_required_json, admin_required
@main_bp.route("/settings")
@login_required_json
def settings():
"""问题配置页面 - 所有登录用户可访问"""
return render_template("settings.html")
@main_bp.route("/api-settings")
@admin_required
def api_settings_page():
"""API设置页面 - 仅管理员"""
return render_template("api_settings.html")
# ==================== API配置接口 ====================
@main_bp.route("/api/config", methods=["GET"])
@admin_required
def get_api_config():
"""获取API配置"""
config = load_api_config(current_app.config)
# 不返回完整的api_key,只返回前5位和后5位
if config.get("api_key"):
api_key = config["api_key"]
if len(api_key) > 10:
config["api_key_preview"] = api_key[:5] + "..." + api_key[-5:]
else:
config["api_key_preview"] = "****"
return jsonify(config)
@main_bp.route("/api/config", methods=["POST"])
@admin_required
def update_api_config():
"""更新API配置"""
data = request.get_json()
# 验证必填字段
if not data.get("api_key"):
return jsonify({"error": "API Key不能为空"}), 400
# 根据provider设置默认endpoint
provider = data.get("provider", "volcengine")
default_endpoints = {
"minimax": "https://api.minimaxi.com/anthropic/v1",
"volcengine": "https://ark.cn-beijing.volces.com/api/coding/v3",
"deepseek": "https://api.deepseek.com/v1",
"openrouter": "https://openrouter.ai/api/v1",
}
default_endpoint = default_endpoints.get(provider, "https://ark.cn-beijing.volces.com/api/coding/v3")
# 保存配置
config = {
"provider": provider,
"api_key": data.get("api_key", ""),
"base_url": data.get("base_url", default_endpoint),
"model": data.get("model", "doubao-seed-2.0-pro"),
"temperature": float(data.get("temperature", 0.7)),
"prompt_template": data.get("prompt_template", ""),
}
save_api_config(config, current_app.config)
return jsonify({"message": "配置保存成功"})
@main_bp.route("/api/problems", methods=["GET"])
@login_required_json
def get_problems():
"""获取问题列表(从文件夹动态读取)"""
problems_dir = current_app.config.get("PROBLEMS_DIR")
problems = []
if problems_dir and os.path.exists(problems_dir):
for filename in sorted(os.listdir(problems_dir)):
if (
filename.endswith(".md")
and not filename.startswith("模板")
and not filename.startswith("针对性练习建议")
):
name = filename.replace(".md", "")
if "汇总" in name:
continue
parts = name.split("_", 1)
if len(parts) == 2:
problem_id = parts[0]
problem_name = parts[1]
category = "技术类"
filepath = os.path.join(problems_dir, filename)
try:
with open(filepath, "r", encoding="utf-8") as f:
content = f.read()
if "认知类" in content:
category = "认知类"
elif "节奏类" in content:
category = "节奏类"
elif "表现类" in content:
category = "表现类"
elif "习惯类" in content:
category = "习惯类"
elif "综合类" in content:
category = "综合类"
except:
pass
try:
mtime = os.path.getmtime(filepath)
except:
mtime = 0
problems.append(
{
"id": problem_id,
"name": problem_name,
"category": category,
"file": filename,
"mtime": mtime,
}
)
return jsonify(sorted(problems, key=lambda x: x["id"]))
@main_bp.route("/api/problems/<problem_id>", methods=["GET"])
@login_required_json
def get_problem_detail(problem_id):
"""获取单个问题的详细信息"""
problems_dir = current_app.config.get("PROBLEMS_DIR")
if problems_dir and os.path.exists(problems_dir):
for filename in os.listdir(problems_dir):
if filename.startswith(f"{problem_id}_") and filename.endswith(".md"):
filepath = os.path.join(problems_dir, filename)
with open(filepath, "r", encoding="utf-8") as f:
content = f.read()
return jsonify(
{"id": problem_id, "filename": filename, "content": content}
)
return jsonify({"error": "问题不存在"}), 404
@main_bp.route("/api/problems", methods=["POST"])
@admin_required
def create_problem():
"""创建新问题 - 仅管理员"""
"""创建新问题"""
data = request.get_json()
problem_id = data.get("id", "").strip()
problem_name = data.get("name", "").strip()
category = data.get("category", "技术类")
if not problem_id or not problem_name:
return jsonify({"error": "ID和名称不能为空"}), 400
# 格式化ID
problem_id = problem_id.zfill(2)
problems_dir = current_app.config.get("PROBLEMS_DIR")
filename = f"{problem_id}_{problem_name}.md"
filepath = os.path.join(problems_dir, filename)
if os.path.exists(filepath):
return jsonify({"error": "问题已存在"}), 400
# 生成默认内容
content = f"""# {problem_name}
> 所属系列:钢琴学习常见问题针对性练习建议
> 配合目标体系使用,针对性补齐短板
---
## 问题表现
- 请描述具体表现症状
## 原因分析
- 可能的原因1
- 可能的原因2
## 针对性练习方案
### 日常基础练习
| 练习名称 | 时长 | 频率 | 目的 |
|---------|------|------|------|
| 练习1 | 10分钟 | 每天 | 目的1 |
| 练习2 | 5分钟 | 每天 | 目的2 |
### 具体操作
```
练习1:练习名称
- 步骤1
- 步骤2
- 步骤3
```
## 练习提醒
### ⚠️ 禁忌
- 禁忌1
- 禁忌2
### ✓ 正确做法
- 正确做法1
- 正确做法2
## 评估标准
| 等级 | 标准 |
|------|------|
| 入门 | 达到的标准 |
| 进阶 | 达到的标准 |
| 熟练 | 达到的标准 |
| 精通 | 达到的标准 |
---
> **版本**V1.0
> **创建时间**{datetime.now().strftime("%Y-%m-%d")}
> **适用场景**:成人钢琴集体课学员个性化辅导"""
with open(filepath, "w", encoding="utf-8") as f:
f.write(content)
return jsonify({"message": "创建成功", "id": problem_id, "name": problem_name})
@main_bp.route("/api/problems/<problem_id>", methods=["PUT"])
@login_required_json
def update_problem(problem_id):
"""更新问题"""
data = request.get_json()
new_name = data.get("name", "").strip()
new_content = data.get("content", "").strip()
if not new_name:
return jsonify({"error": "名称不能为空"}), 400
problems_dir = current_app.config.get("PROBLEMS_DIR")
# 找到旧文件
old_filename = None
for f in os.listdir(problems_dir):
if f.startswith(f"{problem_id}_") and f.endswith(".md"):
old_filename = f
break
if not old_filename:
return jsonify({"error": "问题不存在"}), 404
old_filepath = os.path.join(problems_dir, old_filename)
new_filename = f"{problem_id}_{new_name}.md"
new_filepath = os.path.join(problems_dir, new_filename)
# 如果名称改变,需要重命名文件
if old_filename != new_filename:
if os.path.exists(new_filepath):
return jsonify({"error": "同名问题已存在"}), 400
os.rename(old_filepath, new_filepath)
filepath = new_filepath
else:
filepath = old_filepath
# 更新内容
with open(filepath, "w", encoding="utf-8") as f:
f.write(new_content)
return jsonify({"message": "更新成功"})
@main_bp.route("/api/problems/<problem_id>", methods=["DELETE"])
@admin_required
def delete_problem(problem_id):
"""删除问题"""
problems_dir = current_app.config.get("PROBLEMS_DIR")
# 找到文件
filename = None
for f in os.listdir(problems_dir):
if f.startswith(f"{problem_id}_") and f.endswith(".md"):
filename = f
break
if not filename:
return jsonify({"error": "问题不存在"}), 404
filepath = os.path.join(problems_dir, filename)
# 移动到备份目录
trash_dir = os.path.join(problems_dir, "bk")
os.makedirs(trash_dir, exist_ok=True)
import time
backup_name = f"{filename.replace('.md', '')}_{int(time.time())}.md"
shutil.move(filepath, os.path.join(trash_dir, backup_name))
return jsonify({"message": "删除成功"})
# ==================== AI测试接口 ====================
@main_bp.route("/api/config/test", methods=["POST"])
@admin_required
def test_api_connection():
"""测试API连接"""
import requests
config = load_api_config(current_app.config)
provider = config.get("provider", "volcengine")
headers = {
"Authorization": f"Bearer {config.get('api_key', '')}",
"Content-Type": "application/json",
}
payload = {
"model": config.get("model", "doubao-seed-2.0-pro"),
"messages": [{"role": "user", "content": "你好"}],
"max_tokens": 50,
}
try:
# MiniMax 使用 Anthropic 格式的 API
if provider == "minimax":
endpoint = f"{config.get('base_url')}/messages"
payload["max_tokens"] = 50
else:
endpoint = f"{config.get('base_url')}/chat/completions"
response = requests.post(
endpoint,
headers=headers,
json=payload,
timeout=30,
)
if response.status_code == 200:
return jsonify({"success": True, "message": "连接成功"})
else:
return jsonify(
{"success": False, "error": f"API返回错误: {response.status_code}"}
)
except Exception as e:
return jsonify({"success": False, "error": str(e)})
+332
View File
@@ -0,0 +1,332 @@
# 学员管理路由
import csv
import io
from flask import (
request,
jsonify,
render_template,
session,
redirect,
url_for,
Response,
)
from app.routes import main_bp
from app.models import (
db,
Student,
PROBLEM_LIST,
SEVERITY_LEVELS,
PRACTICE_TIME_OPTIONS,
User,
Class,
)
from app.routes.auth import login_required_json, check_login
@main_bp.route("/")
def index():
"""首页 - 学员列表"""
if not check_login():
return redirect("/login")
return render_template(
"index.html",
problem_list=PROBLEM_LIST,
severity_levels=SEVERITY_LEVELS,
practice_time_options=PRACTICE_TIME_OPTIONS,
)
@main_bp.route("/api/students", methods=["GET"])
@login_required_json
def get_students():
"""获取学员列表"""
# 支持筛选参数: class_id, name(模糊搜索)
class_id = request.args.get("class_id")
name = request.args.get("name")
query = Student.query
if class_id:
query = query.filter(Student.class_id == int(class_id))
if name:
query = query.filter(Student.name.contains(name))
students = query.order_by(Student.created_at.desc()).all()
return jsonify([s.to_dict() for s in students])
@main_bp.route("/api/students", methods=["POST"])
@login_required_json
def create_student():
"""创建新学员"""
data = request.get_json()
student = Student(
name=data.get("name"),
phone=data.get("phone", ""),
wechat_nickname=data.get("wechat_nickname", ""),
practice_time=data.get("practice_time", "30分钟"),
notes=data.get("notes", ""),
class_id=data.get("class_id"),
)
db.session.add(student)
db.session.commit()
return jsonify(student.to_dict())
@main_bp.route("/api/students/<int:student_id>", methods=["GET"])
@login_required_json
def get_student(student_id):
"""获取学员详情"""
student = Student.query.get_or_404(student_id)
problems = student.problems.all()
plans = student.plans.order_by(db.desc("created_at")).all()
# 获取班级名称
class_name = None
if student.class_id:
cls = Class.query.get(student.class_id)
if cls:
class_name = cls.name
student_dict = student.to_dict()
student_dict["class_name"] = class_name
return jsonify(
{
"student": student_dict,
"problems": [p.to_dict() for p in problems],
"plans": [p.to_dict() for p in plans],
}
)
@main_bp.route("/api/students/<int:student_id>", methods=["PUT"])
@login_required_json
def update_student(student_id):
"""更新学员信息"""
student = Student.query.get_or_404(student_id)
data = request.get_json()
if "name" in data:
student.name = data["name"]
if "phone" in data:
student.phone = data["phone"]
if "wechat_nickname" in data:
student.wechat_nickname = data["wechat_nickname"]
if "practice_time" in data:
student.practice_time = data["practice_time"]
if "notes" in data:
student.notes = data["notes"]
if "class_id" in data:
student.class_id = data["class_id"]
db.session.commit()
return jsonify(student.to_dict())
@main_bp.route("/api/students/<int:student_id>", methods=["DELETE"])
@login_required_json
def delete_student(student_id):
"""删除学员"""
student = Student.query.get_or_404(student_id)
db.session.delete(student)
db.session.commit()
return jsonify({"message": "删除成功"})
@main_bp.route("/api/students/template")
@login_required_json
def download_student_template():
"""下载CSV导入模板"""
headers = ["ID", "姓名", "电话", "微信昵称", "每日练习时间", "班级", "备注"]
# 练习时间选项
practice_times = PRACTICE_TIME_OPTIONS
# 示例数据(ID留空,表示新增)
example = ["", "张三", "13800138000", "昵称", "30分钟", "班级A", "备注信息"]
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(headers)
writer.writerow(example)
# 添加22行空行,让可选值出现在第25-26行
for _ in range(22):
writer.writerow([])
# 获取进行中的班级列表
active_classes = Class.query.filter_by(active=True).all()
class_names = [c.name for c in active_classes] if active_classes else []
# 添加可选练习时间和可选班级说明(第25-26行)
writer.writerow(["可选练习时间:"] + list(practice_times))
if class_names:
writer.writerow(["可选班级:"] + class_names)
else:
writer.writerow(["可选班级:无进行中班级"])
# 使用BytesIO处理编码
csv_content = output.getvalue().encode("utf-8-sig")
return Response(
csv_content,
mimetype="text/csv; charset=utf-8-sig",
headers={"Content-Disposition": "attachment; filename=student_template.csv"},
)
@main_bp.route("/api/students/export")
@login_required_json
def export_students():
"""导出学员CSV"""
# 获取所有学员
students = Student.query.order_by(Student.created_at.desc()).all()
# 班级ID到名称映射
class_map = {c.id: c.name for c in Class.query.all()}
headers = ["ID", "姓名", "电话", "微信昵称", "每日练习时间", "班级", "备注"]
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(headers)
for s in students:
class_name = class_map.get(s.class_id, "") if s.class_id else ""
writer.writerow(
[
s.id,
s.name,
s.phone or "",
s.wechat_nickname or "",
s.practice_time,
class_name,
s.notes or "",
]
)
csv_content = output.getvalue().encode("utf-8-sig")
return Response(
csv_content,
mimetype="text/csv; charset=utf-8-sig",
headers={"Content-Disposition": "attachment; filename=students_export.csv"},
)
@main_bp.route("/api/students/import", methods=["POST"])
@login_required_json
def import_students():
"""导入学员CSV"""
if "file" not in request.files:
return jsonify({"error": "请选择文件"}), 400
file = request.files["file"]
if file.filename == "":
return jsonify({"error": "请选择文件"}), 400
if not file.filename.endswith(".csv"):
return jsonify({"error": "请上传CSV文件"}), 400
try:
raw_content = file.read()
# 尝试多种编码
try:
content = raw_content.decode("utf-8-sig")
except UnicodeDecodeError:
try:
content = raw_content.decode("gbk")
except UnicodeDecodeError:
content = raw_content.decode("utf-8", errors="ignore")
reader = csv.DictReader(io.StringIO(content))
success_count = 0
error_count = 0
errors = []
# 获取班级名称到ID的映射
class_map = {c.name: c.id for c in Class.query.all()}
# 练习时间列表
practice_time_list = PRACTICE_TIME_OPTIONS
for row_num, row in enumerate(reader, start=2):
# 检查是否为空白行(所有字段都为空)
is_empty = all(not (v and v.strip()) for v in row.values())
if is_empty:
# 遇到空行后停止导入(空行后是可选值说明)
break
# 跳过说明行
if row.get("姓名", "").strip() in ["可选练习时间:", "可选班级:"]:
continue
try:
name = row.get("姓名", "").strip()
phone = row.get("电话", "").strip()
wechat_nickname = row.get("微信昵称", "").strip()
practice_time = row.get("每日练习时间", "30分钟").strip()
class_name = row.get("班级", "").strip()
notes = row.get("备注", "").strip()
if not name:
continue
# 验证并转换练习时间
if practice_time not in practice_time_list:
practice_time = "30分钟"
# 查找班级ID
class_id = None
if class_name:
class_id = class_map.get(class_name)
# 优先用ID查找,否则用姓名+电话
existing = None
if row.get("ID", "").strip():
try:
student_id = int(row.get("ID", "").strip())
existing = Student.query.get(student_id)
except (ValueError, TypeError):
pass
# 如果没找到,尝试用姓名+电话
if not existing:
existing = Student.query.filter_by(
name=name, phone=phone if phone else None
).first()
if existing:
# 更新已存在的学员(所有字段都允许更新)
existing.name = name
existing.phone = phone
existing.wechat_nickname = wechat_nickname
existing.practice_time = practice_time
existing.class_id = class_id
existing.notes = notes
else:
# 创建新学员
student = Student(
name=name,
phone=phone,
wechat_nickname=wechat_nickname,
practice_time=practice_time,
notes=notes,
class_id=class_id,
)
db.session.add(student)
success_count += 1
except Exception as e:
error_count += 1
errors.append(f"{row_num}行: {str(e)}")
db.session.commit()
result = {"message": f"导入成功: {success_count}"}
if error_count > 0:
result["error_details"] = errors[:10] # 只返回前10条错误
return jsonify(result)
except Exception as e:
return jsonify({"error": f"导入失败: {str(e)}"}), 500
+175
View File
@@ -0,0 +1,175 @@
# 模板管理路由
from flask import Blueprint, request, jsonify
from app.models import db, Template
from app.routes.auth import admin_required
templates_bp = Blueprint('templates', __name__, url_prefix='/templates')
# 默认模板
DEFAULT_TEMPLATES = {
"ai_prompt": {
"name": "AI提示词模板",
"type": "ai_prompt",
"description": "生成练习方案时发送给AI的提示词",
"sort_order": 0,
"content": """你是一位资深的钢琴教师。请根据学员的具体问题详情,生成一份个性化练习方案报告。
## 学员基本信息
- **姓名**: {student_name}
- **微信昵称**: {wechat_nickname}
- **每日可练习时间**: {practice_time}
## 学员被诊断的问题
{student_problems}
## 每个问题的详细信息和练习方法(请务必基于这些内容生成方案)
{problems}
## 任务要求
请根据上述学员的问题诊断和详细信息,生成一份针对性的练习方案报告:
1. 先简述该学员当前存在的主要问题
2. 给出一个每日练习安排建议(你可以根据问题特点灵活安排热身、技术练习、曲目练习等环节)
3. 针对每个问题给出具体的日常练习方法
4. 给出3-5条重点注意事项
请使用Markdown格式,语言专业、简洁、有鼓励性。""",
},
"report": {
"name": "报告导出模板",
"type": "report",
"description": "导出方案时使用的Markdown模板",
"sort_order": 0,
"content": """# 钢琴练习方案 - {student_name}
**练习时间**: {practice_time} (共{total_minutes}分钟)
**生成时间**: {generated_at}
---
## AI个性化报告
{ai_report}
## 问题诊断
{problem_tags}
## 每日练习计划
{schedule_table}
---
*坚持练习 · 必有进步*""",
}
}
def init_default_templates():
"""初始化默认模板(如果不存在)"""
for key, tmpl in DEFAULT_TEMPLATES.items():
existing = Template.query.filter_by(name=tmpl["name"]).first()
if not existing:
t = Template(
name=tmpl["name"],
type=tmpl["type"],
content=tmpl["content"],
description=tmpl["description"],
sort_order=tmpl.get("sort_order", 0)
)
db.session.add(t)
db.session.commit()
@templates_bp.route("/")
@admin_required
def templates_page():
"""模板管理页面"""
from flask import render_template
return render_template("templates.html")
@templates_bp.route("/templates", methods=["GET"])
@admin_required
def get_templates():
"""获取所有模板(按sort_order排序,可按type筛选)"""
query = Template.query.order_by(Template.sort_order.asc())
template_type = request.args.get('type')
if template_type:
query = query.filter_by(type=template_type)
templates = query.all()
return jsonify([t.to_dict() for t in templates])
@templates_bp.route("/templates/<int:template_id>", methods=["GET"])
@admin_required
def get_template(template_id):
"""获取单个模板"""
tmpl = Template.query.get_or_404(template_id)
return jsonify(tmpl.to_dict())
@templates_bp.route("/templates", methods=["POST"])
@admin_required
def create_template():
"""创建模板"""
data = request.get_json()
tmpl = Template(
name=data.get("name"),
type=data.get("type"),
content=data.get("content"),
description=data.get("description", ""),
sort_order=data.get("sort_order", 0)
)
db.session.add(tmpl)
db.session.commit()
return jsonify(tmpl.to_dict()), 201
@templates_bp.route("/templates/<int:template_id>", methods=["PUT"])
@admin_required
def update_template(template_id):
"""更新模板"""
tmpl = Template.query.get_or_404(template_id)
data = request.get_json()
if "content" in data:
tmpl.content = data["content"]
if "description" in data:
tmpl.description = data["description"]
if "name" in data:
tmpl.name = data["name"]
if "sort_order" in data:
tmpl.sort_order = data["sort_order"]
db.session.commit()
return jsonify(tmpl.to_dict())
@templates_bp.route("/templates/<int:template_id>", methods=["DELETE"])
@admin_required
def delete_template(template_id):
"""删除模板"""
tmpl = Template.query.get_or_404(template_id)
db.session.delete(tmpl)
db.session.commit()
return jsonify({"message": "删除成功"})
@templates_bp.route("/templates/<string:template_type>/render", methods=["POST"])
@admin_required
def render_template_preview(template_type):
"""渲染模板(用于预览)"""
data = request.get_json()
tmpl = Template.query.filter_by(type=template_type).first()
if not tmpl:
return jsonify({"error": "模板不存在"}), 404
content = tmpl.content
# 替换变量
for key, value in data.items():
placeholder = "{" + key + "}"
content = content.replace(placeholder, str(value))
return jsonify({"rendered": content})
+140
View File
@@ -0,0 +1,140 @@
# 用户管理路由
from flask import request, jsonify, render_template, session
from app.routes import main_bp
from app.models import db, User
from app.routes.auth import login_required_json, admin_required
@main_bp.route("/users")
@admin_required
def users_page():
"""用户管理页面"""
return render_template("users.html")
@main_bp.route("/api/users", methods=["GET"])
@admin_required
def api_users_list():
"""用户列表"""
users = User.query.order_by(User.created_at.desc()).all()
return jsonify([u.to_dict() for u in users])
@main_bp.route("/api/users", methods=["POST"])
@admin_required
def api_users_create():
"""新增用户"""
data = request.get_json()
username = data.get("username", "").strip()
password = data.get("password", "")
role = data.get("role", "user")
if not username or not password:
return jsonify({"error": "请输入用户名和密码"}), 400
if User.query.filter_by(username=username).first():
return jsonify({"error": "用户名已存在"}), 400
if role not in ["admin", "user"]:
return jsonify({"error": "无效的角色"}), 400
try:
user = User(username=username, role=role)
user.set_password(password)
db.session.add(user)
db.session.commit()
return jsonify(user.to_dict())
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
db.session.rollback()
return jsonify({"error": "创建失败: " + str(e)}), 500
@main_bp.route("/api/users/<int:user_id>", methods=["PUT"])
@admin_required
def api_users_update(user_id):
"""编辑用户(仅管理员可改角色)"""
user = User.query.get_or_404(user_id)
data = request.get_json()
if "role" in data:
if data["role"] in ["admin", "user"]:
user.role = data["role"]
try:
db.session.commit()
return jsonify(user.to_dict())
except Exception as e:
db.session.rollback()
return jsonify({"error": "更新失败: " + str(e)}), 500
@main_bp.route("/api/users/<int:user_id>", methods=["DELETE"])
@admin_required
def api_users_delete(user_id):
"""删除用户"""
if user_id == session.get("user_id"):
return jsonify({"error": "不能删除自己"}), 400
user = User.query.get_or_404(user_id)
try:
db.session.delete(user)
db.session.commit()
return jsonify({"message": "删除成功"})
except Exception as e:
db.session.rollback()
return jsonify({"error": "删除失败: " + str(e)}), 500
@main_bp.route("/api/users/<int:user_id>/reset-password", methods=["POST"])
@admin_required
def api_users_reset_password(user_id):
"""重置用户密码(管理员无需知道原密码)"""
user = User.query.get_or_404(user_id)
data = request.get_json()
new_password = data.get("new_password", "")
if not new_password:
return jsonify({"error": "请输入新密码"}), 400
try:
user.set_password(new_password)
db.session.commit()
return jsonify({"message": "密码重置成功"})
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
db.session.rollback()
return jsonify({"error": "重置失败: " + str(e)}), 500
@main_bp.route("/api/users/change-password", methods=["POST"])
@login_required_json
def api_users_change_password():
"""修改当前用户密码"""
user = User.query.get(session.get("user_id"))
data = request.get_json()
old_password = data.get("old_password", "")
new_password = data.get("new_password", "")
confirm_password = data.get("confirm_password", "")
if not old_password or not new_password:
return jsonify({"error": "请填写完整"}), 400
if not user.check_password(old_password):
return jsonify({"error": "原密码错误"}), 400
if new_password != confirm_password:
return jsonify({"error": "两次密码输入不一致"}), 400
try:
user.set_password(new_password)
db.session.commit()
return jsonify({"message": "密码修改成功"})
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
db.session.rollback()
return jsonify({"error": "修改失败: " + str(e)}), 500