152 lines
4.5 KiB
Python
152 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Mohe & XiaoXiaoMo Chat Viewer

Fixes one corrupted byte in the session export, then parses it cleanly and
prints the [mohe] / [xxm] messages in chronological order.

Usage:
    python moho_chat.py <session_id> [minutes]

    session_id  ID of the session to export.
    minutes     Optional. Only show messages from the last N minutes.

Examples:
    python moho_chat.py ses_1d95d15c4ffehQaZ6hrbIbak5k
    python moho_chat.py ses_1d95d15c4ffehQaZ6hrbIbak5k 30
|
||
"""
|
||
|
||
import subprocess, sys, os, tempfile, re, json
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
def export(session_id):
    """Export a session to a temporary JSON file and return the file's path.

    The export command is run through the shell with its output redirected
    into the temp file (the original author notes that going through cmd.exe
    preserves the tool's UTF-16LE output).

    Args:
        session_id: Identifier passed to ``opencode.cmd export``. Only
            letters, digits, ``_`` and ``-`` are accepted, because the value
            is interpolated into a shell command line.

    Returns:
        Path of the temporary ``.json`` file. The caller is responsible for
        deleting it.

    Raises:
        ValueError: If ``session_id`` contains unsafe characters, or if the
            export command fails or produces an empty file.
        subprocess.TimeoutExpired: If the export runs longer than 180 seconds.
    """
    # Reject anything that could be interpreted by the shell before it is
    # spliced into the command string below.
    if not re.fullmatch(r'[A-Za-z0-9_-]+', str(session_id)):
        raise ValueError(f'Invalid session id: {session_id!r}')

    tmp = tempfile.NamedTemporaryFile(suffix='.json', delete=False)
    tmp.close()
    try:
        r = subprocess.run(
            f'opencode.cmd export {session_id} > "{tmp.name}" 2>nul',
            shell=True, timeout=180
        )
        if r.returncode != 0 or os.path.getsize(tmp.name) == 0:
            raise ValueError('Export failed')
    except BaseException:
        # Do not leave the temp file behind when the export fails or times out.
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        raise
    return tmp.name
|
||
|
||
def load(filepath):
    """Read an exported session file, repair known corruption, return messages.

    The file is decoded as UTF-16LE when it starts with the ``FF FE``
    byte-order mark and as UTF-8 otherwise; undecodable bytes become U+FFFD
    rather than raising. Everything before the first ``{`` is discarded.

    Known corruption: a text field whose last character is corrupt and whose
    closing quote is missing. It is recognised as a corrupt character
    (U+FFFD or ``?``) directly followed by a structural comma, a newline and
    a known field name; the corrupt character is replaced by ``?`` and the
    missing quote is inserted before the comma.

    Args:
        filepath: Path of the exported session file.

    Returns:
        The value of the top-level ``messages`` key, or an empty list when
        the key is missing or null.

    Raises:
        json.JSONDecodeError: If the text is still not valid JSON after the
            repair.
    """
    with open(filepath, 'rb') as f:
        raw = f.read()

    encoding = 'utf-16-le' if raw[:2] == b'\xff\xfe' else 'utf-8'
    text = raw.decode(encoding, errors='replace')

    # Drop any preamble (including a decoded BOM) ahead of the JSON object.
    brace = text.find('{')
    if brace > 0:
        text = text[brace:]

    # One pass covers both forms of the corrupt character. The pattern needs
    # a raw newline right after the comma, which cannot occur inside a valid
    # JSON string, so intact exports are left untouched.
    fixed = re.sub(
        r'(?:\uFFFD|\?)\s*,\s*\n\s*"(id|role|parts|info|timestamp|type|text)"',
        r'?",\n "\1"',
        text
    )

    # "messages": null would otherwise hand None to callers expecting a list.
    return json.loads(fixed).get('messages') or []
|
||
|
||
def msg_text(msg):
    """Return the concatenated text of a message's text parts, stripped.

    Only entries of ``msg['parts']`` that are dicts with ``type == 'text'``
    contribute; everything else is ignored. A missing or null ``parts`` list
    and a missing or null ``text`` value are treated as empty.

    Args:
        msg: Message dict as found in the exported ``messages`` list.

    Returns:
        The joined text with leading and trailing whitespace removed, or an
        empty string when the message has no text parts.
    """
    parts = msg.get('parts') or []
    return ''.join(
        p.get('text') or ''
        for p in parts
        if isinstance(p, dict) and p.get('type') == 'text'
    ).strip()
|
||
|
||
def msg_ts(msg):
    """Return a timestamp for *msg*, or an empty string when it has none.

    ``info.timestamp`` is returned unchanged when it is present and truthy.
    Otherwise ``info.time.created`` is read as milliseconds since the Unix
    epoch and rendered as a UTC ISO-8601 string.
    """
    meta = msg.get('info', {})

    explicit = meta.get('timestamp', '')
    if explicit:
        return explicit

    timing = meta.get('time', {})
    if not isinstance(timing, dict) or not timing.get('created'):
        return ''

    seconds = timing['created'] / 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).isoformat()
|
||
|
||
def _parse_ts(ts_str):
    """Parse an ISO-8601 string into a timezone-aware datetime, or return None."""
    if not ts_str:
        return None
    try:
        parsed = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
    except (ValueError, TypeError, AttributeError):
        # Unparseable or non-string timestamp: treated as "no timestamp".
        return None
    if parsed.tzinfo is None:
        # NOTE(review): naive timestamps are assumed to be UTC so they can be
        # compared with the aware cutoff - confirm against the export format.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def main():
    """Command-line entry point: export a session and print its chat messages.

    Reads ``<session_id> [minutes]`` from ``sys.argv``. The session is
    exported and parsed, a short diagnostic about known test messages is
    written to stderr, and every message whose text starts with ``[mohe]``
    or ``[xxm]`` is printed to stdout in timestamp order. When ``minutes``
    is given and non-zero, messages with a timestamp older than that many
    minutes are skipped; messages without a parseable timestamp are always
    kept and sort first.

    Exits with status 1 after printing the usage text when the session id is
    missing or ``minutes`` is not an integer.
    """
    if len(sys.argv) < 2:
        print(__doc__)
        sys.exit(1)

    session_id = sys.argv[1]
    minutes = None
    if len(sys.argv) > 2:
        try:
            minutes = int(sys.argv[2])
        except ValueError:
            print(f'minutes must be an integer, got {sys.argv[2]!r}', file=sys.stderr)
            print(__doc__)
            sys.exit(1)

    print('Exporting...', file=sys.stderr)
    tmp = export(session_id)

    try:
        messages = load(tmp) or []
    finally:
        os.unlink(tmp)

    print(f'{len(messages)} messages', file=sys.stderr)

    # Extract each message's text once; it feeds both passes below.
    texts = [msg_text(m) for m in messages]

    # Search for 测试新协议 (Moho's test message keyword)
    found_moho = 0
    for t in texts:
        if '测试新协议' in t or '写入Session' in t or '23 × 17' in t:
            found_moho += 1
            if found_moho <= 5:
                print(f' [Moho #{found_moho}] {t[:200]!r}', file=sys.stderr)
    print(f' Moho explicit messages: {found_moho}', file=sys.stderr)

    cutoff = None
    if minutes:
        cutoff = datetime.now(timezone.utc) - timedelta(minutes=minutes)

    # Aware stand-in for "no timestamp": sorting must never compare a naive
    # datetime with an aware one, which raises TypeError.
    no_ts = datetime.min.replace(tzinfo=timezone.utc)

    results = []
    for m, text in zip(messages, texts):
        if text.startswith('[mohe]'):
            sender = '[mohe] Mohe'
        elif text.startswith('[xxm]'):
            sender = '[xxm] XiaoXiaoMo'
        else:
            continue

        display = re.sub(r'^\[\w+\]\s*', '', text, count=1).strip()
        ts_str = msg_ts(m)
        ts_dt = _parse_ts(ts_str)

        if cutoff is not None and ts_dt is not None and ts_dt < cutoff:
            continue

        results.append((ts_dt or no_ts, ts_dt, ts_str, sender, display))

    results.sort(key=lambda x: x[0])

    if not results:
        print('No [mohe]/[xxm] messages found' + (f' in last {minutes} min' if minutes else ''))
        return

    print(f'\n{len(results)} message(s)' + (f' (last {minutes} min)' if minutes else ''))
    print('=' * 60)
    for _, ts_dt, ts_str, sender, display in results:
        if ts_dt is not None:
            t = ts_dt.strftime('%H:%M:%S')
        elif ts_str:
            # Timestamp present but unparseable: show its raw prefix.
            t = str(ts_str)[:19]
        else:
            t = '??'
        print(f'[{t}] {sender}: {display}')
|
||
|
||
if __name__ == '__main__':
    # Run the CLI only when executed as a script, not when imported.
    main()
|