From 077f683878353c7f2ccbfd4a0f069bce347f9b11 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?=
Date: Wed, 24 Jun 2026 21:58:57 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E4=B8=89=E5=B1=82=E8=87=AA=E6=A3=80+?=
 =?UTF-8?q?=E5=85=83=E8=87=AA=E6=A3=80+cron=E5=85=A8=E5=B1=80=E5=AE=A1?=
 =?UTF-8?q?=E8=AE=A1?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- 盘中高频(每15分): XMPP/Gateway/Scanner/价格/信号管道
- 每日早检(8:00): 原有7层 + 新增cron全局审计 + 元自检
- cron审计: 检查所有启用的定时任务是否在24h内运行过
- 元自检: 昨日体检是否完成/checklist覆盖是否完整
- 自成长: auto_discovery自动追加新增cron到清单
---
 scripts/intraday_health_check.py | 193 +++++++++++++++++++++++++++++++
 scripts/morning_health_check.py  |  70 ++++++++++-
 2 files changed, 262 insertions(+), 1 deletion(-)
 create mode 100644 scripts/intraday_health_check.py

diff --git a/scripts/intraday_health_check.py b/scripts/intraday_health_check.py
new file mode 100644
index 0000000..b51fdd4
--- /dev/null
+++ b/scripts/intraday_health_check.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python3
+"""intraday_health_check.py - lightweight high-frequency intraday self-check (no_agent).
+
+Runs every 15 minutes and probes only the most critical live components,
+i.e. the ones that directly affect intraday operation. Every problem found
+is written as a TODO (the consumption pipeline is shared with the daily
+health check).
+"""
+
+import json
+import os
+import sqlite3
+import subprocess
+import sys
+import urllib.request
+from contextlib import closing
+from datetime import datetime, timedelta
+from pathlib import Path
+
+BASE = Path("/home/hmo/MoFin")
+DATA = BASE / "data"
+DB_PATH = DATA / "mofin.db"
+CRON_JOBS = Path("/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json")
+GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
+# NOTE(review): hard-coded credential that nothing in this script reads;
+# consider loading it from the environment instead of committing it.
+GATEWAY_KEY = "hermes123"
+
+ISSUES = []  # messages of failed checks, consumed by write_todos()/main()
+OK_COUNT = 0  # number of passed checks
+
+
+def log(ok, msg):
+    """Record one check result: count a pass, or queue ``msg`` as an issue."""
+    global OK_COUNT
+    if ok:
+        OK_COUNT += 1
+    else:
+        ISSUES.append(msg)
+
+
+def check_port(port):
+    """Return True if ``ss -tlnp`` shows a TCP listener bound to ``port``."""
+    try:
+        r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
+    except (OSError, subprocess.SubprocessError):
+        return False
+    # Compare whole address tokens: a substring test for ":8643" would also
+    # accept a listener on ":86430".
+    suffix = f":{port}"
+    return any(tok.endswith(suffix) for tok in r.stdout.split())
+
+
+def check_http(url, timeout=8):
+    """Return True if a GET request to ``url`` succeeds without using a proxy."""
+    # An empty ProxyHandler disables proxies for this opener only, so the
+    # process environment (inherited by later subprocesses) stays untouched.
+    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
+    try:
+        req = urllib.request.Request(url, method="GET")
+        with opener.open(req, timeout=timeout):
+            return True
+    except Exception:
+        # A probe must never raise; any failure means "not reachable".
+        return False
+
+
+def db_today_count(table, date_col):
+    """Count rows of ``table`` whose ``date_col`` is today; return -1 on a DB error.
+
+    ``table`` and ``date_col`` are interpolated into the SQL text, so callers
+    must only pass trusted, hard-coded identifiers.
+    """
+    today = datetime.now().strftime("%Y-%m-%d")
+    try:
+        # closing() guarantees the connection is released even when the query fails.
+        with closing(sqlite3.connect(str(DB_PATH))) as conn:
+            r = conn.execute(
+                f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,)
+            ).fetchone()
+        return r[0]
+    except sqlite3.Error:
+        return -1
+
+
+def check_xiaoguo():
+    """Xiaoguo pipeline: the scanner has rows for today and the LLM API is reachable."""
+    scans_today = db_today_count("xiaoguo_scan_tracker", "last_scanned_at")
+    log(scans_today > 0, f"小果扫描今日数据: {scans_today}条(需>0)")
+    api_ok = check_http("http://192.168.1.122:18003/v1/models")
+    log(api_ok, "小果LLM API不可达")
+
+
+def check_price_monitor():
+    """The price monitor must have recorded at least one event today."""
+    done = db_today_count("price_events", "date")
+    log(done > 0, f"价格监控今日事件: {done}条(需>0)")
+
+
+def _service_active(unit):
+    """Return True if ``systemctl is-active`` reports ``unit`` as active."""
+    try:
+        r = subprocess.run(["systemctl", "is-active", unit],
+                           capture_output=True, text=True, timeout=5)
+    except (OSError, subprocess.SubprocessError):
+        # A missing or hung systemctl counts as "not active" so the remaining
+        # checks still run and the problem still ends up in the TODO list.
+        return False
+    return r.stdout.strip() == "active"
+
+
+def check_bots():
+    """Both XMPP bot services must be active."""
+    log(_service_active("xmpp-zhiwei.service"), "知微XMPP Bot离线")
+    log(_service_active("xmpp-xiaoguo.service"), "小果XMPP Bot离线")
+
+
+def check_gateways():
+    """Both gateway ports must be listening."""
+    log(check_port(8643), "知微Gateway :8643 未监听")
+    log(check_port(8645), "小果Gateway :8645 未监听")
+
+
+def check_signal_pipeline():
+    """Check that signals flow xiaoguo_scanner -> signal_news -> consumer without piling up."""
+    try:
+        with closing(sqlite3.connect(str(DB_PATH))) as conn:
+            unproc = conn.execute(
+                "SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' "
+                "AND (processed=0 OR processed IS NULL)"
+            ).fetchone()[0]
+    except sqlite3.Error:
+        # -1 means "could not query", mirroring db_today_count(); a broken
+        # database must not be reported as a healthy backlog of 0.
+        unproc = -1
+    log(0 <= unproc < 30, f"信号堆积: {unproc}条未处理(需<30)")
+
+
+def write_todos():
+    """Insert one high-priority TODO per issue unless an open one has the same title."""
+    if not ISSUES:
+        return
+    try:
+        conn = sqlite3.connect(str(DB_PATH))
+    except sqlite3.Error as e:
+        print(f"write_todos: cannot open {DB_PATH}: {e}", file=sys.stderr)
+        return
+    with closing(conn):
+        for msg in ISSUES:
+            title = f"[盘中自检] {msg}"
+            try:
+                exist = conn.execute(
+                    "SELECT id FROM todos WHERE title=? "
+                    "AND status IN ('pending','in_progress')", (title,)).fetchone()
+                if not exist:
+                    conn.execute(
+                        "INSERT INTO todos (title, description, priority, source, status, fix_action) "
+                        "VALUES (?, ?, 'high', 'intraday_check', 'pending', NULL)",
+                        (title, f"盘中自动发现: {msg}"))
+                    conn.commit()
+            except sqlite3.Error as e:
+                # Best effort per issue: main() still prints the issue, but
+                # say why it could not be persisted instead of hiding it.
+                print(f"write_todos: {title}: {e}", file=sys.stderr)
+
+
+def main():
+    """Run the intraday checks on weekdays from 09:00 to 15:00 and print the result."""
+    now = datetime.now()
+    # Only run during trading hours.
+    if now.weekday() >= 5 or now.hour < 9 or now.hour >= 15:
+        print("[SILENT] 非交易时段")
+        return
+
+    check_bots()
+    check_gateways()
+    check_xiaoguo()
+    # No extra hour test here: the guard above already limits now.hour to 9..14.
+    check_price_monitor()
+    check_signal_pipeline()
+
+    write_todos()
+
+    if ISSUES:
+        print(f"盘中自检 | {now.strftime('%H:%M')} | {len(ISSUES)}项异常:")
+        for i in ISSUES:
+            print(f" ⚠️ {i}")
+    else:
+        print(f"[SILENT] 盘中自检通过 | {OK_COUNT}项正常")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/morning_health_check.py b/scripts/morning_health_check.py
index 9c72767..58440a0 100755
--- a/scripts/morning_health_check.py
+++ b/scripts/morning_health_check.py
@@ -442,8 +442,70 @@ def check_delivery_targets():
     except Exception as e:
         return True, f"skip({str(e)[:60]})"
 
-# ── 自动发现 ──
+def check_cron_audit():
+    """Audit every enabled cron job that has a script: did it run within the last 24h?"""
+    try:
+        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
+        if not cron_jobs_path.exists():
+            return True, "no_jobs_json"
+        data = json.loads(cron_jobs_path.read_text())
+        check_time = (ctx["started_at"] - timedelta(hours=24)).isoformat()[:19]
+        # One filter feeds both the audit loop and the summary count, so a job
+        # without an "enabled" key (treated as enabled) is audited and counted.
+        audited = [j for j in data.get("jobs", [])
+                   if j.get("enabled", True) and j.get("script", "")]
+        stale = []
+        for job in audited:
+            name = job.get("name", "?")
+            last_run = job.get("last_run_at", "")
+            if not last_run:
+                stale.append(f"{name}(从未运行)")
+            elif last_run[:19] < check_time:
+                # Timestamps are compared as text, truncated to whole seconds.
+                if job.get("last_status") == "ok":
+                    stale.append(f"{name}(>24h未运行)")
+                else:
+                    stale.append(f"{name}(>24h+状态异常)")
+        if stale:
+            return False, f"{len(stale)}个cron异常: {'; '.join(stale[:5])}"
+        return True, f"全部{len(audited)}个cron正常"
+    except Exception as e:
+        return True, f"skip({str(e)[:60]})"
+
+
+def check_meta_health_check_yesterday():
+    """Meta-check: report how yesterday's health check ended."""
+    try:
+        history = []
+        if HISTORY_PATH.exists():
+            history = json.loads(HISTORY_PATH.read_text())
+        yesterday = (ctx["started_at"] - timedelta(days=1)).strftime("%Y-%m-%d")
+        for h in history[-30:]:
+            ts = h.get("timestamp", "")
+            if ts[:10] == yesterday:
+                if h.get("error", 0) == 0 and h.get("critical", 0) == 0:
+                    return True, f"昨日体检通过({h.get('ok',0)}项正常)"
+                return True, f"昨日体检有{h.get('error',0)}错误+{h.get('critical',0)}严重(已记录)"
+        # NOTE(review): every branch returns True, so this check is purely
+        # informational and can never fail - confirm that is intended.
+        return True, "无昨日记录(首次运行)"
+    except Exception:
+        return True, "skip"
+
+
+def check_meta_checklist_completeness():
+    """Meta-check: report whether auto-discovered items were appended to the checklist."""
+    try:
+        added = ctx.get("auto_discovered_items", [])
+        if added:
+            return True, f"自动发现并追加了{len(added)}个新组件到清单"
+        return True, "清单覆盖完整"
+    except Exception:
+        return True, "skip"
+
+
+# ── 自动发现 ──
 
 def self_discovery():
     """自动发现新增组件并更新checklist"""
     discovered = []
@@ -536,6 +598,12 @@ def run_check(item):
         ok, detail = check_cron_paused()
     elif check_spec == "delivery:origin_targets":
         ok, detail = check_delivery_targets()
+    elif check_spec == "cron_audit:all":
+        ok, detail = check_cron_audit()
+    elif check_spec == "meta:health_check_yesterday":
+        ok, detail = check_meta_health_check_yesterday()
+    elif check_spec == "meta:checklist_completeness":
+        ok, detail = check_meta_checklist_completeness()
     elif check_spec == "pipeline:xiaoguo_signal_flow":
         # 综合检查:小果有数据→被我处理
         today_xiaoguo, d1 = check_db_table_count("signal_news", "created_at", None, "today", 0)