Files
skills/multi-source-stock-query/scripts/multi_source_stock_query.py
T
hmo 04db423416 Initial commit: skills library
- 70 skills with code and documentation
- Add .gitignore (ignore __pycache__, output/, temp/, venv/)
- Clean up test intermediates and caches
2026-04-26 19:27:40 +08:00

273 lines
10 KiB
Python

#!/usr/bin/env python3
"""
多源股票数据查询工具
支持Yahoo Finance、Google Finance、东方财富、雪球等多个数据源
通过交叉验证确保数据准确性
"""
import sys
import json
import time
import logging
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
import requests
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
class StockDataQuery:
"""多源股票数据查询类"""
def __init__(self):
self.data_sources = {
"yahoo_finance": self._query_yahoo_finance,
"google_finance": self._query_google_finance,
"eastmoney": self._query_eastmoney,
"xueqiu": self._query_xueqiu,
}
self.logger = self._setup_logger()
def _setup_logger(self):
"""设置日志"""
logger = logging.getLogger("StockDataQuery")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def standardize_stock_code(self, stock_code: str) -> str:
"""标准化股票代码"""
stock_code = stock_code.strip().upper()
# 如果已经有后缀,直接返回
if "." in stock_code:
return stock_code
# 根据代码特征判断市场
if len(stock_code) == 5 and stock_code.isdigit():
return f"{stock_code}.HK" # 港股
elif len(stock_code) == 6 and stock_code.isdigit():
if stock_code.startswith(("00", "30")):
return f"{stock_code}.SZ" # 深圳A股
else:
return f"{stock_code}.SS" # 上海A股
elif stock_code.replace(".", "").replace("-", "").isalpha():
return stock_code # 美股或其他
else:
return f"{stock_code}.HK" # 默认港股
def _query_yahoo_finance(self, standardized_code: str) -> Optional[Dict]:
"""查询Yahoo Finance数据"""
try:
url = (
f"https://query1.finance.yahoo.com/v8/finance/chart/{standardized_code}"
)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
if data.get("chart", {}).get("result"):
result = data["chart"]["result"][0]
meta = result["meta"]
return {
"source": "yahoo_finance",
"price": float(meta.get("regularMarketPrice", 0)),
"previous_close": float(meta.get("previousClose", 0)),
"open": float(meta.get("regularMarketOpen", 0)),
"high": float(meta.get("regularMarketDayHigh", 0)),
"low": float(meta.get("regularMarketDayLow", 0)),
"volume": int(meta.get("regularMarketVolume", 0)),
"market_cap": meta.get("marketCap"),
"pe_ratio": meta.get("trailingPE"),
"currency": meta.get("currency", "USD"),
"timestamp": datetime.now().isoformat(),
"success": True,
}
self.logger.warning(
f"Yahoo Finance returned status {response.status_code} for {standardized_code}"
)
return None
except Exception as e:
self.logger.error(
f"Yahoo Finance query failed for {standardized_code}: {e}"
)
return None
def _query_google_finance(self, standardized_code: str) -> Optional[Dict]:
"""查询Google Finance数据(简化版)"""
try:
# Google Finance API相对复杂,这里使用备用方案
# 实际实现中可以使用Google Finance的公开API或网页抓取
self.logger.info(
f"Google Finance query not implemented for {standardized_code}"
)
return None
except Exception as e:
self.logger.error(
f"Google Finance query failed for {standardized_code}: {e}"
)
return None
def _query_eastmoney(self, standardized_code: str) -> Optional[Dict]:
"""查询东方财富数据(简化版)"""
try:
# 东方财富需要处理中文编码和特定API
self.logger.info(f"EastMoney query not implemented for {standardized_code}")
return None
except Exception as e:
self.logger.error(f"EastMoney query failed for {standardized_code}: {e}")
return None
def _query_xueqiu(self, standardized_code: str) -> Optional[Dict]:
"""查询雪球数据(简化版)"""
try:
# 雪球需要处理特定的API格式
self.logger.info(f"Xueqiu query not implemented for {standardized_code}")
return None
except Exception as e:
self.logger.error(f"Xueqiu query failed for {standardized_code}: {e}")
return None
def _validate_data_consistency(self, results: List[Dict]) -> Dict:
"""验证数据一致性并生成最终结果"""
if not results:
return {"error": "No valid data sources available", "confidence_score": 0}
if len(results) == 1:
# 只有一个数据源,置信度较低
result = results[0].copy()
result["confidence_score"] = 60
result["data_sources"] = [results[0]["source"]]
result["validation_status"] = "single_source"
return result
# 多个数据源,进行一致性检查
prices = [r["price"] for r in results if r.get("price", 0) > 0]
if not prices:
return {"error": "No valid price data available", "confidence_score": 0}
# 计算价格一致性
avg_price = sum(prices) / len(prices)
max_deviation = max(abs(p - avg_price) / avg_price for p in prices)
if max_deviation <= 0.03: # 3%以内认为一致
confidence_score = 95
validation_status = "passed"
elif max_deviation <= 0.05: # 5%以内可接受
confidence_score = 85
validation_status = "acceptable"
else:
confidence_score = 70
validation_status = "inconsistent"
# 使用Yahoo Finance的数据作为基础(如果有)
yahoo_result = next(
(r for r in results if r["source"] == "yahoo_finance"), results[0]
)
final_result = yahoo_result.copy()
# 覆盖价格为平均价格
final_result["price"] = round(avg_price, 2)
final_result["confidence_score"] = confidence_score
final_result["data_sources"] = [r["source"] for r in results]
final_result["validation_status"] = validation_status
final_result["price_consistency"] = {
"individual_prices": {r["source"]: r["price"] for r in results},
"average_price": avg_price,
"max_deviation_percent": round(max_deviation * 100, 2),
}
return final_result
def get_stock_data(self, stock_code: str, include_validation: bool = False) -> Dict:
"""获取单只股票数据"""
standardized_code = self.standardize_stock_code(stock_code)
self.logger.info(f"Querying stock data for {stock_code} -> {standardized_code}")
# 并行查询多个数据源
results = []
with ThreadPoolExecutor(max_workers=len(self.data_sources)) as executor:
future_to_source = {
executor.submit(query_func, standardized_code): source_name
for source_name, query_func in self.data_sources.items()
}
for future in as_completed(future_to_source):
try:
result = future.result(timeout=15)
if result and result.get("success"):
results.append(result)
self.logger.info(
f"Successfully got data from {future_to_source[future]}"
)
except Exception as e:
source_name = future_to_source[future]
self.logger.error(f"Query failed for {source_name}: {e}")
# 验证和聚合结果
final_result = self._validate_data_consistency(results)
final_result["code"] = stock_code
final_result["standardized_code"] = standardized_code
if not include_validation:
# 移除详细的验证信息以简化输出
final_result.pop("price_consistency", None)
return final_result
def get_batch_stock_data(
self, stock_codes: List[str], include_validation: bool = False
) -> List[Dict]:
"""批量获取股票数据"""
results = []
for code in stock_codes:
result = self.get_stock_data(code, include_validation)
results.append(result)
# 避免请求过于频繁
time.sleep(0.5)
return results
def main():
"""主函数"""
if len(sys.argv) < 2:
print("用法:")
print(" python multi_source_stock_query.py <stock_code>")
print(
" python multi_source_stock_query.py --batch <stock_code1>,<stock_code2>,..."
)
print("")
print("示例:")
print(" python multi_source_stock_query.py 00700.HK")
print(
" python multi_source_stock_query.py --batch 00700.HK,09868.HK,001309.SZ"
)
sys.exit(1)
if sys.argv[1] == "--batch" and len(sys.argv) > 2:
stock_codes = sys.argv[2].split(",")
query = StockDataQuery()
results = query.get_batch_stock_data(stock_codes, include_validation=True)
print(json.dumps(results, indent=2, ensure_ascii=False))
else:
stock_code = sys.argv[1]
query = StockDataQuery()
result = query.get_stock_data(stock_code, include_validation=True)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()