全数据路径审计修复:price_monitor HK股价不再转CNY

审计发现(2026-07-03 15:00 systematic audit):
1. price_monitor 港股仍转 CNY (line 255, 306) → 改为存 HKD 原值, currency=HKD
2. strategy_lifecycle 质量门禁检查 currency=CNY (line 88-91) → 改为接受 HKD/CNY
3. strategy_lifecycle 新建策略写 currency='CNY' (line 2299) → 改为按代码判断 HKD/CNY
4. stale_push_wlin 两处直接 json.load(open(decisions.json)) → 改为 read_decisions()
5. stale_push_wlin 直接 json.load(open(portfolio.json)) → 改为 read_portfolio()
6. DB holdings/holding_strategies: 8只HK股currency从CNY改为HKD
7. calc_total_mv 增加港股HKD→CNY汇兑计算

验证:
- 建滔 84.45 HKD 浮亏-4.3%(不是-24%)
- 现金 132,121.93 总资产 953,295
- 所有8只HK股DB正确标记HKD
- price_monitor已重启,下个tick用新逻辑写HKD原值
- stale_push_wlin已换用mo_data读DB
This commit is contained in:
知微
2026-07-03 17:13:19 +08:00
parent 0bfb819110
commit 908dc6a897
12 changed files with 1542 additions and 1668 deletions
+163 -167
View File
@@ -1,167 +1,163 @@
#!/usr/bin/env python3
"""
per_stock_reassess.py — 按个股触发重评
对每只传进来的 code 执行 reassess_strategy(),然后只更新
decisions.json 中对应的那一条记录。不碰 portfolio.json,不跑全量。
"""
import sys, json, os, re
sys.path.insert(0, "/home/hmo/web-dashboard")
from strategy_lifecycle import reassess_with_context as reassess_strategy
from mo_data import read_decisions, read_portfolio
sys.path.insert(0, "/home/hmo/MoFin")
from mofin_db import get_conn, write_holding_strategy
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
def main():
codes = [a for a in sys.argv[1:] if not a.startswith("-")]
if not codes:
print("[FULL] 无指定编码,跑全量 regenerate_all()")
from strategy_lifecycle import regenerate_all
regenerate_all(stdout=False)
print("[FULL] 全量重评完成")
return
# 读现有 decisions
raw = read_decisions()
decisions_map = {d["code"]: d for d in raw.get("decisions", []) if d.get("code")}
ok = 0
errors = 0
skipped = 0
for code in codes:
entry = decisions_map.get(code)
if not entry:
print(f"[SKIP] {code}: 不在 decisions.json 中")
errors += 1
continue
try:
# Always fetch live price for accurate reassessment
price = 0
try:
# 价格从 DB 读取(price_monitor 每2分钟更新,唯一价格入口)
code_raw = entry.get("code", "")
price = 0
import sqlite3
db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db')
db.row_factory = sqlite3.Row
row = db.execute("SELECT price FROM holdings WHERE code=? AND is_active=1", (code_raw,)).fetchone()
if not row:
row = db.execute("SELECT price FROM watchlist_stocks WHERE code=? AND is_active=1", (code_raw,)).fetchone()
if not row:
row = db.execute("SELECT price FROM holding_strategies WHERE code=? AND status='active' ORDER BY updated_at DESC LIMIT 1", (code_raw,)).fetchone()
if row:
price = row['price'] or 0
db.close()
if price > 0:
print(f" 实时价: {price} (来自DB)")
if price <= 0:
price = entry.get("current_price") or entry.get("price") or 0
except Exception as e:
print(f" 价格获取失败: {e}", file=sys.stderr)
price = entry.get("current_price") or entry.get("price") or 0
# Price diff debounce: skip reassessment if price changed < 1% since last update
last_price = entry.get("last_reassessed_price", 0)
if last_price > 0 and price > 0:
diff_pct = abs(price - last_price) / last_price * 100
if diff_pct < 1.0:
print(f" 价差仅{diff_pct:.2f}% (<1%),跳过重评(上次价={last_price},现价={price}")
skipped += 1
continue
result = reassess_strategy(
code=code,
name=entry.get("name", ""),
price=price,
cost=entry.get("cost", 0),
shares=entry.get("shares", 0),
current_action=entry.get("action", ""),
is_watchlist=entry.get("type", "") in ("自选策略", "watchlist"),
)
if result and result.get("action"):
# 持仓股止损不下移(移动止损规则):已有仓位的止损只上不下
is_held = entry.get("cost", 0) > 0 and entry.get("shares", 0) > 0 and \
entry.get("type", "") not in ("自选策略", "watchlist")
old_stop = entry.get("stop_loss", 0)
new_stop = result.get("stop_loss", 0)
if is_held and old_stop > 0 and new_stop > 0 and new_stop < old_stop:
print(f" 移动止损保护: {new_stop}→保持{old_stop} (持仓止损不下移)")
result["stop_loss"] = old_stop
# 同时更新 action 字符串中的止损值
act = result.get("action", "")
if act:
act = re.sub(r'止损[\d.]+', f'止损{old_stop}', act)
result["action"] = act
# 更新 decisions_map 中对应的条目
updated = entry.copy()
# 币种标记:HK股保留HKD原始值,A股为CNY
is_hk = len(str(code)) == 5 and str(code)[0] in '01'
updated.update({
"action": result["action"],
"stop_loss": result.get("stop_loss", entry.get("stop_loss")),
"entry_low": result.get("entry_low", entry.get("entry_low")),
"entry_high": result.get("entry_high", entry.get("entry_high")),
"take_profit": result.get("take_profit"),
"tech_snapshot": result.get("tech_snapshot", entry.get("tech_snapshot")),
"timing_signal": result.get("timing_signal", entry.get("timing_signal")),
"rr_ratio": result.get("rr_ratio", entry.get("rr_ratio", 0)),
"status": result.get("status", "updated"),
"price": price,
"currency": "HKD" if is_hk else "CNY",
})
# Save last reassessed price for debounce tracking
updated["last_reassessed_price"] = price
decisions_map[code] = updated
# ——— 初始化多分支策略树 ———
try:
sys.path.insert(0, '/home/hmo/MoFin')
from strategy_tree import init_default_branches
branches = init_default_branches(
code,
entry.get('name', ''),
result.get('entry_low', 0),
result.get('entry_high', 0),
result.get('stop_loss', 0),
result.get('take_profit', 0),
)
st = updated.setdefault('strategy_tree', {})
st['branches'] = branches
except Exception:
pass
print(f"[OK] {code} {entry.get('name','')}: {result['action'][:80]}")
ok += 1
else:
print(f"[SYNCED] {code}: 无变更")
ok += 1
except Exception as e:
print(f"[ERROR] {code}: {e}", file=sys.stderr)
errors += 1
# 写回 decisions.json(只更新被修改的那条,其余保留原样)
raw["decisions"] = list(decisions_map.values())
raw["total"] = len(raw["decisions"])
from datetime import datetime
raw["regenerated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M")
# DB 写入(替代 json.dump
try:
conn = get_conn()
for d in raw.get("decisions", []):
write_holding_strategy(conn, d.get("code", ""), d.get("name", ""), d)
conn.close()
except Exception:
pass
# [migrated to DB] — cold backup removed
# with open(DECISIONS_PATH, "w") as f:
# json.dump(raw, f, ensure_ascii=False, indent=2)
print(f"[DONE] {ok}成功 {skipped}跳过 {errors}失败")
if __name__ == "__main__":
main()
#!/usr/bin/env python3
"""
per_stock_reassess.py — 按个股触发重评
对每只传进来的 code 执行 reassess_strategy(),然后只更新
decisions.json 中对应的那一条记录。不碰 portfolio.json,不跑全量。
"""
import sys, json, os, re
sys.path.insert(0, "/home/hmo/web-dashboard")
sys.path.insert(0, "/home/hmo/MoFin")
from strategy_lifecycle import reassess_with_context as reassess_strategy
from mo_data import read_decisions, read_portfolio
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
def main():
codes = [a for a in sys.argv[1:] if not a.startswith("-")]
if not codes:
print("[FULL] 无指定编码,跑全量 regenerate_all()")
from strategy_lifecycle import regenerate_all
regenerate_all(stdout=False)
print("[FULL] 全量重评完成")
return
# 读现有 decisions
raw = read_decisions()
decisions_map = {d["code"]: d for d in raw.get("decisions", []) if d.get("code")}
ok = 0
errors = 0
skipped = 0
for code in codes:
entry = decisions_map.get(code)
if not entry:
print(f"[SKIP] {code}: 不在 decisions.json 中")
errors += 1
continue
try:
# Always fetch live price for accurate reassessment
price = 0
try:
# 价格从 DB 读取(price_monitor 每2分钟更新,唯一价格入口)
code_raw = entry.get("code", "")
price = 0
import sqlite3
db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db')
db.row_factory = sqlite3.Row
row = db.execute("SELECT price FROM holdings WHERE code=? AND is_active=1", (code_raw,)).fetchone()
if not row:
row = db.execute("SELECT price FROM watchlist_stocks WHERE code=? AND is_active=1", (code_raw,)).fetchone()
if not row:
row = db.execute("SELECT price FROM holding_strategies WHERE code=? AND status='active' ORDER BY updated_at DESC LIMIT 1", (code_raw,)).fetchone()
if row:
price = row['price'] or 0
db.close()
if price > 0:
print(f" 实时价: {price} (来自DB)")
else:
# fallback to portfolio.json
_pf_data = read_portfolio()
for _h in _pf_data.get("holdings", []):
if _h["code"] == code_raw:
price = float(_h.get("price", 0))
break
if price <= 0:
price = entry.get("current_price") or entry.get("price") or 0
except Exception as e:
print(f" 价格获取失败: {e}", file=sys.stderr)
price = entry.get("current_price") or entry.get("price") or 0
# Price diff debounce: skip reassessment if price changed < 1% since last update
last_price = entry.get("last_reassessed_price", 0)
if last_price > 0 and price > 0:
diff_pct = abs(price - last_price) / last_price * 100
if diff_pct < 1.0:
print(f" 价差仅{diff_pct:.2f}% (<1%),跳过重评(上次价={last_price},现价={price}")
skipped += 1
continue
result = reassess_strategy(
code=code,
name=entry.get("name", ""),
price=price,
cost=entry.get("cost", 0),
shares=entry.get("shares", 0),
current_action=entry.get("action", ""),
is_watchlist=entry.get("type", "") in ("自选策略", "watchlist"),
)
if result and result.get("action"):
# 持仓股止损不下移(移动止损规则):已有仓位的止损只上不下
is_held = entry.get("cost", 0) > 0 and entry.get("shares", 0) > 0 and \
entry.get("type", "") not in ("自选策略", "watchlist")
old_stop = entry.get("stop_loss", 0)
new_stop = result.get("stop_loss", 0)
if is_held and old_stop > 0 and new_stop > 0 and new_stop < old_stop:
print(f" 移动止损保护: {new_stop}→保持{old_stop} (持仓止损不下移)")
result["stop_loss"] = old_stop
# 同时更新 action 字符串中的止损值
act = result.get("action", "")
if act:
act = re.sub(r'止损[\d.]+', f'止损{old_stop}', act)
result["action"] = act
# 更新 decisions_map 中对应的条目
updated = entry.copy()
# 币种标记:HK股保留HKD原始值,A股为CNY
is_hk = len(str(code)) == 5 and str(code)[0] in '01'
updated.update({
"action": result["action"],
"stop_loss": result.get("stop_loss", entry.get("stop_loss")),
"entry_low": result.get("entry_low", entry.get("entry_low")),
"entry_high": result.get("entry_high", entry.get("entry_high")),
"take_profit": result.get("take_profit"),
"tech_snapshot": result.get("tech_snapshot", entry.get("tech_snapshot")),
"timing_signal": result.get("timing_signal", entry.get("timing_signal")),
"rr_ratio": result.get("rr_ratio", entry.get("rr_ratio", 0)),
"status": result.get("status", "updated"),
"price": price,
"currency": "HKD" if is_hk else "CNY",
})
# Save last reassessed price for debounce tracking
updated["last_reassessed_price"] = price
decisions_map[code] = updated
# ——— 初始化多分支策略树 ———
try:
sys.path.insert(0, '/home/hmo/MoFin')
from strategy_tree import init_default_branches
branches = init_default_branches(
code,
entry.get('name', ''),
result.get('entry_low', 0),
result.get('entry_high', 0),
result.get('stop_loss', 0),
result.get('take_profit', 0),
)
st = updated.setdefault('strategy_tree', {})
st['branches'] = branches
except Exception:
pass
print(f"[OK] {code} {entry.get('name','')}: {result['action'][:80]}")
ok += 1
else:
print(f"[SYNCED] {code}: 无变更")
ok += 1
except Exception as e:
print(f"[ERROR] {code}: {e}", file=sys.stderr)
errors += 1
# 写回 decisions.json(只更新被修改的那条,其余保留原样
raw["decisions"] = list(decisions_map.values())
raw["total"] = len(raw["decisions"])
from datetime import datetime
raw["regenerated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M")
with open(DECISIONS_PATH, "w") as f:
json.dump(raw, f, ensure_ascii=False, indent=2)
print(f"[DONE] {ok}成功 {skipped}跳过 {errors}失败")
if __name__ == "__main__":
main()
+288 -286
View File
@@ -1,286 +1,288 @@
#!/usr/bin/env python3
"""stale_detector.py — 检查所有策略,标记价格偏离/过期的策略
读取 decisions.json 的扁平列表。自选策略和持仓策略分开判断。
可被 cron no_agent 模式调用:stdout 注入到后续 LLM 分析。
输出格式:
[FLAG] [自选/持仓] 股票名(代码) 价XX | 买入A~B | 问题
用法:
python3 stale_detector.py
"""
import json
import sys
import os
from datetime import datetime, timezone
from mo_data import read_portfolio, read_decisions, read_watchlist
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
def fetch_prices(codes):
"""统一价格源:优先 stock_quote.py,腾讯API降级为兜底"""
if not codes:
return {}
# 尝试用 stock_quote.py 获取(脚本强制规范)
try:
import subprocess
script = None
for p in ["/home/hmo/MoFin/scripts/stock_quote.py", "/home/hmo/MoFin/stock_quote.py"]:
if os.path.exists(p):
script = p
break
if script:
result = subprocess.run(
[sys.executable, script] + [str(c) for c in codes],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0 and result.stdout.strip():
results = {}
for line in result.stdout.strip().split("\n"):
if not line.strip():
continue
try:
item = json.loads(line)
code = str(item.get("code", ""))
price = item.get("price")
change = item.get("change_pct", 0)
if code and price is not None:
results[code] = (float(price), float(change))
except (json.JSONDecodeError, ValueError):
continue
if results:
return results
except Exception as e:
print(f"[STALE] stock_quote.py 回退: {e}", file=sys.stderr)
# 兜底:腾讯API(不应依赖,仅作为最后手段)
import urllib.request
symbols, code_map = [], {}
for c in codes:
c = str(c).strip()
p = "sh" if (len(c) == 6 and c[0] in "569") else "sz" if len(c) == 6 else "hk"
sym = f"{p}{c}"
symbols.append(sym)
code_map[sym] = c
try:
req = urllib.request.Request(
f"http://qt.gtimg.cn/q={','.join(symbols)}",
headers={"User-Agent": "curl/7.81"},
)
with urllib.request.urlopen(req, timeout=10) as r:
text = r.read().decode("gbk")
except Exception as e:
print(f"FETCH_FAIL (fallback): {e}", file=sys.stderr)
return {}
results = {}
for line in text.strip().split("\n"):
if "=" not in line:
continue
try:
raw = line.split("=", 1)[1].strip().strip('"').strip(";")
fld = raw.split("~")
if len(fld) < 6:
continue
sym = line.split("=", 1)[0].strip().lstrip("v_")
oc = code_map.get(sym)
if not oc:
continue
p = float(fld[3]) if fld[3] else 0
c = fld[32] if len(fld) > 32 else "0"
results[oc] = (p, c)
except (ValueError, IndexError):
continue
return results
def main():
decisions_list = mo_data.read_decisions()
if not isinstance(decisions_list, list):
decisions_list = decisions_list.get("decisions", []) if isinstance(decisions_list, dict) else []
# 只保留有买入区的条目,排除已关闭的(inactive/closed
EXCLUDED_STATUSES = ("closed", "inactive")
to_check = [d for d in decisions_list if (d.get("entry_low") is not None or d.get("entry_high") is not None) and d.get("status") not in EXCLUDED_STATUSES]
if not to_check:
print("[SILENT] 无需要检查的策略")
return 0
# ----- 组合级监测:读取总仓位 + 弱势比例 -----
position_pct = 0
cash = 0
total_assets = 0
try:
pf = read_portfolio()
position_pct = pf.get("position_pct", 0)
cash = pf.get("cash", 0)
total_assets = pf.get("total_assets", 0)
except Exception:
pass
# 统计持仓策略中弱势/深套的比例
weak_count = 0
holding_count = 0
for d in decisions_list:
if d.get("type") == "持仓策略" and d.get("status") not in ("closed", "inactive"):
holding_count += 1
cat = d.get("stock_category", "")
if cat in ("弱势", "深套"):
weak_count += 1
weak_ratio = (weak_count / holding_count * 100) if holding_count > 0 else 0
prices = fetch_prices([d["code"] for d in to_check])
now = datetime.now(timezone.utc).astimezone()
found = 0
for d in to_check:
code = d["code"]
name = d.get("name", code)
el = d.get("entry_low")
eh = d.get("entry_high")
sl = d.get("stop_loss")
tp = d.get("take_profit")
ts = d.get("created_at") or d.get("timestamp") or d.get("updated_at", "")
is_wl = "自选" in (d.get("type", ""))
pi = prices.get(code)
if not pi:
continue
price, chg = pi
if price <= 0:
continue
issues, flags = [], []
tag = "[自选]" if is_wl else "[持仓]"
# -- 偏离 --
if is_wl and el and eh:
# 读取 timing_signal 判断策略有效性(timing_signal 字段优先,fallback to action
current_str = d.get("current", "") or ""
timing_signal = d.get("timing_signal", "") or current_str
has_nonbuy_signal = any(kw in timing_signal for kw in [
"等企稳再入", "等企稳", "弱势持有", "观望",
"不建议买入", "谨慎买入",
])
# 直接计算 R/R(不依赖文本匹配)
rr_invalid = False
if sl and sl > 0 and tp and tp > 0 and price > sl:
rr = (tp - price) / (price - sl)
if rr < 1.5:
rr_invalid = True
# 也检查 tp 是否接近或低于成本(微盈/浮亏止盈)
cost = d.get("cost", 0)
if cost and cost > 0 and tp <= cost * 1.05:
rr_invalid = True
strategy_deficient = has_nonbuy_signal or rr_invalid
# 对自选无止盈位的也标记(策略不完整)
if not tp or tp == 0:
strategy_deficient = True
if el <= price <= eh:
flags.append("[WL_IN]")
if strategy_deficient:
flags.append("[STRATEGY_STALE]")
prefix = "⚠️仓位挤占 " if position_pct > 80 else ""
issues.append(f"[STRATEGY_STALE] {prefix}{price:.2f}在买入区{el}~{eh}但策略不完整({'RR='+f'{rr:.2f}<1.5' if rr_invalid else '无止盈位' if not tp else '非买入信号'}),买入区需重评")
else:
prefix = "⚠️仓位挤占 " if position_pct > 80 else ""
issues.append(f"[PUSH] {prefix}{price:.2f}入买入区{el}~{eh}")
elif price > eh * 1.35:
flags.append("[WL_HIGH]")
issues.append(f"{price:.2f}高出买入区+{((price/eh)-1)*100:.0f}%,买入区需重评")
elif price > eh * 1.20:
flags.append("[WL_DRIFT]")
issues.append(f"{price:.2f}高于买入区+{((price/eh)-1)*100:.0f}%")
elif not is_wl and eh:
dp = (price / eh - 1) * 100
if dp > 35:
flags.append("[SEVERE]")
issues.append(f"偏离买入区上沿+{dp:.0f}%")
elif dp > 20:
flags.append("[DRIFT]")
issues.append(f"偏离买入区上沿+{dp:.0f}%")
elif dp > 10:
flags.append("[WARN]")
issues.append(f"偏离买入区上沿+{dp:.0f}%")
# 持仓在买入区内但 R/R 不达标
if el and sl and sl > 0 and tp and tp > 0 and price > sl:
if el <= price <= eh:
rr = (tp - price) / (price - sl)
if rr < 1.5:
flags.append("[RR_WARN]")
issues.append(f"买入区内RR仅{rr:.2f}<1.5,策略需重评")
# -- 距止损/止盈(仅持仓) --
if not is_wl:
if sl and sl > 0:
dsl = (price / sl - 1) * 100
if dsl < 5:
# 成本基准校验:浮盈>5%时止损是利润保护,不是危险信号
# (mirrors NEAR_TP cost_check logic at line 195-198)
cost = d.get("cost")
if cost and cost > 0 and price > cost * 1.05:
flags.append("[PROFIT_PROTECT]")
pnl = (price / cost - 1) * 100
issues.append(f"距止损仅{dsl:.1f}%(利润保护,浮盈{pnl:.0f}%)")
else:
flags.append("[NEAR_SL]")
issues.append(f"距止损仅{dsl:.1f}%")
if tp and tp > 0:
dtp = (tp / price - 1) * 100
if dtp < 5:
# 成本基准校验:止盈标记只有在盈利≥5%时才有效
cost_check = True
cost = d.get("cost")
if cost and cost > 0 and price < cost * 1.05:
cost_check = False
if cost_check:
flags.append("[NEAR_TP]")
issues.append(f"距止盈仅{dtp:.1f}%")
# -- 过期 --
stale_limit = 30 if is_wl else 14
if ts:
try:
ud = datetime.fromisoformat(ts)
if ud.tzinfo is None:
ud = ud.replace(tzinfo=timezone.utc)
days = (now - ud).days
if days > stale_limit:
flags.append("[STALE]")
issues.append(f"{days}天未更新(>{stale_limit})")
except (ValueError, TypeError):
pass
if issues:
print(f"{' '.join(flags)} {tag} {name}({code}) 价{price:.2f}{chg} | 买入{el}~{eh} | {'; '.join(issues)}")
found += 1
if found == 0:
print("[SILENT] 所有策略正常")
# ----- 组合级警报 -----
portfolio_alerts = 0
if holding_count > 0:
if weak_ratio > 40:
print(f"\n[PORTFOLIO_WEAK] 组合弱势比例{weak_ratio:.0f}% ({weak_count}/{holding_count})!仓位{position_pct:.1f}% → 建议系统性减仓")
portfolio_alerts += 1
elif weak_ratio > 30:
print(f"\n[PORTFOLIO_WEAK_MILD] 组合弱势比例{weak_ratio:.0f}% ({weak_count}/{holding_count}),仓位{position_pct:.1f}%,关注")
portfolio_alerts += 1
if position_pct > 80 and holding_count > 0:
# 仓位过满提醒
print(f"[PORTFOLIO_FULL] 总仓位{position_pct:.1f}% > 80%,现金{cash:.0f}({cash/total_assets*100:.1f}%)")
portfolio_alerts += 1
if portfolio_alerts > 0:
found += portfolio_alerts
return found
if __name__ == "__main__":
main()
#!/usr/bin/env python3
"""stale_detector.py — 检查所有策略,标记价格偏离/过期的策略
读取 decisions.json 的扁平列表。自选策略和持仓策略分开判断。
可被 cron no_agent 模式调用:stdout 注入到后续 LLM 分析。
输出格式:
[FLAG] [自选/持仓] 股票名(代码) 价XX | 买入A~B | 问题
用法:
python3 stale_detector.py
"""
import json
import sys
import os
from datetime import datetime, timezone
sys.path.insert(0, '/home/hmo/MoFin')
from mo_data import read_portfolio, read_decisions, read_watchlist
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
def fetch_prices(codes):
"""统一价格源:优先 stock_quote.py,腾讯API降级为兜底"""
if not codes:
return {}
# 尝试用 stock_quote.py 获取(脚本强制规范)
try:
import subprocess
script = None
for p in ["/home/hmo/MoFin/scripts/stock_quote.py", "/home/hmo/MoFin/stock_quote.py"]:
if os.path.exists(p):
script = p
break
if script:
result = subprocess.run(
[sys.executable, script] + [str(c) for c in codes],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0 and result.stdout.strip():
results = {}
for line in result.stdout.strip().split("\n"):
if not line.strip():
continue
try:
item = json.loads(line)
code = str(item.get("code", ""))
price = item.get("price")
change = item.get("change_pct", 0)
if code and price is not None:
results[code] = (float(price), float(change))
except (json.JSONDecodeError, ValueError):
continue
if results:
return results
except Exception as e:
print(f"[STALE] stock_quote.py 回退: {e}", file=sys.stderr)
# 兜底:腾讯API(不应依赖,仅作为最后手段)
import urllib.request
symbols, code_map = [], {}
for c in codes:
c = str(c).strip()
p = "sh" if (len(c) == 6 and c[0] in "569") else "sz" if len(c) == 6 else "hk"
sym = f"{p}{c}"
symbols.append(sym)
code_map[sym] = c
try:
req = urllib.request.Request(
f"http://qt.gtimg.cn/q={','.join(symbols)}",
headers={"User-Agent": "curl/7.81"},
)
with urllib.request.urlopen(req, timeout=10) as r:
text = r.read().decode("gbk")
except Exception as e:
print(f"FETCH_FAIL (fallback): {e}", file=sys.stderr)
return {}
results = {}
for line in text.strip().split("\n"):
if "=" not in line:
continue
try:
raw = line.split("=", 1)[1].strip().strip('"').strip(";")
fld = raw.split("~")
if len(fld) < 6:
continue
sym = line.split("=", 1)[0].strip().lstrip("v_")
oc = code_map.get(sym)
if not oc:
continue
p = float(fld[3]) if fld[3] else 0
c = fld[32] if len(fld) > 32 else "0"
results[oc] = (p, c)
except (ValueError, IndexError):
continue
return results
def main():
decisions_list = read_decisions()
if not isinstance(decisions_list, list):
decisions_list = decisions_list.get("decisions", []) if isinstance(decisions_list, dict) else []
# 只保留有买入区的条目,排除已关闭的(inactive/closed
EXCLUDED_STATUSES = ("closed", "inactive")
to_check = [d for d in decisions_list if (d.get("entry_low") is not None or d.get("entry_high") is not None) and d.get("status") not in EXCLUDED_STATUSES]
if not to_check:
print("[SILENT] 无需要检查的策略")
return 0
# ----- 组合级监测:读取总仓位 + 弱势比例 -----
position_pct = 0
cash = 0
total_assets = 0
try:
with open(PORTFOLIO_PATH) as f:
pf = json.load(f)
position_pct = pf.get("position_pct", 0)
cash = pf.get("cash", 0)
total_assets = pf.get("total_assets", 0)
except Exception:
pass
# 统计持仓策略中弱势/深套的比例
weak_count = 0
holding_count = 0
for d in decisions_list:
if d.get("type") == "持仓策略" and d.get("status") not in ("closed", "inactive"):
holding_count += 1
cat = d.get("stock_category", "")
if cat in ("弱势", "深套"):
weak_count += 1
weak_ratio = (weak_count / holding_count * 100) if holding_count > 0 else 0
prices = fetch_prices([d["code"] for d in to_check])
now = datetime.now(timezone.utc).astimezone()
found = 0
for d in to_check:
code = d["code"]
name = d.get("name", code)
el = d.get("entry_low")
eh = d.get("entry_high")
sl = d.get("stop_loss")
tp = d.get("take_profit")
ts = d.get("created_at") or d.get("timestamp") or d.get("updated_at", "")
is_wl = "自选" in (d.get("type", ""))
pi = prices.get(code)
if not pi:
continue
price, chg = pi
if price <= 0:
continue
issues, flags = [], []
tag = "[自选]" if is_wl else "[持仓]"
# -- 偏离 --
if is_wl and el and eh:
# 读取 timing_signal 判断策略有效性(timing_signal 字段优先,fallback to action
current_str = d.get("current", "") or ""
timing_signal = d.get("timing_signal", "") or current_str
has_nonbuy_signal = any(kw in timing_signal for kw in [
"等企稳再入", "等企稳", "弱势持有", "观望",
"不建议买入", "谨慎买入",
])
# 直接计算 R/R(不依赖文本匹配)
rr_invalid = False
if sl and sl > 0 and tp and tp > 0 and price > sl:
rr = (tp - price) / (price - sl)
if rr < 1.5:
rr_invalid = True
# 也检查 tp 是否接近或低于成本(微盈/浮亏止盈)
cost = d.get("cost", 0)
if cost and cost > 0 and tp <= cost * 1.05:
rr_invalid = True
strategy_deficient = has_nonbuy_signal or rr_invalid
# 对自选无止盈位的也标记(策略不完整)
if not tp or tp == 0:
strategy_deficient = True
if el <= price <= eh:
flags.append("[WL_IN]")
if strategy_deficient:
flags.append("[STRATEGY_STALE]")
prefix = "⚠️仓位挤占 " if position_pct > 80 else ""
issues.append(f"[STRATEGY_STALE] {prefix}{price:.2f}在买入区{el}~{eh}但策略不完整({'RR='+f'{rr:.2f}<1.5' if rr_invalid else '无止盈位' if not tp else '非买入信号'}),买入区需重评")
else:
prefix = "⚠️仓位挤占 " if position_pct > 80 else ""
issues.append(f"[PUSH] {prefix}{price:.2f}入买入区{el}~{eh}")
elif price > eh * 1.35:
flags.append("[WL_HIGH]")
issues.append(f"{price:.2f}高出买入区+{((price/eh)-1)*100:.0f}%,买入区需重评")
elif price > eh * 1.20:
flags.append("[WL_DRIFT]")
issues.append(f"{price:.2f}高于买入区+{((price/eh)-1)*100:.0f}%")
elif not is_wl and eh:
dp = (price / eh - 1) * 100
if dp > 35:
flags.append("[SEVERE]")
issues.append(f"偏离买入区上沿+{dp:.0f}%")
elif dp > 20:
flags.append("[DRIFT]")
issues.append(f"偏离买入区上沿+{dp:.0f}%")
elif dp > 10:
flags.append("[WARN]")
issues.append(f"偏离买入区上沿+{dp:.0f}%")
# 持仓在买入区内但 R/R 不达标
if el and sl and sl > 0 and tp and tp > 0 and price > sl:
if el <= price <= eh:
rr = (tp - price) / (price - sl)
if rr < 1.5:
flags.append("[RR_WARN]")
issues.append(f"买入区内RR仅{rr:.2f}<1.5,策略需重评")
# -- 距止损/止盈(仅持仓) --
if not is_wl:
if sl and sl > 0:
dsl = (price / sl - 1) * 100
if dsl < 5:
# 成本基准校验:浮盈>5%时止损是利润保护,不是危险信号
# (mirrors NEAR_TP cost_check logic at line 195-198)
cost = d.get("cost")
if cost and cost > 0 and price > cost * 1.05:
flags.append("[PROFIT_PROTECT]")
pnl = (price / cost - 1) * 100
issues.append(f"距止损仅{dsl:.1f}%(利润保护,浮盈{pnl:.0f}%)")
else:
flags.append("[NEAR_SL]")
issues.append(f"距止损仅{dsl:.1f}%")
if tp and tp > 0:
dtp = (tp / price - 1) * 100
if dtp < 5:
# 成本基准校验:止盈标记只有在盈利≥5%时才有效
cost_check = True
cost = d.get("cost")
if cost and cost > 0 and price < cost * 1.05:
cost_check = False
if cost_check:
flags.append("[NEAR_TP]")
issues.append(f"距止盈仅{dtp:.1f}%")
# -- 过期 --
stale_limit = 30 if is_wl else 14
if ts:
try:
ud = datetime.fromisoformat(ts)
if ud.tzinfo is None:
ud = ud.replace(tzinfo=timezone.utc)
days = (now - ud).days
if days > stale_limit:
flags.append("[STALE]")
issues.append(f"{days}天未更新(>{stale_limit})")
except (ValueError, TypeError):
pass
if issues:
print(f"{' '.join(flags)} {tag} {name}({code}) 价{price:.2f}{chg} | 买入{el}~{eh} | {'; '.join(issues)}")
found += 1
if found == 0:
print("[SILENT] 所有策略正常")
# ----- 组合级警报 -----
portfolio_alerts = 0
if holding_count > 0:
if weak_ratio > 40:
print(f"\n[PORTFOLIO_WEAK] 组合弱势比例{weak_ratio:.0f}% ({weak_count}/{holding_count})!仓位{position_pct:.1f}% → 建议系统性减仓")
portfolio_alerts += 1
elif weak_ratio > 30:
print(f"\n[PORTFOLIO_WEAK_MILD] 组合弱势比例{weak_ratio:.0f}% ({weak_count}/{holding_count}),仓位{position_pct:.1f}%,关注")
portfolio_alerts += 1
if position_pct > 80 and holding_count > 0:
# 仓位过满提醒
print(f"[PORTFOLIO_FULL] 总仓位{position_pct:.1f}% > 80%,现金{cash:.0f}({cash/total_assets*100:.1f}%)")
portfolio_alerts += 1
if portfolio_alerts > 0:
found += portfolio_alerts
return found
if __name__ == "__main__":
main()
+901 -900
View File
File diff suppressed because it is too large Load Diff