Files
MoFin/scripts/stale_push_wlin.py
T
知微 92815aac06 换仓评估逻辑:现金不足时自动分析卖差票换推荐股
新增 evaluate_swap() 函数:
1. 仅对RR>=2.0且含买入关键词的强信号触发
2. 扫描持仓按亏损排序,找最少卖出组合凑现金缺口
3. 预期盈利 > 锁定亏损×1.5 才推荐切换
4. 最多卖3只,单次换仓不超总资产50%
5. 不划算时维持原预算不足1手消息

已验证:海博思创(688411) RR=5.6但预期盈利9k<锁定亏损46k×1.5
→ 不推荐切换,正确。沐曦如果触发也会按同一逻辑判断。
2026-06-24 11:42:26 +08:00

609 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
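stale_push_wlin.py — push buy-zone alerts for watchlist stocks and automatically trigger strategy re-assessment.
Processing steps:
1. Select stocks with is_watchlist=true whose price is inside the buy zone.
2. RR<1.5 / no take-profit level / non-buy signal → marked STRATEGY_STALE → triggers automatic re-assessment.
3. For pushable stocks: compute the cost of one lot and its share of available cash.
4. When STRATEGY_STALE entries are found → run per_stock_reassess.py to re-assess them (executed synchronously, before the report is built).
no_agent mode: print the report when there is something to push; stay silent otherwise.
Intended cron usage: no_agent=True, every 30 minutes on trading days.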
"""
import subprocess
import sys
import re
import json
import os
import threading
import time
from datetime import datetime
try:
from urllib.request import Request, urlopen
except ImportError:
from urllib2 import Request, urlopen
# Local HTTP bridge endpoint that push_to_xmpp() POSTs its JSON payload to.
XMPP_BRIDGE = "http://127.0.0.1:5805/"
# Recipient address placed in the "to" field of every pushed message.
XMPP_USER = "hmo@yoin.fun"
# JSON report read by main(); its "flagged" entries supply each stock's "current" text.
STALENESS_REPORT = "/home/hmo/web-dashboard/data/strategy_staleness_report.json"
# Detector script run by main(); its stdout lines tagged [WL_IN] are parsed.
DETECTOR = "/home/hmo/.hermes/profiles/position-analyst/scripts/stale_detector.py"
# Portfolio snapshot read by load_cash() for the "cash" value.
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
# Re-assessment script invoked by trigger_regen_sync() with stock codes as arguments.
REGEN_SCRIPT = "/home/hmo/.hermes/profiles/position-analyst/scripts/per_stock_reassess.py"
# NOTE(review): not referenced in the visible part of this file — presumably a
# lock file for re-assessment runs; confirm before removing.
REGEN_LOCK = "/tmp/.stale_push_wlin_regen.lock"
# JSON files read by load_macro_line(): the "structure" section and the "mood" field.
MACRO_CTX = "/home/hmo/web-dashboard/data/macro_context.json"
MARKET_JSON = "/home/hmo/web-dashboard/data/market.json"
# JSON file in which load_cooldown()/save_cooldown() persist last-push timestamps.
COOLDOWN_PATH = "/home/hmo/web-dashboard/data/push_cooldown.json"
# Signal keywords that make is_actionable() reject a stock.
NON_BUY_SIGNALS = ["观望", "弱势持有", "深套持有"]
def load_macro_line(macro_path=None, market_path=None):
    """Build a short one-line description of the index trend and market mood.

    Args:
        macro_path: JSON file holding a ``structure`` object with ``overall``
            and ``description`` fields. Defaults to ``MACRO_CTX``.
        market_path: JSON file holding a ``mood`` field. Defaults to
            ``MARKET_JSON``.

    Returns:
        The available parts joined with ``" | "``, or ``""`` when neither file
        yields anything. Unreadable or malformed files are skipped
        (best-effort), never raised.
    """
    macro_path = MACRO_CTX if macro_path is None else macro_path
    market_path = MARKET_JSON if market_path is None else market_path
    parts = []
    try:
        # The files contain Chinese text; decode explicitly as UTF-8 instead of
        # relying on the locale of the cron environment.
        with open(macro_path, encoding="utf-8") as f:
            structure = json.load(f).get("structure") or {}
        # A JSON null must not abort the block, otherwise the description
        # fallback below would be lost.
        overall = structure.get("overall") or "neutral"
        desc = structure.get("description") or ""
        if "bearish" in overall:
            parts.append("大盘偏弱")
        elif overall == "bullish":
            parts.append("大盘偏强")
        elif desc:
            parts.append(f"大盘{desc}")
    except Exception:
        pass  # best-effort: the macro line is optional context
    try:
        with open(market_path, encoding="utf-8") as f:
            mood = json.load(f).get("mood", "")
        if mood:
            parts.append(f"市场{mood}")
    except Exception:
        pass  # best-effort: the mood is optional context
    return " | ".join(parts)
def is_actionable(cur, timing_signal=""):
    """Tell whether a signal can be acted on.

    Returns False when both texts are empty, or when either non-empty text
    contains (case-insensitively) one of the NON_BUY_SIGNALS keywords;
    True otherwise.
    """
    if not (cur or timing_signal):
        return False  # no text at all is treated as unsafe
    for keyword in NON_BUY_SIGNALS:
        needle = keyword.lower()
        if any(text and needle in text.lower() for text in (cur, timing_signal)):
            return False
    return True
def trigger_regen_sync(stock_codes=None):
    """Run the re-assessment script for the given stock codes and wait for it.

    Does nothing when ``stock_codes`` is empty or None. The script's output is
    captured and discarded; a timeout (60 s) or any other failure is reported
    on stderr instead of being raised.
    """
    if stock_codes:
        run_options = {"capture_output": True, "text": True, "timeout": 60}
        try:
            command = ["python3", REGEN_SCRIPT] + stock_codes
            subprocess.run(command, **run_options)
        except subprocess.TimeoutExpired:
            print("[REGEN] 重评超时(60s", file=sys.stderr)
        except Exception as e:
            print(f"[REGEN] 重评失败: {e}", file=sys.stderr)
def load_cash(path=None):
    """Read the available cash from the portfolio JSON file.

    Two layouts are accepted: a top-level object with a ``cash`` field, or a
    list whose second element is such an object.

    Args:
        path: portfolio file to read. Defaults to ``PORTFOLIO_PATH``.

    Returns:
        The cash amount as a number. ``0`` is returned when the file cannot be
        read or parsed, the layout is not recognised, or the value is missing,
        null, or not convertible to a number — callers compare the result
        numerically, so a non-number must never leak out.
    """
    path = PORTFOLIO_PATH if path is None else path
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except Exception:
        return 0
    if isinstance(data, dict):
        record = data
    elif isinstance(data, list) and len(data) > 1 and isinstance(data[1], dict):
        record = data[1]
    else:
        return 0
    cash = record.get("cash", 0)
    if isinstance(cash, (int, float)):
        return cash
    try:
        return float(cash)  # e.g. a numeric string
    except (TypeError, ValueError):
        return 0  # null or any other non-numeric value
# Per-process memo of lot sizes already resolved, keyed by stock code.
_HK_LOT_CACHE = {}


def hk_lot_size(code):
    """Return the board-lot size (shares per lot) of a Hong Kong stock.

    The value is taken from field index 60 of the '~'-separated quote returned
    by the qt.gtimg.cn endpoint. When the request or parsing fails, or the
    field is absent/empty, 1000 is used. Every result, including the
    fallback, is cached for the lifetime of the process.
    """
    try:
        return _HK_LOT_CACHE[code]
    except KeyError:
        pass
    fallback_lot = 1000
    try:
        request = Request(
            f"http://qt.gtimg.cn/q=hk{code}",
            headers={"User-Agent": "curl/7.81"},
        )
        with urlopen(request, timeout=5) as response:
            payload = response.read().decode("gbk")
        # The payload has the form  name="f0~f1~...";  — keep only the fields.
        quoted = payload.split("=", 1)[1].strip()
        fields = quoted.strip('"').strip(";").split("~")
        has_lot_field = len(fields) > 60 and fields[60]
        lot = int(fields[60]) if has_lot_field else fallback_lot
    except Exception:
        lot = fallback_lot
    _HK_LOT_CACHE[code] = lot
    return lot
def lot_cost(code, price):
    """Return the cost of one lot of ``code`` at ``price``.

    * codes starting with "688": 200 shares per lot;
    * 5-character codes (treated as Hong Kong stocks): the lot size from
      ``hk_lot_size`` times the HKD→CNY rate from ``hk_rate.hkd_to_cny``,
      truncated to an int; 0.87 is used when the rate cannot be obtained or
      is not a positive number;
    * anything else: 100 shares per lot.
    """
    code_str = str(code)
    if code_str.startswith("688"):
        return 200 * price
    if len(code_str) == 5:
        lot = hk_lot_size(code)
        fallback_rate = 0.87
        try:
            mofin_dir = '/home/hmo/MoFin'
            # Register the project directory only once; inserting it on every
            # call made sys.path grow with each Hong Kong stock processed.
            if mofin_dir not in sys.path:
                sys.path.insert(0, mofin_dir)
            from hk_rate import hkd_to_cny
            rate = hkd_to_cny()
        except Exception:
            rate = fallback_rate
        # Guard the arithmetic below against a null/zero/negative rate.
        if not isinstance(rate, (int, float)) or rate <= 0:
            rate = fallback_rate
        return int(lot * price * rate)
    return 100 * price
def push_to_xmpp(text):
    """Send ``text`` as a chat message through the local XMPP HTTP bridge.

    The message is POSTed as JSON (``to``, ``body``, ``type``) to
    ``XMPP_BRIDGE``. Empty, whitespace-only, or None input is ignored.
    Delivery is best-effort: failures are reported on stderr, never raised.
    """
    body = (text or "").strip()
    if not body:
        return
    try:
        payload = json.dumps({
            "to": XMPP_USER,
            "body": body,
            "type": "chat",
        }).encode("utf-8")
        req = Request(XMPP_BRIDGE, data=payload, headers={"Content-Type": "application/json"})
        # Use the response as a context manager so the connection is closed
        # instead of being left to the garbage collector.
        with urlopen(req, timeout=5):
            pass
    except Exception as e:
        print(f"[XMPP推送失败] {e}", file=sys.stderr)
def load_cooldown(path=None):
    """Load the persisted push-cooldown state.

    Args:
        path: JSON file to read. Defaults to ``COOLDOWN_PATH``.

    Returns:
        A dict mapping cooldown keys to numeric timestamps. ``{}`` is returned
        when the file is missing, unreadable, or does not contain a JSON
        object. Entries whose value is not a number are dropped, because
        callers subtract the value from the current time.
    """
    path = COOLDOWN_PATH if path is None else path
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except Exception:
        return {}
    if not isinstance(data, dict):
        return {}
    return {
        key: stamp
        for key, stamp in data.items()
        if isinstance(stamp, (int, float)) and not isinstance(stamp, bool)
    }
def save_cooldown(cd, path=None):
    """Persist the push-cooldown state as JSON.

    The data is written to a sibling temporary file and then moved over the
    target, so a failure while serializing cannot leave a truncated file
    behind (which would wipe every cooldown on the next run).

    Args:
        cd: the cooldown dict to store.
        path: destination file. Defaults to ``COOLDOWN_PATH``.

    Saving is best-effort: failures are reported on stderr, never raised.
    """
    path = COOLDOWN_PATH if path is None else path
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(cd, f, indent=2)
        os.replace(tmp_path, path)
    except Exception as e:
        try:
            os.remove(tmp_path)  # do not leave a partial temp file around
        except OSError:
            pass
        print(f"[COOLDOWN] 保存失败: {e}", file=sys.stderr)
def in_cooldown(code, action_type, cooldown_dict, minutes=30):
    """Check whether a (stock, action) pair was pushed within the last ``minutes``.

    Returns a tuple ``(cooling, elapsed_seconds, key)`` where ``key`` is
    ``"<code>_<action_type>"``. A key absent from ``cooldown_dict`` counts as
    last pushed at timestamp 0.
    """
    key = "{}_{}".format(code, action_type)
    seconds_since_push = time.time() - cooldown_dict.get(key, 0)
    window_seconds = minutes * 60
    return seconds_since_push < window_seconds, seconds_since_push, key
def _load_decisions(path="/home/hmo/web-dashboard/data/decisions.json"):
    """Return ``{code: decision entry}`` from decisions.json (best-effort).

    Entries read before a failure are kept; an unreadable file gives ``{}``.
    """
    by_code = {}
    try:
        with open(path, encoding="utf-8") as f:
            dec = json.load(f)
        for entry in dec.get("decisions", []):
            by_code[entry["code"]] = entry
    except Exception:
        pass
    return by_code


def _match_factor(factors, prefix):
    """Return the first string in ``factors`` starting with ``prefix``, else ""."""
    for factor in factors:
        if isinstance(factor, str) and factor.startswith(prefix):
            return factor
    return ""


def main():
    """Run the detector, re-assess stale strategies, and push buy-zone alerts.

    Flow:
      1. Run the detector script and keep its ``[WL_IN]`` + ``[自选]`` lines.
      2. Split the parsed stocks into pushable ones and stale/non-actionable
         ones; the latter are re-assessed synchronously and not reported.
      3. Keep only stocks whose timing signal is a clear action, size a
         position for each, optionally evaluate a swap when cash is short,
         and skip stocks that are still in their push cooldown.
      4. Print the report and push it over XMPP; stay silent when nothing
         remains to report.

    Returns:
        0 on normal completion (including "nothing to push"), 1 when the
        detector could not be run.
    """
    try:
        r = subprocess.run(
            ["python3", DETECTOR], capture_output=True, text=True, timeout=60
        )
    except (subprocess.TimeoutExpired, OSError) as e:
        # Report concisely instead of dying with a traceback under cron.
        print(f"[DETECTOR] 运行失败: {e}", file=sys.stderr)
        return 1
    if r.returncode != 0 and r.stderr:
        print(f"[stderr] {r.stderr.strip()}", file=sys.stderr)
    wl_lines = [
        line for line in r.stdout.split("\n")
        if "[WL_IN]" in line and "[自选]" in line
    ]
    if not wl_lines:
        return 0

    # Staleness report: code -> "current" strategy text.
    try:
        with open(STALENESS_REPORT, encoding="utf-8") as f:
            report = json.load(f)
    except Exception:
        report = {"flagged": []}
    code_cur = {}
    if isinstance(report, dict):
        for item in report.get("flagged") or []:
            if isinstance(item, dict) and "code" in item:
                code_cur[item["code"]] = item.get("current", "")

    # Cooldown state (guarded so a malformed file cannot break the lookups).
    cooldown = load_cooldown()
    if not isinstance(cooldown, dict):
        cooldown = {}
    now_ts = time.time()

    # Full strategy data per stock.
    code_data = _load_decisions()
    cash = load_cash() or 0

    stocks = []
    stale_list = []
    for line in wl_lines:
        m = re.match(r'\[WL_IN\](?:\s+\[\w+\])*\s+\[自选\]\s+(\S+)\((\d+)\)', line)
        if not m:
            continue
        name, code = m.group(1), m.group(2)
        pm = re.search(r'价(\d+\.\d{2})', line)
        zm = re.search(r'买入([\d.]+)~([\d.]+)', line)
        if not pm or not zm:
            continue
        try:
            price = float(pm.group(1))
            buy_low, buy_high = float(zm.group(1)), float(zm.group(2))
        except ValueError:
            continue  # e.g. "1.2.3" matches [\d.]+ but is not a number
        is_stale = "[STRATEGY_STALE]" in line
        cur = code_cur.get(code, "")
        timing = code_data.get(code, {}).get("timing_signal", "")
        if not is_actionable(cur, timing) or is_stale:
            stale_list.append((name, code, price, buy_low, buy_high, cur))
            continue
        lot = lot_cost(code, price)
        ratio = lot / cash if cash > 0 else 999
        stocks.append((name, code, price, buy_low, buy_high, lot, ratio))
    if not stocks and not stale_list:
        return 0

    now = datetime.now().strftime("%H:%M")
    lines = []
    # Market background
    macro_line = load_macro_line()
    if macro_line:
        lines.append(f"【市场背景】{macro_line}")

    # Re-assessment is an internal step: executed, but not shown in the report.
    if stale_list:
        trigger_regen_sync([s[1] for s in stale_list])
        # Re-assessment finished: re-read decisions.json.
        code_data = _load_decisions()

    # A-shares (6-digit codes) first, then by descending RR; a null RR sorts as 0.
    stocks.sort(key=lambda s: (
        0 if len(str(s[1])) == 6 else 1,
        -(code_data.get(s[1], {}).get("rr_ratio") or 0)
    ))

    # Only show stocks with a clear action signal: skip wait/watch style
    # signals, neutral signals, and pure hold signals.
    SKIP_KEYWORDS = ["等企稳", "关注", "信号不充分"]
    NEUTRAL_SIGNALS = ("", "neutral", "持有", "深套持有", "弱势持有")
    actionable = []
    for s in stocks:
        sig = code_data.get(s[1], {}).get("timing_signal", "")
        if not sig:
            continue
        if any(kw in sig for kw in SKIP_KEYWORDS):
            continue
        if sig.strip().lower() in NEUTRAL_SIGNALS:
            continue
        actionable.append(s)
    if not actionable:
        return 0  # no action signal -> stay silent, push nothing

    # Fundamentals cache (PE etc.)
    fund_cache = {}
    try:
        with open("/home/hmo/web-dashboard/data/multi_tf_cache.json", encoding="utf-8") as f:
            mtf = json.load(f)
        for fund_code, v in mtf.items():
            fund_cache[fund_code] = v.get("fundamentals", {})
    except Exception:
        pass

    # Position sizing inputs: total assets and cash from portfolio.json.
    n = len(actionable)
    total_assets = 0
    available_cash = 0
    # pf must always be a dict: it is handed to evaluate_swap() later, even
    # when loading the portfolio failed.
    pf = {}
    try:
        with open("/home/hmo/web-dashboard/data/portfolio.json", encoding="utf-8") as f:
            loaded = json.load(f)
        if isinstance(loaded, dict):
            pf = loaded
        available_cash = pf.get("cash", 0) or 0
        for h in pf.get("holdings", []):
            mv = (h.get("shares", 0) or 0) * (h.get("price", 0) or 0)
            if h.get("currency") == "HKD":
                mv *= h.get("exchange_rate", 0.866)
            total_assets += mv
        total_assets += available_cash
    except Exception:
        total_assets = available_cash * 5  # fallback

    # Strategy-tree module (current scenario + branch evaluation); optional.
    st = None
    scenario_id = ""
    scenario_label = ""
    try:
        import importlib.util
        spec = importlib.util.spec_from_file_location("st_module", "/home/hmo/MoFin/strategy_tree.py")
        st = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(st)
        sc = st.detect_scenario()
        scenario_id = sc.get("id", "")
        scenario_label = sc.get("label", "")
    except Exception:
        pass

    def calc_position(per_lot, rr, market_factor, cat, code=""):
        """Size a position for one stock.

        Returns ``(theo_pct, pct_actual, details, lots, lot_cost_total)``:
        the theoretical % of total assets, the % actually suggested, a
        human-readable description, the number of lots and their total cost.
        """
        # Theoretical position (% of total assets): based on RR, market and
        # category only, not limited by cash.
        if rr >= 5:
            theo_pct = 25
        elif rr >= 3:
            theo_pct = 18
        elif rr >= 2:
            theo_pct = 12
        else:
            theo_pct = 8
        if "偏弱" in market_factor:
            theo_pct = int(theo_pct * 0.8)
        elif "偏强" in market_factor:
            theo_pct = int(theo_pct * 1.15)
        if cat in ("蓝筹", "白马"):
            theo_pct = int(theo_pct * 1.2)
        elif cat in ("题材", "短线"):
            theo_pct = int(theo_pct * 0.6)
        elif cat in ("高波动", "成长"):
            theo_pct = int(theo_pct * 0.85)
        theo_pct = max(5, min(30, theo_pct))
        # Suggested position: theoretical share of total assets, capped by cash.
        ideal_budget = total_assets * theo_pct / 100
        # With N actionable stocks, one stock may use at most cash / N * 1.5.
        max_use_cash = (available_cash / max(n, 1)) * 1.5
        budget = min(ideal_budget, max_use_cash, available_cash)
        lots = int(budget / per_lot) if per_lot > 0 else 0
        if (lots == 0 and per_lot > 0 and budget > per_lot * 0.8
                and available_cash >= per_lot):
            # Budget covers more than 80% of one lot -> round up to one lot,
            # but only when cash can actually pay for it. Otherwise an
            # unaffordable lot would be suggested and the swap evaluation
            # (which only runs for lots == 0) would be skipped.
            lots = 1
        lot_cost_total = lots * per_lot
        if lots and total_assets > 0:
            pct_actual = round(lot_cost_total / total_assets * 100)
        else:
            pct_actual = 0
        if lots == 0:
            details = f"预算不足1手({budget:,.0f}/{per_lot:,.0f}元)"
        else:
            code_str = str(code)
            if len(code_str) == 5:
                shares = lots * hk_lot_size(code)
            elif code_str.startswith("688"):
                shares = lots * 200
            else:
                shares = lots * 100
            details = f"{lots}手({shares}股,{lot_cost_total:,.0f}元)"
        return theo_pct, pct_actual, details, lots, lot_cost_total

    # ── Swap evaluation ─────────────────────────────────────────────────
    def evaluate_swap(lot_cost_target, rr, sig, tp, sl, name, code, price_in, total_assets_in, cash_in, pf_in):
        """Evaluate selling losing holdings to fund the recommended stock.

        Only triggered for strong targets: RR >= 2.0 and a buy keyword in the
        signal. Losing holdings (below -10%) are taken worst-first until the
        cash gap is covered; at most 3 may be sold and the gap may not exceed
        50% of total assets. The swap is recommended only when the expected
        gain exceeds 1.5x the loss locked in by selling.

        Returns ``(text, gap)`` with the recommendation text, or
        ``(None, gap)`` when no swap is recommended.
        """
        gap = lot_cost_target - cash_in
        if rr < 2.0 or gap <= 0 or gap > total_assets_in * 0.5:
            return None, gap
        # The keyword list previously contained "", which is a substring of
        # every string and disabled this check entirely.
        if not any(kw in (sig or "") for kw in ("买入", "加仓", "建仓")):
            return None, gap
        # Collect holdings with their P/L.
        ph = []
        for h in pf_in.get("holdings", []):
            hs = h.get("shares", 0) or 0
            hp = h.get("price", 0) or 0
            hc = h.get("cost", 0) or 0
            if hs <= 0 or hp <= 0:
                continue
            h_code = str(h.get("code", ""))
            # Codes of up to 5 characters are treated as HKD-denominated;
            # convert both market value and P/L so that the locked loss is
            # compared with the expected gain in the same currency.
            fx = (h.get("exchange_rate") or 0.866) if len(h_code) <= 5 else 1
            hmv = hs * hp * fx
            hpl = (hp - hc) * hs * fx
            hpl_pct = (hp - hc) / hc * 100 if hc else 0
            ph.append({"code": h_code, "name": h.get("name", ""),
                       "shares": hs, "price": hp, "cost": hc,
                       "mv": round(hmv), "pl": round(hpl),
                       "pl_pct": round(hpl_pct, 1)})
        # Worst performers first; only holdings down more than 10% qualify.
        ph.sort(key=lambda x: x["pl_pct"])
        candidates = [h for h in ph if h["pl_pct"] < -10]
        if not candidates:
            return None, gap
        selected = []
        cash_freed = 0
        locked_loss = 0
        for h in candidates:
            if cash_freed >= gap:
                break
            cash_freed += h["mv"]
            locked_loss += abs(h["pl"])
            selected.append(h)
        if cash_freed < gap or len(selected) > 3:
            return None, gap
        # Estimate the expected gain of the target (up to take-profit).
        if tp and tp > 0 and lot_cost_target > 0 and price_in > 0:
            target_gain_pct = (tp - price_in) / price_in
            expected_gain = lot_cost_target * max(target_gain_pct, 0.05)
        else:
            target_gain_pct = rr * 0.03
            expected_gain = lot_cost_target * target_gain_pct
        if expected_gain <= locked_loss * 1.5:
            return None, gap
        sell_desc = "".join(f"{h['name']}({h['code']}) {h['shares']}股 亏{h['pl_pct']}%"
                            for h in selected)
        sell_total = cash_freed
        new_budget = cash_in + sell_total
        new_lots = int(new_budget / lot_cost_target) if lot_cost_target > 0 else 0
        if new_lots == 0:
            return None, gap
        if code.startswith("688"):
            new_shares = new_lots * 200
        elif len(code) <= 5:
            new_shares = new_lots * hk_lot_size(code)
        else:
            new_shares = new_lots * 100
        new_cost = new_lots * lot_cost_target
        new_pct = round(new_cost / total_assets_in * 100) if total_assets_in > 0 else 0
        ratio_vs_loss = round(expected_gain / locked_loss, 1) if locked_loss else 0
        text = (f"换仓建议:卖{sell_desc}"
                f"→腾{round(sell_total):,}元(锁定亏损{locked_loss:,}"
                f"→买{name}({code}) {new_lots}手({new_shares}股,{round(new_cost):,}元)"
                f"{new_pct}%仓位"
                f"(目标{tp} +{round(target_gain_pct*100,1)}%预期={round(expected_gain):,}"
                f"≈锁定亏损的{ratio_vs_loss}倍,划算)")
        return text, gap

    # Standard format per actionable stock: market / sector / stock + position.
    header_index = len(lines)
    lines.append(f"【💡 操作建议】(当前{n}只自选可操作 | 总资产{total_assets:,.0f}元 现金{available_cash:,.0f}元)")
    pushed = 0
    for name, code, price, buy_low, buy_high, lot, _ratio in actionable:
        d = code_data.get(code, {})
        sl = d.get("stop_loss", 0)
        tp = d.get("take_profit", 0)
        rr = d.get("rr_ratio", 0) or 0
        sig = d.get("timing_signal", "")
        sector = d.get("sector_context", "")
        tech = d.get("tech_snapshot", "") or ""
        d_factors = d.get("signal_factors", []) or []
        cat = d.get("stock_category", "")
        # Technical levels extracted from the snapshot text.
        ss = {"强撑": "-", "弱撑": "-", "弱压": "-", "强压": "-"}
        for tag in ss:
            level = re.search(rf'{tag}:([\d.]+)', tech)
            if level:
                ss[tag] = level.group(1)
        # Fundamentals
        pe = fund_cache.get(code, {}).get("pe", 0)
        pe_str = f"PE{pe:.0f}" if pe else ""
        # Per-dimension factors taken from signal_factors.
        market_factor = _match_factor(d_factors, "大盘")
        sector_factor = _match_factor(d_factors, "行业")
        value_factor = (_match_factor(d_factors, "高估值")
                        or _match_factor(d_factors, "低估值")
                        or _match_factor(d_factors, "蓝筹")
                        or pe_str or "")
        news_factor = _match_factor(d_factors, "消息")
        # Analysis line
        parts = []
        if market_factor:
            parts.append(f"大盘{market_factor.replace('大盘','')}")
        if sector_factor:
            parts.append(f"行业{sector_factor.replace('行业','')}")
        if pe_str or value_factor:
            parts.append(value_factor or pe_str)
        if news_factor:
            parts.append(news_factor)
        if not parts:
            parts.append(sector or cat or "")
        analysis = " | ".join(p for p in parts if p)
        # Position sizing
        theo_pct, actual_pct, details, lots, lot_cost_total = calc_position(
            lot, rr, market_factor, cat, code
        )
        pfx = "" if len(code) == 6 else "HK$"
        # Branch action type from the strategy tree (defaults to "hold").
        branch_action = "hold"
        branch_rationale = ""
        if st and scenario_id:
            try:
                results = st.evaluate_branches(code, scenario_id, price, d.get("shares", 0), d.get("cost", 0))
                applicable = [res for res in results if res.get("applicable")]
                if applicable:
                    best = min(applicable, key=lambda res: res.get("priority", 999))
                    branch_action = best.get("action_type", "hold")
                    branch_rationale = best.get("rationale", "")
            except Exception:
                pass
        # Cooldown: same stock + same action is not pushed again within 30 minutes.
        cooled, _elapsed, cd_key = in_cooldown(code, branch_action, cooldown)
        if cooled:
            continue
        action_tag = "⚠️" if lots == 0 else "🛒"
        # Swap evaluation: when cash is short, check whether selling losers pays off.
        swap_text = None
        if lots == 0:
            swap_text, _ = evaluate_swap(
                lot, rr, sig, tp, sl, name, code, price,
                total_assets, available_cash, pf
            )
        entry = (
            f" {action_tag} {name}({code}) {pfx}{price:.2f} 买区{buy_low}~{buy_high} | "
            f"1手{lot:,.0f}元 RR={rr:.1f}{sl}{tp}\n"
            f" {analysis}\n"
            f" 技术{ss['强撑']}{ss['弱撑']}{ss['弱压']}{ss['强压']} | 信号{sig}\n"
            f" 仓位:理论{theo_pct}%×总资产 | 建议{actual_pct}%{details}"
        )
        if swap_text:
            entry += f"\n {swap_text}"
        # Branch description
        if branch_action != "hold":
            branch_line = f"{scenario_label}{branch_action}{branch_rationale}"
            if branch_line:
                entry += f"\n{branch_line}"
        lines.append(entry)
        pushed += 1
        # Record the push time (used by the cooldown check).
        cooldown[cd_key] = now_ts
    save_cooldown(cooldown)

    # Correct the advertised count when stocks were skipped by the cooldown.
    if pushed <= 0:
        return 0  # everything is cooling down -> stay silent, push nothing
    if pushed != n:
        lines[header_index] = f"【💡 操作建议】(当前{pushed}只自选可操作 | 总资产{total_assets:,.0f}元 现金{available_cash:,.0f}元)"
    lines.insert(0, f"【知微】自选买入提醒 {now} | 总资产{total_assets:,.0f}")
    out = "\n".join(lines)
    print(out)
    push_to_xmpp(out)
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())