Files
MoFin/scripts/stale_push_wlin.py
T
知微 f6ee15489c stale_push_wlin: 重评段删除,只推有清晰操作信号的个股
改动:
- 移除「策略需重评」报告段 — 内部流程,Dad不需要看到
- 移除pick/watch拆分的旧逻辑 — 统一为actionable过滤
- 跳过信号含等企稳关注信号不充分neutral持有等无用描述的个股
- 无操作信号 → 静默不推
- 有操作信号 → 标准格式(含行业context+技术位+止损止盈+RR+1手成本)

Dad要求:要看到的是可以直接操作的建议,不是内部流程记录
2026-06-24 09:46:52 +08:00

304 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
stale_push_wlin.py — 按5步逻辑推送自选股买入区提醒 + 自动触发重评
5步逻辑:
1. 筛选 is_watchlist=true 且价在买入区
2. RR<1.5/无止盈位/非买入signal → 标记 STRATEGY_STALE → 触发自动重评
3. 可推的:计算每手买入金额和现金占比
4. 发现 STRATEGY_STALE → 后台跑 per_stock_reassess.py 自动重评
no_agent模式:有推送→输出;无→静默
搭配 cron: no_agent=True, 交易日每30分跑一次
"""
import subprocess
import sys
import re
import json
import os
import threading
from datetime import datetime
try:
from urllib.request import Request, urlopen
except ImportError:
from urllib2 import Request, urlopen
XMPP_BRIDGE = "http://127.0.0.1:5805/"
XMPP_USER = "hmo@yoin.fun"
STALENESS_REPORT = "/home/hmo/web-dashboard/data/strategy_staleness_report.json"
DETECTOR = "/home/hmo/.hermes/profiles/position-analyst/scripts/stale_detector.py"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
REGEN_SCRIPT = "/home/hmo/.hermes/profiles/position-analyst/scripts/per_stock_reassess.py"
REGEN_LOCK = "/tmp/.stale_push_wlin_regen.lock"
MACRO_CTX = "/home/hmo/web-dashboard/data/macro_context.json"
MARKET_JSON = "/home/hmo/web-dashboard/data/market.json"
NON_BUY_SIGNALS = ["观望", "弱势持有", "深套持有"]
def load_macro_line():
"""加载大盘和市场的简要描述"""
parts = []
try:
with open(MACRO_CTX) as f:
m = json.load(f).get("structure", {})
overall = m.get("overall", "neutral")
desc = m.get("description", "")
if "bearish" in overall:
parts.append("大盘偏弱")
elif overall == "bullish":
parts.append("大盘偏强")
elif desc:
parts.append(f"大盘{desc}")
except Exception:
pass
try:
with open(MARKET_JSON) as f:
mk = json.load(f)
mood = mk.get("mood", "")
if mood:
parts.append(f"市场{mood}")
except Exception:
pass
return " | ".join(parts) if parts else ""
def is_actionable(cur, timing_signal=""):
"""检查信号是否可操作。空文本/含非买入关键词 → 不可操作"""
if not cur and not timing_signal:
return False # 空文本默认不安全
for kw in NON_BUY_SIGNALS:
if cur and kw.lower() in cur.lower():
return False
if timing_signal and kw.lower() in timing_signal.lower():
return False
return True
def trigger_regen_sync(stock_codes=None):
"""同步执行指定个股的重评(等重评完再发报告)"""
if not stock_codes:
return
try:
cmd = ["python3", REGEN_SCRIPT] + stock_codes
subprocess.run(cmd, capture_output=True, text=True, timeout=60)
except subprocess.TimeoutExpired:
print("[REGEN] 重评超时(60s", file=sys.stderr)
except Exception as e:
print(f"[REGEN] 重评失败: {e}", file=sys.stderr)
def load_cash():
"""从 portfolio.json 实时读现金,不硬编码"""
try:
with open(PORTFOLIO_PATH) as f:
data = json.load(f)
if isinstance(data, dict):
return data.get("cash", 0)
if isinstance(data, list) and len(data) > 1 and isinstance(data[1], dict):
return data[1].get("cash", 0)
return 0
except Exception:
return 0
_HK_LOT_CACHE = {}
def hk_lot_size(code):
"""从腾讯行情API获取港股实际每手股数(字段[60]),带缓存"""
if code in _HK_LOT_CACHE:
return _HK_LOT_CACHE[code]
try:
url = f"http://qt.gtimg.cn/q=hk{code}"
req = Request(url, headers={"User-Agent": "curl/7.81"})
with urlopen(req, timeout=5) as r:
text = r.read().decode("gbk")
raw = text.split("=", 1)[1].strip().strip('"').strip(";")
fld = raw.split("~")
lot = int(fld[60]) if len(fld) > 60 and fld[60] else 1000
_HK_LOT_CACHE[code] = lot
return lot
except Exception:
_HK_LOT_CACHE[code] = 1000
return 1000
def lot_cost(code, price):
if str(code).startswith("688"):
return 200 * price
elif len(str(code)) == 5:
lot = hk_lot_size(code)
try:
sys.path.insert(0, '/home/hmo/MoFin')
from hk_rate import hkd_to_cny
rate = hkd_to_cny()
except Exception:
rate = 0.87
return int(lot * price * rate)
else:
return 100 * price
def push_to_xmpp(text):
"""通过知微 HTTP bridge 推送到老爸私信"""
if not text.strip():
return
try:
payload = json.dumps({
"to": XMPP_USER,
"body": text.strip(),
"type": "chat",
}).encode("utf-8")
req = Request(XMPP_BRIDGE, data=payload, headers={"Content-Type": "application/json"})
urlopen(req, timeout=5)
except Exception as e:
print(f"[XMPP推送失败] {e}", file=sys.stderr)
def main():
r = subprocess.run(
["python3", DETECTOR], capture_output=True, text=True, timeout=60
)
if r.returncode != 0 and r.stderr:
print(f"[stderr] {r.stderr.strip()}", file=sys.stderr)
wl_lines = [
l for l in r.stdout.split("\n")
if "[WL_IN]" in l and "[自选]" in l
]
if not wl_lines:
return 0
# 读 stale report
try:
with open(STALENESS_REPORT) as f:
report = json.load(f)
except Exception:
report = {"flagged": []}
code_cur = {i["code"]: i.get("current", "") for i in report.get("flagged", [])}
# 读 decisions.json 获取完整策略数据
code_data = {}
try:
with open("/home/hmo/web-dashboard/data/decisions.json") as f:
dec = json.load(f)
for e in dec.get("decisions", []):
code_data[e["code"]] = e
except Exception:
pass
cash = load_cash()
stocks = []
stale_list = []
for l in wl_lines:
m = re.match(r'\[WL_IN\](?:\s+\[\w+\])*\s+\[自选\]\s+(\S+)\((\d+)\)', l)
if not m:
continue
name, code = m.group(1), m.group(2)
pm = re.search(r'价(\d+\.\d{2})', l)
if not pm:
continue
price = float(pm.group(1))
zm = re.search(r'买入([\d.]+)~([\d.]+)', l)
if not zm:
continue
buy_low, buy_high = float(zm.group(1)), float(zm.group(2))
is_stale = "[STRATEGY_STALE]" in l
cur = code_cur.get(code, "")
if not is_actionable(cur, code_data.get(code, {}).get("timing_signal", "")) or is_stale:
stale_list.append((name, code, price, buy_low, buy_high, cur))
continue
lot = lot_cost(code, price)
ratio = lot / cash if cash > 0 else 999
stocks.append((name, code, price, buy_low, buy_high, lot, ratio))
if not stocks and not stale_list:
return 0
now = datetime.now().strftime("%H:%M")
lines = []
# 市场背景
macro_line = load_macro_line()
if macro_line:
lines.append(f"【市场背景】{macro_line}")
# [重评] 内部流程 — 不在报告中展示,只执行重评
if stale_list:
stale_codes = [s[1] for s in stale_list]
trigger_regen_sync(stale_codes)
# 重评完成,re-read decisions.json
code_data = {}
try:
with open("/home/hmo/web-dashboard/data/decisions.json") as f:
dec = json.load(f)
for e in dec.get("decisions", []):
code_data[e["code"]] = e
except Exception:
pass
stocks.sort(key=lambda s: (
0 if len(str(s[1])) == 6 else 1,
-code_data.get(s[1], {}).get("rr_ratio", 0)
))
# 只展示有清晰操作信号的个股:不含"等企稳""关注""信号不充分""neutral"及纯持有信号
SKIP_KEYWORDS = ["等企稳", "关注", "信号不充分"]
BUY_KEYWORDS = ["买入", "加仓"]
actionable = []
for s in stocks:
sig = code_data.get(s[1], {}).get("timing_signal", "")
if not sig:
continue
# 跳过非操作信号
if any(kw in sig for kw in SKIP_KEYWORDS):
continue
# 中性信号跳过
stripped = sig.strip().lower()
if not stripped or stripped in ("", "neutral", "持有", "深套持有", "弱势持有"):
continue
actionable.append(s)
if not actionable:
return 0 # 无操作信号 → 静默,不推
# 标准格式:每个可操作标的
lines.append(f"【💡 操作建议】(当前{len(actionable)}只自选可操作)")
for s in actionable:
name, code, price, buy_low, buy_high, lot, ratio = s
d = code_data.get(code, {})
sl = d.get("stop_loss", 0)
tp = d.get("take_profit", 0)
rr = d.get("rr_ratio", 0)
sig = d.get("timing_signal", "")
sector = d.get("sector_context", "")
tech = d.get("tech_snapshot", "")
# 提取技术位
ss = {"强撑":"-", "弱撑":"-", "弱压":"-", "强压":"-"}
for tag in ss:
m = re.search(rf'{tag}:([\d.]+)', tech)
if m:
ss[tag] = m.group(1)
pfx = "" if len(code) == 6 else "HK$"
lines.append(
f" {name}({code}) {pfx}{price:.2f} 买区{buy_low}~{buy_high} | "
f"1手{lot:,.0f}元 RR={rr:.1f}{sl}{tp}\n"
f" {sector} | {ss['强撑']}{ss['弱撑']}{ss['弱压']}{ss['强压']} | {sig}"
)
lines.insert(0, f"【知微】自选买入提醒 {now} | 现金{cash:,.0f}")
out = "\n".join(lines)
print(out)
push_to_xmpp(out)
return 1
if __name__ == "__main__":
sys.exit(main())