Files
MoFin/scripts/morning_health_check.py
T
知微 26993c1d41 fix: 健康检查→自动修复→TODO三级处理
- 新增 auto_fix_issue(): 可自动修复的问题直接修复(汇率缓存/价格事件)
- 修复后的问题不写TODO,只有不可自动修复的才进TODO系统
- 修复 checklist 中港股汇率缓存路径(profile环境~解析差异)
- 输出增加🛠️自动修复和📋TODO写入摘要

处理流程:
  可自动修复(汇率刷新等)→ 立即修复,报告标记
  需人工/复杂修复(cron错误/delivery配置)→ 写入TODO
  TODO由self-todo cron在下一窗口(9:00/11:00/14:00等)处理
2026-06-24 20:26:47 +08:00

645 lines
24 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""morning_health_check.py — MoFin 系统常规体检
每日开盘前(8:00)运行,全面扫描MoFin所有组件健康状况。
输出格式化的体检报告,有问题才出声,没问题静默。
核心设计:
1. 从 health_checklist.json 读检查清单
2. 逐项检查,记录状态
3. 报告异常项(只推异常,不推正常)
4. 自动发现新增cron/脚本(通过 self_discovery 函数)
5. 维护检查历史 (health_check_history.json)
新增组件自动发现机制:
- 对比当前cron list与checklist中记录的cron id
- 发现新cron → 自动追加到checklist
- 脚本修改 → 标记"需复核"
用法:
python3 scripts/morning_health_check.py [--report] [--update-checklist]
--report: 强制输出完整报告(默认只输出异常)
--update-checklist: 运行自动发现并更新checklist
no_agent模式:只输出异常项,无异常完全静默
"""
import json, os, sqlite3, subprocess, sys, time, urllib.request
from pathlib import Path
from datetime import datetime, timedelta
# ── Paths ──
# MoFin project tree and its SQLite database.
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
SCRIPTS_DIR = BASE / "scripts"
DB_PATH = DATA / "mofin.db"
# Health-check bookkeeping files stored under data/.
CHECKLIST_PATH = DATA / "health_checklist.json"
HISTORY_PATH = DATA / "health_check_history.json"
# Hermes "position-analyst" profile: its scripts, cron job store and TODO list.
_HERMES_PROFILE = Path("/home/hmo/.hermes/profiles/position-analyst")
PROFILE_SCRIPTS = _HERMES_PROFILE / "scripts"
HERMES_CRON_DIR = _HERMES_PROFILE / "cron"
TODO_PATH = _HERMES_PROFILE / "todo.json"
def auto_fix_issue(issue, now=None, process_checker=None):
    """Try to resolve a health-check issue automatically.

    Only a few well-understood problems are handled; everything else is
    reported as not fixed so the caller turns it into a TODO.

    Args:
        issue: report entry as produced by ``log()``; only ``msg`` is read.
        now: reference time for the trading-hours test; defaults to
            ``ctx["started_at"]``.
        process_checker: callable ``pattern -> (ok, detail)`` used to probe
            the price monitor; defaults to ``check_process``.

    Returns:
        ``(fixed, fix_msg)``. ``fixed`` is True only when the issue was
        repaired or confirmed benign; False means it still needs a human.
    """
    msg = issue.get("msg", "")

    # HK exchange-rate cache missing -> regenerate it with hk_rate.py.
    if "港股汇率缓存" in msg and "missing" in msg:
        try:
            # hk_rate.py writes ~/.cache/hk_exchange_rate.json; under the Hermes
            # profile environment "~" resolves to the profile's own home dir.
            r = subprocess.run(
                ["python3", str(BASE / "hk_rate.py")],
                capture_output=True, text=True, timeout=15,
            )
        except Exception as e:
            return False, f"汇率刷新异常: {e}"
        if r.returncode == 0:
            return True, f"已自动刷新港股汇率缓存: {r.stdout.strip()}"
        return False, f"汇率刷新失败: {r.stderr[:100]}"

    # Price monitor logged no events today -> decide whether that is expected.
    if "价格监控" in msg and "0 rows" in msg:
        if now is None:
            now = ctx["started_at"]
        if process_checker is None:
            process_checker = check_process
        if now.weekday() < 5 and 9 <= now.hour <= 15:
            # Trading hours: an empty event table is only benign if the monitor runs.
            running, _ = process_checker("price_monitor")
            if not running:
                # Not fixable here. Previously this returned True, which hid the
                # problem from the TODO list despite saying "needs a human".
                return False, "已检测:price_monitor进程不存在(需人工介入)"
            return True, "已确认:price_monitor进程运行中,但今日无事件(可能无价格触发)"
        # Outside trading hours, no price events is normal.
        return True, "非交易时段无价格事件属正常"

    # Anything else needs manual handling.
    return False, "需人工处理"
def _todo_apply_auto_fixes(issues):
    """Run auto_fix_issue() on each issue, log/print the fixed ones, return the rest."""
    fixed_issues = []
    remaining = []
    for issue in issues:
        fixed, fix_msg = auto_fix_issue(issue)
        if fixed:
            fixed_issues.append((issue, fix_msg))
            log("ok", issue["category"], f"已自动修复: {fix_msg}", issue.get("detail", ""))
        else:
            remaining.append(issue)
    if fixed_issues:
        print()
        print("🛠️ 自动修复:")
        for issue, fix_msg in fixed_issues:
            print(f"{issue['category']}: {fix_msg}")
    return remaining


def _todo_load_existing():
    """Read the TODO list from TODO_PATH.

    Returns the list ([] when the file does not exist yet), or None when the
    file exists but is unreadable or not a JSON list. Callers must then skip
    writing; otherwise they would overwrite the existing TODOs.
    """
    if not TODO_PATH.exists():
        return []
    try:
        data = json.loads(TODO_PATH.read_text())
    except (OSError, ValueError) as e:
        print(f"[health_check] cannot read {TODO_PATH}, TODOs not written: {e}", file=sys.stderr)
        return None
    if not isinstance(data, list):
        print(f"[health_check] {TODO_PATH} is not a JSON list, TODOs not written", file=sys.stderr)
        return None
    return data


def write_todos_for_issues():
    """Turn this run's warn/error/critical findings into TODO items.

    Each finding is first passed to auto_fix_issue(). Only the ones that
    cannot be fixed are merged into TODO_PATH:
      - a new title becomes a new "pending" item;
      - a title already present reopens any "completed" copies
        (status -> pending, priority refreshed, note stamped).
    The file is rewritten whenever items are added or reopened.

    Best effort: failures never abort the health check, but are reported on
    stderr instead of being silently swallowed.
    """
    try:
        report = ctx["report"]
        if not report:
            return
        # Only warn/error/critical findings are actionable.
        issues = [e for e in report if e["level"] in ("critical", "error", "warn")]
        if not issues:
            return

        remaining = _todo_apply_auto_fixes(issues)
        if not remaining:
            return

        existing = _todo_load_existing()
        if existing is None:
            return

        started = ctx["started_at"]
        todo_priority = {"critical": "high", "error": "medium", "warn": "low"}
        existing_titles = {t.get("title", "") for t in existing if isinstance(t, dict)}
        new_items = []
        reopened = False
        for issue in remaining:
            title = f"[体检发现] {issue['msg']}"
            priority = todo_priority.get(issue["level"], "medium")
            if title in existing_titles:
                # Known issue: reopen completed copies instead of duplicating it.
                for t in existing:
                    if isinstance(t, dict) and t.get("title") == title and t.get("status") == "completed":
                        t["status"] = "pending"
                        t["priority"] = priority
                        t["note"] = f"重新打开: {started.isoformat()}"
                        reopened = True
                continue
            existing_titles.add(title)
            new_items.append({
                "title": title,
                "desc": f"体检发现于 {started.strftime('%Y-%m-%d %H:%M')},分类: {issue['category']},详情: {issue.get('detail', '')}",
                "status": "pending",
                "priority": priority,
                "created": started.isoformat(),
                "target": "health_check_fix",
            })

        # Reopened items must be persisted too, even when nothing new was added.
        if not (new_items or reopened):
            return
        existing.extend(new_items)
        # Temp file + rename so a concurrent reader never sees a half-written file.
        tmp_path = TODO_PATH.with_name(TODO_PATH.name + ".tmp")
        tmp_path.write_text(json.dumps(existing, ensure_ascii=False, indent=2))
        os.replace(tmp_path, TODO_PATH)

        if new_items:
            print()
            print("📋 已加入TODO(待处理):")
            for item in new_items:
                print(f" [{item['priority']}] {item['title'][:70]}")
    except Exception as e:
        # TODO bookkeeping must not break the health check, but say what went wrong.
        print(f"[health_check] TODO update failed: {e}", file=sys.stderr)
# Cache of already-reported issues, meant to avoid re-alerting the same
# problem within 24h. NOTE(review): not referenced anywhere in the visible code.
KNOWN_ISSUES_PATH = DATA.joinpath("health_known_issues.json")
# ── Run context ──
# Mutable state shared by all checks in this run: collected report entries,
# per-level counters, and the run's reference timestamp (naive local time).
ctx = dict(
    report=[],
    issues=[],
    ok_count=0,
    warn_count=0,
    error_count=0,
    critical_count=0,
    started_at=datetime.now(),
)
def log(level, category, msg, detail=None):
    """Append one check result to ctx["report"] and bump its level counter.

    "critical", "error" and "warn" increment their own counters; any other
    level (e.g. "ok") is counted in ok_count.
    """
    entry = dict(
        level=level,
        category=category,
        msg=msg,
        detail=detail,
        timestamp=datetime.now().isoformat(),
    )
    ctx["report"].append(entry)

    counter = "ok_count"
    for name in ("critical", "error", "warn"):
        if level == name:
            counter = name + "_count"
            break
    ctx[counter] += 1
def emit(msg, level="ok"):
    """Format a report line as "<level icon> <msg>"; unknown levels get no icon.

    NOTE(review): the "error" and "ok" icons are empty strings here, possibly
    lost in an encoding step; confirm against the original file.
    """
    icons = {"critical": "🔴", "error": "", "warn": "⚠️", "ok": "", "info": "📎"}
    prefix = icons[level] if level in icons else ""
    return f"{prefix} {msg}"
# ── 检查器集合 ──
def check_systemctl(service_name):
    """Report whether a systemd unit is active.

    Returns (True, "active") when `systemctl is-active` prints "active",
    otherwise (False, <printed state>), or (False, "error:<exc>") when the
    command cannot be run.
    """
    cmd = ["systemctl", "is-active", service_name]
    try:
        state = subprocess.run(cmd, capture_output=True, text=True, timeout=5).stdout.strip()
    except Exception as exc:
        return False, f"error:{exc}"
    return state == "active", state
def check_port(port):
    """Check whether a TCP port has a listening socket, using `ss -tlnp`.

    The port is matched exactly against the "Local Address:Port" column.
    The old substring test (":80" in output) also matched :8080, :800 and
    IPv6 addresses such as [::1]:631.

    Args:
        port: port number, int or numeric string (as parsed from the checklist).

    Returns:
        (True, "listening"), (False, "not_found"), or (False, "error:<exc>")
        when `ss` cannot be run.
    """
    try:
        r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
    except Exception as e:
        return False, f"error:{e}"
    wanted = str(port).strip()
    for row in r.stdout.splitlines()[1:]:  # first line is the column header
        cols = row.split()
        if len(cols) < 4:
            continue
        # cols[3] is "Local Address:Port", e.g. 0.0.0.0:80, [::]:80, *:80.
        if cols[3].rpartition(":")[2] == wanted:
            return True, "listening"
    return False, "not_found"
def check_process(pattern):
    """Check whether any process command line matches `pattern` (pgrep -f).

    Returns:
        (True, "running") when pgrep finds a match, (False, "not_found") when
        it finds none (exit 1), and (False, "check_error") when pgrep is
        unavailable, times out, or reports a usage/fatal error (exit >= 2).
        Previously such errors were indistinguishable from "not_found".
    """
    try:
        r = subprocess.run(["pgrep", "-f", pattern], capture_output=True, timeout=5)
    except Exception:
        return False, "check_error"
    if r.returncode == 0:
        return True, "running"
    if r.returncode == 1:
        return False, "not_found"
    # pgrep exit status 2 = syntax error, 3 = fatal error: the check itself failed.
    return False, "check_error"
def check_http(url, timeout=15):
    """Check that an HTTP(S) endpoint answers a GET request, bypassing proxies.

    Proxies are disabled with an explicit empty ProxyHandler instead of
    editing os.environ. The old approach removed the *proxy* variables and
    never restored them when the request raised, which changed the
    environment for every later subprocess. The response is also closed now.

    Returns:
        (True, "<status code>") on success; (False, "<error text, at most
        60 chars>") otherwise, including HTTP 4xx/5xx (urllib raises on those).
    """
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        req = urllib.request.Request(url, method="GET")
        with opener.open(req, timeout=timeout) as resp:
            return True, str(resp.status)
    except Exception as e:
        return False, str(e)[:60]
def check_disk(mount):
    """Check that the filesystem holding `mount` is under 90% usage.

    Uses POSIX output (`df -P`) so each filesystem occupies exactly one line.
    Plain `df -h` wraps rows with long device names onto two lines, which
    broke the column parsing.

    Returns:
        (usage < 90, "<pct>% used"); (False, "parse_error") when df prints no
        usable row; (False, "check_error") when df cannot run or the usage is
        not numeric (e.g. "-").
    """
    try:
        r = subprocess.run(["df", "-P", "-h", mount], capture_output=True, text=True, timeout=5)
        rows = r.stdout.strip().split("\n")
        if len(rows) >= 2:
            fields = rows[1].split()
            if len(fields) >= 5:
                pct = fields[4].replace("%", "")
                return int(pct) < 90, f"{pct}% used"
        return False, "parse_error"
    except Exception:
        return False, "check_error"
def check_file_exists(path):
    """Check that `path` exists; the detail is its size as "<n>B", or "missing"."""
    target = Path(path)
    if not target.exists():
        return False, "missing"
    size = target.stat().st_size
    return True, f"{size}B"
def check_file_freshness(path, max_hours):
    """Check that `path` was modified less than `max_hours` before the run started.

    Age is the difference between ctx["started_at"] and the file's mtime, both
    as naive local datetimes. Returns (False, "missing") for a nonexistent
    file, otherwise (age < max_hours, "<age>h ago (threshold <max_hours>h)").
    """
    target = Path(path)
    if target.exists():
        modified = datetime.fromtimestamp(target.stat().st_mtime)
        age_h = (ctx["started_at"] - modified).total_seconds() / 3600
        return age_h < max_hours, f"{age_h:.0f}h ago (threshold {max_hours}h)"
    return False, "missing"
def check_db_table_count(table, field, value, op="today", threshold=0, db_path=None):
    """Count rows in a MoFin SQLite table and compare the count with a threshold.

    Args:
        table: table name from the checklist. It is interpolated into SQL, so
            it must come from trusted configuration.
        field, value: for op="count", only rows where ``field = value`` are
            counted (when ``field`` is non-empty).
        op: "today" counts rows whose first date-like column
            (created_at/date/timestamp/last_scanned_at/signal_date) is today.
            "unprocessed" counts backlog rows (processed = 0/NULL, else rows
            whose source contains "xiaoguo", else all rows). "count" and any
            other value count all (or filtered) rows.
        threshold: "unprocessed" passes when count < threshold; every other
            op passes when count >= threshold.
        db_path: database file; defaults to DB_PATH.

    Returns:
        (ok, detail). A missing DB, missing table or SQL error is reported as
        (True, "skip(...)") so one broken query never fails the whole run. A
        table without a date column under op="today" gives
        (True, "no_date_col_in_<table>").
    """
    path = DB_PATH if db_path is None else Path(db_path)
    # sqlite3.connect() would silently create an empty database file here.
    if not path.exists():
        return True, "skip(db_missing)"
    try:
        conn = sqlite3.connect(str(path))
        try:
            cur = conn.cursor()
            if op == "today":
                cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
                date_col = next(
                    (c for c in ("created_at", "date", "timestamp", "last_scanned_at", "signal_date") if c in cols),
                    None,
                )
                if date_col is None:
                    return True, f"no_date_col_in_{table}"
                today = ctx["started_at"].strftime("%Y-%m-%d")
                cur.execute(f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,))
            elif op == "unprocessed":
                cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
                if "processed" in cols:
                    sql = f"SELECT COUNT(*) FROM {table} WHERE (processed = 0 OR processed IS NULL)"
                elif "source" in cols:
                    sql = f"SELECT COUNT(*) FROM {table} WHERE source LIKE '%xiaoguo%'"
                else:
                    sql = f"SELECT COUNT(*) FROM {table}"
                cur.execute(sql)
            elif op == "count" and field:
                cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {field} = ?", (value,))
            else:
                cur.execute(f"SELECT COUNT(*) FROM {table}")
            count = cur.fetchone()[0]
        finally:
            # Always release the connection, including on query errors.
            conn.close()
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
    if op == "unprocessed":
        return count < threshold, f"{count} unprocessed"
    return count >= threshold, f"{count} rows"
def check_cron(job_id):
    """Check one Hermes cron job's state in jobs.json.

    A job matches when its "id" equals job_id, or, for jobs without an id but
    with a name, when job_id appears anywhere in the job's repr.

    last_run_at is parsed as ISO-8601 (a trailing "Z" is accepted). Aware
    timestamps are converted to naive local time before comparing with
    ctx["started_at"]. Previously the naive-minus-aware subtraction raised
    TypeError, which was swallowed, so the staleness check never fired.

    Returns:
        (False, "disabled") or (False, "status=<s>") for disabled or failing
        jobs; (True, "ok(stale:<h>h)") when the last run is older than 48h;
        (True, "ok") otherwise; (True, "not_in_jobs_json") when nothing
        matches; (False, "no_jobs_json") or (False, "check_error:...") when
        jobs.json is absent or unreadable.
    """
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return False, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        for job in data.get("jobs", []):
            matches = job.get("id") == job_id or (
                not job.get("id") and job.get("name") and job_id in str(job)
            )
            if not matches:
                continue
            if not job.get("enabled", True):
                return False, "disabled"
            last_status = job.get("last_status")
            if last_status and last_status != "ok":
                return False, f"status={last_status}"
            last_run = job.get("last_run_at", "")
            if last_run:
                try:
                    stamp = last_run[:-1] + "+00:00" if last_run.endswith("Z") else last_run
                    last_dt = datetime.fromisoformat(stamp)
                    if last_dt.tzinfo is not None:
                        last_dt = last_dt.astimezone().replace(tzinfo=None)
                    hours_ago = (ctx["started_at"] - last_dt).total_seconds() / 3600
                except (AttributeError, TypeError, ValueError, OverflowError, OSError):
                    hours_ago = None  # unparseable timestamp: skip the staleness check
                if hours_ago is not None and hours_ago > 48:
                    return True, f"ok(stale:{hours_ago:.0f}h)"
            return True, "ok"
        # No matching job; possibly a cron that does not need checking.
        return True, "not_in_jobs_json"
    except Exception as e:
        return False, f"check_error:{str(e)[:60]}"
def check_cron_errors_last24h():
    """Report cron jobs whose last run in the past 24h did not end "ok".

    Run timestamps are parsed (a trailing "Z" is accepted) and compared as
    datetimes in naive local time, matching ctx["started_at"]. The previous
    lexicographic string comparison was wrong whenever jobs.json stored a
    different UTC offset than the local one. Unparseable timestamps are
    ignored.

    Returns:
        (False, "<n> errors: name(status); ...") listing up to five jobs;
        (True, "0 errors") when none failed; (True, "no_jobs_json") when the
        job store is absent; (True, "skip(...)") if it cannot be read.
    """
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return True, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        cutoff = ctx["started_at"] - timedelta(hours=24)
        errors = []
        for job in data.get("jobs", []):
            last_status = job.get("last_status")
            last_run = job.get("last_run_at", "")
            if not (last_status and last_status != "ok" and last_run):
                continue
            try:
                stamp = last_run[:-1] + "+00:00" if last_run.endswith("Z") else last_run
                run_dt = datetime.fromisoformat(stamp)
                if run_dt.tzinfo is not None:
                    run_dt = run_dt.astimezone().replace(tzinfo=None)
            except (AttributeError, TypeError, ValueError, OverflowError, OSError):
                continue
            if run_dt >= cutoff:
                errors.append(f"{job.get('name','?')}({last_status})")
        if errors:
            return False, f"{len(errors)} errors: {'; '.join(errors[:5])}"
        return True, "0 errors"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_cron_paused():
    """Detect must-run cron jobs that have been disabled in jobs.json.

    Returns (False, "paused: a, b") naming the disabled jobs,
    (True, "all_expected_running") otherwise, (True, "no_jobs_json") if the
    job store is absent, and (True, "skip(...)") if it cannot be read. Jobs
    missing from jobs.json are not reported.
    """
    # (job id, display name) of crons that must never be paused.
    must_run = (
        ("3a9fb3300a6a", "价格监控"),
        ("0851c7838ca3", "小果扫描"),
        ("e13323928f3a", "自选提醒"),
        ("b809fcabfa5b", "分支评估"),
    )
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return True, "no_jobs_json"
        jobs_by_id = {}
        for entry in json.loads(jobs_file.read_text()).get("jobs", []):
            jobs_by_id[entry.get("id")] = entry
        disabled = []
        for jid, label in must_run:
            entry = jobs_by_id.get(jid)
            if entry and not entry.get("enabled", True):
                disabled.append(label)
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
    if not disabled:
        return True, "all_expected_running"
    return False, f"paused: {', '.join(disabled)}"
def check_delivery_targets():
    """Flag cron jobs whose most recent delivery failed.

    A job counts when its "last_delivery_error" is non-empty and mentions
    "delivery" (case-insensitive). Returns (False, "<n> issues: a, b, c")
    naming at most three jobs, (True, "all_ok") otherwise,
    (True, "no_jobs_json") if the job store is absent, and
    (True, "skip(...)") if it cannot be read.
    """
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return True, "no_jobs_json"
        jobs = json.loads(jobs_file.read_text()).get("jobs", [])
        failing = [
            f"{job.get('name','?')}"
            for job in jobs
            for err in (job.get("last_delivery_error", ""),)
            if err and "delivery" in err.lower()
        ]
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
    if not failing:
        return True, "all_ok"
    return False, f"{len(failing)} issues: {', '.join(failing[:3])}"
# ── 自动发现 ──
def self_discovery():
    """Register cron jobs that the checklist does not know about yet.

    Each job in jobs.json that has an id and a script but no matching
    "cron:<id>" check gets a new item in the checklist's "pipeline" category
    (severity "medium", flagged auto_discovered). The checklist file is
    rewritten when anything was found.

    Returns:
        list of findings ("新cron: name(id)"); a "cron_discovery_error: ..."
        entry is appended if anything fails along the way.
    """
    findings = []
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return findings
        jobs = json.loads(jobs_file.read_text()).get("jobs", [])
        crons = [(j.get("id"), j.get("name"), j.get("schedule"), j.get("script")) for j in jobs]

        checklist = json.loads(CHECKLIST_PATH.read_text())
        categories = checklist["categories"]
        known = {
            item["check"].split(":")[1]
            for cat in categories
            for item in cat["items"]
            if item["check"].startswith("cron:")
        }

        for jid, name, _schedule, script in crons:
            if not (jid and jid not in known and script):
                continue
            findings.append(f"新cron: {name}({jid})")
            pipeline = next((cat for cat in categories if cat["id"] == "pipeline"), None)
            if pipeline is not None:
                pipeline["items"].append({
                    "id": f"cron-auto-{jid[:8]}",
                    "description": f"{name} cron 已调度",
                    "check": f"cron:{jid}",
                    "expected": "enabled+ok",
                    "severity": "medium",
                    "auto_discovered": True
                })

        if findings:
            CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
    except Exception as e:
        findings.append(f"cron_discovery_error: {e}")
    return findings
# ── 主流程 ──
def _run_check_spec(spec):
    """Dispatch one "<kind>:<args>" check spec to its checker; returns (ok, detail)."""
    kind, sep, arg = spec.partition(":")
    if sep:
        if kind == "systemctl":
            return check_systemctl(arg)
        if kind == "port":
            return check_port(arg)
        if kind == "proc":
            return check_process(arg)
        if kind == "http":
            return check_http(arg)
        if kind == "disk":
            return check_disk(arg)
        if kind == "fileexists":
            return check_file_exists(arg)
        if kind == "filefresh":
            # filefresh:<path>:<max_hours>[h]
            path, _, hours = arg.partition(":")
            return check_file_freshness(path, float(hours.replace("h", "")))
        if kind == "db":
            # db:<table>:<field>:<value>:<op>:<threshold>
            parts = spec.split(":", 5)
            table = parts[1]
            field = parts[2] if len(parts) > 2 else None
            value = parts[3] if len(parts) > 3 else None
            op = parts[4] if len(parts) > 4 else "today"
            threshold = int(parts[5]) if len(parts) > 5 else 0
            return check_db_table_count(table, field, value, op, threshold)
        if kind == "cron":
            return check_cron(arg)
    if spec == "cron_errors:last24h":
        return check_cron_errors_last24h()
    if spec == "cron_paused:check":
        return check_cron_paused()
    if spec == "delivery:origin_targets":
        return check_delivery_targets()
    if spec == "pipeline:xiaoguo_signal_flow":
        # Healthy when xiaoguo produced at least one signal today, or the
        # unprocessed backlog is below 30. The old threshold of 0 made
        # "count >= 0" always true, so this check could never fail.
        has_today, d1 = check_db_table_count("signal_news", "created_at", None, "today", 1)
        backlog_ok, d2 = check_db_table_count("signal_news", None, None, "unprocessed", 30)
        return has_today or backlog_ok, f"today_xiaoguo={d1}, unprocessed={d2}"
    return False, f"unknown_check:{spec}"


def run_check(item):
    """Run one checklist item and classify the outcome.

    Args:
        item: checklist entry with a "check" spec string ("<kind>:<args>")
            and a "severity". "expected" is informational and not evaluated.

    Returns:
        (ok, level, detail). level is "ok" on success; otherwise "critical"
        for severity "critical", "error" for "high"/"error", and "warn" for
        anything else (including a missing severity). A checker that raises,
        e.g. on a malformed spec such as a non-numeric threshold, yields
        ok=False with detail "check_error:<exc>" instead of aborting the
        whole health-check run.
    """
    try:
        ok, detail = _run_check_spec(item["check"])
    except Exception as e:
        ok, detail = False, f"check_error:{str(e)[:60]}"
    if ok:
        return ok, "ok", detail
    severity = item.get("severity")
    if severity == "critical":
        level = "critical"
    elif severity in ("high", "error"):
        level = "error"
    else:
        level = "warn"
    return ok, level, detail
def _main_load_checklist(update):
    """Load the checklist, running self_discovery() first when forced or due.

    Discovery is forced by `update`, and is otherwise due when
    meta.last_full_scan is missing, unparseable, or more than an hour old.
    The checklist is re-read after discovery because self_discovery() saves
    its additions directly to CHECKLIST_PATH. Writing back the copy loaded
    earlier, as before, silently discarded the newly discovered crons.

    Returns:
        (checklist dict, list of discovery messages)
    """
    checklist = json.loads(CHECKLIST_PATH.read_text())
    if not update:
        meta = checklist.get("meta") or {}
        last_scan = meta.get("last_full_scan")
        try:
            due = not last_scan or (
                ctx["started_at"] - datetime.fromisoformat(last_scan)
            ).total_seconds() > 3600
        except (TypeError, ValueError):
            due = True  # corrupt or tz-aware timestamp: treat as never scanned
        if not due:
            return checklist, []
    discovered = self_discovery()
    checklist = json.loads(CHECKLIST_PATH.read_text())
    if not update:
        meta = checklist.get("meta")
        if not isinstance(meta, dict):
            meta = checklist["meta"] = {}
        meta["last_full_scan"] = ctx["started_at"].isoformat()
        CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
    return checklist, discovered


def _main_save_history(entry):
    """Append a run summary to HISTORY_PATH, keeping only the newest 90 entries.

    Best effort: failures are ignored so history bookkeeping never adds
    output to an otherwise silent run.
    """
    try:
        history = json.loads(HISTORY_PATH.read_text()) if HISTORY_PATH.exists() else []
        history.append(entry)
        HISTORY_PATH.write_text(json.dumps(history[-90:], ensure_ascii=False, indent=2))
    except Exception:
        pass


def main():
    """Run every checklist item and print the health report.

    Flags (read from sys.argv):
        --report            print the full report even when everything passes
        --update-checklist  force cron auto-discovery before checking

    When nothing is wrong and --report is absent, only one "[SILENT] ..."
    line is printed. A run summary is appended to HISTORY_PATH, and
    warn/error/critical findings are handed to write_todos_for_issues().
    """
    show_full = "--report" in sys.argv
    update = "--update-checklist" in sys.argv
    start_time = time.time()

    if not CHECKLIST_PATH.exists():
        print("[SILENT] health_checklist.json 不存在")
        return
    checklist, discovered = _main_load_checklist(update)

    started = ctx["started_at"]
    # NOTE(review): the weekday names and the separator rule below appear to
    # have lost their characters (empty literals); restore from the original.
    dayname = ["","","","","","",""][started.weekday()]
    lines = [f"MoFin 系统体检 | {started.strftime('%Y-%m-%d')}{dayname} | {started.strftime('%H:%M')}"]
    lines.append("" * 50)

    for cat in checklist["categories"]:
        cat_issues = 0
        cat_lines = []
        for item in cat["items"]:
            ok, level, detail = run_check(item)
            msg = f"{item['description']}: {detail}"
            log(level, cat["name"], msg, item["id"])
            cat_lines.append(emit(msg, level))
            if not ok:
                cat_issues += 1
        # Quiet by default: a category is shown only when it has a problem.
        if cat_issues > 0 or show_full:
            lines.append(f"\n{cat['name']}")
            lines.extend(cat_lines)

    if discovered:
        lines.append("\n📎 自动发现:")
        for d in discovered:
            lines.append(f" {d}")

    critical = ctx["critical_count"]
    errors = ctx["error_count"]
    warns = ctx["warn_count"]
    ok_count = ctx["ok_count"]
    # Only used as the "ok/total" denominator in the silent line; never 0.
    total = (ok_count + warns + errors + critical) or 1
    lines.append(f"\n{'' * 50}")

    severity_parts = []
    if critical > 0:
        severity_parts.append(f"🔴{critical}严重")
    if errors > 0:
        severity_parts.append(f"{errors}错误")
    if warns > 0:
        severity_parts.append(f"⚠️{warns}警告")
    if ok_count > 0:
        severity_parts.append(f"{ok_count}正常")
    lines.append(f"总计: {' | '.join(severity_parts)} ({(time.time()-start_time):.0f}s)")
    report = "\n".join(lines)

    _main_save_history({
        "timestamp": started.isoformat(),
        "ok": ok_count, "warn": warns, "error": errors, "critical": critical,
        "duration_s": round(time.time() - start_time, 1)
    })

    # no_agent mode: speak only when there are problems; --report forces output.
    has_issues = critical > 0 or errors > 0 or warns > 0
    if has_issues or show_full:
        print(report)
    else:
        print(f"[SILENT] MoFin 体检通过 | {ok_count}/{total} 检查正常 ({(time.time()-start_time):.0f}s)")

    # Serious problems get an extra, easy-to-scan summary.
    if critical > 0 or errors > 0:
        print()
        print("🔴 需立即处理的问题:")
        for entry in ctx["report"]:
            if entry["level"] in ("critical", "error"):
                print(f" [{entry['level'].upper()}] {entry['category']}: {entry['msg']}")

    write_todos_for_issues()
# Script entry point: run one health-check pass when executed directly.
if __name__ == "__main__":
    main()