加回 .gitignore

This commit is contained in:
知微
2026-06-20 12:43:24 +08:00
parent 33df400c01
commit 66a8f24e17
11 changed files with 4119 additions and 0 deletions
+260
View File
@@ -0,0 +1,260 @@
#!/usr/bin/env python3
"""system_health_check.py — MoFin 系统健康检查
每日运行,检查所有组件是否正常工作。
输出报告,有问题才推送。
"""
import json, os, sys, subprocess
from datetime import datetime, timedelta
from pathlib import Path
# Directory containing the JSON data files inspected by this script.
DATA_DIR = Path("/home/hmo/web-dashboard/data")
# Individual data files that are both checked for presence and parsed.
DECISIONS_PATH = DATA_DIR / "decisions.json"
PORTFOLIO_PATH = DATA_DIR / "portfolio.json"
EVENTS_PATH = DATA_DIR / "price_events.json"
EVALUATION_PATH = DATA_DIR / "evaluation.json"
ACCURACY_PATH = DATA_DIR / "accuracy_stats.json"
# Cron job definition files (plain string paths, not Path objects):
# one for the default profile and one for the position-analyst profile.
CRON_JOBS = "/home/hmo/.hermes/cron/jobs.json"
POSITION_CRON = "/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json"
def check(ok, msg):
    """Format one report line for a single check result.

    Args:
        ok: Truthy when the checked item is healthy.
        msg: Text describing the checked item.

    Returns:
        ``msg`` preceded by a warning sign when ``ok`` is falsy, or by an
        empty marker (two spaces in total) when ``ok`` is truthy.
    """
    if ok:
        # The "healthy" marker is an empty string, leaving two spaces.
        return f"  {msg}"
    return f" ⚠️ {msg}"
def load_json(path, default=None):
    """Read and parse a JSON file, falling back to a default on failure.

    Args:
        path: Location of the JSON file (anything ``open`` accepts).
        default: Value returned when the file cannot be read or parsed.
            ``None`` means "return a fresh empty dict".

    Returns:
        The decoded JSON value, or the fallback when the file is missing,
        unreadable, or does not contain valid JSON.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file.  ValueError: covers both
        # json.JSONDecodeError and UnicodeDecodeError.  Anything else
        # (KeyboardInterrupt, programming errors) is deliberately not hidden.
        return {} if default is None else default
def check_cron_jobs(path, label):
    """Summarise the state of the cron jobs listed in a jobs file.

    Args:
        path: Path of a JSON file read through ``load_json``; jobs are taken
            from its top-level ``"jobs"`` list.
        label: Profile name.  Accepted for call-site readability; it is not
            used by the function body.

    Returns:
        A ``(job_count, issues)`` tuple.  ``issues`` holds one message per
        job that is disabled, has never run, or whose last status is not
        ``"ok"``.  When the loaded data does not have the expected
        dict/list-of-dicts shape the result is ``(0, ["无法读取"])``.
    """
    try:
        data = load_json(path, {"jobs": []})
        jobs = data.get("jobs", [])
        issues = []
        for job in jobs:
            name = job.get("name", "?")
            # A job with no "enabled" key is treated as enabled.
            if not job.get("enabled", True):
                issues.append(f"{name} 已禁用")
            elif not job.get("last_run_at", ""):
                issues.append(f"{name} 从未运行")
            else:
                status = job.get("last_status", "")
                if status != "ok":
                    issues.append(f"{name} 上次状态={status}")
        return len(jobs), issues
    except (AttributeError, TypeError):
        # Raised when the file holds something other than
        # {"jobs": [ {...}, ... ]} (e.g. a list at top level, a null
        # "jobs" value, or non-dict entries).
        return 0, ["无法读取"]
def _run_command(argv, text=False):
    """Run *argv* with captured output; return None if it cannot be run.

    A missing executable, a permission problem or a timeout must not abort
    the whole health check, so those failures are reported as ``None``.
    """
    try:
        return subprocess.run(argv, capture_output=True, text=text, timeout=5)
    except (OSError, subprocess.TimeoutExpired):
        return None


def _listening_tcp_ports():
    """Return the set of port strings that ``ss -tlnp`` lists as listening."""
    result = _run_command(["ss", "-tlnp"], text=True)
    if result is None:
        return set()
    found = set()
    for row in result.stdout.splitlines():
        for field in row.split():
            # Address columns look like "0.0.0.0:8899", "[::]:5222" or "*:80".
            # Peer columns of listening sockets end in ":*" and the process
            # column ends in "))", so only local ports end in ":<digits>".
            _, sep, port = field.rpartition(":")
            if sep and port.isdigit():
                found.add(port)
    return found


def run():
    """Run every health check, print the report, and persist it on warnings.

    Checks are performed in this order: service processes, listening TCP
    ports, data file presence, price events, strategy evaluations, advice
    records, cron job state, data freshness, pipeline scripts and, on
    weekdays between 09:00 and 15:59 local time, whether portfolio.json was
    modified today.

    The report is always printed to stdout.  When at least one warning was
    counted it is additionally written as a Markdown file under the
    position-analyst cron output directory (so it can be pushed); otherwise
    a ``[SILENT]`` marker is printed.
    """
    now = datetime.now()
    issues = []
    ok_count = 0
    warn_count = 0
    lines = [f"MoFin 系统健康检查 | {now.strftime('%Y-%m-%d %H:%M')}"]
    lines.append("")

    # 1. Processes: ask systemd first, then fall back to pgrep.
    lines.append("【进程】")
    procs = {
        "mofin-dashboard": "mofin-dashboard",
        "xmpp-zhiwei": "xmpp_zhiwei_bot",
        "ejabberd": "ejabberd",
    }
    for name, pattern in procs.items():
        unit = _run_command(
            ["systemctl", "is-active", f"{pattern}.service"], text=True
        )
        alive = unit is not None and unit.stdout.strip() == "active"
        if not alive:
            found = _run_command(["pgrep", "-f", pattern])
            alive = found is not None and found.returncode == 0
        lines.append(check(alive, f"{name} {'运行中' if alive else '已停止'}"))
        if alive:
            ok_count += 1
        else:
            issues.append(f"{name} 进程不存在")
            warn_count += 1

    # 2. Ports: query ss once and compare exact port numbers, so that
    # e.g. ":5222" is not mistaken for ":52220".
    lines.append("")
    lines.append("【端口】")
    ports = {"8899": "Dashboard", "5222": "ejabberd", "8643": "知微Gateway"}
    open_ports = _listening_tcp_ports()
    for port, name in ports.items():
        listening = port in open_ports
        lines.append(
            check(listening, f"{name} :{port} {'监听中' if listening else '未监听'}")
        )
        if listening:
            ok_count += 1
        else:
            issues.append(f"{name} 端口{port}未监听")
            warn_count += 1

    # 3. Data files must exist and be larger than 10 bytes.
    lines.append("")
    lines.append("【数据文件】")
    files = {
        "portfolio.json": PORTFOLIO_PATH,
        "watchlist.json": DATA_DIR / "watchlist.json",
        "decisions.json": DECISIONS_PATH,
        "market.json": DATA_DIR / "market.json",
        "price_events.json": EVENTS_PATH,
        "evaluation.json": EVALUATION_PATH,
        "accuracy_stats.json": ACCURACY_PATH,
    }
    for name, path in files.items():
        exists = path.exists()
        size = path.stat().st_size if exists else 0
        # One predicate drives both the icon and the counters, so the
        # printed line and the summary can never disagree.
        healthy = exists and size > 10
        lines.append(check(healthy, f"{name} {'存在' if exists else '缺失'} ({size}B)"))
        if healthy:
            ok_count += 1
        else:
            issues.append(f"{name} 缺失或为空")
            warn_count += 1

    # 4. Price event statistics.
    lines.append("")
    lines.append("【价格事件】")
    events = load_json(EVENTS_PATH, {"events": []})
    ev_list = events.get("events", [])
    today = now.strftime("%Y-%m-%d")
    today_events = [e for e in ev_list if e.get("date") == today]
    lines.append(check(len(ev_list) > 0, f"历史事件: {len(ev_list)}"))
    lines.append(check(len(today_events) > 0, f"今日事件: {len(today_events)}"))
    if len(ev_list) == 0:
        issues.append("price_events.json 无事件记录,price_monitor可能未触发过")
        warn_count += 1
    else:
        ok_count += 1

    # 5. Strategy evaluation statistics.
    lines.append("")
    lines.append("【策略评估】")
    evals = load_json(EVALUATION_PATH, {"strategies": []})
    s_list = evals.get("strategies", [])
    lines.append(check(len(s_list) > 0, f"已评估策略: {len(s_list)}"))
    if len(s_list) > 0:
        avg = sum(s.get("score", 0) for s in s_list) / len(s_list)
        lines.append(check(avg > 0, f"平均评分: {avg:.1f}/10"))
        ok_count += 1
    else:
        issues.append("evaluation.json 无评估数据")
        warn_count += 1

    # 6. Advice record statistics.
    lines.append("")
    lines.append("【建议记录】")
    decisions = load_json(DECISIONS_PATH, {"decisions": []})
    total_advice = sum(
        len(d.get("advice_timeline", [])) for d in decisions.get("decisions", [])
    )
    lines.append(check(total_advice > 0, f"建议记录: {total_advice}"))
    if total_advice == 0:
        issues.append("所有策略建议记录为空")
        warn_count += 1
    else:
        ok_count += 1

    # 7. Cron jobs, one pass per profile.
    # NOTE(review): this section only raises warn_count; it never adds to
    # `issues` or ok_count, so cron problems appear in the report body but
    # not in the numbered summary list. Kept as-is; confirm if intended.
    lines.append("")
    lines.append("【Cron Jobs】")
    cron_profiles = (
        (CRON_JOBS, "default", "default profile"),
        (POSITION_CRON, "position-analyst", "position-analyst"),
    )
    for jobs_path, label, title in cron_profiles:
        job_count, cron_issues = check_cron_jobs(jobs_path, label)
        lines.append(check(job_count > 0, f"{title}: {job_count}个job"))
        for cron_issue in cron_issues:
            lines.append(f" ⚠️ {cron_issue}")
            warn_count += 1
        if job_count == 0:
            warn_count += 1

    # 8. Data freshness: (name, path, maximum acceptable age in hours).
    lines.append("")
    lines.append("【数据新鲜度】")
    freshness_checks = (
        # Having data once a day is enough.
        ("portfolio.json", PORTFOLIO_PATH, 24),
        # Strategy parameters are updated less frequently.
        ("decisions.json", DECISIONS_PATH, 48),
        # K-line cache is updated daily.
        ("multi_tf_cache.json", DATA_DIR / "multi_tf_cache.json", 24),
        # Macro data is collected twice a day.
        ("macro_context.json", DATA_DIR / "macro_context.json", 24),
        # Industry data is updated daily.
        ("market.json", DATA_DIR / "market.json", 48),
        # Staleness report is generated daily.
        (
            "strategy_staleness_report.json",
            DATA_DIR / "strategy_staleness_report.json",
            24,
        ),
    )
    for name, path, threshold in freshness_checks:
        if not path.exists():
            lines.append(check(False, f"{name} 缺失"))
            issues.append(f"{name} 文件缺失")
            warn_count += 1
            continue
        mtime = datetime.fromtimestamp(path.stat().st_mtime)
        hours_ago = (now - mtime).total_seconds() / 3600
        fresh = hours_ago < threshold
        if hours_ago >= 1:
            time_str = f"{hours_ago:.0f}h前"
        else:
            time_str = f"{hours_ago*60:.0f}分钟前"
        lines.append(check(fresh, f"{name} 更新于 {time_str} (阈值{threshold}h)"))
        if fresh:
            ok_count += 1
        else:
            # Closing parenthesis added; the original message was unbalanced.
            issues.append(f"{name} 超过{threshold}h未更新(最近更新:{time_str})")
            warn_count += 1

    # Data pipeline components.
    lines.append("")
    lines.append("【数据管道】")
    pipe_checks = [
        ("再生器(regenerate_all)", r"strategy_lifecycle\.py"),
        ("市场采集(market_watch)", r"market_watch\.py"),
        ("宏观采集(macro)", r"macro_context_collector\.py"),
    ]
    for pname, ppattern in pipe_checks:
        found = _run_command(["pgrep", "-f", ppattern])
        if found is not None and found.returncode == 0:
            lines.append(check(True, f"{pname} 进程存在"))
            ok_count += 1
        else:
            # These scripts are cron-driven rather than resident, so a
            # missing process is informational and not counted as a warning.
            lines.append(f" 📎 {pname} 无常驻进程(no_agent脚本按cron调度运行)")

    # Intraday check: during weekday trading hours portfolio.json should
    # have been modified today.
    is_trading_day = now.weekday() < 5  # Monday to Friday
    if is_trading_day and 9 <= now.hour < 16 and PORTFOLIO_PATH.exists():
        mtime = datetime.fromtimestamp(PORTFOLIO_PATH.stat().st_mtime)
        has_intraday_data = mtime.date() == now.date()
        lines.append(
            check(
                has_intraday_data,
                f"盘中有当日价格数据 (最近{mtime.strftime('%H:%M')})",
            )
        )
        if has_intraday_data:
            ok_count += 1
        else:
            # Closing parenthesis added; the original message was unbalanced.
            issues.append(
                f"盘中交易时段但portfolio.json无今日数据(最近更新{mtime.strftime('%m-%d %H:%M')})"
            )
            warn_count += 1

    # Summary.
    total = ok_count + warn_count
    lines.append("")
    lines.append(f"总计: ✅ {ok_count}/{total} 正常 | ⚠️ {warn_count}/{total} 需关注")
    if issues:
        lines.append("")
        lines.append("需关注项:")
        # Only the first ten issues are listed to keep the summary short.
        for i, issue in enumerate(issues[:10], 1):
            lines.append(f" {i}. {issue}")
    report = "\n".join(lines)
    print(report)

    # When something needs attention, write a report file to be pushed.
    if warn_count > 0:
        report_path = Path("/home/hmo/.hermes/profiles/position-analyst/cron/output/health")
        report_path.mkdir(parents=True, exist_ok=True)
        report_file = report_path / f"health_{now.strftime('%Y%m%d_%H%M')}.md"
        # The report contains CJK text and emoji; force UTF-8 so writing
        # does not depend on the process locale.
        report_file.write_text(f"# MoFin 系统健康检查\n\n{report}", encoding="utf-8")
        print(f"\n报告已写入 {report_file}")
    else:
        print("\n[SILENT] 一切正常")
# Script entry point: perform the health check only when executed directly,
# not when the module is imported.
if __name__ == "__main__":
    run()