#!/usr/bin/env python3
"""intraday_health_check.py — lightweight, high-frequency intraday self-check (no_agent).

Meant to run every 15 minutes. It inspects only the most critical active
components, i.e. those that directly affect intraday operation. Every problem
found is written to the ``todos`` table (the consumer pipeline for those TODOs
is shared with the daily health check) and printed to stdout.
"""

import http.client
import json
import os
import re
import sqlite3
import subprocess
import sys
import urllib.request
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path

BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
DB_PATH = DATA / "mofin.db"
CRON_JOBS = Path("/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json")
GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
# NOTE(review): credential hard-coded in source; not referenced by any check
# visible in this file.
GATEWAY_KEY = "hermes123"

# Messages of every failed check in this run, in the order they were recorded.
ISSUES = []
# Number of checks that passed in this run.
OK_COUNT = 0


def log(ok, msg):
    """Record the outcome of one check.

    Args:
        ok: Truthy when the check passed.
        msg: Problem description; stored in ``ISSUES`` only when ``ok`` is
            falsy (it is ignored for passing checks).
    """
    global OK_COUNT
    if ok:
        OK_COUNT += 1
    else:
        ISSUES.append(msg)


def listing_has_port(listing, port):
    """Return True if ``listing`` contains ``:<port>`` as a complete port field.

    The port must be followed by whitespace or the end of the text, so that
    looking for 8643 does not match ``:86430``.

    Args:
        listing: Text to search (e.g. the stdout of ``ss -tlnp``).
        port: Port number (int or str).
    """
    pattern = rf":{re.escape(str(port))}(?!\S)"
    return re.search(pattern, listing) is not None


def check_port(port):
    """Return True if ``ss -tlnp`` reports a listening TCP socket on ``port``.

    Returns False when ``ss`` cannot be run or times out.
    """
    try:
        result = subprocess.run(
            ["ss", "-tlnp"], capture_output=True, text=True, timeout=5
        )
    except (OSError, subprocess.SubprocessError):
        return False
    return listing_has_port(result.stdout, port)


def check_http(url, timeout=8):
    """Return True if a GET request to ``url`` succeeds without using a proxy.

    Any failure (connection error, timeout, HTTP error status, malformed URL)
    yields False.

    Args:
        url: Address to probe.
        timeout: Socket timeout in seconds.
    """
    # An empty ProxyHandler mapping disables proxy auto-detection for this
    # opener only, instead of deleting proxy variables from os.environ for the
    # whole process.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        request = urllib.request.Request(url, method="GET")
        with opener.open(request, timeout=timeout):
            return True
    except (OSError, ValueError, http.client.HTTPException):
        return False


def db_today_count(table, date_col):
    """Count rows of ``table`` whose ``date_col`` falls on today's date.

    ``table`` and ``date_col`` are interpolated into the SQL text, so they must
    be trusted identifiers (all callers in this file pass literals).

    NOTE(review): "today" is the local date of this process; this assumes the
    column stores local-time timestamps — confirm.

    Returns:
        The row count, or -1 if the database query fails.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    query = f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?"
    try:
        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            return conn.execute(query, (today,)).fetchone()[0]
    except sqlite3.Error:
        return -1


def check_xiaoguo():
    """Xiaoguo pipeline: scanner has data today / API reachable.

    Degraded states are deliberately not reported: this check never records an
    issue. There is not necessarily a resident process (no_agent cron mode), so
    no process check is done.
    """
    scans_today = db_today_count("xiaoguo_scan_tracker", "last_scanned_at")
    if scans_today <= 0:
        # Xiaoguo may simply be offline; not treated as a serious problem.
        return
    # When the API is unreachable the scanner has already degraded to
    # "unknown", so the result is intentionally ignored.
    check_http("http://192.168.1.122:18003/v1/models")


def _run_probe(cmd, *, text=False):
    """Run ``cmd`` with a 5 s timeout; return the result, or None on failure."""
    try:
        return subprocess.run(cmd, capture_output=True, text=text, timeout=5)
    except (OSError, subprocess.SubprocessError):
        return None


def check_price_monitor():
    """Price monitor: the process is running and has produced recent events.

    A probe that cannot be executed is treated like a missing process, the
    same way ``check_port`` treats a failed ``ss`` call.
    """
    probe = _run_probe(["pgrep", "-f", "price_monitor"])
    if probe is None or probe.returncode != 0:
        log(False, "价格监控进程不在运行")
        return

    # Data freshness: any event within the last 10 minutes.
    # NOTE(review): SQLite's datetime('now') is UTC; this assumes created_at
    # is stored in UTC as well — confirm.
    try:
        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            recent = conn.execute(
                "SELECT COUNT(*) FROM price_events WHERE created_at > datetime('now', '-10 minutes')"
            ).fetchone()[0]
    except sqlite3.Error:
        # The process is alive; an unreadable table is not counted against it.
        log(True, "价格监控进程在跑")
        return
    log(recent > 0, "价格监控进程在跑,但最近10分钟无新事件")


def _service_active(unit):
    """Return True if ``systemctl is-active <unit>`` prints ``active``."""
    probe = _run_probe(["systemctl", "is-active", unit], text=True)
    return probe is not None and probe.stdout.strip() == "active"


def check_bots():
    """Record whether the two XMPP bot systemd services are active."""
    zhiwei = _service_active("xmpp-zhiwei.service")
    xiaoguo = _service_active("xmpp-xiaoguo.service")
    log(zhiwei, "知微XMPP Bot离线")
    log(xiaoguo, "小果XMPP Bot离线")


def check_gateways():
    """Record whether the two gateway ports (8643, 8645) are listening."""
    log(check_port(8643), "知微Gateway :8643 未监听")
    log(check_port(8645), "小果Gateway :8645 未监听")


def check_signal_pipeline():
    """Check that signals flow xiaoguo_scanner -> signal_news -> consumer.

    Records an issue when 30 or more xiaoguo-sourced rows in ``signal_news``
    are still unprocessed. If the table cannot be read the backlog is taken
    as 0, so the check passes.
    """
    unproc = 0
    try:
        with closing(sqlite3.connect(str(DB_PATH))) as conn:
            unproc = conn.execute(
                "SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)"
            ).fetchone()[0]
    except sqlite3.Error:
        # Best-effort: keep the default backlog of 0.
        pass
    log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30)")


def write_todos():
    """Persist every recorded issue as a high-priority TODO row.

    An issue is skipped when a TODO with the same title is already pending or
    in progress. Database failures are reported on stderr and never raised;
    each issue is handled independently of the others.
    """
    if not ISSUES:
        return
    for msg in ISSUES:
        title = f"[盘中自检] {msg}"
        try:
            with closing(sqlite3.connect(str(DB_PATH))) as conn:
                exist = conn.execute(
                    "SELECT id FROM todos WHERE title=? AND status IN ('pending','in_progress')",
                    (title,),
                ).fetchone()
                if not exist:
                    conn.execute(
                        "INSERT INTO todos (title, description, priority, source, status, fix_action) "
                        "VALUES (?, ?, 'high', 'intraday_check', 'pending', NULL)",
                        (title, f"盘中自动发现: {msg}"),
                    )
                    conn.commit()
        except sqlite3.Error as exc:
            print(f"intraday_health_check: could not write TODO: {exc}", file=sys.stderr)


def is_trading_session(now):
    """Return True on a weekday (Mon-Fri) with 9 <= hour < 15.

    Args:
        now: A ``datetime`` to classify.
    """
    return now.weekday() < 5 and 9 <= now.hour < 15


def main():
    """Run all intraday checks, persist the issues and print a summary."""
    now = datetime.now()
    # Only run during the trading session.
    if not is_trading_session(now):
        print("[SILENT] 非交易时段")
        return

    check_bots()
    check_gateways()
    check_xiaoguo()
    # The session guard above already restricts the hour to 9..14, so no
    # further hour check is needed before the price-monitor check.
    check_price_monitor()
    check_signal_pipeline()

    write_todos()

    if ISSUES:
        print(f"盘中自检 | {now.strftime('%H:%M')} | {len(ISSUES)}项异常:")
        for i in ISSUES:
            print(f" ⚠️ {i}")
    else:
        print(f"[SILENT] 盘中自检通过 | {OK_COUNT}项正常")


if __name__ == "__main__":
    main()