#!/usr/bin/env python3
"""stale_push_wlin.py -- push watchlist buy-zone alerts and trigger re-assessment.

Pipeline (translated from the original header of this script):
  1. Select watchlist stocks whose price is inside the buy zone.
  2. RR < 1.5 / no take-profit level / non-buy signal -> flagged
     STRATEGY_STALE -> automatic re-assessment is triggered.
  3. For pushable stocks: compute the cost of one lot and its share of cash.
  4. When STRATEGY_STALE entries are found, per_stock_reassess.py is run to
     re-assess them.

no_agent mode: print the report when there is something to push, stay silent
otherwise.  Intended to be run from cron (no_agent=True) every 30 minutes on
trading days.
"""
import subprocess
import sys
import re
import json
import os
import threading
import time
from datetime import datetime

try:
    from urllib.request import Request, urlopen
except ImportError:
    from urllib2 import Request, urlopen

XMPP_BRIDGE = "http://127.0.0.1:5805/"
XMPP_USER = "hmo@yoin.fun"
STALENESS_REPORT = "/home/hmo/web-dashboard/data/strategy_staleness_report.json"
DETECTOR = "/home/hmo/.hermes/profiles/position-analyst/scripts/stale_detector.py"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
REGEN_SCRIPT = "/home/hmo/.hermes/profiles/position-analyst/scripts/per_stock_reassess.py"
REGEN_LOCK = "/tmp/.stale_push_wlin_regen.lock"
MACRO_CTX = "/home/hmo/web-dashboard/data/macro_context.json"
MARKET_JSON = "/home/hmo/web-dashboard/data/market.json"
COOLDOWN_PATH = "/home/hmo/web-dashboard/data/push_cooldown.json"
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
MULTI_TF_CACHE = "/home/hmo/web-dashboard/data/multi_tf_cache.json"
MOFIN_DIR = "/home/hmo/MoFin"
STRATEGY_TREE_PATH = "/home/hmo/MoFin/strategy_tree.py"

NON_BUY_SIGNALS = ["观望", "弱势持有", "深套持有"]

# Signals that are not clear enough to act on, and neutral / pure-hold signals.
SKIP_KEYWORDS = ["等企稳", "关注", "信号不充分"]
NEUTRAL_SIGNALS = ("", "neutral", "持有", "深套持有", "弱势持有")

_WL_HEAD_RE = re.compile(r'\[WL_IN\](?:\s+\[\w+\])*\s+\[自选\]\s+(\S+)\((\d+)\)')
_WL_PRICE_RE = re.compile(r'价(\d+\.\d{2})')
_WL_ZONE_RE = re.compile(r'买入([\d.]+)~([\d.]+)')


def _to_float(value, default=0.0):
    """Coerce *value* to float; return *default* for None / non-numeric input."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _load_json(path):
    """Read and parse a JSON file as UTF-8 (independent of the cron locale)."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def load_macro_line():
    """Return a short " | "-joined description of index structure and market mood.

    Both inputs are best-effort: a missing or malformed file simply contributes
    nothing.  Returns "" when neither file yields a description.
    """
    parts = []
    try:
        m = _load_json(MACRO_CTX).get("structure", {})
        overall = m.get("overall", "neutral")
        desc = m.get("description", "")
        if "bearish" in overall:
            parts.append("大盘偏弱")
        elif overall == "bullish":
            parts.append("大盘偏强")
        elif desc:
            parts.append(f"大盘{desc}")
    except Exception:
        pass
    try:
        mood = _load_json(MARKET_JSON).get("mood", "")
        if mood:
            parts.append(f"市场{mood}")
    except Exception:
        pass
    return " | ".join(parts) if parts else ""


def is_actionable(cur, timing_signal=""):
    """Return True when the signal texts allow a buy action.

    Both texts empty -> False (empty is treated as unsafe).  Any keyword from
    NON_BUY_SIGNALS found (case-insensitively) in either text -> False.
    """
    texts = [str(t).lower() for t in (cur, timing_signal) if t]
    if not texts:
        return False
    return not any(kw.lower() in t for kw in NON_BUY_SIGNALS for t in texts)


def trigger_regen_sync(stock_codes=None):
    """Run the re-assessment script synchronously for the given stock codes.

    Does nothing for an empty/None list.  Failures (timeout, non-zero exit,
    launch error) are reported on stderr and never raised.
    """
    if not stock_codes:
        return
    try:
        cmd = ["python3", REGEN_SCRIPT] + [str(c) for c in stock_codes]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        if result.returncode != 0:
            detail = (result.stderr or "").strip()
            print(f"[REGEN] 重评失败: exit={result.returncode} {detail}", file=sys.stderr)
    except subprocess.TimeoutExpired:
        print("[REGEN] 重评超时(60s)", file=sys.stderr)
    except Exception as e:
        print(f"[REGEN] 重评失败: {e}", file=sys.stderr)


def load_cash():
    """Read the current cash balance from portfolio.json (never hard-coded).

    Accepts either a dict with a "cash" key or a list whose second element is
    such a dict.  Returns 0 on any failure or unexpected layout.
    """
    try:
        data = _load_json(PORTFOLIO_PATH)
        if isinstance(data, dict):
            return _to_float(data.get("cash", 0))
        if isinstance(data, list) and len(data) > 1 and isinstance(data[1], dict):
            return _to_float(data[1].get("cash", 0))
        return 0
    except Exception:
        return 0


_HK_LOT_CACHE = {}


def hk_lot_size(code):
    """Return the HK board-lot size for *code* from the Tencent quote API.

    The value is read from field index 60 of the "~"-separated quote string and
    cached per code for the lifetime of the process.  Falls back to 1000 when
    the request fails or the field is missing/unparseable.
    """
    if code in _HK_LOT_CACHE:
        return _HK_LOT_CACHE[code]
    lot = 1000
    try:
        url = f"http://qt.gtimg.cn/q=hk{code}"
        req = Request(url, headers={"User-Agent": "curl/7.81"})
        with urlopen(req, timeout=5) as r:
            text = r.read().decode("gbk", errors="replace")
        # Payload looks like: name="f0~f1~...";  -> strip quotes and semicolon.
        raw = text.split("=", 1)[1].strip().strip('";')
        fld = raw.split("~")
        if len(fld) > 60 and fld[60]:
            lot = int(fld[60])
    except Exception:
        lot = 1000
    _HK_LOT_CACHE[code] = lot
    return lot


def lot_cost(code, price):
    """Return the cost of one lot of *code* at *price*.

    Codes starting with "688" use 200 shares per lot, 5-character codes are
    treated as HK stocks (lot size from hk_lot_size, converted with the HKD
    rate, truncated to int), everything else uses 100 shares per lot.
    """
    code = str(code)
    if code.startswith("688"):
        return 200 * price
    if len(code) == 5:
        lot = hk_lot_size(code)
        try:
            # Insert once only; repeated calls must not keep growing sys.path.
            if MOFIN_DIR not in sys.path:
                sys.path.insert(0, MOFIN_DIR)
            from hk_rate import hkd_to_cny
            rate = hkd_to_cny()
        except Exception:
            rate = 0.87  # fallback rate when the helper module is unavailable
        return int(lot * price * rate)
    return 100 * price


def push_to_xmpp(text):
    """Send *text* as a chat message to XMPP_USER through the HTTP bridge.

    Blank text is ignored.  Delivery errors are reported on stderr only.
    """
    if not text or not text.strip():
        return
    try:
        payload = json.dumps({
            "to": XMPP_USER,
            "body": text.strip(),
            "type": "chat",
        }).encode("utf-8")
        req = Request(XMPP_BRIDGE, data=payload,
                      headers={"Content-Type": "application/json"})
        # Close the response explicitly instead of leaking the connection.
        with urlopen(req, timeout=5):
            pass
    except Exception as e:
        print(f"[XMPP推送失败] {e}", file=sys.stderr)


def load_cooldown():
    """Load the push-cooldown map ({key: unix_ts}); {} when missing/invalid."""
    try:
        data = _load_json(COOLDOWN_PATH)
    except Exception:
        return {}
    return data if isinstance(data, dict) else {}


def save_cooldown(cd):
    """Persist the cooldown map; failures are reported on stderr, not raised."""
    tmp_path = COOLDOWN_PATH + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(cd, f, indent=2)
        # Atomic swap so an interrupted run never leaves a truncated file.
        os.replace(tmp_path, COOLDOWN_PATH)
    except Exception as e:
        print(f"[COOLDOWN] 保存失败: {e}", file=sys.stderr)


def in_cooldown(code, action_type, cooldown_dict, minutes=30):
    """Check whether (code, action_type) was pushed within the last *minutes*.

    Returns (is_cooling, elapsed_seconds, key) where key is
    "<code>_<action_type>".
    """
    key = f"{code}_{action_type}"
    last = _to_float(cooldown_dict.get(key, 0))
    elapsed = time.time() - last
    return elapsed < minutes * 60, elapsed, key


def calc_position(cost_per_lot, rr, market_factor, cat, code="", *,
                  total_assets=0.0, available_cash=0.0, n=1):
    """Compute the suggested position for one stock.

    Args:
        cost_per_lot: cost of one lot.
        rr: risk/reward ratio (None / non-numeric is treated as 0).
        market_factor: market factor text; "偏弱"/"偏强" scale the target.
        cat: stock category text used for a further scaling step.
        code: stock code, used to turn lots into a share count.
        total_assets: total assets used as the base of the percentages.
        available_cash: cash that may be spent.
        n: number of actionable stocks sharing the cash.

    Returns:
        (theo_pct, pct_actual, details, lots, lot_cost_total)
    """
    rr = _to_float(rr)
    market_factor = market_factor or ""

    # Theoretical target (% of total assets): RR + market + category only.
    if rr >= 5:
        theo_pct = 25
    elif rr >= 3:
        theo_pct = 18
    elif rr >= 2:
        theo_pct = 12
    else:
        theo_pct = 8
    if "偏弱" in market_factor:
        theo_pct = int(theo_pct * 0.8)
    elif "偏强" in market_factor:
        theo_pct = int(theo_pct * 1.15)
    if cat in ("蓝筹", "白马"):
        theo_pct = int(theo_pct * 1.2)
    elif cat in ("题材", "短线"):
        theo_pct = int(theo_pct * 0.6)
    elif cat in ("高波动", "成长"):
        theo_pct = int(theo_pct * 0.85)
    theo_pct = max(5, min(30, theo_pct))

    # Actual budget: target share of assets, capped by the cash available.
    ideal_budget = total_assets * theo_pct / 100
    # With N actionable stocks, one stock may use at most 1.5x its even share.
    max_use_cash = (available_cash / max(n, 1)) * 1.5
    budget = min(ideal_budget, max_use_cash, available_cash)

    lots = int(budget / cost_per_lot) if cost_per_lot > 0 else 0
    if (lots == 0 and cost_per_lot > 0 and budget > cost_per_lot * 0.8
            and cost_per_lot <= available_cash):
        # Budget covers >80% of one lot -> round up to one lot, but only when
        # the cash on hand can actually pay for that lot.
        lots = 1
    lot_cost_total = lots * cost_per_lot

    if lots > 0 and total_assets > 0:
        pct_actual = round(lot_cost_total / total_assets * 100)
    else:
        pct_actual = 0

    if lots == 0:
        details = f"预算不足1手({budget:,.0f}/{cost_per_lot:,.0f}元)"
    else:
        code = str(code)
        if len(code) == 5:
            shares = lots * hk_lot_size(code)
        elif code.startswith("688"):
            shares = lots * 200
        else:
            shares = lots * 100
        details = f"{lots}手({shares}股,{lot_cost_total:,.0f}元)"
    return theo_pct, pct_actual, details, lots, lot_cost_total


def _load_decisions():
    """Return {code: decision entry} from decisions.json ({} on failure)."""
    by_code = {}
    try:
        for entry in _load_json(DECISIONS_PATH).get("decisions", []):
            by_code[entry["code"]] = entry
    except Exception:
        pass
    return by_code


def _load_stale_signals():
    """Return {code: current signal text} from the staleness report."""
    by_code = {}
    try:
        for item in _load_json(STALENESS_REPORT).get("flagged", []):
            by_code[item["code"]] = item.get("current", "")
    except Exception:
        pass
    return by_code


def _load_fundamentals():
    """Return {code: fundamentals dict} from the multi-timeframe cache."""
    cache = {}
    try:
        for code, v in _load_json(MULTI_TF_CACHE).items():
            cache[code] = v.get("fundamentals", {})
    except Exception:
        pass
    return cache


def _load_assets(fallback_cash):
    """Return (total_assets, available_cash) computed from portfolio.json.

    When the file cannot be read as a dict with holdings, the cash already
    obtained via load_cash() is used and total assets are estimated as 5x cash.
    """
    total_assets = 0
    available_cash = fallback_cash
    try:
        pf = _load_json(PORTFOLIO_PATH)
        available_cash = _to_float(pf.get("cash", 0))
        for h in pf.get("holdings", []):
            mv = _to_float(h.get("shares", 0)) * _to_float(h.get("price", 0))
            if h.get("currency") == "HKD":
                mv *= _to_float(h.get("exchange_rate"), 0.866)
            total_assets += mv
        total_assets += available_cash
    except Exception:
        total_assets = available_cash * 5  # fallback estimate
    return total_assets, available_cash


def _load_strategy_tree():
    """Load the strategy-tree module and detect the current scenario.

    Returns (module, scenario_id, scenario_label); (None, "", "") on failure.
    """
    try:
        import importlib.util
        spec = importlib.util.spec_from_file_location("st_module", STRATEGY_TREE_PATH)
        st = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(st)
        sc = st.detect_scenario()
        return st, sc.get("id", ""), sc.get("label", "")
    except Exception:
        return None, "", ""


def _parse_wl_line(line):
    """Parse one detector line into (name, code, price, buy_low, buy_high).

    Returns None when the line does not carry all three parts or when a number
    in it is malformed.
    """
    head = _WL_HEAD_RE.match(line)
    price_m = _WL_PRICE_RE.search(line)
    zone_m = _WL_ZONE_RE.search(line)
    if not (head and price_m and zone_m):
        return None
    try:
        price = float(price_m.group(1))
        buy_low, buy_high = float(zone_m.group(1)), float(zone_m.group(2))
    except ValueError:
        return None
    return head.group(1), head.group(2), price, buy_low, buy_high


def _first_with_prefix(factors, prefix):
    """Return the first string in *factors* starting with *prefix*, else ""."""
    for item in factors:
        if isinstance(item, str) and item.startswith(prefix):
            return item
    return ""


def _has_clear_signal(sig):
    """True when *sig* is a non-empty, non-neutral signal without skip keywords."""
    if not sig:
        return False
    if any(kw in sig for kw in SKIP_KEYWORDS):
        return False
    return sig.strip().lower() not in NEUTRAL_SIGNALS


def _best_branch(st, scenario_id, code, price, decision):
    """Return (action_type, rationale) of the highest-priority applicable branch.

    Defaults to ("hold", "") when no strategy tree is loaded, nothing applies,
    or the evaluation fails.
    """
    if not (st and scenario_id):
        return "hold", ""
    try:
        results = st.evaluate_branches(code, scenario_id, price,
                                       decision.get("shares", 0),
                                       decision.get("cost", 0))
        applicable = [b for b in results if b.get("applicable")]
        if applicable:
            best = min(applicable, key=lambda b: b.get("priority", 999))
            return best.get("action_type", "hold"), best.get("rationale", "")
    except Exception:
        pass
    return "hold", ""


def main():
    """Run the detector, re-assess stale entries and push actionable alerts.

    Returns the process exit code: 0 normally (including the silent "nothing to
    push" cases), 1 when the detector itself could not be run.
    """
    try:
        r = subprocess.run(
            ["python3", DETECTOR], capture_output=True, text=True, timeout=60
        )
    except (subprocess.TimeoutExpired, OSError) as e:
        print(f"[DETECTOR] 执行失败: {e}", file=sys.stderr)
        return 1
    if r.returncode != 0 and r.stderr:
        print(f"[stderr] {r.stderr.strip()}", file=sys.stderr)

    wl_lines = [
        ln for ln in r.stdout.split("\n")
        if "[WL_IN]" in ln and "[自选]" in ln
    ]
    if not wl_lines:
        return 0

    code_cur = _load_stale_signals()
    cooldown = load_cooldown()
    now_ts = time.time()
    code_data = _load_decisions()
    cash = load_cash()

    stocks = []
    stale_list = []
    for ln in wl_lines:
        parsed = _parse_wl_line(ln)
        if parsed is None:
            continue
        name, code, price, buy_low, buy_high = parsed
        cur = code_cur.get(code, "")
        timing = code_data.get(code, {}).get("timing_signal", "")
        if "[STRATEGY_STALE]" in ln or not is_actionable(cur, timing):
            stale_list.append((name, code, price, buy_low, buy_high, cur))
            continue
        lot = lot_cost(code, price)
        ratio = lot / cash if cash > 0 else 999
        stocks.append((name, code, price, buy_low, buy_high, lot, ratio))

    if not stocks and not stale_list:
        return 0

    now = datetime.now().strftime("%H:%M")
    macro_line = load_macro_line()

    # Re-assessment is internal: it is executed but not shown in the report.
    if stale_list:
        trigger_regen_sync([s[1] for s in stale_list])
        # Re-read the decisions after the re-assessment finished.
        code_data = _load_decisions()

    # 6-character codes first, then by descending RR.
    stocks.sort(key=lambda s: (
        0 if len(str(s[1])) == 6 else 1,
        -_to_float(code_data.get(s[1], {}).get("rr_ratio", 0)),
    ))

    actionable = [
        s for s in stocks
        if _has_clear_signal(code_data.get(s[1], {}).get("timing_signal", ""))
    ]
    if not actionable:
        return 0  # no clear signal -> stay silent

    fund_cache = _load_fundamentals()
    n = len(actionable)
    total_assets, available_cash = _load_assets(cash)
    st, scenario_id, scenario_label = _load_strategy_tree()

    items = []
    for name, code, price, buy_low, buy_high, lot, _ratio in actionable:
        d = code_data.get(code, {})
        sl = d.get("stop_loss", 0)
        tp = d.get("take_profit", 0)
        rr = _to_float(d.get("rr_ratio", 0))
        sig = d.get("timing_signal", "")
        sector = d.get("sector_context", "")
        tech = str(d.get("tech_snapshot") or "")
        d_factors = d.get("signal_factors") or []
        cat = d.get("stock_category", "")

        # Support / resistance levels extracted from the technical snapshot.
        ss = {"强撑": "-", "弱撑": "-", "弱压": "-", "强压": "-"}
        for tag in ss:
            m = re.search(rf'{tag}:([\d.]+)', tech)
            if m:
                ss[tag] = m.group(1)

        # Fundamentals.
        fund = fund_cache.get(code) or {}
        pe = _to_float(fund.get("pe", 0))
        pe_str = f"PE{pe:.0f}" if pe else ""

        # Per-dimension factors taken from signal_factors.
        market_factor = _first_with_prefix(d_factors, "大盘")
        sector_factor = _first_with_prefix(d_factors, "行业")
        value_factor = (
            _first_with_prefix(d_factors, "高估值")
            or _first_with_prefix(d_factors, "低估值")
            or _first_with_prefix(d_factors, "蓝筹")
            or pe_str
        )
        news_factor = _first_with_prefix(d_factors, "消息")

        parts = []
        if market_factor:
            parts.append(f"大盘{market_factor.replace('大盘','')}")
        if sector_factor:
            parts.append(f"行业{sector_factor.replace('行业','')}")
        if value_factor:
            parts.append(value_factor)
        if news_factor:
            parts.append(news_factor)
        if not parts:
            parts.append(sector or cat or "")
        analysis = " | ".join(p for p in parts if p)

        theo_pct, actual_pct, details, lots, _total = calc_position(
            lot, rr, market_factor, cat, code,
            total_assets=total_assets, available_cash=available_cash, n=n,
        )
        pfx = "" if len(code) == 6 else "HK$"

        branch_action, branch_rationale = _best_branch(st, scenario_id, code, price, d)

        # Cooldown: same stock + same action is not pushed again within 30 min.
        cooled, _elapsed, cd_key = in_cooldown(code, branch_action, cooldown)
        if cooled:
            continue

        action_tag = "⚠️" if lots == 0 else "🛒"
        item = (
            f" {action_tag} {name}({code}) {pfx}{price:.2f} 买区{buy_low}~{buy_high} | "
            f"1手{lot:,.0f}元 RR={rr:.1f} 损{sl} 盈{tp}\n"
            f" {analysis}\n"
            f" 技术{ss['强撑']}→{ss['弱撑']}→{ss['弱压']}→{ss['强压']} | 信号{sig}\n"
            f" 仓位:理论{theo_pct}%×总资产 | 建议{actual_pct}%({details})"
        )
        if branch_action != "hold":
            item += f"\n 【{scenario_label}→{branch_action}】{branch_rationale}"
        items.append(item)

        # Remember the push time for the cooldown check of later runs.
        cooldown[cd_key] = now_ts

    if not items:
        return 0  # everything is cooling down -> stay silent

    save_cooldown(cooldown)

    lines = [f"【知微】自选买入提醒 {now} | 总资产{total_assets:,.0f}元"]
    if macro_line:
        lines.append(f"【市场背景】{macro_line}")
    # The count reflects what is actually listed (cooldown skips excluded).
    lines.append(
        f"【💡 操作建议】(当前{len(items)}只自选可操作 | "
        f"总资产{total_assets:,.0f}元 现金{available_cash:,.0f}元)"
    )
    lines.extend(items)

    out = "\n".join(lines)
    print(out)
    push_to_xmpp(out)
    return 0


if __name__ == "__main__":
    sys.exit(main())