Files
MoFin/price_monitor.py
T
知微 9709c43ccb 总资产权威数据源统一修复
问题:总资产每次报告重新计算,数字不一致。
根因:cash字段错误(92664→73759),stale_push_wlin二次×0.866,
      报告各算各的。

修复:
1. portfolio.json cash 修正为Dad截图确认值73,758.85
2. price_monitor 每轮写入 total_mv + total_assets 到portfolio.json
   (从此所有报告只读这个字段,不自算)
3. stale_push_wlin 删除重复的 hmv *= 0.866(数据已CNY)
4. portfolio.json 加 currency: CNY 标记防混淆
5. 日志记录本次修复
2026-06-29 21:39:06 +08:00

609 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""price_monitor.py — 高频价格监控脚本(批量版)
规则:进入区间报一次,离开区间报一次,中间不重复。
每次运行时一次性刷新所有持仓+自选股的实时价。
"""
import json
import urllib.request
import os
import sys
import time
from datetime import datetime
# JSON data files shared with the web dashboard, plus this monitor's own
# state files under ~/.hermes.
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json"
BREACH_PATH = "/home/hmo/.hermes/zone_breach.json"
STATE_PATH = os.path.expanduser("~/.hermes/price_trigger_state.json")
EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json"
# Strategy re-assessment dependency (driven by technicals, not by mechanical
# percentages). HAS_REASSESS records whether the import succeeded.
sys.path.insert(0, "/home/hmo/web-dashboard")
try:
    from strategy_lifecycle import reassess_strategy
    HAS_REASSESS = True
except ImportError:
    HAS_REASSESS = False
# HKD -> CNY rate used to convert Hong Kong quotes; a fixed value is used when
# the hk_rate helper cannot be imported or raises.
try:
    from hk_rate import hkd_to_cny
    HK_RATE = hkd_to_cny()
except Exception:
    HK_RATE = 0.8700 # fallback
# Branch system and scenario detection. When strategy_tree is unavailable,
# no-op stand-ins are defined so call sites do not need to check first.
try:
    sys.path.insert(0, '/home/hmo/MoFin')
    from strategy_tree import detect_scenario, evaluate_branches
    HAS_TREE = True
except Exception:
    HAS_TREE = False
    def detect_scenario(): return {}
    def evaluate_branches(*a, **kw): return []
# Scenario cache (refreshed on every run_once call).
_SCENARIO_CACHE = {}
_BRANCH_CACHE = {} # code -> branches list
# User-Agent header sent with quote requests.
UA = "Mozilla/5.0"
# ── 批量拉取价格 ──────────────────────────────────────────────────────────
def fetch_all_prices(codes):
    """Fetch real-time quotes for many symbols with one batch request.

    Six-character codes are queried as A-shares (``sh`` prefix when the code
    starts with 5/6/9, ``sz`` otherwise); every other code is queried with
    the ``hk`` prefix.

    Args:
        codes: Collection of stock codes, e.g. ``"600110"`` or ``"00700"``.
            Each code is converted with ``str()`` and stripped.

    Returns:
        Dict mapping the stripped code string to
        ``(price, change, change_pct)``. ``price`` and ``change`` are
        numbers (``change`` is price minus previous close, or 0 when the
        previous close is not positive); ``change_pct`` is the raw string in
        field 32 of the response, or ``"0"`` when absent. Returns ``{}`` for
        empty input or when the request fails.
    """
    if not codes:
        return {}

    # symbol sent to the API -> code as the caller knows it
    symbol_to_code = {}
    for code in codes:
        code_s = str(code).strip()
        if len(code_s) == 6:
            market = "sh" if code_s.startswith(('5', '6', '9')) else "sz"
        else:
            market = "hk"
        symbol_to_code[f"{market}{code_s}"] = code_s

    url = f"http://qt.gtimg.cn/q={','.join(symbol_to_code)}"
    try:
        req = urllib.request.Request(url, headers={"User-Agent": UA})
        with urllib.request.urlopen(req, timeout=10) as resp:
            # errors="replace": one undecodable byte (e.g. in a stock name)
            # must not discard the numeric fields of the whole batch.
            text = resp.read().decode("gbk", errors="replace")
    except Exception as e:
        print(f"⚠️ 批量拉取失败: {e}", file=sys.stderr)
        return {}

    results = {}
    for line in text.strip().split("\n"):
        # Line format: v_sh600110="1~name~600110~11.84~11.90~...";
        head, sep, tail = line.strip().partition("=")
        if not sep:
            continue
        sym = head.strip()
        if sym.startswith("v_"):
            # Remove the prefix itself; lstrip("v_") would strip a char set.
            sym = sym[2:]
        orig_code = symbol_to_code.get(sym)
        if not orig_code:
            continue
        # Drop the trailing ';' first so the closing quote is removed too.
        fields = tail.strip().rstrip(";").strip('"').split("~")
        if len(fields) < 6:
            continue
        try:
            price = float(fields[3]) if fields[3] else 0
            prev_close = float(fields[4]) if fields[4] else 0
        except ValueError:
            continue
        change = price - prev_close if prev_close > 0 else 0
        change_pct = fields[32] if len(fields) > 32 and fields[32] else "0"
        results[orig_code] = (price, change, change_pct)
    return results
def _read_json_file(path):
    """Return the parsed JSON document at *path*, closing the file."""
    with open(path) as fh:
        return json.load(fh)


def _write_json_file(path, payload, ensure_ascii=False):
    """Write *payload* to *path* as indented JSON, closing the file."""
    with open(path, 'w') as fh:
        json.dump(payload, fh, ensure_ascii=ensure_ascii, indent=2)


def _apply_quotes(entries, prices):
    """Copy fresh quotes into *entries* in place; return how many changed."""
    changed_count = 0
    for entry in entries:
        code = entry['code']
        # fetch_all_prices keys its result by the stripped string form.
        quote = prices.get(code) or prices.get(str(code).strip())
        if not quote:
            continue
        price, _, change_pct = quote
        if not price > 0:
            continue
        code_s = str(code)
        # Hong Kong quotes arrive in HKD and are stored in CNY.
        if len(code_s) == 5 and code_s.startswith(('0', '1')):
            price = round(price * HK_RATE, 2)
        previous = entry.get('price', 0) or 0
        if abs(previous - price) > 0.001:
            entry['price'] = round(price, 2)
            try:
                entry['change_pct'] = float(change_pct) if change_pct else 0
            except ValueError:
                entry['change_pct'] = 0
            changed_count += 1
    return changed_count


def _market_value(holdings):
    """Return the sum of shares * price over *holdings* (missing values = 0)."""
    return sum(
        (h.get('shares') or 0) * (h.get('price') or 0) for h in holdings
    )


def refresh_data_prices():
    """Refresh live prices in portfolio.json and watchlist.json in one pass.

    Collects every code from both files, fetches all quotes with a single
    ``fetch_all_prices`` call, writes a snapshot to ``live_prices.json``,
    updates ``price``/``change_pct`` on entries whose price moved by more
    than 0.001, and recomputes the portfolio summary fields.

    Files are rewritten only when something changed, except for a heartbeat
    that refreshes the portfolio's ``updated_at`` after 10 minutes without a
    price change.

    Returns:
        Number of portfolio + watchlist entries whose price was updated
        (0 when there are no codes to fetch).
    """
    all_codes = set()

    portfolio_loaded = True
    try:
        pf = _read_json_file(PORTFOLIO_PATH)
        for s in pf.get('holdings', []):
            all_codes.add(s['code'])
    except Exception as e:
        print(f"⚠️ 无法读取portfolio.json: {e}", file=sys.stderr)
        pf = {"holdings": []}
        # Remember the failure: the placeholder must never be written back,
        # otherwise a transient read error would wipe the real holdings.
        portfolio_loaded = False

    try:
        wl = _read_json_file(WATCHLIST_PATH)
        for s in wl.get('stocks', []):
            all_codes.add(s['code'])
    except Exception:
        wl = {"stocks": []}

    if not all_codes:
        return 0

    # One batch request for everything.
    prices = fetch_all_prices(list(all_codes))
    updated = 0

    # Save a full snapshot of live prices for the report pipeline.
    try:
        live = {"updated_at": datetime.now().isoformat(), "prices": {}}
        for code in all_codes:
            if code in prices:
                p, _, chg = prices[code]
                live["prices"][code] = {"price": p, "change_pct": chg}
        _write_json_file(
            "/home/hmo/web-dashboard/data/live_prices.json", live,
            ensure_ascii=True,
        )
    except Exception as e:
        # Best effort: the snapshot is a convenience copy.
        print(f"⚠️ live_prices.json 写入失败: {e}", file=sys.stderr)

    # Portfolio: write only when a price changed, to avoid spurious
    # file-change notifications.
    pf_changed = _apply_quotes(pf.get('holdings', []), prices)
    updated += pf_changed
    if pf_changed:
        pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
        # Total assets = holdings market value + cash (Hong Kong prices
        # were already converted to CNY above).
        pf['total_mv'] = round(_market_value(pf.get('holdings', [])), 2)
        pf['total_assets'] = round(pf['total_mv'] + (pf.get('cash') or 0), 2)
        _write_json_file(PORTFOLIO_PATH, pf)
    elif pf.get('updated_at'):
        # Heartbeat: refresh updated_at every 10 minutes even without a
        # price change so the health check does not raise a false alarm.
        try:
            last_ts = datetime.strptime(pf['updated_at'], '%Y-%m-%d %H:%M')
            if (datetime.now() - last_ts).total_seconds() > 600:
                pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
                _write_json_file(PORTFOLIO_PATH, pf)
        except (ValueError, TypeError, OSError):
            pass

    # Watchlist: same rule, written only when a price changed.
    wl_changed = _apply_quotes(wl.get('stocks', []), prices)
    updated += wl_changed
    if wl_changed:
        wl['updated_at'] = datetime.now().isoformat()
        _write_json_file(WATCHLIST_PATH, wl)

    # Summary recompute: total_market_value / cash / total_assets /
    # position_pct must follow the refreshed prices, otherwise reports read
    # stale totals.
    # NOTE(review): this section takes cash from
    # strategy_staleness_report.json and maintains 'total_market_value',
    # while the branch above uses pf['cash'] and 'total_mv' — confirm which
    # source/field is meant to be authoritative.
    if portfolio_loaded:
        try:
            summary_keys = (
                'total_market_value', 'cash', 'total_assets', 'position_pct',
            )
            before = {k: pf.get(k) for k in summary_keys}

            live_market_value = _market_value(pf.get('holdings', []))
            fallback_cash = pf.get('cash', 0) or 0
            try:
                sr = _read_json_file(
                    '/home/hmo/web-dashboard/data/strategy_staleness_report.json'
                )
                stale_cash = sr.get('portfolio', {}).get('cash', fallback_cash)
            except Exception:
                stale_cash = fallback_cash
            if stale_cash is None:
                stale_cash = fallback_cash

            old_mv = pf.get('total_market_value', 0) or 0
            if abs(old_mv - live_market_value) > 0.01:
                pf['total_market_value'] = round(live_market_value, 2)
            if abs(fallback_cash - stale_cash) > 0.01:
                pf['cash'] = stale_cash
            pf['total_assets'] = round(live_market_value + stale_cash, 2)
            if pf['total_assets'] > 0:
                pf['position_pct'] = round(
                    live_market_value / pf['total_assets'] * 100, 2
                )

            # Rewrite only when a summary field actually changed.
            if {k: pf.get(k) for k in summary_keys} != before:
                pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
                _write_json_file(PORTFOLIO_PATH, pf)
        except Exception as e:
            print(f" [汇总重算失败] {e}", flush=True)

    return updated
# ── 分支系统辅助函数 ──────────────────────────────────────────────────────
def _branch_alert_suffix(code, price, shares=0, cost=0):
    """Build the branch-info suffix appended to an alert line.

    Evaluates the strategy branches of *code* under the cached scenario and
    uses the first branch flagged ``applicable``: its trigger is recorded
    and its action (``action_type``, else ``action``, else ``'hold'``) is
    returned as ``" | <scenario id><action>"``.

    Returns ``""`` when the branch system is unavailable, no scenario id is
    cached, no branch applies, or evaluation raises.
    """
    scenario_id = _SCENARIO_CACHE.get('id') if HAS_TREE else None
    if not scenario_id:
        return ""
    try:
        candidates = evaluate_branches(code, scenario_id, price, shares, cost)
        hit = next((b for b in candidates if b.get('applicable')), None)
        if hit is None:
            return ""
        _record_branch_trigger(code, hit.get('branch_id', ''), price)
        fallback_action = hit.get('action', 'hold')
        chosen_action = hit.get('action_type', fallback_action)
        return f" | {scenario_id}{chosen_action}"
    except Exception:
        return ""
def _record_branch_trigger(code, branch_id, price):
    """Record that a strategy-tree branch fired, in decisions.json.

    For every decision whose ``code`` matches and that has
    ``strategy_tree.branches``, the first branch whose ``id`` equals
    *branch_id* gets ``trigger_count`` incremented and
    ``last_trigger_price`` / ``last_triggered`` updated.

    The file is rewritten only when a branch was actually updated. This is
    best effort: any failure is reported on stderr and never raised.
    """
    try:
        with open(DECISIONS_PATH) as fh:
            raw = json.load(fh)

        touched = False
        for d in raw.get('decisions', []):
            if d.get('code') != code:
                continue
            branches = (d.get('strategy_tree') or {}).get('branches')
            if not branches:
                continue
            for b in branches:
                if b.get('id') == branch_id:
                    b['trigger_count'] = (b.get('trigger_count') or 0) + 1
                    b['last_trigger_price'] = round(price, 2)
                    b['last_triggered'] = datetime.now().isoformat()
                    touched = True
                    break

        if touched:
            with open(DECISIONS_PATH, 'w') as fh:
                json.dump(raw, fh, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"⚠️ 分支触发记录失败: {e}", file=sys.stderr)
# ── 区间偏离检测 ──────────────────────────────────────────────────────────
def load_state():
    """Load the persisted trigger state from STATE_PATH.

    Returns:
        The stored dict, or ``{}`` when the file is missing, unreadable,
        not valid JSON, or does not hold a JSON object.
    """
    try:
        with open(STATE_PATH) as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and decoding errors.
        return {}
    return data if isinstance(data, dict) else {}
def save_state(state):
    """Write *state* to STATE_PATH as indented JSON.

    The parent directory is created first when it does not exist.
    """
    parent_dir = os.path.dirname(STATE_PATH)
    os.makedirs(parent_dir, exist_ok=True)
    with open(STATE_PATH, 'w') as out:
        json.dump(state, out, ensure_ascii=False, indent=2)
def load_breaches():
    """Load the zone-breach records from BREACH_PATH.

    Returns:
        The stored dict, or ``{}`` when the file is missing, unreadable,
        not valid JSON, or does not hold a JSON object.
    """
    try:
        with open(BREACH_PATH) as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and decoding errors.
        return {}
    return data if isinstance(data, dict) else {}
def save_breaches(data):
    """Write *data* to BREACH_PATH as indented JSON.

    The parent directory is created first when it does not exist.
    """
    parent_dir = os.path.dirname(BREACH_PATH)
    os.makedirs(parent_dir, exist_ok=True)
    with open(BREACH_PATH, 'w') as out:
        json.dump(data, out, ensure_ascii=False, indent=2)
def load_events():
    """Load the price-event log from EVENTS_PATH.

    Returns:
        A dict that always has a list under ``"events"``. A fresh
        ``{"events": []}`` is returned when the file is missing, unreadable,
        not valid JSON, or not a JSON object; a stored object whose
        ``"events"`` entry is missing or not a list gets an empty list there
        so callers can append safely.
    """
    try:
        with open(EVENTS_PATH) as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and decoding errors.
        return {"events": []}
    if not isinstance(data, dict):
        return {"events": []}
    if not isinstance(data.get("events"), list):
        data["events"] = []
    return data
def save_events(events):
    """Write *events* to EVENTS_PATH as indented JSON.

    The parent directory is created first when it does not exist.
    """
    parent_dir = os.path.dirname(EVENTS_PATH)
    os.makedirs(parent_dir, exist_ok=True)
    with open(EVENTS_PATH, 'w') as out:
        json.dump(events, out, ensure_ascii=False, indent=2)
def record_event(code, name, event_type, price, trigger_value, event_label=""):
    """Append one price-trigger event to price_events.json and to SQLite.

    Args:
        code: Stock code.
        name: Display name of the stock.
        event_type: Event kind (entry_zone, stop_loss, take_profit,
            exit_zone).
        price: Price at which the trigger fired; stored rounded to 2 places.
        trigger_value: The level or range that was hit, stored as given.
        event_label: Optional human-readable label.

    The JSON log keeps only the most recent 10000 events. The SQLite write
    goes through ``mofin_db`` and is best effort: a failure there never
    affects the JSON log or the caller.
    """
    # Read the clock once so "timestamp" and "date" cannot disagree when the
    # call straddles midnight.
    now = datetime.now()
    events = load_events()
    log = events.setdefault("events", [])
    log.append({
        "code": code,
        "name": name,
        "event_type": event_type,
        "price": round(price, 2),
        "trigger_value": trigger_value,
        "event_label": event_label,
        "timestamp": now.isoformat(),
        "date": now.strftime("%Y-%m-%d"),
    })
    # Keep the most recent 10000 entries.
    events["events"] = log[-10000:]
    save_events(events)

    # Mirror the event into SQLite.
    conn = None
    try:
        from mofin_db import get_conn, init_all_tables, write_price_event
        conn = get_conn()
        init_all_tables(conn)
        write_price_event(conn, code, name, event_type, price, trigger_value, event_label)
    except ImportError:
        pass  # SQLite mirror not installed; the JSON log is enough.
    except Exception as e:
        print(f"⚠️ SQLite 写入失败: {e}", file=sys.stderr)
    finally:
        # Close the connection even when a write raised.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def _positive_float(value):
    """Return *value* as a float when it is numeric and > 0, else None."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if number > 0 else None


def get_trigger_zones(d):
    """Return the price zones to monitor for one decision record.

    Reads the top-level fields ``entry_low``/``entry_high``, ``stop_loss``,
    ``take_profit`` and ``shares`` of *d*. Missing, non-numeric or
    non-positive values simply produce no zone.

    Returns:
        List of ``(key, label, low, high)`` tuples; a price is inside a zone
        when ``low <= price <= high``:

        * ``entry_zone``: ``[entry_low, entry_high]`` — always monitored.
        * ``stop_loss``: ``[0, stop_loss]`` — holdings only (``shares > 0``).
        * ``take_profit_zone``: ``[take_profit, inf]`` — holdings only.
    """
    zones = []

    # Entry zone: monitored for both watchlist entries and holdings.
    entry_low = _positive_float(d.get("entry_low", 0))
    entry_high = _positive_float(d.get("entry_high", 0))
    if entry_low is not None and entry_high is not None:
        zones.append(("entry_zone", "买入区间", entry_low, entry_high))

    # Stop-loss / take-profit only make sense for an actual position.
    if _positive_float(d.get('shares', 0)) is not None:
        stop_loss = _positive_float(d.get("stop_loss", 0))
        if stop_loss is not None:
            zones.append(("stop_loss", "止损", 0, stop_loss))
        take_profit = _positive_float(d.get("take_profit", 0))
        if take_profit is not None:
            # The zone lies at or ABOVE the target. A (0, take_profit) range
            # would count every price below the target as "in the zone".
            zones.append(
                ("take_profit_zone", "止盈区间", take_profit, float("inf"))
            )
    return zones
def _to_number(value):
    """Return *value* as a float, or None when it is not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def run_once(round_label=""):
    """Run one complete monitoring round.

    Steps:
      1. Refresh the scenario/branch caches and all stored prices.
      2. For every active decision, report entering/leaving each trigger
         zone once per transition (state is persisted between runs) and
         record an event for each entry.
      3. When price crosses the buy zone boundary, trigger a technical
         re-assessment; if any stock was re-assessed, regenerate all
         strategies.
      3.5. Collect capital-flow alerts from the cache file.
      4. Detect a scenario switch, print every output line and push the
         critical ones over XMPP.

    Args:
        round_label: Optional tag used in the error message printed when
            decisions.json cannot be read.

    Returns:
        None.
    """
    global _SCENARIO_CACHE, _BRANCH_CACHE
    tag = f" [{round_label}]" if round_label else ""

    # Refresh scenario and branch caches (every round).
    _SCENARIO_CACHE = (detect_scenario() or {}) if HAS_TREE else {}
    _BRANCH_CACHE = {}
    try:
        with open(DECISIONS_PATH) as fh:
            raw = json.load(fh)
        for d in raw.get('decisions', []):
            tree = d.get('strategy_tree', {})
            if tree and tree.get('branches'):
                _BRANCH_CACHE[d['code']] = tree['branches']
    except Exception:
        pass  # The branch cache is optional.

    # === Step 1: refresh all stored prices in one pass ===
    refresh_data_prices()

    # === Step 2: check trigger conditions ===
    try:
        with open(DECISIONS_PATH) as f:
            dec = json.load(f)
    except (OSError, ValueError):
        print(f"{tag} 无法读取decisions.json", file=sys.stderr)
        return

    active = [
        d for d in dec.get("decisions", [])
        if d.get("status") in ("active", "updated")
    ]
    state = load_state()
    outputs = []
    state_updated = False

    # Compute each decision's zones once; fetch only codes that have zones.
    monitored = [(d, get_trigger_zones(d)) for d in active]
    check_codes = {d["code"] for d, zones in monitored if zones}
    prices = fetch_all_prices(list(check_codes))

    for d, zones in monitored:
        if not zones:
            continue
        code = d["code"]
        price_info = prices.get(code)
        if not price_info:
            continue
        price = price_info[0]
        if price == 0:
            continue
        name = d.get("name", code)
        code_state = state.setdefault(code, {})

        for key, zone_label, lo, hi in zones:
            in_zone = lo <= price <= hi
            was_in_zone = code_state.get(key, None)

            if in_zone and was_in_zone is not True:
                branch_sfx = _branch_alert_suffix(
                    code, price, d.get('shares', 0), d.get('cost', 0)
                )
                if key == "stop_loss":
                    outputs.append(f"⚠️ {name}({code}) {price} → 跌破止损{hi}{branch_sfx}")
                    record_event(code, name, "stop_loss", price, str(hi))
                else:
                    extra = ""
                    if "_price" in key:
                        batch_shares = d.get(key.replace("_price", "_shares"), "")
                        action = d.get(key.replace("_price", "_action"), "")
                        if batch_shares:
                            extra = f" {action}{batch_shares}" if action else f" {batch_shares}"
                    elif key == "take_profit_zone":
                        act = d.get("take_profit_action", "")
                        if act:
                            extra = f"{act}"
                    outputs.append(f"{name}({code}) {price} → 进入{zone_label}{lo}~{hi}{extra}{branch_sfx}")
                    # Label take-profit entries as such instead of logging
                    # every non-stop-loss zone as "entry_zone".
                    event_type = "take_profit" if key == "take_profit_zone" else "entry_zone"
                    record_event(code, name, event_type, price, f"{lo}~{hi}", zone_label)
                code_state[key] = True
                state_updated = True
            elif not in_zone and was_in_zone is True:
                if key != "stop_loss":
                    outputs.append(f"📌 {name}({code}) {price} → 离开{zone_label}{lo}~{hi}")
                code_state[key] = False
                state_updated = True

    # === Step 3: buy-zone crossing detection + automatic re-assessment ===
    reassessed_codes = []
    for d in active:
        code = d["code"]
        name = d.get("name", code)
        price_info = prices.get(code)
        if not price_info:
            continue
        price = price_info[0]
        if price == 0:
            continue

        entry_low = d.get("entry_low", 0)
        entry_high = d.get("entry_high", 0)
        if not entry_low or not entry_high:
            continue
        # The bounds may be stored as strings; compare as numbers.
        low_value = _to_number(entry_low)
        high_value = _to_number(entry_high)
        if low_value is None or high_value is None:
            continue

        in_buy_zone = low_value <= price <= high_value
        prev_in_buy_zone = state.get(code, {}).get("__buy_zone", None)

        # Only a state transition triggers a re-assessment.
        if in_buy_zone and prev_in_buy_zone is False:
            outputs.append(f"🔄 {name}({code}) {price} → 重新进入买入区{entry_low}~{entry_high},触发技术面重评")
            do_reassess = True
        elif not in_buy_zone and prev_in_buy_zone is True:
            outputs.append(f"🔄 {name}({code}) {price} → 离开买入区{entry_low}~{entry_high},立即技术面重评")
            do_reassess = True
        else:
            do_reassess = False

        if do_reassess and HAS_REASSESS:
            try:
                cost = d.get("cost", 0) or 0
                shares = d.get("shares", 0) or 0
                sentiment = "neutral"
                snapshot = d.get("tech_snapshot")
                if snapshot:
                    if "bearish" in snapshot:
                        sentiment = "bearish"
                    elif "bullish" in snapshot:
                        sentiment = "bullish"
                result = reassess_strategy(
                    code, name, price, cost, shares,
                    current_action=d.get("action", ""),
                    volume_signal="中性", sentiment=sentiment,
                )
                outputs.append(f" 📊 新策略: 损{result['stop_loss']}{result['take_profit']}{result['entry_low']}~{result['entry_high']} RR={result['rr_ratio']}")
                reassessed_codes.append(code)
            except Exception as e:
                outputs.append(f" ⚠️ 重评失败: {e}")

        # Persist the buy-zone flag whenever it differs from the stored one,
        # so each crossing is reported once rather than on every run.
        if prev_in_buy_zone != in_buy_zone:
            state.setdefault(code, {})["__buy_zone"] = in_buy_zone
            state_updated = True

    # A re-assessment of any stock triggers a full regeneration.
    if reassessed_codes and HAS_REASSESS:
        try:
            from strategy_lifecycle import regenerate_all
            r = regenerate_all(stdout=False)
            outputs.append(f" ✅ 策略已全量重评: {r.get('ok',0)}/{r.get('total',0)}成功")
            outputs.append(f" 📌 触发股票: {', '.join(map(str, reassessed_codes))}")
        except Exception as e:
            outputs.append(f" ⚠️ 全量重评失败: {e}")

    # === Step 3.5: capital-flow anomaly detection ===
    try:
        with open("/home/hmo/web-dashboard/data/capital_flow_cache.json") as fh:
            cf = json.load(fh)
        for d in active:
            code = d["code"]
            stock_cf = cf.get("stocks", {}).get(code, {})
            alerts = stock_cf.get("analysis", {}).get("alerts", [])
            if alerts:
                name = d.get("name", code)
                for a in alerts:
                    outputs.append(f" 💰 {name}({code}) {a}")
    except Exception:
        pass  # The cache is optional.

    # === Step 4: scenario-switch detection + output + XMPP push ===
    now_str = datetime.now().strftime("%H:%M:%S")

    if HAS_TREE and _SCENARIO_CACHE.get('id'):
        prev_scenario = state.get('_system', {}).get('last_scenario', '')
        curr_scenario = _SCENARIO_CACHE['id']
        if prev_scenario and curr_scenario != prev_scenario:
            combo = _SCENARIO_CACHE.get('combo_action', '')
            outputs.insert(0, f"🌀 情景切换: {prev_scenario}{curr_scenario} | {combo}")
            state.setdefault('_system', {})['last_scenario'] = curr_scenario
            state_updated = True
        elif not prev_scenario:
            state.setdefault('_system', {})['last_scenario'] = curr_scenario
            state_updated = True

    if outputs:
        # One short line per trigger.
        for o in outputs:
            print(o)
        # Push only lines that start with a critical marker.
        # NOTE(review): capital-flow lines are built with a leading space, so
        # they never match the "💰" prefix and are not pushed. Left as is
        # because they repeat on every run with no de-duplication — confirm
        # the intended behavior before changing the filter.
        critical = [o for o in outputs if o.startswith(("⚠️", "🌀", "💰"))]
        if critical:
            try:
                body = "\n".join([f"{now_str}"] + critical)
                payload = json.dumps({
                    "to": "hmo@yoin.fun", "body": body, "type": "chat",
                }).encode("utf-8")
                req = urllib.request.Request(
                    "http://127.0.0.1:5805/", data=payload,
                    headers={"Content-Type": "application/json"},
                )
                with urllib.request.urlopen(req, timeout=5):
                    pass
            except Exception as e:
                # A lost stop-loss alert should be visible in the logs.
                print(f"⚠️ XMPP推送失败: {e}", file=sys.stderr)
    # Otherwise stay silent: nothing triggered, nothing printed or pushed.

    if state_updated:
        save_state(state)
def main():
    """Run a single monitoring round; meant to be invoked once per cron trigger."""
    run_once()
# Script entry point.
if __name__ == "__main__":
    main()