04db423416
- 70 skills with code and documentation - Add .gitignore (ignore __pycache__, output/, temp/, venv/) - Clean up test intermediates and caches
162 lines
4.7 KiB
Python
162 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MCP 测试服务器 - 简单的 Echo + 计算器
|
|
通过 stdio 通信
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
from datetime import datetime
|
|
|
|
|
|
def send_response(id: str, result: dict):
    """Write a JSON-RPC 2.0 success response to stdout.

    The message is framed as a ``Content-Length`` header, a blank line and
    then the JSON payload, after which stdout is flushed.

    Args:
        id: Identifier of the request being answered; echoed back as-is.
        result: JSON-serialisable value placed under the ``result`` key.
    """
    # json.dumps escapes non-ASCII characters by default, so the character
    # count used for Content-Length equals the encoded byte count.
    payload = json.dumps({"jsonrpc": "2.0", "id": id, "result": result})
    out = sys.stdout
    out.write("Content-Length: " + str(len(payload)) + "\r\n\r\n" + payload)
    out.flush()
|
|
|
|
|
|
def send_error(id: str, code: int, message: str):
    """Write a JSON-RPC 2.0 error response to stdout.

    The message is framed as a ``Content-Length`` header, a blank line and
    then the JSON payload, after which stdout is flushed.

    Args:
        id: Identifier of the request being answered; echoed back as-is.
        code: Numeric JSON-RPC error code.
        message: Human-readable error description.
    """
    error_obj = {"code": code, "message": message}
    envelope = dict(jsonrpc="2.0", id=id, error=error_obj)
    encoded = json.dumps(envelope)
    # json.dumps escapes non-ASCII characters by default, so the character
    # count used for Content-Length equals the encoded byte count.
    frame = "Content-Length: %d\r\n\r\n%s" % (len(encoded), encoded)
    sys.stdout.write(frame)
    sys.stdout.flush()
|
|
|
|
|
|
def handle_request(request: dict):
    """Dispatch one decoded JSON-RPC message and write any reply to stdout.

    Supported methods are ``initialize``, ``notifications/initialized``,
    ``tools/list`` and ``tools/call`` (tools ``echo``, ``add`` and
    ``get_time``).  Replies are emitted through ``send_response`` and
    ``send_error``; the function itself returns nothing.

    Args:
        request: Decoded JSON-RPC message.  A missing ``method`` is treated
            as ``""``, missing or null ``params`` as ``{}``, and a missing
            ``id`` as ``"0"`` for the methods this server implements.
    """
    method = request.get("method", "")
    # An explicit ``"params": null`` is handled like an absent key instead of
    # crashing on ``None.get``.
    params = request.get("params") or {}
    req_id = request.get("id", "0")

    if method == "initialize":
        send_response(req_id, {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {"listChanged": True}
            },
            "serverInfo": {
                "name": "test-mcp-server",
                "version": "1.0.0"
            }
        })

    elif method == "notifications/initialized":
        # Notification from the client; no reply is sent.
        pass

    elif method == "tools/list":
        send_response(req_id, {
            "tools": [
                {
                    "name": "echo",
                    "description": "返回输入的消息",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "message": {"type": "string", "description": "要返回的消息"}
                        },
                        "required": ["message"]
                    }
                },
                {
                    "name": "add",
                    "description": "两数相加",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "a": {"type": "number"},
                            "b": {"type": "number"}
                        },
                        "required": ["a", "b"]
                    }
                },
                {
                    "name": "get_time",
                    "description": "获取当前时间",
                    "inputSchema": {"type": "object", "properties": {}}
                }
            ]
        })

    elif method == "tools/call":
        # Malformed parameters get an explicit "invalid params" error; letting
        # an AttributeError escape would leave the client without any reply.
        if not isinstance(params, dict):
            send_error(req_id, -32602, "Invalid params: expected an object")
            return
        tool_name = params.get("name", "")
        tool_args = params.get("arguments") or {}
        if not isinstance(tool_args, dict):
            send_error(req_id, -32602, "Invalid params: 'arguments' must be an object")
            return

        if tool_name == "echo":
            msg = tool_args.get("message", "")
            send_response(req_id, {
                "content": [{"type": "text", "text": f"Echo: {msg}"}]
            })

        elif tool_name == "add":
            a = tool_args.get("a", 0)
            b = tool_args.get("b", 0)
            # The advertised schema requires numbers.  Without this check
            # strings would be concatenated and mixed types would raise
            # TypeError.  bool is excluded although it subclasses int.
            if any(
                isinstance(value, bool) or not isinstance(value, (int, float))
                for value in (a, b)
            ):
                send_error(req_id, -32602, "Invalid params: 'a' and 'b' must be numbers")
                return
            send_response(req_id, {
                "content": [{"type": "text", "text": str(a + b)}]
            })

        elif tool_name == "get_time":
            now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            send_response(req_id, {
                "content": [{"type": "text", "text": now}]
            })

        else:
            send_error(req_id, -32601, f"Unknown tool: {tool_name}")

    else:
        if "id" not in request:
            # A message without an id is a notification; JSON-RPC 2.0 forbids
            # replying to it, so unknown notifications are dropped silently.
            return
        send_error(req_id, -32601, f"Unknown method: {method}")
|
|
|
|
|
|
def _read_exact(stream, size):
    """Read exactly *size* bytes from *stream*; return None if EOF comes first."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = stream.read(remaining)
        if not chunk:
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def _read_framed_body(stream, first_header):
    """Consume the rest of a header block and return the announced body bytes.

    Returns None when the stream ends before the message is complete and
    raises ValueError when no usable Content-Length header is present.
    """
    length = None
    line = first_header
    # Headers run until the first blank line; only Content-Length matters.
    while line.strip():
        name, _, value = line.partition(b":")
        if name.strip().lower() == b"content-length":
            length = int(value.strip())
        line = stream.readline()
        if not line:
            return None
    if length is None or length < 0:
        raise ValueError("missing or invalid Content-Length header")
    return _read_exact(stream, length)


def main():
    """Main loop: read JSON-RPC messages from stdin and dispatch them.

    Two input framings are accepted:

    * a header block containing ``Content-Length: N``, a blank line, then an
      ``N``-byte JSON body;
    * bare JSON text starting with ``{``, which may span several lines and is
      dispatched as soon as the accumulated text parses.

    Each decoded message is passed to ``handle_request``.  Errors are
    reported on stderr and the loop continues; it ends when stdin is
    exhausted.
    """
    # Content-Length counts bytes, so input is read as bytes.  Text-mode
    # stdin counts characters instead and may rewrite "\r\n".
    stream = getattr(sys.stdin, "buffer", None)
    if stream is None:
        # stdin was replaced by a text-only object (e.g. io.StringIO).
        import io
        stream = io.BytesIO(sys.stdin.read().encode("utf-8"))

    pending = b""  # partially received bare-JSON text

    while True:
        try:
            line = stream.readline()
            if not line:
                break

            if pending or line.lstrip().startswith(b"{"):
                pending += line
                try:
                    request = json.loads(pending.decode("utf-8"))
                except json.JSONDecodeError:
                    # Incomplete so far; wait for more lines.
                    continue
                # Clear before dispatching so a failing handler cannot cause
                # the same message to be parsed and dispatched again.
                pending = b""
                handle_request(request)
                continue

            if not line.strip():
                continue

            name, sep, _ = line.partition(b":")
            if not sep or not name.strip().replace(b"-", b"").isalnum():
                sys.stderr.write("Error: ignoring unexpected input line\n")
                sys.stderr.flush()
                continue

            body = _read_framed_body(stream, line)
            if body is None:
                sys.stderr.write("Error: unexpected end of input inside a message\n")
                sys.stderr.flush()
                break
            handle_request(json.loads(body.decode("utf-8")))

        except Exception as e:
            # Drop partial state so one bad message cannot poison later ones.
            pending = b""
            sys.stderr.write(f"Error: {e}\n")
            sys.stderr.flush()
|
|
|
|
|
|
# Start the server loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|