From 37b038f7976d4a105efaff01697685ebf56ec21d Mon Sep 17 00:00:00 2001 From: woody Date: Wed, 20 May 2026 18:13:04 +0800 Subject: [PATCH] feat: route bedrock messages through mantle (#3820) Signed-off-by: wydream Signed-off-by: woody --- plugins/wasm-go/extensions/ai-proxy/README.md | 2 + .../wasm-go/extensions/ai-proxy/README_EN.md | 2 + .../extensions/ai-proxy/provider/bedrock.go | 108 ++++++-- .../extensions/ai-proxy/provider/provider.go | 5 +- .../extensions/ai-proxy/test/bedrock.go | 231 ++++++++++++++++++ 5 files changed, 327 insertions(+), 21 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-proxy/README.md b/plugins/wasm-go/extensions/ai-proxy/README.md index 45a0d2b6..9ba80ac7 100644 --- a/plugins/wasm-go/extensions/ai-proxy/README.md +++ b/plugins/wasm-go/extensions/ai-proxy/README.md @@ -2325,6 +2325,8 @@ Vertex AI 支持的分辨率(imageSize):`1k`、`2k`、`4k` ### 使用 OpenAI 协议代理 AWS Bedrock 服务 +对于 Bedrock,`/v1/chat/completions` 会继续转换为 Bedrock Runtime Converse API;`/v1/messages` 会直接转发到 Bedrock Mantle 的 Anthropic Messages API:`https://bedrock-mantle.{awsRegion}.api.aws/anthropic/v1/messages`,请求体、响应体和流式 SSE 都保持 Anthropic 原生格式,仅执行模型映射和认证处理。使用 `apiTokens` 访问 Mantle 时,插件会写入 `x-api-key` 请求头。 + AWS Bedrock 支持两种认证方式: #### 方式一:使用 AWS Access Key/Secret Key 认证(AWS Signature V4) diff --git a/plugins/wasm-go/extensions/ai-proxy/README_EN.md b/plugins/wasm-go/extensions/ai-proxy/README_EN.md index 93924b13..5a119d8f 100644 --- a/plugins/wasm-go/extensions/ai-proxy/README_EN.md +++ b/plugins/wasm-go/extensions/ai-proxy/README_EN.md @@ -2101,6 +2101,8 @@ Vertex AI supported resolutions (imageSize): `1k`, `2k`, `4k` ### Utilizing OpenAI Protocol Proxy for AWS Bedrock Services +For Bedrock, `/v1/chat/completions` continues to be converted to the Bedrock Runtime Converse API. `/v1/messages` is forwarded directly to the Bedrock Mantle Anthropic Messages API: `https://bedrock-mantle.{awsRegion}.api.aws/anthropic/v1/messages`. The request body, response body, and streaming SSE keep the native Anthropic format; the plugin only applies model mapping and authentication handling. When `apiTokens` are used with Mantle, the plugin sends the token in the `x-api-key` request header. + AWS Bedrock supports two authentication methods: #### Method 1: Using AWS Access Key/Secret Key Authentication (AWS Signature V4) diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/bedrock.go b/plugins/wasm-go/extensions/ai-proxy/provider/bedrock.go index e0cbbac9..013898a2 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/bedrock.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/bedrock.go @@ -23,26 +23,32 @@ import ( "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" "github.com/higress-group/wasm-go/pkg/log" "github.com/higress-group/wasm-go/pkg/wrapper" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" ) const ( - httpPostMethod = "POST" - awsService = "bedrock" + httpPostMethod = "POST" + awsServiceBedrock = "bedrock" + awsServiceBedrockMantle = "bedrock-mantle" // bedrock-runtime.{awsRegion}.amazonaws.com bedrockDefaultDomain = "bedrock-runtime.%s.amazonaws.com" + // bedrock-mantle.{awsRegion}.api.aws + bedrockMantleDomain = "bedrock-mantle.%s.api.aws" // converse路径 /model/{modelId}/converse bedrockChatCompletionPath = "/model/%s/converse" // converseStream路径 /model/{modelId}/converse-stream bedrockStreamChatCompletionPath = "/model/%s/converse-stream" // invoke_model 路径 /model/{modelId}/invoke - bedrockInvokeModelPath = "/model/%s/invoke" - bedrockSignedHeaders = "host;x-amz-date" - requestIdHeader = "X-Amzn-Requestid" - bedrockCacheTypeDefault = "default" - bedrockCacheTTL5m = "5m" - bedrockCacheTTL1h = "1h" - bedrockPromptCacheNova = "amazon.nova" - bedrockPromptCacheClaude = "anthropic.claude" + bedrockInvokeModelPath = "/model/%s/invoke" + bedrockMantleMessagesPath = "/anthropic/v1/messages" + bedrockSignedHeaders = "host;x-amz-date" + requestIdHeader = "X-Amzn-Requestid" + bedrockCacheTypeDefault = "default" + bedrockCacheTTL5m = "5m" + bedrockCacheTTL1h = "1h" + bedrockPromptCacheNova = "amazon.nova" + bedrockPromptCacheClaude = "anthropic.claude" bedrockCachePointPositionSystemPrompt = "systemPrompt" bedrockCachePointPositionLastUserMessage = "lastUserMessage" @@ -73,8 +79,9 @@ func (b *bedrockProviderInitializer) ValidateConfig(config *ProviderConfig) erro func (b *bedrockProviderInitializer) DefaultCapabilities() map[string]string { return map[string]string{ - string(ApiNameChatCompletion): bedrockChatCompletionPath, - string(ApiNameImageGeneration): bedrockInvokeModelPath, + string(ApiNameChatCompletion): bedrockChatCompletionPath, + string(ApiNameAnthropicMessages): bedrockMantleMessagesPath, + string(ApiNameImageGeneration): bedrockInvokeModelPath, } } @@ -92,6 +99,10 @@ type bedrockProvider struct { } func (b *bedrockProvider) OnStreamingResponseBody(ctx wrapper.HttpContext, name ApiName, chunk []byte, isLastChunk bool) ([]byte, error) { + if name == ApiNameAnthropicMessages { + return chunk, nil + } + events := extractAmazonEventStreamEvents(ctx, chunk) if len(events) == 0 { if isLastChunk { @@ -718,6 +729,18 @@ func (b *bedrockProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiN } func (b *bedrockProvider) TransformRequestHeaders(ctx wrapper.HttpContext, apiName ApiName, headers http.Header) { + if apiName == ApiNameAnthropicMessages { + util.OverwriteRequestHostHeader(headers, fmt.Sprintf(bedrockMantleDomain, strings.TrimSpace(b.config.awsRegion))) + util.OverwriteRequestPathHeaderByCapability(headers, string(apiName), b.config.capabilities) + headers.Set("anthropic-version", b.anthropicVersion()) + + if len(b.config.apiTokens) > 0 { + headers.Set("x-api-key", b.config.GetApiTokenInUse(ctx)) + headers.Del(util.HeaderAuthorization) + } + return + } + util.OverwriteRequestHostHeader(headers, fmt.Sprintf(bedrockDefaultDomain, strings.TrimSpace(b.config.awsRegion))) // If apiTokens is configured, set Bearer token authentication here @@ -733,7 +756,7 @@ func (b *bedrockProvider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName // and only apply auth headers. if b.config.IsOriginal() { headers := util.GetRequestHeaders() - b.setAuthHeaders(body, headers) + b.setAuthHeaders(apiName, body, headers) util.ReplaceRequestHeaders(headers) return types.ActionContinue, replaceRequestBody(body) } @@ -750,6 +773,8 @@ func (b *bedrockProvider) TransformRequestBodyHeaders(ctx wrapper.HttpContext, a switch apiName { case ApiNameChatCompletion: transformedBody, err = b.onChatCompletionRequestBody(ctx, body, headers) + case ApiNameAnthropicMessages: + transformedBody, err = b.onAnthropicMessagesRequestBody(ctx, body, headers) case ApiNameImageGeneration: transformedBody, err = b.onImageGenerationRequestBody(ctx, body, headers) default: @@ -762,7 +787,7 @@ func (b *bedrockProvider) TransformRequestBodyHeaders(ctx wrapper.HttpContext, a // Always apply auth after request body/path are finalized. // For Bearer token mode this is a no-op; for AK/SK mode this generates SigV4 headers. - b.setAuthHeaders(transformedBody, headers) + b.setAuthHeaders(apiName, transformedBody, headers) return transformedBody, nil } @@ -770,6 +795,8 @@ func (b *bedrockProvider) TransformResponseBody(ctx wrapper.HttpContext, apiName switch apiName { case ApiNameChatCompletion: return b.onChatCompletionResponseBody(ctx, body) + case ApiNameAnthropicMessages: + return body, nil case ApiNameImageGeneration: return b.onImageGenerationResponseBody(body) } @@ -797,6 +824,28 @@ func (b *bedrockProvider) onImageGenerationRequestBody(ctx wrapper.HttpContext, return b.buildBedrockImageGenerationRequest(request, headers) } +func (b *bedrockProvider) onAnthropicMessagesRequestBody(ctx wrapper.HttpContext, body []byte, headers http.Header) ([]byte, error) { + if gjson.GetBytes(body, "stream").Bool() { + headers.Set("Accept", "text/event-stream") + ctx.SetContext(ctxKeyIsStreaming, true) + } else { + ctx.SetContext(ctxKeyIsStreaming, false) + } + + model := gjson.GetBytes(body, "model").String() + if err := b.config.mapModel(ctx, &model); err != nil { + return nil, err + } + return sjson.SetBytes(body, "model", model) +} + +func (b *bedrockProvider) anthropicVersion() string { + if b.config.apiVersion != "" { + return b.config.apiVersion + } + return claudeDefaultVersion +} + func (b *bedrockProvider) buildBedrockImageGenerationRequest(origRequest *imageGenerationRequest, headers http.Header) ([]byte, error) { width, height := 1024, 1024 pairs := strings.Split(origRequest.Size, "x") @@ -1587,7 +1636,7 @@ func claudeToolResultBlockToBedrock(block claudeChatMessageContent) *toolResultB return result } -func (b *bedrockProvider) setAuthHeaders(body []byte, headers http.Header) { +func (b *bedrockProvider) setAuthHeaders(apiName ApiName, body []byte, headers http.Header) { // Bearer token authentication is already set in TransformRequestHeaders // This function only handles AWS SigV4 authentication which requires the request body if len(b.config.apiTokens) > 0 { @@ -1601,32 +1650,51 @@ func (b *bedrockProvider) setAuthHeaders(body []byte, headers http.Header) { amzDate := t.Format("20060102T150405Z") dateStamp := t.Format("20060102") path := headers.Get(":path") - signature := b.generateSignature(path, amzDate, dateStamp, body) + service := bedrockAWSService(apiName) + signature := b.generateSignatureWithService(path, amzDate, dateStamp, body, service) headers.Set("X-Amz-Date", amzDate) - util.OverwriteRequestAuthorizationHeader(headers, fmt.Sprintf("AWS4-HMAC-SHA256 Credential=%s/%s/%s/%s/aws4_request, SignedHeaders=%s, Signature=%s", accessKey, dateStamp, region, awsService, bedrockSignedHeaders, signature)) + util.OverwriteRequestAuthorizationHeader(headers, fmt.Sprintf("AWS4-HMAC-SHA256 Credential=%s/%s/%s/%s/aws4_request, SignedHeaders=%s, Signature=%s", accessKey, dateStamp, region, service, bedrockSignedHeaders, signature)) } func (b *bedrockProvider) generateSignature(path, amzDate, dateStamp string, body []byte) string { + return b.generateSignatureWithService(path, amzDate, dateStamp, body, awsServiceBedrock) +} + +func (b *bedrockProvider) generateSignatureWithService(path, amzDate, dateStamp string, body []byte, service string) string { canonicalURI := encodeSigV4Path(path) hashedPayload := sha256Hex(body) region := strings.TrimSpace(b.config.awsRegion) secretKey := strings.TrimSpace(b.config.awsSecretKey) - endpoint := fmt.Sprintf(bedrockDefaultDomain, region) + endpoint := bedrockAWSEndpoint(service, region) canonicalHeaders := fmt.Sprintf("host:%s\nx-amz-date:%s\n", endpoint, amzDate) canonicalRequest := fmt.Sprintf("%s\n%s\n\n%s\n%s\n%s", httpPostMethod, canonicalURI, canonicalHeaders, bedrockSignedHeaders, hashedPayload) - credentialScope := fmt.Sprintf("%s/%s/%s/aws4_request", dateStamp, region, awsService) + credentialScope := fmt.Sprintf("%s/%s/%s/aws4_request", dateStamp, region, service) hashedCanonReq := sha256Hex([]byte(canonicalRequest)) stringToSign := fmt.Sprintf("AWS4-HMAC-SHA256\n%s\n%s\n%s", amzDate, credentialScope, hashedCanonReq) - signingKey := getSignatureKey(secretKey, dateStamp, region, awsService) + signingKey := getSignatureKey(secretKey, dateStamp, region, service) signature := hmacHex(signingKey, stringToSign) return signature } +func bedrockAWSService(apiName ApiName) string { + if apiName == ApiNameAnthropicMessages { + return awsServiceBedrockMantle + } + return awsServiceBedrock +} + +func bedrockAWSEndpoint(service, region string) string { + if service == awsServiceBedrockMantle { + return fmt.Sprintf(bedrockMantleDomain, region) + } + return fmt.Sprintf(bedrockDefaultDomain, region) +} + func encodeSigV4Path(path string) string { // Keep only the URI path for canonical URI. Query string is handled separately in SigV4, // and this implementation uses an empty canonical query string. diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/provider.go b/plugins/wasm-go/extensions/ai-proxy/provider/provider.go index 40c0b82d..56301d50 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/provider.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/provider.go @@ -1166,7 +1166,10 @@ func ExtractStreamingEvents(ctx wrapper.HttpContext, chunk []byte) []StreamEvent func (c *ProviderConfig) isSupportedAPI(apiName ApiName) bool { _, exist := c.capabilities[string(apiName)] - return exist + if exist { + return true + } + return c.typ == providerTypeBedrock && apiName == ApiNameAnthropicMessages } func (c *ProviderConfig) IsSupportedAPI(apiName ApiName) bool { diff --git a/plugins/wasm-go/extensions/ai-proxy/test/bedrock.go b/plugins/wasm-go/extensions/ai-proxy/test/bedrock.go index 4e2c214a..6f280b07 100644 --- a/plugins/wasm-go/extensions/ai-proxy/test/bedrock.go +++ b/plugins/wasm-go/extensions/ai-proxy/test/bedrock.go @@ -11,6 +11,7 @@ import ( "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" "github.com/higress-group/wasm-go/pkg/test" "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" ) // Test config: Basic Bedrock config with AWS Access Key/Secret Key (AWS Signature V4) @@ -116,6 +117,37 @@ var bedrockApiTokenConfig = func() json.RawMessage { return data }() +var bedrockMantleApiTokenConfig = func() json.RawMessage { + data, _ := json.Marshal(map[string]interface{}{ + "provider": map[string]interface{}{ + "type": "bedrock", + "apiTokens": []string{ + "test-token-for-unit-test", + }, + "awsRegion": "us-east-1", + "modelMapping": map[string]string{ + "*": "anthropic.claude-opus-4-7", + }, + }, + }) + return data +}() + +var bedrockMantleAkSkConfig = func() json.RawMessage { + data, _ := json.Marshal(map[string]interface{}{ + "provider": map[string]interface{}{ + "type": "bedrock", + "awsAccessKey": "test-ak-for-unit-test", + "awsSecretKey": "test-sk-for-unit-test", + "awsRegion": "us-east-1", + "modelMapping": map[string]string{ + "*": "anthropic.claude-opus-4-7", + }, + }, + }) + return data +}() + func bedrockApiTokenConfigWithCachePointPositions(positions map[string]bool) json.RawMessage { data, _ := json.Marshal(map[string]interface{}{ "provider": map[string]interface{}{ @@ -425,6 +457,136 @@ func RunBedrockOnHttpRequestBodyTests(t *testing.T) { require.Contains(t, pathValue, "/converse", "Path should contain converse endpoint") }) + t.Run("bedrock anthropic messages request should use mantle endpoint and preserve native body", func(t *testing.T) { + host, status := test.NewTestHost(bedrockMantleApiTokenConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + action := host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/messages"}, + {":method", "POST"}, + {"Content-Type", "application/json"}, + }) + require.Equal(t, types.HeaderStopIteration, action) + + requestHeaders := host.GetRequestHeaders() + hostValue, hasHost := test.GetHeaderValue(requestHeaders, ":authority") + require.True(t, hasHost, "Host header should exist") + require.Equal(t, "bedrock-mantle.us-east-1.api.aws", hostValue) + + pathValue, hasPath := test.GetHeaderValue(requestHeaders, ":path") + require.True(t, hasPath, "Path header should exist") + require.Equal(t, "/anthropic/v1/messages", pathValue) + + apiKeyValue, hasAPIKey := test.GetHeaderValue(requestHeaders, "x-api-key") + require.True(t, hasAPIKey, "x-api-key header should exist") + require.Equal(t, "test-token-for-unit-test", apiKeyValue) + + _, hasAuth := test.GetHeaderValue(requestHeaders, "Authorization") + require.False(t, hasAuth, "Authorization header should not be sent for Mantle token auth") + + anthropicVersion, hasAnthropicVersion := test.GetHeaderValue(requestHeaders, "anthropic-version") + require.True(t, hasAnthropicVersion, "anthropic-version header should exist") + require.Equal(t, "2023-06-01", anthropicVersion) + + requestBody := `{ + "model": "claude-request-model", + "max_tokens": 1024, + "messages": [{ + "role": "assistant", + "content": [{ + "type": "tool_use", + "id": "toolu_1", + "name": "list_items", + "input": {} + }] + }] + }` + + action = host.CallOnHttpRequestBody([]byte(requestBody)) + require.Equal(t, types.ActionContinue, action) + + processedBody := host.GetRequestBody() + require.Equal(t, "anthropic.claude-opus-4-7", gjson.GetBytes(processedBody, "model").String()) + require.Equal(t, "{}", gjson.GetBytes(processedBody, "messages.0.content.0.input").Raw) + require.False(t, gjson.GetBytes(processedBody, "inferenceConfig").Exists(), "native Anthropic body should not be converted to Bedrock Converse") + require.False(t, gjson.GetBytes(processedBody, "toolConfig").Exists(), "native Anthropic tools should not be converted to Bedrock Converse") + + requestHeaders = host.GetRequestHeaders() + pathValue, hasPath = test.GetHeaderValue(requestHeaders, ":path") + require.True(t, hasPath, "Path header should exist") + require.Equal(t, "/anthropic/v1/messages", pathValue) + }) + + t.Run("bedrock anthropic messages request with ak sk should sign for mantle service", func(t *testing.T) { + host, status := test.NewTestHost(bedrockMantleAkSkConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + action := host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/messages"}, + {":method", "POST"}, + {"Content-Type", "application/json"}, + }) + require.Equal(t, types.HeaderStopIteration, action) + + requestBody := `{ + "model": "claude-request-model", + "max_tokens": 1024, + "messages": [{"role": "user", "content": "Hello"}] + }` + action = host.CallOnHttpRequestBody([]byte(requestBody)) + require.Equal(t, types.ActionContinue, action) + + requestHeaders := host.GetRequestHeaders() + hostValue, hasHost := test.GetHeaderValue(requestHeaders, ":authority") + require.True(t, hasHost, "Host header should exist") + require.Equal(t, "bedrock-mantle.us-east-1.api.aws", hostValue) + + pathValue, hasPath := test.GetHeaderValue(requestHeaders, ":path") + require.True(t, hasPath, "Path header should exist") + require.Equal(t, "/anthropic/v1/messages", pathValue) + + authValue, hasAuth := test.GetHeaderValue(requestHeaders, "Authorization") + require.True(t, hasAuth, "Authorization header should exist") + require.Contains(t, authValue, "AWS4-HMAC-SHA256") + require.Contains(t, authValue, "/bedrock-mantle/aws4_request") + }) + + t.Run("bedrock anthropic messages streaming request should keep mantle path and accept sse", func(t *testing.T) { + host, status := test.NewTestHost(bedrockMantleApiTokenConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + action := host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/messages"}, + {":method", "POST"}, + {"Content-Type", "application/json"}, + }) + require.Equal(t, types.HeaderStopIteration, action) + + requestBody := `{ + "model": "claude-request-model", + "max_tokens": 1024, + "messages": [{"role": "user", "content": "Hello"}], + "stream": true + }` + action = host.CallOnHttpRequestBody([]byte(requestBody)) + require.Equal(t, types.ActionContinue, action) + + requestHeaders := host.GetRequestHeaders() + pathValue, hasPath := test.GetHeaderValue(requestHeaders, ":path") + require.True(t, hasPath, "Path header should exist") + require.Equal(t, "/anthropic/v1/messages", pathValue) + + acceptValue, hasAccept := test.GetHeaderValue(requestHeaders, "Accept") + require.True(t, hasAccept, "Accept header should exist") + require.Equal(t, "text/event-stream", acceptValue) + }) + t.Run("bedrock request body prompt cache in-memory should inject system cache point only by default", func(t *testing.T) { host, status := test.NewTestHost(bedrockApiTokenConfig) defer host.Reset() @@ -1821,6 +1983,40 @@ func RunBedrockOnHttpResponseBodyTests(t *testing.T) { _, hasCacheWriteTokens := promptTokensDetails["cache_write_tokens"] require.False(t, hasCacheWriteTokens, "cache_write_tokens should not exist in OpenAI-compatible usage") }) + + t.Run("bedrock anthropic messages response body should pass through", func(t *testing.T) { + host, status := test.NewTestHost(bedrockMantleApiTokenConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + action := host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/messages"}, + {":method", "POST"}, + {"Content-Type", "application/json"}, + }) + require.Equal(t, types.HeaderStopIteration, action) + + requestBody := `{ + "model": "claude-request-model", + "max_tokens": 1024, + "messages": [{"role": "user", "content": "Hello"}] + }` + action = host.CallOnHttpRequestBody([]byte(requestBody)) + require.Equal(t, types.ActionContinue, action) + + host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream")) + action = host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"Content-Type", "application/json"}, + }) + require.Equal(t, types.ActionContinue, action) + + responseBody := `{"id":"msg_01","type":"message","role":"assistant","model":"anthropic.claude-opus-4-7","content":[{"type":"text","text":"Hello"}],"stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":2}}` + action = host.CallOnHttpResponseBody([]byte(responseBody)) + require.Equal(t, types.ActionContinue, action) + require.JSONEq(t, responseBody, string(host.GetResponseBody())) + }) }) } @@ -1840,6 +2036,41 @@ func RunBedrockOnStreamingResponseBodyTests(t *testing.T) { require.Equal(t, "", payload) }) + t.Run("bedrock anthropic messages streaming response should pass through mantle sse", func(t *testing.T) { + host, status := test.NewTestHost(bedrockMantleApiTokenConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + action := host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/messages"}, + {":method", "POST"}, + {"Content-Type", "application/json"}, + }) + require.Equal(t, types.HeaderStopIteration, action) + + requestBody := `{ + "model": "claude-request-model", + "max_tokens": 1024, + "messages": [{"role": "user", "content": "Hello"}], + "stream": true + }` + action = host.CallOnHttpRequestBody([]byte(requestBody)) + require.Equal(t, types.ActionContinue, action) + + host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream")) + action = host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"Content-Type", "text/event-stream; charset=utf-8"}, + }) + require.Equal(t, types.ActionContinue, action) + + chunk := []byte("event: content_block_delta\ndata: {\"type\":\"content_block_delta\",\"index\":0,\"delta\":{\"type\":\"text_delta\",\"text\":\"Hi\"}}\n\n") + action = host.CallOnHttpStreamingResponseBody(chunk, false) + require.Equal(t, types.ActionContinue, action) + require.Equal(t, chunk, host.GetResponseBody()) + }) + t.Run("bedrock streaming parallel tool calls should use dense OpenAI indexes", func(t *testing.T) { host, status := test.NewTestHost(bedrockApiTokenConfig) defer host.Reset()