314 lines
10 KiB
Python
314 lines
10 KiB
Python
# 备份与操作日志管理
|
|
|
|
import os
|
|
import json
|
|
import shutil
|
|
from datetime import datetime
|
|
from flask import request, jsonify, current_app, session
|
|
from app.routes import main_bp
|
|
from app.models import db, Student, Class, User, PROBLEM_LIST
|
|
from app.routes.auth import login_required_json, admin_required
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# 备份目录
|
|
BACKUP_DIR = os.path.join(
|
|
os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "bk"
|
|
)
|
|
LOGS_DIR = os.path.join(BACKUP_DIR, "logs")
|
|
PROBLEMS_DIR = os.path.join(BACKUP_DIR, "problems")
|
|
|
|
|
|
def ensure_dirs():
|
|
"""确保备份目录存在"""
|
|
os.makedirs(LOGS_DIR, exist_ok=True)
|
|
os.makedirs(PROBLEMS_DIR, exist_ok=True)
|
|
|
|
|
|
def log_operation(action, operator, target_type, target_id, detail=""):
|
|
"""记录操作日志"""
|
|
ensure_dirs()
|
|
log_entry = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"action": action,
|
|
"operator": operator,
|
|
"target_type": target_type,
|
|
"target_id": target_id,
|
|
"detail": detail,
|
|
"ip": request.remote_addr if request else None,
|
|
}
|
|
log_file = os.path.join(LOGS_DIR, f"{datetime.now().strftime('%Y-%m-%d')}.jsonl")
|
|
with open(log_file, "a", encoding="utf-8") as f:
|
|
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
|
|
return log_entry
|
|
|
|
|
|
def get_backup_list():
|
|
"""获取备份列表"""
|
|
ensure_dirs()
|
|
backups = []
|
|
if os.path.exists(BACKUP_DIR):
|
|
for name in os.listdir(BACKUP_DIR):
|
|
path = os.path.join(BACKUP_DIR, name)
|
|
if os.path.isdir(path) and name.startswith("backup_"):
|
|
meta_file = os.path.join(path, "meta.json")
|
|
meta = {}
|
|
if os.path.exists(meta_file):
|
|
with open(meta_file, encoding="utf-8") as f:
|
|
meta = json.load(f)
|
|
backups.append(
|
|
{
|
|
"id": name,
|
|
"created_at": meta.get("created_at", ""),
|
|
"operator": meta.get("operator", ""),
|
|
"description": meta.get("description", ""),
|
|
"includes": meta.get("includes", []),
|
|
}
|
|
)
|
|
return sorted(backups, key=lambda x: x["created_at"], reverse=True)
|
|
|
|
|
|
@main_bp.route("/api/backup", methods=["POST"])
|
|
@login_required_json
|
|
@admin_required
|
|
def create_backup():
|
|
"""创建数据备份"""
|
|
ensure_dirs()
|
|
data = request.get_json() or {}
|
|
description = data.get("description", "")
|
|
|
|
# 创建备份目录
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
backup_id = f"backup_{timestamp}"
|
|
backup_path = os.path.join(BACKUP_DIR, backup_id)
|
|
os.makedirs(backup_path, exist_ok=True)
|
|
|
|
operator = session.get("username", "unknown")
|
|
|
|
# 备份各类数据
|
|
includes = []
|
|
|
|
# 1. 备份班级
|
|
classes = Class.query.all()
|
|
classes_data = [c.to_dict() for c in classes]
|
|
with open(os.path.join(backup_path, "classes.json"), "w", encoding="utf-8") as f:
|
|
json.dump(classes_data, f, ensure_ascii=False, indent=2)
|
|
includes.append("classes")
|
|
|
|
# 2. 备份学员
|
|
students = Student.query.all()
|
|
students_data = []
|
|
for s in students:
|
|
students_data.append(
|
|
{
|
|
"id": s.id,
|
|
"name": s.name,
|
|
"phone": s.phone,
|
|
"wechat_nickname": s.wechat_nickname,
|
|
"practice_time": s.practice_time,
|
|
"class_id": s.class_id,
|
|
"notes": s.notes,
|
|
"created_at": s.created_at.isoformat() if s.created_at else None,
|
|
}
|
|
)
|
|
with open(os.path.join(backup_path, "students.json"), "w", encoding="utf-8") as f:
|
|
json.dump(students_data, f, ensure_ascii=False, indent=2)
|
|
includes.append("students")
|
|
|
|
# 3. 备份问题配置
|
|
problems_backup_dir = os.path.join(backup_path, "problems")
|
|
os.makedirs(problems_backup_dir, exist_ok=True)
|
|
|
|
# 读取问题文件
|
|
problems_dir = current_app.config.get("PROBLEMS_DIR")
|
|
if problems_dir and os.path.exists(problems_dir):
|
|
for filename in os.listdir(problems_dir):
|
|
if filename.endswith(".md"):
|
|
src = os.path.join(problems_dir, filename)
|
|
dest = os.path.join(problems_backup_dir, filename)
|
|
shutil.copy2(src, dest)
|
|
includes.append("problems")
|
|
|
|
# 4. 备份配置文件
|
|
config_path = os.path.join(
|
|
os.path.dirname(os.path.dirname(__file__)), "config", "api_config.json"
|
|
)
|
|
if os.path.exists(config_path):
|
|
shutil.copy2(config_path, os.path.join(backup_path, "api_config.json"))
|
|
includes.append("config")
|
|
|
|
# 写入元数据
|
|
meta = {
|
|
"backup_id": backup_id,
|
|
"created_at": datetime.now().isoformat(),
|
|
"operator": operator,
|
|
"description": description,
|
|
"includes": includes,
|
|
}
|
|
with open(os.path.join(backup_path, "meta.json"), "w", encoding="utf-8") as f:
|
|
json.dump(meta, f, ensure_ascii=False, indent=2)
|
|
|
|
# 记录操作日志
|
|
log_operation(
|
|
"backup_create", operator, "system", backup_id, f"创建备份: {description}"
|
|
)
|
|
|
|
return jsonify(
|
|
{"message": "备份成功", "backup_id": backup_id, "includes": includes}
|
|
)
|
|
|
|
|
|
@main_bp.route("/api/backup", methods=["GET"])
|
|
@login_required_json
|
|
def list_backups():
|
|
"""获取备份列表"""
|
|
backups = get_backup_list()
|
|
return jsonify(backups)
|
|
|
|
|
|
@main_bp.route("/api/backup/<backup_id>", methods=["GET"])
|
|
@login_required_json
|
|
def get_backup(backup_id):
|
|
"""查看备份内容"""
|
|
backup_path = os.path.join(BACKUP_DIR, backup_id)
|
|
if not os.path.exists(backup_path):
|
|
return jsonify({"error": "备份不存在"}), 404
|
|
|
|
# 读取元数据
|
|
meta_file = os.path.join(backup_path, "meta.json")
|
|
meta = {}
|
|
if os.path.exists(meta_file):
|
|
with open(meta_file, encoding="utf-8") as f:
|
|
meta = json.load(f)
|
|
|
|
# 读取各类数据
|
|
data = {"meta": meta}
|
|
|
|
for fname in ["classes.json", "students.json", "api_config.json"]:
|
|
fpath = os.path.join(backup_path, fname)
|
|
if os.path.exists(fpath):
|
|
with open(fpath, encoding="utf-8") as f:
|
|
data[fname.replace(".json", "")] = json.load(f)
|
|
|
|
# 问题文件列表
|
|
problems_dir = os.path.join(backup_path, "problems")
|
|
if os.path.exists(problems_dir):
|
|
data["problems"] = os.listdir(problems_dir)
|
|
|
|
return jsonify(data)
|
|
|
|
|
|
@main_bp.route("/api/backup/<backup_id>/restore", methods=["POST"])
|
|
@login_required_json
|
|
@admin_required
|
|
def restore_backup(backup_id):
|
|
"""恢复备份(危险操作)"""
|
|
data = request.get_json() or {}
|
|
confirm_text = data.get("confirm", "")
|
|
|
|
# 安全确认
|
|
if confirm_text != "确认恢复":
|
|
return jsonify({"error": "请输入 '确认恢复' 以确认此操作"}), 400
|
|
|
|
backup_path = os.path.join(BACKUP_DIR, backup_id)
|
|
if not os.path.exists(backup_path):
|
|
return jsonify({"error": "备份不存在"}), 404
|
|
|
|
operator = session.get("username", "unknown")
|
|
|
|
# 先创建当前数据的自动备份
|
|
auto_backup_id = f"auto_before_restore_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
|
auto_backup_path = os.path.join(BACKUP_DIR, auto_backup_id)
|
|
shutil.copytree(backup_path, auto_backup_path)
|
|
|
|
# 恢复各类数据
|
|
restored = []
|
|
|
|
# 1. 恢复班级
|
|
classes_file = os.path.join(backup_path, "classes.json")
|
|
if os.path.exists(classes_file):
|
|
with open(classes_file, encoding="utf-8") as f:
|
|
classes_data = json.load(f)
|
|
# 清空并重建班级
|
|
Class.query.delete()
|
|
for c in classes_data:
|
|
cls = Class(
|
|
id=c.get("id"),
|
|
name=c.get("name"),
|
|
description=c.get("description"),
|
|
active=c.get("active", True),
|
|
)
|
|
db.session.add(cls)
|
|
db.session.commit()
|
|
restored.append("classes")
|
|
|
|
# 2. 恢复学员
|
|
students_file = os.path.join(backup_path, "students.json")
|
|
if os.path.exists(students_file):
|
|
with open(students_file, encoding="utf-8") as f:
|
|
students_data = json.load(f)
|
|
# 清空并重建学员
|
|
Student.query.delete()
|
|
for s in students_data:
|
|
student = Student(
|
|
id=s.get("id"),
|
|
name=s.get("name"),
|
|
phone=s.get("phone"),
|
|
wechat_nickname=s.get("wechat_nickname"),
|
|
practice_time=s.get("practice_time", "30分钟"),
|
|
class_id=s.get("class_id"),
|
|
notes=s.get("notes"),
|
|
)
|
|
db.session.add(student)
|
|
db.session.commit()
|
|
restored.append("students")
|
|
|
|
# 3. 恢复问题配置
|
|
problems_src = os.path.join(backup_path, "problems")
|
|
if os.path.exists(problems_src):
|
|
# 获取目标问题目录
|
|
problem_dir = current_app.config.get("PROBLEMS_DIR")
|
|
if problem_dir and os.path.exists(problem_dir):
|
|
# 清空目标目录
|
|
for f in os.listdir(problem_dir):
|
|
if f.endswith(".md"):
|
|
os.remove(os.path.join(problem_dir, f))
|
|
# 复制备份的问题文件
|
|
for f in os.listdir(problems_src):
|
|
if f.endswith(".md"):
|
|
shutil.copy2(os.path.join(problems_src, f), problem_dir)
|
|
restored.append("problems")
|
|
|
|
# 记录操作日志
|
|
log_operation(
|
|
"backup_restore",
|
|
operator,
|
|
"system",
|
|
backup_id,
|
|
f"恢复备份: {backup_id}, 已自动备份当前数据到: {auto_backup_id}",
|
|
)
|
|
|
|
return jsonify(
|
|
{"message": "恢复成功", "restored": restored, "auto_backup": auto_backup_id}
|
|
)
|
|
|
|
|
|
@main_bp.route("/api/logs", methods=["GET"])
|
|
@login_required_json
|
|
@admin_required
|
|
def get_logs():
|
|
"""获取操作日志"""
|
|
date = request.args.get("date", datetime.now().strftime("%Y-%m-%d"))
|
|
log_file = os.path.join(LOGS_DIR, f"{date}.jsonl")
|
|
|
|
if not os.path.exists(log_file):
|
|
return jsonify([])
|
|
|
|
logs = []
|
|
with open(log_file, encoding="utf-8") as f:
|
|
for line in f:
|
|
if line.strip():
|
|
logs.append(json.loads(line))
|
|
return jsonify(logs)
|