Files
MoFin/docs/DATABASE_ARCHITECTURE.md
T
hmo 6182ff081d docs: 统一数据库架构实施文档
覆盖: 表结构设计(13张表)、数据流架构、核心模块说明、
部署指南、架构决策记录(ADR)、已知限制
2026-06-20 17:54:10 +08:00

12 KiB
Raw Blame History

MoFin 统一数据库架构 — 实施文档

最后更新:2026-06-20 维护人:小小莫(xxm) 状态: 已完成


一、概述

将 MoFin 系统散落在 11 个 JSON 文件中的数据统一纳入 SQLite 数据库(data/mofin.db),实现:

  • 数据关系化:持仓 ↔ 板块 ↔ 趋势,一条 SQL 直连
  • 历史可追溯:板块快照、价格事件、策略变更全部时序存储
  • 操作 no_agent 化:常用查询用脚本完成,不消耗 LLM
  • 增量无损迁移:JSON 双写 → 验证 → 消费者切 SQLite → JSON 保留回退

当前状态

阶段 状态
阶段1:市场快照入库 已完成
阶段2:个股K线入库 已完成
阶段3:板块成分映射 已完成
阶段4:业务表迁移 已完成
阶段5:消费者切 SQLite 已完成

二、数据库设计

2.1 表结构总览(13 张表)

mofin.db
├── market_snapshots        # 每次市场采集的元信息
├── sector_snapshots        # 每个板块在每次采集中的快照
├── stocks                  # 个股基本信息
├── stock_daily             # 日K线
├── stock_weekly            # 周K线
├── stock_monthly           # 月K线
├── stock_fundamentals      # 基本面(PE/PB/EPS/市值)
├── stock_sectors           # 个股→板块映射
├── holdings                # 持仓
├── holding_strategies      # 持仓策略(含版本历史)
├── watchlist_stocks        # 自选股
├── candidates              # 候选池
├── candidate_score_history # 候选评分历史
├── price_events            # 价格触发事件
├── strategy_evaluations    # 策略评估记录
├── portfolio_summary       # 持仓汇总(总资产/现金/仓位)
├── advice_timeline         # 建议时间线
├── accuracy_stats          # 准确率统计
└── strategy_feedback       # 策略反馈

2.2 核心表详述

market_snapshots + sector_snapshots

CREATE TABLE market_snapshots (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp   TEXT    NOT NULL,           -- '2026-06-20 15:30'
    source      TEXT    NOT NULL DEFAULT 'ths',
    up_ratio    REAL,                      -- 上涨板块占比(%)
    mood        TEXT                       -- bullish/neutral/bearish
);

CREATE TABLE sector_snapshots (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    snapshot_id     INTEGER NOT NULL REFERENCES market_snapshots(id),
    name            TEXT    NOT NULL,       -- 板块名称
    change_pct      REAL,                  -- 涨跌幅(%)
    up_count        INTEGER,               -- 上涨家数
    down_count      INTEGER,               -- 下跌家数
    net_inflow      REAL,                  -- 资金净流入(亿)
    lead_stock      TEXT,                  -- 领涨股
    lead_stock_change REAL                 -- 领涨股涨跌幅
);

写入者market_watch.py(每30分钟,JSON + SQLite 双写)

典型查询

-- 半导体板块最近5次涨跌幅趋势
SELECT s.timestamp, ss.change_pct, ss.net_inflow
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE ss.name = '半导体'
ORDER BY s.timestamp DESC LIMIT 5;

-- 资金连续3天净流入的板块
SELECT name, COUNT(*) as times, AVG(net_inflow) as avg_inflow
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE s.timestamp >= date('now', '-3 days') AND net_inflow > 0
GROUP BY name HAVING COUNT(*) >= 3
ORDER BY avg_inflow DESC;

stocks + stock_daily/weekly/monthly + stock_fundamentals

CREATE TABLE stocks (
    code        TEXT PRIMARY KEY,
    name        TEXT NOT NULL,
    exchange    TEXT DEFAULT 'SH',         -- SH/SZ/HK
    type        TEXT DEFAULT 'A'           -- A/H
);

CREATE TABLE stock_daily (
    code TEXT NOT NULL REFERENCES stocks(code),
    date TEXT NOT NULL,
    open REAL, close REAL, high REAL, low REAL,
    volume REAL, amount REAL,
    PRIMARY KEY (code, date)
);
-- stock_weekly, stock_monthly 结构相同

写入者multi_timeframe.py(缓存写入时双写 SQLite

holdings + holding_strategies

CREATE TABLE holdings (
    code            TEXT PRIMARY KEY REFERENCES stocks(code),
    name            TEXT NOT NULL,
    shares          INTEGER NOT NULL,
    cost            REAL,
    position_pct    REAL,
    is_active       INTEGER DEFAULT 1
);

CREATE TABLE holding_strategies (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    code            TEXT NOT NULL REFERENCES holdings(code),
    version         INTEGER DEFAULT 1,
    stop_loss       REAL,
    take_profit     REAL,
    entry_low       REAL,
    entry_high      REAL,
    strategy_type   TEXT DEFAULT 'holding',  -- holding/watch/decision
    source          TEXT,                    -- migrate/reassess/manual
    reason          TEXT,
    created_at      TEXT,
    superseded_at   TEXT
);

数据来源migrate_all.pyportfolio.jsondecisions.json 迁移

price_events

CREATE TABLE price_events (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    code            TEXT NOT NULL REFERENCES stocks(code),
    name            TEXT,
    event_type      TEXT NOT NULL,          -- entry_zone/stop_loss/take_profit
    price           REAL,
    trigger_value   TEXT,
    event_label     TEXT,
    date            TEXT
);

写入者price_monitor.pyJSON + SQLite 双写)


三、数据流架构

3.1 写入路径

market_watch.py ──→ market.json + market_snapshots + sector_snapshots
multi_timeframe.py ──→ multi_tf_cache.json + stocks + stock_daily/weekly/monthly
price_monitor.py ──→ price_events.json + price_events

3.2 读取路径(SQLite 优先,JSON 回退)

server.py ──→ mofin_db.query_*() ──→ (失败) ──→ JSON 文件
strategy_lifecycle.py ──→ mofin_db.query_*() ──→ (失败) ──→ JSON 文件
market_insight.py ──→ mofin_db.query_latest_market() ──→ (失败) ──→ market.json
strategy_feedback.py ──→ mofin_db.query_price_events() ──→ (失败) ──→ price_events.json
system_health_check.py ──→ mofin_db.query_price_events() ──→ (失败) ──→ price_events.json

3.3 回退策略

所有消费者使用 try: SQLite → except: JSON 模式。SQLite 不可用时自动降级,系统不中断。


四、核心模块

4.1 mofin_db.py — 统一数据库访问层

所有脚本通过此模块访问数据库,避免重复建表/连接逻辑。

from mofin_db import get_conn, init_all_tables, query_holdings, ...

conn = get_conn()           # WAL模式 + Row工厂 + 外键约束
init_all_tables(conn)       # 幂等建表
holdings = query_holdings(conn)
conn.close()

写入函数

函数 用途
write_market_snapshot() 市场快照双写
write_klines() K线数据双写
write_price_event() 价格事件双写

查询函数(18个)

函数 返回
query_holdings() 持仓列表(含最新策略)
query_watchlist() 自选股列表
query_strategies(code) 策略版本历史
query_advice_timeline(code) 建议时间线
query_candidates() 候选池
query_candidate_scores(code) 候选评分历史
query_price_events(code) 价格事件
query_price_events_by_date(date) 某天价格事件
query_stock_sectors(code) 个股所属板块
query_sector_stocks(name) 板块成分股
query_accuracy_stats() 准确率统计
query_strategy_feedback(code) 策略反馈
query_strategy_evaluations(code) 策略评估记录
query_latest_market() 最新市场快照
query_sector_trend(name) 板块趋势
query_top_inflow() 资金净流入排行
query_consecutive_inflow() 连续净流入
query_market_mood() 市场情绪趋势
query_db_stats() 数据库概览

4.2 migrate_all.py — 一次性迁移脚本

python3 migrate_all.py

从所有 JSON 文件迁移历史数据到 SQLite。幂等可重跑(INSERT OR REPLACE/INSERT OR IGNORE),JSON 文件不修改。

迁移映射

源 JSON 目标表 迁移量(参考)
stock_profiles.json + 所有源 stocks ~55 只
portfolio.json holdings ~21 只
portfolio.json (analysis) holding_strategies ~21 条
watchlist.json watchlist_stocks ~29 只
decisions.json (含 changelog) holding_strategies ~316 条
candidate_pool.json candidates ~10 只
candidate_pool.json (score_history) candidate_score_history ~21 条
price_events.json price_events ~193 条
evaluation.json strategy_evaluations ~36 条
stock_sector_map.json stock_sectors ~62 条
portfolio.json (顶层) portfolio_summary 1 条
decisions.json (advice_timeline) advice_timeline ~2547 条
accuracy_stats.json accuracy_stats 1 条
strategy_feedback.json strategy_feedback ~37 条

4.3 mofin_query.py — 通用查询工具

python3 mofin_query.py "半导体最近5次采集的涨跌幅"
python3 mofin_query.py "今天资金净流入最多的5个板块"
python3 mofin_query.py "最近3天连续净流入的板块"
python3 mofin_query.py "市场情绪趋势(最近10次)"
python3 mofin_query.py "数据库概览"
python3 mofin_query.py "SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 5"

五、部署指南

5.1 首次部署

cd /home/hmo/web-dashboard
git pull
python3 migrate_all.py          # 一次性迁移历史数据
python3 market_watch.py          # 验证双写(需要 akshare
python3 mofin_query.py "数据库概览"  # 验证数据入库
# 重启 Flask

5.2 日常运维

数据库文件:/home/hmo/web-dashboard/data/mofin.db

# 查看数据库大小
ls -lh data/mofin.db

# 手动查询
sqlite3 data/mofin.db "SELECT COUNT(*) FROM market_snapshots"
sqlite3 data/mofin.db "SELECT COUNT(*) FROM price_events"

# 备份
cp data/mofin.db data/mofin.db.$(date +%Y%m%d).bak

5.3 故障恢复

如果 SQLite 损坏或不可用:

  • 所有消费者自动回退 JSON 文件,系统不中断
  • 删除 data/mofin.db,重新运行 migrate_all.py 重建
  • JSON 文件是数据源,不会丢失

六、架构决策记录

ADR-1:为什么选 SQLite 而不是 PostgreSQL

  • 单机部署,无分布式需求
  • 零运维成本(无需独立进程)
  • Python 标准库 sqlite3,无额外依赖
  • WAL 模式支持并发读写
  • 数据量预估 < 100MBSQLite 完全胜任

ADR-2:为什么双写而不是直接切换

  • 消费者(server.py、strategy_lifecycle.py 等)仍在读 JSON
  • 直接切换风险高,一旦出错影响生产
  • 双写 → 验证数据一致性 → 消费者切 SQLite → JSON 保留回退

ADR-3:为什么 SQLite 优先 + JSON 回退

  • SQLite 查询能力远超 JSON 文件
  • 保留 JSON 回退确保 SQLite 不可用时系统不中断
  • 渐进式迁移,降低风险

七、已知限制

限制 说明 影响
holdings 表缺 price/change_pct 实时价格由 price_monitor 写入 JSONSQLite 未同步 /api/overview 的 top_movers 依赖 change_pct,回退 JSON 时正常
market.json 的 LLM 生成字段 insightspotential_stocksmarket_verdict 等由 LLM cron 生成,SQLite 未存储 这些字段仅 market_screener.py 写入,Dashboard 展示用,不影响核心分析
decisions.json 的复杂嵌套字段 triggerchangelogevaluation 等嵌套结构未完全迁移到 SQLite 部分消费者仍读 JSON(如 server.py 的 decisions API
单文件数据库 SQLite 是单文件,无主从复制 备份需手动或 cron