Files
AgentsMeeting/xmpp_agent_core.py
T

387 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""XMPP Bot - 统一版,支持 --agent mohe|zhiwei|xiao 参数"""
import asyncio, logging, ssl, json, urllib.request, os, time, sys, re
from slixmpp import ClientXMPP
# ── Agent 配置 ──────────────────────────────────────────────
# Per-agent profiles keyed by nick.  The three profiles share one shape and
# differ only in the five values listed in the table below, so they are
# generated from it instead of being spelled out three times.
# NOTE(review): the XMPP password is hard-coded and shared by every agent.
AGENTS = {
    nick: {
        "jid": f"{nick}@yoin.fun",
        "password": "hermes123",
        "nick": nick,
        "name_cn": name_cn,
        "http_port": http_port,
        "gateway": f"http://localhost:{gateway_port}/v1/chat/completions",
        "session_id": session_id,
        "mention": f"@{nick}/@{name_cn}",
    }
    # (nick, Chinese display name, HTTP bridge port, gateway port, session id)
    for nick, name_cn, http_port, gateway_port, session_id in (
        ("mohe", "莫荷", 5804, 8642, "xmpp-mohe-v2"),
        ("zhiwei", "知微", 5805, 8643, "xmpp-zhiwei"),
        ("xiaoguo", "小果", 5806, 8645, "xmpp-xiaoguo"),
    )
}
# Select the agent profile from the command line ("--agent <name>").
# A trailing "--agent" with no value used to raise IndexError; it now falls
# back to the default profile like a missing flag does.
agent = "mohe"
if "--agent" in sys.argv:
    _agent_pos = sys.argv.index("--agent") + 1
    if _agent_pos < len(sys.argv):
        agent = sys.argv[_agent_pos]
# Unknown names fall back to the "mohe" profile.
cfg = AGENTS.get(agent, AGENTS["mohe"])
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
# Warn only after basicConfig() so the log format above is not pre-empted by
# logging's implicit default configuration.
if agent not in AGENTS:
    logging.warning(f"unknown --agent {agent!r}; falling back to 'mohe'")
# Flatten the selected profile into module-level constants.
GATEWAY = cfg["gateway"]
API_KEY = "hermes123"
AGENT_NICK = cfg["nick"]
AGENT_NAME = cfg["name_cn"]
AGENT_JID = cfg["jid"]
AGENT_MENTION = cfg["mention"]
SESSION_ID = cfg["session_id"]
HTTP_PORT = cfg["http_port"]
# An empty ProxyHandler mapping disables proxy auto-detection, so gateway
# requests always go out directly.
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
# ── HTTP 桥(接收本地脚本的主动发送请求) ──
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading, json as json_mod
# Outbound messages queued by the HTTP bridge, as (destination, text) tuples.
_send_queue = []


class SendHandler(BaseHTTPRequestHandler):
    """Accept POST requests asking the bot to send a message.

    The request body is JSON with an optional ``to`` (destination, defaulting
    to the core group room) and a ``body`` (the text).  Accepted messages are
    appended to ``_send_queue``.

    Responses are small JSON documents:
      * 200 ``{"ok":true}`` when the message was queued,
      * 400 when ``body`` is missing or empty,
      * 500 when the request could not be parsed.
    """

    def _reply(self, status, payload):
        """Send *status* with the JSON *payload* (bytes) as the body."""
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def do_POST(self):
        """Parse the JSON request and queue the message it describes."""
        try:
            # Parsed inside the try so a malformed Content-Length produces an
            # error response instead of an unhandled exception.  Clamp to 0:
            # a negative count would make read() wait for end-of-stream.
            length = max(int(self.headers.get('Content-Length', 0)), 0)
            data = json_mod.loads(self.rfile.read(length))
            room = data.get('to', 'coregroup@conference.yoin.fun')
            text = data.get('body', '')
        except Exception as e:
            # Serialise through json so quotes/backslashes in the exception
            # text cannot produce an invalid JSON document.
            error_doc = json_mod.dumps(
                {"ok": False, "error": str(e)}, separators=(',', ':')
            )
            self._reply(500, error_doc.encode())
            return
        if text:
            _send_queue.append((room, text))
            self._reply(200, b'{"ok":true}')
        else:
            self._reply(400, b'{"ok":false,"error":"empty body"}')
def _run_http():
    """Serve the local send bridge on 127.0.0.1:HTTP_PORT, one request at a time, forever."""
    bridge = HTTPServer(('127.0.0.1', HTTP_PORT), SendHandler)
    # With a timeout set, handle_request() returns after about a second when
    # no request arrives, and the loop simply calls it again.
    bridge.timeout = 1.0
    while True:
        bridge.handle_request()


# Daemon thread: it must not keep the process alive once the main thread exits.
_http_thread = threading.Thread(target=_run_http, daemon=True)
_http_thread.start()
logging.info(f"🚀 {AGENT_NAME} HTTP 桥启动于 :{HTTP_PORT}")
# ── XMPP Bot 类 ────────────────────────────────────────────────
class AgentBot(ClientXMPP):
    """XMPP client for the selected agent.

    On session bind it joins the ``coregroup`` room.  Group-chat messages are
    filtered through a speaking-rights protocol (coordinator / ``[GRANT:x]`` /
    ``[REVOKE:x]`` / a "shut up" gate) and then forwarded to the Hermes
    gateway; private chat is only accepted from ``hmo@yoin.fun``.
    """

    def __init__(self):
        """Register event handlers and initialise the speaking-rights state."""
        super().__init__(AGENT_JID, cfg["password"])
        self.add_event_handler('session_bind', self.on_bind)
        self.add_event_handler('message', self.on_msg)
        self.add_event_handler('disconnected', self.on_disconnect)
        self.add_event_handler('connected', self.on_connected)
        # NOTE(review): certificate and hostname verification are disabled.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.ssl_context = ctx
        self.ready = asyncio.Event()
        self._call_seq = 0           # sequence number of the latest private-chat request
        self._muc_joined = False
        self._recent_sent = []       # first 100 chars of the last 10 messages sent
        self._coordinator = 'mohe'   # default coordinator
        self._granted = None         # nick currently holding a one-shot grant
        self._revoked_until = 0      # epoch seconds until which this agent is read-only
        self._silent_until = 0       # epoch seconds until which this agent stays silent

    async def on_connected(self, event):
        """Log that the connection has been established."""
        logging.info(f"🔗 {AGENT_NAME} TCP连接已建立")

    async def on_bind(self, event):
        """Announce presence, fetch the roster, join the group room and mark the bot ready."""
        self.send_presence()
        self.get_roster()
        try:
            self.plugin['xep_0045'].join_muc('coregroup@conference.yoin.fun', AGENT_NICK)
            # Only flag the room as joined when the join call did not raise.
            self._muc_joined = True
            logging.info(f"{AGENT_NAME} 加入群聊 coregroup")
        except Exception as e:
            logging.error(f"{AGENT_NAME} 加入群聊失败: {e}")
        self.ready.set()
        logging.info(f"{AGENT_NAME} XMPP 上线")

    async def on_disconnect(self, event):
        """Clear the ready/joined state when the connection drops."""
        self.ready.clear()
        self._muc_joined = False
        logging.warning(f"⚠️ {AGENT_NAME} XMPP 断线")

    def _gateway_request(self, content):
        """Build the POST request that sends *content* to the gateway as one user message."""
        payload = json.dumps({
            "model": "hermes-agent",
            "messages": [{"role": "user", "content": content}]
        }).encode()
        req = urllib.request.Request(GATEWAY, data=payload, method="POST")
        req.add_header("Content-Type", "application/json")
        req.add_header("Authorization", f"Bearer {API_KEY}")
        req.add_header("X-Hermes-Session-Id", SESSION_ID)
        return req

    async def _notify_readonly(self, content):
        """Deliver *content* to the gateway and discard whatever it answers.

        Used for every read-only path: nothing is ever posted back to the
        room.  Delivery is best-effort; failures are logged, not raised.
        """
        req = self._gateway_request(content)

        def _deliver():
            # Only delivery matters; close the response without reading it.
            _opener.open(req, timeout=30).close()

        try:
            await asyncio.get_running_loop().run_in_executor(None, _deliver)
        except Exception as e:
            logging.warning(f"{AGENT_NAME} 只读消息投递失败: {e}")

    async def on_msg(self, msg):
        """Dispatch an incoming stanza to the group-chat or private-chat path."""
        body = msg['body']
        sender = str(msg['from'])
        msg_type = msg['type']
        if not body:
            return
        if msg_type == 'groupchat':
            await self._handle_groupchat(body, sender)
            return
        # Compare the bare JID exactly: a substring test would also accept
        # e.g. "x@evil.example/hmo@yoin.fun" or "nothmo@yoin.fun".
        if msg_type == 'chat' and sender.split('/')[0] == 'hmo@yoin.fun':
            self._call_seq += 1
            logging.info(f"📩 老爸(#{self._call_seq}): {body}")
            await self.call_hermes(body, sender, seq=self._call_seq)

    async def _handle_groupchat(self, body, sender):
        """Apply the speaking-rights protocol to one group message and maybe answer it."""
        if AGENT_JID in sender:
            return
        # sender is "room@service/nick"; the nick is everything after the
        # first "/" so a nick such as "x/hmo" cannot pass for "hmo".
        room, _, nickname = sender.partition('/')
        # Skip our own messages (matched by nickname).
        if nickname == AGENT_NICK:
            return

        # 1. Commands from hmo: "lead=<name>" switches the coordinator, a
        #    direct @mention grants this agent one turn.
        if nickname == 'hmo':
            lowered = body.lower()
            if 'lead=' in lowered:
                # Every configured agent can be named; 'xxm' is kept from the
                # original list.  NOTE(review): no visible agent uses the nick
                # 'xxm' — confirm whether it is still needed.
                for _name in (*AGENTS, 'xxm'):
                    if f'lead={_name}' in lowered:
                        self._coordinator = _name
                        self._granted = None
                        logging.info(f"👑 Coordinator 切换为 {_name}")
                        break
            elif any(tag in body for tag in (f'@{AGENT_NICK}', f'@{AGENT_NAME}')):
                self._granted = AGENT_NICK
                logging.info("🎤 被 hmo 点名,获得发言权")

        # 2. Grant marker (handled before revoke).
        grant = re.search(r'\[GRANT:(\w+)\]', body)
        if grant:
            self._granted = grant.group(1)
            # A grant lifts a revoke only for the agent it names.
            if self._granted == AGENT_NICK:
                self._revoked_until = 0
            logging.info(f"🎤 收到授权:{self._granted}")

        # 3. Revoke marker.  The deadline is sampled before a revoke carried
        #    by this very message is applied; that case is caught by step 5.
        revoked_until = self._revoked_until
        revoke = re.search(r'\[REVOKE:(\w+)\]', body)
        if revoke and revoke.group(1) == AGENT_NICK:
            self._revoked_until = time.time() + 300  # lifted automatically after 5 minutes
            logging.info(f"🔇 {AGENT_NAME} 发言权被收回(5分钟后自动恢复)")
        if time.time() < revoked_until:
            # Revoked: the agent still sees the message but must not answer,
            # so the gateway reply is discarded rather than posted.
            await self._notify_readonly(
                f"【只读消息】你目前被暂时收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {room}] {nickname} 说: {body}"
            )
            return

        # 4. Only the coordinator or the holder of a one-shot grant may speak.
        is_coordinator = (self._coordinator == AGENT_NICK)
        is_granted = (self._granted == AGENT_NICK)
        if is_granted:
            self._granted = None  # a grant is consumed by a single message
        if not is_coordinator and not is_granted:
            await self._notify_readonly(
                f"【只读消息】你目前不是协调者,只需了解内容。\n\n[核心群 {room}] {nickname} 说: {body}"
            )
            return  # never reply

        # 5. Re-check the revoke with the up-to-date deadline.
        if time.time() < self._revoked_until:
            await self._notify_readonly(
                f"【只读消息】你目前被收回发言权。只需了解内容。\n\n[核心群 {room}] {nickname} 说: {body}"
            )
            return

        # Hard gate: after hmo says "shut up" (or similar), stay silent for 5 minutes.
        if time.time() < self._silent_until:
            return
        if nickname == 'hmo':
            # The original list ended with an empty string, which is contained
            # in every message and silenced the bot on anything hmo said.
            _sk = ['闭嘴', '别说话', '安静', 'shut', 'stfu', '别说了']
            lowered = body.lower()
            if any(kw in lowered for kw in _sk):
                self._silent_until = time.time() + 300
                logging.info(f"🔇 {AGENT_NAME} 收到闭嘴指令,静默 5 分钟")
                return

        logging.info(f"📩 群消息 [{sender}]: {body[:100]}")
        ctx_body = (
            "【规则】以下是一条群聊消息。判断是否应该回复。\n"
            "只有以下3种情况你才回复:\n"
            f"1. hmo直接点名问你({AGENT_MENTION}\n"
            "2. 你有其他人没说过的独家信息\n"
            "3. 别人说错了关键事实,不纠正会有后果\n"
            "如果以上都不符合,你的回复必须只包含 __SILENT__ 这10个字符,"
            "不要有任何其他内容(不要前缀、不要解释、不要标点、不要空格)。\n\n"
            "注意:你是协调者(lead)。你的第一职责是管理讨论节奏,不是自己说话。\n"
            "- 别人能回答的问题,不抢答。\n"
            "- 如果其他 Agent 更合适,用 [GRANT:agent名] 授权他们发言(例如 [GRANT:zhiwei])。\n"
            "- hmo 直接 @点名某人 也会自动授权。\n"
            "- 如果有人跑题/刷屏,用 [REVOKE:agent名] 收回发言权(例如 [REVOKE:zhiwei])。\n"
            "- [GRANT] 可以覆盖 [REVOKE]。标记会显示在消息中。\n\n"
            f"[核心群 {room}] {nickname} 说: {body}"
        )
        await self.call_hermes(ctx_body, room, is_group=True)

    async def call_hermes(self, content, sender, is_group=False, seq=None):
        """Send *content* to the gateway and relay its reply to *sender*.

        Args:
            content: Text passed to the gateway as a single user message.
            sender: Where the reply goes — the room JID for group chat, the
                peer's JID for private chat.
            is_group: True to reply with a groupchat message via slixmpp;
                otherwise the reply is injected with ``ejabberdctl send_stanza``.
            seq: Private-chat sequence number.  The reply is dropped when a
                newer private request has started in the meantime.

        Replies that contain ``__SILENT__`` / ``__REPLY__`` or announce
        silence are suppressed.  All errors are logged, never raised.
        """
        msg_type = 'groupchat' if is_group else 'chat'
        try:
            req = self._gateway_request(content)

            def _fetch():
                # Read and close on the worker thread so the event loop is
                # never blocked and the connection is always released.
                with _opener.open(req, timeout=600) as resp:
                    return resp.read()

            loop = asyncio.get_running_loop()
            raw = await loop.run_in_executor(None, _fetch)
            if seq is not None and seq < self._call_seq:
                return
            data = json.loads(raw)
            # Tolerate an empty "choices" list and a null message/content.
            choice = (data.get("choices") or [{}])[0]
            reply = (choice.get("message") or {}).get("content") or ""
            reply_stripped = reply.strip()
            # Explicit __SILENT__ marker.
            if reply_stripped.startswith(('__SILENT__', '`__SILENT__`')):
                logging.info(f"⏭️ {AGENT_NAME} 决定沉默,不发送")
                return
            # __SILENT__ anywhere else means the model misused the protocol:
            # drop the whole reply.
            if '__SILENT__' in reply:
                logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __SILENT__,拦截")
                return
            # __REPLY__ is protocol confusion as well.
            if '__REPLY__' in reply:
                logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __REPLY__,拦截")
                return
            # A reply that merely announces silence is treated as silence.
            _silent_phrases = ['我沉默', '我不说', '不说了', '不回复', '不插嘴', '我闭嘴',
                               '闭嘴上', '沉默是', '彻底沉默', '我会沉默', '将保持沉默']
            if any(p in reply for p in _silent_phrases):
                logging.info(f"⏭️ {AGENT_NAME} 宣布沉默(命中关键词),拦截")
                return
            finish = choice.get("finish_reason", "")
            if not reply_stripped or finish == "silent":
                return
            # Track a [GRANT:x] emitted by this agent.  The marker stays in
            # the message so the other bots can parse it from the room.
            _grant_match = re.search(r'\[GRANT:(\w+)\]', reply)
            if _grant_match:
                _grant_name = _grant_match.group(1)
                self._granted = _grant_name
                logging.info(f"🎤 授权 {_grant_name} 发言(通过 XMPP 发送)")
            if msg_type == 'groupchat':
                self.send_message(mto=sender, mbody=reply, mtype='groupchat')
                # Anti-echo: remember the beginning of what was sent.
                self._recent_sent.append(reply_stripped[:100])
                if len(self._recent_sent) > 10:
                    self._recent_sent.pop(0)
            else:
                import subprocess as sp
                from xml.sax.saxutils import escape
                safe = escape(reply)
                # The recipient lands inside a single-quoted XML attribute,
                # so quote characters must be escaped too.
                safe_to = escape(str(sender), {"'": "&apos;", '"': "&quot;"})
                stanza = (
                    f"<message from='{AGENT_JID}' to='{safe_to}' type='chat' "
                    f"xml:lang='en'><body>{safe}</body></message>"
                )
                # Run the blocking docker call off the event loop.
                proc = await loop.run_in_executor(None, lambda: sp.run([
                    "docker", "exec", "ejabberd", "ejabberdctl", "send_stanza",
                    AGENT_JID, str(sender), stanza
                ], capture_output=True, timeout=10))
                if proc.returncode != 0:
                    logging.error(f"{AGENT_NAME} 错误: send_stanza 退出码 {proc.returncode}")
                    return
            logging.info(f"{AGENT_NAME} 回复: {reply[:80]}")
        except Exception as e:
            logging.error(f"{AGENT_NAME} 错误: {e}")
# ── 主入口 ───────────────────────────────────────────────
async def main():
    """Run the bot forever, reconnecting with exponential backoff (1s up to 60s).

    Each round creates a fresh ``AgentBot``, waits up to 30 seconds for it to
    become ready, starts a task that flushes ``_send_queue`` into the group
    chat, and polls the connection every 15 seconds.  When the round ends,
    its flush task is cancelled and its bot is disconnected before the next
    round starts.  Previously one extra flush task was leaked per reconnect
    and the old client was left running.
    """
    retry_delay = 1
    max_delay = 60

    async def _drain_queue(bot):
        """Once a second, send everything the HTTP bridge queued through *bot*."""
        while True:
            await asyncio.sleep(1)
            while _send_queue:
                room, text = _send_queue.pop(0)
                try:
                    bot.send_message(mto=room, mbody=text, mtype='groupchat')
                    sent_norm = text.strip()[:100]
                    bot._recent_sent.append(sent_norm)
                    if len(bot._recent_sent) > 10:
                        bot._recent_sent.pop(0)
                    logging.info(f"📤 主动发送到 {room}: {text[:60]}")
                except Exception as e:
                    logging.error(f"❌ 主动发送失败: {e}")

    while True:
        bot = None
        drain_task = None
        try:
            bot = AgentBot()
            bot.register_plugin('xep_0030')
            bot.register_plugin('xep_0045')
            bot.register_plugin('xep_0199')
            bot.connect(host='127.0.0.1', port=5222)
            await asyncio.wait_for(bot.ready.wait(), timeout=30)
            logging.info(f"{AGENT_NAME} XMPP 就绪")
            retry_delay = 1
            # Keep a reference: the event loop holds tasks only weakly.
            drain_task = asyncio.create_task(_drain_queue(bot))
            while True:
                await asyncio.sleep(15)
                if not bot.is_connected():
                    logging.warning("检测到断线,准备重连...")
                    break
        except asyncio.TimeoutError:
            logging.warning("连接超时,准备重连...")
        except Exception as e:
            logging.error(f"❌ 主循环错误: {e}")
        finally:
            # Tear this round down so tasks and clients do not pile up.
            # Queued messages stay in _send_queue for the next round.
            if drain_task is not None:
                drain_task.cancel()
            if bot is not None:
                try:
                    bot.disconnect()
                except Exception as e:
                    logging.warning(f"⚠️ 断开旧连接失败: {e}")
        logging.info(f"⏳ 等待 {retry_delay} 秒后重连...")
        await asyncio.sleep(retry_delay)
        retry_delay = min(retry_delay * 2, max_delay)
if __name__ == "__main__":
    # Script entry point: run the reconnect loop until interrupted.
    # A KeyboardInterrupt (Ctrl-C) ends the process without a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass