#!/usr/bin/env python3
"""
XMPP Agent Core — 统一版
=========================
Single bot core for all agents. Supports --agent xxm|mohe|zhiwei|xiaoguo.
Usage:
python xmpp_agent_core.py --agent xxm # xxm, uses chat_bridge
python xmpp_agent_core.py --agent mohe # mohe, uses Hermes API
python xmpp_agent_core.py --agent zhiwei # zhiwei, uses Hermes API
python xmpp_agent_core.py --agent xiaoguo # xiaoguo, uses Hermes API
Shares: PID lock, reconnect, MUC join, dedup, batching,
coordinator protocol (GRANT/REVOKE), HTTP bridge.
Differs only in LLM calling method (chat_bridge vs Hermes API).
"""
import os, sys, time, threading, asyncio, logging, json, re, ssl
import urllib.request, http.server, urllib.parse
# ── Windows: install the selector event-loop policy (slixmpp needs it there) ──
if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

# ── Import path: make gateway/scripts/ importable (proc_guard, chat_bridge) ──
_GATEWAY_SCRIPTS = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "gateway",
    "scripts",
)
sys.path.insert(0, _GATEWAY_SCRIPTS)
# ═══════════════════════════════════════════════════════════════
# AGENTS Configuration
# ═══════════════════════════════════════════════════════════════
# Per-agent connection profile, keyed by the value accepted by --agent.
# Entries with a "gateway" URL talk to the Hermes HTTP API; the entry with
# "bridge": "chat_bridge" uses the local chat_bridge module instead.
# NOTE(review): account passwords are hard-coded here — consider moving them
# out of source control.
AGENTS = {
    "mohe": {
        "jid": "mohe@yoin.fun",
        "password": "hermes123",
        "nick": "mohe",
        "name_cn": "莫荷",
        "http_port": 5804,
        "gateway": "http://localhost:8642/v1/chat/completions",
        "session_id": "xmpp-mohe-v2",
        "server": "127.0.0.1",
        "port": 5222,
        "muc_rooms": ["coregroup@conference.yoin.fun"],
        "mention": "@mohe/@莫荷",
    },
    "zhiwei": {
        "jid": "zhiwei@yoin.fun",
        "password": "hermes123",
        "nick": "zhiwei",
        "name_cn": "知微",
        "http_port": 5805,
        "gateway": "http://localhost:8643/v1/chat/completions",
        "session_id": "xmpp-zhiwei",
        "server": "127.0.0.1",
        "port": 5222,
        "muc_rooms": ["coregroup@conference.yoin.fun"],
        "mention": "@zhiwei/@知微",
    },
    "xiaoguo": {
        "jid": "xiaoguo@yoin.fun",
        "password": "hermes123",
        "nick": "xiaoguo",
        "name_cn": "小果",
        "http_port": 5806,
        "gateway": "http://localhost:8645/v1/chat/completions",
        "session_id": "xmpp-xiaoguo",
        "kanban_session_id": "xmpp-xiaoguo-kanban",
        "server": "127.0.0.1",
        "port": 5222,
        "muc_rooms": ["coregroup@conference.yoin.fun"],
        "mention": "@xiaoguo/@小果",
    },
    "xxm": {
        "jid": "xxm@yoin.fun",
        "password": "hermes123",
        "nick": "xxm",
        "name_cn": "笑笑",
        "http_port": 5802,
        "bridge": "chat_bridge",  # use local chat_bridge instead of Hermes API
        "session_id": "ses_xxm_xmpp",
        "kanban_session_id": "xmpp-xxm-kanban",
        "server": "192.168.1.246",  # LAN direct connect
        "port": 5222,
        "muc_rooms": [
            "coregroup@conference.yoin.fun",
            "jujidina@conference.yoin.fun",
        ],
        "mention": "@xxm/@笑笑",
    },
}
# ── Agent selection ──
# Read "--agent NAME" from argv; default to "mohe" when the flag is absent.
_agent_name = "mohe"
if "--agent" in sys.argv:
    idx = sys.argv.index("--agent")
    if idx + 1 < len(sys.argv):
        _agent_name = sys.argv[idx + 1]
if _agent_name not in AGENTS:
    # The config lookup falls back to mohe for unknown names. Normalise the
    # name as well, so the instance lock and log file derived from it match
    # the account that is actually used.
    print(f"Unknown agent '{_agent_name}', falling back to 'mohe'",
          file=sys.stderr, flush=True)
    _agent_name = "mohe"
cfg = AGENTS[_agent_name]
# ═══════════════════════════════════════════════════════════════
# PID Lock — prevent duplicate instances
# ═══════════════════════════════════════════════════════════════
from proc_guard import guard as _proc_guard

# One guard per agent name; when the guard reports failure, print its message
# and terminate with exit status 1.
_lock = _proc_guard(f"xmpp_bot_{_agent_name}")
if not _lock.ok:
    print(_lock.message, flush=True)
    raise SystemExit(1)
# ═══════════════════════════════════════════════════════════════
# Logging
# ═══════════════════════════════════════════════════════════════
# Log files live in <this file's directory>/gateway/logs/xmpp_<agent>.log
_LOG_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "gateway", "logs"
)
os.makedirs(_LOG_DIR, exist_ok=True)
_LOG_FILE = os.path.join(_LOG_DIR, f"xmpp_{_agent_name}.log")
_START_TIME = time.time()


def log(m: str):
    """Append *m* to the agent log file, prefixed with the local HH:MM:SS time."""
    with open(_LOG_FILE, "a", encoding="utf-8") as handle:
        stamp = time.strftime('%H:%M:%S')
        handle.write(f"{stamp} {m}\n")


logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
# ═══════════════════════════════════════════════════════════════
# LLM Bridge Init — abstracted per agent
# ═══════════════════════════════════════════════════════════════
_IS_CHAT_BRIDGE = cfg.get("bridge") == "chat_bridge"
_router = None  # set only for chat_bridge (xxm)

# ── Kanban session support ──
_CALL_SEQ = 0
_KANBAN_SESSION_ID = cfg.get("kanban_session_id", None)

if not _IS_CHAT_BRIDGE:
    # Hermes agents: plain HTTP opener with proxy handling disabled.
    _opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    log(f"LLM: Hermes API ({cfg['gateway']})")
else:
    from chat_bridge import SessionBridge
    from session_router import SessionRouter

    # Default conversation session.
    _bridge = SessionBridge(session_id=cfg["session_id"])
    _router = SessionRouter(bridge=_bridge, default_session=cfg["session_id"])

    # Separate bridge + router so kanban traffic gets its own session.
    _kanban_sid = _KANBAN_SESSION_ID or f"{cfg['session_id']}-kanban"
    _kanban_bridge = SessionBridge(session_id=_kanban_sid)
    _kanban_router = SessionRouter(bridge=_kanban_bridge, default_session=_kanban_sid)

    log(f"LLM: chat_bridge (session={cfg['session_id']})")
    log(f"Kanban: chat_bridge (session={_kanban_sid})")
def _call_llm(content: str, sender: str, is_group: bool = False,
              session_id: str | None = None) -> str:
    """Send *content* to the agent's LLM backend and return the raw reply.

    Hermes agents delegate to ``_call_hermes_api``. chat_bridge agents go
    through the session router; when *session_id* is given and differs from
    the agent's default session, the kanban protocol preamble is prepended
    and the kanban router is used instead.

    Returns the reply text, or an empty string when the backend yields nothing.
    ``is_group`` is accepted for call-site symmetry but not read here.
    """
    if not _IS_CHAT_BRIDGE:
        return _call_hermes_api(content, session_id)

    use_kanban = bool(session_id) and session_id != cfg["session_id"]
    if not use_kanban:
        return _router.route("xmpp", sender, content) or ""

    # Kanban-handler instructions so the LLM knows how to treat the card.
    kanban_content = (
        "【看板处理协议】\n"
        "收到卡片后三步判断:\n"
        " A) 任务明确 → 直接执行 → 评论结果 + 更新状态\n"
        " B) 信息不足 → 评论提问 + 设为 blocked\n"
        " C) 不是我的活 → 评论说明 + 转派(如果能判断)\n"
        "\n"
        "汇报规则:\n"
        " - 任务完成 → 简短 DM 给老莫摘要\n"
        " - 评论/状态变更 → 不汇报\n"
        " - 追问/转派 → 不汇报\n"
        "\n"
        "可用 API:\n"
        " curl http://192.168.1.246:5803/api/kanban/t_xxx 查看卡片详情\n"
        " 更新操作走 Kanban Dashboard UI\n"
        "\n"
        "卡片上下文不够?→ 用 session_search 查历史\n"
        f"---\n{content}"
    )
    return _kanban_router.route("xmpp", sender, kanban_content) or ""
def _call_hermes_api(content: str, session_id: str | None = None) -> str:
    """POST *content* to the Hermes chat-completions gateway.

    Args:
        content: user message text.
        session_id: value for the ``X-Hermes-Session-Id`` header; defaults to
            the agent's configured session.

    Returns:
        The stripped reply text, or an empty string when the request fails or
        the response carries no content. Failures are logged, never raised.
    """
    target_sid = session_id or cfg["session_id"]
    try:
        payload = json.dumps({
            "model": "hermes-agent",
            "messages": [{"role": "user", "content": content}],
        }).encode()
        req = urllib.request.Request(cfg["gateway"], data=payload, method="POST")
        req.add_header("Content-Type", "application/json")
        req.add_header("Authorization", "Bearer hermes123")
        req.add_header("X-Hermes-Session-Id", target_sid)
        # Close the HTTP response deterministically instead of leaking it.
        with _opener.open(req, timeout=600) as resp:
            data = json.loads(resp.read())
        # Tolerate an empty "choices" list and a null "content" field.
        choices = data.get("choices") or [{}]
        reply = choices[0].get("message", {}).get("content") or ""
        return reply.strip()
    except Exception as e:
        log(f"!!! Hermes API error: {e}")
        return ""
# ═══════════════════════════════════════════════════════════════
# EasyTier control (Windows)
# ═══════════════════════════════════════════════════════════════
# EasyTier lives in <three directories above this file>/tools/easytier.
_EASYTIER_DIR = os.path.join(
    os.path.normpath(os.path.join(os.path.abspath(__file__), "..", "..", "..")),
    "tools",
    "easytier",
)
_EASYTIER_CORE = os.path.join(_EASYTIER_DIR, "easytier-core.exe")
_EASYTIER_PID_FILE = os.path.join(_EASYTIER_DIR, "easytier.pid")
# Command-line fragments appended verbatim when launching easytier-core.
# NOTE(review): the network secret is hard-coded in source.
_EASYTIER_NET = "--network-name mynet --network-secret ce75d0a5"
_EASYTIER_RELAY = "--peers tcp://47.115.32.206:11010"
_EASYTIER_IP = "--ipv4 10.144.144.3"
def _start_easytier():
    """Start EasyTier on Windows.

    If a PID file exists, the recorded process is killed first; then
    easytier-core is launched in the background through ``start /b``.
    All failures are logged, none are raised.

    NOTE(review): nothing in this file writes the PID file, so the
    "kill previous instance" step only works if something else creates it.
    """
    import subprocess as _sp

    if not os.path.exists(_EASYTIER_CORE):
        log(f"EasyTier binary not found: {_EASYTIER_CORE}")
        return

    # Kill the instance recorded in the PID file, if there is one.
    if os.path.exists(_EASYTIER_PID_FILE):
        try:
            with open(_EASYTIER_PID_FILE) as f:
                old_pid = int(f.read().strip())
            killed = _sp.run(["taskkill", "/f", "/pid", str(old_pid)],
                             capture_output=True, timeout=5)
            # Only claim success when taskkill actually reported it.
            if killed.returncode == 0:
                log(f"Killed old EasyTier (PID {old_pid})")
            else:
                log(f"Old EasyTier (PID {old_pid}) not killed, taskkill exit={killed.returncode}")
        except Exception as e:
            # Best effort: a stale or unreadable PID file must not block the start.
            log(f"EasyTier stale-PID cleanup skipped: {e}")

    # Start
    cmd = f'start /b "" "{_EASYTIER_CORE}" {_EASYTIER_NET} {_EASYTIER_RELAY} {_EASYTIER_IP} --disable-encryption'
    try:
        _sp.run(cmd, shell=True, timeout=5)
        log("EasyTier start command issued")
    except Exception as e:
        log(f"EasyTier start error: {e}")
def _stop_easytier():
    """Stop EasyTier on Windows by image name, then drop the PID file.

    Both steps are best effort: a taskkill failure is logged, a PID-file
    removal failure is ignored.
    """
    import subprocess as _sp

    kill_cmd = ["taskkill", "/f", "/im", "easytier-core.exe"]
    try:
        _sp.run(kill_cmd, capture_output=True, timeout=5)
    except Exception as e:
        log(f"EasyTier stop error: {e}")
    else:
        log("EasyTier stopped (taskkill)")

    # Clean up PID file (a missing file is simply another ignored failure).
    try:
        os.remove(_EASYTIER_PID_FILE)
    except Exception:
        pass
def _check_easytier() -> bool:
    """Report whether ``tasklist`` shows an easytier-core.exe process.

    Returns False when the process is absent or when tasklist cannot be run.
    """
    import subprocess as _sp

    query = ["tasklist", "/fi", "imagename eq easytier-core.exe"]
    try:
        listing = _sp.run(query, capture_output=True, text=True, timeout=5)
        found = "easytier-core.exe" in listing.stdout
    except Exception:
        found = False
    return found
# ═══════════════════════════════════════════════════════════════
# Message Dedup
# ═══════════════════════════════════════════════════════════════
# Recently seen stanza ids, oldest first (dicts preserve insertion order).
_DEDUP_CACHE: dict[str, None] = {}
_DEDUP_LOCK = threading.Lock()


def _is_duplicate(msg_id: str) -> bool:
    """Return True when *msg_id* was already seen; otherwise remember it.

    An empty id is never treated as a duplicate. The cache keeps at most the
    100 most recent ids and evicts the oldest on overflow. Wiping the whole
    cache instead would forget the id just recorded and let an immediate
    replay through.
    """
    if not msg_id:
        return False
    with _DEDUP_LOCK:
        if msg_id in _DEDUP_CACHE:
            return True
        _DEDUP_CACHE[msg_id] = None
        while len(_DEDUP_CACHE) > 100:
            # Drop the oldest entry only.
            del _DEDUP_CACHE[next(iter(_DEDUP_CACHE))]
        return False
# ═══════════════════════════════════════════════════════════════
# Coordinator Protocol (shared across all agents)
# ═══════════════════════════════════════════════════════════════
# Name of the current coordinator agent; reassigned when user "hmo" posts
# "coordinator=<name>" in a room.
_COORDINATOR: str = "mohe"
# Nick most recently named in a [GRANT:<nick>] signal, or None.
_GRANTED: str | None = None
# Epoch timestamp until which this agent is silenced (0.0 = not silenced).
_REVOKED_UNTIL: float = 0.0
# Substrings (matched case-insensitively) that trigger a 5-minute silence.
_SHUTUP_PATTERNS = ["闭嘴", "别说话", "安静", "shut", "stfu", "别说了", "停"]
def _process_coordinator_signals(nickname: str, body: str) -> bool:
    """Apply coordinator / GRANT / REVOKE control signals found in *body*.

    Returns True when the message was a control signal and must not be
    processed any further, False otherwise.
    """
    global _COORDINATOR, _GRANTED, _REVOKED_UNTIL
    lowered = body.lower()

    # 1. hmo switches coordinator
    if nickname == 'hmo' and 'coordinator=' in lowered:
        for candidate in ('mohe', 'zhiwei', 'xxm'):
            if f'coordinator={candidate}' in lowered:
                _COORDINATOR = candidate
                _GRANTED = None
                log(f"Coordinator switched to {candidate} by hmo")
                return True

    # 2. GRANT signal (overrides REVOKE)
    if (grant := re.search(r'\[GRANT:(\w+)\]', body)):
        _GRANTED = grant.group(1)
        _REVOKED_UNTIL = 0
        log(f"GRANT: {_GRANTED}")
        return True

    # 3. REVOKE signal aimed at this agent (5min auto-restore)
    revoke = re.search(r'\[REVOKE:(\w+)\]', body)
    if revoke is not None and revoke.group(1) == cfg["nick"]:
        _REVOKED_UNTIL = time.time() + 300
        log(f"REVOKEd: {cfg['nick']} silenced for 5min")
        return True

    return False
def _check_shutup(body: str) -> bool:
    """Silence the agent for 5 minutes when *body* contains a shut-up phrase.

    Matching is a case-insensitive substring test against
    ``_SHUTUP_PATTERNS``. Returns True when a phrase matched (the caller
    should drop the message), False otherwise.
    """
    # Without this declaration the assignment below would bind a local
    # variable and the silence window would never take effect.
    global _REVOKED_UNTIL
    lower = body.lower().strip()
    for pat in _SHUTUP_PATTERNS:
        if pat.lower() in lower:
            _REVOKED_UNTIL = time.time() + 300
            log(f"(shutup: '{pat}' → 5min silence)")
            return True
    return False
def _process_llm_grant(response: str):
    """Apply a ``[GRANT:<nick>]`` signal found in the LLM's own response.

    Records the granted nick and clears any active silence window.
    """
    # Both names must be declared global; otherwise resetting _REVOKED_UNTIL
    # only creates a local and the REVOKE stays in force.
    global _GRANTED, _REVOKED_UNTIL
    gm = re.search(r'\[GRANT:(\w+)\]', response)
    if gm:
        _GRANTED = gm.group(1)
        _REVOKED_UNTIL = 0
        log(f"LLM GRANT: {_GRANTED}")
# ═══════════════════════════════════════════════════════════════
# Response Extraction
# ═══════════════════════════════════════════════════════════════
# Regexes searched in the FIRST line of an LLM reply; a hit means the model
# is explaining in natural language that it will not answer, so the reply is
# suppressed.
# NOTE(review): "[我我说]" repeats a character inside the class — possibly an
# extraction artifact; confirm against the original source.
_SILENCE_PATTERNS = [
"保持沉默", "不应[该]?回复", "没有.*@.*我", "不是对[我我说]",
"跟我无关", "我不用回复", "不该回复", "不参与",
"不是我[应]?该[说回]",
]
# ##exec:<command>## — shell command embedded in a reply (may span lines).
_EXEC_RE = re.compile(r"##exec:(.+?)##", re.DOTALL)
# ##delay## or ##delay:<seconds>## — ask to be re-invoked later.
_DELAY_RE = re.compile(r"##delay:?(\d+)?##")
# Delay in seconds used when ##delay## carries no number.
_DELAY_DEFAULT = 15
# Timeout in seconds for a ##exec## command.
_EXEC_TIMEOUT = 60
def _strip_toolcall_xml(text: str) -> str:
    """Remove tool-call / reasoning XML blocks from an LLM reply.

    Every ``<tag ...>...</tag>`` block for the tags listed below is dropped.
    An unterminated block is removed up to the end of the text. The result
    is returned stripped of surrounding whitespace.

    NOTE(review): the tag names were missing from the patterns in this copy
    of the file (only ``]*>.*?(|$)`` / ``.*?(|$)`` survived, which delete
    every ">" character). The list below is a reconstruction — confirm it
    against the tags the models actually emit.
    """
    tags = ("tool_call", "function_calls", "thinking", "think")
    pattern = r'<(%s)\b[^>]*>.*?(?:</\1>|$)' % "|".join(tags)
    return re.sub(pattern, '', text, flags=re.DOTALL).strip()
def _extract_response(text: str) -> str | None:
    """Reduce a raw LLM reply to the text that should actually be sent.

    * Tool-call XML is removed first.
    * Without a ``__SILENT__`` prefix: the reply is returned as is, unless
      its first line matches one of ``_SILENCE_PATTERNS`` (the model saying
      in prose that it stays silent).
    * With a ``__SILENT__`` prefix: the first line is dropped, then leading
      parenthesised asides — full-width or ASCII — are removed; whatever
      remains is returned.

    Returns None when nothing is left to send.
    """
    if not text:
        return None
    t = text.strip()
    if not t:
        return None
    t = _strip_toolcall_xml(t)
    if not t:
        return None

    # Natural language silence detection
    if not t.startswith("__SILENT__"):
        first = t.split("\n", 1)[0]
        if any(re.search(pat, first) for pat in _SILENCE_PATTERNS):
            return None
        return t

    # Has __SILENT__ prefix: keep only what follows the marker line.
    _marker, _sep, rest = t.partition("\n")
    rest = rest.strip()
    # Strip any run of leading "(...)" asides, full-width or ASCII. The
    # full-width parentheses are written as escapes so the pattern cannot
    # degrade into one that also matches the empty string, which would make
    # a strip-until-no-match loop spin forever.
    rest = re.sub(r'^(?:(?:\uff08[^\uff09]*\uff09|\([^)]*\))\s*)+', '', rest)
    return rest.strip() or None
# ═══════════════════════════════════════════════════════════════
# Message Batching (3s debounce + serialized processing)
# ═══════════════════════════════════════════════════════════════
# Debounce window in seconds: a room's batch fires this long after its most
# recent message.
_BATCH_WINDOW = 3.0
# Seconds after which an in-flight batch LLM call is declared timed out.
_BATCH_TIMEOUT = 300
# room -> formatted "[nick]: body" lines waiting for the debounce timer.
_batch_entries: dict[str, list[str]] = {}
# room -> the pending debounce timer.
_batch_timers: dict[str, threading.Timer] = {}
# Rooms whose batch is currently being processed by the LLM.
_batch_processing: set[str] = set()
# room -> lines that arrived while that room's batch was being processed.
_batch_pending: dict[str, list[str]] = {}
# Guards the four containers above.
_batch_lock = threading.Lock()
# This agent's MUC nickname, used for @mention detection.
_BOT_NICK = cfg["nick"]
def _run_command(cmd: str) -> str:
    """Run a ``##exec:command##`` shell command and return its output.

    stdout and stderr are concatenated. An empty result is replaced by a
    "(no output, exit=N)" marker. Timeouts and other failures are reported
    as text instead of being raised.

    NOTE(review): the command is run through the shell unmodified.
    """
    import subprocess

    log(f"(exec: {cmd[:120]})")
    try:
        proc = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            timeout=_EXEC_TIMEOUT,
            text=True,
            encoding='utf-8',
            errors='replace',
        )
        combined = ((proc.stdout or "") + (proc.stderr or "")).strip()
        if not combined:
            combined = f"(no output, exit={proc.returncode})"
        log(f"(exec done: {len(combined)} bytes, exit={proc.returncode})")
        return combined
    except subprocess.TimeoutExpired:
        log(f"(exec timeout >{_EXEC_TIMEOUT}s)")
        return "(命令超时)"
    except Exception as e:
        log(f"(exec error: {e})")
        return f"(命令执行失败: {e})"
def _schedule_delayed(delay_sec: int, room: str):
    """Schedule the ``##delay:N##`` follow-up for *room*.

    After *delay_sec* seconds a daemon timer asks the LLM to report its
    results and posts any non-silent answer to the room as a groupchat
    message. Errors inside the timer callback are logged, not raised.
    """
    # The unused subprocess import and the needless `global` declaration
    # were dropped; _xmpp_ref is only read here.

    def _fire():
        bot = _xmpp_ref
        if not bot:
            return
        try:
            prompt = "时间到,请根据最新的信息汇报结果。"
            raw = _call_llm(prompt, room, is_group=True)
            reply = _extract_response(raw)
            if reply:
                text = reply.strip()
                bot.send_message(mto=room, mbody=text, mtype='groupchat')
                log(f"-> [Delay][{room}]: {text[:80]}")
        except Exception as e:
            log(f"!! delay err: {e}")

    t = threading.Timer(delay_sec, _fire)
    t.daemon = True
    t.start()
    log(f"(delay +{delay_sec}s → {room})")
def _batch_done(room: str):
    """Mark *room*'s batch as finished and flush anything queued meanwhile.

    Messages that arrived during processing become the next batch, which is
    fired after 0.1 s. If there are none, the room is logged as idle.
    """
    with _batch_lock:
        _batch_processing.discard(room)
        backlog = _batch_pending.pop(room, None)
        if backlog:
            _batch_entries[room] = backlog
            flush_timer = threading.Timer(0.1, _fire_batch, args=[room])
            flush_timer.daemon = True
            flush_timer.start()
            _batch_timers[room] = flush_timer
            return
    log(f"[Batch][{room}] (idle)")
def _fire_batch(room: str):
    """Take *room*'s batched lines and hand them to the LLM in a worker thread.

    The room is flagged as processing until the reply has been handled, the
    call failed, or the watchdog timer declared it timed out. A reply that
    arrives after the timeout is discarded.
    """
    with _batch_lock:
        queued = _batch_entries.pop(room, None)
        _batch_timers.pop(room, None)
        if not queued:
            return
        _batch_processing.add(room)
    combined = "\n".join(queued)

    def _handle():
        expired = threading.Event()

        def _timeout():
            expired.set()
            log(f"[Batch][{room}] TIMEOUT ({_BATCH_TIMEOUT}s)")
            _batch_done(room)

        watchdog = threading.Timer(_BATCH_TIMEOUT, _timeout)
        watchdog.daemon = True
        watchdog.start()
        try:
            raw = _call_llm(combined, room, is_group=True)
            if expired.is_set():
                log(f"[Batch][{room}] route returned after timeout, discarded")
            else:
                watchdog.cancel()
                _process_llm_reply(raw, room)
        except Exception as e:
            log(f"!!! BATCH: {e}")
            if not expired.is_set():
                watchdog.cancel()
                _batch_done(room)

    threading.Thread(target=_handle, daemon=True).start()
def _batch_group_message(room: str, nickname: str, body: str) -> bool:
    """Queue a room message for batched processing.

    Returns False, without queuing, when the message mentions this bot
    ("@<nick>" anywhere, or the nick at the very start) so the caller can
    handle it immediately. Otherwise the line is queued and True is returned:
    into the pending list if the room is mid-batch, else into the current
    batch with the debounce timer restarted.
    """
    mentions_me = f"@{_BOT_NICK}" in body or body.startswith(_BOT_NICK)
    if mentions_me:
        return False

    line = f"[{nickname}]: {body}"
    with _batch_lock:
        if room in _batch_processing:
            _batch_pending.setdefault(room, []).append(line)
            return True

        # Restart the debounce window for this room.
        previous = _batch_timers.pop(room, None)
        if previous:
            previous.cancel()
        _batch_entries.setdefault(room, []).append(line)
        debounce = threading.Timer(_BATCH_WINDOW, _fire_batch, args=[room])
        debounce.daemon = True
        debounce.start()
        _batch_timers[room] = debounce
    return True
# ═══════════════════════════════════════════════════════════════
# MAM Recovery — fetch recent history on startup
# ═══════════════════════════════════════════════════════════════
# True while startup history recovery is considered in progress; group
# messages are ignored during that time.
_MAM_RECOVERY = True
# Guards reads and writes of _MAM_RECOVERY.
_MAM_RECOVERY_LOCK = threading.Lock()
# NOTE(review): not referenced anywhere else in this file.
_MAM_MARK_DONE = False
# Seconds after process start at which recovery is force-ended.
_MAM_TIMEOUT = 30
def _set_mam_done():
    """Clear the MAM-recovery flag so group messages are processed again."""
    global _MAM_RECOVERY
    _MAM_RECOVERY_LOCK.acquire()
    try:
        _MAM_RECOVERY = False
    finally:
        _MAM_RECOVERY_LOCK.release()
def _is_mam_recovery() -> bool:
    """Tell whether MAM recovery is still in progress.

    Once more than ``_MAM_TIMEOUT`` seconds have passed since process start,
    a still-set flag is force-cleared (and that is logged) before answering.
    """
    global _MAM_RECOVERY
    overdue = time.time() - _START_TIME > _MAM_TIMEOUT
    with _MAM_RECOVERY_LOCK:
        if overdue and _MAM_RECOVERY:
            _MAM_RECOVERY = False
            log("(MAM recovery timed out, force-disabled)")
        return _MAM_RECOVERY
# ═══════════════════════════════════════════════════════════════
# XML Escape
# ═══════════════════════════════════════════════════════════════
def _escape(text: str) -> str:
    """Escape ``&``, ``<``, ``>`` and ``"`` as XML entities.

    The ampersand is replaced first so the entities produced by the later
    replacements are not escaped a second time.
    """
    return (text.replace("&", "&amp;").replace("<", "&lt;")
            .replace(">", "&gt;").replace('"', "&quot;"))
# ═══════════════════════════════════════════════════════════════
# HTTP Bridge — health, presence, messages, POST send
# ═══════════════════════════════════════════════════════════════
# TCP port the HTTP bridge listens on (per-agent).
_HTTP_PORT = cfg["http_port"]
# Rolling buffer of recent group messages, each {"ts", "from", "body"}.
_MSG_BUF: list[dict] = []
# Guards _MSG_BUF.
_MSG_BUF_LOCK = threading.Lock()
_xmpp_ref = None # set after bot creation
def _record_group_msg(nickname: str, body: str):
    """Append a message to the rolling buffer served by GET /messages.

    When the buffer grows past 200 entries it is trimmed to the newest 150.
    """
    entry = {"ts": time.strftime("%H:%M:%S"), "from": nickname, "body": body}
    with _MSG_BUF_LOCK:
        _MSG_BUF.append(entry)
        if len(_MSG_BUF) > 200:
            del _MSG_BUF[:-150]
class _BridgeHandler(http.server.BaseHTTPRequestHandler):
    """HTTP control surface for the running bot. All replies are JSON.

    GET  /muc               participants of each configured MUC room
    GET  /health            connection status, agent name, uptime
    GET  /presence/<jid>    roster resources known for <jid>
    GET  /messages[?from=]  recorded group messages (at most the last 50)
    POST /easytier          {"action": "start" | "stop" | "status"}
    POST <any other path>   {"to", "message" | "body", "type"} → send via XMPP

    NOTE(review): no endpoint performs any authentication.
    """

    # ── GET ──────────────────────────────────────────────────────────
    def do_GET(self):
        """Dispatch a GET request; handler errors become a 500 reply."""
        parsed = urllib.parse.urlparse(self.path)
        path = parsed.path
        try:
            if path == "/muc":
                self._reply(200, self._muc_info())
            elif path == "/health":
                self._reply(200, self._health_info())
            elif path == "/presence" or path.startswith("/presence/"):
                # Match the exact path segment, so "/presenceXYZ" is a 404
                # rather than a lookup of a truncated JID.
                jid_to_check = path[len("/presence/"):].strip()
                if not jid_to_check:
                    self._reply(400, {"ok": False, "error": "missing JID"})
                else:
                    self._reply(200, self._presence_info(jid_to_check))
            elif path == "/messages":
                self._reply(200, self._recent_messages(parsed.query))
            else:
                self._reply(404, {"ok": False, "error": "not found"})
        except Exception as e:
            self._reply(500, {"ok": False, "error": str(e)})

    def _muc_info(self) -> dict:
        """Collect the participant list of every configured MUC room."""
        muc_info = {"rooms": {}}
        bot = _xmpp_ref
        if bot is None or 'xep_0045' not in bot.plugin:
            return muc_info
        muc_plugin = bot.plugin['xep_0045']
        for room_jid in cfg["muc_rooms"]:
            room_data = {"jid": room_jid, "participants": []}
            try:
                if room_jid in muc_plugin.rooms:
                    room = muc_plugin.rooms[room_jid]
                    for nick, info in room.get('roster', {}).items():
                        room_data["participants"].append({
                            "nick": nick,
                            "jid": str(info.get('jid', '')),
                            "affiliation": str(info.get('affiliation', '')),
                            "role": str(info.get('role', '')),
                        })
            except Exception as e:
                # A failure in one room is reported without hiding the others.
                room_data["error"] = str(e)
            muc_info["rooms"][room_jid] = room_data
        return muc_info

    def _health_info(self) -> dict:
        """Build the /health payload from the bot's connection state."""
        bot = _xmpp_ref
        session_ok = bot.session_started_event.is_set() if (bot and hasattr(bot, 'session_started_event')) else False
        socket_ok = bot.is_connected() if (bot and hasattr(bot, 'is_connected')) else False
        return {
            "ok": True, "xmpp_connected": session_ok or socket_ok,
            "agent": _agent_name, "jid": cfg["jid"],
            "uptime_sec": int(time.time() - _START_TIME),
            "muc_rooms": cfg["muc_rooms"],
        }

    def _presence_info(self, jid_to_check: str) -> dict:
        """Look *jid_to_check* up in the client roster."""
        info = {"jid": jid_to_check, "online": False, "resources": []}
        bot = _xmpp_ref
        if bot and hasattr(bot, 'client_roster'):
            roster = bot.client_roster
            if jid_to_check in roster:
                resources = list(roster[jid_to_check].resources.keys())
                info["online"] = len(resources) > 0
                info["resources"] = resources
        return info

    def _recent_messages(self, query: str) -> dict:
        """Return buffered group messages, optionally filtered by ?from=<nick>."""
        qs = urllib.parse.parse_qs(query)
        sender = qs.get("from", [None])[0]
        with _MSG_BUF_LOCK:
            msgs = list(_MSG_BUF)
        if sender:
            msgs = [m for m in msgs if m["from"] == sender]
        # "count" is the number of matches; only the newest 50 are returned.
        return {"ok": True, "count": len(msgs), "messages": msgs[-50:]}

    # ── POST ─────────────────────────────────────────────────────────
    def do_POST(self):
        """Handle /easytier actions, or send a message through XMPP."""
        try:
            length = int(self.headers.get('Content-Length', 0))
            body = json.loads(self.rfile.read(length))
            path = urllib.parse.urlparse(self.path).path.rstrip('/')

            # /easytier endpoint — execute EasyTier action locally (no XMPP DM)
            if path == "/easytier":
                self._handle_easytier(body.get("action", ""))
                return

            to = body.get('to', cfg["muc_rooms"][0])
            msg = body.get('message', '') or body.get('body', '')
            msg_type = body.get('type', 'groupchat')
            if not msg:
                self._reply(400, {"ok": False, "error": "empty message"})
                return

            bot = _xmpp_ref
            if not bot:
                # Do not report success for a message that was never sent.
                self._reply(503, {"ok": False, "error": "xmpp client not ready"})
                return
            # The text is passed as-is; send_message is given the raw body.
            bot.send_message(mto=to, mbody=msg.strip(), mtype=msg_type)
            _record_group_msg(cfg["nick"], msg)
            log(f"[http] → [{to.split('@')[0]}]: {msg[:80]} (type={msg_type})")
            self._reply(200, {"ok": True})
        except Exception as e:
            self._reply(500, {"ok": False, "error": str(e)})

    def _handle_easytier(self, action: str):
        """Run one EasyTier action and reply with its outcome."""
        if action == "start":
            _start_easytier()
            self._reply(200, {"ok": True, "message": "EasyTier started"})
        elif action == "stop":
            _stop_easytier()
            self._reply(200, {"ok": True, "message": "EasyTier stopped"})
        elif action == "status":
            running = _check_easytier()
            self._reply(200, {"ok": True, "running": running})
        else:
            self._reply(400, {"ok": False, "error": "action must be start|stop|status"})

    # ── helpers ──────────────────────────────────────────────────────
    def _reply(self, code, data):
        """Send *data* as a UTF-8 JSON response with status *code*."""
        body = json.dumps(data, ensure_ascii=False).encode('utf-8')
        self.send_response(code)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Silence the default per-request logging."""
        pass
def _start_http_bridge():
    """Serve the HTTP bridge on all interfaces from a daemon thread.

    NOTE(review): binds 0.0.0.0, so the bridge is reachable from the network.
    """
    server = http.server.HTTPServer(('0.0.0.0', _HTTP_PORT), _BridgeHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    log(f"HTTP bridge ready on :{_HTTP_PORT}")
# ═══════════════════════════════════════════════════════════════
# Reply Processing (shared for all LLM responses)
# ═══════════════════════════════════════════════════════════════
def _process_llm_reply(raw_reply: str, room: str):
    """Act on an LLM reply for *room*: grant / delay / exec / send / stay silent.

    Steps, in order:
      1. An empty reply just ends the batch.
      2. A ``[GRANT:nick]`` signal in the reply is applied.
      3. ``##delay:N##`` schedules a later re-invocation and ends the batch.
      4. ``##exec:cmd##`` is run and replaced in the reply by its output.
      5. The remaining text is sent to the room unless it resolves to silence.

    ``_batch_done(room)`` is called on every path.
    """
    if not raw_reply:
        _batch_done(room)
        return

    # Parse GRANT signal from LLM response
    _process_llm_grant(raw_reply)

    # ##delay:N## → schedule later
    delay_m = _DELAY_RE.search(raw_reply)
    if delay_m:
        sec = int(delay_m.group(1)) if delay_m.group(1) else _DELAY_DEFAULT
        _schedule_delayed(sec, room)
        _batch_done(room)
        return

    # ##exec:command## → run command, use output as reply
    exec_m = _EXEC_RE.search(raw_reply)
    if exec_m:
        output = _run_command(exec_m.group(1))
        # Substitute through a function so the output is inserted literally.
        # As a replacement *template*, backslashes in it (Windows paths,
        # "\1", ...) would be treated as escapes and raise or corrupt text.
        raw_reply = _EXEC_RE.sub(lambda _m: output, raw_reply, count=1)

    # Extract actual response
    reply_text = _extract_response(raw_reply)
    if reply_text:
        text = reply_text.strip()
        bot = _xmpp_ref
        if bot:
            bot.send_message(mto=room, mbody=text, mtype='groupchat')
            log(f"-> [{room.split('@')[0]}]: {text[:80]}")
    else:
        log(f"-> [{room.split('@')[0]}]: (silent)")
    _batch_done(room)
# ═══════════════════════════════════════════════════════════════
# Group message handler
# ═══════════════════════════════════════════════════════════════
def _handle_group_message(msg):
    """Process one groupchat stanza (runs in a worker thread).

    Ignored outright: anything received during MAM recovery, duplicates,
    empty bodies and the bot's own messages. Control signals and shut-up
    phrases are consumed. Everything else is batched, or — when the bot is
    mentioned — sent to the LLM immediately.
    """
    global _COORDINATOR, _GRANTED, _REVOKED_UNTIL

    if _is_mam_recovery():
        return
    if _is_duplicate(msg.get("id", "")):
        return
    body = str(msg["body"]).strip()
    if not body:
        return

    full_from = str(msg["from"])
    jid_parts = full_from.split("/")
    room = jid_parts[0]
    nickname = jid_parts[1] if len(jid_parts) > 1 else ""
    room_tag = room.split('@')[0]

    # Self-message skip
    if nickname == cfg["nick"]:
        log(f"(self) {body[:80]}")
        return

    _record_group_msg(nickname, body)

    # Coordinator signals
    if _process_coordinator_signals(nickname, body):
        return

    # Revoke check: an explicit GRANT for this agent lifts the silence.
    is_revoked = time.time() < _REVOKED_UNTIL
    if is_revoked and _GRANTED == cfg["nick"]:
        _GRANTED = None
        is_revoked = False
        log(f"GRANT overrides REVOKE for {cfg['nick']}")

    if _check_shutup(body):
        return

    if is_revoked:
        # Still feed the message to the LLM, but as read-only context.
        body = f"【只读消息】你被收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {room}] {nickname} 说: {body}"

    # Batch or immediate (@mention)
    if _batch_group_message(room, nickname, body):
        log(f"[{room_tag}] {nickname}: {body[:80]} (batched)")
        return

    log(f"[{room_tag}] {nickname}: {body[:80]}")
    raw = _call_llm(body, full_from, is_group=True)
    _process_llm_reply(raw, room)
# ═══════════════════════════════════════════════════════════════
# Private message handler
# ═══════════════════════════════════════════════════════════════
def _handle_private_message(msg):
    """Process one direct (non-group) message and reply to its sender.

    The handler is registered on the generic "message" event, so groupchat
    stanzas arrive here too and are skipped. Error stanzas and stanzas
    without a body are skipped as well, matching the group handler's
    empty-body check; they carry nothing to answer and would otherwise
    trigger an LLM call.

    ``[EasyTier] start|stop`` is executed locally. ``[Kanban]``-prefixed
    bodies are routed to the kanban session. Everything else goes to the
    default LLM session.
    """
    global _CALL_SEQ
    if msg["type"] in ("groupchat", "error"):
        return
    msg_id = msg.get("id", "")
    if _is_duplicate(msg_id):
        return
    body = str(msg["body"]).strip()
    if not body:
        return
    sender = str(msg["from"]).split("/")[0]
    log(f"<{sender}> {body[:80]}")
    if sender == cfg["jid"]:
        log("(skipped self)")
        return
    if time.time() < _REVOKED_UNTIL:
        log(f"(silenced) <{sender}> dropped")
        return
    if _check_shutup(body):
        return

    # ── Kanban routing ──
    _CALL_SEQ += 1
    is_kanban = body.startswith('[Kanban]')
    target_sid = _KANBAN_SESSION_ID if is_kanban else cfg["session_id"]
    if is_kanban:
        log(f"📋 看板通知(#{_CALL_SEQ}): {body[:80]}")

    # ── EasyTier toggle ──
    # NOTE(review): any sender can trigger this; no allow-list is applied.
    if body.startswith('[EasyTier]'):
        action = body.replace('[EasyTier]', '').strip().lower()
        log(f"🔌 EasyTier command: {action}")
        if action == 'start':
            _start_easytier()
            reply_text = "[EasyTier] started on Windows"
        elif action == 'stop':
            _stop_easytier()
            reply_text = "[EasyTier] stopped on Windows"
        else:
            reply_text = f"[EasyTier] unknown action: {action}"
        bot = _xmpp_ref
        if bot:
            bot.send_message(mto=sender, mbody=reply_text, mtype='chat')
            log(f"-> {sender}: {reply_text}")
        return

    raw = _call_llm(body, sender, is_group=False, session_id=target_sid)
    if raw:
        reply = _extract_response(raw)
        if reply:
            text = reply.strip()
            bot = _xmpp_ref
            if bot:
                bot.send_message(mto=sender, mbody=text, mtype='chat')
                log(f"-> {sender}: {text[:80]}")
# ═══════════════════════════════════════════════════════════════
# AgentBot Class
# ═══════════════════════════════════════════════════════════════
import slixmpp
class AgentBot(slixmpp.ClientXMPP):
    """XMPP client for the selected agent.

    Logs in with the configured JID, joins the configured MUC rooms on
    session start, replays recent room history through MAM, and hands every
    incoming message to a worker thread.
    """

    def __init__(self):
        super().__init__(cfg["jid"], cfg["password"])

        # Connection settings
        self.enable_direct_tls = False
        self.enable_starttls = True
        self.auto_reconnect = True
        self.reconnect_max_delay = 10
        self.whitespace_keepalive = True
        self.whitespace_keepalive_interval = 30

        # SSL: accept self-signed certs (hostname and chain checks disabled)
        tls = ssl.create_default_context()
        tls.check_hostname = False
        tls.verify_mode = ssl.CERT_NONE
        self.ssl_context = tls

        # Event handlers
        handlers = (
            ("session_start", self._on_session_start),
            ("message", self._on_any_message),
            ("groupchat_message", self._on_group_msg),
            ("disconnected", self._on_disconnected),
            ("connected", self._on_connected),
            ("session_end", self._on_session_end),
            ("connection_failed", self._on_conn_failed),
        )
        for event_name, callback in handlers:
            self.add_event_handler(event_name, callback)

        # MUC plugin
        self.register_plugin('xep_0045')

    def _on_connected(self, event):
        """Log that the connection is up."""
        log("connection established")

    def _on_session_start(self, event):
        """Announce presence, fetch the roster, then join rooms in the background."""
        self.send_presence()
        self.get_roster()
        log(f"{cfg['jid']} online")

        # Register MAM plugin lazily
        try:
            self.register_plugin('xep_0313')
        except Exception:
            log("(MAM: xep_0313 not available)")

        # Join MUC rooms
        async def _join_all():
            muc = self.plugin['xep_0045']
            for room_jid in cfg["muc_rooms"]:
                try:
                    muc.join_muc(room_jid, cfg["nick"])
                    # NOTE(review): this literal is empty in this copy of the
                    # file — it looks like an XML stanza whose markup was
                    # stripped. Restore it from the original source.
                    presence = (
                        f""
                        f""
                        f""
                        f""
                    )
                    self.send_raw(presence)
                    log(f"Joined {room_jid}")
                except Exception as e:
                    log(f"MUC join failed {room_jid}: {e}")
                await asyncio.sleep(2)
            await asyncio.sleep(3)
            await self._fetch_mam_history()

        asyncio.ensure_future(_join_all())

    async def _fetch_mam_history(self):
        """Query MAM for recent MUC messages to rebuild context.

        For chat_bridge agents each archived message is appended to the
        bridge log. The MAM-recovery flag is cleared on every exit path.
        """
        if 'xep_0313' not in self.plugin:
            log("(MAM: no plugin)")
            _set_mam_done()
            return
        try:
            for room_jid in cfg["muc_rooms"]:
                log(f"(MAM: querying {room_jid} for last 50 messages...)")
                results = await self.plugin['xep_0313'].retrieve(
                    jid=room_jid, rsm={'max': 50},
                )
                count = 0
                for msg in results['mam']['results']:
                    forwarded = msg['mam_result']['forwarded']
                    body = str(forwarded['stanza']['body'] or '').strip()
                    if not body:
                        continue
                    origin = str(forwarded['stanza']['from'])
                    nick = origin.split('/')[-1] if '/' in origin else '?'
                    # Feed into context for chat_bridge (xxm)
                    if _IS_CHAT_BRIDGE:
                        role = 'assistant' if nick == cfg["nick"] else 'user'
                        try:
                            _bridge._append_to_log(role, f"[{nick}]: {body[:300]}")
                        except Exception:
                            pass
                    count += 1
                log(f"(MAM: loaded {count} msgs from {room_jid})")
            _set_mam_done()
            log("(MAM recovery complete)")
        except Exception as e:
            log(f"(MAM error: {e})")
            _set_mam_done()

    def _on_group_msg(self, msg):
        """Hand a groupchat stanza to a worker thread."""
        worker = threading.Thread(target=_handle_group_message, args=[msg], daemon=True)
        worker.start()

    def _on_any_message(self, msg):
        """Hand any message stanza to the private-message worker."""
        worker = threading.Thread(target=_handle_private_message, args=[msg], daemon=True)
        worker.start()

    def _on_session_end(self, event):
        """Log the end of the session."""
        log("session ended")

    def _on_conn_failed(self, event):
        """Log a failed connection attempt."""
        log(f"connection failed: {event}")

    def _on_disconnected(self, event):
        """Log the disconnect."""
        log("disconnected, reconnecting...")
# ═══════════════════════════════════════════════════════════════
# Main
# ═══════════════════════════════════════════════════════════════
def main():
    """Create the bot, start the HTTP bridge, connect, and run the event loop.

    A heartbeat line is logged every 60 seconds. Ctrl-C ends the loop
    quietly; any other exception is logged with a truncated traceback and
    re-raised.
    """
    global _xmpp_ref

    log(f"Starting {cfg['jid']} ({cfg['name_cn']}) — agent={_agent_name}")
    # Conditional expression: only the relevant config key is evaluated.
    log(
        f" LLM: chat_bridge (session={cfg['session_id']})"
        if _IS_CHAT_BRIDGE
        else f" LLM: Hermes API ({cfg['gateway']})"
    )
    log(f" Server: {cfg['server']}:{cfg['port']}")
    log(f" Rooms: {cfg['muc_rooms']}")

    bot = AgentBot()
    _xmpp_ref = bot
    _start_http_bridge()

    bot.connect(host=cfg["server"], port=cfg["port"])
    log(f"Connecting {cfg['jid']}@{cfg['server']}:{cfg['port']}")

    loop = asyncio.get_event_loop()

    async def _status_check():
        while True:
            await asyncio.sleep(60)
            log("(alive)")

    asyncio.ensure_future(_status_check())

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        log("Shutdown by user")
    except Exception as e:
        import traceback
        log(f"!!! MAIN LOOP CRASH: {e}")
        log(f"!!! {traceback.format_exc()[:500]}")
        raise


if __name__ == "__main__":
    main()