Files
MoFin/scripts/data_governance.py

95 lines
3.0 KiB
Python

#!/usr/bin/env python3
"""data_governance.py — MoFin 数据治理 (no_agent)
1. holding_strategies 去重归档
2. 检查缺失策略的持仓
3. 报告数据健康状况
"""
import json, sqlite3
from pathlib import Path
from datetime import datetime
from mo_data import read_portfolio, read_decisions, read_watchlist
# Hard-coded absolute base directory; presumably the MoFin project root
# on the deployment host — TODO confirm.
BASE = Path("/home/hmo/MoFin")
# Data directory located directly under BASE.
DATA = BASE / "data"
# SQLite database file that main() opens.
DB_PATH = DATA / "mofin.db"
# Path to decisions.json; not referenced by the code visible in this file.
DECISIONS_PATH = DATA / "decisions.json"
def clean_holding_strategies(conn):
    """Archive outdated strategy rows, keeping only the newest row per code.

    For every ``code`` that has more than one row in ``holding_strategies``,
    each still-active row (``superseded_at IS NULL``) whose ``created_at`` is
    older than the newest ``created_at`` for that code gets ``superseded_at``
    set to the current local time (ISO-8601 string).  This function does not
    commit; the caller decides when to commit the transaction.

    Args:
        conn: An open ``sqlite3`` connection whose database contains a
            ``holding_strategies`` table with ``code``, ``created_at`` and
            ``superseded_at`` columns.

    Returns:
        The number of rows actually archived by this call.  Rows that were
        already superseded, or that tie with the newest ``created_at``, are
        not counted.
    """
    duplicated = conn.execute(
        "SELECT code, COUNT(*) as cnt, MAX(created_at) as latest "
        "FROM holding_strategies GROUP BY code HAVING cnt > 1"
    ).fetchall()

    # Take the timestamp once so every row archived in this pass carries the
    # same superseded_at value.
    archived_at = datetime.now().isoformat()

    total_archived = 0
    for code, _cnt, latest in duplicated:
        cursor = conn.execute(
            "UPDATE holding_strategies SET superseded_at=? "
            "WHERE code=? AND created_at<? AND superseded_at IS NULL",
            (archived_at, code, latest),
        )
        # Count what the UPDATE really changed.  The grouped row count also
        # includes rows archived by earlier runs, so "cnt - 1" would report
        # the same rows as newly archived on every run.
        total_archived += cursor.rowcount
    return total_archived
def check_missing_strategies(conn, decisions_list):
    """Return the open holding decisions that have no active strategy row.

    A decision is considered an open holding when its ``type`` is
    ``"持仓策略"`` and its ``status`` is not ``"closed"``.  It is reported as
    missing when ``holding_strategies`` has no row for its ``code`` with
    ``superseded_at IS NULL``.

    Args:
        conn: An open ``sqlite3`` connection with a ``holding_strategies``
            table.
        decisions_list: Iterable of decision dicts; every open holding must
            carry a ``"code"`` key.

    Returns:
        The matching decision dicts, in their original order.
    """
    active_lookup = (
        "SELECT id FROM holding_strategies WHERE code=? AND superseded_at IS NULL LIMIT 1"
    )
    open_holdings = [
        entry
        for entry in decisions_list
        if entry.get("type") == "持仓策略" and entry.get("status") != "closed"
    ]
    return [
        entry
        for entry in open_holdings
        if not conn.execute(active_lookup, (entry["code"],)).fetchone()
    ]
def _collect_deep_losses(decisions_list):
    """Return (name, code, loss_pct, stop_loss) for entries down more than 25%."""
    deep = []
    for d in decisions_list:
        cost = d.get("cost", 0) or d.get("avg_price", 0) or 0
        # A stored None price must be skipped, not compared against 0.
        price = d.get("price", 0) or 0
        if cost > 0 and price > 0:
            loss = (price - cost) / cost * 100
            # -25 matches the threshold quoted in the report header below.
            if loss < -25:
                deep.append(
                    (d.get("name", ""), d.get("code", "?"), loss, d.get("stop_loss", 0))
                )
    return deep


def main():
    """Run the data-governance pass and print a health report to stdout.

    Steps:
      1. Archive superseded ``holding_strategies`` rows and commit.
      2. List open holdings that have no active strategy row.
      3. List decision entries whose price is more than 25% below cost.

    ``read_decisions()`` is expected to return a mapping with a
    ``"decisions"`` list.  The database connection is closed even when one
    of the steps raises.
    """
    conn = sqlite3.connect(str(DB_PATH))
    try:
        # 1. Clean up holding_strategies
        archived = clean_holding_strategies(conn)
        conn.commit()
        remaining = conn.execute(
            "SELECT COUNT(*) FROM holding_strategies WHERE superseded_at IS NULL"
        ).fetchone()[0]
        print(f"holding_strategies: 归档{archived}条过期记录 | 剩余活跃{remaining}")

        # 2. Holdings without a strategy record
        # read_decisions is imported by name; the module object ``mo_data``
        # itself is never bound, so it must not be referenced here.
        decisions = read_decisions()
        decisions_list = decisions.get("decisions", [])
        missing = check_missing_strategies(conn, decisions_list)
        if missing:
            print(f"\n⚠️ {len(missing)}只持仓没有对应策略记录:")
            for m in missing:
                print(f" {m.get('name','?')}({m['code']}): {m.get('type','')}")
        else:
            print("✅ 所有持仓都有策略记录")

        # 3. Deep-loss statistics
        deep = _collect_deep_losses(decisions_list)
        if deep:
            print(f"\n🔴 {len(deep)}只深套(>-25%):")
            for name, code, loss, sl in deep:
                print(f" {name}({code}): {loss:.0f}% 止损={sl}")
        else:
            print("\n✅ 无深套持仓")
    finally:
        conn.close()
# Run the governance pass only when executed as a script, not on import.
if __name__ == "__main__":
    main()