Files
wechat-hermes-gateway/api/history_api.py
T

379 lines
12 KiB
Python

"""
WeChat History REST API Server
Starts on port 19001. Queries WeChat chat history via wxhelper DLL (http://127.0.0.1:19088).
Usage:
python history_api.py # start on 0.0.0.0:19001
python history_api.py --port 19001 # explicit port
Endpoints:
GET / → API info
GET /health → health check
GET /api/contacts → list WeChat contacts (wxid + nickname)
GET /api/recent?limit=20 → list recent chats (wxid, nickname, last message time, message count)
GET /api/history?wxid=wxid_xxx&count=20 → query chat history
POST /api/history → same via JSON body {"wxid":"...","count":20}
Requires: wxhelper DLL injected and WeChat running (wechat_agent.py handles this).
"""
import os
import json
import time
import urllib.request
import urllib.error
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
from datetime import datetime
# Setting no_proxy/NO_PROXY to "*" makes urllib bypass any configured proxy for
# every host — presumably so calls to the loopback wxhelper endpoint below are
# never routed through a system proxy.
os.environ["no_proxy"] = "*"
os.environ["NO_PROXY"] = "*"
# ── Configuration ──
# Base URL of the wxhelper HTTP API; wxpost() appends the endpoint path to it.
WX_API = "http://127.0.0.1:19088"
# Port used by main() when no --port argument is given.
DEFAULT_PORT = 19001
# wxid whose nickname labels messages flagged IsSender in format_history_json().
BOT_WXID = "wxid_7onnerpx2s2l22"
# Bind address passed to HTTPServer in main(); "0.0.0.0" listens on all interfaces.
# NOTE(review): no authentication is visible in this file — confirm that exposing
# chat history beyond localhost is intended.
HOST = "0.0.0.0"
# ── Cached state ──
# wxid -> display name, filled by get_nickname() and get_contacts().
nickname_cache = {}
# Database handle found by get_db_handle(); None until a lookup succeeds.
db_handle_cache = None
# ── wxhelper API helpers ──
def wxpost(path, data=None, timeout=10):
    """Call a wxhelper HTTP API endpoint and return the decoded JSON reply.

    The request is always sent as a POST with a JSON body; a falsy *data*
    (``None``, ``{}``) is sent as ``{}``.

    Args:
        path: Endpoint path appended to ``WX_API`` (e.g. ``"/api/getDBInfo"``).
        data: JSON-serializable payload, or ``None`` for an empty object.
        timeout: Socket timeout in seconds handed to ``urlopen``.

    Returns:
        The parsed JSON reply on success. For an HTTP error status, the
        parsed error body when it is a JSON object. For every other failure
        (connection error, timeout, malformed or non-object error body) a
        dict ``{"code": -1, "error": "<message>"}``. Failures are reported
        through the return value rather than by raising.
    """
    try:
        body = json.dumps(data or {}).encode()
        req = urllib.request.Request(
            WX_API + path,
            data=body,
            headers={"Content-Type": "application/json"},
        )
        # The context manager closes the response (and its socket) even when
        # reading or decoding fails.
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        # An HTTPError carries the response body. It is parsed defensively
        # because an exception raised here would escape this function: the
        # generic handler below does not cover sibling except blocks.
        try:
            payload = json.loads(e.read().decode())
        except (ValueError, OSError):
            payload = None
        finally:
            e.close()
        if isinstance(payload, dict):
            return payload
        return {"code": -1, "error": f"HTTP {e.code}"}
    except Exception as e:
        # Deliberate best-effort boundary: callers inspect the returned dict.
        return {"code": -1, "error": str(e)[:200]}
def get_db_handle():
    """Return the handle of the database that holds the ``MSG`` table.

    The handle is looked up through ``/api/getDBInfo`` and stored in the
    module-level ``db_handle_cache``; later calls return the cached value
    without contacting wxhelper. Failed lookups are not cached.

    Candidates are tried from most to least specific:

    1. a database whose ``tables`` list names a table ``MSG``
       (assumes each table entry is a dict with a ``tableName`` and/or
       ``name`` key — TODO confirm against the wxhelper reply format);
    2. a database whose name starts with ``MSG``;
    3. any database whose name contains ``MSG`` or ``Msg``.

    The last rule alone is too loose: a name such as ``MicroMsg.db``
    contains ``Msg`` and would be picked if it happened to come first in
    the reply.

    Returns:
        The handle value reported by wxhelper, or ``None`` when no suitable
        database (with a truthy handle) was found.
    """
    global db_handle_cache
    if db_handle_cache:
        return db_handle_cache

    r = wxpost("/api/getDBInfo", timeout=10)
    dbs = [db for db in (r.get("data") or []) if isinstance(db, dict)]

    def db_name(db):
        return str(db.get("databaseName") or "")

    def has_msg_table(db):
        for table in db.get("tables") or []:
            if isinstance(table, dict) and "MSG" in (
                table.get("tableName"),
                table.get("name"),
            ):
                return True
        return False

    matchers = (
        has_msg_table,
        lambda db: db_name(db).startswith("MSG"),
        lambda db: "MSG" in db_name(db) or "Msg" in db_name(db),
    )
    for matches in matchers:
        for db in dbs:
            handle = db.get("handle")
            if handle and matches(db):
                db_handle_cache = handle
                return db_handle_cache
    return None
def get_nickname(wxid):
    """Return the display name for *wxid*, using ``nickname_cache``.

    On a cache miss the contact list is fetched once through
    ``/api/getContactList`` and every contact in the reply is cached, so
    later lookups for other contacts need no further request.

    The name is the contact's ``nickname``, falling back to
    ``customAccount`` and finally to the wxid itself.

    Args:
        wxid: Contact identifier to resolve.

    Returns:
        The resolved display name, or *wxid* unchanged when the contact is
        unknown or the contact list could not be fetched.
    """
    if wxid in nickname_cache:
        return nickname_cache[wxid]

    r = wxpost("/api/getContactList", timeout=10)
    contacts = r.get("data") or []
    for c in contacts:
        contact_id = c.get("wxid")
        if contact_id:
            nickname_cache[contact_id] = (
                c.get("nickname") or c.get("customAccount") or contact_id
            )

    if wxid in nickname_cache:
        return nickname_cache[wxid]

    # Remember "unknown" only when a contact list actually came back; an
    # empty or failed reply must not pin the raw wxid in the cache forever.
    if contacts:
        nickname_cache[wxid] = wxid
    return wxid
def query_history(wxid, limit=10):
    """Query the most recent messages of types 1 and 49 for one contact.

    Runs a ``SELECT`` on the ``MSG`` table through ``/api/execSql`` and
    returns the rows in chronological (oldest first) order. Rows whose
    ``StrContent`` and ``DisplayContent`` are both empty are dropped.

    Args:
        wxid: Value matched against ``StrTalker``. Single quotes are escaped
            before the value is embedded in the SQL text.
        limit: Maximum number of rows; coerced with ``int()`` and clamped to
            the range 0..200.

    Returns:
        A list of dicts with keys ``CreateTime``, ``IsSender``, ``Type`` and
        ``content``, or ``None`` when no database handle is available or the
        query returned no data rows.

    Raises:
        ValueError, TypeError: if *limit* cannot be converted to ``int``.
    """
    h = get_db_handle()
    if not h:
        return None

    # Clamp at both ends: SQLite treats a negative LIMIT as "no limit", which
    # would bypass the upper bound of 200.
    limit_val = max(0, min(int(limit), 200))

    # The execSql endpoint is given a raw SQL string here, so the value is
    # escaped by doubling single quotes (standard SQL string-literal escaping).
    safe_wxid = str(wxid).replace("'", "''")
    sql = (
        "SELECT CreateTime, IsSender, Type, SubType, StrContent, DisplayContent "
        f"FROM MSG WHERE StrTalker='{safe_wxid}' AND Type IN (1,49) "
        f"ORDER BY CreateTime DESC LIMIT {limit_val}"
    )
    r = wxpost("/api/execSql", {"dbHandle": h, "sql": sql}, timeout=15)
    data = r.get("data") or []
    if len(data) < 2:
        return None

    results = []
    # data[0] is the header row; the query sorts newest first, so reverse the
    # remaining rows to get chronological order.
    for row in reversed(data[1:]):
        content = (row[4] or "").strip() if len(row) > 4 else ""
        if not content and len(row) > 5:
            content = (row[5] or "").strip()
        if not content:
            continue
        results.append({
            "CreateTime": row[0],
            "IsSender": row[1],
            "Type": row[2],
            "content": content,
        })
    return results
def format_history_json(wxid, rows):
    """Build the JSON-serializable API response for a contact's history.

    Args:
        wxid: Contact identifier the rows belong to.
        rows: Dicts as produced by ``query_history`` (keys ``CreateTime``,
            ``IsSender``, ``Type``, ``content``); a falsy value yields an
            empty message list.

    Returns:
        A dict with keys ``ok``, ``wxid``, ``sender_name``, ``count`` and
        ``messages``. Message content is truncated to 500 characters.
    """
    # Both names are resolved up front, even when there is nothing to format.
    peer_label = get_nickname(wxid)
    own_label = get_nickname(BOT_WXID)
    kind_labels = {1: "text", 49: "link"}

    def to_message(record):
        stamp = int(record.get("CreateTime", 0))
        if stamp:
            shown = datetime.fromtimestamp(stamp).strftime("%Y-%m-%d %H:%M:%S")
        else:
            shown = ""
        outgoing = int(record.get("IsSender", 0))
        kind = int(record.get("Type", 1))
        text = record.get("content", "")
        return {
            "time": shown,
            "timestamp": stamp,
            "sender": own_label if outgoing else peer_label,
            "is_self": bool(outgoing),
            "type": kind,
            "type_name": kind_labels.get(kind, f"type_{kind}"),
            "content": text[:500],
        }

    formatted = [to_message(record) for record in (rows or ())]
    return {
        "ok": True,
        "wxid": wxid,
        "sender_name": peer_label,
        "count": len(formatted),
        "messages": formatted,
    }
def get_contacts():
    """Fetch the contact list from wxhelper and refresh ``nickname_cache``.

    Returns:
        One dict per contact with the keys ``wxid``, ``nickname``,
        ``remark`` and ``customAccount``; missing fields become ``""``.
    """
    reply = wxpost("/api/getContactList", timeout=10)
    entries = reply.get("data") or []
    exported_fields = ("wxid", "nickname", "remark", "customAccount")

    listing = []
    for entry in entries:
        key = entry.get("wxid", "")
        # Same fallback chain as the nickname lookup: nickname, then
        # customAccount, then the wxid itself.
        nickname_cache[key] = entry.get("nickname") or entry.get("customAccount") or key
        listing.append({field: entry.get(field, "") for field in exported_fields})
    return listing
def get_recent_chats(limit=20):
    """List the conversations with the most recent type 1/49 messages.

    Groups the ``MSG`` table by ``StrTalker`` through ``/api/execSql`` and
    orders the groups by their newest ``CreateTime``. A fixed set of system
    accounts and every wxid starting with ``gh_`` are filtered out after the
    query, so fewer than *limit* entries may be returned.

    Args:
        limit: Maximum number of conversations requested from the database;
            coerced with ``int()`` and clamped to the range 0..50.

    Returns:
        A list of dicts with keys ``wxid``, ``nickname``,
        ``last_message_time`` (formatted string or ``None``),
        ``last_message_ts`` and ``message_count``. Empty when no database
        handle is available or the query returned no data rows.

    Raises:
        ValueError, TypeError: if *limit* cannot be converted to ``int``.
    """
    h = get_db_handle()
    if not h:
        return []

    # Clamp at both ends: SQLite treats a negative LIMIT as "no limit", which
    # would bypass the upper bound of 50.
    capped = max(0, min(int(limit), 50))
    sql = (
        "SELECT StrTalker, MAX(CreateTime) as last_time, COUNT(*) as msg_count "
        "FROM MSG WHERE Type IN (1,49) "
        f"GROUP BY StrTalker ORDER BY last_time DESC LIMIT {capped}"
    )
    r = wxpost("/api/execSql", {"dbHandle": h, "sql": sql}, timeout=15)
    data = r.get("data") or []
    if len(data) < 2:
        return []

    skipped_accounts = {"fmessage", "weixin", "wechat", "filehelper"}
    results = []
    for row in data[1:]:  # data[0] is the header row
        if not row or len(row) < 2:
            # Malformed row: skip it instead of failing the whole listing.
            continue
        wxid = (row[0] or "").strip()
        if not wxid or wxid in skipped_accounts or wxid.startswith("gh_"):
            continue
        ts = int(row[1]) if row[1] else 0
        count = int(row[2]) if len(row) > 2 and row[2] else 0
        results.append({
            "wxid": wxid,
            "nickname": get_nickname(wxid),
            "last_message_time": (
                datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") if ts else None
            ),
            "last_message_ts": ts,
            "message_count": count,
        })
    return results
# ── HTTP Request Handler ──
class HistoryAPIHandler(BaseHTTPRequestHandler):
    """HTTP handler that serves WeChat contacts and chat history as JSON.

    Routes:
        GET  /              API description
        GET  /health        health check
        GET  /api/contacts  contact list
        GET  /api/recent    recent chats (query parameter: limit)
        GET  /api/history   chat history (query parameters: wxid, count)
        POST /api/history   same, via a JSON body (also served as /history)

    Every response is a JSON document; errors have the shape
    ``{"ok": false, "error": "<message>"}``.
    """

    def _send_json(self, data, status=200):
        """Serialize *data* as UTF-8 JSON and send it with status *status*."""
        body = json.dumps(data, ensure_ascii=False, indent=2).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def _send_error_json(self, msg, status=400):
        """Send an ``{"ok": false, "error": msg}`` response."""
        self._send_json({"ok": False, "error": msg}, status=status)

    def _read_json_body(self):
        """Read the request body and return it as a dict.

        Returns ``{}`` when the body is absent, the Content-Length header is
        not a positive integer, the body is not valid (UTF-8) JSON, or the
        JSON value is not an object. Callers use ``.get`` on the result, so
        anything but a dict would crash them.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            return {}
        if length <= 0:
            return {}
        try:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            parsed = json.loads(self.rfile.read(length))
        except ValueError:
            return {}
        return parsed if isinstance(parsed, dict) else {}

    def _respond_history(self, wxid, count):
        """Validate *count*, run the history query and send the result.

        A count that is not an integer is a client error (400); any failure
        while querying or formatting is reported as 500.
        """
        try:
            count = int(count)
        except (TypeError, ValueError, OverflowError):
            self._send_error_json("Invalid count: expected an integer")
            return
        try:
            rows = query_history(wxid, count)
            result = format_history_json(wxid, rows)
            self._send_json(result)
        except Exception as e:
            self._send_error_json(str(e)[:200], status=500)

    def do_GET(self):
        """Dispatch GET requests by path."""
        parsed = urlparse(self.path)
        # Trailing slashes are ignored; an empty path is treated as "/".
        path = parsed.path.rstrip("/") or "/"
        params = parse_qs(parsed.query)

        # ── Root / Health ──
        if path == "/":
            self._send_json({
                "service": "WeChat History API",
                "version": "1.0",
                "port": DEFAULT_PORT,
                "wxhelper": WX_API,
                "endpoints": {
                    "GET /api/contacts": "List all WeChat contacts",
                    "GET /api/history": "Query chat history (params: wxid, count)",
                    "POST /api/history": "Same via JSON body",
                    "GET /api/recent": "Recent chats list",
                    "GET /health": "Health check",
                }
            })
            return

        if path == "/health":
            self._send_json({
                "status": "ok",
                "timestamp": datetime.now().isoformat(),
                "wxhelper": bool(get_db_handle())
            })
            return

        # ── Contacts ──
        if path == "/api/contacts":
            contacts = get_contacts()
            self._send_json({"ok": True, "count": len(contacts), "contacts": contacts})
            return

        # ── Recent Chats ──
        if path == "/api/recent":
            try:
                limit = int(params.get("limit", ["20"])[0])
            except ValueError:
                self._send_error_json("Invalid limit: expected an integer")
                return
            chats = get_recent_chats(limit)
            self._send_json({"ok": True, "count": len(chats), "chats": chats})
            return

        # ── History ──
        if path == "/api/history":
            wxid = params.get("wxid", [""])[0].strip()
            count = params.get("count", ["10"])[0]
            if not wxid:
                self._send_error_json("Missing required parameter: wxid")
                return
            self._respond_history(wxid, count)
            return

        # ── 404 ──
        self._send_error_json(f"Not found: {path}", status=404)

    def do_POST(self):
        """Dispatch POST requests by path."""
        parsed = urlparse(self.path)
        path = parsed.path.rstrip("/") or "/"
        body = self._read_json_body()

        # ── History (POST) ──
        if path in ("/api/history", "/history"):
            raw_wxid = body.get("wxid")
            wxid = raw_wxid.strip() if isinstance(raw_wxid, str) else ""
            # "limit" is accepted as an alias of "count"; 10 applies only
            # when neither is given (or both are falsy).
            count = body.get("count") or body.get("limit") or 10
            if not wxid:
                self._send_error_json("Missing required field: wxid")
                return
            self._respond_history(wxid, count)
            return

        # ── 404 ──
        self._send_error_json(f"Not found: {path}", status=404)

    def do_OPTIONS(self):
        """Handle CORS preflight."""
        self.send_response(200)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()

    def log_message(self, *args):
        """Discard the access-log lines the base class would print."""
        pass
# ── Main ──
def main():
    """Parse ``--port``, probe wxhelper, then serve requests until Ctrl-C.

    The wxhelper probe is informational only: the server starts even when
    wxhelper is unreachable or not logged in. The listening socket is
    closed when the server stops.

    Raises:
        SystemExit: if the value following ``--port`` is not an integer.
    """
    import sys

    port = DEFAULT_PORT
    if "--port" in sys.argv:
        idx = sys.argv.index("--port")
        if idx + 1 < len(sys.argv):
            try:
                port = int(sys.argv[idx + 1])
            except ValueError:
                raise SystemExit(
                    f"[History API] Invalid --port value: {sys.argv[idx + 1]!r}"
                ) from None

    # Check wxhelper connectivity
    print(f"[History API] Checking wxhelper at {WX_API}...")
    try:
        r = wxpost("/api/checkLogin", timeout=5)
        if r.get("code") == 1:
            print("[History API] wxhelper ONLINE")
            db_handle = get_db_handle()
            print(f"[History API] DB handle: {db_handle or 'NOT FOUND'}")
        else:
            print(f"[History API] WARNING: wxhelper not logged in: {r}")
    except Exception as e:
        print(f"[History API] WARNING: Cannot reach wxhelper: {e}")

    # Start server. Leaving the ``with`` block calls server_close(), which
    # releases the listening socket. shutdown() is not used: it is meant to
    # be called from a different thread than the one running serve_forever().
    with HTTPServer((HOST, port), HistoryAPIHandler) as server:
        print(f"[History API] Listening on http://{HOST}:{port}")
        print(f"[History API] Endpoints:")
        print(f" GET http://localhost:{port}/")
        print(f" GET http://localhost:{port}/health")
        print(f" GET http://localhost:{port}/api/contacts")
        print(f" GET http://localhost:{port}/api/recent")
        print(f" GET http://localhost:{port}/api/history?wxid=wxid_xxx&count=20")
        print(f" POST http://localhost:{port}/api/history")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print("\n[History API] Shutting down...")


if __name__ == "__main__":
    main()