migrate: remove final JSON cold backup lines, prune/branch→DB

This commit is contained in:
知微
2026-07-03 12:41:20 +08:00
parent b3bedc8024
commit 9124a7ad56
5 changed files with 285 additions and 264 deletions
+133 -126
View File
@@ -1,126 +1,133 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
branch_scanner.py — 分支自成长数据采集器(全静默) branch_scanner.py — 分支自成长数据采集器(全静默)
核心功能(三件事,全部后台静默执行): 核心功能(三件事,全部后台静默执行):
1. 每轮扫描42只股票,评估当前情景下各分支的适用性 1. 每轮扫描42只股票,评估当前情景下各分支的适用性
2. 适用分支 → trigger_count + 1,记录 last_triggered 2. 适用分支 → trigger_count + 1,记录 last_triggered
3. 保存当前状态到 scanner_state.json 供下次对比 3. 保存当前状态到 scanner_state.json 供下次对比
无输出 → 静默运行。触发数据积累在 decisions.json。 无输出 → 静默运行。触发数据积累在 decisions.json。
操作信号由 stale_push_wlin / price_monitor / 开盘收盘简报 另路输出。 操作信号由 stale_push_wlin / price_monitor / 开盘收盘简报 另路输出。
数据流向(自成长):每15分钟branch_scanner积累trigger_count → 数据流向(自成长):每15分钟branch_scanner积累trigger_count →
每日prune_branches评估低效分支 → decisions.json修剪 → 分支越来越有效 每日prune_branches评估低效分支 → decisions.json修剪 → 分支越来越有效
""" """
import json, sys, re import json, sys, re
from datetime import datetime from datetime import datetime
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json" DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
SCANNER_STATE = "/home/hmo/web-dashboard/data/scanner_state.json" SCANNER_STATE = "/home/hmo/web-dashboard/data/scanner_state.json"
def get_price(code): def get_price(code):
# DB 优先 # DB 优先
try: from mofin_db import get_price_from_db; p, _ = get_price_from_db(code); return p if p else 0 try: from mofin_db import get_price_from_db; p, _ = get_price_from_db(code); return p if p else 0
except: pass except: pass
# Fallback: 腾讯 # Fallback: 腾讯
mkt = "sh" if code.startswith("6") or code.startswith("5") else "sz" mkt = "sh" if code.startswith("6") or code.startswith("5") else "sz"
url = f"http://qt.gtimg.cn/q={mkt}{code}" url = f"http://qt.gtimg.cn/q={mkt}{code}"
req = Request(url, headers={"User-Agent": "Mozilla/5.0"}) req = Request(url, headers={"User-Agent": "Mozilla/5.0"})
try: try:
resp = urlopen(req, timeout=5).read().decode("gbk") resp = urlopen(req, timeout=5).read().decode("gbk")
parts = resp.split("~") parts = resp.split("~")
if len(parts) > 3: if len(parts) > 3:
return float(parts[3]) return float(parts[3])
except Exception: except Exception:
return None return None
def get_scenario(): def get_scenario():
try: try:
sys.path.insert(0, "/home/hmo/MoFin") sys.path.insert(0, "/home/hmo/MoFin")
from strategy_tree import detect_scenario from strategy_tree import detect_scenario
return detect_scenario() return detect_scenario()
except Exception: except Exception:
return {"id": "unknown", "label": "未知", "confidence": 0} return {"id": "unknown", "label": "未知", "confidence": 0}
def check_condition(branch, scenario_id, price): def check_condition(branch, scenario_id, price):
cond = branch.get("condition", {}) cond = branch.get("condition", {})
required_scenario = cond.get("scenario", "") required_scenario = cond.get("scenario", "")
if required_scenario and required_scenario != scenario_id: if required_scenario and required_scenario != scenario_id:
return False return False
price_cond = cond.get("price", "") price_cond = cond.get("price", "")
if price_cond and price: if price_cond and price:
ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_cond) ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_cond)
for op, val_str in ops: for op, val_str in ops:
val = float(val_str) val = float(val_str)
if op == "<" and not (price < val): return False if op == "<" and not (price < val): return False
if op == ">" and not (price > val): return False if op == ">" and not (price > val): return False
if op == "<=" and not (price <= val): return False if op == "<=" and not (price <= val): return False
if op == ">=" and not (price >= val): return False if op == ">=" and not (price >= val): return False
price_lower = cond.get("price_lower", "") price_lower = cond.get("price_lower", "")
if price_lower and price: if price_lower and price:
ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_lower) ops = re.findall(r"([<>=!]+)\s*([\d.]+)", price_lower)
for op, val_str in ops: for op, val_str in ops:
val = float(val_str) val = float(val_str)
if op == "<" and not (price < val): return False if op == "<" and not (price < val): return False
if op == ">" and not (price > val): return False if op == ">" and not (price > val): return False
if op == "<=" and not (price <= val): return False if op == "<=" and not (price <= val): return False
if op == ">=" and not (price >= val): return False if op == ">=" and not (price >= val): return False
return True return True
def main(): def main():
now = datetime.now() now = datetime.now()
if now.hour < 9 or now.hour > 16: if now.hour < 9 or now.hour > 16:
return 0 return 0
scenario = get_scenario() scenario = get_scenario()
sid = scenario.get("id", "unknown") sid = scenario.get("id", "unknown")
with open(DECISIONS_PATH) as f: with open(DECISIONS_PATH) as f:
data = json.load(f) data = json.load(f)
decisions = data.get("decisions", []) decisions = data.get("decisions", [])
for entry in decisions: for entry in decisions:
code = entry.get("code", "") code = entry.get("code", "")
tree = entry.get("strategy_tree", {}) tree = entry.get("strategy_tree", {})
branches = tree.get("branches", []) branches = tree.get("branches", [])
if not branches: if not branches:
continue continue
price = get_price(code) price = get_price(code)
if not price: if not price:
continue continue
for br in sorted(branches, key=lambda b: b.get("priority", 999)): for br in sorted(branches, key=lambda b: b.get("priority", 999)):
if check_condition(br, sid, price): if check_condition(br, sid, price):
br["trigger_count"] = br.get("trigger_count", 0) + 1 br["trigger_count"] = br.get("trigger_count", 0) + 1
br["last_triggered"] = now.strftime("%Y-%m-%d") br["last_triggered"] = now.strftime("%Y-%m-%d")
break break
with open(DECISIONS_PATH, "w") as f: # 写入 DB(替代 decisions.json
json.dump(data, f, indent=2, ensure_ascii=False) try:
from mofin_db import get_conn, write_holding_strategy
# 更新状态快照 conn = get_conn()
state = {"scenario": sid, "updated_at": now.isoformat(), "branches": {}} for e in data.get('decisions', []):
for e in decisions: write_holding_strategy(conn, e.get('code', ''), e.get('name', ''), e)
code = e.get("code", "") conn.close()
tree = e.get("strategy_tree", {}) except Exception:
for br in sorted(tree.get("branches", []), key=lambda b: b.get("priority", 999)): pass
if check_condition(br, sid, get_price(code)):
state["branches"][code] = br.get("id", "") # 更新状态快照
break state = {"scenario": sid, "updated_at": now.isoformat(), "branches": {}}
try: for e in decisions:
with open(SCANNER_STATE, "w") as f: code = e.get("code", "")
json.dump(state, f, indent=2) tree = e.get("strategy_tree", {})
except Exception: for br in sorted(tree.get("branches", []), key=lambda b: b.get("priority", 999)):
pass if check_condition(br, sid, get_price(code)):
state["branches"][code] = br.get("id", "")
return 0 break
try:
with open(SCANNER_STATE, "w") as f:
if __name__ == "__main__": json.dump(state, f, indent=2)
sys.exit(main()) except Exception:
pass
return 0
if __name__ == "__main__":
sys.exit(main())
+115 -112
View File
@@ -1,112 +1,115 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
prune_branches.py — 每日剪枝 prune_branches.py — 每日剪枝
扫描所有 strategy_tree 分支,删除低效分支: 扫描所有 strategy_tree 分支,删除低效分支:
- 触发 >= 3次 且 成功率 < 30% → 标记 pruning_candidate - 触发 >= 3次 且 成功率 < 30% → 标记 pruning_candidate
- 触发 >= 5次 且 成功率 < 50% → 标记 pruning_candidate - 触发 >= 5次 且 成功率 < 50% → 标记 pruning_candidate
- pruning_candidate 连续7天无新触发 → 删除 - pruning_candidate 连续7天无新触发 → 删除
自成长核心:低效分支被淘汰,高效分支被保留。 自成长核心:低效分支被淘汰,高效分支被保留。
数据写入 decisions.json 的 strategy_tree.branches[]。 数据写入 decisions.json 的 strategy_tree.branches[]。
""" """
import json, sys, os import json, sys, os
from datetime import datetime, timedelta from datetime import datetime, timedelta
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json" DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
PRUNE_LOG = "/home/hmo/MoFin/data/prune_log.json" PRUNE_LOG = "/home/hmo/MoFin/data/prune_log.json"
def load_decisions(): def load_decisions():
with open(DECISIONS_PATH) as f: from mo_data import read_decisions
return json.load(f) return read_decisions()
def save_decisions(data): def save_decisions(data):
with open(DECISIONS_PATH, "w") as f: from mofin_db import get_conn, write_holding_strategy
json.dump(data, f, indent=2, ensure_ascii=False) conn = get_conn()
for e in data.get('decisions', []):
write_holding_strategy(conn, e.get('code', ''), e.get('name', ''), e)
def main(): conn.close()
data = load_decisions()
decisions = data.get("decisions", [])
today = datetime.now().strftime("%Y-%m-%d") def main():
pruned = [] data = load_decisions()
warnings = [] decisions = data.get("decisions", [])
today = datetime.now().strftime("%Y-%m-%d")
for entry in decisions: pruned = []
code = entry.get("code", "") warnings = []
tree = entry.get("strategy_tree", {})
branches = tree.get("branches", []) for entry in decisions:
if not branches: code = entry.get("code", "")
continue tree = entry.get("strategy_tree", {})
branches = tree.get("branches", [])
keep = [] if not branches:
for br in branches: continue
triggers = br.get("trigger_count", 0)
success = br.get("success_rate") keep = []
last = br.get("last_triggered", "") for br in branches:
priority = br.get("priority", 99) triggers = br.get("trigger_count", 0)
success = br.get("success_rate")
# 跳过默认持有分支 last = br.get("last_triggered", "")
if priority == 99: priority = br.get("priority", 99)
keep.append(br)
continue # 跳过默认持有分支
if priority == 99:
# 评估是否该剪枝 keep.append(br)
should_prune = False continue
reason = ""
# 评估是否该剪枝
if triggers >= 5 and success is not None and success < 50: should_prune = False
should_prune = True reason = ""
reason = f"触发{triggers}次,成功率{success}% < 50%"
elif triggers >= 3 and success is not None and success < 30: if triggers >= 5 and success is not None and success < 50:
should_prune = True should_prune = True
reason = f"触发{triggers}次,成功率{success}% < 30%" reason = f"触发{triggers}次,成功率{success}% < 50%"
elif triggers >= 3 and success is not None and success < 30:
if should_prune: should_prune = True
pruned.append({ reason = f"触发{triggers}次,成功率{success}% < 30%"
"code": code,
"branch_id": br.get("id", ""), if should_prune:
"action": br.get("action", {}).get("type", ""), pruned.append({
"rationale": br.get("rationale", ""), "code": code,
"triggers": triggers, "branch_id": br.get("id", ""),
"success_rate": success, "action": br.get("action", {}).get("type", ""),
"reason": reason, "rationale": br.get("rationale", ""),
"pruned_at": today, "triggers": triggers,
}) "success_rate": success,
print(f"[PRUNE] {code} {br.get('id','?')}: {reason}") "reason": reason,
else: "pruned_at": today,
keep.append(br) })
print(f"[PRUNE] {code} {br.get('id','?')}: {reason}")
if len(keep) < len(branches): else:
tree["branches"] = keep keep.append(br)
entry["strategy_tree"] = tree
if len(keep) < len(branches):
if pruned: tree["branches"] = keep
save_decisions(data) entry["strategy_tree"] = tree
# 记录剪枝日志
log = [] if pruned:
try: save_decisions(data)
with open(PRUNE_LOG) as f: # 记录剪枝日志
log = json.load(f) log = []
except Exception: try:
pass with open(PRUNE_LOG) as f:
log.append({ log = json.load(f)
"date": today, except Exception:
"pruned": pruned, pass
"total_before": sum(len(e.get("strategy_tree", {}).get("branches", [])) for e in decisions), log.append({
}) "date": today,
os.makedirs(os.path.dirname(PRUNE_LOG), exist_ok=True) "pruned": pruned,
with open(PRUNE_LOG, "w") as f: "total_before": sum(len(e.get("strategy_tree", {}).get("branches", [])) for e in decisions),
json.dump(log, f, indent=2, ensure_ascii=False) })
print(f"[PRUNE] 今日剪枝{len(pruned)}条,保留{sum(len(e.get('strategy_tree',{}).get('branches',[])) for e in decisions)}") os.makedirs(os.path.dirname(PRUNE_LOG), exist_ok=True)
else: with open(PRUNE_LOG, "w") as f:
print("[PRUNE] 无需要剪枝的分支") json.dump(log, f, indent=2, ensure_ascii=False)
print(f"[PRUNE] 今日剪枝{len(pruned)}条,保留{sum(len(e.get('strategy_tree',{}).get('branches',[])) for e in decisions)}")
return 0 else:
print("[PRUNE] 无需要剪枝的分支")
if __name__ == "__main__": return 0
sys.exit(main())
if __name__ == "__main__":
sys.exit(main())
+37
View File
@@ -0,0 +1,37 @@
"""Final verification: full data cycle after JSON→DB migration"""
import sys
sys.path.insert(0, '/home/hmo/MoFin')
from mo_data import read_portfolio, read_decisions, read_watchlist
pf = read_portfolio()
dec = read_decisions()
wl = read_watchlist()
h = len(pf.get('holdings', []))
d = len(dec.get('decisions', []))
w = len(wl.get('stocks', []))
print(f"portfolio holdings: {h}")
print(f"decisions: {d}")
print(f"watchlist: {w}")
# Check one HK stock has correct CNY cost
for holding in pf.get('holdings', []):
if holding.get('code') == '01888':
print(f"\n01888 cost={holding.get('cost')} price={holding.get('price')} curr={holding.get('currency')}")
c = holding.get('cost', 0); p = holding.get('price', 0)
if c and p:
print(f"P&L: {(p-c)/c*100:.1f}%")
# Check decisions have currency=CNY
cnys = sum(1 for d in dec.get('decisions', []) if d.get('currency') == 'CNY')
print(f"\ndecisions with CNY: {cnys}/{d}")
# Check no JSON fallback in mo_data (pure DB)
with open('/home/hmo/MoFin/mo_data.py') as f:
content = f.read()
pure_db = 'json.load(open' not in content
print(f"mo_data pure DB: {'YES' if pure_db else 'NO — still has JSON'}")
print(f"\n{'ALL GOOD' if h and d and w else 'FAIL'}")
-13
View File
@@ -2029,19 +2029,6 @@ def regenerate_all(stdout=True):
conn.close() conn.close()
except Exception as e: except Exception as e:
print(f" [DB写入失败] {e}", flush=True) print(f" [DB写入失败] {e}", flush=True)
# JSON 冷备
json.dump(existing_pf, open(PORTFOLIO_PATH, "w"), ensure_ascii=False, indent=2)
json.dump(wl, open(WATCHLIST_PATH, "w"), ensure_ascii=False, indent=2)
# 写 decisions.json
decisions_path = "/home/hmo/web-dashboard/data/decisions.json"
decisions_data = {
"decisions": decisions, # 全部保留
"total": len(decisions),
"regenerated_at": datetime.now().strftime('%Y-%m-%d %H:%M'),
}
json.dump(decisions_data, open(decisions_path, "w"), ensure_ascii=False, indent=2)
# DB 已在上方写入(和 portfolio/watchlist 一起)
# 记录策略→提示词版本关联 # 记录策略→提示词版本关联
if HAS_PROMPT_TRACKING: if HAS_PROMPT_TRACKING:
-13
View File
@@ -2540,19 +2540,6 @@ def regenerate_all(stdout=True):
conn.close() conn.close()
except Exception as e: except Exception as e:
print(f" [DB写入失败] {e}", flush=True) print(f" [DB写入失败] {e}", flush=True)
# JSON 冷备
json.dump(existing_pf, open(PORTFOLIO_PATH, "w"), ensure_ascii=False, indent=2)
json.dump(wl, open(WATCHLIST_PATH, "w"), ensure_ascii=False, indent=2)
# 写 decisions.json
decisions_path = "/home/hmo/web-dashboard/data/decisions.json"
decisions_data = {
"decisions": decisions, # 全部保留
"total": len(decisions),
"regenerated_at": datetime.now().strftime('%Y-%m-%d %H:%M'),
}
json.dump(decisions_data, open(decisions_path, "w"), ensure_ascii=False, indent=2)
# DB 已在上方写入(和 portfolio/watchlist 一起)
# 记录策略→提示词版本关联 # 记录策略→提示词版本关联
if HAS_PROMPT_TRACKING: if HAS_PROMPT_TRACKING: