#!/usr/bin/env python3
"""chip_factors.py - chip-distribution factor module.

Implements the chip (cost-basis) distribution factors described in the
CITIC Construction Investment research note on building a chip-distribution
factor system.  Daily bars are sufficient for the core factors; one-minute
bars are only used to enrich the intraday penetration rate.

Usage:
    from chip_factors import ChipFactors
    cf = ChipFactors()

    # Factors for a single stock
    result = cf.calc_all("600519")
    print(result["bias"], result["ptr"], result["ptr_today"])

    # Batch calculation; items may be plain codes or (code, name) pairs
    results = cf.batch_calc(["600519", "00700", "000700"])
"""

import json
import os
import sqlite3
import time
import urllib.request
from datetime import datetime, timedelta
from pathlib import Path

DB_PATH = Path("/home/hmo/MoFin/data/mofin.db")
MOFIN_ROOT = Path("/home/hmo/MoFin")
CACHE_DIR = MOFIN_ROOT / "data" / "chip_cache"

# Code prefixes used to pick the exchange prefix for the Tencent APIs.
_SH_PREFIXES = ("60", "68", "51", "56", "50")
_SZ_PREFIXES = ("00", "30", "15")
_HEADERS = {"User-Agent": "Mozilla/5.0"}

# -- minute-data rate limiting --
_last_minute_call = 0


def _market_prefix(code):
    """Return the exchange prefix ("sh", "sz" or "hk") for a stock code.

    Five-character codes are treated as Hong Kong tickers before the
    Shanghai/Shenzhen prefix tables are consulted, so that e.g. "00700" is
    not mistaken for a Shenzhen "00xxxx" code.  Anything unmatched falls
    back to "hk".
    """
    code = str(code)
    if len(code) == 5:
        return "hk"
    if code.startswith(_SH_PREFIXES):
        return "sh"
    if code.startswith(_SZ_PREFIXES):
        return "sz"
    return "hk"


def _clear_proxy_env():
    """Remove every environment variable whose name contains "proxy".

    This mutates the process environment so that subsequent urllib calls
    go out directly.
    """
    for key in list(os.environ.keys()):
        if "proxy" in key.lower():
            os.environ.pop(key)


def _fetch_quote(code):
    """Fetch the latest price from the Tencent quote API.

    Returns the price as a float, or 0 when the request or parsing fails.
    """
    _clear_proxy_env()
    url = f"http://qt.gtimg.cn/q={_market_prefix(code)}{code}"
    try:
        req = urllib.request.Request(url, headers=_HEADERS)
        with urllib.request.urlopen(req, timeout=5) as resp:
            text = resp.read().decode('gbk')
        fields = text.split('=')[1].strip().strip('"').strip(';').split('~')
        return float(fields[3]) if len(fields) > 3 else 0
    except Exception:
        # Best effort: callers treat 0 as "no quote available".
        return 0


def _fetch_minute_kline(code, count=60):
    """Fetch one-minute bars from Eastmoney, throttled to one call per second.

    Args:
        code: stock code.
        count: number of bars requested (capped at 240).

    Returns:
        A list of bars, each a list of the comma-separated string fields
        returned by the API, or None when the request or parsing fails.
    """
    global _last_minute_call
    wait = 1.0 - (time.time() - _last_minute_call)
    if wait > 0:
        time.sleep(wait)
    secid = f"1.{code}" if code.startswith(('6', '5')) else f"0.{code}"
    url = (f"https://push2.eastmoney.com/api/qt/stock/kline/get"
           f"?secid={secid}&fields1=f1,f2,f3&fields2=f51,f52,f53,f54,f55,f56,f57"
           f"&klt=1&fqt=1&end=20500101&lmt={min(count, 240)}")
    try:
        req = urllib.request.Request(url, headers=_HEADERS)
        with urllib.request.urlopen(req, timeout=8) as resp:
            payload = json.loads(resp.read())
        return [line.split(",") for line in payload["data"]["klines"]]
    except Exception:
        return None
    finally:
        # Record the attempt even when it failed, so that a failing endpoint
        # is still throttled instead of being hit back-to-back.
        _last_minute_call = time.time()


class ChipFactors:
    """Chip-distribution factor calculator with a per-stock JSON state cache."""

    # Exposed on the class so the shared prefix rule is reachable via the
    # public name as well as from the module-level fetch helpers.
    _market_prefix = staticmethod(_market_prefix)

    def __init__(self):
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        self._cache = {}  # code -> last saved factor state
        self._load_cache()

    def _load_cache(self):
        """Load every cached per-stock state file from CACHE_DIR."""
        for path in CACHE_DIR.glob("*.json"):
            try:
                with open(path, encoding="utf-8") as fp:
                    self._cache[path.stem] = json.load(fp)
            except (OSError, ValueError):
                # Unreadable or corrupt cache file: ignore it, the state is
                # rebuilt on the next calculation.
                continue

    def _save_cache(self, code):
        """Persist the cached state of *code* (no-op if nothing is cached)."""
        if code in self._cache:
            path = CACHE_DIR / f"{code}.json"
            with open(path, "w", encoding="utf-8") as fp:
                json.dump(self._cache[code], fp, ensure_ascii=False)

    # -- chip distribution estimate (from daily bars) --
    def _fetch_daily_bars(self, code, count, timeout=8):
        """Fetch up to *count* forward-adjusted daily bars from Tencent.

        Raises whatever the network/JSON layer raises; callers decide how
        to degrade.
        """
        prefix = self._market_prefix(code)
        url = (f"http://ifzq.gtimg.cn/appstock/app/fqkline/get"
               f"?param={prefix}{code},day,,,{count},qfq")
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        req = urllib.request.Request(url, headers=_HEADERS)
        with opener.open(req, timeout=timeout) as resp:
            data = json.loads(resp.read().decode('utf-8'))
        day_key = 'qfqday' if prefix != 'hk' else 'day'
        return data.get('data', {}).get(f'{prefix}{code}', {}).get(day_key, [])

    @staticmethod
    def _accumulate_chips(bars, decay=0.97):
        """Turn daily bars into a {price_level: volume} chip distribution.

        Each bar's volume is spread evenly over price levels between its low
        and high (index 3 = high, 4 = low, 5 = volume).  A bar's contribution
        is weighted by ``decay ** age`` where age is the number of valid bars
        that follow it, so older chips fade as they are assumed to have
        changed hands.

        Assumes *bars* is ordered oldest to newest - TODO confirm against the
        upstream API.  Malformed bars and bars with non-positive volume are
        skipped.
        """
        valid = []
        for bar in bars:
            try:
                if len(bar) < 6:
                    continue
                high, low, volume = float(bar[3]), float(bar[4]), float(bar[5])
            except (TypeError, ValueError):
                continue
            if high < low or volume <= 0:
                continue
            valid.append((high, low, volume))

        chip_dist = {}
        newest = len(valid) - 1
        for index, (high, low, volume) in enumerate(valid):
            step = max(round((high - low) / 5, 2), 0.01)
            levels = []
            level = round(low, 2)
            while level <= high:
                levels.append(level)
                level = round(level + step, 2)
            if not levels:
                # Rounding pushed the first level above the high (sub-cent
                # range): keep the whole volume at that single level.
                levels = [round(low, 2)]
            # Divide by the levels actually visited so volume is conserved.
            share = volume * decay ** (newest - index) / len(levels)
            for level in levels:
                chip_dist[level] = chip_dist.get(level, 0) + share
        return chip_dist

    def _build_chip_distribution(self, code):
        """Estimate the chip distribution of *code* from daily bars.

        Returns a {price_level: accumulated_volume} dict, or an empty dict
        when the bars cannot be fetched.
        """
        _clear_proxy_env()
        try:
            bars = self._fetch_daily_bars(code, 640, timeout=8)
        except Exception:
            return {}
        return self._accumulate_chips(bars, decay=0.97)

    def _estimate_turnover(self, code, default=0.02):
        """Estimate the daily turnover ratio, falling back to *default*.

        NOTE(review): the float share count is approximated as 50x the
        average volume, so with any realistic volume this evaluates to 0.02;
        a real float-share source would be needed for a meaningful value.
        """
        try:
            bars = self._fetch_daily_bars(code, 10, timeout=5)
            if len(bars) > 5:
                recent = bars[-10:]
                avg_vol = (sum(float(b[5]) for b in recent if len(b) > 5)
                           / min(len(bars), 10))
                max_vol = avg_vol * 50  # rough stand-in for float shares
                return min(avg_vol / max(max_vol, 1), 0.3)
        except Exception:
            # Best effort: any network or parse problem keeps the default.
            return default
        return default

    # -- factor calculation --
    def calc_all(self, code, name="", price=None):
        """Compute all chip factors for one stock.

        Args:
            code: stock code.
            name: display name, passed through to the result.
            price: current price; fetched from the quote API when falsy.

        Returns:
            A dict with code/name/price and, when the calculation succeeds,
            winner_pct, loser_pct, ptr, ptr_today, bias and turnover.  When
            the price or chip distribution is unavailable only
            code/name/price are present.
        """
        result = {"code": code, "name": name, "price": price}

        if not price:
            price = _fetch_quote(code)
            if not price:
                return result
            result["price"] = price

        chip_dist = self._build_chip_distribution(code)
        if not chip_dist or price <= 0:
            return result

        total_vol = sum(chip_dist.values())
        if total_vol <= 0:
            return result

        # Chips whose cost is at or below the current price are in profit.
        winner_vol = sum(v for k, v in chip_dist.items() if k <= price)
        winner_pct = winner_vol / total_vol
        loser_pct = (total_vol - winner_vol) / total_vol

        # Previous-day baseline.  When the cache was already written today,
        # reuse the baseline stored by that run; otherwise a second run on
        # the same day would compare against itself (PTR = 0) and compound
        # the bias recursion again.
        today = datetime.now().date().isoformat()
        prev = self._cache.get(code, {})
        same_day = str(prev.get("updated_at", ""))[:10] == today
        if same_day and "prev_winner_pct" in prev:
            prev_winner = prev["prev_winner_pct"]
            prev_bias = prev.get("prev_bias", 0)
        else:
            prev_winner = prev.get("winner_pct", winner_pct)
            prev_bias = prev.get("bias", 0)

        turnover = self._estimate_turnover(code)

        # 1. Penetration rate: change in winner share per unit of turnover.
        ptr = (winner_pct - prev_winner) / max(turnover, 0.001)

        # 2. Simplified intraday penetration rate: volume of the most recent
        #    30 one-minute bars relative to total chips and turnover.
        ptr_today = 0
        minute_data = _fetch_minute_kline(code, count=30)
        if minute_data:
            try:
                today_vol = sum(float(m[5]) for m in minute_data if len(m) > 5)
            except (TypeError, ValueError):
                today_vol = 0
            ptr_today = today_vol / max(total_vol, 1) / max(turnover, 0.001)

        # 3. Chip bias, loser-chip version (reported as the strongest variant
        #    in the source research): exponential update weighted by turnover.
        bias = loser_pct * turnover + prev_bias * (1 - turnover)

        self._cache[code] = {
            "winner_pct": winner_pct,
            "loser_pct": loser_pct,
            "bias": bias,
            "prev_winner_pct": prev_winner,
            "prev_bias": prev_bias,
            "updated_at": datetime.now().isoformat(),
        }
        self._save_cache(code)

        return {
            "code": code,
            "name": name,
            "price": price,
            "winner_pct": round(winner_pct, 4),
            "loser_pct": round(loser_pct, 4),
            "ptr": round(ptr, 4),
            "ptr_today": round(ptr_today, 4),
            "bias": round(bias, 4),
            "turnover": round(turnover, 4),
        }

    @staticmethod
    def _split_stock(item):
        """Normalise a batch item to (code, name); a bare code gets name ""."""
        if isinstance(item, str):
            return item, ""
        code, name = item
        return code, name

    def batch_calc(self, stocks):
        """Compute factors for several stocks, pausing 1.5s between requests.

        Args:
            stocks: iterable of stock codes or (code, name) pairs.

        Returns:
            A list of calc_all() results in input order.
        """
        results = []
        for i, item in enumerate(stocks):
            code, name = self._split_stock(item)
            if i > 0:
                time.sleep(1.5)  # rate limit
            results.append(self.calc_all(code, name))
        return results


def _main():
    """Compute and print factors for every open position in decisions.json."""
    cf = ChipFactors()

    with open(MOFIN_ROOT / "data" / "decisions.json", encoding="utf-8") as fp:
        dec = json.load(fp)
    stocks = [(s["code"], s.get("name") or "") for s in dec.get("decisions", [])
              if s.get("status") != "closed"]

    results = cf.batch_calc(stocks)

    # Highest bias (largest loser-chip share) first.
    results.sort(key=lambda r: r.get("bias", 0), reverse=True)

    print(f"{'股票':16} {'亏损筹码%':>10} {'PTR':>8} {'乖离率':>8} {'换手率':>8}")
    print("-" * 60)
    for r in results:
        # Partial results (no quote or no chip distribution) carry no factors.
        if "loser_pct" not in r:
            continue
        print(f"{r['name']:8}({r['code']:6}) {r['loser_pct']*100:>8.1f}% {r['ptr']:>8.4f} {r['bias']:>8.4f} {r['turnover']*100:>6.1f}%")

    print(f"\n共计算{len(results)}只股票")
    print(f"筹码缓存目录: {CACHE_DIR}")


# -- entry point --
if __name__ == "__main__":
    _main()