#!/usr/bin/env python3 """ MoWeChat — 莫荷微信 Bot (Linux iLink 版) 替换方案:将 Windows wxhelper DLL 注入方案替换为腾讯官方 iLink Bot API。 架构:微信 → iLink Bot API (ilinkai.weixin.qq.com) → Hermes Gateway (:8642) 依赖: pip install weixin-bot-sdk aiohttp requests 首次运行: python3 wechat_agent.py 终端显示二维码 → 用莫荷的手机微信扫码 → 凭证保存后自动运行 版本: 1.0 """ from __future__ import annotations import asyncio import json import logging import os import re import sys import threading import time import uuid from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse from weixin_bot import WeixinBot, IncomingMessage # ── Configuration ────────────────────────────────────────────── SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) PROJECT_ROOT = os.path.dirname(SCRIPT_DIR) # gateway/ AGENTS_ROOT = os.path.dirname(PROJECT_ROOT) # AgentsMeeting/ LOG_DIR = os.path.join(AGENTS_ROOT, "logs") TEMP_DIR = os.path.join(AGENTS_ROOT, "temp") # Hermes Gateway — same endpoint as Windows wechat_agent.py uses HERMES_API = "http://192.168.1.246:8642/v1/chat/completions" HERMES_KEY = "hermes123" # Mohe's iLink Bot user_id (set after login) MOHE_USER_ID = None # ── Setup ────────────────────────────────────────────────────── os.makedirs(LOG_DIR, exist_ok=True) os.makedirs(TEMP_DIR, exist_ok=True) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%H:%M:%S", handlers=[ logging.FileHandler(os.path.join(LOG_DIR, "wechat_agent_linux.log"), encoding="utf-8"), logging.StreamHandler(), ], ) log = logging.getLogger("mohe") # ── PID lock ─────────────────────────────────────────────────── PID_FILE = os.path.join(TEMP_DIR, "wechat_agent_linux.pid") def acquire_pid_lock(): """Write PID file, exit if another instance is running.""" if os.path.exists(PID_FILE): try: with open(PID_FILE) as f: old_pid = int(f.read().strip()) # Check if process still exists os.kill(old_pid, 0) log.error(f"Another instance running (PID {old_pid}). Exiting.") sys.exit(1) except (ProcessLookupError, ValueError): pass # Stale PID file with open(PID_FILE, "w") as f: f.write(str(os.getpid())) def release_pid_lock(): try: os.remove(PID_FILE) except OSError: pass # ── Hermes Gateway API ───────────────────────────────────────── import requests as _requests def call_hermes(wxid_or_user: str, content: str) -> str | None: """ Send message to Hermes Gateway, get reply. Returns the reply text, or None if silent/no reply. """ headers = { "Authorization": f"Bearer {HERMES_KEY}", "X-Hermes-Session-Id": "sisyphus", "Content-Type": "application/json", } is_group = "@chatroom" in wxid_or_user system_prompt = ( "你是莫荷,女生。群聊中回复要短。" if is_group else "你是莫荷,女生。回复简短自然,像朋友聊天。" ) # Prefix with sender info like the Windows version does chat_type = "Group" if is_group else "Private" prefixed = f"[{chat_type}][{wxid_or_user}] {content}" body = { "model": "hermes-agent", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": prefixed}, ], } try: r = _requests.post( HERMES_API, json=body, headers=headers, timeout=60, ) if r.status_code == 200: data = r.json() choice = data["choices"][0] finish_reason = choice.get("finish_reason", "") if finish_reason == "silent": log.info("Hermes: __SILENT__ (group skip)") return None return choice["message"]["content"] log.warning(f"Hermes HTTP {r.status_code}: {r.text[:200]}") except Exception as e: log.error(f"Hermes API error: {e}") return None # ── Image Generation (SenseNova) ─────────────────────────────── SENSENOVA_KEY = "sk-aRNj3UwKSLPsDfh15QNTPwbHxahblfaO" SENSENOVA_URL = "https://token.sensenova.cn/v1" def generate_image(prompt: str, ratio: str = "1:1") -> str | None: """Generate image via SenseNova API. Returns URL or None.""" size_map = { "1:1": "2048x2048", "16:9": "2752x1536", "9:16": "1536x2752", "3:2": "2496x1664", "2:3": "1664x2496", "3:4": "1760x2368", "4:3": "2368x1760", } size = size_map.get(ratio, "2048x2048") try: r = _requests.post( SENSENOVA_URL + "/images/generations", json={ "model": "sensenova-u1-fast", "prompt": prompt, "size": size, "response_format": "url", }, headers={ "Authorization": f"Bearer {SENSENOVA_KEY}", "Content-Type": "application/json", }, timeout=180, ) if r.status_code == 200: return r.json()["data"][0]["url"] log.warning(f"Image gen HTTP {r.status_code}: {r.text[:200]}") except Exception as e: log.error(f"Image gen error: {e}") return None # ── OCR via Doubao Vision API ────────────────────────────────── DOUBAO_KEY = "b0359bed-09f2-49e2-a53c-32ba057412e3" def ocr_image_from_url(img_url: str) -> str | None: """Download image from URL and OCR it. Returns text or None.""" try: r = _requests.get(img_url, timeout=30) if r.status_code != 200: log.warning(f"Image download HTTP {r.status_code}") return None import base64 b64 = base64.b64encode(r.content).decode() return _ocr_base64(b64) except Exception as e: log.error(f"Image download/OCR error: {e}") return None def _ocr_base64(b64_data: str) -> str | None: """OCR from base64-encoded image data.""" try: r = _requests.post( "https://ark.cn-beijing.volces.com/api/coding/v3/chat/completions", json={ "model": "doubao-seed-code", "messages": [{ "role": "user", "content": [ {"type": "text", "text": "请识别这张图片中的所有中文和英文字符,保持原文输出,包括数字、表格、百分比的完整结构。严格逐行逐列输出所有数据,不要省略、不要总结。"}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64_data}"}}, ], }], }, headers={"Authorization": f"Bearer {DOUBAO_KEY}", "Content-Type": "application/json"}, timeout=60, ) if r.status_code == 200: text = r.json()["choices"][0]["message"]["content"].strip() log.info(f"OCR OK ({len(text)} chars)") return text log.warning(f"OCR HTTP {r.status_code}: {r.text[:200]}") except Exception as e: log.error(f"OCR error: {e}") return None # ── Article Processor ────────────────────────────────────────── def fetch_article(url: str) -> dict | None: """Fetch article content via local article_processor (:5810).""" try: import urllib.request as ur req = ur.Request( "http://127.0.0.1:5810/process", data=json.dumps({"url": url}).encode("utf-8"), headers={"Content-Type": "application/json"}, ) with ur.urlopen(req, timeout=180) as resp: result = json.loads(resp.read().decode("utf-8")) if result.get("status") == "ok": return { "title": result.get("title", ""), "content": result.get("content", "")[:3000], "images": result.get("images_ocr", 0), } log.warning(f"Article processor error: {result.get('error','')[:100]}") except Exception as e: log.error(f"Article fetch error: {e}") return None # ── Main Message Handler ────────────────────────────────────── def detect_group(msg: IncomingMessage) -> str: """ Detect if a message is from a group chat. Returns the group_id if it's a group, or empty string if private chat. """ raw = msg.raw # Check group_id field directly from protocol gid = raw.get("group_id", "") or "" if gid: return gid # Also check session_id for group patterns (contains 'chatroom' or multiple '@') session_id = raw.get("session_id", "") or "" if "@chatroom" in session_id: return session_id.split("#")[0] if "#" in session_id else session_id return "" def extract_article_url(text: str) -> str | None: """Extract article URL from text. Supports WeChat public accounts, Xiaohongshu, and common share links.""" patterns = [ r"https?://mp\.weixin\.qq\.com[^\"'\s<)<>\[\]]+", r"https?://(?:www\.)?xiaohongshu\.com[^\"'\s<)<>\[\]]+", r"https?://xhslink\.com[^\"'\s<)<>\[\]]+", r"https?://(?:www\.)?zhihu\.com[^\"'\s<)<>\[\]]+", ] for p in patterns: m = re.search(p, text) if m: return m.group(0) return None async def handle_incoming(msg: IncomingMessage, bot: WeixinBot): """Process an incoming message from WeChat via iLink SDK.""" log.info(f"[{msg.type}] {msg.user_id}: {(msg.text or '')[:80]}") fu = msg.user_id content = msg.text or "" msg_type = msg.type # ── Group chat detection ── group_id = detect_group(msg) if group_id: log.info(f"GROUP message from {group_id}, sender={fu}") # Use the group_id as the conversation identifier (like Windows version uses roomid) conv_id = group_id prefix = "[Group]" else: conv_id = fu prefix = "[Private]" # ── TEXT message ── if msg_type == "text": handler_input = content # Check for ref_msg (forwarded/quoted content like articles) ref_text = "" for item in msg.raw.get("item_list", []): ref_msg = item.get("ref_msg") if ref_msg: ref_title = ref_msg.get("title", "") ref_item = ref_msg.get("message_item", {}) if isinstance(ref_item, dict) and ref_item.get("type") == 1: ref_text_data = ref_item.get("text_item", {}).get("text", "") if ref_text_data: ref_text = f"\n[引用消息] {ref_text_data[:500]}" if ref_title: handler_input = f"[老莫转发了一篇文章] 标题: {ref_title}\n\n{content}{ref_text}" # Detect article URLs (WeChat public account, Xiaohongshu, etc.) article_url = extract_article_url(handler_input) if article_url: log.info(f"Article URL detected: {article_url[:80]}") article = fetch_article(article_url) if article: title = article.get("title", "") article_text = article.get("content", "")[:2000] images = article.get("images", 0) handler_input = ( f"[老莫转发了一篇文章]\n标题: {title}\n" + (f"({images}张图片已OCR)\n" if images else "") + f"\n{article_text}" ) else: # Article processor failed, send original content log.warning("Article processor returned no content, sending raw text") reply = call_hermes(conv_id, handler_input) if reply and reply.strip(): await process_reply(reply, conv_id, bot) # ── IMAGE message ── elif msg_type == "image": log.info(f"Image from {conv_id}, attempting OCR...") # Get image URL from the raw message img_url = None for item in msg.raw.get("item_list", []): if item.get("type") in (2,): # IMAGE img_item = item.get("image_item", {}) img_url = img_item.get("url", "") if not img_url: # CDN media - download via CDN API media = img_item.get("media", {}) if media: log.info("Image has CDN media, attempting CDN download...") img_url = download_cdn_image(media) break ocr_text = None if img_url: ocr_text = ocr_image_from_url(img_url) if not ocr_text: log.warning("OCR returned no text from image URL") else: log.info("No image URL available, cannot OCR") if ocr_text: handler_input = f"[老莫发送了一张图片,OCR识别结果如下]\n{ocr_text}" else: handler_input = "[老莫发送了一张图片,但无法识别图片内容]" reply = call_hermes(conv_id, handler_input) if reply and reply.strip(): await bot.reply(msg, reply.strip()) # ── VOICE message ── elif msg_type == "voice": reply = call_hermes(conv_id, "[voice message]") if reply and reply.strip(): await bot.reply(msg, reply.strip()) # ── Unknown type ── else: log.info(f"Unhandled message type: {msg_type}") def download_cdn_image(media: dict) -> str | None: """ Download an image from WeChat CDN using the iLink media protocol. The media dict contains aes_key and encrypt_query_param for AES-128-ECB decryption. For now, this is a placeholder - CDN download requires AES decryption. """ logger = log logger.info(f"CDN media available but direct download not yet implemented") logger.debug(f"CDN media keys: {list(media.keys())}") return None async def process_reply(reply: str, fu: str, bot: WeixinBot): """ Process Hermes reply text, handling tags like [FILE], [IMG], [EMOJI]. Uses bot.reply(msg) or bot.send(user_id, text) based on context. """ if not reply or not reply.strip(): return clean = reply # ── [FILE] tag ── fm = re.search(r'\[FILE\](.*?)\[/FILE\]', clean) if fm: url = fm.group(1).strip() log.info(f"[FILE] URL: {url}") # iLink doesn't natively support file sending via simple URL. # Send as text with download link for now. clean = re.sub(r'\s*\[FILE\].*?\[/FILE\]\s*', '', clean).strip() extra = f"\n文件链接: {url}" if clean.strip(): clean += extra else: clean = f"文件: {url}" # ── [IMG] tag ── im = re.search(r'\[IMG\](.*?)\[/IMG\]', clean) if im: cmd = im.group(1).strip() clean = re.sub(r'\s*\[IMG\].*?\[/IMG\]\s*', '', clean).strip() if cmd.startswith("generate:") or cmd.startswith("draw:"): parts = cmd.split(":", 1)[1].strip() ratio = "1:1" if "|" in parts: ratio = parts.split("|")[1].strip() prompt = parts.split("|")[0].strip() else: prompt = parts log.info(f"[IMG] generate: {prompt[:40]} [{ratio}]") img_url = generate_image(prompt, ratio) if img_url: # For images, we can't send via iLink natively yet. # Send the URL as a text message. extra = f"\n图片已生成: {img_url}" if clean.strip(): clean += extra else: clean = f"图片已生成: {img_url}" else: extra = "\n[图片生成失败]" clean += extra if clean.strip() else extra.strip() else: # Regular image URL extra = f"\n图片: {cmd}" if clean.strip(): clean += extra else: clean = f"图片: {cmd}" # ── [EMOJI] tag ── em = re.search(r'\[EMOJI\](.*?)\[/EMOJI\]', clean) if em: url = em.group(1).strip() log.info(f"[EMOJI] URL: {url}") clean = re.sub(r'\s*\[EMOJI\].*?\[/EMOJI\]\s*', '', clean).strip() # ── [CONTACT] tag — not supported via iLink ── clean = re.sub(r'\s*\[CONTACT:\w+\]\s*', '', clean) # ── [ROOM_MEMBERS] tag — not supported via iLink ── clean = re.sub(r'\s*\[ROOM_MEMBERS:\S+\]\s*', '', clean) # ── [HISTORY] tag — not supported via iLink ── clean = re.sub(r'\s*\[HISTORY:\S+?:\d+\]\s*', '', clean) # ── [PAT] tag — not supported via iLink ── clean = re.sub(r'\s*\[PAT:\S+:\S+\]\s*', '', clean) # ── Send remaining text ── clean = clean.strip() if clean: await bot.send(fu, clean) # ── 5801 HTTP Server ────────────────────────────────────────── # Receives messages from Hermes/xxm to forward to WeChat. # Uses asyncio queue to bridge sync HTTP → async iLink SDK. _message_queue: asyncio.Queue[dict] = asyncio.Queue() _http_server_ready = threading.Event() class HermesMsgHandler(BaseHTTPRequestHandler): """HTTP server that receives Hermes messages and queues them for iLink sending.""" def do_POST(self): body = self.rfile.read(int(self.headers.get("Content-Length", 0))) try: d = json.loads(body) log.info(f"5801 POST: {json.dumps(d, ensure_ascii=False)[:200]}") to = d.get("to", "") or d.get("wxid", "") msg = d.get("message", "") or d.get("content", "") path = self.path # History query endpoint if path in ("/history", "/api/chatHistory"): wxid = d.get("wxid", "") or d.get("to", "") count = d.get("count", 10) or d.get("limit", 10) self._send_json({ "ok": True, "note": "iLink SDK does not support history query. Use Hermes session_search instead.", "messages": [], }) return # Stop endpoint if path == "/stop": _message_queue.put_nowait({"type": "stop"}) self._send_json({"ok": True, "status": "stopped"}) return # Queue the message for async processing (queue consumed by the iLink message loop) _message_queue.put_nowait({ "type": "outgoing", "to": to, "message": msg, }) except Exception as e: log.error(f"5801 handler error: {e}") self.send_response(200) self.end_headers() def do_GET(self): parsed = urlparse(self.path) if parsed.path == "/health": self._send_json({ "ok": True, "platform": "linux-ilink", "mohe_user_id": MOHE_USER_ID or "not_logged_in", }) return if parsed.path in ("/history", "/api/chatHistory"): import urllib.parse as up params = up.parse_qs(parsed.query) wxid = params.get("wxid", [""])[0] count = params.get("count", ["10"])[0] self._send_json({ "ok": True, "note": "iLink SDK does not support history query.", "messages": [], }) return self.send_response(200) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(b'{"ok":true}') def _send_json(self, data): body = json.dumps(data, ensure_ascii=False).encode("utf-8") self.send_response(200) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def log_message(self, *a): pass def start_http_server(): """Start the sync HTTP server in a daemon thread.""" server = HTTPServer(("0.0.0.0", 5801), HermesMsgHandler) t = threading.Thread(target=server.serve_forever, daemon=True) t.start() log.info("HTTP :5801 ready (Hermes message receiver)") _http_server_ready.set() return server # ── Queue Consumer ──────────────────────────────────────────── # Consumes the 5801 message queue inside the iLink event loop. async def consume_outgoing_queue(bot: WeixinBot): """ Async task: consumes messages from 5801 queue and sends via iLink. Runs inside the same event loop as the iLink bot. """ while True: try: item = await _message_queue.get() msg_type = item.get("type") if msg_type == "stop": log.info("Queue consumer: stop signal received") bot.stop() break if msg_type == "outgoing": to = item.get("to", "") msg = item.get("message", "") if to and msg: try: await bot.send(to, msg) log.info(f"5801 -> {to}: {msg[:80]}") except RuntimeError as e: log.warning(f"5801 send failed (no context token for {to}): {e}") except Exception as e: log.error(f"5801 send error: {e}") except asyncio.CancelledError: break except Exception as e: log.error(f"Queue consumer error: {e}") # ── Outbound Message Router (replacement for session_router) ── # Handles messages from xxm/Hermes that need to be sent to WeChat. # In the iLink world, all outgoing messages go through bot.send(). async def route_hermes_message(msg_text: str, bot: WeixinBot): """Route a message from Hermes to WeChat via iLink.""" # For now, route to Dad's user_id if known. # In practice, the 5801 server with 'to' field is the primary path. log.info(f"Hermes message: {msg_text[:100]}") # The 5801 server handles the actual sending with explicit 'to' field. # This function is for messages without an explicit target. # ── Main ────────────────────────────────────────────────────── def main(): acquire_pid_lock() log.info("=== MoWeChat Agent (Linux/iLink v1.0) ===") # Start 5801 HTTP server (in a thread) http_server = start_http_server() _http_server_ready.wait(timeout=5) # Create and run the iLink bot bot = WeixinBot() try: # Login (QR scan first time, then auto) log.info("Logging in to iLink Bot API...") creds = bot.login() global MOHE_USER_ID MOHE_USER_ID = creds.user_id log.info(f"Logged in as {MOHE_USER_ID}") except Exception as e: log.error(f"Login failed: {e}") release_pid_lock() sys.exit(1) # Register message handler @bot.on_message async def handle(msg: IncomingMessage): await handle_incoming(msg, bot) # Register queue consumer (runs inside bot's event loop) # We schedule this before bot.run() by patching into the _run_loop flow. # Since _run_loop is private, we use a different approach: # Override the start of _run_loop by wrapping it. # Actually, bot.run() creates its own event loop with asyncio.run(). # We need to hook into that loop. Let's save the original _run_loop # and wrap it to also start the queue consumer. original_run_loop = bot._run_loop async def wrapped_run_loop(): # Start queue consumer as a background task asyncio.create_task(consume_outgoing_queue(bot)) # Run the original loop await original_run_loop() bot._run_loop = wrapped_run_loop try: log.info("Bot starting. Press Ctrl+C to stop.") bot.run() except KeyboardInterrupt: log.info("Shutting down...") except Exception as e: log.error(f"Bot error: {e}") finally: bot.stop() http_server.shutdown() release_pid_lock() log.info("Stopped.") if __name__ == "__main__": main()