Files
MoFin/advice_reconciliation.py
T
知微 (MoFin) aa0f740381 MoFin 初始提交
完整数据采集+分析管道:
- market_watch.py:90行业板块采集(同花顺/东方财富)
- 市场精选推荐 cron:全市场分析+候选池+星级推荐
- price_monitor.py:持仓/自选高频价格监控
- refresh_mtf_cache.py:多周期K线缓存
- 策略评估/知识萃取管道

文档:docs/ 含完整需求+架构设计
注意:尚未配置 git remote,笑笑接手后自行配置
2026-06-20 12:04:21 +08:00

224 lines
7.6 KiB
Python

#!/usr/bin/env python3
"""advice_reconciliation.py — advice reconciliation script.

Intended to run once a week: compares the advice_timeline entries in
decisions.json with the actual holdings in portfolio.json and computes
accuracy statistics.

Usage:
    python3 advice_reconciliation.py          # normal reconciliation
    python3 advice_reconciliation.py --force  # re-evaluate every advice item
"""
import json
import sys
from datetime import datetime, timedelta
from pathlib import Path
# All JSON state files live in a "data" directory next to this script.
DATA_DIR = Path(__file__).parent.joinpath("data")
# Advice history; read by the reconciliation run and rewritten with evaluation marks.
DECISIONS_PATH = DATA_DIR.joinpath("decisions.json")
# Current holdings, used as the reference for judging advice.
PORTFOLIO_PATH = DATA_DIR.joinpath("portfolio.json")
# Output file for the accuracy statistics produced by each run.
ACCURACY_PATH = DATA_DIR.joinpath("accuracy_stats.json")
def load_json(path, default=None):
    """Read and parse the JSON file at *path*.

    Returns the parsed content. When the file does not exist or does not
    contain valid JSON, returns *default* instead, or a new empty dict when
    *default* is ``None``.
    """
    try:
        with open(path, encoding="utf-8") as handle:
            parsed = json.loads(handle.read())
    except (FileNotFoundError, json.JSONDecodeError):
        if default is None:
            return {}
        return default
    return parsed
def save_json(path, data):
    """Write *data* to *path* as indented UTF-8 JSON.

    Parent directories are created when missing. Non-ASCII characters are
    written as-is rather than escaped.

    Raises:
        TypeError / ValueError: if *data* cannot be serialized. In that case
            an existing file at *path* is left untouched.
    """
    # Serialize before opening: mode "w" truncates the target immediately, so
    # a serialization error raised mid-write would destroy the previous file.
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w", encoding="utf-8") as f:
        f.write(serialized)
def get_holding_change(portfolio, code):
    """Return the current holding for stock *code*, or ``None`` if not held.

    Args:
        portfolio: dict whose "holdings" value is a list of holding dicts.
        code: stock code to look up.

    Returns:
        A dict with the keys "code", "name", "shares", "cost", "price" and
        "position_pct" (missing fields default to "" / 0) for the first
        holding whose "code" equals *code*; ``None`` when there is no such
        holding (position closed).
    """
    # `or []` also covers an explicit `"holdings": null` in the JSON file.
    for holding in portfolio.get("holdings") or []:
        # Skip malformed entries instead of raising KeyError / AttributeError.
        if not isinstance(holding, dict) or "code" not in holding:
            continue
        if holding["code"] != code:
            continue
        return {
            "code": code,
            "name": holding.get("name", ""),
            "shares": holding.get("shares", 0),
            "cost": holding.get("cost", 0),
            "price": holding.get("price", 0),
            "position_pct": holding.get("position_pct", 0),
        }
    return None  # position closed
# Direction labels grouped by the kind of action they recommend.
_BUY_DIRECTIONS = ("买入", "加仓", "补仓")
_SELL_DIRECTIONS = ("卖出", "清仓", "减仓")
_HOLD_DIRECTIONS = ("持有", "观望")


def _positive_float(value):
    """Return *value* as a float when it is a positive number, else ``None``."""
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    return number if number > 0 else None


def evaluate_advice(advice, current_holding):
    """Judge one advice item against the current holding.

    Args:
        advice: dict with the optional keys "status" (defaults to "pending"),
            "direction" and "price" (the price recorded with the advice).
        current_holding: dict carrying the current "price" of the position,
            or a falsy value when the stock is no longer held.

    Returns:
        One of "correct", "wrong", "unknown", "pending" or "ignored".

        * "ignored" / "pending" mirror the advice status and are returned
          before any price comparison.
        * Stock no longer held: sell-type advice is "correct", buy-type
          advice is "wrong", anything else "unknown".
        * Stock still held: the current price is compared with the advised
          price using a 5% tolerance. A missing, non-numeric or non-positive
          price on either side gives "unknown", as does any direction that
          is not buy/sell/hold (including "自选").
    """
    status = advice.get("status", "pending")
    if status in ("ignored", "pending"):
        return status

    direction = advice.get("direction", "")

    if not current_holding:
        # Position fully closed.
        if direction in _SELL_DIRECTIONS:
            return "correct"
        if direction in _BUY_DIRECTIONS:
            return "wrong"
        return "unknown"

    if direction not in _BUY_DIRECTIONS + _SELL_DIRECTIONS + _HOLD_DIRECTIONS:
        # Watch-list ("自选") and unrecognised directions cannot be reconciled.
        return "unknown"

    advised_price = _positive_float(advice.get("price", 0))
    price = _positive_float(current_holding.get("price", 0))
    if advised_price is None or price is None:
        return "unknown"

    if direction in _BUY_DIRECTIONS:
        # Buy was right unless the price has since dropped more than 5%.
        ok = price >= advised_price * 0.95
    elif direction in _SELL_DIRECTIONS:
        # Sell was right unless the price has since risen more than 5%.
        ok = price <= advised_price * 1.05
    else:
        # Hold/watch was right as long as the price did not fall 5% or more.
        change = (price - advised_price) / advised_price * 100
        ok = change > -5
    return "correct" if ok else "wrong"
def run():
    """Reconcile recorded advice against the current portfolio and report accuracy.

    Steps:
      1. Load decisions.json, portfolio.json and the previous
         accuracy_stats.json (missing/invalid files fall back to defaults).
      2. Evaluate every advice item that has no final result yet. Items
         already evaluated are only counted, unless ``--force`` is present in
         ``sys.argv`` (then everything is re-evaluated). Items whose stored
         result is "pending" are always re-evaluated, because their status
         may have changed since the last run.
      3. Write the evaluation marks back to decisions.json.
      4. Write per-run and cumulative statistics to accuracy_stats.json and
         print a summary with up to 20 detail lines.

    The cumulative counters change only by the advice evaluated in this run:
    a previously scored result that is re-evaluated is removed before the new
    result is added, so repeated runs do not count the same advice twice.
    """
    force = "--force" in sys.argv
    decisions = load_json(DECISIONS_PATH, {"decisions": []})
    portfolio = load_json(PORTFOLIO_PATH, {"holdings": []})
    old_stats = load_json(ACCURACY_PATH, {})

    now = datetime.now()
    now_iso = now.isoformat()

    # Results that count towards accuracy.
    scored = ("correct", "wrong", "partial")
    total = {"correct": 0, "wrong": 0, "partial": 0, "unknown": 0, "pending": 0, "ignored": 0}
    # Net change this run contributes to the cumulative counters.
    cum_delta = {"correct": 0, "wrong": 0, "partial": 0}
    results = []

    for entry in decisions.get("decisions", []):
        code = entry["code"]
        name = entry.get("name", code)
        timeline = entry.get("advice_timeline", [])
        if not timeline:
            continue
        current_holding = get_holding_change(portfolio, code)
        for advice in timeline:
            # Result stored by an earlier run, or None if never evaluated.
            previous = advice.get("result", "unknown") if advice.get("evaluated") else None

            # Keep final results from earlier runs (unless --force). A stored
            # "pending" is not final and must be looked at again.
            if not force and previous is not None and previous != "pending":
                total[previous] = total.get(previous, 0) + 1
                continue

            result = evaluate_advice(advice, current_holding)
            advice["evaluated"] = True
            advice["result"] = result
            advice["evaluated_at"] = now_iso
            total[result] = total.get(result, 0) + 1

            # Replace, rather than add to, whatever this advice contributed
            # to the cumulative counters before.
            if previous in scored:
                cum_delta[previous] -= 1
            if result in scored:
                cum_delta[result] += 1

            results.append({
                "code": code,
                "name": name,
                "date": advice.get("date", ""),
                "direction": advice.get("direction", ""),
                "summary": advice.get("summary", ""),
                "result": result,
            })

    # Persist the evaluation marks.
    save_json(DECISIONS_PATH, decisions)

    # Accuracy over everything currently recorded in decisions.json.
    evaluated = total["correct"] + total["wrong"] + total["partial"]
    accuracy = round(total["correct"] / evaluated * 100, 1) if evaluated > 0 else 0

    old_cum = old_stats.get("cumulative", {})
    stats = {
        "updated_at": now_iso,
        "period_start": old_stats.get("period_start", (now - timedelta(days=7)).isoformat()),
        "period_end": now_iso,
        "total_advice": sum(total.values()),
        "correct": total["correct"],
        "wrong": total["wrong"],
        "partial": total["partial"],
        "unknown": total["unknown"],
        "pending": total["pending"],
        "ignored": total["ignored"],
        "evaluated": evaluated,
        "accuracy_pct": accuracy,
        "details": results,
        # Cumulative statistics; clamped at 0 in case older files hold
        # counters that do not match the stored per-advice results.
        "cumulative": {
            "total": max(0, old_cum.get("total", 0) + sum(cum_delta.values())),
            "correct": max(0, old_cum.get("correct", 0) + cum_delta["correct"]),
            "wrong": max(0, old_cum.get("wrong", 0) + cum_delta["wrong"]),
        },
    }
    cum = stats["cumulative"]
    cum_accuracy = round(cum["correct"] / cum["total"] * 100, 1) if cum["total"] > 0 else 0
    stats["cumulative_accuracy_pct"] = cum_accuracy
    save_json(ACCURACY_PATH, stats)

    # Summary output.
    print("📊 建议对账报告")
    print(f" 周期: {stats['period_start'][:10]} ~ {stats['period_end'][:10]}")
    print(f" 总建议: {stats['total_advice']}")
    print(f" ✅ 正确: {stats['correct']}")
    print(f" ❌ 错误: {stats['wrong']}")
    print(f" ⏳ 待确认: {stats['pending']}")
    print(f" ✗ 已忽略: {stats['ignored']}")
    print(f" ❓ 无法判断: {stats['unknown']}")
    print(f" 📈 本期准确率: {accuracy}%")
    print(f" 📈 累计准确率: {cum_accuracy}%")
    if results:
        print("\n 详情:")
        # Same markers as the summary lines above.
        icons = {
            "correct": "✅",
            "wrong": "❌",
            "partial": "🟡",
            "unknown": "❓",
            "pending": "⏳",
            "ignored": "✗",
        }
        for r in results[:20]:
            print(f" {icons.get(r['result'], '?')} {r['name']}({r['code']}) {r['direction']} → {r['result']}")
# Script entry point: perform the reconciliation only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    run()