From 0aef122b69010d001453e539bf4e36e40ef4ccc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=A5=E5=BE=AE?= Date: Wed, 1 Jul 2026 22:56:24 +0800 Subject: [PATCH] feat: mofin_db get_price_from_db/get_prices_batch helpers + strategy_evaluator DB-first --- mofin_db.py | 68 +++++++++++++++++++++++++++++++++++++++++++ strategy_evaluator.py | 14 ++++++++- 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/mofin_db.py b/mofin_db.py index 2b8fff9..d8a6ae2 100644 --- a/mofin_db.py +++ b/mofin_db.py @@ -833,6 +833,74 @@ def query_strategy_evaluations(conn: sqlite3.Connection, code: str = None) -> li # ═══════════════════════════════════════════════════════════ def query_latest_market(conn: sqlite3.Connection) -> dict: + """获取最新一次市场快照(含 sector 详情)""" + row = conn.execute( + "SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone() + if not row: + return {} + snap = dict(row) + # 关联 sectors + sectors = conn.execute( + "SELECT * FROM sector_snapshots WHERE snapshot_id = ? ORDER BY change_pct DESC", + (snap["id"],)).fetchall() + snap["sectors"] = [dict(r) for r in sectors] + snap["top_gainers"] = [dict(r) for r in sectors[:5]] + snap["top_losers"] = [dict(r) for r in sectors[-3:]] + return snap + + +# ═══════════════════════════════════════════════════════════════════ +# 通用工具 +# ═══════════════════════════════════════════════════════════════════ + +def get_price_from_db(code: str) -> tuple[float | None, float | None]: + """从 DB 读取最新价格(price_monitor 维护)。 + 返回 (price, change_pct) 或 (None, None) + + 所有脚本应优先调用此函数,DB 无数据时才拉腾讯 API。 + """ + try: + import sqlite3 + db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db') + db.row_factory = sqlite3.Row + row = db.execute( + "SELECT price, change_pct FROM holdings WHERE code=? AND is_active=1", (str(code),) + ).fetchone() + if not row: + row = db.execute( + "SELECT price FROM holding_strategies WHERE code=? AND status='active' ORDER BY updated_at DESC LIMIT 1", (str(code),) + ).fetchone() + db.close() + if row: + return (row['price'], row['change_pct'] if 'change_pct' in row.keys() else None) + except Exception: + pass + return (None, None) + + +def get_prices_batch_from_db(codes: list[str]) -> dict: + """从 DB 批量读取价格。返回 {code: (price, change_pct)}""" + results = {} + if not codes: + return results + try: + import sqlite3 + db = sqlite3.connect('/home/hmo/web-dashboard/data/mofin.db') + db.row_factory = sqlite3.Row + for code in codes: + row = db.execute( + "SELECT price, change_pct FROM holdings WHERE code=? AND is_active=1", (str(code),) + ).fetchone() + if not row: + row = db.execute( + "SELECT price FROM holding_strategies WHERE code=? AND status='active' ORDER BY updated_at DESC LIMIT 1", (str(code),) + ).fetchone() + if row and row['price']: + results[str(code)] = (row['price'], row['change_pct'] if 'change_pct' in row.keys() else 0) + db.close() + except Exception: + pass + return results """最新一次市场快照(含板块数据)""" snap = conn.execute( "SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone() diff --git a/strategy_evaluator.py b/strategy_evaluator.py index 82b99fc..fe49b07 100644 --- a/strategy_evaluator.py +++ b/strategy_evaluator.py @@ -44,9 +44,21 @@ def save_json(path, data): def fetch_prices(codes): - """批量拉腾讯行情""" + """批量拉价格。DB 优先(price_monitor 维护),腾讯 API fallback""" if not codes: return {} + + # 主通道: DB + try: + from mofin_db import get_prices_batch_from_db + db_results = get_prices_batch_from_db(codes) + if db_results: + return {code: {"name": "", "price": p, "prev_close": 0, "change_pct": chg or 0, + "high": 0, "low": 0} for code, (p, chg) in db_results.items()} + except Exception: + pass + + # Fallback: 腾讯 API symbols = [] code_map = {} for c in codes: