Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5a7335e583 | |||
| 9b878001af | |||
| d27dbcc6ad | |||
| 077f683878 | |||
| 7a6fb103cb | |||
| 0a7ad20f54 | |||
| 629f154829 | |||
| 526fcc4412 | |||
| b766a5dbb1 | |||
| 11254c8834 | |||
| b63c4f5879 | |||
| a55d241f30 | |||
| a76240b52d | |||
| 26993c1d41 | |||
| 4407f35027 | |||
| a24505ebef |
+786
-910
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,116 @@
|
||||
---
|
||||
name: morning-health-check
|
||||
title: MoFin 系统常规体检机制
|
||||
description: 每日开盘前8:00全面扫荡式系统体检,含分层分类检查清单、自动发现新增组件、问题推送
|
||||
trigger: 交易日 8:00 AM 自动运行
|
||||
---
|
||||
|
||||
# MoFin 系统常规体检机制
|
||||
|
||||
## 设计目的
|
||||
|
||||
"人的常规体检,不是因为发现问题去针对性检查,而是定期的、全面的、扫荡式的检查。"
|
||||
|
||||
核心机制:
|
||||
1. **health_checklist.json** — 可动态维护的检查清单(新增功能自动加入)
|
||||
2. **morning_health_check.py** — 每日8:00开盘前运行,逐项比对
|
||||
3. **self_discovery()** — 自动发现新增cron任务并追加到检查清单
|
||||
4. **历史追踪** — health_check_history.json 保留90天体检记录
|
||||
|
||||
## 架构
|
||||
|
||||
```python
|
||||
morning_health_check.py (no_agent, 8:00 Cron)
|
||||
│
|
||||
├── 读 health_checklist.json (检查清单)
|
||||
│
|
||||
├── 分层检查 (7层)
|
||||
│ ├── 基础设施 — XMPP/Gateway/Dashboard/API/磁盘
|
||||
│ ├── SENSE — price_monitor/xiaoguo/宏观/汇率/板块
|
||||
│ ├── RESPOND — 推送cron/价格事件/信号积压
|
||||
│ ├── ADAPT — 策略重评/策略树/时效性
|
||||
│ ├── IMPROVE — 知识萃取/硬编码/审计/剪枝/元成长
|
||||
│ ├── 数据文件 — 全部关键JSON/DB文件新鲜度
|
||||
│ └── 管道完整性 — cron异常/误暂停/delivery目标/信号桥
|
||||
│
|
||||
├── self_discovery() — 自动发现新组件
|
||||
│ └── 对比jobs.json vs checklist中的cron ID
|
||||
│ └── 发现新cron → 自动追加到pipeline类
|
||||
│
|
||||
└── 输出 (no_agent规则)
|
||||
├── 有异常 → 打印详细报告 (推送给老爸)
|
||||
└── 正常 → [SILENT]
|
||||
```
|
||||
|
||||
## 检查清单 (health_checklist.json)
|
||||
|
||||
位于 `/home/hmo/MoFin/data/health_checklist.json`
|
||||
|
||||
7个分类,当前约43项检查。每个检查项包含:
|
||||
- id/description: 唯一标识和描述
|
||||
- check: 检查指令 (如 `cron:job_id`, `systemctl:service`, `port:8643`, `db:table:field::today:1`)
|
||||
- expected: 期望值
|
||||
- severity: critical/high/medium/low
|
||||
|
||||
## 自维护机制
|
||||
|
||||
### 自动发现 (self_discovery)
|
||||
每次运行体检时,脚本自动:
|
||||
1. 读取 Hermes cron 的 jobs.json
|
||||
2. 对比 checklist 中已登记的 cron ID
|
||||
3. 发现新 cron 任务 → 自动追加到 pipeline 分类(标记 auto_discovered=true)
|
||||
|
||||
### 手动维护
|
||||
- 新功能加入系统后,应在 checklist 中追加相应检查项
|
||||
- 修改现有组件后,审视是否需要调整已有检查项的阈值/预期值
|
||||
|
||||
## 检查器类型
|
||||
|
||||
| 类型 | 格式 | 说明 |
|
||||
|------|------|------|
|
||||
| systemctl | `systemctl:service_name` | 检查systemd服务 |
|
||||
| port | `port:8888` | 端口监听检查 |
|
||||
| proc | `proc:pattern` | pgrep进程匹配 |
|
||||
| http | `http:url` | HTTP GET可达性 |
|
||||
| disk | `disk:/` | 磁盘使用率 |
|
||||
| fileexists | `fileexists:/path` | 文件是否存在 |
|
||||
| filefresh | `filefresh:/path:24h` | 文件新鲜度 |
|
||||
| db | `db:table:field::today:1` | 数据库记录数 |
|
||||
| cron | `cron:job_id` | Cron任务状态 |
|
||||
| cron_errors | `cron_errors:last24h` | 全局cron异常 |
|
||||
| cron_paused | `cron_paused:check` | 误暂停检查 |
|
||||
| delivery | `delivery:origin_targets` | 推送目标检查 |
|
||||
| pipeline | `pipeline:xiaoguo_signal_flow` | 综合管道检查 |
|
||||
|
||||
## 触发方式
|
||||
|
||||
- **自动**: cron `0 8 * * 1-5` (交易日8:00, no_agent模式)
|
||||
- **手动**: `python3 /home/hmo/MoFin/scripts/morning_health_check.py --report`
|
||||
- **更新清单**: `python3 /home/hmo/MoFin/scripts/morning_health_check.py --update-checklist`
|
||||
|
||||
## 输出格式
|
||||
|
||||
```
|
||||
MoFin 系统体检 | 2026-06-25 周四 | 08:00
|
||||
────────────────────────────────────────
|
||||
【基础设施层】
|
||||
✅ 知微 XMPP Bot: active
|
||||
❌ 小果 LLM API: timeout
|
||||
|
||||
... (只输出有问题的分类;正常分类仅在 --report 时显示)
|
||||
|
||||
【管道完整性】
|
||||
🔴 无异常cron状态(最近24h): 1 errors: 小果独立扫描(error)
|
||||
|
||||
────────────────────────────────────────
|
||||
总计: 🔴1严重 | ❌2错误 | ⚠️0警告 | ✅40正常 (15s)
|
||||
|
||||
需立即处理的问题:
|
||||
[ERROR] SENSE: 小果扫描 cron 已调度: status=error
|
||||
```
|
||||
|
||||
## 历史记录
|
||||
|
||||
保存在 `/home/hmo/MoFin/data/health_check_history.json`
|
||||
- 每次运行记录时间戳、各等级计数、耗时
|
||||
- 保留最近90条(约3个月)
|
||||
@@ -0,0 +1,135 @@
|
||||
# MoFin 自愈循环系统设计
|
||||
|
||||
> 版本: v1.0 | 最后更新: 2026-06-24
|
||||
> 核心理念:体检发现问题 → TODO追踪 → 自动修复 → 次日体检验证,形成无人值守的修复闭环。
|
||||
|
||||
## 一、自愈循环架构
|
||||
|
||||
```
|
||||
┌───────────────────────────────────┐
|
||||
│ morning_health_check.py │
|
||||
│ 每日 8:00(开盘前) │
|
||||
│ │
|
||||
│ 体检 7 层 43+ 项 │
|
||||
│ 发现问题 → 写 TODO │
|
||||
└──────────┬────────────────────────┘
|
||||
│ TODO: health_fix_xxx
|
||||
▼
|
||||
┌───────────────────────────────────┐
|
||||
│ self-todo cron │
|
||||
│ 每 2-3 小时自动执行 │
|
||||
│ │
|
||||
│ 读取 pending 任务 │
|
||||
│ 按优先级逐个处理 │
|
||||
│ 完成后 git commit + 标记完成 │
|
||||
└──────────┬────────────────────────┘
|
||||
│ git push
|
||||
▼
|
||||
┌───────────────────────────────────┐
|
||||
│ 次晨体检验证 │
|
||||
│ │
|
||||
│ 确认昨日问题已修复 → ✅ │
|
||||
│ 问题依然存在 → ⚠️ 升级标记 │
|
||||
└───────────────────────────────────┘
|
||||
```
|
||||
|
||||
## 二、健康检查 → TODO 对接
|
||||
|
||||
### 对接点
|
||||
|
||||
morning_health_check.py 发现异常后:
|
||||
|
||||
```
|
||||
1. 读当前 TODO 文件
|
||||
2. 检查是否已有相同 issue 的 TODO(去重)
|
||||
3. 如果没有 → 追加新 TODO:
|
||||
- status: pending
|
||||
- priority: 根据 severity(critical→high, error→medium, warn→low)
|
||||
- deps: 无
|
||||
- title: "[体检发现] {问题描述}"
|
||||
4. 如果有相同 TODO 且 status=completed →
|
||||
- 说明昨天标记修复但今天仍然有问题 → 重新打开
|
||||
- 修改 status=pending, priority 升一级
|
||||
```
|
||||
|
||||
### TODO 字段映射
|
||||
|
||||
| 体检 severity | TODO priority | 预期修复时间 |
|
||||
|---------------|---------------|-------------|
|
||||
| critical | high | 当前 session / 下一个 self-todo |
|
||||
| error | medium | 24h 内 |
|
||||
| warn | low | 48h 内 |
|
||||
|
||||
## 三、xiaoguo 信号桥(知微消费 xiaoguo_scanner 产出)
|
||||
|
||||
### 问题
|
||||
|
||||
xiaoguo_scanner 每5分钟写 `signal_news` 表,但知微没有任何组件读取它。
|
||||
|
||||
### 解决方案
|
||||
|
||||
新增 `xiaoguo_signal_consumer.py`(no_agent 脚本,每30分钟运行一次):
|
||||
|
||||
```
|
||||
xiaoguo_signal_consumer.py (每30分, 盘中)
|
||||
│
|
||||
├── 读 mofin.db signal_news 表
|
||||
│ WHERE source LIKE 'xiaoguo%'
|
||||
│ AND (processed=0 OR processed IS NULL)
|
||||
│ AND date(created_at) == today
|
||||
│
|
||||
├── 逐条处理:
|
||||
│ ├── source=xiaoguo(看多信号):
|
||||
│ │ ├── 检查是否已在自选/持仓
|
||||
│ │ ├── 拉腾讯行情(实时价/PE/涨跌幅)
|
||||
│ │ ├── 五维快速评估(大盘→行业→消息→基本面→技术)
|
||||
│ │ ├── 决策:加自选 / 关注 / 跳过
|
||||
│ │ └── 写入 decisions.json / watchlist.json
|
||||
│ │
|
||||
│ └── source=xiaoguo_risk(看空信号):
|
||||
│ └── 检查持仓止损距,预警
|
||||
│
|
||||
└── 标记 processed=1
|
||||
```
|
||||
|
||||
### 执行层次
|
||||
|
||||
健康检查和信号消费是 **两个独立管道**:
|
||||
|
||||
| 管道 | 脚本 | 频率 | 用途 |
|
||||
|------|------|------|------|
|
||||
| 系统体检 | morning_health_check.py | 每日8:00 | 检查全部组件健康状态 |
|
||||
| 信号消费 | xiaoguo_signal_consumer.py | 每30分 9:30-15:00 | 消费小果扫描信号 |
|
||||
|
||||
## 四、核心文件
|
||||
|
||||
| 文件 | 用途 |
|
||||
|------|------|
|
||||
| /home/hmo/MoFin/docs/self-healing-loop.md | 本文档 |
|
||||
| /home/hmo/MoFin/scripts/morning_health_check.py | 体检脚本(已存在,需修改) |
|
||||
| /home/hmo/MoFin/scripts/xiaoguo_signal_consumer.py | 信号消费脚本(新建) |
|
||||
| /home/hmo/.hermes/profiles/position-analyst/todo.json | TODO 系统数据文件 |
|
||||
|
||||
## 五、TODO 集成修改计划
|
||||
|
||||
### morning_health_check.py 修改项
|
||||
|
||||
在 main() 函数末尾追加 TODO 写入逻辑:
|
||||
|
||||
```python
|
||||
def write_todos_for_issues(report_entries):
|
||||
"""将体检发现的异常写入 TODO 系统"""
|
||||
todo_path = Path("/home/hmo/.hermes/profiles/position-analyst/todo.json")
|
||||
# 读现有 TODO
|
||||
# 去重(检查是否已有相同 issue)
|
||||
# 追加新 TODO
|
||||
# 写回
|
||||
```
|
||||
|
||||
### xiaoguo_signal_consumer.py 新建
|
||||
|
||||
30分钟盘中循环,消费 signal_news 中的未处理 xiaoguo 信号。
|
||||
|
||||
### self-todo cron 适配
|
||||
|
||||
确保 self-todo cron(b53435fbb38b 每小时多次运行)能识别并处理 `[体检发现]` 前缀的 TODO 项。
|
||||
@@ -0,0 +1,166 @@
|
||||
#!/usr/bin/env python3
|
||||
"""intraday_health_check.py — 盘中高频轻量自检 (no_agent)
|
||||
|
||||
每15分钟检查最关键的活动组件,只查会直接影响盘中运行的。
|
||||
发现问题→写TODO(消费管道与每日体检共享)。
|
||||
"""
|
||||
|
||||
import json, os, sqlite3, subprocess, urllib.request
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
BASE = Path("/home/hmo/MoFin")
|
||||
DATA = BASE / "data"
|
||||
DB_PATH = DATA / "mofin.db"
|
||||
CRON_JOBS = Path("/home/hmo/.hermes/profiles/position-analyst/cron/jobs.json")
|
||||
GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
|
||||
GATEWAY_KEY = "hermes123"
|
||||
|
||||
ISSUES = []
|
||||
OK_COUNT = 0
|
||||
|
||||
|
||||
def log(ok, msg):
|
||||
global OK_COUNT
|
||||
if ok:
|
||||
OK_COUNT += 1
|
||||
else:
|
||||
ISSUES.append(msg)
|
||||
|
||||
|
||||
def check_port(port):
|
||||
try:
|
||||
r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
|
||||
return f":{port}" in r.stdout
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
def check_http(url, timeout=8):
|
||||
try:
|
||||
for k in list(os.environ.keys()):
|
||||
if 'proxy' in k.lower():
|
||||
os.environ.pop(k)
|
||||
req = urllib.request.Request(url, method="GET")
|
||||
urllib.request.urlopen(req, timeout=timeout)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
def db_today_count(table, date_col):
|
||||
today = datetime.now().strftime("%Y-%m-%d")
|
||||
try:
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
r = conn.execute(f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?", (today,)).fetchone()
|
||||
conn.close()
|
||||
return r[0]
|
||||
except:
|
||||
return -1
|
||||
|
||||
|
||||
def check_xiaoguo():
|
||||
"""小果管道:进程/scanner有数据/API可达(降级不报错)"""
|
||||
# 进程 — 不一定有常驻进程(no_agent cron模式)
|
||||
# 数据 — 今日有扫描记录
|
||||
scans_today = db_today_count("xiaoguo_scan_tracker", "last_scanned_at")
|
||||
if scans_today <= 0:
|
||||
# 可能是小果离线了,不报严重,记录即可
|
||||
return
|
||||
# API — 不通时scanner已降级为unknown,不影响
|
||||
check_http("http://192.168.1.122:18003/v1/models")
|
||||
|
||||
|
||||
def check_price_monitor():
|
||||
"""价格监控:进程在跑 + 最近有数据"""
|
||||
# 进程检查
|
||||
r = subprocess.run(["pgrep", "-f", "price_monitor"], capture_output=True, timeout=5)
|
||||
process_alive = r.returncode == 0
|
||||
if not process_alive:
|
||||
log(False, "价格监控进程不在运行")
|
||||
return
|
||||
|
||||
# 数据新鲜度(最近10分钟是否有事件)
|
||||
try:
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
recent = conn.execute(
|
||||
"SELECT COUNT(*) FROM price_events WHERE created_at > datetime('now', '-10 minutes')"
|
||||
).fetchone()[0]
|
||||
conn.close()
|
||||
log(recent > 0, f"价格监控进程在跑,但最近10分钟无新事件")
|
||||
except:
|
||||
log(True, "价格监控进程在跑")
|
||||
|
||||
|
||||
def check_bots():
|
||||
zhiwei = subprocess.run(["systemctl", "is-active", "xmpp-zhiwei.service"],
|
||||
capture_output=True, text=True, timeout=5).stdout.strip() == "active"
|
||||
xiaoguo = subprocess.run(["systemctl", "is-active", "xmpp-xiaoguo.service"],
|
||||
capture_output=True, text=True, timeout=5).stdout.strip() == "active"
|
||||
log(zhiwei, "知微XMPP Bot离线")
|
||||
log(xiaoguo, "小果XMPP Bot离线")
|
||||
|
||||
|
||||
def check_gateways():
|
||||
log(check_port(8643), "知微Gateway :8643 未监听")
|
||||
log(check_port(8645), "小果Gateway :8645 未监听")
|
||||
|
||||
|
||||
def check_signal_pipeline():
|
||||
"""信号从xiaoguo_scanner→signal_news→consumer是否通畅"""
|
||||
unproc = 0
|
||||
try:
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
r = conn.execute("SELECT COUNT(*) FROM signal_news WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL)").fetchone()
|
||||
unproc = r[0]
|
||||
conn.close()
|
||||
except:
|
||||
pass
|
||||
log(unproc < 30, f"信号堆积: {unproc}条未处理(需<30)")
|
||||
|
||||
|
||||
def write_todos():
|
||||
if not ISSUES:
|
||||
return
|
||||
for msg in ISSUES:
|
||||
title = f"[盘中自检] {msg}"
|
||||
try:
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
exist = conn.execute("SELECT id FROM todos WHERE title=? AND status IN ('pending','in_progress')", (title,)).fetchone()
|
||||
if not exist:
|
||||
conn.execute(
|
||||
"INSERT INTO todos (title, description, priority, source, status, fix_action) "
|
||||
"VALUES (?, ?, 'high', 'intraday_check', 'pending', NULL)",
|
||||
(title, f"盘中自动发现: {msg}"))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
now = datetime.now()
|
||||
# 只在交易时段运行
|
||||
if now.weekday() >= 5 or now.hour < 9 or now.hour >= 15:
|
||||
print("[SILENT] 非交易时段")
|
||||
return
|
||||
|
||||
check_bots()
|
||||
check_gateways()
|
||||
check_xiaoguo()
|
||||
if 9 <= now.hour < 16:
|
||||
check_price_monitor()
|
||||
check_signal_pipeline()
|
||||
|
||||
write_todos()
|
||||
|
||||
if ISSUES:
|
||||
print(f"盘中自检 | {now.strftime('%H:%M')} | {len(ISSUES)}项异常:")
|
||||
for i in ISSUES:
|
||||
print(f" ⚠️ {i}")
|
||||
else:
|
||||
print(f"[SILENT] 盘中自检通过 | {OK_COUNT}项正常")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Executable
+767
@@ -0,0 +1,767 @@
|
||||
#!/usr/bin/env python3
|
||||
"""morning_health_check.py — MoFin 系统常规体检
|
||||
|
||||
每日开盘前(8:00)运行,全面扫描MoFin所有组件健康状况。
|
||||
输出格式化的体检报告,有问题才出声,没问题静默。
|
||||
|
||||
核心设计:
|
||||
1. 从 health_checklist.json 读检查清单
|
||||
2. 逐项检查,记录状态
|
||||
3. 报告异常项(只推异常,不推正常)
|
||||
4. 自动发现新增cron/脚本(通过 self_discovery 函数)
|
||||
5. 维护检查历史 (health_check_history.json)
|
||||
|
||||
新增组件自动发现机制:
|
||||
- 对比当前cron list与checklist中记录的cron id
|
||||
- 发现新cron → 自动追加到checklist
|
||||
- 脚本修改 → 标记"需复核"
|
||||
|
||||
用法:
|
||||
python3 scripts/morning_health_check.py [--report] [--update-checklist]
|
||||
--report: 强制输出完整报告(默认只输出异常)
|
||||
--update-checklist: 运行自动发现并更新checklist
|
||||
|
||||
no_agent模式:只输出异常项,无异常完全静默
|
||||
"""
|
||||
|
||||
import json, os, sqlite3, subprocess, sys, time, urllib.request
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
# ── 路径 ──
|
||||
BASE = Path("/home/hmo/MoFin")
|
||||
DATA = BASE / "data"
|
||||
SCRIPTS_DIR = BASE / "scripts"
|
||||
PROFILE_SCRIPTS = Path("/home/hmo/.hermes/profiles/position-analyst/scripts")
|
||||
CHECKLIST_PATH = DATA / "health_checklist.json"
|
||||
HISTORY_PATH = DATA / "health_check_history.json"
|
||||
DB_PATH = DATA / "mofin.db"
|
||||
HERMES_CRON_DIR = Path("/home/hmo/.hermes/profiles/position-analyst/cron")
|
||||
TODO_PATH = Path("/home/hmo/.hermes/profiles/position-analyst/todo.json")
|
||||
|
||||
|
||||
def derive_fix_action(detail, msg):
|
||||
"""根据issue信息推导可执行的修复命令"""
|
||||
# 小果扫描 error → 验证脚本是否存在
|
||||
if "xiaoguo_scanner" in msg or "小果扫描" in msg:
|
||||
return f"ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/xiaoguo_scanner.py 2>&1 && echo 'ok'"
|
||||
# system-audit error → 验证拷贝
|
||||
if "system_audit" in msg or "系统审计" in msg:
|
||||
return f"ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/system_audit.py 2>&1"
|
||||
# cron errors(last_status=error)→ 验证文件存在,等下次cron运行自动恢复
|
||||
if "cron" in msg.lower() and "error" in msg.lower() and ("小果" in msg or "系统审计" in msg):
|
||||
return f"ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/xiaoguo_scanner.py /home/hmo/.hermes/profiles/position-analyst/scripts/system_audit.py 2>&1"
|
||||
# 港股汇率 → 刷新
|
||||
if "港股汇率" in msg:
|
||||
return f"cd {BASE} && python3 hk_rate.py 2>&1"
|
||||
# 价格监控无事件 → 检查进程
|
||||
if "价格监控" in msg and "0 rows" in msg:
|
||||
return "ps aux | grep price_monitor | grep -v grep | head -3"
|
||||
# delivery目标缺失 → 改为local
|
||||
if "deliver" in msg.lower() or "delivery" in msg.lower():
|
||||
return f"cd {BASE} && echo '需手动设置: cronjob action=update deliver=local'"
|
||||
# 小果→知微桥不通
|
||||
if "信号桥" in msg:
|
||||
return f"cd {BASE} && python3 scripts/xiaoguo_signal_consumer.py 2>&1"
|
||||
return None
|
||||
|
||||
|
||||
def auto_fix_issue(issue):
|
||||
"""对明确可自动修复的问题执行修复,返回 (fixed, fix_msg)"""
|
||||
item_id = issue.get("detail", "")
|
||||
msg = issue.get("msg", "")
|
||||
|
||||
# 港股汇率缓存缺失 → 生成
|
||||
if "港股汇率缓存" in msg and "missing" in msg:
|
||||
try:
|
||||
# hk_rate.py 写入 ~/.cache/hk_exchange_rate.json,profile环境下解析到 profile/home/.cache/
|
||||
r = subprocess.run(
|
||||
["python3", str(BASE / "hk_rate.py")],
|
||||
capture_output=True, text=True, timeout=15
|
||||
)
|
||||
if r.returncode == 0:
|
||||
return True, f"已自动刷新港股汇率缓存: {r.stdout.strip()}"
|
||||
else:
|
||||
return False, f"汇率刷新失败: {r.stderr[:100]}"
|
||||
except Exception as e:
|
||||
return False, f"汇率刷新异常: {e}"
|
||||
|
||||
# 价格监控今天无事件(交易日盘中)→ 检查进程
|
||||
if "价格监控" in msg and "0 rows" in msg:
|
||||
now = ctx["started_at"]
|
||||
if now.weekday() < 5 and 9 <= now.hour <= 15:
|
||||
# 交易时段,应该有事
|
||||
ok, detail = check_process("price_monitor")
|
||||
if not ok:
|
||||
return True, "已检测:price_monitor进程不存在(需人工介入)"
|
||||
return True, "已确认:price_monitor进程运行中,但今日无事件(可能无价格触发)"
|
||||
# 非交易时段→正常
|
||||
return True, "非交易时段无价格事件属正常"
|
||||
|
||||
# 其他问题→不可自动修复
|
||||
return False, "需人工处理"
|
||||
|
||||
|
||||
def write_todos_for_issues():
|
||||
"""将体检发现的异常写入 TODO 系统(去重、升级),先尝试自动修复"""
|
||||
try:
|
||||
if not ctx["report"]:
|
||||
return
|
||||
|
||||
# 只有 error/critical/warn 才处理
|
||||
issues = [e for e in ctx["report"] if e["level"] in ("critical", "error", "warn")]
|
||||
if not issues:
|
||||
return
|
||||
|
||||
# 先尝试自动修复
|
||||
fixed_issues = []
|
||||
remaining = []
|
||||
for issue in issues:
|
||||
fixed, fix_msg = auto_fix_issue(issue)
|
||||
if fixed:
|
||||
fixed_issues.append((issue, fix_msg))
|
||||
log("ok", issue["category"], f"已自动修复: {fix_msg}", issue.get("detail",""))
|
||||
else:
|
||||
remaining.append(issue)
|
||||
|
||||
# 输出修复摘要
|
||||
if fixed_issues:
|
||||
print()
|
||||
print("🛠️ 自动修复:")
|
||||
for issue, fix_msg in fixed_issues:
|
||||
print(f" ✅ {issue['category']}: {fix_msg}")
|
||||
|
||||
# 剩余的无法自动修复的→写TODO到数据库
|
||||
if not remaining:
|
||||
return
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
todo_priority = {"critical": "high", "error": "medium", "warn": "low"}
|
||||
new_count = 0
|
||||
|
||||
for issue in remaining:
|
||||
title = f"[体检发现] {issue['msg']}"
|
||||
level = issue["level"]
|
||||
pri = todo_priority.get(level, "medium")
|
||||
|
||||
# 去重:检查是否已存在(含completed的也要查,避免重复加)
|
||||
r_exist = conn.execute(
|
||||
"SELECT id, status FROM todos WHERE title=?",
|
||||
(title,)
|
||||
).fetchone()
|
||||
|
||||
if r_exist:
|
||||
if r_exist[1] == "blocked":
|
||||
# 已阻塞的重新打开
|
||||
conn.execute(
|
||||
"UPDATE todos SET status='pending', priority=?, note='已重新打开', updated_at=CURRENT_TIMESTAMP WHERE id=?",
|
||||
(pri, r_exist[0])
|
||||
)
|
||||
else:
|
||||
# 生成fix_action(必须非空)
|
||||
fix_action = derive_fix_action(issue.get("detail", ""), issue.get("msg", ""))
|
||||
if not fix_action:
|
||||
# 没有fix_action就不创建TODO,直接输出到报告里
|
||||
print(f" ⚠️ 无法自动修复: [{pri}] {title[:60]}")
|
||||
print(f" 原因: 未知修复方案,需人工分析")
|
||||
continue
|
||||
conn.execute(
|
||||
"INSERT INTO todos (title, description, priority, source, status, fix_action) "
|
||||
"VALUES (?, ?, ?, 'health_check', 'pending', ?)",
|
||||
(title,
|
||||
f"体检发现于 {ctx['started_at'].strftime('%Y-%m-%d %H:%M')}\n分类: {issue['category']}\n详情: {issue.get('detail', '')}\n无法当场修复原因: 需验证/需等待",
|
||||
pri, fix_action)
|
||||
)
|
||||
new_count += 1
|
||||
|
||||
conn.commit()
|
||||
|
||||
if new_count > 0:
|
||||
print()
|
||||
print(f"📋 已加入TODO({new_count}条):")
|
||||
for r2 in conn.execute(
|
||||
"SELECT title, priority FROM todos WHERE status='pending' AND source='health_check' "
|
||||
"ORDER BY created_at DESC LIMIT ?", (new_count,)
|
||||
).fetchall():
|
||||
print(f" [{r2[1]}] {r2[0][:70]}")
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
print(f" TODO写入异常: {e}")
|
||||
except Exception as e:
|
||||
pass # TODO 写入失败不阻碍体检主流程
|
||||
|
||||
# 异常缓存(同一问题24h内不重复推)
|
||||
KNOWN_ISSUES_PATH = DATA / "health_known_issues.json"
|
||||
|
||||
# ── 上下文 ──
|
||||
ctx = {
|
||||
"report": [],
|
||||
"issues": [],
|
||||
"ok_count": 0,
|
||||
"warn_count": 0,
|
||||
"error_count": 0,
|
||||
"critical_count": 0,
|
||||
"started_at": datetime.now(),
|
||||
}
|
||||
|
||||
def log(level, category, msg, detail=None):
|
||||
"""记录检查结果"""
|
||||
ctx["report"].append({
|
||||
"level": level, "category": category, "msg": msg, "detail": detail,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
})
|
||||
if level == "critical":
|
||||
ctx["critical_count"] += 1
|
||||
elif level == "error":
|
||||
ctx["error_count"] += 1
|
||||
elif level == "warn":
|
||||
ctx["warn_count"] += 1
|
||||
else:
|
||||
ctx["ok_count"] += 1
|
||||
|
||||
def emit(msg, level="ok"):
|
||||
"""输出一行"""
|
||||
prefix = {"critical": "🔴", "error": "❌", "warn": "⚠️", "ok": "✅", "info": "📎"}.get(level, "•")
|
||||
return f"{prefix} {msg}"
|
||||
|
||||
# ── 检查器集合 ──
|
||||
|
||||
def check_systemctl(service_name):
|
||||
"""检查systemd服务状态"""
|
||||
try:
|
||||
r = subprocess.run(["systemctl", "is-active", service_name],
|
||||
capture_output=True, text=True, timeout=5)
|
||||
status = r.stdout.strip()
|
||||
return status == "active", f"{status}"
|
||||
except Exception as e:
|
||||
return False, f"error:{e}"
|
||||
|
||||
def check_port(port):
|
||||
"""检查端口是否在监听"""
|
||||
try:
|
||||
r = subprocess.run(["ss", "-tlnp"], capture_output=True, text=True, timeout=5)
|
||||
return f":{port}" in r.stdout, "listening" if f":{port}" in r.stdout else "not_found"
|
||||
except Exception as e:
|
||||
return False, f"error:{e}"
|
||||
|
||||
def check_process(pattern):
|
||||
"""检查进程是否存在"""
|
||||
try:
|
||||
r = subprocess.run(["pgrep", "-f", pattern], capture_output=True, timeout=5)
|
||||
return r.returncode == 0, "running" if r.returncode == 0 else "not_found"
|
||||
except:
|
||||
return False, "check_error"
|
||||
|
||||
def check_http(url, timeout=15):
|
||||
"""检查HTTP端点是否可达 (清理代理环境变量)"""
|
||||
try:
|
||||
# 清理所有代理环境变量
|
||||
old_env = {}
|
||||
for k in list(os.environ.keys()):
|
||||
if 'proxy' in k.lower():
|
||||
old_env[k] = os.environ.pop(k)
|
||||
req = urllib.request.Request(url, method="GET")
|
||||
resp = urllib.request.urlopen(req, timeout=timeout)
|
||||
# 恢复
|
||||
for k, v in old_env.items():
|
||||
os.environ[k] = v
|
||||
return True, str(resp.status)
|
||||
except Exception as e:
|
||||
return False, str(e)[:60]
|
||||
|
||||
def check_disk(mount):
|
||||
"""检查磁盘空间"""
|
||||
try:
|
||||
r = subprocess.run(["df", "-h", mount], capture_output=True, text=True, timeout=5)
|
||||
lines = r.stdout.strip().split("\n")
|
||||
if len(lines) >= 2:
|
||||
parts = lines[1].split()
|
||||
if len(parts) >= 5:
|
||||
pct = parts[4].replace("%", "")
|
||||
return int(pct) < 90, f"{pct}% used"
|
||||
return False, "parse_error"
|
||||
except:
|
||||
return False, "check_error"
|
||||
|
||||
def check_file_exists(path):
|
||||
"""检查文件存在"""
|
||||
p = Path(path)
|
||||
exists = p.exists()
|
||||
return exists, f"{p.stat().st_size}B" if exists else "missing"
|
||||
|
||||
def check_file_freshness(path, max_hours):
|
||||
"""检查文件新鲜度"""
|
||||
p = Path(path)
|
||||
if not p.exists():
|
||||
return False, "missing"
|
||||
mtime = datetime.fromtimestamp(p.stat().st_mtime)
|
||||
hours_ago = (ctx["started_at"] - mtime).total_seconds() / 3600
|
||||
return hours_ago < max_hours, f"{hours_ago:.0f}h ago (threshold {max_hours}h)"
|
||||
|
||||
def check_db_table_count(table, field, value, op="today", threshold=0):
|
||||
"""检查数据库中的记录数"""
|
||||
try:
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
cur = conn.cursor()
|
||||
if op == "today":
|
||||
today = ctx["started_at"].strftime("%Y-%m-%d")
|
||||
# 先检查表有哪些列
|
||||
cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
|
||||
date_col = None
|
||||
for candidate in ["created_at", "date", "timestamp", "last_scanned_at", "signal_date"]:
|
||||
if candidate in cols:
|
||||
date_col = candidate
|
||||
break
|
||||
if not date_col:
|
||||
conn.close()
|
||||
return True, f"no_date_col_in_{table}"
|
||||
sql = f"SELECT COUNT(*) FROM {table} WHERE date({date_col}) = ?"
|
||||
cur.execute(sql, (today,))
|
||||
elif op == "unprocessed":
|
||||
cols = [r[1] for r in cur.execute(f"PRAGMA table_info({table})").fetchall()]
|
||||
if "processed" in cols:
|
||||
sql = f"SELECT COUNT(*) FROM {table} WHERE (processed = 0 OR processed IS NULL)"
|
||||
elif "source" in cols:
|
||||
sql = f"SELECT COUNT(*) FROM {table} WHERE source LIKE '%xiaoguo%'"
|
||||
else:
|
||||
sql = f"SELECT COUNT(*) FROM {table}"
|
||||
cur.execute(sql)
|
||||
elif op == "count":
|
||||
if field:
|
||||
sql = f"SELECT COUNT(*) FROM {table} WHERE {field} = ?"
|
||||
cur.execute(sql, (value,))
|
||||
else:
|
||||
sql = f"SELECT COUNT(*) FROM {table}"
|
||||
cur.execute(sql)
|
||||
else:
|
||||
sql = f"SELECT COUNT(*) FROM {table}"
|
||||
cur.execute(sql)
|
||||
count = cur.fetchone()[0]
|
||||
conn.close()
|
||||
if op == "unprocessed":
|
||||
return count < threshold, f"{count} unprocessed"
|
||||
return count >= threshold, f"{count} rows"
|
||||
except Exception as e:
|
||||
return True, f"skip({str(e)[:60]})"
|
||||
|
||||
def check_cron(job_id):
|
||||
"""检查cron任务状态(通过jobs.json)"""
|
||||
try:
|
||||
cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
|
||||
if cron_jobs_path.exists():
|
||||
data = json.loads(cron_jobs_path.read_text())
|
||||
for job in data.get("jobs", []):
|
||||
if job.get("id") == job_id or (not job.get("id") and job.get("name") and job_id in str(job)):
|
||||
enabled = job.get("enabled", True)
|
||||
if not enabled:
|
||||
return False, "disabled"
|
||||
last_status = job.get("last_status")
|
||||
if last_status and last_status != "ok":
|
||||
return False, f"status={last_status}"
|
||||
last_run = job.get("last_run_at", "")
|
||||
if last_run:
|
||||
try:
|
||||
last_dt = datetime.fromisoformat(last_run)
|
||||
hours_ago = (ctx["started_at"] - last_dt).total_seconds() / 3600
|
||||
if hours_ago > 48:
|
||||
return True, f"ok(stale:{hours_ago:.0f}h)"
|
||||
except:
|
||||
pass
|
||||
return True, "ok"
|
||||
# 没找到该job_id - 可能是不需要检查的cron
|
||||
return True, "not_in_jobs_json"
|
||||
return False, "no_jobs_json"
|
||||
except Exception as e:
|
||||
return False, f"check_error:{str(e)[:60]}"
|
||||
|
||||
def check_cron_errors_last24h():
|
||||
"""检查最近24h内cron是否有error状态"""
|
||||
try:
|
||||
cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
|
||||
if not cron_jobs_path.exists():
|
||||
return True, "no_jobs_json"
|
||||
data = json.loads(cron_jobs_path.read_text())
|
||||
check_time = (ctx["started_at"] - timedelta(hours=24)).isoformat()
|
||||
errors = []
|
||||
for job in data.get("jobs", []):
|
||||
last_status = job.get("last_status")
|
||||
last_run = job.get("last_run_at", "")
|
||||
if last_status and last_status != "ok" and last_run:
|
||||
try:
|
||||
if last_run >= check_time[:19]:
|
||||
errors.append(f"{job.get('name','?')}({last_status})")
|
||||
except:
|
||||
pass
|
||||
if errors:
|
||||
return False, f"{len(errors)} errors: {'; '.join(errors[:5])}"
|
||||
return True, "0 errors"
|
||||
except Exception as e:
|
||||
return True, f"skip({str(e)[:60]})"
|
||||
|
||||
def check_cron_paused():
|
||||
"""检查不应暂停的cron是否被误暂停"""
|
||||
should_run = [
|
||||
("3a9fb3300a6a", "价格监控"),
|
||||
("0851c7838ca3", "小果扫描"),
|
||||
("e13323928f3a", "自选提醒"),
|
||||
("b809fcabfa5b", "分支评估"),
|
||||
]
|
||||
try:
|
||||
cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
|
||||
if not cron_jobs_path.exists():
|
||||
return True, "no_jobs_json"
|
||||
data = json.loads(cron_jobs_path.read_text())
|
||||
job_map = {job.get("id"): job for job in data.get("jobs", [])}
|
||||
paused = []
|
||||
for jid, name in should_run:
|
||||
job = job_map.get(jid)
|
||||
if job and not job.get("enabled", True):
|
||||
paused.append(name)
|
||||
if paused:
|
||||
return False, f"paused: {', '.join(paused)}"
|
||||
return True, "all_expected_running"
|
||||
except Exception as e:
|
||||
return True, f"skip({str(e)[:60]})"
|
||||
|
||||
def check_delivery_targets():
|
||||
"""检查deliver=origin的cron是否有目标"""
|
||||
try:
|
||||
cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
|
||||
if not cron_jobs_path.exists():
|
||||
return True, "no_jobs_json"
|
||||
data = json.loads(cron_jobs_path.read_text())
|
||||
issues = []
|
||||
for job in data.get("jobs", []):
|
||||
last_delivery_err = job.get("last_delivery_error", "")
|
||||
if last_delivery_err and "delivery" in last_delivery_err.lower():
|
||||
issues.append(f"{job.get('name','?')}")
|
||||
if issues:
|
||||
return False, f"{len(issues)} issues: {', '.join(issues[:3])}"
|
||||
return True, "all_ok"
|
||||
except Exception as e:
|
||||
return True, f"skip({str(e)[:60]})"
|
||||
|
||||
|
||||
def check_cron_audit():
|
||||
"""审计全部cron:最近24h内是否运行过"""
|
||||
try:
|
||||
cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
|
||||
if not cron_jobs_path.exists():
|
||||
return True, "no_jobs_json"
|
||||
data = json.loads(cron_jobs_path.read_text())
|
||||
check_time = (ctx["started_at"] - timedelta(hours=24)).isoformat()[:19]
|
||||
stale = []
|
||||
for job in data.get("jobs", []):
|
||||
name = job.get("name", "?")
|
||||
enabled = job.get("enabled", True)
|
||||
script = job.get("script", "")
|
||||
last_run = job.get("last_run_at", "")
|
||||
last_status = job.get("last_status")
|
||||
if not enabled or not script:
|
||||
continue
|
||||
if not last_run:
|
||||
stale.append(f"{name}(从未运行)")
|
||||
continue
|
||||
if last_run[:19] < check_time:
|
||||
if last_status and last_status == "ok":
|
||||
stale.append(f"{name}(>24h未运行)")
|
||||
else:
|
||||
stale.append(f"{name}(>24h+状态异常)")
|
||||
if stale:
|
||||
return False, f"{len(stale)}个cron异常: {'; '.join(stale[:5])}"
|
||||
total = sum(1 for j in data.get("jobs",[]) if j.get("enabled") and j.get("script"))
|
||||
return True, f"全部{total}个cron正常"
|
||||
except Exception as e:
|
||||
return True, f"skip({str(e)[:60]})"
|
||||
|
||||
|
||||
def check_meta_health_check_yesterday():
|
||||
"""元检:昨天体检是否正常完成"""
|
||||
try:
|
||||
history = []
|
||||
if HISTORY_PATH.exists():
|
||||
history = json.loads(HISTORY_PATH.read_text())
|
||||
yesterday = (ctx["started_at"] - timedelta(days=1)).strftime("%Y-%m-%d")
|
||||
for h in history[-30:]:
|
||||
ts = h.get("timestamp", "")
|
||||
if ts[:10] == yesterday:
|
||||
if h.get("error", 0) == 0 and h.get("critical", 0) == 0:
|
||||
return True, f"昨日体检通过({h.get('ok',0)}项正常)"
|
||||
return True, f"昨日体检有{h.get('error',0)}错误+{h.get('critical',0)}严重(已记录)"
|
||||
return True, "无昨日记录(首次运行)"
|
||||
except:
|
||||
return True, "skip"
|
||||
|
||||
|
||||
def check_meta_checklist_completeness():
|
||||
"""元检:检查清单是否覆盖了所有已知组件"""
|
||||
try:
|
||||
added = ctx.get("auto_discovered_items", [])
|
||||
if added:
|
||||
return True, f"自动发现并追加了{len(added)}个新组件到清单"
|
||||
return True, "清单覆盖完整"
|
||||
except:
|
||||
return True, "skip"
|
||||
|
||||
|
||||
# ── 自动发现 ──
|
||||
def self_discovery():
|
||||
"""自动发现新增组件并更新checklist"""
|
||||
discovered = []
|
||||
|
||||
# 1. 发现新增cron任务
|
||||
try:
|
||||
cron_jobs_path = HERMES_CRON_DIR / "jobs.json"
|
||||
if cron_jobs_path.exists():
|
||||
data = json.loads(cron_jobs_path.read_text())
|
||||
all_crons = [(j.get("id"), j.get("name"), j.get("schedule"), j.get("script"))
|
||||
for j in data.get("jobs", [])]
|
||||
|
||||
# 读当前checklist中已有的cron ID
|
||||
checklist = json.loads(CHECKLIST_PATH.read_text())
|
||||
known_cron_ids = set()
|
||||
for cat in checklist["categories"]:
|
||||
for item in cat["items"]:
|
||||
if item["check"].startswith("cron:"):
|
||||
known_cron_ids.add(item["check"].split(":")[1])
|
||||
|
||||
for jid, name, schedule, script in all_crons:
|
||||
if jid and jid not in known_cron_ids and script:
|
||||
# 新cron任务,追加到pipeline类
|
||||
discovered.append(f"新cron: {name}({jid})")
|
||||
for cat in checklist["categories"]:
|
||||
if cat["id"] == "pipeline":
|
||||
cat["items"].append({
|
||||
"id": f"cron-auto-{jid[:8]}",
|
||||
"description": f"{name} cron 已调度",
|
||||
"check": f"cron:{jid}",
|
||||
"expected": "enabled+ok",
|
||||
"severity": "medium",
|
||||
"auto_discovered": True
|
||||
})
|
||||
break
|
||||
|
||||
if discovered:
|
||||
CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
|
||||
except Exception as e:
|
||||
discovered.append(f"cron_discovery_error: {e}")
|
||||
|
||||
return discovered
|
||||
|
||||
# ── 主流程 ──
|
||||
|
||||
def run_check(item):
|
||||
"""运行单个检查项"""
|
||||
check_spec = item["check"]
|
||||
expected = item["expected"]
|
||||
|
||||
if check_spec.startswith("systemctl:"):
|
||||
service = check_spec.split(":", 1)[1]
|
||||
ok, detail = check_systemctl(service)
|
||||
elif check_spec.startswith("port:"):
|
||||
port = check_spec.split(":", 1)[1]
|
||||
ok, detail = check_port(port)
|
||||
elif check_spec.startswith("proc:"):
|
||||
pattern = check_spec.split(":", 1)[1]
|
||||
ok, detail = check_process(pattern)
|
||||
elif check_spec.startswith("http:"):
|
||||
url = check_spec.split(":", 1)[1]
|
||||
ok, detail = check_http(url)
|
||||
elif check_spec.startswith("disk:"):
|
||||
mount = check_spec.split(":", 1)[1]
|
||||
ok, detail = check_disk(mount)
|
||||
elif check_spec.startswith("fileexists:"):
|
||||
path = check_spec.split(":", 1)[1]
|
||||
ok, detail = check_file_exists(path)
|
||||
elif check_spec.startswith("filefresh:"):
|
||||
# filefresh:path:max_hours
|
||||
parts = check_spec.split(":", 2)
|
||||
path = parts[1]
|
||||
max_hours = float(parts[2].replace("h", ""))
|
||||
ok, detail = check_file_freshness(path, max_hours)
|
||||
elif check_spec.startswith("db:"):
|
||||
# db:table:field:value:op:threshold
|
||||
parts = check_spec.split(":", 5)
|
||||
table = parts[1]
|
||||
field = parts[2] if len(parts) > 2 else None
|
||||
value = parts[3] if len(parts) > 3 else None
|
||||
op = parts[4] if len(parts) > 4 else "today"
|
||||
threshold = int(parts[5]) if len(parts) > 5 else 0
|
||||
ok, detail = check_db_table_count(table, field, value, op, threshold)
|
||||
elif check_spec.startswith("cron:"):
|
||||
job_id = check_spec.split(":", 1)[1]
|
||||
ok, detail = check_cron(job_id)
|
||||
elif check_spec == "cron_errors:last24h":
|
||||
ok, detail = check_cron_errors_last24h()
|
||||
elif check_spec == "cron_paused:check":
|
||||
ok, detail = check_cron_paused()
|
||||
elif check_spec == "delivery:origin_targets":
|
||||
ok, detail = check_delivery_targets()
|
||||
elif check_spec == "cron_audit:all":
|
||||
ok, detail = check_cron_audit()
|
||||
elif check_spec == "meta:health_check_yesterday":
|
||||
ok, detail = check_meta_health_check_yesterday()
|
||||
elif check_spec == "meta:checklist_completeness":
|
||||
ok, detail = check_meta_checklist_completeness()
|
||||
elif check_spec == "pipeline:xiaoguo_signal_flow":
|
||||
# 综合检查:小果有数据→被我处理
|
||||
today_xiaoguo, d1 = check_db_table_count("signal_news", "created_at", None, "today", 0)
|
||||
unproc, d2 = check_db_table_count("signal_news", None, None, "unprocessed", 30)
|
||||
ok = today_xiaoguo or unproc
|
||||
detail = f"today_xiaoguo={d1}, unprocessed={d2}"
|
||||
else:
|
||||
ok = False
|
||||
detail = f"unknown_check:{check_spec}"
|
||||
|
||||
level = "ok" if ok else item["severity"]
|
||||
# 将critical/error/high都映射到error级别
|
||||
if not ok:
|
||||
if item["severity"] == "critical":
|
||||
level = "critical"
|
||||
elif item["severity"] in ("high", "error"):
|
||||
level = "error"
|
||||
else:
|
||||
level = "warn"
|
||||
|
||||
return ok, level, detail
|
||||
|
||||
def main():
|
||||
show_full = "--report" in sys.argv
|
||||
update = "--update-checklist" in sys.argv
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
# 加载checklist
|
||||
if not CHECKLIST_PATH.exists():
|
||||
print("[SILENT] health_checklist.json 不存在")
|
||||
return
|
||||
|
||||
checklist = json.loads(CHECKLIST_PATH.read_text())
|
||||
|
||||
# 自动发现(每小时仅运行一次)
|
||||
if update:
|
||||
discovered = self_discovery()
|
||||
else:
|
||||
# 定期自动发现(检查上次扫描时间)
|
||||
meta = checklist.get("meta", {})
|
||||
last_scan = meta.get("last_full_scan")
|
||||
if not last_scan or (ctx["started_at"] - datetime.fromisoformat(last_scan)).total_seconds() > 3600:
|
||||
discovered = self_discovery()
|
||||
checklist["meta"]["last_full_scan"] = ctx["started_at"].isoformat()
|
||||
CHECKLIST_PATH.write_text(json.dumps(checklist, ensure_ascii=False, indent=2))
|
||||
else:
|
||||
discovered = []
|
||||
|
||||
# 按分类逐项检查
|
||||
dayname = ["一","二","三","四","五","六","日"][ctx["started_at"].weekday()]
|
||||
lines = [f"MoFin 系统体检 | {ctx['started_at'].strftime('%Y-%m-%d')} 周{dayname} | {ctx['started_at'].strftime('%H:%M')}"]
|
||||
lines.append("─" * 50)
|
||||
|
||||
for cat in checklist["categories"]:
|
||||
cat_issues = 0
|
||||
cat_lines = []
|
||||
for item in cat["items"]:
|
||||
ok, level, detail = run_check(item)
|
||||
msg = f"{item['description']}: {detail}"
|
||||
log(level, cat["name"], msg, item["id"])
|
||||
cat_lines.append(emit(msg, level))
|
||||
if not ok:
|
||||
cat_issues += 1
|
||||
|
||||
# 只在该分类有问题或--report时才输出
|
||||
if cat_issues > 0 or show_full:
|
||||
lines.append(f"\n【{cat['name']}】")
|
||||
lines.extend(cat_lines)
|
||||
|
||||
# 自动发现结果
|
||||
if discovered:
|
||||
lines.append(f"\n📎 自动发现:")
|
||||
for d in discovered:
|
||||
lines.append(f" {d}")
|
||||
|
||||
# 汇总
|
||||
total = ctx["ok_count"] + ctx["warn_count"] + ctx["error_count"] + ctx["critical_count"]
|
||||
if total == 0:
|
||||
total = 1 # 避免除以0
|
||||
|
||||
lines.append(f"\n{'─' * 50}")
|
||||
|
||||
critical = ctx["critical_count"]
|
||||
errors = ctx["error_count"]
|
||||
warns = ctx["warn_count"]
|
||||
ok_count = ctx["ok_count"]
|
||||
|
||||
# 构建严重级别输出
|
||||
severity_parts = []
|
||||
if critical > 0:
|
||||
severity_parts.append(f"🔴{critical}严重")
|
||||
if errors > 0:
|
||||
severity_parts.append(f"❌{errors}错误")
|
||||
if warns > 0:
|
||||
severity_parts.append(f"⚠️{warns}警告")
|
||||
if ok_count > 0:
|
||||
severity_parts.append(f"✅{ok_count}正常")
|
||||
|
||||
lines.append(f"总计: {' | '.join(severity_parts)} ({(time.time()-start_time):.0f}s)")
|
||||
|
||||
report = "\n".join(lines)
|
||||
|
||||
# 保存历史
|
||||
try:
|
||||
history = []
|
||||
if HISTORY_PATH.exists():
|
||||
history = json.loads(HISTORY_PATH.read_text())
|
||||
history.append({
|
||||
"timestamp": ctx["started_at"].isoformat(),
|
||||
"ok": ok_count, "warn": warns, "error": errors, "critical": critical,
|
||||
"duration_s": round(time.time() - start_time, 1)
|
||||
})
|
||||
# 保留最近30天
|
||||
if len(history) > 90:
|
||||
history = history[-90:]
|
||||
HISTORY_PATH.write_text(json.dumps(history, ensure_ascii=False, indent=2))
|
||||
except:
|
||||
pass
|
||||
|
||||
# 输出
|
||||
# no_agent模式:有问题才出声;--report则强制输出
|
||||
has_issues = critical > 0 or errors > 0 or warns > 0
|
||||
|
||||
if has_issues or show_full:
|
||||
print(report)
|
||||
else:
|
||||
print(f"[SILENT] MoFin 体检通过 | {ok_count}/{total} 检查正常 ({(time.time()-start_time):.0f}s)")
|
||||
|
||||
# 如果有严重问题,额外输出可读摘要
|
||||
if critical > 0 or errors > 0:
|
||||
print()
|
||||
print("🔴 需立即处理的问题:")
|
||||
for entry in ctx["report"]:
|
||||
if entry["level"] in ("critical", "error"):
|
||||
print(f" [{entry['level'].upper()}] {entry['category']}: {entry['msg']}")
|
||||
|
||||
# 检查是否有执行器升级来的TODO(通知失败挂起的)
|
||||
try:
|
||||
conn2 = sqlite3.connect(str(DB_PATH))
|
||||
needs_llm = conn2.execute(
|
||||
"SELECT id, title, priority, created_at, note FROM todos "
|
||||
"WHERE status='needs_llm' "
|
||||
"ORDER BY CASE priority WHEN 'high' THEN 0 WHEN 'medium' THEN 1 ELSE 2 END, created_at ASC LIMIT 10"
|
||||
).fetchall()
|
||||
if needs_llm:
|
||||
print()
|
||||
print("🔶 需知微介入(执行器无法自动修复):")
|
||||
for n in needs_llm:
|
||||
note = (n[4] or "")[:60]
|
||||
print(f" [{n[2]}] #{n[0]} {n[1][:60]} → {note}")
|
||||
conn2.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
# 将异常写入 TODO 系统
|
||||
write_todos_for_issues()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,119 @@
|
||||
#!/usr/bin/env python3
|
||||
"""self_todo_executor.py — TODO自动执行器 (no_agent模式)
|
||||
|
||||
每10分钟轮询pending TODOs,执行fix_action。
|
||||
成功→completed。失败→调gateway API,带完整上下文让知微处理。
|
||||
"""
|
||||
|
||||
import json, sqlite3, subprocess, time, urllib.request
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
|
||||
BASE = Path("/home/hmo/MoFin")
|
||||
DB_PATH = BASE / "data" / "mofin.db"
|
||||
GATEWAY_URL = "http://localhost:8643/v1/chat/completions"
|
||||
GATEWAY_KEY = "hermes123"
|
||||
|
||||
|
||||
def send_xmpp(msg):
|
||||
"""通过zhiwei发XMPP消息给Dad"""
|
||||
try:
|
||||
subprocess.run(
|
||||
["hermes", "send", "--to", "xmpp:hmo@yoin.fun", msg],
|
||||
capture_output=True, text=True, timeout=15
|
||||
)
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
start = time.time()
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
rows = conn.execute(
|
||||
"SELECT id, title, description, fix_action FROM todos WHERE status='pending' "
|
||||
"ORDER BY CASE priority WHEN 'high' THEN 0 WHEN 'medium' THEN 1 ELSE 2 END, "
|
||||
"created_at ASC LIMIT 3"
|
||||
).fetchall()
|
||||
|
||||
if not rows:
|
||||
conn.close()
|
||||
print("[SILENT] 无待处理TODO")
|
||||
return
|
||||
|
||||
for row in rows:
|
||||
tid = row["id"]
|
||||
title = row["title"]
|
||||
desc = row["description"] or ""
|
||||
fix = row["fix_action"]
|
||||
|
||||
conn.execute("UPDATE todos SET status='in_progress' WHERE id=?", (tid,))
|
||||
conn.commit()
|
||||
|
||||
if not fix:
|
||||
# 无修复方案 → 带完整描述调gateway
|
||||
context = f"[自愈执行器] 系统体检发现以下问题,无自动修复方案,需分析处理。\n\n问题: {title}\n\n详情:\n{desc}".strip()
|
||||
send_xmpp(f"📋 TODO已创建(无自动修复): {title[:80]}")
|
||||
else:
|
||||
# 执行修复命令
|
||||
try:
|
||||
r = subprocess.run(fix, shell=True, capture_output=True, text=True, timeout=30)
|
||||
if r.returncode == 0:
|
||||
conn.execute("UPDATE todos SET status='completed', note=? WHERE id=?",
|
||||
(f"已修复: {r.stdout.strip()[:200]}", tid))
|
||||
conn.commit()
|
||||
send_xmpp(f"✅ TODO修复成功: {title[:80]}")
|
||||
print(f" ✅ {title}: 已修复")
|
||||
continue
|
||||
output = r.stderr.strip()[:500] or r.stdout.strip()[:500]
|
||||
except subprocess.TimeoutExpired:
|
||||
output = "执行超时(30s)"
|
||||
except Exception as e:
|
||||
output = str(e)[:200]
|
||||
|
||||
context = (
|
||||
f"[自愈执行器] 尝试自动修复失败,需知微分析处理。\n\n"
|
||||
f"问题: {title}\n\n"
|
||||
f"详情:\n{desc}\n\n"
|
||||
f"尝试的修复命令: {fix}\n"
|
||||
f"失败输出: {output}\n\n"
|
||||
f"请分析失败原因并完成修复,完成后标记TODO #{tid} 为 completed。"
|
||||
)
|
||||
|
||||
# 调gateway让知微处理(带完整上下文)
|
||||
payload = json.dumps({
|
||||
"model": "default",
|
||||
"messages": [{"role": "user", "content": context}],
|
||||
"max_tokens": 1000,
|
||||
}).encode()
|
||||
req = urllib.request.Request(GATEWAY_URL, data=payload,
|
||||
headers={"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {GATEWAY_KEY}"})
|
||||
try:
|
||||
resp = urllib.request.urlopen(req, timeout=120)
|
||||
reply = json.loads(resp.read())
|
||||
result = reply["choices"][0]["message"]["content"][:500]
|
||||
conn.execute("UPDATE todos SET status='completed', note=? WHERE id=?",
|
||||
(f"知微已处理: {result[:200]}", tid))
|
||||
send_xmpp(f"🔶 TODO需知微处理: {title[:60]}\n{result[:200]}")
|
||||
print(f" 🔶 {title}")
|
||||
print(f" {result[:300]}")
|
||||
except Exception as e:
|
||||
conn.execute("UPDATE todos SET status='pending', note=? WHERE id=?",
|
||||
(f"调用知微失败: {str(e)[:100]},下次再试", tid))
|
||||
send_xmpp(f"⚠️ TODO处理失败(将重试): {title[:60]}\n{str(e)[:100]}")
|
||||
print(f" ⚠️ {title}: gateway API调用失败,下次再试")
|
||||
|
||||
conn.commit()
|
||||
|
||||
conn.close()
|
||||
|
||||
if rows:
|
||||
print(f"自愈执行器 | {datetime.now().strftime('%H:%M')} | {len(rows)}条 ({time.time()-start:.0f}s)")
|
||||
else:
|
||||
print("[SILENT] 无待处理TODO")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,252 @@
|
||||
#!/usr/bin/env python3
|
||||
"""xiaoguo_signal_consumer.py — 知微消费小果扫描信号
|
||||
|
||||
盘中每30分钟运行,读取 signal_news 表中未处理的 xiaoguo 信号,
|
||||
做五维快速评估后决定:加自选 / 关注 / 跳过。
|
||||
|
||||
管道位置:
|
||||
xiaoguo_scanner (每5分) → signal_news → 本脚本 → 知微分析报告
|
||||
|
||||
no_agent模式:有发现→输出,无→静默
|
||||
"""
|
||||
|
||||
import json, os, sqlite3, sys, time, urllib.request
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
|
||||
BASE = Path("/home/hmo/MoFin")
|
||||
DATA = BASE / "data"
|
||||
DB_PATH = DATA / "mofin.db"
|
||||
|
||||
# 自选池和决策文件
|
||||
WATCHLIST_PATH = DATA / "watchlist.json"
|
||||
DECISIONS_PATH = DATA / "decisions.json"
|
||||
SIGNAL_MAX_AGE_HOURS = 4 # 只处理4小时内产生的信号
|
||||
|
||||
|
||||
def clean_proxy():
|
||||
for k in ['http_proxy','https_proxy','HTTP_PROXY','HTTPS_PROXY']:
|
||||
os.environ.pop(k, None)
|
||||
|
||||
|
||||
def fetch_quote(code):
|
||||
"""拉腾讯行情,返回 dict"""
|
||||
try:
|
||||
prefix = "sh" if code.startswith(('60','68','51','56','50')) else "sz" if code.startswith(('00','30','15')) else "hk"
|
||||
url = f"http://qt.gtimg.cn/q={prefix}{code}"
|
||||
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
|
||||
resp = urllib.request.urlopen(req, timeout=5).read().decode('gbk')
|
||||
fld = resp.split('=')[1].strip().strip('"').strip(';').split('~')
|
||||
return {
|
||||
"name": fld[1] if len(fld) > 1 else "",
|
||||
"code": code,
|
||||
"price": float(fld[3]) if len(fld) > 3 else 0,
|
||||
"change_pct": float(fld[32]) if len(fld) > 32 else 0,
|
||||
"pe": float(fld[39]) if len(fld) > 39 and fld[39] else 0,
|
||||
"turnover": float(fld[38]) if len(fld) > 38 and fld[38] else 0,
|
||||
}
|
||||
except Exception as e:
|
||||
return {"code": code, "error": str(e)[:60]}
|
||||
|
||||
|
||||
def is_in_portfolio(conn, code):
|
||||
"""检查是否已在持仓或自选中"""
|
||||
code_stripped = code.lstrip("0")
|
||||
cur = conn.execute("SELECT COUNT(*) FROM holdings WHERE code=? AND is_active=1", (code_stripped,))
|
||||
if cur.fetchone()[0] > 0:
|
||||
return "holdings"
|
||||
cur = conn.execute("SELECT COUNT(*) FROM watchlist_stocks WHERE code=?", (code,))
|
||||
if cur.fetchone()[0] > 0:
|
||||
return "watchlist"
|
||||
# 也检查 watchlist.json
|
||||
try:
|
||||
wl = json.loads(WATCHLIST_PATH.read_text())
|
||||
for s in wl.get("stocks", []):
|
||||
if s.get("code") == code or s.get("code", "").lstrip("0") == code_stripped:
|
||||
return "watchlist"
|
||||
except:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def quick_assess(quote):
|
||||
"""五维快速评估(自动版)"""
|
||||
score = 0
|
||||
reasons = []
|
||||
|
||||
# 大盘环境(简化:交易日9:30-15:00且在涨)
|
||||
# 引用 macro_context.json 中的大盘方向
|
||||
try:
|
||||
mc = json.loads((DATA / "macro_context.json").read_text())
|
||||
sh = mc.get("shanghai", {}).get("change_pct", 0)
|
||||
if sh > 0.5:
|
||||
score += 1
|
||||
reasons.append(f"大盘+{sh:.1f}%偏强")
|
||||
elif sh < -0.5:
|
||||
score -= 1
|
||||
reasons.append(f"大盘{sh:.1f}%偏弱")
|
||||
except:
|
||||
pass
|
||||
|
||||
# 技术面:涨跌幅
|
||||
chg = quote.get("change_pct", 0)
|
||||
if chg > 3:
|
||||
score += 1
|
||||
reasons.append(f"涨幅+{chg:.1f}%偏强")
|
||||
elif chg < -3:
|
||||
score -= 1
|
||||
reasons.append(f"跌幅{chg:.1f}%偏弱")
|
||||
else:
|
||||
score += 0.5
|
||||
reasons.append(f"走势平稳{chg:+.1f}%")
|
||||
|
||||
# 基本面:PE
|
||||
pe = quote.get("pe", 0)
|
||||
if 5 < pe < 40:
|
||||
score += 1
|
||||
reasons.append(f"PE={pe:.0f}合理")
|
||||
elif pe <= 0:
|
||||
score -= 0.5
|
||||
reasons.append("PE为负")
|
||||
elif pe > 100:
|
||||
score -= 0.5
|
||||
reasons.append(f"PE={pe:.0f}偏高")
|
||||
|
||||
# 量能
|
||||
turn = quote.get("turnover", 0)
|
||||
if turn > 5:
|
||||
score += 0.5
|
||||
reasons.append(f"换手{turn:.1f}%活跃")
|
||||
elif turn < 0.5:
|
||||
score -= 0.3
|
||||
reasons.append(f"换手{turn:.1f}%偏低")
|
||||
|
||||
return score, reasons
|
||||
|
||||
|
||||
def evaluate_and_act(signal, quote):
|
||||
"""评估信号并决定操作"""
|
||||
status_in = is_in_portfolio(get_conn(), signal.get("code", ""))
|
||||
if status_in:
|
||||
return f"已在{status_in}中,跳过", None
|
||||
|
||||
score, reasons = quick_assess(quote)
|
||||
|
||||
if score >= 1.5:
|
||||
action = "watchlist"
|
||||
summary = f"加自选: {' | '.join(reasons)}"
|
||||
elif score >= 0:
|
||||
action = "monitor"
|
||||
summary = f"关注: {' | '.join(reasons)}"
|
||||
else:
|
||||
action = "skip"
|
||||
summary = f"跳过(评分{score:.1f}): {' | '.join(reasons)}"
|
||||
|
||||
return summary, action
|
||||
|
||||
|
||||
def get_conn():
|
||||
import sqlite3
|
||||
conn = sqlite3.connect(str(DB_PATH))
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
|
||||
def mark_processed(conn, signal_id):
|
||||
conn.execute("UPDATE signal_news SET processed=1 WHERE id=?", (signal_id,))
|
||||
conn.commit()
|
||||
|
||||
|
||||
def main():
|
||||
clean_proxy()
|
||||
start = time.time()
|
||||
today = datetime.now().strftime("%Y-%m-%d")
|
||||
|
||||
conn = get_conn()
|
||||
|
||||
# 读未处理 xiaoguo 信号(今日)
|
||||
rows = conn.execute(
|
||||
"SELECT id, sector, overall_sentiment, summary, key_articles, searched_stocks, source "
|
||||
"FROM signal_news "
|
||||
"WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL) "
|
||||
"AND date(created_at) = ? "
|
||||
"ORDER BY created_at DESC LIMIT 20",
|
||||
(today,)
|
||||
).fetchall()
|
||||
|
||||
if not rows:
|
||||
conn.close()
|
||||
print("[SILENT] 今日无未处理小果信号")
|
||||
return
|
||||
|
||||
# 尝试从 searched_stocks 提取股票代码
|
||||
results = []
|
||||
for r in rows:
|
||||
try:
|
||||
searched = json.loads(r["searched_stocks"]) if r["searched_stocks"] else []
|
||||
except:
|
||||
searched = []
|
||||
|
||||
# 从 sector 字段取股票名
|
||||
sector_name = r["sector"] or ""
|
||||
|
||||
# 尝试提取代码
|
||||
codes_found = []
|
||||
for s in searched:
|
||||
# searched_stocks 存的是股票名称列表
|
||||
# 尝试从 summary 里找代码
|
||||
import re
|
||||
codes = re.findall(r'\d{6}', r["summary"] or "")
|
||||
codes_found.extend(codes)
|
||||
|
||||
if not codes_found:
|
||||
# 没有直接代码,用名称去查
|
||||
mark_processed(conn, r["id"])
|
||||
continue
|
||||
|
||||
code = codes_found[0]
|
||||
quote = fetch_quote(code)
|
||||
|
||||
summary, action = evaluate_and_act(dict(r), quote)
|
||||
|
||||
if action == "watchlist":
|
||||
# 加自选
|
||||
results.append(f"✅ {sector_name}({code}): {summary}")
|
||||
# 写入 watchlist.json
|
||||
try:
|
||||
wl = json.loads(WATCHLIST_PATH.read_text())
|
||||
wl.setdefault("stocks", [])
|
||||
# 检查是否已在
|
||||
existing = [s for s in wl["stocks"] if s.get("code") == code]
|
||||
if not existing:
|
||||
wl["stocks"].append({
|
||||
"code": code,
|
||||
"name": quote.get("name", sector_name),
|
||||
"price": quote.get("price", 0),
|
||||
"status": "watching",
|
||||
"source": "xiaoguo_scanner",
|
||||
"added_at": today,
|
||||
})
|
||||
WATCHLIST_PATH.write_text(json.dumps(wl, ensure_ascii=False, indent=2))
|
||||
except:
|
||||
pass
|
||||
elif action == "monitor":
|
||||
results.append(f"🔄 {sector_name}({code}): {summary}")
|
||||
else:
|
||||
results.append(f"⏭️ {sector_name}({code}): {summary}")
|
||||
|
||||
mark_processed(conn, r["id"])
|
||||
|
||||
conn.close()
|
||||
|
||||
elapsed = time.time() - start
|
||||
if results:
|
||||
print(f"小果信号消费 | {today} | {len(results)}条处理 ({elapsed:.0f}s)")
|
||||
for r in results:
|
||||
print(f" {r}")
|
||||
else:
|
||||
print("[SILENT] 小果信号消费结束")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
+4
-2
@@ -231,8 +231,10 @@ def check_stock(code, name, articles):
|
||||
if s in reply:
|
||||
return True, s
|
||||
return True, "中性"
|
||||
except:
|
||||
pass
|
||||
except Exception as e:
|
||||
# LLM不可达 → 降级:标记为unknown,不阻塞扫描流程
|
||||
print(f" ⚠️ 小果LLM不可达({str(e)[:30]}),降级为unknown", flush=True)
|
||||
return True, "unknown"
|
||||
return None, None
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user