Files
MoFin/branch_scanner.py
T
知微 68e806cfa1 branch_scanner: 状态变化驱动推送,停止15分钟噪音
核心逻辑重写:
- 不再每15分钟推30只股票的买入信号(噪音)
- 改为静默数据采集 + 状态变化检测
- scanner_state.json 记录上一轮各股最优分支
- 只有以下情况才推:
  ① 情景切换(如弱势震荡→急跌防御)
  ② 某只股票的最优分支变化(如持有→买入/止损)
  ③ 止损首次触发(P0新出现才推,不重复推)

日常运行时完全静默,决策树数据持续累积
2026-06-24 10:37:39 +08:00

207 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
branch_scanner.py — 盘中分支扫描器
每15分钟跑一轮:
1. 读取所有有 strategy_tree 的股票
2. 获取实时价格
3. 评估每个分支在当前情景下是否适用
4. 适用分支 → 记录 trigger_count + 推送信号
自成长核心组件:让分支条件得到实际验证。
"""
import json, sys, os, re
from datetime import datetime, date
from urllib.request import Request, urlopen
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json"
MACRO_PATH = "/home/hmo/web-dashboard/data/macro_context.json"
EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json"
def get_price(code):
"""腾讯API实时价格"""
mkt = "sh" if code.startswith("6") or code.startswith("5") else "sz"
url = f"http://qt.gtimg.cn/q={mkt}{code}"
req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
try:
resp = urlopen(req, timeout=5).read().decode("gbk")
parts = resp.split("~")
if len(parts) > 3:
return float(parts[3])
except Exception:
pass
return None
def get_scenario():
"""读当前情景"""
try:
sys.path.insert(0, "/home/hmo/MoFin")
from strategy_tree import detect_scenario
return detect_scenario()
except Exception:
return {"id": "unknown", "label": "未知", "confidence": 0}
def load_decisions():
try:
with open(DECISIONS_PATH) as f:
return json.load(f)
except Exception:
return {"decisions": []}
def save_decisions(data):
with open(DECISIONS_PATH, "w") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def check_condition(branch, scenario_id, price):
"""检查分支条件是否满足 — 同步 strategy_tree._check_branch_condition 逻辑"""
cond = branch.get("condition", {})
required_scenario = cond.get("scenario", "")
if required_scenario and required_scenario != scenario_id:
return False
price_cond = cond.get("price", "")
if price_cond and price:
ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_cond)
for op, val_str in ops:
val = float(val_str)
if op == "<" and not (price < val): return False
if op == ">" and not (price > val): return False
if op == "<=" and not (price <= val): return False
if op == ">=" and not (price >= val): return False
# 买入区下界(price_lower)检查 — 之前遗漏
price_lower = cond.get("price_lower", "")
if price_lower and price:
ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_lower)
for op, val_str in ops:
val = float(val_str)
if op == "<" and not (price < val): return False
if op == ">" and not (price > val): return False
if op == "<=" and not (price <= val): return False
if op == ">=" and not (price >= val): return False
return True
def main():
now = datetime.now()
today = now.strftime("%Y-%m-%d")
hour = now.hour
# 盘后才扫无意义
if hour < 9 or hour > 16:
return 0
scenario = get_scenario()
sid = scenario.get("id", "unknown")
slabel = scenario.get("label", "未知")
data = load_decisions()
decisions = data.get("decisions", [])
# 读上次扫描的状态(记录情景+各股最优分支)
state_path = "/home/hmo/web-dashboard/data/scanner_state.json"
last_state = {"scenario": "", "branches": {}}
try:
with open(state_path) as f:
last_state = json.load(f)
except Exception:
pass
triggered = []
alerts = []
for entry in decisions:
code = entry.get("code", "")
tree = entry.get("strategy_tree", {})
branches = tree.get("branches", [])
if not branches:
continue
price = get_price(code)
if not price:
continue
# 找出最优适用分支
best = None
for br in sorted(branches, key=lambda b: b.get("priority", 999)):
if check_condition(br, sid, price):
best = br
break
if best:
best["trigger_count"] = best.get("trigger_count", 0) + 1
best["last_triggered"] = today
triggered.append(code)
# 状态变化检测:上次不是这个分支→推送
action_type = best.get("action", {}).get("type", "hold")
br_id = best.get("id", "")
last_br = last_state.get("branches", {}).get(code, "")
last_action = last_state.get("branches", {}).get(f"{code}_action", "")
changed = (br_id != last_br)
# 推送条件:分支变化(情景变化已隐含)或止损首次触发
priority = best.get("priority", 99)
should_alert = False
if priority == 0: # 止损
# 只有止损新出现才推(上次不是止损 or 上次扫描不存在)
should_alert = (changed or not last_state.get("branches"))
elif sid != last_state.get("scenario"):
should_alert = True
elif changed:
should_alert = True
if should_alert:
rationale = best.get("rationale", "")
count = best.get("trigger_count", 1)
alerts.append(f" {code} {entry.get('name','')}: {action_type}{rationale}")
if triggered:
save_decisions(data)
# 保存当前状态供下次对比
new_state = {
"scenario": sid,
"scenario_label": slabel,
"branches": {},
"updated_at": now.isoformat(),
}
for e in decisions:
code = e.get("code", "")
tree = e.get("strategy_tree", {})
branches = tree.get("branches", [])
best = None
for br in sorted(branches, key=lambda b: b.get("priority", 999)):
if check_condition(br, sid, get_price(code)):
best = br
break
if best:
new_state["branches"][code] = best.get("id", "")
new_state["branches"][f"{code}_action"] = best.get("action", {}).get("type", "")
try:
with open(state_path, "w") as f:
json.dump(new_state, f, indent=2)
except Exception:
pass
# 输出:变化才出声,平常静默
if alerts:
scenario_shift = f"【情景切换】{last_state.get('scenario','')}{sid}" if sid != last_state.get("scenario") else ""
header = f"【分支扫描】{now.strftime('%H:%M')}" + (f" | {scenario_shift}" if scenario_shift else "")
msg = header + "\n" + "\n".join(alerts)
print(msg)
return 0
# 无变化→静默
return 0
if __name__ == "__main__":
sys.exit(main())