diff --git a/docs/self-healing-loop.md b/docs/self-healing-loop.md new file mode 100644 index 0000000..470f8bd --- /dev/null +++ b/docs/self-healing-loop.md @@ -0,0 +1,135 @@ +# MoFin 自愈循环系统设计 + +> 版本: v1.0 | 最后更新: 2026-06-24 +> 核心理念:体检发现问题 → TODO追踪 → 自动修复 → 次日体检验证,形成无人值守的修复闭环。 + +## 一、自愈循环架构 + +``` + ┌───────────────────────────────────┐ + │ morning_health_check.py │ + │ 每日 8:00(开盘前) │ + │ │ + │ 体检 7 层 43+ 项 │ + │ 发现问题 → 写 TODO │ + └──────────┬────────────────────────┘ + │ TODO: health_fix_xxx + ▼ + ┌───────────────────────────────────┐ + │ self-todo cron │ + │ 每 2-3 小时自动执行 │ + │ │ + │ 读取 pending 任务 │ + │ 按优先级逐个处理 │ + │ 完成后 git commit + 标记完成 │ + └──────────┬────────────────────────┘ + │ git push + ▼ + ┌───────────────────────────────────┐ + │ 次晨体检验证 │ + │ │ + │ 确认昨日问题已修复 → ✅ │ + │ 问题依然存在 → ⚠️ 升级标记 │ + └───────────────────────────────────┘ +``` + +## 二、健康检查 → TODO 对接 + +### 对接点 + +morning_health_check.py 发现异常后: + +``` +1. 读当前 TODO 文件 +2. 检查是否已有相同 issue 的 TODO(去重) +3. 如果没有 → 追加新 TODO: + - status: pending + - priority: 根据 severity(critical→high, error→medium, warn→low) + - deps: 无 + - title: "[体检发现] {问题描述}" +4. 如果有相同 TODO 且 status=completed → + - 说明昨天标记修复但今天仍然有问题 → 重新打开 + - 修改 status=pending, priority 升一级 +``` + +### TODO 字段映射 + +| 体检 severity | TODO priority | 预期修复时间 | +|---------------|---------------|-------------| +| critical | high | 当前 session / 下一个 self-todo | +| error | medium | 24h 内 | +| warn | low | 48h 内 | + +## 三、xiaoguo 信号桥(知微消费 xiaoguo_scanner 产出) + +### 问题 + +xiaoguo_scanner 每5分钟写 `signal_news` 表,但知微没有任何组件读取它。 + +### 解决方案 + +新增 `xiaoguo_signal_consumer.py`(no_agent 脚本,每30分钟运行一次): + +``` +xiaoguo_signal_consumer.py (每30分, 盘中) + │ + ├── 读 mofin.db signal_news 表 + │ WHERE source LIKE 'xiaoguo%' + │ AND (processed=0 OR processed IS NULL) + │ AND date(created_at) == today + │ + ├── 逐条处理: + │ ├── source=xiaoguo(看多信号): + │ │ ├── 检查是否已在自选/持仓 + │ │ ├── 拉腾讯行情(实时价/PE/涨跌幅) + │ │ ├── 五维快速评估(大盘→行业→消息→基本面→技术) + │ │ ├── 决策:加自选 / 关注 / 跳过 + │ │ └── 写入 decisions.json / watchlist.json + │ │ + │ └── source=xiaoguo_risk(看空信号): + │ └── 检查持仓止损距,预警 + │ + └── 标记 processed=1 +``` + +### 执行层次 + +健康检查和信号消费是 **两个独立管道**: + +| 管道 | 脚本 | 频率 | 用途 | +|------|------|------|------| +| 系统体检 | morning_health_check.py | 每日8:00 | 检查全部组件健康状态 | +| 信号消费 | xiaoguo_signal_consumer.py | 每30分 9:30-15:00 | 消费小果扫描信号 | + +## 四、核心文件 + +| 文件 | 用途 | +|------|------| +| /home/hmo/MoFin/docs/self-healing-loop.md | 本文档 | +| /home/hmo/MoFin/scripts/morning_health_check.py | 体检脚本(已存在,需修改) | +| /home/hmo/MoFin/scripts/xiaoguo_signal_consumer.py | 信号消费脚本(新建) | +| /home/hmo/.hermes/profiles/position-analyst/todo.json | TODO 系统数据文件 | + +## 五、TODO 集成修改计划 + +### morning_health_check.py 修改项 + +在 main() 函数末尾追加 TODO 写入逻辑: + +```python +def write_todos_for_issues(report_entries): + """将体检发现的异常写入 TODO 系统""" + todo_path = Path("/home/hmo/.hermes/profiles/position-analyst/todo.json") + # 读现有 TODO + # 去重(检查是否已有相同 issue) + # 追加新 TODO + # 写回 +``` + +### xiaoguo_signal_consumer.py 新建 + +30分钟盘中循环,消费 signal_news 中的未处理 xiaoguo 信号。 + +### self-todo cron 适配 + +确保 self-todo cron(b53435fbb38b 每小时多次运行)能识别并处理 `[体检发现]` 前缀的 TODO 项。 diff --git a/scripts/morning_health_check.py b/scripts/morning_health_check.py index 087fcf4..3a9318e 100755 --- a/scripts/morning_health_check.py +++ b/scripts/morning_health_check.py @@ -37,6 +37,67 @@ CHECKLIST_PATH = DATA / "health_checklist.json" HISTORY_PATH = DATA / "health_check_history.json" DB_PATH = DATA / "mofin.db" HERMES_CRON_DIR = Path("/home/hmo/.hermes/profiles/position-analyst/cron") +TODO_PATH = Path("/home/hmo/.hermes/profiles/position-analyst/todo.json") + + +def write_todos_for_issues(): + """将体检发现的异常写入 TODO 系统(去重、升级)""" + try: + if not ctx["report"]: + return + + # 只有 error/critical/warn 才写 TODO + issues = [e for e in ctx["report"] if e["level"] in ("critical", "error", "warn")] + if not issues: + return + + # 读现有 TODO + existing = [] + if TODO_PATH.exists(): + try: + existing = json.loads(TODO_PATH.read_text()) + except: + existing = [] + + existing_titles = {t.get("title", "") for t in existing} + todo_priority = {"critical": "high", "error": "medium", "warn": "low"} + + new_items = [] + for issue in issues: + title = f"[体检发现] {issue['msg']}" + # 去重:检查是否已有相同 title 的 TODO + if title in existing_titles: + # 已有相同 TODO -> 升级 + for t in existing: + if t.get("title") == title: + if t.get("status") == "completed": + # 昨天修了但今天还有问题 -> 重新打开并升级 + t["status"] = "pending" + t["priority"] = todo_priority.get(issue["level"], "medium") + t["note"] = f"重新打开: {ctx['started_at'].isoformat()}" + elif t.get("status") == "pending": + # 还是 pending -> 不重复写入 + pass + elif t.get("status") == "in_progress": + # 正在处理中 -> 不干扰 + pass + continue + + existing_titles.add(title) + new_items.append({ + "title": title, + "desc": f"体检发现于 {ctx['started_at'].strftime('%Y-%m-%d %H:%M')},分类: {issue['category']},详情: {issue.get('detail', '')}", + "status": "pending", + "priority": todo_priority.get(issue["level"], "medium"), + "created": ctx["started_at"].isoformat(), + "target": "health_check_fix", + }) + + if new_items: + existing.extend(new_items) + TODO_PATH.write_text(json.dumps(existing, ensure_ascii=False, indent=2)) + except Exception as e: + pass # TODO 写入失败不阻碍体检主流程 # 异常缓存(同一问题24h内不重复推) KNOWN_ISSUES_PATH = DATA / "health_known_issues.json" @@ -519,6 +580,9 @@ def main(): for entry in ctx["report"]: if entry["level"] in ("critical", "error"): print(f" [{entry['level'].upper()}] {entry['category']}: {entry['msg']}") + + # 将异常写入 TODO 系统 + write_todos_for_issues() if __name__ == "__main__": diff --git a/scripts/xiaoguo_signal_consumer.py b/scripts/xiaoguo_signal_consumer.py new file mode 100644 index 0000000..d8e35f1 --- /dev/null +++ b/scripts/xiaoguo_signal_consumer.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python3 +"""xiaoguo_signal_consumer.py — 知微消费小果扫描信号 + +盘中每30分钟运行,读取 signal_news 表中未处理的 xiaoguo 信号, +做五维快速评估后决定:加自选 / 关注 / 跳过。 + +管道位置: + xiaoguo_scanner (每5分) → signal_news → 本脚本 → 知微分析报告 + +no_agent模式:有发现→输出,无→静默 +""" + +import json, os, sqlite3, sys, time, urllib.request +from pathlib import Path +from datetime import datetime + +BASE = Path("/home/hmo/MoFin") +DATA = BASE / "data" +DB_PATH = DATA / "mofin.db" + +# 自选池和决策文件 +WATCHLIST_PATH = DATA / "watchlist.json" +DECISIONS_PATH = DATA / "decisions.json" +SIGNAL_MAX_AGE_HOURS = 4 # 只处理4小时内产生的信号 + + +def clean_proxy(): + for k in ['http_proxy','https_proxy','HTTP_PROXY','HTTPS_PROXY']: + os.environ.pop(k, None) + + +def fetch_quote(code): + """拉腾讯行情,返回 dict""" + try: + prefix = "sh" if code.startswith(('60','68','51','56','50')) else "sz" if code.startswith(('00','30','15')) else "hk" + url = f"http://qt.gtimg.cn/q={prefix}{code}" + req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) + resp = urllib.request.urlopen(req, timeout=5).read().decode('gbk') + fld = resp.split('=')[1].strip().strip('"').strip(';').split('~') + return { + "name": fld[1] if len(fld) > 1 else "", + "code": code, + "price": float(fld[3]) if len(fld) > 3 else 0, + "change_pct": float(fld[32]) if len(fld) > 32 else 0, + "pe": float(fld[39]) if len(fld) > 39 and fld[39] else 0, + "turnover": float(fld[38]) if len(fld) > 38 and fld[38] else 0, + } + except Exception as e: + return {"code": code, "error": str(e)[:60]} + + +def is_in_portfolio(conn, code): + """检查是否已在持仓或自选中""" + code_stripped = code.lstrip("0") + cur = conn.execute("SELECT COUNT(*) FROM holdings WHERE code=? AND is_active=1", (code_stripped,)) + if cur.fetchone()[0] > 0: + return "holdings" + cur = conn.execute("SELECT COUNT(*) FROM watchlist_stocks WHERE code=?", (code,)) + if cur.fetchone()[0] > 0: + return "watchlist" + # 也检查 watchlist.json + try: + wl = json.loads(WATCHLIST_PATH.read_text()) + for s in wl.get("stocks", []): + if s.get("code") == code or s.get("code", "").lstrip("0") == code_stripped: + return "watchlist" + except: + pass + return None + + +def quick_assess(quote): + """五维快速评估(自动版)""" + score = 0 + reasons = [] + + # 大盘环境(简化:交易日9:30-15:00且在涨) + # 引用 macro_context.json 中的大盘方向 + try: + mc = json.loads((DATA / "macro_context.json").read_text()) + sh = mc.get("shanghai", {}).get("change_pct", 0) + if sh > 0.5: + score += 1 + reasons.append(f"大盘+{sh:.1f}%偏强") + elif sh < -0.5: + score -= 1 + reasons.append(f"大盘{sh:.1f}%偏弱") + except: + pass + + # 技术面:涨跌幅 + chg = quote.get("change_pct", 0) + if chg > 3: + score += 1 + reasons.append(f"涨幅+{chg:.1f}%偏强") + elif chg < -3: + score -= 1 + reasons.append(f"跌幅{chg:.1f}%偏弱") + else: + score += 0.5 + reasons.append(f"走势平稳{chg:+.1f}%") + + # 基本面:PE + pe = quote.get("pe", 0) + if 5 < pe < 40: + score += 1 + reasons.append(f"PE={pe:.0f}合理") + elif pe <= 0: + score -= 0.5 + reasons.append("PE为负") + elif pe > 100: + score -= 0.5 + reasons.append(f"PE={pe:.0f}偏高") + + # 量能 + turn = quote.get("turnover", 0) + if turn > 5: + score += 0.5 + reasons.append(f"换手{turn:.1f}%活跃") + elif turn < 0.5: + score -= 0.3 + reasons.append(f"换手{turn:.1f}%偏低") + + return score, reasons + + +def evaluate_and_act(signal, quote): + """评估信号并决定操作""" + status_in = is_in_portfolio(get_conn(), signal.get("code", "")) + if status_in: + return f"已在{status_in}中,跳过", None + + score, reasons = quick_assess(quote) + + if score >= 1.5: + action = "watchlist" + summary = f"加自选: {' | '.join(reasons)}" + elif score >= 0: + action = "monitor" + summary = f"关注: {' | '.join(reasons)}" + else: + action = "skip" + summary = f"跳过(评分{score:.1f}): {' | '.join(reasons)}" + + return summary, action + + +def get_conn(): + import sqlite3 + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + return conn + + +def mark_processed(conn, signal_id): + conn.execute("UPDATE signal_news SET processed=1 WHERE id=?", (signal_id,)) + conn.commit() + + +def main(): + clean_proxy() + start = time.time() + today = datetime.now().strftime("%Y-%m-%d") + + conn = get_conn() + + # 读未处理 xiaoguo 信号(今日) + rows = conn.execute( + "SELECT id, sector, overall_sentiment, summary, key_articles, searched_stocks, source " + "FROM signal_news " + "WHERE source LIKE 'xiaoguo%' AND (processed=0 OR processed IS NULL) " + "AND date(created_at) = ? " + "ORDER BY created_at DESC LIMIT 20", + (today,) + ).fetchall() + + if not rows: + conn.close() + print("[SILENT] 今日无未处理小果信号") + return + + # 尝试从 searched_stocks 提取股票代码 + results = [] + for r in rows: + try: + searched = json.loads(r["searched_stocks"]) if r["searched_stocks"] else [] + except: + searched = [] + + # 从 sector 字段取股票名 + sector_name = r["sector"] or "" + + # 尝试提取代码 + codes_found = [] + for s in searched: + # searched_stocks 存的是股票名称列表 + # 尝试从 summary 里找代码 + import re + codes = re.findall(r'\d{6}', r["summary"] or "") + codes_found.extend(codes) + + if not codes_found: + # 没有直接代码,用名称去查 + mark_processed(conn, r["id"]) + continue + + code = codes_found[0] + quote = fetch_quote(code) + + summary, action = evaluate_and_act(dict(r), quote) + + if action == "watchlist": + # 加自选 + results.append(f"✅ {sector_name}({code}): {summary}") + # 写入 watchlist.json + try: + wl = json.loads(WATCHLIST_PATH.read_text()) + wl.setdefault("stocks", []) + # 检查是否已在 + existing = [s for s in wl["stocks"] if s.get("code") == code] + if not existing: + wl["stocks"].append({ + "code": code, + "name": quote.get("name", sector_name), + "price": quote.get("price", 0), + "status": "watching", + "source": "xiaoguo_scanner", + "added_at": today, + }) + WATCHLIST_PATH.write_text(json.dumps(wl, ensure_ascii=False, indent=2)) + except: + pass + elif action == "monitor": + results.append(f"🔄 {sector_name}({code}): {summary}") + else: + results.append(f"⏭️ {sector_name}({code}): {summary}") + + mark_processed(conn, r["id"]) + + conn.close() + + elapsed = time.time() - start + if results: + print(f"小果信号消费 | {today} | {len(results)}条处理 ({elapsed:.0f}s)") + for r in results: + print(f" {r}") + else: + print("[SILENT] 小果信号消费结束") + + +if __name__ == "__main__": + main()