6a97d93018
截图确认: - 可用资金 92,664.20(含天添利) - 冻结 39,481.40 - 总现金 132,145.60 - 总资产 = 持仓市值1,107,670 + 现金132,145.60 = 1,239,815.60 法拉电子 189.20卖出100股已记录
229 lines
7.8 KiB
Python
229 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
||
"""intraday_health_check.py — 盘中高频轻量自检 (no_agent)
|
||
|
||
每15分钟检查最关键的活动组件,只查会直接影响盘中运行的。
|
||
发现问题→写TODO(消费管道与每日体检共享)。
|
||
"""
|
||
|
||
import json, os, sqlite3, subprocess, urllib.request
|
||
from pathlib import Path
|
||
from datetime import datetime, timedelta
|
||
|
||
BASE = Path("/home/hmo/MoFin")
|
||
DATA = BASE / "data"
|
||
DB_PATH = DATA / "mofin.db"
|
||
CRON_JOBS = Path("/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json")
|
||
GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
|
||
GATEWAY_KEY = "hermes123"
|
||
|
||
ISSUES = []
|
||
OK_COUNT = 0
|
||
|
||
|
||
def log(ok, msg):
|
||
global OK_COUNT
|
||
if ok:
|
||
OK_COUNT += 1
|
||
else:
|
||
ISSUES.append(msg)
|
||
|
||
|
||
def check_port(port):
|
||
try:
|
||
r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
|
||
return f":{port}" in r.stdout
|
||
except:
|
||
return False
|
||
|
||
|
||
def check_http(url, timeout=8):
|
||
try:
|
||
for k in list(os.environ.keys()):
|
||
if 'proxy' in k.lower():
|
||
os.environ.pop(k)
|
||
req = urllib.request.Request(url, method="GET")
|
||
urllib.request.urlopen(req, timeout=timeout)
|
||
return True
|
||
except:
|
||
return False
|
||
|
||
|
||
def db_today_count(table, date_col):
|
||
today = datetime.now().strftime("%Y-%m-%d")
|
||
try:
|
||
conn = sqlite3.connect(str(DB_PATH))
|
||
r = conn.execute(f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,)).fetchone()
|
||
conn.close()
|
||
return r[0]
|
||
except:
|
||
return -1
|
||
|
||
|
||
def check_xiaoguo():
|
||
"""小果管道:进程/scanner有数据/API可达(降级不报错)"""
|
||
# 进程 — 不一定有常驻进程(no_agent cron模式)
|
||
# 数据 — 今日有扫描记录
|
||
scans_today = db_today_count("xiaoguo_scan_tracker", "last_scanned_at")
|
||
if scans_today <= 0:
|
||
# 可能是小果离线了,不报严重,记录即可
|
||
return
|
||
# API — 不通时scanner已降级为unknown,不影响
|
||
check_http("http://192.168.1.122:18003/v1/models")
|
||
|
||
|
||
PORTFOLIO_PATH = str(DATA / "portfolio.json")
|
||
|
||
|
||
def check_price_monitor():
|
||
"""价格监控:检查price_monitor cron最近是否运行 + 数据是否更新
|
||
|
||
注意:price_events 存储的是区间偏离事件(价格穿过买入区/止损/止盈边界),
|
||
不是心跳信号。横盘期/无操作信号时自然不会有新事件。因此不检查event数,
|
||
改为检查 cron 最后运行时间和 portfolio.json 数据新鲜度。
|
||
"""
|
||
# 检查cron最近运行记录
|
||
cron_ok = False
|
||
try:
|
||
with open(str(CRON_JOBS)) as f:
|
||
data = json.load(f)
|
||
jobs_list = data.get("jobs", []) if isinstance(data.get("jobs"), list) else []
|
||
if not jobs_list:
|
||
jobs_list = list(data.get("jobs", {}).values())
|
||
for job in jobs_list:
|
||
if not job:
|
||
continue
|
||
script = job.get("script") or ""
|
||
name = job.get("name") or ""
|
||
if "price_monitor" in script or "价格监控" in name:
|
||
last_run = job.get("last_run_at")
|
||
if last_run:
|
||
last_dt = datetime.fromisoformat(last_run)
|
||
# 兼容带时区和无时区两种格式
|
||
ref_now = datetime.now(last_dt.tzinfo) if last_dt.tzinfo else datetime.now()
|
||
elapsed = (ref_now - last_dt).total_seconds()
|
||
if elapsed < 600: # 10分钟内运行过
|
||
cron_ok = True
|
||
break
|
||
except Exception:
|
||
pass
|
||
|
||
if not cron_ok:
|
||
log(False, "价格监控cron无最近运行记录(>10分钟未运行)")
|
||
return
|
||
|
||
# 检查portfolio.json数据新鲜度
|
||
try:
|
||
pf = json.load(open(PORTFOLIO_PATH))
|
||
pf_updated = pf.get("updated_at", "")
|
||
if pf_updated:
|
||
pf_dt = datetime.strptime(pf_updated, "%Y-%m-%d %H:%M")
|
||
seconds_ago = (datetime.now() - pf_dt).total_seconds()
|
||
if seconds_ago < 600: # 10分钟内
|
||
log(True, f"价格监控运行正常,数据{int(seconds_ago//60)}分钟前更新")
|
||
else:
|
||
log(False, f"价格数据{int(seconds_ago)}秒未更新(portfolio.json)")
|
||
else:
|
||
log(False, "portfolio.json缺少updated_at字段")
|
||
except Exception as e:
|
||
log(False, f"价格数据新鲜度检查失败: {e}")
|
||
|
||
|
||
def check_bots():
|
||
zhiwei = subprocess.run(["systemctl", "is-active", "xmpp-zhiwei.service"],
|
||
capture_output=True, text=True, timeout=5).stdout.strip() == "active"
|
||
xiaoguo = subprocess.run(["systemctl", "is-active", "xmpp-xiaoguo.service"],
|
||
capture_output=True, text=True, timeout=5).stdout.strip() == "active"
|
||
log(zhiwei, "知微XMPP Bot离线")
|
||
log(xiaoguo, "小果XMPP Bot离线")
|
||
|
||
|
||
def check_gateways():
|
||
log(check_port(8643), "知微Gateway :8643 未监听")
|
||
log(check_port(8645), "小果Gateway :8645 未监听")
|
||
|
||
|
||
def check_signal_pipeline():
|
||
"""信号从xiaoguo_scanner→signal_news→consumer是否通畅"""
|
||
unproc = 0
|
||
try:
|
||
conn = sqlite3.connect(str(DB_PATH))
|
||
r = conn.execute("SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)").fetchone()
|
||
unproc = r[0]
|
||
conn.close()
|
||
except:
|
||
pass
|
||
log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30)")
|
||
|
||
# 宏观风险状态检查
|
||
try:
|
||
risk_path = DATA / "macro_risk_state.json"
|
||
if risk_path.exists():
|
||
risk = json.loads(risk_path.read_text())
|
||
level = risk.get("level", "none")
|
||
expired = risk.get("expired", False)
|
||
# 提取摘要做原因描述(state.json用signals数组,不是reason字段)
|
||
signals = risk.get("signals", [])
|
||
reason = ""
|
||
if signals and isinstance(signals, list) and len(signals) > 0:
|
||
first_sig = signals[0]
|
||
summary = first_sig.get("summary", "")
|
||
if summary:
|
||
reason = summary[:80].replace("\n", " ")
|
||
if level == "high" and not expired:
|
||
log(False, f"🔴 宏观风险HIGH: {reason}")
|
||
elif level == "high" and expired:
|
||
log(True, f"⏳ 宏观风险HIGH已过期(无新信号超过15分钟)")
|
||
elif level == "medium":
|
||
log(True, f"⚠️ 宏观风险MEDIUM: {reason}")
|
||
else:
|
||
log(True, "无宏观风险状态文件(可能未生成)")
|
||
except:
|
||
pass
|
||
|
||
|
||
def write_todos():
|
||
if not ISSUES:
|
||
return
|
||
for msg in ISSUES:
|
||
title = f"[盘中自检] {msg}"
|
||
try:
|
||
conn = sqlite3.connect(str(DB_PATH))
|
||
exist = conn.execute("SELECT id FROM todos WHERE title=? AND status IN ('pending','in_progress')", (title,)).fetchone()
|
||
if not exist:
|
||
conn.execute(
|
||
"INSERT INTO todos (title, description, priority, source, status, fix_action) "
|
||
"VALUES (?, ?, 'high', 'intraday_check', 'pending', NULL)",
|
||
(title, f"盘中自动发现: {msg}"))
|
||
conn.commit()
|
||
conn.close()
|
||
except:
|
||
pass
|
||
|
||
|
||
def main():
|
||
now = datetime.now()
|
||
# 只在交易时段运行
|
||
if now.weekday() >= 5 or now.hour < 9 or now.hour >= 15:
|
||
print("[SILENT] 非交易时段")
|
||
return
|
||
|
||
check_bots()
|
||
check_gateways()
|
||
check_xiaoguo()
|
||
if 9 <= now.hour < 16:
|
||
check_price_monitor()
|
||
check_signal_pipeline()
|
||
|
||
write_todos()
|
||
|
||
if ISSUES:
|
||
print(f"盘中自检 | {now.strftime('%H:%M')} | {len(ISSUES)}项异常:")
|
||
for i in ISSUES:
|
||
print(f" ⚠️ {i}")
|
||
else:
|
||
print(f"[SILENT] 盘中自检通过 | {OK_COUNT}项正常")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|