#!/usr/bin/env python3
"""
MoWeChat Message Monitor v2 — reads WeChat process memory to capture incoming messages.

Improvements over v1:
- Better message extraction: looks for text that follows a wxid in memory
- Content filtering: rejects binary garbage, only keeps real text
- Tracks messages by content + wxid hash
"""

import os
import re
import sys
import json
import time
import hashlib
import logging
import argparse
import urllib.request
import urllib.error

# ── Configuration ──────────────────────────────────────────────
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_DIR = os.path.join(SCRIPT_DIR, "..", "logs")
LOG_FILE = os.path.join(LOG_DIR, "wechat_msg_monitor.log")
SEEN_FILE = os.path.join(LOG_DIR, "wechat_seen_messages.json")
# Historical, misspelled name; kept so existing importers keep working.
SAWN_FILE = SEEN_FILE

# Endpoint and key may be overridden from the environment; the defaults are
# the values that used to be hard-coded here.
HERMES_API = os.environ.get("HERMES_API", "http://192.168.1.246:8642/v1/chat/completions")
HERMES_KEY = os.environ.get("HERMES_KEY", "hermes123")
POLL_INTERVAL = 3            # seconds between heap scans
NOT_FOUND_RETRY_DELAY = 30   # seconds to wait when WeChat cannot be re-found
ERROR_RETRY_DELAY = 10       # seconds to wait after an unexpected error

WECHAT_BINARY_MARKER = "/opt/wechat/wechat"

MAX_HEAP_SCAN_BYTES = 50 * 1024 * 1024  # never read more than 50MB per scan
CONTENT_WINDOW = 256         # bytes inspected after each wxid in scan_heap
MESSAGE_WINDOW = 512         # bytes inspected after a wxid in scan_message
MIN_ALPHA_RATIO = 0.3        # minimum share of alphabetic chars in a message
MAX_SAVED_HASHES = 2000      # dedup hashes persisted between runs

# Known message sender wxids (populated during scanning)
OWN_WXID = "wxid_c0a6izmwd78y22"  # Dad (nickname: Moyu Buyu)
BOT_WXID = "wxid_7onnerpx2s2l22"  # Mohe's own account
INTERESTING_WXIDS = {OWN_WXID, BOT_WXID}

# A wxid as it is searched for in memory: NUL-terminated ASCII identifier.
WXID_PATTERN = re.compile(rb'wxid_[a-zA-Z0-9]{10,28}\x00')
# Runs of bytes that extract_strings treats as text: printable ASCII,
# tab/LF/CR, and any byte >= 0x80 (possible UTF-8 lead/continuation byte).
_TEXT_RUN_PATTERN = re.compile(rb'[\t\n\r\x20-\x7e\x80-\xff]+')
# Message candidates in scan_heap: printable ASCII or high bytes, delimited
# by NUL/control bytes.  NUL is deliberately NOT part of the class so that
# padding never ends up inside the message text.
_CANDIDATE_PATTERN = re.compile(rb'[\x20-\x7e\x80-\xff]{3,}')

# Punctuation that is_valid_text accepts as "real text".
_TEXT_PUNCTUATION = frozenset(".,!?;:'\"-()[]{}@#_/\\")


def _configure_logging():
    """Configure root logging and return the module logger.

    Logs go to LOG_FILE and to the default stream.  If the log directory or
    file cannot be created, file logging is skipped instead of failing the
    import.
    """
    handlers = []
    file_error = None
    try:
        os.makedirs(LOG_DIR, exist_ok=True)
        handlers.append(logging.FileHandler(LOG_FILE))
    except OSError as exc:
        file_error = exc
    handlers.append(logging.StreamHandler())

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=handlers,
    )
    logger = logging.getLogger("mowechat")
    if file_error is not None:
        logger.warning("File logging disabled: %s", file_error)
    return logger


log = _configure_logging()


def is_valid_text(s, min_ratio=0.5):
    """Check if a string looks like real text (vs binary garbage).

    Args:
        s: Decoded string to inspect.
        min_ratio: Minimum share of "texty" characters required.

    Returns:
        True when ``s`` has at least two characters and at least
        ``min_ratio`` of them are printable letters, digits, spaces or one
        of the accepted punctuation marks.
    """
    if len(s) < 2:
        return False
    texty = sum(
        1
        for ch in s
        if ch.isprintable()
        and (ch.isalpha() or ch.isspace() or ch.isdigit() or ch in _TEXT_PUNCTUATION)
    )
    return texty / len(s) >= min_ratio


def extract_strings(data, min_len=4):
    """Extract readable strings from binary data.

    Args:
        data: Bytes-like object to scan.
        min_len: Minimum length, in bytes, of a run to be considered.

    Returns:
        List of decoded strings (UTF-8, undecodable bytes replaced) that
        pass is_valid_text, in order of appearance.
    """
    result = []
    for run in _TEXT_RUN_PATTERN.finditer(data):
        raw = run.group(0)
        if len(raw) < min_len:
            continue
        decoded = raw.decode('utf-8', errors='replace')
        if is_valid_text(decoded):
            result.append(decoded)
    return result


class WeChatMemoryMonitor:
    """Monitors WeChat process memory for new messages."""

    def __init__(self, seen_file=SEEN_FILE):
        """Create a monitor.

        Args:
            seen_file: Path of the JSON file holding already-reported
                message hashes.
        """
        self.pid = None
        self.seen_file = seen_file
        self._seen_order = []  # hashes in the order they were first seen
        self.seen = self._load_seen()
        self.heap_region = None
        self.wxid_pattern = WXID_PATTERN
        # Known message sources
        self.known_wxids = set(INTERESTING_WXIDS)

    # ── Dedup state ────────────────────────────────────────────
    def _load_seen(self):
        """Load persisted message hashes.

        Returns:
            Set of hashes; empty when the file is missing or unreadable.
        """
        try:
            with open(self.seen_file, 'r') as f:
                loaded = json.load(f)
            # dict.fromkeys de-duplicates while keeping file order.
            self._seen_order = list(dict.fromkeys(loaded))
        except (OSError, ValueError, TypeError):
            self._seen_order = []
        return set(self._seen_order)

    def _remember(self, msg_hash):
        """Record ``msg_hash`` as seen; return True if it was new."""
        if msg_hash in self.seen:
            return False
        self.seen.add(msg_hash)
        self._seen_order.append(msg_hash)
        return True

    def _save_seen(self):
        """Persist the most recent MAX_SAVED_HASHES hashes (best effort)."""
        if len(self._seen_order) != len(self.seen):
            # Pick up hashes that were added to self.seen directly.
            ordered = set(self._seen_order)
            self._seen_order = [h for h in self._seen_order if h in self.seen]
            self._seen_order.extend(h for h in self.seen if h not in ordered)
        recent = self._seen_order[-MAX_SAVED_HASHES:]
        tmp_path = f"{self.seen_file}.tmp"
        try:
            with open(tmp_path, 'w') as f:
                json.dump(recent, f)
            # Atomic swap so an interrupted write cannot corrupt the file.
            os.replace(tmp_path, self.seen_file)
        except OSError as exc:
            log.warning("Could not save seen messages: %s", exc)

    # ── Process discovery ──────────────────────────────────────
    @staticmethod
    def _parse_heap_region(maps_text):
        """Return (start, end) of the first ``[heap]`` line, or None."""
        for line in maps_text.splitlines():
            if '[heap]' not in line:
                continue
            try:
                low, high = line.split()[0].split('-')
                return int(low, 16), int(high, 16)
            except (ValueError, IndexError):
                return None
        return None

    def find_wechat(self):
        """Find the main wechat process PID and heap region.

        Sets ``self.pid`` and ``self.heap_region`` (None when the process
        has no ``[heap]`` mapping).

        Returns:
            True when a process mapping WECHAT_BINARY_MARKER was found.
        """
        # Reset first so a stale region from a dead process is never reused.
        self.pid = None
        self.heap_region = None
        try:
            # Ascending PID order makes the choice deterministic.
            pids = sorted(int(entry) for entry in os.listdir('/proc') if entry.isdigit())
        except OSError:
            return False

        for pid in pids:
            try:
                with open(f'/proc/{pid}/maps', 'r', errors='replace') as f:
                    maps_text = f.read()
            except OSError:
                # Process exited or is not readable by us.
                continue
            if WECHAT_BINARY_MARKER not in maps_text:
                continue
            self.pid = pid
            self.heap_region = self._parse_heap_region(maps_text)
            return True
        return False

    # ── Message extraction ─────────────────────────────────────
    def scan_message(self, mem, wxid_bytes, wxid_pos, wxid_end):
        """Try to extract a real message following a wxid in memory.

        Args:
            mem: Seekable binary file object; offsets are in the same
                address space as ``self.heap_region``.
            wxid_bytes: The matched wxid bytes (unused, kept for callers).
            wxid_pos: Offset of the wxid (unused, kept for callers).
            wxid_end: Offset just past the wxid; the search starts here.

        Returns:
            The best NUL-terminated text candidate found within
            MESSAGE_WINDOW bytes, or None.
        """
        search_start = wxid_end
        search_end = search_start + MESSAGE_WINDOW
        if self.heap_region:
            # Clamp to the absolute end of the heap, not to its size.
            search_end = min(search_end, self.heap_region[1])
        if search_end <= search_start:
            return None

        try:
            mem.seek(search_start)
            data = mem.read(search_end - search_start)
        except (OSError, ValueError):
            return None

        # Only NUL-terminated strings count: the piece after the last NUL
        # has no terminator inside the window and is dropped.
        candidates = []
        for raw in data.split(b'\x00')[:-1]:
            if len(raw) < 3:
                continue
            text = raw.decode('utf-8', errors='replace')
            if is_valid_text(text) and len(text) >= 2 and not text.startswith('wxid_'):
                candidates.append(text)

        if not candidates:
            return None
        # Pick the best candidate (longest, most alphanumeric).
        return max(
            candidates,
            key=lambda m: (len(m), sum(1 for c in m if c.isalpha() or c.isdigit())),
        )

    def _extract_messages(self, data, base_addr):
        """Find new messages in a heap snapshot.

        Args:
            data: Bytes read from the heap.
            base_addr: Address of ``data[0]``, used for the reported ``pos``.

        Returns:
            List of dicts with keys ``wxid``, ``content``, ``pos`` and
            ``alpha_ratio`` for every not-yet-seen message.
        """
        found = []
        for match in self.wxid_pattern.finditer(data):
            wxid = match.group(0).rstrip(b'\x00').decode('utf-8', errors='replace')
            window = data[match.end():match.end() + CONTENT_WINDOW]

            for candidate in _CANDIDATE_PATTERN.finditer(window):
                text = candidate.group(0).decode('utf-8', errors='replace')
                total_len = len(text)
                if total_len < 3:
                    continue
                # Skip other wxids and URLs/paths with no message content.
                if text.startswith(('wxid_', 'http', '/')):
                    continue
                # isalpha() already covers CJK ideographs.
                alpha_count = sum(1 for c in text if c.isalpha())
                # Skip binary garbage (must be >= 30% alphabetic chars).
                if alpha_count / total_len < MIN_ALPHA_RATIO:
                    continue

                msg_hash = hashlib.md5(f"{wxid}:{text}".encode()).hexdigest()
                if self._remember(msg_hash):
                    found.append({
                        'wxid': wxid,
                        'content': text,
                        'pos': hex(base_addr + match.start()),
                        'alpha_ratio': f"{alpha_count/total_len:.2f}",
                    })
                break  # One best message per wxid occurrence
        return found

    def scan_heap(self):
        """Scan the heap region for messages.

        Returns:
            List of new message dicts; empty when no heap is known or the
            memory cannot be read.
        """
        if not self.heap_region:
            return []

        start, end = self.heap_region
        size = min(end - start, MAX_HEAP_SCAN_BYTES)
        try:
            with open(f'/proc/{self.pid}/mem', 'rb') as mem:
                mem.seek(start)
                data = mem.read(size)
            return self._extract_messages(data, start)
        except (PermissionError, ProcessLookupError) as e:
            log.warning("Memory read failed: %s", e)
        except Exception as e:
            log.error("Heap scan error: %s", e)
        return []

    # ── Forwarding ─────────────────────────────────────────────
    def forward_to_hermes(self, msg):
        """Forward to Hermes Gateway.

        Args:
            msg: Message dict with ``wxid`` and ``content`` keys.

        Failures are logged and otherwise ignored.
        """
        payload = json.dumps({
            "model": "nova-4",
            "messages": [
                {"role": "system", "content": "You receive WeChat messages. Process according to standard pipeline."},
                {"role": "user", "content": f"[WeChat] From: {msg['wxid']}\n{msg['content']}"}
            ]
        }).encode('utf-8')

        req = urllib.request.Request(
            HERMES_API,
            data=payload,
            headers={"Content-Type": "application/json",
                     "Authorization": f"Bearer {HERMES_KEY}"},
            method="POST"
        )
        try:
            # The context manager closes the HTTP response.
            with urllib.request.urlopen(req, timeout=3):
                pass
            log.info("Forwarded: %s: %s", msg['wxid'], msg['content'][:60])
        except Exception as e:
            log.warning("Forward failed: %s", e)

    # ── Main loop ──────────────────────────────────────────────
    def _run_once(self):
        """Scan once, log the first message per wxid, return all messages."""
        messages = self.scan_heap()
        logged_wxids = set()
        for msg in messages:
            if msg['wxid'] not in logged_wxids:
                log.info("  [%s] %s", msg['wxid'], msg['content'][:80])
                logged_wxids.add(msg['wxid'])
        if messages:
            self._save_seen()
        return messages

    def _poll_once(self):
        """Run one monitoring cycle; return seconds to sleep afterwards."""
        if not os.path.exists(f'/proc/{self.pid}'):
            log.warning("WeChat died, re-finding...")
            if not self.find_wechat():
                return NOT_FOUND_RETRY_DELAY

        messages = self.scan_heap()
        for msg in messages:
            log.info("NEW: [%s] %s", msg['wxid'], msg['content'][:80])
            # NOTE: forwarding via self.forward_to_hermes(msg) is disabled.
        if messages:
            self._save_seen()
        return POLL_INTERVAL

    def run(self, once=False):
        """Locate WeChat and scan it once or continuously.

        Args:
            once: When true, perform a single scan and return.

        Returns:
            ``[]`` when WeChat is not found, the list of messages when
            ``once`` is true, otherwise True after the loop is interrupted.
        """
        if not self.find_wechat():
            log.error("WeChat not found!")
            return []

        if self.heap_region:
            log.info("WeChat PID: %s, heap: 0x%x-0x%x",
                     self.pid, self.heap_region[0], self.heap_region[1])
        else:
            log.info("PID: %s, no heap", self.pid)

        if once:
            return self._run_once()

        log.info("Monitoring every %ss...", POLL_INTERVAL)
        try:
            while True:
                try:
                    delay = self._poll_once()
                except Exception as e:
                    log.error("Error: %s", e)
                    delay = ERROR_RETRY_DELAY
                time.sleep(delay)
        except KeyboardInterrupt:
            # Ctrl-C ends monitoring, including while sleeping.
            pass
        return True


def main():
    """Command-line entry point."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--once", action="store_true")
    # Accepted for compatibility; the value is not used.
    parser.add_argument("--foreground", action="store_true")
    args = parser.parse_args()

    monitor = WeChatMemoryMonitor()

    if args.once:
        msgs = monitor.run(once=True)
        # Only show real messages (from known contacts or with good content)
        real_msgs = [m for m in msgs if m['wxid'] in INTERESTING_WXIDS or
                     float(m.get('alpha_ratio', 0)) > 0.5]
        print(f"\nFound {len(msgs)} potential messages, {len(real_msgs)} from known contacts")
        for m in real_msgs:
            print(f"  [{m['wxid']}] {m['content'][:100]}")
        print(f"\n(Total seen: {len(monitor.seen)})")
        return

    monitor.run()


if __name__ == "__main__":
    main()