843012e045
- 移除health_check_history.json → health_check_log表 - 移除health_known_issues.json(未使用) - 移除TODO_PATH(已用DB替代) - 更新文档注释
763 lines
30 KiB
Python
Executable File
763 lines
30 KiB
Python
Executable File
#!/usr/bin/env python3
"""morning_health_check.py — MoFin 系统常规体检

每日开盘前(8:00)运行,全面扫描 MoFin 所有组件的健康状况。
当前 8 类 48 项(清单自动扩展)。
输出格式化的体检报告:有问题才出声,没问题静默。

核心设计:
  - 从 health_checklist.json 读取检查清单
  - 逐项检查,记录状态
  - 报告异常项(只推异常,不推正常)
  - 自动发现新增 cron 任务(通过 self_discovery 函数)
  - 维护检查历史(health_check_log 表)
  - 自动修复可修问题;不可修、但能推导出修复命令的问题写入 TODO(todos 表)

新增组件自动发现机制:
  - 对比当前 cron 列表(jobs.json)与 checklist 中记录的 cron id
  - 发现新 cron → 自动追加到 checklist 的 pipeline 分类
  - 脚本修改 → 标记"需复核"(规划中,当前代码尚未实现)

用法:
  python3 scripts/morning_health_check.py [--report] [--update-checklist]
    --report:            强制输出完整报告(默认只输出异常)
    --update-checklist:  立即运行自动发现并更新 checklist
                         (不加此参数时,自动发现每小时最多运行一次)

no_agent 模式:只输出异常项;无异常时仅输出一行 [SILENT] 摘要。
"""


import json
import os
import sqlite3
import subprocess
import sys
import time
import urllib.request
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path


# ── Paths ──
# Project root; every MoFin-relative path below hangs off it.
BASE = Path("/home/hmo/MoFin")
DATA = BASE.joinpath("data")
SCRIPTS_DIR = BASE.joinpath("scripts")

# Hermes "position-analyst" profile: script copies and cron state live here.
_HERMES_PROFILE = Path("/home/hmo/.hermes/profiles/position-analyst")
PROFILE_SCRIPTS = _HERMES_PROFILE.joinpath("scripts")
HERMES_CRON_DIR = _HERMES_PROFILE.joinpath("cron")

# Files under DATA: the health-check list (JSON) and the SQLite database.
CHECKLIST_PATH = DATA.joinpath("health_checklist.json")
DB_PATH = DATA.joinpath("mofin.db")


def derive_fix_action(detail, msg):
    """Derive a shell command that helps resolve (or verify) a health-check issue.

    Args:
        detail: The issue's ``detail`` field (the checklist item id). Not
            consulted at the moment; kept for interface compatibility.
        msg: The human-readable issue message, matched against keywords.

    Returns:
        A shell command string, or ``None`` when no fix is known. Rules are
        tried in order and the first match wins.
    """
    profile_scripts = "/home/hmo/.hermes/profiles/position-analyst/scripts"
    lowered = msg.lower()

    # Xiaoguo scanner failing -> verify the script copy is present.
    if any(key in msg for key in ("xiaoguo_scanner", "小果扫描")):
        return f"ls -la {profile_scripts}/xiaoguo_scanner.py 2>&1 && echo 'ok'"

    # System audit failing -> verify that script copy is present.
    if any(key in msg for key in ("system_audit", "系统审计")):
        return f"ls -la {profile_scripts}/system_audit.py 2>&1"

    # Generic cron error mentioning either job: check both files; the next
    # scheduled run is expected to recover by itself.
    mentions_job = "小果" in msg or "系统审计" in msg
    if mentions_job and "cron" in lowered and "error" in lowered:
        return (f"ls -la {profile_scripts}/xiaoguo_scanner.py "
                f"{profile_scripts}/system_audit.py 2>&1")

    # HK exchange-rate problems -> refresh the rate cache.
    if "港股汇率" in msg:
        return f"cd {BASE} && python3 hk_rate.py 2>&1"

    # No price-monitor events -> look at the process list.
    if "价格监控" in msg and "0 rows" in msg:
        return "ps aux | grep price_monitor | grep -v grep | head -3"

    # Delivery-target problems need a manual cron update ("delivery" contains
    # "deliver", so one substring test covers both spellings).
    if "deliver" in lowered:
        return f"cd {BASE} && echo '需手动设置: cronjob action=update deliver=local'"

    # Xiaoguo -> Zhiwei signal bridge broken -> run the consumer once.
    if "信号桥" in msg:
        return f"cd {BASE} && python3 scripts/xiaoguo_signal_consumer.py 2>&1"

    return None


def auto_fix_issue(issue):
    """Try to fix (or explain away) a single health-check issue automatically.

    Args:
        issue: A report entry as produced by ``log``; only ``msg`` is used.

    Returns:
        ``(fixed, fix_msg)``. ``fixed`` is True only when the issue was
        actually resolved or confirmed to be benign. Anything that still needs
        a human returns False, so the caller turns it into a TODO instead of
        logging it as "auto-fixed".
    """
    msg = issue.get("msg", "")

    # HK exchange-rate cache missing -> regenerate it.
    if "港股汇率缓存" in msg and "missing" in msg:
        try:
            # hk_rate.py writes ~/.cache/hk_exchange_rate.json; under a Hermes
            # profile, ~ resolves to profile/home/.cache/.
            r = subprocess.run(
                ["python3", str(BASE / "hk_rate.py")],
                capture_output=True, text=True, timeout=15,
            )
        except Exception as e:
            return False, f"汇率刷新异常: {e}"
        if r.returncode == 0:
            return True, f"已自动刷新港股汇率缓存: {r.stdout.strip()}"
        return False, f"汇率刷新失败: {r.stderr[:100]}"

    # Price monitor recorded no events today. Messages have the form
    # "<description>: <count> rows", so match " 0 rows" (leading space) to
    # avoid treating "10 rows", "20 rows", ... as zero.
    if "价格监控" in msg and " 0 rows" in msg:
        now = ctx["started_at"]
        if now.weekday() < 5 and 9 <= now.hour <= 15:
            # Trading hours: events are expected, so inspect the process.
            running, _ = check_process("price_monitor")
            if not running:
                # The process is gone: NOT fixed, a human has to step in.
                return False, "已检测:price_monitor进程不存在(需人工介入)"
            return True, "已确认:price_monitor进程运行中,但今日无事件(可能无价格触发)"
        # Outside trading hours an empty event table is expected.
        return True, "非交易时段无价格事件属正常"

    # Everything else cannot be fixed automatically.
    return False, "需人工处理"


def write_todos_for_issues():
    """Auto-fix what we can, then record the remaining issues as TODOs.

    Only ``critical``/``error``/``warn`` entries of ``ctx["report"]`` are
    considered. Each is first handed to ``auto_fix_issue``; whatever is still
    unresolved is written to the ``todos`` table (deduplicated by title,
    blocked TODOs are reopened). Failures are reported on stdout but never
    abort the health check itself.
    """
    try:
        issues = [e for e in ctx["report"] if e["level"] in ("critical", "error", "warn")]
        if not issues:
            return
        remaining = _auto_fix_issues(issues)
        if remaining:
            _persist_issue_todos(remaining)
    except Exception as e:
        # Best effort: TODO bookkeeping must not break the main flow, but a
        # silent `pass` hid real failures, so at least say what went wrong.
        print(f" TODO写入异常: {e}")


def _auto_fix_issues(issues):
    """Run auto_fix_issue on every issue, print a summary, and return the unfixed ones."""
    fixed_issues = []
    remaining = []
    for issue in issues:
        fixed, fix_msg = auto_fix_issue(issue)
        if fixed:
            fixed_issues.append((issue, fix_msg))
            log("ok", issue["category"], f"已自动修复: {fix_msg}", issue.get("detail", ""))
        else:
            remaining.append(issue)

    if fixed_issues:
        print()
        print("🛠️ 自动修复:")
        for issue, fix_msg in fixed_issues:
            print(f" ✅ {issue['category']}: {fix_msg}")
    return remaining


def _persist_issue_todos(remaining):
    """Insert or reopen TODO rows for unresolved issues, then print what was added."""
    todo_priority = {"critical": "high", "error": "medium", "warn": "low"}
    found_at = ctx["started_at"].strftime("%Y-%m-%d %H:%M")
    added = []  # (title, priority) of rows inserted in this run

    # closing() guarantees the connection is released even if a query fails
    # (uncommitted changes are then discarded, as before).
    with closing(sqlite3.connect(str(DB_PATH))) as conn:
        for issue in remaining:
            title = f"[体检发现] {issue['msg']}"
            pri = todo_priority.get(issue["level"], "medium")

            # Dedup by title, including completed TODOs, so the same finding
            # is not added again.
            existing = conn.execute(
                "SELECT id, status FROM todos WHERE title=?", (title,)
            ).fetchone()
            if existing:
                if existing[1] == "blocked":
                    # A blocked TODO resurfaced: reopen it.
                    conn.execute(
                        "UPDATE todos SET status='pending', priority=?, note='已重新打开', "
                        "updated_at=CURRENT_TIMESTAMP WHERE id=?",
                        (pri, existing[0]),
                    )
                continue

            # Only create a TODO when there is a concrete fix_action to run;
            # otherwise surface it in the report for manual analysis.
            fix_action = derive_fix_action(issue.get("detail", ""), issue.get("msg", ""))
            if not fix_action:
                print(f" ⚠️ 无法自动修复: [{pri}] {title[:60]}")
                print(" 原因: 未知修复方案,需人工分析")
                continue

            conn.execute(
                "INSERT INTO todos (title, description, priority, source, status, fix_action) "
                "VALUES (?, ?, ?, 'health_check', 'pending', ?)",
                (
                    title,
                    f"体检发现于 {found_at}\n分类: {issue['category']}\n"
                    f"详情: {issue.get('detail', '')}\n无法当场修复原因: 需验证/需等待",
                    pri,
                    fix_action,
                ),
            )
            added.append((title, pri))
        conn.commit()

    # Print exactly the rows inserted above (re-querying by created_at could
    # pick other rows that share the same second-resolution timestamp).
    if added:
        print()
        print(f"📋 已加入TODO({len(added)}条):")
        for title, pri in added:
            print(f" [{pri}] {title[:70]}")


# ── Run context ──
# Module-level mutable state for this single run: the report entries written
# by log(), one counter per level, and the run's start time (used as "now" by
# every time-based check). NOTE(review): "issues" appears unused in this file.
ctx = dict(
    report=[],
    issues=[],
    ok_count=0,
    warn_count=0,
    error_count=0,
    critical_count=0,
    started_at=datetime.now(),
)


def log(level, category, msg, detail=None):
    """Append one check result to ``ctx["report"]`` and bump its level counter.

    ``critical``, ``error`` and ``warn`` have dedicated counters; every other
    level (``ok``, ``info``, anything unrecognised) is counted as ``ok``.
    """
    entry = {
        "level": level,
        "category": category,
        "msg": msg,
        "detail": detail,
        "timestamp": datetime.now().isoformat(),
    }
    ctx["report"].append(entry)

    counter_for_level = {
        "critical": "critical_count",
        "error": "error_count",
        "warn": "warn_count",
    }
    ctx[counter_for_level.get(level, "ok_count")] += 1


def emit(msg, level="ok"):
    """Format one report line as ``"<icon> <msg>"``; unknown levels get a bullet."""
    icons = {
        "critical": "🔴",
        "error": "❌",
        "warn": "⚠️",
        "ok": "✅",
        "info": "📎",
    }
    icon = icons[level] if level in icons else "•"
    return f"{icon} {msg}"


# ── Checkers ──


def check_systemctl(service_name):
    """Return ``(active, state)`` for a systemd unit via ``systemctl is-active``.

    ``state`` is the trimmed stdout of the command; the unit counts as healthy
    only when it is exactly ``active``. If the command cannot be run at all,
    returns ``(False, "error:<exception>")``.
    """
    command = ["systemctl", "is-active", service_name]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=5)
    except Exception as exc:
        return False, f"error:{exc}"
    state = proc.stdout.strip()
    return state == "active", state


def check_port(port):
|
||
"""检查端口是否在监听"""
|
||
try:
|
||
r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
|
||
return f":{port}" in r.stdout, "listening" if f":{port}" in r.stdout else "not_found"
|
||
except Exception as e:
|
||
return False, f"error:{e}"
|
||
|
||
def check_process(pattern):
    """Return ``(running, state)`` for processes whose command line matches ``pattern``.

    Uses ``pgrep -f`` (the pattern is a regex matched against the full command
    line). pgrep exits 0 on a match and 1 on no match; any other exit code
    (e.g. 2 for an invalid pattern) or failure to run pgrep is reported as
    ``check_error`` rather than silently as ``not_found``.
    """
    try:
        r = subprocess.run(["pgrep", "-f", pattern], capture_output=True, timeout=5)
    except Exception:
        return False, "check_error"
    if r.returncode == 0:
        return True, "running"
    if r.returncode == 1:
        return False, "not_found"
    return False, "check_error"


def check_http(url, timeout=15):
    """Return ``(reachable, status_or_error)`` for a GET to ``url``, bypassing proxies.

    Proxies are disabled with an empty ``ProxyHandler`` instead of temporarily
    deleting ``*proxy*`` environment variables: the old approach only restored
    them on success, so any failed request left them deleted for the rest of
    the process.

    Returns:
        ``(True, "<HTTP status>")`` when a response is received (redirects are
        followed); ``(False, "<error, at most 60 chars>")`` on any failure,
        including HTTP 4xx/5xx responses.
    """
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        request = urllib.request.Request(url, method="GET")
        # `with` closes the response instead of leaking the socket.
        with opener.open(request, timeout=timeout) as resp:
            return True, str(resp.status)
    except Exception as e:
        return False, str(e)[:60]


def check_disk(mount, threshold=90):
    """Return ``(ok, "<pct>% used")`` for the filesystem holding ``mount``.

    Args:
        mount: Path or mount point passed to ``df``.
        threshold: Usage percentage at which the check fails (default 90, the
            previous hard-coded limit); ok means usage < threshold.

    Returns:
        ``(False, "parse_error")`` if df's output has no usable data row and
        ``(False, "check_error")`` if df cannot be run or reports a
        non-numeric percentage.
    """
    try:
        # -P (POSIX format) keeps each filesystem on one line even when the
        # device name is long, so the data row is always rows[1].
        r = subprocess.run(["df", "-P", "-h", mount], capture_output=True, text=True, timeout=5)
    except Exception:
        return False, "check_error"

    rows = r.stdout.strip().splitlines()
    if len(rows) < 2:
        return False, "parse_error"
    fields = rows[1].split()
    if len(fields) < 5:
        return False, "parse_error"

    pct = fields[4].replace("%", "")  # the Use%/Capacity column
    try:
        used = int(pct)
    except ValueError:
        return False, "check_error"
    return used < threshold, f"{pct}% used"


def check_file_exists(path):
    """Return ``(True, "<size>B")`` if ``path`` exists, else ``(False, "missing")``."""
    target = Path(path)
    if not target.exists():
        return False, "missing"
    size = target.stat().st_size
    return True, f"{size}B"


def check_file_freshness(path, max_hours):
    """Return ``(fresh, description)`` from the file's modification time.

    Age is measured against the run start (``ctx["started_at"]``); the file
    is fresh when strictly younger than ``max_hours``. A missing file yields
    ``(False, "missing")``.
    """
    target = Path(path)
    if not target.exists():
        return False, "missing"
    modified = datetime.fromtimestamp(target.stat().st_mtime)
    age_hours = (ctx["started_at"] - modified).total_seconds() / 3600
    description = f"{age_hours:.0f}h ago (threshold {max_hours}h)"
    return age_hours < max_hours, description


def check_db_table_count(table, field, value, op="today", threshold=0, db_path=None):
    """Count rows in ``table`` and compare the count against ``threshold``.

    Args:
        table: Table name. It is interpolated into SQL (identifiers cannot be
            bound as parameters), so it must come from the trusted checklist.
        field: Column to filter on for ``op="count"``; ignored otherwise.
        value: Value ``field`` must equal for ``op="count"``.
        op: ``"today"`` - rows whose first date-like column is today;
            ``"unprocessed"`` - rows not yet processed (see _count_rows);
            ``"count"`` - rows where ``field = value`` (all rows if no field);
            anything else - all rows.
        threshold: For ``"unprocessed"`` the check passes when
            ``count < threshold``; for every other op when ``count >= threshold``.
        db_path: Database file to open; defaults to ``DB_PATH``.

    Returns:
        ``(ok, detail)``. Any database error is reported as a pass with
        ``detail="skip(<error>)"``; a table without a date column in
        ``"today"`` mode also passes (``no_date_col_in_<table>``).
    """
    try:
        target = DB_PATH if db_path is None else db_path
        # closing() releases the connection even when a query raises.
        with closing(sqlite3.connect(str(target))) as conn:
            count = _count_rows(conn, table, field, value, op)
    except Exception as e:
        return True, f"skip({str(e)[:60]})"

    if count is None:
        return True, f"no_date_col_in_{table}"
    if op == "unprocessed":
        return count < threshold, f"{count} unprocessed"
    return count >= threshold, f"{count} rows"


def _table_columns(conn, table):
    """Return the column names of ``table`` (empty list if it does not exist)."""
    return [row[1] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()]


def _count_rows(conn, table, field, value, op):
    """Run the COUNT(*) query for check_db_table_count; None means "today" found no date column."""
    if op == "today":
        cols = _table_columns(conn, table)
        date_col = next(
            (c for c in ("created_at", "date", "timestamp", "last_scanned_at", "signal_date")
             if c in cols),
            None,
        )
        if date_col is None:
            return None
        today = ctx["started_at"].strftime("%Y-%m-%d")
        sql, params = f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,)
    elif op == "unprocessed":
        cols = _table_columns(conn, table)
        if "processed" in cols:
            sql = f"SELECT COUNT(*) FROM {table} WHERE (processed = 0 OR processed IS NULL)"
        elif "source" in cols:
            # No processed flag: fall back to counting xiaoguo-sourced rows.
            sql = f"SELECT COUNT(*) FROM {table} WHERE source LIKE '%xiaoguo%'"
        else:
            sql = f"SELECT COUNT(*) FROM {table}"
        params = ()
    elif op == "count" and field:
        sql, params = f"SELECT COUNT(*) FROM {table} WHERE {field} = ?", (value,)
    else:
        sql, params = f"SELECT COUNT(*) FROM {table}", ()
    return conn.execute(sql, params).fetchone()[0]


def check_cron(job_id):
    """Check one Hermes cron job's state in ``jobs.json``.

    A job matches when its ``id`` equals ``job_id``; for jobs without an id
    but with a name, a substring match of ``job_id`` against the job's repr
    is used as a fallback.

    Returns:
        ``(False, "disabled")`` or ``(False, "status=<s>")`` for a disabled or
        failing job; ``(True, "ok(stale:<N>h)")`` when the last run is more
        than 48h old (reported, not failed); ``(True, "ok")`` otherwise;
        ``(True, "not_in_jobs_json")`` when no job matches;
        ``(False, "no_jobs_json")`` if the file is missing; and
        ``(False, "check_error:...")`` on any other error.
    """
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return False, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        for job in data.get("jobs", []):
            matched = job.get("id") == job_id or (
                not job.get("id") and job.get("name") and job_id in str(job)
            )
            if not matched:
                continue
            if not job.get("enabled", True):
                return False, "disabled"
            last_status = job.get("last_status")
            if last_status and last_status != "ok":
                return False, f"status={last_status}"
            hours_ago = _hours_since_run_start(job.get("last_run_at", ""))
            if hours_ago is not None and hours_ago > 48:
                return True, f"ok(stale:{hours_ago:.0f}h)"
            return True, "ok"
        # Job not found - presumably a cron that does not need checking.
        return True, "not_in_jobs_json"
    except Exception as e:
        return False, f"check_error:{str(e)[:60]}"


def _hours_since_run_start(timestamp):
    """Hours from an ISO-8601 ``timestamp`` to ``ctx["started_at"]``; None if empty/unparseable.

    Timezone-aware timestamps are converted to naive local time first:
    ``ctx["started_at"]`` is naive, and subtracting an aware datetime from it
    raises TypeError (previously swallowed by a bare except, hiding staleness).
    """
    if not timestamp:
        return None
    try:
        moment = datetime.fromisoformat(timestamp)
    except (TypeError, ValueError):
        return None
    if moment.tzinfo is not None:
        moment = moment.astimezone().replace(tzinfo=None)
    return (ctx["started_at"] - moment).total_seconds() / 3600


def check_cron_errors_last24h():
    """Report cron jobs that ran within the last 24h and did not finish ``ok``.

    ``last_run_at`` is compared with the cutoff as a string, which assumes
    both are ISO-8601 in the same timezone and format as
    ``datetime.isoformat()`` — NOTE(review): confirm against the Hermes
    jobs.json writer. Non-string ``last_run_at`` values are ignored. Any other
    error is reported as a pass (``skip(...)``). At most five jobs are listed.
    """
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return True, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        # "YYYY-MM-DDTHH:MM:SS" of (run start - 24h).
        cutoff = (ctx["started_at"] - timedelta(hours=24)).isoformat()[:19]
        errors = []
        for job in data.get("jobs", []):
            last_status = job.get("last_status")
            last_run = job.get("last_run_at", "")
            if not last_status or last_status == "ok":
                continue
            # Explicit type guard instead of a bare `except: pass` around the
            # comparison: only string timestamps can be compared to cutoff.
            if isinstance(last_run, str) and last_run and last_run >= cutoff:
                errors.append(f"{job.get('name','?')}({last_status})")
        if errors:
            return False, f"{len(errors)} errors: {'; '.join(errors[:5])}"
        return True, "0 errors"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"


def check_cron_paused():
    """Detect must-run cron jobs that are disabled in ``jobs.json``.

    Required jobs that are absent from jobs.json are not reported. A missing
    jobs.json or any read/parse error counts as a pass.
    """
    # (job id, display name) of crons that should never be paused.
    required = (
        ("3a9fb3300a6a", "价格监控"),
        ("0851c7838ca3", "小果扫描"),
        ("e13323928f3a", "自选提醒"),
        ("b809fcabfa5b", "分支评估"),
    )
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return True, "no_jobs_json"
        jobs_by_id = {}
        for job in json.loads(jobs_file.read_text()).get("jobs", []):
            jobs_by_id[job.get("id")] = job

        paused = []
        for job_id, label in required:
            job = jobs_by_id.get(job_id)
            if job and not job.get("enabled", True):
                paused.append(label)

        if not paused:
            return True, "all_expected_running"
        return False, f"paused: {', '.join(paused)}"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"


def check_delivery_targets():
    """Flag cron jobs whose last delivery failed with a delivery-related error.

    Despite the checklist key ``delivery:origin_targets``, this only inspects
    each job's ``last_delivery_error`` and reports jobs where that text
    mentions "delivery" (case-insensitive); at most three names are listed.
    A missing jobs.json or any read/parse error counts as a pass.
    """
    try:
        jobs_file = HERMES_CRON_DIR / "jobs.json"
        if not jobs_file.exists():
            return True, "no_jobs_json"
        jobs = json.loads(jobs_file.read_text()).get("jobs", [])

        failing = []
        for job in jobs:
            error_text = job.get("last_delivery_error", "")
            if not error_text:
                continue
            if "delivery" in error_text.lower():
                failing.append(f"{job.get('name','?')}")

        if not failing:
            return True, "all_ok"
        return False, f"{len(failing)} issues: {', '.join(failing[:3])}"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"


def check_cron_audit():
    """Audit every enabled, script-backed cron: did it run within the last 24h?

    Jobs that are disabled or have no ``script`` are skipped; a missing
    ``enabled`` key means enabled. Timestamps are compared as ISO-8601
    strings (first 19 chars) — NOTE(review): assumes jobs.json uses the same
    local-time format as ``datetime.isoformat()``. Errors count as a pass.
    """
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return True, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        cutoff = (ctx["started_at"] - timedelta(hours=24)).isoformat()[:19]

        # One filter for both the audit and the total, so the "全部N个" count
        # agrees with what was audited (the old total treated a missing
        # "enabled" as disabled while the loop treated it as enabled).
        audited = [
            job for job in data.get("jobs", [])
            if job.get("enabled", True) and job.get("script", "")
        ]

        stale = []
        for job in audited:
            name = job.get("name", "?")
            last_run = job.get("last_run_at", "")
            if not last_run:
                stale.append(f"{name}(从未运行)")
            elif last_run[:19] < cutoff:
                if job.get("last_status") == "ok":
                    stale.append(f"{name}(>24h未运行)")
                else:
                    stale.append(f"{name}(>24h+状态异常)")

        if stale:
            return False, f"{len(stale)}个cron异常: {'; '.join(stale[:5])}"
        return True, f"全部{len(audited)}个cron正常"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"


def check_meta_health_check_yesterday():
    """Meta-check: summarize yesterday's health-check run from ``health_check_log``.

    Informational only: always returns ok=True. When there is no row for
    yesterday, the message now distinguishes "no history at all" (first run)
    from "history exists but yesterday is missing" instead of always claiming
    a first run. Any error yields ``(True, "skip")``.
    """
    try:
        yesterday = (ctx["started_at"] - timedelta(days=1)).strftime("%Y-%m-%d")
        # NOTE(review): if created_at defaults to SQLite CURRENT_TIMESTAMP it
        # is UTC, while `yesterday` is local time — confirm the schema.
        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            row = conn.execute(
                "SELECT ok_count, error_count, critical_count FROM health_check_log "
                "WHERE date(created_at) = ? ORDER BY created_at DESC LIMIT 1",
                (yesterday,),
            ).fetchone()
            latest = None
            if row is None:
                latest = conn.execute(
                    "SELECT MAX(created_at) FROM health_check_log"
                ).fetchone()[0]
    except Exception:
        return True, "skip"

    if row is None:
        if latest is None:
            return True, "无昨日记录(首次运行)"
        return True, f"无昨日记录(上次体检: {str(latest)[:16]})"
    ok_count, error_count, critical_count = row
    if error_count == 0 and critical_count == 0:
        return True, f"昨日体检通过({ok_count}项正常)"
    return True, f"昨日体检有{error_count}错误+{critical_count}严重(已记录)"


def check_meta_checklist_completeness():
    """Meta-check: report whether auto-discovery appended components this run.

    Reads ``ctx["auto_discovered_items"]`` when present (a list of discovery
    notes); always returns ok=True. Any error yields ``(True, "skip")``.
    """
    try:
        added = ctx.get("auto_discovered_items", [])
        if added:
            return True, f"自动发现并追加了{len(added)}个新组件到清单"
        return True, "清单覆盖完整"
    except Exception:
        # Informational check: never fail the run, but don't swallow
        # KeyboardInterrupt/SystemExit like the old bare except did.
        return True, "skip"


# ── Auto-discovery ──


def self_discovery():
    """Append newly seen Hermes cron jobs to the checklist's ``pipeline`` category.

    Compares job ids in ``jobs.json`` with the ``cron:<id>`` checks already in
    ``health_checklist.json``. Every unknown job that has a ``script`` gets an
    auto-discovered ``cron:<id>`` item, and the checklist file is rewritten.

    Returns:
        Human-readable notes: ``新cron: <name>(<id>)`` for each appended job;
        ``未追加新cron: ...`` when a job could not be appended because the
        checklist has no ``pipeline`` category (previously such jobs were
        reported as added but silently dropped); or
        ``cron_discovery_error: <e>`` on failure.
    """
    discovered = []
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return discovered
        jobs = json.loads(cron_jobs_path.read_text()).get("jobs", [])
        checklist = json.loads(CHECKLIST_PATH.read_text())

        # Cron ids the checklist already checks ("cron:<id>").
        known_cron_ids = {
            item["check"].split(":")[1]
            for cat in checklist["categories"]
            for item in cat["items"]
            if item.get("check", "").startswith("cron:")
        }
        # Hoisted: the target category is the same for every new job.
        pipeline = next(
            (cat for cat in checklist["categories"] if cat.get("id") == "pipeline"),
            None,
        )

        changed = False
        for job in jobs:
            jid, name, script = job.get("id"), job.get("name"), job.get("script")
            if not jid or jid in known_cron_ids or not script:
                continue
            if pipeline is None:
                discovered.append(f"未追加新cron: {name}({jid}) (清单缺少pipeline分类)")
                continue
            pipeline["items"].append({
                "id": f"cron-auto-{jid[:8]}",
                "description": f"{name} cron 已调度",
                "check": f"cron:{jid}",
                "expected": "enabled+ok",
                "severity": "medium",
                "auto_discovered": True,
            })
            known_cron_ids.add(jid)  # guard against duplicate ids in jobs.json
            discovered.append(f"新cron: {name}({jid})")
            changed = True

        if changed:
            CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
    except Exception as e:
        discovered.append(f"cron_discovery_error: {e}")

    return discovered


# ── Main flow ──


def run_check(item):
    """Run one checklist item and map its outcome to a report level.

    ``item["check"]`` is a spec string dispatched to the matching checker;
    ``item["severity"]`` decides the level on failure: ``critical`` ->
    ``critical``, ``high``/``error`` -> ``error``, anything else -> ``warn``.

    Returns:
        ``(ok, level, detail)``; ``level`` is ``"ok"`` when the check passed.
        A malformed spec or a checker that raises now yields ``ok=False`` with
        ``detail="check_error:<exception>"`` instead of aborting the whole run.
    """
    spec = item["check"]
    try:
        ok, detail = _dispatch_check(spec)
    except Exception as e:
        ok, detail = False, f"check_error:{str(e)[:60]}"

    if ok:
        return ok, "ok", detail
    severity = item["severity"]
    if severity == "critical":
        level = "critical"
    elif severity in ("high", "error"):
        level = "error"
    else:
        level = "warn"
    return ok, level, detail


def _dispatch_check(spec):
    """Run the checker selected by ``spec`` and return its ``(ok, detail)``."""
    _, _, arg = spec.partition(":")
    if spec.startswith("systemctl:"):
        return check_systemctl(arg)
    if spec.startswith("port:"):
        return check_port(arg)
    if spec.startswith("proc:"):
        return check_process(arg)
    if spec.startswith("http:"):
        return check_http(arg)
    if spec.startswith("disk:"):
        return check_disk(arg)
    if spec.startswith("fileexists:"):
        return check_file_exists(arg)
    if spec.startswith("filefresh:"):
        # filefresh:<path>:<max_hours>[h]
        parts = spec.split(":", 2)
        return check_file_freshness(parts[1], float(parts[2].replace("h", "")))
    if spec.startswith("db:"):
        # db:<table>:<field>:<value>:<op>:<threshold>; trailing parts optional
        parts = spec.split(":", 5)
        return check_db_table_count(
            parts[1],
            parts[2] if len(parts) > 2 else None,
            parts[3] if len(parts) > 3 else None,
            parts[4] if len(parts) > 4 else "today",
            int(parts[5]) if len(parts) > 5 else 0,
        )
    if spec.startswith("cron:"):
        return check_cron(arg)
    if spec == "cron_errors:last24h":
        return check_cron_errors_last24h()
    if spec == "cron_paused:check":
        return check_cron_paused()
    if spec == "delivery:origin_targets":
        return check_delivery_targets()
    if spec == "cron_audit:all":
        return check_cron_audit()
    if spec == "meta:health_check_yesterday":
        return check_meta_health_check_yesterday()
    if spec == "meta:checklist_completeness":
        return check_meta_checklist_completeness()
    if spec == "pipeline:xiaoguo_signal_flow":
        # Healthy if Xiaoguo produced signals today OR the unprocessed backlog
        # is below 30. "Produced today" needs threshold=1: with 0 the test
        # was count >= 0, which always passed and made this check a no-op.
        has_today, d1 = check_db_table_count("signal_news", "created_at", None, "today", 1)
        backlog_ok, d2 = check_db_table_count("signal_news", None, None, "unprocessed", 30)
        return (has_today or backlog_ok), f"today_xiaoguo={d1}, unprocessed={d2}"
    return False, f"unknown_check:{spec}"


def main():
    """Run the full health check and print the report.

    Flags (read from ``sys.argv``): ``--report`` prints every category even
    when all checks pass; ``--update-checklist`` forces auto-discovery
    (otherwise it runs at most once per hour, tracked in
    ``checklist["meta"]["last_full_scan"]``).

    Without issues only a ``[SILENT]`` one-line summary is printed. Each run
    is appended to ``health_check_log``, escalated ``needs_llm`` TODOs are
    listed, and unresolved issues are handed to ``write_todos_for_issues``.
    """
    show_full = "--report" in sys.argv
    update = "--update-checklist" in sys.argv
    start_time = time.time()
    started = ctx["started_at"]

    if not CHECKLIST_PATH.exists():
        print("[SILENT] health_checklist.json 不存在")
        return

    checklist, discovered = _load_checklist_with_discovery(force=update)
    # Lets the meta:checklist_completeness check report what was appended.
    ctx["auto_discovered_items"] = [d for d in discovered if d.startswith("新cron")]

    # Run every item, category by category.
    dayname = "一二三四五六日"[started.weekday()]
    lines = [f"MoFin 系统体检 | {started.strftime('%Y-%m-%d')} 周{dayname} | {started.strftime('%H:%M')}"]
    lines.append("─" * 50)
    for cat in checklist["categories"]:
        cat_issues = 0
        cat_lines = []
        for item in cat["items"]:
            ok, level, detail = run_check(item)
            msg = f"{item['description']}: {detail}"
            log(level, cat["name"], msg, item["id"])
            cat_lines.append(emit(msg, level))
            if not ok:
                cat_issues += 1
        # Only print a category if it has problems, unless --report was given.
        if cat_issues > 0 or show_full:
            lines.append(f"\n【{cat['name']}】")
            lines.extend(cat_lines)

    if discovered:
        lines.append("\n📎 自动发现:")
        lines.extend(f" {d}" for d in discovered)

    # Summary line.
    critical = ctx["critical_count"]
    errors = ctx["error_count"]
    warns = ctx["warn_count"]
    ok_count = ctx["ok_count"]
    total = critical + errors + warns + ok_count
    elapsed = time.time() - start_time

    lines.append(f"\n{'─' * 50}")
    severity_parts = [
        part
        for count, part in (
            (critical, f"🔴{critical}严重"),
            (errors, f"❌{errors}错误"),
            (warns, f"⚠️{warns}警告"),
            (ok_count, f"✅{ok_count}正常"),
        )
        if count > 0
    ]
    lines.append(f"总计: {' | '.join(severity_parts)} ({elapsed:.0f}s)")

    # no_agent mode: speak only when there are issues; --report forces output.
    has_issues = critical > 0 or errors > 0 or warns > 0
    if has_issues or show_full:
        print("\n".join(lines))
    else:
        print(f"[SILENT] MoFin 体检通过 | {ok_count}/{total} 检查正常 ({elapsed:.0f}s)")

    # Saved after the first output line so a failure warning never precedes
    # the [SILENT] marker.
    _save_health_history(ok_count, warns, errors, critical, round(elapsed, 1))

    # Extra readable summary for errors and critical problems.
    if critical > 0 or errors > 0:
        print()
        print("🔴 需立即处理的问题:")
        for entry in ctx["report"]:
            if entry["level"] in ("critical", "error"):
                print(f" [{entry['level'].upper()}] {entry['category']}: {entry['msg']}")

    _print_needs_llm_todos()
    write_todos_for_issues()


def _load_checklist_with_discovery(force):
    """Load the checklist, running self_discovery when forced or >1h since the last scan.

    The checklist is re-read after discovery because self_discovery writes its
    additions to disk; the old code then wrote back the copy loaded *before*
    discovery, silently dropping every newly appended item. Returns
    ``(checklist, discovered_notes)``.
    """
    checklist = json.loads(CHECKLIST_PATH.read_text())
    started = ctx["started_at"]
    if not force:
        last_scan = checklist.get("meta", {}).get("last_full_scan")
        try:
            due = not last_scan or (started - datetime.fromisoformat(last_scan)).total_seconds() > 3600
        except (TypeError, ValueError):
            due = True  # unreadable timestamp: rescan rather than crash
        if not due:
            return checklist, []

    discovered = self_discovery()
    checklist = json.loads(CHECKLIST_PATH.read_text())  # pick up what discovery wrote
    if not force:
        # setdefault: "meta" may be absent (the old code raised KeyError here).
        checklist.setdefault("meta", {})["last_full_scan"] = started.isoformat()
        CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
    return checklist, discovered


def _save_health_history(ok_count, warns, errors, critical, duration_s):
    """Append this run's counters and critical/error entries to health_check_log (best effort)."""
    try:
        details = json.dumps([e for e in ctx["report"] if e["level"] in ("critical", "error")])
        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            conn.execute(
                "INSERT INTO health_check_log (ok_count, warn_count, error_count, critical_count, duration_s, details) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (ok_count, warns, errors, critical, duration_s, details),
            )
            conn.commit()
    except Exception as e:
        # History is optional, but a silent failure would also break the
        # next day's meta check without any trace.
        print(f"[WARN] 体检历史写入失败: {e}")


def _print_needs_llm_todos():
    """Print up to 10 TODOs the executor escalated as ``needs_llm`` (best effort)."""
    try:
        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            needs_llm = conn.execute(
                "SELECT id, title, priority, created_at, note FROM todos "
                "WHERE status='needs_llm' "
                "ORDER BY CASE priority WHEN 'high' THEN 0 WHEN 'medium' THEN 1 ELSE 2 END, created_at ASC LIMIT 10"
            ).fetchall()
    except sqlite3.Error:
        return  # e.g. no todos table yet; this listing is optional
    if not needs_llm:
        return
    print()
    print("🔶 需知微介入(执行器无法自动修复):")
    for todo_id, title, priority, _created_at, note in needs_llm:
        print(f" [{priority}] #{todo_id} {(title or '')[:60]} → {(note or '')[:60]}")


if __name__ == "__main__":
    main()