#!/usr/bin/env python3
"""system_health_check.py — MoFin system health check.

Runs daily and checks that every component is working.
Prints a report; a report file is written (for pushing) only when at least
one warning was raised.
"""
import json
import os
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path

DATA_DIR = Path("/home/hmo/web-dashboard/data")
DECISIONS_PATH = DATA_DIR / "decisions.json"
PORTFOLIO_PATH = DATA_DIR / "portfolio.json"
EVENTS_PATH = DATA_DIR / "price_events.json"
EVALUATION_PATH = DATA_DIR / "evaluation.json"
ACCURACY_PATH = DATA_DIR / "accuracy_stats.json"
CRON_JOBS = "/home/hmo/.hermes/cron/jobs.json"
POSITION_CRON = "/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json"


def check(ok, msg):
    """Return *msg* formatted as a report line, prefixed with a status icon."""
    icon = "✅" if ok else "⚠️"
    return f" {icon} {msg}"


def load_json(path, default=None):
    """Return the parsed JSON stored at *path*.

    If the file cannot be opened or does not contain valid JSON, return
    *default* instead (an empty dict when *default* is None).
    """
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return {} if default is None else default


def check_cron_jobs(path, label):
    """Inspect the cron job list stored at *path*.

    Returns a ``(job_count, issues)`` tuple. A job produces an issue when it
    is disabled, has never run, or its last status is not ``"ok"``. When the
    file content does not have the expected ``{"jobs": [ {...}, ... ]}``
    shape, ``(0, ["无法读取"])`` is returned. *label* is accepted for the
    caller's convenience and is not used here.
    """
    issues = []
    d = load_json(path, {"jobs": []})
    try:
        jobs = d.get("jobs", [])
        for j in jobs:
            name = j.get("name", "?")
            enabled = j.get("enabled", True)
            last = j.get("last_run_at", "")
            status = j.get("last_status", "")
            if not enabled:
                issues.append(f"{name} 已禁用")
            elif not last:
                issues.append(f"{name} 从未运行")
            elif status != "ok":
                issues.append(f"{name} 上次状态={status}")
        return len(jobs), issues
    except (AttributeError, TypeError):
        # Top-level value is not a dict, "jobs" is not iterable, or an
        # entry is not a dict.
        return 0, ["无法读取"]


def _run_cmd(args, timeout=5):
    """Run *args* and return ``(returncode, stdout)``.

    Returns ``(None, "")`` when the command cannot be started or times out,
    so that a missing tool is reported as a failed check instead of
    aborting the whole health check.
    """
    try:
        r = subprocess.run(args, capture_output=True, text=True, timeout=timeout)
    except (OSError, subprocess.TimeoutExpired):
        return None, ""
    return r.returncode, r.stdout


def _pgrep(pattern):
    """Return True if ``pgrep -f`` finds a process matching *pattern*."""
    code, _ = _run_cmd(["pgrep", "-f", pattern])
    return code == 0


def _is_process_alive(pattern):
    """Return True if ``<pattern>.service`` is active or pgrep matches."""
    # Ask systemd first, then fall back to pgrep.
    _, out = _run_cmd(["systemctl", "is-active", f"{pattern}.service"])
    return out.strip() == "active" or _pgrep(pattern)


def _parse_listening_ports(ss_output):
    """Return the set of local port strings found in ``ss -tln`` output."""
    ports = set()
    for row in ss_output.splitlines():
        # The local address is the first column that contains a colon; the
        # peer address and process columns come after it. Comparing the
        # exact port avoids ":5222" matching ":52220".
        local = next((tok for tok in row.split() if ":" in tok), None)
        if local is None:
            continue
        port = local.rsplit(":", 1)[1]
        if port.isdigit():  # skips the "Address:Port" header token
            ports.add(port)
    return ports


def _load_dict(path, default):
    """Like load_json, but fall back to *default* for non-dict content."""
    data = load_json(path, default)
    return data if isinstance(data, dict) else default


def run():
    """Run every health check, print the report, and persist it on warnings.

    The report is always printed to stdout. When at least one warning was
    counted, it is additionally written to a timestamped Markdown file in
    the position-analyst cron output directory.
    """
    now = datetime.now()
    today = now.strftime("%Y-%m-%d")
    issues = []
    ok_count = 0
    warn_count = 0
    lines = [f"MoFin 系统健康检查 | {now.strftime('%Y-%m-%d %H:%M')}"]
    lines.append("")

    def record(ok, line_msg, issue_msg):
        """Append a status line and update the counters / issue list."""
        nonlocal ok_count, warn_count
        lines.append(check(ok, line_msg))
        if ok:
            ok_count += 1
        else:
            issues.append(issue_msg)
            warn_count += 1

    # 1. Processes
    lines.append("【进程】")
    procs = {
        "mofin-dashboard": "mofin-dashboard",
        "xmpp-zhiwei": "xmpp_zhiwei_bot",
        "ejabberd": "ejabberd",
    }
    for name, pattern in procs.items():
        alive = _is_process_alive(pattern)
        record(
            alive,
            f"{name} {'运行中' if alive else '已停止'}",
            f"{name} 进程不存在",
        )

    # 2. Ports
    lines.append("")
    lines.append("【端口】")
    ports = {"8899": "Dashboard", "5222": "ejabberd", "8643": "知微Gateway"}
    # Query the socket table once instead of once per port.
    _, ss_output = _run_cmd(["ss", "-tlnp"])
    open_ports = _parse_listening_ports(ss_output)
    for port, name in ports.items():
        listening = port in open_ports
        record(
            listening,
            f"{name} :{port} {'监听中' if listening else '未监听'}",
            f"{name} 端口{port}未监听",
        )

    # 3. Data files
    lines.append("")
    lines.append("【数据文件】")
    files = {
        "portfolio.json": PORTFOLIO_PATH,
        "watchlist.json": DATA_DIR / "watchlist.json",
        "decisions.json": DECISIONS_PATH,
        "market.json": DATA_DIR / "market.json",
        "price_events.json": EVENTS_PATH,
        "evaluation.json": EVALUATION_PATH,
        "accuracy_stats.json": ACCURACY_PATH,
    }
    for name, path in files.items():
        exists = path.exists()
        size = path.stat().st_size if exists else 0
        # One condition drives both the icon and the counters, so a file of
        # exactly 10 bytes is no longer shown as a warning yet counted OK.
        record(
            exists and size > 10,
            f"{name} {'存在' if exists else '缺失'} ({size}B)",
            f"{name} 缺失或为空",
        )

    # 4. Price event statistics
    lines.append("")
    lines.append("【价格事件】")
    try:
        from mofin_db import get_conn, query_price_events, query_price_events_by_date

        conn = get_conn()
        try:
            ev_list = query_price_events(conn, limit=50000)
            today_events = query_price_events_by_date(conn, today)
        finally:
            # Close the connection even when a query fails.
            conn.close()
    except Exception:
        # Database unavailable: fall back to the JSON snapshot.
        events = _load_dict(EVENTS_PATH, {"events": []})
        ev_list = events.get("events", [])
        today_events = [e for e in ev_list if e.get("date") == today]
    record(
        len(ev_list) > 0,
        f"历史事件: {len(ev_list)}条",
        "price_events 无事件记录,price_monitor可能未触发过",
    )
    # Today's events are shown for information only and are not counted.
    lines.append(check(len(today_events) > 0, f"今日事件: {len(today_events)}条"))

    # 5. Strategy evaluation statistics
    lines.append("")
    lines.append("【策略评估】")
    evals = _load_dict(EVALUATION_PATH, {"strategies": []})
    s_list = evals.get("strategies", [])
    record(
        len(s_list) > 0,
        f"已评估策略: {len(s_list)}条",
        "evaluation.json 无评估数据",
    )
    if len(s_list) > 0:
        avg = sum(s.get("score", 0) for s in s_list) / len(s_list)
        lines.append(check(avg > 0, f"平均评分: {avg:.1f}/10"))

    # 6. Advice record statistics
    lines.append("")
    lines.append("【建议记录】")
    decisions = _load_dict(DECISIONS_PATH, {"decisions": []})
    total_advice = sum(
        len(d.get("advice_timeline", [])) for d in decisions.get("decisions", [])
    )
    record(total_advice > 0, f"建议记录: {total_advice}条", "所有策略建议记录为空")

    # 7. Cron jobs
    lines.append("")
    lines.append("【Cron Jobs】")
    cron_profiles = (
        (CRON_JOBS, "default", "default profile"),
        (POSITION_CRON, "position-analyst", "position-analyst"),
    )
    for cron_path, profile, title in cron_profiles:
        cnt, cron_issues = check_cron_jobs(cron_path, profile)
        lines.append(check(cnt > 0, f"{title}: {cnt}个job"))
        # Cron problems raise the warning count but are listed inline only;
        # they are not added to the summary issue list.
        for ci in cron_issues:
            lines.append(f" ⚠️ {ci}")
            warn_count += 1
        if cnt == 0:
            warn_count += 1

    # 8. Data freshness
    lines.append("")
    lines.append("【数据新鲜度】")
    # Per file: (path, maximum acceptable age in hours).
    freshness_targets = {
        "portfolio.json": (PORTFOLIO_PATH, 24),  # daily data is enough
        "decisions.json": (DECISIONS_PATH, 48),  # strategy params change rarely
        "multi_tf_cache.json": (DATA_DIR / "multi_tf_cache.json", 24),  # K-line cache, daily
        "macro_context.json": (DATA_DIR / "macro_context.json", 24),  # macro data, twice a day
        "market.json": (DATA_DIR / "market.json", 48),  # sector data, daily
        "strategy_staleness_report.json": (
            DATA_DIR / "strategy_staleness_report.json",
            24,  # staleness report, generated daily
        ),
    }
    for name, (path, threshold) in freshness_targets.items():
        if not path.exists():
            record(False, f"{name} 缺失", f"{name} 文件缺失")
            continue
        mtime = datetime.fromtimestamp(path.stat().st_mtime)
        hours_ago = (now - mtime).total_seconds() / 3600
        if hours_ago >= 1:
            time_str = f"{hours_ago:.0f}h前"
        else:
            time_str = f"{hours_ago*60:.0f}分钟前"
        record(
            hours_ago < threshold,
            f"{name} 更新于 {time_str} (阈值{threshold}h)",
            f"{name} 超过{threshold}h未更新(最近更新:{time_str})",
        )

    # Data pipeline components
    lines.append("")
    lines.append("【数据管道】")
    pipe_checks = [
        ("再生器(regenerate_all)", r"strategy_lifecycle\.py"),
        ("市场采集(market_watch)", r"market_watch\.py"),
        ("宏观采集(macro)", r"macro_context_collector\.py"),
    ]
    for pname, ppattern in pipe_checks:
        if _pgrep(ppattern):
            lines.append(check(True, f"{pname} 进程存在"))
            ok_count += 1
        else:
            # no_agent scripts are not resident processes, so no warning.
            lines.append(" 📎 {} 无常驻进程(no_agent脚本按cron调度运行)".format(pname))

    # Price data update time (same-day data is expected during the session)
    is_trading_day = now.weekday() < 5  # Monday to Friday
    if is_trading_day and 9 <= now.hour < 16 and PORTFOLIO_PATH.exists():
        mtime = datetime.fromtimestamp(PORTFOLIO_PATH.stat().st_mtime)
        has_intraday_data = mtime.date() == now.date()
        record(
            has_intraday_data,
            f"盘中有当日价格数据 {'是' if has_intraday_data else '否'}(最近{mtime.strftime('%H:%M')})",
            f"盘中交易时段但portfolio.json无今日数据(最近更新{mtime.strftime('%m-%d %H:%M')})",
        )

    # Summary
    total = ok_count + warn_count
    lines.append("")
    lines.append(f"总计: ✅ {ok_count}/{total} 正常 | ⚠️ {warn_count}/{total} 需关注")
    if issues:
        lines.append("")
        lines.append("需关注项:")
        for i, issue in enumerate(issues[:10], 1):
            lines.append(f" {i}. {issue}")

    report = "\n".join(lines)
    print(report)

    # When there are problems, write the report to a file for pushing.
    if warn_count > 0:
        report_path = Path("/home/hmo/.hermes/profiles/position-analyst/cron/output/health")
        report_path.mkdir(parents=True, exist_ok=True)
        report_file = report_path / f"health_{now.strftime('%Y%m%d_%H%M')}.md"
        # Explicit UTF-8: the report contains Chinese text and emoji.
        report_file.write_text(f"# MoFin 系统健康检查\n\n{report}", encoding="utf-8")
        print(f"\n报告已写入 {report_file}")
    else:
        print("\n[SILENT] 一切正常")


if __name__ == "__main__":
    run()