fix: system_audit/stale_push_wlin/refresh_mtf → DB, remove final JSON refs

This commit is contained in:
知微
2026-07-03 23:19:32 +08:00
parent 9bc0cca174
commit 8e487f400f
3 changed files with 1233 additions and 1242 deletions
+94 -98
View File
@@ -1,98 +1,94 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""多周期缓存刷新脚本 — 在开盘前预填充K线数据 """多周期缓存刷新脚本 — 在开盘前预填充K线数据
为所有持仓+自选股预先拉取日/周/月K线,写入 multi_tf_cache.json 为所有持仓+自选股预先拉取日/周/月K线,写入 multi_tf_cache.json
这样收盘后全量重评(regenerate_all)运行时K线数据已有缓存,无需逐个拉取。 这样收盘后全量重评(regenerate_all)运行时K线数据已有缓存,无需逐个拉取。
运行时间:每天9:00(开盘前),no_agent模式。 运行时间:每天9:00(开盘前),no_agent模式。
无输出 = 成功(避免每天收到无意义消息)。 无输出 = 成功(避免每天收到无意义消息)。
""" """
import sys import sys
import os import os
import json import json
from datetime import datetime from datetime import datetime
# 确保能找到 web-dashboard 模块 # 确保能找到 web-dashboard 模块
sys.path.insert(0, "/home/hmo/web-dashboard") sys.path.insert(0, "/home/hmo/web-dashboard")
# 控制台UTC日志 # 控制台UTC日志
def log(msg): def log(msg):
ts = datetime.utcnow().strftime("%H:%M:%S") ts = datetime.utcnow().strftime("%H:%M:%S")
print(f"[{ts}] {msg}", file=sys.stderr) print(f"[{ts}] {msg}", file=sys.stderr)
def main(): def main():
from strategy_lifecycle import safe_json_load, PORTFOLIO_PATH, WATCHLIST_PATH from strategy_lifecycle import safe_json_load, PORTFOLIO_PATH, WATCHLIST_PATH
# 收集所有股票代码 # 收集所有股票代码
codes = [] codes = []
for item in safe_json_load(PORTFOLIO_PATH, {}).get("holdings", []): for item in safe_json_load(PORTFOLIO_PATH, {}).get("holdings", []):
code = item.get("code", "") code = item.get("code", "")
if code: if code:
codes.append(("portfolio", code)) codes.append(("portfolio", code))
seen = set(c[1] for c in codes) seen = set(c[1] for c in codes)
for item in safe_json_load(WATCHLIST_PATH, {}).get("stocks", []): for item in safe_json_load(WATCHLIST_PATH, {}).get("stocks", []):
code = item.get("code", "") code = item.get("code", "")
if code and code not in seen: if code and code not in seen:
codes.append(("watchlist", code)) codes.append(("watchlist", code))
seen.add(code) seen.add(code)
# 加入指数代码(用于多周期趋势研判) # 加入指数代码(用于多周期趋势研判)
INDEXES = { INDEXES = {
"sh000001": "上证指数", "sz399001": "深证成指", "sh000001": "上证指数", "sz399001": "深证成指",
"sz399006": "创业板指", "sh000688": "科创50", "sz399006": "创业板指", "sh000688": "科创50",
"hkHSI": "恒生指数", "hkHSTECH": "恒生科技", "hkHSI": "恒生指数", "hkHSTECH": "恒生科技",
} }
for idx_code in INDEXES: for idx_code in INDEXES:
if idx_code not in seen: if idx_code not in seen:
codes.append(("index", idx_code)) codes.append(("index", idx_code))
seen.add(idx_code) seen.add(idx_code)
log(f"Pre-populating multi-timeframe cache for {len(codes)} stocks...") log(f"Pre-populating multi-timeframe cache for {len(codes)} stocks...")
# 检查当前缓存,只更新需要更新的 # 从 DB 读取现有缓存(替代 multi_tf_cache.json
mtf_cache_path = "/home/hmo/web-dashboard/data/multi_tf_cache.json" from multi_timeframe import _load_mtf_cache, _save_mtf_cache
try: existing = _load_mtf_cache()
with open(mtf_cache_path) as f:
existing = json.load(f) import time
except (FileNotFoundError, json.JSONDecodeError): from multi_timeframe import full_multi_tf_analysis
existing = {}
cached = 0
import time fetched = 0
from multi_timeframe import full_multi_tf_analysis errors = 0
cached = 0 for source, code in codes:
fetched = 0 cached_entry = existing.get(code, {})
errors = 0 updated_at = cached_entry.get("updated_at", 0)
now = time.time()
for source, code in codes:
cached_entry = existing.get(code, {}) # 检查缓存是否新鲜:日K 1小时内,周/月K 1天内
updated_at = cached_entry.get("updated_at", 0) has_daily = bool(cached_entry.get("daily"))
now = time.time() has_weekly = bool(cached_entry.get("weekly"))
has_monthly = bool(cached_entry.get("monthly"))
# 检查缓存是否新鲜:日K 1小时内,周/月K 1天内 cache_fresh = (updated_at > 0 and (now - updated_at) < 3600)
has_daily = bool(cached_entry.get("daily"))
has_weekly = bool(cached_entry.get("weekly")) if has_daily and has_weekly and has_monthly and cache_fresh:
has_monthly = bool(cached_entry.get("monthly")) cached += 1
cache_fresh = (updated_at > 0 and (now - updated_at) < 3600) continue
if has_daily and has_weekly and has_monthly and cache_fresh: try:
cached += 1 r = full_multi_tf_analysis(code)
continue if any(k in r for k in ["daily", "weekly", "monthly"]):
fetched += 1
try: log(f" OK {code} ({source})")
r = full_multi_tf_analysis(code) else:
if any(k in r for k in ["daily", "weekly", "monthly"]): errors += 1
fetched += 1 log(f" EMPTY {code} ({source})")
log(f" OK {code} ({source})") except Exception as e:
else: errors += 1
errors += 1 log(f" FAIL {code} ({source}): {e}")
log(f" EMPTY {code} ({source})")
except Exception as e: log(f"Done: {cached} cached, {fetched} fetched, {errors} errors")
errors += 1
log(f" FAIL {code} ({source}): {e}") if __name__ == "__main__":
main()
log(f"Done: {cached} cached, {fetched} fetched, {errors} errors")
if __name__ == "__main__":
main()
+900 -897
View File
File diff suppressed because it is too large Load Diff
+239 -247
View File
@@ -1,247 +1,239 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""system_audit.py — MoFin 全局系统审计 """system_audit.py — MoFin 全局系统审计
每日收盘后运行,遍历所有对象生命周期,发现缺口→自动修复/记录。 每日收盘后运行,遍历所有对象生命周期,发现缺口→自动修复/记录。
审计维度: 审计维度:
1. 信号管道 — 今日signal_news产出vs处理量,有积压则预警 1. 信号管道 — 今日signal_news产出vs处理量,有积压则预警
2. 股票生命周期 — 关注列表是否有条件触发的、自选是否有策略缺失的 2. 股票生命周期 — 关注列表是否有条件触发的、自选是否有策略缺失的
3. 策略状态 — 过期/偏离/无止损等异常策略 3. 策略状态 — 过期/偏离/无止损等异常策略
4. 建议闭环 — pending超过7天的未执行建议 4. 建议闭环 — pending超过7天的未执行建议
5. 组合健康 — 弱势占比、仓位集中度、现金水位 5. 组合健康 — 弱势占比、仓位集中度、现金水位
6. 数据管道 — 今日采集是否正常、有无cron报错 6. 数据管道 — 今日采集是否正常、有无cron报错
7. 系统服务 — Dashboard/XMPP/小果API在线状态 7. 系统服务 — Dashboard/XMPP/小果API在线状态
输出:JSON + 摘要文本,推送给老爸。 输出:JSON + 摘要文本,推送给老爸。
""" """
import json, sqlite3, subprocess, sys, time import json, sqlite3, subprocess, sys, time
from pathlib import Path from pathlib import Path
from datetime import datetime, timedelta from datetime import datetime, timedelta
from mo_data import read_portfolio, read_decisions, read_watchlist
DATA_DIR = Path("/home/hmo/MoFin/data")
WEB_DATA = Path("/home/hmo/web-dashboard/data") DATA_DIR = Path("/home/hmo/MoFin/data")
REPORT = {"timestamp": datetime.now().isoformat(), "issues": [], "fixes": [], "ok": []} WEB_DATA = Path("/home/hmo/web-dashboard/data")
REPORT = {"timestamp": datetime.now().isoformat(), "issues": [], "fixes": [], "ok": []}
def log_issue(area, severity, desc, fix=None):
REPORT["issues"].append({"area": area, "severity": severity, "desc": desc, "suggested_fix": fix}) def log_issue(area, severity, desc, fix=None):
REPORT["issues"].append({"area": area, "severity": severity, "desc": desc, "suggested_fix": fix})
def log_fix(area, desc):
REPORT["fixes"].append({"area": area, "desc": desc}) def log_fix(area, desc):
REPORT["fixes"].append({"area": area, "desc": desc})
def log_ok(area, desc):
REPORT["ok"].append({"area": area, "desc": desc}) def log_ok(area, desc):
REPORT["ok"].append({"area": area, "desc": desc})
# ── 1. 信号管道审计 ──
def audit_signals(conn): # ── 1. 信号管道审计 ──
try: def audit_signals(conn):
total = conn.execute("SELECT COUNT(*) FROM signal_news").fetchone()[0] try:
unproc = conn.execute("SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)").fetchone()[0] total = conn.execute("SELECT COUNT(*) FROM signal_news").fetchone()[0]
today = conn.execute("SELECT COUNT(*) FROM signal_news WHERE created_at > datetime('now','-1 day')").fetchone()[0] unproc = conn.execute("SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)").fetchone()[0]
log_ok("信号管道", f"信号库{total}条,今日{today}条,未处理{unproc}") today = conn.execute("SELECT COUNT(*) FROM signal_news WHERE created_at > datetime('now','-1 day')").fetchone()[0]
if unproc > 30: log_ok("信号管道", f"信号库{total}条,今日{today}条,未处理{unproc}")
log_issue("信号管道", "HIGH", f"未处理信号堆积{unproc}条,可能处理速度跟不上") if unproc > 30:
except Exception as e: log_issue("信号管道", "HIGH", f"未处理信号堆积{unproc}条,可能处理速度跟不上")
log_issue("信号管道", "HIGH", f"查询失败: {e}") except Exception as e:
log_issue("信号管道", "HIGH", f"查询失败: {e}")
# ── 2. 股票生命周期审计 ──
def audit_stocks(conn): # ── 2. 股票生命周期审计 ──
# 关注列表 def audit_stocks(conn):
try: # 关注列表
wl = json.loads((WEB_DATA / "watchlist.json").read_text()) try:
watching = [s for s in wl.get("stocks", []) if s.get("status") == "watching"] wl = read_watchlist()
formal = [s for s in wl.get("stocks", []) if s.get("status") != "watching"] watching = [s for s in wl.get("stocks", []) if s.get("status") == "watching"]
log_ok("股票池", f"正式自选{len(formal)}只, 关注列表{len(watching)}") formal = [s for s in wl.get("stocks", []) if s.get("status") != "watching"]
log_ok("股票池", f"正式自选{len(formal)}只, 关注列表{len(watching)}")
# 检查持仓中是否有已关闭但未标记的
closed_holdings = conn.execute("SELECT COUNT(*) FROM holdings WHERE is_active=0").fetchone()[0] # 检查持仓中是否有已关闭但未标记的
active_holdings = conn.execute("SELECT COUNT(*) FROM holdings WHERE is_active=1").fetchone()[0] closed_holdings = conn.execute("SELECT COUNT(*) FROM holdings WHERE is_active=0").fetchone()[0]
if closed_holdings > 0: active_holdings = conn.execute("SELECT COUNT(*) FROM holdings WHERE is_active=1").fetchone()[0]
log_ok("股票池", f"持有中{active_holdings}只活跃, {closed_holdings}只已关闭") if closed_holdings > 0:
except Exception as e: log_ok("股票池", f"持有中{active_holdings}只活跃, {closed_holdings}只已关闭")
log_issue("股票池", "MEDIUM", f"查询失败: {e}") except Exception as e:
log_issue("股票池", "MEDIUM", f"查询失败: {e}")
# ── 3. 策略状态审计 ──
def audit_strategies(conn): # ── 3. 策略状态审计 ──
try: def audit_strategies(conn):
dec = json.loads((WEB_DATA / "decisions.json").read_text()) try:
active = [d for d in dec.get("decisions", []) if d.get("status") in ("active", "updated")] dec = read_decisions()
stale_count = 0 active = [d for d in dec.get("decisions", []) if d.get("status") in ("active", "updated")]
no_stop = 0 stale_count = 0
for d in active: no_stop = 0
# 检查是否有止损 for d in active:
if not d.get("stop_loss"): # 检查是否有止损
no_stop += 1 if not d.get("stop_loss"):
# 检查是否过期(>14天) no_stop += 1
ts = d.get("timestamp", "") # 检查是否过期(>14天)
if ts: ts = d.get("timestamp", "")
try: if ts:
dt = datetime.fromisoformat(ts) try:
if (datetime.now() - dt).days > 14: dt = datetime.fromisoformat(ts)
stale_count += 1 if (datetime.now() - dt).days > 14:
except: stale_count += 1
pass except:
log_ok("策略", f"活跃策略{len(active)}") pass
if stale_count > 0: log_ok("策略", f"活跃策略{len(active)}")
log_issue("策略", "MEDIUM", f"{stale_count}条策略超过14天未更新", "运行 stale_detector 触发重评") if stale_count > 0:
if no_stop > 0: log_issue("策略", "MEDIUM", f"{stale_count}条策略超过14天未更新", "运行 stale_detector 触发重评")
log_issue("策略", "HIGH", f"{no_stop}条活跃策略缺少止损位") if no_stop > 0:
except Exception as e: log_issue("策略", "HIGH", f"{no_stop}条活跃策略缺少止损位")
log_issue("策略", "HIGH", f"查询失败: {e}") except Exception as e:
log_issue("策略", "HIGH", f"查询失败: {e}")
# ── 4. 建议闭环审计 ──
def audit_advice(conn): # ── 4. 建议闭环审计 ──
try: def audit_advice(conn):
dec = json.loads((WEB_DATA / "decisions.json").read_text()) try:
pending = 0 dec = read_decisions()
for d in dec.get("decisions", []): pending = 0
for a in d.get("advice_timeline", []): for d in dec.get("decisions", []):
if a.get("status") == "pending": for a in d.get("advice_timeline", []):
pending += 1 if a.get("status") == "pending":
if pending > 0: pending += 1
log_issue("建议", "LOW", f"{pending}条建议待确认/执行", "检查advice_timeline确认是否已执行") if pending > 0:
else: log_issue("建议", "LOW", f"{pending}条建议待确认/执行", "检查advice_timeline确认是否已执行")
log_ok("建议", "无待处理建议") else:
except Exception as e: log_ok("建议", "无待处理建议")
log_issue("建议", "MEDIUM", f"查询失败: {e}") except Exception as e:
log_issue("建议", "MEDIUM", f"查询失败: {e}")
# ── 5. 组合健康 ──
def audit_portfolio(conn): # ── 5. 组合健康 ──
try: def audit_portfolio(conn):
# 优先从 portfolio.json 读总仓位(更准确,基于实际市值/总资产) try:
pj_path = WEB_DATA / "portfolio.json" pj = read_portfolio()
if not pj_path.exists(): pos = pj.get("position_pct", 0)
pj_path = DATA_DIR / "portfolio.json" cash = pj.get("cash", 0)
if pj_path.exists(): available = pj.get("available_cash", cash)
pj = json.loads(pj_path.read_text())
pos = pj.get("position_pct", 0) log_ok("组合", f"总仓位{pos:.1f}%")
cash = pj.get("cash", 0) if pos > 90:
available = pj.get("available_cash", cash) log_issue("组合", "MEDIUM", f"仓位{pos:.1f}%超过90%,现金紧张")
else: elif pos < 30:
# 兜底:SQLite position_pct 之和 log_issue("组合", "LOW", f"仓位仅{pos:.1f}%,现金过多")
pos = conn.execute("SELECT SUM(position_pct) FROM holdings WHERE is_active=1").fetchone()[0] or 0 except Exception as e:
available = 0 log_issue("组合", "MEDIUM", f"查询失败: {e}")
log_ok("组合", f"总仓位{pos:.1f}%")
if pos > 90: # ── 8. 编译缓存审计 ──
log_issue("组合", "MEDIUM", f"仓位{pos:.1f}%超过90%,现金紧张") def audit_cache():
elif pos < 30: """检查 __pycache__ 中是否有比 .py 源文件更老的 .pyc(陈旧缓存)。"""
log_issue("组合", "LOW", f"仓位仅{pos:.1f}%,现金过多") try:
except Exception as e: base = Path(__file__).resolve().parent
log_issue("组合", "MEDIUM", f"查询失败: {e}") stale = []
for pyc in base.rglob("__pycache__/*.pyc"):
py = pyc.with_suffix("") # remove .cpython-*.pyc extension
# ── 8. 编译缓存审计 ── # The .py file is at parent_of___pycache__ / stem_without_cpython_suffix
def audit_cache(): # e.g., __pycache__/foo.cpython-312.pyc -> ../foo.py
"""检查 __pycache__ 中是否有比 .py 源文件更老的 .pyc(陈旧缓存)。""" stem = pyc.stem # e.g. "foo.cpython-312"
try: # Remove the .cpython-NNN suffix to get original module name
base = Path(__file__).resolve().parent import re
stale = [] m = re.match(r"^(.*?)\.cpython-\d+", stem)
for pyc in base.rglob("__pycache__/*.pyc"): if not m:
py = pyc.with_suffix("") # remove .cpython-*.pyc extension continue
# The .py file is at parent_of___pycache__ / stem_without_cpython_suffix py_path = pyc.parent.parent / f"{m.group(1)}.py"
# e.g., __pycache__/foo.cpython-312.pyc -> ../foo.py if py_path.exists() and pyc.stat().st_mtime < py_path.stat().st_mtime:
stem = pyc.stem # e.g. "foo.cpython-312" stale.append(str(py_path.name))
# Remove the .cpython-NNN suffix to get original module name if stale:
import re log_issue("编译缓存", "MEDIUM", f"{len(stale)}个陈旧.pyc{', '.join(stale)}", "删除对应__pycache__/.pyc")
m = re.match(r"^(.*?)\.cpython-\d+", stem) else:
if not m: log_ok("编译缓存", "所有.pyc文件与源文件一致")
continue except Exception as e:
py_path = pyc.parent.parent / f"{m.group(1)}.py" log_issue("编译缓存", "LOW", f"检查失败: {e}")
if py_path.exists() and pyc.stat().st_mtime < py_path.stat().st_mtime:
stale.append(str(py_path.name))
if stale: # ── 6. 数据管道审计 ──
log_issue("编译缓存", "MEDIUM", f"{len(stale)}个陈旧.pyc{', '.join(stale)}", "删除对应__pycache__/.pyc") def audit_pipeline():
else: # 检查DB市场数据是否今天更新
log_ok("编译缓存", "所有.pyc文件与源文件一致") try:
except Exception as e: conn = sqlite3.connect(str(DATA_DIR / "mofin.db"))
log_issue("编译缓存", "LOW", f"检查失败: {e}") row = conn.execute(
"SELECT created_at FROM market_snapshots ORDER BY created_at DESC LIMIT 1"
).fetchone()
# ── 6. 数据管道审计 ── conn.close()
def audit_pipeline(): if row and row[0][:10] == datetime.now().strftime("%Y-%m-%d"):
# 检查DB市场数据是否今天更新 log_ok("数据管道", f"市场数据今天更新({row[0]})")
try: else:
conn = sqlite3.connect(str(DATA_DIR / "mofin.db")) log_issue("数据管道", "HIGH", f"市场数据未更新(DB), 最后{row[0] if row else '无数据'}")
row = conn.execute( except Exception as e:
"SELECT created_at FROM market_snapshots ORDER BY created_at DESC LIMIT 1" log_issue("数据管道", "HIGH", f"DB检查失败: {e}")
).fetchone()
conn.close()
if row and row[0][:10] == datetime.now().strftime("%Y-%m-%d"): # ── 7. 系统服务 ──
log_ok("数据管道", f"市场数据今天更新({row[0]})") def audit_services():
else: services = [
log_issue("数据管道", "HIGH", f"市场数据未更新(DB), 最后{row[0] if row else '无数据'}") ("Dashboard", "http://127.0.0.1:8899/", "200"),
except Exception as e: ("mofin-dashboard", None, "active"),
log_issue("数据管道", "HIGH", f"DB检查失败: {e}") ("xmpp-zhiwei", None, "active"),
]
for name, url, expected in services:
# ── 7. 系统服务 ── try:
def audit_services(): if url:
services = [ result = subprocess.run(["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", url],
("Dashboard", "http://127.0.0.1:8899/", "200"), capture_output=True, text=True, timeout=5)
("mofin-dashboard", None, "active"), if result.stdout.strip() == expected:
("xmpp-zhiwei", None, "active"), log_ok("系统服务", f"{name} 正常")
] else:
for name, url, expected in services: log_issue("系统服务", "HIGH", f"{name} 返回 {result.stdout.strip()} (期望{expected})")
try: else:
if url: result = subprocess.run(["systemctl", "is-active", name],
result = subprocess.run(["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", url], capture_output=True, text=True, timeout=5)
capture_output=True, text=True, timeout=5) if result.stdout.strip() == expected:
if result.stdout.strip() == expected: log_ok("系统服务", f"{name} 正常")
log_ok("系统服务", f"{name} 正常") else:
else: log_issue("系统服务", "HIGH", f"{name} 状态 {result.stdout.strip()} (期望{expected})")
log_issue("系统服务", "HIGH", f"{name} 返回 {result.stdout.strip()} (期望{expected})") except Exception as e:
else: log_issue("系统服务", "HIGH", f"{name} 检查失败: {e}")
result = subprocess.run(["systemctl", "is-active", name],
capture_output=True, text=True, timeout=5)
if result.stdout.strip() == expected: # ── 执行 ──
log_ok("系统服务", f"{name} 正常") def main():
else: start = time.time()
log_issue("系统服务", "HIGH", f"{name} 状态 {result.stdout.strip()} (期望{expected})") conn = sqlite3.connect(str(DATA_DIR / "mofin.db"))
except Exception as e:
log_issue("系统服务", "HIGH", f"{name} 检查失败: {e}") audit_signals(conn)
audit_stocks(conn)
audit_strategies(conn)
# ── 执行 ── audit_advice(conn)
def main(): audit_portfolio(conn)
start = time.time() audit_pipeline()
conn = sqlite3.connect(str(DATA_DIR / "mofin.db")) audit_services()
audit_cache()
audit_signals(conn)
audit_stocks(conn) conn.close()
audit_strategies(conn)
audit_advice(conn) REPORT["duration"] = f"{time.time()-start:.0f}s"
audit_portfolio(conn) REPORT["summary"] = f"审计完成: {len(REPORT['issues'])}个问题, {len(REPORT['fixes'])}个已修复, {len(REPORT['ok'])}项正常"
audit_pipeline()
audit_services() # 写入文件
audit_cache() (WEB_DATA / "system_audit_report.json").write_text(json.dumps(REPORT, ensure_ascii=False, indent=2))
conn.close() # 输出摘要(给cron推送用)
print(f"【系统审计】{REPORT['summary']}")
REPORT["duration"] = f"{time.time()-start:.0f}s" for i in REPORT["issues"]:
REPORT["summary"] = f"审计完成: {len(REPORT['issues'])}个问题, {len(REPORT['fixes'])}个已修复, {len(REPORT['ok'])}项正常" print(f" [{i['severity']}] {i['area']}: {i['desc']}")
if REPORT["fixes"]:
# 写入文件 for f in REPORT["fixes"]:
(WEB_DATA / "system_audit_report.json").write_text(json.dumps(REPORT, ensure_ascii=False, indent=2)) print(f" ✅ 已修复: {f['area']}: {f['desc']}")
for o in REPORT["ok"]:
# 输出摘要(给cron推送用) print(f"{o['area']}: {o['desc']}")
print(f"【系统审计】{REPORT['summary']}")
for i in REPORT["issues"]:
print(f" [{i['severity']}] {i['area']}: {i['desc']}") if __name__ == "__main__":
if REPORT["fixes"]: main()
for f in REPORT["fixes"]:
print(f" ✅ 已修复: {f['area']}: {f['desc']}")
for o in REPORT["ok"]:
print(f"{o['area']}: {o['desc']}")
if __name__ == "__main__":
main()