#!/usr/bin/env python3 """capital_flow_collector.py — 个股资金流数据采集器 每30分钟拉一次持仓+自选的超大单/大单/中单/小单资金流向。 输出到 capital_flow_cache.json 供 price_monitor 和报告使用。 API: push2his.eastmoney.com 个股资金流日线 """ import json, os, sys, time, urllib.request from datetime import datetime from urllib.request import urlopen, Request from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Semaphore DATA_DIR = "/home/hmo/web-dashboard/data" DECISIONS_PATH = f"{DATA_DIR}/decisions.json" CACHE_PATH = f"{DATA_DIR}/capital_flow_cache.json" UA = "Mozilla/5.0" # 限速器:最多5个并发,每请求后强制间隔0.3s RATE_LIMIT = Semaphore(5) MIN_INTERVAL = 0.3 _last_req = 0 def _rate_limited_request(url): """带速率限制的HTTP GET,用Semaphore控制并发数""" global _last_req with RATE_LIMIT: elapsed = time.time() - _last_req if elapsed < MIN_INTERVAL: time.sleep(MIN_INTERVAL - elapsed) proxy_handler = urllib.request.ProxyHandler({}) opener = urllib.request.build_opener(proxy_handler) req = Request(url, headers={"User-Agent": UA, "Referer": "https://data.eastmoney.com/"}) try: resp = opener.open(req, timeout=8) _last_req = time.time() return json.loads(resp.read().decode("utf-8")) except Exception: return None # eastmoney secid: 1=上海 0=深圳 def secid(code): code = str(code).strip() if code.startswith(("6", "9")): return f"1.{code}" return f"0.{code}" def fetch_flow(code, days=5): """拉取个股近N日资金流(带限速+代理绕过)""" sid = secid(code) url = f"http://push2his.eastmoney.com/api/qt/stock/fflow/daykline/get?secid={sid}&fields1=f1,f2,f3,f7&fields2=f51,f52,f53,f54,f55,f56,f57&lmt={days}" data = _rate_limited_request(url) if not data: return None klines = data.get("data", {}).get("klines", []) if not klines: return None result = [] for k in klines: p = k.split(",") if len(p) >= 7: result.append({ "date": p[0], "main_net": float(p[1]), # 主力净流入(元) "super_large": float(p[2]), # 超大单净流入(元) "large": float(p[3]), # 大单净流入(元) "medium": float(p[4]), # 中单净流入(元) "small": float(p[5]), # 小单净流入(元) }) return result def fetch_flow_intraday(code): """拉取当日分时资金流(用于盘中判断)""" sid = secid(code) url = f"http://push2.eastmoney.com/api/qt/stock/fflow/kline/get?secid={sid}&fields1=f1,f2,f3,f7&fields2=f51,f52,f53,f54,f55,f56,f57&klt=1&lmt=120" try: resp = urlopen(url, timeout=5) data = json.loads(resp.read().decode("utf-8")) klines = data.get("data", {}).get("klines", []) if not klines: return None latest = klines[-1].split(",") return { "main_net": float(latest[1]), "super_large": float(latest[2]), "large": float(latest[3]), } except: return None def analyze_flow(flow_data): """分析资金流模式""" if not flow_data or len(flow_data) < 2: return {} result = {"alerts": [], "pattern": ""} # 最近两日对比 d1 = flow_data[-1] # 最新日 d2 = flow_data[-2] # 前一日 # 超大单信号 sl1 = d1["super_large"] sl2 = d2["super_large"] # 连续形态判断 main_trend = sum(d["main_net"] for d in flow_data[-3:]) sl_trend = sum(d["super_large"] for d in flow_data[-3:]) # 1. 主力连续流入 if main_trend > 50000000 and sl1 > 0 and sl2 > 0: result["pattern"] = "主力持续流入" result["alerts"].append("主力连续3日净流入") # 2. 超大单突然转向(连续流入→流出 或 流出→流入) if sl1 * sl2 < 0: # 方向反转 if sl1 > 0 and sl2 < 0: result["pattern"] = "超大单由出转入" result["alerts"].append("超大单转为净买入(暗示消息即将落地)") elif sl1 < 0 and sl2 > 0: result["pattern"] = "超大单由入转出" result["alerts"].append("超大单转为净卖出(利好出货嫌疑)") # 3. 价格与资金流背离(缺当前价格作比较,在主脚本中完成) # 4. 单日暴量 max_sl = max(abs(d["super_large"]) for d in flow_data) if max_sl == abs(sl1) and abs(sl1) > 100000000: result["pattern"] = "单日资金暴量" result["alerts"].append(f"今日超大单异常: {sl1/100000000:.2f}亿") return result def main(): codes = set() # 读取持仓+自选 try: dec = json.load(open(DECISIONS_PATH)) for d in dec.get("decisions", []): c = d.get("code", "") if c: codes.add(c) except: pass all_flows = {} # 并行抓取:ThreadPoolExecutor + 内置限速器(Semaphore 5 + 0.3s间隔) code_list = sorted(codes) if not code_list: print("[capital_flow] 无代码需要采集") return def fetch_one(code): flow = fetch_flow(code, days=5) if flow: analysis = analyze_flow(flow) return (code, { "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M"), "flow": flow, "analysis": analysis, }) return (code, None) with ThreadPoolExecutor(max_workers=5) as pool: futures = {pool.submit(fetch_one, c): c for c in code_list} for f in as_completed(futures): code, result = f.result() if result: all_flows[code] = result # 写缓存 cache = { "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M"), "stocks": all_flows, } json.dump(cache, open(CACHE_PATH, "w"), indent=2, ensure_ascii=False) print(f"[capital_flow] {len(all_flows)}/{len(code_list)}只更新完成") if __name__ == "__main__": main()