Files
MoFin/market_watch.py
T
hmo 0924cf3124 refactor: 数据层重构 — 统一 SQLite 访问层 + 多脚本双写
新建 mofin_db.py 共享数据库模块:
- get_conn() 统一连接管理 (WAL + Row factory + 外键)
- init_all_tables() 幂等建表 (12张表: market/sector/stock/kline/fundamentals/sectors/holdings/strategies/watchlist/candidates/score_history/events/evaluations)
- write_market_snapshot() 市场快照双写
- write_klines() K线数据双写 (stocks + daily/weekly/monthly + fundamentals)
- write_price_event() 价格事件双写
- migrate_stock_sectors() 一次性迁移 stock_sector_map.json
- query_*() 通用查询函数 (sector_trend/top_inflow/consecutive_inflow/market_mood/db_stats)

重构现有脚本:
- market_watch.py: 删除内联 DB 代码,改用 mofin_db
- multi_timeframe.py: _save_local_history() 加 SQLite 双写
- price_monitor.py: record_event() 加 SQLite 双写
- mofin_query.py: 改用 mofin_db 查询函数

新增:
- migrate_sectors.py: 一次性迁移脚本

清理:
- get_realtime_prices.py: 死代码 (只读 portfolio.json,不调API)
2026-06-20 16:26:17 +08:00

179 lines
5.6 KiB
Python

#!/usr/bin/env python3
"""market_watch.py — 行業熱點數據採集,寫入 dashboard data/market.json
數據源優先級:
後端A:東方財富 push2 API(首選,有板塊代碼+實時指數)
後端B:同花順 THS / akshare(降級,有漲跌家數+資金流向)
注意:當前服務器無法連通東方財富API(已被封禁/域名不可達),
實際運行時自動降級到同花順 THS 後端。THS 提供90+行業板塊的
實時漲跌、上漲/下跌家數、淨流入資金等數據,足以滿足需求。
輸出:data/market.json → MoFin Dashboard 市場數據展示
"""
import json
from datetime import datetime
from pathlib import Path
from mofin_db import get_conn, init_all_tables, write_market_snapshot
DATA_DIR = Path(__file__).parent / "data"
# ── 後端A:東方財富 push2 API(首選,有板塊代碼+實時指數) ──
def _fetch_em(url):
"""通用 EM API 請求"""
import urllib.request
req = urllib.request.Request(
url,
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
)
resp = urllib.request.urlopen(req, timeout=10)
return json.loads(resp.read().decode("utf-8"))
def fetch_sector_em():
"""東方財富行業板塊"""
try:
data = _fetch_em(
"https://push2.eastmoney.com/api/qt/clist/get?"
"pn=1&pz=60&po=1&np=1&fields=f2,f3,f4,f12,f14&fs=m:90+t:2"
)
return [{
"name": i["f14"],
"code": i["f12"],
"price": i.get("f2", 0),
"change": i.get("f3", 0),
} for i in data.get("data", {}).get("diff", [])]
except Exception:
return None
def fetch_concept_em():
"""東方財富概念板塊"""
try:
data = _fetch_em(
"https://push2.eastmoney.com/api/qt/clist/get?"
"pn=1&pz=30&po=1&np=1&fields=f2,f3,f4,f12,f14&fs=m:90+t:3"
)
return [{
"name": i["f14"],
"code": i["f12"],
"change": i.get("f3", 0),
} for i in data.get("data", {}).get("diff", [])]
except Exception:
return None
# ── 後端B:同花順 THS / akshare(降級) ──
def fetch_sector_ths():
"""THS 行業板塊(含漲跌家數、資金流向、領漲股)"""
try:
import akshare as ak
df = ak.stock_board_industry_summary_ths()
return [{
"name": r["板块"],
"code": "",
"price": 0,
"change": float(r.get("涨跌幅", 0)),
"volume": float(r.get("总成交量", 0)),
"turnover": float(r.get("总成交额", 0)),
"net_inflow": float(r.get("净流入", 0)),
"up_count": int(r.get("上涨家数", 0)),
"down_count": int(r.get("下跌家数", 0)),
"avg_price": float(r.get("均价", 0)),
"lead_stock": r.get("领涨股", ""),
"lead_stock_change": float(r.get("领涨股-涨跌幅", 0)),
} for _, r in df.iterrows()]
except Exception as e:
print(f"THS行業失敗: {e}", flush=True)
return []
def fetch_concept_ths():
"""THS 概念板塊(僅名稱,無實時漲跌)"""
try:
import akshare as ak
df = ak.stock_board_concept_name_ths()
return [{
"name": r["name"],
"code": str(r.get("code", "")),
"change": 0,
} for _, r in df.iterrows()]
except Exception as e:
print(f"THS概念失敗: {e}", flush=True)
return []
# ── 輔助函數 ──
def get_market_mood(sectors):
if not sectors:
return "unknown"
ratio = sum(1 for s in sectors if s.get("change", 0) > 0) / len(sectors)
return "bullish" if ratio > 0.7 else "neutral" if ratio > 0.4 else "bearish"
# ── 主流程 ──
def main():
# 行業板塊:EM → THS → 兜底
sectors = fetch_sector_em()
source = "eastmoney"
if sectors is None:
sectors = fetch_sector_ths()
source = "ths"
# 概念板塊:EM → THS → 空
concepts = fetch_concept_em()
concept_source = "eastmoney"
if concepts is None:
concepts = fetch_concept_ths()
concept_source = "ths"
if not concepts:
concepts = []
concept_source = "unavailable"
# 排序
sorted_sectors = sorted(sectors, key=lambda s: s.get("change", 0), reverse=True)
top_gainers = [s for s in sorted_sectors if s.get("change", 0) > 0][:5]
top_losers = [s for s in reversed(sorted_sectors) if s.get("change", 0) < 0][:3]
market_data = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
"source": source,
"concept_source": concept_source,
"total_sectors": len(sectors),
"up_ratio": round(
sum(1 for s in sectors if s.get("change", 0) > 0) / max(len(sectors), 1) * 100, 1
),
"mood": get_market_mood(sectors),
"top_gainers": top_gainers,
"top_losers": top_losers,
"sectors": sectors,
"concepts": concepts,
}
DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(DATA_DIR / "market.json", "w", encoding="utf-8") as f:
json.dump(market_data, f, ensure_ascii=False, indent=2)
# ── SQLite 双写 ──
conn = get_conn()
init_all_tables(conn)
ok, msg, sid = write_market_snapshot(conn, market_data)
if ok:
print(f"[DB] {msg}", flush=True)
else:
print(f"[DB] 写入失败(JSON 不受影响): {msg}", flush=True)
conn.close()
# 靜默:只寫文件,不輸出到stdout,避免cron推送
if __name__ == "__main__":
main()