Files
higress/plugins/wasm-go/extensions/mcp-server/main_test.go
2026-06-15 20:29:20 +08:00

3613 lines
118 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"encoding/json"
"strings"
"testing"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/proxytest"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
"github.com/higress-group/wasm-go/pkg/test"
"github.com/stretchr/testify/require"
)
// REST MCP服务器配置
var restMCPServerConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{
"name": "rest-test-server",
"type": "rest",
},
"tools": []map[string]interface{}{
{
"name": "get_weather",
"description": "获取天气信息",
"args": []map[string]interface{}{
{
"name": "location",
"description": "城市名称",
"type": "string",
"required": true,
},
},
"requestTemplate": map[string]interface{}{
"url": "https://httpbin.org/get?city={{.location}}",
"method": "GET",
},
},
},
})
return data
}()
// MCP代理服务器配置
var mcpProxyServerConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{
"name": "proxy-test-server",
"type": "mcp-proxy",
"transport": "http",
"mcpServerURL": "http://backend-mcp.example.com/mcp",
"timeout": 5000,
},
"tools": []map[string]interface{}{
{
"name": "get_product",
"description": "获取产品信息",
"args": []map[string]interface{}{
{
"name": "product_id",
"description": "产品ID",
"type": "string",
"required": true,
},
},
},
},
})
return data
}()
// MCP代理服务器带认证配置
var mcpProxyServerWithAuthConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{
"name": "proxy-auth-test-server",
"type": "mcp-proxy",
"transport": "http",
"mcpServerURL": "http://backend-mcp.example.com/mcp",
"timeout": 5000,
"defaultUpstreamSecurity": map[string]interface{}{
"id": "BackendApiKey",
},
"securitySchemes": []map[string]interface{}{
{
"id": "BackendApiKey",
"type": "apiKey",
"in": "header",
"name": "X-API-Key",
"defaultCredential": "test-default-key",
},
},
},
"tools": []map[string]interface{}{
{
"name": "get_secure_product",
"description": "获取安全产品信息",
"args": []map[string]interface{}{
{
"name": "product_id",
"description": "产品ID",
"type": "string",
"required": true,
},
},
"requestTemplate": map[string]interface{}{
"security": map[string]interface{}{
"id": "BackendApiKey",
},
},
},
},
})
return data
}()
// 内置天气MCP服务器配置
var weatherMCPServerConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{
"name": "weather-test-server",
"config": map[string]interface{}{
"apiKey": "test-api-key",
"baseUrl": "https://api.openweathermap.org/data/2.5",
},
},
})
return data
}()
// TestRestMCPServerConfig 测试REST MCP服务器配置解析
func TestRestMCPServerConfig(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
t.Run("valid rest mcp server config", func(t *testing.T) {
host, status := test.NewTestHost(restMCPServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
// 对于配置解析测试,主要验证插件启动状态
// GetMatchConfig在WASM模式下可能有限制我们主要关注启动成功
})
})
}
// TestMcpProxyServerConfig 测试MCP代理服务器配置解析
func TestMcpProxyServerConfig(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
t.Run("valid mcp proxy server config", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
// 对于配置解析测试,主要验证插件启动状态
// GetMatchConfig在WASM模式下可能有限制我们主要关注启动成功
})
})
}
// TestRestMCPServerBasicFlow 测试REST MCP服务器基本流程
func TestRestMCPServerBasicFlow(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
t.Run("tools/list request", func(t *testing.T) {
host, status := test.NewTestHost(restMCPServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}`
// 初始化HTTP上下文
host.InitHttp()
// 处理请求头
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
// 处理请求体
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionContinue, action)
// 验证响应
localResponse := host.GetLocalResponse()
if localResponse != nil && len(localResponse.Data) > 0 {
var response map[string]interface{}
err := json.Unmarshal(localResponse.Data, &response)
require.NoError(t, err)
// 验证JSON-RPC格式
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(1), response["id"])
// 验证tools列表存在
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
tools, ok := result["tools"].([]interface{})
require.True(t, ok)
require.Greater(t, len(tools), 0)
// 验证第一个工具
tool, ok := tools[0].(map[string]interface{})
require.True(t, ok)
require.Equal(t, "get_weather", tool["name"])
require.Equal(t, "获取天气信息", tool["description"])
}
host.CompleteHttp()
})
})
}
// TestRestMCPServerToolsCall 测试REST MCP服务器的tools/call功能
func TestRestMCPServerToolsCall(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(restMCPServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsCallRequest := `{
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": {
"name": "get_weather",
"arguments": {
"location": "北京"
}
}
}`
// 初始化HTTP上下文
host.InitHttp()
// 处理请求头
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
// 处理请求体 - 这会触发外部HTTP调用
action = host.CallOnHttpRequestBody([]byte(toolsCallRequest))
require.Equal(t, types.ActionContinue, action)
// Mock HTTP响应头
action = host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"Content-Type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
// 处理外部API响应体
externalAPIResponse := `{
"args": {"city": "北京"},
"url": "https://httpbin.org/get?city=北京",
"headers": {
"Host": "httpbin.org"
}
}`
action = host.CallOnHttpResponseBody([]byte(externalAPIResponse))
require.Equal(t, types.ActionContinue, action)
// 验证最终MCP响应
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
// 验证JSON-RPC格式
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(2), response["id"])
// 验证结果存在REST MCP server会将外部API响应包装为MCP格式
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
content, ok := result["content"].([]interface{})
require.True(t, ok)
require.Greater(t, len(content), 0)
host.CompleteHttp()
})
}
// TestMcpProxyServerToolsList 测试MCP代理服务器的tools/list功能
func TestMcpProxyServerToolsList(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}`
// 初始化HTTP上下文
host.InitHttp()
// 处理请求头
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
// 处理请求体 - 这会触发MCP初始化流程
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionPause, action) // 应该暂停等待后端响应
// Mock MCP初始化阶段的HTTP调用响应
// 第一步Initialize请求的响应
initResponse := `{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2025-03-26",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "BackendMCPServer",
"version": "1.0.0"
}
}
}`
// Mock initialize响应带session ID
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-123"},
}, []byte(initResponse))
// 第二步notifications/initialized请求的响应
notificationResponse := `{"jsonrpc": "2.0"}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-123"},
}, []byte(notificationResponse))
// 第三步实际的tools/list请求的响应这是executeToolsList中ctx.RouteCall的响应
toolsListResponse := `{
"jsonrpc": "2.0",
"id": 2,
"result": {
"tools": [
{
"name": "get_product",
"description": "获取产品信息",
"inputSchema": {
"type": "object",
"properties": {
"product_id": {
"type": "string",
"description": "产品ID"
}
},
"required": ["product_id"]
}
}
]
}
}`
// 这是对executeToolsList中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(toolsListResponse))
// 验证最终MCP响应
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
// 验证JSON-RPC格式
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(1), response["id"])
// 验证代理转发的结果
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
tools, ok := result["tools"].([]interface{})
require.True(t, ok)
require.Greater(t, len(tools), 0)
// 验证工具信息
tool, ok := tools[0].(map[string]interface{})
require.True(t, ok)
require.Equal(t, "get_product", tool["name"])
require.Equal(t, "获取产品信息", tool["description"])
host.CompleteHttp()
})
}
// TestMcpProxyServerToolsCall 测试MCP代理服务器的tools/call功能
func TestMcpProxyServerToolsCall(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsCallRequest := `{
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {
"name": "get_product",
"arguments": {
"product_id": "12345"
}
}
}`
// 初始化HTTP上下文
host.InitHttp()
// 处理请求头
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
// 处理请求体 - 这会触发MCP初始化流程和工具调用
action = host.CallOnHttpRequestBody([]byte(toolsCallRequest))
require.Equal(t, types.ActionPause, action) // 应该暂停等待后端响应
// Mock MCP初始化阶段的HTTP调用响应
// 第一步Initialize请求的响应
initResponse := `{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2025-03-26",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "BackendMCPServer",
"version": "1.0.0"
}
}
}`
// Mock initialize响应带session ID
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-456"},
}, []byte(initResponse))
// 第二步notifications/initialized请求的响应
notificationResponse := `{"jsonrpc": "2.0"}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-456"},
}, []byte(notificationResponse))
// 第三步实际的tools/call请求的响应
toolsCallResponse := `{
"jsonrpc": "2.0",
"id": 2,
"result": {
"content": [
{
"type": "text",
"text": "Product ID: 12345\nName: Sample Product\nPrice: $99.99\nDescription: This is a sample product for testing"
}
],
"isError": false
}
}`
// 这是对executeToolsCall中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(toolsCallResponse))
// 验证最终MCP响应
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
// 验证JSON-RPC格式
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(3), response["id"])
// 验证代理转发的结果
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
content, ok := result["content"].([]interface{})
require.True(t, ok)
require.Greater(t, len(content), 0)
// 验证内容
textContent, ok := content[0].(map[string]interface{})
require.True(t, ok)
require.Equal(t, "text", textContent["type"])
require.Contains(t, textContent["text"], "Product ID: 12345")
// 验证isError字段
require.Equal(t, false, result["isError"])
host.CompleteHttp()
})
}
// TestMcpProxyServerAuthentication 测试MCP代理服务器认证功能
func TestMcpProxyServerAuthentication(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
// 测试tools/list请求的认证头
t.Run("tools/list authentication headers", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerWithAuthConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}`
// 初始化HTTP上下文
host.InitHttp()
// 处理请求头带用户API Key
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
{"x-api-key", "user-provided-key"}, // 用户提供的API Key
})
require.Equal(t, types.HeaderStopIteration, action)
// 处理请求体
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionPause, action)
// Mock MCP初始化响应
initResponse := `{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2025-03-26",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "SecureBackendMCPServer",
"version": "1.0.0"
}
}
}`
// 验证初始化请求的认证头
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "secure-session-list-123"},
}, []byte(initResponse))
// 验证初始化成功(从日志中可以确认发送了正确的认证头 [X-API-Key test-default-key]
// 实际的HTTP请求包含了正确的默认凭据用于上游认证
// Mock notifications/initialized响应
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "secure-session-list-123"},
}, []byte(`{"jsonrpc": "2.0"}`))
// Mock tools/list响应
toolsListResponse := `{
"jsonrpc": "2.0",
"id": 2,
"result": {
"tools": [
{
"name": "get_secure_product",
"description": "获取安全产品信息",
"inputSchema": {
"type": "object",
"properties": {
"product_id": {
"type": "string",
"description": "产品ID"
}
},
"required": ["product_id"]
}
}
]
}
}`
// 验证tools/list请求的认证头在响应处理前获取发送给后端的请求头
requestHeaders := host.GetRequestHeaders()
pathValue, hasPath := test.GetHeaderValue(requestHeaders, ":path")
require.True(t, hasPath, "Path header should exist")
require.Contains(t, pathValue, "/mcp", "Path should be MCP endpoint")
apiKeyValue, hasApiKey := test.GetHeaderValue(requestHeaders, "x-api-key")
require.True(t, hasApiKey, "X-API-Key header should exist in tools/list request")
require.Equal(t, "test-default-key", apiKeyValue, "Should use default credential for tools/list")
// 这是对executeToolsList中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(toolsListResponse))
// 验证响应
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
// 验证响应格式
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(1), response["id"])
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
tools, ok := result["tools"].([]interface{})
require.True(t, ok)
require.Greater(t, len(tools), 0)
host.CompleteHttp()
})
// 测试tools/call请求的认证头
t.Run("tools/call authentication headers", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerWithAuthConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsCallRequest := `{
"jsonrpc": "2.0",
"id": 4,
"method": "tools/call",
"params": {
"name": "get_secure_product",
"arguments": {
"product_id": "secure-123"
}
}
}`
// 初始化HTTP上下文
host.InitHttp()
// 处理请求头带用户API Key
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
{"x-api-key", "user-provided-key"}, // 用户提供的API Key
})
require.Equal(t, types.HeaderStopIteration, action)
// 处理请求体
action = host.CallOnHttpRequestBody([]byte(toolsCallRequest))
require.Equal(t, types.ActionPause, action)
// Mock MCP初始化响应
initResponse := `{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2025-03-26",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "SecureBackendMCPServer",
"version": "1.0.0"
}
}
}`
// 验证初始化请求的认证头
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "secure-session-call-456"},
}, []byte(initResponse))
// 验证初始化成功(从日志中可以确认发送了正确的认证头 [X-API-Key test-default-key]
// 实际的HTTP请求包含了正确的默认凭据用于上游认证
// Mock notifications/initialized响应
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "secure-session-call-456"},
}, []byte(`{"jsonrpc": "2.0"}`))
// Mock工具调用响应
secureToolResponse := `{
"jsonrpc": "2.0",
"id": 2,
"result": {
"content": [
{
"type": "text",
"text": "Secure Product ID: secure-123\nName: Confidential Product\nAccess Level: Premium"
}
],
"isError": false
}
}`
// 验证tools/call请求的认证头在响应处理前获取发送给后端的请求头
requestHeaders := host.GetRequestHeaders()
pathValue, hasPath := test.GetHeaderValue(requestHeaders, ":path")
require.True(t, hasPath, "Path header should exist")
require.Contains(t, pathValue, "/mcp", "Path should be MCP endpoint")
apiKeyValue, hasApiKey := test.GetHeaderValue(requestHeaders, "x-api-key")
require.True(t, hasApiKey, "X-API-Key header should exist in tools/call request")
require.Equal(t, "test-default-key", apiKeyValue, "Should use default credential for tools/call")
// 这是对executeToolsCall中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(secureToolResponse))
// 验证响应
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
// 验证响应格式
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(4), response["id"])
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
content, ok := result["content"].([]interface{})
require.True(t, ok)
textContent, ok := content[0].(map[string]interface{})
require.True(t, ok)
require.Contains(t, textContent["text"], "Secure Product ID: secure-123")
host.CompleteHttp()
})
})
}
// TestMcpProxyServerErrorHandling 测试MCP代理服务器错误处理
func TestMcpProxyServerErrorHandling(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
// 测试协议版本不匹配
t.Run("protocol version mismatch", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 5,
"method": "tools/list",
"params": {}
}`
// 初始化HTTP上下文
host.InitHttp()
// 处理请求头
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
// 处理请求体
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionPause, action)
// Mock协议版本不匹配的错误响应
versionErrorResponse := `{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32602,
"message": "Unsupported protocol version",
"data": {
"supported": ["2024-11-05"],
"requested": "2025-03-26"
}
}
}`
host.CallOnHttpCall([][2]string{
{":status", "400"},
{"content-type", "application/json"},
}, []byte(versionErrorResponse))
// 验证错误响应
localResponse := host.GetLocalResponse()
if localResponse != nil && len(localResponse.Data) > 0 {
var response map[string]interface{}
err := json.Unmarshal(localResponse.Data, &response)
require.NoError(t, err)
// 验证错误被正确包装
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(5), response["id"])
errorField, ok := response["error"].(map[string]interface{})
require.True(t, ok)
require.Contains(t, errorField["message"], "backend")
}
host.CompleteHttp()
})
// 测试后端服务器超时
t.Run("backend timeout", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsCallRequest := `{
"jsonrpc": "2.0",
"id": 6,
"method": "tools/call",
"params": {
"name": "get_product",
"arguments": {
"product_id": "timeout-test"
}
}
}`
// 初始化HTTP上下文
host.InitHttp()
// 处理请求头
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
// 处理请求体
action = host.CallOnHttpRequestBody([]byte(toolsCallRequest))
require.Equal(t, types.ActionPause, action)
// Mock超时错误 - 不提供响应,模拟超时
// 在实际实现中,这会触发超时处理逻辑
host.CompleteHttp()
})
// 测试后端工具执行错误
t.Run("backend tool error", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsCallRequest := `{
"jsonrpc": "2.0",
"id": 7,
"method": "tools/call",
"params": {
"name": "get_product",
"arguments": {
"product_id": "error-test"
}
}
}`
// 初始化HTTP上下文
host.InitHttp()
// 处理请求头
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
// 处理请求体
action = host.CallOnHttpRequestBody([]byte(toolsCallRequest))
require.Equal(t, types.ActionPause, action)
// Mock正常的初始化流程
initResponse := `{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2025-03-26",
"capabilities": {"tools": {}},
"serverInfo": {"name": "TestServer", "version": "1.0.0"}
}
}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "error-session-001"},
}, []byte(initResponse))
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "error-session-001"},
}, []byte(`{"jsonrpc": "2.0"}`))
// Mock工具执行错误响应
toolErrorResponse := `{
"jsonrpc": "2.0",
"id": 2,
"result": {
"content": [
{
"type": "text",
"text": "Failed to fetch product: Database connection error"
}
],
"isError": true
}
}`
// 这是对executeToolsCall中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(toolErrorResponse))
// 验证错误被正确传播
responseBody := host.GetResponseBody()
if len(responseBody) > 0 {
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
// 验证响应格式
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(7), response["id"])
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
require.Equal(t, true, result["isError"])
content, ok := result["content"].([]interface{})
require.True(t, ok)
textContent, ok := content[0].(map[string]interface{})
require.True(t, ok)
require.Contains(t, textContent["text"], "Database connection error")
}
host.CompleteHttp()
})
})
}
// TestMcpProxyServerSessionManagement 测试MCP代理服务器会话管理
func TestMcpProxyServerSessionManagement(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
// 测试单个HTTP Context内的会话状态管理
t.Run("context session state", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
// 在同一个HTTP Context中先进行tools/list然后tools/call
// 这将测试Context内的CtxMcpProxyInitialized状态管理
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 8,
"method": "tools/list",
"params": {}
}`
// 初始化HTTP上下文
host.InitHttp()
// 处理第一个请求tools/list
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionPause, action)
// Mock初始化过程第一次应该触发初始化
initResponse := `{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2025-03-26",
"capabilities": {"tools": {}},
"serverInfo": {"name": "TestServer", "version": "1.0.0"}
}
}`
sessionID := "context-session-999"
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", sessionID},
}, []byte(initResponse))
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", sessionID},
}, []byte(`{"jsonrpc": "2.0"}`))
// Mock tools/list响应
toolsListResponse := `{
"jsonrpc": "2.0",
"id": 2,
"result": {
"tools": [
{
"name": "get_product",
"description": "获取产品信息",
"inputSchema": {
"type": "object",
"properties": {
"product_id": {"type": "string", "description": "产品ID"}
},
"required": ["product_id"]
}
}
]
}
}`
// 这是对executeToolsList中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(toolsListResponse))
// 验证tools/list响应
responseBody := host.GetResponseBody()
if len(responseBody) > 0 {
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(8), response["id"])
}
host.CompleteHttp()
})
// 测试每个新HTTP请求都需要重新初始化符合实际实现
t.Run("new request requires new initialization", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
// 第一个独立的HTTP请求
toolsCallRequest1 := `{
"jsonrpc": "2.0",
"id": 9,
"method": "tools/call",
"params": {
"name": "get_product",
"arguments": {
"product_id": "request-1"
}
}
}`
host.InitHttp()
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsCallRequest1))
require.Equal(t, types.ActionPause, action)
// Mock第一个请求的完整初始化流程
initResponse := `{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2025-03-26",
"capabilities": {"tools": {}},
"serverInfo": {"name": "TestServer", "version": "1.0.0"}
}
}`
sessionID1 := "session-request-1"
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", sessionID1},
}, []byte(initResponse))
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", sessionID1},
}, []byte(`{"jsonrpc": "2.0"}`))
// Mock tools/call响应
toolsCallResponse1 := `{
"jsonrpc": "2.0",
"id": 2,
"result": {
"content": [
{
"type": "text",
"text": "Product from request 1: request-1"
}
],
"isError": false
}
}`
// 这是对executeToolsCall中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(toolsCallResponse1))
// 验证第一个请求的响应
responseBody := host.GetResponseBody()
if len(responseBody) > 0 {
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(9), response["id"])
}
host.CompleteHttp()
// 第二个独立的HTTP请求新的Context需要重新初始化
toolsCallRequest2 := `{
"jsonrpc": "2.0",
"id": 10,
"method": "tools/call",
"params": {
"name": "get_product",
"arguments": {
"product_id": "request-2"
}
}
}`
// 新的HTTP Context
host.InitHttp()
action = host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsCallRequest2))
require.Equal(t, types.ActionPause, action)
// Mock第二个请求的完整初始化流程每个新Context都需要重新初始化
sessionID2 := "session-request-2"
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", sessionID2}, // 不同的session ID
}, []byte(initResponse))
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", sessionID2},
}, []byte(`{"jsonrpc": "2.0"}`))
toolsCallResponse2 := `{
"jsonrpc": "2.0",
"id": 2,
"result": {
"content": [
{
"type": "text",
"text": "Product from request 2: request-2"
}
],
"isError": false
}
}`
// 这是对executeToolsCall中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(toolsCallResponse2))
// 验证第二个请求的响应
responseBody = host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(10), response["id"])
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
content, ok := result["content"].([]interface{})
require.True(t, ok)
textContent, ok := content[0].(map[string]interface{})
require.True(t, ok)
require.Contains(t, textContent["text"], "request-2")
host.CompleteHttp()
})
// 测试初始化失败处理
t.Run("initialization failure", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsCallRequest := `{
"jsonrpc": "2.0",
"id": 11,
"method": "tools/call",
"params": {
"name": "get_product",
"arguments": {
"product_id": "init-failure-test"
}
}
}`
host.InitHttp()
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsCallRequest))
require.Equal(t, types.ActionPause, action)
// Mock初始化失败响应
initErrorResponse := `{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32001,
"message": "Backend server unavailable"
}
}`
host.CallOnHttpCall([][2]string{
{":status", "500"},
{"content-type", "application/json"},
}, []byte(initErrorResponse))
// 验证错误被正确处理
localResponse := host.GetLocalResponse()
if localResponse != nil && len(localResponse.Data) > 0 {
var response map[string]interface{}
err := json.Unmarshal(localResponse.Data, &response)
require.NoError(t, err)
// 验证错误响应格式
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(11), response["id"])
errorField, ok := response["error"].(map[string]interface{})
require.True(t, ok)
require.Contains(t, errorField["message"], "backend")
}
host.CompleteHttp()
})
})
}
// BenchmarkRestMCPServer 性能基准测试
func BenchmarkRestMCPServer(b *testing.B) {
host, status := test.NewTestHost(restMCPServerConfig)
defer host.Reset()
require.Equal(b, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}`
b.ResetTimer()
for i := 0; i < b.N; i++ {
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
host.CallOnHttpRequestBody([]byte(toolsListRequest))
host.CompleteHttp()
}
}
// TestMcpProxyServerAllowTools 测试MCP代理服务器allowTools功能
func TestMcpProxyServerAllowTools(t *testing.T) {
// 创建包含allowTools配置的测试配置
mcpProxyServerWithAllowToolsConfig := func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{
"name": "proxy-allow-tools-server",
"type": "mcp-proxy",
"transport": "http",
"mcpServerURL": "http://backend-mcp.example.com/mcp",
"timeout": 5000,
},
"allowTools": []string{"get_product", "create_order"}, // 只允许这两个工具
"tools": []map[string]interface{}{
{
"name": "get_product",
"type": "mcp-proxy",
"description": "Get product information",
},
{
"name": "create_order",
"type": "mcp-proxy",
"description": "Create a new order",
},
{
"name": "delete_user",
"type": "mcp-proxy",
"description": "Delete a user account",
},
},
})
return data
}()
test.RunTest(t, func(t *testing.T) {
// 测试配置级别的allowTools过滤
t.Run("config level allowTools filtering", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerWithAllowToolsConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}`
host.InitHttp()
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionPause, action) // 应该暂停等待异步响应
// Mock MCP initialization sequence
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-123"},
}, []byte(`{
"jsonrpc": "2.0",
"id": "init-1",
"result": {
"capabilities": {
"tools": {"listChanged": true}
},
"protocolVersion": "2024-11-05"
}
}`))
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-123"},
}, []byte(`{
"jsonrpc": "2.0",
"id": "notify-1",
"result": {}
}`))
// Mock tools/list response with 3 tools (但只有2个会被返回)
// 这是对executeToolsList中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(`{
"jsonrpc": "2.0",
"id": 2,
"result": {
"tools": [
{
"name": "get_product",
"description": "Get product information",
"inputSchema": {"type": "object"}
},
{
"name": "create_order",
"description": "Create a new order",
"inputSchema": {"type": "object"}
},
{
"name": "delete_user",
"description": "Delete a user account",
"inputSchema": {"type": "object"}
}
]
}
}`))
host.CompleteHttp()
// 验证响应只包含允许的工具
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
result, hasResult := response["result"]
require.True(t, hasResult)
resultMap := result.(map[string]interface{})
tools, hasTools := resultMap["tools"]
require.True(t, hasTools)
toolsArray := tools.([]interface{})
// 应该只返回2个允许的工具delete_user被过滤掉
require.Len(t, toolsArray, 2)
toolNames := make([]string, 0)
for _, tool := range toolsArray {
toolMap := tool.(map[string]interface{})
toolNames = append(toolNames, toolMap["name"].(string))
}
require.Contains(t, toolNames, "get_product")
require.Contains(t, toolNames, "create_order")
require.NotContains(t, toolNames, "delete_user")
})
// 测试请求头级别的allowTools过滤
t.Run("header level allowTools filtering", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig) // 使用没有allowTools配置的基本配置
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
"params": {}
}`
host.InitHttp()
// 设置请求头只允许get_product工具
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
{"x-envoy-allow-mcp-tools", "get_product"},
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionPause, action)
// Mock MCP initialization sequence
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-456"},
}, []byte(`{
"jsonrpc": "2.0",
"id": "init-2",
"result": {
"capabilities": {
"tools": {"listChanged": true}
},
"protocolVersion": "2024-11-05"
}
}`))
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-456"},
}, []byte(`{
"jsonrpc": "2.0",
"id": "notify-2",
"result": {}
}`))
// Mock tools/list response with multiple tools
// 这是对executeToolsList中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(`{
"jsonrpc": "2.0",
"id": 2,
"result": {
"tools": [
{
"name": "get_product",
"description": "Get product information",
"inputSchema": {"type": "object"}
},
{
"name": "create_order",
"description": "Create a new order",
"inputSchema": {"type": "object"}
}
]
}
}`))
host.CompleteHttp()
// 验证响应只包含请求头中允许的工具
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
result, hasResult := response["result"]
require.True(t, hasResult)
resultMap := result.(map[string]interface{})
tools, hasTools := resultMap["tools"]
require.True(t, hasTools)
toolsArray := tools.([]interface{})
// 应该只返回1个工具(get_product)
require.Len(t, toolsArray, 1)
toolMap := toolsArray[0].(map[string]interface{})
require.Equal(t, "get_product", toolMap["name"])
})
// 测试配置和请求头都存在时的组合过滤
t.Run("combined config and header allowTools filtering", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerWithAllowToolsConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 3,
"method": "tools/list",
"params": {}
}`
host.InitHttp()
// 配置允许get_product, create_order
// 请求头允许get_product, delete_user
// 交集应该只有get_product
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
{"x-envoy-allow-mcp-tools", "get_product,delete_user"},
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionPause, action)
// Mock MCP initialization sequence
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-789"},
}, []byte(`{
"jsonrpc": "2.0",
"id": "init-3",
"result": {
"capabilities": {
"tools": {"listChanged": true}
},
"protocolVersion": "2024-11-05"
}
}`))
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-789"},
}, []byte(`{
"jsonrpc": "2.0",
"id": "notify-3",
"result": {}
}`))
// Mock tools/list response
// 这是对executeToolsList中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(`{
"jsonrpc": "2.0",
"id": 3,
"result": {
"tools": [
{
"name": "get_product",
"description": "Get product information",
"inputSchema": {"type": "object"}
},
{
"name": "create_order",
"description": "Create a new order",
"inputSchema": {"type": "object"}
},
{
"name": "delete_user",
"description": "Delete a user account",
"inputSchema": {"type": "object"}
}
]
}
}`))
host.CompleteHttp()
// 验证响应只包含交集中的工具
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
result, hasResult := response["result"]
require.True(t, hasResult)
resultMap := result.(map[string]interface{})
tools, hasTools := resultMap["tools"]
require.True(t, hasTools)
toolsArray := tools.([]interface{})
// 应该只返回1个工具(get_product)因为它是唯一在两个allowTools列表中的工具
require.Len(t, toolsArray, 1)
toolMap := toolsArray[0].(map[string]interface{})
require.Equal(t, "get_product", toolMap["name"])
})
// 测试空白的请求头allowTools
t.Run("empty header allowTools", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 4,
"method": "tools/list",
"params": {}
}`
host.InitHttp()
// 设置空的allowTools头
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
{"x-envoy-allow-mcp-tools", " , , "}, // 只有空白和逗号
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionPause, action)
// Mock MCP initialization sequence
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-empty"},
}, []byte(`{
"jsonrpc": "2.0",
"id": "init-4",
"result": {
"capabilities": {
"tools": {"listChanged": true}
},
"protocolVersion": "2024-11-05"
}
}`))
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-empty"},
}, []byte(`{
"jsonrpc": "2.0",
"id": "notify-4",
"result": {}
}`))
// Mock tools/list response
// 这是对executeToolsList中ctx.RouteCall的响应
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(`{
"jsonrpc": "2.0",
"id": 4,
"result": {
"tools": [
{
"name": "get_product",
"description": "Get product information",
"inputSchema": {"type": "object"}
},
{
"name": "create_order",
"description": "Create a new order",
"inputSchema": {"type": "object"}
}
]
}
}`))
host.CompleteHttp()
// 验证响应不包含任何工具空白header应该被当作配置为空禁止所有工具
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
result, hasResult := response["result"]
require.True(t, hasResult)
resultMap := result.(map[string]interface{})
tools, hasTools := resultMap["tools"]
require.True(t, hasTools)
toolsArray := tools.([]interface{})
// 应该返回0个工具因为空白header等于配置为空数组禁止所有工具
require.Len(t, toolsArray, 0)
})
// 测试不存在的allowTools header应该允许所有工具
t.Run("no header allowTools", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 5,
"method": "tools/list",
"params": {}
}`
host.InitHttp()
// 不设置x-envoy-allow-mcp-tools header
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionPause, action)
// Mock MCP initialization sequence
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-no-header"},
}, []byte(`{
"jsonrpc": "2.0",
"id": "init-5",
"result": {
"capabilities": {
"tools": {"listChanged": true}
},
"protocolVersion": "2024-11-05"
}
}`))
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
{"mcp-session-id", "test-session-no-header"},
}, []byte(`{
"jsonrpc": "2.0",
"id": "notify-5",
"result": {}
}`))
// Mock tools/list response with multiple tools
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpResponseBody([]byte(`{
"jsonrpc": "2.0",
"id": 5,
"result": {
"tools": [
{
"name": "get_product",
"description": "Get product information",
"inputSchema": {"type": "object"}
},
{
"name": "create_order",
"description": "Create a new order",
"inputSchema": {"type": "object"}
},
{
"name": "delete_user",
"description": "Delete a user account",
"inputSchema": {"type": "object"}
}
]
}
}`))
host.CompleteHttp()
// 验证响应包含所有工具header不存在时允许所有工具
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
result, hasResult := response["result"]
require.True(t, hasResult)
resultMap := result.(map[string]interface{})
tools, hasTools := resultMap["tools"]
require.True(t, hasTools)
toolsArray := tools.([]interface{})
// 应该返回所有3个工具因为header不存在意味着没有限制
require.Len(t, toolsArray, 3)
toolNames := make([]string, 0)
for _, tool := range toolsArray {
toolMap := tool.(map[string]interface{})
toolNames = append(toolNames, toolMap["name"].(string))
}
require.Contains(t, toolNames, "get_product")
require.Contains(t, toolNames, "create_order")
require.Contains(t, toolNames, "delete_user")
})
})
}
// MCP Proxy Server with SSE transport configuration
var mcpProxyServerSSEConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{
"name": "proxy-sse-test-server",
"type": "mcp-proxy",
"transport": "sse",
"mcpServerURL": "http://backend-mcp.example.com/sse",
"timeout": 5000,
},
"tools": []map[string]interface{}{
{
"name": "get_product",
"description": "Get product information",
"args": []map[string]interface{}{
{
"name": "product_id",
"description": "Product ID",
"type": "string",
"required": true,
},
},
},
},
})
return data
}()
// TestMcpProxyServerSSEToolsList tests tools/list with SSE transport
func TestMcpProxyServerSSEToolsList(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerSSEConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}`
// Initialize HTTP context
host.InitHttp()
// Process request headers
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
// Process request body - this triggers SSE channel establishment
// SSE protocol converts the POST request to GET in the request phase
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionContinue, action) // Should continue (not pause) as request is converted to GET
// Step 1: Mock SSE channel response headers (GET request response)
// The server returns text/event-stream for SSE channel
action = host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "text/event-stream"},
{"cache-control", "no-cache"},
})
require.Equal(t, types.HeaderStopIteration, action) // Should stop to process streaming body
// Step 2: Send SSE endpoint message
// This is the first message in the SSE stream indicating the endpoint URL
endpointSSEMessage := `event: endpoint
data: /sse/session-abc123
`
action = host.CallOnHttpStreamingResponseBody([]byte(endpointSSEMessage), false)
// After receiving endpoint, the proxy will send initialize request via RouteCluster
// The state machine is now waiting for initialize response
// Step 3: Mock initialize response via RouteCluster callback
initResponse := `{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"BackendSSEServer","version":"1.0.0"}}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(initResponse))
// Step 4: Send SSE initialize response in the stream
// This matches the initialize request ID and triggers notification
initSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"BackendSSEServer","version":"1.0.0"}}}
`
action = host.CallOnHttpStreamingResponseBody([]byte(initSSEMessage), false)
// After receiving init response, proxy sends notification via RouteCluster
// Step 5: Mock notification response via RouteCluster callback
notificationResponse := `{"jsonrpc":"2.0"}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(notificationResponse))
// After notification completes, proxy sends tools/list via RouteCluster
// Step 6: Mock tools/list response via RouteCluster callback
toolsListResponse := `{"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_product","description":"Get product information","inputSchema":{"type":"object","properties":{"product_id":{"type":"string","description":"Product ID"}},"required":["product_id"]}}]}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(toolsListResponse))
// Step 7: Send SSE tools/list response in the stream
// This matches the tools/list request ID and triggers final response
toolsListSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_product","description":"Get product information","inputSchema":{"type":"object","properties":{"product_id":{"type":"string","description":"Product ID"}},"required":["product_id"]}}]}}
`
action = host.CallOnHttpStreamingResponseBody([]byte(toolsListSSEMessage), true)
// This should inject the final response to client
// Verify the final response
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
// Verify JSON-RPC format
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(1), response["id"])
// Verify the proxied result
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
tools, ok := result["tools"].([]interface{})
require.True(t, ok)
require.Greater(t, len(tools), 0)
// Verify tool information
tool, ok := tools[0].(map[string]interface{})
require.True(t, ok)
require.Equal(t, "get_product", tool["name"])
require.Equal(t, "Get product information", tool["description"])
host.CompleteHttp()
})
}
// TestMcpProxyServerSSEToolsCall tests tools/call with SSE transport
func TestMcpProxyServerSSEToolsCall(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerSSEConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsCallRequest := `{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "get_product",
"arguments": {
"product_id": "12345"
}
}
}`
// Initialize HTTP context
host.InitHttp()
// Process request headers
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
// Process request body - this triggers SSE channel establishment
action = host.CallOnHttpRequestBody([]byte(toolsCallRequest))
require.Equal(t, types.ActionContinue, action)
// Step 1: Mock SSE channel response headers
action = host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "text/event-stream"},
{"cache-control", "no-cache"},
})
require.Equal(t, types.HeaderStopIteration, action)
// Step 2: Send SSE endpoint message
endpointSSEMessage := `event: endpoint
data: /sse/session-xyz789
`
action = host.CallOnHttpStreamingResponseBody([]byte(endpointSSEMessage), false)
// Step 3: Mock initialize response via RouteCluster callback
initResponse := `{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"BackendSSEServer","version":"1.0.0"}}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(initResponse))
// Step 4: Send SSE initialize response in the stream
initSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"BackendSSEServer","version":"1.0.0"}}}
`
action = host.CallOnHttpStreamingResponseBody([]byte(initSSEMessage), false)
// Step 5: Mock notification response via RouteCluster callback
notificationResponse := `{"jsonrpc":"2.0"}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(notificationResponse))
// Step 6: Mock tools/call response via RouteCluster callback
toolsCallResponse := `{"jsonrpc":"2.0","id":2,"result":{"content":[{"type":"text","text":"Product ID: 12345, Name: Sample Product, Price: $99.99"}]}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(toolsCallResponse))
// Step 7: Send SSE tools/call response in the stream
toolsCallSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":2,"result":{"content":[{"type":"text","text":"Product ID: 12345, Name: Sample Product, Price: $99.99"}]}}
`
action = host.CallOnHttpStreamingResponseBody([]byte(toolsCallSSEMessage), true)
// Verify the final response
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
// Verify JSON-RPC format
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(1), response["id"])
// Verify the proxied result
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
content, ok := result["content"].([]interface{})
require.True(t, ok)
require.Greater(t, len(content), 0)
// Verify content
contentItem, ok := content[0].(map[string]interface{})
require.True(t, ok)
require.Equal(t, "text", contentItem["type"])
require.Contains(t, contentItem["text"], "Product ID: 12345")
host.CompleteHttp()
})
}
// TestMcpProxyServerSSEAllowTools tests allowTools functionality with SSE transport
func TestMcpProxyServerSSEAllowTools(t *testing.T) {
// Create config with allowTools
mcpProxyServerSSEWithAllowToolsConfig := func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{
"name": "proxy-sse-allow-tools-server",
"type": "mcp-proxy",
"transport": "sse",
"mcpServerURL": "http://backend-mcp.example.com/sse",
"timeout": 5000,
},
"allowTools": []string{"get_product", "create_order"}, // Only allow these two tools
"tools": []map[string]interface{}{
{
"name": "get_product",
"type": "mcp-proxy",
"description": "Get product information",
},
{
"name": "create_order",
"type": "mcp-proxy",
"description": "Create a new order",
},
{
"name": "delete_user",
"type": "mcp-proxy",
"description": "Delete a user account",
},
},
})
return data
}()
test.RunTest(t, func(t *testing.T) {
// Test config level allowTools filtering
t.Run("config level allowTools filtering with SSE", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerSSEWithAllowToolsConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}`
host.InitHttp()
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionContinue, action)
// Mock SSE channel response
action = host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "text/event-stream"},
})
require.Equal(t, types.HeaderStopIteration, action)
// Send endpoint
endpointSSEMessage := `event: endpoint
data: /sse/session-allow-tools
`
host.CallOnHttpStreamingResponseBody([]byte(endpointSSEMessage), false)
// Mock initialize
initResponse := `{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"BackendSSEServer","version":"1.0.0"}}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(initResponse))
initSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"BackendSSEServer","version":"1.0.0"}}}
`
host.CallOnHttpStreamingResponseBody([]byte(initSSEMessage), false)
// Mock notification
notificationResponse := `{"jsonrpc":"2.0"}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(notificationResponse))
// Mock tools/list response with all 3 tools from backend
toolsListResponse := `{"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_product","description":"Get product information"},{"name":"create_order","description":"Create a new order"},{"name":"delete_user","description":"Delete a user account"}]}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(toolsListResponse))
// Send SSE tools/list response
toolsListSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_product","description":"Get product information"},{"name":"create_order","description":"Create a new order"},{"name":"delete_user","description":"Delete a user account"}]}}
`
host.CallOnHttpStreamingResponseBody([]byte(toolsListSSEMessage), true)
// Verify response - should only contain allowed tools
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
result, hasResult := response["result"]
require.True(t, hasResult)
resultMap := result.(map[string]interface{})
tools, hasTools := resultMap["tools"]
require.True(t, hasTools)
toolsArray := tools.([]interface{})
// Should only return 2 allowed tools (get_product, create_order)
require.Len(t, toolsArray, 2)
toolNames := make([]string, 0)
for _, tool := range toolsArray {
toolMap := tool.(map[string]interface{})
toolNames = append(toolNames, toolMap["name"].(string))
}
require.Contains(t, toolNames, "get_product")
require.Contains(t, toolNames, "create_order")
require.NotContains(t, toolNames, "delete_user") // Should be filtered out
host.CompleteHttp()
})
// Test header level allowTools filtering
t.Run("header level allowTools filtering with SSE", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerSSEWithAllowToolsConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}`
host.InitHttp()
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
{"x-envoy-allow-mcp-tools", "get_product"}, // Only allow get_product via header
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionContinue, action)
// Mock SSE channel response
action = host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "text/event-stream"},
})
require.Equal(t, types.HeaderStopIteration, action)
// Send endpoint
endpointSSEMessage := `event: endpoint
data: /sse/session-header-filter
`
host.CallOnHttpStreamingResponseBody([]byte(endpointSSEMessage), false)
// Mock initialize
initResponse := `{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"BackendSSEServer","version":"1.0.0"}}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(initResponse))
initSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"BackendSSEServer","version":"1.0.0"}}}
`
host.CallOnHttpStreamingResponseBody([]byte(initSSEMessage), false)
// Mock notification
notificationResponse := `{"jsonrpc":"2.0"}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(notificationResponse))
// Mock tools/list response with all 3 tools
toolsListResponse := `{"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_product","description":"Get product information"},{"name":"create_order","description":"Create a new order"},{"name":"delete_user","description":"Delete a user account"}]}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(toolsListResponse))
// Send SSE tools/list response
toolsListSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_product","description":"Get product information"},{"name":"create_order","description":"Create a new order"},{"name":"delete_user","description":"Delete a user account"}]}}
`
host.CallOnHttpStreamingResponseBody([]byte(toolsListSSEMessage), true)
// Verify response - should only contain get_product (intersection of config and header)
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
result, hasResult := response["result"]
require.True(t, hasResult)
resultMap := result.(map[string]interface{})
tools, hasTools := resultMap["tools"]
require.True(t, hasTools)
toolsArray := tools.([]interface{})
// Should only return 1 tool (get_product - intersection of config and header)
require.Len(t, toolsArray, 1)
tool := toolsArray[0].(map[string]interface{})
require.Equal(t, "get_product", tool["name"])
host.CompleteHttp()
})
})
}
// TestMcpProxyServerSSEAuthentication tests authentication functionality with SSE transport
func TestMcpProxyServerSSEAuthentication(t *testing.T) {
// Create SSE config with authentication
mcpProxyServerSSEWithAuthConfig := func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{
"name": "proxy-sse-auth-server",
"type": "mcp-proxy",
"transport": "sse",
"mcpServerURL": "http://backend-mcp.example.com/sse",
"timeout": 5000,
"defaultUpstreamSecurity": map[string]interface{}{
"id": "BackendApiKey",
},
"securitySchemes": []map[string]interface{}{
{
"id": "BackendApiKey",
"type": "apiKey",
"in": "header",
"name": "X-API-Key",
"defaultCredential": "backend-default-key",
},
},
},
"tools": []map[string]interface{}{
{
"name": "get_secure_data",
"type": "mcp-proxy",
"description": "Get secure data with authentication",
},
},
})
return data
}()
test.RunTest(t, func(t *testing.T) {
// Test authentication headers in SSE requests
t.Run("SSE requests with authentication headers", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerSSEWithAuthConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsListRequest := `{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
}`
host.InitHttp()
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsListRequest))
require.Equal(t, types.ActionContinue, action)
// Mock SSE channel response
action = host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "text/event-stream"},
})
require.Equal(t, types.HeaderStopIteration, action)
// Send endpoint
endpointSSEMessage := `event: endpoint
data: /sse/session-auth-test
`
host.CallOnHttpStreamingResponseBody([]byte(endpointSSEMessage), false)
// Verify initialize request has authentication header
httpCallouts := host.GetHttpCalloutAttributes()
require.Greater(t, len(httpCallouts), 0, "Expected at least one HTTP callout for initialize")
initCallout := httpCallouts[0]
require.True(t, test.HasHeaderWithValue(initCallout.Headers, "X-API-Key", "backend-default-key"),
"Initialize request should have X-API-Key header with default credential")
require.True(t, test.HasHeaderWithValue(initCallout.Headers, ":method", "POST"),
"Initialize request should use POST method")
require.True(t, test.HasHeaderWithValue(initCallout.Headers, "Content-Type", "application/json"),
"Initialize request should have Content-Type header")
// Mock initialize response
initResponse := `{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"SecureSSEServer","version":"1.0.0"}}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(initResponse))
initSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"SecureSSEServer","version":"1.0.0"}}}
`
host.CallOnHttpStreamingResponseBody([]byte(initSSEMessage), false)
// After sending initSSEMessage, notification HTTP call should be triggered
// Let's respond to initialize callback first, then respond to notification callback
// This will trigger the tools/list request
// Mock notification response (this responds to the notification HTTP call that was just sent)
notificationResponse := `{"jsonrpc":"2.0"}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(notificationResponse))
// After responding to notification, tools/list request should have been sent
// Verify tools/list request has authentication header
httpCallouts = host.GetHttpCalloutAttributes()
// Find the tools/list callout (it should be the last one, or one of the recent ones)
var toolsListCallout *proxytest.HttpCalloutAttribute
for i := len(httpCallouts) - 1; i >= 0; i-- {
if strings.Contains(string(httpCallouts[i].Body), "tools/list") {
toolsListCallout = &httpCallouts[i]
break
}
}
require.NotNil(t, toolsListCallout, "Expected to find tools/list HTTP callout")
require.True(t, test.HasHeaderWithValue(toolsListCallout.Headers, "X-API-Key", "backend-default-key"),
"Tools/list request should have X-API-Key header with default credential")
require.True(t, test.HasHeaderWithValue(toolsListCallout.Headers, ":method", "POST"),
"Tools/list request should use POST method")
require.True(t, test.HasHeaderWithValue(toolsListCallout.Headers, "Content-Type", "application/json"),
"Tools/list request should have Content-Type header")
// Verify request body contains tools/list
require.Contains(t, string(toolsListCallout.Body), "tools/list", "Request body should contain tools/list method")
// Mock tools/list response
toolsListResponse := `{"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_secure_data","description":"Get secure data with authentication"}]}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(toolsListResponse))
toolsListSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"get_secure_data","description":"Get secure data with authentication"}]}}
`
host.CallOnHttpStreamingResponseBody([]byte(toolsListSSEMessage), true)
// Verify final response
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
require.Equal(t, "2.0", response["jsonrpc"])
require.Equal(t, float64(1), response["id"])
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
tools, ok := result["tools"].([]interface{})
require.True(t, ok)
require.Len(t, tools, 1)
host.CompleteHttp()
})
// Test tools/call with authentication
t.Run("SSE tools/call with authentication headers", func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerSSEWithAuthConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
toolsCallRequest := `{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "get_secure_data",
"arguments": {
"data_id": "secret-123"
}
}
}`
host.InitHttp()
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp-server.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
action = host.CallOnHttpRequestBody([]byte(toolsCallRequest))
require.Equal(t, types.ActionContinue, action)
// Mock SSE channel response
action = host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "text/event-stream"},
})
require.Equal(t, types.HeaderStopIteration, action)
// Send endpoint - this triggers initialize request
endpointSSEMessage := `event: endpoint
data: /sse/session-call-auth
`
host.CallOnHttpStreamingResponseBody([]byte(endpointSSEMessage), false)
// Verify initialize request has authentication header
httpCallouts := host.GetHttpCalloutAttributes()
require.Greater(t, len(httpCallouts), 0, "Expected at least one HTTP callout for initialize")
initCallout := httpCallouts[0]
require.True(t, test.HasHeaderWithValue(initCallout.Headers, "X-API-Key", "backend-default-key"),
"Initialize request should have X-API-Key header with default credential")
require.True(t, test.HasHeaderWithValue(initCallout.Headers, ":method", "POST"),
"Initialize request should use POST method")
// Mock initialize response
initResponse := `{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"SecureSSEServer","version":"1.0.0"}}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(initResponse))
// Send initialize SSE message - this triggers notification request
initSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"SecureSSEServer","version":"1.0.0"}}}
`
host.CallOnHttpStreamingResponseBody([]byte(initSSEMessage), false)
// Mock notification response - this will trigger tools/call request
notificationResponse := `{"jsonrpc":"2.0"}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(notificationResponse))
// After responding to notification, tools/call request should have been sent
// Verify tools/call request has authentication header
httpCallouts = host.GetHttpCalloutAttributes()
// Find the tools/call callout
var toolsCallCallout *proxytest.HttpCalloutAttribute
for i := len(httpCallouts) - 1; i >= 0; i-- {
if strings.Contains(string(httpCallouts[i].Body), "tools/call") {
toolsCallCallout = &httpCallouts[i]
break
}
}
require.NotNil(t, toolsCallCallout, "Expected to find tools/call HTTP callout")
require.True(t, test.HasHeaderWithValue(toolsCallCallout.Headers, "X-API-Key", "backend-default-key"),
"Tools/call request should have X-API-Key header with default credential")
require.True(t, test.HasHeaderWithValue(toolsCallCallout.Headers, ":method", "POST"),
"Tools/call request should use POST method")
// Verify request body contains tools/call and arguments
require.Contains(t, string(toolsCallCallout.Body), "tools/call", "Request body should contain tools/call method")
require.Contains(t, string(toolsCallCallout.Body), "get_secure_data", "Request body should contain tool name")
require.Contains(t, string(toolsCallCallout.Body), "secret-123", "Request body should contain arguments")
// Mock tools/call response
toolsCallResponse := `{"jsonrpc":"2.0","id":2,"result":{"content":[{"type":"text","text":"Secure data for secret-123"}]}}`
host.CallOnHttpCall([][2]string{
{":status", "200"},
{"content-type", "application/json"},
}, []byte(toolsCallResponse))
toolsCallSSEMessage := `event: message
data: {"jsonrpc":"2.0","id":2,"result":{"content":[{"type":"text","text":"Secure data for secret-123"}]}}
`
host.CallOnHttpStreamingResponseBody([]byte(toolsCallSSEMessage), true)
// Verify final response
responseBody := host.GetResponseBody()
require.NotEmpty(t, responseBody)
var response map[string]interface{}
err := json.Unmarshal(responseBody, &response)
require.NoError(t, err)
require.Equal(t, "2.0", response["jsonrpc"])
result, ok := response["result"].(map[string]interface{})
require.True(t, ok)
content, ok := result["content"].([]interface{})
require.True(t, ok)
require.Greater(t, len(content), 0)
host.CompleteHttp()
})
})
}
// -----------------------------------------------------------------------------
// Phase 2.1 — REST Server Call() branch matrix
//
// Each sub-test stands up a minimal REST MCP config whose single tool exercises
// a specific branch of rest_server.go:Call (~lines 523-946). The plugin runs the
// real tool, makes one ctx.RouteCall, and we mock the backend reply via
// CallOnHttpResponseHeaders/Body. We inspect the upstream request via
// GetRequestHeaders/Body and the MCP-shaped response via GetResponseBody.
// -----------------------------------------------------------------------------
func TestRestMCPServer_CallBranches(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
// -------------------------------------------------------------------
// argsToFormBody → upstream body is form-urlencoded; Content-Type set
// -------------------------------------------------------------------
t.Run("argsToFormBody encodes body as form-urlencoded", func(t *testing.T) {
cfg, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{"name": "rest-form", "type": "rest"},
"tools": []map[string]interface{}{{
"name": "submit",
"description": "form submit",
"args": []map[string]interface{}{
{"name": "user", "description": "u", "type": "string", "required": true},
{"name": "msg", "description": "m", "type": "string"},
},
"requestTemplate": map[string]interface{}{
"url": "http://backend.example/form",
"method": "POST",
"argsToFormBody": true,
},
}},
})
host, status := test.NewTestHost(cfg)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"},
{"content-type", "application/json"},
})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"submit","arguments":{"user":"alice 1","msg":"hi&there"}}}`))
upstreamHeaders := host.GetRequestHeaders()
upstreamBody := host.GetRequestBody()
ctHeader, has := test.GetHeaderValue(upstreamHeaders, "Content-Type")
require.True(t, has)
require.Contains(t, ctHeader, "application/x-www-form-urlencoded")
// `&` must be percent-encoded, space encoded as `+`.
require.Contains(t, string(upstreamBody), "user=alice+1")
require.Contains(t, string(upstreamBody), "msg=hi%26there")
host.CompleteHttp()
})
// -------------------------------------------------------------------
// argsToUrlParam → default args merged into URL query
// -------------------------------------------------------------------
t.Run("argsToUrlParam merges args into query", func(t *testing.T) {
cfg, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{"name": "rest-qp", "type": "rest"},
"tools": []map[string]interface{}{{
"name": "search",
"description": "search",
"args": []map[string]interface{}{
{"name": "q", "description": "q", "type": "string", "required": true},
{"name": "limit", "description": "lim", "type": "integer"},
},
"requestTemplate": map[string]interface{}{
"url": "http://backend.example/search?pre=set",
"method": "GET",
"argsToUrlParam": true,
},
}},
})
host, status := test.NewTestHost(cfg)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"},
{"content-type", "application/json"},
})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"search","arguments":{"q":"hello","limit":10}}}`))
upstreamHeaders := host.GetRequestHeaders()
pathVal, has := test.GetHeaderValue(upstreamHeaders, ":path")
require.True(t, has)
require.Contains(t, pathVal, "pre=set")
require.Contains(t, pathVal, "q=hello")
require.Contains(t, pathVal, "limit=10")
host.CompleteHttp()
})
// -------------------------------------------------------------------
// Direct-response tool → no backend call, response template fires
// -------------------------------------------------------------------
t.Run("direct response tool emits template result", func(t *testing.T) {
cfg, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{"name": "rest-dr", "type": "rest"},
"tools": []map[string]interface{}{{
"name": "ping",
"description": "static ping",
"args": []map[string]interface{}{
{"name": "name", "description": "n", "type": "string", "required": true},
},
// No requestTemplate.url → direct-response mode.
"responseTemplate": map[string]interface{}{
"body": "hello {{.args.name}}",
},
}},
})
host, status := test.NewTestHost(cfg)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"},
{"content-type", "application/json"},
})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"ping","arguments":{"name":"world"}}}`))
// Direct-response writes via SendLocalResponse, not the streaming response body.
localResp := host.GetLocalResponse()
require.NotNil(t, localResp, "direct-response must emit a local response with no backend call")
require.Contains(t, string(localResp.Data), "hello world")
host.CompleteHttp()
})
// -------------------------------------------------------------------
// Image content-type → SendMCPToolImageResult path (base64 in response)
// -------------------------------------------------------------------
t.Run("image content-type produces image MCP result", func(t *testing.T) {
cfg, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{"name": "rest-img", "type": "rest"},
"tools": []map[string]interface{}{{
"name": "get_image",
"description": "image fetch",
"args": []map[string]interface{}{{"name": "id", "description": "id", "type": "string"}},
"requestTemplate": map[string]interface{}{
"url": "http://backend.example/img/{{.args.id}}", "method": "GET",
},
}},
})
host, status := test.NewTestHost(cfg)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_image","arguments":{"id":"42"}}}`))
pngBytes := []byte{0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A}
host.CallOnHttpResponseHeaders([][2]string{{":status", "200"}, {"Content-Type", "image/png"}})
host.CallOnHttpResponseBody(pngBytes)
respBody := host.GetResponseBody()
require.NotEmpty(t, respBody)
require.Contains(t, string(respBody), `"type":"image"`)
require.Contains(t, string(respBody), `"mimeType":"image/png"`)
host.CompleteHttp()
})
// -------------------------------------------------------------------
// outputSchema + JSON backend → structuredContent populated
// -------------------------------------------------------------------
t.Run("outputSchema with JSON body emits structuredContent", func(t *testing.T) {
cfg, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{"name": "rest-os", "type": "rest"},
"tools": []map[string]interface{}{{
"name": "info",
"description": "info",
"args": []map[string]interface{}{},
"outputSchema": map[string]interface{}{"type": "object"},
"requestTemplate": map[string]interface{}{
"url": "http://backend.example/info", "method": "GET",
},
}},
})
host, status := test.NewTestHost(cfg)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"info","arguments":{}}}`))
host.CallOnHttpResponseHeaders([][2]string{{":status", "200"}, {"Content-Type", "application/json"}})
host.CallOnHttpResponseBody([]byte(`{"a":1,"b":"two"}`))
respBody := host.GetResponseBody()
require.NotEmpty(t, respBody)
require.Contains(t, string(respBody), "structuredContent", "outputSchema must produce structuredContent field")
host.CompleteHttp()
})
// -------------------------------------------------------------------
// errorResponseTemplate fires on 4xx/5xx, _headers is accessible
// -------------------------------------------------------------------
t.Run("errorResponseTemplate renders on backend error", func(t *testing.T) {
cfg, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{"name": "rest-err", "type": "rest"},
"tools": []map[string]interface{}{{
"name": "fail",
"description": "fail",
"args": []map[string]interface{}{},
"errorResponseTemplate": `upstream said: {{gjson "_headers.:status"}} - {{gjson "message"}}`,
"requestTemplate": map[string]interface{}{
"url": "http://backend.example/fail", "method": "GET",
},
}},
})
host, status := test.NewTestHost(cfg)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"fail","arguments":{}}}`))
host.CallOnHttpResponseHeaders([][2]string{{":status", "500"}, {"Content-Type", "application/json"}})
host.CallOnHttpResponseBody([]byte(`{"message":"boom"}`))
respBody := host.GetResponseBody()
require.NotEmpty(t, respBody)
require.Contains(t, string(respBody), "upstream said: 500 - boom")
require.Contains(t, string(respBody), `"isError":true`)
host.CompleteHttp()
})
// -------------------------------------------------------------------
// prependBody + appendBody wrap raw response
// -------------------------------------------------------------------
t.Run("prependBody/appendBody wrap raw response", func(t *testing.T) {
cfg, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{"name": "rest-wrap", "type": "rest"},
"tools": []map[string]interface{}{{
"name": "say",
"description": "say",
"args": []map[string]interface{}{},
"requestTemplate": map[string]interface{}{
"url": "http://backend.example/say", "method": "GET",
},
"responseTemplate": map[string]interface{}{
"prependBody": "<<",
"appendBody": ">>",
},
}},
})
host, status := test.NewTestHost(cfg)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"say","arguments":{}}}`))
host.CallOnHttpResponseHeaders([][2]string{{":status", "200"}, {"Content-Type", "text/plain"}})
host.CallOnHttpResponseBody([]byte("hi"))
respBody := host.GetResponseBody()
require.NotEmpty(t, respBody)
// Unmarshal to dodge JSON unicode-escape encoding of < and >.
var parsed map[string]interface{}
require.NoError(t, json.Unmarshal(respBody, &parsed))
result := parsed["result"].(map[string]interface{})
content := result["content"].([]interface{})
text := content[0].(map[string]interface{})["text"].(string)
require.Equal(t, "<<hi>>", text)
host.CompleteHttp()
})
// -------------------------------------------------------------------
// path arg with reserved chars → substituted into URL template
// -------------------------------------------------------------------
t.Run("path arg substitutes into URL placeholder", func(t *testing.T) {
cfg, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{"name": "rest-path", "type": "rest"},
"tools": []map[string]interface{}{{
"name": "get_user",
"description": "by id",
"args": []map[string]interface{}{
{"name": "id", "description": "uid", "type": "string", "required": true, "position": "path"},
},
"requestTemplate": map[string]interface{}{
"url": "http://backend.example/users/{id}",
"method": "GET",
},
}},
})
host, status := test.NewTestHost(cfg)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_user","arguments":{"id":"alice%20smith"}}}`))
upstreamHeaders := host.GetRequestHeaders()
pathVal, has := test.GetHeaderValue(upstreamHeaders, ":path")
require.True(t, has)
// Path arg is substituted as-is into the URL template before being parsed.
require.Contains(t, pathVal, "/users/alice")
host.CompleteHttp()
})
// -------------------------------------------------------------------
// header arg from args list lands as upstream header
// -------------------------------------------------------------------
t.Run("header-position arg becomes upstream header", func(t *testing.T) {
cfg, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{"name": "rest-hdr", "type": "rest"},
"tools": []map[string]interface{}{{
"name": "auth_call",
"description": "with header",
"args": []map[string]interface{}{
{"name": "X-Trace", "description": "trace", "type": "string", "required": true, "position": "header"},
},
"requestTemplate": map[string]interface{}{
"url": "http://backend.example/protected",
"method": "GET",
},
}},
})
host, status := test.NewTestHost(cfg)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"auth_call","arguments":{"X-Trace":"abc-123"}}}`))
upstreamHeaders := host.GetRequestHeaders()
require.True(t, test.HasHeaderWithValue(upstreamHeaders, "X-Trace", "abc-123"))
host.CompleteHttp()
})
})
}
// -----------------------------------------------------------------------------
// Phase 2.2 — SSE state machine error / edge paths
//
// Drive sse_proxy.go's handleSSEStreamingResponse + handleWaitingEndpoint /
// handleWaitingInitResp / handleWaitingToolResp through error branches that
// the happy-path TestMcpProxyServerSSE* tests don't cover.
// -----------------------------------------------------------------------------
func TestMcpProxyServerSSE_NonSSEContentTypeRejected(t *testing.T) {
// Backend returns text/plain instead of text/event-stream → first-chunk
// content-type validation must inject a JSON-RPC error.
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerSSEConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"},
{"content-type", "application/json"},
})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}`))
// First-chunk: backend returned a non-SSE content-type.
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "application/json"},
})
host.CallOnHttpStreamingResponseBody([]byte(`{"not":"sse"}`), true)
respBody := host.GetResponseBody()
require.NotEmpty(t, respBody)
require.Contains(t, string(respBody), "error", "rejected non-SSE response must surface a JSON-RPC error")
host.CompleteHttp()
})
}
func TestMcpProxyServerSSE_CharsetSuffixAccepted(t *testing.T) {
// content-type: text/event-stream;charset=utf-8 must still be accepted
// (substring match on text/event-stream).
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerSSEConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"},
{"content-type", "application/json"},
})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}`))
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "text/event-stream; charset=utf-8"},
})
// Endpoint event triggers initialize — we don't drive past this,
// just confirm content-type-suffix wasn't rejected (no error in body).
host.CallOnHttpStreamingResponseBody([]byte("event: endpoint\ndata: /sse/abc\n\n"), false)
// No injected error yet — buffer is just being consumed.
// Local response should NOT have been set with an error.
localResp := host.GetLocalResponse()
if localResp != nil {
require.NotContains(t, string(localResp.Data), "invalid content-type",
"text/event-stream with charset suffix must NOT be rejected")
}
host.CompleteHttp()
})
}
func TestMcpProxyServerSSE_EndpointSkipsUnrelatedEvents(t *testing.T) {
// Send a `ping` event before `endpoint` — the state machine must skip
// non-endpoint messages while in WaitingEndpoint and not error out.
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerSSEConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}`))
host.CallOnHttpResponseHeaders([][2]string{{":status", "200"}, {"content-type", "text/event-stream"}})
// ping first — must be ignored.
host.CallOnHttpStreamingResponseBody([]byte("event: ping\ndata: keep-alive\n\n"), false)
// Then the real endpoint event.
host.CallOnHttpStreamingResponseBody([]byte("event: endpoint\ndata: /sse/session-xyz\n\n"), false)
// Verify the initialize request was sent upstream (proxy moved past WaitingEndpoint).
callouts := host.GetHttpCalloutAttributes()
require.NotEmpty(t, callouts, "endpoint event should trigger an initialize HTTP callout")
var sawInit bool
for _, c := range callouts {
if strings.Contains(string(c.Body), `"method":"initialize"`) {
sawInit = true
break
}
}
require.True(t, sawInit, "an initialize JSON-RPC call must have been sent after endpoint event")
host.CompleteHttp()
})
}
// -----------------------------------------------------------------------------
// Phase 2.3 — mcp-proxy Initialize callback error paths (proxy_tool.go:107-204)
// -----------------------------------------------------------------------------
func TestMcpProxyServer_InitializeBackend500(t *testing.T) {
// Initialize HTTP callback receives non-2xx status → must inject error response.
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}`))
// Backend returns 500 on the initialize call.
host.CallOnHttpCall([][2]string{{":status", "500"}, {"content-type", "application/json"}}, []byte(`{"error":"down"}`))
// Errors go through SendLocalResponse, not the streaming body.
localResp := host.GetLocalResponse()
require.NotNil(t, localResp, "non-200 initialize response must inject a local error response")
require.Contains(t, string(localResp.Data), "error")
host.CompleteHttp()
})
}
func TestMcpProxyServer_InitializeMalformedJSON(t *testing.T) {
// Initialize response body is not valid JSON → parse error path.
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}`))
host.CallOnHttpCall([][2]string{{":status", "200"}, {"content-type", "application/json"}}, []byte(`{not valid json`))
localResp := host.GetLocalResponse()
require.NotNil(t, localResp, "unparseable initialize response must inject a local error response")
require.Contains(t, string(localResp.Data), "error")
host.CompleteHttp()
})
}
func TestMcpProxyServer_InitializeSSEContentType(t *testing.T) {
// Initialize response carries text/event-stream → parseSSEResponse path.
// We send a valid SSE-wrapped JSON-RPC success body so the proxy unwraps and
// progresses past initialize. End-to-end completion is not the point — we
// just need to drive the SSE-unwrap branch.
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}`))
// SSE-wrapped initialize success response — proxy must extract the data line.
sseInit := "event: message\ndata: " + `{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-03-26","capabilities":{"tools":{}},"serverInfo":{"name":"X","version":"1"}}}` + "\n\n"
host.CallOnHttpCall([][2]string{{":status", "200"}, {"content-type", "text/event-stream"}, {"mcp-session-id", "sse-session"}}, []byte(sseInit))
// If SSE-unwrap worked, the proxy will have moved to sending the initialized notification.
// Look for a 2nd outbound callout — its presence confirms initialize succeeded.
callouts := host.GetHttpCalloutAttributes()
var sawNotification bool
for _, c := range callouts {
if strings.Contains(string(c.Body), "notifications/initialized") {
sawNotification = true
break
}
}
require.True(t, sawNotification, "SSE-wrapped initialize response must be unwrapped so notification fires")
host.CompleteHttp()
})
}
func TestMcpProxyServer_InitializeUnknownErrorCode(t *testing.T) {
// Initialize returns a JSON-RPC error with a code OTHER than -32602
// → generic "backend initialization failed" path.
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}`))
// JSON-RPC error with code != -32602.
host.CallOnHttpCall([][2]string{{":status", "200"}, {"content-type", "application/json"}},
[]byte(`{"jsonrpc":"2.0","id":1,"error":{"code":-32603,"message":"internal"}}`))
localResp := host.GetLocalResponse()
require.NotNil(t, localResp, "unknown error code must inject a local error response")
require.Contains(t, string(localResp.Data), "error")
host.CompleteHttp()
})
}
// -----------------------------------------------------------------------------
// Phase 2.4 — plugin.go HOST entry edge cases (onHttpRequestHeaders)
// -----------------------------------------------------------------------------
func TestPlugin_GetMethodRejected(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(restMCPServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "x"}, {":method", "GET"}, {":path", "/mcp"},
})
require.Equal(t, types.HeaderStopAllIterationAndWatermark, action)
localResp := host.GetLocalResponse()
require.NotNil(t, localResp, "GET must produce a local 405 response")
require.Equal(t, uint32(405), localResp.StatusCode)
host.CompleteHttp()
})
}
func TestPlugin_DeleteMethodRejected(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(restMCPServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "x"}, {":method", "DELETE"}, {":path", "/mcp"},
})
require.Equal(t, types.HeaderStopAllIterationAndWatermark, action)
localResp := host.GetLocalResponse()
require.NotNil(t, localResp, "DELETE must produce a local 405 response")
require.Equal(t, uint32(405), localResp.StatusCode)
host.CompleteHttp()
})
}
// Note: the wasm-go test harness always reports a request body as present, so
// the "POST with no body → 400" branch is not exercisable here. The dedicated
// pure-test for ctx.HasRequestBody() handling lives in pkg/mcp/server tests.
func TestPlugin_McpProtocolVersionHeaderStripped(t *testing.T) {
// MCP-Protocol-Version is parsed and removed; request continues normally.
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(restMCPServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"},
{"content-type", "application/json"},
{"MCP-Protocol-Version", "2025-03-26"},
})
// Valid version: header is consumed, request continues.
require.Equal(t, types.HeaderStopIteration, action)
// The header should have been removed before forwarding.
_, stillPresent := test.GetHeaderValue(host.GetRequestHeaders(), "MCP-Protocol-Version")
require.False(t, stillPresent, "MCP-Protocol-Version header must be stripped from forwarded request")
host.CompleteHttp()
})
}
func TestPlugin_McpProtocolVersionUnsupportedStillStripped(t *testing.T) {
// Unsupported version logs a warning but still strips the header and continues.
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(restMCPServerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"},
{"content-type", "application/json"},
{"MCP-Protocol-Version", "9999-99-99"},
})
require.Equal(t, types.HeaderStopIteration, action)
_, stillPresent := test.GetHeaderValue(host.GetRequestHeaders(), "MCP-Protocol-Version")
require.False(t, stillPresent, "even unsupported MCP-Protocol-Version must be stripped")
host.CompleteHttp()
})
}
func TestMcpProxyServerSSE_PartialChunkBuffered(t *testing.T) {
// SSE event arrives split across two chunks — first chunk has no terminator,
// second chunk completes the message. Proxy must wait, not error.
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyServerSSEConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{{":authority", "x"}, {":method", "POST"}, {":path", "/mcp"}, {"content-type", "application/json"}})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}`))
host.CallOnHttpResponseHeaders([][2]string{{":status", "200"}, {"content-type", "text/event-stream"}})
// First chunk: prefix only, no blank line terminator.
host.CallOnHttpStreamingResponseBody([]byte("event: endpoint\ndata: /sse/sess"), false)
// No callouts yet — message not yet complete.
require.Empty(t, host.GetHttpCalloutAttributes(),
"incomplete chunk must not trigger any upstream calls")
// Second chunk: completes data + blank line.
host.CallOnHttpStreamingResponseBody([]byte("ion-split\n\n"), false)
// Now initialize should have been sent.
callouts := host.GetHttpCalloutAttributes()
require.NotEmpty(t, callouts, "complete endpoint event must trigger initialize")
host.CompleteHttp()
})
}
// -----------------------------------------------------------------------------
// Phase 2.5 — ExtractAndRemoveIncomingCredential (HOST end-to-end)
//
// The pure function lives in pkg/mcp/server/auth_utils.go and is HOST-coupled
// because it calls proxywasm.GetHttpRequestHeader / RemoveHttpRequestHeader.
// We exercise it via the mcp-proxy tools/list path, which is the cleanest entry
// that hits ExtractAndRemoveIncomingCredential before any upstream call.
//
// Server shape: downstreamSecurity with Passthrough=true → the credential
// extracted from the *incoming* request is reused on the *upstream* request.
// Verifying the upstream callout headers proves both the extraction and the
// removal worked.
// -----------------------------------------------------------------------------
// mcpProxyPassthroughApiKeyConfig — downstream apiKey/header passthrough to
// upstream apiKey/header. The two schemes have different header names so we
// can independently assert (a) downstream header was removed and (b) upstream
// header carries the passthrough value.
var mcpProxyPassthroughApiKeyConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{
"name": "proxy-passthrough-apikey",
"type": "mcp-proxy",
"transport": "http",
"mcpServerURL": "http://backend-mcp.example.com/mcp",
"timeout": 5000,
"defaultDownstreamSecurity": map[string]interface{}{
"id": "ClientKey",
"passthrough": true,
},
"defaultUpstreamSecurity": map[string]interface{}{
"id": "BackendKey",
},
"securitySchemes": []map[string]interface{}{
{
"id": "ClientKey",
"type": "apiKey",
"in": "header",
"name": "X-Client-Key",
},
{
"id": "BackendKey",
"type": "apiKey",
"in": "header",
"name": "X-Backend-Key",
"defaultCredential": "fallback-default",
},
},
},
"tools": []map[string]interface{}{
{"name": "noop", "type": "mcp-proxy", "description": "noop"},
},
})
return data
}()
func TestExtractAndRemoveIncomingCredential_ApiKeyHeaderPassthrough(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyPassthroughApiKeyConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
{"X-Client-Key", "secret-from-client"},
})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}`))
callouts := host.GetHttpCalloutAttributes()
require.NotEmpty(t, callouts, "tools/list must trigger an initialize callout")
init := callouts[0]
// Passthrough credential rides on the upstream scheme's header name.
require.True(t, test.HasHeaderWithValue(init.Headers, "X-Backend-Key", "secret-from-client"),
"upstream initialize must carry the extracted client credential under the upstream header name")
// The downstream header itself must NOT leak through to the upstream call.
if v, present := test.GetHeaderValue(init.Headers, "X-Client-Key"); present {
t.Errorf("downstream credential header X-Client-Key must be removed; got %q", v)
}
host.CompleteHttp()
})
}
// mcpProxyPassthroughBearerConfig — downstream http/bearer passthrough to
// upstream http/bearer. Stripping `Bearer ` from the incoming Authorization
// is part of ExtractAndRemoveIncomingCredential's contract.
var mcpProxyPassthroughBearerConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"server": map[string]interface{}{
"name": "proxy-passthrough-bearer",
"type": "mcp-proxy",
"transport": "http",
"mcpServerURL": "http://backend-mcp.example.com/mcp",
"timeout": 5000,
"defaultDownstreamSecurity": map[string]interface{}{
"id": "ClientBearer",
"passthrough": true,
},
"defaultUpstreamSecurity": map[string]interface{}{
"id": "BackendBearer",
},
"securitySchemes": []map[string]interface{}{
{"id": "ClientBearer", "type": "http", "scheme": "bearer"},
{"id": "BackendBearer", "type": "http", "scheme": "bearer", "defaultCredential": "default-token"},
},
},
"tools": []map[string]interface{}{
{"name": "noop", "type": "mcp-proxy", "description": "noop"},
},
})
return data
}()
func TestExtractAndRemoveIncomingCredential_HttpBearerPassthrough(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyPassthroughBearerConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
{"Authorization", "Bearer client-token-xyz"},
})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}`))
callouts := host.GetHttpCalloutAttributes()
require.NotEmpty(t, callouts, "tools/list must trigger an initialize callout")
init := callouts[0]
// Upstream Authorization must use the EXTRACTED token (Bearer prefix
// stripped on the way in, re-applied on the way out by ApplySecurity).
authValue, present := test.GetHeaderValue(init.Headers, "Authorization")
require.True(t, present, "upstream must carry Authorization for upstream bearer scheme")
require.Equal(t, "Bearer client-token-xyz", authValue,
"passthrough token must round-trip as `Bearer <token>` to upstream")
host.CompleteHttp()
})
}
// Missing downstream header — ExtractAndRemoveIncomingCredential returns ""
// (no error). With Passthrough=true but no incoming credential, passthrough
// is skipped and the upstream scheme falls back to its DefaultCredential.
func TestExtractAndRemoveIncomingCredential_MissingHeaderFallsBackToDefault(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
host, status := test.NewTestHost(mcpProxyPassthroughApiKeyConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.InitHttp()
// Note: no X-Client-Key header on the way in.
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "mcp.example.com"},
{":method", "POST"},
{":path", "/mcp"},
{"content-type", "application/json"},
})
host.CallOnHttpRequestBody([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}`))
callouts := host.GetHttpCalloutAttributes()
require.NotEmpty(t, callouts, "tools/list must trigger an initialize callout")
init := callouts[0]
// Missing client credential → not an error, just fall through to default.
require.True(t, test.HasHeaderWithValue(init.Headers, "X-Backend-Key", "fallback-default"),
"missing client credential must NOT cause an error; upstream falls back to DefaultCredential")
host.CompleteHttp()
})
}