#!/usr/bin/env python3
"""strategy_feedback.py -- strategy evaluation feedback engine.

Derives strategy adjustment suggestions from evaluation results:
  1. Phase 1 completed (take-profit reached) -> mark success, extract lessons
  2. Phase 1 failed (stop-loss broken)       -> mark failure, analyse the cause
  3. Phase 2 validation (new stop-loss broken) -> mark the stop-loss as correct
  4. No zone touched for a long time         -> suggest re-evaluating the zones
  5. Accuracy trend -> adjust strategy parameters (width, zone calculation)

Output: the feedback is written to data/strategy_feedback.json and a report is
printed to stdout.

NOTE: phase 2 (item 3) is only reported as placeholder fields; nothing in this
module evaluates it yet.
"""

import json
import sys
from datetime import datetime, timedelta
from pathlib import Path

DATA_DIR = Path(__file__).parent / "data"
DECISIONS_PATH = DATA_DIR / "decisions.json"
PORTFOLIO_PATH = DATA_DIR / "portfolio.json"
ACCURACY_PATH = DATA_DIR / "accuracy_stats.json"
EVENTS_PATH = DATA_DIR / "price_events.json"
FEEDBACK_PATH = DATA_DIR / "strategy_feedback.json"

# A strategy that has not completed phase 1 after this many days is "stale".
STALE_AFTER_DAYS = 14


def load_json(path, default=None):
    """Return the parsed JSON content of *path*.

    If the file does not exist or does not contain valid JSON, *default* is
    returned instead (an empty dict when *default* is ``None``).
    """
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {} if default is None else default


def save_json(path, data):
    """Write *data* to *path* as indented UTF-8 JSON, creating parent dirs."""
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def _parse_price(value):
    """Return *value* as a positive float, or ``None`` if it is not usable.

    Missing, empty, non-numeric, zero and negative values all map to ``None``
    so that a malformed trigger disables the check instead of crashing.
    """
    try:
        price = float(value)
    except (TypeError, ValueError):
        return None
    return price if price > 0 else None


def _days_since(timestamp):
    """Return the whole days elapsed since ISO-8601 *timestamp*.

    Returns ``None`` when *timestamp* is empty, not a string, or unparseable.
    """
    if not isinstance(timestamp, str) or not timestamp:
        return None
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    text = timestamp[:-1] + "+00:00" if timestamp.endswith("Z") else timestamp
    try:
        created = datetime.fromisoformat(text)
    except ValueError:
        return None
    # Take "now" in the timestamp's own timezone (naive stays naive) so that
    # aware and naive datetimes are never mixed in the subtraction.
    return (datetime.now(tz=created.tzinfo) - created).days


def check_phase_completion(decision, events):
    """Check whether a strategy has a phase-completion price event.

    Args:
        decision: strategy dict; must contain ``"code"`` and may contain a
            ``"trigger"`` dict with ``"stop_loss"`` / ``"take_profit"``.
        events: dict whose ``"events"`` list holds price-event dicts
            (``code``, ``event_type``, ``price``, ``timestamp``).

    Returns:
        A dict describing phase 1 completion. The phase 2 fields and
        ``days_in_phase1`` are always returned with their initial values.

    Raises:
        KeyError: if *decision* has no ``"code"``.
    """
    code = decision["code"]
    trig = decision.get("trigger") or {}
    sl_p = _parse_price(trig.get("stop_loss"))
    tp_p = _parse_price(trig.get("take_profit"))

    result = {
        "phase1_completed": False,
        "phase1_result": None,  # "success" or "failure"
        "phase1_completed_at": None,
        "phase1_price_at_completion": None,
        "phase2_completed": False,
        "phase2_result": None,
        "phase2_completed_at": None,
        "days_in_phase1": None,
    }

    # Scan the price events for a take-profit / stop-loss trigger. Events are
    # processed in file order and a later match overwrites an earlier one.
    for ev in events.get("events", []):
        if ev.get("code") != code:
            continue

        ev_price = ev.get("price", 0)
        if not isinstance(ev_price, (int, float)):
            try:
                ev_price = float(ev_price)
            except (TypeError, ValueError):
                continue  # a price that cannot be compared proves nothing
        ev_time = ev.get("timestamp", "")

        stop_hit = (
            ev.get("event_type", "") == "stop_loss"
            and sl_p is not None
            and ev_price <= sl_p
        )
        if stop_hit:
            result.update(
                phase1_completed=True,
                phase1_result="failure",
                phase1_completed_at=ev_time,
                phase1_price_at_completion=ev_price,
            )

        # Take-profit: reached or exceeded, regardless of the event type.
        if tp_p is not None and ev_price >= tp_p:
            result.update(
                phase1_completed=True,
                phase1_result="success",
                phase1_completed_at=ev_time,
                phase1_price_at_completion=ev_price,
            )

    return result


def compute_accuracy_trend(accuracy_stats):
    """Summarise the accuracy statistics into a trend used for tuning.

    Args:
        accuracy_stats: dict with optional ``"details"``, ``"phase1"`` and
            ``"phase2"`` entries; each phase may hold ``accuracy_pct``,
            ``correct`` and ``wrong``.

    Returns:
        A dict with ``trend`` ("improving", "declining" or "stable"),
        ``phase1_accuracy``, ``phase2_accuracy``, ``phase1_evaluated`` and
        ``phase2_evaluated``. All five keys are always present.
    """
    if not accuracy_stats.get("details"):
        # Same shape as the normal result: run() prints the *_evaluated keys.
        return {
            "trend": "stable",
            "phase1_accuracy": 0,
            "phase2_accuracy": 0,
            "phase1_evaluated": 0,
            "phase2_evaluated": 0,
        }

    phase1 = accuracy_stats.get("phase1") or {}
    phase2 = accuracy_stats.get("phase2") or {}

    p1_acc = phase1.get("accuracy_pct", 0) or 0
    p2_acc = phase2.get("accuracy_pct", 0) or 0
    p1_evaluated = phase1.get("correct", 0) + phase1.get("wrong", 0)
    p2_evaluated = phase2.get("correct", 0) + phase2.get("wrong", 0)

    if p1_acc >= 80:
        trend = "improving"
    elif p1_acc <= 50 and p1_evaluated >= 3:
        trend = "declining"
    else:
        trend = "stable"

    return {
        "trend": trend,
        "phase1_accuracy": p1_acc,
        "phase2_accuracy": p2_acc,
        "phase1_evaluated": p1_evaluated,
        "phase2_evaluated": p2_evaluated,
    }


def generate_adjustment(decision, phase_check, accuracy_trend):
    """Build the list of adjustment suggestions for one strategy.

    Args:
        decision: strategy dict (``code``, optional ``name``, ``trigger``,
            ``timestamp``).
        phase_check: result of :func:`check_phase_completion`.
        accuracy_trend: result of :func:`compute_accuracy_trend`.

    Returns:
        A list of dicts, each with ``type``, ``message`` and ``action``
        (plus ``knowledge`` for phase 1 outcomes).
    """
    code = decision["code"]
    name = decision.get("name", code)
    trig = decision.get("trigger") or {}
    adjustments = []

    el = trig.get("entry_zone", "")
    sl = trig.get("stop_loss", "")
    tp = trig.get("take_profit", "")

    # 1. Phase 1 completed -> extract the lesson learned.
    if phase_check["phase1_completed"]:
        if phase_check["phase1_result"] == "success":
            adjustments.append({
                "type": "phase1_success",
                "message": f"{name}({code}) 阶段一成功:达到止盈{tp},价格{phase_check['phase1_price_at_completion']}",
                "action": "mark_completed",
                "knowledge": f"策略区间{el}有效,价格达到止盈位{tp}",
            })
        elif phase_check["phase1_result"] == "failure":
            adjustments.append({
                "type": "phase1_failure",
                "message": f"{name}({code}) 阶段一失败:跌破止损{sl},价格{phase_check['phase1_price_at_completion']}",
                "action": "reassess",
                "knowledge": f"策略区间{el}失效,止损{sl}被跌破,需重新评估",
            })

    # 2. Not triggered for a long time -> suggest a re-evaluation, based on
    #    the strategy's creation time.
    days_since = _days_since(decision.get("timestamp", ""))
    if (
        days_since is not None
        and days_since >= STALE_AFTER_DAYS
        and not phase_check["phase1_completed"]
    ):
        adjustments.append({
            "type": "stale_strategy",
            "message": f"{name}({code}) 策略已{days_since}天未触发任何区间,建议重新评估",
            "action": "reassess",
        })

    # 3. Accuracy trend -> adjust the zone width.
    if (
        accuracy_trend["trend"] == "declining"
        and accuracy_trend.get("phase1_evaluated", 0) >= 3
    ):
        adjustments.append({
            "type": "accuracy_declining",
            "message": f"准确率下降至{accuracy_trend['phase1_accuracy']}%,建议收紧策略区间宽度",
            "action": "tighten",
        })

    return adjustments


def run():
    """Evaluate every stored strategy, save the feedback and print a report."""
    decisions = load_json(DECISIONS_PATH, {"decisions": []})
    events = load_json(EVENTS_PATH, {"events": []})
    accuracy_stats = load_json(ACCURACY_PATH, {})

    # A valid JSON file without a "decisions" key means "no strategies".
    decision_list = decisions.get("decisions", [])
    accuracy_trend = compute_accuracy_trend(accuracy_stats)

    all_feedback = []
    phase1_completed = []
    reassess_needed = []

    for d in decision_list:
        code = d["code"]
        name = d.get("name", code)

        # Check phase completion, then derive the adjustment suggestions.
        phase_check = check_phase_completion(d, events)
        adjustments = generate_adjustment(d, phase_check, accuracy_trend)

        feedback_entry = {
            "code": code,
            "name": name,
            "evaluated_at": datetime.now().isoformat(),
            "phase_check": phase_check,
            "adjustments": adjustments,
        }
        all_feedback.append(feedback_entry)

        # Collect the strategies that need attention.
        if phase_check["phase1_completed"]:
            phase1_completed.append(feedback_entry)
        if any(a["action"] in ("reassess", "tighten") for a in adjustments):
            reassess_needed.append(feedback_entry)

    # Persist the feedback.
    feedback_data = {
        "updated_at": datetime.now().isoformat(),
        "accuracy_trend": accuracy_trend,
        "total_strategies": len(decision_list),
        "phase1_completed_count": len(phase1_completed),
        "reassess_needed_count": len(reassess_needed),
        "feedback": all_feedback,
    }
    save_json(FEEDBACK_PATH, feedback_data)

    # Print the report.
    print("=" * 70)
    print(f"策略反馈引擎报告 | {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print("=" * 70)

    print(f"\n📈 准确率趋势: {accuracy_trend['trend']}")
    print(f" 阶段一准确率: {accuracy_trend['phase1_accuracy']}% ({accuracy_trend['phase1_evaluated']}次评估)")
    print(f" 阶段二准确率: {accuracy_trend['phase2_accuracy']}% ({accuracy_trend['phase2_evaluated']}次评估)")

    print(f"\n✅ 阶段一已完成: {len(phase1_completed)}只")
    for fb in phase1_completed:
        pc = fb["phase_check"]
        icon = "🟢" if pc["phase1_result"] == "success" else "🔴"
        # The event timestamp may be missing or not a string; never crash here.
        completed_at = str(pc["phase1_completed_at"] or "")[:19]
        print(f" {icon} {fb['name']}({fb['code']}) {pc['phase1_result']} 于{completed_at} 价格{pc['phase1_price_at_completion']}")

    print(f"\n🔄 需重新评估: {len(reassess_needed)}只")
    for fb in reassess_needed:
        for adj in fb["adjustments"]:
            print(f" {adj['type']}: {adj['message']}")

    if not phase1_completed and not reassess_needed:
        print("\n 无策略需要调整,所有策略正常运行中")

    print(f"\n✅ 反馈完成,已写入 {FEEDBACK_PATH}")


if __name__ == "__main__":
    run()