feat: DB migration — enforce currency constraints on holdings/strategies/watchlist/summary tables + price_monitor DB writes

This commit is contained in:
知微
2026-06-30 23:46:08 +08:00
parent 236e67fa71
commit dc68739ac4
2 changed files with 1660 additions and 1478 deletions
+159 -3
View File
@@ -128,6 +128,10 @@ def init_all_tables(conn: sqlite3.Connection):
name TEXT NOT NULL,
shares INTEGER NOT NULL,
cost REAL,
price REAL, -- 当前价格 (CNY)
market_value REAL, -- 市值 = shares * price
change_pct REAL, -- 涨跌幅
currency TEXT NOT NULL DEFAULT 'CNY' CHECK(currency IN ('CNY','HKD')),
position_pct REAL,
added_at TEXT,
is_active INTEGER DEFAULT 1,
@@ -135,29 +139,55 @@ def init_all_tables(conn: sqlite3.Connection):
close_pnl REAL
);
-- 持仓策略
-- 持仓策略(对应 decisions.json decisions[]
CREATE TABLE IF NOT EXISTS holding_strategies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code TEXT NOT NULL REFERENCES holdings(code),
name TEXT,
version INTEGER DEFAULT 1,
price REAL, -- 当前价格
cost REAL, -- 成本价
shares INTEGER DEFAULT 0,
stop_loss REAL,
take_profit REAL,
entry_low REAL,
entry_high REAL,
currency TEXT NOT NULL DEFAULT 'CNY' CHECK(currency IN ('CNY','HKD')),
strategy_type TEXT DEFAULT 'holding',
action TEXT, -- 买入/持有/卖出/观望
timing_signal TEXT, -- 时机信号
rr_ratio REAL, -- 盈亏比
tech_snapshot TEXT, -- 技术面快照
stock_category TEXT, -- 股票分类
sector_context TEXT, -- 板块背景
status TEXT DEFAULT 'active', -- active/updated/closed
trigger_json TEXT, -- trigger JSON (entry_zone/stop_loss/take_profit_zone)
changelog_json TEXT, -- changelog JSON 数组
source TEXT,
reason TEXT,
created_at TEXT DEFAULT (datetime('now','localtime')),
updated_at TEXT,
superseded_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_strategy_code ON holding_strategies(code);
CREATE INDEX IF NOT EXISTS idx_strategy_status ON holding_strategies(status);
-- 自选股
CREATE TABLE IF NOT EXISTS watchlist_stocks (
code TEXT PRIMARY KEY REFERENCES stocks(code),
name TEXT NOT NULL,
price REAL, -- 当前价格
entry_low REAL, -- 买入区下限
entry_high REAL, -- 买入区上限
stop_loss REAL, -- 止损
currency TEXT NOT NULL DEFAULT 'CNY' CHECK(currency IN ('CNY','HKD')),
source TEXT, -- 来源: alpha_sift/xiaoguo/manual
source_detail TEXT, -- 来源详情 JSON
notes TEXT, -- 备注
added_by TEXT, -- 谁加的
added_at TEXT DEFAULT (datetime('now','localtime')),
is_active INTEGER DEFAULT 1
is_active INTEGER DEFAULT 1,
analysis_json TEXT -- 分析结果 JSON
);
-- 候选池
@@ -223,10 +253,13 @@ def init_all_tables(conn: sqlite3.Connection):
CREATE TABLE IF NOT EXISTS portfolio_summary (
id INTEGER PRIMARY KEY CHECK (id = 1),
total_assets REAL,
total_mv REAL, -- 持仓总市值
stock_value REAL,
cash REAL,
cash REAL, -- 可用现金
frozen_cash REAL DEFAULT 0, -- 冻结资金
position_pct REAL,
total_pnl REAL,
currency TEXT NOT NULL DEFAULT 'CNY' CHECK(currency IN ('CNY','HKD')),
updated_at TEXT
);
@@ -788,3 +821,126 @@ def query_latest_market(conn: sqlite3.Connection) -> dict:
snap["top_gainers"] = [dict(r) for r in sectors[:5]]
snap["top_losers"] = [dict(r) for r in sectors[-3:]]
return snap
# ═══════════════════════════════════════════════════════════════════
# 核心写函数 — 替代 json.dump(),强制币种约束
# ═══════════════════════════════════════════════════════════════════
def write_holding_strategy(conn, code: str, name: str, data: dict) -> tuple[bool, str]:
"""写入持仓策略(替代 decisions.json 单条写入)。data 必须包含 currency。"""
try:
currency = data.get('currency', 'CNY')
conn.execute("""
INSERT INTO holding_strategies
(code, name, version, price, cost, shares, stop_loss, take_profit,
entry_low, entry_high, currency, strategy_type, action,
timing_signal, rr_ratio, tech_snapshot, stock_category,
sector_context, status, trigger_json, changelog_json,
source, reason, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,datetime('now','localtime'))
ON CONFLICT(code) DO UPDATE SET
name=excluded.name, price=excluded.price, cost=excluded.cost,
shares=excluded.shares, stop_loss=excluded.stop_loss,
take_profit=excluded.take_profit, entry_low=excluded.entry_low,
entry_high=excluded.entry_high, currency=excluded.currency,
action=excluded.action, timing_signal=excluded.timing_signal,
rr_ratio=excluded.rr_ratio, tech_snapshot=excluded.tech_snapshot,
stock_category=excluded.stock_category,
sector_context=excluded.sector_context, status=excluded.status,
trigger_json=excluded.trigger_json, changelog_json=excluded.changelog_json,
source=excluded.source, reason=excluded.reason,
updated_at=datetime('now','localtime')
""", (
code, name,
data.get('version', 1), data.get('price'), data.get('cost'),
data.get('shares', 0), data.get('stop_loss'), data.get('take_profit'),
data.get('entry_low'), data.get('entry_high'), currency,
data.get('strategy_type', 'holding'), data.get('action'),
data.get('timing_signal'), data.get('rr_ratio'),
data.get('tech_snapshot'), data.get('stock_category'),
data.get('sector_context'), data.get('status', 'active'),
data.get('trigger_json'), data.get('changelog_json'),
data.get('source'), data.get('reason'),
))
conn.commit()
return True, f"策略 {code} 已写入"
except sqlite3.IntegrityError as e:
return False, f"币种约束: {e}"
except Exception as e:
return False, str(e)
def write_holdings_batch(conn, holdings: list[dict]) -> tuple[bool, str]:
"""批量写入持仓(替代 portfolio.json holdings[]"""
try:
for h in holdings:
conn.execute("""
INSERT INTO holdings (code, name, shares, cost, price, market_value,
change_pct, currency, position_pct, added_at, is_active)
VALUES (?,?,?,?,?,?,?,?,?,datetime('now','localtime'),1)
ON CONFLICT(code) DO UPDATE SET
name=excluded.name, shares=excluded.shares, cost=excluded.cost,
price=excluded.price, market_value=excluded.market_value,
change_pct=excluded.change_pct, currency=excluded.currency,
position_pct=excluded.position_pct
""", (
h.get('code'), h.get('name'), h.get('shares', 0),
h.get('cost'), h.get('price'),
h.get('market_value'), h.get('change_pct'),
h.get('currency', 'CNY'), h.get('position_pct'),
))
conn.commit()
return True, f"已写入 {len(holdings)} 条持仓"
except sqlite3.IntegrityError as e:
conn.rollback()
return False, f"币种约束: {e}"
def write_portfolio_summary(conn, data: dict) -> tuple[bool, str]:
"""写入持仓汇总(替代 portfolio.json 顶层)"""
try:
conn.execute("""
INSERT INTO portfolio_summary (id, total_assets, total_mv, stock_value,
cash, frozen_cash, position_pct, total_pnl, currency, updated_at)
VALUES (1,?,?,?,?,?,?,?,?,datetime('now','localtime'))
ON CONFLICT(id) DO UPDATE SET
total_assets=excluded.total_assets, total_mv=excluded.total_mv,
stock_value=excluded.stock_value, cash=excluded.cash,
frozen_cash=excluded.frozen_cash, position_pct=excluded.position_pct,
total_pnl=excluded.total_pnl, currency=excluded.currency,
updated_at=datetime('now','localtime')
""", (
data.get('total_assets'), data.get('total_mv'), data.get('stock_value'),
data.get('cash'), data.get('frozen_cash', 0), data.get('position_pct'),
data.get('total_pnl'), data.get('currency', 'CNY'),
))
conn.commit()
return True, "汇总已写入"
except sqlite3.IntegrityError as e:
return False, f"约束: {e}"
def write_watchlist_stock(conn, stock: dict) -> tuple[bool, str]:
"""写入自选股(替代 watchlist.json"""
try:
conn.execute("""
INSERT INTO watchlist_stocks (code, name, price, entry_low, entry_high,
stop_loss, currency, source, source_detail, notes, added_by, added_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,datetime('now','localtime'))
ON CONFLICT(code) DO UPDATE SET
name=excluded.name, price=excluded.price, entry_low=excluded.entry_low,
entry_high=excluded.entry_high, stop_loss=excluded.stop_loss,
currency=excluded.currency, source=excluded.source,
source_detail=excluded.source_detail, notes=excluded.notes,
added_by=excluded.added_by
""", (
stock.get('code'), stock.get('name'), stock.get('price'),
stock.get('entry_low'), stock.get('entry_high'), stock.get('stop_loss'),
stock.get('currency', 'CNY'), stock.get('source'), stock.get('source_detail'),
stock.get('notes'), stock.get('added_by'),
))
conn.commit()
return True, f"自选 {stock.get('code')} 已写入"
except sqlite3.IntegrityError as e:
return False, f"约束: {e}"
+28 -2
View File
@@ -12,6 +12,7 @@ from datetime import datetime
# ── MoFin unified model ──────────────────────────────────────────────
from mo_models import is_hk_stock, get_hk_rate, calc_total_assets, calc_total_mv, calc_position_pct
from mofin_db import get_conn, write_holdings_batch, write_portfolio_summary, write_price_event, write_watchlist_stock
DECISIONS_PATH = "/home/hmo/web-dashboard/data/decisions.json"
PORTFOLIO_PATH = "/home/hmo/web-dashboard/data/portfolio.json"
@@ -261,13 +262,20 @@ def refresh_data_prices():
changed = True
if changed:
pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
# 统一计算总资产(mo_models 唯一公式)
pf['total_mv'] = calc_total_mv(pf.get('holdings', []))
pf['total_assets'] = calc_total_assets(pf)
pf['position_pct'] = calc_position_pct(pf)
# DB 写入(替代 json.dump,强制币种约束)
try:
conn = get_conn()
write_holdings_batch(conn, pf['holdings'])
write_portfolio_summary(conn, pf)
conn.close()
except Exception as e:
print(f" [DB写入失败] {e}", flush=True)
# 保留 JSON 副本作为冷备
json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2)
elif pf.get('updated_at'):
# 即使价格无变化,每10分钟刷新一次updated_at,防健康检查误报
try:
last_ts = datetime.strptime(pf['updated_at'], '%Y-%m-%d %H:%M')
if (datetime.now() - last_ts).total_seconds() > 600:
@@ -293,6 +301,16 @@ def refresh_data_prices():
changed = True
if changed:
wl['updated_at'] = datetime.now().isoformat()
# DB 写入(替代 json.dump
try:
conn = get_conn()
for s in wl.get('stocks', []):
s['currency'] = 'CNY' # 自选股价格统一CNY
write_watchlist_stock(conn, s)
conn.close()
except Exception as e:
print(f" [DB watchlist写入失败] {e}", flush=True)
# 保留 JSON 冷备
json.dump(wl, open(WATCHLIST_PATH, 'w'), ensure_ascii=False, indent=2)
# --- 汇总值重算(使用 mo_models 唯一公式)---
@@ -307,6 +325,14 @@ def refresh_data_prices():
if pf['total_assets'] > 0:
pf['position_pct'] = calc_position_pct(pf)
pf['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M')
# DB 写入
try:
conn = get_conn()
write_portfolio_summary(conn, pf)
conn.close()
except Exception as e:
print(f" [DB汇总写入失败] {e}", flush=True)
# JSON 冷备
json.dump(pf, open(PORTFOLIO_PATH, 'w'), ensure_ascii=False, indent=2)
except Exception as e:
print(f" [汇总重算失败] {e}", flush=True)