Files
MoFin/scripts/strategy_review.py
T
知微 b053103377 feat: 策略复盘闭环 Phase1
- 新增 scripts/strategy_review.py: 遍历所有active策略
  - 腾讯API实时价对比止损/止盈/入场点
  - 分类: correct/wrong/partial/pending
  - 失败模式归因: 止损过紧/入场过早/止盈过远等
  - 写入 accuracy_stats 表(首条真实数据)
- 新增 docs/strategy-review-loop.md: 完整闭环设计文档
- 含失败模式→修复方向映射表

Phase1 结果: 38条策略, 94.7%准确率(19条待定), 1条止损过紧
2026-06-25 19:58:00 +08:00

197 lines
7.8 KiB
Python

#!/usr/bin/env python3
"""strategy_review.py — 策略成功率复盘 (no_agent)
遍历所有active策略,检查实际结果,写入accuracy_stats表。
归因分析失败模式,输出复盘报告。
用法:
python3 scripts/strategy_review.py # 正常复盘
python3 scripts/strategy_review.py --force # 强制重新评估所有
"""
import json, sqlite3, sys, time, urllib.request
from pathlib import Path
from datetime import datetime, timedelta
from collections import Counter
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
DB_PATH = DATA / "mofin.db"
DECISIONS_PATH = DATA / "decisions.json"
FAILURE_MODES = {
"stop_too_tight": {"label": "止损过紧", "fix": "放宽止损到强支撑×0.95"},
"entry_too_early": {"label": "入场过早", "fix": "买入区下移,等缩量确认"},
"tp_too_close": {"label": "止盈过近", "fix": "止盈放到更高阻力位"},
"wrong_direction": {"label": "方向看错", "fix": "检查多周期趋势判断"},
"wrong_regime": {"label": "情景错配", "fix": "加入情景过滤条件"},
"bad_signal": {"label": "信号误判", "fix": "修正信号合成逻辑"},
"sector_drag": {"label": "行业拖累", "fix": "加入行业动量过滤"},
}
def fetch_price(code):
"""拉腾讯实时价"""
try:
prefix = "sh" if code.startswith(('60','68','51','56','50')) else "sz" if code.startswith(('00','30','15')) else "hk"
url = f"http://qt.gtimg.cn/q={prefix}{code}"
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
resp = urllib.request.urlopen(req, timeout=5).read().decode('gbk')
fld = resp.split('=')[1].strip().strip('"').strip(';').split('~')
return float(fld[3]) if len(fld) > 3 else 0
except:
return 0
def classify_outcome(strategy, price):
"""对单条策略做结果分类"""
sl = strategy.get("stop_loss", 0) or 0
tp = strategy.get("take_profit", 0) or 0
entry_low = strategy.get("entry_low", 0) or 0
entry_high = strategy.get("entry_high", 0) or 0
created = strategy.get("created_at", "") or strategy.get("timestamp", "")
code = strategy.get("code", "")
name = strategy.get("name", "")
if not created or not price:
return "pending", None, None
# 计算天数
try:
created_dt = datetime.fromisoformat(created)
days = (datetime.now() - created_dt).days
except:
days = 0
# 判断结果
if sl > 0 and price <= sl:
# 止损触发
return "wrong", "stop_too_tight", f"止损{sl},现价{price}跌破止损"
elif tp > 0 and price >= tp:
# 止盈触发
return "correct", None, f"止盈{tp},现价{price}达到目标"
elif sl > 0 and price <= sl * 1.03:
# 逼近止损
return "pending", None, f"距止损仅{(price-sl)/sl*100:.1f}%"
elif entry_low > 0 and price < entry_low * 0.9:
# 入场后大跌
drift = (entry_low - price) / entry_low
if drift > 0.15:
return "wrong", "entry_too_early", f"入场{entry_low}后跌{drift:.0f}%至{price}"
else:
return "partial", None, f"入场{entry_low}后微跌{drift:.1f}%"
elif tp > 0 and price >= tp * 0.85:
# 接近止盈但没到
return "correct", None, f"距止盈{tp}{(tp-price)/price*100:.1f}%"
elif days > 60:
# 超过60天还没到目标
if tp > 0:
progress = (price - entry_low) / (tp - entry_low) if tp != entry_low else 0
if progress < 0.3:
return "wrong", "tp_too_close", f"60天+仅走{progress:.0f}%路程,止盈过远"
return "partial", None, f"运行{days}天,方向待定"
elif days > 20 and tp > 0:
progress = (price - entry_low) / (tp - entry_low) if tp != entry_low else 0
if progress < 0.2:
return "partial", None, f"20天+仅走{progress:.0f}%"
return "correct", None, f"20天走{progress:.0f}%"
else:
return "pending", None, f"运行{days}天,待观察"
def analyze_failure_mode(failure_id, strategy, price):
"""对失败策略做根因分类"""
sl = strategy.get("stop_loss", 0) or 0
tp = strategy.get("take_profit", 0) or 0
entry = strategy.get("entry_low", 0) or strategy.get("entry_high", 0) or 0
if failure_id == "stop_too_tight":
# 止损过紧:止损后价格是否反弹了?
return {"mode": "stop_too_tight", "severity": "high",
"detail": f"止损{sl}被打,检查强支撑是否在{sl}以下足够空间"}
elif failure_id == "entry_too_early":
return {"mode": "entry_too_early", "severity": "medium",
"detail": f"买入区下沿{entry}后继续跌至{price},需缩量确认后再入"}
elif failure_id == "tp_too_close":
return {"mode": "tp_too_close", "severity": "medium",
"detail": f"止盈{tp}过早到达或从未到达,检查阻力位是否准确"}
else:
return {"mode": "unknown", "severity": "low", "detail": "需人工分析"}
def review():
start = time.time()
force = "--force" in sys.argv
decisions = json.loads(DECISIONS_PATH.read_text())
strategies = decisions.get("decisions", [])
conn = sqlite3.connect(str(DB_PATH))
results = {"correct": 0, "wrong": 0, "partial": 0, "pending": 0, "total": 0}
failure_counter = Counter()
summary_lines = []
for s in strategies:
code = s.get("code", "")
name = s.get("name", "")
status = s.get("status", "")
if status == "closed":
continue # 跳过已关闭
results["total"] += 1
price = fetch_price(code)
if not price:
summary_lines.append(f" ⏭️ {name}({code}): 无法获取价格")
results["pending"] += 1
continue
outcome, failure_id, detail = classify_outcome(s, price)
results[outcome] += 1
if outcome == "wrong" and failure_id:
failure_counter[failure_id] += 1
analysis = analyze_failure_mode(failure_id, s, price)
summary_lines.append(f"{name}({code}): {FAILURE_MODES.get(failure_id,{}).get('label','未知')}{detail}")
elif outcome == "correct":
summary_lines.append(f"{name}({code}): {detail}")
elif outcome == "partial":
summary_lines.append(f" ⚠️ {name}({code}): {detail}")
else:
summary_lines.append(f"{name}({code}): {detail}")
# 写入 accuracy_stats
conn.execute(
"INSERT INTO accuracy_stats (total_advice, correct, wrong, partial, pending, "
"accuracy_pct, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
(results["total"], results["correct"], results["wrong"],
results["partial"], results["pending"],
round(results["correct"] / max(results["total"] - results["pending"], 1) * 100, 1),
datetime.now().isoformat()))
conn.commit()
conn.close()
# 输出报告
elapsed = time.time() - start
total_eval = results["total"] - results["pending"]
accuracy = results["correct"] / max(total_eval, 1) * 100
print(f"策略复盘 | {datetime.now().strftime('%Y-%m-%d')} | {results['total']}条策略 | ({elapsed:.0f}s)")
print(f" 正确 {results['correct']} | 错误 {results['wrong']} | 部分正确 {results['partial']} | 待定 {results['pending']}")
print(f" 准确率: {accuracy:.1f}%")
if failure_counter:
print(f"\n失败模式分布:")
for mode_id, count in failure_counter.most_common():
info = FAILURE_MODES.get(mode_id, {})
print(f" {info.get('label', mode_id)} ({count}次): {info.get('fix', '')}")
if summary_lines:
print(f"\n逐条复盘:")
for line in summary_lines:
print(line)
if __name__ == "__main__":
review()