Files
MoFin/scripts/intraday_health_check.py
T
知微 120f35671d feat: 数据管道注册表+宏观风险集成+管道完整性审计
1. 新增 data/pipeline_registry.json — 数据管道注册表
   - 记录每条数据流的source→consumer→end_user
   - verified字段标记是否验证通过
   - 新增组件时必须在此注册,否则体检会报未验证

2. 宏观预警集成(修复管道缺口)
   - intraday_health_check: 读macro_risk_state.json,HIGH风险时报警
   - morning_health_check: 新增管道注册表审计项
     读pipeline_registry.json,未验证的管道标为异常

3. 管道完整性自发现机制
   注册表中verified=false的管道会在每日体检中标记
   逼迫新增组件时注册管道,否则睡不好觉
2026-06-27 02:04:22 +08:00

183 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""intraday_health_check.py — lightweight, high-frequency intraday self-check (no_agent).

Every 15 minutes, checks only the most critical live components: the ones
that directly affect intraday operation.
Any problem found is written as a TODO (the consuming pipeline is shared with
the daily health check).
"""
import json, os, sqlite3, subprocess, urllib.request
from pathlib import Path
from datetime import datetime, timedelta
# Root of the MoFin deployment; the data paths below are derived from it.
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
# SQLite database that the checks below query and that write_todos() writes to.
DB_PATH = DATA / "mofin.db"
# NOTE(review): CRON_JOBS, GATEWAY_URL and GATEWAY_KEY are not referenced in
# the visible part of this file — confirm whether they are still needed.
CRON_JOBS = Path("/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json")
GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
# NOTE(review): credential hard-coded in source; consider reading it from the
# environment or a secrets file instead.
GATEWAY_KEY = "hermes123"
# Failure messages collected by log(); persisted by write_todos() and printed
# by main().
ISSUES = []
# Number of checks that passed; incremented by log().
OK_COUNT = 0
def log(ok, msg):
    """Record the outcome of a single check.

    A passing check only increments the global ``OK_COUNT`` (its message is
    discarded); a failing check appends ``msg`` to the global ``ISSUES`` list.
    """
    global OK_COUNT
    if not ok:
        ISSUES.append(msg)
        return
    OK_COUNT += 1
def check_port(port):
    """Return True if ``ss -tlnp`` lists a listening TCP socket on ``port``.

    The port is compared exactly against the part after the last ``:`` of
    every whitespace-separated token, so ``8643`` no longer matches a socket
    on ``86430`` the way a plain substring search for ``":8643"`` would.

    Args:
        port: port number (int or numeric string).

    Returns:
        True when a matching socket is listed. False when none is listed or
        when ``ss`` cannot be run (missing binary, timeout).
    """
    wanted = str(port)
    try:
        result = subprocess.run(
            ["ss", "-tlnp"], capture_output=True, text=True, timeout=5
        )
    except (OSError, subprocess.SubprocessError):
        # Could not inspect sockets; report "not listening", as before.
        return False
    for line in result.stdout.splitlines():
        for token in line.split():
            # Address tokens look like "0.0.0.0:8643", "[::]:8643" or "*:8643".
            # Requiring a ":" keeps bare numeric columns (queue sizes) from
            # matching.
            _, sep, tail = token.rpartition(":")
            if sep and tail == wanted:
                return True
    return False
def check_http(url, timeout=8):
    """Return True if a proxy-less GET request to ``url`` succeeds.

    Proxies are bypassed for this request only. The previous implementation
    achieved that by deleting every ``*proxy*`` variable from ``os.environ``,
    which permanently changed the environment of the whole process and of
    every child process started afterwards.

    Args:
        url: absolute URL to probe.
        timeout: timeout in seconds passed to the opener.

    Returns:
        True when the request completes without error. False on any network
        error, HTTP error status, malformed URL or protocol error.
    """
    # Function-scope import so this block does not depend on the file header.
    import http.client

    # An empty ProxyHandler disables environment and system proxy settings.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        req = urllib.request.Request(url, method="GET")
        # The context manager closes the response socket promptly.
        with opener.open(req, timeout=timeout):
            return True
    except (OSError, ValueError, http.client.HTTPException):
        # URLError/HTTPError and socket timeouts are OSError subclasses;
        # ValueError covers malformed URLs.
        return False
def db_today_count(table, date_col):
    """Count rows of ``table`` whose ``date_col`` falls on today's date.

    "Today" is the local date from ``datetime.now()``, compared against
    SQLite's ``date(<date_col>)``.
    NOTE(review): this assumes the column is stored in local time — confirm.

    Args:
        table: table name. It is interpolated into the SQL text, so it must be
            a trusted identifier and never user input.
        date_col: name of the date/timestamp column; same caveat as ``table``.

    Returns:
        The row count, or -1 when the database cannot be opened or queried.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    try:
        conn = sqlite3.connect(str(DB_PATH))
        try:
            row = conn.execute(
                f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?",
                (today,),
            ).fetchone()
        finally:
            # Close the connection even when the query raises.
            conn.close()
    except sqlite3.Error:
        return -1
    return row[0]
def check_xiaoguo():
    """Probe the Xiaoguo pipeline: today's scan records and API reachability.

    Purely best-effort: neither result is passed to log(), so this check
    never adds an issue and never increments the OK counter.
    """
    # There may be no resident process (no_agent cron mode), so activity is
    # judged from data: are there scan records dated today?
    if db_today_count("xiaoguo_scan_tracker", "last_scanned_at") > 0:
        # The probe result is deliberately ignored. Per the original note, the
        # scanner degrades to "unknown" when the API is unreachable.
        check_http("http://192.168.1.122:18003/v1/models")
    # No rows (or a failed query): Xiaoguo may simply be offline. This is not
    # treated as severe, so nothing is reported.
def check_price_monitor():
    """Check that the price monitor process is running and producing data.

    Logs a failure when no process matching ``price_monitor`` is found, or
    when ``pgrep`` itself cannot be run (previously that exception aborted
    the whole health check). If a process is found, logs a failure when
    ``price_events`` has no row created in the last 10 minutes. A database
    error counts as a pass, because the process itself is alive.
    """
    try:
        probe = subprocess.run(
            ["pgrep", "-f", "price_monitor"], capture_output=True, timeout=5
        )
        process_alive = probe.returncode == 0
    except (OSError, subprocess.SubprocessError):
        # pgrep missing or timed out: liveness cannot be confirmed.
        process_alive = False
    if not process_alive:
        log(False, "价格监控进程不在运行")
        return

    # Data freshness: any event within the last 10 minutes?
    # NOTE(review): SQLite's datetime('now') is UTC. If created_at is stored
    # in local time this comparison is skewed — confirm the column's timezone.
    try:
        conn = sqlite3.connect(str(DB_PATH))
        try:
            recent = conn.execute(
                "SELECT COUNT(*) FROM price_events "
                "WHERE created_at > datetime('now', '-10 minutes')"
            ).fetchone()[0]
        finally:
            # Close the connection even when the query raises.
            conn.close()
    except sqlite3.Error:
        log(True, "价格监控进程在跑")
        return
    log(recent > 0, "价格监控进程在跑,但最近10分钟无新事件")
def _unit_is_active(unit):
    """Return True if ``systemctl is-active <unit>`` prints exactly "active".

    Returns False when systemctl cannot be run (missing binary, timeout), so
    the caller reports the unit as offline instead of crashing.
    """
    try:
        result = subprocess.run(
            ["systemctl", "is-active", unit],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return False
    return result.stdout.strip() == "active"


def check_bots():
    """Log a failure for each XMPP bot whose systemd unit is not active."""
    log(_unit_is_active("xmpp-zhiwei.service"), "知微XMPP Bot离线")
    log(_unit_is_active("xmpp-xiaoguo.service"), "小果XMPP Bot离线")
def check_gateways():
    """Log a failure for each gateway whose TCP port is not listening."""
    expected = (
        (8643, "知微Gateway :8643 未监听"),
        (8645, "小果Gateway :8645 未监听"),
    )
    for port, failure_msg in expected:
        log(check_port(port), failure_msg)
def _check_signal_backlog():
    """Log a failure when 30 or more xiaoguo signals are still unprocessed."""
    unproc = 0
    try:
        conn = sqlite3.connect(str(DB_PATH))
        try:
            unproc = conn.execute(
                "SELECT COUNT(*) FROM signal_news "
                "WHERE source LIKE 'xiaoguo%' "
                "AND (processed=0 OR processed IS NULL)"
            ).fetchone()[0]
        finally:
            # Close the connection even when the query raises.
            conn.close()
    except sqlite3.Error:
        # Best-effort, as before: an unreadable table counts as "no backlog".
        unproc = 0
    # NOTE(review): the count is part of the message, and write_todos() uses
    # the message as its dedup key, so a changing count yields a new TODO.
    log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30)")


def _check_macro_risk():
    """Log the state found in ``macro_risk_state.json``.

    A missing file is logged as a pass. A "high" level is logged as a
    failure and "medium" as a pass; any other level logs nothing. An
    unreadable or malformed file is ignored (best-effort, as before).
    """
    risk_path = DATA / "macro_risk_state.json"
    try:
        if not risk_path.exists():
            log(True, "无宏观风险状态文件(可能未生成)")
            return
        risk = json.loads(risk_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers JSONDecodeError and UnicodeDecodeError.
        return
    if not isinstance(risk, dict):
        return
    # Compare case-insensitively so "HIGH" and "high" both raise the alarm.
    level = str(risk.get("level") or "none").lower()
    # A null or non-string reason used to raise TypeError on slicing. The
    # bare except swallowed it and silently dropped the alert.
    reason = str(risk.get("reason") or "")
    if level == "high":
        log(False, f"🔴 宏观风险HIGH: {reason[:80]}")
    elif level == "medium":
        log(True, f"⚠️ 宏观风险MEDIUM: {reason[:80]}")


def check_signal_pipeline():
    """Check the xiaoguo_scanner → signal_news → consumer flow and macro risk.

    Runs two independent checks: the backlog of unprocessed xiaoguo rows in
    ``signal_news``, then the macro risk state file.
    """
    _check_signal_backlog()
    _check_macro_risk()
def write_todos():
    """Persist every collected issue as a high-priority pending TODO.

    A TODO is inserted only when no row with the same title is currently
    ``pending`` or ``in_progress``. Database errors are tolerated: a failure
    on one issue does not prevent the remaining issues from being written,
    and a failure to open the database skips persistence entirely.
    """
    if not ISSUES:
        return
    try:
        conn = sqlite3.connect(str(DB_PATH))
    except sqlite3.Error:
        return
    try:
        # One connection for the whole batch instead of one per issue.
        for msg in ISSUES:
            title = f"[盘中自检] {msg}"
            try:
                exist = conn.execute(
                    "SELECT id FROM todos WHERE title=? "
                    "AND status IN ('pending','in_progress')",
                    (title,),
                ).fetchone()
                if not exist:
                    conn.execute(
                        "INSERT INTO todos (title, description, priority, source, status, fix_action) "
                        "VALUES (?, ?, 'high', 'intraday_check', 'pending', NULL)",
                        (title, f"盘中自动发现: {msg}"),
                    )
                    conn.commit()
            except sqlite3.Error:
                # Best-effort per issue: move on to the next one.
                continue
    finally:
        # Close the connection even when a statement raises.
        conn.close()
def main():
    """Run all intraday checks, persist issues as TODOs and print a summary.

    Does nothing except print a ``[SILENT]`` line outside trading hours,
    defined here as Monday–Friday with the local hour in [9, 15).
    """
    now = datetime.now()
    # Only run during trading hours.
    if now.weekday() >= 5 or not 9 <= now.hour < 15:
        print("[SILENT] 非交易时段")
        return

    check_bots()
    check_gateways()
    check_xiaoguo()
    # The original re-tested `9 <= now.hour < 16` here, which is always true
    # after the guard above, so the check now runs unconditionally.
    # NOTE(review): if the window was meant to extend to 16:00, the guard
    # above is what needs changing.
    check_price_monitor()
    check_signal_pipeline()
    write_todos()

    if ISSUES:
        print(f"盘中自检 | {now.strftime('%H:%M')} | {len(ISSUES)}项异常:")
        for issue in ISSUES:
            print(f" ⚠️ {issue}")
    else:
        print(f"[SILENT] 盘中自检通过 | {OK_COUNT}项正常")


if __name__ == "__main__":
    main()