refactor: remove duplicate scripts + JSON constants + update docs

This commit is contained in:
知微
2026-07-04 09:30:50 +08:00
parent 7d6b08953c
commit 8b114d6e9e
26 changed files with 2200 additions and 4913 deletions
+258 -261
View File
@@ -1,261 +1,258 @@
#!/usr/bin/env python3
"""intraday_health_check.py — 盘中高频轻量自检 (no_agent)
每15分钟检查最关键的活动组件,只查会直接影响盘中运行的。
发现问题→写TODO(消费管道与每日体检共享)。
"""
import json, os, sqlite3, subprocess, urllib.request, sys, socket
from pathlib import Path
from datetime import datetime, timedelta
# ── MoFin path ─────────────────────────────────────────────────────
sys.path.insert(0, "/home/hmo/MoFin")
from mo_data import read_portfolio, read_decisions, read_watchlist
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
DB_PATH = DATA / "mofin.db"
CRON_JOBS = Path("/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json")
GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
GATEWAY_KEY = "hermes123"
ISSUES = []
OK_COUNT = 0
def log(ok, msg):
global OK_COUNT
if ok:
OK_COUNT += 1
else:
ISSUES.append(msg)
def check_port(port):
try:
r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
return f":{port}" in r.stdout
except:
return False
def check_http(url, timeout=5):
"""检查HTTP可达性,5秒超时防止hang住"""
try:
for k in list(os.environ.keys()):
if 'proxy' in k.lower():
os.environ.pop(k)
req = urllib.request.Request(url, method="GET")
urllib.request.urlopen(req, timeout=timeout)
return True
except:
return False
def db_today_count(table, date_col):
today = datetime.now().strftime("%Y-%m-%d")
try:
conn = sqlite3.connect(str(DB_PATH))
r = conn.execute(f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,)).fetchone()
conn.close()
return r[0]
except:
return -1
def check_xiaoguo():
"""小果管道:进程/scanner有数据/API可达(降级不报错)"""
# 进程 — 不一定有常驻进程(no_agent cron模式)
# 数据 — 今日有扫描记录
scans_today = db_today_count("xiaoguo_scan_tracker", "last_scanned_at")
if scans_today <= 0:
# 可能是小果离线了,不报严重,记录即可
return
# API — 用socket快速检测可达性(3s超时)
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3)
s.connect(("node122", 18003))
s.close()
except:
pass
PORTFOLIO_PATH = str(DATA / "portfolio.json")
def check_price_monitor():
"""价格监控:检查price_monitor cron最近是否运行 + 数据是否更新
注意:price_events 存储的是区间偏离事件(价格穿过买入区/止损/止盈边界),
不是心跳信号。横盘期/无操作信号时自然不会有新事件。因此不检查event数,
改为检查 cron 最后运行时间和 portfolio.json 数据新鲜度。
"""
# 检查cron最近运行记录
cron_ok = False
try:
with open(str(CRON_JOBS)) as f:
data = json.load(f)
jobs_list = data.get("jobs", []) if isinstance(data.get("jobs"), list) else []
if not jobs_list:
jobs_list = list(data.get("jobs", {}).values())
for job in jobs_list:
if not job:
continue
script = job.get("script") or ""
name = job.get("name") or ""
if "price_monitor" in script or "价格监控" in name:
last_run = job.get("last_run_at")
if last_run:
last_dt = datetime.fromisoformat(last_run)
# 兼容带时区和无时区两种格式
ref_now = datetime.now(last_dt.tzinfo) if last_dt.tzinfo else datetime.now()
elapsed = (ref_now - last_dt).total_seconds()
if elapsed < 600: # 10分钟内运行过
cron_ok = True
break
except Exception:
pass
if not cron_ok:
log(False, "价格监控cron无最近运行记录(>10分钟未运行)")
return
# 检查portfolio.json数据新鲜度
# 兼容 '2026-07-02 10:43'price_monitor写入,无秒)和 '2026-07-02 10:43:53'DB写入,有秒)
def _parse_updated_at(ts: str) -> datetime | None:
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
try:
return datetime.strptime(ts, fmt)
except ValueError:
continue
return None
try:
pf = read_portfolio()
pf_updated = pf.get("updated_at", "")
if pf_updated:
pf_dt = _parse_updated_at(pf_updated)
if pf_dt is None:
log(False, f"价格数据updated_at格式无法解析: {pf_updated}")
else:
seconds_ago = (datetime.now() - pf_dt).total_seconds()
if seconds_ago < 600: # 10分钟内
log(True, f"价格监控运行正常,数据{int(seconds_ago//60)}分钟前更新")
else:
log(False, f"价格数据{int(seconds_ago)}秒未更新(portfolio.json")
else:
log(False, "portfolio.json缺少updated_at字段")
except Exception as e:
log(False, f"价格数据新鲜度检查失败: {e}")
def check_bots():
zhiwei = subprocess.run(["systemctl", "is-active", "xmpp-zhiwei.service"],
capture_output=True, text=True, timeout=5).stdout.strip() == "active"
xiaoguo = subprocess.run(["systemctl", "is-active", "xmpp-xiaoguo.service"],
capture_output=True, text=True, timeout=5).stdout.strip() == "active"
log(zhiwei, "知微XMPP Bot离线")
log(xiaoguo, "小果XMPP Bot离线")
def check_gateways():
log(check_port(8643), "知微Gateway :8643 未监听")
log(check_port(8645), "小果Gateway :8645 未监听")
def check_signal_pipeline():
"""信号从xiaoguo_scanner→signal_news→consumer是否通畅"""
unproc = 0
try:
conn = sqlite3.connect(str(DB_PATH))
r = conn.execute("SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)").fetchone()
unproc = r[0]
conn.close()
except:
pass
log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30")
# 宏观风险状态检查
try:
risk_path = DATA / "macro_risk_state.json"
if risk_path.exists():
risk = json.loads(risk_path.read_text())
level = risk.get("level", "none")
expired = risk.get("expired", False)
# 提取摘要做原因描述(state.json用signals数组,不是reason字段)
signals = risk.get("signals", [])
reason = ""
if signals and isinstance(signals, list) and len(signals) > 0:
first_sig = signals[0]
summary = first_sig.get("summary", "")
if summary:
reason = summary[:80].replace("\n", " ")
if level == "high" and not expired:
reason_clean = reason.replace("【高风险】", "").strip()[:60]
log(False, f"🔴 宏观风险HIGH: {reason_clean}")
elif level == "high" and expired:
log(True, f"⏳ 宏观风险HIGH已过期(无新信号超过15分钟)")
elif level == "medium":
log(True, f"⚠️ 宏观风险MEDIUM: {reason}")
else:
log(True, "无宏观风险状态文件(可能未生成)")
except:
pass
def write_todos():
if not ISSUES:
return
for msg in ISSUES:
title = f"[盘中自检] {msg}"
try:
conn = sqlite3.connect(str(DB_PATH), timeout=10)
conn.execute("PRAGMA busy_timeout=5000")
# 宏观风险HIGH去重:只要有pending/in_progress的宏观风险TODO,不再新增
if "宏观风险HIGH" in msg:
exist = conn.execute(
"SELECT id FROM todos WHERE title LIKE '%宏观风险HIGH%' AND status IN ('pending','in_progress') LIMIT 1"
).fetchone()
else:
exist = conn.execute(
"SELECT id FROM todos WHERE title=? AND status IN ('pending','in_progress')", (title,)
).fetchone()
if not exist:
conn.execute(
"INSERT INTO todos (title, description, priority, source, status, fix_action) "
"VALUES (?, ?, 'high', 'intraday_check', 'pending', NULL)",
(title, f"盘中自动发现: {msg}"))
conn.commit()
conn.close()
except:
pass
def main():
now = datetime.now()
# 只在交易时段运行
if now.weekday() >= 5 or now.hour < 9 or now.hour >= 15:
print("[SILENT] 非交易时段")
return
check_bots()
check_gateways()
check_xiaoguo()
if 9 <= now.hour < 16:
check_price_monitor()
check_signal_pipeline()
write_todos()
if ISSUES:
print(f"盘中自检 | {now.strftime('%H:%M')} | {len(ISSUES)}项异常:")
for i in ISSUES:
print(f" ⚠️ {i}")
else:
print(f"[SILENT] 盘中自检通过 | {OK_COUNT}项正常")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
"""intraday_health_check.py — 盘中高频轻量自检 (no_agent)
每15分钟检查最关键的活动组件,只查会直接影响盘中运行的。
发现问题→写TODO(消费管道与每日体检共享)。
"""
import json, os, sqlite3, subprocess, urllib.request, sys, socket
from pathlib import Path
from datetime import datetime, timedelta
# ── MoFin path ─────────────────────────────────────────────────────
sys.path.insert(0, "/home/hmo/MoFin")
from mo_data import read_portfolio, read_decisions, read_watchlist
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
DB_PATH = DATA / "mofin.db"
CRON_JOBS = Path("/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json")
GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
GATEWAY_KEY = "hermes123"
ISSUES = []
OK_COUNT = 0
def log(ok, msg):
global OK_COUNT
if ok:
OK_COUNT += 1
else:
ISSUES.append(msg)
def check_port(port):
try:
r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
return f":{port}" in r.stdout
except:
return False
def check_http(url, timeout=5):
"""检查HTTP可达性,5秒超时防止hang住"""
try:
for k in list(os.environ.keys()):
if 'proxy' in k.lower():
os.environ.pop(k)
req = urllib.request.Request(url, method="GET")
urllib.request.urlopen(req, timeout=timeout)
return True
except:
return False
def db_today_count(table, date_col):
today = datetime.now().strftime("%Y-%m-%d")
try:
conn = sqlite3.connect(str(DB_PATH))
r = conn.execute(f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,)).fetchone()
conn.close()
return r[0]
except:
return -1
def check_xiaoguo():
"""小果管道:进程/scanner有数据/API可达(降级不报错)"""
# 进程 — 不一定有常驻进程(no_agent cron模式)
# 数据 — 今日有扫描记录
scans_today = db_today_count("xiaoguo_scan_tracker", "last_scanned_at")
if scans_today <= 0:
# 可能是小果离线了,不报严重,记录即可
return
# API — 用socket快速检测可达性(3s超时)
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3)
s.connect(("node122", 18003))
s.close()
except:
pass
def check_price_monitor():
"""价格监控:检查price_monitor cron最近是否运行 + 数据是否更新
注意:price_events 存储的是区间偏离事件(价格穿过买入区/止损/止盈边界),
不是心跳信号。横盘期/无操作信号时自然不会有新事件。因此不检查event数,
改为检查 cron 最后运行时间和 portfolio.json 数据新鲜度。
"""
# 检查cron最近运行记录
cron_ok = False
try:
with open(str(CRON_JOBS)) as f:
data = json.load(f)
jobs_list = data.get("jobs", []) if isinstance(data.get("jobs"), list) else []
if not jobs_list:
jobs_list = list(data.get("jobs", {}).values())
for job in jobs_list:
if not job:
continue
script = job.get("script") or ""
name = job.get("name") or ""
if "price_monitor" in script or "价格监控" in name:
last_run = job.get("last_run_at")
if last_run:
last_dt = datetime.fromisoformat(last_run)
# 兼容带时区和无时区两种格式
ref_now = datetime.now(last_dt.tzinfo) if last_dt.tzinfo else datetime.now()
elapsed = (ref_now - last_dt).total_seconds()
if elapsed < 600: # 10分钟内运行过
cron_ok = True
break
except Exception:
pass
if not cron_ok:
log(False, "价格监控cron无最近运行记录(>10分钟未运行)")
return
# 检查portfolio.json数据新鲜度
# 兼容 '2026-07-02 10:43'price_monitor写入,无秒)和 '2026-07-02 10:43:53'DB写入,有秒)
def _parse_updated_at(ts: str) -> datetime | None:
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
try:
return datetime.strptime(ts, fmt)
except ValueError:
continue
return None
try:
pf = read_portfolio()
pf_updated = pf.get("updated_at", "")
if pf_updated:
pf_dt = _parse_updated_at(pf_updated)
if pf_dt is None:
log(False, f"价格数据updated_at格式无法解析: {pf_updated}")
else:
seconds_ago = (datetime.now() - pf_dt).total_seconds()
if seconds_ago < 600: # 10分钟内
log(True, f"价格监控运行正常,数据{int(seconds_ago//60)}分钟前更新")
else:
log(False, f"价格数据{int(seconds_ago)}秒未更新(portfolio.json")
else:
log(False, "portfolio.json缺少updated_at字段")
except Exception as e:
log(False, f"价格数据新鲜度检查失败: {e}")
def check_bots():
zhiwei = subprocess.run(["systemctl", "is-active", "xmpp-zhiwei.service"],
capture_output=True, text=True, timeout=5).stdout.strip() == "active"
xiaoguo = subprocess.run(["systemctl", "is-active", "xmpp-xiaoguo.service"],
capture_output=True, text=True, timeout=5).stdout.strip() == "active"
log(zhiwei, "知微XMPP Bot离线")
log(xiaoguo, "小果XMPP Bot离线")
def check_gateways():
log(check_port(8643), "知微Gateway :8643 未监听")
log(check_port(8645), "小果Gateway :8645 未监听")
def check_signal_pipeline():
"""信号从xiaoguo_scanner→signal_news→consumer是否通畅"""
unproc = 0
try:
conn = sqlite3.connect(str(DB_PATH))
r = conn.execute("SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)").fetchone()
unproc = r[0]
conn.close()
except:
pass
log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30")
# 宏观风险状态检查
try:
risk_path = DATA / "macro_risk_state.json"
if risk_path.exists():
risk = json.loads(risk_path.read_text())
level = risk.get("level", "none")
expired = risk.get("expired", False)
# 提取摘要做原因描述(state.json用signals数组,不是reason字段)
signals = risk.get("signals", [])
reason = ""
if signals and isinstance(signals, list) and len(signals) > 0:
first_sig = signals[0]
summary = first_sig.get("summary", "")
if summary:
reason = summary[:80].replace("\n", " ")
if level == "high" and not expired:
reason_clean = reason.replace("【高风险】", "").strip()[:60]
log(False, f"🔴 宏观风险HIGH: {reason_clean}")
elif level == "high" and expired:
log(True, f"⏳ 宏观风险HIGH已过期(无新信号超过15分钟)")
elif level == "medium":
log(True, f"⚠️ 宏观风险MEDIUM: {reason}")
else:
log(True, "无宏观风险状态文件(可能未生成)")
except:
pass
def write_todos():
if not ISSUES:
return
for msg in ISSUES:
title = f"[盘中自检] {msg}"
try:
conn = sqlite3.connect(str(DB_PATH), timeout=10)
conn.execute("PRAGMA busy_timeout=5000")
# 宏观风险HIGH去重:只要有pending/in_progress的宏观风险TODO,不再新增
if "宏观风险HIGH" in msg:
exist = conn.execute(
"SELECT id FROM todos WHERE title LIKE '%宏观风险HIGH%' AND status IN ('pending','in_progress') LIMIT 1"
).fetchone()
else:
exist = conn.execute(
"SELECT id FROM todos WHERE title=? AND status IN ('pending','in_progress')", (title,)
).fetchone()
if not exist:
conn.execute(
"INSERT INTO todos (title, description, priority, source, status, fix_action) "
"VALUES (?, ?, 'high', 'intraday_check', 'pending', NULL)",
(title, f"盘中自动发现: {msg}"))
conn.commit()
conn.close()
except:
pass
def main():
now = datetime.now()
# 只在交易时段运行
if now.weekday() >= 5 or now.hour < 9 or now.hour >= 15:
print("[SILENT] 非交易时段")
return
check_bots()
check_gateways()
check_xiaoguo()
if 9 <= now.hour < 16:
check_price_monitor()
check_signal_pipeline()
write_todos()
if ISSUES:
print(f"盘中自检 | {now.strftime('%H:%M')} | {len(ISSUES)}项异常:")
for i in ISSUES:
print(f" ⚠️ {i}")
else:
print(f"[SILENT] 盘中自检通过 | {OK_COUNT}项正常")
if __name__ == "__main__":
main()