Files
MoFin/system_health_check.py
T

322 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""system_health_check.py — MoFin 系统健康检查
每日运行,检查所有组件是否正常工作。
输出报告,有问题才推送。
"""
import json, os, sys, subprocess
from datetime import datetime, timedelta
from pathlib import Path
# Directory holding the JSON data files inspected by run().
DATA_DIR = Path("/home/hmo/web-dashboard/data")
EVENTS_PATH = DATA_DIR / "price_events.json"
EVALUATION_PATH = DATA_DIR / "evaluation.json"
ACCURACY_PATH = DATA_DIR / "accuracy_stats.json"
# Cron job registries read by check_cron_jobs(); run() labels them "default" and "position-analyst".
# NOTE(review): CRON_JOBS is the same path as POSITION_CRON, so the "default profile" check re-reads
# the position-analyst jobs.json and double-counts its issues. Presumably CRON_JOBS should point at
# the default profile's jobs.json — confirm the correct path.
CRON_JOBS = "/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json"
POSITION_CRON = "/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json"
def check(ok, msg):
    """Format one report line as a status icon followed by *msg*.

    Args:
        ok: truthy for a passing check (rendered as "✅"), falsy for a
            warning (rendered as "⚠️").
        msg: human-readable description of the check result.

    Returns:
        The string `` {icon} {msg}`` — the leading space indents the line
        under its section heading.
    """
    # "✅" matches the icon used for the ok count in the report summary line.
    icon = "✅" if ok else "⚠️"
    return f" {icon} {msg}"
def load_json(path, default=None):
    """Load and return the JSON document at *path*, or a fallback on failure.

    Args:
        path: file path (str or Path).
        default: value returned when the file is missing, unreadable, or not
            valid UTF-8 JSON. ``None`` means "return a new empty dict".

    Returns:
        The decoded JSON value, or the fallback.
    """
    try:
        # JSON is UTF-8 by spec; don't depend on the process locale.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable path. ValueError covers json.JSONDecodeError
        # and UnicodeDecodeError. Anything else (e.g. KeyboardInterrupt) propagates.
        return {} if default is None else default
def check_cron_jobs(path, label):
    """Summarise the cron jobs listed in a jobs.json file.

    Args:
        path: path to a JSON file expected to look like ``{"jobs": [{...}, ...]}``.
        label: profile name of this file. Currently unused; kept so existing
            callers keep working.

    Returns:
        ``(job_count, issues)``: the number of entries under "jobs", and one
        message per problem job. A job is a problem if it is disabled
        (``enabled`` false), has never run (empty/missing ``last_run_at``), or
        has ``last_status`` other than "ok". A missing or unparseable file
        yields ``(0, [])``, since load_json falls back to an empty job list.
        Content with an unexpected shape yields ``(0, ["无法读取"])``.
    """
    issues = []
    try:
        jobs = load_json(path, {"jobs": []}).get("jobs", [])
        for job in jobs:
            name = job.get("name", "?")
            status = job.get("last_status", "")
            if not job.get("enabled", True):
                issues.append(f"{name} 已禁用")
            elif not job.get("last_run_at", ""):
                issues.append(f"{name} 从未运行")
            elif status != "ok":
                issues.append(f"{name} 上次状态={status}")
        return len(jobs), issues
    except (AttributeError, TypeError):
        # Valid JSON but not a dict with a list of job dicts (e.g. top-level
        # list, "jobs": null, or non-dict entries).
        return 0, ["无法读取"]
class _Report:
    """Accumulates report lines, issue summaries and ok/warn counters for run()."""

    def __init__(self, header):
        self.lines = [header]
        self.issues = []
        self.ok = 0
        self.warn = 0

    def section(self, title):
        """Append a blank separator line and a 【title】 section heading."""
        self.lines.append("")
        self.lines.append(f"【{title}】")

    def record(self, ok, msg, issue=None):
        """Append a check() line and count it; on failure also queue *issue* if given."""
        self.lines.append(check(ok, msg))
        if ok:
            self.ok += 1
        else:
            self.warn += 1
            if issue:
                self.issues.append(issue)


def _run_cmd(args, timeout=5):
    """Run an external command; return its CompletedProcess, or None if it could not run.

    Output is captured as text. A missing binary (OSError) or a timeout
    (subprocess.TimeoutExpired) yields None instead of aborting the report.
    """
    try:
        return subprocess.run(args, capture_output=True, text=True, timeout=timeout)
    except (OSError, subprocess.SubprocessError):
        return None


def _service_alive(pattern):
    """Return True if `<pattern>.service` is active in systemd, or else `pgrep -f pattern` matches."""
    r = _run_cmd(["systemctl", "is-active", f"{pattern}.service"])
    if r is not None and r.stdout.strip() == "active":
        return True
    r = _run_cmd(["pgrep", "-f", pattern])
    return r is not None and r.returncode == 0


def _port_listening(ss_output, port):
    """Return True if a whitespace-separated token of `ss` output ends with ``:<port>``.

    Whole-token matching avoids the substring false positive where ":8899"
    would also match a socket bound to ":88990".
    """
    suffix = f":{port}"
    return any(tok.endswith(suffix) for tok in ss_output.split())


def _parse_db_ts(ts):
    """Parse a DB timestamp (first 19 chars, e.g. 'YYYY-MM-DD HH:MM:SS') into a naive datetime.

    Accepts a 'T' or space separator and date-only values (midnight).
    Raises ValueError for unparseable input.
    """
    s = str(ts)
    try:
        return datetime.fromisoformat(s[:19])
    except ValueError:
        return datetime.strptime(s[:10], "%Y-%m-%d")


def _age_str(hours_ago):
    """Render an age in hours as 'Nh前' (>= 1 hour) or 'N分钟前'."""
    return f"{hours_ago:.0f}h前" if hours_ago >= 1 else f"{hours_ago * 60:.0f}分钟前"


def _record_freshness(rep, name, now, mtime, threshold):
    """Record *name* as fresh iff *mtime* is less than *threshold* hours before *now*."""
    hours_ago = (now - mtime).total_seconds() / 3600
    time_str = _age_str(hours_ago)
    rep.record(
        hours_ago < threshold,
        f"{name} 更新于 {time_str} (阈值{threshold}h)",
        f"{name} 超过{threshold}h未更新(最近更新:{time_str})",
    )


def _check_processes(rep):
    """Section 1: long-running services (systemd unit first, then pgrep)."""
    rep.section("进程")
    procs = {
        "mofin-dashboard": "mofin-dashboard",
        "xmpp-zhiwei": "xmpp_zhiwei_bot",
        "ejabberd": "ejabberd",
    }
    for name, pattern in procs.items():
        alive = _service_alive(pattern)
        rep.record(alive, f"{name} {'运行中' if alive else '已停止'}", f"{name} 进程不存在")


def _check_ports(rep):
    """Section 2: TCP listening ports, from a single `ss -tln` snapshot."""
    rep.section("端口")
    ports = {"8899": "Dashboard", "5222": "ejabberd", "8643": "知微Gateway"}
    r = _run_cmd(["ss", "-tln"])
    ss_output = r.stdout if r is not None else ""
    for port, name in ports.items():
        listening = _port_listening(ss_output, port)
        rep.record(
            listening,
            f"{name} :{port} {'监听中' if listening else '未监听'}",
            f"{name} 端口{port}未监听",
        )


def _check_data_files(rep):
    """Section 3: record counts from the MoFin DB, then presence/size of remaining JSON files."""
    rep.section("数据文件")
    try:
        from mo_data import read_portfolio, read_decisions, read_watchlist
        # Read everything before recording, so a failure part-way through is
        # not counted both as ok and as failed.
        db_counts = [
            ("portfolio.json", len(read_portfolio().get("holdings", []))),
            ("watchlist.json", len(read_watchlist().get("stocks", []))),
            ("decisions.json", len(read_decisions().get("decisions", []))),
        ]
    except Exception:
        rep.lines.append(check(False, "MoFin DB 数据读取失败"))
        rep.warn += 3  # one warning per DB-backed item that could not be checked
        rep.issues.append("MoFin DB 数据读取失败")
    else:
        for label, count in db_counts:
            rep.record(count > 0, f"{label} DB记录: {count}", f"{label} DB记录为空")

    # Files that are still JSON-only.
    files = {
        "market.json": DATA_DIR / "market.json",
        "price_events.json": EVENTS_PATH,
        "evaluation.json": EVALUATION_PATH,
        "accuracy_stats.json": ACCURACY_PATH,
    }
    for name, path in files.items():
        try:
            size = path.stat().st_size
            exists = True
        except OSError:
            size, exists = 0, False
        # Files of 10 bytes or fewer are treated as empty; same rule for display and counting.
        rep.record(size > 10, f"{name} {'存在' if exists else '缺失'} ({size}B)", f"{name} 缺失或为空")


def _load_price_events(today):
    """Return (all_events, today_events), from the DB when available, else from price_events.json."""
    try:
        from mofin_db import get_conn, query_price_events, query_price_events_by_date
        conn = get_conn()
        try:
            # NOTE(review): limit=50000 presumably caps the history count — confirm.
            return query_price_events(conn, limit=50000), query_price_events_by_date(conn, today)
        finally:
            conn.close()
    except Exception:
        events = load_json(EVENTS_PATH, {"events": []})
        ev_list = events.get("events", []) if isinstance(events, dict) else []
        return ev_list, [e for e in ev_list if isinstance(e, dict) and e.get("date") == today]


def _check_price_events(rep, today):
    """Section 4: total and today's price events."""
    rep.section("价格事件")
    ev_list, today_events = _load_price_events(today)
    rep.record(len(ev_list) > 0, f"历史事件: {len(ev_list)}", "price_events 无事件记录,price_monitor可能未触发过")
    # Displayed only, never counted as a warning.
    rep.lines.append(check(len(today_events) > 0, f"今日事件: {len(today_events)}"))


def _check_evaluations(rep):
    """Section 5: number of evaluated strategies and their mean score."""
    rep.section("策略评估")
    evals = load_json(EVALUATION_PATH, {"strategies": []})
    strategies = evals.get("strategies", []) if isinstance(evals, dict) else []
    rep.record(len(strategies) > 0, f"已评估策略: {len(strategies)}", "evaluation.json 无评估数据")
    if strategies:
        # A null score counts as 0 instead of raising TypeError.
        avg = sum((s.get("score") or 0) for s in strategies) / len(strategies)
        # Informational only; the count above already decided ok/warn.
        rep.lines.append(check(avg > 0, f"平均评分: {avg:.1f}/10"))


def _check_advice(rep):
    """Section 6: total advice_timeline entries across all decisions."""
    rep.section("建议记录")
    try:
        from mo_data import read_decisions
        decisions = read_decisions().get("decisions", [])
        total_advice = sum(len(d.get("advice_timeline") or []) for d in decisions)
    except Exception:
        total_advice = 0
    rep.record(total_advice > 0, f"建议记录: {total_advice}", "所有策略建议记录为空")


def _check_cron(rep):
    """Section 7: cron job registries; each job problem, and an empty registry, adds one warning."""
    rep.section("Cron Jobs")
    # NOTE(review): CRON_JOBS and POSITION_CRON currently name the same file,
    # so its jobs are counted twice — confirm the default profile's path.
    for path, profile, label in (
        (CRON_JOBS, "default", "default profile"),
        (POSITION_CRON, "position-analyst", "position-analyst"),
    ):
        count, cron_issues = check_cron_jobs(path, profile)
        rep.lines.append(check(count > 0, f"{label}: {count}个job"))
        for ci in cron_issues:
            rep.lines.append(f" ⚠️ {ci}")
        rep.warn += len(cron_issues) + (1 if count == 0 else 0)


def _check_freshness(rep, now):
    """Section 8: age of the latest DB updates and of JSON cache files against per-item hour thresholds."""
    rep.section("数据新鲜度")
    db_checks = {
        "portfolio (DB)": ("SELECT MAX(updated_at) FROM holdings", 24),
        "decisions (DB)": ("SELECT MAX(updated_at) FROM holding_strategies WHERE status IN ('active','updated')", 48),
    }
    try:
        from mofin_db import get_conn
        conn = get_conn()
        try:
            # NOTE(review): timestamps are compared with local datetime.now(); if updated_at is
            # stored in UTC (e.g. SQLite CURRENT_TIMESTAMP) ages are off by the UTC offset — confirm.
            for name, (sql, threshold) in db_checks.items():
                row = conn.execute(sql).fetchone()
                ts = row[0] if row and row[0] else None
                if not ts:
                    rep.record(False, f"{name} 无更新时间戳", f"{name} 无更新时间戳")
                    continue
                _record_freshness(rep, name, now, _parse_db_ts(ts), threshold)
        finally:
            conn.close()
    except Exception:
        # Counted as a warning so a broken DB can never produce a silent all-clear.
        rep.record(False, "DB 数据新鲜度检查失败", "DB 数据新鲜度检查失败")

    # JSON files not yet migrated to the DB -> max acceptable age in hours.
    freshness_thresholds = {
        "multi_tf_cache.json": 24,  # K-line cache, updated daily
        "macro_context.json": 24,  # macro data, collected twice a day
        "market.json": 48,  # sector data, updated daily
        "strategy_staleness_report.json": 24,  # staleness report, generated daily
    }
    for name, threshold in freshness_thresholds.items():
        try:
            mtime = datetime.fromtimestamp((DATA_DIR / name).stat().st_mtime)
        except OSError:
            rep.record(False, f"{name} 缺失", f"{name} 文件缺失")
            continue
        _record_freshness(rep, name, now, mtime, threshold)


def _check_intraday_prices(rep, now):
    """Warn unless active holdings in the DB were updated today."""
    try:
        from mofin_db import get_conn
        conn = get_conn()
        try:
            row = conn.execute("SELECT MAX(updated_at) FROM holdings WHERE is_active=1").fetchone()
        finally:
            conn.close()
        ts = row[0] if row and row[0] else None
        if not ts:
            rep.record(False, "盘中DB holdings无价格更新记录")
            return
        mtime = _parse_db_ts(ts)
        has_today = mtime.date() == now.date()
        rep.record(
            has_today,
            f"盘中有当日价格数据 (最近{mtime.strftime('%H:%M')})",
            f"盘中交易时段但DB holdings无今日数据(最近更新{mtime.strftime('%m-%d %H:%M')})",
        )
    except Exception:
        rep.record(False, "盘中DB价格数据检查失败")


def _check_pipeline(rep, now):
    """Pipeline section: collector processes, plus intraday price data during trading hours."""
    rep.section("数据管道")
    pipe_checks = [
        ("再生器(regenerate_all)", r"strategy_lifecycle\.py"),
        ("市场采集(market_watch)", r"market_watch\.py"),
        ("宏观采集(macro)", r"macro_context_collector\.py"),
    ]
    for pname, ppattern in pipe_checks:
        r = _run_cmd(["pgrep", "-f", ppattern])
        if r is not None and r.returncode == 0:
            rep.record(True, f"{pname} 进程存在")
        else:
            # no_agent scripts are cron-scheduled rather than resident: informational, not a warning.
            rep.lines.append(" 📎 {} 无常驻进程(no_agent脚本按cron调度运行)".format(pname))

    # Monday-Friday, 09:00-15:59 local time. Public holidays are not excluded.
    if now.weekday() < 5 and 9 <= now.hour < 16:
        _check_intraday_prices(rep, now)


def run():
    """Run every health check, print the report, and save it when anything needs attention.

    Sections: processes, ports, data files, price events, strategy
    evaluations, advice records, cron jobs, data freshness, data pipeline.
    If at least one warning was counted, the report is also written to
    ``/home/hmo/.hermes/profiles/position-analyst/cron/output/health/health_YYYYmmdd_HHMM.md``.
    Otherwise "[SILENT] 一切正常" is printed.
    """
    now = datetime.now()
    rep = _Report(f"MoFin 系统健康检查 | {now.strftime('%Y-%m-%d %H:%M')}")

    _check_processes(rep)
    _check_ports(rep)
    _check_data_files(rep)
    _check_price_events(rep, now.strftime("%Y-%m-%d"))
    _check_evaluations(rep)
    _check_advice(rep)
    _check_cron(rep)
    _check_freshness(rep, now)
    _check_pipeline(rep, now)

    total = rep.ok + rep.warn
    rep.lines.append("")
    rep.lines.append(f"总计: ✅ {rep.ok}/{total} 正常 | ⚠️ {rep.warn}/{total} 需关注")
    if rep.issues:
        rep.lines.append("")
        rep.lines.append("需关注项:")
        for i, issue in enumerate(rep.issues[:10], 1):
            rep.lines.append(f" {i}. {issue}")
        # Make truncation visible instead of silently dropping issues.
        if len(rep.issues) > 10:
            rep.lines.append(f" ... 另有 {len(rep.issues) - 10} 项未列出")
    report = "\n".join(rep.lines)
    print(report)

    # Only write a report file (for pushing) when something needs attention.
    if rep.warn > 0:
        report_dir = Path("/home/hmo/.hermes/profiles/position-analyst/cron/output/health")
        report_dir.mkdir(parents=True, exist_ok=True)
        report_file = report_dir / f"health_{now.strftime('%Y%m%d_%H%M')}.md"
        # The report contains CJK text and emoji; don't depend on the locale's default encoding.
        report_file.write_text(f"# MoFin 系统健康检查\n\n{report}", encoding="utf-8")
        print(f"\n报告已写入 {report_file}")
    else:
        print("\n[SILENT] 一切正常")
if __name__ == "__main__":
    # Run the health check only when executed as a script, not on import.
    run()