Files
MoFin/mo_bridge.py
T

158 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
mo_bridge.py — MoFin ↔ DSA 集成桥接
在 MoFin 的定时分析流程(cron_to_xmpp.py)中,
在 LLM 分析 prompt 之前注入 DSA 的宏观情报。
用法:
from mo_bridge import enrich_analysis_context
# 在 LLM 分析前调用
context = enrich_analysis_context()
if context:
prompt += f"\n\n## 今日大盘背景\n{context}"
依赖:
需要 DSA 源码 + 依赖(pip install litellm akshare yfinance 等)
未安装时优雅降级,不影响 MoFin 正常运行。
"""
import sys
import os
import json
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
# DSA 源码路径(按优先级尝试)
_DSA_CANDIDATES = [
"/home/hmo/daily-stock-analysis",
str(Path(__file__).resolve().parent.parent / "daily-stock-analysis" / "ZhuLinsen-daily_stock_analysis-a448886"),
]
_DSA_BASE = None
for _c in _DSA_CANDIDATES:
_p = Path(_c)
if _p.is_dir() and (_p / "data_provider" / "base.py").exists():
_DSA_BASE = _p
break
_HAS_DSA = _DSA_BASE is not None
def enrich_analysis_context(region: str = "cn") -> str:
"""从 DSA 获取市场背景和新闻舆情,注入 MoFin 分析上下文。
Args:
region: cn/hk/us/both — 分析哪个市场
Returns:
str: Markdown 格式的市场背景文本(可直接注入分析 prompt)
如果 DSA 不可用,返回空字符串
"""
parts = []
# 1. 大盘复盘
market_text = get_market_review(region)
if market_text:
parts.append(f"## 今日大盘背景\n{market_text}")
# 2. 搜索舆情(如果有 DSA search_service
news_text = get_news_context()
if news_text:
parts.append(f"## 今日重要新闻\n{news_text}")
return "\n\n".join(parts)
def get_market_review(region: str = "cn") -> str | None:
"""获取 DSA 市场复盘摘要"""
if not _HAS_DSA:
return None
try:
sys.path.insert(0, str(_DSA_BASE))
# 尝试从 DSA 的本地缓存中读取最近的市场复盘
from src.services.daily_market_context import DailyMarketContextService
# 先看本地是否有缓存
cache_dir = _DSA_BASE / "data" / "market_review"
if cache_dir.exists():
files = sorted(cache_dir.glob("*.md"), key=os.path.getmtime, reverse=True)
if files:
content = files[0].read_text(encoding="utf-8")
# 只取摘要部分(前 500 字)
lines = content.split("\n")
summary_lines = []
for line in lines:
if len(line.strip()) > 5:
summary_lines.append(line)
if len(summary_lines) >= 20:
break
return "\n".join(summary_lines)
# 如果没有缓存,尝试实时获取(需要 DSA 完整配置)
logger.debug("未找到 DSA 市场复盘缓存,跳过")
return None
except Exception as e:
logger.debug("获取 DSA 市场复盘失败: %s", e)
return None
finally:
# 清理 sys.path
if str(_DSA_BASE) in sys.path:
sys.path.remove(str(_DSA_BASE))
def get_news_context() -> str | None:
"""搜索今日重要财经新闻"""
# 预留接口:待 DSA 依赖安装后实现
# 目前 MoFin 的 mofin_news.py 已覆盖基本新闻需求
return None
def get_stock_fundamentals(code: str) -> dict | None:
"""通过 DSA 获取股票基本面数据"""
if not _HAS_DSA:
return None
try:
sys.path.insert(0, str(_DSA_BASE))
from mo_provider import MoDataProvider
provider = MoDataProvider()
return provider.get_fundamentals(code)
except Exception as e:
logger.debug("获取 %s 基本面失败: %s", code, e)
return None
finally:
if str(_DSA_BASE) in sys.path:
sys.path.remove(str(_DSA_BASE))
# ── 便捷入口 ──────────────────────────────────────────────────────────
def quick_summary() -> str:
"""快速获取今日分析上下文(单次调用)"""
return enrich_analysis_context()
# ── 自检 ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
print(f"DSA 可用: {_HAS_DSA}")
print(f"DSA 路径: {_DSA_BASE}")
if _HAS_DSA:
context = enrich_analysis_context()
if context:
print(f"\n=== 市场上下文 ({len(context)} 字符) ===")
print(context[:1000])
else:
print("\n无可用市场上下文(DSA 缓存为空)")
else:
print("\nDSA 不可用,跳过。部署后需安装依赖:")
print(" pip install litellm akshare yfinance baostock")