"""WeChat History REST API Server.

Starts on port 19001. Queries WeChat chat history via wxhelper DLL
(http://127.0.0.1:19088).

Usage:
    python history_api.py                 # start on 0.0.0.0:19001
    python history_api.py --port 19001    # explicit port

Endpoints:
    GET  /                                       → API info
    GET  /health                                 → health check
    GET  /api/contacts                           → list WeChat contacts (wxid + nickname)
    GET  /api/recent?limit=20                    → contacts with recent messages
    GET  /api/history?wxid=wxid_xxx&count=20     → query chat history
    POST /api/history                            → same via JSON body {"wxid":"...","count":20}

Requires: wxhelper DLL injected and WeChat running (wechat_agent.py handles this).
"""

import argparse
import json
import os
import re
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

# Make urllib talk to the local wxhelper endpoint directly, never via a proxy.
os.environ["no_proxy"] = "*"
os.environ["NO_PROXY"] = "*"

# ── Configuration ──
WX_API = "http://127.0.0.1:19088"
DEFAULT_PORT = 19001
BOT_WXID = "wxid_7onnerpx2s2l22"
HOST = "0.0.0.0"

# Upper bounds for the LIMIT clause of the two SQL queries.
MAX_HISTORY_ROWS = 200
MAX_RECENT_CHATS = 50
# Largest JSON request body the server is willing to read.
MAX_BODY_BYTES = 1 << 20
# Built-in/system conversations that are never reported as "recent chats".
SYSTEM_TALKERS = (
    "fmessage", "weixin", "wechat", "filehelper",
    "medianote", "floatbottle", "qmessage",
)
# Compiled once: used to pull the first URL out of type-49 (link) messages.
URL_PATTERN = re.compile(r'https?://[^\s\x00-\x1f<>\"\']{10,}')

# ── Cached state ──
nickname_cache = {}
db_handle_cache = None


# ── Small pure helpers ──
def sql_quote(value):
    """Return *value* as a single-quoted SQLite string literal.

    wxhelper's execSql endpoint only accepts a raw SQL string, so bound
    parameters are not available; doubling embedded single quotes is the
    SQLite-defined way to keep the value inside the literal.
    """
    return "'" + str(value).replace("'", "''") + "'"


def clamp_limit(limit, maximum=MAX_HISTORY_ROWS):
    """Convert *limit* to an int clamped into ``[0, maximum]``.

    A negative LIMIT means "no limit" in SQLite, so negatives are clamped to
    0 instead of being passed through.

    Raises:
        ValueError: if *limit* cannot be interpreted as an integer.
    """
    try:
        value = int(limit)
    except (TypeError, ValueError, OverflowError):
        raise ValueError(f"Invalid limit: {limit!r}") from None
    return max(0, min(value, maximum))


def build_history_sql(wxid, limit):
    """Build the MSG-table query for the newest *limit* messages of *wxid*."""
    return (
        "SELECT CreateTime, IsSender, Type, SubType, StrContent, DisplayContent, "
        "CompressContent, BytesExtra "
        f"FROM MSG WHERE StrTalker={sql_quote(wxid)} AND Type IN ('1','49') "
        f"ORDER BY CreateTime DESC LIMIT {clamp_limit(limit, MAX_HISTORY_ROWS)}"
    )


def build_recent_sql(limit):
    """Build the query listing talkers ordered by their latest message."""
    excluded = ", ".join(sql_quote(name) for name in SYSTEM_TALKERS)
    return (
        "SELECT StrTalker, MAX(CreateTime), COUNT(*) FROM MSG "
        f"WHERE Type IN ('1','49') AND StrTalker NOT IN ({excluded}) "
        "GROUP BY StrTalker ORDER BY MAX(CreateTime) DESC "
        f"LIMIT {clamp_limit(limit, MAX_RECENT_CHATS)}"
    )


def extract_rows(data):
    """Return the data rows of an execSql payload, without the header row.

    The payload is a list whose first entry is the column header; entries are
    either plain lists or dicts wrapping the list under ``"value"``. Anything
    with fewer than two entries (header only, empty, or malformed) yields [].
    """
    if not isinstance(data, list) or len(data) < 2:
        return []
    unwrapped = [
        item.get("value", item) if isinstance(item, dict) else item
        for item in data
    ]
    return unwrapped[1:]


def _cell_text(row, idx):
    """Return column *idx* of *row* as stripped text, or "" if absent/empty."""
    if idx >= len(row) or not row[idx]:
        return ""
    return str(row[idx]).strip()


def _to_int(value, default=0):
    """Best-effort int conversion that falls back to *default*."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def _format_ts(ts):
    """Format a Unix timestamp in local time; "" for 0 or unrepresentable values."""
    if not ts:
        return ""
    try:
        return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
    except (OverflowError, OSError, ValueError):
        return ""


def parse_message_row(row):
    """Turn one MSG result row into a message dict, or None to skip it.

    Column order follows build_history_sql(): CreateTime, IsSender, Type,
    SubType, StrContent, DisplayContent, CompressContent, BytesExtra.
    Text comes from StrContent, then DisplayContent. For type-49 (link)
    messages without text, the first URL found in BytesExtra, then
    CompressContent, is used, falling back to a fixed placeholder.
    Rows of other types with no text are skipped.
    """
    if not isinstance(row, (list, tuple)) or len(row) < 3:
        return None
    content = _cell_text(row, 4) or _cell_text(row, 5)
    is_link = str(row[2]) == "49"
    if not content and is_link:
        for idx in (7, 6):  # BytesExtra first, then CompressContent
            urls = URL_PATTERN.findall(_cell_text(row, idx))
            if urls:
                content = urls[0]
                break
    if not content:
        if not is_link:
            return None
        content = "[文章链接]"
    return {
        "CreateTime": row[0],
        "IsSender": row[1],
        "Type": row[2],
        "content": content,
    }


# ── wxhelper API helpers ──
def wxpost(path, data=None, timeout=10):
    """Call the wxhelper HTTP API and return its decoded JSON object.

    Never raises: transport errors, undecodable bodies and non-object JSON
    payloads are all reported as ``{"code": -1, "error": ...}`` so callers
    can rely on ``.get()``.
    """
    try:
        body = json.dumps(data or {}).encode()
        req = urllib.request.Request(
            WX_API + path,
            data=body,
            headers={"Content-Type": "application/json"},
        )
        # The context manager closes the connection even if decoding fails.
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # wxhelper may still send a JSON body with an error status; if it
        # does not, report the status instead of letting the decode error
        # escape from inside this handler.
        try:
            payload = json.loads(e.read().decode())
        except (ValueError, OSError):
            return {"code": -1, "error": f"HTTP {e.code}"}
    except Exception as e:
        return {"code": -1, "error": str(e)[:200]}
    if not isinstance(payload, dict):
        return {"code": -1, "error": "Unexpected wxhelper response"}
    return payload


def get_db_handle():
    """Get handle for the MSG*.db database containing the MSG table.

    The first database whose name starts with "MSG" and does not contain
    "Media" is used. The handle is cached after the first successful lookup;
    returns None when no such database is reported.
    """
    global db_handle_cache
    if db_handle_cache:
        return db_handle_cache
    dbs = wxpost("/api/getDBInfo", timeout=10).get("data") or []
    for db in dbs:
        if not isinstance(db, dict):
            continue
        dbname = db.get("databaseName") or ""
        if dbname.startswith("MSG") and "Media" not in dbname:
            db_handle_cache = db.get("handle")
            return db_handle_cache
    return None


def _remember_contacts(contacts):
    """Store the display name of every contact in nickname_cache."""
    for contact in contacts:
        if not isinstance(contact, dict):
            continue
        wxid = contact.get("wxid")
        if wxid:
            nickname_cache[wxid] = (
                contact.get("nickname") or contact.get("customAccount") or wxid
            )


def get_nickname(wxid):
    """Get contact nickname from wxid, with caching.

    On a cache miss the whole contact list is fetched once and cached, so
    later lookups need no further request. Unknown wxids resolve to the wxid
    itself. When the contact list cannot be fetched, the wxid is returned
    without being cached so a later call can retry.
    """
    if wxid in nickname_cache:
        return nickname_cache[wxid]
    contacts = wxpost("/api/getContactList", timeout=10).get("data")
    if not isinstance(contacts, list):
        return wxid
    _remember_contacts(contacts)
    return nickname_cache.setdefault(wxid, wxid)


def query_history(wxid, limit=10):
    """Query historical text/link messages from the MSG table for a contact.

    Args:
        wxid: Conversation id; quoted before being embedded in the SQL.
        limit: Maximum number of messages, clamped to 0..MAX_HISTORY_ROWS.

    Returns:
        List of dicts ``{CreateTime, IsSender, Type, content}`` in
        chronological order, or None when no DB handle is available or the
        query returned no rows.

    Raises:
        ValueError: if *limit* is not an integer value.
    """
    sql = build_history_sql(wxid, limit)
    handle = get_db_handle()
    if not handle:
        return None
    reply = wxpost("/api/execSql", {"dbHandle": handle, "sql": sql}, timeout=15)
    rows = extract_rows(reply.get("data"))
    if not rows:
        return None
    # The query is newest-first; present the messages oldest-first.
    parsed = (parse_message_row(row) for row in reversed(rows))
    return [msg for msg in parsed if msg is not None]


def format_history_json(wxid, rows):
    """Format query_history() rows into a JSON-serializable API response.

    Each message's content is truncated to 500 characters. Messages flagged
    IsSender are attributed to the bot account, all others to the contact.
    """
    sender_name = get_nickname(wxid)
    if not rows:
        return {
            "ok": True,
            "wxid": wxid,
            "sender_name": sender_name,
            "count": 0,
            "messages": [],
        }
    bot_name = get_nickname(BOT_WXID)
    type_names = {1: "text", 49: "link"}
    messages = []
    for row in rows:
        ts = _to_int(row.get("CreateTime"), 0)
        is_sender = _to_int(row.get("IsSender"), 0)
        msg_type = _to_int(row.get("Type"), 1)
        messages.append({
            "time": _format_ts(ts),
            "timestamp": ts,
            "sender": bot_name if is_sender else sender_name,
            "is_self": bool(is_sender),
            "type": msg_type,
            "type_name": type_names.get(msg_type, f"type_{msg_type}"),
            "content": str(row.get("content") or "")[:500],
        })
    return {
        "ok": True,
        "wxid": wxid,
        "sender_name": sender_name,
        "count": len(messages),
        "messages": messages,
    }


def get_contacts():
    """Get all contacts from WeChat and refresh the nickname cache."""
    data = wxpost("/api/getContactList", timeout=10).get("data") or []
    contacts = [c for c in data if isinstance(c, dict)]
    _remember_contacts(contacts)
    return [
        {
            "wxid": c.get("wxid", ""),
            "nickname": c.get("nickname", ""),
            "remark": c.get("remark", ""),
            "customAccount": c.get("customAccount", ""),
        }
        for c in contacts
    ]


def get_recent_chats(limit=20):
    """Get contacts with recent messages, most recently active first.

    Args:
        limit: Maximum number of chats, clamped to 0..MAX_RECENT_CHATS.

    Returns:
        List of dicts with wxid, nickname, last_message_time (formatted local
        time or None), last_message_ts and message_count. Empty when no DB
        handle is available or nothing matched.

    Raises:
        ValueError: if *limit* is not an integer value.
    """
    sql = build_recent_sql(limit)
    handle = get_db_handle()
    if not handle:
        return []
    reply = wxpost("/api/execSql", {"dbHandle": handle, "sql": sql}, timeout=15)
    results = []
    for row in extract_rows(reply.get("data")):
        if not isinstance(row, (list, tuple)):
            continue
        wxid = _cell_text(row, 0)
        # Kept in addition to the SQL filter in case of padded talker ids.
        if not wxid or wxid in SYSTEM_TALKERS:
            continue
        last_ts = _to_int(row[1], 0) if len(row) > 1 else 0
        results.append({
            "wxid": wxid,
            "nickname": get_nickname(wxid),
            "last_message_time": _format_ts(last_ts) or None,
            "last_message_ts": last_ts,
            "message_count": _to_int(row[2], 0) if len(row) > 2 else 0,
        })
    return results


# ── HTTP Request Handler ──
class HistoryAPIHandler(BaseHTTPRequestHandler):
    """Routes the REST endpoints listed in the module docstring."""

    def _send_json(self, data, status=200):
        """Send JSON response with proper headers."""
        body = json.dumps(data, ensure_ascii=False, indent=2).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def _send_error_json(self, msg, status=400):
        """Send a ``{"ok": false, "error": msg}`` response."""
        self._send_json({"ok": False, "error": msg}, status=status)

    def _read_json_body(self):
        """Read and parse the JSON request body.

        Returns {} for a missing, oversized, malformed or non-object body so
        callers can always use ``.get()``.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            return {}
        # A negative length would make read() block until the peer closes.
        if length <= 0 or length > MAX_BODY_BYTES:
            return {}
        try:
            parsed = json.loads(self.rfile.read(length))
        except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
            return {}
        return parsed if isinstance(parsed, dict) else {}

    def _respond_history(self, wxid, count):
        """Validate *count*, run the history query and send the response."""
        try:
            count = clamp_limit(count, MAX_HISTORY_ROWS)
        except ValueError as e:
            self._send_error_json(str(e)[:200], status=400)
            return
        try:
            result = format_history_json(wxid, query_history(wxid, count))
        except Exception as e:
            self._send_error_json(str(e)[:200], status=500)
            return
        self._send_json(result)

    def do_GET(self):
        """Dispatch GET requests."""
        parsed = urlparse(self.path)
        path = parsed.path.rstrip("/") or "/"
        params = parse_qs(parsed.query)

        # ── Root / Health ──
        if path == "/":
            self._send_json({
                "service": "WeChat History API",
                "version": "1.0",
                # Report the port actually bound, not just the default.
                "port": self.server.server_address[1],
                "wxhelper": WX_API,
                "endpoints": {
                    "GET /api/contacts": "List all WeChat contacts",
                    "GET /api/history": "Query chat history (params: wxid, count)",
                    "POST /api/history": "Same via JSON body",
                    "GET /api/recent": "Recent chats list",
                    "GET /health": "Health check",
                },
            })
            return

        if path == "/health":
            self._send_json({
                "status": "ok",
                "timestamp": datetime.now().isoformat(),
                "wxhelper": bool(get_db_handle()),
            })
            return

        # ── Contacts ──
        if path == "/api/contacts":
            contacts = get_contacts()
            self._send_json({"ok": True, "count": len(contacts), "contacts": contacts})
            return

        # ── Recent Chats ──
        if path == "/api/recent":
            try:
                limit = clamp_limit(params.get("limit", ["20"])[0], MAX_RECENT_CHATS)
            except ValueError as e:
                self._send_error_json(str(e)[:200], status=400)
                return
            try:
                chats = get_recent_chats(limit)
            except Exception as e:
                self._send_error_json(str(e)[:200], status=500)
                return
            self._send_json({"ok": True, "count": len(chats), "chats": chats})
            return

        # ── History ──
        if path == "/api/history":
            wxid = params.get("wxid", [""])[0].strip()
            if not wxid:
                self._send_error_json("Missing required parameter: wxid")
                return
            self._respond_history(wxid, params.get("count", ["10"])[0])
            return

        # ── 404 ──
        self._send_error_json(f"Not found: {path}", status=404)

    def do_POST(self):
        """Dispatch POST requests."""
        parsed = urlparse(self.path)
        path = parsed.path.rstrip("/") or "/"
        body = self._read_json_body()

        # ── History (POST) ──
        if path in ("/api/history", "/history"):
            wxid = str(body.get("wxid") or "").strip()
            # "limit" is accepted as an alias when "count" is absent or falsy.
            count = body.get("count") or body.get("limit") or 10
            if not wxid:
                self._send_error_json("Missing required field: wxid")
                return
            self._respond_history(wxid, count)
            return

        # ── 404 ──
        self._send_error_json(f"Not found: {path}", status=404)

    def do_OPTIONS(self):
        """Handle CORS preflight."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, *args):
        # Suppress default access logs to stdout
        pass


# ── Main ──
def main(argv=None):
    """Check wxhelper connectivity, then serve until interrupted.

    Args:
        argv: Command-line arguments (defaults to ``sys.argv[1:]``). Only
            ``--port`` is interpreted; unknown arguments are ignored.
    """
    parser = argparse.ArgumentParser(description="WeChat History REST API Server")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT)
    args, _unknown = parser.parse_known_args(sys.argv[1:] if argv is None else argv)
    port = args.port

    # Check wxhelper connectivity
    print(f"[History API] Checking wxhelper at {WX_API}...")
    try:
        r = wxpost("/api/checkLogin", timeout=5)
        if r.get("code") == 1:
            print("[History API] wxhelper ONLINE")
            db_handle = get_db_handle()
            print(f"[History API] DB handle: {db_handle or 'NOT FOUND'}")
        else:
            print(f"[History API] WARNING: wxhelper not logged in: {r}")
    except Exception as e:
        print(f"[History API] WARNING: Cannot reach wxhelper: {e}")

    # Start server
    server = HTTPServer((HOST, port), HistoryAPIHandler)
    print(f"[History API] Listening on http://{HOST}:{port}")
    print("[History API] Endpoints:")
    print(f" GET http://localhost:{port}/")
    print(f" GET http://localhost:{port}/health")
    print(f" GET http://localhost:{port}/api/contacts")
    print(f" GET http://localhost:{port}/api/recent")
    print(f" GET http://localhost:{port}/api/history?wxid=wxid_xxx&count=20")
    print(f" POST http://localhost:{port}/api/history")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n[History API] Shutting down...")
    finally:
        # serve_forever() has already returned here, so shutdown() (meant to
        # be called from another thread) is not needed; just free the socket.
        server.server_close()


if __name__ == "__main__":
    main()