#!/usr/bin/env python3 """price_monitor.py — 高频价格监控脚本(批量版) 规则:进入区间报一次,离开区间报一次,中间不重复。 每次运行时一次性刷新所有持仓+自选股的实时价。 """ import json import urllib.request import os import sys import time from datetime import datetime DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json" PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json" WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json" BREACH_PATH = "/home/hmo/.hermes/zone_breach.json" STATE_PATH = os.path.expanduser("~/.hermes/price_trigger_state.json") EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json" # 策略重评依赖(技术面驱动,非机械百分比) sys.path.insert(0, "/home/hmo/web-dashboard") try: from strategy_lifecycle import reassess_strategy HAS_REASSESS = True except ImportError: HAS_REASSESS = False UA = "Mozilla/5.0" # ── 批量拉取价格 ────────────────────────────────────────────────────────── def fetch_all_prices(codes): """腾讯批量行情API:一次请求拉取所有股票(A股+港股) A股:sh600110 / sz000001 港股:hk00700 返回 {code: (price, change, change_pct)} """ if not codes: return {} # 构建批量查询串 symbols = [] code_map = {} # symbol -> original_code for code in codes: code_s = str(code).strip() if len(code_s) == 6: # A股:沪市以5/6/9开头,深市以0/3开头 if code_s.startswith(('5', '6', '9')): sym = f"sh{code_s}" else: sym = f"sz{code_s}" else: sym = f"hk{code_s}" symbols.append(sym) code_map[sym] = code_s url = f"http://qt.gtimg.cn/q={','.join(symbols)}" try: req = urllib.request.Request(url, headers={"User-Agent": UA}) with urllib.request.urlopen(req, timeout=10) as r: text = r.read().decode("gbk") except Exception as e: print(f"⚠️ 批量拉取失败: {e}", file=sys.stderr) return {} results = {} for line in text.strip().split("\n"): line = line.strip() if not line or "=" not in line: continue try: # 格式: v_sh600110="1~诺德股份~600110~11.84~11.90~..." raw_value = line.split("=", 1)[1].strip().strip('"').strip(";") fields = raw_value.split("~") if len(fields) < 6: continue sym = line.split("=", 1)[0].strip().lstrip("v_") orig_code = code_map.get(sym) if not orig_code: continue price = float(fields[3]) if fields[3] else 0 prev_close = float(fields[4]) if fields[4] else 0 change = price - prev_close if prev_close > 0 else 0 change_pct = fields[32] if len(fields) > 32 and fields[32] else "0" results[orig_code] = (price, change, change_pct) except (ValueError, IndexError): continue return results def refresh_data_prices(): """一次性刷新portfolio.json和watchlist.json的所有实时价""" all_codes = set() # 收集所有需要拉取的代码 try: pf = json.load(open(PORTFOLIO_PATH)) for s in pf.get('holdings', []): all_codes.add(s['code']) except: pf = {"holdings": []} try: wl = json.load(open(WATCHLIST_PATH)) for s in wl.get('stocks', []): all_codes.add(s['code']) except: wl = {"stocks": []} if not all_codes: return 0 # 一次性批量拉取 prices = fetch_all_prices(list(all_codes)) updated = 0 # 更新portfolio(只在价格变化时写入,避免触发文件变更通知) changed = False for s in pf.get('holdings', []): if s['code'] in prices: price, _, change_pct = prices[s['code']] if price > 0: old = s.get('price', 0) if abs(old - price) > 0.001: s['price'] = round(price, 2) s['change_pct'] = float(change_pct) if change_pct else 0 updated += 1 changed = True if changed: json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2) # 更新watchlist(只在价格变化时写入) changed = False for s in wl.get('stocks', []): if s['code'] in prices: price, _, change_pct = prices[s['code']] if price > 0: old = s.get('price', 0) if abs(old - price) > 0.001: s['price'] = round(price, 2) s['change_pct'] = float(change_pct) if change_pct else 0 updated += 1 changed = True if changed: wl['updated_at'] = datetime.now().isoformat() json.dump(wl, open(WATCHLIST_PATH, 'w'), ensure_ascii=False, indent=2) return updated # ── 区间偏离检测 ────────────────────────────────────────────────────────── def load_state(): try: with open(STATE_PATH) as f: return json.load(f) except: return {} def save_state(state): os.makedirs(os.path.dirname(STATE_PATH), exist_ok=True) with open(STATE_PATH, 'w') as f: json.dump(state, f, ensure_ascii=False, indent=2) def load_breaches(): try: with open(BREACH_PATH) as f: return json.load(f) except: return {} def save_breaches(data): os.makedirs(os.path.dirname(BREACH_PATH), exist_ok=True) with open(BREACH_PATH, 'w') as f: json.dump(data, f, ensure_ascii=False, indent=2) def load_events(): try: with open(EVENTS_PATH) as f: return json.load(f) except: return {"events": []} def save_events(events): os.makedirs(os.path.dirname(EVENTS_PATH), exist_ok=True) with open(EVENTS_PATH, 'w') as f: json.dump(events, f, ensure_ascii=False, indent=2) def record_event(code, name, event_type, price, trigger_value, event_label=""): """记录一次价格触发事件到 price_events.json + SQLite""" events = load_events() now = datetime.now().isoformat() events["events"].append({ "code": code, "name": name, "event_type": event_type, # entry_zone, stop_loss, take_profit, exit_zone "price": round(price, 2), "trigger_value": trigger_value, "event_label": event_label, "timestamp": now, "date": datetime.now().strftime("%Y-%m-%d"), }) # 保留最近10000条 events["events"] = events["events"][-10000:] save_events(events) # ── SQLite 双写 ── try: from mofin_db import get_conn, init_all_tables, write_price_event conn = get_conn() init_all_tables(conn) write_price_event(conn, code, name, event_type, price, trigger_value, event_label) conn.close() except Exception: pass # SQLite 写入失败不影响主流程 def get_trigger_zones(trigger): """返回该trigger所有可监控的区间列表,跳过已执行的batch""" zones = [] for key, label in [ ("entry_zone", "加仓区间"), ("batch1_price", "试仓区间"), ("batch2_price", "加仓区间"), ("take_profit_zone", "止盈区间"), ("watch_low", "关注区间"), ("watch_high", "减仓区间"), ("watch_break", "止损区间") ]: status_key = key.replace("_price", "_status") if status_key in trigger and trigger[status_key] == "executed": continue val = trigger.get(key, "") if val and "~" in val: try: parts = val.split("~") lo, hi = float(parts[0]), float(parts[1]) zones.append((key, label, lo, hi)) except: pass sl = trigger.get("stop_loss", "") if sl: try: sl_price = float(sl) if isinstance(sl, (int, float)) else float(sl) zones.append(("stop_loss", "止损", 0, sl_price)) except: pass return zones def run_once(round_label=""): """执行一轮完整的监控流程""" label = f" [{round_label}]" if round_label else "" start = time.time() # === 第一步:一次性刷新所有价格 === refreshed = refresh_data_prices() # === 第二步:检查触发条件 === try: with open(DECISIONS_PATH) as f: dec = json.load(f) except: print(f"❌{label} 无法读取decisions.json", file=sys.stderr) return active = [d for d in dec.get("decisions", []) if d.get("status") == "active"] state = load_state() outputs = [] state_updated = False # 收集所有需要检查的代码 check_codes = set() for d in active: trig = d.get("trigger", {}) if trig: check_codes.add(d["code"]) # 批量拉取这些股票的价格 prices = fetch_all_prices(list(check_codes)) for d in active: code = d["code"] trig = d.get("trigger", {}) if not trig: continue zones = get_trigger_zones(trig) if not zones: continue price_info = prices.get(code) if not price_info: continue price, _ = price_info if price == 0: continue name = d.get("name", code) if code not in state: state[code] = {} for key, label, lo, hi in zones: in_zone = lo <= price <= hi prev_in_zone = state[code].get(key, None) if in_zone and prev_in_zone != True: if key == "stop_loss": outputs.append(f"⚠️ {name}({code}) {price} → 跌破止损{hi}!") record_event(code, name, "stop_loss", price, str(hi)) else: extra = "" if "_price" in key: batch_shares = trig.get(key.replace("_price", "_shares"), "") action = trig.get(key.replace("_price", "_action"), "") if batch_shares: extra = f" {action}{batch_shares}股" if action else f" {batch_shares}股" elif key in ("take_profit_zone",): act = trig.get("take_profit_action", "") if act: extra = f"({act})" outputs.append(f"⚡ {name}({code}) {price} → 进入{label}{lo}~{hi}{extra}") record_event(code, name, "entry_zone", price, f"{lo}~{hi}", label) state[code][key] = True state_updated = True elif not in_zone and prev_in_zone == True: if key != "stop_loss": outputs.append(f"📌 {name}({code}) {price} → 离开{label}{lo}~{hi}") state[code][key] = False state_updated = True # === 第三步:买入区偏离检测 + 自动重评 === reassesed_codes = [] for d in active: code = d["code"] name = d.get("name", code) price_info = prices.get(code) if not price_info: continue price, _ = price_info if price == 0: continue # 从 decisions.json 中读取 analysis 的买入区 entry_low = d.get("entry_low", 0) entry_high = d.get("entry_high", 0) if not entry_low or not entry_high: continue in_buy_zone = entry_low <= price <= entry_high prev_in_buy_zone = state.get(code, {}).get("__buy_zone", None) # 状态变化时才触发 if in_buy_zone and prev_in_buy_zone == False: # 重新进入买入区 → 重评确认区间是否仍然有效 outputs.append(f"🔄 {name}({code}) {price} → 重新进入买入区{entry_low}~{entry_high},触发技术面重评") do_reassess = True elif not in_buy_zone and prev_in_buy_zone == True: # 离开买入区 → 立即重评,更新止损/止盈/区间 outputs.append(f"🔄 {name}({code}) {price} → 离开买入区{entry_low}~{entry_high},立即技术面重评") do_reassess = True else: do_reassess = False if do_reassess and HAS_REASSESS: try: cost = d.get("cost", 0) or 0 shares = d.get("shares", 0) or 0 profit_pct = (price - cost) / cost * 100 if cost else 0 is_deep_loss = profit_pct < -20 sentiment = "neutral" if d.get("tech_snapshot"): if "bearish" in d["tech_snapshot"]: sentiment = "bearish" elif "bullish" in d["tech_snapshot"]: sentiment = "bullish" # 调用技术面驱动重评(非机械百分比) result = reassess_strategy( code, name, price, cost, shares, current_action=d.get("action", ""), volume_signal="中性", sentiment=sentiment, ) outputs.append(f" 📊 新策略: 损{result['stop_loss']} 盈{result['take_profit']} 区{result['entry_low']}~{result['entry_high']} RR={result['rr_ratio']}") reassesed_codes.append(code) except Exception as e: outputs.append(f" ⚠️ 重评失败: {e}") # 更新买入区状态 if "__buy_zone" not in state.get(code, {}): if code not in state: state[code] = {} state[code]["__buy_zone"] = in_buy_zone state_updated = True # 如果有重评过的股票,更新 decisions.json if reassesed_codes and HAS_REASSESS: try: # 重新 regenerate_all 只针对受影响的股票效率太低 # 直接全量重评(regenerate_all 内部会批量拉价格、做技术分析) from strategy_lifecycle import regenerate_all r = regenerate_all(stdout=False) outputs.append(f" ✅ 策略已全量重评: {r.get('ok',0)}/{r.get('total',0)}成功") outputs.append(f" 📌 触发股票: {', '.join(reassesed_codes)}") except Exception as e: outputs.append(f" ⚠️ 全量重评失败: {e}") # === 第四步:输出 === now_str = datetime.now().strftime("%H:%M:%S") elapsed = time.time() - start if outputs: print(f"\n🔔 {now_str}{label}") for o in outputs: print(o) print(f"\n{json.dumps({'type':'价格监控','time':now_str,'triggers':outputs}, ensure_ascii=False)}") else: # 无触发时 SILENT(中继不推送) print(f"[SILENT]{label} 价格正常 | {refreshed}只已刷新 | {elapsed:.1f}s") if state_updated: save_state(state) # 输出耗时 print(f"⏱{label} {elapsed:.1f}s", flush=True) def main(): """每cron触发跑一轮""" run_once() if __name__ == "__main__": main()