Compare commits

...

16 Commits

Author SHA1 Message Date
知微 5a7335e583 fix: restore xiaoguo_scanner LLM timeout to 30s 2026-06-24 22:05:45 +08:00
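知微 9b878001af fix: degrade gracefully when the Xiaoguo LLM is unreachable
- xiaoguo_scanner.py: on LLM timeout/unreachable, mark the item unknown and keep scanning instead of skipping it
- intraday_health_check.py: do not report a critical error when Xiaoguo has no data (it may simply be offline)
- sync the scanner profile directory
2026-06-24 22:04:18 +08:00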
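知微 d27dbcc6ad fix: add process-level checks to the intraday self-check (price_monitor/xiaoguo_scanner)
- price_monitor: double verification via pgrep plus the event count over the last 10 minutes
- xiaoguo_scanner: triple verification via pgrep, today's data, and API reachability
- avoids the "down for two days and nobody noticed" situation
2026-06-24 22:00:58 +08:00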
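知微 077f683878 feat: three-tier self-check + meta self-check + global cron audit
- intraday high-frequency (every 15 min): XMPP/Gateway/Scanner/prices/signal pipeline
- daily morning check (8:00): the original 7 layers + new global cron audit + meta self-check
- cron audit: check whether every enabled scheduled job has run within the last 24h
- meta self-check: whether yesterday's health check finished / whether checklist coverage is complete
- self-growth: auto_discovery automatically appends new crons to the checklist
2026-06-24 21:58:57 +08:00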
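知微 7a6fb103cb fix: executor results are sent to Dad over zhiwei's XMPP
- add send_xmpp(), which uses hermes send to deliver XMPP to hmo via zhiwei
- call send_xmpp on every TODO completion/failure/escalation
- Dad receives all TODO state transitions over XMPP
2026-06-24 21:50:15 +08:00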
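知微 0a7ad20f54 fix: executor outputs LLM processing results to Dad
- print the gateway API response to output that Dad can see
- each TODO appears in front of Dad once (fixed directly) or twice (creation + result)
- if the LLM cannot solve it either, the result says so explicitly
2026-06-24 21:49:03 +08:00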
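知微 629f154829 cleanup: on executor failure, call the gateway API with full context
- escalation info includes the full description (health-check reason/category/details)
- includes the attempted fix command and its failure output
- the gateway API runs in a new session, so it can only analyze with the full context
2026-06-24 21:41:14 +08:00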
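知微 526fcc4412 fix: executor failures go straight to the core group, not through the API
- executor cannot fix it → hermes send to the core group; Zhiwei handles it on receipt there
- Dad is also in the core group and can see it
- do not send to the gateway API (context is isolated, Dad cannot see it)
- items without a fix_action take the same path
2026-06-24 21:38:36 +08:00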
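知微 b766a5dbb1 fix: executor tries only once; failure escalates straight to needs_llm
- remove the retry logic: one attempt every 10 minutes; on failure → needs_llm immediately
- items without a fix_action also go to needs_llm (not shelved)
- executor uses deliver=origin, so needs_llm output reaches Dad
- Zhiwei checks needs_llm on startup/when called → handles it directly
2026-06-24 21:30:49 +08:00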
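知微 11254c8834 fix: drop blocked in favor of the needs_llm escalation mechanism
- executor retries exhausted → needs_llm (Zhiwei must step in), not blocked
- the tail of the health check report shows needs_llm items + failure reasons
- derive_fix_action covers all known scenarios (cron errors/delivery, etc.)
- TODO creation records the "reason it could not be fixed on the spot"
- every TODO must have a fix_action; issues without one do not become TODOs and are listed directly in the report
2026-06-24 21:26:23 +08:00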
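知微 b63c4f5879 fix: remove the blocked status + every TODO must have a fix_action
- remove the blocked status: there is no such thing as "park it and wait"
- executor change: fix_action fails → retry → once retries run out, wait for tomorrow's health check
- health check change: fix_action must be non-empty when creating a TODO
  - no fix_action → do not create a TODO; list it directly in the report
- new derive_fix_action coverage: delivery/price monitoring/signal bridge and similar scenarios
- health report tail: issues found without a fix_action are listed directly; no empty TODO is created
2026-06-24 21:16:59 +08:00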
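知微 a55d241f30 feat: TODO DB migration + no_agent executor + blocked escalation mechanism
- create mofin.db → todos table (id/title/status/priority/fix_action/retry mechanism)
- create self_todo_executor.py: no_agent script, pure code logic
  - has fix_action → execute → verify → mark completed
  - no fix_action → mark blocked → emit a notification
  - retry up to 3 times on failure → mark blocked once the limit is exceeded
  - a newly blocked item is cached after its first output and not repeated
- modify morning_health_check:
  - write TODOs to the DB instead of JSON (sqlite3, no row_factory dependency)
  - dedup: the duplicate check includes completed items
  - output the list of blocked TODOs
- replace cron: LLM cron (2h interval) → no_agent (10min interval)
- fix: change two deliver=origin jobs to local
- cleanup: retire the obsolete todo.json
2026-06-24 20:55:56 +08:00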
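知微 a76240b52d feat: migrate TODOs to DB + no_agent self-healing executor
- create mofin.db → todos table (with fix_action/verification_check/retry mechanism)
- create scripts/self_todo_executor.py (no_agent, pure code logic, no LLM)
- modify morning_health_check.py: write TODOs to the DB rather than JSON; add derive_fix_action()
- cron replacement: LLM cron → no_agent script, high-frequency polling at */10 8-22
- cost: with nothing pending it is only a sqlite query, about 0.01s per run

Processing chain:
  health_check (8:00) → fix directly if fixable → otherwise write to DB (todos table) → self-healing executor (every 10 min) → execute if there is a fix_action → mark blocked for manual handling if there is none
2026-06-24 20:44:36 +08:00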
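知微 26993c1d41 fix: three-tier handling: health check → auto-fix → TODO
- add auto_fix_issue(): issues that can be fixed automatically are fixed directly (FX rate cache/price events)
- fixed issues are not written as TODOs; only issues that cannot be auto-fixed enter the TODO system
- fix the HK stock FX rate cache path in the checklist (~ resolves differently in the profile environment)
- output adds 🛠️ auto-fix and 📋 TODO-write summaries

Processing flow:
  auto-fixable (FX rate refresh, etc.) → fix immediately, flag in the report
  needs manual/complex fix (cron errors/delivery config) → write a TODO
  TODOs are handled by the self-todo cron in the next window (9:00/11:00/14:00, etc.)
2026-06-24 20:26:47 +08:00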
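知微 4407f35027 feat: self-healing loop system + xiaoguo signal bridge
- add docs/self-healing-loop.md: self-healing loop design document
- modify morning_health_check.py: write anomalies to the TODO system automatically
  - severity=critical→priority=high, error→medium, warn→low
  - same issue already completed → reopen and raise one level
  - already pending → do not duplicate
- add scripts/xiaoguo_signal_consumer.py: Zhiwei consumes Xiaoguo scan signals
  - runs every 30 minutes intraday
  - reads unprocessed xiaoguo signals from signal_news
  - five-dimension quick assessment → add to watchlist/follow/skip
  - marks processed=1
- add cron: Xiaoguo signal consumption, intraday (15,45 9-15)
- update the health_checklist self-maintenance mechanism

Self-healing loop: health check → TODO → fix → verify
2026-06-24 20:14:53 +08:00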
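知微 a24505ebef feat: routine system health check mechanism
- add morning_health_check.py: layered, categorized, comprehensive health check script
- add health_checklist.json: dynamically maintained checklist (auto-discovers new components)
- add docs/morning-health-check.md: full design document
- add skill: morning-health-check
- fix: sync xiaoguo_scanner.py to the profile scripts directory
- fix: change system_audit.py from a symlink to a hard copy (resolves the script path escaping its allowed directory)
- fix: morning_health_check.py uses jobs.json instead of cron.db (more reliable)

Automatically checks 43 items across 7 layers:
Infrastructure/SENSE/RESPOND/ADAPT/IMPROVE/data files/pipeline integrity
Runs automatically once a day at 8:00 before the open
2026-06-24 20:09:10 +08:00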
8 changed files with 2345 additions and 912 deletions
+786 -910
File diff suppressed because it is too large
+116
@@ -0,0 +1,116 @@
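---
name: morning-health-check
title: MoFin Routine System Health Check
description: Full sweep-style system health check at 8:00 before each market open, with a layered and categorized checklist, automatic discovery of new components, and issue notifications
trigger: Runs automatically at 8:00 AM on trading days
---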
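# MoFin Routine System Health Check
## Design Purpose
"A routine physical is not a targeted exam prompted by a known problem; it is a regular, comprehensive, sweep-style inspection."
Core mechanisms:
1. **health_checklist.json**: a dynamically maintained checklist (new features are added automatically)
2. **morning_health_check.py**: runs daily at 8:00 before the open and checks every item
3. **self_discovery()**: automatically discovers new cron jobs and appends them to the checklist
4. **History tracking**: health_check_history.json keeps 90 days of health check records
## Architecture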
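```
morning_health_check.py (no_agent, 8:00 cron)
├── health_checklist.json (checklist)
├── Layered checks (7)
│   ├── Infrastructure: XMPP/Gateway/Dashboard/API/disk
│   ├── SENSE: price_monitor/xiaoguo/macro/FX rates/sectors
│   ├── RESPOND: push cron/price events/signal backlog
│   ├── ADAPT: strategy re-evaluation/strategy tree/timeliness
│   ├── IMPROVE: knowledge extraction/hard-coding/audit/pruning/meta-growth
│   ├── Data files: freshness of all key JSON/DB files
│   └── Pipeline integrity: cron errors/accidental pauses/delivery targets/signal bridge
├── self_discovery(): automatically discovers new components
│   ├── compares jobs.json against the cron IDs in the checklist
│   └── new cron found → appended to the pipeline category
└── Output (no_agent rules)
    ├── Issues found → print a detailed report (pushed to Dad)
    └── All normal → [SILENT]
```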
## Checklist (health_checklist.json)
Located at `/home/hmo/MoFin/data/health_checklist.json`
7 categories, currently about 43 checks. Each check item contains:
- id/description: unique identifier and description
- check: check directive (e.g. `cron:job_id`, `systemctl:service`, `port:8643`, `db:table:field::today:1`)
- expected: expected value
- severity: critical/high/medium/low
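## Self-Maintenance
### Automatic discovery (self_discovery)
Each time the health check runs, the script automatically:
1. Reads jobs.json from Hermes cron
2. Compares it against the cron IDs already registered in the checklist
3. Finds new cron jobs → appends them to the pipeline category (marked auto_discovered=true)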
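The three steps above amount to a set difference between the scheduler's job IDs and the IDs the checklist already knows. A minimal sketch, assuming the checklist has a `categories` list with a `pipeline` entry and that each job carries `id` and `name`; the function name `discover_new_crons` is hypothetical and this is not the script's actual implementation:

```python
def discover_new_crons(jobs, checklist):
    """Append unknown cron jobs to the pipeline category; return the new IDs."""
    # Collect every cron ID already referenced by a "cron:<id>" check.
    known = {
        item["check"].split(":", 1)[1]
        for cat in checklist["categories"]
        for item in cat["items"]
        if item["check"].startswith("cron:")
    }
    # Assumption: a category with id "pipeline" always exists.
    pipeline = next(c for c in checklist["categories"] if c["id"] == "pipeline")
    added = []
    for job in jobs:
        jid = job.get("id")
        if jid and jid not in known:
            pipeline["items"].append({
                "id": f"cron-auto-{jid[:8]}",
                "description": f"{job.get('name', '?')} cron scheduled",
                "check": f"cron:{jid}",
                "expected": "enabled+ok",
                "severity": "medium",
                "auto_discovered": True,
            })
            known.add(jid)
            added.append(jid)
    return added
```

Because the known IDs are recomputed from the checklist on every call, running the function twice on the same input adds nothing the second time.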
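### Manual maintenance
- After a new feature joins the system, add the corresponding check items to the checklist
- After modifying an existing component, review whether the thresholds/expected values of existing check items need adjusting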
## Checker Types
| Type | Format | Description |
|------|------|------|
| systemctl | `systemctl:service_name` | Checks a systemd service |
| port | `port:8888` | Checks that a port is listening |
| proc | `proc:pattern` | pgrep process match |
| http | `http:url` | HTTP GET reachability |
| disk | `disk:/` | Disk usage |
| fileexists | `fileexists:/path` | Whether a file exists |
| filefresh | `filefresh:/path:24h` | File freshness |
| db | `db:table:field::today:1` | Database record count |
| cron | `cron:job_id` | Cron job status |
| cron_errors | `cron_errors:last24h` | Global cron errors |
| cron_paused | `cron_paused:check` | Accidental-pause check |
| delivery | `delivery:origin_targets` | Delivery target check |
| pipeline | `pipeline:xiaoguo_signal_flow` | Composite pipeline check |
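Each check directive in the table is a colon-separated string, so the main subtlety is that URLs and the `db` form contain extra colons. The sketch below shows one way to split them; `CheckSpec` and `parse_check_spec` are hypothetical names, and the `db` field order (table, field, value, op, threshold) is taken from the comment in the script's `run_check`:

```python
from typing import NamedTuple


class CheckSpec(NamedTuple):
    kind: str
    args: list


def parse_check_spec(spec: str) -> CheckSpec:
    """Split a directive such as 'port:8643' into its kind and arguments."""
    kind, _, rest = spec.partition(":")
    if kind == "db":
        # db:table:field:value:op:threshold; missing trailing fields become "".
        parts = rest.split(":")
        parts += [""] * (5 - len(parts))
        return CheckSpec(kind, parts[:5])
    if kind == "filefresh":
        # The max age is the last segment, so split from the right.
        path, _, max_age = rest.rpartition(":")
        return CheckSpec(kind, [path, max_age])
    # Everything else (including http URLs) keeps the remainder intact.
    return CheckSpec(kind, [rest])
```

For example, `parse_check_spec("http:http://localhost:8643/health")` keeps the whole URL as a single argument rather than splitting it at every colon.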
## Triggers
- **Automatic**: cron `0 8 * * 1-5` (8:00 on trading days, no_agent mode)
- **Manual**: `python3 /home/hmo/MoFin/scripts/morning_health_check.py --report`
- **Update checklist**: `python3 /home/hmo/MoFin/scripts/morning_health_check.py --update-checklist`
## Output Format
```
MoFin 系统体检 | 2026-06-25 周四 | 08:00
────────────────────────────────────────
【基础设施层】
✅ 知微 XMPP Bot: active
❌ 小果 LLM API: timeout
... (only categories with problems are printed; healthy categories appear only with --report)
【管道完整性】
🔴 无异常cron状态(最近24h): 1 errors: 小果独立扫描(error)
────────────────────────────────────────
总计: 🔴1严重 | ❌2错误 | ⚠️0警告 | ✅40正常 (15s)
需立即处理的问题:
[ERROR] SENSE: 小果扫描 cron 已调度: status=error
```
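## History
Saved in `/home/hmo/MoFin/data/health_check_history.json`
- Each run records a timestamp, the count per severity level, and the duration
- Keeps the most recent 90 entries (about 3 months)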
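The retention rule above can be expressed as "append, then keep the tail". A minimal sketch, assuming the history file is a JSON list of run records; `append_history` and `MAX_ENTRIES` are illustrative names, not taken from the script:

```python
import json
from pathlib import Path

MAX_ENTRIES = 90  # retention stated in this document


def append_history(path: Path, entry: dict, max_entries: int = MAX_ENTRIES) -> list:
    """Append one run record and trim the file to the newest max_entries."""
    history = json.loads(path.read_text()) if path.exists() else []
    history.append(entry)
    history = history[-max_entries:]  # drop the oldest records first
    path.write_text(json.dumps(history, ensure_ascii=False, indent=2))
    return history
```

Trimming by entry count rather than by date keeps the logic independent of how often the check actually ran.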
+135
@@ -0,0 +1,135 @@
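# MoFin Self-Healing Loop System Design
> Version: v1.0 | Last updated: 2026-06-24
> Core idea: the health check finds an issue → a TODO tracks it → it is fixed automatically → the next day's health check verifies it, forming an unattended repair loop.
## 1. Self-Healing Loop Architecture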
```
┌───────────────────────────────────┐
│ morning_health_check.py           │
│ Daily at 8:00 (before the open)   │
│                                   │
│ Checks 7 layers, 43+ items        │
│ Issue found → write TODO          │
└──────────┬────────────────────────┘
           │ TODO: health_fix_xxx
┌───────────────────────────────────┐
│ self-todo cron                    │
│ Auto-runs every 2-3 hours         │
│                                   │
│ Reads pending tasks               │
│ Processes them in priority order  │
│ Then git commit + mark completed  │
└──────────┬────────────────────────┘
           │ git push
┌───────────────────────────────────┐
│ Next-morning verification         │
│                                   │
│ Yesterday's issue fixed → ✅      │
│ Issue still present → ⚠️ escalate │
└───────────────────────────────────┘
```
## 2. Health Check → TODO Integration
### Integration point
After morning_health_check.py finds an anomaly:
```
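1. Read the current TODO file
2. Check whether a TODO for the same issue already exists (dedup)
3. If not → append a new TODO:
   - status: pending
   - priority: derived from severity (critical→high, error→medium, warn→low)
   - deps: none
   - title: "[体检发现] {issue description}" (the prefix means "found by health check")
4. If the same TODO exists with status=completed →
   - it was marked fixed yesterday but the problem is still present today → reopen it
   - set status=pending and raise priority one level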
```
### TODO field mapping
| Health-check severity | TODO priority | Expected fix time |
|---------------|---------------|-------------|
| critical | high | current session / next self-todo run |
| error | medium | within 24h |
| warn | low | within 48h |
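The mapping table and the dedup/reopen rules can be combined into one small upsert. The sketch below works on an in-memory list of TODO dicts purely for illustration; `upsert_todo` and `escalate` are hypothetical helpers, and capping escalation at `high` is an assumption the document does not state:

```python
PRIORITY_BY_SEVERITY = {"critical": "high", "error": "medium", "warn": "low"}
PRIORITY_ORDER = ["low", "medium", "high"]


def escalate(priority: str) -> str:
    """Raise a priority one level, capped at the highest level (assumption)."""
    idx = PRIORITY_ORDER.index(priority)
    return PRIORITY_ORDER[min(idx + 1, len(PRIORITY_ORDER) - 1)]


def upsert_todo(todos: list, issue: str, severity: str) -> dict:
    """Create a TODO for an issue, or reopen and escalate a completed one."""
    title = f"[体检发现] {issue}"  # title prefix used by the health check
    for todo in todos:
        if todo["title"] == title:
            if todo["status"] == "completed":
                # Marked fixed earlier but found again: reopen, one level higher.
                todo["status"] = "pending"
                todo["priority"] = escalate(todo["priority"])
            return todo  # an existing pending item is left untouched
    todo = {
        "title": title,
        "status": "pending",
        "priority": PRIORITY_BY_SEVERITY.get(severity, "medium"),
        "deps": [],
    }
    todos.append(todo)
    return todo
```

Matching on the full title keeps deduplication simple, at the cost of treating any change in the issue text as a new TODO.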
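## 3. xiaoguo Signal Bridge (Zhiwei consumes xiaoguo_scanner output)
### Problem
xiaoguo_scanner writes to the `signal_news` table every 5 minutes, but no Zhiwei component reads it.
### Solution
Add `xiaoguo_signal_consumer.py` (a no_agent script that runs every 30 minutes):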
```
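xiaoguo_signal_consumer.py (every 30 min, intraday)
├── Read the signal_news table in mofin.db
│   WHERE source LIKE 'xiaoguo%'
│   AND (processed=0 OR processed IS NULL)
│   AND date(created_at) == today
├── Process each row:
│   ├── source=xiaoguo (bullish signal):
│   │   ├── check whether it is already in the watchlist/holdings
│   │   ├── pull Tencent quotes (live price/PE/percent change)
│   │   ├── five-dimension quick assessment (market → sector → news → fundamentals → technicals)
│   │   ├── decision: add to watchlist / follow / skip
│   │   └── write to decisions.json / watchlist.json
│   │
│   └── source=xiaoguo_risk (bearish signal):
│       └── check the stop-loss distance on holdings and warn
└── Mark processed=1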
```
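The read-process-mark cycle in the tree above maps onto one SELECT and one UPDATE per row. A minimal sketch with `sqlite3`, assuming `signal_news` has `id`, `source`, `processed`, and a `created_at` column stored as local-time text; `consume_pending` and the `handle` callback are hypothetical stand-ins for the real assessment logic:

```python
import sqlite3

PENDING_SQL = """
SELECT id, source FROM signal_news
WHERE source LIKE 'xiaoguo%'
  AND (processed = 0 OR processed IS NULL)
  AND date(created_at) = date('now', 'localtime')
ORDER BY id
"""


def consume_pending(conn: sqlite3.Connection, handle) -> int:
    """Pass each unprocessed xiaoguo signal from today to handle(), then mark it."""
    rows = conn.execute(PENDING_SQL).fetchall()
    for row_id, source in rows:
        handle(row_id, source)  # e.g. branch on 'xiaoguo' vs 'xiaoguo_risk'
        conn.execute("UPDATE signal_news SET processed = 1 WHERE id = ?", (row_id,))
    conn.commit()
    return len(rows)
```

Marking a row only after `handle` returns means a crash mid-run leaves that signal unprocessed, so the next run picks it up again.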
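### Execution layers
The health check and signal consumption are **two independent pipelines**:
| Pipeline | Script | Frequency | Purpose |
|------|------|------|------|
| System health check | morning_health_check.py | daily at 8:00 | checks the health of all components |
| Signal consumption | xiaoguo_signal_consumer.py | every 30 min, 9:30-15:00 | consumes Xiaoguo scan signals |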
## 4. Core Files
| File | Purpose |
|------|------|
| /home/hmo/MoFin/docs/self-healing-loop.md | This document |
| /home/hmo/MoFin/scripts/morning_health_check.py | Health check script (exists, needs changes) |
| /home/hmo/MoFin/scripts/xiaoguo_signal_consumer.py | Signal consumer script (new) |
| /home/hmo/.hermes/profiles/position-analyst/todo.json | TODO system data file |
## 5. TODO Integration Change Plan
### Changes to morning_health_check.py
Append the TODO write logic at the end of main():
```python
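from pathlib import Path


def write_todos_for_issues(report_entries):
    """Write anomalies found by the health check into the TODO system."""
    todo_path = Path("/home/hmo/.hermes/profiles/position-analyst/todo.json")
    # Read existing TODOs
    # Dedup (check whether the same issue already exists)
    # Append new TODOs
    # Write back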
```
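### New xiaoguo_signal_consumer.py
A 30-minute intraday loop that consumes unprocessed xiaoguo signals from signal_news.
### self-todo cron adaptation
Make sure the self-todo cron (b53435fbb38b, runs several times per hour) can recognize and handle TODO items with the `[体检发现]` prefix.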
+166
@@ -0,0 +1,166 @@
#!/usr/bin/env python3
"""intraday_health_check.py: lightweight high-frequency intraday self-check (no_agent)

Every 15 minutes, check only the most critical active components, i.e. those that directly affect intraday operation.
Issues found are written as TODOs (the consumer pipeline is shared with the daily health check).
"""
import json, os, sqlite3, subprocess, urllib.request
from pathlib import Path
from datetime import datetime, timedelta

BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
DB_PATH = DATA / "mofin.db"
CRON_JOBS = Path("/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json")
GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
GATEWAY_KEY = "hermes123"

ISSUES = []    # messages of failed checks
OK_COUNT = 0   # number of passed checks
def log(ok, msg):
    """Count a passed check, or record msg as an issue when the check failed."""
    global OK_COUNT
    if ok:
        OK_COUNT += 1
    else:
        ISSUES.append(msg)


def check_port(port):
    """Return True if the TCP port shows up as listening in `ss -tlnp`."""
    try:
        r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
        return f":{port}" in r.stdout
    except Exception:
        return False


def check_http(url, timeout=8):
    """Return True if an HTTP GET to url succeeds (proxy env vars are removed first)."""
    try:
        for k in list(os.environ.keys()):
            if 'proxy' in k.lower():
                os.environ.pop(k)
        req = urllib.request.Request(url, method="GET")
        urllib.request.urlopen(req, timeout=timeout)
        return True
    except Exception:
        return False


def db_today_count(table, date_col):
    """Count today's rows in table by date_col; return -1 on any error."""
    today = datetime.now().strftime("%Y-%m-%d")
    try:
        conn = sqlite3.connect(str(DB_PATH))
        r = conn.execute(f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,)).fetchone()
        conn.close()
        return r[0]
    except Exception:
        return -1
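def check_xiaoguo():
    """Xiaoguo pipeline: process / scanner has data / API reachable (degraded states are not errors)."""
    # Process: there may be no resident process (no_agent cron mode)
    # Data: scan records exist for today
    scans_today = db_today_count("xiaoguo_scan_tracker", "last_scanned_at")
    if scans_today <= 0:
        # Xiaoguo may simply be offline; do not report it as critical
        return
    # API: when unreachable the scanner has already degraded to unknown, so the result is ignored
    check_http("http://192.168.1.122:18003/v1/models")


def check_price_monitor():
    """Price monitor: the process is running and recent data exists."""
    # Process check
    r = subprocess.run(["pgrep", "-f", "price_monitor"], capture_output=True, timeout=5)
    process_alive = r.returncode == 0
    if not process_alive:
        log(False, "价格监控进程不在运行")  # "price monitor process is not running"
        return
    # Data freshness (any events in the last 10 minutes?)
    try:
        conn = sqlite3.connect(str(DB_PATH))
        recent = conn.execute(
            "SELECT COUNT(*) FROM price_events WHERE created_at > datetime('now', '-10 minutes')"
        ).fetchone()[0]
        conn.close()
        # "process is running but there were no new events in the last 10 minutes"
        log(recent > 0, "价格监控进程在跑,但最近10分钟无新事件")
    except Exception:
        log(True, "价格监控进程在跑")  # "price monitor process is running"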
def check_bots():
    """Both XMPP bot services must be active according to systemd."""
    zhiwei = subprocess.run(["systemctl", "is-active", "xmpp-zhiwei.service"],
                            capture_output=True, text=True, timeout=5).stdout.strip() == "active"
    xiaoguo = subprocess.run(["systemctl", "is-active", "xmpp-xiaoguo.service"],
                             capture_output=True, text=True, timeout=5).stdout.strip() == "active"
    log(zhiwei, "知微XMPP Bot离线")    # "Zhiwei XMPP bot offline"
    log(xiaoguo, "小果XMPP Bot离线")   # "Xiaoguo XMPP bot offline"


def check_gateways():
    """Both gateways must be listening on their ports."""
    log(check_port(8643), "知微Gateway :8643 未监听")  # "Zhiwei gateway :8643 not listening"
    log(check_port(8645), "小果Gateway :8645 未监听")  # "Xiaoguo gateway :8645 not listening"


def check_signal_pipeline():
    """Is the flow xiaoguo_scanner → signal_news → consumer unobstructed?"""
    unproc = 0
    try:
        conn = sqlite3.connect(str(DB_PATH))
        r = conn.execute("SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)").fetchone()
        unproc = r[0]
        conn.close()
    except Exception:
        pass
    # "signal backlog: N unprocessed (must be < 30)"
    log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30)")
def write_todos():
    """Insert one pending TODO per issue, skipping titles that are already open."""
    if not ISSUES:
        return
    for msg in ISSUES:
        title = f"[盘中自检] {msg}"  # prefix means "intraday self-check"
        try:
            conn = sqlite3.connect(str(DB_PATH))
            exist = conn.execute("SELECT id FROM todos WHERE title=? AND status IN ('pending','in_progress')", (title,)).fetchone()
            if not exist:
                conn.execute(
                    "INSERT INTO todos (title, description, priority, source, status, fix_action) "
                    "VALUES (?, ?, 'high', 'intraday_check', 'pending', NULL)",
                    (title, f"盘中自动发现: {msg}"))
                conn.commit()
            conn.close()
        except Exception:
            pass


def main():
    now = datetime.now()
    # Run only during trading hours (weekdays, 9:00 up to 15:00)
    if now.weekday() >= 5 or now.hour < 9 or now.hour >= 15:
        print("[SILENT] 非交易时段")
        return
    check_bots()
    check_gateways()
    check_xiaoguo()
    if 9 <= now.hour < 16:
        check_price_monitor()
    check_signal_pipeline()
    write_todos()
    if ISSUES:
        print(f"盘中自检 | {now.strftime('%H:%M')} | {len(ISSUES)}项异常:")
        for i in ISSUES:
            print(f" ⚠️ {i}")
    else:
        print(f"[SILENT] 盘中自检通过 | {OK_COUNT}项正常")


if __name__ == "__main__":
    main()
+767
@@ -0,0 +1,767 @@
#!/usr/bin/env python3
"""morning_health_check.py: MoFin routine system health check

Runs daily before the open (8:00) and scans the health of every MoFin component.
Prints a formatted report; it speaks up only when something is wrong and stays silent otherwise.

Core design:
  1. Read the checklist from health_checklist.json
  2. Check each item and record its status
  3. Report abnormal items (push anomalies only, never normal results)
  4. Auto-discover new crons/scripts (via the self_discovery function)
  5. Maintain the check history (health_check_history.json)

Automatic discovery of new components:
  - Compare the current cron list with the cron ids recorded in the checklist
  - New cron found → append it to the checklist automatically
  - Script modified → mark as "needs review"

Usage:
  python3 scripts/morning_health_check.py [--report] [--update-checklist]
    --report: force the full report (the default prints anomalies only)
    --update-checklist: run automatic discovery and update the checklist

no_agent mode: print abnormal items only; completely silent when there are none
"""
import json, os, sqlite3, subprocess, sys, time, urllib.request
from pathlib import Path
from datetime import datetime, timedelta

# ── Paths ──
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
SCRIPTS_DIR = BASE / "scripts"
PROFILE_SCRIPTS = Path("/home/hmo/.hermes/profiles/position-analyst/scripts")
CHECKLIST_PATH = DATA / "health_checklist.json"
HISTORY_PATH = DATA / "health_check_history.json"
DB_PATH = DATA / "mofin.db"
HERMES_CRON_DIR = Path("/home/hmo/.hermes/profiles/position-analyst/cron")
TODO_PATH = Path("/home/hmo/.hermes/profiles/position-analyst/todo.json")
def derive_fix_action(detail, msg):
    """Derive an executable fix command from the issue text (None if no rule matches)."""
    # Xiaoguo scan error → verify that the script exists
    if "xiaoguo_scanner" in msg or "小果扫描" in msg:
        return "ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/xiaoguo_scanner.py 2>&1 && echo 'ok'"
    # system-audit error → verify the copy
    if "system_audit" in msg or "系统审计" in msg:
        return "ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/system_audit.py 2>&1"
    # cron errors (last_status=error) → verify the files exist; the next cron run recovers automatically
    if "cron" in msg.lower() and "error" in msg.lower() and ("小果" in msg or "系统审计" in msg):
        return "ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/xiaoguo_scanner.py /home/hmo/.hermes/profiles/position-analyst/scripts/system_audit.py 2>&1"
    # HK stock FX rate → refresh
    if "港股汇率" in msg:
        return f"cd {BASE} && python3 hk_rate.py 2>&1"
    # Price monitor has no events → check the process
    if "价格监控" in msg and "0 rows" in msg:
        return "ps aux | grep price_monitor | grep -v grep | head -3"
    # Delivery target missing → switch to local
    if "deliver" in msg.lower() or "delivery" in msg.lower():
        return f"cd {BASE} && echo '需手动设置: cronjob action=update deliver=local'"
    # Xiaoguo → Zhiwei signal bridge is broken
    if "信号桥" in msg:
        return f"cd {BASE} && python3 scripts/xiaoguo_signal_consumer.py 2>&1"
    return None
def auto_fix_issue(issue):
    """Fix issues that are clearly auto-fixable; return (fixed, fix_msg)."""
    item_id = issue.get("detail", "")
    msg = issue.get("msg", "")
    # HK stock FX rate cache missing → generate it
    if "港股汇率缓存" in msg and "missing" in msg:
        try:
            # hk_rate.py writes ~/.cache/hk_exchange_rate.json (in the profile environment this resolves to profile/home/.cache/)
            r = subprocess.run(
                ["python3", str(BASE / "hk_rate.py")],
                capture_output=True, text=True, timeout=15
            )
            if r.returncode == 0:
                return True, f"已自动刷新港股汇率缓存: {r.stdout.strip()}"
            else:
                return False, f"汇率刷新失败: {r.stderr[:100]}"
        except Exception as e:
            return False, f"汇率刷新异常: {e}"
    # Price monitor has no events today (trading day, intraday) → check the process
    if "价格监控" in msg and "0 rows" in msg:
        now = ctx["started_at"]
        if now.weekday() < 5 and 9 <= now.hour <= 15:
            # Trading hours: events are expected
            ok, detail = check_process("price_monitor")
            if not ok:
                # The process is missing and needs manual intervention, so this is
                # not reported as fixed and the issue still reaches the TODO system.
                return False, "已检测:price_monitor进程不存在(需人工介入)"
            return True, "已确认:price_monitor进程运行中,但今日无事件(可能无价格触发)"
        # Outside trading hours → normal
        return True, "非交易时段无价格事件属正常"
    # Anything else → cannot be fixed automatically
    return False, "需人工处理"
def write_todos_for_issues():
    """Write health-check anomalies to the TODO system (dedup, escalation); try auto-fix first."""
    try:
        if not ctx["report"]:
            return
        # Only error/critical/warn entries are handled
        issues = [e for e in ctx["report"] if e["level"] in ("critical", "error", "warn")]
        if not issues:
            return
        # Try the automatic fix first
        fixed_issues = []
        remaining = []
        for issue in issues:
            fixed, fix_msg = auto_fix_issue(issue)
            if fixed:
                fixed_issues.append((issue, fix_msg))
                log("ok", issue["category"], f"已自动修复: {fix_msg}", issue.get("detail",""))
            else:
                remaining.append(issue)
        # Print the auto-fix summary
        if fixed_issues:
            print()
            print("🛠️ 自动修复:")
            for issue, fix_msg in fixed_issues:
                print(f"{issue['category']}: {fix_msg}")
        # Whatever could not be auto-fixed → write TODOs to the database
        if not remaining:
            return
        try:
            conn = sqlite3.connect(str(DB_PATH))
            todo_priority = {"critical": "high", "error": "medium", "warn": "low"}
            new_count = 0
            for issue in remaining:
                title = f"[体检发现] {issue['msg']}"
                level = issue["level"]
                pri = todo_priority.get(level, "medium")
                # Dedup: look for any existing row (including completed ones, to avoid re-adding)
                r_exist = conn.execute(
                    "SELECT id, status FROM todos WHERE title=?",
                    (title,)
                ).fetchone()
                if r_exist:
                    if r_exist[1] == "blocked":
                        # Reopen items that were blocked
                        conn.execute(
                            "UPDATE todos SET status='pending', priority=?, note='已重新打开', updated_at=CURRENT_TIMESTAMP WHERE id=?",
                            (pri, r_exist[0])
                        )
                else:
                    # Derive the fix_action (must be non-empty)
                    fix_action = derive_fix_action(issue.get("detail", ""), issue.get("msg", ""))
                    if not fix_action:
                        # Without a fix_action no TODO is created; the issue goes straight into the report
                        print(f" ⚠️ 无法自动修复: [{pri}] {title[:60]}")
                        print(f" 原因: 未知修复方案,需人工分析")
                        continue
                    conn.execute(
                        "INSERT INTO todos (title, description, priority, source, status, fix_action) "
                        "VALUES (?, ?, ?, 'health_check', 'pending', ?)",
                        (title,
                         f"体检发现于 {ctx['started_at'].strftime('%Y-%m-%d %H:%M')}\n分类: {issue['category']}\n详情: {issue.get('detail', '')}\n无法当场修复原因: 需验证/需等待",
                         pri, fix_action)
                    )
                    new_count += 1
            conn.commit()
            if new_count > 0:
                print()
                print(f"📋 已加入TODO({new_count}条):")
                for r2 in conn.execute(
                    "SELECT title, priority FROM todos WHERE status='pending' AND source='health_check' "
                    "ORDER BY created_at DESC LIMIT ?", (new_count,)
                ).fetchall():
                    print(f" [{r2[1]}] {r2[0][:70]}")
            conn.close()
        except Exception as e:
            print(f" TODO写入异常: {e}")
    except Exception:
        pass  # a TODO write failure must not block the main health-check flow
# Known-issue cache (the same issue is not pushed again within 24h)
KNOWN_ISSUES_PATH = DATA / "health_known_issues.json"

# ── Context ──
ctx = {
    "report": [],
    "issues": [],
    "ok_count": 0,
    "warn_count": 0,
    "error_count": 0,
    "critical_count": 0,
    "started_at": datetime.now(),
}


def log(level, category, msg, detail=None):
    """Record one check result and bump the counter for its level."""
    ctx["report"].append({
        "level": level, "category": category, "msg": msg, "detail": detail,
        "timestamp": datetime.now().isoformat()
    })
    if level == "critical":
        ctx["critical_count"] += 1
    elif level == "error":
        ctx["error_count"] += 1
    elif level == "warn":
        ctx["warn_count"] += 1
    else:
        ctx["ok_count"] += 1


def emit(msg, level="ok"):
    """Format one output line with the icon for its level."""
    prefix = {"critical": "🔴", "error": "❌", "warn": "⚠️", "ok": "✅", "info": "📎"}.get(level, "")
    return f"{prefix} {msg}"
# ── Checkers ──
def check_systemctl(service_name):
    """Check the state of a systemd service."""
    try:
        r = subprocess.run(["systemctl", "is-active", service_name],
                           capture_output=True, text=True, timeout=5)
        status = r.stdout.strip()
        return status == "active", f"{status}"
    except Exception as e:
        return False, f"error:{e}"


def check_port(port):
    """Check whether a port is listening."""
    try:
        r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
        return f":{port}" in r.stdout, "listening" if f":{port}" in r.stdout else "not_found"
    except Exception as e:
        return False, f"error:{e}"


def check_process(pattern):
    """Check whether a process matching pattern exists."""
    try:
        r = subprocess.run(["pgrep", "-f", pattern], capture_output=True, timeout=5)
        return r.returncode == 0, "running" if r.returncode == 0 else "not_found"
    except Exception:
        return False, "check_error"
def check_http(url, timeout=15):
    """Check whether an HTTP endpoint is reachable (proxy env vars are cleared for the request)."""
    # Temporarily remove every proxy-related environment variable
    old_env = {}
    for k in list(os.environ.keys()):
        if 'proxy' in k.lower():
            old_env[k] = os.environ.pop(k)
    try:
        req = urllib.request.Request(url, method="GET")
        resp = urllib.request.urlopen(req, timeout=timeout)
        return True, str(resp.status)
    except Exception as e:
        return False, str(e)[:60]
    finally:
        # Restore the variables even when the request fails
        os.environ.update(old_env)
def check_disk(mount):
    """Check disk space (fails at 90% usage or above)."""
    try:
        r = subprocess.run(["df", "-h", mount], capture_output=True, text=True, timeout=5)
        lines = r.stdout.strip().split("\n")
        if len(lines) >= 2:
            parts = lines[1].split()
            if len(parts) >= 5:
                pct = parts[4].replace("%", "")
                return int(pct) < 90, f"{pct}% used"
        return False, "parse_error"
    except Exception:
        return False, "check_error"


def check_file_exists(path):
    """Check that a file exists."""
    p = Path(path)
    exists = p.exists()
    return exists, f"{p.stat().st_size}B" if exists else "missing"


def check_file_freshness(path, max_hours):
    """Check that a file was modified within the last max_hours hours."""
    p = Path(path)
    if not p.exists():
        return False, "missing"
    mtime = datetime.fromtimestamp(p.stat().st_mtime)
    hours_ago = (ctx["started_at"] - mtime).total_seconds() / 3600
    return hours_ago < max_hours, f"{hours_ago:.0f}h ago (threshold {max_hours}h)"
def check_db_table_count(table, field, value, op="today", threshold=0):
    """Check a record count in the database."""
    try:
        conn = sqlite3.connect(str(DB_PATH))
        cur = conn.cursor()
        if op == "today":
            today = ctx["started_at"].strftime("%Y-%m-%d")
            # Inspect the table's columns first to find a usable date column
            cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
            date_col = None
            for candidate in ["created_at", "date", "timestamp", "last_scanned_at", "signal_date"]:
                if candidate in cols:
                    date_col = candidate
                    break
            if not date_col:
                conn.close()
                return True, f"no_date_col_in_{table}"
            sql = f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?"
            cur.execute(sql, (today,))
        elif op == "unprocessed":
            cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
            if "processed" in cols:
                sql = f"SELECT COUNT(*) FROM {table} WHERE (processed = 0 OR processed IS NULL)"
            elif "source" in cols:
                sql = f"SELECT COUNT(*) FROM {table} WHERE source LIKE '%xiaoguo%'"
            else:
                sql = f"SELECT COUNT(*) FROM {table}"
            cur.execute(sql)
        elif op == "count":
            if field:
                sql = f"SELECT COUNT(*) FROM {table} WHERE {field} = ?"
                cur.execute(sql, (value,))
            else:
                sql = f"SELECT COUNT(*) FROM {table}"
                cur.execute(sql)
        else:
            sql = f"SELECT COUNT(*) FROM {table}"
            cur.execute(sql)
        count = cur.fetchone()[0]
        conn.close()
        if op == "unprocessed":
            # A backlog is healthy only while it stays below the threshold
            return count < threshold, f"{count} unprocessed"
        return count >= threshold, f"{count} rows"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_cron(job_id):
    """Check a cron job's status (via jobs.json)."""
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if cron_jobs_path.exists():
            data = json.loads(cron_jobs_path.read_text())
            for job in data.get("jobs", []):
                if job.get("id") == job_id or (not job.get("id") and job.get("name") and job_id in str(job)):
                    enabled = job.get("enabled", True)
                    if not enabled:
                        return False, "disabled"
                    last_status = job.get("last_status")
                    if last_status and last_status != "ok":
                        return False, f"status={last_status}"
                    last_run = job.get("last_run_at", "")
                    if last_run:
                        try:
                            last_dt = datetime.fromisoformat(last_run)
                            hours_ago = (ctx["started_at"] - last_dt).total_seconds() / 3600
                            if hours_ago > 48:
                                return True, f"ok(stale:{hours_ago:.0f}h)"
                        except Exception:
                            pass
                    return True, "ok"
            # job_id not found: probably a cron that does not need checking
            return True, "not_in_jobs_json"
        return False, "no_jobs_json"
    except Exception as e:
        return False, f"check_error:{str(e)[:60]}"
def check_cron_errors_last24h():
    """Check whether any cron ended in an error state within the last 24h."""
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return True, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        check_time = (ctx["started_at"] - timedelta(hours=24)).isoformat()
        errors = []
        for job in data.get("jobs", []):
            last_status = job.get("last_status")
            last_run = job.get("last_run_at", "")
            if last_status and last_status != "ok" and last_run:
                try:
                    # ISO timestamps compare correctly as strings
                    if last_run >= check_time[:19]:
                        errors.append(f"{job.get('name','?')}({last_status})")
                except Exception:
                    pass
        if errors:
            return False, f"{len(errors)} errors: {'; '.join(errors[:5])}"
        return True, "0 errors"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_cron_paused():
    """Check whether a cron that should be running was paused by mistake."""
    # (job id, display name): price monitor, Xiaoguo scan, watchlist reminder, branch evaluation
    should_run = [
        ("3a9fb3300a6a", "价格监控"),
        ("0851c7838ca3", "小果扫描"),
        ("e13323928f3a", "自选提醒"),
        ("b809fcabfa5b", "分支评估"),
    ]
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return True, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        job_map = {job.get("id"): job for job in data.get("jobs", [])}
        paused = []
        for jid, name in should_run:
            job = job_map.get(jid)
            if job and not job.get("enabled", True):
                paused.append(name)
        if paused:
            return False, f"paused: {', '.join(paused)}"
        return True, "all_expected_running"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_delivery_targets():
    """Check whether crons with deliver=origin have a delivery target."""
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return True, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        issues = []
        for job in data.get("jobs", []):
            last_delivery_err = job.get("last_delivery_error", "")
            if last_delivery_err and "delivery" in last_delivery_err.lower():
                issues.append(f"{job.get('name','?')}")
        if issues:
            return False, f"{len(issues)} issues: {', '.join(issues[:3])}"
        return True, "all_ok"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_cron_audit():
    """Audit every cron: has it run within the last 24h?"""
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if not cron_jobs_path.exists():
            return True, "no_jobs_json"
        data = json.loads(cron_jobs_path.read_text())
        check_time = (ctx["started_at"] - timedelta(hours=24)).isoformat()[:19]
        stale = []
        for job in data.get("jobs", []):
            name = job.get("name", "?")
            enabled = job.get("enabled", True)
            script = job.get("script", "")
            last_run = job.get("last_run_at", "")
            last_status = job.get("last_status")
            # Only enabled, script-backed jobs are audited
            if not enabled or not script:
                continue
            if not last_run:
                stale.append(f"{name}(从未运行)")  # never ran
                continue
            if last_run[:19] < check_time:
                if last_status and last_status == "ok":
                    stale.append(f"{name}(>24h未运行)")  # not run for over 24h
                else:
                    stale.append(f"{name}(>24h+状态异常)")  # over 24h and abnormal status
        if stale:
            return False, f"{len(stale)}个cron异常: {'; '.join(stale[:5])}"
        # Use the same default for "enabled" as the loop above so the total matches
        total = sum(1 for j in data.get("jobs", []) if j.get("enabled", True) and j.get("script"))
        return True, f"全部{total}个cron正常"
    except Exception as e:
        return True, f"skip({str(e)[:60]})"
def check_meta_health_check_yesterday():
    """Meta check: did yesterday's health check complete normally?"""
    try:
        history = []
        if HISTORY_PATH.exists():
            history = json.loads(HISTORY_PATH.read_text())
        yesterday = (ctx["started_at"] - timedelta(days=1)).strftime("%Y-%m-%d")
        for h in history[-30:]:
            ts = h.get("timestamp", "")
            if ts[:10] == yesterday:
                if h.get("error", 0) == 0 and h.get("critical", 0) == 0:
                    return True, f"昨日体检通过({h.get('ok',0)}项正常)"
                return True, f"昨日体检有{h.get('error',0)}错误+{h.get('critical',0)}严重(已记录)"
        # No record for yesterday (first run)
        return True, "无昨日记录(首次运行)"
    except Exception:
        return True, "skip"


def check_meta_checklist_completeness():
    """Meta check: does the checklist cover every known component?"""
    try:
        added = ctx.get("auto_discovered_items", [])
        if added:
            return True, f"自动发现并追加了{len(added)}个新组件到清单"
        return True, "清单覆盖完整"
    except Exception:
        return True, "skip"
# ── Automatic discovery ──
def self_discovery():
    """Discover newly added components and update the checklist."""
    discovered = []
    # 1. Discover new cron jobs
    try:
        cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
        if cron_jobs_path.exists():
            data = json.loads(cron_jobs_path.read_text())
            all_crons = [(j.get("id"), j.get("name"), j.get("schedule"), j.get("script"))
                         for j in data.get("jobs", [])]
            # Read the cron IDs already present in the checklist
            checklist = json.loads(CHECKLIST_PATH.read_text())
            known_cron_ids = set()
            for cat in checklist["categories"]:
                for item in cat["items"]:
                    if item["check"].startswith("cron:"):
                        known_cron_ids.add(item["check"].split(":")[1])
            for jid, name, schedule, script in all_crons:
                if jid and jid not in known_cron_ids and script:
                    # New cron job: append it to the pipeline category
                    discovered.append(f"新cron: {name}({jid})")
                    for cat in checklist["categories"]:
                        if cat["id"] == "pipeline":
                            cat["items"].append({
                                "id": f"cron-auto-{jid[:8]}",
                                "description": f"{name} cron 已调度",
                                "check": f"cron:{jid}",
                                "expected": "enabled+ok",
                                "severity": "medium",
                                "auto_discovered": True
                            })
                            break
            if discovered:
                CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
    except Exception as e:
        discovered.append(f"cron_discovery_error: {e}")
    return discovered
# ── Main flow ──
def run_check(item):
    """Run a single check item; return (ok, level, detail)."""
    check_spec = item["check"]
    expected = item["expected"]
    if check_spec.startswith("systemctl:"):
        service = check_spec.split(":", 1)[1]
        ok, detail = check_systemctl(service)
    elif check_spec.startswith("port:"):
        port = check_spec.split(":", 1)[1]
        ok, detail = check_port(port)
    elif check_spec.startswith("proc:"):
        pattern = check_spec.split(":", 1)[1]
        ok, detail = check_process(pattern)
    elif check_spec.startswith("http:"):
        url = check_spec.split(":", 1)[1]
        ok, detail = check_http(url)
    elif check_spec.startswith("disk:"):
        mount = check_spec.split(":", 1)[1]
        ok, detail = check_disk(mount)
    elif check_spec.startswith("fileexists:"):
        path = check_spec.split(":", 1)[1]
        ok, detail = check_file_exists(path)
    elif check_spec.startswith("filefresh:"):
        # filefresh:path:max_hours
        parts = check_spec.split(":", 2)
        path = parts[1]
        max_hours = float(parts[2].replace("h", ""))
        ok, detail = check_file_freshness(path, max_hours)
    elif check_spec.startswith("db:"):
        # db:table:field:value:op:threshold
        parts = check_spec.split(":", 5)
        table = parts[1]
        field = parts[2] if len(parts) > 2 else None
        value = parts[3] if len(parts) > 3 else None
        op = parts[4] if len(parts) > 4 else "today"
        threshold = int(parts[5]) if len(parts) > 5 else 0
        ok, detail = check_db_table_count(table, field, value, op, threshold)
    elif check_spec.startswith("cron:"):
        job_id = check_spec.split(":", 1)[1]
        ok, detail = check_cron(job_id)
    elif check_spec == "cron_errors:last24h":
        ok, detail = check_cron_errors_last24h()
    elif check_spec == "cron_paused:check":
        ok, detail = check_cron_paused()
    elif check_spec == "delivery:origin_targets":
        ok, detail = check_delivery_targets()
    elif check_spec == "cron_audit:all":
        ok, detail = check_cron_audit()
    elif check_spec == "meta:health_check_yesterday":
        ok, detail = check_meta_health_check_yesterday()
    elif check_spec == "meta:checklist_completeness":
        ok, detail = check_meta_checklist_completeness()
    elif check_spec == "pipeline:xiaoguo_signal_flow":
        # Composite check: Xiaoguo produced data → it was processed by Zhiwei
        today_xiaoguo, d1 = check_db_table_count("signal_news", "created_at", None, "today", 0)
        unproc, d2 = check_db_table_count("signal_news", None, None, "unprocessed", 30)
        ok = today_xiaoguo or unproc
        detail = f"today_xiaoguo={d1}, unprocessed={d2}"
    else:
        ok = False
        detail = f"unknown_check:{check_spec}"
    level = "ok" if ok else item["severity"]
    # Map severities to report levels: critical stays critical, high/error become error, the rest warn
    if not ok:
        if item["severity"] == "critical":
            level = "critical"
        elif item["severity"] in ("high", "error"):
            level = "error"
        else:
            level = "warn"
    return ok, level, detail
def main():
    show_full = "--report" in sys.argv
    update = "--update-checklist" in sys.argv
    start_time = time.time()
    # Load the checklist
    if not CHECKLIST_PATH.exists():
        print("[SILENT] health_checklist.json 不存在")
        return
    checklist = json.loads(CHECKLIST_PATH.read_text())
    # Auto-discovery (runs at most once per hour unless --update-checklist forces it)
    if update:
        discovered = self_discovery()
    else:
        # Periodic auto-discovery, based on the time of the last full scan.
        # setdefault ensures the "meta" key exists before it is written below.
        meta = checklist.setdefault("meta", {})
        last_scan = meta.get("last_full_scan")
        if not last_scan or (ctx["started_at"] - datetime.fromisoformat(last_scan)).total_seconds() > 3600:
            discovered = self_discovery()
            meta["last_full_scan"] = ctx["started_at"].isoformat()
            CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
        else:
            discovered = []
    # Run the checks item by item, grouped by category
    dayname = ["","","","","","",""][ctx["started_at"].weekday()]
    lines = [f"MoFin 系统体检 | {ctx['started_at'].strftime('%Y-%m-%d')}{dayname} | {ctx['started_at'].strftime('%H:%M')}"]
    lines.append("" * 50)
    for cat in checklist["categories"]:
        cat_issues = 0
        cat_lines = []
        for item in cat["items"]:
            ok, level, detail = run_check(item)
            msg = f"{item['description']}: {detail}"
            log(level, cat["name"], msg, item["id"])
            cat_lines.append(emit(msg, level))
            if not ok:
                cat_issues += 1
        # Only print a category when it has issues or --report is set
        if cat_issues > 0 or show_full:
            lines.append(f"\n{cat['name']}")
            lines.extend(cat_lines)
    # Auto-discovery results
    if discovered:
        lines.append(f"\n📎 自动发现:")
        for d in discovered:
            lines.append(f" {d}")
    # Summary
    total = ctx["ok_count"] + ctx["warn_count"] + ctx["error_count"] + ctx["critical_count"]
    if total == 0:
        total = 1  # avoid a zero denominator in the "ok/total" summary
    lines.append(f"\n{'' * 50}")
    critical = ctx["critical_count"]
    errors = ctx["error_count"]
    warns = ctx["warn_count"]
    ok_count = ctx["ok_count"]
    # Build the per-severity summary line
    severity_parts = []
    if critical > 0:
        severity_parts.append(f"🔴{critical}严重")
    if errors > 0:
        severity_parts.append(f"{errors}错误")
    if warns > 0:
        severity_parts.append(f"⚠️{warns}警告")
    if ok_count > 0:
        severity_parts.append(f"{ok_count}正常")
    lines.append(f"总计: {' | '.join(severity_parts)} ({(time.time()-start_time):.0f}s)")
    report = "\n".join(lines)
    # Save history (best effort: a history failure must not break the health check)
    try:
        history = []
        if HISTORY_PATH.exists():
            history = json.loads(HISTORY_PATH.read_text())
        history.append({
            "timestamp": ctx["started_at"].isoformat(),
            "ok": ok_count, "warn": warns, "error": errors, "critical": critical,
            "duration_s": round(time.time() - start_time, 1)
        })
        # Keep only the most recent 90 history entries
        if len(history) > 90:
            history = history[-90:]
        HISTORY_PATH.write_text(json.dumps(history, ensure_ascii=False, indent=2))
    except Exception:
        pass
    # Output
    # no_agent mode: only speak up when there are issues; --report forces output
    has_issues = critical > 0 or errors > 0 or warns > 0
    if has_issues or show_full:
        print(report)
    else:
        print(f"[SILENT] MoFin 体检通过 | {ok_count}/{total} 检查正常 ({(time.time()-start_time):.0f}s)")
    # If there are serious issues, also print a readable summary
    if critical > 0 or errors > 0:
        print()
        print("🔴 需立即处理的问题:")
        for entry in ctx["report"]:
            if entry["level"] in ("critical", "error"):
                print(f" [{entry['level'].upper()}] {entry['category']}: {entry['msg']}")
    # Check for TODOs escalated by the executor (pending because notification failed)
    try:
        conn2 = sqlite3.connect(str(DB_PATH))
        try:
            needs_llm = conn2.execute(
                "SELECT id, title, priority, created_at, note FROM todos "
                "WHERE status='needs_llm' "
                "ORDER BY CASE priority WHEN 'high' THEN 0 WHEN 'medium' THEN 1 ELSE 2 END, created_at ASC LIMIT 10"
            ).fetchall()
        finally:
            # Close the connection even when the query fails
            conn2.close()
        if needs_llm:
            print()
            print("🔶 需知微介入(执行器无法自动修复):")
            for n in needs_llm:
                note = (n[4] or "")[:60]
                print(f" [{n[2]}] #{n[0]} {n[1][:60]}{note}")
    except Exception:
        pass
    # Write the detected issues into the TODO system
    write_todos_for_issues()


if __name__ == "__main__":
    main()
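The `db:` branch of `run_check` above splits a spec of the form `db:table:field:value:op:threshold` and falls back to defaults when trailing parts are missing. The following is a minimal standalone sketch of just that parsing step. The helper name `parse_db_spec` is hypothetical and does not exist in the repository; the defaults are copied from the branch above.

```python
def parse_db_spec(check_spec):
    """Split 'db:table:field:value:op:threshold' into its five parts.

    Hypothetical helper (not in the original script) that mirrors the
    defaults used by the db: branch of run_check.
    """
    # maxsplit=5 keeps any extra colons inside the threshold part
    parts = check_spec.split(":", 5)
    table = parts[1]
    field = parts[2] if len(parts) > 2 else None
    value = parts[3] if len(parts) > 3 else None
    op = parts[4] if len(parts) > 4 else "today"
    threshold = int(parts[5]) if len(parts) > 5 else 0
    return table, field, value, op, threshold


short = parse_db_spec("db:signal_news")
full = parse_db_spec("db:signal_news:::unprocessed:30")
print(short)
print(full)
```

Note that an omitted part becomes `None`, while a part left empty between two colons becomes an empty string, so the two forms are not interchangeable for whatever consumes `field` and `value`.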
+119
@@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""self_todo_executor.py — TODO自动执行器 (no_agent模式)
每10分钟轮询pending TODOs,执行fix_action。
成功→completed。失败→调gateway API,带完整上下文让知微处理。
"""
import json, sqlite3, subprocess, time, urllib.request
from pathlib import Path
from datetime import datetime
BASE = Path("/home/hmo/MoFin")
DB_PATH = BASE / "data" / "mofin.db"
GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
GATEWAY_KEY = "hermes123"


def send_xmpp(msg):
    """Send an XMPP message to Dad via zhiwei."""
    try:
        subprocess.run(
            ["hermes", "send", "--to", "xmpp:hmo@yoin.fun", msg],
            capture_output=True, text=True, timeout=15
        )
    except Exception:
        # Notification is best effort; never let it break the executor
        pass


def main():
    start = time.time()
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        "SELECT id, title, description, fix_action FROM todos WHERE status='pending' "
        "ORDER BY CASE priority WHEN 'high' THEN 0 WHEN 'medium' THEN 1 ELSE 2 END, "
        "created_at ASC LIMIT 3"
    ).fetchall()
    if not rows:
        conn.close()
        print("[SILENT] 无待处理TODO")
        return
    for row in rows:
        tid = row["id"]
        title = row["title"]
        desc = row["description"] or ""
        fix = row["fix_action"]
        conn.execute("UPDATE todos SET status='in_progress' WHERE id=?", (tid,))
        conn.commit()
        if not fix:
            # No fix action -> call the gateway with the full description
            context = f"[自愈执行器] 系统体检发现以下问题,无自动修复方案,需分析处理。\n\n问题: {title}\n\n详情:\n{desc}".strip()
            send_xmpp(f"📋 TODO已创建(无自动修复): {title[:80]}")
        else:
            # Run the fix command
            try:
                r = subprocess.run(fix, shell=True, capture_output=True, text=True, timeout=30)
                if r.returncode == 0:
                    conn.execute("UPDATE todos SET status='completed', note=? WHERE id=?",
                                 (f"已修复: {r.stdout.strip()[:200]}", tid))
                    conn.commit()
                    send_xmpp(f"✅ TODO修复成功: {title[:80]}")
                    print(f"{title}: 已修复")
                    continue
                # Non-zero exit: prefer stderr, fall back to stdout
                output = r.stderr.strip()[:500] or r.stdout.strip()[:500]
            except subprocess.TimeoutExpired:
                output = "执行超时(30s)"
            except Exception as e:
                output = str(e)[:200]
            context = (
                f"[自愈执行器] 尝试自动修复失败,需知微分析处理。\n\n"
                f"问题: {title}\n\n"
                f"详情:\n{desc}\n\n"
                f"尝试的修复命令: {fix}\n"
                f"失败输出: {output}\n\n"
                f"请分析失败原因并完成修复,完成后标记TODO #{tid} 为 completed。"
            )
        # Call the gateway so Zhiwei can handle it (with full context)
        payload = json.dumps({
            "model": "default",
            "messages": [{"role": "user", "content": context}],
            "max_tokens": 1000,
        }).encode()
        req = urllib.request.Request(GATEWAY_URL, data=payload,
                                     headers={"Content-Type": "application/json",
                                              "Authorization": f"Bearer {GATEWAY_KEY}"})
        try:
            # The context manager closes the HTTP response once it has been read
            with urllib.request.urlopen(req, timeout=120) as resp:
                reply = json.loads(resp.read())
            result = reply["choices"][0]["message"]["content"][:500]
            conn.execute("UPDATE todos SET status='completed', note=? WHERE id=?",
                         (f"知微已处理: {result[:200]}", tid))
            send_xmpp(f"🔶 TODO需知微处理: {title[:60]}\n{result[:200]}")
            print(f" 🔶 {title}")
            print(f" {result[:300]}")
        except Exception as e:
            # Gateway unreachable: put the TODO back to pending for the next run
            conn.execute("UPDATE todos SET status='pending', note=? WHERE id=?",
                         (f"调用知微失败: {str(e)[:100]},下次再试", tid))
            send_xmpp(f"⚠️ TODO处理失败(将重试): {title[:60]}\n{str(e)[:100]}")
            print(f" ⚠️ {title}: gateway API调用失败,下次再试")
        conn.commit()
    conn.close()
    # rows is never empty here: the empty case returned early above
    print(f"自愈执行器 | {datetime.now().strftime('%H:%M')} | {len(rows)}条 ({time.time()-start:.0f}s)")


if __name__ == "__main__":
    main()
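The executor above picks at most three pending TODOs per run, ordered by priority (high, then medium, then everything else) and then by age. The sketch below reproduces only that selection query against an in-memory SQLite database. The reduced `todos` schema, the sample rows and the helper name `next_pending` are assumptions made for illustration; the real table has more columns.

```python
import sqlite3


def next_pending(conn, limit=3):
    """Return (id, title) of the next pending TODOs.

    Hypothetical helper: same ORDER BY as the executor's query, with the
    limit passed as a bound parameter.
    """
    return conn.execute(
        "SELECT id, title FROM todos WHERE status='pending' "
        "ORDER BY CASE priority WHEN 'high' THEN 0 WHEN 'medium' THEN 1 ELSE 2 END, "
        "created_at ASC LIMIT ?",
        (limit,),
    ).fetchall()


conn = sqlite3.connect(":memory:")
# Assumed minimal schema, only the columns the query needs
conn.execute(
    "CREATE TABLE todos (id INTEGER PRIMARY KEY, title TEXT, "
    "priority TEXT, status TEXT, created_at TEXT)"
)
conn.executemany(
    "INSERT INTO todos (title, priority, status, created_at) VALUES (?, ?, ?, ?)",
    [
        ("low old", "low", "pending", "2026-06-24 08:00:00"),
        ("high new", "high", "pending", "2026-06-24 10:00:00"),
        ("medium", "medium", "pending", "2026-06-24 09:00:00"),
        ("high old", "high", "pending", "2026-06-24 07:00:00"),
        ("done", "high", "completed", "2026-06-24 06:00:00"),
    ],
)
picked = [title for _id, title in next_pending(conn)]
print(picked)
conn.close()
```

With a limit of three, the low-priority item is left for a later run even though it is older than two of the selected items, which is the intended effect of sorting by priority before age.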
+252
@@ -0,0 +1,252 @@
#!/usr/bin/env python3
"""xiaoguo_signal_consumer.py — 知微消费小果扫描信号
盘中每30分钟运行,读取 signal_news 表中未处理的 xiaoguo 信号,
做五维快速评估后决定:加自选 / 关注 / 跳过。
管道位置:
xiaoguo_scanner (每5分) → signal_news → 本脚本 → 知微分析报告
no_agent模式:有发现→输出,无→静默
"""
import json, os, sqlite3, sys, time, urllib.request
from pathlib import Path
from datetime import datetime
BASE = Path("/home/hmo/MoFin")
DATA = BASE / "data"
DB_PATH = DATA / "mofin.db"
# Watchlist and decision files
WATCHLIST_PATH = DATA / "watchlist.json"
DECISIONS_PATH = DATA / "decisions.json"
SIGNAL_MAX_AGE_HOURS = 4  # only process signals produced within the last 4 hours


def clean_proxy():
    # Drop proxy settings so the requests below go out directly
    for k in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']:
        os.environ.pop(k, None)


def fetch_quote(code):
    """Fetch a quote from the Tencent quote API and return it as a dict."""
    try:
        # Pick the market prefix from the leading digits of the code
        if code.startswith(('60', '68', '51', '56', '50')):
            prefix = "sh"
        elif code.startswith(('00', '30', '15')):
            prefix = "sz"
        else:
            prefix = "hk"
        url = f"http://qt.gtimg.cn/q={prefix}{code}"
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        with urllib.request.urlopen(req, timeout=5) as r:
            resp = r.read().decode('gbk')
        # The payload is a '~'-separated record wrapped in quotes
        fld = resp.split('=')[1].strip().strip('"').strip(';').split('~')
        return {
            "name": fld[1] if len(fld) > 1 else "",
            "code": code,
            "price": float(fld[3]) if len(fld) > 3 else 0,
            "change_pct": float(fld[32]) if len(fld) > 32 else 0,
            "pe": float(fld[39]) if len(fld) > 39 and fld[39] else 0,
            "turnover": float(fld[38]) if len(fld) > 38 and fld[38] else 0,
        }
    except Exception as e:
        return {"code": code, "error": str(e)[:60]}


def is_in_portfolio(conn, code):
    """Check whether the code is already in holdings or in the watchlist."""
    code_stripped = code.lstrip("0")
    cur = conn.execute("SELECT COUNT(*) FROM holdings WHERE code=? AND is_active=1", (code_stripped,))
    if cur.fetchone()[0] > 0:
        return "holdings"
    cur = conn.execute("SELECT COUNT(*) FROM watchlist_stocks WHERE code=?", (code,))
    if cur.fetchone()[0] > 0:
        return "watchlist"
    # Also check watchlist.json
    try:
        wl = json.loads(WATCHLIST_PATH.read_text())
        for s in wl.get("stocks", []):
            if s.get("code") == code or s.get("code", "").lstrip("0") == code_stripped:
                return "watchlist"
    except Exception:
        pass
    return None


def quick_assess(quote):
    """Quick five-dimension assessment (automatic version)."""
    score = 0
    reasons = []
    # Market environment (simplified: trading day 9:30-15:00 and the index is rising)
    # Uses the index direction recorded in macro_context.json
    try:
        mc = json.loads((DATA / "macro_context.json").read_text())
        sh = mc.get("shanghai", {}).get("change_pct", 0)
        if sh > 0.5:
            score += 1
            reasons.append(f"大盘+{sh:.1f}%偏强")
        elif sh < -0.5:
            score -= 1
            reasons.append(f"大盘{sh:.1f}%偏弱")
    except Exception:
        pass
    # Technicals: price change
    chg = quote.get("change_pct", 0)
    if chg > 3:
        score += 1
        reasons.append(f"涨幅+{chg:.1f}%偏强")
    elif chg < -3:
        score -= 1
        reasons.append(f"跌幅{chg:.1f}%偏弱")
    else:
        score += 0.5
        reasons.append(f"走势平稳{chg:+.1f}%")
    # Fundamentals: PE
    pe = quote.get("pe", 0)
    if 5 < pe < 40:
        score += 1
        reasons.append(f"PE={pe:.0f}合理")
    elif pe <= 0:
        score -= 0.5
        reasons.append("PE为负")
    elif pe > 100:
        score -= 0.5
        reasons.append(f"PE={pe:.0f}偏高")
    # Volume: turnover rate
    turn = quote.get("turnover", 0)
    if turn > 5:
        score += 0.5
        reasons.append(f"换手{turn:.1f}%活跃")
    elif turn < 0.5:
        score -= 0.3
        reasons.append(f"换手{turn:.1f}%偏低")
    return score, reasons

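
def evaluate_and_act(signal, quote):
    """Assess a signal and decide what to do with it."""
    # The rows selected in main() carry no "code" field, so take the code from
    # the quote (fetch_quote always sets it) and fall back to the signal.
    code = quote.get("code") or signal.get("code", "")
    conn = get_conn()
    try:
        status_in = is_in_portfolio(conn, code)
    finally:
        # Close the per-call connection instead of leaking it
        conn.close()
    if status_in:
        return f"已在{status_in}中,跳过", None
    score, reasons = quick_assess(quote)
    if score >= 1.5:
        action = "watchlist"
        summary = f"加自选: {' | '.join(reasons)}"
    elif score >= 0:
        action = "monitor"
        summary = f"关注: {' | '.join(reasons)}"
    else:
        action = "skip"
        summary = f"跳过(评分{score:.1f}): {' | '.join(reasons)}"
    return summary, action
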

def get_conn():
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    return conn


def mark_processed(conn, signal_id):
    conn.execute("UPDATE signal_news SET processed=1 WHERE id=?", (signal_id,))
    conn.commit()


def main():
    clean_proxy()
    start = time.time()
    today = datetime.now().strftime("%Y-%m-%d")
    conn = get_conn()
    # Read today's unprocessed xiaoguo signals
    rows = conn.execute(
        "SELECT id, sector, overall_sentiment, summary, key_articles, searched_stocks, source "
        "FROM signal_news "
        "WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL) "
        "AND date(created_at) = ? "
        "ORDER BY created_at DESC LIMIT 20",
        (today,)
    ).fetchall()
    if not rows:
        conn.close()
        print("[SILENT] 今日无未处理小果信号")
        return
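    # Try to extract stock codes for each signal
    import re
    results = []
    for r in rows:
        try:
            searched = json.loads(r["searched_stocks"]) if r["searched_stocks"] else []
        except Exception:
            searched = []
        # The sector field holds the stock name
        sector_name = r["sector"] or ""
        # searched_stocks stores a list of stock names, not codes, so look for
        # 6-digit codes in the summary. As before, a signal with an empty
        # searched_stocks list yields no codes; the regex now runs once
        # instead of once per searched name.
        codes_found = re.findall(r'\d{6}', r["summary"] or "") if searched else []
        if not codes_found:
            # No code found in the summary. Lookup by name is not implemented,
            # so mark the signal as processed and move on.
            mark_processed(conn, r["id"])
            continue
        code = codes_found[0]
        quote = fetch_quote(code)
        summary, action = evaluate_and_act(dict(r), quote)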
if action == "watchlist":
# 加自选
results.append(f"{sector_name}({code}): {summary}")
# 写入 watchlist.json
try:
wl = json.loads(WATCHLIST_PATH.read_text())
wl.setdefault("stocks", [])
# 检查是否已在
existing = [s for s in wl["stocks"] if s.get("code") == code]
if not existing:
wl["stocks"].append({
"code": code,
"name": quote.get("name", sector_name),
"price": quote.get("price", 0),
"status": "watching",
"source": "xiaoguo_scanner",
"added_at": today,
})
WATCHLIST_PATH.write_text(json.dumps(wl, ensure_ascii=False, indent=2))
except:
pass
elif action == "monitor":
results.append(f"🔄 {sector_name}({code}): {summary}")
else:
results.append(f"⏭️ {sector_name}({code}): {summary}")
mark_processed(conn, r["id"])
conn.close()
elapsed = time.time() - start
if results:
print(f"小果信号消费 | {today} | {len(results)}条处理 ({elapsed:.0f}s)")
for r in results:
print(f" {r}")
else:
print("[SILENT] 小果信号消费结束")
if __name__ == "__main__":
main()
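The scoring in `quick_assess` and the thresholds in `evaluate_and_act` above can be followed with a small worked example. The sketch below restates the same thresholds as pure functions with no file or network access. The names `score_quote` and `decide` are hypothetical, and the market change is passed in as an argument here rather than read from `macro_context.json`.

```python
def score_quote(market_chg, chg, pe, turnover):
    """Hypothetical pure-function restatement of the quick_assess thresholds."""
    score = 0.0
    # Market environment: index change in percent
    if market_chg > 0.5:
        score += 1
    elif market_chg < -0.5:
        score -= 1
    # Technicals: the stock's own change in percent
    if chg > 3:
        score += 1
    elif chg < -3:
        score -= 1
    else:
        score += 0.5
    # Fundamentals: PE ratio
    if 5 < pe < 40:
        score += 1
    elif pe <= 0:
        score -= 0.5
    elif pe > 100:
        score -= 0.5
    # Volume: turnover rate in percent
    if turnover > 5:
        score += 0.5
    elif turnover < 0.5:
        score -= 0.3
    return score


def decide(score):
    """Map a score to the action names used by evaluate_and_act."""
    if score >= 1.5:
        return "watchlist"
    if score >= 0:
        return "monitor"
    return "skip"


# Strong market (+1), flat stock (+0.5), reasonable PE (+1), normal turnover (0)
strong = score_quote(0.8, 1.2, 25, 2.0)
# Flat market (0), flat stock (+0.5), PE between 40 and 100 (0), normal turnover (0)
neutral = score_quote(0.0, 0.0, 60, 1.0)
# Weak market (-1), falling stock (-1), non-positive PE (-0.5), low turnover (-0.3)
weak = score_quote(-1.0, -4.0, 0, 0.2)
print(decide(strong), decide(neutral), decide(weak))
```

A stock with a flat day and no other signal already scores 0.5, so it lands in "monitor" rather than "skip"; only negative contributions from the market, the price move, PE or turnover push a signal below zero.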
+4 -2
@@ -231,8 +231,10 @@ def check_stock(code, name, articles):
             if s in reply:
                 return True, s
         return True, "中性"
-    except:
-        pass
+    except Exception as e:
+        # LLM unreachable -> degrade: mark as unknown without blocking the scan
+        print(f" ⚠️ 小果LLM不可达({str(e)[:30]}),降级为unknown", flush=True)
+        return True, "unknown"
     return None, None