Files
MoFin/mo_bridge.py
T

324 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
mo_bridge.py — MoFin ↔ DSA 全功能集成桥
真正调用 DSA 的三大功能:
1. 新闻搜索 — SearchService.search_comprehensive_intel()7 个搜索引擎)
2. 大盘复盘 — run_market_review()A股/港股/美股)
3. 策略问股 — AgentExecutor.run()DSA 的 15 种策略,作为 MoFin 的第二意见)
用法:
from mo_bridge import enrich_analysis_context, get_stock_analysis
# 在 MoFin 分析前注入 DSA 上下文(大盘 + 新闻)
ctx = enrich_analysis_context("00700", "腾讯控股", region="hk")
prompt += f"\n\n{ctx}"
# 用 DSA 的策略做独立分析
opinion = get_stock_analysis("600519", "贵州茅台", skills=["ma_golden_cross"])
"""
import sys
import os
import json
import logging
from pathlib import Path
from datetime import datetime
logger = logging.getLogger(__name__)
# ── DSA 路径 ─────────────────────────────────────────────────────────
_DSA_CANDIDATES = [
"/home/hmo/daily-stock-analysis",
str(Path(__file__).resolve().parent.parent / "daily-stock-analysis" / "ZhuLinsen-daily_stock_analysis-a448886"),
]
_DSA_BASE = None
for _c in _DSA_CANDIDATES:
_p = Path(_c)
if _p.is_dir() and (_p / "data_provider" / "base.py").exists():
_DSA_BASE = _p
break
_HAS_DSA = _DSA_BASE is not None
if _HAS_DSA:
sys.path.insert(0, str(_DSA_BASE))
# ── 懒加载 DSA 模块 ──────────────────────────────────────────────────
_dsa_search_service = None
_dsa_config = None
def _ensure_dsa_search():
global _dsa_search_service
if _dsa_search_service is not None:
return _dsa_search_service
if not _HAS_DSA:
return None
try:
from src.search_service import get_search_service
_dsa_search_service = get_search_service()
except Exception as e:
logger.warning("DSA SearchService 加载失败: %s", e)
return _dsa_search_service
def _ensure_dsa_config():
global _dsa_config
if _dsa_config is not None:
return _dsa_config
if not _HAS_DSA:
return None
try:
from src.config import get_config
_dsa_config = get_config()
except Exception as e:
logger.warning("DSA Config 加载失败: %s", e)
return _dsa_config
# ── 1. 新闻搜索 ─────────────────────────────────────────────────────
def get_stock_news(stock_code: str, stock_name: str = "", max_results: int = 5) -> str:
"""获取股票相关新闻。优先 DSA 搜索引擎,失败则 fallback 到东方财富 akshare。
Args:
stock_code: 股票代码 (如 '600519', '00700', 'AAPL')
stock_name: 股票名称 (提高搜索精度)
max_results: 最多返回条数
Returns:
str: Markdown 格式新闻摘要,可直接注入分析 prompt。失败时返回 ''
"""
lines = [f"## 📰 {stock_name or stock_code} 最新情报"]
got_any = False
# 主通道: DSA SearchService7 个搜索引擎,需要 API Key)
service = _ensure_dsa_search()
if service:
try:
intel = service.search_comprehensive_intel(
stock_code, stock_name or stock_code, max_searches=2
)
if intel:
news = intel.get("latest_news")
if news and news.results:
lines.append("\n### 最新新闻")
for r in news.results[:max_results]:
date_str = f" ({r.published_date})" if r.published_date else ""
snippet = r.snippet[:150] if r.snippet else ""
lines.append(f"- **{r.title}**{date_str}: {snippet}")
got_any = True
risk = intel.get("risk_check")
if risk and risk.results:
lines.append("\n### ⚠️ 风险关注")
for r in risk.results[:3]:
lines.append(f"- {r.title}")
except Exception as e:
logger.debug("DSA 搜索失败: %s", e)
# Fallback: 东方财富 akshare(免费,国内直连,无需 API Key)
if not got_any:
try:
import sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from mofin_news import search_akshare_news
articles = search_akshare_news(stock_code, max_results)
if articles:
lines.append("\n### 最新新闻 (东方财富)")
for a in articles:
snippet = a.get('content', '')[:120] if a.get('content') else ''
lines.append(f"- **{a.get('title', '')}**: {snippet}")
got_any = True
except Exception as e:
logger.debug("akshare 新闻 fallback 失败: %s", e)
return "\n".join(lines) if got_any else ""
# ── 2. 大盘复盘 ─────────────────────────────────────────────────────
def get_market_review(region: str = "cn", force_refresh: bool = False) -> str:
"""获取 DSA 的大盘复盘报告。
cron 场景:默认只读缓存(快,不阻塞)。每天第一次调用时 DSA 可能已经生成了缓存。
手动场景:force_refresh=True 实时调用 DSA 生成(慢,5-10秒)。
Args:
region: 'cn'=A股, 'hk'=港股, 'us'=美股
force_refresh: 是否强制实时生成(默认 False,只读缓存)
Returns:
str: Markdown 格式大盘复盘摘要
"""
if not _HAS_DSA:
return ""
# 先读缓存(优先,快)
cache_dirs = [
Path(str(_DSA_BASE)) / "reports", # DSA 生成的市场报告目录
Path(str(_DSA_BASE)) / "data" / "market_review",
]
for cache_dir in cache_dirs:
if not cache_dir.exists():
continue
try:
pattern = f"market_review_*.md" if "report" in str(cache_dir) else "*.md"
files = []
for p in cache_dir.glob("market_review_*.md"):
files.append(p)
if not files:
for p in cache_dir.glob("*.md"):
if "market" in p.name.lower() or "review" in p.name.lower():
files.append(p)
files.sort(key=os.path.getmtime, reverse=True)
if files:
if (datetime.now().timestamp() - os.path.getmtime(str(files[0]))) < 86400:
content = files[0].read_text(encoding="utf-8")
lines = [l for l in content.split("\n")[:30] if len(l.strip()) > 3]
return "## 📈 今日大盘背景\n" + "\n".join(lines)
except Exception:
pass
# cron 场景不走实时(太慢),直接返回空
if not force_refresh:
return ""
# 手动场景:实时调用 DSA(慢,需要 LLM)
try:
from src.core.market_review import run_market_review
class StubNotifier:
def is_available(self): return False
def send(self, *a, **kw): return True
def save_report_to_file(self, *a, **kw): return None
config = _ensure_dsa_config()
result = run_market_review(
notifier=StubNotifier(), config=config,
override_region=region, send_notification=False,
save_report_file=True, persist_history=True,
trigger_source="mofin",
)
if result and isinstance(result, str):
lines = [l for l in result.split("\n")[:25] if len(l.strip()) > 3]
return "## 📈 今日大盘复盘\n" + "\n".join(lines)
except Exception as e:
logger.warning("DSA 大盘复盘生成失败: %s", e)
return ""
# ── 3. 策略问股(第二意见)───────────────────────────────────────────
def get_stock_analysis(
stock_code: str,
stock_name: str = "",
skills: list = None,
) -> dict | None:
"""用 DSA 的 15 种内置策略独立分析一只股票。
Args:
stock_code: 股票代码
stock_name: 股票名称
skills: 策略列表,默认 ['ma_golden_cross', 'bull_trend']
Returns:
dict: {source, sentiment_score, operation_advice, trend_prediction,
analysis_summary, risk_warning, strategies_used, raw}
"""
if not _HAS_DSA:
return None
if not skills:
skills = ["ma_golden_cross", "bull_trend"]
try:
from src.agent.factory import build_agent_executor
executor = build_agent_executor(skills=skills)
result = executor.run(
task=f"分析 {stock_code} {stock_name}",
context={"stock_code": stock_code, "stock_name": stock_name, "report_language": "zh"},
)
if result.success and result.dashboard:
d = result.dashboard
return {
"source": "DSA",
"sentiment_score": d.get("sentiment_score", 0),
"operation_advice": d.get("operation_advice", ""),
"trend_prediction": d.get("trend_prediction", ""),
"analysis_summary": d.get("analysis_summary", ""),
"risk_warning": d.get("risk_warning", ""),
"strategies_used": skills,
"raw": result.content[:500] if result.content else "",
}
except Exception as e:
logger.warning("DSA Agent 分析 %s 失败: %s", e)
return None
def get_strategy_opinion_text(opinion: dict) -> str:
"""将 get_stock_analysis() 的结果格式化为可读文本"""
if not opinion:
return ""
return (
f"## 🤖 DSA 策略参考\n"
f"- 评分: {opinion.get('sentiment_score', '?')}/100\n"
f"- 建议: {opinion.get('operation_advice', '?')}\n"
f"- 趋势: {opinion.get('trend_prediction', '?')}\n"
f"- 策略: {', '.join(opinion.get('strategies_used', []))}\n"
f"- 摘要: {opinion.get('analysis_summary', '')}\n"
f"- 风险: {opinion.get('risk_warning', '')}"
)
# ── 4. 综合上下文(一键调用)─────────────────────────────────────────
def enrich_analysis_context(
stock_code: str = "",
stock_name: str = "",
region: str = "cn",
include_news: bool = True,
include_market: bool = True,
) -> str:
"""一键获取 DSA 全部分析上下文,注入 MoFin 的 LLM prompt。
在 strategy_lifecycle.reassess_with_context() 或 Hermes cron job 的 prompt 前调用。
Returns:
str: 可直接拼接到 LLM prompt 的 Markdown 文本
"""
parts = []
if include_market:
market = get_market_review(region)
if market:
parts.append(market)
if include_news and stock_code:
news = get_stock_news(stock_code, stock_name)
if news:
parts.append(news)
return "\n\n".join(parts) if parts else ""
# ── 自检 ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
print(f"DSA: {'available' if _HAS_DSA else 'NOT FOUND'} ({_DSA_BASE})")
if _HAS_DSA:
print("\n--- 新闻测试 (600519) ---")
n = get_stock_news("600519", "贵州茅台", max_results=2)
print(n[:300] if n else "(无结果)")
print("\n--- 大盘测试 ---")
m = get_market_review("cn")
print(m[:300] if m else "(无结果)")