Files
MoFin/scripts/morning_health_check.py
T

858 lines
35 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""morning_health_check.py — MoFin 系统常规体检
每日开盘前(8:00)运行,全面扫描MoFin所有组件健康状况。
当前8类48项(清单自动扩展)。
输出格式化的体检报告,有问题才出声,没问题静默。
核心设计:
从 health_checklist.json 读检查清单
逐项检查,记录状态
报告异常项(只推异常,不推正常)
自动发现新增cron/脚本(通过 self_discovery 函数)
维护检查历史 (health_check_log 表)
自动修复可修问题,不可修写TODO
新增组件自动发现机制:
- 对比当前cron list与checklist中记录的cron id
- 发现新cron → 自动追加到checklist
- 脚本修改 → 标记"需复核"
用法:
python3 scripts/morning_health_check.py [--report] [--update-checklist]
--report: 强制输出完整报告(默认只输出异常)
--update-checklist: 运行自动发现并更新checklist
no_agent模式:只输出异常项,无异常完全静默
"""
import json, os, sqlite3, subprocess, sys, time, urllib.request
from pathlib import Path
from datetime import datetime, timedelta
# ── Paths ──
# All locations are absolute and tied to the "hmo" account on this host.
_PROFILE_DIR = Path("/home/hmo/.hermes/profiles/position-analyst")  # hermes agent profile
BASE = Path("/home/hmo/MoFin")  # MoFin project root
DATA = BASE / "data"  # checklist, SQLite DB, pipeline registry
SCRIPTS_DIR = BASE / "scripts"
PROFILE_SCRIPTS = _PROFILE_DIR / "scripts"
CHECKLIST_PATH = DATA / "health_checklist.json"  # the list of checks to run
DB_PATH = DATA / "mofin.db"  # SQLite DB (todos, health_check_log, ...)
HERMES_CRON_DIR = _PROFILE_DIR / "cron"  # contains jobs.json
def derive_fix_action(detail, msg):
    """Pick a shell command that a TODO executor can run for an issue.

    Args:
        detail: The issue's ``detail`` field (accepted but not consulted).
        msg: The issue message; keyword matches on it select the command.

    Returns:
        A shell command string, or ``None`` when no rule matches. Rules are
        tried in order and the first match wins.
    """
    # Xiaoguo scanner errors: confirm the scanner script is still present.
    if "xiaoguo_scanner" in msg or "小果扫描" in msg:
        return "ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/xiaoguo_scanner.py 2>&1 && echo 'ok'"

    # system-audit errors: confirm the copied script is present.
    if "system_audit" in msg or "系统审计" in msg:
        return "ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/system_audit.py 2>&1"

    lowered = msg.lower()

    # A cron job ended with last_status=error: verify both scripts exist; the
    # next scheduled run is expected to recover on its own.
    cron_failed = "cron" in lowered and "error" in lowered
    if cron_failed and ("小果" in msg or "系统审计" in msg):
        return "ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/xiaoguo_scanner.py /home/hmo/.hermes/profiles/position-analyst/scripts/system_audit.py 2>&1"

    # HK exchange rate problems: regenerate via hk_rate.py.
    if "港股汇率" in msg:
        return f"cd {BASE} && python3 hk_rate.py 2>&1"

    # No price events today: look for the monitor process.
    if "价格监控" in msg and "0 rows" in msg:
        return "ps aux | grep price_monitor | grep -v grep | head -3"

    # Missing delivery target: must be switched to local by hand.
    # ("delivery" contains "deliver", so one test covers both spellings.)
    if "deliver" in lowered:
        return f"cd {BASE} && echo '需手动设置: cronjob action=update deliver=local'"

    # Xiaoguo -> Zhiwei signal bridge is broken: re-run the consumer.
    if "信号桥" in msg:
        return f"cd {BASE} && python3 scripts/xiaoguo_signal_consumer.py 2>&1"

    return None
def auto_fix_issue(issue):
    """Try to remedy one health-check issue automatically.

    Args:
        issue: A report entry as produced by ``log`` (``msg`` is matched here).

    Returns:
        ``(fixed, fix_msg)``. ``fixed`` is True only when the issue was
        actually remedied or confirmed to be benign. Anything that still needs
        a human returns False, so the caller files a TODO for it.
    """
    msg = issue.get("msg", "")

    # Missing HK exchange-rate cache -> regenerate it.
    if "港股汇率缓存" in msg and "missing" in msg:
        try:
            # hk_rate.py writes ~/.cache/hk_exchange_rate.json; per the original
            # note, under the hermes profile ~ resolves to profile/home/.cache/.
            r = subprocess.run(
                ["python3", str(BASE / "hk_rate.py")],
                capture_output=True, text=True, timeout=15
            )
            if r.returncode == 0:
                return True, f"已自动刷新港股汇率缓存: {r.stdout.strip()}"
            return False, f"汇率刷新失败: {r.stderr[:100]}"
        except Exception as e:
            return False, f"汇率刷新异常: {e}"

    # No price events today -> inspect the monitor process.
    if "价格监控" in msg and "0 rows" in msg:
        now = ctx["started_at"]
        if now.weekday() < 5 and 9 <= now.hour <= 15:
            # Weekday 09:00-15:59: events would normally be expected.
            running, _ = check_process("price_monitor")
            if not running:
                # Detected but NOT fixed: report failure so a TODO is filed
                # (previously this returned True and the issue was dropped).
                return False, "已检测:price_monitor进程不存在(需人工介入)"
            return True, "已确认:price_monitor进程运行中,但今日无事件(可能无价格触发)"
        # Outside trading hours an empty event table is normal.
        return True, "非交易时段无价格事件属正常"

    # Everything else cannot be fixed automatically.
    return False, "需人工处理"
def write_todos_for_issues():
    """Auto-fix this run's problems, then file the rest into the ``todos`` table.

    Considers ``ctx["report"]`` entries at level critical/error/warn:

    1. Each is passed to ``auto_fix_issue``. Fixed ones are logged at level
       ``ok`` and summarised on stdout.
    2. The rest are deduplicated by title against existing TODOs of any
       status. A matching ``blocked`` TODO is reopened as ``pending``; other
       matches are left alone.
    3. New issues get a ``pending`` TODO with priority high/medium/low
       (critical/error/warn) and a ``fix_action`` from ``derive_fix_action``.
       Issues without a fix action are printed instead of stored.

    Best effort: failures are printed and never propagate, so TODO
    bookkeeping cannot abort the health check.
    """
    try:
        issues = [e for e in ctx["report"] if e["level"] in ("critical", "error", "warn")]
        if not issues:
            return

        # 1) Try automatic fixes first.
        fixed_issues = []
        remaining = []
        for issue in issues:
            fixed, fix_msg = auto_fix_issue(issue)
            if fixed:
                fixed_issues.append((issue, fix_msg))
                log("ok", issue["category"], f"已自动修复: {fix_msg}", issue.get("detail", ""))
            else:
                remaining.append(issue)

        if fixed_issues:
            print()
            print("🛠️ 自动修复:")
            for issue, fix_msg in fixed_issues:
                print(f"{issue['category']}: {fix_msg}")

        # 2) Whatever could not be fixed goes into the TODO table.
        if not remaining:
            return

        todo_priority = {"critical": "high", "error": "medium", "warn": "low"}
        new_todos = []  # (priority, title) of rows actually inserted this run
        try:
            conn = sqlite3.connect(str(DB_PATH))
            try:
                for issue in remaining:
                    title = f"[体检发现] {issue['msg']}"
                    pri = todo_priority.get(issue["level"], "medium")
                    # Dedup across ALL statuses (completed included) so a
                    # recurring problem does not pile up duplicate TODOs.
                    existing = conn.execute(
                        "SELECT id, status FROM todos WHERE title=?",
                        (title,)
                    ).fetchone()
                    if existing:
                        if existing[1] == "blocked":
                            # A blocked TODO that shows up again is reopened.
                            conn.execute(
                                "UPDATE todos SET status='pending', priority=?, note='已重新打开', updated_at=CURRENT_TIMESTAMP WHERE id=?",
                                (pri, existing[0])
                            )
                        continue
                    # A TODO must carry a runnable fix_action; without one the
                    # issue is only reported.
                    fix_action = derive_fix_action(issue.get("detail", ""), issue.get("msg", ""))
                    if not fix_action:
                        print(f" ⚠️ 无法自动修复: [{pri}] {title[:60]}")
                        print(" 原因: 未知修复方案,需人工分析")
                        continue
                    conn.execute(
                        "INSERT INTO todos (title, description, priority, source, status, fix_action) "
                        "VALUES (?, ?, ?, 'health_check', 'pending', ?)",
                        (title,
                         f"体检发现于 {ctx['started_at'].strftime('%Y-%m-%d %H:%M')}\n分类: {issue['category']}\n详情: {issue.get('detail', '')}\n无法当场修复原因: 需验证/需等待",
                         pri, fix_action)
                    )
                    new_todos.append((pri, title))
                conn.commit()
            finally:
                # Always release the connection; uncommitted changes roll back.
                conn.close()
        except Exception as e:
            print(f" TODO写入异常: {e}")
            return

        # List exactly the rows inserted above (not a re-query by timestamp).
        if new_todos:
            print()
            print(f"📋 已加入TODO({len(new_todos)}条):")
            for pri, title in new_todos:
                print(f" [{pri}] {title[:70]}")
    except Exception as e:
        # TODO bookkeeping must not break the health check, but don't hide it.
        print(f" TODO处理异常: {e}")
# ── Run context ──
# Mutable state shared by all checks during one run: report entries appended
# by log(), per-level counters bumped by log(), and the run's start time,
# which the date and freshness checks use as "now".
ctx = dict(
    report=[],
    issues=[],
    ok_count=0,
    warn_count=0,
    error_count=0,
    critical_count=0,
    started_at=datetime.now(),
)
def log(level, category, msg, detail=None):
    """Record one check result in ``ctx["report"]`` and bump its counter.

    ``critical``, ``error`` and ``warn`` each have their own counter. Every
    other level (``ok``, ``info``, ...) is counted in ``ok_count``.
    """
    entry = {
        "level": level,
        "category": category,
        "msg": msg,
        "detail": detail,
        "timestamp": datetime.now().isoformat(),
    }
    ctx["report"].append(entry)
    if level in ("critical", "error", "warn"):
        ctx[f"{level}_count"] += 1
    else:
        ctx["ok_count"] += 1
def emit(msg, level="ok"):
    """Format one report line as ``"<marker> <msg>"``.

    Unknown levels get an empty marker.
    """
    markers = {"critical": "🔴", "error": "", "warn": "⚠️", "ok": "", "info": "📎"}
    marker = markers.get(level, "")
    line = f"{marker} {msg}"
    return line
# ── 检查器集合 ──
def check_systemctl(service_name):
    """Report whether a systemd unit is active.

    Returns:
        ``(is_active, state)``. ``state`` is the text printed by
        ``systemctl is-active``, or ``error:<exception>`` if systemctl could
        not be run.
    """
    command = ["systemctl", "is-active", service_name]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=5)
    except Exception as exc:
        return False, f"error:{exc}"
    state = proc.stdout.strip()
    return state == "active", state
def check_port(port):
    """Report whether a TCP socket is listening on ``port`` (via ``ss -tlnp``).

    Each address token's port (the text after its last colon) is compared
    exactly. The old substring test ``":80" in output`` also matched :8080
    and :8000.

    Returns:
        ``(True, "listening")``, ``(False, "not_found")``, or
        ``(False, "error:<exception>")`` when ss cannot be run.
    """
    try:
        r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
    except Exception as e:
        return False, f"error:{e}"
    wanted = str(port).strip()
    for line in r.stdout.splitlines()[1:]:  # first line is the column header
        for token in line.split():
            # Addresses look like 0.0.0.0:22, [::]:22, *:22, 127.0.0.53%lo:53.
            # Peer addresses of listeners end in ":*", and the
            # users:((...)) column never ends in a bare port number.
            if ":" in token and token.rsplit(":", 1)[1] == wanted:
                return True, "listening"
    return False, "not_found"
def check_process(pattern):
    """Report whether any process command line matches ``pattern`` (``pgrep -f``).

    Returns:
        ``(True, "running")``, ``(False, "not_found")``, or
        ``(False, "check_error")`` when pgrep cannot be run or times out.
    """
    try:
        r = subprocess.run(["pgrep", "-f", pattern], capture_output=True, timeout=5)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit; ordinary failures are still reported as check_error.
        return False, "check_error"
    if r.returncode == 0:
        return True, "running"
    return False, "not_found"
def check_http(url, timeout=15):
    """Report whether ``url`` answers a GET request, bypassing any proxy.

    Args:
        url: The URL to fetch.
        timeout: Socket timeout in seconds.

    Returns:
        ``(True, "<status code>")`` on success, otherwise
        ``(False, <error text truncated to 60 chars>)``. urllib raises for
        4xx/5xx responses, so those count as failures.
    """
    try:
        # An empty ProxyHandler disables proxies for this opener only. The
        # previous code popped every *proxy* variable from os.environ and
        # restored them only on success, so one failed request stripped the
        # proxy settings from the rest of the process.
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        req = urllib.request.Request(url, method="GET")
        with opener.open(req, timeout=timeout) as resp:
            return True, str(resp.status)
    except Exception as e:
        return False, str(e)[:60]
def check_disk(mount):
    """Report whether the filesystem holding ``mount`` is under 90% full.

    Returns:
        ``(pct < 90, "<pct>% used")``; ``(False, "parse_error")`` when df
        produced no usable data line; ``(False, "check_error")`` when df
        could not be run or its percentage is not an integer.
    """
    try:
        # -P (POSIX format) keeps each filesystem on exactly one line. Without
        # it some df builds wrap long device names onto a line of their own,
        # which broke the column parsing below.
        r = subprocess.run(["df", "-P", "-h", mount], capture_output=True, text=True, timeout=5)
        lines = r.stdout.strip().split("\n")
        if len(lines) >= 2:
            parts = lines[1].split()
            if len(parts) >= 5:
                pct = parts[4].replace("%", "")
                return int(pct) < 90, f"{pct}% used"
        return False, "parse_error"
    except Exception:
        # Was a bare ``except:``; keep the best-effort result without trapping
        # KeyboardInterrupt/SystemExit.
        return False, "check_error"
def check_file_exists(path):
    """Report whether ``path`` exists; on success the detail is its size in bytes."""
    target = Path(path)
    if not target.exists():
        return False, "missing"
    size_bytes = target.stat().st_size
    return True, f"{size_bytes}B"
def check_file_freshness(path, max_hours):
    """Report whether ``path`` was modified less than ``max_hours`` before the run.

    Age is measured from ``ctx["started_at"]``, not the current clock.

    Returns:
        ``(False, "missing")`` for a nonexistent file, otherwise
        ``(age < max_hours, "<age>h ago (threshold <max_hours>h)")``.
    """
    target = Path(path)
    if target.exists():
        modified = datetime.fromtimestamp(target.stat().st_mtime)
        age_h = (ctx["started_at"] - modified).total_seconds() / 3600
        return age_h < max_hours, f"{age_h:.0f}h ago (threshold {max_hours}h)"
    return False, "missing"
def check_db_table_count(table, field, value, op="today", threshold=0):
    """Count rows in ``table`` of the MoFin DB and compare with ``threshold``.

    Args:
        table: Table name. It is interpolated into SQL, so it must come from
            the trusted checklist, never from user input.
        field: Column for the ``op="count"`` equality filter (unused otherwise).
        value: Value for that filter.
        op: ``"today"``: rows whose first recognised date column
            (created_at/date/timestamp/last_scanned_at/signal_date) falls on
            the run date. ``"unprocessed"``: rows with processed = 0/NULL, or
            rows whose source mentions xiaoguo when there is no processed
            column. ``"count"``: all rows, or rows with ``field = value``.
            Anything else: all rows.
        threshold: Minimum count to pass. For ``"unprocessed"`` the count
            must instead stay below it.

    Returns:
        ``(ok, detail)``. Any error is reported as a passing
        ``skip(<error>)`` so a missing table does not fail the run.

    NOTE(review): ``date(col) = run date`` assumes the column holds local
    time; if it is filled by SQLite CURRENT_TIMESTAMP (UTC), rows near
    midnight may be attributed to the wrong day. Confirm against the schema.
    """
    try:
        conn = sqlite3.connect(str(DB_PATH))
        try:
            cur = conn.cursor()
            if op == "today":
                today = ctx["started_at"].strftime("%Y-%m-%d")
                cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
                date_col = next(
                    (c for c in ("created_at", "date", "timestamp", "last_scanned_at", "signal_date")
                     if c in cols),
                    None,
                )
                if not date_col:
                    return True, f"no_date_col_in_{table}"
                cur.execute(f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,))
            elif op == "unprocessed":
                cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
                if "processed" in cols:
                    sql = f"SELECT COUNT(*) FROM {table} WHERE (processed = 0 OR processed IS NULL)"
                elif "source" in cols:
                    # No processed flag: fall back to counting xiaoguo-sourced rows.
                    sql = f"SELECT COUNT(*) FROM {table} WHERE source LIKE '%xiaoguo%'"
                else:
                    sql = f"SELECT COUNT(*) FROM {table}"
                cur.execute(sql)
            elif op == "count" and field:
                cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {field} = ?", (value,))
            else:
                cur.execute(f"SELECT COUNT(*) FROM {table}")
            count = cur.fetchone()[0]
        finally:
            # Previously the connection leaked whenever a query raised.
            conn.close()
        if op == "unprocessed":
            return count < threshold, f"{count} unprocessed"
        return count >= threshold, f"{count} rows"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_cron(job_id):
    """Check one cron job recorded in the hermes ``jobs.json``.

    A job matches by exact ``id``. A job with no id but with a name matches
    when ``job_id`` occurs anywhere in the job's ``str()``.

    Returns:
        ``(False, "disabled")`` or ``(False, "status=<s>")`` for a disabled
        or failing job. ``(True, "ok(stale:<h>h)")`` when its last run is
        more than 48h before the run start. ``(True, "ok")`` otherwise.
        ``(True, "not_in_jobs_json")`` when nothing matches.
        ``(False, "no_jobs_json")`` / ``(False, "check_error:...")`` when
        the file is missing or unreadable.
    """
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return False, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        for job in data.get("jobs", []):
            if not (job.get("id") == job_id
                    or (not job.get("id") and job.get("name") and job_id in str(job))):
                continue
            if not job.get("enabled", True):
                return False, "disabled"
            last_status = job.get("last_status")
            if last_status and last_status != "ok":
                return False, f"status={last_status}"
            last_run = job.get("last_run_at", "")
            if last_run:
                try:
                    # fromisoformat() before Python 3.11 rejects a "Z" suffix.
                    if last_run.endswith("Z"):
                        last_run = last_run[:-1] + "+00:00"
                    last_dt = datetime.fromisoformat(last_run)
                    if last_dt.tzinfo is not None:
                        # started_at is naive local time. Subtracting an aware
                        # value raised TypeError, which the old bare except
                        # hid, so stale jobs were never flagged.
                        last_dt = last_dt.astimezone().replace(tzinfo=None)
                    hours_ago = (ctx["started_at"] - last_dt).total_seconds() / 3600
                    if hours_ago > 48:
                        return True, f"ok(stale:{hours_ago:.0f}h)"
                except Exception:
                    pass  # unparsable timestamp: skip the staleness check
            return True, "ok"
        # No matching job; it may be a cron that needs no check.
        return True, "not_in_jobs_json"
    except Exception as e:
        return False, f"check_error:{str(e)[:60]}"
def check_cron_errors_last24h():
    """Flag cron jobs whose latest run in the last 24h ended with a non-ok status.

    The window starts 24h before ``ctx["started_at"]``. ``last_run_at`` is
    compared as a string with that cutoff, formatted as ISO-8601 truncated
    to seconds. Up to five offending jobs are named in the detail. A missing
    or unreadable ``jobs.json`` passes (``no_jobs_json`` / ``skip(...)``).
    """
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return True, "no_jobs_json"
        payload = json.loads(jobs_file.read_text())
        cutoff = (ctx["started_at"] - timedelta(hours=24)).isoformat()[:19]
        failures = []
        for job in payload.get("jobs", []):
            status = job.get("last_status")
            ran_at = job.get("last_run_at", "")
            if not (status and status != "ok" and ran_at):
                continue
            try:
                in_window = ran_at >= cutoff
            except TypeError:
                in_window = False  # non-string timestamp: ignore this job
            if in_window:
                failures.append(f"{job.get('name','?')}({status})")
        if not failures:
            return True, "0 errors"
        return False, f"{len(failures)} errors: {'; '.join(failures[:5])}"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_cron_paused():
    """Make sure the must-run cron jobs have not been disabled.

    The must-run jobs (price monitor, Xiaoguo scan, watchlist reminder,
    branch evaluation) are identified by hard-coded jobs.json ids. A listed
    id that is absent from jobs.json is not reported. A missing or
    unreadable file passes (``no_jobs_json`` / ``skip(...)``).
    """
    must_run = (
        ("3a9fb3300a6a", "价格监控"),
        ("0851c7838ca3", "小果扫描"),
        ("e13323928f3a", "自选提醒"),
        ("b809fcabfa5b", "分支评估"),
    )
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return True, "no_jobs_json"
        payload = json.loads(jobs_file.read_text())
        jobs_by_id = {}
        for job in payload.get("jobs", []):
            jobs_by_id[job.get("id")] = job
        disabled = []
        for wanted_id, label in must_run:
            job = jobs_by_id.get(wanted_id)
            if job and not job.get("enabled", True):
                disabled.append(label)
        if not disabled:
            return True, "all_expected_running"
        return False, f"paused: {', '.join(disabled)}"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_delivery_targets():
    """Flag cron jobs whose last delivery failed.

    Intended to catch deliver=origin jobs that have no target. Concretely, a
    job is flagged when its ``last_delivery_error`` mentions "delivery"
    (case-insensitive). Up to three job names are listed. A missing or
    unreadable ``jobs.json`` passes.
    """
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return True, "no_jobs_json"
        jobs = json.loads(jobs_file.read_text()).get("jobs", [])
        broken = []
        for job in jobs:
            err = job.get("last_delivery_error", "")
            if err and "delivery" in err.lower():
                broken.append(f"{job.get('name','?')}")
        if not broken:
            return True, "all_ok"
        return False, f"{len(broken)} issues: {', '.join(broken[:3])}"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_cron_audit():
    """Audit every enabled, script-backed cron job for a run in the last 24h.

    A job is audited when ``enabled`` (default True) and ``script`` are both
    set. It is reported if it never ran, or if ``last_run_at`` (compared as
    an ISO string truncated to seconds) is older than 24h before the run
    start. The note distinguishes a last status of ``ok`` from anything else.

    Returns:
        ``(False, "<n>个cron异常: ...")`` naming up to five jobs, or
        ``(True, "全部<n>个cron正常")`` where ``n`` is the number of jobs
        audited. A missing or unreadable ``jobs.json`` passes.
    """
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return True, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        check_time = (ctx["started_at"] - timedelta(hours=24)).isoformat()[:19]
        stale = []
        audited = 0
        for job in data.get("jobs", []):
            if not job.get("enabled", True) or not job.get("script", ""):
                continue
            # Count with the same "enabled defaults to True" rule as the audit.
            # The old total used job.get("enabled") and so left out jobs that
            # omit the key even though they were audited.
            audited += 1
            name = job.get("name", "?")
            last_run = job.get("last_run_at", "")
            if not last_run:
                stale.append(f"{name}(从未运行)")
            elif last_run[:19] < check_time:
                if job.get("last_status") == "ok":
                    stale.append(f"{name}(>24h未运行)")
                else:
                    stale.append(f"{name}(>24h+状态异常)")
        if stale:
            return False, f"{len(stale)}个cron异常: {'; '.join(stale[:5])}"
        return True, f"全部{audited}个cron正常"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_meta_health_check_yesterday():
    """Meta check: summarise yesterday's latest row in ``health_check_log``.

    Always passes. The detail says whether yesterday's run was clean, how many
    errors/criticals it logged, or that no row exists. Any database problem
    yields ``(True, "skip")``.
    """
    try:
        yesterday = (ctx["started_at"] - timedelta(days=1)).strftime("%Y-%m-%d")
        conn = sqlite3.connect(str(DB_PATH))
        try:
            row = conn.execute(
                "SELECT ok_count, error_count, critical_count FROM health_check_log "
                "WHERE date(created_at) = ? ORDER BY created_at DESC LIMIT 1",
                (yesterday,)
            ).fetchone()
        finally:
            # Previously leaked when the query raised (e.g. missing table).
            conn.close()
    except Exception:
        return True, "skip"
    if not row:
        return True, "无昨日记录(首次运行)"
    if row[1] == 0 and row[2] == 0:
        return True, f"昨日体检通过({row[0]}项正常)"
    return True, f"昨日体检有{row[1]}错误+{row[2]}严重(已记录)"
def check_meta_checklist_completeness():
    """Meta check: report whether auto-discovery appended components this run.

    Reads ``ctx["auto_discovered_items"]``; a missing key counts as none.
    Always passes, with detail ``skip`` if the lookup fails.
    """
    try:
        appended = ctx.get("auto_discovered_items", [])
        if not appended:
            return True, "清单覆盖完整"
        return True, f"自动发现并追加了{len(appended)}个新组件到清单"
    except Exception:
        return True, "skip"
# ── 自动发现 ──
def self_discovery():
    """Append cron jobs that the checklist does not cover yet.

    Reads the hermes ``jobs.json``. Every job that has an id and a script,
    but no matching ``cron:<id>`` check in ``health_checklist.json``, is
    appended as an auto-discovered item to the first category whose id is
    ``pipeline``. The checklist file is rewritten if anything was found.

    Returns:
        One ``新cron: <name>(<id>)`` message per new job. If a step fails, a
        ``cron_discovery_error: ...`` message is appended instead of raising.
    """
    found = []
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if jobs_file.exists():
            jobs = json.loads(jobs_file.read_text()).get("jobs", [])
            crons = [(j.get("id"), j.get("name"), j.get("schedule"), j.get("script")) for j in jobs]

            # Cron ids the checklist already covers.
            checklist = json.loads(CHECKLIST_PATH.read_text())
            known = {
                item["check"].split(":")[1]
                for cat in checklist["categories"]
                for item in cat["items"]
                if item["check"].startswith("cron:")
            }

            for cron_id, cron_name, _schedule, cron_script in crons:
                if not cron_id or cron_id in known or not cron_script:
                    continue
                found.append(f"新cron: {cron_name}({cron_id})")
                # New job: append it to the pipeline category.
                for cat in checklist["categories"]:
                    if cat["id"] != "pipeline":
                        continue
                    cat["items"].append({
                        "id": f"cron-auto-{cron_id[:8]}",
                        "description": f"{cron_name} cron 已调度",
                        "check": f"cron:{cron_id}",
                        "expected": "enabled+ok",
                        "severity": "medium",
                        "auto_discovered": True
                    })
                    break

            if found:
                CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
    except Exception as e:
        found.append(f"cron_discovery_error: {e}")
    return found
# ── 主流程 ──
def run_check(item):
    """Run one checklist item and classify the outcome.

    Args:
        item: A checklist entry. ``check`` holds a ``kind:argument`` spec
            (see ``_dispatch_check``). ``severity`` (read only on failure)
            is e.g. critical/high/error/medium/low.

    Returns:
        ``(ok, level, detail)``. ``level`` is ``"ok"`` on success. On failure
        it is ``"critical"`` for critical severity, ``"error"`` for
        high/error, and ``"warn"`` for anything else.

    A checker that raises (e.g. a ``filefresh`` spec without hours, or a
    ``db`` spec with a non-integer threshold) now yields a failed check with
    a ``check_error:`` detail instead of aborting the whole run.
    """
    try:
        ok, detail = _dispatch_check(item["check"])
    except Exception as e:
        ok, detail = False, f"check_error:{str(e)[:60]}"
    if ok:
        return ok, "ok", detail
    return ok, _failure_level(item["severity"]), detail


def _dispatch_check(check_spec):
    """Route a ``kind:argument`` spec to its checker and return ``(ok, detail)``."""
    kind, sep, arg = check_spec.partition(":")
    if sep:
        if kind == "systemctl":
            return check_systemctl(arg)
        if kind == "port":
            return check_port(arg)
        if kind == "proc":
            return check_process(arg)
        if kind == "http":
            return check_http(arg)
        if kind == "disk":
            return check_disk(arg)
        if kind == "fileexists":
            return check_file_exists(arg)
        if kind == "filefresh":
            # filefresh:<path>:<max_hours>, hours optionally suffixed with "h".
            path, hours = arg.split(":", 1)
            return check_file_freshness(path, float(hours.replace("h", "")))
        if kind == "db":
            # db:<table>:<field>:<value>:<op>:<threshold>; trailing parts optional.
            parts = arg.split(":", 4)
            table = parts[0]
            field = parts[1] if len(parts) > 1 else None
            value = parts[2] if len(parts) > 2 else None
            op = parts[3] if len(parts) > 3 else "today"
            threshold = int(parts[4]) if len(parts) > 4 else 0
            return check_db_table_count(table, field, value, op, threshold)
        if kind == "cron":
            return check_cron(arg)
    # Specs matched as whole strings.
    if check_spec == "cron_errors:last24h":
        return check_cron_errors_last24h()
    if check_spec == "cron_paused:check":
        return check_cron_paused()
    if check_spec == "delivery:origin_targets":
        return check_delivery_targets()
    if check_spec == "cron_audit:all":
        return check_cron_audit()
    if check_spec == "meta:health_check_yesterday":
        return check_meta_health_check_yesterday()
    if check_spec == "meta:checklist_completeness":
        return check_meta_checklist_completeness()
    if check_spec == "pipeline:xiaoguo_signal_flow":
        return _check_xiaoguo_signal_flow()
    if check_spec == "pipeline:registry_audit":
        return _check_registry_audit()
    return False, f"unknown_check:{check_spec}"


def _failure_level(severity):
    """Map a checklist severity to the report level used for a failed check."""
    if severity == "critical":
        return "critical"
    if severity in ("high", "error"):
        return "error"
    return "warn"


def _check_xiaoguo_signal_flow():
    """Pass if signal_news gained rows today or its unprocessed backlog is below 30."""
    # Threshold 1, not 0: "count >= 0" was always true, which made the
    # backlog fallback unreachable and the whole check a no-op.
    today_ok, today_detail = check_db_table_count("signal_news", "created_at", None, "today", 1)
    backlog_ok, backlog_detail = check_db_table_count("signal_news", None, None, "unprocessed", 30)
    return today_ok or backlog_ok, f"today_xiaoguo={today_detail}, unprocessed={backlog_detail}"


def _check_registry_audit():
    """Audit pipeline_registry.json against the scripts of enabled cron jobs.

    Fails when an enabled cron script is not mentioned in any pipeline's
    ``source``; such scripts are auto-registered with inferred fields. Also
    fails when a pipeline entry is not ``verified``. An unreadable registry
    passes with an explanatory detail.
    """
    import re

    registry_path = DATA / "pipeline_registry.json"
    try:
        reg = json.loads(registry_path.read_text(encoding="utf-8"))
        pipelines = reg.get("pipelines", [])
        gaps = [p["name"] for p in pipelines if not p.get("verified")]

        # Script stems (name without .py) mentioned in any pipeline's source.
        known_sources = set()
        for p in pipelines:
            for m in re.findall(r'[\w_-]+\.py', p.get("source", "")):
                known_sources.add(m.replace('.py', ''))

        # Enabled cron scripts that no pipeline mentions.
        unregistered = []
        try:
            jobs_data = json.loads((HERMES_CRON_DIR / "jobs.json").read_text(encoding="utf-8"))
            active_scripts = set()
            for job in jobs_data.get("jobs", []):
                script = job.get("script", "") or ""
                if script and job.get("enabled", True):
                    active_scripts.add(script.replace('.py', ''))
            unregistered = [s for s in sorted(active_scripts) if s not in known_sources]
        except Exception:
            pass  # no readable jobs.json: nothing to cross-check

        if unregistered:
            detail = f"{len(gaps)}条待验证 + {len(unregistered)}个新组件未注册"
            # Auto-repair: append a best-guess entry per unregistered script.
            try:
                entries = reg.setdefault("pipelines", [])
                for s in unregistered:
                    entries.append(_infer_pipeline_entry(s))
                registry_path.write_text(
                    json.dumps(reg, ensure_ascii=False, indent=2), encoding="utf-8")
                detail += f" → 已自动注册{len(unregistered)}个(含推断)"
            except Exception as e:
                detail += f" (自动注册异常:{str(e)[:30]})"
            return False, detail
        if gaps:
            return False, f"{len(gaps)}条管道未验证: {', '.join(gaps[:5])}"
        return True, f"全部{len(pipelines)}条管道正常"
    except Exception as e:
        return True, f"注册表不可读({str(e)[:60]})"


def _infer_pipeline_entry(script_stem):
    """Build a best-guess pipeline_registry entry for an unregistered cron script."""
    import re

    script_path = HERMES_CRON_DIR.parent / "scripts" / f"{script_stem}.py"
    desc = "未知"
    consumer_info = "未知"
    if script_path.exists():
        content = script_path.read_text(encoding="utf-8", errors="replace")
        doc_match = re.search(r'"""(.*?)"""', content, re.DOTALL)
        if doc_match:
            doc_text = doc_match.group(1).strip()
            # First docstring line. The old code split on the two characters
            # backslash + "n", which never occur, so the whole docstring leaked in.
            desc = doc_text.split("\n", 1)[0][:80]
            # A "管道: ..." (pipeline) line declares the data flow; accept an
            # ASCII or full-width colon and stop at the end of that line.
            pipe_match = re.search(r'管道[:：].*', doc_text)
            if pipe_match:
                consumer_info = pipe_match.group(0).replace('管道', '').strip(':： ')
        # Heuristics on what the script touches; later matches take precedence.
        if 'signal_news' in content:
            consumer_info = 'signal_news表'
        if 'macro_risk_state' in content:
            consumer_info = 'macro_risk_state.json'
        if 'watchlist' in content.lower():
            consumer_info = 'watchlist.json / decisions.json'
        if 'INSERT INTO' in content:
            for tbl in ['todos', 'price_events', 'macro_context_log', 'accuracy_stats']:
                if tbl in content:
                    consumer_info = f'{tbl}'
        if '.write_text' in content or 'json.dump' in content:
            for stem in ['macro_risk_state', 'macro_context', 'market', 'portfolio', 'decisions']:
                if stem in content:
                    consumer_info = f'{stem}.json'
    return {
        "id": f"auto-{script_stem}",
        "name": desc[:60],
        "source": f"{script_stem}.py",
        "consumer": consumer_info,
        "end_user": "待确认",
        "verified": False,
        "gap": f"自动发现({desc[:60]})",
        "fix": "手动编辑pipeline_registry.json完善此项"
    }
def main():
    """Run the full health check and print the report.

    Flags (read from ``sys.argv``):
        --report            print every category, not only failing ones
        --update-checklist  force cron auto-discovery now

    Steps:
        1. Optional auto-discovery: forced, or at most hourly based on
           ``meta.last_full_scan``.
        2. Run every checklist item via ``run_check`` and print a summary.
        3. Store a ``health_check_log`` row and list ``needs_llm`` TODOs.
        4. Call ``write_todos_for_issues``.

    When nothing failed and --report is absent, only one ``[SILENT]`` line
    is printed.
    """
    show_full = "--report" in sys.argv
    update = "--update-checklist" in sys.argv
    start_time = time.time()

    # Load the checklist.
    if not CHECKLIST_PATH.exists():
        print("[SILENT] health_checklist.json 不存在")
        return
    checklist = json.loads(CHECKLIST_PATH.read_text())

    # Auto-discovery: forced by --update-checklist, otherwise at most hourly.
    discovered = []
    if update or _discovery_due(checklist):
        discovered = self_discovery()
        ctx["auto_discovered_items"] = [
            d for d in discovered if not d.startswith("cron_discovery_error")
        ]
        if ctx["auto_discovered_items"]:
            # self_discovery() rewrote the checklist file. Reload it so the
            # new items are checked in this run, and so the last_full_scan
            # write below does not overwrite them with the stale copy.
            checklist = json.loads(CHECKLIST_PATH.read_text())
        if not update:
            checklist.setdefault("meta", {})["last_full_scan"] = ctx["started_at"].isoformat()
            CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))

    # Run every item, category by category.
    dayname = ["","","","","","",""][ctx["started_at"].weekday()]
    lines = [f"MoFin 系统体检 | {ctx['started_at'].strftime('%Y-%m-%d')}{dayname} | {ctx['started_at'].strftime('%H:%M')}"]
    lines.append("" * 50)
    for cat in checklist["categories"]:
        cat_issues = 0
        cat_lines = []
        for item in cat["items"]:
            ok, level, detail = run_check(item)
            msg = f"{item['description']}: {detail}"
            log(level, cat["name"], msg, item["id"])
            cat_lines.append(emit(msg, level))
            if not ok:
                cat_issues += 1
        # Show a category only when something in it failed, or with --report.
        if cat_issues > 0 or show_full:
            lines.append(f"\n{cat['name']}")
            lines.extend(cat_lines)

    # Auto-discovery results.
    if discovered:
        lines.append("\n📎 自动发现:")
        for d in discovered:
            lines.append(f" {d}")

    # Summary.
    total = ctx["ok_count"] + ctx["warn_count"] + ctx["error_count"] + ctx["critical_count"]
    if total == 0:
        total = 1  # never report a zero total in the [SILENT] line
    lines.append(f"\n{'' * 50}")
    critical = ctx["critical_count"]
    errors = ctx["error_count"]
    warns = ctx["warn_count"]
    ok_count = ctx["ok_count"]
    severity_parts = []
    if critical > 0:
        severity_parts.append(f"🔴{critical}严重")
    if errors > 0:
        severity_parts.append(f"{errors}错误")
    if warns > 0:
        severity_parts.append(f"⚠️{warns}警告")
    if ok_count > 0:
        severity_parts.append(f"{ok_count}正常")
    lines.append(f"总计: {' | '.join(severity_parts)} ({(time.time()-start_time):.0f}s)")
    report = "\n".join(lines)

    _save_history(ok_count, warns, errors, critical, round(time.time() - start_time, 1))

    # Output: speak only when something is wrong, unless --report.
    has_issues = critical > 0 or errors > 0 or warns > 0
    if has_issues or show_full:
        print(report)
    else:
        print(f"[SILENT] MoFin 体检通过 | {ok_count}/{total} 检查正常 ({(time.time()-start_time):.0f}s)")

    # Serious problems get an extra plain-text digest.
    if critical > 0 or errors > 0:
        print()
        print("🔴 需立即处理的问题:")
        for entry in ctx["report"]:
            if entry["level"] in ("critical", "error"):
                print(f" [{entry['level'].upper()}] {entry['category']}: {entry['msg']}")

    _print_needs_llm_todos()

    # File what is left into the TODO system.
    write_todos_for_issues()


def _discovery_due(checklist):
    """Return True when auto-discovery last ran over an hour ago, or never."""
    last_scan = checklist.get("meta", {}).get("last_full_scan")
    if not last_scan:
        return True
    try:
        elapsed = (ctx["started_at"] - datetime.fromisoformat(last_scan)).total_seconds()
    except (TypeError, ValueError):
        # Unparsable or timezone-aware timestamp: rescan rather than crash.
        return True
    return elapsed > 3600


def _save_history(ok_count, warns, errors, critical, duration_s):
    """Append this run's counters and critical/error entries to health_check_log (best effort)."""
    try:
        details = json.dumps([e for e in ctx["report"] if e["level"] in ("critical", "error")])
        conn_hist = sqlite3.connect(str(DB_PATH))
        try:
            conn_hist.execute(
                "INSERT INTO health_check_log (ok_count, warn_count, error_count, critical_count, duration_s, details) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (ok_count, warns, errors, critical, duration_s, details))
            conn_hist.commit()
        finally:
            conn_hist.close()
    except Exception:
        pass  # history is optional; never block the report


def _print_needs_llm_todos():
    """Print up to 10 TODOs that an executor escalated to status 'needs_llm' (best effort)."""
    try:
        conn2 = sqlite3.connect(str(DB_PATH))
        try:
            needs_llm = conn2.execute(
                "SELECT id, title, priority, created_at, note FROM todos "
                "WHERE status='needs_llm' "
                "ORDER BY CASE priority WHEN 'high' THEN 0 WHEN 'medium' THEN 1 ELSE 2 END, created_at ASC LIMIT 10"
            ).fetchall()
        finally:
            conn2.close()
        if needs_llm:
            print()
            print("🔶 需知微介入(执行器无法自动修复):")
            for n in needs_llm:
                note = (n[4] or "")[:60]
                print(f" [{n[2]}] #{n[0]} {n[1][:60]}{note}")
    except Exception:
        pass  # the listing is informational only
# Script entry point: run the health check only when executed directly
# (e.g. by cron), not when the module is imported.
if __name__ == "__main__":
    main()