Files
MoFin/scripts/intraday_health_check.py
T
知微 d27dbcc6ad fix: 盘中自检增加进程级检查(price_monitor/xiaoguo_scanner)
- price_monitor:pgrep进程 + 最近10分钟事件数双重验证
- xiaoguo_scanner:pgrep进程 + 今日数据 + API可达三重验证
- 避免"停了两天不知道"的情况
2026-06-24 22:00:58 +08:00

168 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""intraday_health_check.py — 盘中高频轻量自检 (no_agent)
每15分钟检查最关键的活动组件,只查会直接影响盘中运行的。
发现问题→写TODO(消费管道与每日体检共享)。
"""
import json, os, sqlite3, subprocess, urllib.request
from pathlib import Path
from datetime import datetime, timedelta
# Root of the MoFin checkout; the data directory and SQLite file are derived from it.
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
# SQLite database opened by every DB-backed check and by the TODO writer.
DB_PATH = DATA / "mofin.db"
# Cron job definition file. NOTE(review): not referenced by any function in
# the visible part of this file — confirm whether it is still needed.
CRON_JOBS = Path("/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json")
# Gateway endpoint and key. NOTE(review): neither is referenced in the visible
# part of this file, and the key is a credential committed in source —
# consider loading it from the environment instead.
GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
GATEWAY_KEY = "hermes123"
# Failure messages collected by log(); each one later becomes a TODO row.
ISSUES = []
# Number of checks that passed during this run.
OK_COUNT = 0
def log(ok, msg):
    """Record the outcome of a single check.

    A failing check appends ``msg`` to the module-level ``ISSUES`` list;
    a passing check increments the module-level ``OK_COUNT`` and ``msg``
    is discarded.
    """
    global OK_COUNT
    if not ok:
        ISSUES.append(msg)
        return
    OK_COUNT += 1
def check_port(port):
    """Return True when ``ss -tlnp`` reports a listening TCP socket on ``port``.

    The port is compared against the text after the last ``:`` of each
    whitespace-separated field, so it must match exactly. A plain substring
    search would report port 864 as open whenever 8643 is listening, and
    could also match digits elsewhere on the line.

    Args:
        port: Port number (int or numeric string).

    Returns:
        bool: False when no field matches or when ``ss`` cannot be run
        (missing binary, timeout, other OS error).
    """
    try:
        result = subprocess.run(
            ["ss", "-tlnp"], capture_output=True, text=True, timeout=5
        )
    except (OSError, subprocess.SubprocessError):
        return False

    wanted = str(port)
    for line in result.stdout.splitlines():
        for field in line.split():
            # Address fields look like "0.0.0.0:8643", "[::]:8643" or "*:8643";
            # only the part after the final colon is the port.
            if ":" in field and field.rsplit(":", 1)[1] == wanted:
                return True
    return False
def check_http(url, timeout=8):
    """Return True when a GET to ``url`` completes without raising.

    The request is sent through an opener built with an empty
    ``ProxyHandler`` so proxy settings are ignored for this request only.
    The process environment is left untouched, and the response is closed
    as soon as it has been received.

    Args:
        url: Absolute URL to probe.
        timeout: Timeout in seconds passed to the opener.

    Returns:
        bool: False on any failure — malformed URL, connection error,
        timeout, or an HTTP error status raised by urllib.
    """
    try:
        direct = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        request = urllib.request.Request(url, method="GET")
        with direct.open(request, timeout=timeout):
            return True
    except Exception:
        # Health-check boundary: any failure simply means "not reachable".
        return False
def db_today_count(table, date_col):
    """Count rows in ``table`` whose ``date(date_col)`` equals today's local date.

    ``table`` and ``date_col`` are interpolated directly into the SQL text,
    so they must be trusted identifiers supplied by this script, never
    external input.

    NOTE(review): "today" comes from ``datetime.now()`` (local time); this
    assumes the column is also stored in local time — confirm.

    Args:
        table: Table name.
        date_col: Name of the timestamp column to compare.

    Returns:
        int: Number of matching rows, or -1 when the database cannot be
        opened or the query fails.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    conn = None
    try:
        conn = sqlite3.connect(str(DB_PATH))
        row = conn.execute(
            f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?",
            (today,),
        ).fetchone()
        return row[0]
    except sqlite3.Error:
        return -1
    finally:
        # Close even when the query raised, so the handle is never leaked.
        if conn is not None:
            conn.close()
def check_xiaoguo():
    """Check the xiaoguo pipeline: scanner process, today's scan rows, LLM API.

    Three independent results are recorded through ``log``:
      1. ``pgrep -f xiaoguo_scanner`` finds a process;
      2. ``xiaoguo_scan_tracker`` has at least one row dated today;
      3. the models endpoint answers an HTTP GET.

    A failure to run ``pgrep`` itself (missing binary, timeout) is recorded
    as "process not running" instead of aborting the whole health check.
    """
    # Process
    try:
        proc = subprocess.run(
            ["pgrep", "-f", "xiaoguo_scanner"], capture_output=True, timeout=5
        )
        scanner_alive = proc.returncode == 0
    except (OSError, subprocess.SubprocessError):
        scanner_alive = False
    log(scanner_alive, "小果扫描进程不在运行")

    # Data: db_today_count returns -1 on a DB error, which also fails this check.
    scans_today = db_today_count("xiaoguo_scan_tracker", "last_scanned_at")
    log(scans_today > 0, f"小果扫描今日数据: {scans_today}条(需>0")

    # API
    api_ok = check_http("http://192.168.1.122:18003/v1/models")
    log(api_ok, "小果LLM API不可达")
def check_price_monitor():
    """Check the price monitor: process is alive and has produced recent events.

    If ``pgrep -f price_monitor`` finds nothing, or cannot be run at all,
    a single failure is recorded and the freshness query is skipped.
    Otherwise the check passes only when ``price_events`` has at least one
    row created in the last 10 minutes.

    If the freshness query fails with a database error, the check is
    recorded as passing on the strength of the live process alone
    (best-effort behaviour kept from the original implementation).

    NOTE(review): SQLite's ``datetime('now')`` is UTC; this assumes
    ``created_at`` is stored in UTC as well — confirm.
    """
    # Process check
    try:
        proc = subprocess.run(
            ["pgrep", "-f", "price_monitor"], capture_output=True, timeout=5
        )
        process_alive = proc.returncode == 0
    except (OSError, subprocess.SubprocessError):
        process_alive = False
    if not process_alive:
        log(False, "价格监控进程不在运行")
        return

    # Data freshness: any event within the last 10 minutes?
    conn = None
    try:
        conn = sqlite3.connect(str(DB_PATH))
        recent = conn.execute(
            "SELECT COUNT(*) FROM price_events WHERE created_at > datetime('now', '-10 minutes')"
        ).fetchone()[0]
    except sqlite3.Error:
        log(True, "价格监控进程在跑")
        return
    finally:
        # Close even when the query raised, so the handle is never leaked.
        if conn is not None:
            conn.close()
    log(recent > 0, "价格监控进程在跑,但最近10分钟无新事件")
def check_bots():
    """Check that both XMPP bot systemd units report ``active``.

    Each unit is queried with ``systemctl is-active``; anything other than
    the exact output ``active`` is recorded as a failure. If ``systemctl``
    cannot be run (missing binary, timeout), the unit is treated as offline
    rather than letting the exception abort the whole health check.
    """
    units = (
        ("xmpp-zhiwei.service", "知微XMPP Bot离线"),
        ("xmpp-xiaoguo.service", "小果XMPP Bot离线"),
    )
    for unit, offline_msg in units:
        try:
            state = subprocess.run(
                ["systemctl", "is-active", unit],
                capture_output=True, text=True, timeout=5,
            ).stdout.strip()
        except (OSError, subprocess.SubprocessError):
            state = ""
        log(state == "active", offline_msg)
def check_gateways():
    """Record whether each gateway port (8643, then 8645) is listening."""
    gateways = (
        (8643, "知微Gateway :8643 未监听"),
        (8645, "小果Gateway :8645 未监听"),
    )
    for port, down_msg in gateways:
        listening = check_port(port)
        log(listening, down_msg)
def check_signal_pipeline():
    """Check that xiaoguo signals are not piling up unprocessed.

    Counts ``signal_news`` rows whose ``source`` starts with ``xiaoguo``
    and whose ``processed`` flag is 0 or NULL. The check passes while that
    backlog is below 30.

    If the database cannot be read, the backlog is treated as 0 and the
    check passes (best-effort behaviour kept from the original).

    NOTE(review): the failure message embeds the live count, so its text
    changes from run to run; any de-duplication keyed on the exact message
    will not recognise a repeat — confirm whether that is intended.
    """
    unproc = 0
    conn = None
    try:
        conn = sqlite3.connect(str(DB_PATH))
        unproc = conn.execute(
            "SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)"
        ).fetchone()[0]
    except sqlite3.Error:
        # Deliberate best effort: an unreadable DB leaves the backlog at 0.
        pass
    finally:
        # Close even when the query raised, so the handle is never leaked.
        if conn is not None:
            conn.close()
    log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30")
def write_todos():
    """Persist every collected issue as a high-priority TODO row.

    For each message in ``ISSUES`` a title ``[盘中自检] <msg>`` is built.
    A row is inserted into ``todos`` only when no row with that exact
    title is currently ``pending`` or ``in_progress``.

    One connection is shared across all issues and is always closed.
    Database errors never propagate: each failure is reported on stderr
    and the remaining issues are still attempted.
    """
    if not ISSUES:
        return

    import sys  # local import: only needed on the error-reporting path

    try:
        conn = sqlite3.connect(str(DB_PATH))
    except sqlite3.Error as exc:
        print(f"[盘中自检] TODO写入失败: {exc}", file=sys.stderr)
        return

    try:
        for msg in ISSUES:
            title = f"[盘中自检] {msg}"
            try:
                exist = conn.execute(
                    "SELECT id FROM todos WHERE title=? AND status IN ('pending','in_progress')",
                    (title,),
                ).fetchone()
                if exist:
                    continue
                conn.execute(
                    "INSERT INTO todos (title, description, priority, source, status, fix_action) "
                    "VALUES (?, ?, 'high', 'intraday_check', 'pending', NULL)",
                    (title, f"盘中自动发现: {msg}"),
                )
                # Commit per row so earlier TODOs survive a later failure.
                conn.commit()
            except sqlite3.Error as exc:
                print(f"[盘中自检] TODO写入失败: {exc}", file=sys.stderr)
    finally:
        conn.close()
def main():
    """Run all intraday checks once and print a one-shot report.

    Outside the trading window (weekends, or local hour before 09:00 or
    from 15:00 onward) nothing is checked and a ``[SILENT]`` line is
    printed. Inside the window every check runs, failures are written as
    TODO rows, and either the list of issues or a ``[SILENT]`` all-clear
    line is printed.
    """
    now = datetime.now()
    # Only run during trading hours.
    in_session = now.weekday() < 5 and 9 <= now.hour < 15
    if not in_session:
        print("[SILENT] 非交易时段")
        return

    check_bots()
    check_gateways()
    check_xiaoguo()
    # The original wrapped this call in ``if 9 <= now.hour < 16``, which is
    # always true once the trading-hours gate above has passed.
    check_price_monitor()
    check_signal_pipeline()
    write_todos()

    if ISSUES:
        print(f"盘中自检 | {now.strftime('%H:%M')} | {len(ISSUES)}项异常:")
        for issue in ISSUES:
            print(f" ⚠️ {issue}")
    else:
        print(f"[SILENT] 盘中自检通过 | {OK_COUNT}项正常")


if __name__ == "__main__":
    main()