#!/usr/bin/env python3
"""price_monitor.py — high-frequency price monitor (batch version).

Rule: report once when a price enters a zone, once when it leaves it, and
never repeat in between.  Each run refreshes the live price of every holding
and watchlist stock in one batch request.
"""
import json
import urllib.request
import os
import sys
import time
from datetime import datetime

DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json"
BREACH_PATH = "/home/hmo/.hermes/zone_breach.json"
STATE_PATH = os.path.expanduser("~/.hermes/price_trigger_state.json")
EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json"
LIVE_PRICES_PATH = "/home/hmo/web-dashboard/data/live_prices.json"
CAPITAL_FLOW_PATH = "/home/hmo/web-dashboard/data/capital_flow_cache.json"
XMPP_ENDPOINT = "http://127.0.0.1:5805/"
XMPP_RECIPIENT = "hmo@yoin.fun"

# Strategy re-assessment dependency (driven by technicals, not by a
# mechanical percentage).
sys.path.insert(0, "/home/hmo/web-dashboard")
try:
    from strategy_lifecycle import reassess_strategy
    HAS_REASSESS = True
except ImportError:
    HAS_REASSESS = False

try:
    from hk_rate import hkd_to_cny
    HK_RATE = hkd_to_cny()
except Exception:
    HK_RATE = 0.8700  # fallback

# Branch system and scenario detection.
try:
    sys.path.insert(0, '/home/hmo/MoFin')
    from strategy_tree import detect_scenario, evaluate_branches
    HAS_TREE = True
except Exception:
    HAS_TREE = False

    def detect_scenario():
        """Stub used when strategy_tree is unavailable: no scenario."""
        return {}

    def evaluate_branches(*a, **kw):
        """Stub used when strategy_tree is unavailable: no branches."""
        return []

# Scenario caches (refreshed at the start of every run_once).
_SCENARIO_CACHE = {}
_BRANCH_CACHE = {}  # code -> branches list

UA = "Mozilla/5.0"


# ── Small shared helpers ────────────────────────────────────────────────
def _read_json(path):
    """Load a JSON document, closing the file handle deterministically."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def _write_json(path, data):
    """Write *data* as pretty-printed UTF-8 JSON.

    The document is serialised before the file is opened so that a
    serialisation error cannot leave a truncated file behind.
    """
    text = json.dumps(data, ensure_ascii=False, indent=2)
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)


def _to_float(value, default=0.0):
    """Convert *value* to float, returning *default* for None or junk."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _is_hk_code(code):
    """Return True for the 5-character codes starting with 0 or 1 that the
    dashboard files treat as Hong Kong listings."""
    code_s = str(code)
    return len(code_s) == 5 and code_s.startswith(('0', '1'))


# ── Batch price fetching ────────────────────────────────────────────────
def _quote_symbol(code_s):
    """Map a bare stock code to the symbol used in the quote URL."""
    if len(code_s) == 6:
        # A-shares: Shanghai codes start with 5/6/9, Shenzhen with 0/3.
        prefix = "sh" if code_s.startswith(('5', '6', '9')) else "sz"
        return f"{prefix}{code_s}"
    return f"hk{code_s}"


def _parse_quote_line(line):
    """Parse one response line into ``(symbol, (price, change, change_pct))``.

    Expected format: ``v_sh600110="1~name~600110~11.84~11.90~..."``.
    Returns None for blank, malformed or non-numeric lines.
    """
    line = line.strip()
    if not line or "=" not in line:
        return None
    name, _, payload = line.partition("=")
    # Strip the surrounding quotes and the trailing semicolon together.
    fields = payload.strip().strip('";').split("~")
    if len(fields) < 6:
        return None
    try:
        price = float(fields[3]) if fields[3] else 0
        prev_close = float(fields[4]) if fields[4] else 0
    except ValueError:
        return None
    change = price - prev_close if prev_close > 0 else 0
    change_pct = fields[32] if len(fields) > 32 and fields[32] else "0"
    name = name.strip()
    # Remove the literal "v_" prefix; str.lstrip("v_") would strip a
    # character set instead of a prefix.
    sym = name[2:] if name.startswith("v_") else name
    return sym, (price, change, change_pct)


def fetch_all_prices(codes):
    """Fetch quotes for all *codes* (A-shares and HK) in one request to the
    Tencent batch quote API.

    A-shares are queried as ``sh600110`` / ``sz000001``, HK stocks as
    ``hk00700``.

    Returns ``{code: (price, change, change_pct)}`` keyed by the stripped
    string form of each code; ``change_pct`` is the raw string from the
    response.  Returns ``{}`` for empty input or when the request fails.
    """
    if not codes:
        return {}

    code_map = {}  # symbol -> original code
    for code in codes:
        code_s = str(code).strip()
        code_map[_quote_symbol(code_s)] = code_s

    url = f"http://qt.gtimg.cn/q={','.join(code_map)}"
    try:
        req = urllib.request.Request(url, headers={"User-Agent": UA})
        with urllib.request.urlopen(req, timeout=10) as r:
            # One undecodable byte must not discard the whole batch.
            text = r.read().decode("gbk", errors="replace")
    except Exception as e:
        print(f"⚠️ 批量拉取失败: {e}", file=sys.stderr)
        return {}

    results = {}
    for line in text.strip().split("\n"):
        parsed = _parse_quote_line(line)
        if parsed is None:
            continue
        sym, quote = parsed
        orig_code = code_map.get(sym)
        if orig_code:
            results[orig_code] = quote
    return results


def _load_stock_doc(path, list_key):
    """Load a portfolio/watchlist document and the codes listed in it.

    Returns ``(doc, codes)``; on any failure an empty document and an empty
    set are returned so that nothing is written back later.
    """
    try:
        doc = _read_json(path)
        codes = {s['code'] for s in doc.get(list_key, [])}
    except Exception:
        return {list_key: []}, set()
    return doc, codes


def _apply_prices(stocks, prices):
    """Update ``price`` / ``change_pct`` in place for each stock whose price
    moved; return how many entries changed."""
    updated = 0
    for s in stocks:
        code = s.get('code')
        quote = prices.get(code)
        if not quote:
            continue
        price, _, change_pct = quote
        if price <= 0:
            continue
        # HK stocks: the API returns HKD, convert to RMB (2026-06-23 bugfix).
        if _is_hk_code(code):
            price = round(price * HK_RATE, 2)
        old = _to_float(s.get('price'))
        if abs(old - price) > 0.001:
            s['price'] = round(price, 2)
            s['change_pct'] = _to_float(change_pct)
            updated += 1
    return updated


def refresh_data_prices():
    """Refresh the live prices in portfolio.json and watchlist.json.

    Also writes a full snapshot to live_prices.json.  Returns the number of
    holdings/watchlist entries whose price changed.
    """
    pf, pf_codes = _load_stock_doc(PORTFOLIO_PATH, 'holdings')
    wl, wl_codes = _load_stock_doc(WATCHLIST_PATH, 'stocks')
    all_codes = pf_codes | wl_codes
    if not all_codes:
        return 0

    # One batch request for everything.
    prices = fetch_all_prices(list(all_codes))

    # Save a full live-price snapshot (consumed by the report pipeline so
    # that analysis uses the latest data).  Best effort.
    try:
        live = {"updated_at": datetime.now().isoformat(), "prices": {}}
        for code in all_codes:
            if code in prices:
                p, _, chg = prices[code]
                live["prices"][code] = {"price": p, "change_pct": chg}
        _write_json(LIVE_PRICES_PATH, live)
    except Exception:
        pass

    # Portfolio: write only when a price changed, to avoid triggering file
    # change notifications.
    pf_updated = _apply_prices(pf.get('holdings', []), prices)
    if pf_updated:
        pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
        _write_json(PORTFOLIO_PATH, pf)
    elif pf.get('updated_at'):
        # Even without price changes, refresh updated_at every 10 minutes
        # so the health check does not raise a false alarm.
        try:
            last_ts = datetime.strptime(pf['updated_at'], '%Y-%m-%d %H:%M')
            if (datetime.now() - last_ts).total_seconds() > 600:
                pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
                _write_json(PORTFOLIO_PATH, pf)
        except Exception:
            pass

    # Watchlist: write only when a price changed.
    wl_updated = _apply_prices(wl.get('stocks', []), prices)
    if wl_updated:
        wl['updated_at'] = datetime.now().isoformat()
        _write_json(WATCHLIST_PATH, wl)

    return pf_updated + wl_updated


# ── Branch-system helpers ───────────────────────────────────────────────
def _branch_alert_suffix(code, price, shares=0, cost=0):
    """Return the branch suffix `` | scenario→action`` for an alert line.

    Returns an empty string when the branch system is unavailable, no
    scenario is cached, no branch is applicable, or evaluation fails.
    """
    if not HAS_TREE or not _SCENARIO_CACHE.get('id'):
        return ""
    try:
        sc_id = _SCENARIO_CACHE['id']
        for r in evaluate_branches(code, sc_id, price, shares, cost):
            if r.get('applicable'):
                _record_branch_trigger(code, r.get('branch_id', ''), price)
                branch_action = r.get('action_type', r.get('action', 'hold'))
                return f" | {sc_id}→{branch_action}"
    except Exception:
        pass  # the suffix is decoration; never block the alert itself
    return ""


def _record_branch_trigger(code, branch_id, price):
    """Record a branch trigger in decisions.json (trigger_count + 1).

    The file is rewritten only when a matching branch was found.  Failures
    are ignored: this bookkeeping must not break alerting.
    """
    try:
        raw = _read_json(DECISIONS_PATH)
        touched = False
        for d in raw.get('decisions', []):
            if d.get('code') != code:
                continue
            branches = (d.get('strategy_tree') or {}).get('branches') or []
            for b in branches:
                if b.get('id') == branch_id:
                    b['trigger_count'] = b.get('trigger_count', 0) + 1
                    b['last_trigger_price'] = round(price, 2)
                    b['last_triggered'] = datetime.now().isoformat()
                    touched = True
                    break
        if touched:
            _write_json(DECISIONS_PATH, raw)
    except Exception:
        pass


# ── Zone state and event persistence ────────────────────────────────────
def load_state():
    """Return the persisted trigger state, or ``{}`` if it cannot be read."""
    try:
        return _read_json(STATE_PATH)
    except Exception:
        return {}


def save_state(state):
    """Persist the trigger state, creating the parent directory if needed."""
    os.makedirs(os.path.dirname(STATE_PATH), exist_ok=True)
    _write_json(STATE_PATH, state)


def load_breaches():
    """Return the persisted breach data, or ``{}`` if it cannot be read."""
    try:
        return _read_json(BREACH_PATH)
    except Exception:
        return {}


def save_breaches(data):
    """Persist breach data, creating the parent directory if needed."""
    os.makedirs(os.path.dirname(BREACH_PATH), exist_ok=True)
    _write_json(BREACH_PATH, data)


def load_events():
    """Return the event log, or ``{"events": []}`` if it cannot be read."""
    try:
        return _read_json(EVENTS_PATH)
    except Exception:
        return {"events": []}


def save_events(events):
    """Persist the event log, creating the parent directory if needed."""
    os.makedirs(os.path.dirname(EVENTS_PATH), exist_ok=True)
    _write_json(EVENTS_PATH, events)


def record_event(code, name, event_type, price, trigger_value, event_label=""):
    """Append one price-trigger event to price_events.json and SQLite.

    ``event_type`` is one of entry_zone, stop_loss, take_profit, exit_zone.
    Only the most recent 10000 events are kept in the JSON log.  The SQLite
    write is best effort and never affects the JSON log.
    """
    events = load_events()
    # Take a single timestamp so "timestamp" and "date" cannot disagree
    # around midnight.
    now = datetime.now()
    events["events"].append({
        "code": code,
        "name": name,
        "event_type": event_type,
        "price": round(price, 2),
        "trigger_value": trigger_value,
        "event_label": event_label,
        "timestamp": now.isoformat(),
        "date": now.strftime("%Y-%m-%d"),
    })
    # Keep the most recent 10000 entries.
    events["events"] = events["events"][-10000:]
    save_events(events)

    # ── SQLite dual write ──
    try:
        from mofin_db import get_conn, init_all_tables, write_price_event
        conn = get_conn()
        try:
            init_all_tables(conn)
            write_price_event(conn, code, name, event_type, price,
                              trigger_value, event_label)
        finally:
            conn.close()
    except Exception:
        pass  # a failed SQLite write must not affect the main flow


def get_trigger_zones(d):
    """Return the monitorable zones of decision *d*, read from its top-level
    fields, as ``(key, label, low, high)`` tuples.

    * ``entry_zone`` — ``[entry_low, entry_high]``, for watchlist and
      holdings alike.
    * ``stop_loss`` — ``[0, stop_loss]``, holdings only.
    * ``take_profit_zone`` — ``[take_profit, +inf)``, holdings only.

    Missing, non-numeric or non-positive values simply produce no zone.
    """
    zones = []

    # Entry zone (monitored for both watchlist and holdings).
    el = _to_float(d.get("entry_low"))
    eh = _to_float(d.get("entry_high"))
    if el > 0 and eh > 0:
        zones.append(("entry_zone", "买入区间", el, eh))

    # Stop loss / take profit only make sense for an actual holding.
    if _to_float(d.get('shares')) > 0:
        sl = _to_float(d.get("stop_loss"))
        if sl > 0:
            zones.append(("stop_loss", "止损", 0, sl))
        tp = _to_float(d.get("take_profit"))
        if tp > 0:
            # Take profit is reached at or ABOVE the target, so the zone is
            # open-ended upwards rather than [0, target].
            zones.append(("take_profit_zone", "止盈区间", tp, float("inf")))
    return zones


# ── run_once helpers ────────────────────────────────────────────────────
def _zone_range_text(lo, hi):
    """Render a zone as ``lo~hi``, or ``≥lo`` when it has no upper bound."""
    if hi == float("inf"):
        return f"≥{lo}"
    return f"{lo}~{hi}"


def _record_event_safely(*args):
    """Call record_event; a persistence failure must not stop alerting."""
    try:
        record_event(*args)
    except Exception as e:
        print(f"⚠️ 事件记录失败: {e}", file=sys.stderr)


def _check_zone_triggers(d, zones, price, state, outputs):
    """Emit enter/leave lines for each zone of *d*; return True if the
    persisted state changed."""
    code = d["code"]
    name = d.get("name", code)
    code_state = state.setdefault(code, {})
    changed = False
    for key, zone_label, lo, hi in zones:
        in_zone = lo <= price <= hi
        was_in_zone = code_state.get(key) is True
        span = _zone_range_text(lo, hi)
        if in_zone and not was_in_zone:
            branch_sfx = _branch_alert_suffix(
                code, price, d.get('shares', 0), d.get('cost', 0))
            if key == "stop_loss":
                outputs.append(
                    f"⚠️ {name}({code}) {price} → 跌破止损{hi}!{branch_sfx}")
                _record_event_safely(code, name, "stop_loss", price, str(hi))
            else:
                extra = ""
                event_type = "entry_zone"
                if key == "take_profit_zone":
                    event_type = "take_profit"
                    act = d.get("take_profit_action", "")
                    if act:
                        extra = f"({act})"
                outputs.append(
                    f"⚡ {name}({code}) {price} → "
                    f"进入{zone_label}{span}{extra}{branch_sfx}")
                _record_event_safely(code, name, event_type, price, span,
                                     zone_label)
            code_state[key] = True
            changed = True
        elif not in_zone and was_in_zone:
            # Recovering above the stop loss is tracked but not announced.
            if key != "stop_loss":
                outputs.append(
                    f"📌 {name}({code}) {price} → 离开{zone_label}{span}")
            code_state[key] = False
            changed = True
    return changed


def _check_buy_zone(d, price, state, outputs):
    """Detect entry-zone transitions for *d* and re-assess its strategy.

    Returns ``(state_changed, reassessed)``.
    """
    code = d["code"]
    name = d.get("name", code)
    entry_low = d.get("entry_low", 0)
    entry_high = d.get("entry_high", 0)
    # Compare as floats: the JSON values may be numeric strings.
    lo = _to_float(entry_low)
    hi = _to_float(entry_high)
    if not lo or not hi:
        return False, False

    in_buy_zone = lo <= price <= hi
    prev_in_buy_zone = state.get(code, {}).get("__buy_zone")

    # Only a state transition triggers a re-assessment.
    do_reassess = False
    if in_buy_zone and prev_in_buy_zone is False:
        # Re-entered the zone: check whether the range is still valid.
        outputs.append(
            f"🔄 {name}({code}) {price} → "
            f"重新进入买入区{entry_low}~{entry_high},触发技术面重评")
        do_reassess = True
    elif not in_buy_zone and prev_in_buy_zone is True:
        # Left the zone: refresh stop loss / take profit / range at once.
        outputs.append(
            f"🔄 {name}({code}) {price} → "
            f"离开买入区{entry_low}~{entry_high},立即技术面重评")
        do_reassess = True

    reassessed = False
    if do_reassess and HAS_REASSESS:
        try:
            cost = d.get("cost", 0) or 0
            shares = d.get("shares", 0) or 0
            sentiment = "neutral"
            snapshot = d.get("tech_snapshot")
            if snapshot:
                if "bearish" in snapshot:
                    sentiment = "bearish"
                elif "bullish" in snapshot:
                    sentiment = "bullish"
            # Technicals-driven re-assessment (not a mechanical percentage).
            result = reassess_strategy(
                code, name, price, cost, shares,
                current_action=d.get("action", ""),
                volume_signal="中性",
                sentiment=sentiment,
            )
            outputs.append(
                f" 📊 新策略: 损{result['stop_loss']} "
                f"盈{result['take_profit']} "
                f"区{result['entry_low']}~{result['entry_high']} "
                f"RR={result['rr_ratio']}")
            reassessed = True
        except Exception as e:
            outputs.append(f" ⚠️ 重评失败: {e}")

    # Persist the zone membership whenever it differs from what is stored,
    # so the next run compares against the latest value.
    code_state = state.setdefault(code, {})
    state_changed = code_state.get("__buy_zone") != in_buy_zone
    if state_changed:
        code_state["__buy_zone"] = in_buy_zone
    return state_changed, reassessed


def _capital_flow_alerts(active):
    """Return one output line per capital-flow alert found in the cache for
    the active decisions (2026-06-27 addition).  Best effort."""
    lines = []
    try:
        cf = _read_json(CAPITAL_FLOW_PATH)
        for d in active:
            code = d["code"]
            stock_cf = cf.get("stocks", {}).get(code, {})
            alerts = stock_cf.get("analysis", {}).get("alerts", []) or []
            name = d.get("name", code)
            for a in alerts:
                lines.append(f" 💰 {name}({code}) {a}")
    except Exception:
        pass
    return lines


def _push_xmpp(body):
    """POST *body* as a chat message to the local XMPP bridge."""
    payload = json.dumps({
        "to": XMPP_RECIPIENT,
        "body": body,
        "type": "chat",
    }).encode("utf-8")
    req = urllib.request.Request(
        XMPP_ENDPOINT,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=5):
        pass


def run_once(round_label=""):
    """Run one complete monitoring round.

    Steps: refresh scenario caches, refresh dashboard prices, check zone
    triggers, re-assess strategies on entry-zone transitions, collect
    capital-flow alerts, detect scenario switches, then print all lines and
    push the critical ones over XMPP.  Prints nothing when nothing fired.
    """
    global _SCENARIO_CACHE, _BRANCH_CACHE
    label = f" [{round_label}]" if round_label else ""

    # Refresh the scenario and branch caches (every round).
    try:
        _SCENARIO_CACHE = (detect_scenario() or {}) if HAS_TREE else {}
    except Exception:
        _SCENARIO_CACHE = {}
    _BRANCH_CACHE = {}
    try:
        dec = _read_json(DECISIONS_PATH)
    except Exception:
        dec = None
    if dec is not None:
        try:
            for d in dec.get('decisions', []):
                tree = d.get('strategy_tree', {})
                if tree and tree.get('branches'):
                    _BRANCH_CACHE[d['code']] = tree['branches']
        except Exception:
            pass

    # === Step 1: refresh all prices in one go ===
    # A dashboard refresh failure must not prevent the alert checks below.
    try:
        refresh_data_prices()
    except Exception as e:
        print(f"⚠️{label} 刷新实时价失败: {e}", file=sys.stderr)

    # === Step 2: check the trigger conditions ===
    if dec is None:
        print(f"❌{label} 无法读取decisions.json", file=sys.stderr)
        return

    active = [d for d in dec.get("decisions", [])
              if d.get("status") in ("active", "updated")]
    state = load_state()
    outputs = []
    state_updated = False

    monitored = [(d, get_trigger_zones(d)) for d in active]
    check_codes = {d["code"] for d, zones in monitored if zones}

    # Batch-fetch the quotes of the monitored stocks.
    # NOTE(review): zone checks use the raw quote, whereas
    # refresh_data_prices converts HK quotes to RMB first — confirm which
    # currency the thresholds in decisions.json are expressed in.
    prices = fetch_all_prices(list(check_codes))

    for d, zones in monitored:
        if not zones:
            continue
        price_info = prices.get(d["code"])
        if not price_info or price_info[0] == 0:
            continue
        if _check_zone_triggers(d, zones, price_info[0], state, outputs):
            state_updated = True

    # === Step 3: entry-zone deviation detection + automatic re-assessment ===
    reassessed_codes = []
    for d in active:
        price_info = prices.get(d["code"])
        if not price_info or price_info[0] == 0:
            continue
        changed, reassessed = _check_buy_zone(d, price_info[0], state, outputs)
        if changed:
            state_updated = True
        if reassessed:
            reassessed_codes.append(d["code"])

    # If any stock was re-assessed, refresh decisions.json.
    if reassessed_codes and HAS_REASSESS:
        try:
            # Full re-assessment: regenerate_all batches the price fetching
            # and technical analysis internally.
            from strategy_lifecycle import regenerate_all
            r = regenerate_all(stdout=False)
            outputs.append(
                f" ✅ 策略已全量重评: {r.get('ok',0)}/{r.get('total',0)}成功")
            outputs.append(f" 📌 触发股票: {', '.join(reassessed_codes)}")
        except Exception as e:
            outputs.append(f" ⚠️ 全量重评失败: {e}")

    # === 3.5 capital-flow anomaly detection ===
    outputs.extend(_capital_flow_alerts(active))

    # === Step 4: scenario change detection + output → push to XMPP ===
    now_str = datetime.now().strftime("%H:%M:%S")

    # Scenario change detection (compared across rounds).
    if HAS_TREE and _SCENARIO_CACHE.get('id'):
        prev_scenario = state.get('_system', {}).get('last_scenario', '')
        curr_scenario = _SCENARIO_CACHE['id']
        if prev_scenario and curr_scenario != prev_scenario:
            combo = _SCENARIO_CACHE.get('combo_action', '')
            outputs.insert(
                0, f"🌀 情景切换: {prev_scenario}→{curr_scenario} | {combo}")
            state.setdefault('_system', {})['last_scenario'] = curr_scenario
            state_updated = True
        elif not prev_scenario:
            state.setdefault('_system', {})['last_scenario'] = curr_scenario
            state_updated = True

    if outputs:
        # One short line per trigger.
        for o in outputs:
            print(o)

        # Push only key events over XMPP (stop-loss breaches and scenario
        # switches), not entry-zone moves or re-assessment details.
        # NOTE(review): capital-flow lines start with a space, so this
        # filter never matches "💰" even though they look intended to be
        # pushed.  Those alerts repeat on every run, so enabling the push
        # needs de-duplication first — confirm with the owner.
        critical = [o for o in outputs if o.startswith(("⚠️", "🌀", "💰"))]
        if critical:
            try:
                _push_xmpp("\n".join([f"{now_str}"] + critical))
            except Exception as e:
                print(f"⚠️{label} XMPP推送失败: {e}", file=sys.stderr)
    # else: SILENT — nothing triggered, nothing printed, nothing pushed.

    if state_updated:
        save_state(state)


def main():
    """Run one round per cron invocation."""
    run_once()


if __name__ == "__main__":
    main()