diff --git a/gateway/linux/wechat_agent.py b/gateway/linux/wechat_agent.py index 5072031..8bde948 100644 --- a/gateway/linux/wechat_agent.py +++ b/gateway/linux/wechat_agent.py @@ -257,13 +257,36 @@ def fetch_article(url: str) -> dict | None: # ── Main Message Handler ────────────────────────────────────── -def build_message_display_id(msg: IncomingMessage, bot: WeixinBot) -> str: +def detect_group(msg: IncomingMessage) -> str: """ - Build a display identifier for the sender. - Since iLink uses internal user_ids, we use the user_id as the identifier - and supplement with available info. + Detect if a message is from a group chat. + Returns the group_id if it's a group, or empty string if private chat. """ - return msg.user_id + raw = msg.raw + # Check group_id field directly from protocol + gid = raw.get("group_id", "") or "" + if gid: + return gid + # Also check session_id for group patterns (contains 'chatroom' or multiple '@') + session_id = raw.get("session_id", "") or "" + if "@chatroom" in session_id: + return session_id.split("#")[0] if "#" in session_id else session_id + return "" + + +def extract_article_url(text: str) -> str | None: + """Extract article URL from text. Supports WeChat public accounts, Xiaohongshu, and common share links.""" + patterns = [ + r"https?://mp\.weixin\.qq\.com[^\"'\s<)<>\[\]]+", + r"https?://(?:www\.)?xiaohongshu\.com[^\"'\s<)<>\[\]]+", + r"https?://xhslink\.com[^\"'\s<)<>\[\]]+", + r"https?://(?:www\.)?zhihu\.com[^\"'\s<)<>\[\]]+", + ] + for p in patterns: + m = re.search(p, text) + if m: + return m.group(0) + return None async def handle_incoming(msg: IncomingMessage, bot: WeixinBot): @@ -274,81 +297,115 @@ async def handle_incoming(msg: IncomingMessage, bot: WeixinBot): content = msg.text or "" msg_type = msg.type - # ── text message ── + # ── Group chat detection ── + group_id = detect_group(msg) + if group_id: + log.info(f"GROUP message from {group_id}, sender={fu}") + # Use the group_id as the conversation identifier (like Windows version uses roomid) + conv_id = group_id + prefix = "[Group]" + else: + conv_id = fu + prefix = "[Private]" + + # ── TEXT message ── if msg_type == "text": handler_input = content - # Detect forwarded articles - if "mp.weixin.qq.com" in content: - url_match = re.search(r"https?://mp\.weixin\.qq\.com[^\"'\s<)<>\[\]]+", content) - if url_match: - article = fetch_article(url_match.group(0)) - if article: - title = article.get("title", "") - article_text = article.get("content", "")[:2000] - images = article.get("images", 0) - handler_input = ( - f"[老莫转发了一篇文章]\n标题: {title}\n" - + (f"({images}张图片已OCR)\n" if images else "") - + f"\n{article_text}" - ) + # Check for ref_msg (forwarded/quoted content like articles) + ref_text = "" + for item in msg.raw.get("item_list", []): + ref_msg = item.get("ref_msg") + if ref_msg: + ref_title = ref_msg.get("title", "") + ref_item = ref_msg.get("message_item", {}) + if isinstance(ref_item, dict) and ref_item.get("type") == 1: + ref_text_data = ref_item.get("text_item", {}).get("text", "") + if ref_text_data: + ref_text = f"\n[引用消息] {ref_text_data[:500]}" + if ref_title: + handler_input = f"[老莫转发了一篇文章] 标题: {ref_title}\n\n{content}{ref_text}" - reply = call_hermes(fu, handler_input) + # Detect article URLs (WeChat public account, Xiaohongshu, etc.) + article_url = extract_article_url(handler_input) + if article_url: + log.info(f"Article URL detected: {article_url[:80]}") + article = fetch_article(article_url) + if article: + title = article.get("title", "") + article_text = article.get("content", "")[:2000] + images = article.get("images", 0) + handler_input = ( + f"[老莫转发了一篇文章]\n标题: {title}\n" + + (f"({images}张图片已OCR)\n" if images else "") + + f"\n{article_text}" + ) + else: + # Article processor failed, send original content + log.warning("Article processor returned no content, sending raw text") + + reply = call_hermes(conv_id, handler_input) if reply and reply.strip(): - await process_reply(reply, fu, bot) + await process_reply(reply, conv_id, bot) - # ── image message ── + # ── IMAGE message ── elif msg_type == "image": - log.info(f"Image from {fu}, attempting OCR...") + log.info(f"Image from {conv_id}, attempting OCR...") # Get image URL from the raw message img_url = None for item in msg.raw.get("item_list", []): if item.get("type") in (2,): # IMAGE img_item = item.get("image_item", {}) - # Try direct URL first, then CDN img_url = img_item.get("url", "") if not img_url: - # CDN media - need decryption + # CDN media - download via CDN API media = img_item.get("media", {}) - aes_key = media.get("aes_key", "") - encrypt_query = media.get("encrypt_query_param", "") - # For now, log the CDN info and continue - log.info(f"Image has CDN media: aes_key={aes_key[:15]}...") - img_url = None + if media: + log.info("Image has CDN media, attempting CDN download...") + img_url = download_cdn_image(media) break ocr_text = None if img_url: ocr_text = ocr_image_from_url(img_url) + if not ocr_text: + log.warning("OCR returned no text from image URL") else: - log.info("No direct image URL, sending raw iLink image to Hermes for description") - handler_input = "[老莫发送了一张图片]" - reply = call_hermes(fu, handler_input) - if reply and reply.strip(): - await bot.reply(msg, reply.strip()) - return + log.info("No image URL available, cannot OCR") if ocr_text: handler_input = f"[老莫发送了一张图片,OCR识别结果如下]\n{ocr_text}" else: - handler_input = "[老莫发送了一张图片,但OCR识别失败,无法读取内容]" + handler_input = "[老莫发送了一张图片,但无法识别图片内容]" - reply = call_hermes(fu, handler_input) + reply = call_hermes(conv_id, handler_input) if reply and reply.strip(): await bot.reply(msg, reply.strip()) - # ── voice message ── + # ── VOICE message ── elif msg_type == "voice": - reply = call_hermes(fu, "[voice message]") + reply = call_hermes(conv_id, "[voice message]") if reply and reply.strip(): await bot.reply(msg, reply.strip()) - # ── unknown type ── + # ── Unknown type ── else: log.info(f"Unhandled message type: {msg_type}") +def download_cdn_image(media: dict) -> str | None: + """ + Download an image from WeChat CDN using the iLink media protocol. + The media dict contains aes_key and encrypt_query_param for AES-128-ECB decryption. + For now, this is a placeholder - CDN download requires AES decryption. + """ + logger = log + logger.info(f"CDN media available but direct download not yet implemented") + logger.debug(f"CDN media keys: {list(media.keys())}") + return None + + async def process_reply(reply: str, fu: str, bot: WeixinBot): """ Process Hermes reply text, handling tags like [FILE], [IMG], [EMOJI]. diff --git a/xmpp_agent_core.py b/xmpp_agent_core.py index 81cd617..1b5d410 100644 --- a/xmpp_agent_core.py +++ b/xmpp_agent_core.py @@ -1,35 +1,9 @@ #!/usr/bin/env python3 -""" -XMPP Agent Core — 统一版 -========================= -Single bot core for all agents. Supports --agent xxm|mohe|zhiwei|xiaoguo. - -Usage: - python xmpp_agent_core.py --agent xxm # xxm, uses chat_bridge - python xmpp_agent_core.py --agent mohe # mohe, uses Hermes API - python xmpp_agent_core.py --agent zhiwei # zhiwei, uses Hermes API - python xmpp_agent_core.py --agent xiaoguo # xiaoguo, uses Hermes API - -Shares: PID lock, reconnect, MUC join, dedup, batching, - coordinator protocol (GRANT/REVOKE), HTTP bridge. -Differs only in LLM calling method (chat_bridge vs Hermes API). -""" -import os, sys, time, threading, asyncio, logging, json, re, ssl -import urllib.request, http.server, urllib.parse - -# ── Windows selector loop (slixmpp needs it on Windows) ── -if sys.platform == "win32": - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - -# ── PATH: allow imports from gateway/scripts/ (proc_guard, chat_bridge) ── -_GATEWAY_SCRIPTS = os.path.join(os.path.dirname(os.path.abspath(__file__)), - "gateway", "scripts") -sys.path.insert(0, _GATEWAY_SCRIPTS) - -# ═══════════════════════════════════════════════════════════════ -# AGENTS Configuration -# ═══════════════════════════════════════════════════════════════ +"""XMPP Bot - 统一版,支持 --agent mohe|zhiwei|xiao 参数""" +import asyncio, logging, ssl, json, urllib.request, os, time, sys, re +from slixmpp import ClientXMPP +# ── Agent 配置 ────────────────────────────────────────────── AGENTS = { "mohe": { "jid": "mohe@yoin.fun", @@ -39,9 +13,7 @@ AGENTS = { "http_port": 5804, "gateway": "http://localhost:8642/v1/chat/completions", "session_id": "xmpp-mohe-v2", - "server": "127.0.0.1", - "port": 5222, - "muc_rooms": ["coregroup@conference.yoin.fun"], + "kanban_session_id": "xmpp-mohe-kanban", "mention": "@mohe/@莫荷", }, "zhiwei": { @@ -51,10 +23,8 @@ AGENTS = { "name_cn": "知微", "http_port": 5805, "gateway": "http://localhost:8643/v1/chat/completions", - "session_id": "xmpp-zhiwei", - "server": "127.0.0.1", - "port": 5222, - "muc_rooms": ["coregroup@conference.yoin.fun"], + "session_id": "xmpp-zhiwei-v2", + "kanban_session_id": "xmpp-zhiwei-kanban", "mention": "@zhiwei/@知微", }, "xiaoguo": { @@ -66,959 +36,362 @@ AGENTS = { "gateway": "http://localhost:8645/v1/chat/completions", "session_id": "xmpp-xiaoguo", "kanban_session_id": "xmpp-xiaoguo-kanban", - "server": "127.0.0.1", - "port": 5222, - "muc_rooms": ["coregroup@conference.yoin.fun"], "mention": "@xiaoguo/@小果", }, - "xxm": { - "jid": "xxm@yoin.fun", - "password": "hermes123", - "nick": "xxm", - "name_cn": "笑笑", - "http_port": 5802, - "bridge": "chat_bridge", # use local chat_bridge instead of Hermes API - "session_id": "ses_xxm_xmpp", - "kanban_session_id": "xmpp-xxm-kanban", - "server": "192.168.1.246", # LAN direct connect - "port": 5222, - "muc_rooms": [ - "coregroup@conference.yoin.fun", - "jujidina@conference.yoin.fun", - ], - "mention": "@xxm/@笑笑", - }, } -# ── Agent selection ── -_agent_name = "mohe" -if "--agent" in sys.argv: - idx = sys.argv.index("--agent") - if idx + 1 < len(sys.argv): - _agent_name = sys.argv[idx + 1] -cfg = AGENTS.get(_agent_name, AGENTS["mohe"]) - -# ═══════════════════════════════════════════════════════════════ -# PID Lock — prevent duplicate instances -# ═══════════════════════════════════════════════════════════════ - -from proc_guard import guard as _proc_guard -_lock = _proc_guard(f"xmpp_bot_{_agent_name}") -if not _lock.ok: - print(_lock.message, flush=True) - sys.exit(1) - -# ═══════════════════════════════════════════════════════════════ -# Logging -# ═══════════════════════════════════════════════════════════════ - -_LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "gateway", "logs") -os.makedirs(_LOG_DIR, exist_ok=True) -_LOG_FILE = os.path.join(_LOG_DIR, f"xmpp_{_agent_name}.log") -_START_TIME = time.time() - - -def log(m: str): - with open(_LOG_FILE, "a", encoding="utf-8") as f: - f.write(f"{time.strftime('%H:%M:%S')} {m}\n") - +agent = sys.argv[sys.argv.index("--agent") + 1] if "--agent" in sys.argv else "mohe" +cfg = AGENTS.get(agent, AGENTS["mohe"]) logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s') - -# ═══════════════════════════════════════════════════════════════ -# LLM Bridge Init — abstracted per agent -# ═══════════════════════════════════════════════════════════════ - -_IS_CHAT_BRIDGE = cfg.get("bridge") == "chat_bridge" -_router = None # set only for chat_bridge (xxm) - -# ── Kanban session support ── -_CALL_SEQ = 0 -_KANBAN_SESSION_ID = cfg.get("kanban_session_id", None) - -if _IS_CHAT_BRIDGE: - from chat_bridge import SessionBridge - from session_router import SessionRouter - _bridge = SessionBridge(session_id=cfg["session_id"]) - _router = SessionRouter(bridge=_bridge, default_session=cfg["session_id"]) - # Kanban-dedicated bridge + router for separate session - _kanban_sid = _KANBAN_SESSION_ID or f"{cfg['session_id']}-kanban" - _kanban_bridge = SessionBridge(session_id=_kanban_sid) - _kanban_router = SessionRouter(bridge=_kanban_bridge, default_session=_kanban_sid) - log(f"LLM: chat_bridge (session={cfg['session_id']})") - log(f"Kanban: chat_bridge (session={_kanban_sid})") -else: - _opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) - log(f"LLM: Hermes API ({cfg['gateway']})") - - -def _call_llm(content: str, sender: str, is_group: bool = False, - session_id: str | None = None) -> str: - """Abstract LLM call. Returns raw response text (or empty string). - If session_id provided and differs from default, route to kanban handler.""" - if _IS_CHAT_BRIDGE: - if session_id and session_id != cfg["session_id"]: - # Prepends kanban-handler instructions so LLM knows how to handle - kanban_content = ( - "【看板处理协议】\n" - "收到卡片后三步判断:\n" - " A) 任务明确 → 直接执行 → 评论结果 + 更新状态\n" - " B) 信息不足 → 评论提问 + 设为 blocked\n" - " C) 不是我的活 → 评论说明 + 转派(如果能判断)\n" - "\n" - "汇报规则:\n" - " - 任务完成 → 简短 DM 给老莫摘要\n" - " - 评论/状态变更 → 不汇报\n" - " - 追问/转派 → 不汇报\n" - "\n" - "可用 API:\n" - " curl http://192.168.1.246:5803/api/kanban/t_xxx 查看卡片详情\n" - " 更新操作走 Kanban Dashboard UI\n" - "\n" - "卡片上下文不够?→ 用 session_search 查历史\n" - f"---\n{content}" - ) - return _kanban_router.route("xmpp", sender, kanban_content) or "" - return _router.route("xmpp", sender, content) or "" - else: - return _call_hermes_api(content, session_id) - - -def _call_hermes_api(content: str, session_id: str | None = None) -> str: - """POST to Hermes API, return response text or empty string.""" - target_sid = session_id or cfg["session_id"] - try: - payload = json.dumps({ - "model": "hermes-agent", - "messages": [{"role": "user", "content": content}] - }).encode() - req = urllib.request.Request(cfg["gateway"], data=payload, method="POST") - req.add_header("Content-Type", "application/json") - req.add_header("Authorization", "Bearer hermes123") - req.add_header("X-Hermes-Session-Id", target_sid) - result = _opener.open(req, timeout=600) - data = json.loads(result.read()) - reply = data.get("choices", [{}])[0].get("message", {}).get("content", "") - return reply.strip() - except Exception as e: - log(f"!!! Hermes API error: {e}") - return "" - - -# ═══════════════════════════════════════════════════════════════ -# EasyTier control (Windows) -# ═══════════════════════════════════════════════════════════════ - -_EASYTIER_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), - "tools", "easytier") -_EASYTIER_CORE = os.path.join(_EASYTIER_DIR, "easytier-core.exe") -_EASYTIER_PID_FILE = os.path.join(_EASYTIER_DIR, "easytier.pid") -_EASYTIER_NET = "--network-name mynet --network-secret ce75d0a5" -_EASYTIER_RELAY = "--peers tcp://47.115.32.206:11010" -_EASYTIER_IP = "--ipv4 10.144.144.3" - - -def _start_easytier(): - """Start EasyTier on Windows.""" - import subprocess as _sp - if not os.path.exists(_EASYTIER_CORE): - log(f"EasyTier binary not found: {_EASYTIER_CORE}") - return - # Check if already running - if os.path.exists(_EASYTIER_PID_FILE): - try: - with open(_EASYTIER_PID_FILE) as f: - old_pid = int(f.read().strip()) - _sp.run(["taskkill", "/f", "/pid", str(old_pid)], capture_output=True, timeout=5) - log(f"Killed old EasyTier (PID {old_pid})") - except Exception: - pass - # Start - cmd = f'start /b "" "{_EASYTIER_CORE}" {_EASYTIER_NET} {_EASYTIER_RELAY} {_EASYTIER_IP} --disable-encryption' - try: - _sp.run(cmd, shell=True, timeout=5) - log("EasyTier start command issued") - except Exception as e: - log(f"EasyTier start error: {e}") - - -def _stop_easytier(): - """Stop EasyTier on Windows.""" - import subprocess as _sp - try: - _sp.run(["taskkill", "/f", "/im", "easytier-core.exe"], capture_output=True, timeout=5) - log("EasyTier stopped (taskkill)") - except Exception as e: - log(f"EasyTier stop error: {e}") - # Clean up PID file - try: - if os.path.exists(_EASYTIER_PID_FILE): - os.remove(_EASYTIER_PID_FILE) - except Exception: - pass - - -def _check_easytier() -> bool: - """Check if EasyTier is running on Windows.""" - import subprocess as _sp - try: - r = _sp.run(["tasklist", "/fi", "imagename eq easytier-core.exe"], - capture_output=True, text=True, timeout=5) - return "easytier-core.exe" in r.stdout - except Exception: - return False - - -# ═══════════════════════════════════════════════════════════════ -# Message Dedup -# ═══════════════════════════════════════════════════════════════ - -_DEDUP_CACHE: set[str] = set() -_DEDUP_LOCK = threading.Lock() - - -def _is_duplicate(msg_id: str) -> bool: - if not msg_id: - return False - with _DEDUP_LOCK: - if msg_id in _DEDUP_CACHE: - return True - _DEDUP_CACHE.add(msg_id) - if len(_DEDUP_CACHE) > 100: - _DEDUP_CACHE.clear() - return False - - -# ═══════════════════════════════════════════════════════════════ -# Coordinator Protocol (shared across all agents) -# ═══════════════════════════════════════════════════════════════ - -_COORDINATOR: str = "mohe" -_GRANTED: str | None = None -_REVOKED_UNTIL: float = 0.0 -_SHUTUP_PATTERNS = ["闭嘴", "别说话", "安静", "shut", "stfu", "别说了", "停"] - - -def _process_coordinator_signals(nickname: str, body: str) -> bool: - """Parse coordinator/GRANT/REVOKE from incoming messages. - Returns True if message was a control signal (consumed, no further processing).""" - global _COORDINATOR, _GRANTED, _REVOKED_UNTIL - # 1. hmo switches coordinator - if nickname == 'hmo' and 'coordinator=' in body.lower(): - for name in ('mohe', 'zhiwei', 'xxm'): - if f'coordinator={name}' in body.lower(): - _COORDINATOR = name - _GRANTED = None - log(f"Coordinator switched to {name} by hmo") - return True - # 2. GRANT signal (overrides REVOKE) - gm = re.search(r'\[GRANT:(\w+)\]', body) - if gm: - _GRANTED = gm.group(1) - _REVOKED_UNTIL = 0 - log(f"GRANT: {_GRANTED}") - return True - # 3. REVOKE signal (5min auto-restore) - rm = re.search(r'\[REVOKE:(\w+)\]', body) - if rm and rm.group(1) == cfg["nick"]: - _REVOKED_UNTIL = time.time() + 300 - log(f"REVOKEd: {cfg['nick']} silenced for 5min") - return True - return False - - -def _check_shutup(body: str) -> bool: - """hmo says shut up → 5min silence.""" - lower = body.lower().strip() - for pat in _SHUTUP_PATTERNS: - if pat.lower() in lower: - _REVOKED_UNTIL = time.time() + 300 - log(f"(shutup: '{pat}' → 5min silence)") - return True - return False - - -def _process_llm_grant(response: str): - """Parse GRANT signal from LLM's own response.""" - global _GRANTED - gm = re.search(r'\[GRANT:(\w+)\]', response) - if gm: - _GRANTED = gm.group(1) - _REVOKED_UNTIL = 0 - log(f"LLM GRANT: {_GRANTED}") - - -# ═══════════════════════════════════════════════════════════════ -# Response Extraction -# ═══════════════════════════════════════════════════════════════ - -_SILENCE_PATTERNS = [ - "保持沉默", "不应[该]?回复", "没有.*@.*我", "不是对[我我说]", - "跟我无关", "我不用回复", "不该回复", "不参与", - "不是我[应]?该[说回]", -] -_EXEC_RE = re.compile(r"##exec:(.+?)##", re.DOTALL) -_DELAY_RE = re.compile(r"##delay:?(\d+)?##") -_DELAY_DEFAULT = 15 -_EXEC_TIMEOUT = 60 - - -def _strip_toolcall_xml(text: str) -> str: - t = text - t = re.sub(r']*>.*?(|$)', '', t, flags=re.DOTALL) - t = re.sub(r'.*?(|$)', '', t, flags=re.DOTALL) - t = re.sub(r']*>.*?(|$)', '', t, flags=re.DOTALL) - t = re.sub(r'.*?(|$)', '', t, flags=re.DOTALL) - return t.strip() - - -def _extract_response(text: str) -> str | None: - """Strip __SILENT__, reasoning blocks, natural language silence. - Returns actual content to send, or None to stay silent.""" - if not text: - return None - t = text.strip() - if not t: - return None - t = _strip_toolcall_xml(t) - # Natural language silence detection - if not t.startswith("__SILENT__"): - first = t.split("\n", 1)[0] - for pat in _SILENCE_PATTERNS: - if re.search(pat, first): - return None - return t - # Has __SILENT__ prefix - parts = t.split("\n", 1) - if len(parts) < 2: - return None - rest = parts[1].strip() - while True: - m = re.match(r'^([^)]*)\s*', rest) - if m: - rest = rest[m.end():] - continue - m = re.match(r'^\([^)]*\)\s*', rest) - if m: - rest = rest[m.end():] - continue - break - return rest.strip() or None - - -# ═══════════════════════════════════════════════════════════════ -# Message Batching (3s debounce + serialized processing) -# ═══════════════════════════════════════════════════════════════ - -_BATCH_WINDOW = 3.0 -_BATCH_TIMEOUT = 300 -_batch_entries: dict[str, list[str]] = {} -_batch_timers: dict[str, threading.Timer] = {} -_batch_processing: set[str] = set() -_batch_pending: dict[str, list[str]] = {} -_batch_lock = threading.Lock() -_BOT_NICK = cfg["nick"] - - -def _run_command(cmd: str) -> str: - """Execute ##exec:command## shell command.""" - log(f"(exec: {cmd[:120]})") - try: - import subprocess - r = subprocess.run(cmd, shell=True, capture_output=True, - timeout=_EXEC_TIMEOUT, text=True, encoding='utf-8', errors='replace') - out = (r.stdout or "") + (r.stderr or "") - out = out.strip() or f"(no output, exit={r.returncode})" - log(f"(exec done: {len(out)} bytes, exit={r.returncode})") - return out - except subprocess.TimeoutExpired: - log(f"(exec timeout >{_EXEC_TIMEOUT}s)") - return "(命令超时)" - except Exception as e: - log(f"(exec error: {e})") - return f"(命令执行失败: {e})" - - -def _schedule_delayed(delay_sec: int, room: str): - """Schedule ##delay:N## re-invocation.""" - global _xmpp_ref - import subprocess as _sp - - def _fire(): - bot = _xmpp_ref - if not bot: - return - try: - prompt = "时间到,请根据最新的信息汇报结果。" - raw = _call_llm(prompt, room, is_group=True) - reply = _extract_response(raw) - if reply: - text = reply.strip() - bot.send_message(mto=room, mbody=text, mtype='groupchat') - log(f"-> [Delay][{room}]: {text[:80]}") - except Exception as e: - log(f"!! delay err: {e}") - - t = threading.Timer(delay_sec, _fire) - t.daemon = True - t.start() - log(f"(delay +{delay_sec}s → {room})") - - -def _batch_done(room: str): - """Called when batch LLM finishes. Flush pending if any.""" - with _batch_lock: - _batch_processing.discard(room) - pending = _batch_pending.pop(room, None) - if pending: - _batch_entries[room] = pending - t = threading.Timer(0.1, _fire_batch, args=[room]) - t.daemon = True - t.start() - _batch_timers[room] = t - return - log(f"[Batch][{room}] (idle)") - - -def _fire_batch(room: str): - """Collect batched entries and call LLM.""" - with _batch_lock: - entries = _batch_entries.pop(room, None) - _batch_timers.pop(room, None) - if not entries: - return - _batch_processing.add(room) - combined = "\n".join(entries) - - def _handle(): - timed_out = [False] - - def _timeout(): - timed_out[0] = True - log(f"[Batch][{room}] TIMEOUT ({_BATCH_TIMEOUT}s)") - _batch_done(room) - - timer = threading.Timer(_BATCH_TIMEOUT, _timeout) - timer.daemon = True - timer.start() - try: - raw = _call_llm(combined, room, is_group=True) - if not timed_out[0]: - timer.cancel() - _process_llm_reply(raw, room) - else: - log(f"[Batch][{room}] route returned after timeout, discarded") - except Exception as e: - log(f"!!! BATCH: {e}") - if not timed_out[0]: - timer.cancel() - _batch_done(room) - - threading.Thread(target=_handle, daemon=True).start() - - -def _batch_group_message(room: str, nickname: str, body: str) -> bool: - """Add message to room batch. Returns True if batched, False if @mention (immediate).""" - if f"@{_BOT_NICK}" in body or body.startswith(_BOT_NICK): - return False - formatted = f"[{nickname}]: {body}" - with _batch_lock: - if room in _batch_processing: - _batch_pending.setdefault(room, []).append(formatted) - return True - timer = _batch_timers.pop(room, None) - if timer: - timer.cancel() - _batch_entries.setdefault(room, []).append(formatted) - t = threading.Timer(_BATCH_WINDOW, _fire_batch, args=[room]) - t.daemon = True - t.start() - _batch_timers[room] = t - return True - - -# ═══════════════════════════════════════════════════════════════ -# MAM Recovery — fetch recent history on startup -# ═══════════════════════════════════════════════════════════════ - -_MAM_RECOVERY = True -_MAM_RECOVERY_LOCK = threading.Lock() -_MAM_MARK_DONE = False -_MAM_TIMEOUT = 30 - - -def _set_mam_done(): - global _MAM_RECOVERY - with _MAM_RECOVERY_LOCK: - _MAM_RECOVERY = False - - -def _is_mam_recovery() -> bool: - if time.time() - _START_TIME > _MAM_TIMEOUT: - global _MAM_RECOVERY - with _MAM_RECOVERY_LOCK: - if _MAM_RECOVERY: - _MAM_RECOVERY = False - log("(MAM recovery timed out, force-disabled)") - return _MAM_RECOVERY - with _MAM_RECOVERY_LOCK: - return _MAM_RECOVERY - - -# ═══════════════════════════════════════════════════════════════ -# XML Escape -# ═══════════════════════════════════════════════════════════════ - -def _escape(text: str) -> str: - return (text.replace("&", "&").replace("<", "<") - .replace(">", ">").replace('"', """)) - - -# ═══════════════════════════════════════════════════════════════ -# HTTP Bridge — health, presence, messages, POST send -# ═══════════════════════════════════════════════════════════════ - -_HTTP_PORT = cfg["http_port"] -_MSG_BUF: list[dict] = [] -_MSG_BUF_LOCK = threading.Lock() -_xmpp_ref = None # set after bot creation - - -def _record_group_msg(nickname: str, body: str): - ts = time.strftime("%H:%M:%S") - with _MSG_BUF_LOCK: - _MSG_BUF.append({"ts": ts, "from": nickname, "body": body}) - if len(_MSG_BUF) > 200: - _MSG_BUF[:] = _MSG_BUF[-150:] - - -class _BridgeHandler(http.server.BaseHTTPRequestHandler): - def do_GET(self): - parsed = urllib.parse.urlparse(self.path) - if parsed.path == "/muc": - try: - muc_info = {"rooms": {}} - bot = _xmpp_ref - if bot is not None and 'xep_0045' in bot.plugin: - muc_plugin = bot.plugin['xep_0045'] - for room_jid in cfg["muc_rooms"]: - room_data = {"jid": room_jid, "participants": []} - try: - if room_jid in muc_plugin.rooms: - room = muc_plugin.rooms[room_jid] - for nick, info in room.get('roster', {}).items(): - room_data["participants"].append({ - "nick": nick, - "jid": str(info.get('jid', '')), - "affiliation": str(info.get('affiliation', '')), - "role": str(info.get('role', '')), - }) - except Exception as e: - room_data["error"] = str(e) - muc_info["rooms"][room_jid] = room_data - self._reply(200, muc_info) - except Exception as e: - self._reply(500, {"ok": False, "error": str(e)}) - return - if parsed.path == "/health": - try: - bot = _xmpp_ref - session_ok = bot.session_started_event.is_set() if (bot and hasattr(bot, 'session_started_event')) else False - socket_ok = bot.is_connected() if (bot and hasattr(bot, 'is_connected')) else False - self._reply(200, { - "ok": True, "xmpp_connected": session_ok or socket_ok, - "agent": _agent_name, "jid": cfg["jid"], - "uptime_sec": int(time.time() - _START_TIME), - "muc_rooms": cfg["muc_rooms"], - }) - except Exception as e: - self._reply(500, {"ok": False, "error": str(e)}) - return - if parsed.path.startswith("/presence"): - jid_to_check = parsed.path[len("/presence/"):].strip() - if not jid_to_check: - self._reply(400, {"ok": False, "error": "missing JID"}) - return - try: - info = {"jid": jid_to_check, "online": False, "resources": []} - bot = _xmpp_ref - if bot and hasattr(bot, 'client_roster'): - roster = bot.client_roster - if jid_to_check in roster: - resources = list(roster[jid_to_check].resources.keys()) - info["online"] = len(resources) > 0 - info["resources"] = resources - self._reply(200, info) - except Exception as e: - self._reply(500, {"ok": False, "error": str(e)}) - return - if parsed.path == "/messages": - try: - qs = urllib.parse.parse_qs(parsed.query) - sender = qs.get("from", [None])[0] - with _MSG_BUF_LOCK: - msgs = list(_MSG_BUF) - if sender: - msgs = [m for m in msgs if m["from"] == sender] - self._reply(200, {"ok": True, "count": len(msgs), "messages": msgs[-50:]}) - except Exception as e: - self._reply(500, {"ok": False, "error": str(e)}) - return - self._reply(404, {"ok": False, "error": "not found"}) - +GATEWAY = cfg["gateway"] +API_KEY = "hermes123" +AGENT_NICK = cfg["nick"] +AGENT_NAME = cfg["name_cn"] +AGENT_JID = cfg["jid"] +AGENT_MENTION = cfg["mention"] +SESSION_ID = cfg["session_id"] +KANBAN_SESSION_ID = cfg.get("kanban_session_id", SESSION_ID) +HTTP_PORT = cfg["http_port"] +_opener = urllib.request.build_opener(urllib.request.ProxyHandler({})) + +# ── HTTP 桥(接收本地脚本的主动发送请求) ── +from http.server import HTTPServer, BaseHTTPRequestHandler +import threading, json as json_mod + +_send_queue = [] + +class SendHandler(BaseHTTPRequestHandler): def do_POST(self): + length = int(self.headers.get('Content-Length', 0)) + body = self.rfile.read(length) try: - length = int(self.headers.get('Content-Length', 0)) - body = json.loads(self.rfile.read(length)) - path = urllib.parse.urlparse(self.path).path.rstrip('/') - - # /easytier endpoint — execute EasyTier action locally (no XMPP DM) - if path == "/easytier": - action = body.get("action", "") - if action == "start": - _start_easytier() - self._reply(200, {"ok": True, "message": "EasyTier started"}) - elif action == "stop": - _stop_easytier() - self._reply(200, {"ok": True, "message": "EasyTier stopped"}) - elif action == "status": - running = _check_easytier() - self._reply(200, {"ok": True, "running": running}) - else: - self._reply(400, {"ok": False, "error": "action must be start|stop|status"}) - return - - to = body.get('to', cfg["muc_rooms"][0]) - msg = body.get('message', '') or body.get('body', '') - msg_type = body.get('type', 'groupchat') - if not msg: - self._reply(400, {"ok": False, "error": "empty message"}) - return - safe = _escape(msg.strip()) - bot = _xmpp_ref - if bot: - bot.send_message(mto=to, mbody=msg.strip(), mtype=msg_type) - _record_group_msg(cfg["nick"], msg) - log(f"[http] → [{to.split('@')[0]}]: {msg[:80]} (type={msg_type})") - self._reply(200, {"ok": True}) + data = json_mod.loads(body) + room = data.get('to', 'coregroup@conference.yoin.fun') + text = data.get('body', '') + if text: + _send_queue.append((room, text)) + self.send_response(200) + self.end_headers() + self.wfile.write(b'{"ok":true}') + else: + self.send_response(400) + self.end_headers() + self.wfile.write(b'{"ok":false,"error":"empty body"}') except Exception as e: - self._reply(500, {"ok": False, "error": str(e)}) + self.send_response(500) + self.end_headers() + self.wfile.write(f'{{"ok":false,"error":"{e}"}}'.encode()) - def _reply(self, code, data): - body = json.dumps(data, ensure_ascii=False).encode('utf-8') - self.send_response(code) - self.send_header('Content-Type', 'application/json; charset=utf-8') - self.send_header('Content-Length', len(body)) - self.end_headers() - self.wfile.write(body) +def _run_http(): + server = HTTPServer(('127.0.0.1', HTTP_PORT), SendHandler) + server.timeout = 1.0 + while True: + server.handle_request() - def log_message(self, format, *args): - pass +threading.Thread(target=_run_http, daemon=True).start() +logging.info(f"🚀 {AGENT_NAME} HTTP 桥启动于 :{HTTP_PORT}") - -def _start_http_bridge(): - _httpd = http.server.HTTPServer(('0.0.0.0', _HTTP_PORT), _BridgeHandler) - _t = threading.Thread(target=_httpd.serve_forever, daemon=True) - _t.start() - log(f"HTTP bridge ready on :{_HTTP_PORT}") - - -# ═══════════════════════════════════════════════════════════════ -# Reply Processing (shared for all LLM responses) -# ═══════════════════════════════════════════════════════════════ - -def _process_llm_reply(raw_reply: str, room: str): - """Process LLM response: check silence/delay/exec/send.""" - global _xmpp_ref - if not raw_reply: - _batch_done(room) - return - # Parse GRANT signal from LLM response - _process_llm_grant(raw_reply) - # ##delay:N## → schedule later - delay_m = _DELAY_RE.search(raw_reply) - if delay_m: - sec = int(delay_m.group(1)) if delay_m.group(1) else _DELAY_DEFAULT - _schedule_delayed(sec, room) - _batch_done(room) - return - # ##exec:command## → run command, use output as reply - exec_m = _EXEC_RE.search(raw_reply) - if exec_m: - output = _run_command(exec_m.group(1)) - raw_reply = _EXEC_RE.sub(output, raw_reply, count=1) - # Extract actual response - reply_text = _extract_response(raw_reply) - if reply_text: - text = reply_text.strip() - bot = _xmpp_ref - if bot: - bot.send_message(mto=room, mbody=text, mtype='groupchat') - log(f"-> [{room.split('@')[0]}]: {text[:80]}") - else: - log(f"-> [{room.split('@')[0]}]: (silent)") - _batch_done(room) - - -# ═══════════════════════════════════════════════════════════════ -# Group message handler -# ═══════════════════════════════════════════════════════════════ - -def _handle_group_message(msg): - """Process a groupchat message (runs in thread).""" - global _COORDINATOR, _GRANTED, _REVOKED_UNTIL - if _is_mam_recovery(): - return - msg_id = msg.get("id", "") - if _is_duplicate(msg_id): - return - body = str(msg["body"]).strip() - if not body: - return - full_from = str(msg["from"]) - room = full_from.split("/")[0] - nickname = full_from.split("/")[1] if "/" in full_from else "" - # Self-message skip - if nickname == cfg["nick"]: - log(f"(self) {body[:80]}") - return - _record_group_msg(nickname, body) - # Coordinator signals - if _process_coordinator_signals(nickname, body): - return - # Revoke check - is_revoked = time.time() < _REVOKED_UNTIL - if is_revoked and _GRANTED == cfg["nick"]: - _GRANTED = None - is_revoked = False - log(f"GRANT overrides REVOKE for {cfg['nick']}") - if _check_shutup(body): - return - if is_revoked: - body = f"【只读消息】你被收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {room}] {nickname} 说: {body}" - # Batch or immediate (@mention) - if _batch_group_message(room, nickname, body): - log(f"[{room.split('@')[0]}] {nickname}: {body[:80]} (batched)") - return - log(f"[{room.split('@')[0]}] {nickname}: {body[:80]}") - raw = _call_llm(body, full_from, is_group=True) - _process_llm_reply(raw, room) - - -# ═══════════════════════════════════════════════════════════════ -# Private message handler -# ═══════════════════════════════════════════════════════════════ - -def _handle_private_message(msg): - """Process a private chat message.""" - global _CALL_SEQ - if msg["type"] == "groupchat": - return - msg_id = msg.get("id", "") - if _is_duplicate(msg_id): - return - body = str(msg["body"]).strip() - sender = str(msg["from"]).split("/")[0] - log(f"<{sender}> {body[:80]}") - if sender == cfg["jid"]: - log("(skipped self)") - return - if time.time() < _REVOKED_UNTIL: - log(f"(silenced) <{sender}> dropped") - return - if _check_shutup(body): - return - # ── Kanban routing ── - _CALL_SEQ += 1 - is_kanban = body.startswith('[Kanban]') - target_sid = _KANBAN_SESSION_ID if is_kanban else cfg["session_id"] - if is_kanban: - log(f"📋 看板通知(#{_CALL_SEQ}): {body[:80]}") - # ── EasyTier toggle ── - if body.startswith('[EasyTier]'): - action = body.replace('[EasyTier]', '').strip().lower() - log(f"🔌 EasyTier command: {action}") - if action == 'start': - _start_easytier() - reply_text = "[EasyTier] started on Windows" - elif action == 'stop': - _stop_easytier() - reply_text = "[EasyTier] stopped on Windows" - else: - reply_text = f"[EasyTier] unknown action: {action}" - bot = _xmpp_ref - if bot: - bot.send_message(mto=sender, mbody=reply_text, mtype='chat') - log(f"-> {sender}: {reply_text}") - return - raw = _call_llm(body, sender, is_group=False, session_id=target_sid) - if raw: - reply = _extract_response(raw) - if reply: - text = reply.strip() - bot = _xmpp_ref - if bot: - bot.send_message(mto=sender, mbody=text, mtype='chat') - log(f"-> {sender}: {text[:80]}") - - -# ═══════════════════════════════════════════════════════════════ -# AgentBot Class -# ═══════════════════════════════════════════════════════════════ - -import slixmpp - - -class AgentBot(slixmpp.ClientXMPP): +# ── XMPP Bot 类 ──────────────────────────────────────────────── +class AgentBot(ClientXMPP): def __init__(self): - super().__init__(cfg["jid"], cfg["password"]) - # Connection settings - self.enable_direct_tls = False - self.enable_starttls = True - self.auto_reconnect = True - self.reconnect_max_delay = 10 - self.whitespace_keepalive = True - self.whitespace_keepalive_interval = 30 - # SSL: accept self-signed certs + super().__init__(AGENT_JID, cfg["password"]) + self.add_event_handler('session_bind', self.on_bind) + self.add_event_handler('message', self.on_msg) + self.add_event_handler('disconnected', self.on_disconnect) + self.add_event_handler('connected', self.on_connected) ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE self.ssl_context = ctx - # Event handlers - self.add_event_handler("session_start", self._on_session_start) - self.add_event_handler("message", self._on_any_message) - self.add_event_handler("groupchat_message", self._on_group_msg) - self.add_event_handler("disconnected", self._on_disconnected) - self.add_event_handler("connected", self._on_connected) - self.add_event_handler("session_end", self._on_session_end) - self.add_event_handler("connection_failed", self._on_conn_failed) - # MUC plugin - self.register_plugin('xep_0045') + self.ready = asyncio.Event() + self._call_seq = 0 + self._muc_joined = False + self._recent_sent = [] + self._coordinator = 'mohe' # 默认协调者 + self._granted = None - def _on_connected(self, event): - log("connection established") + async def on_connected(self, event): + logging.info(f"🔗 {AGENT_NAME} TCP连接已建立") - def _on_session_start(self, event): + async def on_bind(self, event): self.send_presence() self.get_roster() - log(f"{cfg['jid']} online") - # Register MAM plugin lazily try: - self.register_plugin('xep_0313') - except Exception: - log("(MAM: xep_0313 not available)") - # Join MUC rooms - async def _join_all(): - for room_jid in cfg["muc_rooms"]: - try: - self.plugin['xep_0045'].join_muc(room_jid, cfg["nick"]) - presence = ( - f"" - f"" - f"" - f"" - ) - self.send_raw(presence) - log(f"Joined {room_jid}") - except Exception as e: - log(f"MUC join failed {room_jid}: {e}") - await asyncio.sleep(2) - await asyncio.sleep(3) - await self._fetch_mam_history() - asyncio.ensure_future(_join_all()) - - async def _fetch_mam_history(self): - """Query MAM for recent MUC messages to rebuild context.""" - if 'xep_0313' not in self.plugin: - log("(MAM: no plugin)") - _set_mam_done() - return - # MAM recovery used in _on_session_start - try: - for room_jid in cfg["muc_rooms"]: - log(f"(MAM: querying {room_jid} for last 50 messages...)") - results = await self.plugin['xep_0313'].retrieve( - jid=room_jid, rsm={'max': 50}, - ) - count = 0 - for msg in results['mam']['results']: - forwarded = msg['mam_result']['forwarded'] - body = str(forwarded['stanza']['body'] or '').strip() - if not body: - continue - nick = str(forwarded['stanza']['from']).split('/')[-1] if '/' in str(forwarded['stanza']['from']) else '?' - # Feed into context for chat_bridge (xxm) - if _IS_CHAT_BRIDGE: - role = 'user' if nick != cfg["nick"] else 'assistant' - try: - _bridge._append_to_log(role, f"[{nick}]: {body[:300]}") - except Exception: - pass - count += 1 - log(f"(MAM: loaded {count} msgs from {room_jid})") - _set_mam_done() - log("(MAM recovery complete)") + self.plugin['xep_0045'].join_muc('coregroup@conference.yoin.fun', AGENT_NICK) + logging.info(f"✅ {AGENT_NAME} 加入群聊 coregroup") except Exception as e: - log(f"(MAM error: {e})") - _set_mam_done() + logging.error(f"❌ {AGENT_NAME} 加入群聊失败: {e}") + self._muc_joined = True + self.ready.set() + logging.info(f"✅ {AGENT_NAME} XMPP 上线") - def _on_group_msg(self, msg): - threading.Thread(target=_handle_group_message, args=[msg], daemon=True).start() + async def on_disconnect(self, event): + self.ready.clear() + self._muc_joined = False + logging.warning(f"⚠️ {AGENT_NAME} XMPP 断线") - def _on_any_message(self, msg): - threading.Thread(target=_handle_private_message, args=[msg], daemon=True).start() + async def on_msg(self, msg): + body = msg['body'] + sender = str(msg['from']) + msg_type = msg['type'] + if not body: + return + if msg_type == 'groupchat': + if AGENT_JID in sender: + return + nickname = sender.split('/')[-1] if '/' in sender else '' + # 自己的消息跳过(通过昵称) + if nickname == AGENT_NICK: + return - def _on_session_end(self, event): - log(f"session ended") + # Coordinator 模式 — 全走 XMPP 消息 - def _on_conn_failed(self, event): - log(f"connection failed: {event}") + # 1. hmo 切换 coordinator(lead=xxx) + if nickname == 'hmo': + if 'lead=' in body.lower(): + for _name in ['mohe', 'zhiwei', 'xxm']: + if f'lead={_name}' in body.lower(): + self._coordinator = _name + self._granted = None + logging.info(f"👑 Coordinator 切换为 {_name}") + break + # hmo 直接 @点名 → 临时授权(一次) + elif any(tag in body for tag in [f'@{AGENT_NICK}', f'@{AGENT_NAME}']): + self._granted = AGENT_NICK + logging.info(f"🎤 被 hmo 点名,获得发言权") - def _on_disconnected(self, event): - log(f"disconnected, reconnecting...") + # 2. 检测授权信号(优先于收回,GRANT 可以覆盖 REVOKE) + _grant_match = re.search(r'\[GRANT:(\w+)\]', body) + if _grant_match: + self._granted = _grant_match.group(1) + self._revoked_until = 0 # 被授权时解除收回 + logging.info(f"🎤 收到授权:{self._granted}") + # 3. 检测收回信号 + _revoke_match = re.search(r'\[REVOKE:(\w+)\]', body) + _revoked_until = getattr(self, '_revoked_until', 0) + if _revoke_match and _revoke_match.group(1) == AGENT_NICK: + self._revoked_until = time.time() + 300 # 5分钟自动解除 + logging.info(f"🔇 {AGENT_NAME} 发言权被收回(5分钟后自动恢复)") + if time.time() < _revoked_until: + # 被收回者:只读模式,能看到但不能回复 + _rr = sender.split('/')[0] + _readonly_body = f"【只读消息】你目前被暂时收回发言权。只需了解内容。输出 __SILENT__。\n\n[核心群 {_rr}] {nickname} 说: {body}" + await self.call_hermes(_readonly_body, sender, is_group=True) + return -# ═══════════════════════════════════════════════════════════════ -# Main -# ═══════════════════════════════════════════════════════════════ + # 3. 判断角色 + _coordinator = getattr(self, '_coordinator', 'mohe') + _granted = getattr(self, '_granted', None) + _is_coordinator = (_coordinator == AGENT_NICK) + _is_granted = (_granted == AGENT_NICK) -def main(): - log(f"Starting {cfg['jid']} ({cfg['name_cn']}) — agent={_agent_name}") - if _IS_CHAT_BRIDGE: - log(f" LLM: chat_bridge (session={cfg['session_id']})") - else: - log(f" LLM: Hermes API ({cfg['gateway']})") - log(f" Server: {cfg['server']}:{cfg['port']}") - log(f" Rooms: {cfg['muc_rooms']}") + if _is_granted: + self._granted = None # 用完即收回 - bot = AgentBot() - global _xmpp_ref - _xmpp_ref = bot + room = sender.split('/')[0] - _start_http_bridge() + # 4. 判断权限:协调者/被授权者可发言,其他人只读 + if not _is_coordinator and not _is_granted: + _rr = sender.split('/')[0] + _ro_body = f"【只读消息】你目前不是协调者,只需了解内容。\n\n[核心群 {_rr}] {nickname} 说: {body}" + try: + _ro_payload = json.dumps({ + "model": "hermes-agent", + "messages": [{"role": "user", "content": _ro_body}] + }).encode() + _ro_req = urllib.request.Request(GATEWAY, data=_ro_payload, method="POST") + _ro_req.add_header("Content-Type", "application/json") + _ro_req.add_header("Authorization", f"Bearer {API_KEY}") + _ro_req.add_header("X-Hermes-Session-Id", SESSION_ID) + _ro_loop = asyncio.get_event_loop() + await _ro_loop.run_in_executor(None, lambda: _opener.open(_ro_req, timeout=30)) + except Exception: + pass + return # 不发送任何回复 - bot.connect(host=cfg["server"], port=cfg["port"]) - log(f"Connecting {cfg['jid']}@{cfg['server']}:{cfg['port']}") + # 5. 被 REVOKE 的人:只读(虽然上面已经覆盖了,作为额外保障) + if time.time() < getattr(self, '_revoked_until', 0): + _rr2 = sender.split('/')[0] + _rv_body = f"【只读消息】你目前被收回发言权。只需了解内容。\n\n[核心群 {_rr2}] {nickname} 说: {body}" + try: + _rv_payload = json.dumps({ + "model": "hermes-agent", + "messages": [{"role": "user", "content": _rv_body}] + }).encode() + _rv_req = urllib.request.Request(GATEWAY, data=_rv_payload, method="POST") + _rv_req.add_header("Content-Type", "application/json") + _rv_req.add_header("Authorization", f"Bearer {API_KEY}") + _rv_req.add_header("X-Hermes-Session-Id", SESSION_ID) + _rv_loop = asyncio.get_event_loop() + await _rv_loop.run_in_executor(None, lambda: _opener.open(_rv_req, timeout=30)) + except Exception: + pass + return - loop = asyncio.get_event_loop() + # 硬闭嘴闸门:hmo 说闭嘴类的话 → 静默 5 分钟 + _silent_until = getattr(self, '_silent_until', 0) + if time.time() < _silent_until: + return + if nickname == 'hmo': + _sk = ['闭嘴', '别说话', '安静', 'shut', 'stfu', '别说了', '停'] + if any(kw in body.lower() for kw in _sk): + self._silent_until = time.time() + 300 + logging.info(f"🔇 {AGENT_NAME} 收到闭嘴指令,静默 5 分钟") + return - async def _status_check(): - while True: - await asyncio.sleep(60) - log("(alive)") + logging.info(f"📩 群消息 [{sender}]: {body[:100]}") + room = sender.split('/')[0] + ctx_body = ( + "【规则】以下是一条群聊消息。判断是否应该回复。\n" + "只有以下3种情况你才回复:\n" + f"1. hmo直接点名问你({AGENT_MENTION})\n" + "2. 你有其他人没说过的独家信息\n" + "3. 别人说错了关键事实,不纠正会有后果\n" + "如果以上都不符合,你的回复必须只包含 __SILENT__ 这10个字符," + "不要有任何其他内容(不要前缀、不要解释、不要标点、不要空格)。\n\n" + "注意:你是协调者(lead)。你的第一职责是管理讨论节奏,不是自己说话。\n" + "- 别人能回答的问题,不抢答。\n" + "- 如果其他 Agent 更合适,用 [GRANT:agent名] 授权他们发言(例如 [GRANT:zhiwei])。\n" + "- hmo 直接 @点名某人 也会自动授权。\n" + "- 如果有人跑题/刷屏,用 [REVOKE:agent名] 收回发言权(例如 [REVOKE:zhiwei])。\n" + "- [GRANT] 可以覆盖 [REVOKE]。标记会显示在消息中。\n\n" + f"[核心群 {room}] {nickname} 说: {body}" + ) + await self.call_hermes(ctx_body, room, is_group=True) + return + if msg_type == 'chat' and 'hmo@yoin.fun' in sender: + self._call_seq += 1 + if body.startswith('[Kanban]'): + # 看板通知 → 走独立 session,不污染主会话 + target_sid = KANBAN_SESSION_ID + logging.info(f"📋 看板通知(#{self._call_seq}): {body[:80]}") + else: + target_sid = SESSION_ID + logging.info(f"📩 老爸(#{self._call_seq}): {body[:80]}") + await self.call_hermes(body, sender, seq=self._call_seq, session_id=target_sid) - asyncio.ensure_future(_status_check()) + async def call_hermes(self, content, sender, is_group=False, seq=None, session_id=None): + msg_type = 'groupchat' if is_group else 'chat' + sid = session_id or SESSION_ID + try: + payload = json.dumps({ + "model": "hermes-agent", + "messages": [{"role": "user", "content": content}] + }).encode() + req = urllib.request.Request(GATEWAY, data=payload, method="POST") + req.add_header("Content-Type", "application/json") + req.add_header("Authorization", f"Bearer {API_KEY}") + req.add_header("X-Hermes-Session-Id", sid) + loop = asyncio.get_event_loop() + result = await loop.run_in_executor(None, lambda: _opener.open(req, timeout=600)) + + if seq is not None and seq < self._call_seq: + return + + data = json.loads(result.read()) + reply = data.get("choices", [{}])[0].get("message", {}).get("content", "") + reply_stripped = reply.strip() + # 检查 __SILENT__ 标记 + if reply_stripped.startswith('__SILENT__') or reply_stripped.startswith('`__SILENT__`'): + logging.info(f"⏭️ {AGENT_NAME} 决定沉默,不发送") + return + # 如果回复里任意位置出现了 __SILENT__,说明 LLM 没理解协议,整条作废 + if '__SILENT__' in reply: + logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __SILENT__,拦截") + return + # 如果回复里出现了 __REPLY__,也是协议混淆,拦截 + if '__REPLY__' in reply: + logging.info(f"⏭️ {AGENT_NAME} 回复中误用 __REPLY__,拦截") + return + # 额外拦截:LLM 说"我沉默""我不说了"等宣布沉默的话→当 SILENT 处理 + _silent_phrases = ['我沉默', '我不说', '不说了', '不回复', '不插嘴', '我闭嘴', + '闭嘴上', '沉默是', '彻底沉默', '我会沉默', '将保持沉默'] + if any(p in reply for p in _silent_phrases): + logging.info(f"⏭️ {AGENT_NAME} 宣布沉默(命中关键词),拦截") + return + finish = data.get("choices", [{}])[0].get("finish_reason", "") + + if reply.strip() and finish != "silent": + # Coordinator 授权机制:检测回复中的 [GRANT:xxx] 标记 + _grant_match = re.search(r'\[GRANT:(\w+)\]', reply) + if _grant_match: + _grant_name = _grant_match.group(1) + self._granted = _grant_name + logging.info(f"🎤 授权 {_grant_name} 发言(通过 XMPP 发送)") + # 不剥离标记,让其他 bot 从 XMPP 消息中解析 + + if msg_type == 'groupchat': + self.send_message(mto=sender, mbody=reply, mtype='groupchat') + # 防回声:记录已发送的消息正文 + sent_norm = reply.strip()[:100] + self._recent_sent.append(sent_norm) + if len(self._recent_sent) > 10: + self._recent_sent.pop(0) + else: + import subprocess as sp + from xml.sax.saxutils import escape + safe = escape(reply) + sp.run([ + "docker", "exec", "ejabberd", "ejabberdctl", "send_stanza", + AGENT_JID, str(sender), + f"{safe}" + ], capture_output=True, timeout=10) + logging.info(f"✅ {AGENT_NAME} 回复: {reply[:80]}") + except Exception as e: + logging.error(f"❌ {AGENT_NAME} 错误: {e}") + +# ── 主入口 ─────────────────────────────────────────────── +async def main(): + retry_delay = 1 + max_delay = 60 + while True: + try: + bot = AgentBot() + bot.register_plugin('xep_0030') + bot.register_plugin('xep_0045') + bot.register_plugin('xep_0199') + + bot.connect(host='127.0.0.1', port=5222) + await asyncio.wait_for(bot.ready.wait(), timeout=30) + logging.info(f"{AGENT_NAME} XMPP 就绪") + retry_delay = 1 + + async def _drain_queue(): + while True: + await asyncio.sleep(1) + while _send_queue: + room, text = _send_queue.pop(0) + try: + bot.send_message(mto=room, mbody=text, mtype='groupchat') + sent_norm = text.strip()[:100] + bot._recent_sent.append(sent_norm) + if len(bot._recent_sent) > 10: + bot._recent_sent.pop(0) + logging.info(f"📤 主动发送到 {room}: {text[:60]}") + except Exception as e: + logging.error(f"❌ 主动发送失败: {e}") + asyncio.create_task(_drain_queue()) + + while True: + await asyncio.sleep(15) + if not bot.is_connected(): + logging.warning("检测到断线,准备重连...") + break + + except asyncio.TimeoutError: + logging.warning("连接超时,准备重连...") + except Exception as e: + logging.error(f"❌ 主循环错误: {e}") + + logging.info(f"⏳ 等待 {retry_delay} 秒后重连...") + await asyncio.sleep(retry_delay) + retry_delay = min(retry_delay * 2, max_delay) + +if __name__ == '__main__': try: - loop.run_forever() + asyncio.run(main()) except KeyboardInterrupt: - log("Shutdown by user") - except Exception as e: - log(f"!!! MAIN LOOP CRASH: {e}") - import traceback - log(f"!!! {traceback.format_exc()[:500]}") - raise - - -if __name__ == "__main__": - main() + pass