Files
MoFin/scripts/morning_health_check.py
T
知微 4407f35027 feat: 自愈循环系统 + xiaoguo信号桥
- 新增 docs/self-healing-loop.md: 自愈循环设计文档
- 修改 morning_health_check.py: 发现异常后自动写入TODO系统
  - severity=critical→priority=high, error→medium, warn→low
  - 相同问题已completed→重新打开并升一级
  - 已有pending→不重复
- 新增 scripts/xiaoguo_signal_consumer.py: 知微消费小果扫描信号
  - 每30分钟盘中运行
  - 读signal_news未处理xiaoguo信号
  - 五维快速评估→加自选/关注/跳过
  - 标记processed=1
- 新增 cron: 小果信号消费-盘中 (15,45 9-15)
- 更新health_checklist自维护机制

自愈循环: 体检→TODO→修复→验证
2026-06-24 20:14:53 +08:00

590 lines
22 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""morning_health_check.py — routine health check for the MoFin system.

Runs every day before the market opens (08:00) and scans the health of all
MoFin components. It produces a formatted report and only speaks up when
something is wrong; when everything is fine it stays quiet.

Core design:
1. Read the list of checks from health_checklist.json.
2. Run the checks one by one and record their status.
3. Report abnormal items (only anomalies are pushed, never normal ones).
4. Auto-discover newly added cron jobs/scripts (via self_discovery()).
5. Maintain the check history (health_check_history.json).

Auto-discovery of new components:
- Compare the current cron list with the cron ids recorded in the checklist.
- New cron found -> append it to the checklist automatically.
- Script modified -> mark it as "needs review".
  NOTE(review): no code visible in this file implements this step — confirm.

Usage:
    python3 scripts/morning_health_check.py [--report] [--update-checklist]
    --report: force the full report (default: anomalies only)
    --update-checklist: run auto-discovery and update the checklist

no_agent mode: only abnormal items are printed; with no anomalies the run is
silent apart from a single "[SILENT] ..." status line.
"""
import json, os, sqlite3, subprocess, sys, time, urllib.request
from pathlib import Path
from datetime import datetime, timedelta
# ── Paths ──
# Root of the MoFin installation; the data and script folders hang off it.
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
SCRIPTS_DIR = BASE / "scripts"
PROFILE_SCRIPTS = Path("/home/hmo/.hermes/profiles/position-analyst/scripts")
# Checklist that drives the run (read by main(), rewritten by self_discovery()).
CHECKLIST_PATH = DATA / "health_checklist.json"
# Per-run summary counters appended by main().
HISTORY_PATH = DATA / "health_check_history.json"
# SQLite database queried by check_db_table_count().
DB_PATH = DATA / "mofin.db"
# Directory holding jobs.json, which the cron checks read.
HERMES_CRON_DIR = Path("/home/hmo/.hermes/profiles/position-analyst/cron")
# TODO list that write_todos_for_issues() reads and updates.
TODO_PATH = Path("/home/hmo/.hermes/profiles/position-analyst/todo.json")
def write_todos_for_issues():
    """Write the anomalies found by this run into the TODO system.

    For every ``ctx["report"]`` entry whose level is ``critical``, ``error``
    or ``warn``, a TODO titled ``[体检发现] <msg>`` is looked up in
    ``TODO_PATH``:

    * no TODO with that title: append a new ``pending`` item whose priority
      follows the level (critical -> high, error -> medium, warn -> low);
    * a ``completed`` TODO with that title: the problem came back, so it is
      reopened as ``pending`` and its priority is set one level above the
      level-derived priority (capped at ``high``);
    * a ``pending`` or ``in_progress`` TODO: left untouched.

    The file is rewritten whenever anything changed, including the case where
    TODOs were only reopened and no new item was added.

    Best effort: a failure here must never break the health check, so errors
    are reported on stderr and the function returns normally.
    """
    try:
        # Only error/critical/warn entries become TODOs.
        issues = [e for e in ctx["report"] if e["level"] in ("critical", "error", "warn")]
        if not issues:
            return

        # Load the existing TODO list; an unreadable file is treated as empty.
        existing = []
        if TODO_PATH.exists():
            try:
                existing = json.loads(TODO_PATH.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                existing = []
        if not isinstance(existing, list):
            # Unknown layout: do not risk clobbering it.
            print("[health_check] todo.json is not a list; TODO sync skipped", file=sys.stderr)
            return

        todo_priority = {"critical": "high", "error": "medium", "warn": "low"}
        one_level_up = {"low": "medium", "medium": "high", "high": "high"}

        # Index TODOs by title so duplicates within this run are caught too.
        by_title = {}
        for todo in existing:
            if isinstance(todo, dict):
                by_title.setdefault(todo.get("title", ""), []).append(todo)

        changed = False
        for issue in issues:
            title = f"[体检发现] {issue['msg']}"
            base_priority = todo_priority.get(issue["level"], "medium")
            same_title = by_title.get(title)
            if same_title:
                for todo in same_title:
                    if todo.get("status") == "completed":
                        # Fixed earlier but failing again: reopen and escalate.
                        todo["status"] = "pending"
                        todo["priority"] = one_level_up[base_priority]
                        todo["note"] = f"重新打开: {ctx['started_at'].isoformat()}"
                        changed = True
                    # pending / in_progress: already tracked, leave alone.
                continue
            new_item = {
                "title": title,
                "desc": f"体检发现于 {ctx['started_at'].strftime('%Y-%m-%d %H:%M')},分类: {issue['category']},详情: {issue.get('detail', '')}",
                "status": "pending",
                "priority": base_priority,
                "created": ctx["started_at"].isoformat(),
                "target": "health_check_fix",
            }
            existing.append(new_item)
            by_title[title] = [new_item]
            changed = True

        if changed:
            TODO_PATH.write_text(
                json.dumps(existing, ensure_ascii=False, indent=2), encoding="utf-8"
            )
    except Exception as e:
        # A TODO sync failure must not block the main health-check flow.
        print(f"[health_check] TODO sync failed: {e}", file=sys.stderr)
# Anomaly cache (the same issue is not pushed again within 24h).
# NOTE(review): nothing in the code visible here reads or writes this path — confirm.
KNOWN_ISSUES_PATH = DATA / "health_known_issues.json"
# ── Context ──
# Mutable state of the current run, shared by the functions in this module.
ctx = {
    "report": [],          # one dict per check result, appended by log()
    "issues": [],          # NOTE(review): not populated by the code visible here
    "ok_count": 0,         # per-level counters maintained by log()
    "warn_count": 0,
    "error_count": 0,
    "critical_count": 0,
    "started_at": datetime.now(),  # naive local time captured at import
}
def log(level, category, msg, detail=None):
    """Append one check result to ``ctx["report"]`` and bump its counter.

    ``critical``, ``error`` and ``warn`` each have their own counter; every
    other level is counted as ok.
    """
    entry = {
        "level": level,
        "category": category,
        "msg": msg,
        "detail": detail,
        "timestamp": datetime.now().isoformat(),
    }
    ctx["report"].append(entry)

    for name in ("critical", "error", "warn"):
        if level == name:
            ctx[name + "_count"] += 1
            break
    else:
        ctx["ok_count"] += 1
def emit(msg, level="ok"):
    """Format one report line: the icon for ``level``, a space, then ``msg``.

    Levels without an icon get an empty prefix.
    """
    icons = {"critical": "🔴", "error": "", "warn": "⚠️", "ok": "", "info": "📎"}
    icon = icons[level] if level in icons else ""
    return "{} {}".format(icon, msg)
# ── 检查器集合 ──
def check_systemctl(service_name):
    """Ask ``systemctl is-active`` about a unit.

    Returns ``(ok, detail)``: ``ok`` is True only when the command prints
    ``active``; ``detail`` is the printed state, or ``error:<exc>`` when the
    command could not be run.
    """
    command = ["systemctl", "is-active", service_name]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=5)
        state = proc.stdout.strip()
    except Exception as exc:
        return False, f"error:{exc}"
    return state == "active", f"{state}"
def check_port(port):
    """Check whether a TCP port is in the listening state.

    Runs ``ss -tlnp`` and compares ``port`` with the port part of each
    socket's local address. The comparison is exact, so port ``80`` no longer
    matches a socket listening on ``8080`` as a plain substring search would.

    Returns ``(True, "listening")``, ``(False, "not_found")`` or
    ``(False, "error:<exc>")`` when ``ss`` could not be run.
    """
    try:
        r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
    except Exception as e:
        return False, f"error:{e}"

    wanted = str(port).strip()
    for line in r.stdout.splitlines():
        fields = line.split()
        # The local address is field 3, or field 4 when ss prints a leading
        # Netid column. If field 4 is instead the peer address, its port is
        # "*" for a listening socket and can never equal a real port.
        for address in fields[3:5]:
            if address.rpartition(":")[2] == wanted:
                return True, "listening"
    return False, "not_found"
def check_process(pattern):
    """Check whether any process command line matches ``pattern``.

    Uses ``pgrep -f``. Returns ``(True, "running")``, ``(False, "not_found")``
    or ``(False, "check_error")`` when ``pgrep`` could not be run.
    """
    try:
        r = subprocess.run(["pgrep", "-f", pattern], capture_output=True, timeout=5)
    except Exception:
        # Not a bare except: Ctrl-C and SystemExit must still propagate.
        return False, "check_error"
    found = r.returncode == 0
    return found, "running" if found else "not_found"
def check_http(url, timeout=15):
    """Check that an HTTP endpoint answers a GET request, bypassing proxies.

    Proxies are disabled with a dedicated opener built on an empty
    ``ProxyHandler`` rather than by editing ``os.environ``. The process
    environment is therefore never modified, including when the request
    fails, and the response is always closed.

    Returns ``(True, "<status>")`` on success, otherwise ``(False, <first 60
    characters of the error>)``. HTTP error statuses raise inside urllib and
    so count as failures.
    """
    try:
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        req = urllib.request.Request(url, method="GET")
        with opener.open(req, timeout=timeout) as resp:
            return True, str(resp.status)
    except Exception as e:
        return False, str(e)[:60]
def check_disk(mount):
    """Check that the filesystem holding ``mount`` is less than 90% full.

    Parses the second line of ``df -P -h <mount>``; ``-P`` keeps each
    filesystem on one line so the column positions are stable.

    Returns ``(ok, "<pct>% used")``, ``(False, "parse_error")`` when the
    output has an unexpected shape, or ``(False, "check_error")`` when ``df``
    could not be run or the percentage is not a number.
    """
    try:
        r = subprocess.run(["df", "-P", "-h", mount], capture_output=True, text=True, timeout=5)
        lines = r.stdout.strip().split("\n")
        if len(lines) >= 2:
            parts = lines[1].split()
            if len(parts) >= 5:
                pct = parts[4].replace("%", "")
                return int(pct) < 90, f"{pct}% used"
        return False, "parse_error"
    except Exception:
        # Not a bare except: Ctrl-C and SystemExit must still propagate.
        return False, "check_error"
def check_file_exists(path):
    """Report whether ``path`` exists.

    Returns ``(True, "<size>B")`` for an existing path and
    ``(False, "missing")`` otherwise.
    """
    target = Path(path)
    if not target.exists():
        return False, "missing"
    size = target.stat().st_size
    return True, f"{size}B"
def check_file_freshness(path, max_hours):
    """Check that ``path`` was modified less than ``max_hours`` hours ago.

    The age is measured against ``ctx["started_at"]``. Returns
    ``(False, "missing")`` when the path does not exist, otherwise
    ``(is_fresh, "<age>h ago (threshold <max_hours>h)")``.
    """
    target = Path(path)
    if not target.exists():
        return False, "missing"
    modified_at = datetime.fromtimestamp(target.stat().st_mtime)
    age = ctx["started_at"] - modified_at
    age_hours = age.total_seconds() / 3600
    is_fresh = age_hours < max_hours
    return is_fresh, f"{age_hours:.0f}h ago (threshold {max_hours}h)"
def check_db_table_count(table, field, value, op="today", threshold=0):
    """Count rows in a table of the MoFin SQLite database.

    ``op`` selects the query:

    * ``"today"``: rows whose date column equals the run date. The column is
      the first of created_at/date/timestamp/last_scanned_at/signal_date that
      the table has; with none of them the check passes as
      ``no_date_col_in_<table>``.
    * ``"unprocessed"``: rows with ``processed`` 0/NULL; without that column,
      rows whose ``source`` contains ``xiaoguo``; otherwise all rows.
    * ``"count"``: rows where ``field = value``, or all rows if ``field`` is
      empty.
    * anything else: all rows.

    Returns ``(ok, detail)``. For ``"unprocessed"`` ok means
    ``count < threshold``; for every other op it means ``count >= threshold``.
    Any exception yields ``(True, "skip(<error>)")``, so a database problem
    never fails this check.

    ``table`` and ``field`` are interpolated into the SQL text and must come
    from the trusted checklist, never from external input.
    """
    try:
        conn = sqlite3.connect(str(DB_PATH))
        try:
            cur = conn.cursor()
            if op == "today":
                today = ctx["started_at"].strftime("%Y-%m-%d")
                # Look up which columns the table has first.
                cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
                candidates = ("created_at", "date", "timestamp", "last_scanned_at", "signal_date")
                date_col = next((c for c in candidates if c in cols), None)
                if not date_col:
                    return True, f"no_date_col_in_{table}"
                cur.execute(f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,))
            elif op == "unprocessed":
                cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
                if "processed" in cols:
                    sql = f"SELECT COUNT(*) FROM {table} WHERE (processed = 0 OR processed IS NULL)"
                elif "source" in cols:
                    sql = f"SELECT COUNT(*) FROM {table} WHERE source LIKE '%xiaoguo%'"
                else:
                    sql = f"SELECT COUNT(*) FROM {table}"
                cur.execute(sql)
            elif op == "count" and field:
                cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {field} = ?", (value,))
            else:
                cur.execute(f"SELECT COUNT(*) FROM {table}")
            count = cur.fetchone()[0]
        finally:
            # Close on every path, including when a query raises.
            conn.close()
        if op == "unprocessed":
            return count < threshold, f"{count} unprocessed"
        return count >= threshold, f"{count} rows"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_cron(job_id):
    """Check the state of one cron job as recorded in jobs.json.

    A job matches when its ``id`` equals ``job_id`` or, for a job that has a
    name but no id, when ``job_id`` appears anywhere in the job's text form.

    Returns ``(ok, detail)``:

    * ``(False, "no_jobs_json")``: jobs.json does not exist;
    * ``(True, "not_in_jobs_json")``: no job matched;
    * ``(False, "disabled")``: the job is disabled;
    * ``(False, "status=<s>")``: its last status is set and not ``ok``;
    * ``(True, "ok(stale:<n>h)")``: healthy, but last run over 48h ago;
    * ``(True, "ok")``: otherwise;
    * ``(False, "check_error:<error>")``: reading or parsing failed.

    A timezone-aware ``last_run_at`` is converted to naive local time before
    it is compared with the naive ``ctx["started_at"]``.
    """
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return False, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        for job in data.get("jobs", []):
            matches = job.get("id") == job_id or (
                not job.get("id") and job.get("name") and job_id in str(job)
            )
            if not matches:
                continue
            if not job.get("enabled", True):
                return False, "disabled"
            last_status = job.get("last_status")
            if last_status and last_status != "ok":
                return False, f"status={last_status}"
            last_run = job.get("last_run_at", "")
            if last_run:
                try:
                    last_dt = datetime.fromisoformat(last_run)
                    if last_dt.tzinfo is not None:
                        # Aware minus naive would raise; normalise first.
                        last_dt = last_dt.astimezone().replace(tzinfo=None)
                    hours_ago = (ctx["started_at"] - last_dt).total_seconds() / 3600
                    if hours_ago > 48:
                        return True, f"ok(stale:{hours_ago:.0f}h)"
                except (TypeError, ValueError):
                    # Unparseable timestamp: skip the staleness hint only.
                    pass
            return True, "ok"
        # Job id not found: possibly a cron that needs no check.
        return True, "not_in_jobs_json"
    except Exception as e:
        return False, f"check_error:{str(e)[:60]}"
def check_cron_errors_last24h():
    """Check whether any cron job ended in a non-ok state in the last 24h.

    A job counts when ``last_status`` is set and not ``ok`` and its
    ``last_run_at`` is within 24 hours before ``ctx["started_at"]``.
    Timestamps are parsed as datetimes, with aware values converted to naive
    local time, so UTC offsets are honoured. A timestamp that cannot be parsed
    falls back to the plain ISO-string comparison.

    Returns ``(False, "<n> errors: <up to 5 jobs>")`` when such jobs exist,
    ``(True, "0 errors")`` otherwise, ``(True, "no_jobs_json")`` when the file
    is missing and ``(True, "skip(<error>)")`` when reading it fails.
    """
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return True, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        cutoff = ctx["started_at"] - timedelta(hours=24)
        cutoff_text = cutoff.isoformat()[:19]
        errors = []
        for job in data.get("jobs", []):
            last_status = job.get("last_status")
            last_run = job.get("last_run_at", "")
            if not (last_status and last_status != "ok" and last_run):
                continue
            try:
                ran_at = datetime.fromisoformat(last_run)
                if ran_at.tzinfo is not None:
                    ran_at = ran_at.astimezone().replace(tzinfo=None)
                recent = ran_at >= cutoff
            except (TypeError, ValueError):
                # Fall back to lexicographic ISO comparison for odd formats.
                recent = isinstance(last_run, str) and last_run >= cutoff_text
            if recent:
                errors.append(f"{job.get('name','?')}({last_status})")
        if errors:
            return False, f"{len(errors)} errors: {'; '.join(errors[:5])}"
        return True, "0 errors"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_cron_paused():
    """Check that the cron jobs which must always run are not disabled.

    Returns ``(False, "paused: <names>")`` when any of the expected jobs is
    present in jobs.json with ``enabled`` false, and
    ``(True, "all_expected_running")`` otherwise. A missing jobs.json gives
    ``(True, "no_jobs_json")``; any other failure gives
    ``(True, "skip(<error>)")``.
    """
    # job id -> display name of the jobs expected to stay enabled
    must_run = {
        "3a9fb3300a6a": "价格监控",
        "0851c7838ca3": "小果扫描",
        "e13323928f3a": "自选提醒",
        "b809fcabfa5b": "分支评估",
    }
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return True, "no_jobs_json"
        payload = json.loads(jobs_file.read_text())
        jobs_by_id = {}
        for entry in payload.get("jobs", []):
            jobs_by_id[entry.get("id")] = entry
        paused_names = []
        for cron_id, label in must_run.items():
            entry = jobs_by_id.get(cron_id)
            if not entry:
                continue
            if entry.get("enabled", True):
                continue
            paused_names.append(label)
        if not paused_names:
            return True, "all_expected_running"
        return False, f"paused: {', '.join(paused_names)}"
    except Exception as exc:
        return True, f"skip({str(exc)[:60]})"
def check_delivery_targets():
    """Check jobs.json for cron jobs whose last delivery failed.

    A job is flagged when its ``last_delivery_error`` is non-empty and
    contains the word ``delivery`` (case-insensitive). Returns
    ``(False, "<n> issues: <up to 3 names>")`` when any job is flagged,
    ``(True, "all_ok")`` otherwise, ``(True, "no_jobs_json")`` when the file
    is missing and ``(True, "skip(<error>)")`` on any other failure.
    """
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return True, "no_jobs_json"
        payload = json.loads(jobs_file.read_text())
        flagged = []
        for entry in payload.get("jobs", []):
            error_text = entry.get("last_delivery_error", "")
            if not error_text:
                continue
            if "delivery" not in error_text.lower():
                continue
            flagged.append(f"{entry.get('name','?')}")
        if not flagged:
            return True, "all_ok"
        return False, f"{len(flagged)} issues: {', '.join(flagged[:3])}"
    except Exception as exc:
        return True, f"skip({str(exc)[:60]})"
# ── 自动发现 ──
def self_discovery():
    """Discover cron jobs missing from the checklist and register them.

    Every job in jobs.json that has an id and a script, and whose id is not
    referenced by any ``cron:<id>`` check in the checklist, is reported. It is
    also appended to the first category whose id is ``pipeline``, if there is
    one. The checklist file is rewritten when anything was discovered.

    Returns the list of human-readable discovery messages. A failure is
    reported as a ``cron_discovery_error: ...`` entry rather than raised.
    """
    found = []
    # 1. Look for newly added cron jobs.
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if jobs_file.exists():
            payload = json.loads(jobs_file.read_text())
            cron_rows = []
            for entry in payload.get("jobs", []):
                cron_rows.append(
                    (entry.get("id"), entry.get("name"), entry.get("schedule"), entry.get("script"))
                )
            # Cron ids the checklist already knows about.
            checklist = json.loads(CHECKLIST_PATH.read_text())
            known_ids = {
                item["check"].split(":")[1]
                for cat in checklist["categories"]
                for item in cat["items"]
                if item["check"].startswith("cron:")
            }
            for cron_id, cron_name, _schedule, cron_script in cron_rows:
                if not cron_id or cron_id in known_ids or not cron_script:
                    continue
                # New cron job: report it and file it under "pipeline".
                found.append(f"新cron: {cron_name}({cron_id})")
                pipeline = next(
                    (cat for cat in checklist["categories"] if cat["id"] == "pipeline"),
                    None,
                )
                if pipeline is not None:
                    pipeline["items"].append({
                        "id": f"cron-auto-{cron_id[:8]}",
                        "description": f"{cron_name} cron 已调度",
                        "check": f"cron:{cron_id}",
                        "expected": "enabled+ok",
                        "severity": "medium",
                        "auto_discovered": True
                    })
            if found:
                CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
    except Exception as exc:
        found.append(f"cron_discovery_error: {exc}")
    return found
# ── 主流程 ──
def _dispatch_check(check_spec):
    """Run the checker selected by ``check_spec`` and return ``(ok, detail)``."""
    kind, sep, arg = check_spec.partition(":")
    if sep:
        if kind == "systemctl":
            return check_systemctl(arg)
        if kind == "port":
            return check_port(arg)
        if kind == "proc":
            return check_process(arg)
        if kind == "http":
            return check_http(arg)
        if kind == "disk":
            return check_disk(arg)
        if kind == "fileexists":
            return check_file_exists(arg)
        if kind == "filefresh":
            # filefresh:path:max_hours
            path, _, hours_text = arg.partition(":")
            max_hours = float(hours_text.replace("h", ""))
            return check_file_freshness(path, max_hours)
        if kind == "db":
            # db:table:field:value:op:threshold
            parts = check_spec.split(":", 5)
            table = parts[1]
            field = parts[2] if len(parts) > 2 else None
            value = parts[3] if len(parts) > 3 else None
            op = parts[4] if len(parts) > 4 else "today"
            threshold = int(parts[5]) if len(parts) > 5 else 0
            return check_db_table_count(table, field, value, op, threshold)
        if kind == "cron":
            return check_cron(arg)
    if check_spec == "cron_errors:last24h":
        return check_cron_errors_last24h()
    if check_spec == "cron_paused:check":
        return check_cron_paused()
    if check_spec == "delivery:origin_targets":
        return check_delivery_targets()
    if check_spec == "pipeline:xiaoguo_signal_flow":
        # Combined check: xiaoguo produced data today, or the backlog of
        # unprocessed signals is below 30.
        today_xiaoguo, d1 = check_db_table_count("signal_news", "created_at", None, "today", 0)
        unproc, d2 = check_db_table_count("signal_news", None, None, "unprocessed", 30)
        return today_xiaoguo or unproc, f"today_xiaoguo={d1}, unprocessed={d2}"
    return False, f"unknown_check:{check_spec}"


def run_check(item):
    """Run one checklist item and return ``(ok, level, detail)``.

    ``item["check"]`` selects the checker: a ``systemctl:``, ``port:``,
    ``proc:``, ``http:``, ``disk:``, ``fileexists:``, ``filefresh:``, ``db:``
    or ``cron:`` prefix, or one of the fixed composite names. An unknown spec
    fails with ``unknown_check:<spec>``.

    A malformed spec or a checker that raises fails only this item, with
    detail ``check_failed:<ExcType>:<message>``, instead of aborting the run.

    ``level`` is ``"ok"`` on success. On failure it follows
    ``item["severity"]``: ``critical`` -> ``critical``, ``high``/``error`` ->
    ``error``, anything else -> ``warn``.
    """
    check_spec = str(item.get("check", ""))
    try:
        ok, detail = _dispatch_check(check_spec)
    except Exception as e:
        ok, detail = False, f"check_failed:{type(e).__name__}:{str(e)[:60]}"

    if ok:
        return ok, "ok", detail

    severity = item.get("severity")
    if severity == "critical":
        level = "critical"
    elif severity in ("high", "error"):
        level = "error"
    else:
        level = "warn"
    return ok, level, detail
def main():
    """Run every check in the checklist, print the report and record results.

    Command-line flags (read from ``sys.argv``):

    * ``--report``: always print the full report, not just failing categories;
    * ``--update-checklist``: force auto-discovery before the checks.

    Without ``--update-checklist`` auto-discovery runs when the checklist's
    ``meta.last_full_scan`` is missing, unreadable or more than an hour old.
    The checklist is re-read after discovery, so newly discovered cron items
    are checked in this run and are not overwritten when ``last_full_scan``
    is saved.

    After the checks a summary is appended to the history file (capped at the
    latest 90 entries), the report or a one-line ``[SILENT]`` status is
    printed, and anomalies are handed to ``write_todos_for_issues()``.
    """
    show_full = "--report" in sys.argv
    update = "--update-checklist" in sys.argv
    start_time = time.time()

    # Load the checklist.
    if not CHECKLIST_PATH.exists():
        print("[SILENT] health_checklist.json 不存在")
        return
    checklist = json.loads(CHECKLIST_PATH.read_text())

    # Auto-discovery (at most once per hour unless forced).
    discovered = []
    if update:
        discovered = self_discovery()
        # self_discovery() may have rewritten the file; use the fresh copy.
        checklist = json.loads(CHECKLIST_PATH.read_text())
    else:
        meta = checklist.get("meta") or {}
        last_scan = meta.get("last_full_scan")
        try:
            scan_due = (
                not last_scan
                or (ctx["started_at"] - datetime.fromisoformat(last_scan)).total_seconds() > 3600
            )
        except (TypeError, ValueError):
            # Unreadable timestamp: treat the scan as overdue.
            scan_due = True
        if scan_due:
            discovered = self_discovery()
            # Re-read before saving so discovered items are not overwritten.
            checklist = json.loads(CHECKLIST_PATH.read_text())
            if not isinstance(checklist.get("meta"), dict):
                checklist["meta"] = {}
            checklist["meta"]["last_full_scan"] = ctx["started_at"].isoformat()
            CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))

    # Run the checks category by category.
    # NOTE(review): every weekday label below is empty, and the separator
    # strings further down are empty too — the glyphs look lost; confirm.
    dayname = ["","","","","","",""][ctx["started_at"].weekday()]
    lines = [f"MoFin 系统体检 | {ctx['started_at'].strftime('%Y-%m-%d')}{dayname} | {ctx['started_at'].strftime('%H:%M')}"]
    lines.append("" * 50)
    for cat in checklist["categories"]:
        cat_issues = 0
        cat_lines = []
        for item in cat["items"]:
            ok, level, detail = run_check(item)
            msg = f"{item['description']}: {detail}"
            log(level, cat["name"], msg, item["id"])
            cat_lines.append(emit(msg, level))
            if not ok:
                cat_issues += 1
        # Print a category only when it has problems or --report is set.
        if cat_issues > 0 or show_full:
            lines.append(f"\n{cat['name']}")
            lines.extend(cat_lines)

    # Auto-discovery results.
    if discovered:
        lines.append("\n📎 自动发现:")
        for d in discovered:
            lines.append(f" {d}")

    # Summary.
    total = ctx["ok_count"] + ctx["warn_count"] + ctx["error_count"] + ctx["critical_count"]
    if total == 0:
        total = 1  # keeps the "<ok>/<total>" status line at 0/1 for an empty checklist
    lines.append(f"\n{'' * 50}")
    critical = ctx["critical_count"]
    errors = ctx["error_count"]
    warns = ctx["warn_count"]
    ok_count = ctx["ok_count"]

    # Build the per-severity summary.
    severity_parts = []
    if critical > 0:
        severity_parts.append(f"🔴{critical}严重")
    if errors > 0:
        severity_parts.append(f"{errors}错误")
    if warns > 0:
        severity_parts.append(f"⚠️{warns}警告")
    if ok_count > 0:
        severity_parts.append(f"{ok_count}正常")
    lines.append(f"总计: {' | '.join(severity_parts)} ({(time.time()-start_time):.0f}s)")
    report = "\n".join(lines)

    # Save history (best effort; a failure is reported, not raised).
    try:
        history = []
        if HISTORY_PATH.exists():
            history = json.loads(HISTORY_PATH.read_text())
        history.append({
            "timestamp": ctx["started_at"].isoformat(),
            "ok": ok_count, "warn": warns, "error": errors, "critical": critical,
            "duration_s": round(time.time() - start_time, 1)
        })
        # Keep only the most recent 90 entries.
        if len(history) > 90:
            history = history[-90:]
        HISTORY_PATH.write_text(json.dumps(history, ensure_ascii=False, indent=2))
    except Exception as e:
        print(f"[health_check] history update failed: {e}", file=sys.stderr)

    # Output.
    # no_agent mode: speak up only when there are problems; --report forces output.
    has_issues = critical > 0 or errors > 0 or warns > 0
    if has_issues or show_full:
        print(report)
    else:
        print(f"[SILENT] MoFin 体检通过 | {ok_count}/{total} 检查正常 ({(time.time()-start_time):.0f}s)")

    # For serious problems, also print a readable digest.
    if critical > 0 or errors > 0:
        print()
        print("🔴 需立即处理的问题:")
        for entry in ctx["report"]:
            if entry["level"] in ("critical", "error"):
                print(f" [{entry['level'].upper()}] {entry['category']}: {entry['msg']}")

    # Hand anomalies over to the TODO system.
    write_todos_for_issues()


if __name__ == "__main__":
    main()