Files
AgentsMeeting/gateway/scripts/xmpp_bot.py
T
hmo babbc46801 refactor(xxm): consolidate 4 bot implementations into unified xmpp_agent_core.py
- Merge bot_base.py, gateway/scripts/xmpp_bot.py, bots/*, xmpp_bot_rest.py
  into single xmpp_agent_core.py with --agent flag (xxm|mohe|zhiwei|xiaoguo)
- Add xxm_bot.py wrapper (encoding=utf-8 for Windows exec)
- Fix slixmpp connect() API: use host=/port= keyword args (was tuple)
- Clean up orphans: bots/, scripts/, hermes_state.py, xmpp_bot.py, xmpp_bot_rest.py
- Add docs/CLEANUP_PLAN.md documenting the migration
- Update README.md project structure
- Also: fix WeChat agent path resolution (relative paths)
2026-06-21 16:13:57 +08:00

944 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
XMPP Bot - 笑笑(xxm@yoin.fun
Connects to ejabberd via slixmpp, bridges XMPP messages ? serve session.
Supports:
- Private chat (type='chat')
- Group chat (type='groupchat') via MUC rooms
- TCP keepalive (kernel-level) for connection stability
- slixmpp whitespace_keepalive (asyncio-level)
- Auto-reconnect with logging
- proc_guard PID lock to prevent duplicate instances
"""
import os, sys, time, threading, asyncio, logging, json
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from chat_bridge import SessionBridge
from session_router import SessionRouter
from proc_guard import guard as _proc_guard
# ── PID lock — prevent duplicate instances ──
_lock = _proc_guard("xmpp_bot")
if not _lock.ok:
print(_lock.message, flush=True)
sys.exit(1)
# ── Config ──
JID = "xxm@yoin.fun"
PASSWORD = "hermes123"
SERVER = "192.168.1.246"
PORT = 5222
ATTACH_SESSION = "ses_xxm_xmpp"
MUC_ROOMS = [
"coregroup@conference.yoin.fun", # core group chat
"jujidina@conference.yoin.fun", # jujidina group
]
LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "logs")
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "xmpp_bot.log")
_START_TIME = time.time() # used by /health endpoint
# ── Session router (wraps SessionBridge with routing + commands) ──
_router = SessionRouter(
bridge=SessionBridge(session_id=ATTACH_SESSION),
default_session=ATTACH_SESSION,
)
def log(m: str):
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"{time.strftime('%H:%M:%S')} {m}\n")
# ── Dedup: skip duplicate message IDs (same XMPP stanza) ──
_DEDUP_CACHE: set[str] = set()
_DEDUP_LOCK = threading.Lock()
def _is_duplicate(msg_id: str) -> bool:
if not msg_id:
return False
with _DEDUP_LOCK:
if msg_id in _DEDUP_CACHE:
return True
_DEDUP_CACHE.add(msg_id)
if len(_DEDUP_CACHE) > 100:
_DEDUP_CACHE.clear()
return False
# ── Bot instance ref (set after XMPP connect) ──
_xmpp: "Bot | None" = None
# ── MAM recovery guard: skip group messages during startup MAM fetch ──
# After 30s timeout, force-disable recovery to unblock group messages.
_MAM_RECOVERY = True
_MAM_RECOVERY_LOCK = threading.Lock()
_STARTUP_TIME = time.time()
_MAM_TIMEOUT = 30 # seconds
def _set_mam_done():
global _MAM_RECOVERY
with _MAM_RECOVERY_LOCK:
_MAM_RECOVERY = False
def _is_mam_recovery() -> bool:
# Timeout fallback: if _fetch_mam_history never completes, unblock after 30s
if time.time() - _STARTUP_TIME > _MAM_TIMEOUT:
global _MAM_RECOVERY
with _MAM_RECOVERY_LOCK:
if _MAM_RECOVERY:
_MAM_RECOVERY = False
log("(MAM recovery timed out, force-disabled)")
return _MAM_RECOVERY
with _MAM_RECOVERY_LOCK:
return _MAM_RECOVERY
# ── Silence cooldown: when user says shut up, actually shut up ──
# ── Coordinator protocol (aligned with mohe/zhiwei/xiaoguo) ──
# All in-band XMPP signaling, no DB dependency.
_COORDINATOR: str = "mohe" # which agent moderates
_GRANTED: str | None = None # agent granted one-time speak permission
_REVOKED_UNTIL: float = 0.0 # timestamp until revoked agent can speak again
_SHUTUP_PATTERNS = ["闭嘴", "别说话", "安静", "shut", "stfu", "别说了", ""]
def _process_coordinator_signals(nickname: str, body: str) -> bool:
"""Parse coordinator/GRANT/REVOKE signals from group messages.
Returns True if the message was consumed as a control signal (no further processing)."""
global _COORDINATOR, _GRANTED, _REVOKED_UNTIL
# 1. hmo switches coordinator
if nickname == 'hmo' and 'coordinator=' in body.lower():
for name in ('mohe', 'zhiwei', 'xxm'):
if f'coordinator={name}' in body.lower():
_COORDINATOR = name
_GRANTED = None
log(f"Coordinator switched to {name} by hmo")
return True
# 2. GRANT signal (overrides REVOKE, one-time use)
gm = re.search(r'\[GRANT:(\w+)\]', body)
if gm:
_GRANTED = gm.group(1)
_REVOKED_UNTIL = 0 # lift revocation when granted
log(f"GRANT: {_GRANTED}")
return True # signal consumed, don't need to process further
# 3. REVOKE signal (5min auto-restore)
rm = re.search(r'\[REVOKE:(\w+)\]', body)
if rm and rm.group(1) == 'xxm':
_REVOKED_UNTIL = time.time() + 300
log(f"REVOKEd: xxm silenced for 5min")
return True
# Not a control signal
return False
def _check_shutup(body: str) -> bool:
"""Check if hmo is telling the bot to shut up. 5-min silence (matching coordinator pattern)."""
lower = body.lower().strip()
for pat in _SHUTUP_PATTERNS:
if pat.lower() in lower:
_REVOKED_UNTIL = time.time() + 300
log(f"(shutup: '{pat}' → 5min silence)")
return True
return False
# ═══════════════════════════════════════════════════════════════
# Private message handler
# ═══════════════════════════════════════════════════════════════
def on_message(msg):
"""Handle private chat messages (type='chat')."""
# Skip group chat messages (handled separately)
if msg["type"] == "groupchat":
return
msg_id = msg.get("id", "")
if _is_duplicate(msg_id):
log(f"(duplicate msg {msg_id[:12]}... skipped)")
return
body = str(msg["body"])
sender = str(msg["from"]).split("/")[0] # bare JID: hmo@yoin.fun
log(f"<{sender}> {body[:80]}")
# Ignore self-messages
if sender == JID:
log(f"(skipped self-message)")
return
# Shut-up check — hard silence before any processing
if time.time() < _REVOKED_UNTIL:
log(f"(silenced) <{sender}> {body[:60]}... dropped")
return
if _check_shutup(body):
return
def _handle():
try:
log(f"router.route...")
reply_text = _router.route("xmpp", sender, body)
if reply_text:
reply_text = _strip_toolcall_xml(reply_text) or reply_text
bot = _xmpp
if bot:
safe_body = _escape(reply_text)
stanza = (
f"<message to='{sender}' from='{JID}' type='chat'>"
f"<body>{safe_body}</body></message>"
)
# Schedule send on event loop with unique event name
evt = f"send_reply_{msg_id or int(time.time()*1000)}"
bot.schedule(evt, 0, lambda b=bot, s=stanza, who=sender, txt=reply_text[:80]: (
b.send_raw(s), log(f"-> {who}: {txt}")
))
else:
log(f"-> {sender}: no bot ref)")
else:
log(f"-> {sender}: (no reply)")
except Exception as e:
log(f"!!! EXCEPTION: {e}")
import traceback
log(f"!!! {traceback.format_exc()[:200]}")
threading.Thread(target=_handle, daemon=True).start()
# ═══════════════════════════════════════════════════════════════
# Response extractor — handles LLM putting __SILENT__ before
# actual content (observed behavior: LLM uses it as thinking tag)
# ═══════════════════════════════════════════════════════════════
import re as _re
import threading as _threading
import subprocess as _subprocess
# ═══════════════════════════════════════════════════════════════
# Sub-agent: execute shell commands (##exec:command##)
# ═══════════════════════════════════════════════════════════════
_EXEC_RE = _re.compile(r"##exec:(.+?)##", _re.DOTALL)
_EXEC_TIMEOUT = 60 # max seconds per command
def _run_command(cmd: str) -> str:
"""Run a shell command and return its stdout+stderr output."""
log(f"(exec: {cmd[:120]})")
try:
r = _subprocess.run(
cmd, shell=True, capture_output=True, timeout=_EXEC_TIMEOUT,
text=True, encoding='utf-8', errors='replace'
)
out = (r.stdout or "") + (r.stderr or "")
out = out.strip()
if not out:
out = "(no output, exit code %d)" % r.returncode
log(f"(exec done: {len(out)} bytes, exit={r.returncode})")
return out
except _subprocess.TimeoutExpired:
log(f"(exec timeout >{_EXEC_TIMEOUT}s)")
return "(命令超时)"
except Exception as e:
log(f"(exec error: {e})")
return f"(命令执行失败: {e})"
# ═══════════════════════════════════════════════════════════════
# Delayed reply support — schedule a group message after N sec
# ═══════════════════════════════════════════════════════════════
_DELAY_RE = _re.compile(r"##delay:?(\d+)?##")
_DELAY_DEFAULT = 15 # seconds, when no number specified
_HAS_CMD = _re.compile(r"##(delay|exec)") # any command marker
def _extract_acknowledgment(text: str) -> str:
"""Return text before the first ##command## marker, if any."""
idx = text.find("##")
if idx > 0:
return text[:idx].strip()
return ""
def _schedule_delayed(delay_sec: int, room: str):
"""Schedule a re-invocation of the LLM after *delay_sec* seconds."""
def _fire():
bot = _xmpp
if not bot:
log(f"!! delay: no bot ref")
return
try:
prompt = "时间到,请根据最新的信息汇报结果。"
reply = _router.bridge.send_raw(prompt)
if reply:
report = _extract_response(reply)
if report:
safe_body = _escape(report.strip())
stanza = f"<message to='{room}' from='{JID}' type='groupchat'><body>{safe_body}</body></message>"
bot.send_raw(stanza)
log(f"-> [Delay][{room}]: {report.strip()[:80]}")
return
log(f"-> [Delay][{room}]: (LLM empty)")
except Exception as e:
log(f"!! delay err: {e}")
t = _threading.Timer(delay_sec, _fire)
t.daemon = True
t.start()
log(f"(delay +{delay_sec}s → {room})")
# ═══════════════════════════════════════════════════════════════
# Response extractor
# ═══════════════════════════════════════════════════════════════
# ── Pattern: natural language "stay silent" hints ──
# Catches cases where the LLM says it should stay silent but forgot __SILENT__ prefix.
# Only checks the first line to avoid blocking multi-line real replies.
_SILENCE_PATTERNS = [
"保持沉默",
"不应[该]?回复",
"没有.*@.*我",
"不是对[我我说]",
"跟我无关",
"我不用回复",
"不该回复",
"不参与",
"不是我[应]?该[说回]",
]
def _strip_toolcall_xml(text: str) -> str:
"""Strip tool call XML that leaks from max-tool-loop final force.
Removes <tool_calls>, <invoke>, <parameter>, <result> tags and their content.
"""
t = text
t = _re.sub(r'<invoke\s+[^>]*>.*?(</invoke>|$)', '', t, flags=_re.DOTALL)
t = _re.sub(r'<tool_calls>.*?(</tool_calls>|$)', '', t, flags=_re.DOTALL)
t = _re.sub(r'<parameter\s+[^>]*>.*?(</parameter>|$)', '', t, flags=_re.DOTALL)
t = _re.sub(r'<result>.*?(</result>|$)', '', t, flags=_re.DOTALL)
return t.strip()
def _extract_response(text: str) -> str | None:
"""Strip __SILENT__ + reasoning, or detect natural language silence intent.
Returns actual content to send, or None to stay silent."""
if not text:
return None
t = text.strip()
if not t:
return None
t = _strip_toolcall_xml(t)
# ── Natural language silence detection (fallback) ──
if not t.startswith("__SILENT__"):
first = t.split("\n", 1)[0] # only check first line
for pat in _SILENCE_PATTERNS:
if _re.search(pat, first):
return None # LLM says it should stay silent → suppress
return t # No silence signal → respond normally
# ── Has __SILENT__ prefix — strip it and reasoning ──
parts = t.split("\n", 1)
if len(parts) < 2:
return None # Just __SILENT__, no content
rest = parts[1].strip()
# Strip reasoning blocks ...and (...) at the start
while True:
m = _re.match(r'^[^]*\s*', rest)
if m:
rest = rest[m.end():]
continue
m = _re.match(r'^\([^)]*\)\s*', rest)
if m:
rest = rest[m.end():]
continue
break
return rest.strip() or None
# ═══════════════════════════════════════════════════════════════
# Group message batching (debounce + serialized processing)
# ═══════════════════════════════════════════════════════════════
#
# Three states per room:
# 1. IDLE → first message arrives → start 3s debounce timer
# 2. BATCHING → timer pending (more messages merge in)
# 3. PROCESSING → LLM call in flight → new messages → pending queue
# → LLM finishes → auto-flush pending queue
#
_BATCH_WINDOW = 3.0
_batch_entries: dict[str, list[str]] = {}
_batch_timers: dict[str, threading.Timer] = {}
_batch_processing: set[str] = set() # rooms in active LLM call
_batch_pending: dict[str, list[str]] = {} # overflow during processing
_batch_lock = threading.Lock()
_BOT_NICK = JID.split("@")[0] # "xxm"
def _process_group_reply(raw_reply: str, room: str, msg_id: str = ""):
"""Shared: process LLM reply for group chat (silence/delay/send)."""
if not raw_reply:
log(f"-> [Group][{room}]: (no reply)")
_batch_done(room)
return
# 1. ##delay:N## → pure delay
delay_m = _DELAY_RE.search(raw_reply)
if delay_m:
sec = int(delay_m.group(1)) if delay_m.group(1) else _DELAY_DEFAULT
_schedule_delayed(sec, room)
_batch_done(room)
return
# 2. Normal reply
reply_text = _extract_response(raw_reply)
if reply_text:
_send_group(reply_text, room, msg_id)
else:
log(f"-> [Group][{room}]: (silent)")
_batch_done(room)
def _batch_done(room: str):
"""Called when a batch LLM call finishes. Flush pending if any."""
with _batch_lock:
_batch_processing.discard(room)
pending = _batch_pending.pop(room, None)
if pending:
_batch_entries[room] = pending
t = threading.Timer(0.1, _fire_batch, args=[room])
t.daemon = True
t.start()
_batch_timers[room] = t
return
log(f"[Batch][{room}] (idle)")
BATCH_TIMEOUT = 300 # max seconds per batch LLM call (tool calls like SSH can be slow)
def _fire_batch(room: str):
"""Take entries and launch LLM call (one at a time per room)."""
with _batch_lock:
entries = _batch_entries.pop(room, None)
_batch_timers.pop(room, None)
if not entries:
return
_batch_processing.add(room)
combined = "\n".join(entries)
def _handle():
done = threading.Event()
timed_out = [False]
def _timeout():
timed_out[0] = True
log(f"[Batch][{room}] TIMEOUT ({BATCH_TIMEOUT}s), force-unblocking")
_batch_done(room)
done.set()
timer = threading.Timer(BATCH_TIMEOUT, _timeout)
timer.daemon = True
timer.start()
try:
raw = _router.route("xmpp", room, combined)
if not timed_out[0]:
timer.cancel()
_process_group_reply(raw, room)
else:
log(f"[Batch][{room}] route returned after timeout, discarded")
except Exception as e:
log(f"!!! BATCH: {e}")
import traceback
log(f"!!! {traceback.format_exc()[:200]}")
if not timed_out[0]:
timer.cancel()
_batch_done(room)
finally:
done.set()
threading.Thread(target=_handle, daemon=True).start()
def _batch_group_message(room: str, nickname: str, body: str) -> bool:
"""
Add a group message to the room batch.
Returns True if batched (pending or timer), False if immediate (@mention).
"""
# Direct @mention → bypass batch
if f"@{_BOT_NICK}" in body or body.startswith(_BOT_NICK):
return False
formatted = f"[{nickname}]: {body}"
with _batch_lock:
# PROCESSING → queue as pending
if room in _batch_processing:
if room in _batch_pending:
_batch_pending[room].append(formatted)
else:
_batch_pending[room] = [formatted]
return True # batched as pending
# BATCHING (timer pending) → merge in, reset timer
timer = _batch_timers.pop(room, None)
if timer:
timer.cancel()
if room in _batch_entries:
_batch_entries[room].append(formatted)
else:
_batch_entries[room] = [formatted]
# (Re)start debounce timer
t = threading.Timer(_BATCH_WINDOW, _fire_batch, args=[room])
t.daemon = True
t.start()
_batch_timers[room] = t
return True
# ── Group chat handler
# ═══════════════════════════════════════════════════════════════
# Message buffer for HTTP bridge GET /messages
_MSG_BUF: list[dict] = []
_MSG_BUF_LOCK = threading.Lock()
def _record_group_msg(nickname: str, body: str):
ts = time.strftime("%H:%M:%S")
with _MSG_BUF_LOCK:
_MSG_BUF.append({"ts": ts, "from": nickname, "body": body})
if len(_MSG_BUF) > 200:
_MSG_BUF[:] = _MSG_BUF[-150:]
def on_group_message(msg):
"""Handle group chat messages (type='groupchat') from MUC rooms.
Observer pattern with batching: nearby messages from the same room
are merged into one LLM call. Direct @mentions bypass the batch
and are processed immediately.
"""
# Skip MAM-recovered messages during startup (already saved to context)
if _is_mam_recovery():
# Still save self-msg to context for continuity
full_from = str(msg["from"])
bot_nick = JID.split("@")[0]
nickname = full_from.split("/")[1] if "/" in full_from else ""
if nickname == bot_nick:
body = str(msg["body"]).strip()
log(f"(MAM self-msg saved to ctx) {body[:80]}")
try:
_router.bridge._append_to_log("assistant", body)
except Exception:
pass
return
msg_id = msg.get("id", "")
if _is_duplicate(msg_id):
log(f"(group dup {msg_id[:12]}... skipped)")
return
body = str(msg["body"]).strip()
if not body:
return
full_from = str(msg["from"])
room = full_from.split("/")[0]
nickname = full_from.split("/")[1] if "/" in full_from else ""
bot_nick = JID.split("@")[0]
# Self-message echo — skip error loops, let rest through
if nickname == bot_nick:
if "模型无响应" in body or "命令循环次数超限" in body:
log(f"(self-msg error, skipped) {body[:60]}")
return
log(f"(self-msg to LLM) {body[:80]}")
body = f"(自言自语) {body}"
# Record to message buffer for HTTP bridge monitoring
_record_group_msg(nickname, body)
# ── Coordinator signals (GRANT/REVOKE/coordinator switch) ──
if _process_coordinator_signals(nickname, body):
return
# ── Revoke check: read-only mode (see but don't reply) ──
is_revoked = time.time() < _REVOKED_UNTIL
# GRANT overrides REVOKE
if is_revoked and _GRANTED == 'xxm':
_GRANTED = None
is_revoked = False
log("GRANTed: xxm can speak despite REVOKE")
# ── Shut-up check ──
if _check_shutup(body):
return
# ── Read-only mode for revoked agents ──
if is_revoked:
body = f"【只读消息】你目前被收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {room}] {nickname} 说: {body}"
# Batch nearby messages (unless @mention → process immediately)
if _batch_group_message(room, nickname, body):
log(f"[Group][{room}] {nickname}: {body[:80]} (batched)")
return
# Direct @mention → immediate processing
log(f"[Group][{room}] {nickname}: {body[:80]}")
def _handle():
try:
raw_reply = _router.route("xmpp", full_from, body)
_process_group_reply(raw_reply, room, msg_id)
except Exception as e:
log(f"!!! GROUP EXCEPTION: {e}")
import traceback
log(f"!!! {traceback.format_exc()[:200]}")
threading.Thread(target=_handle, daemon=True).start()
def _send_group(text: str, room: str, msg_id: str = ""):
"""Send a group chat message."""
bot = _xmpp
if not bot:
log(f"-> [Group][{room}]: no bot ref)")
return
safe_body = _escape(text.strip())
stanza = (
f"<message to='{room}' from='{JID}' type='groupchat'>"
f"<body>{safe_body}</body></message>"
)
evt = f"send_grp_{msg_id or int(time.time()*1000)}"
bot.schedule(evt, 0, lambda b=bot, s=stanza, t=f"[Group][{room}]", txt=text[:80]: (
b.send_raw(s), log(f"-> {t}: {txt}")
))
# ═══════════════════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════════════════
def _escape(text: str) -> str:
"""Escape XML special characters for XMPP body content."""
return (text
.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;"))
# ═══════════════════════════════════════════════════════════════
# Main
# ═══════════════════════════════════════════════════════════════
if __name__ == "__main__":
# Force selector event loop on Windows (proactor + SSL has issues with slixmpp)
import asyncio
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
import slixmpp
class Bot(slixmpp.ClientXMPP):
def __init__(self):
super().__init__(JID, PASSWORD)
# Force STARTTLS (port 3021 uses STARTTLS not direct SSL)
self.enable_direct_tls = False
self.enable_starttls = True
self.add_event_handler("session_start", self.on_start)
self.add_event_handler("message", on_message)
self.add_event_handler("groupchat_message", on_group_message)
self.auto_reconnect = True
self.reconnect_max_delay = 10
# Use slixmpp built-in keepalive (sends XML whitespace, reliable)
self.whitespace_keepalive = True
self.whitespace_keepalive_interval = 30
self.add_event_handler("session_end", self.on_session_end)
self.add_event_handler("connection_failed", self.on_conn_failed)
self.add_event_handler("disconnected", self.on_disconnected)
self.add_event_handler("connected", self.on_connected)
# MUC plugin for group chat
try:
self.register_plugin('xep_0045')
except Exception as e:
log(f"MUC plugin xep_0045 not available: {e}")
# MAM plugin for message archive — registered on session_start (not in __init__)
# to avoid event loop issues
def on_connected(self, event):
log("connection established")
def on_start(self, event):
self.send_presence()
self.get_roster()
log(f"{JID} online")
# Register MAM plugin lazily (can't do it in __init__ before event loop)
try:
self.register_plugin('xep_0313')
except Exception:
log("(MAM: xep_0313 register failed, continuing without)")
# Join MUC rooms silently (observer pattern: new room → stay silent)
bot_nick = JID.split("@")[0]
async def _join_silent():
for room_jid in MUC_ROOMS:
nick = bot_nick
try:
# Use join_muc (non-waiting) to register plugin state
self.plugin['xep_0045'].join_muc(room_jid, nick)
# Also send raw presence as backup
presence = (
f"<presence to='{room_jid}/{nick}'>"
f"<x xmlns='http://jabber.org/protocol/muc'>"
f"<history maxstanzas='0'/>"
f"</x></presence>"
)
self.send_raw(presence)
log(f"Joined {room_jid} (async)")
except Exception as e:
log(f"MUC join failed for {room_jid}: {type(e).__name__}: {e}")
await asyncio.sleep(2)
# After joining, query MAM for recent history
await asyncio.sleep(3) # wait for MUC join to propagate
await _fetch_mam_history()
asyncio.ensure_future(_join_silent())
def on_session_end(self, event):
log(f"{JID} session ended")
def on_conn_failed(self, event):
log(f"connection failed: {event}")
def on_disconnected(self, event):
log(f"disconnected, reconnecting... (auto_reconnect={self.auto_reconnect})")
async def _fetch_mam_history():
"""Query MAM archive for recent messages in MUC rooms to rebuild context."""
bot = _xmpp
if not bot or 'xep_0313' not in bot.plugin:
log("(MAM: no bot or plugin)")
return
try:
for room_jid in MUC_ROOMS:
log(f"(MAM: querying {room_jid} for last 50 messages...)")
results = await bot.plugin['xep_0313'].retrieve(
jid=room_jid,
rsm={'max': 50},
)
# Results is an IQ stanza with mam results
count = 0
for msg in results['mam']['results']:
forwarded = msg['mam_result']['forwarded']
body = str(forwarded['stanza']['body'] or '').strip()
if not body:
continue
nick = str(forwarded['stanza']['from']).split('/')[-1] if '/' in str(forwarded['stanza']['from']) else '?'
role = 'user' if nick != 'xxm' else 'assistant'
entry = json.dumps({
"ts": int(time.time()),
"role": role,
"content": f"[{nick}]: {body[:300]}"
}, ensure_ascii=False)
_append_context(entry)
count += 1
log(f"(MAM: loaded {count} msgs from {room_jid})")
_set_mam_done()
log("(MAM recovery complete, group messages now active)")
except Exception as e:
log(f"(MAM error: {e})")
_set_mam_done()
def _append_context(entry: str):
"""Append a JSONL entry to the bridge context log."""
import os as _os
ctx_log = _os.path.join(_os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))),
"temp", ".bridge_context.jsonl")
try:
with open(ctx_log, "a", encoding="utf-8") as f:
f.write(entry + "\n")
with open(ctx_log, "r", encoding="utf-8") as f:
lines = f.readlines()
if len(lines) > 200:
with open(ctx_log, "w", encoding="utf-8") as f:
f.writelines(lines[-150:])
except Exception:
pass
# ═══════════════════ START BOT ═══════════════════
xmpp = Bot()
_xmpp = xmpp
xmpp.connect(host=SERVER, port=PORT)
log(f"Connecting {JID}@{SERVER}:{PORT}")
# ── Local HTTP bridge: send/read XMPP messages from external tools ──
import http.server as _http_server, json as _json, urllib.parse as _urlparse
_HTTP_PORT = 5802
class _BridgeHandler(_http_server.BaseHTTPRequestHandler):
def do_GET(self):
parsed = _urlparse.urlparse(self.path)
if parsed.path == "/muc":
# Return who's online in the MUC rooms
# This is the reliable cross-platform presence indicator
try:
muc_info = {"rooms": {}}
if _xmpp is not None and 'xep_0045' in _xmpp.plugin:
muc_plugin = _xmpp.plugin['xep_0045']
for room_jid in MUC_ROOMS:
room_data = {"jid": room_jid, "participants": []}
try:
if room_jid in muc_plugin.rooms:
room = muc_plugin.rooms[room_jid]
for nick, info in room.get('roster', {}).items():
participant = {
"nick": nick,
"jid": str(info.get('jid', '')),
"affiliation": str(info.get('affiliation', '')),
"role": str(info.get('role', '')),
}
room_data["participants"].append(participant)
except Exception as room_err:
room_data["error"] = str(room_err)
muc_info["rooms"][room_jid] = room_data
self._reply(200, muc_info)
except Exception as e:
self._reply(500, {"ok": False, "error": str(e)})
return
if parsed.path == "/health":
# XMPP connection health — used by Dashboard for cross-platform monitoring
try:
xmpp_alive = _xmpp is not None
# Use session_started_event instead of is_connected() - more reliable
session_ok = _xmpp.session_started_event.is_set() if hasattr(_xmpp, 'session_started_event') else False
socket_ok = _xmpp.is_connected() if hasattr(_xmpp, 'is_connected') else False
connected = session_ok or socket_ok
uptime_sec = int(time.time() - _START_TIME)
self._reply(200, {
"ok": True,
"xmpp_connected": connected,
"ejabberd_alive": connected,
"bot_jid": JID,
"uptime_sec": uptime_sec,
"muc_rooms": MUC_ROOMS,
})
except Exception as e:
self._reply(500, {"ok": False, "error": str(e)})
return
if parsed.path.startswith("/presence"):
# Check if a JID is online via XMPP roster presence
# Usage: GET /presence/mohe@yoin.fun
jid_to_check = parsed.path[len("/presence/"):].strip()
if not jid_to_check:
self._reply(400, {"ok": False, "error": "missing JID"})
return
try:
presence_info = {"jid": jid_to_check, "online": False, "resources": []}
if _xmpp is not None and hasattr(_xmpp, 'client_roster'):
roster = _xmpp.client_roster
if jid_to_check in roster:
entry = roster[jid_to_check]
resources = list(entry.resources.keys()) if entry.resources else []
presence_info["online"] = len(resources) > 0
presence_info["resources"] = resources
# Get presence show/status for each resource
for res in resources:
pres = entry.resources[res]
presence_info.setdefault("details", {})[res] = {
"show": str(pres.get("show", "available")),
"status": str(pres.get("status", "")),
"priority": int(pres.get("priority", 0)),
}
self._reply(200, presence_info)
except Exception as e:
self._reply(500, {"ok": False, "error": str(e)})
return
if parsed.path == "/messages":
try:
qs = _urlparse.parse_qs(parsed.query)
sender = qs.get("from", [None])[0]
since = qs.get("since", [None])[0]
with _MSG_BUF_LOCK:
msgs = list(_MSG_BUF)
if sender:
msgs = [m for m in msgs if m["from"] == sender]
if since:
msgs = [m for m in msgs if m["ts"] >= since]
self._reply(200, {"ok": True, "count": len(msgs), "messages": msgs[-50:]})
except Exception as e:
self._reply(500, {"ok": False, "error": str(e)})
else:
self._reply(404, {"ok": False, "error": "not found"})
def do_POST(self):
try:
length = int(self.headers.get('Content-Length', 0))
body = _json.loads(self.rfile.read(length))
to = body.get('to', MUC_ROOMS[0])
msg = body.get('message', '')
if not msg:
self._reply(400, {"ok": False, "error": "empty message"})
return
safe = _escape(msg.strip())
stanza = f'<message to="{to}" type="groupchat"><body>{safe}</body></message>'
try:
xmpp.send_raw(stanza)
_record_group_msg(JID.split("@")[0], msg)
log(f"[http] → [{to.split('@')[0]}]: {msg[:80]}")
self._reply(200, {"ok": True})
except Exception as xmpp_err:
_record_group_msg(JID.split("@")[0], msg) # still record to buffer
log(f"[http] → [{to.split('@')[0]}]: {msg[:80]} (send failed: {xmpp_err})")
self._reply(200, {"ok": True, "warn": f"buffered but XMPP send: {xmpp_err}"})
except Exception as e:
self._reply(500, {"ok": False, "error": str(e)})
def _reply(self, code, data):
body = _json.dumps(data, ensure_ascii=False).encode('utf-8')
self.send_response(code)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.send_header('Content-Length', len(body))
self.end_headers()
self.wfile.write(body)
def log_message(self, format, *args):
pass # suppress http server noise
_httpd = _http_server.HTTPServer(('0.0.0.0', _HTTP_PORT), _BridgeHandler)
_httpd_thread = threading.Thread(target=_httpd.serve_forever, daemon=True)
_httpd_thread.start()
log(f"HTTP bridge ready on :{_HTTP_PORT}")
# ── Status check (runs on event loop) ──
async def _status_check():
while True:
await asyncio.sleep(60)
log("(alive)")
loop = asyncio.get_event_loop()
asyncio.ensure_future(_status_check())
try:
loop.run_forever()
except KeyboardInterrupt:
log("Shutdown by user")
except Exception as e:
log(f"!!! MAIN LOOP CRASH: {e}")
import traceback
log(f"!!! {traceback.format_exc()[:500]}")
raise