# Source: wechat-hermes-gateway/scripts/wechat_agent.py
# (repository viewer listing: 364 lines, 16 KiB, Python)

"""
WeChat Agent - wxhook + Hermes API (:8642)
"""
import sys, os, json, time, threading, requests, re
from http.server import HTTPServer, BaseHTTPRequestHandler
sys.path.insert(0, r"C:\Users\hmo\AppData\Local\Programs\Python\Python310\Lib\site-packages")
os.environ["WXHOOK_LOG_LEVEL"] = "ERROR"
from wxhook import Bot
from wxhook.events import TEXT_MESSAGE, IMAGE_MESSAGE, VOICE_MESSAGE, XML_MESSAGE
import pymem, pymem.process
# --- Runtime configuration ---------------------------------------------------
# wxid of the bot's own WeChat account; on_msg ignores messages sent by it.
BOT_WXID = "wxid_7onnerpx2s2l22"
# Base URL of the local wxhook HTTP API. Empty until the Bot has been created.
WX_API = ""
LOG_FILE = r"C:\Users\hmo\Desktop\wechat_agent.log"
# wxhook DLL that the watchdog injects into WeChat.exe.
DLL = r"C:\Users\hmo\AppData\Local\Programs\Python\Python310\Lib\site-packages\wxhook\tools\wxhook.dll"
# Time of the last observed activity; the watchdog measures idleness against it.
last_msg_time = time.time()
# wxid -> display name, filled by get_nickname().
nickname_cache = {}
HERMES_API = "http://192.168.0.103:8642/v1/chat/completions"
# NOTE(review): the literal keys below are committed secrets. They are kept only
# as backward-compatible defaults; set the environment variables, rotate the
# keys, and then remove the literals.
HERMES_KEY = os.environ.get("HERMES_API_KEY", "hermes123")
# SenseNova (SenseTime) endpoint and key for image generation + vision.
SENSENOVA_KEY = os.environ.get("SENSENOVA_API_KEY", "sk-aRNj3UwKSLPsDfh15QNTPwbHxahblfaO")
SENSENOVA_URL = "https://token.sensenova.cn/v1"
def log(m):
    """Append ``m`` to LOG_FILE as a single line prefixed with the HH:MM:SS time."""
    stamp = time.strftime('%H:%M:%S')
    with open(LOG_FILE, "a", encoding="utf-8") as sink:
        sink.write(f"{stamp} {m}\n")
def send_wx(wxid, msg):
    """Send the text ``msg`` to ``wxid`` through the wxhook HTTP API.

    Best-effort: every failure is written to the log and nothing is raised to
    the caller. Returns None.
    """
    try:
        resp = requests.post(
            WX_API + "/api/sendTextMsg",
            json={"wxid": wxid, "msg": msg},
            timeout=5,
        )
        # Without this check an HTTP error response was still logged as "ok".
        resp.raise_for_status()
        log(f"SEND {wxid}: ok")
    except Exception as e:
        log(f"SEND ERR: {e}")
def get_nickname(wxid):
    """Return a display name for ``wxid``, falling back to ``wxid`` itself.

    Results are memoised in ``nickname_cache``. On a cache miss the contact
    list is fetched once and every contact in the response is cached, so later
    lookups for other contacts do not repeat the request. If the request fails
    or the contact is absent, ``wxid`` is cached and returned (as before).
    """
    if wxid in nickname_cache:
        return nickname_cache[wxid]
    try:
        r = requests.post(WX_API + "/api/getContactList", json={}, timeout=5)
        # ``or []`` also covers a response that carries "data": null.
        for c in r.json().get("data") or []:
            cid = c.get("wxid")
            if cid:
                nickname_cache[cid] = c.get("nickname") or c.get("customAccount") or cid
    except Exception as e:
        # Previously swallowed silently; keep the fallback but leave a trace.
        log(f"NICK ERR: {e}")
    return nickname_cache.setdefault(wxid, wxid)
def call_hermes(wxid, content):
    """Send ``content`` to the Hermes chat-completions API and return the reply.

    Args:
        wxid: WeChat id of the user the request is made for; also the recipient
            of the failure notice.
        content: Text placed in the user message.

    Returns:
        The assistant message text on HTTP 200, otherwise None. On a timeout,
        a connection failure or a 5xx response, a short notice is sent to
        ``wxid`` through send_wx.
    """
    # The nickname is not part of the request; the call is kept because it
    # fills nickname_cache.
    get_nickname(wxid)
    # NOTE(review): the session id is a fixed string, so every wxid shares one
    # Hermes session. Confirm that is intended.
    headers = {"Authorization": f"Bearer {HERMES_KEY}", "X-Hermes-Session-Id": "sisyphus", "Content-Type": "application/json"}
    sys_prompt = "回复简短。"
    body = {"model": "hermes-agent", "messages": [{"role": "system", "content": sys_prompt}, {"role": "user", "content": content}]}
    notice = None
    try:
        r = requests.post(HERMES_API, json=body, headers=headers, timeout=180, proxies={"http": None, "https": None})
        if r.status_code == 200:
            return r.json()["choices"][0]["message"]["content"]
        # requests does not raise on HTTP errors, so they must be handled here;
        # previously a non-200 answer was dropped without any trace.
        log(f"API ERR: HTTP {r.status_code}")
        if r.status_code >= 500:
            notice = "[莫荷那边出错了,等一会儿再试]"
    except Exception as e:
        err_msg = str(e)
        log(f"API ERR: {err_msg[:60]}")
        lowered = err_msg.lower()
        if "timeout" in lowered or "timed out" in lowered:
            notice = "[莫荷处理超时,你再发一遍试试?]"
        elif "connection" in lowered or "refused" in lowered:
            notice = "[跟莫荷的连接断了,正在自动重连...]"
        elif "500" in err_msg or "50x" in err_msg:
            notice = "[莫荷那边出错了,等一会儿再试]"
    if notice:
        # The original referenced an undefined name here, so the resulting
        # NameError was swallowed and the user was never notified.
        send_wx(wxid, notice)
    return None
def _reinject_wxhook():
    """Inject the wxhook DLL into a running WeChat.exe and log the outcome."""
    try:
        pm = pymem.Pymem()
        # pymem (already imported) locates the process itself; the original
        # used psutil, which this file never imports.
        pm.open_process_from_name("WeChat.exe")
        try:
            pymem.process.inject_dll(pm.process_handle, DLL.encode())
            log(f"WATCHDOG: injected into PID {pm.process_id}")
        finally:
            pm.close_process()
    except Exception as ej:
        log(f"WATCHDOG: inject failed: {ej}")


def watchdog():
    """Keep the wxhook message hook alive; runs forever in a background thread.

    Every 30 seconds: when no activity has been recorded for more than 120
    seconds and WX_API is set, ``/api/checkLogin`` is queried. A response with
    ``code == 1`` re-registers the message hook via ``/api/hookSyncMsg``; any
    other response triggers a DLL re-injection. The idle timer is reset after
    each check.
    """
    global last_msg_time
    while True:
        idle = time.time() - last_msg_time
        if idle > 120 and WX_API:
            try:
                r = requests.post(WX_API + "/api/checkLogin", json={}, timeout=5)
                if r.json().get("code") == 1:
                    # API alive, just refresh the webhook.
                    # NOTE(review): the callback port is parsed from WX_API,
                    # i.e. the API's own port. Confirm wxhook expects that.
                    port = WX_API.split(":")[-1]
                    requests.post(WX_API + "/api/hookSyncMsg", json={"ip": "127.0.0.1", "port": int(port), "enableHttp": 1, "url": "", "timeout": 300}, timeout=5)
                    log(f"WATCHDOG: refreshed ({int(idle)}s)")
                else:
                    log("WATCHDOG: re-injecting into running WeChat...")
                    _reinject_wxhook()
            except Exception as e:
                # NOTE(review): an unreachable API ends up here and does NOT
                # trigger re-injection; only a reachable API that reports
                # code != 1 does. Confirm that is the intended policy.
                log(f"WATCHDOG: check failed: {e}")
            last_msg_time = time.time()
        time.sleep(30)


threading.Thread(target=watchdog, daemon=True).start()
class RH(BaseHTTPRequestHandler):
    """HTTP handler for the local reply endpoint.

    POST /hermes-msg   -> the message is logged and appended to hermes_inbox.txt.
    POST anything else -> ``to``/``wxid`` plus ``message``/``content`` are
                          forwarded to WeChat through send_wx.
    GET                -> answers 200 with the body {"ok":true}.
    Every POST is answered with an empty 200, including malformed ones.
    """

    def do_POST(self):
        global last_msg_time
        last_msg_time = time.time()
        length = int(self.headers.get("Content-Length", 0))
        raw = self.rfile.read(length)
        try:
            payload = json.loads(raw)
            if self.path == "/hermes-msg":
                text = payload.get("message", "") or payload.get("content", "") or str(payload)[:200]
                log("<<< HERMES: " + text[:100])
                with open(r"C:\Users\hmo\Desktop\hermes_inbox.txt", "a", encoding="utf-8") as inbox:
                    inbox.write(f"{time.strftime('%H:%M:%S')} {text}\n")
                self.send_response(200)
                self.end_headers()
                return
            target = payload.get("to", "") or payload.get("wxid", "")
            text = payload.get("message", "") or payload.get("content", "")
            if target and text:
                log(f"REPLY {target}: {text[:50]}")
                send_wx(target, text)
        except Exception as e:
            log(f"RH ERR: {e}")
        self.send_response(200)
        self.end_headers()

    def do_GET(self):
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b'{"ok":true}')

    def log_message(self, *a):
        """Discard the base class's per-request logging."""
        pass
def _serve_reply_endpoint():
    """Create the HTTP server on 0.0.0.0:5801 with RH and serve until exit."""
    HTTPServer(("0.0.0.0", 5801), RH).serve_forever()


# The server is built and run inside a daemon thread; the log line below is
# written right after the thread is started.
threading.Thread(target=_serve_reply_endpoint, daemon=True).start()
log("HTTP :5801")

# Create the wxhook bot and publish its API base URL for the helpers above.
log("Creating Bot...")
b = Bot()
WX_API = b.BASE_URL
log("Bot ready, API=" + WX_API)
# Directories used for media received from, or sent to, WeChat.
_DESKTOP_DIR = r"C:\Users\hmo\Desktop"
_VOICE_DIR = r"C:\Users\hmo\Desktop\wechat_voice"
_IMAGE_DIR = r"C:\Users\hmo\Desktop\wechat_images"
# Passed to requests to bypass any configured proxy for these downloads.
_NO_PROXY = {"http": None, "https": None}
# Aspect ratio -> SenseNova "size" parameter.
_SENSENOVA_SIZES = {"1:1": "2048x2048", "16:9": "2752x1536", "9:16": "1536x2752",
                    "3:2": "2496x1664", "2:3": "1664x2496", "3:4": "1760x2368", "4:3": "2368x1760"}


def _unique_path(directory, prefix, suffix):
    """Build a file path that does not collide for files created in the same second."""
    return os.path.join(directory, f"{prefix}{time.time_ns()}{suffix}")


def _send_temp(path, data, send, fallback=None):
    """Write ``data`` to ``path``, deliver it, and always delete the file afterwards.

    ``send(path)`` is tried first. If it raises and ``fallback`` is given, the
    failure is logged and ``fallback(path)`` is called instead. Returns True
    when ``send`` succeeded and False when the fallback was used.
    """
    with open(path, "wb") as f:
        f.write(data)
    try:
        try:
            send(path)
            return True
        except Exception as e:
            if fallback is None:
                raise
            log(f"SEND fallback ({os.path.basename(path)}): {e}")
            fallback(path)
            return False
    finally:
        # Runs on failure too, so a failed send no longer leaves the file behind.
        os.remove(path)


def _reply_plain(fu, prompt):
    """Ask Hermes about ``prompt`` and relay any reply to ``fu`` as plain text."""
    reply = call_hermes(fu, prompt)
    if reply:
        send_wx(fu, reply)


def _handle_voice(fu, event):
    """Log three wxhook voice-retrieval attempts, then ask Hermes for a reply."""
    mid = event.msgId or 0
    log(f"<- {fu}: [voice] msgId={mid}")
    # Try various voice download methods with the real msgId.
    attempts = (
        ("getVoice", "/api/getVoiceByMsgId", {"msgId": mid, "storeDir": _VOICE_DIR}),
        ("downloadAttach", "/api/downloadAttach", {"msgId": mid}),
        ("forwardMsg", "/api/forwardMsg", {"msgId": mid, "wxid": "filehelper"}),
    )
    for label, endpoint, payload in attempts:
        try:
            r = requests.post(WX_API + endpoint, json=payload, timeout=10)
            log(f"{label}: {r.json()}")
        except Exception as e:
            log(f"{label} err: {e}")
    _reply_plain(fu, "[voice message]")


def _describe_image(b64):
    """Save a base64 image to disk and return the vision model's description ('' on failure)."""
    import base64
    try:
        # Decode before touching the disk so bad input leaves no empty file.
        raw = base64.b64decode(b64)
        os.makedirs(_IMAGE_DIR, exist_ok=True)
        with open(_unique_path(_IMAGE_DIR, "", ".jpg"), "wb") as f:
            f.write(raw)
        # NOTE(review): the fallback value is a committed secret; prefer the
        # environment variable and rotate the key.
        api_key = os.environ.get("VOLCENGINE_API_KEY", "b0359bed-09f2-49e2-a53c-32ba057412e3")
        img_b64 = base64.b64encode(raw).decode()
        r = requests.post("https://ark.cn-beijing.volces.com/api/coding/v3/chat/completions",
                          json={"model": "doubao-seed-code", "messages": [{"role": "user", "content": [{"type": "text", "text": "描述图片"}, {"type": "image_url", "image_url": {"url": "data:image/jpeg;base64," + img_b64}}]}], "max_tokens": 500},
                          headers={"Authorization": "Bearer " + api_key, "Content-Type": "application/json"}, timeout=60)
        return r.json()["choices"][0]["message"]["content"]
    except Exception as e:
        log(f"OCR err: {e}")
        return ""


def _handle_image(fu, event):
    """Describe an incoming image (when its data is present) and ask Hermes for a reply."""
    log(f"<- {fu}: [image]")
    b64 = event.base64Img or ""
    ocr_text = _describe_image(b64) if b64 else ""
    _reply_plain(fu, "[image]" + ("\n" + ocr_text if ocr_text else ""))


def _handle_xml(fu, event):
    """Handle XML messages (files, cards, etc.) by telling Hermes what was sent."""
    content = str(event.content or "")
    log(f"<- {fu}: [xml] {content[:80]}")
    fname_match = re.search(r'<filename>(.*?)</filename>', content)
    if fname_match:
        _reply_plain(fu, f"[sent a file: {fname_match.group(1)}]")
    else:
        _reply_plain(fu, "[sent a file or card]")


def _tag_file(_bot, fu, m):
    """[FILE]url[/FILE]: download the URL and send it to ``fu`` as a file."""
    file_url = m.group(1).strip()
    try:
        fr = requests.get(file_url, timeout=60, proxies=_NO_PROXY)
        if fr.status_code == 200:
            fname = _unique_path(_DESKTOP_DIR, "send_file_", ".dat")
            sent = _send_temp(
                fname, fr.content,
                lambda p: _bot.send_file(fu, p),
                lambda p: requests.post(WX_API + "/api/sendFileMsg", json={"wxid": fu, "filePath": p}, timeout=10),
            )
            if sent:
                log("FILE sent")
    except Exception as e:
        log(f"FILE err: {e}")


def _generate_and_send(_bot, fu, spec):
    """Generate an image via SenseNova from ``prompt[|ratio]`` and send it to ``fu``."""
    head, sep, rest = spec.partition("|")
    prompt = head.strip()
    ratio = rest.split("|")[0].strip() if sep else "1:1"
    # Unknown ratios fall back to the square size.
    size = _SENSENOVA_SIZES.get(ratio, "2048x2048")
    log(f"GEN SenseNova: {prompt[:30]} [{ratio}]")
    gen_r = requests.post(SENSENOVA_URL + "/images/generations",
                          json={"model": "sensenova-u1-fast", "prompt": prompt, "size": size, "response_format": "url"},
                          headers={"Authorization": f"Bearer {SENSENOVA_KEY}", "Content-Type": "application/json"},
                          timeout=180)
    if gen_r.status_code != 200:
        log(f"GEN err: {gen_r.status_code} {gen_r.text[:100]}")
        return
    img_url = gen_r.json()["data"][0]["url"]
    ir = requests.get(img_url, timeout=60)
    if ir.status_code == 200:
        tmp = _unique_path(_DESKTOP_DIR, "gen_img_", ".png")
        _send_temp(tmp, ir.content, lambda p: _bot.send_image(fu, p))
        log("GEN sent")


def _tag_img(_bot, fu, m):
    """[IMG]...[/IMG]: ``generate:``/``draw:`` creates an image, anything else is fetched as a URL."""
    img_cmd = m.group(1).strip()
    try:
        if img_cmd.startswith(("generate:", "draw:")):
            _generate_and_send(_bot, fu, img_cmd.split(":", 1)[1].strip())
            return
        ir = requests.get(img_cmd, timeout=30, proxies=_NO_PROXY)
        if ir.status_code == 200:
            ext = ".png" if "png" in ir.headers.get("content-type", "") else ".jpg"
            tmp = _unique_path(_DESKTOP_DIR, "send_img_", ext)
            _send_temp(
                tmp, ir.content,
                lambda p: _bot.send_image(fu, p),
                lambda p: requests.post(WX_API + "/api/sendImagesMsg", json={"wxid": fu, "imagePath": p}, timeout=10),
            )
    except Exception as e:
        log(f"IMG err: {e}")


def _tag_contact(_bot, fu, m):
    """[CONTACT:wxid]: look up the contact profile and send a summary to ``fu``."""
    cwxid = m.group(1)
    try:
        cr = requests.post(WX_API + "/api/getContactProfile", json={"wxid": cwxid}, timeout=10)
        cd = cr.json().get("data", {})
        info = f"昵称: {cd.get('nickname','?')} 备注: {cd.get('remark','')} 账号: {cd.get('account','')}"
        send_wx(fu, info)
        log("CONTACT info sent")
    except Exception as e:
        log(f"CONTACT err: {e}")


def _tag_room_members(_bot, fu, m):
    """[ROOM_MEMBERS:roomid]: send the first 100 characters of the member list to ``fu``."""
    rid = m.group(1)
    try:
        rr = requests.post(WX_API + "/api/getMemberFromChatRoom", json={"chatRoomId": rid}, timeout=10)
        members = rr.json().get("data", {}).get("members", "")
        send_wx(fu, f"群成员: {members[:100]}")
        log("ROOM members sent")
    except Exception as e:
        log(f"ROOM err: {e}")


def _tag_emoji(_bot, fu, m):
    """[EMOJI]url[/EMOJI]: download the URL and send it to ``fu`` as an emotion."""
    eurl = m.group(1).strip()
    try:
        er = requests.get(eurl, timeout=30, proxies=_NO_PROXY)
        if er.status_code == 200:
            epath = _unique_path(_DESKTOP_DIR, "emoji_", ".png")
            _send_temp(epath, er.content, lambda p: _bot.send_emotion(fu, p))
            log("EMOJI sent")
    except Exception as e:
        log(f"EMOJI err: {e}")


def _tag_pat(_bot, fu, m):
    """[PAT:roomid:wxid]: send a pat message through the wxhook API."""
    prid = m.group(1)
    pwxid = m.group(2)
    try:
        requests.post(WX_API + "/api/sendPatMsg", json={"receiver": prid, "wxid": pwxid}, timeout=10)
        log("PAT sent")
    except Exception as e:
        log(f"PAT err: {e}")


def _tag_ocr(_bot, fu, m):
    """[OCR:image_path]: run the wxhook OCR endpoint and send the text to ``fu``."""
    opath = m.group(1).strip()
    try:
        or_ = requests.post(WX_API + "/api/ocr", json={"imagePath": opath}, timeout=30)
        otext = or_.json().get("data", "")
        send_wx(fu, f"OCR: {otext[:200]}")
        log("OCR sent")
    except Exception as e:
        log(f"OCR err: {e}")


# (pattern locating the first tag, pattern stripping every such tag, handler).
# The order is the order in which the tags are processed.
# NOTE(review): URLs and paths in these tags come straight from the model's
# reply; consider validating them (allow-list) before fetching or reading.
_TAG_HANDLERS = (
    (re.compile(r'\[FILE\](.*?)\[/FILE\]'), re.compile(r'\s*\[FILE\].*?\[/FILE\]\s*'), _tag_file),
    (re.compile(r'\[IMG\](.*?)\[/IMG\]'), re.compile(r'\s*\[IMG\].*?\[/IMG\]\s*'), _tag_img),
    (re.compile(r'\[CONTACT:(\w+)\]'), re.compile(r'\s*\[CONTACT:\w+\]\s*'), _tag_contact),
    (re.compile(r'\[ROOM_MEMBERS:(\S+)\]'), re.compile(r'\s*\[ROOM_MEMBERS:\S+\]\s*'), _tag_room_members),
    (re.compile(r'\[EMOJI\](.*?)\[/EMOJI\]'), re.compile(r'\s*\[EMOJI\].*?\[/EMOJI\]\s*'), _tag_emoji),
    (re.compile(r'\[PAT:(\S+):(\S+)\]'), re.compile(r'\s*\[PAT:\S+:\S+\]\s*'), _tag_pat),
    (re.compile(r'\[OCR:(.+?)\]'), re.compile(r'\s*\[OCR:.+?\]\s*'), _tag_ocr),
)


def _dispatch_reply(_bot, fu, reply):
    """Execute the action tags found in ``reply`` and send the remaining text to ``fu``."""
    clean = reply
    for find, strip, handler in _TAG_HANDLERS:
        m = find.search(clean)
        if m:
            # Only the first occurrence is acted on; all occurrences are removed.
            clean = strip.sub('', clean).strip()
            handler(_bot, fu, m)
    # Send remaining text
    if clean.strip():
        send_wx(fu, clean.strip())


@b.handle([TEXT_MESSAGE, IMAGE_MESSAGE, VOICE_MESSAGE, XML_MESSAGE])
def on_msg(_bot, event):
    """Handle one incoming WeChat event and answer it through Hermes.

    Messages with no sender, or sent by BOT_WXID, are ignored. Voice, image
    and XML messages are turned into a short text prompt for Hermes and the
    reply is relayed as plain text. For any other event with content, the
    reply may additionally carry action tags ([FILE], [IMG], [CONTACT:..],
    [ROOM_MEMBERS:..], [EMOJI], [PAT:..:..], [OCR:..]); these are executed and
    stripped before the remaining text is sent.
    """
    global last_msg_time
    last_msg_time = time.time()
    fu = event.fromUser or ""
    if not fu or fu == BOT_WXID:
        return
    if event.type == VOICE_MESSAGE:
        _handle_voice(fu, event)
        return
    if event.type == IMAGE_MESSAGE:
        _handle_image(fu, event)
        return
    if event.type == XML_MESSAGE:
        _handle_xml(fu, event)
        return
    content = event.content or ""
    if not content:
        return
    log(f"<- {fu}: {content[:50]}")
    reply = call_hermes(fu, content)
    if reply:
        log(f"-> {fu}: {reply[:50]}")
        _dispatch_reply(_bot, fu, reply)
    else:
        log(f"-> {fu}: no reply")
# Show the configured Hermes endpoint on stdout, record readiness in the log
# file, then start the bot.
_startup_banner = "[Agent] Hermes API: " + HERMES_API
print(_startup_banner)
log("Ready")
b.run()