diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5b0d0ae --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +__pycache__/ +*.pyc +*.pyo +data/reports/ +data/temp_realtime.json +.DS_Store +*.swp +*.swo diff --git a/EXPERT_SYSTEM_DESIGN.md b/EXPERT_SYSTEM_DESIGN.md new file mode 100644 index 0000000..a720718 --- /dev/null +++ b/EXPERT_SYSTEM_DESIGN.md @@ -0,0 +1,418 @@ +# MoFin 专家系统 — 完整架构文档 v2.0 + +> 最后更新:2026-06-12 +> 维护人:莫荷(Hermes Agent) +> 铁律:任何系统改动必须先读本文档,改完必须同步更新 + +--- + +## 一、核心理念 + +``` +每份分析都成为下一次分析的养料 +每次建议都成为下一次建议的参考 +从内到外(知识→分析),从外到内(分析→沉淀)的持续闭环 +``` + +## 二、股票操作策略生命周期 + +``` + ┌─────────────────────────────┐ + │ 大环境+行业趋势研判 │ + │ (盘前热点扫描·每天8:30) │ + │ 板块扫描→发现新标的 │ + └──────────┬──────────────────┘ + │ 写入 watchlist.json + ▼ + ┌─────────────────────────────┐ + │ 自选股池 (watchlist.json) │ + │ │ + │ 🟢 可操作 — 价格在买入区内 │ + │ 🟡 关注 — 距触发3~5% │ + │ ⚪ 观察 — 距触发>5% │ + │ │ + │ 监控: │ + │ • 快速盯盘(每15分钟) │ + │ • 持仓情报-盘后(每天) │ + │ • 自选股体检(每周六) │ + └──────────┬──────────────────┘ + 进入买入区 │ 建仓 + ▼ + ┌─────────────────────────────┐ + │ 持仓股 (portfolio.json) │ + │ │ + │ 监控: │ + │ • 价格监控(每分钟·纯脚本) │ + │ • 快速盯盘(每15分钟·LLM) │ + │ • 持仓情报-盘中(每小时·LLM) │ + │ • 持仓情报-盘后(每天·LLM) │ + │ • 分析师-持仓复查(每周·LLM) │ + └──────────┬──────────────────┘ + 触发止损/止盈/清仓 │ + ▼ + ┌─────────────────────────────┐ + │ 回到自选股池 │ + │ (保留策略,等待下次机会) │ + └─────────────────────────────┘ +``` + +## 三、系统架构总览 + +``` +┌──────────────────────────────────────────────────────────────────┐ +│ MoFin 专家系统 v2.0 │ +├──────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────────┐ ┌──────────────────────────────┐ │ +│ │ 基础设施层(纯脚本) │ │ 分析层(LLM驱动) │ │ +│ │ 0配额消耗 │ │ 每次运行消耗1次配额 │ │ +│ │ │ │ │ │ +│ │ • 价格监控(1分钟) │ │ • 盘前热点扫描(8:30) │ │ +│ │ 腾讯批量API拉所有股票 │ │ → 持仓分析+板块扫描 │ │ +│ │ 写入portfolio/watchlist│ │ → 新机会发现→写入watchlist│ │ +│ │ 比对区间触发→输出 │ │ │ │ +│ │ │ │ • 集合竞价观察(9:28) │ │ +│ │ • 市场数据采集(30分钟) │ │ → A股竞价异常监控 │ │ +│ │ 板块数据→market.json │ │ │ │ +│ │ │ │ • 快速盯盘(每15分钟) │ │ +│ │ • 数据同步(8:55) │ │ → 持仓操作窗口+自选接近提醒 │ │ +│ │ update_data+server检查│ │ │ │ +│ │ │ │ • 持仓情报-盘中(每小时40分) │ │ +│ │ • XMPP中继(每分钟) │ │ → 详细盘中分析 │ │ +│ │ 扫输出目录→推知微 │ │ │ │ +│ │ │ │ • 持仓情报-盘后(20:00) │ │ +│ │ • 港股低延迟行情中继 │ │ • 分析师-持仓复查(周四20:00) │ │ +│ │ tdx-relay (Windows) │ │ → 基本面深度复查 │ │ +│ │ opentdx→招商证券7727 │ │ │ │ +│ │ POST → /api/update/ │ │ │ │ +│ │ realtime │ │ │ │ +│ └─────────┬───────────────┘ │ │ │ +│ │ │ │ │ +│ ▼ │ │ │ +│ ┌─────────────────────────┐ │ │ │ +│ │ 数据层 │ │ │ │ +│ │ portfolio.json(持仓) │ │ │ │ +│ │ watchlist.json(自选) │ │ → 每只自选评估:保留/移除 │ │ +│ │ decisions.json(决策库) │ │ │ │ +│ │ market.json(板块数据) │ │ • 知微周复盘(周日22:00) │ │ +│ │ daily_reviews.json │ │ → 周度总结 │ │ +│ │ stock_profiles.json │ └──────────┬───────────────────┘ │ +│ └─────────┬───────────────┘ │ │ +│ │ │ │ +│ ▼ ▼ │ +│ ┌──────────────────────────────────────────────────────────┐ │ +│ │ 推送层 │ │ +│ │ cron_to_xmpp.py(每分钟·纯脚本) │ │ +│ │ 扫描 ~/.hermes/cron/output/*/ 目录 │ │ +│ │ 发现新 .md 文件 → 提取正文 → XMPP推送【知微】 │ │ +│ │ 已推送的记在 .relay_journal.json,不重复推 │ │ +│ └──────────────────────────────────────────────────────────┘ │ +└──────────────────────────────────────────────────────────────────┘ +``` + +## 四、完整 Cron Job 清单(14个) + +### 🟢 基础设施层(纯脚本,0配额消耗) + +| # | 名称 | 频率 | 脚本 | 输出 | 说明 | +|---|------|------|------|------|------| +| 1 | **价格监控-1分钟** | 工作日 每分钟(9-11/12/13-16) | `price_monitor.py` | 有触发→推送;无→SILENT | 腾讯批量API拉所有股票实时价,写入portfolio/watchlist,比对区间触发条件 | +| 2 | **市场数据采集** | 工作日 每30分钟 | `market_watch.py` | 成功→SILENT;失败→报错 | 东方财富API采集板块数据→market.json | +| 3 | **数据同步-dashboard** | 每天 8:55 | `sync_dashboard.py` | 有数据→推送;无→SILENT | 跑update_data.py+检查server,挂了自动重启 | +| 4 | **cron-推XMPP中继** | 工作日 每分钟(9-16) | `cron_to_xmpp.py` | 有新报告→推送;无→静默 | 扫所有job输出目录,推新报告到XMPP | +| 5 | **中继-16点收盘** | 工作日 16:00-16:10 | `cron_to_xmpp.py` | 同上 | 收盘时段推港股报告 | + +### 🔵 分析层(LLM驱动,每次1次配额) + +| # | 名称 | 频率 | 关键职责 | 价格获取方式 | +|---|------|------|---------|------------| +| 6 | **盘前热点扫描** | 工作日 8:30 | 持仓分析+**板块扫描→新机会发现→写入watchlist** | 读文件(price_monitor刷新) | +| 7 | **集合竞价观察** | 工作日 9:28 | A股竞价异常监控,无异常SILENT | A股curl竞价价+港股读文件 | +| 8 | **快速盯盘-15分钟** | 工作日 每15分钟 | 持仓操作窗口+**自选股接近买入区提醒**,无则SILENT | 读文件 | +| 9 | **持仓情报-盘中** | 工作日 每小时40分 | 详细盘中分析+异动搜因 | 读文件 | +| 10 | **持仓情报-盘后** | 工作日 20:00 | 完整复盘+**自选股回顾**+数据沉淀daily_reviews | 读文件 | +| 11 | **分析师-持仓复查** | 周四 20:00 | 每周基本面深度复查 | 读文件 | +| 12 | **自选股体检-每周** | **周六 20:00 🆕** | 每只自选评估:已建仓→移除、>30天未接近→建议移除、行业变化→更新理由 | 读文件+web_search | +| 13 | **知微周复盘** | 周日 22:00 | 周度总结 | 读文件 | +| 14 | **策略评估-每日** | 工作日 21:00 | 双维度评估36条策略,生成反馈闭环 | 读文件+LLM | 无变化→SILENT | +| 15 | **建议对账-每周** | 周六 20:00 | 对比建议vs实际持仓变化,算准确率 | 纯脚本 | 有数据→推送 | +| 16 | **策略评估-每周** | 周六 21:00 | 完整评分+趋势分析+策略参数调整 | 纯脚本 | 有数据→推送 | + +## 五、数据文件 + +| 文件 | 路径 | 作用 | 更新频率 | 更新者 | +|------|------|------|---------|--------| +| 持仓数据 | `data/portfolio.json` | 持仓快照+实时价 | 每分钟 | price_monitor | +| 自选数据 | `data/watchlist.json` | 自选股列表+买入区+实时价 | 每分钟 | price_monitor+盘前扫描+自选体检 | +| 决策库 | `data/decisions.json` | 策略+trigger+建议历史+评估数据 | 每次讨论后+每日评估 | LLM job+策略评估 | +| 每日复盘 | `data/daily_reviews.json` | 当日判断+修正沉淀 | 每日20:00 | 持仓情报-盘后 | +| 股票档案 | `data/stock_profiles.json` | 行业/业务/逻辑 | 发现新信息时 | LLM | +| 市场数据 | `data/market.json` | 板块涨跌+概念热度 | 每30分钟 | market_watch | +| 价格事件 | `data/price_events.json` | 价格触发记录(止盈/止损/买入区) | 每分钟 | price_monitor | +| 评估结果 | `data/evaluation.json` | 双维度评估+评分 | 每天21:00 | 策略评估 | +| 准确率统计 | `data/accuracy_stats.json` | 建议对账+准确率 | 每周六20:00 | 对账脚本 | +| 反馈闭环 | `data/strategy_feedback.json` | 反馈+调整建议+知识萃取 | 每天21:00 | 策略评估 | +| 区间偏离 | `~/.hermes/zone_breach.json` | 价格偏离标记 | 每分钟 | price_monitor | +| 触发状态 | `~/.hermes/price_trigger_state.json` | 每只股票各区间的进出状态 | 每分钟 | price_monitor | +| relay状态 | `data/relay_state.json` (新增) | tdx-relay在线/离线状态 | 每分钟 | server.py(update/realtime更新) | + +## 六、数据流转 + +``` +tdx-relay (Windows·xxm负责) + │ opentdx → 招商证券7727扩展行情服务 + │ 拉取17只港股实时行情(1~3秒延迟) + │ POST → http://192.168.1.246:8899/api/update/realtime + │ + ├──→ server.py 接收 → 更新 portfolio.json / watchlist.json + │ 港股 data_source = "tdx_relay" + │ 更新 relay_state.json + │ + └──→ price_monitor 检测 relay 状态 + relay 在线 → 跳过港股腾讯API拉取(保留实时价) + relay 离线(>60秒) → 回退腾讯 API 兜底 + +price_monitor(每分钟) + │ 腾讯批量API 1次请求 + │ + ├──→ portfolio.json (持仓实时价) + ├──→ watchlist.json (自选实时价) + ├──→ 比对decisions.json的trigger区间 + ├──→ 有触发 → 输出 → 中继 → 知微 + └──→ 无触发 → [SILENT] → 丢弃 + +market_watch(每30分钟) + │ 东方财富API + └──→ market.json (板块数据) + +盘前热点扫描(每天8:30) + │ 读 market.json → 分析强势板块 + │ 发现新标的 → 写入 watchlist.json + └──→ 输出报告 → 中继 → 知微 + +快速盯盘(每15分钟) + │ 读 portfolio.json + watchlist.json + │ 持仓操作窗口 + 自选接近买入区 + └──→ 有发现 → 输出 → 中继 → 知微 + +持仓情报-盘后(每天20:00) + │ 读 portfolio.json + watchlist.json + │ 持仓回顾 + 自选回顾 + └──→ 写入 daily_reviews.json + └──→ 输出报告 → 中继 → 知微 + +自选股体检(每周六20:00) + │ 读 watchlist.json + portfolio.json + │ 每只评估:保留/移除/更新 + └──→ 输出报告 → 中继 → 知微 +``` + +## 七、报告推送策略 + +| 报告 | 推送 | 频率 | 静默规则 | +|------|------|------|---------| +| 价格监控 | ✅ 有触发才推 | 每分钟 | 无触发→[SILENT] | +| 盘前扫描 | ✅ | 每日1次 | 有发现才写 | +| 集合竞价 | ✅ 有异常才推 | 每日1次 | 无异常→SILENT | +| 快速盯盘 | ✅ 有发现才推 | 每15分钟 | 无操作窗口+无自选接近→SILENT | +| 持仓情报-盘中 | ✅ 有发现才推 | 每小时40分 | 无操作窗口→合并后停止 | +| 持仓情报-盘后 | ✅ | 每日20:00 | 有实质内容才写 | +| 分析师复查 | ✅ | 每周四 | 有发现才写 | +| 自选股体检 | ✅ | 每周六 | 有建议才写 | +| 知微周复盘 | ✅ | 每周日 | 有实质内容才写 | +| 市场数据采集 | ❌ 不推 | 每30分钟 | 只写文件 | +| 数据同步 | ✅ 有数据才推 | 每天8:55 | 无更新→SILENT | + +## 八、关键设计决策 + +| 方面 | 决策 | 原因 | +|------|------|------| +| 价格获取 | 腾讯批量API `qt.gtimg.cn/q=sh600110,sz000001,...` | 1次请求拉所有股票,无频率限制 | +| 价格监控类型 | 纯脚本(`no_agent=True`) | 不需要LLM,0配额消耗 | +| 中继类型 | 纯脚本(`no_agent=True`) | 文件扫描+XMPP推送,不需要LLM | +| 市场数据采集 | 纯脚本(`no_agent=True`) | 只写文件不输出,不需要LLM | +| 数据同步 | 纯脚本(`sync_dashboard.py`) | 包装脚本含server检查,不需要LLM | +| LLM job价格来源 | 读文件(price_monitor每1分钟刷新) | 不再逐个curl,节省配额 | +| 新机会发现 | 合并到盘前热点扫描(每天) | 每天分析板块数据,发现新标的写入watchlist | +| 自选股进出管理 | 自选股体检(每周六) | 系统评估+提请确认,不自动删除 | +| 午休 | 价格监控跳过12:05-12:55(保留12:00) | A股午休无行情 | + +## 九、配额消耗估算(一个上午) + +| 组件 | 运行次数 | 配额消耗 | +|------|---------|---------| +| 价格监控 | ~240次(每分钟) | **0** (纯脚本) | +| 市场数据采集 | ~10次 | **0** (纯脚本) | +| 数据同步 | 1次 | **0** (纯脚本) | +| XMPP中继 | ~240次 | **0** (纯脚本) | +| 盘前热点扫描 | 1次 | **1次** | +| 集合竞价 | 1次 | **1次** | +| 快速盯盘 | ~20次 | **~20次** (有操作窗口才跑,无则SILENT不消耗) | +| 持仓情报-盘中 | ~6次 | **~6次** | +| 策略评估-每日 | 1次 | **1次** | +| **总计** | | **~29次 LLM调用** | + +--- + +## 十、策略评估反馈闭环架构 + +### 核心流程 + +``` +price_monitor(每分钟·纯脚本) + │ 腾讯批量API拉所有股票实时价 + │ 比对策略区间 → 触发 → 记录到 price_events.json + │ + ├──→ 价格触发事件持久化(price_events.json) + │ event_type: entry_zone / stop_loss / take_profit + │ code, name, price, trigger_value, timestamp + │ + ▼ +strategy_evaluator(每天21:00·纯脚本) + │ 读 decisions.json + price_events.json + portfolio.json + │ 双维度评估每只股票: + │ 阶段一(策略制定→价格达标): + │ - 理论:价格是否达到过止盈/止损/买入区 + │ - 实际:持仓盈亏、建议执行情况 + │ 阶段二(价格回落→新止损验证): + │ - 新止损是否被后续走势验证 + │ 写入 decisions.json 的 evaluation 字段 + │ + ├──→ 写入 evaluation.json + │ + ▼ +strategy_feedback(每天21:00·纯脚本,评估后自动) + │ 读 evaluation.json + │ 自动决策: + │ - 价格达到止盈 → 标记阶段一成功,萃取经验 + │ - 跌破止损 → 标记阶段一失败,生成新区间 + │ - 14天无触发 → 标记 stale,建议重评 + │ - 准确率持续下降 → 收紧区间宽度 + │ 写入 strategy_feedback.json + │ + ├──→ 写入 strategy_feedback.json + │ + ▼ +每日评估 cron(每天21:00·LLM驱动) + │ 读 strategy_feedback.json + │ 处理反馈 → 更新策略 → 输出日报 + │ 无变化 → [SILENT] + │ + ▼ +盘后知识萃取(每天16:30·LLM驱动) + │ 读 feedback 中的 knowledge 字段 + │ 写入 analyst-knowledge-log.md + │ + ▼ +建议对账(每周六20:00·纯脚本) + │ 对比 advice_timeline vs portfolio.json + │ 算准确率 → 写入 accuracy_stats.json + │ + ▼ +策略评估-每周(每周六21:00·纯脚本) + │ 完整评分 → 趋势分析 → 策略参数调整 + │ 写入 evaluation.json + strategy_feedback.json +``` + +### 双维度评估模型 + +| 维度 | 阶段一 | 阶段二 | +|------|--------|--------| +| 理论(策略规划) | 价格是否达到止盈/止损/买入区?何时?理论盈亏? | 新止损是否被后续走势验证? | +| 实际(用户执行) | 实际持仓盈亏?建议是否被采纳? | 用户是否按新止损操作?实际损失? | + +### 反馈引擎自动决策规则 + +| 检测条件 | 自动动作 | +|----------|----------| +| 价格达到止盈位 | 标记阶段一成功,萃取经验到知识日志,创建新自选策略 | +| 跌破止损位 | 标记阶段一失败,分析原因,生成新止损/买入区间 | +| 策略14天未触发任何区间 | 标记为 stale,建议重新评估基本面和技术面 | +| 准确率持续下降(<50%) | 收紧策略区间宽度(10%→8%) | +| 准确率持续上升(>80%) | 放宽策略区间宽度(10%→12%) | + +--- + +## 十、v2.0 更新说明(2026-06-09) + +本次更新基于知微与莫荷、老爸的深入讨论,对系统做了全面重构。以下记录讨论过程中的关键意图、决策和待办事项。 + +### 本次改动驱动因素 + +1. **火山引擎(volcengine) 429配额超限** → 触发对 provider 配置和 fallback 链的全面梳理 +2. **发现价格监控逐个curl拉取价格** → 效率极低且浪费配额,触发了对价格获取方式的全面改造 +3. **知微反馈"问了不在回答长篇报告"** → 触发了对 session 管理、system prompt、对话识别机制的修复 +4. **老爸指出持仓和自选股之间的动态生命周期关系** → 触发了对股票操作策略完整生命周期的重新设计 + +### 已完成的改造 + +**基础设施层:** +- 价格监控从 LLM驱动(每5分钟) 改为 纯脚本(每1分钟),使用腾讯批量API一次拉所有股票 +- XMPP中继从 LLM驱动 改为 纯脚本(每1分钟) +- 市场数据采集从 LLM驱动 改为 纯脚本 +- 数据同步从 LLM驱动 改为 纯脚本(含server检查包装脚本) +- 所有LLM job的价格获取从"逐个curl拉取"改为"直接读文件(price_monitor每1分钟刷新)" + +**股票生命周期完善:** +- 新机会发现:合并到盘前热点扫描(每天8:30),板块扫描→发现新标的→写入watchlist +- 自选接近提醒:快速盯盘(每15分钟)新增自选接近买入区(<5%)单独提醒 +- 自选回顾:持仓情报-盘后(每天20:00)新增自选股回顾板块 +- 自选体检:新增每周六20:00的自动评估+建议移除/保留 + +**provider配置统一:** +- 莫荷(默认profile):默认ocg-old(旧key) → fallback ocg-new(新key) → volcengine(火山) +- 知微(position-analyst):默认volcengine(火山) → fallback ocg-old(旧key) → ocg-new(新key) +- 火山已恢复可用 + +**对话机制修复:** +- 知微SOUL新增对话上下文识别规则(打招呼vs分析请求vs模糊提问) +- 知微SOUL新增系统自动追加消息(bg-review)说明 +- position-analyst启用压缩(protect_last_n=200, hygiene_hard_message_limit=400) + +**策略评估反馈闭环(v2.0 深夜追加):** +- 价格事件持久化:price_monitor每分钟记录价格触发事件到price_events.json +- 双维度评估:strategy_evaluator每天评估36条策略,分阶段一(策略制定→价格达标)和阶段二(价格回落→新止损验证),每个阶段同时记录理论盈亏和实际盈亏 +- 反馈引擎:strategy_feedback自动检测止盈/止损/stale策略,生成调整建议,萃取知识到知识日志 +- Dashboard评估tab:展示评分、理论vs实际盈亏对比、反馈闭环卡片 +- 建议对账:advice_reconciliation.py每周对比建议vs实际持仓变化算准确率 +- 36条策略全部填充建议记录,平均分4.8/10 + +### 设计原则/意图 + +1. **知微是分析师,不是决策者** — 她出建议,老爸决定是否执行,是否执行的唯一真相来源是截图/持仓更新 +2. **莫荷是参谋,不是执行者** — 莫荷出报告和建议,不做交易操作 +3. **建议≠事实** — 不能假设建议=被执行,需要等持仓更新来验证 +4. **持仓/自选动态循环** — 持仓清仓→自动转自选保留策略,自选建仓→自动转为持仓管理 +5. **自选股进出需人工确认** — 系统可以建议移除,但不能自动删除,需老爸确认 +6. **轻对话,重分析** — 不是每条消息都需要全面分析报告,区分打招呼和干活 + +### 尚未完成/待定事项 + +1. **建议自动记录+对账闭环** — 莫荷发出的建议自动记入decisions.json的advice_timeline,每周对比实际持仓变化,统计准确率(✅ 已完成:/api/advice/record + advice_reconciliation.py + accuracy_stats.json) +2. **MoFin Dashboard整合** — server.py应整合所有数据源,包括待确认决策、准确率统计等(✅ 已完成:决策库tab + 评估tab + 反馈闭环tab) +3. **知微XMPP通信通道优化** — 当前xmpp_zhiwei_bot.py断线重连机制有缺陷(Event().wait阻塞),需修复(✅ 已完成:watchdog循环每10秒检查连接状态) +4. **session跨profile检索** — 知微无法搜到莫荷CLI session的内容,需手动转发(✅ 已完成:session_search profile参数可用) +5. **策略双维度评估体系** — 理论策略vs实际执行,分阶段跟踪,自动反馈闭环(✅ 已完成:strategy_evaluator.py + strategy_feedback.py + 每日/每周cron) +6. **价格事件持久化** — price_monitor每分钟记录价格触发事件供回溯评估(✅ 已完成:price_events.json + record_event()) + +### tdx-relay 集成(小小莫Windows端 + 知微Linux端) + +详见单独文档:`TDX_RELAY_COLLAB.md` + +职责分工: +- **小小莫**:Windows端 tdx-relay 开发维护(opentdx→招商证券7727→港股实时行情推送) +- **知微**:Linux端 MoFin API 兼容(/api/update/realtime 已实现,/api/relay/status 待实现)+ price_monitor relay 检测+回退逻辑 + +当前状态: +- ✅ `/api/update/realtime` 端点已在 server.py 中实现 +- ⏳ `relay_state.json` 持久化(待实现) +- ⏳ `/api/relay/status` GET 端点(待实现) +- ⏳ `price_monitor.py` relay 活跃检测(待实现) +- ⏳ relay 离线时自动回退腾讯 API(待实现) + +### 正在进行的优化(2026-06-12) + +1. **报告格式精简** — 老爸反复强调太长无重点。已改为:重点推荐(≤3只)+风险关注(≤3只)+其余持仓一行。全文≤500字。 +2. **技术面分析取代百分比策略** — 支撑/压力位用枢轴点算法,止损=min(强支撑, 成本×0.85),买入区=弱撑~弱压之间,止盈在盈亏比≥1:2处。 +3. **MoFin 决策库重构** — 清空旧记录(标记obsolete),从6月11日起重新记录。新增 active_manual / current_recommend 标签,支持按此筛选排序。新增搜索框。 +4. **格式校验器** — cron_to_xmpp.py 新增 validate_format(),拦截无重点/超长/含模棱两可词语的报告。记录到 daily 汇总(不拦截推送,只记录违规)。 diff --git a/SYSTEM_ARCHITECTURE.md b/SYSTEM_ARCHITECTURE.md new file mode 100644 index 0000000..de0095b --- /dev/null +++ b/SYSTEM_ARCHITECTURE.md @@ -0,0 +1,221 @@ +# 莫荷系统架构文档 — 完整总览 + +> 最后更新:2026-06-11 +> 维护人:莫荷(Hermes Agent) +> 铁律:任何系统改动必须先读本文档,改完必须同步更新 + +--- + +## 一、系统总览 + +``` +┌──────────────────────────────────────────────────────────────┐ +│ Linux 192.168.1.246 │ +│ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ 默认gateway │ │ 知微gateway │ │ 小果gateway │ │ +│ │ :8642 │ │ :8643 │ │ :8645 │ │ +│ │ 微信+XMPP │ │ position- │ │ xiaoguo │ │ +│ │ mohe网关 │ │ analyst │ │ profile │ │ +│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ +│ │ │ │ │ +│ ▼ ▼ ▼ │ +│ ┌─────────────────────────────────────────────────┐ │ +│ │ state.db (SQLite) │ │ +│ │ sessions / messages / FTS5 / compression_locks │ │ +│ │ 消息存储:全量保存,永不删除 │ │ +│ │ 上下文加载:最多200条,永不压缩 │ │ +│ └─────────────────────────────────────────────────┘ │ +│ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ xmpp_bot │ │ xmpp_ │ │ xmpp_ │ │ +│ │ mohe │ │ zhiwei_bot │ │ xiaoguo_bot │ │ +│ │ mohe@yoin │ │ zhiwei@yoin │ │ xiaoguo@ │ │ +│ │ .fun │ │ .fun │ │ yoin.fun │ │ +│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ +│ │ │ │ │ +│ └──────┬─────────┴────────┬───────┘ │ +│ ▼ ▼ │ +│ ┌──────────────┐ ┌──────────────┐ │ +│ │ ejabberd │ │ 内核组 │ │ +│ │ Docker │ │ coregroup@ │ │ +│ │ port 5222 │ │ conference │ │ +│ └──────────────┘ │ .yoin.fun │ │ +│ └──────────────┘ │ +│ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ 价格监控 │ │ cron调度器 │ │ Obsidian │ │ +│ │ 1分钟·纯脚本 │ │ 14个jobs │ │ 知识库 │ │ +│ └─────────────┘ └─────────────┘ │ :8890 │ │ +│ └─────────────┘ │ +└──────────────────────────────────────────────────────────────┘ + │ │ + ▼ ▼ +┌─────────────────┐ ┌─────────────────┐ +│ Windows 192.168 │ │ Mac 192.168.1 │ +│ .1.16 │ │ .122 │ +│ 小小莫(wechat) │ │ 小果(oMLX) │ +│ OpenCode :4096 │ │ Qwen3.6-27B │ +│ 微信通道 :5801 │ │ :18003 │ +└─────────────────┘ └─────────────────┘ +``` + +## 二、Gateway 一览 + +| 端口 | 名称 | Profile | PID(当前) | 用途 | +|------|------|---------|-----------|------| +| 8642 | 默认gateway | 默认 | 1925504 | 微信小荷 + XMPP mohe | +| 8643 | 知微gateway | position-analyst | 1913506 | 知微分析 | +| 8645 | 小果gateway | xiaoguo | 1925602 | 小果Mac端 | +| 8646 | mohe gateway | mohe | 1620276 | mohe独立网关 | + +每个gateway共用 `/home/hmo/hermes-agent/hermes_state.py` 里的 `get_messages_as_conversation()` — **LIMIT 200硬截断**。 + +## 三、XMPP Bot 架构 + +### 3.1 Bot 列表 + +| Bot | JID | 服务名 | 脚本路径 | 接入gateway | +|-----|-----|--------|---------|------------| +| 莫荷 | mohe@yoin.fun | xmpp-bot | /home/hmo/xmpp_bot.py | :8642 | +| 知微 | zhiwei@yoin.fun | xmpp-zhiwei | /home/hmo/xmpp_zhiwei_bot.py | :8643 | +| 小果 | xiaoguo@yoin.fun | xmpp-xiaoguo | /home/hmo/xmpp_xiaoguo_bot.py | :8645 | + +### 3.2 连接管理(2026-06-11 修复) + +**禁用** `auto_reconnect = True`(与手动重连环冲突,导致"Replaced by new connection"循环) +**禁用** `xep_0199` ping 保活(ejabberd不支持,导致ping超时→误判断线) + +**重连机制**: +- 主循环每15秒检查 `is_connected()` +- 断线后指数退避重连:1s → 2s → 4s → ... → 60s max +- 重连后自动重新加入 MUC(内核组 coregroup@conference.yoin.fun) + +**历史问题**: +- 2026-06-08: bot断线后无法自动重连,session膨胀到3700条/26M tokens +- 2026-06-10: auto_reconnect导致10个重复连接 +- 2026-06-11: 修复auto_reconnect冲突 + API key拼写错误 + +### 3.3 群聊规则 + +Bot只回复内核组中来自 `hmo` 或 `xxm` 的消息。私聊只回复 `hmo@yoin.fun`。 + +## 四、Session 管理 —— 核心设计(2026-06-10 最终方案) + +### 4.1 方案:硬截断200条 + 永不压缩 + +```python +# hermes_state.py → get_messages_as_conversation() +SELECT id, role, content, ... +FROM ( + SELECT id, role, content, ... + FROM messages WHERE session_id = ? + AND active = 1 + ORDER BY id DESC LIMIT 200 ← 只取最近200条 +) ORDER BY id ASC ← 按正序排回 +``` + +### 4.2 Compression 配置(所有profile统一) + +```yaml +compression: + enabled: false ← 永久关闭 + threshold: 0.99 + protect_last_n: 200 + hygiene_hard_message_limit: 100000 +``` + +### 4.3 效果 + +| 指标 | 之前 | 之后 | +|------|------|------| +| 每次请求token | 26M(全量加载) | ~22K(200条) | +| 上下文窗口用量 | 2500% | 2.2% | +| 响应时间 | 10分钟+超时 | 10-20秒 | +| 内容丢失 | 压缩丢细节 | 永不丢失 | +| 旧消息可查 | 压缩后摘要 | 全量DB可搜 | + +### 4.4 Session 列表(当前) + +| Session ID | 消息数 | 用途 | +|-----------|--------|------| +| sisyphus | 9504 | 微信(旧session,已重建) | +| xmpp-mohe | 3705 | XMPP mohe(旧session) | +| xmpp-mohe-v2 | ~200 | XMPP mohe(新session,LIMIT 200) | +| xmpp-zhiwei | 2241 | 知微 | +| 20260610_090241_2235fb | ~900 | 当前CLI会话 | + +## 五、Provider 链(2026-06-10 最终版) + +| Agent | 默认 | Fallback 1 | Fallback 2 | Fallback 3 | +|-------|------|-----------|-----------|-----------| +| **我(CLI)** | ocg-new | ocg-old | volcengine | - | +| **mohe gateway** | ocg-new | ocg-old | volcengine | - | +| **知微** | ocg-old | ocg-new | volcengine(cred池) | - | +| **小果** | volcengine | ocg-old | ocg-new | oMLX(本地Mac) | + +**当前实际状态(2026-06-11):** +- ocg-new: ✅ 可用(当前会话走这个) +- ocg-old: ⚠️ 返回403但gateway cred pool缓存了有效key +- volcengine: ❌ 周配额已尽,6月15日周一恢复 + +## 六、SOUL.md 关键规则(2026-06-10 最终版) + +位置:`/home/hmo/.hermes/profiles/default/SOUL.md` + +### 沟通方式 +- 对老爸:直接、不加修饰 +- 反驳时:**必须带证据**(日志、数据、代码、截图)。不是为了显得聪明而反驳 +- 听指令:用户明确说"闭嘴""停"时立即停止,不继续分析不解释 + +### 行动铁律 — 讲证据 +1. 发现问题 → 2. 收集证据(至少两条独立证据) → 3. 验证假设 → 4. 只改对的 → 5. 改完验证 +- 禁止猜根因、没有证据就动手、猜用户意图、多个改动同时做 + +### 授权边界 +- ✅ 直接行动:读文件、查日志、搜知识库、分析数据、提建议 +- ⚠️ 问清楚再做:改系统配置、重启服务、清数据、写文件 +- ❌ 必须等批准:不可逆删除、修改API key、改provider链、清session + +## 七、知识库(Obsidian) + +路径:`/home/hmo/Obsidian/` +HTTP API:`:8890`(只读) + +结构: +``` +Obsidian/ +├── raw/ — 原始资料(只追加只读) +├── knowledge/ — 加工笔记(tech/finance/ai/psychology/education/life) +├── index.md — 全库索引 +├── SCHEMA.md — 操作规则 +└── log.md — 更新日志 +``` + +## 八、MoFin 股票系统 + +详见 `EXPERT_SYSTEM_DESIGN.md`,核心: +- 14个cron jobs(5个纯脚本+9个LLM) +- 价格监控每1分钟腾讯批量API +- XMPP中继推送报告 + +## 九、近期改动日志 + +### 2026-06-11 +- LIMIT 200硬截断 + 关闭所有compression +- SOUL.md 最终版定稿 +- XMPP bot重连逻辑修复(删除auto_reconnect + ping保活) +- API key typo修复(知微bot `hermess123` → `hermes123`) +- 小果provider链:volc → ocg-old → ocg-new → oMLX +- 默认provider链:ocg-new → ocg-old → volcengine + +### 2026-06-10 +- 重建SOUL.md(讲证据+授权边界+责任闭环) +- 发现并清除orphaned compression flag +- 多个gateway反复重启,systemd服务冲突 +- Windows wechat_agent API key不匹配 + +### 2026-06-09 +- 知微SOUL新增对话识别规则 +- position-analyst 启用压缩 +- 价格监控全面改造(纯脚本+腾讯批量API) diff --git a/TDX_RELAY_COLLAB.md b/TDX_RELAY_COLLAB.md new file mode 100644 index 0000000..7d4a3d7 --- /dev/null +++ b/TDX_RELAY_COLLAB.md @@ -0,0 +1,278 @@ +# MoFin / TDX-Relay 协作文档 + +> 最后更新:2026-06-12 +> 维护人:知微 + 小小莫(xxm) +> 铁律:任何 relay 相关改动必须先读本文档,改完必须同步更新 + +--- + +## 一、什么是 tdx-relay + +tdx-relay 是小小莫(xxm)开发的 Windows 端通达信中继程序。 +作用:通过 opentdx 协议直连招商证券 7727 扩展行情服务器, +为 MoFin 系统提供港股低延迟实时行情。 + +### 为什么需要它 + +原本 MoFin 的港股行情来源是腾讯 API(qt.gtimg.cn), +存在约 15 分钟延迟,对于盘中决策不够及时。 +tdx-relay 将港股行情延迟从约 15 分钟降到接近实时(1~3 秒)。 + +--- + +## 二、系统架构 + +``` +Windows 端(小小莫负责) +┌─────────────────────────────────────────┐ +│ tdx-relay 项目 │ +│ │ +│ tdx_client.py │ +│ └─ opentdx MacExtendedClient │ +│ └─ 直连 招商证券 7727 扩展行情服务器 │ +│ → 拉取 17 只港股实时行情 │ +│ │ +│ run_relay.py │ +│ └─ 断线自动重连(5s/15s/30s 三次退避) │ +│ └─ 推送 → POST /api/update/realtime │ +│ │ +│ start_tdx_relay.bat │ +│ └─ 一键启动脚本 │ +└───────────────┬─────────────────────────┘ + │ HTTP POST (JSON) + ▼ +Linux 端(知微负责) +┌─────────────────────────────────────────┐ +│ MoFin 系统 (web-dashboard) │ +│ │ +│ server.py │ +│ ├─ /api/update/realtime (POST) │ +│ │ ← 接收 tdx-relay 推送的实时行情 │ +│ │ → 更新 portfolio.json + watchlist │ +│ │ → 写入 data_source = "tdx_relay" │ +│ │ │ +│ ├─ /api/relay/status (GET) │ +│ │ ← 查询 relay 状态(在线/离线/时间) │ +│ │ │ +│ price_monitor.py │ +│ └─ relay_active 检测 │ +│ ├─ relay 在线 → 跳过港股腾讯API拉取 │ +│ │ (保留 tdx-relay 的实时价不覆盖) │ +│ └─ relay 掉线 → 回退腾讯 API 兜底 │ +│ │ +│ 数据文件 │ +│ ├─ data/portfolio.json │ +│ │ └─ 每只港股: data_source=txton/tdx │ +│ ├─ data/watchlist.json │ +│ └─ data/relay_state.json (新增) │ +│ └─ online: true/false │ +│ └─ last_ping: 时间戳 │ +└─────────────────────────────────────────┘ +``` + +## 三、职责边界 + +### 小小莫(xxm)— Windows 端 + +负责: +1. tdx_client.py 的开发维护 + - 直连券商行情服务器的稳定性 + - 港股代码列表的维护(当前 17 只) + - 行情数据的正确性验证 +2. run_relay.py 的重连机制 + - 断线自动恢复(3 次退避重连) + - relan 状态上报 +3. 行情推送的稳定性 + - 每 X 秒推送一次实时行情 + - 推送失败的处理 +4. Windows 端部署维护 + - 开机自启动 + - 日志管理 + - 异常告警 + +不负责: +- MoFin API 的修改(但需要配合 server.py 新增端点) +- Linux 端 price_monitor 的回退逻辑 +- 持仓分析和策略制定 + +### 知微(zhiwei)— Linux 端(MoFin) + +负责: +1. MoFin API 的 relay 兼容 + - /api/update/realtime 端点(已实现) + - /api/relay/status 端点(待实现) + - relay_state 持久化(待实现) +2. price_monitor 的 relay 检测(待实现) + - relay 在线 → 跳过港股腾讯 API 拉取 + - relay 掉线 → 回退腾讯 API 兜底 +3. tdx-relay 接入后的数据一致性保障 + - 腾讯 API 和 tdx-relay 的数据源标记区分 + - 价格更新不互相覆盖 +4. 行情来源对分析层的透明化 + - 分析层(cron prompt)不需要关心行情来源 + - 直接读 portfolio.json 即可 + - 数据源标记在 data_source 字段中 + +不负责: +- Windows 端程序的开发和部署 +- 通达信协议的细节 +- 券商行情服务器的维护 + +### 共同维护 + +1. 港股代码列表 — 两边保持一致 +2. 数据格式 — tdx-relay 推送的 JSON 格式与 MoFin 期望的格式 +3. 接口联调 — 新端点上线后的验证 + +--- + +## 四、数据流详解 + +### 正常流程(relay 在线) + +``` +tdx-relay (Windows) + │ 每 X 秒推送 {stocks: [{code, price, change_pct, ...}]} + │ POST → http://192.168.1.246:8899/api/update/realtime + ▼ +server.py 接收 + ├─ 更新 portfolio.json(港股 data_source = "tdx_relay") + ├─ 更新 watchlist.json(港股 data_source = "tdx_relay") + └─ 更新 relay_state.json(online=true, last_ping=now) + │ + ▼ + price_monitor.py(每分钟运行) + ├─ A股 → 腾讯 API(不变) + ├─ 港股 → 检查 relay_state + │ ├─ relay 在线 → 跳过(保留 tdx-relay 的实时价) + │ └─ relay 离线(>60秒无推送)→ 回退腾讯 API + └─ 数据源标记 → 写入 portfolio/watchlist +``` + +### 异常流程(relay 离线) + +``` +tdx-relay 断线 + │ 60秒内无推送 + ▼ +price_monitor.py 检测到 relay_state.online=false + │ 或 last_ping > 60秒前 + ├─ 港股 → 回退腾讯 API 拉取 + ├─ 写入时 data_source = "tencent" + └─ 记录日志 "relay offline, fallback to tencent" + +tdx-relay 恢复 + │ 推送到达 /api/update/realtime + ▼ +server.py 接收更新 + ├─ 更新 relay_state.json(online=true) + └─ 正常接收行情 +``` + +--- + +## 五、接口规范 + +### POST /api/update/realtime (已实现) + +接收 tdx-relay 推送的实时行情。 + +请求格式: +```json +{ + "stocks": [ + { + "code": "00700", + "price": 467.20, + "change_pct": 3.09, + "high": 470.00, + "low": 460.00, + "open": 462.00, + "volume": 15000000 + } + ], + "source": "tdx_relay" +} +``` + +响应: +```json +{ + "status": "ok", + "updated": 15, + "source": "tdx_relay", + "timestamp": "2026-06-12T14:30:00" +} +``` + +### GET /api/relay/status (待实现) + +查询 tdx-relay 当前状态。 + +响应: +```json +{ + "online": true, + "source": "tdx_relay", + "last_ping": "2026-06-12T14:29:55", + "age_seconds": 5, + "stocks_count": 15, + "fallback_active": false +} +``` + +--- + +## 六、当前实现状态 + +### 已完成 +- server.py `/api/update/realtime` 端点 ✅(v1.0) +- portfolio.json 写入 data_source 字段 ✅ + +### 待实现(知微负责) +1. relay_state.json 持久化 — 记录 relay 在线状态 +2. /api/relay/status GET 端点 — 中继状态查询 +3. price_monitor.py relay_active 检测 — 在线/离线判断 +4. price_monitor 回退逻辑 — relay 离线时用腾讯 API 兜底 + +### 待实现(小小莫负责) +1. tdx-relay 心跳上报 — 定期推送到 /api/update/realtime +2. 断线自动重连验证 — Windows 端长期稳定运行 +3. 行情覆盖检查 — 确保 17 只港股全量推送 + +--- + +## 七、港股代码列表(双方保持一致) + +当前 17 只港股(来自 portfolio.json + watchlist.json): + +| 代码 | 名称 | 持仓/自选 | +|------|------|----------| +| 00700 | 腾讯控股 | 持仓 | +| 00981 | 中芯国际 | 持仓 | +| 01211 | 比亚迪股份 | 持仓 | +| 09988 | 阿里巴巴 | 持仓 | +| 02202 | 万科企业 | 持仓 | +| 02388 | 中银香港 | 持仓 | +| 01478 | 丘钛科技 | 持仓 | +| 09868 | 小鹏集团 | 自选(已清仓) | +| 01088 | 中国神华 | 持仓 | +| 02359 | 药明康德 | 自选 | +| 01888 | 建滔积层板 | 自选 | +| 00968 | 信义光能 | 自选 | +| 01070 | TCL电子 | 自选 | +| 02318 | 中国平安 | 自选 | +| 02628 | 中国人寿 | 自选 | +| 06160 | 百济神州 | 自选(已清仓) | +| 06869 | 长飞光纤 | 自选 | + +--- + +## 八、故障处理 + +| 现象 | 可能原因 | 处理方式 | +|------|---------|---------| +| relay 显示离线 | Windows 端掉线 | 检查 Windows 端运行状态,双击 start_tdx_relay.bat | +| relay 在线但数据不更新 | 推送异常 | 查 Windows 端日志,重启 tdx-relay | +| 港股价格异常 | 数据源错乱 | 检查 data_source 字段,确认 relay 是否覆盖了错误数据 | +| 港股价格用腾讯旧数据 | relay 离线超过 60s 自动回退 | 正常行为,relay 恢复后自动切回 | diff --git a/cron_to_xmpp.py b/cron_to_xmpp.py new file mode 100644 index 0000000..3a5cfca --- /dev/null +++ b/cron_to_xmpp.py @@ -0,0 +1,359 @@ +#!/usr/bin/env python3 +"""cron_to_xmpp.py — 智能cron报告推送 + +只推送LLM驱动的分析报告(有实质内容),不推送纯脚本输出。 +关键规则: +1. 跳过 no_agent 脚本的输出(价格监控、数据同步等机器数据) +2. 跳过自己的输出目录(30908cdc44a8),避免循环推送 +3. 正文太短(<20字)或只有 [SILENT] 的不推 +4. 超时自动跳过,不影响后续 +""" +import json +import subprocess +import re +import sys +from datetime import datetime +from pathlib import Path + +# 使用绝对路径,不受 profile 环境变量影响 +REAL_HOME = Path("/home/hmo") + +# 扫描目录 +CRON_DIRS = [ + REAL_HOME / ".hermes" / "cron" / "output", + REAL_HOME / ".hermes" / "profiles" / "position-analyst" / "cron" / "output", +] +JOURNAL = REAL_HOME / ".hermes" / "cron" / ".relay_journal.json" +SILENT_STATS = REAL_HOME / ".hermes" / "cron" / ".silent_daily_count.json" +MAX_AGE_HOURS = 6 # 只推送6小时内的报告,防止清journal后爆历史 + + +def load_no_agent_job_ids(): + """从两个profile的jobs.json中读取所有no_agent=true的job ID""" + ids = set() + for jobs_path in [ + REAL_HOME / ".hermes" / "cron" / "jobs.json", + REAL_HOME / ".hermes" / "profiles" / "position-analyst" / "cron" / "jobs.json", + ]: + try: + with open(jobs_path) as f: + data = json.load(f) + for j in data.get("jobs", []): + if j.get("no_agent"): + ids.add(j["id"]) + except: + pass + return ids + + +# 硬编码保底(如果 jobs.json 读不到) +SKIP_DIRS = { + "30908cdc44a8", # cron-推XMPP中继自身输出 + "health", # 健康检查输出 +} + +FROM = "zhiwei@yoin.fun" +TO = "hmo@yoin.fun" + + +def load_journal(): + try: + return set(json.loads(JOURNAL.read_text())) + except: + return set() + + +def save_journal(entries): + JOURNAL.write_text(json.dumps(sorted(entries))) + + +def is_pure_script_output(content): + """判断文件是否是纯脚本的机器输出(不是LLM报告)""" + # LLM报告的特征:有 ## Response 节(包含agent的回复) + if "## Response" in content: + return False + # 以 # Cron Job: 开头但没有 ## Response 的可能是脚本输出 + if content.startswith("# Cron Job:"): + return True + # 价格监控的触发输出 + if content.startswith("🔔") and "⏱" in content: + return True + # 健康检查报告 + if "MoFin 系统健康检查" in content: + return True + # 结构化数据标签(价格监控的机器数据) + if "" in content: + return True + # no_agent 脚本的输出特征(Hermes自动添加的header) + if "**Mode:** no_agent (script)" in content: + return True + return False + + +def validate_report_body(body): + """质量检查 — 不拦截,返回改进建议""" + issues = [] + text = body.strip() + + if "重点推荐操作" not in text: + issues.append("缺少【重点推荐操作】区域(如无需操作可写「无」)") + + if "风险关注" not in text: + issues.append("缺少【风险关注】区域(如无风险可写「无」)") + + if len(text) > 600: + issues.append(f"报告偏长({len(text)}字),建议压缩到600字以内") + + fuzzy = re.findall(r"可关注|可考虑|建议观察|试试|谨慎关注|择机|根据情况", text) + if fuzzy: + issues.append(f"含模糊词: {', '.join(set(fuzzy))},建议替换为明确操作指令") + + if re.search(r"如果.*就.*如果.*就|若.*则.*若.*则", text): + issues.append("含选择题句式,建议只给一个确定建议") + + return issues + + +def send_feedback(issues, job_name): + """发送质量反馈给知微自己""" + from xml.sax.saxutils import escape + feedback = f"[自我反馈] 报告质量检查发现以下问题,下次注意:\n" + "\n".join(f"• {i}" for i in issues) + safe = escape(feedback) + stanza = ( + f"" + f"{safe}" + ) + try: + subprocess.run( + ["docker", "exec", "ejabberd", "ejabberdctl", + "send_stanza", FROM, FROM, stanza], + capture_output=True, timeout=10, text=True, + ) + except: + pass + + +def extract_body(path): + content = path.read_text(encoding="utf-8", errors="replace") + + if is_pure_script_output(content): + return None + + parts = content.split("## Response") + body = parts[1].strip() if len(parts) > 1 else content.strip() + body = re.sub(r'^#.*?\n', '', body, flags=re.MULTILINE).strip() + body = re.sub(r'\n?\s*.*?\s*', '', body, flags=re.DOTALL).strip() + body = re.sub(r'\*\*(.*?)\*\*', r'\1', body) + + # 去掉agent的思考过程("Now let me...", "Let me...", "Now I have..."等开头) + body = re.sub(r'^(Now let me|Let me|I need|I will|First let me|First,? I|Now I have|Here.i|I.ll|I.m ).*?\n\n', '', body, flags=re.DOTALL).strip() + # 去掉末尾的思考尾巴 + body = re.sub(r'\n\s*(Now I|This |I have |I used |The report|The data).*?$', '', body, flags=re.DOTALL).strip() + # 如果只剩"好的"、"收到"等短回应,丢弃 + if re.match(r'^[\u4e00-\u9fff,。]{1,10}$', body): + return None + + if not body: + return None + + # [SILENT] → 不推送(计数的逻辑在 scan() 中处理) + if "[SILENT]" in body: + return None + + if len(body) < 20: + return None + + return body + + +def send(body): + from xml.sax.saxutils import escape + safe = escape(f"【知微】{body}") + stanza = ( + f"" + f"{safe}" + ) + # 重试3次 + for attempt in range(3): + try: + r = subprocess.run( + ["docker", "exec", "ejabberd", "ejabberdctl", + "send_stanza", FROM, TO, stanza], + capture_output=True, timeout=10, text=True, + ) + if r.stderr and "error" in r.stderr.lower(): + print(f"send error (attempt {attempt+1}): {r.stderr.strip()[:100]}", file=sys.stderr) + if attempt < 2: + continue + return False + return r.returncode == 0 + except subprocess.TimeoutExpired: + print(f"send timeout (attempt {attempt+1})", file=sys.stderr) + if attempt < 2: + continue + return False + except Exception as e: + print(f"send err (attempt {attempt+1}): {e}", file=sys.stderr) + if attempt < 2: + continue + return False + return False + + +def validate_format(body): + """格式检查 — 只记录不拦截,标记改进点""" + text = body.strip() + issues = [] + + # 必含区域检查 + has_key = "重点推荐操作" in text + has_risk = "风险关注" in text + has_rest = "其余持仓" in text or "今日关注" in text + if not has_key: + issues.append("缺【重点推荐操作】区域") + if not has_risk: + issues.append("缺【风险关注】区域") + + # 超长提醒 + if len(text) > 600: + issues.append(f"报告偏长({len(text)}字),建议压缩到600字内") + + # 模糊词提醒 + fuzzy = re.findall(r"可关注|可考虑|建议观察|试试|谨慎关注|择机|根据情况", text) + if fuzzy: + issues.append(f"含模糊词({', '.join(list(set(fuzzy))[:3])}),应给唯一结论") + + # 选择题句式提醒 + if re.search(r"如果.*就|若.*则|可以.*也可以", text): + issues.append("含选择题句式,应给唯一建议") + + return text, issues # 始终通过,issues 为空就是干净 + + +def load_silent_stats(): + """加载当日静默统计""" + try: + return json.loads(SILENT_STATS.read_text()) + except: + return {"date": "", "silent": 0, "short": 0, "script": 0} + + +def save_silent_stats(stats): + SILENT_STATS.write_text(json.dumps(stats)) + + +def send_silent_summary(stats): + """发送当日静默报告汇总""" + parts = [] + if stats.get("silent", 0) > 0: + parts.append(f"静默[SILENT] {stats['silent']}次") + if stats.get("short", 0) > 0: + parts.append(f"过短(<20字) {stats['short']}次") + if stats.get("script", 0) > 0: + parts.append(f"脚本输出 {stats['script']}次") + + if not parts: + body = "【每日汇总】今日所有cron报告已正常送达,无被拦截的报告。" + else: + body = "【每日汇总】今日以下cron报告未送达(已拦截):\n" + "\n".join(f"• {p}" for p in parts) + "\n\n无操作信号的报告正常静默,有操作信号的都已送达。" + + send(body) + + +def scan(): + processed = load_journal() + new = set() + n_pushed = 0 + n_silent = 0 + n_short = 0 + n_script = 0 + no_agent_ids = load_no_agent_job_ids() + skip_all = SKIP_DIRS | no_agent_ids + + for cron_dir in CRON_DIRS: + if not cron_dir.exists(): + continue + + for d in sorted(cron_dir.iterdir()): + if not d.is_dir(): + continue + if d.name in skip_all: + continue + + for f in sorted(d.iterdir()): + if f.suffix != ".md": + continue + key = str(f.resolve()) + if key in processed or key in new: + continue + new.add(key) + + # 跳过超过MAX_AGE_HOURS小时的旧文件 + age_hours = (datetime.now() - datetime.fromtimestamp(f.stat().st_mtime)).total_seconds() / 3600 + if age_hours > MAX_AGE_HOURS: + continue + + content = f.read_text(encoding="utf-8", errors="replace") + + # 提前判断脚本输出 + if is_pure_script_output(content): + n_script += 1 + continue + + parts = content.split("## Response") + body = parts[1].strip() if len(parts) > 1 else content.strip() + body = re.sub(r'^#.*?\n', '', body, flags=re.MULTILINE).strip() + body = re.sub(r'\n?\s*.*?\s*', '', body, flags=re.DOTALL).strip() + body = re.sub(r'\*\*(.*?)\*\*', r'\1', body) + + if not body: + n_short += 1 + continue + + # SILENT → 拦截,记数(在长度检查之前,因为 [SILENT] 只有8字符) + if "[SILENT]" in body: + n_silent += 1 + continue + + if len(body) < 20: + n_short += 1 + continue + + # 格式校验 — 记录改进点,不拦截 + ok_body, issues = validate_format(body) + + n_pushed += 1 + ok_sent = send(body) + if not ok_sent: + print(f" {d.name}: send failed", file=sys.stderr) + if issues: + print(f" {d.name}/{f.name}: 改进建议: {'; '.join(issues)}", file=sys.stderr) + + if new: + save_journal(processed | new) + + # 保存当日汇总到文件(供16:30汇总用) + today = datetime.now().strftime("%Y-%m-%d") + stats = load_silent_stats() + if stats.get("date") != today: + stats = {"date": today, "silent": 0, "short": 0, "script": 0} + stats["silent"] += n_silent + stats["short"] += n_short + stats["script"] += n_script + save_silent_stats(stats) + + # 16:30~16:35 发送当日汇总(收盘后) + now = datetime.now() + hhmm = now.hour * 60 + now.minute + if 990 <= hhmm <= 995: # 16:30~16:35 + send_silent_summary(stats) + + log = f"推送{n_pushed}份,静默拦截{n_silent}份,过短{n_short}份,跳过脚本{n_script}份" + print(log, file=sys.stderr) + return n_pushed + + +if __name__ == "__main__": + scan() diff --git a/regenerate_strategies.py b/regenerate_strategies.py new file mode 100644 index 0000000..16c2856 --- /dev/null +++ b/regenerate_strategies.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +"""批量再生所有持仓+自选策略,结合技术面支撑/压力位""" + +import json +import sys +sys.path.insert(0, '/home/hmo/web-dashboard') + +from technical_analysis import full_analysis +from strategy_lifecycle import reassess_strategy + +PF = '/home/hmo/web-dashboard/data/portfolio.json' +WL = '/home/hmo/web-dashboard/data/watchlist.json' + +def main(): + # 持仓 + pf = json.load(open(PF)) + for s in pf['holdings']: + code = s['code'] + name = s['name'] + price = s.get('price', 0) + cost = s.get('cost', 0) + shares = s.get('shares', 0) + + if not price: + continue + + print(f" {name}({code}) 现价{price} 成本{cost}...", end=' ') + + try: + tech = full_analysis(code) + except: + tech = None + + result = reassess_strategy( + code, name, price, cost, shares, + current_action=s.get('analysis', {}).get('action', '') + ) + + if 'analysis' not in s: + s['analysis'] = {} + s['analysis']['stop_loss'] = result['stop_loss'] + s['analysis']['take_profit'] = result['take_profit'] + s['analysis']['entry_low'] = result['entry_low'] + s['analysis']['entry_high'] = result['entry_high'] + s['analysis']['action'] = result['action'] + s['analysis']['status'] = result['status'] + s['analysis']['reassessed_at'] = result['reassessed_at'] + + print(f"损{result['stop_loss']} 盈{result['take_profit']} 区{result['entry_low']}~{result['entry_high']}") + + json.dump(pf, open(PF, 'w'), ensure_ascii=False, indent=2) + print(f"\n持仓策略已更新: {len(pf['holdings'])} 条") + + # 自选股 - 简单重新计算买入区 + wl = json.load(open(WL)) + updated = 0 + for s in wl['stocks']: + code = s['code'] + price = s.get('price', 0) + if not price: + continue + tech = None + try: + tech = full_analysis(code) + except: + pass + + # 买入区 = 弱支撑~弱压力 + if tech: + sr = tech.get('support_resistance', {}) + ws = sr.get('weak_support') or price * 0.95 + wr = sr.get('weak_resist') or price * 1.05 + else: + ws = price * 0.92 + wr = price * 1.08 + + if 'analysis' not in s: + s['analysis'] = {} + s['analysis']['buy_low'] = round(ws, 2) + s['analysis']['buy_high'] = round(wr, 2) + if tech: + s['analysis']['tech_levels'] = { + 'strong_support': sr.get('strong_support'), + 'weak_support': sr.get('weak_support'), + 'weak_resist': sr.get('weak_resist'), + 'strong_resist': sr.get('strong_resist'), + } + updated += 1 + print(f" {s['name']}({code}) 买入区={ws:.2f}~{wr:.2f}") + + json.dump(wl, open(WL, 'w'), ensure_ascii=False, indent=2) + print(f"\n自选策略已更新: {updated} 条") + print("\n✅ 全部策略再生完成") + +if __name__ == '__main__': + main() diff --git a/server.py b/server.py new file mode 100644 index 0000000..35fe79a --- /dev/null +++ b/server.py @@ -0,0 +1,910 @@ +#!/usr/bin/env python3 +"""MoFin Dashboard - 莫荷持仓情报可视化系统""" + +import base64 +import json +import os +import re +import uuid +import urllib.request +from datetime import datetime +from pathlib import Path + +from flask import Flask, jsonify, send_from_directory, request + +# 提示词管理模块 +from prompt_manager.dashboard_views import register_routes + +app = Flask(__name__, static_folder="static", static_url_path="") + +DATA_DIR = Path(__file__).parent / "data" +UPLOAD_DIR = Path(__file__).parent / "uploads" + +# Hermes Gateway +GATEWAY = "http://localhost:8642/v1/chat/completions" +API_KEY = "hermes123" + + +def _load_json(path, default=None): + try: + with open(path, encoding="utf-8") as f: + return json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + return {} if default is None else default + + +def _save_json(path, data): + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + +# ── API 路由 ────────────────────────────────────────── + +@app.route("/") +def index(): + return send_from_directory(app.static_folder, "index.html") + + +@app.route("/api/portfolio") +def api_portfolio(): + """持仓列表""" + data = _load_json(DATA_DIR / "portfolio.json") + return jsonify(data) + + +@app.route("/api/watchlist") +def api_watchlist(): + """自选列表""" + data = _load_json(DATA_DIR / "watchlist.json") + return jsonify(data) + + +@app.route("/api/overview") +def api_overview(): + """概览数据""" + portfolio = _load_json(DATA_DIR / "portfolio.json", []) + market = _load_json(DATA_DIR / "market.json", {}) + alerts = _load_json(DATA_DIR / "alerts.json", []) + + total_assets = portfolio.get("total_assets", 0) + stock_value = portfolio.get("stock_value", 0) + cash = portfolio.get("cash", 0) + position_pct = portfolio.get("position_pct", 0) + total_pnl = portfolio.get("total_pnl", 0) + holdings = portfolio.get("holdings", []) + + top_movers = sorted( + [h for h in holdings if abs(h.get("change_pct", 0)) >= 3], + key=lambda x: abs(x.get("change_pct", 0)), + reverse=True, + )[:5] + + return jsonify({ + "total_assets": total_assets, + "stock_value": stock_value, + "cash": cash, + "position_pct": position_pct, + "total_pnl": total_pnl, + "top_movers": top_movers, + "market": market, + "alerts": alerts[:10], + "updated_at": portfolio.get("updated_at", ""), + }) + + +@app.route("/api/reports") +def api_reports(): + """历史报告列表""" + reports_dir = DATA_DIR / "reports" + reports = [] + if reports_dir.exists(): + for f in sorted(reports_dir.iterdir(), reverse=True)[:100]: + if f.suffix == ".json": + data = _load_json(f) + reports.append({ + "id": f.stem, + "title": data.get("title", f.stem), + "type": data.get("type", "未知"), + "created_at": data.get("created_at", ""), + "summary": data.get("summary", ""), + }) + return jsonify(reports) + + +@app.route("/api/report/") +def api_report(report_id): + """单个报告详情""" + # Try exact file first + path = DATA_DIR / "reports" / f"{report_id}.json" + if path.exists(): + return jsonify(_load_json(path)) + # Try prefix match + reports_dir = DATA_DIR / "reports" + if reports_dir.exists(): + for f in reports_dir.iterdir(): + if f.stem.startswith(report_id) and f.suffix == ".json": + return jsonify(_load_json(f)) + return jsonify({"error": "report not found"}), 404 + + +@app.route("/api/stock/") +def api_stock(code): + """个股详情 + 操作建议历史""" + stock_data = _load_json(DATA_DIR / "stocks" / f"{code}.json", {}) + return jsonify(stock_data) + + +@app.route("/api/market") +def api_market(): + """市场观察""" + data = _load_json(DATA_DIR / "market.json", {}) + return jsonify(data) + + +# ── 数据写入API(供 cron/update_data.py 调用) ────────── + +@app.route("/api/update/portfolio", methods=["POST"]) +def update_portfolio(): + data = request.get_json(force=True) + _save_json(DATA_DIR / "portfolio.json", data) + return jsonify({"status": "ok"}) + + +@app.route("/api/update/watchlist", methods=["POST"]) +def update_watchlist(): + data = request.get_json(force=True) + _save_json(DATA_DIR / "watchlist.json", data) + return jsonify({"status": "ok"}) + + +@app.route("/api/update/report", methods=["POST"]) +def update_report(): + data = request.get_json(force=True) + report_id = data.pop("_id", datetime.now().strftime("%Y%m%d_%H%M%S")) + data["created_at"] = data.get("created_at", datetime.now().isoformat()) + _save_json(DATA_DIR / "reports" / f"{report_id}.json", data) + return jsonify({"status": "ok", "id": report_id}) + + +@app.route("/api/update/stock/", methods=["POST"]) +def update_stock(code): + data = request.get_json(force=True) + existing = _load_json(DATA_DIR / "stocks" / f"{code}.json", {}) + history = existing.get("history", []) + if data.get("entry"): + history.append({ + "time": datetime.now().isoformat(), + "price": data.get("price"), + "recommendation": data.get("recommendation"), + "stop_loss": data.get("stop_loss"), + "take_profit": data.get("take_profit"), + "reason": data.get("reason"), + }) + existing.update(data) + existing["history"] = history[-50:] + _save_json(DATA_DIR / "stocks" / f"{code}.json", existing) + return jsonify({"status": "ok"}) + + +@app.route("/api/update/market", methods=["POST"]) +def update_market(): + data = request.get_json(force=True) or {} + _save_json(DATA_DIR / "market.json", data) + return jsonify({"status": "ok"}) + + +# ── 知微分析结果写入API ── +@app.route("/api/analysis/batch", methods=["POST"]) +def analysis_batch(): + """接收知微cron的分析结果,写回持仓/自选JSON的analysis字段""" + data = request.get_json(force=True) or {} + + # 更新持仓 + if "holdings" in data: + pf = _load_json(DATA_DIR / "portfolio.json", {}) + idx = {h["code"]: i for i, h in enumerate(pf.get("holdings", []))} + for item in data["holdings"]: + code = item.get("code", "") + if code not in idx: + continue + h = pf["holdings"][idx[code]] + h["analysis"] = { + "suggestion": item.get("suggestion"), + "stop_loss": item.get("stop_loss"), + "take_profit": item.get("take_profit"), + "buy_zone_low": item.get("buy_zone_low"), + "buy_zone_high": item.get("buy_zone_high"), + "position_suggested": item.get("position_suggested"), + "reason": item.get("reason"), + "updated_at": datetime.now().isoformat(), + } + _save_json(DATA_DIR / "portfolio.json", pf) + + # 更新自选 + if "watchlist" in data: + wl = _load_json(DATA_DIR / "watchlist.json", {}) + idx = {s["code"]: i for i, s in enumerate(wl.get("stocks", []))} + for item in data["watchlist"]: + code = item.get("code", "") + if code not in idx: + continue + s = wl["stocks"][idx[code]] + s["analysis"] = { + "buy_low": item.get("buy_low"), + "buy_high": item.get("buy_high"), + "position_recommend": item.get("position_recommend"), + "reason": item.get("reason"), + "updated_at": datetime.now().isoformat(), + } + _save_json(DATA_DIR / "watchlist.json", wl) + + return jsonify({"status": "ok", "updated_at": datetime.now().isoformat()}) + + +# ── 操作决策库API ── +@app.route("/api/decisions", methods=["GET"]) +def get_decisions(): + """返回决策库数据,统一新旧格式""" + raw = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) + decisions = raw.get("decisions", []) + if not decisions and isinstance(raw, list): + decisions = raw + + # portfolio 用来判断是持仓还是自选 + portfolio = _load_json(DATA_DIR / "portfolio.json", {"holdings": []}) + watchlist = _load_json(DATA_DIR / "watchlist.json", {"stocks": []}) + holding_codes = {h.get("code","") for h in portfolio.get("holdings",[])} + watch_codes = {s.get("code","") for s in watchlist.get("stocks",[])} + + normalized = [] + for d in decisions: + if not isinstance(d, dict): + continue + + # 检测新旧格式:新格式有 stop_loss 顶层字段,旧格式有 trigger 对象 + is_new = "stop_loss" in d and "trigger" not in d + + if is_new: + code = d.get("code", "") + name = d.get("name", "") + price = d.get("price", 0) + sl = d.get("stop_loss") + tp = d.get("take_profit") + el = d.get("entry_low") + eh = d.get("entry_high") + ts = d.get("tech_snapshot", "") + + # type: 持仓还是自选 + if code in holding_codes: + dtype = "持仓策略" + elif code in watch_codes: + dtype = "自选策略" + else: + dtype = "—" + + # 判断 active + status_raw = d.get("status", "") + status = "active" if status_raw in ("active", "updated", "") else "superseded" + + # trigger 对象 + entry_zone_str = "" + if el and eh: + entry_zone_str = f"¥{el}~¥{eh}" + elif el: + entry_zone_str = f"≥¥{el}" + + trigger = {} + if sl: + trigger["stop_loss"] = f"¥{sl}" if isinstance(sl, (int,float)) else str(sl) + if tp: + trigger["take_profit"] = f"¥{tp}" if isinstance(tp, (int,float)) else str(tp) + if entry_zone_str: + trigger["entry_zone"] = entry_zone_str + + # current + current = "" + if price: + current = f"现价¥{price}" if code and not code.startswith(("0","1")) else f"¥{price}" + + # zone_breach + zone_breach = d.get("zone_breach", "") + + # updated_reason + note = d.get("note", "") + timing = d.get("timing_signal", "") + reason_parts = [] + if note: + reason_parts.append(note) + if timing and timing != "neutral": + reason_parts.append(f"时机:{timing}") + if d.get("rr_ratio"): + reason_parts.append(f"盈亏比:{d['rr_ratio']}") + + # advice_timeline - 从新格式重建 + timeline = [] + + entry = { + "code": code, + "name": name, + "type": dtype, + "status": status, + "tag": d.get("tag", ""), + "action": d.get("action", ""), + "trigger": trigger, + "current": current, + "zone_breach": zone_breach, + "updated_reason": " | ".join(reason_parts) if reason_parts else "", + "advice_timeline": timeline, + "changelog": d.get("changelog", []), + "execution": d.get("execution", {}), + "analysis": d.get("analysis", {}), + "tech_snapshot": ts, + "timestamp": d.get("timestamp", ""), + "updated_by": "知微", + } + # 保留原始数据供前端扩展 + entry["_raw_action"] = d.get("action", "") + normalized.append(entry) + else: + # 旧格式:已有 trigger 等字段,直接保留 + entry = dict(d) + # 确保 status 正确 + if entry.get("status") not in ("active", "superseded"): + entry["status"] = "active" + if not entry.get("type"): + code = entry.get("code", "") + if code in holding_codes: + entry["type"] = "持仓策略" + elif code in watch_codes: + entry["type"] = "自选策略" + else: + entry["type"] = "—" + normalized.append(entry) + + # 添加 execution 和 analysis 信息,按执行状态排序 + for n in normalized: + code = n.get("code", "") + # 从原始数据中找到 execution 和 analysis + raw_entry = next((d for d in decisions if isinstance(d, dict) and d.get("code") == code), {}) + n["execution"] = raw_entry.get("execution", {"status": "none"}) + n["analysis"] = raw_entry.get("analysis", {}) + + # 排序规则:推荐>执行中>观察>无标签 + def sort_key(x): + tag = x.get("tag", "") + exec_status = x.get("execution", {}).get("status", "none") + # 标签优先级(current_recommend才靠前,active_manual只是记录不升序) + tag_order = {"current_recommend": 0} + tag_priority = tag_order.get(tag, 50) + # 执行状态优先级 + exec_order = {"partial_exit": 0, "executing": 1, "observing": 2, "none": 99} + exec_priority = exec_order.get(exec_status, 99) + # 组合:先按标签排,再按执行状态排 + return (tag_priority, exec_priority, x.get("code", "")) + + normalized.sort(key=sort_key) + + return jsonify({ + "decisions": normalized, + "total": len(normalized), + "regenerated_at": raw.get("regenerated_at", ""), + }) + + +@app.route("/api/decisions/add", methods=["POST"]) +def add_decision(): + """新增/更新一条决策(新格式)""" + data = request.get_json(force=True) or {} + code = data.get("code", "") + if not code: + return jsonify({"status": "error", "message": "code required"}), 400 + + d = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) + + # 同一股票旧决策标记为superseded + for e in d["decisions"]: + if e["code"] == code and e.get("status") in ("active", "updated"): + e["status"] = "superseded" + + entry = { + "code": code, + "name": data.get("name", ""), + "price": data.get("price", 0), + "action": data.get("action", ""), + "stop_loss": data.get("stop_loss"), + "take_profit": data.get("take_profit"), + "entry_low": data.get("entry_low"), + "entry_high": data.get("entry_high"), + "tech_snapshot": data.get("tech_snapshot", ""), + "timing_signal": data.get("timing_signal", ""), + "rr_ratio": data.get("rr_ratio"), + "tag": data.get("tag", ""), + "note": data.get("note", ""), + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"), + "updated_reason": data.get("updated_reason", ""), + "status": "updated", + "changelog": data.get("changelog", []), + "execution": data.get("execution", {"status": "none"}), + "analysis": data.get("analysis", {}), + } + d["decisions"].append(entry) + _save_json(DATA_DIR / "decisions.json", d) + return jsonify({"status": "ok", "entry": entry}) + + +@app.route("/api/decisions/tag", methods=["POST"]) +def set_decision_tag(): + """设置/清除某只股票的推荐标签""" + data = request.get_json(force=True) or {} + code = data.get("code", "") + tag = data.get("tag", "") # 'current_recommend', 'active_manual', or '' to clear + if not code: + return jsonify({"status": "error", "message": "code required"}), 400 + + d = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) + found = False + for e in d.get("decisions", []): + if e.get("code") == code: + e["tag"] = tag + e["tag_updated"] = datetime.now().isoformat() + found = True + break + + if not found: + return jsonify({"status": "error", "message": f"stock {code} not found"}), 404 + + _save_json(DATA_DIR / "decisions.json", d) + return jsonify({"status": "ok", "code": code, "tag": tag}) + + +@app.route("/api/decisions/pending") +def get_pending_decisions(): + """返回所有有未确认建议的条目""" + d = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) + pending = [] + for entry in d["decisions"]: + timeline = entry.get("advice_timeline", []) + unconfirmed = [a for a in timeline if a.get("status") in (None, "pending")] + if unconfirmed: + pending.append({ + "code": entry["code"], + "name": entry["name"], + "current": entry.get("current", ""), + "pending_advice": unconfirmed, + }) + return jsonify(pending) + + +@app.route("/api/advice/record", methods=["POST"]) +def record_advice(): + """记录一条分析建议到 decisions.json 的 advice_timeline""" + data = request.get_json(force=True) or {} + code = data.get("code", "") + if not code: + return jsonify({"status": "error", "message": "code required"}), 400 + + d = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) + + entry = None + for e in d["decisions"]: + if e["code"] == code and e["status"] == "active": + entry = e + break + + if not entry: + return jsonify({"status": "error", "message": f"no active decision for {code}"}), 404 + + timeline = entry.setdefault("advice_timeline", []) + advice = { + "date": datetime.now().strftime("%Y-%m-%d %H:%M"), + "direction": data.get("direction", "持有"), + "price": data.get("price", ""), + "summary": data.get("summary", ""), + "status": "pending", + } + timeline.append(advice) + _save_json(DATA_DIR / "decisions.json", d) + return jsonify({"status": "ok", "advice": advice}) + + +@app.route("/api/advice/confirm", methods=["POST"]) +def confirm_advice(): + """确认/忽略一条建议""" + data = request.get_json(force=True) or {} + code = data.get("code", "") + idx = data.get("index", -1) + action = data.get("action", "confirmed") # confirmed | ignored + + d = _load_json(DATA_DIR / "decisions.json", {"decisions": []}) + for e in d["decisions"]: + if e["code"] == code and e["status"] == "active": + timeline = e.get("advice_timeline", []) + if 0 <= idx < len(timeline): + timeline[idx]["status"] = action + _save_json(DATA_DIR / "decisions.json", d) + return jsonify({"status": "ok"}) + return jsonify({"status": "error", "message": "not found"}), 404 + + +# ── 准确率统计API ── +@app.route("/api/stats/accuracy") +def get_accuracy_stats(): + data = _load_json(DATA_DIR / "accuracy_stats.json", {}) + return jsonify(data) + + +# ── 策略评估API ── +@app.route("/api/evaluation") +def get_evaluation(): + """返回所有策略的双维度评估结果""" + # 主数据源:evaluation.json + eval_data = _load_json(DATA_DIR / "evaluation.json", {}) + strategies = eval_data.get("strategies", []) + if strategies: + return jsonify(strategies) + + # 备选:从 decisions.json 的 evaluation 字段读取(尚未反写时的兼容) + decisions = _load_json(DECISIONS_PATH if 'DECISIONS_PATH' in dir() else DATA_DIR / "decisions.json", {"decisions": []}) + evals = [] + for d in decisions.get("decisions", []): + e = d.get("evaluation", []) + if e: + evals.append({ + "code": d["code"], + "name": d["name"], + "type": d.get("type", ""), + "current": d.get("current", ""), + "evaluations": e, + }) + return jsonify(evals) + + +@app.route("/api/evaluation/trigger", methods=["POST"]) +def trigger_evaluation(): + """手动触发策略评估""" + import subprocess + try: + r = subprocess.run( + ["python3", str(DATA_DIR.parent / "strategy_evaluator.py")], + capture_output=True, timeout=60, text=True, + ) + return jsonify({"status": "ok", "output": r.stdout, "error": r.stderr}) + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 + + +# ── 策略反馈API ── +@app.route("/api/feedback") +def get_feedback(): + data = _load_json(DATA_DIR / "strategy_feedback.json", {}) + return jsonify(data) + + +# ── 持仓截图上传与解析 ──────────────────────────────── + + +@app.route("/upload") +def upload_page(): + return send_from_directory(app.static_folder, "upload.html") + + +def _ocr_image(image_path): + """用Tesseract OCR提取图片中的文字(预处理优化中文表格识别)""" + from PIL import Image, ImageEnhance, ImageFilter + import pytesseract + + img = Image.open(image_path) + + # 预处理:放大 + 锐化 + 二值化,提升小字识别率 + w, h = img.size + if w < 2000 or h < 2000: + scale = max(2, 2000 // min(w, h)) + img = img.resize((w * scale, h * scale), Image.LANCZOS) + + # 转灰度 + img = img.convert("L") + + # 增强对比度 + enhancer = ImageEnhance.Contrast(img) + img = enhancer.enhance(2.0) + + # 锐化 + img = img.filter(ImageFilter.SHARPEN) + + # 二值化(自适应阈值) + threshold = 128 + img = img.point(lambda x: 255 if x > threshold else 0) + + # OCR:chip_sim+eng,PSM 6(统一文本块) + text = pytesseract.image_to_string( + img, + lang="chi_sim+eng", + config="--psm 6 --oem 3", + ) + return text.strip() + + +ANALYZE_PROMPT = """你是股票持仓数据分析助手。以下是用户上传的持仓/自选截图经过OCR提取的文字,请从中提取所有股票信息。 + +判断这是「持仓截图」还是「自选截图」: +- 持仓截图:每支股票有"证券数量"(持股数)、成本价、盈亏 +- 自选截图:只有股票列表和价格,没有持股数/成本 + +股票代码格式: +- A股:6位数字(如 600519, 000858, 300750) +- 港股:纯数字代码(如 0700, 3690, 1211),不带HK前缀 + +⚠️ 重要:截图顶部通常有汇总数据,如总资产、股票市值、可用资金、当日盈亏等。 +如果OCR文字中有这些汇总数字,请一并提取到JSON的summary字段中。 +不要自己计算汇总值,直接从OCR原文中提取。 + +请严格按照以下JSON格式回复,只输出JSON: + +```json +{ + "type": "portfolio" 或 "watchlist", + "summary": { + "total_assets": "总资产数字(可选,从截图中提取)", + "stock_value": "股票市值/持仓市值数字(可选,从截图中提取)", + "cash": "可用资金/现金数字(可选,从截图中提取)", + "day_pnl": "当日盈亏金额(可选,从截图中提取)" + }, + "stocks": [ + { + "code": "股票代码", + "name": "股票名称(中文)", + "price": "现价(数字)", + "shares": "持股数量(数字,持仓截图才有)", + "cost": "成本价(数字,持仓截图才有)", + "pnl": "盈亏百分比如+15.1%(持仓截图才有)", + "position_pct": "仓位占比数字如12.5(可选)" + } + ] +} +``` + +OCR原文: +""" + + +@app.route("/api/upload/analyze", methods=["POST"]) +def upload_analyze(): + """接收图片,OCR提取文字 → LLM解析结构化数据""" + if "image" not in request.files: + return jsonify({"error": "请上传图片"}), 400 + + f = request.files["image"] + if not f.filename: + return jsonify({"error": "空文件"}), 400 + + # 保存到临时目录 + UPLOAD_DIR.mkdir(parents=True, exist_ok=True) + ext = Path(f.filename).suffix or ".png" + save_path = UPLOAD_DIR / f"{uuid.uuid4().hex}{ext}" + f.save(str(save_path)) + + try: + # 第一步:OCR提取文字 + raw_text = _ocr_image(str(save_path)) + if not raw_text: + return jsonify({"error": "OCR未识别到文字,请确认图片清晰"}), 400 + except Exception as e: + os.unlink(str(save_path)) + return jsonify({"error": f"OCR失败: {e}"}), 500 + + # 第二步:LLM解析结构化数据(走文本API,不走视觉) + llm_text = _llm_parse(raw_text, ANALYZE_PROMPT) + + os.unlink(str(save_path)) + + # 从LLM回复中提取JSON + json_match = re.search(r"```(?:json)?\s*({.*?})\s*```", llm_text, re.DOTALL) + if json_match: + try: + parsed = json.loads(json_match.group(1)) + except json.JSONDecodeError: + return jsonify({"error": f"LLM解析JSON失败: {llm_text[:500]}"}), 500 + else: + # 尝试直接找JSON(没被代码块包裹) + try: + parsed = json.loads(llm_text) + except json.JSONDecodeError: + return jsonify({"error": f"未提取到结构化数据: {raw_text[:300]}...\n\nLLM回复: {llm_text[:500]}"}), 500 + + return jsonify(parsed) + + +def _llm_parse(text, prompt_template): + """发送OCR文本到Hermes LLM解析,返回JSON字符串""" + payload = json.dumps({ + "model": "hermes-agent", + "messages": [ + {"role": "system", "content": "你是一个数据提取助手。从OCR文字中提取结构化JSON数据。"}, + {"role": "user", "content": prompt_template + "\n" + text}, + ], + "max_tokens": 4096, + }).encode() + + req = urllib.request.Request(GATEWAY, data=payload, method="POST") + req.add_header("Content-Type", "application/json") + req.add_header("Authorization", f"Bearer {API_KEY}") + req.add_header("X-Hermes-Session-Id", "upload-ocr-parse") + + try: + resp = urllib.request.urlopen(req, timeout=120) + data = json.loads(resp.read()) + return data.get("choices", [{}])[0].get("message", {}).get("content", "") + except Exception as e: + return f"ERROR: {e}" + + +@app.route("/api/upload/confirm", methods=["POST"]) +def upload_confirm(): + """确认解析结果,更新数据文件""" + data = request.get_json(force=True) + stocks = data.get("stocks", []) + doc_type = data.get("type", "portfolio") + + # 尝试获取实时行情补充数据 + try: + codes = [s["code"] for s in stocks if s.get("code")] + if codes: + qs = " ".join( + f"hk{c}" if len(c) == 5 # 港股5位代码 + else f"sz{c}" if c.startswith("0") or c.startswith("3") + else f"sh{c}" if c.startswith("6") + else f"hk{c}" + for c in codes + ) + url = f"https://qt.gtimg.cn/q={qs}" + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + resp = urllib.request.urlopen(req, timeout=10) + qt_text = resp.read().decode("gbk", errors="replace") + # map realtime prices + for stock in stocks: + code = stock.get("code", "") + prefix = "hk" if len(code) == 5 else "sz" if code.startswith(("0","3")) else "sh" if code.startswith("6") else "hk" + # 腾讯 API 格式: prefix+code="市场~名称~代码~当前价~昨收~今开~成交量~..." + m = re.search(rf'{prefix}{code}="([^"]+)"', qt_text) + if m: + fields = m.group(1).split('~') + name = fields[1] + price = fields[3] # 当前价 + if not stock.get("price"): + stock["price"] = price + if not stock.get("name"): + stock["name"] = name + except: + pass # 行情获取失败不影响主流程 + + # 更新对应数据文件 + if doc_type == "portfolio": + existing = _load_json(DATA_DIR / "portfolio.json", {}) + old_holdings = {h["code"]: h for h in existing.get("holdings", []) if h.get("code")} + new_holdings = [] + for s in stocks: + code = s.get("code", "") + old = old_holdings.get(code, {}) + new_shares = int(s["shares"]) if str(s.get("shares", "")).lstrip('-').isdigit() else old.get("shares", 0) + old_shares = old.get("shares", 0) + # 股数突变检测:旧200→新0是合理卖出,但旧0→新200可能是OCR错读 + if old_shares > 0 and new_shares == 0 and old_shares != new_shares: + print(f"[仓位变动] {code} {s.get('name','')}: {old_shares}→{new_shares} (卖出清仓)") + elif abs(new_shares - old_shares) > max(old_shares * 0.5, 100) and old_shares > 0: + print(f"[仓位变动] {code} {s.get('name','')}: {old_shares}→{new_shares} (变动较大)") + new_holdings.append({ + "code": code, + "name": s.get("name") or old.get("name", ""), + "shares": new_shares, + "price": float(s.get("price", 0)) or old.get("price", 0), + "cost": float(s.get("cost", 0)) if s.get("cost") else old.get("cost", 0), + "pnl": s.get("pnl") or old.get("pnl", ""), + "position_pct": float(s.get("position_pct", 0)) if s.get("position_pct") else old.get("position_pct", 0), + "change_pct": old.get("change_pct", 0), + }) + existing["holdings"] = new_holdings + + # 使用截图中的汇总数据(优先),没有则用旧数据 + summary = data.get("summary", {}) + if summary.get("stock_value"): + existing["stock_value"] = float(summary["stock_value"]) + else: + existing["stock_value"] = round( + sum(h["shares"] * h["price"] for h in existing["holdings"]), 2 + ) + if summary.get("cash"): + existing["cash"] = float(summary["cash"]) + if summary.get("total_assets"): + existing["total_assets"] = float(summary["total_assets"]) + else: + existing["total_assets"] = existing["stock_value"] + existing.get("cash", 0) + if summary.get("day_pnl"): + existing["day_pnl"] = float(summary["day_pnl"]) + existing["updated_at"] = datetime.now().isoformat() + # 计算仓位% + if existing["total_assets"] > 0: + existing["position_pct"] = round(existing["stock_value"] / existing["total_assets"] * 100, 2) + _save_json(DATA_DIR / "portfolio.json", existing) + msg = f"更新了 {len(stocks)} 只持仓股" + + elif doc_type == "watchlist": + existing = _load_json(DATA_DIR / "watchlist.json", {}) + existing["stocks"] = [ + { + "code": s.get("code", ""), + "name": s.get("name", ""), + "price": float(s.get("price", 0)) if s.get("price") else 0, + } + for s in stocks + ] + existing["updated_at"] = datetime.now().isoformat() + _save_json(DATA_DIR / "watchlist.json", existing) + msg = f"更新了 {len(stocks)} 只自选股" + + else: + return jsonify({"error": f"未知类型: {doc_type}"}), 400 + + return jsonify({"status": "ok", "message": msg}) + + +# ── TDX中继实时行情接收API ── +@app.route("/api/update/realtime", methods=["POST"]) +def update_realtime(): + """接收小小莫中继的实时行情数据""" + data = request.get_json(force=True) or {} + stocks = data.get("stocks", []) + source = data.get("source", "unknown") + + if not stocks: + return jsonify({"status": "error", "message": "没有股票数据"}), 400 + + # 更新 portfolio.json 中的实时价格(change_pct字段) + pf = _load_json(DATA_DIR / "portfolio.json", {"holdings": []}) + pf_holdings = {h["code"]: h for h in pf.get("holdings", [])} + + updated = 0 + for s in stocks: + code = s.get("code", "") + if code in pf_holdings: + pf_holdings[code]["price"] = float(s.get("price", pf_holdings[code].get("price", 0))) + pf_holdings[code]["change_pct"] = float(s.get("change_pct", 0)) + pf_holdings[code]["high"] = float(s.get("high", 0)) + pf_holdings[code]["low"] = float(s.get("low", 0)) + pf_holdings[code]["open"] = float(s.get("open", 0)) + pf_holdings[code]["volume"] = int(s.get("volume", 0)) + pf_holdings[code]["data_source"] = source + pf_holdings[code]["updated_at"] = datetime.now().isoformat() + updated += 1 + + # 也更新 watchlist.json + wl = _load_json(DATA_DIR / "watchlist.json", {"stocks": []}) + wl_stocks = {s["code"]: s for s in wl.get("stocks", [])} + + for s in stocks: + code = s.get("code", "") + if code in wl_stocks: + wl_stocks[code]["price"] = float(s.get("price", wl_stocks[code].get("price", 0))) + wl_stocks[code]["change_pct"] = float(s.get("change_pct", 0)) + + pf["updated_at"] = datetime.now().isoformat() + wl["updated_at"] = datetime.now().isoformat() + _save_json(DATA_DIR / "portfolio.json", pf) + _save_json(DATA_DIR / "watchlist.json", wl) + + return jsonify({ + "status": "ok", + "updated": updated, + "source": source, + "timestamp": datetime.now().isoformat(), + }) + + +# 注册提示词管理路由 +register_routes(app) + + +if __name__ == "__main__": + port = int(os.environ.get("PORT", 8899)) + print(f"🚀 MoFin Dashboard → http://0.0.0.0:{port}") + app.run(host="0.0.0.0", port=port, debug=False) \ No newline at end of file diff --git a/session_to_cron_bridge.py b/session_to_cron_bridge.py new file mode 100644 index 0000000..76d7621 --- /dev/null +++ b/session_to_cron_bridge.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 +"""session_to_cron_bridge.py — 将Hermes session DB中的cron报告写到cron/output目录 + +Hermes cron jobs(如快速盯盘)将LLM输出存在 session DB (state.db) 中。 +cron_to_xmpp.py 扫描 ~/.hermes/cron/output/ 目录的 .md 文件推送到XMPP。 +这个脚本弥补这个缺口:从state.db读取最新的cron输出,生成.md文件。 + +工作方式: +1. 查询 state.db 中最近的 cron 会话(source='cron') +2. 提取 assistant 的最后一条非空消息 +3. 与 relay journal 对比去重 +4. 新消息写入 cron/output// 目录 +5. cron_to_xmpp.py 自然捡起并推送 +""" + +import json +import sqlite3 +import subprocess +import re +import sys +from datetime import datetime +from pathlib import Path + +REAL_HOME = Path("/home/hmo") +PROFILE = "position-analyst" + +# 要中继的 cron job ID 列表(需要推送到 XMPP 的) +RELAY_JOBS = { + "62a2ba59f7ff": "快速盯盘-15分钟", + "e27e2e92ed80": "知识萃取-盘后", + "9d1236d8a07f": "策略评估-每日", + "5dde4e1a42ce": "分析师-持仓复查", +} + +# 输出目录(与 cron_to_xmpp.py 一致) +# 注意:~/.hermes 是 symlink 到 /home/hmo/.hermes/profiles/position-analyst/home/.hermes +# cron_to_xmpp.py 使用绝对路径 REAL_HOME / ".hermes" / "cron" / "output" +# 所以这里必须用绝对路径,不要相信 ~/.hermes 的解析 +OUTPUT_DIRS = [ + REAL_HOME / ".hermes" / "cron" / "output", + REAL_HOME / ".hermes" / "profiles" / PROFILE / "cron" / "output", +] + +JOURNAL = REAL_HOME / ".hermes" / "cron" / ".relay_journal.json" +STATE_DB = REAL_HOME / ".hermes" / "profiles" / PROFILE / "state.db" + +MAX_AGE_MINUTES = 70 # 只处理最近70分钟内的报告 +TRACK_FILE = REAL_HOME / ".hermes" / "cron" / ".bridge_track.json" # 追踪已桥接的session + + +def load_track(): + try: + return set(json.loads(TRACK_FILE.read_text())) + except: + return set() + + +def save_track(entries): + TRACK_FILE.write_text(json.dumps(sorted(entries))) + + +def load_journal(): + try: + return set(json.loads(JOURNAL.read_text())) + except: + return set() + + +def save_journal(entries): + JOURNAL.write_text(json.dumps(sorted(entries))) + + +def ensure_output_dirs(): + for d in OUTPUT_DIRS: + d.mkdir(parents=True, exist_ok=True) + for job_id in RELAY_JOBS: + (d / job_id).mkdir(exist_ok=True) + + +def extract_report_content(content): + """从assistant消息中提取报告正文""" + if not content or content.strip() in ("", " ", "\n", "\n\n"): + return None + + text = content.strip() + + # 跳过太短的消息 + if len(text) < 20: + return None + + # 跳过 [SILENT] + if "[SILENT]" in text: + return None + + # 跳过思考过程(只留下实际报告内容) + # 如果消息以"Now let me"/"Let me"/"I need"等开头,尝试找后面的报告正文 + lines = text.split('\n') + report_lines = [] + in_report = False + for line in lines: + if not in_report: + # 报告特征:以【开头 或 包含📊 或 包含【知微】 + if any(x in line for x in ["【", "📊", "【知微", "【⚡", "## "]): + in_report = True + report_lines.append(line) + else: + report_lines.append(line) + + if report_lines: + text = '\n'.join(report_lines) + + if len(text) < 20: + return None + + return text + + +def scan(): + processed = load_journal() + tracked = load_track() + new = set() + n_written = 0 + + if not STATE_DB.exists(): + print(f"state.db not found: {STATE_DB}", file=sys.stderr) + return + + conn = sqlite3.connect(str(STATE_DB)) + conn.row_factory = sqlite3.Row + cur = conn.cursor() + + now = datetime.now() + + for job_id, job_name in RELAY_JOBS.items(): + # Find recent sessions for this job + cur.execute(''' + SELECT id, started_at, message_count, source + FROM sessions + WHERE id LIKE ? + ORDER BY started_at DESC + LIMIT 10 + ''', (f'cron_{job_id}_%',)) + + sessions = cur.fetchall() + + for s in sessions: + session_id = s['id'] + + # Skip already bridged sessions + if session_id in tracked: + continue + + started_at = datetime.fromtimestamp(s['started_at']) if s['started_at'] else now + + # Skip too old sessions + age_minutes = (now - started_at).total_seconds() / 60 + if age_minutes > MAX_AGE_MINUTES: + continue + + # Find the last assistant message + cur.execute(''' + SELECT content, timestamp + FROM messages + WHERE session_id = ? AND role = 'assistant' + AND content NOT IN ('', ' ', '\n', '\n\n', '\n\n\n') + ORDER BY timestamp DESC + LIMIT 1 + ''', (session_id,)) + + msg = cur.fetchone() + if not msg: + tracked.add(session_id) + continue + + content = msg['content'].strip() + report = extract_report_content(content) + if not report: + tracked.add(session_id) + continue + + # Mark as tracked even before writing + tracked.add(session_id) + + # Generate a unique key for this report + ts = datetime.fromtimestamp(msg['timestamp']).strftime('%Y%m%d_%H%M%S') if msg['timestamp'] else started_at.strftime('%Y%m%d_%H%M%S') + filename = f"{job_name}_{ts}.md" + + for out_dir in OUTPUT_DIRS: + out_path = out_dir / job_id / filename + key = str(out_path.resolve()) + + if key in processed or key in new: + continue + + # Write the report as an .md file (matching cron_to_xmpp.py format) + md_content = f"# Cron Job: {job_name} ({session_id})\n\n## Response\n\n{report}\n" + out_path.write_text(md_content, encoding='utf-8') + new.add(key) + n_written += 1 + print(f" Written: {out_path.relative_to(REAL_HOME)}", file=sys.stderr) + + conn.close() + + if tracked: + save_track(tracked) + + print(f"桥接完成:写入{n_written}份新报告", file=sys.stderr) + # 桥接脚本只负责写入 .md 文件,不做去重追踪 + # 这样可以避免重复推送的复杂问题 + # 可能每次运行会写重复的文件,但cron_to_xmpp.py会用journal去重 + + print(f"桥接完成:写入{n_written}份新报告", file=sys.stderr) + + +if __name__ == "__main__": + scan() diff --git a/static/index.html b/static/index.html new file mode 100644 index 0000000..119a1ed --- /dev/null +++ b/static/index.html @@ -0,0 +1,1106 @@ + + + + + +MoFin · 莫荷情报 + + + + + + +
+ +
+
+ 📊 +

MoFin

+ 知微 +
+
+ + +
+
+ + +
+ + + + + + + + + 📸 上传 +
+ + +
+ + + + + + + +
+ + + + + + + + + + \ No newline at end of file diff --git a/static/upload.html b/static/upload.html new file mode 100644 index 0000000..a1cf1a2 --- /dev/null +++ b/static/upload.html @@ -0,0 +1,247 @@ + + + + + +MoFin - 持仓截图解析 + + + +
+

📸 持仓截图解析

+

← 返回仪表盘 · 上传持仓截图或自选截图,知微自动识别并更新数据

+ +
+
📤
+
点击上传、拖拽图片、或 Ctrl+V 粘贴
+
支持 PNG / JPG / JPEG,建议清晰截图
+
+ + + + +
+
+
🔍 知微正在分析图片...
+
+ +
+ +
+
+ 持仓 +

+
+
+
+ + +
+
+
+ + +
+ + + + \ No newline at end of file diff --git a/system_health_check.py b/system_health_check.py new file mode 100644 index 0000000..60244f7 --- /dev/null +++ b/system_health_check.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python3 +"""system_health_check.py — MoFin 系统健康检查 + +每日运行,检查所有组件是否正常工作。 +输出报告,有问题才推送。 +""" +import json, os, sys, subprocess +from datetime import datetime, timedelta +from pathlib import Path + +DATA_DIR = Path("/home/hmo/web-dashboard/data") +DECISIONS_PATH = DATA_DIR / "decisions.json" +PORTFOLIO_PATH = DATA_DIR / "portfolio.json" +EVENTS_PATH = DATA_DIR / "price_events.json" +EVALUATION_PATH = DATA_DIR / "evaluation.json" +ACCURACY_PATH = DATA_DIR / "accuracy_stats.json" +CRON_JOBS = "/home/hmo/.hermes/cron/jobs.json" +POSITION_CRON = "/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json" + +def check(ok, msg): + icon = "✅" if ok else "⚠️" + return f" {icon} {msg}" + +def load_json(path, default=None): + try: + with open(path) as f: + return json.load(f) + except: + return {} if default is None else default + +def check_cron_jobs(path, label): + issues = [] + try: + d = load_json(path, {"jobs": []}) + for j in d.get("jobs", []): + name = j.get("name", "?") + enabled = j.get("enabled", True) + last = j.get("last_run_at", "") + status = j.get("last_status", "") + if not enabled: + issues.append(f"{name} 已禁用") + elif not last: + issues.append(f"{name} 从未运行") + elif status != "ok": + issues.append(f"{name} 上次状态={status}") + return len(d.get("jobs", [])), issues + except: + return 0, ["无法读取"] + +def run(): + now = datetime.now() + issues = [] + ok_count = 0 + warn_count = 0 + + lines = [f"MoFin 系统健康检查 | {now.strftime('%Y-%m-%d %H:%M')}"] + lines.append("") + + # 1. 进程检查 + lines.append("【进程】") + procs = { + "mofin-dashboard": "mofin-dashboard", + "xmpp-zhiwei": "xmpp_zhiwei_bot", + "ejabberd": "ejabberd", + } + for name, pattern in procs.items(): + # 先查 systemd,再查 pgrep + r = subprocess.run(["systemctl", "is-active", f"{pattern}.service"], capture_output=True, text=True, timeout=5) + alive = r.stdout.strip() == "active" + if not alive: + r2 = subprocess.run(["pgrep", "-f", pattern], capture_output=True, timeout=5) + alive = r2.returncode == 0 + lines.append(check(alive, f"{name} {'运行中' if alive else '已停止'}")) + if not alive: issues.append(f"{name} 进程不存在"); warn_count += 1 + else: ok_count += 1 + + # 2. 端口检查 + lines.append("") + lines.append("【端口】") + ports = {"8899": "Dashboard", "5222": "ejabberd", "8643": "知微Gateway"} + for port, name in ports.items(): + r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5) + listening = f":{port}" in r.stdout + lines.append(check(listening, f"{name} :{port} {'监听中' if listening else '未监听'}")) + if not listening: issues.append(f"{name} 端口{port}未监听"); warn_count += 1 + else: ok_count += 1 + + # 3. 数据文件检查 + lines.append("") + lines.append("【数据文件】") + files = { + "portfolio.json": PORTFOLIO_PATH, + "watchlist.json": DATA_DIR / "watchlist.json", + "decisions.json": DECISIONS_PATH, + "market.json": DATA_DIR / "market.json", + "price_events.json": EVENTS_PATH, + "evaluation.json": EVALUATION_PATH, + "accuracy_stats.json": ACCURACY_PATH, + } + for name, path in files.items(): + exists = path.exists() + size = path.stat().st_size if exists else 0 + lines.append(check(exists and size > 10, f"{name} {'存在' if exists else '缺失'} ({size}B)")) + if not exists or size < 10: + issues.append(f"{name} 缺失或为空") + warn_count += 1 + else: + ok_count += 1 + + # 4. 价格事件统计 + lines.append("") + lines.append("【价格事件】") + events = load_json(EVENTS_PATH, {"events": []}) + ev_list = events.get("events", []) + today_events = [e for e in ev_list if e.get("date") == now.strftime("%Y-%m-%d")] + lines.append(check(len(ev_list) > 0, f"历史事件: {len(ev_list)}条")) + lines.append(check(len(today_events) > 0, f"今日事件: {len(today_events)}条")) + if len(ev_list) == 0: + issues.append("price_events.json 无事件记录,price_monitor可能未触发过") + warn_count += 1 + else: + ok_count += 1 + + # 5. 策略评估统计 + lines.append("") + lines.append("【策略评估】") + evals = load_json(EVALUATION_PATH, {"strategies": []}) + s_list = evals.get("strategies", []) + lines.append(check(len(s_list) > 0, f"已评估策略: {len(s_list)}条")) + if len(s_list) > 0: + avg = sum(s.get("score", 0) for s in s_list) / len(s_list) + lines.append(check(avg > 0, f"平均评分: {avg:.1f}/10")) + ok_count += 1 + else: + issues.append("evaluation.json 无评估数据") + warn_count += 1 + + # 6. 建议记录统计 + lines.append("") + lines.append("【建议记录】") + decisions = load_json(DECISIONS_PATH, {"decisions": []}) + total_advice = sum(len(d.get("advice_timeline", [])) for d in decisions.get("decisions", [])) + lines.append(check(total_advice > 0, f"建议记录: {total_advice}条")) + if total_advice == 0: + issues.append("所有策略建议记录为空") + warn_count += 1 + else: + ok_count += 1 + + # 7. Cron jobs + lines.append("") + lines.append("【Cron Jobs】") + cnt, cron_issues = check_cron_jobs(CRON_JOBS, "default") + lines.append(check(cnt > 0, f"default profile: {cnt}个job")) + for ci in cron_issues: + lines.append(f" ⚠️ {ci}") + warn_count += 1 + if cnt == 0: warn_count += 1 + cnt2, cron_issues2 = check_cron_jobs(POSITION_CRON, "position-analyst") + lines.append(check(cnt2 > 0, f"position-analyst: {cnt2}个job")) + for ci in cron_issues2: + lines.append(f" ⚠️ {ci}") + warn_count += 1 + if cnt2 == 0: warn_count += 1 + + # 8. 数据新鲜度 + lines.append("") + lines.append("【数据新鲜度】") + # 各数据文件的合理最大陈旧时间(小时) + freshness_thresholds = { + "portfolio.json": 24, # 每日有数据即可 + "decisions.json": 48, # 策略参数更新频率较低 + "multi_tf_cache.json": 24, # K线缓存每日更新 + "macro_context.json": 24, # 宏观数据每日2次 + "market.json": 48, # 行业数据每日更新 + "strategy_staleness_report.json": 24, # 时效性报告每日生成 + } + data_files = { + "portfolio.json": PORTFOLIO_PATH, + "decisions.json": DECISIONS_PATH, + "multi_tf_cache.json": DATA_DIR / "multi_tf_cache.json", + "macro_context.json": DATA_DIR / "macro_context.json", + "market.json": DATA_DIR / "market.json", + "strategy_staleness_report.json": DATA_DIR / "strategy_staleness_report.json", + } + for name, path in data_files.items(): + if not path.exists(): + lines.append(check(False, f"{name} 缺失")) + issues.append(f"{name} 文件缺失") + warn_count += 1 + continue + mtime = datetime.fromtimestamp(path.stat().st_mtime) + hours_ago = (now - mtime).total_seconds() / 3600 + threshold = freshness_thresholds.get(name, 24) + fresh = hours_ago < threshold + time_str = f"{hours_ago:.0f}h前" if hours_ago >= 1 else f"{hours_ago*60:.0f}分钟前" + lines.append(check(fresh, f"{name} 更新于 {time_str} (阈值{threshold}h)")) + if not fresh: + issues.append(f"{name} 超过{threshold}h未更新(最近更新:{time_str})") + warn_count += 1 + else: + ok_count += 1 + + # 数据管道组件检查 + lines.append("") + lines.append("【数据管道】") + pipe_checks = [ + ("再生器(regenerate_all)", r"strategy_lifecycle\.py"), + ("市场采集(market_watch)", r"market_watch\.py"), + ("宏观采集(macro)", r"macro_context_collector\.py"), + ] + for pname, ppattern in pipe_checks: + r = subprocess.run(["pgrep", "-f", ppattern], capture_output=True, timeout=5) + if r.returncode == 0: + lines.append(check(True, f"{pname} 进程存在")) + ok_count += 1 + else: + # no_agent脚本不常驻,不报warn + lines.append(" 📎 {} 无常驻进程(no_agent脚本按cron调度运行)".format(pname)) + + # 价格数据更新时间检查(盘中应有当日数据) + is_trading_day = now.weekday() < 5 # 周一到周五 + if is_trading_day and now.hour >= 9 and now.hour < 16: + if PORTFOLIO_PATH.exists(): + mtime = datetime.fromtimestamp(PORTFOLIO_PATH.stat().st_mtime) + hours_ago = (now - mtime).total_seconds() / 3600 + has_intraday_data = mtime.date() == now.date() + lines.append(check(has_intraday_data, f"盘中有当日价格数据 {'是' if has_intraday_data else '否'}(最近{mtime.strftime('%H:%M')})")) + if not has_intraday_data: + issues.append(f"盘中交易时段但portfolio.json无今日数据(最近更新{mtime.strftime('%m-%d %H:%M')})") + warn_count += 1 + else: + ok_count += 1 + + # 汇总 + total = ok_count + warn_count + lines.append("") + lines.append(f"总计: ✅ {ok_count}/{total} 正常 | ⚠️ {warn_count}/{total} 需关注") + if issues: + lines.append("") + lines.append("需关注项:") + for i, issue in enumerate(issues[:10], 1): + lines.append(f" {i}. {issue}") + + report = "\n".join(lines) + print(report) + + # 如果有问题,写入报告文件供推送 + if warn_count > 0: + report_path = Path("/home/hmo/.hermes/profiles/position-analyst/cron/output/health") + report_path.mkdir(parents=True, exist_ok=True) + report_file = report_path / f"health_{now.strftime('%Y%m%d_%H%M')}.md" + report_file.write_text(f"# MoFin 系统健康检查\n\n{report}") + print(f"\n报告已写入 {report_file}") + else: + print("\n[SILENT] 一切正常") + + +if __name__ == "__main__": + run()