Files
AgentsMeeting/gateway/scripts/chat_bridge.py
T
hmo 1b2b935832 Initial: multi-agent XMPP communication system with dashboard
- Platform-based architecture (Windows/Linux/Mac)
- Agent instance registry (agents.yaml)
- Management dashboard with cross-platform monitoring
- xmpp_bot with HTTP bridge + health endpoints
- wechat_agent with WeChat-Hermes bridging
- Platform services: ProcessGuardian, HealthProbe, APIRouter, ChannelBridge
- Deployment: systemd (Linux) + PowerShell (Windows)
- Monitoring: SSH+ejabberdctl for cross-platform presence
2026-06-12 21:51:36 +08:00

619 lines
25 KiB
Python

"""
Chat Bridge — direct HTTP API calls with model fallback + session persistence.
Messages are dual-written:
- `.bridge_context.jsonl` for immediate context injection
- `opencode.db` (serve session) so session_search works for old messages
Context window: last 200 messages from session (hard limit, no compression).
Beyond 200: use session_search (##list_sessions## / ##switch_session##).
"""
import os, json, time, logging, sqlite3
from datetime import datetime, timezone, timedelta
from session_router import extract_session_context
os.environ["no_proxy"] = "*"
os.environ["NO_PROXY"] = "*"
import requests
_TZ = timezone(timedelta(hours=8))
# ── Logging ──
_LOG_FILE = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"logs", "bridge.log")
os.makedirs(os.path.dirname(_LOG_FILE), exist_ok=True)
_logger = logging.getLogger("chat_bridge")
_handler = logging.FileHandler(_LOG_FILE, encoding="utf-8")
_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
_logger.addHandler(_handler)
_logger.setLevel(logging.INFO)
# ── Provider configs from opencode config.json ──
_CONFIG_PATH = os.path.join(os.environ.get("USERPROFILE", "C:\\Users\\hmo"),
".config", "opencode", "config.json")
def _load_providers() -> dict:
try:
with open(_CONFIG_PATH, "r", encoding="utf-8") as f:
return json.load(f).get("provider", {})
except Exception as e:
_logger.error("Failed to load provider config: %s", e)
return {}
_PROVIDERS = _load_providers()
# ── Build provider chain dynamically from config ──
# Don't hardcode which providers have quota — try everything configured.
# Each provider's model name comes from its config (options.model),
# falling back to a sensible default based on provider key.
_DEFAULT_MODELS = {
"volcengine": "deepseek-v4-flash",
"opencode-go": "deepseek-v4-flash",
"opencode-go-new": "deepseek-v4-flash",
"deepseek": "deepseek-v4-flash",
"sense-nova": "nova-4",
}
def _build_chain() -> list[tuple[str, str, str]]:
"""Build (provider_key, base_url, model_name) in priority order.
优先用 volcengine(额度/免费)→ opencode-go-new(订阅)→ opencode-go(备用)。
deepseek(直连)作为最后兜底,额度不够时启用。
"""
allowed = ["volcengine", "opencode-go", "opencode-go-new"]
chain = []
for key in allowed:
prov = _PROVIDERS.get(key)
if not prov:
continue
opts = prov.get("options", {})
base = opts.get("baseURL", "")
api_key = opts.get("apiKey", "")
if not base or not api_key:
continue
model = opts.get("model") or _DEFAULT_MODELS.get(key, "deepseek-v4-flash")
chain.append((key, base, model))
return chain
DEFAULT_TIMEOUT = 60 # per model, in seconds
LOCK_DURATION = 300 # reuse good provider for 5 min
FAILED_BACKOFF = 1800 # skip failed provider for 30 min
_last_good_provider: str | None = None
_last_good_time: float = 0.0
_failed_providers: dict[str, float] = {}
_CACHE_FILE = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"temp", ".model_cache.json")
def _load_cache():
global _last_good_provider, _last_good_time
try:
with open(_CACHE_FILE, "r") as f:
d = json.load(f)
_last_good_provider = d.get("provider")
_last_good_time = d.get("time", 0.0)
except (FileNotFoundError, json.JSONDecodeError, ValueError):
pass
def _save_cache():
d = {"provider": _last_good_provider, "time": _last_good_time}
try:
os.makedirs(os.path.dirname(_CACHE_FILE), exist_ok=True)
with open(_CACHE_FILE, "w") as f:
json.dump(d, f)
except Exception:
pass
def _cache_model(provider_key: str):
global _last_good_provider, _last_good_time
_last_good_provider = provider_key
_last_good_time = time.time()
_save_cache()
def _get_providers_to_try() -> list[tuple[str, str, str]]:
"""
Returns [(provider_key, base_url, model_name), ...] to try.
"""
global _last_good_provider, _last_good_time, _failed_providers
now = time.time()
_failed_providers = {p: t for p, t in _failed_providers.items() if now < t}
chain = _build_chain()
# Lock active — reuse last good provider
if _last_good_provider and (now - _last_good_time) < LOCK_DURATION:
for key, base, model in chain:
if key == _last_good_provider:
return [(key, base, model)]
# Build available list
available = []
for key, base, model in chain:
if key in _failed_providers:
continue
available.append((key, base, model))
if not available and _last_good_provider:
for key, base, model in chain:
if key == _last_good_provider:
available.append((key, base, model))
break
return available
_load_cache()
# ── Tool definitions (function calling) ──
_TOOLS = [
{
"type": "function",
"function": {
"name": "run_command",
"description": "执行一条 shell 命令。可以 SSH 到远程服务器(如 root@47.115.32.206)。",
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "要执行的 shell 命令"
}
},
"required": ["command"]
}
}
},
{
"type": "function",
"function": {
"name": "session_search",
"description": "搜索其他 session 的历史对话内容。默认查当前 session(xxm 自己的 session),也可查指定 session(如 TUI 工作台 ses_1d95d15c4ffehQaZ6hrbIbak5k)。返回最近 N 条消息(带时间戳和来源标记),按时间正序排列。",
"parameters": {
"type": "object",
"properties": {
"session_id": {
"type": "string",
"description": "要查询的 session ID。不传或传空字符串则查 TUI session"
},
"limit": {
"type": "integer",
"description": "返回最近多少条消息,默认 20,最大 100"
}
}
}
}
}
]
_MAX_TOOL_LOOPS = 30 # 超限后走 clean final force,不再泄漏 XML
def _run_tool_command(cmd: str) -> str:
"""Execute a shell command and return output."""
import subprocess as _sp
try:
r = _sp.run(cmd, shell=True, capture_output=True, timeout=60,
text=True, encoding='utf-8', errors='replace')
out = (r.stdout or "") + (r.stderr or "")
return out.strip() or f"(exit code {r.returncode}, no output)"
except _sp.TimeoutExpired:
return "(命令超时)"
except Exception as e:
return f"(执行失败: {e})"
# ── Serve session DB path ──
_SERVE_DB = os.path.join(
os.environ.get("USERPROFILE", "C:\\Users\\hmo"),
".local", "share", "opencode", "opencode.db")
class SessionBridge:
"""
Send message to LLM via direct HTTP API call.
Injects recent conversation context for continuity.
Context comes from the serve session (opencode.db, last 200 msgs),
with .bridge_context.jsonl as fallback.
Messages are written back to the session so session_search works.
"""
def __init__(self, session_id: str = "", serve_url: str = "",
temp_dir: str = "", timeout: int = DEFAULT_TIMEOUT):
self.session_id = session_id
self.timeout = timeout
self.temp_dir = temp_dir or os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "temp")
os.makedirs(self.temp_dir, exist_ok=True)
# ── Conversation log (fallback / debug) ──
self._ctx_log = os.path.join(self.temp_dir, ".bridge_context.jsonl")
self._ctx_max = 200
# ── Context management ─────────────────────────────────
def _read_recent_context(self) -> str:
"""Read last N exchanges, newest → oldest (top = most recent).
Priority:
1. Session (opencode.db) via extract_session_context() — has timestamps
2. Fallback: .bridge_context.jsonl — also with timestamps
Each line is prefixed with [MM-DD HH:MM] so LLM can judge recency.
"""
# Priority 1: session
if self.session_id:
try:
from session_router import extract_session_context
ctx = extract_session_context(self.session_id, limit=self._ctx_max)
if ctx:
return ctx
except Exception:
pass
# Priority 2: .bridge_context.jsonl fallback
try:
if not os.path.exists(self._ctx_log):
return ""
with open(self._ctx_log, "r", encoding="utf-8") as f:
raw = f.readlines()
recent = raw[-self._ctx_max:]
parts = []
for line in recent:
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
ts = entry.get("ts", 0)
role = entry.get("role", "?")
content = str(entry.get("content", ""))
source = entry.get("source", "bridge")
role_label = "用户" if role == "user" else "小小莫"
src_tag = {"xmpp": "[群聊] ", "vc": "[VC] ", "tui": "[TUI] ", "bridge": "[桥接] "}.get(source, f"[{source}] ")
ts_str = ""
if ts:
ts_str = datetime.fromtimestamp(ts, tz=_TZ).strftime("%m-%d %H:%M")
line_str = f"{ts_str} {src_tag}{role_label}: {content}" if ts_str else f"{src_tag}{role_label}: {content}"
parts.append(line_str)
except (json.JSONDecodeError, ValueError):
continue
return "\n".join(parts)
except Exception:
return ""
def _append_to_log(self, role: str, content: str, source: str = "bridge"):
try:
entry = json.dumps(
{"ts": int(time.time()), "role": role, "content": content, "source": source},
ensure_ascii=False)
with open(self._ctx_log, "a", encoding="utf-8") as f:
f.write(entry + "\n")
self._trim_log()
except Exception:
pass
def _trim_log(self):
try:
with open(self._ctx_log, "r", encoding="utf-8") as f:
lines = f.readlines()
if len(lines) > self._ctx_max * 2:
with open(self._ctx_log, "w", encoding="utf-8") as f:
f.writelines(lines[-self._ctx_max:])
except Exception:
pass
def _append_to_session(self, role: str, content: str, source: str = "bridge",
model_info: dict | None = None):
"""Write a message to the serve session (opencode.db).
Fields match the opencode session message schema (v1.17+):
- mode: 'user' (user) / 'Sisyphus - Ultraworker' (assistant)
- tokens / cost: always present so UI doesn't crash on null
- model: only for assistant messages
- finish: 'stop' for assistant messages
source distinguishes bridge-injected (xmpp/vc) vs native TUI messages.
"""
import uuid as _uuid
if not self.session_id:
return
try:
now_ms = int(time.time() * 1000)
msg_id = "msg_" + _uuid.uuid4().hex[:24]
part_id = "prt_" + _uuid.uuid4().hex[:24]
default_tokens = {"input": 0, "output": 0}
default_cost = {"input": 0, "output": 0}
if role == "user":
data = {
"role": "user", "source": source, "mode": "user",
"tokens": default_tokens, "cost": default_cost,
}
else:
data = {
"role": "assistant", "source": source, "mode": "Sisyphus - Ultraworker",
"tokens": default_tokens, "cost": default_cost,
"finish": "stop",
}
if model_info:
data["model"] = model_info
msg_data = json.dumps(data, ensure_ascii=False)
part_data = json.dumps({"type": "text", "text": content}, ensure_ascii=False)
conn = sqlite3.connect(_SERVE_DB)
conn.execute(
"INSERT INTO message (id, session_id, data, time_created, time_updated) VALUES (?, ?, ?, ?, ?)",
(msg_id, self.session_id, msg_data, now_ms, now_ms),
)
conn.execute(
"INSERT INTO part (id, message_id, session_id, data, time_created, time_updated) VALUES (?, ?, ?, ?, ?, ?)",
(part_id, msg_id, self.session_id, part_data, now_ms, now_ms),
)
conn.commit()
conn.close()
_logger.debug("append_to_session: %s → session %s (%d chars)",
role, self.session_id[:20], len(content))
except Exception as e:
_logger.warning("append_to_session failed: %s", e)
# ── Direct API call ──────────────────────────────────
def _call_api(self, provider_key: str, base_url: str, model: str,
messages: list, timeout: int) -> str | None:
"""
Send messages to LLM via function calling API (using `requests`).
Handles tool_calls loop internally.
Returns final text response after all tool calls resolved.
Timeout: connect=10s, read=timeout (ensures no infinite hang)
"""
prov = _PROVIDERS.get(provider_key)
if not prov:
_logger.error("Provider %s not found in config", provider_key)
return None
api_key = prov.get("options", {}).get("apiKey", "")
session = requests.Session()
session.headers.update({
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
})
# Bypass system proxy (v2rayN :15000) — proxy causes permanent hang with volcengine API
session.trust_env = False
session.proxies = {"http": None, "https": None}
for loop in range(_MAX_TOOL_LOOPS):
url = f"{base_url.rstrip('/')}/chat/completions"
payload = {
"model": model,
"messages": messages,
"tools": _TOOLS,
}
t0 = time.time()
try:
resp = session.post(url, json=payload, timeout=(10, timeout))
resp.raise_for_status()
body = resp.json()
except requests.exceptions.Timeout:
_logger.warning("API %s/%s timeout (%ds) at loop %d",
provider_key, model, timeout, loop)
return None
except requests.exceptions.HTTPError as e:
err_body = ""
try:
err_body = e.response.text[:300]
except Exception:
pass
code = e.response.status_code if e.response is not None else 0
_logger.warning("API %s/%s HTTP %d: %s",
provider_key, model, code, err_body)
return None
except requests.exceptions.RequestException as e:
_logger.warning("API %s/%s request failed: %s",
provider_key, model, e)
return None
msg = body.get("choices", [{}])[0].get("message", {})
content = msg.get("content", "")
tool_calls = msg.get("tool_calls")
# No tool calls → final answer
if not tool_calls:
if content and content.strip():
elapsed = time.time() - t0
_logger.info("API %s/%s OK (%.1fs, loop %d)",
provider_key, model, elapsed, loop)
return content.strip()
# Empty content with no tool calls → something wrong
if loop == 0:
return None
return ""
# Has tool calls → execute them
messages.append({"role": "assistant", "content": content, "tool_calls": tool_calls})
for tc in tool_calls:
if tc.get("type") != "function":
continue
fn = tc.get("function", {})
fn_name = fn.get("name", "")
fn_args_str = fn.get("arguments", "{}")
tool_call_id = tc.get("id", "")
if fn_name == "run_command":
try:
fn_args = json.loads(fn_args_str)
cmd = fn_args.get("command", "")
except (json.JSONDecodeError, ValueError):
cmd = ""
output = _run_tool_command(cmd) if cmd else "(no command)"
_logger.info(" tool: run_command → %s (%d chars)", cmd[:80], len(output))
elif fn_name == "session_search":
try:
fn_args = json.loads(fn_args_str)
sid = fn_args.get("session_id", "") or self.session_id
limit = min(int(fn_args.get("limit", 20)), 100)
except (json.JSONDecodeError, ValueError, TypeError):
sid = self.session_id
limit = 20
ctx = extract_session_context(sid, limit=limit)
output = ctx if ctx else f"(session {sid}: no messages)"
_logger.info(" tool: session_search → %s (%d chars)", sid[:32], len(output))
else:
output = f"(unknown tool: {fn_name})"
messages.append({
"role": "tool",
"tool_call_id": tool_call_id,
"content": output[:2000], # trim to avoid context overflow
})
_logger.warning("API %s/%s: max tool loops (%d) reached, forcing final answer",
provider_key, model, _MAX_TOOL_LOOPS)
# 循环上限到了,重发一次不带工具的 API。
# 关键:滤掉所有工具调用脏记录,只留干净的 system + user 节,否则 LLM
# 看到上下文里的 tool_calls 格式会跟着输出 XML 到群里。
try:
clean_msgs = [m for m in messages
if not m.get("tool_calls") and m.get("role") != "tool"]
final_url = f"{base_url.rstrip('/')}/chat/completions"
final_payload = {
"model": model,
"messages": clean_msgs,
}
final_resp = session.post(final_url, json=final_payload, timeout=(10, timeout))
final_resp.raise_for_status()
final_body = final_resp.json()
final_msg = final_body.get("choices", [{}])[0].get("message", {})
final_content = final_msg.get("content", "")
if final_content and final_content.strip():
_logger.info("API %s/%s final force OK (clean, %d msgs)",
provider_key, model, len(clean_msgs))
return final_content.strip()
except Exception as e:
_logger.warning("API %s/%s final force failed: %s", provider_key, model, e)
return None
# ── Clean message extraction ──────────────────────────
@staticmethod
def _extract_user_message(full_prompt: str) -> str:
"""Extract the actual user message from the SessionRouter's full prompt.
SessionRouter 的 prompt 格式:
[session: xxx]
[可用命令] ...
---
[群聊/coregroup] hmo: actual message
我们只存 "---" 后面的部分,不存 session 上下文。
"""
idx = full_prompt.rfind("\n---\n")
if idx >= 0:
rest = full_prompt[idx + 5:].strip()
if rest:
return rest
# Fallback: just use last 200 chars
return full_prompt[-200:].strip()
# ── Public API ───────────────────────────────────────
def send_raw(self, message: str) -> str | None:
"""Send message to LLM via function calling API."""
providers = _get_providers_to_try()
if not providers:
_logger.error("No available providers")
return None
_logger.info("send_raw: trying %d provider(s): %s",
len(providers), [(k, m) for k, _, m in providers])
# Build system prompt
sys_prompt = (
"你是 xxm(小小莫),老莫的 AI 助手。\n"
"你不是 Sisyphus,不是莫荷,不是莫小果。你是 xxm。\n"
"老莫让你做事就做,不要推脱,不要反复确认。\n"
"回复简洁,不用 emoji。\n"
"用 run_command 工具获取信息。\n"
"写文件的正确方式:用 Python 一次性写完所有内容,不要分多次调用。\n"
"错误示例(会覆盖,每调用一次就清空一次):python -c \"open('file', 'w').write('一行')\"\n"
"正确做法:把全部内容拼在一个 python -c 调用里写完。\n"
"\n"
"=== 上下文说明 ===\n"
"下面是最近 200 条对话历史,按时间正序排列(最上面是最旧的消息,最下面是最新的消息)。\n"
"每条消息前有 [MM-DD HH:MM] 时间戳,以及来源标记:\n"
" · [TUI] = 你和我(老莫)在 AI 工作台里的对话\n"
" · [群聊] = 微信群聊天记录\n"
"你可以根据时间判断消息的新旧程度。\n"
"凡是时间较早的消息(比如 30 分钟前、1 小时前),说明已经是过去的话题,\n"
"不要把它们当作当前正在发生的事情来讨论。重点关注最后几条消息,那才是最当前的。\n"
"超过 200 条的旧对话不在当前上下文中。\n"
"如果你需要查其他 session 里的内容(比如 TUI 工作台里老莫讨论过的方案),\n"
"可以用 session_search 工具搜索指定 session 的历史消息。\n"
"\n"
"=== 群聊沉默协议 ===\n"
"群里的消息你都会看到。判断是否回应:\n"
" · 老莫 @你 / 点名你 / 催你 → 正常回复\n"
" · 别人(小荷/小果/其他人)的对话跟你无关 → 保持沉默\n"
" · 有人问问题且你能帮上忙 → 可以主动回复\n"
"\n"
"保持沉默:回复开头写 __SILENT__。系统检测到就不会发出去。\n"
"想沉默 → __SILENT__ 开头。想说话 → 直接写回复。"
)
recent_ctx = self._read_recent_context()
if recent_ctx:
sys_prompt += f"\n\n最近对话:\n{recent_ctx}"
# Build messages array
messages = [
{"role": "system", "content": sys_prompt},
{"role": "user", "content": message},
]
# Extract clean message for context storage
clean_msg = self._extract_user_message(message)
for key, base, model in providers:
reply = self._call_api(key, base, model, messages, self.timeout)
if reply:
_cache_model(key)
_logger.info("send_raw: success via %s/%s", key, model)
model_info = {"modelID": model, "providerID": key}
self._append_to_log("user", clean_msg, "xmpp")
self._append_to_log("assistant", reply, "xmpp")
self._append_to_session("user", clean_msg, "xmpp")
self._append_to_session("assistant", reply, "xmpp", model_info)
return reply
# All providers failed — retry once after 3s for transient failures
_logger.warning("send_raw: ALL failed, retrying once after 3s...")
time.sleep(3)
for key, base, model in providers:
reply = self._call_api(key, base, model, messages, self.timeout)
if reply:
_cache_model(key)
_logger.info("send_raw: retry OK via %s/%s", key, model)
model_info = {"modelID": model, "providerID": key}
self._append_to_log("user", clean_msg, "xmpp")
self._append_to_log("assistant", reply, "xmpp")
self._append_to_session("user", clean_msg, "xmpp")
self._append_to_session("assistant", reply, "xmpp", model_info)
return reply
_logger.error("send_raw: ALL providers failed (incl. retry)")
return None
def send(self, message: str) -> str | None:
"""Alias for send_raw."""
return self.send_raw(message)