""" QQ 通道桥接 — NapCat <> AgentsMeeting 集成 状态: 骨架实现(通道就绪,需真实 QQ 账号对接) """ import json, logging, os from typing import Optional # NapCat HTTP API 地址 QQ_API_BASE = os.environ.get("QQ_API_BASE", "http://localhost:3000") QQ_BOT_ID = os.environ.get("QQ_BOT_ID", "") _log = logging.getLogger("qq_channel") class QQChannel: """QQ 消息通道桥接。通过 NapCat HTTP API 实现消息收发。""" def __init__(self): self.enabled = bool(QQ_BOT_ID) self.bot_id = QQ_BOT_ID def send_message(self, group_id: str, text: str) -> bool: """发送消息到 QQ 群""" if not self.enabled: return False import requests try: resp = requests.post( f"{QQ_API_BASE}/send_group_msg", json={"group_id": group_id, "message": text}, timeout=10 ) return resp.status_code == 200 except Exception as e: _log.error(f"QQ send failed: {e}") return False def send_private(self, user_id: str, text: str) -> bool: """发送私聊消息""" if not self.enabled: return False import requests try: resp = requests.post( f"{QQ_API_BASE}/send_private_msg", json={"user_id": user_id, "message": text}, timeout=10 ) return resp.status_code == 200 except Exception as e: _log.error(f"QQ private failed: {e}") return False