Files
MoFin/scripts/stale_push_wlin.py
T
知微 e0b7f49c3a stale_push_wlin: 操作符号标记
- ⚠️ 预算不足1手(列了也没法买,但让你知道)
- 🛒 手数达标(现金够买,可直接操作)

取自 position_advice calc_position 的实际计算结果:
  lots=0 → 预算不足 ⚠️
  lots>0 → 可以买 🛒
2026-06-24 10:47:34 +08:00

467 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
stale_push_wlin.py — 按5步逻辑推送自选股买入区提醒 + 自动触发重评
5步逻辑:
1. 筛选 is_watchlist=true 且价在买入区
2. RR<1.5/无止盈位/非买入signal → 标记 STRATEGY_STALE → 触发自动重评
3. 可推的:计算每手买入金额和现金占比
4. 发现 STRATEGY_STALE → 后台跑 per_stock_reassess.py 自动重评
no_agent模式:有推送→输出;无→静默
搭配 cron: no_agent=True, 交易日每30分跑一次
"""
import subprocess
import sys
import re
import json
import os
import threading
from datetime import datetime
try:
from urllib.request import Request, urlopen
except ImportError:
from urllib2 import Request, urlopen
# HTTP endpoint that push_to_xmpp() POSTs its JSON payload to.
XMPP_BRIDGE = "http://127.0.0.1:5805/"
# Recipient placed in the "to" field of every pushed message.
XMPP_USER = "hmo@yoin.fun"
# JSON report main() reads for its "flagged" list and its "portfolio" cash figure.
STALENESS_REPORT = "/home/hmo/web-dashboard/data/strategy_staleness_report.json"
# Script main() runs with python3; its stdout is scanned for "[WL_IN]" lines.
DETECTOR = "/home/hmo/.hermes/profiles/position-analyst/scripts/stale_detector.py"
# JSON file load_cash() reads the "cash" value from.
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
# Script trigger_regen_sync() runs with the stock codes as command-line arguments.
REGEN_SCRIPT = "/home/hmo/.hermes/profiles/position-analyst/scripts/per_stock_reassess.py"
# NOTE(review): not referenced anywhere in the visible code — confirm it is still needed.
REGEN_LOCK = "/tmp/.stale_push_wlin_regen.lock"
# JSON file whose "structure" object feeds load_macro_line().
MACRO_CTX = "/home/hmo/web-dashboard/data/macro_context.json"
# JSON file whose "mood" field feeds load_macro_line().
MARKET_JSON = "/home/hmo/web-dashboard/data/market.json"
# Substrings that make is_actionable() reject a signal text.
NON_BUY_SIGNALS = ["观望", "弱势持有", "深套持有"]
def load_macro_line():
    """Build a one-line summary of the index trend and the market mood.

    Reads the ``structure`` object of ``MACRO_CTX`` and the ``mood`` field of
    ``MARKET_JSON``.  Both sources are optional: a missing, unreadable or
    unexpectedly shaped file simply leaves its part out.

    Returns:
        The available parts joined with ``" | "``, or ``""`` when neither
        source produced anything.
    """
    parts = []
    try:
        # JSON is UTF-8 by specification; do not depend on the locale.
        with open(MACRO_CTX, encoding="utf-8") as f:
            structure = json.load(f).get("structure", {})
        # A null "overall" must not prevent the description from being used.
        overall = structure.get("overall") or "neutral"
        desc = structure.get("description", "")
        if "bearish" in overall:
            parts.append("大盘偏弱")
        elif overall == "bullish":
            parts.append("大盘偏强")
        elif desc:
            parts.append(f"大盘{desc}")
    except Exception:
        # Deliberate best-effort: the macro line is decoration for the push
        # message and must never abort it.
        pass
    try:
        with open(MARKET_JSON, encoding="utf-8") as f:
            market = json.load(f)
        mood = market.get("mood", "")
        if mood:
            parts.append(f"市场{mood}")
    except Exception:
        # Same best-effort policy as above.
        pass
    return " | ".join(parts)
def is_actionable(cur, timing_signal=""):
    """Tell whether the signal texts allow acting on a stock.

    Args:
        cur: Current strategy text for the stock (may be empty or ``None``).
        timing_signal: Timing-signal text for the stock (may be empty).

    Returns:
        ``False`` when both texts are empty (nothing to judge is treated as
        unsafe) or when either text contains, case-insensitively, one of the
        ``NON_BUY_SIGNALS`` keywords; ``True`` otherwise.
    """
    texts = [text.lower() for text in (cur, timing_signal) if text]
    if not texts:
        return False
    blocked = any(
        keyword.lower() in text
        for keyword in NON_BUY_SIGNALS
        for text in texts
    )
    return not blocked
def trigger_regen_sync(stock_codes=None):
    """Run the per-stock reassessment script and wait for it to finish.

    Args:
        stock_codes: Iterable of stock codes passed as command-line arguments
            to ``REGEN_SCRIPT``.  Nothing is run when it is empty or ``None``.

    Failures never raise: a timeout (60 s), a launch error or a non-zero
    exit status is reported on stderr and the function returns normally, so
    the caller can still send its report.
    """
    if not stock_codes:
        return
    try:
        # Unpacking accepts any iterable, not only a list.
        cmd = ["python3", REGEN_SCRIPT, *stock_codes]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except subprocess.TimeoutExpired:
        print("[REGEN] 重评超时(60s)", file=sys.stderr)
    except Exception as e:
        print(f"[REGEN] 重评失败: {e}", file=sys.stderr)
    else:
        # subprocess.run() does not raise on a failing child unless check=True,
        # so surface the exit status ourselves instead of dropping it.
        if result.returncode != 0:
            detail = (result.stderr or "").strip()
            message = f"[REGEN] 重评失败: 退出码{result.returncode} {detail}"
            print(message.rstrip(), file=sys.stderr)
def load_cash():
    """Read the current cash balance from ``PORTFOLIO_PATH``.

    Two layouts are accepted: a top-level object with a ``cash`` key, or a
    list whose second element is an object with a ``cash`` key.

    Returns:
        The cash amount as stored in the file, or ``0`` when the file is
        missing or invalid, the layout is not recognised, or the stored
        value is not a number (callers compare the result with ``> 0``).
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale.
        with open(PORTFOLIO_PATH, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file, or invalid JSON / invalid UTF-8.
        return 0
    if isinstance(data, dict):
        cash = data.get("cash", 0)
    elif isinstance(data, list) and len(data) > 1 and isinstance(data[1], dict):
        cash = data[1].get("cash", 0)
    else:
        return 0
    # bool is an int subclass, but a JSON true/false is not a balance.
    if isinstance(cash, bool) or not isinstance(cash, (int, float)):
        return 0
    return cash
# Lot sizes already looked up in this process, keyed by the code as passed in.
_HK_LOT_CACHE = {}


def hk_lot_size(code):
    """Return the number of shares per lot for a Hong Kong stock.

    The value is taken from field 60 of the ``~``-separated quote returned by
    ``http://qt.gtimg.cn/q=hk<code>`` (decoded as GBK).  Results, including
    the fallback, are cached per code for the lifetime of the process.

    Args:
        code: Stock code appended to ``hk`` in the quote URL.

    Returns:
        The lot size as an int; ``1000`` when the request fails, the reply
        cannot be parsed, or field 60 is absent or empty.
    """
    cached = _HK_LOT_CACHE.get(code)
    if cached is not None:
        return cached

    shares = 1000
    try:
        request = Request(
            f"http://qt.gtimg.cn/q=hk{code}",
            headers={"User-Agent": "curl/7.81"},
        )
        with urlopen(request, timeout=5) as resp:
            payload = resp.read().decode("gbk")
        # Keep what follows the first "=", then peel quote/semicolon wrappers.
        quoted = payload.split("=", 1)[1]
        fields = quoted.strip().strip('"').strip(";").split("~")
        if len(fields) > 60 and fields[60]:
            shares = int(fields[60])
    except Exception:
        # Any failure falls back to the default, which is cached as well.
        shares = 1000
    _HK_LOT_CACHE[code] = shares
    return shares
def lot_cost(code, price):
    """Return the cost of one lot of a stock at *price*.

    Args:
        code: Stock code; converted with ``str()`` for the checks below.
        price: Price per share.

    Returns:
        * codes starting with ``688``: ``200 * price``;
        * 5-character codes: ``hk_lot_size(code) * price`` multiplied by the
          rate from ``hk_rate.hkd_to_cny()`` (``0.87`` if that module cannot
          be imported or called), truncated to an int;
        * anything else: ``100 * price``.
    """
    code_text = str(code)
    if code_text.startswith("688"):
        return 200 * price
    if len(code_text) != 5:
        return 100 * price

    shares_per_lot = hk_lot_size(code)
    try:
        # hk_rate lives outside this script's directory; make it importable.
        sys.path.insert(0, '/home/hmo/MoFin')
        from hk_rate import hkd_to_cny
        fx = hkd_to_cny()
    except Exception:
        fx = 0.87
    return int(shares_per_lot * price * fx)
def push_to_xmpp(text):
    """POST *text* as a chat message to the XMPP HTTP bridge.

    The stripped text is sent to ``XMPP_BRIDGE`` as a JSON object with the
    keys ``to`` (``XMPP_USER``), ``body`` and ``type`` (``"chat"``).

    Args:
        text: Message body.  Nothing is sent when it is empty or whitespace.

    Delivery is best-effort: any failure is reported on stderr and swallowed
    so that a bridge outage never fails the calling script.
    """
    body = text.strip()
    if not body:
        return
    try:
        payload = json.dumps({
            "to": XMPP_USER,
            "body": body,
            "type": "chat",
        }).encode("utf-8")
        req = Request(XMPP_BRIDGE, data=payload, headers={"Content-Type": "application/json"})
        # Close the response explicitly instead of leaving the socket to the
        # garbage collector; the reply body itself is not needed.
        with urlopen(req, timeout=5):
            pass
    except Exception as e:
        print(f"[XMPP推送失败] {e}", file=sys.stderr)
# Data files and modules that only main() and its helpers touch.
_DECISIONS_JSON = "/home/hmo/web-dashboard/data/decisions.json"
_MULTI_TF_CACHE_JSON = "/home/hmo/web-dashboard/data/multi_tf_cache.json"
_STRATEGY_TREE_PY = "/home/hmo/MoFin/strategy_tree.py"

# Pieces extracted from a detector line: "NAME(CODE)" after the "[WL_IN]"
# tags, the current price after "价", and the buy zone after "买入".
_WL_HEAD_RE = re.compile(r'\[WL_IN\](?:\s+\[\w+\])*\s+\[自选\]\s+(\S+)\((\d+)\)')
_WL_PRICE_RE = re.compile(r'价(\d+\.\d{2})')
_WL_ZONE_RE = re.compile(r'买入([\d.]+)~([\d.]+)')

# Timing signals containing any of these are not shown as actionable.
_SKIP_KEYWORDS = ("等企稳", "关注", "信号不充分")
# Timing signals that are exactly one of these (case-insensitive) are skipped.
_NEUTRAL_SIGNALS = ("neutral", "持有", "深套持有", "弱势持有")
# Labels searched for in a decision's "tech_snapshot" text.
_TECH_LEVEL_TAGS = ("强撑", "弱撑", "弱压", "强压")


def _read_json(path):
    """Return the parsed JSON document at *path*, or None if unreadable/invalid."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        return None


def _as_number(value, default=0):
    """Return *value* if it is an int/float, else *default*."""
    return value if isinstance(value, (int, float)) else default


def _load_decisions():
    """Return ``{code: decision}`` from decisions.json; malformed entries are skipped."""
    doc = _read_json(_DECISIONS_JSON)
    entries = doc.get("decisions") if isinstance(doc, dict) else None
    if not isinstance(entries, list):
        return {}
    return {
        entry["code"]: entry
        for entry in entries
        if isinstance(entry, dict) and isinstance(entry.get("code"), str)
    }


def _load_flagged_current():
    """Return ``{code: current-text}`` from the staleness report's "flagged" list."""
    report = _read_json(STALENESS_REPORT)
    flagged = report.get("flagged") if isinstance(report, dict) else None
    if not isinstance(flagged, list):
        return {}
    return {
        item["code"]: item.get("current", "")
        for item in flagged
        if isinstance(item, dict) and isinstance(item.get("code"), str)
    }


def _load_fundamentals():
    """Return ``{code: fundamentals-dict}`` from the multi-timeframe cache."""
    cache = _read_json(_MULTI_TF_CACHE_JSON)
    if not isinstance(cache, dict):
        return {}
    result = {}
    for code, entry in cache.items():
        fundamentals = entry.get("fundamentals") if isinstance(entry, dict) else None
        result[code] = fundamentals if isinstance(fundamentals, dict) else {}
    return result


def _portfolio_totals():
    """Return ``(total_assets, available_cash)`` used for position sizing.

    Cash comes from the staleness report's ``portfolio.cash``; total assets
    are the sum of ``shares * price`` over portfolio.json holdings plus that
    cash.  If the holdings cannot be summed, total assets fall back to five
    times the cash figure.
    """
    available_cash = 0
    report = _read_json(STALENESS_REPORT)
    portfolio = report.get("portfolio") if isinstance(report, dict) else None
    if isinstance(portfolio, dict):
        available_cash = portfolio.get("cash", 0) or 0
    try:
        holdings = _read_json(PORTFOLIO_PATH).get("holdings", [])
        market_value = sum(h.get("shares", 0) * h.get("price", 0) for h in holdings)
        total_assets = market_value + available_cash
    except (AttributeError, TypeError):
        total_assets = available_cash * 5  # fallback
    return total_assets, available_cash


def _load_strategy_tree():
    """Load strategy_tree.py and detect the scenario.

    Returns ``(module, scenario_id, scenario_label)``, or ``(None, "", "")``
    when the module cannot be loaded or scenario detection fails.
    """
    try:
        import importlib.util
        spec = importlib.util.spec_from_file_location("st_module", _STRATEGY_TREE_PY)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        scenario = module.detect_scenario()
        return module, scenario.get("id", ""), scenario.get("label", "")
    except Exception:
        # Executing an external module can fail in arbitrary ways; branch
        # evaluation is optional, so degrade to "no strategy tree".
        return None, "", ""


def _parse_watchlist_line(line):
    """Return ``(name, code, price, buy_low, buy_high)`` or None if *line* does not parse."""
    head = _WL_HEAD_RE.match(line)
    price_m = _WL_PRICE_RE.search(line)
    zone_m = _WL_ZONE_RE.search(line)
    if not (head and price_m and zone_m):
        return None
    try:
        price = float(price_m.group(1))
        buy_low, buy_high = float(zone_m.group(1)), float(zone_m.group(2))
    except ValueError:
        # "[\d.]+" also matches things like "1.2.3"; treat those as unparsable.
        return None
    return head.group(1), head.group(2), price, buy_low, buy_high


def _has_clear_signal(sig):
    """Tell whether a timing signal is specific enough to be shown."""
    if not sig or not isinstance(sig, str):
        return False
    if any(kw in sig for kw in _SKIP_KEYWORDS):
        return False
    normalized = sig.strip().lower()
    return bool(normalized) and normalized not in _NEUTRAL_SIGNALS


def _first_with_prefix(items, prefix):
    """Return the first string in *items* starting with *prefix*, or ""."""
    return next((item for item in items if item.startswith(prefix)), "")


def _shares_per_lot(code):
    """Return shares per lot, using the same market rules as lot_cost()."""
    if code.startswith("688"):
        return 200
    if len(code) == 5:
        # Already cached by the earlier lot_cost() call for this code.
        return hk_lot_size(code)
    return 100


def _calc_position(unit_cost, rr, market_factor, cat, code,
                   total_assets, available_cash, n):
    """Size a position for one stock.

    Args:
        unit_cost: Cost of one lot.
        rr: Risk/reward ratio of the decision.
        market_factor: The "大盘…" signal factor text ("" if none).
        cat: Stock category text.
        code: Stock code, used to convert lots to shares.
        total_assets: Holdings value plus cash.
        available_cash: Cash available for buying.
        n: Number of actionable stocks sharing the cash.

    Returns:
        ``(theo_pct, pct_actual, details, lots, position_cost)`` where
        ``theo_pct`` is the theoretical share of total assets in percent,
        ``pct_actual`` the share actually reached by ``lots`` lots, and
        ``details`` the text shown to the user.
    """
    # Theoretical weight from risk/reward alone, not limited by cash.
    if rr >= 5:
        theo_pct = 25
    elif rr >= 3:
        theo_pct = 18
    elif rr >= 2:
        theo_pct = 12
    else:
        theo_pct = 8
    if "偏弱" in market_factor:
        theo_pct = int(theo_pct * 0.8)
    elif "偏强" in market_factor:
        theo_pct = int(theo_pct * 1.15)
    if cat in ("蓝筹", "白马"):
        theo_pct = int(theo_pct * 1.2)
    elif cat in ("题材", "短线"):
        theo_pct = int(theo_pct * 0.6)
    elif cat in ("高波动", "成长"):
        theo_pct = int(theo_pct * 0.85)
    theo_pct = max(5, min(30, theo_pct))

    # Budget: theoretical weight, capped by 1.5x an equal split of the cash
    # across the n actionable stocks, and by the cash itself.
    ideal_budget = total_assets * theo_pct / 100
    max_use_cash = (available_cash / max(n, 1)) * 1.5
    budget = min(ideal_budget, max_use_cash, available_cash)

    lots = int(budget / unit_cost) if unit_cost > 0 else 0
    # A budget above half a lot is rounded up to one lot, but only when the
    # cash really covers that lot; otherwise the entry must stay "unaffordable".
    if (lots == 0 and unit_cost > 0 and budget > unit_cost * 0.5
            and unit_cost <= available_cash):
        lots = 1

    position_cost = lots * unit_cost
    if lots == 0 or total_assets <= 0:
        pct_actual = 0
    else:
        pct_actual = round(position_cost / total_assets * 100)

    if lots == 0:
        details = f"预算不足1手({budget:,.0f}/{unit_cost:,.0f}元)"
    else:
        shares = lots * _shares_per_lot(code)
        details = f"{lots}手({shares}股,{position_cost:,.0f}元)"
    return theo_pct, pct_actual, details, lots, position_cost


def _branch_line(st, scenario_id, scenario_label, code, price, decision):
    """Return the strategy-tree branch text for one stock, or "" on failure."""
    try:
        results = st.evaluate_branches(
            code, scenario_id, price,
            decision.get("shares", 0), decision.get("cost", 0),
        )
        applicable = [branch for branch in results if branch.get("applicable")]
        if not applicable:
            return f"【{scenario_label}→持有】无匹配分支"
        # Lowest "priority" value wins.
        best = min(applicable, key=lambda branch: branch.get("priority", 999))
        action = best.get("action_type", "hold")
        rationale = best.get("rationale", "")
        return f"【{scenario_label}→{action}】{rationale}"
    except Exception:
        # Branch evaluation is optional decoration; never fail the push.
        return ""


def _format_entry(stock, decision, fundamentals, totals, n, tree):
    """Render the multi-line report entry for one actionable stock."""
    name, code, price, buy_low, buy_high, lot = stock
    total_assets, available_cash = totals
    st, scenario_id, scenario_label = tree

    rr = _as_number(decision.get("rr_ratio"))
    sl = decision.get("stop_loss", 0)
    tp = decision.get("take_profit", 0)
    sig = decision.get("timing_signal", "")
    sector = decision.get("sector_context", "")
    cat = decision.get("stock_category") or ""
    tech = decision.get("tech_snapshot")
    if not isinstance(tech, str):
        tech = ""
    factors = [f for f in (decision.get("signal_factors") or []) if isinstance(f, str)]

    # Support/resistance levels quoted in the technical snapshot.
    levels = {}
    for tag in _TECH_LEVEL_TAGS:
        found = re.search(rf'{tag}:([\d.]+)', tech)
        levels[tag] = found.group(1) if found else "-"

    pe = fundamentals.get("pe", 0)
    pe_str = f"PE{pe:.0f}" if isinstance(pe, (int, float)) and pe else ""

    market_factor = _first_with_prefix(factors, "大盘")
    sector_factor = _first_with_prefix(factors, "行业")
    value_factor = (
        _first_with_prefix(factors, "高估值")
        or _first_with_prefix(factors, "低估值")
        or _first_with_prefix(factors, "蓝筹")
        or pe_str
    )
    news_factor = _first_with_prefix(factors, "消息")

    parts = []
    if market_factor:
        parts.append(f"大盘{market_factor.replace('大盘', '')}")
    if sector_factor:
        parts.append(f"行业{sector_factor.replace('行业', '')}")
    if value_factor:
        parts.append(value_factor)
    if news_factor:
        parts.append(news_factor)
    if not parts:
        parts.append(sector or cat or "")
    analysis = " | ".join(p for p in parts if p)

    theo_pct, actual_pct, details, lots, _ = _calc_position(
        lot, rr, market_factor, cat, code, total_assets, available_cash, n
    )

    pfx = "" if len(code) == 6 else "HK$"
    action_tag = "⚠️" if lots == 0 else "🛒"
    entry = (
        f" {action_tag} {name}({code}) {pfx}{price:.2f} 买区{buy_low}~{buy_high} | "
        f"1手{lot:,.0f}元 RR={rr:.1f} 止损{sl} 止盈{tp}\n"
        f" {analysis}\n"
        f" 技术 强撑{levels['强撑']} 弱撑{levels['弱撑']} "
        f"弱压{levels['弱压']} 强压{levels['强压']} | 信号{sig}\n"
        f" 仓位:理论{theo_pct}%×总资产 | 建议{actual_pct}% {details}"
    )
    if st and scenario_id:
        branch = _branch_line(st, scenario_id, scenario_label, code, price, decision)
        if branch:
            entry += f"\n{branch}"
    return entry


def main():
    """Push buy-zone reminders for watchlist stocks and trigger reassessments.

    Steps:
        1. Run ``DETECTOR`` and keep its "[WL_IN] ... [自选]" output lines.
        2. Lines tagged "[STRATEGY_STALE]" or whose signal is not actionable
           are sent to ``trigger_regen_sync`` (and left out of the report);
           decisions.json is then re-read.
        3. Remaining stocks are sorted (6-character codes first, then by
           descending risk/reward) and filtered to clear timing signals.
        4. For each one a position size is computed and a report entry built.
        5. The report is printed and pushed via ``push_to_xmpp``.

    Nothing is printed or pushed when no stock ends up actionable.

    Returns:
        ``0`` normally, ``1`` when the detector could not be run.
    """
    try:
        detector = subprocess.run(
            ["python3", DETECTOR], capture_output=True, text=True, timeout=60
        )
    except (subprocess.TimeoutExpired, OSError) as exc:
        print(f"[DETECTOR] 运行失败: {exc}", file=sys.stderr)
        return 1
    if detector.returncode != 0 and detector.stderr:
        print(f"[stderr] {detector.stderr.strip()}", file=sys.stderr)

    wl_lines = [
        line for line in detector.stdout.split("\n")
        if "[WL_IN]" in line and "[自选]" in line
    ]
    if not wl_lines:
        return 0

    code_cur = _load_flagged_current()
    code_data = _load_decisions()

    stocks = []
    stale_codes = []
    for line in wl_lines:
        parsed = _parse_watchlist_line(line)
        if parsed is None:
            continue
        name, code, price, buy_low, buy_high = parsed
        timing = code_data.get(code, {}).get("timing_signal", "")
        if "[STRATEGY_STALE]" in line or not is_actionable(code_cur.get(code, ""), timing):
            stale_codes.append(code)
            continue
        stocks.append((name, code, price, buy_low, buy_high, lot_cost(code, price)))
    if not stocks and not stale_codes:
        return 0

    lines = []
    macro_line = load_macro_line()
    if macro_line:
        lines.append(f"【市场背景】{macro_line}")

    # Reassessment is an internal step: it is executed but not reported.
    if stale_codes:
        trigger_regen_sync(stale_codes)
        # The reassessment may have rewritten decisions.json.
        code_data = _load_decisions()

    stocks.sort(key=lambda s: (
        0 if len(s[1]) == 6 else 1,
        -_as_number(code_data.get(s[1], {}).get("rr_ratio")),
    ))
    actionable = [
        s for s in stocks
        if _has_clear_signal(code_data.get(s[1], {}).get("timing_signal", ""))
    ]
    if not actionable:
        return 0  # no clear signal: stay silent, push nothing

    fund_cache = _load_fundamentals()
    n = len(actionable)
    totals = _portfolio_totals()
    total_assets, available_cash = totals
    tree = _load_strategy_tree()

    lines.append(f"【💡 操作建议】(当前{n}只自选可操作 | 总资产{total_assets:,.0f}元 现金{available_cash:,.0f}元)")
    for stock in actionable:
        code = stock[1]
        lines.append(_format_entry(
            stock, code_data.get(code, {}), fund_cache.get(code, {}), totals, n, tree
        ))

    now = datetime.now().strftime("%H:%M")
    lines.insert(0, f"【知微】自选买入提醒 {now} | 总资产{total_assets:,.0f}")
    out = "\n".join(lines)
    print(out)
    push_to_xmpp(out)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())