# -*- coding: utf-8 -*-
"""
XMPP Bot - 笑笑(xxm@yoin.fun)
Connects to ejabberd via slixmpp, bridges XMPP messages ↔ serve session.
Supports:
- Private chat (type='chat')
- Group chat (type='groupchat') via MUC rooms
- TCP keepalive (kernel-level) for connection stability
- slixmpp whitespace_keepalive (asyncio-level)
- Auto-reconnect with logging
- proc_guard PID lock to prevent duplicate instances
"""
import os, sys, time, threading, asyncio, logging, json
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from chat_bridge import SessionBridge
from session_router import SessionRouter
from proc_guard import guard as _proc_guard
# ── PID lock: refuse to start when the guard does not grant the lock ──
_lock = _proc_guard("xmpp_bot")
if not _lock.ok:
    # Show the guard's explanation, then terminate with a non-zero status.
    print(_lock.message, flush=True)
    raise SystemExit(1)
# ── Config ──
JID = "xxm@yoin.fun"
# The password may be supplied through the XMPP_BOT_PASSWORD environment
# variable so it does not have to live in the source file.  The previous
# hard-coded value stays as the fallback, so existing deployments keep working.
PASSWORD = os.environ.get("XMPP_BOT_PASSWORD", "hermes123")
SERVER = "xmpp.yoin.fun"
PORT = 3021
ATTACH_SESSION = "ses_xxm_xmpp"
MUC_ROOMS = [
    "coregroup@conference.yoin.fun",  # core group chat
]
# Log files live in <parent of this file's directory>/logs, created on demand.
LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "logs")
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "xmpp_bot.log")
_START_TIME = time.time()  # used by /health endpoint
# ── Session router (wraps SessionBridge with routing + commands) ──
# One module-level router used by both the private-chat and the group-chat
# handlers.  The bridge and the router's default session both use
# ATTACH_SESSION.
_router = SessionRouter(
    bridge=SessionBridge(session_id=ATTACH_SESSION),
    default_session=ATTACH_SESSION,
)
def log(m: str):
    """Append *m* to LOG_FILE as one line prefixed with the local HH:MM:SS time."""
    stamp = time.strftime('%H:%M:%S')
    with open(LOG_FILE, "a", encoding="utf-8") as fh:
        fh.write(f"{stamp} {m}\n")
# ── Dedup: skip duplicate message IDs (same XMPP stanza) ──
from collections import deque as _deque

_DEDUP_MAX = 100  # number of most recent stanza IDs that are remembered
_DEDUP_CACHE: set[str] = set()
_DEDUP_ORDER: "_deque[str]" = _deque()  # insertion order, oldest ID on the left
_DEDUP_LOCK = threading.Lock()


def _is_duplicate(msg_id: str) -> bool:
    """Return True if *msg_id* was already seen recently, else remember it.

    An empty ID is never treated as a duplicate.  At most _DEDUP_MAX IDs are
    kept; when the limit is exceeded the oldest IDs are evicted one by one.
    (Clearing the whole cache on overflow would also forget the ID that was
    just added, so an immediate redelivery of that stanza would slip through.)
    """
    if not msg_id:
        return False
    with _DEDUP_LOCK:
        if msg_id in _DEDUP_CACHE:
            return True
        _DEDUP_CACHE.add(msg_id)
        _DEDUP_ORDER.append(msg_id)
        while len(_DEDUP_ORDER) > _DEDUP_MAX:
            # discard() tolerates IDs that are no longer in the set.
            _DEDUP_CACHE.discard(_DEDUP_ORDER.popleft())
        return False
# ── Bot instance ref ──
# Stays None until the start-up section assigns the Bot instance; the message
# handlers read it to obtain the connection they send replies through.
_xmpp: "Bot | None" = None
# ── MAM recovery guard ──
# While the flag is set, on_group_message() does not route incoming group
# messages.  _set_mam_done() clears it; if that never happens the flag is
# force-cleared once _MAM_TIMEOUT seconds have passed since start-up.
_MAM_RECOVERY = True
_MAM_RECOVERY_LOCK = threading.Lock()
_STARTUP_TIME = time.time()
_MAM_TIMEOUT = 30  # seconds


def _set_mam_done():
    """Mark the start-up MAM recovery phase as finished."""
    global _MAM_RECOVERY
    with _MAM_RECOVERY_LOCK:
        _MAM_RECOVERY = False


def _is_mam_recovery() -> bool:
    """Return True while the start-up MAM recovery phase is still active.

    Once more than _MAM_TIMEOUT seconds have elapsed since _STARTUP_TIME the
    flag is cleared (and that fact logged) before the value is returned.
    """
    global _MAM_RECOVERY
    expired = time.time() - _STARTUP_TIME > _MAM_TIMEOUT
    with _MAM_RECOVERY_LOCK:
        if expired and _MAM_RECOVERY:
            _MAM_RECOVERY = False
            log("(MAM recovery timed out, force-disabled)")
        return _MAM_RECOVERY
# ── Silence cooldown: a "shut up" request mutes the bot for a while ──
_SILENCE_UNTIL: float = 0.0  # epoch seconds until which the bot stays quiet
_SILENCE_LOCK = threading.Lock()
_SHUTUP_PATTERNS = [
    "闭嘴", "住口",
    "shut up", "shutup",
]


def _is_silenced() -> bool:
    """Return True while the current time is still before _SILENCE_UNTIL.

    Callers are expected to drop the message without processing or replying
    when this returns True.
    """
    with _SILENCE_LOCK:
        return time.time() < _SILENCE_UNTIL
def _check_shutup(body: str) -> bool:
    """Detect a "shut up" request and start the silence cooldown.

    The lower-cased, stripped *body* is searched for each entry of
    _SHUTUP_PATTERNS (plain substring match).  On the first hit the module
    level _SILENCE_UNTIL is set to 30 seconds from now, the hit is logged and
    True is returned.  Returns False when no pattern occurs.
    """
    # Without this declaration the assignment below would bind a function
    # local and the cooldown read by _is_silenced() would never change.
    global _SILENCE_UNTIL
    lower = body.lower().strip()
    for pat in _SHUTUP_PATTERNS:
        if pat.lower() in lower:
            # Set 30s silence - long enough to break the loop
            with _SILENCE_LOCK:
                _SILENCE_UNTIL = time.time() + 30
            log(f"(shutup detected: '{pat}' → 30s silence)")
            return True
    return False
# ═══════════════════════════════════════════════════════════════
# Private message handler
# ═══════════════════════════════════════════════════════════════
def on_message(msg):
    """Handle private chat messages (type='chat').

    Skips group chat and error stanzas, duplicates, body-less stanzas,
    messages from the bot's own JID and anything received during the silence
    cooldown.  Everything else is routed through _router on a worker thread
    and the reply, if any, is sent back to the sender as a chat message.
    """
    # Group chat has its own handler; error stanzas must never be answered.
    if msg["type"] in ("groupchat", "error"):
        return
    msg_id = msg.get("id", "")
    if _is_duplicate(msg_id):
        log(f"(duplicate msg {msg_id[:12]}... skipped)")
        return
    body = str(msg["body"])
    # Stanzas without text carry nothing to route.
    if not body.strip():
        return
    sender = str(msg["from"]).split("/")[0]  # bare JID: hmo@yoin.fun
    log(f"<{sender}> {body[:80]}")
    # Ignore self-messages
    if sender == JID:
        log(f"(skipped self-message)")
        return
    # Shut-up check — hard silence before any processing
    if _is_silenced():
        log(f"(silenced) <{sender}> {body[:60]}... dropped")
        return
    if _check_shutup(body):
        return

    def _handle():
        """Route the message and schedule the reply on the XMPP event loop."""
        try:
            log(f"router.route...")
            reply_text = _router.route("xmpp", sender, body)
            if not reply_text:
                log(f"-> {sender}: (no reply)")
                return
            reply_text = _strip_toolcall_xml(reply_text) or reply_text
            bot = _xmpp
            if not bot:
                log(f"-> {sender}: no bot ref)")
                return

            def _send(b=bot, who=sender, text=reply_text):
                # send_message() builds a complete <message/> stanza and
                # escapes the body itself, so no hand-written XML is needed.
                b.send_message(mto=who, mbody=text, mtype="chat")
                log(f"-> {who}: {text[:80]}")

            # Schedule send on event loop with unique event name
            evt = f"send_reply_{msg_id or int(time.time()*1000)}"
            bot.schedule(evt, 0, _send)
        except Exception as e:
            log(f"!!! EXCEPTION: {e}")
            import traceback
            log(f"!!! {traceback.format_exc()[:200]}")

    threading.Thread(target=_handle, daemon=True).start()
# ═══════════════════════════════════════════════════════════════
# Response extractor — handles LLM putting __SILENT__ before
# actual content (observed behavior: LLM uses it as thinking tag)
# ═══════════════════════════════════════════════════════════════
import re as _re
import threading as _threading
import subprocess as _subprocess
# ═══════════════════════════════════════════════════════════════
# Sub-agent: execute shell commands (##exec:command##)
# ═══════════════════════════════════════════════════════════════
_EXEC_RE = _re.compile(r"##exec:(.+?)##", _re.DOTALL)
_EXEC_TIMEOUT = 60  # max seconds per command


def _run_command(cmd: str) -> str:
    """Run *cmd* through the shell and return its combined stdout + stderr.

    The output is stripped; when it is empty a placeholder naming the exit
    code is returned instead.  A timeout (after _EXEC_TIMEOUT seconds) or any
    other failure is logged and reported as a short message string rather
    than raised.

    NOTE(review): the command is executed with shell=True, so *cmd* must only
    ever come from a trusted source.
    """
    log(f"(exec: {cmd[:120]})")
    try:
        proc = _subprocess.run(
            cmd, shell=True, capture_output=True, timeout=_EXEC_TIMEOUT,
            text=True, encoding='utf-8', errors='replace'
        )
        captured = ((proc.stdout or "") + (proc.stderr or "")).strip()
        result = captured if captured else "(no output, exit code %d)" % proc.returncode
        log(f"(exec done: {len(result)} bytes, exit={proc.returncode})")
        return result
    except _subprocess.TimeoutExpired:
        log(f"(exec timeout >{_EXEC_TIMEOUT}s)")
        return "(命令超时)"
    except Exception as e:
        log(f"(exec error: {e})")
        return f"(命令执行失败: {e})"
# ═══════════════════════════════════════════════════════════════
# Delayed reply support — schedule a group message after N sec
# ═══════════════════════════════════════════════════════════════
_DELAY_RE = _re.compile(r"##delay:?(\d+)?##")
_DELAY_DEFAULT = 15  # seconds, when no number specified
_HAS_CMD = _re.compile(r"##(delay|exec)")  # any command marker


def _extract_acknowledgment(text: str) -> str:
    """Return the stripped text that precedes the first "##" in *text*.

    An empty string is returned when there is no "##" at all or when the
    text starts with it.
    """
    head, marker, _tail = text.partition("##")
    if marker and head:
        return head.strip()
    return ""
def _schedule_delayed(delay_sec: int, room: str):
    """Schedule a re-invocation of the LLM after *delay_sec* seconds.

    A daemon timer is started.  When it fires, a fixed follow-up prompt is
    sent through the bridge and the extracted reply, if any, is posted to
    *room* as a group chat message.  Failures are logged, never raised.
    """
    def _fire():
        bot = _xmpp
        if not bot:
            log(f"!! delay: no bot ref")
            return
        try:
            prompt = "时间到,请根据最新的信息汇报结果。"
            reply = _router.bridge.send_raw(prompt)
            report = _extract_response(reply) if reply else None
            if report:
                # send_message() produces a well-formed groupchat <message/>
                # stanza and escapes the body itself.
                bot.send_message(mto=room, mbody=report.strip(), mtype="groupchat")
                log(f"-> [Delay][{room}]: {report.strip()[:80]}")
                return
            log(f"-> [Delay][{room}]: (LLM empty)")
        except Exception as e:
            log(f"!! delay err: {e}")

    t = _threading.Timer(delay_sec, _fire)
    t.daemon = True
    t.start()
    log(f"(delay +{delay_sec}s → {room})")
# ═══════════════════════════════════════════════════════════════
# Response extractor
# ═══════════════════════════════════════════════════════════════
# ── Pattern: natural language "stay silent" hints ──
# Catches cases where the LLM says it should stay silent but forgot __SILENT__ prefix.
# Only checks the first line to avoid blocking multi-line real replies.
_SILENCE_PATTERNS = [
"保持沉默",
"不应[该]?回复",
"没有.*@.*我",
"不是对[我我说]",
"跟我无关",
"我不用回复",
"不该回复",
"不参与",
"不是我[应]?该[说回]",
]
def _strip_toolcall_xml(text: str) -> str:
"""Strip tool call XML that leaks from max-tool-loop final force.
Removes , , , tags and their content.
"""
t = text
t = _re.sub(r']*>.*?(|$)', '', t, flags=_re.DOTALL)
t = _re.sub(r'.*?(|$)', '', t, flags=_re.DOTALL)
t = _re.sub(r']*>.*?(|$)', '', t, flags=_re.DOTALL)
t = _re.sub(r'.*?(|$)', '', t, flags=_re.DOTALL)
return t.strip()
def _extract_response(text: str) -> str | None:
"""Strip __SILENT__ + reasoning, or detect natural language silence intent.
Returns actual content to send, or None to stay silent."""
if not text:
return None
t = text.strip()
if not t:
return None
t = _strip_toolcall_xml(t)
# ── Natural language silence detection (fallback) ──
if not t.startswith("__SILENT__"):
first = t.split("\n", 1)[0] # only check first line
for pat in _SILENCE_PATTERNS:
if _re.search(pat, first):
return None # LLM says it should stay silent → suppress
return t # No silence signal → respond normally
# ── Has __SILENT__ prefix — strip it and reasoning ──
parts = t.split("\n", 1)
if len(parts) < 2:
return None # Just __SILENT__, no content
rest = parts[1].strip()
# Strip reasoning blocks (...)and (...) at the start
while True:
m = _re.match(r'^([^)]*)\s*', rest)
if m:
rest = rest[m.end():]
continue
m = _re.match(r'^\([^)]*\)\s*', rest)
if m:
rest = rest[m.end():]
continue
break
return rest.strip() or None
# ═══════════════════════════════════════════════════════════════
# Group message batching (debounce + serialized processing)
# ═══════════════════════════════════════════════════════════════
#
# Three states per room:
# 1. IDLE → first message arrives → start 3s debounce timer
# 2. BATCHING → timer pending (more messages merge in)
# 3. PROCESSING → LLM call in flight → new messages → pending queue
# → LLM finishes → auto-flush pending queue
#
_BATCH_WINDOW = 3.0  # debounce window (seconds) before a room's batch fires
_batch_entries: dict[str, list[str]] = {}  # room -> formatted lines waiting for the debounce timer
_batch_timers: dict[str, threading.Timer] = {}  # room -> currently pending timer
_batch_processing: set[str] = set() # rooms in active LLM call
_batch_pending: dict[str, list[str]] = {} # overflow during processing
_batch_lock = threading.Lock()  # guards the four containers above
_BOT_NICK = JID.split("@")[0] # "xxm"
def _process_group_reply(raw_reply: str, room: str, msg_id: str = ""):
    """Act on an LLM reply for a group chat, then release the room's batch.

    An empty reply is only logged.  A reply containing a ##delay## marker
    schedules a delayed follow-up (the captured number of seconds, or
    _DELAY_DEFAULT).  Anything else is passed through _extract_response() and
    either sent to *room* or logged as silent.  _batch_done(room) is called
    last on every path.
    """
    if not raw_reply:
        log(f"-> [Group][{room}]: (no reply)")
    else:
        delay_match = _DELAY_RE.search(raw_reply)
        if delay_match:
            # ##delay:N## → pure delay
            seconds = delay_match.group(1)
            _schedule_delayed(int(seconds) if seconds else _DELAY_DEFAULT, room)
        else:
            # Normal reply
            outgoing = _extract_response(raw_reply)
            if outgoing:
                _send_group(outgoing, room, msg_id)
            else:
                log(f"-> [Group][{room}]: (silent)")
    _batch_done(room)
def _batch_done(room: str):
    """Mark *room* as no longer processing and flush anything queued meanwhile.

    If messages were queued in _batch_pending while the LLM call was running
    they become the room's next batch, fired by a 0.1 s daemon timer.
    Otherwise the room is logged as idle.
    """
    with _batch_lock:
        _batch_processing.discard(room)
        queued = _batch_pending.pop(room, None)
        if queued:
            _batch_entries[room] = queued
            flush_timer = threading.Timer(0.1, _fire_batch, args=[room])
            flush_timer.daemon = True
            flush_timer.start()
            _batch_timers[room] = flush_timer
            return
    log(f"[Batch][{room}] (idle)")
BATCH_TIMEOUT = 300 # max seconds per batch LLM call (tool calls like SSH can be slow)
def _fire_batch(room: str):
    """Take entries and launch LLM call (one at a time per room).

    The room's collected lines are removed from _batch_entries, the room is
    marked as processing, and the lines are joined with newlines and routed
    on a daemon worker thread.  A watchdog timer releases the room via
    _batch_done() if the call takes longer than BATCH_TIMEOUT seconds; a
    reply that arrives after that is discarded.
    """
    with _batch_lock:
        entries = _batch_entries.pop(room, None)
        _batch_timers.pop(room, None)
        if not entries:
            return
        _batch_processing.add(room)
    combined = "\n".join(entries)
    def _handle():
        # `done` is set on timeout and on completion; nothing in this block
        # waits on it.
        done = threading.Event()
        # One-element list so the nested _timeout() can flip the flag.
        timed_out = [False]
        def _timeout():
            timed_out[0] = True
            log(f"[Batch][{room}] TIMEOUT ({BATCH_TIMEOUT}s), force-unblocking")
            _batch_done(room)
            done.set()
        timer = threading.Timer(BATCH_TIMEOUT, _timeout)
        timer.daemon = True
        timer.start()
        try:
            raw = _router.route("xmpp", room, combined)
            if not timed_out[0]:
                timer.cancel()
                # _process_group_reply() calls _batch_done(room) itself.
                _process_group_reply(raw, room)
            else:
                log(f"[Batch][{room}] route returned after timeout, discarded")
        except Exception as e:
            log(f"!!! BATCH: {e}")
            import traceback
            log(f"!!! {traceback.format_exc()[:200]}")
            # Release the room unless the watchdog already did so.
            if not timed_out[0]:
                timer.cancel()
                _batch_done(room)
        finally:
            done.set()
    threading.Thread(target=_handle, daemon=True).start()
def _batch_group_message(room: str, nickname: str, body: str) -> bool:
    """Queue a group message for batched processing.

    Returns False, without queuing anything, when *body* contains
    "@<bot nick>" or starts with the bot nick; the caller then handles the
    message immediately.  Otherwise the message is stored as
    "[nickname]: body" and True is returned:

    * room currently processing → appended to the room's pending queue;
    * otherwise → appended to the room's batch and the debounce timer is
      (re)started with _BATCH_WINDOW seconds.
    """
    # Direct @mention → bypass batch
    if f"@{_BOT_NICK}" in body or body.startswith(_BOT_NICK):
        return False
    line = f"[{nickname}]: {body}"
    with _batch_lock:
        if room in _batch_processing:
            # LLM call in flight: park the line until it finishes.
            _batch_pending.setdefault(room, []).append(line)
            return True
        # Cancel a pending debounce timer, if any, so it can be restarted.
        previous = _batch_timers.pop(room, None)
        if previous:
            previous.cancel()
        _batch_entries.setdefault(room, []).append(line)
        debounce = threading.Timer(_BATCH_WINDOW, _fire_batch, args=[room])
        debounce.daemon = True
        debounce.start()
        _batch_timers[room] = debounce
    return True
# ── Group chat handler
# ═══════════════════════════════════════════════════════════════
# Message buffer for HTTP bridge GET /messages
_MSG_BUF: list[dict] = []
_MSG_BUF_LOCK = threading.Lock()


def _record_group_msg(nickname: str, body: str):
    """Append a timestamped group message to _MSG_BUF.

    Each record holds the local HH:MM:SS time, the sender nickname and the
    body.  Once the buffer grows beyond 200 records only the newest 150 are
    kept.
    """
    record = {"ts": time.strftime("%H:%M:%S"), "from": nickname, "body": body}
    with _MSG_BUF_LOCK:
        _MSG_BUF.append(record)
        if len(_MSG_BUF) > 200:
            del _MSG_BUF[:-150]
def on_group_message(msg):
    """Handle group chat messages (type='groupchat') from MUC rooms.

    Observer pattern with batching: nearby messages from the same room
    are merged into one LLM call. Direct @mentions bypass the batch
    and are processed immediately.

    While the start-up MAM recovery is active nothing is routed; only the
    bot's own messages are appended to the bridge context.  After that,
    duplicates and empty bodies are dropped, the bot's own echoes are saved
    to the context, and messages from others are recorded in the HTTP
    message buffer and subjected to the silence cooldown before batching.
    """
    full_from = str(msg["from"])
    # Split on the first "/" only, so a nickname that itself contains "/"
    # is kept intact.
    room, _, nickname = full_from.partition("/")
    bot_nick = JID.split("@")[0]

    # Skip MAM-recovered messages during startup (already saved to context)
    if _is_mam_recovery():
        # Still save self-msg to context for continuity
        if nickname == bot_nick:
            _save_self_group_msg(str(msg["body"]).strip(), "(MAM self-msg saved to ctx)")
        return

    msg_id = msg.get("id", "")
    if _is_duplicate(msg_id):
        log(f"(group dup {msg_id[:12]}... skipped)")
        return
    body = str(msg["body"]).strip()
    if not body:
        return

    # Self-message echo from MUC — save to bridge context so LLM
    # can see what it just said in the group (don't discard).
    if nickname == bot_nick:
        _save_self_group_msg(body, "(self-msg saved to ctx)")
        return

    # Record to message buffer for HTTP bridge monitoring
    _record_group_msg(nickname, body)

    # Shut-up check — applies to group messages from others
    if _is_silenced():
        log(f"(group silenced) {body[:60]}... dropped")
        return
    if _check_shutup(body):
        log(f"(group shutup detected)")
        return

    # Batch nearby messages (unless @mention → process immediately)
    if _batch_group_message(room, nickname, body):
        log(f"[Group][{room}] {nickname}: {body[:80]} (batched)")
        return

    # Direct @mention → immediate processing
    log(f"[Group][{room}] {nickname}: {body[:80]}")

    def _handle():
        try:
            raw_reply = _router.route("xmpp", full_from, body)
            _process_group_reply(raw_reply, room, msg_id)
        except Exception as e:
            log(f"!!! GROUP EXCEPTION: {e}")
            import traceback
            log(f"!!! {traceback.format_exc()[:200]}")

    threading.Thread(target=_handle, daemon=True).start()


def _save_self_group_msg(body: str, log_prefix: str):
    """Log one of the bot's own group messages and append it to the bridge context."""
    log(f"{log_prefix} {body[:80]}")
    try:
        _router.bridge._append_to_log("assistant", body)
    except Exception as e:
        # Best effort: losing one context line must not break message
        # handling, but the failure should be visible in the log.
        log(f"(self-msg ctx append failed: {e})")
def _send_group(text: str, room: str, msg_id: str = ""):
    """Send *text* (stripped) to the MUC *room* as a group chat message.

    The send is scheduled on the bot's event loop under an event name derived
    from *msg_id* (or the current time in milliseconds when no ID is given).
    If no bot instance is available the attempt is only logged.
    """
    bot = _xmpp
    if not bot:
        log(f"-> [Group][{room}]: no bot ref)")
        return

    def _send(b=bot, to=room, payload=text.strip(), preview=text[:80]):
        # send_message() builds a complete groupchat <message/> stanza and
        # escapes the body itself, so no hand-written XML is needed.
        b.send_message(mto=to, mbody=payload, mtype="groupchat")
        log(f"-> [Group][{to}]: {preview}")

    evt = f"send_grp_{msg_id or int(time.time()*1000)}"
    bot.schedule(evt, 0, _send)
# ═══════════════════════════════════════════════════════════════
# Helpers
# ═══════════════════════════════════════════════════════════════
def _escape(text: str) -> str:
    """Escape XML special characters for XMPP body content.

    Replaces ``&``, ``<``, ``>`` and ``"`` with their XML entities.  The
    ampersand is handled first so already-produced entities are not escaped
    a second time.
    """
    from xml.sax.saxutils import escape as _xml_escape

    return _xml_escape(text, {'"': "&quot;"})
# ═══════════════════════════════════════════════════════════════
# Main
# ═══════════════════════════════════════════════════════════════
if __name__ == "__main__":
    # Force selector event loop on Windows (proactor + SSL has issues with slixmpp).
    # asyncio only defines WindowsSelectorEventLoopPolicy on Windows, so the
    # call is guarded to keep the script startable on other platforms.
    import asyncio
    if sys.platform.startswith("win"):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
import slixmpp
class Bot(slixmpp.ClientXMPP):
    """XMPP client that logs in as JID and wires up this module's handlers."""

    def __init__(self):
        super().__init__(JID, PASSWORD)
        # Force STARTTLS (port 3021 uses STARTTLS not direct SSL)
        self.enable_direct_tls = False
        self.enable_starttls = True
        self.auto_reconnect = True
        self.reconnect_max_delay = 10
        # Use slixmpp built-in keepalive (sends XML whitespace, reliable)
        self.whitespace_keepalive = True
        self.whitespace_keepalive_interval = 30
        # Event wiring, registered in this order.
        for event_name, handler in (
            ("session_start", self.on_start),
            ("message", on_message),
            ("groupchat_message", on_group_message),
            ("session_end", self.on_session_end),
            ("connection_failed", self.on_conn_failed),
            ("disconnected", self.on_disconnected),
            ("connected", self.on_connected),
        ):
            self.add_event_handler(event_name, handler)
        # MUC plugin for group chat
        try:
            self.register_plugin('xep_0045')
        except Exception as e:
            log(f"MUC plugin xep_0045 not available: {e}")
        # The MAM plugin is registered in on_start() rather than here, to
        # avoid event loop issues.

    def on_connected(self, event):
        """Log that the connection has been established."""
        log("connection established")

    def on_start(self, event):
        """Announce presence, fetch the roster, then join the MUC rooms."""
        self.send_presence()
        self.get_roster()
        log(f"{JID} online")
        # Register MAM plugin lazily (can't do it in __init__ before event loop)
        try:
            self.register_plugin('xep_0313')
        except Exception:
            log("(MAM: xep_0313 register failed, continuing without)")
        # Join MUC rooms silently (observer pattern: new room → stay silent)
        bot_nick = JID.split("@")[0]

        async def _join_room(room_jid):
            """Try to join one room: up to three attempts on timeout, one otherwise."""
            for attempt in range(3):
                try:
                    # Use join_muc_wait to ensure room join completes
                    await self.plugin['xep_0045'].join_muc_wait(room_jid, bot_nick, timeout=60)
                except asyncio.TimeoutError:
                    log(f"MUC join timeout ({attempt+1}/3) for {room_jid}")
                    final_attempt = attempt == 2
                    if final_attempt:
                        log(f"MUC setup failed for {room_jid} after 3 attempts")
                    await asyncio.sleep(5 if final_attempt else 3)
                except Exception as e:
                    log(f"MUC setup failed for {room_jid}: {e} (type={type(e).__name__})")
                    await asyncio.sleep(5)
                    return
                else:
                    log(f"Joined {room_jid} (silent)")
                    return

        async def _join_silent():
            for room_jid in MUC_ROOMS:
                await _join_room(room_jid)
            # After joining, query MAM for recent history
            await asyncio.sleep(3)  # wait for MUC join to propagate
            await _fetch_mam_history()

        asyncio.ensure_future(_join_silent())

    def on_session_end(self, event):
        """Log the end of the XMPP session."""
        log(f"{JID} session ended")

    def on_conn_failed(self, event):
        """Log a failed connection attempt together with its event payload."""
        log(f"connection failed: {event}")

    def on_disconnected(self, event):
        """Log the disconnect and the current auto_reconnect setting."""
        log(f"disconnected, reconnecting... (auto_reconnect={self.auto_reconnect})")
async def _fetch_mam_history():
    """Query MAM archive for recent messages in MUC rooms to rebuild context.

    For every room in MUC_ROOMS up to 50 archived messages are requested and
    each one with a non-empty body is appended to the bridge context log as
    a JSON line.  Messages whose nickname equals the bot's own nick (the
    local part of JID) get the role "assistant", all others "user".  MAM
    recovery is marked done afterwards, on success and on error; nothing is
    done when the bot or the xep_0313 plugin is unavailable.
    """
    bot = _xmpp
    if not bot or 'xep_0313' not in bot.plugin:
        log("(MAM: no bot or plugin)")
        return
    # Derived from JID instead of a hard-coded nick so the role assignment
    # stays correct if the account is ever changed.
    own_nick = JID.split("@")[0]
    try:
        for room_jid in MUC_ROOMS:
            log(f"(MAM: querying {room_jid} for last 50 messages...)")
            results = await bot.plugin['xep_0313'].retrieve(
                jid=room_jid,
                rsm={'max': 50},
            )
            # Results is an IQ stanza with mam results
            count = 0
            for msg in results['mam']['results']:
                stanza = msg['mam_result']['forwarded']['stanza']
                body = str(stanza['body'] or '').strip()
                if not body:
                    continue
                sender = str(stanza['from'])
                nick = sender.split('/')[-1] if '/' in sender else '?'
                role = 'assistant' if nick == own_nick else 'user'
                entry = json.dumps({
                    "ts": int(time.time()),
                    "role": role,
                    "content": f"[{nick}]: {body[:300]}"
                }, ensure_ascii=False)
                _append_context(entry)
                count += 1
            log(f"(MAM: loaded {count} msgs from {room_jid})")
        _set_mam_done()
        log("(MAM recovery complete, group messages now active)")
    except Exception as e:
        log(f"(MAM error: {e})")
        _set_mam_done()
def _append_context(entry: str):
    """Append a JSONL entry to the bridge context log.

    The log is <parent of this file's directory>/temp/.bridge_context.jsonl.
    After appending, the file is trimmed to its newest 150 lines once it
    holds more than 200.  Failures are logged and otherwise ignored, so a
    broken context file never interrupts message handling.
    """
    import os as _os
    base_dir = _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))
    ctx_log = _os.path.join(base_dir, "temp", ".bridge_context.jsonl")
    try:
        with open(ctx_log, "a", encoding="utf-8") as f:
            f.write(entry + "\n")
        with open(ctx_log, "r", encoding="utf-8") as f:
            lines = f.readlines()
        if len(lines) > 200:
            with open(ctx_log, "w", encoding="utf-8") as f:
                f.writelines(lines[-150:])
    except Exception as e:
        # Still best-effort, but no longer silent.
        log(f"(context append failed: {e})")
# ═══════════════════ START BOT ═══════════════════
# Create the client, publish it through the module-level _xmpp reference that
# the handlers read, and start connecting.
_xmpp = xmpp = Bot()
xmpp.connect(host=SERVER, port=PORT)
log(f"Connecting {JID}@{SERVER}:{PORT}")
# ── Local HTTP bridge: send/read XMPP messages from external tools ──
import http.server as _http_server, json as _json, urllib.parse as _urlparse
_HTTP_PORT = 5802


class _BridgeHandler(_http_server.BaseHTTPRequestHandler):
    """HTTP bridge that lets external tools read and send XMPP messages.

    GET  /muc             participants of every room in MUC_ROOMS
    GET  /health          connection status, uptime and configured rooms
    GET  /presence/<jid>  roster presence of <jid>
    GET  /messages        buffered group messages (optional ?from= and ?since=)
    POST (any path)       JSON {"to": ..., "message": ...} → send a message

    All responses are JSON.
    """

    def do_GET(self):
        """Dispatch a GET request to the matching route helper."""
        parsed = _urlparse.urlparse(self.path)
        path = parsed.path
        if path == "/muc":
            route = self._get_muc
        elif path == "/health":
            route = self._get_health
        elif path.startswith("/presence"):
            route = self._get_presence
        elif path == "/messages":
            route = self._get_messages
        else:
            self._reply(404, {"ok": False, "error": "not found"})
            return
        try:
            route(parsed)
        except Exception as e:
            self._reply(500, {"ok": False, "error": str(e)})

    def _get_muc(self, parsed):
        """Reply with who is online in the MUC rooms."""
        muc_info = {"rooms": {}}
        if _xmpp is not None and 'xep_0045' in _xmpp.plugin:
            muc_plugin = _xmpp.plugin['xep_0045']
            for room_jid in MUC_ROOMS:
                room_data = {"jid": room_jid, "participants": []}
                try:
                    if room_jid in muc_plugin.rooms:
                        room = muc_plugin.rooms[room_jid]
                        for nick, info in room.get('roster', {}).items():
                            room_data["participants"].append({
                                "nick": nick,
                                "jid": str(info.get('jid', '')),
                                "affiliation": str(info.get('affiliation', '')),
                                "role": str(info.get('role', '')),
                            })
                except Exception as room_err:
                    # A failure in one room must not hide the others.
                    room_data["error"] = str(room_err)
                muc_info["rooms"][room_jid] = room_data
        self._reply(200, muc_info)

    def _get_health(self, parsed):
        """Reply with XMPP connection health."""
        # Use session_started_event in addition to is_connected() - more reliable
        session_ok = _xmpp.session_started_event.is_set() if hasattr(_xmpp, 'session_started_event') else False
        socket_ok = _xmpp.is_connected() if hasattr(_xmpp, 'is_connected') else False
        connected = session_ok or socket_ok
        self._reply(200, {
            "ok": True,
            "xmpp_connected": connected,
            "ejabberd_alive": connected,
            "bot_jid": JID,
            "uptime_sec": int(time.time() - _START_TIME),
            "muc_rooms": MUC_ROOMS,
        })

    def _get_presence(self, parsed):
        """Reply with roster presence for the JID given as /presence/<jid>."""
        # Decode percent-escapes so "/presence/user%40host" works as well.
        jid_to_check = _urlparse.unquote(parsed.path[len("/presence/"):]).strip()
        if not jid_to_check:
            self._reply(400, {"ok": False, "error": "missing JID"})
            return
        presence_info = {"jid": jid_to_check, "online": False, "resources": []}
        if _xmpp is not None and hasattr(_xmpp, 'client_roster'):
            roster = _xmpp.client_roster
            if jid_to_check in roster:
                entry = roster[jid_to_check]
                resources = list(entry.resources.keys()) if entry.resources else []
                presence_info["online"] = len(resources) > 0
                presence_info["resources"] = resources
                # Get presence show/status for each resource
                for res in resources:
                    pres = entry.resources[res]
                    presence_info.setdefault("details", {})[res] = {
                        "show": str(pres.get("show", "available")),
                        "status": str(pres.get("status", "")),
                        "priority": int(pres.get("priority", 0)),
                    }
        self._reply(200, presence_info)

    def _get_messages(self, parsed):
        """Reply with buffered group messages, optionally filtered.

        ?from=<nick> keeps one sender; ?since=<HH:MM:SS> keeps records whose
        timestamp string compares >= the given value.  At most the newest 50
        matching messages are returned; "count" is the number before that cut.
        """
        qs = _urlparse.parse_qs(parsed.query)
        sender = qs.get("from", [None])[0]
        since = qs.get("since", [None])[0]
        with _MSG_BUF_LOCK:
            msgs = list(_MSG_BUF)
        if sender:
            msgs = [m for m in msgs if m["from"] == sender]
        if since:
            msgs = [m for m in msgs if m["ts"] >= since]
        self._reply(200, {"ok": True, "count": len(msgs), "messages": msgs[-50:]})

    def do_POST(self):
        """Send the posted message over XMPP and record it in the buffer.

        The JSON body may contain "to" (defaults to the first MUC room) and
        must contain a non-empty "message".  Targets listed in MUC_ROOMS are
        addressed as group chat, anything else as a private chat.  A failed
        XMPP send is still recorded and reported as a warning with status 200.
        """
        try:
            length = int(self.headers.get('Content-Length', 0))
            body = _json.loads(self.rfile.read(length))
            to = body.get('to', MUC_ROOMS[0])
            msg = body.get('message', '')
            if not msg:
                self._reply(400, {"ok": False, "error": "empty message"})
                return
            mtype = "groupchat" if to in MUC_ROOMS else "chat"
            try:
                # send_message() builds a complete <message/> stanza and
                # escapes the body itself.
                xmpp.send_message(mto=to, mbody=msg.strip(), mtype=mtype)
                _record_group_msg(JID.split("@")[0], msg)
                log(f"[http] → [{to.split('@')[0]}]: {msg[:80]}")
                self._reply(200, {"ok": True})
            except Exception as xmpp_err:
                _record_group_msg(JID.split("@")[0], msg)  # still record to buffer
                log(f"[http] → [{to.split('@')[0]}]: {msg[:80]} (send failed: {xmpp_err})")
                self._reply(200, {"ok": True, "warn": f"buffered but XMPP send: {xmpp_err}"})
        except Exception as e:
            self._reply(500, {"ok": False, "error": str(e)})

    def _reply(self, code, data):
        """Send *data* as a UTF-8 JSON response with HTTP status *code*."""
        body = _json.dumps(data, ensure_ascii=False).encode('utf-8')
        self.send_response(code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', len(body))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # suppress http server noise
# Serve the bridge from a daemon thread so it never keeps the process alive.
# NOTE(review): the server binds to every interface ('0.0.0.0') and the
# handler performs no authentication — confirm this exposure is intended.
_httpd = _http_server.HTTPServer(('0.0.0.0', _HTTP_PORT), _BridgeHandler)
_httpd_thread = threading.Thread(target=_httpd.serve_forever, daemon=True)
_httpd_thread.start()
log(f"HTTP bridge ready on :{_HTTP_PORT}")
# ── Status check (runs on event loop) ──
async def _status_check():
    """Write an "(alive)" heartbeat line to the log once every 60 seconds."""
    while True:
        await asyncio.sleep(60)
        log("(alive)")


# Run the event loop until interrupted; any other crash is logged with a
# truncated traceback and then re-raised.
loop = asyncio.get_event_loop()
loop.create_task(_status_check())
try:
    loop.run_forever()
except KeyboardInterrupt:
    log("Shutdown by user")
except Exception as e:
    import traceback
    log(f"!!! MAIN LOOP CRASH: {e}")
    log(f"!!! {traceback.format_exc()[:500]}")
    raise