Files
MoFin/data_freshness.py
T

64 lines
1.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""data_freshness.py — 数据新鲜度校验
所有报告管道在生成输出前必须调用 check_fresh()。
返回 (pass: bool, details: str),如果数据过期则阻止生成操作建议。
用法:
from data_freshness import check_fresh
ok, msg = check_fresh()
if not ok:
print(f"⚠️ 数据过期: {msg}")
sys.exit(0) # 不生成报告
校验规则:
- 盘中 (9:30~15:00)price/live_prices.json 必须在 5 分钟内刷新
- 盘后 (9:30以前/15:00以后):允许最长 120 分钟
- 周末/节假日:跳过校验
"""
import json, os
from datetime import datetime, timedelta
from mo_data import read_portfolio, read_decisions, read_watchlist
# live_prices.json 已废弃,所有数据走 DB
LIVE_PRICES_PATH = "/home/hmo/web-dashboard/data/live_prices.json"
def is_market_hours():
now = datetime.now()
if now.weekday() >= 5:
return False, "weekend"
t = now.hour * 60 + now.minute
if 9*60+30 <= t <= 15*60:
return True, "trading"
return False, "closed"
def check_fresh():
"""返回 (ok: bool, msg: str)"""
now = datetime.now()
in_market, period = is_market_hours()
max_age_min = 5 if in_market else 120
# 主指标:DB portfolio_summary.updated_at
try:
pf = read_portfolio()
pf_time = pf.get("updated_at", "")
if pf_time:
pf_dt = datetime.fromisoformat(pf_time)
age = (now - pf_dt).total_seconds() / 60
if age > max_age_min:
return False, f"数据已 {age:.0f} 分钟未更新(阈值 {max_age_min} 分钟)"
return True, f"数据新鲜({age:.0f} 分钟前)"
except Exception as e:
return False, f"DB 读取失败: {e}"
return False, "无法获取数据新鲜度"
if __name__ == "__main__":
ok, msg = check_fresh()
print(f"{'' if ok else ''} {msg}")