Files
MoFin/mo_data.py
T

199 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
mo_data.py — MoFin 统一数据读取层
替代 json.load(open(portfolio.json)) 等直接读 JSON 文件的方式。
所有数据从 DB 读取,币种字段强制存在。
用法:
from mo_data import read_portfolio, read_decisions, read_watchlist
pf = read_portfolio() # 返回和 portfolio.json 一样的 dict 结构
dec = read_decisions() # 返回和 decisions.json 一样的 dict 结构
wl = read_watchlist() # 返回和 watchlist.json 一样的 dict 结构
"""
import json
import logging
import os
import sqlite3
from datetime import datetime
DB_PATH = '/home/hmo/web-dashboard/data/mofin.db'
# JSON file paths (cold backup; used only as a fallback when the DB is unavailable)
PORTFOLIO_JSON = '/home/hmo/web-dashboard/data/portfolio.json'
DECISIONS_JSON = '/home/hmo/web-dashboard/data/decisions.json'
WATCHLIST_JSON = '/home/hmo/web-dashboard/data/watchlist.json'
def _get_db():
    """Open a SQLite connection to ``DB_PATH`` and return it.

    The connection's ``row_factory`` is set to ``sqlite3.Row`` so that
    fetched rows can be converted with ``dict(row)``. The caller is
    responsible for closing the connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
# ── portfolio.json → holdings + portfolio_summary ─────────────────
def read_portfolio():
    """Return a portfolio dict, preferring the database over the JSON backup.

    Reads the active rows of ``holdings`` (``is_active=1``) and row ``id=1``
    of ``portfolio_summary`` through ``_get_db()``. If any step of the
    database path raises, the error is logged and the cold-backup file at
    ``PORTFOLIO_JSON`` is loaded instead. If that also fails, a minimal
    empty portfolio is returned. Ordinary errors never propagate to the
    caller; ``KeyboardInterrupt`` and ``SystemExit`` do.

    Returns:
        dict: On the database path, the keys ``holdings``, ``total_assets``,
        ``total_mv``, ``stock_value``, ``cash``, ``frozen_cash``,
        ``position_pct``, ``currency`` and ``updated_at``. Each holding
        also carries ``_currency``, a legacy alias of ``currency`` that
        defaults to ``'CNY'``. On the JSON path, whatever the backup file
        contains. On total failure,
        ``{"holdings": [], "total_assets": 0, "cash": 0, "frozen_cash": 0}``.
    """
    log = logging.getLogger(__name__)

    try:
        db = _get_db()
        try:
            holdings = [
                dict(row)
                for row in db.execute(
                    "SELECT code, name, shares, cost, price, market_value, "
                    "change_pct, currency, position_pct "
                    "FROM holdings WHERE is_active=1"
                ).fetchall()
            ]
            sum_row = db.execute(
                "SELECT * FROM portfolio_summary WHERE id=1"
            ).fetchone()
            summary = dict(sum_row) if sum_row else {}
        finally:
            # Release the connection even when a query raises.
            db.close()

        for h in holdings:
            # 'currency' is always a key of the row dict, so a .get() default
            # would never apply; fall back explicitly when it is NULL/empty.
            h['_currency'] = h.get('currency') or 'CNY'

        return {
            "holdings": holdings,
            "total_assets": summary.get("total_assets", 0),
            "total_mv": summary.get("total_mv", 0),
            "stock_value": summary.get("stock_value", summary.get("total_mv", 0)),
            "cash": summary.get("cash", 0),
            "frozen_cash": summary.get("frozen_cash", 0),
            "position_pct": summary.get("position_pct", 0),
            "currency": summary.get("currency", "CNY"),
            "updated_at": summary.get("updated_at", ""),
        }
    except Exception:
        # Best-effort by design: record why the DB path failed, then fall back.
        log.warning(
            "read_portfolio: database read failed; falling back to JSON backup",
            exc_info=True,
        )

    # Cold backup: JSON file on disk.
    try:
        with open(PORTFOLIO_JSON, encoding='utf-8') as fh:
            return json.load(fh)
    except Exception:
        log.warning(
            "read_portfolio: JSON backup read failed; returning empty portfolio",
            exc_info=True,
        )
        return {"holdings": [], "total_assets": 0, "cash": 0, "frozen_cash": 0}
# ── decisions.json → holding_strategies ────────────────────────────
def read_decisions():
    """Return a decisions dict, preferring the database over the JSON backup.

    Reads rows of ``holding_strategies`` whose ``status`` is ``'active'`` or
    ``'updated'``, ordered by ``code``, through ``_get_db()``. For each row
    the ``trigger_json`` and ``changelog_json`` columns are decoded into
    ``trigger`` and ``changelog``; the raw columns stay in the dict. If any
    step of the database path raises, the error is logged and the
    cold-backup file at ``DECISIONS_JSON`` is loaded instead. If that also
    fails, an empty result is returned. Ordinary errors never propagate to
    the caller; ``KeyboardInterrupt`` and ``SystemExit`` do.

    Returns:
        dict: On the database path, ``decisions`` (list of row dicts),
        ``total`` (their count) and ``regenerated_at`` (current local time
        formatted ``%Y-%m-%d %H:%M``). On the JSON path, whatever the backup
        file contains. On total failure, ``{"decisions": [], "total": 0}``.
    """
    log = logging.getLogger(__name__)

    # (source column, output key, factory for the empty default)
    json_columns = (
        ('trigger_json', 'trigger', dict),
        ('changelog_json', 'changelog', list),
    )

    try:
        db = _get_db()
        try:
            decisions = [
                dict(row)
                for row in db.execute(
                    "SELECT code, name, version, price, cost, shares, "
                    "stop_loss, take_profit, entry_low, entry_high, "
                    "currency, strategy_type, action, timing_signal, "
                    "rr_ratio, tech_snapshot, stock_category, sector_context, "
                    "status, trigger_json, changelog_json, source, reason, "
                    "created_at, updated_at "
                    "FROM holding_strategies WHERE status IN ('active','updated') "
                    "ORDER BY code"
                ).fetchall()
            ]
        finally:
            # Release the connection even when the query raises.
            db.close()

        for d in decisions:
            for column, key, make_empty in json_columns:
                raw = d.get(column)
                if not raw:
                    d[key] = make_empty()
                    continue
                try:
                    d[key] = json.loads(raw)
                except (TypeError, ValueError):
                    # Undecodable payload: use the empty default instead of
                    # discarding the whole result set.
                    d[key] = make_empty()

        return {
            "decisions": decisions,
            "total": len(decisions),
            "regenerated_at": datetime.now().strftime('%Y-%m-%d %H:%M'),
        }
    except Exception:
        # Best-effort by design: record why the DB path failed, then fall back.
        log.warning(
            "read_decisions: database read failed; falling back to JSON backup",
            exc_info=True,
        )

    # Cold backup: JSON file on disk.
    try:
        with open(DECISIONS_JSON, encoding='utf-8') as fh:
            return json.load(fh)
    except Exception:
        log.warning(
            "read_decisions: JSON backup read failed; returning empty result",
            exc_info=True,
        )
        return {"decisions": [], "total": 0}
# ── watchlist.json → watchlist_stocks ──────────────────────────────
def read_watchlist():
    """Return a watchlist dict, preferring the database over the JSON backup.

    Reads the active rows of ``watchlist_stocks`` (``is_active=1``) through
    ``_get_db()``. For each row, ``source_detail`` is replaced by its decoded
    JSON value when it decodes and left as stored otherwise, and
    ``analysis_json`` is decoded into ``analysis`` (``{}`` when empty or
    undecodable). If any step of the database path raises, the error is
    logged and the cold-backup file at ``WATCHLIST_JSON`` is loaded instead.
    If that also fails, an empty result is returned. Ordinary errors never
    propagate to the caller; ``KeyboardInterrupt`` and ``SystemExit`` do.

    Returns:
        dict: On the database path, ``stocks`` (list of row dicts) and
        ``updated_at`` (current local time formatted ``%Y-%m-%d %H:%M``).
        On the JSON path, whatever the backup file contains. On total
        failure, ``{"stocks": []}``.
    """
    log = logging.getLogger(__name__)

    try:
        db = _get_db()
        try:
            stocks = [
                dict(row)
                for row in db.execute(
                    "SELECT code, name, price, entry_low, entry_high, "
                    "stop_loss, currency, source, source_detail, notes, "
                    "added_by, added_at, analysis_json "
                    "FROM watchlist_stocks WHERE is_active=1"
                ).fetchall()
            ]
        finally:
            # Release the connection even when the query raises.
            db.close()

        for s in stocks:
            if s.get('source_detail'):
                try:
                    s['source_detail'] = json.loads(s['source_detail'])
                except (TypeError, ValueError):
                    # Not valid JSON: keep the stored value untouched.
                    pass

            raw_analysis = s.get('analysis_json')
            if raw_analysis:
                try:
                    s['analysis'] = json.loads(raw_analysis)
                except (TypeError, ValueError):
                    s['analysis'] = {}
            else:
                s['analysis'] = {}

        return {
            "stocks": stocks,
            "updated_at": datetime.now().strftime('%Y-%m-%d %H:%M'),
        }
    except Exception:
        # Best-effort by design: record why the DB path failed, then fall back.
        log.warning(
            "read_watchlist: database read failed; falling back to JSON backup",
            exc_info=True,
        )

    # Cold backup: JSON file on disk.
    try:
        with open(WATCHLIST_JSON, encoding='utf-8') as fh:
            return json.load(fh)
    except Exception:
        log.warning(
            "read_watchlist: JSON backup read failed; returning empty result",
            exc_info=True,
        )
        return {"stocks": []}
# ── 便捷函数 ────────────────────────────────────────────────────────
def read_portfolio_json():
    """Alias of ``read_portfolio()``, kept so legacy code that imports this name keeps working."""
    portfolio = read_portfolio()
    return portfolio
def read_decisions_json():
    """Alias of ``read_decisions()``; returns exactly what that function returns."""
    decisions = read_decisions()
    return decisions
def read_watchlist_json():
    """Alias of ``read_watchlist()``; returns exactly what that function returns."""
    watchlist = read_watchlist()
    return watchlist
# ── 自检 ────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Self-check: load each dataset once, in order, and print a one-line
    # summary of what came back before moving on to the next one.
    pf = read_portfolio()
    portfolio_line = f"portfolio: {len(pf.get('holdings',[]))} holdings, total_assets={pf.get('total_assets',0)}"
    print(portfolio_line)

    dec = read_decisions()
    decisions_line = f"decisions: {len(dec.get('decisions',[]))} entries"
    print(decisions_line)

    wl = read_watchlist()
    watchlist_line = f"watchlist: {len(wl.get('stocks',[]))} stocks"
    print(watchlist_line)