Files
MoFin/strategy_lifecycle.py
T
知微 b63475402e feat: 港股ATR波动率止损校验
- 新增 is_hk_stock(code): 港股判断(5位代码)
- 新增 calc_atr(code): 腾讯API K线→ATR(14)计算
- 港股止损增加ATR校验:止损距现价不得小于1×ATR
  技术位仍为主依据,ATR仅作为港股高波动下的间距底线
  A股不受影响保持原有逻辑
- 不违背"技术位优先,不机械乘系数"原则
2026-06-25 21:11:07 +08:00

1885 lines
78 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""策略生命周期管理系统 — 技术面驱动版本 v2
核心原则:
1. 止损放在合理的技术位,不拍数字
2. 新买入推荐:止损=弱支撑(约3%跌幅),止盈=强压力,盈亏比≥2:1
3. 已持仓:止损=强支撑(约5-8%跌幅),目标=强压力
4. 买入区间:弱支撑~弱压力之间
5. 买入时机:量价齐跌不买,缩量至支撑买,量价齐升追买
"""
import json
import urllib.request
import os
import sys
import re
from datetime import datetime
import technical_analysis as ta
import multi_timeframe as mtf
def is_hk_stock(code):
"""判断是否港股(港股代码5位,A股6位带前导零)"""
return len(str(code)) <= 5
def calc_atr(code, period=14):
"""从腾讯API K线数据计算ATR(period),返回ATR值或None"""
try:
url = f"http://ifzq.gtimg.cn/appstock/app/fqkline/get?param=hk{code},day,,,60,qfq"
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
resp = urllib.request.urlopen(req, timeout=5).read().decode('utf-8')
data = json.loads(resp)
bars = data.get('data', {}).get(f'hk{code}', {}).get('day', [])
if len(bars) < period + 1:
return None
trs = []
for i in range(1, min(len(bars), period + 1)):
try:
high = float(bars[i][2])
low = float(bars[i][3])
prev_close = float(bars[i-1][4]) if len(bars[i-1]) > 4 else float(bars[i-1][3])
tr = max(high - low, abs(high - prev_close), abs(low - prev_close))
trs.append(tr)
except (ValueError, IndexError):
continue
if not trs:
return None
return round(sum(trs) / len(trs), 2)
except Exception:
return None
# 提示词版本追踪
try:
from prompt_manager.tracking import record_strategy_generation
HAS_PROMPT_TRACKING = True
except ImportError:
HAS_PROMPT_TRACKING = False
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json"
def safe_json_load(path, default=None):
"""安全加载 JSON,遇到坏数据自动修复"""
if not os.path.exists(path):
return default if default is not None else {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError:
# 尝试修复:替换字符串内未转义的换行符,去多余括号
with open(path, "r", encoding="utf-8") as f:
raw = f.read()
fixed = raw
# 修复1: 字符串内未转义的换行 -> \\n
result = []
in_str = False
for ch in fixed:
if ch == '"':
in_str = not in_str
result.append(ch)
elif in_str and ch in '\n\r':
result.append('\\n')
else:
result.append(ch)
fixed = ''.join(result)
# 修复2: 去掉多余的尾部括号
fixed = fixed.rstrip('}')
# 补回正确的闭合
if not fixed.endswith('}'):
fixed += '}'
try:
return json.loads(fixed)
except json.JSONDecodeError as e:
print(f"[WARN] watchlist.json 自动修复失败: {e}", file=sys.stderr)
return default if default is not None else {}
KNOWLEDGE_LOG = "/home/hmo/Obsidian/knowledge/finance/analyst-knowledge-log.md"
MACRO_CONTEXT_PATH = "/home/hmo/web-dashboard/data/macro_context.json"
MARKET_CONTEXT_PATH = "/home/hmo/web-dashboard/data/market.json"
STOCK_SECTOR_MAP_PATH = "/home/hmo/web-dashboard/data/stock_sector_map.json"
def load_stock_sector_map():
"""读取个股归属行业映射
stock_sector_map.json 格式: {code: [sector1, sector2, ...]}
跳过 _note, _created_at 等元数据键。
"""
# 优先从 SQLite 读取
try:
from mofin_db import get_conn, query_sector_stocks
conn = get_conn()
# 从 stock_sectors 表反向构建 code→[sectors] 映射
rows = conn.execute("SELECT code, sector_name FROM stock_sectors ORDER BY code").fetchall()
conn.close()
code_to_sectors = {}
for code, sector in rows:
if code not in code_to_sectors:
code_to_sectors[code] = []
code_to_sectors[code].append(sector)
return code_to_sectors
except Exception:
pass
try:
with open(STOCK_SECTOR_MAP_PATH) as f:
data = json.load(f)
code_to_sectors = {}
for key, value in data.items():
if key.startswith("_"):
continue
if isinstance(value, list):
code_to_sectors[key] = value
return code_to_sectors
except Exception:
return {}
def load_market_context():
"""读取市场上下文,优先 SQLite,回退 market.json"""
# 优先从 SQLite 读取
try:
from mofin_db import get_conn, query_latest_market
conn = get_conn()
market = query_latest_market(conn)
conn.close()
if market and market.get("sectors"):
sector_perf = {}
for s in market["sectors"]:
name = s.get("name", "")
if name:
sector_perf[name] = {
"change": s.get("change_pct", 0),
"up_count": s.get("up_count", 0),
"down_count": s.get("down_count", 0),
"net_inflow": s.get("net_inflow", 0),
"lead_stock": s.get("lead_stock", ""),
"lead_stock_change": s.get("lead_stock_change", 0),
}
return {
"sector_perf": sector_perf,
"breadth": market.get("up_ratio", 50),
"mood": market.get("mood", "neutral"),
"top_gainers": {g["name"]: g["change_pct"] for g in market.get("top_gainers", [])},
"top_losers": {g["name"]: g["change_pct"] for g in market.get("top_losers", [])},
"total_sectors": len(market["sectors"]),
"market_timestamp": market.get("timestamp", ""),
}
except Exception:
pass
try:
with open(MARKET_CONTEXT_PATH) as f:
market = json.load(f)
sectors = market.get("sectors", [])
sector_perf = {}
for s in sectors:
name = s.get("name", "")
if name:
sector_perf[name] = {
"change": s.get("change", 0),
"up_count": s.get("up_count", 0),
"down_count": s.get("down_count", 0),
"net_inflow": s.get("net_inflow", 0),
"lead_stock": s.get("lead_stock", ""),
"lead_stock_change": s.get("lead_stock_change", 0),
}
top_gainers = {s.get("name", ""): s.get("change", 0)
for s in market.get("top_gainers", [])}
top_losers = {s.get("name", ""): s.get("change", 0)
for s in market.get("top_losers", [])}
return {
"sector_perf": sector_perf,
"breadth": market.get("up_ratio", 50),
"mood": market.get("mood", "neutral"),
"top_gainers": top_gainers,
"top_losers": top_losers,
"total_sectors": market.get("total_sectors", 0),
"market_timestamp": market.get("timestamp", ""),
}
except Exception:
return {
"sector_perf": {},
"breadth": 50,
"mood": "neutral",
"top_gainers": {},
"top_losers": {},
"total_sectors": 0,
"market_timestamp": "",
}
def compute_sector_adjustment(code, market_ctx, stock_sector_map):
"""根据个股所属行业的市场表现+小果情感,返回调整系数
返回 dict:
stop_bias: 止损调整系数(<1.0收紧, >1.0放宽)
target_bias: 止盈调整系数
note: 行业背景一句话
sector_name: 匹配到的行业名称
sector_change: 行业涨跌幅
"""
# 默认无调整
adj = {"stop_bias": 1.0, "target_bias": 1.0, "note": "",
"sector_name": "", "sector_change": 0}
sectors_for_code = stock_sector_map.get(code, [])
if not sectors_for_code:
return adj
sector_perf = market_ctx.get("sector_perf", {})
breadth = market_ctx.get("breadth", 50)
# 找第一个能匹配到的行业
for sec in sectors_for_code:
if sec in sector_perf:
perf = sector_perf[sec]
chg = perf.get("change", 0)
adj["sector_name"] = sec
adj["sector_change"] = chg
# 行业暴跌 > 3%
if chg <= -3:
adj["stop_bias"] = 0.92 # 止损收紧8%
adj["target_bias"] = 0.90 # 止盈下调10%
adj["note"] = f"行业{sec}大跌{chg:+.1f}%,收紧止损"
# 行业大跌 1~3%
elif chg <= -1:
adj["stop_bias"] = 0.96
adj["target_bias"] = 0.95
adj["note"] = f"行业{sec}下跌{chg:+.1f}%,适度防御"
# 行业大涨 > 3%
elif chg >= 3:
adj["stop_bias"] = 1.05 # 止损放宽5%(给趋势空间)
adj["target_bias"] = 1.03
adj["note"] = f"行业{sec}大涨{chg:+.1f}%,可适度积极"
# 行业上涨 1~3%
elif chg >= 1:
adj["stop_bias"] = 1.02
adj["note"] = f"行业{sec}上涨{chg:+.1f}%,正常"
else:
adj["note"] = f"行业{sec}{chg:+.1f}%,中性"
break
# 尝试处理命名差异:market.json中的行业名可能多了"板块"后缀
for market_sec_name in sector_perf:
if sec in market_sec_name or market_sec_name in sec:
perf = sector_perf[market_sec_name]
chg = perf.get("change", 0)
adj["sector_name"] = market_sec_name
adj["sector_change"] = chg
if chg <= -3:
adj["stop_bias"] = 0.92
adj["target_bias"] = 0.90
adj["note"] = f"行业{market_sec_name}大跌{chg:+.1f}%,收紧止损"
elif chg <= -1:
adj["stop_bias"] = 0.96
adj["target_bias"] = 0.95
adj["note"] = f"行业{market_sec_name}下跌{chg:+.1f}%,适度防御"
elif chg >= 3:
adj["stop_bias"] = 1.05
adj["target_bias"] = 1.03
adj["note"] = f"行业{market_sec_name}大涨{chg:+.1f}%,可适度积极"
elif chg >= 1:
adj["stop_bias"] = 1.02
adj["note"] = f"行业{market_sec_name}上涨{chg:+.1f}%,正常"
else:
adj["note"] = f"行业{market_sec_name}{chg:+.1f}%,中性"
break
# 如果breath<30% (大盘极弱),再加一层收紧
if breadth < 30:
adj["stop_bias"] *= 0.97 # 再收紧3%
breadth_note = "大盘仅{}%个股上涨".format(int(breadth))
adj["note"] = (adj["note"] + " | " + breadth_note) if adj["note"] else breadth_note
elif breadth < 40:
adj["stop_bias"] *= 0.99
breadth_note = "大盘偏弱({}%上涨)".format(int(breadth))
adj["note"] = (adj["note"] + " | " + breadth_note) if adj["note"] else breadth_note
# 小果情感约束:利空置信度>80%时收紧止损
try:
xiaoguo_path = "/home/hmo/web-dashboard/data/xiaoguo_sentiment.json"
if os.path.exists(xiaoguo_path):
xg = json.load(open(xiaoguo_path))
stock_sentiment = xg.get("stocks", {}).get(code, {})
if stock_sentiment:
sentiment = stock_sentiment.get("sentiment", "")
confidence = stock_sentiment.get("confidence", 0)
summary = stock_sentiment.get("summary", "")
if sentiment == "negative" and confidence > 0.8:
adj["stop_bias"] = min(adj["stop_bias"], 0.95)
adj["note"] += f" | 小果利空{confidence:.0%}:{summary[:30]}"
except Exception:
pass
return adj
def load_macro_context():
"""读取宏观上下文,返回 (bias, desc),优先 DB,回退 JSON"""
try:
import sqlite3
from pathlib import Path
conn = sqlite3.connect(str(Path(__file__).parent.parent / "data" / "mofin.db"))
row = conn.execute(
"SELECT indices, structure FROM macro_context_log "
"WHERE has_valid_data=1 ORDER BY created_at DESC LIMIT 1"
).fetchone()
conn.close()
if row:
indices = json.loads(row[0]) if row[0] else {}
structure = json.loads(row[1]) if row[1] else {}
overall = structure.get("overall", "neutral")
desc = structure.get("description", "")
else:
raise ValueError("no db data")
except Exception:
try:
with open(MACRO_CONTEXT_PATH) as f:
ctx = json.load(f)
overall = ctx.get("structure", {}).get("overall", "neutral")
desc = ctx.get("structure", {}).get("description", "")
except Exception:
return 1.0, "宏观未加载"
if "bearish" in overall:
return 0.8, f"宏观{desc}"
elif overall == "bullish":
return 1.05, f"宏观{desc}"
elif overall == "strong_bullish":
return 1.1, f"宏观{desc}"
else:
return 1.0, f"宏观{desc}"
def batch_fetch_prices(codes):
"""批量获取实时价格,合并为一次API调用(自动分批,每批15只)"""
if not codes:
return {}
# 分批处理,避免单次请求过大导致超时
batch_size = 15
all_results = {}
for batch_start in range(0, len(codes), batch_size):
batch = codes[batch_start:batch_start + batch_size]
symbols = []
code_map = {}
for raw_code in batch:
raw_code = str(raw_code).split('_')[0]
if not raw_code:
continue
if len(raw_code) == 5 and raw_code.isdigit():
prefix = "hk"
elif raw_code.startswith(("6", "5")):
prefix = "sh"
else:
prefix = "sz"
sym = f"{prefix}{raw_code}"
symbols.append(sym)
code_map[sym] = raw_code
if not symbols:
continue
url = f"http://qt.gtimg.cn/q={','.join(symbols)}"
max_retries = 2
for attempt in range(max_retries + 1):
try:
r = urllib.request.urlopen(url, timeout=10)
text = r.read().decode("gbk")
except Exception as e:
if attempt < max_retries:
continue
print(f" batch_fetch_prices error: {e}", file=sys.stderr)
continue
for line in text.strip().split("\n"):
line = line.strip()
if not line or "=" not in line:
continue
try:
sym = line.split("=", 1)[0].strip().lstrip("v_")
raw_value = line.split("=", 1)[1].strip().strip('"').strip(";")
fields = raw_value.split("~")
if len(fields) < 35:
continue
orig_code = code_map.get(sym)
if not orig_code:
continue
def f(i):
try:
return float(fields[i]) if fields[i].strip() else 0.0
except:
return 0.0
all_results[orig_code] = {
"price": f(3), "close": f(4), "high": f(33), "low": f(34),
"code": orig_code,
}
except Exception:
continue
break # Success - break retry loop
return all_results
def get_price_tencent(code):
"""获取实时价格,自动识别A股/港股"""
try:
raw_code = code.split('_')[0]
if not raw_code:
return None
if len(raw_code) == 5 and raw_code.isdigit():
prefix = "hk"
elif raw_code.startswith("6") or raw_code.startswith("5"):
prefix = "sh"
else:
prefix = "sz"
url = f"http://qt.gtimg.cn/q={prefix}{raw_code}"
r = urllib.request.urlopen(url, timeout=5)
fields = r.read().decode("gbk").split('"')[1].split("~")
def f(i):
try:
return float(fields[i]) if fields[i].strip() else 0.0
except:
return 0.0
return {
"price": f(3), "close": f(4), "high": f(33), "low": f(34),
"code": raw_code,
}
except Exception as e:
print(f" get_price error {code}: {e}", file=sys.stderr)
return None
def reassess_strategy(code, name, price, cost, shares, current_action,
volume_signal="", sentiment="neutral",
is_watchlist=False):
"""根据技术分析重评策略"""
tech = ta.full_analysis(code)
if tech and "support_resistance" in tech:
sr = tech["support_resistance"]
candle = tech.get("candlestick", {})
vol = tech.get("volume", {})
ss = sr.get("strong_support")
ws = sr.get("weak_support")
wr = sr.get("weak_resist")
sr_resist = sr.get("strong_resist")
pivot = sr.get("pivot")
effective_range = sr.get("effective_range")
print(f" TECH: 强撑={ss} 弱撑={ws} 枢轴={pivot} 弱压={wr} 强压={sr_resist} 有效区间={effective_range}")
else:
print(f" ⚠️ 技术分析不可用", file=sys.stderr)
ss = ws = wr = sr_resist = pivot = None
candle = {}
vol = {}
# ----- 多周期技术分析(周线/月线/均线) -----
mtf_analysis = {}
mtf_adj = {}
try:
mtf_result = mtf.full_multi_tf_analysis(code)
if mtf_result.get("daily") and mtf_result["daily"].get("count", 0) >= 5:
mtf_analysis = mtf_result
mtf_adj = mtf_result.get("strategy_adjustment", {})
daily_mas = mtf_result.get("daily", {}).get("mas", {})
weekly = mtf_result.get("weekly", {})
monthly = mtf_result.get("monthly", {})
trend_align = mtf_adj.get("trend_alignment", "未知")
print(f" 多周期: {trend_align} | "
f"MA5={daily_mas.get('ma5','?')} MA20={daily_mas.get('ma20','?')} MA60={daily_mas.get('ma60','?')} | "
f"周线{weekly.get('trend',{}).get('description','?')} 月线{monthly.get('trend',{}).get('description','?')}")
except Exception as e:
print(f" 多周期分析失败: {e}", file=sys.stderr)
profit_pct = (price - cost) / cost * 100 if cost else 0
is_new_entry = (cost == 0) or (shares == 0)
is_deep_loss = profit_pct < -20
# ----- 股票分类(短炒/中短线/中长线/弱势/深套) -----
stock_category = "中短线"
time_horizon = "2周~3月"
position_advice = "中等仓位"
try:
mtf_cache = json.load(open("/home/hmo/web-dashboard/data/multi_tf_cache.json"))
stock_data = mtf_cache.get(code, {})
daily_klines = stock_data.get("daily", [])
fund = stock_data.get("fundamentals", {})
closes = [d["close"] for d in daily_klines] if daily_klines else []
if len(closes) >= 10:
cur = closes[-1]
ma20 = sum(closes[-20:])/20 if len(closes)>=20 else 0
ma60 = sum(closes[-60:])/60 if len(closes)>=60 else 0
highs = [d["high"] for d in daily_klines[-20:]]
lows = [d["low"] for d in daily_klines[-20:]]
volatility = ((max(highs)-min(lows))/min(lows)*100) if min(lows)>0 else 0
pe = fund.get("pe") or 0
eps = fund.get("eps") or 0
mcap = fund.get("mcap_total") or 0
is_high_vol = volatility > 30
is_high_pe = pe > 100 or pe < 0
is_value = 0 < pe < 20 and eps > 0.5
if is_deep_loss:
stock_category = "深套"
time_horizon = "长期"
position_advice = "不补不割"
elif is_high_vol and is_high_pe:
stock_category = "短炒"
time_horizon = "数日~2周"
position_advice = "小仓快进快出"
elif cur < ma20 and cur < ma60 and ma20 > 0:
stock_category = "弱势"
time_horizon = "观望"
position_advice = "减仓或观望"
elif (is_value or mcap > 1000) and cur > ma20:
stock_category = "中长线"
time_horizon = "数月~1年"
position_advice = "正常配置"
elif volatility > 20:
stock_category = "中短线"
time_horizon = "2~6周"
position_advice = "中等仓位"
except Exception:
pass
print(f" 分类: {stock_category} | {time_horizon} | {position_advice}")
# ----- 短炒+强趋势检测:短炒分类但多周期多头时用移动止损代替弱支撑止损 -----
is_short_term_strong_trend = False
if stock_category == "短炒":
trend_align = mtf_adj.get("trend_alignment", "")
strong_trend_indicators = ["多周期看多", "多周期多头", "上升"]
if any(ind in trend_align for ind in strong_trend_indicators):
is_short_term_strong_trend = True
print(f" ⚡ 短炒+强趋势检测: 趋势={trend_align} → 启用移动止损, 不止盈")
position_advice = "小仓强趋势让利润跑"
# ----- 止损设置(含最小距离3%保护) -----
if is_new_entry:
# 新买入推荐:止损 = 弱支撑(约2-3%跌幅,合理可控)
if ws and ws > 0:
new_stop = round(ws, 2)
else:
new_stop = round(price * 0.96, 2)
elif is_deep_loss:
# 深套:止损 = 强支撑再下移(不轻易割)
if ss and ss > 0:
new_stop = round(min(ss, price * 0.85), 2)
else:
new_stop = round(price * 0.85, 2)
else:
# 已持仓正常:止损 = 强支撑
if is_short_term_strong_trend:
# 短炒+强趋势:用移动止损(距现价-5%),不止盈让利润跑
trailing_sl = round(max(ws or 0, price * 0.95), 2) if ws else round(price * 0.95, 2)
new_stop = trailing_sl
print(f" 短炒强趋势移动止损: {new_stop} (距现价-{(1-new_stop/price)*100:.1f}%)")
elif ss and ss > 0:
new_stop = round(ss, 2)
else:
new_stop = round(price * 0.88, 2)
# 已盈利仓位(>5%):用较紧的移动止损保护利润,但不超过成本线
if profit_pct > 5 and not is_new_entry and not is_deep_loss:
# 取 max(弱支撑, 成本线, 当前价×0.95) 作为止损
cost_protect = cost if cost > 0 else 0
trailing_stop = round(max(ws or 0, cost_protect, price * 0.95), 2)
if trailing_stop > new_stop:
new_stop = trailing_stop
print(f" 已启用移动止损: {new_stop} (保护+{profit_pct:.1f}%利润)", file=sys.stderr)
# 最小止损距离 —— 随趋势强度调整(2026-06-23 震度保护规则)
# 强趋势(多周期看多 + MA多头排列):最小1.5%下行空间
# 普通/弱势:最小3%下行空间
is_strong_trend = False
trend_align = mtf_adj.get("trend_alignment", "")
strong_trend_indicators = ["多周期看多", "多周期多头", "上升"]
try:
if any(ind in trend_align for ind in strong_trend_indicators) and ma20 > ma60 and cur >= ma20:
is_strong_trend = True
except (NameError, TypeError):
pass # ma20/ma60/cur may be unbound if MTF data insufficient
if is_strong_trend:
min_stop_gap = 0.015 # 1.5%
else:
min_stop_gap = 0.03 # 3%
min_stop = round(price * (1 - min_stop_gap), 2)
if new_stop > min_stop and not is_deep_loss:
old_stop = new_stop
new_stop = min_stop
if old_stop != new_stop:
print(f" 最小止损 {round(min_stop_gap*100)}%间距约束: {old_stop}{new_stop} (趋势{'' if is_strong_trend else '普通'})")
# 港股附加:ATR波动率校验 — 止损距现价不得小于 1×ATR(14)
if is_hk_stock(code):
atr = calc_atr(code)
if atr and atr > 0:
min_atr_stop = round(price - atr, 2)
if new_stop > min_atr_stop:
old_stop_val = new_stop
new_stop = min_atr_stop
print(f" 港股ATR波动率校验({atr:.2f}): 止损 {old_stop_val}{new_stop} (1×ATR间距)")
# ----- 止盈设置 -----
if is_short_term_strong_trend and not is_new_entry:
# 短炒+强趋势:不止盈让利润跑
mtf_tp = mtf_adj.get("take_profit_reference", {})
if mtf_tp and mtf_tp.get("level", 0) > price * 1.2:
new_target = round(mtf_tp["level"], 2)
else:
new_target = 0 # 无多周期阻力时不编造止盈
print(f" 短炒强趋势不止盈: 止盈设为{new_target} (+{(new_target/price-1)*100:.0f}%)")
elif sr_resist and sr_resist > 0:
new_target = round(sr_resist, 2)
else:
new_target = 0 # 无技术面数据时不编造止盈
# ----- 风险回报比校验 -----
stop_distance = price - new_stop if price > new_stop else price * 0.02
target_distance = new_target - price if new_target > price else 0
# 1:2 检查
min_target_distance = stop_distance * 2.0
if target_distance < min_target_distance:
# 尝试更高的阻力位,但不超过下一个真实压力位
candidate_targets = []
if wr and wr > price and wr != sr_resist:
candidate_targets.append(wr)
if sr_resist and sr_resist > price:
candidate_targets.append(sr_resist)
# 检查有效区间,如果有更高的自然目标位
if effective_range and price < effective_range * 0.9:
candidate_targets.append(effective_range)
found = False
for level in candidate_targets:
if (level - price) >= min_target_distance:
new_target = level
found = True
break
# 如果仍然不满足,检查是否至少能到 1:1.5
min15_distance = stop_distance * 1.5
if not found:
for level in candidate_targets:
if (level - price) >= min15_distance:
new_target = level
found = True
break
# ----- 风险回报比最终计算 -----
risk = max(price - new_stop, price * 0.01)
reward = max(new_target - price, 0)
rr_ratio = reward / risk if risk > 0 else 0
# ----- 状态判断 -----
if is_deep_loss:
status = "updated"
action_note = "深套持有"
elif is_new_entry:
if rr_ratio < 1.5:
status = "review"
action_note = "⚠️盈亏比不足1:1.5,不建议买入"
elif rr_ratio < 2.0:
status = "updated"
action_note = "⚠️盈亏比偏低(1:{:.1f}),谨慎买入".format(rr_ratio)
else:
status = "updated"
action_note = ""
else:
if rr_ratio < 0.5:
status = "updated"
action_note = "⚠️盈亏比极低,关注"
elif rr_ratio < 1.5:
status = "updated"
action_note = "⚠️盈亏比偏低(1:{:.1f}),不建议加仓".format(rr_ratio)
else:
status = "updated"
action_note = ""
# 短炒+强趋势:在action_note追加标记
if is_short_term_strong_trend and not is_new_entry and not is_deep_loss:
extra_note = "短炒强趋势持" if "深套" not in action_note else ""
if extra_note:
action_note = f"{action_note} | {extra_note}" if action_note else extra_note
# ----- 买入区间(有盈亏比严格约束) -----
max_acceptable_entry = None # 最大可接受买入价(满足R/R约束)
if new_target and new_stop and new_target > new_stop and not is_deep_loss:
# 买入价的R/R约束:
# 要求 (target - entry) / (entry - stop) >= min_rr
# 即 entry <= (target + min_rr * stop) / (1 + min_rr)
min_rr = 1.0 # 至少1:1,才不亏
recommend_rr = 1.5 # 推荐1:1.5以上
max_for_recommend = (new_target + recommend_rr * new_stop) / (1 + recommend_rr)
max_for_neutral = (new_target + min_rr * new_stop) / (1 + min_rr)
if is_new_entry:
# 新买入:要求1:1.5+
max_acceptable_entry = max_for_recommend
else:
# 已持仓加仓:至少1:1
max_acceptable_entry = max_for_neutral
if is_new_entry:
# 新买入:买入区 = 弱支撑附近(不是当前价附近!)
# 只在价格跌到弱支撑附近时才推买入
entry_low = round(price * 0.98, 2)
entry_high = round(price * 1.02, 2)
if max_acceptable_entry and entry_high > max_acceptable_entry:
entry_high = round(max_acceptable_entry, 2)
# 确保买入区不小于1%
if entry_high - entry_low < price * 0.01:
if max_acceptable_entry and price <= max_acceptable_entry:
entry_low = round(max(price * 0.99, new_stop), 2)
entry_high = round(min(price * 1.01, max_acceptable_entry), 2)
elif ws and ws > 0 and wr and wr > 0 and not is_deep_loss:
# 已持仓正常:买入区 = 弱支撑~弱支撑上方5%(给合理回调空间)
# 上限不能低于成本价×0.95(保护已有持仓不被高位逼空)
entry_low = round(ws, 2)
entry_max = round(ws * 1.05, 2) # 比弱支撑高5%,有足够空间
# 如果当前价已远离买入区,保持买入区不变(不因价格涨了就收窄)
min_upper = round(cost * 0.95, 2) if cost > 0 else 0
if entry_max < min_upper:
entry_max = min_upper
if max_acceptable_entry:
entry_high = round(min(entry_max, max_acceptable_entry), 2)
else:
entry_high = entry_max
# 如果当前价已远离买入区(高于买入区上沿),禁止加仓推荐
if price > entry_high:
# 买入区锁定在弱支撑位,但标记为"价格远离"
pass
# 如果买入区过窄,标记但不扩展(加仓必须在支撑位)
if entry_high - entry_low < price * 0.005:
entry_low = round(ws * 0.995, 2)
entry_high = round(ws * 1.005, 2)
else:
entry_low = round(price * 0.90, 2)
entry_high = round(price * 1.05, 2)
# 买入区间稳定性保护:上边界单次变动不超过5%
if 'entry_high' in dir() and entry_high:
# 读取当前策略中已有的买入区上界,如果有且变化过大则限制
old_entry_high = None
if 'current_action' in dir() and current_action:
import re
m = re.search(r'买入区[\d.]+~([\d.]+)', current_action)
if m:
old_entry_high = float(m.group(1))
if old_entry_high and old_entry_high > 0:
max_change = old_entry_high * 0.95 # 单次最多下降5%
if entry_high < max_change:
entry_high = round(max_change, 2)
# ----- 买入时机信号 -----
volume_signal = vol.get("volume_signal", "")
candlestick_sentiment = candle.get("sentiment", "neutral")
timing_signal = "neutral"
if is_new_entry:
# 新买入时机
if volume_signal == "主动买盘占优" and candlestick_sentiment == "bullish":
timing_signal = "买入"
elif volume_signal == "主动卖盘占优":
timing_signal = "观望"
elif volume_signal == "买卖均衡" and ws and price <= ws * 1.03:
timing_signal = "买入"
elif candlestick_sentiment == "bullish":
timing_signal = "买入"
elif ws and price < ws * 1.02:
timing_signal = "关注"
else:
# 已持仓时机(用于加仓/减仓参考)
if is_short_term_strong_trend:
# 短炒+强趋势:强趋势持有,禁止加仓信号
timing_signal = "持有"
elif profit_pct > 5:
# 已盈利
if volume_signal == "主动买盘占优":
timing_signal = "持有"
elif volume_signal == "主动卖盘占优" and not is_new_entry:
timing_signal = "关注"
else:
timing_signal = "持有"
elif profit_pct > 0:
# 微盈
if volume_signal == "主动买盘占优":
timing_signal = "持有"
elif ws and price <= ws * 1.02:
timing_signal = "加仓"
else:
timing_signal = "持有"
else:
# 浮亏
if volume_signal == "主动卖盘占优" and ss and price <= ss * 1.03:
timing_signal = "关注"
elif volume_signal == "主动买盘占优" and sr_resist and price >= sr_resist * 0.97:
timing_signal = "关注"
elif volume_signal == "买卖均衡" and ws and price <= ws * 1.02:
timing_signal = "加仓"
else:
timing_signal = "持有"
# ----- 【v3.2新增】分类约束:弱势/深套禁止输出买入/加仓类信号 -----
if stock_category == "弱势" or is_deep_loss:
buy_signals = ["买入", "加仓", "可追"]
if any(s in timing_signal for s in buy_signals):
old_signal = timing_signal
timing_signal = "弱势持有" if stock_category == "弱势" else "深套持有"
print(f" 分类约束: {stock_category} 原信号\"{old_signal}\"\"{timing_signal}\"")
# ----- 构造 action 描述(供 cron prompt 使用) -----
action_parts = []
if profit_pct < -20:
action_parts.append("深套持有")
elif profit_pct < -10:
action_parts.append("持有观察")
elif profit_pct < 0:
action_parts.append("持有观察")
elif profit_pct < 5:
action_parts.append("盈利持有")
else:
action_parts.append("盈利良好")
if action_note:
action_parts.append(action_note)
if is_watchlist:
# 自选股(未入场):有止损参考+买入区,内部算RR需要止盈位
action_parts.append(f"目标参考{new_target}")
action_parts.append(f"止损参考{new_stop}")
action_parts.append(f"买入区{entry_low}~{entry_high}")
elif is_new_entry:
action_parts.append(f"{new_stop}")
action_parts.append(f"{new_target}")
action_parts.append(f"{entry_low}~{entry_high}")
else:
action_parts.append(f"止损{new_stop}")
action_parts.append(f"目标{new_target}")
action_parts.append(f"买入区{entry_low}~{entry_high}")
if timing_signal != "neutral":
action_parts.append(f"信号:{timing_signal}")
new_action = " | ".join(action_parts)
# 技术面快照
tech_snapshot = ""
if candle:
tech_snapshot = (f"形态:{candle.get('pattern','?')}/{candle.get('sentiment','?')} "
f"量价:{vol.get('volume_signal','?')} "
f"强撑:{ss} 弱撑:{ws} 弱压:{wr} 强压:{sr_resist}")
# 多周期快照(追加到 tech_snapshot
mtf_context = ""
if mtf_adj:
trend_align = mtf_adj.get("trend_alignment", "")
daily_mas = mtf_analysis.get("daily", {}).get("mas", {})
ma20 = daily_mas.get("ma20")
ma60 = daily_mas.get("ma60")
stop_ref = mtf_adj.get("stop_loss_reference", {})
take_ref = mtf_adj.get("take_profit_reference", {})
parts = []
if trend_align:
parts.append(trend_align)
if ma20:
parts.append(f"MA20={ma20}")
if ma60:
parts.append(f"MA60={ma60}")
if stop_ref:
parts.append(f"长撑:{stop_ref.get('source','?')}={stop_ref['level']}")
if take_ref:
parts.append(f"长压:{take_ref.get('source','?')}={take_ref['level']}")
mtf_context = " | ".join(parts)
now_str = datetime.now().strftime('%Y-%m-%d %H:%M')
return {
'stop_loss': new_stop,
'take_profit': new_target,
'entry_low': entry_low,
'entry_high': entry_high,
'action': new_action,
'status': status,
'tech_snapshot': tech_snapshot,
'timing_signal': timing_signal,
'rr_ratio': round(rr_ratio, 2),
'action_note': action_note,
'reassessed_at': now_str,
'multi_tf_context': mtf_context, # 多周期上下文
'stock_category': stock_category, # 股票分类:短炒/中短线/中长线/弱势/深套
'time_horizon': time_horizon, # 时间跨度
'position_advice': position_advice, # 仓位建议
}
def load_stock_news_sentiment(code):
"""加载小果消息面情感"""
try:
path = "/home/hmo/web-dashboard/data/xiaoguo_sentiment.json"
if not os.path.exists(path):
return {}
xg = json.load(open(path))
return xg.get("stocks", {}).get(code, {})
except Exception:
return {}
def load_fundamentals(code):
"""加载个股基本面"""
try:
path = "/home/hmo/web-dashboard/data/multi_tf_cache.json"
if not os.path.exists(path):
return {}
m = json.load(open(path))
return m.get(code, {}).get("fundamentals", {}) or {}
except Exception:
return {}
def _get_portfolio_risk_state():
"""读取 portfolio 组合风险状态(2026-06-23 引擎协调)"""
try:
# 数据一致性检查:警告多副本(2026-06-23 bugfix
_check_portfolio_consistency()
p = json.load(open('/home/hmo/web-dashboard/data/portfolio.json'))
pos_pct = p.get('position_pct', 0)
cash = p.get('cash', 0)
holdings = p.get('holdings', [])
weak_cnt = sum(1 for h in holdings if h.get('change_pct', 0) < -15)
total = len(holdings) or 1
weak_ratio = weak_cnt / total
return {
'position_pct': pos_pct,
'cash': cash,
'is_high_position': pos_pct > 80,
'is_very_high_position': pos_pct > 90,
'is_high_weak': weak_ratio > 0.35,
'weak_ratio': round(weak_ratio * 100),
'total_holdings': total,
}
except:
return {}
def _is_buy_signal(signal):
"""判断信号是否为买入/持有类(用于防洗盘)"""
if not signal:
return False
buy_keywords = ['买入', '持有', '加仓', '关注']
for kw in buy_keywords:
if kw in signal:
return True
return False
def _check_portfolio_consistency():
"""数据一致性检查:如果存在多份 portfolio.json 则报警(2026-06-23 bugfix"""
main = '/home/hmo/web-dashboard/data/portfolio.json'
main_cash = None
try:
import json
main_cash = json.load(open(main)).get('cash')
except Exception:
return
for path in [
'/home/hmo/data/portfolio.json',
'/home/hmo/projects/MoFin/data/portfolio.json',
'/home/hmo/web-dashboard.bak/data/portfolio.json',
]:
if os.path.exists(path):
try:
other = json.load(open(path))
if other.get('cash') != main_cash:
print(f"⚠️ 数据一致性: {os.path.realpath(path)} cash={other.get('cash')} ≠ 主文件 cash={main_cash} (需清理)", file=sys.stderr)
except Exception:
pass
def _check_contradiction(code, today_only=True):
"""反馈循环核——检查本股是否有刚卖出的记录
返回 dict or None:
- sold_reason: 'portfolio_trim'|'stop_loss'
- sold_at: 卖出日期
- days_ago: 卖出距今交易日数
- is_today: 是否今日卖出
- tag: 追加到信号的标注
"""
try:
from datetime import datetime, date
dec = json.load(open('/home/hmo/web-dashboard/data/decisions.json'))
for e in dec.get('decisions', []):
if e.get('code') != code:
continue
sold_at = e.get('sold_at', '')
if not sold_at:
return None
try:
sd = datetime.strptime(sold_at, '%Y-%m-%d').date()
td = date.today()
days = (td - sd).days
except:
return None
reason = e.get('sold_reason', 'portfolio_trim')
if reason == 'stop_loss':
tag = '止损离场(逻辑破坏,短期不关注)'
else:
tag = '组合减仓后关注(已清仓,等回踩确认)'
return {
'sold_reason': reason,
'sold_at': sold_at,
'days_ago': days,
'is_today': days == 0,
'tag': tag,
}
except:
return None
return None
def _get_sell_priority_list():
"""减仓优先级排序:深套>亏损>微盈>盈利(2026-06-23 反馈循环)
返回 [(code, name, change_pct, position_pct, priority_label), ...]
按卖出的优先顺序排列(最先应该卖的在最前)
"""
try:
p = json.load(open('/home/hmo/web-dashboard/data/portfolio.json'))
holdings = p.get('holdings', [])
ranked = []
for h in holdings:
chg = h.get('change_pct', 0)
pos = h.get('position_pct', 0)
if chg < -30:
label = '深套(>30%),优先减'
rank = 0
elif chg < -20:
label = '深套(>20%),优先减'
rank = 1
elif chg < -10:
label = '亏损,建议减'
rank = 2
elif chg < 0:
label = '微亏,可减'
rank = 3
elif chg < 10:
label = '微盈,持有'
rank = 4
else:
label = '盈利,最后减'
rank = 5
ranked.append((rank, h['code'], h.get('name',''), chg, pos, label))
ranked.sort(key=lambda x: (x[0], -x[4])) # 优先 rank, 其次仓位大优先
return [{'code':c,'name':n,'change_pct':chg,'position_pct':pos,'label':l}
for r,c,n,chg,pos,l in ranked]
except:
return []
def enrich_timing_signal(base_signal, macro_desc="", sector_note="",
profit_pct=0, stock_category="", is_new_entry=False,
fundamentals=None, news_sentiment=None,
timing_signal_override=None,
portfolio_context=None,
rr_ratio=0): # 2026-06-24 新参:盈亏比约束
"""多因子合成timing_signal——大盘+行业+基本面+技术+组合风险+盈亏比
返回 (enriched_signal, factors_list)
- enriched_signal: 可读的多因子信号描述
- factors_list: 各因子的摘要列表(用于后续显示)
"""
# 如果已手动设定,尊重手动
if timing_signal_override and timing_signal_override != "neutral":
return timing_signal_override, [timing_signal_override]
factors = []
# 1. 大盘因子
if "偏强" in macro_desc or "大涨" in macro_desc or "bullish" in macro_desc.lower():
macro_txt = "大盘偏强"
factors.append(macro_txt)
elif "偏弱" in macro_desc or "大跌" in macro_desc or "bearish" in macro_desc.lower():
macro_txt = "大盘偏弱"
factors.append(macro_txt)
elif macro_desc and macro_desc != "宏观未加载":
factors.append("大盘中性")
# 2. 行业因子
if sector_note:
# 把"行业X大跌3%+"简化为"行业偏弱""行业X大涨3%+"简化为"行业偏强"
if "大跌" in sector_note or "下跌" in sector_note:
factors.append("行业偏弱")
elif "大涨" in sector_note:
factors.append("行业偏强")
elif "上涨" in sector_note:
factors.append("行业偏强")
else:
factors.append("行业中性")
# 3. 基本面因子
if fundamentals:
pe = fundamentals.get("pe", 0)
eps = fundamentals.get("eps", 0)
profit_growth = fundamentals.get("profit_growth", fundamentals.get("yoy_profit", ""))
revenue_growth = fundamentals.get("revenue_growth", fundamentals.get("yoy_revenue", ""))
mcap = fundamentals.get("mcap_total", 0)
pe = pe or 0
eps = eps or 0
profit_growth_str = str(profit_growth or "")
revenue_growth_str = str(revenue_growth or "")
# 净利增长
for val in [profit_growth_str, revenue_growth_str]:
try:
v = float(val.replace("%", "").replace("+", ""))
if v > 50:
factors.append("净利增50%+")
break
elif v > 20:
factors.append(f"净利增{int(v)}%")
break
elif v < -20:
factors.append("净利降20%+")
break
except (ValueError, AttributeError):
continue
# PE估值
if 0 < pe < 15:
factors.append("低估值")
elif pe > 100 or pe < 0:
factors.append("高估值")
# 市值
if mcap and mcap > 5000:
factors.append("蓝筹")
# 4. 消息面因子(小果情感)
if news_sentiment:
ns = news_sentiment.get("sentiment", "")
nc = news_sentiment.get("confidence", 0)
if ns == "positive" and nc >= 0.7:
kws = news_sentiment.get("keywords", [])
kw_str = f"{'/'.join(kws[:3])}" if kws else ""
factors.append(f"消息偏多{kw_str}")
elif ns == "negative" and nc >= 0.7:
kws = news_sentiment.get("keywords", [])
kw_str = f"{'/'.join(kws[:3])}" if kws else ""
factors.append(f"消息偏空{kw_str}")
# 5. 技术面(基础信号)
if base_signal and base_signal != "neutral":
factors.append(base_signal)
# 5.5 组合风险因子(2026-06-23 双引擎协调)
if portfolio_context and not is_new_entry:
if portfolio_context.get('is_very_high_position'):
factors.append("组合仓位极重(>90%)")
elif portfolio_context.get('is_high_position'):
factors.append("组合仓位偏重(>80%)")
if portfolio_context.get('is_high_weak'):
factors.append(f"弱势占{portfolio_context.get('weak_ratio')}%")
elif portfolio_context and is_new_entry:
# 新买入推荐:注明组合上下文
if portfolio_context.get('is_high_position'):
factors.append(f"{portfolio_context.get('position_pct')}%现金有限")
elif portfolio_context.get('is_high_weak'):
factors.append("组合风险信号")
# 5.7 盈亏比因子(2026-06-24 新增——RR<1.5降级买入信号)
if rr_ratio > 0:
if rr_ratio < 1.5:
factors.append(f"RR{rr_ratio}过低")
elif rr_ratio >= 3:
factors.append(f"RR{rr_ratio}")
# 1.5~3之间:中性,不特别标注
# 如果没有足够因素,返回信号不充分
if not factors:
return "信号不充分", []
# 信号只应包含明确的买卖方向,不能从行业/大盘等上下文因子拼凑
# base_signal 存在且非 neutral → 用 base_signal
# 否则 → 信号不充分(不拿 factors[-1] 当信号)
if base_signal and base_signal != "neutral":
clean_signal = base_signal
else:
# 从 factors 中找第一个有效的操作方向信号
valid_direction = {"买入", "加仓", "观望", "持有", "关注", "信号不充分"}
signal_found = ""
for f in reversed(factors):
if f in valid_direction:
signal_found = f
break
clean_signal = signal_found if signal_found else "信号不充分"
# 6. RR约束降级(2026-06-24 新增)
# 买入/加仓信号但RR<1.5 → 降级为"信号不充分"
buy_signals = {"买入", "加仓"}
if clean_signal in buy_signals and 0 < rr_ratio < 1.5:
clean_signal = "信号不充分"
factors.append("RR过低降级")
return clean_signal, factors
def reassess_with_context(code, name, price, cost, shares, current_action,
volume_signal="", sentiment="neutral", is_watchlist=False):
"""reassess_strategy + 多因子信号合成(大盘+行业+技术)
为 per_stock_reassess 等单只场景提供一站式多因子分析
"""
result = reassess_strategy(
code, name, price, cost, shares,
current_action, volume_signal, sentiment, is_watchlist
)
if not result:
return result
# 加载宏观+行业+消息+基本面上下文
try:
macro_bias, macro_desc = load_macro_context()
market_ctx = load_market_context()
stock_sector_map = load_stock_sector_map()
sector_adj = compute_sector_adjustment(code, market_ctx, stock_sector_map)
sector_note = sector_adj.get("note", "")
news_sentiment = load_stock_news_sentiment(code)
fund = load_fundamentals(code)
except Exception:
macro_desc = ""
sector_note = ""
news_sentiment = {}
fund = {}
enriched, factors = enrich_timing_signal(
base_signal=result.get("timing_signal", ""),
macro_desc=macro_desc,
sector_note=sector_note,
profit_pct=(price - cost) / cost * 100 if cost else 0,
stock_category=result.get("stock_category", ""),
is_new_entry=is_watchlist,
fundamentals=fund,
news_sentiment=news_sentiment,
portfolio_context=_get_portfolio_risk_state(),
rr_ratio=result.get("rr_ratio", 0),
)
result["timing_signal"] = enriched
result["signal_factors"] = factors
# 6. 防洗盘:信号不要一天一翻(2026-06-23)
# 如果旧信号是买入/持有类,新信号是谨慎/等待类,但中期趋势未破→维持旧信号
try:
dec = json.load(open('/home/hmo/web-dashboard/data/decisions.json'))
for e in dec.get('decisions', []):
if e.get('code') == code:
old_signal = e.get('timing_signal', '')
if old_signal and _is_buy_signal(old_signal) and not _is_buy_signal(enriched):
# 中等趋势检查:MA5 > MA20 + 多周期看多
mtf = result.get('multi_tf_context', '')
if '看多' in mtf or '多头' in mtf:
try:
closes = [float(k.split()[2]) for k in mtf.split('|') if 'MA5' in k]
except:
closes = []
has_uptrend = 'MA5' in mtf and 'MA20' in mtf
if has_uptrend:
print(f" 防洗盘: {old_signal}→保持旧信号(中期趋势完整)")
result["timing_signal"] = f"{old_signal}(正常回调价稳)"
sf = result.get("signal_factors") or []
if "正常回调价稳" not in sf:
result["signal_factors"] = sf + ["正常回调价稳"]
break
except Exception as e:
print(f" 防洗盘跳过: {e}")
# 7. 反馈循环核:检查本股是否有刚卖出的记录(2026-06-23)
contradiction = _check_contradiction(code)
if contradiction and contradiction.get('is_today'):
# 今日刚卖出 → 不屏蔽信号,但必须自标注矛盾
print(f" 反馈循环: {contradiction.get('tag')} (sold_at={contradiction.get('sold_at')})")
if _is_buy_signal(result.get('timing_signal', '')):
result['action_note'] = contradiction['tag']
# 在 timing_signal 中追加反馈标注,供报告层可见
curr_signal = result.get('timing_signal', '')
if '⚠️' not in curr_signal:
result['timing_signal'] = f"⚠️{contradiction['tag']}|{curr_signal}"
elif contradiction:
# 非今日卖出但近期卖出 → 标注已清仓
print(f" 近期清仓: sold_at={contradiction.get('sold_at')} ({contradiction.get('days_ago')}日前)")
if _is_buy_signal(result.get('timing_signal', '')):
curr_signal = result.get('timing_signal', '')
if '已清仓' not in curr_signal:
result['timing_signal'] = f"已清仓,{curr_signal}"
# 重建 action 文本(同步多因子信号)
try:
if new_action_needs_refresh(result, {"source": "auto"}, price):
_refresh_action_text(result, price, name)
except Exception:
pass
return result
def new_action_needs_refresh(result, old_entry, price):
"""判断宏观/行业调整后是否需要刷新action文本"""
# 自选股和手动策略不做调整,不需要刷新
if old_entry.get("source") == "manual":
return False
return True
def _refresh_action_text(result, price, name):
"""根据调整后的止损/止盈重建action文本"""
sl = result.get("stop_loss", 0)
tp = result.get("take_profit", 0)
el = result.get("entry_low", 0)
eh = result.get("entry_high", 0)
ts = result.get("timing_signal", "")
an = result.get("action_note", "")
old_action = result.get("action", "")
# 保持原action的前缀(持有状态部分不变)
# action格式一般是: "状态 | 止损X | 目标Y | 买入区X~Y | 信号:Z"
parts = old_action.split(" | ")
new_parts = []
for p in parts:
p = p.strip()
# 替换止损数字
if p.startswith("止损") or p.startswith("止损参考"):
if sl:
p = f"止损{sl}" if "止损参考" not in old_action.split(" | ")[0] else f"止损参考{sl}"
# 替换目标/止盈数字
if p.startswith("目标") or p.startswith("止盈"):
if tp:
p = f"目标{tp}"
# 替换买入区数字
if "买入区" in p and "~" in p:
if el and eh:
p = f"买入区{el}~{eh}"
new_parts.append(p)
result["action"] = " | ".join(new_parts)
def check_sector_alerts(market_ctx, stock_sector_map, holdings, wl):
"""行业轮动主动预警:检测板块崩盘级别信号→查持仓→输出预警
返回 list of alerts: [{code, name, sector, chg, action}]
"""
alerts = []
if not market_ctx:
return alerts
sector_perf = market_ctx.get("sector_perf", {})
# 找出所有跌幅>3%的行业
crashing_sectors = {name: data for name, data in sector_perf.items()
if data.get("change", 0) <= -3}
if not crashing_sectors:
return alerts
# 构建 code→持仓信息 的映射
holding_map = {}
for h in holdings:
c = h.get("code", "")
if c:
holding_map[c] = {"name": h.get("name", c), "type": "持仓"}
for s in wl.get("stocks", []):
c = s.get("code", "")
if c and c not in holding_map:
holding_map[c] = {"name": s.get("name", c), "type": "自选"}
# 对每个暴跌行业,查持仓中是否有股票属于该行业
for sec_name, sec_data in sorted(crashing_sectors.items(),
key=lambda x: x[1].get("change", 0)):
chg = sec_data.get("change", 0)
for code, sectors in stock_sector_map.items():
if code in holding_map and sec_name in sectors:
info = holding_map[code]
alerts.append({
"code": code,
"name": info["name"],
"sector": sec_name,
"sector_change": chg,
"type": info["type"],
"action": f"行业{sec_name}{chg:+.1f}%{info['type']}需关注",
})
alerts.sort(key=lambda a: a["sector_change"])
return alerts
def regenerate_all(stdout=True):
"""全量重评所有持仓+自选策略"""
# 优先从 SQLite 读取
try:
from mofin_db import get_conn, query_holdings, query_watchlist
conn = get_conn()
holdings = query_holdings(conn)
wl_stocks = query_watchlist(conn)
conn.close()
pf = {"holdings": holdings}
wl = {"stocks": wl_stocks}
except Exception:
pf = safe_json_load(PORTFOLIO_PATH, {})
wl = safe_json_load(WATCHLIST_PATH, {})
all_stocks = {}
for item in pf.get("holdings", []):
code = item.get("code", "")
if code:
all_stocks[code] = {"source": "portfolio", "data": item}
for item in wl.get("stocks", []):
code = item.get("code", "")
if code and code not in all_stocks:
all_stocks[code] = {"source": "watchlist", "data": item}
total = len(all_stocks)
ok = 0
errors = 0
results = []
decisions = []
# 加载现有 decisions.json 以便追踪变更
decisions_path = "/home/hmo/web-dashboard/data/decisions.json"
try:
existing_decisions = {d["code"]: d for d in json.load(open(decisions_path)).get("decisions", []) if d.get("code")}
except:
existing_decisions = {}
# 加载宏观上下文(影响策略参数调整)
macro_bias, macro_desc = load_macro_context()
if stdout:
print(f" 宏观参考: {macro_desc} (bias={macro_bias})")
# 加载市场上下文 — 行业板块表现 + 大盘宽度(策略参数调整用)
market_ctx = load_market_context()
stock_sector_map = load_stock_sector_map()
market_breadth = market_ctx.get("breadth", 50)
market_mood = market_ctx.get("mood", "neutral")
if stdout:
sectors_found = sum(1 for c in all_stocks if stock_sector_map.get(c))
print(f" 市场参考: {market_mood} 上涨比{market_breadth}% 已匹配{sectors_found}/{total}只个股行业")
# 批量预取所有价格(一次API调用 vs 之前N次)
prices_map = batch_fetch_prices(list(all_stocks.keys()))
if stdout:
print(f" 批量获取价格: {len(prices_map)}/{total} 成功")
for code, info in sorted(all_stocks.items()):
stock = info["data"]
name = stock.get("name", code)
cost = stock.get("cost", 0) or 0
shares = stock.get("shares", 0) or 0
source = info["source"]
q = prices_map.get(code)
if not q or not q.get("price"):
results.append({"code": code, "name": name, "error": "腾讯API无数据"})
errors += 1
if stdout:
print(f"{name}({code}): 腾讯API无数据")
continue
price = q["price"]
profit_pct = (price - cost) / cost * 100 if cost else 0
current_action = stock.get("analysis", {}).get("action", "")
close_yest = q.get("close", 0)
sentiment = "neutral"
if close_yest and price > close_yest * 1.02:
sentiment = "bullish"
elif close_yest and price < close_yest * 0.98:
sentiment = "bearish"
try:
is_wl = (source == "watchlist")
result = reassess_strategy(
code, name, price, cost, shares,
current_action, volume_signal="中性", sentiment=sentiment,
is_watchlist=(source == "watchlist"),
)
# --- Manual param preservation: 用户手动策略永不覆盖 ---
old_entry = existing_decisions.get(code, {})
if old_entry.get("source") == "manual":
# 仅覆盖策略参数,技术分析/信号/价格照常保留
for key in ["entry_low", "entry_high", "stop_loss", "take_profit"]:
if key in old_entry and old_entry[key] is not None:
result[key] = old_entry[key]
# 重算盈亏比(基于手动参数)
manual_stop = result.get("stop_loss", 0) or 0
manual_target = result.get("take_profit", 0) or 0
risk = max(price - manual_stop, price * 0.01) if manual_stop > 0 else price * 0.01
reward = max(manual_target - price, 0) if manual_target > 0 else 0
result["rr_ratio"] = round(reward / risk, 2) if risk > 0 else 0
# 重建 action 文本(引用手动参数,不引用自动计算的)
profit_pct = (price - cost) / cost * 100 if cost else 0
manual_action_parts = []
if profit_pct < -20:
manual_action_parts.append("深套持有")
elif profit_pct < -10:
manual_action_parts.append("持有观察")
elif profit_pct < 0:
manual_action_parts.append("持有观察")
elif profit_pct < 5:
manual_action_parts.append("盈利持有")
else:
manual_action_parts.append("盈利良好")
if result.get("action_note"):
manual_action_parts.append(result["action_note"])
if is_wl:
if manual_stop > 0:
manual_action_parts.append(f"止损参考{manual_stop}")
manual_action_parts.append(f"买入区{result['entry_low']}~{result['entry_high']}")
else:
if manual_stop > 0:
manual_action_parts.append(f"止损{manual_stop}")
if manual_target > 0:
manual_action_parts.append(f"目标{manual_target}")
manual_action_parts.append(f"买入区{result['entry_low']}~{result['entry_high']}")
ts = result.get("timing_signal", "")
if ts and ts != "neutral":
manual_action_parts.append(f"信号:{ts}")
result["action"] = " | ".join(manual_action_parts)
result["status"] = "manual" # 标记为手动管理,变更追踪不受影响
if stdout:
print(f" [手动保留] {name}({code}) 策略参数未覆盖")
# 宏观偏差调整:收盘后重评时根据宏观方向微调止损/止盈
# 自选股不做止盈宏观调整(无持仓)
# 手动策略不做宏观偏差调整(尊重用户设定)
if macro_bias != 1.0 and not is_wl and old_entry.get("source") != "manual":
old_stop = result.get("stop_loss", 0)
old_target = result.get("take_profit", 0)
if macro_bias < 1.0 and old_stop > 0: # 宏观偏弱 → 收紧止损
# 止损上移(但保留最小3%间距)
adjusted_stop = round(old_stop * (1 + (1 - macro_bias) * 0.3), 2)
min_stop = round(price * 0.97, 2)
result["stop_loss"] = min(adjusted_stop, min_stop)
if old_target > 0:
result["take_profit"] = round(old_target * (1 - (1 - macro_bias) * 0.2), 2)
elif macro_bias > 1.0 and old_target > 0: # 宏观偏强 → 止盈上调让利润跑
result["take_profit"] = round(old_target * (1 + (macro_bias - 1) * 0.3), 2)
# 行业偏差调整:根据个股所在行业的市场表现微调止损/止盈
# 手动策略不做行业调整(尊重用户设定)
sector_adj = compute_sector_adjustment(code, market_ctx, stock_sector_map)
sector_note = sector_adj.get("note", "")
if sector_note and old_entry.get("source") != "manual":
old_stop = result.get("stop_loss", 0)
old_target = result.get("take_profit", 0)
stop_bias = sector_adj.get("stop_bias", 1.0)
target_bias = sector_adj.get("target_bias", 1.0)
if stop_bias != 1.0 and old_stop > 0:
# 行业偏差调整(在宏观调整之后叠加)
adjusted = round(old_stop * stop_bias, 2)
# 保留最小3%间距
min_stop = round(price * 0.97, 2)
result["stop_loss"] = min(adjusted, min_stop)
if target_bias != 1.0 and old_target > 0 and not is_wl:
result["take_profit"] = round(old_target * target_bias, 2)
# 加载消息面+基本面(逐个股)
news_sentiment = load_stock_news_sentiment(code)
fund = load_fundamentals(code)
# 多因子合成 timing_signal:大盘+行业+消息+基本面+技术
if old_entry.get("source") != "manual":
enriched, _ = enrich_timing_signal(
base_signal=result.get("timing_signal", ""),
macro_desc=macro_desc,
sector_note=sector_note,
profit_pct=profit_pct,
stock_category=result.get("stock_category", ""),
is_new_entry=(source == "watchlist"),
fundamentals=fund,
news_sentiment=news_sentiment,
rr_ratio=result.get("rr_ratio", 0),
)
result["timing_signal"] = enriched
# 在宏观/行业/多因子调整后重建 action 文本(同步调整后的止损/止盈数字)
if new_action_needs_refresh(result, old_entry, price):
_refresh_action_text(result, price, name)
extra = {
"rr_ratio": result.get("rr_ratio"),
"action_note": result.get("action_note", ""),
"timing_signal": result.get("timing_signal", ""),
}
analysis = {
"stop_loss": result["stop_loss"],
"take_profit": result["take_profit"],
"entry_low": result["entry_low"],
"entry_high": result["entry_high"],
"action": result["action"],
"tech_snapshot": result.get("tech_snapshot", ""),
"reassessed_at": result["reassessed_at"],
"status": result["status"],
**extra,
}
stock["analysis"] = analysis
# 同步 top-level 字段 → zone_breach/price_monitor 依赖这些字段
# 2026-06-24 bugfix: analysis 子对象有但顶层没有,导致新持仓的止损检测盲区)
stock["stop_loss"] = result.get("stop_loss", 0)
stock["take_profit"] = result.get("take_profit", 0)
stock["entry_low"] = result.get("entry_low", 0)
stock["entry_high"] = result.get("entry_high", 0)
# 同步 trigger 字段 -> price_monitor 依赖
sl = result.get("stop_loss", 0)
tp = result.get("take_profit", 0)
el = result.get("entry_low", 0)
eh = result.get("entry_high", 0)
trig = {}
if sl and float(sl) > 0:
trig["stop_loss"] = float(sl)
if el and eh and float(el) > 0 and float(eh) > 0:
trig["entry_zone"] = f"{float(el)}~{float(eh)}"
if tp and float(tp) > 0:
trig["take_profit_zone"] = f"0~{float(tp)}"
stock["trigger"] = trig
results.append({
"code": code, "name": name,
"price": price, "cost": cost,
"action": result["action"],
"stop_loss": result["stop_loss"],
"take_profit": result["take_profit"],
"rr_ratio": result["rr_ratio"],
})
ok += 1
if stdout:
rr_str = f" RR={result['rr_ratio']}" if "rr_ratio" in result else ""
print(f"{name}({code}) {price} {result['action']}{rr_str}")
# 记录所有股票的决策日志(含变更追踪)
status_display = result.get("status", "active")
# 构建行业上下文
sector_ctx_str = ""
sec_name = sector_adj.get("sector_name", "")
sec_chg = sector_adj.get("sector_change", 0)
if sec_name:
sector_ctx_str = f"行业{sec_name}{sec_chg:+.1f}%"
if sector_adj.get("note"):
# note 已包含大盘宽度信息
sector_ctx_str = sector_adj["note"]
elif market_breadth < 40:
# 无行业映射时至少记录大盘宽度
sector_ctx_str = f"大盘上涨比{market_breadth}%"
new_entry = {
"code": code, "name": name, "price": price, "cost": cost,
"shares": old_entry.get("shares", 0), # 保留持仓股数
"avg_price": old_entry.get("avg_price", 0), # 保留持仓均价
"action": result["action"],
"stop_loss": result.get("stop_loss"),
"entry_low": result["entry_low"],
"entry_high": result["entry_high"],
"tech_snapshot": result.get("tech_snapshot", ""),
"timing_signal": result.get("timing_signal", ""),
"rr_ratio": result.get("rr_ratio", 0),
"status": status_display,
"note": result.get("action_note", ""),
"timestamp": result["reassessed_at"],
"updated_at": result["reassessed_at"],
"type": "自选策略" if is_wl else "持仓策略",
"source": old_entry.get("source", "auto"), # manual/auto,继承旧标记
"sector_context": sector_ctx_str, # 市场上下文:行业表现+大盘宽度
"stock_category": result.get("stock_category", "中短线"), # 组合监测用
"position_advice": result.get("position_advice", "中等仓位"),
"time_horizon": result.get("time_horizon", "2周~3月"),
}
new_entry["trigger"] = trig
# created_at: 首次创建时设置,后续 preserve
old_entry = existing_decisions.get(code, {})
if old_entry.get("created_at"):
new_entry["created_at"] = old_entry["created_at"]
else:
new_entry["created_at"] = result["reassessed_at"]
# 自选股也写止盈位(用于RR校验),但标签用"目标参考"非"止盈"
new_entry["take_profit"] = result.get("take_profit")
# --- 变更追踪 ---
old_action = old_entry.get("action", "")
old_stop = old_entry.get("stop_loss")
old_target = old_entry.get("take_profit")
# 构建旧策略摘要和变更理由
update_reason = ""
changelog_entry = None
if old_action and old_action != result["action"]:
# 策略有变化 → 记录变更
old_summary = old_action
new_summary = result["action"]
# 判断触发原因
if abs(price - old_entry.get("price", price)) / max(price, 0.01) > 0.03:
trigger = f"价格变动({old_entry.get('price','?')}{price})"
elif result.get("timing_signal") and result["timing_signal"] != old_entry.get("timing_signal", ""):
trigger = f"技术信号变化: {result['timing_signal']}"
else:
trigger = "技术面重评"
# 格式化的变更理由(自选股只看止损,不看止盈)
diff_parts = []
if old_stop and result["stop_loss"] != old_stop:
diff_parts.append(f"止损{old_stop}{result['stop_loss']}")
if not is_wl and old_target and result.get("take_profit") and result["take_profit"] != old_target:
diff_parts.append(f"止盈{old_target}{result['take_profit']}")
if diff_parts:
update_reason = f"{trigger}: {', '.join(diff_parts)} | {result.get('tech_snapshot','')[:60]}"
else:
update_reason = f"{trigger}: 策略文字调整"
changelog_entry = {
"date": result["reassessed_at"],
"old_action": old_action,
"new_action": result["action"],
"reason": update_reason,
"trigger": trigger,
}
new_entry["updated_reason"] = update_reason
elif not old_action:
# 首次创建策略
update_reason = f"初始策略创建 | {result.get('tech_snapshot','')[:60]}"
changelog_entry = {
"date": result["reassessed_at"],
"old_action": "",
"new_action": result["action"],
"reason": update_reason,
"trigger": "初始创建",
}
# 合并changelog
old_changelog = old_entry.get("changelog", []) if old_entry else []
if changelog_entry:
new_entry["changelog"] = old_changelog + [changelog_entry]
else:
new_entry["changelog"] = old_changelog
# 保留执行记录
if old_entry and old_entry.get("execution"):
new_entry["execution"] = old_entry["execution"]
elif stock.get("analysis", {}).get("status") == "executing":
new_entry["execution"] = {
"status": "executing",
"entry_price": cost if cost else 0,
"shares": shares,
"notes": "",
}
# --- 自动标记 current_recommend ---
# 只在真正执行中的持仓才自动推荐:execution.status 为 executing 或 partial_exit
exec_status = old_entry.get("execution", {}).get("status", "") if old_entry else ""
is_active = exec_status in ("executing", "partial_exit")
profit_pct = (price - cost) / cost * 100 if cost else 0
is_deep_loss_stock = profit_pct < -20
rr = result.get("rr_ratio", 0)
ts = result.get("timing_signal", "")
note = result.get("action_note", "")
# 计算是否在/接近买入区
entry_low_val = result.get("entry_low", 0)
entry_high_val = result.get("entry_high", 0)
in_buy_zone = (entry_low_val > 0 and entry_high_val > 0 and
entry_low_val <= price <= entry_high_val)
near_buy_zone_low = (entry_low_val > 0 and
price >= entry_low_val * 0.98 and
price <= entry_high_val)
# 推荐条件:必须是执行中的持仓 + 基本面条件达标
is_recommendable = (
is_active
and not is_deep_loss_stock
and rr >= 1.5
and ts != "neutral"
and "不建议" not in note
)
if is_recommendable:
new_entry["tag"] = "current_recommend"
else:
# 不清除 active_manual(用户手动标记),只清除自动推荐的
old_tag = old_entry.get("tag", "") if old_entry else ""
if old_tag != "active_manual":
new_entry.pop("tag", None)
decisions.append(new_entry)
except Exception as e:
results.append({"code": code, "name": name, "error": str(e)})
errors += 1
if stdout:
print(f"{name}({code}): {e}")
# 写回数据文件 — 保留现有字段(现金、总资产等)不丢
try:
existing_pf = json.load(open(PORTFOLIO_PATH))
except Exception:
existing_pf = {}
# 保留 price/change_pct — price_monitor 维护的实时价,regenerate_all 不应清除
_existing_holdings_map = {}
for _h in existing_pf.get('holdings', []):
if _h.get('code'):
_existing_holdings_map[_h['code']] = _h
_new_holdings = pf.get("holdings", [])
for _h in _new_holdings:
_code = _h.get('code')
if _code and _code in _existing_holdings_map:
_old = _existing_holdings_map[_code]
_h['price'] = _old.get('price', 0)
_h['change_pct'] = _old.get('change_pct', 0)
existing_pf["holdings"] = _new_holdings
existing_pf["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M')
json.dump(existing_pf, open(PORTFOLIO_PATH, "w"), ensure_ascii=False, indent=2)
json.dump(wl, open(WATCHLIST_PATH, "w"), ensure_ascii=False, indent=2)
# 写 decisions.json
decisions_path = "/home/hmo/web-dashboard/data/decisions.json"
decisions_data = {
"decisions": decisions, # 全部保留
"total": len(decisions),
"regenerated_at": datetime.now().strftime('%Y-%m-%d %H:%M'),
}
json.dump(decisions_data, open(decisions_path, "w"), ensure_ascii=False, indent=2)
# 记录策略→提示词版本关联
if HAS_PROMPT_TRACKING:
try:
for d in decisions:
if d.get("code") and d.get("action"):
record_strategy_generation(
d["code"], d.get("name", ""), d.get("action", "")
)
except Exception as e:
if stdout:
print(f" ⚠️ 提示词版本追踪失败: {e}", file=sys.stderr)
# 刷新多周期缓存到磁盘
try:
import multi_timeframe as _mtf
_mtf.flush_mtf_cache()
except Exception:
pass
summary = {"total": total, "ok": ok, "errors": errors}
if stdout:
print(f"\n✅ 全量重评完成: {ok}/{total}成功, {errors}错误")
return summary
if __name__ == "__main__":
regenerate_all()