#!/usr/bin/env python3
"""stale_push_wlin.py — push watchlist buy-zone reminders and trigger re-assessment.

Flow implemented by this script:

1. Run the stale detector and keep its ``[WL_IN]`` output lines that are also
   tagged ``[自选]`` (per the original header: watchlist stocks whose price is
   inside the buy zone).
2. A line tagged ``[STRATEGY_STALE]``, or whose signal text is empty or
   contains a non-buy keyword, is left out of the report and its stock code
   is handed to per_stock_reassess.py, which is run synchronously.
   NOTE(review): the original header mentions "RR<1.5 / no take-profit" as
   stale criteria; this script only reads the tag, so presumably the detector
   applies them — confirm.
3. For the remaining stocks, compute the cost of one lot and a suggested
   position size from the cash found in portfolio.json.
4. Print the report and push it through the XMPP HTTP bridge.

no_agent mode: output is produced only when there is something to push;
otherwise the script stays silent. The original header describes the intended
schedule as a cron job every 30 minutes on trading days.
"""

import subprocess
import sys
import re
import json
import os
import threading
from datetime import datetime

try:
    from urllib.request import Request, urlopen
except ImportError:
    from urllib2 import Request, urlopen

XMPP_BRIDGE = "http://127.0.0.1:5805/"
XMPP_USER = "hmo@yoin.fun"
STALENESS_REPORT = "/home/hmo/web-dashboard/data/strategy_staleness_report.json"
DETECTOR = "/home/hmo/.hermes/profiles/position-analyst/scripts/stale_detector.py"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
REGEN_SCRIPT = "/home/hmo/.hermes/profiles/position-analyst/scripts/per_stock_reassess.py"
REGEN_LOCK = "/tmp/.stale_push_wlin_regen.lock"
MACRO_CTX = "/home/hmo/web-dashboard/data/macro_context.json"
MARKET_JSON = "/home/hmo/web-dashboard/data/market.json"
DECISIONS_JSON = "/home/hmo/web-dashboard/data/decisions.json"
MULTI_TF_CACHE = "/home/hmo/web-dashboard/data/multi_tf_cache.json"
HK_RATE_DIR = "/home/hmo/MoFin"
DEFAULT_HKD_CNY = 0.87

NON_BUY_SIGNALS = ["观望", "弱势持有", "深套持有"]

# Signals containing one of these keywords are not shown in the report.
_SKIP_KEYWORDS = ("等企稳", "关注", "信号不充分")
# Signals that are exactly one of these (after strip/lower) are not shown.
_NEUTRAL_SIGNALS = ("", "neutral", "持有", "深套持有", "弱势持有")

# Patterns applied to each detector output line.
_WL_LINE_RE = re.compile(r'\[WL_IN\](?:\s+\[\w+\])*\s+\[自选\]\s+(\S+)\((\d+)\)')
_PRICE_RE = re.compile(r'价(\d+\.\d{2})')
_BUY_ZONE_RE = re.compile(r'买入([\d.]+)~([\d.]+)')


def _read_json(path):
    """Return the parsed JSON document at *path*, or None if unreadable.

    Files are read as UTF-8 explicitly so the result does not depend on the
    locale the cron job happens to run under.
    """
    try:
        with open(path, encoding="utf-8") as fh:
            return json.load(fh)
    except (OSError, ValueError):
        # Missing file, unreadable file, bad encoding or malformed JSON.
        return None


def _as_number(value, default=0):
    """Return *value* as an int/float, or *default* when it is not numeric."""
    if isinstance(value, bool):
        return default
    if isinstance(value, (int, float)):
        return value
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _coded_entries(doc, key):
    """Return the dict entries stored under ``doc[key]`` that carry a usable "code"."""
    entries = doc.get(key) if isinstance(doc, dict) else None
    if not isinstance(entries, list):
        return []
    return [
        e for e in entries
        if isinstance(e, dict) and isinstance(e.get("code"), (str, int))
    ]


def load_macro_line():
    """Build a short one-line description of the index and market mood.

    Returns:
        The available parts joined with " | ", or "" when neither data file
        yields anything.
    """
    parts = []

    ctx = _read_json(MACRO_CTX)
    structure = ctx.get("structure") if isinstance(ctx, dict) else None
    if isinstance(structure, dict):
        overall = structure.get("overall", "neutral")
        desc = structure.get("description", "")
        if isinstance(overall, str) and "bearish" in overall:
            parts.append("大盘偏弱")
        elif overall == "bullish":
            parts.append("大盘偏强")
        elif desc:
            parts.append(f"大盘{desc}")

    market = _read_json(MARKET_JSON)
    mood = market.get("mood", "") if isinstance(market, dict) else ""
    if mood:
        parts.append(f"市场{mood}")

    return " | ".join(parts)


def is_actionable(cur, timing_signal=""):
    """Tell whether the signal texts allow acting on a stock.

    Args:
        cur: text of the current strategy signal.
        timing_signal: text of the timing signal.

    Returns:
        False when both texts are empty (treated as unsafe) or when either
        contains one of NON_BUY_SIGNALS (case-insensitive); True otherwise.
    """
    if not cur and not timing_signal:
        return False
    texts = [t.lower() for t in (cur, timing_signal) if t]
    return not any(kw.lower() in t for kw in NON_BUY_SIGNALS for t in texts)


def trigger_regen_sync(stock_codes=None):
    """Run the re-assessment script for *stock_codes* and wait for it to finish.

    Does nothing when no codes are given. A timeout (60 s) or any other
    failure is reported on stderr and never raised.
    """
    if not stock_codes:
        return
    try:
        cmd = ["python3", REGEN_SCRIPT, *stock_codes]
        subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except subprocess.TimeoutExpired:
        print("[REGEN] 重评超时(60s)", file=sys.stderr)
    except Exception as e:
        # Best-effort: a failed re-assessment must not abort the reminder run.
        print(f"[REGEN] 重评失败: {e}", file=sys.stderr)


def load_cash():
    """Read the available cash from portfolio.json (never hard-coded).

    The document may be a dict with a "cash" key, or a list whose second
    element is such a dict.

    Returns:
        The cash amount as a number; 0 when the file is unreadable, has
        another shape, or holds a non-numeric value.
    """
    data = _read_json(PORTFOLIO_PATH)
    if isinstance(data, dict):
        holder = data
    elif isinstance(data, list) and len(data) > 1 and isinstance(data[1], dict):
        holder = data[1]
    else:
        return 0
    return _as_number(holder.get("cash", 0), 0)


_HK_LOT_CACHE = {}


def hk_lot_size(code):
    """Return the shares-per-lot of a Hong Kong stock, cached per code.

    The value is read from field [60] of the Tencent quote response. Any
    failure (network, parsing, missing field) yields 1000, and that fallback
    is cached as well.
    """
    if code in _HK_LOT_CACHE:
        return _HK_LOT_CACHE[code]
    try:
        url = f"http://qt.gtimg.cn/q=hk{code}"
        req = Request(url, headers={"User-Agent": "curl/7.81"})
        with urlopen(req, timeout=5) as r:
            text = r.read().decode("gbk")
        raw = text.split("=", 1)[1].strip().strip('"').strip(";")
        fld = raw.split("~")
        lot = int(fld[60]) if len(fld) > 60 and fld[60] else 1000
    except Exception:
        # Deliberate best-effort at the network boundary: fall back to 1000.
        lot = 1000
    _HK_LOT_CACHE[code] = lot
    return lot


def _hkd_to_cny_rate():
    """Return the HKD→CNY rate from the project's hk_rate module, or the default."""
    try:
        # Register the module directory once instead of on every call.
        if HK_RATE_DIR not in sys.path:
            sys.path.insert(0, HK_RATE_DIR)
        from hk_rate import hkd_to_cny
        return hkd_to_cny()
    except Exception:
        # hk_rate is a project module; whatever it raises, use the fixed rate.
        return DEFAULT_HKD_CNY


def lot_cost(code, price):
    """Return the cost of one lot of *code* at *price*.

    Codes starting with "688" use 200 shares per lot; 5-character codes are
    treated as Hong Kong stocks (actual lot size, converted from HKD to CNY
    and truncated to an int); everything else uses 100 shares per lot.
    """
    code_str = str(code)
    if code_str.startswith("688"):
        return 200 * price
    if len(code_str) != 5:
        return 100 * price
    return int(hk_lot_size(code) * price * _hkd_to_cny_rate())


def push_to_xmpp(text):
    """Send *text* as a chat message to XMPP_USER through the HTTP bridge.

    Blank text is ignored. Delivery failures are reported on stderr and never
    raised.
    """
    if not text.strip():
        return
    try:
        payload = json.dumps({
            "to": XMPP_USER,
            "body": text.strip(),
            "type": "chat",
        }).encode("utf-8")
        req = Request(XMPP_BRIDGE, data=payload,
                      headers={"Content-Type": "application/json"})
        # Close the response explicitly so the connection is not leaked.
        with urlopen(req, timeout=5):
            pass
    except Exception as e:
        print(f"[XMPP推送失败] {e}", file=sys.stderr)


def _run_detector():
    """Run the stale detector and return its stdout ("" when it could not run)."""
    try:
        result = subprocess.run(
            ["python3", DETECTOR],
            capture_output=True, text=True, timeout=60
        )
    except subprocess.TimeoutExpired:
        print("[DETECTOR] 检测超时(60s)", file=sys.stderr)
        return ""
    except OSError as exc:
        print(f"[DETECTOR] 检测失败: {exc}", file=sys.stderr)
        return ""
    if result.returncode != 0 and result.stderr:
        print(f"[stderr] {result.stderr.strip()}", file=sys.stderr)
    return result.stdout or ""


def _load_flagged_current():
    """Map stock code → "current" signal text from the staleness report."""
    current = {}
    for item in _coded_entries(_read_json(STALENESS_REPORT), "flagged"):
        text = item.get("current", "")
        current[item["code"]] = text if isinstance(text, str) else ""
    return current


def _load_decisions():
    """Map stock code → full strategy entry from decisions.json."""
    return {
        entry["code"]: entry
        for entry in _coded_entries(_read_json(DECISIONS_JSON), "decisions")
    }


def _load_fundamentals():
    """Map stock code → "fundamentals" dict from the multi-timeframe cache."""
    cache = _read_json(MULTI_TF_CACHE)
    if not isinstance(cache, dict):
        return {}
    result = {}
    for code, value in cache.items():
        if isinstance(value, dict):
            fundamentals = value.get("fundamentals", {})
            result[code] = fundamentals if isinstance(fundamentals, dict) else {}
    return result


def _timing_signal(entry):
    """Return the entry's timing_signal as a string ("" when absent or not text)."""
    signal = entry.get("timing_signal", "") if isinstance(entry, dict) else ""
    return signal if isinstance(signal, str) else ""


def _has_clear_signal(signal):
    """Tell whether *signal* is a clear operating signal worth reporting."""
    if not signal:
        return False
    if any(kw in signal for kw in _SKIP_KEYWORDS):
        return False
    return signal.strip().lower() not in _NEUTRAL_SIGNALS


def _parse_wl_line(line):
    """Parse one detector line into (name, code, price, buy_low, buy_high).

    Returns None when the line lacks any of the expected parts or when a
    captured number is malformed (e.g. "12.5.").
    """
    head = _WL_LINE_RE.match(line)
    price_m = _PRICE_RE.search(line)
    zone_m = _BUY_ZONE_RE.search(line)
    if not (head and price_m and zone_m):
        return None
    try:
        price = float(price_m.group(1))
        buy_low = float(zone_m.group(1))
        buy_high = float(zone_m.group(2))
    except ValueError:
        return None
    return head.group(1), head.group(2), price, buy_low, buy_high


def _calc_position(lot_price, rr, market_factor, cat, cash, n_candidates):
    """Compute the suggested position for one stock.

    Args:
        lot_price: cost of one lot.
        rr: risk/reward ratio.
        market_factor: index factor text ("偏弱"/"偏强" adjust the size).
        cat: stock category text.
        cash: available cash.
        n_candidates: number of actionable stocks sharing the cash.

    Returns:
        (theoretical %, actual %, lots display text, total cost of the lots).
    """
    # Theoretical size from RR, then adjusted for market and category.
    if rr >= 5:
        theo_pct = 25
    elif rr >= 3:
        theo_pct = 18
    elif rr >= 2:
        theo_pct = 12
    else:
        theo_pct = 8

    if "偏弱" in market_factor:
        theo_pct = int(theo_pct * 0.8)
    elif "偏强" in market_factor:
        theo_pct = int(theo_pct * 1.15)

    if cat in ("蓝筹", "白马"):
        theo_pct = int(theo_pct * 1.2)
    elif cat in ("题材", "短线"):
        theo_pct = int(theo_pct * 0.6)
    elif cat in ("高波动", "成长"):
        theo_pct = int(theo_pct * 0.85)
    theo_pct = max(5, min(30, theo_pct))

    # The more stocks are actionable at once, the smaller each one's share.
    if n_candidates >= 5:
        share_pct = theo_pct * 0.5
    elif n_candidates >= 3:
        share_pct = theo_pct * 0.7
    elif n_candidates >= 2:
        share_pct = theo_pct * 0.85
    else:
        share_pct = theo_pct
    share_pct = max(5, min(theo_pct, share_pct))

    budget = cash * share_pct / 100
    # Clamp at zero so negative cash cannot produce a negative lot count.
    lots = max(0, int(budget / lot_price)) if lot_price > 0 else 0
    if lots == 0 and lot_price > 0 and budget > 0:
        # Budget does not cover a full lot: recommend one lot anyway.
        lots = 1
    actual_pct = round(lots * lot_price / cash * 100) if cash > 0 else 0

    if lots == 0:
        lots_display = "预算不足1手"
    elif budget < lot_price and lots == 1:
        lots_display = f"至少{lots}手"
    else:
        lots_display = f"{lots}手"
    return theo_pct, actual_pct, lots_display, lots * lot_price


def _format_stock(stock, entry, fundamentals, cash, n_candidates):
    """Render the report paragraph (analysis, technical levels, position) for one stock."""
    name, code, price, buy_low, buy_high, lot = stock
    sl = entry.get("stop_loss", 0)
    tp = entry.get("take_profit", 0)
    rr = _as_number(entry.get("rr_ratio", 0), 0)
    sig = entry.get("timing_signal", "")
    sector = entry.get("sector_context", "")
    tech = str(entry.get("tech_snapshot") or "")
    cat = entry.get("stock_category", "")
    raw_factors = entry.get("signal_factors")
    factors = [f for f in raw_factors if isinstance(f, str)] \
        if isinstance(raw_factors, list) else []

    # Support / resistance levels extracted from the technical snapshot text.
    levels = {"强撑": "-", "弱撑": "-", "弱压": "-", "强压": "-"}
    for tag in levels:
        found = re.search(rf'{tag}:([\d.]+)', tech)
        if found:
            levels[tag] = found.group(1)

    pe = _as_number(fundamentals.get("pe", 0), 0)
    pe_str = f"PE{pe:.0f}" if pe else ""

    def first_factor(prefix):
        # First signal factor starting with the given prefix, "" if none.
        return next((f for f in factors if f.startswith(prefix)), "")

    market_factor = first_factor("大盘")
    sector_factor = first_factor("行业")
    value_factor = (first_factor("高估值") or first_factor("低估值")
                    or first_factor("蓝筹") or pe_str)
    news_factor = first_factor("消息")

    parts = []
    if market_factor:
        parts.append(f"大盘{market_factor.replace('大盘','')}")
    if sector_factor:
        parts.append(f"行业{sector_factor.replace('行业','')}")
    if value_factor:
        parts.append(value_factor)
    if news_factor:
        parts.append(news_factor)
    if not parts:
        parts.append(str(sector or cat or ""))
    analysis = " | ".join(p for p in parts if p)

    theo_pct, actual_pct, lots_display, lots_cost_total = _calc_position(
        lot, rr, market_factor, cat, cash, n_candidates
    )

    pfx = "" if len(code) == 6 else "HK$"
    return (
        f" {name}({code}) {pfx}{price:.2f} 买区{buy_low}~{buy_high} | "
        f"1手{lot:,.0f}元 RR={rr:.1f} 损{sl} 盈{tp}\n"
        f" {analysis}\n"
        f" 技术{levels['强撑']}→{levels['弱撑']}→{levels['弱压']}→{levels['强压']} | 信号{sig}\n"
        f" 仓位:理论推荐{theo_pct}% | 当前建议{actual_pct}%({lots_display}≈{lots_cost_total:,.0f}元)"
    )


def main():
    """Run one reminder cycle.

    Returns:
        1 when a report was printed and pushed, 0 when there was nothing to
        send. The value is passed to ``sys.exit`` by the script entry point.
    """
    detector_out = _run_detector()
    wl_lines = [
        ln for ln in detector_out.split("\n")
        if "[WL_IN]" in ln and "[自选]" in ln
    ]
    if not wl_lines:
        return 0

    code_cur = _load_flagged_current()
    code_data = _load_decisions()
    cash = load_cash()

    stocks = []
    stale_codes = []
    for ln in wl_lines:
        parsed = _parse_wl_line(ln)
        if parsed is None:
            continue
        name, code, price, buy_low, buy_high = parsed
        signal = _timing_signal(code_data.get(code))
        if "[STRATEGY_STALE]" in ln or not is_actionable(code_cur.get(code, ""), signal):
            stale_codes.append(code)
            continue
        stocks.append((name, code, price, buy_low, buy_high, lot_cost(code, price)))

    if not stocks and not stale_codes:
        return 0

    now = datetime.now().strftime("%H:%M")
    lines = []

    # Market background.
    macro_line = load_macro_line()
    if macro_line:
        lines.append(f"【市场背景】{macro_line}")

    # Re-assessment is an internal step: it is executed but not reported.
    if stale_codes:
        trigger_regen_sync(stale_codes)
        # Re-assessment finished; pick up the refreshed decisions.
        code_data = _load_decisions()

    # 6-character codes first, then by descending RR.
    stocks.sort(key=lambda s: (
        0 if len(str(s[1])) == 6 else 1,
        -_as_number(code_data.get(s[1], {}).get("rr_ratio", 0), 0)
    ))

    # Only stocks with a clear operating signal are shown.
    actionable = [
        s for s in stocks
        if _has_clear_signal(_timing_signal(code_data.get(s[1])))
    ]
    if not actionable:
        return 0  # No operating signal: stay silent, push nothing.

    fund_cache = _load_fundamentals()
    n = len(actionable)

    lines.append(f"【💡 操作建议】(当前{n}只自选可操作 | 现金{cash:,.0f}元)")
    for s in actionable:
        code = s[1]
        lines.append(_format_stock(
            s, code_data.get(code, {}), fund_cache.get(code, {}), cash, n
        ))

    lines.insert(0, f"【知微】自选买入提醒 {now} | 现金{cash:,.0f}元")
    out = "\n".join(lines)
    print(out)
    push_to_xmpp(out)
    return 1


if __name__ == "__main__":
    sys.exit(main())