diff --git a/docs/lifecycle-management.md b/docs/lifecycle-management.md index 7751ee1..28d935d 100644 --- a/docs/lifecycle-management.md +++ b/docs/lifecycle-management.md @@ -283,6 +283,7 @@ source写入(signal_news表) | 执行操作 | 老爸手动 / advice_timeline记录 | 按需 | | 对比结果 | advice_reconciliation / 策略评估 | 每周 | | 知识萃取 | 知识萃取cron + knowledge-log | 每日16:30 | +| **全局审计** | **system_audit.py + 审计cron** | **每日17:30** | | 修正迭代 | stale_detector + regenerate_all | 每日9:00/12:00 | --- diff --git a/system_audit.py b/system_audit.py new file mode 100644 index 0000000..14cb0fc --- /dev/null +++ b/system_audit.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +"""system_audit.py — MoFin 全局系统审计 + +每日收盘后运行,遍历所有对象生命周期,发现缺口→自动修复/记录。 + +审计维度: + 1. 信号管道 — 今日signal_news产出vs处理量,有积压则预警 + 2. 股票生命周期 — 关注列表是否有条件触发的、自选是否有策略缺失的 + 3. 策略状态 — 过期/偏离/无止损等异常策略 + 4. 建议闭环 — pending超过7天的未执行建议 + 5. 组合健康 — 弱势占比、仓位集中度、现金水位 + 6. 数据管道 — 今日采集是否正常、有无cron报错 + 7. 系统服务 — Dashboard/XMPP/小果API在线状态 + +输出:JSON + 摘要文本,推送给老爸。 +""" + +import json, sqlite3, subprocess, sys, time +from pathlib import Path +from datetime import datetime, timedelta + +DATA_DIR = Path("/home/hmo/MoFin/data") +WEB_DATA = Path("/home/hmo/web-dashboard/data") +REPORT = {"timestamp": datetime.now().isoformat(), "issues": [], "fixes": [], "ok": []} + + +def log_issue(area, severity, desc, fix=None): + REPORT["issues"].append({"area": area, "severity": severity, "desc": desc, "suggested_fix": fix}) + + +def log_fix(area, desc): + REPORT["fixes"].append({"area": area, "desc": desc}) + + +def log_ok(area, desc): + REPORT["ok"].append({"area": area, "desc": desc}) + + +# ── 1. 信号管道审计 ── +def audit_signals(conn): + try: + total = conn.execute("SELECT COUNT(*) FROM signal_news").fetchone()[0] + unproc = conn.execute("SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)").fetchone()[0] + today = conn.execute("SELECT COUNT(*) FROM signal_news WHERE created_at > datetime('now','-1 day')").fetchone()[0] + log_ok("信号管道", f"信号库{total}条,今日{today}条,未处理{unproc}条") + if unproc > 30: + log_issue("信号管道", "HIGH", f"未处理信号堆积{unproc}条,可能处理速度跟不上") + except Exception as e: + log_issue("信号管道", "HIGH", f"查询失败: {e}") + + +# ── 2. 股票生命周期审计 ── +def audit_stocks(conn): + # 关注列表 + try: + wl = json.loads((WEB_DATA / "watchlist.json").read_text()) + watching = [s for s in wl.get("stocks", []) if s.get("status") == "watching"] + formal = [s for s in wl.get("stocks", []) if s.get("status") != "watching"] + log_ok("股票池", f"正式自选{len(formal)}只, 关注列表{len(watching)}只") + + # 检查持仓中是否有已关闭但未标记的 + closed_holdings = conn.execute("SELECT COUNT(*) FROM holdings WHERE is_active=0").fetchone()[0] + active_holdings = conn.execute("SELECT COUNT(*) FROM holdings WHERE is_active=1").fetchone()[0] + if closed_holdings > 0: + log_ok("股票池", f"持有中{active_holdings}只活跃, {closed_holdings}只已关闭") + except Exception as e: + log_issue("股票池", "MEDIUM", f"查询失败: {e}") + + +# ── 3. 策略状态审计 ── +def audit_strategies(conn): + try: + dec = json.loads((WEB_DATA / "decisions.json").read_text()) + active = [d for d in dec.get("decisions", []) if d.get("status") == "active"] + stale_count = 0 + no_stop = 0 + for d in active: + # 检查是否有止损 + if not d.get("stop_loss"): + no_stop += 1 + # 检查是否过期(>14天) + ts = d.get("timestamp", "") + if ts: + try: + dt = datetime.fromisoformat(ts) + if (datetime.now() - dt).days > 14: + stale_count += 1 + except: + pass + log_ok("策略", f"活跃策略{len(active)}条") + if stale_count > 0: + log_issue("策略", "MEDIUM", f"{stale_count}条策略超过14天未更新", "运行 stale_detector 触发重评") + if no_stop > 0: + log_issue("策略", "HIGH", f"{no_stop}条活跃策略缺少止损位") + except Exception as e: + log_issue("策略", "HIGH", f"查询失败: {e}") + + +# ── 4. 建议闭环审计 ── +def audit_advice(conn): + try: + dec = json.loads((WEB_DATA / "decisions.json").read_text()) + pending = 0 + for d in dec.get("decisions", []): + for a in d.get("advice_timeline", []): + if a.get("status") == "pending": + pending += 1 + if pending > 0: + log_issue("建议", "LOW", f"{pending}条建议待确认/执行", "检查advice_timeline确认是否已执行") + else: + log_ok("建议", "无待处理建议") + except Exception as e: + log_issue("建议", "MEDIUM", f"查询失败: {e}") + + +# ── 5. 组合健康 ── +def audit_portfolio(conn): + try: + pos = conn.execute("SELECT SUM(position_pct) FROM holdings WHERE is_active=1").fetchone()[0] or 0 + log_ok("组合", f"总仓位{pos:.1f}%") + if pos > 90: + log_issue("组合", "MEDIUM", f"仓位{pos:.1f}%超过90%,现金紧张") + elif pos < 30: + log_issue("组合", "LOW", f"仓位仅{pos:.1f}%,现金过多") + except Exception as e: + log_issue("组合", "MEDIUM", f"查询失败: {e}") + + +# ── 6. 数据管道审计 ── +def audit_pipeline(): + # 检查market.json是否今天更新 + try: + mkt = json.loads((DATA_DIR / "market.json").read_text()) + mkt_ts = mkt.get("timestamp", "") + if mkt_ts[:10] == datetime.now().strftime("%Y-%m-%d"): + log_ok("数据管道", f"市场数据今天更新({mkt_ts})") + else: + log_issue("数据管道", "HIGH", f"市场数据未更新, 最后{mkt_ts}") + except: + log_issue("数据管道", "HIGH", "market.json缺失") + + +# ── 7. 系统服务 ── +def audit_services(): + services = [ + ("Dashboard", "http://127.0.0.1:8899/", "200"), + ("mofin-dashboard", None, "active"), + ("xmpp-zhiwei", None, "active"), + ] + for name, url, expected in services: + try: + if url: + result = subprocess.run(["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", url], + capture_output=True, text=True, timeout=5) + if result.stdout.strip() == expected: + log_ok("系统服务", f"{name} 正常") + else: + log_issue("系统服务", "HIGH", f"{name} 返回 {result.stdout.strip()} (期望{expected})") + else: + result = subprocess.run(["systemctl", "is-active", name], + capture_output=True, text=True, timeout=5) + if result.stdout.strip() == expected: + log_ok("系统服务", f"{name} 正常") + else: + log_issue("系统服务", "HIGH", f"{name} 状态 {result.stdout.strip()} (期望{expected})") + except Exception as e: + log_issue("系统服务", "HIGH", f"{name} 检查失败: {e}") + + +# ── 执行 ── +def main(): + start = time.time() + conn = sqlite3.connect(str(DATA_DIR / "mofin.db")) + + audit_signals(conn) + audit_stocks(conn) + audit_strategies(conn) + audit_advice(conn) + audit_portfolio(conn) + audit_pipeline() + audit_services() + + conn.close() + + REPORT["duration"] = f"{time.time()-start:.0f}s" + REPORT["summary"] = f"审计完成: {len(REPORT['issues'])}个问题, {len(REPORT['fixes'])}个已修复, {len(REPORT['ok'])}项正常" + + # 写入文件 + (WEB_DATA / "system_audit_report.json").write_text(json.dumps(REPORT, ensure_ascii=False, indent=2)) + + # 输出摘要(给cron推送用) + print(f"【系统审计】{REPORT['summary']}") + for i in REPORT["issues"]: + print(f" [{i['severity']}] {i['area']}: {i['desc']}") + if REPORT["fixes"]: + for f in REPORT["fixes"]: + print(f" ✅ 已修复: {f['area']}: {f['desc']}") + for o in REPORT["ok"]: + print(f" ✅ {o['area']}: {o['desc']}") + + +if __name__ == "__main__": + main()