Files
MoFin/system_health_check.py
T
知微 4aa884ed31 TODO一口气4/6完成
T001: enrich_timing_signal RR参数降级
T002: price_monitor已有XMPP推送(514-525行)
T003: 自选扫描加15:00轮次
T005: 策略数据口径理解完成(非bug)
T006: trigger同步(已完成)
T004: 截图叙事清理待设计

system_health_check.py CRON_JOBS路径修正→正确的profile路径
2026-06-24 17:50:49 +08:00

268 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""system_health_check.py — MoFin system health check.

Runs daily and checks that every component is working properly.
Prints a report; a notification is pushed only when something is wrong.
"""
import json, os, sys, subprocess
from datetime import datetime, timedelta
from pathlib import Path
# Directory that holds the dashboard's JSON data files.
DATA_DIR = Path("/home/hmo/web-dashboard/data")
# Individual data files inspected by the health check.
DECISIONS_PATH = DATA_DIR / "decisions.json"
PORTFOLIO_PATH = DATA_DIR / "portfolio.json"
EVENTS_PATH = DATA_DIR / "price_events.json"
EVALUATION_PATH = DATA_DIR / "evaluation.json"
ACCURACY_PATH = DATA_DIR / "accuracy_stats.json"
# Cron job definition files passed to check_cron_jobs().
# NOTE(review): both constants point at the same position-analyst jobs.json,
# yet run() reports them under two labels ("default profile" and
# "position-analyst"), so every cron finding is listed twice — confirm whether
# CRON_JOBS should reference a different profile.
CRON_JOBS = "/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json"
POSITION_CRON = "/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json"
def check(ok, msg):
    """Build one report line: a status marker followed by *msg*.

    A passing check gets an empty marker, a failing one gets the warning
    sign. The line always starts with a single space and separates the
    marker from the message with another space.
    """
    if ok:
        marker = ""
    else:
        marker = "⚠️"
    return " {} {}".format(marker, msg)
def load_json(path, default=None):
    """Read and decode the JSON file at *path*, falling back on failure.

    Args:
        path: Filesystem path (str or Path) of the JSON document.
        default: Value returned when the file cannot be read or decoded.
            When it is None, an empty dict is returned instead.

    Returns:
        The decoded JSON value, or the fallback described above. Read and
        decode failures are swallowed on purpose so that a single broken
        file does not abort the caller.
    """
    try:
        # Be explicit about the encoding instead of relying on the locale:
        # the data files contain non-ASCII text.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: missing or unreadable file.
        # ValueError: json.JSONDecodeError and UnicodeDecodeError.
        # Anything else (KeyboardInterrupt, SystemExit, ...) propagates.
        return {} if default is None else default
def check_cron_jobs(path, label):
    """Summarise the jobs listed in a cron ``jobs.json`` file.

    Args:
        path: Location of the jobs file. It is read through load_json(), so
            a missing or unparsable file is treated as "no jobs" and yields
            ``(0, [])`` rather than an error.
        label: Profile label supplied by the caller. It is not used here and
            is kept only so existing call sites keep working.

    Returns:
        A ``(job_count, issues)`` tuple. ``issues`` holds one message per job
        that is disabled, has never run, or whose last status is not "ok".
        If the decoded document does not have the expected structure, the
        result is ``(0, ["无法读取"])``.
    """
    issues = []
    try:
        jobs = load_json(path, {"jobs": []}).get("jobs", [])
        for job in jobs:
            name = job.get("name", "?")
            enabled = job.get("enabled", True)
            last = job.get("last_run_at", "")
            status = job.get("last_status", "")
            if not enabled:
                issues.append(f"{name} 已禁用")
            elif not last:
                issues.append(f"{name} 从未运行")
            elif status != "ok":
                issues.append(f"{name} 上次状态={status}")
        return len(jobs), issues
    except Exception:
        # Best effort: a document with an unexpected shape (e.g. a top-level
        # list, or non-dict job entries) is reported as unreadable.
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        return 0, ["无法读取"]
def _run_probe(cmd):
    """Run *cmd* with a 5-second timeout; return None if it could not run."""
    try:
        return subprocess.run(cmd, capture_output=True, text=True, timeout=5)
    except (OSError, subprocess.TimeoutExpired):
        # Missing binary or a hung command must not abort the health check.
        return None


def run():
    """Run every health check, print the report and persist it on warnings.

    The checks cover, in order: service processes, listening ports, data
    file presence, price events, strategy evaluations, advice records, cron
    jobs, data freshness, pipeline processes and (on weekdays between 09:00
    and 16:00 local time) whether portfolio.json was modified today.

    Returns:
        None. The report is printed to stdout. When at least one warning was
        counted, the report is also written as a Markdown file into the
        position-analyst ``cron/output/health`` directory.
    """
    now = datetime.now()
    today = now.strftime("%Y-%m-%d")
    issues = []
    ok_count = 0
    warn_count = 0
    lines = [f"MoFin 系统健康检查 | {now.strftime('%Y-%m-%d %H:%M')}"]
    lines.append("")

    # 1. Processes
    lines.append("【进程】")
    procs = {
        "mofin-dashboard": "mofin-dashboard",
        "xmpp-zhiwei": "xmpp_zhiwei_bot",
        "ejabberd": "ejabberd",
    }
    for name, pattern in procs.items():
        # Ask systemd first, then fall back to a process-table search.
        unit = _run_probe(["systemctl", "is-active", f"{pattern}.service"])
        alive = unit is not None and unit.stdout.strip() == "active"
        if not alive:
            found = _run_probe(["pgrep", "-f", pattern])
            alive = found is not None and found.returncode == 0
        lines.append(check(alive, f"{name} {'运行中' if alive else '已停止'}"))
        if alive:
            ok_count += 1
        else:
            issues.append(f"{name} 进程不存在")
            warn_count += 1

    # 2. Ports
    lines.append("")
    lines.append("【端口】")
    ports = {"8899": "Dashboard", "5222": "ejabberd", "8643": "知微Gateway"}
    # The socket table is the same for every port, so query it only once.
    ss = _run_probe(["ss", "-tlnp"])
    socket_tokens = ss.stdout.split() if ss is not None else []
    for port, name in ports.items():
        # Match whole address tokens ending in ":<port>" so that e.g. port
        # 5222 is not reported as listening because of ":52220".
        suffix = f":{port}"
        listening = any(tok.endswith(suffix) for tok in socket_tokens)
        lines.append(check(listening, f"{name} :{port} {'监听中' if listening else '未监听'}"))
        if listening:
            ok_count += 1
        else:
            issues.append(f"{name} 端口{port}未监听")
            warn_count += 1

    # 3. Data files
    lines.append("")
    lines.append("【数据文件】")
    files = {
        "portfolio.json": PORTFOLIO_PATH,
        "watchlist.json": DATA_DIR / "watchlist.json",
        "decisions.json": DECISIONS_PATH,
        "market.json": DATA_DIR / "market.json",
        "price_events.json": EVENTS_PATH,
        "evaluation.json": EVALUATION_PATH,
        "accuracy_stats.json": ACCURACY_PATH,
    }
    for name, path in files.items():
        exists = path.exists()
        size = path.stat().st_size if exists else 0
        # One predicate drives both the displayed marker and the counters, so
        # a file of exactly 10 bytes is no longer shown as bad yet counted ok.
        healthy = exists and size > 10
        lines.append(check(healthy, f"{name} {'存在' if exists else '缺失'} ({size}B)"))
        if healthy:
            ok_count += 1
        else:
            issues.append(f"{name} 缺失或为空")
            warn_count += 1

    # 4. Price event statistics
    lines.append("")
    lines.append("【价格事件】")
    try:
        from mofin_db import get_conn, query_price_events, query_price_events_by_date
        conn = get_conn()
        try:
            ev_list = query_price_events(conn, limit=50000)
            today_events = query_price_events_by_date(conn, today)
        finally:
            # Release the connection even when a query fails.
            conn.close()
    except Exception:
        # Database layer unavailable or failing: fall back to the JSON file.
        events = load_json(EVENTS_PATH, {"events": []})
        ev_list = events.get("events", [])
        today_events = [e for e in ev_list if e.get("date") == today]
    lines.append(check(len(ev_list) > 0, f"历史事件: {len(ev_list)}"))
    lines.append(check(len(today_events) > 0, f"今日事件: {len(today_events)}"))
    if len(ev_list) == 0:
        issues.append("price_events 无事件记录,price_monitor可能未触发过")
        warn_count += 1
    else:
        ok_count += 1

    # 5. Strategy evaluation statistics
    lines.append("")
    lines.append("【策略评估】")
    evals = load_json(EVALUATION_PATH, {"strategies": []})
    s_list = evals.get("strategies", [])
    lines.append(check(len(s_list) > 0, f"已评估策略: {len(s_list)}"))
    if len(s_list) > 0:
        avg = sum(s.get("score", 0) for s in s_list) / len(s_list)
        lines.append(check(avg > 0, f"平均评分: {avg:.1f}/10"))
        ok_count += 1
    else:
        issues.append("evaluation.json 无评估数据")
        warn_count += 1

    # 6. Advice record statistics
    lines.append("")
    lines.append("【建议记录】")
    decisions = load_json(DECISIONS_PATH, {"decisions": []})
    total_advice = sum(len(d.get("advice_timeline", [])) for d in decisions.get("decisions", []))
    lines.append(check(total_advice > 0, f"建议记录: {total_advice}"))
    if total_advice == 0:
        issues.append("所有策略建议记录为空")
        warn_count += 1
    else:
        ok_count += 1

    # 7. Cron jobs (findings are listed inline and counted as warnings; they
    # are not added to the numbered "issues" summary).
    lines.append("")
    lines.append("【Cron Jobs】")
    cnt, cron_issues = check_cron_jobs(CRON_JOBS, "default")
    lines.append(check(cnt > 0, f"default profile: {cnt}个job"))
    for ci in cron_issues:
        lines.append(f" ⚠️ {ci}")
        warn_count += 1
    if cnt == 0:
        warn_count += 1
    cnt2, cron_issues2 = check_cron_jobs(POSITION_CRON, "position-analyst")
    lines.append(check(cnt2 > 0, f"position-analyst: {cnt2}个job"))
    for ci in cron_issues2:
        lines.append(f" ⚠️ {ci}")
        warn_count += 1
    if cnt2 == 0:
        warn_count += 1

    # 8. Data freshness
    lines.append("")
    lines.append("【数据新鲜度】")
    # Maximum acceptable age of each data file, in hours.
    freshness_thresholds = {
        "portfolio.json": 24,  # having data once a day is enough
        "decisions.json": 48,  # strategy parameters change infrequently
        "multi_tf_cache.json": 24,  # candlestick cache, refreshed daily
        "macro_context.json": 24,  # macro data, collected twice a day
        "market.json": 48,  # sector data, refreshed daily
        "strategy_staleness_report.json": 24,  # staleness report, generated daily
    }
    data_files = {
        "portfolio.json": PORTFOLIO_PATH,
        "decisions.json": DECISIONS_PATH,
        "multi_tf_cache.json": DATA_DIR / "multi_tf_cache.json",
        "macro_context.json": DATA_DIR / "macro_context.json",
        "market.json": DATA_DIR / "market.json",
        "strategy_staleness_report.json": DATA_DIR / "strategy_staleness_report.json",
    }
    for name, path in data_files.items():
        if not path.exists():
            lines.append(check(False, f"{name} 缺失"))
            issues.append(f"{name} 文件缺失")
            warn_count += 1
            continue
        mtime = datetime.fromtimestamp(path.stat().st_mtime)
        hours_ago = (now - mtime).total_seconds() / 3600
        threshold = freshness_thresholds.get(name, 24)
        fresh = hours_ago < threshold
        time_str = f"{hours_ago:.0f}h前" if hours_ago >= 1 else f"{hours_ago*60:.0f}分钟前"
        lines.append(check(fresh, f"{name} 更新于 {time_str} (阈值{threshold}h)"))
        if not fresh:
            # The closing parenthesis was missing from this message.
            issues.append(f"{name} 超过{threshold}h未更新(最近更新:{time_str})")
            warn_count += 1
        else:
            ok_count += 1

    # Data pipeline components
    lines.append("")
    lines.append("【数据管道】")
    pipe_checks = [
        ("再生器(regenerate_all)", r"strategy_lifecycle\.py"),
        ("市场采集(market_watch)", r"market_watch\.py"),
        ("宏观采集(macro)", r"macro_context_collector\.py"),
    ]
    for pname, ppattern in pipe_checks:
        found = _run_probe(["pgrep", "-f", ppattern])
        if found is not None and found.returncode == 0:
            lines.append(check(True, f"{pname} 进程存在"))
            ok_count += 1
        else:
            # These scripts are cron-driven rather than resident, so an
            # absent process is informational only and not a warning.
            lines.append(" 📎 {} 无常驻进程(no_agent脚本按cron调度运行)".format(pname))

    # Intraday check: during the session portfolio.json should carry a
    # modification time from today.
    is_trading_day = now.weekday() < 5  # Monday to Friday
    if is_trading_day and 9 <= now.hour < 16:
        if PORTFOLIO_PATH.exists():
            mtime = datetime.fromtimestamp(PORTFOLIO_PATH.stat().st_mtime)
            has_intraday_data = mtime.date() == now.date()
            lines.append(check(has_intraday_data, f"盘中有当日价格数据 (最近{mtime.strftime('%H:%M')})"))
            if not has_intraday_data:
                # The closing parenthesis was missing from this message.
                issues.append(f"盘中交易时段但portfolio.json无今日数据(最近更新{mtime.strftime('%m-%d %H:%M')})")
                warn_count += 1
            else:
                ok_count += 1

    # Summary
    total = ok_count + warn_count
    lines.append("")
    lines.append(f"总计: ✅ {ok_count}/{total} 正常 | ⚠️ {warn_count}/{total} 需关注")
    if issues:
        lines.append("")
        lines.append("需关注项:")
        # Only the first ten issues are listed.
        for i, issue in enumerate(issues[:10], 1):
            lines.append(f" {i}. {issue}")
    report = "\n".join(lines)
    print(report)

    # Persist the report only when something needs attention, so that the
    # push step has a file to pick up.
    if warn_count > 0:
        report_path = Path("/home/hmo/.hermes/profiles/position-analyst/cron/output/health")
        report_path.mkdir(parents=True, exist_ok=True)
        report_file = report_path / f"health_{now.strftime('%Y%m%d_%H%M')}.md"
        # The report contains non-ASCII text; do not depend on the locale.
        report_file.write_text(f"# MoFin 系统健康检查\n\n{report}", encoding="utf-8")
        print(f"\n报告已写入 {report_file}")
    else:
        print("\n[SILENT] 一切正常")
# Script entry point: run the health check when the file is executed directly.
if __name__ == "__main__":
    run()