From 1610f184a03840344088b1dc53f64a87080d305d Mon Sep 17 00:00:00 2001 From: hmo Date: Sat, 20 Jun 2026 16:59:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=A1=A5=E5=85=A8=20SQLite=20=E8=A1=A8?= =?UTF-8?q?=E7=BB=93=E6=9E=84=20+=20=E6=9F=A5=E8=AF=A2=E5=87=BD=E6=95=B0?= =?UTF-8?q?=20+=20=E8=BF=81=E7=A7=BB=E8=A6=86=E7=9B=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit mofin_db.py 新增: - 4 张表: portfolio_summary, advice_timeline, accuracy_stats, strategy_feedback - 18 个查询函数: query_holdings, query_watchlist, query_strategies, query_advice_timeline, query_candidates, query_candidate_scores, query_price_events, query_price_events_by_date, query_stock_sectors, query_sector_stocks, query_accuracy_stats, query_strategy_feedback, query_strategy_evaluations, query_latest_market, query_holding_by_code, query_portfolio_summary migrate_all.py 新增: - 4 个迁移函数: migrate_portfolio_summary, migrate_advice_timeline, migrate_accuracy_stats, migrate_strategy_feedback - 迁移量: portfolio_summary(1), advice_timeline(2547), accuracy_stats(1), strategy_feedback(37) 现在 13 张表全部覆盖,JSON→SQLite 数据完整迁移 --- migrate_all.py | 124 +++++++++++++++++++++ mofin_db.py | 289 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 410 insertions(+), 3 deletions(-) diff --git a/migrate_all.py b/migrate_all.py index 7c8d8d9..2775ca4 100644 --- a/migrate_all.py +++ b/migrate_all.py @@ -387,6 +387,98 @@ def migrate_sectors(conn) -> int: return count +def migrate_portfolio_summary(conn, pf: dict) -> int: + """portfolio.json 顶层字段 → portfolio_summary""" + try: + conn.execute( + "INSERT OR REPLACE INTO portfolio_summary (id, total_assets, stock_value, cash, position_pct, total_pnl, updated_at) " + "VALUES (1, ?, ?, ?, ?, ?, ?)", + (pf.get("total_assets"), pf.get("stock_value"), pf.get("cash"), + pf.get("position_pct"), pf.get("total_pnl"), pf.get("updated_at", datetime.now().isoformat()))) + conn.commit() + return 1 + except Exception: + return 0 + + +def migrate_advice_timeline(conn, dec: dict) -> int: + """decisions.json advice_timeline[] → advice_timeline""" + count = 0 + for d in dec.get("decisions", []): + code = _normalize_code(d.get("code", "")) + if not code: + continue + timeline = d.get("advice_timeline", []) + for t in timeline: + try: + conn.execute( + "INSERT INTO advice_timeline (code, date, direction, price, summary, status, evaluated, result, evaluated_at, report_id) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (code, t.get("date"), t.get("direction"), t.get("price"), + t.get("summary"), t.get("status"), + 1 if t.get("evaluated") else 0, t.get("result"), + t.get("evaluated_at"), t.get("report_id"))) + count += 1 + except Exception: + pass + conn.commit() + return count + + +def migrate_accuracy_stats(conn, acc: dict) -> int: + """accuracy_stats.json → accuracy_stats""" + try: + conn.execute( + "INSERT OR REPLACE INTO accuracy_stats (id, period_start, period_end, total_advice, correct, wrong, partial, unknown, pending, ignored, evaluated, accuracy_pct, " + "phase1_correct, phase1_wrong, phase1_pending, phase1_accuracy, " + "phase2_correct, phase2_wrong, phase2_pending, phase2_accuracy, total_evaluated, updated_at) " + "VALUES (1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (acc.get("period_start"), acc.get("period_end"), + acc.get("total_advice"), acc.get("correct"), acc.get("wrong"), + acc.get("partial"), acc.get("unknown"), acc.get("pending"), + acc.get("ignored"), acc.get("evaluated"), acc.get("accuracy_pct"), + acc.get("phase1", {}).get("correct", 0) if isinstance(acc.get("phase1"), dict) else 0, + acc.get("phase1", {}).get("wrong", 0) if isinstance(acc.get("phase1"), dict) else 0, + acc.get("phase1", {}).get("pending", 0) if isinstance(acc.get("phase1"), dict) else 0, + acc.get("phase1", {}).get("accuracy_pct", 0) if isinstance(acc.get("phase1"), dict) else 0, + acc.get("phase2", {}).get("correct", 0) if isinstance(acc.get("phase2"), dict) else 0, + acc.get("phase2", {}).get("wrong", 0) if isinstance(acc.get("phase2"), dict) else 0, + acc.get("phase2", {}).get("pending", 0) if isinstance(acc.get("phase2"), dict) else 0, + acc.get("phase2", {}).get("accuracy_pct", 0) if isinstance(acc.get("phase2"), dict) else 0, + acc.get("total_evaluated"), acc.get("updated_at", datetime.now().isoformat()))) + conn.commit() + return 1 + except Exception: + return 0 + + +def migrate_strategy_feedback(conn, sf: dict) -> int: + """strategy_feedback.json → strategy_feedback""" + count = 0 + for f in sf.get("feedback", []): + code = _normalize_code(f.get("code", "")) + if not code: + continue + pc = f.get("phase_check", {}) + try: + conn.execute( + "INSERT INTO strategy_feedback (code, name, evaluated_at, " + "phase1_completed, phase1_result, phase1_completed_at, phase1_price, " + "phase2_completed, phase2_result, phase2_completed_at, days_in_phase1, adjustments_json) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + (code, f.get("name"), f.get("evaluated_at"), + 1 if pc.get("phase1_completed") else 0, pc.get("phase1_result"), + pc.get("phase1_completed_at"), pc.get("phase1_price_at_completion"), + 1 if pc.get("phase2_completed") else 0, pc.get("phase2_result"), + pc.get("phase2_completed_at"), pc.get("days_in_phase1"), + json.dumps(f.get("adjustments", []), ensure_ascii=False))) + count += 1 + except Exception: + pass + conn.commit() + return count + + def main(): print("=" * 60) print(" MoFin 数据迁移 → mofin.db") @@ -454,6 +546,34 @@ def main(): totals["sector_mappings"] = n print(f"[8/8] stock_sectors: {n} 条映射") + # 9. portfolio_summary + pf = load_json("portfolio.json") + if pf: + n = migrate_portfolio_summary(conn, pf) + totals["portfolio_summary"] = n + print(f"[9/12] portfolio_summary: {n} 条") + + # 10. advice_timeline + dec = load_json("decisions.json") + if dec: + n = migrate_advice_timeline(conn, dec) + totals["advice_timeline"] = n + print(f"[10/12] advice_timeline: {n} 条") + + # 11. accuracy_stats + acc = load_json("accuracy_stats.json") + if acc: + n = migrate_accuracy_stats(conn, acc) + totals["accuracy_stats"] = n + print(f"[11/12] accuracy_stats: {n} 条") + + # 12. strategy_feedback + sf = load_json("strategy_feedback.json") + if sf: + n = migrate_strategy_feedback(conn, sf) + totals["strategy_feedback"] = n + print(f"[12/12] strategy_feedback: {n} 条") + conn.commit() # ── 验证 ── @@ -470,6 +590,10 @@ def main(): "price_events": "SELECT COUNT(*) FROM price_events", "strategy_evaluations": "SELECT COUNT(*) FROM strategy_evaluations", "stock_sectors": "SELECT COUNT(*) FROM stock_sectors", + "portfolio_summary": "SELECT COUNT(*) FROM portfolio_summary", + "advice_timeline": "SELECT COUNT(*) FROM advice_timeline", + "accuracy_stats": "SELECT COUNT(*) FROM accuracy_stats", + "strategy_feedback": "SELECT COUNT(*) FROM strategy_feedback", } for name, sql in tables.items(): n = conn.execute(sql).fetchone()[0] diff --git a/mofin_db.py b/mofin_db.py index eb39b8b..4f60e27 100644 --- a/mofin_db.py +++ b/mofin_db.py @@ -218,6 +218,79 @@ def init_all_tables(conn: sqlite3.Connection): reason TEXT, created_at TEXT DEFAULT (datetime('now','localtime')) ); + + -- 持仓汇总(portfolio.json 顶层字段) + CREATE TABLE IF NOT EXISTS portfolio_summary ( + id INTEGER PRIMARY KEY CHECK (id = 1), + total_assets REAL, + stock_value REAL, + cash REAL, + position_pct REAL, + total_pnl REAL, + updated_at TEXT + ); + + -- 建议时间线(decisions.json advice_timeline[]) + CREATE TABLE IF NOT EXISTS advice_timeline ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES stocks(code), + date TEXT, + direction TEXT, + price REAL, + summary TEXT, + status TEXT, + evaluated INTEGER DEFAULT 0, + result TEXT, + evaluated_at TEXT, + report_id TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_advice_code ON advice_timeline(code); + + -- 准确率统计(accuracy_stats.json) + CREATE TABLE IF NOT EXISTS accuracy_stats ( + id INTEGER PRIMARY KEY CHECK (id = 1), + period_start TEXT, + period_end TEXT, + total_advice INTEGER DEFAULT 0, + correct INTEGER DEFAULT 0, + wrong INTEGER DEFAULT 0, + partial INTEGER DEFAULT 0, + unknown INTEGER DEFAULT 0, + pending INTEGER DEFAULT 0, + ignored INTEGER DEFAULT 0, + evaluated INTEGER DEFAULT 0, + accuracy_pct REAL, + phase1_correct INTEGER DEFAULT 0, + phase1_wrong INTEGER DEFAULT 0, + phase1_pending INTEGER DEFAULT 0, + phase1_accuracy REAL, + phase2_correct INTEGER DEFAULT 0, + phase2_wrong INTEGER DEFAULT 0, + phase2_pending INTEGER DEFAULT 0, + phase2_accuracy REAL, + total_evaluated INTEGER DEFAULT 0, + updated_at TEXT + ); + + -- 策略反馈(strategy_feedback.json) + CREATE TABLE IF NOT EXISTS strategy_feedback ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + code TEXT NOT NULL REFERENCES stocks(code), + name TEXT, + evaluated_at TEXT, + phase1_completed INTEGER DEFAULT 0, + phase1_result TEXT, + phase1_completed_at TEXT, + phase1_price REAL, + phase2_completed INTEGER DEFAULT 0, + phase2_result TEXT, + phase2_completed_at TEXT, + days_in_phase1 INTEGER, + adjustments_json TEXT, + created_at TEXT DEFAULT (datetime('now','localtime')) + ); + CREATE INDEX IF NOT EXISTS idx_feedback_code ON strategy_feedback(code); """) conn.commit() @@ -449,14 +522,224 @@ def query_db_stats(conn: sqlite3.Connection) -> dict: snap_count = conn.execute("SELECT COUNT(*) FROM market_snapshots").fetchone()[0] sector_count = conn.execute("SELECT COUNT(*) FROM sector_snapshots").fetchone()[0] stock_count = conn.execute("SELECT COUNT(*) FROM stocks").fetchone()[0] - kline_count = conn.execute( - "SELECT COUNT(*) FROM stock_daily").fetchone()[0] + kline_count = conn.execute("SELECT COUNT(*) FROM stock_daily").fetchone()[0] event_count = conn.execute("SELECT COUNT(*) FROM price_events").fetchone()[0] + holding_count = conn.execute("SELECT COUNT(*) FROM holdings").fetchone()[0] + candidate_count = conn.execute("SELECT COUNT(*) FROM candidates").fetchone()[0] latest = conn.execute( "SELECT timestamp, source FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone() return { "snapshots": snap_count, "sector_rows": sector_count, "stocks": stock_count, "daily_klines": kline_count, - "price_events": event_count, + "price_events": event_count, "holdings": holding_count, + "candidates": candidate_count, "latest_snapshot": dict(latest) if latest else None, } + + +# ═══════════════════════════════════════════════════════════ +# 持仓查询 +# ═══════════════════════════════════════════════════════════ + +def query_holdings(conn: sqlite3.Connection) -> list[dict]: + """持仓列表(含最新策略)""" + rows = conn.execute(""" + SELECT h.code, h.name, h.shares, h.cost, h.position_pct, h.is_active, + hs.stop_loss, hs.take_profit, hs.entry_low, hs.entry_high, + hs.reason as action, hs.created_at as strategy_updated + FROM holdings h + LEFT JOIN holding_strategies hs ON h.code = hs.code + AND hs.id = (SELECT MAX(id) FROM holding_strategies WHERE code = h.code AND strategy_type = 'holding') + WHERE h.is_active = 1 + """).fetchall() + return [dict(r) for r in rows] + + +def query_holding_by_code(conn: sqlite3.Connection, code: str) -> dict | None: + """单只持仓""" + row = conn.execute(""" + SELECT h.*, hs.stop_loss, hs.take_profit, hs.entry_low, hs.entry_high, + hs.reason as action + FROM holdings h + LEFT JOIN holding_strategies hs ON h.code = hs.code + AND hs.id = (SELECT MAX(id) FROM holding_strategies WHERE code = h.code AND strategy_type = 'holding') + WHERE h.code = ? + """, (code,)).fetchone() + return dict(row) if row else None + + +def query_portfolio_summary(conn: sqlite3.Connection) -> dict: + """持仓汇总""" + row = conn.execute("SELECT * FROM portfolio_summary WHERE id = 1").fetchone() + return dict(row) if row else {} + + +# ═══════════════════════════════════════════════════════════ +# 自选股查询 +# ═══════════════════════════════════════════════════════════ + +def query_watchlist(conn: sqlite3.Connection) -> list[dict]: + """自选股列表(含策略)""" + rows = conn.execute(""" + SELECT w.code, w.name, w.added_at, + hs.stop_loss, hs.take_profit, hs.entry_low, hs.entry_high, + hs.reason as action + FROM watchlist_stocks w + LEFT JOIN holding_strategies hs ON w.code = hs.code + AND hs.id = (SELECT MAX(id) FROM holding_strategies WHERE code = w.code AND strategy_type = 'watch') + WHERE w.is_active = 1 + """).fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 决策/策略查询 +# ═══════════════════════════════════════════════════════════ + +def query_strategies(conn: sqlite3.Connection, code: str = None) -> list[dict]: + """策略列表(按版本倒序)""" + if code: + rows = conn.execute( + "SELECT * FROM holding_strategies WHERE code = ? ORDER BY version DESC", (code,)).fetchall() + else: + rows = conn.execute( + "SELECT * FROM holding_strategies ORDER BY code, version DESC").fetchall() + return [dict(r) for r in rows] + + +def query_advice_timeline(conn: sqlite3.Connection, code: str = None, limit: int = 50) -> list[dict]: + """建议时间线""" + if code: + rows = conn.execute( + "SELECT * FROM advice_timeline WHERE code = ? ORDER BY date DESC LIMIT ?", + (code, limit)).fetchall() + else: + rows = conn.execute( + "SELECT * FROM advice_timeline ORDER BY date DESC LIMIT ?", (limit,)).fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 候选池查询 +# ═══════════════════════════════════════════════════════════ + +def query_candidates(conn: sqlite3.Connection, active_only: bool = True) -> list[dict]: + """候选池列表(含最新评分)""" + where = "WHERE c.dropped = 0" if active_only else "" + rows = conn.execute(f""" + SELECT c.*, (SELECT score FROM candidate_score_history + WHERE code = c.code ORDER BY created_at DESC LIMIT 1) as latest_score + FROM candidates c {where} + ORDER BY c.zhiwei_star DESC NULLS LAST + """).fetchall() + return [dict(r) for r in rows] + + +def query_candidate_scores(conn: sqlite3.Connection, code: str) -> list[dict]: + """某候选的评分历史""" + rows = conn.execute( + "SELECT * FROM candidate_score_history WHERE code = ? ORDER BY created_at", + (code,)).fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 价格事件查询 +# ═══════════════════════════════════════════════════════════ + +def query_price_events(conn: sqlite3.Connection, code: str = None, limit: int = 100) -> list[dict]: + """价格事件""" + if code: + rows = conn.execute( + "SELECT * FROM price_events WHERE code = ? ORDER BY created_at DESC LIMIT ?", + (code, limit)).fetchall() + else: + rows = conn.execute( + "SELECT * FROM price_events ORDER BY created_at DESC LIMIT ?", (limit,)).fetchall() + return [dict(r) for r in rows] + + +def query_price_events_by_date(conn: sqlite3.Connection, date: str) -> list[dict]: + """某天的价格事件""" + rows = conn.execute( + "SELECT * FROM price_events WHERE date = ? ORDER BY created_at DESC", (date,)).fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 板块成分查询 +# ═══════════════════════════════════════════════════════════ + +def query_stock_sectors(conn: sqlite3.Connection, code: str) -> list[str]: + """某只股票所属板块""" + rows = conn.execute( + "SELECT sector_name FROM stock_sectors WHERE code = ?", (code,)).fetchall() + return [r[0] for r in rows] + + +def query_sector_stocks(conn: sqlite3.Connection, sector_name: str) -> list[str]: + """某板块包含的股票""" + rows = conn.execute( + "SELECT code FROM stock_sectors WHERE sector_name = ?", (sector_name,)).fetchall() + return [r[0] for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 准确率统计查询 +# ═══════════════════════════════════════════════════════════ + +def query_accuracy_stats(conn: sqlite3.Connection) -> dict: + """准确率统计""" + row = conn.execute("SELECT * FROM accuracy_stats WHERE id = 1").fetchone() + return dict(row) if row else {} + + +# ═══════════════════════════════════════════════════════════ +# 策略反馈查询 +# ═══════════════════════════════════════════════════════════ + +def query_strategy_feedback(conn: sqlite3.Connection, code: str = None) -> list[dict]: + """策略反馈""" + if code: + rows = conn.execute( + "SELECT * FROM strategy_feedback WHERE code = ? ORDER BY evaluated_at DESC", (code,)).fetchall() + else: + rows = conn.execute( + "SELECT * FROM strategy_feedback ORDER BY evaluated_at DESC").fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 策略评估查询 +# ═══════════════════════════════════════════════════════════ + +def query_strategy_evaluations(conn: sqlite3.Connection, code: str = None) -> list[dict]: + """策略评估记录""" + if code: + rows = conn.execute( + "SELECT * FROM strategy_evaluations WHERE code = ? ORDER BY created_at DESC", (code,)).fetchall() + else: + rows = conn.execute( + "SELECT * FROM strategy_evaluations ORDER BY created_at DESC").fetchall() + return [dict(r) for r in rows] + + +# ═══════════════════════════════════════════════════════════ +# 市场快照查询(最新) +# ═══════════════════════════════════════════════════════════ + +def query_latest_market(conn: sqlite3.Connection) -> dict: + """最新一次市场快照(含板块数据)""" + snap = conn.execute( + "SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 1").fetchone() + if not snap: + return {} + snap = dict(snap) + sectors = conn.execute( + "SELECT * FROM sector_snapshots WHERE snapshot_id = ? ORDER BY change_pct DESC", + (snap["id"],)).fetchall() + snap["sectors"] = [dict(r) for r in sectors] + # 计算 top_gainers / top_losers + snap["top_gainers"] = [dict(r) for r in sectors[:5]] + snap["top_losers"] = [dict(r) for r in sectors[-3:]] + return snap