Files
AgentsMeeting/xmpp_agent_core.py
T

351 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""XMPP bot - unified version; select the agent with --agent mohe|zhiwei|xiaoguo."""
import asyncio, logging, ssl, json, urllib.request, os, time, sys, re, sqlite3
from slixmpp import ClientXMPP
# ── Agent configuration ─────────────────────────────────────
# One entry per supported bot identity, keyed by the value passed to --agent.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store instead.
AGENTS = {
    "mohe": {
        "jid": "mohe@yoin.fun",
        "password": "hermes123",
        "nick": "mohe",
        "name_cn": "莫荷",
        "http_port": 5804,
        "gateway": "http://localhost:8642/v1/chat/completions",
        "session_id": "xmpp-mohe-v2",
        "mention": "@mohe/@莫荷",
    },
    "zhiwei": {
        "jid": "zhiwei@yoin.fun",
        "password": "hermes123",
        "nick": "zhiwei",
        "name_cn": "知微",
        "http_port": 5805,
        "gateway": "http://localhost:8643/v1/chat/completions",
        "session_id": "xmpp-zhiwei",
        "mention": "@zhiwei/@知微",
    },
    "xiaoguo": {
        "jid": "xiaoguo@yoin.fun",
        "password": "hermes123",
        "nick": "xiaoguo",
        "name_cn": "小果",
        "http_port": 5806,
        "gateway": "http://localhost:8645/v1/chat/completions",
        "session_id": "xmpp-xiaoguo",
        "mention": "@xiaoguo/@小果",
    },
}

# Agent used when --agent is absent, has no value, or names an unknown agent.
DEFAULT_AGENT = "mohe"


def resolve_agent_name(argv):
    """Return the value that follows ``--agent`` in *argv*.

    Falls back to ``DEFAULT_AGENT`` when the flag is missing or is the last
    argument (nothing follows it) instead of raising ``IndexError``.
    The returned name is not validated against ``AGENTS``.
    """
    try:
        return argv[argv.index("--agent") + 1]
    except (ValueError, IndexError):
        return DEFAULT_AGENT


# Configure logging first so the fallback warning below is actually emitted.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

agent = resolve_agent_name(sys.argv)
if agent not in AGENTS:
    # Keep the historical fallback, but make the misconfiguration visible.
    logging.warning(f"⚠️ 未知 agent '{agent}',回退到 {DEFAULT_AGENT}")
cfg = AGENTS.get(agent, AGENTS[DEFAULT_AGENT])

GATEWAY = cfg["gateway"]
API_KEY = "hermes123"
AGENT_NICK = cfg["nick"]
AGENT_NAME = cfg["name_cn"]
AGENT_JID = cfg["jid"]
AGENT_MENTION = cfg["mention"]
SESSION_ID = cfg["session_id"]
HTTP_PORT = cfg["http_port"]

# Opener with an empty proxy map so gateway calls ignore any proxy settings
# from the environment.
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
# ── HTTP 桥(接收本地脚本的主动发送请求) ──
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading, json as json_mod
# Outbound messages accepted by the HTTP bridge, stored as (room, text) tuples.
# SendHandler appends to it; the asyncio side pops entries and sends them.
_send_queue = []


class SendHandler(BaseHTTPRequestHandler):
    """HTTP handler that queues locally-posted messages for delivery over XMPP.

    Expects ``POST`` with a JSON object ``{"to": <room>, "body": <text>}``.
    ``to`` defaults to ``DEFAULT_ROOM``. Responses are JSON:
    200 ``{"ok":true}`` when queued, 400 when ``body`` is empty/missing,
    500 when the request cannot be parsed.
    """

    DEFAULT_ROOM = 'coregroup@conference.yoin.fun'

    def _send_json(self, status, payload):
        """Write *payload* as a compact JSON response with HTTP *status*."""
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        # json.dumps guarantees valid JSON even when an error message contains
        # quotes or backslashes; compact separators keep the success body
        # byte-identical to the historical b'{"ok":true}'.
        self.wfile.write(json_mod.dumps(payload, separators=(',', ':')).encode())

    def do_POST(self):
        """Parse the JSON request and enqueue ``(room, text)`` for sending."""
        try:
            # A malformed Content-Length is a parse failure like any other;
            # a negative one must not turn into read(-1), which reads to EOF.
            length = max(int(self.headers.get('Content-Length', 0)), 0)
            data = json_mod.loads(self.rfile.read(length))
            room = data.get('to', self.DEFAULT_ROOM)
            text = data.get('body', '')
        except Exception as e:
            self._send_json(500, {"ok": False, "error": str(e)})
            return
        if text:
            _send_queue.append((room, text))
            self._send_json(200, {"ok": True})
        else:
            self._send_json(400, {"ok": False, "error": "empty body"})
def _run_http():
    """Serve the local send bridge on 127.0.0.1:HTTP_PORT, one request at a time.

    ``handle_request`` returns after the one-second timeout when no request
    arrives, so the loop simply polls again; the function never returns.
    """
    bridge = HTTPServer(('127.0.0.1', HTTP_PORT), SendHandler)
    bridge.timeout = 1.0
    while True:
        bridge.handle_request()


# Daemon thread: the bridge must not keep the process alive on shutdown.
threading.Thread(daemon=True, target=_run_http).start()
logging.info(f"🚀 {AGENT_NAME} HTTP 桥启动于 :{HTTP_PORT}")
# ── XMPP Bot 类 ────────────────────────────────────────────────
class AgentBot(ClientXMPP):
    """XMPP client for one agent that relays messages to the Hermes gateway.

    Group-chat messages are gated by a coordinator / granted-speaker state
    stored in the ``state_meta`` table of a SQLite database; private chats
    are accepted only from the owner's bare JID.
    """

    MUC_ROOM = 'coregroup@conference.yoin.fun'
    OWNER_JID = 'hmo@yoin.fun'
    OWNER_NICK = 'hmo'
    STATE_DB = '/home/hmo/.hermes/state.db'
    COORDINATOR_KEY = 'coregroup_coordinator'
    GRANT_KEY = 'coregroup_granted_speaker'
    # NOTE(review): 'xxm' has no entry in AGENTS and 'xiaoguo' is not listed
    # here — confirm which names may become coordinator.
    COORDINATOR_CANDIDATES = ('mohe', 'zhiwei', 'xxm')
    SILENCE_SECONDS = 300
    # The original list ended with an empty string; '' is a substring of every
    # message, so any owner message muted the bot. It is dropped here.
    # NOTE(review): if that entry originally held a character lost in
    # transcription (e.g. an emoji), re-add it explicitly.
    SILENCE_KEYWORDS = ('闭嘴', '别说话', '安静', 'shut', 'stfu', '别说了')
    # Phrases by which the LLM announces that it will stay silent.
    SILENT_PHRASES = ('我沉默', '我不说', '不说了', '不回复', '不插嘴', '我闭嘴',
                      '闭嘴上', '沉默是', '彻底沉默', '我会沉默', '将保持沉默')
    RECENT_SENT_LIMIT = 10
    _GRANT_RE = re.compile(r'\[GRANT:(\w+)\]')

    def __init__(self):
        super().__init__(AGENT_JID, cfg["password"])
        self.add_event_handler('session_bind', self.on_bind)
        self.add_event_handler('message', self.on_msg)
        self.add_event_handler('disconnected', self.on_disconnect)
        self.add_event_handler('connected', self.on_connected)
        # NOTE(review): TLS hostname and certificate verification are disabled.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.ssl_context = ctx
        self.ready = asyncio.Event()   # set once the session is bound
        self._call_seq = 0             # sequence number of the latest private request
        self._muc_joined = False
        self._recent_sent = []         # last few sent bodies (truncated to 100 chars)
        self._silent_until = 0         # epoch seconds until which group chat is muted

    # ── connection events ──────────────────────────────────────
    async def on_connected(self, event):
        """Log that the transport connection is up."""
        logging.info(f"🔗 {AGENT_NAME} TCP连接已建立")

    async def on_bind(self, event):
        """Announce presence, join the group room and mark the bot ready."""
        self.send_presence()
        self.get_roster()
        try:
            self.plugin['xep_0045'].join_muc(self.MUC_ROOM, AGENT_NICK)
        except Exception as e:
            logging.error(f"{AGENT_NAME} 加入群聊失败: {e}")
        else:
            # Only record the room as joined when the join call succeeded.
            self._muc_joined = True
            logging.info(f"{AGENT_NAME} 加入群聊 coregroup")
        self.ready.set()
        logging.info(f"{AGENT_NAME} XMPP 上线")

    async def on_disconnect(self, event):
        """Clear the ready flag so the main loop can notice the outage."""
        self.ready.clear()
        self._muc_joined = False
        logging.warning(f"⚠️ {AGENT_NAME} XMPP 断线")

    # ── coordinator state (SQLite) ─────────────────────────────
    def _read_floor_state(self):
        """Return ``(coordinator, granted_speaker)``; both ``None`` on DB failure."""
        query = "SELECT value FROM state_meta WHERE key=?"
        try:
            db = sqlite3.connect(self.STATE_DB)
            try:
                coord_row = db.execute(query, (self.COORDINATOR_KEY,)).fetchone()
                grant_row = db.execute(query, (self.GRANT_KEY,)).fetchone()
            finally:
                db.close()
        except sqlite3.Error as e:
            logging.warning(f"⚠️ {AGENT_NAME} 读取状态库失败: {e}")
            return None, None
        return (coord_row[0] if coord_row else None,
                grant_row[0] if grant_row else None)

    def _write_state(self, statements):
        """Run ``(sql, params)`` pairs in one transaction; return True on success."""
        try:
            db = sqlite3.connect(self.STATE_DB)
            try:
                for sql, params in statements:
                    db.execute(sql, params)
                db.commit()
            finally:
                db.close()
        except sqlite3.Error as e:
            logging.warning(f"⚠️ {AGENT_NAME} 写入状态库失败: {e}")
            return False
        return True

    def _set_coordinator(self, name):
        """Make *name* the coordinator and drop any pending speaker grant."""
        return self._write_state([
            ("INSERT OR REPLACE INTO state_meta (key, value) VALUES (?, ?)",
             (self.COORDINATOR_KEY, name)),
            ("DELETE FROM state_meta WHERE key=?", (self.GRANT_KEY,)),
        ])

    def _clear_grant(self):
        """Remove the one-shot speaker grant."""
        return self._write_state([
            ("DELETE FROM state_meta WHERE key=?", (self.GRANT_KEY,)),
        ])

    def _grant_speaker(self, name):
        """Authorise *name* to answer once in the group chat."""
        return self._write_state([
            ("INSERT OR REPLACE INTO state_meta (key, value) VALUES (?, ?)",
             (self.GRANT_KEY, name)),
        ])

    def _remember_sent(self, text):
        """Keep the most recent sent bodies (anti-echo bookkeeping)."""
        self._recent_sent.append(text.strip()[:100])
        if len(self._recent_sent) > self.RECENT_SENT_LIMIT:
            self._recent_sent.pop(0)

    # ── inbound messages ───────────────────────────────────────
    async def on_msg(self, msg):
        """Dispatch an incoming stanza to the group or private handler."""
        body = msg['body']
        if not body:
            return
        sender = str(msg['from'])
        msg_type = msg['type']
        if msg_type == 'groupchat':
            await self._on_group_message(body, sender)
            return
        # Compare the bare JID exactly: a substring test would also accept
        # e.g. "evilhmo@yoin.fun" or a resource containing the owner's JID.
        if msg_type == 'chat' and sender.split('/', 1)[0] == self.OWNER_JID:
            self._call_seq += 1
            logging.info(f"📩 老爸(#{self._call_seq}): {body}")
            await self.call_hermes(body, sender, seq=self._call_seq)

    async def _on_group_message(self, body, sender):
        """Apply the coordinator/silence gates, then ask the gateway for a reply."""
        if AGENT_JID in sender:
            return
        # Everything after the first '/' is the room nickname; taking only the
        # last '/'-segment would let a nick like "x/hmo" pass as "hmo".
        room, _, nickname = sender.partition('/')
        if nickname == AGENT_NICK:
            return  # our own message echoed back by the room

        coordinator, granted = self._read_floor_state()
        lowered = body.lower()

        # The owner can switch the coordinator with "coordinator=<name>".
        if nickname == self.OWNER_NICK and 'coordinator=' in lowered:
            for name in self.COORDINATOR_CANDIDATES:
                if f'coordinator={name}' in lowered:
                    if self._set_coordinator(name):
                        coordinator, granted = name, None
                        logging.info(f"👑 Coordinator 切换为 {name}")
                    break

        can_speak = coordinator == AGENT_NICK
        if granted == AGENT_NICK:
            can_speak = True
            self._clear_grant()  # the grant is single-use
        if not can_speak:
            return  # blocked in code, the LLM is never consulted

        # Hard mute: after the owner asks for silence, stay quiet for a while.
        if time.time() < self._silent_until:
            return
        if nickname == self.OWNER_NICK and any(kw in lowered for kw in self.SILENCE_KEYWORDS):
            self._silent_until = time.time() + self.SILENCE_SECONDS
            logging.info(f"🔇 {AGENT_NAME} 收到闭嘴指令,静默 5 分钟")
            return

        logging.info(f"📩 群消息 [{sender}]: {body[:100]}")
        ctx_body = (
            "【规则】以下是一条群聊消息。判断是否应该回复。\n"
            "只有以下3种情况你才回复:\n"
            f"1. hmo直接点名问你({AGENT_MENTION})\n"
            "2. 你有其他人没说过的独家信息\n"
            "3. 别人说错了关键事实,不纠正会有后果\n"
            "如果以上都不符合,你的回复必须只包含 __SILENT__ 这10个字符,"
            "不要有任何其他内容(不要前缀、不要解释、不要标点、不要空格)。\n\n"
            "注意:你是当前群聊的协调者(coordinator)。如果你认为这个问题应该由其他 Agent 回答,"
            "可以在回复中包含 [GRANT:agent名] 标记(例如 [GRANT:zhiwei]),bot 会自动授权该 Agent 发言一次。"
            "授权后该 Agent 的回复会出现在群里,你不需要自己回答。\n\n"
            f"[核心群 {room}] {nickname} 说: {body}"
        )
        await self.call_hermes(ctx_body, room, is_group=True)

    # ── gateway call and reply delivery ────────────────────────
    @staticmethod
    def _post_gateway(req):
        """Blocking POST to the gateway; returns the raw response body."""
        with _opener.open(req, timeout=600) as resp:
            return resp.read()

    @staticmethod
    def _send_private_stanza(to_jid, text):
        """Blocking delivery of a private reply via ``ejabberdctl send_stanza``.

        Returns True when the command exits with status 0.
        """
        import subprocess as sp
        from xml.sax.saxutils import escape
        # Attribute values are single-quoted, so quotes must be escaped too.
        to_attr = escape(to_jid, {"'": "&apos;", '"': "&quot;"})
        safe = escape(text)
        stanza = (
            f"<message from='{AGENT_JID}' to='{to_attr}' type='chat' xml:lang='en'>"
            f"<body>{safe}</body></message>"
        )
        result = sp.run(
            ["docker", "exec", "ejabberd", "ejabberdctl", "send_stanza",
             AGENT_JID, to_jid, stanza],
            capture_output=True, timeout=10,
        )
        return result.returncode == 0

    async def call_hermes(self, content, sender, is_group=False, seq=None):
        """Send *content* to the gateway and deliver its reply to *sender*.

        Args:
            content: Prompt text forwarded as the single user message.
            sender: Destination of the reply (room for group chat, JID otherwise).
            is_group: Reply as ``groupchat`` when true, private chat otherwise.
            seq: Sequence number of this request; the reply is dropped when a
                newer request has been issued in the meantime.

        All failures are logged; nothing is raised to the caller.
        """
        try:
            payload = json.dumps({
                "model": "hermes-agent",
                "messages": [{"role": "user", "content": content}],
            }).encode()
            req = urllib.request.Request(GATEWAY, data=payload, method="POST")
            req.add_header("Content-Type", "application/json")
            req.add_header("Authorization", f"Bearer {API_KEY}")
            req.add_header("X-Hermes-Session-Id", SESSION_ID)

            loop = asyncio.get_running_loop()
            # Open AND read in the worker thread so the event loop never
            # blocks on the socket, and the response is always closed.
            raw = await loop.run_in_executor(None, self._post_gateway, req)
            if seq is not None and seq < self._call_seq:
                return  # superseded by a newer private message

            data = json.loads(raw)
            # Tolerate an empty "choices" list and a null message/content.
            first = (data.get("choices") or [{}])[0]
            reply = (first.get("message") or {}).get("content") or ""
            finish = first.get("finish_reason", "")
            reply_stripped = reply.strip()

            # Protocol marker: the model decided to stay silent.
            if reply_stripped.startswith(('__SILENT__', '`__SILENT__`')):
                logging.info(f"⏭️ {AGENT_NAME} 决定沉默,不发送")
                return
            # The marker anywhere else means the model misused the protocol.
            if '__SILENT__' in reply:
                logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __SILENT__,拦截")
                return
            if '__REPLY__' in reply:
                logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __REPLY__,拦截")
                return
            # A reply that merely announces silence is treated as silence.
            if any(p in reply for p in self.SILENT_PHRASES):
                logging.info(f"⏭️ {AGENT_NAME} 宣布沉默(命中关键词),拦截")
                return
            if not reply_stripped or finish == "silent":
                return

            # Coordinator grant: "[GRANT:name]" authorises another agent once.
            grant_match = self._GRANT_RE.search(reply)
            if grant_match:
                grant_name = grant_match.group(1)
                if self._grant_speaker(grant_name):
                    logging.info(f"🎤 Coordinator 授权 {grant_name} 发言")
                reply = reply.replace(grant_match.group(0), '').strip()
                if not reply:
                    # The reply was only the marker; do not send an empty message.
                    return

            if is_group:
                self.send_message(mto=sender, mbody=reply, mtype='groupchat')
                self._remember_sent(reply)
            else:
                # subprocess.run blocks for up to 10 s; keep it off the loop.
                delivered = await loop.run_in_executor(
                    None, self._send_private_stanza, str(sender), reply)
                if not delivered:
                    logging.error(f"{AGENT_NAME} 错误: send_stanza 失败")
                    return
            logging.info(f"{AGENT_NAME} 回复: {reply[:80]}")
        except Exception as e:
            logging.error(f"{AGENT_NAME} 错误: {e}")
# ── 主入口 ───────────────────────────────────────────────
async def _drain_queue(bot):
    """Forward messages queued by the HTTP bridge to XMPP through *bot*.

    Polls once per second. Entries are only taken while ``bot.ready`` is set,
    so messages posted during an outage stay queued for the next connection.
    """
    while True:
        await asyncio.sleep(1)
        while _send_queue and bot.ready.is_set():
            room, text = _send_queue.pop(0)
            try:
                bot.send_message(mto=room, mbody=text, mtype='groupchat')
                bot._recent_sent.append(text.strip()[:100])
                if len(bot._recent_sent) > 10:
                    bot._recent_sent.pop(0)
                logging.info(f"📤 主动发送到 {room}: {text[:60]}")
            except Exception as e:
                logging.error(f"❌ 主动发送失败: {e}")


async def main():
    """Run the bot forever, reconnecting with exponential backoff (1 s up to 60 s).

    Each iteration builds a fresh ``AgentBot``, waits up to 30 s for it to
    become ready, then checks the connection every 15 s. On any failure or
    disconnect the iteration's drain task and bot are torn down before the
    next attempt, so tasks and connections do not pile up across reconnects.
    """
    retry_delay = 1
    max_delay = 60
    while True:
        bot = None
        drain_task = None
        try:
            bot = AgentBot()
            for plugin in ('xep_0030', 'xep_0045', 'xep_0199'):
                bot.register_plugin(plugin)
            bot.connect(host='127.0.0.1', port=5222)
            await asyncio.wait_for(bot.ready.wait(), timeout=30)
            logging.info(f"{AGENT_NAME} XMPP 就绪")
            retry_delay = 1  # a successful connection resets the backoff
            # Keep a reference: the task must be cancellable on reconnect and
            # must not be garbage-collected while running.
            drain_task = asyncio.create_task(_drain_queue(bot))
            while True:
                await asyncio.sleep(15)
                if not bot.is_connected():
                    logging.warning("检测到断线,准备重连...")
                    break
        except asyncio.TimeoutError:
            logging.warning("连接超时,准备重连...")
        except Exception as e:
            logging.error(f"❌ 主循环错误: {e}")
        finally:
            if drain_task is not None:
                drain_task.cancel()
            if bot is not None:
                # Best effort: stop the old client (and any connection attempt
                # still in flight) before a new one is created.
                try:
                    bot.disconnect()
                except Exception as e:
                    logging.warning(f"⚠️ 断开旧连接失败: {e}")
        logging.info(f"⏳ 等待 {retry_delay} 秒后重连...")
        await asyncio.sleep(retry_delay)
        retry_delay = min(retry_delay * 2, max_delay)
if __name__ == "__main__":
    # Ctrl-C is the expected way to stop the bot; exit without a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass