#!/usr/bin/env python3 """monitor_300308.py — 紧盯中际旭创(300308)站稳1330条件 每2分钟检查一次,条件满足时发XMPP信号,然后停用自身。 站稳1330条件(三条件同时满足): 1. 现价 >= 1332(1330留2元余量) 2. 连续2次检查都 >= 1332(排除毛刺) 3. 13:00之后(下午开盘后) 发一次信号后就停,不再重复。 """ import json, os, subprocess, sys, urllib.request from datetime import datetime CODE = "300308" NAME = "中际旭创" PRICE_THRESHOLD = 1332.0 BUY_QTY = 100 # 1手 STOP_LOSS = 1293.88 TAKE_PROFIT = 1456.53 STATE_FILE = os.path.expanduser("~/.hermes/monitor_300308_state.json") HERMES_SEND = ["hermes", "send", "--to", "xmpp:hmo@yoin.fun"] def load_state(): try: with open(STATE_FILE) as f: return json.load(f) except: return {"alerted": False, "consecutive_ok": 0, "last_price": 0} def save_state(s): os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True) with open(STATE_FILE, "w") as f: json.dump(s, f, indent=2) def fetch_price(): """从腾讯API获取实时价""" url = f"http://qt.gtimg.cn/q=sz{CODE}" req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) try: resp = urllib.request.urlopen(req, timeout=10) text = resp.read().decode("gbk") fields = text.split("~") price = float(fields[3]) if fields[3] else 0 change_pct = fields[32] if len(fields) > 32 else "0" return price, change_pct except Exception as e: return 0, "0" def send_alert(price, change_pct): msg = ( f"【信号】{NAME}({CODE}) 站稳1330,可买入!\n" f"现价{price} ({change_pct}%)\n" f"信号:下午三条件满足(现价>=1332+连续确认+午后时段)\n" f"操作:买入1手(100股)约{price*100:.0f}元\n" f"止损{STOP_LOSS}({(price-STOP_LOSS)/price*100:.1f}%) 止盈{TAKE_PROFIT}({(TAKE_PROFIT-price)/price*100:.1f}%)\n" f"RR={(TAKE_PROFIT-price)/(price-STOP_LOSS):.1f}\n" f"现金150,625元,1手占{price*100/150625*100:.1f}%" ) try: subprocess.run(HERMES_SEND + [msg], capture_output=True, timeout=15) return True except: return False def main(): now = datetime.now() state = load_state() # 已发过信号 → 静默 if state.get("alerted"): print("[SILENT] 已发过买入信号") return # 非交易时段 → 静默(但允许14:30前) if now.weekday() >= 5 or now.hour < 13 or now.hour >= 14: print(f"[SILENT] 非监控时段(13:00-14:00)") return # 取价 price, change_pct = fetch_price() if price == 0: print(f"[SILENT] 取价失败") return # 条件1:现价 >= 1332 if price >= PRICE_THRESHOLD: state["consecutive_ok"] = state.get("consecutive_ok", 0) + 1 else: state["consecutive_ok"] = 0 print(f"[SILENT] 现价{price}低于阈值{PRICE_THRESHOLD}") save_state(state) return # 条件2:连续2次检查都满足(约4分钟确认) if state["consecutive_ok"] < 2: print(f"[SILENT] 连续确认中 {state['consecutive_ok']}/2") save_state(state) return # 条件3:13:00之后(已隐含在上面的时段检查中) # 全部满足 → 发信号 success = send_alert(price, change_pct) if success: state["alerted"] = True state["alerted_at"] = now.isoformat() state["alert_price"] = price save_state(state) print(f"✅ 信号已推送: {NAME} {price} 可买入") else: print(f"⚠️ 信号生成失败,下次重试") if __name__ == "__main__": main()