Files
MoFin/scripts/stock_quote.py
知微 7c0e85af28 硬性策略质量门禁 validate_strategy()
新增 STRATEGY_QUALITY_GATES 检查清单(9条红线):
CRITICAL: 止损/止盈存在+>0, 买入区下沿<上沿
HIGH: 止损≤买入区, 买入推荐含RR≥1.5, 港股标currency=HKD
MEDIUM: signal短词, tech_snapshot含技术位

enforce_strategy_quality() 插在写入链的两处:
1. reassess_with_context() return前 → 单只重评必过
2. regenerate_all() for d in decisions: 写DB前 → 批量重评必过

不过的:status=review_needed, signal降级→信号不充分
不会写进DB/JSON,除非修复了CRITICAL问题
2026-07-02 13:46:53 +08:00

437 lines
15 KiB
Python

#!/usr/bin/env python3
"""
stock_quote.py — 统一股票行情查询工具(唯一权威价格源)
用法:
python3 stock_quote.py 688411 # 单个A股
python3 stock_quote.py 688411 01211 300750 # 批量
python3 stock_quote.py --all-holdings # 所有持仓
输出:每只股票一行JSON,格式统一、无歧义。
LLM 禁止直接解析新浪/腾讯原始CSV,只能读本脚本输出。
数据来源(按优先级降序):
A股: 东财push2 → 新浪hq → 腾讯qt
港股: 东财并行限速(5 workers) → 新浪批量 → 腾讯15min延迟(兜底)
验证规则:
- price 必须在 [low, high] 范围内
- change_pct 必须与 (price - prev_close) / prev_close 一致(±0.1%容差)
- 任一验证失败 → 该数据源降级,尝试下个源
"""
import json, sys, re, time, urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Semaphore
from pathlib import Path
from datetime import datetime
# User-Agent header value sent with the quote requests issued by this script.
UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36"
# Directory holding the local JSON files (portfolio.json, watchlist.json) this script reads.
# NOTE(review): absolute, machine-specific path — confirm it matches the deployment host.
DATA_DIR = Path("/home/hmo/MoFin/data")
# ── 工具 ──
def _http_get(url, headers=None, timeout=10):
    """Fetch *url* with a proxy-bypassing GET and return the body as text.

    The body is decoded with the charset declared in the response's
    Content-Type header, falling back to UTF-8 when none (or an unknown one)
    is declared. Undecodable bytes are replaced rather than raising.

    Args:
        url: Address to request.
        headers: Optional request headers; defaults to a bare User-Agent.
        timeout: Socket timeout in seconds.

    Returns:
        The decoded response body, or None when the request fails for any
        reason. The helper is deliberately best-effort so callers can fall
        through to their next data source.
    """
    request = urllib.request.Request(url, headers=headers or {"User-Agent": UA})
    # An empty proxy mapping disables any proxy auto-detected from the environment.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(request, timeout=timeout) as response:
            payload = response.read()
            # Honour the server-declared charset instead of assuming UTF-8.
            # NOTE(review): the Sina/Tencent quote endpoints are believed to
            # answer in GBK/GB18030 — confirm against a live response.
            charset = response.headers.get_content_charset() or "utf-8"
    except Exception:
        # Best-effort boundary: any transport failure means "no data".
        return None
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # The server declared a charset Python does not know.
        return payload.decode("utf-8", errors="replace")
def _detect_market(code):
    """Classify a stock code as A-share or Hong Kong by its shape.

    A-share codes are exactly 6 digits; Hong Kong codes are up to 5 digits
    (normally zero-padded, e.g. ``00700``). Surrounding whitespace is ignored.

    Args:
        code: Stock code as a string (or anything ``str()`` can render).

    Returns:
        ``"ashare"``, ``"hk"``, or ``"unknown"`` for empty, non-numeric or
        over-long input.
    """
    code = str(code).strip()
    # Empty or non-numeric input (including str(None)) is not a valid code;
    # reject it here so callers do not waste network requests on it.
    if not code or not (code.isascii() and code.isdigit()):
        return "unknown"
    if len(code) == 6:
        return "ashare"
    if len(code) <= 5:
        return "hk"
    return "unknown"
def _sym(code, market):
    """Build the symbol used by the quote APIs for *code*.

    Args:
        code: Bare stock code.
        market: ``"ashare"`` for A-shares; any other value is passed through.

    Returns:
        A ``(symbol, code)`` tuple. For A-shares the symbol is the code with
        an ``sh`` prefix when it starts with 5, 6 or 9 and ``sz`` otherwise;
        for every other market the code is used unchanged.
    """
    if market != "ashare":
        # Non A-share codes (e.g. HK) are used as-is.
        return code, code
    exchange = "sh" if code.startswith(("5", "6", "9")) else "sz"
    return f"{exchange}{code}", code
# ── 数据源:东方财富(A股+港股通用) ──
def _em_ashare(code):
    """Fetch one A-share quote from the Eastmoney push2 API.

    Args:
        code: 6-digit A-share code as a string.

    Returns:
        A quote dict (code, name, price, change_pct, high, low, open,
        prev_close, volume, amount, source) or None when the request fails,
        the API reports an error, or the price is missing or non-positive.
    """
    # Eastmoney secids are "<market>.<code>" with 1 = Shanghai and 0 = Shenzhen.
    # The market is derived with the same prefix rule the Sina/Tencent
    # fetchers use for sh/sz, instead of assuming Shanghai for every code.
    market_id = 1 if code.startswith(("5", "6", "9")) else 0
    url = (
        "https://push2.eastmoney.com/api/qt/stock/get"
        f"?secid={market_id}.{code}"
        "&fields=f43,f44,f45,f46,f47,f50,f57,f58,f60,f170,f169&fltt=2"
    )
    raw = _http_get(url, headers={"User-Agent": UA, "Referer": "https://quote.eastmoney.com/"})
    if not raw:
        return None

    def _opt_float(value):
        # Falsy values (missing, 0, "") are reported as None.
        return float(value) if value else None

    try:
        resp = json.loads(raw)
        if resp.get("rc") != 0:
            return None
        data = resp.get("data")
        if not data:
            return None
        price = data.get("f43")
        if price is None or float(price) <= 0:
            return None
        change_pct = data.get("f170")
        return {
            "code": code,
            # Left empty on purpose; get_quote() fills it from the local cache.
            # NOTE(review): f57/f58 are requested but never read — presumably
            # code/name; confirm and use f58 here if so.
            "name": None,
            "price": float(price),
            "change_pct": float(change_pct) if change_pct is not None else None,
            "high": _opt_float(data.get("f44")),
            "low": _opt_float(data.get("f45")),
            "open": _opt_float(data.get("f46")),
            "prev_close": _opt_float(data.get("f60")),
            "volume": data.get("f47"),
            # NOTE(review): verify that f50 really is the turnover amount.
            "amount": data.get("f50"),
            "source": "eastmoney",
        }
    except (json.JSONDecodeError, ValueError, TypeError, AttributeError):
        # Malformed JSON, non-numeric placeholders, or a non-dict payload.
        return None
def _em_hk(code):
    """Fetch one Hong Kong quote from the Eastmoney push2 API (secid ``116.<code>``).

    Args:
        code: Hong Kong stock code as a string.

    Returns:
        A quote dict tagged ``source="eastmoney"``, or None when the request
        fails, ``rc`` is non-zero, ``data`` is empty, or the price is missing
        or non-positive.
    """
    endpoint = f"https://push2.eastmoney.com/api/qt/stock/get?secid=116.{code}&fields=f43,f44,f45,f46,f47,f50,f57,f58,f60,f170,f169&fltt=2"
    body = _http_get(endpoint, headers={"User-Agent": UA, "Referer": "https://quote.eastmoney.com/"})
    if not body:
        return None

    def optional_float(value):
        # Falsy values (missing, 0, "") map to None.
        return float(value) if value else None

    try:
        envelope = json.loads(body)
        if envelope.get("rc") != 0:
            return None
        data = envelope.get("data")
        if not data:
            return None
        last = data.get("f43")
        if last is None or float(last) <= 0:
            return None
        pct = data.get("f170")
        quote = {
            "code": code,
            "name": None,  # filled in later by the caller
            "price": float(last),
            "change_pct": None if pct is None else float(pct),
            "high": optional_float(data.get("f44")),
            "low": optional_float(data.get("f45")),
            "open": optional_float(data.get("f46")),
            "prev_close": optional_float(data.get("f60")),
            "volume": data.get("f47"),
            "amount": data.get("f50"),
            "source": "eastmoney",
        }
        return quote
    except (json.JSONDecodeError, ValueError, TypeError):
        return None
# ── 数据源:新浪(A股) ──
def _sina_ashare(code):
    """Fetch one A-share quote from Sina (``hq.sinajs.cn``).

    The payload between the first pair of double quotes is a comma-separated
    record laid out as: name, open, prev_close, current, high, low, buy,
    sell, volume, amount.

    Args:
        code: 6-digit A-share code as a string.

    Returns:
        A quote dict tagged ``source="sina"``, or None when the request
        fails, the record is too short or unparsable, or the price is
        non-positive.
    """
    exchange = "sh" if code.startswith(("5", "6", "9")) else "sz"
    body = _http_get(
        f"https://hq.sinajs.cn/list={exchange}{code}",
        headers={
            "User-Agent": UA,
            "Referer": "https://finance.sina.com.cn",
        },
    )
    if not body:
        return None
    try:
        cols = body.split("\"")[1].split(",")
        if len(cols) < 8:
            return None

        def cell(idx, cast=float):
            # None when the column is absent or blank.
            if idx < len(cols) and cols[idx]:
                return cast(cols[idx])
            return None

        stock_name = cols[0].strip()
        last = float(cols[3]) if cols[3] else 0
        prev = float(cols[2]) if cols[2] else 0
        if last <= 0:
            return None
        # Sina does not report the percentage change; derive it.
        pct = None
        if prev > 0:
            pct = round((last - prev) / prev * 100, 2)
        return {
            "code": code,
            "name": stock_name,
            "price": last,
            "change_pct": pct,
            "high": cell(4),
            "low": cell(5),
            "open": cell(1),
            "prev_close": prev,
            "volume": cell(8, int),
            "amount": cell(9),
            "source": "sina",
        }
    except (ValueError, IndexError):
        return None
# ── 数据源:腾讯(A股兜底) ──
def _tencent_ashare(code):
    """Fetch one A-share quote from Tencent (``qt.gtimg.cn``), the last-resort source.

    The response is a ``~``-separated record; this function reads name (1),
    price (3), prev_close (4), open (5), volume (6), high (33), low (34) and
    amount (37). The original author describes this feed as 15-minute delayed.

    Args:
        code: 6-digit A-share code as a string.

    Returns:
        A quote dict tagged ``source="tencent"``, or None when the request
        fails, the record is too short or unparsable, or the price is
        non-positive.
    """
    exchange = "sh" if code.startswith(("5", "6", "9")) else "sz"
    body = _http_get(f"https://qt.gtimg.cn/q={exchange}{code}", headers={"User-Agent": UA})
    if not body:
        return None
    try:
        parts = body.split("~")
        if len(parts) < 10:
            return None

        def cell(idx, cast=float):
            # None when the column is absent or blank.
            if idx < len(parts) and parts[idx]:
                return cast(parts[idx])
            return None

        stock_name = parts[1]
        last = float(parts[3]) if parts[3] else 0
        prev = float(parts[4]) if parts[4] else 0
        if last <= 0:
            return None
        pct = None
        if prev > 0:
            pct = round((last - prev) / prev * 100, 2)
        return {
            "code": code,
            "name": stock_name,
            "price": last,
            "change_pct": pct,
            "high": cell(33),
            "low": cell(34),
            "open": cell(5),
            "prev_close": prev,
            "volume": cell(6, int),
            "amount": cell(37),
            "source": "tencent",
        }
    except (ValueError, IndexError):
        return None
# ── 数据验证 ──
def _validate(q):
    """Check that a quote dict is internally consistent.

    Checks, in order:
      1. the quote exists and its price is positive;
      2. when both ``high`` and ``low`` are present, price lies within them;
      3. when ``prev_close`` is positive and ``change_pct`` is present, the
         reported percentage is within 0.5 of the value recomputed from
         price and prev_close.

    Args:
        q: Quote dict produced by one of the fetchers, or None.

    Returns:
        ``(is_valid, reason)``; reason is ``"ok"`` on success, otherwise a
        short machine-readable failure description.
    """
    if q is None:
        return False, "no_data"

    price = q["price"]
    if price <= 0:
        return False, "price_zero"

    # Range check, only when both bounds are available.
    high, low = q.get("high"), q.get("low")
    if high and low:
        if price < low or price > high:
            return False, f"price_out_of_range: {price} not in [{low},{high}]"

    # Percentage-change consistency check.
    prev_close = q.get("prev_close")
    reported = q.get("change_pct")
    if prev_close and prev_close > 0 and reported is not None:
        expected = round((price - prev_close) / prev_close * 100, 2)
        if abs(expected - reported) > 0.5:
            return False, f"change_pct_mismatch: reported={reported} expected={expected}"

    return True, "ok"
# ── 主查询逻辑 ──
def get_quote(code):
    """Fetch one stock's quote, trying each data source in priority order.

    Sources are tried in order (A-shares: Eastmoney → Sina → Tencent;
    Hong Kong: Eastmoney → Sina → Tencent) and the first quote that passes
    ``_validate`` is returned.

    Args:
        code: Stock code; surrounding whitespace is ignored.

    Returns:
        * A validated quote dict with ``market`` set and ``name`` filled from
          the local cache when the source did not supply one.
        * If no source validated but at least one returned data, the last
          such quote, tagged with ``market`` and a ``validation_error``
          reason so consumers can tell it apart from a validated quote.
        * None when the market is unknown or no source returned any data.
    """
    # Normalise once so every fetcher gets a clean string.
    code = str(code).strip()
    market = _detect_market(code)
    if market == "ashare":
        label, fetchers = "A股", (_em_ashare, _sina_ashare, _tencent_ashare)
    elif market == "hk":
        label, fetchers = "港股", (_em_hk, _sina_hk, _tencent_hk)
    else:
        return None

    fallback = None
    for fetch in fetchers:
        quote = fetch(code)
        valid, reason = _validate(quote)
        if valid:
            quote["market"] = label
            quote["code"] = code
            if quote.get("name") is None:
                quote["name"] = _get_name_from_cache(code)
            return quote
        if quote is not None:
            # Keep the last attempt, but mark it as having failed validation
            # so it is never mistaken for authoritative data.
            quote["market"] = label
            quote["validation_error"] = reason
            fallback = quote
    return fallback
def get_quotes_batch(codes, max_workers=5):
    """Fetch quotes for several codes, in parallel when allowed.

    Args:
        codes: Iterable of stock codes. Duplicates are fetched only once.
        max_workers: Thread-pool size. Any value <= 1 runs the lookups
            sequentially in the calling thread; None lets the executor pick.

    Returns:
        ``{code: quote_dict_or_None}``. A lookup that raises is reported on
        stderr and recorded as None, on both the sequential and the parallel
        path, so one bad code never aborts the batch.
    """
    # dict.fromkeys de-duplicates while preserving input order.
    unique_codes = list(dict.fromkeys(codes))
    if not unique_codes:
        return {}

    def _safe_quote(code):
        try:
            return get_quote(code)
        except Exception as exc:
            # stdout carries the JSON output, so diagnostics go to stderr.
            print(f"[stock_quote] {code}: {exc!r}", file=sys.stderr)
            return None

    if max_workers is not None and max_workers <= 1:
        return {code: _safe_quote(code) for code in unique_codes}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order.
        return dict(zip(unique_codes, pool.map(_safe_quote, unique_codes)))
# ── 名称缓存(从portfolio.json/decisions.json补充) ──
def _get_name_from_cache(code):
    """Look up a stock's display name in the local JSON data files.

    Used when a data source returns no name. ``portfolio.json`` (under its
    ``holdings`` list) is searched first, then ``watchlist.json`` (either a
    bare list or a dict with a ``stocks`` list). Files are read as UTF-8.

    Args:
        code: Stock code; compared as a string.

    Returns:
        The first non-empty name found, or ``""`` when the code is unknown
        or the files are missing or unreadable.
    """
    wanted = str(code)
    # (file name, key holding the entry list, whether a bare top-level list is accepted)
    sources = (
        ("portfolio.json", "holdings", False),
        ("watchlist.json", "stocks", True),
    )
    for filename, list_key, allow_bare_list in sources:
        path = DATA_DIR / filename
        try:
            if not path.exists():
                continue
            payload = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # Unreadable or malformed file: best-effort, try the next one.
            continue

        if isinstance(payload, dict):
            entries = payload.get(list_key, [])
        elif allow_bare_list and isinstance(payload, list):
            entries = payload
        else:
            continue
        if not isinstance(entries, list):
            continue

        for entry in entries:
            if isinstance(entry, dict) and str(entry.get("code", "")) == wanted:
                name = entry.get("name")
                if name:
                    return name
                # Matched but unnamed: the next file may still have the name.
                break
    return ""
# ── 港股备用数据源 ──
def _sina_hk(code):
    """Fetch one Hong Kong quote from Sina (``hq.sinajs.cn/list=hk<code>``).

    Column layout assumed for the comma-separated record:
    0 English name, 1 Chinese name, 2 open, 3 prev_close, 4 high, 5 low,
    6 last price, 7 change amount, 8 change percent.
    NOTE(review): confirm this layout against a live response. Columns 3, 7
    and 8 were already read this way; the previous version took column 2 as
    the price and column 4 as the open, and then patched the price up as
    prev_close + change.

    Args:
        code: Hong Kong stock code as a string.

    Returns:
        A quote dict tagged ``source="sina"``, or None when the request
        fails, the record is too short or unparsable, or no positive price
        can be determined.
    """
    url = f"https://hq.sinajs.cn/list=hk{code}"
    raw = _http_get(url, headers={
        "User-Agent": UA,
        "Referer": "https://finance.sina.com.cn",
    })
    if not raw:
        return None
    try:
        fields = raw.split("\"")[1].split(",")
        if len(fields) < 9:
            return None

        def _opt_float(idx):
            # None when the column is blank.
            return float(fields[idx]) if fields[idx] else None

        name = fields[1]
        prev_close = float(fields[3]) if fields[3] else 0
        change_amt = float(fields[7]) if fields[7] else 0
        price = float(fields[6]) if fields[6] else 0
        if price <= 0 and prev_close > 0 and change_amt != 0:
            # Last price missing: reconstruct it from prev_close + change.
            # Three decimals, assuming HK penny stocks quote at that precision.
            price = round(prev_close + change_amt, 3)
        if price <= 0:
            return None

        change_pct = _opt_float(8)
        if change_pct is None and prev_close > 0:
            # Same derivation the other fetchers use when it is not reported.
            change_pct = round((price - prev_close) / prev_close * 100, 2)

        return {
            "code": code,
            "name": name,
            "price": price,
            "change_pct": change_pct,
            "high": _opt_float(4),
            "low": _opt_float(5),
            "open": _opt_float(2),
            "prev_close": prev_close,
            # NOTE(review): later columns presumably carry turnover and
            # volume; left unset until their positions are confirmed.
            "volume": None,
            "amount": None,
            "source": "sina",
        }
    except (ValueError, IndexError):
        return None
def _tencent_hk(code):
    """Fetch one Hong Kong quote from Tencent (``qt.gtimg.cn/q=hk<code>``).

    Last-resort source; the original author describes it as 15-minute
    delayed. From the ``~``-separated record this reads name (1), price (3),
    prev_close (4), open (5), high (33) and low (34).

    Args:
        code: Hong Kong stock code as a string.

    Returns:
        A quote dict tagged ``source="tencent"`` (volume and amount are
        always None), or None when the request fails, the record is too
        short or unparsable, or the price is non-positive.
    """
    body = _http_get(f"https://qt.gtimg.cn/q=hk{code}", headers={"User-Agent": UA})
    if not body:
        return None
    try:
        parts = body.split("~")
        if len(parts) < 10:
            return None

        def cell(idx):
            # None when the column is absent or blank.
            if idx < len(parts) and parts[idx]:
                return float(parts[idx])
            return None

        stock_name = parts[1]
        last = float(parts[3]) if parts[3] else 0
        prev = float(parts[4]) if parts[4] else 0
        if last <= 0:
            return None
        pct = None
        if prev > 0:
            pct = round((last - prev) / prev * 100, 2)
        return {
            "code": code,
            "name": stock_name,
            "price": last,
            "change_pct": pct,
            "high": cell(33),
            "low": cell(34),
            "open": cell(5),
            "prev_close": prev,
            "volume": None,
            "amount": None,
            "source": "tencent",
        }
    except (ValueError, IndexError):
        return None
# ── 从portfolio.json读取所有持仓代码 ──
def get_holding_codes():
    """Return the codes of all holdings listed in ``portfolio.json``.

    The file is read as UTF-8 from ``DATA_DIR``. Codes are normalised to
    stripped strings, because the quote fetchers call string methods on them
    and JSON may store a code as a number.

    Returns:
        A list of non-empty code strings, or ``[]`` when the file is
        missing, unreadable, malformed, or not shaped as expected.
    """
    portfolio_path = DATA_DIR / "portfolio.json"
    try:
        portfolio = json.loads(portfolio_path.read_text(encoding="utf-8"))
        holdings = portfolio.get("holdings", [])
        codes = [str(h.get("code") or "").strip() for h in holdings]
    except (OSError, ValueError, AttributeError, TypeError):
        # Best-effort: the caller reports an empty result as an error.
        return []
    return [code for code in codes if code]
# ── CLI入口 ──
def main():
    """Command-line entry point: print one JSON line per requested code.

    With no arguments, prints usage to stderr and exits with status 1.
    With ``--all-holdings`` as the first argument, codes come from
    ``get_holding_codes()``; if that yields nothing, an error object is
    printed and the process exits with status 1. Otherwise every non-blank
    argument is treated as a stock code.
    """
    args = sys.argv[1:]
    if not args:
        print("用法: python3 stock_quote.py <code1> [code2 ...]", file=sys.stderr)
        print(" python3 stock_quote.py --all-holdings", file=sys.stderr)
        sys.exit(1)

    if args[0] == "--all-holdings":
        codes = get_holding_codes()
        if not codes:
            print(json.dumps({"error": "无法读取持仓列表", "timestamp": datetime.now().isoformat()}))
            sys.exit(1)
    else:
        codes = [arg.strip() for arg in args if arg.strip()]

    results = get_quotes_batch(codes)
    timestamp = datetime.now().isoformat()

    # One JSON object per line, in the order the codes were given.
    for code in codes:
        quote = results.get(code)
        if not quote:
            failure = {
                "code": code,
                "error": "无法获取行情",
                "fetched_at": timestamp,
            }
            print(json.dumps(failure, ensure_ascii=False))
            continue
        quote["fetched_at"] = timestamp
        print(json.dumps(quote, ensure_ascii=False, default=str))


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()