Files
hmo 1b2b935832 Initial: multi-agent XMPP communication system with dashboard
- Platform-based architecture (Windows/Linux/Mac)
- Agent instance registry (agents.yaml)
- Management dashboard with cross-platform monitoring
- xmpp_bot with HTTP bridge + health endpoints
- wechat_agent with WeChat-Hermes bridging
- Platform services: ProcessGuardian, HealthProbe, APIRouter, ChannelBridge
- Deployment: systemd (Linux) + PowerShell (Windows)
- Monitoring: SSH+ejabberdctl for cross-platform presence
2026-06-12 21:51:36 +08:00

524 lines
18 KiB
Python

# -*- coding: utf-8 -*-
"""
dashboard.py - AgentsMeeting management dashboard backend
=========================================================
Flask app on :5803. Monitors agents across platforms via:
- SSH + ejabberdctl connected_users (cross-platform, authoritative)
- xmpp_bot HTTP API :5802 (/health, /muc - fallback)
- Local process/port checks (Windows only)
Auto-recovery: restarts local Windows agents after 3 consecutive offline checks.
"""
import os, sys, re, json, time, subprocess, logging, urllib.request
from pathlib import Path
from datetime import datetime, timedelta
from flask import Flask, jsonify, request, send_from_directory
# ---- Paths ----
# All locations are hard-coded absolute paths inside one directory tree.
# PROJECT_ROOT and GATEWAY_ROOT currently hold the same path.
PROJECT_ROOT = Path("/home/hmo/agentsmeeting-venv")
GATEWAY_ROOT = Path("/home/hmo/agentsmeeting-venv")
CONFIG_DIR = Path("/home/hmo/agentsmeeting-venv/config")
LOGS_DIR = Path("/home/hmo/agentsmeeting-venv/logs")
TEMPLATES_DIR = Path("/home/hmo/agentsmeeting-venv/templates")
# The import path is extended *before* importing proc_guard, so the order of
# the next two lines matters (presumably proc_guard.py lives in
# GATEWAY_ROOT/scripts -- TODO confirm).
sys.path.insert(0, str(GATEWAY_ROOT / "scripts"))
from proc_guard import guard
# ---- Flask ----
# The Flask application object; route handlers below register against it.
app = Flask(__name__, template_folder=str(TEMPLATES_DIR))
# ---- Logging ----
LOG_FILE = LOGS_DIR / "dashboard.log"
# Create the log directory at import time, before basicConfig is pointed at
# a file inside it.
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
# Root logger is configured at INFO, so log.debug(...) calls are not written.
logging.basicConfig(
filename=str(LOG_FILE),
level=logging.INFO,
format="%(asctime)s [dashboard] %(message)s",
)
log = logging.getLogger("dashboard")
# ---- Constants ----
# Agent registry file; read by load_agents_config().
AGENTS_YAML = CONFIG_DIR / "agents.yaml"
# Maps a service "type" (as used in the agent config) to the script file
# name that is searched for in process command lines and launched on start.
SCRIPT_NAMES = {
"xmpp_bot": "xmpp_bot.py",
"wechat_bridge": "wechat_agent.py",
"api_proxy": "api_proxy.py",
"health_check": "health_check_xxm.py",
"mohe_watcher": "mohe_watcher.py",
"watchdog": "xmpp_watchdog.py",
}
# Interpreter used to launch the scripts above.
PYTHON = "/home/hmo/agentsmeeting-venv/bin/python3"
# Directory the script names are resolved against; also used as the working
# directory of launched processes.
SCRIPTS_DIR = Path("/home/hmo/agentsmeeting-venv")
# Base URL of the xmpp_bot HTTP bridge (queried at /health and /muc).
XMPP_BRIDGE_URL = "http://192.168.1.16:5802"
# NOTE(review): EJABBERD_HOST is not referenced in the visible part of this
# file -- confirm whether it is still needed.
EJABBERD_HOST = "192.168.1.246"
# Auto-recovery: restart after this many consecutive offline checks
AUTO_RECOVER_THRESHOLD = 3
# agent id -> number of consecutive offline checks; kept in process memory
# only, so it starts empty on every dashboard start.
_offline_counter: dict[str, int] = {}
# ============================================================
# Config
# ============================================================
def load_agents_config():
    """Return the list of agent definitions shown by the dashboard.

    When ``AGENTS_YAML`` exists, its top-level ``agents`` entry is returned.
    When the file does not exist, ``_default_agents()`` is returned instead.

    An empty YAML document (for which ``yaml.safe_load`` returns ``None``),
    a document whose top level is not a mapping, or a missing/null
    ``agents`` entry all yield an empty list instead of raising.
    """
    if not AGENTS_YAML.exists():
        return _default_agents()
    # Local import: PyYAML is only loaded on the path that reads the file.
    import yaml
    with open(AGENTS_YAML, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if not isinstance(data, dict):
        return []
    return data.get("agents") or []
def _default_agents():
return [
{
"id": "agent-001", "name": "R&D Assistant", "display_name": "xxm",
"jid": "xxm@yoin.fun", "platform": "windows", "host": "192.168.1.16",
"bot_type": "xmpp", "provider": "volcengine",
"services": [{"type": "xmpp_bot", "port": 5802}],
},
{
"id": "agent-002", "name": "Automation Manager", "display_name": "mohe",
"jid": "mohe@yoin.fun", "platform": "linux", "host": "192.168.1.246",
"bot_type": "hermes", "provider": "ocg-new",
"services": [{"type": "hermes_gateway", "port": 8642}, {"type": "xmpp_bot"}],
},
{
"id": "agent-003", "name": "Local Inference", "display_name": "xiaoguo",
"jid": "xiaoguo@yoin.fun", "platform": "mac", "host": "192.168.1.122",
"bot_type": "xmpp", "provider": "ocg-old",
"services": [{"type": "xmpp_bot"}, {"type": "omlx_server", "port": 18003}],
},
{
"id": "agent-004", "name": "Position Analyst", "display_name": "zhiwei",
"jid": "zhiwei@yoin.fun", "platform": "linux", "host": "192.168.1.246",
"bot_type": "hermes", "provider": "ocg-old",
"services": [{"type": "hermes_gateway", "port": 8643}, {"type": "xmpp_bot"}],
},
]
# ============================================================
# Cross-platform monitoring (XMPP + SSH)
# ============================================================
def _xmpp_health():
"""Query xmpp_bot /health for XMPP connection and ejabberd status."""
try:
req = urllib.request.Request(f"{XMPP_BRIDGE_URL}/health")
with urllib.request.urlopen(req, timeout=5) as resp:
return json.loads(resp.read())
except Exception as e:
return {"ok": False, "error": str(e), "xmpp_connected": False, "ejabberd_alive": False}
def _ejabberd_online_jids():
    """Return the set of bare JIDs listed by ``ejabberdctl connected_users``.

    The command is run through ``docker exec`` against a container named
    ``ejabberd`` with a 10 s timeout. Every non-empty output line containing
    ``@`` contributes the text before its first ``/`` (the resource part is
    dropped).

    Returns an empty set when the command exits non-zero or when anything
    raises; in the latter case the error is logged at debug level.

    NOTE(review): the debug message still mentions SSH although the command
    shown here is a local ``docker exec``.
    """
    command = ["docker", "exec", "ejabberd", "ejabberdctl", "connected_users"]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, timeout=10)
        if completed.returncode != 0:
            return set()
        entries = (raw.strip() for raw in completed.stdout.strip().split("\n"))
        return {entry.split("/")[0] for entry in entries if entry and "@" in entry}
    except Exception as e:
        log.debug(f"ejabberd SSH query failed: {e}")
        return set()
def _muc_participants():
"""Fallback: query xmpp_bot /muc for room participants.
Currently unreliable due to MUC join timeout (R01)."""
try:
req = urllib.request.Request(f"{XMPP_BRIDGE_URL}/muc")
with urllib.request.urlopen(req, timeout=5) as resp:
data = json.loads(resp.read())
except Exception:
return set()
participants = set()
for room_data in data.get("rooms", {}).values():
for p in room_data.get("participants", []):
jid = p.get("jid", "")
if jid:
participants.add(jid)
nick = p.get("nick", "")
if nick and "@" in nick:
participants.add(nick)
return participants
# ============================================================
# Local process detection (Windows only)
# ============================================================
def _get_local_processes():
    """Scan ``ps aux`` for processes related to this system.

    A line is kept when it contains ``xmpp_bot``, ``wechat_agent`` or
    ``dashboard`` and splits into at least 11 whitespace-separated fields.
    Each kept line yields ``{"pid": <int of field 2>, "cmdline": <fields 11+
    joined by single spaces>}``.

    If the scan raises at any point, the error is logged and whatever was
    collected up to then is returned.
    """
    markers = ("xmpp_bot", "wechat_agent", "dashboard")
    found = []
    try:
        listing = subprocess.run(["ps", "aux"], capture_output=True, text=True, timeout=5)
        for row in listing.stdout.split("\n"):
            if not any(marker in row for marker in markers):
                continue
            fields = row.split()
            if len(fields) < 11:
                continue
            found.append({"pid": int(fields[1]), "cmdline": " ".join(fields[10:])})
    except Exception as e:
        log.error(f"Process scan failed: {e}")
    return found
# ============================================================
# Log helpers
# ============================================================
def _tail_logs(max_lines=50):
    """Return up to ``max_lines`` recent log lines from ``LOGS_DIR/*.log``.

    Each returned line is prefixed with ``[<file name>] ``. At most
    ``max_lines * 500`` bytes are read from the end of every file.

    Files are visited from least to most recently modified, so that the
    final "keep the last ``max_lines``" cut favours the files written to most
    recently. (Visiting newest-first and then keeping the tail returned lines
    from the *oldest* files.)

    Files are read in binary mode because seeking to an arbitrary byte
    offset is only well defined there; when the read starts mid-file the
    first, necessarily partial, line is discarded. Files that cannot be read
    are skipped. A non-positive ``max_lines`` yields an empty list.
    """
    if max_lines <= 0:
        return []

    def _mtime(path):
        # A file may be rotated away between glob() and stat(); sort it first.
        try:
            return path.stat().st_mtime
        except OSError:
            return 0.0

    byte_budget = max_lines * 500
    collected = []
    for lf in sorted(LOGS_DIR.glob("*.log"), key=_mtime):
        try:
            with open(lf, "rb") as f:
                size = f.seek(0, os.SEEK_END)
                start = max(0, size - byte_budget)
                f.seek(start)
                chunk = f.read()
        except OSError:
            # Best effort: an unreadable log must not break the endpoint.
            continue
        pieces = chunk.decode("utf-8", errors="replace").split("\n")
        if start > 0:
            # The read began inside a line; drop that truncated fragment.
            pieces = pieces[1:]
        for piece in pieces:
            piece = piece.rstrip("\r")
            if piece.strip():
                collected.append(f"[{lf.name}] {piece}")
    return collected[-max_lines:]
def _count_recent_messages(minutes=5):
    """Count recent timestamped lines in ``LOGS_DIR/xmpp_bot.log``.

    The file is read fully and scanned from the last line backwards. A line
    counts when its first ``YYYY-MM-DD HH:MM:SS`` timestamp is not older than
    ``minutes`` minutes before now; the scan stops at the first timestamp
    that is older. Lines without a timestamp, or with one that does not
    parse, are skipped.

    Returns 0 when the file is missing or when reading/scanning raises.
    """
    log_path = LOGS_DIR / "xmpp_bot.log"
    if not log_path.exists():
        return 0
    try:
        with open(log_path, "r", encoding="utf-8", errors="replace") as f:
            entries = f.readlines()
        cutoff = datetime.now() - timedelta(minutes=minutes)
        stamp_re = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")
        recent = 0
        for entry in reversed(entries):
            match = stamp_re.search(entry)
            if match is None:
                continue
            try:
                stamp = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S")
            except ValueError:
                continue
            if stamp < cutoff:
                break
            recent += 1
        return recent
    except Exception:
        return 0
# ============================================================
# Routes
# ============================================================
@app.route("/")
def index():
    """Serve ``dashboard.html`` from ``TEMPLATES_DIR`` as the landing page."""
    template_dir = str(TEMPLATES_DIR)
    return send_from_directory(template_dir, "dashboard.html")
# Hosts on which a "windows" agent is treated as locally managed.
_LOCAL_WINDOWS_HOSTS = ("192.168.1.16", "127.0.0.1", "localhost")


def _is_local_windows(platform, host):
    """Return True for a Windows agent on one of the locally managed hosts."""
    return platform == "windows" and host in _LOCAL_WINDOWS_HOSTS


def _find_pid(local_procs, script_name):
    """Return the pid of the first scanned process running ``script_name``, else None."""
    for proc in local_procs:
        if script_name in proc.get("cmdline", ""):
            return proc["pid"]
    return None


def _service_state(svc_type, platform, local_pid, xmpp_connected,
                   xmpp_in_ejabberd, local_procs):
    """Return ``(status, pid)`` for one configured service of an agent."""
    if svc_type == "xmpp_bot":
        if platform == "windows" and local_pid:
            # Process exists locally; the bridge decides running vs degraded.
            return ("running" if xmpp_connected else "degraded"), local_pid
        if xmpp_in_ejabberd is True:
            return "running", None
        if xmpp_in_ejabberd is False:
            return "stopped", None
        return "unknown", None
    if svc_type in ("hermes_gateway", "omlx_server"):
        # No probe is implemented for these service types.
        return "unknown", None
    script_name = SCRIPT_NAMES.get(svc_type, "")
    if script_name:
        pid = _find_pid(local_procs, script_name)
        if pid is not None:
            return "running", pid
    return "stopped", None


def _overall_status(xmpp_in_ejabberd, local_windows, xmpp_connected, local_pid):
    """Derive the agent status; presence data wins over local checks."""
    if xmpp_in_ejabberd is True:
        return "online"
    if xmpp_in_ejabberd is False:
        return "offline"
    if local_windows:
        if xmpp_connected:
            return "online"
        if local_pid:
            return "degraded"
        return "offline"
    return "unknown"


@app.route("/api/agents")
def api_agents():
    """Return the status of every configured agent as a JSON list.

    Presence comes from ``_ejabberd_online_jids()``, falling back to
    ``_muc_participants()`` when that is empty. If both are empty, presence
    is unknown (``None``) and only locally managed Windows agents get a
    status, from the process scan and the bridge ``/health`` answer.

    Side effect (auto-recovery): for Windows agents the number of consecutive
    "offline" results is tracked in ``_offline_counter``. When it reaches
    ``AUTO_RECOVER_THRESHOLD`` a restart is attempted and the counter is
    reset, so the agent gets a full threshold's worth of checks to come back
    before the next attempt. Without the reset every later poll would launch
    yet another process. The counter is also reset by any non-offline result.
    """
    agents_config = load_agents_config()
    local_procs = _get_local_processes()
    message_count = _count_recent_messages(5)
    # Primary presence source, then the MUC fallback.
    online_jids = _ejabberd_online_jids()
    if not online_jids:
        online_jids = _muc_participants()

    result = []
    for agent in agents_config:
        agent_id = agent["id"]
        jid = agent.get("jid", "")
        platform = agent.get("platform", "")
        host = agent.get("host", "")
        local_windows = _is_local_windows(platform, host)

        # --- Presence: None means "no presence data at all" ---
        xmpp_in_ejabberd = (jid in online_jids) if online_jids else None

        # --- Local process and bridge health (local Windows agents only) ---
        local_pid = None
        xmpp_connected = False
        if local_windows:
            local_pid = _find_pid(local_procs, "xmpp_bot.py")
            xmpp_connected = _xmpp_health().get("xmpp_connected", False)

        # --- Service status ---
        services = []
        for svc in agent.get("services", []):
            svc_type = svc.get("type", "")
            svc_status, svc_pid = _service_state(
                svc_type, platform, local_pid, xmpp_connected,
                xmpp_in_ejabberd, local_procs,
            )
            services.append({
                "type": svc_type,
                "port": svc.get("port"),
                "status": svc_status,
                "pid": svc_pid,
            })

        # --- Overall status ---
        status = _overall_status(xmpp_in_ejabberd, local_windows, xmpp_connected, local_pid)

        # --- Auto-recovery ---
        if status == "offline" and platform == "windows":
            offline_checks = _offline_counter.get(agent_id, 0) + 1
            if offline_checks >= AUTO_RECOVER_THRESHOLD:
                log.warning(f"Auto-recovery: restarting {agent_id}")
                _try_auto_recover(agent)
                # Start counting afresh so the restart has time to take effect.
                offline_checks = 0
            _offline_counter[agent_id] = offline_checks
        else:
            _offline_counter[agent_id] = 0

        result.append({
            "id": agent_id,
            "name": agent.get("name", ""),
            "display_name": agent.get("display_name", ""),
            "jid": jid,
            "platform": platform,
            "host": host,
            "status": status,
            "xmpp_connected": xmpp_connected,
            "pid": local_pid,
            "last_message": None,
            "message_count_5min": message_count,
            "errors": 0,
            "offline_checks": _offline_counter.get(agent_id, 0),
            "restartable": local_windows,
            "services": services,
        })
    return jsonify(result)
def _try_auto_recover(agent):
agent_id = agent["id"]
platform = agent.get("platform", "")
host = agent.get("host", "")
if platform != "windows" or host not in ("192.168.1.16", "127.0.0.1", "localhost"):
return
for svc in agent.get("services", []):
script_name = SCRIPT_NAMES.get(svc.get("type", ""))
if not script_name:
continue
script_path = SCRIPTS_DIR / script_name
if not script_path.exists():
continue
try:
subprocess.Popen(
[PYTHON, str(script_path)],
cwd=str(SCRIPTS_DIR),
creationflags=subprocess.CREATE_NO_WINDOW,
)
log.info(f"Auto-restarted {script_name} for {agent_id}")
except Exception as e:
log.error(f"Auto-restart failed: {e}")
@app.route("/api/ejabberd")
def api_ejabberd():
    """Report ejabberd presence data together with the bridge's own state.

    ``alive`` is True only when at least one connected JID was found, so a
    failed query and a server with no connected users look the same here.
    """
    health = _xmpp_health()
    online_jids = _ejabberd_online_jids()
    payload = {
        "alive": bool(online_jids),
        "xmpp_bot_connected": health.get("xmpp_connected", False),
        "online_jids": sorted(online_jids),
        "bot_jid": health.get("bot_jid", ""),
    }
    return jsonify(payload)
@app.route("/api/agents/<agent_id>/logs")
def api_agent_logs(agent_id):
    """Return recent log lines as ``{"lines": [...]}``.

    The ``lines`` query parameter (integer, default 50) sets how many lines
    are requested. ``agent_id`` is taken from the URL but not used: the same
    merged tail is returned for every agent.
    """
    requested = request.args.get("lines", default=50, type=int)
    tail = _tail_logs(requested)
    return jsonify({"lines": tail})
@app.route("/api/agents/<agent_id>/start", methods=["POST"])
def api_agent_start(agent_id):
    """Start the service scripts of one agent.

    Responds 404 when the agent id is unknown and 400 when the agent's
    platform is not ``windows``. Otherwise every configured service whose
    type maps to an existing script under ``SCRIPTS_DIR`` is launched with
    ``PYTHON``; launch failures are logged and skipped. The agent's offline
    counter is reset and the names of the launched scripts are returned in
    ``started``.

    ``subprocess.CREATE_NO_WINDOW`` only exists on Windows; it is looked up
    defensively (fallback ``0``, i.e. no flags) so that launching also works
    when the dashboard runs on another operating system instead of failing
    with a swallowed ``AttributeError``.
    """
    agents_config = load_agents_config()
    agent = next((a for a in agents_config if a["id"] == agent_id), None)
    if not agent:
        return jsonify({"ok": False, "error": "Agent not found"}), 404
    if agent.get("platform") != "windows":
        return jsonify({"ok": False, "error": "Remote restart not supported yet"}), 400
    creationflags = getattr(subprocess, "CREATE_NO_WINDOW", 0)
    started = []
    for svc in agent.get("services", []):
        script_name = SCRIPT_NAMES.get(svc.get("type", ""))
        if not script_name:
            continue
        script_path = SCRIPTS_DIR / script_name
        if not script_path.exists():
            continue
        try:
            subprocess.Popen(
                [PYTHON, str(script_path)],
                cwd=str(SCRIPTS_DIR),
                creationflags=creationflags,
            )
            started.append(script_name)
            log.info(f"Started {script_name} for {agent_id}")
        except Exception as e:
            log.error(f"Failed to start {script_name}: {e}")
    _offline_counter[agent_id] = 0
    return jsonify({"ok": True, "started": started})
@app.route("/api/agents/<agent_id>/stop", methods=["POST"])
def api_agent_stop(agent_id):
    """Stop the running service scripts of one agent.

    Responds 404 when the agent id is unknown and 400 when the agent's
    platform is not ``windows``. Otherwise every scanned local process whose
    command line contains one of the agent's service scripts is terminated
    and listed in ``stopped``.

    Termination is OS-aware: ``taskkill /f`` on Windows, ``SIGTERM`` via
    ``os.kill`` elsewhere (``taskkill`` does not exist there, so stopping
    could never succeed). A process is only reported as stopped when the
    kill request succeeded; a non-zero ``taskkill`` exit code is logged as a
    failure instead of being ignored.
    """
    import signal  # local import: only needed by this handler

    agents_config = load_agents_config()
    agent = next((a for a in agents_config if a["id"] == agent_id), None)
    if not agent:
        return jsonify({"ok": False, "error": "Agent not found"}), 404
    if agent.get("platform") != "windows":
        return jsonify({"ok": False, "error": "Remote stop not supported yet"}), 400
    processes = _get_local_processes()
    stopped = []
    for svc in agent.get("services", []):
        script_name = SCRIPT_NAMES.get(svc.get("type", ""))
        if not script_name:
            continue
        for proc in processes:
            if script_name not in proc.get("cmdline", ""):
                continue
            pid = proc["pid"]
            try:
                if os.name == "nt":
                    outcome = subprocess.run(
                        ["taskkill", "/f", "/pid", str(pid)], capture_output=True
                    )
                    if outcome.returncode != 0:
                        log.error(
                            f"Failed to stop {script_name}: taskkill exited "
                            f"with code {outcome.returncode}"
                        )
                        continue
                else:
                    os.kill(pid, signal.SIGTERM)
                stopped.append({"script": script_name, "pid": pid})
            except Exception as e:
                log.error(f"Failed to stop {script_name}: {e}")
    return jsonify({"ok": True, "stopped": stopped})
@app.route("/api/agents/<agent_id>/restart", methods=["POST"])
def api_agent_restart(agent_id):
    """Stop the agent, wait two seconds, then start it again.

    The response of the stop step is discarded; the caller receives whatever
    the start step returns.
    """
    api_agent_stop(agent_id)
    pause_seconds = 2
    time.sleep(pause_seconds)
    start_response = api_agent_start(agent_id)
    return start_response
# Platform-level services reported by /api/platform. An entry is probed
# through its "health_url" when it has one, otherwise through a TCP connect
# to "host":"port".
PLATFORM_SERVICES = [
{"id": "wechat_bridge", "name": "WeChat Bridge", "type": "ChannelBridge",
"desc": "bridges WeChat to mohe's hermes gateway",
"health_url": "http://192.168.1.16:5801/health"},
{"id": "api_proxy", "name": "API Proxy", "type": "APIRouter",
"desc": "proxies volcengine API with retry/fallback",
"host": "192.168.1.16", "port": 8787},
]
@app.route("/api/platform")
def api_platform():
    """Return the status of every entry in ``PLATFORM_SERVICES``.

    A service with a ``health_url`` is "running" when an HTTP request to that
    URL succeeds within 3 s. A service with only a ``port`` is "running" when
    a TCP connection to ``host`` (default ``127.0.0.1``) succeeds within 2 s.
    Anything else, including any probe error, is reported as "stopped".

    Both probes run inside ``with`` blocks so the HTTP response and the
    socket are always closed, including when the connection attempt fails.
    """
    import socket
    import urllib.request as _ur

    result = []
    for ps in PLATFORM_SERVICES:
        status = "stopped"
        health_url = ps.get("health_url", "")
        if health_url:
            try:
                with _ur.urlopen(_ur.Request(health_url), timeout=3):
                    status = "running"
            except Exception:
                status = "stopped"
        elif "port" in ps:
            try:
                with socket.socket() as s:
                    s.settimeout(2)
                    s.connect((ps.get("host", "127.0.0.1"), ps["port"]))
                    status = "running"
            except Exception:
                status = "stopped"
        result.append({
            "id": ps["id"],
            "name": ps["name"],
            "type": ps["type"],
            "desc": ps["desc"],
            "status": status,
        })
    return jsonify(result)
@app.route("/api/health")
def api_health():
    """Return a liveness summary of the dashboard and its XMPP backend.

    ``ok`` is always True (the dashboard itself answered); the two
    ``*_alive`` flags are taken from the bridge ``/health`` answer and are
    False when the bridge cannot be reached.

    This view is registered for ``/api/health`` only. It previously also
    carried a second decorator for ``/api/platform``, a URL that belongs to
    the platform-services view.
    """
    xmpp = _xmpp_health()
    return jsonify({
        "ok": True,
        "time": datetime.now().isoformat(),
        "xmpp_bot_alive": xmpp.get("xmpp_connected", False),
        "ejabberd_alive": xmpp.get("ejabberd_alive", False),
    })
# ============================================================
# Main
# ============================================================
def main():
    """Acquire the ``dashboard`` guard and run the Flask server.

    If ``guard("dashboard")`` reports failure, its message is logged and
    printed and the process exits with status 1 (presumably a
    single-instance lock -- confirm against proc_guard). The port comes from
    the ``DASHBOARD_PORT`` environment variable, defaulting to 5803; the
    server binds to all interfaces.
    """
    instance_lock = guard("dashboard")
    if not instance_lock.ok:
        reason = instance_lock.message
        log.error(reason)
        print(f"[dashboard] {reason}")
        sys.exit(1)

    port = int(os.environ.get("DASHBOARD_PORT", 5803))
    log.info(f"Dashboard starting on :{port}")
    print(f"[dashboard] Starting on http://127.0.0.1:{port}")
    app.run(host="0.0.0.0", port=port, debug=False, use_reloader=False)


if __name__ == "__main__":
    main()