自成长:分支评估+剪枝+报告接入
补齐「顺势而为 环境预判 策略多分枝」体系中缺失的组件: branch_evaluator.py(新增)— 每30min评估所有策略树分支 1. detect_scenario() 获取当前宏观情景 2. 对42只股票评估哪个分支当前适用 3. 适用分支 trigger_count+1, last_triggered=now 4. 触发>=3次且成功率<30%→标记pruning_candidate 5. 无决策树的股票自动初始化(init_default_branches) prune_branches.py(新增)— 每日16:30收盘后剪枝 阈值:触发>=3次且成功率<30%→裁掉并归档到pruned_branches Dad说「每周太低频」→改为每日 stale_push_wlin.py(修改)— 报告每只股增加分支行: 【弱势震荡→buy_dip】价格回调到支撑区,弱势市场低吸 cron更新: 分支扫描(b809fcabfa5b) → 指向branch_evaluator.py, 每30min 剪枝(a3697c108c7b) → 指向prune_branches.py, 每日16:30 自成长核心:branch_evaluator 运行时自动发现并初始化无策略树股票, 252个分支已生成, trigger_count已开始累积, 反馈循环上线
This commit is contained in:
@@ -0,0 +1,148 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
branch_evaluator.py — 分支自成长引擎
|
||||
|
||||
每30分钟评估所有策略树的当前适用性:
|
||||
1. 读取 decisions.json 中所有 strategy_tree.branches
|
||||
2. 获取当前宏观情景(detect_scenario)
|
||||
3. 对每只股票获取实时价,评估哪些分支条件命中
|
||||
4. 命中的分支 → trigger_count+1, last_triggered=now
|
||||
5. 后续跟进:成功/失败取决于该分支被选中后5日盈亏(由price_monitor回填success_rate)
|
||||
6. 触发≥3次且成功率<30% → 标记 pruning_candidate
|
||||
7. 写回 decisions.json
|
||||
|
||||
设计为 no_agent cron 脚本:非空输出→推送到XMPP,空输出→静默
|
||||
"""
|
||||
|
||||
import json, sys, os, re
|
||||
from datetime import datetime, date
|
||||
|
||||
# 路径
|
||||
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
|
||||
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
|
||||
|
||||
# 引入 strategy_tree 模块
|
||||
sys.path.insert(0, "/home/hmo/MoFin")
|
||||
try:
|
||||
import strategy_tree as st
|
||||
except ImportError:
|
||||
# 如果 MoFin 路径下找不到,尝试直接 exec
|
||||
import importlib.util
|
||||
spec = importlib.util.spec_from_file_location("st", "/home/hmo/MoFin/strategy_tree.py")
|
||||
st = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(st)
|
||||
|
||||
|
||||
def get_live_prices():
|
||||
"""从 portfolio.json 读取实时价格"""
|
||||
prices = {}
|
||||
try:
|
||||
with open(PORTFOLIO_PATH) as f:
|
||||
pf = json.load(f)
|
||||
for h in pf.get("holdings", []):
|
||||
code = str(h.get("code", ""))
|
||||
prices[code] = h.get("price", 0)
|
||||
except Exception:
|
||||
pass
|
||||
return prices
|
||||
|
||||
|
||||
def evaluate_all():
|
||||
"""评估所有已触发策略树的分支"""
|
||||
try:
|
||||
with open(DECISIONS_PATH) as f:
|
||||
data = json.load(f)
|
||||
except Exception as e:
|
||||
print(f"[错误] 读 decisions.json 失败: {e}", file=sys.stderr)
|
||||
return
|
||||
|
||||
# 当前情景
|
||||
scenario = st.detect_scenario()
|
||||
scenario_id = scenario.get("id", "")
|
||||
scenario_label = scenario.get("label", "未知")
|
||||
|
||||
prices = get_live_prices()
|
||||
decisions = data.get("decisions", [])
|
||||
total_triggered = 0
|
||||
auto_init_count = 0
|
||||
pruning_flags = []
|
||||
|
||||
for entry in decisions:
|
||||
code = entry.get("code", "")
|
||||
tree = entry.get("strategy_tree")
|
||||
if not tree:
|
||||
# 自初始化:无决策树的股票自动生成默认分支
|
||||
try:
|
||||
branches = st.init_default_branches(
|
||||
code=code,
|
||||
name=entry.get("name", ""),
|
||||
entry_low=entry.get("entry_low", 0),
|
||||
entry_high=entry.get("entry_high", 0),
|
||||
stop_loss=entry.get("stop_loss", 0),
|
||||
take_profit=entry.get("take_profit", 0),
|
||||
)
|
||||
tree = {"branches": branches, "initialized_at": datetime.now().isoformat()}
|
||||
entry["strategy_tree"] = tree
|
||||
auto_init_count += 1
|
||||
except Exception:
|
||||
continue
|
||||
branches = tree.get("branches", [])
|
||||
if not branches:
|
||||
continue
|
||||
|
||||
price = prices.get(code, 0) or entry.get("price", 0)
|
||||
shares = entry.get("shares", 0)
|
||||
cost = entry.get("cost", 0)
|
||||
|
||||
# 评估所有分支
|
||||
results = st.evaluate_branches(code, scenario_id, price, shares, cost)
|
||||
now_ts = datetime.now().isoformat()
|
||||
|
||||
updated = False
|
||||
for result in results:
|
||||
br_id = result.get("branch_id", "")
|
||||
# 找到对应分支更新trigger_count
|
||||
for br in branches:
|
||||
if br.get("id") == br_id:
|
||||
if result.get("applicable"):
|
||||
# 分支命中 → 增加触发计数
|
||||
br["trigger_count"] = br.get("trigger_count", 0) + 1
|
||||
br["last_triggered"] = now_ts
|
||||
total_triggered += 1
|
||||
updated = True
|
||||
# 检查是否需要标记剪枝候补
|
||||
tc = br["trigger_count"]
|
||||
sr = br.get("success_rate")
|
||||
if tc >= 3 and sr is not None and sr < 30:
|
||||
br["pruning_candidate"] = True
|
||||
pruning_flags.append(f"{code}/{br_id}(触发{tc}次/成功率{sr}%)")
|
||||
break
|
||||
|
||||
if updated:
|
||||
# 回写 strategy_tree
|
||||
entry["strategy_tree"] = tree
|
||||
# 标记评估时间
|
||||
tree["last_evaluated"] = now_ts
|
||||
|
||||
# 写回文件
|
||||
with open(DECISIONS_PATH, "w") as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
# 输出摘要(空 = 静默)
|
||||
lines = []
|
||||
init_note = f" | 自动初始化{auto_init_count}只" if auto_init_count else ""
|
||||
lines.append(f"【分支评估】情景{scenario_label}({scenario_id}) | 命中{total_triggered}次{init_note}")
|
||||
if pruning_flags:
|
||||
lines.append(f"需剪枝{len(pruning_flags)}个分支:")
|
||||
for f in pruning_flags:
|
||||
lines.append(f" ⚠ {f}")
|
||||
else:
|
||||
lines.append("无需剪枝的分支")
|
||||
|
||||
out = "\n".join(lines)
|
||||
print(out)
|
||||
return out
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
evaluate_all()
|
||||
+77
-15
@@ -1,21 +1,83 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
prune_branches.py — 每周六凌晨6点执行
|
||||
prune_branches.py — 分支剪枝引擎(每日)
|
||||
|
||||
低效分支剪枝:
|
||||
- 触发≥5次且成功率<30% → 剪枝归档
|
||||
- 输出报告(stdout → cron delivery)
|
||||
裁掉低效分支:trigger_count ≥ 3 且 success_rate < 30%
|
||||
被剪的分支从 strategy_tree.branches 移除,归档到 strategy_tree.pruned_branches
|
||||
|
||||
Dad说"每周"太低频 → 改为每日16:30(收盘后)
|
||||
"""
|
||||
import sys, json
|
||||
sys.path.insert(0, '/home/hmo/MoFin')
|
||||
from strategy_tree import prune_low_performance_branches
|
||||
|
||||
result = prune_low_performance_branches(min_triggers=5, min_success_rate=0.3)
|
||||
import json, sys
|
||||
from datetime import datetime
|
||||
|
||||
if result:
|
||||
print(f"[分支剪枝] 已剪枝 {len(result)} 条低效分支:")
|
||||
for item in result:
|
||||
print(f" ✂ {item}")
|
||||
print("[分支剪枝] 剪枝完成")
|
||||
else:
|
||||
print("[分支剪枝] 无低效分支需剪枝")
|
||||
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
|
||||
|
||||
# 剪枝阈值:触发≥3次且成功率<30%
|
||||
TRIGGER_MIN = 3
|
||||
SUCCESS_MAX = 30
|
||||
|
||||
|
||||
def prune():
|
||||
try:
|
||||
with open(DECISIONS_PATH) as f:
|
||||
data = json.load(f)
|
||||
except Exception as e:
|
||||
print(f"[错误] 读 decisions.json 失败: {e}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
decisions = data.get("decisions", [])
|
||||
total_pruned = 0
|
||||
results = []
|
||||
|
||||
for entry in decisions:
|
||||
code = entry.get("code", "")
|
||||
tree = entry.get("strategy_tree")
|
||||
if not tree:
|
||||
continue
|
||||
branches = tree.get("branches", [])
|
||||
if not branches:
|
||||
continue
|
||||
|
||||
pruned_branches = tree.get("pruned_branches", [])
|
||||
kept = []
|
||||
for br in branches:
|
||||
tc = br.get("trigger_count", 0)
|
||||
sr = br.get("success_rate")
|
||||
if tc >= TRIGGER_MIN and sr is not None and sr < SUCCESS_MAX:
|
||||
# 归档
|
||||
br["pruned_at"] = datetime.now().isoformat()
|
||||
pruned_branches.append(br)
|
||||
total_pruned += 1
|
||||
results.append({
|
||||
"code": code,
|
||||
"branch_id": br.get("id", "?"),
|
||||
"trigger_count": tc,
|
||||
"success_rate": sr,
|
||||
"rationale": br.get("rationale", ""),
|
||||
})
|
||||
else:
|
||||
kept.append(br)
|
||||
|
||||
tree["branches"] = kept
|
||||
tree["pruned_branches"] = pruned_branches
|
||||
tree["last_pruned"] = datetime.now().isoformat() if total_pruned > 0 else tree.get("last_pruned", "")
|
||||
|
||||
with open(DECISIONS_PATH, "w") as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
if total_pruned > 0:
|
||||
lines = [f"【分支剪枝】本次裁掉{total_pruned}个低效分支"]
|
||||
for r in results:
|
||||
lines.append(f" ✂ {r['code']}/{r['branch_id']}(触发{r['trigger_count']}次/成功率{r['success_rate']}%)")
|
||||
lines.append(f" 理由: {r['rationale']}")
|
||||
print("\n".join(lines))
|
||||
return 0
|
||||
else:
|
||||
# 静默
|
||||
print(f"【分支剪枝】无需剪枝(所有分支均未达到触发{TRIGGER_MIN}次且成功率<{SUCCESS_MAX}%的阈值)")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(prune())
|
||||
|
||||
@@ -299,6 +299,21 @@ def main():
|
||||
except Exception:
|
||||
total_assets = available_cash * 5 # fallback
|
||||
|
||||
# 加载策略树模块(获取当前情景+分支评估)
|
||||
st = None
|
||||
scenario_id = ""
|
||||
scenario_label = ""
|
||||
try:
|
||||
import importlib.util
|
||||
spec = importlib.util.spec_from_file_location("st_module", "/home/hmo/MoFin/strategy_tree.py")
|
||||
st = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(st)
|
||||
sc = st.detect_scenario()
|
||||
scenario_id = sc.get("id", "")
|
||||
scenario_label = sc.get("label", "")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def calc_position(lot_cost, rr, market_factor, cat, code=""):
|
||||
# 理论推荐仓位(% of 总资产) — 仅基于RR+市场+品种,不受现金限制
|
||||
if rr >= 5:
|
||||
@@ -419,6 +434,26 @@ def main():
|
||||
f" 仓位:理论{theo_pct}%×总资产 | 建议{actual_pct}%({details})"
|
||||
)
|
||||
|
||||
# 读分支评估
|
||||
branch_line = ""
|
||||
if st and scenario_id:
|
||||
try:
|
||||
results = st.evaluate_branches(code, scenario_id, price, d.get("shares", 0), d.get("cost", 0))
|
||||
applicable = [r for r in results if r.get("applicable")]
|
||||
if applicable:
|
||||
# 取优先级最高的适用分支
|
||||
best = min(applicable, key=lambda r: r.get("priority", 999))
|
||||
action = best.get("action_type", "hold")
|
||||
rationale = best.get("rationale", "")
|
||||
branch_line = f" 【{scenario_label}→{action}】{rationale}"
|
||||
else:
|
||||
branch_line = f" 【{scenario_label}→持有】无匹配分支"
|
||||
except Exception:
|
||||
branch_line = ""
|
||||
if branch_line:
|
||||
# 追加到上一个元素
|
||||
lines[-1] += f"\n{branch_line}"
|
||||
|
||||
lines.insert(0, f"【知微】自选买入提醒 {now} | 总资产{total_assets:,.0f}元")
|
||||
out = "\n".join(lines)
|
||||
print(out)
|
||||
|
||||
Reference in New Issue
Block a user