branch_scanner 彻底静默:自成长采集器不出操作信号
branch_scanner 是自成长数据收集器,不输出也不推送。 操作信号由 stale_push_wlin / price_monitor / 开盘收盘简报 按cron-report-format规范输出。 scanner只做三件事: 1. 每15分钟扫价格+评估分支 2. trigger_count + 1(决策树数据) 3. 更新状态快照
This commit is contained in:
+36
-120
@@ -1,28 +1,28 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
branch_scanner.py — 盘中分支扫描器
|
||||
branch_scanner.py — 分支自成长数据采集器(全静默)
|
||||
|
||||
每15分钟跑一轮:
|
||||
1. 读取所有有 strategy_tree 的股票
|
||||
2. 获取实时价格
|
||||
3. 评估每个分支在当前情景下是否适用
|
||||
4. 适用分支 → 记录 trigger_count + 推送信号
|
||||
核心功能(三件事,全部后台静默执行):
|
||||
1. 每轮扫描42只股票,评估当前情景下各分支的适用性
|
||||
2. 适用分支 → trigger_count + 1,记录 last_triggered
|
||||
3. 保存当前状态到 scanner_state.json 供下次对比
|
||||
|
||||
自成长核心组件:让分支条件得到实际验证。
|
||||
无输出 → 静默运行。触发数据积累在 decisions.json。
|
||||
操作信号由 stale_push_wlin / price_monitor / 开盘收盘简报 另路输出。
|
||||
|
||||
数据流向(自成长):每15分钟branch_scanner积累trigger_count →
|
||||
每日prune_branches评估低效分支 → decisions.json修剪 → 分支越来越有效
|
||||
"""
|
||||
|
||||
import json, sys, os, re
|
||||
from datetime import datetime, date
|
||||
import json, sys, re
|
||||
from datetime import datetime
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
|
||||
WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json"
|
||||
MACRO_PATH = "/home/hmo/web-dashboard/data/macro_context.json"
|
||||
EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json"
|
||||
SCANNER_STATE = "/home/hmo/web-dashboard/data/scanner_state.json"
|
||||
|
||||
|
||||
def get_price(code):
|
||||
"""腾讯API实时价格"""
|
||||
mkt = "sh" if code.startswith("6") or code.startswith("5") else "sz"
|
||||
url = f"http://qt.gtimg.cn/q={mkt}{code}"
|
||||
req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
|
||||
@@ -32,12 +32,10 @@ def get_price(code):
|
||||
if len(parts) > 3:
|
||||
return float(parts[3])
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def get_scenario():
|
||||
"""读当前情景"""
|
||||
try:
|
||||
sys.path.insert(0, "/home/hmo/MoFin")
|
||||
from strategy_tree import detect_scenario
|
||||
@@ -46,26 +44,11 @@ def get_scenario():
|
||||
return {"id": "unknown", "label": "未知", "confidence": 0}
|
||||
|
||||
|
||||
def load_decisions():
|
||||
try:
|
||||
with open(DECISIONS_PATH) as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
return {"decisions": []}
|
||||
|
||||
|
||||
def save_decisions(data):
|
||||
with open(DECISIONS_PATH, "w") as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
|
||||
def check_condition(branch, scenario_id, price):
|
||||
"""检查分支条件是否满足 — 同步 strategy_tree._check_branch_condition 逻辑"""
|
||||
cond = branch.get("condition", {})
|
||||
required_scenario = cond.get("scenario", "")
|
||||
if required_scenario and required_scenario != scenario_id:
|
||||
return False
|
||||
|
||||
price_cond = cond.get("price", "")
|
||||
if price_cond and price:
|
||||
ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_cond)
|
||||
@@ -75,8 +58,6 @@ def check_condition(branch, scenario_id, price):
|
||||
if op == ">" and not (price > val): return False
|
||||
if op == "<=" and not (price <= val): return False
|
||||
if op == ">=" and not (price >= val): return False
|
||||
|
||||
# 买入区下界(price_lower)检查 — 之前遗漏
|
||||
price_lower = cond.get("price_lower", "")
|
||||
if price_lower and price:
|
||||
ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_lower)
|
||||
@@ -86,119 +67,54 @@ def check_condition(branch, scenario_id, price):
|
||||
if op == ">" and not (price > val): return False
|
||||
if op == "<=" and not (price <= val): return False
|
||||
if op == ">=" and not (price >= val): return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
now = datetime.now()
|
||||
today = now.strftime("%Y-%m-%d")
|
||||
hour = now.hour
|
||||
|
||||
# 盘后才扫无意义
|
||||
if hour < 9 or hour > 16:
|
||||
if now.hour < 9 or now.hour > 16:
|
||||
return 0
|
||||
|
||||
scenario = get_scenario()
|
||||
sid = scenario.get("id", "unknown")
|
||||
slabel = scenario.get("label", "未知")
|
||||
data = load_decisions()
|
||||
|
||||
with open(DECISIONS_PATH) as f:
|
||||
data = json.load(f)
|
||||
decisions = data.get("decisions", [])
|
||||
|
||||
# 读上次扫描的状态(记录情景+各股最优分支)
|
||||
state_path = "/home/hmo/web-dashboard/data/scanner_state.json"
|
||||
last_state = {"scenario": "", "branches": {}}
|
||||
try:
|
||||
with open(state_path) as f:
|
||||
last_state = json.load(f)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
triggered = []
|
||||
alerts = []
|
||||
for entry in decisions:
|
||||
code = entry.get("code", "")
|
||||
tree = entry.get("strategy_tree", {})
|
||||
branches = tree.get("branches", [])
|
||||
if not branches:
|
||||
continue
|
||||
|
||||
price = get_price(code)
|
||||
if not price:
|
||||
continue
|
||||
|
||||
# 找出最优适用分支
|
||||
best = None
|
||||
for br in sorted(branches, key=lambda b: b.get("priority", 999)):
|
||||
if check_condition(br, sid, price):
|
||||
best = br
|
||||
br["trigger_count"] = br.get("trigger_count", 0) + 1
|
||||
br["last_triggered"] = now.strftime("%Y-%m-%d")
|
||||
break
|
||||
|
||||
if best:
|
||||
best["trigger_count"] = best.get("trigger_count", 0) + 1
|
||||
best["last_triggered"] = today
|
||||
triggered.append(code)
|
||||
with open(DECISIONS_PATH, "w") as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
# 状态变化检测:上次不是这个分支→推送
|
||||
action_type = best.get("action", {}).get("type", "hold")
|
||||
br_id = best.get("id", "")
|
||||
last_br = last_state.get("branches", {}).get(code, "")
|
||||
last_action = last_state.get("branches", {}).get(f"{code}_action", "")
|
||||
changed = (br_id != last_br)
|
||||
# 更新状态快照
|
||||
state = {"scenario": sid, "updated_at": now.isoformat(), "branches": {}}
|
||||
for e in decisions:
|
||||
code = e.get("code", "")
|
||||
tree = e.get("strategy_tree", {})
|
||||
for br in sorted(tree.get("branches", []), key=lambda b: b.get("priority", 999)):
|
||||
if check_condition(br, sid, get_price(code)):
|
||||
state["branches"][code] = br.get("id", "")
|
||||
break
|
||||
try:
|
||||
with open(SCANNER_STATE, "w") as f:
|
||||
json.dump(state, f, indent=2)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 推送条件:分支变化(情景变化已隐含)或止损首次触发
|
||||
priority = best.get("priority", 99)
|
||||
should_alert = False
|
||||
if priority == 0: # 止损
|
||||
# 只有止损新出现才推(上次不是止损 or 上次扫描不存在)
|
||||
should_alert = (changed or not last_state.get("branches"))
|
||||
elif sid != last_state.get("scenario"):
|
||||
should_alert = True
|
||||
elif changed:
|
||||
should_alert = True
|
||||
|
||||
if should_alert:
|
||||
rationale = best.get("rationale", "")
|
||||
count = best.get("trigger_count", 1)
|
||||
alerts.append(f" {code} {entry.get('name','')}: {action_type}({rationale})")
|
||||
|
||||
if triggered:
|
||||
save_decisions(data)
|
||||
|
||||
# 保存当前状态供下次对比
|
||||
new_state = {
|
||||
"scenario": sid,
|
||||
"scenario_label": slabel,
|
||||
"branches": {},
|
||||
"updated_at": now.isoformat(),
|
||||
}
|
||||
for e in decisions:
|
||||
code = e.get("code", "")
|
||||
tree = e.get("strategy_tree", {})
|
||||
branches = tree.get("branches", [])
|
||||
best = None
|
||||
for br in sorted(branches, key=lambda b: b.get("priority", 999)):
|
||||
if check_condition(br, sid, get_price(code)):
|
||||
best = br
|
||||
break
|
||||
if best:
|
||||
new_state["branches"][code] = best.get("id", "")
|
||||
new_state["branches"][f"{code}_action"] = best.get("action", {}).get("type", "")
|
||||
try:
|
||||
with open(state_path, "w") as f:
|
||||
json.dump(new_state, f, indent=2)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 输出:变化才出声,平常静默
|
||||
if alerts:
|
||||
scenario_shift = f"【情景切换】{last_state.get('scenario','')}→{sid}" if sid != last_state.get("scenario") else ""
|
||||
header = f"【分支扫描】{now.strftime('%H:%M')}" + (f" | {scenario_shift}" if scenario_shift else "")
|
||||
msg = header + "\n" + "\n".join(alerts)
|
||||
print(msg)
|
||||
return 0
|
||||
|
||||
# 无变化→静默
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user