#!/usr/bin/env python3
"""
Strategy Staleness Checker v1
=============================
Scans every active strategy and checks three dimensions:
  1. Age: warn when a strategy has not been updated for 14 / 21 days.
  2. Price divergence: warn when the current price is more than 30% away
     from the centre of the buy zone.
  3. Buy-zone invalidation: the buy zone no longer overlaps the current
     price range at all.

Produces two outputs: a JSON report (for the system) and a human-readable
summary on stdout (for cron).
"""

import http.client
import json
import os
import re
import sys
import urllib.request
from datetime import datetime

DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
OUTPUT_PATH = "/home/hmo/web-dashboard/data/strategy_staleness_report.json"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"

# Fallback: if new path not found, use old path
FALLBACK_PATH = "/home/hmo/data/decisions.json"

WARN_DAYS = 14        # not updated for 14+ days -> warning
CRITICAL_DAYS = 21    # not updated for 21+ days -> critical warning
DIVERGENCE_WARN = 30  # 30%+ away from the buy-zone centre -> warning
DIVERGENCE_CRIT = 50  # 50%+ away from the buy-zone centre -> critical

# Substring of the entry "type" field that marks a watchlist entry.
WATCHLIST_MARKER = "自选"
# timing_signal keywords that mark an in-zone watchlist entry as stale.
STALE_SIGNAL_KEYWORDS = ("弱势持有", "等企稳")
# stock_category values counted as "weak" in the portfolio statistics.
WEAK_CATEGORIES = ("弱势", "深套")

# Compiled once: "买入" followed (lazily) by "<low> ~ <high>" or "<low> - <high>".
_BUY_ZONE_RE = re.compile(r'买入.*?(\d+\.?\d*)\s*[~\-]\s*(\d+\.?\d*)')

# Fallback layouts tried on the first 19 characters of a timestamp.
_TIMESTAMP_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S")


def to_float(value):
    """Return *value* as a float, or None when it cannot be converted.

    None and booleans are rejected; numeric strings such as "12.5" are
    accepted so that loosely typed JSON does not abort the whole run.
    """
    if value is None or isinstance(value, bool):
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _number_or_zero(value):
    """Return *value* unchanged when it is a real number, else parse it or use 0.

    Genuine ints/floats keep their type so they are printed exactly as they
    were stored (e.g. ``75`` stays ``75`` rather than ``75.0``).
    """
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return value
    parsed = to_float(value)
    return 0 if parsed is None else parsed


def market_prefix(code):
    """Return the exchange prefix used in the quote URL for *code*.

    Codes starting with "6" map to "sh"; codes starting with "0" or "3" map
    to "sz"; anything else yields an empty prefix.
    """
    if code.startswith("6"):
        return "sh"
    if code.startswith(("0", "3")):
        return "sz"
    return ""


def parse_quote(raw):
    """Extract ``(price, chg)`` from a "~"-separated quote response.

    Field 3 is read as the price and field 32 as the change figure; an empty
    or absent change field is reported as 0.0.

    Returns ``(None, None)`` when the response has too few fields, a field is
    not numeric, or the price is not positive.
    """
    parts = raw.split("~")
    if len(parts) <= 3:
        return None, None
    try:
        price = float(parts[3]) if parts[3] else 0.0
        chg = float(parts[32]) if len(parts) > 32 and parts[32] else 0.0
    except ValueError:
        return None, None
    # The validity test must cover the whole pair: a bare
    # ``return price, chg if ... else (None, None)`` only guards ``chg``.
    if price > 0:
        return price, chg
    return None, None


def get_price(code):
    """Fetch the current price of *code* from the Tencent quote API.

    Args:
        code: Stock code; converted to ``str`` before use.

    Returns:
        ``(price, chg)`` on success, otherwise ``(None, None)``. Network,
        decoding and parsing failures are treated as "no price available"
        (best effort) and never raise.
    """
    code = str(code or "")
    url = f"http://qt.gtimg.cn/q={market_prefix(code)}{code}"
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "curl/7.81"})
        with urllib.request.urlopen(req, timeout=5) as resp:
            raw = resp.read().decode("gbk")
    except (OSError, ValueError, http.client.HTTPException):
        # OSError covers URLError/timeouts, ValueError covers decode errors.
        return None, None
    return parse_quote(raw)


def parse_buy_zone(current):
    """Extract the buy-zone lower and upper bound from a strategy text.

    Args:
        current: Free-form strategy text (the entry's action/current field).

    Returns:
        ``(low, high)`` as floats, or ``(None, None)`` when the text is
        empty, not a string, or contains no "买入 ... low~high" pattern.
    """
    if not current or not isinstance(current, str):
        return None, None
    m = _BUY_ZONE_RE.search(current)
    if m:
        return float(m.group(1)), float(m.group(2))
    return None, None


def parse_timestamp(ts):
    """Parse a stored timestamp into a naive local ``datetime``.

    Accepts ISO-8601 strings (with "T" or space separator, optional UTC
    offset or trailing "Z") and falls back to the first 19 characters in
    "YYYY-MM-DD HH:MM:SS" layout. Offset-aware values are converted to local
    time so they can be subtracted from ``datetime.now()``.

    Returns None when *ts* is empty or cannot be parsed.
    """
    if not ts:
        return None
    text = str(ts).strip()
    iso_text = text[:-1] + "+00:00" if text.endswith(("Z", "z")) else text
    try:
        parsed = datetime.fromisoformat(iso_text)
    except ValueError:
        parsed = None
        for fmt in _TIMESTAMP_FORMATS:
            try:
                parsed = datetime.strptime(text[:19], fmt)
                break
            except ValueError:
                continue
    if parsed is None:
        return None
    if parsed.tzinfo is not None:
        # Mixing aware and naive datetimes raises TypeError on subtraction.
        parsed = parsed.astimezone().replace(tzinfo=None)
    return parsed


def _age_flags(age, is_watchlist):
    """Return the time-based flags for an entry that is *age* days old.

    Watchlist entries get twice the warning window and never escalate to the
    critical message.
    """
    if is_watchlist:
        if age >= WARN_DAYS * 2:
            return [f"策略已{age}天未更新(自选)"]
        return []
    if age >= CRITICAL_DAYS:
        return [f"严重过期: {age}天未更新"]
    if age >= WARN_DAYS:
        return [f"策略已{age}天未更新"]
    return []


def _watchlist_flags(entry, price, entry_low, entry_high):
    """Return price-related flags for a watchlist entry."""
    flags = []
    if entry_low <= price <= entry_high:
        # Price sits inside the buy zone: check that the strategy is still
        # actionable before reporting it as a candidate.
        timing_signal = str(entry.get("timing_signal") or "")
        rr_ratio = _number_or_zero(entry.get("rr_ratio"))
        tp = _number_or_zero(entry.get("take_profit"))

        # Rule 1: timing_signal contains a weak-signal keyword.
        matched = [kw for kw in STALE_SIGNAL_KEYWORDS if kw in timing_signal]
        if matched:
            flags.append("[STRATEGY_STALE] 信号不良(timing_signal含" + str(matched) + ")")
        # Rule 2: risk/reward below 1.5, or no take-profit set.
        if tp == 0 or 0 < rr_ratio < 1.5:
            flags.append("[STRATEGY_STALE] 盈亏比不足(RR=" + str(rr_ratio) + ")或无止盈")

        flags.append(
            f"现价{price:.2f}在买入区{entry_low:.0f}~{entry_high:.0f}(是否可买需结合timing_signal判断)"
        )
    elif price > entry_high * 1.3:
        flags.append(f"现价{price:.2f}远高于买入区{entry_low:.0f}~{entry_high:.0f},需重评")
    elif price < entry_low * 0.8:
        flags.append(f"现价{price:.2f}远低于买入区{entry_low:.0f}~{entry_high:.0f},买入区需下移")
    return flags


def _holding_flags(price, entry_low, entry_high):
    """Return price-related flags for a held position."""
    flags = []
    zone_center = (entry_low + entry_high) / 2
    if zone_center > 0:
        divergence = abs(price - zone_center) / zone_center * 100
        if divergence >= DIVERGENCE_CRIT:
            flags.append(f"价格距买入区中心{divergence:.0f}% 已严重偏离")
        elif divergence >= DIVERGENCE_WARN:
            flags.append(f"价格距买入区中心{divergence:.0f}% 需关注")

    # Price entirely outside the zone.
    # NOTE(review): the source layout was collapsed; this check is assumed to
    # apply to held positions only (watchlist entries have their own
    # 1.3x / 0.8x check) -- confirm against the original file.
    if price > entry_high * 1.5:
        flags.append(f"现价{price:.0f}远高于买入区{entry_low:.0f}~{entry_high:.0f}")
    elif price < entry_low * 0.5:
        flags.append(f"现价{price:.0f}远低于买入区{entry_low:.0f}~{entry_high:.0f}")
    return flags


def _is_watchlist(entry):
    """Return True when the entry's "type" field contains the watchlist marker."""
    return WATCHLIST_MARKER in str(entry.get("type") or "")


def evaluate_entry(entry, now):
    """Check one active strategy entry and build its report record.

    Args:
        entry: One decision dict from the decisions file.
        now: Reference time (naive, local) used to compute the age.

    Returns:
        The record to store under "flagged" in the report, or None when the
        entry raised no flags. The price is fetched from the quote API only
        when the entry carries no usable local price.
    """
    code = entry.get("code", "")
    name = entry.get("name", "") or code
    is_watchlist = _is_watchlist(entry)

    # Age tracking: prefer created_at, fall back to timestamp / updated_at.
    # NOTE(review): age is measured from created_at even when updated_at is
    # newer -- confirm that this is the intended meaning of "not updated".
    stamp = parse_timestamp(
        entry.get("created_at") or entry.get("timestamp") or entry.get("updated_at")
    )
    age = (now - stamp).days if stamp is not None else 0

    # Price: prefer local data, fall back to the API.
    price = to_float(entry.get("price"))
    if not price:
        price, _ = get_price(code)

    # Strategy text, used both for display and as the buy-zone fallback.
    action_text = entry.get("action", "") or entry.get("current", "")
    last_update = entry.get("updated_at", "") or entry.get("timestamp", "")

    # Entry zone: structured fields first, then parse the strategy text.
    entry_low = to_float(entry.get("entry_low"))
    entry_high = to_float(entry.get("entry_high"))
    if not entry_low or not entry_high:
        entry_low, entry_high = parse_buy_zone(action_text)

    flags = _age_flags(age, is_watchlist)
    if price and entry_low and entry_high:
        if is_watchlist:
            flags.extend(_watchlist_flags(entry, price, entry_low, entry_high))
        else:
            flags.extend(_holding_flags(price, entry_low, entry_high))

    if not flags:
        return None
    return {
        "code": code,
        "name": name,
        "price": round(price, 2) if price else None,
        "flags": flags,
        "age_days": age,
        "last_update": last_update,
        "entry_zone": f"{entry_low:.0f}~{entry_high:.0f}" if entry_low else "无买入区",
        "current": action_text,
        "updated_by": entry.get("source", "auto"),
        "updated_reason": "自动生成",
        "is_watchlist": is_watchlist,
    }


def _load_portfolio(path):
    """Read ``(position_pct, cash)`` from the portfolio file; (0, 0) on failure."""
    try:
        with open(path, encoding="utf-8") as pf:
            pdata = json.load(pf)
    except (OSError, ValueError):
        # Missing, unreadable or malformed file: the portfolio is optional.
        return 0, 0
    if not isinstance(pdata, dict):
        return 0, 0
    return _number_or_zero(pdata.get("position_pct")), _number_or_zero(pdata.get("cash"))


def portfolio_signals(weak_pct, position_pct, cash):
    """Return the portfolio-level warning lines for the given figures."""
    signals = []
    if weak_pct >= 40:
        signals.append(f"[PORTFOLIO_WEAK] 组合中弱势+深套分类持仓占比{weak_pct}%>40%,建议系统性减仓")
    elif weak_pct >= 30:
        signals.append(f"[PORTFOLIO_WEAK_MILD] 组合弱势占比{weak_pct}%,需关注")
    if position_pct > 80:
        signals.append(f"[PORTFOLIO_FULL] 总仓位{position_pct}%(现金{cash:.0f}元),买入建议受限")
    return signals


def _percentage(part, total):
    """Return ``part / total`` as a percentage rounded to one decimal (0 if empty)."""
    return round(part / total * 100, 1) if total > 0 else 0


def _print_summary(now, active_count, flagged, signals, weak_pct, position_pct):
    """Print the human-readable summary consumed by cron."""
    now_str = now.strftime("%m/%d %H:%M")
    print(f"【策略重估检查】{now_str}")
    print(f"活跃策略{active_count}个 | 需关注{len(flagged)}个")

    # Portfolio-level signals first (if any).
    for signal in signals:
        print(signal)

    if flagged:
        print("")
        for s in flagged:
            price_str = f"¥{s['price']}" if s['price'] else "N/A"
            ttag = "[自选]" if s.get('is_watchlist') else ""
            print(f"{ttag}[{s['code']}] {s['name']} {price_str}")
            print(f" 上次更新{s['age_days']}天前 | 区间{s['entry_zone']}")
            for flag in s['flags']:
                print(f" ⚠ {flag}")
            print(f" 策略: {s['current']}")
            # NOTE(review): assumed to be a blank separator after each entry;
            # the collapsed source does not show the nesting.
            print("")

    print(f"—END—{len(flagged)}个需关注 | 弱势持仓占比{weak_pct}% | 仓位{position_pct}%")


def main():
    """Run the staleness check, write the JSON report and print the summary.

    Returns:
        The number of flagged strategies.

    Raises:
        OSError / ValueError (and similar) when the decisions file cannot be
        read or parsed; the ``__main__`` guard turns these into exit code 2.
    """
    # Try new path first, fall back to the old one.
    path = DECISIONS_PATH if os.path.exists(DECISIONS_PATH) else FALLBACK_PATH
    # Explicit UTF-8: the data contains Chinese text and cron may run under
    # a non-UTF-8 locale.
    with open(path, encoding="utf-8") as f:
        data = json.load(f)

    # Filter: exclude closed strategies.
    all_entries = data.get("decisions", []) or []
    active = [
        e for e in all_entries
        if isinstance(e, dict) and e.get("status", "") != "closed"
    ]

    now = datetime.now()
    flagged = []
    for entry in active:
        record = evaluate_entry(entry, now)
        if record is not None:
            flagged.append(record)

    # Portfolio-level analysis: share of weak / deep-loss categories.
    holdings = [e for e in active if not _is_watchlist(e)]
    weak_count = sum(1 for e in holdings if e.get("stock_category") in WEAK_CATEGORIES)
    all_weak_count = sum(1 for e in active if e.get("stock_category") in WEAK_CATEGORIES)
    weak_pct = _percentage(weak_count, len(holdings))
    all_weak_pct = _percentage(all_weak_count, len(active))

    position_pct, cash = _load_portfolio(PORTFOLIO_PATH)
    portfolio_flags = portfolio_signals(weak_pct, position_pct, cash)

    # Write report.
    os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)
    report = {
        "checked_at": now.strftime("%Y-%m-%dT%H:%M:%S"),
        "total_active": len(active),
        "flagged_count": len(flagged),
        "flagged": flagged,
        "portfolio": {
            "position_pct": position_pct,
            "cash": round(cash, 2),
            "weak_position_pct": weak_pct,
            "all_weak_pct": all_weak_pct,
            "signals": portfolio_flags
        },
        "summary": f"扫描{len(active)}个策略,{len(flagged)}个需关注"
    }
    with open(OUTPUT_PATH, 'w', encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    _print_summary(now, len(active), flagged, portfolio_flags, weak_pct, position_pct)
    return len(flagged)


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"ERROR: {e}")
        sys.exit(2)
    # Always exit 0 on success -- flagged items are in the JSON report and on
    # stdout; exit code 1 creates misleading "Script Error" alerts.
    sys.exit(0)