4aa884ed31
T001: enrich_timing_signal RR参数降级 T002: price_monitor已有XMPP推送(514-525行) T003: 自选扫描加15:00轮次 T005: 策略数据口径理解完成(非bug) T006: trigger同步(已完成) T004: 截图叙事清理待设计 system_health_check.py CRON_JOBS路径修正→正确的profile路径
268 lines
10 KiB
Python
268 lines
10 KiB
Python
#!/usr/bin/env python3
|
||
"""system_health_check.py — MoFin 系统健康检查
|
||
|
||
每日运行,检查所有组件是否正常工作。
|
||
输出报告,有问题才推送。
|
||
"""
|
||
import json, os, sys, subprocess
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
|
||
# Directory holding the dashboard's JSON data files.
DATA_DIR = Path("/home/hmo/web-dashboard/data")

# Individual data files checked by the health report.
DECISIONS_PATH = DATA_DIR.joinpath("decisions.json")
PORTFOLIO_PATH = DATA_DIR.joinpath("portfolio.json")
EVENTS_PATH = DATA_DIR.joinpath("price_events.json")
EVALUATION_PATH = DATA_DIR.joinpath("evaluation.json")
ACCURACY_PATH = DATA_DIR.joinpath("accuracy_stats.json")

# Cron job definition files (plain strings, not Path objects).
# NOTE(review): both constants currently point at the same
# position-analyst profile file — confirm this is intended.
CRON_JOBS = "/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json"
POSITION_CRON = "/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json"
|
||
|
||
def check(ok, msg):
    """Format one report line: a status icon followed by the message.

    Args:
        ok: Truthy for a passing check, falsy for one needing attention.
        msg: Text to show after the icon.

    Returns:
        The formatted line as a string.
    """
    if ok:
        icon = "✅"
    else:
        icon = "⚠️"
    return " {} {}".format(icon, msg)
|
||
|
||
def load_json(path, default=None):
    """Load a JSON file, falling back to a default when it cannot be read.

    Args:
        path: Path of the JSON file to read.
        default: Value returned when the file is missing, unreadable, or
            not valid JSON. ``None`` means "return an empty dict".

    Returns:
        The parsed JSON value, or the fallback described above.
    """
    try:
        # Read as UTF-8 explicitly so the result does not depend on the
        # locale of the environment the script is launched from.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError, TypeError):
        # OSError: missing/unreadable file. ValueError: malformed JSON or
        # undecodable bytes. TypeError: an unusable path such as None.
        return {} if default is None else default
|
||
|
||
def check_cron_jobs(path, label):
    """Inspect a cron jobs file and list the jobs that need attention.

    A job is reported when it is disabled, has never run, or its last
    status is anything other than ``"ok"``.

    Args:
        path: Path of the jobs JSON file; expected to hold a ``"jobs"`` list.
        label: Profile label. Currently unused; kept so existing call
            sites keep working.

    Returns:
        A ``(job_count, issues)`` tuple. When the file content does not have
        the expected structure the result is ``(0, ["无法读取"])``. A missing
        or unparsable file yields ``(0, [])`` because ``load_json`` falls
        back to an empty job list.
    """
    issues = []
    try:
        jobs = load_json(path, {"jobs": []}).get("jobs", [])
        for job in jobs:
            name = job.get("name", "?")
            status = job.get("last_status", "")
            if not job.get("enabled", True):
                issues.append(f"{name} 已禁用")
            elif not job.get("last_run_at", ""):
                issues.append(f"{name} 从未运行")
            elif status != "ok":
                issues.append(f"{name} 上次状态={status}")
        return len(jobs), issues
    except (AttributeError, TypeError):
        # The JSON parsed but is not shaped like {"jobs": [ {...}, ... ]}.
        return 0, ["无法读取"]
|
||
|
||
def _run_cmd(args, timeout=5):
    """Run an external command; return its CompletedProcess or None.

    A missing binary or a timeout yields None instead of raising, so one
    unavailable tool cannot abort the whole health check.
    """
    try:
        return subprocess.run(args, capture_output=True, text=True, timeout=timeout)
    except (OSError, subprocess.TimeoutExpired):
        return None


def run():
    """Run every health check, print the report, and persist it on warnings.

    Sections, in order: processes, listening ports, data files, price
    events, strategy evaluation, advice records, cron jobs, data freshness,
    data pipeline processes, and (on weekdays between 09:00 and 16:00) an
    intraday price-data check.

    The report is always printed. It is additionally written to a
    timestamped Markdown file when at least one warning was counted.
    """
    now = datetime.now()
    today_str = now.strftime("%Y-%m-%d")
    issues = []
    ok_count = 0
    warn_count = 0

    lines = [f"MoFin 系统健康检查 | {now.strftime('%Y-%m-%d %H:%M')}"]
    lines.append("")

    # 1. Process checks
    lines.append("【进程】")
    procs = {
        "mofin-dashboard": "mofin-dashboard",
        "xmpp-zhiwei": "xmpp_zhiwei_bot",
        "ejabberd": "ejabberd",
    }
    for name, pattern in procs.items():
        # Ask systemd first, then fall back to a pgrep on the command line.
        unit = _run_cmd(["systemctl", "is-active", f"{pattern}.service"])
        alive = unit is not None and unit.stdout.strip() == "active"
        if not alive:
            found = _run_cmd(["pgrep", "-f", pattern])
            alive = found is not None and found.returncode == 0
        lines.append(check(alive, f"{name} {'运行中' if alive else '已停止'}"))
        if alive:
            ok_count += 1
        else:
            issues.append(f"{name} 进程不存在")
            warn_count += 1

    # 2. Port checks
    lines.append("")
    lines.append("【端口】")
    ports = {"8899": "Dashboard", "5222": "ejabberd", "8643": "知微Gateway"}
    # Take one snapshot of the listening sockets instead of one per port.
    ss_result = _run_cmd(["ss", "-tlnp"])
    ss_tokens = ss_result.stdout.split() if ss_result is not None else []
    for port, name in ports.items():
        # Match whole "addr:port" tokens so ":5222" does not match ":52220".
        suffix = f":{port}"
        listening = any(tok.endswith(suffix) for tok in ss_tokens)
        lines.append(check(listening, f"{name} :{port} {'监听中' if listening else '未监听'}"))
        if listening:
            ok_count += 1
        else:
            issues.append(f"{name} 端口{port}未监听")
            warn_count += 1

    # 3. Data file checks
    lines.append("")
    lines.append("【数据文件】")
    files = {
        "portfolio.json": PORTFOLIO_PATH,
        "watchlist.json": DATA_DIR / "watchlist.json",
        "decisions.json": DECISIONS_PATH,
        "market.json": DATA_DIR / "market.json",
        "price_events.json": EVENTS_PATH,
        "evaluation.json": EVALUATION_PATH,
        "accuracy_stats.json": ACCURACY_PATH,
    }
    for name, path in files.items():
        exists = path.exists()
        size = path.stat().st_size if exists else 0
        # One criterion drives both the icon and the counters.
        healthy = exists and size > 10
        lines.append(check(healthy, f"{name} {'存在' if exists else '缺失'} ({size}B)"))
        if healthy:
            ok_count += 1
        else:
            issues.append(f"{name} 缺失或为空")
            warn_count += 1

    # 4. Price event statistics
    lines.append("")
    lines.append("【价格事件】")
    try:
        from mofin_db import get_conn, query_price_events, query_price_events_by_date
        conn = get_conn()
        try:
            ev_list = query_price_events(conn, limit=50000)
            today_events = query_price_events_by_date(conn, today_str)
        finally:
            # Release the connection even when a query fails.
            conn.close()
    except Exception:
        # Best-effort fallback: the database path failed for any reason,
        # so read the events from the JSON file instead.
        events = load_json(EVENTS_PATH, {"events": []})
        ev_list = events.get("events", [])
        today_events = [e for e in ev_list if e.get("date") == today_str]
    lines.append(check(len(ev_list) > 0, f"历史事件: {len(ev_list)}条"))
    lines.append(check(len(today_events) > 0, f"今日事件: {len(today_events)}条"))
    if len(ev_list) == 0:
        issues.append("price_events 无事件记录,price_monitor可能未触发过")
        warn_count += 1
    else:
        ok_count += 1

    # 5. Strategy evaluation statistics
    lines.append("")
    lines.append("【策略评估】")
    evals = load_json(EVALUATION_PATH, {"strategies": []})
    s_list = evals.get("strategies", [])
    lines.append(check(len(s_list) > 0, f"已评估策略: {len(s_list)}条"))
    if len(s_list) > 0:
        avg = sum(s.get("score", 0) for s in s_list) / len(s_list)
        lines.append(check(avg > 0, f"平均评分: {avg:.1f}/10"))
        ok_count += 1
    else:
        issues.append("evaluation.json 无评估数据")
        warn_count += 1

    # 6. Advice record statistics
    lines.append("")
    lines.append("【建议记录】")
    decisions = load_json(DECISIONS_PATH, {"decisions": []})
    total_advice = sum(len(d.get("advice_timeline", [])) for d in decisions.get("decisions", []))
    lines.append(check(total_advice > 0, f"建议记录: {total_advice}条"))
    if total_advice == 0:
        issues.append("所有策略建议记录为空")
        warn_count += 1
    else:
        ok_count += 1

    # 7. Cron jobs
    lines.append("")
    lines.append("【Cron Jobs】")
    cron_profiles = [
        (CRON_JOBS, "default", "default profile"),
        (POSITION_CRON, "position-analyst", "position-analyst"),
    ]
    for cron_path, label, title in cron_profiles:
        job_count, cron_issues = check_cron_jobs(cron_path, label)
        lines.append(check(job_count > 0, f"{title}: {job_count}个job"))
        # Cron problems are shown inline and counted as warnings, but are
        # not added to the numbered summary list.
        for ci in cron_issues:
            lines.append(f" ⚠️ {ci}")
            warn_count += 1
        if job_count == 0:
            warn_count += 1

    # 8. Data freshness
    lines.append("")
    lines.append("【数据新鲜度】")
    # Maximum acceptable age of each data file, in hours.
    freshness_thresholds = {
        "portfolio.json": 24,  # having data each day is enough
        "decisions.json": 48,  # strategy parameters change less often
        "multi_tf_cache.json": 24,  # candlestick cache refreshed daily
        "macro_context.json": 24,  # macro data collected twice a day
        "market.json": 48,  # sector data refreshed daily
        "strategy_staleness_report.json": 24,  # staleness report generated daily
    }
    data_files = {
        "portfolio.json": PORTFOLIO_PATH,
        "decisions.json": DECISIONS_PATH,
        "multi_tf_cache.json": DATA_DIR / "multi_tf_cache.json",
        "macro_context.json": DATA_DIR / "macro_context.json",
        "market.json": DATA_DIR / "market.json",
        "strategy_staleness_report.json": DATA_DIR / "strategy_staleness_report.json",
    }
    for name, path in data_files.items():
        if not path.exists():
            lines.append(check(False, f"{name} 缺失"))
            issues.append(f"{name} 文件缺失")
            warn_count += 1
            continue
        mtime = datetime.fromtimestamp(path.stat().st_mtime)
        hours_ago = (now - mtime).total_seconds() / 3600
        threshold = freshness_thresholds.get(name, 24)
        fresh = hours_ago < threshold
        time_str = f"{hours_ago:.0f}h前" if hours_ago >= 1 else f"{hours_ago*60:.0f}分钟前"
        lines.append(check(fresh, f"{name} 更新于 {time_str} (阈值{threshold}h)"))
        if not fresh:
            issues.append(f"{name} 超过{threshold}h未更新(最近更新:{time_str})")
            warn_count += 1
        else:
            ok_count += 1

    # Data pipeline component checks
    lines.append("")
    lines.append("【数据管道】")
    pipe_checks = [
        ("再生器(regenerate_all)", r"strategy_lifecycle\.py"),
        ("市场采集(market_watch)", r"market_watch\.py"),
        ("宏观采集(macro)", r"macro_context_collector\.py"),
    ]
    for pname, ppattern in pipe_checks:
        probe = _run_cmd(["pgrep", "-f", ppattern])
        if probe is not None and probe.returncode == 0:
            lines.append(check(True, f"{pname} 进程存在"))
            ok_count += 1
        else:
            # These scripts are not resident processes, so absence is
            # reported as information only and is not counted as a warning.
            lines.append(" 📎 {} 无常驻进程(no_agent脚本按cron调度运行)".format(pname))

    # Intraday price data check: during the session the portfolio file
    # should have been modified today.
    is_trading_day = now.weekday() < 5  # Monday to Friday
    if is_trading_day and 9 <= now.hour < 16:
        if PORTFOLIO_PATH.exists():
            mtime = datetime.fromtimestamp(PORTFOLIO_PATH.stat().st_mtime)
            has_intraday_data = mtime.date() == now.date()
            lines.append(check(has_intraday_data, f"盘中有当日价格数据 {'是' if has_intraday_data else '否'}(最近{mtime.strftime('%H:%M')})"))
            if not has_intraday_data:
                issues.append(f"盘中交易时段但portfolio.json无今日数据(最近更新{mtime.strftime('%m-%d %H:%M')})")
                warn_count += 1
            else:
                ok_count += 1

    # Summary
    total = ok_count + warn_count
    lines.append("")
    lines.append(f"总计: ✅ {ok_count}/{total} 正常 | ⚠️ {warn_count}/{total} 需关注")
    if issues:
        lines.append("")
        lines.append("需关注项:")
        # Only the first ten issues are listed to keep the report short.
        for i, issue in enumerate(issues[:10], 1):
            lines.append(f" {i}. {issue}")

    report = "\n".join(lines)
    print(report)

    # When something needs attention, write the report to a file for pushing.
    if warn_count > 0:
        report_path = Path("/home/hmo/.hermes/profiles/position-analyst/cron/output/health")
        report_path.mkdir(parents=True, exist_ok=True)
        report_file = report_path / f"health_{now.strftime('%Y%m%d_%H%M')}.md"
        # The report contains non-ASCII text; do not rely on the locale.
        report_file.write_text(f"# MoFin 系统健康检查\n\n{report}", encoding="utf-8")
        print(f"\n报告已写入 {report_file}")
    else:
        print("\n[SILENT] 一切正常")
|
||
|
||
|
||
# Script entry point: run the health check only when executed directly.
if __name__ == "__main__":
    run()
|