Files
AgentsMeeting/xmpp_bot.py
T
hmo 1b2b935832 Initial: multi-agent XMPP communication system with dashboard
- Platform-based architecture (Windows/Linux/Mac)
- Agent instance registry (agents.yaml)
- Management dashboard with cross-platform monitoring
- xmpp_bot with HTTP bridge + health endpoints
- wechat_agent with WeChat-Hermes bridging
- Platform services: ProcessGuardian, HealthProbe, APIRouter, ChannelBridge
- Deployment: systemd (Linux) + PowerShell (Windows)
- Monitoring: SSH+ejabberdctl for cross-platform presence
2026-06-12 21:51:36 +08:00

144 lines
5.9 KiB
Python

#!/usr/bin/env python3
"""XMPP Bot mohe@yoin.fun - 稳定重连版"""
import asyncio, logging, ssl, json, urllib.request, os, time, re
from slixmpp import ClientXMPP
# Module-wide configuration.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

# Chat-completions endpoint and bearer token of the Hermes gateway.  Both can
# be overridden through the environment so the secret does not have to live in
# the source tree; the defaults keep the previous hard-coded behaviour.
GATEWAY = os.environ.get(
    "HERMES_GATEWAY_URL", "http://localhost:8642/v1/chat/completions"
)
API_KEY = os.environ.get("HERMES_API_KEY", "hermes123")

# Opener built with an empty proxy map, so gateway requests do not pick up
# proxy settings from the environment.
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
class MoheBot(ClientXMPP):
    """XMPP client that relays selected messages to the Hermes HTTP gateway.

    The bot logs in as ``BOT_JID``, (re)joins the ``ROOM_JID`` multi-user chat
    on every session bind, and forwards two kinds of traffic to ``GATEWAY``:

    * group-chat messages whose room nickname is in ``ALLOWED_NICKS``;
    * direct ``chat`` messages whose bare JID is exactly ``OWNER_JID``.

    Attributes:
        ready: ``asyncio.Event`` set while a session is bound and cleared on
            disconnect; ``main`` waits on it to detect a successful login.
    """

    BOT_JID = 'mohe@yoin.fun'
    OWNER_JID = 'hmo@yoin.fun'
    ROOM_JID = 'coregroup@conference.yoin.fun'
    ROOM_NICK = 'mohe'
    # Room nicknames whose group messages are forwarded.  The bot's own nick
    # is not listed, so its own echoes are never forwarded back to the gateway.
    ALLOWED_NICKS = ('hmo', 'xxm')

    def __init__(self):
        # NOTE(review): the account password is hard-coded here; consider
        # loading it from configuration instead.
        super().__init__(self.BOT_JID, 'hermes123')
        self.add_event_handler('session_bind', self.on_bind)
        self.add_event_handler('message', self.on_msg)
        self.add_event_handler('disconnected', self.on_disconnect)
        self.add_event_handler('connected', self.on_connected)

        # NOTE(review): certificate and hostname verification are disabled.
        # Presumably acceptable because the bot connects to a local server --
        # confirm before pointing it at a remote host.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.ssl_context = ctx

        self.ready = asyncio.Event()
        # Sequence number of the most recent direct message; lets a slow
        # gateway answer to an older message be dropped as stale.
        self._call_seq = 0
        self._muc_joined = False

    async def on_connected(self, event):
        """Log that the transport connection has been established."""
        logging.info("🔗 TCP连接已建立")

    async def on_bind(self, event):
        """Announce presence, fetch the roster and join the core room."""
        self.send_presence()
        self.get_roster()
        # The room must be re-joined after every reconnect.
        self.plugin['xep_0045'].join_muc(self.ROOM_JID, self.ROOM_NICK)
        self._muc_joined = True
        self.ready.set()
        logging.info("✅ 莫荷 XMPP 上线")

    async def on_disconnect(self, event):
        """Mark the session as down so ``ready`` waiters block again."""
        self.ready.clear()
        self._muc_joined = False
        logging.warning("⚠️ XMPP 断线")

    @staticmethod
    def _split_jid(jid):
        """Split ``jid`` at the FIRST ``/`` into ``(bare, resource)``.

        Splitting at the first slash matters: a resource (or room nickname)
        may itself contain ``/``, and taking the last segment would let a
        nickname such as ``x/hmo`` impersonate ``hmo``.
        """
        bare, _, resource = jid.partition('/')
        return bare, resource

    async def on_msg(self, msg):
        """Dispatch an incoming message stanza to the gateway if authorised.

        Messages without a body, group messages from nicknames outside
        ``ALLOWED_NICKS``, and direct messages from anyone but ``OWNER_JID``
        are ignored.
        """
        body = msg['body']
        if not body:
            return

        sender = str(msg['from'])
        bare, resource = self._split_jid(sender)
        msg_type = msg['type']

        if msg_type == 'groupchat':
            # For group chat the bare JID is the room and the resource is the
            # occupant's nickname.
            if resource in self.ALLOWED_NICKS:
                logging.info(f"📩 群消息 [{sender}]: {body[:100]}")
                ctx_body = f"[核心群 {bare}] {resource} 说: {body}"
                await self.call_hermes(ctx_body, bare, is_group=True)
            return

        # Compare the bare JID exactly; a substring test would also accept
        # e.g. "evilhmo@yoin.fun" or "hmo@yoin.fun.attacker.tld".
        if msg_type == 'chat' and bare == self.OWNER_JID:
            self._call_seq += 1
            logging.info(f"📩 老爸(#{self._call_seq}): {body}")
            await self.call_hermes(body, sender, seq=self._call_seq)

    @staticmethod
    def _post_blocking(req):
        """POST ``req`` through the proxy-less opener and return the raw body.

        Blocking; meant to run in an executor thread.  The response is closed
        before returning.
        """
        with _opener.open(req, timeout=600) as resp:
            return resp.read()

    @staticmethod
    def _extract_reply(data):
        """Return the text to send from a gateway response, or ``None``.

        ``None`` means "stay silent": the reply starts with ``__SILENT__``,
        is empty, or the first choice has ``finish_reason == "silent"``.
        A leading ``__REPLY__`` marker is removed.  Missing or null
        ``choices`` / ``message`` / ``content`` are treated as an empty reply.
        """
        choices = data.get("choices") or [{}]
        first = choices[0] or {}
        reply = (first.get("message") or {}).get("content") or ""

        if reply.strip().startswith('__SILENT__'):
            logging.info("⏭️ 决定沉默,不发送")
            return None

        # Tolerate leading whitespace before the marker, consistently with
        # the __SILENT__ check above.
        reply = re.sub(r'^\s*__REPLY__\s*', '', reply)
        if not reply.strip() or first.get("finish_reason", "") == "silent":
            return None
        return reply

    def _send_stanza_blocking(self, to_jid, text):
        """Inject a chat stanza via ``ejabberdctl send_stanza`` (blocking).

        Returns the ``subprocess.CompletedProcess`` so the caller can inspect
        the exit status.  Meant to run in an executor thread.
        """
        import subprocess as sp
        from xml.sax.saxutils import escape, quoteattr

        # quoteattr() escapes the JID and adds the surrounding quotes, so a
        # resource containing quotes cannot break out of the attribute.
        stanza = (
            f"<message from='{self.BOT_JID}' to={quoteattr(to_jid)} "
            f"type='chat' xml:lang='en'><body>{escape(text)}</body></message>"
        )
        return sp.run(
            [
                "docker", "exec", "ejabberd", "ejabberdctl", "send_stanza",
                self.BOT_JID, to_jid, stanza,
            ],
            capture_output=True,
            timeout=10,
        )

    async def call_hermes(self, content, sender, is_group=False, seq=None):
        """Send ``content`` to the gateway and deliver its reply to ``sender``.

        Args:
            content: user text forwarded as the single chat message.
            sender: where the reply goes -- the room JID for group messages,
                the sender's full JID for direct messages.
            is_group: deliver the reply as ``groupchat`` when true.
            seq: sequence number of a direct message; the reply is dropped if
                a newer direct message has arrived in the meantime.

        All exceptions are logged and swallowed so one failed call never
        takes down the message handler.
        """
        try:
            payload = json.dumps({
                "model": "hermes-agent",
                "messages": [{"role": "user", "content": content}],
            }).encode()
            req = urllib.request.Request(GATEWAY, data=payload, method="POST")
            req.add_header("Content-Type", "application/json")
            req.add_header("Authorization", f"Bearer {API_KEY}")
            req.add_header("X-Hermes-Session-Id", "xmpp-mohe-v2")

            loop = asyncio.get_running_loop()
            # Both the request and the body read happen off the event loop.
            raw = await loop.run_in_executor(None, self._post_blocking, req)

            if seq is not None and seq < self._call_seq:
                return  # superseded by a newer direct message

            reply = self._extract_reply(json.loads(raw))
            if reply is None:
                return

            if is_group:
                self.send_message(mto=sender, mbody=reply, mtype='groupchat')
            else:
                # NOTE(review): direct replies are injected through
                # ejabberdctl rather than self.send_message; the reason is not
                # visible here -- confirm before changing the delivery path.
                to_jid = str(sender)
                proc = await loop.run_in_executor(
                    None, self._send_stanza_blocking, to_jid, reply
                )
                if proc.returncode != 0:
                    detail = proc.stderr.decode(errors='replace').strip()
                    logging.error(
                        f"❌ 发送失败 (exit {proc.returncode}): {detail}"
                    )
                    return
            logging.info(f"✅ 回复: {reply[:80]}")
        except Exception as e:
            # Top-level boundary of the handler: log with traceback, continue.
            logging.exception(f"❌ 错误: {e}")
async def main():
    """Run the bot forever, reconnecting with exponential backoff.

    Each iteration creates a fresh ``MoheBot``, registers the service
    discovery, MUC and ping plugins, connects to the local server and waits
    up to 30 seconds for the session to become ready.  While connected, the
    link is polled every 15 seconds.  When the connection is lost, times out
    or raises, the client is disconnected and a new attempt is made after a
    delay that doubles from 1 second up to 60 seconds; the delay resets to 1
    second after every successful login.
    """
    retry_delay = 1  # initial retry interval in seconds
    max_delay = 60   # upper bound for the retry interval

    while True:
        bot = None
        try:
            bot = MoheBot()
            bot.register_plugin('xep_0030')  # Service Discovery
            bot.register_plugin('xep_0045')  # MUC
            bot.register_plugin('xep_0199')  # XMPP Ping (keepalive)
            bot.connect(host='127.0.0.1', port=5222)
            await asyncio.wait_for(bot.ready.wait(), timeout=30)
            logging.info("莫荷 XMPP 就绪")
            retry_delay = 1  # reset the backoff after a successful login

            # Stay here while the link is up; leave the loop to reconnect.
            while True:
                await asyncio.sleep(15)
                if not bot.is_connected():
                    logging.warning("检测到断线,准备重连...")
                    break
        except asyncio.TimeoutError:
            logging.warning("连接超时,准备重连...")
        except Exception as e:
            logging.exception(f"❌ 主循环错误: {e}")
        finally:
            # Tear down the old client before building a new one; otherwise a
            # timed-out or dropped instance may keep its own connect loop
            # alive and compete with its replacement for the same JID.
            if bot is not None:
                try:
                    bot.disconnect()
                except Exception as e:
                    logging.warning(f"⚠️ 断开旧连接失败: {e}")

        # Exponential backoff: 1s -> 2s -> 4s -> 8s -> ... -> 60s max.
        logging.info(f"⏳ 等待 {retry_delay} 秒后重连...")
        await asyncio.sleep(retry_delay)
        retry_delay = min(retry_delay * 2, max_delay)
# Script entry point: run the reconnect loop until the process is interrupted.
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C exits quietly, without a traceback.
        pass