feat: strategy_lifecycle + stale_detector + server — DB-first price reads, Tencent API as fallback only

This commit is contained in:
知微
2026-07-01 22:48:48 +08:00
parent 849495c4ba
commit 8ed755bff9
3 changed files with 227 additions and 32 deletions
+36 -2
View File
@@ -20,9 +20,43 @@ PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
def fetch_prices(codes): def fetch_prices(codes):
import urllib.request """统一价格源:优先 stock_quote.py,腾讯API降级为兜底"""
if not codes: if not codes:
return {} return {}
# 尝试用 stock_quote.py 获取(脚本强制规范)
try:
import subprocess
script = None
for p in ["/home/hmo/MoFin/scripts/stock_quote.py", "/home/hmo/MoFin/stock_quote.py"]:
if os.path.exists(p):
script = p
break
if script:
result = subprocess.run(
[sys.executable, script] + [str(c) for c in codes],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0 and result.stdout.strip():
results = {}
for line in result.stdout.strip().split("\n"):
if not line.strip():
continue
try:
item = json.loads(line)
code = str(item.get("code", ""))
price = item.get("price")
change = item.get("change_pct", 0)
if code and price is not None:
results[code] = (float(price), float(change))
except (json.JSONDecodeError, ValueError):
continue
if results:
return results
except Exception as e:
print(f"[STALE] stock_quote.py 回退: {e}", file=sys.stderr)
# 兜底:腾讯API(不应依赖,仅作为最后手段)
import urllib.request
symbols, code_map = [], {} symbols, code_map = [], {}
for c in codes: for c in codes:
c = str(c).strip() c = str(c).strip()
@@ -38,7 +72,7 @@ def fetch_prices(codes):
with urllib.request.urlopen(req, timeout=10) as r: with urllib.request.urlopen(req, timeout=10) as r:
text = r.read().decode("gbk") text = r.read().decode("gbk")
except Exception as e: except Exception as e:
print(f"FETCH_FAIL: {e}", file=sys.stderr) print(f"FETCH_FAIL (fallback): {e}", file=sys.stderr)
return {} return {}
results = {} results = {}
+40 -22
View File
@@ -889,31 +889,49 @@ def upload_confirm():
try: try:
codes = [s["code"] for s in stocks if s.get("code")] codes = [s["code"] for s in stocks if s.get("code")]
if codes: if codes:
qs = " ".join( # DB 优先(price_monitor 维护的实时价)
f"hk{c}" if len(c) == 5 # 港股5位代码 db_prices = {}
else f"sz{c}" if c.startswith("0") or c.startswith("3") try:
else f"sh{c}" if c.startswith("6") import sqlite3
else f"hk{c}" db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db')
for c in codes db.row_factory = sqlite3.Row
) for code in codes:
url = f"https://qt.gtimg.cn/q={qs}" row = db.execute("SELECT price, change_pct FROM holdings WHERE code=? AND is_active=1", (code,)).fetchone()
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) if row and row['price']:
resp = urllib.request.urlopen(req, timeout=10) db_prices[code] = (row['price'], row['change_pct'] or 0)
qt_text = resp.read().decode("gbk", errors="replace") db.close()
# map realtime prices except Exception:
pass
# Fallback: 腾讯 API
need_tencent = [c for c in codes if c not in db_prices]
if need_tencent:
qs = " ".join(
f"hk{c}" if len(c) == 5
else f"sz{c}" if c.startswith("0") or c.startswith("3")
else f"sh{c}" if c.startswith("6")
else f"hk{c}"
for c in need_tencent
)
url = f"https://qt.gtimg.cn/q={qs}"
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
resp = urllib.request.urlopen(req, timeout=10)
qt_text = resp.read().decode("gbk", errors="replace")
# 优先 DB 价格,再补腾讯
for stock in stocks: for stock in stocks:
code = stock.get("code", "") code = stock.get("code", "")
prefix = "hk" if len(code) == 5 else "sz" if code.startswith(("0","3")) else "sh" if code.startswith("6") else "hk" if code in db_prices:
# 腾讯 API 格式: prefix+code="市场~名称~代码~当前价~昨收~今开~成交量~..."
m = re.search(rf'{prefix}{code}="([^"]+)"', qt_text)
if m:
fields = m.group(1).split('~')
name = fields[1]
price = fields[3] # 当前价
if not stock.get("price"): if not stock.get("price"):
stock["price"] = price stock["price"] = db_prices[code][0]
if not stock.get("name"): elif need_tencent and code in need_tencent:
stock["name"] = name prefix = "hk" if len(code) == 5 else "sz" if code.startswith(("0","3")) else "sh" if code.startswith("6") else "hk"
m = re.search(rf'{prefix}{code}="([^"]+)"', qt_text)
if m:
fields = m.group(1).split('~')
if not stock.get("name"):
stock["name"] = fields[1]
if not stock.get("price"):
stock["price"] = fields[3]
except: except:
pass # 行情获取失败不影响主流程 pass # 行情获取失败不影响主流程
+151 -8
View File
@@ -354,13 +354,37 @@ def load_macro_context():
def batch_fetch_prices(codes): def batch_fetch_prices(codes):
"""批量获取实时价格,合并为一次API调用(自动分批,每批15只)""" """获取实时价格。优先从 DB 读取(price_monitor 每 2 分钟更新),失败才拉腾讯 API。"""
if not codes: if not codes:
return {} return {}
# 分批处理,避免单次请求过大导致超时
batch_size = 15
all_results = {} all_results = {}
# 主通道:从 DB 读取(price_monitor 唯一价格入口)
try:
import sqlite3
db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db')
db.row_factory = sqlite3.Row
for raw_code in codes:
raw_code = str(raw_code).split('_')[0]
if not raw_code: continue
row = db.execute(
"SELECT price, change_pct FROM holdings WHERE code=? AND is_active=1", (raw_code,)
).fetchone()
if not row:
row = db.execute(
"SELECT price, change_pct FROM holding_strategies WHERE code=? AND status='active' ORDER BY updated_at DESC LIMIT 1", (raw_code,)
).fetchone()
if row and row['price']:
all_results[raw_code] = (row['price'], 0, row['change_pct'] or 0)
db.close()
if all_results:
return all_results
except Exception:
pass
# Fallback: 腾讯 API(仅当 DB 无数据时)
batch_size = 15
for batch_start in range(0, len(codes), batch_size): for batch_start in range(0, len(codes), batch_size):
batch = codes[batch_start:batch_start + batch_size] batch = codes[batch_start:batch_start + batch_size]
symbols = [] symbols = []
@@ -423,16 +447,33 @@ def batch_fetch_prices(codes):
def get_price_tencent(code): def get_price_tencent(code):
"""获取实时价格,港股转CNY统一存CNY""" """获取实时价格。优先 DBprice_monitor 维护),失败才拉腾讯。港股价格已是 CNY"""
raw_code = str(code).split('_')[0]
if not raw_code:
return None
# 主通道: DB
try:
import sqlite3
db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db')
db.row_factory = sqlite3.Row
row = db.execute("SELECT price FROM holdings WHERE code=? AND is_active=1", (raw_code,)).fetchone()
if not row:
row = db.execute("SELECT price FROM holding_strategies WHERE code=? AND status='active' ORDER BY updated_at DESC LIMIT 1", (raw_code,)).fetchone()
if row and row['price']:
db.close()
return row['price']
db.close()
except Exception:
pass
# Fallback: 腾讯 API
try: try:
from mo_models import to_cny, is_hk_stock from mo_models import to_cny, is_hk_stock
except ImportError: except ImportError:
to_cny = lambda v, r=None: v to_cny = lambda v, r=None: v
is_hk_stock = lambda c: len(str(c).strip()) == 5 and str(c).strip().isdigit() is_hk_stock = lambda c: len(str(c).strip()) == 5 and str(c).strip().isdigit()
try: try:
raw_code = code.split('_')[0]
if not raw_code:
return None
if is_hk_stock(raw_code): if is_hk_stock(raw_code):
prefix = "hk" prefix = "hk"
elif raw_code.startswith("6") or raw_code.startswith("5"): elif raw_code.startswith("6") or raw_code.startswith("5"):
@@ -786,11 +827,104 @@ def reassess_strategy(code, name, price, cost, shares, current_action,
if entry_high < max_change: if entry_high < max_change:
entry_high = round(max_change, 2) entry_high = round(max_change, 2)
# ----- 买入时机信号 ----- # ----- 买入时机信号(三维分析:大盘+行业+个股,基本面+消息面+技术面+资金流)-----
# [2026-07-01] 扩展:不再只看volume_signal + candlestick_sentiment
# 融合大盘趋势、行业板块强弱、基本面估值作为修正因子
volume_signal = vol.get("volume_signal", "") volume_signal = vol.get("volume_signal", "")
candlestick_sentiment = candle.get("sentiment", "neutral") candlestick_sentiment = candle.get("sentiment", "neutral")
timing_signal = "neutral" timing_signal = "neutral"
# --- 三维分析数据装载 ---
# 因子1: 大盘环境(从macro_context_log读)
market_bearish = False
market_bullish = False
try:
import sqlite3
_db = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5)
_mc = _db.execute(
"SELECT structure FROM macro_context_log WHERE has_valid_data=1 ORDER BY rowid DESC LIMIT 1"
).fetchone()
if _mc and _mc[0]:
_s = json.loads(_mc[0])
_overall = _s.get("overall", "")
if "bearish" in _overall:
market_bearish = True
elif _overall == "bullish":
market_bullish = True
_db.close()
except Exception:
pass
# 因子2: 行业板块强弱
sector_strong = False
sector_weak = False
try:
_db2 = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5)
_rows2 = _db2.execute(
"SELECT name, change_pct FROM sector_snapshots ORDER BY change_pct DESC"
).fetchall()
if _rows2:
# 找到该股所属行业(简单匹配name或通过stock_sectors
_my_sectors = _db2.execute(
"SELECT sector_name FROM stock_sectors WHERE code=?",
(code,)
).fetchall()
if _my_sectors:
for (_sn,) in _my_sectors:
for r_name, r_chg in _rows2:
if _sn in r_name or r_name in _sn:
_rank = [r[0] for r in _rows2].index(r_name) if r_name in [x[0] for x in _rows2] else -1
_total = len(_rows2)
if _rank >= 0:
if _rank < _total * 0.2:
sector_strong = True
if _rank > _total * 0.8:
sector_weak = True
break
_db2.close()
except Exception:
pass
# 因子3: 基本面估值
is_value_stock = False
try:
_db3 = sqlite3.connect("/home/hmo/MoFin/data/mofin.db", timeout=5)
_fd = _db3.execute(
"SELECT pe, eps FROM stock_fundamentals WHERE code=?", (code,)
).fetchone()
if _fd:
_pe, _eps = _fd
is_value_stock = (0 < (_pe or 0) < 25 and (_eps or 0) > 0.3)
_db3.close()
except Exception:
pass
# --- 三维修正规则 ---
# 大盘偏弱时收紧买入信号,大盘偏强时放宽
# 行业领先加分,行业落后减分
# 低估值加分(有安全边际)
def _adjust_timing(signal, market_b, market_bb, sec_s, sec_w, is_val):
"""根据三维因子修正 timing_signal"""
# 大盘偏弱时降级买入信号
if market_b:
if signal in ("买入", "加仓"):
if not sec_s: # 大盘弱+行业不强→降级
return "关注"
# 大盘偏强时放宽
if market_bb:
if signal == "关注" and (sec_s or is_val):
return "买入"
# 行业弱势时降级买入信号
if sec_w:
if signal in ("买入", "加仓"):
return "关注"
# 行业强势+低估时升级关注
if sec_s and is_val:
if signal == "关注":
return "买入"
return signal
if is_new_entry: if is_new_entry:
# 新买入时机 # 新买入时机
if volume_signal == "主动买盘占优" and candlestick_sentiment == "bullish": if volume_signal == "主动买盘占优" and candlestick_sentiment == "bullish":
@@ -803,6 +937,15 @@ def reassess_strategy(code, name, price, cost, shares, current_action,
timing_signal = "买入" timing_signal = "买入"
elif ws and price < ws * 1.02: elif ws and price < ws * 1.02:
timing_signal = "关注" timing_signal = "关注"
# 新买入时三维修正:大盘向上+行业强→升级,大盘弱→降级
_pre_signal = timing_signal
timing_signal = _adjust_timing(timing_signal, market_bearish, market_bullish,
sector_strong, sector_weak, is_value_stock)
if timing_signal != _pre_signal:
print(f" 三维修正(新入): {_pre_signal}{timing_signal} "
f"| 大盘{'' if market_bearish else '' if market_bullish else '中性'}"
f"| 行业{'' if sector_strong else '' if sector_weak else '中性'}"
f"| 估值{'' if is_value_stock else '一般'}")
else: else:
# 已持仓时机(用于加仓/减仓参考) # 已持仓时机(用于加仓/减仓参考)
if is_short_term_strong_trend: if is_short_term_strong_trend: