Files
MoFin/price_monitor.py
T

715 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""price_monitor.py — 高频价格监控脚本(批量版)
规则:进入区间报一次,离开区间报一次,中间不重复。
每次运行时一次性刷新所有持仓+自选股的实时价。
"""
import json
import urllib.request
import os
import sys
import time
from datetime import datetime
# ── MoFin unified model ──────────────────────────────────────────────
from mo_models import is_hk_stock, get_hk_rate, calc_total_assets, calc_total_mv, calc_position_pct
from mofin_db import get_conn, write_holdings_batch, write_portfolio_summary, write_price_event, write_watchlist_stock
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json"
BREACH_PATH = "/home/hmo/.hermes/zone_breach.json"
STATE_PATH = os.path.expanduser("~/.hermes/price_trigger_state.json")
EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json"
# 策略重评依赖(技术面驱动,非机械百分比)
sys.path.insert(0, "/home/hmo/web-dashboard")
try:
from strategy_lifecycle import reassess_strategy
HAS_REASSESS = True
except ImportError:
HAS_REASSESS = False
try:
HK_RATE = get_hk_rate()
except Exception:
HK_RATE = 0.87 # ultimate fallback
# 分支系统与情景检测
try:
sys.path.insert(0, '/home/hmo/MoFin')
from strategy_tree import detect_scenario, evaluate_branches
HAS_TREE = True
except Exception:
HAS_TREE = False
def detect_scenario(): return {}
def evaluate_branches(*a, **kw): return []
# 情景缓存(每次run_once刷新)
_SCENARIO_CACHE = {}
_BRANCH_CACHE = {} # code -> branches list
UA = "Mozilla/5.0"
# ── 批量拉取价格 ──────────────────────────────────────────────────────────
def fetch_all_prices(codes):
"""腾讯批量行情API:仅用于A股(沪市/深市)
A股:sh600110 / sz000001
港股已迁移至 fetch_hk_eastmoney()(东方财富实时行情)
返回 {code: (price, change, change_pct)}
"""
if not codes:
return {}
# 只处理A股(6位代码),港股走东方财富
a_codes = [c for c in codes if len(str(c).strip()) == 6]
if not a_codes:
return {}
symbols = []
code_map = {}
for code in a_codes:
code_s = str(code).strip()
if code_s.startswith(('5', '6', '9')):
sym = f"sh{code_s}"
else:
sym = f"sz{code_s}"
symbols.append(sym)
code_map[sym] = code_s
url = f"http://qt.gtimg.cn/q={','.join(symbols)}"
try:
req = urllib.request.Request(url, headers={"User-Agent": UA})
with urllib.request.urlopen(req, timeout=10) as r:
text = r.read().decode("gbk")
except Exception as e:
print(f"⚠️ 腾讯A股拉取失败: {e}", file=sys.stderr)
return {}
results = {}
for line in text.strip().split("\n"):
line = line.strip()
if not line or "=" not in line:
continue
try:
raw_value = line.split("=", 1)[1].strip().strip('"').strip(";")
fields = raw_value.split("~")
if len(fields) < 6:
continue
sym = line.split("=", 1)[0].strip().lstrip("v_")
orig_code = code_map.get(sym)
if not orig_code:
continue
price = float(fields[3]) if fields[3] else 0
prev_close = float(fields[4]) if fields[4] else 0
change = price - prev_close if prev_close > 0 else 0
change_pct = fields[32] if len(fields) > 32 and fields[32] else "0"
results[orig_code] = (price, change, change_pct)
except (ValueError, IndexError):
continue
return results
# ── 港股实时行情(东方财富,无15分钟延迟)──────────────────────────────
def fetch_hk_eastmoney(codes):
"""东方财富港股实时行情 API — 免费、实时、无15分钟延迟。
API: push2.eastmoney.com
市场代码: 116 (港交所)
格式: 116.00700
返回 {code: (price, change, change_pct)}
Fallback: 失败时回退到腾讯 qt.gtimg.cn(15分钟延迟)
"""
if not codes:
return {}
hk_codes = [str(c).strip() for c in codes if len(str(c).strip()) <= 5]
if not hk_codes:
return {}
results = {}
# 主通道:东方财富实时行情(逐股查询,港股最多~10只,可接受)
for code in hk_codes:
try:
url = (f"https://push2.eastmoney.com/api/qt/stock/get"
f"?secid=116.{code}"
f"&fields=f43,f170,f60,f57,f58"
f"&fltt=2")
req = urllib.request.Request(url, headers={"User-Agent": UA})
with urllib.request.urlopen(req, timeout=5) as r:
resp = json.loads(r.read().decode("utf-8"))
if resp.get("rc") != 0:
continue
item = resp.get("data", {})
if not item:
continue
price = float(item.get("f43", 0)) if item.get("f43") else 0
prev_close = float(item.get("f60", 0)) if item.get("f60") else 0
change = round(price - prev_close, 2) if prev_close > 0 else 0
change_pct = str(item.get("f170", "0"))
if price > 0:
results[code] = (price, change, change_pct)
# 东方财富有频率限制,每请求间隔 0.2s
time.sleep(0.2)
except Exception as e:
print(f"⚠️ 东方财富 {code} 拉取失败: {e}", file=sys.stderr)
continue
# Fallback: 腾讯 qt.gtimg.cn15分钟延迟)
missing = [c for c in hk_codes if c not in results]
if missing:
try:
fallback = _fetch_hk_tencent_fallback(missing)
results.update(fallback)
except Exception:
pass
return results
def _fetch_hk_tencent_fallback(codes):
"""腾讯港股行情(15分钟延迟,仅作 fallback)"""
symbols = [f"hk{c}" for c in codes]
url = f"http://qt.gtimg.cn/q={','.join(symbols)}"
req = urllib.request.Request(url, headers={"User-Agent": UA})
with urllib.request.urlopen(req, timeout=10) as r:
text = r.read().decode("gbk")
code_map = {f"hk{c}": c for c in codes}
results = {}
for line in text.strip().split("\n"):
if "=" not in line:
continue
try:
raw = line.split("=", 1)[1].strip().strip('"').strip(";")
fields = raw.split("~")
if len(fields) < 6:
continue
sym = line.split("=", 1)[0].strip().lstrip("v_")
orig = code_map.get(sym)
if not orig:
continue
price = float(fields[3]) if fields[3] else 0
prev_close = float(fields[4]) if fields[4] else 0
change = price - prev_close if prev_close > 0 else 0
change_pct = fields[32] if len(fields) > 32 and fields[32] else "0"
results[orig] = (price, change, change_pct)
except (ValueError, IndexError):
continue
return results
def refresh_data_prices():
"""一次性刷新portfolio.json和watchlist.json的所有实时价"""
all_codes = set()
# 收集所有需要拉取的代码
try:
pf = json.load(open(PORTFOLIO_PATH))
for s in pf.get('holdings', []):
all_codes.add(s['code'])
except:
pf = {"holdings": []}
try:
wl = json.load(open(WATCHLIST_PATH))
for s in wl.get('stocks', []):
all_codes.add(s['code'])
except:
wl = {"stocks": []}
if not all_codes:
return 0
# 分批拉取:A股走腾讯(实时) + 港股走东方财富(实时)
all_list = list(all_codes)
prices = fetch_all_prices(all_list) # A股(腾讯,实时)
hk_prices = fetch_hk_eastmoney(all_list) # 港股(东方财富,实时)
prices.update(hk_prices)
updated = 0
# 保存全量实时价快照(供报告管道消费,确保分析用最新数据)
try:
live = {"updated_at": datetime.now().isoformat(), "prices": {}}
for code in all_codes:
if code in prices:
p, c, chg = prices[code]
live["prices"][code] = {"price": p, "change_pct": chg}
json.dump(live, open("/home/hmo/web-dashboard/data/live_prices.json", "w"), indent=2)
except Exception:
pass
# 更新portfolio(只在价格变化时写入,避免触发文件变更通知)
changed = False
for s in pf.get('holdings', []):
if s['code'] in prices:
price, _, change_pct = prices[s['code']]
if price > 0:
# 港股:API返回HKD,需转RMB
if is_hk_stock(s['code']):
price = round(price * HK_RATE, 2)
old = s.get('price', 0)
if abs(old - price) > 0.001:
s['price'] = round(price, 2)
s['change_pct'] = float(change_pct) if change_pct else 0
updated += 1
changed = True
if changed:
pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
pf['total_mv'] = calc_total_mv(pf.get('holdings', []))
pf['total_assets'] = calc_total_assets(pf)
pf['position_pct'] = calc_position_pct(pf)
# DB 写入(替代 json.dump,强制币种约束)
try:
conn = get_conn()
write_holdings_batch(conn, pf['holdings'])
write_portfolio_summary(conn, pf)
conn.close()
except Exception as e:
print(f" [DB写入失败] {e}", flush=True)
# 保留 JSON 副本作为冷备
json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2)
elif pf.get('updated_at'):
try:
last_ts = datetime.strptime(pf['updated_at'], '%Y-%m-%d %H:%M')
if (datetime.now() - last_ts).total_seconds() > 600:
pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2)
except:
pass
# 更新watchlist(只在价格变化时写入)
changed = False
for s in wl.get('stocks', []):
if s['code'] in prices:
price, _, change_pct = prices[s['code']]
if price > 0:
# 港股:API返回HKD,需转RMB
if is_hk_stock(s['code']):
price = round(price * HK_RATE, 2)
old = s.get('price', 0)
if abs(old - price) > 0.001:
s['price'] = round(price, 2)
s['change_pct'] = float(change_pct) if change_pct else 0
updated += 1
changed = True
if changed:
wl['updated_at'] = datetime.now().isoformat()
# DB 写入(替代 json.dump
try:
conn = get_conn()
for s in wl.get('stocks', []):
s['currency'] = 'CNY' # 自选股价格统一CNY
write_watchlist_stock(conn, s)
conn.close()
except Exception as e:
print(f" [DB watchlist写入失败] {e}", flush=True)
# 保留 JSON 冷备
json.dump(wl, open(WATCHLIST_PATH, 'w'), ensure_ascii=False, indent=2)
# --- 汇总值重算(使用 mo_models 唯一公式)---
try:
live_market_value = calc_total_mv(pf.get('holdings', []))
old_mv = pf.get('total_mv', 0)
if abs(old_mv - live_market_value) > 0.01:
pf['total_mv'] = round(live_market_value, 2)
pf['total_assets'] = calc_total_assets(pf)
if pf['total_assets'] > 0:
pf['position_pct'] = calc_position_pct(pf)
pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
# DB 写入
try:
conn = get_conn()
write_portfolio_summary(conn, pf)
conn.close()
except Exception as e:
print(f" [DB汇总写入失败] {e}", flush=True)
# JSON 冷备
json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2)
except Exception as e:
print(f" [汇总重算失败] {e}", flush=True)
# --- 结束汇总重算 ---
return updated
# ── 分支系统辅助函数 ──────────────────────────────────────────────────────
def _branch_alert_suffix(code, price, shares=0, cost=0):
"""返回分支信息后缀:「 | 情景→动作」"""
if not HAS_TREE or not _SCENARIO_CACHE.get('id'):
return ""
try:
sc_id = _SCENARIO_CACHE['id']
results = evaluate_branches(code, sc_id, price, shares, cost)
for r in results:
if r.get('applicable'):
_record_branch_trigger(code, r.get('branch_id',''), price)
branch_action = r.get('action_type', r.get('action', 'hold'))
return f" | {sc_id}{branch_action}"
except Exception:
pass
return ""
def _record_branch_trigger(code, branch_id, price):
"""记录分支触发事件(自成长:trigger_count+1"""
try:
raw = json.load(open(DECISIONS_PATH))
for d in raw.get('decisions', []):
if d.get('code') == code and d.get('strategy_tree',{}).get('branches'):
for b in d['strategy_tree']['branches']:
if b['id'] == branch_id:
b.setdefault('trigger_count', 0)
b['trigger_count'] += 1
b['last_trigger_price'] = round(price, 2)
b['last_triggered'] = datetime.now().isoformat()
break
json.dump(raw, open(DECISIONS_PATH, 'w'), ensure_ascii=False, indent=2)
except Exception:
pass
# ── 区间偏离检测 ──────────────────────────────────────────────────────────
def load_state():
try:
with open(STATE_PATH) as f:
return json.load(f)
except:
return {}
def save_state(state):
os.makedirs(os.path.dirname(STATE_PATH), exist_ok=True)
with open(STATE_PATH, 'w') as f:
json.dump(state, f, ensure_ascii=False, indent=2)
def load_breaches():
try:
with open(BREACH_PATH) as f:
return json.load(f)
except:
return {}
def save_breaches(data):
os.makedirs(os.path.dirname(BREACH_PATH), exist_ok=True)
with open(BREACH_PATH, 'w') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def load_events():
try:
with open(EVENTS_PATH) as f:
return json.load(f)
except:
return {"events": []}
def save_events(events):
os.makedirs(os.path.dirname(EVENTS_PATH), exist_ok=True)
with open(EVENTS_PATH, 'w') as f:
json.dump(events, f, ensure_ascii=False, indent=2)
def record_event(code, name, event_type, price, trigger_value, event_label=""):
"""记录一次价格触发事件到 price_events.json + SQLite"""
events = load_events()
now = datetime.now().isoformat()
events["events"].append({
"code": code,
"name": name,
"event_type": event_type, # entry_zone, stop_loss, take_profit, exit_zone
"price": round(price, 2),
"trigger_value": trigger_value,
"event_label": event_label,
"timestamp": now,
"date": datetime.now().strftime("%Y-%m-%d"),
})
# 保留最近10000条
events["events"] = events["events"][-10000:]
save_events(events)
# ── SQLite 双写 ──
try:
from mofin_db import get_conn, init_all_tables, write_price_event
conn = get_conn()
init_all_tables(conn)
write_price_event(conn, code, name, event_type, price, trigger_value, event_label)
conn.close()
except Exception:
pass # SQLite 写入失败不影响主流程
def get_trigger_zones(d):
"""返回该decision所有可监控的区间列表,从顶层字段读取"""
zones = []
is_holding = d.get('shares', 0) > 0
# 买入区间(自选和持仓都监控)
el = d.get("entry_low", 0)
eh = d.get("entry_high", 0)
if el and eh and float(el) > 0 and float(eh) > 0:
try:
zones.append(("entry_zone", "买入区间", float(el), float(eh)))
except:
pass
# 止损+止盈(只有持仓才监控,自选无意义)
if is_holding:
sl = d.get("stop_loss", 0)
if sl and float(sl) > 0:
try:
zones.append(("stop_loss", "止损", 0, float(sl)))
except:
pass
tp = d.get("take_profit", 0)
if tp and float(tp) > 0:
try:
zones.append(("take_profit_zone", "止盈区间", 0, float(tp)))
except:
pass
return zones
def run_once(round_label=""):
"""执行一轮完整的监控流程"""
global _SCENARIO_CACHE, _BRANCH_CACHE
label = f" [{round_label}]" if round_label else ""
start = time.time()
# 刷新情景与分支缓存(每轮更新)
_SCENARIO_CACHE = detect_scenario() if HAS_TREE else {}
_BRANCH_CACHE = {}
try:
raw = json.load(open(DECISIONS_PATH))
for d in raw.get('decisions', []):
tree = d.get('strategy_tree', {})
if tree and tree.get('branches'):
_BRANCH_CACHE[d['code']] = tree['branches']
except Exception:
pass
# === 第一步:一次性刷新所有价格 ===
refreshed = refresh_data_prices()
# === 第二步:检查触发条件 ===
try:
with open(DECISIONS_PATH) as f:
dec = json.load(f)
except:
print(f"{label} 无法读取decisions.json", file=sys.stderr)
return
active = [d for d in dec.get("decisions", []) if d.get("status") in ("active", "updated")]
state = load_state()
outputs = []
state_updated = False
# 收集所有需要检查的代码
check_codes = set()
for d in active:
if get_trigger_zones(d):
check_codes.add(d["code"])
# 批量拉取这些股票的价格
prices = fetch_all_prices(list(check_codes))
for d in active:
code = d["code"]
zones = get_trigger_zones(d)
if not zones:
continue
price_info = prices.get(code)
if not price_info:
continue
price, _, _ = price_info
if price == 0:
continue
name = d.get("name", code)
if code not in state:
state[code] = {}
for key, label, lo, hi in zones:
in_zone = lo <= price <= hi
prev_in_zone = state[code].get(key, None)
if in_zone and prev_in_zone != True:
if key == "stop_loss":
branch_sfx = _branch_alert_suffix(code, price, d.get('shares',0), d.get('cost',0))
outputs.append(f"⚠️ {name}({code}) {price} → 跌破止损{hi}{branch_sfx}")
record_event(code, name, "stop_loss", price, str(hi))
else:
extra = ""
if "_price" in key:
batch_shares = d.get(key.replace("_price", "_shares"), "")
action = d.get(key.replace("_price", "_action"), "")
if batch_shares:
extra = f" {action}{batch_shares}" if action else f" {batch_shares}"
elif key in ("take_profit_zone",):
act = d.get("take_profit_action", "")
if act:
extra = f"{act}"
branch_sfx = _branch_alert_suffix(code, price, d.get('shares',0), d.get('cost',0))
outputs.append(f"{name}({code}) {price} → 进入{label}{lo}~{hi}{extra}{branch_sfx}")
record_event(code, name, "entry_zone", price, f"{lo}~{hi}", label)
state[code][key] = True
state_updated = True
elif not in_zone and prev_in_zone == True:
if key != "stop_loss":
outputs.append(f"📌 {name}({code}) {price} → 离开{label}{lo}~{hi}")
state[code][key] = False
state_updated = True
# === 第三步:买入区偏离检测 + 自动重评 ===
reassesed_codes = []
for d in active:
code = d["code"]
name = d.get("name", code)
price_info = prices.get(code)
if not price_info:
continue
price, _, _ = price_info
if price == 0:
continue
# 从 decisions.json 中读取 analysis 的买入区
entry_low = d.get("entry_low", 0)
entry_high = d.get("entry_high", 0)
if not entry_low or not entry_high:
continue
in_buy_zone = entry_low <= price <= entry_high
prev_in_buy_zone = state.get(code, {}).get("__buy_zone", None)
# 状态变化时才触发
if in_buy_zone and prev_in_buy_zone == False:
# 重新进入买入区 → 重评确认区间是否仍然有效
outputs.append(f"🔄 {name}({code}) {price} → 重新进入买入区{entry_low}~{entry_high},触发技术面重评")
do_reassess = True
elif not in_buy_zone and prev_in_buy_zone == True:
# 离开买入区 → 立即重评,更新止损/止盈/区间
outputs.append(f"🔄 {name}({code}) {price} → 离开买入区{entry_low}~{entry_high},立即技术面重评")
do_reassess = True
else:
do_reassess = False
if do_reassess and HAS_REASSESS:
try:
cost = d.get("cost", 0) or 0
shares = d.get("shares", 0) or 0
profit_pct = (price - cost) / cost * 100 if cost else 0
is_deep_loss = profit_pct < -20
sentiment = "neutral"
if d.get("tech_snapshot"):
if "bearish" in d["tech_snapshot"]:
sentiment = "bearish"
elif "bullish" in d["tech_snapshot"]:
sentiment = "bullish"
# 调用技术面驱动重评(非机械百分比)
result = reassess_strategy(
code, name, price, cost, shares,
current_action=d.get("action", ""),
volume_signal="中性", sentiment=sentiment,
)
outputs.append(f" 📊 新策略: 损{result['stop_loss']}{result['take_profit']}{result['entry_low']}~{result['entry_high']} RR={result['rr_ratio']}")
reassesed_codes.append(code)
except Exception as e:
outputs.append(f" ⚠️ 重评失败: {e}")
# 更新买入区状态
if "__buy_zone" not in state.get(code, {}):
if code not in state:
state[code] = {}
state[code]["__buy_zone"] = in_buy_zone
state_updated = True
# 如果有重评过的股票,更新 decisions.json
if reassesed_codes and HAS_REASSESS:
try:
# 重新 regenerate_all 只针对受影响的股票效率太低
# 直接全量重评(regenerate_all 内部会批量拉价格、做技术分析)
from strategy_lifecycle import regenerate_all
r = regenerate_all(stdout=False)
outputs.append(f" ✅ 策略已全量重评: {r.get('ok',0)}/{r.get('total',0)}成功")
outputs.append(f" 📌 触发股票: {', '.join(reassesed_codes)}")
except Exception as e:
outputs.append(f" ⚠️ 全量重评失败: {e}")
# === 3.5 资金流异常检测(2026-06-27 新增)===
try:
cf = json.load(open("/home/hmo/web-dashboard/data/capital_flow_cache.json"))
# 检查所有 active decision 中的资金流异常
for d in active:
code = d["code"]
stock_cf = cf.get("stocks", {}).get(code, {})
analysis = stock_cf.get("analysis", {})
alerts = analysis.get("alerts", [])
if alerts:
name = d.get("name", code)
for a in alerts:
outputs.append(f" 💰 {name}({code}) {a}")
except Exception:
pass
# === 第四步:情景变化检测 + 输出 → 直接推XMPP ===
now_str = datetime.now().strftime("%H:%M:%S")
elapsed = time.time() - start
# 情景变化检测(跨轮对比)
if HAS_TREE and _SCENARIO_CACHE.get('id'):
prev_scenario = state.get('_system', {}).get('last_scenario', '')
curr_scenario = _SCENARIO_CACHE['id']
if prev_scenario and curr_scenario != prev_scenario:
combo = _SCENARIO_CACHE.get('combo_action', '')
outputs.insert(0, f"🌀 情景切换: {prev_scenario}{curr_scenario} | {combo}")
if outputs:
state.setdefault('_system', {})['last_scenario'] = curr_scenario
state_updated = True
elif not prev_scenario:
state.setdefault('_system', {})['last_scenario'] = curr_scenario
state_updated = True
if outputs:
# 简短一行一个触发
for o in outputs:
print(o)
# 推送XMPP(只推关键事件:止损跌破+情景切换+资金流异动,不推买入区进出/重评等操作细节)
critical = [o for o in outputs if o.startswith(("⚠️", "🌀", "💰"))]
if critical:
try:
body = "\n".join([f"{now_str}"] + critical)
payload = json.dumps({
"to": "hmo@yoin.fun", "body": body, "type": "chat",
}).encode("utf-8")
req = urllib.request.Request(
"http://127.0.0.1:5805/", data=payload,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
# else: SILENT — 无触发,无输出,不推
if state_updated:
save_state(state)
def main():
"""每cron触发跑一轮"""
run_once()
if __name__ == "__main__":
main()