Files
MoFin/scripts/capital_flow_collector.py
T

187 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""capital_flow_collector.py — 个股资金流数据采集器
每30分钟拉一次持仓+自选的超大单/大单/中单/小单资金流向。
输出到 capital_flow_cache.json 供 price_monitor 和报告使用。
API: push2his.eastmoney.com 个股资金流日线
"""
import json, os, sys, time, urllib.request
from datetime import datetime
from urllib.request import urlopen, Request
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Semaphore
from mo_data import read_portfolio, read_decisions, read_watchlist
from mofin_db import get_conn, write_capital_flow_cache
DATA_DIR = "/home/hmo/web-dashboard/data"
DECISIONS_PATH = f"{DATA_DIR}/decisions.json"
CACHE_PATH = f"{DATA_DIR}/capital_flow_cache.json"
UA = "Mozilla/5.0"
# 限速器:最多5个并发,每请求后强制间隔0.3s
RATE_LIMIT = Semaphore(5)
MIN_INTERVAL = 0.3
_last_req = 0
def _rate_limited_request(url):
"""带速率限制的HTTP GET,用Semaphore控制并发数"""
global _last_req
with RATE_LIMIT:
elapsed = time.time() - _last_req
if elapsed < MIN_INTERVAL:
time.sleep(MIN_INTERVAL - elapsed)
proxy_handler = urllib.request.ProxyHandler({})
opener = urllib.request.build_opener(proxy_handler)
req = Request(url, headers={"User-Agent": UA, "Referer": "https://data.eastmoney.com/"})
try:
resp = opener.open(req, timeout=8)
_last_req = time.time()
return json.loads(resp.read().decode("utf-8"))
except Exception:
return None
# eastmoney secid: 1=上海 0=深圳
def secid(code):
code = str(code).strip()
if code.startswith(("6", "9")):
return f"1.{code}"
return f"0.{code}"
def fetch_flow(code, days=5):
"""拉取个股近N日资金流(带限速+代理绕过)"""
sid = secid(code)
url = f"http://push2his.eastmoney.com/api/qt/stock/fflow/daykline/get?secid={sid}&fields1=f1,f2,f3,f7&fields2=f51,f52,f53,f54,f55,f56,f57&lmt={days}"
data = _rate_limited_request(url)
if not data:
return None
klines = data.get("data", {}).get("klines", [])
if not klines:
return None
result = []
for k in klines:
p = k.split(",")
if len(p) >= 7:
result.append({
"date": p[0],
"main_net": float(p[1]), # 主力净流入(元)
"super_large": float(p[2]), # 超大单净流入(元)
"large": float(p[3]), # 大单净流入(元)
"medium": float(p[4]), # 中单净流入(元)
"small": float(p[5]), # 小单净流入(元)
})
return result
def fetch_flow_intraday(code):
"""拉取当日分时资金流(用于盘中判断)"""
sid = secid(code)
url = f"http://push2.eastmoney.com/api/qt/stock/fflow/kline/get?secid={sid}&fields1=f1,f2,f3,f7&fields2=f51,f52,f53,f54,f55,f56,f57&klt=1&lmt=120"
try:
resp = urlopen(url, timeout=5)
data = json.loads(resp.read().decode("utf-8"))
klines = data.get("data", {}).get("klines", [])
if not klines:
return None
latest = klines[-1].split(",")
return {
"main_net": float(latest[1]),
"super_large": float(latest[2]),
"large": float(latest[3]),
}
except:
return None
def analyze_flow(flow_data):
"""分析资金流模式"""
if not flow_data or len(flow_data) < 2:
return {}
result = {"alerts": [], "pattern": ""}
# 最近两日对比
d1 = flow_data[-1] # 最新日
d2 = flow_data[-2] # 前一日
# 超大单信号
sl1 = d1["super_large"]
sl2 = d2["super_large"]
# 连续形态判断
main_trend = sum(d["main_net"] for d in flow_data[-3:])
sl_trend = sum(d["super_large"] for d in flow_data[-3:])
# 1. 主力连续流入
if main_trend > 50000000 and sl1 > 0 and sl2 > 0:
result["pattern"] = "主力持续流入"
result["alerts"].append("主力连续3日净流入")
# 2. 超大单突然转向(连续流入→流出 或 流出→流入)
if sl1 * sl2 < 0: # 方向反转
if sl1 > 0 and sl2 < 0:
result["pattern"] = "超大单由出转入"
result["alerts"].append("超大单转为净买入(暗示消息即将落地)")
elif sl1 < 0 and sl2 > 0:
result["pattern"] = "超大单由入转出"
result["alerts"].append("超大单转为净卖出(利好出货嫌疑)")
# 3. 价格与资金流背离(缺当前价格作比较,在主脚本中完成)
# 4. 单日暴量
max_sl = max(abs(d["super_large"]) for d in flow_data)
if max_sl == abs(sl1) and abs(sl1) > 100000000:
result["pattern"] = "单日资金暴量"
result["alerts"].append(f"今日超大单异常: {sl1/100000000:.2f}亿")
return result
def main():
codes = set()
# 读取持仓+自选
try:
dec = mo_data.read_decisions()
for d in dec.get("decisions", []):
c = d.get("code", "")
if c:
codes.add(c)
except:
pass
all_flows = {}
# 并行抓取:ThreadPoolExecutor + 内置限速器(Semaphore 5 + 0.3s间隔)
code_list = sorted(codes)
if not code_list:
print("[capital_flow] 无代码需要采集")
return
def fetch_one(code):
flow = fetch_flow(code, days=5)
if flow:
analysis = analyze_flow(flow)
return (code, {
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M"),
"flow": flow,
"analysis": analysis,
})
return (code, None)
with ThreadPoolExecutor(max_workers=5) as pool:
futures = {pool.submit(fetch_one, c): c for c in code_list}
for f in as_completed(futures):
code, result = f.result()
if result:
all_flows[code] = result
# 写缓存
cache = {
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M"),
"stocks": all_flows,
}
# 写 DB(替代 capital_flow_cache.json
conn = get_conn()
write_capital_flow_cache(conn, cache)
conn.close()
print(f"[capital_flow] {len(all_flows)}/{len(code_list)}只更新完成")
if __name__ == "__main__":
main()