Files
MoFin/mo_provider.py

376 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
mo_provider.py — MoFin 统一数据源适配器
封装 DSA (daily_stock_analysis) 的数据源层作为 MoFin 的备份/增强数据管道。
架构:
主数据源:TDX Relay(通達信实时行情,走招商证券 7727 服务器)
备份数据源:DSA DataFetcherManager16 个 fetcher,自动 fallback
用法:
from mo_provider import MoDataProvider
provider = MoDataProvider()
# 获取实时行情(TDX 优先,失败 → Tencent API → DSA fallback
realtime = provider.get_realtime("00700")
# 获取 K 线数据(优先本地缓存,失败 → DSA)
kline = provider.get_kline("600519", period="daily")
# 新闻搜索(DSA 的 search_service
news = provider.search_news("腾讯控股")
依赖:
DSA 代码需在 ../../daily-stock-analysis/ZhuLinsen-daily_stock_analysis-a448886/
安装 DSA 依赖: pip install -r ../../daily-stock-analysis/...requirements.txt
"""
import sys
import os
import json
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
# ── 路径配置 ─────────────────────────────────────────────────────────
# DSA 源码路径(按优先级尝试)
_DSA_CANDIDATES = [
"/home/hmo/daily-stock-analysis", # 服务器部署路径
os.path.normpath(os.path.join( # 本地开发路径
os.path.dirname(os.path.abspath(__file__)),
"..", "daily-stock-analysis", "ZhuLinsen-daily_stock_analysis-a448886"
)),
]
_DSA_BASE = None
for _c in _DSA_CANDIDATES:
if os.path.isdir(_c) and os.path.isfile(os.path.join(_c, "data_provider", "base.py")):
_DSA_BASE = _c
break
_HAS_DSA = _DSA_BASE is not None
# ── MoDataProvider ───────────────────────────────────────────────────
class MoDataProvider:
"""MoFin 统一数据获取门面。
- TDX Relay 作为实时行情的主源(港股接近实时)
- DSA 作为备份源(当 TDX 不可用时自动 fallback
- 所有数据获取统一走此类,禁止各文件自己实现
"""
def __init__(self, tdx_url: str = None):
self._dsa_manager = None # 懒加载 DSA DataFetcherManager
self._tdx_url = tdx_url or "http://localhost:8080" # TDX relay 地址
# ── DSA 懒加载 ───────────────────────────────────────────────
@property
def has_dsa(self) -> bool:
"""DSA 数据源是否可用"""
return _HAS_DSA
def _ensure_dsa(self):
"""懒加载 DSA DataFetcherManager"""
if self._dsa_manager is not None:
return self._dsa_manager
if not _HAS_DSA:
logger.warning("DSA 源码不在 %s,无法使用备份数据源", _DSA_BASE)
return None
try:
sys.path.insert(0, _DSA_BASE)
from data_provider.base import DataFetcherManager
self._dsa_manager = DataFetcherManager()
logger.info("DSA DataFetcherManager 已加载(16个数据源)")
except Exception as e:
logger.warning("加载 DSA DataFetcherManager 失败: %s", e)
self._dsa_manager = None
return self._dsa_manager
# ── 实时行情 ──────────────────────────────────────────────────
def get_realtime(self, code: str) -> dict | None:
"""获取实时行情。
优先级:TDX → Tencent API → DSA fallback
Returns:
dict with: price, change_pct, volume, name, currency
或 None(所有源均失败)
"""
# 1. 尝试 TDX Relay
try:
import urllib.request
url = f"{self._tdx_url}/realtime/{code}"
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=3) as r:
data = json.loads(r.read())
if data.get("price"):
logger.debug("TDX relay 返回 %s 行情: %.2f", code, data["price"])
return data
except Exception:
pass
# 2. 尝试 Tencent API
try:
return self._get_tencent_realtime(code)
except Exception:
pass
# 3. DSA fallback
dsa = self._ensure_dsa()
if dsa:
try:
# DSA 的 get_realtime_quote
result = dsa.get_realtime_quote(code)
if result:
logger.info("DSA fallback 返回 %s 行情", code)
return {"price": result.price, "name": result.name}
except Exception as e:
logger.debug("DSA fallback 失败: %s", e)
logger.warning("所有数据源均无法获取 %s 的行情", code)
return None
def _get_tencent_realtime(self, code: str) -> dict | None:
"""通过 Tencent API 获取实时行情"""
import urllib.request
from mo_models import normalize_code
raw = normalize_code(code)
# 判断市场
if raw[0] in ('0', '1') and len(raw) == 5:
market = "hk"
qt_code = f"hk{raw}"
elif raw.startswith("6"):
market = "sh"
qt_code = f"sh{raw}"
elif raw.startswith(("0", "3")):
market = "sz"
qt_code = f"sz{raw}"
else:
return None
url = f"https://qt.gtimg.cn/q={qt_code}"
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(req, timeout=5) as r:
text = r.read().decode("gbk")
# 解析 Tencent 行情格式
parts = text.split("~")
if len(parts) > 40:
return {
"code": code,
"name": parts[1],
"price": float(parts[3]) if parts[3] else 0,
"change_pct": float(parts[32]) if parts[32] else 0,
"volume": int(parts[6]) if parts[6] else 0,
"market": market,
}
return None
# ── K 线数据 ──────────────────────────────────────────────────
def get_kline(self, code: str, period: str = "daily", count: int = 60) -> list | None:
"""获取 K 线数据。
Args:
code: 股票代码
period: daily/weekly/monthly
count: 获取条数
Returns:
list of dict 或 None
"""
dsa = self._ensure_dsa()
if not dsa:
return None
try:
df = dsa.get_daily_data(code, period=period, limit=count)
if df is not None and not df.empty:
return df.to_dict("records")
except Exception as e:
logger.warning("DSA get_kline 失败: %s", e)
return None
# ── 分钟级 K 线 ──────────────────────────────────────────────
_last_minute_call = 0 # 限流时间戳
def get_minute_kline(self, code: str, count: int = 60) -> list | None:
"""获取1分钟K线数据(东方财富 push2)。
限流保护:每次调用间隔至少1秒,批量查询间隔2秒。
Args:
code: 股票代码(6位,如'600519'
count: 获取条数(最大240,约4小时)
Returns:
[{"time":"09:31","open":xx,"close":xx,"high":xx,"low":xx,"volume":xx,"amount":xx}, ...]
或 None
"""
import time, urllib.request
now = time.time()
elapsed = now - self._last_minute_call
if elapsed < 1.0:
time.sleep(1.0 - elapsed)
# A股secid: 1.上海 0.深圳
secid = f"1.{code}" if code.startswith(('6','5')) else f"0.{code}"
url = (f"https://push2.eastmoney.com/api/qt/stock/kline/get"
f"?secid={secid}&fields1=f1,f2,f3&fields2=f51,f52,f53,f54,f55,f56,f57"
f"&klt=1&fqt=1&end=20500101&lmt={min(count, 240)}")
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
resp = urllib.request.urlopen(req, timeout=8)
data = json.loads(resp.read())["data"]["klines"]
result = []
for line in data:
parts = line.split(",")
if len(parts) >= 6:
result.append({
"time": parts[0][-5:], # "2026-07-01 09:31" → "09:31"
"open": float(parts[1]),
"close": float(parts[2]),
"high": float(parts[3]),
"low": float(parts[4]),
"volume": int(parts[5]),
"amount": float(parts[6]) if len(parts) > 6 else 0,
})
self._last_minute_call = time.time()
return result
except Exception as e:
logger.warning("get_minute_kline(%s) 失败: %s", code, e)
return None
# ── 新闻搜索 ──────────────────────────────────────────────────
def search_news(self, query: str, max_results: int = 5) -> list:
"""通过 DSA 的搜索服务获取新闻。
Args:
query: 搜索关键词
max_results: 最多返回条数
Returns:
list of dict with: title, url, snippet, date
"""
dsa = self._ensure_dsa()
if not dsa:
return []
try:
from src.search_service import SearchService
service = SearchService()
results = service.search(query, limit=max_results)
return results[:max_results] if results else []
except Exception as e:
logger.debug("DSA news search 失败: %s", e)
return []
# ── 大盘分析 ──────────────────────────────────────────────────
def get_market_context(self, region: str = "cn") -> str | None:
"""获取 DSA 的市场复盘摘要。
Args:
region: cn/hk/us/both
Returns:
市场上下文文本 或 None
"""
dsa = self._ensure_dsa()
if not dsa:
return None
try:
from src.core.market_review import run_market_review
from src.config import get_config
config = get_config()
result = run_market_review(config=config, send_notification=False)
if result and hasattr(result, 'report'):
return result.report
except Exception as e:
logger.debug("DSA market review 失败: %s", e)
return None
# ── 基本面 ────────────────────────────────────────────────────
def get_fundamentals(self, code: str) -> dict | None:
"""获取股票基本面数据(PE/PB/ROE 等)"""
dsa = self._ensure_dsa()
if not dsa:
return None
try:
from data_provider.fundamental_adapter import AkshareFundamentalAdapter
adapter = AkshareFundamentalAdapter()
return adapter.get_fundamentals(code)
except Exception:
pass
return None
# ── 单例 ────────────────────────────────────────────────────────────
_provider_instance: MoDataProvider | None = None
def get_provider() -> MoDataProvider:
"""获取 MoDataProvider 单例"""
global _provider_instance
if _provider_instance is None:
_provider_instance = MoDataProvider()
return _provider_instance
# ── 便捷函数 ────────────────────────────────────────────────────────
def get_realtime(code: str) -> dict | None:
"""便捷函数:获取实时行情"""
return get_provider().get_realtime(code)
def get_market_context() -> str | None:
"""便捷函数:获取大盘上下文"""
return get_provider().get_market_context()
def search_news(query: str) -> list:
"""便捷函数:搜索新闻"""
return get_provider().search_news(query)
# ── 自检 ────────────────────────────────────────────────────────────
if __name__ == "__main__":
provider = MoDataProvider()
print(f"DSA 可用: {provider.has_dsa}")
print(f"DSA 路径: {_DSA_BASE}")
if provider.has_dsa:
manager = provider._ensure_dsa()
print(f"DataFetcherManager: {'已加载' if manager else '加载失败'}")
# 测试 Tencent API
try:
result = provider._get_tencent_realtime("00700")
print(f"\nTencent API 测试 (00700): {result}")
except Exception as e:
print(f"Tencent API 测试失败: {e}")