1b2b935832
- Platform-based architecture (Windows/Linux/Mac) - Agent instance registry (agents.yaml) - Management dashboard with cross-platform monitoring - xmpp_bot with HTTP bridge + health endpoints - wechat_agent with WeChat-Hermes bridging - Platform services: ProcessGuardian, HealthProbe, APIRouter, ChannelBridge - Deployment: systemd (Linux) + PowerShell (Windows) - Monitoring: SSH+ejabberdctl for cross-platform presence
50 lines
1.5 KiB
Python
50 lines
1.5 KiB
Python
"""
|
|
QQ 通道桥接 — NapCat <> AgentsMeeting 集成
|
|
状态: 骨架实现(通道就绪,需真实 QQ 账号对接)
|
|
"""
|
|
import json, logging, os
|
|
from typing import Optional
|
|
|
|
# NapCat HTTP API 地址
|
|
QQ_API_BASE = os.environ.get("QQ_API_BASE", "http://localhost:3000")
|
|
QQ_BOT_ID = os.environ.get("QQ_BOT_ID", "")
|
|
|
|
_log = logging.getLogger("qq_channel")
|
|
|
|
class QQChannel:
|
|
"""QQ 消息通道桥接。通过 NapCat HTTP API 实现消息收发。"""
|
|
|
|
def __init__(self):
|
|
self.enabled = bool(QQ_BOT_ID)
|
|
self.bot_id = QQ_BOT_ID
|
|
|
|
def send_message(self, group_id: str, text: str) -> bool:
|
|
"""发送消息到 QQ 群"""
|
|
if not self.enabled: return False
|
|
import requests
|
|
try:
|
|
resp = requests.post(
|
|
f"{QQ_API_BASE}/send_group_msg",
|
|
json={"group_id": group_id, "message": text},
|
|
timeout=10
|
|
)
|
|
return resp.status_code == 200
|
|
except Exception as e:
|
|
_log.error(f"QQ send failed: {e}")
|
|
return False
|
|
|
|
def send_private(self, user_id: str, text: str) -> bool:
|
|
"""发送私聊消息"""
|
|
if not self.enabled: return False
|
|
import requests
|
|
try:
|
|
resp = requests.post(
|
|
f"{QQ_API_BASE}/send_private_msg",
|
|
json={"user_id": user_id, "message": text},
|
|
timeout=10
|
|
)
|
|
return resp.status_code == 200
|
|
except Exception as e:
|
|
_log.error(f"QQ private failed: {e}")
|
|
return False
|