From 9a89665b22c46d5e2a7d3c314b32b375ce4dddf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BE=84=E6=BD=AD?= Date: Mon, 17 Mar 2025 11:19:33 +0800 Subject: [PATCH] optimize retry&failover logic (#1903) --- plugins/wasm-go/extensions/ai-proxy/main.go | 13 ++- .../extensions/ai-proxy/provider/context.go | 5 - .../extensions/ai-proxy/provider/failover.go | 54 ++++++----- .../extensions/ai-proxy/provider/provider.go | 5 - .../extensions/ai-proxy/provider/qwen.go | 44 +++------ .../extensions/ai-proxy/provider/retry.go | 96 ++++++++----------- plugins/wasm-go/pkg/wrapper/http_wrapper.go | 4 +- plugins/wasm-go/pkg/wrapper/plugin_wrapper.go | 8 ++ 8 files changed, 104 insertions(+), 125 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-proxy/main.go b/plugins/wasm-go/extensions/ai-proxy/main.go index 3e9a555b4..81905ce11 100644 --- a/plugins/wasm-go/extensions/ai-proxy/main.go +++ b/plugins/wasm-go/extensions/ai-proxy/main.go @@ -37,7 +37,7 @@ func main() { } func parseGlobalConfig(json gjson.Result, pluginConfig *config.PluginConfig, log wrapper.Log) error { - //log.Debugf("loading global config: %s", json.String()) + log.Debugf("loading global config: %s", json.String()) pluginConfig.FromJson(json) if err := pluginConfig.Validate(); err != nil { @@ -53,7 +53,7 @@ func parseGlobalConfig(json gjson.Result, pluginConfig *config.PluginConfig, log } func parseOverrideRuleConfig(json gjson.Result, global config.PluginConfig, pluginConfig *config.PluginConfig, log wrapper.Log) error { - //log.Debugf("loading override rule config: %s", json.String()) + log.Debugf("loading override rule config: %s", json.String()) *pluginConfig = global @@ -111,6 +111,10 @@ func onHttpRequestHeader(ctx wrapper.HttpContext, pluginConfig config.PluginConf // Set available apiTokens of current request in the context, will be used in the retryOnFailure providerConfig.SetAvailableApiTokens(ctx, log) + // save the original request host and path in case they are needed for apiToken health check and retry + ctx.SetContext(provider.CtxRequestHost, wrapper.GetRequestHost()) + ctx.SetContext(provider.CtxRequestPath, wrapper.GetRequestPath()) + err := handler.OnRequestHeaders(ctx, apiName, log) if err != nil { _ = util.ErrorHandler("ai-proxy.proc_req_headers_failed", fmt.Errorf("failed to process request headers: %v", err)) @@ -138,12 +142,15 @@ func onHttpRequestBody(ctx wrapper.HttpContext, pluginConfig config.PluginConfig log.Debugf("[onHttpRequestBody] no active provider, skip processing") return types.ActionContinue } - log.Debugf("[onHttpRequestBody] provider=%s", activeProvider.GetProviderType()) if handler, ok := activeProvider.(provider.RequestBodyHandler); ok { apiName, _ := ctx.GetContext(provider.CtxKeyApiName).(provider.ApiName) providerConfig := pluginConfig.GetProviderConfig() + // If retryOnFailure is enabled, save the transformed body to the context in case of retry + if providerConfig.IsRetryOnFailureEnabled() { + ctx.SetContext(provider.CtxRequestBody, body) + } newBody, settingErr := providerConfig.ReplaceByCustomSettings(body) if settingErr != nil { log.Errorf("failed to replace request body by custom settings: %v", settingErr) diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/context.go b/plugins/wasm-go/extensions/ai-proxy/provider/context.go index fb38f86c5..1bc46c090 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/context.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/context.go @@ -107,11 +107,6 @@ func createContextCache(providerConfig *ProviderConfig) *contextCache { } func (c *contextCache) GetContextFromFile(ctx wrapper.HttpContext, provider Provider, body []byte, log wrapper.Log) error { - // get context will overwrite the original request host and path - // save the original request host and path in case they are needed for apiToken health check - ctx.SetContext(ctxRequestHost, wrapper.GetRequestHost()) - ctx.SetContext(ctxRequestPath, wrapper.GetRequestPath()) - if c.loaded { log.Debugf("context file loaded from cache") insertContext(provider, c.content, nil, body, log) diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/failover.go b/plugins/wasm-go/extensions/ai-proxy/provider/failover.go index 235c0aa29..7f6fe4bd4 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/failover.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/failover.go @@ -67,8 +67,9 @@ const ( removeApiTokenOperation = "removeApiToken" addApiTokenRequestCountOperation = "addApiTokenRequestCount" resetApiTokenRequestCountOperation = "resetApiTokenRequestCount" - ctxRequestHost = "requestHost" - ctxRequestPath = "requestPath" + CtxRequestHost = "requestHost" + CtxRequestPath = "requestPath" + CtxRequestBody = "requestBody" ) var ( @@ -158,7 +159,6 @@ func (c *ProviderConfig) SetApiTokensFailover(log wrapper.Log, activeProvider Pr log.Debugf("Perform health check for unavailable apiTokens: %s", strings.Join(unavailableTokens, ", ")) healthCheckEndpoint, headers, body := c.generateRequestHeadersAndBody(log) healthCheckClient = wrapper.NewClusterClient(wrapper.TargetCluster{ - Host: healthCheckEndpoint.Host, Cluster: healthCheckEndpoint.Cluster, }) @@ -171,7 +171,7 @@ func (c *ProviderConfig) SetApiTokensFailover(log wrapper.Log, activeProvider Pr } // The apiToken for ChatCompletion and Embeddings can be the same, so we only need to health check ChatCompletion - err = healthCheckClient.Post(healthCheckEndpoint.Path, modifiedHeaders, modifiedBody, func(statusCode int, responseHeaders http.Header, responseBody []byte) { + err = healthCheckClient.Post(generateUrl(modifiedHeaders), util.HeaderToSlice(modifiedHeaders), modifiedBody, func(statusCode int, responseHeaders http.Header, responseBody []byte) { if statusCode == 200 { c.handleAvailableApiToken(apiToken, log) } @@ -187,19 +187,21 @@ func (c *ProviderConfig) SetApiTokensFailover(log wrapper.Log, activeProvider Pr return nil } -func (c *ProviderConfig) transformRequestHeadersAndBody(ctx wrapper.HttpContext, activeProvider Provider, headers [][2]string, body []byte, log wrapper.Log) ([][2]string, []byte, error) { - originalHeaders := util.SliceToHeader(headers) +func generateUrl(header http.Header) string { + return fmt.Sprintf("https://%s%s", header.Get(":authority"), header.Get(":path")) +} + +func (c *ProviderConfig) transformRequestHeadersAndBody(ctx wrapper.HttpContext, activeProvider Provider, headers [][2]string, body []byte, log wrapper.Log) (http.Header, []byte, error) { + modifiedHeaders := util.SliceToHeader(headers) if handler, ok := activeProvider.(TransformRequestHeadersHandler); ok { - handler.TransformRequestHeaders(ctx, ApiNameChatCompletion, originalHeaders, log) + handler.TransformRequestHeaders(ctx, ApiNameChatCompletion, modifiedHeaders, log) } var err error if handler, ok := activeProvider.(TransformRequestBodyHandler); ok { body, err = handler.TransformRequestBody(ctx, ApiNameChatCompletion, body, log) } else if handler, ok := activeProvider.(TransformRequestBodyHeadersHandler); ok { - headers := util.GetOriginalRequestHeaders() - body, err = handler.TransformRequestBodyHeaders(ctx, ApiNameChatCompletion, body, originalHeaders, log) - util.ReplaceRequestHeaders(headers) + body, err = handler.TransformRequestBodyHeaders(ctx, ApiNameChatCompletion, body, modifiedHeaders, log) } else { body, err = c.defaultTransformRequestBody(ctx, ApiNameChatCompletion, body, log) } @@ -207,7 +209,6 @@ func (c *ProviderConfig) transformRequestHeadersAndBody(ctx wrapper.HttpContext, return nil, nil, fmt.Errorf("failed to transform request body: %v", err) } - modifiedHeaders := util.HeaderToSlice(originalHeaders) return modifiedHeaders, body, nil } @@ -232,6 +233,8 @@ func (c *ProviderConfig) generateRequestHeadersAndBody(log wrapper.Log) (HealthC headers := [][2]string{ {"content-type", "application/json"}, + {":authority", healthCheckEndpoint.Host}, + {":path", healthCheckEndpoint.Path}, } body := []byte(fmt.Sprintf(`{ "model": "%s", @@ -311,7 +314,7 @@ func (c *ProviderConfig) handleAvailableApiToken(apiToken string, log wrapper.Lo successCount := successApiTokenRequestCount[apiToken] + 1 if successCount >= c.failover.successThreshold { - log.Infof("apiToken %s is available now, add it back to the apiTokens list", apiToken) + log.Infof("healthcheck after failover: apiToken %s is available now, add it back to the apiTokens list", apiToken) removeApiToken(c.failover.ctxUnavailableApiTokens, apiToken, log) addApiToken(c.failover.ctxApiTokens, apiToken, log) resetApiTokenRequestCount(c.failover.ctxApiTokenRequestSuccessCount, apiToken, log) @@ -342,7 +345,7 @@ func (c *ProviderConfig) handleUnavailableApiToken(ctx wrapper.HttpContext, apiT failureCount := failureApiTokenRequestCount[apiToken] + 1 if failureCount >= c.failover.failureThreshold { - log.Infof("apiToken %s is unavailable now, remove it from apiTokens list", apiToken) + log.Infof("failover: apiToken %s is unavailable now, remove it from apiTokens list", apiToken) removeApiToken(c.failover.ctxApiTokens, apiToken, log) addApiToken(c.failover.ctxUnavailableApiTokens, apiToken, log) resetApiTokenRequestCount(c.failover.ctxApiTokenRequestFailureCount, apiToken, log) @@ -532,7 +535,8 @@ func (c *ProviderConfig) GetGlobalRandomToken(log wrapper.Log) string { count := len(apiTokens) switch count { case 0: - return "" + log.Warn("all tokens are unavailable, will use random one of the unavailable tokens") + return unavailableApiTokens[rand.Intn(len(unavailableApiTokens))] case 1: return apiTokens[0] default: @@ -570,10 +574,16 @@ func (c *ProviderConfig) resetSharedData() { func (c *ProviderConfig) OnRequestFailed(activeProvider Provider, ctx wrapper.HttpContext, apiTokenInUse string, apiTokens []string, status string, log wrapper.Log) types.Action { if c.isFailoverEnabled() && util.MatchStatus(status, c.failover.failoverOnStatus) { + log.Warnf("apiToken:%s need failover, error status:%s", apiTokenInUse, status) c.handleUnavailableApiToken(ctx, apiTokenInUse, log) } - if c.isRetryOnFailureEnabled() && util.MatchStatus(status, c.retryOnFailure.retryOnStatus) && isNotStreamingResponse(ctx) { - c.retryFailedRequest(activeProvider, ctx, apiTokenInUse, apiTokens, log) + if c.IsRetryOnFailureEnabled() && util.MatchStatus(status, c.retryOnFailure.retryOnStatus) { + log.Warnf("need retry, notice that retry response will be bufferd, error status:%s", status) + err := c.retryFailedRequest(activeProvider, ctx, apiTokenInUse, apiTokens, log) + if err != nil { + log.Errorf("retryFailedRequest failed, err:%v", err) + return types.ActionContinue + } return types.HeaderStopAllIterationAndWatermark } return types.ActionContinue @@ -606,13 +616,11 @@ func (c *ProviderConfig) setHealthCheckEndpoint(ctx wrapper.HttpContext, log wra log.Errorf("Failed to get cluster_name: %v", err) } - host := wrapper.GetRequestHost() - if host == "" { - host = ctx.GetContext(ctxRequestHost).(string) - } - path := wrapper.GetRequestPath() - if path == "" { - path = ctx.GetContext(ctxRequestPath).(string) + host := ctx.GetStringContext(CtxRequestHost, "") + path := ctx.GetStringContext(CtxRequestPath, "") + if host == "" || path == "" { + log.Errorf("get host or path failed, host:%s, path:%s", host, path) + return } healthCheckEndpoint := HealthCheckEndpoint{ diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/provider.go b/plugins/wasm-go/extensions/ai-proxy/provider/provider.go index 22ec01baa..355bbf27c 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/provider.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/provider.go @@ -698,11 +698,6 @@ func (c *ProviderConfig) handleRequestBody( return types.ActionContinue, err } - // If retryOnFailure is enabled, save the transformed body to the context in case of retry - if c.isRetryOnFailureEnabled() { - ctx.SetContext(ctxRequestBody, body) - } - if apiName == ApiNameChatCompletion { if c.context == nil { return types.ActionContinue, replaceRequestBody(body, log) diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/qwen.go b/plugins/wasm-go/extensions/ai-proxy/provider/qwen.go index a842cad63..950574a91 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/qwen.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/qwen.go @@ -12,7 +12,6 @@ import ( "github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/util" "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" - "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" "github.com/tidwall/gjson" "github.com/tidwall/sjson" @@ -89,6 +88,19 @@ func (m *qwenProvider) TransformRequestHeaders(ctx wrapper.HttpContext, apiName } func (m *qwenProvider) TransformRequestBodyHeaders(ctx wrapper.HttpContext, apiName ApiName, body []byte, headers http.Header, log wrapper.Log) ([]byte, error) { + if m.config.qwenEnableCompatible { + if gjson.GetBytes(body, "model").Exists() { + rawModel := gjson.GetBytes(body, "model").String() + mappedModel := getMappedModel(rawModel, m.config.modelMapping, log) + newBody, err := sjson.SetBytes(body, "model", mappedModel) + if err != nil { + log.Errorf("Replace model error: %v", err) + return newBody, err + } + return newBody, nil + } + return body, nil + } switch apiName { case ApiNameChatCompletion: return m.onChatCompletionRequestBody(ctx, body, headers, log) @@ -115,36 +127,6 @@ func (m *qwenProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiName } func (m *qwenProvider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error) { - if m.config.qwenEnableCompatible { - if gjson.GetBytes(body, "model").Exists() { - rawModel := gjson.GetBytes(body, "model").String() - mappedModel := getMappedModel(rawModel, m.config.modelMapping, log) - newBody, err := sjson.SetBytes(body, "model", mappedModel) - if err != nil { - log.Errorf("Replace model error: %v", err) - return types.ActionContinue, err - } - - // TODO: Temporary fix to clamp top_p value to the range [qwenTopPMin, qwenTopPMax]. - if topPValue := gjson.GetBytes(body, "top_p"); topPValue.Exists() { - rawTopP := topPValue.Float() - scaledTopP := math.Max(qwenTopPMin, math.Min(rawTopP, qwenTopPMax)) - newBody, err = sjson.SetBytes(newBody, "top_p", scaledTopP) - if err != nil { - log.Errorf("Failed to replace top_p: %v", err) - return types.ActionContinue, err - } - } - - err = proxywasm.ReplaceHttpRequestBody(newBody) - if err != nil { - log.Errorf("Replace request body error: %v", err) - return types.ActionContinue, err - } - } - return types.ActionContinue, nil - } - if !m.config.isSupportedAPI(apiName) { return types.ActionContinue, errUnsupportedApiName } diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/retry.go b/plugins/wasm-go/extensions/ai-proxy/provider/retry.go index 5eaec6325..dd2596efc 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/retry.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/retry.go @@ -1,6 +1,8 @@ package provider import ( + "errors" + "fmt" "math/rand" "net/http" @@ -11,8 +13,7 @@ import ( ) const ( - ctxRequestBody = "requestBody" - ctxRetryCount = "retryCount" + ctxRetryCount = "retryCount" ) type retryOnFailure struct { @@ -34,7 +35,7 @@ func (r *retryOnFailure) FromJson(json gjson.Result) { } r.retryTimeout = json.Get("retryTimeout").Int() if r.retryTimeout == 0 { - r.retryTimeout = 30 * 1000 + r.retryTimeout = 60 * 1000 } for _, status := range json.Get("retryOnStatus").Array() { r.retryOnStatus = append(r.retryOnStatus, status.String()) @@ -45,16 +46,16 @@ func (r *retryOnFailure) FromJson(json gjson.Result) { } } -func (c *ProviderConfig) isRetryOnFailureEnabled() bool { +func (c *ProviderConfig) IsRetryOnFailureEnabled() bool { return c.retryOnFailure.enabled } -func (c *ProviderConfig) retryFailedRequest(activeProvider Provider, ctx wrapper.HttpContext, apiTokenInUse string, apiTokens []string, log wrapper.Log) { - log.Debugf("Retry failed request: provider=%s", activeProvider.GetProviderType()) - retryClient := createRetryClient(ctx) +func (c *ProviderConfig) retryFailedRequest(activeProvider Provider, ctx wrapper.HttpContext, apiTokenInUse string, apiTokens []string, log wrapper.Log) error { + log.Infof("Retry failed request: provider=%s", activeProvider.GetProviderType()) + retryClient := createRetryClient() apiName, _ := ctx.GetContext(CtxKeyApiName).(ApiName) ctx.SetContext(ctxRetryCount, 1) - c.sendRetryRequest(ctx, apiName, activeProvider, retryClient, apiTokenInUse, apiTokens, log) + return c.sendRetryRequest(ctx, apiName, activeProvider, retryClient, apiTokenInUse, apiTokens, log) } func (c *ProviderConfig) transformResponseHeadersAndBody(ctx wrapper.HttpContext, activeProvider Provider, apiName ApiName, headers http.Header, body []byte, log wrapper.Log) ([][2]string, []byte) { @@ -82,23 +83,28 @@ func (c *ProviderConfig) retryCall( apiTokenInUse string, apiTokens []string) { retryCount := ctx.GetContext(ctxRetryCount).(int) - log.Debugf("Sent retry request: %d/%d", retryCount, c.retryOnFailure.maxRetries) + log.Infof("Sent retry request: %d/%d", retryCount, c.retryOnFailure.maxRetries) if statusCode == 200 { - log.Debugf("Retry request succeeded") + log.Infof("Retry request succeeded") headers, body := c.transformResponseHeadersAndBody(ctx, activeProvider, apiName, responseHeaders, responseBody, log) proxywasm.SendHttpResponse(200, headers, body, -1) return } else { - log.Debugf("The retry request still failed, status: %d, responseHeaders: %v, responseBody: %s", statusCode, responseHeaders, string(responseBody)) + log.Infof("The retry request still failed, status: %d, responseHeaders: %v, responseBody: %s", statusCode, responseHeaders, string(responseBody)) } retryCount++ if retryCount <= int(c.retryOnFailure.maxRetries) { ctx.SetContext(ctxRetryCount, retryCount) - c.sendRetryRequest(ctx, apiName, activeProvider, retryClient, apiTokenInUse, apiTokens, log) + err := c.sendRetryRequest(ctx, apiName, activeProvider, retryClient, apiTokenInUse, apiTokens, log) + if err != nil { + log.Errorf("sendRetryRequest failed, err:%v", err) + proxywasm.ResumeHttpResponse() + return + } } else { - log.Debugf("Reached the maximum retry count: %d", c.retryOnFailure.maxRetries) + log.Infof("Reached the maximum retry count: %d", c.retryOnFailure.maxRetries) proxywasm.ResumeHttpResponse() return } @@ -107,65 +113,43 @@ func (c *ProviderConfig) retryCall( func (c *ProviderConfig) sendRetryRequest( ctx wrapper.HttpContext, apiName ApiName, activeProvider Provider, retryClient *wrapper.ClusterClient[wrapper.RouteCluster], - apiTokenInUse string, apiTokens []string, log wrapper.Log) { + apiTokenInUse string, apiTokens []string, log wrapper.Log) error { // Remove last failed token from retry apiTokens list apiTokens = removeApiTokenFromRetryList(apiTokens, apiTokenInUse, log) if len(apiTokens) == 0 { - log.Debugf("No more apiTokens to retry") - proxywasm.ResumeHttpResponse() - return + return errors.New("No more apiTokens to retry") } // Set apiTokenInUse for the retry request apiTokenInUse = GetRandomToken(apiTokens) log.Debugf("Retry request with apiToken: %s", apiTokenInUse) ctx.SetContext(c.failover.ctxApiTokenInUse, apiTokenInUse) - - requestHeaders, requestBody := c.getRetryRequestHeadersAndBody(ctx, activeProvider, apiName, log) - path := getRetryPath(ctx) - - err := retryClient.Post(path, util.HeaderToSlice(requestHeaders), requestBody, func(statusCode int, responseHeaders http.Header, responseBody []byte) { - c.retryCall(ctx, log, activeProvider, apiName, statusCode, responseHeaders, responseBody, retryClient, apiTokenInUse, apiTokens) - }, uint32(c.retryOnFailure.retryTimeout)) + requestBody := ctx.GetByteSliceContext(CtxRequestBody, []byte("")) + log.Debugf("get original requestBody:%s", requestBody) + modifiedHeaders, modifiedBody, err := c.transformRequestHeadersAndBody(ctx, activeProvider, [][2]string{ + {"content-type", "application/json"}, + {":authority", ctx.GetStringContext(CtxRequestHost, "")}, + {":path", ctx.GetStringContext(CtxRequestPath, "")}, + }, requestBody, log) if err != nil { - log.Errorf("Failed to send retry request: %v", err) - proxywasm.ResumeHttpResponse() - return + return fmt.Errorf("sendRetryRequest failed to transform request headers and body: %v", err) } + + err = retryClient.Post(generateUrl(modifiedHeaders), util.HeaderToSlice(modifiedHeaders), modifiedBody, + func(statusCode int, responseHeaders http.Header, responseBody []byte) { + c.retryCall(ctx, log, activeProvider, apiName, statusCode, responseHeaders, responseBody, retryClient, apiTokenInUse, apiTokens) + }, uint32(c.retryOnFailure.retryTimeout)) + if err != nil { + return fmt.Errorf("Failed to send retry request: %v", err) + } + return nil } -func createRetryClient(ctx wrapper.HttpContext) *wrapper.ClusterClient[wrapper.RouteCluster] { - host := wrapper.GetRequestHost() - if host == "" { - host = ctx.GetContext(ctxRequestHost).(string) - } - retryClient := wrapper.NewClusterClient(wrapper.RouteCluster{ - Host: host, - }) +func createRetryClient() *wrapper.ClusterClient[wrapper.RouteCluster] { + retryClient := wrapper.NewClusterClient(wrapper.RouteCluster{}) return retryClient } -func getRetryPath(ctx wrapper.HttpContext) string { - path := wrapper.GetRequestPath() - if path == "" { - path = ctx.GetContext(ctxRequestPath).(string) - } - return path -} - -func (c *ProviderConfig) getRetryRequestHeadersAndBody(ctx wrapper.HttpContext, activeProvider Provider, apiName ApiName, log wrapper.Log) (http.Header, []byte) { - // The retry request is sent with different apiToken, so the header needs to be regenerated - requestHeaders := http.Header{ - "Content-Type": []string{"application/json"}, - } - if handler, ok := activeProvider.(TransformRequestHeadersHandler); ok { - handler.TransformRequestHeaders(ctx, apiName, requestHeaders, log) - } - requestBody := ctx.GetContext(ctxRequestBody).([]byte) - - return requestHeaders, requestBody -} - func removeApiTokenFromRetryList(apiTokens []string, removedApiToken string, log wrapper.Log) []string { var availableApiTokens []string for _, s := range apiTokens { diff --git a/plugins/wasm-go/pkg/wrapper/http_wrapper.go b/plugins/wasm-go/pkg/wrapper/http_wrapper.go index fd1b11875..36b4fe76c 100644 --- a/plugins/wasm-go/pkg/wrapper/http_wrapper.go +++ b/plugins/wasm-go/pkg/wrapper/http_wrapper.go @@ -136,7 +136,7 @@ func HttpCall(cluster Cluster, method, rawURL string, headers [][2]string, body requestID, code, normalResponse, respBody) callback(code, headers, respBody) }) - proxywasm.LogDebugf("http call start, id: %s, cluster: %s, method: %s, url: %s, body: %s, timeout: %d", - requestID, cluster.ClusterName(), method, rawURL, body, timeout) + proxywasm.LogDebugf("http call start, id: %s, cluster: %s, method: %s, url: %s, headers: %#v, body: %s, timeout: %d", + requestID, cluster.ClusterName(), method, rawURL, headers, body, timeout) return err } diff --git a/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go b/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go index c61d59b36..a83693255 100644 --- a/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go +++ b/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go @@ -44,6 +44,7 @@ type HttpContext interface { GetContext(key string) interface{} GetBoolContext(key string, defaultValue bool) bool GetStringContext(key, defaultValue string) string + GetByteSliceContext(key string, defaultValue []byte) []byte GetUserAttribute(key string) interface{} SetUserAttribute(key string, value interface{}) SetUserAttributeMap(kvmap map[string]interface{}) @@ -483,6 +484,13 @@ func (ctx *CommonHttpCtx[PluginConfig]) GetStringContext(key, defaultValue strin return defaultValue } +func (ctx *CommonHttpCtx[PluginConfig]) GetByteSliceContext(key string, defaultValue []byte) []byte { + if s, ok := ctx.userContext[key].([]byte); ok { + return s + } + return defaultValue +} + func (ctx *CommonHttpCtx[PluginConfig]) Scheme() string { proxywasm.SetEffectiveContext(ctx.contextID) return GetRequestScheme()