Files
MoFin/branch_scanner.py
T

134 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
branch_scanner.py — 分支自成长数据采集器(全静默)
核心功能(三件事,全部后台静默执行):
1. 每轮扫描42只股票,评估当前情景下各分支的适用性
2. 适用分支 → trigger_count + 1,记录 last_triggered
3. 保存当前状态到 scanner_state.json 供下次对比
无输出 → 静默运行。触发数据积累在 decisions.json。
操作信号由 stale_push_wlin / price_monitor / 开盘收盘简报 另路输出。
数据流向(自成长):每15分钟branch_scanner积累trigger_count →
每日prune_branches评估低效分支 → decisions.json修剪 → 分支越来越有效
"""
import json, sys, re
from datetime import datetime
from urllib.request import Request, urlopen
# Strategy decisions file: main() loads it as JSON and iterates its "decisions" list.
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
# Scanner snapshot file: main() overwrites it with the scenario id, a timestamp
# and a per-stock-code map of matched branch ids.
SCANNER_STATE = "/home/hmo/web-dashboard/data/scanner_state.json"
def get_price(code):
    """Return the latest price for a stock code, preferring the local DB.

    Lookup order:
      1. ``mofin_db.get_price_from_db(code)``; only the first element of the
         returned pair is used.
      2. The Tencent quote endpoint (``qt.gtimg.cn``), used only when the DB
         step raises (including when ``mofin_db`` cannot be imported).

    Args:
        code: Stock code string. Codes starting with ``"6"`` or ``"5"`` are
            queried with the ``sh`` market prefix, all others with ``sz``.

    Returns:
        The DB price when it is truthy; ``0`` when the DB answered with a
        falsy price; otherwise the float parsed from the fourth
        ``~``-separated field of the quote response; ``None`` when the
        request or parsing fails or the response has too few fields.
    """
    # DB first.
    try:
        from mofin_db import get_price_from_db

        db_price, _ = get_price_from_db(code)
        # NOTE(review): a falsy DB price returns 0 here and never reaches the
        # network fallback below -- confirm this is intended.
        return db_price if db_price else 0
    except Exception:
        # Best effort: any DB-side failure falls through to the network
        # lookup. KeyboardInterrupt/SystemExit are deliberately not caught.
        pass

    # Fallback: Tencent quote API.
    market = "sh" if code.startswith(("6", "5")) else "sz"
    url = f"http://qt.gtimg.cn/q={market}{code}"
    request = Request(url, headers={"User-Agent": "Mozilla/5.0"})
    try:
        # The context manager closes the HTTP response even if read/decode fails.
        with urlopen(request, timeout=5) as response:
            payload = response.read().decode("gbk")
        fields = payload.split("~")
        if len(fields) > 3:
            return float(fields[3])
    except Exception:
        return None
    # Response arrived but did not contain a price field.
    return None
def get_scenario():
    """Detect the current scenario via ``strategy_tree.detect_scenario``.

    Returns:
        Whatever ``detect_scenario()`` returns, or the placeholder
        ``{"id": "unknown", "label": "未知", "confidence": 0}`` when the
        import or the call raises.
    """
    try:
        # Put the MoFin project directory at the front of the import path
        # before the lazy import.
        sys.path.insert(0, "/home/hmo/MoFin")
        from strategy_tree import detect_scenario

        detected = detect_scenario()
    except Exception:
        detected = {"id": "unknown", "label": "未知", "confidence": 0}
    return detected
# One comparison clause: an operator token followed by a numeric bound.
_PRICE_CLAUSE_RE = re.compile(r"([<>=!]+)\s*([\d.]+)")

# Operator token -> predicate(price, bound).
_PRICE_COMPARATORS = {
    "<": lambda price, bound: price < bound,
    ">": lambda price, bound: price > bound,
    "<=": lambda price, bound: price <= bound,
    ">=": lambda price, bound: price >= bound,
    # The clause regex has always captured these two; they used to be
    # silently treated as satisfied instead of being evaluated.
    "==": lambda price, bound: price == bound,
    "!=": lambda price, bound: price != bound,
}


def _price_satisfies(expression, price):
    """Return True when ``price`` passes every comparison clause in ``expression``."""
    for op, bound_text in _PRICE_CLAUSE_RE.findall(expression):
        bound = float(bound_text)
        compare = _PRICE_COMPARATORS.get(op)
        # Unrecognised operator tokens (e.g. "=>") are ignored rather than
        # rejected, so they never block a branch.
        if compare is not None and not compare(price, bound):
            return False
    return True


def check_condition(branch, scenario_id, price):
    """Decide whether a strategy branch applies to the current situation.

    Args:
        branch: Branch dict; its optional ``"condition"`` dict may hold
            ``"scenario"``, ``"price"`` and ``"price_lower"`` entries.
        scenario_id: Id of the currently detected scenario.
        price: Current price. When falsy (``None`` or ``0``) the price
            expressions are not evaluated at all.

    Returns:
        ``False`` when the branch requires a different scenario or when any
        evaluated price clause fails; ``True`` otherwise.

    Raises:
        ValueError: If a clause's numeric part is not a valid float (e.g. ".").
    """
    cond = branch.get("condition", {})

    required_scenario = cond.get("scenario", "")
    if required_scenario and required_scenario != scenario_id:
        return False

    if price:
        # Both keys use the same clause syntax, e.g. "> 9, <= 10".
        for key in ("price", "price_lower"):
            expression = cond.get(key, "")
            if expression and not _price_satisfies(expression, price):
                return False
    return True
def _branch_priority(branch):
    """Sort key for branches; a branch without "priority" sorts as 999."""
    return branch.get("priority", 999)


def _first_matching_branch(branches, scenario_id, price):
    """Return the lowest-priority-number branch whose condition holds, or None."""
    for branch in sorted(branches, key=_branch_priority):
        if check_condition(branch, scenario_id, price):
            return branch
    return None


def main():
    """Run one silent scan over every entry in the decisions file.

    Steps:
      1. Do nothing outside local hours 9 through 16 (inclusive).
      2. For each decision entry that has branches and a truthy price, find
         the first branch (ordered by "priority") whose condition holds,
         increment its "trigger_count" and stamp "last_triggered" with
         today's date.
      3. Best-effort: pass every entry to ``mofin_db.write_holding_strategy``.
      4. Best-effort: write a snapshot (scenario id, timestamp, matched
         branch id per stock code) to ``SCANNER_STATE``.

    The snapshot reuses the match found in step 2, so each stock's price is
    fetched once and a stock without a usable price is never reported with
    an active branch.

    NOTE(review): the updated counters are never written back to
    ``DECISIONS_PATH`` here; they are only handed to ``mofin_db``. Confirm
    the decisions file is regenerated elsewhere.

    Returns:
        Always 0 (used as the process exit status).

    Raises:
        OSError, json.JSONDecodeError: If the decisions file cannot be read
            or parsed; this step is not best-effort.
    """
    now = datetime.now()
    if now.hour < 9 or now.hour > 16:
        return 0

    scenario = get_scenario()
    sid = scenario.get("id", "unknown")

    with open(DECISIONS_PATH, encoding="utf-8") as f:
        data = json.load(f)
    decisions = data.get("decisions", [])

    today = now.strftime("%Y-%m-%d")
    active_branches = {}  # stock code -> id of the branch triggered this run
    for entry in decisions:
        branches = entry.get("strategy_tree", {}).get("branches", [])
        if not branches:
            continue
        code = entry.get("code", "")
        price = get_price(code)
        if not price:
            # No usable price (None or 0): skip the stock entirely.
            continue
        hit = _first_matching_branch(branches, sid, price)
        if hit is None:
            continue
        hit["trigger_count"] = hit.get("trigger_count", 0) + 1
        hit["last_triggered"] = today
        active_branches[code] = hit.get("id", "")

    # Write to the DB (instead of decisions.json). Deliberately silent on failure.
    try:
        from mofin_db import get_conn, write_holding_strategy

        conn = get_conn()
        try:
            for entry in decisions:
                write_holding_strategy(conn, entry.get("code", ""), entry.get("name", ""), entry)
        finally:
            # Release the connection even when a write raises.
            conn.close()
    except Exception:
        pass

    # Update the state snapshot. Deliberately silent on failure.
    state = {"scenario": sid, "updated_at": now.isoformat(), "branches": active_branches}
    try:
        with open(SCANNER_STATE, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
    except Exception:
        pass
    return 0
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit status.
    exit_status = main()
    sys.exit(exit_status)