Files
MoFin/scripts/morning_health_check.py
T
知微 a24505ebef feat: 系统常规体检机制
- 新增 morning_health_check.py:分层分类全面体检脚本
- 新增 health_checklist.json:可动态维护的检查清单(自动发现新组件)
- 新增 docs/morning-health-check.md:完整设计文档
- 新增 skill: morning-health-check
- 修复:xiaoguo_scanner.py 同步到profile脚本目录
- 修复:system_audit.py 从symlink改为硬拷贝(解决脚本路径越界)
- 修复:morning_health_check.py 使用jobs.json而非cron.db(更可靠)

自动检查7层43项:
基础设施/SENSE/RESPOND/ADAPT/IMPROVE/数据文件/管道完整性
每天8:00开盘前自动跑一次
2026-06-24 20:09:10 +08:00

526 lines
20 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""morning_health_check.py — MoFin 系统常规体检
每日开盘前(8:00)运行,全面扫描MoFin所有组件健康状况。
输出格式化的体检报告,有问题才出声,没问题静默。
核心设计:
1. 从 health_checklist.json 读检查清单
2. 逐项检查,记录状态
3. 报告异常项(只推异常,不推正常)
4. 自动发现新增cron/脚本(通过 self_discovery 函数)
5. 维护检查历史 (health_check_history.json)
新增组件自动发现机制:
- 对比当前cron list与checklist中记录的cron id
- 发现新cron → 自动追加到checklist
- 脚本修改 → 标记"需复核"
用法:
python3 scripts/morning_health_check.py [--report] [--update-checklist]
--report: 强制输出完整报告(默认只输出异常)
--update-checklist: 运行自动发现并更新checklist
no_agent模式:只输出异常项,无异常完全静默
"""
import json, os, sqlite3, subprocess, sys, time, urllib.request
from pathlib import Path
from datetime import datetime, timedelta
# ── Paths ──
BASE = Path("/home/hmo/MoFin")
DATA = BASE.joinpath("data")
SCRIPTS_DIR = BASE.joinpath("scripts")
PROFILE_SCRIPTS = Path("/home/hmo/.hermes/profiles/position-analyst/scripts")
CHECKLIST_PATH = DATA.joinpath("health_checklist.json")
HISTORY_PATH = DATA.joinpath("health_check_history.json")
DB_PATH = DATA.joinpath("mofin.db")
HERMES_CRON_DIR = Path("/home/hmo/.hermes/profiles/position-analyst/cron")
# Anomaly cache (meant to keep the same issue from being pushed again within 24h).
KNOWN_ISSUES_PATH = DATA.joinpath("health_known_issues.json")

# ── Run context ──
# Mutable state shared by all checks of a single run: the collected report
# entries, one counter per result level, and the moment the run started.
ctx = dict(
    report=[],
    issues=[],
    ok_count=0,
    warn_count=0,
    error_count=0,
    critical_count=0,
    started_at=datetime.now(),
)
def log(level, category, msg, detail=None):
    """Append one check result to the run report and bump its level counter.

    "critical", "error" and "warn" each have their own counter in ``ctx``;
    every other level is counted as ok.
    """
    entry = dict(
        level=level,
        category=category,
        msg=msg,
        detail=detail,
        timestamp=datetime.now().isoformat(),
    )
    ctx["report"].append(entry)

    counter_by_level = {
        "critical": "critical_count",
        "error": "error_count",
        "warn": "warn_count",
    }
    ctx[counter_by_level.get(level, "ok_count")] += 1
def emit(msg, level="ok"):
    """Format one report line: the marker for *level*, a space, then *msg*.

    Levels without a marker (including unknown ones) get an empty marker, so
    the line then starts with the separating space.
    """
    markers = {"critical": "🔴", "error": "", "warn": "⚠️", "ok": "", "info": "📎"}
    marker = markers[level] if level in markers else ""
    return "{} {}".format(marker, msg)
# ── 检查器集合 ──
def check_systemctl(service_name):
    """Report whether ``systemctl is-active`` prints "active" for a unit.

    Returns ``(ok, detail)``: *detail* is the printed state, or
    ``error:<exception>`` when the command could not be run.
    """
    command = ["systemctl", "is-active", service_name]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=5)
    except Exception as exc:
        return False, f"error:{exc}"
    state = completed.stdout.strip()
    return state == "active", state
def check_port(port):
    """Check whether a TCP socket is listening on exactly *port*.

    Runs ``ss -tlnp`` and compares the port of each socket's local address.
    A plain substring test on the whole output would report port 80 as
    listening whenever 8080 or 8000 is, so the port is matched exactly.

    Args:
        port: Port number as int or str.

    Returns:
        ``(ok, detail)`` with detail ``"listening"``, ``"not_found"`` or
        ``error:<exception>`` when ``ss`` could not be run.
    """
    wanted = str(port).strip()
    try:
        r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
    except Exception as e:
        return False, f"error:{e}"
    if not wanted:
        return False, "not_found"
    for line in r.stdout.splitlines():
        # In ss's usual layout the columns before the local address (state and
        # queue sizes) contain no colon, so the first token with one is the
        # local "address:port" pair; the header's "Address:Port" never matches
        # a numeric port.
        local = next((tok for tok in line.split() if ":" in tok), None)
        if local is not None and local.rpartition(":")[2] == wanted:
            return True, "listening"
    return False, "not_found"
def check_process(pattern):
    """Check whether any process command line matches *pattern* (``pgrep -f``).

    Returns:
        ``(ok, detail)`` with detail ``"running"``, ``"not_found"`` or
        ``"check_error"`` when ``pgrep`` could not be run.
    """
    try:
        r = subprocess.run(["pgrep", "-f", pattern], capture_output=True, timeout=5)
    except Exception:
        # Only ordinary failures (missing binary, timeout) count as a check
        # error; Ctrl-C and SystemExit must still propagate.
        return False, "check_error"
    found = r.returncode == 0
    return found, "running" if found else "not_found"
def check_http(url, timeout=15):
    """Check whether an HTTP endpoint answers a GET request, bypassing proxies.

    The request goes through an opener built with an empty ``ProxyHandler``
    mapping, so no proxy is used and ``os.environ`` is never modified.
    Temporarily deleting the proxy variables instead would leave them deleted
    for the rest of the run whenever the request fails.

    Args:
        url: Full URL to fetch.
        timeout: Socket timeout in seconds.

    Returns:
        ``(True, "<status code>")`` on success, otherwise
        ``(False, <first 60 characters of the error>)``.
    """
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        req = urllib.request.Request(url, method="GET")
        # The context manager closes the response (and its socket) promptly.
        with opener.open(req, timeout=timeout) as resp:
            return True, str(resp.status)
    except Exception as e:
        return False, str(e)[:60]
def check_disk(mount):
    """Check that the filesystem holding *mount* is less than 90% full.

    Uses ``shutil.disk_usage`` rather than parsing ``df -h`` text, so the
    result does not depend on the tool's column layout or on an external
    binary being available.

    Args:
        mount: Any path on the filesystem to inspect.

    Returns:
        ``(ok, "<pct>% used")``; ``(False, "check_error")`` when the path
        cannot be inspected; ``(False, "parse_error")`` when the filesystem
        reports no capacity.
    """
    import shutil  # local import: only this check needs it

    try:
        usage = shutil.disk_usage(mount)
    except (OSError, TypeError, ValueError):
        return False, "check_error"
    # Percentage relative to used + space available to this process, rounded
    # up — intended to mirror the Use% figure df prints.
    capacity = usage.used + usage.free
    if capacity <= 0:
        return False, "parse_error"
    pct = -(-usage.used * 100 // capacity)  # integer ceiling division
    return pct < 90, f"{pct}% used"
def check_file_exists(path):
    """Report whether *path* exists.

    Returns ``(True, "<size>B")`` with the size in bytes, or
    ``(False, "missing")``.
    """
    target = Path(path)
    if not target.exists():
        return False, "missing"
    size_bytes = target.stat().st_size
    return True, f"{size_bytes}B"
def check_file_freshness(path, max_hours):
    """Report whether *path* was modified less than *max_hours* hours ago.

    The age is measured from the run's start time (``ctx["started_at"]``) to
    the file's modification time. Returns ``(False, "missing")`` when the
    file does not exist.
    """
    target = Path(path)
    if not target.exists():
        return False, "missing"
    modified_at = datetime.fromtimestamp(target.stat().st_mtime)
    age_seconds = (ctx["started_at"] - modified_at).total_seconds()
    hours_ago = age_seconds / 3600
    is_fresh = hours_ago < max_hours
    return is_fresh, f"{hours_ago:.0f}h ago (threshold {max_hours}h)"
def check_db_table_count(table, field, value, op="today", threshold=0):
    """Count rows of *table* in the MoFin SQLite database and judge the count.

    Args:
        table: Table name; interpolated into the SQL text as-is (the callers
            in this file pass checklist values or literals).
        field: Column compared with *value* when ``op == "count"``; unused by
            the other modes.
        value: Value bound to the ``field = ?`` placeholder for ``"count"``.
        op: ``"today"`` counts rows whose date column equals the run date;
            ``"unprocessed"`` counts the backlog; ``"count"`` counts rows,
            filtered by *field* when given; anything else counts all rows.
        threshold: Minimum count for a pass, or, for ``"unprocessed"``, the
            exclusive upper bound on the backlog.

    Returns:
        ``(ok, detail)``. This check is best effort: a missing database, a
        table without a recognised date column, or any SQL error is reported
        as a pass with an explanatory detail rather than as a failure.
    """
    # sqlite3.connect() creates an empty database file when none exists; a
    # health check must not leave such a file behind.
    if not DB_PATH.exists():
        return True, "skip(db_missing)"
    conn = None
    try:
        conn = sqlite3.connect(str(DB_PATH))
        cur = conn.cursor()
        if op == "today":
            today = ctx["started_at"].strftime("%Y-%m-%d")
            # Look at the table's columns first to find a usable date column.
            cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
            candidates = ("created_at", "date", "timestamp", "last_scanned_at", "signal_date")
            date_col = next((c for c in candidates if c in cols), None)
            if not date_col:
                return True, f"no_date_col_in_{table}"
            cur.execute(f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,))
        elif op == "unprocessed":
            cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
            if "processed" in cols:
                sql = f"SELECT COUNT(*) FROM {table} WHERE (processed = 0 OR processed IS NULL)"
            elif "source" in cols:
                sql = f"SELECT COUNT(*) FROM {table} WHERE source LIKE '%xiaoguo%'"
            else:
                sql = f"SELECT COUNT(*) FROM {table}"
            cur.execute(sql)
        elif op == "count" and field:
            cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {field} = ?", (value,))
        else:
            cur.execute(f"SELECT COUNT(*) FROM {table}")
        count = cur.fetchone()[0]
        if op == "unprocessed":
            return count < threshold, f"{count} unprocessed"
        return count >= threshold, f"{count} rows"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
    finally:
        # Runs on every exit path, including early returns and errors, so the
        # connection is never leaked.
        if conn is not None:
            conn.close()
def check_cron(job_id):
    """Check the state of one cron job as recorded in jobs.json.

    A job matches when its ``id`` equals *job_id*, or when it has no id but
    has a name and *job_id* occurs in the job's string representation.

    Returns:
        ``(ok, detail)``:
        - ``(False, "disabled")`` when the job is not enabled;
        - ``(False, "status=<s>")`` when its last status is not "ok";
        - ``(True, "ok(stale:<n>h)")`` when the last run is over 48h old;
        - ``(True, "ok")`` otherwise;
        - ``(True, "not_in_jobs_json")`` when no job matches;
        - ``(False, "no_jobs_json")`` when the file is missing;
        - ``(False, "check_error:<msg>")`` on any other error.
    """
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return False, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        for job in data.get("jobs", []):
            matches = job.get("id") == job_id or (
                not job.get("id") and job.get("name") and job_id in str(job)
            )
            if not matches:
                continue
            if not job.get("enabled", True):
                return False, "disabled"
            last_status = job.get("last_status")
            if last_status and last_status != "ok":
                return False, f"status={last_status}"
            last_run = job.get("last_run_at", "")
            if last_run:
                try:
                    stamp = str(last_run)
                    if stamp.endswith("Z"):
                        # Older fromisoformat() versions reject the "Z" suffix.
                        stamp = stamp[:-1] + "+00:00"
                    last_dt = datetime.fromisoformat(stamp)
                    if last_dt.tzinfo is not None:
                        # started_at is naive local time; subtracting an aware
                        # datetime from it raises TypeError, so convert to
                        # local naive time first.
                        last_dt = last_dt.astimezone().replace(tzinfo=None)
                    hours_ago = (ctx["started_at"] - last_dt).total_seconds() / 3600
                    if hours_ago > 48:
                        return True, f"ok(stale:{hours_ago:.0f}h)"
                except (TypeError, ValueError, OverflowError, OSError):
                    pass  # unreadable timestamp: staleness cannot be judged
            return True, "ok"
        # job_id not found — possibly a cron that does not need checking.
        return True, "not_in_jobs_json"
    except Exception as e:
        return False, f"check_error:{str(e)[:60]}"
def check_cron_errors_last24h():
    """Check whether any cron job ended with a non-"ok" status in the last 24h.

    The window is measured back from the run's start time. Timestamps are
    parsed and compared as datetimes (offset-aware values are converted to
    local time); a string that cannot be parsed falls back to a plain ISO
    string comparison.

    Returns:
        ``(False, "<n> errors: <name>(<status>); ...")`` listing at most five
        jobs, ``(True, "0 errors")`` when none failed,
        ``(True, "no_jobs_json")`` when the file is missing, or
        ``(True, "skip(<msg>)")`` when the file cannot be read.
    """
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return True, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        # Whole seconds only, so a run exactly 24h ago still counts.
        cutoff = (ctx["started_at"] - timedelta(hours=24)).replace(microsecond=0)
        errors = []
        for job in data.get("jobs", []):
            last_status = job.get("last_status")
            last_run = job.get("last_run_at", "")
            if not (last_status and last_status != "ok" and last_run):
                continue
            if not isinstance(last_run, str):
                continue  # not an ISO string: cannot be placed in the window
            try:
                stamp = last_run[:-1] + "+00:00" if last_run.endswith("Z") else last_run
                last_dt = datetime.fromisoformat(stamp)
                if last_dt.tzinfo is not None:
                    # Compare in the same naive local time as started_at.
                    last_dt = last_dt.astimezone().replace(tzinfo=None)
                recent = last_dt >= cutoff
            except (ValueError, OverflowError, OSError):
                # Unparseable: fall back to lexicographic ISO comparison.
                recent = last_run >= cutoff.isoformat()[:19]
            if recent:
                errors.append(f"{job.get('name','?')}({last_status})")
        if errors:
            return False, f"{len(errors)} errors: {'; '.join(errors[:5])}"
        return True, "0 errors"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_cron_paused():
    """Check that the cron jobs expected to run have not been disabled.

    Only jobs present in jobs.json are examined; an expected id that is
    absent from the file is not reported. Returns ``(False, "paused: ...")``
    naming the disabled jobs, ``(True, "all_expected_running")`` otherwise,
    and a passing ``no_jobs_json`` / ``skip(...)`` result when the file is
    missing or unreadable.
    """
    expected_active = (
        ("3a9fb3300a6a", "价格监控"),
        ("0851c7838ca3", "小果扫描"),
        ("e13323928f3a", "自选提醒"),
        ("b809fcabfa5b", "分支评估"),
    )
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return True, "no_jobs_json"
        payload = json.loads(jobs_file.read_text())
        jobs_by_id = {}
        for entry in payload.get("jobs", []):
            jobs_by_id[entry.get("id")] = entry
        paused_names = []
        for cron_id, label in expected_active:
            entry = jobs_by_id.get(cron_id)
            if not entry:
                continue
            if entry.get("enabled", True):
                continue
            paused_names.append(label)
        if not paused_names:
            return True, "all_expected_running"
        return False, f"paused: {', '.join(paused_names)}"
    except Exception as exc:
        return True, f"skip({str(exc)[:60]})"
def check_delivery_targets():
    """Look for cron jobs whose last delivery error mentions "delivery".

    A job counts as an issue when its ``last_delivery_error`` is non-empty
    and contains the word "delivery" (case-insensitive). Returns
    ``(False, "<n> issues: ...")`` naming at most three jobs,
    ``(True, "all_ok")`` otherwise, and a passing ``no_jobs_json`` /
    ``skip(...)`` result when the file is missing or unreadable.
    """
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return True, "no_jobs_json"
        payload = json.loads(jobs_file.read_text())
        failing = []
        for entry in payload.get("jobs", []):
            err_text = entry.get("last_delivery_error", "")
            if not err_text:
                continue
            if "delivery" not in err_text.lower():
                continue
            failing.append(f"{entry.get('name','?')}")
        if not failing:
            return True, "all_ok"
        return False, f"{len(failing)} issues: {', '.join(failing[:3])}"
    except Exception as exc:
        return True, f"skip({str(exc)[:60]})"
# ── 自动发现 ──
def self_discovery():
    """Discover cron jobs missing from the checklist and append checks for them.

    Every job in jobs.json that has an id and a script, and whose id is not
    yet referenced by a ``cron:<id>`` checklist item, is reported and
    appended to the category whose id is "pipeline". The checklist file is
    rewritten only when at least one item was actually appended.

    Returns:
        A list of human-readable strings, one per discovered job (or a single
        ``cron_discovery_error: ...`` entry when discovery failed).
    """
    discovered = []
    # 1. Discover newly added cron jobs.
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if cron_jobs_path.exists():
            data = json.loads(cron_jobs_path.read_text())
            checklist = json.loads(CHECKLIST_PATH.read_text())
            categories = checklist["categories"]

            # Cron ids the checklist already covers.
            known_cron_ids = set()
            for cat in categories:
                for item in cat["items"]:
                    check = item.get("check", "")
                    if check.startswith("cron:"):
                        known_cron_ids.add(check.split(":")[1])

            pipeline = next((cat for cat in categories if cat["id"] == "pipeline"), None)
            changed = False
            for job in data.get("jobs", []):
                jid, name = job.get("id"), job.get("name")
                if not (jid and jid not in known_cron_ids and job.get("script")):
                    continue
                discovered.append(f"新cron: {name}({jid})")
                # Remember the id so a job listed twice is only added once.
                known_cron_ids.add(jid)
                if pipeline is not None:
                    pipeline["items"].append({
                        "id": f"cron-auto-{jid[:8]}",
                        "description": f"{name} cron 已调度",
                        "check": f"cron:{jid}",
                        "expected": "enabled+ok",
                        "severity": "medium",
                        "auto_discovered": True
                    })
                    changed = True
            if changed:
                CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
    except Exception as e:
        discovered.append(f"cron_discovery_error: {e}")
    return discovered
# ── 主流程 ──
def run_check(item):
    """Run one checklist item and classify the outcome.

    The item's ``check`` string selects the checker by prefix
    (``systemctl:``, ``port:``, ``proc:``, ``http:``, ``disk:``,
    ``fileexists:``, ``filefresh:<path>:<hours>``,
    ``db:<table>:<field>:<value>:<op>:<threshold>``, ``cron:<id>``) or by
    exact name for the composite checks.

    A malformed spec (for example ``filefresh:`` without hours, or a
    non-numeric db threshold) is reported as a failed check with a
    ``check_error:`` detail instead of aborting the whole run.

    Args:
        item: Checklist entry; ``check`` is required, ``severity`` is read
            only when the check fails.

    Returns:
        ``(ok, level, detail)`` where level is "ok" on success, otherwise
        "critical" (severity critical), "error" (severity high/error) or
        "warn" (any other or missing severity).
    """
    check_spec = item["check"]
    # Everything after the first colon, e.g. the service name or URL.
    arg = check_spec.partition(":")[2]
    try:
        if check_spec.startswith("systemctl:"):
            ok, detail = check_systemctl(arg)
        elif check_spec.startswith("port:"):
            ok, detail = check_port(arg)
        elif check_spec.startswith("proc:"):
            ok, detail = check_process(arg)
        elif check_spec.startswith("http:"):
            ok, detail = check_http(arg)
        elif check_spec.startswith("disk:"):
            ok, detail = check_disk(arg)
        elif check_spec.startswith("fileexists:"):
            ok, detail = check_file_exists(arg)
        elif check_spec.startswith("filefresh:"):
            # filefresh:path:max_hours — split on the LAST colon so a colon
            # inside the path does not break parsing.
            path, sep, hours = arg.rpartition(":")
            if not sep or not path:
                raise ValueError("filefresh spec needs path:max_hours")
            max_hours = float(hours.replace("h", ""))
            ok, detail = check_file_freshness(path, max_hours)
        elif check_spec.startswith("db:"):
            # db:table:field:value:op:threshold
            parts = check_spec.split(":", 5)
            table = parts[1]
            field = parts[2] if len(parts) > 2 else None
            value = parts[3] if len(parts) > 3 else None
            op = parts[4] if len(parts) > 4 else "today"
            threshold = int(parts[5]) if len(parts) > 5 else 0
            ok, detail = check_db_table_count(table, field, value, op, threshold)
        elif check_spec.startswith("cron:"):
            ok, detail = check_cron(arg)
        elif check_spec == "cron_errors:last24h":
            ok, detail = check_cron_errors_last24h()
        elif check_spec == "cron_paused:check":
            ok, detail = check_cron_paused()
        elif check_spec == "delivery:origin_targets":
            ok, detail = check_delivery_targets()
        elif check_spec == "pipeline:xiaoguo_signal_flow":
            # Composite check: xiaoguo produced data and it is being processed.
            # NOTE(review): with threshold 0 the "today" result is always True
            # (count >= 0, and skips also pass), so this check can never fail
            # as written — confirm the intended threshold and and/or logic.
            today_xiaoguo, d1 = check_db_table_count("signal_news", "created_at", None, "today", 0)
            unproc, d2 = check_db_table_count("signal_news", None, None, "unprocessed", 30)
            ok = today_xiaoguo or unproc
            detail = f"today_xiaoguo={d1}, unprocessed={d2}"
        else:
            ok = False
            detail = f"unknown_check:{check_spec}"
    except Exception as e:
        # Per-item boundary: one bad checklist entry must not stop the rest.
        ok = False
        detail = f"check_error:{str(e)[:60]}"

    if ok:
        return ok, "ok", detail
    # Map the item's severity onto the report levels.
    severity = item.get("severity")
    if severity == "critical":
        level = "critical"
    elif severity in ("high", "error"):
        level = "error"
    else:
        level = "warn"
    return ok, level, detail
def _discover_if_due(checklist, force):
    """Run self_discovery() when forced or when the last scan is over 1h old.

    Returns ``(checklist, discovered)``. After discovery the checklist is
    re-read from disk, because self_discovery() may have appended items to
    the file; writing back the caller's earlier copy would erase them.
    """
    if not force:
        meta = checklist.get("meta")
        last_scan = meta.get("last_full_scan") if isinstance(meta, dict) else None
        if last_scan:
            try:
                age_s = (ctx["started_at"] - datetime.fromisoformat(last_scan)).total_seconds()
            except (TypeError, ValueError):
                age_s = None  # unreadable timestamp: treat the scan as due
            if age_s is not None and age_s <= 3600:
                return checklist, []

    discovered = self_discovery()
    try:
        checklist = json.loads(CHECKLIST_PATH.read_text())
    except (OSError, ValueError):
        pass  # keep the copy already in memory

    if not force:
        meta = checklist.get("meta")
        if not isinstance(meta, dict):
            meta = checklist["meta"] = {}
        meta["last_full_scan"] = ctx["started_at"].isoformat()
        CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
    return checklist, discovered


def _append_history(entry):
    """Append *entry* to the history file, keeping the latest 90 entries.

    Best effort: a failure is reported on stderr and never stops the run,
    and an unreadable history file is left untouched.
    """
    try:
        history = json.loads(HISTORY_PATH.read_text()) if HISTORY_PATH.exists() else []
        if not isinstance(history, list):
            raise ValueError("history file does not contain a list")
        history.append(entry)
        HISTORY_PATH.write_text(json.dumps(history[-90:], ensure_ascii=False, indent=2))
    except (OSError, ValueError, TypeError) as e:
        print(f"health_check_history not updated: {e}", file=sys.stderr)


def main():
    """Run every checklist item, record the result and print the report.

    Flags read from ``sys.argv``:
        --report            print the full report even when all checks pass
        --update-checklist  force the auto-discovery step

    Without ``--report`` only categories containing a failed check are
    printed; when nothing failed a single ``[SILENT]`` line is printed.
    """
    show_full = "--report" in sys.argv
    update = "--update-checklist" in sys.argv
    start_time = time.time()

    # Load the checklist.
    if not CHECKLIST_PATH.exists():
        print("[SILENT] health_checklist.json 不存在")
        return
    checklist = json.loads(CHECKLIST_PATH.read_text())

    # Auto-discovery: forced by the flag, otherwise at most once per hour.
    checklist, discovered = _discover_if_due(checklist, update)

    # Run the checks category by category.
    # NOTE(review): the weekday labels and the separator character below are
    # empty strings in this copy of the file — possibly characters lost in
    # transit; confirm against the original.
    dayname = ["","","","","","",""][ctx["started_at"].weekday()]
    lines = [f"MoFin 系统体检 | {ctx['started_at'].strftime('%Y-%m-%d')}{dayname} | {ctx['started_at'].strftime('%H:%M')}"]
    lines.append("" * 50)
    for cat in checklist["categories"]:
        cat_issues = 0
        cat_lines = []
        for item in cat["items"]:
            ok, level, detail = run_check(item)
            msg = f"{item['description']}: {detail}"
            log(level, cat["name"], msg, item["id"])
            cat_lines.append(emit(msg, level))
            if not ok:
                cat_issues += 1
        # Show a category only when it has a problem or --report was given.
        if cat_issues > 0 or show_full:
            lines.append(f"\n{cat['name']}")
            lines.extend(cat_lines)

    # Auto-discovery results.
    if discovered:
        lines.append("\n📎 自动发现:")
        for d in discovered:
            lines.append(f" {d}")

    # Summary.
    critical = ctx["critical_count"]
    errors = ctx["error_count"]
    warns = ctx["warn_count"]
    ok_count = ctx["ok_count"]
    total = ok_count + warns + errors + critical
    if total == 0:
        total = 1  # an empty checklist is reported as 0/1 in the silent line
    lines.append(f"\n{'' * 50}")
    severity_parts = []
    if critical > 0:
        severity_parts.append(f"🔴{critical}严重")
    if errors > 0:
        severity_parts.append(f"{errors}错误")
    if warns > 0:
        severity_parts.append(f"⚠️{warns}警告")
    if ok_count > 0:
        severity_parts.append(f"{ok_count}正常")
    lines.append(f"总计: {' | '.join(severity_parts)} ({(time.time()-start_time):.0f}s)")
    report = "\n".join(lines)

    # Save this run to the history file.
    _append_history({
        "timestamp": ctx["started_at"].isoformat(),
        "ok": ok_count, "warn": warns, "error": errors, "critical": critical,
        "duration_s": round(time.time() - start_time, 1)
    })

    # Output: speak up only when something is wrong; --report forces output.
    has_issues = critical > 0 or errors > 0 or warns > 0
    if has_issues or show_full:
        print(report)
    else:
        print(f"[SILENT] MoFin 体检通过 | {ok_count}/{total} 检查正常 ({(time.time()-start_time):.0f}s)")

    # For serious problems, print an extra readable summary.
    if critical > 0 or errors > 0:
        print()
        print("🔴 需立即处理的问题:")
        for entry in ctx["report"]:
            if entry["level"] in ("critical", "error"):
                print(f" [{entry['level'].upper()}] {entry['category']}: {entry['msg']}")
# Script entry point: run the health check only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()