#!/usr/bin/env python3
"""Kanban HTTP API: a lightweight REST interface to the kanban board.

Lets agents on other machines read and write the kanban over HTTP using
only the standard library.  Listing, showing, commenting on and updating
tasks go directly against the SQLite database at ``KANBAN_DB``; creating a
card shells out to the ``hermes kanban create`` CLI.

Every endpoint is a ``POST`` with an optional JSON object body:

* ``/kanban/create``       {"title", "assignee"?, "body"?}
* ``/kanban/create_xmpp``  same as create, then notifies the XMPP room
* ``/kanban/list``         {"status"?, "assignee"?}
* ``/kanban/show``         {"id"}
* ``/kanban/update``       {"id", "status"?, "assignee"?}
* ``/kanban/comment``      {"id", "body", "author"?}
* ``/kanban/comments``     {"id"}

NOTE(review): the server binds to all interfaces, has no authentication and
sends ``Access-Control-Allow-Origin: *``; confirm that is acceptable for the
network it is deployed on.
"""

import contextlib
import http.server
import json
import os
import sqlite3
import subprocess
import time
import urllib.parse
import urllib.request
from datetime import datetime

KANBAN_DB = os.path.expanduser("~/.hermes/kanban.db")
HOST = "0.0.0.0"
PORT = 9580

# Local HTTP bridge that forwards JSON {"to", "body"} payloads to XMPP.
XMPP_BRIDGE_URL = "http://127.0.0.1:5804"
XMPP_ROOM = "coregroup@conference.yoin.fun"
NOTIFY_TIMEOUT = 5    # seconds to wait for the XMPP bridge
HERMES_TIMEOUT = 30   # seconds to wait for the hermes CLI

_TASK_COLUMNS = (
    "id", "title", "body", "status", "assignee", "created_by", "created_at",
)
_COMMENT_COLUMNS = ("id", "author", "body", "created_at")
# Only these columns may be changed through /kanban/update.
_UPDATABLE_FIELDS = ("status", "assignee")


def db():
    """Open and return a new SQLite connection to ``KANBAN_DB``.

    The caller is responsible for closing the connection.
    """
    return sqlite3.connect(KANBAN_DB)


class KanbanAPI(http.server.BaseHTTPRequestHandler):
    """Request handler that serves the kanban endpoints as JSON over POST."""

    # URL path (trailing slash stripped) -> name of the handling method.
    _ROUTES = {
        "/kanban/create": "_handle_create",
        "/kanban/create_xmpp": "_handle_create_xmpp",
        "/kanban/list": "_handle_list",
        "/kanban/comment": "_handle_comment",
        "/kanban/comments": "_handle_comments",
        "/kanban/update": "_handle_update",
        "/kanban/show": "_handle_show",
    }

    # ------------------------------------------------------------------
    # Low-level request/response helpers
    # ------------------------------------------------------------------

    def _json(self, data, status=200):
        """Send *data* as a UTF-8 JSON response with HTTP status *status*."""
        payload = json.dumps(data, ensure_ascii=False).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(payload)

    def _read_body(self):
        """Read and decode the JSON request body.

        Returns:
            The decoded JSON object, or ``{}`` when the request carries no
            body (missing, zero or negative ``Content-Length``).

        Raises:
            ValueError: if ``Content-Length`` is not an integer, the body is
                not valid JSON, or the JSON value is not an object.
        """
        length = int(self.headers.get("Content-Length", 0))
        # A negative length would make read() block until EOF, so treat
        # anything non-positive as "no body".
        if length <= 0:
            return {}
        data = json.loads(self.rfile.read(length))
        if not isinstance(data, dict):
            raise ValueError("JSON body must be an object")
        return data

    # ------------------------------------------------------------------
    # HTTP verbs
    # ------------------------------------------------------------------

    def do_POST(self):
        """Dispatch a POST request to the handler registered for its path."""
        path = urllib.parse.urlparse(self.path).path.rstrip("/")
        try:
            body = self._read_body()
        except ValueError:
            # Covers JSONDecodeError, UnicodeDecodeError and a bad
            # Content-Length; all are subclasses of ValueError.
            return self._json({"error": "invalid JSON body"}, 400)

        handler_name = self._ROUTES.get(path)
        if handler_name is None:
            return self._json({"error": "not found"}, 404)
        try:
            return getattr(self, handler_name)(body)
        except sqlite3.Error as exc:
            return self._json({"error": f"database error: {exc}"}, 500)

    def do_OPTIONS(self):
        """Answer a CORS preflight request."""
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")
        self.end_headers()

    def log_message(self, fmt, *args):
        """Print a timestamped log line containing the first log argument."""
        detail = args[0] if args else fmt
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {detail}", flush=True)

    # ------------------------------------------------------------------
    # Card creation (via the hermes CLI)
    # ------------------------------------------------------------------

    @staticmethod
    def _build_create_cmd(title, assignee, body_text):
        """Return the argv list for ``hermes kanban create``."""
        cmd = ["hermes", "kanban", "create", title]
        if assignee:
            cmd += ["--assignee", assignee]
        if body_text:
            cmd += ["--body", body_text]
        return cmd

    @staticmethod
    def _parse_task_id(stdout):
        """Extract the task id from CLI output.

        The expected output looks like
        ``Created t_xxxxx (ready, assignee=xxx)``; the first whitespace
        separated word starting with ``t_`` is returned with trailing commas
        removed, or ``None`` when no such word exists.
        """
        return next(
            (word.rstrip(",") for word in stdout.split()
             if word.startswith("t_")),
            None,
        )

    def _create_card(self, body):
        """Validate *body* and create a card through the hermes CLI.

        On failure an error response has already been sent and ``None`` is
        returned.  On success returns the tuple
        ``(title, assignee, body_text, task_id, raw_stdout)``.
        """
        title = body.get("title", "")
        assignee = body.get("assignee", "")
        body_text = body.get("body", "")
        if not title:
            self._json({"error": "title required"}, 400)
            return None
        # subprocess needs string arguments; reject anything else up front.
        if not all(isinstance(value, str)
                   for value in (title, assignee or "", body_text or "")):
            self._json(
                {"error": "title, assignee and body must be strings"}, 400
            )
            return None

        cmd = self._build_create_cmd(title, assignee, body_text)
        try:
            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=HERMES_TIMEOUT
            )
        except subprocess.TimeoutExpired:
            self._json({"error": "hermes CLI timed out"}, 504)
            return None
        except (OSError, ValueError) as exc:
            # OSError: CLI missing or not executable.
            # ValueError: e.g. an embedded NUL byte in an argument.
            self._json({"error": f"cannot run hermes CLI: {exc}"}, 500)
            return None
        if result.returncode != 0:
            self._json({"error": result.stderr.strip()}, 500)
            return None
        task_id = self._parse_task_id(result.stdout)
        return title, assignee, body_text, task_id, result.stdout.strip()

    def _handle_create(self, body):
        """``/kanban/create``: create a card and return its id."""
        created = self._create_card(body)
        if created is None:
            return None
        _, _, _, task_id, raw = created
        return self._json({"ok": True, "task_id": task_id, "raw": raw})

    def _handle_create_xmpp(self, body):
        """``/kanban/create_xmpp``: create a card, then notify the XMPP room."""
        created = self._create_card(body)
        if created is None:
            return None
        title, assignee, body_text, task_id, _ = created

        xmpp_msg = f"[Kanban] 新任务 {task_id}: {title}"
        if assignee:
            xmpp_msg += f" → 指派给 {assignee}"
        if body_text:
            xmpp_msg += f"\n{body_text[:200]}"
        xmpp_msg += f"\n📌 评论: curl -X POST http://192.168.1.246:9580/kanban/comment -d '{{\"id\":\"{task_id}\",\"author\":\"你的名字\",\"body\":\"...\"}}'"
        self._notify(xmpp_msg)
        return self._json(
            {"ok": True, "task_id": task_id, "notified": "coregroup"}
        )

    # ------------------------------------------------------------------
    # XMPP notification
    # ------------------------------------------------------------------

    @staticmethod
    def _notification_request(message):
        """Build the POST request that asks the bridge to send *message*."""
        payload = json.dumps({"to": XMPP_ROOM, "body": message}).encode()
        request = urllib.request.Request(
            XMPP_BRIDGE_URL, data=payload, method="POST"
        )
        request.add_header("Content-Type", "application/json")
        return request

    def _notify(self, message):
        """Send *message* to the XMPP room on a best-effort basis.

        Failures are logged and never propagate, so an unreachable bridge
        cannot fail the API call that triggered the notification.

        Returns:
            ``True`` if the bridge accepted the request, ``False`` otherwise.
        """
        try:
            request = self._notification_request(message)
            with urllib.request.urlopen(request, timeout=NOTIFY_TIMEOUT):
                pass
        except Exception as exc:  # best-effort boundary: log, don't raise
            self.log_message("%s", f"XMPP notification failed: {exc}")
            return False
        return True

    # ------------------------------------------------------------------
    # Direct SQLite endpoints
    # ------------------------------------------------------------------

    def _handle_list(self, body):
        """``/kanban/list``: list tasks, optionally filtered, newest first."""
        conditions = []
        params = []
        for column in ("status", "assignee"):
            wanted = body.get(column)
            if wanted:
                conditions.append(f"{column} = ?")
                params.append(wanted)

        query = "SELECT id, title, body, status, assignee, created_by, created_at FROM tasks"
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        query += " ORDER BY created_at DESC"

        with contextlib.closing(db()) as conn:
            rows = conn.execute(query, params).fetchall()
        tasks = [dict(zip(_TASK_COLUMNS, row)) for row in rows]
        return self._json({"tasks": tasks})

    def _handle_comment(self, body):
        """``/kanban/comment``: add a comment to a task and notify the room."""
        task_id = body.get("id")
        author = body.get("author", "unknown")
        text = body.get("body", "")
        if not task_id or not text:
            return self._json({"error": "id and body required"}, 400)

        with contextlib.closing(db()) as conn:
            conn.execute(
                "INSERT INTO task_comments (task_id, author, body, created_at) "
                "VALUES (?, ?, ?, ?)",
                (task_id, author, text, int(time.time())),
            )
            conn.commit()

        self._notify(f"[Kanban] {author} 评论了 {task_id}: {str(text)[:100]}")
        return self._json({"ok": True})

    def _handle_comments(self, body):
        """``/kanban/comments``: return a task's comments, oldest first."""
        task_id = body.get("id")
        if not task_id:
            return self._json({"error": "id required"}, 400)
        with contextlib.closing(db()) as conn:
            rows = conn.execute(
                "SELECT id, author, body, created_at FROM task_comments WHERE task_id = ? ORDER BY created_at",
                (task_id,),
            ).fetchall()
        comments = [dict(zip(_COMMENT_COLUMNS, row)) for row in rows]
        return self._json({"comments": comments})

    def _handle_update(self, body):
        """``/kanban/update``: change a task's status and/or assignee."""
        task_id = body.get("id")
        if not task_id:
            return self._json({"error": "id required"}, 400)
        updates = {key: body[key] for key in _UPDATABLE_FIELDS if key in body}
        if not updates:
            return self._json({"error": "nothing to update"}, 400)

        # Column names come from the fixed whitelist above, so interpolating
        # them is safe; the values are always bound as parameters.
        set_clause = ", ".join(f"{column} = ?" for column in updates)
        with contextlib.closing(db()) as conn:
            conn.execute(
                f"UPDATE tasks SET {set_clause} WHERE id = ?",
                [*updates.values(), task_id],
            )
            conn.commit()
        return self._json({"ok": True})

    def _handle_show(self, body):
        """``/kanban/show``: return a single task, or 404 if it is unknown."""
        task_id = body.get("id")
        if not task_id:
            return self._json({"error": "id required"}, 400)
        with contextlib.closing(db()) as conn:
            row = conn.execute(
                "SELECT id, title, body, status, assignee, created_by, created_at FROM tasks WHERE id = ?",
                (task_id,),
            ).fetchone()
        if not row:
            return self._json({"error": "not found"}, 404)
        return self._json(dict(zip(_TASK_COLUMNS, row)))


def main():
    """Run the API server until interrupted."""
    with http.server.HTTPServer((HOST, PORT), KanbanAPI) as server:
        print(f"Kanban API running on http://{HOST}:{PORT}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            pass


if __name__ == "__main__":
    main()