""" Session Router — multi-channel session routing with command loop. For XMPP/VC/WeChat channels, provides TUI-equivalent session experience: - auto → resolves to the most recently active session (same as TUI --continue) - list/switch sessions via NL ("切换到xxx") or commands - LLM-driven command system (##list_sessions##, ##switch_session##, etc.) Flow: route(channel, sender, message) → 1. check selection mode (pending user choice) 2. build prompt with session context from SQLite 3. send to LLM → parse reply for ##commands## 4. if command → execute → append result → re-send to LLM (loop) 5. if no command → return final reply """ import os, json, time, re, sqlite3, threading from datetime import datetime, timezone, timedelta from typing import Optional # ── Constants ── DB_PATH = os.path.expanduser("~/.local/share/opencode/opencode.db") MAX_LOOPS = 10 SELECTION_TIMEOUT = 120 # seconds RECENT_MSG_LIMIT = 200 # context messages from SQLite (小荷 uses 200) SESSION_LIST_LIMIT = 15 # max sessions shown in list # ── Command regex: ##command## or ##command:args## ── CMD_RE = re.compile(r"##(\w+)(?::([^#\n]*))?##") # ── Timezone ── TZ = timezone(timedelta(hours=8)) def _fmt_ts(ts_ms: int) -> str: """Format millisecond timestamp to MM-DD HH:MM string.""" return datetime.fromtimestamp(ts_ms / 1000, tz=TZ).strftime("%m-%d %H:%M") def _src_tag(source: str) -> str: """Map source to display tag.""" return { "tui": "[TUI] ", "xmpp": "[群聊] ", "vc": "[VC] ", "bridge": "[桥接] ", }.get(source, f"[{source}] ") # ═══════════════════════════════════════════════════════════════ # Context extractor # ═══════════════════════════════════════════════════════════════ def extract_session_context(session_id: str, limit: int = RECENT_MSG_LIMIT) -> str: """ Read last N conversational turns from opencode.db for a session. Returns formatted string like: 用户: xxx\n小小莫: xxx\n... Empty string on failure or no data. """ try: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row # 1. Get recent message IDs + timestamps msg_rows = conn.execute(""" SELECT id, data, time_created FROM message WHERE session_id=? ORDER BY time_created DESC LIMIT ? """, (session_id, limit * 3)).fetchall() if not msg_rows: conn.close() return "" msg_ids = [r["id"] for r in msg_rows] # 2. Get text parts for those messages placeholders = ",".join("?" * len(msg_ids)) part_rows = conn.execute( f""" SELECT message_id, data FROM part WHERE session_id=? AND message_id IN ({placeholders}) ORDER BY time_created """, (session_id, *msg_ids), ).fetchall() conn.close() # 3. Build role → parts + timestamp + source mapping msg_map = {} for r in msg_rows: try: d = json.loads(r["data"]) except (json.JSONDecodeError, ValueError): d = {} ts_str = _fmt_ts(r["time_created"]) if r["time_created"] else "" source = d.get("source", "tui") # tui (default) / xmpp / vc / bridge msg_map[r["id"]] = {"role": d.get("role", "?"), "ts": ts_str, "source": source, "parts": []} for r in part_rows: try: d = json.loads(r["data"]) except (json.JSONDecodeError, ValueError): continue if d.get("type") == "text": txt = (d.get("text") or "").strip() if txt: msg_map[r["message_id"]]["parts"].append(txt) # 4. Format as conversation lines (chronological order) with timestamps + source labels lines = [] for r in reversed(msg_rows): info = msg_map[r["id"]] role_label = "用户" if info["role"] == "user" else "小小莫" src_label = _src_tag(info["source"]) ts_tag = f"[{info['ts']}] " if info["ts"] else "" for txt in info["parts"][:3]: lines.append(f"{ts_tag}{src_label}{role_label}: {txt}") return "\n".join(lines[-limit:]) except Exception: return "" # ═══════════════════════════════════════════════════════════════ # Session Router # ═══════════════════════════════════════════════════════════════ class SessionRouter: """ Routes messages from external channels to the correct session, handling session switching, context injection, and command execution. Args: bridge: SessionBridge instance (raw LLM caller) db_path: path to opencode.db binding_file: path to session_routing.json default_session: fallback session ID when nothing is bound """ def __init__( self, bridge, db_path: str = DB_PATH, binding_file: str = "", default_session: str = "", ): self.bridge = bridge self.db_path = db_path self.binding_file = binding_file or os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "temp", "session_routing.json", ) os.makedirs(os.path.dirname(self.binding_file), exist_ok=True) self.default_session = default_session self._lock = threading.Lock() # Persisted bindings: {"channel:sender": "session_id" or "__auto__"} self._bindings: dict[str, str] = {} self._load_bindings() # In-memory selection mode state self._pending: dict[str, dict] = {} # In-memory conversation context for active command loops # {"channel:sender": [{"role": ..., "content": ...}, ...]} self._contexts: dict[str, list[dict]] = {} # Command registry self._commands = { "list_sessions": self._cmd_list_sessions, "switch_session": self._cmd_switch_session, "git_pull": self._cmd_git_pull, "git_push": self._cmd_git_push, "read_file": self._cmd_read_file, "write_file": self._cmd_write_file, "run": self._cmd_run, "help": self._cmd_help, } # Command documentation (injected as part of system prompt) self._cmd_guide = ( "你可以使用以下命令让 bot 执行操作,把命令放在回复中即可:\n" "##list_sessions## 列出所有可用的 session\n" "##switch_session:xxx## 切换到标题包含 xxx 的 session\n" "##git_pull:path## git pull 指定目录,默认 ~/projects/self-growing-knowledge\n" "##git_push:path;msg## git add/commit/push,message 可选\n" "##read_file:path## 读取文件内容(展示前 2000 行)\n" "##write_file:path|content## 写入/覆盖文件(注意:会覆盖!)\n" "##run:command## 执行任意 shell 命令\n" "##help## 查看所有可用命令\n" ) # ── Binding persistence ────────────────────────────── def _load_bindings(self): try: with open(self.binding_file, "r", encoding="utf-8") as f: self._bindings = json.load(f) except (FileNotFoundError, json.JSONDecodeError, ValueError): self._bindings = {} def _save_bindings(self): try: with open(self.binding_file, "w", encoding="utf-8") as f: json.dump(self._bindings, f, ensure_ascii=False, indent=2) except Exception: pass def _get_binding(self, key: str) -> str: """Return session_id for key, or '__auto__' if not bound.""" return self._bindings.get(key, "__auto__") def _set_binding(self, key: str, session_id: str): with self._lock: self._bindings[key] = session_id self._save_bindings() # ── Session resolution ─────────────────────────────── def _resolve_auto(self) -> str: """Query DB for the most recently updated session.""" try: conn = sqlite3.connect(self.db_path) row = conn.execute( "SELECT id FROM session ORDER BY time_updated DESC LIMIT 1" ).fetchone() conn.close() if row: return row[0] except Exception: pass return self.default_session def _resolve_session(self, key: str) -> str: """Resolve the effective session ID for a binding key.""" binding = self._get_binding(key) if binding == "__auto__": return self._resolve_auto() return binding def _get_session_title(self, session_id: str) -> str: """Look up session title from DB.""" try: conn = sqlite3.connect(self.db_path) row = conn.execute( "SELECT title FROM session WHERE id=?", (session_id,) ).fetchone() conn.close() if row: return row[0] except Exception: pass return session_id[:20] # ── Command parsing ───────────────────────────────── @staticmethod def _parse_command(text: str): """ Parse first ##command## or ##command:args## from reply. Returns (cmd_name, args, clean_text). - cmd_name: str or None - args: str or None - clean_text: text before the command, stripped """ m = CMD_RE.search(text) if not m: return None, None, text cmd = m.group(1) args = m.group(2).strip() if m.group(2) else None clean_text = text[: m.start()].strip() return cmd, args, clean_text # ── Command handlers ───────────────────────────────── def _cmd_list_sessions(self, key: str, args: Optional[str]) -> str: """Query and format session list.""" try: conn = sqlite3.connect(self.db_path) rows = conn.execute( """ SELECT id, title, time_updated FROM session ORDER BY time_updated DESC LIMIT ? """, (SESSION_LIST_LIMIT,), ).fetchall() conn.close() current_id = self._resolve_session(key) lines = [] for sid, title, ts in rows: marker = " ← 当前" if sid == current_id else "" lines.append(f" {title} ({_fmt_ts(ts)}){marker}") return "可用 sessions:\n" + "\n".join(lines) except Exception as e: return f"查询 session 失败:{e}" def _cmd_switch_session(self, key: str, args: Optional[str]) -> str: """Fuzzy-match session title and switch.""" if not args: return "请指定 session 名称,例如:##switch_session:接龙##" try: conn = sqlite3.connect(self.db_path) rows = conn.execute( """ SELECT id, title FROM session WHERE title LIKE ? ORDER BY time_updated DESC LIMIT ? """, (f"%{args}%", SESSION_LIST_LIMIT), ).fetchall() conn.close() except Exception as e: return f"查询失败:{e}" if not rows: return f"未找到标题包含「{args}」的 session" if len(rows) == 1: sid, title = rows[0] self._set_binding(key, sid) return f"已切换到「{title}」" # Multiple matches → selection mode (list up to 15) self._enter_selection(key, "switch_session", rows) items = "\n".join( f" {i}. {title}" for i, (_, title) in enumerate(rows, 1) ) return f"找到 {len(rows)} 个匹配(仅显示前{SESSION_LIST_LIMIT}个),请回复编号选择:\n{items}" # ── Git 命令 ────────────────────────────────────────── def _cmd_git_pull(self, key: str, args: Optional[str]) -> str: path = args or "~/projects/self-growing-knowledge" try: import subprocess r = subprocess.run( f"cd {path} && git pull origin master 2>&1", shell=True, capture_output=True, text=True, timeout=60, ) out = r.stdout.strip() + (f"\n{r.stderr.strip()}" if r.stderr.strip() else "") return out or "(no output)" except subprocess.TimeoutExpired: return "(git pull 超时)" except Exception as e: return f"(错误: {e})" def _cmd_git_push(self, key: str, args: Optional[str]) -> str: path = "~/projects/self-growing-knowledge" msg = "auto commit" if args and ";" in args: parts = args.split(";", 1) path = parts[0].strip() msg = parts[1].strip() elif args: path = args try: import subprocess r = subprocess.run( f"cd {path} && git add . && git commit -m '{msg}' && git push origin master 2>&1", shell=True, capture_output=True, text=True, timeout=60, ) out = r.stdout.strip() + (f"\n{r.stderr.strip()}" if r.stderr.strip() else "") return out or "(no output)" except subprocess.TimeoutExpired: return "(git push 超时)" except Exception as e: return f"(错误: {e})" def _cmd_read_file(self, key: str, args: Optional[str]) -> str: if not args: return "请指定文件路径" try: import os path = os.path.expanduser(args) with open(path, "r", encoding="utf-8", errors="replace") as f: content = f.read(50000) # 前 50000 字符 lines = content.split("\n") shown = "\n".join(lines[:2000]) if len(lines) > 2000: shown += f"\n... ({len(lines) - 2000} 行已截断)" return shown or "(空文件)" except FileNotFoundError: return f"(文件不存在: {args})" except Exception as e: return f"(读取失败: {e})" def _cmd_write_file(self, key: str, args: Optional[str]) -> str: if not args or "|" not in args: return "用法: ##write_file:路径|内容##" try: import os path_part, content = args.split("|", 1) path = os.path.expanduser(path_part.strip()) with open(path, "w", encoding="utf-8") as f: f.write(content) return f"(已写入 {os.path.getsize(path)} 字节到 {path})" except Exception as e: return f"(写入失败: {e})" def _cmd_run(self, key: str, args: Optional[str]) -> str: if not args: return "请指定命令" try: import subprocess r = subprocess.run( args, shell=True, capture_output=True, text=True, timeout=60, ) out = r.stdout.strip() + (f"\n{r.stderr.strip()}" if r.stderr.strip() else "") return out or f"(exit {r.returncode})" except subprocess.TimeoutExpired: return "(命令超时)" except Exception as e: return f"(错误: {e})" def _cmd_help(self, key: str, args: Optional[str]) -> str: return self._cmd_guide # ── Selection mode ────────────────────────────────── def _enter_selection(self, key: str, action: str, options: list): self._pending[key] = { "action": action, "options": options, "expires": time.time() + SELECTION_TIMEOUT, } def _handle_selection(self, key: str, message: str) -> Optional[str]: """ Handle a user message while in selection mode. Returns reply text if selection resolved, None if message should be processed normally (selection expired or cancelled). """ pending = self._pending.get(key) if not pending: return None if time.time() > pending["expires"]: del self._pending[key] return "选择已超时(120s),请重新操作。" text = message.strip() # Cancel if text.lower() in ("cancel", "取消", "算了"): del self._pending[key] return "已取消。" options = pending["options"] # Number selection if text.isdigit(): idx = int(text) - 1 if 0 <= idx < len(options): sid, title = options[idx] del self._pending[key] if pending["action"] == "switch_session": self._set_binding(key, sid) self._reset_context(key) return f"已切换到「{title}」" return "操作完成。" return f"请输入 1-{len(options)} 之间的编号。" # Keyword filter (narrow down within current options) matches = [(s, t) for s, t in options if text in t] if len(matches) == 1: sid, title = matches[0] del self._pending[key] if pending["action"] == "switch_session": self._set_binding(key, sid) self._reset_context(key) return f"已切换到「{title}」" return "操作完成。" elif len(matches) > 1: self._pending[key]["options"] = matches items = "\n".join( f" {i}. {t}" for i, (_, t) in enumerate(matches, 1) ) return f"找到多个匹配,请再次选择:\n{items}" # No match items = "\n".join( f" {i}. {t}" for i, (_, t) in enumerate(options, 1) ) return f"未找到匹配。请回复编号或输入 cancel 取消:\n{items}" # ── Context management for multi-turn command loops ── def _reset_context(self, key: str): """Clear accumulated conversation context for a key.""" self._contexts.pop(key, None) # ── Prompt building ───────────────────────────────── def _build_prompt(self, key: str, history: list[dict]) -> str: """ Build prompt with session title + command layer. 注意:不注入 TUI session 上下文(extract_session_context), 因为群聊对话跟 TUI 对话是两套上下文。桥接后的群聊上下文 由 SessionBridge 自己的 context log 管理(更干净)。 """ session_id = self._resolve_session(key) session_title = self._get_session_title(session_id) lines = [ f"[session: {session_title}]", "", ] # 不注入 TUI session 上下文,避免驴头不对马嘴 # ctx = extract_session_context(session_id, limit=20) # if ctx: # lines.append("【最近对话】") # lines.append(ctx) # lines.append("") lines.append( "[可用命令] ##switch_session:xxx## 切换session,##list_sessions## 列表," "##git_pull:路径## 拉代码,##git_push:路径;消息## 推代码," "##read_file:路径## 读文件,##write_file:路径|内容## 写文件," "##run:命令## 执行shell。说「去做」时必须同时用命令执行。" ) lines.append("---") if history: for entry in history: role_label = { "user": "用户", "assistant": "小小莫", "system": "系统", }.get(entry["role"], entry["role"]) lines.append(f"{role_label}:{entry['content']}") return "\n".join(lines) # ── Core LLM command loop ─────────────────────────── def _llm_loop( self, key: str, history: list[dict], loop_count: int = 0 ) -> str: """ Send to LLM → parse command → execute → loop. Returns final reply text (all commands stripped). """ if loop_count >= MAX_LOOPS: return "(命令循环次数超限,请重试)" prompt = self._build_prompt(key, history) reply = self.bridge.send_raw(prompt) if not reply: return "(模型无响应,请稍后重试)" cmd, args, clean_text = self._parse_command(reply) if not cmd: # No command → this is the final answer return clean_text or reply # Execute command handler = self._commands.get(cmd) if not handler: # Unknown command → treat as normal reply return clean_text or reply result = handler(key, args) # Append to history (mutates the list, seen by recursive call) and loop history.append( {"role": "assistant", "content": clean_text or f"(执行{cmd})"} ) history.append( { "role": "system", "content": ( f"##{cmd}## 执行结果:{result}\n" "(请根据结果继续回复用户,如有需要可在回复中继续使用命令)" ), } ) return self._llm_loop(key, history, loop_count + 1) # ── Public entry point ────────────────────────────── def route(self, channel: str, sender: str, message: str) -> str: """ Route a message from an external channel. Args: channel: "xmpp", "vc", or "wechat" sender: user identifier (JID / UID / WXID) message: raw text from the user Returns: reply text to send back (all ##commands## stripped) """ key = f"{channel}:{sender}" # 1. Check selection mode sel_reply = self._handle_selection(key, message) if sel_reply is not None: return sel_reply # 2. Reset accumulated context for fresh conversation self._reset_context(key) # 3. Build initial prompt with user message + channel context prefix = "" if channel == "xmpp" and "/" in sender: # XMPP group chat: sender is "room/nickname" room = sender.split("/")[0] nick = sender.split("/")[1] # Include nick so LLM knows who said it prefix = f"[群聊/{room.split('@')[0]}] {nick}: " tagged = f"{prefix}{message}" history = [{"role": "user", "content": tagged}] # Dynamic context switching: record active TUI session # so bot can inject TUI context when replying to coregroup if channel != "xmpp": try: sid = self._resolve_session(key, allow_create=False) if sid: from chat_bridge import mark_active_tui_session mark_active_tui_session(sid) except Exception: pass # 4. Run LLM command loop return self._llm_loop(key, history)