Files
AgentsMeeting/xmpp_agent_core.py
T
hmo babbc46801 refactor(xxm): consolidate 4 bot implementations into unified xmpp_agent_core.py
- Merge bot_base.py, gateway/scripts/xmpp_bot.py, bots/*, xmpp_bot_rest.py
  into single xmpp_agent_core.py with --agent flag (xxm|mohe|zhiwei|xiaoguo)
- Add xxm_bot.py wrapper (encoding=utf-8 for Windows exec)
- Fix slixmpp connect() API: use host=/port= keyword args (was tuple)
- Clean up orphans: bots/, scripts/, hermes_state.py, xmpp_bot.py, xmpp_bot_rest.py
- Add docs/CLEANUP_PLAN.md documenting the migration
- Update README.md project structure
- Also: fix WeChat agent path resolution (relative paths)
2026-06-21 16:13:57 +08:00

895 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
XMPP Agent Core — 统一版
=========================
Single bot core for all agents. Supports --agent xxm|mohe|zhiwei|xiaoguo.
Usage:
python xmpp_agent_core.py --agent xxm # xxm, uses chat_bridge
python xmpp_agent_core.py --agent mohe # mohe, uses Hermes API
python xmpp_agent_core.py --agent zhiwei # zhiwei, uses Hermes API
python xmpp_agent_core.py --agent xiaoguo # xiaoguo, uses Hermes API
Shares: PID lock, reconnect, MUC join, dedup, batching,
coordinator protocol (GRANT/REVOKE), HTTP bridge.
Differs only in LLM calling method (chat_bridge vs Hermes API).
"""
import os, sys, time, threading, asyncio, logging, json, re, ssl
import urllib.request, http.server, urllib.parse
# ── Windows selector loop (slixmpp needs it on Windows) ──
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# ── PATH: allow imports from gateway/scripts/ (proc_guard, chat_bridge) ──
_GATEWAY_SCRIPTS = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"gateway", "scripts")
sys.path.insert(0, _GATEWAY_SCRIPTS)
# ═══════════════════════════════════════════════════════════════
# AGENTS Configuration
# ═══════════════════════════════════════════════════════════════
AGENTS = {
"mohe": {
"jid": "mohe@yoin.fun",
"password": "hermes123",
"nick": "mohe",
"name_cn": "莫荷",
"http_port": 5804,
"gateway": "http://localhost:8642/v1/chat/completions",
"session_id": "xmpp-mohe-v2",
"server": "127.0.0.1",
"port": 5222,
"muc_rooms": ["coregroup@conference.yoin.fun"],
"mention": "@mohe/@莫荷",
},
"zhiwei": {
"jid": "zhiwei@yoin.fun",
"password": "hermes123",
"nick": "zhiwei",
"name_cn": "知微",
"http_port": 5805,
"gateway": "http://localhost:8643/v1/chat/completions",
"session_id": "xmpp-zhiwei",
"server": "127.0.0.1",
"port": 5222,
"muc_rooms": ["coregroup@conference.yoin.fun"],
"mention": "@zhiwei/@知微",
},
"xiaoguo": {
"jid": "xiaoguo@yoin.fun",
"password": "hermes123",
"nick": "xiaoguo",
"name_cn": "小果",
"http_port": 5806,
"gateway": "http://localhost:8645/v1/chat/completions",
"session_id": "xmpp-xiaoguo",
"server": "127.0.0.1",
"port": 5222,
"muc_rooms": ["coregroup@conference.yoin.fun"],
"mention": "@xiaoguo/@小果",
},
"xxm": {
"jid": "xxm@yoin.fun",
"password": "hermes123",
"nick": "xxm",
"name_cn": "笑笑",
"http_port": 5802,
"bridge": "chat_bridge", # use local chat_bridge instead of Hermes API
"session_id": "ses_xxm_xmpp",
"server": "192.168.1.246", # LAN direct connect
"port": 5222,
"muc_rooms": [
"coregroup@conference.yoin.fun",
"jujidina@conference.yoin.fun",
],
"mention": "@xxm/@笑笑",
},
}
# ── Agent selection ──
_agent_name = "mohe"
if "--agent" in sys.argv:
idx = sys.argv.index("--agent")
if idx + 1 < len(sys.argv):
_agent_name = sys.argv[idx + 1]
cfg = AGENTS.get(_agent_name, AGENTS["mohe"])
# ═══════════════════════════════════════════════════════════════
# PID Lock — prevent duplicate instances
# ═══════════════════════════════════════════════════════════════
from proc_guard import guard as _proc_guard
_lock = _proc_guard(f"xmpp_bot_{_agent_name}")
if not _lock.ok:
print(_lock.message, flush=True)
sys.exit(1)
# ═══════════════════════════════════════════════════════════════
# Logging
# ═══════════════════════════════════════════════════════════════
_LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "gateway", "logs")
os.makedirs(_LOG_DIR, exist_ok=True)
_LOG_FILE = os.path.join(_LOG_DIR, f"xmpp_{_agent_name}.log")
_START_TIME = time.time()
def log(m: str):
with open(_LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"{time.strftime('%H:%M:%S')} {m}\n")
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
# ═══════════════════════════════════════════════════════════════
# LLM Bridge Init — abstracted per agent
# ═══════════════════════════════════════════════════════════════
_IS_CHAT_BRIDGE = cfg.get("bridge") == "chat_bridge"
_router = None # set only for chat_bridge (xxm)
if _IS_CHAT_BRIDGE:
from chat_bridge import SessionBridge
from session_router import SessionRouter
_bridge = SessionBridge(session_id=cfg["session_id"])
_router = SessionRouter(bridge=_bridge, default_session=cfg["session_id"])
log(f"LLM: chat_bridge (session={cfg['session_id']})")
else:
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
log(f"LLM: Hermes API ({cfg['gateway']})")
def _call_llm(content: str, sender: str, is_group: bool = False) -> str:
"""Abstract LLM call. Returns raw response text (or empty string)."""
if _IS_CHAT_BRIDGE:
return _router.route("xmpp", sender, content) or ""
else:
return _call_hermes_api(content)
def _call_hermes_api(content: str) -> str:
"""POST to Hermes API, return response text or empty string."""
try:
payload = json.dumps({
"model": "hermes-agent",
"messages": [{"role": "user", "content": content}]
}).encode()
req = urllib.request.Request(cfg["gateway"], data=payload, method="POST")
req.add_header("Content-Type", "application/json")
req.add_header("Authorization", "Bearer hermes123")
req.add_header("X-Hermes-Session-Id", cfg["session_id"])
result = _opener.open(req, timeout=600)
data = json.loads(result.read())
reply = data.get("choices", [{}])[0].get("message", {}).get("content", "")
return reply.strip()
except Exception as e:
log(f"!!! Hermes API error: {e}")
return ""
# ═══════════════════════════════════════════════════════════════
# Message Dedup
# ═══════════════════════════════════════════════════════════════
_DEDUP_CACHE: set[str] = set()
_DEDUP_LOCK = threading.Lock()
def _is_duplicate(msg_id: str) -> bool:
if not msg_id:
return False
with _DEDUP_LOCK:
if msg_id in _DEDUP_CACHE:
return True
_DEDUP_CACHE.add(msg_id)
if len(_DEDUP_CACHE) > 100:
_DEDUP_CACHE.clear()
return False
# ═══════════════════════════════════════════════════════════════
# Coordinator Protocol (shared across all agents)
# ═══════════════════════════════════════════════════════════════
_COORDINATOR: str = "mohe"
_GRANTED: str | None = None
_REVOKED_UNTIL: float = 0.0
_SHUTUP_PATTERNS = ["闭嘴", "别说话", "安静", "shut", "stfu", "别说了", ""]
def _process_coordinator_signals(nickname: str, body: str) -> bool:
"""Parse coordinator/GRANT/REVOKE from incoming messages.
Returns True if message was a control signal (consumed, no further processing)."""
global _COORDINATOR, _GRANTED, _REVOKED_UNTIL
# 1. hmo switches coordinator
if nickname == 'hmo' and 'coordinator=' in body.lower():
for name in ('mohe', 'zhiwei', 'xxm'):
if f'coordinator={name}' in body.lower():
_COORDINATOR = name
_GRANTED = None
log(f"Coordinator switched to {name} by hmo")
return True
# 2. GRANT signal (overrides REVOKE)
gm = re.search(r'\[GRANT:(\w+)\]', body)
if gm:
_GRANTED = gm.group(1)
_REVOKED_UNTIL = 0
log(f"GRANT: {_GRANTED}")
return True
# 3. REVOKE signal (5min auto-restore)
rm = re.search(r'\[REVOKE:(\w+)\]', body)
if rm and rm.group(1) == cfg["nick"]:
_REVOKED_UNTIL = time.time() + 300
log(f"REVOKEd: {cfg['nick']} silenced for 5min")
return True
return False
def _check_shutup(body: str) -> bool:
"""hmo says shut up → 5min silence."""
lower = body.lower().strip()
for pat in _SHUTUP_PATTERNS:
if pat.lower() in lower:
_REVOKED_UNTIL = time.time() + 300
log(f"(shutup: '{pat}' → 5min silence)")
return True
return False
def _process_llm_grant(response: str):
"""Parse GRANT signal from LLM's own response."""
global _GRANTED
gm = re.search(r'\[GRANT:(\w+)\]', response)
if gm:
_GRANTED = gm.group(1)
_REVOKED_UNTIL = 0
log(f"LLM GRANT: {_GRANTED}")
# ═══════════════════════════════════════════════════════════════
# Response Extraction
# ═══════════════════════════════════════════════════════════════
_SILENCE_PATTERNS = [
"保持沉默", "不应[该]?回复", "没有.*@.*我", "不是对[我我说]",
"跟我无关", "我不用回复", "不该回复", "不参与",
"不是我[应]?该[说回]",
]
_EXEC_RE = re.compile(r"##exec:(.+?)##", re.DOTALL)
_DELAY_RE = re.compile(r"##delay:?(\d+)?##")
_DELAY_DEFAULT = 15
_EXEC_TIMEOUT = 60
def _strip_toolcall_xml(text: str) -> str:
t = text
t = re.sub(r'<invoke\s+[^>]*>.*?(</invoke>|$)', '', t, flags=re.DOTALL)
t = re.sub(r'<tool_calls>.*?(</tool_calls>|$)', '', t, flags=re.DOTALL)
t = re.sub(r'<parameter\s+[^>]*>.*?(</parameter>|$)', '', t, flags=re.DOTALL)
t = re.sub(r'<result>.*?(</result>|$)', '', t, flags=re.DOTALL)
return t.strip()
def _extract_response(text: str) -> str | None:
"""Strip __SILENT__, reasoning blocks, natural language silence.
Returns actual content to send, or None to stay silent."""
if not text:
return None
t = text.strip()
if not t:
return None
t = _strip_toolcall_xml(t)
# Natural language silence detection
if not t.startswith("__SILENT__"):
first = t.split("\n", 1)[0]
for pat in _SILENCE_PATTERNS:
if re.search(pat, first):
return None
return t
# Has __SILENT__ prefix
parts = t.split("\n", 1)
if len(parts) < 2:
return None
rest = parts[1].strip()
while True:
m = re.match(r'^[^]*\s*', rest)
if m:
rest = rest[m.end():]
continue
m = re.match(r'^\([^)]*\)\s*', rest)
if m:
rest = rest[m.end():]
continue
break
return rest.strip() or None
# ═══════════════════════════════════════════════════════════════
# Message Batching (3s debounce + serialized processing)
# ═══════════════════════════════════════════════════════════════
_BATCH_WINDOW = 3.0
_BATCH_TIMEOUT = 300
_batch_entries: dict[str, list[str]] = {}
_batch_timers: dict[str, threading.Timer] = {}
_batch_processing: set[str] = set()
_batch_pending: dict[str, list[str]] = {}
_batch_lock = threading.Lock()
_BOT_NICK = cfg["nick"]
def _run_command(cmd: str) -> str:
"""Execute ##exec:command## shell command."""
log(f"(exec: {cmd[:120]})")
try:
import subprocess
r = subprocess.run(cmd, shell=True, capture_output=True,
timeout=_EXEC_TIMEOUT, text=True, encoding='utf-8', errors='replace')
out = (r.stdout or "") + (r.stderr or "")
out = out.strip() or f"(no output, exit={r.returncode})"
log(f"(exec done: {len(out)} bytes, exit={r.returncode})")
return out
except subprocess.TimeoutExpired:
log(f"(exec timeout >{_EXEC_TIMEOUT}s)")
return "(命令超时)"
except Exception as e:
log(f"(exec error: {e})")
return f"(命令执行失败: {e})"
def _schedule_delayed(delay_sec: int, room: str):
"""Schedule ##delay:N## re-invocation."""
global _xmpp_ref
import subprocess as _sp
from xml.sax.saxutils import escape
def _fire():
bot = _xmpp_ref
if not bot:
return
try:
prompt = "时间到,请根据最新的信息汇报结果。"
raw = _call_llm(prompt, room, is_group=True)
reply = _extract_response(raw)
if reply:
safe = escape(reply.strip())
stanza = f"<message to='{room}' type='groupchat'><body>{safe}</body></message>"
bot.send_raw(stanza)
log(f"-> [Delay][{room}]: {reply.strip()[:80]}")
except Exception as e:
log(f"!! delay err: {e}")
t = threading.Timer(delay_sec, _fire)
t.daemon = True
t.start()
log(f"(delay +{delay_sec}s → {room})")
def _batch_done(room: str):
"""Called when batch LLM finishes. Flush pending if any."""
with _batch_lock:
_batch_processing.discard(room)
pending = _batch_pending.pop(room, None)
if pending:
_batch_entries[room] = pending
t = threading.Timer(0.1, _fire_batch, args=[room])
t.daemon = True
t.start()
_batch_timers[room] = t
return
log(f"[Batch][{room}] (idle)")
def _fire_batch(room: str):
"""Collect batched entries and call LLM."""
with _batch_lock:
entries = _batch_entries.pop(room, None)
_batch_timers.pop(room, None)
if not entries:
return
_batch_processing.add(room)
combined = "\n".join(entries)
def _handle():
timed_out = [False]
def _timeout():
timed_out[0] = True
log(f"[Batch][{room}] TIMEOUT ({_BATCH_TIMEOUT}s)")
_batch_done(room)
timer = threading.Timer(_BATCH_TIMEOUT, _timeout)
timer.daemon = True
timer.start()
try:
raw = _call_llm(combined, room, is_group=True)
if not timed_out[0]:
timer.cancel()
_process_llm_reply(raw, room)
else:
log(f"[Batch][{room}] route returned after timeout, discarded")
except Exception as e:
log(f"!!! BATCH: {e}")
if not timed_out[0]:
timer.cancel()
_batch_done(room)
threading.Thread(target=_handle, daemon=True).start()
def _batch_group_message(room: str, nickname: str, body: str) -> bool:
"""Add message to room batch. Returns True if batched, False if @mention (immediate)."""
if f"@{_BOT_NICK}" in body or body.startswith(_BOT_NICK):
return False
formatted = f"[{nickname}]: {body}"
with _batch_lock:
if room in _batch_processing:
_batch_pending.setdefault(room, []).append(formatted)
return True
timer = _batch_timers.pop(room, None)
if timer:
timer.cancel()
_batch_entries.setdefault(room, []).append(formatted)
t = threading.Timer(_BATCH_WINDOW, _fire_batch, args=[room])
t.daemon = True
t.start()
_batch_timers[room] = t
return True
# ═══════════════════════════════════════════════════════════════
# MAM Recovery — fetch recent history on startup
# ═══════════════════════════════════════════════════════════════
_MAM_RECOVERY = True
_MAM_RECOVERY_LOCK = threading.Lock()
_MAM_MARK_DONE = False
_MAM_TIMEOUT = 30
def _set_mam_done():
global _MAM_RECOVERY
with _MAM_RECOVERY_LOCK:
_MAM_RECOVERY = False
def _is_mam_recovery() -> bool:
if time.time() - _START_TIME > _MAM_TIMEOUT:
global _MAM_RECOVERY
with _MAM_RECOVERY_LOCK:
if _MAM_RECOVERY:
_MAM_RECOVERY = False
log("(MAM recovery timed out, force-disabled)")
return _MAM_RECOVERY
with _MAM_RECOVERY_LOCK:
return _MAM_RECOVERY
# ═══════════════════════════════════════════════════════════════
# XML Escape
# ═══════════════════════════════════════════════════════════════
def _escape(text: str) -> str:
return (text.replace("&", "&amp;").replace("<", "&lt;")
.replace(">", "&gt;").replace('"', "&quot;"))
# ═══════════════════════════════════════════════════════════════
# HTTP Bridge — health, presence, messages, POST send
# ═══════════════════════════════════════════════════════════════
_HTTP_PORT = cfg["http_port"]
_MSG_BUF: list[dict] = []
_MSG_BUF_LOCK = threading.Lock()
_xmpp_ref = None # set after bot creation
def _record_group_msg(nickname: str, body: str):
ts = time.strftime("%H:%M:%S")
with _MSG_BUF_LOCK:
_MSG_BUF.append({"ts": ts, "from": nickname, "body": body})
if len(_MSG_BUF) > 200:
_MSG_BUF[:] = _MSG_BUF[-150:]
class _BridgeHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
parsed = urllib.parse.urlparse(self.path)
if parsed.path == "/muc":
try:
muc_info = {"rooms": {}}
bot = _xmpp_ref
if bot is not None and 'xep_0045' in bot.plugin:
muc_plugin = bot.plugin['xep_0045']
for room_jid in cfg["muc_rooms"]:
room_data = {"jid": room_jid, "participants": []}
try:
if room_jid in muc_plugin.rooms:
room = muc_plugin.rooms[room_jid]
for nick, info in room.get('roster', {}).items():
room_data["participants"].append({
"nick": nick,
"jid": str(info.get('jid', '')),
"affiliation": str(info.get('affiliation', '')),
"role": str(info.get('role', '')),
})
except Exception as e:
room_data["error"] = str(e)
muc_info["rooms"][room_jid] = room_data
self._reply(200, muc_info)
except Exception as e:
self._reply(500, {"ok": False, "error": str(e)})
return
if parsed.path == "/health":
try:
bot = _xmpp_ref
session_ok = bot.session_started_event.is_set() if (bot and hasattr(bot, 'session_started_event')) else False
socket_ok = bot.is_connected() if (bot and hasattr(bot, 'is_connected')) else False
self._reply(200, {
"ok": True, "xmpp_connected": session_ok or socket_ok,
"agent": _agent_name, "jid": cfg["jid"],
"uptime_sec": int(time.time() - _START_TIME),
"muc_rooms": cfg["muc_rooms"],
})
except Exception as e:
self._reply(500, {"ok": False, "error": str(e)})
return
if parsed.path.startswith("/presence"):
jid_to_check = parsed.path[len("/presence/"):].strip()
if not jid_to_check:
self._reply(400, {"ok": False, "error": "missing JID"})
return
try:
info = {"jid": jid_to_check, "online": False, "resources": []}
bot = _xmpp_ref
if bot and hasattr(bot, 'client_roster'):
roster = bot.client_roster
if jid_to_check in roster:
resources = list(roster[jid_to_check].resources.keys())
info["online"] = len(resources) > 0
info["resources"] = resources
self._reply(200, info)
except Exception as e:
self._reply(500, {"ok": False, "error": str(e)})
return
if parsed.path == "/messages":
try:
qs = urllib.parse.parse_qs(parsed.query)
sender = qs.get("from", [None])[0]
with _MSG_BUF_LOCK:
msgs = list(_MSG_BUF)
if sender:
msgs = [m for m in msgs if m["from"] == sender]
self._reply(200, {"ok": True, "count": len(msgs), "messages": msgs[-50:]})
except Exception as e:
self._reply(500, {"ok": False, "error": str(e)})
return
self._reply(404, {"ok": False, "error": "not found"})
def do_POST(self):
try:
length = int(self.headers.get('Content-Length', 0))
body = json.loads(self.rfile.read(length))
to = body.get('to', cfg["muc_rooms"][0])
msg = body.get('message', '') or body.get('body', '')
if not msg:
self._reply(400, {"ok": False, "error": "empty message"})
return
safe = _escape(msg.strip())
bot = _xmpp_ref
if bot:
stanza = f'<message to="{to}" type="groupchat"><body>{safe}</body></message>'
bot.send_raw(stanza)
_record_group_msg(cfg["nick"], msg)
log(f"[http] → [{to.split('@')[0]}]: {msg[:80]}")
self._reply(200, {"ok": True})
except Exception as e:
self._reply(500, {"ok": False, "error": str(e)})
def _reply(self, code, data):
body = json.dumps(data, ensure_ascii=False).encode('utf-8')
self.send_response(code)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.send_header('Content-Length', len(body))
self.end_headers()
self.wfile.write(body)
def log_message(self, format, *args):
pass
def _start_http_bridge():
_httpd = http.server.HTTPServer(('0.0.0.0', _HTTP_PORT), _BridgeHandler)
_t = threading.Thread(target=_httpd.serve_forever, daemon=True)
_t.start()
log(f"HTTP bridge ready on :{_HTTP_PORT}")
# ═══════════════════════════════════════════════════════════════
# Reply Processing (shared for all LLM responses)
# ═══════════════════════════════════════════════════════════════
def _process_llm_reply(raw_reply: str, room: str):
"""Process LLM response: check silence/delay/exec/send."""
global _xmpp_ref
if not raw_reply:
_batch_done(room)
return
# Parse GRANT signal from LLM response
_process_llm_grant(raw_reply)
# ##delay:N## → schedule later
delay_m = _DELAY_RE.search(raw_reply)
if delay_m:
sec = int(delay_m.group(1)) if delay_m.group(1) else _DELAY_DEFAULT
_schedule_delayed(sec, room)
_batch_done(room)
return
# ##exec:command## → run command, use output as reply
exec_m = _EXEC_RE.search(raw_reply)
if exec_m:
output = _run_command(exec_m.group(1))
raw_reply = _EXEC_RE.sub(output, raw_reply, count=1)
# Extract actual response
reply_text = _extract_response(raw_reply)
if reply_text:
safe = _escape(reply_text.strip())
bot = _xmpp_ref
if bot:
stanza = f"<message to='{room}' type='groupchat'><body>{safe}</body></message>"
bot.send_raw(stanza)
log(f"-> [{room.split('@')[0]}]: {reply_text.strip()[:80]}")
else:
log(f"-> [{room.split('@')[0]}]: (silent)")
_batch_done(room)
# ═══════════════════════════════════════════════════════════════
# Group message handler
# ═══════════════════════════════════════════════════════════════
def _handle_group_message(msg):
"""Process a groupchat message (runs in thread)."""
global _COORDINATOR, _GRANTED, _REVOKED_UNTIL
if _is_mam_recovery():
return
msg_id = msg.get("id", "")
if _is_duplicate(msg_id):
return
body = str(msg["body"]).strip()
if not body:
return
full_from = str(msg["from"])
room = full_from.split("/")[0]
nickname = full_from.split("/")[1] if "/" in full_from else ""
# Self-message skip
if nickname == cfg["nick"]:
log(f"(self) {body[:80]}")
return
_record_group_msg(nickname, body)
# Coordinator signals
if _process_coordinator_signals(nickname, body):
return
# Revoke check
is_revoked = time.time() < _REVOKED_UNTIL
if is_revoked and _GRANTED == cfg["nick"]:
_GRANTED = None
is_revoked = False
log(f"GRANT overrides REVOKE for {cfg['nick']}")
if _check_shutup(body):
return
if is_revoked:
body = f"【只读消息】你被收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {room}] {nickname} 说: {body}"
# Batch or immediate (@mention)
if _batch_group_message(room, nickname, body):
log(f"[{room.split('@')[0]}] {nickname}: {body[:80]} (batched)")
return
log(f"[{room.split('@')[0]}] {nickname}: {body[:80]}")
raw = _call_llm(body, full_from, is_group=True)
_process_llm_reply(raw, room)
# ═══════════════════════════════════════════════════════════════
# Private message handler
# ═══════════════════════════════════════════════════════════════
def _handle_private_message(msg):
"""Process a private chat message."""
if msg["type"] == "groupchat":
return
msg_id = msg.get("id", "")
if _is_duplicate(msg_id):
return
body = str(msg["body"]).strip()
sender = str(msg["from"]).split("/")[0]
log(f"<{sender}> {body[:80]}")
if sender == cfg["jid"]:
log("(skipped self)")
return
if time.time() < _REVOKED_UNTIL:
log(f"(silenced) <{sender}> dropped")
return
if _check_shutup(body):
return
raw = _call_llm(body, sender, is_group=False)
if raw:
reply = _extract_response(raw)
if reply:
from xml.sax.saxutils import escape
safe = escape(reply.strip())
if _IS_CHAT_BRIDGE:
bot = _xmpp_ref
if bot:
stanza = f"<message to='{sender}' type='chat'><body>{safe}</body></message>"
bot.send_raw(stanza)
log(f"-> {sender}: {reply.strip()[:80]}")
else:
import subprocess as sp
sp.run(["docker", "exec", "ejabberd", "ejabberdctl", "send_stanza",
cfg["jid"], sender,
f"<message from='{cfg['jid']}' to='{sender}' type='chat' xml:lang='en'><body>{safe}</body></message>"
], capture_output=True, timeout=10)
log(f"-> {sender}: {reply.strip()[:80]}")
# ═══════════════════════════════════════════════════════════════
# AgentBot Class
# ═══════════════════════════════════════════════════════════════
import slixmpp
class AgentBot(slixmpp.ClientXMPP):
def __init__(self):
super().__init__(cfg["jid"], cfg["password"])
# Connection settings
self.enable_direct_tls = False
self.enable_starttls = True
self.auto_reconnect = True
self.reconnect_max_delay = 10
self.whitespace_keepalive = True
self.whitespace_keepalive_interval = 30
# SSL: accept self-signed certs
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
self.ssl_context = ctx
# Event handlers
self.add_event_handler("session_start", self._on_session_start)
self.add_event_handler("message", self._on_any_message)
self.add_event_handler("groupchat_message", self._on_group_msg)
self.add_event_handler("disconnected", self._on_disconnected)
self.add_event_handler("connected", self._on_connected)
self.add_event_handler("session_end", self._on_session_end)
self.add_event_handler("connection_failed", self._on_conn_failed)
# MUC plugin
self.register_plugin('xep_0045')
def _on_connected(self, event):
log("connection established")
def _on_session_start(self, event):
self.send_presence()
self.get_roster()
log(f"{cfg['jid']} online")
# Register MAM plugin lazily
try:
self.register_plugin('xep_0313')
except Exception:
log("(MAM: xep_0313 not available)")
# Join MUC rooms
async def _join_all():
for room_jid in cfg["muc_rooms"]:
try:
self.plugin['xep_0045'].join_muc(room_jid, cfg["nick"])
presence = (
f"<presence to='{room_jid}/{cfg['nick']}'>"
f"<x xmlns='http://jabber.org/protocol/muc'>"
f"<history maxstanzas='0'/>"
f"</x></presence>"
)
self.send_raw(presence)
log(f"Joined {room_jid}")
except Exception as e:
log(f"MUC join failed {room_jid}: {e}")
await asyncio.sleep(2)
await asyncio.sleep(3)
await self._fetch_mam_history()
asyncio.ensure_future(_join_all())
async def _fetch_mam_history(self):
"""Query MAM for recent MUC messages to rebuild context."""
if 'xep_0313' not in self.plugin:
log("(MAM: no plugin)")
_set_mam_done()
return
try:
for room_jid in cfg["muc_rooms"]:
log(f"(MAM: querying {room_jid} for last 50 messages...)")
results = await self.plugin['xep_0313'].retrieve(
jid=room_jid, rsm={'max': 50},
)
count = 0
for msg in results['mam']['results']:
forwarded = msg['mam_result']['forwarded']
body = str(forwarded['stanza']['body'] or '').strip()
if not body:
continue
nick = str(forwarded['stanza']['from']).split('/')[-1] if '/' in str(forwarded['stanza']['from']) else '?'
# Feed into context for chat_bridge (xxm)
if _IS_CHAT_BRIDGE:
role = 'user' if nick != cfg["nick"] else 'assistant'
try:
_bridge._append_to_log(role, f"[{nick}]: {body[:300]}")
except Exception:
pass
count += 1
log(f"(MAM: loaded {count} msgs from {room_jid})")
_set_mam_done()
log("(MAM recovery complete)")
except Exception as e:
log(f"(MAM error: {e})")
_set_mam_done()
def _on_group_msg(self, msg):
threading.Thread(target=_handle_group_message, args=[msg], daemon=True).start()
def _on_any_message(self, msg):
threading.Thread(target=_handle_private_message, args=[msg], daemon=True).start()
def _on_session_end(self, event):
log(f"session ended")
def _on_conn_failed(self, event):
log(f"connection failed: {event}")
def _on_disconnected(self, event):
log(f"disconnected, reconnecting...")
# ═══════════════════════════════════════════════════════════════
# Main
# ═══════════════════════════════════════════════════════════════
def main():
log(f"Starting {cfg['jid']} ({cfg['name_cn']}) — agent={_agent_name}")
if _IS_CHAT_BRIDGE:
log(f" LLM: chat_bridge (session={cfg['session_id']})")
else:
log(f" LLM: Hermes API ({cfg['gateway']})")
log(f" Server: {cfg['server']}:{cfg['port']}")
log(f" Rooms: {cfg['muc_rooms']}")
bot = AgentBot()
global _xmpp_ref
_xmpp_ref = bot
_start_http_bridge()
bot.connect(host=cfg["server"], port=cfg["port"])
log(f"Connecting {cfg['jid']}@{cfg['server']}:{cfg['port']}")
loop = asyncio.get_event_loop()
async def _status_check():
while True:
await asyncio.sleep(60)
log("(alive)")
asyncio.ensure_future(_status_check())
try:
loop.run_forever()
except KeyboardInterrupt:
log("Shutdown by user")
except Exception as e:
log(f"!!! MAIN LOOP CRASH: {e}")
import traceback
log(f"!!! {traceback.format_exc()[:500]}")
raise
if __name__ == "__main__":
main()