Files
MoFin/scripts/branch_scanner.py
T
知微 e1c426fb96 branch_scanner: 修price_lower遗漏+只输最优分支
1. check_condition 新增 price_lower 检查(买入区下界)
   之前 buy_dip 分支的 price_lower: >=entry_low 从未被验证
   导致跌出买入区的股票也错误触发买入信号

2. 输出改为每只股票只列最优优先级的适用分支
   之前列出所有适用分支(多分支冲突:同一只股同时止损+买入)
   优先级:止损(P0) > 回调买入(P1) > 追涨(P2) > 减仓(P3) > 止盈(P4) > 持有(P99)
   每只股票只列一项,无矛盾
2026-06-24 10:34:46 +08:00

167 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
branch_scanner.py — intraday branch scanner.

Runs one pass every 15 minutes:
1. Read every stock that has a strategy_tree.
2. Fetch its real-time price.
3. Evaluate whether each branch applies under the current scenario.
4. For an applicable branch, record trigger_count and push a signal.

Core self-improving component: it lets branch conditions be validated in
practice.
"""
import json, sys, os, re
from datetime import datetime, date
from urllib.request import Request, urlopen
# JSON document with per-stock decisions; read by load_decisions() and
# rewritten by save_decisions().
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
# Further dashboard data files. NOTE(review): none of the functions visible in
# this file reference these three paths — confirm whether they are still needed.
WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json"
MACRO_PATH = "/home/hmo/web-dashboard/data/macro_context.json"
EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json"
# Local HTTP endpoint that push_alert() POSTs alert messages to.
XMPP_URL = "http://127.0.0.1:5805/"
def get_price(code):
    """Fetch the latest price for a stock code from the Tencent quote API.

    Args:
        code: Stock code string without a market prefix. Codes starting with
            "6" or "5" are queried with the "sh" prefix, all others with "sz".

    Returns:
        The fourth "~"-separated field of the response parsed as a float, or
        None when the request fails, the field is missing, or it is not
        numeric.
    """
    mkt = "sh" if code.startswith(("6", "5")) else "sz"
    url = f"http://qt.gtimg.cn/q={mkt}{code}"
    req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
    try:
        # The context manager closes the HTTP response (and its socket) even
        # when reading or decoding fails.
        with urlopen(req, timeout=5) as resp:
            body = resp.read().decode("gbk")
        parts = body.split("~")
        if len(parts) > 3:
            return float(parts[3])
    except Exception:
        # Deliberate best-effort: any network or parse failure means
        # "no price available" and the caller skips the stock.
        pass
    return None
def get_scenario():
    """Return the current market scenario reported by strategy_tree.

    The MoFin directory is pushed to the front of sys.path so that the
    strategy_tree module can be imported. If the import or the detection call
    raises, a placeholder "unknown" scenario is returned instead.
    """
    fallback = {"id": "unknown", "label": "未知", "confidence": 0}
    try:
        sys.path.insert(0, "/home/hmo/MoFin")
        import strategy_tree

        return strategy_tree.detect_scenario()
    except Exception:
        return fallback
def load_decisions():
    """Load the decisions document from DECISIONS_PATH.

    The file is read as UTF-8 so that non-ASCII content does not depend on
    the process locale.

    Returns:
        The parsed JSON object, or ``{"decisions": []}`` when the file is
        missing, unreadable, not valid JSON, or its top level is not a JSON
        object (callers use ``.get`` on the result).
    """
    try:
        with open(DECISIONS_PATH, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {"decisions": []}
    if not isinstance(data, dict):
        return {"decisions": []}
    return data
def save_decisions(data):
    """Write *data* to DECISIONS_PATH as indented UTF-8 JSON.

    The document is serialized to a string before the file is opened, so a
    serialization error cannot leave a truncated file behind.

    Args:
        data: JSON-serializable document (the structure returned by
            load_decisions).

    Raises:
        TypeError, ValueError: *data* cannot be serialized to JSON.
        OSError: the file cannot be opened or written.
    """
    payload = json.dumps(data, indent=2, ensure_ascii=False)
    # Explicit encoding: ensure_ascii=False emits raw non-ASCII characters,
    # which must not depend on the locale's default encoding.
    with open(DECISIONS_PATH, "w", encoding="utf-8") as f:
        f.write(payload)
# One comparison term inside a price expression, e.g. ">= 12.5".
_PRICE_TERM_RE = re.compile(r"([<>=!]+)\s*([\d.]+)")

# Comparison operators that are evaluated; any other captured operator
# (e.g. "==", "!=") is ignored, as it always has been.
_PRICE_OPS = {
    "<": lambda price, bound: price < bound,
    ">": lambda price, bound: price > bound,
    "<=": lambda price, bound: price <= bound,
    ">=": lambda price, bound: price >= bound,
}


def _price_expr_holds(expr, price):
    """Return True when *price* satisfies every comparison term in *expr*."""
    for op, bound_text in _PRICE_TERM_RE.findall(expr):
        try:
            bound = float(bound_text)
        except ValueError:
            # Malformed bound such as "1.2.3": treat the condition as not met
            # instead of aborting the whole scan.
            return False
        compare = _PRICE_OPS.get(op)
        if compare is not None and not compare(price, bound):
            return False
    return True


def check_condition(branch, scenario_id, price):
    """Check whether a strategy branch applies to the given scenario and price.

    Intended to mirror strategy_tree._check_branch_condition.

    Args:
        branch: Branch dict; its optional "condition" dict may contain
            "scenario" (required scenario id), "price" and "price_lower"
            (comparison expressions such as "<= 12.5" or ">10 <20").
        scenario_id: Id of the current scenario.
        price: Current price. When falsy (None or 0), the price expressions
            are not evaluated.

    Returns:
        False if the required scenario differs from *scenario_id* or any
        evaluated price comparison fails; True otherwise.
    """
    cond = branch.get("condition", {})
    required_scenario = cond.get("scenario", "")
    if required_scenario and required_scenario != scenario_id:
        return False
    if price:
        # "price_lower" is the lower bound of the buy zone; it is evaluated
        # with exactly the same rules as "price".
        for key in ("price", "price_lower"):
            expr = cond.get(key, "")
            if expr and not _price_expr_holds(expr, price):
                return False
    return True
def push_alert(msg):
    """POST *msg* as a JSON chat payload to the endpoint at XMPP_URL.

    Delivery is best-effort: any failure while building or sending the
    request is swallowed so that a notification problem never aborts a scan.

    Args:
        msg: Message body text.
    """
    try:
        payload = json.dumps({"to": "hmo@yoin.fun", "body": msg, "type": "chat"}).encode()
        # The context manager closes the HTTP response; its content is unused.
        with urlopen(XMPP_URL, data=payload, timeout=3):
            pass
    except Exception:
        # Deliberately silent: alerts are optional.
        pass
def main():
    """Run one scan pass over every decision entry that has strategy branches.

    For each entry, the applicable branch with the lowest priority number is
    selected, its trigger_count / last_triggered fields are updated, and the
    decisions file is saved. Every triggered branch whose action type is not
    "hold" is included in a single pushed alert message.

    Returns:
        0 in all cases (used as the process exit status).
    """
    now = datetime.now()
    today = now.strftime("%Y-%m-%d")
    clock = now.strftime("%H:%M")
    # Scanning outside trading hours is pointless.
    # NOTE(review): this window is 09:00-16:59 local time with no weekday
    # check — confirm it matches the intended trading session.
    if now.hour < 9 or now.hour > 16:
        print("SILENT: 非交易时段")
        return 0

    scenario = get_scenario()
    sid = scenario.get("id", "unknown")
    data = load_decisions()
    decisions = data.get("decisions", [])

    triggered = []
    for entry in decisions:
        code = entry.get("code", "")
        # "or" guards against explicit null values in the JSON document.
        tree = entry.get("strategy_tree") or {}
        branches = tree.get("branches") or []
        if not branches:
            continue
        price = get_price(code)
        if not price:
            continue
        # Only the best applicable branch (lowest priority number) is
        # reported, so one stock never yields contradictory signals.
        ranked = sorted(branches, key=lambda b: b.get("priority", 999))
        best = next((br for br in ranked if check_condition(br, sid, price)), None)
        if best:
            best["trigger_count"] = best.get("trigger_count", 0) + 1
            best["last_triggered"] = today
            triggered.append((code, entry.get("name", ""), best))

    if not triggered:
        print(f"[SCAN] {clock} | 情景{sid} | 无触发")
        return 0

    save_decisions(data)
    print(f"[SCAN] {clock} 情景={sid} | {len(triggered)}个分支被触发")

    # Push only actionable triggers; "hold" branches are counted but not sent.
    alerts = []
    for code, name, br in triggered:
        action = br.get("action", {})
        action_type = action.get("type", "hold")
        rationale = br.get("rationale", "")
        count = br.get("trigger_count", 1)
        if action_type != "hold":
            alerts.append(f" {code} {name}: {action_type}({rationale})触发{count}")
    if alerts:
        msg = f"【分支扫描】{clock} | 情景{sid}\n" + "\n".join(alerts)
        push_alert(msg)
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())