Files

152 lines
4.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Mohe & XiaoXiaoMo Chat Viewer

Fixes one corrupted byte in the session export, then parses it cleanly.

Usage:
    python moho_chat.py <session_id> [minutes]

    If [minutes] is given, only messages from the last that many minutes
    are shown.

Examples:
    python moho_chat.py ses_1d95d15c4ffehQaZ6hrbIbak5k
    python moho_chat.py ses_1d95d15c4ffehQaZ6hrbIbak5k 30
"""
import subprocess, sys, os, tempfile, re, json
from datetime import datetime, timezone, timedelta
def export(session_id):
    """Export a session to a temporary JSON file and return the file's path.

    Runs ``opencode.cmd export <session_id>`` through the shell with stdout
    redirected into a freshly created temp file (the original author notes
    that redirecting via cmd.exe preserves the UTF-16LE output). On success
    the caller owns the returned file and is responsible for deleting it.

    Args:
        session_id: Identifier of the session to export. Only letters,
            digits, ``_``, ``.`` and ``-`` are accepted because the value is
            placed on a shell command line.

    Returns:
        Path of the temp file containing the raw export.

    Raises:
        ValueError: If ``session_id`` contains characters outside the
            allowed set, if the command exits non-zero, or if it produced
            an empty file.
        subprocess.TimeoutExpired: If the export runs longer than 180 s.
    """
    # The id is interpolated into a shell=True command string, so refuse
    # anything the shell could interpret (quotes, &, |, >, spaces, ...).
    if not re.fullmatch(r'[A-Za-z0-9_.-]+', str(session_id)):
        raise ValueError(f'Invalid session id: {session_id!r}')

    tmp = tempfile.NamedTemporaryFile(suffix='.json', delete=False)
    tmp.close()
    try:
        r = subprocess.run(
            f'opencode.cmd export {session_id} > "{tmp.name}" 2>nul',
            shell=True, timeout=180
        )
        if r.returncode != 0 or os.path.getsize(tmp.name) == 0:
            raise ValueError('Export failed')
    except BaseException:
        # Do not leak the temp file when the export fails, times out or is
        # interrupted; the original error is re-raised unchanged.
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        raise
    return tmp.name
def load(filepath):
    """Read an exported session file, repair it, and return its messages.

    The file is decoded as UTF-16LE when it starts with the ``FF FE`` byte
    order mark and as UTF-8 otherwise; undecodable bytes become U+FFFD.
    Everything before the first ``{`` is discarded, a known corruption is
    patched, and the result is parsed as JSON.

    Args:
        filepath: Path of the export file to read.

    Returns:
        The list stored under the top-level ``"messages"`` key, or an empty
        list when that key is missing or null.

    Raises:
        ValueError: If the text is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError``) or its top level is not a JSON object.
    """
    with open(filepath, 'rb') as f:
        raw = f.read()

    encoding = 'utf-16-le' if raw[:2] == b'\xff\xfe' else 'utf-8'
    text = raw.decode(encoding, errors='replace')

    # Drop whatever precedes the first '{' (the decoded BOM character, any
    # leading output that is not part of the JSON document).
    brace = text.find('{')
    if brace > 0:
        text = text[brace:]

    # Repair the corruption described by the original author: a string value
    # loses its closing quote, leaving a bad character (U+FFFD, or a literal
    # '?' if it was decoded differently) directly before the structural
    # comma, a newline and the next field name. Re-insert '?"' before the
    # comma. A raw newline cannot occur inside a valid JSON string, so this
    # pattern does not match well-formed exports.
    fixed = re.sub(
        r'[\uFFFD?]\s*,\s*\n\s*"(id|role|parts|info|timestamp|type|text)"',
        r'?",\n "\1"',
        text
    )

    payload = json.loads(fixed)
    if not isinstance(payload, dict):
        raise ValueError('Export JSON is not an object')
    # `"messages": null` must still yield a list so callers can len()/iterate.
    return payload.get('messages') or []
def msg_text(msg):
    """Return the concatenated text of a message's text parts.

    Only entries of ``msg['parts']`` that are dicts with ``type == 'text'``
    contribute; their ``text`` values are joined in order and the result is
    stripped of surrounding whitespace.

    Args:
        msg: Message dict as found in the export's ``messages`` list.

    Returns:
        The combined text, or ``''`` when there are no usable text parts.
    """
    parts = msg.get('parts')
    # A missing or null `parts` (or any non-sequence value) means "no text".
    if not isinstance(parts, (list, tuple)):
        return ''
    chunks = []
    for part in parts:
        if isinstance(part, dict) and part.get('type') == 'text':
            chunk = part.get('text')
            # Skip null/non-string text instead of failing on concatenation.
            if isinstance(chunk, str):
                chunks.append(chunk)
    return ''.join(chunks).strip()
def msg_ts(msg):
    """Return a timestamp for a message, or ``''`` when none is available.

    ``msg['info']['timestamp']`` is returned unchanged when it is truthy.
    Otherwise ``msg['info']['time']['created']`` is divided by 1000 (i.e. it
    is treated as milliseconds since the epoch) and rendered as a UTC
    ISO-8601 string.

    Args:
        msg: Message dict as found in the export's ``messages`` list.

    Returns:
        The timestamp value, or ``''`` if neither field yields one.
    """
    info = msg.get('info')
    if not isinstance(info, dict):  # missing or null `info`
        return ''

    ts = info.get('timestamp') or ''
    if ts:
        return ts

    t = info.get('time')
    if not isinstance(t, dict):
        return ''
    created = t.get('created')
    # Keep the original truthiness rule (0 counts as "not set") but reject
    # values that cannot be divided, such as strings.
    if not isinstance(created, (int, float)) or not created:
        return ''
    try:
        return datetime.fromtimestamp(created / 1000, tz=timezone.utc).isoformat()
    except (OverflowError, OSError, ValueError):
        # Value outside the range the platform can convert.
        return ''
def _parse_iso(ts_str):
    """Parse an ISO-8601 string (a ``Z`` suffix is accepted); None on failure."""
    if not isinstance(ts_str, str) or not ts_str:
        return None
    try:
        return datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
    except ValueError:
        return None


def _as_aware(dt):
    """Return *dt* as an aware datetime; naive values are assumed to be UTC."""
    if dt.tzinfo is None or dt.utcoffset() is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt


def main():
    """Command-line entry point: export a session and print its chat lines.

    Reads ``<session_id>`` and an optional ``[minutes]`` window from
    ``sys.argv``, exports and loads the session, reports a few diagnostic
    counts on stderr, then prints every message whose text starts with
    ``[mohe]`` or ``[xxm]`` in chronological order. With ``minutes`` set,
    messages with a known timestamp older than that window are dropped;
    messages without a parseable timestamp are always kept.

    Exits with status 1 (after printing the usage text) when the session id
    is missing or ``minutes`` is not an integer.
    """
    if len(sys.argv) < 2:
        print(__doc__)
        sys.exit(1)
    session_id = sys.argv[1]

    minutes = None
    if len(sys.argv) > 2:
        try:
            minutes = int(sys.argv[2])
        except ValueError:
            print(f'Invalid minutes value: {sys.argv[2]!r}', file=sys.stderr)
            print(__doc__)
            sys.exit(1)

    print('Exporting...', file=sys.stderr)
    tmp = export(session_id)
    try:
        messages = load(tmp)
    finally:
        # The export file is only needed for parsing.
        os.unlink(tmp)
    print(f'{len(messages)} messages', file=sys.stderr)

    # Diagnostic: count messages containing Moho's test keywords and show
    # the first five on stderr.
    found_moho = 0
    for m in messages:
        t = msg_text(m)
        if '测试新协议' in t or '写入Session' in t or '23 × 17' in t:
            found_moho += 1
            if found_moho <= 5:
                print(f' [Moho #{found_moho}] {t[:200]!r}', file=sys.stderr)
    print(f' Moho explicit messages: {found_moho}', file=sys.stderr)

    cutoff = None
    if minutes:
        cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes)

    # Sort key for messages without a usable timestamp. It must be aware:
    # comparing naive and aware datetimes raises TypeError.
    earliest = datetime.min.replace(tzinfo=timezone.utc)

    results = []
    for m in messages:
        text = msg_text(m)
        if not text.startswith(('[mohe]', '[xxm]')):
            continue
        sender = '[mohe] Mohe' if text.startswith('[mohe]') else '[xxm] XiaoXiaoMo'
        display = re.sub(r'^\[\w+\]\s*', '', text, count=1).strip()
        ts_str = msg_ts(m)
        ts_dt = _parse_iso(ts_str)
        when = _as_aware(ts_dt) if ts_dt is not None else None
        if cutoff and when is not None and when < cutoff:
            continue
        results.append((when or earliest, ts_dt, ts_str, sender, display))
    results.sort(key=lambda item: item[0])

    if not results:
        print('No [mohe]/[xxm] messages found' + (f' in last {minutes} min' if minutes else ''))
        return

    print(f'\n{len(results)} message(s)' + (f' (last {minutes} min)' if minutes else ''))
    print('=' * 60)
    for _, ts_dt, ts_str, sender, display in results:
        if ts_dt is not None:
            # Shown in the timestamp's own offset, as originally parsed.
            t = ts_dt.strftime('%H:%M:%S')
        elif ts_str:
            # Unparseable timestamp: show its leading characters verbatim.
            t = str(ts_str)[:19]
        else:
            t = '??'
        print(f'[{t}] {sender}: {display}')
# Run the viewer only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()