# MoFin 统一数据库架构 — 实施文档 > 最后更新:2026-06-20 > 维护人:小小莫(xxm) > 状态:✅ 已完成 --- ## 一、概述 将 MoFin 系统散落在 11 个 JSON 文件中的数据统一纳入 SQLite 数据库(`data/mofin.db`),实现: - **数据关系化**:持仓 ↔ 板块 ↔ 趋势,一条 SQL 直连 - **历史可追溯**:板块快照、价格事件、策略变更全部时序存储 - **操作 no_agent 化**:常用查询用脚本完成,不消耗 LLM - **增量无损迁移**:JSON 双写 → 验证 → 消费者切 SQLite → JSON 保留回退 ### 当前状态 | 阶段 | 状态 | |------|------| | 阶段1:市场快照入库 | ✅ 已完成 | | 阶段2:个股K线入库 | ✅ 已完成 | | 阶段3:板块成分映射 | ✅ 已完成 | | 阶段4:业务表迁移 | ✅ 已完成 | | 阶段5:消费者切 SQLite | ✅ 已完成 | --- ## 二、数据库设计 ### 2.1 表结构总览(13 张表) ``` mofin.db ├── market_snapshots # 每次市场采集的元信息 ├── sector_snapshots # 每个板块在每次采集中的快照 ├── stocks # 个股基本信息 ├── stock_daily # 日K线 ├── stock_weekly # 周K线 ├── stock_monthly # 月K线 ├── stock_fundamentals # 基本面(PE/PB/EPS/市值) ├── stock_sectors # 个股→板块映射 ├── holdings # 持仓 ├── holding_strategies # 持仓策略(含版本历史) ├── watchlist_stocks # 自选股 ├── candidates # 候选池 ├── candidate_score_history # 候选评分历史 ├── price_events # 价格触发事件 ├── strategy_evaluations # 策略评估记录 ├── portfolio_summary # 持仓汇总(总资产/现金/仓位) ├── advice_timeline # 建议时间线 ├── accuracy_stats # 准确率统计 └── strategy_feedback # 策略反馈 ``` ### 2.2 核心表详述 #### market_snapshots + sector_snapshots ```sql CREATE TABLE market_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, -- '2026-06-20 15:30' source TEXT NOT NULL DEFAULT 'ths', up_ratio REAL, -- 上涨板块占比(%) mood TEXT -- bullish/neutral/bearish ); CREATE TABLE sector_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, snapshot_id INTEGER NOT NULL REFERENCES market_snapshots(id), name TEXT NOT NULL, -- 板块名称 change_pct REAL, -- 涨跌幅(%) up_count INTEGER, -- 上涨家数 down_count INTEGER, -- 下跌家数 net_inflow REAL, -- 资金净流入(亿) lead_stock TEXT, -- 领涨股 lead_stock_change REAL -- 领涨股涨跌幅 ); ``` **写入者**:`market_watch.py`(每30分钟,JSON + SQLite 双写) **典型查询**: ```sql -- 半导体板块最近5次涨跌幅趋势 SELECT s.timestamp, ss.change_pct, ss.net_inflow FROM sector_snapshots ss JOIN market_snapshots s ON ss.snapshot_id = s.id WHERE ss.name = '半导体' ORDER BY s.timestamp DESC LIMIT 5; -- 资金连续3天净流入的板块 SELECT name, COUNT(*) as times, AVG(net_inflow) as avg_inflow FROM sector_snapshots ss JOIN market_snapshots s ON ss.snapshot_id = s.id WHERE s.timestamp >= date('now', '-3 days') AND net_inflow > 0 GROUP BY name HAVING COUNT(*) >= 3 ORDER BY avg_inflow DESC; ``` #### stocks + stock_daily/weekly/monthly + stock_fundamentals ```sql CREATE TABLE stocks ( code TEXT PRIMARY KEY, name TEXT NOT NULL, exchange TEXT DEFAULT 'SH', -- SH/SZ/HK type TEXT DEFAULT 'A' -- A/H ); CREATE TABLE stock_daily ( code TEXT NOT NULL REFERENCES stocks(code), date TEXT NOT NULL, open REAL, close REAL, high REAL, low REAL, volume REAL, amount REAL, PRIMARY KEY (code, date) ); -- stock_weekly, stock_monthly 结构相同 ``` **写入者**:`multi_timeframe.py`(缓存写入时双写 SQLite) #### holdings + holding_strategies ```sql CREATE TABLE holdings ( code TEXT PRIMARY KEY REFERENCES stocks(code), name TEXT NOT NULL, shares INTEGER NOT NULL, cost REAL, position_pct REAL, is_active INTEGER DEFAULT 1 ); CREATE TABLE holding_strategies ( id INTEGER PRIMARY KEY AUTOINCREMENT, code TEXT NOT NULL REFERENCES holdings(code), version INTEGER DEFAULT 1, stop_loss REAL, take_profit REAL, entry_low REAL, entry_high REAL, strategy_type TEXT DEFAULT 'holding', -- holding/watch/decision source TEXT, -- migrate/reassess/manual reason TEXT, created_at TEXT, superseded_at TEXT ); ``` **数据来源**:`migrate_all.py` 从 `portfolio.json`、`decisions.json` 迁移 #### price_events ```sql CREATE TABLE price_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, code TEXT NOT NULL REFERENCES stocks(code), name TEXT, event_type TEXT NOT NULL, -- entry_zone/stop_loss/take_profit price REAL, trigger_value TEXT, event_label TEXT, date TEXT ); ``` **写入者**:`price_monitor.py`(JSON + SQLite 双写) --- ## 三、数据流架构 ### 3.1 写入路径 ``` market_watch.py ──→ market.json + market_snapshots + sector_snapshots multi_timeframe.py ──→ multi_tf_cache.json + stocks + stock_daily/weekly/monthly price_monitor.py ──→ price_events.json + price_events ``` ### 3.2 读取路径(SQLite 优先,JSON 回退) ``` server.py ──→ mofin_db.query_*() ──→ (失败) ──→ JSON 文件 strategy_lifecycle.py ──→ mofin_db.query_*() ──→ (失败) ──→ JSON 文件 market_insight.py ──→ mofin_db.query_latest_market() ──→ (失败) ──→ market.json strategy_feedback.py ──→ mofin_db.query_price_events() ──→ (失败) ──→ price_events.json system_health_check.py ──→ mofin_db.query_price_events() ──→ (失败) ──→ price_events.json ``` ### 3.3 回退策略 所有消费者使用 `try: SQLite → except: JSON` 模式。SQLite 不可用时自动降级,系统不中断。 --- ## 四、核心模块 ### 4.1 mofin_db.py — 统一数据库访问层 所有脚本通过此模块访问数据库,避免重复建表/连接逻辑。 ```python from mofin_db import get_conn, init_all_tables, query_holdings, ... conn = get_conn() # WAL模式 + Row工厂 + 外键约束 init_all_tables(conn) # 幂等建表 holdings = query_holdings(conn) conn.close() ``` **写入函数**: | 函数 | 用途 | |------|------| | `write_market_snapshot()` | 市场快照双写 | | `write_klines()` | K线数据双写 | | `write_price_event()` | 价格事件双写 | **查询函数(18个)**: | 函数 | 返回 | |------|------| | `query_holdings()` | 持仓列表(含最新策略) | | `query_watchlist()` | 自选股列表 | | `query_strategies(code)` | 策略版本历史 | | `query_advice_timeline(code)` | 建议时间线 | | `query_candidates()` | 候选池 | | `query_candidate_scores(code)` | 候选评分历史 | | `query_price_events(code)` | 价格事件 | | `query_price_events_by_date(date)` | 某天价格事件 | | `query_stock_sectors(code)` | 个股所属板块 | | `query_sector_stocks(name)` | 板块成分股 | | `query_accuracy_stats()` | 准确率统计 | | `query_strategy_feedback(code)` | 策略反馈 | | `query_strategy_evaluations(code)` | 策略评估记录 | | `query_latest_market()` | 最新市场快照 | | `query_sector_trend(name)` | 板块趋势 | | `query_top_inflow()` | 资金净流入排行 | | `query_consecutive_inflow()` | 连续净流入 | | `query_market_mood()` | 市场情绪趋势 | | `query_db_stats()` | 数据库概览 | ### 4.2 migrate_all.py — 一次性迁移脚本 ```bash python3 migrate_all.py ``` 从所有 JSON 文件迁移历史数据到 SQLite。幂等可重跑(`INSERT OR REPLACE`/`INSERT OR IGNORE`),JSON 文件不修改。 **迁移映射**: | 源 JSON | 目标表 | 迁移量(参考) | |----------|--------|--------------| | stock_profiles.json + 所有源 | stocks | ~55 只 | | portfolio.json | holdings | ~21 只 | | portfolio.json (analysis) | holding_strategies | ~21 条 | | watchlist.json | watchlist_stocks | ~29 只 | | decisions.json (含 changelog) | holding_strategies | ~316 条 | | candidate_pool.json | candidates | ~10 只 | | candidate_pool.json (score_history) | candidate_score_history | ~21 条 | | price_events.json | price_events | ~193 条 | | evaluation.json | strategy_evaluations | ~36 条 | | stock_sector_map.json | stock_sectors | ~62 条 | | portfolio.json (顶层) | portfolio_summary | 1 条 | | decisions.json (advice_timeline) | advice_timeline | ~2547 条 | | accuracy_stats.json | accuracy_stats | 1 条 | | strategy_feedback.json | strategy_feedback | ~37 条 | ### 4.3 mofin_query.py — 通用查询工具 ```bash python3 mofin_query.py "半导体最近5次采集的涨跌幅" python3 mofin_query.py "今天资金净流入最多的5个板块" python3 mofin_query.py "最近3天连续净流入的板块" python3 mofin_query.py "市场情绪趋势(最近10次)" python3 mofin_query.py "数据库概览" python3 mofin_query.py "SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 5" ``` --- ## 五、部署指南 ### 5.1 首次部署 ```bash cd /home/hmo/web-dashboard git pull python3 migrate_all.py # 一次性迁移历史数据 python3 market_watch.py # 验证双写(需要 akshare) python3 mofin_query.py "数据库概览" # 验证数据入库 # 重启 Flask ``` ### 5.2 日常运维 数据库文件:`/home/hmo/web-dashboard/data/mofin.db` ```bash # 查看数据库大小 ls -lh data/mofin.db # 手动查询 sqlite3 data/mofin.db "SELECT COUNT(*) FROM market_snapshots" sqlite3 data/mofin.db "SELECT COUNT(*) FROM price_events" # 备份 cp data/mofin.db data/mofin.db.$(date +%Y%m%d).bak ``` ### 5.3 故障恢复 如果 SQLite 损坏或不可用: - 所有消费者自动回退 JSON 文件,系统不中断 - 删除 `data/mofin.db`,重新运行 `migrate_all.py` 重建 - JSON 文件是数据源,不会丢失 --- ## 六、架构决策记录 ### ADR-1:为什么选 SQLite 而不是 PostgreSQL - 单机部署,无分布式需求 - 零运维成本(无需独立进程) - Python 标准库 `sqlite3`,无额外依赖 - WAL 模式支持并发读写 - 数据量预估 < 100MB,SQLite 完全胜任 ### ADR-2:为什么双写而不是直接切换 - 消费者(server.py、strategy_lifecycle.py 等)仍在读 JSON - 直接切换风险高,一旦出错影响生产 - 双写 → 验证数据一致性 → 消费者切 SQLite → JSON 保留回退 ### ADR-3:为什么 SQLite 优先 + JSON 回退 - SQLite 查询能力远超 JSON 文件 - 保留 JSON 回退确保 SQLite 不可用时系统不中断 - 渐进式迁移,降低风险 --- ## 七、已知限制 | 限制 | 说明 | 影响 | |------|------|------| | `holdings` 表缺 `price`/`change_pct` | 实时价格由 price_monitor 写入 JSON,SQLite 未同步 | `/api/overview` 的 top_movers 依赖 change_pct,回退 JSON 时正常 | | `market.json` 的 LLM 生成字段 | `insights`、`potential_stocks`、`market_verdict` 等由 LLM cron 生成,SQLite 未存储 | 这些字段仅 market_screener.py 写入,Dashboard 展示用,不影响核心分析 | | `decisions.json` 的复杂嵌套字段 | `trigger`、`changelog`、`evaluation` 等嵌套结构未完全迁移到 SQLite | 部分消费者仍读 JSON(如 server.py 的 decisions API) | | 单文件数据库 | SQLite 是单文件,无主从复制 | 备份需手动或 cron |