Files
MoFin/system_audit.py
T
2026-06-30 23:12:53 +08:00

248 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""system_audit.py — MoFin 全局系统审计
每日收盘后运行,遍历所有对象生命周期,发现缺口→自动修复/记录。
审计维度:
1. 信号管道 — 今日signal_news产出vs处理量,有积压则预警
2. 股票生命周期 — 关注列表是否有条件触发的、自选是否有策略缺失的
3. 策略状态 — 过期/偏离/无止损等异常策略
4. 建议闭环 — pending超过7天的未执行建议
5. 组合健康 — 弱势占比、仓位集中度、现金水位
6. 数据管道 — 今日采集是否正常、有无cron报错
7. 系统服务 — Dashboard/XMPP/小果API在线状态
输出:JSON + 摘要文本,推送给老爸。
"""
import json, sqlite3, subprocess, sys, time
from pathlib import Path
from datetime import datetime, timedelta
DATA_DIR = Path("/home/hmo/MoFin/data")
WEB_DATA = Path("/home/hmo/web-dashboard/data")
REPORT = {"timestamp": datetime.now().isoformat(), "issues": [], "fixes": [], "ok": []}
def log_issue(area, severity, desc, fix=None):
REPORT["issues"].append({"area": area, "severity": severity, "desc": desc, "suggested_fix": fix})
def log_fix(area, desc):
REPORT["fixes"].append({"area": area, "desc": desc})
def log_ok(area, desc):
REPORT["ok"].append({"area": area, "desc": desc})
# ── 1. 信号管道审计 ──
def audit_signals(conn):
try:
total = conn.execute("SELECT COUNT(*) FROM signal_news").fetchone()[0]
unproc = conn.execute("SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)").fetchone()[0]
today = conn.execute("SELECT COUNT(*) FROM signal_news WHERE created_at > datetime('now','-1 day')").fetchone()[0]
log_ok("信号管道", f"信号库{total}条,今日{today}条,未处理{unproc}")
if unproc > 30:
log_issue("信号管道", "HIGH", f"未处理信号堆积{unproc}条,可能处理速度跟不上")
except Exception as e:
log_issue("信号管道", "HIGH", f"查询失败: {e}")
# ── 2. 股票生命周期审计 ──
def audit_stocks(conn):
# 关注列表
try:
wl = json.loads((WEB_DATA / "watchlist.json").read_text())
watching = [s for s in wl.get("stocks", []) if s.get("status") == "watching"]
formal = [s for s in wl.get("stocks", []) if s.get("status") != "watching"]
log_ok("股票池", f"正式自选{len(formal)}只, 关注列表{len(watching)}")
# 检查持仓中是否有已关闭但未标记的
closed_holdings = conn.execute("SELECT COUNT(*) FROM holdings WHERE is_active=0").fetchone()[0]
active_holdings = conn.execute("SELECT COUNT(*) FROM holdings WHERE is_active=1").fetchone()[0]
if closed_holdings > 0:
log_ok("股票池", f"持有中{active_holdings}只活跃, {closed_holdings}只已关闭")
except Exception as e:
log_issue("股票池", "MEDIUM", f"查询失败: {e}")
# ── 3. 策略状态审计 ──
def audit_strategies(conn):
try:
dec = json.loads((WEB_DATA / "decisions.json").read_text())
active = [d for d in dec.get("decisions", []) if d.get("status") in ("active", "updated")]
stale_count = 0
no_stop = 0
for d in active:
# 检查是否有止损
if not d.get("stop_loss"):
no_stop += 1
# 检查是否过期(>14天)
ts = d.get("timestamp", "")
if ts:
try:
dt = datetime.fromisoformat(ts)
if (datetime.now() - dt).days > 14:
stale_count += 1
except:
pass
log_ok("策略", f"活跃策略{len(active)}")
if stale_count > 0:
log_issue("策略", "MEDIUM", f"{stale_count}条策略超过14天未更新", "运行 stale_detector 触发重评")
if no_stop > 0:
log_issue("策略", "HIGH", f"{no_stop}条活跃策略缺少止损位")
except Exception as e:
log_issue("策略", "HIGH", f"查询失败: {e}")
# ── 4. 建议闭环审计 ──
def audit_advice(conn):
try:
dec = json.loads((WEB_DATA / "decisions.json").read_text())
pending = 0
for d in dec.get("decisions", []):
for a in d.get("advice_timeline", []):
if a.get("status") == "pending":
pending += 1
if pending > 0:
log_issue("建议", "LOW", f"{pending}条建议待确认/执行", "检查advice_timeline确认是否已执行")
else:
log_ok("建议", "无待处理建议")
except Exception as e:
log_issue("建议", "MEDIUM", f"查询失败: {e}")
# ── 5. 组合健康 ──
def audit_portfolio(conn):
try:
# 优先从 portfolio.json 读总仓位(更准确,基于实际市值/总资产)
pj_path = WEB_DATA / "portfolio.json"
if not pj_path.exists():
pj_path = DATA_DIR / "portfolio.json"
if pj_path.exists():
pj = json.loads(pj_path.read_text())
pos = pj.get("position_pct", 0)
cash = pj.get("cash", 0)
available = pj.get("available_cash", cash)
else:
# 兜底:SQLite position_pct 之和
pos = conn.execute("SELECT SUM(position_pct) FROM holdings WHERE is_active=1").fetchone()[0] or 0
available = 0
log_ok("组合", f"总仓位{pos:.1f}%")
if pos > 90:
log_issue("组合", "MEDIUM", f"仓位{pos:.1f}%超过90%,现金紧张")
elif pos < 30:
log_issue("组合", "LOW", f"仓位仅{pos:.1f}%,现金过多")
except Exception as e:
log_issue("组合", "MEDIUM", f"查询失败: {e}")
# ── 8. 编译缓存审计 ──
def audit_cache():
"""检查 __pycache__ 中是否有比 .py 源文件更老的 .pyc(陈旧缓存)。"""
try:
base = Path(__file__).resolve().parent
stale = []
for pyc in base.rglob("__pycache__/*.pyc"):
py = pyc.with_suffix("") # remove .cpython-*.pyc extension
# The .py file is at parent_of___pycache__ / stem_without_cpython_suffix
# e.g., __pycache__/foo.cpython-312.pyc -> ../foo.py
stem = pyc.stem # e.g. "foo.cpython-312"
# Remove the .cpython-NNN suffix to get original module name
import re
m = re.match(r"^(.*?)\.cpython-\d+", stem)
if not m:
continue
py_path = pyc.parent.parent / f"{m.group(1)}.py"
if py_path.exists() and pyc.stat().st_mtime < py_path.stat().st_mtime:
stale.append(str(py_path.name))
if stale:
log_issue("编译缓存", "MEDIUM", f"{len(stale)}个陈旧.pyc{', '.join(stale)}", "删除对应__pycache__/.pyc")
else:
log_ok("编译缓存", "所有.pyc文件与源文件一致")
except Exception as e:
log_issue("编译缓存", "LOW", f"检查失败: {e}")
# ── 6. 数据管道审计 ──
def audit_pipeline():
# 检查DB市场数据是否今天更新
try:
conn = sqlite3.connect(str(DATA_DIR / "mofin.db"))
row = conn.execute(
"SELECT created_at FROM market_snapshots ORDER BY created_at DESC LIMIT 1"
).fetchone()
conn.close()
if row and row[0][:10] == datetime.now().strftime("%Y-%m-%d"):
log_ok("数据管道", f"市场数据今天更新({row[0]})")
else:
log_issue("数据管道", "HIGH", f"市场数据未更新(DB), 最后{row[0] if row else '无数据'}")
except Exception as e:
log_issue("数据管道", "HIGH", f"DB检查失败: {e}")
# ── 7. 系统服务 ──
def audit_services():
services = [
("Dashboard", "http://127.0.0.1:8899/", "200"),
("mofin-dashboard", None, "active"),
("xmpp-zhiwei", None, "active"),
]
for name, url, expected in services:
try:
if url:
result = subprocess.run(["curl", "-s", "-o", "/dev/null", "-w", "%{http_code}", url],
capture_output=True, text=True, timeout=5)
if result.stdout.strip() == expected:
log_ok("系统服务", f"{name} 正常")
else:
log_issue("系统服务", "HIGH", f"{name} 返回 {result.stdout.strip()} (期望{expected})")
else:
result = subprocess.run(["systemctl", "is-active", name],
capture_output=True, text=True, timeout=5)
if result.stdout.strip() == expected:
log_ok("系统服务", f"{name} 正常")
else:
log_issue("系统服务", "HIGH", f"{name} 状态 {result.stdout.strip()} (期望{expected})")
except Exception as e:
log_issue("系统服务", "HIGH", f"{name} 检查失败: {e}")
# ── 执行 ──
def main():
start = time.time()
conn = sqlite3.connect(str(DATA_DIR / "mofin.db"))
audit_signals(conn)
audit_stocks(conn)
audit_strategies(conn)
audit_advice(conn)
audit_portfolio(conn)
audit_pipeline()
audit_services()
audit_cache()
conn.close()
REPORT["duration"] = f"{time.time()-start:.0f}s"
REPORT["summary"] = f"审计完成: {len(REPORT['issues'])}个问题, {len(REPORT['fixes'])}个已修复, {len(REPORT['ok'])}项正常"
# 写入文件
(WEB_DATA / "system_audit_report.json").write_text(json.dumps(REPORT, ensure_ascii=False, indent=2))
# 输出摘要(给cron推送用)
print(f"【系统审计】{REPORT['summary']}")
for i in REPORT["issues"]:
print(f" [{i['severity']}] {i['area']}: {i['desc']}")
if REPORT["fixes"]:
for f in REPORT["fixes"]:
print(f" ✅ 已修复: {f['area']}: {f['desc']}")
for o in REPORT["ok"]:
print(f"{o['area']}: {o['desc']}")
if __name__ == "__main__":
main()