diff --git a/xmpp_agent_core.py b/xmpp_agent_core.py
index 1b5d410..d94c54b 100644
--- a/xmpp_agent_core.py
+++ b/xmpp_agent_core.py
@@ -1,9 +1,35 @@
#!/usr/bin/env python3
-"""XMPP Bot - 统一版,支持 --agent mohe|zhiwei|xiao 参数"""
-import asyncio, logging, ssl, json, urllib.request, os, time, sys, re
-from slixmpp import ClientXMPP
+"""
+XMPP Agent Core — 统一版
+=========================
+Single bot core for all agents. Supports --agent xxm|mohe|zhiwei|xiaoguo.
+
+Usage:
+ python xmpp_agent_core.py --agent xxm # xxm, uses chat_bridge
+ python xmpp_agent_core.py --agent mohe # mohe, uses Hermes API
+ python xmpp_agent_core.py --agent zhiwei # zhiwei, uses Hermes API
+ python xmpp_agent_core.py --agent xiaoguo # xiaoguo, uses Hermes API
+
+Shares: PID lock, reconnect, MUC join, dedup, batching,
+ coordinator protocol (GRANT/REVOKE), HTTP bridge.
+Differs only in LLM calling method (chat_bridge vs Hermes API).
+"""
+import os, sys, time, threading, asyncio, logging, json, re, ssl
+import urllib.request, http.server, urllib.parse
+
+# ── Windows selector loop (slixmpp needs it on Windows) ──
+if sys.platform == "win32":
+ asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+
+# ── PATH: allow imports from gateway/scripts/ (proc_guard, chat_bridge) ──
+_GATEWAY_SCRIPTS = os.path.join(os.path.dirname(os.path.abspath(__file__)),
+ "gateway", "scripts")
+sys.path.insert(0, _GATEWAY_SCRIPTS)
+
+# ═══════════════════════════════════════════════════════════════
+# AGENTS Configuration
+# ═══════════════════════════════════════════════════════════════
-# ── Agent 配置 ──────────────────────────────────────────────
AGENTS = {
"mohe": {
"jid": "mohe@yoin.fun",
@@ -13,7 +39,9 @@ AGENTS = {
"http_port": 5804,
"gateway": "http://localhost:8642/v1/chat/completions",
"session_id": "xmpp-mohe-v2",
- "kanban_session_id": "xmpp-mohe-kanban",
+ "server": "127.0.0.1",
+ "port": 5222,
+ "muc_rooms": ["coregroup@conference.yoin.fun"],
"mention": "@mohe/@莫荷",
},
"zhiwei": {
@@ -23,8 +51,10 @@ AGENTS = {
"name_cn": "知微",
"http_port": 5805,
"gateway": "http://localhost:8643/v1/chat/completions",
- "session_id": "xmpp-zhiwei-v2",
- "kanban_session_id": "xmpp-zhiwei-kanban",
+ "session_id": "xmpp-zhiwei",
+ "server": "127.0.0.1",
+ "port": 5222,
+ "muc_rooms": ["coregroup@conference.yoin.fun"],
"mention": "@zhiwei/@知微",
},
"xiaoguo": {
@@ -36,362 +66,959 @@ AGENTS = {
"gateway": "http://localhost:8645/v1/chat/completions",
"session_id": "xmpp-xiaoguo",
"kanban_session_id": "xmpp-xiaoguo-kanban",
+ "server": "127.0.0.1",
+ "port": 5222,
+ "muc_rooms": ["coregroup@conference.yoin.fun"],
"mention": "@xiaoguo/@小果",
},
+ "xxm": {
+ "jid": "xxm@yoin.fun",
+ "password": "hermes123",
+ "nick": "xxm",
+ "name_cn": "笑笑",
+ "http_port": 5802,
+ "bridge": "chat_bridge", # use local chat_bridge instead of Hermes API
+ "session_id": "ses_xxm_xmpp",
+ "kanban_session_id": "xmpp-xxm-kanban",
+ "server": "192.168.1.246", # LAN direct connect
+ "port": 5222,
+ "muc_rooms": [
+ "coregroup@conference.yoin.fun",
+ "jujidina@conference.yoin.fun",
+ ],
+ "mention": "@xxm/@笑笑",
+ },
}
-agent = sys.argv[sys.argv.index("--agent") + 1] if "--agent" in sys.argv else "mohe"
-cfg = AGENTS.get(agent, AGENTS["mohe"])
+# ── Agent selection ──
+_agent_name = "mohe"
+if "--agent" in sys.argv:
+ idx = sys.argv.index("--agent")
+ if idx + 1 < len(sys.argv):
+ _agent_name = sys.argv[idx + 1]
+cfg = AGENTS.get(_agent_name, AGENTS["mohe"])
+
+# ═══════════════════════════════════════════════════════════════
+# PID Lock — prevent duplicate instances
+# ═══════════════════════════════════════════════════════════════
+
+from proc_guard import guard as _proc_guard
+_lock = _proc_guard(f"xmpp_bot_{_agent_name}")
+if not _lock.ok:
+ print(_lock.message, flush=True)
+ sys.exit(1)
+
+# ═══════════════════════════════════════════════════════════════
+# Logging
+# ═══════════════════════════════════════════════════════════════
+
+_LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "gateway", "logs")
+os.makedirs(_LOG_DIR, exist_ok=True)
+_LOG_FILE = os.path.join(_LOG_DIR, f"xmpp_{_agent_name}.log")
+_START_TIME = time.time()
+
+
+def log(m: str):
+ with open(_LOG_FILE, "a", encoding="utf-8") as f:
+ f.write(f"{time.strftime('%H:%M:%S')} {m}\n")
+
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
-GATEWAY = cfg["gateway"]
-API_KEY = "hermes123"
-AGENT_NICK = cfg["nick"]
-AGENT_NAME = cfg["name_cn"]
-AGENT_JID = cfg["jid"]
-AGENT_MENTION = cfg["mention"]
-SESSION_ID = cfg["session_id"]
-KANBAN_SESSION_ID = cfg.get("kanban_session_id", SESSION_ID)
-HTTP_PORT = cfg["http_port"]
-_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
-# ── HTTP 桥(接收本地脚本的主动发送请求) ──
-from http.server import HTTPServer, BaseHTTPRequestHandler
-import threading, json as json_mod
+# ═══════════════════════════════════════════════════════════════
+# LLM Bridge Init — abstracted per agent
+# ═══════════════════════════════════════════════════════════════
-_send_queue = []
+_IS_CHAT_BRIDGE = cfg.get("bridge") == "chat_bridge"
+_router = None # set only for chat_bridge (xxm)
-class SendHandler(BaseHTTPRequestHandler):
- def do_POST(self):
- length = int(self.headers.get('Content-Length', 0))
- body = self.rfile.read(length)
+# ── Kanban session support ──
+_CALL_SEQ = 0
+_KANBAN_SESSION_ID = cfg.get("kanban_session_id", None)
+
+if _IS_CHAT_BRIDGE:
+ from chat_bridge import SessionBridge
+ from session_router import SessionRouter
+ _bridge = SessionBridge(session_id=cfg["session_id"])
+ _router = SessionRouter(bridge=_bridge, default_session=cfg["session_id"])
+ # Kanban-dedicated bridge + router for separate session
+ _kanban_sid = _KANBAN_SESSION_ID or f"{cfg['session_id']}-kanban"
+ _kanban_bridge = SessionBridge(session_id=_kanban_sid)
+ _kanban_router = SessionRouter(bridge=_kanban_bridge, default_session=_kanban_sid)
+ log(f"LLM: chat_bridge (session={cfg['session_id']})")
+ log(f"Kanban: chat_bridge (session={_kanban_sid})")
+else:
+ _opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
+ log(f"LLM: Hermes API ({cfg['gateway']})")
+
+
+def _call_llm(content: str, sender: str, is_group: bool = False,
+ session_id: str | None = None) -> str:
+ """Abstract LLM call. Returns raw response text (or empty string).
+ If session_id provided and differs from default, route to kanban handler."""
+ if _IS_CHAT_BRIDGE:
+ if session_id and session_id != cfg["session_id"]:
+ # Prepends kanban-handler instructions so LLM knows how to handle
+ kanban_content = (
+ "【看板处理协议】\n"
+ "收到卡片后三步判断:\n"
+ " A) 任务明确 → 直接执行 → 评论结果 + 更新状态\n"
+ " B) 信息不足 → 评论提问 + 设为 blocked\n"
+ " C) 不是我的活 → 评论说明 + 转派(如果能判断)\n"
+ "\n"
+ "汇报规则:\n"
+ " - 任务完成 → 简短 DM 给老莫摘要\n"
+ " - 评论/状态变更 → 不汇报\n"
+ " - 追问/转派 → 不汇报\n"
+ "\n"
+ "可用 API:\n"
+ " curl http://192.168.1.246:5803/api/kanban/t_xxx 查看卡片详情\n"
+ " 更新操作走 Kanban Dashboard UI\n"
+ "\n"
+ "卡片上下文不够?→ 用 session_search 查历史\n"
+ f"---\n{content}"
+ )
+ return _kanban_router.route("xmpp", sender, kanban_content) or ""
+ return _router.route("xmpp", sender, content) or ""
+ else:
+ return _call_hermes_api(content, session_id)
+
+
+def _call_hermes_api(content: str, session_id: str | None = None) -> str:
+ """POST to Hermes API, return response text or empty string."""
+ target_sid = session_id or cfg["session_id"]
+ try:
+ payload = json.dumps({
+ "model": "hermes-agent",
+ "messages": [{"role": "user", "content": content}]
+ }).encode()
+ req = urllib.request.Request(cfg["gateway"], data=payload, method="POST")
+ req.add_header("Content-Type", "application/json")
+ req.add_header("Authorization", "Bearer hermes123")
+ req.add_header("X-Hermes-Session-Id", target_sid)
+ result = _opener.open(req, timeout=600)
+ data = json.loads(result.read())
+ reply = data.get("choices", [{}])[0].get("message", {}).get("content", "")
+ return reply.strip()
+ except Exception as e:
+ log(f"!!! Hermes API error: {e}")
+ return ""
+
+
+# ═══════════════════════════════════════════════════════════════
+# EasyTier control (Windows)
+# ═══════════════════════════════════════════════════════════════
+
+_EASYTIER_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
+ "tools", "easytier")
+_EASYTIER_CORE = os.path.join(_EASYTIER_DIR, "easytier-core.exe")
+_EASYTIER_PID_FILE = os.path.join(_EASYTIER_DIR, "easytier.pid")
+_EASYTIER_NET = "--network-name mynet --network-secret ce75d0a5"
+_EASYTIER_RELAY = "--peers tcp://47.115.32.206:11010"
+_EASYTIER_IP = "--ipv4 10.144.144.3"
+
+
+def _start_easytier():
+ """Start EasyTier on Windows."""
+ import subprocess as _sp
+ if not os.path.exists(_EASYTIER_CORE):
+ log(f"EasyTier binary not found: {_EASYTIER_CORE}")
+ return
+ # Check if already running
+ if os.path.exists(_EASYTIER_PID_FILE):
try:
- data = json_mod.loads(body)
- room = data.get('to', 'coregroup@conference.yoin.fun')
- text = data.get('body', '')
- if text:
- _send_queue.append((room, text))
- self.send_response(200)
- self.end_headers()
- self.wfile.write(b'{"ok":true}')
- else:
- self.send_response(400)
- self.end_headers()
- self.wfile.write(b'{"ok":false,"error":"empty body"}')
- except Exception as e:
- self.send_response(500)
- self.end_headers()
- self.wfile.write(f'{{"ok":false,"error":"{e}"}}'.encode())
+ with open(_EASYTIER_PID_FILE) as f:
+ old_pid = int(f.read().strip())
+ _sp.run(["taskkill", "/f", "/pid", str(old_pid)], capture_output=True, timeout=5)
+ log(f"Killed old EasyTier (PID {old_pid})")
+ except Exception:
+ pass
+ # Start
+ cmd = f'start /b "" "{_EASYTIER_CORE}" {_EASYTIER_NET} {_EASYTIER_RELAY} {_EASYTIER_IP} --disable-encryption --no-listener'
+ try:
+ _sp.run(cmd, shell=True, timeout=5)
+ log("EasyTier start command issued")
+ except Exception as e:
+ log(f"EasyTier start error: {e}")
-def _run_http():
- server = HTTPServer(('127.0.0.1', HTTP_PORT), SendHandler)
- server.timeout = 1.0
+
+def _stop_easytier():
+ """Stop EasyTier on Windows."""
+ import subprocess as _sp
+ try:
+ _sp.run(["taskkill", "/f", "/im", "easytier-core.exe"], capture_output=True, timeout=5)
+ log("EasyTier stopped (taskkill)")
+ except Exception as e:
+ log(f"EasyTier stop error: {e}")
+ # Clean up PID file
+ try:
+ if os.path.exists(_EASYTIER_PID_FILE):
+ os.remove(_EASYTIER_PID_FILE)
+ except Exception:
+ pass
+
+
+def _check_easytier() -> bool:
+ """Check if EasyTier is running on Windows."""
+ import subprocess as _sp
+ try:
+ r = _sp.run(["tasklist", "/fi", "imagename eq easytier-core.exe"],
+ capture_output=True, text=True, timeout=5)
+ return "easytier-core.exe" in r.stdout
+ except Exception:
+ return False
+
+
+# ═══════════════════════════════════════════════════════════════
+# Message Dedup
+# ═══════════════════════════════════════════════════════════════
+
+_DEDUP_CACHE: set[str] = set()
+_DEDUP_LOCK = threading.Lock()
+
+
+def _is_duplicate(msg_id: str) -> bool:
+ if not msg_id:
+ return False
+ with _DEDUP_LOCK:
+ if msg_id in _DEDUP_CACHE:
+ return True
+ _DEDUP_CACHE.add(msg_id)
+ if len(_DEDUP_CACHE) > 100:
+ _DEDUP_CACHE.clear()
+ return False
+
+
+# ═══════════════════════════════════════════════════════════════
+# Coordinator Protocol (shared across all agents)
+# ═══════════════════════════════════════════════════════════════
+
+_COORDINATOR: str = "mohe"
+_GRANTED: str | None = None
+_REVOKED_UNTIL: float = 0.0
+_SHUTUP_PATTERNS = ["闭嘴", "别说话", "安静", "shut", "stfu", "别说了", "停"]
+
+
+def _process_coordinator_signals(nickname: str, body: str) -> bool:
+ """Parse coordinator/GRANT/REVOKE from incoming messages.
+ Returns True if message was a control signal (consumed, no further processing)."""
+ global _COORDINATOR, _GRANTED, _REVOKED_UNTIL
+ # 1. hmo switches coordinator
+ if nickname == 'hmo' and 'coordinator=' in body.lower():
+ for name in ('mohe', 'zhiwei', 'xxm'):
+ if f'coordinator={name}' in body.lower():
+ _COORDINATOR = name
+ _GRANTED = None
+ log(f"Coordinator switched to {name} by hmo")
+ return True
+ # 2. GRANT signal (overrides REVOKE)
+ gm = re.search(r'\[GRANT:(\w+)\]', body)
+ if gm:
+ _GRANTED = gm.group(1)
+ _REVOKED_UNTIL = 0
+ log(f"GRANT: {_GRANTED}")
+ return True
+ # 3. REVOKE signal (5min auto-restore)
+ rm = re.search(r'\[REVOKE:(\w+)\]', body)
+ if rm and rm.group(1) == cfg["nick"]:
+ _REVOKED_UNTIL = time.time() + 300
+ log(f"REVOKEd: {cfg['nick']} silenced for 5min")
+ return True
+ return False
+
+
+def _check_shutup(body: str) -> bool:
+ """hmo says shut up → 5min silence."""
+ lower = body.lower().strip()
+ for pat in _SHUTUP_PATTERNS:
+ if pat.lower() in lower:
+ _REVOKED_UNTIL = time.time() + 300
+ log(f"(shutup: '{pat}' → 5min silence)")
+ return True
+ return False
+
+
+def _process_llm_grant(response: str):
+ """Parse GRANT signal from LLM's own response."""
+ global _GRANTED
+ gm = re.search(r'\[GRANT:(\w+)\]', response)
+ if gm:
+ _GRANTED = gm.group(1)
+ _REVOKED_UNTIL = 0
+ log(f"LLM GRANT: {_GRANTED}")
+
+
+# ═══════════════════════════════════════════════════════════════
+# Response Extraction
+# ═══════════════════════════════════════════════════════════════
+
+_SILENCE_PATTERNS = [
+ "保持沉默", "不应[该]?回复", "没有.*@.*我", "不是对[我我说]",
+ "跟我无关", "我不用回复", "不该回复", "不参与",
+ "不是我[应]?该[说回]",
+]
+_EXEC_RE = re.compile(r"##exec:(.+?)##", re.DOTALL)
+_DELAY_RE = re.compile(r"##delay:?(\d+)?##")
+_DELAY_DEFAULT = 15
+_EXEC_TIMEOUT = 60
+
+
+def _strip_toolcall_xml(text: str) -> str:
+ t = text
+ t = re.sub(r']*>.*?(|$)', '', t, flags=re.DOTALL)
+ t = re.sub(r'.*?(|$)', '', t, flags=re.DOTALL)
+ t = re.sub(r']*>.*?(|$)', '', t, flags=re.DOTALL)
+ t = re.sub(r'.*?(|$)', '', t, flags=re.DOTALL)
+ return t.strip()
+
+
+def _extract_response(text: str) -> str | None:
+ """Strip __SILENT__, reasoning blocks, natural language silence.
+ Returns actual content to send, or None to stay silent."""
+ if not text:
+ return None
+ t = text.strip()
+ if not t:
+ return None
+ t = _strip_toolcall_xml(t)
+ # Natural language silence detection
+ if not t.startswith("__SILENT__"):
+ first = t.split("\n", 1)[0]
+ for pat in _SILENCE_PATTERNS:
+ if re.search(pat, first):
+ return None
+ return t
+ # Has __SILENT__ prefix
+ parts = t.split("\n", 1)
+ if len(parts) < 2:
+ return None
+ rest = parts[1].strip()
while True:
- server.handle_request()
+ m = re.match(r'^([^)]*)\s*', rest)
+ if m:
+ rest = rest[m.end():]
+ continue
+ m = re.match(r'^\([^)]*\)\s*', rest)
+ if m:
+ rest = rest[m.end():]
+ continue
+ break
+ return rest.strip() or None
-threading.Thread(target=_run_http, daemon=True).start()
-logging.info(f"🚀 {AGENT_NAME} HTTP 桥启动于 :{HTTP_PORT}")
-# ── XMPP Bot 类 ────────────────────────────────────────────────
-class AgentBot(ClientXMPP):
+# ═══════════════════════════════════════════════════════════════
+# Message Batching (3s debounce + serialized processing)
+# ═══════════════════════════════════════════════════════════════
+
+_BATCH_WINDOW = 3.0
+_BATCH_TIMEOUT = 300
+_batch_entries: dict[str, list[str]] = {}
+_batch_timers: dict[str, threading.Timer] = {}
+_batch_processing: set[str] = set()
+_batch_pending: dict[str, list[str]] = {}
+_batch_lock = threading.Lock()
+_BOT_NICK = cfg["nick"]
+
+
+def _run_command(cmd: str) -> str:
+ """Execute ##exec:command## shell command."""
+ log(f"(exec: {cmd[:120]})")
+ try:
+ import subprocess
+ r = subprocess.run(cmd, shell=True, capture_output=True,
+ timeout=_EXEC_TIMEOUT, text=True, encoding='utf-8', errors='replace')
+ out = (r.stdout or "") + (r.stderr or "")
+ out = out.strip() or f"(no output, exit={r.returncode})"
+ log(f"(exec done: {len(out)} bytes, exit={r.returncode})")
+ return out
+ except subprocess.TimeoutExpired:
+ log(f"(exec timeout >{_EXEC_TIMEOUT}s)")
+ return "(命令超时)"
+ except Exception as e:
+ log(f"(exec error: {e})")
+ return f"(命令执行失败: {e})"
+
+
+def _schedule_delayed(delay_sec: int, room: str):
+ """Schedule ##delay:N## re-invocation."""
+ global _xmpp_ref
+ import subprocess as _sp
+
+ def _fire():
+ bot = _xmpp_ref
+ if not bot:
+ return
+ try:
+ prompt = "时间到,请根据最新的信息汇报结果。"
+ raw = _call_llm(prompt, room, is_group=True)
+ reply = _extract_response(raw)
+ if reply:
+ text = reply.strip()
+ bot.send_message(mto=room, mbody=text, mtype='groupchat')
+ log(f"-> [Delay][{room}]: {text[:80]}")
+ except Exception as e:
+ log(f"!! delay err: {e}")
+
+ t = threading.Timer(delay_sec, _fire)
+ t.daemon = True
+ t.start()
+ log(f"(delay +{delay_sec}s → {room})")
+
+
+def _batch_done(room: str):
+ """Called when batch LLM finishes. Flush pending if any."""
+ with _batch_lock:
+ _batch_processing.discard(room)
+ pending = _batch_pending.pop(room, None)
+ if pending:
+ _batch_entries[room] = pending
+ t = threading.Timer(0.1, _fire_batch, args=[room])
+ t.daemon = True
+ t.start()
+ _batch_timers[room] = t
+ return
+ log(f"[Batch][{room}] (idle)")
+
+
+def _fire_batch(room: str):
+ """Collect batched entries and call LLM."""
+ with _batch_lock:
+ entries = _batch_entries.pop(room, None)
+ _batch_timers.pop(room, None)
+ if not entries:
+ return
+ _batch_processing.add(room)
+ combined = "\n".join(entries)
+
+ def _handle():
+ timed_out = [False]
+
+ def _timeout():
+ timed_out[0] = True
+ log(f"[Batch][{room}] TIMEOUT ({_BATCH_TIMEOUT}s)")
+ _batch_done(room)
+
+ timer = threading.Timer(_BATCH_TIMEOUT, _timeout)
+ timer.daemon = True
+ timer.start()
+ try:
+ raw = _call_llm(combined, room, is_group=True)
+ if not timed_out[0]:
+ timer.cancel()
+ _process_llm_reply(raw, room)
+ else:
+ log(f"[Batch][{room}] route returned after timeout, discarded")
+ except Exception as e:
+ log(f"!!! BATCH: {e}")
+ if not timed_out[0]:
+ timer.cancel()
+ _batch_done(room)
+
+ threading.Thread(target=_handle, daemon=True).start()
+
+
+def _batch_group_message(room: str, nickname: str, body: str) -> bool:
+ """Add message to room batch. Returns True if batched, False if @mention (immediate)."""
+ if f"@{_BOT_NICK}" in body or body.startswith(_BOT_NICK):
+ return False
+ formatted = f"[{nickname}]: {body}"
+ with _batch_lock:
+ if room in _batch_processing:
+ _batch_pending.setdefault(room, []).append(formatted)
+ return True
+ timer = _batch_timers.pop(room, None)
+ if timer:
+ timer.cancel()
+ _batch_entries.setdefault(room, []).append(formatted)
+ t = threading.Timer(_BATCH_WINDOW, _fire_batch, args=[room])
+ t.daemon = True
+ t.start()
+ _batch_timers[room] = t
+ return True
+
+
+# ═══════════════════════════════════════════════════════════════
+# MAM Recovery — fetch recent history on startup
+# ═══════════════════════════════════════════════════════════════
+
+_MAM_RECOVERY = True
+_MAM_RECOVERY_LOCK = threading.Lock()
+_MAM_MARK_DONE = False
+_MAM_TIMEOUT = 30
+
+
+def _set_mam_done():
+ global _MAM_RECOVERY
+ with _MAM_RECOVERY_LOCK:
+ _MAM_RECOVERY = False
+
+
+def _is_mam_recovery() -> bool:
+ if time.time() - _START_TIME > _MAM_TIMEOUT:
+ global _MAM_RECOVERY
+ with _MAM_RECOVERY_LOCK:
+ if _MAM_RECOVERY:
+ _MAM_RECOVERY = False
+ log("(MAM recovery timed out, force-disabled)")
+ return _MAM_RECOVERY
+ with _MAM_RECOVERY_LOCK:
+ return _MAM_RECOVERY
+
+
+# ═══════════════════════════════════════════════════════════════
+# XML Escape
+# ═══════════════════════════════════════════════════════════════
+
+def _escape(text: str) -> str:
+ return (text.replace("&", "&").replace("<", "<")
+ .replace(">", ">").replace('"', """))
+
+
+# ═══════════════════════════════════════════════════════════════
+# HTTP Bridge — health, presence, messages, POST send
+# ═══════════════════════════════════════════════════════════════
+
+_HTTP_PORT = cfg["http_port"]
+_MSG_BUF: list[dict] = []
+_MSG_BUF_LOCK = threading.Lock()
+_xmpp_ref = None # set after bot creation
+
+
+def _record_group_msg(nickname: str, body: str):
+ ts = time.strftime("%H:%M:%S")
+ with _MSG_BUF_LOCK:
+ _MSG_BUF.append({"ts": ts, "from": nickname, "body": body})
+ if len(_MSG_BUF) > 200:
+ _MSG_BUF[:] = _MSG_BUF[-150:]
+
+
+class _BridgeHandler(http.server.BaseHTTPRequestHandler):
+ def do_GET(self):
+ parsed = urllib.parse.urlparse(self.path)
+ if parsed.path == "/muc":
+ try:
+ muc_info = {"rooms": {}}
+ bot = _xmpp_ref
+ if bot is not None and 'xep_0045' in bot.plugin:
+ muc_plugin = bot.plugin['xep_0045']
+ for room_jid in cfg["muc_rooms"]:
+ room_data = {"jid": room_jid, "participants": []}
+ try:
+ if room_jid in muc_plugin.rooms:
+ room = muc_plugin.rooms[room_jid]
+ for nick, info in room.get('roster', {}).items():
+ room_data["participants"].append({
+ "nick": nick,
+ "jid": str(info.get('jid', '')),
+ "affiliation": str(info.get('affiliation', '')),
+ "role": str(info.get('role', '')),
+ })
+ except Exception as e:
+ room_data["error"] = str(e)
+ muc_info["rooms"][room_jid] = room_data
+ self._reply(200, muc_info)
+ except Exception as e:
+ self._reply(500, {"ok": False, "error": str(e)})
+ return
+ if parsed.path == "/health":
+ try:
+ bot = _xmpp_ref
+ session_ok = bot.session_started_event.is_set() if (bot and hasattr(bot, 'session_started_event')) else False
+ socket_ok = bot.is_connected() if (bot and hasattr(bot, 'is_connected')) else False
+ self._reply(200, {
+ "ok": True, "xmpp_connected": session_ok or socket_ok,
+ "agent": _agent_name, "jid": cfg["jid"],
+ "uptime_sec": int(time.time() - _START_TIME),
+ "muc_rooms": cfg["muc_rooms"],
+ })
+ except Exception as e:
+ self._reply(500, {"ok": False, "error": str(e)})
+ return
+ if parsed.path.startswith("/presence"):
+ jid_to_check = parsed.path[len("/presence/"):].strip()
+ if not jid_to_check:
+ self._reply(400, {"ok": False, "error": "missing JID"})
+ return
+ try:
+ info = {"jid": jid_to_check, "online": False, "resources": []}
+ bot = _xmpp_ref
+ if bot and hasattr(bot, 'client_roster'):
+ roster = bot.client_roster
+ if jid_to_check in roster:
+ resources = list(roster[jid_to_check].resources.keys())
+ info["online"] = len(resources) > 0
+ info["resources"] = resources
+ self._reply(200, info)
+ except Exception as e:
+ self._reply(500, {"ok": False, "error": str(e)})
+ return
+ if parsed.path == "/messages":
+ try:
+ qs = urllib.parse.parse_qs(parsed.query)
+ sender = qs.get("from", [None])[0]
+ with _MSG_BUF_LOCK:
+ msgs = list(_MSG_BUF)
+ if sender:
+ msgs = [m for m in msgs if m["from"] == sender]
+ self._reply(200, {"ok": True, "count": len(msgs), "messages": msgs[-50:]})
+ except Exception as e:
+ self._reply(500, {"ok": False, "error": str(e)})
+ return
+ self._reply(404, {"ok": False, "error": "not found"})
+
+ def do_POST(self):
+ try:
+ length = int(self.headers.get('Content-Length', 0))
+ body = json.loads(self.rfile.read(length))
+ path = urllib.parse.urlparse(self.path).path.rstrip('/')
+
+ # /easytier endpoint — execute EasyTier action locally (no XMPP DM)
+ if path == "/easytier":
+ action = body.get("action", "")
+ if action == "start":
+ _start_easytier()
+ self._reply(200, {"ok": True, "message": "EasyTier started"})
+ elif action == "stop":
+ _stop_easytier()
+ self._reply(200, {"ok": True, "message": "EasyTier stopped"})
+ elif action == "status":
+ running = _check_easytier()
+ self._reply(200, {"ok": True, "running": running})
+ else:
+ self._reply(400, {"ok": False, "error": "action must be start|stop|status"})
+ return
+
+ to = body.get('to', cfg["muc_rooms"][0])
+ msg = body.get('message', '') or body.get('body', '')
+ msg_type = body.get('type', 'groupchat')
+ if not msg:
+ self._reply(400, {"ok": False, "error": "empty message"})
+ return
+ safe = _escape(msg.strip())
+ bot = _xmpp_ref
+ if bot:
+ bot.send_message(mto=to, mbody=msg.strip(), mtype=msg_type)
+ _record_group_msg(cfg["nick"], msg)
+ log(f"[http] → [{to.split('@')[0]}]: {msg[:80]} (type={msg_type})")
+ self._reply(200, {"ok": True})
+ except Exception as e:
+ self._reply(500, {"ok": False, "error": str(e)})
+
+ def _reply(self, code, data):
+ body = json.dumps(data, ensure_ascii=False).encode('utf-8')
+ self.send_response(code)
+ self.send_header('Content-Type', 'application/json; charset=utf-8')
+ self.send_header('Content-Length', len(body))
+ self.end_headers()
+ self.wfile.write(body)
+
+ def log_message(self, format, *args):
+ pass
+
+
+def _start_http_bridge():
+ _httpd = http.server.HTTPServer(('0.0.0.0', _HTTP_PORT), _BridgeHandler)
+ _t = threading.Thread(target=_httpd.serve_forever, daemon=True)
+ _t.start()
+ log(f"HTTP bridge ready on :{_HTTP_PORT}")
+
+
+# ═══════════════════════════════════════════════════════════════
+# Reply Processing (shared for all LLM responses)
+# ═══════════════════════════════════════════════════════════════
+
+def _process_llm_reply(raw_reply: str, room: str):
+ """Process LLM response: check silence/delay/exec/send."""
+ global _xmpp_ref
+ if not raw_reply:
+ _batch_done(room)
+ return
+ # Parse GRANT signal from LLM response
+ _process_llm_grant(raw_reply)
+ # ##delay:N## → schedule later
+ delay_m = _DELAY_RE.search(raw_reply)
+ if delay_m:
+ sec = int(delay_m.group(1)) if delay_m.group(1) else _DELAY_DEFAULT
+ _schedule_delayed(sec, room)
+ _batch_done(room)
+ return
+ # ##exec:command## → run command, use output as reply
+ exec_m = _EXEC_RE.search(raw_reply)
+ if exec_m:
+ output = _run_command(exec_m.group(1))
+ raw_reply = _EXEC_RE.sub(output, raw_reply, count=1)
+ # Extract actual response
+ reply_text = _extract_response(raw_reply)
+ if reply_text:
+ text = reply_text.strip()
+ bot = _xmpp_ref
+ if bot:
+ bot.send_message(mto=room, mbody=text, mtype='groupchat')
+ log(f"-> [{room.split('@')[0]}]: {text[:80]}")
+ else:
+ log(f"-> [{room.split('@')[0]}]: (silent)")
+ _batch_done(room)
+
+
+# ═══════════════════════════════════════════════════════════════
+# Group message handler
+# ═══════════════════════════════════════════════════════════════
+
+def _handle_group_message(msg):
+ """Process a groupchat message (runs in thread)."""
+ global _COORDINATOR, _GRANTED, _REVOKED_UNTIL
+ if _is_mam_recovery():
+ return
+ msg_id = msg.get("id", "")
+ if _is_duplicate(msg_id):
+ return
+ body = str(msg["body"]).strip()
+ if not body:
+ return
+ full_from = str(msg["from"])
+ room = full_from.split("/")[0]
+ nickname = full_from.split("/")[1] if "/" in full_from else ""
+ # Self-message skip
+ if nickname == cfg["nick"]:
+ log(f"(self) {body[:80]}")
+ return
+ _record_group_msg(nickname, body)
+ # Coordinator signals
+ if _process_coordinator_signals(nickname, body):
+ return
+ # Revoke check
+ is_revoked = time.time() < _REVOKED_UNTIL
+ if is_revoked and _GRANTED == cfg["nick"]:
+ _GRANTED = None
+ is_revoked = False
+ log(f"GRANT overrides REVOKE for {cfg['nick']}")
+ if _check_shutup(body):
+ return
+ if is_revoked:
+ body = f"【只读消息】你被收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {room}] {nickname} 说: {body}"
+ # Batch or immediate (@mention)
+ if _batch_group_message(room, nickname, body):
+ log(f"[{room.split('@')[0]}] {nickname}: {body[:80]} (batched)")
+ return
+ log(f"[{room.split('@')[0]}] {nickname}: {body[:80]}")
+ raw = _call_llm(body, full_from, is_group=True)
+ _process_llm_reply(raw, room)
+
+
+# ═══════════════════════════════════════════════════════════════
+# Private message handler
+# ═══════════════════════════════════════════════════════════════
+
+def _handle_private_message(msg):
+ """Process a private chat message."""
+ global _CALL_SEQ
+ if msg["type"] == "groupchat":
+ return
+ msg_id = msg.get("id", "")
+ if _is_duplicate(msg_id):
+ return
+ body = str(msg["body"]).strip()
+ sender = str(msg["from"]).split("/")[0]
+ log(f"<{sender}> {body[:80]}")
+ if sender == cfg["jid"]:
+ log("(skipped self)")
+ return
+ if time.time() < _REVOKED_UNTIL:
+ log(f"(silenced) <{sender}> dropped")
+ return
+ if _check_shutup(body):
+ return
+ # ── Kanban routing ──
+ _CALL_SEQ += 1
+ is_kanban = body.startswith('[Kanban]')
+ target_sid = _KANBAN_SESSION_ID if is_kanban else cfg["session_id"]
+ if is_kanban:
+ log(f"📋 看板通知(#{_CALL_SEQ}): {body[:80]}")
+ # ── EasyTier toggle ──
+ if body.startswith('[EasyTier]'):
+ action = body.replace('[EasyTier]', '').strip().lower()
+ log(f"🔌 EasyTier command: {action}")
+ if action == 'start':
+ _start_easytier()
+ reply_text = "[EasyTier] started on Windows"
+ elif action == 'stop':
+ _stop_easytier()
+ reply_text = "[EasyTier] stopped on Windows"
+ else:
+ reply_text = f"[EasyTier] unknown action: {action}"
+ bot = _xmpp_ref
+ if bot:
+ bot.send_message(mto=sender, mbody=reply_text, mtype='chat')
+ log(f"-> {sender}: {reply_text}")
+ return
+ raw = _call_llm(body, sender, is_group=False, session_id=target_sid)
+ if raw:
+ reply = _extract_response(raw)
+ if reply:
+ text = reply.strip()
+ bot = _xmpp_ref
+ if bot:
+ bot.send_message(mto=sender, mbody=text, mtype='chat')
+ log(f"-> {sender}: {text[:80]}")
+
+
+# ═══════════════════════════════════════════════════════════════
+# AgentBot Class
+# ═══════════════════════════════════════════════════════════════
+
+import slixmpp
+
+
+class AgentBot(slixmpp.ClientXMPP):
def __init__(self):
- super().__init__(AGENT_JID, cfg["password"])
- self.add_event_handler('session_bind', self.on_bind)
- self.add_event_handler('message', self.on_msg)
- self.add_event_handler('disconnected', self.on_disconnect)
- self.add_event_handler('connected', self.on_connected)
+ super().__init__(cfg["jid"], cfg["password"])
+ # Connection settings
+ self.enable_direct_tls = False
+ self.enable_starttls = True
+ self.auto_reconnect = True
+ self.reconnect_max_delay = 10
+ self.whitespace_keepalive = True
+ self.whitespace_keepalive_interval = 30
+ # SSL: accept self-signed certs
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
self.ssl_context = ctx
- self.ready = asyncio.Event()
- self._call_seq = 0
- self._muc_joined = False
- self._recent_sent = []
- self._coordinator = 'mohe' # 默认协调者
- self._granted = None
+ # Event handlers
+ self.add_event_handler("session_start", self._on_session_start)
+ self.add_event_handler("message", self._on_any_message)
+ self.add_event_handler("groupchat_message", self._on_group_msg)
+ self.add_event_handler("disconnected", self._on_disconnected)
+ self.add_event_handler("connected", self._on_connected)
+ self.add_event_handler("session_end", self._on_session_end)
+ self.add_event_handler("connection_failed", self._on_conn_failed)
+ # MUC plugin
+ self.register_plugin('xep_0045')
- async def on_connected(self, event):
- logging.info(f"🔗 {AGENT_NAME} TCP连接已建立")
+ def _on_connected(self, event):
+ log("connection established")
- async def on_bind(self, event):
+ def _on_session_start(self, event):
self.send_presence()
self.get_roster()
+ log(f"{cfg['jid']} online")
+ # Register MAM plugin lazily
try:
- self.plugin['xep_0045'].join_muc('coregroup@conference.yoin.fun', AGENT_NICK)
- logging.info(f"✅ {AGENT_NAME} 加入群聊 coregroup")
- except Exception as e:
- logging.error(f"❌ {AGENT_NAME} 加入群聊失败: {e}")
- self._muc_joined = True
- self.ready.set()
- logging.info(f"✅ {AGENT_NAME} XMPP 上线")
-
- async def on_disconnect(self, event):
- self.ready.clear()
- self._muc_joined = False
- logging.warning(f"⚠️ {AGENT_NAME} XMPP 断线")
-
- async def on_msg(self, msg):
- body = msg['body']
- sender = str(msg['from'])
- msg_type = msg['type']
- if not body:
- return
- if msg_type == 'groupchat':
- if AGENT_JID in sender:
- return
- nickname = sender.split('/')[-1] if '/' in sender else ''
- # 自己的消息跳过(通过昵称)
- if nickname == AGENT_NICK:
- return
-
- # Coordinator 模式 — 全走 XMPP 消息
-
- # 1. hmo 切换 coordinator(lead=xxx)
- if nickname == 'hmo':
- if 'lead=' in body.lower():
- for _name in ['mohe', 'zhiwei', 'xxm']:
- if f'lead={_name}' in body.lower():
- self._coordinator = _name
- self._granted = None
- logging.info(f"👑 Coordinator 切换为 {_name}")
- break
- # hmo 直接 @点名 → 临时授权(一次)
- elif any(tag in body for tag in [f'@{AGENT_NICK}', f'@{AGENT_NAME}']):
- self._granted = AGENT_NICK
- logging.info(f"🎤 被 hmo 点名,获得发言权")
-
- # 2. 检测授权信号(优先于收回,GRANT 可以覆盖 REVOKE)
- _grant_match = re.search(r'\[GRANT:(\w+)\]', body)
- if _grant_match:
- self._granted = _grant_match.group(1)
- self._revoked_until = 0 # 被授权时解除收回
- logging.info(f"🎤 收到授权:{self._granted}")
-
- # 3. 检测收回信号
- _revoke_match = re.search(r'\[REVOKE:(\w+)\]', body)
- _revoked_until = getattr(self, '_revoked_until', 0)
- if _revoke_match and _revoke_match.group(1) == AGENT_NICK:
- self._revoked_until = time.time() + 300 # 5分钟自动解除
- logging.info(f"🔇 {AGENT_NAME} 发言权被收回(5分钟后自动恢复)")
- if time.time() < _revoked_until:
- # 被收回者:只读模式,能看到但不能回复
- _rr = sender.split('/')[0]
- _readonly_body = f"【只读消息】你目前被暂时收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {_rr}] {nickname} 说: {body}"
- await self.call_hermes(_readonly_body, sender, is_group=True)
- return
-
- # 3. 判断角色
- _coordinator = getattr(self, '_coordinator', 'mohe')
- _granted = getattr(self, '_granted', None)
- _is_coordinator = (_coordinator == AGENT_NICK)
- _is_granted = (_granted == AGENT_NICK)
-
- if _is_granted:
- self._granted = None # 用完即收回
-
- room = sender.split('/')[0]
-
- # 4. 判断权限:协调者/被授权者可发言,其他人只读
- if not _is_coordinator and not _is_granted:
- _rr = sender.split('/')[0]
- _ro_body = f"【只读消息】你目前不是协调者,只需了解内容。\n\n[核心群 {_rr}] {nickname} 说: {body}"
+ self.register_plugin('xep_0313')
+ except Exception:
+ log("(MAM: xep_0313 not available)")
+ # Join MUC rooms
+ async def _join_all():
+ for room_jid in cfg["muc_rooms"]:
try:
- _ro_payload = json.dumps({
- "model": "hermes-agent",
- "messages": [{"role": "user", "content": _ro_body}]
- }).encode()
- _ro_req = urllib.request.Request(GATEWAY, data=_ro_payload, method="POST")
- _ro_req.add_header("Content-Type", "application/json")
- _ro_req.add_header("Authorization", f"Bearer {API_KEY}")
- _ro_req.add_header("X-Hermes-Session-Id", SESSION_ID)
- _ro_loop = asyncio.get_event_loop()
- await _ro_loop.run_in_executor(None, lambda: _opener.open(_ro_req, timeout=30))
- except Exception:
- pass
- return # 不发送任何回复
+ self.plugin['xep_0045'].join_muc(room_jid, cfg["nick"])
+ presence = (
+ f""
+ f""
+ f""
+ f""
+ )
+ self.send_raw(presence)
+ log(f"Joined {room_jid}")
+ except Exception as e:
+ log(f"MUC join failed {room_jid}: {e}")
+ await asyncio.sleep(2)
+ await asyncio.sleep(3)
+ await self._fetch_mam_history()
+ asyncio.ensure_future(_join_all())
- # 5. 被 REVOKE 的人:只读(虽然上面已经覆盖了,作为额外保障)
- if time.time() < getattr(self, '_revoked_until', 0):
- _rr2 = sender.split('/')[0]
- _rv_body = f"【只读消息】你目前被收回发言权。只需了解内容。\n\n[核心群 {_rr2}] {nickname} 说: {body}"
- try:
- _rv_payload = json.dumps({
- "model": "hermes-agent",
- "messages": [{"role": "user", "content": _rv_body}]
- }).encode()
- _rv_req = urllib.request.Request(GATEWAY, data=_rv_payload, method="POST")
- _rv_req.add_header("Content-Type", "application/json")
- _rv_req.add_header("Authorization", f"Bearer {API_KEY}")
- _rv_req.add_header("X-Hermes-Session-Id", SESSION_ID)
- _rv_loop = asyncio.get_event_loop()
- await _rv_loop.run_in_executor(None, lambda: _opener.open(_rv_req, timeout=30))
- except Exception:
- pass
- return
-
- # 硬闭嘴闸门:hmo 说闭嘴类的话 → 静默 5 分钟
- _silent_until = getattr(self, '_silent_until', 0)
- if time.time() < _silent_until:
- return
- if nickname == 'hmo':
- _sk = ['闭嘴', '别说话', '安静', 'shut', 'stfu', '别说了', '停']
- if any(kw in body.lower() for kw in _sk):
- self._silent_until = time.time() + 300
- logging.info(f"🔇 {AGENT_NAME} 收到闭嘴指令,静默 5 分钟")
- return
-
- logging.info(f"📩 群消息 [{sender}]: {body[:100]}")
- room = sender.split('/')[0]
- ctx_body = (
- "【规则】以下是一条群聊消息。判断是否应该回复。\n"
- "只有以下3种情况你才回复:\n"
- f"1. hmo直接点名问你({AGENT_MENTION})\n"
- "2. 你有其他人没说过的独家信息\n"
- "3. 别人说错了关键事实,不纠正会有后果\n"
- "如果以上都不符合,你的回复必须只包含 __SILENT__ 这10个字符,"
- "不要有任何其他内容(不要前缀、不要解释、不要标点、不要空格)。\n\n"
- "注意:你是协调者(lead)。你的第一职责是管理讨论节奏,不是自己说话。\n"
- "- 别人能回答的问题,不抢答。\n"
- "- 如果其他 Agent 更合适,用 [GRANT:agent名] 授权他们发言(例如 [GRANT:zhiwei])。\n"
- "- hmo 直接 @点名某人 也会自动授权。\n"
- "- 如果有人跑题/刷屏,用 [REVOKE:agent名] 收回发言权(例如 [REVOKE:zhiwei])。\n"
- "- [GRANT] 可以覆盖 [REVOKE]。标记会显示在消息中。\n\n"
- f"[核心群 {room}] {nickname} 说: {body}"
- )
- await self.call_hermes(ctx_body, room, is_group=True)
+ async def _fetch_mam_history(self):
+ """Query MAM for recent MUC messages to rebuild context."""
+ if 'xep_0313' not in self.plugin:
+ log("(MAM: no plugin)")
+ _set_mam_done()
return
- if msg_type == 'chat' and 'hmo@yoin.fun' in sender:
- self._call_seq += 1
- if body.startswith('[Kanban]'):
- # 看板通知 → 走独立 session,不污染主会话
- target_sid = KANBAN_SESSION_ID
- logging.info(f"📋 看板通知(#{self._call_seq}): {body[:80]}")
- else:
- target_sid = SESSION_ID
- logging.info(f"📩 老爸(#{self._call_seq}): {body[:80]}")
- await self.call_hermes(body, sender, seq=self._call_seq, session_id=target_sid)
-
- async def call_hermes(self, content, sender, is_group=False, seq=None, session_id=None):
- msg_type = 'groupchat' if is_group else 'chat'
- sid = session_id or SESSION_ID
+ # MAM recovery used in _on_session_start
try:
- payload = json.dumps({
- "model": "hermes-agent",
- "messages": [{"role": "user", "content": content}]
- }).encode()
- req = urllib.request.Request(GATEWAY, data=payload, method="POST")
- req.add_header("Content-Type", "application/json")
- req.add_header("Authorization", f"Bearer {API_KEY}")
- req.add_header("X-Hermes-Session-Id", sid)
-
- loop = asyncio.get_event_loop()
- result = await loop.run_in_executor(None, lambda: _opener.open(req, timeout=600))
-
- if seq is not None and seq < self._call_seq:
- return
-
- data = json.loads(result.read())
- reply = data.get("choices", [{}])[0].get("message", {}).get("content", "")
- reply_stripped = reply.strip()
- # 检查 __SILENT__ 标记
- if reply_stripped.startswith('__SILENT__') or reply_stripped.startswith('`__SILENT__`'):
- logging.info(f"⏭️ {AGENT_NAME} 决定沉默,不发送")
- return
- # 如果回复里任意位置出现了 __SILENT__,说明 LLM 没理解协议,整条作废
- if '__SILENT__' in reply:
- logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __SILENT__,拦截")
- return
- # 如果回复里出现了 __REPLY__,也是协议混淆,拦截
- if '__REPLY__' in reply:
- logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __REPLY__,拦截")
- return
- # 额外拦截:LLM 说"我沉默""我不说了"等宣布沉默的话→当 SILENT 处理
- _silent_phrases = ['我沉默', '我不说', '不说了', '不回复', '不插嘴', '我闭嘴',
- '闭嘴上', '沉默是', '彻底沉默', '我会沉默', '将保持沉默']
- if any(p in reply for p in _silent_phrases):
- logging.info(f"⏭️ {AGENT_NAME} 宣布沉默(命中关键词),拦截")
- return
- finish = data.get("choices", [{}])[0].get("finish_reason", "")
-
- if reply.strip() and finish != "silent":
- # Coordinator 授权机制:检测回复中的 [GRANT:xxx] 标记
- _grant_match = re.search(r'\[GRANT:(\w+)\]', reply)
- if _grant_match:
- _grant_name = _grant_match.group(1)
- self._granted = _grant_name
- logging.info(f"🎤 授权 {_grant_name} 发言(通过 XMPP 发送)")
- # 不剥离标记,让其他 bot 从 XMPP 消息中解析
-
- if msg_type == 'groupchat':
- self.send_message(mto=sender, mbody=reply, mtype='groupchat')
- # 防回声:记录已发送的消息正文
- sent_norm = reply.strip()[:100]
- self._recent_sent.append(sent_norm)
- if len(self._recent_sent) > 10:
- self._recent_sent.pop(0)
- else:
- import subprocess as sp
- from xml.sax.saxutils import escape
- safe = escape(reply)
- sp.run([
- "docker", "exec", "ejabberd", "ejabberdctl", "send_stanza",
- AGENT_JID, str(sender),
- f"{safe}"
- ], capture_output=True, timeout=10)
- logging.info(f"✅ {AGENT_NAME} 回复: {reply[:80]}")
- except Exception as e:
- logging.error(f"❌ {AGENT_NAME} 错误: {e}")
-
-# ── 主入口 ───────────────────────────────────────────────
-async def main():
- retry_delay = 1
- max_delay = 60
- while True:
- try:
- bot = AgentBot()
- bot.register_plugin('xep_0030')
- bot.register_plugin('xep_0045')
- bot.register_plugin('xep_0199')
-
- bot.connect(host='127.0.0.1', port=5222)
- await asyncio.wait_for(bot.ready.wait(), timeout=30)
- logging.info(f"{AGENT_NAME} XMPP 就绪")
- retry_delay = 1
-
- async def _drain_queue():
- while True:
- await asyncio.sleep(1)
- while _send_queue:
- room, text = _send_queue.pop(0)
+ for room_jid in cfg["muc_rooms"]:
+ log(f"(MAM: querying {room_jid} for last 50 messages...)")
+ results = await self.plugin['xep_0313'].retrieve(
+ jid=room_jid, rsm={'max': 50},
+ )
+ count = 0
+ for msg in results['mam']['results']:
+ forwarded = msg['mam_result']['forwarded']
+ body = str(forwarded['stanza']['body'] or '').strip()
+ if not body:
+ continue
+ nick = str(forwarded['stanza']['from']).split('/')[-1] if '/' in str(forwarded['stanza']['from']) else '?'
+ # Feed into context for chat_bridge (xxm)
+ if _IS_CHAT_BRIDGE:
+ role = 'user' if nick != cfg["nick"] else 'assistant'
try:
- bot.send_message(mto=room, mbody=text, mtype='groupchat')
- sent_norm = text.strip()[:100]
- bot._recent_sent.append(sent_norm)
- if len(bot._recent_sent) > 10:
- bot._recent_sent.pop(0)
- logging.info(f"📤 主动发送到 {room}: {text[:60]}")
- except Exception as e:
- logging.error(f"❌ 主动发送失败: {e}")
- asyncio.create_task(_drain_queue())
-
- while True:
- await asyncio.sleep(15)
- if not bot.is_connected():
- logging.warning("检测到断线,准备重连...")
- break
-
- except asyncio.TimeoutError:
- logging.warning("连接超时,准备重连...")
+ _bridge._append_to_log(role, f"[{nick}]: {body[:300]}")
+ except Exception:
+ pass
+ count += 1
+ log(f"(MAM: loaded {count} msgs from {room_jid})")
+ _set_mam_done()
+ log("(MAM recovery complete)")
except Exception as e:
- logging.error(f"❌ 主循环错误: {e}")
+ log(f"(MAM error: {e})")
+ _set_mam_done()
- logging.info(f"⏳ 等待 {retry_delay} 秒后重连...")
- await asyncio.sleep(retry_delay)
- retry_delay = min(retry_delay * 2, max_delay)
+ def _on_group_msg(self, msg):
+ threading.Thread(target=_handle_group_message, args=[msg], daemon=True).start()
+
+ def _on_any_message(self, msg):
+ threading.Thread(target=_handle_private_message, args=[msg], daemon=True).start()
+
+ def _on_session_end(self, event):
+ log(f"session ended")
+
+ def _on_conn_failed(self, event):
+ log(f"connection failed: {event}")
+
+ def _on_disconnected(self, event):
+ log(f"disconnected, reconnecting...")
+
+
+# ═══════════════════════════════════════════════════════════════
+# Main
+# ═══════════════════════════════════════════════════════════════
+
+def main():
+ log(f"Starting {cfg['jid']} ({cfg['name_cn']}) — agent={_agent_name}")
+ if _IS_CHAT_BRIDGE:
+ log(f" LLM: chat_bridge (session={cfg['session_id']})")
+ else:
+ log(f" LLM: Hermes API ({cfg['gateway']})")
+ log(f" Server: {cfg['server']}:{cfg['port']}")
+ log(f" Rooms: {cfg['muc_rooms']}")
+
+ bot = AgentBot()
+ global _xmpp_ref
+ _xmpp_ref = bot
+
+ _start_http_bridge()
+
+ bot.connect(host=cfg["server"], port=cfg["port"])
+ log(f"Connecting {cfg['jid']}@{cfg['server']}:{cfg['port']}")
+
+ loop = asyncio.get_event_loop()
+
+ async def _status_check():
+ while True:
+ await asyncio.sleep(60)
+ log("(alive)")
+
+ asyncio.ensure_future(_status_check())
-if __name__ == '__main__':
try:
- asyncio.run(main())
+ loop.run_forever()
except KeyboardInterrupt:
- pass
+ log("Shutdown by user")
+ except Exception as e:
+ log(f"!!! MAIN LOOP CRASH: {e}")
+ import traceback
+ log(f"!!! {traceback.format_exc()[:500]}")
+ raise
+
+
+if __name__ == "__main__":
+ main()