feat: 莫荷微信Bot Linux iLink版

新增 gateway/linux/ 目录,基于腾讯官方 iLink Bot API 的 Linux 原生实现。
替代 Windows wxhelper DLL 注入方案。

- wechat_agent.py: 核心 Agent (消息处理/OCR/文章处理/图片生成/5801服务)
- requirements.txt: weixin-bot-sdk + aiohttp + requests
- mohe-wechat.service: systemd 服务单元
- .env.example: 配置模板
- README.md: 使用说明

工作原理: iLink Bot API (ilinkai.weixin.qq.com)
  → QR扫码登录 → 长轮询收消息 → Hermes Gateway(:8642) 处理 → 回复
This commit is contained in:
2026-06-23 20:39:56 +08:00
parent 4cf125231e
commit 7727e907e6
5 changed files with 729 additions and 0 deletions
+11
View File
@@ -0,0 +1,11 @@
# 环境变量配置
# Hermes Gateway 地址
HERMES_API=http://192.168.1.246:8642/v1/chat/completions
HERMES_KEY=hermes123
# SenseNova 图像生成
SENSENOVA_KEY=sk-aRNj3UwKSLPsDfh15QNTPwbHxahblfaO
# Doubao OCR
DOUBAO_KEY=b0359bed-09f2-49e2-a53c-32ba057412e3
+52
View File
@@ -0,0 +1,52 @@
# MoWeChat — 莫荷微信 Bot (Linux iLink 版)
将莫荷的微信 bot 从 Windows wxhelper DLL 注入方案迁移到 Linux 原生运行的腾讯官方 iLink Bot API。
## 架构
```
微信 → iLink Bot API (ilinkai.weixin.qq.com) → wechat_agent.py → Hermes Gateway (:8642) → Agent
```
- **iLink Bot API**: 腾讯 2026 年开放的官方微信个人号 Bot 接口
- **weixin-bot-sdk**: Python SDK,处理 QR 登录、长轮询收消息、发消息
- **Hermes Gateway**: 原有的 LLM 处理管道,无变动
## 快速开始
```bash
cd gateway/linux
python3 -m venv .venv
.venv/bin/pip install -r requirements.txt
python3 wechat_agent.py
```
首次运行会显示二维码,用莫荷的手机微信扫码登录。凭证保存在 `~/.weixin-bot/credentials.json`,后续运行自动跳过扫码。
## 与 Windows 版的差异
| 功能 | Windows (wxhelper) | Linux (iLink) |
|------|-------------------|---------------|
| 消息收发 | DLL 注入 → HTTP | 官方 API → HTTP |
| QR 登录 | 降版本微信手动登录 | 终端二维码扫码 |
| 图片 OCR | doubao API | doubao API ✅ |
| 文章处理 | article_processor | article_processor ✅ |
| 图片生成 | SenseNova | SenseNova ✅ |
| 联系人查询 | wxhelper API | ❌ iLink 不支持 |
| 历史记录查询 | wxhelper SQL | ❌ 改用 session_search |
| 文件发送 | wxhelper sendFile | 发送下载链接 |
| 5801 端口 | 支持 | 支持 ✅ |
## 服务管理
systemd service:
```bash
sudo cp mohe-wechat.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable mohe-wechat
sudo systemctl start mohe-wechat
```
## 日志
日志文件:`../../logs/wechat_agent_linux.log`
+17
View File
@@ -0,0 +1,17 @@
[Unit]
Description=MoWeChat — 莫荷微信 Bot (Linux iLink 版)
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=hmo
WorkingDirectory=/home/hmo/projects/AgentsMeeting/gateway/linux
ExecStart=/home/hmo/projects/AgentsMeeting/.venv/bin/python3 /home/hmo/projects/AgentsMeeting/gateway/linux/wechat_agent.py
Restart=on-failure
RestartSec=5
StandardOutput=append:/home/hmo/projects/AgentsMeeting/logs/wechat_agent_linux.log
StandardError=append:/home/hmo/projects/AgentsMeeting/logs/wechat_agent_linux.log
[Install]
WantedBy=multi-user.target
+3
View File
@@ -0,0 +1,3 @@
weixin-bot-sdk>=0.2.0
aiohttp>=3.9
requests>=2.28
+646
View File
@@ -0,0 +1,646 @@
#!/usr/bin/env python3
"""
MoWeChat — 莫荷微信 Bot (Linux iLink 版)
替换方案:将 Windows wxhelper DLL 注入方案替换为腾讯官方 iLink Bot API。
架构:微信 → iLink Bot API (ilinkai.weixin.qq.com) → Hermes Gateway (:8642)
依赖:
pip install weixin-bot-sdk aiohttp requests
首次运行:
python3 wechat_agent.py
终端显示二维码 → 用莫荷的手机微信扫码 → 凭证保存后自动运行
版本: 1.0
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import sys
import threading
import time
import uuid
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse
from weixin_bot import WeixinBot, IncomingMessage
# ── Configuration ──────────────────────────────────────────────
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(SCRIPT_DIR) # gateway/
AGENTS_ROOT = os.path.dirname(PROJECT_ROOT) # AgentsMeeting/
LOG_DIR = os.path.join(AGENTS_ROOT, "logs")
TEMP_DIR = os.path.join(AGENTS_ROOT, "temp")
# Hermes Gateway — same endpoint as Windows wechat_agent.py uses
HERMES_API = "http://192.168.1.246:8642/v1/chat/completions"
HERMES_KEY = "hermes123"
# Mohe's iLink Bot user_id (set after login)
MOHE_USER_ID = None
# ── Setup ──────────────────────────────────────────────────────
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
handlers=[
logging.FileHandler(os.path.join(LOG_DIR, "wechat_agent_linux.log"), encoding="utf-8"),
logging.StreamHandler(),
],
)
log = logging.getLogger("mohe")
# ── PID lock ───────────────────────────────────────────────────
PID_FILE = os.path.join(TEMP_DIR, "wechat_agent_linux.pid")
def acquire_pid_lock():
"""Write PID file, exit if another instance is running."""
if os.path.exists(PID_FILE):
try:
with open(PID_FILE) as f:
old_pid = int(f.read().strip())
# Check if process still exists
os.kill(old_pid, 0)
log.error(f"Another instance running (PID {old_pid}). Exiting.")
sys.exit(1)
except (ProcessLookupError, ValueError):
pass # Stale PID file
with open(PID_FILE, "w") as f:
f.write(str(os.getpid()))
def release_pid_lock():
try:
os.remove(PID_FILE)
except OSError:
pass
# ── Hermes Gateway API ─────────────────────────────────────────
import requests as _requests
def call_hermes(wxid_or_user: str, content: str) -> str | None:
"""
Send message to Hermes Gateway, get reply.
Returns the reply text, or None if silent/no reply.
"""
headers = {
"Authorization": f"Bearer {HERMES_KEY}",
"X-Hermes-Session-Id": "sisyphus",
"Content-Type": "application/json",
}
is_group = "@chatroom" in wxid_or_user
system_prompt = (
"你是莫荷,女生。群聊中回复要短。"
if is_group
else "你是莫荷,女生。回复简短自然,像朋友聊天。"
)
# Prefix with sender info like the Windows version does
chat_type = "Group" if is_group else "Private"
prefixed = f"[{chat_type}][{wxid_or_user}] {content}"
body = {
"model": "hermes-agent",
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": prefixed},
],
}
try:
r = _requests.post(
HERMES_API,
json=body,
headers=headers,
timeout=60,
)
if r.status_code == 200:
data = r.json()
choice = data["choices"][0]
finish_reason = choice.get("finish_reason", "")
if finish_reason == "silent":
log.info("Hermes: __SILENT__ (group skip)")
return None
return choice["message"]["content"]
log.warning(f"Hermes HTTP {r.status_code}: {r.text[:200]}")
except Exception as e:
log.error(f"Hermes API error: {e}")
return None
# ── Image Generation (SenseNova) ───────────────────────────────
SENSENOVA_KEY = "sk-aRNj3UwKSLPsDfh15QNTPwbHxahblfaO"
SENSENOVA_URL = "https://token.sensenova.cn/v1"
def generate_image(prompt: str, ratio: str = "1:1") -> str | None:
"""Generate image via SenseNova API. Returns URL or None."""
size_map = {
"1:1": "2048x2048", "16:9": "2752x1536", "9:16": "1536x2752",
"3:2": "2496x1664", "2:3": "1664x2496", "3:4": "1760x2368", "4:3": "2368x1760",
}
size = size_map.get(ratio, "2048x2048")
try:
r = _requests.post(
SENSENOVA_URL + "/images/generations",
json={
"model": "sensenova-u1-fast",
"prompt": prompt,
"size": size,
"response_format": "url",
},
headers={
"Authorization": f"Bearer {SENSENOVA_KEY}",
"Content-Type": "application/json",
},
timeout=180,
)
if r.status_code == 200:
return r.json()["data"][0]["url"]
log.warning(f"Image gen HTTP {r.status_code}: {r.text[:200]}")
except Exception as e:
log.error(f"Image gen error: {e}")
return None
# ── OCR via Doubao Vision API ──────────────────────────────────
DOUBAO_KEY = "b0359bed-09f2-49e2-a53c-32ba057412e3"
def ocr_image_from_url(img_url: str) -> str | None:
"""Download image from URL and OCR it. Returns text or None."""
try:
r = _requests.get(img_url, timeout=30)
if r.status_code != 200:
log.warning(f"Image download HTTP {r.status_code}")
return None
import base64
b64 = base64.b64encode(r.content).decode()
return _ocr_base64(b64)
except Exception as e:
log.error(f"Image download/OCR error: {e}")
return None
def _ocr_base64(b64_data: str) -> str | None:
"""OCR from base64-encoded image data."""
try:
r = _requests.post(
"https://ark.cn-beijing.volces.com/api/coding/v3/chat/completions",
json={
"model": "doubao-seed-code",
"messages": [{
"role": "user",
"content": [
{"type": "text", "text": "请识别这张图片中的所有中文和英文字符,保持原文输出,包括数字、表格、百分比的完整结构。严格逐行逐列输出所有数据,不要省略、不要总结。"},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64_data}"}},
],
}],
},
headers={"Authorization": f"Bearer {DOUBAO_KEY}", "Content-Type": "application/json"},
timeout=60,
)
if r.status_code == 200:
text = r.json()["choices"][0]["message"]["content"].strip()
log.info(f"OCR OK ({len(text)} chars)")
return text
log.warning(f"OCR HTTP {r.status_code}: {r.text[:200]}")
except Exception as e:
log.error(f"OCR error: {e}")
return None
# ── Article Processor ──────────────────────────────────────────
def fetch_article(url: str) -> dict | None:
"""Fetch article content via local article_processor (:5810)."""
try:
import urllib.request as ur
req = ur.Request(
"http://127.0.0.1:5810/process",
data=json.dumps({"url": url}).encode("utf-8"),
headers={"Content-Type": "application/json"},
)
with ur.urlopen(req, timeout=180) as resp:
result = json.loads(resp.read().decode("utf-8"))
if result.get("status") == "ok":
return {
"title": result.get("title", ""),
"content": result.get("content", "")[:3000],
"images": result.get("images_ocr", 0),
}
log.warning(f"Article processor error: {result.get('error','')[:100]}")
except Exception as e:
log.error(f"Article fetch error: {e}")
return None
# ── Main Message Handler ──────────────────────────────────────
def build_message_display_id(msg: IncomingMessage, bot: WeixinBot) -> str:
"""
Build a display identifier for the sender.
Since iLink uses internal user_ids, we use the user_id as the identifier
and supplement with available info.
"""
return msg.user_id
async def handle_incoming(msg: IncomingMessage, bot: WeixinBot):
"""Process an incoming message from WeChat via iLink SDK."""
log.info(f"[{msg.type}] {msg.user_id}: {(msg.text or '')[:80]}")
fu = msg.user_id
content = msg.text or ""
msg_type = msg.type
# ── text message ──
if msg_type == "text":
handler_input = content
# Detect forwarded articles
if "mp.weixin.qq.com" in content:
url_match = re.search(r"https?://mp\.weixin\.qq\.com[^\"'\s<)<>\[\]]+", content)
if url_match:
article = fetch_article(url_match.group(0))
if article:
title = article.get("title", "")
article_text = article.get("content", "")[:2000]
images = article.get("images", 0)
handler_input = (
f"[老莫转发了一篇文章]\n标题: {title}\n"
+ (f"{images}张图片已OCR\n" if images else "")
+ f"\n{article_text}"
)
reply = call_hermes(fu, handler_input)
if reply and reply.strip():
await process_reply(reply, fu, bot)
# ── image message ──
elif msg_type == "image":
log.info(f"Image from {fu}, attempting OCR...")
# Get image URL from the raw message
img_url = None
for item in msg.raw.get("item_list", []):
if item.get("type") in (2,): # IMAGE
img_item = item.get("image_item", {})
# Try direct URL first, then CDN
img_url = img_item.get("url", "")
if not img_url:
# CDN media - need decryption
media = img_item.get("media", {})
aes_key = media.get("aes_key", "")
encrypt_query = media.get("encrypt_query_param", "")
# For now, log the CDN info and continue
log.info(f"Image has CDN media: aes_key={aes_key[:15]}...")
img_url = None
break
ocr_text = None
if img_url:
ocr_text = ocr_image_from_url(img_url)
else:
log.info("No direct image URL, sending raw iLink image to Hermes for description")
handler_input = "[老莫发送了一张图片]"
reply = call_hermes(fu, handler_input)
if reply and reply.strip():
await bot.reply(msg, reply.strip())
return
if ocr_text:
handler_input = f"[老莫发送了一张图片,OCR识别结果如下]\n{ocr_text}"
else:
handler_input = "[老莫发送了一张图片,但OCR识别失败,无法读取内容]"
reply = call_hermes(fu, handler_input)
if reply and reply.strip():
await bot.reply(msg, reply.strip())
# ── voice message ──
elif msg_type == "voice":
reply = call_hermes(fu, "[voice message]")
if reply and reply.strip():
await bot.reply(msg, reply.strip())
# ── unknown type ──
else:
log.info(f"Unhandled message type: {msg_type}")
async def process_reply(reply: str, fu: str, bot: WeixinBot):
"""
Process Hermes reply text, handling tags like [FILE], [IMG], [EMOJI].
Uses bot.reply(msg) or bot.send(user_id, text) based on context.
"""
if not reply or not reply.strip():
return
clean = reply
# ── [FILE] tag ──
fm = re.search(r'\[FILE\](.*?)\[/FILE\]', clean)
if fm:
url = fm.group(1).strip()
log.info(f"[FILE] URL: {url}")
# iLink doesn't natively support file sending via simple URL.
# Send as text with download link for now.
clean = re.sub(r'\s*\[FILE\].*?\[/FILE\]\s*', '', clean).strip()
extra = f"\n文件链接: {url}"
if clean.strip():
clean += extra
else:
clean = f"文件: {url}"
# ── [IMG] tag ──
im = re.search(r'\[IMG\](.*?)\[/IMG\]', clean)
if im:
cmd = im.group(1).strip()
clean = re.sub(r'\s*\[IMG\].*?\[/IMG\]\s*', '', clean).strip()
if cmd.startswith("generate:") or cmd.startswith("draw:"):
parts = cmd.split(":", 1)[1].strip()
ratio = "1:1"
if "|" in parts:
ratio = parts.split("|")[1].strip()
prompt = parts.split("|")[0].strip()
else:
prompt = parts
log.info(f"[IMG] generate: {prompt[:40]} [{ratio}]")
img_url = generate_image(prompt, ratio)
if img_url:
# For images, we can't send via iLink natively yet.
# Send the URL as a text message.
extra = f"\n图片已生成: {img_url}"
if clean.strip():
clean += extra
else:
clean = f"图片已生成: {img_url}"
else:
extra = "\n[图片生成失败]"
clean += extra if clean.strip() else extra.strip()
else:
# Regular image URL
extra = f"\n图片: {cmd}"
if clean.strip():
clean += extra
else:
clean = f"图片: {cmd}"
# ── [EMOJI] tag ──
em = re.search(r'\[EMOJI\](.*?)\[/EMOJI\]', clean)
if em:
url = em.group(1).strip()
log.info(f"[EMOJI] URL: {url}")
clean = re.sub(r'\s*\[EMOJI\].*?\[/EMOJI\]\s*', '', clean).strip()
# ── [CONTACT] tag — not supported via iLink ──
clean = re.sub(r'\s*\[CONTACT:\w+\]\s*', '', clean)
# ── [ROOM_MEMBERS] tag — not supported via iLink ──
clean = re.sub(r'\s*\[ROOM_MEMBERS:\S+\]\s*', '', clean)
# ── [HISTORY] tag — not supported via iLink ──
clean = re.sub(r'\s*\[HISTORY:\S+?:\d+\]\s*', '', clean)
# ── [PAT] tag — not supported via iLink ──
clean = re.sub(r'\s*\[PAT:\S+:\S+\]\s*', '', clean)
# ── Send remaining text ──
clean = clean.strip()
if clean:
await bot.send(fu, clean)
# ── 5801 HTTP Server ──────────────────────────────────────────
# Receives messages from Hermes/xxm to forward to WeChat.
# Uses asyncio queue to bridge sync HTTP → async iLink SDK.
_message_queue: asyncio.Queue[dict] = asyncio.Queue()
_http_server_ready = threading.Event()
class HermesMsgHandler(BaseHTTPRequestHandler):
"""HTTP server that receives Hermes messages and queues them for iLink sending."""
def do_POST(self):
body = self.rfile.read(int(self.headers.get("Content-Length", 0)))
try:
d = json.loads(body)
log.info(f"5801 POST: {json.dumps(d, ensure_ascii=False)[:200]}")
to = d.get("to", "") or d.get("wxid", "")
msg = d.get("message", "") or d.get("content", "")
path = self.path
# History query endpoint
if path in ("/history", "/api/chatHistory"):
wxid = d.get("wxid", "") or d.get("to", "")
count = d.get("count", 10) or d.get("limit", 10)
self._send_json({
"ok": True,
"note": "iLink SDK does not support history query. Use Hermes session_search instead.",
"messages": [],
})
return
# Stop endpoint
if path == "/stop":
_message_queue.put_nowait({"type": "stop"})
self._send_json({"ok": True, "status": "stopped"})
return
# Queue the message for async processing (queue consumed by the iLink message loop)
_message_queue.put_nowait({
"type": "outgoing",
"to": to,
"message": msg,
})
except Exception as e:
log.error(f"5801 handler error: {e}")
self.send_response(200)
self.end_headers()
def do_GET(self):
parsed = urlparse(self.path)
if parsed.path == "/health":
self._send_json({
"ok": True,
"platform": "linux-ilink",
"mohe_user_id": MOHE_USER_ID or "not_logged_in",
})
return
if parsed.path in ("/history", "/api/chatHistory"):
import urllib.parse as up
params = up.parse_qs(parsed.query)
wxid = params.get("wxid", [""])[0]
count = params.get("count", ["10"])[0]
self._send_json({
"ok": True,
"note": "iLink SDK does not support history query.",
"messages": [],
})
return
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(b'{"ok":true}')
def _send_json(self, data):
body = json.dumps(data, ensure_ascii=False).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, *a):
pass
def start_http_server():
"""Start the sync HTTP server in a daemon thread."""
server = HTTPServer(("0.0.0.0", 5801), HermesMsgHandler)
t = threading.Thread(target=server.serve_forever, daemon=True)
t.start()
log.info("HTTP :5801 ready (Hermes message receiver)")
_http_server_ready.set()
return server
# ── Queue Consumer ────────────────────────────────────────────
# Consumes the 5801 message queue inside the iLink event loop.
async def consume_outgoing_queue(bot: WeixinBot):
"""
Async task: consumes messages from 5801 queue and sends via iLink.
Runs inside the same event loop as the iLink bot.
"""
while True:
try:
item = await _message_queue.get()
msg_type = item.get("type")
if msg_type == "stop":
log.info("Queue consumer: stop signal received")
bot.stop()
break
if msg_type == "outgoing":
to = item.get("to", "")
msg = item.get("message", "")
if to and msg:
try:
await bot.send(to, msg)
log.info(f"5801 -> {to}: {msg[:80]}")
except RuntimeError as e:
log.warning(f"5801 send failed (no context token for {to}): {e}")
except Exception as e:
log.error(f"5801 send error: {e}")
except asyncio.CancelledError:
break
except Exception as e:
log.error(f"Queue consumer error: {e}")
# ── Outbound Message Router (replacement for session_router) ──
# Handles messages from xxm/Hermes that need to be sent to WeChat.
# In the iLink world, all outgoing messages go through bot.send().
async def route_hermes_message(msg_text: str, bot: WeixinBot):
"""Route a message from Hermes to WeChat via iLink."""
# For now, route to Dad's user_id if known.
# In practice, the 5801 server with 'to' field is the primary path.
log.info(f"Hermes message: {msg_text[:100]}")
# The 5801 server handles the actual sending with explicit 'to' field.
# This function is for messages without an explicit target.
# ── Main ──────────────────────────────────────────────────────
def main():
acquire_pid_lock()
log.info("=== MoWeChat Agent (Linux/iLink v1.0) ===")
# Start 5801 HTTP server (in a thread)
http_server = start_http_server()
_http_server_ready.wait(timeout=5)
# Create and run the iLink bot
bot = WeixinBot()
try:
# Login (QR scan first time, then auto)
log.info("Logging in to iLink Bot API...")
creds = bot.login()
global MOHE_USER_ID
MOHE_USER_ID = creds.user_id
log.info(f"Logged in as {MOHE_USER_ID}")
except Exception as e:
log.error(f"Login failed: {e}")
release_pid_lock()
sys.exit(1)
# Register message handler
@bot.on_message
async def handle(msg: IncomingMessage):
await handle_incoming(msg, bot)
# Register queue consumer (runs inside bot's event loop)
# We schedule this before bot.run() by patching into the _run_loop flow.
# Since _run_loop is private, we use a different approach:
# Override the start of _run_loop by wrapping it.
# Actually, bot.run() creates its own event loop with asyncio.run().
# We need to hook into that loop. Let's save the original _run_loop
# and wrap it to also start the queue consumer.
original_run_loop = bot._run_loop
async def wrapped_run_loop():
# Start queue consumer as a background task
asyncio.create_task(consume_outgoing_queue(bot))
# Run the original loop
await original_run_loop()
bot._run_loop = wrapped_run_loop
try:
log.info("Bot starting. Press Ctrl+C to stop.")
bot.run()
except KeyboardInterrupt:
log.info("Shutting down...")
except Exception as e:
log.error(f"Bot error: {e}")
finally:
bot.stop()
http_server.shutdown()
release_pid_lock()
log.info("Stopped.")
if __name__ == "__main__":
main()