From 8ed755bff92787a05759b914cd009506c6d86ce4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?= Date: Wed, 1 Jul 2026 22:48:48 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20strategy=5Flifecycle=20+=20stale=5Fdete?= =?UTF-8?q?ctor=20+=20server=20=E2=80=94=20DB-first=20price=20reads,=20Ten?= =?UTF-8?q?cent=20API=20as=20fallback=20only?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/stale_detector.py | 38 ++++++++- server.py | 62 +++++++++------ strategy_lifecycle.py | 159 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 227 insertions(+), 32 deletions(-) diff --git a/scripts/stale_detector.py b/scripts/stale_detector.py index d1d0bb8..5911b49 100644 --- a/scripts/stale_detector.py +++ b/scripts/stale_detector.py @@ -20,9 +20,43 @@ PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json" def fetch_prices(codes): - import urllib.request + """统一价格源:优先 stock_quote.py,腾讯API降级为兜底""" if not codes: return {} + # 尝试用 stock_quote.py 获取(脚本强制规范) + try: + import subprocess + script = None + for p in ["/home/hmo/MoFin/scripts/stock_quote.py", "/home/hmo/MoFin/stock_quote.py"]: + if os.path.exists(p): + script = p + break + if script: + result = subprocess.run( + [sys.executable, script] + [str(c) for c in codes], + capture_output=True, text=True, timeout=30 + ) + if result.returncode == 0 and result.stdout.strip(): + results = {} + for line in result.stdout.strip().split("\n"): + if not line.strip(): + continue + try: + item = json.loads(line) + code = str(item.get("code", "")) + price = item.get("price") + change = item.get("change_pct", 0) + if code and price is not None: + results[code] = (float(price), float(change)) + except (json.JSONDecodeError, ValueError): + continue + if results: + return results + except Exception as e: + print(f"[STALE] stock_quote.py 回退: {e}", file=sys.stderr) + + # 兜底:腾讯API(不应依赖,仅作为最后手段) + import urllib.request symbols, code_map = [], {} for c in codes: c = str(c).strip() @@ -38,7 +72,7 @@ def fetch_prices(codes): with urllib.request.urlopen(req, timeout=10) as r: text = r.read().decode("gbk") except Exception as e: - print(f"FETCH_FAIL: {e}", file=sys.stderr) + print(f"FETCH_FAIL (fallback): {e}", file=sys.stderr) return {} results = {} diff --git a/server.py b/server.py index 6d32660..1acbfef 100644 --- a/server.py +++ b/server.py @@ -889,31 +889,49 @@ def upload_confirm(): try: codes = [s["code"] for s in stocks if s.get("code")] if codes: - qs = " ".join( - f"hk{c}" if len(c) == 5 # 港股5位代码 - else f"sz{c}" if c.startswith("0") or c.startswith("3") - else f"sh{c}" if c.startswith("6") - else f"hk{c}" - for c in codes - ) - url = f"https://qt.gtimg.cn/q={qs}" - req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) - resp = urllib.request.urlopen(req, timeout=10) - qt_text = resp.read().decode("gbk", errors="replace") - # map realtime prices + # DB 优先(price_monitor 维护的实时价) + db_prices = {} + try: + import sqlite3 + db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db') + db.row_factory = sqlite3.Row + for code in codes: + row = db.execute("SELECT price, change_pct FROM holdings WHERE code=? AND is_active=1", (code,)).fetchone() + if row and row['price']: + db_prices[code] = (row['price'], row['change_pct'] or 0) + db.close() + except Exception: + pass + + # Fallback: 腾讯 API + need_tencent = [c for c in codes if c not in db_prices] + if need_tencent: + qs = " ".join( + f"hk{c}" if len(c) == 5 + else f"sz{c}" if c.startswith("0") or c.startswith("3") + else f"sh{c}" if c.startswith("6") + else f"hk{c}" + for c in need_tencent + ) + url = f"https://qt.gtimg.cn/q={qs}" + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + resp = urllib.request.urlopen(req, timeout=10) + qt_text = resp.read().decode("gbk", errors="replace") + # 优先 DB 价格,再补腾讯 for stock in stocks: code = stock.get("code", "") - prefix = "hk" if len(code) == 5 else "sz" if code.startswith(("0","3")) else "sh" if code.startswith("6") else "hk" - # 腾讯 API 格式: prefix+code="市场~名称~代码~当前价~昨收~今开~成交量~..." - m = re.search(rf'{prefix}{code}="([^"]+)"', qt_text) - if m: - fields = m.group(1).split('~') - name = fields[1] - price = fields[3] # 当前价 + if code in db_prices: if not stock.get("price"): - stock["price"] = price - if not stock.get("name"): - stock["name"] = name + stock["price"] = db_prices[code][0] + elif need_tencent and code in need_tencent: + prefix = "hk" if len(code) == 5 else "sz" if code.startswith(("0","3")) else "sh" if code.startswith("6") else "hk" + m = re.search(rf'{prefix}{code}="([^"]+)"', qt_text) + if m: + fields = m.group(1).split('~') + if not stock.get("name"): + stock["name"] = fields[1] + if not stock.get("price"): + stock["price"] = fields[3] except: pass # 行情获取失败不影响主流程 diff --git a/strategy_lifecycle.py b/strategy_lifecycle.py index ae5f5f5..d6ebaf6 100644 --- a/strategy_lifecycle.py +++ b/strategy_lifecycle.py @@ -354,13 +354,37 @@ def load_macro_context(): def batch_fetch_prices(codes): - """批量获取实时价格,合并为一次API调用(自动分批,每批15只)""" + """获取实时价格。优先从 DB 读取(price_monitor 每 2 分钟更新),失败才拉腾讯 API。""" if not codes: return {} - # 分批处理,避免单次请求过大导致超时 - batch_size = 15 all_results = {} + + # 主通道:从 DB 读取(price_monitor 唯一价格入口) + try: + import sqlite3 + db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db') + db.row_factory = sqlite3.Row + for raw_code in codes: + raw_code = str(raw_code).split('_')[0] + if not raw_code: continue + row = db.execute( + "SELECT price, change_pct FROM holdings WHERE code=? AND is_active=1", (raw_code,) + ).fetchone() + if not row: + row = db.execute( + "SELECT price, change_pct FROM holding_strategies WHERE code=? AND status='active' ORDER BY updated_at DESC LIMIT 1", (raw_code,) + ).fetchone() + if row and row['price']: + all_results[raw_code] = (row['price'], 0, row['change_pct'] or 0) + db.close() + if all_results: + return all_results + except Exception: + pass + + # Fallback: 腾讯 API(仅当 DB 无数据时) + batch_size = 15 for batch_start in range(0, len(codes), batch_size): batch = codes[batch_start:batch_start + batch_size] symbols = [] @@ -423,16 +447,33 @@ def batch_fetch_prices(codes): def get_price_tencent(code): - """获取实时价格,港股转CNY统一存CNY""" + """获取实时价格。优先 DB(price_monitor 维护),失败才拉腾讯。港股价格已是 CNY。""" + raw_code = str(code).split('_')[0] + if not raw_code: + return None + + # 主通道: DB + try: + import sqlite3 + db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db') + db.row_factory = sqlite3.Row + row = db.execute("SELECT price FROM holdings WHERE code=? AND is_active=1", (raw_code,)).fetchone() + if not row: + row = db.execute("SELECT price FROM holding_strategies WHERE code=? AND status='active' ORDER BY updated_at DESC LIMIT 1", (raw_code,)).fetchone() + if row and row['price']: + db.close() + return row['price'] + db.close() + except Exception: + pass + + # Fallback: 腾讯 API try: from mo_models import to_cny, is_hk_stock except ImportError: to_cny = lambda v, r=None: v is_hk_stock = lambda c: len(str(c).strip()) == 5 and str(c).strip().isdigit() try: - raw_code = code.split('_')[0] - if not raw_code: - return None if is_hk_stock(raw_code): prefix = "hk" elif raw_code.startswith("6") or raw_code.startswith("5"): @@ -786,11 +827,104 @@ def reassess_strategy(code, name, price, cost, shares, current_action, if entry_high < max_change: entry_high = round(max_change, 2) - # ----- 买入时机信号 ----- + # ----- 买入时机信号(三维分析:大盘+行业+个股,基本面+消息面+技术面+资金流)----- + # [2026-07-01] 扩展:不再只看volume_signal + candlestick_sentiment + # 融合大盘趋势、行业板块强弱、基本面估值作为修正因子 volume_signal = vol.get("volume_signal", "") candlestick_sentiment = candle.get("sentiment", "neutral") timing_signal = "neutral" + # --- 三维分析数据装载 --- + # 因子1: 大盘环境(从macro_context_log读) + market_bearish = False + market_bullish = False + try: + import sqlite3 + _db = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5) + _mc = _db.execute( + "SELECT structure FROM macro_context_log WHERE has_valid_data=1 ORDER BY rowid DESC LIMIT 1" + ).fetchone() + if _mc and _mc[0]: + _s = json.loads(_mc[0]) + _overall = _s.get("overall", "") + if "bearish" in _overall: + market_bearish = True + elif _overall == "bullish": + market_bullish = True + _db.close() + except Exception: + pass + + # 因子2: 行业板块强弱 + sector_strong = False + sector_weak = False + try: + _db2 = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5) + _rows2 = _db2.execute( + "SELECT name, change_pct FROM sector_snapshots ORDER BY change_pct DESC" + ).fetchall() + if _rows2: + # 找到该股所属行业(简单匹配name或通过stock_sectors) + _my_sectors = _db2.execute( + "SELECT sector_name FROM stock_sectors WHERE code=?", + (code,) + ).fetchall() + if _my_sectors: + for (_sn,) in _my_sectors: + for r_name, r_chg in _rows2: + if _sn in r_name or r_name in _sn: + _rank = [r[0] for r in _rows2].index(r_name) if r_name in [x[0] for x in _rows2] else -1 + _total = len(_rows2) + if _rank >= 0: + if _rank < _total * 0.2: + sector_strong = True + if _rank > _total * 0.8: + sector_weak = True + break + _db2.close() + except Exception: + pass + + # 因子3: 基本面估值 + is_value_stock = False + try: + _db3 = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5) + _fd = _db3.execute( + "SELECT pe, eps FROM stock_fundamentals WHERE code=?", (code,) + ).fetchone() + if _fd: + _pe, _eps = _fd + is_value_stock = (0 < (_pe or 0) < 25 and (_eps or 0) > 0.3) + _db3.close() + except Exception: + pass + + # --- 三维修正规则 --- + # 大盘偏弱时收紧买入信号,大盘偏强时放宽 + # 行业领先加分,行业落后减分 + # 低估值加分(有安全边际) + + def _adjust_timing(signal, market_b, market_bb, sec_s, sec_w, is_val): + """根据三维因子修正 timing_signal""" + # 大盘偏弱时降级买入信号 + if market_b: + if signal in ("买入", "加仓"): + if not sec_s: # 大盘弱+行业不强→降级 + return "关注" + # 大盘偏强时放宽 + if market_bb: + if signal == "关注" and (sec_s or is_val): + return "买入" + # 行业弱势时降级买入信号 + if sec_w: + if signal in ("买入", "加仓"): + return "关注" + # 行业强势+低估时升级关注 + if sec_s and is_val: + if signal == "关注": + return "买入" + return signal + if is_new_entry: # 新买入时机 if volume_signal == "主动买盘占优" and candlestick_sentiment == "bullish": @@ -803,6 +937,15 @@ def reassess_strategy(code, name, price, cost, shares, current_action, timing_signal = "买入" elif ws and price < ws * 1.02: timing_signal = "关注" + # 新买入时三维修正:大盘向上+行业强→升级,大盘弱→降级 + _pre_signal = timing_signal + timing_signal = _adjust_timing(timing_signal, market_bearish, market_bullish, + sector_strong, sector_weak, is_value_stock) + if timing_signal != _pre_signal: + print(f" 三维修正(新入): {_pre_signal}→{timing_signal} " + f"| 大盘{'弱' if market_bearish else '强' if market_bullish else '中性'}" + f"| 行业{'强' if sector_strong else '弱' if sector_weak else '中性'}" + f"| 估值{'低' if is_value_stock else '一般'}") else: # 已持仓时机(用于加仓/减仓参考) if is_short_term_strong_trend: