From f50b93d427596413560cebe63d022c3c8fda7849 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?= Date: Wed, 1 Jul 2026 22:32:30 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20eliminate=20duplicate=20price=20fetches?= =?UTF-8?q?=20=E2=80=94=20stale=5Fpush=5Fwlin=20+=20per=5Fstock=5Freassess?= =?UTF-8?q?=20now=20read=20from=20DB=20instead=20of=20self-pulling=20Tence?= =?UTF-8?q?nt=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/per_stock_reassess.py | 46 +- scripts/stale_push_wlin.py | 1828 +++++++++++++++++---------------- 2 files changed, 940 insertions(+), 934 deletions(-) diff --git a/scripts/per_stock_reassess.py b/scripts/per_stock_reassess.py index e83de4b..542feb3 100644 --- a/scripts/per_stock_reassess.py +++ b/scripts/per_stock_reassess.py @@ -41,41 +41,35 @@ def main(): # Always fetch live price for accurate reassessment price = 0 try: - import urllib.request + # 价格从 DB 读取(price_monitor 每2分钟更新,唯一价格入口) code_raw = entry.get("code", "") - # 港股5位代码(含0开头)→ 前缀hk - if len(code_raw) == 5 and code_raw[0] in '01': - prefix = "hk" + price = 0 + import sqlite3 + db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db') + db.row_factory = sqlite3.Row + row = db.execute("SELECT price FROM holdings WHERE code=? AND is_active=1", (code_raw,)).fetchone() + if not row: + row = db.execute("SELECT price FROM watchlist_stocks WHERE code=? AND is_active=1", (code_raw,)).fetchone() + if not row: + row = db.execute("SELECT price FROM holding_strategies WHERE code=? AND status='active' ORDER BY updated_at DESC LIMIT 1", (code_raw,)).fetchone() + if row: + price = row['price'] or 0 + db.close() + if price > 0: + print(f" 实时价: {price} (来自DB)") else: - sym_map = {"6":"sh","5":"sh","0":"sz","3":"sz"} - for k, v in sym_map.items(): - if code_raw.startswith(k): - prefix = v - break - if not prefix: - prefix = "sz" - url = f"http://qt.gtimg.cn/q={prefix}{code_raw}" - resp = urllib.request.urlopen(url, timeout=5) - text = resp.read().decode("gbk") - fields = text.split('"')[1].split("~") - price = float(fields[3]) if fields[3] else 0 - print(f" 实时价: {price} {'(港股HKD)' if len(code_raw) == 5 and code_raw[0] in '01' else ''}") - except Exception as e: - print(f" 实时价获取失败: {e}", file=sys.stderr) - # Try portfolio.json as fallback (price_monitor keeps live prices) - try: + # fallback to portfolio.json with open("/home/hmo/web-dashboard/data/portfolio.json") as _pf: _pf_data = json.load(_pf) for _h in _pf_data.get("holdings", []): if _h["code"] == code_raw: price = float(_h.get("price", 0)) - print(f" 从portfolio.json取实时价: {price}") break - except Exception: - pass - if price == 0: + if price <= 0: price = entry.get("current_price") or entry.get("price") or 0 - print(f" fallback到存储价: {price}", file=sys.stderr) + except Exception as e: + print(f" 价格获取失败: {e}", file=sys.stderr) + price = entry.get("current_price") or entry.get("price") or 0 # Price diff debounce: skip reassessment if price changed < 1% since last update last_price = entry.get("last_reassessed_price", 0) diff --git a/scripts/stale_push_wlin.py b/scripts/stale_push_wlin.py index 5be42c6..e50b472 100644 --- a/scripts/stale_push_wlin.py +++ b/scripts/stale_push_wlin.py @@ -1,908 +1,920 @@ -#!/usr/bin/env python3 -""" -stale_push_wlin.py — 按5步逻辑推送自选股买入区提醒 + 自动触发重评 - -5步逻辑: -1. 筛选 is_watchlist=true 且价在买入区 -2. RR<1.5/无止盈位/非买入signal → 标记 STRATEGY_STALE → 触发自动重评 -3. 可推的:计算每手买入金额和现金占比 -4. 发现 STRATEGY_STALE → 后台跑 per_stock_reassess.py 自动重评 - -no_agent模式:有推送→输出;无→静默 -搭配 cron: no_agent=True, 交易日每30分跑一次 -""" -import subprocess -import sys -import re -import json -import os -import threading -import time -from datetime import datetime, time - -# ── MoFin unified model import ────────────────────────────────────── -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -try: - from mo_models import is_hk_stock, get_hk_rate, to_cny, calc_total_assets - _USE_MO_MODELS = True -except ImportError: - _USE_MO_MODELS = False - def is_hk_stock(code): - code = str(code or '').strip().upper() - return len(code) == 5 and code.isdigit() and code[0] in ('0', '1') - def get_hk_rate(): - return 0.87 - def to_cny(price, code): - if price is None: return price - if is_hk_stock(code): return round(float(price) * get_hk_rate(), 2) - return price - def calc_total_assets(pf): - total_mv = sum((h.get('shares',0) or 0) * (h.get('price',0) or 0) for h in pf.get('holdings',[])) - return round(total_mv + (pf.get('cash',0) or 0) + (pf.get('frozen_cash',0) or 0), 2) - -# 市场时段检查 -_MARKET_HOURS = { - 'ashare': (time(9, 30), time(15, 0)), - 'hk': (time(9, 30), time(16, 0)), -} - -def is_ashare(code: str) -> bool: - """判断是否A股代码""" - return code.isdigit() and (code.startswith(('6', '5')) or len(code) in (6,)) - -def market_is_open(code: str, now: datetime = None) -> bool: - """检查某股票对应市场是否在交易时段内""" - if not code: - return True - now = now or datetime.now() - t = now.time() - code_str = str(code) - if code_str.startswith(('0', '1')) and len(code_str) == 5: - # 港股 - start, end = _MARKET_HOURS['hk'] - else: - # A股(含ETF、科创板) - start, end = _MARKET_HOURS['ashare'] - return start <= t <= end -try: - from urllib.request import Request, urlopen -except ImportError: - from urllib2 import Request, urlopen -# 6维评分系统 -sys.path.insert(0, "/home/hmo/MoFin/scripts") -from stock_scorer import score_future_outlook, is_hk_stock, settlement_delay_note - -# ── 趋势检查 ──────────────────────────────────────────────────── -def fetch_trend_data(code): - """取均线数据判断趋势状态。返回 (current_price, ma5, trend_label) 或 None""" - try: - prefix = "sh" if code.startswith(('60','68','51','56','50')) else "sz" if code.startswith(('00','30','15')) else "hk" - url = f"http://qt.gtimg.cn/q={prefix}{code}" - req = Request(url, headers={'User-Agent': 'Mozilla/5.0'}) - resp = urlopen(req, timeout=5).read().decode('gbk') - fld = resp.split('=')[1].strip().strip('"').strip(';').split('~') - current = float(fld[3]) if len(fld) > 3 else 0 - except: - return None - - try: - url = f"http://ifzq.gtimg.cn/appstock/app/fqkline/get?param={prefix}{code},day,,,30,qfq" - req = Request(url, headers={'User-Agent': 'Mozilla/5.0'}) - resp = urlopen(req, timeout=5).read().decode('utf-8') - data = json.loads(resp) - day_key = 'qfqday' if prefix != 'hk' else 'day' - bars = data.get('data', {}).get(f'{prefix}{code}', {}).get(day_key, []) - except: - return None - - if not bars or current <= 0: - return None - closes = [float(b[2]) for b in bars] - if len(closes) < 5: - return None - - def ma(n): - return sum(closes[-n:]) / n - ma5 = ma(5) - ma10 = ma(10) if len(closes) >= 10 else None - ma20 = ma(20) if len(closes) >= 20 else None - - # 趋势分析 - pct_above_ma5 = (current - ma5) / ma5 * 100 - uptrend = False - - if ma20 and ma10: - if ma5 > ma10 > ma20: - trend_label = "多头排列" - uptrend = True - elif current < ma5 and ma5 < ma10 and current < ma10: - trend_label = "空头排列" - elif current > ma5 and ma5 > ma10: - trend_label = "短期转强" - uptrend = True - else: - trend_label = "震荡" - if current > ma5 > ma10: - uptrend = True - else: - trend_label = "数据不足" - - return { - 'price': current, - 'ma5': round(ma5, 2), - 'ma10': round(ma10, 2) if ma10 else None, - 'ma20': round(ma20, 2) if ma20 else None, - 'pct_above_ma5': round(pct_above_ma5, 1), - 'trend': trend_label, - 'uptrend': uptrend, - } - -# ── XMPP -XMPP_BRIDGE = "http://127.0.0.1:5805/" -XMPP_USER = "hmo@yoin.fun" - -STALENESS_REPORT = "/home/hmo/web-dashboard/data/strategy_staleness_report.json" -DETECTOR = "/home/hmo/.hermes/profiles/position-analyst/scripts/stale_detector.py" -PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json" -REGEN_SCRIPT = "/home/hmo/.hermes/profiles/position-analyst/scripts/per_stock_reassess.py" -REGEN_LOCK = "/tmp/.stale_push_wlin_regen.lock" -MACRO_CTX = "/home/hmo/web-dashboard/data/macro_context.json" -MARKET_JSON = "/home/hmo/web-dashboard/data/market.json" -COOLDOWN_PATH = "/home/hmo/web-dashboard/data/push_cooldown.json" - -NON_BUY_SIGNALS = ["观望", "弱势持有", "深套持有"] - - -def load_macro_line(): - """加载大盘和市场的简要描述""" - parts = [] - try: - # 优先 DB - import sqlite3 - db = sqlite3.connect("/home/hmo/MoFin/data/mofin.db") - row = db.execute( - "SELECT structure FROM macro_context_log " - "WHERE has_valid_data=1 ORDER BY created_at DESC LIMIT 1" - ).fetchone() - db.close() - if row and row[0]: - m = json.loads(row[0]) - else: - raise ValueError("no db data") - overall = m.get("overall", "neutral") - desc = m.get("description", "") - if "bearish" in overall: - parts.append("大盘偏弱") - elif overall == "bullish": - parts.append("大盘偏强") - elif desc: - parts.append(f"大盘{desc}") - except Exception: - try: - with open(MACRO_CTX) as f: - m = json.load(f).get("structure", {}) - overall = m.get("overall", "neutral") - desc = m.get("description", "") - if "bearish" in overall: - parts.append("大盘偏弱") - elif overall == "bullish": - parts.append("大盘偏强") - elif desc: - parts.append(f"大盘{desc}") - except Exception: - pass - try: - with open(MARKET_JSON) as f: - mk = json.load(f) - mood = mk.get("mood", "") - if mood: - parts.append(f"市场{mood}") - except Exception: - pass - return " | ".join(parts) if parts else "" - - -def is_actionable(cur, timing_signal=""): - """检查信号是否可操作。空文本/含非买入关键词 → 不可操作""" - if not cur and not timing_signal: - return False # 空文本默认不安全 - for kw in NON_BUY_SIGNALS: - if cur and kw.lower() in cur.lower(): - return False - if timing_signal and kw.lower() in timing_signal.lower(): - return False - return True - - -def trigger_regen_sync(stock_codes=None): - """同步执行指定个股的重评(等重评完再发报告)""" - if not stock_codes: - return - try: - cmd = ["python3", REGEN_SCRIPT] + stock_codes - subprocess.run(cmd, capture_output=True, text=True, timeout=60) - except subprocess.TimeoutExpired: - print("[REGEN] 重评超时(60s)", file=sys.stderr) - except Exception as e: - print(f"[REGEN] 重评失败: {e}", file=sys.stderr) - - -def load_cash(): - """从 portfolio.json 实时读可用现金(可用 ≈ 实时买力),不硬编码""" - try: - with open(PORTFOLIO_PATH) as f: - data = json.load(f) - if isinstance(data, dict): - # 先读 cash_available(拆分了可用/冻结),fallback 到 cash - return data.get("cash_available", data.get("cash", 0)) - if isinstance(data, list) and len(data) > 1 and isinstance(data[1], dict): - return data[1].get("cash_available", data[1].get("cash", 0)) - return 0 - except Exception: - return 0 - - -_HK_LOT_CACHE = {} - -def hk_lot_size(code): - """从腾讯行情API获取港股实际每手股数(字段[60]),带缓存""" - if code in _HK_LOT_CACHE: - return _HK_LOT_CACHE[code] - try: - url = f"http://qt.gtimg.cn/q=hk{code}" - req = Request(url, headers={"User-Agent": "curl/7.81"}) - with urlopen(req, timeout=5) as r: - text = r.read().decode("gbk") - raw = text.split("=", 1)[1].strip().strip('"').strip(";") - fld = raw.split("~") - lot = int(fld[60]) if len(fld) > 60 and fld[60] else 1000 - _HK_LOT_CACHE[code] = lot - return lot - except Exception: - _HK_LOT_CACHE[code] = 1000 - return 1000 - - -def lot_cost(code, price): - if str(code).startswith("688"): - return 200 * price - elif is_hk_stock(code): - lot = hk_lot_size(code) - rate = get_hk_rate() - return int(lot * price * rate) - else: - return 100 * price - - -def push_to_xmpp(text): - """通过知微 HTTP bridge 推送到老爸私信""" - if not text.strip(): - return - try: - payload = json.dumps({ - "to": XMPP_USER, - "body": text.strip(), - "type": "chat", - }).encode("utf-8") - req = Request(XMPP_BRIDGE, data=payload, headers={"Content-Type": "application/json"}) - urlopen(req, timeout=5) - except Exception as e: - print(f"[XMPP推送失败] {e}", file=sys.stderr) - - -def load_cooldown(): - try: - with open(COOLDOWN_PATH) as f: - return json.load(f) - except Exception: - return {} - - -def save_cooldown(cd): - try: - with open(COOLDOWN_PATH, "w") as f: - json.dump(cd, f, indent=2) - except Exception: - pass - - -def in_cooldown(code, action_type, cooldown_dict, minutes=30): - key = f"{code}_{action_type}" - last = cooldown_dict.get(key, 0) - elapsed = datetime.now().timestamp() - last - return elapsed < minutes * 60, elapsed, key - - -def main(): - r = subprocess.run( - ["python3", DETECTOR], capture_output=True, text=True, timeout=60 - ) - if r.returncode != 0 and r.stderr: - print(f"[stderr] {r.stderr.strip()}", file=sys.stderr) - - wl_lines = [ - l for l in r.stdout.split("\n") - if "[WL_IN]" in l and "[自选]" in l - ] - if not wl_lines: - return 0 - - # 读 stale report - try: - with open(STALENESS_REPORT) as f: - report = json.load(f) - except Exception: - report = {"flagged": []} - code_cur = {i["code"]: i.get("current", "") for i in report.get("flagged", [])} - - # 加载冷却状态 - cooldown = load_cooldown() - now_ts = datetime.now().timestamp() - - # 读 decisions.json 获取完整策略数据 - code_data = {} - try: - with open("/home/hmo/web-dashboard/data/decisions.json") as f: - dec = json.load(f) - for e in dec.get("decisions", []): - code_data[e["code"]] = e - except Exception: - pass - - cash = load_cash() - stocks = [] - stale_list = [] - all_candidates = [] # 所有在买入区的自选(stale+non-stale) - - for l in wl_lines: - m = re.match(r'\[WL_IN\](?:\s+\[\w+\])*\s+\[自选\]\s+(\S+)\((\d+)\)', l) - if not m: - continue - name, code = m.group(1), m.group(2) - pm = re.search(r'价(\d+\.\d{2})', l) - if not pm: - continue - price = float(pm.group(1)) - zm = re.search(r'买入([\d.]+)~([\d.]+)', l) - if not zm: - continue - buy_low, buy_high = float(zm.group(1)), float(zm.group(2)) - is_stale = "[STRATEGY_STALE]" in l - cur = code_cur.get(code, "") - - all_candidates.append((name, code, price, buy_low, buy_high, cur, is_stale)) - - if not is_actionable(cur, code_data.get(code, {}).get("timing_signal", "")) or is_stale: - stale_list.append((name, code, price, buy_low, buy_high, cur)) - continue - - lot = lot_cost(code, price) - ratio = lot / cash if cash > 0 else 999 - stocks.append((name, code, price, buy_low, buy_high, lot, ratio)) - - if not stocks and not stale_list: - return 0 - - now = datetime.now().strftime("%H:%M") - lines = [] - - # 市场背景 - macro_line = load_macro_line() - if macro_line: - lines.append(f"【市场背景】{macro_line}") - - # [关键修复: 2026-06-25] 所有预推票先重评,再出报告 - # 不只是 stale 的重评,所有在买入区的自选都先刷新策略,确保推荐不滞后 - to_reassess = list(set(s[1] for s in stocks) | set(s[1] for s in stale_list)) - if to_reassess: - trigger_regen_sync(to_reassess) - # 重评完成,re-read decisions.json 获取最新策略 - code_data = {} - try: - with open("/home/hmo/web-dashboard/data/decisions.json") as f: - dec = json.load(f) - for e in dec.get("decisions", []): - code_data[e["code"]] = e - except Exception: - pass - - # 重新过滤:重评后可能有策略变化(止盈/止损/信号变动) - # 重建 stocks 列表,用新数据判断(不再用旧 is_stale 标记,因为已全部重评) - stocks = [] - for (name, code, price, buy_low, buy_high, cur, is_stale) in all_candidates: - # 重评后重新检查 actionability(用新 timing_signal) - sig = code_data.get(code, {}).get("timing_signal", "") - if not is_actionable(cur, sig): - continue - lot = lot_cost(code, price) - ratio = lot / cash if cash > 0 else 999 - stocks.append((name, code, price, buy_low, buy_high, lot, ratio)) - - # 加载portfolio获取持仓信息(A/H去重用) - pf = {"holdings": []} - try: - with open(PORTFOLIO_PATH) as f: - pf = json.load(f) - except Exception: - pass - - stocks.sort(key=lambda s: ( - 0 if len(str(s[1])) == 6 else 1, - -code_data.get(s[1], {}).get("rr_ratio", 0) - )) - - # 只展示有清晰操作信号的个股 - # timing_signal 必须是明确操作方向:买入/加仓/观望/关注/信号不充分 - # 行业描述(行业偏弱/行业偏强/大盘变盘等)不是操作信号,一律跳过 - VALID_SIGNALS = {"买入", "加仓", "观望", "关注", "信号不充分"} - SKIP_KEYWORDS = ["等企稳", "信号不充分"] - - actionable = [] - for s in stocks: - sig = code_data.get(s[1], {}).get("timing_signal", "") - if not sig: - continue - # 跳过非操作信号 - if any(kw in sig for kw in SKIP_KEYWORDS): - continue - # 中性信号跳过 - stripped = sig.strip() - if not stripped or stripped.lower() in ("", "neutral", "持有", "深套持有", "弱势持有"): - continue - # 信号必须含买入/加仓才推荐——其他非操作信号跳过 - if not any(kw in sig for kw in ["买入", "加仓"]): - continue - # 趋势检查:必须不是空头排列(价格在MA5以下且MA5= 5: - theo_pct = 25 - elif rr >= 3: - theo_pct = 18 - elif rr >= 2: - theo_pct = 12 - else: - theo_pct = 8 - if "偏弱" in market_factor: - theo_pct = int(theo_pct * 0.8) - elif "偏强" in market_factor: - theo_pct = int(theo_pct * 1.15) - if cat in ("蓝筹", "白马"): - theo_pct = int(theo_pct * 1.2) - elif cat in ("题材", "短线"): - theo_pct = int(theo_pct * 0.6) - elif cat in ("高波动", "成长"): - theo_pct = int(theo_pct * 0.85) - theo_pct = max(5, min(30, theo_pct)) - - # 当前建议仓位:理论占总资产% → 按现金锁死 - ideal_budget = total_assets * theo_pct / 100 - # 可操作N只时,现金分配不超过 available_cash / n * 1.5 - max_use_cash = (available_cash / max(n, 1)) * 1.5 - budget = min(ideal_budget, max_use_cash, available_cash) - lots = int(budget / lot_cost) if lot_cost > 0 else 0 - - if lots == 0 and lot_cost > 0 and budget > lot_cost * 0.8: - # 预算覆盖超过80%的1手金额 → 至少1手(仅差一档) - lots = 1 - - lot_cost_total = lots * lot_cost - if lots == 0: - pct_actual = 0 - elif total_assets > 0: - pct_actual = round(lot_cost_total / total_assets * 100) - else: - pct_actual = 0 - - if lots == 0: - details = f"预算不足1手({budget:,.0f}/{lot_cost:,.0f}元)" - else: - if len(str(code)) == 5: - hk_lot = hk_lot_size(code) - shares = lots * hk_lot - elif code.startswith("688"): - shares = lots * 200 - else: - shares = lots * 100 - details = f"{lots}手({shares}股,{lot_cost_total:,.0f}元)" - - return theo_pct, pct_actual, details, lots, lot_cost_total - - # ── 换仓评估 ────────────────────────────────────────────────────── - # score_future_outlook 从 stock_scorer 模块导入(6维评分) - - def evaluate_swap(lot_cost_target, rr, sig, tp, sl, name, code, price_in, - total_assets_in, cash_in, pf_in, cd_in): - """现金不足时评估是否卖差票换推荐股。 - - 核心逻辑: - - 已发生的亏损是沉没成本,不参与决策 - - 用6维评分法评估每个持仓的未来前景(基于决策系统既有数据) - - 优先卖前景最差的票,保留前景好的票(无论当前盈亏%) - - 卖港股→买A股需T+2到账,如果推荐此方案则标注延迟风险 - - 对目标票(RR>=3+买入信号)才有换仓资格 - - 返回(推荐文案str, 缺口float)或 (None, gap) - """ - gap = lot_cost_target - cash_in - # 目标票质量门槛 - if rr < 3.0 or gap <= 0 or gap > total_assets_in * 0.5: - return None, gap - if not any(kw in sig for kw in ["买入", "加仓", "建仓"]): - return None, gap - - # 收集持仓数据 + 前景评分 - ph = [] - for h in pf_in.get("holdings", []): - hs = h.get("shares", 0) or 0 - hp = h.get("price", 0) or 0 - hc = h.get("cost", 0) or 0 - if hs <= 0 or hp <= 0: - continue - hmv = hs * hp - # 港股价格已是 CNY(price_monitor 写入时已转),不需要再乘汇率 - hpl_pct = (hp - hc) / hc * 100 if hc else 0 - - # 6维全面评分(越低越差,越建议卖) - fscore, _ = score_future_outlook(h_code, cd_in) - - ph.append({ - "code": h_code, - "name": h.get("name", ""), - "shares": hs, - "price": hp, - "cost": hc, - "mv": round(hmv), - "pl_pct": round(hpl_pct, 1), - "score": fscore, - }) - - # 按前景评分升序(最差的排最前面) - ph.sort(key=lambda x: x["score"]) - - # 打印调试信息:所有持仓的前景评分 - # print(f"[SWAP_DEBUG] 前景评分(越低越差):", file=sys.stderr) - # for x in ph[:10]: - # print(f" {x['name']}({x['code']}) 评分{x['score']} 亏{x['pl_pct']}% 市值{x['mv']:,}", file=sys.stderr) - - # 只考虑评分<=0(前景差或中性偏弱)的作为减仓候选 - candidates = [h for h in ph if h["score"] <= 0] - if not candidates: - return None, gap - - # 贪心选评分最差的,凑够现金缺口(最多2只) - selected = [] - cash_freed = 0 - for h in candidates: - if cash_freed >= gap: - break - cash_freed += h["mv"] - selected.append(h) - - if cash_freed < gap or len(selected) > 2: - return None, gap - - # 计算目标票的预期涨幅 - if tp and tp > 0: - target_gain_pct = (tp - price_in) / price_in * 100 - else: - target_gain_pct = rr * 3 - - # 构建推荐文案 - buy_is_a = not is_hk_stock(code) # 目标是否是A股 - sell_parts = [] - sell_names = [] - settlement_warnings = [] - for h in selected: - # 每个被选股票配一句"为什么卖它" - reason = f"评分{h['score']}" - if h['pl_pct'] <= -30: - reason += "深套" - elif h['pl_pct'] <= -15: - reason += f"亏损{h['pl_pct']}%" - sell_parts.append(f"{h['name']}({h['code']}) {h['shares']}股 亏{h['pl_pct']}% ({reason})") - sell_names.append(h['name']) - # 检查结算延迟:卖港股→买A股 - if is_hk_stock(h['code']) and buy_is_a: - settlement_warnings.append(f"{h['name']}是港股通,卖出需T+2到账才能买A股") - sell_desc = ";".join(sell_parts) - - new_budget = cash_in + cash_freed - new_lots = int(new_budget / lot_cost_target) if lot_cost_target > 0 else 0 - if new_lots == 0: - return None, gap - if code.startswith("688"): - new_shares = new_lots * 200 - elif len(code) <= 5: - new_shares = new_lots * hk_lot_size(code) - else: - new_shares = new_lots * 100 - new_cost = new_lots * lot_cost_target - new_pct = round(new_cost / total_assets_in * 100) if total_assets_in > 0 else 0 - - text = ( - f"换仓建议:卖{sell_desc}" - f"→腾{round(cash_freed):,}元" - f"→买{name}({code}) {new_lots}手({new_shares}股,{round(new_cost):,}元)" - f"占{new_pct}%仓位" - f"(止损{sl}(-{round((price_in-sl)/price_in*100,1)}%)" - f"止盈{tp}(+{round(target_gain_pct,1)}%)" - f" RR={rr})\n" - f" 理由:{', '.join(sell_names)}评分最低," - f"继续持有无积极信号且技术偏弱;" - f"换到有明确信号和止损的标的,预期收益更优。" - ) - if settlement_warnings: - text += "\n ⚠️ " + " | ".join(settlement_warnings) - return text, gap - - # 标准格式:每个可操作标的 — 大盘/行业/个股三面 + 仓位 - lines.append(f"【💡 操作建议】(当前{n}只自选可操作 | 总资产{total_assets:,.0f}元 现金{available_cash:,.0f}元)") - for s in actionable: - name, code, price, buy_low, buy_high, lot, ratio = s - d = code_data.get(code, {}) - sl = d.get("stop_loss", 0) - tp = d.get("take_profit", 0) - rr = d.get("rr_ratio", 0) - sig = d.get("timing_signal", "") - sector = d.get("sector_context", "") - tech = d.get("tech_snapshot", "") - mtf_ctx = d.get("multi_tf_context", "") - note = d.get("note", "") - d_factors = d.get("signal_factors", []) - cat = d.get("stock_category", "") - - # 提取技术位 - ss = {"强撑":"-", "弱撑":"-", "弱压":"-", "强压":"-"} - for tag in ss: - m = re.search(rf'{tag}:([\d.]+)', tech) - if m: - ss[tag] = m.group(1) - - # 基本面 - fund = fund_cache.get(code, {}) - pe = fund.get("pe", 0) - eps = fund.get("eps", 0) - pe_str = f"PE{pe:.0f}" if pe else "" - eps_str = f"EPS{eps:.2f}" if eps else "" - - # 从 signal_factors 提取各维度 - def _match_factor(prefix): - for f in d_factors: - if f.startswith(prefix): - return f - return "" - - market_factor = _match_factor("大盘") - sector_factor = _match_factor("行业") - value_factor = _match_factor("高估值") or _match_factor("低估值") or _match_factor("蓝筹") or pe_str or "" - news_factor = _match_factor("消息") - tech_factor = _match_factor("净利") or _match_factor("组合") or "" - - # 构建分析行 - parts = [] - if market_factor: - parts.append(f"大盘{market_factor.replace('大盘','')}") - if sector_factor: - parts.append(f"行业{sector_factor.replace('行业','')}") - if pe_str or value_factor: - parts.append(value_factor or pe_str) - if news_factor: - parts.append(news_factor) - if not parts: - parts.append(sector or cat or "") - - analysis = " | ".join(p for p in parts if p) - - # 仓位计算 - theo_pct, actual_pct, details, lots, lot_cost_total = calc_position( - lot, rr, market_factor, cat, code - ) - - pfx = "" if len(code) == 6 else "HK$" - - # 取分支动作类型 - branch_action = "hold" - branch_rationale = "" - if st and scenario_id: - try: - results = st.evaluate_branches(code, scenario_id, price, d.get("shares", 0), d.get("cost", 0)) - applicable = [r for r in results if r.get("applicable")] - if applicable: - best = min(applicable, key=lambda r: r.get("priority", 999)) - branch_action = best.get("action_type", "hold") - branch_rationale = best.get("rationale", "") - except Exception: - pass - - # 冷却检查:相同股+相同操作30分钟内不发 - cooled, elapsed, cd_key = in_cooldown(code, branch_action, cooldown) - if cooled: - continue - - # 策略质量过滤:只有正向/中性信号才推操作建议 - bad_keywords = ["偏弱", "弱势", "观望", "卖出", "回避", "回避"] - if any(kw in sig for kw in bad_keywords): - continue - - # 行业背景过滤:行业大跌时不在买入区推荐(即使个股信号好) - if "大跌" in sector: - continue - - # 换仓评估:现金不足时评估是否卖差票换推荐股 - swap_text = None - if lots == 0: - swap_text, _ = evaluate_swap( - lot, rr, sig, tp, sl, name, code, price, - total_assets, available_cash, pf, code_data - ) - - action_tag = "🛒" if (lots > 0 or swap_text) else "⚠️" - - lines.append( - f" {action_tag} {name}({code}) {pfx}{price:.2f} 买区{buy_low}~{buy_high} | " - f"1手{lot:,.0f}元 RR={rr:.1f} 损{sl} 盈{tp}\n" - f" {analysis}\n" - f" 技术{ss['强撑']}→{ss['弱撑']}→{ss['弱压']}→{ss['强压']} | 信号{sig}\n" - f" 仓位:理论{theo_pct}%×总资产 | 建议{actual_pct}%({details})" - ) - - if mtf_ctx: - lines[-1] += f"\n 均线{mtf_ctx}" - - if swap_text: - lines[-1] += f"\n {swap_text}" - - # 分支描述 - branch_line = "" - if branch_action != "hold": - branch_line = f" 【{scenario_label}→{branch_action}】{branch_rationale}" - if branch_line: - lines[-1] += f"\n{branch_line}" - - # 记录推送时间(冷却计时用) - cooldown[cd_key] = now_ts - - save_cooldown(cooldown) - - # 修正可操作数量(剔除冷却跳过后的实际数量) - actual_n = len(lines) - (1 if macro_line else 0) - 1 # 减去市场背景 + 操作建议标题 - if actual_n != n: - # 更新操作建议行 - for i, ln in enumerate(lines): - if "【💡 操作建议】" in ln: - lines[i] = f"【💡 操作建议】(当前{actual_n}只自选可操作 | 总资产{total_assets:,.0f}元 现金{available_cash:,.0f}元)" - break - - if actual_n <= 0: - return 0 # 全部冷却中 → 静默,不推 - - # ── T+2前瞻:扫描近期可能入买区的A股,提前准备现金 ── - t2_lines = [] - try: - dec_t2 = json.loads(open("/home/hmo/web-dashboard/data/decisions.json").read()) - for entry in dec_t2.get("decisions", []): - if entry.get("status") == "closed" or entry.get("type") != "自选策略": - continue - ec = entry["code"] - el = entry.get("entry_low", 0) or 0 - eh = entry.get("entry_high", 0) or 0 - ep = entry.get("price", 0) or 0 - if not eh or not ep or el <= 0: - continue - # A股+价格在买入区上方5%以内(即将进入买入区) - if not is_hk_stock(ec) and el <= ep <= eh * 1.05 and ep > eh: - anticipation_pct = (ep - eh) / eh * 100 - lot = lot_cost(ec, ep) - if lot > available_cash: - # 现金不足 → 卖港股提前准备 - ph = [] - for h in pf.get("holdings", []): - hs = h.get("shares", 0) or 0 - hp = h.get("price", 0) or 0 - hc = h.get("cost", 0) or 0 - if hs <= 0 or hp <= 0 or not is_hk_stock(h.get("code","")): - continue - sc = score_future_outlook(h.get("code",""), code_data) - ph.append((sc, h)) - ph.sort(key=lambda x: x[0]) - if ph: - worst = ph[0][1] - w_name = worst.get("name","?") - w_code = worst.get("code","") - w_price = worst.get("price",0) - w_shares = worst.get("shares",0) - w_value = w_price * w_shares - if w_value >= lot: - name_e = entry.get("name","") - t2_lines.append( - f" ⏳ {name_e}({ec})距买入区仅{anticipation_pct:.0f}%," - f"需{lot:,.0f}元。建议提前卖{w_name}({w_code})" - f"腾{w_value:,.0f}元(T+2到账后可用)" - ) - except: - pass - - if t2_lines: - lines.append("") - lines.append("【⏳ 提前准备(T+2港股提前出清)】") - lines.extend(t2_lines) - - lines.insert(0, f"【知微】自选买入提醒 {now} | 总资产{total_assets:,.0f}元") - out = "\n".join(lines) - print(out) - push_to_xmpp(out) - return 0 - - -if __name__ == "__main__": - sys.exit(main()) +#!/usr/bin/env python3 +""" +stale_push_wlin.py — 按5步逻辑推送自选股买入区提醒 + 自动触发重评 + +5步逻辑: +1. 筛选 is_watchlist=true 且价在买入区 +2. RR<1.5/无止盈位/非买入signal → 标记 STRATEGY_STALE → 触发自动重评 +3. 可推的:计算每手买入金额和现金占比 +4. 发现 STRATEGY_STALE → 后台跑 per_stock_reassess.py 自动重评 + +no_agent模式:有推送→输出;无→静默 +搭配 cron: no_agent=True, 交易日每30分跑一次 +""" +import subprocess +import sys +import re +import json +import os +import threading +import time +from datetime import datetime, time + +# ── MoFin unified model import ────────────────────────────────────── +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +try: + from mo_models import is_hk_stock, get_hk_rate, to_cny, calc_total_assets + _USE_MO_MODELS = True +except ImportError: + _USE_MO_MODELS = False + def is_hk_stock(code): + code = str(code or '').strip().upper() + return len(code) == 5 and code.isdigit() and code[0] in ('0', '1') + def get_hk_rate(): + return 0.87 + def to_cny(price, code): + if price is None: return price + if is_hk_stock(code): return round(float(price) * get_hk_rate(), 2) + return price + def calc_total_assets(pf): + total_mv = sum((h.get('shares',0) or 0) * (h.get('price',0) or 0) for h in pf.get('holdings',[])) + return round(total_mv + (pf.get('cash',0) or 0) + (pf.get('frozen_cash',0) or 0), 2) + +# 市场时段检查 +_MARKET_HOURS = { + 'ashare': (time(9, 30), time(15, 0)), + 'hk': (time(9, 30), time(16, 0)), +} + +def is_ashare(code: str) -> bool: + """判断是否A股代码""" + return code.isdigit() and (code.startswith(('6', '5')) or len(code) in (6,)) + +def market_is_open(code: str, now: datetime = None) -> bool: + """检查某股票对应市场是否在交易时段内""" + if not code: + return True + now = now or datetime.now() + t = now.time() + code_str = str(code) + if code_str.startswith(('0', '1')) and len(code_str) == 5: + # 港股 + start, end = _MARKET_HOURS['hk'] + else: + # A股(含ETF、科创板) + start, end = _MARKET_HOURS['ashare'] + return start <= t <= end +try: + from urllib.request import Request, urlopen +except ImportError: + from urllib2 import Request, urlopen +# 6维评分系统 +sys.path.insert(0, "/home/hmo/MoFin/scripts") +from stock_scorer import score_future_outlook, is_hk_stock, settlement_delay_note + +# ── 趋势检查 ──────────────────────────────────────────────────── +def fetch_trend_data(code): + """取均线数据判断趋势状态。价格从 DB 读取(price_monitor 唯一入口)。返回 (current_price, ma5, trend_label) 或 None""" + # 价格从 DB 读取,不再自拉腾讯 API + current = 0 + try: + import sqlite3 + db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db') + db.row_factory = sqlite3.Row + row = db.execute("SELECT price FROM holdings WHERE code=? AND is_active=1", (code,)).fetchone() + if not row: + row = db.execute("SELECT price FROM watchlist_stocks WHERE code=? AND is_active=1", (code,)).fetchone() + if not row: + row = db.execute("SELECT price FROM holding_strategies WHERE code=? AND status='active' ORDER BY updated_at DESC LIMIT 1", (code,)).fetchone() + if row: + current = row['price'] or 0 + db.close() + except Exception: + pass + + if current <= 0: + return None + + # K线数据仍从腾讯取(均线计算需要历史K线,DB 里 stock_daily 表有但不一定有最新数据) + try: + prefix = "sh" if code.startswith(('60','68','51','56','50')) else "sz" if code.startswith(('00','30','15')) else "hk" + url = f"http://ifzq.gtimg.cn/appstock/app/fqkline/get?param={prefix}{code},day,,,30,qfq" + req = Request(url, headers={'User-Agent': 'Mozilla/5.0'}) + resp = urlopen(req, timeout=5).read().decode('utf-8') + data = json.loads(resp) + day_key = 'qfqday' if prefix != 'hk' else 'day' + bars = data.get('data', {}).get(f'{prefix}{code}', {}).get(day_key, []) + except: + return None + + if not bars or current <= 0: + return None + closes = [float(b[2]) for b in bars] + if len(closes) < 5: + return None + + def ma(n): + return sum(closes[-n:]) / n + ma5 = ma(5) + ma10 = ma(10) if len(closes) >= 10 else None + ma20 = ma(20) if len(closes) >= 20 else None + + # 趋势分析 + pct_above_ma5 = (current - ma5) / ma5 * 100 + uptrend = False + + if ma20 and ma10: + if ma5 > ma10 > ma20: + trend_label = "多头排列" + uptrend = True + elif current < ma5 and ma5 < ma10 and current < ma10: + trend_label = "空头排列" + elif current > ma5 and ma5 > ma10: + trend_label = "短期转强" + uptrend = True + else: + trend_label = "震荡" + if current > ma5 > ma10: + uptrend = True + else: + trend_label = "数据不足" + + return { + 'price': current, + 'ma5': round(ma5, 2), + 'ma10': round(ma10, 2) if ma10 else None, + 'ma20': round(ma20, 2) if ma20 else None, + 'pct_above_ma5': round(pct_above_ma5, 1), + 'trend': trend_label, + 'uptrend': uptrend, + } + +# ── XMPP +XMPP_BRIDGE = "http://127.0.0.1:5805/" +XMPP_USER = "hmo@yoin.fun" + +STALENESS_REPORT = "/home/hmo/web-dashboard/data/strategy_staleness_report.json" +DETECTOR = "/home/hmo/.hermes/profiles/position-analyst/scripts/stale_detector.py" +PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json" +REGEN_SCRIPT = "/home/hmo/.hermes/profiles/position-analyst/scripts/per_stock_reassess.py" +REGEN_LOCK = "/tmp/.stale_push_wlin_regen.lock" +MACRO_CTX = "/home/hmo/web-dashboard/data/macro_context.json" +MARKET_JSON = "/home/hmo/web-dashboard/data/market.json" +COOLDOWN_PATH = "/home/hmo/web-dashboard/data/push_cooldown.json" + +NON_BUY_SIGNALS = ["观望", "弱势持有", "深套持有"] + + +def load_macro_line(): + """加载大盘和市场的简要描述""" + parts = [] + try: + # 优先 DB + import sqlite3 + db = sqlite3.connect("/home/hmo/MoFin/data/mofin.db") + row = db.execute( + "SELECT structure FROM macro_context_log " + "WHERE has_valid_data=1 ORDER BY created_at DESC LIMIT 1" + ).fetchone() + db.close() + if row and row[0]: + m = json.loads(row[0]) + else: + raise ValueError("no db data") + overall = m.get("overall", "neutral") + desc = m.get("description", "") + if "bearish" in overall: + parts.append("大盘偏弱") + elif overall == "bullish": + parts.append("大盘偏强") + elif desc: + parts.append(f"大盘{desc}") + except Exception: + try: + with open(MACRO_CTX) as f: + m = json.load(f).get("structure", {}) + overall = m.get("overall", "neutral") + desc = m.get("description", "") + if "bearish" in overall: + parts.append("大盘偏弱") + elif overall == "bullish": + parts.append("大盘偏强") + elif desc: + parts.append(f"大盘{desc}") + except Exception: + pass + try: + with open(MARKET_JSON) as f: + mk = json.load(f) + mood = mk.get("mood", "") + if mood: + parts.append(f"市场{mood}") + except Exception: + pass + return " | ".join(parts) if parts else "" + + +def is_actionable(cur, timing_signal=""): + """检查信号是否可操作。空文本/含非买入关键词 → 不可操作""" + if not cur and not timing_signal: + return False # 空文本默认不安全 + for kw in NON_BUY_SIGNALS: + if cur and kw.lower() in cur.lower(): + return False + if timing_signal and kw.lower() in timing_signal.lower(): + return False + return True + + +def trigger_regen_sync(stock_codes=None): + """同步执行指定个股的重评(等重评完再发报告)""" + if not stock_codes: + return + try: + cmd = ["python3", REGEN_SCRIPT] + stock_codes + subprocess.run(cmd, capture_output=True, text=True, timeout=60) + except subprocess.TimeoutExpired: + print("[REGEN] 重评超时(60s)", file=sys.stderr) + except Exception as e: + print(f"[REGEN] 重评失败: {e}", file=sys.stderr) + + +def load_cash(): + """从 portfolio.json 实时读可用现金(可用 ≈ 实时买力),不硬编码""" + try: + with open(PORTFOLIO_PATH) as f: + data = json.load(f) + if isinstance(data, dict): + # 先读 cash_available(拆分了可用/冻结),fallback 到 cash + return data.get("cash_available", data.get("cash", 0)) + if isinstance(data, list) and len(data) > 1 and isinstance(data[1], dict): + return data[1].get("cash_available", data[1].get("cash", 0)) + return 0 + except Exception: + return 0 + + +_HK_LOT_CACHE = {} + +def hk_lot_size(code): + """从腾讯行情API获取港股实际每手股数(字段[60]),带缓存""" + if code in _HK_LOT_CACHE: + return _HK_LOT_CACHE[code] + try: + url = f"http://qt.gtimg.cn/q=hk{code}" + req = Request(url, headers={"User-Agent": "curl/7.81"}) + with urlopen(req, timeout=5) as r: + text = r.read().decode("gbk") + raw = text.split("=", 1)[1].strip().strip('"').strip(";") + fld = raw.split("~") + lot = int(fld[60]) if len(fld) > 60 and fld[60] else 1000 + _HK_LOT_CACHE[code] = lot + return lot + except Exception: + _HK_LOT_CACHE[code] = 1000 + return 1000 + + +def lot_cost(code, price): + if str(code).startswith("688"): + return 200 * price + elif is_hk_stock(code): + lot = hk_lot_size(code) + rate = get_hk_rate() + return int(lot * price * rate) + else: + return 100 * price + + +def push_to_xmpp(text): + """通过知微 HTTP bridge 推送到老爸私信""" + if not text.strip(): + return + try: + payload = json.dumps({ + "to": XMPP_USER, + "body": text.strip(), + "type": "chat", + }).encode("utf-8") + req = Request(XMPP_BRIDGE, data=payload, headers={"Content-Type": "application/json"}) + urlopen(req, timeout=5) + except Exception as e: + print(f"[XMPP推送失败] {e}", file=sys.stderr) + + +def load_cooldown(): + try: + with open(COOLDOWN_PATH) as f: + return json.load(f) + except Exception: + return {} + + +def save_cooldown(cd): + try: + with open(COOLDOWN_PATH, "w") as f: + json.dump(cd, f, indent=2) + except Exception: + pass + + +def in_cooldown(code, action_type, cooldown_dict, minutes=30): + key = f"{code}_{action_type}" + last = cooldown_dict.get(key, 0) + elapsed = datetime.now().timestamp() - last + return elapsed < minutes * 60, elapsed, key + + +def main(): + r = subprocess.run( + ["python3", DETECTOR], capture_output=True, text=True, timeout=60 + ) + if r.returncode != 0 and r.stderr: + print(f"[stderr] {r.stderr.strip()}", file=sys.stderr) + + wl_lines = [ + l for l in r.stdout.split("\n") + if "[WL_IN]" in l and "[自选]" in l + ] + if not wl_lines: + return 0 + + # 读 stale report + try: + with open(STALENESS_REPORT) as f: + report = json.load(f) + except Exception: + report = {"flagged": []} + code_cur = {i["code"]: i.get("current", "") for i in report.get("flagged", [])} + + # 加载冷却状态 + cooldown = load_cooldown() + now_ts = datetime.now().timestamp() + + # 读 decisions.json 获取完整策略数据 + code_data = {} + try: + with open("/home/hmo/web-dashboard/data/decisions.json") as f: + dec = json.load(f) + for e in dec.get("decisions", []): + code_data[e["code"]] = e + except Exception: + pass + + cash = load_cash() + stocks = [] + stale_list = [] + all_candidates = [] # 所有在买入区的自选(stale+non-stale) + + for l in wl_lines: + m = re.match(r'\[WL_IN\](?:\s+\[\w+\])*\s+\[自选\]\s+(\S+)\((\d+)\)', l) + if not m: + continue + name, code = m.group(1), m.group(2) + pm = re.search(r'价(\d+\.\d{2})', l) + if not pm: + continue + price = float(pm.group(1)) + zm = re.search(r'买入([\d.]+)~([\d.]+)', l) + if not zm: + continue + buy_low, buy_high = float(zm.group(1)), float(zm.group(2)) + is_stale = "[STRATEGY_STALE]" in l + cur = code_cur.get(code, "") + + all_candidates.append((name, code, price, buy_low, buy_high, cur, is_stale)) + + if not is_actionable(cur, code_data.get(code, {}).get("timing_signal", "")) or is_stale: + stale_list.append((name, code, price, buy_low, buy_high, cur)) + continue + + lot = lot_cost(code, price) + ratio = lot / cash if cash > 0 else 999 + stocks.append((name, code, price, buy_low, buy_high, lot, ratio)) + + if not stocks and not stale_list: + return 0 + + now = datetime.now().strftime("%H:%M") + lines = [] + + # 市场背景 + macro_line = load_macro_line() + if macro_line: + lines.append(f"【市场背景】{macro_line}") + + # [关键修复: 2026-06-25] 所有预推票先重评,再出报告 + # 不只是 stale 的重评,所有在买入区的自选都先刷新策略,确保推荐不滞后 + to_reassess = list(set(s[1] for s in stocks) | set(s[1] for s in stale_list)) + if to_reassess: + trigger_regen_sync(to_reassess) + # 重评完成,re-read decisions.json 获取最新策略 + code_data = {} + try: + with open("/home/hmo/web-dashboard/data/decisions.json") as f: + dec = json.load(f) + for e in dec.get("decisions", []): + code_data[e["code"]] = e + except Exception: + pass + + # 重新过滤:重评后可能有策略变化(止盈/止损/信号变动) + # 重建 stocks 列表,用新数据判断(不再用旧 is_stale 标记,因为已全部重评) + stocks = [] + for (name, code, price, buy_low, buy_high, cur, is_stale) in all_candidates: + # 重评后重新检查 actionability(用新 timing_signal) + sig = code_data.get(code, {}).get("timing_signal", "") + if not is_actionable(cur, sig): + continue + lot = lot_cost(code, price) + ratio = lot / cash if cash > 0 else 999 + stocks.append((name, code, price, buy_low, buy_high, lot, ratio)) + + # 加载portfolio获取持仓信息(A/H去重用) + pf = {"holdings": []} + try: + with open(PORTFOLIO_PATH) as f: + pf = json.load(f) + except Exception: + pass + + stocks.sort(key=lambda s: ( + 0 if len(str(s[1])) == 6 else 1, + -code_data.get(s[1], {}).get("rr_ratio", 0) + )) + + # 只展示有清晰操作信号的个股 + # timing_signal 必须是明确操作方向:买入/加仓/观望/关注/信号不充分 + # 行业描述(行业偏弱/行业偏强/大盘变盘等)不是操作信号,一律跳过 + VALID_SIGNALS = {"买入", "加仓", "观望", "关注", "信号不充分"} + SKIP_KEYWORDS = ["等企稳", "信号不充分"] + + actionable = [] + for s in stocks: + sig = code_data.get(s[1], {}).get("timing_signal", "") + if not sig: + continue + # 跳过非操作信号 + if any(kw in sig for kw in SKIP_KEYWORDS): + continue + # 中性信号跳过 + stripped = sig.strip() + if not stripped or stripped.lower() in ("", "neutral", "持有", "深套持有", "弱势持有"): + continue + # 信号必须含买入/加仓才推荐——其他非操作信号跳过 + if not any(kw in sig for kw in ["买入", "加仓"]): + continue + # 趋势检查:必须不是空头排列(价格在MA5以下且MA5= 5: + theo_pct = 25 + elif rr >= 3: + theo_pct = 18 + elif rr >= 2: + theo_pct = 12 + else: + theo_pct = 8 + if "偏弱" in market_factor: + theo_pct = int(theo_pct * 0.8) + elif "偏强" in market_factor: + theo_pct = int(theo_pct * 1.15) + if cat in ("蓝筹", "白马"): + theo_pct = int(theo_pct * 1.2) + elif cat in ("题材", "短线"): + theo_pct = int(theo_pct * 0.6) + elif cat in ("高波动", "成长"): + theo_pct = int(theo_pct * 0.85) + theo_pct = max(5, min(30, theo_pct)) + + # 当前建议仓位:理论占总资产% → 按现金锁死 + ideal_budget = total_assets * theo_pct / 100 + # 可操作N只时,现金分配不超过 available_cash / n * 1.5 + max_use_cash = (available_cash / max(n, 1)) * 1.5 + budget = min(ideal_budget, max_use_cash, available_cash) + lots = int(budget / lot_cost) if lot_cost > 0 else 0 + + if lots == 0 and lot_cost > 0 and budget > lot_cost * 0.8: + # 预算覆盖超过80%的1手金额 → 至少1手(仅差一档) + lots = 1 + + lot_cost_total = lots * lot_cost + if lots == 0: + pct_actual = 0 + elif total_assets > 0: + pct_actual = round(lot_cost_total / total_assets * 100) + else: + pct_actual = 0 + + if lots == 0: + details = f"预算不足1手({budget:,.0f}/{lot_cost:,.0f}元)" + else: + if len(str(code)) == 5: + hk_lot = hk_lot_size(code) + shares = lots * hk_lot + elif code.startswith("688"): + shares = lots * 200 + else: + shares = lots * 100 + details = f"{lots}手({shares}股,{lot_cost_total:,.0f}元)" + + return theo_pct, pct_actual, details, lots, lot_cost_total + + # ── 换仓评估 ────────────────────────────────────────────────────── + # score_future_outlook 从 stock_scorer 模块导入(6维评分) + + def evaluate_swap(lot_cost_target, rr, sig, tp, sl, name, code, price_in, + total_assets_in, cash_in, pf_in, cd_in): + """现金不足时评估是否卖差票换推荐股。 + + 核心逻辑: + - 已发生的亏损是沉没成本,不参与决策 + - 用6维评分法评估每个持仓的未来前景(基于决策系统既有数据) + - 优先卖前景最差的票,保留前景好的票(无论当前盈亏%) + - 卖港股→买A股需T+2到账,如果推荐此方案则标注延迟风险 + - 对目标票(RR>=3+买入信号)才有换仓资格 + + 返回(推荐文案str, 缺口float)或 (None, gap) + """ + gap = lot_cost_target - cash_in + # 目标票质量门槛 + if rr < 3.0 or gap <= 0 or gap > total_assets_in * 0.5: + return None, gap + if not any(kw in sig for kw in ["买入", "加仓", "建仓"]): + return None, gap + + # 收集持仓数据 + 前景评分 + ph = [] + for h in pf_in.get("holdings", []): + hs = h.get("shares", 0) or 0 + hp = h.get("price", 0) or 0 + hc = h.get("cost", 0) or 0 + if hs <= 0 or hp <= 0: + continue + hmv = hs * hp + # 港股价格已是 CNY(price_monitor 写入时已转),不需要再乘汇率 + hpl_pct = (hp - hc) / hc * 100 if hc else 0 + + # 6维全面评分(越低越差,越建议卖) + fscore, _ = score_future_outlook(h_code, cd_in) + + ph.append({ + "code": h_code, + "name": h.get("name", ""), + "shares": hs, + "price": hp, + "cost": hc, + "mv": round(hmv), + "pl_pct": round(hpl_pct, 1), + "score": fscore, + }) + + # 按前景评分升序(最差的排最前面) + ph.sort(key=lambda x: x["score"]) + + # 打印调试信息:所有持仓的前景评分 + # print(f"[SWAP_DEBUG] 前景评分(越低越差):", file=sys.stderr) + # for x in ph[:10]: + # print(f" {x['name']}({x['code']}) 评分{x['score']} 亏{x['pl_pct']}% 市值{x['mv']:,}", file=sys.stderr) + + # 只考虑评分<=0(前景差或中性偏弱)的作为减仓候选 + candidates = [h for h in ph if h["score"] <= 0] + if not candidates: + return None, gap + + # 贪心选评分最差的,凑够现金缺口(最多2只) + selected = [] + cash_freed = 0 + for h in candidates: + if cash_freed >= gap: + break + cash_freed += h["mv"] + selected.append(h) + + if cash_freed < gap or len(selected) > 2: + return None, gap + + # 计算目标票的预期涨幅 + if tp and tp > 0: + target_gain_pct = (tp - price_in) / price_in * 100 + else: + target_gain_pct = rr * 3 + + # 构建推荐文案 + buy_is_a = not is_hk_stock(code) # 目标是否是A股 + sell_parts = [] + sell_names = [] + settlement_warnings = [] + for h in selected: + # 每个被选股票配一句"为什么卖它" + reason = f"评分{h['score']}" + if h['pl_pct'] <= -30: + reason += "深套" + elif h['pl_pct'] <= -15: + reason += f"亏损{h['pl_pct']}%" + sell_parts.append(f"{h['name']}({h['code']}) {h['shares']}股 亏{h['pl_pct']}% ({reason})") + sell_names.append(h['name']) + # 检查结算延迟:卖港股→买A股 + if is_hk_stock(h['code']) and buy_is_a: + settlement_warnings.append(f"{h['name']}是港股通,卖出需T+2到账才能买A股") + sell_desc = ";".join(sell_parts) + + new_budget = cash_in + cash_freed + new_lots = int(new_budget / lot_cost_target) if lot_cost_target > 0 else 0 + if new_lots == 0: + return None, gap + if code.startswith("688"): + new_shares = new_lots * 200 + elif len(code) <= 5: + new_shares = new_lots * hk_lot_size(code) + else: + new_shares = new_lots * 100 + new_cost = new_lots * lot_cost_target + new_pct = round(new_cost / total_assets_in * 100) if total_assets_in > 0 else 0 + + text = ( + f"换仓建议:卖{sell_desc}" + f"→腾{round(cash_freed):,}元" + f"→买{name}({code}) {new_lots}手({new_shares}股,{round(new_cost):,}元)" + f"占{new_pct}%仓位" + f"(止损{sl}(-{round((price_in-sl)/price_in*100,1)}%)" + f"止盈{tp}(+{round(target_gain_pct,1)}%)" + f" RR={rr})\n" + f" 理由:{', '.join(sell_names)}评分最低," + f"继续持有无积极信号且技术偏弱;" + f"换到有明确信号和止损的标的,预期收益更优。" + ) + if settlement_warnings: + text += "\n ⚠️ " + " | ".join(settlement_warnings) + return text, gap + + # 标准格式:每个可操作标的 — 大盘/行业/个股三面 + 仓位 + lines.append(f"【💡 操作建议】(当前{n}只自选可操作 | 总资产{total_assets:,.0f}元 现金{available_cash:,.0f}元)") + for s in actionable: + name, code, price, buy_low, buy_high, lot, ratio = s + d = code_data.get(code, {}) + sl = d.get("stop_loss", 0) + tp = d.get("take_profit", 0) + rr = d.get("rr_ratio", 0) + sig = d.get("timing_signal", "") + sector = d.get("sector_context", "") + tech = d.get("tech_snapshot", "") + mtf_ctx = d.get("multi_tf_context", "") + note = d.get("note", "") + d_factors = d.get("signal_factors", []) + cat = d.get("stock_category", "") + + # 提取技术位 + ss = {"强撑":"-", "弱撑":"-", "弱压":"-", "强压":"-"} + for tag in ss: + m = re.search(rf'{tag}:([\d.]+)', tech) + if m: + ss[tag] = m.group(1) + + # 基本面 + fund = fund_cache.get(code, {}) + pe = fund.get("pe", 0) + eps = fund.get("eps", 0) + pe_str = f"PE{pe:.0f}" if pe else "" + eps_str = f"EPS{eps:.2f}" if eps else "" + + # 从 signal_factors 提取各维度 + def _match_factor(prefix): + for f in d_factors: + if f.startswith(prefix): + return f + return "" + + market_factor = _match_factor("大盘") + sector_factor = _match_factor("行业") + value_factor = _match_factor("高估值") or _match_factor("低估值") or _match_factor("蓝筹") or pe_str or "" + news_factor = _match_factor("消息") + tech_factor = _match_factor("净利") or _match_factor("组合") or "" + + # 构建分析行 + parts = [] + if market_factor: + parts.append(f"大盘{market_factor.replace('大盘','')}") + if sector_factor: + parts.append(f"行业{sector_factor.replace('行业','')}") + if pe_str or value_factor: + parts.append(value_factor or pe_str) + if news_factor: + parts.append(news_factor) + if not parts: + parts.append(sector or cat or "") + + analysis = " | ".join(p for p in parts if p) + + # 仓位计算 + theo_pct, actual_pct, details, lots, lot_cost_total = calc_position( + lot, rr, market_factor, cat, code + ) + + pfx = "" if len(code) == 6 else "HK$" + + # 取分支动作类型 + branch_action = "hold" + branch_rationale = "" + if st and scenario_id: + try: + results = st.evaluate_branches(code, scenario_id, price, d.get("shares", 0), d.get("cost", 0)) + applicable = [r for r in results if r.get("applicable")] + if applicable: + best = min(applicable, key=lambda r: r.get("priority", 999)) + branch_action = best.get("action_type", "hold") + branch_rationale = best.get("rationale", "") + except Exception: + pass + + # 冷却检查:相同股+相同操作30分钟内不发 + cooled, elapsed, cd_key = in_cooldown(code, branch_action, cooldown) + if cooled: + continue + + # 策略质量过滤:只有正向/中性信号才推操作建议 + bad_keywords = ["偏弱", "弱势", "观望", "卖出", "回避", "回避"] + if any(kw in sig for kw in bad_keywords): + continue + + # 行业背景过滤:行业大跌时不在买入区推荐(即使个股信号好) + if "大跌" in sector: + continue + + # 换仓评估:现金不足时评估是否卖差票换推荐股 + swap_text = None + if lots == 0: + swap_text, _ = evaluate_swap( + lot, rr, sig, tp, sl, name, code, price, + total_assets, available_cash, pf, code_data + ) + + action_tag = "🛒" if (lots > 0 or swap_text) else "⚠️" + + lines.append( + f" {action_tag} {name}({code}) {pfx}{price:.2f} 买区{buy_low}~{buy_high} | " + f"1手{lot:,.0f}元 RR={rr:.1f} 损{sl} 盈{tp}\n" + f" {analysis}\n" + f" 技术{ss['强撑']}→{ss['弱撑']}→{ss['弱压']}→{ss['强压']} | 信号{sig}\n" + f" 仓位:理论{theo_pct}%×总资产 | 建议{actual_pct}%({details})" + ) + + if mtf_ctx: + lines[-1] += f"\n 均线{mtf_ctx}" + + if swap_text: + lines[-1] += f"\n {swap_text}" + + # 分支描述 + branch_line = "" + if branch_action != "hold": + branch_line = f" 【{scenario_label}→{branch_action}】{branch_rationale}" + if branch_line: + lines[-1] += f"\n{branch_line}" + + # 记录推送时间(冷却计时用) + cooldown[cd_key] = now_ts + + save_cooldown(cooldown) + + # 修正可操作数量(剔除冷却跳过后的实际数量) + actual_n = len(lines) - (1 if macro_line else 0) - 1 # 减去市场背景 + 操作建议标题 + if actual_n != n: + # 更新操作建议行 + for i, ln in enumerate(lines): + if "【💡 操作建议】" in ln: + lines[i] = f"【💡 操作建议】(当前{actual_n}只自选可操作 | 总资产{total_assets:,.0f}元 现金{available_cash:,.0f}元)" + break + + if actual_n <= 0: + return 0 # 全部冷却中 → 静默,不推 + + # ── T+2前瞻:扫描近期可能入买区的A股,提前准备现金 ── + t2_lines = [] + try: + dec_t2 = json.loads(open("/home/hmo/web-dashboard/data/decisions.json").read()) + for entry in dec_t2.get("decisions", []): + if entry.get("status") == "closed" or entry.get("type") != "自选策略": + continue + ec = entry["code"] + el = entry.get("entry_low", 0) or 0 + eh = entry.get("entry_high", 0) or 0 + ep = entry.get("price", 0) or 0 + if not eh or not ep or el <= 0: + continue + # A股+价格在买入区上方5%以内(即将进入买入区) + if not is_hk_stock(ec) and el <= ep <= eh * 1.05 and ep > eh: + anticipation_pct = (ep - eh) / eh * 100 + lot = lot_cost(ec, ep) + if lot > available_cash: + # 现金不足 → 卖港股提前准备 + ph = [] + for h in pf.get("holdings", []): + hs = h.get("shares", 0) or 0 + hp = h.get("price", 0) or 0 + hc = h.get("cost", 0) or 0 + if hs <= 0 or hp <= 0 or not is_hk_stock(h.get("code","")): + continue + sc = score_future_outlook(h.get("code",""), code_data) + ph.append((sc, h)) + ph.sort(key=lambda x: x[0]) + if ph: + worst = ph[0][1] + w_name = worst.get("name","?") + w_code = worst.get("code","") + w_price = worst.get("price",0) + w_shares = worst.get("shares",0) + w_value = w_price * w_shares + if w_value >= lot: + name_e = entry.get("name","") + t2_lines.append( + f" ⏳ {name_e}({ec})距买入区仅{anticipation_pct:.0f}%," + f"需{lot:,.0f}元。建议提前卖{w_name}({w_code})" + f"腾{w_value:,.0f}元(T+2到账后可用)" + ) + except: + pass + + if t2_lines: + lines.append("") + lines.append("【⏳ 提前准备(T+2港股提前出清)】") + lines.extend(t2_lines) + + lines.insert(0, f"【知微】自选买入提醒 {now} | 总资产{total_assets:,.0f}元") + out = "\n".join(lines) + print(out) + push_to_xmpp(out) + return 0 + + +if __name__ == "__main__": + sys.exit(main())