882 lines
34 KiB
Python
882 lines
34 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
XMPP Agent Core — 统一版
|
||
=========================
|
||
Single bot core for all agents. Supports --agent xxm|mohe|zhiwei|xiaoguo.
|
||
|
||
Usage:
|
||
python xmpp_agent_core.py --agent xxm # xxm, uses chat_bridge
|
||
python xmpp_agent_core.py --agent mohe # mohe, uses Hermes API
|
||
python xmpp_agent_core.py --agent zhiwei # zhiwei, uses Hermes API
|
||
python xmpp_agent_core.py --agent xiaoguo # xiaoguo, uses Hermes API
|
||
|
||
Shares: PID lock, reconnect, MUC join, dedup, batching,
|
||
coordinator protocol (GRANT/REVOKE), HTTP bridge.
|
||
Differs only in LLM calling method (chat_bridge vs Hermes API).
|
||
"""
|
||
import os, sys, time, threading, asyncio, logging, json, re, ssl
|
||
import urllib.request, http.server, urllib.parse
|
||
|
||
# ── Windows selector loop (slixmpp needs it on Windows) ──
|
||
if sys.platform == "win32":
|
||
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
|
||
|
||
# ── PATH: allow imports from gateway/scripts/ (proc_guard, chat_bridge) ──
|
||
_GATEWAY_SCRIPTS = os.path.join(os.path.dirname(os.path.abspath(__file__)),
|
||
"gateway", "scripts")
|
||
sys.path.insert(0, _GATEWAY_SCRIPTS)
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# AGENTS Configuration
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
AGENTS = {
|
||
"mohe": {
|
||
"jid": "mohe@yoin.fun",
|
||
"password": "hermes123",
|
||
"nick": "mohe",
|
||
"name_cn": "莫荷",
|
||
"http_port": 5804,
|
||
"gateway": "http://localhost:8642/v1/chat/completions",
|
||
"session_id": "xmpp-mohe-v2",
|
||
"server": "127.0.0.1",
|
||
"port": 5222,
|
||
"muc_rooms": ["coregroup@conference.yoin.fun"],
|
||
"mention": "@mohe/@莫荷",
|
||
},
|
||
"zhiwei": {
|
||
"jid": "zhiwei@yoin.fun",
|
||
"password": "hermes123",
|
||
"nick": "zhiwei",
|
||
"name_cn": "知微",
|
||
"http_port": 5805,
|
||
"gateway": "http://localhost:8643/v1/chat/completions",
|
||
"session_id": "xmpp-zhiwei",
|
||
"server": "127.0.0.1",
|
||
"port": 5222,
|
||
"muc_rooms": ["coregroup@conference.yoin.fun"],
|
||
"mention": "@zhiwei/@知微",
|
||
},
|
||
"xiaoguo": {
|
||
"jid": "xiaoguo@yoin.fun",
|
||
"password": "hermes123",
|
||
"nick": "xiaoguo",
|
||
"name_cn": "小果",
|
||
"http_port": 5806,
|
||
"gateway": "http://localhost:8645/v1/chat/completions",
|
||
"session_id": "xmpp-xiaoguo",
|
||
"server": "127.0.0.1",
|
||
"port": 5222,
|
||
"muc_rooms": ["coregroup@conference.yoin.fun"],
|
||
"mention": "@xiaoguo/@小果",
|
||
},
|
||
"xxm": {
|
||
"jid": "xxm@yoin.fun",
|
||
"password": "hermes123",
|
||
"nick": "xxm",
|
||
"name_cn": "笑笑",
|
||
"http_port": 5802,
|
||
"bridge": "chat_bridge", # use local chat_bridge instead of Hermes API
|
||
"session_id": "ses_xxm_xmpp",
|
||
"server": "192.168.1.246", # LAN direct connect
|
||
"port": 5222,
|
||
"muc_rooms": [
|
||
"coregroup@conference.yoin.fun",
|
||
"jujidina@conference.yoin.fun",
|
||
],
|
||
"mention": "@xxm/@笑笑",
|
||
},
|
||
}
|
||
|
||
# ── Agent selection ──
|
||
_agent_name = "mohe"
|
||
if "--agent" in sys.argv:
|
||
idx = sys.argv.index("--agent")
|
||
if idx + 1 < len(sys.argv):
|
||
_agent_name = sys.argv[idx + 1]
|
||
cfg = AGENTS.get(_agent_name, AGENTS["mohe"])
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# PID Lock — prevent duplicate instances
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
from proc_guard import guard as _proc_guard
|
||
_lock = _proc_guard(f"xmpp_bot_{_agent_name}")
|
||
if not _lock.ok:
|
||
print(_lock.message, flush=True)
|
||
sys.exit(1)
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Logging
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
_LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "gateway", "logs")
|
||
os.makedirs(_LOG_DIR, exist_ok=True)
|
||
_LOG_FILE = os.path.join(_LOG_DIR, f"xmpp_{_agent_name}.log")
|
||
_START_TIME = time.time()
|
||
|
||
|
||
def log(m: str):
|
||
with open(_LOG_FILE, "a", encoding="utf-8") as f:
|
||
f.write(f"{time.strftime('%H:%M:%S')} {m}\n")
|
||
|
||
|
||
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# LLM Bridge Init — abstracted per agent
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
_IS_CHAT_BRIDGE = cfg.get("bridge") == "chat_bridge"
|
||
_router = None # set only for chat_bridge (xxm)
|
||
|
||
if _IS_CHAT_BRIDGE:
|
||
from chat_bridge import SessionBridge
|
||
from session_router import SessionRouter
|
||
_bridge = SessionBridge(session_id=cfg["session_id"])
|
||
_router = SessionRouter(bridge=_bridge, default_session=cfg["session_id"])
|
||
log(f"LLM: chat_bridge (session={cfg['session_id']})")
|
||
else:
|
||
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
|
||
log(f"LLM: Hermes API ({cfg['gateway']})")
|
||
|
||
|
||
def _call_llm(content: str, sender: str, is_group: bool = False) -> str:
|
||
"""Abstract LLM call. Returns raw response text (or empty string)."""
|
||
if _IS_CHAT_BRIDGE:
|
||
return _router.route("xmpp", sender, content) or ""
|
||
else:
|
||
return _call_hermes_api(content)
|
||
|
||
|
||
def _call_hermes_api(content: str) -> str:
|
||
"""POST to Hermes API, return response text or empty string."""
|
||
try:
|
||
payload = json.dumps({
|
||
"model": "hermes-agent",
|
||
"messages": [{"role": "user", "content": content}]
|
||
}).encode()
|
||
req = urllib.request.Request(cfg["gateway"], data=payload, method="POST")
|
||
req.add_header("Content-Type", "application/json")
|
||
req.add_header("Authorization", "Bearer hermes123")
|
||
req.add_header("X-Hermes-Session-Id", cfg["session_id"])
|
||
result = _opener.open(req, timeout=600)
|
||
data = json.loads(result.read())
|
||
reply = data.get("choices", [{}])[0].get("message", {}).get("content", "")
|
||
return reply.strip()
|
||
except Exception as e:
|
||
log(f"!!! Hermes API error: {e}")
|
||
return ""
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Message Dedup
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
_DEDUP_CACHE: set[str] = set()
|
||
_DEDUP_LOCK = threading.Lock()
|
||
|
||
|
||
def _is_duplicate(msg_id: str) -> bool:
|
||
if not msg_id:
|
||
return False
|
||
with _DEDUP_LOCK:
|
||
if msg_id in _DEDUP_CACHE:
|
||
return True
|
||
_DEDUP_CACHE.add(msg_id)
|
||
if len(_DEDUP_CACHE) > 100:
|
||
_DEDUP_CACHE.clear()
|
||
return False
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Coordinator Protocol (shared across all agents)
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
_COORDINATOR: str = "mohe"
|
||
_GRANTED: str | None = None
|
||
_REVOKED_UNTIL: float = 0.0
|
||
_SHUTUP_PATTERNS = ["闭嘴", "别说话", "安静", "shut", "stfu", "别说了", "停"]
|
||
|
||
|
||
def _process_coordinator_signals(nickname: str, body: str) -> bool:
|
||
"""Parse coordinator/GRANT/REVOKE from incoming messages.
|
||
Returns True if message was a control signal (consumed, no further processing)."""
|
||
global _COORDINATOR, _GRANTED, _REVOKED_UNTIL
|
||
# 1. hmo switches coordinator
|
||
if nickname == 'hmo' and 'coordinator=' in body.lower():
|
||
for name in ('mohe', 'zhiwei', 'xxm'):
|
||
if f'coordinator={name}' in body.lower():
|
||
_COORDINATOR = name
|
||
_GRANTED = None
|
||
log(f"Coordinator switched to {name} by hmo")
|
||
return True
|
||
# 2. GRANT signal (overrides REVOKE)
|
||
gm = re.search(r'\[GRANT:(\w+)\]', body)
|
||
if gm:
|
||
_GRANTED = gm.group(1)
|
||
_REVOKED_UNTIL = 0
|
||
log(f"GRANT: {_GRANTED}")
|
||
return True
|
||
# 3. REVOKE signal (5min auto-restore)
|
||
rm = re.search(r'\[REVOKE:(\w+)\]', body)
|
||
if rm and rm.group(1) == cfg["nick"]:
|
||
_REVOKED_UNTIL = time.time() + 300
|
||
log(f"REVOKEd: {cfg['nick']} silenced for 5min")
|
||
return True
|
||
return False
|
||
|
||
|
||
def _check_shutup(body: str) -> bool:
|
||
"""hmo says shut up → 5min silence."""
|
||
lower = body.lower().strip()
|
||
for pat in _SHUTUP_PATTERNS:
|
||
if pat.lower() in lower:
|
||
_REVOKED_UNTIL = time.time() + 300
|
||
log(f"(shutup: '{pat}' → 5min silence)")
|
||
return True
|
||
return False
|
||
|
||
|
||
def _process_llm_grant(response: str):
|
||
"""Parse GRANT signal from LLM's own response."""
|
||
global _GRANTED
|
||
gm = re.search(r'\[GRANT:(\w+)\]', response)
|
||
if gm:
|
||
_GRANTED = gm.group(1)
|
||
_REVOKED_UNTIL = 0
|
||
log(f"LLM GRANT: {_GRANTED}")
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Response Extraction
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
_SILENCE_PATTERNS = [
|
||
"保持沉默", "不应[该]?回复", "没有.*@.*我", "不是对[我我说]",
|
||
"跟我无关", "我不用回复", "不该回复", "不参与",
|
||
"不是我[应]?该[说回]",
|
||
]
|
||
_EXEC_RE = re.compile(r"##exec:(.+?)##", re.DOTALL)
|
||
_DELAY_RE = re.compile(r"##delay:?(\d+)?##")
|
||
_DELAY_DEFAULT = 15
|
||
_EXEC_TIMEOUT = 60
|
||
|
||
|
||
def _strip_toolcall_xml(text: str) -> str:
|
||
t = text
|
||
t = re.sub(r'<invoke\s+[^>]*>.*?(</invoke>|$)', '', t, flags=re.DOTALL)
|
||
t = re.sub(r'<tool_calls>.*?(</tool_calls>|$)', '', t, flags=re.DOTALL)
|
||
t = re.sub(r'<parameter\s+[^>]*>.*?(</parameter>|$)', '', t, flags=re.DOTALL)
|
||
t = re.sub(r'<result>.*?(</result>|$)', '', t, flags=re.DOTALL)
|
||
return t.strip()
|
||
|
||
|
||
def _extract_response(text: str) -> str | None:
|
||
"""Strip __SILENT__, reasoning blocks, natural language silence.
|
||
Returns actual content to send, or None to stay silent."""
|
||
if not text:
|
||
return None
|
||
t = text.strip()
|
||
if not t:
|
||
return None
|
||
t = _strip_toolcall_xml(t)
|
||
# Natural language silence detection
|
||
if not t.startswith("__SILENT__"):
|
||
first = t.split("\n", 1)[0]
|
||
for pat in _SILENCE_PATTERNS:
|
||
if re.search(pat, first):
|
||
return None
|
||
return t
|
||
# Has __SILENT__ prefix
|
||
parts = t.split("\n", 1)
|
||
if len(parts) < 2:
|
||
return None
|
||
rest = parts[1].strip()
|
||
while True:
|
||
m = re.match(r'^([^)]*)\s*', rest)
|
||
if m:
|
||
rest = rest[m.end():]
|
||
continue
|
||
m = re.match(r'^\([^)]*\)\s*', rest)
|
||
if m:
|
||
rest = rest[m.end():]
|
||
continue
|
||
break
|
||
return rest.strip() or None
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Message Batching (3s debounce + serialized processing)
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
_BATCH_WINDOW = 3.0
|
||
_BATCH_TIMEOUT = 300
|
||
_batch_entries: dict[str, list[str]] = {}
|
||
_batch_timers: dict[str, threading.Timer] = {}
|
||
_batch_processing: set[str] = set()
|
||
_batch_pending: dict[str, list[str]] = {}
|
||
_batch_lock = threading.Lock()
|
||
_BOT_NICK = cfg["nick"]
|
||
|
||
|
||
def _run_command(cmd: str) -> str:
|
||
"""Execute ##exec:command## shell command."""
|
||
log(f"(exec: {cmd[:120]})")
|
||
try:
|
||
import subprocess
|
||
r = subprocess.run(cmd, shell=True, capture_output=True,
|
||
timeout=_EXEC_TIMEOUT, text=True, encoding='utf-8', errors='replace')
|
||
out = (r.stdout or "") + (r.stderr or "")
|
||
out = out.strip() or f"(no output, exit={r.returncode})"
|
||
log(f"(exec done: {len(out)} bytes, exit={r.returncode})")
|
||
return out
|
||
except subprocess.TimeoutExpired:
|
||
log(f"(exec timeout >{_EXEC_TIMEOUT}s)")
|
||
return "(命令超时)"
|
||
except Exception as e:
|
||
log(f"(exec error: {e})")
|
||
return f"(命令执行失败: {e})"
|
||
|
||
|
||
def _schedule_delayed(delay_sec: int, room: str):
|
||
"""Schedule ##delay:N## re-invocation."""
|
||
global _xmpp_ref
|
||
import subprocess as _sp
|
||
|
||
def _fire():
|
||
bot = _xmpp_ref
|
||
if not bot:
|
||
return
|
||
try:
|
||
prompt = "时间到,请根据最新的信息汇报结果。"
|
||
raw = _call_llm(prompt, room, is_group=True)
|
||
reply = _extract_response(raw)
|
||
if reply:
|
||
text = reply.strip()
|
||
bot.send_message(mto=room, mbody=text, mtype='groupchat')
|
||
log(f"-> [Delay][{room}]: {text[:80]}")
|
||
except Exception as e:
|
||
log(f"!! delay err: {e}")
|
||
|
||
t = threading.Timer(delay_sec, _fire)
|
||
t.daemon = True
|
||
t.start()
|
||
log(f"(delay +{delay_sec}s → {room})")
|
||
|
||
|
||
def _batch_done(room: str):
|
||
"""Called when batch LLM finishes. Flush pending if any."""
|
||
with _batch_lock:
|
||
_batch_processing.discard(room)
|
||
pending = _batch_pending.pop(room, None)
|
||
if pending:
|
||
_batch_entries[room] = pending
|
||
t = threading.Timer(0.1, _fire_batch, args=[room])
|
||
t.daemon = True
|
||
t.start()
|
||
_batch_timers[room] = t
|
||
return
|
||
log(f"[Batch][{room}] (idle)")
|
||
|
||
|
||
def _fire_batch(room: str):
|
||
"""Collect batched entries and call LLM."""
|
||
with _batch_lock:
|
||
entries = _batch_entries.pop(room, None)
|
||
_batch_timers.pop(room, None)
|
||
if not entries:
|
||
return
|
||
_batch_processing.add(room)
|
||
combined = "\n".join(entries)
|
||
|
||
def _handle():
|
||
timed_out = [False]
|
||
|
||
def _timeout():
|
||
timed_out[0] = True
|
||
log(f"[Batch][{room}] TIMEOUT ({_BATCH_TIMEOUT}s)")
|
||
_batch_done(room)
|
||
|
||
timer = threading.Timer(_BATCH_TIMEOUT, _timeout)
|
||
timer.daemon = True
|
||
timer.start()
|
||
try:
|
||
raw = _call_llm(combined, room, is_group=True)
|
||
if not timed_out[0]:
|
||
timer.cancel()
|
||
_process_llm_reply(raw, room)
|
||
else:
|
||
log(f"[Batch][{room}] route returned after timeout, discarded")
|
||
except Exception as e:
|
||
log(f"!!! BATCH: {e}")
|
||
if not timed_out[0]:
|
||
timer.cancel()
|
||
_batch_done(room)
|
||
|
||
threading.Thread(target=_handle, daemon=True).start()
|
||
|
||
|
||
def _batch_group_message(room: str, nickname: str, body: str) -> bool:
|
||
"""Add message to room batch. Returns True if batched, False if @mention (immediate)."""
|
||
if f"@{_BOT_NICK}" in body or body.startswith(_BOT_NICK):
|
||
return False
|
||
formatted = f"[{nickname}]: {body}"
|
||
with _batch_lock:
|
||
if room in _batch_processing:
|
||
_batch_pending.setdefault(room, []).append(formatted)
|
||
return True
|
||
timer = _batch_timers.pop(room, None)
|
||
if timer:
|
||
timer.cancel()
|
||
_batch_entries.setdefault(room, []).append(formatted)
|
||
t = threading.Timer(_BATCH_WINDOW, _fire_batch, args=[room])
|
||
t.daemon = True
|
||
t.start()
|
||
_batch_timers[room] = t
|
||
return True
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# MAM Recovery — fetch recent history on startup
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
_MAM_RECOVERY = True
|
||
_MAM_RECOVERY_LOCK = threading.Lock()
|
||
_MAM_MARK_DONE = False
|
||
_MAM_TIMEOUT = 30
|
||
|
||
|
||
def _set_mam_done():
|
||
global _MAM_RECOVERY
|
||
with _MAM_RECOVERY_LOCK:
|
||
_MAM_RECOVERY = False
|
||
|
||
|
||
def _is_mam_recovery() -> bool:
|
||
if time.time() - _START_TIME > _MAM_TIMEOUT:
|
||
global _MAM_RECOVERY
|
||
with _MAM_RECOVERY_LOCK:
|
||
if _MAM_RECOVERY:
|
||
_MAM_RECOVERY = False
|
||
log("(MAM recovery timed out, force-disabled)")
|
||
return _MAM_RECOVERY
|
||
with _MAM_RECOVERY_LOCK:
|
||
return _MAM_RECOVERY
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# XML Escape
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
def _escape(text: str) -> str:
|
||
return (text.replace("&", "&").replace("<", "<")
|
||
.replace(">", ">").replace('"', """))
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# HTTP Bridge — health, presence, messages, POST send
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
_HTTP_PORT = cfg["http_port"]
|
||
_MSG_BUF: list[dict] = []
|
||
_MSG_BUF_LOCK = threading.Lock()
|
||
_xmpp_ref = None # set after bot creation
|
||
|
||
|
||
def _record_group_msg(nickname: str, body: str):
|
||
ts = time.strftime("%H:%M:%S")
|
||
with _MSG_BUF_LOCK:
|
||
_MSG_BUF.append({"ts": ts, "from": nickname, "body": body})
|
||
if len(_MSG_BUF) > 200:
|
||
_MSG_BUF[:] = _MSG_BUF[-150:]
|
||
|
||
|
||
class _BridgeHandler(http.server.BaseHTTPRequestHandler):
|
||
def do_GET(self):
|
||
parsed = urllib.parse.urlparse(self.path)
|
||
if parsed.path == "/muc":
|
||
try:
|
||
muc_info = {"rooms": {}}
|
||
bot = _xmpp_ref
|
||
if bot is not None and 'xep_0045' in bot.plugin:
|
||
muc_plugin = bot.plugin['xep_0045']
|
||
for room_jid in cfg["muc_rooms"]:
|
||
room_data = {"jid": room_jid, "participants": []}
|
||
try:
|
||
if room_jid in muc_plugin.rooms:
|
||
room = muc_plugin.rooms[room_jid]
|
||
for nick, info in room.get('roster', {}).items():
|
||
room_data["participants"].append({
|
||
"nick": nick,
|
||
"jid": str(info.get('jid', '')),
|
||
"affiliation": str(info.get('affiliation', '')),
|
||
"role": str(info.get('role', '')),
|
||
})
|
||
except Exception as e:
|
||
room_data["error"] = str(e)
|
||
muc_info["rooms"][room_jid] = room_data
|
||
self._reply(200, muc_info)
|
||
except Exception as e:
|
||
self._reply(500, {"ok": False, "error": str(e)})
|
||
return
|
||
if parsed.path == "/health":
|
||
try:
|
||
bot = _xmpp_ref
|
||
session_ok = bot.session_started_event.is_set() if (bot and hasattr(bot, 'session_started_event')) else False
|
||
socket_ok = bot.is_connected() if (bot and hasattr(bot, 'is_connected')) else False
|
||
self._reply(200, {
|
||
"ok": True, "xmpp_connected": session_ok or socket_ok,
|
||
"agent": _agent_name, "jid": cfg["jid"],
|
||
"uptime_sec": int(time.time() - _START_TIME),
|
||
"muc_rooms": cfg["muc_rooms"],
|
||
})
|
||
except Exception as e:
|
||
self._reply(500, {"ok": False, "error": str(e)})
|
||
return
|
||
if parsed.path.startswith("/presence"):
|
||
jid_to_check = parsed.path[len("/presence/"):].strip()
|
||
if not jid_to_check:
|
||
self._reply(400, {"ok": False, "error": "missing JID"})
|
||
return
|
||
try:
|
||
info = {"jid": jid_to_check, "online": False, "resources": []}
|
||
bot = _xmpp_ref
|
||
if bot and hasattr(bot, 'client_roster'):
|
||
roster = bot.client_roster
|
||
if jid_to_check in roster:
|
||
resources = list(roster[jid_to_check].resources.keys())
|
||
info["online"] = len(resources) > 0
|
||
info["resources"] = resources
|
||
self._reply(200, info)
|
||
except Exception as e:
|
||
self._reply(500, {"ok": False, "error": str(e)})
|
||
return
|
||
if parsed.path == "/messages":
|
||
try:
|
||
qs = urllib.parse.parse_qs(parsed.query)
|
||
sender = qs.get("from", [None])[0]
|
||
with _MSG_BUF_LOCK:
|
||
msgs = list(_MSG_BUF)
|
||
if sender:
|
||
msgs = [m for m in msgs if m["from"] == sender]
|
||
self._reply(200, {"ok": True, "count": len(msgs), "messages": msgs[-50:]})
|
||
except Exception as e:
|
||
self._reply(500, {"ok": False, "error": str(e)})
|
||
return
|
||
self._reply(404, {"ok": False, "error": "not found"})
|
||
|
||
def do_POST(self):
|
||
try:
|
||
length = int(self.headers.get('Content-Length', 0))
|
||
body = json.loads(self.rfile.read(length))
|
||
to = body.get('to', cfg["muc_rooms"][0])
|
||
msg = body.get('message', '') or body.get('body', '')
|
||
if not msg:
|
||
self._reply(400, {"ok": False, "error": "empty message"})
|
||
return
|
||
safe = _escape(msg.strip())
|
||
bot = _xmpp_ref
|
||
if bot:
|
||
bot.send_message(mto=to, mbody=msg.strip(), mtype='groupchat')
|
||
_record_group_msg(cfg["nick"], msg)
|
||
log(f"[http] → [{to.split('@')[0]}]: {msg[:80]}")
|
||
self._reply(200, {"ok": True})
|
||
except Exception as e:
|
||
self._reply(500, {"ok": False, "error": str(e)})
|
||
|
||
def _reply(self, code, data):
|
||
body = json.dumps(data, ensure_ascii=False).encode('utf-8')
|
||
self.send_response(code)
|
||
self.send_header('Content-Type', 'application/json; charset=utf-8')
|
||
self.send_header('Content-Length', len(body))
|
||
self.end_headers()
|
||
self.wfile.write(body)
|
||
|
||
def log_message(self, format, *args):
|
||
pass
|
||
|
||
|
||
def _start_http_bridge():
|
||
_httpd = http.server.HTTPServer(('0.0.0.0', _HTTP_PORT), _BridgeHandler)
|
||
_t = threading.Thread(target=_httpd.serve_forever, daemon=True)
|
||
_t.start()
|
||
log(f"HTTP bridge ready on :{_HTTP_PORT}")
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Reply Processing (shared for all LLM responses)
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
def _process_llm_reply(raw_reply: str, room: str):
|
||
"""Process LLM response: check silence/delay/exec/send."""
|
||
global _xmpp_ref
|
||
if not raw_reply:
|
||
_batch_done(room)
|
||
return
|
||
# Parse GRANT signal from LLM response
|
||
_process_llm_grant(raw_reply)
|
||
# ##delay:N## → schedule later
|
||
delay_m = _DELAY_RE.search(raw_reply)
|
||
if delay_m:
|
||
sec = int(delay_m.group(1)) if delay_m.group(1) else _DELAY_DEFAULT
|
||
_schedule_delayed(sec, room)
|
||
_batch_done(room)
|
||
return
|
||
# ##exec:command## → run command, use output as reply
|
||
exec_m = _EXEC_RE.search(raw_reply)
|
||
if exec_m:
|
||
output = _run_command(exec_m.group(1))
|
||
raw_reply = _EXEC_RE.sub(output, raw_reply, count=1)
|
||
# Extract actual response
|
||
reply_text = _extract_response(raw_reply)
|
||
if reply_text:
|
||
text = reply_text.strip()
|
||
bot = _xmpp_ref
|
||
if bot:
|
||
bot.send_message(mto=room, mbody=text, mtype='groupchat')
|
||
log(f"-> [{room.split('@')[0]}]: {text[:80]}")
|
||
else:
|
||
log(f"-> [{room.split('@')[0]}]: (silent)")
|
||
_batch_done(room)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Group message handler
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
def _handle_group_message(msg):
|
||
"""Process a groupchat message (runs in thread)."""
|
||
global _COORDINATOR, _GRANTED, _REVOKED_UNTIL
|
||
if _is_mam_recovery():
|
||
return
|
||
msg_id = msg.get("id", "")
|
||
if _is_duplicate(msg_id):
|
||
return
|
||
body = str(msg["body"]).strip()
|
||
if not body:
|
||
return
|
||
full_from = str(msg["from"])
|
||
room = full_from.split("/")[0]
|
||
nickname = full_from.split("/")[1] if "/" in full_from else ""
|
||
# Self-message skip
|
||
if nickname == cfg["nick"]:
|
||
log(f"(self) {body[:80]}")
|
||
return
|
||
_record_group_msg(nickname, body)
|
||
# Coordinator signals
|
||
if _process_coordinator_signals(nickname, body):
|
||
return
|
||
# Revoke check
|
||
is_revoked = time.time() < _REVOKED_UNTIL
|
||
if is_revoked and _GRANTED == cfg["nick"]:
|
||
_GRANTED = None
|
||
is_revoked = False
|
||
log(f"GRANT overrides REVOKE for {cfg['nick']}")
|
||
if _check_shutup(body):
|
||
return
|
||
if is_revoked:
|
||
body = f"【只读消息】你被收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {room}] {nickname} 说: {body}"
|
||
# Batch or immediate (@mention)
|
||
if _batch_group_message(room, nickname, body):
|
||
log(f"[{room.split('@')[0]}] {nickname}: {body[:80]} (batched)")
|
||
return
|
||
log(f"[{room.split('@')[0]}] {nickname}: {body[:80]}")
|
||
raw = _call_llm(body, full_from, is_group=True)
|
||
_process_llm_reply(raw, room)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Private message handler
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
def _handle_private_message(msg):
|
||
"""Process a private chat message."""
|
||
if msg["type"] == "groupchat":
|
||
return
|
||
msg_id = msg.get("id", "")
|
||
if _is_duplicate(msg_id):
|
||
return
|
||
body = str(msg["body"]).strip()
|
||
sender = str(msg["from"]).split("/")[0]
|
||
log(f"<{sender}> {body[:80]}")
|
||
if sender == cfg["jid"]:
|
||
log("(skipped self)")
|
||
return
|
||
if time.time() < _REVOKED_UNTIL:
|
||
log(f"(silenced) <{sender}> dropped")
|
||
return
|
||
if _check_shutup(body):
|
||
return
|
||
raw = _call_llm(body, sender, is_group=False)
|
||
if raw:
|
||
reply = _extract_response(raw)
|
||
if reply:
|
||
text = reply.strip()
|
||
bot = _xmpp_ref
|
||
if bot:
|
||
bot.send_message(mto=sender, mbody=text, mtype='chat')
|
||
log(f"-> {sender}: {text[:80]}")
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# AgentBot Class
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
import slixmpp
|
||
|
||
|
||
class AgentBot(slixmpp.ClientXMPP):
|
||
def __init__(self):
|
||
super().__init__(cfg["jid"], cfg["password"])
|
||
# Connection settings
|
||
self.enable_direct_tls = False
|
||
self.enable_starttls = True
|
||
self.auto_reconnect = True
|
||
self.reconnect_max_delay = 10
|
||
self.whitespace_keepalive = True
|
||
self.whitespace_keepalive_interval = 30
|
||
# SSL: accept self-signed certs
|
||
ctx = ssl.create_default_context()
|
||
ctx.check_hostname = False
|
||
ctx.verify_mode = ssl.CERT_NONE
|
||
self.ssl_context = ctx
|
||
# Event handlers
|
||
self.add_event_handler("session_start", self._on_session_start)
|
||
self.add_event_handler("message", self._on_any_message)
|
||
self.add_event_handler("groupchat_message", self._on_group_msg)
|
||
self.add_event_handler("disconnected", self._on_disconnected)
|
||
self.add_event_handler("connected", self._on_connected)
|
||
self.add_event_handler("session_end", self._on_session_end)
|
||
self.add_event_handler("connection_failed", self._on_conn_failed)
|
||
# MUC plugin
|
||
self.register_plugin('xep_0045')
|
||
|
||
def _on_connected(self, event):
|
||
log("connection established")
|
||
|
||
def _on_session_start(self, event):
|
||
self.send_presence()
|
||
self.get_roster()
|
||
log(f"{cfg['jid']} online")
|
||
# Register MAM plugin lazily
|
||
try:
|
||
self.register_plugin('xep_0313')
|
||
except Exception:
|
||
log("(MAM: xep_0313 not available)")
|
||
# Join MUC rooms
|
||
async def _join_all():
|
||
for room_jid in cfg["muc_rooms"]:
|
||
try:
|
||
self.plugin['xep_0045'].join_muc(room_jid, cfg["nick"])
|
||
presence = (
|
||
f"<presence to='{room_jid}/{cfg['nick']}'>"
|
||
f"<x xmlns='http://jabber.org/protocol/muc'>"
|
||
f"<history maxstanzas='0'/>"
|
||
f"</x></presence>"
|
||
)
|
||
self.send_raw(presence)
|
||
log(f"Joined {room_jid}")
|
||
except Exception as e:
|
||
log(f"MUC join failed {room_jid}: {e}")
|
||
await asyncio.sleep(2)
|
||
await asyncio.sleep(3)
|
||
await self._fetch_mam_history()
|
||
asyncio.ensure_future(_join_all())
|
||
|
||
async def _fetch_mam_history(self):
|
||
"""Query MAM for recent MUC messages to rebuild context."""
|
||
if 'xep_0313' not in self.plugin:
|
||
log("(MAM: no plugin)")
|
||
_set_mam_done()
|
||
return
|
||
# MAM recovery used in _on_session_start
|
||
try:
|
||
for room_jid in cfg["muc_rooms"]:
|
||
log(f"(MAM: querying {room_jid} for last 50 messages...)")
|
||
results = await self.plugin['xep_0313'].retrieve(
|
||
jid=room_jid, rsm={'max': 50},
|
||
)
|
||
count = 0
|
||
for msg in results['mam']['results']:
|
||
forwarded = msg['mam_result']['forwarded']
|
||
body = str(forwarded['stanza']['body'] or '').strip()
|
||
if not body:
|
||
continue
|
||
nick = str(forwarded['stanza']['from']).split('/')[-1] if '/' in str(forwarded['stanza']['from']) else '?'
|
||
# Feed into context for chat_bridge (xxm)
|
||
if _IS_CHAT_BRIDGE:
|
||
role = 'user' if nick != cfg["nick"] else 'assistant'
|
||
try:
|
||
_bridge._append_to_log(role, f"[{nick}]: {body[:300]}")
|
||
except Exception:
|
||
pass
|
||
count += 1
|
||
log(f"(MAM: loaded {count} msgs from {room_jid})")
|
||
_set_mam_done()
|
||
log("(MAM recovery complete)")
|
||
except Exception as e:
|
||
log(f"(MAM error: {e})")
|
||
_set_mam_done()
|
||
|
||
def _on_group_msg(self, msg):
|
||
threading.Thread(target=_handle_group_message, args=[msg], daemon=True).start()
|
||
|
||
def _on_any_message(self, msg):
|
||
threading.Thread(target=_handle_private_message, args=[msg], daemon=True).start()
|
||
|
||
def _on_session_end(self, event):
|
||
log(f"session ended")
|
||
|
||
def _on_conn_failed(self, event):
|
||
log(f"connection failed: {event}")
|
||
|
||
def _on_disconnected(self, event):
|
||
log(f"disconnected, reconnecting...")
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Main
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
def main():
|
||
log(f"Starting {cfg['jid']} ({cfg['name_cn']}) — agent={_agent_name}")
|
||
if _IS_CHAT_BRIDGE:
|
||
log(f" LLM: chat_bridge (session={cfg['session_id']})")
|
||
else:
|
||
log(f" LLM: Hermes API ({cfg['gateway']})")
|
||
log(f" Server: {cfg['server']}:{cfg['port']}")
|
||
log(f" Rooms: {cfg['muc_rooms']}")
|
||
|
||
bot = AgentBot()
|
||
global _xmpp_ref
|
||
_xmpp_ref = bot
|
||
|
||
_start_http_bridge()
|
||
|
||
bot.connect(host=cfg["server"], port=cfg["port"])
|
||
log(f"Connecting {cfg['jid']}@{cfg['server']}:{cfg['port']}")
|
||
|
||
loop = asyncio.get_event_loop()
|
||
|
||
async def _status_check():
|
||
while True:
|
||
await asyncio.sleep(60)
|
||
log("(alive)")
|
||
|
||
asyncio.ensure_future(_status_check())
|
||
|
||
try:
|
||
loop.run_forever()
|
||
except KeyboardInterrupt:
|
||
log("Shutdown by user")
|
||
except Exception as e:
|
||
log(f"!!! MAIN LOOP CRASH: {e}")
|
||
import traceback
|
||
log(f"!!! {traceback.format_exc()[:500]}")
|
||
raise
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|