#!/usr/bin/env python3
"""
MoWeChat Message Monitor — reads WeChat process memory to capture incoming messages.

No GDB, no ptrace attach, no crash risk.
Uses /proc/PID/mem to scan the heap for message patterns.
"""

import os
import re
import sys
import json
import time
import hashlib
import logging
import argparse
import collections
import urllib.request
import urllib.error
from typing import Dict, List, Optional, Tuple, Union

# ── Configuration ──────────────────────────────────────────────
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_DIR = os.path.join(SCRIPT_DIR, "..", "logs")
LOG_FILE = os.path.join(LOG_DIR, "wechat_msg_monitor.log")
SEEN_FILE = os.path.join(LOG_DIR, "wechat_seen_messages.json")
# Historical (misspelled) name, kept so existing importers keep working.
SAWN_FILE = SEEN_FILE

# Endpoint and bearer token can be overridden from the environment so the
# credential does not have to live in source control; the defaults are the
# values this script has always used.
HERMES_API = (
    os.environ.get("HERMES_API")
    or "http://192.168.1.246:8642/v1/chat/completions"
)
HERMES_KEY = os.environ.get("HERMES_KEY") or "hermes123"

# Polling interval (seconds)
POLL_INTERVAL = 2

# Accepted message content length, in decoded characters
MIN_MSG_LEN = 2
MAX_MSG_LEN = 10000

# WeChat binary marker in /proc/PID/maps
WECHAT_BINARY_MARKER = "/opt/wechat/wechat"

# Number of message hashes remembered for de-duplication
MAX_SEEN = 1000
# Mappings larger than this are never recorded as scan candidates
MAX_REGION_MB = 50
# Only the first MAX_ANON_REGIONS anonymous mappings are considered, and of
# those only the ones no larger than MAX_ANON_REGION_MB are scanned
MAX_ANON_REGIONS = 20
MAX_ANON_REGION_MB = 10
# How many bytes after a wxid are searched for the message text
CONTENT_WINDOW = 512
# Timeout (seconds) for the HTTP request to Hermes
FORWARD_TIMEOUT = 3

_BYTES_PER_MB = 1024 * 1024


def _build_log_handlers() -> List[logging.Handler]:
    """Return the log handlers: file + console, or console only on failure.

    If the log directory or file cannot be created, the problem is reported
    on stderr and logging continues on the console instead of aborting.
    """
    handlers = [logging.StreamHandler()]  # type: List[logging.Handler]
    try:
        os.makedirs(LOG_DIR, exist_ok=True)
        handlers.insert(0, logging.FileHandler(LOG_FILE))
    except OSError as exc:
        sys.stderr.write(f"File logging disabled ({LOG_FILE}): {exc}\n")
    return handlers


logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=_build_log_handlers(),
)
log = logging.getLogger("mowechat")


class WeChatMemoryMonitor:
    """Monitors WeChat process memory for new messages.

    Attributes:
        pid: PID of the located WeChat process, or None before find_wechat().
        seen: set of MD5 hex digests of "wxid:content" pairs already reported.
        heap_regions: (start, end) address pairs of writable ``[heap]`` maps.
        anon_regions: (start, end) address pairs of writable anonymous maps.
    """

    def __init__(self):
        self.pid = None  # type: Optional[int]
        # Insertion order of the hashes in ``self.seen``; a plain set has no
        # order, so this is what makes "keep the newest MAX_SEEN" possible.
        self._seen_order = collections.deque(maxlen=MAX_SEEN)
        self.seen = self._load_seen()
        self.heap_regions = []  # type: List[Tuple[int, int]]
        self.anon_regions = []  # type: List[Tuple[int, int]]
        self.wxid_pattern = re.compile(rb'wxid_[a-zA-Z0-9]{10,28}\x00')
        # Runs of printable ASCII and high bytes (multi-byte UTF-8 text)
        self.msg_pattern = re.compile(rb'[\x20-\x7e\x80-\xff]{2,}')

    def _load_seen(self) -> set:
        """Load the persisted hash list and return it as a set.

        Also rebuilds the internal ordering from the file's list order.
        A missing, unreadable or malformed file yields an empty set.
        """
        try:
            with open(SEEN_FILE, 'r', encoding='utf-8') as f:
                stored = json.load(f)
        except (OSError, ValueError):
            # ValueError covers both invalid JSON and undecodable bytes.
            return set()
        if not isinstance(stored, list):
            return set()
        # De-duplicate while preserving order, then keep the newest entries.
        ordered = list(
            dict.fromkeys(h for h in stored if isinstance(h, str))
        )[-MAX_SEEN:]
        self._seen_order = collections.deque(ordered, maxlen=MAX_SEEN)
        return set(ordered)

    def _save_seen(self) -> None:
        """Persist the remembered hashes (at most MAX_SEEN, oldest first).

        Saving is best-effort: a write failure is logged, never raised.
        """
        try:
            with open(SEEN_FILE, 'w', encoding='utf-8') as f:
                json.dump(list(self._seen_order), f)
        except OSError as exc:
            log.warning("Could not save seen-message hashes: %s", exc)

    def _mark_seen(self, msg_hash: str) -> bool:
        """Remember ``msg_hash``; return True only if it was not known yet.

        When MAX_SEEN hashes are already stored, the oldest one is forgotten
        so the in-memory set cannot grow without bound.
        """
        if msg_hash in self.seen:
            return False
        if len(self._seen_order) >= MAX_SEEN:
            self.seen.discard(self._seen_order.popleft())
        self._seen_order.append(msg_hash)
        self.seen.add(msg_hash)
        return True

    def find_wechat(self) -> bool:
        """Find a process whose maps mention WECHAT_BINARY_MARKER.

        Only the first 4096 characters of each /proc/PID/maps are inspected.
        On success ``self.pid`` is set and True is returned; otherwise False.
        """
        try:
            entries = os.listdir('/proc')
        except OSError as exc:
            log.error("Cannot list /proc: %s", exc)
            return False
        for entry in entries:
            if not entry.isdigit():
                continue
            try:
                with open(f'/proc/{entry}/maps', 'r', errors='replace') as f:
                    head = f.read(4096)
            except OSError:
                # Process exited meanwhile, or we may not read its maps.
                continue
            if WECHAT_BINARY_MARKER in head:
                self.pid = int(entry)
                return True
        return False

    @staticmethod
    def _parse_maps_line(line: str) -> Optional[Tuple[int, int, str, str]]:
        """Parse one /proc/PID/maps line into (start, end, perms, name).

        ``name`` is the pathname column, or '' when the line has none
        (an anonymous mapping). Returns None for lines that do not parse.
        """
        # Columns: address perms offset dev inode [pathname]. Splitting at
        # most five times keeps a pathname containing spaces in one piece.
        fields = line.split(None, 5)
        if len(fields) < 5:
            return None
        start_hex, dash, end_hex = fields[0].partition('-')
        if not dash:
            return None
        try:
            start = int(start_hex, 16)
            end = int(end_hex, 16)
        except ValueError:
            return None
        name = fields[5].strip() if len(fields) > 5 else ''
        return start, end, fields[1], name

    def update_memory_regions(self) -> None:
        """Read /proc/PID/maps to find heap and anonymous regions.

        Rebuilds ``heap_regions`` and ``anon_regions`` from mappings whose
        permissions start with 'rw' and whose size is at most MAX_REGION_MB.
        A failure to read the file is logged and leaves whatever was
        collected so far.
        """
        self.heap_regions = []
        self.anon_regions = []
        try:
            with open(f'/proc/{self.pid}/maps', 'r', errors='replace') as f:
                for line in f:
                    parsed = self._parse_maps_line(line)
                    if parsed is None:
                        continue
                    start, end, perms, name = parsed
                    if (end - start) / _BYTES_PER_MB > MAX_REGION_MB:
                        continue  # Skip huge regions
                    if not perms.startswith('rw'):
                        continue
                    if name == '[heap]':
                        self.heap_regions.append((start, end))
                    elif not name:  # Anonymous mapping
                        self.anon_regions.append((start, end))
        except OSError as e:
            log.error("Failed to read maps: %s", e)

    def _extract_messages(self, data: bytes, base: int,
                          region_name: str) -> List[Dict[str, str]]:
        """Find new (wxid, text) pairs in ``data``.

        For every wxid match, the first acceptable printable run within
        CONTENT_WINDOW bytes after it is taken as the message text.

        Args:
            data: raw bytes of one memory region.
            base: address of ``data[0]``, used to report absolute positions.
            region_name: label copied into each result.

        Returns:
            A list of dicts with keys 'wxid', 'content', 'pos' (hex address
            of the wxid) and 'region', for pairs not reported before.
        """
        messages = []
        for match in self.wxid_pattern.finditer(data):
            wxid = match.group(0).decode('utf-8', errors='replace').strip('\x00')
            content_start = match.end()
            search_end = min(content_start + CONTENT_WINDOW, len(data))

            for cmatch in self.msg_pattern.finditer(
                    data, content_start, search_end):
                content = cmatch.group(0).decode(
                    'utf-8', errors='replace').strip('\x00')

                if not MIN_MSG_LEN <= len(content) <= MAX_MSG_LEN:
                    continue
                # Another wxid is not a message; a URL, however, may be one.
                if content.startswith('wxid_'):
                    continue

                # MD5 is used purely as a compact de-duplication key.
                msg_hash = hashlib.md5(
                    f"{wxid}:{content}".encode()).hexdigest()
                if self._mark_seen(msg_hash):
                    messages.append({
                        'wxid': wxid,
                        'content': content,
                        'pos': hex(match.start() + base),
                        'region': region_name,
                    })
                break  # Only one message per wxid
        return messages

    def scan_region(self, start: int, end: int,
                    region_name: str = "") -> List[Dict[str, str]]:
        """Scan a memory region for WeChat message patterns.

        Reads ``end - start`` bytes at ``start`` from /proc/PID/mem and
        returns the new messages found there. Errors are logged and yield
        an empty list; nothing is raised to the caller.
        """
        try:
            with open(f'/proc/{self.pid}/mem', 'rb') as mem:
                mem.seek(start)
                data = mem.read(end - start)
            return self._extract_messages(data, start, region_name)
        except (PermissionError, ProcessLookupError) as e:
            log.warning("Memory read failed: %s", e)
        except Exception as e:
            # Deliberately broad: one bad region must not stop the monitor.
            log.error("Scan error at %s: %s", region_name, e)
        return []

    def scan_all(self) -> List[Dict[str, str]]:
        """Scan all relevant memory regions for new messages.

        Refreshes the region lists, scans every heap region and the
        eligible anonymous regions, and saves the seen-hash file when
        anything new was found.
        """
        self.update_memory_regions()
        all_messages = []

        for start, end in self.heap_regions:
            all_messages.extend(self.scan_region(start, end, "[heap]"))

        for start, end in self.anon_regions[:MAX_ANON_REGIONS]:
            if (end - start) / _BYTES_PER_MB > MAX_ANON_REGION_MB:
                continue  # Skip large anonymous maps
            all_messages.extend(self.scan_region(start, end, "[anon]"))

        if all_messages:
            self._save_seen()

        return all_messages

    def forward_to_hermes(self, msg: Dict[str, str]) -> None:
        """Forward a captured message to Hermes Gateway.

        POSTs a chat-completion request built from ``msg['wxid']`` and
        ``msg['content']`` to HERMES_API. Failures are logged as warnings
        and never raised.
        """
        payload = json.dumps({
            "model": "nova-4",
            "messages": [
                {
                    "role": "system",
                    "content": (
                        "You receive WeChat messages. "
                        "Process this message according to the standard pipeline."
                    )
                },
                {
                    "role": "user",
                    "content": (
                        f"[WeChat Message]\n"
                        f"From: {msg['wxid']}\n"
                        f"Content: {msg['content']}"
                    )
                }
            ]
        }).encode('utf-8')

        try:
            req = urllib.request.Request(
                HERMES_API,
                data=payload,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {HERMES_KEY}"
                },
                method="POST"
            )
            # The context manager closes the response; its body is unused.
            with urllib.request.urlopen(req, timeout=FORWARD_TIMEOUT):
                pass
            log.info("Forwarded: %s: %s", msg['wxid'], msg['content'][:60])
        except Exception as e:
            # Best-effort delivery: report the failure and keep monitoring.
            log.warning("Forward failed: %s", e)

    def run(self, once: bool = False) -> Union[bool, List[Dict[str, str]]]:
        """Main monitoring loop.

        Args:
            once: when true, perform a single scan and return its messages.

        Returns:
            False if no WeChat process was found; the list of captured
            messages when ``once`` is true; True after the polling loop is
            stopped with Ctrl-C.
        """
        if not self.find_wechat():
            log.error("WeChat process not found!")
            return False

        log.info("Found WeChat PID: %s", self.pid)

        if once:
            messages = self.scan_all()
            for msg in messages:
                log.info("Captured: [%s] %s", msg['wxid'], msg['content'][:80])
            return messages

        log.info("Starting monitor (poll every %ss)...", POLL_INTERVAL)

        # The interrupt handler wraps the whole loop so Ctrl-C also stops
        # the monitor cleanly while it sleeps after an error.
        try:
            while True:
                try:
                    # Check process is alive
                    if not os.path.exists(f'/proc/{self.pid}'):
                        log.warning("WeChat process died, re-finding...")
                        if not self.find_wechat():
                            log.error("Cannot find WeChat, sleeping 30s...")
                            time.sleep(30)
                            continue
                        log.info("Re-attached to PID: %s", self.pid)

                    for msg in self.scan_all():
                        log.info("NEW: [%s] %s",
                                 msg['wxid'], msg['content'][:80])
                        # NOTE: forwarding is currently disabled; call
                        # self.forward_to_hermes(msg) here to enable it.

                    time.sleep(POLL_INTERVAL)
                except Exception as e:
                    log.error("Monitor error: %s", e)
                    time.sleep(10)
        except KeyboardInterrupt:
            log.info("Monitor stopped.")

        return True


def main() -> None:
    """Command-line entry point.

    ``--once`` scans a single time and prints the captured messages as
    JSON; without it the monitor polls until interrupted. ``--foreground``
    is accepted but has no effect in this script.
    """
    parser = argparse.ArgumentParser(description="MoWeChat Message Monitor")
    parser.add_argument("--once", action="store_true", help="Scan once and exit")
    parser.add_argument("--foreground", action="store_true", help="Run in foreground")
    args = parser.parse_args()

    monitor = WeChatMemoryMonitor()

    if args.once:
        msgs = monitor.run(once=True)
        if msgs:
            print(json.dumps(msgs, ensure_ascii=False, indent=2))
        else:
            print("No new messages found.")
        return

    monitor.run()


if __name__ == "__main__":
    main()