aa0f740381
完整数据采集+分析管道: - market_watch.py:90行业板块采集(同花顺/东方财富) - 市场精选推荐 cron:全市场分析+候选池+星级推荐 - price_monitor.py:持仓/自选高频价格监控 - refresh_mtf_cache.py:多周期K线缓存 - 策略评估/知识萃取管道 文档:docs/ 含完整需求+架构设计 注意:尚未配置 git remote,笑笑接手后自行配置
224 lines
7.6 KiB
Python
224 lines
7.6 KiB
Python
#!/usr/bin/env python3
|
|
"""advice_reconciliation.py — 建议对账脚本
|
|
|
|
每周运行一次,对比 decisions.json 的 advice_timeline 与 portfolio.json
|
|
的实际持仓变化,统计准确率。
|
|
|
|
用法:
|
|
python3 advice_reconciliation.py # 正常对账
|
|
python3 advice_reconciliation.py --force # 强制重新对账所有建议
|
|
"""
|
|
import json
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
# Every state file lives in a "data" directory next to this script.
DATA_DIR = Path(__file__).parent.joinpath("data")
# Advice records (one "advice_timeline" per tracked stock).
DECISIONS_PATH = DATA_DIR.joinpath("decisions.json")
# Current holdings used as the reference for reconciliation.
PORTFOLIO_PATH = DATA_DIR.joinpath("portfolio.json")
# Accuracy statistics written by run().
ACCURACY_PATH = DATA_DIR.joinpath("accuracy_stats.json")
|
|
|
|
def load_json(path, default=None):
    """Read and parse the JSON file at ``path``.

    Args:
        path: Location of the JSON file.
        default: Value returned when the file is missing or is not valid
            JSON. ``None`` means "use an empty dict".

    Returns:
        The parsed JSON content, or the fallback value described above.
    """
    fallback = {} if default is None else default
    try:
        with open(path, encoding="utf-8") as handle:
            payload = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError):
        # A missing or unparseable file is treated as "no data yet".
        return fallback
    return payload
|
|
|
|
def save_json(path, data):
    """Serialize ``data`` as pretty-printed UTF-8 JSON at ``path``.

    Missing parent directories are created first. Non-ASCII characters are
    written as-is rather than as ``\\uXXXX`` escapes.

    Args:
        path: Destination file path.
        data: JSON-serializable object to write.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
|
|
|
|
def get_holding_change(portfolio, code):
    """Return a snapshot of the current holding for stock ``code``.

    Args:
        portfolio: Dict whose ``"holdings"`` list contains one dict per
            position; each holding is looked up by its ``"code"`` key.
        code: Stock code to search for.

    Returns:
        A dict with ``code``, ``name``, ``shares``, ``cost``, ``price`` and
        ``position_pct`` (``name`` defaults to ``""``, the rest to ``0``), or
        ``None`` when no holding matches, i.e. the position has been cleared.
    """
    position = next(
        (item for item in portfolio.get("holdings", []) if item["code"] == code),
        None,
    )
    if position is None:
        return None  # position already cleared

    snapshot = {"code": code, "name": position.get("name", "")}
    for field in ("shares", "cost", "price", "position_pct"):
        snapshot[field] = position.get(field, 0)
    return snapshot
|
|
|
|
# Direction labels grouped by the kind of action they recommend.
_BUY_DIRECTIONS = ("买入", "加仓", "补仓")
_SELL_DIRECTIONS = ("卖出", "清仓", "减仓")
_HOLD_DIRECTIONS = ("持有", "观望")

# For each direction group: predicate(advised_price, current_price) -> bool,
# true when the advice is considered correct.
_PRICE_RULES = (
    # Buy-type advice: current price has not fallen more than 5% below the advised price.
    (_BUY_DIRECTIONS, lambda advised, current: current >= advised * 0.95),
    # Sell-type advice: current price has not risen more than 5% above the advised price.
    (_SELL_DIRECTIONS, lambda advised, current: current <= advised * 1.05),
    # Hold/watch advice: price has not dropped by 5% or more since the advice.
    (_HOLD_DIRECTIONS, lambda advised, current: (current - advised) / advised * 100 > -5),
)


def _judge_by_price(advice, current_holding, is_correct):
    """Apply ``is_correct`` to the advised and current prices.

    Returns ``"correct"`` or ``"wrong"``, or ``"unknown"`` when either price is
    missing, non-positive, or cannot be interpreted as a number.
    """
    price = current_holding.get("price", 0)
    try:
        advised_price = float(advice.get("price", 0))
        if not (advised_price > 0 and price > 0):
            return "unknown"
        return "correct" if is_correct(advised_price, price) else "wrong"
    except (TypeError, ValueError, ArithmeticError):
        # Unparseable or incomparable price data: the advice cannot be judged.
        return "unknown"


def evaluate_advice(advice, current_holding):
    """Judge whether a single piece of advice turned out to be right.

    Args:
        advice: Advice dict; reads ``direction``, ``status`` (defaults to
            ``"pending"``) and ``price`` (the price at the time of the advice).
        current_holding: Current holding dict (reads ``price``), or a falsy
            value when the position has been cleared.

    Returns:
        One of ``'correct'``, ``'wrong'``, ``'pending'``, ``'ignored'`` or
        ``'unknown'``.

        * ``'ignored'`` / ``'pending'`` mirror the advice status and skip any
          price check.
        * With no current holding, sell-type advice is ``'correct'``, buy-type
          advice is ``'wrong'`` and anything else is ``'unknown'``.
        * Otherwise the advised price is compared with the current price using
          a 5% tolerance; missing or invalid prices yield ``'unknown'``.
        * The ``"自选"`` (watchlist) direction and unrecognized directions
          cannot be reconciled and yield ``'unknown'``.
    """
    direction = advice.get("direction", "")
    status = advice.get("status", "pending")

    if status == "ignored":
        return "ignored"
    if status == "pending":
        return "pending"

    if not current_holding:
        # Position already cleared.
        if direction in _SELL_DIRECTIONS:
            return "correct"
        if direction in _BUY_DIRECTIONS:
            return "wrong"
        return "unknown"

    for directions, is_correct in _PRICE_RULES:
        if direction in directions:
            return _judge_by_price(advice, current_holding, is_correct)

    # Watchlist advice and unrecognized directions cannot be reconciled by price.
    return "unknown"
|
|
|
|
|
|
def run():
    """Reconcile recorded advice against the current portfolio and report accuracy.

    Loads ``decisions.json`` and ``portfolio.json``, evaluates every entry of
    each ``advice_timeline`` with ``evaluate_advice``, stamps the outcome onto
    the advice (``evaluated``, ``result``, ``evaluated_at``), writes the
    statistics to ``accuracy_stats.json`` and prints a summary.

    Advice that already carries a non-pending result is only counted, not
    re-evaluated, unless ``--force`` is present in ``sys.argv``. Advice whose
    result is still ``"pending"`` is re-checked on every run so that it gets
    reconciled once its status changes.

    The ``cumulative`` counters are adjusted only by what this run actually
    changed: newly produced results are added, and the previous result of any
    re-evaluated advice is retracted first, so repeated runs do not count the
    same advice more than once.
    """
    force = "--force" in sys.argv

    decisions = load_json(DECISIONS_PATH, {"decisions": []})
    portfolio = load_json(PORTFOLIO_PATH, {"holdings": []})
    old_stats = load_json(ACCURACY_PATH, {})

    results = []
    total = {"correct": 0, "wrong": 0, "partial": 0, "unknown": 0, "pending": 0, "ignored": 0}
    # Net change this run makes to the cumulative (judged-advice) counters.
    delta = {"correct": 0, "wrong": 0, "partial": 0}

    for entry in decisions.get("decisions", []):
        code = entry["code"]
        name = entry.get("name", code)
        timeline = entry.get("advice_timeline", [])

        if not timeline:
            continue

        current_holding = get_holding_change(portfolio, code)

        for advice in timeline:
            already_evaluated = bool(advice.get("evaluated"))
            previous = advice.get("result", "unknown") if already_evaluated else None

            # Skip advice that already has a final result (unless --force);
            # a stored "pending" result is not final and is checked again.
            if not force and already_evaluated and previous != "pending":
                total[previous] = total.get(previous, 0) + 1
                continue

            result = evaluate_advice(advice, current_holding)
            advice["evaluated"] = result != "pending"
            advice["result"] = result
            advice["evaluated_at"] = datetime.now().isoformat()
            total[result] = total.get(result, 0) + 1

            # Retract the old contribution of re-evaluated advice, then add the new one.
            if previous in delta:
                delta[previous] -= 1
            if result in delta:
                delta[result] += 1

            results.append({
                "code": code,
                "name": name,
                "date": advice.get("date", ""),
                "direction": advice.get("direction", ""),
                "summary": advice.get("summary", ""),
                "result": result,
            })

    # Persist the evaluation marks only when something was actually evaluated;
    # this avoids replacing a missing or unparseable decisions.json with the
    # empty default.
    if results:
        save_json(DECISIONS_PATH, decisions)

    # Accuracy over all advice that received a definite verdict.
    evaluated = total["correct"] + total["wrong"] + total["partial"]
    accuracy = round(total["correct"] / evaluated * 100, 1) if evaluated > 0 else 0

    now = datetime.now()
    old_cumulative = old_stats.get("cumulative", {})

    stats = {
        "updated_at": now.isoformat(),
        "period_start": old_stats.get("period_start", (now - timedelta(days=7)).isoformat()),
        "period_end": now.isoformat(),
        "total_advice": sum(total.values()),
        "correct": total["correct"],
        "wrong": total["wrong"],
        "partial": total["partial"],
        "unknown": total["unknown"],
        "pending": total["pending"],
        "ignored": total["ignored"],
        "evaluated": evaluated,
        "accuracy_pct": accuracy,
        "details": results,
        # Running totals across runs; clamped so a retraction never goes negative.
        "cumulative": {
            "total": max(0, old_cumulative.get("total", 0) + sum(delta.values())),
            "correct": max(0, old_cumulative.get("correct", 0) + delta["correct"]),
            "wrong": max(0, old_cumulative.get("wrong", 0) + delta["wrong"]),
        },
    }

    cum = stats["cumulative"]
    cum_accuracy = round(cum["correct"] / cum["total"] * 100, 1) if cum["total"] > 0 else 0
    stats["cumulative_accuracy_pct"] = cum_accuracy

    save_json(ACCURACY_PATH, stats)

    # Print the summary.
    print("📊 建议对账报告")
    print(f" 周期: {stats['period_start'][:10]} ~ {stats['period_end'][:10]}")
    print(f" 总建议: {stats['total_advice']}")
    print(f" ✅ 正确: {stats['correct']}")
    print(f" ❌ 错误: {stats['wrong']}")
    print(f" ⏳ 待确认: {stats['pending']}")
    print(f" ✗ 已忽略: {stats['ignored']}")
    print(f" ❓ 无法判断: {stats['unknown']}")
    print(f" 📈 本期准确率: {accuracy}%")
    print(f" 📈 累计准确率: {cum_accuracy}%")

    if results:
        icon = {"correct": "✅", "wrong": "❌", "partial": "🟡", "unknown": "❓", "pending": "⏳", "ignored": "✗"}
        print("\n 详情:")
        # Only the first 20 newly evaluated items are listed.
        for r in results[:20]:
            print(f" {icon.get(r['result'], '?')} {r['name']}({r['code']}) {r['direction']} → {r['result']}")
|
|
|
|
|
|
# Script entry point: perform one reconciliation pass when executed directly.
if __name__ == "__main__":
    run()
|