#!/usr/bin/env python3 """ stock_quote.py — 统一股票行情查询工具(唯一权威价格源) 用法: python3 stock_quote.py 688411 # 单个A股 python3 stock_quote.py 688411 01211 300750 # 批量 python3 stock_quote.py --all-holdings # 所有持仓 输出:每只股票一行JSON,格式统一、无歧义。 LLM 禁止直接解析新浪/腾讯原始CSV,只能读本脚本输出。 数据来源(按优先级降序): A股: 东财push2 → 新浪hq → 腾讯qt 港股: 东财并行限速(5 workers) → 新浪批量 → 腾讯15min延迟(兜底) 验证规则: - price 必须在 [low, high] 范围内 - change_pct 必须与 (price - prev_close) / prev_close 一致(±0.1%容差) - 任一验证失败 → 该数据源降级,尝试下个源 """ import json, sys, re, time, urllib.request from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Semaphore from pathlib import Path from datetime import datetime UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36" DATA_DIR = Path("/home/hmo/MoFin/data") # ── 工具 ── def _http_get(url, headers=None, timeout=10): """带代理绕过的HTTP GET""" req = urllib.request.Request(url, headers=headers or {"User-Agent": UA}) proxy_handler = urllib.request.ProxyHandler({}) opener = urllib.request.build_opener(proxy_handler) try: with opener.open(req, timeout=timeout) as r: return r.read().decode("utf-8", errors="replace") except Exception: return None def _detect_market(code): """自动识别A股/港股 A股代码固定6位,港股代码5位(含前缀0如00700)。 """ code = str(code).strip() if len(code) == 6: return "ashare" if len(code) <= 5: return "hk" return "unknown" def _sym(code, market): """转成API可用的symbol""" if market == "ashare": pre = "sh" if code.startswith(("5", "6", "9")) else "sz" return pre + code, code return code, code # HK codes used directly # ── 数据源:东方财富(A股+港股通用) ── def _em_ashare(code): """东财A股API""" url = f"https://push2.eastmoney.com/api/qt/stock/get?secid=1.{code}&fields=f43,f44,f45,f46,f47,f50,f57,f58,f60,f170,f169&fltt=2" raw = _http_get(url, headers={"User-Agent": UA, "Referer": "https://quote.eastmoney.com/"}) if not raw: return None try: resp = json.loads(raw) if resp.get("rc") != 0: return None d = resp.get("data") if not d: return None price = d.get("f43") if price is None or float(price) <= 0: return None return { "code": code, "name": None, # 东财不返名称 "price": float(price), "change_pct": float(d.get("f170", 0)) if d.get("f170") is not None else None, "high": float(d.get("f44", 0)) if d.get("f44") else None, "low": float(d.get("f45", 0)) if d.get("f45") else None, "open": float(d.get("f46", 0)) if d.get("f46") else None, "prev_close": float(d.get("f60", 0)) if d.get("f60") else None, "volume": d.get("f47"), "amount": d.get("f50"), "source": "eastmoney", } except (json.JSONDecodeError, ValueError, TypeError): return None def _em_hk(code): """东财港股API(单股)""" url = f"https://push2.eastmoney.com/api/qt/stock/get?secid=116.{code}&fields=f43,f44,f45,f46,f47,f50,f57,f58,f60,f170,f169&fltt=2" raw = _http_get(url, headers={"User-Agent": UA, "Referer": "https://quote.eastmoney.com/"}) if not raw: return None try: resp = json.loads(raw) if resp.get("rc") != 0: return None d = resp.get("data") if not d: return None price = d.get("f43") if price is None or float(price) <= 0: return None return { "code": code, "name": None, "price": float(price), "change_pct": float(d.get("f170", 0)) if d.get("f170") is not None else None, "high": float(d.get("f44", 0)) if d.get("f44") else None, "low": float(d.get("f45", 0)) if d.get("f45") else None, "open": float(d.get("f46", 0)) if d.get("f46") else None, "prev_close": float(d.get("f60", 0)) if d.get("f60") else None, "volume": d.get("f47"), "amount": d.get("f50"), "source": "eastmoney", } except (json.JSONDecodeError, ValueError, TypeError): return None # ── 数据源:新浪(A股) ── def _sina_ashare(code): """新浪A股API(批量)""" pre = "sh" if code.startswith(("5", "6", "9")) else "sz" url = f"https://hq.sinajs.cn/list={pre}{code}" raw = _http_get(url, headers={ "User-Agent": UA, "Referer": "https://finance.sina.com.cn", }) if not raw: return None try: fields = raw.split("\"")[1].split(",") # 新浪格式: name,open,prev_close,current,high,low,buy,sell,volume,amount if len(fields) < 8: return None name = fields[0].strip() price = float(fields[3]) if fields[3] else 0 prev_close = float(fields[2]) if fields[2] else 0 if price <= 0: return None change_pct = round((price - prev_close) / prev_close * 100, 2) if prev_close > 0 else None return { "code": code, "name": name, "price": price, "change_pct": change_pct, "high": float(fields[4]) if fields[4] else None, "low": float(fields[5]) if fields[5] else None, "open": float(fields[1]) if fields[1] else None, "prev_close": prev_close, "volume": int(fields[8]) if len(fields) > 8 and fields[8] else None, "amount": float(fields[9]) if len(fields) > 9 and fields[9] else None, "source": "sina", } except (ValueError, IndexError): return None # ── 数据源:腾讯(A股兜底) ── def _tencent_ashare(code): """腾讯A股API(批量,15min延迟兜底)""" pre = "sh" if code.startswith(("5", "6", "9")) else "sz" url = f"https://qt.gtimg.cn/q={pre}{code}" raw = _http_get(url, headers={"User-Agent": UA}) if not raw: return None try: fields = raw.split("~") # Tencent格式: ~分隔 if len(fields) < 10: return None name = fields[1] price = float(fields[3]) if fields[3] else 0 prev_close = float(fields[4]) if fields[4] else 0 if price <= 0: return None change_pct = round((price - prev_close) / prev_close * 100, 2) if prev_close > 0 else None return { "code": code, "name": name, "price": price, "change_pct": change_pct, "high": float(fields[33]) if len(fields) > 33 and fields[33] else None, "low": float(fields[34]) if len(fields) > 34 and fields[34] else None, "open": float(fields[5]) if fields[5] else None, "prev_close": prev_close, "volume": int(fields[6]) if fields[6] else None, "amount": float(fields[37]) if len(fields) > 37 and fields[37] else None, "source": "tencent", } except (ValueError, IndexError): return None # ── 数据验证 ── def _validate(q): """验证行情数据自洽性。返回 (is_valid, reason)""" if q is None: return False, "no_data" if q["price"] <= 0: return False, "price_zero" # price 必须在 [low, high] 范围内(如果low/high存在) if q.get("high") and q.get("low"): if q["price"] < q["low"] or q["price"] > q["high"]: return False, f"price_out_of_range: {q['price']} not in [{q['low']},{q['high']}]" # change_pct一致性(如果prev_close存在) if q.get("prev_close") and q["prev_close"] > 0 and q.get("change_pct") is not None: expected = round((q["price"] - q["prev_close"]) / q["prev_close"] * 100, 2) if abs(expected - q["change_pct"]) > 0.5: return False, f"change_pct_mismatch: reported={q['change_pct']} expected={expected}" return True, "ok" # ── 主查询逻辑 ── def get_quote(code): """ 获取单只股票行情,按优先级尝试多个数据源。 返回统一dict,失败返回None。 """ market = _detect_market(code) result = None if market == "ashare": # A股: 东财 → 新浪 → 腾讯 for fetcher in [_em_ashare, _sina_ashare, _tencent_ashare]: q = fetcher(code) valid, reason = _validate(q) if valid: q["market"] = "A股" if q.get("name") is None: q["name"] = _get_name_from_cache(code) return q if q is not None: result = q # keep last attempt elif market == "hk": for fetcher in [_em_hk, _sina_hk, _tencent_hk]: q = fetcher(code) valid, reason = _validate(q) if valid: q["market"] = "港股" q["code"] = code if q.get("name") is None: q["name"] = _get_name_from_cache(code) return q if q is not None: result = q return result def get_quotes_batch(codes, max_workers=5): """ 批量获取,并行执行。 返回 {code: quote_dict or None} """ if max_workers == 1: return {c: get_quote(c) for c in codes} results = {} with ThreadPoolExecutor(max_workers=max_workers) as ex: fut_map = {ex.submit(get_quote, c): c for c in codes} for fut in as_completed(fut_map): c = fut_map[fut] try: results[c] = fut.result() except Exception: results[c] = None return results # ── 名称缓存(从portfolio.json/decisions.json补充) ── def _get_name_from_cache(code): """从本地数据文件补充股票名称(东财API不返回名称时用)""" try: pf = DATA_DIR / "portfolio.json" if pf.exists(): d = json.loads(pf.read_text()) for h in d.get("holdings", []): if str(h.get("code", "")) == str(code): return h.get("name", "") except Exception: pass try: wl = DATA_DIR / "watchlist.json" if wl.exists(): d = json.loads(wl.read_text()) for item in d if isinstance(d, list) else d.get("stocks", []): if str(item.get("code", "")) == str(code): return item.get("name", "") except Exception: pass return "" # ── 港股备用数据源 ── def _sina_hk(code): """新浪港股API(批量友好,单股也支持)""" url = f"https://hq.sinajs.cn/list=hk{code}" raw = _http_get(url, headers={ "User-Agent": UA, "Referer": "https://finance.sina.com.cn", }) if not raw: return None try: fields = raw.split("\"")[1].split(",") if len(fields) < 9: return None name = fields[1] price = float(fields[2]) if fields[2] else 0 prev_close = float(fields[3]) if fields[3] else 0 if price <= 0: return None change_amt = float(fields[7]) if fields[7] else 0 # 更可靠的price计算 if prev_close > 0 and abs(change_amt) > 0: price = round(prev_close + change_amt, 2) change_pct = float(fields[8]) if fields[8] else 0 return { "code": code, "name": name, "price": round(prev_close + change_amt, 2) if prev_close > 0 and abs(change_amt) > 0 else price, "change_pct": change_pct, "high": None, # Sina HK format不含high/low字段 "low": None, "open": float(fields[4]) if fields[4] else None, "prev_close": prev_close, "volume": None, "amount": None, "source": "sina", } except (ValueError, IndexError): return None def _tencent_hk(code): """腾讯港股(15min延迟,兜底)""" url = f"https://qt.gtimg.cn/q=hk{code}" raw = _http_get(url, headers={"User-Agent": UA}) if not raw: return None try: fields = raw.split("~") if len(fields) < 10: return None name = fields[1] price = float(fields[3]) if fields[3] else 0 prev_close = float(fields[4]) if fields[4] else 0 if price <= 0: return None change_pct = round((price - prev_close) / prev_close * 100, 2) if prev_close > 0 else None return { "code": code, "name": name, "price": price, "change_pct": change_pct, "high": float(fields[33]) if len(fields) > 33 and fields[33] else None, "low": float(fields[34]) if len(fields) > 34 and fields[34] else None, "open": float(fields[5]) if fields[5] else None, "prev_close": prev_close, "volume": None, "amount": None, "source": "tencent", } except (ValueError, IndexError): return None # ── 从portfolio.json读取所有持仓代码 ── def get_holding_codes(): """从portfolio.json提取所有持仓代码""" try: pf = DATA_DIR / "portfolio.json" d = json.loads(pf.read_text()) return [h["code"] for h in d.get("holdings", []) if h.get("code")] except Exception: return [] # ── CLI入口 ── def main(): if len(sys.argv) < 2: print("用法: python3 stock_quote.py [code2 ...]", file=sys.stderr) print(" python3 stock_quote.py --all-holdings", file=sys.stderr) sys.exit(1) codes = [] if sys.argv[1] == "--all-holdings": codes = get_holding_codes() if not codes: print(json.dumps({"error": "无法读取持仓列表", "timestamp": datetime.now().isoformat()})) sys.exit(1) else: codes = [c.strip() for c in sys.argv[1:] if c.strip()] results = get_quotes_batch(codes) timestamp = datetime.now().isoformat() # 输出:每行一个JSON(方便批量处理) for code in codes: q = results.get(code) if q: q["fetched_at"] = timestamp print(json.dumps(q, ensure_ascii=False, default=str)) else: print(json.dumps({ "code": code, "error": "无法获取行情", "fetched_at": timestamp, }, ensure_ascii=False)) if __name__ == "__main__": main()