b9df510f31
- GRANT: [GRANT:xxm] one-time speak permission, overrides REVOKE - REVOKE: [REVOKE:xxm] 5-min speak ban, read-only mode - Coordinator switch: hmo can change with 'coordinator=xxm' - Shut-up: hmo says keywords → 5-min silence (was 30s) - Read-only mode: revoked agents see messages but output __SILENT__ - Removed old _is_silenced/_SILENCE_UNTIL, unified under _REVOKED_UNTIL
945 lines
38 KiB
Python
945 lines
38 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
XMPP Bot - 笑笑(xxm@yoin.fun)
|
||
Connects to ejabberd via slixmpp and bridges XMPP messages ↔ serve session.
|
||
|
||
Supports:
|
||
- Private chat (type='chat')
|
||
- Group chat (type='groupchat') via MUC rooms
|
||
- TCP keepalive (kernel-level) for connection stability
|
||
- slixmpp whitespace_keepalive (asyncio-level)
|
||
- Auto-reconnect with logging
|
||
- proc_guard PID lock to prevent duplicate instances
|
||
"""
|
||
import os, sys, time, threading, asyncio, logging, json
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
from chat_bridge import SessionBridge
|
||
from session_router import SessionRouter
|
||
from proc_guard import guard as _proc_guard
|
||
|
||
# ── PID lock — prevent duplicate instances ──
# _lock.ok is False when proc_guard refuses the lock (another instance is
# presumably running — see proc_guard); print its message and exit.
# The guard object is kept in a module-level name for the process lifetime.
_lock = _proc_guard("xmpp_bot")
if not _lock.ok:
    print(_lock.message, flush=True)
    sys.exit(1)

# ── Config ──
JID = "xxm@yoin.fun"
# Credentials should not have to live in source control: XMPP_PASSWORD in the
# environment overrides the historical default (empty/unset → default).
PASSWORD = os.environ.get("XMPP_PASSWORD") or "hermes123"
SERVER = "xmpp.yoin.fun"
PORT = 3021
ATTACH_SESSION = "ses_xxm_xmpp"
MUC_ROOMS = [
    "coregroup@conference.yoin.fun",  # core group chat
    "jujidina@conference.yoin.fun",   # jujidina group
]

# Logs live in <parent of this script's directory>/logs/xmpp_bot.log
LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "logs")
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "xmpp_bot.log")

_START_TIME = time.time()  # used by /health endpoint

# ── Session router (wraps SessionBridge with routing + commands) ──
_router = SessionRouter(
    bridge=SessionBridge(session_id=ATTACH_SESSION),
    default_session=ATTACH_SESSION,
)
|
||
|
||
|
||
def log(m: str):
    """Append one line to LOG_FILE: local time as HH:MM:SS, a space, then *m*."""
    stamp = time.strftime('%H:%M:%S')
    line = f"{stamp} {m}\n"
    with open(LOG_FILE, mode="a", encoding="utf-8") as handle:
        handle.write(line)
|
||
|
||
|
||
# ── Dedup: skip duplicate message IDs (same XMPP stanza) ──
|
||
_DEDUP_CACHE: set[str] = set()
|
||
_DEDUP_LOCK = threading.Lock()
|
||
|
||
|
||
def _is_duplicate(msg_id: str) -> bool:
|
||
if not msg_id:
|
||
return False
|
||
with _DEDUP_LOCK:
|
||
if msg_id in _DEDUP_CACHE:
|
||
return True
|
||
_DEDUP_CACHE.add(msg_id)
|
||
if len(_DEDUP_CACHE) > 100:
|
||
_DEDUP_CACHE.clear()
|
||
return False
|
||
|
||
|
||
# ── Bot instance ref (set after XMPP connect) ──
|
||
_xmpp: "Bot | None" = None
|
||
|
||
# ── MAM recovery guard: skip group messages during startup MAM fetch ──
|
||
# After 30s timeout, force-disable recovery to unblock group messages.
|
||
_MAM_RECOVERY = True
|
||
_MAM_RECOVERY_LOCK = threading.Lock()
|
||
_STARTUP_TIME = time.time()
|
||
_MAM_TIMEOUT = 30 # seconds
|
||
|
||
def _set_mam_done():
    """End the startup MAM recovery phase so group messages are processed normally."""
    global _MAM_RECOVERY
    _MAM_RECOVERY_LOCK.acquire()
    try:
        _MAM_RECOVERY = False
    finally:
        _MAM_RECOVERY_LOCK.release()
|
||
|
||
def _is_mam_recovery() -> bool:
    """Return True while the startup MAM recovery phase is still active.

    Safety valve: once more than _MAM_TIMEOUT seconds have elapsed since
    _STARTUP_TIME, the flag is force-cleared (and that is logged once), so a
    _fetch_mam_history that never completes cannot block group messages.
    """
    global _MAM_RECOVERY
    expired = time.time() - _STARTUP_TIME > _MAM_TIMEOUT
    with _MAM_RECOVERY_LOCK:
        if expired and _MAM_RECOVERY:
            _MAM_RECOVERY = False
            log("(MAM recovery timed out, force-disabled)")
        return _MAM_RECOVERY
|
||
|
||
# ── Silence cooldown: when user says shut up, actually shut up ──
|
||
# ── Coordinator protocol (aligned with mohe/zhiwei/xiaoguo) ──
|
||
# All in-band XMPP signaling, no DB dependency.
|
||
_COORDINATOR: str = "mohe" # which agent moderates
|
||
_GRANTED: str | None = None # agent granted one-time speak permission
|
||
_REVOKED_UNTIL: float = 0.0 # timestamp until revoked agent can speak again
|
||
_SHUTUP_PATTERNS = ["闭嘴", "别说话", "安静", "shut", "stfu", "别说了", "停"]
|
||
|
||
|
||
def _process_coordinator_signals(nickname: str, body: str) -> bool:
|
||
"""Parse coordinator/GRANT/REVOKE signals from group messages.
|
||
Returns True if the message was consumed as a control signal (no further processing)."""
|
||
global _COORDINATOR, _GRANTED, _REVOKED_UNTIL
|
||
|
||
# 1. hmo switches coordinator
|
||
if nickname == 'hmo' and 'coordinator=' in body.lower():
|
||
for name in ('mohe', 'zhiwei', 'xxm'):
|
||
if f'coordinator={name}' in body.lower():
|
||
_COORDINATOR = name
|
||
_GRANTED = None
|
||
log(f"Coordinator switched to {name} by hmo")
|
||
return True
|
||
|
||
# 2. GRANT signal (overrides REVOKE, one-time use)
|
||
gm = re.search(r'\[GRANT:(\w+)\]', body)
|
||
if gm:
|
||
_GRANTED = gm.group(1)
|
||
_REVOKED_UNTIL = 0 # lift revocation when granted
|
||
log(f"GRANT: {_GRANTED}")
|
||
return True # signal consumed, don't need to process further
|
||
|
||
# 3. REVOKE signal (5min auto-restore)
|
||
rm = re.search(r'\[REVOKE:(\w+)\]', body)
|
||
if rm and rm.group(1) == 'xxm':
|
||
_REVOKED_UNTIL = time.time() + 300
|
||
log(f"REVOKEd: xxm silenced for 5min")
|
||
return True
|
||
|
||
# Not a control signal
|
||
return False
|
||
|
||
|
||
def _check_shutup(body: str) -> bool:
    """Return True, and start a 5-minute silence, if *body* contains a shut-up keyword.

    Any entry of _SHUTUP_PATTERNS found as a case-insensitive substring of
    the stripped body pushes _REVOKED_UNTIL 300 s into the future, which is
    the same mechanism as a coordinator REVOKE.

    NOTE: this function does not look at who sent the message; callers that
    want only hmo to be able to silence the bot must filter by sender.
    """
    # Without this declaration the assignment below created a local variable
    # and the silence never took effect.
    global _REVOKED_UNTIL
    lowered = body.lower().strip()
    for pat in _SHUTUP_PATTERNS:
        if pat.lower() in lowered:
            _REVOKED_UNTIL = time.time() + 300
            log(f"(shutup: '{pat}' → 5min silence)")
            return True
    return False
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
# Private message handler
# ═══════════════════════════════════════════════════════════════

def on_message(msg):
    """Handle private chat messages (type='chat' or 'normal').

    These messages are dropped without routing:
      * group chat (handled by on_group_message) and error stanzas;
      * stanzas with no text body, such as chat-state or typing notifications;
      * duplicate stanza IDs;
      * messages from the bot itself;
      * messages arriving during a REVOKE/silence window.

    A shut-up keyword starts a silence. Everything else is routed to the LLM
    on a daemon thread, and the reply is handed to the XMPP event loop for
    sending.
    """
    # Group chat is handled separately; error stanzas carry no user request
    # and must never be fed to the LLM.
    if msg["type"] in ("groupchat", "error"):
        return

    body = str(msg["body"])
    # slixmpp fires "message" for body-less stanzas too (chat states,
    # receipts); there is nothing to answer.
    if not body.strip():
        return

    msg_id = msg.get("id", "")
    if _is_duplicate(msg_id):
        log(f"(duplicate msg {msg_id[:12]}... skipped)")
        return

    sender = str(msg["from"]).split("/")[0]  # bare JID: hmo@yoin.fun
    log(f"<{sender}> {body[:80]}")

    # Ignore self-messages
    if sender == JID:
        log("(skipped self-message)")
        return

    # Shut-up check — hard silence before any processing
    if time.time() < _REVOKED_UNTIL:
        log(f"(silenced) <{sender}> {body[:60]}... dropped")
        return
    if _check_shutup(body):
        return

    def _handle():
        try:
            log("router.route...")
            reply_text = _router.route("xmpp", sender, body)
            if not reply_text:
                log(f"-> {sender}: (no reply)")
                return
            reply_text = _strip_toolcall_xml(reply_text) or reply_text
            bot = _xmpp
            if not bot:
                log(f"-> {sender}: no bot ref)")
                return
            stanza = (
                f"<message to='{sender}' from='{JID}' type='chat'>"
                f"<body>{_escape(reply_text)}</body></message>"
            )
            preview = reply_text[:80]

            def _deliver():
                try:
                    bot.send_raw(stanza)
                    log(f"-> {sender}: {preview}")
                except Exception as send_err:
                    log(f"!!! send to {sender} failed: {send_err}")

            # slixmpp is asyncio-based: hand the send to its event loop.
            # call_soon_threadsafe is safe from this worker thread and wakes
            # the loop immediately, unlike bot.schedule() (loop.call_later).
            bot.loop.call_soon_threadsafe(_deliver)
        except Exception as e:
            import traceback
            log(f"!!! EXCEPTION: {e}")
            log(f"!!! {traceback.format_exc()[:200]}")

    threading.Thread(target=_handle, daemon=True).start()
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
# Late imports used by the helpers below: sub-agent exec, delayed replies
# and the response extractor (which handles the LLM putting __SILENT__
# before actual content — observed behavior: LLM uses it as a thinking tag)
# ═══════════════════════════════════════════════════════════════
|
||
|
||
import re as _re
|
||
import threading as _threading
|
||
import subprocess as _subprocess
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Sub-agent: execute shell commands (##exec:command##)
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
_EXEC_RE = _re.compile(r"##exec:(.+?)##", _re.DOTALL)
|
||
_EXEC_TIMEOUT = 60 # max seconds per command
|
||
|
||
|
||
def _run_command(cmd: str) -> str:
    """Run *cmd* through the system shell and return its combined output.

    stdout followed by stderr, stripped; when both are empty a placeholder
    carrying the exit code is returned instead. A timeout (_EXEC_TIMEOUT
    seconds) or any other failure is reported as a short status string
    rather than raised.

    SECURITY: the command runs with shell=True. If *cmd* comes from LLM
    output (##exec:...## markers), this is arbitrary command execution on
    this host — only expose it to trusted conversations.
    """
    log(f"(exec: {cmd[:120]})")
    try:
        proc = _subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            timeout=_EXEC_TIMEOUT,
            text=True,
            encoding='utf-8',
            errors='replace',
        )
        combined = f"{proc.stdout or ''}{proc.stderr or ''}".strip()
        if not combined:
            combined = f"(no output, exit code {proc.returncode})"
        log(f"(exec done: {len(combined)} bytes, exit={proc.returncode})")
        return combined
    except _subprocess.TimeoutExpired:
        log(f"(exec timeout >{_EXEC_TIMEOUT}s)")
        return "(命令超时)"
    except Exception as e:
        log(f"(exec error: {e})")
        return f"(命令执行失败: {e})"
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Delayed reply support — schedule a group message after N sec
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
_DELAY_RE = _re.compile(r"##delay:?(\d+)?##")
|
||
_DELAY_DEFAULT = 15 # seconds, when no number specified
|
||
_HAS_CMD = _re.compile(r"##(delay|exec)") # any command marker
|
||
|
||
|
||
def _extract_acknowledgment(text: str) -> str:
|
||
"""Return text before the first ##command## marker, if any."""
|
||
idx = text.find("##")
|
||
if idx > 0:
|
||
return text[:idx].strip()
|
||
return ""
|
||
|
||
|
||
def _schedule_delayed(delay_sec: int, room: str):
    """Re-invoke the LLM after *delay_sec* seconds and post its report to *room*.

    A daemon threading.Timer runs _fire().

    _fire() sends a fixed "time is up, report the latest results" prompt
    through _router.bridge.send_raw. It filters the reply with
    _extract_response (silence handling) and, if any text remains, posts it
    to the MUC room.
    """
    def _fire():
        bot = _xmpp
        if not bot:
            log("!! delay: no bot ref")
            return
        try:
            prompt = "时间到,请根据最新的信息汇报结果。"
            reply = _router.bridge.send_raw(prompt)
            report = _extract_response(reply) if reply else None
            if not report:
                log(f"-> [Delay][{room}]: (LLM empty)")
                return
            text = report.strip()
            stanza = f"<message to='{room}' from='{JID}' type='groupchat'><body>{_escape(text)}</body></message>"

            def _deliver():
                try:
                    bot.send_raw(stanza)
                    log(f"-> [Delay][{room}]: {text[:80]}")
                except Exception as send_err:
                    log(f"!! delay send err: {send_err}")

            # This runs on a Timer thread; the slixmpp stream must be written
            # from its own event loop, so marshal the send there.
            bot.loop.call_soon_threadsafe(_deliver)
        except Exception as e:
            log(f"!! delay err: {e}")

    t = _threading.Timer(delay_sec, _fire)
    t.daemon = True
    t.start()
    log(f"(delay +{delay_sec}s → {room})")
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Response extractor
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
# ── Pattern: natural language "stay silent" hints ──
|
||
# Catches cases where the LLM says it should stay silent but forgot __SILENT__ prefix.
|
||
# Only checks the first line to avoid blocking multi-line real replies.
|
||
_SILENCE_PATTERNS = [
|
||
"保持沉默",
|
||
"不应[该]?回复",
|
||
"没有.*@.*我",
|
||
"不是对[我我说]",
|
||
"跟我无关",
|
||
"我不用回复",
|
||
"不该回复",
|
||
"不参与",
|
||
"不是我[应]?该[说回]",
|
||
]
|
||
|
||
|
||
def _strip_toolcall_xml(text: str) -> str:
|
||
"""Strip tool call XML that leaks from max-tool-loop final force.
|
||
Removes <tool_calls>, <invoke>, <parameter>, <result> tags and their content.
|
||
"""
|
||
t = text
|
||
t = _re.sub(r'<invoke\s+[^>]*>.*?(</invoke>|$)', '', t, flags=_re.DOTALL)
|
||
t = _re.sub(r'<tool_calls>.*?(</tool_calls>|$)', '', t, flags=_re.DOTALL)
|
||
t = _re.sub(r'<parameter\s+[^>]*>.*?(</parameter>|$)', '', t, flags=_re.DOTALL)
|
||
t = _re.sub(r'<result>.*?(</result>|$)', '', t, flags=_re.DOTALL)
|
||
return t.strip()
|
||
|
||
|
||
def _extract_response(text: str) -> str | None:
|
||
"""Strip __SILENT__ + reasoning, or detect natural language silence intent.
|
||
Returns actual content to send, or None to stay silent."""
|
||
if not text:
|
||
return None
|
||
t = text.strip()
|
||
if not t:
|
||
return None
|
||
t = _strip_toolcall_xml(t)
|
||
|
||
# ── Natural language silence detection (fallback) ──
|
||
if not t.startswith("__SILENT__"):
|
||
first = t.split("\n", 1)[0] # only check first line
|
||
for pat in _SILENCE_PATTERNS:
|
||
if _re.search(pat, first):
|
||
return None # LLM says it should stay silent → suppress
|
||
return t # No silence signal → respond normally
|
||
|
||
# ── Has __SILENT__ prefix — strip it and reasoning ──
|
||
parts = t.split("\n", 1)
|
||
if len(parts) < 2:
|
||
return None # Just __SILENT__, no content
|
||
|
||
rest = parts[1].strip()
|
||
# Strip reasoning blocks (...)and (...) at the start
|
||
while True:
|
||
m = _re.match(r'^([^)]*)\s*', rest)
|
||
if m:
|
||
rest = rest[m.end():]
|
||
continue
|
||
m = _re.match(r'^\([^)]*\)\s*', rest)
|
||
if m:
|
||
rest = rest[m.end():]
|
||
continue
|
||
break
|
||
return rest.strip() or None
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
# Group message batching (debounce + serialized processing)
# ═══════════════════════════════════════════════════════════════
#
# Per-room states:
#   IDLE       — first message arrives → start the _BATCH_WINDOW debounce timer
#   BATCHING   — timer pending; further messages merge in and restart it
#   PROCESSING — LLM call in flight; new messages go to _batch_pending and
#                are flushed automatically when the call finishes (_batch_done)
#
_BATCH_WINDOW = 3.0                              # debounce window, seconds
_batch_lock = threading.Lock()                   # guards every _batch_* container
_batch_entries: dict[str, list[str]] = {}        # room → lines awaiting the timer
_batch_timers: dict[str, threading.Timer] = {}   # room → pending debounce/flush timer
_batch_processing: set[str] = set()              # rooms in active LLM call
_batch_pending: dict[str, list[str]] = {}        # overflow during processing
_BOT_NICK = JID.partition("@")[0]                # "xxm"
|
||
|
||
|
||
def _process_group_reply(raw_reply: str, room: str, msg_id: str = ""):
    """Act on an LLM reply for a group chat room, then release the room's batch slot.

    Precedence:
      1. empty reply          → log "(no reply)"
      2. ##delay[:N]## marker → schedule a delayed follow-up after N seconds
                                (or _DELAY_DEFAULT); nothing is sent now
      3. otherwise            → _extract_response(); send the result if
                                non-empty, else log "(silent)"
    _batch_done(room) runs at the end of every path (unless an earlier step raises).

    NOTE(review): on_group_message's immediate @mention path also calls this,
    so _batch_done may release a room whose batched LLM call is still in
    flight — confirm whether that is intended.
    """
    if raw_reply:
        delay_m = _DELAY_RE.search(raw_reply)
        if delay_m:
            seconds = int(delay_m.group(1)) if delay_m.group(1) else _DELAY_DEFAULT
            _schedule_delayed(seconds, room)
        else:
            reply_text = _extract_response(raw_reply)
            if reply_text:
                _send_group(reply_text, room, msg_id)
            else:
                log(f"-> [Group][{room}]: (silent)")
    else:
        log(f"-> [Group][{room}]: (no reply)")
    _batch_done(room)
|
||
|
||
|
||
def _batch_done(room: str):
    """Mark *room*'s LLM call as finished and flush any messages queued meanwhile.

    Queued (pending) lines become the room's next batch and are fired 0.1 s
    later through a daemon Timer recorded in _batch_timers; with nothing
    queued the room goes idle.
    """
    with _batch_lock:
        _batch_processing.discard(room)
        queued = _batch_pending.pop(room, None)
        if not queued:
            log(f"[Batch][{room}] (idle)")
            return
        _batch_entries[room] = queued
        flush = threading.Timer(0.1, _fire_batch, args=[room])
        flush.daemon = True
        _batch_timers[room] = flush
        flush.start()
|
||
|
||
|
||
BATCH_TIMEOUT = 300  # max seconds per batch LLM call (tool calls like SSH can be slow)


def _fire_batch(room: str):
    """Take *room*'s queued lines and run one LLM call for them on a daemon thread.

    The lines are joined with newlines and routed as a single prompt. A
    watchdog Timer releases the room via _batch_done if the call exceeds
    BATCH_TIMEOUT seconds; a reply arriving after that is discarded. An
    atomic claim ensures the watchdog and the worker never both release the
    room.
    """
    with _batch_lock:
        entries = _batch_entries.pop(room, None)
        _batch_timers.pop(room, None)
        if not entries:
            return
        _batch_processing.add(room)

    combined = "\n".join(entries)

    def _handle():
        # Whoever claims first (watchdog or worker) owns releasing the room.
        # The former unsynchronized flag let a watchdog firing just as the
        # route call returned call _batch_done a second time.
        claim_lock = threading.Lock()
        claimed = [False]

        def _claim() -> bool:
            with claim_lock:
                if claimed[0]:
                    return False
                claimed[0] = True
                return True

        def _timeout():
            if _claim():
                log(f"[Batch][{room}] TIMEOUT ({BATCH_TIMEOUT}s), force-unblocking")
                _batch_done(room)

        watchdog = threading.Timer(BATCH_TIMEOUT, _timeout)
        watchdog.daemon = True
        watchdog.start()

        owned = False
        try:
            raw = _router.route("xmpp", room, combined)
            owned = _claim()
            if not owned:
                log(f"[Batch][{room}] route returned after timeout, discarded")
                return
            watchdog.cancel()
            _process_group_reply(raw, room)
        except Exception as e:
            import traceback
            log(f"!!! BATCH: {e}")
            log(f"!!! {traceback.format_exc()[:200]}")
            # Release the room unless the watchdog already did.
            if owned or _claim():
                watchdog.cancel()
                _batch_done(room)

    threading.Thread(target=_handle, daemon=True).start()
|
||
|
||
|
||
def _batch_group_message(room: str, nickname: str, body: str) -> bool:
    """Queue a group message into *room*'s batch.

    Returns False without queueing when the message addresses the bot
    directly ("@<nick>" anywhere, or the body starts with the nick); the
    caller then processes it immediately. Otherwise returns True after
    either:
      * appending it to the pending overflow, if the room is processing; or
      * merging it into the current batch and (re)starting the
        _BATCH_WINDOW debounce timer.
    """
    mentioned = f"@{_BOT_NICK}" in body or body.startswith(_BOT_NICK)
    if mentioned:
        return False

    line = f"[{nickname}]: {body}"

    with _batch_lock:
        if room in _batch_processing:
            # LLM call in flight → park the line until _batch_done flushes it
            _batch_pending.setdefault(room, []).append(line)
            return True

        previous = _batch_timers.pop(room, None)
        if previous:
            previous.cancel()
        _batch_entries.setdefault(room, []).append(line)

        debounce = threading.Timer(_BATCH_WINDOW, _fire_batch, args=[room])
        debounce.daemon = True
        debounce.start()
        _batch_timers[room] = debounce

    return True
|
||
|
||
|
||
# ── Group chat handler
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
# Message buffer for HTTP bridge GET /messages
|
||
_MSG_BUF: list[dict] = []
|
||
_MSG_BUF_LOCK = threading.Lock()
|
||
|
||
def _record_group_msg(nickname: str, body: str):
|
||
ts = time.strftime("%H:%M:%S")
|
||
with _MSG_BUF_LOCK:
|
||
_MSG_BUF.append({"ts": ts, "from": nickname, "body": body})
|
||
if len(_MSG_BUF) > 200:
|
||
_MSG_BUF[:] = _MSG_BUF[-150:]
|
||
|
||
|
||
def on_group_message(msg):
    """Handle group chat messages (type='groupchat') from MUC rooms.

    Observer pattern with batching: nearby messages from the same room are
    merged into one LLM call, and direct @mentions bypass the batch and are
    processed immediately.

    Processing order:
      1. During startup MAM recovery every message is dropped; the bot's own
         messages are still appended to the bridge log as "assistant" turns.
      2. Duplicate stanza IDs and empty bodies are dropped.
      3. The bot's own echoes: known error texts are dropped; anything else
         continues, tagged "(自言自语)".
      4. The message is recorded for the HTTP bridge (GET /messages).
      5. Coordinator control signals (GRANT/REVOKE/coordinator=) are consumed.
      6. While REVOKEd, the LLM still sees the message but is told to output
         __SILENT__ (read-only mode), unless a pending GRANT for xxm is
         consumed.
      7. Shut-up keywords from hmo start a 5-minute silence.
      8. The message is batched, or routed immediately on an @mention.
    """
    # _GRANTED is reset below when a one-time GRANT is consumed; without this
    # declaration the name was local and reading it raised UnboundLocalError
    # whenever the bot was revoked.
    global _GRANTED

    full_from = str(msg["from"])
    bot_nick = JID.split("@")[0]
    nickname = full_from.split("/")[1] if "/" in full_from else ""

    # Skip MAM-recovered messages during startup (already saved to context)
    if _is_mam_recovery():
        if nickname == bot_nick:
            # Still save self-msg to context for continuity
            body = str(msg["body"]).strip()
            log(f"(MAM self-msg saved to ctx) {body[:80]}")
            try:
                _router.bridge._append_to_log("assistant", body)
            except Exception as e:
                # Best-effort: losing one context line is acceptable, but leave a trace.
                log(f"(MAM self-msg ctx save failed: {e})")
        return

    msg_id = msg.get("id", "")
    if _is_duplicate(msg_id):
        log(f"(group dup {msg_id[:12]}... skipped)")
        return

    body = str(msg["body"]).strip()
    if not body:
        return

    room = full_from.split("/")[0]

    # Self-message echo — skip error loops, let rest through
    if nickname == bot_nick:
        if "模型无响应" in body or "命令循环次数超限" in body:
            log(f"(self-msg error, skipped) {body[:60]}")
            return
        log(f"(self-msg to LLM) {body[:80]}")
        body = f"(自言自语) {body}"

    # Record to message buffer for HTTP bridge monitoring
    _record_group_msg(nickname, body)

    # ── Coordinator signals (GRANT/REVOKE/coordinator switch) ──
    if _process_coordinator_signals(nickname, body):
        return

    # ── Revoke check: read-only mode (see but don't reply) ──
    is_revoked = time.time() < _REVOKED_UNTIL
    if is_revoked and _GRANTED == 'xxm':
        # GRANT overrides REVOKE once
        _GRANTED = None
        is_revoked = False
        log("GRANTed: xxm can speak despite REVOKE")

    # ── Shut-up check ──
    # Only hmo may silence the bot. Matching every speaker let other agents,
    # or the bot's own echoed message, trigger a 5-minute silence.
    if nickname == 'hmo' and _check_shutup(body):
        return

    # ── Read-only mode for revoked agents ──
    if is_revoked:
        body = f"【只读消息】你目前被收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {room}] {nickname} 说: {body}"

    # Batch nearby messages (unless @mention → process immediately)
    if _batch_group_message(room, nickname, body):
        log(f"[Group][{room}] {nickname}: {body[:80]} (batched)")
        return

    # Direct @mention → immediate processing.
    # NOTE(review): _process_group_reply calls _batch_done(room), which can
    # release a room whose batched LLM call is still running — confirm intended.
    log(f"[Group][{room}] {nickname}: {body[:80]}")

    def _handle():
        try:
            raw_reply = _router.route("xmpp", full_from, body)
            _process_group_reply(raw_reply, room, msg_id)
        except Exception as e:
            import traceback
            log(f"!!! GROUP EXCEPTION: {e}")
            log(f"!!! {traceback.format_exc()[:200]}")

    threading.Thread(target=_handle, daemon=True).start()
|
||
|
||
|
||
def _send_group(text: str, room: str, msg_id: str = ""):
    """Send *text* (stripped and XML-escaped) as a groupchat message to *room*.

    Safe to call from worker threads: the send is marshalled onto the slixmpp
    event loop with call_soon_threadsafe. *msg_id* is kept for backward
    compatibility (it used to name the scheduled event) and is now unused.
    """
    bot = _xmpp
    if not bot:
        log(f"-> [Group][{room}]: no bot ref)")
        return
    stanza = (
        f"<message to='{room}' from='{JID}' type='groupchat'>"
        f"<body>{_escape(text.strip())}</body></message>"
    )
    preview = text[:80]

    def _deliver():
        try:
            bot.send_raw(stanza)
            log(f"-> [Group][{room}]: {preview}")
        except Exception as send_err:
            log(f"!!! send to [Group][{room}] failed: {send_err}")

    # bot.schedule() goes through loop.call_later, which is not thread-safe
    # and does not wake a loop blocked in select(). It may also raise
    # ValueError when two sends build the same event name.
    bot.loop.call_soon_threadsafe(_deliver)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Helpers
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
def _escape(text: str) -> str:
|
||
"""Escape XML special characters for XMPP body content."""
|
||
return (text
|
||
.replace("&", "&")
|
||
.replace("<", "<")
|
||
.replace(">", ">")
|
||
.replace('"', """))
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
# Main
# ═══════════════════════════════════════════════════════════════

if __name__ == "__main__":
    # Force the selector event loop on Windows (proactor + SSL has issues with
    # slixmpp). WindowsSelectorEventLoopPolicy only exists on Windows, so the
    # previously unguarded call raised AttributeError everywhere else.
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    import slixmpp

    class Bot(slixmpp.ClientXMPP):
        """slixmpp client for JID.

        Features: STARTTLS login, auto-reconnect, whitespace keepalive,
        private-chat and MUC message handlers.
        """

        def __init__(self):
            super().__init__(JID, PASSWORD)
            # Force STARTTLS (port 3021 uses STARTTLS not direct SSL)
            self.enable_direct_tls = False
            self.enable_starttls = True
            self.add_event_handler("session_start", self.on_start)
            self.add_event_handler("message", on_message)
            self.add_event_handler("groupchat_message", on_group_message)
            self.auto_reconnect = True
            self.reconnect_max_delay = 10

            # Use slixmpp built-in keepalive (sends XML whitespace, reliable)
            self.whitespace_keepalive = True
            self.whitespace_keepalive_interval = 30

            self.add_event_handler("session_end", self.on_session_end)
            self.add_event_handler("connection_failed", self.on_conn_failed)
            self.add_event_handler("disconnected", self.on_disconnected)
            self.add_event_handler("connected", self.on_connected)

            # MUC plugin for group chat
            try:
                self.register_plugin('xep_0045')
            except Exception as e:
                log(f"MUC plugin xep_0045 not available: {e}")
            # The MAM plugin (xep_0313) is registered in on_start rather than
            # here, to avoid event loop issues.

        def on_connected(self, event):
            log("connection established")

        def on_start(self, event):
            """Session started: announce presence, load MAM, join MUC rooms, rebuild context."""
            self.send_presence()
            self.get_roster()
            log(f"{JID} online")

            # Register MAM plugin lazily (can't do it in __init__ before event loop)
            try:
                self.register_plugin('xep_0313')
            except Exception:
                log("(MAM: xep_0313 register failed, continuing without)")

            # Join MUC rooms silently (observer pattern: new room → stay silent)
            bot_nick = JID.split("@")[0]

            async def _join_silent():
                for room_jid in MUC_ROOMS:
                    for attempt in range(3):
                        try:
                            # join_muc_wait returns only once the join completes
                            await self.plugin['xep_0045'].join_muc_wait(room_jid, bot_nick, timeout=60)
                            log(f"Joined {room_jid} (silent)")
                            break
                        except asyncio.TimeoutError:
                            log(f"MUC join timeout ({attempt+1}/3) for {room_jid}")
                            if attempt == 2:
                                log(f"MUC setup failed for {room_jid} after 3 attempts")
                                await asyncio.sleep(5)
                            else:
                                await asyncio.sleep(3)
                        except Exception as e:
                            log(f"MUC setup failed for {room_jid}: {e} (type={type(e).__name__})")
                            await asyncio.sleep(5)
                            break
                # After joining, query MAM for recent history
                await asyncio.sleep(3)  # wait for MUC join to propagate
                await _fetch_mam_history()

            asyncio.ensure_future(_join_silent())

        def on_session_end(self, event):
            log(f"{JID} session ended")

        def on_conn_failed(self, event):
            log(f"connection failed: {event}")

        def on_disconnected(self, event):
            log(f"disconnected, reconnecting... (auto_reconnect={self.auto_reconnect})")

    async def _fetch_mam_history():
        """Query the MAM archive for up to 50 recent messages per MUC room to rebuild context.

        Each non-empty message is appended to the bridge context log as a
        JSONL entry. Messages from nick 'xxm' get role "assistant"; all
        others get role "user".

        The recovery flag is always cleared at the end, even when the plugin
        is missing or a room fails, so group messages are not held back until
        the _MAM_TIMEOUT fallback.
        """
        bot = _xmpp
        if not bot or 'xep_0313' not in bot.plugin:
            log("(MAM: no bot or plugin)")
            _set_mam_done()
            return
        for room_jid in MUC_ROOMS:
            # Per-room handling: one failing room must not skip the others.
            try:
                log(f"(MAM: querying {room_jid} for last 50 messages...)")
                results = await bot.plugin['xep_0313'].retrieve(
                    jid=room_jid,
                    rsm={'max': 50},
                )
                # Results is an IQ stanza with mam results
                count = 0
                for msg in results['mam']['results']:
                    stanza = msg['mam_result']['forwarded']['stanza']
                    body = str(stanza['body'] or '').strip()
                    if not body:
                        continue
                    origin = str(stanza['from'])
                    nick = origin.split('/')[-1] if '/' in origin else '?'
                    role = 'user' if nick != 'xxm' else 'assistant'
                    entry = json.dumps({
                        "ts": int(time.time()),
                        "role": role,
                        "content": f"[{nick}]: {body[:300]}",
                    }, ensure_ascii=False)
                    _append_context(entry)
                    count += 1
                log(f"(MAM: loaded {count} msgs from {room_jid})")
            except Exception as e:
                log(f"(MAM error for {room_jid}: {e})")
        _set_mam_done()
        log("(MAM recovery complete, group messages now active)")

    def _append_context(entry: str):
        """Append one JSONL *entry* to the bridge context log, keeping it bounded.

        The file is temp/.bridge_context.jsonl under the parent of this
        script's directory. Once it exceeds 200 lines it is rewritten with the
        newest 150. Best-effort: failures are logged, never raised.
        """
        ctx_log = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
                               "temp", ".bridge_context.jsonl")
        try:
            # Opening in append mode fails if temp/ does not exist yet.
            os.makedirs(os.path.dirname(ctx_log), exist_ok=True)
            with open(ctx_log, "a", encoding="utf-8") as f:
                f.write(entry + "\n")
            with open(ctx_log, "r", encoding="utf-8") as f:
                lines = f.readlines()
            if len(lines) > 200:
                with open(ctx_log, "w", encoding="utf-8") as f:
                    f.writelines(lines[-150:])
        except Exception as e:
            log(f"(context append failed: {e})")

    # ═══════════════════ START BOT ═══════════════════
    xmpp = Bot()
    _xmpp = xmpp
    xmpp.connect(host=SERVER, port=PORT)
    log(f"Connecting {JID}@{SERVER}:{PORT}")

    # ── Local HTTP bridge: send/read XMPP messages from external tools ──
    # SECURITY NOTE(review): the server binds 0.0.0.0 without authentication,
    # so anyone who can reach the port can post messages as the bot. Bind to
    # 127.0.0.1 if remote access (e.g. the Dashboard) is not required.
    import http.server as _http_server, json as _json, urllib.parse as _urlparse

    _HTTP_PORT = 5802

    async def _send_raw_on_loop(data: str):
        """Coroutine wrapper so HTTP threads can send via run_coroutine_threadsafe."""
        xmpp.send_raw(data)

    class _BridgeHandler(_http_server.BaseHTTPRequestHandler):
        """Endpoints: GET /muc, /health, /presence/<jid>, /messages; POST (any path) sends."""

        def do_GET(self):
            parsed = _urlparse.urlparse(self.path)
            path = parsed.path
            if path == "/muc":
                self._get_muc()
            elif path == "/health":
                self._get_health()
            elif path == "/presence" or path.startswith("/presence/"):
                self._get_presence(path)
            elif path == "/messages":
                self._get_messages(parsed.query)
            else:
                self._reply(404, {"ok": False, "error": "not found"})

        def _get_muc(self):
            """Report the participants of each configured MUC room (cross-platform presence indicator)."""
            try:
                muc_info = {"rooms": {}}
                if _xmpp is not None and 'xep_0045' in _xmpp.plugin:
                    muc_plugin = _xmpp.plugin['xep_0045']
                    for room_jid in MUC_ROOMS:
                        room_data = {"jid": room_jid, "participants": []}
                        try:
                            if room_jid in muc_plugin.rooms:
                                room = muc_plugin.rooms[room_jid]
                                # NOTE(review): slixmpp's xep_0045 stores
                                # rooms[room][nick] = info directly; the old
                                # room['roster'] lookup always came back empty.
                                # Use a nested 'roster' only if one exists —
                                # confirm against the installed slixmpp version.
                                members = room['roster'] if 'roster' in room else room
                                for nick, info in members.items():
                                    room_data["participants"].append({
                                        "nick": nick,
                                        "jid": str(info.get('jid', '')),
                                        "affiliation": str(info.get('affiliation', '')),
                                        "role": str(info.get('role', '')),
                                    })
                        except Exception as room_err:
                            room_data["error"] = str(room_err)
                        muc_info["rooms"][room_jid] = room_data
                self._reply(200, muc_info)
            except Exception as e:
                self._reply(500, {"ok": False, "error": str(e)})

        def _get_health(self):
            """XMPP connection health — used by Dashboard for cross-platform monitoring."""
            try:
                # session_started_event is more reliable than is_connected() alone
                session_ok = _xmpp.session_started_event.is_set() if hasattr(_xmpp, 'session_started_event') else False
                socket_ok = _xmpp.is_connected() if hasattr(_xmpp, 'is_connected') else False
                connected = session_ok or socket_ok
                self._reply(200, {
                    "ok": True,
                    "xmpp_connected": connected,
                    "ejabberd_alive": connected,
                    "bot_jid": JID,
                    "uptime_sec": int(time.time() - _START_TIME),
                    "muc_rooms": MUC_ROOMS,
                })
            except Exception as e:
                self._reply(500, {"ok": False, "error": str(e)})

        def _get_presence(self, path):
            """Report whether a roster JID has online resources. Usage: GET /presence/mohe@yoin.fun."""
            # Percent-encoded JIDs (e.g. %40 for @) are accepted as well.
            jid_to_check = _urlparse.unquote(path[len("/presence/"):]).strip()
            if not jid_to_check:
                self._reply(400, {"ok": False, "error": "missing JID"})
                return
            try:
                presence_info = {"jid": jid_to_check, "online": False, "resources": []}
                if _xmpp is not None and hasattr(_xmpp, 'client_roster'):
                    roster = _xmpp.client_roster
                    if jid_to_check in roster:
                        entry = roster[jid_to_check]
                        resources = list(entry.resources.keys()) if entry.resources else []
                        presence_info["online"] = len(resources) > 0
                        presence_info["resources"] = resources
                        # Get presence show/status for each resource
                        for res in resources:
                            pres = entry.resources[res]
                            presence_info.setdefault("details", {})[res] = {
                                "show": str(pres.get("show", "available")),
                                "status": str(pres.get("status", "")),
                                "priority": int(pres.get("priority", 0)),
                            }
                self._reply(200, presence_info)
            except Exception as e:
                self._reply(500, {"ok": False, "error": str(e)})

        def _get_messages(self, query):
            """Return up to 50 buffered group messages, optionally filtered by ?from= and ?since=HH:MM:SS."""
            try:
                qs = _urlparse.parse_qs(query)
                sender = qs.get("from", [None])[0]
                since = qs.get("since", [None])[0]
                with _MSG_BUF_LOCK:
                    msgs = list(_MSG_BUF)
                if sender:
                    msgs = [m for m in msgs if m["from"] == sender]
                if since:
                    # Timestamps are same-day HH:MM:SS strings, so string comparison orders them.
                    msgs = [m for m in msgs if m["ts"] >= since]
                self._reply(200, {"ok": True, "count": len(msgs), "messages": msgs[-50:]})
            except Exception as e:
                self._reply(500, {"ok": False, "error": str(e)})

        def do_POST(self):
            """Send {"to": room (default first MUC), "message": text} as a groupchat message."""
            try:
                length = int(self.headers.get('Content-Length', 0))
                body = _json.loads(self.rfile.read(length))
                to = str(body.get('to', MUC_ROOMS[0]))
                msg = body.get('message', '')
                if not msg:
                    self._reply(400, {"ok": False, "error": "empty message"})
                    return
                safe = _escape(msg.strip())
                # 'to' comes from the HTTP client: escape it so a quote cannot
                # break out of the attribute and inject XML.
                stanza = f'<message to="{_escape(to)}" type="groupchat"><body>{safe}</body></message>'
                try:
                    # This handler runs on the HTTP thread; perform the send on
                    # the slixmpp loop and wait briefly so failures still
                    # surface here.
                    asyncio.run_coroutine_threadsafe(
                        _send_raw_on_loop(stanza), xmpp.loop
                    ).result(timeout=10)
                    _record_group_msg(JID.split("@")[0], msg)
                    log(f"[http] → [{to.split('@')[0]}]: {msg[:80]}")
                    self._reply(200, {"ok": True})
                except Exception as xmpp_err:
                    _record_group_msg(JID.split("@")[0], msg)  # still record to buffer
                    log(f"[http] → [{to.split('@')[0]}]: {msg[:80]} (send failed: {xmpp_err})")
                    self._reply(200, {"ok": True, "warn": f"buffered but XMPP send: {xmpp_err}"})
            except Exception as e:
                self._reply(500, {"ok": False, "error": str(e)})

        def _reply(self, code, data):
            """Write *data* as a UTF-8 JSON response with status *code*."""
            body = _json.dumps(data, ensure_ascii=False).encode('utf-8')
            self.send_response(code)
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format, *args):
            pass  # suppress http server noise

    _httpd = _http_server.HTTPServer(('0.0.0.0', _HTTP_PORT), _BridgeHandler)
    _httpd_thread = threading.Thread(target=_httpd.serve_forever, daemon=True)
    _httpd_thread.start()
    log(f"HTTP bridge ready on :{_HTTP_PORT}")

    # ── Status check (runs on event loop) ──
    async def _status_check():
        while True:
            await asyncio.sleep(60)
            log("(alive)")

    # Drive the same loop slixmpp is bound to, rather than whatever
    # asyncio.get_event_loop() happens to return.
    loop = xmpp.loop
    loop.create_task(_status_check())

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        log("Shutdown by user")
    except Exception as e:
        import traceback
        log(f"!!! MAIN LOOP CRASH: {e}")
        log(f"!!! {traceback.format_exc()[:500]}")
        raise
|