188 lines
7.6 KiB
Python
188 lines
7.6 KiB
Python
"""
|
|
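WeChat agent: relays WeChat messages received through wxhook to the Hermes chat-completions API (port 8642) and sends the replies back to the sender.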
|
|
"""
|
|
import sys, os, json, time, threading, requests, re
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
|
|
sys.path.insert(0, r"C:\Users\hmo\AppData\Local\Programs\Python\Python310\Lib\site-packages")
|
|
os.environ["WXHOOK_LOG_LEVEL"] = "ERROR"
|
|
|
|
from wxhook import Bot
|
|
from wxhook.events import TEXT_MESSAGE, IMAGE_MESSAGE, VOICE_MESSAGE
|
|
import pymem, pymem.process
|
|
|
|
# wxid of the bot's own account; on_msg ignores messages coming from it.
BOT_WXID = "wxid_7onnerpx2s2l22"
# Base URL of the wxhook HTTP API. Empty until the Bot has been created
# further down, which then assigns Bot.BASE_URL to it.
WX_API = ""
# Append-only text log written by log().
LOG_FILE = r"C:\Users\hmo\Desktop\wechat_agent.log"
# DLL that the watchdog re-injects into WeChat.exe.
DLL = r"C:\Users\hmo\AppData\Local\Programs\Python\Python310\Lib\site-packages\wxhook\tools\wxhook.dll"
# time.time() of the most recent activity; read and reset by the watchdog.
last_msg_time = time.time()
# wxid -> display name, filled lazily by get_nickname().
nickname_cache = {}

# Hermes endpoint and bearer token. Both can be overridden through the
# environment (same pattern the file uses for VOLCENGINE_API_KEY); the
# defaults keep the previous hard-coded values.
# NOTE(review): the default token is a secret committed to source - prefer
# setting HERMES_API_KEY in the environment and rotating this value.
HERMES_API = os.environ.get("HERMES_API_URL", "http://192.168.0.103:8642/v1/chat/completions")
HERMES_KEY = os.environ.get("HERMES_API_KEY", "hermes123")
|
|
|
|
def log(m):
    """Append *m* to LOG_FILE as one line prefixed with the current HH:MM:SS."""
    stamp = time.strftime('%H:%M:%S')
    line = f"{stamp} {m}\n"
    with open(LOG_FILE, "a", encoding="utf-8") as fh:
        fh.write(line)
|
|
|
|
def send_wx(wxid, msg):
    """Send a text message to a WeChat contact through the wxhook HTTP API.

    Posts ``{"wxid": wxid, "msg": msg}`` to ``WX_API + "/api/sendTextMsg"``.
    Never raises: any failure is written to the log as ``SEND ERR``.

    Args:
        wxid: Recipient's WeChat id.
        msg: Text to send.
    """
    try:
        resp = requests.post(
            WX_API + "/api/sendTextMsg",
            json={"wxid": wxid, "msg": msg},
            timeout=5,
        )
        # Without this check an HTTP error response (4xx/5xx) would still
        # be logged as a successful send.
        resp.raise_for_status()
        log(f"SEND {wxid}: ok")
    except Exception as e:
        log(f"SEND ERR: {e}")
|
|
|
|
def get_nickname(wxid):
    """Return the display name for *wxid*, falling back to the wxid itself.

    Looks the contact up in ``nickname_cache`` first; on a miss it fetches
    the contact list from ``WX_API + "/api/getContactList"`` and uses the
    contact's ``nickname``, then ``customAccount``, then the wxid.

    A successful lookup (found or not found) is cached. A failed request is
    logged and NOT cached, so a temporary outage does not pin the raw wxid
    as the nickname for the rest of the process lifetime.

    Args:
        wxid: WeChat id to resolve.

    Returns:
        The resolved display name, or *wxid* when nothing better is known.
    """
    if wxid in nickname_cache:
        return nickname_cache[wxid]
    try:
        r = requests.post(WX_API + "/api/getContactList", json={}, timeout=5)
        # "or []" also covers a response whose "data" field is null.
        for c in r.json().get("data") or []:
            if c.get("wxid") == wxid:
                nick = c.get("nickname") or c.get("customAccount") or wxid
                nickname_cache[wxid] = nick
                return nick
    except Exception as e:
        log(f"NICK ERR: {e}")
        return wxid
    # The list was fetched but does not contain this wxid: remember that so
    # the whole contact list is not downloaded again for every message.
    nickname_cache[wxid] = wxid
    return wxid
|
|
|
|
def call_hermes(wxid, content):
    """Ask the Hermes chat-completions endpoint for a reply to *content*.

    Builds a two-message conversation (system prompt containing the sender's
    nickname, then the user content) and posts it to ``HERMES_API`` with the
    bearer token ``HERMES_KEY``. Proxies are explicitly disabled for this
    request.

    Args:
        wxid: Sender's WeChat id; used to resolve the nickname in the prompt.
        content: Text forwarded as the user message.

    Returns:
        The reply text from ``choices[0].message.content``, or ``None`` when
        the request fails, returns a non-200 status, or has an unexpected
        shape. Every failure is logged as ``API ERR``.
    """
    nickname = get_nickname(wxid)
    # NOTE(review): the session id is the same constant for every sender, so
    # all conversations presumably share one Hermes session - confirm intent.
    headers = {"Authorization": f"Bearer {HERMES_KEY}", "X-Hermes-Session-Id": "sisyphus", "Content-Type": "application/json"}
    sys_prompt = f"你是莫荷,女生。你主人是老爸({nickname})。回复简短像聊天。发图用 [IMG]URL[/IMG]。"
    body = {"model": "hermes-agent", "messages": [{"role": "system", "content": sys_prompt}, {"role": "user", "content": content}]}
    try:
        r = requests.post(HERMES_API, json=body, headers=headers, timeout=120, proxies={"http": None, "https": None})
        if r.status_code == 200:
            return r.json()["choices"][0]["message"]["content"]
        # Previously a non-200 answer returned None with nothing in the log.
        log(f"API ERR: HTTP {r.status_code}: {r.text[:200]}")
    except Exception as e:
        log(f"API ERR: {e}")
    return None
|
|
|
|
def watchdog():
    """Background loop that revives the message hook after long silence.

    Every 30 seconds it checks how long ago ``last_msg_time`` was updated.
    Once more than 120 seconds have passed (and ``WX_API`` is set) it calls
    ``/api/checkLogin``:

    * response ``code == 1``: re-posts the ``/api/hookSyncMsg`` registration;
    * anything else: injects the wxhook DLL into ``WeChat.exe`` again.

    Errors are logged as ``WATCHDOG ERR`` instead of being silently dropped,
    and never terminate the loop. The function does not return.

    NOTE(review): nesting was reconstructed from a paste that lost its
    indentation; the timer reset is placed inside the idle branch because at
    loop level the idle threshold could never be reached - confirm.
    """
    global last_msg_time
    while True:
        idle = time.time() - last_msg_time
        if idle > 120 and WX_API:
            try:
                r = requests.post(WX_API + "/api/checkLogin", json={}, timeout=5)
                # presumably code 1 means "logged in" - verify against wxhook
                if r.json().get("code") == 1:
                    requests.post(WX_API + "/api/hookSyncMsg", json={"ip": "127.0.0.1", "port": 19001, "enableHttp": 1, "url": "", "timeout": 300}, timeout=5)
                    log(f"WATCHDOG: refreshed ({int(idle)}s)")
                else:
                    log("WATCHDOG: re-injecting...")
                    pymem.process.inject_dll(pymem.Pymem("WeChat.exe").process_handle, DLL.encode())
            except Exception as e:
                # Keep the loop alive, but leave a trace of what went wrong.
                log(f"WATCHDOG ERR: {e}")
            # Restart the idle timer so the recovery is not retried on every
            # 30-second tick.
            last_msg_time = time.time()
        time.sleep(30)
|
|
|
|
# Run the watchdog loop in a background daemon thread so it does not keep the
# process alive on exit. WX_API is still empty here; watchdog() skips its
# checks until it is set.
threading.Thread(target=watchdog, daemon=True).start()
|
|
|
|
class RH(BaseHTTPRequestHandler):
    """HTTP handler for the agent's local control endpoint.

    * ``POST /hermes-msg`` - appends the message to ``hermes_inbox.txt``.
    * ``POST`` on any other path - forwards ``message``/``content`` to the
      WeChat contact named by ``to``/``wxid`` via ``send_wx``.
    * ``GET`` - answers ``{"ok":true}``.

    POST requests are always answered with status 200; problems are only
    written to the log as ``RH ERR``.
    """

    def do_POST(self):
        """Handle an incoming JSON POST (see class docstring for routes)."""
        global last_msg_time
        # Any inbound request counts as activity for the watchdog.
        last_msg_time = time.time()
        try:
            # Parsed inside the try so a malformed Content-Length header is
            # logged and still answered instead of raising out of the handler.
            length = int(self.headers.get("Content-Length", 0))
            d = json.loads(self.rfile.read(length))
            if self.path == "/hermes-msg":
                msg = d.get("message", "") or d.get("content", "") or str(d)[:200]
                log("<<< HERMES: " + msg[:100])
                with open(r"C:\Users\hmo\Desktop\hermes_inbox.txt", "a", encoding="utf-8") as f:
                    f.write(f"{time.strftime('%H:%M:%S')} {msg}\n")
            else:
                to = d.get("to", "") or d.get("wxid", "")
                msg = d.get("message", "") or d.get("content", "")
                if to and msg:
                    log(f"REPLY {to}: {msg[:50]}")
                    send_wx(to, msg)
        except Exception as e:
            log(f"RH ERR: {e}")
        self.send_response(200)
        self.end_headers()

    def do_GET(self):
        """Answer any GET with a small JSON health payload."""
        payload = b'{"ok":true}'
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, *a):
        """Suppress the base class's per-request logging."""
        pass
|
|
|
|
# Start the local HTTP endpoint served by RH.
# The server is created here rather than inside the thread so that a bind
# failure (e.g. port already in use) is logged instead of dying unnoticed in
# the background thread while "HTTP :5801" is still written to the log.
# NOTE(review): this listens on all interfaces and RH performs no
# authentication, so any host that can reach port 5801 can make the bot send
# WeChat messages - consider binding to 127.0.0.1.
try:
    _http_server = HTTPServer(("0.0.0.0", 5801), RH)
except OSError as e:
    log(f"HTTP ERR: {e}")
else:
    threading.Thread(target=_http_server.serve_forever, daemon=True).start()
    log("HTTP :5801")

# Create the wxhook bot and publish its HTTP base URL through WX_API, which
# send_wx(), get_nickname() and watchdog() read.
log("Creating Bot...")
b = Bot()
WX_API = b.BASE_URL
log("Bot ready, API=" + WX_API)
|
|
|
|
# Matches one [IMG]...[/IMG] tag; group 1 is the URL between the tags.
_IMG_TAG_RE = re.compile(r'\[IMG\](.*?)\[/IMG\]')


def _describe_image(b64):
    """Save an incoming base64 image and return a model-written description.

    The decoded image is stored under ``Desktop\\wechat_images`` and sent as
    a data URL to the Volcengine chat-completions endpoint. Returns ``""``
    when *b64* is empty or anything fails (failures are logged as
    ``OCR err``).
    """
    if not b64:
        return ""
    try:
        import base64
        img_dir = r"C:\Users\hmo\Desktop\wechat_images"
        os.makedirs(img_dir, exist_ok=True)
        raw = base64.b64decode(b64)
        fname = os.path.join(img_dir, str(int(time.time())) + ".jpg")
        with open(fname, "wb") as f:
            f.write(raw)
        # NOTE(review): the fallback below is an API key committed to source;
        # prefer requiring VOLCENGINE_API_KEY and rotating this value.
        api_key = os.environ.get("VOLCENGINE_API_KEY", "b0359bed-09f2-49e2-a53c-32ba057412e3")
        # Re-encode the decoded bytes directly; there is no need to read the
        # file that was just written back from disk.
        img_b64 = base64.b64encode(raw).decode()
        r = requests.post("https://ark.cn-beijing.volces.com/api/coding/v3/chat/completions",
                          json={"model": "doubao-seed-code", "messages": [{"role": "user", "content": [{"type": "text", "text": "描述图片"}, {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_b64}}]}], "max_tokens": 500},
                          headers={"Authorization": "Bearer " + api_key, "Content-Type": "application/json"}, timeout=60)
        return r.json()["choices"][0]["message"]["content"] or ""
    except Exception as e:
        log(f"OCR err: {e}")
        return ""


def _send_image_from_url(bot, wxid, url):
    """Download *url* and send it to *wxid* as an image.

    Tries ``bot.send_image`` first and falls back to the raw
    ``/api/sendImagesMsg`` endpoint. The temporary file is removed even when
    sending fails. Errors are logged as ``IMG err`` and never raised.
    """
    try:
        ir = requests.get(url, timeout=30, proxies={"http": None, "https": None})
        if ir.status_code != 200:
            log(f"IMG err: HTTP {ir.status_code} for {url}")
            return
        ext = ".png" if "png" in ir.headers.get("content-type", "") else ".jpg"
        tmp = os.path.join(r"C:\Users\hmo\Desktop", f"send_img_{int(time.time())}{ext}")
        try:
            with open(tmp, "wb") as f:
                f.write(ir.content)
            try:
                bot.send_image(wxid, tmp)
            except Exception:
                requests.post(WX_API + "/api/sendImagesMsg", json={"wxid": wxid, "imagePath": tmp}, timeout=10)
        finally:
            # Clean up even if both send paths raised.
            if os.path.exists(tmp):
                os.remove(tmp)
    except Exception as e:
        log(f"IMG err: {e}")


def _deliver_reply(bot, wxid, reply):
    """Send *reply* to *wxid*, turning ``[IMG]URL[/IMG]`` tags into images.

    Text outside the tags is sent first (if any), then every referenced
    image is downloaded and sent. A reply without tags is sent unchanged.
    """
    urls = [u.strip() for u in _IMG_TAG_RE.findall(reply)]
    if not urls:
        send_wx(wxid, reply)
        return
    clean = re.sub(r'\s*\[IMG\].*?\[/IMG\]\s*', '', reply).strip()
    if clean:
        send_wx(wxid, clean)
    for url in urls:
        if url:
            _send_image_from_url(bot, wxid, url)


@b.handle([TEXT_MESSAGE, IMAGE_MESSAGE, VOICE_MESSAGE])
def on_msg(_bot, event):
    """Handle one incoming WeChat text, image or voice message.

    Messages without a sender or sent by the bot itself are ignored. The
    message is turned into a text prompt (voice -> a fixed placeholder,
    image -> ``[image]`` plus an optional description, text -> its content),
    passed to ``call_hermes``, and the reply is delivered to the sender.
    All three message kinds share the same reply path, so ``[IMG]`` tags in
    a reply are honoured regardless of what kind of message triggered it.

    Args:
        _bot: The bot instance passed by wxhook; used to send images.
        event: Incoming event; ``fromUser``, ``type``, ``content`` and
            ``base64Img`` are read.
    """
    global last_msg_time
    # Any inbound message counts as activity for the watchdog.
    last_msg_time = time.time()
    fu = event.fromUser or ""
    if not fu or fu == BOT_WXID:
        return

    if event.type == VOICE_MESSAGE:
        log(f"<- {fu}: [voice]")
        content = "[voice message]"
    elif event.type == IMAGE_MESSAGE:
        log(f"<- {fu}: [image]")
        ocr_text = _describe_image(event.base64Img or "")
        content = "[image]" + ("\n" + ocr_text if ocr_text else "")
    else:
        content = event.content or ""
        if not content:
            return
        log(f"<- {fu}: {content[:50]}")

    reply = call_hermes(fu, content)
    if not reply:
        log(f"-> {fu}: no reply")
        return
    log(f"-> {fu}: {reply[:50]}")
    _deliver_reply(_bot, fu, reply)
|
|
|
|
# Announce the configured Hermes endpoint on stdout, mark start-up as
# complete in the log, then hand control to the bot.
# presumably b.run() blocks for the lifetime of the process - confirm.
print(f"[Agent] Hermes API: {HERMES_API}")
log("Ready")
b.run()
|