#!/usr/bin/env python3
"""XMPP bot for mohe@yoin.fun - stable-reconnect edition.

The bot logs in to a local XMPP server, joins one multi-user chat room, and
forwards selected messages to an HTTP chat-completions gateway.  The gateway's
answer is sent back to the room (group messages) or to the owner (private
messages).  ``main`` supervises the connection and rebuilds the client with
exponential backoff whenever it drops.
"""
import asyncio
import inspect
import json
import logging
import os
import re
import ssl
import subprocess
import time
import urllib.request
from xml.sax.saxutils import escape

from slixmpp import ClientXMPP

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

# NOTE(review): credentials are hard-coded in this file (API_KEY and the XMPP
# password below); consider loading them from the environment or a secret store.
GATEWAY = "http://localhost:8642/v1/chat/completions"
API_KEY = "hermes123"

# An empty ProxyHandler mapping makes urllib ignore proxy settings picked up
# from the environment, so the localhost gateway is always reached directly.
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))

BOT_JID = 'mohe@yoin.fun'
BOT_PASSWORD = 'hermes123'
BOT_NICK = 'mohe'
OWNER_JID = 'hmo@yoin.fun'
CORE_ROOM = 'coregroup@conference.yoin.fun'
TRUSTED_NICKS = ('hmo', 'xxm')

# Optional marker the gateway may prepend to a reply that should be sent.
_REPLY_MARKER = re.compile(r'^\s*__REPLY__\s*')


def _post_chat(payload):
    """Blocking helper: POST *payload* to the gateway and return the raw body.

    Intended to run in an executor thread.  The response is opened, read and
    closed in one place so the connection is never leaked and no blocking read
    happens on the event loop.
    """
    req = urllib.request.Request(GATEWAY, data=payload, method="POST")
    req.add_header("Content-Type", "application/json")
    req.add_header("Authorization", f"Bearer {API_KEY}")
    req.add_header("X-Hermes-Session-Id", "xmpp-mohe-v2")
    with _opener.open(req, timeout=600) as resp:
        return resp.read()


def _extract_reply(data):
    """Return ``(reply_text, finish_reason)`` from a chat-completions dict.

    Missing, empty or ``null`` fields all collapse to empty strings instead of
    raising, so a sparse response simply produces "nothing to send".
    """
    choices = data.get("choices") or [{}]
    first = choices[0] or {}
    message = first.get("message") or {}
    return message.get("content") or "", first.get("finish_reason") or ""


def _send_private_stanza(recipient, text):
    """Blocking helper: deliver *text* to *recipient* through ``ejabberdctl``.

    ``send_stanza`` expects a complete XML stanza, so the XML-escaped text is
    wrapped in a chat ``<message>`` element.

    Raises:
        RuntimeError: if the ``docker exec`` command exits with a non-zero
            status.
        subprocess.TimeoutExpired: if the command does not finish in 10 s.
    """
    # NOTE(review): private replies go through ejabberdctl rather than
    # self.send_message(); the reason is not visible in this file - confirm
    # before unifying the two delivery paths.
    stanza = f"<message type='chat'><body>{escape(text)}</body></message>"
    proc = subprocess.run([
        "docker", "exec", "ejabberd",
        "ejabberdctl", "send_stanza",
        BOT_JID, recipient, stanza,
    ], capture_output=True, timeout=10)
    if proc.returncode != 0:
        detail = (proc.stderr or proc.stdout or b"").decode(errors="replace").strip()
        raise RuntimeError(f"ejabberdctl send_stanza failed ({proc.returncode}): {detail}")


async def _teardown(bot):
    """Best-effort disconnect of a client that is about to be replaced.

    Without this, every reconnect attempt would leave the previous client
    object (and whatever connection attempt it still has in flight) behind.
    """
    if bot is None:
        return
    try:
        pending = bot.disconnect()
        # Depending on the slixmpp release, disconnect() may return an
        # awaitable; wait for it (bounded) when it does.
        if inspect.isawaitable(pending):
            await asyncio.wait_for(pending, timeout=5)
    except Exception as e:
        logging.warning(f"⚠️ 断开旧连接失败: {e}")


class MoheBot(ClientXMPP):
    """XMPP client that relays messages to the Hermes gateway and back."""

    def __init__(self):
        super().__init__(BOT_JID, BOT_PASSWORD)
        self.add_event_handler('session_bind', self.on_bind)
        self.add_event_handler('message', self.on_msg)
        self.add_event_handler('disconnected', self.on_disconnect)
        self.add_event_handler('connected', self.on_connected)

        # NOTE(review): certificate and hostname verification are disabled.
        # main() connects to 127.0.0.1, so this is presumably deliberate.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.ssl_context = ctx

        self.ready = asyncio.Event()   # set once the session is bound
        self._call_seq = 0             # sequence number of the latest private request
        self._muc_joined = False

    async def on_connected(self, event):
        """Log that the transport connection is up."""
        logging.info("🔗 TCP连接已建立")

    async def on_bind(self, event):
        """Announce presence, fetch the roster and (re)join the core room."""
        self.send_presence()
        self.get_roster()
        # Join the core room; this runs again after every reconnect.
        self.plugin['xep_0045'].join_muc(CORE_ROOM, BOT_NICK)
        self._muc_joined = True
        self.ready.set()
        logging.info("✅ 莫荷 XMPP 上线")

    async def on_disconnect(self, event):
        """Reset the readiness flags when the connection is lost."""
        self.ready.clear()
        self._muc_joined = False
        logging.warning("⚠️ XMPP 断线")

    async def on_msg(self, msg):
        """Dispatch an incoming message to the gateway if its sender is trusted.

        Group messages are forwarded only when the occupant nickname is in
        ``TRUSTED_NICKS``; private chat messages only when the sender's bare
        JID is exactly ``OWNER_JID``.  Everything else is ignored.
        """
        body = msg['body']
        sender = str(msg['from'])
        msg_type = msg['type']
        if not body:
            return

        # Split on the FIRST '/' only: the resource part may itself contain
        # '/', and taking the last segment would let a nickname such as
        # "x/hmo" pass for "hmo".
        bare, _, resource = sender.partition('/')

        if msg_type == 'groupchat':
            if BOT_JID in sender:
                return
            nickname = resource
            if nickname in TRUSTED_NICKS:
                logging.info(f"📩 群消息 [{sender}]: {body[:100]}")
                room = bare
                ctx_body = f"[核心群 {room}] {nickname} 说: {body}"
                await self.call_hermes(ctx_body, room, is_group=True)
            return

        # Exact bare-JID comparison: a substring test would also accept
        # addresses like "xhmo@yoin.fun" or "hmo@yoin.fun.evil.tld".
        if msg_type == 'chat' and bare == OWNER_JID:
            self._call_seq += 1
            logging.info(f"📩 老爸(#{self._call_seq}): {body}")
            await self.call_hermes(body, sender, seq=self._call_seq)

    async def call_hermes(self, content, sender, is_group=False, seq=None):
        """Send *content* to the gateway and relay its answer to *sender*.

        Args:
            content: Text forwarded as the single user message.
            sender: Destination of the reply - the room JID for group
                messages, the sender's JID for private ones.
            is_group: True to answer with a ``groupchat`` message.
            seq: Sequence number of a private request.  If a newer request
                arrived while this one was in flight, the answer is dropped.

        All errors are logged and swallowed so that one failed request never
        takes the message handler down.
        """
        msg_type = 'groupchat' if is_group else 'chat'
        try:
            payload = json.dumps({
                "model": "hermes-agent",
                "messages": [{"role": "user", "content": content}]
            }).encode()

            loop = asyncio.get_running_loop()
            raw = await loop.run_in_executor(None, _post_chat, payload)

            # A newer private request superseded this one: discard the answer.
            if seq is not None and seq < self._call_seq:
                return

            reply, finish = _extract_reply(json.loads(raw))

            # Handle the __SILENT__ and __REPLY__ markers.
            if reply.strip().startswith('__SILENT__'):
                logging.info("⏭️ 决定沉默,不发送")
                return
            reply = _REPLY_MARKER.sub('', reply)

            if not reply.strip() or finish == "silent":
                return

            if msg_type == 'groupchat':
                self.send_message(mto=sender, mbody=reply, mtype='groupchat')
            else:
                # The subprocess call blocks for up to 10 s, so keep it off
                # the event loop; a failed delivery raises and is logged below.
                await loop.run_in_executor(
                    None, _send_private_stanza, str(sender), reply)
            logging.info(f"✅ 回复: {reply[:80]}")
        except Exception as e:
            logging.error(f"❌ 错误: {e}")


async def main():
    """Run the bot forever, rebuilding the client with exponential backoff."""
    retry_delay = 1   # initial retry interval in seconds
    max_delay = 60    # upper bound for the retry interval

    while True:
        bot = None
        try:
            bot = MoheBot()
            bot.register_plugin('xep_0030')  # Service Discovery
            bot.register_plugin('xep_0045')  # MUC
            bot.register_plugin('xep_0199')  # XMPP Ping (keep-alive)

            bot.connect(host='127.0.0.1', port=5222)
            await asyncio.wait_for(bot.ready.wait(), timeout=30)
            logging.info("莫荷 XMPP 就绪")
            retry_delay = 1  # reset the backoff after a successful connection

            # Stay alive; leave the loop to reconnect once the link is down.
            while True:
                await asyncio.sleep(15)
                if not bot.is_connected():
                    logging.warning("检测到断线,准备重连...")
                    break
        except asyncio.TimeoutError:
            logging.warning("连接超时,准备重连...")
        except Exception as e:
            logging.error(f"❌ 主循环错误: {e}")
        finally:
            # Always release the old client before a new one is created.
            await _teardown(bot)

        # Exponential backoff: 1s -> 2s -> 4s -> 8s -> ... -> 60s max
        logging.info(f"⏳ 等待 {retry_delay} 秒后重连...")
        await asyncio.sleep(retry_delay)
        retry_delay = min(retry_delay * 2, max_delay)


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass