Files
MoFin/scripts/stale_push_wlin.py
T
知微 9a702a66f7 A/H跨市场去重:药明康德不再推H股
药明康德(02359) timing_signal=买入、多头排列,但Dad已持有
药明康德A股(603259)。同股同权,H折CNY还贵7.4%。

新增:stale_push_wlin在推荐前检查portfolio中是否有
同名不同代码的持仓。有则跳过推荐。

测试:之前推H股,现在静默(0可操作→无输出)
2026-06-24 15:35:58 +08:00

778 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
stale_push_wlin.py — 按5步逻辑推送自选股买入区提醒 + 自动触发重评
5步逻辑:
1. 筛选 is_watchlist=true 且价在买入区
2. RR<1.5/无止盈位/非买入signal → 标记 STRATEGY_STALE → 触发自动重评
3. 可推的:计算每手买入金额和现金占比
4. 发现 STRATEGY_STALE → 后台跑 per_stock_reassess.py 自动重评
no_agent模式:有推送→输出;无→静默
搭配 cron: no_agent=True, 交易日每30分跑一次
"""
import subprocess
import sys
import re
import json
import os
import threading
import time
from datetime import datetime
try:
from urllib.request import Request, urlopen
except ImportError:
from urllib2 import Request, urlopen
# 6维评分系统
sys.path.insert(0, "/home/hmo/MoFin/scripts")
from stock_scorer import score_future_outlook, is_hk_stock, settlement_delay_note
# ── 趋势检查 ────────────────────────────────────────────────────
def fetch_trend_data(code):
"""取均线数据判断趋势状态。返回 (current_price, ma5, trend_label) 或 None"""
try:
prefix = "sh" if code.startswith(('60','68','51','56','50')) else "sz" if code.startswith(('00','30','15')) else "hk"
url = f"http://qt.gtimg.cn/q={prefix}{code}"
req = Request(url, headers={'User-Agent': 'Mozilla/5.0'})
resp = urlopen(req, timeout=5).read().decode('gbk')
fld = resp.split('=')[1].strip().strip('"').strip(';').split('~')
current = float(fld[3]) if len(fld) > 3 else 0
except:
return None
try:
url = f"http://ifzq.gtimg.cn/appstock/app/fqkline/get?param={prefix}{code},day,,,30,qfq"
req = Request(url, headers={'User-Agent': 'Mozilla/5.0'})
resp = urlopen(req, timeout=5).read().decode('utf-8')
data = json.loads(resp)
day_key = 'qfqday' if prefix != 'hk' else 'day'
bars = data.get('data', {}).get(f'{prefix}{code}', {}).get(day_key, [])
except:
return None
if not bars or current <= 0:
return None
closes = [float(b[2]) for b in bars]
if len(closes) < 5:
return None
def ma(n):
return sum(closes[-n:]) / n
ma5 = ma(5)
ma10 = ma(10) if len(closes) >= 10 else None
ma20 = ma(20) if len(closes) >= 20 else None
# 趋势分析
pct_above_ma5 = (current - ma5) / ma5 * 100
uptrend = False
if ma20 and ma10:
if ma5 > ma10 > ma20:
trend_label = "多头排列"
uptrend = True
elif current < ma5 and ma5 < ma10 and current < ma10:
trend_label = "空头排列"
elif current > ma5 and ma5 > ma10:
trend_label = "短期转强"
uptrend = True
else:
trend_label = "震荡"
if current > ma5 > ma10:
uptrend = True
else:
trend_label = "数据不足"
return {
'price': current,
'ma5': round(ma5, 2),
'ma10': round(ma10, 2) if ma10 else None,
'ma20': round(ma20, 2) if ma20 else None,
'pct_above_ma5': round(pct_above_ma5, 1),
'trend': trend_label,
'uptrend': uptrend,
}
# ── XMPP
XMPP_BRIDGE = "http://127.0.0.1:5805/"
XMPP_USER = "hmo@yoin.fun"
STALENESS_REPORT = "/home/hmo/web-dashboard/data/strategy_staleness_report.json"
DETECTOR = "/home/hmo/.hermes/profiles/position-analyst/scripts/stale_detector.py"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
REGEN_SCRIPT = "/home/hmo/.hermes/profiles/position-analyst/scripts/per_stock_reassess.py"
REGEN_LOCK = "/tmp/.stale_push_wlin_regen.lock"
MACRO_CTX = "/home/hmo/web-dashboard/data/macro_context.json"
MARKET_JSON = "/home/hmo/web-dashboard/data/market.json"
COOLDOWN_PATH = "/home/hmo/web-dashboard/data/push_cooldown.json"
NON_BUY_SIGNALS = ["观望", "弱势持有", "深套持有"]
def load_macro_line():
"""加载大盘和市场的简要描述"""
parts = []
try:
with open(MACRO_CTX) as f:
m = json.load(f).get("structure", {})
overall = m.get("overall", "neutral")
desc = m.get("description", "")
if "bearish" in overall:
parts.append("大盘偏弱")
elif overall == "bullish":
parts.append("大盘偏强")
elif desc:
parts.append(f"大盘{desc}")
except Exception:
pass
try:
with open(MARKET_JSON) as f:
mk = json.load(f)
mood = mk.get("mood", "")
if mood:
parts.append(f"市场{mood}")
except Exception:
pass
return " | ".join(parts) if parts else ""
def is_actionable(cur, timing_signal=""):
"""检查信号是否可操作。空文本/含非买入关键词 → 不可操作"""
if not cur and not timing_signal:
return False # 空文本默认不安全
for kw in NON_BUY_SIGNALS:
if cur and kw.lower() in cur.lower():
return False
if timing_signal and kw.lower() in timing_signal.lower():
return False
return True
def trigger_regen_sync(stock_codes=None):
"""同步执行指定个股的重评(等重评完再发报告)"""
if not stock_codes:
return
try:
cmd = ["python3", REGEN_SCRIPT] + stock_codes
subprocess.run(cmd, capture_output=True, text=True, timeout=60)
except subprocess.TimeoutExpired:
print("[REGEN] 重评超时(60s", file=sys.stderr)
except Exception as e:
print(f"[REGEN] 重评失败: {e}", file=sys.stderr)
def load_cash():
"""从 portfolio.json 实时读现金,不硬编码"""
try:
with open(PORTFOLIO_PATH) as f:
data = json.load(f)
if isinstance(data, dict):
return data.get("cash", 0)
if isinstance(data, list) and len(data) > 1 and isinstance(data[1], dict):
return data[1].get("cash", 0)
return 0
except Exception:
return 0
_HK_LOT_CACHE = {}
def hk_lot_size(code):
"""从腾讯行情API获取港股实际每手股数(字段[60]),带缓存"""
if code in _HK_LOT_CACHE:
return _HK_LOT_CACHE[code]
try:
url = f"http://qt.gtimg.cn/q=hk{code}"
req = Request(url, headers={"User-Agent": "curl/7.81"})
with urlopen(req, timeout=5) as r:
text = r.read().decode("gbk")
raw = text.split("=", 1)[1].strip().strip('"').strip(";")
fld = raw.split("~")
lot = int(fld[60]) if len(fld) > 60 and fld[60] else 1000
_HK_LOT_CACHE[code] = lot
return lot
except Exception:
_HK_LOT_CACHE[code] = 1000
return 1000
def lot_cost(code, price):
if str(code).startswith("688"):
return 200 * price
elif len(str(code)) == 5:
lot = hk_lot_size(code)
try:
sys.path.insert(0, '/home/hmo/MoFin')
from hk_rate import hkd_to_cny
rate = hkd_to_cny()
except Exception:
rate = 0.87
return int(lot * price * rate)
else:
return 100 * price
def push_to_xmpp(text):
"""通过知微 HTTP bridge 推送到老爸私信"""
if not text.strip():
return
try:
payload = json.dumps({
"to": XMPP_USER,
"body": text.strip(),
"type": "chat",
}).encode("utf-8")
req = Request(XMPP_BRIDGE, data=payload, headers={"Content-Type": "application/json"})
urlopen(req, timeout=5)
except Exception as e:
print(f"[XMPP推送失败] {e}", file=sys.stderr)
def load_cooldown():
try:
with open(COOLDOWN_PATH) as f:
return json.load(f)
except Exception:
return {}
def save_cooldown(cd):
try:
with open(COOLDOWN_PATH, "w") as f:
json.dump(cd, f, indent=2)
except Exception:
pass
def in_cooldown(code, action_type, cooldown_dict, minutes=30):
key = f"{code}_{action_type}"
last = cooldown_dict.get(key, 0)
elapsed = time.time() - last
return elapsed < minutes * 60, elapsed, key
def main():
r = subprocess.run(
["python3", DETECTOR], capture_output=True, text=True, timeout=60
)
if r.returncode != 0 and r.stderr:
print(f"[stderr] {r.stderr.strip()}", file=sys.stderr)
wl_lines = [
l for l in r.stdout.split("\n")
if "[WL_IN]" in l and "[自选]" in l
]
if not wl_lines:
return 0
# 读 stale report
try:
with open(STALENESS_REPORT) as f:
report = json.load(f)
except Exception:
report = {"flagged": []}
code_cur = {i["code"]: i.get("current", "") for i in report.get("flagged", [])}
# 加载冷却状态
cooldown = load_cooldown()
now_ts = time.time()
# 读 decisions.json 获取完整策略数据
code_data = {}
try:
with open("/home/hmo/web-dashboard/data/decisions.json") as f:
dec = json.load(f)
for e in dec.get("decisions", []):
code_data[e["code"]] = e
except Exception:
pass
cash = load_cash()
stocks = []
stale_list = []
for l in wl_lines:
m = re.match(r'\[WL_IN\](?:\s+\[\w+\])*\s+\[自选\]\s+(\S+)\((\d+)\)', l)
if not m:
continue
name, code = m.group(1), m.group(2)
pm = re.search(r'价(\d+\.\d{2})', l)
if not pm:
continue
price = float(pm.group(1))
zm = re.search(r'买入([\d.]+)~([\d.]+)', l)
if not zm:
continue
buy_low, buy_high = float(zm.group(1)), float(zm.group(2))
is_stale = "[STRATEGY_STALE]" in l
cur = code_cur.get(code, "")
if not is_actionable(cur, code_data.get(code, {}).get("timing_signal", "")) or is_stale:
stale_list.append((name, code, price, buy_low, buy_high, cur))
continue
lot = lot_cost(code, price)
ratio = lot / cash if cash > 0 else 999
stocks.append((name, code, price, buy_low, buy_high, lot, ratio))
if not stocks and not stale_list:
return 0
now = datetime.now().strftime("%H:%M")
lines = []
# 市场背景
macro_line = load_macro_line()
if macro_line:
lines.append(f"【市场背景】{macro_line}")
# [重评] 内部流程 — 不在报告中展示,只执行重评
if stale_list:
stale_codes = [s[1] for s in stale_list]
trigger_regen_sync(stale_codes)
# 重评完成,re-read decisions.json
code_data = {}
try:
with open("/home/hmo/web-dashboard/data/decisions.json") as f:
dec = json.load(f)
for e in dec.get("decisions", []):
code_data[e["code"]] = e
except Exception:
pass
# 加载portfolio获取持仓信息(A/H去重用)
pf = {"holdings": []}
try:
with open(PORTFOLIO_PATH) as f:
pf = json.load(f)
except Exception:
pass
stocks.sort(key=lambda s: (
0 if len(str(s[1])) == 6 else 1,
-code_data.get(s[1], {}).get("rr_ratio", 0)
))
# 只展示有清晰操作信号的个股
# timing_signal 必须是明确操作方向:买入/加仓/观望/关注/信号不充分
# 行业描述(行业偏弱/行业偏强/大盘变盘等)不是操作信号,一律跳过
VALID_SIGNALS = {"买入", "加仓", "观望", "关注", "信号不充分"}
SKIP_KEYWORDS = ["等企稳", "信号不充分"]
actionable = []
for s in stocks:
sig = code_data.get(s[1], {}).get("timing_signal", "")
if not sig:
continue
# 跳过非操作信号
if any(kw in sig for kw in SKIP_KEYWORDS):
continue
# 中性信号跳过
stripped = sig.strip()
if not stripped or stripped.lower() in ("", "neutral", "持有", "深套持有", "弱势持有"):
continue
# 信号必须含买入/加仓才推荐——其他非操作信号跳过
if not any(kw in sig for kw in ["买入", "加仓"]):
continue
# 趋势检查:必须不是空头排列(价格在MA5以下且MA5<MA10
trend = fetch_trend_data(s[1])
if trend:
if not trend['uptrend']:
# 空头排列或弱势震荡,不推荐
continue
# (如果趋势数据获取失败,放行—不因数据问题错杀)
# A/H跨市场去重:同一公司已有持仓,不推荐另一市场的品种
name_s1 = code_data.get(s[1], {}).get("name", "") or s[0]
skip_ah = False
for h in pf.get("holdings", []):
if h["code"] == s[1]:
continue # 同一代码,不跳过
if h.get("name", "") == name_s1:
skip_ah = True
break
if skip_ah:
continue # 同一公司已在另一市场持有,不推荐
actionable.append(s)
if not actionable:
return 0 # 无操作信号 → 静默,不推
# 加载基本面缓存(PE等)
fund_cache = {}
try:
with open("/home/hmo/web-dashboard/data/multi_tf_cache.json") as f:
mtf = json.load(f)
for code, v in mtf.items():
fund_cache[code] = v.get("fundamentals", {})
except Exception:
pass
# 仓位计算:从holding.xls导入的portfolio.json读取总资产和现金
n = len(actionable)
total_assets = 0
available_cash = 0
try:
with open("/home/hmo/web-dashboard/data/portfolio.json") as f:
pf = json.load(f)
available_cash = pf.get("cash", 0) or 0
# 直接取 portfolio.json 的总资产(导入时已做港币→人民币换算)
total_assets = pf.get("total_assets", 0) or 0
if total_assets <= 0:
# fallback: 手动算
for h in pf.get("holdings", []):
mv = h.get("shares", 0) * h.get("price", 0)
if len(str(h.get("code", ""))) <= 5: # 港股
mv *= 0.866
total_assets += mv
total_assets += available_cash
except Exception:
total_assets = available_cash * 5 # fallback
# 加载策略树模块(获取当前情景+分支评估)
st = None
scenario_id = ""
scenario_label = ""
try:
import importlib.util
spec = importlib.util.spec_from_file_location("st_module", "/home/hmo/MoFin/strategy_tree.py")
st = importlib.util.module_from_spec(spec)
spec.loader.exec_module(st)
sc = st.detect_scenario()
scenario_id = sc.get("id", "")
scenario_label = sc.get("label", "")
except Exception:
pass
def calc_position(lot_cost, rr, market_factor, cat, code=""):
# 理论推荐仓位(% of 总资产) — 仅基于RR+市场+品种,不受现金限制
if rr >= 5:
theo_pct = 25
elif rr >= 3:
theo_pct = 18
elif rr >= 2:
theo_pct = 12
else:
theo_pct = 8
if "偏弱" in market_factor:
theo_pct = int(theo_pct * 0.8)
elif "偏强" in market_factor:
theo_pct = int(theo_pct * 1.15)
if cat in ("蓝筹", "白马"):
theo_pct = int(theo_pct * 1.2)
elif cat in ("题材", "短线"):
theo_pct = int(theo_pct * 0.6)
elif cat in ("高波动", "成长"):
theo_pct = int(theo_pct * 0.85)
theo_pct = max(5, min(30, theo_pct))
# 当前建议仓位:理论占总资产% → 按现金锁死
ideal_budget = total_assets * theo_pct / 100
# 可操作N只时,现金分配不超过 available_cash / n * 1.5
max_use_cash = (available_cash / max(n, 1)) * 1.5
budget = min(ideal_budget, max_use_cash, available_cash)
lots = int(budget / lot_cost) if lot_cost > 0 else 0
if lots == 0 and lot_cost > 0 and budget > lot_cost * 0.8:
# 预算覆盖超过80%的1手金额 → 至少1手(仅差一档)
lots = 1
lot_cost_total = lots * lot_cost
if lots == 0:
pct_actual = 0
elif total_assets > 0:
pct_actual = round(lot_cost_total / total_assets * 100)
else:
pct_actual = 0
if lots == 0:
details = f"预算不足1手({budget:,.0f}/{lot_cost:,.0f}元)"
else:
if len(str(code)) == 5:
hk_lot = hk_lot_size(code)
shares = lots * hk_lot
elif code.startswith("688"):
shares = lots * 200
else:
shares = lots * 100
details = f"{lots}手({shares}股,{lot_cost_total:,.0f}元)"
return theo_pct, pct_actual, details, lots, lot_cost_total
# ── 换仓评估 ──────────────────────────────────────────────────────
# score_future_outlook 从 stock_scorer 模块导入(6维评分)
def evaluate_swap(lot_cost_target, rr, sig, tp, sl, name, code, price_in,
total_assets_in, cash_in, pf_in, cd_in):
"""现金不足时评估是否卖差票换推荐股。
核心逻辑:
- 已发生的亏损是沉没成本,不参与决策
- 用6维评分法评估每个持仓的未来前景(基于决策系统既有数据)
- 优先卖前景最差的票,保留前景好的票(无论当前盈亏%
- 卖港股→买A股需T+2到账,如果推荐此方案则标注延迟风险
- 对目标票(RR>=3+买入信号)才有换仓资格
返回(推荐文案str, 缺口float)或 (None, gap)
"""
gap = lot_cost_target - cash_in
# 目标票质量门槛
if rr < 3.0 or gap <= 0 or gap > total_assets_in * 0.5:
return None, gap
if not any(kw in sig for kw in ["买入", "加仓", "建仓"]):
return None, gap
# 收集持仓数据 + 前景评分
ph = []
for h in pf_in.get("holdings", []):
hs = h.get("shares", 0) or 0
hp = h.get("price", 0) or 0
hc = h.get("cost", 0) or 0
if hs <= 0 or hp <= 0:
continue
hmv = hs * hp
h_code = str(h.get("code", ""))
if len(h_code) <= 5:
hmv *= 0.866 # approximate HKD→CNY
hpl_pct = (hp - hc) / hc * 100 if hc else 0
# 6维全面评分(越低越差,越建议卖)
fscore, _ = score_future_outlook(h_code, cd_in)
ph.append({
"code": h_code,
"name": h.get("name", ""),
"shares": hs,
"price": hp,
"cost": hc,
"mv": round(hmv),
"pl_pct": round(hpl_pct, 1),
"score": fscore,
})
# 按前景评分升序(最差的排最前面)
ph.sort(key=lambda x: x["score"])
# 打印调试信息:所有持仓的前景评分
# print(f"[SWAP_DEBUG] 前景评分(越低越差):", file=sys.stderr)
# for x in ph[:10]:
# print(f" {x['name']}({x['code']}) 评分{x['score']} 亏{x['pl_pct']}% 市值{x['mv']:,}", file=sys.stderr)
# 只考虑评分<=0(前景差或中性偏弱)的作为减仓候选
candidates = [h for h in ph if h["score"] <= 0]
if not candidates:
return None, gap
# 贪心选评分最差的,凑够现金缺口(最多2只)
selected = []
cash_freed = 0
for h in candidates:
if cash_freed >= gap:
break
cash_freed += h["mv"]
selected.append(h)
if cash_freed < gap or len(selected) > 2:
return None, gap
# 计算目标票的预期涨幅
if tp and tp > 0:
target_gain_pct = (tp - price_in) / price_in * 100
else:
target_gain_pct = rr * 3
# 构建推荐文案
buy_is_a = not is_hk_stock(code) # 目标是否是A股
sell_parts = []
sell_names = []
settlement_warnings = []
for h in selected:
# 每个被选股票配一句"为什么卖它"
reason = f"评分{h['score']}"
if h['pl_pct'] <= -30:
reason += "深套"
elif h['pl_pct'] <= -15:
reason += f"亏损{h['pl_pct']}%"
sell_parts.append(f"{h['name']}({h['code']}) {h['shares']}股 亏{h['pl_pct']}% ({reason})")
sell_names.append(h['name'])
# 检查结算延迟:卖港股→买A股
if is_hk_stock(h['code']) and buy_is_a:
settlement_warnings.append(f"{h['name']}是港股通,卖出需T+2到账才能买A股")
sell_desc = "".join(sell_parts)
new_budget = cash_in + cash_freed
new_lots = int(new_budget / lot_cost_target) if lot_cost_target > 0 else 0
if new_lots == 0:
return None, gap
if code.startswith("688"):
new_shares = new_lots * 200
elif len(code) <= 5:
new_shares = new_lots * hk_lot_size(code)
else:
new_shares = new_lots * 100
new_cost = new_lots * lot_cost_target
new_pct = round(new_cost / total_assets_in * 100) if total_assets_in > 0 else 0
text = (
f"换仓建议:卖{sell_desc}"
f"→腾{round(cash_freed):,}"
f"→买{name}({code}) {new_lots}手({new_shares}股,{round(new_cost):,}元)"
f"{new_pct}%仓位"
f"(止损{sl}(-{round((price_in-sl)/price_in*100,1)}%)"
f"止盈{tp}(+{round(target_gain_pct,1)}%)"
f" RR={rr}\n"
f" 理由:{', '.join(sell_names)}评分最低,"
f"继续持有无积极信号且技术偏弱;"
f"换到有明确信号和止损的标的,预期收益更优。"
)
if settlement_warnings:
text += "\n ⚠️ " + " | ".join(settlement_warnings)
return text, gap
# 标准格式:每个可操作标的 — 大盘/行业/个股三面 + 仓位
lines.append(f"【💡 操作建议】(当前{n}只自选可操作 | 总资产{total_assets:,.0f}元 现金{available_cash:,.0f}元)")
for s in actionable:
name, code, price, buy_low, buy_high, lot, ratio = s
d = code_data.get(code, {})
sl = d.get("stop_loss", 0)
tp = d.get("take_profit", 0)
rr = d.get("rr_ratio", 0)
sig = d.get("timing_signal", "")
sector = d.get("sector_context", "")
tech = d.get("tech_snapshot", "")
note = d.get("note", "")
d_factors = d.get("signal_factors", [])
cat = d.get("stock_category", "")
# 提取技术位
ss = {"强撑":"-", "弱撑":"-", "弱压":"-", "强压":"-"}
for tag in ss:
m = re.search(rf'{tag}:([\d.]+)', tech)
if m:
ss[tag] = m.group(1)
# 基本面
fund = fund_cache.get(code, {})
pe = fund.get("pe", 0)
eps = fund.get("eps", 0)
pe_str = f"PE{pe:.0f}" if pe else ""
eps_str = f"EPS{eps:.2f}" if eps else ""
# 从 signal_factors 提取各维度
def _match_factor(prefix):
for f in d_factors:
if f.startswith(prefix):
return f
return ""
market_factor = _match_factor("大盘")
sector_factor = _match_factor("行业")
value_factor = _match_factor("高估值") or _match_factor("低估值") or _match_factor("蓝筹") or pe_str or ""
news_factor = _match_factor("消息")
tech_factor = _match_factor("净利") or _match_factor("组合") or ""
# 构建分析行
parts = []
if market_factor:
parts.append(f"大盘{market_factor.replace('大盘','')}")
if sector_factor:
parts.append(f"行业{sector_factor.replace('行业','')}")
if pe_str or value_factor:
parts.append(value_factor or pe_str)
if news_factor:
parts.append(news_factor)
if not parts:
parts.append(sector or cat or "")
analysis = " | ".join(p for p in parts if p)
# 仓位计算
theo_pct, actual_pct, details, lots, lot_cost_total = calc_position(
lot, rr, market_factor, cat, code
)
pfx = "" if len(code) == 6 else "HK$"
# 取分支动作类型
branch_action = "hold"
branch_rationale = ""
if st and scenario_id:
try:
results = st.evaluate_branches(code, scenario_id, price, d.get("shares", 0), d.get("cost", 0))
applicable = [r for r in results if r.get("applicable")]
if applicable:
best = min(applicable, key=lambda r: r.get("priority", 999))
branch_action = best.get("action_type", "hold")
branch_rationale = best.get("rationale", "")
except Exception:
pass
# 冷却检查:相同股+相同操作30分钟内不发
cooled, elapsed, cd_key = in_cooldown(code, branch_action, cooldown)
if cooled:
continue
# 策略质量过滤:只有正向/中性信号才推操作建议
bad_keywords = ["偏弱", "弱势", "观望", "卖出", "回避", "回避"]
if any(kw in sig for kw in bad_keywords):
continue
# 行业背景过滤:行业大跌时不在买入区推荐(即使个股信号好)
if "大跌" in sector:
continue
# 换仓评估:现金不足时评估是否卖差票换推荐股
swap_text = None
if lots == 0:
swap_text, _ = evaluate_swap(
lot, rr, sig, tp, sl, name, code, price,
total_assets, available_cash, pf, code_data
)
action_tag = "🛒" if (lots > 0 or swap_text) else "⚠️"
lines.append(
f" {action_tag} {name}({code}) {pfx}{price:.2f} 买区{buy_low}~{buy_high} | "
f"1手{lot:,.0f}元 RR={rr:.1f}{sl}{tp}\n"
f" {analysis}\n"
f" 技术{ss['强撑']}{ss['弱撑']}{ss['弱压']}{ss['强压']} | 信号{sig}\n"
f" 仓位:理论{theo_pct}%×总资产 | 建议{actual_pct}%{details}"
)
if swap_text:
lines[-1] += f"\n {swap_text}"
# 分支描述
branch_line = ""
if branch_action != "hold":
branch_line = f"{scenario_label}{branch_action}{branch_rationale}"
if branch_line:
lines[-1] += f"\n{branch_line}"
# 记录推送时间(冷却计时用)
cooldown[cd_key] = now_ts
save_cooldown(cooldown)
# 修正可操作数量(剔除冷却跳过后的实际数量)
actual_n = len(lines) - (1 if macro_line else 0) - 1 # 减去市场背景 + 操作建议标题
if actual_n != n:
# 更新操作建议行
for i, ln in enumerate(lines):
if "【💡 操作建议】" in ln:
lines[i] = f"【💡 操作建议】(当前{actual_n}只自选可操作 | 总资产{total_assets:,.0f}元 现金{available_cash:,.0f}元)"
break
if actual_n <= 0:
return 0 # 全部冷却中 → 静默,不推
lines.insert(0, f"【知微】自选买入提醒 {now} | 总资产{total_assets:,.0f}")
out = "\n".join(lines)
print(out)
push_to_xmpp(out)
return 0
if __name__ == "__main__":
sys.exit(main())