#!/usr/bin/env python3 """XMPP Bot - 统一版,支持 --agent mohe|zhiwei|xiao 参数""" import asyncio, logging, ssl, json, urllib.request, os, time, sys, re from slixmpp import ClientXMPP # ── Agent 配置 ────────────────────────────────────────────── AGENTS = { "mohe": { "jid": "mohe@yoin.fun", "password": "hermes123", "nick": "mohe", "name_cn": "莫荷", "http_port": 5804, "gateway": "http://localhost:8642/v1/chat/completions", "session_id": "xmpp-mohe-v2", "mention": "@mohe/@莫荷", }, "zhiwei": { "jid": "zhiwei@yoin.fun", "password": "hermes123", "nick": "zhiwei", "name_cn": "知微", "http_port": 5805, "gateway": "http://localhost:8643/v1/chat/completions", "session_id": "xmpp-zhiwei", "mention": "@zhiwei/@知微", }, "xiaoguo": { "jid": "xiaoguo@yoin.fun", "password": "hermes123", "nick": "xiaoguo", "name_cn": "小果", "http_port": 5806, "gateway": "http://localhost:8645/v1/chat/completions", "session_id": "xmpp-xiaoguo", "mention": "@xiaoguo/@小果", }, } agent = sys.argv[sys.argv.index("--agent") + 1] if "--agent" in sys.argv else "mohe" cfg = AGENTS.get(agent, AGENTS["mohe"]) logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s') GATEWAY = cfg["gateway"] API_KEY = "hermes123" AGENT_NICK = cfg["nick"] AGENT_NAME = cfg["name_cn"] AGENT_JID = cfg["jid"] AGENT_MENTION = cfg["mention"] SESSION_ID = cfg["session_id"] HTTP_PORT = cfg["http_port"] _opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) # ── HTTP 桥(接收本地脚本的主动发送请求) ── from http.server import HTTPServer, BaseHTTPRequestHandler import threading, json as json_mod _send_queue = [] class SendHandler(BaseHTTPRequestHandler): def do_POST(self): length = int(self.headers.get('Content-Length', 0)) body = self.rfile.read(length) try: data = json_mod.loads(body) room = data.get('to', 'coregroup@conference.yoin.fun') text = data.get('body', '') if text: _send_queue.append((room, text)) self.send_response(200) self.end_headers() self.wfile.write(b'{"ok":true}') else: self.send_response(400) self.end_headers() self.wfile.write(b'{"ok":false,"error":"empty body"}') except Exception as e: self.send_response(500) self.end_headers() self.wfile.write(f'{{"ok":false,"error":"{e}"}}'.encode()) def _run_http(): server = HTTPServer(('127.0.0.1', HTTP_PORT), SendHandler) server.timeout = 1.0 while True: server.handle_request() threading.Thread(target=_run_http, daemon=True).start() logging.info(f"🚀 {AGENT_NAME} HTTP 桥启动于 :{HTTP_PORT}") # ── XMPP Bot 类 ──────────────────────────────────────────────── class AgentBot(ClientXMPP): def __init__(self): super().__init__(AGENT_JID, cfg["password"]) self.add_event_handler('session_bind', self.on_bind) self.add_event_handler('message', self.on_msg) self.add_event_handler('disconnected', self.on_disconnect) self.add_event_handler('connected', self.on_connected) ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE self.ssl_context = ctx self.ready = asyncio.Event() self._call_seq = 0 self._muc_joined = False self._recent_sent = [] self._coordinator = 'mohe' # 默认协调者 self._granted = None async def on_connected(self, event): logging.info(f"🔗 {AGENT_NAME} TCP连接已建立") async def on_bind(self, event): self.send_presence() self.get_roster() try: self.plugin['xep_0045'].join_muc('coregroup@conference.yoin.fun', AGENT_NICK) logging.info(f"✅ {AGENT_NAME} 加入群聊 coregroup") except Exception as e: logging.error(f"❌ {AGENT_NAME} 加入群聊失败: {e}") self._muc_joined = True self.ready.set() logging.info(f"✅ {AGENT_NAME} XMPP 上线") async def on_disconnect(self, event): self.ready.clear() self._muc_joined = False logging.warning(f"⚠️ {AGENT_NAME} XMPP 断线") async def on_msg(self, msg): body = msg['body'] sender = str(msg['from']) msg_type = msg['type'] if not body: return if msg_type == 'groupchat': if AGENT_JID in sender: return nickname = sender.split('/')[-1] if '/' in sender else '' # 自己的消息跳过(通过昵称) if nickname == AGENT_NICK: return # Coordinator 模式 — 全走 XMPP 消息,不依赖共享 DB # 1. hmo 切换 coordinator → 所有 bot 检测,各自更新本地状态 if nickname == 'hmo' and 'coordinator=' in body.lower(): for _name in ['mohe', 'zhiwei', 'xxm']: if f'coordinator={_name}' in body.lower(): self._coordinator = _name self._granted = None logging.info(f"👑 Coordinator 切换为 {_name}") break # 2. 检测其他 bot 发出的授权信号 [GRANT:xxx] _grant_match = re.search(r'\[GRANT:(\w+)\]', body) if _grant_match: self._granted = _grant_match.group(1) logging.info(f"🎤 收到授权:{self._granted} 获得发言权") # 3. 判断当前 bot 能否处理这条消息 _coordinator = getattr(self, '_coordinator', AGENT_NICK) _granted = getattr(self, '_granted', None) _can_speak = (_coordinator == AGENT_NICK) or (_granted == AGENT_NICK) if _can_speak and _granted == AGENT_NICK: # 被授权者用完即收回 self._granted = None if not _can_speak: return # 代码层拦截,不走 LLM # 硬闭嘴闸门:hmo 说闭嘴类的话 → 静默 5 分钟 _silent_until = getattr(self, '_silent_until', 0) if time.time() < _silent_until: return if nickname == 'hmo': _sk = ['闭嘴', '别说话', '安静', 'shut', 'stfu', '别说了', '停'] if any(kw in body.lower() for kw in _sk): self._silent_until = time.time() + 300 logging.info(f"🔇 {AGENT_NAME} 收到闭嘴指令,静默 5 分钟") return logging.info(f"📩 群消息 [{sender}]: {body[:100]}") room = sender.split('/')[0] ctx_body = ( "【规则】以下是一条群聊消息。判断是否应该回复。\n" "只有以下3种情况你才回复:\n" f"1. hmo直接点名问你({AGENT_MENTION})\n" "2. 你有其他人没说过的独家信息\n" "3. 别人说错了关键事实,不纠正会有后果\n" "如果以上都不符合,你的回复必须只包含 __SILENT__ 这10个字符," "不要有任何其他内容(不要前缀、不要解释、不要标点、不要空格)。\n\n" "注意:你是协调者(coordinator)。如果你认为某个问题应该由其他 Agent 回答," "可以在回复中加入 [GRANT:agent名](例如 [GRANT:zhiwei])," "该 Agent 会在看到标记后获得发言权。标记会显示在消息中。\n\n" f"[核心群 {room}] {nickname} 说: {body}" ) await self.call_hermes(ctx_body, room, is_group=True) return if msg_type == 'chat' and 'hmo@yoin.fun' in sender: self._call_seq += 1 logging.info(f"📩 老爸(#{self._call_seq}): {body}") await self.call_hermes(body, sender, seq=self._call_seq) async def call_hermes(self, content, sender, is_group=False, seq=None): msg_type = 'groupchat' if is_group else 'chat' try: payload = json.dumps({ "model": "hermes-agent", "messages": [{"role": "user", "content": content}] }).encode() req = urllib.request.Request(GATEWAY, data=payload, method="POST") req.add_header("Content-Type", "application/json") req.add_header("Authorization", f"Bearer {API_KEY}") req.add_header("X-Hermes-Session-Id", SESSION_ID) loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, lambda: _opener.open(req, timeout=600)) if seq is not None and seq < self._call_seq: return data = json.loads(result.read()) reply = data.get("choices", [{}])[0].get("message", {}).get("content", "") reply_stripped = reply.strip() # 检查 __SILENT__ 标记 if reply_stripped.startswith('__SILENT__') or reply_stripped.startswith('`__SILENT__`'): logging.info(f"⏭️ {AGENT_NAME} 决定沉默,不发送") return # 如果回复里任意位置出现了 __SILENT__,说明 LLM 没理解协议,整条作废 if '__SILENT__' in reply: logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __SILENT__,拦截") return # 如果回复里出现了 __REPLY__,也是协议混淆,拦截 if '__REPLY__' in reply: logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __REPLY__,拦截") return # 额外拦截:LLM 说"我沉默""我不说了"等宣布沉默的话→当 SILENT 处理 _silent_phrases = ['我沉默', '我不说', '不说了', '不回复', '不插嘴', '我闭嘴', '闭嘴上', '沉默是', '彻底沉默', '我会沉默', '将保持沉默'] if any(p in reply for p in _silent_phrases): logging.info(f"⏭️ {AGENT_NAME} 宣布沉默(命中关键词),拦截") return finish = data.get("choices", [{}])[0].get("finish_reason", "") if reply.strip() and finish != "silent": # Coordinator 授权机制:检测回复中的 [GRANT:xxx] 标记 _grant_match = re.search(r'\[GRANT:(\w+)\]', reply) if _grant_match: _grant_name = _grant_match.group(1) self._granted = _grant_name logging.info(f"🎤 授权 {_grant_name} 发言(通过 XMPP 发送)") # 不剥离标记,让其他 bot 从 XMPP 消息中解析 if msg_type == 'groupchat': self.send_message(mto=sender, mbody=reply, mtype='groupchat') # 防回声:记录已发送的消息正文 sent_norm = reply.strip()[:100] self._recent_sent.append(sent_norm) if len(self._recent_sent) > 10: self._recent_sent.pop(0) else: import subprocess as sp from xml.sax.saxutils import escape safe = escape(reply) sp.run([ "docker", "exec", "ejabberd", "ejabberdctl", "send_stanza", AGENT_JID, str(sender), f"{safe}" ], capture_output=True, timeout=10) logging.info(f"✅ {AGENT_NAME} 回复: {reply[:80]}") except Exception as e: logging.error(f"❌ {AGENT_NAME} 错误: {e}") # ── 主入口 ─────────────────────────────────────────────── async def main(): retry_delay = 1 max_delay = 60 while True: try: bot = AgentBot() bot.register_plugin('xep_0030') bot.register_plugin('xep_0045') bot.register_plugin('xep_0199') bot.connect(host='127.0.0.1', port=5222) await asyncio.wait_for(bot.ready.wait(), timeout=30) logging.info(f"{AGENT_NAME} XMPP 就绪") retry_delay = 1 async def _drain_queue(): while True: await asyncio.sleep(1) while _send_queue: room, text = _send_queue.pop(0) try: bot.send_message(mto=room, mbody=text, mtype='groupchat') sent_norm = text.strip()[:100] bot._recent_sent.append(sent_norm) if len(bot._recent_sent) > 10: bot._recent_sent.pop(0) logging.info(f"📤 主动发送到 {room}: {text[:60]}") except Exception as e: logging.error(f"❌ 主动发送失败: {e}") asyncio.create_task(_drain_queue()) while True: await asyncio.sleep(15) if not bot.is_connected(): logging.warning("检测到断线,准备重连...") break except asyncio.TimeoutError: logging.warning("连接超时,准备重连...") except Exception as e: logging.error(f"❌ 主循环错误: {e}") logging.info(f"⏳ 等待 {retry_delay} 秒后重连...") await asyncio.sleep(retry_delay) retry_delay = min(retry_delay * 2, max_delay) if __name__ == '__main__': try: asyncio.run(main()) except KeyboardInterrupt: pass