# Backup and operation-log management
import os
import json
import shutil
import logging
from datetime import datetime

from flask import request, jsonify, current_app, session

from app.routes import main_bp
from app.models import db, Student, Class, User, PROBLEM_LIST
from app.routes.auth import login_required_json, admin_required

logger = logging.getLogger(__name__)

# Backup root: the "bk" directory three levels above this file.
BACKUP_DIR = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "bk"
)
LOGS_DIR = os.path.join(BACKUP_DIR, "logs")
PROBLEMS_DIR = os.path.join(BACKUP_DIR, "problems")


def ensure_dirs():
    """Create the log and problem directories under BACKUP_DIR if missing."""
    os.makedirs(LOGS_DIR, exist_ok=True)
    os.makedirs(PROBLEMS_DIR, exist_ok=True)


def log_operation(action, operator, target_type, target_id, detail=""):
    """Append one operation record to today's JSONL log file.

    Args:
        action: Short action identifier, e.g. "backup_create".
        operator: Name of the user performing the action.
        target_type: Kind of object the action applies to.
        target_id: Identifier of the affected object.
        detail: Optional free-text description.

    Returns:
        The dict that was written to the log.
    """
    ensure_dirs()
    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "action": action,
        "operator": operator,
        "target_type": target_type,
        "target_id": target_id,
        "detail": detail,
        "ip": request.remote_addr if request else None,
    }

    # One log file per calendar day, one JSON document per line.
    log_file = os.path.join(LOGS_DIR, f"{datetime.now().strftime('%Y-%m-%d')}.jsonl")
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")

    return log_entry


def _load_json(path):
    """Read and parse a UTF-8 JSON file."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def _dump_json(path, payload):
    """Write *payload* to *path* as indented UTF-8 JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)


def _json_body():
    """Return the request's JSON body as a dict, or {} if absent or not an object."""
    body = request.get_json(silent=True)
    return body if isinstance(body, dict) else {}


def _resolve_backup_path(backup_id):
    """Map a backup id to its directory, or return None if it is not a valid backup.

    The id comes straight from the URL, so it must be a single path component:
    anything containing a separator, or "." / "..", is rejected to keep the
    resulting path inside BACKUP_DIR.
    """
    if not backup_id or backup_id in (os.curdir, os.pardir):
        return None
    if os.path.basename(backup_id) != backup_id:
        return None
    path = os.path.join(BACKUP_DIR, backup_id)
    return path if os.path.isdir(path) else None


def _snapshot_current_data(backup_path, backup_id, operator, description):
    """Dump the live data set into *backup_path* and write its meta.json.

    Writes classes.json, students.json, a copy of the problem ``.md`` files
    and api_config.json (the last two only when their sources exist).

    Returns:
        The list of section names that were included in the snapshot.
    """
    includes = []

    # 1. Classes
    classes_data = [c.to_dict() for c in Class.query.all()]
    _dump_json(os.path.join(backup_path, "classes.json"), classes_data)
    includes.append("classes")

    # 2. Students
    students_data = [
        {
            "id": s.id,
            "name": s.name,
            "phone": s.phone,
            "wechat_nickname": s.wechat_nickname,
            "practice_time": s.practice_time,
            "class_id": s.class_id,
            "notes": s.notes,
            "created_at": s.created_at.isoformat() if s.created_at else None,
        }
        for s in Student.query.all()
    ]
    _dump_json(os.path.join(backup_path, "students.json"), students_data)
    includes.append("students")

    # 3. Problem definitions (markdown files from the configured directory)
    problems_backup_dir = os.path.join(backup_path, "problems")
    os.makedirs(problems_backup_dir, exist_ok=True)
    problems_dir = current_app.config.get("PROBLEMS_DIR")
    if problems_dir and os.path.exists(problems_dir):
        for filename in os.listdir(problems_dir):
            if filename.endswith(".md"):
                shutil.copy2(
                    os.path.join(problems_dir, filename),
                    os.path.join(problems_backup_dir, filename),
                )
        includes.append("problems")

    # 4. API configuration file
    config_path = os.path.join(
        os.path.dirname(os.path.dirname(__file__)), "config", "api_config.json"
    )
    if os.path.exists(config_path):
        shutil.copy2(config_path, os.path.join(backup_path, "api_config.json"))
        includes.append("config")

    # Metadata describing this snapshot
    meta = {
        "backup_id": backup_id,
        "created_at": datetime.now().isoformat(),
        "operator": operator,
        "description": description,
        "includes": includes,
    }
    _dump_json(os.path.join(backup_path, "meta.json"), meta)

    return includes


def get_backup_list():
    """Return metadata for every ``backup_*`` directory, newest first.

    A backup whose meta.json is missing or unreadable is still listed, with
    empty metadata fields, so one damaged backup cannot break the listing.
    """
    ensure_dirs()
    backups = []

    for name in os.listdir(BACKUP_DIR):
        path = os.path.join(BACKUP_DIR, name)
        if not (os.path.isdir(path) and name.startswith("backup_")):
            continue

        meta = {}
        meta_file = os.path.join(path, "meta.json")
        if os.path.exists(meta_file):
            try:
                loaded = _load_json(meta_file)
            except (OSError, ValueError):
                logger.warning("Unreadable backup metadata: %s", meta_file)
            else:
                if isinstance(loaded, dict):
                    meta = loaded

        backups.append(
            {
                "id": name,
                "created_at": meta.get("created_at", ""),
                "operator": meta.get("operator", ""),
                "description": meta.get("description", ""),
                "includes": meta.get("includes", []),
            }
        )

    # `or ""` keeps the sort total even if a meta file stores a null timestamp.
    return sorted(backups, key=lambda x: x["created_at"] or "", reverse=True)


@main_bp.route("/api/backup", methods=["POST"])
@login_required_json
@admin_required
def create_backup():
    """Create a backup of the current data.

    Request JSON (optional): ``{"description": str}``.
    Responds with the new backup id and the list of included sections.
    """
    ensure_dirs()

    # The body is optional, so a missing/non-JSON body must not abort the request.
    description = _json_body().get("description", "")

    # Backup directory named after the creation time (second resolution).
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_id = f"backup_{timestamp}"
    backup_path = os.path.join(BACKUP_DIR, backup_id)
    os.makedirs(backup_path, exist_ok=True)

    operator = session.get("username", "unknown")
    includes = _snapshot_current_data(backup_path, backup_id, operator, description)

    # Record the operation
    log_operation(
        "backup_create", operator, "system", backup_id, f"创建备份: {description}"
    )

    return jsonify(
        {"message": "备份成功", "backup_id": backup_id, "includes": includes}
    )


@main_bp.route("/api/backup", methods=["GET"])
@login_required_json
def list_backups():
    """Return the list of available backups as JSON."""
    backups = get_backup_list()
    return jsonify(backups)


@main_bp.route("/api/backup/<backup_id>", methods=["GET"])
@login_required_json
def get_backup(backup_id):
    """Return the contents of one backup.

    Responds 404 when *backup_id* does not name a directory directly inside
    BACKUP_DIR.
    """
    backup_path = _resolve_backup_path(backup_id)
    if backup_path is None:
        return jsonify({"error": "备份不存在"}), 404

    # Metadata
    meta_file = os.path.join(backup_path, "meta.json")
    meta = _load_json(meta_file) if os.path.exists(meta_file) else {}

    # Data files, keyed by file name without the extension.
    # NOTE(review): api_config.json is returned to any logged-in user, not only
    # admins — confirm it holds no secrets, or restrict this endpoint.
    data = {"meta": meta}
    for fname in ["classes.json", "students.json", "api_config.json"]:
        fpath = os.path.join(backup_path, fname)
        if os.path.exists(fpath):
            data[os.path.splitext(fname)[0]] = _load_json(fpath)

    # Names of the backed-up problem files
    problems_dir = os.path.join(backup_path, "problems")
    if os.path.exists(problems_dir):
        data["problems"] = os.listdir(problems_dir)

    return jsonify(data)


@main_bp.route("/api/backup/<backup_id>/restore", methods=["POST"])
@login_required_json
@admin_required
def restore_backup(backup_id):
    """Restore a backup (destructive operation).

    Request JSON: ``{"confirm": "确认恢复"}``. Before anything is overwritten,
    the live data is snapshotted into an ``auto_before_restore_*`` directory.
    Classes and students are replaced in a single database transaction that
    is rolled back if any step fails.

    Responds 400 without the confirmation text and 404 for an unknown backup.
    """
    confirm_text = _json_body().get("confirm", "")

    # Safety confirmation
    if confirm_text != "确认恢复":
        return jsonify({"error": "请输入 '确认恢复' 以确认此操作"}), 400

    backup_path = _resolve_backup_path(backup_id)
    if backup_path is None:
        return jsonify({"error": "备份不存在"}), 404

    operator = session.get("username", "unknown")

    # Parse the backup files up front so a corrupt file fails the request
    # before any live data has been touched.
    classes_file = os.path.join(backup_path, "classes.json")
    students_file = os.path.join(backup_path, "students.json")
    classes_data = _load_json(classes_file) if os.path.exists(classes_file) else None
    students_data = (
        _load_json(students_file) if os.path.exists(students_file) else None
    )

    # Snapshot the CURRENT data first, so the restore itself can be undone.
    # NOTE(review): these directories do not start with "backup_", so
    # get_backup_list() does not show them; they are reachable by id only.
    auto_backup_id = f"auto_before_restore_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    auto_backup_path = os.path.join(BACKUP_DIR, auto_backup_id)
    os.makedirs(auto_backup_path, exist_ok=True)
    _snapshot_current_data(
        auto_backup_path, auto_backup_id, operator, f"恢复 {backup_id} 前的自动备份"
    )

    restored = []

    try:
        # Students carry a class_id, so they are removed before the classes
        # and re-created after them.
        if students_data is not None:
            Student.query.delete()

        # 1. Classes: clear and rebuild
        if classes_data is not None:
            Class.query.delete()
            for c in classes_data:
                db.session.add(
                    Class(
                        id=c.get("id"),
                        name=c.get("name"),
                        description=c.get("description"),
                        active=c.get("active", True),
                    )
                )
            db.session.flush()
            restored.append("classes")

        # 2. Students: rebuild
        if students_data is not None:
            for s in students_data:
                student = Student(
                    id=s.get("id"),
                    name=s.get("name"),
                    phone=s.get("phone"),
                    wechat_nickname=s.get("wechat_nickname"),
                    practice_time=s.get("practice_time", "30分钟"),
                    class_id=s.get("class_id"),
                    notes=s.get("notes"),
                )
                # The backup stores created_at as an ISO string; put it back so
                # a restore does not reset every student's creation time.
                created_at = s.get("created_at")
                if created_at:
                    student.created_at = datetime.fromisoformat(created_at)
                db.session.add(student)
            restored.append("students")

        db.session.commit()
    except Exception:
        db.session.rollback()
        logger.exception(
            "Restore of backup %s failed; database changes rolled back", backup_id
        )
        raise

    # 3. Problem definitions
    problems_src = os.path.join(backup_path, "problems")
    if os.path.exists(problems_src):
        # Target directory for the problem files
        problem_dir = current_app.config.get("PROBLEMS_DIR")
        if problem_dir and os.path.exists(problem_dir):
            # Remove the current markdown files
            for f in os.listdir(problem_dir):
                if f.endswith(".md"):
                    os.remove(os.path.join(problem_dir, f))
            # Copy the backed-up markdown files in
            for f in os.listdir(problems_src):
                if f.endswith(".md"):
                    shutil.copy2(os.path.join(problems_src, f), problem_dir)
            restored.append("problems")

    # Record the operation
    log_operation(
        "backup_restore",
        operator,
        "system",
        backup_id,
        f"恢复备份: {backup_id}, 已自动备份当前数据到: {auto_backup_id}",
    )

    return jsonify(
        {"message": "恢复成功", "restored": restored, "auto_backup": auto_backup_id}
    )


@main_bp.route("/api/logs", methods=["GET"])
@login_required_json
@admin_required
def get_logs():
    """Return the operation log entries for one day.

    Query parameter ``date`` (``YYYY-MM-DD``, default today) selects the day.
    An unparseable date or a day without a log file yields an empty list.
    """
    date = request.args.get("date", datetime.now().strftime("%Y-%m-%d"))

    # The value ends up in a file name, so accept only a real calendar date
    # and rebuild the string from the parsed value.
    try:
        date = datetime.strptime(date, "%Y-%m-%d").strftime("%Y-%m-%d")
    except ValueError:
        return jsonify([])

    log_file = os.path.join(LOGS_DIR, f"{date}.jsonl")
    if not os.path.exists(log_file):
        return jsonify([])

    logs = []
    with open(log_file, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                logs.append(json.loads(line))
            except ValueError:
                # A single damaged line must not hide the rest of the day's log.
                logger.warning("Skipping malformed log line in %s", log_file)

    return jsonify(logs)