# -*- coding: utf-8 -*-
"""
dashboard.py - AgentsMeeting management dashboard backend
=========================================================
Flask app on :5803. Monitors agents across platforms via:
  - `docker exec ejabberd ejabberdctl connected_users` (cross-platform, authoritative)
  - xmpp_bot HTTP API :5802 (/health, /muc - fallback)
  - Local process/port checks

Auto-recovery: restarts local "windows"-platform agents after 3 consecutive
offline checks.
"""

import json
import logging
import os
import re
import signal
import socket
import subprocess
import sys
import time
import urllib.request
from datetime import datetime, timedelta
from pathlib import Path

from flask import Flask, jsonify, request, send_from_directory

# ---- Paths ----
PROJECT_ROOT = Path("/home/hmo/agentsmeeting-venv")
GATEWAY_ROOT = Path("/home/hmo/agentsmeeting-venv")
CONFIG_DIR = Path("/home/hmo/agentsmeeting-venv/config")
LOGS_DIR = Path("/home/hmo/agentsmeeting-venv/logs")
TEMPLATES_DIR = Path("/home/hmo/agentsmeeting-venv/templates")

# proc_guard lives in the gateway's scripts directory, so the path must be
# extended before the import below.
sys.path.insert(0, str(GATEWAY_ROOT / "scripts"))
from proc_guard import guard

# ---- Flask ----
app = Flask(__name__, template_folder=str(TEMPLATES_DIR))

# ---- Logging ----
LOG_FILE = LOGS_DIR / "dashboard.log"
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    filename=str(LOG_FILE),
    level=logging.INFO,
    format="%(asctime)s [dashboard] %(message)s",
)
log = logging.getLogger("dashboard")

# ---- Constants ----
AGENTS_YAML = CONFIG_DIR / "agents.yaml"

# Maps a service "type" from the agent config to the script that implements it.
SCRIPT_NAMES = {
    "xmpp_bot": "xmpp_bot.py",
    "wechat_bridge": "wechat_agent.py",
    "api_proxy": "api_proxy.py",
    "health_check": "health_check_xxm.py",
    "mohe_watcher": "mohe_watcher.py",
    "watchdog": "xmpp_watchdog.py",
}

PYTHON = "/home/hmo/agentsmeeting-venv/bin/python3"
SCRIPTS_DIR = Path("/home/hmo/agentsmeeting-venv")

XMPP_BRIDGE_URL = "http://192.168.1.16:5802"
EJABBERD_HOST = "192.168.1.246"

# Hosts treated as "this machine" for agents whose platform is "windows".
LOCAL_WINDOWS_HOSTS = ("192.168.1.16", "127.0.0.1", "localhost")

# subprocess.CREATE_NO_WINDOW only exists on Windows; 0 is the only
# creationflags value Popen accepts on POSIX.
_NO_WINDOW_FLAG = getattr(subprocess, "CREATE_NO_WINDOW", 0)

# Leading "YYYY-MM-DD HH:MM:SS" timestamp as found in xmpp_bot.log lines.
_LOG_TIMESTAMP_RE = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")

# Auto-recovery: restart after this many consecutive offline checks
AUTO_RECOVER_THRESHOLD = 3
_offline_counter: dict[str, int] = {}


# ============================================================
# Config
# ============================================================

def load_agents_config():
    """Return the list of agent definitions.

    Reads the ``agents`` key of AGENTS_YAML when that file exists; otherwise
    falls back to the built-in defaults from ``_default_agents()``.
    """
    if AGENTS_YAML.exists():
        import yaml

        with open(AGENTS_YAML, "r", encoding="utf-8") as f:
            # safe_load returns None for an empty document; treat that as
            # "no agents configured" instead of raising AttributeError.
            data = yaml.safe_load(f) or {}
        return data.get("agents") or []
    return _default_agents()


def _default_agents():
    """Return the hard-coded agent list used when agents.yaml is absent."""
    return [
        {
            "id": "agent-001",
            "name": "R&D Assistant",
            "display_name": "xxm",
            "jid": "xxm@yoin.fun",
            "platform": "windows",
            "host": "192.168.1.16",
            "bot_type": "xmpp",
            "provider": "volcengine",
            "services": [{"type": "xmpp_bot", "port": 5802}],
        },
        {
            "id": "agent-002",
            "name": "Automation Manager",
            "display_name": "mohe",
            "jid": "mohe@yoin.fun",
            "platform": "linux",
            "host": "192.168.1.246",
            "bot_type": "hermes",
            "provider": "ocg-new",
            "services": [{"type": "hermes_gateway", "port": 8642}, {"type": "xmpp_bot"}],
        },
        {
            "id": "agent-003",
            "name": "Local Inference",
            "display_name": "xiaoguo",
            "jid": "xiaoguo@yoin.fun",
            "platform": "mac",
            "host": "192.168.1.122",
            "bot_type": "xmpp",
            "provider": "ocg-old",
            "services": [{"type": "xmpp_bot"}, {"type": "omlx_server", "port": 18003}],
        },
        {
            "id": "agent-004",
            "name": "Position Analyst",
            "display_name": "zhiwei",
            "jid": "zhiwei@yoin.fun",
            "platform": "linux",
            "host": "192.168.1.246",
            "bot_type": "hermes",
            "provider": "ocg-old",
            "services": [{"type": "hermes_gateway", "port": 8643}, {"type": "xmpp_bot"}],
        },
    ]


def _is_local_windows(platform, host):
    """Return True when the agent is a "windows" agent on one of LOCAL_WINDOWS_HOSTS."""
    return platform == "windows" and host in LOCAL_WINDOWS_HOSTS


# ============================================================
# Cross-platform monitoring (XMPP + ejabberdctl)
# ============================================================

def _xmpp_health():
    """Query xmpp_bot /health for XMPP connection and ejabberd status.

    Returns the decoded JSON body, or a dict with ``ok``/``xmpp_connected``/
    ``ejabberd_alive`` set to False and an ``error`` string on any failure.
    """
    try:
        req = urllib.request.Request(f"{XMPP_BRIDGE_URL}/health")
        with urllib.request.urlopen(req, timeout=5) as resp:
            return json.loads(resp.read())
    except Exception as e:
        return {"ok": False, "error": str(e), "xmpp_connected": False, "ejabberd_alive": False}


def _ejabberd_online_jids():
    """Run ``ejabberdctl connected_users`` inside the ``ejabberd`` container.

    The command is executed through a local ``docker exec`` (no SSH is
    involved, despite the wording of the debug log message below).

    Returns the set of bare JIDs (resource part stripped) currently connected
    to ejabberd, or an empty set when the command fails or times out. This is
    the authoritative cross-platform presence source.
    """
    try:
        cmd = ["docker", "exec", "ejabberd", "ejabberdctl", "connected_users"]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
        if result.returncode != 0:
            return set()
        jids = set()
        for line in result.stdout.strip().split("\n"):
            line = line.strip()
            if line and "@" in line:
                # "user@domain/resource" -> "user@domain"
                jids.add(line.split("/")[0])
        return jids
    except Exception as e:
        log.debug(f"ejabberd SSH query failed: {e}")
        return set()


def _muc_participants():
    """Fallback: query xmpp_bot /muc for room participants.

    Currently unreliable due to MUC join timeout (R01).

    Returns a set containing every participant ``jid`` plus any ``nick`` that
    looks like a JID (contains "@"); empty set when the request fails.
    """
    try:
        req = urllib.request.Request(f"{XMPP_BRIDGE_URL}/muc")
        with urllib.request.urlopen(req, timeout=5) as resp:
            data = json.loads(resp.read())
    except Exception:
        return set()

    participants = set()
    for room_data in data.get("rooms", {}).values():
        for p in room_data.get("participants", []):
            jid = p.get("jid", "")
            if jid:
                participants.add(jid)
            nick = p.get("nick", "")
            if nick and "@" in nick:
                participants.add(nick)
    return participants


# ============================================================
# Local process detection
# ============================================================

def _get_local_processes():
    """Scan ``ps aux`` for xmpp_bot / wechat_agent / dashboard processes.

    Returns a list of ``{"pid": int, "cmdline": str}`` dicts. The command line
    is taken from the 11th whitespace-separated column onwards. On failure the
    error is logged and whatever was collected so far is returned.
    """
    processes = []
    try:
        result = subprocess.run(["ps", "aux"], capture_output=True, text=True, timeout=5)
        for line in result.stdout.split("\n"):
            if "xmpp_bot" in line or "wechat_agent" in line or "dashboard" in line:
                parts = line.split()
                if len(parts) >= 11:
                    processes.append({"pid": int(parts[1]), "cmdline": " ".join(parts[10:])})
    except Exception as e:
        log.error(f"Process scan failed: {e}")
    return processes


# ============================================================
# Log helpers
# ============================================================

def _tail_logs(max_lines=50):
    """Return up to ``max_lines`` recent lines gathered from LOGS_DIR/*.log.

    Each file contributes at most ``max_lines * 500`` trailing bytes; every
    non-blank line is prefixed with ``[<file name>]``. Files are visited
    newest-first (by mtime) and the final list is trimmed to its last
    ``max_lines`` entries.
    """
    all_lines = []
    log_files = sorted(LOGS_DIR.glob("*.log"), key=lambda p: p.stat().st_mtime, reverse=True)
    for lf in log_files:
        try:
            # Binary mode: seeking to an arbitrary offset is undefined for
            # text-mode files, so read raw bytes and decode afterwards.
            with open(lf, "rb") as f:
                f.seek(0, os.SEEK_END)
                size = f.tell()
                read_size = min(size, max_lines * 500)
                if read_size <= 0:
                    continue
                f.seek(size - read_size)
                text = f.read().decode("utf-8", errors="replace")
        except OSError as e:
            # Best effort: an unreadable log must not break the endpoint.
            log.debug(f"Could not read {lf}: {e}")
            continue
        for line in text.strip().splitlines():
            if line.strip():
                all_lines.append(f"[{lf.name}] {line}")
    return all_lines[-max_lines:] if len(all_lines) > max_lines else all_lines


def _count_recent_messages(minutes=5):
    """Count timestamped xmpp_bot.log lines newer than ``minutes`` ago.

    Lines are walked from the end of the file; counting stops at the first
    timestamp older than the cutoff. Lines without a parsable timestamp are
    ignored. Returns 0 when the log is missing or unreadable.
    """
    log_path = LOGS_DIR / "xmpp_bot.log"
    if not log_path.exists():
        return 0
    try:
        with open(log_path, "r", encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
    except OSError as e:
        log.debug(f"Could not read {log_path}: {e}")
        return 0

    cutoff = datetime.now() - timedelta(minutes=minutes)
    count = 0
    for line in reversed(lines):
        m = _LOG_TIMESTAMP_RE.search(line)
        if not m:
            continue
        try:
            stamp = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # Looks like a timestamp but is not a valid date; skip it.
            continue
        if stamp < cutoff:
            break
        count += 1
    return count


# ============================================================
# Process control helpers
# ============================================================

def _launch_agent_services(agent):
    """Spawn every known service script configured for ``agent``.

    Services whose type has no entry in SCRIPT_NAMES, or whose script file
    does not exist under SCRIPTS_DIR, are skipped.

    Returns a list of ``(script_name, error)`` tuples where ``error`` is None
    on success and the raised exception otherwise. Logging is left to the
    caller so each call site keeps its own messages.
    """
    outcomes = []
    for svc in agent.get("services", []):
        script_name = SCRIPT_NAMES.get(svc.get("type", ""))
        if not script_name:
            continue
        script_path = SCRIPTS_DIR / script_name
        if not script_path.exists():
            continue
        try:
            subprocess.Popen(
                [PYTHON, str(script_path)],
                cwd=str(SCRIPTS_DIR),
                creationflags=_NO_WINDOW_FLAG,
            )
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            outcomes.append((script_name, e))
        else:
            outcomes.append((script_name, None))
    return outcomes


def _kill_pid(pid):
    """Terminate process ``pid``; return True when the request succeeded.

    Uses ``taskkill /f`` on Windows and SIGTERM elsewhere. May raise OSError
    (e.g. no such process, permission denied, taskkill missing).
    """
    if os.name == "nt":
        done = subprocess.run(["taskkill", "/f", "/pid", str(pid)], capture_output=True)
        return done.returncode == 0
    os.kill(pid, signal.SIGTERM)
    return True


# ============================================================
# Routes
# ============================================================

@app.route("/")
def index():
    """Serve the dashboard single-page UI."""
    return send_from_directory(str(TEMPLATES_DIR), "dashboard.html")


@app.route("/api/agents")
def api_agents():
    """Return the status of every configured agent as a JSON list.

    Presence comes from ejabberd (falling back to MUC participants when the
    ejabberd query yields nothing). For local "windows" agents the xmpp_bot
    process and its /health endpoint are consulted as well. Also drives
    auto-recovery via the module-level ``_offline_counter``.
    """
    agents_config = load_agents_config()
    local_procs = _get_local_processes()
    message_count = _count_recent_messages(5)

    # Primary: ejabberdctl for cross-platform presence
    online_jids = _ejabberd_online_jids()
    if not online_jids:
        online_jids = _muc_participants()  # fallback

    health = None  # fetched lazily, at most once per request
    result = []
    for agent in agents_config:
        agent_id = agent["id"]
        jid = agent.get("jid", "")
        platform = agent.get("platform", "")
        host = agent.get("host", "")
        local_windows = _is_local_windows(platform, host)

        # --- Presence ---
        # None means "no presence data at all", as opposed to "known absent".
        xmpp_in_ejabberd = jid in online_jids if online_jids else None

        # --- Local process (local windows agents only) ---
        local_pid = None
        xmpp_connected = False
        if local_windows:
            for proc in local_procs:
                if "xmpp_bot.py" in proc.get("cmdline", ""):
                    local_pid = proc["pid"]
                    break
            if health is None:
                health = _xmpp_health()
            xmpp_connected = health.get("xmpp_connected", False)

        # --- Service status ---
        services = []
        for svc in agent.get("services", []):
            svc_type = svc.get("type", "")
            svc_port = svc.get("port")
            svc_pid = None

            if svc_type == "xmpp_bot":
                if platform == "windows" and local_pid:
                    svc_status = "running" if xmpp_connected else "degraded"
                    svc_pid = local_pid
                elif xmpp_in_ejabberd is True:
                    svc_status = "running"
                elif xmpp_in_ejabberd is False:
                    svc_status = "stopped"
                else:
                    svc_status = "unknown"
            elif svc_type in ("hermes_gateway", "omlx_server"):
                # No probe is implemented for these service types.
                svc_status = "unknown"
            else:
                svc_status = "stopped"
                script_name = SCRIPT_NAMES.get(svc_type, "")
                if script_name:
                    for proc in local_procs:
                        if script_name in proc.get("cmdline", ""):
                            svc_status = "running"
                            svc_pid = proc["pid"]
                            break

            services.append({
                "type": svc_type,
                "port": svc_port,
                "status": svc_status,
                "pid": svc_pid,
            })

        # --- Overall status ---
        if xmpp_in_ejabberd is True:
            status = "online"
        elif xmpp_in_ejabberd is False:
            status = "offline"
        elif local_windows:
            if xmpp_connected:
                status = "online"
            elif local_pid:
                status = "degraded"
            else:
                status = "offline"
        else:
            status = "unknown"

        # --- Auto-recovery ---
        if status == "offline" and platform == "windows":
            _offline_counter[agent_id] = _offline_counter.get(agent_id, 0) + 1
            if _offline_counter[agent_id] >= AUTO_RECOVER_THRESHOLD:
                log.warning(f"Auto-recovery: restarting {agent_id}")
                _try_auto_recover(agent)
                if local_windows:
                    # Start a fresh grace period; otherwise every subsequent
                    # poll would spawn the scripts again while the agent is
                    # still coming up.
                    _offline_counter[agent_id] = 0
        else:
            _offline_counter[agent_id] = 0

        result.append({
            "id": agent_id,
            "name": agent.get("name", ""),
            "display_name": agent.get("display_name", ""),
            "jid": jid,
            "platform": platform,
            "host": host,
            "status": status,
            "xmpp_connected": xmpp_connected,
            "pid": local_pid,
            "last_message": None,
            "message_count_5min": message_count,
            "errors": 0,
            "offline_checks": _offline_counter.get(agent_id, 0),
            "restartable": local_windows,
            "services": services,
        })

    return jsonify(result)


def _try_auto_recover(agent):
    """Relaunch the service scripts of a local "windows" agent.

    Does nothing for agents that are not local windows agents. Launch
    results are logged; nothing is returned.
    """
    agent_id = agent["id"]
    if not _is_local_windows(agent.get("platform", ""), agent.get("host", "")):
        return

    for script_name, error in _launch_agent_services(agent):
        if error is None:
            log.info(f"Auto-restarted {script_name} for {agent_id}")
        else:
            log.error(f"Auto-restart failed: {error}")


@app.route("/api/ejabberd")
def api_ejabberd():
    """Return ejabberd liveness, the online JIDs and the bot connection state."""
    health = _xmpp_health()
    online_jids = _ejabberd_online_jids()
    return jsonify({
        "alive": len(online_jids) > 0,
        "xmpp_bot_connected": health.get("xmpp_connected", False),
        "online_jids": sorted(online_jids),
        "bot_jid": health.get("bot_jid", ""),
    })


@app.route("/api/agents/<agent_id>/logs")
def api_agent_logs(agent_id):
    """Return recent log lines; ``?lines=N`` sets the limit (default 50).

    ``agent_id`` is currently not used for filtering: lines from all log
    files are returned.
    """
    lines = request.args.get("lines", 50, type=int)
    return jsonify({"lines": _tail_logs(lines)})


@app.route("/api/agents/<agent_id>/start", methods=["POST"])
def api_agent_start(agent_id):
    """Start the service scripts of agent ``agent_id``.

    Responds 404 for an unknown agent and 400 for non-"windows" agents.
    On success resets the agent's offline counter and lists the scripts
    that were launched.
    """
    agents_config = load_agents_config()
    agent = next((a for a in agents_config if a["id"] == agent_id), None)
    if not agent:
        return jsonify({"ok": False, "error": "Agent not found"}), 404
    if agent.get("platform") != "windows":
        return jsonify({"ok": False, "error": "Remote restart not supported yet"}), 400

    started = []
    for script_name, error in _launch_agent_services(agent):
        if error is None:
            started.append(script_name)
            log.info(f"Started {script_name} for {agent_id}")
        else:
            log.error(f"Failed to start {script_name}: {error}")

    _offline_counter[agent_id] = 0
    return jsonify({"ok": True, "started": started})


@app.route("/api/agents/<agent_id>/stop", methods=["POST"])
def api_agent_stop(agent_id):
    """Terminate the running service processes of agent ``agent_id``.

    Responds 404 for an unknown agent and 400 for non-"windows" agents.
    ``stopped`` lists only the processes whose termination request succeeded.
    """
    agents_config = load_agents_config()
    agent = next((a for a in agents_config if a["id"] == agent_id), None)
    if not agent:
        return jsonify({"ok": False, "error": "Agent not found"}), 404
    if agent.get("platform") != "windows":
        return jsonify({"ok": False, "error": "Remote stop not supported yet"}), 400

    processes = _get_local_processes()
    stopped = []
    for svc in agent.get("services", []):
        script_name = SCRIPT_NAMES.get(svc.get("type", ""))
        if not script_name:
            continue
        for proc in processes:
            if script_name not in proc.get("cmdline", ""):
                continue
            pid = proc["pid"]
            try:
                killed = _kill_pid(pid)
            except OSError as e:
                log.error(f"Failed to stop {script_name}: {e}")
                continue
            if killed:
                stopped.append({"script": script_name, "pid": pid})
            else:
                log.error(f"Failed to stop {script_name}: pid {pid} not terminated")

    return jsonify({"ok": True, "stopped": stopped})


@app.route("/api/agents/<agent_id>/restart", methods=["POST"])
def api_agent_restart(agent_id):
    """Stop, wait two seconds, then start agent ``agent_id``.

    The response is the one produced by the start step.
    """
    api_agent_stop(agent_id)
    time.sleep(2)
    return api_agent_start(agent_id)


PLATFORM_SERVICES = [
    {"id": "wechat_bridge", "name": "WeChat Bridge", "type": "ChannelBridge",
     "desc": "bridges WeChat to mohe's hermes gateway",
     "health_url": "http://192.168.1.16:5801/health"},
    {"id": "api_proxy", "name": "API Proxy", "type": "APIRouter",
     "desc": "proxies volcengine API with retry/fallback",
     "host": "192.168.1.16", "port": 8787},
]


@app.route("/api/platform")
def api_platform():
    """Return platform services status by querying health endpoints.

    A service with ``health_url`` is "running" when the URL can be opened
    within 3 s; otherwise a service with ``port`` is "running" when a TCP
    connection succeeds within 2 s. Anything else is "stopped".
    """
    result = []
    for ps in PLATFORM_SERVICES:
        status = "stopped"
        health_url = ps.get("health_url", "")
        if health_url:
            try:
                req = urllib.request.Request(health_url)
                # Context manager closes the response instead of leaking it.
                with urllib.request.urlopen(req, timeout=3):
                    status = "running"
            except Exception:
                status = "stopped"
        elif "port" in ps:
            try:
                # create_connection + with closes the socket on every path,
                # including a failed connect.
                with socket.create_connection((ps.get("host", "127.0.0.1"), ps["port"]), timeout=2):
                    status = "running"
            except OSError:
                status = "stopped"

        result.append({
            "id": ps["id"],
            "name": ps["name"],
            "type": ps["type"],
            "desc": ps["desc"],
            "status": status,
        })

    return jsonify(result)


@app.route("/api/health")
def api_health():
    """Return dashboard liveness plus the xmpp_bot / ejabberd flags from /health."""
    xmpp = _xmpp_health()
    return jsonify({
        "ok": True,
        "time": datetime.now().isoformat(),
        "xmpp_bot_alive": xmpp.get("xmpp_connected", False),
        "ejabberd_alive": xmpp.get("ejabberd_alive", False),
    })


# ============================================================
# Main
# ============================================================

def main():
    """Acquire the process guard and run the Flask server.

    Exits with status 1 when ``guard("dashboard")`` reports failure
    (presumably another instance already holds the guard - confirm in
    proc_guard). The port comes from DASHBOARD_PORT, defaulting to 5803.
    """
    lock = guard("dashboard")
    if not lock.ok:
        log.error(lock.message)
        print(f"[dashboard] {lock.message}")
        sys.exit(1)

    port = int(os.environ.get("DASHBOARD_PORT", 5803))
    log.info(f"Dashboard starting on :{port}")
    print(f"[dashboard] Starting on http://127.0.0.1:{port}")
    app.run(host="0.0.0.0", port=port, debug=False, use_reloader=False)


if __name__ == "__main__":
    main()