#!/usr/bin/env python3 """策略生命周期管理系统 — 技术面驱动版本 v2 核心原则: 1. 止损放在合理的技术位,不拍数字 2. 新买入推荐:止损=弱支撑(约3%跌幅),止盈=强压力,盈亏比≥2:1 3. 已持仓:止损=强支撑(约5-8%跌幅),目标=强压力 4. 买入区间:弱支撑~弱压力之间 5. 买入时机:量价齐跌不买,缩量至支撑买,量价齐升追买 """ import json import urllib.request import os import sys import re from datetime import datetime import technical_analysis as ta import multi_timeframe as mtf from mo_data import read_portfolio, read_decisions, read_watchlist def is_hk_stock(code): """判断是否港股(港股代码5位,A股6位带前导零)""" return len(str(code)) <= 5 def calc_atr(code, period=14): """从腾讯API K线数据计算ATR(period),返回ATR值或None""" try: url = f"http://ifzq.gtimg.cn/appstock/app/fqkline/get?param=hk{code},day,,,60,qfq" req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) resp = urllib.request.urlopen(req, timeout=5).read().decode('utf-8') data = json.loads(resp) bars = data.get('data', {}).get(f'hk{code}', {}).get('day', []) if len(bars) < period + 1: return None trs = [] for i in range(1, min(len(bars), period + 1)): try: high = float(bars[i][2]) low = float(bars[i][3]) prev_close = float(bars[i-1][4]) if len(bars[i-1]) > 4 else float(bars[i-1][3]) tr = max(high - low, abs(high - prev_close), abs(low - prev_close)) trs.append(tr) except (ValueError, IndexError): continue if not trs: return None return round(sum(trs) / len(trs), 2) except Exception: return None # 提示词版本追踪 try: from prompt_manager.tracking import record_strategy_generation HAS_PROMPT_TRACKING = True except ImportError: HAS_PROMPT_TRACKING = False PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json" WATCHLIST_PATH = "/home/hmo/web-dashboard/data/watchlist.json" def safe_json_load(path, default=None): """安全加载 JSON,遇到坏数据自动修复""" if not os.path.exists(path): return default if default is not None else {} try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except json.JSONDecodeError: # 尝试修复:替换字符串内未转义的换行符,去多余括号 with open(path, "r", encoding="utf-8") as f: raw = f.read() fixed = raw # 修复1: 字符串内未转义的换行 -> \\n result = [] in_str = False for ch in fixed: if ch == '"': in_str = not in_str result.append(ch) elif in_str and ch in '\n\r': result.append('\\n') else: result.append(ch) fixed = ''.join(result) # 修复2: 去掉多余的尾部括号 fixed = fixed.rstrip('}') # 补回正确的闭合 if not fixed.endswith('}'): fixed += '}' try: return json.loads(fixed) except json.JSONDecodeError as e: print(f"[WARN] watchlist.json 自动修复失败: {e}", file=sys.stderr) return default if default is not None else {} KNOWLEDGE_LOG = "/home/hmo/Obsidian/knowledge/finance/analyst-knowledge-log.md" MACRO_CONTEXT_PATH = "/home/hmo/web-dashboard/data/macro_context.json" MARKET_CONTEXT_PATH = "/home/hmo/web-dashboard/data/market.json" STOCK_SECTOR_MAP_PATH = "/home/hmo/web-dashboard/data/stock_sector_map.json" def load_stock_sector_map(): """读取个股归属行业映射 stock_sector_map.json 格式: {code: [sector1, sector2, ...]} 跳过 _note, _created_at 等元数据键。 """ # 优先从 SQLite 读取 try: from mofin_db import get_conn, query_sector_stocks conn = get_conn() # 从 stock_sectors 表反向构建 code→[sectors] 映射 rows = conn.execute("SELECT code, sector_name FROM stock_sectors ORDER BY code").fetchall() conn.close() code_to_sectors = {} for code, sector in rows: if code not in code_to_sectors: code_to_sectors[code] = [] code_to_sectors[code].append(sector) return code_to_sectors except Exception: pass try: with open(STOCK_SECTOR_MAP_PATH) as f: data = json.load(f) code_to_sectors = {} for key, value in data.items(): if key.startswith("_"): continue if isinstance(value, list): code_to_sectors[key] = value return code_to_sectors except Exception: return {} def load_market_context(): """读取市场上下文,优先 SQLite,回退 market.json""" # 优先从 SQLite 读取 try: from mofin_db import get_conn, query_latest_market conn = get_conn() market = query_latest_market(conn) conn.close() if market and market.get("sectors"): sector_perf = {} for s in market["sectors"]: name = s.get("name", "") if name: sector_perf[name] = { "change": s.get("change_pct", 0), "up_count": s.get("up_count", 0), "down_count": s.get("down_count", 0), "net_inflow": s.get("net_inflow", 0), "lead_stock": s.get("lead_stock", ""), "lead_stock_change": s.get("lead_stock_change", 0), } return { "sector_perf": sector_perf, "breadth": market.get("up_ratio", 50), "mood": market.get("mood", "neutral"), "top_gainers": {g["name"]: g["change_pct"] for g in market.get("top_gainers", [])}, "top_losers": {g["name"]: g["change_pct"] for g in market.get("top_losers", [])}, "total_sectors": len(market["sectors"]), "market_timestamp": market.get("timestamp", ""), } except Exception: pass try: with open(MARKET_CONTEXT_PATH) as f: market = json.load(f) sectors = market.get("sectors", []) sector_perf = {} for s in sectors: name = s.get("name", "") if name: sector_perf[name] = { "change": s.get("change", 0), "up_count": s.get("up_count", 0), "down_count": s.get("down_count", 0), "net_inflow": s.get("net_inflow", 0), "lead_stock": s.get("lead_stock", ""), "lead_stock_change": s.get("lead_stock_change", 0), } top_gainers = {s.get("name", ""): s.get("change", 0) for s in market.get("top_gainers", [])} top_losers = {s.get("name", ""): s.get("change", 0) for s in market.get("top_losers", [])} return { "sector_perf": sector_perf, "breadth": market.get("up_ratio", 50), "mood": market.get("mood", "neutral"), "top_gainers": top_gainers, "top_losers": top_losers, "total_sectors": market.get("total_sectors", 0), "market_timestamp": market.get("timestamp", ""), } except Exception: return { "sector_perf": {}, "breadth": 50, "mood": "neutral", "top_gainers": {}, "top_losers": {}, "total_sectors": 0, "market_timestamp": "", } def compute_sector_adjustment(code, market_ctx, stock_sector_map): """根据个股所属行业的市场表现+小果情感,返回调整系数 返回 dict: stop_bias: 止损调整系数(<1.0收紧, >1.0放宽) target_bias: 止盈调整系数 note: 行业背景一句话 sector_name: 匹配到的行业名称 sector_change: 行业涨跌幅 """ # 默认无调整 adj = {"stop_bias": 1.0, "target_bias": 1.0, "note": "", "sector_name": "", "sector_change": 0} sectors_for_code = stock_sector_map.get(code, []) if not sectors_for_code: return adj sector_perf = market_ctx.get("sector_perf", {}) breadth = market_ctx.get("breadth", 50) # 找第一个能匹配到的行业 for sec in sectors_for_code: if sec in sector_perf: perf = sector_perf[sec] chg = perf.get("change", 0) adj["sector_name"] = sec adj["sector_change"] = chg # 行业暴跌 > 3% if chg <= -3: adj["stop_bias"] = 0.92 # 止损收紧8% adj["target_bias"] = 0.90 # 止盈下调10% adj["note"] = f"行业{sec}大跌{chg:+.1f}%,收紧止损" # 行业大跌 1~3% elif chg <= -1: adj["stop_bias"] = 0.96 adj["target_bias"] = 0.95 adj["note"] = f"行业{sec}下跌{chg:+.1f}%,适度防御" # 行业大涨 > 3% elif chg >= 3: adj["stop_bias"] = 1.05 # 止损放宽5%(给趋势空间) adj["target_bias"] = 1.03 adj["note"] = f"行业{sec}大涨{chg:+.1f}%,可适度积极" # 行业上涨 1~3% elif chg >= 1: adj["stop_bias"] = 1.02 adj["note"] = f"行业{sec}上涨{chg:+.1f}%,正常" else: adj["note"] = f"行业{sec}{chg:+.1f}%,中性" break # 尝试处理命名差异:market.json中的行业名可能多了"板块"后缀 for market_sec_name in sector_perf: if sec in market_sec_name or market_sec_name in sec: perf = sector_perf[market_sec_name] chg = perf.get("change", 0) adj["sector_name"] = market_sec_name adj["sector_change"] = chg if chg <= -3: adj["stop_bias"] = 0.92 adj["target_bias"] = 0.90 adj["note"] = f"行业{market_sec_name}大跌{chg:+.1f}%,收紧止损" elif chg <= -1: adj["stop_bias"] = 0.96 adj["target_bias"] = 0.95 adj["note"] = f"行业{market_sec_name}下跌{chg:+.1f}%,适度防御" elif chg >= 3: adj["stop_bias"] = 1.05 adj["target_bias"] = 1.03 adj["note"] = f"行业{market_sec_name}大涨{chg:+.1f}%,可适度积极" elif chg >= 1: adj["stop_bias"] = 1.02 adj["note"] = f"行业{market_sec_name}上涨{chg:+.1f}%,正常" else: adj["note"] = f"行业{market_sec_name}{chg:+.1f}%,中性" break # 如果breath<30% (大盘极弱),再加一层收紧 if breadth < 30: adj["stop_bias"] *= 0.97 # 再收紧3% breadth_note = "大盘仅{}%个股上涨".format(int(breadth)) adj["note"] = (adj["note"] + " | " + breadth_note) if adj["note"] else breadth_note elif breadth < 40: adj["stop_bias"] *= 0.99 breadth_note = "大盘偏弱({}%上涨)".format(int(breadth)) adj["note"] = (adj["note"] + " | " + breadth_note) if adj["note"] else breadth_note # 小果情感约束:利空置信度>80%时收紧止损 try: xiaoguo_path = "/home/hmo/web-dashboard/data/xiaoguo_sentiment.json" if os.path.exists(xiaoguo_path): xg = json.load(open(xiaoguo_path)) stock_sentiment = xg.get("stocks", {}).get(code, {}) if stock_sentiment: sentiment = stock_sentiment.get("sentiment", "") confidence = stock_sentiment.get("confidence", 0) summary = stock_sentiment.get("summary", "") if sentiment == "negative" and confidence > 0.8: adj["stop_bias"] = min(adj["stop_bias"], 0.95) adj["note"] += f" | 小果利空{confidence:.0%}:{summary[:30]}" except Exception: pass return adj def load_macro_context(): """读取宏观上下文,返回 (bias, desc),优先 DB,回退 JSON""" try: import sqlite3 from pathlib import Path conn = sqlite3.connect(str(Path(__file__).parent.parent / "data" / "mofin.db")) row = conn.execute( "SELECT indices, structure FROM macro_context_log " "WHERE has_valid_data=1 ORDER BY created_at DESC LIMIT 1" ).fetchone() conn.close() if row: indices = json.loads(row[0]) if row[0] else {} structure = json.loads(row[1]) if row[1] else {} overall = structure.get("overall", "neutral") desc = structure.get("description", "") else: raise ValueError("no db data") except Exception: try: with open(MACRO_CONTEXT_PATH) as f: ctx = json.load(f) overall = ctx.get("structure", {}).get("overall", "neutral") desc = ctx.get("structure", {}).get("description", "") except Exception: return 1.0, "宏观未加载" if "bearish" in overall: return 0.8, f"宏观{desc}" elif overall == "bullish": return 1.05, f"宏观{desc}" elif overall == "strong_bullish": return 1.1, f"宏观{desc}" else: return 1.0, f"宏观{desc}" def batch_fetch_prices(codes): """批量获取实时价格,合并为一次API调用(自动分批,每批15只)""" if not codes: return {} # 分批处理,避免单次请求过大导致超时 batch_size = 15 all_results = {} for batch_start in range(0, len(codes), batch_size): batch = codes[batch_start:batch_start + batch_size] symbols = [] code_map = {} for raw_code in batch: raw_code = str(raw_code).split('_')[0] if not raw_code: continue if len(raw_code) == 5 and raw_code.isdigit(): prefix = "hk" elif raw_code.startswith(("6", "5")): prefix = "sh" else: prefix = "sz" sym = f"{prefix}{raw_code}" symbols.append(sym) code_map[sym] = raw_code if not symbols: continue url = f"http://qt.gtimg.cn/q={','.join(symbols)}" max_retries = 2 for attempt in range(max_retries + 1): try: r = urllib.request.urlopen(url, timeout=10) text = r.read().decode("gbk") except Exception as e: if attempt < max_retries: continue print(f" batch_fetch_prices error: {e}", file=sys.stderr) continue for line in text.strip().split("\n"): line = line.strip() if not line or "=" not in line: continue try: sym = line.split("=", 1)[0].strip().lstrip("v_") raw_value = line.split("=", 1)[1].strip().strip('"').strip(";") fields = raw_value.split("~") if len(fields) < 35: continue orig_code = code_map.get(sym) if not orig_code: continue def f(i): try: return float(fields[i]) if fields[i].strip() else 0.0 except: return 0.0 all_results[orig_code] = { "price": f(3), "close": f(4), "high": f(33), "low": f(34), "code": orig_code, } except Exception: continue break # Success - break retry loop return all_results def get_price_tencent(code): """获取实时价格,港股转CNY统一存CNY""" try: from mo_models import to_cny, is_hk_stock except ImportError: to_cny = lambda v, r=None: v is_hk_stock = lambda c: len(str(c).strip()) == 5 and str(c).strip().isdigit() try: raw_code = code.split('_')[0] if not raw_code: return None if is_hk_stock(raw_code): prefix = "hk" elif raw_code.startswith("6") or raw_code.startswith("5"): prefix = "sh" else: prefix = "sz" url = f"http://qt.gtimg.cn/q={prefix}{raw_code}" r = urllib.request.urlopen(url, timeout=5) fields = r.read().decode("gbk").split('"')[1].split("~") def f(i): try: return float(fields[i]) if fields[i].strip() else 0.0 except: return 0.0 price = f(3) if is_hk_stock(raw_code) and price > 0: price = to_cny(price) return { "price": price, "close": f(4), "high": f(33), "low": f(34), "code": raw_code, } except Exception as e: print(f" get_price error {code}: {e}", file=sys.stderr) return None def reassess_strategy(code, name, price, cost, shares, current_action, volume_signal="", sentiment="neutral", is_watchlist=False): """根据技术分析重评策略""" tech = ta.full_analysis(code) if tech and "support_resistance" in tech: sr = tech["support_resistance"] candle = tech.get("candlestick", {}) vol = tech.get("volume", {}) ss = sr.get("strong_support") ws = sr.get("weak_support") wr = sr.get("weak_resist") sr_resist = sr.get("strong_resist") pivot = sr.get("pivot") effective_range = sr.get("effective_range") print(f" TECH: 强撑={ss} 弱撑={ws} 枢轴={pivot} 弱压={wr} 强压={sr_resist} 有效区间={effective_range}") else: print(f" ⚠️ 技术分析不可用", file=sys.stderr) ss = ws = wr = sr_resist = pivot = None candle = {} vol = {} # ----- 多周期技术分析(周线/月线/均线) ----- mtf_analysis = {} mtf_adj = {} try: mtf_result = mtf.full_multi_tf_analysis(code) if mtf_result.get("daily") and mtf_result["daily"].get("count", 0) >= 5: mtf_analysis = mtf_result mtf_adj = mtf_result.get("strategy_adjustment", {}) daily_mas = mtf_result.get("daily", {}).get("mas", {}) weekly = mtf_result.get("weekly", {}) monthly = mtf_result.get("monthly", {}) trend_align = mtf_adj.get("trend_alignment", "未知") print(f" 多周期: {trend_align} | " f"MA5={daily_mas.get('ma5','?')} MA20={daily_mas.get('ma20','?')} MA60={daily_mas.get('ma60','?')} | " f"周线{weekly.get('trend',{}).get('description','?')} 月线{monthly.get('trend',{}).get('description','?')}") except Exception as e: print(f" 多周期分析失败: {e}", file=sys.stderr) profit_pct = (price - cost) / cost * 100 if cost else 0 is_new_entry = (cost == 0) or (shares == 0) is_deep_loss = profit_pct < -20 # ----- 股票分类(短炒/中短线/中长线/弱势/深套) ----- stock_category = "中短线" time_horizon = "2周~3月" position_advice = "中等仓位" try: mtf_cache = json.load(open("/home/hmo/web-dashboard/data/multi_tf_cache.json")) stock_data = mtf_cache.get(code, {}) daily_klines = stock_data.get("daily", []) fund = stock_data.get("fundamentals", {}) closes = [d["close"] for d in daily_klines] if daily_klines else [] if len(closes) >= 10: cur = closes[-1] ma20 = sum(closes[-20:])/20 if len(closes)>=20 else 0 ma60 = sum(closes[-60:])/60 if len(closes)>=60 else 0 highs = [d["high"] for d in daily_klines[-20:]] lows = [d["low"] for d in daily_klines[-20:]] volatility = ((max(highs)-min(lows))/min(lows)*100) if min(lows)>0 else 0 pe = fund.get("pe") or 0 eps = fund.get("eps") or 0 mcap = fund.get("mcap_total") or 0 is_high_vol = volatility > 30 is_high_pe = pe > 100 or pe < 0 is_value = 0 < pe < 20 and eps > 0.5 if is_deep_loss: stock_category = "深套" time_horizon = "长期" position_advice = "不补不割" elif is_high_vol and is_high_pe: stock_category = "短炒" time_horizon = "数日~2周" position_advice = "小仓快进快出" elif cur < ma20 and cur < ma60 and ma20 > 0: stock_category = "弱势" time_horizon = "观望" position_advice = "减仓或观望" elif (is_value or mcap > 1000) and cur > ma20: stock_category = "中长线" time_horizon = "数月~1年" position_advice = "正常配置" elif volatility > 20: stock_category = "中短线" time_horizon = "2~6周" position_advice = "中等仓位" except Exception: pass print(f" 分类: {stock_category} | {time_horizon} | {position_advice}") # ----- 短炒+强趋势检测:短炒分类但多周期多头时用移动止损代替弱支撑止损 ----- is_short_term_strong_trend = False if stock_category == "短炒": trend_align = mtf_adj.get("trend_alignment", "") strong_trend_indicators = ["多周期看多", "多周期多头", "上升"] if any(ind in trend_align for ind in strong_trend_indicators): is_short_term_strong_trend = True print(f" ⚡ 短炒+强趋势检测: 趋势={trend_align} → 启用移动止损, 不止盈") position_advice = "小仓强趋势让利润跑" # ----- 止损设置(含最小距离3%保护) ----- if is_new_entry: # 新买入推荐:止损 = 弱支撑(约2-3%跌幅,合理可控) if ws and ws > 0: new_stop = round(ws, 2) else: new_stop = round(price * 0.96, 2) elif is_deep_loss: # 深套:止损 = 强支撑再下移(不轻易割) if ss and ss > 0: new_stop = round(min(ss, price * 0.85), 2) else: new_stop = round(price * 0.85, 2) else: # 已持仓正常:止损 = 强支撑 if is_short_term_strong_trend: # 短炒+强趋势:用移动止损(距现价-5%),不止盈让利润跑 trailing_sl = round(max(ws or 0, price * 0.95), 2) if ws else round(price * 0.95, 2) new_stop = trailing_sl print(f" 短炒强趋势移动止损: {new_stop} (距现价-{(1-new_stop/price)*100:.1f}%)") elif ss and ss > 0: new_stop = round(ss, 2) else: new_stop = round(price * 0.88, 2) # 已盈利仓位(>5%):用较紧的移动止损保护利润,但不超过成本线 if profit_pct > 5 and not is_new_entry and not is_deep_loss: # 取 max(弱支撑, 成本线, 当前价×0.95) 作为止损 cost_protect = cost if cost > 0 else 0 trailing_stop = round(max(ws or 0, cost_protect, price * 0.95), 2) if trailing_stop > new_stop: new_stop = trailing_stop print(f" 已启用移动止损: {new_stop} (保护+{profit_pct:.1f}%利润)", file=sys.stderr) # 最小止损距离 —— 随趋势强度调整(2026-06-23 震度保护规则) # 强趋势(多周期看多 + MA多头排列):最小1.5%下行空间 # 普通/弱势:最小3%下行空间 is_strong_trend = False trend_align = mtf_adj.get("trend_alignment", "") strong_trend_indicators = ["多周期看多", "多周期多头", "上升"] try: if any(ind in trend_align for ind in strong_trend_indicators) and ma20 > ma60 and cur >= ma20: is_strong_trend = True except (NameError, TypeError): pass # ma20/ma60/cur may be unbound if MTF data insufficient if is_strong_trend: min_stop_gap = 0.015 # 1.5% else: min_stop_gap = 0.03 # 3% min_stop = round(price * (1 - min_stop_gap), 2) if new_stop > min_stop and not is_deep_loss: old_stop = new_stop new_stop = min_stop if old_stop != new_stop: print(f" 最小止损 {round(min_stop_gap*100)}%间距约束: {old_stop}→{new_stop} (趋势{'强' if is_strong_trend else '普通'})") # 港股附加:ATR波动率校验 — 止损距现价不得小于 1×ATR(14) if is_hk_stock(code): atr = calc_atr(code) if atr and atr > 0: min_atr_stop = round(price - atr, 2) if new_stop > min_atr_stop: old_stop_val = new_stop new_stop = min_atr_stop print(f" 港股ATR波动率校验({atr:.2f}): 止损 {old_stop_val}→{new_stop} (1×ATR间距)") # ----- 止盈设置 ----- if is_short_term_strong_trend and not is_new_entry: # 短炒+强趋势:不止盈让利润跑 mtf_tp = mtf_adj.get("take_profit_reference", {}) if mtf_tp and mtf_tp.get("level", 0) > price * 1.2: new_target = round(mtf_tp["level"], 2) else: new_target = 0 # 无多周期阻力时不编造止盈 print(f" 短炒强趋势不止盈: 止盈设为{new_target} (+{(new_target/price-1)*100:.0f}%)") elif sr_resist and sr_resist > 0: new_target = round(sr_resist, 2) else: new_target = 0 # 无技术面数据时不编造止盈 # ----- 风险回报比校验 ----- stop_distance = price - new_stop if price > new_stop else price * 0.02 target_distance = new_target - price if new_target > price else 0 # 1:2 检查 min_target_distance = stop_distance * 2.0 if target_distance < min_target_distance: # 尝试更高的阻力位,但不超过下一个真实压力位 candidate_targets = [] if wr and wr > price and wr != sr_resist: candidate_targets.append(wr) if sr_resist and sr_resist > price: candidate_targets.append(sr_resist) # 检查有效区间,如果有更高的自然目标位 if effective_range and price < effective_range * 0.9: candidate_targets.append(effective_range) found = False for level in candidate_targets: if (level - price) >= min_target_distance: new_target = level found = True break # 如果仍然不满足,检查是否至少能到 1:1.5 min15_distance = stop_distance * 1.5 if not found: for level in candidate_targets: if (level - price) >= min15_distance: new_target = level found = True break # ----- 风险回报比最终计算 ----- risk = max(price - new_stop, price * 0.01) reward = max(new_target - price, 0) rr_ratio = reward / risk if risk > 0 else 0 # ----- 状态判断 ----- if is_deep_loss: status = "updated" action_note = "深套持有" elif is_new_entry: if rr_ratio < 1.5: status = "review" action_note = "⚠️盈亏比不足1:1.5,不建议买入" elif rr_ratio < 2.0: status = "updated" action_note = "⚠️盈亏比偏低(1:{:.1f}),谨慎买入".format(rr_ratio) else: status = "updated" action_note = "" else: if rr_ratio < 0.5: status = "updated" action_note = "⚠️盈亏比极低,关注" elif rr_ratio < 1.5: status = "updated" action_note = "⚠️盈亏比偏低(1:{:.1f}),不建议加仓".format(rr_ratio) else: status = "updated" action_note = "" # 短炒+强趋势:在action_note追加标记 if is_short_term_strong_trend and not is_new_entry and not is_deep_loss: extra_note = "短炒强趋势持" if "深套" not in action_note else "" if extra_note: action_note = f"{action_note} | {extra_note}" if action_note else extra_note # ----- 买入区间(有盈亏比严格约束) ----- max_acceptable_entry = None # 最大可接受买入价(满足R/R约束) if new_target and new_stop and new_target > new_stop and not is_deep_loss: # 买入价的R/R约束: # 要求 (target - entry) / (entry - stop) >= min_rr # 即 entry <= (target + min_rr * stop) / (1 + min_rr) min_rr = 1.0 # 至少1:1,才不亏 recommend_rr = 1.5 # 推荐1:1.5以上 max_for_recommend = (new_target + recommend_rr * new_stop) / (1 + recommend_rr) max_for_neutral = (new_target + min_rr * new_stop) / (1 + min_rr) if is_new_entry: # 新买入:要求1:1.5+ max_acceptable_entry = max_for_recommend else: # 已持仓加仓:至少1:1 max_acceptable_entry = max_for_neutral if is_new_entry: # 新买入:买入区 = 弱支撑附近(不是当前价附近!) # 只在价格跌到弱支撑附近时才推买入 entry_low = round(price * 0.98, 2) entry_high = round(price * 1.02, 2) if max_acceptable_entry and entry_high > max_acceptable_entry: entry_high = round(max_acceptable_entry, 2) # 确保买入区不小于1% if entry_high - entry_low < price * 0.01: if max_acceptable_entry and price <= max_acceptable_entry: entry_low = round(max(price * 0.99, new_stop), 2) entry_high = round(min(price * 1.01, max_acceptable_entry), 2) elif ws and ws > 0 and wr and wr > 0 and not is_deep_loss: # 已持仓正常:买入区 = 弱支撑~弱支撑上方5%(给合理回调空间) # 上限不能低于成本价×0.95(保护已有持仓不被高位逼空) entry_low = round(ws, 2) entry_max = round(ws * 1.05, 2) # 比弱支撑高5%,有足够空间 # 如果当前价已远离买入区,保持买入区不变(不因价格涨了就收窄) min_upper = round(cost * 0.95, 2) if cost > 0 else 0 if entry_max < min_upper: entry_max = min_upper if max_acceptable_entry: entry_high = round(min(entry_max, max_acceptable_entry), 2) else: entry_high = entry_max # 如果当前价已远离买入区(高于买入区上沿),禁止加仓推荐 if price > entry_high: # 买入区锁定在弱支撑位,但标记为"价格远离" pass # 如果买入区过窄,标记但不扩展(加仓必须在支撑位) if entry_high - entry_low < price * 0.005: entry_low = round(ws * 0.995, 2) entry_high = round(ws * 1.005, 2) else: entry_low = round(price * 0.90, 2) entry_high = round(price * 1.05, 2) # 买入区间稳定性保护:上边界单次变动不超过5% if 'entry_high' in dir() and entry_high: # 读取当前策略中已有的买入区上界,如果有且变化过大则限制 old_entry_high = None if 'current_action' in dir() and current_action: import re m = re.search(r'买入区[\d.]+~([\d.]+)', current_action) if m: old_entry_high = float(m.group(1)) if old_entry_high and old_entry_high > 0: max_change = old_entry_high * 0.95 # 单次最多下降5% if entry_high < max_change: entry_high = round(max_change, 2) # ----- 买入时机信号(三维分析:大盘+行业+个股,基本面+消息面+技术面+资金流)----- # [2026-07-01] 扩展:不再只看volume_signal + candlestick_sentiment # 融合大盘趋势、行业板块强弱、基本面估值作为修正因子 volume_signal = vol.get("volume_signal", "") candlestick_sentiment = candle.get("sentiment", "neutral") timing_signal = "neutral" # --- 三维分析数据装载 --- # 因子1: 大盘环境(从macro_context_log读) market_bearish = False market_bullish = False try: import sqlite3 _db = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5) _mc = _db.execute( "SELECT structure FROM macro_context_log WHERE has_valid_data=1 ORDER BY rowid DESC LIMIT 1" ).fetchone() if _mc and _mc[0]: _s = json.loads(_mc[0]) _overall = _s.get("overall", "") if "bearish" in _overall: market_bearish = True elif _overall == "bullish": market_bullish = True _db.close() except Exception: pass # 因子2: 行业板块强弱 sector_strong = False sector_weak = False try: _db2 = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5) _rows2 = _db2.execute( "SELECT name, change_pct FROM sector_snapshots ORDER BY change_pct DESC" ).fetchall() if _rows2: # 找到该股所属行业(简单匹配name或通过stock_sectors) _my_sectors = _db2.execute( "SELECT sector_name FROM stock_sectors WHERE code=?", (code,) ).fetchall() if _my_sectors: for (_sn,) in _my_sectors: for r_name, r_chg in _rows2: if _sn in r_name or r_name in _sn: _rank = [r[0] for r in _rows2].index(r_name) if r_name in [x[0] for x in _rows2] else -1 _total = len(_rows2) if _rank >= 0: if _rank < _total * 0.2: sector_strong = True if _rank > _total * 0.8: sector_weak = True break _db2.close() except Exception: pass # 因子3: 基本面估值 is_value_stock = False try: _db3 = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5) _fd = _db3.execute( "SELECT pe, eps FROM stock_fundamentals WHERE code=?", (code,) ).fetchone() if _fd: _pe, _eps = _fd is_value_stock = (0 < (_pe or 0) < 25 and (_eps or 0) > 0.3) _db3.close() except Exception: pass # --- 三维修正规则 --- # 大盘偏弱时收紧买入信号,大盘偏强时放宽 # 行业领先加分,行业落后减分 # 低估值加分(有安全边际) def _adjust_timing(signal, market_b, market_bb, sec_s, sec_w, is_val): """根据三维因子修正 timing_signal""" # 大盘偏弱时降级买入信号 if market_b: if signal in ("买入", "加仓"): if not sec_s: # 大盘弱+行业不强→降级 return "关注" # 大盘偏强时放宽 if market_bb: if signal == "关注" and (sec_s or is_val): return "买入" # 行业弱势时降级买入信号 if sec_w: if signal in ("买入", "加仓"): return "关注" # 行业强势+低估时升级关注 if sec_s and is_val: if signal == "关注": return "买入" return signal if is_new_entry: # 新买入时机 if volume_signal == "主动买盘占优" and candlestick_sentiment == "bullish": timing_signal = "买入" elif volume_signal == "主动卖盘占优": timing_signal = "观望" elif volume_signal == "买卖均衡" and ws and price <= ws * 1.03: timing_signal = "买入" elif candlestick_sentiment == "bullish": timing_signal = "买入" elif ws and price < ws * 1.02: timing_signal = "关注" # 新买入时三维修正:大盘向上+行业强→升级,大盘弱→降级 _pre_signal = timing_signal timing_signal = _adjust_timing(timing_signal, market_bearish, market_bullish, sector_strong, sector_weak, is_value_stock) if timing_signal != _pre_signal: print(f" 三维修正(新入): {_pre_signal}→{timing_signal} " f"| 大盘{'弱' if market_bearish else '强' if market_bullish else '中性'}" f"| 行业{'强' if sector_strong else '弱' if sector_weak else '中性'}" f"| 估值{'低' if is_value_stock else '一般'}") else: # 已持仓时机(用于加仓/减仓参考) if is_short_term_strong_trend: # 短炒+强趋势:强趋势持有,禁止加仓信号 timing_signal = "持有" elif profit_pct > 5: # 已盈利 if volume_signal == "主动买盘占优": timing_signal = "持有" elif volume_signal == "主动卖盘占优" and not is_new_entry: timing_signal = "关注" else: timing_signal = "持有" elif profit_pct > 0: # 微盈 if volume_signal == "主动买盘占优": timing_signal = "持有" elif ws and price <= ws * 1.02: timing_signal = "加仓" else: timing_signal = "持有" else: # 浮亏 if volume_signal == "主动卖盘占优" and ss and price <= ss * 1.03: timing_signal = "关注" elif volume_signal == "主动买盘占优" and sr_resist and price >= sr_resist * 0.97: timing_signal = "关注" elif volume_signal == "买卖均衡" and ws and price <= ws * 1.02: timing_signal = "加仓" else: timing_signal = "持有" # ----- 【v3.2新增】分类约束:弱势/深套禁止输出买入/加仓类信号 ----- if stock_category == "弱势" or is_deep_loss: buy_signals = ["买入", "加仓", "可追"] if any(s in timing_signal for s in buy_signals): old_signal = timing_signal timing_signal = "弱势持有" if stock_category == "弱势" else "深套持有" print(f" 分类约束: {stock_category} 原信号\"{old_signal}\" → \"{timing_signal}\"") # ----- 构造 action 描述(供 cron prompt 使用) ----- action_parts = [] if profit_pct < -20: action_parts.append("深套持有") elif profit_pct < -10: action_parts.append("持有观察") elif profit_pct < 0: action_parts.append("持有观察") elif profit_pct < 5: action_parts.append("盈利持有") else: action_parts.append("盈利良好") if action_note: action_parts.append(action_note) if is_watchlist: # 自选股(未入场):有止损参考+买入区,内部算RR需要止盈位 action_parts.append(f"目标参考{new_target}") action_parts.append(f"止损参考{new_stop}") action_parts.append(f"买入区{entry_low}~{entry_high}") elif is_new_entry: action_parts.append(f"损{new_stop}") action_parts.append(f"盈{new_target}") action_parts.append(f"买{entry_low}~{entry_high}") else: action_parts.append(f"止损{new_stop}") action_parts.append(f"目标{new_target}") action_parts.append(f"买入区{entry_low}~{entry_high}") if timing_signal != "neutral": action_parts.append(f"信号:{timing_signal}") new_action = " | ".join(action_parts) # 技术面快照 tech_snapshot = "" if candle: tech_snapshot = (f"形态:{candle.get('pattern','?')}/{candle.get('sentiment','?')} " f"量价:{vol.get('volume_signal','?')} " f"强撑:{ss} 弱撑:{ws} 弱压:{wr} 强压:{sr_resist}") # 加入均线信息(如果可用) try: dm = mtf_analysis.get("daily", {}).get("mas", {}) ma_parts = [] for m in ['ma5', 'ma10', 'ma20', 'ma60']: v = dm.get(m) if v: ma_parts.append(f"{m.upper()}={v}") if ma_parts: tech_snapshot += " | " + " ".join(ma_parts) except (NameError, AttributeError): pass # 多周期快照(追加到 tech_snapshot) mtf_context = "" if mtf_adj: trend_align = mtf_adj.get("trend_alignment", "") daily_mas = mtf_analysis.get("daily", {}).get("mas", {}) ma20 = daily_mas.get("ma20") ma60 = daily_mas.get("ma60") stop_ref = mtf_adj.get("stop_loss_reference", {}) take_ref = mtf_adj.get("take_profit_reference", {}) parts = [] if trend_align: parts.append(trend_align) if ma20: parts.append(f"MA20={ma20}") if ma60: parts.append(f"MA60={ma60}") if stop_ref: parts.append(f"长撑:{stop_ref.get('source','?')}={stop_ref['level']}") if take_ref: parts.append(f"长压:{take_ref.get('source','?')}={take_ref['level']}") mtf_context = " | ".join(parts) now_str = datetime.now().strftime('%Y-%m-%d %H:%M') return { 'stop_loss': new_stop, 'take_profit': new_target, 'entry_low': entry_low, 'entry_high': entry_high, 'action': new_action, 'status': status, 'tech_snapshot': tech_snapshot, 'timing_signal': timing_signal, 'rr_ratio': round(rr_ratio, 2), 'action_note': action_note, 'reassessed_at': now_str, 'multi_tf_context': mtf_context, # 多周期上下文 'stock_category': stock_category, # 股票分类:短炒/中短线/中长线/弱势/深套 'time_horizon': time_horizon, # 时间跨度 'position_advice': position_advice, # 仓位建议 } def load_stock_news_sentiment(code): """加载小果消息面情感""" try: path = "/home/hmo/web-dashboard/data/xiaoguo_sentiment.json" if not os.path.exists(path): return {} xg = json.load(open(path)) return xg.get("stocks", {}).get(code, {}) except Exception: return {} def load_fundamentals(code): """加载个股基本面""" try: path = "/home/hmo/web-dashboard/data/multi_tf_cache.json" if not os.path.exists(path): return {} m = json.load(open(path)) return m.get(code, {}).get("fundamentals", {}) or {} except Exception: return {} def _get_portfolio_risk_state(): """读取 portfolio 组合风险状态(2026-06-23 引擎协调)""" try: # 数据一致性检查:警告多副本(2026-06-23 bugfix) _check_portfolio_consistency() p = json.load(open('/home/hmo/web-dashboard/data/portfolio.json')) pos_pct = p.get('position_pct', 0) cash = p.get('cash', 0) holdings = p.get('holdings', []) weak_cnt = sum(1 for h in holdings if h.get('change_pct', 0) < -15) total = len(holdings) or 1 weak_ratio = weak_cnt / total return { 'position_pct': pos_pct, 'cash': cash, 'is_high_position': pos_pct > 80, 'is_very_high_position': pos_pct > 90, 'is_high_weak': weak_ratio > 0.35, 'weak_ratio': round(weak_ratio * 100), 'total_holdings': total, } except: return {} def _is_buy_signal(signal): """判断信号是否为买入/持有类(用于防洗盘)""" if not signal: return False buy_keywords = ['买入', '持有', '加仓', '关注'] for kw in buy_keywords: if kw in signal: return True return False def _check_portfolio_consistency(): """数据一致性检查:如果存在多份 portfolio.json 则报警(2026-06-23 bugfix)""" main = '/home/hmo/web-dashboard/data/portfolio.json' main_cash = None try: import json main_cash = json.load(open(main)).get('cash') except Exception: return for path in [ '/home/hmo/data/portfolio.json', '/home/hmo/projects/MoFin/data/portfolio.json', '/home/hmo/web-dashboard.bak/data/portfolio.json', ]: if os.path.exists(path): try: other = json.load(open(path)) if other.get('cash') != main_cash: print(f"⚠️ 数据一致性: {os.path.realpath(path)} cash={other.get('cash')} ≠ 主文件 cash={main_cash} (需清理)", file=sys.stderr) except Exception: pass def _check_contradiction(code, today_only=True): """反馈循环核——检查本股是否有刚卖出的记录 返回 dict or None: - sold_reason: 'portfolio_trim'|'stop_loss' - sold_at: 卖出日期 - days_ago: 卖出距今交易日数 - is_today: 是否今日卖出 - tag: 追加到信号的标注 """ try: from datetime import datetime, date dec = json.load(open('/home/hmo/web-dashboard/data/decisions.json')) for e in dec.get('decisions', []): if e.get('code') != code: continue sold_at = e.get('sold_at', '') if not sold_at: return None try: sd = datetime.strptime(sold_at, '%Y-%m-%d').date() td = date.today() days = (td - sd).days except: return None reason = e.get('sold_reason', 'portfolio_trim') if reason == 'stop_loss': tag = '止损离场(逻辑破坏,短期不关注)' else: tag = '组合减仓后关注(已清仓,等回踩确认)' return { 'sold_reason': reason, 'sold_at': sold_at, 'days_ago': days, 'is_today': days == 0, 'tag': tag, } except: return None return None def _get_sell_priority_list(): """减仓优先级排序:深套>亏损>微盈>盈利(2026-06-23 反馈循环) 返回 [(code, name, change_pct, position_pct, priority_label), ...] 按卖出的优先顺序排列(最先应该卖的在最前) """ try: p = json.load(open('/home/hmo/web-dashboard/data/portfolio.json')) holdings = p.get('holdings', []) ranked = [] for h in holdings: chg = h.get('change_pct', 0) pos = h.get('position_pct', 0) if chg < -30: label = '深套(>30%),优先减' rank = 0 elif chg < -20: label = '深套(>20%),优先减' rank = 1 elif chg < -10: label = '亏损,建议减' rank = 2 elif chg < 0: label = '微亏,可减' rank = 3 elif chg < 10: label = '微盈,持有' rank = 4 else: label = '盈利,最后减' rank = 5 ranked.append((rank, h['code'], h.get('name',''), chg, pos, label)) ranked.sort(key=lambda x: (x[0], -x[4])) # 优先 rank, 其次仓位大优先 return [{'code':c,'name':n,'change_pct':chg,'position_pct':pos,'label':l} for r,c,n,chg,pos,l in ranked] except: return [] def enrich_timing_signal(base_signal, macro_desc="", sector_note="", profit_pct=0, stock_category="", is_new_entry=False, fundamentals=None, news_sentiment=None, timing_signal_override=None, portfolio_context=None, rr_ratio=0): # 2026-06-24 新参:盈亏比约束 """多因子合成timing_signal——大盘+行业+基本面+技术+组合风险+盈亏比 返回 (enriched_signal, factors_list) - enriched_signal: 可读的多因子信号描述 - factors_list: 各因子的摘要列表(用于后续显示) """ # 如果已手动设定,尊重手动 if timing_signal_override and timing_signal_override != "neutral": return timing_signal_override, [timing_signal_override] factors = [] # 1. 大盘因子 if "偏强" in macro_desc or "大涨" in macro_desc or "bullish" in macro_desc.lower(): macro_txt = "大盘偏强" factors.append(macro_txt) elif "偏弱" in macro_desc or "大跌" in macro_desc or "bearish" in macro_desc.lower(): macro_txt = "大盘偏弱" factors.append(macro_txt) elif macro_desc and macro_desc != "宏观未加载": factors.append("大盘中性") # 2. 行业因子 if sector_note: # 把"行业X大跌3%+"简化为"行业偏弱","行业X大涨3%+"简化为"行业偏强" if "大跌" in sector_note or "下跌" in sector_note: factors.append("行业偏弱") elif "大涨" in sector_note: factors.append("行业偏强") elif "上涨" in sector_note: factors.append("行业偏强") else: factors.append("行业中性") # 3. 基本面因子 if fundamentals: pe = fundamentals.get("pe", 0) eps = fundamentals.get("eps", 0) profit_growth = fundamentals.get("profit_growth", fundamentals.get("yoy_profit", "")) revenue_growth = fundamentals.get("revenue_growth", fundamentals.get("yoy_revenue", "")) mcap = fundamentals.get("mcap_total", 0) pe = pe or 0 eps = eps or 0 profit_growth_str = str(profit_growth or "") revenue_growth_str = str(revenue_growth or "") # 净利增长 for val in [profit_growth_str, revenue_growth_str]: try: v = float(val.replace("%", "").replace("+", "")) if v > 50: factors.append("净利增50%+") break elif v > 20: factors.append(f"净利增{int(v)}%") break elif v < -20: factors.append("净利降20%+") break except (ValueError, AttributeError): continue # PE估值 if 0 < pe < 15: factors.append("低估值") elif pe > 100 or pe < 0: factors.append("高估值") # 市值 if mcap and mcap > 5000: factors.append("蓝筹") # 4. 消息面因子(小果情感) if news_sentiment: ns = news_sentiment.get("sentiment", "") nc = news_sentiment.get("confidence", 0) if ns == "positive" and nc >= 0.7: kws = news_sentiment.get("keywords", []) kw_str = f"({'/'.join(kws[:3])})" if kws else "" factors.append(f"消息偏多{kw_str}") elif ns == "negative" and nc >= 0.7: kws = news_sentiment.get("keywords", []) kw_str = f"({'/'.join(kws[:3])})" if kws else "" factors.append(f"消息偏空{kw_str}") # 5. 技术面(基础信号) if base_signal and base_signal != "neutral": factors.append(base_signal) # 5.5 组合风险因子(2026-06-23 双引擎协调) if portfolio_context and not is_new_entry: if portfolio_context.get('is_very_high_position'): factors.append("组合仓位极重(>90%)") elif portfolio_context.get('is_high_position'): factors.append("组合仓位偏重(>80%)") if portfolio_context.get('is_high_weak'): factors.append(f"弱势占{portfolio_context.get('weak_ratio')}%") elif portfolio_context and is_new_entry: # 新买入推荐:注明组合上下文 if portfolio_context.get('is_high_position'): factors.append(f"仓{portfolio_context.get('position_pct')}%现金有限") elif portfolio_context.get('is_high_weak'): factors.append("组合风险信号") # 5.7 盈亏比因子(2026-06-24 新增——RR<1.5降级买入信号) if rr_ratio > 0: if rr_ratio < 1.5: factors.append(f"RR{rr_ratio}过低") elif rr_ratio >= 3: factors.append(f"RR{rr_ratio}") # 1.5~3之间:中性,不特别标注 # 如果没有足够因素,返回信号不充分 if not factors: return "信号不充分", [] # 信号只应包含明确的买卖方向,不能从行业/大盘等上下文因子拼凑 # base_signal 存在且非 neutral → 用 base_signal # 否则 → 信号不充分(不拿 factors[-1] 当信号) if base_signal and base_signal != "neutral": clean_signal = base_signal else: # 从 factors 中找第一个有效的操作方向信号 valid_direction = {"买入", "加仓", "观望", "持有", "关注", "信号不充分"} signal_found = "" for f in reversed(factors): if f in valid_direction: signal_found = f break clean_signal = signal_found if signal_found else "信号不充分" # 6. RR约束降级(2026-06-24 新增) # 买入/加仓信号但RR<1.5 → 降级为"信号不充分" buy_signals = {"买入", "加仓"} if clean_signal in buy_signals and 0 < rr_ratio < 1.5: clean_signal = "信号不充分" factors.append("RR过低降级") return clean_signal, factors def reassess_with_context(code, name, price, cost, shares, current_action, volume_signal="", sentiment="neutral", is_watchlist=False): """reassess_strategy + 多因子信号合成(大盘+行业+技术) 为 per_stock_reassess 等单只场景提供一站式多因子分析 """ result = reassess_strategy( code, name, price, cost, shares, current_action, volume_signal, sentiment, is_watchlist ) if not result: return result # 加载宏观+行业+消息+基本面上下文 try: macro_bias, macro_desc = load_macro_context() market_ctx = load_market_context() stock_sector_map = load_stock_sector_map() sector_adj = compute_sector_adjustment(code, market_ctx, stock_sector_map) sector_note = sector_adj.get("note", "") news_sentiment = load_stock_news_sentiment(code) fund = load_fundamentals(code) except Exception: macro_desc = "" sector_note = "" news_sentiment = {} fund = {} # ── DSA 集成:注入大盘复盘 + 新闻情报 ────────────────────────── try: from mo_bridge import enrich_analysis_context region = "hk" if len(str(code)) == 5 and str(code)[0] in ('0','1') else "cn" dsa_ctx = enrich_analysis_context(stock_code=code, stock_name=name, region=region, include_news=True) if dsa_ctx: macro_desc = (macro_desc + "\n\n" + dsa_ctx).strip() except Exception: pass # DSA 不可用时静默跳过 enriched, factors = enrich_timing_signal( base_signal=result.get("timing_signal", ""), macro_desc=macro_desc, sector_note=sector_note, profit_pct=(price - cost) / cost * 100 if cost else 0, stock_category=result.get("stock_category", ""), is_new_entry=is_watchlist, fundamentals=fund, news_sentiment=news_sentiment, portfolio_context=_get_portfolio_risk_state(), rr_ratio=result.get("rr_ratio", 0), ) result["timing_signal"] = enriched result["signal_factors"] = factors # 6. 防洗盘:信号不要一天一翻(2026-06-23) # 如果旧信号是买入/持有类,新信号是谨慎/等待类,但中期趋势未破→维持旧信号 try: dec = json.load(open('/home/hmo/web-dashboard/data/decisions.json')) for e in dec.get('decisions', []): if e.get('code') == code: old_signal = e.get('timing_signal', '') if old_signal and _is_buy_signal(old_signal) and not _is_buy_signal(enriched): # 中等趋势检查:MA5 > MA20 + 多周期看多 mtf = result.get('multi_tf_context', '') if '看多' in mtf or '多头' in mtf: try: closes = [float(k.split()[2]) for k in mtf.split('|') if 'MA5' in k] except: closes = [] has_uptrend = 'MA5' in mtf and 'MA20' in mtf if has_uptrend: print(f" 防洗盘: {old_signal}→保持旧信号(中期趋势完整)") result["timing_signal"] = f"{old_signal}(正常回调价稳)" sf = result.get("signal_factors") or [] if "正常回调价稳" not in sf: result["signal_factors"] = sf + ["正常回调价稳"] break except Exception as e: print(f" 防洗盘跳过: {e}") # 7. 反馈循环核:检查本股是否有刚卖出的记录(2026-06-23) contradiction = _check_contradiction(code) if contradiction and contradiction.get('is_today'): # 今日刚卖出 → 不屏蔽信号,但必须自标注矛盾 print(f" 反馈循环: {contradiction.get('tag')} (sold_at={contradiction.get('sold_at')})") if _is_buy_signal(result.get('timing_signal', '')): result['action_note'] = contradiction['tag'] # 在 timing_signal 中追加反馈标注,供报告层可见 curr_signal = result.get('timing_signal', '') if '⚠️' not in curr_signal: result['timing_signal'] = f"⚠️{contradiction['tag']}|{curr_signal}" elif contradiction: # 非今日卖出但近期卖出 → 标注已清仓 print(f" 近期清仓: sold_at={contradiction.get('sold_at')} ({contradiction.get('days_ago')}日前)") if _is_buy_signal(result.get('timing_signal', '')): curr_signal = result.get('timing_signal', '') if '已清仓' not in curr_signal: result['timing_signal'] = f"已清仓,{curr_signal}" # 重建 action 文本(同步多因子信号) try: if new_action_needs_refresh(result, {"source": "auto"}, price): _refresh_action_text(result, price, name) except Exception: pass return result def new_action_needs_refresh(result, old_entry, price): """判断宏观/行业调整后是否需要刷新action文本""" # 自选股和手动策略不做调整,不需要刷新 if old_entry.get("source") == "manual": return False return True def _refresh_action_text(result, price, name): """根据调整后的止损/止盈重建action文本""" sl = result.get("stop_loss", 0) tp = result.get("take_profit", 0) el = result.get("entry_low", 0) eh = result.get("entry_high", 0) ts = result.get("timing_signal", "") an = result.get("action_note", "") old_action = result.get("action", "") # 保持原action的前缀(持有状态部分不变) # action格式一般是: "状态 | 止损X | 目标Y | 买入区X~Y | 信号:Z" parts = old_action.split(" | ") new_parts = [] for p in parts: p = p.strip() # 替换止损数字 if p.startswith("止损") or p.startswith("止损参考"): if sl: p = f"止损{sl}" if "止损参考" not in old_action.split(" | ")[0] else f"止损参考{sl}" # 替换目标/止盈数字 if p.startswith("目标") or p.startswith("止盈"): if tp: p = f"目标{tp}" # 替换买入区数字 if "买入区" in p and "~" in p: if el and eh: p = f"买入区{el}~{eh}" new_parts.append(p) result["action"] = " | ".join(new_parts) def check_sector_alerts(market_ctx, stock_sector_map, holdings, wl): """行业轮动主动预警:检测板块崩盘级别信号→查持仓→输出预警 返回 list of alerts: [{code, name, sector, chg, action}] """ alerts = [] if not market_ctx: return alerts sector_perf = market_ctx.get("sector_perf", {}) # 找出所有跌幅>3%的行业 crashing_sectors = {name: data for name, data in sector_perf.items() if data.get("change", 0) <= -3} if not crashing_sectors: return alerts # 构建 code→持仓信息 的映射 holding_map = {} for h in holdings: c = h.get("code", "") if c: holding_map[c] = {"name": h.get("name", c), "type": "持仓"} for s in wl.get("stocks", []): c = s.get("code", "") if c and c not in holding_map: holding_map[c] = {"name": s.get("name", c), "type": "自选"} # 对每个暴跌行业,查持仓中是否有股票属于该行业 for sec_name, sec_data in sorted(crashing_sectors.items(), key=lambda x: x[1].get("change", 0)): chg = sec_data.get("change", 0) for code, sectors in stock_sector_map.items(): if code in holding_map and sec_name in sectors: info = holding_map[code] alerts.append({ "code": code, "name": info["name"], "sector": sec_name, "sector_change": chg, "type": info["type"], "action": f"行业{sec_name}跌{chg:+.1f}%,{info['type']}需关注", }) alerts.sort(key=lambda a: a["sector_change"]) return alerts def regenerate_all(stdout=True): """全量重评所有持仓+自选策略""" # 优先从 SQLite 读取 try: from mofin_db import get_conn, query_holdings, query_watchlist conn = get_conn() holdings = query_holdings(conn) wl_stocks = query_watchlist(conn) conn.close() pf = {"holdings": holdings} wl = {"stocks": wl_stocks} except Exception: pf = safe_json_load(PORTFOLIO_PATH, {}) wl = safe_json_load(WATCHLIST_PATH, {}) all_stocks = {} for item in pf.get("holdings", []): code = item.get("code", "") if code: all_stocks[code] = {"source": "portfolio", "data": item} for item in wl.get("stocks", []): code = item.get("code", "") if code and code not in all_stocks: all_stocks[code] = {"source": "watchlist", "data": item} total = len(all_stocks) ok = 0 errors = 0 results = [] decisions = [] # 加载现有 decisions.json 以便追踪变更 decisions_path = "/home/hmo/web-dashboard/data/decisions.json" try: existing_decisions = {d["code"]: d for d in mo_data.read_decisions().get("decisions", []) if d.get("code")} except: existing_decisions = {} # 加载宏观上下文(影响策略参数调整) macro_bias, macro_desc = load_macro_context() if stdout: print(f" 宏观参考: {macro_desc} (bias={macro_bias})") # 加载市场上下文 — 行业板块表现 + 大盘宽度(策略参数调整用) market_ctx = load_market_context() stock_sector_map = load_stock_sector_map() market_breadth = market_ctx.get("breadth", 50) market_mood = market_ctx.get("mood", "neutral") if stdout: sectors_found = sum(1 for c in all_stocks if stock_sector_map.get(c)) print(f" 市场参考: {market_mood} 上涨比{market_breadth}% 已匹配{sectors_found}/{total}只个股行业") # 批量预取所有价格(一次API调用 vs 之前N次) prices_map = batch_fetch_prices(list(all_stocks.keys())) if stdout: print(f" 批量获取价格: {len(prices_map)}/{total} 成功") for code, info in sorted(all_stocks.items()): stock = info["data"] name = stock.get("name", code) cost = stock.get("cost", 0) or 0 shares = stock.get("shares", 0) or 0 source = info["source"] q = prices_map.get(code) if not q or not q.get("price"): results.append({"code": code, "name": name, "error": "腾讯API无数据"}) errors += 1 if stdout: print(f" ❌ {name}({code}): 腾讯API无数据") continue price = q["price"] profit_pct = (price - cost) / cost * 100 if cost else 0 current_action = stock.get("analysis", {}).get("action", "") close_yest = q.get("close", 0) sentiment = "neutral" if close_yest and price > close_yest * 1.02: sentiment = "bullish" elif close_yest and price < close_yest * 0.98: sentiment = "bearish" try: is_wl = (source == "watchlist") result = reassess_strategy( code, name, price, cost, shares, current_action, volume_signal="中性", sentiment=sentiment, is_watchlist=(source == "watchlist"), ) # --- Manual param preservation: 用户手动策略永不覆盖 --- old_entry = existing_decisions.get(code, {}) if old_entry.get("source") == "manual": # 仅覆盖策略参数,技术分析/信号/价格照常保留 for key in ["entry_low", "entry_high", "stop_loss", "take_profit"]: if key in old_entry and old_entry[key] is not None: result[key] = old_entry[key] # 重算盈亏比(基于手动参数) manual_stop = result.get("stop_loss", 0) or 0 manual_target = result.get("take_profit", 0) or 0 risk = max(price - manual_stop, price * 0.01) if manual_stop > 0 else price * 0.01 reward = max(manual_target - price, 0) if manual_target > 0 else 0 result["rr_ratio"] = round(reward / risk, 2) if risk > 0 else 0 # 重建 action 文本(引用手动参数,不引用自动计算的) profit_pct = (price - cost) / cost * 100 if cost else 0 manual_action_parts = [] if profit_pct < -20: manual_action_parts.append("深套持有") elif profit_pct < -10: manual_action_parts.append("持有观察") elif profit_pct < 0: manual_action_parts.append("持有观察") elif profit_pct < 5: manual_action_parts.append("盈利持有") else: manual_action_parts.append("盈利良好") if result.get("action_note"): manual_action_parts.append(result["action_note"]) if is_wl: if manual_stop > 0: manual_action_parts.append(f"止损参考{manual_stop}") manual_action_parts.append(f"买入区{result['entry_low']}~{result['entry_high']}") else: if manual_stop > 0: manual_action_parts.append(f"止损{manual_stop}") if manual_target > 0: manual_action_parts.append(f"目标{manual_target}") manual_action_parts.append(f"买入区{result['entry_low']}~{result['entry_high']}") ts = result.get("timing_signal", "") if ts and ts != "neutral": manual_action_parts.append(f"信号:{ts}") result["action"] = " | ".join(manual_action_parts) result["status"] = "manual" # 标记为手动管理,变更追踪不受影响 if stdout: print(f" [手动保留] {name}({code}) 策略参数未覆盖") # 宏观偏差调整:收盘后重评时根据宏观方向微调止损/止盈 # 自选股不做止盈宏观调整(无持仓) # 手动策略不做宏观偏差调整(尊重用户设定) if macro_bias != 1.0 and not is_wl and old_entry.get("source") != "manual": old_stop = result.get("stop_loss", 0) old_target = result.get("take_profit", 0) if macro_bias < 1.0 and old_stop > 0: # 宏观偏弱 → 收紧止损 # 止损上移(但保留最小3%间距) adjusted_stop = round(old_stop * (1 + (1 - macro_bias) * 0.3), 2) min_stop = round(price * 0.97, 2) result["stop_loss"] = min(adjusted_stop, min_stop) if old_target > 0: result["take_profit"] = round(old_target * (1 - (1 - macro_bias) * 0.2), 2) elif macro_bias > 1.0 and old_target > 0: # 宏观偏强 → 止盈上调让利润跑 result["take_profit"] = round(old_target * (1 + (macro_bias - 1) * 0.3), 2) # 行业偏差调整:根据个股所在行业的市场表现微调止损/止盈 # 手动策略不做行业调整(尊重用户设定) sector_adj = compute_sector_adjustment(code, market_ctx, stock_sector_map) sector_note = sector_adj.get("note", "") if sector_note and old_entry.get("source") != "manual": old_stop = result.get("stop_loss", 0) old_target = result.get("take_profit", 0) stop_bias = sector_adj.get("stop_bias", 1.0) target_bias = sector_adj.get("target_bias", 1.0) if stop_bias != 1.0 and old_stop > 0: # 行业偏差调整(在宏观调整之后叠加) adjusted = round(old_stop * stop_bias, 2) # 保留最小3%间距 min_stop = round(price * 0.97, 2) result["stop_loss"] = min(adjusted, min_stop) if target_bias != 1.0 and old_target > 0 and not is_wl: result["take_profit"] = round(old_target * target_bias, 2) # 加载消息面+基本面(逐个股) news_sentiment = load_stock_news_sentiment(code) fund = load_fundamentals(code) # 多因子合成 timing_signal:大盘+行业+消息+基本面+技术 if old_entry.get("source") != "manual": enriched, _ = enrich_timing_signal( base_signal=result.get("timing_signal", ""), macro_desc=macro_desc, sector_note=sector_note, profit_pct=profit_pct, stock_category=result.get("stock_category", ""), is_new_entry=(source == "watchlist"), fundamentals=fund, news_sentiment=news_sentiment, rr_ratio=result.get("rr_ratio", 0), ) result["timing_signal"] = enriched # 在宏观/行业/多因子调整后重建 action 文本(同步调整后的止损/止盈数字) if new_action_needs_refresh(result, old_entry, price): _refresh_action_text(result, price, name) extra = { "rr_ratio": result.get("rr_ratio"), "action_note": result.get("action_note", ""), "timing_signal": result.get("timing_signal", ""), } analysis = { "stop_loss": result["stop_loss"], "take_profit": result["take_profit"], "entry_low": result["entry_low"], "entry_high": result["entry_high"], "action": result["action"], "tech_snapshot": result.get("tech_snapshot", ""), "multi_tf_context": result.get("multi_tf_context", ""), "reassessed_at": result["reassessed_at"], "status": result["status"], **extra, } stock["analysis"] = analysis # 同步 top-level 字段 → zone_breach/price_monitor 依赖这些字段 # (2026-06-24 bugfix: analysis 子对象有但顶层没有,导致新持仓的止损检测盲区) stock["stop_loss"] = result.get("stop_loss", 0) stock["take_profit"] = result.get("take_profit", 0) stock["entry_low"] = result.get("entry_low", 0) stock["entry_high"] = result.get("entry_high", 0) # 同步 trigger 字段 -> price_monitor 依赖 sl = result.get("stop_loss", 0) tp = result.get("take_profit", 0) el = result.get("entry_low", 0) eh = result.get("entry_high", 0) trig = {} if sl and float(sl) > 0: trig["stop_loss"] = float(sl) if el and eh and float(el) > 0 and float(eh) > 0: trig["entry_zone"] = f"{float(el)}~{float(eh)}" if tp and float(tp) > 0: trig["take_profit_zone"] = f"0~{float(tp)}" stock["trigger"] = trig results.append({ "code": code, "name": name, "price": price, "cost": cost, "action": result["action"], "stop_loss": result["stop_loss"], "take_profit": result["take_profit"], "rr_ratio": result["rr_ratio"], }) ok += 1 if stdout: rr_str = f" RR={result['rr_ratio']}" if "rr_ratio" in result else "" print(f" ✅ {name}({code}) {price} {result['action']}{rr_str}") # 记录所有股票的决策日志(含变更追踪) status_display = result.get("status", "active") # 构建行业上下文 sector_ctx_str = "" sec_name = sector_adj.get("sector_name", "") sec_chg = sector_adj.get("sector_change", 0) if sec_name: sector_ctx_str = f"行业{sec_name}{sec_chg:+.1f}%" if sector_adj.get("note"): # note 已包含大盘宽度信息 sector_ctx_str = sector_adj["note"] elif market_breadth < 40: # 无行业映射时至少记录大盘宽度 sector_ctx_str = f"大盘上涨比{market_breadth}%" new_entry = { "code": code, "name": name, "price": price, "cost": old_entry.get("cost", cost) if old_entry else cost, # 优先保留旧成本(holding.xls权威) "shares": old_entry.get("shares", 0), # 保留持仓股数 "avg_price": old_entry.get("avg_price", 0), # 保留持仓均价 "action": result["action"], "stop_loss": result.get("stop_loss"), "entry_low": result["entry_low"], "entry_high": result["entry_high"], "tech_snapshot": result.get("tech_snapshot", ""), "timing_signal": result.get("timing_signal", ""), "rr_ratio": result.get("rr_ratio", 0), "status": status_display, "note": result.get("action_note", ""), "timestamp": result["reassessed_at"], "updated_at": result["reassessed_at"], "type": "自选策略" if is_wl else "持仓策略", "source": old_entry.get("source", "auto"), # manual/auto,继承旧标记 "sector_context": sector_ctx_str, # 市场上下文:行业表现+大盘宽度 "stock_category": result.get("stock_category", "中短线"), # 组合监测用 "position_advice": result.get("position_advice", "中等仓位"), "time_horizon": result.get("time_horizon", "2周~3月"), } new_entry["trigger"] = trig # created_at: 首次创建时设置,后续 preserve old_entry = existing_decisions.get(code, {}) if old_entry.get("created_at"): new_entry["created_at"] = old_entry["created_at"] else: new_entry["created_at"] = result["reassessed_at"] # 保留 last_reassessed_price(per_stock_reassess 维护的防抖字段) if old_entry.get("last_reassessed_price"): new_entry["last_reassessed_price"] = old_entry["last_reassessed_price"] # 自选股也写止盈位(用于RR校验),但标签用"目标参考"非"止盈" new_entry["take_profit"] = result.get("take_profit") # --- 变更追踪 --- old_action = old_entry.get("action", "") old_stop = old_entry.get("stop_loss") old_target = old_entry.get("take_profit") # 构建旧策略摘要和变更理由 update_reason = "" changelog_entry = None if old_action and old_action != result["action"]: # 策略有变化 → 记录变更 old_summary = old_action new_summary = result["action"] # 判断触发原因 if abs(price - old_entry.get("price", price)) / max(price, 0.01) > 0.03: trigger = f"价格变动({old_entry.get('price','?')}→{price})" elif result.get("timing_signal") and result["timing_signal"] != old_entry.get("timing_signal", ""): trigger = f"技术信号变化: {result['timing_signal']}" else: trigger = "技术面重评" # 格式化的变更理由(自选股只看止损,不看止盈) diff_parts = [] if old_stop and result["stop_loss"] != old_stop: diff_parts.append(f"止损{old_stop}→{result['stop_loss']}") if not is_wl and old_target and result.get("take_profit") and result["take_profit"] != old_target: diff_parts.append(f"止盈{old_target}→{result['take_profit']}") if diff_parts: update_reason = f"{trigger}: {', '.join(diff_parts)} | {result.get('tech_snapshot','')[:60]}" else: update_reason = f"{trigger}: 策略文字调整" changelog_entry = { "date": result["reassessed_at"], "old_action": old_action, "new_action": result["action"], "reason": update_reason, "trigger": trigger, } new_entry["updated_reason"] = update_reason elif not old_action: # 首次创建策略 update_reason = f"初始策略创建 | {result.get('tech_snapshot','')[:60]}" changelog_entry = { "date": result["reassessed_at"], "old_action": "", "new_action": result["action"], "reason": update_reason, "trigger": "初始创建", } # 合并changelog old_changelog = old_entry.get("changelog", []) if old_entry else [] if changelog_entry: new_entry["changelog"] = old_changelog + [changelog_entry] else: new_entry["changelog"] = old_changelog # 保留执行记录 if old_entry and old_entry.get("execution"): new_entry["execution"] = old_entry["execution"] elif stock.get("analysis", {}).get("status") == "executing": new_entry["execution"] = { "status": "executing", "entry_price": cost if cost else 0, "shares": shares, "notes": "", } # --- 自动标记 current_recommend --- # 只在真正执行中的持仓才自动推荐:execution.status 为 executing 或 partial_exit exec_status = old_entry.get("execution", {}).get("status", "") if old_entry else "" is_active = exec_status in ("executing", "partial_exit") profit_pct = (price - cost) / cost * 100 if cost else 0 is_deep_loss_stock = profit_pct < -20 rr = result.get("rr_ratio", 0) ts = result.get("timing_signal", "") note = result.get("action_note", "") # 计算是否在/接近买入区 entry_low_val = result.get("entry_low", 0) entry_high_val = result.get("entry_high", 0) in_buy_zone = (entry_low_val > 0 and entry_high_val > 0 and entry_low_val <= price <= entry_high_val) near_buy_zone_low = (entry_low_val > 0 and price >= entry_low_val * 0.98 and price <= entry_high_val) # 推荐条件:必须是执行中的持仓 + 基本面条件达标 is_recommendable = ( is_active and not is_deep_loss_stock and rr >= 1.5 and ts != "neutral" and "不建议" not in note ) if is_recommendable: new_entry["tag"] = "current_recommend" else: # 不清除 active_manual(用户手动标记),只清除自动推荐的 old_tag = old_entry.get("tag", "") if old_entry else "" if old_tag != "active_manual": new_entry.pop("tag", None) decisions.append(new_entry) except Exception as e: results.append({"code": code, "name": name, "error": str(e)}) errors += 1 if stdout: print(f" ❌ {name}({code}): {e}") # 写回数据文件 — 保留现有字段(现金、总资产等)不丢 try: existing_pf = mo_data.read_portfolio() except Exception: existing_pf = {} # 保留 price/change_pct — price_monitor 维护的实时价,regenerate_all 不应清除 _existing_holdings_map = {} for _h in existing_pf.get('holdings', []): if _h.get('code'): _existing_holdings_map[_h['code']] = _h _new_holdings = pf.get("holdings", []) for _h in _new_holdings: _code = _h.get('code') if _code and _code in _existing_holdings_map: _old = _existing_holdings_map[_code] _h['price'] = _old.get('price', 0) _h['change_pct'] = _old.get('change_pct', 0) existing_pf["holdings"] = _new_holdings existing_pf["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M') # ── Watchlist ↔ Holdings 双向自动迁移(2026-06-27 Dad要求)── # ① 持仓已有 → 从自选移除(买入自动清除) wl_codes = {s.get("code") for s in wl.get("stocks", []) if s.get("code")} pf_codes = {h.get("code") for h in _new_holdings if h.get("code") and h.get("shares", 0) > 0} removed_from_wl = [] for h_code in wl_codes & pf_codes: # 持仓>0且量够 → 自选移除 wl["stocks"] = [s for s in wl.get("stocks", []) if s.get("code") != h_code] removed_from_wl.append(h_code) if removed_from_wl and stdout: print(f" 自选→持仓自动移除: {', '.join(removed_from_wl)}") # ② 清仓/卖光 → 加回自选(只要仍有关注价值) added_to_wl = [] old_pf_codes = {_h.get("code") for _h in existing_pf.get("holdings", []) if _h.get("code")} sold_codes = old_pf_codes - pf_codes # 曾持仓但现在没有(或不在了) for sc in sold_codes: # 已有自选就不重复加 if sc in wl_codes: continue # 从现有decisions看是否有关注价值 for d in decisions: if d.get("code") == sc and d.get("entry_low") and d.get("entry_high"): wl["stocks"].append({ "code": sc, "name": d.get("name", sc), "entry_low": d.get("entry_low"), "entry_high": d.get("entry_high"), "stop_loss": d.get("stop_loss", 0), "analysis": {"action": d.get("action", ""), "tech_snapshot": d.get("tech_snapshot", "")} }) added_to_wl.append(sc) break if added_to_wl and stdout: print(f" 清仓→自选自动加入: {', '.join(added_to_wl)}") # DB 写入(替代 JSON dump — 强制币种约束) try: from mofin_db import get_conn, write_holdings_batch, write_portfolio_summary, write_watchlist_stock, write_holding_strategy conn = get_conn() write_holdings_batch(conn, existing_pf.get('holdings', [])) write_portfolio_summary(conn, existing_pf) for s in wl.get('stocks', []): s.setdefault('currency', 'CNY') write_watchlist_stock(conn, s) for d in decisions: write_holding_strategy(conn, d.get('code', ''), d.get('name', ''), d) conn.close() except Exception as e: print(f" [DB写入失败] {e}", flush=True) # JSON 冷备 json.dump(existing_pf, open(PORTFOLIO_PATH, "w"), ensure_ascii=False, indent=2) json.dump(wl, open(WATCHLIST_PATH, "w"), ensure_ascii=False, indent=2) # 写 decisions.json decisions_path = "/home/hmo/web-dashboard/data/decisions.json" decisions_data = { "decisions": decisions, # 全部保留 "total": len(decisions), "regenerated_at": datetime.now().strftime('%Y-%m-%d %H:%M'), } json.dump(decisions_data, open(decisions_path, "w"), ensure_ascii=False, indent=2) # DB 已在上方写入(和 portfolio/watchlist 一起) # 记录策略→提示词版本关联 if HAS_PROMPT_TRACKING: try: for d in decisions: if d.get("code") and d.get("action"): record_strategy_generation( d["code"], d.get("name", ""), d.get("action", "") ) except Exception as e: if stdout: print(f" ⚠️ 提示词版本追踪失败: {e}", file=sys.stderr) # 刷新多周期缓存到磁盘 try: import multi_timeframe as _mtf _mtf.flush_mtf_cache() except Exception: pass summary = {"total": total, "ok": ok, "errors": errors} if stdout: print(f"\n✅ 全量重评完成: {ok}/{total}成功, {errors}错误") return summary if __name__ == "__main__": regenerate_all()