Files
AgentsMeeting/xmpp_agent_core.py
T

285 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""XMPP bot (unified version); select the agent with --agent mohe|zhiwei|xiaoguo."""
import asyncio, logging, ssl, json, urllib.request, os, time, sys
from slixmpp import ClientXMPP
# ── Agent configuration ─────────────────────────────────────
def _agent_profile(key, name_cn, http_port, gateway_port, session_id):
    """Build one agent's settings dict.

    The JID, MUC nick and the Latin half of the mention string are all derived
    from ``key``; the gateway URL only varies by port.
    """
    return {
        "jid": f"{key}@yoin.fun",
        "password": "hermes123",
        "nick": key,
        "name_cn": name_cn,
        "http_port": http_port,
        "gateway": f"http://localhost:{gateway_port}/v1/chat/completions",
        "session_id": session_id,
        "mention": f"@{key}/@{name_cn}",
    }


# Per-agent settings, keyed by the value accepted after ``--agent``.
AGENTS = {
    "mohe": _agent_profile("mohe", "莫荷", 5804, 8642, "xmpp-mohe-v2"),
    "zhiwei": _agent_profile("zhiwei", "知微", 5805, 8643, "xmpp-zhiwei"),
    "xiaoguo": _agent_profile("xiaoguo", "小果", 5806, 8645, "xmpp-xiaoguo"),
}
# Configure logging before anything can log: a warning emitted earlier would
# install the default root handler and turn this basicConfig() into a no-op.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

# Select the agent from ``--agent <name>``; default to "mohe".
agent = "mohe"
if "--agent" in sys.argv:
    _agent_pos = sys.argv.index("--agent") + 1
    if _agent_pos < len(sys.argv):
        agent = sys.argv[_agent_pos]
    else:
        # Previously this raised IndexError at import time.
        logging.warning("⚠️ --agent 缺少参数,使用默认 agent: mohe")
if agent not in AGENTS:
    # The fallback itself is unchanged; it is just no longer silent.
    logging.warning(f"⚠️ 未知 agent {agent!r},回退到 mohe(可选: {', '.join(AGENTS)})")
cfg = AGENTS.get(agent, AGENTS["mohe"])

# Flattened view of the selected agent's settings used throughout the module.
GATEWAY = cfg["gateway"]
# NOTE(review): credential is hard-coded in source; consider loading it from the environment.
API_KEY = "hermes123"
AGENT_NICK = cfg["nick"]
AGENT_NAME = cfg["name_cn"]
AGENT_JID = cfg["jid"]
AGENT_MENTION = cfg["mention"]
SESSION_ID = cfg["session_id"]
HTTP_PORT = cfg["http_port"]
# An empty ProxyHandler mapping makes gateway requests bypass any configured proxy.
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
# ── HTTP 桥(接收本地脚本的主动发送请求) ──
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading, json as json_mod
# Pending outbound messages as (room, text) tuples: appended by SendHandler
# (which runs on the HTTP bridge thread) and popped by the drain loop in main().
_send_queue = []
class SendHandler(BaseHTTPRequestHandler):
    """HTTP bridge handler that queues POSTed messages for the XMPP bot.

    Request body: a JSON object ``{"to": "<room jid>", "body": "<text>"}``.
    ``to`` is optional and defaults to ``DEFAULT_ROOM``. Accepted messages are
    appended to the module-level ``_send_queue`` as ``(room, text)`` tuples.

    Responses are JSON: 200 ``{"ok":true}`` on success, 400 for client errors
    (bad Content-Length, malformed JSON, non-object payload, empty or
    non-string body), 500 for unexpected server-side failures.
    """

    DEFAULT_ROOM = 'coregroup@conference.yoin.fun'

    def _send_json(self, status, payload):
        """Serialise *payload* and send it as the complete response."""
        # Compact separators keep the success / empty-body responses
        # byte-identical to the previous hand-written literals.
        raw = json_mod.dumps(payload, separators=(',', ':')).encode()
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(raw)))
        self.end_headers()
        self.wfile.write(raw)

    def do_POST(self):
        """Validate the request and enqueue ``(room, text)`` for delivery."""
        try:
            length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            self._send_json(400, {"ok": False, "error": "invalid Content-Length"})
            return
        body = self.rfile.read(length)

        try:
            data = json_mod.loads(body)
        except ValueError as e:
            # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
            self._send_json(400, {"ok": False, "error": f"invalid JSON: {e}"})
            return
        if not isinstance(data, dict):
            self._send_json(400, {"ok": False, "error": "JSON object expected"})
            return

        room = data.get('to', self.DEFAULT_ROOM)
        text = data.get('body', '')
        if not text:
            self._send_json(400, {"ok": False, "error": "empty body"})
            return
        if not isinstance(text, str) or not isinstance(room, str):
            self._send_json(400, {"ok": False, "error": "'to' and 'body' must be strings"})
            return

        try:
            _send_queue.append((room, text))
        except Exception as e:
            self._send_json(500, {"ok": False, "error": str(e)})
            return
        self._send_json(200, {"ok": True})
def _run_http():
    """Serve the local HTTP bridge on 127.0.0.1:HTTP_PORT until the process exits.

    Intended to run on a background thread. Requests are handled one at a
    time; ``server.timeout`` makes each ``handle_request()`` call return after
    at most one second when no request arrives.
    """
    try:
        server = HTTPServer(('127.0.0.1', HTTP_PORT), SendHandler)
    except OSError as e:
        # Typically "address already in use". Log it instead of letting the
        # thread die with only an unhandled-thread traceback.
        logging.error(f"❌ {AGENT_NAME} HTTP 桥启动失败 (:{HTTP_PORT}): {e}")
        return
    server.timeout = 1.0
    # The context manager closes the listening socket if the loop ever exits.
    with server:
        while True:
            server.handle_request()
# Run the HTTP bridge on a daemon thread so it never keeps the interpreter alive.
threading.Thread(target=_run_http, daemon=True).start()
# NOTE(review): this is logged right after the thread is started, not after the
# socket is bound, so it does not confirm that the bridge is actually listening.
logging.info(f"🚀 {AGENT_NAME} HTTP 桥启动于 :{HTTP_PORT}")
# ── XMPP Bot 类 ────────────────────────────────────────────────
class AgentBot(ClientXMPP):
    """XMPP client that forwards chat traffic to the Hermes gateway and posts its replies.

    Group-chat messages are wrapped in a rule prompt that asks the model to
    answer ``__SILENT__`` unless a reply is warranted; private messages are
    only accepted from the owner's bare JID. Replies that contain protocol
    markers or that merely announce silence are dropped.
    """

    ROOM_JID = 'coregroup@conference.yoin.fun'
    OWNER_NICK = 'hmo'
    OWNER_JID = 'hmo@yoin.fun'
    # How long the bot stays quiet after the owner tells it to shut up.
    SILENCE_SECONDS = 300
    # Lower-case substrings that trigger the mute. Must never contain '':
    # the empty string is a substring of every message.
    SILENCE_KEYWORDS = ('闭嘴', '别说话', '安静', 'shut', 'stfu', '别说了')
    # Phrases by which the model "announces" silence instead of emitting the marker.
    ANNOUNCED_SILENCE = ('我沉默', '我不说', '不说了', '不回复', '不插嘴', '我闭嘴',
                         '闭嘴上', '沉默是', '彻底沉默', '我会沉默', '将保持沉默')
    RECENT_SENT_MAX = 10

    def __init__(self):
        super().__init__(AGENT_JID, cfg["password"])
        self.add_event_handler('session_bind', self.on_bind)
        self.add_event_handler('message', self.on_msg)
        self.add_event_handler('disconnected', self.on_disconnect)
        self.add_event_handler('connected', self.on_connected)
        # NOTE(review): certificate and hostname verification are disabled here.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.ssl_context = ctx
        self.ready = asyncio.Event()   # set once the session is bound, cleared on disconnect
        self._call_seq = 0             # sequence number of the latest private request
        self._muc_joined = False
        self._recent_sent = []         # prefixes (<=100 chars) of the last few group messages sent
        self._silent_until = 0.0       # epoch seconds until which group messages are ignored

    async def on_connected(self, event):
        """Log that the transport connection is up."""
        logging.info(f"🔗 {AGENT_NAME} TCP连接已建立")

    async def on_bind(self, event):
        """Announce presence, fetch the roster, join the group room and mark the bot ready."""
        self.send_presence()
        self.get_roster()
        try:
            self.plugin['xep_0045'].join_muc(self.ROOM_JID, AGENT_NICK)
            # Only record the join when the call did not raise.
            self._muc_joined = True
            logging.info(f"{AGENT_NAME} 加入群聊 coregroup")
        except Exception as e:
            logging.error(f"{AGENT_NAME} 加入群聊失败: {e}")
        self.ready.set()
        logging.info(f"{AGENT_NAME} XMPP 上线")

    async def on_disconnect(self, event):
        """Reset readiness flags when the connection drops."""
        self.ready.clear()
        self._muc_joined = False
        logging.warning(f"⚠️ {AGENT_NAME} XMPP 断线")

    async def on_msg(self, msg):
        """Route an incoming message: group chat goes through the rule prompt, owner chat goes straight to Hermes."""
        body = msg['body']
        sender = str(msg['from'])
        msg_type = msg['type']
        if not body:
            return

        if msg_type == 'groupchat':
            if AGENT_JID in sender:
                return
            nickname = sender.split('/')[-1] if '/' in sender else ''
            # Skip our own messages reflected back by the room (matched by nick).
            if nickname == AGENT_NICK:
                return
            # Hard mute gate: while muted, ignore all group traffic.
            if time.time() < self._silent_until:
                return
            if nickname == self.OWNER_NICK:
                lowered = body.lower()
                if any(kw in lowered for kw in self.SILENCE_KEYWORDS if kw):
                    self._silent_until = time.time() + self.SILENCE_SECONDS
                    logging.info(f"🔇 {AGENT_NAME} 收到闭嘴指令,静默 5 分钟")
                    return
            logging.info(f"📩 群消息 [{sender}]: {body[:100]}")
            room = sender.split('/')[0]
            ctx_body = (
                "【规则】以下是一条群聊消息。判断是否应该回复。\n"
                "只有以下3种情况你才回复:\n"
                f"1. hmo直接点名问你({AGENT_MENTION})\n"
                "2. 你有其他人没说过的独家信息\n"
                "3. 别人说错了关键事实,不纠正会有后果\n"
                "如果以上都不符合,你的回复必须只包含 __SILENT__ 这10个字符,"
                "不要有任何其他内容(不要前缀、不要解释、不要标点、不要空格)。\n\n"
                f"[核心群 {room}] {nickname} 说: {body}"
            )
            await self.call_hermes(ctx_body, room, is_group=True)
            return

        # Compare the bare JID exactly: a substring test would also accept
        # e.g. "nothmo@yoin.fun" or a resource that contains the owner's JID.
        if msg_type == 'chat' and sender.split('/', 1)[0] == self.OWNER_JID:
            self._call_seq += 1
            logging.info(f"📩 老爸(#{self._call_seq}): {body}")
            await self.call_hermes(body, sender, seq=self._call_seq)

    @staticmethod
    def _post_to_gateway(payload):
        """POST *payload* (bytes) to the gateway and return the decoded JSON body.

        Blocking; meant to run in an executor. The response is fully read and
        closed here so no socket I/O happens on the event-loop thread.
        """
        req = urllib.request.Request(GATEWAY, data=payload, method="POST")
        req.add_header("Content-Type", "application/json")
        req.add_header("Authorization", f"Bearer {API_KEY}")
        req.add_header("X-Hermes-Session-Id", SESSION_ID)
        with _opener.open(req, timeout=600) as resp:
            return json.loads(resp.read())

    @staticmethod
    def _send_private_stanza(to_jid, reply):
        """Deliver *reply* to *to_jid* via ``ejabberdctl send_stanza``. Blocking; returns the CompletedProcess."""
        import subprocess as sp
        from xml.sax.saxutils import escape
        quotes = {"'": "&apos;", '"': "&quot;"}   # values below sit inside single-quoted attributes
        stanza = (
            f"<message from='{escape(AGENT_JID, quotes)}' to='{escape(to_jid, quotes)}' "
            f"type='chat' xml:lang='en'><body>{escape(reply)}</body></message>"
        )
        return sp.run(
            ["docker", "exec", "ejabberd", "ejabberdctl", "send_stanza",
             AGENT_JID, to_jid, stanza],
            capture_output=True, timeout=10,
        )

    def _remember_sent(self, text):
        """Record a 100-char prefix of *text*, keeping only the most recent entries."""
        self._recent_sent.append(text.strip()[:100])
        if len(self._recent_sent) > self.RECENT_SENT_MAX:
            self._recent_sent.pop(0)

    async def call_hermes(self, content, sender, is_group=False, seq=None):
        """Send *content* to the Hermes gateway and deliver the reply to *sender*.

        Args:
            content: Prompt text for the model.
            sender: Room JID for group chat, or the peer's JID for private chat.
            is_group: Reply as ``groupchat`` when true, otherwise as private chat.
            seq: Private-chat sequence number; a reply is discarded when a
                newer request has been issued in the meantime.

        All failures are logged and swallowed so the message handler never raises.
        """
        msg_type = 'groupchat' if is_group else 'chat'
        try:
            payload = json.dumps({
                "model": "hermes-agent",
                "messages": [{"role": "user", "content": content}]
            }).encode()
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(None, self._post_to_gateway, payload)
            if seq is not None and seq < self._call_seq:
                return

            # Tolerate a missing or empty "choices" list and a null "content".
            choice = (data.get("choices") or [{}])[0]
            reply = (choice.get("message") or {}).get("content") or ""
            reply_stripped = reply.strip()

            # The model chose to stay silent.
            if reply_stripped.startswith('__SILENT__') or reply_stripped.startswith('`__SILENT__`'):
                logging.info(f"⏭️ {AGENT_NAME} 决定沉默,不发送")
                return
            # The marker anywhere else means the model misunderstood the protocol: drop it all.
            if '__SILENT__' in reply:
                logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __SILENT__,拦截")
                return
            # Same for a stray __REPLY__ marker.
            if '__REPLY__' in reply:
                logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __REPLY__,拦截")
                return
            # Replies that merely announce silence are treated as silence.
            if any(p in reply for p in self.ANNOUNCED_SILENCE):
                logging.info(f"⏭️ {AGENT_NAME} 宣布沉默(命中关键词),拦截")
                return

            finish = choice.get("finish_reason", "")
            if not reply_stripped or finish == "silent":
                return

            if msg_type == 'groupchat':
                self.send_message(mto=sender, mbody=reply, mtype='groupchat')
                self._remember_sent(reply)
            else:
                # docker exec may take up to 10 s; keep it off the event loop.
                proc = await loop.run_in_executor(
                    None, self._send_private_stanza, str(sender), reply)
                if proc.returncode != 0:
                    detail = proc.stderr.decode(errors='replace').strip()
                    logging.error(f"{AGENT_NAME} 错误: send_stanza 失败 ({proc.returncode}): {detail}")
                    return
            logging.info(f"{AGENT_NAME} 回复: {reply[:80]}")
        except Exception as e:
            logging.error(f"{AGENT_NAME} 错误: {e}")
# ── 主入口 ───────────────────────────────────────────────
async def _drain_queue(bot):
    """Forward messages queued by the HTTP bridge to their rooms through *bot*.

    Polls ``_send_queue`` once per second. Messages are only popped while the
    bot is ready, so anything queued during an outage stays in the queue for
    the next connection instead of being handed to a dead session.
    """
    while True:
        await asyncio.sleep(1)
        while _send_queue and bot.ready.is_set():
            room, text = _send_queue.pop(0)
            try:
                bot.send_message(mto=room, mbody=text, mtype='groupchat')
                bot._recent_sent.append(text.strip()[:100])
                if len(bot._recent_sent) > 10:
                    bot._recent_sent.pop(0)
                logging.info(f"📤 主动发送到 {room}: {text[:60]}")
            except Exception as e:
                logging.error(f"❌ 主动发送失败: {e}")


async def main():
    """Run the bot forever, reconnecting with exponential backoff (1 s doubling up to 60 s).

    Each cycle builds a fresh AgentBot, waits up to 30 s for it to become
    ready, starts the bridge drain task, then checks the connection every
    15 s. When the cycle ends for any reason, the drain task is cancelled and
    the bot is disconnected before the next attempt, so stale tasks and
    sessions do not pile up across reconnects.
    """
    retry_delay = 1
    max_delay = 60
    while True:
        bot = None
        drain_task = None
        try:
            bot = AgentBot()
            bot.register_plugin('xep_0030')
            bot.register_plugin('xep_0045')
            bot.register_plugin('xep_0199')
            bot.connect(host='127.0.0.1', port=5222)
            await asyncio.wait_for(bot.ready.wait(), timeout=30)
            logging.info(f"{AGENT_NAME} XMPP 就绪")
            retry_delay = 1
            # Keep a reference: the event loop only holds weak references to tasks.
            drain_task = asyncio.create_task(_drain_queue(bot))
            while True:
                await asyncio.sleep(15)
                if not bot.is_connected():
                    logging.warning("检测到断线,准备重连...")
                    break
        except asyncio.TimeoutError:
            logging.warning("连接超时,准备重连...")
        except Exception as e:
            logging.error(f"❌ 主循环错误: {e}")
        finally:
            if drain_task is not None:
                drain_task.cancel()
            if bot is not None:
                try:
                    # Tear down the old session so it cannot linger next to the new one.
                    bot.disconnect()
                except Exception as e:
                    logging.warning(f"⚠️ 断开旧连接失败: {e}")
        logging.info(f"⏳ 等待 {retry_delay} 秒后重连...")
        await asyncio.sleep(retry_delay)
        retry_delay = min(retry_delay * 2, max_delay)
# Script entry point: run the reconnect loop until interrupted.
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C exits quietly, without a traceback.
        pass