Files
MoFin/price_monitor.py
T
知微 e33a236bc1 自成长系统:四层循环架构文档 + 三个代码改动 + 所有日间修复
内容:
- docs/SELF_GROWTH_SYSTEM.md (NEW) — 完整的 Sense→Respond→Adapt→Improve 架构文档
- docs/SYSTEM_ARCHITECTURE.md (UPDATED) — 总索引指向新文档,cron数从14更新为31
- hk_rate.py (NEW) — HKD汇率模块,缓存+上次有效汇率自动恢复
- price_monitor.py (MODIFIED) — 价格监控注入分支评估+情景切换检测
- strategy_lifecycle.py (MODIFIED) — 策略生命周期评估上下文
- strategy_tree.py (NEW) — 情景化多分支决策引擎

日间修复(2026-06-23):
- stale_push_wlin: cash硬编码146837→读portfolio.json
- stale_push_wlin: lot_cost汇率0.93→hkd_to_cny动态
- stale_push_wlin: HK每手默认500股→Tencent API实时f[60]
- stale_push_wlin: 重评异步→串行(先重评再出报告)
- hk_rate: FALLBACK=0.87硬编码→缓存上次有效汇率
- 新增 cron: 分支扫描每30分, 分支剪枝周六, 硬编码审计17:25
- hardcode_scanner.py 每日扫描所有.py中大额数字
2026-06-24 00:04:26 +08:00

539 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""price_monitor.py — 高频价格监控脚本(批量版)
规则:进入区间报一次,离开区间报一次,中间不重复。
每次运行时一次性刷新所有持仓+自选股的实时价。
"""
import json
import urllib.request
import os
import sys
import time
from datetime import datetime
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json"
BREACH_PATH = "/home/hmo/.hermes/zone_breach.json"
STATE_PATH = os.path.expanduser("~/.hermes/price_trigger_state.json")
EVENTS_PATH = "/home/hmo/web-dashboard/data/price_events.json"
# 策略重评依赖(技术面驱动,非机械百分比)
sys.path.insert(0, "/home/hmo/web-dashboard")
try:
from strategy_lifecycle import reassess_strategy
HAS_REASSESS = True
except ImportError:
HAS_REASSESS = False
try:
from hk_rate import hkd_to_cny
HK_RATE = hkd_to_cny()
except Exception:
HK_RATE = 0.8700 # fallback
# 分支系统与情景检测
try:
sys.path.insert(0, '/home/hmo/MoFin')
from strategy_tree import detect_scenario, evaluate_branches
HAS_TREE = True
except Exception:
HAS_TREE = False
def detect_scenario(): return {}
def evaluate_branches(*a, **kw): return []
# 情景缓存(每次run_once刷新)
_SCENARIO_CACHE = {}
_BRANCH_CACHE = {} # code -> branches list
UA = "Mozilla/5.0"
# ── 批量拉取价格 ──────────────────────────────────────────────────────────
def fetch_all_prices(codes):
"""腾讯批量行情API:一次请求拉取所有股票(A股+港股)
A股:sh600110 / sz000001
港股:hk00700
返回 {code: (price, change, change_pct)}
"""
if not codes:
return {}
# 构建批量查询串
symbols = []
code_map = {} # symbol -> original_code
for code in codes:
code_s = str(code).strip()
if len(code_s) == 6:
# A股:沪市以5/6/9开头,深市以0/3开头
if code_s.startswith(('5', '6', '9')):
sym = f"sh{code_s}"
else:
sym = f"sz{code_s}"
else:
sym = f"hk{code_s}"
symbols.append(sym)
code_map[sym] = code_s
url = f"http://qt.gtimg.cn/q={','.join(symbols)}"
try:
req = urllib.request.Request(url, headers={"User-Agent": UA})
with urllib.request.urlopen(req, timeout=10) as r:
text = r.read().decode("gbk")
except Exception as e:
print(f"⚠️ 批量拉取失败: {e}", file=sys.stderr)
return {}
results = {}
for line in text.strip().split("\n"):
line = line.strip()
if not line or "=" not in line:
continue
try:
# 格式: v_sh600110="1~诺德股份~600110~11.84~11.90~..."
raw_value = line.split("=", 1)[1].strip().strip('"').strip(";")
fields = raw_value.split("~")
if len(fields) < 6:
continue
sym = line.split("=", 1)[0].strip().lstrip("v_")
orig_code = code_map.get(sym)
if not orig_code:
continue
price = float(fields[3]) if fields[3] else 0
prev_close = float(fields[4]) if fields[4] else 0
change = price - prev_close if prev_close > 0 else 0
change_pct = fields[32] if len(fields) > 32 and fields[32] else "0"
results[orig_code] = (price, change, change_pct)
except (ValueError, IndexError):
continue
return results
def refresh_data_prices():
"""一次性刷新portfolio.json和watchlist.json的所有实时价"""
all_codes = set()
# 收集所有需要拉取的代码
try:
pf = json.load(open(PORTFOLIO_PATH))
for s in pf.get('holdings', []):
all_codes.add(s['code'])
except:
pf = {"holdings": []}
try:
wl = json.load(open(WATCHLIST_PATH))
for s in wl.get('stocks', []):
all_codes.add(s['code'])
except:
wl = {"stocks": []}
if not all_codes:
return 0
# 一次性批量拉取
prices = fetch_all_prices(list(all_codes))
updated = 0
# 更新portfolio(只在价格变化时写入,避免触发文件变更通知)
changed = False
for s in pf.get('holdings', []):
if s['code'] in prices:
price, _, change_pct = prices[s['code']]
if price > 0:
# 港股:API返回HKD,需转RMB2026-06-23 bugfix
if str(s['code']).startswith(('0','1')) and len(str(s['code']))==5:
price = round(price * HK_RATE, 2)
old = s.get('price', 0)
if abs(old - price) > 0.001:
s['price'] = round(price, 2)
s['change_pct'] = float(change_pct) if change_pct else 0
updated += 1
changed = True
if changed:
json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2)
# 更新watchlist(只在价格变化时写入)
changed = False
for s in wl.get('stocks', []):
if s['code'] in prices:
price, _, change_pct = prices[s['code']]
if price > 0:
# 港股:API返回HKD,需转RMB2026-06-23 bugfix
if str(s['code']).startswith(('0','1')) and len(str(s['code']))==5:
price = round(price * HK_RATE, 2)
old = s.get('price', 0)
if abs(old - price) > 0.001:
s['price'] = round(price, 2)
s['change_pct'] = float(change_pct) if change_pct else 0
updated += 1
changed = True
if changed:
wl['updated_at'] = datetime.now().isoformat()
json.dump(wl, open(WATCHLIST_PATH, 'w'), ensure_ascii=False, indent=2)
return updated
# ── 分支系统辅助函数 ──────────────────────────────────────────────────────
def _branch_alert_suffix(code, price, shares=0, cost=0):
"""返回分支信息后缀:「 | 情景→动作」"""
if not HAS_TREE or not _SCENARIO_CACHE.get('id'):
return ""
try:
sc_id = _SCENARIO_CACHE['id']
results = evaluate_branches(code, sc_id, price, shares, cost)
for r in results:
if r.get('applicable'):
_record_branch_trigger(code, r.get('branch_id',''), price)
branch_action = r.get('action_type', r.get('action', 'hold'))
return f" | {sc_id}{branch_action}"
except Exception:
pass
return ""
def _record_branch_trigger(code, branch_id, price):
"""记录分支触发事件(自成长:trigger_count+1"""
try:
raw = json.load(open(DECISIONS_PATH))
for d in raw.get('decisions', []):
if d.get('code') == code and d.get('strategy_tree',{}).get('branches'):
for b in d['strategy_tree']['branches']:
if b['id'] == branch_id:
b.setdefault('trigger_count', 0)
b['trigger_count'] += 1
b['last_trigger_price'] = round(price, 2)
b['last_triggered'] = datetime.now().isoformat()
break
json.dump(raw, open(DECISIONS_PATH, 'w'), ensure_ascii=False, indent=2)
except Exception:
pass
# ── 区间偏离检测 ──────────────────────────────────────────────────────────
def load_state():
try:
with open(STATE_PATH) as f:
return json.load(f)
except:
return {}
def save_state(state):
os.makedirs(os.path.dirname(STATE_PATH), exist_ok=True)
with open(STATE_PATH, 'w') as f:
json.dump(state, f, ensure_ascii=False, indent=2)
def load_breaches():
try:
with open(BREACH_PATH) as f:
return json.load(f)
except:
return {}
def save_breaches(data):
os.makedirs(os.path.dirname(BREACH_PATH), exist_ok=True)
with open(BREACH_PATH, 'w') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def load_events():
try:
with open(EVENTS_PATH) as f:
return json.load(f)
except:
return {"events": []}
def save_events(events):
os.makedirs(os.path.dirname(EVENTS_PATH), exist_ok=True)
with open(EVENTS_PATH, 'w') as f:
json.dump(events, f, ensure_ascii=False, indent=2)
def record_event(code, name, event_type, price, trigger_value, event_label=""):
"""记录一次价格触发事件到 price_events.json + SQLite"""
events = load_events()
now = datetime.now().isoformat()
events["events"].append({
"code": code,
"name": name,
"event_type": event_type, # entry_zone, stop_loss, take_profit, exit_zone
"price": round(price, 2),
"trigger_value": trigger_value,
"event_label": event_label,
"timestamp": now,
"date": datetime.now().strftime("%Y-%m-%d"),
})
# 保留最近10000条
events["events"] = events["events"][-10000:]
save_events(events)
# ── SQLite 双写 ──
try:
from mofin_db import get_conn, init_all_tables, write_price_event
conn = get_conn()
init_all_tables(conn)
write_price_event(conn, code, name, event_type, price, trigger_value, event_label)
conn.close()
except Exception:
pass # SQLite 写入失败不影响主流程
def get_trigger_zones(trigger):
"""返回该trigger所有可监控的区间列表,跳过已执行的batch"""
zones = []
for key, label in [
("entry_zone", "加仓区间"),
("batch1_price", "试仓区间"),
("batch2_price", "加仓区间"),
("take_profit_zone", "止盈区间"),
("watch_low", "关注区间"),
("watch_high", "减仓区间"),
("watch_break", "止损区间")
]:
status_key = key.replace("_price", "_status")
if status_key in trigger and trigger[status_key] == "executed":
continue
val = trigger.get(key, "")
if val and "~" in val:
try:
parts = val.split("~")
lo, hi = float(parts[0]), float(parts[1])
zones.append((key, label, lo, hi))
except:
pass
sl = trigger.get("stop_loss", "")
if sl:
try:
sl_price = float(sl) if isinstance(sl, (int, float)) else float(sl)
zones.append(("stop_loss", "止损", 0, sl_price))
except:
pass
return zones
def run_once(round_label=""):
"""执行一轮完整的监控流程"""
global _SCENARIO_CACHE, _BRANCH_CACHE
label = f" [{round_label}]" if round_label else ""
start = time.time()
# 刷新情景与分支缓存(每轮更新)
_SCENARIO_CACHE = detect_scenario() if HAS_TREE else {}
_BRANCH_CACHE = {}
try:
raw = json.load(open(DECISIONS_PATH))
for d in raw.get('decisions', []):
tree = d.get('strategy_tree', {})
if tree and tree.get('branches'):
_BRANCH_CACHE[d['code']] = tree['branches']
except Exception:
pass
# === 第一步:一次性刷新所有价格 ===
refreshed = refresh_data_prices()
# === 第二步:检查触发条件 ===
try:
with open(DECISIONS_PATH) as f:
dec = json.load(f)
except:
print(f"{label} 无法读取decisions.json", file=sys.stderr)
return
active = [d for d in dec.get("decisions", []) if d.get("status") == "active"]
state = load_state()
outputs = []
state_updated = False
# 收集所有需要检查的代码
check_codes = set()
for d in active:
trig = d.get("trigger", {})
if trig:
check_codes.add(d["code"])
# 批量拉取这些股票的价格
prices = fetch_all_prices(list(check_codes))
for d in active:
code = d["code"]
trig = d.get("trigger", {})
if not trig:
continue
zones = get_trigger_zones(trig)
if not zones:
continue
price_info = prices.get(code)
if not price_info:
continue
price, _ = price_info
if price == 0:
continue
name = d.get("name", code)
if code not in state:
state[code] = {}
for key, label, lo, hi in zones:
in_zone = lo <= price <= hi
prev_in_zone = state[code].get(key, None)
if in_zone and prev_in_zone != True:
if key == "stop_loss":
branch_sfx = _branch_alert_suffix(code, price, d.get('shares',0), d.get('cost',0))
outputs.append(f"⚠️ {name}({code}) {price} → 跌破止损{hi}{branch_sfx}")
record_event(code, name, "stop_loss", price, str(hi))
else:
extra = ""
if "_price" in key:
batch_shares = trig.get(key.replace("_price", "_shares"), "")
action = trig.get(key.replace("_price", "_action"), "")
if batch_shares:
extra = f" {action}{batch_shares}" if action else f" {batch_shares}"
elif key in ("take_profit_zone",):
act = trig.get("take_profit_action", "")
if act:
extra = f"{act}"
branch_sfx = _branch_alert_suffix(code, price, d.get('shares',0), d.get('cost',0))
outputs.append(f"{name}({code}) {price} → 进入{label}{lo}~{hi}{extra}{branch_sfx}")
record_event(code, name, "entry_zone", price, f"{lo}~{hi}", label)
state[code][key] = True
state_updated = True
elif not in_zone and prev_in_zone == True:
if key != "stop_loss":
outputs.append(f"📌 {name}({code}) {price} → 离开{label}{lo}~{hi}")
state[code][key] = False
state_updated = True
# === 第三步:买入区偏离检测 + 自动重评 ===
reassesed_codes = []
for d in active:
code = d["code"]
name = d.get("name", code)
price_info = prices.get(code)
if not price_info:
continue
price, _ = price_info
if price == 0:
continue
# 从 decisions.json 中读取 analysis 的买入区
entry_low = d.get("entry_low", 0)
entry_high = d.get("entry_high", 0)
if not entry_low or not entry_high:
continue
in_buy_zone = entry_low <= price <= entry_high
prev_in_buy_zone = state.get(code, {}).get("__buy_zone", None)
# 状态变化时才触发
if in_buy_zone and prev_in_buy_zone == False:
# 重新进入买入区 → 重评确认区间是否仍然有效
outputs.append(f"🔄 {name}({code}) {price} → 重新进入买入区{entry_low}~{entry_high},触发技术面重评")
do_reassess = True
elif not in_buy_zone and prev_in_buy_zone == True:
# 离开买入区 → 立即重评,更新止损/止盈/区间
outputs.append(f"🔄 {name}({code}) {price} → 离开买入区{entry_low}~{entry_high},立即技术面重评")
do_reassess = True
else:
do_reassess = False
if do_reassess and HAS_REASSESS:
try:
cost = d.get("cost", 0) or 0
shares = d.get("shares", 0) or 0
profit_pct = (price - cost) / cost * 100 if cost else 0
is_deep_loss = profit_pct < -20
sentiment = "neutral"
if d.get("tech_snapshot"):
if "bearish" in d["tech_snapshot"]:
sentiment = "bearish"
elif "bullish" in d["tech_snapshot"]:
sentiment = "bullish"
# 调用技术面驱动重评(非机械百分比)
result = reassess_strategy(
code, name, price, cost, shares,
current_action=d.get("action", ""),
volume_signal="中性", sentiment=sentiment,
)
outputs.append(f" 📊 新策略: 损{result['stop_loss']}{result['take_profit']}{result['entry_low']}~{result['entry_high']} RR={result['rr_ratio']}")
reassesed_codes.append(code)
except Exception as e:
outputs.append(f" ⚠️ 重评失败: {e}")
# 更新买入区状态
if "__buy_zone" not in state.get(code, {}):
if code not in state:
state[code] = {}
state[code]["__buy_zone"] = in_buy_zone
state_updated = True
# 如果有重评过的股票,更新 decisions.json
if reassesed_codes and HAS_REASSESS:
try:
# 重新 regenerate_all 只针对受影响的股票效率太低
# 直接全量重评(regenerate_all 内部会批量拉价格、做技术分析)
from strategy_lifecycle import regenerate_all
r = regenerate_all(stdout=False)
outputs.append(f" ✅ 策略已全量重评: {r.get('ok',0)}/{r.get('total',0)}成功")
outputs.append(f" 📌 触发股票: {', '.join(reassesed_codes)}")
except Exception as e:
outputs.append(f" ⚠️ 全量重评失败: {e}")
# === 第四步:情景变化检测 + 输出 → 直接推XMPP ===
now_str = datetime.now().strftime("%H:%M:%S")
elapsed = time.time() - start
# 情景变化检测(跨轮对比)
if HAS_TREE and _SCENARIO_CACHE.get('id'):
prev_scenario = state.get('_system', {}).get('last_scenario', '')
curr_scenario = _SCENARIO_CACHE['id']
if prev_scenario and curr_scenario != prev_scenario:
combo = _SCENARIO_CACHE.get('combo_action', '')
outputs.insert(0, f"🌀 情景切换: {prev_scenario}{curr_scenario} | {combo}")
if outputs:
state.setdefault('_system', {})['last_scenario'] = curr_scenario
state_updated = True
elif not prev_scenario:
state.setdefault('_system', {})['last_scenario'] = curr_scenario
state_updated = True
if outputs:
# 简短一行一个触发
for o in outputs:
print(o)
# 直接推到老爸私信(免等待 cron_to_xmpp
try:
body = "\n".join([f"{now_str}"] + outputs)
payload = json.dumps({
"to": "hmo@yoin.fun", "body": body, "type": "chat",
}).encode("utf-8")
req = urllib.request.Request(
"http://127.0.0.1:5805/", data=payload,
headers={"Content-Type": "application/json"},
)
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
# else: SILENT — 无触发,无输出,不推
if state_updated:
save_state(state)
def main():
"""每cron触发跑一轮"""
run_once()
if __name__ == "__main__":
main()