#!/usr/bin/env python3 """strategy_evaluator.py — 策略双维度评估引擎 两阶段评估模型: 阶段一(策略制定→价格达标): 理论:策略设定的买入区/止损/止盈 → 股价是否达到过这些价位 → 理论盈亏 实际:老爸是否按策略执行 → 实际买入/卖出价格 → 实际盈亏 阶段二(价格回落后→新止损验证): 理论:价格未按预期走 → 给出新止损 → 股价是否继续下跌验证止损正确性 实际:老爸实际卖出价格 → 对比新止损 → 验证止损有效性 输出:写入 decisions.json 的 evaluation 字段 + accuracy_stats.json """ import json import urllib.request import os import sys import re from datetime import datetime, timedelta from pathlib import Path DATA_DIR = Path(__file__).parent / "data" DECISIONS_PATH = DATA_DIR / "decisions.json" PORTFOLIO_PATH = DATA_DIR / "portfolio.json" ACCURACY_PATH = DATA_DIR / "accuracy_stats.json" UA = "Mozilla/5.0" def load_json(path, default=None): try: with open(path, encoding="utf-8") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return {} if default is None else default def save_json(path, data): Path(path).parent.mkdir(parents=True, exist_ok=True) with open(path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) def fetch_prices(codes): """批量拉腾讯行情""" if not codes: return {} symbols = [] code_map = {} for c in codes: sym = f"hk{c}" if len(c) == 5 else f"sh{c}" if c.startswith(("5", "6", "9")) else f"sz{c}" symbols.append(sym) code_map[sym] = c url = f"http://qt.gtimg.cn/q={','.join(symbols)}" try: req = urllib.request.Request(url, headers={"User-Agent": UA}) resp = urllib.request.urlopen(req, timeout=10) text = resp.read().decode("gbk") except Exception as e: print(f"行情拉取失败: {e}", file=sys.stderr) return {} prices = {} for line in text.strip().split("\n"): line = line.strip() if not line or "=" not in line: continue raw = line.split("=", 1)[1].strip().strip('"').strip(";") fields = raw.split("~") if len(fields) < 33: continue sym = line.split("=", 1)[0].strip().lstrip("v_") orig = code_map.get(sym) if not orig: continue prices[orig] = { "name": fields[1], "price": float(fields[3]) if fields[3] else 0, "prev_close": float(fields[4]) if fields[4] else 0, "change_pct": float(fields[32]) if fields[32] else 0, "high": float(fields[33]) if fields[33] else 0, "low": float(fields[34]) if fields[34] else 0, } return prices def parse_tech_snapshot(decision): """ 从 decision 的 tech_snapshot 中提取技术面参数。 返回 dict: { 'pattern', 'volume', '强撑', '弱撑', '弱压', '强压' } """ trig = decision.get("trigger", {}) raw = trig.get("tech_snapshot", decision.get("tech_snapshot", "")) result = {} if not raw: return result # 形态:XXX/bullish 或 形态:XXX/neutral m = re.search(r'形态:([^\s]+)', raw) if m: result["pattern"] = m.group(1) # 量价:XXX m = re.search(r'量价:([^\s]+)', raw) if m: result["volume"] = m.group(1) # 强撑:N / 弱撑:N / 弱压:N / 强压:N for key in ["强撑", "弱撑", "弱压", "强压"]: m = re.search(rf'{key}:([\d.]+)', raw) if m: result[key] = float(m.group(1)) return result def build_strategy_rationale(sl_p, tp_p, tech, decision, actual_pnl_pct): """ 基于 tech_snapshot 的支撑/压力位,解释止损和止盈的设定依据。 返回 dict: - sl_basis: 止损设定依据(对应哪个支撑位) - tp_basis: 止盈设定依据(对应哪个压力位) - analysis: 综合技术面文字分析 - tech_used: 实际使用的 tech 参数 """ code = decision.get("code", "") name = decision.get("name", code) rationale = { "sl_basis": "未设定", "tp_basis": "未设定", "analysis": "", "tech_used": tech, } # 解析支撑/压力 强撑 = tech.get("强撑") 弱撑 = tech.get("弱撑") 弱压 = tech.get("弱压") 强压 = tech.get("强压") 形态 = tech.get("pattern", "无记录") 量价 = tech.get("volume", "数据不足") # ---- 止损依据 ---- if sl_p and 强撑 and 弱撑: # 止损在强撑附近 (±1%) if abs(sl_p - 强撑) / 强撑 < 0.02: rationale["sl_basis"] = f"技术面强支撑{强撑}(距{abs(sl_p-强撑)/强撑*100:.1f}%)" # 止损在弱撑附近 elif abs(sl_p - 弱撑) / 弱撑 < 0.02: rationale["sl_basis"] = f"技术面弱支撑{弱撑}(距{abs(sl_p-弱撑)/弱撑*100:.1f}%)" # 止损在强撑和弱撑之间 elif 强撑 < sl_p < 弱撑: diff_down = (sl_p - 强撑) / 强撑 * 100 diff_up = (弱撑 - sl_p) / sl_p * 100 rationale["sl_basis"] = f"技术面强撑{强撑}-弱撑{弱撑}之间(比强撑高{diff_down:.1f}%,比弱撑低{diff_up:.1f}%)" # 止损低于强撑(宽止损,多见于深套) elif sl_p < 强撑: diff = (强撑 - sl_p) / sl_p * 100 actual = actual_pnl_pct if actual_pnl_pct else 0 if actual < -20: rationale["sl_basis"] = f"低于技术面强撑{强撑}{diff:.1f}%(深套宽止损)" else: rationale["sl_basis"] = f"低于技术面强撑{强撑}{diff:.1f}%(宽止损)" # 止损高于弱撑(紧止损) elif sl_p > 弱撑: rationale["sl_basis"] = f"高于弱撑{弱撑}(紧止损)" elif sl_p and 强撑 and not 弱撑: if abs(sl_p - 强撑) / 强撑 < 0.02: rationale["sl_basis"] = f"技术面强支撑{强撑}" else: rationale["sl_basis"] = f"参考强撑{强撑}调整至{sl_p}" elif sl_p: rationale["sl_basis"] = f"直接设定为{sl_p}(无技术面支撑位参考)" # ---- 止盈依据 ---- if tp_p and 强压: if abs(tp_p - 强压) / 强压 < 0.03: rationale["tp_basis"] = f"技术面强压力{强压}(距{abs(tp_p-强压)/强压*100:.1f}%)" elif tp_p > 强压: diff = (tp_p - 强压) / 强压 * 100 rationale["tp_basis"] = f"技术面强压{强压}上方{diff:.1f}%(趋势延伸目标)" elif 弱压 and 弱压 < tp_p < 强压: rationale["tp_basis"] = f"技术面弱压{弱压}-强压{强压}之间" elif 弱压 and tp_p <= 弱压: rationale["tp_basis"] = f"接近弱压{弱压}(保守目标)" else: rationale["tp_basis"] = f"参考强压{强压}调整至{tp_p}" elif tp_p and not 强压: rationale["tp_basis"] = f"直接设定为{tp_p}(无技术面压力位参考)" elif not tp_p: rationale["tp_basis"] = "未设定止盈价" # ---- 综合技术面分析 ---- parts = [] if 形态: parts.append(f"K线形态:{形态}") if 量价: parts.append(f"量价:{量价}") if 强撑 or 弱撑 or 弱压 or 强压: levels = [] if 强撑: levels.append(f"强撑{强撑}") if 弱撑: levels.append(f"弱撑{弱撑}") if 弱压: levels.append(f"弱压{弱压}") if 强压: levels.append(f"强压{强压}") parts.append("技术位:" + "/".join(levels)) rationale["analysis"] = " | ".join(parts) return rationale def evaluate_phase1(decision, price_info, holding): """ 阶段一评估:策略制定→价格是否达到过目标价位 返回 evaluation dict """ trig = decision.get("trigger", {}) code = decision["code"] name = decision.get("name", code) price = price_info.get("price", 0) change = price_info.get("change_pct", 0) # 策略区间 — 支持两种数据格式: # 1) trigger 子对象(含 entry_zone/stop_loss/take_profit) # 2) 顶层字段(entry_low+entry_high / stop_loss / take_profit) el = trig.get("entry_zone", "") sl = trig.get("stop_loss", "") tp = trig.get("take_profit", "") if not el and decision.get("entry_low") is not None: el_low = decision.get("entry_low") el_high = decision.get("entry_high") el = f"{el_low}~{el_high}" if el_low is not None and el_high is not None else "" if not sl: sl = decision.get("stop_loss", "") if not tp: tp = decision.get("take_profit", "") el_low = el_high = None if el and "~" in str(el): try: parts = str(el).split("~") el_low, el_high = float(parts[0]), float(parts[1]) except: pass sl_p = float(sl) if sl else None tp_p = float(tp) if tp else None # 持仓信息 cost = holding.get("cost", 0) if holding else 0 shares = holding.get("shares", 0) if holding else 0 position_pct = holding.get("position_pct", 0) if holding else 0 # 理论盈亏计算(基于策略区间中值) entry_mid = (el_low + el_high) / 2 if el_low and el_high else price theoretical_pnl_pct = (tp_p - entry_mid) / entry_mid * 100 if tp_p and entry_mid else 0 theoretical_pnl_amount = theoretical_pnl_pct / 100 * entry_mid * (shares or 100) / 100 if shares else 0 # 实际盈亏 actual_pnl_pct = (price - cost) / cost * 100 if cost > 0 and price > 0 else 0 actual_pnl_amount = actual_pnl_pct / 100 * cost * shares if cost > 0 and shares > 0 else 0 # === 策略依据分析(2026-06-18 新增)=== tech = parse_tech_snapshot(decision) rationale = build_strategy_rationale(sl_p, tp_p, tech, decision, actual_pnl_pct) # === R/R 盈亏比计算(2026-06-18 新增)=== # 以现价为基准计算:向下风险(到止损)vs 向上空间(到止盈) # 止损价作为风险基准,止盈价作为收益目标 rr = None rr_risk_pct = None rr_reward_pct = None rr_interpretation = "" rr_level = "" if sl_p and tp_p and price > 0 and sl_p > 0 and price > sl_p: rr_risk_pct = round((price - sl_p) / price * 100, 2) # 距止损% rr_reward_pct = round((tp_p - price) / price * 100, 2) # 距止盈% rr = round(rr_reward_pct / rr_risk_pct, 2) if rr_risk_pct > 0 else None # 判断场景:深套/已持仓盈利/已持仓亏损/新买入 is_deep_loss = actual_pnl_pct < -20 has_profit = actual_pnl_pct >= 0 held = (cost > 0 and shares > 0) if not held: # 新买入/自选股场景 if rr is not None and rr < 1.5: rr_level = "⚠️盈亏比不足" rr_interpretation = f"新买入要求R/R≥1.5,现{rr}每亏1元仅赚{rr}元,不建议买入" elif rr is not None and rr < 2.0: rr_level = "⚠️盈亏比偏低" rr_interpretation = f"新买入要求R/R≥1.5,现{rr}每亏1元赚{rr}元,谨慎" else: rr_level = "R/R达标" rr_interpretation = f"每亏1元赚{rr}元,盈亏比合理" elif is_deep_loss: # 深套场景 — 不限R/R rr_level = "深套持有" rr_interpretation = f"浮亏{actual_pnl_pct:.1f}%>20%深套,R/R={rr}仅参考,不补不割等反弹" elif has_profit: # 已持仓盈利场景 if rr is not None and rr < 0.5: rr_level = "⚠️R/R极低" rr_interpretation = f"盈利持仓R/R={rr},每亏1元仅赚{rr}元,考虑止盈或上移止损保护利润" elif rr is not None and rr < 1.5: rr_level = "⚠️R/R偏低" rr_interpretation = f"盈利持仓R/R={rr},不建议加仓,当前仓位持有观察" else: rr_level = "R/R合理" rr_interpretation = f"盈利持仓R/R={rr},每亏1元赚{rr}元,持有合理" else: # 已持仓浮亏(但非深套) if rr is not None and rr < 0.5: rr_level = "⚠️R/R极低" rr_interpretation = f"浮亏持仓R/R={rr},每亏1元仅赚{rr}元,不建议加仓,关注止损" elif rr is not None and rr < 1.0: rr_level = "⚠️R/R不足" rr_interpretation = f"浮亏持仓要求加仓R/R≥1.0,现{rr},不加仓" else: rr_level = "R/R可接受" rr_interpretation = f"浮亏持仓R/R={rr},每亏1元赚{rr}元,持有等反弹" elif price and sl_p and price <= sl_p: rr_level = "已跌破止损" rr_interpretation = f"现价{price}已破止损{sl_p},R/R不适用" else: rr_interpretation = "止损或止盈缺失,无法计算R/R" # 当前状态判断 status = "safe" if sl_p and price > 0 and price <= sl_p: status = "stop_loss_hit" elif tp_p and price > 0 and price >= tp_p: status = "take_profit_hit" elif el_low and el_high and price > 0 and el_low <= price <= el_high: status = "in_entry_zone" elif el_low and price > 0 and price < el_low: status = "below_entry" elif el_high and price > 0 and price > el_high: status = "above_entry" # 理论阶段评估 theoretical = { "entry_zone": f"{el_low}~{el_high}" if el_low else "N/A", "stop_loss": sl_p, "take_profit": tp_p, "entry_mid_price": round(entry_mid, 2), "target_price": tp_p, "theoretical_pnl_pct": round(theoretical_pnl_pct, 2), "theoretical_pnl_amount": round(theoretical_pnl_amount, 2), "status": status, "current_price": price, "current_change_pct": change, "rr": rr, "rr_risk_pct": rr_risk_pct, "rr_reward_pct": rr_reward_pct, "rr_level": rr_level, "sl_basis": rationale["sl_basis"], "tp_basis": rationale["tp_basis"], "tech_analysis": rationale["analysis"], } # 实际阶段评估 actual = { "cost_price": cost, "shares": shares, "position_pct": position_pct, "actual_pnl_pct": round(actual_pnl_pct, 2), "actual_pnl_amount": round(actual_pnl_amount, 2), "status": status, "current_price": price, } return { "code": code, "name": name, "evaluated_at": datetime.now().isoformat(), "phase": 1, "theoretical": theoretical, "actual": actual, "rr_level": rr_level, "rr_interpretation": rr_interpretation, "strategy_rationale": { "sl_basis": rationale["sl_basis"], "tp_basis": rationale["tp_basis"], "tech_analysis": rationale["analysis"], }, "summary": f"{name}({code}) | 损{sl_p}({rationale['sl_basis']})/" f"盈{tp_p}({rationale['tp_basis']}) | " f"现价{price}({change:+.2f}%) | " f"距损{rr_risk_pct}%/距盈{rr_reward_pct}% | RR={rr} | " f"{rr_level} | 理{theoretical_pnl_pct:+.1f}%实{actual_pnl_pct:+.1f}%", } def evaluate_phase2(decision, price_info, holding, prev_eval): """ 阶段二评估:价格回落后→新止损验证 需要 prev_eval 中记录了之前的目标价和新止损价 """ trig = decision.get("trigger", {}) code = decision["code"] name = decision.get("name", code) price = price_info.get("price", 0) sl = trig.get("stop_loss", "") if not sl: sl = decision.get("stop_loss", "") sl_p = float(sl) if sl else None # 从 prev_eval 中获取阶段一的止损 prev_sl = None if prev_eval: prev_sl = prev_eval.get("theoretical", {}).get("stop_loss") # 检查新止损是否被跌破 new_sl_hit = False days_to_hit = None if sl_p and price > 0 and price <= sl_p: new_sl_hit = True # 无法精确知道多少天跌破,标记为当前 days_to_hit = 0 result = { "code": code, "name": name, "evaluated_at": datetime.now().isoformat(), "phase": 2, "new_stop_loss": sl_p, "previous_stop_loss": prev_sl, "current_price": price, "new_sl_hit": new_sl_hit, "days_to_hit": days_to_hit, "summary": f"{name}({code}) 新止损{sl_p} {'已跌破' if new_sl_hit else '未触及'} 现价{price}", } return result def run(): decisions = load_json(DECISIONS_PATH, {"decisions": []}) portfolio = load_json(PORTFOLIO_PATH, {"holdings": []}) holdings_map = {h["code"]: h for h in portfolio.get("holdings", [])} # 收集所有代码 all_codes = [d["code"] for d in decisions["decisions"]] prices = fetch_prices(all_codes) results = [] stats = { "phase1_correct": 0, "phase1_wrong": 0, "phase1_pending": 0, "phase2_correct": 0, "phase2_wrong": 0, "phase2_pending": 0, } for d in decisions["decisions"]: code = d["code"] pi = prices.get(code, {}) h = holdings_map.get(code) # 获取已有的 evaluation 记录 existing_eval = d.get("evaluation", []) # 阶段一评估 eval1 = evaluate_phase1(d, pi, h) results.append(eval1) # 阶段二评估(如果有前次止损记录) prev_eval = existing_eval[-1] if existing_eval else None if prev_eval and prev_eval.get("phase") == 1: eval2 = evaluate_phase2(d, pi, h, prev_eval) results.append(eval2) # 更新 decisions.json 的 evaluation 字段 d["evaluation"] = [e for e in [eval1] + ([eval2] if prev_eval and prev_eval.get("phase") == 1 else [])] # 保存更新 save_json(DECISIONS_PATH, decisions) # 汇总统计 for r in results: phase = r["phase"] if phase == 1: status = r["theoretical"]["status"] if status in ("take_profit_hit",): stats["phase1_correct"] += 1 elif status in ("stop_loss_hit",): stats["phase1_wrong"] += 1 else: stats["phase1_pending"] += 1 elif phase == 2: if r.get("new_sl_hit"): stats["phase2_correct"] += 1 else: stats["phase2_pending"] += 1 # 写入 accuracy_stats accuracy = { "updated_at": datetime.now().isoformat(), "phase1": { "correct": stats["phase1_correct"], "wrong": stats["phase1_wrong"], "pending": stats["phase1_pending"], "accuracy_pct": round(stats["phase1_correct"] / max(stats["phase1_correct"] + stats["phase1_wrong"], 1) * 100, 1), }, "phase2": { "correct": stats["phase2_correct"], "wrong": stats["phase2_wrong"], "pending": stats["phase2_pending"], "accuracy_pct": round(stats["phase2_correct"] / max(stats["phase2_correct"] + stats["phase2_wrong"], 1) * 100, 1), }, "total_evaluated": len(results), "details": [r["summary"] for r in results], } save_json(ACCURACY_PATH, accuracy) # 输出报告 print("=" * 70) print(f"策略双维度评估报告 | {datetime.now().strftime('%Y-%m-%d %H:%M')}") print("=" * 70) print(f"\n📊 阶段一(策略制定→价格达标)") print(f" 正确(达到止盈): {stats['phase1_correct']}") print(f" 错误(跌破止损): {stats['phase1_wrong']}") print(f" 待验证: {stats['phase1_pending']}") print(f" 准确率: {accuracy['phase1']['accuracy_pct']}%") print(f"\n📊 阶段二(价格回落→新止损验证)") print(f" 正确(新止损验证有效): {stats['phase2_correct']}") print(f" 错误: {stats['phase2_wrong']}") print(f" 待验证: {stats['phase2_pending']}") print(f"\n📋 逐股评估:") for r in results: if r["phase"] == 1: print(f" {r['summary']}") if r.get("rr_interpretation"): print(f" RR: {r['rr_interpretation']}") sr = r.get("strategy_rationale", {}) if sr.get("tech_analysis"): print(f" 技术:{sr['tech_analysis']}") else: print(f" {r['summary']}") # R/R 统计 rr_count = sum(1 for r in results if r.get("rr_level") and r["phase"] == 1) rr_warn = sum(1 for r in results if "⚠️" in r.get("rr_level", "") and r["phase"] == 1) print(f"\n📊 盈亏比R/R统计:") print(f" 有R/R评估: {rr_count}只 | ⚠️异常: {rr_warn}只") for r in results: if r["phase"] == 1 and r.get("rr_level"): level = r["rr_level"] if "⚠️" in level: name_code = r['summary'].split('|')[0].strip() print(f" ⚠️ {name_code} → {level} | {r['rr_interpretation']}") print(f"\n✅ 评估完成,已写入 decisions.json 和 accuracy_stats.json") if __name__ == "__main__": run()