e8653acd37
- 11个不一致cron脚本同步(MoFin→profile) - 4个游离代码收入MoFin(300308_monitor/monitor_300308/refresh_mtf_cache/strategy-staleness-check) - 6个关键依赖部署到profile(mo_models/mo_config/mo_provider/hk_rate/data_governance/data_validate)
257 lines
11 KiB
Python
257 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
策略时效性检查 - Strategy Staleness Checker v1
|
|
==============================================
|
|
扫描所有活跃策略,检查三个维度:
|
|
1. 时间老化:超过14/21天未更新预警
|
|
2. 价格偏离:当前价偏离买入区中心 >30% 预警
|
|
3. 买入区失效:买入区完全脱离当前价格区间
|
|
|
|
输出两份:JSON报告(给系统) + 人类可读摘要(stdout for cron)
|
|
"""
|
|
|
|
import json, sys, os, re, urllib.request
|
|
from datetime import datetime
|
|
|
|
# Input and output files of the checker.
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
OUTPUT_PATH = "/home/hmo/web-dashboard/data/strategy_staleness_report.json"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"

# Older location of the decisions file; used when DECISIONS_PATH does not exist.
FALLBACK_PATH = "/home/hmo/data/decisions.json"

# Age thresholds in days: warning first, then critical.
WARN_DAYS, CRITICAL_DAYS = 14, 21

# Price-divergence thresholds in percent: warning first, then critical.
DIVERGENCE_WARN, DIVERGENCE_CRIT = 30, 50
|
|
|
|
def get_price(code):
    """Fetch the latest quote for a stock code from the Tencent quote endpoint.

    The exchange prefix is chosen from the first digit of ``code``: ``6`` maps
    to ``sh``; ``0`` and ``3`` map to ``sz``; any other code is sent without a
    prefix.

    Args:
        code: Stock code as a string, e.g. ``"600000"``.

    Returns:
        ``(price, chg)`` as floats when the response carries a positive price,
        otherwise ``(None, None)``. ``chg`` is field 32 of the ``~``-separated
        response (presumably the percent change -- confirm against the API).
        Empty or non-string codes, network failures and malformed responses
        all yield ``(None, None)``; nothing is raised for them.
    """
    if not isinstance(code, str) or not code:
        # Nothing to look up; avoid a pointless network round trip.
        return None, None

    if code.startswith("6"):
        market = "sh"
    elif code.startswith(("0", "3")):
        market = "sz"
    else:
        market = ""

    url = f"http://qt.gtimg.cn/q={market}{code}"
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "curl/7.81"})
        with urllib.request.urlopen(req, timeout=5) as resp:
            raw = resp.read().decode("gbk")
        parts = raw.split("~")
        if len(parts) <= 3:
            return None, None
        price = float(parts[3]) if parts[3] else 0.0
        chg = float(parts[32]) if parts[32] else 0.0
    except Exception:
        # Best-effort lookup: any network, decoding or parsing problem means
        # "no quote available". Unlike a bare ``except``, this still lets
        # KeyboardInterrupt and SystemExit propagate.
        return None, None

    # Explicit branch instead of ``return price, chg if ... else (None, None)``,
    # which binds the conditional to ``chg`` only and returns a nested tuple.
    if price > 0:
        return price, chg
    return None, None
|
|
|
|
# Matches "买入 ... <number> <separator> <number>". Besides ASCII "~" and "-",
# the separator class accepts the full-width tilde (U+FF5E), en dash (U+2013),
# em dash (U+2014) and full-width hyphen (U+FF0D).
_BUY_ZONE_RE = re.compile(
    r'买入.*?(\d+\.?\d*)\s*[~\-\uFF5E\u2013\u2014\uFF0D]\s*(\d+\.?\d*)'
)


def parse_buy_zone(current):
    """Extract the buy-zone bounds from a strategy's free-text description.

    Looks for the first ``买入 ... A~B`` / ``A-B`` style range in ``current``.

    Args:
        current: Strategy text; ``None``, empty and non-string values are
            treated as "no zone".

    Returns:
        ``(low, high)`` as floats with ``low <= high`` (a range written
        high-to-low is reordered), or ``(None, None)`` when no range is found.
    """
    if not isinstance(current, str) or not current:
        return None, None
    match = _BUY_ZONE_RE.search(current)
    if match is None:
        return None, None
    first, second = (float(text) for text in match.groups())
    return (first, second) if first <= second else (second, first)
|
|
|
|
def _to_float(value):
    """Coerce a JSON scalar to float; return None when missing or non-numeric."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _is_watchlist(entry):
    """Return True when the entry's ``type`` marks it as a watchlist item."""
    # ``or ""`` guards against an explicit null ``type`` in the JSON.
    return "自选" in (entry.get("type") or "")


def _parse_timestamp(ts):
    """Parse a decision timestamp into a naive local datetime, or None."""
    if not ts:
        return None
    text = str(ts).strip()
    if text[-1:] in ("Z", "z"):
        # ``fromisoformat`` only accepts a trailing "Z" on newer Pythons.
        text = text[:-1] + "+00:00"
    parsed = None
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        # Fall back to the first 19 characters ("YYYY-MM-DD HH:MM:SS").
        for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
            try:
                parsed = datetime.strptime(text[:19], fmt)
                break
            except ValueError:
                continue
    if parsed is None:
        return None
    if parsed.tzinfo is not None:
        # Convert to naive local time so it can be subtracted from
        # ``datetime.now()``; mixing aware and naive values raises TypeError.
        try:
            parsed = parsed.astimezone().replace(tzinfo=None)
        except (OverflowError, OSError):
            return None
    return parsed


def _age_in_days(ts, now):
    """Return whole days from ``ts`` to ``now``; 0 when ``ts`` cannot be parsed."""
    parsed = _parse_timestamp(ts)
    return (now - parsed).days if parsed is not None else 0


def _age_flags(age, is_watchlist):
    """Build the time-based staleness flags for one strategy."""
    if is_watchlist:
        # Watchlist entries get twice the warning window and no critical level.
        if age >= WARN_DAYS * 2:
            return [f"策略已{age}天未更新(自选)"]
        return []
    if age >= CRITICAL_DAYS:
        return [f"严重过期: {age}天未更新"]
    if age >= WARN_DAYS:
        return [f"策略已{age}天未更新"]
    return []


def _zone_flags(entry, price, entry_low, entry_high, is_watchlist):
    """Build the price-versus-buy-zone flags for one strategy."""
    flags = []
    if not (price and entry_low and entry_high):
        return flags

    if is_watchlist:
        if entry_low <= price <= entry_high:
            # Price sits inside the buy zone: check whether the stored
            # strategy is still good enough to act on.
            timing_signal = entry.get("timing_signal", "") or ""
            rr_ratio = entry.get("rr_ratio", 0) or 0
            tp = entry.get("take_profit", 0) or 0
            # Rule 1: weak timing-signal keywords.
            hits = [kw for kw in ["弱势持有", "等企稳"] if kw in timing_signal]
            if hits:
                flags.append("[STRATEGY_STALE] 信号不良(timing_signal含" + str(hits) + ")")
            # Rule 2: no take-profit, or a reward/risk ratio below 1.5.
            if tp == 0 or (0 < rr_ratio < 1.5):
                flags.append("[STRATEGY_STALE] 盈亏比不足(RR=" + str(rr_ratio) + ")或无止盈")
            flags.append(f"现价{price:.2f}在买入区{entry_low:.0f}~{entry_high:.0f}(是否可买需结合timing_signal判断)")
        elif price > entry_high * 1.3:
            flags.append(f"现价{price:.2f}远高于买入区{entry_low:.0f}~{entry_high:.0f},需重评")
        elif price < entry_low * 0.8:
            flags.append(f"现价{price:.2f}远低于买入区{entry_low:.0f}~{entry_high:.0f},买入区需下移")
        return flags

    # Holdings: distance of the price from the centre of the buy zone.
    zone_center = (entry_low + entry_high) / 2
    if zone_center > 0:
        divergence = abs(price - zone_center) / zone_center * 100
        if divergence >= DIVERGENCE_CRIT:
            flags.append(f"价格距买入区中心{divergence:.0f}% 已严重偏离")
        elif divergence >= DIVERGENCE_WARN:
            flags.append(f"价格距买入区中心{divergence:.0f}% 需关注")

    # NOTE(review): the reviewed copy had lost its indentation; this check is
    # assumed to belong to the holdings branch only (watchlist entries already
    # have their own far-above / far-below messages) -- confirm.
    if price > entry_high * 1.5:
        flags.append(f"现价{price:.0f}远高于买入区{entry_low:.0f}~{entry_high:.0f}")
    elif price < entry_low * 0.5:
        flags.append(f"现价{price:.0f}远低于买入区{entry_low:.0f}~{entry_high:.0f}")
    return flags


def main():
    """Scan active strategies, write the JSON report and print a summary.

    Reads the decisions file (``DECISIONS_PATH``, or ``FALLBACK_PATH`` when the
    former does not exist), skips entries whose ``status`` is ``"closed"`` and
    flags the rest by age and by price position relative to the buy zone.
    Portfolio-level signals are derived from ``stock_category`` counts and from
    ``PORTFOLIO_PATH``. The report is written to ``OUTPUT_PATH`` and a
    human-readable summary is printed to stdout.

    Returns:
        The number of flagged strategies.

    Raises:
        OSError, ValueError: if the decisions file cannot be read or parsed,
            or the report cannot be written.
    """
    path = DECISIONS_PATH if os.path.exists(DECISIONS_PATH) else FALLBACK_PATH

    # Explicit UTF-8: the data is Chinese text and cron locales vary.
    with open(path, encoding="utf-8") as decisions_file:
        data = json.load(decisions_file)

    all_entries = data.get("decisions", [])
    active = [e for e in all_entries if e.get("status", "") != "closed"]
    flagged = []
    now = datetime.now()

    for entry in active:
        code = entry.get("code", "")
        name = entry.get("name", "") or code
        is_watchlist = _is_watchlist(entry)

        # Age: prefer created_at, then timestamp, then updated_at.
        # NOTE(review): the flag text says "not updated", yet created_at wins
        # over updated_at here -- confirm this ordering is intended.
        ts = entry.get("created_at") or entry.get("timestamp") or entry.get("updated_at") or ""
        age = _age_in_days(ts, now)

        # Price: prefer the locally stored value, fall back to the quote API.
        price = _to_float(entry.get("price"))
        if not price:
            price, _ = get_price(code)

        action_text = entry.get("action", "") or entry.get("current", "")
        last_update = entry.get("updated_at", "") or entry.get("timestamp", "")

        # Buy zone: structured fields first, then parse the free-text strategy.
        entry_low = _to_float(entry.get("entry_low"))
        entry_high = _to_float(entry.get("entry_high"))
        if not entry_low or not entry_high:
            entry_low, entry_high = parse_buy_zone(
                action_text if isinstance(action_text, str) else ""
            )

        flags = _age_flags(age, is_watchlist)
        flags.extend(_zone_flags(entry, price, entry_low, entry_high, is_watchlist))

        if flags:
            flagged.append({
                "code": code,
                "name": name,
                "price": round(price, 2) if price else None,
                "flags": flags,
                "age_days": age,
                "last_update": last_update,
                "entry_zone": f"{entry_low:.0f}~{entry_high:.0f}" if entry_low else "无买入区",
                "current": action_text,
                "updated_by": entry.get("source", "auto"),
                "updated_reason": "自动生成",
                "is_watchlist": is_watchlist,
            })

    # --- Portfolio-level analysis: share of weak / deep-loss categories ---
    weak_categories = ("弱势", "深套")
    holdings = [e for e in active if not _is_watchlist(e)]
    holdings_count = len(holdings)
    weak_count = sum(1 for e in holdings if e.get("stock_category") in weak_categories)
    all_weak_count = sum(1 for e in active if e.get("stock_category") in weak_categories)
    weak_pct = round(weak_count / holdings_count * 100, 1) if holdings_count > 0 else 0
    all_weak_pct = round(all_weak_count / len(active) * 100, 1) if active else 0

    # Position and cash come from the portfolio file. It is optional: a
    # missing, unreadable or malformed file leaves both values at 0.
    position_pct = 0
    cash = 0
    try:
        with open(PORTFOLIO_PATH, encoding="utf-8") as portfolio_file:
            pdata = json.load(portfolio_file)
        position_pct = pdata.get("position_pct") or 0
        cash = pdata.get("cash") or 0
    except (OSError, ValueError, AttributeError):
        pass

    portfolio_flags = []
    if weak_pct >= 40:
        portfolio_flags.append(f"[PORTFOLIO_WEAK] 组合中弱势+深套分类持仓占比{weak_pct}%>40%,建议系统性减仓")
    elif weak_pct >= 30:
        portfolio_flags.append(f"[PORTFOLIO_WEAK_MILD] 组合弱势占比{weak_pct}%,需关注")
    if position_pct > 80:
        portfolio_flags.append(f"[PORTFOLIO_FULL] 总仓位{position_pct}%(现金{cash:.0f}元),买入建议受限")

    # --- Write the machine-readable report ---
    os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)
    report = {
        "checked_at": now.strftime("%Y-%m-%dT%H:%M:%S"),
        "total_active": len(active),
        "flagged_count": len(flagged),
        "flagged": flagged,
        "portfolio": {
            "position_pct": position_pct,
            "cash": round(cash, 2),
            "weak_position_pct": weak_pct,
            "all_weak_pct": all_weak_pct,
            "signals": portfolio_flags
        },
        "summary": f"扫描{len(active)}个策略,{len(flagged)}个需关注"
    }
    with open(OUTPUT_PATH, 'w', encoding="utf-8") as report_file:
        json.dump(report, report_file, indent=2, ensure_ascii=False)

    # --- Human-readable summary on stdout (captured by cron) ---
    now_str = now.strftime("%m/%d %H:%M")
    print(f"【策略重估检查】{now_str}")
    print(f"活跃策略{len(active)}个 | 需关注{len(flagged)}个")

    # Portfolio-level signals come first.
    for signal in portfolio_flags:
        print(signal)

    if flagged:
        print("")
        for s in flagged:
            price_str = f"¥{s['price']}" if s['price'] else "N/A"
            ttag = "[自选]" if s.get('is_watchlist') else ""
            print(f"{ttag}[{s['code']}] {s['name']} {price_str}")
            print(f" 上次更新{s['age_days']}天前 | 区间{s['entry_zone']}")
            for flag in s['flags']:
                print(f" ⚠ {flag}")
            print(f" 策略: {s['current']}")
            print("")

    print(f"—END—{len(flagged)}个需关注 | 弱势持仓占比{weak_pct}% | 仓位{position_pct}%")
    return len(flagged)
|
|
|
|
if __name__ == "__main__":
    try:
        main()
    except Exception as err:
        print(f"ERROR: {err}")
        sys.exit(2)
    else:
        # Always exit 0 on a completed run: flagged items are already in the
        # JSON report and on stdout, and exit code 1 creates misleading
        # "Script Error" alerts.
        sys.exit(0)
|