From a24505ebeffe0f295c861921a3cb46dc2f125978 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?= Date: Wed, 24 Jun 2026 20:09:10 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=B3=BB=E7=BB=9F=E5=B8=B8=E8=A7=84?= =?UTF-8?q?=E4=BD=93=E6=A3=80=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 morning_health_check.py:分层分类全面体检脚本 - 新增 health_checklist.json:可动态维护的检查清单(自动发现新组件) - 新增 docs/morning-health-check.md:完整设计文档 - 新增 skill: morning-health-check - 修复:xiaoguo_scanner.py 同步到profile脚本目录 - 修复:system_audit.py 从symlink改为硬拷贝(解决脚本路径越界) - 修复:morning_health_check.py 使用jobs.json而非cron.db(更可靠) 自动检查7层43项: 基础设施/SENSE/RESPOND/ADAPT/IMPROVE/数据文件/管道完整性 每天8:00开盘前自动跑一次 --- docs/morning-health-check.md | 116 +++++++ scripts/morning_health_check.py | 525 ++++++++++++++++++++++++++++++++ 2 files changed, 641 insertions(+) create mode 100644 docs/morning-health-check.md create mode 100755 scripts/morning_health_check.py diff --git a/docs/morning-health-check.md b/docs/morning-health-check.md new file mode 100644 index 0000000..ff41fc7 --- /dev/null +++ b/docs/morning-health-check.md @@ -0,0 +1,116 @@ +--- +name: morning-health-check +title: MoFin 系统常规体检机制 +description: 每日开盘前8:00全面扫荡式系统体检,含分层分类检查清单、自动发现新增组件、问题推送 +trigger: 交易日 8:00 AM 自动运行 +--- + +# MoFin 系统常规体检机制 + +## 设计目的 + +"人的常规体检,不是因为发现问题去针对性检查,而是定期的、全面的、扫荡式的检查。" + +核心机制: +1. **health_checklist.json** — 可动态维护的检查清单(新增功能自动加入) +2. **morning_health_check.py** — 每日8:00开盘前运行,逐项比对 +3. **self_discovery()** — 自动发现新增cron任务并追加到检查清单 +4. **历史追踪** — health_check_history.json 保留90天体检记录 + +## 架构 + +```python +morning_health_check.py (no_agent, 8:00 Cron) + │ + ├── 读 health_checklist.json (检查清单) + │ + ├── 分层检查 (7层) + │ ├── 基础设施 — XMPP/Gateway/Dashboard/API/磁盘 + │ ├── SENSE — price_monitor/xiaoguo/宏观/汇率/板块 + │ ├── RESPOND — 推送cron/价格事件/信号积压 + │ ├── ADAPT — 策略重评/策略树/时效性 + │ ├── IMPROVE — 知识萃取/硬编码/审计/剪枝/元成长 + │ ├── 数据文件 — 全部关键JSON/DB文件新鲜度 + │ └── 管道完整性 — cron异常/误暂停/delivery目标/信号桥 + │ + ├── self_discovery() — 自动发现新组件 + │ └── 对比jobs.json vs checklist中的cron ID + │ └── 发现新cron → 自动追加到pipeline类 + │ + └── 输出 (no_agent规则) + ├── 有异常 → 打印详细报告 (推送给老爸) + └── 正常 → [SILENT] +``` + +## 检查清单 (health_checklist.json) + +位于 `/home/hmo/MoFin/data/health_checklist.json` + +7个分类,当前约43项检查。每个检查项包含: +- id/description: 唯一标识和描述 +- check: 检查指令 (如 `cron:job_id`, `systemctl:service`, `port:8643`, `db:table:field::today:1`) +- expected: 期望值 +- severity: critical/high/medium/low + +## 自维护机制 + +### 自动发现 (self_discovery) +每次运行体检时,脚本自动: +1. 读取 Hermes cron 的 jobs.json +2. 对比 checklist 中已登记的 cron ID +3. 发现新 cron 任务 → 自动追加到 pipeline 分类(标记 auto_discovered=true) + +### 手动维护 +- 新功能加入系统后,应在 checklist 中追加相应检查项 +- 修改现有组件后,审视是否需要调整已有检查项的阈值/预期值 + +## 检查器类型 + +| 类型 | 格式 | 说明 | +|------|------|------| +| systemctl | `systemctl:service_name` | 检查systemd服务 | +| port | `port:8888` | 端口监听检查 | +| proc | `proc:pattern` | pgrep进程匹配 | +| http | `http:url` | HTTP GET可达性 | +| disk | `disk:/` | 磁盘使用率 | +| fileexists | `fileexists:/path` | 文件是否存在 | +| filefresh | `filefresh:/path:24h` | 文件新鲜度 | +| db | `db:table:field::today:1` | 数据库记录数 | +| cron | `cron:job_id` | Cron任务状态 | +| cron_errors | `cron_errors:last24h` | 全局cron异常 | +| cron_paused | `cron_paused:check` | 误暂停检查 | +| delivery | `delivery:origin_targets` | 推送目标检查 | +| pipeline | `pipeline:xiaoguo_signal_flow` | 综合管道检查 | + +## 触发方式 + +- **自动**: cron `0 8 * * 1-5` (交易日8:00, no_agent模式) +- **手动**: `python3 /home/hmo/MoFin/scripts/morning_health_check.py --report` +- **更新清单**: `python3 /home/hmo/MoFin/scripts/morning_health_check.py --update-checklist` + +## 输出格式 + +``` +MoFin 系统体检 | 2026-06-25 周四 | 08:00 +──────────────────────────────────────── +【基础设施层】 +✅ 知微 XMPP Bot: active +❌ 小果 LLM API: timeout + +... (只输出有问题的分类;正常分类仅在 --report 时显示) + +【管道完整性】 +🔴 无异常cron状态(最近24h): 1 errors: 小果独立扫描(error) + +──────────────────────────────────────── +总计: 🔴1严重 | ❌2错误 | ⚠️0警告 | ✅40正常 (15s) + +需立即处理的问题: + [ERROR] SENSE: 小果扫描 cron 已调度: status=error +``` + +## 历史记录 + +保存在 `/home/hmo/MoFin/data/health_check_history.json` +- 每次运行记录时间戳、各等级计数、耗时 +- 保留最近90条(约3个月) diff --git a/scripts/morning_health_check.py b/scripts/morning_health_check.py new file mode 100755 index 0000000..087fcf4 --- /dev/null +++ b/scripts/morning_health_check.py @@ -0,0 +1,525 @@ +#!/usr/bin/env python3 +"""morning_health_check.py — MoFin 系统常规体检 + +每日开盘前(8:00)运行,全面扫描MoFin所有组件健康状况。 +输出格式化的体检报告,有问题才出声,没问题静默。 + +核心设计: + 1. 从 health_checklist.json 读检查清单 + 2. 逐项检查,记录状态 + 3. 报告异常项(只推异常,不推正常) + 4. 自动发现新增cron/脚本(通过 self_discovery 函数) + 5. 维护检查历史 (health_check_history.json) + +新增组件自动发现机制: + - 对比当前cron list与checklist中记录的cron id + - 发现新cron → 自动追加到checklist + - 脚本修改 → 标记"需复核" + +用法: + python3 scripts/morning_health_check.py [--report] [--update-checklist] + --report: 强制输出完整报告(默认只输出异常) + --update-checklist: 运行自动发现并更新checklist + +no_agent模式:只输出异常项,无异常完全静默 +""" + +import json, os, sqlite3, subprocess, sys, time, urllib.request +from pathlib import Path +from datetime import datetime, timedelta + +# ── 路径 ── +BASE = Path("/home/hmo/MoFin") +DATA = BASE / "data" +SCRIPTS_DIR = BASE / "scripts" +PROFILE_SCRIPTS = Path("/home/hmo/.hermes/profiles/position-analyst/scripts") +CHECKLIST_PATH = DATA / "health_checklist.json" +HISTORY_PATH = DATA / "health_check_history.json" +DB_PATH = DATA / "mofin.db" +HERMES_CRON_DIR = Path("/home/hmo/.hermes/profiles/position-analyst/cron") + +# 异常缓存(同一问题24h内不重复推) +KNOWN_ISSUES_PATH = DATA / "health_known_issues.json" + +# ── 上下文 ── +ctx = { + "report": [], + "issues": [], + "ok_count": 0, + "warn_count": 0, + "error_count": 0, + "critical_count": 0, + "started_at": datetime.now(), +} + +def log(level, category, msg, detail=None): + """记录检查结果""" + ctx["report"].append({ + "level": level, "category": category, "msg": msg, "detail": detail, + "timestamp": datetime.now().isoformat() + }) + if level == "critical": + ctx["critical_count"] += 1 + elif level == "error": + ctx["error_count"] += 1 + elif level == "warn": + ctx["warn_count"] += 1 + else: + ctx["ok_count"] += 1 + +def emit(msg, level="ok"): + """输出一行""" + prefix = {"critical": "🔴", "error": "❌", "warn": "⚠️", "ok": "✅", "info": "📎"}.get(level, "•") + return f"{prefix} {msg}" + +# ── 检查器集合 ── + +def check_systemctl(service_name): + """检查systemd服务状态""" + try: + r = subprocess.run(["systemctl", "is-active", service_name], + capture_output=True, text=True, timeout=5) + status = r.stdout.strip() + return status == "active", f"{status}" + except Exception as e: + return False, f"error:{e}" + +def check_port(port): + """检查端口是否在监听""" + try: + r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5) + return f":{port}" in r.stdout, "listening" if f":{port}" in r.stdout else "not_found" + except Exception as e: + return False, f"error:{e}" + +def check_process(pattern): + """检查进程是否存在""" + try: + r = subprocess.run(["pgrep", "-f", pattern], capture_output=True, timeout=5) + return r.returncode == 0, "running" if r.returncode == 0 else "not_found" + except: + return False, "check_error" + +def check_http(url, timeout=15): + """检查HTTP端点是否可达 (清理代理环境变量)""" + try: + # 清理所有代理环境变量 + old_env = {} + for k in list(os.environ.keys()): + if 'proxy' in k.lower(): + old_env[k] = os.environ.pop(k) + req = urllib.request.Request(url, method="GET") + resp = urllib.request.urlopen(req, timeout=timeout) + # 恢复 + for k, v in old_env.items(): + os.environ[k] = v + return True, str(resp.status) + except Exception as e: + return False, str(e)[:60] + +def check_disk(mount): + """检查磁盘空间""" + try: + r = subprocess.run(["df", "-h", mount], capture_output=True, text=True, timeout=5) + lines = r.stdout.strip().split("\n") + if len(lines) >= 2: + parts = lines[1].split() + if len(parts) >= 5: + pct = parts[4].replace("%", "") + return int(pct) < 90, f"{pct}% used" + return False, "parse_error" + except: + return False, "check_error" + +def check_file_exists(path): + """检查文件存在""" + p = Path(path) + exists = p.exists() + return exists, f"{p.stat().st_size}B" if exists else "missing" + +def check_file_freshness(path, max_hours): + """检查文件新鲜度""" + p = Path(path) + if not p.exists(): + return False, "missing" + mtime = datetime.fromtimestamp(p.stat().st_mtime) + hours_ago = (ctx["started_at"] - mtime).total_seconds() / 3600 + return hours_ago < max_hours, f"{hours_ago:.0f}h ago (threshold {max_hours}h)" + +def check_db_table_count(table, field, value, op="today", threshold=0): + """检查数据库中的记录数""" + try: + conn = sqlite3.connect(str(DB_PATH)) + cur = conn.cursor() + if op == "today": + today = ctx["started_at"].strftime("%Y-%m-%d") + # 先检查表有哪些列 + cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()] + date_col = None + for candidate in ["created_at", "date", "timestamp", "last_scanned_at", "signal_date"]: + if candidate in cols: + date_col = candidate + break + if not date_col: + conn.close() + return True, f"no_date_col_in_{table}" + sql = f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?" + cur.execute(sql, (today,)) + elif op == "unprocessed": + cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()] + if "processed" in cols: + sql = f"SELECT COUNT(*) FROM {table} WHERE (processed = 0 OR processed IS NULL)" + elif "source" in cols: + sql = f"SELECT COUNT(*) FROM {table} WHERE source LIKE '%xiaoguo%'" + else: + sql = f"SELECT COUNT(*) FROM {table}" + cur.execute(sql) + elif op == "count": + if field: + sql = f"SELECT COUNT(*) FROM {table} WHERE {field} = ?" + cur.execute(sql, (value,)) + else: + sql = f"SELECT COUNT(*) FROM {table}" + cur.execute(sql) + else: + sql = f"SELECT COUNT(*) FROM {table}" + cur.execute(sql) + count = cur.fetchone()[0] + conn.close() + if op == "unprocessed": + return count < threshold, f"{count} unprocessed" + return count >= threshold, f"{count} rows" + except Exception as e: + return True, f"skip({str(e)[:60]})" + +def check_cron(job_id): + """检查cron任务状态(通过jobs.json)""" + try: + cron_jobs_path = HERMES_CRON_DIR / "jobs.json" + if cron_jobs_path.exists(): + data = json.loads(cron_jobs_path.read_text()) + for job in data.get("jobs", []): + if job.get("id") == job_id or (not job.get("id") and job.get("name") and job_id in str(job)): + enabled = job.get("enabled", True) + if not enabled: + return False, "disabled" + last_status = job.get("last_status") + if last_status and last_status != "ok": + return False, f"status={last_status}" + last_run = job.get("last_run_at", "") + if last_run: + try: + last_dt = datetime.fromisoformat(last_run) + hours_ago = (ctx["started_at"] - last_dt).total_seconds() / 3600 + if hours_ago > 48: + return True, f"ok(stale:{hours_ago:.0f}h)" + except: + pass + return True, "ok" + # 没找到该job_id - 可能是不需要检查的cron + return True, "not_in_jobs_json" + return False, "no_jobs_json" + except Exception as e: + return False, f"check_error:{str(e)[:60]}" + +def check_cron_errors_last24h(): + """检查最近24h内cron是否有error状态""" + try: + cron_jobs_path = HERMES_CRON_DIR / "jobs.json" + if not cron_jobs_path.exists(): + return True, "no_jobs_json" + data = json.loads(cron_jobs_path.read_text()) + check_time = (ctx["started_at"] - timedelta(hours=24)).isoformat() + errors = [] + for job in data.get("jobs", []): + last_status = job.get("last_status") + last_run = job.get("last_run_at", "") + if last_status and last_status != "ok" and last_run: + try: + if last_run >= check_time[:19]: + errors.append(f"{job.get('name','?')}({last_status})") + except: + pass + if errors: + return False, f"{len(errors)} errors: {'; '.join(errors[:5])}" + return True, "0 errors" + except Exception as e: + return True, f"skip({str(e)[:60]})" + +def check_cron_paused(): + """检查不应暂停的cron是否被误暂停""" + should_run = [ + ("3a9fb3300a6a", "价格监控"), + ("0851c7838ca3", "小果扫描"), + ("e13323928f3a", "自选提醒"), + ("b809fcabfa5b", "分支评估"), + ] + try: + cron_jobs_path = HERMES_CRON_DIR / "jobs.json" + if not cron_jobs_path.exists(): + return True, "no_jobs_json" + data = json.loads(cron_jobs_path.read_text()) + job_map = {job.get("id"): job for job in data.get("jobs", [])} + paused = [] + for jid, name in should_run: + job = job_map.get(jid) + if job and not job.get("enabled", True): + paused.append(name) + if paused: + return False, f"paused: {', '.join(paused)}" + return True, "all_expected_running" + except Exception as e: + return True, f"skip({str(e)[:60]})" + +def check_delivery_targets(): + """检查deliver=origin的cron是否有目标""" + try: + cron_jobs_path = HERMES_CRON_DIR / "jobs.json" + if not cron_jobs_path.exists(): + return True, "no_jobs_json" + data = json.loads(cron_jobs_path.read_text()) + issues = [] + for job in data.get("jobs", []): + last_delivery_err = job.get("last_delivery_error", "") + if last_delivery_err and "delivery" in last_delivery_err.lower(): + issues.append(f"{job.get('name','?')}") + if issues: + return False, f"{len(issues)} issues: {', '.join(issues[:3])}" + return True, "all_ok" + except Exception as e: + return True, f"skip({str(e)[:60]})" + +# ── 自动发现 ── + +def self_discovery(): + """自动发现新增组件并更新checklist""" + discovered = [] + + # 1. 发现新增cron任务 + try: + cron_jobs_path = HERMES_CRON_DIR / "jobs.json" + if cron_jobs_path.exists(): + data = json.loads(cron_jobs_path.read_text()) + all_crons = [(j.get("id"), j.get("name"), j.get("schedule"), j.get("script")) + for j in data.get("jobs", [])] + + # 读当前checklist中已有的cron ID + checklist = json.loads(CHECKLIST_PATH.read_text()) + known_cron_ids = set() + for cat in checklist["categories"]: + for item in cat["items"]: + if item["check"].startswith("cron:"): + known_cron_ids.add(item["check"].split(":")[1]) + + for jid, name, schedule, script in all_crons: + if jid and jid not in known_cron_ids and script: + # 新cron任务,追加到pipeline类 + discovered.append(f"新cron: {name}({jid})") + for cat in checklist["categories"]: + if cat["id"] == "pipeline": + cat["items"].append({ + "id": f"cron-auto-{jid[:8]}", + "description": f"{name} cron 已调度", + "check": f"cron:{jid}", + "expected": "enabled+ok", + "severity": "medium", + "auto_discovered": True + }) + break + + if discovered: + CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2)) + except Exception as e: + discovered.append(f"cron_discovery_error: {e}") + + return discovered + +# ── 主流程 ── + +def run_check(item): + """运行单个检查项""" + check_spec = item["check"] + expected = item["expected"] + + if check_spec.startswith("systemctl:"): + service = check_spec.split(":", 1)[1] + ok, detail = check_systemctl(service) + elif check_spec.startswith("port:"): + port = check_spec.split(":", 1)[1] + ok, detail = check_port(port) + elif check_spec.startswith("proc:"): + pattern = check_spec.split(":", 1)[1] + ok, detail = check_process(pattern) + elif check_spec.startswith("http:"): + url = check_spec.split(":", 1)[1] + ok, detail = check_http(url) + elif check_spec.startswith("disk:"): + mount = check_spec.split(":", 1)[1] + ok, detail = check_disk(mount) + elif check_spec.startswith("fileexists:"): + path = check_spec.split(":", 1)[1] + ok, detail = check_file_exists(path) + elif check_spec.startswith("filefresh:"): + # filefresh:path:max_hours + parts = check_spec.split(":", 2) + path = parts[1] + max_hours = float(parts[2].replace("h", "")) + ok, detail = check_file_freshness(path, max_hours) + elif check_spec.startswith("db:"): + # db:table:field:value:op:threshold + parts = check_spec.split(":", 5) + table = parts[1] + field = parts[2] if len(parts) > 2 else None + value = parts[3] if len(parts) > 3 else None + op = parts[4] if len(parts) > 4 else "today" + threshold = int(parts[5]) if len(parts) > 5 else 0 + ok, detail = check_db_table_count(table, field, value, op, threshold) + elif check_spec.startswith("cron:"): + job_id = check_spec.split(":", 1)[1] + ok, detail = check_cron(job_id) + elif check_spec == "cron_errors:last24h": + ok, detail = check_cron_errors_last24h() + elif check_spec == "cron_paused:check": + ok, detail = check_cron_paused() + elif check_spec == "delivery:origin_targets": + ok, detail = check_delivery_targets() + elif check_spec == "pipeline:xiaoguo_signal_flow": + # 综合检查:小果有数据→被我处理 + today_xiaoguo, d1 = check_db_table_count("signal_news", "created_at", None, "today", 0) + unproc, d2 = check_db_table_count("signal_news", None, None, "unprocessed", 30) + ok = today_xiaoguo or unproc + detail = f"today_xiaoguo={d1}, unprocessed={d2}" + else: + ok = False + detail = f"unknown_check:{check_spec}" + + level = "ok" if ok else item["severity"] + # 将critical/error/high都映射到error级别 + if not ok: + if item["severity"] == "critical": + level = "critical" + elif item["severity"] in ("high", "error"): + level = "error" + else: + level = "warn" + + return ok, level, detail + +def main(): + show_full = "--report" in sys.argv + update = "--update-checklist" in sys.argv + + start_time = time.time() + + # 加载checklist + if not CHECKLIST_PATH.exists(): + print("[SILENT] health_checklist.json 不存在") + return + + checklist = json.loads(CHECKLIST_PATH.read_text()) + + # 自动发现(每小时仅运行一次) + if update: + discovered = self_discovery() + else: + # 定期自动发现(检查上次扫描时间) + meta = checklist.get("meta", {}) + last_scan = meta.get("last_full_scan") + if not last_scan or (ctx["started_at"] - datetime.fromisoformat(last_scan)).total_seconds() > 3600: + discovered = self_discovery() + checklist["meta"]["last_full_scan"] = ctx["started_at"].isoformat() + CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2)) + else: + discovered = [] + + # 按分类逐项检查 + dayname = ["一","二","三","四","五","六","日"][ctx["started_at"].weekday()] + lines = [f"MoFin 系统体检 | {ctx['started_at'].strftime('%Y-%m-%d')} 周{dayname} | {ctx['started_at'].strftime('%H:%M')}"] + lines.append("─" * 50) + + for cat in checklist["categories"]: + cat_issues = 0 + cat_lines = [] + for item in cat["items"]: + ok, level, detail = run_check(item) + msg = f"{item['description']}: {detail}" + log(level, cat["name"], msg, item["id"]) + cat_lines.append(emit(msg, level)) + if not ok: + cat_issues += 1 + + # 只在该分类有问题或--report时才输出 + if cat_issues > 0 or show_full: + lines.append(f"\n【{cat['name']}】") + lines.extend(cat_lines) + + # 自动发现结果 + if discovered: + lines.append(f"\n📎 自动发现:") + for d in discovered: + lines.append(f" {d}") + + # 汇总 + total = ctx["ok_count"] + ctx["warn_count"] + ctx["error_count"] + ctx["critical_count"] + if total == 0: + total = 1 # 避免除以0 + + lines.append(f"\n{'─' * 50}") + + critical = ctx["critical_count"] + errors = ctx["error_count"] + warns = ctx["warn_count"] + ok_count = ctx["ok_count"] + + # 构建严重级别输出 + severity_parts = [] + if critical > 0: + severity_parts.append(f"🔴{critical}严重") + if errors > 0: + severity_parts.append(f"❌{errors}错误") + if warns > 0: + severity_parts.append(f"⚠️{warns}警告") + if ok_count > 0: + severity_parts.append(f"✅{ok_count}正常") + + lines.append(f"总计: {' | '.join(severity_parts)} ({(time.time()-start_time):.0f}s)") + + report = "\n".join(lines) + + # 保存历史 + try: + history = [] + if HISTORY_PATH.exists(): + history = json.loads(HISTORY_PATH.read_text()) + history.append({ + "timestamp": ctx["started_at"].isoformat(), + "ok": ok_count, "warn": warns, "error": errors, "critical": critical, + "duration_s": round(time.time() - start_time, 1) + }) + # 保留最近30天 + if len(history) > 90: + history = history[-90:] + HISTORY_PATH.write_text(json.dumps(history, ensure_ascii=False, indent=2)) + except: + pass + + # 输出 + # no_agent模式:有问题才出声;--report则强制输出 + has_issues = critical > 0 or errors > 0 or warns > 0 + + if has_issues or show_full: + print(report) + else: + print(f"[SILENT] MoFin 体检通过 | {ok_count}/{total} 检查正常 ({(time.time()-start_time):.0f}s)") + + # 如果有严重问题,额外输出可读摘要 + if critical > 0 or errors > 0: + print() + print("🔴 需立即处理的问题:") + for entry in ctx["report"]: + if entry["level"] in ("critical", "error"): + print(f" [{entry['level'].upper()}] {entry['category']}: {entry['msg']}") + + +if __name__ == "__main__": + main()