#!/usr/bin/env python3
"""
XMPP Agent Core — unified edition
=================================
Single bot core for all agents. Supports --agent xxm|mohe|zhiwei|xiaoguo.

Usage:
    python xmpp_agent_core.py --agent xxm      # xxm, uses chat_bridge
    python xmpp_agent_core.py --agent mohe     # mohe, uses Hermes API
    python xmpp_agent_core.py --agent zhiwei   # zhiwei, uses Hermes API
    python xmpp_agent_core.py --agent xiaoguo  # xiaoguo, uses Hermes API

Shares: PID lock, reconnect, MUC join, dedup, batching, coordinator
protocol (GRANT/REVOKE), HTTP bridge.
Differs only in LLM calling method (chat_bridge vs Hermes API).
"""
import asyncio
import http.server
import json
import logging
import os
import re
import ssl
import subprocess
import sys
import threading
import time
import urllib.parse
import urllib.request

# ── Windows selector loop (slixmpp needs it on Windows) ──
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

# ── PATH: allow imports from gateway/scripts/ (proc_guard, chat_bridge) ──
_GATEWAY_SCRIPTS = os.path.join(os.path.dirname(os.path.abspath(__file__)), "gateway", "scripts")
sys.path.insert(0, _GATEWAY_SCRIPTS)

# ═══════════════════════════════════════════════════════════════
# AGENTS Configuration
# ═══════════════════════════════════════════════════════════════
# NOTE(review): credentials are hard-coded below; consider loading them from
# the environment or a secrets file.
AGENTS = {
    "mohe": {
        "jid": "mohe@yoin.fun",
        "password": "hermes123",
        "nick": "mohe",
        "name_cn": "莫荷",
        "http_port": 5804,
        "gateway": "http://localhost:8642/v1/chat/completions",
        "session_id": "xmpp-mohe-v2",
        "server": "127.0.0.1",
        "port": 5222,
        "muc_rooms": ["coregroup@conference.yoin.fun"],
        "mention": "@mohe/@莫荷",
    },
    "zhiwei": {
        "jid": "zhiwei@yoin.fun",
        "password": "hermes123",
        "nick": "zhiwei",
        "name_cn": "知微",
        "http_port": 5805,
        "gateway": "http://localhost:8643/v1/chat/completions",
        "session_id": "xmpp-zhiwei",
        "server": "127.0.0.1",
        "port": 5222,
        "muc_rooms": ["coregroup@conference.yoin.fun"],
        "mention": "@zhiwei/@知微",
    },
    "xiaoguo": {
        "jid": "xiaoguo@yoin.fun",
        "password": "hermes123",
        "nick": "xiaoguo",
        "name_cn": "小果",
        "http_port": 5806,
        "gateway": "http://localhost:8645/v1/chat/completions",
        "session_id": "xmpp-xiaoguo",
        "kanban_session_id": "xmpp-xiaoguo-kanban",
        "server": "127.0.0.1",
        "port": 5222,
        "muc_rooms": ["coregroup@conference.yoin.fun"],
        "mention": "@xiaoguo/@小果",
    },
    "xxm": {
        "jid": "xxm@yoin.fun",
        "password": "hermes123",
        "nick": "xxm",
        "name_cn": "笑笑",
        "http_port": 5802,
        "bridge": "chat_bridge",  # use local chat_bridge instead of Hermes API
        "session_id": "ses_xxm_xmpp",
        "kanban_session_id": "xmpp-xxm-kanban",
        "server": "192.168.1.246",  # LAN direct connect
        "port": 5222,
        "muc_rooms": [
            "coregroup@conference.yoin.fun",
            "jujidina@conference.yoin.fun",
        ],
        "mention": "@xxm/@笑笑",
    },
}

# ── Agent selection ──
_agent_name = "mohe"
if "--agent" in sys.argv:
    idx = sys.argv.index("--agent")
    if idx + 1 < len(sys.argv):
        _agent_name = sys.argv[idx + 1]
if _agent_name not in AGENTS:
    # Keep the historical fallback to mohe, but also normalise the name so the
    # PID lock and log file below match the identity that is actually used.
    print(f"Unknown agent {_agent_name!r}; falling back to 'mohe'", file=sys.stderr, flush=True)
    _agent_name = "mohe"
cfg = AGENTS[_agent_name]

# ═══════════════════════════════════════════════════════════════
# PID Lock — prevent duplicate instances
# ═══════════════════════════════════════════════════════════════
from proc_guard import guard as _proc_guard

_lock = _proc_guard(f"xmpp_bot_{_agent_name}")
if not _lock.ok:
    print(_lock.message, flush=True)
    sys.exit(1)

# ═══════════════════════════════════════════════════════════════
# Logging
# ═══════════════════════════════════════════════════════════════
_LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "gateway", "logs")
os.makedirs(_LOG_DIR, exist_ok=True)
_LOG_FILE = os.path.join(_LOG_DIR, f"xmpp_{_agent_name}.log")
_START_TIME = time.time()


def log(m: str):
    """Append one HH:MM:SS-prefixed line to this agent's log file."""
    with open(_LOG_FILE, "a", encoding="utf-8") as f:
        f.write(f"{time.strftime('%H:%M:%S')} {m}\n")


logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

# ═══════════════════════════════════════════════════════════════
# LLM Bridge Init — abstracted per agent
# ═══════════════════════════════════════════════════════════════
_IS_CHAT_BRIDGE = cfg.get("bridge") == "chat_bridge"
_router = None  # set only for chat_bridge (xxm)

# ── Kanban session support ──
_CALL_SEQ = 0
_KANBAN_SESSION_ID = cfg.get("kanban_session_id", None)

if _IS_CHAT_BRIDGE:
    from chat_bridge import SessionBridge
    from session_router import SessionRouter

    _bridge = SessionBridge(session_id=cfg["session_id"])
    _router = SessionRouter(bridge=_bridge, default_session=cfg["session_id"])
    # Kanban-dedicated bridge + router for separate session
    _kanban_sid = _KANBAN_SESSION_ID or f"{cfg['session_id']}-kanban"
    _kanban_bridge = SessionBridge(session_id=_kanban_sid)
    _kanban_router = SessionRouter(bridge=_kanban_bridge, default_session=_kanban_sid)
    log(f"LLM: chat_bridge (session={cfg['session_id']})")
    log(f"Kanban: chat_bridge (session={_kanban_sid})")
else:
    # Empty ProxyHandler: bypass any system proxy for the local gateway.
    _opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    log(f"LLM: Hermes API ({cfg['gateway']})")


def _call_llm(content: str, sender: str, is_group: bool = False, session_id: str | None = None) -> str:
    """Abstract LLM call. Returns raw response text (or empty string).

    With chat_bridge, a ``session_id`` that differs from the agent's default
    session routes the message through the kanban router, prefixed with the
    kanban-handling instructions. Otherwise the Hermes API is called with the
    given ``session_id``. ``is_group`` is accepted but not used here.
    """
    if not _IS_CHAT_BRIDGE:
        return _call_hermes_api(content, session_id)

    if session_id and session_id != cfg["session_id"]:
        # Prepend kanban-handler instructions so the LLM knows how to handle it
        kanban_content = (
            "【看板处理协议】\n"
            "收到卡片后三步判断:\n"
            " A) 任务明确 → 直接执行 → 评论结果 + 更新状态\n"
            " B) 信息不足 → 评论提问 + 设为 blocked\n"
            " C) 不是我的活 → 评论说明 + 转派(如果能判断)\n"
            "\n"
            "汇报规则:\n"
            " - 任务完成 → 简短 DM 给老莫摘要\n"
            " - 评论/状态变更 → 不汇报\n"
            " - 追问/转派 → 不汇报\n"
            "\n"
            "可用 API:\n"
            " curl http://192.168.1.246:5803/api/kanban/t_xxx 查看卡片详情\n"
            " 更新操作走 Kanban Dashboard UI\n"
            "\n"
            "卡片上下文不够?→ 用 session_search 查历史\n"
            f"---\n{content}"
        )
        return _kanban_router.route("xmpp", sender, kanban_content) or ""
    return _router.route("xmpp", sender, content) or ""


def _call_hermes_api(content: str, session_id: str | None = None) -> str:
    """POST to Hermes API, return response text or empty string.

    Any failure (network, HTTP, JSON, unexpected shape) is logged and turned
    into an empty string so callers can treat it as "no reply".
    """
    target_sid = session_id or cfg["session_id"]
    try:
        payload = json.dumps({
            "model": "hermes-agent",
            "messages": [{"role": "user", "content": content}]
        }).encode()
        req = urllib.request.Request(cfg["gateway"], data=payload, method="POST")
        req.add_header("Content-Type", "application/json")
        req.add_header("Authorization", "Bearer hermes123")
        req.add_header("X-Hermes-Session-Id", target_sid)
        # Context manager closes the HTTP response even if parsing fails.
        with _opener.open(req, timeout=600) as result:
            data = json.loads(result.read())
        choices = data.get("choices") or [{}]
        reply = (choices[0].get("message") or {}).get("content") or ""
        return reply.strip()
    except Exception as e:
        log(f"!!! Hermes API error: {e}")
        return ""


# ═══════════════════════════════════════════════════════════════
# EasyTier control (Windows)
# ═══════════════════════════════════════════════════════════════
_EASYTIER_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "tools", "easytier")
_EASYTIER_CORE = os.path.join(_EASYTIER_DIR, "easytier-core.exe")
_EASYTIER_PID_FILE = os.path.join(_EASYTIER_DIR, "easytier.pid")
_EASYTIER_NET = "--network-name mynet --network-secret ce75d0a5"
_EASYTIER_RELAY = "--peers tcp://47.115.32.206:11010"
_EASYTIER_IP = "--ipv4 10.144.144.3"


def _start_easytier():
    """Start EasyTier on Windows.

    Kills the process named in the PID file (if that file exists), then
    launches easytier-core.exe in the background via ``start /b``.
    """
    if not os.path.exists(_EASYTIER_CORE):
        log(f"EasyTier binary not found: {_EASYTIER_CORE}")
        return
    # Check if already running
    if os.path.exists(_EASYTIER_PID_FILE):
        try:
            with open(_EASYTIER_PID_FILE) as f:
                old_pid = int(f.read().strip())
            subprocess.run(["taskkill", "/f", "/pid", str(old_pid)], capture_output=True, timeout=5)
            log(f"Killed old EasyTier (PID {old_pid})")
        except Exception as e:
            # Best effort: a stale or unreadable PID file must not block startup.
            log(f"EasyTier old-PID cleanup skipped: {e}")
    # Start
    cmd = f'start /b "" "{_EASYTIER_CORE}" {_EASYTIER_NET} {_EASYTIER_RELAY} {_EASYTIER_IP} --disable-encryption --no-listener'
    try:
        subprocess.run(cmd, shell=True, timeout=5)
        log("EasyTier start command issued")
    except Exception as e:
        log(f"EasyTier start error: {e}")


def _stop_easytier():
    """Stop EasyTier on Windows (taskkill by image name) and drop the PID file."""
    try:
        subprocess.run(["taskkill", "/f", "/im", "easytier-core.exe"], capture_output=True, timeout=5)
        log("EasyTier stopped (taskkill)")
    except Exception as e:
        log(f"EasyTier stop error: {e}")
    # Clean up PID file
    try:
        if os.path.exists(_EASYTIER_PID_FILE):
            os.remove(_EASYTIER_PID_FILE)
    except Exception as e:
        log(f"EasyTier PID file cleanup error: {e}")


def _check_easytier() -> bool:
    """Check if EasyTier is running on Windows (via ``tasklist``)."""
    try:
        r = subprocess.run(["tasklist", "/fi", "imagename eq easytier-core.exe"],
                           capture_output=True, text=True, timeout=5)
        return "easytier-core.exe" in r.stdout
    except Exception:
        return False


# ═══════════════════════════════════════════════════════════════
# Message Dedup
# ═══════════════════════════════════════════════════════════════
_DEDUP_CACHE: set[str] = set()
_DEDUP_LOCK = threading.Lock()


def _is_duplicate(msg_id: str) -> bool:
    """Return True if ``msg_id`` was already seen; otherwise remember it.

    Empty ids are never treated as duplicates. The cache is emptied once it
    grows past 100 ids.
    """
    if not msg_id:
        return False
    with _DEDUP_LOCK:
        if msg_id in _DEDUP_CACHE:
            return True
        _DEDUP_CACHE.add(msg_id)
        if len(_DEDUP_CACHE) > 100:
            _DEDUP_CACHE.clear()
        return False


# ═══════════════════════════════════════════════════════════════
# Coordinator Protocol (shared across all agents)
# ═══════════════════════════════════════════════════════════════
_COORDINATOR: str = "mohe"
_GRANTED: str | None = None
_REVOKED_UNTIL: float = 0.0

_SHUTUP_PATTERNS = ["闭嘴", "别说话", "安静", "shut", "stfu", "别说了", "停"]


def _process_coordinator_signals(nickname: str, body: str) -> bool:
    """Parse coordinator/GRANT/REVOKE from incoming messages.

    Returns True if message was a control signal (consumed, no further
    processing).
    """
    global _COORDINATOR, _GRANTED, _REVOKED_UNTIL
    lowered = body.lower()

    # 1. hmo switches coordinator
    if nickname == 'hmo' and 'coordinator=' in lowered:
        for name in ('mohe', 'zhiwei', 'xxm'):
            if f'coordinator={name}' in lowered:
                _COORDINATOR = name
                _GRANTED = None
                log(f"Coordinator switched to {name} by hmo")
                return True

    # 2. GRANT signal (overrides REVOKE)
    gm = re.search(r'\[GRANT:(\w+)\]', body)
    if gm:
        _GRANTED = gm.group(1)
        _REVOKED_UNTIL = 0
        log(f"GRANT: {_GRANTED}")
        return True

    # 3. REVOKE signal (5min auto-restore)
    rm = re.search(r'\[REVOKE:(\w+)\]', body)
    if rm and rm.group(1) == cfg["nick"]:
        _REVOKED_UNTIL = time.time() + 300
        log(f"REVOKEd: {cfg['nick']} silenced for 5min")
        return True

    return False


def _check_shutup(body: str) -> bool:
    """Detect a "shut up" phrase and start a 5-minute silence.

    Returns True when any entry of ``_SHUTUP_PATTERNS`` occurs in ``body``
    (case-insensitive substring match). The sender is not checked here.
    """
    # Without this declaration the assignment below only created a local and
    # the silence window never took effect.
    global _REVOKED_UNTIL
    lower = body.lower().strip()
    for pat in _SHUTUP_PATTERNS:
        if pat.lower() in lower:
            _REVOKED_UNTIL = time.time() + 300
            log(f"(shutup: '{pat}' → 5min silence)")
            return True
    return False


def _process_llm_grant(response: str):
    """Parse GRANT signal from LLM's own response and lift any REVOKE."""
    # _REVOKED_UNTIL must be declared global too, otherwise the reset is lost.
    global _GRANTED, _REVOKED_UNTIL
    gm = re.search(r'\[GRANT:(\w+)\]', response)
    if gm:
        _GRANTED = gm.group(1)
        _REVOKED_UNTIL = 0
        log(f"LLM GRANT: {_GRANTED}")


# ═══════════════════════════════════════════════════════════════
# Response Extraction
# ═══════════════════════════════════════════════════════════════
_SILENCE_PATTERNS = [
    "保持沉默", "不应[该]?回复", "没有.*@.*我", "不是对[我我说]",
    "跟我无关", "我不用回复", "不该回复", "不参与", "不是我[应]?该[说回]",
]
_EXEC_RE = re.compile(r"##exec:(.+?)##", re.DOTALL)
_DELAY_RE = re.compile(r"##delay:?(\d+)?##")
_DELAY_DEFAULT = 15
_EXEC_TIMEOUT = 60

# Leading parenthesised annotation. The full-width alternative is presumably
# what the first pattern originally matched — TODO confirm against the
# authoritative copy. Every alternative consumes at least two characters.
_LEADING_PAREN_RE = re.compile(r'^(?:（[^）]*）|\([^)]*\))\s*')


def _strip_toolcall_xml(text: str) -> str:
    """Apply the tool-call stripping substitutions to ``text`` and trim it."""
    # NOTE(review): these four patterns look like they lost their
    # angle-bracket tag names (e.g. "<tag[^>]*>.*?(</tag>|$)"). As written,
    # the 1st/3rd remove every ">" (with any preceding "]") and the 2nd/4th
    # match only the empty string. The literals are left untouched here;
    # restore the intended tag names from the authoritative source.
    t = text
    t = re.sub(r']*>.*?(|$)', '', t, flags=re.DOTALL)
    t = re.sub(r'.*?(|$)', '', t, flags=re.DOTALL)
    t = re.sub(r']*>.*?(|$)', '', t, flags=re.DOTALL)
    t = re.sub(r'.*?(|$)', '', t, flags=re.DOTALL)
    return t.strip()


def _extract_response(text: str) -> str | None:
    """Strip __SILENT__, reasoning blocks, natural language silence.

    Returns actual content to send, or None to stay silent.
    """
    if not text:
        return None
    t = text.strip()
    if not t:
        return None
    t = _strip_toolcall_xml(t)

    # Natural language silence detection (first line only)
    if not t.startswith("__SILENT__"):
        first = t.split("\n", 1)[0]
        if any(re.search(pat, first) for pat in _SILENCE_PATTERNS):
            return None
        return t or None

    # Has __SILENT__ prefix: anything after the first line may still be sent
    parts = t.split("\n", 1)
    if len(parts) < 2:
        return None
    rest = parts[1].strip()
    # Drop leading parenthesised notes one by one. The previous first pattern
    # could match the empty string, which made this loop spin forever; each
    # match here consumes input, so the loop always terminates.
    while True:
        m = _LEADING_PAREN_RE.match(rest)
        if not m:
            break
        rest = rest[m.end():]
    return rest.strip() or None


# ═══════════════════════════════════════════════════════════════
# Message Batching (3s debounce + serialized processing)
# ═══════════════════════════════════════════════════════════════
_BATCH_WINDOW = 3.0
_BATCH_TIMEOUT = 300
_batch_entries: dict[str, list[str]] = {}
_batch_timers: dict[str, threading.Timer] = {}
_batch_processing: set[str] = set()
_batch_pending: dict[str, list[str]] = {}
_batch_lock = threading.Lock()
_BOT_NICK = cfg["nick"]


def _run_command(cmd: str) -> str:
    """Execute ##exec:command## shell command and return its combined output.

    NOTE(review): ``cmd`` comes from LLM output and is run through the shell
    unfiltered; treat this as a privileged capability.
    """
    log(f"(exec: {cmd[:120]})")
    try:
        r = subprocess.run(cmd, shell=True, capture_output=True, timeout=_EXEC_TIMEOUT,
                           text=True, encoding='utf-8', errors='replace')
        out = (r.stdout or "") + (r.stderr or "")
        out = out.strip() or f"(no output, exit={r.returncode})"
        log(f"(exec done: {len(out)} bytes, exit={r.returncode})")
        return out
    except subprocess.TimeoutExpired:
        log(f"(exec timeout >{_EXEC_TIMEOUT}s)")
        return "(命令超时)"
    except Exception as e:
        log(f"(exec error: {e})")
        return f"(命令执行失败: {e})"


def _schedule_delayed(delay_sec: int, room: str):
    """Schedule ##delay:N## re-invocation.

    After ``delay_sec`` seconds a daemon timer asks the LLM for a follow-up
    and, if it yields content, posts it to ``room`` as a groupchat message.
    """

    def _fire():
        bot = _xmpp_ref
        if not bot:
            return
        try:
            prompt = "时间到,请根据最新的信息汇报结果。"
            raw = _call_llm(prompt, room, is_group=True)
            reply = _extract_response(raw)
            if reply:
                text = reply.strip()
                bot.send_message(mto=room, mbody=text, mtype='groupchat')
                log(f"-> [Delay][{room}]: {text[:80]}")
        except Exception as e:
            log(f"!! delay err: {e}")

    t = threading.Timer(delay_sec, _fire)
    t.daemon = True
    t.start()
    log(f"(delay +{delay_sec}s → {room})")


def _batch_done(room: str):
    """Called when batch LLM finishes. Flush pending if any."""
    with _batch_lock:
        _batch_processing.discard(room)
        pending = _batch_pending.pop(room, None)
        if pending:
            # Extend rather than overwrite so entries already queued for the
            # room are not dropped, and replace any timer still armed for it.
            _batch_entries.setdefault(room, []).extend(pending)
            stale = _batch_timers.pop(room, None)
            if stale:
                stale.cancel()
            t = threading.Timer(0.1, _fire_batch, args=[room])
            t.daemon = True
            t.start()
            _batch_timers[room] = t
            return
    log(f"[Batch][{room}] (idle)")


def _fire_batch(room: str):
    """Collect batched entries and call LLM."""
    with _batch_lock:
        entries = _batch_entries.pop(room, None)
        _batch_timers.pop(room, None)
        if not entries:
            return
        _batch_processing.add(room)
    combined = "\n".join(entries)

    def _handle():
        # One-element list so the nested timeout callback can flip the flag.
        timed_out = [False]

        def _timeout():
            timed_out[0] = True
            log(f"[Batch][{room}] TIMEOUT ({_BATCH_TIMEOUT}s)")
            _batch_done(room)

        timer = threading.Timer(_BATCH_TIMEOUT, _timeout)
        timer.daemon = True
        timer.start()
        try:
            raw = _call_llm(combined, room, is_group=True)
            if not timed_out[0]:
                timer.cancel()
                _process_llm_reply(raw, room)
            else:
                log(f"[Batch][{room}] route returned after timeout, discarded")
        except Exception as e:
            log(f"!!! BATCH: {e}")
            if not timed_out[0]:
                timer.cancel()
                _batch_done(room)

    threading.Thread(target=_handle, daemon=True).start()


def _batch_group_message(room: str, nickname: str, body: str) -> bool:
    """Add message to room batch.

    Returns True if batched, False if @mention (immediate).
    """
    if f"@{_BOT_NICK}" in body or body.startswith(_BOT_NICK):
        return False
    formatted = f"[{nickname}]: {body}"
    with _batch_lock:
        if room in _batch_processing:
            # A batch is in flight for this room; park the message until done.
            _batch_pending.setdefault(room, []).append(formatted)
            return True
        # Debounce: restart the window on every new message.
        timer = _batch_timers.pop(room, None)
        if timer:
            timer.cancel()
        _batch_entries.setdefault(room, []).append(formatted)
        t = threading.Timer(_BATCH_WINDOW, _fire_batch, args=[room])
        t.daemon = True
        t.start()
        _batch_timers[room] = t
    return True


# ═══════════════════════════════════════════════════════════════
# MAM Recovery — fetch recent history on startup
# ═══════════════════════════════════════════════════════════════
_MAM_RECOVERY = True
_MAM_RECOVERY_LOCK = threading.Lock()
_MAM_MARK_DONE = False
_MAM_TIMEOUT = 30


def _set_mam_done():
    """Mark MAM recovery as finished so group messages are processed again."""
    global _MAM_RECOVERY
    with _MAM_RECOVERY_LOCK:
        _MAM_RECOVERY = False


def _is_mam_recovery() -> bool:
    """Return True while MAM recovery is active.

    Recovery is force-disabled once ``_MAM_TIMEOUT`` seconds have passed since
    process start.
    """
    global _MAM_RECOVERY
    with _MAM_RECOVERY_LOCK:
        if _MAM_RECOVERY and time.time() - _START_TIME > _MAM_TIMEOUT:
            _MAM_RECOVERY = False
            log("(MAM recovery timed out, force-disabled)")
        return _MAM_RECOVERY


# ═══════════════════════════════════════════════════════════════
# XML Escape
# ═══════════════════════════════════════════════════════════════
def _escape(text: str) -> str:
    """Escape ``&``, ``<``, ``>`` and ``"`` as XML entities."""
    # "&" first, so the entities introduced below are not escaped twice.
    return (text.replace("&", "&amp;").replace("<", "&lt;")
            .replace(">", "&gt;").replace('"', "&quot;"))


# ═══════════════════════════════════════════════════════════════
# HTTP Bridge — health, presence, messages, POST send
# ═══════════════════════════════════════════════════════════════
_HTTP_PORT = cfg["http_port"]
_MSG_BUF: list[dict] = []
_MSG_BUF_LOCK = threading.Lock()
_xmpp_ref = None  # set after bot creation


def _record_group_msg(nickname: str, body: str):
    """Append a message to the in-memory buffer (trimmed to 150 past 200)."""
    ts = time.strftime("%H:%M:%S")
    with _MSG_BUF_LOCK:
        _MSG_BUF.append({"ts": ts, "from": nickname, "body": body})
        if len(_MSG_BUF) > 200:
            _MSG_BUF[:] = _MSG_BUF[-150:]


class _BridgeHandler(http.server.BaseHTTPRequestHandler):
    """JSON-over-HTTP control surface for the bot.

    GET  /muc, /health, /presence/<jid>, /messages[?from=nick]
    POST /easytier {"action": start|stop|status}
    POST <any other path> {"to", "message"|"body", "type"} → send XMPP message
    """

    def do_GET(self):
        parsed = urllib.parse.urlparse(self.path)

        if parsed.path == "/muc":
            try:
                muc_info = {"rooms": {}}
                bot = _xmpp_ref
                if bot is not None and 'xep_0045' in bot.plugin:
                    muc_plugin = bot.plugin['xep_0045']
                    for room_jid in cfg["muc_rooms"]:
                        room_data = {"jid": room_jid, "participants": []}
                        try:
                            if room_jid in muc_plugin.rooms:
                                room = muc_plugin.rooms[room_jid]
                                for nick, info in room.get('roster', {}).items():
                                    room_data["participants"].append({
                                        "nick": nick,
                                        "jid": str(info.get('jid', '')),
                                        "affiliation": str(info.get('affiliation', '')),
                                        "role": str(info.get('role', '')),
                                    })
                        except Exception as e:
                            room_data["error"] = str(e)
                        muc_info["rooms"][room_jid] = room_data
                self._reply(200, muc_info)
            except Exception as e:
                self._reply(500, {"ok": False, "error": str(e)})
            return

        if parsed.path == "/health":
            try:
                bot = _xmpp_ref
                session_ok = bot.session_started_event.is_set() if (bot and hasattr(bot, 'session_started_event')) else False
                socket_ok = bot.is_connected() if (bot and hasattr(bot, 'is_connected')) else False
                self._reply(200, {
                    "ok": True,
                    "xmpp_connected": session_ok or socket_ok,
                    "agent": _agent_name,
                    "jid": cfg["jid"],
                    "uptime_sec": int(time.time() - _START_TIME),
                    "muc_rooms": cfg["muc_rooms"],
                })
            except Exception as e:
                self._reply(500, {"ok": False, "error": str(e)})
            return

        # Exact "/presence" or a "/presence/<jid>" sub-path only; a bare prefix
        # test would also accept unrelated paths such as "/presenceXYZ".
        if parsed.path == "/presence" or parsed.path.startswith("/presence/"):
            jid_to_check = parsed.path[len("/presence/"):].strip()
            if not jid_to_check:
                self._reply(400, {"ok": False, "error": "missing JID"})
                return
            try:
                info = {"jid": jid_to_check, "online": False, "resources": []}
                bot = _xmpp_ref
                if bot and hasattr(bot, 'client_roster'):
                    roster = bot.client_roster
                    if jid_to_check in roster:
                        resources = list(roster[jid_to_check].resources.keys())
                        info["online"] = len(resources) > 0
                        info["resources"] = resources
                self._reply(200, info)
            except Exception as e:
                self._reply(500, {"ok": False, "error": str(e)})
            return

        if parsed.path == "/messages":
            try:
                qs = urllib.parse.parse_qs(parsed.query)
                sender = qs.get("from", [None])[0]
                with _MSG_BUF_LOCK:
                    msgs = list(_MSG_BUF)
                if sender:
                    msgs = [m for m in msgs if m["from"] == sender]
                self._reply(200, {"ok": True, "count": len(msgs), "messages": msgs[-50:]})
            except Exception as e:
                self._reply(500, {"ok": False, "error": str(e)})
            return

        self._reply(404, {"ok": False, "error": "not found"})

    def do_POST(self):
        try:
            length = int(self.headers.get('Content-Length', 0))
            try:
                body = json.loads(self.rfile.read(length))
            except ValueError as e:
                # Malformed JSON is a client error, not a server failure.
                self._reply(400, {"ok": False, "error": f"invalid JSON: {e}"})
                return
            if not isinstance(body, dict):
                self._reply(400, {"ok": False, "error": "JSON object expected"})
                return
            path = urllib.parse.urlparse(self.path).path.rstrip('/')

            # /easytier endpoint — execute EasyTier action locally (no XMPP DM)
            if path == "/easytier":
                action = body.get("action", "")
                if action == "start":
                    _start_easytier()
                    self._reply(200, {"ok": True, "message": "EasyTier started"})
                elif action == "stop":
                    _stop_easytier()
                    self._reply(200, {"ok": True, "message": "EasyTier stopped"})
                elif action == "status":
                    running = _check_easytier()
                    self._reply(200, {"ok": True, "running": running})
                else:
                    self._reply(400, {"ok": False, "error": "action must be start|stop|status"})
                return

            to = body.get('to', cfg["muc_rooms"][0])
            msg = body.get('message', '') or body.get('body', '')
            msg_type = body.get('type', 'groupchat')
            if not msg:
                self._reply(400, {"ok": False, "error": "empty message"})
                return
            bot = _xmpp_ref
            if bot:
                bot.send_message(mto=to, mbody=msg.strip(), mtype=msg_type)
                _record_group_msg(cfg["nick"], msg)
                log(f"[http] → [{to.split('@')[0]}]: {msg[:80]} (type={msg_type})")
            self._reply(200, {"ok": True})
        except Exception as e:
            self._reply(500, {"ok": False, "error": str(e)})

    def _reply(self, code, data):
        """Send ``data`` as a UTF-8 JSON response with status ``code``."""
        body = json.dumps(data, ensure_ascii=False).encode('utf-8')
        self.send_response(code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        # Silence the default per-request stderr logging.
        pass


def _start_http_bridge():
    """Serve ``_BridgeHandler`` on all interfaces in a daemon thread."""
    _httpd = http.server.HTTPServer(('0.0.0.0', _HTTP_PORT), _BridgeHandler)
    _t = threading.Thread(target=_httpd.serve_forever, daemon=True)
    _t.start()
    log(f"HTTP bridge ready on :{_HTTP_PORT}")


# ═══════════════════════════════════════════════════════════════
# Reply Processing (shared for all LLM responses)
# ═══════════════════════════════════════════════════════════════
def _process_llm_reply(raw_reply: str, room: str):
    """Process LLM response: check silence/delay/exec/send.

    Always ends by calling ``_batch_done(room)`` so pending batches flush.
    """
    if not raw_reply:
        _batch_done(room)
        return

    # Parse GRANT signal from LLM response
    _process_llm_grant(raw_reply)

    # ##delay:N## → schedule later
    delay_m = _DELAY_RE.search(raw_reply)
    if delay_m:
        sec = int(delay_m.group(1)) if delay_m.group(1) else _DELAY_DEFAULT
        _schedule_delayed(sec, room)
        _batch_done(room)
        return

    # ##exec:command## → run command, use output as reply
    exec_m = _EXEC_RE.search(raw_reply)
    if exec_m:
        output = _run_command(exec_m.group(1))
        # Callable replacement: command output is inserted literally. Passing
        # it as a template would interpret backslashes (e.g. Windows paths).
        raw_reply = _EXEC_RE.sub(lambda _m: output, raw_reply, count=1)

    # Extract actual response
    reply_text = _extract_response(raw_reply)
    if reply_text:
        text = reply_text.strip()
        bot = _xmpp_ref
        if bot:
            bot.send_message(mto=room, mbody=text, mtype='groupchat')
            log(f"-> [{room.split('@')[0]}]: {text[:80]}")
    else:
        log(f"-> [{room.split('@')[0]}]: (silent)")
    _batch_done(room)


# ═══════════════════════════════════════════════════════════════
# Group message handler
# ═══════════════════════════════════════════════════════════════
def _handle_group_message(msg):
    """Process a groupchat message (runs in thread)."""
    global _COORDINATOR, _GRANTED, _REVOKED_UNTIL

    if _is_mam_recovery():
        return
    msg_id = msg.get("id", "")
    if _is_duplicate(msg_id):
        return
    body = str(msg["body"]).strip()
    if not body:
        return
    full_from = str(msg["from"])
    room = full_from.split("/")[0]
    nickname = full_from.split("/")[1] if "/" in full_from else ""

    # Self-message skip
    if nickname == cfg["nick"]:
        log(f"(self) {body[:80]}")
        return

    _record_group_msg(nickname, body)

    # Coordinator signals
    if _process_coordinator_signals(nickname, body):
        return

    # Revoke check
    is_revoked = time.time() < _REVOKED_UNTIL
    if is_revoked and _GRANTED == cfg["nick"]:
        _GRANTED = None
        is_revoked = False
        log(f"GRANT overrides REVOKE for {cfg['nick']}")

    if _check_shutup(body):
        return

    if is_revoked:
        # Still feed the message to the LLM for context, but as read-only.
        body = f"【只读消息】你被收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {room}] {nickname} 说: {body}"

    # Batch or immediate (@mention)
    if _batch_group_message(room, nickname, body):
        log(f"[{room.split('@')[0]}] {nickname}: {body[:80]} (batched)")
        return

    log(f"[{room.split('@')[0]}] {nickname}: {body[:80]}")
    raw = _call_llm(body, full_from, is_group=True)
    _process_llm_reply(raw, room)


# ═══════════════════════════════════════════════════════════════
# Private message handler
# ═══════════════════════════════════════════════════════════════
def _handle_private_message(msg):
    """Process a private chat message."""
    global _CALL_SEQ

    # The generic "message" event also delivers groupchat, error and headline
    # stanzas; only real one-to-one messages are answered.
    if msg["type"] not in ("chat", "normal"):
        return
    msg_id = msg.get("id", "")
    if _is_duplicate(msg_id):
        return
    body = str(msg["body"]).strip()
    if not body:
        # Body-less stanzas (e.g. chat-state notifications) carry nothing to answer.
        return
    sender = str(msg["from"]).split("/")[0]
    log(f"<{sender}> {body[:80]}")

    if sender == cfg["jid"]:
        log("(skipped self)")
        return
    if time.time() < _REVOKED_UNTIL:
        log(f"(silenced) <{sender}> dropped")
        return
    if _check_shutup(body):
        return

    # ── Kanban routing ──
    _CALL_SEQ += 1
    is_kanban = body.startswith('[Kanban]')
    target_sid = _KANBAN_SESSION_ID if is_kanban else cfg["session_id"]
    if is_kanban:
        log(f"📋 看板通知(#{_CALL_SEQ}): {body[:80]}")

    # ── EasyTier toggle ──
    if body.startswith('[EasyTier]'):
        action = body.replace('[EasyTier]', '').strip().lower()
        log(f"🔌 EasyTier command: {action}")
        if action == 'start':
            _start_easytier()
            reply_text = "[EasyTier] started on Windows"
        elif action == 'stop':
            _stop_easytier()
            reply_text = "[EasyTier] stopped on Windows"
        else:
            reply_text = f"[EasyTier] unknown action: {action}"
        bot = _xmpp_ref
        if bot:
            bot.send_message(mto=sender, mbody=reply_text, mtype='chat')
            log(f"-> {sender}: {reply_text}")
        return

    raw = _call_llm(body, sender, is_group=False, session_id=target_sid)
    if raw:
        reply = _extract_response(raw)
        if reply:
            text = reply.strip()
            bot = _xmpp_ref
            if bot:
                bot.send_message(mto=sender, mbody=text, mtype='chat')
                log(f"-> {sender}: {text[:80]}")


# ═══════════════════════════════════════════════════════════════
# AgentBot Class
# ═══════════════════════════════════════════════════════════════
import slixmpp


class AgentBot(slixmpp.ClientXMPP):
    """slixmpp client for the selected agent: joins MUC rooms, dispatches messages."""

    def __init__(self):
        super().__init__(cfg["jid"], cfg["password"])

        # Connection settings
        self.enable_direct_tls = False
        self.enable_starttls = True
        self.auto_reconnect = True
        self.reconnect_max_delay = 10
        self.whitespace_keepalive = True
        self.whitespace_keepalive_interval = 30

        # SSL: accept self-signed certs
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.ssl_context = ctx

        # Event handlers
        self.add_event_handler("session_start", self._on_session_start)
        self.add_event_handler("message", self._on_any_message)
        self.add_event_handler("groupchat_message", self._on_group_msg)
        self.add_event_handler("disconnected", self._on_disconnected)
        self.add_event_handler("connected", self._on_connected)
        self.add_event_handler("session_end", self._on_session_end)
        self.add_event_handler("connection_failed", self._on_conn_failed)

        # MUC plugin
        self.register_plugin('xep_0045')

    def _on_connected(self, event):
        log("connection established")

    def _on_session_start(self, event):
        """Announce presence, then join every configured room and load MAM history."""
        self.send_presence()
        self.get_roster()
        log(f"{cfg['jid']} online")

        # Register MAM plugin lazily
        try:
            self.register_plugin('xep_0313')
        except Exception:
            log("(MAM: xep_0313 not available)")

        # Join MUC rooms
        async def _join_all():
            for room_jid in cfg["muc_rooms"]:
                try:
                    self.plugin['xep_0045'].join_muc(room_jid, cfg["nick"])
                    # NOTE(review): this raw stanza is empty as written; it
                    # presumably held a <presence> element that was lost.
                    # Left untouched — confirm against the authoritative copy.
                    presence = (
                        f""
                        f""
                        f""
                        f""
                    )
                    self.send_raw(presence)
                    log(f"Joined {room_jid}")
                except Exception as e:
                    log(f"MUC join failed {room_jid}: {e}")
                await asyncio.sleep(2)
            await asyncio.sleep(3)
            await self._fetch_mam_history()

        asyncio.ensure_future(_join_all())

    async def _fetch_mam_history(self):
        """Query MAM for recent MUC messages to rebuild context.

        Always ends MAM recovery mode, whether the query succeeds or fails.
        """
        if 'xep_0313' not in self.plugin:
            log("(MAM: no plugin)")
            _set_mam_done()
            return

        # MAM recovery used in _on_session_start
        try:
            for room_jid in cfg["muc_rooms"]:
                log(f"(MAM: querying {room_jid} for last 50 messages...)")
                results = await self.plugin['xep_0313'].retrieve(
                    jid=room_jid,
                    rsm={'max': 50},
                )
                count = 0
                for msg in results['mam']['results']:
                    forwarded = msg['mam_result']['forwarded']
                    body = str(forwarded['stanza']['body'] or '').strip()
                    if not body:
                        continue
                    origin = str(forwarded['stanza']['from'])
                    nick = origin.split('/')[-1] if '/' in origin else '?'
                    # Feed into context for chat_bridge (xxm)
                    if _IS_CHAT_BRIDGE:
                        role = 'user' if nick != cfg["nick"] else 'assistant'
                        try:
                            _bridge._append_to_log(role, f"[{nick}]: {body[:300]}")
                        except Exception:
                            # Best effort: one bad entry must not abort recovery.
                            pass
                    count += 1
                log(f"(MAM: loaded {count} msgs from {room_jid})")
            _set_mam_done()
            log("(MAM recovery complete)")
        except Exception as e:
            log(f"(MAM error: {e})")
            _set_mam_done()

    def _on_group_msg(self, msg):
        # Handle in a worker thread so the blocking LLM call stays off the event loop.
        threading.Thread(target=_handle_group_message, args=[msg], daemon=True).start()

    def _on_any_message(self, msg):
        threading.Thread(target=_handle_private_message, args=[msg], daemon=True).start()

    def _on_session_end(self, event):
        log(f"session ended")

    def _on_conn_failed(self, event):
        log(f"connection failed: {event}")

    def _on_disconnected(self, event):
        log(f"disconnected, reconnecting...")


# ═══════════════════════════════════════════════════════════════
# Main
# ═══════════════════════════════════════════════════════════════
def main():
    """Create the bot, start the HTTP bridge, connect and run the event loop."""
    log(f"Starting {cfg['jid']} ({cfg['name_cn']}) — agent={_agent_name}")
    if _IS_CHAT_BRIDGE:
        log(f"  LLM: chat_bridge (session={cfg['session_id']})")
    else:
        log(f"  LLM: Hermes API ({cfg['gateway']})")
    log(f"  Server: {cfg['server']}:{cfg['port']}")
    log(f"  Rooms: {cfg['muc_rooms']}")

    bot = AgentBot()
    global _xmpp_ref
    _xmpp_ref = bot

    _start_http_bridge()

    bot.connect(host=cfg["server"], port=cfg["port"])
    log(f"Connecting {cfg['jid']}@{cfg['server']}:{cfg['port']}")

    loop = asyncio.get_event_loop()

    async def _status_check():
        # Heartbeat line once a minute so the log shows the loop is alive.
        while True:
            await asyncio.sleep(60)
            log("(alive)")

    asyncio.ensure_future(_status_check())

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        log("Shutdown by user")
    except Exception as e:
        log(f"!!! MAIN LOOP CRASH: {e}")
        import traceback
        log(f"!!! {traceback.format_exc()[:500]}")
        raise


if __name__ == "__main__":
    main()