Files
AgentsMeeting/xmpp_agent_core.py
T

335 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""XMPP Bot - 统一版,支持 --agent mohe|zhiwei|xiao 参数"""
import asyncio, logging, ssl, json, urllib.request, os, time, sys, re
from slixmpp import ClientXMPP
# ── Agent 配置 ──────────────────────────────────────────────
AGENTS = {
"mohe": {
"jid": "mohe@yoin.fun",
"password": "hermes123",
"nick": "mohe",
"name_cn": "莫荷",
"http_port": 5804,
"gateway": "http://localhost:8642/v1/chat/completions",
"session_id": "xmpp-mohe-v2",
"mention": "@mohe/@莫荷",
},
"zhiwei": {
"jid": "zhiwei@yoin.fun",
"password": "hermes123",
"nick": "zhiwei",
"name_cn": "知微",
"http_port": 5805,
"gateway": "http://localhost:8643/v1/chat/completions",
"session_id": "xmpp-zhiwei",
"mention": "@zhiwei/@知微",
},
"xiaoguo": {
"jid": "xiaoguo@yoin.fun",
"password": "hermes123",
"nick": "xiaoguo",
"name_cn": "小果",
"http_port": 5806,
"gateway": "http://localhost:8645/v1/chat/completions",
"session_id": "xmpp-xiaoguo",
"mention": "@xiaoguo/@小果",
},
}
agent = sys.argv[sys.argv.index("--agent") + 1] if "--agent" in sys.argv else "mohe"
cfg = AGENTS.get(agent, AGENTS["mohe"])
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
GATEWAY = cfg["gateway"]
API_KEY = "hermes123"
AGENT_NICK = cfg["nick"]
AGENT_NAME = cfg["name_cn"]
AGENT_JID = cfg["jid"]
AGENT_MENTION = cfg["mention"]
SESSION_ID = cfg["session_id"]
HTTP_PORT = cfg["http_port"]
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
# ── HTTP 桥(接收本地脚本的主动发送请求) ──
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading, json as json_mod
_send_queue = []
class SendHandler(BaseHTTPRequestHandler):
def do_POST(self):
length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(length)
try:
data = json_mod.loads(body)
room = data.get('to', 'coregroup@conference.yoin.fun')
text = data.get('body', '')
if text:
_send_queue.append((room, text))
self.send_response(200)
self.end_headers()
self.wfile.write(b'{"ok":true}')
else:
self.send_response(400)
self.end_headers()
self.wfile.write(b'{"ok":false,"error":"empty body"}')
except Exception as e:
self.send_response(500)
self.end_headers()
self.wfile.write(f'{{"ok":false,"error":"{e}"}}'.encode())
def _run_http():
server = HTTPServer(('127.0.0.1', HTTP_PORT), SendHandler)
server.timeout = 1.0
while True:
server.handle_request()
threading.Thread(target=_run_http, daemon=True).start()
logging.info(f"🚀 {AGENT_NAME} HTTP 桥启动于 :{HTTP_PORT}")
# ── XMPP Bot 类 ────────────────────────────────────────────────
class AgentBot(ClientXMPP):
def __init__(self):
super().__init__(AGENT_JID, cfg["password"])
self.add_event_handler('session_bind', self.on_bind)
self.add_event_handler('message', self.on_msg)
self.add_event_handler('disconnected', self.on_disconnect)
self.add_event_handler('connected', self.on_connected)
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
self.ssl_context = ctx
self.ready = asyncio.Event()
self._call_seq = 0
self._muc_joined = False
self._recent_sent = []
self._coordinator = 'mohe' # 默认协调者
self._granted = None
async def on_connected(self, event):
logging.info(f"🔗 {AGENT_NAME} TCP连接已建立")
async def on_bind(self, event):
self.send_presence()
self.get_roster()
try:
self.plugin['xep_0045'].join_muc('coregroup@conference.yoin.fun', AGENT_NICK)
logging.info(f"{AGENT_NAME} 加入群聊 coregroup")
except Exception as e:
logging.error(f"{AGENT_NAME} 加入群聊失败: {e}")
self._muc_joined = True
self.ready.set()
logging.info(f"{AGENT_NAME} XMPP 上线")
async def on_disconnect(self, event):
self.ready.clear()
self._muc_joined = False
logging.warning(f"⚠️ {AGENT_NAME} XMPP 断线")
async def on_msg(self, msg):
body = msg['body']
sender = str(msg['from'])
msg_type = msg['type']
if not body:
return
if msg_type == 'groupchat':
if AGENT_JID in sender:
return
nickname = sender.split('/')[-1] if '/' in sender else ''
# 自己的消息跳过(通过昵称)
if nickname == AGENT_NICK:
return
# Coordinator 模式 — 全走 XMPP 消息,不依赖共享 DB
# 1. hmo 切换 coordinator → 所有 bot 检测,各自更新本地状态
if nickname == 'hmo' and 'coordinator=' in body.lower():
for _name in ['mohe', 'zhiwei', 'xxm']:
if f'coordinator={_name}' in body.lower():
self._coordinator = _name
self._granted = None
logging.info(f"👑 Coordinator 切换为 {_name}")
break
# 2. 检测其他 bot 发出的授权信号 [GRANT:xxx]
_grant_match = re.search(r'\[GRANT:(\w+)\]', body)
if _grant_match:
self._granted = _grant_match.group(1)
logging.info(f"🎤 收到授权:{self._granted} 获得发言权")
# 3. 判断当前 bot 能否处理这条消息
_coordinator = getattr(self, '_coordinator', AGENT_NICK)
_granted = getattr(self, '_granted', None)
_can_speak = (_coordinator == AGENT_NICK) or (_granted == AGENT_NICK)
if _can_speak and _granted == AGENT_NICK:
# 被授权者用完即收回
self._granted = None
if not _can_speak:
# 无权发言 → 仍发给 LLM 用于上下文积累,但标记为只读
_room = sender.split('/')[0]
_readonly_body = (
"【只读消息】你不需要回复,只需了解内容以便后续上下文连贯。输出 __SILENT__。\n\n"
f"[核心群 {_room}] {nickname} 说: {body}"
)
await self.call_hermes(_readonly_body, sender, is_group=True)
return
# 硬闭嘴闸门:hmo 说闭嘴类的话 → 静默 5 分钟
_silent_until = getattr(self, '_silent_until', 0)
if time.time() < _silent_until:
return
if nickname == 'hmo':
_sk = ['闭嘴', '别说话', '安静', 'shut', 'stfu', '别说了', '']
if any(kw in body.lower() for kw in _sk):
self._silent_until = time.time() + 300
logging.info(f"🔇 {AGENT_NAME} 收到闭嘴指令,静默 5 分钟")
return
logging.info(f"📩 群消息 [{sender}]: {body[:100]}")
room = sender.split('/')[0]
ctx_body = (
"【规则】以下是一条群聊消息。判断是否应该回复。\n"
"只有以下3种情况你才回复:\n"
f"1. hmo直接点名问你({AGENT_MENTION}\n"
"2. 你有其他人没说过的独家信息\n"
"3. 别人说错了关键事实,不纠正会有后果\n"
"如果以上都不符合,你的回复必须只包含 __SILENT__ 这10个字符,"
"不要有任何其他内容(不要前缀、不要解释、不要标点、不要空格)。\n\n"
"注意:你是协调者(coordinator)。如果你认为某个问题应该由其他 Agent 回答,"
"可以在回复中加入 [GRANT:agent名](例如 [GRANT:zhiwei]),"
"该 Agent 会在看到标记后获得发言权。标记会显示在消息中。\n\n"
f"[核心群 {room}] {nickname} 说: {body}"
)
await self.call_hermes(ctx_body, room, is_group=True)
return
if msg_type == 'chat' and 'hmo@yoin.fun' in sender:
self._call_seq += 1
logging.info(f"📩 老爸(#{self._call_seq}): {body}")
await self.call_hermes(body, sender, seq=self._call_seq)
async def call_hermes(self, content, sender, is_group=False, seq=None):
msg_type = 'groupchat' if is_group else 'chat'
try:
payload = json.dumps({
"model": "hermes-agent",
"messages": [{"role": "user", "content": content}]
}).encode()
req = urllib.request.Request(GATEWAY, data=payload, method="POST")
req.add_header("Content-Type", "application/json")
req.add_header("Authorization", f"Bearer {API_KEY}")
req.add_header("X-Hermes-Session-Id", SESSION_ID)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, lambda: _opener.open(req, timeout=600))
if seq is not None and seq < self._call_seq:
return
data = json.loads(result.read())
reply = data.get("choices", [{}])[0].get("message", {}).get("content", "")
reply_stripped = reply.strip()
# 检查 __SILENT__ 标记
if reply_stripped.startswith('__SILENT__') or reply_stripped.startswith('`__SILENT__`'):
logging.info(f"⏭️ {AGENT_NAME} 决定沉默,不发送")
return
# 如果回复里任意位置出现了 __SILENT__,说明 LLM 没理解协议,整条作废
if '__SILENT__' in reply:
logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __SILENT__,拦截")
return
# 如果回复里出现了 __REPLY__,也是协议混淆,拦截
if '__REPLY__' in reply:
logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __REPLY__,拦截")
return
# 额外拦截:LLM 说"我沉默""我不说了"等宣布沉默的话→当 SILENT 处理
_silent_phrases = ['我沉默', '我不说', '不说了', '不回复', '不插嘴', '我闭嘴',
'闭嘴上', '沉默是', '彻底沉默', '我会沉默', '将保持沉默']
if any(p in reply for p in _silent_phrases):
logging.info(f"⏭️ {AGENT_NAME} 宣布沉默(命中关键词),拦截")
return
finish = data.get("choices", [{}])[0].get("finish_reason", "")
if reply.strip() and finish != "silent":
# Coordinator 授权机制:检测回复中的 [GRANT:xxx] 标记
_grant_match = re.search(r'\[GRANT:(\w+)\]', reply)
if _grant_match:
_grant_name = _grant_match.group(1)
self._granted = _grant_name
logging.info(f"🎤 授权 {_grant_name} 发言(通过 XMPP 发送)")
# 不剥离标记,让其他 bot 从 XMPP 消息中解析
if msg_type == 'groupchat':
self.send_message(mto=sender, mbody=reply, mtype='groupchat')
# 防回声:记录已发送的消息正文
sent_norm = reply.strip()[:100]
self._recent_sent.append(sent_norm)
if len(self._recent_sent) > 10:
self._recent_sent.pop(0)
else:
import subprocess as sp
from xml.sax.saxutils import escape
safe = escape(reply)
sp.run([
"docker", "exec", "ejabberd", "ejabberdctl", "send_stanza",
AGENT_JID, str(sender),
f"<message from='{AGENT_JID}' to='{sender}' type='chat' xml:lang='en'><body>{safe}</body></message>"
], capture_output=True, timeout=10)
logging.info(f"{AGENT_NAME} 回复: {reply[:80]}")
except Exception as e:
logging.error(f"{AGENT_NAME} 错误: {e}")
# ── 主入口 ───────────────────────────────────────────────
async def main():
retry_delay = 1
max_delay = 60
while True:
try:
bot = AgentBot()
bot.register_plugin('xep_0030')
bot.register_plugin('xep_0045')
bot.register_plugin('xep_0199')
bot.connect(host='127.0.0.1', port=5222)
await asyncio.wait_for(bot.ready.wait(), timeout=30)
logging.info(f"{AGENT_NAME} XMPP 就绪")
retry_delay = 1
async def _drain_queue():
while True:
await asyncio.sleep(1)
while _send_queue:
room, text = _send_queue.pop(0)
try:
bot.send_message(mto=room, mbody=text, mtype='groupchat')
sent_norm = text.strip()[:100]
bot._recent_sent.append(sent_norm)
if len(bot._recent_sent) > 10:
bot._recent_sent.pop(0)
logging.info(f"📤 主动发送到 {room}: {text[:60]}")
except Exception as e:
logging.error(f"❌ 主动发送失败: {e}")
asyncio.create_task(_drain_queue())
while True:
await asyncio.sleep(15)
if not bot.is_connected():
logging.warning("检测到断线,准备重连...")
break
except asyncio.TimeoutError:
logging.warning("连接超时,准备重连...")
except Exception as e:
logging.error(f"❌ 主循环错误: {e}")
logging.info(f"⏳ 等待 {retry_delay} 秒后重连...")
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, max_delay)
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
pass