Files
MoFin/price_monitor.py
T
hmo 6abc2e45b0 refactor: phase 0-2 MoFin architecture reform — single source of truth
Phase 0 (止血):
- mo_models.py: unified calc_total_assets(), is_hk_stock(), get_hk_rate() — single source of truth
- Fixed 3 files missing frozen_cash: holdings_reconciliation, server, import_holding_xls
- Fixed stale_push_wlin: unified is_hk_stock detection, removed hardcoded 0.866
- Fixed price_monitor: consolidated 2 duplicate total_assets blocks into mo_models calls
- Fixed stock_scorer: replaced broken len()<=5 is_hk_stock heuristic
- Fixed strategy_lifecycle: replaced non-existent currency_utils import with mo_models

Phase 1 (DSA adapter):
- mo_provider.py: wraps DSA DataFetcherManager (16 fetchers, auto-fallback)
  - TDX relay as primary, DSA as backup for realtime/kline/news/fundamentals

Phase 2 (Integration):
- mo_bridge.py: injects DSA market review + news context into MoFin analysis prompts
- Graceful degradation if DSA not installed

Infrastructure:
- mo_config.py: centralized Config singleton replacing scattered hardcoded paths
- All 11 changed files pass python compile check

Impact: total_assets now computed in ONE place (mo_models).
        is_hk_stock now ONE implementation (no more false negatives).
        HK rate now ONE source (hk_rate API → cache → 0.87 fallback).
        No more hardcoded 0.866/0.8664/0.8700 divergence.
2026-06-29 23:25:54 +08:00

593 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""price_monitor.py — 高频价格监控脚本(批量版)
规则:进入区间报一次,离开区间报一次,中间不重复。
每次运行时一次性刷新所有持仓+自选股的实时价。
"""
import json
import urllib.request
import os
import sys
import time
from datetime import datetime
# ── MoFin unified model ──────────────────────────────────────────────
from mo_models import is_hk_stock, get_hk_rate, calc_total_assets, calc_total_mv, calc_position_pct
# ── Data file locations ──────────────────────────────────────────────
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json"
BREACH_PATH = "/home/hmo/.hermes/zone_breach.json"
STATE_PATH = os.path.expanduser("~/.hermes/price_trigger_state.json")
EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json"

# Strategy re-assessment dependency (driven by technicals, not by a
# mechanical percentage). Optional: HAS_REASSESS records whether it loaded.
sys.path.insert(0, "/home/hmo/web-dashboard")
try:
    from strategy_lifecycle import reassess_strategy
except ImportError:
    HAS_REASSESS = False
else:
    HAS_REASSESS = True

# HKD -> RMB rate, resolved once at import time.
try:
    HK_RATE = get_hk_rate()
except Exception:
    HK_RATE = 0.87  # ultimate fallback

# Branch system and scenario detection. When the import fails for any
# reason, no-op stand-ins are installed so callers need no extra guards.
try:
    sys.path.insert(0, '/home/hmo/MoFin')
    from strategy_tree import detect_scenario, evaluate_branches
    HAS_TREE = True
except Exception:
    HAS_TREE = False

    def detect_scenario():
        return {}

    def evaluate_branches(*a, **kw):
        return []

# Scenario cache (refreshed by every run_once call).
_SCENARIO_CACHE = {}
_BRANCH_CACHE = {}  # code -> branches list
UA = "Mozilla/5.0"
# ── 批量拉取价格 ──────────────────────────────────────────────────────────
def fetch_all_prices(codes):
    """Fetch realtime quotes for many stocks with one Tencent batch request.

    Six-character codes are queried as A-shares (``sh`` prefix when the code
    starts with 5/6/9, ``sz`` otherwise, e.g. ``sh600110`` / ``sz000001``);
    every other code is queried with the ``hk`` prefix (e.g. ``hk00700``).

    Args:
        codes: Iterable of stock codes (str or int). Falsy input yields ``{}``.

    Returns:
        dict mapping each code (as a stripped string) to
        ``(price, change, change_pct)``. ``price`` is field 3 of the quote,
        ``change`` is ``price - previous close`` (field 4) and ``change_pct``
        is the raw string in field 32 (``"0"`` when absent). An empty dict is
        returned when the request fails; the error is reported on stderr.
    """
    if not codes:
        return {}

    # symbol sent to the API -> code as the caller knows it
    symbol_to_code = {}
    for code in codes:
        code_s = str(code).strip()
        if len(code_s) == 6:
            # A-shares: Shanghai codes start with 5/6/9, Shenzhen with 0/3.
            prefix = "sh" if code_s.startswith(("5", "6", "9")) else "sz"
        else:
            prefix = "hk"
        symbol_to_code[f"{prefix}{code_s}"] = code_s

    url = f"http://qt.gtimg.cn/q={','.join(symbol_to_code)}"
    try:
        req = urllib.request.Request(url, headers={"User-Agent": UA})
        with urllib.request.urlopen(req, timeout=10) as resp:
            # errors="replace": one undecodable byte in a stock name must not
            # throw away the whole batch.
            text = resp.read().decode("gbk", errors="replace")
    except Exception as e:
        print(f"⚠️ 批量拉取失败: {e}", file=sys.stderr)
        return {}

    results = {}
    for line in text.strip().split("\n"):
        line = line.strip()
        if not line or "=" not in line:
            continue
        # Line format: v_sh600110="1~NAME~600110~11.84~11.90~...";
        head, _, tail = line.partition("=")
        sym = head.strip()
        if sym.startswith("v_"):
            # Drop the literal prefix only; str.lstrip("v_") would strip any
            # run of 'v' / '_' characters instead.
            sym = sym[2:]
        orig_code = symbol_to_code.get(sym)
        if not orig_code:
            continue
        # Remove the trailing ';' first so both surrounding quotes come off.
        fields = tail.strip().rstrip(";").strip('"').split("~")
        if len(fields) < 6:
            continue
        try:
            price = float(fields[3]) if fields[3] else 0.0
            prev_close = float(fields[4]) if fields[4] else 0.0
        except (ValueError, IndexError):
            continue
        change = price - prev_close if prev_close > 0 else 0
        change_pct = fields[32] if len(fields) > 32 and fields[32] else "0"
        results[orig_code] = (price, change, change_pct)
    return results
def _write_json_file(path, payload, **dump_kwargs):
    """Serialize *payload* first, then write it to *path* and close the file."""
    text = json.dumps(payload, **dump_kwargs)
    with open(path, "w") as fh:
        fh.write(text)


def _load_priced_doc(path, list_key):
    """Load the JSON document at *path* plus the set of codes in its *list_key* list.

    Returns ``({list_key: []}, set())`` when the file cannot be read or an
    entry has no usable ``code``.
    """
    try:
        with open(path) as fh:
            doc = json.load(fh)
        codes = {entry['code'] for entry in doc.get(list_key, [])}
    except Exception:
        return {list_key: []}, set()
    return doc, codes


def _apply_live_prices(entries, prices):
    """Copy fresh quotes from *prices* into *entries*; return how many changed."""
    changed = 0
    for entry in entries:
        # fetch_all_prices keys its result by the stripped string form.
        quote = prices.get(str(entry['code']).strip())
        if not quote:
            continue
        price, _, change_pct = quote
        if price <= 0:
            continue
        if is_hk_stock(entry['code']):
            # The API returns HKD for Hong Kong stocks; store RMB.
            price = price * HK_RATE
        # Round before comparing: the stored value is rounded, so comparing
        # the raw quote would report a change on every run for 3-decimal quotes.
        price = round(price, 2)
        try:
            old = float(entry.get('price') or 0)
        except (TypeError, ValueError):
            old = 0.0
        if abs(old - price) > 0.001:
            entry['price'] = price
            try:
                entry['change_pct'] = float(change_pct) if change_pct else 0
            except (TypeError, ValueError):
                entry['change_pct'] = 0
            changed += 1
    return changed


def refresh_data_prices():
    """Refresh the realtime prices stored in portfolio.json and watchlist.json.

    All holding and watchlist codes are fetched in one batch. A snapshot of
    the raw quotes is written to live_prices.json, then each document is
    rewritten only when one of its prices actually changed. Portfolio totals
    are recomputed through the mo_models helpers.

    Returns:
        Number of portfolio plus watchlist entries whose price was updated
        (0 when there are no codes to fetch).
    """
    pf, holding_codes = _load_priced_doc(PORTFOLIO_PATH, 'holdings')
    wl, watch_codes = _load_priced_doc(WATCHLIST_PATH, 'stocks')
    all_codes = holding_codes | watch_codes
    if not all_codes:
        return 0

    # One batch request for everything.
    prices = fetch_all_prices(list(all_codes))

    # Full snapshot of live quotes for the report pipeline (best effort).
    try:
        live = {
            "updated_at": datetime.now().isoformat(),
            "prices": {
                code: {"price": prices[code][0], "change_pct": prices[code][2]}
                for code in all_codes
                if code in prices
            },
        }
        _write_json_file("/home/hmo/web-dashboard/data/live_prices.json", live, indent=2)
    except Exception as e:
        print(f"⚠️ live_prices.json 写入失败: {e}", file=sys.stderr)

    # Portfolio: write only when a price changed, to avoid spurious
    # file-change notifications.
    holdings = pf.get('holdings', [])
    pf_changed = _apply_live_prices(holdings, prices)
    if pf_changed:
        pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
        # Totals come from the single mo_models formula.
        pf['total_mv'] = calc_total_mv(holdings)
        pf['total_assets'] = calc_total_assets(pf)
        pf['position_pct'] = calc_position_pct(pf)
        _write_json_file(PORTFOLIO_PATH, pf, ensure_ascii=False, indent=2)
    elif pf.get('updated_at'):
        # Even without price changes, bump updated_at every 10 minutes so
        # the health check does not raise a false alarm.
        try:
            last_ts = datetime.strptime(pf['updated_at'], '%Y-%m-%d %H:%M')
            if (datetime.now() - last_ts).total_seconds() > 600:
                pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
                _write_json_file(PORTFOLIO_PATH, pf, ensure_ascii=False, indent=2)
        except (ValueError, TypeError, OSError):
            pass  # unparseable timestamp or unwritable file: skip the bump

    # Watchlist: same write-on-change rule.
    wl_changed = _apply_live_prices(wl.get('stocks', []), prices)
    if wl_changed:
        wl['updated_at'] = datetime.now().isoformat()
        _write_json_file(WATCHLIST_PATH, wl, ensure_ascii=False, indent=2)

    # Re-derive the summary values so a stale stored total is corrected even
    # when no individual price changed in this run.
    try:
        live_market_value = calc_total_mv(pf.get('holdings', []))
        old_mv = pf.get('total_mv') or 0
        if abs(old_mv - live_market_value) > 0.01:
            pf['total_mv'] = round(live_market_value, 2)
            pf['total_assets'] = calc_total_assets(pf)
            if pf['total_assets'] > 0:
                pf['position_pct'] = calc_position_pct(pf)
            pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
            _write_json_file(PORTFOLIO_PATH, pf, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f" [汇总重算失败] {e}", flush=True)

    return pf_changed + wl_changed
# ── 分支系统辅助函数 ──────────────────────────────────────────────────────
def _branch_alert_suffix(code, price, shares=0, cost=0):
    """Build the branch suffix appended to an alert line.

    Evaluates the strategy branches for *code* under the cached scenario and
    uses the first applicable one: its trigger is recorded and its action
    (``action_type``, else ``action``, else ``'hold'``) becomes the suffix.
    Returns ``""`` when the branch system is unavailable, no scenario is
    cached, nothing applies, or evaluation fails.
    """
    scenario_id = _SCENARIO_CACHE.get('id') if HAS_TREE else None
    if not scenario_id:
        return ""
    try:
        for outcome in evaluate_branches(code, scenario_id, price, shares, cost):
            if not outcome.get('applicable'):
                continue
            _record_branch_trigger(code, outcome.get('branch_id', ''), price)
            action = outcome.get('action_type', outcome.get('action', 'hold'))
            # NOTE(review): scenario id and action are joined without a
            # separator — confirm this is the intended message format.
            return f" | {scenario_id}{action}"
    except Exception:
        # Best effort: a failing branch evaluation must not block the alert.
        pass
    return ""
def _record_branch_trigger(code, branch_id, price):
    """Record that a strategy branch fired (self-growing ``trigger_count``).

    For every decision in decisions.json whose ``code`` matches, the first
    branch whose ``id`` equals *branch_id* gets ``trigger_count`` incremented
    and ``last_trigger_price`` / ``last_triggered`` refreshed. The file is
    rewritten only when at least one branch was updated. Failures are
    reported on stderr and never raised.
    """
    try:
        with open(DECISIONS_PATH) as fh:
            raw = json.load(fh)
        stamp = datetime.now().isoformat()
        touched = False
        for d in raw.get('decisions', []):
            if d.get('code') != code:
                continue
            # `or {}` tolerates an explicit null strategy_tree.
            branches = (d.get('strategy_tree') or {}).get('branches')
            if not branches:
                continue
            for b in branches:
                if b.get('id') == branch_id:
                    b['trigger_count'] = (b.get('trigger_count') or 0) + 1
                    b['last_trigger_price'] = round(price, 2)
                    b['last_triggered'] = stamp
                    touched = True
                    break
        if touched:
            # Serialize before opening so a failure cannot truncate the file.
            text = json.dumps(raw, ensure_ascii=False, indent=2)
            with open(DECISIONS_PATH, 'w') as fh:
                fh.write(text)
    except Exception as e:
        print(f"⚠️ 分支触发记录失败: {e}", file=sys.stderr)
# ── 区间偏离检测 ──────────────────────────────────────────────────────────
def load_state():
    """Load the trigger state from STATE_PATH.

    Returns the stored dict, or ``{}`` when the file is missing, unreadable,
    not valid JSON, or does not contain a JSON object.
    """
    try:
        with open(STATE_PATH) as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and decoding errors.
        return {}
    return data if isinstance(data, dict) else {}
def save_state(state):
    """Write *state* to STATE_PATH as indented JSON, creating its directory.

    The payload is serialized before the file is opened, so a serialization
    error cannot leave a truncated state file behind.
    """
    serialized = json.dumps(state, ensure_ascii=False, indent=2)
    os.makedirs(os.path.dirname(STATE_PATH), exist_ok=True)
    with open(STATE_PATH, 'w') as f:
        f.write(serialized)
def load_breaches():
    """Load the zone-breach record from BREACH_PATH.

    Returns the stored dict, or ``{}`` when the file is missing, unreadable,
    not valid JSON, or does not contain a JSON object.
    """
    try:
        with open(BREACH_PATH) as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and decoding errors.
        return {}
    return data if isinstance(data, dict) else {}
def save_breaches(data):
    """Write *data* to BREACH_PATH as indented JSON, creating its directory.

    The payload is serialized before the file is opened, so a serialization
    error cannot leave a truncated file behind.
    """
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    os.makedirs(os.path.dirname(BREACH_PATH), exist_ok=True)
    with open(BREACH_PATH, 'w') as f:
        f.write(serialized)
def load_events():
    """Load the price-event log from EVENTS_PATH.

    Always returns a dict holding a list under ``"events"``: a fresh
    ``{"events": []}`` when the file is missing, unreadable, not valid JSON
    or not a JSON object, and an empty list is substituted when the stored
    ``"events"`` value is absent or not a list.
    """
    try:
        with open(EVENTS_PATH) as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and decoding errors.
        return {"events": []}
    if not isinstance(data, dict):
        return {"events": []}
    if not isinstance(data.get("events"), list):
        data["events"] = []
    return data
def save_events(events):
    """Write *events* to EVENTS_PATH as indented JSON, creating its directory.

    The payload is serialized before the file is opened, so a serialization
    error cannot leave a truncated event log behind.
    """
    serialized = json.dumps(events, ensure_ascii=False, indent=2)
    os.makedirs(os.path.dirname(EVENTS_PATH), exist_ok=True)
    with open(EVENTS_PATH, 'w') as f:
        f.write(serialized)
def record_event(code, name, event_type, price, trigger_value, event_label=""):
    """Append one price-trigger event to price_events.json and mirror it to SQLite.

    Args:
        code: Stock code.
        name: Stock display name.
        event_type: Event kind (entry_zone, stop_loss, take_profit, exit_zone).
        price: Price at which the trigger fired; stored rounded to 2 decimals.
        trigger_value: The threshold or range that was hit.
        event_label: Optional human-readable zone label.

    Only the most recent 10000 events are kept in the JSON log. The SQLite
    write is best effort: any failure there is ignored so the JSON log and
    the caller are unaffected.
    """
    events = load_events()
    # One clock reading so "timestamp" and "date" can never disagree.
    now = datetime.now()
    history = events.setdefault("events", [])
    history.append({
        "code": code,
        "name": name,
        "event_type": event_type,
        "price": round(price, 2),
        "trigger_value": trigger_value,
        "event_label": event_label,
        "timestamp": now.isoformat(),
        "date": now.strftime("%Y-%m-%d"),
    })
    # Keep the latest 10000 entries.
    events["events"] = history[-10000:]
    save_events(events)

    # ── SQLite dual write ──
    try:
        from mofin_db import get_conn, init_all_tables, write_price_event
        conn = get_conn()
        try:
            init_all_tables(conn)
            write_price_event(conn, code, name, event_type, price, trigger_value, event_label)
        finally:
            # Close even when the write raises.
            conn.close()
    except Exception:
        pass  # a failed SQLite write must not affect the main flow
def _zone_bound(value):
    """Return *value* as a positive float, or None if missing, non-numeric or <= 0."""
    if not value:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if number > 0 else None


def get_trigger_zones(d):
    """List the price zones to monitor for decision *d* (top-level fields only).

    Args:
        d: Decision dict; reads ``shares``, ``entry_low``, ``entry_high``,
            ``stop_loss`` and ``take_profit``.

    Returns:
        List of ``(key, label, low, high)`` tuples; a price is inside a zone
        when ``low <= price <= high``.

        * ``entry_zone``: ``entry_low .. entry_high`` (watchlist and holdings).
        * ``stop_loss``: ``0 .. stop_loss`` (holdings only).
        * ``take_profit_zone``: ``take_profit .. inf`` (holdings only).

        Fields that are missing, non-numeric or not positive add no zone.
    """
    zones = []

    # Buy zone: monitored for watchlist entries and holdings alike.
    entry_low = _zone_bound(d.get("entry_low", 0))
    entry_high = _zone_bound(d.get("entry_high", 0))
    if entry_low is not None and entry_high is not None:
        zones.append(("entry_zone", "买入区间", entry_low, entry_high))

    # Stop-loss and take-profit only make sense for an actual position.
    if _zone_bound(d.get('shares', 0)) is None:
        return zones

    stop_loss = _zone_bound(d.get("stop_loss", 0))
    if stop_loss is not None:
        zones.append(("stop_loss", "止损", 0, stop_loss))

    take_profit = _zone_bound(d.get("take_profit", 0))
    if take_profit is not None:
        # The zone starts AT the target: a (0, target) range would treat
        # every price below the target as "inside the take-profit zone".
        zones.append(("take_profit_zone", "止盈区间", take_profit, float("inf")))
    return zones
def _as_float(value):
    """Return *value* converted to float, or None if it is missing or non-numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _scan_trigger_zones(active, prices, state, outputs):
    """Step 2: report enter/leave transitions for every monitored zone.

    Appends messages to *outputs*, records events, mutates *state* in place
    and returns True when any zone flag changed.
    """
    changed = False
    for d in active:
        code = d["code"]
        zones = get_trigger_zones(d)
        if not zones:
            continue
        price_info = prices.get(code)
        if not price_info:
            continue
        # NOTE(review): quotes come straight from fetch_all_prices, which does
        # not apply HK_RATE, whereas refresh_data_prices converts Hong Kong
        # prices before storing them — confirm which currency the decision
        # thresholds are expressed in.
        price = price_info[0]
        if price == 0:
            continue
        name = d.get("name", code)
        code_state = state.setdefault(code, {})
        for key, zone_label, lo, hi in zones:
            in_zone = lo <= price <= hi
            was_in_zone = code_state.get(key) is True
            if in_zone and not was_in_zone:
                branch_sfx = _branch_alert_suffix(code, price, d.get('shares', 0), d.get('cost', 0))
                if key == "stop_loss":
                    outputs.append(f"⚠️ {name}({code}) {price} → 跌破止损{hi}{branch_sfx}")
                    record_event(code, name, "stop_loss", price, str(hi))
                else:
                    extra = ""
                    if "_price" in key:
                        # Batch-style keys (<x>_price with <x>_shares / <x>_action).
                        batch_shares = d.get(key.replace("_price", "_shares"), "")
                        action = d.get(key.replace("_price", "_action"), "")
                        if batch_shares:
                            extra = f" {action}{batch_shares}" if action else f" {batch_shares}"
                    elif key == "take_profit_zone":
                        act = d.get("take_profit_action", "")
                        if act:
                            extra = f"{act}"
                    outputs.append(f"{name}({code}) {price} → 进入{zone_label}{lo}~{hi}{extra}{branch_sfx}")
                    # Take-profit hits are logged under their own event type
                    # rather than being filed as buy-zone entries.
                    event_type = "take_profit" if key == "take_profit_zone" else "entry_zone"
                    record_event(code, name, event_type, price, f"{lo}~{hi}", zone_label)
                code_state[key] = True
                changed = True
            elif not in_zone and was_in_zone:
                if key != "stop_loss":
                    outputs.append(f"📌 {name}({code}) {price} → 离开{zone_label}{lo}~{hi}")
                code_state[key] = False
                changed = True
    return changed


def _scan_buy_zone_reassess(active, prices, state, outputs):
    """Step 3: re-assess a strategy when the price crosses its buy-zone boundary.

    Returns ``(reassessed_codes, changed)`` where *changed* tells whether the
    stored ``__buy_zone`` flag of any code was updated.
    """
    reassessed = []
    changed = False
    for d in active:
        code = d["code"]
        name = d.get("name", code)
        price_info = prices.get(code)
        if not price_info:
            continue
        price = price_info[0]
        if price == 0:
            continue
        entry_low = d.get("entry_low", 0)
        entry_high = d.get("entry_high", 0)
        if not entry_low or not entry_high:
            continue
        # Compare numerically: JSON may carry the bounds as strings.
        low, high = _as_float(entry_low), _as_float(entry_high)
        if low is None or high is None:
            continue
        in_buy_zone = low <= price <= high
        prev_in_buy_zone = state.get(code, {}).get("__buy_zone")

        # Only a state change triggers; the first observation (None) does not.
        do_reassess = False
        if in_buy_zone and prev_in_buy_zone is False:
            # Back inside the buy zone: confirm the range is still valid.
            outputs.append(f"🔄 {name}({code}) {price} → 重新进入买入区{entry_low}~{entry_high},触发技术面重评")
            do_reassess = True
        elif not in_buy_zone and prev_in_buy_zone is True:
            # Left the buy zone: refresh stop-loss / take-profit / range now.
            outputs.append(f"🔄 {name}({code}) {price} → 离开买入区{entry_low}~{entry_high},立即技术面重评")
            do_reassess = True

        if do_reassess and HAS_REASSESS:
            try:
                cost = d.get("cost", 0) or 0
                shares = d.get("shares", 0) or 0
                sentiment = "neutral"
                snapshot = d.get("tech_snapshot")
                if snapshot:
                    if "bearish" in snapshot:
                        sentiment = "bearish"
                    elif "bullish" in snapshot:
                        sentiment = "bullish"
                # Technicals-driven re-assessment (not a mechanical percentage).
                result = reassess_strategy(
                    code, name, price, cost, shares,
                    current_action=d.get("action", ""),
                    volume_signal="中性", sentiment=sentiment,
                )
                # NOTE(review): the labels between the values below look
                # incomplete — confirm the intended message text.
                outputs.append(f" 📊 新策略: 损{result['stop_loss']}{result['take_profit']}{result['entry_low']}~{result['entry_high']} RR={result['rr_ratio']}")
                reassessed.append(code)
            except Exception as e:
                outputs.append(f" ⚠️ 重评失败: {e}")

        # Always track the latest side of the boundary; otherwise the same
        # crossing would re-trigger a re-assessment on every run.
        code_state = state.setdefault(code, {})
        if code_state.get("__buy_zone") != in_buy_zone:
            code_state["__buy_zone"] = in_buy_zone
            changed = True
    return reassessed, changed


def _push_critical_alerts(now_str, critical):
    """POST the critical alert lines to the local XMPP gateway (best effort)."""
    try:
        body = "\n".join([f"{now_str}"] + critical)
        payload = json.dumps({
            "to": "hmo@yoin.fun", "body": body, "type": "chat",
        }).encode("utf-8")
        req = urllib.request.Request(
            "http://127.0.0.1:5805/", data=payload,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=5):
            pass  # only delivery matters; the response body is unused
    except Exception as e:
        print(f"⚠️ XMPP推送失败: {e}", file=sys.stderr)


def run_once(round_label=""):
    """Run one complete monitoring pass.

    Steps: refresh the scenario/branch caches, refresh all stored prices,
    check zone triggers, re-assess strategies whose price crossed the buy
    zone, collect capital-flow alerts, detect a scenario switch, then print
    every message and push the critical ones over XMPP. The trigger state is
    saved only when it changed.

    Args:
        round_label: Optional tag used as a prefix in the error message
            printed when decisions.json cannot be read.
    """
    global _SCENARIO_CACHE, _BRANCH_CACHE
    label = f" [{round_label}]" if round_label else ""

    # Refresh the scenario and branch caches (once per round).
    _SCENARIO_CACHE = detect_scenario() if HAS_TREE else {}
    _BRANCH_CACHE = {}
    try:
        with open(DECISIONS_PATH) as f:
            raw = json.load(f)
        for d in raw.get('decisions', []):
            tree = d.get('strategy_tree', {})
            if tree and tree.get('branches'):
                _BRANCH_CACHE[d['code']] = tree['branches']
    except Exception:
        pass  # the branch cache is optional

    # === Step 1: refresh every stored price in one go ===
    refresh_data_prices()

    # === Step 2: check trigger conditions ===
    try:
        with open(DECISIONS_PATH) as f:
            dec = json.load(f)
    except Exception:
        print(f"{label} 无法读取decisions.json", file=sys.stderr)
        return

    active = [d for d in dec.get("decisions", []) if d.get("status") in ("active", "updated")]
    state = load_state()
    outputs = []

    # Fetch quotes only for decisions that have at least one zone to watch.
    check_codes = {d["code"] for d in active if get_trigger_zones(d)}
    prices = fetch_all_prices(list(check_codes))

    state_updated = _scan_trigger_zones(active, prices, state, outputs)

    # === Step 3: buy-zone deviation detection + automatic re-assessment ===
    reassesed_codes, buy_zone_changed = _scan_buy_zone_reassess(active, prices, state, outputs)
    state_updated = state_updated or buy_zone_changed

    # Any re-assessed stock triggers a full regeneration of decisions.json.
    if reassesed_codes and HAS_REASSESS:
        try:
            # Regenerating only the affected stocks is too inefficient, so
            # everything is re-assessed (regenerate_all batches the price
            # fetch and the technical analysis internally).
            from strategy_lifecycle import regenerate_all
            r = regenerate_all(stdout=False)
            outputs.append(f" ✅ 策略已全量重评: {r.get('ok',0)}/{r.get('total',0)}成功")
            outputs.append(f" 📌 触发股票: {', '.join(reassesed_codes)}")
        except Exception as e:
            outputs.append(f" ⚠️ 全量重评失败: {e}")

    # === 3.5 capital-flow anomaly detection (added 2026-06-27) ===
    try:
        with open("/home/hmo/web-dashboard/data/capital_flow_cache.json") as f:
            cf = json.load(f)
        for d in active:
            code = d["code"]
            alerts = cf.get("stocks", {}).get(code, {}).get("analysis", {}).get("alerts", [])
            if alerts:
                name = d.get("name", code)
                for a in alerts:
                    outputs.append(f" 💰 {name}({code}) {a}")
    except Exception:
        pass  # the capital-flow cache is optional

    # === Step 4: scenario-switch detection + output -> XMPP push ===
    now_str = datetime.now().strftime("%H:%M:%S")

    # Scenario switch (compared with the previous round).
    scenario_id = _SCENARIO_CACHE.get('id') if HAS_TREE else None
    if scenario_id:
        prev_scenario = state.get('_system', {}).get('last_scenario', '')
        if not prev_scenario:
            state.setdefault('_system', {})['last_scenario'] = scenario_id
            state_updated = True
        elif scenario_id != prev_scenario:
            combo = _SCENARIO_CACHE.get('combo_action', '')
            # NOTE(review): previous and current scenario ids are joined
            # without a separator — confirm the intended message text.
            outputs.insert(0, f"🌀 情景切换: {prev_scenario}{scenario_id} | {combo}")
            state.setdefault('_system', {})['last_scenario'] = scenario_id
            state_updated = True

    if outputs:
        # One short line per trigger.
        for o in outputs:
            print(o)
        # Push key events only: stop-loss breaks, scenario switches and
        # capital-flow anomalies. Buy-zone moves and re-assessment details
        # are not pushed. Capital-flow lines carry a leading space, so they
        # are matched after stripping it; indented warning lines
        # (re-assessment failures) stay excluded.
        critical = [
            o for o in outputs
            if o.startswith(("⚠️", "🌀")) or o.lstrip().startswith("💰")
        ]
        if critical:
            _push_critical_alerts(now_str, critical)
    # else: SILENT — nothing triggered, nothing printed, nothing pushed

    if state_updated:
        save_state(state)
def main():
    """Run a single monitoring round; meant to be invoked once per cron trigger."""
    run_once()


if __name__ == "__main__":
    main()