#!/usr/bin/env python3 """XMPP Bot - 统一版,支持 --agent mohe|zhiwei|xiao 参数""" import asyncio, logging, ssl, json, urllib.request, os, time, sys, re from slixmpp import ClientXMPP # ── Agent 配置 ────────────────────────────────────────────── AGENTS = { "mohe": { "jid": "mohe@yoin.fun", "password": "hermes123", "nick": "mohe", "name_cn": "莫荷", "http_port": 5802, "gateway": "http://localhost:8642/v1/chat/completions", "session_id": "xmpp-mohe-v2", "kanban_session_id": "xmpp-mohe-kanban", "mention": "@mohe/@莫荷", }, "zhiwei": { "jid": "zhiwei@yoin.fun", "password": "hermes123", "nick": "zhiwei", "name_cn": "知微", "http_port": 5805, "gateway": "http://localhost:8643/v1/chat/completions", "session_id": "xmpp-zhiwei-v2", "kanban_session_id": "xmpp-zhiwei-kanban", "mention": "@zhiwei/@知微", }, "xiaoguo": { "jid": "xiaoguo@yoin.fun", "password": "hermes123", "nick": "xiaoguo", "name_cn": "小果", "http_port": 5806, "gateway": "http://localhost:8645/v1/chat/completions", "session_id": "xmpp-xiaoguo", "kanban_session_id": "xmpp-xiaoguo-kanban", "mention": "@xiaoguo/@小果", }, } agent = sys.argv[sys.argv.index("--agent") + 1] if "--agent" in sys.argv else "mohe" cfg = AGENTS.get(agent, AGENTS["mohe"]) logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s') GATEWAY = cfg["gateway"] API_KEY = "hermes123" AGENT_NICK = cfg["nick"] AGENT_NAME = cfg["name_cn"] AGENT_JID = cfg["jid"] AGENT_MENTION = cfg["mention"] SESSION_ID = cfg["session_id"] KANBAN_SESSION_ID = cfg.get("kanban_session_id", SESSION_ID) HTTP_PORT = cfg["http_port"] _opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) # ── 持久化消息队列(入站+出站) ── # 入站:Dad发的消息,先入队再处理,处理失败不丢失 _inbound_queue = asyncio.Queue() # 出站:外部脚本通过HTTP桥提交的待发送消息 _outbound_queue = [] # ── HTTP 桥(接收本地脚本的主动发送请求) ── from http.server import HTTPServer, BaseHTTPRequestHandler import threading, json as json_mod _xmpp_resource = "mohe" class SendHandler(BaseHTTPRequestHandler): def do_POST(self): length = int(self.headers.get('Content-Length', 0)) body = self.rfile.read(length) try: data = json_mod.loads(body) target = data.get('to', 'hmo@yoin.fun') text = data.get('body', '') msg_type = data.get('type', 'chat') if text: _outbound_queue.append((target, text, msg_type)) self.send_response(200) self.end_headers() self.wfile.write(b'{"ok":true}') else: self.send_response(400) self.end_headers() self.wfile.write(b'{"ok":false,"error":"empty body"}') except Exception as e: self.send_response(500) self.end_headers() self.wfile.write(f'{{"ok":false,"error":"{e}"}}'.encode()) def _run_http(): server = HTTPServer(('127.0.0.1', HTTP_PORT), SendHandler) server.timeout = 1.0 while True: try: server.handle_request() except: pass threading.Thread(target=_run_http, daemon=True).start() logging.info(f"🚀 {AGENT_NAME} HTTP 桥启动于 :{HTTP_PORT}") # ── Agent Bot ─────────────────────────────────────────────── class AgentBot(ClientXMPP): def __init__(self): super().__init__(AGENT_JID, cfg["password"]) self.ready = asyncio.Event() self._call_seq = 0 self._recent_sent = [] self._muc_joined = False global _xmpp_resource self.add_event_handler('session_bind', self.on_bind) self.add_event_handler('session_start', self.on_session_start) self.add_event_handler('message', self.on_msg) self.add_event_handler('disconnected', self.on_disconnect) self.add_event_handler('connected', self.on_connected) def on_connected(self, event): self.ready.clear() def on_bind(self, event): global _xmpp_resource bound_jid = str(self.boundjid) if "/" in bound_jid: _xmpp_resource = bound_jid.split("/", 1)[1] logging.info(f"XMPP resource captured: {_xmpp_resource}") logging.info(f"JID set to: {bound_jid}") async def on_session_start(self, event): self.send_presence() # 发送上线presence,否则收不到私聊消息 self.plugin['xep_0045'].join_muc('coregroup@conference.yoin.fun', AGENT_NICK) logging.info(f"✅ {AGENT_NAME} 加入群聊 coregroup") self.ready.set() def on_disconnect(self, event): self._muc_joined = False # ── 入站消息:先入队,异步处理 ── def on_msg(self, msg): if msg['type'] in ('chat', 'groupchat'): body = str(msg['body']).strip() if not body: return sender = str(msg['from']) msg_type = msg['type'] # 防回声 for s in self._recent_sent: if body[:50] in s or s in body[:50]: return # 群聊过滤 if msg_type == 'groupchat': nick = sender.split('/')[-1] if '/' in sender else '' if nick == AGENT_NICK: return mention_list = AGENT_MENTION.replace('@', '').split('/') is_for_me = any(m in body for m in ['@' + m for m in mention_list] + mention_list) if not is_for_me: return logging.info(f"💬 群聊(#{self._call_seq}): {body[:60]}") else: logging.info(f"📩 老爸(#{self._call_seq}): {body[:60]}") self._call_seq += 1 # 入队,不直接调gateway _inbound_queue.put_nowait({ "seq": self._call_seq, "content": body, "sender": sender, "is_group": (msg_type == 'groupchat'), "session_id": SESSION_ID, "ts": time.time(), "bot_ref": self, # 保留bot引用用于回复 }) async def call_hermes(self, content, sender, is_group=False, seq=None, session_id=None): """调gateway处理消息,返回回复文本""" msg_type = 'groupchat' if is_group else 'chat' sid = session_id or SESSION_ID try: payload = json.dumps({ "model": "hermes-agent", "messages": [{"role": "user", "content": content}] }).encode() req = urllib.request.Request(GATEWAY, data=payload, method="POST") req.add_header("Content-Type", "application/json") req.add_header("Authorization", f"Bearer {API_KEY}") req.add_header("X-Hermes-Session-Id", sid) loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, lambda: _opener.open(req, timeout=600)) data = json.loads(result.read()) reply = data.get("choices", [{}])[0].get("message", {}).get("content", "") return reply.strip() except Exception as e: logging.error(f"❌ {AGENT_NAME} gateway调用失败: {e}") return None def send_reply(self, sender, reply, msg_type): """通过bot发送回复""" if not reply: return if reply.startswith('__SILENT__') or '__SILENT__' in reply: logging.info(f"⏭️ {AGENT_NAME} 沉默,不发送") return for phrase in ['我沉默', '我不说', '不说了', '不回复', '不插嘴', '我闭嘴']: if phrase in reply: logging.info(f"⏭️ {AGENT_NAME} 宣布沉默,拦截") return if msg_type == 'groupchat': self.send_message(mto=sender, mbody=reply, mtype='groupchat') else: # 私聊:发到具体resource(Dad发消息时绑定的客户端) self.send_message(mto=sender, mbody=reply, mtype='chat') sent_norm = reply[:100] self._recent_sent.append(sent_norm) if len(self._recent_sent) > 10: self._recent_sent.pop(0) logging.info(f"✅ {AGENT_NAME} 回复: {reply[:80]}") # ── 入站消息处理循环(独立任务,崩了自动重启) ── async def process_inbound(): """从队列消费入站消息,调gateway处理,发回响应。 崩了会自动重启,不会丢消息(队列中的消息会等待下一轮处理)。""" pending_tasks = set() while True: try: item = await _inbound_queue.get() bot = item["bot_ref"] # 创建处理任务,保留引用防止GC task = asyncio.create_task(handle_one(item, bot)) pending_tasks.add(task) task.add_done_callback(pending_tasks.discard) except asyncio.CancelledError: break except Exception as e: logging.error(f"❌ 入站处理循环异常(已恢复): {e}") await asyncio.sleep(1) continue async def handle_one(item, bot): """处理单条入站消息""" try: reply = await bot.call_hermes( item["content"], item["sender"], is_group=item["is_group"], seq=item["seq"], session_id=item["session_id"] ) if reply: if seq := item.get("seq"): if seq < bot._call_seq - 5: return # 太旧的消息,跳过 msg_type = 'groupchat' if item["is_group"] else 'chat' bot.send_reply(item["sender"], reply, msg_type) else: logging.warning(f"⚠️ 消息#{item['seq']} gateway返回空,保留在队列") # 放回队尾等重试(最多3次) retry_count = item.get("retry", 0) if retry_count < 3: item["retry"] = retry_count + 1 item["ts"] = time.time() _inbound_queue.put_nowait(item) except Exception as e: logging.error(f"❌ 处理消息#{item.get('seq','?')}异常: {e}") # ── 出站消息发送循环 ── async def drain_outbound(bot): """从_outbound_queue取消息发送。独立循环,崩了自动重启。""" while True: try: await asyncio.sleep(0.5) while _outbound_queue: target, text, msg_type = _outbound_queue.pop(0) try: bot.send_message(mto=target, mbody=text, mtype=msg_type) sent_norm = text.strip()[:100] bot._recent_sent.append(sent_norm) if len(bot._recent_sent) > 10: bot._recent_sent.pop(0) logging.info(f"📤 主动发送到 {target}: {text[:60]}") except Exception as e: logging.error(f"❌ 主动发送失败: {e}") _outbound_queue.insert(0, (target, text, msg_type)) # 放回队首重试 await asyncio.sleep(3) break except asyncio.CancelledError: break except Exception as e: logging.error(f"❌ 出站循环异常(已恢复): {e}") await asyncio.sleep(1) continue # ── 主入口 ─────────────────────────────────────────────── async def main(): retry_delay = 1 max_delay = 60 while True: bot = None inbound_task = None outbound_task = None try: bot = AgentBot() bot.register_plugin('xep_0030') bot.register_plugin('xep_0045') bot.register_plugin('xep_0199') bot.connect(host='127.0.0.1', port=5222) await asyncio.wait_for(bot.ready.wait(), timeout=30) logging.info(f"{AGENT_NAME} XMPP 就绪") retry_delay = 1 # 启动独立处理循环 inbound_task = asyncio.create_task(process_inbound()) outbound_task = asyncio.create_task(drain_outbound(bot)) while True: await asyncio.sleep(15) if not bot.is_connected(): logging.warning("检测到断线,准备重连...") break # XMPP心跳检测:如果run_filters任务挂了,is_connected可能仍为True try: ping_ok = await asyncio.wait_for( bot.plugin['xep_0199'].send_ping(AGENT_JID, timeout=5), timeout=8 ) if not ping_ok: logging.warning("XMPP心跳超时(XEP-0199),准备重连...") break except asyncio.TimeoutError: logging.warning("XMPP心跳超时,准备重连...") break except Exception: # xep_0199可能因run_filters已死而抛出异常 logging.warning("XMPP心跳异常(run_filters可能已死),准备重连...") break except asyncio.TimeoutError: logging.warning("连接超时,准备重连...") except Exception as e: logging.error(f"❌ 主循环错误: {e}") finally: # 取消任务但不丢队列 for t in [inbound_task, outbound_task]: if t and not t.done(): t.cancel() if bot: try: bot.disconnect() except: pass # 等待旧session完全释放,防止两个bot抢资源 await asyncio.sleep(2) logging.info(f"⏳ 等待 {retry_delay} 秒后重连...") await asyncio.sleep(retry_delay) retry_delay = min(retry_delay * 2, max_delay) if __name__ == '__main__': try: asyncio.run(main()) except KeyboardInterrupt: pass