feat: xmpp bot最终修复版+skill文档同步
xmpp_agent_core.py 当前工作配置记录: 1. send_presence() — presence = available, 否则私聊进离线 2. 入站消息队列(_inbound_queue) — 崩了不丢, 重试3次 3. 出站队列(_outbound_queue) — 崩了自动重启 4. XMPP心跳检测(xep_0199 ping) — run_filters死后15s内重连 5. 私聊回复双通道: send_message(指定resource) + send_stanza(裸JID) 6. AGENT_MENTION动态解析(群聊@知微识别) 7. session_id保持xmpp-zhiwei-v2不变(不因重启换session) 8. 断线重连2秒延迟(防资源冲突)
This commit is contained in:
@@ -294,7 +294,7 @@ def save_cooldown(cd):
|
||||
def in_cooldown(code, action_type, cooldown_dict, minutes=30):
|
||||
key = f"{code}_{action_type}"
|
||||
last = cooldown_dict.get(key, 0)
|
||||
elapsed = time.time() - last
|
||||
elapsed = datetime.now().timestamp() - last
|
||||
return elapsed < minutes * 60, elapsed, key
|
||||
|
||||
|
||||
@@ -322,7 +322,7 @@ def main():
|
||||
|
||||
# 加载冷却状态
|
||||
cooldown = load_cooldown()
|
||||
now_ts = time.time()
|
||||
now_ts = datetime.now().timestamp()
|
||||
|
||||
# 读 decisions.json 获取完整策略数据
|
||||
code_data = {}
|
||||
|
||||
@@ -0,0 +1,403 @@
|
||||
#!/usr/bin/env python3
|
||||
"""XMPP Bot - 统一版,支持 --agent mohe|zhiwei|xiao 参数"""
|
||||
import asyncio, logging, ssl, json, urllib.request, os, time, sys, re
|
||||
from slixmpp import ClientXMPP
|
||||
|
||||
# ── Agent 配置 ──────────────────────────────────────────────
|
||||
AGENTS = {
|
||||
"mohe": {
|
||||
"jid": "mohe@yoin.fun",
|
||||
"password": "hermes123",
|
||||
"nick": "mohe",
|
||||
"name_cn": "莫荷",
|
||||
"http_port": 5802,
|
||||
"gateway": "http://localhost:8642/v1/chat/completions",
|
||||
"session_id": "xmpp-mohe-v2",
|
||||
"kanban_session_id": "xmpp-mohe-kanban",
|
||||
"mention": "@mohe/@莫荷",
|
||||
},
|
||||
"zhiwei": {
|
||||
"jid": "zhiwei@yoin.fun",
|
||||
"password": "hermes123",
|
||||
"nick": "zhiwei",
|
||||
"name_cn": "知微",
|
||||
"http_port": 5805,
|
||||
"gateway": "http://localhost:8643/v1/chat/completions",
|
||||
"session_id": "xmpp-zhiwei-v2",
|
||||
"kanban_session_id": "xmpp-zhiwei-kanban",
|
||||
"mention": "@zhiwei/@知微",
|
||||
},
|
||||
"xiaoguo": {
|
||||
"jid": "xiaoguo@yoin.fun",
|
||||
"password": "hermes123",
|
||||
"nick": "xiaoguo",
|
||||
"name_cn": "小果",
|
||||
"http_port": 5806,
|
||||
"gateway": "http://localhost:8645/v1/chat/completions",
|
||||
"session_id": "xmpp-xiaoguo",
|
||||
"kanban_session_id": "xmpp-xiaoguo-kanban",
|
||||
"mention": "@xiaoguo/@小果",
|
||||
},
|
||||
}
|
||||
|
||||
agent = sys.argv[sys.argv.index("--agent") + 1] if "--agent" in sys.argv else "mohe"
|
||||
cfg = AGENTS.get(agent, AGENTS["mohe"])
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
|
||||
GATEWAY = cfg["gateway"]
|
||||
API_KEY = "hermes123"
|
||||
AGENT_NICK = cfg["nick"]
|
||||
AGENT_NAME = cfg["name_cn"]
|
||||
AGENT_JID = cfg["jid"]
|
||||
AGENT_MENTION = cfg["mention"]
|
||||
SESSION_ID = cfg["session_id"]
|
||||
KANBAN_SESSION_ID = cfg.get("kanban_session_id", SESSION_ID)
|
||||
HTTP_PORT = cfg["http_port"]
|
||||
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
|
||||
|
||||
# ── 持久化消息队列(入站+出站) ──
|
||||
# 入站:Dad发的消息,先入队再处理,处理失败不丢失
|
||||
_inbound_queue = asyncio.Queue()
|
||||
# 出站:外部脚本通过HTTP桥提交的待发送消息
|
||||
_outbound_queue = []
|
||||
|
||||
# ── HTTP 桥(接收本地脚本的主动发送请求) ──
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
import threading, json as json_mod
|
||||
|
||||
_xmpp_resource = "mohe"
|
||||
|
||||
class SendHandler(BaseHTTPRequestHandler):
|
||||
def do_POST(self):
|
||||
length = int(self.headers.get('Content-Length', 0))
|
||||
body = self.rfile.read(length)
|
||||
try:
|
||||
data = json_mod.loads(body)
|
||||
target = data.get('to', 'hmo@yoin.fun')
|
||||
text = data.get('body', '')
|
||||
msg_type = data.get('type', 'chat')
|
||||
if text:
|
||||
if msg_type == 'groupchat':
|
||||
from xml.sax.saxutils import escape
|
||||
import subprocess as sp
|
||||
safe = escape(text)
|
||||
stanza = (
|
||||
f"<message to='{target}' type='groupchat' xml:lang='en'>"
|
||||
f"<body>{safe}</body></message>"
|
||||
)
|
||||
sp.run([
|
||||
"docker", "exec", "ejabberd", "ejabberdctl",
|
||||
"send_stanza_c2s", AGENT_NICK, "yoin.fun",
|
||||
_xmpp_resource, stanza
|
||||
], capture_output=True, timeout=10)
|
||||
else:
|
||||
_outbound_queue.append((target, text, msg_type))
|
||||
self.send_response(200)
|
||||
self.end_headers()
|
||||
self.wfile.write(b'{"ok":true}')
|
||||
else:
|
||||
self.send_response(400)
|
||||
self.end_headers()
|
||||
self.wfile.write(b'{"ok":false,"error":"empty body"}')
|
||||
except Exception as e:
|
||||
self.send_response(500)
|
||||
self.end_headers()
|
||||
self.wfile.write(f'{{"ok":false,"error":"{e}"}}'.encode())
|
||||
|
||||
def _run_http():
|
||||
server = HTTPServer(('127.0.0.1', HTTP_PORT), SendHandler)
|
||||
server.timeout = 1.0
|
||||
while True:
|
||||
try:
|
||||
server.handle_request()
|
||||
except:
|
||||
pass
|
||||
|
||||
threading.Thread(target=_run_http, daemon=True).start()
|
||||
logging.info(f"🚀 {AGENT_NAME} HTTP 桥启动于 :{HTTP_PORT}")
|
||||
|
||||
# ── Agent Bot ───────────────────────────────────────────────
|
||||
class AgentBot(ClientXMPP):
|
||||
def __init__(self):
|
||||
super().__init__(AGENT_JID, cfg["password"])
|
||||
self.ready = asyncio.Event()
|
||||
self._call_seq = 0
|
||||
self._recent_sent = []
|
||||
self._muc_joined = False
|
||||
global _xmpp_resource
|
||||
|
||||
self.add_event_handler('session_bind', self.on_bind)
|
||||
self.add_event_handler('session_start', self.on_session_start)
|
||||
self.add_event_handler('message', self.on_msg)
|
||||
self.add_event_handler('disconnected', self.on_disconnect)
|
||||
self.add_event_handler('connected', self.on_connected)
|
||||
|
||||
def on_connected(self, event):
|
||||
self.ready.clear()
|
||||
|
||||
def on_bind(self, event):
|
||||
global _xmpp_resource
|
||||
bound_jid = str(self.boundjid)
|
||||
if "/" in bound_jid:
|
||||
_xmpp_resource = bound_jid.split("/", 1)[1]
|
||||
logging.info(f"XMPP resource captured: {_xmpp_resource}")
|
||||
logging.info(f"JID set to: {bound_jid}")
|
||||
|
||||
async def on_session_start(self, event):
|
||||
self.send_presence() # 发送上线presence,否则收不到私聊消息
|
||||
self.plugin['xep_0045'].join_muc('coregroup@conference.yoin.fun', AGENT_NICK)
|
||||
logging.info(f"✅ {AGENT_NAME} 加入群聊 coregroup")
|
||||
self.ready.set()
|
||||
|
||||
def on_disconnect(self, event):
|
||||
self._muc_joined = False
|
||||
|
||||
# ── 入站消息:先入队,异步处理 ──
|
||||
def on_msg(self, msg):
|
||||
if msg['type'] in ('chat', 'groupchat'):
|
||||
body = str(msg['body']).strip()
|
||||
if not body:
|
||||
return
|
||||
sender = str(msg['from'])
|
||||
msg_type = msg['type']
|
||||
|
||||
# 防回声
|
||||
for s in self._recent_sent:
|
||||
if body[:50] in s or s in body[:50]:
|
||||
return
|
||||
|
||||
# 群聊过滤
|
||||
if msg_type == 'groupchat':
|
||||
nick = sender.split('/')[-1] if '/' in sender else ''
|
||||
if nick == AGENT_NICK:
|
||||
return
|
||||
mention_list = AGENT_MENTION.replace('@', '').split('/')
|
||||
is_for_me = any(m in body for m in ['@' + m for m in mention_list] + mention_list)
|
||||
if not is_for_me:
|
||||
return
|
||||
logging.info(f"💬 群聊(#{self._call_seq}): {body[:60]}")
|
||||
else:
|
||||
logging.info(f"📩 老爸(#{self._call_seq}): {body[:60]}")
|
||||
|
||||
self._call_seq += 1
|
||||
# 入队,不直接调gateway
|
||||
_inbound_queue.put_nowait({
|
||||
"seq": self._call_seq,
|
||||
"content": body,
|
||||
"sender": sender,
|
||||
"is_group": (msg_type == 'groupchat'),
|
||||
"session_id": SESSION_ID,
|
||||
"ts": time.time(),
|
||||
"bot_ref": self, # 保留bot引用用于回复
|
||||
})
|
||||
|
||||
async def call_hermes(self, content, sender, is_group=False, seq=None, session_id=None):
|
||||
"""调gateway处理消息,返回回复文本"""
|
||||
msg_type = 'groupchat' if is_group else 'chat'
|
||||
sid = session_id or SESSION_ID
|
||||
try:
|
||||
payload = json.dumps({
|
||||
"model": "hermes-agent",
|
||||
"messages": [{"role": "user", "content": content}]
|
||||
}).encode()
|
||||
req = urllib.request.Request(GATEWAY, data=payload, method="POST")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
req.add_header("Authorization", f"Bearer {API_KEY}")
|
||||
req.add_header("X-Hermes-Session-Id", sid)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
result = await loop.run_in_executor(None, lambda: _opener.open(req, timeout=600))
|
||||
|
||||
data = json.loads(result.read())
|
||||
reply = data.get("choices", [{}])[0].get("message", {}).get("content", "")
|
||||
return reply.strip()
|
||||
except Exception as e:
|
||||
logging.error(f"❌ {AGENT_NAME} gateway调用失败: {e}")
|
||||
return None
|
||||
|
||||
def send_reply(self, sender, reply, msg_type):
|
||||
"""通过bot发送回复"""
|
||||
if not reply:
|
||||
return
|
||||
if reply.startswith('__SILENT__') or '__SILENT__' in reply:
|
||||
logging.info(f"⏭️ {AGENT_NAME} 沉默,不发送")
|
||||
return
|
||||
for phrase in ['我沉默', '我不说', '不说了', '不回复', '不插嘴', '我闭嘴']:
|
||||
if phrase in reply:
|
||||
logging.info(f"⏭️ {AGENT_NAME} 宣布沉默,拦截")
|
||||
return
|
||||
|
||||
if msg_type == 'groupchat':
|
||||
self.send_message(mto=sender, mbody=reply, mtype='groupchat')
|
||||
else:
|
||||
# 私聊回复:发到Dad发消息时的具体resource,确保同一客户端收到
|
||||
# 不用裸JID(避免ejabberd路由到处于stream management pending的旧会话)
|
||||
self.send_message(mto=sender, mbody=reply, mtype='chat')
|
||||
# 备用也发裸JID(兼容resource可能变化的情况)
|
||||
bare_jid = sender.split('/')[0] if '/' in sender else sender
|
||||
import subprocess as sp
|
||||
from xml.sax.saxutils import escape
|
||||
safe = escape(reply)
|
||||
sp.run([
|
||||
"docker", "exec", "ejabberd", "ejabberdctl", "send_stanza",
|
||||
AGENT_JID, bare_jid,
|
||||
f"<message from='{AGENT_JID}' to='{bare_jid}' type='chat' xml:lang='en'><body>{safe}</body></message>"
|
||||
], capture_output=True, timeout=10)
|
||||
|
||||
sent_norm = reply[:100]
|
||||
self._recent_sent.append(sent_norm)
|
||||
if len(self._recent_sent) > 10:
|
||||
self._recent_sent.pop(0)
|
||||
logging.info(f"✅ {AGENT_NAME} 回复: {reply[:80]}")
|
||||
|
||||
|
||||
# ── 入站消息处理循环(独立任务,崩了自动重启) ──
|
||||
async def process_inbound():
|
||||
"""从队列消费入站消息,调gateway处理,发回响应。
|
||||
崩了会自动重启,不会丢消息(队列中的消息会等待下一轮处理)。"""
|
||||
pending_tasks = set()
|
||||
while True:
|
||||
try:
|
||||
item = await _inbound_queue.get()
|
||||
bot = item["bot_ref"]
|
||||
|
||||
# 创建处理任务,保留引用防止GC
|
||||
task = asyncio.create_task(handle_one(item, bot))
|
||||
pending_tasks.add(task)
|
||||
task.add_done_callback(pending_tasks.discard)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logging.error(f"❌ 入站处理循环异常(已恢复): {e}")
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
|
||||
|
||||
async def handle_one(item, bot):
|
||||
"""处理单条入站消息"""
|
||||
try:
|
||||
reply = await bot.call_hermes(
|
||||
item["content"], item["sender"],
|
||||
is_group=item["is_group"],
|
||||
seq=item["seq"], session_id=item["session_id"]
|
||||
)
|
||||
if reply:
|
||||
if seq := item.get("seq"):
|
||||
if seq < bot._call_seq - 5:
|
||||
return # 太旧的消息,跳过
|
||||
msg_type = 'groupchat' if item["is_group"] else 'chat'
|
||||
bot.send_reply(item["sender"], reply, msg_type)
|
||||
else:
|
||||
logging.warning(f"⚠️ 消息#{item['seq']} gateway返回空,保留在队列")
|
||||
# 放回队尾等重试(最多3次)
|
||||
retry_count = item.get("retry", 0)
|
||||
if retry_count < 3:
|
||||
item["retry"] = retry_count + 1
|
||||
item["ts"] = time.time()
|
||||
_inbound_queue.put_nowait(item)
|
||||
except Exception as e:
|
||||
logging.error(f"❌ 处理消息#{item.get('seq','?')}异常: {e}")
|
||||
|
||||
|
||||
# ── 出站消息发送循环 ──
|
||||
async def drain_outbound(bot):
|
||||
"""从_outbound_queue取消息发送。独立循环,崩了自动重启。"""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(0.5)
|
||||
while _outbound_queue:
|
||||
target, text, msg_type = _outbound_queue.pop(0)
|
||||
try:
|
||||
bot.send_message(mto=target, mbody=text, mtype=msg_type)
|
||||
sent_norm = text.strip()[:100]
|
||||
bot._recent_sent.append(sent_norm)
|
||||
if len(bot._recent_sent) > 10:
|
||||
bot._recent_sent.pop(0)
|
||||
logging.info(f"📤 主动发送到 {target}: {text[:60]}")
|
||||
except Exception as e:
|
||||
logging.error(f"❌ 主动发送失败: {e}")
|
||||
_outbound_queue.insert(0, (target, text, msg_type)) # 放回队首重试
|
||||
await asyncio.sleep(3)
|
||||
break
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logging.error(f"❌ 出站循环异常(已恢复): {e}")
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
|
||||
|
||||
# ── 主入口 ───────────────────────────────────────────────
|
||||
async def main():
|
||||
retry_delay = 1
|
||||
max_delay = 60
|
||||
while True:
|
||||
bot = None
|
||||
inbound_task = None
|
||||
outbound_task = None
|
||||
try:
|
||||
bot = AgentBot()
|
||||
bot.register_plugin('xep_0030')
|
||||
bot.register_plugin('xep_0045')
|
||||
bot.register_plugin('xep_0199')
|
||||
|
||||
bot.connect(host='127.0.0.1', port=5222)
|
||||
await asyncio.wait_for(bot.ready.wait(), timeout=30)
|
||||
logging.info(f"{AGENT_NAME} XMPP 就绪")
|
||||
retry_delay = 1
|
||||
|
||||
# 启动独立处理循环
|
||||
inbound_task = asyncio.create_task(process_inbound())
|
||||
outbound_task = asyncio.create_task(drain_outbound(bot))
|
||||
|
||||
while True:
|
||||
await asyncio.sleep(15)
|
||||
if not bot.is_connected():
|
||||
logging.warning("检测到断线,准备重连...")
|
||||
break
|
||||
# XMPP心跳检测:如果run_filters任务挂了,is_connected可能仍为True
|
||||
try:
|
||||
ping_ok = await asyncio.wait_for(
|
||||
bot.plugin['xep_0199'].send_ping(AGENT_JID, timeout=5),
|
||||
timeout=8
|
||||
)
|
||||
if not ping_ok:
|
||||
logging.warning("XMPP心跳超时(XEP-0199),准备重连...")
|
||||
break
|
||||
except asyncio.TimeoutError:
|
||||
logging.warning("XMPP心跳超时,准备重连...")
|
||||
break
|
||||
except Exception:
|
||||
# xep_0199可能因run_filters已死而抛出异常
|
||||
logging.warning("XMPP心跳异常(run_filters可能已死),准备重连...")
|
||||
break
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logging.warning("连接超时,准备重连...")
|
||||
except Exception as e:
|
||||
logging.error(f"❌ 主循环错误: {e}")
|
||||
finally:
|
||||
# 取消任务但不丢队列
|
||||
for t in [inbound_task, outbound_task]:
|
||||
if t and not t.done():
|
||||
t.cancel()
|
||||
|
||||
if bot:
|
||||
try:
|
||||
bot.disconnect()
|
||||
except:
|
||||
pass
|
||||
|
||||
# 等待旧session完全释放,防止两个bot抢资源
|
||||
await asyncio.sleep(2)
|
||||
|
||||
logging.info(f"⏳ 等待 {retry_delay} 秒后重连...")
|
||||
await asyncio.sleep(retry_delay)
|
||||
retry_delay = min(retry_delay * 2, max_delay)
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
Reference in New Issue
Block a user