#!/usr/bin/env python3
"""MoWeChat Message Monitor v3 -- message extraction from the WeChat heap.

Strategy: read the ``[heap]`` mapping of the WeChat process through
``/proc/<pid>/mem`` and look for message-like strings, either right after a
known wxid or as standalone CJK text.
"""

import os
import re
import sys
import json
import time
import hashlib
import struct
import logging
import argparse

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_DIR = os.path.join(SCRIPT_DIR, "..", "logs")
LOG_FILE = os.path.join(LOG_DIR, "wechat_msg_v3.log")
# File holding the hashes of messages that were already reported.
# (The name is kept as-is because other code may import it.)
SAWN_FILE = os.path.join(LOG_DIR, "wechat_seen_v3.json")


def _configure_logging():
    """Log to the console and, when the log directory is usable, to LOG_FILE."""
    handlers = [logging.StreamHandler()]
    file_error = None
    try:
        os.makedirs(LOG_DIR, exist_ok=True)
        handlers.insert(0, logging.FileHandler(LOG_FILE))
    except OSError as exc:
        # An unwritable log directory should not prevent the monitor from
        # starting; fall back to console-only logging.
        file_error = exc
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(message)s",
                        handlers=handlers)
    if file_error is not None:
        logging.getLogger("mv3").warning(
            "File logging disabled (%s); logging to console only", file_error)


_configure_logging()
log = logging.getLogger("mv3")

# Known wxids (discovered from memory)
OWN_WXID = "wxid_c0a6izmwd78y22"  # own account (nickname "莫语不语", dad)
BOT_WXID = "wxid_7onnerpx2s2l22"  # bot account (nickname "莫荷")
INTERESTING = {OWN_WXID, BOT_WXID}

# Path fragment that identifies the WeChat process in /proc/<pid>/maps.
WECHAT_BINARY = "/opt/wechat/wechat"
# Upper bound on how much of the heap is read per scan.
MAX_SCAN_BYTES = 60 * 1024 * 1024
# How many bytes after a wxid are searched for message text.
WXID_WINDOW = 256
# How many bytes are examined for a standalone CJK string.
CJK_WINDOW = 200
# Only this many hashes are written to SAWN_FILE.
MAX_SAVED_HASHES = 2000

# Punctuation that counts as "normal text" in Monitor.is_valid_msg.
_TEXT_PUNCTUATION = frozenset('.,!?;:\'"-()[]{}@#_/\\')
# A run of at least three bytes that are neither NUL nor control bytes.
_PRINTABLE_RUN_RE = re.compile(rb'[\x20-\xff]{3,}')
# Four consecutive non-ASCII bytes: the trigger for the standalone CJK pass.
_HIGH_BYTES_RE = re.compile(rb'([\x80-\xff][\x80-\xff][\x80-\xff][\x80-\xff])')


class Monitor:
    """Polls the WeChat process heap and reports message strings not seen before."""

    # NUL-terminated wxid as it appears in process memory.
    WXID_RE = re.compile(rb'wxid_[a-zA-Z0-9]{10,28}\x00')

    def __init__(self):
        self.pid = None
        self.heap = (0, 0)  # (start, end) addresses of the [heap] mapping
        self.seen = self._load_seen()
        self.wxid_re = self.WXID_RE

    def _load_seen(self):
        """Return the set of hashes stored in SAWN_FILE (empty if unreadable)."""
        try:
            with open(SAWN_FILE) as f:
                return set(json.load(f))
        except (OSError, ValueError, TypeError):
            # Missing file, invalid JSON or unexpected JSON shape: start fresh.
            return set()

    def _save_seen(self):
        """Write at most MAX_SAVED_HASHES hashes to SAWN_FILE (best effort).

        NOTE: ``self.seen`` is a set, so which hashes survive the cap is
        arbitrary rather than "the most recent ones".
        """
        subset = set(list(self.seen)[-MAX_SAVED_HASHES:])
        try:
            with open(SAWN_FILE, 'w') as f:
                json.dump(list(subset), f)
        except OSError as exc:
            log.warning("Could not save seen hashes: %s", exc)

    def find_wechat(self):
        """Locate the WeChat process and its heap range.

        Walks ``/proc`` looking for a process whose maps mention
        WECHAT_BINARY and that has a ``[heap]`` mapping.  On success sets
        ``self.pid`` and ``self.heap`` and returns True; otherwise leaves
        both untouched and returns False.
        """
        for entry in os.listdir('/proc'):
            if not entry.isdigit():
                continue
            try:
                with open(f'/proc/{entry}/maps') as f:
                    maps = f.read()
                if WECHAT_BINARY not in maps:
                    continue
                for line in maps.split('\n'):
                    if '[heap]' in line:
                        bounds = line.split()[0].split('-')
                        heap = (int(bounds[0], 16), int(bounds[1], 16))
                        # Assign pid and heap together so a process without a
                        # heap mapping can never leave a pid paired with a
                        # stale heap range.
                        self.pid = int(entry)
                        self.heap = heap
                        return True
            except (OSError, ValueError, IndexError):
                # Process vanished, maps not readable, or malformed line.
                continue
        return False

    def is_valid_msg(self, text):
        """Return True if ``text`` looks like a real WeChat message.

        A message is 2..5000 characters long, does not start with
        ``wxid_``, ``http`` or ``/``, consists of at least 60% letters,
        digits, whitespace or common punctuation, and contains at least
        2 CJK characters or at least 4 ASCII letters.
        """
        if len(text) < 2 or len(text) > 5000:
            return False
        if text.startswith(('wxid_', 'http', '/')):
            return False
        # Share of characters that belong to ordinary text.
        good = sum(
            1 for c in text
            if c.isalpha() or c.isdigit() or c.isspace() or c in _TEXT_PUNCTUATION
        )
        if good / max(len(text), 1) < 0.6:
            return False
        # Must have at least 2 CJK chars OR 4 ASCII letters.
        cjk = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        ascii_alpha = sum(1 for c in text if c.isascii() and c.isalpha())
        return cjk >= 2 or ascii_alpha >= 4

    def _scan_buffer(self, data):
        """Extract unseen messages from a raw memory buffer.

        Args:
            data: bytes read from the process heap.

        Returns:
            A list of ``{'wxid': ..., 'text': ...}`` dicts.  Every reported
            message has its hash added to ``self.seen``; nothing is written
            to disk here.
        """
        results = []

        # Pass 1: strings located within WXID_WINDOW bytes after a known wxid.
        for match in self.wxid_re.finditer(data):
            wxid = match.group(0).rstrip(b'\x00').decode('ascii')
            if wxid not in INTERESTING:
                continue
            window = data[match.end():match.end() + WXID_WINDOW]
            # NUL and control bytes both act as separators.  (The previous
            # hand-written loop never advanced past a control byte and spun
            # forever when one followed a wxid.)
            for run in _PRINTABLE_RUN_RE.finditer(window):
                text = run.group(0).decode('utf-8', errors='replace')
                if not self.is_valid_msg(text):
                    continue
                digest = hashlib.md5(f"{wxid}:{text}".encode()).hexdigest()
                if digest not in self.seen:
                    self.seen.add(digest)
                    results.append({'wxid': wxid, 'text': text})

        # Pass 2: standalone CJK strings anywhere in the buffer, to catch
        # message content stored at a different address than the wxid.
        skip_until = 0
        for match in _HIGH_BYTES_RE.finditer(data):
            pos = match.start()
            if pos < skip_until:
                # Still inside a string that was already accepted; starting
                # here would only yield a misaligned suffix of that string.
                continue
            snippet = data[pos:pos + CJK_WINDOW]
            nul = snippet.find(b'\x00')
            if nul > 0:
                snippet = snippet[:nul]
            if len(snippet) < 4:
                continue
            text = snippet.decode('utf-8', errors='replace')
            # Only accept strings with substantial CJK content.
            cjk = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
            if cjk < 2:
                continue
            if len(text) > 200:
                text = text[:200]
            digest = hashlib.md5(f"cjk:{text}".encode()).hexdigest()
            if digest in self.seen:
                # Reported earlier (only valid messages are ever recorded).
                skip_until = pos + len(snippet)
                continue
            if self.is_valid_msg(text):
                self.seen.add(digest)
                results.append({'wxid': 'unknown', 'text': text})
                skip_until = pos + len(snippet)

        return results

    def scan(self):
        """Read the heap of the located process and return unseen messages.

        Returns an empty list when no heap is known or the memory cannot be
        read.  Persists the seen-hash set when new messages were found.
        """
        start, end = self.heap
        if not start:
            return []
        try:
            with open(f'/proc/{self.pid}/mem', 'rb') as mem:
                mem.seek(start)
                data = mem.read(min(end - start, MAX_SCAN_BYTES))
        except (OSError, ValueError, OverflowError) as exc:
            log.warning("Could not read memory of PID %s: %s", self.pid, exc)
            return []

        results = self._scan_buffer(data)
        if results:
            self._save_seen()
        return results

    def run(self, once=False):
        """Run the monitor.

        Args:
            once: when True, perform a single scan and return its messages.

        Returns:
            The list of messages for ``once=True``, an empty list when
            WeChat is not found at startup, and None after the polling loop
            is interrupted with Ctrl-C.
        """
        if not self.find_wechat():
            log.error("WeChat not found")
            return []

        log.info(f"PID {self.pid}, heap 0x{self.heap[0]:x}")

        if once:
            msgs = self.scan()
            for m in msgs:
                log.info(f"  [{m['wxid']}] {m['text'][:80]}")
            return msgs

        while True:
            try:
                if not os.path.exists(f'/proc/{self.pid}'):
                    log.warning("WeChat died")
                    if not self.find_wechat():
                        time.sleep(30)
                        continue
                msgs = self.scan()
                for m in msgs:
                    log.info(f"NEW [{m['wxid']}] {m['text'][:80]}")
                time.sleep(3)
            except KeyboardInterrupt:
                break
            except Exception:
                # Keep the monitor alive, but leave a trace of what failed.
                log.exception("Scan iteration failed; retrying in 10s")
                time.sleep(10)


def main():
    """Command-line entry point; ``--once`` performs a single scan."""
    monitor = Monitor()
    if '--once' in sys.argv:
        msgs = monitor.run(once=True)
        print(f"\nFound {len(msgs)} messages")
        for msg in msgs:
            print(f"  [{msg['wxid']}] {msg['text']}")
    else:
        monitor.run()


if __name__ == "__main__":
    main()