6182ff081d
覆盖: 表结构设计(13张表)、数据流架构、核心模块说明、 部署指南、架构决策记录(ADR)、已知限制
12 KiB
12 KiB
MoFin 统一数据库架构 — 实施文档
最后更新:2026-06-20 维护人:小小莫(xxm) 状态:✅ 已完成
一、概述
将 MoFin 系统散落在 11 个 JSON 文件中的数据统一纳入 SQLite 数据库(data/mofin.db),实现:
- 数据关系化:持仓 ↔ 板块 ↔ 趋势,一条 SQL 直连
- 历史可追溯:板块快照、价格事件、策略变更全部时序存储
- 操作 no_agent 化:常用查询用脚本完成,不消耗 LLM
- 增量无损迁移:JSON 双写 → 验证 → 消费者切 SQLite → JSON 保留回退
当前状态
| 阶段 | 状态 |
|---|---|
| 阶段1:市场快照入库 | ✅ 已完成 |
| 阶段2:个股K线入库 | ✅ 已完成 |
| 阶段3:板块成分映射 | ✅ 已完成 |
| 阶段4:业务表迁移 | ✅ 已完成 |
| 阶段5:消费者切 SQLite | ✅ 已完成 |
二、数据库设计
2.1 表结构总览(13 张表)
mofin.db
├── market_snapshots # 每次市场采集的元信息
├── sector_snapshots # 每个板块在每次采集中的快照
├── stocks # 个股基本信息
├── stock_daily # 日K线
├── stock_weekly # 周K线
├── stock_monthly # 月K线
├── stock_fundamentals # 基本面(PE/PB/EPS/市值)
├── stock_sectors # 个股→板块映射
├── holdings # 持仓
├── holding_strategies # 持仓策略(含版本历史)
├── watchlist_stocks # 自选股
├── candidates # 候选池
├── candidate_score_history # 候选评分历史
├── price_events # 价格触发事件
├── strategy_evaluations # 策略评估记录
├── portfolio_summary # 持仓汇总(总资产/现金/仓位)
├── advice_timeline # 建议时间线
├── accuracy_stats # 准确率统计
└── strategy_feedback # 策略反馈
2.2 核心表详述
market_snapshots + sector_snapshots
CREATE TABLE market_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL, -- '2026-06-20 15:30'
source TEXT NOT NULL DEFAULT 'ths',
up_ratio REAL, -- 上涨板块占比(%)
mood TEXT -- bullish/neutral/bearish
);
CREATE TABLE sector_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
snapshot_id INTEGER NOT NULL REFERENCES market_snapshots(id),
name TEXT NOT NULL, -- 板块名称
change_pct REAL, -- 涨跌幅(%)
up_count INTEGER, -- 上涨家数
down_count INTEGER, -- 下跌家数
net_inflow REAL, -- 资金净流入(亿)
lead_stock TEXT, -- 领涨股
lead_stock_change REAL -- 领涨股涨跌幅
);
写入者:market_watch.py(每30分钟,JSON + SQLite 双写)
典型查询:
-- 半导体板块最近5次涨跌幅趋势
SELECT s.timestamp, ss.change_pct, ss.net_inflow
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE ss.name = '半导体'
ORDER BY s.timestamp DESC LIMIT 5;
-- 资金连续3天净流入的板块
SELECT name, COUNT(*) as times, AVG(net_inflow) as avg_inflow
FROM sector_snapshots ss
JOIN market_snapshots s ON ss.snapshot_id = s.id
WHERE s.timestamp >= date('now', '-3 days') AND net_inflow > 0
GROUP BY name HAVING COUNT(*) >= 3
ORDER BY avg_inflow DESC;
stocks + stock_daily/weekly/monthly + stock_fundamentals
CREATE TABLE stocks (
code TEXT PRIMARY KEY,
name TEXT NOT NULL,
exchange TEXT DEFAULT 'SH', -- SH/SZ/HK
type TEXT DEFAULT 'A' -- A/H
);
CREATE TABLE stock_daily (
code TEXT NOT NULL REFERENCES stocks(code),
date TEXT NOT NULL,
open REAL, close REAL, high REAL, low REAL,
volume REAL, amount REAL,
PRIMARY KEY (code, date)
);
-- stock_weekly, stock_monthly 结构相同
写入者:multi_timeframe.py(缓存写入时双写 SQLite)
holdings + holding_strategies
CREATE TABLE holdings (
code TEXT PRIMARY KEY REFERENCES stocks(code),
name TEXT NOT NULL,
shares INTEGER NOT NULL,
cost REAL,
position_pct REAL,
is_active INTEGER DEFAULT 1
);
CREATE TABLE holding_strategies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL REFERENCES holdings(code),
version INTEGER DEFAULT 1,
stop_loss REAL,
take_profit REAL,
entry_low REAL,
entry_high REAL,
strategy_type TEXT DEFAULT 'holding', -- holding/watch/decision
source TEXT, -- migrate/reassess/manual
reason TEXT,
created_at TEXT,
superseded_at TEXT
);
数据来源:migrate_all.py 从 portfolio.json、decisions.json 迁移
price_events
CREATE TABLE price_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL REFERENCES stocks(code),
name TEXT,
event_type TEXT NOT NULL, -- entry_zone/stop_loss/take_profit
price REAL,
trigger_value TEXT,
event_label TEXT,
date TEXT
);
写入者:price_monitor.py(JSON + SQLite 双写)
三、数据流架构
3.1 写入路径
market_watch.py ──→ market.json + market_snapshots + sector_snapshots
multi_timeframe.py ──→ multi_tf_cache.json + stocks + stock_daily/weekly/monthly
price_monitor.py ──→ price_events.json + price_events
3.2 读取路径(SQLite 优先,JSON 回退)
server.py ──→ mofin_db.query_*() ──→ (失败) ──→ JSON 文件
strategy_lifecycle.py ──→ mofin_db.query_*() ──→ (失败) ──→ JSON 文件
market_insight.py ──→ mofin_db.query_latest_market() ──→ (失败) ──→ market.json
strategy_feedback.py ──→ mofin_db.query_price_events() ──→ (失败) ──→ price_events.json
system_health_check.py ──→ mofin_db.query_price_events() ──→ (失败) ──→ price_events.json
3.3 回退策略
所有消费者使用 try: SQLite → except: JSON 模式。SQLite 不可用时自动降级,系统不中断。
四、核心模块
4.1 mofin_db.py — 统一数据库访问层
所有脚本通过此模块访问数据库,避免重复建表/连接逻辑。
from mofin_db import get_conn, init_all_tables, query_holdings, ...
conn = get_conn() # WAL模式 + Row工厂 + 外键约束
init_all_tables(conn) # 幂等建表
holdings = query_holdings(conn)
conn.close()
写入函数:
| 函数 | 用途 |
|---|---|
write_market_snapshot() |
市场快照双写 |
write_klines() |
K线数据双写 |
write_price_event() |
价格事件双写 |
查询函数(18个):
| 函数 | 返回 |
|---|---|
query_holdings() |
持仓列表(含最新策略) |
query_watchlist() |
自选股列表 |
query_strategies(code) |
策略版本历史 |
query_advice_timeline(code) |
建议时间线 |
query_candidates() |
候选池 |
query_candidate_scores(code) |
候选评分历史 |
query_price_events(code) |
价格事件 |
query_price_events_by_date(date) |
某天价格事件 |
query_stock_sectors(code) |
个股所属板块 |
query_sector_stocks(name) |
板块成分股 |
query_accuracy_stats() |
准确率统计 |
query_strategy_feedback(code) |
策略反馈 |
query_strategy_evaluations(code) |
策略评估记录 |
query_latest_market() |
最新市场快照 |
query_sector_trend(name) |
板块趋势 |
query_top_inflow() |
资金净流入排行 |
query_consecutive_inflow() |
连续净流入 |
query_market_mood() |
市场情绪趋势 |
query_db_stats() |
数据库概览 |
4.2 migrate_all.py — 一次性迁移脚本
python3 migrate_all.py
从所有 JSON 文件迁移历史数据到 SQLite。幂等可重跑(INSERT OR REPLACE/INSERT OR IGNORE),JSON 文件不修改。
迁移映射:
| 源 JSON | 目标表 | 迁移量(参考) |
|---|---|---|
| stock_profiles.json + 所有源 | stocks | ~55 只 |
| portfolio.json | holdings | ~21 只 |
| portfolio.json (analysis) | holding_strategies | ~21 条 |
| watchlist.json | watchlist_stocks | ~29 只 |
| decisions.json (含 changelog) | holding_strategies | ~316 条 |
| candidate_pool.json | candidates | ~10 只 |
| candidate_pool.json (score_history) | candidate_score_history | ~21 条 |
| price_events.json | price_events | ~193 条 |
| evaluation.json | strategy_evaluations | ~36 条 |
| stock_sector_map.json | stock_sectors | ~62 条 |
| portfolio.json (顶层) | portfolio_summary | 1 条 |
| decisions.json (advice_timeline) | advice_timeline | ~2547 条 |
| accuracy_stats.json | accuracy_stats | 1 条 |
| strategy_feedback.json | strategy_feedback | ~37 条 |
4.3 mofin_query.py — 通用查询工具
python3 mofin_query.py "半导体最近5次采集的涨跌幅"
python3 mofin_query.py "今天资金净流入最多的5个板块"
python3 mofin_query.py "最近3天连续净流入的板块"
python3 mofin_query.py "市场情绪趋势(最近10次)"
python3 mofin_query.py "数据库概览"
python3 mofin_query.py "SELECT * FROM market_snapshots ORDER BY id DESC LIMIT 5"
五、部署指南
5.1 首次部署
cd /home/hmo/web-dashboard
git pull
python3 migrate_all.py # 一次性迁移历史数据
python3 market_watch.py # 验证双写(需要 akshare)
python3 mofin_query.py "数据库概览" # 验证数据入库
# 重启 Flask
5.2 日常运维
数据库文件:/home/hmo/web-dashboard/data/mofin.db
# 查看数据库大小
ls -lh data/mofin.db
# 手动查询
sqlite3 data/mofin.db "SELECT COUNT(*) FROM market_snapshots"
sqlite3 data/mofin.db "SELECT COUNT(*) FROM price_events"
# 备份
cp data/mofin.db data/mofin.db.$(date +%Y%m%d).bak
5.3 故障恢复
如果 SQLite 损坏或不可用:
- 所有消费者自动回退 JSON 文件,系统不中断
- 删除
data/mofin.db,重新运行migrate_all.py重建 - JSON 文件是数据源,不会丢失
六、架构决策记录
ADR-1:为什么选 SQLite 而不是 PostgreSQL
- 单机部署,无分布式需求
- 零运维成本(无需独立进程)
- Python 标准库
sqlite3,无额外依赖 - WAL 模式支持并发读写
- 数据量预估 < 100MB,SQLite 完全胜任
ADR-2:为什么双写而不是直接切换
- 消费者(server.py、strategy_lifecycle.py 等)仍在读 JSON
- 直接切换风险高,一旦出错影响生产
- 双写 → 验证数据一致性 → 消费者切 SQLite → JSON 保留回退
ADR-3:为什么 SQLite 优先 + JSON 回退
- SQLite 查询能力远超 JSON 文件
- 保留 JSON 回退确保 SQLite 不可用时系统不中断
- 渐进式迁移,降低风险
七、已知限制
| 限制 | 说明 | 影响 |
|---|---|---|
holdings 表缺 price/change_pct |
实时价格由 price_monitor 写入 JSON,SQLite 未同步 | /api/overview 的 top_movers 依赖 change_pct,回退 JSON 时正常 |
market.json 的 LLM 生成字段 |
insights、potential_stocks、market_verdict 等由 LLM cron 生成,SQLite 未存储 |
这些字段仅 market_screener.py 写入,Dashboard 展示用,不影响核心分析 |
decisions.json 的复杂嵌套字段 |
trigger、changelog、evaluation 等嵌套结构未完全迁移到 SQLite |
部分消费者仍读 JSON(如 server.py 的 decisions API) |
| 单文件数据库 | SQLite 是单文件,无主从复制 | 备份需手动或 cron |