Files
MoFin/branch_scanner.py
T
知微 ee1849a6a3 自成长体系补齐:分支扫描+每日剪枝+决策树全覆盖+分支输出
核心改动:
1. 创建 branch_scanner.py — 每15分钟扫价格→评估分支适用性→记录trigger_count
   cron: 分支自成长-盘中 (15,30,45,00 9-15)
2. 创建 prune_branches.py — 每日21:00剪枝(触发>=5次且成功率<50% → 淘汰)
   cron: 分支剪枝-每日 (0 21 * * 1-5) — 之前是每周,频率太低
3. strategy_tree.py: _check_branch_condition 新增 price_lower 支持
   buy_dip 分支同时检查上下界(price<=entry_high AND price_lower>=entry_low)
4. 43只股票全部补全决策树(之前只有6只)
   init_default_branches 生成每只6条分支:止损/回调买入/突破追涨/减仓/止盈/持有
5. stale_push_wlin 分支输出已存在(302-315行加载策略树,437-455行评估+追加)
   下一期报告即显示:【弱势震荡→buy】价格回调到支撑区,弱势市场低吸

新增:
  南亚新材(688519) 全面分析+策略+自选
  买入区335~350 止损320 止盈400 RR=1.7
  6月从285拉至409(+43%)后急跌至331(-19%),今日反弹缩量。高PE(228)炒作品种,等回调确认支撑
2026-06-24 10:29:45 +08:00

149 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
branch_scanner.py — 盘中分支扫描器
每15分钟跑一轮:
1. 读取所有有 strategy_tree 的股票
2. 获取实时价格
3. 评估每个分支在当前情景下是否适用
4. 适用分支 → 记录 trigger_count + 推送信号
自成长核心组件:让分支条件得到实际验证。
"""
import json, sys, os, re
from datetime import datetime, date
from urllib.request import Request, urlopen
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json"
MACRO_PATH = "/home/hmo/web-dashboard/data/macro_context.json"
EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json"
XMPP_URL = "http://127.0.0.1:5805/"
def get_price(code):
"""腾讯API实时价格"""
mkt = "sh" if code.startswith("6") or code.startswith("5") else "sz"
url = f"http://qt.gtimg.cn/q={mkt}{code}"
req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
try:
resp = urlopen(req, timeout=5).read().decode("gbk")
parts = resp.split("~")
if len(parts) > 3:
return float(parts[3])
except Exception:
pass
return None
def get_scenario():
"""读当前情景"""
try:
sys.path.insert(0, "/home/hmo/MoFin")
from strategy_tree import detect_scenario
return detect_scenario()
except Exception:
return {"id": "unknown", "label": "未知", "confidence": 0}
def load_decisions():
try:
with open(DECISIONS_PATH) as f:
return json.load(f)
except Exception:
return {"decisions": []}
def save_decisions(data):
with open(DECISIONS_PATH, "w") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def check_condition(branch, scenario_id, price):
"""检查分支条件是否满足"""
cond = branch.get("condition", {})
required_scenario = cond.get("scenario", "")
if required_scenario and required_scenario != scenario_id:
return False
price_cond = cond.get("price", "")
if price_cond and price:
ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_cond)
for op, val_str in ops:
val = float(val_str)
if op == "<" and not (price < val): return False
if op == ">" and not (price > val): return False
if op == "<=" and not (price <= val): return False
if op == ">=" and not (price >= val): return False
return True
def push_alert(msg):
try:
payload = json.dumps({"to": "hmo@yoin.fun", "body": msg, "type": "chat"}).encode()
urlopen(XMPP_URL, data=payload, timeout=3)
except Exception:
pass
def main():
now = datetime.now()
today = now.strftime("%Y-%m-%d")
hour = now.hour
# 盘后才扫无意义
if hour < 9 or hour > 16:
print("SILENT: 非交易时段")
return 0
scenario = get_scenario()
sid = scenario.get("id", "unknown")
data = load_decisions()
decisions = data.get("decisions", [])
triggered = []
for entry in decisions:
code = entry.get("code", "")
tree = entry.get("strategy_tree", {})
branches = tree.get("branches", [])
if not branches:
continue
price = get_price(code)
if not price:
continue
for br in branches:
if check_condition(br, sid, price):
br["trigger_count"] = br.get("trigger_count", 0) + 1
br["last_triggered"] = today
triggered.append((code, entry.get("name", ""), br))
if triggered:
save_decisions(data)
print(f"[SCAN] {now.strftime('%H:%M')} 情景={sid} | {len(triggered)}个分支被触发")
# 推送重要触发
alerts = []
for code, name, br in triggered:
action = br.get("action", {})
action_type = action.get("type", "hold")
priority = br.get("priority", 99)
rationale = br.get("rationale", "")
count = br.get("trigger_count", 1)
if action_type != "hold":
alerts.append(f" {code} {name}: {action_type}{rationale})触发{count}")
if alerts:
msg = f"【分支扫描】{now.strftime('%H:%M')} | 情景{sid}\n" + "\n".join(alerts)
push_alert(msg)
else:
print(f"[SCAN] {now.strftime('%H:%M')} | 情景{sid} | 无触发")
return 0
if __name__ == "__main__":
sys.exit(main())