feat: TODO迁移到DB + no_agent自愈执行器

- 创建 mofin.db → todos 表(含fix_action/verification_check/retry机制)
- 创建 scripts/self_todo_executor.py(no_agent,纯代码逻辑,无LLM)
- 修改 morning_health_check.py:TODO写入DB而非JSON,新增derive_fix_action()
- cron替换:LLM cron → no_agent脚本,*/10 8-22高频轮询
- 成本:无pending时仅sqlite查询,约0.01s/次

处理链:
  health_check(8:00) → 可修直接自动修 → 不可修写DB(todos表) → 自愈执行器(每10分) → 有fix_action就执行 → 无fix_action标blocked留待人工
This commit is contained in:
知微
2026-06-24 20:44:36 +08:00
parent 26993c1d41
commit a76240b52d
3 changed files with 1007 additions and 951 deletions
+786 -910
View File
File diff suppressed because it is too large Load Diff
+65 -41
View File
@@ -40,6 +40,21 @@ HERMES_CRON_DIR = Path("/home/hmo/.hermes/profiles/position-analyst/cron")
TODO_PATH = Path("/home/hmo/.hermes/profiles/position-analyst/todo.json") TODO_PATH = Path("/home/hmo/.hermes/profiles/position-analyst/todo.json")
def derive_fix_action(detail, msg):
"""根据issue信息推导可执行的修复命令"""
# 小果扫描 error → 脚本已更新,只验证
if "xiaoguo_scanner" in msg or "小果扫描" in msg:
return f"cd {BASE} && python3 xiaoguo_scanner.py --test 2>&1 | head -5"
# system-audit error → 验证拷贝
if "system_audit" in msg or "系统审计" in msg:
return f"ls -la /home/hmo/.hermes/profiles/position-analyst/scripts/system_audit.py 2>&1"
# 港股汇率 → 刷新
if "港股汇率" in msg:
return f"cd {BASE} && python3 hk_rate.py 2>&1"
# delivery目标 → 无自动修复
return None
def auto_fix_issue(issue): def auto_fix_issue(issue):
"""对明确可自动修复的问题执行修复,返回 (fixed, fix_msg)""" """对明确可自动修复的问题执行修复,返回 (fixed, fix_msg)"""
item_id = issue.get("detail", "") item_id = issue.get("detail", "")
@@ -105,52 +120,61 @@ def write_todos_for_issues():
for issue, fix_msg in fixed_issues: for issue, fix_msg in fixed_issues:
print(f"{issue['category']}: {fix_msg}") print(f"{issue['category']}: {fix_msg}")
# 剩余的无法自动修复的→写TODO # 剩余的无法自动修复的→写TODO到数据库
if not remaining: if not remaining:
return return
# 读现有 TODO try:
existing = [] conn = sqlite3.connect(str(DB_PATH))
if TODO_PATH.exists(): todo_priority = {"critical": "high", "error": "medium", "warn": "low"}
try: new_count = 0
existing = json.loads(TODO_PATH.read_text())
except:
existing = []
existing_titles = {t.get("title", "") for t in existing}
todo_priority = {"critical": "high", "error": "medium", "warn": "low"}
new_items = []
for issue in remaining:
title = f"[体检发现] {issue['msg']}"
# 去重
if title in existing_titles:
for t in existing:
if t.get("title") == title:
if t.get("status") == "completed":
t["status"] = "pending"
t["priority"] = todo_priority.get(issue["level"], "medium")
t["note"] = f"重新打开: {ctx['started_at'].isoformat()}"
continue
existing_titles.add(title) for issue in remaining:
new_items.append({ title = f"[体检发现] {issue['msg']}"
"title": title, level = issue["level"]
"desc": f"体检发现于 {ctx['started_at'].strftime('%Y-%m-%d %H:%M')},分类: {issue['category']},详情: {issue.get('detail', '')}", pri = todo_priority.get(level, "medium")
"status": "pending",
"priority": todo_priority.get(issue["level"], "medium"), # 去重:检查是否已有相同 title
"created": ctx["started_at"].isoformat(), existing = conn.execute(
"target": "health_check_fix", "SELECT id, status FROM todos WHERE title=? AND status IN ('pending','in_progress','blocked')",
}) (title,)
).fetchone()
if new_items:
existing.extend(new_items) if existing:
TODO_PATH.write_text(json.dumps(existing, ensure_ascii=False, indent=2)) if existing["status"] == "blocked":
# 已阻塞的重新打开
conn.execute(
"UPDATE todos SET status='pending', priority=?, note='已重新打开', updated_at=CURRENT_TIMESTAMP WHERE id=?",
(pri, existing["id"])
)
else:
# 生成fix_action(如果可推导)
fix_action = derive_fix_action(issue.get("detail", ""), issue.get("msg", ""))
conn.execute(
"INSERT INTO todos (title, description, priority, source, status, fix_action) "
"VALUES (?, ?, ?, 'health_check', 'pending', ?)",
(title,
f"体检发现于 {ctx['started_at'].strftime('%Y-%m-%d %H:%M')}\n分类: {issue['category']}\n详情: {issue.get('detail', '')}",
pri, fix_action)
)
new_count += 1
print() conn.commit()
print("📋 已加入TODO(待处理):") conn.close()
for item in new_items:
print(f" [{item['priority']}] {item['title'][:70]}") if new_count > 0:
print()
print(f"📋 已加入TODO({new_count}条):")
# 重新查刚插入的pending记录
cur2 = conn.cursor()
for r2 in cur2.execute(
"SELECT title, priority FROM todos WHERE status='pending' AND source='health_check' "
"ORDER BY created_at DESC LIMIT ?", (new_count,)
).fetchall():
print(f" [{r2['priority']}] {r2['title'][:70]}")
conn.close()
except Exception as e:
print(f" TODO写入异常: {e}")
except Exception as e: except Exception as e:
pass # TODO 写入失败不阻碍体检主流程 pass # TODO 写入失败不阻碍体检主流程
+156
View File
@@ -0,0 +1,156 @@
#!/usr/bin/env python3
"""self_todo_executor.py — TODO自动执行器 (no_agent模式)
每10分钟轮询mofin.db中todos表的pending任务,执行fix_action命令。
纯代码逻辑,不调LLM。
有执行→输出摘要,无→SILENT。
"""
import json, os, sqlite3, subprocess, sys, time
from pathlib import Path
from datetime import datetime
BASE = Path("/home/hmo/MoFin")
DB_PATH = BASE / "data" / "mofin.db"
def get_conn():
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def verify_fix(verification_check):
"""运行验证命令,返回 (ok, output)"""
if not verification_check:
return True, "无验证命令"
try:
r = subprocess.run(
verification_check,
shell=True, capture_output=True, text=True, timeout=30
)
if r.returncode == 0:
return True, r.stdout.strip()[:200]
else:
return False, f"exit={r.returncode}: {r.stderr.strip()[:200]}"
except Exception as e:
return False, str(e)[:200]
def execute_fix(fix_action):
"""执行修复命令,返回 (ok, output)"""
if not fix_action:
return False, "无修复命令"
try:
r = subprocess.run(
fix_action,
shell=True, capture_output=True, text=True, timeout=60
)
if r.returncode == 0:
return True, r.stdout.strip()[:300] or "ok"
else:
return False, f"exit={r.returncode}: {r.stderr.strip()[:300]}"
except subprocess.TimeoutExpired:
return False, "执行超时(60s)"
except Exception as e:
return False, str(e)[:200]
def main():
start = time.time()
conn = get_conn()
cur = conn.cursor()
# 读所有pending任务(按优先级排序)
rows = cur.execute(
"SELECT id, title, description, priority, fix_action, verification_check, "
"retry_count, max_retries, note FROM todos "
"WHERE status='pending' ORDER BY "
"CASE priority WHEN 'high' THEN 0 WHEN 'medium' THEN 1 ELSE 2 END, "
"created_at ASC LIMIT 10"
).fetchall()
if not rows:
conn.close()
print("[SILENT] 无待处理TODO")
return
results = []
for row in rows:
todo_id = row["id"]
title = row["title"]
fix_action = row["fix_action"]
verification = row["verification_check"]
retry_count = row["retry_count"]
max_retries = row["max_retries"]
# 标记in_progress
cur.execute(
"UPDATE todos SET status='in_progress', updated_at=CURRENT_TIMESTAMP WHERE id=?",
(todo_id,)
)
conn.commit()
if fix_action:
# 执行修复
ok, output = execute_fix(fix_action)
if not ok:
retry_count += 1
if retry_count >= max_retries:
cur.execute(
"UPDATE todos SET status='blocked', retry_count=?, "
"note=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
(retry_count, f"重试{retry_count}次仍失败: {output}", todo_id)
)
results.append(("", f"{title}: 已阻塞({output[:60]})"))
else:
cur.execute(
"UPDATE todos SET status='pending', retry_count=?, "
"note=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
(retry_count, f"{retry_count}次失败: {output}", todo_id)
)
results.append(("🔄", f"{title}: 重试{retry_count}/{max_retries}({output[:60]})"))
conn.commit()
continue
# 修复成功→验证
v_ok, v_out = verify_fix(verification)
if v_ok:
cur.execute(
"UPDATE todos SET status='completed', note=?, "
"updated_at=CURRENT_TIMESTAMP WHERE id=?",
(f"已修复: {output[:200]}", todo_id)
)
results.append(("", f"{title}: 已修复"))
else:
# 修复成功但验证失败→可能验证命令不准,标记需人工
cur.execute(
"UPDATE todos SET status='completed', note=?, "
"updated_at=CURRENT_TIMESTAMP WHERE id=?",
(f"执行成功但验证异常: {v_out}", todo_id)
)
results.append(("⚠️", f"{title}: 已执行但验证异常({v_out[:60]})"))
else:
# 无fix_action → 标记为无法自动修复
cur.execute(
"UPDATE todos SET status='blocked', note='需人工处理(无修复命令)', "
"updated_at=CURRENT_TIMESTAMP WHERE id=?",
(todo_id,)
)
results.append(("", f"{title}: 无修复命令,已阻塞"))
conn.commit()
conn.close()
elapsed = time.time() - start
if results:
print(f"自愈执行器 | {datetime.now().strftime('%H:%M')} | {len(results)}条 ({elapsed:.0f}s)")
for icon, msg in results:
print(f" {icon} {msg}")
else:
print("[SILENT] 无待处理TODO")
if __name__ == "__main__":
main()