feat: migrate_all.py — 完整数据迁移脚本 (JSON → SQLite)

一次性迁移全部生产数据到 mofin.db:
- stock_profiles.json → stocks (55只)
- portfolio.json → holdings (21只) + holding_strategies (21条)
- watchlist.json → watchlist_stocks (1只) + holding_strategies (1条)
- decisions.json → holding_strategies (316条, 含changelog历史)
- candidate_pool.json → candidates (10只) + candidate_score_history (21条)
- price_events.json → price_events (193条)
- evaluation.json → strategy_evaluations (36条)
- stock_sector_map.json → stock_sectors (62条)

特性:
- 自动从所有JSON源收集股票代码 (collect_all_stocks)
- 代码格式归一化 (_normalize_code: 整数→补零字符串)
- 迁移期间关闭外键约束 (兼容旧数据格式不一致)
- 幂等可重跑 (INSERT OR REPLACE/IGNORE)
- JSON文件不修改,可安全重复执行

替换旧的 migrate_sectors.py (功能已合并)
This commit is contained in:
hmo
2026-06-20 16:40:39 +08:00
parent 0924cf3124
commit 0650673038
2 changed files with 483 additions and 22 deletions
+483
View File
@@ -0,0 +1,483 @@
#!/usr/bin/env python3
"""migrate_all.py — 一次性将全部生产 JSON 数据迁移到 mofin.db
运行方式:
python3 migrate_all.py
迁移映射:
portfolio.json → holdings + holding_strategies (from analysis)
watchlist.json → watchlist_stocks + holding_strategies (strategy_type='watch')
candidate_pool.json → candidates + candidate_score_history
decisions.json → holding_strategies (changelog history)
price_events.json → price_events
stock_sector_map.json → stock_sectors
evaluation.json → strategy_evaluations
stock_profiles.json → stocks (name, exchange, type)
"""
import json
import sys
from datetime import datetime
from pathlib import Path
from mofin_db import get_conn, init_all_tables
DATA_DIR = Path(__file__).parent / "data"


def load_json(name: str):
    """Read and decode one JSON file located in DATA_DIR.

    Args:
        name: File name relative to DATA_DIR.

    Returns:
        The decoded JSON value, or None when the file does not exist or
        cannot be read/parsed. A one-line notice is printed in both of the
        None cases.
    """
    source = DATA_DIR / name
    if not source.exists():
        print(f" [SKIP] {name} not found")
        return None
    try:
        return json.loads(source.read_text(encoding="utf-8"))
    except Exception as err:
        # Best-effort loader: report the problem and let the caller skip.
        print(f" [FAIL] {name}: {err}")
        return None
def _normalize_code(code) -> str:
"""统一代码格式:整数→补零字符串"""
s = str(code).strip()
if s.isdigit():
if len(s) == 5:
return s
if len(s) < 5:
return s.zfill(5 if len(s) <= 4 else 6)
return s.zfill(6)
return s
def _guess_exchange(code: str) -> str:
if len(code) == 5 and code.isdigit():
return "HK"
if code.startswith(("6", "5", "9")):
return "SH"
return "SZ"
def collect_all_stocks() -> dict:
    """Collect a code -> stock-info mapping from every JSON source.

    Sources are scanned in a fixed order and the first name seen for a
    code wins, so stock_profiles.json takes precedence over the others.
    Files that are missing, unreadable or not shaped as a JSON object are
    skipped, as are records that are not objects or carry no code.

    Returns:
        dict keyed by normalized code; each value is a dict with the keys
        "name", "exchange" and "type".
    """
    # (file name, top-level key holding the record list), in priority order.
    sources = (
        ("stock_profiles.json", "profiles"),
        ("portfolio.json", "holdings"),
        ("watchlist.json", "stocks"),
        ("decisions.json", "decisions"),
        ("candidate_pool.json", "candidates"),
        ("price_events.json", "events"),
    )
    all_stocks = {}

    def add(code, name):
        if code is None:
            # A record without a code must not become a stock keyed "None".
            return
        code = _normalize_code(code)
        if not code or code in all_stocks:
            return
        all_stocks[code] = {
            "name": str(name) if name else code,
            "exchange": _guess_exchange(code),
            "type": "H" if len(code) == 5 else "A",
        }

    for filename, list_key in sources:
        payload = load_json(filename)
        if not payload:
            continue
        if not isinstance(payload, dict):
            print(f" [SKIP] {filename}: unexpected top-level structure")
            continue
        for record in payload.get(list_key) or []:
            if isinstance(record, dict):
                add(record.get("code"), record.get("name"))
    return all_stocks
def migrate_all_stocks(conn, all_stocks: dict) -> int:
    """Upsert every collected stock into the stocks table.

    Each entry of all_stocks is written with INSERT OR REPLACE and stamped
    with the current local time. A row that fails is reported and skipped.
    Commits once at the end.

    Returns:
        Number of rows written successfully.
    """
    upsert_sql = (
        "INSERT OR REPLACE INTO stocks (code, name, exchange, type, updated_at) VALUES (?, ?, ?, ?, ?)"
    )
    written = 0
    for code in all_stocks:
        info = all_stocks[code]
        try:
            row = (code, info["name"], info["exchange"], info["type"],
                   datetime.now().isoformat())
            conn.execute(upsert_sql, row)
        except Exception as e:
            print(f" stocks error {code}: {e}")
        else:
            written += 1
    conn.commit()
    return written
def migrate_holdings(conn, portfolio_data: dict) -> tuple[int, int]:
    """Migrate portfolio.json into holdings and holding_strategies.

    Each holding with a code and a name is upserted into holdings
    (is_active = 1). When the holding has a non-empty "analysis" object, a
    version-1 strategy row (strategy_type 'holding', source 'migrate') is
    also inserted. A malformed record is reported and skipped instead of
    aborting the whole run. Commits once at the end.

    Args:
        conn: Database connection.
        portfolio_data: Decoded portfolio.json.

    Returns:
        (holdings written, strategy rows written).
    """
    h_count, s_count = 0, 0
    for h in portfolio_data.get("holdings", []):
        code = _normalize_code(h.get("code", ""))
        name = h.get("name", "")
        if not code or not name:
            continue

        # holdings table
        try:
            # A null share count is treated as 0; an unparsable one is
            # reported like any other per-row failure.
            shares = int(h.get("shares") or 0)
            conn.execute(
                "INSERT OR REPLACE INTO holdings (code, name, shares, cost, position_pct, is_active) "
                "VALUES (?, ?, ?, ?, ?, 1)",
                (code, name, shares, h.get("cost", 0), h.get("position_pct", 0)))
            h_count += 1
        except Exception as e:
            print(f" holdings error {code}: {e}")
            continue

        # holding_strategies (from the analysis field)
        analysis = h.get("analysis")
        if not isinstance(analysis, dict) or not analysis:
            continue
        try:
            action = analysis.get("action")
            reason = action[:200] if action else None
            created_at = analysis.get("reassessed_at") or datetime.now().isoformat()
            # NOTE(review): plain INSERT; whether a re-run duplicates this
            # row or is rejected depends on table constraints not visible here.
            conn.execute(
                "INSERT INTO holding_strategies (code, version, stop_loss, take_profit, "
                "entry_low, entry_high, strategy_type, source, reason, created_at) "
                "VALUES (?, 1, ?, ?, ?, ?, 'holding', 'migrate', ?, ?)",
                (code, analysis.get("stop_loss"), analysis.get("take_profit"),
                 analysis.get("entry_low"), analysis.get("entry_high"),
                 reason, created_at))
            s_count += 1
        except Exception as e:
            print(f" strategy error {code}: {e}")
    conn.commit()
    return h_count, s_count
def migrate_watchlist(conn, watchlist_data: dict) -> tuple[int, int]:
    """Migrate watchlist.json into watchlist_stocks and holding_strategies.

    Each stock with a code and a name is upserted into watchlist_stocks
    (is_active = 1). When the stock has a non-empty "strategy" object, a
    version-1 strategy row (strategy_type 'watch', source 'migrate') is
    inserted: entry_low/entry_high come from the first two items of
    "buy_zone", and take_profit is the first item when it is a list.
    Commits once at the end.

    Args:
        conn: Database connection.
        watchlist_data: Decoded watchlist.json.

    Returns:
        (watchlist rows written, strategy rows written).
    """
    w_count, s_count = 0, 0
    for s in watchlist_data.get("stocks", []):
        code = _normalize_code(s.get("code", ""))
        name = s.get("name", "")
        if not code or not name:
            continue

        # watchlist_stocks
        try:
            conn.execute(
                "INSERT OR REPLACE INTO watchlist_stocks (code, name, added_at, is_active) "
                "VALUES (?, ?, ?, 1)",
                (code, name, s.get("added_at", datetime.now().isoformat())))
            w_count += 1
        except Exception as e:
            print(f" watchlist error {code}: {e}")
            continue

        # holding_strategies (from the strategy field)
        strategy = s.get("strategy")
        if not isinstance(strategy, dict) or not strategy:
            continue

        buy_zone = strategy.get("buy_zone")
        if not isinstance(buy_zone, list):
            # Missing, null or scalar buy zone: no entry range available.
            buy_zone = []
        el = buy_zone[0] if len(buy_zone) > 0 else None
        eh = buy_zone[1] if len(buy_zone) > 1 else None

        tp = strategy.get("take_profit")
        if isinstance(tp, list):
            # A list cannot be bound as a SQL parameter; an empty or
            # missing take-profit must become NULL, not [].
            tp = tp[0] if tp else None

        try:
            conn.execute(
                "INSERT INTO holding_strategies (code, version, stop_loss, take_profit, "
                "entry_low, entry_high, strategy_type, source, created_at) "
                "VALUES (?, 1, ?, ?, ?, ?, 'watch', 'migrate', ?)",
                (code, strategy.get("stop_loss"), tp, el, eh,
                 strategy.get("updated_at", datetime.now().isoformat())))
            s_count += 1
        except Exception as e:
            print(f" watch strategy error {code}: {e}")
    conn.commit()
    return w_count, s_count
def migrate_decisions(conn, decisions_data: dict) -> int:
    """Migrate decisions.json into holding_strategies, including changelog.

    For every decision with a code, the current strategy is inserted as
    version 1 (strategy_type 'decision', source 'migrate'). Each changelog
    entry then becomes a further row with versions 2, 3, ... whose
    stop-loss, take-profit and entry range are extracted from the entry's
    "new_action" text. Rows that fail are reported and skipped. Commits
    once at the end.

    Args:
        conn: Database connection.
        decisions_data: Decoded decisions.json.

    Returns:
        Total number of rows inserted (decisions plus changelog entries).
    """
    import re  # kept function-local: the module header does not import re

    # Markers inside new_action text: 损<stop> 盈<target> 买<low>~<high>.
    stop_loss_re = re.compile(r'损(\d+\.?\d*)')
    take_profit_re = re.compile(r'盈(\d+\.?\d*)')
    entry_re = re.compile(r'买(\d+\.?\d*)~(\d+\.?\d*)')

    count = 0
    for d in decisions_data.get("decisions", []):
        code = _normalize_code(d.get("code", ""))
        if not code:
            continue

        # Latest strategy
        try:
            action = d.get("action")
            conn.execute(
                "INSERT INTO holding_strategies (code, version, stop_loss, take_profit, "
                "entry_low, entry_high, strategy_type, source, reason, created_at) "
                "VALUES (?, 1, ?, ?, ?, ?, 'decision', 'migrate', ?, ?)",
                (code, d.get("stop_loss"), d.get("take_profit"),
                 d.get("entry_low"), d.get("entry_high"),
                 action[:200] if action else None,
                 d.get("timestamp", datetime.now().isoformat())))
            count += 1
        except Exception as e:
            print(f" decision error {code}: {e}")
            continue

        # Historical versions from the changelog
        for version, cl in enumerate(d.get("changelog") or [], start=2):
            try:
                new_action = cl.get("new_action") or ""
                sl_match = stop_loss_re.search(new_action)
                tp_match = take_profit_re.search(new_action)
                entry_match = entry_re.search(new_action)
                conn.execute(
                    "INSERT INTO holding_strategies (code, version, stop_loss, take_profit, "
                    "entry_low, entry_high, strategy_type, source, reason, created_at) "
                    "VALUES (?, ?, ?, ?, ?, ?, 'decision', 'migrate', ?, ?)",
                    (code, version,
                     float(sl_match.group(1)) if sl_match else None,
                     float(tp_match.group(1)) if tp_match else None,
                     float(entry_match.group(1)) if entry_match else None,
                     float(entry_match.group(2)) if entry_match else None,
                     (cl.get("reason") or "")[:200],
                     cl.get("date", datetime.now().isoformat())))
                count += 1
            except Exception as e:
                # Still skipped, but no longer silently.
                print(f" changelog error {code} v{version}: {e}")
    conn.commit()
    return count
def migrate_candidates(conn, candidate_data: dict) -> tuple[int, int]:
    """Migrate candidate_pool.json into candidates and candidate_score_history.

    Each candidate with a code and a name is upserted into candidates; the
    zhiwei_reviewed / promoted / dropped flags are stored as 0 or 1. Every
    entry of the candidate's "score_history" is then inserted into
    candidate_score_history with source 'xiaoguo'. Rows that fail are
    reported and skipped. Commits once at the end.

    Args:
        conn: Database connection.
        candidate_data: Decoded candidate_pool.json.

    Returns:
        (candidates written, score-history rows written).
    """
    c_count, s_count = 0, 0
    for c in candidate_data.get("candidates", []):
        code = _normalize_code(c.get("code", ""))
        name = c.get("name", "")
        if not code or not name:
            continue

        # candidates table
        try:
            conn.execute(
                "INSERT OR REPLACE INTO candidates (code, name, sector, reason, entry_range, "
                "stop_loss, target, zhiwei_star, zhiwei_reviewed, zhiwei_reviewed_at, "
                "promoted, promoted_at, dropped, drop_reason, created_at) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (code, name, c.get("sector"), c.get("reason"), c.get("entry_range"),
                 c.get("stop_loss"), c.get("target"), c.get("zhiwei_star"),
                 1 if c.get("zhiwei_reviewed") else 0, c.get("zhiwei_reviewed_at"),
                 1 if c.get("promoted") else 0, c.get("promoted_at"),
                 1 if c.get("dropped") else 0, c.get("drop_reason"),
                 c.get("added_at", datetime.now().isoformat())))
            c_count += 1
        except Exception as e:
            print(f" candidate error {code}: {e}")
            continue

        # candidate_score_history ("or []" tolerates an explicit null)
        for sh in c.get("score_history") or []:
            try:
                conn.execute(
                    "INSERT INTO candidate_score_history (code, score, source, created_at) "
                    "VALUES (?, ?, 'xiaoguo', ?)",
                    (code, sh.get("score"), sh.get("date", datetime.now().isoformat())))
                s_count += 1
            except Exception as e:
                # Still skipped, but no longer silently.
                print(f" score_history error {code}: {e}")
    conn.commit()
    return c_count, s_count
def migrate_price_events(conn, events_data: dict) -> int:
    """Migrate price_events.json into the price_events table.

    Every entry of "events" is inserted as one row. The code is passed
    through _normalize_code so it matches the form used for the stocks
    table; an absent code is stored as NULL. A missing date defaults to
    today's date (YYYY-MM-DD). Rows that fail are reported and skipped.
    Commits once at the end.

    Args:
        conn: Database connection.
        events_data: Decoded price_events.json.

    Returns:
        Number of rows inserted.
    """
    count = 0
    for event in events_data.get("events", []):
        code = None
        try:
            raw_code = event.get("code")
            if raw_code is not None:
                code = _normalize_code(raw_code)
            conn.execute(
                "INSERT INTO price_events (code, name, event_type, price, trigger_value, event_label, date) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (code, event.get("name"), event.get("event_type"),
                 event.get("price"), event.get("trigger_value"), event.get("event_label"),
                 event.get("date", datetime.now().strftime("%Y-%m-%d"))))
            count += 1
        except Exception as err:
            # Still skipped, but no longer silently.
            print(f" price_event error {code}: {err}")
    conn.commit()
    return count
def migrate_evaluations(conn, eval_data: dict) -> int:
    """Migrate evaluation.json into strategy_evaluations.

    For each entry of "strategies" that has a code, one row is inserted
    with eval_type 'migrate' and status 'completed'. new_stop_loss and
    new_tp are taken from theoretical.strategy; the old_* columns are
    NULL. Rows that fail are reported and skipped. Commits once at the end.

    Args:
        conn: Database connection.
        eval_data: Decoded evaluation.json.

    Returns:
        Number of rows inserted.
    """
    count = 0
    for s in eval_data.get("strategies", []):
        code = _normalize_code(s.get("code", ""))
        if not code:
            continue
        try:
            # "or {}" / "or ''" tolerate explicit nulls, which would
            # otherwise crash the run or drop the row.
            theoretical = (s.get("theoretical") or {}).get("strategy") or {}
            conn.execute(
                "INSERT INTO strategy_evaluations (code, eval_type, status, "
                "old_stop_loss, new_stop_loss, old_tp, new_tp, reason, created_at) "
                "VALUES (?, 'migrate', 'completed', NULL, ?, NULL, ?, ?, ?)",
                (code, theoretical.get("stop_loss"), theoretical.get("take_profit"),
                 (s.get("updated_reason") or "")[:200],
                 s.get("updated_at", datetime.now().isoformat())))
            count += 1
        except Exception as e:
            # Still skipped, but no longer silently.
            print(f" evaluation error {code}: {e}")
    conn.commit()
    return count
def migrate_sectors(conn) -> int:
    """Migrate stock_sector_map.json into stock_sectors.

    The file is expected to map a code to a list of sector names. Keys
    starting with "_" and values that are not lists are skipped. Each
    (code, sector) pair is written with INSERT OR IGNORE and source 'ths';
    the code is passed through _normalize_code so it matches the form
    used for the stocks table. Commits once at the end.

    Args:
        conn: Database connection.

    Returns:
        Number of INSERT statements that executed without error. A pair
        ignored as a duplicate is still counted.
    """
    data = load_json("stock_sector_map.json")
    if not data:
        return 0
    if not isinstance(data, dict):
        print(" [SKIP] stock_sector_map.json: unexpected top-level structure")
        return 0
    count = 0
    for raw_code, sectors in data.items():
        if raw_code.startswith("_") or not isinstance(sectors, list):
            continue
        code = _normalize_code(raw_code)
        for sector in sectors:
            try:
                conn.execute(
                    "INSERT OR IGNORE INTO stock_sectors (code, sector_name, source) VALUES (?, ?, 'ths')",
                    (code, sector))
                count += 1
            except Exception as e:
                # Still skipped, but no longer silently.
                print(f" sector error {code}: {e}")
    conn.commit()
    return count
def main():
    """Run the full JSON -> mofin.db migration and print a summary.

    Steps, in order: stocks (collected from all sources), holdings,
    watchlist, decisions, candidates, price events, evaluations, sectors.
    A step whose JSON file cannot be loaded is skipped. Afterwards the row
    count of every target table is printed. The connection is closed even
    when a step raises.
    """
    print("=" * 60)
    print(" MoFin 数据迁移 → mofin.db")
    print("=" * 60)

    conn = get_conn()
    try:
        # Foreign keys stay off during migration (some code formats in the
        # legacy data are inconsistent).
        conn.execute("PRAGMA foreign_keys=OFF")
        init_all_tables(conn)

        # 1. stocks (must come first; collected from every source)
        n = migrate_all_stocks(conn, collect_all_stocks())
        print(f"\n[1/8] stocks: {n} 只 (from all sources)")

        # 2. holdings + strategies
        pf = load_json("portfolio.json")
        if pf:
            h, s = migrate_holdings(conn, pf)
            print(f"[2/8] holdings: {h} 只, strategies: {s}")

        # 3. watchlist
        wl = load_json("watchlist.json")
        if wl:
            w, s = migrate_watchlist(conn, wl)
            print(f"[3/8] watchlist: {w} 只, strategies: {s}")

        # 4. decisions -> strategies
        dec = load_json("decisions.json")
        if dec:
            n = migrate_decisions(conn, dec)
            print(f"[4/8] decisions → strategies: {n}")

        # 5. candidates
        cp = load_json("candidate_pool.json")
        if cp:
            c, s = migrate_candidates(conn, cp)
            print(f"[5/8] candidates: {c} 只, score_history: {s}")

        # 6. price_events
        pe = load_json("price_events.json")
        if pe:
            n = migrate_price_events(conn, pe)
            print(f"[6/8] price_events: {n}")

        # 7. evaluations
        ev = load_json("evaluation.json")
        if ev:
            n = migrate_evaluations(conn, ev)
            print(f"[7/8] evaluations: {n}")

        # 8. sectors
        n = migrate_sectors(conn)
        print(f"[8/8] stock_sectors: {n} 条映射")
        conn.commit()

        # -- Verification: row count of every target table --
        print(f"\n{'='*60}")
        print(" 迁移完成 — 数据库验证")
        print(f"{'='*60}")
        tables = (
            "stocks",
            "holdings",
            "holding_strategies",
            "watchlist_stocks",
            "candidates",
            "candidate_score_history",
            "price_events",
            "strategy_evaluations",
            "stock_sectors",
        )
        for name in tables:
            # Table names are the fixed constants above, never user input.
            n = conn.execute(f"SELECT COUNT(*) FROM {name}").fetchone()[0]
            print(f" {name:<25} {n:>6}")
    finally:
        conn.close()
    print("\n[OK] Migration complete. JSON files untouched, safe to re-run.")


if __name__ == "__main__":
    main()
-22
View File
@@ -1,22 +0,0 @@
#!/usr/bin/env python3
"""一次性迁移脚本:stock_sector_map.json → stock_sectors 表
运行方式:
python3 migrate_sectors.py
"""
from mofin_db import get_conn, init_all_tables, migrate_stock_sectors
conn = get_conn()
init_all_tables(conn)

# Run the sector migration and report what it returned.
stock_total, mapping_total = migrate_stock_sectors(conn)
print(f"迁移完成: {stock_total} 只股票, {mapping_total} 条板块映射")

# Verification: total number of rows now in stock_sectors.
count_row = conn.execute("SELECT COUNT(*) FROM stock_sectors").fetchone()
row_total = count_row[0]
print(f"stock_sectors 表总行数: {row_total}")

# Sample: print the first two columns of up to five rows.
sample_rows = conn.execute("SELECT * FROM stock_sectors LIMIT 5").fetchall()
for sample in sample_rows:
    print(f" {sample[0]}{sample[1]}")

conn.close()