feat: route bedrock messages through mantle (#3820)

Signed-off-by: wydream <yaodiwu618@gmail.com>
Signed-off-by: woody <yaodiwu618@gmail.com>
This commit is contained in:
woody
2026-05-20 18:13:04 +08:00
committed by GitHub
parent 739d47ba9c
commit 37b038f797
5 changed files with 327 additions and 21 deletions

View File

@@ -11,6 +11,7 @@ import (
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
"github.com/higress-group/wasm-go/pkg/test"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
)
// Test config: Basic Bedrock config with AWS Access Key/Secret Key (AWS Signature V4)
@@ -116,6 +117,37 @@ var bedrockApiTokenConfig = func() json.RawMessage {
return data
}()
var bedrockMantleApiTokenConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"provider": map[string]interface{}{
"type": "bedrock",
"apiTokens": []string{
"test-token-for-unit-test",
},
"awsRegion": "us-east-1",
"modelMapping": map[string]string{
"*": "anthropic.claude-opus-4-7",
},
},
})
return data
}()
var bedrockMantleAkSkConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"provider": map[string]interface{}{
"type": "bedrock",
"awsAccessKey": "test-ak-for-unit-test",
"awsSecretKey": "test-sk-for-unit-test",
"awsRegion": "us-east-1",
"modelMapping": map[string]string{
"*": "anthropic.claude-opus-4-7",
},
},
})
return data
}()
func bedrockApiTokenConfigWithCachePointPositions(positions map[string]bool) json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"provider": map[string]interface{}{
@@ -425,6 +457,136 @@ func RunBedrockOnHttpRequestBodyTests(t *testing.T) {
require.Contains(t, pathValue, "/converse", "Path should contain converse endpoint")
})
t.Run("bedrock anthropic messages request should use mantle endpoint and preserve native body", func(t *testing.T) {
host, status := test.NewTestHost(bedrockMantleApiTokenConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/v1/messages"},
{":method", "POST"},
{"Content-Type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
requestHeaders := host.GetRequestHeaders()
hostValue, hasHost := test.GetHeaderValue(requestHeaders, ":authority")
require.True(t, hasHost, "Host header should exist")
require.Equal(t, "bedrock-mantle.us-east-1.api.aws", hostValue)
pathValue, hasPath := test.GetHeaderValue(requestHeaders, ":path")
require.True(t, hasPath, "Path header should exist")
require.Equal(t, "/anthropic/v1/messages", pathValue)
apiKeyValue, hasAPIKey := test.GetHeaderValue(requestHeaders, "x-api-key")
require.True(t, hasAPIKey, "x-api-key header should exist")
require.Equal(t, "test-token-for-unit-test", apiKeyValue)
_, hasAuth := test.GetHeaderValue(requestHeaders, "Authorization")
require.False(t, hasAuth, "Authorization header should not be sent for Mantle token auth")
anthropicVersion, hasAnthropicVersion := test.GetHeaderValue(requestHeaders, "anthropic-version")
require.True(t, hasAnthropicVersion, "anthropic-version header should exist")
require.Equal(t, "2023-06-01", anthropicVersion)
requestBody := `{
"model": "claude-request-model",
"max_tokens": 1024,
"messages": [{
"role": "assistant",
"content": [{
"type": "tool_use",
"id": "toolu_1",
"name": "list_items",
"input": {}
}]
}]
}`
action = host.CallOnHttpRequestBody([]byte(requestBody))
require.Equal(t, types.ActionContinue, action)
processedBody := host.GetRequestBody()
require.Equal(t, "anthropic.claude-opus-4-7", gjson.GetBytes(processedBody, "model").String())
require.Equal(t, "{}", gjson.GetBytes(processedBody, "messages.0.content.0.input").Raw)
require.False(t, gjson.GetBytes(processedBody, "inferenceConfig").Exists(), "native Anthropic body should not be converted to Bedrock Converse")
require.False(t, gjson.GetBytes(processedBody, "toolConfig").Exists(), "native Anthropic tools should not be converted to Bedrock Converse")
requestHeaders = host.GetRequestHeaders()
pathValue, hasPath = test.GetHeaderValue(requestHeaders, ":path")
require.True(t, hasPath, "Path header should exist")
require.Equal(t, "/anthropic/v1/messages", pathValue)
})
t.Run("bedrock anthropic messages request with ak sk should sign for mantle service", func(t *testing.T) {
host, status := test.NewTestHost(bedrockMantleAkSkConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/v1/messages"},
{":method", "POST"},
{"Content-Type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
requestBody := `{
"model": "claude-request-model",
"max_tokens": 1024,
"messages": [{"role": "user", "content": "Hello"}]
}`
action = host.CallOnHttpRequestBody([]byte(requestBody))
require.Equal(t, types.ActionContinue, action)
requestHeaders := host.GetRequestHeaders()
hostValue, hasHost := test.GetHeaderValue(requestHeaders, ":authority")
require.True(t, hasHost, "Host header should exist")
require.Equal(t, "bedrock-mantle.us-east-1.api.aws", hostValue)
pathValue, hasPath := test.GetHeaderValue(requestHeaders, ":path")
require.True(t, hasPath, "Path header should exist")
require.Equal(t, "/anthropic/v1/messages", pathValue)
authValue, hasAuth := test.GetHeaderValue(requestHeaders, "Authorization")
require.True(t, hasAuth, "Authorization header should exist")
require.Contains(t, authValue, "AWS4-HMAC-SHA256")
require.Contains(t, authValue, "/bedrock-mantle/aws4_request")
})
t.Run("bedrock anthropic messages streaming request should keep mantle path and accept sse", func(t *testing.T) {
host, status := test.NewTestHost(bedrockMantleApiTokenConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/v1/messages"},
{":method", "POST"},
{"Content-Type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
requestBody := `{
"model": "claude-request-model",
"max_tokens": 1024,
"messages": [{"role": "user", "content": "Hello"}],
"stream": true
}`
action = host.CallOnHttpRequestBody([]byte(requestBody))
require.Equal(t, types.ActionContinue, action)
requestHeaders := host.GetRequestHeaders()
pathValue, hasPath := test.GetHeaderValue(requestHeaders, ":path")
require.True(t, hasPath, "Path header should exist")
require.Equal(t, "/anthropic/v1/messages", pathValue)
acceptValue, hasAccept := test.GetHeaderValue(requestHeaders, "Accept")
require.True(t, hasAccept, "Accept header should exist")
require.Equal(t, "text/event-stream", acceptValue)
})
t.Run("bedrock request body prompt cache in-memory should inject system cache point only by default", func(t *testing.T) {
host, status := test.NewTestHost(bedrockApiTokenConfig)
defer host.Reset()
@@ -1821,6 +1983,40 @@ func RunBedrockOnHttpResponseBodyTests(t *testing.T) {
_, hasCacheWriteTokens := promptTokensDetails["cache_write_tokens"]
require.False(t, hasCacheWriteTokens, "cache_write_tokens should not exist in OpenAI-compatible usage")
})
t.Run("bedrock anthropic messages response body should pass through", func(t *testing.T) {
host, status := test.NewTestHost(bedrockMantleApiTokenConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/v1/messages"},
{":method", "POST"},
{"Content-Type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
requestBody := `{
"model": "claude-request-model",
"max_tokens": 1024,
"messages": [{"role": "user", "content": "Hello"}]
}`
action = host.CallOnHttpRequestBody([]byte(requestBody))
require.Equal(t, types.ActionContinue, action)
host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream"))
action = host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"Content-Type", "application/json"},
})
require.Equal(t, types.ActionContinue, action)
responseBody := `{"id":"msg_01","type":"message","role":"assistant","model":"anthropic.claude-opus-4-7","content":[{"type":"text","text":"Hello"}],"stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":2}}`
action = host.CallOnHttpResponseBody([]byte(responseBody))
require.Equal(t, types.ActionContinue, action)
require.JSONEq(t, responseBody, string(host.GetResponseBody()))
})
})
}
@@ -1840,6 +2036,41 @@ func RunBedrockOnStreamingResponseBodyTests(t *testing.T) {
require.Equal(t, "", payload)
})
t.Run("bedrock anthropic messages streaming response should pass through mantle sse", func(t *testing.T) {
host, status := test.NewTestHost(bedrockMantleApiTokenConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/v1/messages"},
{":method", "POST"},
{"Content-Type", "application/json"},
})
require.Equal(t, types.HeaderStopIteration, action)
requestBody := `{
"model": "claude-request-model",
"max_tokens": 1024,
"messages": [{"role": "user", "content": "Hello"}],
"stream": true
}`
action = host.CallOnHttpRequestBody([]byte(requestBody))
require.Equal(t, types.ActionContinue, action)
host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream"))
action = host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"Content-Type", "text/event-stream; charset=utf-8"},
})
require.Equal(t, types.ActionContinue, action)
chunk := []byte("event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hi\"}}\n\n")
action = host.CallOnHttpStreamingResponseBody(chunk, false)
require.Equal(t, types.ActionContinue, action)
require.Equal(t, chunk, host.GetResponseBody())
})
t.Run("bedrock streaming parallel tool calls should use dense OpenAI indexes", func(t *testing.T) {
host, status := test.NewTestHost(bedrockApiTokenConfig)
defer host.Reset()