#!/usr/bin/env python3
"""morning_health_check.py — routine health check for the MoFin system.

Intended to run every day before the market opens (08:00) and scan the
health of every MoFin component. The checklist currently holds 8 categories
/ 48 items and grows automatically. The script prints a formatted report
only when something is wrong and stays quiet otherwise.

Core design:
    * read the list of checks from health_checklist.json
    * run each check and record its status
    * report abnormal items only (healthy items are not pushed)
    * discover newly added cron jobs (see ``self_discovery``)
    * keep a history in the ``health_check_log`` table
    * auto-fix what can be fixed; write a TODO for what cannot

Discovery of new components:
    * compare the cron job list with the cron ids recorded in the checklist
    * a new cron job is appended to the checklist automatically

Usage:
    python3 scripts/morning_health_check.py [--report] [--update-checklist]
    python3 scripts/morning_health_check.py --verify-new-cron <script_name>

    --report            force the full report (default: abnormal items only)
    --update-checklist  run discovery and update the checklist
    --verify-new-cron   check that a freshly created cron script is
                        catalogued and registered, then exit

no_agent mode: only abnormal items are printed; a clean run prints a single
``[SILENT]`` line.
"""

from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
import json
import os
import re
import sqlite3
import subprocess
import sys
import time
import urllib.request

# ── Paths ──
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
SCRIPTS_DIR = BASE / "scripts"
PROFILE_SCRIPTS = Path("/home/hmo/.hermes/profiles/position-analyst/scripts")
CHECKLIST_PATH = DATA / "health_checklist.json"
DB_PATH = DATA / "mofin.db"
HERMES_CRON_DIR = Path("/home/hmo/.hermes/profiles/position-analyst/cron")

# ── Run context (shared mutable state for one execution) ──
ctx = {
    "report": [],
    "issues": [],
    "ok_count": 0,
    "warn_count": 0,
    "error_count": 0,
    "critical_count": 0,
    "started_at": datetime.now(),
}

# Maps a report level to the priority stored on the generated TODO.
_TODO_PRIORITY = {"critical": "high", "error": "medium", "warn": "low"}


# ── Small shared helpers ──
def _read_json(path):
    """Parse the UTF-8 JSON file at *path* and return the decoded object."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


def _write_json(path, obj):
    """Serialise *obj* to *path* as indented, non-ASCII-escaped UTF-8 JSON."""
    Path(path).write_text(
        json.dumps(obj, ensure_ascii=False, indent=2), encoding="utf-8"
    )


def _load_cron_jobs():
    """Return the ``jobs`` list from jobs.json, or None if the file is absent.

    Raises whatever ``json.loads`` / file reading raises on a corrupt file;
    callers wrap this in their own ``try`` block.
    """
    jobs_path = HERMES_CRON_DIR / "jobs.json"
    if not jobs_path.exists():
        return None
    return _read_json(jobs_path).get("jobs", [])


def _has_zero_rows(msg):
    """Return True if *msg* reports exactly ``0 rows``.

    A plain substring test would also match "10 rows" or "20 rows", so the
    zero must not be preceded by another digit.
    """
    return re.search(r"(?<!\d)0 rows", msg) is not None


def _port_in_listing(ss_output, port):
    """Return True if any ``address:port`` field of *ss_output* ends in *port*.

    The port is compared exactly, so port 80 does not match ``:8080``.
    """
    wanted = str(port)
    for line in ss_output.splitlines():
        for field in line.split():
            pieces = field.rsplit(":", 1)
            if len(pieces) == 2 and pieces[1] == wanted:
                return True
    return False


# ── TODO generation / auto-fix ──
def derive_fix_action(detail, msg):
    """Derive a shell command that helps fix the issue described by *msg*.

    Args:
        detail: checklist item id of the issue (currently unused).
        msg: human-readable issue message.

    Returns:
        A shell command string, or None when no fix recipe is known.
    """
    lowered = msg.lower()
    # xiaoguo scanner error → verify the script exists
    if "xiaoguo_scanner" in msg or "小果扫描" in msg:
        return "ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/xiaoguo_scanner.py 2>&1 && echo 'ok'"
    # system-audit error → verify the copied script
    if "system_audit" in msg or "系统审计" in msg:
        return "ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/system_audit.py 2>&1"
    # cron errors (last_status=error) → verify both files exist
    if "cron" in lowered and "error" in lowered and ("小果" in msg or "系统审计" in msg):
        return "ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/xiaoguo_scanner.py /home/hmo/.hermes/profiles/position-analyst/scripts/system_audit.py 2>&1"
    # HK exchange rate → refresh
    if "港股汇率" in msg:
        return f"cd {BASE} && python3 hk_rate.py 2>&1"
    # price monitor produced no events → inspect the process
    if "价格监控" in msg and _has_zero_rows(msg):
        return "ps aux | grep price_monitor | grep -v grep | head -3"
    # delivery target missing → must be switched to local by hand
    if "deliver" in lowered or "delivery" in lowered:
        return f"cd {BASE} && echo '需手动设置: cronjob action=update deliver=local'"
    # xiaoguo → zhiwei signal bridge is down
    if "信号桥" in msg:
        return f"cd {BASE} && python3 scripts/xiaoguo_signal_consumer.py 2>&1"
    return None


def auto_fix_issue(issue):
    """Try to resolve *issue* without human help.

    Args:
        issue: a report entry as produced by ``log``.

    Returns:
        ``(fixed, fix_msg)``. ``fixed`` is True when the issue needs no
        further action; False means a TODO should be considered.
    """
    msg = issue.get("msg", "")

    # HK exchange-rate cache missing → regenerate it with hk_rate.py
    if "港股汇率缓存" in msg and "missing" in msg:
        try:
            result = subprocess.run(
                ["python3", str(BASE / "hk_rate.py")],
                capture_output=True, text=True, timeout=15,
            )
        except Exception as e:
            return False, f"汇率刷新异常: {e}"
        if result.returncode == 0:
            return True, f"已自动刷新港股汇率缓存: {result.stdout.strip()}"
        return False, f"汇率刷新失败: {result.stderr[:100]}"

    # Price monitor has no events today → check the process
    if "价格监控" in msg and _has_zero_rows(msg):
        now = ctx["started_at"]
        if now.weekday() < 5 and 9 <= now.hour <= 15:
            # Weekday trading hours: events are expected.
            running, _ = check_process("price_monitor")
            if not running:
                # Nothing was repaired, so report "not fixed" and let the
                # caller raise a TODO.
                return False, "已检测:price_monitor进程不存在(需人工介入)"
            return True, "已确认:price_monitor进程运行中,但今日无事件(可能无价格触发)"
        # Outside trading hours an empty table is normal.
        return True, "非交易时段无价格事件属正常"

    # Everything else needs a human.
    return False, "需人工处理"


def _auto_fix_issues(issues):
    """Run ``auto_fix_issue`` on each issue, print a summary, return the rest."""
    fixed_issues = []
    remaining = []
    for issue in issues:
        fixed, fix_msg = auto_fix_issue(issue)
        if fixed:
            fixed_issues.append((issue, fix_msg))
            log("ok", issue["category"], f"已自动修复: {fix_msg}", issue.get("detail", ""))
        else:
            remaining.append(issue)

    if fixed_issues:
        print()
        print("🛠️ 自动修复:")
        for issue, fix_msg in fixed_issues:
            print(f" ✅ {issue['category']}: {fix_msg}")
    return remaining


def _store_todos(remaining):
    """Insert (or reopen) one TODO row per issue in *remaining*."""
    with closing(sqlite3.connect(str(DB_PATH))) as conn:
        new_count = 0
        for issue in remaining:
            title = f"[体检发现] {issue['msg']}"
            pri = _TODO_PRIORITY.get(issue["level"], "medium")

            # De-duplicate on title, whatever the status of the existing row.
            existing = conn.execute(
                "SELECT id, status FROM todos WHERE title=?", (title,)
            ).fetchone()
            if existing:
                if existing[1] == "blocked":
                    # Reopen a blocked TODO instead of adding a duplicate.
                    conn.execute(
                        "UPDATE todos SET status='pending', priority=?, note='已重新打开', updated_at=CURRENT_TIMESTAMP WHERE id=?",
                        (pri, existing[0]),
                    )
                continue

            # A TODO is only created when it carries a fix action.
            fix_action = derive_fix_action(issue.get("detail", ""), issue.get("msg", ""))
            if not fix_action:
                print(f" ⚠️ 无法自动修复: [{pri}] {title[:60]}")
                print(f" 原因: 未知修复方案,需人工分析")
                continue

            conn.execute(
                "INSERT INTO todos (title, description, priority, source, status, fix_action) "
                "VALUES (?, ?, ?, 'health_check', 'pending', ?)",
                (
                    title,
                    f"体检发现于 {ctx['started_at'].strftime('%Y-%m-%d %H:%M')}\n分类: {issue['category']}\n详情: {issue.get('detail', '')}\n无法当场修复原因: 需验证/需等待",
                    pri,
                    fix_action,
                ),
            )
            new_count += 1
        conn.commit()

        if new_count > 0:
            print()
            print(f"📋 已加入TODO({new_count}条):")
            rows = conn.execute(
                "SELECT title, priority FROM todos WHERE status='pending' AND source='health_check' "
                "ORDER BY created_at DESC LIMIT ?",
                (new_count,),
            ).fetchall()
            for todo_title, todo_priority in rows:
                print(f" [{todo_priority}] {todo_title[:70]}")


def write_todos_for_issues():
    """Turn the abnormal report entries into TODOs, trying auto-fix first.

    Only entries at level critical/error/warn are considered. This is a
    best-effort step: any failure is printed and never propagates.
    """
    issues = [e for e in ctx["report"] if e["level"] in ("critical", "error", "warn")]
    if not issues:
        return
    try:
        remaining = _auto_fix_issues(issues)
        if remaining:
            _store_todos(remaining)
    except Exception as e:
        # Failing to write TODOs must not break the health check itself.
        print(f" TODO写入异常: {e}")


# ── Reporting primitives ──
def log(level, category, msg, detail=None):
    """Append one check result to ``ctx["report"]`` and bump its counter.

    Any level other than critical/error/warn is counted as ok.
    """
    ctx["report"].append({
        "level": level,
        "category": category,
        "msg": msg,
        "detail": detail,
        "timestamp": datetime.now().isoformat(),
    })
    counter = {
        "critical": "critical_count",
        "error": "error_count",
        "warn": "warn_count",
    }.get(level, "ok_count")
    ctx[counter] += 1


def emit(msg, level="ok"):
    """Return *msg* prefixed with the icon for *level* (``•`` if unknown)."""
    prefix = {
        "critical": "🔴",
        "error": "❌",
        "warn": "⚠️",
        "ok": "✅",
        "info": "📎",
    }.get(level, "•")
    return f"{prefix} {msg}"


# ── Checkers: each returns (ok, detail) ──
def check_systemctl(service_name):
    """Check that ``systemctl is-active`` reports *service_name* as active."""
    try:
        result = subprocess.run(
            ["systemctl", "is-active", service_name],
            capture_output=True, text=True, timeout=5,
        )
    except Exception as e:
        return False, f"error:{e}"
    status = result.stdout.strip()
    return status == "active", f"{status}"


def check_port(port):
    """Check that a TCP socket is listening on exactly *port* (via ``ss``)."""
    try:
        result = subprocess.run(
            ["ss", "-tlnp"], capture_output=True, text=True, timeout=5
        )
    except Exception as e:
        return False, f"error:{e}"
    listening = _port_in_listing(result.stdout, port)
    return listening, "listening" if listening else "not_found"


def check_process(pattern):
    """Check that ``pgrep -f`` finds a process matching *pattern*."""
    try:
        result = subprocess.run(["pgrep", "-f", pattern], capture_output=True, timeout=5)
    except Exception:
        return False, "check_error"
    found = result.returncode == 0
    return found, "running" if found else "not_found"


def check_http(url, timeout=15):
    """Check that a GET on *url* succeeds, with proxy env vars removed.

    Every environment variable whose name contains "proxy" is removed for
    the duration of the request and restored afterwards, also on failure.
    """
    saved = {
        key: os.environ.pop(key)
        for key in list(os.environ)
        if "proxy" in key.lower()
    }
    try:
        req = urllib.request.Request(url, method="GET")
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return True, str(resp.status)
    except Exception as e:
        return False, str(e)[:60]
    finally:
        os.environ.update(saved)


def check_disk(mount):
    """Check that ``df -h`` reports *mount* as less than 90% used."""
    try:
        result = subprocess.run(
            ["df", "-h", mount], capture_output=True, text=True, timeout=5
        )
        rows = result.stdout.strip().split("\n")
        if len(rows) >= 2:
            columns = rows[1].split()
            if len(columns) >= 5:
                pct = columns[4].replace("%", "")
                return int(pct) < 90, f"{pct}% used"
        return False, "parse_error"
    except Exception:
        return False, "check_error"


def check_file_exists(path):
    """Check that *path* exists; the detail is its size or ``missing``."""
    target = Path(path)
    if not target.exists():
        return False, "missing"
    return True, f"{target.stat().st_size}B"


def check_file_freshness(path, max_hours):
    """Check that *path* was modified less than *max_hours* before the run."""
    target = Path(path)
    if not target.exists():
        return False, "missing"
    modified = datetime.fromtimestamp(target.stat().st_mtime)
    hours_ago = (ctx["started_at"] - modified).total_seconds() / 3600
    return hours_ago < max_hours, f"{hours_ago:.0f}h ago (threshold {max_hours}h)"


def check_db_table_count(table, field, value, op="today", threshold=0):
    """Count rows of *table* in the MoFin database and compare to *threshold*.

    Args:
        table: table name (interpolated into SQL; comes from the checklist).
        field: column used by ``op="count"``; falsy means "all rows".
        value: value *field* must equal for ``op="count"``.
        op: ``"today"`` counts rows dated today using the first date-like
            column found; ``"unprocessed"`` counts pending rows; ``"count"``
            counts rows matching *field*; anything else counts all rows.
        threshold: minimum row count, or for ``"unprocessed"`` the exclusive
            maximum.

    Returns:
        ``(ok, detail)``. Database errors and a missing date column are
        reported as ok so that they do not raise an alert.
    """
    try:
        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            cur = conn.cursor()
            if op == "today":
                today = ctx["started_at"].strftime("%Y-%m-%d")
                # Inspect the table to find which date-like column it has.
                cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
                candidates = ["created_at", "date", "timestamp", "last_scanned_at", "signal_date"]
                date_col = next((c for c in candidates if c in cols), None)
                if not date_col:
                    return True, f"no_date_col_in_{table}"
                cur.execute(f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,))
            elif op == "unprocessed":
                cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
                if "processed" in cols:
                    sql = f"SELECT COUNT(*) FROM {table} WHERE (processed = 0 OR processed IS NULL)"
                elif "source" in cols:
                    sql = f"SELECT COUNT(*) FROM {table} WHERE source LIKE '%xiaoguo%'"
                else:
                    sql = f"SELECT COUNT(*) FROM {table}"
                cur.execute(sql)
            elif op == "count" and field:
                cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {field} = ?", (value,))
            else:
                cur.execute(f"SELECT COUNT(*) FROM {table}")
            count = cur.fetchone()[0]
    except Exception as e:
        return True, f"skip({str(e)[:60]})"

    if op == "unprocessed":
        return count < threshold, f"{count} unprocessed"
    return count >= threshold, f"{count} rows"


def check_cron(job_id):
    """Check the state of cron job *job_id* as recorded in jobs.json.

    Fails when jobs.json is missing, the job is disabled, or its
    ``last_status`` is set and not ``ok``. A job that is not listed at all
    passes with ``not_in_jobs_json``.
    """
    try:
        jobs = _load_cron_jobs()
        if jobs is None:
            return False, "no_jobs_json"
        for job in jobs:
            matches = job.get("id") == job_id or (
                not job.get("id") and job.get("name") and job_id in str(job)
            )
            if not matches:
                continue
            if not job.get("enabled", True):
                return False, "disabled"
            last_status = job.get("last_status")
            if last_status and last_status != "ok":
                return False, f"status={last_status}"
            last_run = job.get("last_run_at", "")
            if last_run:
                try:
                    last_dt = datetime.fromisoformat(last_run)
                    hours_ago = (ctx["started_at"] - last_dt).total_seconds() / 3600
                    if hours_ago > 48:
                        return True, f"ok(stale:{hours_ago:.0f}h)"
                except (TypeError, ValueError):
                    # Unparseable or timezone-aware timestamp: skip the
                    # staleness hint.
                    pass
            return True, "ok"
        # Unknown job id — possibly a cron that does not need checking.
        return True, "not_in_jobs_json"
    except Exception as e:
        return False, f"check_error:{str(e)[:60]}"


def check_cron_errors_last24h():
    """Check that no cron job ended in a non-ok status in the last 24 hours.

    Timestamps are compared as ISO-8601 strings truncated to seconds.
    """
    try:
        jobs = _load_cron_jobs()
        if jobs is None:
            return True, "no_jobs_json"
        cutoff = (ctx["started_at"] - timedelta(hours=24)).isoformat()[:19]
        errors = []
        for job in jobs:
            last_status = job.get("last_status")
            last_run = job.get("last_run_at", "")
            if last_status and last_status != "ok" and last_run and last_run >= cutoff:
                errors.append(f"{job.get('name','?')}({last_status})")
        if errors:
            return False, f"{len(errors)} errors: {'; '.join(errors[:5])}"
        return True, "0 errors"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"


def check_cron_paused():
    """Check that none of the must-run cron jobs has been disabled."""
    should_run = [
        ("3a9fb3300a6a", "价格监控"),
        ("0851c7838ca3", "小果扫描"),
        ("e13323928f3a", "自选提醒"),
        ("b809fcabfa5b", "分支评估"),
    ]
    try:
        jobs = _load_cron_jobs()
        if jobs is None:
            return True, "no_jobs_json"
        job_map = {job.get("id"): job for job in jobs}
        paused = [
            name
            for jid, name in should_run
            if job_map.get(jid) and not job_map[jid].get("enabled", True)
        ]
        if paused:
            return False, f"paused: {', '.join(paused)}"
        return True, "all_expected_running"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"


def check_delivery_targets():
    """Check that no cron job recorded a delivery error on its last run."""
    try:
        jobs = _load_cron_jobs()
        if jobs is None:
            return True, "no_jobs_json"
        issues = []
        for job in jobs:
            last_delivery_err = job.get("last_delivery_error", "")
            if last_delivery_err and "delivery" in last_delivery_err.lower():
                issues.append(f"{job.get('name','?')}")
        if issues:
            return False, f"{len(issues)} issues: {', '.join(issues[:3])}"
        return True, "all_ok"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"


def check_cron_audit():
    """Audit every enabled, script-backed cron job for a run in the last 24h."""
    try:
        jobs = _load_cron_jobs()
        if jobs is None:
            return True, "no_jobs_json"
        cutoff = (ctx["started_at"] - timedelta(hours=24)).isoformat()[:19]
        # Same filter for the audit and for the reported total.
        audited = [j for j in jobs if j.get("enabled", True) and j.get("script", "")]
        stale = []
        for job in audited:
            name = job.get("name", "?")
            last_run = job.get("last_run_at", "")
            if not last_run:
                stale.append(f"{name}(从未运行)")
            elif last_run[:19] < cutoff:
                if job.get("last_status") == "ok":
                    stale.append(f"{name}(>24h未运行)")
                else:
                    stale.append(f"{name}(>24h+状态异常)")
        if stale:
            return False, f"{len(stale)}个cron异常: {'; '.join(stale[:5])}"
        return True, f"全部{len(audited)}个cron正常"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"


def check_meta_health_check_yesterday():
    """Meta check: summarise yesterday's latest ``health_check_log`` row.

    Always passes; the detail tells whether yesterday's run was clean.
    """
    try:
        yesterday = (ctx["started_at"] - timedelta(days=1)).strftime("%Y-%m-%d")
        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            row = conn.execute(
                "SELECT ok_count, error_count, critical_count FROM health_check_log "
                "WHERE date(created_at) = ? ORDER BY created_at DESC LIMIT 1",
                (yesterday,),
            ).fetchone()
    except Exception:
        return True, "skip"
    if not row:
        return True, "无昨日记录(首次运行)"
    if row[1] == 0 and row[2] == 0:
        return True, f"昨日体检通过({row[0]}项正常)"
    return True, f"昨日体检有{row[1]}错误+{row[2]}严重(已记录)"


def check_meta_checklist_completeness():
    """Meta check: report how many components discovery added in this run."""
    added = ctx.get("auto_discovered_items", [])
    if added:
        return True, f"自动发现并追加了{len(added)}个新组件到清单"
    return True, "清单覆盖完整"


# ── Self discovery ──
def self_discovery():
    """Append cron jobs missing from the checklist to its ``pipeline`` category.

    Reads jobs.json and the checklist from disk, and rewrites the checklist
    when at least one new job was found.

    Returns:
        A list of human-readable strings, one per discovered job; on failure
        a single ``cron_discovery_error: …`` entry is appended.
    """
    discovered = []
    try:
        jobs = _load_cron_jobs()
        if jobs is None:
            return discovered

        checklist = _read_json(CHECKLIST_PATH)
        known_cron_ids = {
            item["check"].split(":")[1]
            for cat in checklist["categories"]
            for item in cat["items"]
            if item["check"].startswith("cron:")
        }
        pipeline_cat = next(
            (cat for cat in checklist["categories"] if cat["id"] == "pipeline"), None
        )

        for job in jobs:
            jid, name = job.get("id"), job.get("name")
            if not (jid and job.get("script")) or jid in known_cron_ids:
                continue
            discovered.append(f"新cron: {name}({jid})")
            known_cron_ids.add(jid)  # a duplicated id is only added once
            if pipeline_cat is not None:
                pipeline_cat["items"].append({
                    "id": f"cron-auto-{jid[:8]}",
                    "description": f"{name} cron 已调度",
                    "check": f"cron:{jid}",
                    "expected": "enabled+ok",
                    "severity": "medium",
                    "auto_discovered": True,
                })

        if discovered:
            _write_json(CHECKLIST_PATH, checklist)
    except Exception as e:
        discovered.append(f"cron_discovery_error: {e}")
    return discovered


# ── Pipeline registry audit ──
def _infer_pipeline_entry(script_name):
    """Build a placeholder registry entry for *script_name* from its source.

    The description is the first line of the first triple-quoted string in
    the script; the consumer is guessed from keywords found in the source.
    """
    script_path = PROFILE_SCRIPTS / f"{script_name}.py"
    desc = "未知"
    consumer_info = "未知"
    if script_path.exists():
        content = script_path.read_text(encoding="utf-8")
        doc_match = re.search(r'"""(.*?)"""', content, re.DOTALL)
        if doc_match:
            doc_text = doc_match.group(1).strip()
            desc = doc_text.split("\n")[0][:80]
            # "." does not cross newlines, so this captures one line only.
            pipe_match = re.search(r'管道[::].*', doc_text)
            if pipe_match:
                consumer_info = pipe_match.group(0).replace('管道', '').strip(':: ')
        # Keyword heuristics; later matches override earlier ones.
        if 'signal_news' in content:
            consumer_info = 'signal_news表'
        if 'macro_risk_state' in content:
            consumer_info = 'macro_risk_state.json'
        if 'watchlist' in content.lower():
            consumer_info = 'watchlist.json / decisions.json'
        if 'INSERT INTO' in content:
            for tbl in ['todos', 'price_events', 'macro_context_log', 'accuracy_stats']:
                if tbl in content:
                    consumer_info = f'{tbl}表'
        if '.write_text' in content or 'json.dump' in content:
            for stem in ['macro_risk_state', 'macro_context', 'market', 'portfolio', 'decisions']:
                if stem in content:
                    consumer_info = f'{stem}.json'
    return {
        "id": f"auto-{script_name}",
        "name": desc[:60],
        "source": f"{script_name}.py",
        "consumer": consumer_info,
        "end_user": "待确认",
        "verified": False,
        "gap": f"自动发现({desc[:60]})",
        "fix": "手动编辑pipeline_registry.json完善此项",
    }


def _check_registry_audit():
    """Cross-check pipeline_registry.json against the enabled cron scripts.

    Unregistered scripts are auto-registered with inferred placeholder
    entries. An unreadable registry is reported as ok.
    """
    registry_path = DATA / "pipeline_registry.json"
    try:
        reg = _read_json(registry_path)
        pipelines = reg.get("pipelines", [])
        gaps = [p["name"] for p in pipelines if not p.get("verified")]

        # Script names mentioned in any pipeline's "source" description.
        known_sources = set()
        for p in pipelines:
            for script_file in re.findall(r'[\w_-]+\.py', p.get("source", "")):
                known_sources.add(script_file.replace('.py', ''))

        # Enabled scripts from jobs.json that the registry does not mention.
        unregistered = []
        try:
            active_scripts = set()
            for job in _load_cron_jobs() or []:
                script = job.get("script", "") or ""
                if script and job.get("enabled", True):
                    active_scripts.add(script.replace('.py', ''))
            unregistered = [s for s in sorted(active_scripts) if s not in known_sources]
        except Exception:
            unregistered = []

        if unregistered:
            detail = f"{len(gaps)}条待验证 + {len(unregistered)}个新组件未注册"
            try:
                for script_name in unregistered:
                    reg.setdefault("pipelines", []).append(_infer_pipeline_entry(script_name))
                _write_json(registry_path, reg)
                detail += f" → 已自动注册{len(unregistered)}个(含推断)"
            except Exception as e:
                detail += f" (自动注册异常:{str(e)[:30]})"
            return False, detail
        if gaps:
            return False, f"{len(gaps)}条管道未验证: {', '.join(gaps[:5])}"
        return True, f"全部{len(pipelines)}条管道正常"
    except Exception as e:
        return True, f"注册表不可读({str(e)[:60]})"


def _check_xiaoguo_signal_flow():
    """Combine the "rows today" and "unprocessed backlog" signal_news checks."""
    # NOTE(review): with threshold 0 the "today" check passes for any row
    # count, so this check currently always passes — confirm the intent.
    today_ok, today_detail = check_db_table_count("signal_news", "created_at", None, "today", 0)
    unproc_ok, unproc_detail = check_db_table_count("signal_news", None, None, "unprocessed", 30)
    return today_ok or unproc_ok, f"today_xiaoguo={today_detail}, unprocessed={unproc_detail}"


# Check specs that are matched as a whole string.
_EXACT_CHECKS = {
    "cron_errors:last24h": check_cron_errors_last24h,
    "cron_paused:check": check_cron_paused,
    "delivery:origin_targets": check_delivery_targets,
    "cron_audit:all": check_cron_audit,
    "meta:health_check_yesterday": check_meta_health_check_yesterday,
    "meta:checklist_completeness": check_meta_checklist_completeness,
    "pipeline:xiaoguo_signal_flow": _check_xiaoguo_signal_flow,
    "pipeline:registry_audit": _check_registry_audit,
}

# Check specs of the form "<prefix>:<single argument>".
_PREFIX_CHECKS = {
    "systemctl": check_systemctl,
    "port": check_port,
    "proc": check_process,
    "disk": check_disk,
    "fileexists": check_file_exists,
    "cron": check_cron,
}


def _dispatch_check(check_spec):
    """Run the checker selected by *check_spec* and return ``(ok, detail)``."""
    if check_spec in _EXACT_CHECKS:
        return _EXACT_CHECKS[check_spec]()

    prefix, _, argument = check_spec.partition(":")
    if prefix in _PREFIX_CHECKS:
        return _PREFIX_CHECKS[prefix](argument)
    if prefix == "http":
        # "http:<url>" carries the URL as argument; a bare "http://host"
        # spec is itself the URL.
        return check_http(check_spec if argument.startswith("//") else argument)
    if prefix == "filefresh":
        # filefresh:path:max_hours — split from the right so the path may
        # itself contain a colon.
        path, hours = argument.rsplit(":", 1)
        return check_file_freshness(path, float(hours.replace("h", "")))
    if prefix == "db":
        # db:table:field:value:op:threshold
        parts = check_spec.split(":", 5)
        table = parts[1]
        field = parts[2] if len(parts) > 2 else None
        value = parts[3] if len(parts) > 3 else None
        op = parts[4] if len(parts) > 4 else "today"
        threshold = int(parts[5]) if len(parts) > 5 else 0
        return check_db_table_count(table, field, value, op, threshold)
    return False, f"unknown_check:{check_spec}"


# ── Main flow ──
def run_check(item):
    """Run one checklist item.

    Args:
        item: checklist entry with at least ``check`` and ``severity``.

    Returns:
        ``(ok, level, detail)``. On failure ``level`` is ``critical`` for
        severity critical, ``error`` for high/error and ``warn`` otherwise.
        A malformed spec is reported as a failed check instead of raising.
    """
    try:
        ok, detail = _dispatch_check(item["check"])
    except Exception as e:
        ok, detail = False, f"check_error:{str(e)[:60]}"

    if ok:
        return ok, "ok", detail
    severity = item.get("severity")
    if severity == "critical":
        level = "critical"
    elif severity in ("high", "error"):
        level = "error"
    else:
        level = "warn"
    return ok, level, detail


def _discovery_due(checklist):
    """Return True if the last full scan is missing, invalid or over 1h old."""
    last_scan = (checklist.get("meta") or {}).get("last_full_scan")
    if not last_scan:
        return True
    try:
        elapsed = ctx["started_at"] - datetime.fromisoformat(last_scan)
    except (TypeError, ValueError):
        return True
    return elapsed.total_seconds() > 3600


def _print_needs_llm_todos():
    """Print up to ten TODOs whose status is ``needs_llm``."""
    try:
        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            needs_llm = conn.execute(
                "SELECT id, title, priority, created_at, note FROM todos "
                "WHERE status='needs_llm' "
                "ORDER BY CASE priority WHEN 'high' THEN 0 WHEN 'medium' THEN 1 ELSE 2 END, created_at ASC LIMIT 10"
            ).fetchall()
    except sqlite3.Error as e:
        print(f"needs_llm TODO lookup failed: {e}", file=sys.stderr)
        return
    if needs_llm:
        print()
        print("🔶 需知微介入(执行器无法自动修复):")
        for row in needs_llm:
            note = (row[4] or "")[:60]
            print(f" [{row[2]}] #{row[0]} {row[1][:60]} → {note}")


def main():
    """Run the whole health check: discover, check, report, store, follow up."""
    show_full = "--report" in sys.argv
    update = "--update-checklist" in sys.argv
    start_time = time.time()

    # Load the checklist.
    if not CHECKLIST_PATH.exists():
        print("[SILENT] health_checklist.json 不存在")
        return
    checklist = _read_json(CHECKLIST_PATH)

    # Discovery: forced by --update-checklist, otherwise at most once an hour.
    discovered = []
    if update:
        discovered = self_discovery()
        # self_discovery edits the file on disk; reload so that new items
        # are checked in this run.
        checklist = _read_json(CHECKLIST_PATH)
    elif _discovery_due(checklist):
        discovered = self_discovery()
        # Reload before stamping the scan time: writing the copy loaded
        # above would erase the items self_discovery just appended.
        checklist = _read_json(CHECKLIST_PATH)
        checklist.setdefault("meta", {})["last_full_scan"] = ctx["started_at"].isoformat()
        _write_json(CHECKLIST_PATH, checklist)
    # Consumed by check_meta_checklist_completeness; failures are excluded.
    ctx["auto_discovered_items"] = [
        d for d in discovered if not d.startswith("cron_discovery_error")
    ]

    # Run every item, category by category.
    dayname = ["一", "二", "三", "四", "五", "六", "日"][ctx["started_at"].weekday()]
    lines = [f"MoFin 系统体检 | {ctx['started_at'].strftime('%Y-%m-%d')} 周{dayname} | {ctx['started_at'].strftime('%H:%M')}"]
    lines.append("─" * 50)

    for cat in checklist["categories"]:
        cat_issues = 0
        cat_lines = []
        for item in cat["items"]:
            ok, level, detail = run_check(item)
            msg = f"{item['description']}: {detail}"
            log(level, cat["name"], msg, item["id"])
            cat_lines.append(emit(msg, level))
            if not ok:
                cat_issues += 1
        # A category is shown only if it has problems or --report is given.
        if cat_issues > 0 or show_full:
            lines.append(f"\n【{cat['name']}】")
            lines.extend(cat_lines)

    # Discovery results.
    if discovered:
        lines.append(f"\n📎 自动发现:")
        for entry in discovered:
            lines.append(f" {entry}")

    # Summary.
    critical = ctx["critical_count"]
    errors = ctx["error_count"]
    warns = ctx["warn_count"]
    ok_count = ctx["ok_count"]
    total = ok_count + warns + errors + critical
    if total == 0:
        total = 1  # kept from the original: an empty checklist shows as 0/1
    lines.append(f"\n{'─' * 50}")

    severity_parts = []
    if critical > 0:
        severity_parts.append(f"🔴{critical}严重")
    if errors > 0:
        severity_parts.append(f"❌{errors}错误")
    if warns > 0:
        severity_parts.append(f"⚠️{warns}警告")
    if ok_count > 0:
        severity_parts.append(f"✅{ok_count}正常")
    lines.append(f"总计: {' | '.join(severity_parts)} ({(time.time()-start_time):.0f}s)")
    report = "\n".join(lines)

    # Store the run in the history table (best effort).
    try:
        details = json.dumps([e for e in ctx["report"] if e["level"] in ("critical", "error")])
        with closing(sqlite3.connect(str(DB_PATH))) as conn_hist:
            conn_hist.execute(
                "INSERT INTO health_check_log (ok_count, warn_count, error_count, critical_count, duration_s, details) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (ok_count, warns, errors, critical, round(time.time() - start_time, 1), details),
            )
            conn_hist.commit()
    except sqlite3.Error as e:
        print(f"health_check_log write failed: {e}", file=sys.stderr)

    # Output. no_agent mode: speak up only on problems; --report forces it.
    has_issues = critical > 0 or errors > 0 or warns > 0
    if has_issues or show_full:
        print(report)
    else:
        print(f"[SILENT] MoFin 体检通过 | {ok_count}/{total} 检查正常 ({(time.time()-start_time):.0f}s)")

    # Serious problems get an extra readable digest.
    if critical > 0 or errors > 0:
        print()
        print("🔴 需立即处理的问题:")
        for entry in ctx["report"]:
            if entry["level"] in ("critical", "error"):
                print(f" [{entry['level'].upper()}] {entry['category']}: {entry['msg']}")

    # TODOs waiting in the needs_llm status.
    _print_needs_llm_todos()

    # Write the abnormal items into the TODO system.
    write_todos_for_issues()


def _verify_new_cron(script_name):
    """Check that *script_name* is catalogued and registered.

    Prints the status of docs/cron-catalog.md and pipeline_registry.json;
    when the script is missing from the registry a placeholder entry is
    appended and the registry file is rewritten.
    """
    # cron-catalog.md
    catalog = BASE / "docs" / "cron-catalog.md"
    if catalog.exists():
        if script_name in catalog.read_text(encoding="utf-8"):
            print(f" ✅ cron-catalog.md: 已登记")
        else:
            print(f" ⚠️ cron-catalog.md: 未登记(可从docstring自动生成)")

    # pipeline_registry.json
    reg_path = DATA / "pipeline_registry.json"
    if not reg_path.exists():
        return
    reg = _read_json(reg_path)
    pipelines = reg.setdefault("pipelines", [])
    if any(script_name in p.get("source", "") for p in pipelines):
        print(f" ✅ pipeline_registry.json: 已注册")
        return

    print(f" ⚠️ pipeline_registry.json: 未注册(自动添加占位)")
    # Use the first line of the script's first triple-quoted string as name.
    desc = script_name
    script_path = PROFILE_SCRIPTS / f"{script_name}.py"
    if script_path.exists():
        doc_match = re.search(r'"""(.*?)"""', script_path.read_text(encoding="utf-8"), re.DOTALL)
        if doc_match:
            desc = doc_match.group(1).strip().split('\n')[0][:80]
    pipelines.append({
        "id": f"auto-{script_name}",
        "name": desc[:60],
        "source": f"{script_name}.py",
        "consumer": "待确认",
        "end_user": "待确认",
        "verified": False,
        "gap": f"新建后自动注册({desc[:60]})",
        "fix": "手动完善pipeline_registry.json",
    })
    _write_json(reg_path, reg)
    print(f" 已自动添加占位记录(desc={desc[:50]})")


if __name__ == "__main__":
    # --verify-new-cron mode: validate a cron script right after creating it.
    if "--verify-new-cron" in sys.argv:
        flag_index = sys.argv.index("--verify-new-cron")
        if flag_index + 1 < len(sys.argv):
            _verify_new_cron(sys.argv[flag_index + 1])
        sys.exit(0)
    main()