From 9124a7ad56c9b9f08c2b5549d4824c8a52e49a6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?= Date: Fri, 3 Jul 2026 12:41:20 +0800 Subject: [PATCH] =?UTF-8?q?migrate:=20remove=20final=20JSON=20cold=20backu?= =?UTF-8?q?p=20lines,=20prune/branch=E2=86=92DB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- branch_scanner.py | 259 +++++++++++++++++----------------- prune_branches.py | 227 ++++++++++++++--------------- scripts/final_verify.py | 37 +++++ scripts/strategy_lifecycle.py | 13 -- strategy_lifecycle.py | 13 -- 5 files changed, 285 insertions(+), 264 deletions(-) create mode 100644 scripts/final_verify.py diff --git a/branch_scanner.py b/branch_scanner.py index eef80ca..f822c7f 100644 --- a/branch_scanner.py +++ b/branch_scanner.py @@ -1,126 +1,133 @@ -#!/usr/bin/env python3 -""" -branch_scanner.py — 分支自成长数据采集器(全静默) - -核心功能(三件事,全部后台静默执行): -1. 每轮扫描42只股票,评估当前情景下各分支的适用性 -2. 适用分支 → trigger_count + 1,记录 last_triggered -3. 保存当前状态到 scanner_state.json 供下次对比 - -无输出 → 静默运行。触发数据积累在 decisions.json。 -操作信号由 stale_push_wlin / price_monitor / 开盘收盘简报 另路输出。 - -数据流向(自成长):每15分钟branch_scanner积累trigger_count → - 每日prune_branches评估低效分支 → decisions.json修剪 → 分支越来越有效 -""" - -import json, sys, re -from datetime import datetime -from urllib.request import Request, urlopen - -DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json" -SCANNER_STATE = "/home/hmo/web-dashboard/data/scanner_state.json" - - -def get_price(code): - # DB 优先 - try: from mofin_db import get_price_from_db; p, _ = get_price_from_db(code); return p if p else 0 - except: pass - # Fallback: 腾讯 - mkt = "sh" if code.startswith("6") or code.startswith("5") else "sz" - url = f"http://qt.gtimg.cn/q={mkt}{code}" - req = Request(url, headers={"User-Agent": "Mozilla/5.0"}) - try: - resp = urlopen(req, timeout=5).read().decode("gbk") - parts = resp.split("~") - if len(parts) > 3: - return float(parts[3]) - except Exception: - return None - - -def get_scenario(): - try: - sys.path.insert(0, "/home/hmo/MoFin") - from strategy_tree import detect_scenario - return detect_scenario() - except Exception: - return {"id": "unknown", "label": "未知", "confidence": 0} - - -def check_condition(branch, scenario_id, price): - cond = branch.get("condition", {}) - required_scenario = cond.get("scenario", "") - if required_scenario and required_scenario != scenario_id: - return False - price_cond = cond.get("price", "") - if price_cond and price: - ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_cond) - for op, val_str in ops: - val = float(val_str) - if op == "<" and not (price < val): return False - if op == ">" and not (price > val): return False - if op == "<=" and not (price <= val): return False - if op == ">=" and not (price >= val): return False - price_lower = cond.get("price_lower", "") - if price_lower and price: - ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_lower) - for op, val_str in ops: - val = float(val_str) - if op == "<" and not (price < val): return False - if op == ">" and not (price > val): return False - if op == "<=" and not (price <= val): return False - if op == ">=" and not (price >= val): return False - return True - - -def main(): - now = datetime.now() - if now.hour < 9 or now.hour > 16: - return 0 - - scenario = get_scenario() - sid = scenario.get("id", "unknown") - - with open(DECISIONS_PATH) as f: - data = json.load(f) - decisions = data.get("decisions", []) - - for entry in decisions: - code = entry.get("code", "") - tree = entry.get("strategy_tree", {}) - branches = tree.get("branches", []) - if not branches: - continue - price = get_price(code) - if not price: - continue - for br in sorted(branches, key=lambda b: b.get("priority", 999)): - if check_condition(br, sid, price): - br["trigger_count"] = br.get("trigger_count", 0) + 1 - br["last_triggered"] = now.strftime("%Y-%m-%d") - break - - with open(DECISIONS_PATH, "w") as f: - json.dump(data, f, indent=2, ensure_ascii=False) - - # 更新状态快照 - state = {"scenario": sid, "updated_at": now.isoformat(), "branches": {}} - for e in decisions: - code = e.get("code", "") - tree = e.get("strategy_tree", {}) - for br in sorted(tree.get("branches", []), key=lambda b: b.get("priority", 999)): - if check_condition(br, sid, get_price(code)): - state["branches"][code] = br.get("id", "") - break - try: - with open(SCANNER_STATE, "w") as f: - json.dump(state, f, indent=2) - except Exception: - pass - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) +#!/usr/bin/env python3 +""" +branch_scanner.py — 分支自成长数据采集器(全静默) + +核心功能(三件事,全部后台静默执行): +1. 每轮扫描42只股票,评估当前情景下各分支的适用性 +2. 适用分支 → trigger_count + 1,记录 last_triggered +3. 保存当前状态到 scanner_state.json 供下次对比 + +无输出 → 静默运行。触发数据积累在 decisions.json。 +操作信号由 stale_push_wlin / price_monitor / 开盘收盘简报 另路输出。 + +数据流向(自成长):每15分钟branch_scanner积累trigger_count → + 每日prune_branches评估低效分支 → decisions.json修剪 → 分支越来越有效 +""" + +import json, sys, re +from datetime import datetime +from urllib.request import Request, urlopen + +DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json" +SCANNER_STATE = "/home/hmo/web-dashboard/data/scanner_state.json" + + +def get_price(code): + # DB 优先 + try: from mofin_db import get_price_from_db; p, _ = get_price_from_db(code); return p if p else 0 + except: pass + # Fallback: 腾讯 + mkt = "sh" if code.startswith("6") or code.startswith("5") else "sz" + url = f"http://qt.gtimg.cn/q={mkt}{code}" + req = Request(url, headers={"User-Agent": "Mozilla/5.0"}) + try: + resp = urlopen(req, timeout=5).read().decode("gbk") + parts = resp.split("~") + if len(parts) > 3: + return float(parts[3]) + except Exception: + return None + + +def get_scenario(): + try: + sys.path.insert(0, "/home/hmo/MoFin") + from strategy_tree import detect_scenario + return detect_scenario() + except Exception: + return {"id": "unknown", "label": "未知", "confidence": 0} + + +def check_condition(branch, scenario_id, price): + cond = branch.get("condition", {}) + required_scenario = cond.get("scenario", "") + if required_scenario and required_scenario != scenario_id: + return False + price_cond = cond.get("price", "") + if price_cond and price: + ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_cond) + for op, val_str in ops: + val = float(val_str) + if op == "<" and not (price < val): return False + if op == ">" and not (price > val): return False + if op == "<=" and not (price <= val): return False + if op == ">=" and not (price >= val): return False + price_lower = cond.get("price_lower", "") + if price_lower and price: + ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_lower) + for op, val_str in ops: + val = float(val_str) + if op == "<" and not (price < val): return False + if op == ">" and not (price > val): return False + if op == "<=" and not (price <= val): return False + if op == ">=" and not (price >= val): return False + return True + + +def main(): + now = datetime.now() + if now.hour < 9 or now.hour > 16: + return 0 + + scenario = get_scenario() + sid = scenario.get("id", "unknown") + + with open(DECISIONS_PATH) as f: + data = json.load(f) + decisions = data.get("decisions", []) + + for entry in decisions: + code = entry.get("code", "") + tree = entry.get("strategy_tree", {}) + branches = tree.get("branches", []) + if not branches: + continue + price = get_price(code) + if not price: + continue + for br in sorted(branches, key=lambda b: b.get("priority", 999)): + if check_condition(br, sid, price): + br["trigger_count"] = br.get("trigger_count", 0) + 1 + br["last_triggered"] = now.strftime("%Y-%m-%d") + break + + # 写入 DB(替代 decisions.json) + try: + from mofin_db import get_conn, write_holding_strategy + conn = get_conn() + for e in data.get('decisions', []): + write_holding_strategy(conn, e.get('code', ''), e.get('name', ''), e) + conn.close() + except Exception: + pass + + # 更新状态快照 + state = {"scenario": sid, "updated_at": now.isoformat(), "branches": {}} + for e in decisions: + code = e.get("code", "") + tree = e.get("strategy_tree", {}) + for br in sorted(tree.get("branches", []), key=lambda b: b.get("priority", 999)): + if check_condition(br, sid, get_price(code)): + state["branches"][code] = br.get("id", "") + break + try: + with open(SCANNER_STATE, "w") as f: + json.dump(state, f, indent=2) + except Exception: + pass + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/prune_branches.py b/prune_branches.py index cb7b6f7..c07fbe9 100644 --- a/prune_branches.py +++ b/prune_branches.py @@ -1,112 +1,115 @@ -#!/usr/bin/env python3 -""" -prune_branches.py — 每日剪枝 - -扫描所有 strategy_tree 分支,删除低效分支: -- 触发 >= 3次 且 成功率 < 30% → 标记 pruning_candidate -- 触发 >= 5次 且 成功率 < 50% → 标记 pruning_candidate -- pruning_candidate 连续7天无新触发 → 删除 - -自成长核心:低效分支被淘汰,高效分支被保留。 -数据写入 decisions.json 的 strategy_tree.branches[]。 -""" - -import json, sys, os -from datetime import datetime, timedelta - -DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json" -PRUNE_LOG = "/home/hmo/MoFin/data/prune_log.json" - - -def load_decisions(): - with open(DECISIONS_PATH) as f: - return json.load(f) - - -def save_decisions(data): - with open(DECISIONS_PATH, "w") as f: - json.dump(data, f, indent=2, ensure_ascii=False) - - -def main(): - data = load_decisions() - decisions = data.get("decisions", []) - today = datetime.now().strftime("%Y-%m-%d") - pruned = [] - warnings = [] - - for entry in decisions: - code = entry.get("code", "") - tree = entry.get("strategy_tree", {}) - branches = tree.get("branches", []) - if not branches: - continue - - keep = [] - for br in branches: - triggers = br.get("trigger_count", 0) - success = br.get("success_rate") - last = br.get("last_triggered", "") - priority = br.get("priority", 99) - - # 跳过默认持有分支 - if priority == 99: - keep.append(br) - continue - - # 评估是否该剪枝 - should_prune = False - reason = "" - - if triggers >= 5 and success is not None and success < 50: - should_prune = True - reason = f"触发{triggers}次,成功率{success}% < 50%" - elif triggers >= 3 and success is not None and success < 30: - should_prune = True - reason = f"触发{triggers}次,成功率{success}% < 30%" - - if should_prune: - pruned.append({ - "code": code, - "branch_id": br.get("id", ""), - "action": br.get("action", {}).get("type", ""), - "rationale": br.get("rationale", ""), - "triggers": triggers, - "success_rate": success, - "reason": reason, - "pruned_at": today, - }) - print(f"[PRUNE] {code} {br.get('id','?')}: {reason}") - else: - keep.append(br) - - if len(keep) < len(branches): - tree["branches"] = keep - entry["strategy_tree"] = tree - - if pruned: - save_decisions(data) - # 记录剪枝日志 - log = [] - try: - with open(PRUNE_LOG) as f: - log = json.load(f) - except Exception: - pass - log.append({ - "date": today, - "pruned": pruned, - "total_before": sum(len(e.get("strategy_tree", {}).get("branches", [])) for e in decisions), - }) - os.makedirs(os.path.dirname(PRUNE_LOG), exist_ok=True) - with open(PRUNE_LOG, "w") as f: - json.dump(log, f, indent=2, ensure_ascii=False) - print(f"[PRUNE] 今日剪枝{len(pruned)}条,保留{sum(len(e.get('strategy_tree',{}).get('branches',[])) for e in decisions)}条") - else: - print("[PRUNE] 无需要剪枝的分支") - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) +#!/usr/bin/env python3 +""" +prune_branches.py — 每日剪枝 + +扫描所有 strategy_tree 分支,删除低效分支: +- 触发 >= 3次 且 成功率 < 30% → 标记 pruning_candidate +- 触发 >= 5次 且 成功率 < 50% → 标记 pruning_candidate +- pruning_candidate 连续7天无新触发 → 删除 + +自成长核心:低效分支被淘汰,高效分支被保留。 +数据写入 decisions.json 的 strategy_tree.branches[]。 +""" + +import json, sys, os +from datetime import datetime, timedelta + +DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json" +PRUNE_LOG = "/home/hmo/MoFin/data/prune_log.json" + + +def load_decisions(): + from mo_data import read_decisions + return read_decisions() + + +def save_decisions(data): + from mofin_db import get_conn, write_holding_strategy + conn = get_conn() + for e in data.get('decisions', []): + write_holding_strategy(conn, e.get('code', ''), e.get('name', ''), e) + conn.close() + + +def main(): + data = load_decisions() + decisions = data.get("decisions", []) + today = datetime.now().strftime("%Y-%m-%d") + pruned = [] + warnings = [] + + for entry in decisions: + code = entry.get("code", "") + tree = entry.get("strategy_tree", {}) + branches = tree.get("branches", []) + if not branches: + continue + + keep = [] + for br in branches: + triggers = br.get("trigger_count", 0) + success = br.get("success_rate") + last = br.get("last_triggered", "") + priority = br.get("priority", 99) + + # 跳过默认持有分支 + if priority == 99: + keep.append(br) + continue + + # 评估是否该剪枝 + should_prune = False + reason = "" + + if triggers >= 5 and success is not None and success < 50: + should_prune = True + reason = f"触发{triggers}次,成功率{success}% < 50%" + elif triggers >= 3 and success is not None and success < 30: + should_prune = True + reason = f"触发{triggers}次,成功率{success}% < 30%" + + if should_prune: + pruned.append({ + "code": code, + "branch_id": br.get("id", ""), + "action": br.get("action", {}).get("type", ""), + "rationale": br.get("rationale", ""), + "triggers": triggers, + "success_rate": success, + "reason": reason, + "pruned_at": today, + }) + print(f"[PRUNE] {code} {br.get('id','?')}: {reason}") + else: + keep.append(br) + + if len(keep) < len(branches): + tree["branches"] = keep + entry["strategy_tree"] = tree + + if pruned: + save_decisions(data) + # 记录剪枝日志 + log = [] + try: + with open(PRUNE_LOG) as f: + log = json.load(f) + except Exception: + pass + log.append({ + "date": today, + "pruned": pruned, + "total_before": sum(len(e.get("strategy_tree", {}).get("branches", [])) for e in decisions), + }) + os.makedirs(os.path.dirname(PRUNE_LOG), exist_ok=True) + with open(PRUNE_LOG, "w") as f: + json.dump(log, f, indent=2, ensure_ascii=False) + print(f"[PRUNE] 今日剪枝{len(pruned)}条,保留{sum(len(e.get('strategy_tree',{}).get('branches',[])) for e in decisions)}条") + else: + print("[PRUNE] 无需要剪枝的分支") + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/final_verify.py b/scripts/final_verify.py new file mode 100644 index 0000000..466d5a9 --- /dev/null +++ b/scripts/final_verify.py @@ -0,0 +1,37 @@ +"""Final verification: full data cycle after JSON→DB migration""" +import sys +sys.path.insert(0, '/home/hmo/MoFin') + +from mo_data import read_portfolio, read_decisions, read_watchlist + +pf = read_portfolio() +dec = read_decisions() +wl = read_watchlist() + +h = len(pf.get('holdings', [])) +d = len(dec.get('decisions', [])) +w = len(wl.get('stocks', [])) + +print(f"portfolio holdings: {h}") +print(f"decisions: {d}") +print(f"watchlist: {w}") + +# Check one HK stock has correct CNY cost +for holding in pf.get('holdings', []): + if holding.get('code') == '01888': + print(f"\n01888 cost={holding.get('cost')} price={holding.get('price')} curr={holding.get('currency')}") + c = holding.get('cost', 0); p = holding.get('price', 0) + if c and p: + print(f"P&L: {(p-c)/c*100:.1f}%") + +# Check decisions have currency=CNY +cnys = sum(1 for d in dec.get('decisions', []) if d.get('currency') == 'CNY') +print(f"\ndecisions with CNY: {cnys}/{d}") + +# Check no JSON fallback in mo_data (pure DB) +with open('/home/hmo/MoFin/mo_data.py') as f: + content = f.read() + pure_db = 'json.load(open' not in content +print(f"mo_data pure DB: {'YES' if pure_db else 'NO — still has JSON'}") + +print(f"\n{'ALL GOOD' if h and d and w else 'FAIL'}") diff --git a/scripts/strategy_lifecycle.py b/scripts/strategy_lifecycle.py index dcdba0b..d10dd37 100644 --- a/scripts/strategy_lifecycle.py +++ b/scripts/strategy_lifecycle.py @@ -2029,19 +2029,6 @@ def regenerate_all(stdout=True): conn.close() except Exception as e: print(f" [DB写入失败] {e}", flush=True) - # JSON 冷备 - json.dump(existing_pf, open(PORTFOLIO_PATH, "w"), ensure_ascii=False, indent=2) - json.dump(wl, open(WATCHLIST_PATH, "w"), ensure_ascii=False, indent=2) - - # 写 decisions.json - decisions_path = "/home/hmo/web-dashboard/data/decisions.json" - decisions_data = { - "decisions": decisions, # 全部保留 - "total": len(decisions), - "regenerated_at": datetime.now().strftime('%Y-%m-%d %H:%M'), - } - json.dump(decisions_data, open(decisions_path, "w"), ensure_ascii=False, indent=2) - # DB 已在上方写入(和 portfolio/watchlist 一起) # 记录策略→提示词版本关联 if HAS_PROMPT_TRACKING: diff --git a/strategy_lifecycle.py b/strategy_lifecycle.py index 9ef6eba..fb7e8a1 100644 --- a/strategy_lifecycle.py +++ b/strategy_lifecycle.py @@ -2540,19 +2540,6 @@ def regenerate_all(stdout=True): conn.close() except Exception as e: print(f" [DB写入失败] {e}", flush=True) - # JSON 冷备 - json.dump(existing_pf, open(PORTFOLIO_PATH, "w"), ensure_ascii=False, indent=2) - json.dump(wl, open(WATCHLIST_PATH, "w"), ensure_ascii=False, indent=2) - - # 写 decisions.json - decisions_path = "/home/hmo/web-dashboard/data/decisions.json" - decisions_data = { - "decisions": decisions, # 全部保留 - "total": len(decisions), - "regenerated_at": datetime.now().strftime('%Y-%m-%d %H:%M'), - } - json.dump(decisions_data, open(decisions_path, "w"), ensure_ascii=False, indent=2) - # DB 已在上方写入(和 portfolio/watchlist 一起) # 记录策略→提示词版本关联 if HAS_PROMPT_TRACKING: