#!/usr/bin/env python3 """market_watch.py — 行業熱點數據採集,寫入 dashboard data/market.json 數據源優先級: 後端A:東方財富 push2 API(首選,有板塊代碼+實時指數) 後端B:同花順 THS / akshare(降級,有漲跌家數+資金流向) 注意:當前服務器無法連通東方財富API(已被封禁/域名不可達), 實際運行時自動降級到同花順 THS 後端。THS 提供90+行業板塊的 實時漲跌、上漲/下跌家數、淨流入資金等數據,足以滿足需求。 輸出:data/market.json → MoFin Dashboard 市場數據展示 """ import json from datetime import datetime from pathlib import Path from mofin_db import get_conn, init_all_tables, write_market_snapshot DATA_DIR = Path(__file__).parent / "data" # ── 後端A:東方財富 push2 API(首選,有板塊代碼+實時指數) ── def _fetch_em(url): """通用 EM API 請求""" import urllib.request req = urllib.request.Request( url, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"} ) resp = urllib.request.urlopen(req, timeout=10) return json.loads(resp.read().decode("utf-8")) def fetch_sector_em(): """東方財富行業板塊""" try: data = _fetch_em( "https://push2.eastmoney.com/api/qt/clist/get?" "pn=1&pz=60&po=1&np=1&fields=f2,f3,f4,f12,f14&fs=m:90+t:2" ) return [{ "name": i["f14"], "code": i["f12"], "price": i.get("f2", 0), "change": i.get("f3", 0), } for i in data.get("data", {}).get("diff", [])] except Exception: return None def fetch_concept_em(): """東方財富概念板塊""" try: data = _fetch_em( "https://push2.eastmoney.com/api/qt/clist/get?" "pn=1&pz=30&po=1&np=1&fields=f2,f3,f4,f12,f14&fs=m:90+t:3" ) return [{ "name": i["f14"], "code": i["f12"], "change": i.get("f3", 0), } for i in data.get("data", {}).get("diff", [])] except Exception: return None # ── 後端B:同花順 THS / akshare(降級) ── def fetch_sector_ths(): """THS 行業板塊(含漲跌家數、資金流向、領漲股)""" try: import akshare as ak df = ak.stock_board_industry_summary_ths() return [{ "name": r["板块"], "code": "", "price": 0, "change": float(r.get("涨跌幅", 0)), "volume": float(r.get("总成交量", 0)), "turnover": float(r.get("总成交额", 0)), "net_inflow": float(r.get("净流入", 0)), "up_count": int(r.get("上涨家数", 0)), "down_count": int(r.get("下跌家数", 0)), "avg_price": float(r.get("均价", 0)), "lead_stock": r.get("领涨股", ""), "lead_stock_change": float(r.get("领涨股-涨跌幅", 0)), } for _, r in df.iterrows()] except Exception as e: print(f"THS行業失敗: {e}", flush=True) return [] def fetch_concept_ths(): """THS 概念板塊(僅名稱,無實時漲跌)""" try: import akshare as ak df = ak.stock_board_concept_name_ths() return [{ "name": r["name"], "code": str(r.get("code", "")), "change": 0, } for _, r in df.iterrows()] except Exception as e: print(f"THS概念失敗: {e}", flush=True) return [] # ── 輔助函數 ── def get_market_mood(sectors): if not sectors: return "unknown" ratio = sum(1 for s in sectors if s.get("change", 0) > 0) / len(sectors) return "bullish" if ratio > 0.7 else "neutral" if ratio > 0.4 else "bearish" def get_market_verdict(up_ratio, mood, sectors): """Return (verdict, reason) based on sector data.""" if not sectors: return "unknown", "数据不足" if up_ratio < 25: return "弱势", f"仅{up_ratio}%板块上涨,{mood}" elif up_ratio < 40: return "偏弱", f"{up_ratio}%板块上涨,结构分化" elif up_ratio < 60: return "均衡", f"{up_ratio}%板块上涨,涨跌均衡" else: return "强势", f"{up_ratio}%板块上涨,整体走强" def get_hot_sectors(sectors, top_n=3): """Return sectors with highest positive change as hot sectors.""" hot = [s for s in sectors if s.get("change", 0) > 1.0] hot.sort(key=lambda s: s.get("change", 0), reverse=True) return [{ "name": s["name"], "change": s.get("change", 0), "reason": f"板块涨{s.get('change',0):.1f}%" } for s in hot[:top_n]] def get_danger_sectors(sectors, top_n=3): """Return sectors with lowest (negative) change as danger sectors.""" danger = [s for s in sectors if s.get("change", 0) < -1.0] danger.sort(key=lambda s: s.get("change", 0)) return [{ "name": s["name"], "change": s.get("change", 0), "reason": f"板块跌{s.get('change',0):.1f}%" } for s in danger[:top_n]] # ── 主流程 ── def main(): # 行業板塊:EM → THS → 兜底 sectors = fetch_sector_em() source = "eastmoney" if sectors is None: sectors = fetch_sector_ths() source = "ths" # 概念板塊:EM → THS → 空 concepts = fetch_concept_em() concept_source = "eastmoney" if concepts is None: concepts = fetch_concept_ths() concept_source = "ths" if not concepts: concepts = [] concept_source = "unavailable" # 排序 sorted_sectors = sorted(sectors, key=lambda s: s.get("change", 0), reverse=True) top_gainers = [s for s in sorted_sectors if s.get("change", 0) > 0][:5] top_losers = [s for s in reversed(sorted_sectors) if s.get("change", 0) < 0][:3] # 计算大盘数据 up_ratio = round( sum(1 for s in sectors if s.get("change", 0) > 0) / max(len(sectors), 1) * 100, 1 ) mood = get_market_mood(sectors) verdict, verdict_reason = get_market_verdict(up_ratio, mood, sectors) market_data = { "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"), "source": source, "concept_source": concept_source, "total_sectors": len(sectors), "up_ratio": up_ratio, "mood": mood, "market_verdict": verdict, "verdict_reason": verdict_reason, "hot_sectors": get_hot_sectors(sectors), "danger_sectors": get_danger_sectors(sectors), "top_gainers": top_gainers, "top_losers": top_losers, "sectors": sectors, "concepts": concepts, } DATA_DIR.mkdir(parents=True, exist_ok=True) with open(DATA_DIR / "market.json", "w", encoding="utf-8") as f: json.dump(market_data, f, ensure_ascii=False, indent=2) # ── SQLite 双写 ── conn = get_conn() init_all_tables(conn) ok, msg, sid = write_market_snapshot(conn, market_data) if ok: print(f"[DB] {msg}", flush=True) else: print(f"[DB] 写入失败(JSON 不受影响): {msg}", flush=True) conn.close() # 靜默:只寫文件,不輸出到stdout,避免cron推送 if __name__ == "__main__": main()