"""QQ Bot - 莫笑笑 (3247454048), OneBot adapter.

Receive: WebSocket connection to NapCat for events (see ``ws_listener``),
         plus HTTP polling of one group's history (see ``poll_worker``).
Send:    OneBot HTTP API (``/send_msg``).

Each accepted message is forwarded to an ``opencode run --attach`` session
and the first text event it prints is sent back to the chat.
"""
import os
import json
import time
import threading
import subprocess
import queue as qmod
import re
import shutil
import sys
import urllib.request
import urllib.error

# proc_guard lives next to this script; make it importable regardless of CWD.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from proc_guard import guard as _proc_guard

# ── PID lock — prevent duplicate instances ──
_lock = _proc_guard("qq_bot")
if not _lock.ok:
    print(_lock.message, flush=True)
    sys.exit(1)

# NOTE(review): credentials and session id are hard-coded; consider loading
# them from the environment or a config file.
ONEBOT_API = "http://127.0.0.1:5700"
ONEBOT_TOKEN = "hermes123"
ATTACH_SESSION = "ses_1d95d15c4ffehQaZ6hrbIbak5k"

# The bot's own QQ id / display name and the single group that is polled.
SELF_ID = 3247454048
BOT_NAME = "莫笑笑"
TARGET_GROUP = 878426010

_BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOG_DIR = os.path.join(_BASE_DIR, "logs")
LOG_FILE = os.path.join(LOG_DIR, "qq_bot.log")
TEMP_DIR = os.path.join(_BASE_DIR, "temp")
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)

# Bypass any configured HTTP proxy for every host (all endpoints are local).
os.environ["no_proxy"] = "*"
os.environ["NO_PROXY"] = "*"

# Filled by ws_listener().
# NOTE(review): nothing in this file consumes the queue — confirm whether a
# consumer exists elsewhere or was dropped.
msg_queue = qmod.Queue()


def log(m):
    """Append *m* to LOG_FILE, prefixed with the current HH:MM:SS time."""
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(f"{time.strftime('%H:%M:%S')} {m}\n")


def onebot_json(path, data=None):
    """Call a OneBot HTTP endpoint and return the decoded JSON response.

    Args:
        path: Endpoint path beginning with ``/`` (e.g. ``"/send_msg"``).
        data: Optional JSON-serialisable payload. When truthy the request
            carries it as the body (urllib then issues a POST); otherwise
            no body is sent.

    Returns:
        The parsed JSON response, or ``None`` if the request or decoding
        failed (the error is logged, never raised).
    """
    url = f"{ONEBOT_API}{path}?access_token={ONEBOT_TOKEN}"
    body = json.dumps(data).encode() if data else None
    try:
        req = urllib.request.Request(
            url, data=body, headers={"Content-Type": "application/json"}
        )
        # Context manager closes the HTTP response even if decoding fails.
        with urllib.request.urlopen(req, timeout=10) as resp:
            return json.loads(resp.read())
    except Exception as e:  # best-effort boundary: log and report failure
        log(f"API ERR {path}: {e}")
        return None


def get_recent_msgs(group_id, count=3):
    """Get recent group messages for polling.

    Returns the ``data.messages`` list of ``/get_group_msg_history`` when the
    call succeeds with ``retcode == 0``; otherwise an empty list.
    """
    r = onebot_json("/get_group_msg_history", {"group_id": group_id, "count": count})
    if isinstance(r, dict) and r.get("retcode") == 0:
        # "data" may be JSON null on some replies; treat that as empty.
        data = r.get("data") or {}
        if isinstance(data, dict):
            return data.get("messages") or []
    return []


def _first_text_reply(path):
    """Return the first non-empty ``text`` event payload in *path*, else ""."""
    try:
        with open(path, "rb") as fh:
            for raw in fh:
                try:
                    evt = json.loads(raw.decode("utf-8", errors="replace"))
                except ValueError:
                    # Non-JSON output or a line still being written.
                    continue
                if not isinstance(evt, dict) or evt.get("type") != "text":
                    continue
                part = evt.get("part")
                text = part.get("text") if isinstance(part, dict) else None
                if isinstance(text, str) and text.strip():
                    return text.strip()
    except OSError:
        # File vanished or is temporarily unreadable; caller retries.
        pass
    return ""


def process_and_reply(msg_text, user_id, group_id, nickname):
    """Forward one chat message to opencode and send its first reply back.

    Args:
        msg_text: Plain text of the incoming message.
        user_id: Sender's QQ id; used as the reply target for private chats.
        group_id: Group id, or a falsy value for a private message.
        nickname: Display name put in front of the forwarded text.

    The opencode process writes JSON lines to a temp file, which is polled
    every 3 s for up to 60 rounds. All failures are logged, never raised.
    """
    out_file = os.path.join(TEMP_DIR, f"qq_{int(time.time())}.txt")
    prefix = f"[QQ群:{group_id}]" if group_id else f"[QQ:{user_id}]"
    full = f"{prefix} {nickname}: {msg_text[:500]}"
    # Argument list + shell=False: chat text is untrusted and must never be
    # interpreted by a shell. shutil.which resolves PATH/PATHEXT the way the
    # shell used to.
    # NOTE(review): if this resolves to a .cmd/.bat shim on Windows, the OS
    # may still run it through cmd.exe — prefer a real executable there.
    argv = [
        shutil.which("opencode") or "opencode",
        "run",
        "--attach", "http://127.0.0.1:4096",
        "--password", "hermes123",
        "--session", ATTACH_SESSION,
        "--pure",
        "--format", "json",
        f"[{full[:400]}]",
    ]
    reply = ""
    try:
        # The child keeps its own handle; ours is closed right after spawn.
        with open(out_file, "wb") as sink:
            proc = subprocess.Popen(argv, stdout=sink, stderr=subprocess.STDOUT)
        try:
            for _ in range(60):
                time.sleep(3)
                # Check exit *before* reading so the final read sees all output.
                exited = proc.poll() is not None
                if os.path.getsize(out_file) > 100:
                    reply = _first_text_reply(out_file)
                if reply or exited:
                    break
        finally:
            # Always stop and reap the child (reply found, timeout or error).
            if proc.poll() is None:
                proc.kill()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                log("PROC ERR: opencode did not exit after kill")
        if reply:
            text = re.sub(r'^\[xxm\]\s*', '', reply).strip()[:500]
            target = {"group_id": group_id} if group_id else {"user_id": user_id}
            onebot_json("/send_msg", {**target, "message": text})
            log(f"REPLY {prefix}: {text[:60]}")
    except Exception as e:  # boundary of a worker call: log, never propagate
        log(f"PROC ERR: {e}")
    finally:
        try:
            os.remove(out_file)
        except OSError:
            pass


def ws_listener():
    """WebSocket client - connect to NapCat's WS server for events.

    Every event with ``post_type == "message"`` is pushed to ``msg_queue`` as
    ``(message, user_id, group_id, nickname)``. Reconnects after 5 s on any
    connection error. Blocks forever.
    """
    import asyncio
    import websockets

    async def listen():
        uri = "ws://127.0.0.1:5701"
        while True:
            try:
                async with websockets.connect(uri) as ws:
                    log("WS connected")
                    async for raw in ws:
                        try:
                            evt = json.loads(raw)
                            if evt.get("post_type") == "message":
                                msg = evt.get("message", "")
                                uid = evt.get("user_id", 0)
                                gid = evt.get("group_id", 0)
                                sender = evt.get("sender", {})
                                nick = (
                                    sender.get("nickname", "")
                                    or sender.get("card", "")
                                    or str(uid)
                                )
                                msg_queue.put((msg, uid, gid, nick))
                        except (ValueError, TypeError, AttributeError):
                            # Malformed or unexpected frame: skip it.
                            continue
            except Exception as e:
                log(f"WS err: {e}")
            await asyncio.sleep(5)

    asyncio.run(listen())


def _handle_polled_message(m, seen):
    """Process one history entry *m*; *seen* holds already-handled ids."""
    mid = m.get("message_id", 0)
    if mid in seen:
        return
    seen.add(mid)
    uid = m.get("user_id", 0)
    if str(uid) == str(SELF_ID):
        return  # skip self
    segments = [s for s in (m.get("message") or []) if isinstance(s, dict)]
    text = "".join(
        s.get("data", {}).get("text", "")
        for s in segments
        if s.get("type") == "text"
    )
    if not text.strip():
        return
    sender = m.get("sender", {})
    nick = sender.get("nickname", "") or sender.get("card", "") or str(uid)
    mentions_me = any(
        s.get("type") == "at"
        and str(s.get("data", {}).get("qq", "")) == str(SELF_ID)
        for s in segments
    )
    # Only answer when @-mentioned or addressed by name.
    if not mentions_me and BOT_NAME not in text:
        log(f"SKIP {nick}: not for me")
        return
    log(f"POLL {nick}: {text[:60]}")
    process_and_reply(text, uid, TARGET_GROUP, nick)


def poll_worker():
    """Fallback: poll latest msgs in target group every 15s.

    Runs forever. An unexpected error in one cycle is logged and the loop
    continues, so the daemon thread cannot die silently.
    """
    seen = set()
    while True:
        time.sleep(15)
        try:
            for m in get_recent_msgs(TARGET_GROUP, 3):
                _handle_polled_message(m, seen)
        except Exception as e:
            log(f"POLL ERR: {e}")


if __name__ == "__main__":
    # NOTE(review): ws_listener() is defined but not started here; only the
    # polling fallback runs. Confirm this is intentional.
    threading.Thread(target=poll_worker, daemon=True).start()
    log(f"QQ Bot started, group={TARGET_GROUP}")
    # Announce startup through the normal reply pipeline.
    threading.Thread(
        target=process_and_reply,
        args=("莫笑笑上线了", 0, TARGET_GROUP, BOT_NAME),
        daemon=True,
    ).start()
    # Keep main thread alive
    while True:
        time.sleep(60)