#!/usr/bin/env python3
"""migrate_all.py — one-shot migration of all production JSON data into mofin.db.

Usage:
    python3 migrate_all.py

Migration mapping:
    portfolio.json        → holdings + holding_strategies (from analysis)
    watchlist.json        → watchlist_stocks + holding_strategies (strategy_type='watch')
    candidate_pool.json   → candidates + candidate_score_history
    decisions.json        → holding_strategies (changelog history)
    price_events.json     → price_events
    stock_sector_map.json → stock_sectors
    evaluation.json       → strategy_evaluations
    stock_profiles.json   → stocks (name, exchange, type)
"""

import json
import re
import sys
from datetime import datetime
from pathlib import Path

DATA_DIR = Path(__file__).parent / "data"

# (file name, top-level key) pairs scanned for stock codes, in priority order:
# the first source that mentions a code decides the name stored for it.
_STOCK_SOURCES = (
    ("stock_profiles.json", "profiles"),
    ("portfolio.json", "holdings"),
    ("watchlist.json", "stocks"),
    ("decisions.json", "decisions"),
    ("candidate_pool.json", "candidates"),
    ("price_events.json", "events"),
)

# Patterns used to pull stop-loss / take-profit / entry range out of the
# free-text "new_action" field of a decision changelog entry.
_STOP_LOSS_RE = re.compile(r"损(\d+\.?\d*)")
_TAKE_PROFIT_RE = re.compile(r"盈(\d+\.?\d*)")
_ENTRY_RANGE_RE = re.compile(r"买(\d+\.?\d*)~(\d+\.?\d*)")


def load_json(name: str):
    """Load ``DATA_DIR / name`` and return the parsed JSON document.

    Returns ``None`` (after printing a ``[SKIP]`` or ``[FAIL]`` line) when the
    file is missing, unreadable, or not valid JSON.
    """
    path = DATA_DIR / name
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        print(f" [SKIP] {name} not found")
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f" [FAIL] {name}: {e}")
    return None


def _normalize_code(code) -> str:
    """Normalize a stock code to a zero-padded string.

    * ``None`` becomes ``""`` so callers' ``if not code`` guards skip it
      (``str(None)`` would otherwise yield the truthy string ``"None"``).
    * All-digit codes of up to 5 digits are left-padded to 5 digits; longer
      ones are padded to 6.
    * Anything else is returned stripped but otherwise unchanged.
    """
    if code is None:
        return ""
    if isinstance(code, float) and code.is_integer():
        # JSON numbers such as 700.0 carry the same code as 700.
        code = int(code)
    s = str(code).strip()
    if not s.isdigit():
        return s
    # NOTE(review): numeric codes shorter than 5 digits are always padded to
    # 5 digits and therefore classified as HK by _guess_exchange; an A-share
    # code that lost its leading zeros would be mis-padded — confirm the JSON
    # sources never store A-share codes as integers.
    return s.zfill(5) if len(s) <= 5 else s.zfill(6)


def _guess_exchange(code: str) -> str:
    """Guess the exchange from a normalized code.

    Five-digit numeric codes map to ``"HK"``, codes starting with 6/5/9 map
    to ``"SH"``, and everything else maps to ``"SZ"``.
    """
    if len(code) == 5 and code.isdigit():
        return "HK"
    if code.startswith(("6", "5", "9")):
        return "SH"
    return "SZ"


def _clip(text, limit: int = 200, default=None):
    """Return *text* as a string cut to *limit* characters.

    Empty or ``None`` input yields *default* instead of raising.
    """
    if not text:
        return default
    return str(text)[:limit]


def _first(value):
    """Return the first element of a list/tuple, or the value itself.

    An empty sequence yields ``None`` so that it can be bound as SQL NULL.
    """
    if isinstance(value, (list, tuple)):
        return value[0] if value else None
    return value


def collect_all_stocks() -> dict:
    """Collect a ``code -> {name, exchange, type}`` map from every JSON source.

    Sources are read in the order given by ``_STOCK_SOURCES``; the first
    occurrence of a code wins. Entries without a usable code are ignored.
    """
    all_stocks: dict = {}

    def add(code, name):
        code = _normalize_code(code)
        if not code or code in all_stocks:
            return
        all_stocks[code] = {
            "name": str(name) if name else code,
            "exchange": _guess_exchange(code),
            "type": "H" if len(code) == 5 else "A",
        }

    for filename, key in _STOCK_SOURCES:
        data = load_json(filename)
        if not isinstance(data, dict):
            continue
        for entry in data.get(key) or []:
            if isinstance(entry, dict):
                add(entry.get("code"), entry.get("name"))

    return all_stocks


def migrate_all_stocks(conn, all_stocks: dict) -> int:
    """Write every collected stock into the ``stocks`` table.

    Rows are upserted with ``INSERT OR REPLACE``. Returns the number of rows
    written; a failing row is reported and skipped.
    """
    count = 0
    now = datetime.now().isoformat()
    for code, info in all_stocks.items():
        try:
            conn.execute(
                "INSERT OR REPLACE INTO stocks (code, name, exchange, type, updated_at) VALUES (?, ?, ?, ?, ?)",
                (code, info["name"], info["exchange"], info["type"], now))
            count += 1
        except Exception as e:
            print(f" stocks error {code}: {e}")
    conn.commit()
    return count


def migrate_holdings(conn, portfolio_data: dict) -> tuple[int, int]:
    """portfolio.json → holdings + holding_strategies.

    Returns ``(holdings_written, strategies_written)``. A holding whose
    ``shares`` value cannot be converted to an integer is reported and
    skipped instead of aborting the whole migration.
    """
    h_count, s_count = 0, 0

    for h in portfolio_data.get("holdings") or []:
        code = _normalize_code(h.get("code", ""))
        name = h.get("name", "")
        if not code or not name:
            continue

        # holdings table
        try:
            shares = int(h.get("shares") or 0)
            conn.execute(
                "INSERT OR REPLACE INTO holdings (code, name, shares, cost, position_pct, is_active) "
                "VALUES (?, ?, ?, ?, ?, 1)",
                (code, name, shares, h.get("cost", 0), h.get("position_pct", 0)))
            h_count += 1
        except Exception as e:
            print(f" holdings error {code}: {e}")
            continue

        # holding_strategies (from the "analysis" field)
        analysis = h.get("analysis") or {}
        if not analysis:
            continue
        try:
            conn.execute(
                "INSERT INTO holding_strategies (code, version, stop_loss, take_profit, "
                "entry_low, entry_high, strategy_type, source, reason, created_at) "
                "VALUES (?, 1, ?, ?, ?, ?, 'holding', 'migrate', ?, ?)",
                (code, analysis.get("stop_loss"), analysis.get("take_profit"),
                 analysis.get("entry_low"), analysis.get("entry_high"),
                 _clip(analysis.get("action")),
                 analysis.get("reassessed_at") or datetime.now().isoformat()))
            s_count += 1
        except Exception as e:
            print(f" strategy error {code}: {e}")

    conn.commit()
    return h_count, s_count


def migrate_watchlist(conn, watchlist_data: dict) -> tuple[int, int]:
    """watchlist.json → watchlist_stocks + holding_strategies.

    Returns ``(watchlist_rows_written, strategies_written)``. ``buy_zone``
    supplies entry_low/entry_high; ``take_profit`` may be a scalar or a list
    (first element used). Missing or empty values are stored as NULL.
    """
    w_count, s_count = 0, 0

    for s in watchlist_data.get("stocks") or []:
        code = _normalize_code(s.get("code", ""))
        name = s.get("name", "")
        if not code or not name:
            continue

        # watchlist_stocks
        try:
            conn.execute(
                "INSERT OR REPLACE INTO watchlist_stocks (code, name, added_at, is_active) "
                "VALUES (?, ?, ?, 1)",
                (code, name, s.get("added_at", datetime.now().isoformat())))
            w_count += 1
        except Exception as e:
            print(f" watchlist error {code}: {e}")
            continue

        # holding_strategies (from the "strategy" field)
        strategy = s.get("strategy") or {}
        if not strategy:
            continue
        try:
            buy_zone = strategy.get("buy_zone")
            if not isinstance(buy_zone, (list, tuple)):
                buy_zone = ()
            entry_low = buy_zone[0] if len(buy_zone) > 0 else None
            entry_high = buy_zone[1] if len(buy_zone) > 1 else None
            conn.execute(
                "INSERT INTO holding_strategies (code, version, stop_loss, take_profit, "
                "entry_low, entry_high, strategy_type, source, created_at) "
                "VALUES (?, 1, ?, ?, ?, ?, 'watch', 'migrate', ?)",
                (code, strategy.get("stop_loss"), _first(strategy.get("take_profit")),
                 entry_low, entry_high,
                 strategy.get("updated_at", datetime.now().isoformat())))
            s_count += 1
        except Exception as e:
            print(f" watch strategy error {code}: {e}")

    conn.commit()
    return w_count, s_count


def migrate_decisions(conn, decisions_data: dict) -> int:
    """decisions.json → holding_strategies (latest strategy + changelog history).

    The latest strategy is stored as version 1; each changelog entry follows
    as version 2, 3, ... with stop-loss / take-profit / entry range parsed out
    of its ``new_action`` text. Returns the number of rows written. If the
    version-1 insert fails, the decision's changelog is skipped as well.
    """
    count = 0

    for d in decisions_data.get("decisions") or []:
        code = _normalize_code(d.get("code", ""))
        if not code:
            continue

        # Latest strategy
        try:
            conn.execute(
                "INSERT INTO holding_strategies (code, version, stop_loss, take_profit, "
                "entry_low, entry_high, strategy_type, source, reason, created_at) "
                "VALUES (?, 1, ?, ?, ?, ?, 'decision', 'migrate', ?, ?)",
                (code, d.get("stop_loss"), d.get("take_profit"),
                 d.get("entry_low"), d.get("entry_high"),
                 _clip(d.get("action")),
                 d.get("timestamp", datetime.now().isoformat())))
            count += 1
        except Exception as e:
            print(f" decision error {code}: {e}")
            continue

        # Historical versions from the changelog
        for version, cl in enumerate(d.get("changelog") or [], start=2):
            try:
                # Simple extraction from text shaped like: 损XX 盈XX 买XX~XX
                new_action = str(cl.get("new_action") or "")
                sl_match = _STOP_LOSS_RE.search(new_action)
                tp_match = _TAKE_PROFIT_RE.search(new_action)
                entry_match = _ENTRY_RANGE_RE.search(new_action)

                conn.execute(
                    "INSERT INTO holding_strategies (code, version, stop_loss, take_profit, "
                    "entry_low, entry_high, strategy_type, source, reason, created_at) "
                    "VALUES (?, ?, ?, ?, ?, ?, 'decision', 'migrate', ?, ?)",
                    (code, version,
                     float(sl_match.group(1)) if sl_match else None,
                     float(tp_match.group(1)) if tp_match else None,
                     float(entry_match.group(1)) if entry_match else None,
                     float(entry_match.group(2)) if entry_match else None,
                     _clip(cl.get("reason"), default=""),
                     cl.get("date", datetime.now().isoformat())))
                count += 1
            except Exception as e:
                # Best effort: a malformed entry is skipped, but not silently.
                print(f" decision changelog error {code} v{version}: {e}")

    conn.commit()
    return count


def migrate_candidates(conn, candidate_data: dict) -> tuple[int, int]:
    """candidate_pool.json → candidates + candidate_score_history.

    Returns ``(candidates_written, score_history_rows_written)``.
    """
    c_count, s_count = 0, 0

    for c in candidate_data.get("candidates") or []:
        code = _normalize_code(c.get("code", ""))
        name = c.get("name", "")
        if not code or not name:
            continue

        # candidates table
        try:
            conn.execute(
                "INSERT OR REPLACE INTO candidates (code, name, sector, reason, entry_range, "
                "stop_loss, target, zhiwei_star, zhiwei_reviewed, zhiwei_reviewed_at, "
                "promoted, promoted_at, dropped, drop_reason, created_at) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (code, name, c.get("sector"), c.get("reason"), c.get("entry_range"),
                 c.get("stop_loss"), c.get("target"), c.get("zhiwei_star"),
                 1 if c.get("zhiwei_reviewed") else 0, c.get("zhiwei_reviewed_at"),
                 1 if c.get("promoted") else 0, c.get("promoted_at"),
                 1 if c.get("dropped") else 0, c.get("drop_reason"),
                 c.get("added_at", datetime.now().isoformat())))
            c_count += 1
        except Exception as e:
            print(f" candidate error {code}: {e}")
            continue

        # candidate_score_history
        for sh in c.get("score_history") or []:
            try:
                conn.execute(
                    "INSERT INTO candidate_score_history (code, score, source, created_at) "
                    "VALUES (?, ?, 'xiaoguo', ?)",
                    (code, sh.get("score"), sh.get("date", datetime.now().isoformat())))
                s_count += 1
            except Exception as e:
                print(f" score history error {code}: {e}")

    conn.commit()
    return c_count, s_count


def migrate_price_events(conn, events_data: dict) -> int:
    """price_events.json → price_events table.

    Codes are normalized the same way as for the ``stocks`` table so the two
    tables agree on key format; a missing code is stored as NULL. Returns the
    number of rows written.
    """
    count = 0
    for e in events_data.get("events") or []:
        try:
            code = _normalize_code(e.get("code")) or None
            conn.execute(
                "INSERT INTO price_events (code, name, event_type, price, trigger_value, event_label, date) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (code, e.get("name"), e.get("event_type"),
                 e.get("price"), e.get("trigger_value"), e.get("event_label"),
                 e.get("date", datetime.now().strftime("%Y-%m-%d"))))
            count += 1
        except Exception as err:
            print(f" price event error: {err}")
    conn.commit()
    return count


def migrate_evaluations(conn, eval_data: dict) -> int:
    """evaluation.json → strategy_evaluations.

    Uses ``theoretical.strategy`` of each entry for the new stop-loss and
    take-profit values. Returns the number of rows written.
    """
    count = 0
    for s in eval_data.get("strategies") or []:
        code = _normalize_code(s.get("code", ""))
        if not code:
            continue
        try:
            theoretical = (s.get("theoretical") or {}).get("strategy") or {}
            conn.execute(
                "INSERT INTO strategy_evaluations (code, eval_type, status, "
                "old_stop_loss, new_stop_loss, old_tp, new_tp, reason, created_at) "
                "VALUES (?, 'migrate', 'completed', NULL, ?, NULL, ?, ?, ?)",
                (code, theoretical.get("stop_loss"), theoretical.get("take_profit"),
                 _clip(s.get("updated_reason"), default=""),
                 s.get("updated_at", datetime.now().isoformat())))
            count += 1
        except Exception as e:
            print(f" evaluation error {code}: {e}")
    conn.commit()
    return count


def migrate_sectors(conn) -> int:
    """stock_sector_map.json → stock_sectors.

    Top-level keys starting with ``_`` and values that are not lists are
    skipped. Returns the number of (code, sector) pairs submitted; pairs
    already present are ignored by ``INSERT OR IGNORE`` but still counted.
    """
    data = load_json("stock_sector_map.json")
    if not data:
        return 0
    count = 0
    for raw_code, sectors in data.items():
        if raw_code.startswith("_") or not isinstance(sectors, list):
            continue
        code = _normalize_code(raw_code)
        for sector in sectors:
            try:
                conn.execute(
                    "INSERT OR IGNORE INTO stock_sectors (code, sector_name, source) VALUES (?, ?, 'ths')",
                    (code, sector))
                count += 1
            except Exception as e:
                print(f" sector error {code}: {e}")
    conn.commit()
    return count


def main():
    """Run the full JSON → mofin.db migration and print per-table row counts."""
    # Imported here so the helpers above stay importable (and testable)
    # without the project's database layer.
    from mofin_db import get_conn, init_all_tables

    print("=" * 60)
    print(" MoFin 数据迁移 → mofin.db")
    print("=" * 60)

    conn = get_conn()
    try:
        # Foreign keys are disabled during migration because some sources use
        # inconsistent code formats.
        conn.execute("PRAGMA foreign_keys=OFF")
        init_all_tables(conn)

        # 1. stocks (must go first; collected from every source)
        all_stocks = collect_all_stocks()
        n = migrate_all_stocks(conn, all_stocks)
        print(f"\n[1/8] stocks: {n} 只 (from all sources)")

        # 2. holdings + strategies
        pf = load_json("portfolio.json")
        if pf:
            h, s = migrate_holdings(conn, pf)
            print(f"[2/8] holdings: {h} 只, strategies: {s} 条")

        # 3. watchlist
        wl = load_json("watchlist.json")
        if wl:
            w, s = migrate_watchlist(conn, wl)
            print(f"[3/8] watchlist: {w} 只, strategies: {s} 条")

        # 4. decisions → strategies
        dec = load_json("decisions.json")
        if dec:
            n = migrate_decisions(conn, dec)
            print(f"[4/8] decisions → strategies: {n} 条")

        # 5. candidates
        cp = load_json("candidate_pool.json")
        if cp:
            c, s = migrate_candidates(conn, cp)
            print(f"[5/8] candidates: {c} 只, score_history: {s} 条")

        # 6. price_events
        pe = load_json("price_events.json")
        if pe:
            n = migrate_price_events(conn, pe)
            print(f"[6/8] price_events: {n} 条")

        # 7. evaluations
        ev = load_json("evaluation.json")
        if ev:
            n = migrate_evaluations(conn, ev)
            print(f"[7/8] evaluations: {n} 条")

        # 8. sectors
        n = migrate_sectors(conn)
        print(f"[8/8] stock_sectors: {n} 条映射")

        conn.commit()

        # ── Verification ──
        print("\n" + "=" * 60)
        print(" 迁移完成 — 数据库验证")
        print("=" * 60)
        tables = (
            "stocks",
            "holdings",
            "holding_strategies",
            "watchlist_stocks",
            "candidates",
            "candidate_score_history",
            "price_events",
            "strategy_evaluations",
            "stock_sectors",
        )
        for name in tables:
            n = conn.execute(f"SELECT COUNT(*) FROM {name}").fetchone()[0]
            print(f" {name:<25} {n:>6} 行")
    finally:
        # Release the connection even when a migration step raises.
        conn.close()

    # NOTE(review): holding_strategies, candidate_score_history, price_events
    # and strategy_evaluations are filled with plain INSERT; whether a re-run
    # duplicates rows depends on the schema's unique constraints — confirm
    # before relying on "safe to re-run".
    print("\n[OK] Migration complete. JSON files untouched, safe to re-run.")


if __name__ == "__main__":
    main()