docs: 统一数据库架构实施文档

覆盖: 表结构设计(13张表)、数据流架构、核心模块说明、
部署指南、架构决策记录(ADR)、已知限制
This commit is contained in:
hmo
2026-06-20 17:54:10 +08:00
parent 25f8c6ec67
commit 6182ff081d
+354
View File
@@ -0,0 +1,354 @@
# MoFin 统一数据库架构 — 实施文档
> 最后更新:2026-06-20
> 维护人:小小莫(xxm)
> 状态:✅ 已完成
---
## 一、概述
将 MoFin 系统散落在 11 个 JSON 文件中的数据统一纳入 SQLite 数据库(`data/mofin.db`),实现:
- **数据关系化**:持仓 ↔ 板块 ↔ 趋势,一条 SQL 直连
- **历史可追溯**:板块快照、价格事件、策略变更全部时序存储
- **操作 no_agent 化**:常用查询用脚本完成,不消耗 LLM
- **增量无损迁移**:JSON 双写 → 验证 → 消费者切 SQLite → JSON 保留回退
### 当前状态
| 阶段 | 状态 |
|------|------|
| 阶段1:市场快照入库 | ✅ 已完成 |
| 阶段2:个股K线入库 | ✅ 已完成 |
| 阶段3:板块成分映射 | ✅ 已完成 |
| 阶段4:业务表迁移 | ✅ 已完成 |
| 阶段5:消费者切 SQLite | ✅ 已完成 |
---
## 二、数据库设计
### 2.1 表结构总览(13 张表)
```
mofin.db
├── market_snapshots # 每次市场采集的元信息
├── sector_snapshots # 每个板块在每次采集中的快照
├── stocks # 个股基本信息
├── stock_daily # 日K线
├── stock_weekly # 周K线
├── stock_monthly # 月K线
├── stock_fundamentals # 基本面(PE/PB/EPS/市值)
├── stock_sectors # 个股→板块映射
├── holdings # 持仓
├── holding_strategies # 持仓策略(含版本历史)
├── watchlist_stocks # 自选股
├── candidates # 候选池
├── candidate_score_history # 候选评分历史
├── price_events # 价格触发事件
├── strategy_evaluations # 策略评估记录
├── portfolio_summary # 持仓汇总(总资产/现金/仓位)
├── advice_timeline # 建议时间线
├── accuracy_stats # 准确率统计
└── strategy_feedback # 策略反馈
```
### 2.2 核心表详述
#### market_snapshots + sector_snapshots
```sql
CREATE TABLE market_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL, -- '2026-06-20 15:30'
source TEXT NOT NULL DEFAULT 'ths',
up_ratio REAL, -- 上涨板块占比(%)
mood TEXT -- bullish/neutral/bearish
);
CREATE TABLE sector_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
snapshot_id INTEGER NOT NULL REFERENCES market_snapshots(id),
name TEXT NOT NULL, -- 板块名称
change_pct REAL, -- 涨跌幅(%)
up_count INTEGER, -- 上涨家数
down_count INTEGER, -- 下跌家数
net_inflow REAL, -- 资金净流入(亿)
lead_stock TEXT, -- 领涨股
lead_stock_change REAL -- 领涨股涨跌幅
);
```
**写入者**`market_watch.py`(每30分钟,JSON + SQLite 双写)
**典型查询**
```sql
-- 半导体板块最近5次涨跌幅趋势
SELECT s.timestamp, ss.change_pct, ss.net_inflow
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE ss.name = '半导体'
ORDER BY s.timestamp DESC LIMIT 5;
-- 资金连续3天净流入的板块
SELECT name, COUNT(*) as times, AVG(net_inflow) as avg_inflow
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE s.timestamp >= date('now', '-3 days') AND net_inflow > 0
GROUP BY name HAVING COUNT(*) >= 3
ORDER BY avg_inflow DESC;
```
#### stocks + stock_daily/weekly/monthly + stock_fundamentals
```sql
CREATE TABLE stocks (
code TEXT PRIMARY KEY,
name TEXT NOT NULL,
exchange TEXT DEFAULT 'SH', -- SH/SZ/HK
type TEXT DEFAULT 'A' -- A/H
);
CREATE TABLE stock_daily (
code TEXT NOT NULL REFERENCES stocks(code),
date TEXT NOT NULL,
open REAL, close REAL, high REAL, low REAL,
volume REAL, amount REAL,
PRIMARY KEY (code, date)
);
-- stock_weekly, stock_monthly 结构相同
```
**写入者**`multi_timeframe.py`(缓存写入时双写 SQLite
#### holdings + holding_strategies
```sql
CREATE TABLE holdings (
code TEXT PRIMARY KEY REFERENCES stocks(code),
name TEXT NOT NULL,
shares INTEGER NOT NULL,
cost REAL,
position_pct REAL,
is_active INTEGER DEFAULT 1
);
CREATE TABLE holding_strategies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL REFERENCES holdings(code),
version INTEGER DEFAULT 1,
stop_loss REAL,
take_profit REAL,
entry_low REAL,
entry_high REAL,
strategy_type TEXT DEFAULT 'holding', -- holding/watch/decision
source TEXT, -- migrate/reassess/manual
reason TEXT,
created_at TEXT,
superseded_at TEXT
);
```
**数据来源**`migrate_all.py``portfolio.json``decisions.json` 迁移
#### price_events
```sql
CREATE TABLE price_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL REFERENCES stocks(code),
name TEXT,
event_type TEXT NOT NULL, -- entry_zone/stop_loss/take_profit
price REAL,
trigger_value TEXT,
event_label TEXT,
date TEXT
);
```
**写入者**`price_monitor.py`JSON + SQLite 双写)
---
## 三、数据流架构
### 3.1 写入路径
```
market_watch.py ──→ market.json + market_snapshots + sector_snapshots
multi_timeframe.py ──→ multi_tf_cache.json + stocks + stock_daily/weekly/monthly
price_monitor.py ──→ price_events.json + price_events
```
### 3.2 读取路径(SQLite 优先,JSON 回退)
```
server.py ──→ mofin_db.query_*() ──→ (失败) ──→ JSON 文件
strategy_lifecycle.py ──→ mofin_db.query_*() ──→ (失败) ──→ JSON 文件
market_insight.py ──→ mofin_db.query_latest_market() ──→ (失败) ──→ market.json
strategy_feedback.py ──→ mofin_db.query_price_events() ──→ (失败) ──→ price_events.json
system_health_check.py ──→ mofin_db.query_price_events() ──→ (失败) ──→ price_events.json
```
### 3.3 回退策略
所有消费者使用 `try: SQLite → except: JSON` 模式。SQLite 不可用时自动降级,系统不中断。
---
## 四、核心模块
### 4.1 mofin_db.py — 统一数据库访问层
所有脚本通过此模块访问数据库,避免重复建表/连接逻辑。
```python
from mofin_db import get_conn, init_all_tables, query_holdings, ...
conn = get_conn() # WAL模式 + Row工厂 + 外键约束
init_all_tables(conn) # 幂等建表
holdings = query_holdings(conn)
conn.close()
```
**写入函数**
| 函数 | 用途 |
|------|------|
| `write_market_snapshot()` | 市场快照双写 |
| `write_klines()` | K线数据双写 |
| `write_price_event()` | 价格事件双写 |
**查询函数(18个)**
| 函数 | 返回 |
|------|------|
| `query_holdings()` | 持仓列表(含最新策略) |
| `query_watchlist()` | 自选股列表 |
| `query_strategies(code)` | 策略版本历史 |
| `query_advice_timeline(code)` | 建议时间线 |
| `query_candidates()` | 候选池 |
| `query_candidate_scores(code)` | 候选评分历史 |
| `query_price_events(code)` | 价格事件 |
| `query_price_events_by_date(date)` | 某天价格事件 |
| `query_stock_sectors(code)` | 个股所属板块 |
| `query_sector_stocks(name)` | 板块成分股 |
| `query_accuracy_stats()` | 准确率统计 |
| `query_strategy_feedback(code)` | 策略反馈 |
| `query_strategy_evaluations(code)` | 策略评估记录 |
| `query_latest_market()` | 最新市场快照 |
| `query_sector_trend(name)` | 板块趋势 |
| `query_top_inflow()` | 资金净流入排行 |
| `query_consecutive_inflow()` | 连续净流入 |
| `query_market_mood()` | 市场情绪趋势 |
| `query_db_stats()` | 数据库概览 |
### 4.2 migrate_all.py — 一次性迁移脚本
```bash
python3 migrate_all.py
```
从所有 JSON 文件迁移历史数据到 SQLite。幂等可重跑(`INSERT OR REPLACE`/`INSERT OR IGNORE`),JSON 文件不修改。
**迁移映射**
| 源 JSON | 目标表 | 迁移量(参考) |
|----------|--------|--------------|
| stock_profiles.json + 所有源 | stocks | ~55 只 |
| portfolio.json | holdings | ~21 只 |
| portfolio.json (analysis) | holding_strategies | ~21 条 |
| watchlist.json | watchlist_stocks | ~29 只 |
| decisions.json (含 changelog) | holding_strategies | ~316 条 |
| candidate_pool.json | candidates | ~10 只 |
| candidate_pool.json (score_history) | candidate_score_history | ~21 条 |
| price_events.json | price_events | ~193 条 |
| evaluation.json | strategy_evaluations | ~36 条 |
| stock_sector_map.json | stock_sectors | ~62 条 |
| portfolio.json (顶层) | portfolio_summary | 1 条 |
| decisions.json (advice_timeline) | advice_timeline | ~2547 条 |
| accuracy_stats.json | accuracy_stats | 1 条 |
| strategy_feedback.json | strategy_feedback | ~37 条 |
### 4.3 mofin_query.py — 通用查询工具
```bash
python3 mofin_query.py "半导体最近5次采集的涨跌幅"
python3 mofin_query.py "今天资金净流入最多的5个板块"
python3 mofin_query.py "最近3天连续净流入的板块"
python3 mofin_query.py "市场情绪趋势(最近10次)"
python3 mofin_query.py "数据库概览"
python3 mofin_query.py "SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 5"
```
---
## 五、部署指南
### 5.1 首次部署
```bash
cd /home/hmo/web-dashboard
git pull
python3 migrate_all.py # 一次性迁移历史数据
python3 market_watch.py # 验证双写(需要 akshare
python3 mofin_query.py "数据库概览" # 验证数据入库
# 重启 Flask
```
### 5.2 日常运维
数据库文件:`/home/hmo/web-dashboard/data/mofin.db`
```bash
# 查看数据库大小
ls -lh data/mofin.db
# 手动查询
sqlite3 data/mofin.db "SELECT COUNT(*) FROM market_snapshots"
sqlite3 data/mofin.db "SELECT COUNT(*) FROM price_events"
# 备份
cp data/mofin.db data/mofin.db.$(date +%Y%m%d).bak
```
### 5.3 故障恢复
如果 SQLite 损坏或不可用:
- 所有消费者自动回退 JSON 文件,系统不中断
- 删除 `data/mofin.db`,重新运行 `migrate_all.py` 重建
- JSON 文件是数据源,不会丢失
---
## 六、架构决策记录
### ADR-1:为什么选 SQLite 而不是 PostgreSQL
- 单机部署,无分布式需求
- 零运维成本(无需独立进程)
- Python 标准库 `sqlite3`,无额外依赖
- WAL 模式支持并发读写
- 数据量预估 < 100MBSQLite 完全胜任
### ADR-2:为什么双写而不是直接切换
- 消费者(server.py、strategy_lifecycle.py 等)仍在读 JSON
- 直接切换风险高,一旦出错影响生产
- 双写 → 验证数据一致性 → 消费者切 SQLite → JSON 保留回退
### ADR-3:为什么 SQLite 优先 + JSON 回退
- SQLite 查询能力远超 JSON 文件
- 保留 JSON 回退确保 SQLite 不可用时系统不中断
- 渐进式迁移,降低风险
---
## 七、已知限制
| 限制 | 说明 | 影响 |
|------|------|------|
| `holdings` 表缺 `price`/`change_pct` | 实时价格由 price_monitor 写入 JSONSQLite 未同步 | `/api/overview` 的 top_movers 依赖 change_pct,回退 JSON 时正常 |
| `market.json` 的 LLM 生成字段 | `insights``potential_stocks``market_verdict` 等由 LLM cron 生成,SQLite 未存储 | 这些字段仅 market_screener.py 写入,Dashboard 展示用,不影响核心分析 |
| `decisions.json` 的复杂嵌套字段 | `trigger``changelog``evaluation` 等嵌套结构未完全迁移到 SQLite | 部分消费者仍读 JSON(如 server.py 的 decisions API |
| 单文件数据库 | SQLite 是单文件,无主从复制 | 备份需手动或 cron |