#!/usr/bin/env python3
"""session_to_cron_bridge.py — copy cron reports from the Hermes session DB into cron/output.

Hermes cron jobs (e.g. the 15-minute quick market watch) keep their LLM output
in the session DB (state.db), while cron_to_xmpp.py only pushes the .md files it
finds under ~/.hermes/cron/output/. This script bridges that gap: it reads the
latest cron output from state.db and writes it out as .md files.

How it works:
1. For each job in RELAY_JOBS, query state.db for its most recent sessions
   (session ids of the form ``cron_<job_id>_*``).
2. Take the newest non-blank assistant message of each recent, not-yet-bridged
   session and extract the report body from it.
3. Skip output paths that are already listed in the relay journal.
4. Write new reports to ``cron/output/<job_id>/``.
5. cron_to_xmpp.py then picks them up and pushes them.
"""
import json
import sqlite3
import subprocess
import re
import sys
from datetime import datetime
from pathlib import Path

REAL_HOME = Path("/home/hmo")
PROFILE = "position-analyst"

# Cron job IDs whose reports should be relayed to XMPP. Maps job id -> display
# name; the name is also used as the prefix of the generated .md filename.
RELAY_JOBS = {
    "62a2ba59f7ff": "快速盯盘-15分钟",
    "e27e2e92ed80": "知识萃取-盘后",
    "9d1236d8a07f": "策略评估-每日",
    "5dde4e1a42ce": "分析师-持仓复查",
}

# Output directories (must stay in sync with cron_to_xmpp.py).
# NOTE: ~/.hermes is a symlink to /home/hmo/.hermes/profiles/position-analyst/home/.hermes,
# while cron_to_xmpp.py uses the absolute path REAL_HOME / ".hermes" / "cron" / "output".
# So absolute paths are used here; do not rely on how ~/.hermes resolves.
OUTPUT_DIRS = [
    REAL_HOME / ".hermes" / "cron" / "output",
    REAL_HOME / ".hermes" / "profiles" / PROFILE / "cron" / "output",
]
JOURNAL = REAL_HOME / ".hermes" / "cron" / ".relay_journal.json"
STATE_DB = REAL_HOME / ".hermes" / "profiles" / PROFILE / "state.db"
MAX_AGE_MINUTES = 70  # only handle reports from sessions started in the last 70 minutes
TRACK_FILE = REAL_HOME / ".hermes" / "cron" / ".bridge_track.json"  # session ids already bridged

# The first line containing any of these markers starts the actual report;
# everything before it is treated as the model's preamble ("Let me...",
# "I need...") and dropped. "【" also covers headers such as "【知微" and "【⚡".
_REPORT_MARKERS = ("【", "📊", "## ")
# Text shorter than this is not considered a report.
_MIN_REPORT_CHARS = 20


def _load_id_set(path):
    """Read a JSON list from ``path`` and return it as a set.

    A missing file yields an empty set silently. An unreadable or malformed
    file also yields an empty set (best effort, as before), but a warning is
    printed to stderr instead of swallowing the problem.
    """
    try:
        return set(json.loads(path.read_text(encoding="utf-8")))
    except FileNotFoundError:
        return set()
    except (OSError, ValueError, TypeError) as exc:
        # ValueError covers JSONDecodeError and UnicodeDecodeError; TypeError
        # covers JSON that is valid but not iterable (e.g. a bare number).
        print(f"warning: ignoring unreadable {path}: {exc}", file=sys.stderr)
        return set()


def _atomic_write_text(path, text):
    """Write ``text`` (UTF-8) to ``path`` via a hidden temp file and a rename.

    The rename means a concurrent reader sees either no file or the complete
    file. The temp name does not end in ".md", so a scanner looking for .md
    files should not pick it up.
    """
    tmp = path.with_name(f".{path.name}.tmp")
    tmp.write_text(text, encoding="utf-8")
    tmp.replace(path)


def load_track():
    """Return the set of session ids already bridged (from TRACK_FILE)."""
    return _load_id_set(TRACK_FILE)


def save_track(entries):
    """Persist the bridged session ids to TRACK_FILE as a sorted JSON list."""
    TRACK_FILE.parent.mkdir(parents=True, exist_ok=True)
    _atomic_write_text(TRACK_FILE, json.dumps(sorted(entries)))


def load_journal():
    """Return the set of entries in the relay journal (JOURNAL)."""
    return _load_id_set(JOURNAL)


def save_journal(entries):
    """Persist relay journal entries to JOURNAL as a sorted JSON list."""
    JOURNAL.parent.mkdir(parents=True, exist_ok=True)
    _atomic_write_text(JOURNAL, json.dumps(sorted(entries)))


def ensure_output_dirs():
    """Create every OUTPUT_DIRS entry and its per-job subdirectory."""
    for d in OUTPUT_DIRS:
        d.mkdir(parents=True, exist_ok=True)
        for job_id in RELAY_JOBS:
            (d / job_id).mkdir(exist_ok=True)


def extract_report_content(content):
    """Extract the report body from an assistant message.

    Args:
        content: raw message text (may be None or empty).

    Returns:
        The stripped text starting at the first line that contains a report
        marker (the whole stripped text if no line does), or None when the
        message is blank, shorter than _MIN_REPORT_CHARS, or contains
        "[SILENT]".
    """
    if not content or not content.strip():
        return None
    text = content.strip()

    # Skip messages that are too short to be a report.
    if len(text) < _MIN_REPORT_CHARS:
        return None

    # The job explicitly asked for nothing to be pushed.
    if "[SILENT]" in text:
        return None

    # Drop the model's thinking preamble: keep everything from the first line
    # that looks like the start of the report.
    lines = text.split("\n")
    start = next(
        (i for i, line in enumerate(lines) if any(m in line for m in _REPORT_MARKERS)),
        None,
    )
    if start is not None:
        text = "\n".join(lines[start:])
    if len(text) < _MIN_REPORT_CHARS:
        return None
    return text


def _recent_sessions(cur, job_id):
    """Return the 10 most recently started session rows for ``job_id``.

    Rows are fully fetched, so ``cur`` can be reused by the caller afterwards.
    """
    cur.execute('''
        SELECT id, started_at, message_count, source
        FROM sessions
        WHERE id LIKE ?
        ORDER BY started_at DESC
        LIMIT 10
    ''', (f'cron_{job_id}_%',))
    return cur.fetchall()


def _latest_assistant_message(cur, session_id):
    """Return the newest non-blank assistant message row (content, timestamp), or None."""
    cur.execute('''
        SELECT content, timestamp
        FROM messages
        WHERE session_id = ?
          AND role = 'assistant'
          AND content NOT IN ('', ' ', '\n', '\n\n', '\n\n\n')
        ORDER BY timestamp DESC
        LIMIT 1
    ''', (session_id,))
    return cur.fetchone()


def _write_report(job_id, job_name, session_id, report, filename, processed, new):
    """Write one report into ``<dir>/<job_id>/<filename>`` for every OUTPUT_DIRS entry.

    A target is skipped when its resolved path is already in ``processed``
    (the relay journal) or in ``new`` (written earlier in this run, e.g. when
    two output dirs resolve to the same place). Keys of written files are
    added to ``new``.

    Returns:
        The number of files written.
    """
    # Same layout as the cron output files cron_to_xmpp.py consumes.
    md_content = f"# Cron Job: {job_name} ({session_id})\n\n## Response\n\n{report}\n"
    written = 0
    for out_dir in OUTPUT_DIRS:
        out_path = out_dir / job_id / filename
        # NOTE(review): assumes cron_to_xmpp.py keys its journal by the resolved
        # .md path — confirm.
        key = str(out_path.resolve())
        if key in processed or key in new:
            continue
        _atomic_write_text(out_path, md_content)
        new.add(key)
        written += 1
        print(f" Written: {out_path.relative_to(REAL_HOME)}", file=sys.stderr)
    return written


def scan():
    """Bridge recent cron session reports from STATE_DB into .md files.

    For every job in RELAY_JOBS, look at its most recent sessions and skip
    those already bridged (TRACK_FILE) or started more than MAX_AGE_MINUTES
    ago. From the rest, extract the report from the newest non-blank assistant
    message and write it into every OUTPUT_DIRS entry. Bridged session ids are
    saved back to TRACK_FILE, and progress is printed to stderr.

    The relay journal is only read here, never written: deduplicating pushes
    is left to cron_to_xmpp.py and its journal.
    """
    processed = load_journal()
    tracked = load_track()
    new = set()
    n_written = 0

    if not STATE_DB.exists():
        print(f"state.db not found: {STATE_DB}", file=sys.stderr)
        return

    # Previously never called, so writing into a fresh output tree failed
    # with FileNotFoundError.
    ensure_output_dirs()

    conn = sqlite3.connect(str(STATE_DB))
    try:
        conn.row_factory = sqlite3.Row
        cur = conn.cursor()
        now = datetime.now()

        for job_id, job_name in RELAY_JOBS.items():
            for s in _recent_sessions(cur, job_id):
                session_id = s["id"]
                if session_id in tracked:
                    continue

                # A missing start time is treated as "just started".
                started_at = datetime.fromtimestamp(s["started_at"]) if s["started_at"] else now
                if (now - started_at).total_seconds() / 60 > MAX_AGE_MINUTES:
                    continue

                msg = _latest_assistant_message(cur, session_id)
                # Mark as bridged whether or not it yields a report, so empty or
                # [SILENT] sessions are not re-examined on every run.
                tracked.add(session_id)
                if msg is None:
                    continue

                report = extract_report_content(msg["content"].strip())
                if not report:
                    continue

                # Name the file after the message time, falling back to the session start.
                written_at = (
                    datetime.fromtimestamp(msg["timestamp"]) if msg["timestamp"] else started_at
                )
                filename = f"{job_name}_{written_at.strftime('%Y%m%d_%H%M%S')}.md"
                n_written += _write_report(
                    job_id, job_name, session_id, report, filename, processed, new
                )
    finally:
        conn.close()

    if tracked:
        save_track(tracked)
    print(f"桥接完成:写入{n_written}份新报告", file=sys.stderr)


if __name__ == "__main__":
    scan()