Files
MoFin/strategy_feedback.py
T
hmo 25f8c6ec67 refactor: 消费者切 SQLite 优先读取
切换策略: SQLite 优先 → 失败回退 JSON

price_events (100%覆盖):
- strategy_feedback.py: run() 优先 query_price_events()
- system_health_check.py: 优先 query_price_events() + query_price_events_by_date()

stock_sector_map (100%覆盖):
- strategy_lifecycle.py: load_stock_sector_map() 优先 stock_sectors 表

market.json (85%覆盖):
- strategy_lifecycle.py: load_market_context() 优先 query_latest_market()
- market_insight.py: generate() 优先 query_latest_market()

portfolio.json + watchlist.json (70%覆盖):
- strategy_lifecycle.py: regenerate_all() 优先 query_holdings() + query_watchlist()
- server.py: /api/portfolio, /api/watchlist, /api/overview, /api/market 优先 SQLite

所有改动保留 JSON 回退路径,SQLite 不可用时自动降级
2026-06-20 17:50:15 +08:00

260 lines
9.1 KiB
Python

#!/usr/bin/env python3
"""strategy_feedback.py — 策略评估反馈引擎
从评估结果自动推导策略调整建议:
1. 阶段一完成(达到止盈)→ 标记成功,萃取经验
2. 阶段一失败(跌破止损)→ 标记失败,分析原因
3. 阶段二验证(新止损被跌破)→ 标记止损正确
4. 长期未触及任何区间 → 建议重新评估策略区间
5. 准确率趋势 → 调整策略参数(宽度、区间计算方法)
输出:写入 decisions.json + 生成调整建议报告
"""
import json
import sys
from datetime import datetime, timedelta
from pathlib import Path
DATA_DIR = Path(__file__).parent / "data"
DECISIONS_PATH = DATA_DIR / "decisions.json"
PORTFOLIO_PATH = DATA_DIR / "portfolio.json"
ACCURACY_PATH = DATA_DIR / "accuracy_stats.json"
EVENTS_PATH = DATA_DIR / "price_events.json"
FEEDBACK_PATH = DATA_DIR / "strategy_feedback.json"
def load_json(path, default=None):
try:
with open(path, encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {} if default is None else default
def save_json(path, data):
Path(path).parent.mkdir(parents=True, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def check_phase_completion(decision, events):
"""检查某条策略是否有阶段完成事件"""
code = decision["code"]
trig = decision.get("trigger", {})
stock_events = [e for e in events.get("events", []) if e["code"] == code]
el = trig.get("entry_zone", "")
sl = trig.get("stop_loss", "")
tp = trig.get("take_profit", "")
el_low = el_high = None
if el and "~" in str(el):
try:
parts = str(el).split("~")
el_low, el_high = float(parts[0]), float(parts[1])
except:
pass
sl_p = float(sl) if sl else None
tp_p = float(tp) if tp else None
result = {
"phase1_completed": False,
"phase1_result": None, # "success" or "failure"
"phase1_completed_at": None,
"phase1_price_at_completion": None,
"phase2_completed": False,
"phase2_result": None,
"phase2_completed_at": None,
"days_in_phase1": None,
}
# 检查价格事件中是否有止盈/止损触发
for ev in stock_events:
ev_type = ev.get("event_type", "")
ev_price = ev.get("price", 0)
ev_time = ev.get("timestamp", "")
if ev_type == "stop_loss" and sl_p and ev_price <= sl_p:
result["phase1_completed"] = True
result["phase1_result"] = "failure"
result["phase1_completed_at"] = ev_time
result["phase1_price_at_completion"] = ev_price
# 止盈:检查是否达到或超过止盈价
if tp_p and ev_price >= tp_p:
result["phase1_completed"] = True
result["phase1_result"] = "success"
result["phase1_completed_at"] = ev_time
result["phase1_price_at_completion"] = ev_price
return result
def compute_accuracy_trend(accuracy_stats):
"""计算准确率趋势,用于调整策略参数"""
details = accuracy_stats.get("details", [])
if not details:
return {"trend": "stable", "phase1_accuracy": 0, "phase2_accuracy": 0}
phase1 = accuracy_stats.get("phase1", {})
phase2 = accuracy_stats.get("phase2", {})
p1_acc = phase1.get("accuracy_pct", 0)
p2_acc = phase2.get("accuracy_pct", 0)
if p1_acc >= 80:
trend = "improving"
elif p1_acc <= 50 and phase1.get("correct", 0) + phase1.get("wrong", 0) >= 3:
trend = "declining"
else:
trend = "stable"
return {
"trend": trend,
"phase1_accuracy": p1_acc,
"phase2_accuracy": p2_acc,
"phase1_evaluated": phase1.get("correct", 0) + phase1.get("wrong", 0),
"phase2_evaluated": phase2.get("correct", 0) + phase2.get("wrong", 0),
}
def generate_adjustment(decision, phase_check, accuracy_trend):
"""根据评估结果生成策略调整建议"""
code = decision["code"]
name = decision.get("name", code)
trig = decision.get("trigger", {})
adjustments = []
el = trig.get("entry_zone", "")
sl = trig.get("stop_loss", "")
tp = trig.get("take_profit", "")
# 1. 阶段一完成 → 萃取经验
if phase_check["phase1_completed"]:
if phase_check["phase1_result"] == "success":
adjustments.append({
"type": "phase1_success",
"message": f"{name}({code}) 阶段一成功:达到止盈{tp},价格{phase_check['phase1_price_at_completion']}",
"action": "mark_completed",
"knowledge": f"策略区间{el}有效,价格达到止盈位{tp}",
})
elif phase_check["phase1_result"] == "failure":
adjustments.append({
"type": "phase1_failure",
"message": f"{name}({code}) 阶段一失败:跌破止损{sl},价格{phase_check['phase1_price_at_completion']}",
"action": "reassess",
"knowledge": f"策略区间{el}失效,止损{sl}被跌破,需重新评估",
})
# 2. 长期未触发 → 建议重新评估
# 检查策略创建时间
created_at = decision.get("timestamp", "")
if created_at:
try:
created = datetime.fromisoformat(created_at)
days_since = (datetime.now() - created).days
if days_since >= 14 and not phase_check["phase1_completed"]:
adjustments.append({
"type": "stale_strategy",
"message": f"{name}({code}) 策略已{days_since}天未触发任何区间,建议重新评估",
"action": "reassess",
})
except:
pass
# 3. 准确率趋势 → 调整策略宽度
if accuracy_trend["trend"] == "declining" and accuracy_trend["phase1_evaluated"] >= 3:
adjustments.append({
"type": "accuracy_declining",
"message": f"准确率下降至{accuracy_trend['phase1_accuracy']}%,建议收紧策略区间宽度",
"action": "tighten",
})
return adjustments
def run():
decisions = load_json(DECISIONS_PATH, {"decisions": []})
# 优先从 SQLite 读取价格事件
try:
from mofin_db import get_conn, query_price_events
conn = get_conn()
pe_rows = query_price_events(conn, limit=50000)
conn.close()
events = {"events": pe_rows}
except Exception:
events = load_json(EVENTS_PATH, {"events": []})
accuracy_stats = load_json(ACCURACY_PATH, {})
accuracy_trend = compute_accuracy_trend(accuracy_stats)
all_feedback = []
phase1_completed = []
reassess_needed = []
for d in decisions["decisions"]:
code = d["code"]
name = d.get("name", code)
# 检查阶段完成情况
phase_check = check_phase_completion(d, events)
# 生成调整建议
adjustments = generate_adjustment(d, phase_check, accuracy_trend)
feedback_entry = {
"code": code,
"name": name,
"evaluated_at": datetime.now().isoformat(),
"phase_check": phase_check,
"adjustments": adjustments,
}
all_feedback.append(feedback_entry)
# 收集需要处理的策略
if phase_check["phase1_completed"]:
phase1_completed.append(feedback_entry)
if any(a["action"] in ("reassess", "tighten") for a in adjustments):
reassess_needed.append(feedback_entry)
# 保存反馈结果
feedback_data = {
"updated_at": datetime.now().isoformat(),
"accuracy_trend": accuracy_trend,
"total_strategies": len(decisions["decisions"]),
"phase1_completed_count": len(phase1_completed),
"reassess_needed_count": len(reassess_needed),
"feedback": all_feedback,
}
save_json(FEEDBACK_PATH, feedback_data)
# 输出报告
print("=" * 70)
print(f"策略反馈引擎报告 | {datetime.now().strftime('%Y-%m-%d %H:%M')}")
print("=" * 70)
print(f"\n📈 准确率趋势: {accuracy_trend['trend']}")
print(f" 阶段一准确率: {accuracy_trend['phase1_accuracy']}% ({accuracy_trend['phase1_evaluated']}次评估)")
print(f" 阶段二准确率: {accuracy_trend['phase2_accuracy']}% ({accuracy_trend['phase2_evaluated']}次评估)")
print(f"\n✅ 阶段一已完成: {len(phase1_completed)}")
for fb in phase1_completed:
pc = fb["phase_check"]
icon = "🟢" if pc["phase1_result"] == "success" else "🔴"
print(f" {icon} {fb['name']}({fb['code']}) {pc['phase1_result']}{pc['phase1_completed_at'][:19]} 价格{pc['phase1_price_at_completion']}")
print(f"\n🔄 需重新评估: {len(reassess_needed)}")
for fb in reassess_needed:
for adj in fb["adjustments"]:
print(f" {adj['type']}: {adj['message']}")
if not phase1_completed and not reassess_needed:
print(f"\n 无策略需要调整,所有策略正常运行中")
print(f"\n✅ 反馈完成,已写入 {FEEDBACK_PATH}")
if __name__ == "__main__":
run()