diff --git a/__pycache__/price_monitor.cpython-312.pyc b/__pycache__/price_monitor.cpython-312.pyc index 1052c04..0d03c30 100644 Binary files a/__pycache__/price_monitor.cpython-312.pyc and b/__pycache__/price_monitor.cpython-312.pyc differ diff --git a/__pycache__/strategy_lifecycle.cpython-312.pyc b/__pycache__/strategy_lifecycle.cpython-312.pyc index 1b2ab9a..80da202 100644 Binary files a/__pycache__/strategy_lifecycle.cpython-312.pyc and b/__pycache__/strategy_lifecycle.cpython-312.pyc differ diff --git a/mo_bridge.py b/mo_bridge.py index 1c62cbb..7b459f7 100644 --- a/mo_bridge.py +++ b/mo_bridge.py @@ -1,21 +1,21 @@ #!/usr/bin/env python3 """ -mo_bridge.py — MoFin ↔ DSA 集成桥接 +mo_bridge.py — MoFin ↔ DSA 全功能集成桥 -在 MoFin 的定时分析流程(cron_to_xmpp.py)中, -在 LLM 分析 prompt 之前注入 DSA 的宏观情报。 +真正调用 DSA 的三大功能: +1. 新闻搜索 — SearchService.search_comprehensive_intel()(7 个搜索引擎) +2. 大盘复盘 — run_market_review()(A股/港股/美股) +3. 策略问股 — AgentExecutor.run()(DSA 的 15 种策略,作为 MoFin 的第二意见) 用法: - from mo_bridge import enrich_analysis_context + from mo_bridge import enrich_analysis_context, get_stock_analysis - # 在 LLM 分析前调用 - context = enrich_analysis_context() - if context: - prompt += f"\n\n## 今日大盘背景\n{context}" - -依赖: - 需要 DSA 源码 + 依赖(pip install litellm akshare yfinance 等) - 未安装时优雅降级,不影响 MoFin 正常运行。 + # 在 MoFin 分析前注入 DSA 上下文(大盘 + 新闻) + ctx = enrich_analysis_context("00700", "腾讯控股", region="hk") + prompt += f"\n\n{ctx}" + + # 用 DSA 的策略做独立分析 + opinion = get_stock_analysis("600519", "贵州茅台", skills=["ma_golden_cross"]) """ import sys @@ -23,10 +23,12 @@ import os import json import logging from pathlib import Path +from datetime import datetime logger = logging.getLogger(__name__) -# DSA 源码路径(按优先级尝试) +# ── DSA 路径 ───────────────────────────────────────────────────────── + _DSA_CANDIDATES = [ "/home/hmo/daily-stock-analysis", str(Path(__file__).resolve().parent.parent / "daily-stock-analysis" / "ZhuLinsen-daily_stock_analysis-a448886"), @@ -41,117 +43,248 @@ for _c in _DSA_CANDIDATES: _HAS_DSA = _DSA_BASE is not None +if _HAS_DSA: + sys.path.insert(0, str(_DSA_BASE)) -def enrich_analysis_context(region: str = "cn") -> str: - """从 DSA 获取市场背景和新闻舆情,注入 MoFin 分析上下文。 +# ── 懒加载 DSA 模块 ────────────────────────────────────────────────── + +_dsa_search_service = None +_dsa_config = None + + +def _ensure_dsa_search(): + global _dsa_search_service + if _dsa_search_service is not None: + return _dsa_search_service + if not _HAS_DSA: + return None + try: + from src.search_service import get_search_service + _dsa_search_service = get_search_service() + except Exception as e: + logger.warning("DSA SearchService 加载失败: %s", e) + return _dsa_search_service + + +def _ensure_dsa_config(): + global _dsa_config + if _dsa_config is not None: + return _dsa_config + if not _HAS_DSA: + return None + try: + from src.config import get_config + _dsa_config = get_config() + except Exception as e: + logger.warning("DSA Config 加载失败: %s", e) + return _dsa_config + + +# ── 1. 新闻搜索 ───────────────────────────────────────────────────── + +def get_stock_news(stock_code: str, stock_name: str = "", max_results: int = 5) -> str: + """通过 DSA 的 7 个搜索引擎获取股票相关新闻。 Args: - region: cn/hk/us/both — 分析哪个市场 + stock_code: 股票代码 (如 '600519', '00700', 'AAPL') + stock_name: 股票名称 (提高搜索精度) + max_results: 最多返回条数 Returns: - str: Markdown 格式的市场背景文本(可直接注入分析 prompt) - 如果 DSA 不可用,返回空字符串 + str: Markdown 格式新闻摘要,可直接注入分析 prompt。失败时返回 ''。 """ - parts = [] + service = _ensure_dsa_search() + if not service: + return "" - # 1. 大盘复盘 - market_text = get_market_review(region) - if market_text: - parts.append(f"## 今日大盘背景\n{market_text}") + try: + intel = service.search_comprehensive_intel( + stock_code, stock_name or stock_code, max_searches=3 + ) + if not intel: + return "" + + lines = [f"## 📰 {stock_name or stock_code} 最新情报"] + + news = intel.get("latest_news") + if news and news.results: + lines.append("\n### 最新新闻") + for r in news.results[:max_results]: + date_str = f" ({r.published_date})" if r.published_date else "" + snippet = r.snippet[:150] if r.snippet else "" + lines.append(f"- **{r.title}**{date_str}: {snippet}") + + risk = intel.get("risk_check") + if risk and risk.results: + lines.append("\n### ⚠️ 风险关注") + for r in risk.results[:3]: + lines.append(f"- {r.title}: {r.snippet[:100] if r.snippet else ''}") + + return "\n".join(lines) - # 2. 搜索舆情(如果有 DSA search_service) - news_text = get_news_context() - if news_text: - parts.append(f"## 今日重要新闻\n{news_text}") - - return "\n\n".join(parts) + except Exception as e: + logger.warning("DSA 新闻搜索失败: %s", e) + return "" -def get_market_review(region: str = "cn") -> str | None: - """获取 DSA 市场复盘摘要""" +# ── 2. 大盘复盘 ───────────────────────────────────────────────────── + +def get_market_review(region: str = "cn") -> str: + """获取 DSA 的大盘复盘报告。 + 优先读本地缓存(24h内),没有则调用 DSA 实时生成。 + + Args: + region: 'cn'=A股, 'hk'=港股, 'us'=美股, 'both'=全市场 + + Returns: + str: Markdown 格式大盘复盘摘要 + """ + if not _HAS_DSA: + return "" + + # 先读缓存 + cache_dir = Path(str(_DSA_BASE)) / "data" / "market_review" + if cache_dir.exists(): + try: + files = sorted(cache_dir.glob("*.md"), key=os.path.getmtime, reverse=True) + if files: + if (datetime.now().timestamp() - os.path.getmtime(str(files[0]))) < 86400: + content = files[0].read_text(encoding="utf-8") + lines = [l for l in content.split("\n")[:30] if len(l.strip()) > 3] + return "## 📈 今日大盘背景\n" + "\n".join(lines) + except Exception: + pass + + # 实时调用 DSA + try: + from src.core.market_review import run_market_review + + class StubNotifier: + def is_available(self): return False + def send(self, *a, **kw): return True + def save_report_to_file(self, *a, **kw): return None + + config = _ensure_dsa_config() + result = run_market_review( + notifier=StubNotifier(), config=config, + override_region=region, send_notification=False, + save_report_file=False, persist_history=False, + trigger_source="mofin", + ) + if result and isinstance(result, str): + lines = [l for l in result.split("\n")[:25] if len(l.strip()) > 3] + return "## 📈 今日大盘复盘\n" + "\n".join(lines) + except Exception as e: + logger.warning("DSA 大盘复盘失败: %s", e) + + return "" + + +# ── 3. 策略问股(第二意见)─────────────────────────────────────────── + +def get_stock_analysis( + stock_code: str, + stock_name: str = "", + skills: list = None, +) -> dict | None: + """用 DSA 的 15 种内置策略独立分析一只股票。 + + Args: + stock_code: 股票代码 + stock_name: 股票名称 + skills: 策略列表,默认 ['ma_golden_cross', 'bull_trend'] + + Returns: + dict: {source, sentiment_score, operation_advice, trend_prediction, + analysis_summary, risk_warning, strategies_used, raw} + """ if not _HAS_DSA: return None + if not skills: + skills = ["ma_golden_cross", "bull_trend"] + try: - sys.path.insert(0, str(_DSA_BASE)) + from src.agent.factory import build_agent_executor - # 尝试从 DSA 的本地缓存中读取最近的市场复盘 - from src.services.daily_market_context import DailyMarketContextService - - # 先看本地是否有缓存 - cache_dir = _DSA_BASE / "data" / "market_review" - if cache_dir.exists(): - files = sorted(cache_dir.glob("*.md"), key=os.path.getmtime, reverse=True) - if files: - content = files[0].read_text(encoding="utf-8") - # 只取摘要部分(前 500 字) - lines = content.split("\n") - summary_lines = [] - for line in lines: - if len(line.strip()) > 5: - summary_lines.append(line) - if len(summary_lines) >= 20: - break - return "\n".join(summary_lines) - - # 如果没有缓存,尝试实时获取(需要 DSA 完整配置) - logger.debug("未找到 DSA 市场复盘缓存,跳过") - return None + executor = build_agent_executor(skills=skills) + result = executor.run( + task=f"分析 {stock_code} {stock_name}", + context={"stock_code": stock_code, "stock_name": stock_name, "report_language": "zh"}, + ) + if result.success and result.dashboard: + d = result.dashboard + return { + "source": "DSA", + "sentiment_score": d.get("sentiment_score", 0), + "operation_advice": d.get("operation_advice", ""), + "trend_prediction": d.get("trend_prediction", ""), + "analysis_summary": d.get("analysis_summary", ""), + "risk_warning": d.get("risk_warning", ""), + "strategies_used": skills, + "raw": result.content[:500] if result.content else "", + } except Exception as e: - logger.debug("获取 DSA 市场复盘失败: %s", e) - return None - finally: - # 清理 sys.path - if str(_DSA_BASE) in sys.path: - sys.path.remove(str(_DSA_BASE)) - - -def get_news_context() -> str | None: - """搜索今日重要财经新闻""" - # 预留接口:待 DSA 依赖安装后实现 - # 目前 MoFin 的 mofin_news.py 已覆盖基本新闻需求 + logger.warning("DSA Agent 分析 %s 失败: %s", e) + return None -def get_stock_fundamentals(code: str) -> dict | None: - """通过 DSA 获取股票基本面数据""" - if not _HAS_DSA: - return None +def get_strategy_opinion_text(opinion: dict) -> str: + """将 get_stock_analysis() 的结果格式化为可读文本""" + if not opinion: + return "" + return ( + f"## 🤖 DSA 策略参考\n" + f"- 评分: {opinion.get('sentiment_score', '?')}/100\n" + f"- 建议: {opinion.get('operation_advice', '?')}\n" + f"- 趋势: {opinion.get('trend_prediction', '?')}\n" + f"- 策略: {', '.join(opinion.get('strategies_used', []))}\n" + f"- 摘要: {opinion.get('analysis_summary', '')}\n" + f"- 风险: {opinion.get('risk_warning', '')}" + ) + + +# ── 4. 综合上下文(一键调用)───────────────────────────────────────── + +def enrich_analysis_context( + stock_code: str = "", + stock_name: str = "", + region: str = "cn", + include_news: bool = True, + include_market: bool = True, +) -> str: + """一键获取 DSA 全部分析上下文,注入 MoFin 的 LLM prompt。 - try: - sys.path.insert(0, str(_DSA_BASE)) - from mo_provider import MoDataProvider - provider = MoDataProvider() - return provider.get_fundamentals(code) - except Exception as e: - logger.debug("获取 %s 基本面失败: %s", code, e) - return None - finally: - if str(_DSA_BASE) in sys.path: - sys.path.remove(str(_DSA_BASE)) + 在 strategy_lifecycle.reassess_with_context() 或 Hermes cron job 的 prompt 前调用。 + + Returns: + str: 可直接拼接到 LLM prompt 的 Markdown 文本 + """ + parts = [] + + if include_market: + market = get_market_review(region) + if market: + parts.append(market) + + if include_news and stock_code: + news = get_stock_news(stock_code, stock_name) + if news: + parts.append(news) + + return "\n\n".join(parts) if parts else "" -# ── 便捷入口 ────────────────────────────────────────────────────────── - -def quick_summary() -> str: - """快速获取今日分析上下文(单次调用)""" - return enrich_analysis_context() - - -# ── 自检 ────────────────────────────────────────────────────────────── +# ── 自检 ───────────────────────────────────────────────────────────── if __name__ == "__main__": - print(f"DSA 可用: {_HAS_DSA}") - print(f"DSA 路径: {_DSA_BASE}") - + print(f"DSA: {'available' if _HAS_DSA else 'NOT FOUND'} ({_DSA_BASE})") if _HAS_DSA: - context = enrich_analysis_context() - if context: - print(f"\n=== 市场上下文 ({len(context)} 字符) ===") - print(context[:1000]) - else: - print("\n无可用市场上下文(DSA 缓存为空)") - else: - print("\nDSA 不可用,跳过。部署后需安装依赖:") - print(" pip install litellm akshare yfinance baostock") + print("\n--- 新闻测试 (600519) ---") + n = get_stock_news("600519", "贵州茅台", max_results=2) + print(n[:300] if n else "(无结果)") + print("\n--- 大盘测试 ---") + m = get_market_review("cn") + print(m[:300] if m else "(无结果)") diff --git a/strategy_lifecycle.py b/strategy_lifecycle.py index ea812b6..63a52fd 100644 --- a/strategy_lifecycle.py +++ b/strategy_lifecycle.py @@ -1282,6 +1282,17 @@ def reassess_with_context(code, name, price, cost, shares, current_action, news_sentiment = {} fund = {} + # ── DSA 集成:注入大盘复盘 + 新闻情报 ────────────────────────── + try: + from mo_bridge import enrich_analysis_context + region = "hk" if len(str(code)) == 5 and str(code)[0] in ('0','1') else "cn" + dsa_ctx = enrich_analysis_context(stock_code=code, stock_name=name, + region=region, include_news=True) + if dsa_ctx: + macro_desc = (macro_desc + "\n\n" + dsa_ctx).strip() + except Exception: + pass # DSA 不可用时静默跳过 + enriched, factors = enrich_timing_signal( base_signal=result.get("timing_signal", ""), macro_desc=macro_desc,