From d406068c8910477620e29faed5bf5b2204bc9ee1 Mon Sep 17 00:00:00 2001 From: hmo Date: Thu, 25 Jun 2026 01:20:43 +0800 Subject: [PATCH] fix(easytier): add --no-listener flag to prevent port conflicts Without --no-listener, EasyTier opens a listener on the TUN/TAP interface which can conflict with other EasyTier instances on the same network. Ultraworked with Sisyphus Co-authored-by: Sisyphus --- xmpp_agent_core.py | 1291 ++++++++++++++++++++++++++++++++------------ 1 file changed, 959 insertions(+), 332 deletions(-) diff --git a/xmpp_agent_core.py b/xmpp_agent_core.py index 1b5d410..d94c54b 100644 --- a/xmpp_agent_core.py +++ b/xmpp_agent_core.py @@ -1,9 +1,35 @@ #!/usr/bin/env python3 -"""XMPP Bot - 统一版,支持 --agent mohe|zhiwei|xiao 参数""" -import asyncio, logging, ssl, json, urllib.request, os, time, sys, re -from slixmpp import ClientXMPP +""" +XMPP Agent Core — 统一版 +========================= +Single bot core for all agents. Supports --agent xxm|mohe|zhiwei|xiaoguo. + +Usage: + python xmpp_agent_core.py --agent xxm # xxm, uses chat_bridge + python xmpp_agent_core.py --agent mohe # mohe, uses Hermes API + python xmpp_agent_core.py --agent zhiwei # zhiwei, uses Hermes API + python xmpp_agent_core.py --agent xiaoguo # xiaoguo, uses Hermes API + +Shares: PID lock, reconnect, MUC join, dedup, batching, + coordinator protocol (GRANT/REVOKE), HTTP bridge. +Differs only in LLM calling method (chat_bridge vs Hermes API). +""" +import os, sys, time, threading, asyncio, logging, json, re, ssl +import urllib.request, http.server, urllib.parse + +# ── Windows selector loop (slixmpp needs it on Windows) ── +if sys.platform == "win32": + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + +# ── PATH: allow imports from gateway/scripts/ (proc_guard, chat_bridge) ── +_GATEWAY_SCRIPTS = os.path.join(os.path.dirname(os.path.abspath(__file__)), + "gateway", "scripts") +sys.path.insert(0, _GATEWAY_SCRIPTS) + +# ═══════════════════════════════════════════════════════════════ +# AGENTS Configuration +# ═══════════════════════════════════════════════════════════════ -# ── Agent 配置 ────────────────────────────────────────────── AGENTS = { "mohe": { "jid": "mohe@yoin.fun", @@ -13,7 +39,9 @@ AGENTS = { "http_port": 5804, "gateway": "http://localhost:8642/v1/chat/completions", "session_id": "xmpp-mohe-v2", - "kanban_session_id": "xmpp-mohe-kanban", + "server": "127.0.0.1", + "port": 5222, + "muc_rooms": ["coregroup@conference.yoin.fun"], "mention": "@mohe/@莫荷", }, "zhiwei": { @@ -23,8 +51,10 @@ AGENTS = { "name_cn": "知微", "http_port": 5805, "gateway": "http://localhost:8643/v1/chat/completions", - "session_id": "xmpp-zhiwei-v2", - "kanban_session_id": "xmpp-zhiwei-kanban", + "session_id": "xmpp-zhiwei", + "server": "127.0.0.1", + "port": 5222, + "muc_rooms": ["coregroup@conference.yoin.fun"], "mention": "@zhiwei/@知微", }, "xiaoguo": { @@ -36,362 +66,959 @@ AGENTS = { "gateway": "http://localhost:8645/v1/chat/completions", "session_id": "xmpp-xiaoguo", "kanban_session_id": "xmpp-xiaoguo-kanban", + "server": "127.0.0.1", + "port": 5222, + "muc_rooms": ["coregroup@conference.yoin.fun"], "mention": "@xiaoguo/@小果", }, + "xxm": { + "jid": "xxm@yoin.fun", + "password": "hermes123", + "nick": "xxm", + "name_cn": "笑笑", + "http_port": 5802, + "bridge": "chat_bridge", # use local chat_bridge instead of Hermes API + "session_id": "ses_xxm_xmpp", + "kanban_session_id": "xmpp-xxm-kanban", + "server": "192.168.1.246", # LAN direct connect + "port": 5222, + "muc_rooms": [ + "coregroup@conference.yoin.fun", + "jujidina@conference.yoin.fun", + ], + "mention": "@xxm/@笑笑", + }, } -agent = sys.argv[sys.argv.index("--agent") + 1] if "--agent" in sys.argv else "mohe" -cfg = AGENTS.get(agent, AGENTS["mohe"]) +# ── Agent selection ── +_agent_name = "mohe" +if "--agent" in sys.argv: + idx = sys.argv.index("--agent") + if idx + 1 < len(sys.argv): + _agent_name = sys.argv[idx + 1] +cfg = AGENTS.get(_agent_name, AGENTS["mohe"]) + +# ═══════════════════════════════════════════════════════════════ +# PID Lock — prevent duplicate instances +# ═══════════════════════════════════════════════════════════════ + +from proc_guard import guard as _proc_guard +_lock = _proc_guard(f"xmpp_bot_{_agent_name}") +if not _lock.ok: + print(_lock.message, flush=True) + sys.exit(1) + +# ═══════════════════════════════════════════════════════════════ +# Logging +# ═══════════════════════════════════════════════════════════════ + +_LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "gateway", "logs") +os.makedirs(_LOG_DIR, exist_ok=True) +_LOG_FILE = os.path.join(_LOG_DIR, f"xmpp_{_agent_name}.log") +_START_TIME = time.time() + + +def log(m: str): + with open(_LOG_FILE, "a", encoding="utf-8") as f: + f.write(f"{time.strftime('%H:%M:%S')} {m}\n") + logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s') -GATEWAY = cfg["gateway"] -API_KEY = "hermes123" -AGENT_NICK = cfg["nick"] -AGENT_NAME = cfg["name_cn"] -AGENT_JID = cfg["jid"] -AGENT_MENTION = cfg["mention"] -SESSION_ID = cfg["session_id"] -KANBAN_SESSION_ID = cfg.get("kanban_session_id", SESSION_ID) -HTTP_PORT = cfg["http_port"] -_opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) -# ── HTTP 桥(接收本地脚本的主动发送请求) ── -from http.server import HTTPServer, BaseHTTPRequestHandler -import threading, json as json_mod +# ═══════════════════════════════════════════════════════════════ +# LLM Bridge Init — abstracted per agent +# ═══════════════════════════════════════════════════════════════ -_send_queue = [] +_IS_CHAT_BRIDGE = cfg.get("bridge") == "chat_bridge" +_router = None # set only for chat_bridge (xxm) -class SendHandler(BaseHTTPRequestHandler): - def do_POST(self): - length = int(self.headers.get('Content-Length', 0)) - body = self.rfile.read(length) +# ── Kanban session support ── +_CALL_SEQ = 0 +_KANBAN_SESSION_ID = cfg.get("kanban_session_id", None) + +if _IS_CHAT_BRIDGE: + from chat_bridge import SessionBridge + from session_router import SessionRouter + _bridge = SessionBridge(session_id=cfg["session_id"]) + _router = SessionRouter(bridge=_bridge, default_session=cfg["session_id"]) + # Kanban-dedicated bridge + router for separate session + _kanban_sid = _KANBAN_SESSION_ID or f"{cfg['session_id']}-kanban" + _kanban_bridge = SessionBridge(session_id=_kanban_sid) + _kanban_router = SessionRouter(bridge=_kanban_bridge, default_session=_kanban_sid) + log(f"LLM: chat_bridge (session={cfg['session_id']})") + log(f"Kanban: chat_bridge (session={_kanban_sid})") +else: + _opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) + log(f"LLM: Hermes API ({cfg['gateway']})") + + +def _call_llm(content: str, sender: str, is_group: bool = False, + session_id: str | None = None) -> str: + """Abstract LLM call. Returns raw response text (or empty string). + If session_id provided and differs from default, route to kanban handler.""" + if _IS_CHAT_BRIDGE: + if session_id and session_id != cfg["session_id"]: + # Prepends kanban-handler instructions so LLM knows how to handle + kanban_content = ( + "【看板处理协议】\n" + "收到卡片后三步判断:\n" + " A) 任务明确 → 直接执行 → 评论结果 + 更新状态\n" + " B) 信息不足 → 评论提问 + 设为 blocked\n" + " C) 不是我的活 → 评论说明 + 转派(如果能判断)\n" + "\n" + "汇报规则:\n" + " - 任务完成 → 简短 DM 给老莫摘要\n" + " - 评论/状态变更 → 不汇报\n" + " - 追问/转派 → 不汇报\n" + "\n" + "可用 API:\n" + " curl http://192.168.1.246:5803/api/kanban/t_xxx 查看卡片详情\n" + " 更新操作走 Kanban Dashboard UI\n" + "\n" + "卡片上下文不够?→ 用 session_search 查历史\n" + f"---\n{content}" + ) + return _kanban_router.route("xmpp", sender, kanban_content) or "" + return _router.route("xmpp", sender, content) or "" + else: + return _call_hermes_api(content, session_id) + + +def _call_hermes_api(content: str, session_id: str | None = None) -> str: + """POST to Hermes API, return response text or empty string.""" + target_sid = session_id or cfg["session_id"] + try: + payload = json.dumps({ + "model": "hermes-agent", + "messages": [{"role": "user", "content": content}] + }).encode() + req = urllib.request.Request(cfg["gateway"], data=payload, method="POST") + req.add_header("Content-Type", "application/json") + req.add_header("Authorization", "Bearer hermes123") + req.add_header("X-Hermes-Session-Id", target_sid) + result = _opener.open(req, timeout=600) + data = json.loads(result.read()) + reply = data.get("choices", [{}])[0].get("message", {}).get("content", "") + return reply.strip() + except Exception as e: + log(f"!!! Hermes API error: {e}") + return "" + + +# ═══════════════════════════════════════════════════════════════ +# EasyTier control (Windows) +# ═══════════════════════════════════════════════════════════════ + +_EASYTIER_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), + "tools", "easytier") +_EASYTIER_CORE = os.path.join(_EASYTIER_DIR, "easytier-core.exe") +_EASYTIER_PID_FILE = os.path.join(_EASYTIER_DIR, "easytier.pid") +_EASYTIER_NET = "--network-name mynet --network-secret ce75d0a5" +_EASYTIER_RELAY = "--peers tcp://47.115.32.206:11010" +_EASYTIER_IP = "--ipv4 10.144.144.3" + + +def _start_easytier(): + """Start EasyTier on Windows.""" + import subprocess as _sp + if not os.path.exists(_EASYTIER_CORE): + log(f"EasyTier binary not found: {_EASYTIER_CORE}") + return + # Check if already running + if os.path.exists(_EASYTIER_PID_FILE): try: - data = json_mod.loads(body) - room = data.get('to', 'coregroup@conference.yoin.fun') - text = data.get('body', '') - if text: - _send_queue.append((room, text)) - self.send_response(200) - self.end_headers() - self.wfile.write(b'{"ok":true}') - else: - self.send_response(400) - self.end_headers() - self.wfile.write(b'{"ok":false,"error":"empty body"}') - except Exception as e: - self.send_response(500) - self.end_headers() - self.wfile.write(f'{{"ok":false,"error":"{e}"}}'.encode()) + with open(_EASYTIER_PID_FILE) as f: + old_pid = int(f.read().strip()) + _sp.run(["taskkill", "/f", "/pid", str(old_pid)], capture_output=True, timeout=5) + log(f"Killed old EasyTier (PID {old_pid})") + except Exception: + pass + # Start + cmd = f'start /b "" "{_EASYTIER_CORE}" {_EASYTIER_NET} {_EASYTIER_RELAY} {_EASYTIER_IP} --disable-encryption --no-listener' + try: + _sp.run(cmd, shell=True, timeout=5) + log("EasyTier start command issued") + except Exception as e: + log(f"EasyTier start error: {e}") -def _run_http(): - server = HTTPServer(('127.0.0.1', HTTP_PORT), SendHandler) - server.timeout = 1.0 + +def _stop_easytier(): + """Stop EasyTier on Windows.""" + import subprocess as _sp + try: + _sp.run(["taskkill", "/f", "/im", "easytier-core.exe"], capture_output=True, timeout=5) + log("EasyTier stopped (taskkill)") + except Exception as e: + log(f"EasyTier stop error: {e}") + # Clean up PID file + try: + if os.path.exists(_EASYTIER_PID_FILE): + os.remove(_EASYTIER_PID_FILE) + except Exception: + pass + + +def _check_easytier() -> bool: + """Check if EasyTier is running on Windows.""" + import subprocess as _sp + try: + r = _sp.run(["tasklist", "/fi", "imagename eq easytier-core.exe"], + capture_output=True, text=True, timeout=5) + return "easytier-core.exe" in r.stdout + except Exception: + return False + + +# ═══════════════════════════════════════════════════════════════ +# Message Dedup +# ═══════════════════════════════════════════════════════════════ + +_DEDUP_CACHE: set[str] = set() +_DEDUP_LOCK = threading.Lock() + + +def _is_duplicate(msg_id: str) -> bool: + if not msg_id: + return False + with _DEDUP_LOCK: + if msg_id in _DEDUP_CACHE: + return True + _DEDUP_CACHE.add(msg_id) + if len(_DEDUP_CACHE) > 100: + _DEDUP_CACHE.clear() + return False + + +# ═══════════════════════════════════════════════════════════════ +# Coordinator Protocol (shared across all agents) +# ═══════════════════════════════════════════════════════════════ + +_COORDINATOR: str = "mohe" +_GRANTED: str | None = None +_REVOKED_UNTIL: float = 0.0 +_SHUTUP_PATTERNS = ["闭嘴", "别说话", "安静", "shut", "stfu", "别说了", "停"] + + +def _process_coordinator_signals(nickname: str, body: str) -> bool: + """Parse coordinator/GRANT/REVOKE from incoming messages. + Returns True if message was a control signal (consumed, no further processing).""" + global _COORDINATOR, _GRANTED, _REVOKED_UNTIL + # 1. hmo switches coordinator + if nickname == 'hmo' and 'coordinator=' in body.lower(): + for name in ('mohe', 'zhiwei', 'xxm'): + if f'coordinator={name}' in body.lower(): + _COORDINATOR = name + _GRANTED = None + log(f"Coordinator switched to {name} by hmo") + return True + # 2. GRANT signal (overrides REVOKE) + gm = re.search(r'\[GRANT:(\w+)\]', body) + if gm: + _GRANTED = gm.group(1) + _REVOKED_UNTIL = 0 + log(f"GRANT: {_GRANTED}") + return True + # 3. REVOKE signal (5min auto-restore) + rm = re.search(r'\[REVOKE:(\w+)\]', body) + if rm and rm.group(1) == cfg["nick"]: + _REVOKED_UNTIL = time.time() + 300 + log(f"REVOKEd: {cfg['nick']} silenced for 5min") + return True + return False + + +def _check_shutup(body: str) -> bool: + """hmo says shut up → 5min silence.""" + lower = body.lower().strip() + for pat in _SHUTUP_PATTERNS: + if pat.lower() in lower: + _REVOKED_UNTIL = time.time() + 300 + log(f"(shutup: '{pat}' → 5min silence)") + return True + return False + + +def _process_llm_grant(response: str): + """Parse GRANT signal from LLM's own response.""" + global _GRANTED + gm = re.search(r'\[GRANT:(\w+)\]', response) + if gm: + _GRANTED = gm.group(1) + _REVOKED_UNTIL = 0 + log(f"LLM GRANT: {_GRANTED}") + + +# ═══════════════════════════════════════════════════════════════ +# Response Extraction +# ═══════════════════════════════════════════════════════════════ + +_SILENCE_PATTERNS = [ + "保持沉默", "不应[该]?回复", "没有.*@.*我", "不是对[我我说]", + "跟我无关", "我不用回复", "不该回复", "不参与", + "不是我[应]?该[说回]", +] +_EXEC_RE = re.compile(r"##exec:(.+?)##", re.DOTALL) +_DELAY_RE = re.compile(r"##delay:?(\d+)?##") +_DELAY_DEFAULT = 15 +_EXEC_TIMEOUT = 60 + + +def _strip_toolcall_xml(text: str) -> str: + t = text + t = re.sub(r']*>.*?(|$)', '', t, flags=re.DOTALL) + t = re.sub(r'.*?(|$)', '', t, flags=re.DOTALL) + t = re.sub(r']*>.*?(|$)', '', t, flags=re.DOTALL) + t = re.sub(r'.*?(|$)', '', t, flags=re.DOTALL) + return t.strip() + + +def _extract_response(text: str) -> str | None: + """Strip __SILENT__, reasoning blocks, natural language silence. + Returns actual content to send, or None to stay silent.""" + if not text: + return None + t = text.strip() + if not t: + return None + t = _strip_toolcall_xml(t) + # Natural language silence detection + if not t.startswith("__SILENT__"): + first = t.split("\n", 1)[0] + for pat in _SILENCE_PATTERNS: + if re.search(pat, first): + return None + return t + # Has __SILENT__ prefix + parts = t.split("\n", 1) + if len(parts) < 2: + return None + rest = parts[1].strip() while True: - server.handle_request() + m = re.match(r'^([^)]*)\s*', rest) + if m: + rest = rest[m.end():] + continue + m = re.match(r'^\([^)]*\)\s*', rest) + if m: + rest = rest[m.end():] + continue + break + return rest.strip() or None -threading.Thread(target=_run_http, daemon=True).start() -logging.info(f"🚀 {AGENT_NAME} HTTP 桥启动于 :{HTTP_PORT}") -# ── XMPP Bot 类 ──────────────────────────────────────────────── -class AgentBot(ClientXMPP): +# ═══════════════════════════════════════════════════════════════ +# Message Batching (3s debounce + serialized processing) +# ═══════════════════════════════════════════════════════════════ + +_BATCH_WINDOW = 3.0 +_BATCH_TIMEOUT = 300 +_batch_entries: dict[str, list[str]] = {} +_batch_timers: dict[str, threading.Timer] = {} +_batch_processing: set[str] = set() +_batch_pending: dict[str, list[str]] = {} +_batch_lock = threading.Lock() +_BOT_NICK = cfg["nick"] + + +def _run_command(cmd: str) -> str: + """Execute ##exec:command## shell command.""" + log(f"(exec: {cmd[:120]})") + try: + import subprocess + r = subprocess.run(cmd, shell=True, capture_output=True, + timeout=_EXEC_TIMEOUT, text=True, encoding='utf-8', errors='replace') + out = (r.stdout or "") + (r.stderr or "") + out = out.strip() or f"(no output, exit={r.returncode})" + log(f"(exec done: {len(out)} bytes, exit={r.returncode})") + return out + except subprocess.TimeoutExpired: + log(f"(exec timeout >{_EXEC_TIMEOUT}s)") + return "(命令超时)" + except Exception as e: + log(f"(exec error: {e})") + return f"(命令执行失败: {e})" + + +def _schedule_delayed(delay_sec: int, room: str): + """Schedule ##delay:N## re-invocation.""" + global _xmpp_ref + import subprocess as _sp + + def _fire(): + bot = _xmpp_ref + if not bot: + return + try: + prompt = "时间到,请根据最新的信息汇报结果。" + raw = _call_llm(prompt, room, is_group=True) + reply = _extract_response(raw) + if reply: + text = reply.strip() + bot.send_message(mto=room, mbody=text, mtype='groupchat') + log(f"-> [Delay][{room}]: {text[:80]}") + except Exception as e: + log(f"!! delay err: {e}") + + t = threading.Timer(delay_sec, _fire) + t.daemon = True + t.start() + log(f"(delay +{delay_sec}s → {room})") + + +def _batch_done(room: str): + """Called when batch LLM finishes. Flush pending if any.""" + with _batch_lock: + _batch_processing.discard(room) + pending = _batch_pending.pop(room, None) + if pending: + _batch_entries[room] = pending + t = threading.Timer(0.1, _fire_batch, args=[room]) + t.daemon = True + t.start() + _batch_timers[room] = t + return + log(f"[Batch][{room}] (idle)") + + +def _fire_batch(room: str): + """Collect batched entries and call LLM.""" + with _batch_lock: + entries = _batch_entries.pop(room, None) + _batch_timers.pop(room, None) + if not entries: + return + _batch_processing.add(room) + combined = "\n".join(entries) + + def _handle(): + timed_out = [False] + + def _timeout(): + timed_out[0] = True + log(f"[Batch][{room}] TIMEOUT ({_BATCH_TIMEOUT}s)") + _batch_done(room) + + timer = threading.Timer(_BATCH_TIMEOUT, _timeout) + timer.daemon = True + timer.start() + try: + raw = _call_llm(combined, room, is_group=True) + if not timed_out[0]: + timer.cancel() + _process_llm_reply(raw, room) + else: + log(f"[Batch][{room}] route returned after timeout, discarded") + except Exception as e: + log(f"!!! BATCH: {e}") + if not timed_out[0]: + timer.cancel() + _batch_done(room) + + threading.Thread(target=_handle, daemon=True).start() + + +def _batch_group_message(room: str, nickname: str, body: str) -> bool: + """Add message to room batch. Returns True if batched, False if @mention (immediate).""" + if f"@{_BOT_NICK}" in body or body.startswith(_BOT_NICK): + return False + formatted = f"[{nickname}]: {body}" + with _batch_lock: + if room in _batch_processing: + _batch_pending.setdefault(room, []).append(formatted) + return True + timer = _batch_timers.pop(room, None) + if timer: + timer.cancel() + _batch_entries.setdefault(room, []).append(formatted) + t = threading.Timer(_BATCH_WINDOW, _fire_batch, args=[room]) + t.daemon = True + t.start() + _batch_timers[room] = t + return True + + +# ═══════════════════════════════════════════════════════════════ +# MAM Recovery — fetch recent history on startup +# ═══════════════════════════════════════════════════════════════ + +_MAM_RECOVERY = True +_MAM_RECOVERY_LOCK = threading.Lock() +_MAM_MARK_DONE = False +_MAM_TIMEOUT = 30 + + +def _set_mam_done(): + global _MAM_RECOVERY + with _MAM_RECOVERY_LOCK: + _MAM_RECOVERY = False + + +def _is_mam_recovery() -> bool: + if time.time() - _START_TIME > _MAM_TIMEOUT: + global _MAM_RECOVERY + with _MAM_RECOVERY_LOCK: + if _MAM_RECOVERY: + _MAM_RECOVERY = False + log("(MAM recovery timed out, force-disabled)") + return _MAM_RECOVERY + with _MAM_RECOVERY_LOCK: + return _MAM_RECOVERY + + +# ═══════════════════════════════════════════════════════════════ +# XML Escape +# ═══════════════════════════════════════════════════════════════ + +def _escape(text: str) -> str: + return (text.replace("&", "&").replace("<", "<") + .replace(">", ">").replace('"', """)) + + +# ═══════════════════════════════════════════════════════════════ +# HTTP Bridge — health, presence, messages, POST send +# ═══════════════════════════════════════════════════════════════ + +_HTTP_PORT = cfg["http_port"] +_MSG_BUF: list[dict] = [] +_MSG_BUF_LOCK = threading.Lock() +_xmpp_ref = None # set after bot creation + + +def _record_group_msg(nickname: str, body: str): + ts = time.strftime("%H:%M:%S") + with _MSG_BUF_LOCK: + _MSG_BUF.append({"ts": ts, "from": nickname, "body": body}) + if len(_MSG_BUF) > 200: + _MSG_BUF[:] = _MSG_BUF[-150:] + + +class _BridgeHandler(http.server.BaseHTTPRequestHandler): + def do_GET(self): + parsed = urllib.parse.urlparse(self.path) + if parsed.path == "/muc": + try: + muc_info = {"rooms": {}} + bot = _xmpp_ref + if bot is not None and 'xep_0045' in bot.plugin: + muc_plugin = bot.plugin['xep_0045'] + for room_jid in cfg["muc_rooms"]: + room_data = {"jid": room_jid, "participants": []} + try: + if room_jid in muc_plugin.rooms: + room = muc_plugin.rooms[room_jid] + for nick, info in room.get('roster', {}).items(): + room_data["participants"].append({ + "nick": nick, + "jid": str(info.get('jid', '')), + "affiliation": str(info.get('affiliation', '')), + "role": str(info.get('role', '')), + }) + except Exception as e: + room_data["error"] = str(e) + muc_info["rooms"][room_jid] = room_data + self._reply(200, muc_info) + except Exception as e: + self._reply(500, {"ok": False, "error": str(e)}) + return + if parsed.path == "/health": + try: + bot = _xmpp_ref + session_ok = bot.session_started_event.is_set() if (bot and hasattr(bot, 'session_started_event')) else False + socket_ok = bot.is_connected() if (bot and hasattr(bot, 'is_connected')) else False + self._reply(200, { + "ok": True, "xmpp_connected": session_ok or socket_ok, + "agent": _agent_name, "jid": cfg["jid"], + "uptime_sec": int(time.time() - _START_TIME), + "muc_rooms": cfg["muc_rooms"], + }) + except Exception as e: + self._reply(500, {"ok": False, "error": str(e)}) + return + if parsed.path.startswith("/presence"): + jid_to_check = parsed.path[len("/presence/"):].strip() + if not jid_to_check: + self._reply(400, {"ok": False, "error": "missing JID"}) + return + try: + info = {"jid": jid_to_check, "online": False, "resources": []} + bot = _xmpp_ref + if bot and hasattr(bot, 'client_roster'): + roster = bot.client_roster + if jid_to_check in roster: + resources = list(roster[jid_to_check].resources.keys()) + info["online"] = len(resources) > 0 + info["resources"] = resources + self._reply(200, info) + except Exception as e: + self._reply(500, {"ok": False, "error": str(e)}) + return + if parsed.path == "/messages": + try: + qs = urllib.parse.parse_qs(parsed.query) + sender = qs.get("from", [None])[0] + with _MSG_BUF_LOCK: + msgs = list(_MSG_BUF) + if sender: + msgs = [m for m in msgs if m["from"] == sender] + self._reply(200, {"ok": True, "count": len(msgs), "messages": msgs[-50:]}) + except Exception as e: + self._reply(500, {"ok": False, "error": str(e)}) + return + self._reply(404, {"ok": False, "error": "not found"}) + + def do_POST(self): + try: + length = int(self.headers.get('Content-Length', 0)) + body = json.loads(self.rfile.read(length)) + path = urllib.parse.urlparse(self.path).path.rstrip('/') + + # /easytier endpoint — execute EasyTier action locally (no XMPP DM) + if path == "/easytier": + action = body.get("action", "") + if action == "start": + _start_easytier() + self._reply(200, {"ok": True, "message": "EasyTier started"}) + elif action == "stop": + _stop_easytier() + self._reply(200, {"ok": True, "message": "EasyTier stopped"}) + elif action == "status": + running = _check_easytier() + self._reply(200, {"ok": True, "running": running}) + else: + self._reply(400, {"ok": False, "error": "action must be start|stop|status"}) + return + + to = body.get('to', cfg["muc_rooms"][0]) + msg = body.get('message', '') or body.get('body', '') + msg_type = body.get('type', 'groupchat') + if not msg: + self._reply(400, {"ok": False, "error": "empty message"}) + return + safe = _escape(msg.strip()) + bot = _xmpp_ref + if bot: + bot.send_message(mto=to, mbody=msg.strip(), mtype=msg_type) + _record_group_msg(cfg["nick"], msg) + log(f"[http] → [{to.split('@')[0]}]: {msg[:80]} (type={msg_type})") + self._reply(200, {"ok": True}) + except Exception as e: + self._reply(500, {"ok": False, "error": str(e)}) + + def _reply(self, code, data): + body = json.dumps(data, ensure_ascii=False).encode('utf-8') + self.send_response(code) + self.send_header('Content-Type', 'application/json; charset=utf-8') + self.send_header('Content-Length', len(body)) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + pass + + +def _start_http_bridge(): + _httpd = http.server.HTTPServer(('0.0.0.0', _HTTP_PORT), _BridgeHandler) + _t = threading.Thread(target=_httpd.serve_forever, daemon=True) + _t.start() + log(f"HTTP bridge ready on :{_HTTP_PORT}") + + +# ═══════════════════════════════════════════════════════════════ +# Reply Processing (shared for all LLM responses) +# ═══════════════════════════════════════════════════════════════ + +def _process_llm_reply(raw_reply: str, room: str): + """Process LLM response: check silence/delay/exec/send.""" + global _xmpp_ref + if not raw_reply: + _batch_done(room) + return + # Parse GRANT signal from LLM response + _process_llm_grant(raw_reply) + # ##delay:N## → schedule later + delay_m = _DELAY_RE.search(raw_reply) + if delay_m: + sec = int(delay_m.group(1)) if delay_m.group(1) else _DELAY_DEFAULT + _schedule_delayed(sec, room) + _batch_done(room) + return + # ##exec:command## → run command, use output as reply + exec_m = _EXEC_RE.search(raw_reply) + if exec_m: + output = _run_command(exec_m.group(1)) + raw_reply = _EXEC_RE.sub(output, raw_reply, count=1) + # Extract actual response + reply_text = _extract_response(raw_reply) + if reply_text: + text = reply_text.strip() + bot = _xmpp_ref + if bot: + bot.send_message(mto=room, mbody=text, mtype='groupchat') + log(f"-> [{room.split('@')[0]}]: {text[:80]}") + else: + log(f"-> [{room.split('@')[0]}]: (silent)") + _batch_done(room) + + +# ═══════════════════════════════════════════════════════════════ +# Group message handler +# ═══════════════════════════════════════════════════════════════ + +def _handle_group_message(msg): + """Process a groupchat message (runs in thread).""" + global _COORDINATOR, _GRANTED, _REVOKED_UNTIL + if _is_mam_recovery(): + return + msg_id = msg.get("id", "") + if _is_duplicate(msg_id): + return + body = str(msg["body"]).strip() + if not body: + return + full_from = str(msg["from"]) + room = full_from.split("/")[0] + nickname = full_from.split("/")[1] if "/" in full_from else "" + # Self-message skip + if nickname == cfg["nick"]: + log(f"(self) {body[:80]}") + return + _record_group_msg(nickname, body) + # Coordinator signals + if _process_coordinator_signals(nickname, body): + return + # Revoke check + is_revoked = time.time() < _REVOKED_UNTIL + if is_revoked and _GRANTED == cfg["nick"]: + _GRANTED = None + is_revoked = False + log(f"GRANT overrides REVOKE for {cfg['nick']}") + if _check_shutup(body): + return + if is_revoked: + body = f"【只读消息】你被收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {room}] {nickname} 说: {body}" + # Batch or immediate (@mention) + if _batch_group_message(room, nickname, body): + log(f"[{room.split('@')[0]}] {nickname}: {body[:80]} (batched)") + return + log(f"[{room.split('@')[0]}] {nickname}: {body[:80]}") + raw = _call_llm(body, full_from, is_group=True) + _process_llm_reply(raw, room) + + +# ═══════════════════════════════════════════════════════════════ +# Private message handler +# ═══════════════════════════════════════════════════════════════ + +def _handle_private_message(msg): + """Process a private chat message.""" + global _CALL_SEQ + if msg["type"] == "groupchat": + return + msg_id = msg.get("id", "") + if _is_duplicate(msg_id): + return + body = str(msg["body"]).strip() + sender = str(msg["from"]).split("/")[0] + log(f"<{sender}> {body[:80]}") + if sender == cfg["jid"]: + log("(skipped self)") + return + if time.time() < _REVOKED_UNTIL: + log(f"(silenced) <{sender}> dropped") + return + if _check_shutup(body): + return + # ── Kanban routing ── + _CALL_SEQ += 1 + is_kanban = body.startswith('[Kanban]') + target_sid = _KANBAN_SESSION_ID if is_kanban else cfg["session_id"] + if is_kanban: + log(f"📋 看板通知(#{_CALL_SEQ}): {body[:80]}") + # ── EasyTier toggle ── + if body.startswith('[EasyTier]'): + action = body.replace('[EasyTier]', '').strip().lower() + log(f"🔌 EasyTier command: {action}") + if action == 'start': + _start_easytier() + reply_text = "[EasyTier] started on Windows" + elif action == 'stop': + _stop_easytier() + reply_text = "[EasyTier] stopped on Windows" + else: + reply_text = f"[EasyTier] unknown action: {action}" + bot = _xmpp_ref + if bot: + bot.send_message(mto=sender, mbody=reply_text, mtype='chat') + log(f"-> {sender}: {reply_text}") + return + raw = _call_llm(body, sender, is_group=False, session_id=target_sid) + if raw: + reply = _extract_response(raw) + if reply: + text = reply.strip() + bot = _xmpp_ref + if bot: + bot.send_message(mto=sender, mbody=text, mtype='chat') + log(f"-> {sender}: {text[:80]}") + + +# ═══════════════════════════════════════════════════════════════ +# AgentBot Class +# ═══════════════════════════════════════════════════════════════ + +import slixmpp + + +class AgentBot(slixmpp.ClientXMPP): def __init__(self): - super().__init__(AGENT_JID, cfg["password"]) - self.add_event_handler('session_bind', self.on_bind) - self.add_event_handler('message', self.on_msg) - self.add_event_handler('disconnected', self.on_disconnect) - self.add_event_handler('connected', self.on_connected) + super().__init__(cfg["jid"], cfg["password"]) + # Connection settings + self.enable_direct_tls = False + self.enable_starttls = True + self.auto_reconnect = True + self.reconnect_max_delay = 10 + self.whitespace_keepalive = True + self.whitespace_keepalive_interval = 30 + # SSL: accept self-signed certs ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE self.ssl_context = ctx - self.ready = asyncio.Event() - self._call_seq = 0 - self._muc_joined = False - self._recent_sent = [] - self._coordinator = 'mohe' # 默认协调者 - self._granted = None + # Event handlers + self.add_event_handler("session_start", self._on_session_start) + self.add_event_handler("message", self._on_any_message) + self.add_event_handler("groupchat_message", self._on_group_msg) + self.add_event_handler("disconnected", self._on_disconnected) + self.add_event_handler("connected", self._on_connected) + self.add_event_handler("session_end", self._on_session_end) + self.add_event_handler("connection_failed", self._on_conn_failed) + # MUC plugin + self.register_plugin('xep_0045') - async def on_connected(self, event): - logging.info(f"🔗 {AGENT_NAME} TCP连接已建立") + def _on_connected(self, event): + log("connection established") - async def on_bind(self, event): + def _on_session_start(self, event): self.send_presence() self.get_roster() + log(f"{cfg['jid']} online") + # Register MAM plugin lazily try: - self.plugin['xep_0045'].join_muc('coregroup@conference.yoin.fun', AGENT_NICK) - logging.info(f"✅ {AGENT_NAME} 加入群聊 coregroup") - except Exception as e: - logging.error(f"❌ {AGENT_NAME} 加入群聊失败: {e}") - self._muc_joined = True - self.ready.set() - logging.info(f"✅ {AGENT_NAME} XMPP 上线") - - async def on_disconnect(self, event): - self.ready.clear() - self._muc_joined = False - logging.warning(f"⚠️ {AGENT_NAME} XMPP 断线") - - async def on_msg(self, msg): - body = msg['body'] - sender = str(msg['from']) - msg_type = msg['type'] - if not body: - return - if msg_type == 'groupchat': - if AGENT_JID in sender: - return - nickname = sender.split('/')[-1] if '/' in sender else '' - # 自己的消息跳过(通过昵称) - if nickname == AGENT_NICK: - return - - # Coordinator 模式 — 全走 XMPP 消息 - - # 1. hmo 切换 coordinator(lead=xxx) - if nickname == 'hmo': - if 'lead=' in body.lower(): - for _name in ['mohe', 'zhiwei', 'xxm']: - if f'lead={_name}' in body.lower(): - self._coordinator = _name - self._granted = None - logging.info(f"👑 Coordinator 切换为 {_name}") - break - # hmo 直接 @点名 → 临时授权(一次) - elif any(tag in body for tag in [f'@{AGENT_NICK}', f'@{AGENT_NAME}']): - self._granted = AGENT_NICK - logging.info(f"🎤 被 hmo 点名,获得发言权") - - # 2. 检测授权信号(优先于收回,GRANT 可以覆盖 REVOKE) - _grant_match = re.search(r'\[GRANT:(\w+)\]', body) - if _grant_match: - self._granted = _grant_match.group(1) - self._revoked_until = 0 # 被授权时解除收回 - logging.info(f"🎤 收到授权:{self._granted}") - - # 3. 检测收回信号 - _revoke_match = re.search(r'\[REVOKE:(\w+)\]', body) - _revoked_until = getattr(self, '_revoked_until', 0) - if _revoke_match and _revoke_match.group(1) == AGENT_NICK: - self._revoked_until = time.time() + 300 # 5分钟自动解除 - logging.info(f"🔇 {AGENT_NAME} 发言权被收回(5分钟后自动恢复)") - if time.time() < _revoked_until: - # 被收回者:只读模式,能看到但不能回复 - _rr = sender.split('/')[0] - _readonly_body = f"【只读消息】你目前被暂时收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {_rr}] {nickname} 说: {body}" - await self.call_hermes(_readonly_body, sender, is_group=True) - return - - # 3. 判断角色 - _coordinator = getattr(self, '_coordinator', 'mohe') - _granted = getattr(self, '_granted', None) - _is_coordinator = (_coordinator == AGENT_NICK) - _is_granted = (_granted == AGENT_NICK) - - if _is_granted: - self._granted = None # 用完即收回 - - room = sender.split('/')[0] - - # 4. 判断权限:协调者/被授权者可发言,其他人只读 - if not _is_coordinator and not _is_granted: - _rr = sender.split('/')[0] - _ro_body = f"【只读消息】你目前不是协调者,只需了解内容。\n\n[核心群 {_rr}] {nickname} 说: {body}" + self.register_plugin('xep_0313') + except Exception: + log("(MAM: xep_0313 not available)") + # Join MUC rooms + async def _join_all(): + for room_jid in cfg["muc_rooms"]: try: - _ro_payload = json.dumps({ - "model": "hermes-agent", - "messages": [{"role": "user", "content": _ro_body}] - }).encode() - _ro_req = urllib.request.Request(GATEWAY, data=_ro_payload, method="POST") - _ro_req.add_header("Content-Type", "application/json") - _ro_req.add_header("Authorization", f"Bearer {API_KEY}") - _ro_req.add_header("X-Hermes-Session-Id", SESSION_ID) - _ro_loop = asyncio.get_event_loop() - await _ro_loop.run_in_executor(None, lambda: _opener.open(_ro_req, timeout=30)) - except Exception: - pass - return # 不发送任何回复 + self.plugin['xep_0045'].join_muc(room_jid, cfg["nick"]) + presence = ( + f"" + f"" + f"" + f"" + ) + self.send_raw(presence) + log(f"Joined {room_jid}") + except Exception as e: + log(f"MUC join failed {room_jid}: {e}") + await asyncio.sleep(2) + await asyncio.sleep(3) + await self._fetch_mam_history() + asyncio.ensure_future(_join_all()) - # 5. 被 REVOKE 的人:只读(虽然上面已经覆盖了,作为额外保障) - if time.time() < getattr(self, '_revoked_until', 0): - _rr2 = sender.split('/')[0] - _rv_body = f"【只读消息】你目前被收回发言权。只需了解内容。\n\n[核心群 {_rr2}] {nickname} 说: {body}" - try: - _rv_payload = json.dumps({ - "model": "hermes-agent", - "messages": [{"role": "user", "content": _rv_body}] - }).encode() - _rv_req = urllib.request.Request(GATEWAY, data=_rv_payload, method="POST") - _rv_req.add_header("Content-Type", "application/json") - _rv_req.add_header("Authorization", f"Bearer {API_KEY}") - _rv_req.add_header("X-Hermes-Session-Id", SESSION_ID) - _rv_loop = asyncio.get_event_loop() - await _rv_loop.run_in_executor(None, lambda: _opener.open(_rv_req, timeout=30)) - except Exception: - pass - return - - # 硬闭嘴闸门:hmo 说闭嘴类的话 → 静默 5 分钟 - _silent_until = getattr(self, '_silent_until', 0) - if time.time() < _silent_until: - return - if nickname == 'hmo': - _sk = ['闭嘴', '别说话', '安静', 'shut', 'stfu', '别说了', '停'] - if any(kw in body.lower() for kw in _sk): - self._silent_until = time.time() + 300 - logging.info(f"🔇 {AGENT_NAME} 收到闭嘴指令,静默 5 分钟") - return - - logging.info(f"📩 群消息 [{sender}]: {body[:100]}") - room = sender.split('/')[0] - ctx_body = ( - "【规则】以下是一条群聊消息。判断是否应该回复。\n" - "只有以下3种情况你才回复:\n" - f"1. hmo直接点名问你({AGENT_MENTION})\n" - "2. 你有其他人没说过的独家信息\n" - "3. 别人说错了关键事实,不纠正会有后果\n" - "如果以上都不符合,你的回复必须只包含 __SILENT__ 这10个字符," - "不要有任何其他内容(不要前缀、不要解释、不要标点、不要空格)。\n\n" - "注意:你是协调者(lead)。你的第一职责是管理讨论节奏,不是自己说话。\n" - "- 别人能回答的问题,不抢答。\n" - "- 如果其他 Agent 更合适,用 [GRANT:agent名] 授权他们发言(例如 [GRANT:zhiwei])。\n" - "- hmo 直接 @点名某人 也会自动授权。\n" - "- 如果有人跑题/刷屏,用 [REVOKE:agent名] 收回发言权(例如 [REVOKE:zhiwei])。\n" - "- [GRANT] 可以覆盖 [REVOKE]。标记会显示在消息中。\n\n" - f"[核心群 {room}] {nickname} 说: {body}" - ) - await self.call_hermes(ctx_body, room, is_group=True) + async def _fetch_mam_history(self): + """Query MAM for recent MUC messages to rebuild context.""" + if 'xep_0313' not in self.plugin: + log("(MAM: no plugin)") + _set_mam_done() return - if msg_type == 'chat' and 'hmo@yoin.fun' in sender: - self._call_seq += 1 - if body.startswith('[Kanban]'): - # 看板通知 → 走独立 session,不污染主会话 - target_sid = KANBAN_SESSION_ID - logging.info(f"📋 看板通知(#{self._call_seq}): {body[:80]}") - else: - target_sid = SESSION_ID - logging.info(f"📩 老爸(#{self._call_seq}): {body[:80]}") - await self.call_hermes(body, sender, seq=self._call_seq, session_id=target_sid) - - async def call_hermes(self, content, sender, is_group=False, seq=None, session_id=None): - msg_type = 'groupchat' if is_group else 'chat' - sid = session_id or SESSION_ID + # MAM recovery used in _on_session_start try: - payload = json.dumps({ - "model": "hermes-agent", - "messages": [{"role": "user", "content": content}] - }).encode() - req = urllib.request.Request(GATEWAY, data=payload, method="POST") - req.add_header("Content-Type", "application/json") - req.add_header("Authorization", f"Bearer {API_KEY}") - req.add_header("X-Hermes-Session-Id", sid) - - loop = asyncio.get_event_loop() - result = await loop.run_in_executor(None, lambda: _opener.open(req, timeout=600)) - - if seq is not None and seq < self._call_seq: - return - - data = json.loads(result.read()) - reply = data.get("choices", [{}])[0].get("message", {}).get("content", "") - reply_stripped = reply.strip() - # 检查 __SILENT__ 标记 - if reply_stripped.startswith('__SILENT__') or reply_stripped.startswith('`__SILENT__`'): - logging.info(f"⏭️ {AGENT_NAME} 决定沉默,不发送") - return - # 如果回复里任意位置出现了 __SILENT__,说明 LLM 没理解协议,整条作废 - if '__SILENT__' in reply: - logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __SILENT__,拦截") - return - # 如果回复里出现了 __REPLY__,也是协议混淆,拦截 - if '__REPLY__' in reply: - logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __REPLY__,拦截") - return - # 额外拦截:LLM 说"我沉默""我不说了"等宣布沉默的话→当 SILENT 处理 - _silent_phrases = ['我沉默', '我不说', '不说了', '不回复', '不插嘴', '我闭嘴', - '闭嘴上', '沉默是', '彻底沉默', '我会沉默', '将保持沉默'] - if any(p in reply for p in _silent_phrases): - logging.info(f"⏭️ {AGENT_NAME} 宣布沉默(命中关键词),拦截") - return - finish = data.get("choices", [{}])[0].get("finish_reason", "") - - if reply.strip() and finish != "silent": - # Coordinator 授权机制:检测回复中的 [GRANT:xxx] 标记 - _grant_match = re.search(r'\[GRANT:(\w+)\]', reply) - if _grant_match: - _grant_name = _grant_match.group(1) - self._granted = _grant_name - logging.info(f"🎤 授权 {_grant_name} 发言(通过 XMPP 发送)") - # 不剥离标记,让其他 bot 从 XMPP 消息中解析 - - if msg_type == 'groupchat': - self.send_message(mto=sender, mbody=reply, mtype='groupchat') - # 防回声:记录已发送的消息正文 - sent_norm = reply.strip()[:100] - self._recent_sent.append(sent_norm) - if len(self._recent_sent) > 10: - self._recent_sent.pop(0) - else: - import subprocess as sp - from xml.sax.saxutils import escape - safe = escape(reply) - sp.run([ - "docker", "exec", "ejabberd", "ejabberdctl", "send_stanza", - AGENT_JID, str(sender), - f"{safe}" - ], capture_output=True, timeout=10) - logging.info(f"✅ {AGENT_NAME} 回复: {reply[:80]}") - except Exception as e: - logging.error(f"❌ {AGENT_NAME} 错误: {e}") - -# ── 主入口 ─────────────────────────────────────────────── -async def main(): - retry_delay = 1 - max_delay = 60 - while True: - try: - bot = AgentBot() - bot.register_plugin('xep_0030') - bot.register_plugin('xep_0045') - bot.register_plugin('xep_0199') - - bot.connect(host='127.0.0.1', port=5222) - await asyncio.wait_for(bot.ready.wait(), timeout=30) - logging.info(f"{AGENT_NAME} XMPP 就绪") - retry_delay = 1 - - async def _drain_queue(): - while True: - await asyncio.sleep(1) - while _send_queue: - room, text = _send_queue.pop(0) + for room_jid in cfg["muc_rooms"]: + log(f"(MAM: querying {room_jid} for last 50 messages...)") + results = await self.plugin['xep_0313'].retrieve( + jid=room_jid, rsm={'max': 50}, + ) + count = 0 + for msg in results['mam']['results']: + forwarded = msg['mam_result']['forwarded'] + body = str(forwarded['stanza']['body'] or '').strip() + if not body: + continue + nick = str(forwarded['stanza']['from']).split('/')[-1] if '/' in str(forwarded['stanza']['from']) else '?' + # Feed into context for chat_bridge (xxm) + if _IS_CHAT_BRIDGE: + role = 'user' if nick != cfg["nick"] else 'assistant' try: - bot.send_message(mto=room, mbody=text, mtype='groupchat') - sent_norm = text.strip()[:100] - bot._recent_sent.append(sent_norm) - if len(bot._recent_sent) > 10: - bot._recent_sent.pop(0) - logging.info(f"📤 主动发送到 {room}: {text[:60]}") - except Exception as e: - logging.error(f"❌ 主动发送失败: {e}") - asyncio.create_task(_drain_queue()) - - while True: - await asyncio.sleep(15) - if not bot.is_connected(): - logging.warning("检测到断线,准备重连...") - break - - except asyncio.TimeoutError: - logging.warning("连接超时,准备重连...") + _bridge._append_to_log(role, f"[{nick}]: {body[:300]}") + except Exception: + pass + count += 1 + log(f"(MAM: loaded {count} msgs from {room_jid})") + _set_mam_done() + log("(MAM recovery complete)") except Exception as e: - logging.error(f"❌ 主循环错误: {e}") + log(f"(MAM error: {e})") + _set_mam_done() - logging.info(f"⏳ 等待 {retry_delay} 秒后重连...") - await asyncio.sleep(retry_delay) - retry_delay = min(retry_delay * 2, max_delay) + def _on_group_msg(self, msg): + threading.Thread(target=_handle_group_message, args=[msg], daemon=True).start() + + def _on_any_message(self, msg): + threading.Thread(target=_handle_private_message, args=[msg], daemon=True).start() + + def _on_session_end(self, event): + log(f"session ended") + + def _on_conn_failed(self, event): + log(f"connection failed: {event}") + + def _on_disconnected(self, event): + log(f"disconnected, reconnecting...") + + +# ═══════════════════════════════════════════════════════════════ +# Main +# ═══════════════════════════════════════════════════════════════ + +def main(): + log(f"Starting {cfg['jid']} ({cfg['name_cn']}) — agent={_agent_name}") + if _IS_CHAT_BRIDGE: + log(f" LLM: chat_bridge (session={cfg['session_id']})") + else: + log(f" LLM: Hermes API ({cfg['gateway']})") + log(f" Server: {cfg['server']}:{cfg['port']}") + log(f" Rooms: {cfg['muc_rooms']}") + + bot = AgentBot() + global _xmpp_ref + _xmpp_ref = bot + + _start_http_bridge() + + bot.connect(host=cfg["server"], port=cfg["port"]) + log(f"Connecting {cfg['jid']}@{cfg['server']}:{cfg['port']}") + + loop = asyncio.get_event_loop() + + async def _status_check(): + while True: + await asyncio.sleep(60) + log("(alive)") + + asyncio.ensure_future(_status_check()) -if __name__ == '__main__': try: - asyncio.run(main()) + loop.run_forever() except KeyboardInterrupt: - pass + log("Shutdown by user") + except Exception as e: + log(f"!!! MAIN LOOP CRASH: {e}") + import traceback + log(f"!!! {traceback.format_exc()[:500]}") + raise + + +if __name__ == "__main__": + main()