#!/usr/bin/env python3
"""collect_evaluation_data.py — raw data collection for the six-dimension evaluation.

Pure data-collection script (no_agent): it performs no evaluation, judgement
or risk/reward (RR) calculation.
Output: data/evaluation_input.json — consumed by the 21:00 LLM cron job.

Collected dimensions:
  D1 Macro         — five indices (SSE Composite / SZSE Component / ChiNext /
                     Hang Seng / Hang Seng Tech)
  D2 Sector        — holdings + watchlist grouped by sector
  D3 Technicals    — open / high / low / previous close / last price / volume
  D4 Fundamentals  — PE / total market cap / 52-week high / 52-week low
  D5 News          — not collected here (the LLM cron uses web_search)
  D6 Capital flow  — turnover rate / amplitude
  (PB, turnover amount and volume ratio are not collected yet.)

Date: 2026-06-18  v1 — initial version
"""
import json
import urllib.request
import os
import sys
import re
from datetime import datetime
from pathlib import Path

DATA_DIR = Path(__file__).parent / "data"
DECISIONS_PATH = DATA_DIR / "decisions.json"
PORTFOLIO_PATH = DATA_DIR / "portfolio.json"
PROFILES_PATH = DATA_DIR / "stock_profiles.json"
OUTPUT_PATH = DATA_DIR / "evaluation_input.json"
UA = "Mozilla/5.0"
TENCENT_QUOTE_URL = "http://qt.gtimg.cn/q="


def load_json(path, default=None):
    """Load JSON from *path*.

    Returns *default* (or ``{}`` when *default* is None) if the file is
    missing or is not valid JSON. Other errors propagate.
    """
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {} if default is None else default


def save_json(path, data):
    """Write *data* to *path* as pretty-printed UTF-8 JSON.

    Parent directories are created as needed. The data is first written to a
    ``.tmp`` sibling and then moved into place with ``os.replace``, so a
    concurrent reader never observes a half-written file.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, path)


def _tencent_symbol(code):
    """Map a bare code to Tencent's market-prefixed symbol (hk/sh/sz)."""
    if len(code) == 5:
        return f"hk{code}"
    if code.startswith(("5", "6", "9")):
        return f"sh{code}"
    return f"sz{code}"


def _parse_tencent_text(text, min_fields):
    """Parse Tencent ``v_<symbol>="f0~f1~...";`` lines into {symbol: fields}.

    Lines that are blank, lack ``=``, or have fewer than *min_fields*
    ``~``-separated fields are skipped.
    """
    quotes = {}
    for line in text.strip().split("\n"):
        line = line.strip()
        if not line or "=" not in line:
            continue
        lhs, rhs = line.split("=", 1)
        # Drop the trailing ';' before the quotes, otherwise the closing '"'
        # would remain glued to the last field.
        fields = rhs.strip().rstrip(";").strip('"').split("~")
        if len(fields) < min_fields:
            continue
        sym = lhs.strip()
        # Remove the literal "v_" prefix (str.lstrip would strip a char set).
        if sym.startswith("v_"):
            sym = sym[2:]
        quotes[sym] = fields
    return quotes


def _fetch_tencent_quotes(query_symbols, timeout, min_fields):
    """GET Tencent quotes for *query_symbols*; return {symbol: fields}.

    Network and decoding errors propagate to the caller.
    """
    url = TENCENT_QUOTE_URL + ",".join(query_symbols)
    req = urllib.request.Request(url, headers={"User-Agent": UA})
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        text = resp.read().decode("gbk")
    return _parse_tencent_text(text, min_fields)


def fetch_tencent_data(symbols):
    """Fetch quotes for a batch of codes: local DB first, Tencent API fallback.

    Args:
        symbols: iterable of bare codes (5-character codes are treated as HK).

    Returns:
        ``{code: quote}``. A quote is either a dict built from the DB row
        (keys name/price/prev_close/change_pct/high/low, with 0/"" as
        placeholders) or the raw list of Tencent ``~`` fields. Codes the DB
        does not return are fetched from Tencent; codes that could not be
        fetched at all are absent.
    """
    symbols = list(symbols)
    if not symbols:
        return {}

    result = {}
    try:
        from mofin_db import get_prices_batch_from_db
        db_rows = get_prices_batch_from_db(symbols) or {}
        for code, (price, chg) in db_rows.items():
            result[code] = {"name": "", "price": price, "prev_close": 0,
                            "change_pct": chg or 0, "high": 0, "low": 0}
    except ImportError:
        pass  # No local DB module available: use Tencent only.
    except Exception as e:
        # Best effort: a DB failure must not abort the collection run.
        print(f"DB 行情读取失败,改用腾讯接口: {e}", file=sys.stderr)

    missing = [c for c in symbols if c not in result]
    if not missing:
        return result

    code_map = {_tencent_symbol(c): c for c in missing}
    try:
        quotes = _fetch_tencent_quotes(list(code_map), timeout=15, min_fields=35)
    except Exception as e:
        print(f"行情拉取失败: {e}", file=sys.stderr)
        return result

    for sym, fields in quotes.items():
        code = code_map.get(sym)
        if code:
            result[code] = fields
    return result


def fetch_indices():
    """Fetch the five tracked indices from Tencent.

    Returns:
        ``{index_name: {price, prev_close, change_pct, high, low, timestamp}}``,
        or ``{}`` if the request fails.
    """
    index_codes = {
        "sh000001": "上证指数",
        "sz399001": "深证成指",
        "sz399006": "创业板指",
        "hkHSI": "恒生指数",
        "hkHSTECH": "恒生科技",
    }
    try:
        # Fields up to [34] are read below, so require at least 35 of them.
        quotes = _fetch_tencent_quotes(list(index_codes), timeout=10, min_fields=35)
    except Exception as e:
        print(f"指数拉取失败: {e}", file=sys.stderr)
        return {}

    result = {}
    for sym, fields in quotes.items():
        name = index_codes.get(sym, sym)
        result[name] = {
            "price": safe_float(fields[3]),
            "prev_close": safe_float(fields[4]),
            "change_pct": safe_float(fields[32]),
            "high": safe_float(fields[33]),
            "low": safe_float(fields[34]),
            "timestamp": fields[30],
        }
    return result


def safe_float(v):
    """Convert *v* to float; return None for None, "" or unparseable input.

    A genuine numeric 0 is returned as 0.0 rather than treated as missing.
    """
    if v is None or v == "":
        return None
    try:
        return float(v)
    except (ValueError, TypeError):
        return None


def _parse_db_quote(code, quote):
    """Normalise a DB-sourced quote dict (see fetch_tencent_data).

    The DB row only carries price and change; fields filled with 0/""
    placeholders are reported as None instead of fake values.
    """
    def nonzero(key):
        value = safe_float(quote.get(key))
        return value if value else None

    return {
        "code": code,
        "name": quote.get("name", ""),
        "price": nonzero("price"),
        "prev_close": nonzero("prev_close"),
        "open": None,
        "change_pct": safe_float(quote.get("change_pct")),
        "high": nonzero("high"),
        "low": nonzero("low"),
        "volume": None,
    }


def parse_stock_data(code, fields, is_hk=False):
    """Parse one quote from fetch_tencent_data into a flat record.

    Args:
        code: bare stock code.
        fields: raw Tencent ``~`` field list (at least 35 entries), or a
            DB-sourced quote dict.
        is_hk: True for Hong Kong listings.

    Returns:
        dict with code/name/price/prev_close/open/change_pct/high/low/volume,
        plus market-specific extras when enough fields are present.
    """
    if isinstance(fields, dict):
        return _parse_db_quote(code, fields)

    data = {
        "code": code,
        "name": fields[1] if len(fields) > 1 else code,
        "price": safe_float(fields[3]),
        "prev_close": safe_float(fields[4]),
        "open": safe_float(fields[5]) if not is_hk else None,
        "change_pct": safe_float(fields[32]),
        "high": safe_float(fields[33]),
        "low": safe_float(fields[34]),
        # Labelled "share count" by the original author. NOTE(review):
        # Tencent A-share volume is commonly reported in lots of 100 — confirm.
        "volume": safe_float(fields[6]),
    }

    # A-share extras. Fields up to [49] are read, so require len >= 50.
    # NOTE(review): these offsets are unverified. Commonly cited layouts of
    # Tencent's A-share quote (e.g. the open-source easyquotation parser) put
    # PE at [39], circulating market cap at [44], total market cap at [45],
    # PB at [46], limit-up/limit-down prices at [47]/[48] and volume ratio at
    # [49] — which would make the pe / market-cap / 52-week mappings below
    # wrong. Confirm against a live response.
    if not is_hk and len(fields) > 49:
        data["turnover_rate"] = safe_float(fields[38])  # turnover rate %
        data["pe"] = safe_float(fields[47])  # dynamic P/E (per original note)
        data["total_market_cap"] = safe_float(fields[44])  # total cap (100M)
        data["circulating_market_cap"] = safe_float(fields[45])  # float cap (100M)
        data["high_52w"] = safe_float(fields[48])  # 52-week high
        data["low_52w"] = safe_float(fields[49])  # 52-week low
        data["amplitude"] = safe_float(fields[43])  # amplitude %

    # HK extras. Field [71] is read, so require len >= 72.
    if is_hk and len(fields) > 71:
        data["pe"] = safe_float(fields[71])  # HK P/E sits around [71]
        data["total_market_cap"] = safe_float(fields[69])
        data["high_52w"] = safe_float(fields[48])
        data["low_52w"] = safe_float(fields[49])

    return data


def get_sector_mapping(profiles, decisions):
    """Build ``{code: {name, sector, business, market, type}}``.

    stock_profiles.json entries take precedence; codes found only in
    decisions.json are filled in from the decision and its trigger.
    """
    mapping = {}

    profile_list = profiles.get("profiles", []) if isinstance(profiles, dict) else profiles
    if isinstance(profile_list, list):
        for p in profile_list:
            code = p.get("code", "")
            if code:
                mapping[code] = {
                    "name": p.get("name", ""),
                    "sector": p.get("sector", ""),
                    "business": p.get("business", ""),
                    "market": p.get("market", ""),
                    "type": p.get("type", ""),
                }

    for d in decisions.get("decisions", []):
        code = d.get("code", "")
        if code and code not in mapping:
            trig = d.get("trigger") or {}
            mapping[code] = {
                "name": d.get("name", code),
                "sector": trig.get("sector_name", d.get("sector_name", "")),
                "business": "",
                "market": "港股" if len(code) == 5 else "A股",
                "type": d.get("type", "持仓策略"),
            }
    return mapping


def get_portfolio_info(portfolio):
    """Build ``{code: {cost, shares, position_pct}}``, skipping empty codes."""
    result = {}
    for h in portfolio.get("holdings", []):
        code = h.get("code", "")
        if not code:
            continue
        result[code] = {
            "cost": h.get("cost", 0),
            "shares": h.get("shares", 0),
            "position_pct": h.get("position_pct", 0),
        }
    return result


def get_decisions_info(decisions):
    """Return the raw list of strategy entries from decisions.json."""
    return decisions.get("decisions", [])


def calc_pnl_pct(cost, price):
    """Return the unrealised P&L % of *price* vs *cost*, rounded to 2 dp.

    Returns None when either value is missing, non-numeric or not positive
    (e.g. no quote could be fetched for the stock).
    """
    cost, price = safe_float(cost), safe_float(price)
    if cost is None or price is None or cost <= 0 or price <= 0:
        return None
    return round((price - cost) / cost * 100, 2)


def group_by_sector(stocks):
    """Group stock records by sector; an empty/missing sector maps to "未分类"."""
    groups = {}
    for s in stocks:
        sector = s.get("sector") or "未分类"
        groups.setdefault(sector, []).append({
            "code": s["code"],
            "name": s["name"],
            "change_pct": s["change_pct"],
            "pnl_pct": s["pnl_pct"],
            "type": s["type"],
        })
    return groups


def _collect_codes(decisions, portfolio):
    """Return the set of non-empty codes from decisions and holdings."""
    codes = {d.get("code", "") for d in decisions.get("decisions", [])}
    codes |= {h.get("code", "") for h in portfolio.get("holdings", [])}
    return {c for c in codes if c}


def _strategy_by_code(decisions):
    """Map each code to a strategy summary taken from its first decision."""
    strategies = {}
    for d in decisions.get("decisions", []):
        code = d.get("code")
        if code in strategies:
            continue  # the first decision for a code wins
        trig = d.get("trigger") or {}
        strategies[code] = {
            "action": trig.get("action", d.get("action", "")),
            "entry_zone": trig.get("entry_zone", ""),
            "stop_loss": trig.get("stop_loss", d.get("stop_loss", "")),
            "take_profit": trig.get("take_profit", d.get("take_profit", "")),
            "type": d.get("type", "持仓策略"),
            "tech_snapshot": trig.get("tech_snapshot", d.get("tech_snapshot", "")),
        }
    return strategies


def _build_stock_entry(code, raw, sector_info, port, strategy):
    """Assemble one output record from quote, profile, holding and strategy."""
    return {
        "code": code,
        "name": raw.get("name") or sector_info.get("name") or code,
        "market": "港股" if len(code) == 5 else "A股",
        "type": sector_info.get("type", "持仓策略"),
        "sector": sector_info.get("sector", ""),
        "business": sector_info.get("business", ""),
        # Today's quote
        "price": raw.get("price"),
        "prev_close": raw.get("prev_close"),
        "open": raw.get("open"),
        "high": raw.get("high"),
        "low": raw.get("low"),
        "change_pct": raw.get("change_pct"),
        "volume": raw.get("volume"),
        # Fundamentals
        "pe": raw.get("pe"),
        "total_market_cap": raw.get("total_market_cap"),
        "high_52w": raw.get("high_52w"),
        "low_52w": raw.get("low_52w"),
        "turnover_rate": raw.get("turnover_rate"),
        "amplitude": raw.get("amplitude"),
        # Position
        "cost": port.get("cost", 0),
        "shares": port.get("shares", 0),
        "position_pct": port.get("position_pct", 0),
        # Current strategy
        "strategy": strategy,
        # Unrealised P&L %; None when it cannot be computed
        "pnl_pct": calc_pnl_pct(port.get("cost", 0), raw.get("price")),
    }


def run():
    """Collect quotes, indices and local metadata, then write OUTPUT_PATH."""
    decisions = load_json(DECISIONS_PATH, {"decisions": []})
    portfolio = load_json(PORTFOLIO_PATH, {"holdings": []})
    profiles = load_json(PROFILES_PATH, {"profiles": []})

    sector_mapping = get_sector_mapping(profiles, decisions)
    portfolio_info = get_portfolio_info(portfolio)
    strategies = _strategy_by_code(decisions)

    all_codes = _collect_codes(decisions, portfolio)
    a_codes = [c for c in all_codes if len(c) != 5]
    hk_codes = [c for c in all_codes if len(c) == 5]

    a_prices = fetch_tencent_data(a_codes) if a_codes else {}
    hk_prices = fetch_tencent_data(hk_codes) if hk_codes else {}
    index_data = fetch_indices()

    stock_data = {}
    for codes, prices, is_hk in ((a_codes, a_prices, False), (hk_codes, hk_prices, True)):
        for code in codes:
            if code in prices:
                stock_data[code] = parse_stock_data(code, prices[code], is_hk=is_hk)

    stocks = [
        _build_stock_entry(
            code,
            stock_data.get(code, {}),
            sector_mapping.get(code, {}),
            portfolio_info.get(code, {}),
            strategies.get(code),
        )
        for code in sorted(all_codes)
    ]

    sector_groups = group_by_sector(stocks)

    total = len(stocks)
    up_count = sum(1 for s in stocks if s["change_pct"] is not None and s["change_pct"] > 0)
    down_count = sum(1 for s in stocks if s["change_pct"] is not None and s["change_pct"] < 0)
    # "Deep loss": unrealised loss worse than -20%.
    deep_loss = sum(1 for s in stocks if s["pnl_pct"] is not None and s["pnl_pct"] < -20)

    output = {
        "collected_at": datetime.now().isoformat(),
        "total_stocks": total,
        "summary": {
            "up_count": up_count,
            "down_count": down_count,
            "deep_loss_count": deep_loss,
            "holdings_count": len(portfolio_info),
            "watchlist_count": total - len(portfolio_info),
        },
        "index_data": index_data,
        "sector_groups": sector_groups,
        "stocks": stocks,
    }

    save_json(OUTPUT_PATH, output)
    print(f"数据收集完成: {total}只股票, {len(index_data)}个指数, {len(sector_groups)}个行业分组")
    print(f"  上涨{up_count} 下跌{down_count} 深套{deep_loss}")
    print(f"  输出: {OUTPUT_PATH}")


if __name__ == "__main__":
    run()