#!/usr/bin/env python3 """system_audit.py — MoFin 全局系统审计 每日收盘后运行,遍历所有对象生命周期,发现缺口→自动修复/记录。 审计维度: 1. 信号管道 — 今日signal_news产出vs处理量,有积压则预警 2. 股票生命周期 — 关注列表是否有条件触发的、自选是否有策略缺失的 3. 策略状态 — 过期/偏离/无止损等异常策略 4. 建议闭环 — pending超过7天的未执行建议 5. 组合健康 — 弱势占比、仓位集中度、现金水位 6. 数据管道 — 今日采集是否正常、有无cron报错 7. 系统服务 — Dashboard/XMPP/小果API在线状态 输出:JSON + 摘要文本,推送给老爸。 """ import json, sqlite3, subprocess, sys, time from pathlib import Path from datetime import datetime, timedelta DATA_DIR = Path("/home/hmo/MoFin/data") WEB_DATA = Path("/home/hmo/web-dashboard/data") REPORT = {"timestamp": datetime.now().isoformat(), "issues": [], "fixes": [], "ok": []} def log_issue(area, severity, desc, fix=None): REPORT["issues"].append({"area": area, "severity": severity, "desc": desc, "suggested_fix": fix}) def log_fix(area, desc): REPORT["fixes"].append({"area": area, "desc": desc}) def log_ok(area, desc): REPORT["ok"].append({"area": area, "desc": desc}) # ── 1. 信号管道审计 ── def audit_signals(conn): try: total = conn.execute("SELECT COUNT(*) FROM signal_news").fetchone()[0] unproc = conn.execute("SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)").fetchone()[0] today = conn.execute("SELECT COUNT(*) FROM signal_news WHERE created_at > datetime('now','-1 day')").fetchone()[0] log_ok("信号管道", f"信号库{total}条,今日{today}条,未处理{unproc}条") if unproc > 30: log_issue("信号管道", "HIGH", f"未处理信号堆积{unproc}条,可能处理速度跟不上") except Exception as e: log_issue("信号管道", "HIGH", f"查询失败: {e}") # ── 2. 股票生命周期审计 ── def audit_stocks(conn): # 关注列表 try: wl = json.loads((WEB_DATA / "watchlist.json").read_text()) watching = [s for s in wl.get("stocks", []) if s.get("status") == "watching"] formal = [s for s in wl.get("stocks", []) if s.get("status") != "watching"] log_ok("股票池", f"正式自选{len(formal)}只, 关注列表{len(watching)}只") # 检查持仓中是否有已关闭但未标记的 closed_holdings = conn.execute("SELECT COUNT(*) FROM holdings WHERE is_active=0").fetchone()[0] active_holdings = conn.execute("SELECT COUNT(*) FROM holdings WHERE is_active=1").fetchone()[0] if closed_holdings > 0: log_ok("股票池", f"持有中{active_holdings}只活跃, {closed_holdings}只已关闭") except Exception as e: log_issue("股票池", "MEDIUM", f"查询失败: {e}") # ── 3. 策略状态审计 ── def audit_strategies(conn): try: dec = json.loads((WEB_DATA / "decisions.json").read_text()) active = [d for d in dec.get("decisions", []) if d.get("status") == "active"] stale_count = 0 no_stop = 0 for d in active: # 检查是否有止损 if not d.get("stop_loss"): no_stop += 1 # 检查是否过期(>14天) ts = d.get("timestamp", "") if ts: try: dt = datetime.fromisoformat(ts) if (datetime.now() - dt).days > 14: stale_count += 1 except: pass log_ok("策略", f"活跃策略{len(active)}条") if stale_count > 0: log_issue("策略", "MEDIUM", f"{stale_count}条策略超过14天未更新", "运行 stale_detector 触发重评") if no_stop > 0: log_issue("策略", "HIGH", f"{no_stop}条活跃策略缺少止损位") except Exception as e: log_issue("策略", "HIGH", f"查询失败: {e}") # ── 4. 建议闭环审计 ── def audit_advice(conn): try: dec = json.loads((WEB_DATA / "decisions.json").read_text()) pending = 0 for d in dec.get("decisions", []): for a in d.get("advice_timeline", []): if a.get("status") == "pending": pending += 1 if pending > 0: log_issue("建议", "LOW", f"{pending}条建议待确认/执行", "检查advice_timeline确认是否已执行") else: log_ok("建议", "无待处理建议") except Exception as e: log_issue("建议", "MEDIUM", f"查询失败: {e}") # ── 5. 组合健康 ── def audit_portfolio(conn): try: pos = conn.execute("SELECT SUM(position_pct) FROM holdings WHERE is_active=1").fetchone()[0] or 0 log_ok("组合", f"总仓位{pos:.1f}%") if pos > 90: log_issue("组合", "MEDIUM", f"仓位{pos:.1f}%超过90%,现金紧张") elif pos < 30: log_issue("组合", "LOW", f"仓位仅{pos:.1f}%,现金过多") except Exception as e: log_issue("组合", "MEDIUM", f"查询失败: {e}") # ── 6. 数据管道审计 ── def audit_pipeline(): # 检查market.json是否今天更新 try: mkt = json.loads((DATA_DIR / "market.json").read_text()) mkt_ts = mkt.get("timestamp", "") if mkt_ts[:10] == datetime.now().strftime("%Y-%m-%d"): log_ok("数据管道", f"市场数据今天更新({mkt_ts})") else: log_issue("数据管道", "HIGH", f"市场数据未更新, 最后{mkt_ts}") except: log_issue("数据管道", "HIGH", "market.json缺失") # ── 7. 系统服务 ── def audit_services(): services = [ ("Dashboard", "http://127.0.0.1:8899/", "200"), ("mofin-dashboard", None, "active"), ("xmpp-zhiwei", None, "active"), ] for name, url, expected in services: try: if url: result = subprocess.run(["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", url], capture_output=True, text=True, timeout=5) if result.stdout.strip() == expected: log_ok("系统服务", f"{name} 正常") else: log_issue("系统服务", "HIGH", f"{name} 返回 {result.stdout.strip()} (期望{expected})") else: result = subprocess.run(["systemctl", "is-active", name], capture_output=True, text=True, timeout=5) if result.stdout.strip() == expected: log_ok("系统服务", f"{name} 正常") else: log_issue("系统服务", "HIGH", f"{name} 状态 {result.stdout.strip()} (期望{expected})") except Exception as e: log_issue("系统服务", "HIGH", f"{name} 检查失败: {e}") # ── 执行 ── def main(): start = time.time() conn = sqlite3.connect(str(DATA_DIR / "mofin.db")) audit_signals(conn) audit_stocks(conn) audit_strategies(conn) audit_advice(conn) audit_portfolio(conn) audit_pipeline() audit_services() conn.close() REPORT["duration"] = f"{time.time()-start:.0f}s" REPORT["summary"] = f"审计完成: {len(REPORT['issues'])}个问题, {len(REPORT['fixes'])}个已修复, {len(REPORT['ok'])}项正常" # 写入文件 (WEB_DATA / "system_audit_report.json").write_text(json.dumps(REPORT, ensure_ascii=False, indent=2)) # 输出摘要(给cron推送用) print(f"【系统审计】{REPORT['summary']}") for i in REPORT["issues"]: print(f" [{i['severity']}] {i['area']}: {i['desc']}") if REPORT["fixes"]: for f in REPORT["fixes"]: print(f" ✅ 已修复: {f['area']}: {f['desc']}") for o in REPORT["ok"]: print(f" ✅ {o['area']}: {o['desc']}") if __name__ == "__main__": main()