Files
MoFin/scripts/intraday_health_check.py
知微 7c0e85af28 硬性策略质量门禁 validate_strategy()
新增 STRATEGY_QUALITY_GATES 检查清单(9条红线):
CRITICAL: 止损/止盈存在+>0, 买入区下沿<上沿
HIGH: 止损≤买入区, 买入推荐含RR≥1.5, 港股标currency=HKD
MEDIUM: signal短词, tech_snapshot含技术位

enforce_strategy_quality() 插在写入链的两处:
1. reassess_with_context() return前 → 单只重评必过
2. regenerate_all() for d in decisions: 写DB前 → 批量重评必过

不过的:status=review_needed, signal降级→信号不充分
不会写进DB/JSON,除非修复了CRITICAL问题
2026-07-02 13:46:53 +08:00

262 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""intraday_health_check.py — 盘中高频轻量自检 (no_agent)
每15分钟检查最关键的活动组件,只查会直接影响盘中运行的。
发现问题→写TODO(消费管道与每日体检共享)。
"""
import json, os, sqlite3, subprocess, urllib.request, sys, socket
from pathlib import Path
from datetime import datetime, timedelta
# ── MoFin path ─────────────────────────────────────────────────────
sys.path.insert(0, "/home/hmo/MoFin")
from mo_data import read_portfolio, read_decisions, read_watchlist
# Root of the MoFin checkout and the data directory beneath it.
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
# SQLite database that the checks below query and that write_todos() writes to.
DB_PATH = DATA / "mofin.db"
# Cron job registry; check_price_monitor() reads each job's last_run_at from it.
CRON_JOBS = Path("/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json")
# NOTE(review): GATEWAY_URL and GATEWAY_KEY are not referenced in the visible
# part of this file, and the key is a hard-coded credential - consider loading
# it from the environment or removing both if they are dead.
GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
GATEWAY_KEY = "hermes123"
# Failure messages appended by log(); write_todos() turns them into TODO rows.
ISSUES = []
# Number of checks that passed; incremented by log().
OK_COUNT = 0
def log(ok, msg):
    """Record the outcome of a single health check.

    A failing check appends ``msg`` to the module-level ``ISSUES`` list.
    A passing check only increments the module-level ``OK_COUNT``; its
    ``msg`` is discarded.
    """
    global OK_COUNT
    if not ok:
        ISSUES.append(msg)
        return
    OK_COUNT += 1
def check_port(port):
    """Return True when ``ss -tlnp`` lists a socket bound to ``port``.

    The output is split into whitespace-separated fields and a field must
    END with ``:<port>``. A plain substring test would report port 80 as
    listening whenever ``:8080`` appears in the output.

    Returns False when ``ss`` is missing, fails to start, or times out.
    """
    suffix = f":{port}"
    try:
        result = subprocess.run(
            ["ss", "-tlnp"], capture_output=True, text=True, timeout=5
        )
    except (OSError, subprocess.SubprocessError):
        # No usable ``ss`` (or it hung): report "not listening" rather than
        # aborting the whole health check.
        return False
    return any(field.endswith(suffix) for field in result.stdout.split())
def check_http(url, timeout=5):
    """Return True when a GET request to ``url`` succeeds within ``timeout``.

    Every environment variable whose name contains "proxy" (any case) is
    removed from ``os.environ`` before the request - presumably so the probe
    is not routed through a proxy. The removal is process-wide and is not
    undone afterwards.

    Any failure (malformed URL, connection error, timeout, HTTP error status
    raised by ``urlopen``) yields False. The timeout keeps the health check
    from hanging on an unresponsive endpoint.
    """
    for key in [name for name in os.environ if "proxy" in name.lower()]:
        del os.environ[key]
    try:
        request = urllib.request.Request(url, method="GET")
        # Close the response deterministically instead of leaking the socket.
        with urllib.request.urlopen(request, timeout=timeout):
            return True
    except Exception:
        # This is a yes/no probe: every error means "not reachable".
        # ``Exception`` (not a bare except) lets Ctrl-C / SystemExit through.
        return False
def db_today_count(table, date_col):
    """Count rows of ``table`` whose ``date_col`` falls on today's local date.

    ``table`` and ``date_col`` are interpolated into the SQL text (SQL
    identifiers cannot be bound as parameters), so callers must pass trusted,
    hard-coded names only - never external input.

    Returns the row count, or -1 when the database cannot be queried.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    query = f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?"
    try:
        conn = sqlite3.connect(str(DB_PATH))
        try:
            return conn.execute(query, (today,)).fetchone()[0]
        finally:
            # Close even when the query raises (e.g. missing table).
            conn.close()
    except sqlite3.Error:
        return -1
def check_xiaoguo():
    """Probe the xiaoguo pipeline; degraded states are tolerated, not reported.

    Steps:
      1. Count today's rows in ``xiaoguo_scan_tracker``. Zero rows (or a
         failed query, which yields -1) may simply mean xiaoguo is offline,
         so the function returns without logging anything.
      2. Otherwise attempt a TCP connection to ``node122:18003`` with a
         3-second timeout. A failed connection is ignored as well.

    No resident process is checked: the original comment notes that xiaoguo
    may run in cron mode without a long-lived process.
    """
    scans_today = db_today_count("xiaoguo_scan_tracker", "last_scanned_at")
    if scans_today <= 0:
        return
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(3)
            probe.connect(("node122", 18003))
    except OSError:
        # Covers DNS failure, refusal and timeout; deliberately not an issue.
        pass
# String path of the portfolio snapshot under DATA.
# NOTE(review): not referenced in the visible code - check_price_monitor()
# goes through read_portfolio() instead; possibly dead.
PORTFOLIO_PATH = str(DATA / "portfolio.json")
def _parse_updated_at(ts: str) -> datetime | None:
    """Parse a portfolio ``updated_at`` stamp, with or without seconds.

    Accepts both '2026-07-02 10:43:53' and '2026-07-02 10:43'; returns None
    when neither format matches.
    """
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
        try:
            return datetime.strptime(ts, fmt)
        except ValueError:
            continue
    return None


def _seconds_since_iso(stamp):
    """Return seconds elapsed since ISO stamp ``stamp``, or None if unparsable."""
    try:
        then = datetime.fromisoformat(stamp)
    except (TypeError, ValueError):
        return None
    # Aware stamps are compared with an aware "now" in the same zone, naive
    # stamps with a naive local "now"; mixing the two would raise TypeError.
    now = datetime.now(then.tzinfo) if then.tzinfo else datetime.now()
    return (now - then).total_seconds()


def _price_monitor_last_runs():
    """Return the ``last_run_at`` values of cron jobs that are the price monitor.

    A job matches when its ``script`` contains "price_monitor" or its ``name``
    contains "价格监控". The ``jobs`` entry of the registry may be a list or a
    dict of jobs. An unreadable or malformed registry yields an empty list.
    """
    try:
        with open(str(CRON_JOBS), encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        return []
    jobs = data.get("jobs") if isinstance(data, dict) else None
    if isinstance(jobs, dict):
        jobs = list(jobs.values())
    elif not isinstance(jobs, list):
        jobs = []
    stamps = []
    for job in jobs:
        if not isinstance(job, dict):
            continue
        script = str(job.get("script") or "")
        name = str(job.get("name") or "")
        if "price_monitor" in script or "价格监控" in name:
            last_run = job.get("last_run_at")
            if last_run:
                stamps.append(last_run)
    return stamps


def check_price_monitor():
    """Check that the price monitor ran recently and refreshed its data.

    ``price_events`` is intentionally NOT inspected: per the original note it
    records zone-crossing events (price crossing the buy zone / stop-loss /
    take-profit boundaries), not heartbeats, so quiet markets produce no new
    rows. Instead this checks:

      1. the cron registry: some price-monitor job must have a
         ``last_run_at`` within the last 10 minutes, otherwise an issue is
         logged and the data check is skipped;
      2. the ``updated_at`` field returned by ``read_portfolio()``: it must be
         parseable and less than 10 minutes old.

    Every matching job is considered, and one malformed ``last_run_at`` no
    longer hides a valid entry on another job.
    """
    max_age = 600  # seconds; matches the "10分钟" wording of the messages

    cron_ok = False
    for stamp in _price_monitor_last_runs():
        elapsed = _seconds_since_iso(stamp)
        if elapsed is not None and elapsed < max_age:
            cron_ok = True
            break
    if not cron_ok:
        log(False, "价格监控cron无最近运行记录(>10分钟未运行)")
        return

    # Freshness of the portfolio data itself.
    try:
        pf = read_portfolio()
        pf_updated = pf.get("updated_at", "")
        if not pf_updated:
            log(False, "portfolio.json缺少updated_at字段")
            return
        pf_dt = _parse_updated_at(pf_updated)
        if pf_dt is None:
            log(False, f"价格数据updated_at格式无法解析: {pf_updated}")
            return
        seconds_ago = (datetime.now() - pf_dt).total_seconds()
        if seconds_ago < max_age:
            log(True, f"价格监控运行正常,数据{int(seconds_ago//60)}分钟前更新")
        else:
            log(False, f"价格数据{int(seconds_ago)}秒未更新(portfolio.json")
    except Exception as e:
        # read_portfolio() is project code that may raise anything; surface
        # the failure as an issue instead of crashing the health check.
        log(False, f"价格数据新鲜度检查失败: {e}")
def _unit_is_active(unit):
    """Return True when ``systemctl is-active <unit>`` prints "active".

    Returns False when systemctl is missing, cannot be started, or does not
    answer within 5 seconds, so one broken probe cannot abort the whole run.
    """
    try:
        proc = subprocess.run(
            ["systemctl", "is-active", unit],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return False
    return proc.stdout.strip() == "active"


def check_bots():
    """Log an issue for each XMPP bot service that systemd does not report active."""
    services = (
        ("xmpp-zhiwei.service", "知微XMPP Bot离线"),
        ("xmpp-xiaoguo.service", "小果XMPP Bot离线"),
    )
    for unit, offline_msg in services:
        log(_unit_is_active(unit), offline_msg)
def check_gateways():
    """Log an issue for each gateway port (8643, 8645) that is not listening."""
    expected = (
        (8643, "知微Gateway :8643 未监听"),
        (8645, "小果Gateway :8645 未监听"),
    )
    for port, down_msg in expected:
        log(check_port(port), down_msg)
def check_signal_pipeline():
    """Check the xiaoguo signal backlog and the macro-risk state file.

    1. Backlog: counts ``signal_news`` rows whose ``source`` starts with
       "xiaoguo" and that are unprocessed (``processed`` is 0 or NULL). The
       check passes when fewer than 30 are pending.
    2. Macro risk: reads ``macro_risk_state.json`` from DATA. A non-expired
       level "high" is logged as an issue; expired-high, medium, and a
       missing file are logged as passing. The reason text is taken from the
       ``summary`` of the first entry of the ``signals`` array (the state
       file has no ``reason`` field).

    An unreadable or malformed state file is ignored. A malformed ``signals``
    entry only blanks the reason text; it does not suppress a HIGH alert.
    """
    unproc = 0
    try:
        conn = sqlite3.connect(str(DB_PATH))
        try:
            row = conn.execute("SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)").fetchone()
            unproc = row[0]
        finally:
            conn.close()
    except sqlite3.Error:
        # NOTE(review): a failed query leaves unproc at 0, so the backlog
        # check below is counted as passing - confirm this is intended.
        pass
    log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30")

    # Macro-risk state.
    risk_path = DATA / "macro_risk_state.json"
    try:
        if not risk_path.exists():
            log(True, "无宏观风险状态文件(可能未生成)")
            return
        risk = json.loads(risk_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Unreadable or invalid JSON: best effort, nothing to report.
        return
    if not isinstance(risk, dict):
        return

    level = risk.get("level", "none")
    expired = risk.get("expired", False)

    reason = ""
    signals = risk.get("signals", [])
    if isinstance(signals, list) and signals:
        first_sig = signals[0]
        summary = first_sig.get("summary", "") if isinstance(first_sig, dict) else ""
        if summary and isinstance(summary, str):
            reason = summary[:80].replace("\n", " ")

    if level == "high" and not expired:
        reason_clean = reason.replace("【高风险】", "").strip()[:60]
        log(False, f"🔴 宏观风险HIGH: {reason_clean}")
    elif level == "high" and expired:
        log(True, "⏳ 宏观风险HIGH已过期(无新信号超过15分钟)")
    elif level == "medium":
        log(True, f"⚠️ 宏观风险MEDIUM: {reason}")
def write_todos():
    """Insert one high-priority TODO row per collected issue, skipping duplicates.

    For each message in ``ISSUES`` the title is "[盘中自检] <msg>". A row is
    inserted into ``todos`` only when no pending / in-progress row already
    exists:

      * macro-risk HIGH issues are de-duplicated by substring, so at most one
        open macro-risk TODO exists regardless of the reason text;
      * all other issues are de-duplicated by exact title.

    Each message uses its own connection, so one failure does not block the
    others. Failures are reported on stderr instead of being dropped
    silently; they never abort the health check.

    NOTE(review): messages that embed a changing number (for example the
    seconds count or backlog size logged by other checks in this file) never
    match an existing title, so they create a new TODO on every run.
    """
    if not ISSUES:
        return
    for msg in ISSUES:
        title = f"[盘中自检] {msg}"
        if "宏观风险HIGH" in msg:
            lookup_sql = "SELECT id FROM todos WHERE title LIKE '%宏观风险HIGH%' AND status IN ('pending','in_progress') LIMIT 1"
            lookup_args = ()
        else:
            lookup_sql = "SELECT id FROM todos WHERE title=? AND status IN ('pending','in_progress')"
            lookup_args = (title,)
        try:
            conn = sqlite3.connect(str(DB_PATH), timeout=10)
            try:
                conn.execute("PRAGMA busy_timeout=5000")
                exist = conn.execute(lookup_sql, lookup_args).fetchone()
                if not exist:
                    conn.execute(
                        "INSERT INTO todos (title, description, priority, source, status, fix_action) "
                        "VALUES (?, ?, 'high', 'intraday_check', 'pending', NULL)",
                        (title, f"盘中自动发现: {msg}"))
                    conn.commit()
            finally:
                # Close even when a statement raises (locked DB, missing table).
                conn.close()
        except sqlite3.Error as exc:
            print(f"[intraday_check] failed to write TODO {title!r}: {exc}",
                  file=sys.stderr)
def main():
    """Run every intraday check, persist issues as TODOs, and print a summary.

    Outside the run window (weekends, or local hour before 9 or from 15 on)
    it prints a "[SILENT]" line and does nothing. Otherwise it runs the bot,
    gateway, xiaoguo, price-monitor and signal-pipeline checks, writes TODOs
    for the collected issues, and prints either the issue list or a
    "[SILENT]" all-clear line with the number of passing checks.
    """
    now = datetime.now()
    # Only run during trading hours.
    if now.weekday() >= 5 or not 9 <= now.hour < 15:
        print("[SILENT] 非交易时段")
        return

    check_bots()
    check_gateways()
    check_xiaoguo()
    # NOTE(review): this call used to be guarded by `9 <= now.hour < 16`,
    # which is always true after the early return above. If checks were meant
    # to continue until 16:00, the window above needs widening instead.
    check_price_monitor()
    check_signal_pipeline()
    write_todos()

    if not ISSUES:
        print(f"[SILENT] 盘中自检通过 | {OK_COUNT}项正常")
        return
    print(f"盘中自检 | {now.strftime('%H:%M')} | {len(ISSUES)}项异常:")
    for issue in ISSUES:
        print(f" ⚠️ {issue}")


if __name__ == "__main__":
    main()