#!/usr/bin/env python3 """ mo_bridge.py — MoFin ↔ DSA 全功能集成桥 真正调用 DSA 的三大功能: 1. 新闻搜索 — SearchService.search_comprehensive_intel()(7 个搜索引擎) 2. 大盘复盘 — run_market_review()(A股/港股/美股) 3. 策略问股 — AgentExecutor.run()(DSA 的 15 种策略,作为 MoFin 的第二意见) 用法: from mo_bridge import enrich_analysis_context, get_stock_analysis # 在 MoFin 分析前注入 DSA 上下文(大盘 + 新闻) ctx = enrich_analysis_context("00700", "腾讯控股", region="hk") prompt += f"\n\n{ctx}" # 用 DSA 的策略做独立分析 opinion = get_stock_analysis("600519", "贵州茅台", skills=["ma_golden_cross"]) """ import sys import os import json import logging from pathlib import Path from datetime import datetime logger = logging.getLogger(__name__) # ── DSA 路径 ───────────────────────────────────────────────────────── _DSA_CANDIDATES = [ "/home/hmo/daily-stock-analysis", str(Path(__file__).resolve().parent.parent / "daily-stock-analysis" / "ZhuLinsen-daily_stock_analysis-a448886"), ] _DSA_BASE = None for _c in _DSA_CANDIDATES: _p = Path(_c) if _p.is_dir() and (_p / "data_provider" / "base.py").exists(): _DSA_BASE = _p break _HAS_DSA = _DSA_BASE is not None if _HAS_DSA: sys.path.insert(0, str(_DSA_BASE)) # ── 懒加载 DSA 模块 ────────────────────────────────────────────────── _dsa_search_service = None _dsa_config = None def _ensure_dsa_search(): global _dsa_search_service if _dsa_search_service is not None: return _dsa_search_service if not _HAS_DSA: return None try: from src.search_service import get_search_service _dsa_search_service = get_search_service() except Exception as e: logger.warning("DSA SearchService 加载失败: %s", e) return _dsa_search_service def _ensure_dsa_config(): global _dsa_config if _dsa_config is not None: return _dsa_config if not _HAS_DSA: return None try: from src.config import get_config _dsa_config = get_config() except Exception as e: logger.warning("DSA Config 加载失败: %s", e) return _dsa_config # ── 1. 新闻搜索 ───────────────────────────────────────────────────── def get_stock_news(stock_code: str, stock_name: str = "", max_results: int = 5) -> str: """通过 DSA 的 7 个搜索引擎获取股票相关新闻。 Args: stock_code: 股票代码 (如 '600519', '00700', 'AAPL') stock_name: 股票名称 (提高搜索精度) max_results: 最多返回条数 Returns: str: Markdown 格式新闻摘要,可直接注入分析 prompt。失败时返回 ''。 """ service = _ensure_dsa_search() if not service: return "" try: intel = service.search_comprehensive_intel( stock_code, stock_name or stock_code, max_searches=3 ) if not intel: return "" lines = [f"## 📰 {stock_name or stock_code} 最新情报"] news = intel.get("latest_news") if news and news.results: lines.append("\n### 最新新闻") for r in news.results[:max_results]: date_str = f" ({r.published_date})" if r.published_date else "" snippet = r.snippet[:150] if r.snippet else "" lines.append(f"- **{r.title}**{date_str}: {snippet}") risk = intel.get("risk_check") if risk and risk.results: lines.append("\n### ⚠️ 风险关注") for r in risk.results[:3]: lines.append(f"- {r.title}: {r.snippet[:100] if r.snippet else ''}") return "\n".join(lines) except Exception as e: logger.warning("DSA 新闻搜索失败: %s", e) return "" # ── 2. 大盘复盘 ───────────────────────────────────────────────────── def get_market_review(region: str = "cn") -> str: """获取 DSA 的大盘复盘报告。 优先读本地缓存(24h内),没有则调用 DSA 实时生成。 Args: region: 'cn'=A股, 'hk'=港股, 'us'=美股, 'both'=全市场 Returns: str: Markdown 格式大盘复盘摘要 """ if not _HAS_DSA: return "" # 先读缓存 cache_dir = Path(str(_DSA_BASE)) / "data" / "market_review" if cache_dir.exists(): try: files = sorted(cache_dir.glob("*.md"), key=os.path.getmtime, reverse=True) if files: if (datetime.now().timestamp() - os.path.getmtime(str(files[0]))) < 86400: content = files[0].read_text(encoding="utf-8") lines = [l for l in content.split("\n")[:30] if len(l.strip()) > 3] return "## 📈 今日大盘背景\n" + "\n".join(lines) except Exception: pass # 实时调用 DSA try: from src.core.market_review import run_market_review class StubNotifier: def is_available(self): return False def send(self, *a, **kw): return True def save_report_to_file(self, *a, **kw): return None config = _ensure_dsa_config() result = run_market_review( notifier=StubNotifier(), config=config, override_region=region, send_notification=False, save_report_file=False, persist_history=False, trigger_source="mofin", ) if result and isinstance(result, str): lines = [l for l in result.split("\n")[:25] if len(l.strip()) > 3] return "## 📈 今日大盘复盘\n" + "\n".join(lines) except Exception as e: logger.warning("DSA 大盘复盘失败: %s", e) return "" # ── 3. 策略问股(第二意见)─────────────────────────────────────────── def get_stock_analysis( stock_code: str, stock_name: str = "", skills: list = None, ) -> dict | None: """用 DSA 的 15 种内置策略独立分析一只股票。 Args: stock_code: 股票代码 stock_name: 股票名称 skills: 策略列表,默认 ['ma_golden_cross', 'bull_trend'] Returns: dict: {source, sentiment_score, operation_advice, trend_prediction, analysis_summary, risk_warning, strategies_used, raw} """ if not _HAS_DSA: return None if not skills: skills = ["ma_golden_cross", "bull_trend"] try: from src.agent.factory import build_agent_executor executor = build_agent_executor(skills=skills) result = executor.run( task=f"分析 {stock_code} {stock_name}", context={"stock_code": stock_code, "stock_name": stock_name, "report_language": "zh"}, ) if result.success and result.dashboard: d = result.dashboard return { "source": "DSA", "sentiment_score": d.get("sentiment_score", 0), "operation_advice": d.get("operation_advice", ""), "trend_prediction": d.get("trend_prediction", ""), "analysis_summary": d.get("analysis_summary", ""), "risk_warning": d.get("risk_warning", ""), "strategies_used": skills, "raw": result.content[:500] if result.content else "", } except Exception as e: logger.warning("DSA Agent 分析 %s 失败: %s", e) return None def get_strategy_opinion_text(opinion: dict) -> str: """将 get_stock_analysis() 的结果格式化为可读文本""" if not opinion: return "" return ( f"## 🤖 DSA 策略参考\n" f"- 评分: {opinion.get('sentiment_score', '?')}/100\n" f"- 建议: {opinion.get('operation_advice', '?')}\n" f"- 趋势: {opinion.get('trend_prediction', '?')}\n" f"- 策略: {', '.join(opinion.get('strategies_used', []))}\n" f"- 摘要: {opinion.get('analysis_summary', '')}\n" f"- 风险: {opinion.get('risk_warning', '')}" ) # ── 4. 综合上下文(一键调用)───────────────────────────────────────── def enrich_analysis_context( stock_code: str = "", stock_name: str = "", region: str = "cn", include_news: bool = True, include_market: bool = True, ) -> str: """一键获取 DSA 全部分析上下文,注入 MoFin 的 LLM prompt。 在 strategy_lifecycle.reassess_with_context() 或 Hermes cron job 的 prompt 前调用。 Returns: str: 可直接拼接到 LLM prompt 的 Markdown 文本 """ parts = [] if include_market: market = get_market_review(region) if market: parts.append(market) if include_news and stock_code: news = get_stock_news(stock_code, stock_name) if news: parts.append(news) return "\n\n".join(parts) if parts else "" # ── 自检 ───────────────────────────────────────────────────────────── if __name__ == "__main__": print(f"DSA: {'available' if _HAS_DSA else 'NOT FOUND'} ({_DSA_BASE})") if _HAS_DSA: print("\n--- 新闻测试 (600519) ---") n = get_stock_news("600519", "贵州茅台", max_results=2) print(n[:300] if n else "(无结果)") print("\n--- 大盘测试 ---") m = get_market_review("cn") print(m[:300] if m else "(无结果)")