mirror of
https://github.com/alibaba/higress.git
synced 2026-02-06 23:21:08 +08:00
optimize retry&failover logic (#1903)
This commit is contained in:
@@ -37,7 +37,7 @@ func main() {
|
||||
}
|
||||
|
||||
func parseGlobalConfig(json gjson.Result, pluginConfig *config.PluginConfig, log wrapper.Log) error {
|
||||
//log.Debugf("loading global config: %s", json.String())
|
||||
log.Debugf("loading global config: %s", json.String())
|
||||
|
||||
pluginConfig.FromJson(json)
|
||||
if err := pluginConfig.Validate(); err != nil {
|
||||
@@ -53,7 +53,7 @@ func parseGlobalConfig(json gjson.Result, pluginConfig *config.PluginConfig, log
|
||||
}
|
||||
|
||||
func parseOverrideRuleConfig(json gjson.Result, global config.PluginConfig, pluginConfig *config.PluginConfig, log wrapper.Log) error {
|
||||
//log.Debugf("loading override rule config: %s", json.String())
|
||||
log.Debugf("loading override rule config: %s", json.String())
|
||||
|
||||
*pluginConfig = global
|
||||
|
||||
@@ -111,6 +111,10 @@ func onHttpRequestHeader(ctx wrapper.HttpContext, pluginConfig config.PluginConf
|
||||
// Set available apiTokens of current request in the context, will be used in the retryOnFailure
|
||||
providerConfig.SetAvailableApiTokens(ctx, log)
|
||||
|
||||
// save the original request host and path in case they are needed for apiToken health check and retry
|
||||
ctx.SetContext(provider.CtxRequestHost, wrapper.GetRequestHost())
|
||||
ctx.SetContext(provider.CtxRequestPath, wrapper.GetRequestPath())
|
||||
|
||||
err := handler.OnRequestHeaders(ctx, apiName, log)
|
||||
if err != nil {
|
||||
_ = util.ErrorHandler("ai-proxy.proc_req_headers_failed", fmt.Errorf("failed to process request headers: %v", err))
|
||||
@@ -138,12 +142,15 @@ func onHttpRequestBody(ctx wrapper.HttpContext, pluginConfig config.PluginConfig
|
||||
log.Debugf("[onHttpRequestBody] no active provider, skip processing")
|
||||
return types.ActionContinue
|
||||
}
|
||||
|
||||
log.Debugf("[onHttpRequestBody] provider=%s", activeProvider.GetProviderType())
|
||||
|
||||
if handler, ok := activeProvider.(provider.RequestBodyHandler); ok {
|
||||
apiName, _ := ctx.GetContext(provider.CtxKeyApiName).(provider.ApiName)
|
||||
providerConfig := pluginConfig.GetProviderConfig()
|
||||
// If retryOnFailure is enabled, save the transformed body to the context in case of retry
|
||||
if providerConfig.IsRetryOnFailureEnabled() {
|
||||
ctx.SetContext(provider.CtxRequestBody, body)
|
||||
}
|
||||
newBody, settingErr := providerConfig.ReplaceByCustomSettings(body)
|
||||
if settingErr != nil {
|
||||
log.Errorf("failed to replace request body by custom settings: %v", settingErr)
|
||||
|
||||
@@ -107,11 +107,6 @@ func createContextCache(providerConfig *ProviderConfig) *contextCache {
|
||||
}
|
||||
|
||||
func (c *contextCache) GetContextFromFile(ctx wrapper.HttpContext, provider Provider, body []byte, log wrapper.Log) error {
|
||||
// get context will overwrite the original request host and path
|
||||
// save the original request host and path in case they are needed for apiToken health check
|
||||
ctx.SetContext(ctxRequestHost, wrapper.GetRequestHost())
|
||||
ctx.SetContext(ctxRequestPath, wrapper.GetRequestPath())
|
||||
|
||||
if c.loaded {
|
||||
log.Debugf("context file loaded from cache")
|
||||
insertContext(provider, c.content, nil, body, log)
|
||||
|
||||
@@ -67,8 +67,9 @@ const (
|
||||
removeApiTokenOperation = "removeApiToken"
|
||||
addApiTokenRequestCountOperation = "addApiTokenRequestCount"
|
||||
resetApiTokenRequestCountOperation = "resetApiTokenRequestCount"
|
||||
ctxRequestHost = "requestHost"
|
||||
ctxRequestPath = "requestPath"
|
||||
CtxRequestHost = "requestHost"
|
||||
CtxRequestPath = "requestPath"
|
||||
CtxRequestBody = "requestBody"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -158,7 +159,6 @@ func (c *ProviderConfig) SetApiTokensFailover(log wrapper.Log, activeProvider Pr
|
||||
log.Debugf("Perform health check for unavailable apiTokens: %s", strings.Join(unavailableTokens, ", "))
|
||||
healthCheckEndpoint, headers, body := c.generateRequestHeadersAndBody(log)
|
||||
healthCheckClient = wrapper.NewClusterClient(wrapper.TargetCluster{
|
||||
Host: healthCheckEndpoint.Host,
|
||||
Cluster: healthCheckEndpoint.Cluster,
|
||||
})
|
||||
|
||||
@@ -171,7 +171,7 @@ func (c *ProviderConfig) SetApiTokensFailover(log wrapper.Log, activeProvider Pr
|
||||
}
|
||||
|
||||
// The apiToken for ChatCompletion and Embeddings can be the same, so we only need to health check ChatCompletion
|
||||
err = healthCheckClient.Post(healthCheckEndpoint.Path, modifiedHeaders, modifiedBody, func(statusCode int, responseHeaders http.Header, responseBody []byte) {
|
||||
err = healthCheckClient.Post(generateUrl(modifiedHeaders), util.HeaderToSlice(modifiedHeaders), modifiedBody, func(statusCode int, responseHeaders http.Header, responseBody []byte) {
|
||||
if statusCode == 200 {
|
||||
c.handleAvailableApiToken(apiToken, log)
|
||||
}
|
||||
@@ -187,19 +187,21 @@ func (c *ProviderConfig) SetApiTokensFailover(log wrapper.Log, activeProvider Pr
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) transformRequestHeadersAndBody(ctx wrapper.HttpContext, activeProvider Provider, headers [][2]string, body []byte, log wrapper.Log) ([][2]string, []byte, error) {
|
||||
originalHeaders := util.SliceToHeader(headers)
|
||||
func generateUrl(header http.Header) string {
|
||||
return fmt.Sprintf("https://%s%s", header.Get(":authority"), header.Get(":path"))
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) transformRequestHeadersAndBody(ctx wrapper.HttpContext, activeProvider Provider, headers [][2]string, body []byte, log wrapper.Log) (http.Header, []byte, error) {
|
||||
modifiedHeaders := util.SliceToHeader(headers)
|
||||
if handler, ok := activeProvider.(TransformRequestHeadersHandler); ok {
|
||||
handler.TransformRequestHeaders(ctx, ApiNameChatCompletion, originalHeaders, log)
|
||||
handler.TransformRequestHeaders(ctx, ApiNameChatCompletion, modifiedHeaders, log)
|
||||
}
|
||||
|
||||
var err error
|
||||
if handler, ok := activeProvider.(TransformRequestBodyHandler); ok {
|
||||
body, err = handler.TransformRequestBody(ctx, ApiNameChatCompletion, body, log)
|
||||
} else if handler, ok := activeProvider.(TransformRequestBodyHeadersHandler); ok {
|
||||
headers := util.GetOriginalRequestHeaders()
|
||||
body, err = handler.TransformRequestBodyHeaders(ctx, ApiNameChatCompletion, body, originalHeaders, log)
|
||||
util.ReplaceRequestHeaders(headers)
|
||||
body, err = handler.TransformRequestBodyHeaders(ctx, ApiNameChatCompletion, body, modifiedHeaders, log)
|
||||
} else {
|
||||
body, err = c.defaultTransformRequestBody(ctx, ApiNameChatCompletion, body, log)
|
||||
}
|
||||
@@ -207,7 +209,6 @@ func (c *ProviderConfig) transformRequestHeadersAndBody(ctx wrapper.HttpContext,
|
||||
return nil, nil, fmt.Errorf("failed to transform request body: %v", err)
|
||||
}
|
||||
|
||||
modifiedHeaders := util.HeaderToSlice(originalHeaders)
|
||||
return modifiedHeaders, body, nil
|
||||
}
|
||||
|
||||
@@ -232,6 +233,8 @@ func (c *ProviderConfig) generateRequestHeadersAndBody(log wrapper.Log) (HealthC
|
||||
|
||||
headers := [][2]string{
|
||||
{"content-type", "application/json"},
|
||||
{":authority", healthCheckEndpoint.Host},
|
||||
{":path", healthCheckEndpoint.Path},
|
||||
}
|
||||
body := []byte(fmt.Sprintf(`{
|
||||
"model": "%s",
|
||||
@@ -311,7 +314,7 @@ func (c *ProviderConfig) handleAvailableApiToken(apiToken string, log wrapper.Lo
|
||||
|
||||
successCount := successApiTokenRequestCount[apiToken] + 1
|
||||
if successCount >= c.failover.successThreshold {
|
||||
log.Infof("apiToken %s is available now, add it back to the apiTokens list", apiToken)
|
||||
log.Infof("healthcheck after failover: apiToken %s is available now, add it back to the apiTokens list", apiToken)
|
||||
removeApiToken(c.failover.ctxUnavailableApiTokens, apiToken, log)
|
||||
addApiToken(c.failover.ctxApiTokens, apiToken, log)
|
||||
resetApiTokenRequestCount(c.failover.ctxApiTokenRequestSuccessCount, apiToken, log)
|
||||
@@ -342,7 +345,7 @@ func (c *ProviderConfig) handleUnavailableApiToken(ctx wrapper.HttpContext, apiT
|
||||
|
||||
failureCount := failureApiTokenRequestCount[apiToken] + 1
|
||||
if failureCount >= c.failover.failureThreshold {
|
||||
log.Infof("apiToken %s is unavailable now, remove it from apiTokens list", apiToken)
|
||||
log.Infof("failover: apiToken %s is unavailable now, remove it from apiTokens list", apiToken)
|
||||
removeApiToken(c.failover.ctxApiTokens, apiToken, log)
|
||||
addApiToken(c.failover.ctxUnavailableApiTokens, apiToken, log)
|
||||
resetApiTokenRequestCount(c.failover.ctxApiTokenRequestFailureCount, apiToken, log)
|
||||
@@ -532,7 +535,8 @@ func (c *ProviderConfig) GetGlobalRandomToken(log wrapper.Log) string {
|
||||
count := len(apiTokens)
|
||||
switch count {
|
||||
case 0:
|
||||
return ""
|
||||
log.Warn("all tokens are unavailable, will use random one of the unavailable tokens")
|
||||
return unavailableApiTokens[rand.Intn(len(unavailableApiTokens))]
|
||||
case 1:
|
||||
return apiTokens[0]
|
||||
default:
|
||||
@@ -570,10 +574,16 @@ func (c *ProviderConfig) resetSharedData() {
|
||||
|
||||
func (c *ProviderConfig) OnRequestFailed(activeProvider Provider, ctx wrapper.HttpContext, apiTokenInUse string, apiTokens []string, status string, log wrapper.Log) types.Action {
|
||||
if c.isFailoverEnabled() && util.MatchStatus(status, c.failover.failoverOnStatus) {
|
||||
log.Warnf("apiToken:%s need failover, error status:%s", apiTokenInUse, status)
|
||||
c.handleUnavailableApiToken(ctx, apiTokenInUse, log)
|
||||
}
|
||||
if c.isRetryOnFailureEnabled() && util.MatchStatus(status, c.retryOnFailure.retryOnStatus) && isNotStreamingResponse(ctx) {
|
||||
c.retryFailedRequest(activeProvider, ctx, apiTokenInUse, apiTokens, log)
|
||||
if c.IsRetryOnFailureEnabled() && util.MatchStatus(status, c.retryOnFailure.retryOnStatus) {
|
||||
log.Warnf("need retry, notice that retry response will be bufferd, error status:%s", status)
|
||||
err := c.retryFailedRequest(activeProvider, ctx, apiTokenInUse, apiTokens, log)
|
||||
if err != nil {
|
||||
log.Errorf("retryFailedRequest failed, err:%v", err)
|
||||
return types.ActionContinue
|
||||
}
|
||||
return types.HeaderStopAllIterationAndWatermark
|
||||
}
|
||||
return types.ActionContinue
|
||||
@@ -606,13 +616,11 @@ func (c *ProviderConfig) setHealthCheckEndpoint(ctx wrapper.HttpContext, log wra
|
||||
log.Errorf("Failed to get cluster_name: %v", err)
|
||||
}
|
||||
|
||||
host := wrapper.GetRequestHost()
|
||||
if host == "" {
|
||||
host = ctx.GetContext(ctxRequestHost).(string)
|
||||
}
|
||||
path := wrapper.GetRequestPath()
|
||||
if path == "" {
|
||||
path = ctx.GetContext(ctxRequestPath).(string)
|
||||
host := ctx.GetStringContext(CtxRequestHost, "")
|
||||
path := ctx.GetStringContext(CtxRequestPath, "")
|
||||
if host == "" || path == "" {
|
||||
log.Errorf("get host or path failed, host:%s, path:%s", host, path)
|
||||
return
|
||||
}
|
||||
|
||||
healthCheckEndpoint := HealthCheckEndpoint{
|
||||
|
||||
@@ -698,11 +698,6 @@ func (c *ProviderConfig) handleRequestBody(
|
||||
return types.ActionContinue, err
|
||||
}
|
||||
|
||||
// If retryOnFailure is enabled, save the transformed body to the context in case of retry
|
||||
if c.isRetryOnFailureEnabled() {
|
||||
ctx.SetContext(ctxRequestBody, body)
|
||||
}
|
||||
|
||||
if apiName == ApiNameChatCompletion {
|
||||
if c.context == nil {
|
||||
return types.ActionContinue, replaceRequestBody(body, log)
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
|
||||
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/util"
|
||||
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
|
||||
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
|
||||
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
|
||||
"github.com/tidwall/gjson"
|
||||
"github.com/tidwall/sjson"
|
||||
@@ -89,6 +88,19 @@ func (m *qwenProvider) TransformRequestHeaders(ctx wrapper.HttpContext, apiName
|
||||
}
|
||||
|
||||
func (m *qwenProvider) TransformRequestBodyHeaders(ctx wrapper.HttpContext, apiName ApiName, body []byte, headers http.Header, log wrapper.Log) ([]byte, error) {
|
||||
if m.config.qwenEnableCompatible {
|
||||
if gjson.GetBytes(body, "model").Exists() {
|
||||
rawModel := gjson.GetBytes(body, "model").String()
|
||||
mappedModel := getMappedModel(rawModel, m.config.modelMapping, log)
|
||||
newBody, err := sjson.SetBytes(body, "model", mappedModel)
|
||||
if err != nil {
|
||||
log.Errorf("Replace model error: %v", err)
|
||||
return newBody, err
|
||||
}
|
||||
return newBody, nil
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
switch apiName {
|
||||
case ApiNameChatCompletion:
|
||||
return m.onChatCompletionRequestBody(ctx, body, headers, log)
|
||||
@@ -115,36 +127,6 @@ func (m *qwenProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiName
|
||||
}
|
||||
|
||||
func (m *qwenProvider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error) {
|
||||
if m.config.qwenEnableCompatible {
|
||||
if gjson.GetBytes(body, "model").Exists() {
|
||||
rawModel := gjson.GetBytes(body, "model").String()
|
||||
mappedModel := getMappedModel(rawModel, m.config.modelMapping, log)
|
||||
newBody, err := sjson.SetBytes(body, "model", mappedModel)
|
||||
if err != nil {
|
||||
log.Errorf("Replace model error: %v", err)
|
||||
return types.ActionContinue, err
|
||||
}
|
||||
|
||||
// TODO: Temporary fix to clamp top_p value to the range [qwenTopPMin, qwenTopPMax].
|
||||
if topPValue := gjson.GetBytes(body, "top_p"); topPValue.Exists() {
|
||||
rawTopP := topPValue.Float()
|
||||
scaledTopP := math.Max(qwenTopPMin, math.Min(rawTopP, qwenTopPMax))
|
||||
newBody, err = sjson.SetBytes(newBody, "top_p", scaledTopP)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to replace top_p: %v", err)
|
||||
return types.ActionContinue, err
|
||||
}
|
||||
}
|
||||
|
||||
err = proxywasm.ReplaceHttpRequestBody(newBody)
|
||||
if err != nil {
|
||||
log.Errorf("Replace request body error: %v", err)
|
||||
return types.ActionContinue, err
|
||||
}
|
||||
}
|
||||
return types.ActionContinue, nil
|
||||
}
|
||||
|
||||
if !m.config.isSupportedAPI(apiName) {
|
||||
return types.ActionContinue, errUnsupportedApiName
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package provider
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
|
||||
@@ -11,8 +13,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
ctxRequestBody = "requestBody"
|
||||
ctxRetryCount = "retryCount"
|
||||
ctxRetryCount = "retryCount"
|
||||
)
|
||||
|
||||
type retryOnFailure struct {
|
||||
@@ -34,7 +35,7 @@ func (r *retryOnFailure) FromJson(json gjson.Result) {
|
||||
}
|
||||
r.retryTimeout = json.Get("retryTimeout").Int()
|
||||
if r.retryTimeout == 0 {
|
||||
r.retryTimeout = 30 * 1000
|
||||
r.retryTimeout = 60 * 1000
|
||||
}
|
||||
for _, status := range json.Get("retryOnStatus").Array() {
|
||||
r.retryOnStatus = append(r.retryOnStatus, status.String())
|
||||
@@ -45,16 +46,16 @@ func (r *retryOnFailure) FromJson(json gjson.Result) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) isRetryOnFailureEnabled() bool {
|
||||
func (c *ProviderConfig) IsRetryOnFailureEnabled() bool {
|
||||
return c.retryOnFailure.enabled
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) retryFailedRequest(activeProvider Provider, ctx wrapper.HttpContext, apiTokenInUse string, apiTokens []string, log wrapper.Log) {
|
||||
log.Debugf("Retry failed request: provider=%s", activeProvider.GetProviderType())
|
||||
retryClient := createRetryClient(ctx)
|
||||
func (c *ProviderConfig) retryFailedRequest(activeProvider Provider, ctx wrapper.HttpContext, apiTokenInUse string, apiTokens []string, log wrapper.Log) error {
|
||||
log.Infof("Retry failed request: provider=%s", activeProvider.GetProviderType())
|
||||
retryClient := createRetryClient()
|
||||
apiName, _ := ctx.GetContext(CtxKeyApiName).(ApiName)
|
||||
ctx.SetContext(ctxRetryCount, 1)
|
||||
c.sendRetryRequest(ctx, apiName, activeProvider, retryClient, apiTokenInUse, apiTokens, log)
|
||||
return c.sendRetryRequest(ctx, apiName, activeProvider, retryClient, apiTokenInUse, apiTokens, log)
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) transformResponseHeadersAndBody(ctx wrapper.HttpContext, activeProvider Provider, apiName ApiName, headers http.Header, body []byte, log wrapper.Log) ([][2]string, []byte) {
|
||||
@@ -82,23 +83,28 @@ func (c *ProviderConfig) retryCall(
|
||||
apiTokenInUse string, apiTokens []string) {
|
||||
|
||||
retryCount := ctx.GetContext(ctxRetryCount).(int)
|
||||
log.Debugf("Sent retry request: %d/%d", retryCount, c.retryOnFailure.maxRetries)
|
||||
log.Infof("Sent retry request: %d/%d", retryCount, c.retryOnFailure.maxRetries)
|
||||
|
||||
if statusCode == 200 {
|
||||
log.Debugf("Retry request succeeded")
|
||||
log.Infof("Retry request succeeded")
|
||||
headers, body := c.transformResponseHeadersAndBody(ctx, activeProvider, apiName, responseHeaders, responseBody, log)
|
||||
proxywasm.SendHttpResponse(200, headers, body, -1)
|
||||
return
|
||||
} else {
|
||||
log.Debugf("The retry request still failed, status: %d, responseHeaders: %v, responseBody: %s", statusCode, responseHeaders, string(responseBody))
|
||||
log.Infof("The retry request still failed, status: %d, responseHeaders: %v, responseBody: %s", statusCode, responseHeaders, string(responseBody))
|
||||
}
|
||||
|
||||
retryCount++
|
||||
if retryCount <= int(c.retryOnFailure.maxRetries) {
|
||||
ctx.SetContext(ctxRetryCount, retryCount)
|
||||
c.sendRetryRequest(ctx, apiName, activeProvider, retryClient, apiTokenInUse, apiTokens, log)
|
||||
err := c.sendRetryRequest(ctx, apiName, activeProvider, retryClient, apiTokenInUse, apiTokens, log)
|
||||
if err != nil {
|
||||
log.Errorf("sendRetryRequest failed, err:%v", err)
|
||||
proxywasm.ResumeHttpResponse()
|
||||
return
|
||||
}
|
||||
} else {
|
||||
log.Debugf("Reached the maximum retry count: %d", c.retryOnFailure.maxRetries)
|
||||
log.Infof("Reached the maximum retry count: %d", c.retryOnFailure.maxRetries)
|
||||
proxywasm.ResumeHttpResponse()
|
||||
return
|
||||
}
|
||||
@@ -107,65 +113,43 @@ func (c *ProviderConfig) retryCall(
|
||||
func (c *ProviderConfig) sendRetryRequest(
|
||||
ctx wrapper.HttpContext, apiName ApiName, activeProvider Provider,
|
||||
retryClient *wrapper.ClusterClient[wrapper.RouteCluster],
|
||||
apiTokenInUse string, apiTokens []string, log wrapper.Log) {
|
||||
apiTokenInUse string, apiTokens []string, log wrapper.Log) error {
|
||||
|
||||
// Remove last failed token from retry apiTokens list
|
||||
apiTokens = removeApiTokenFromRetryList(apiTokens, apiTokenInUse, log)
|
||||
if len(apiTokens) == 0 {
|
||||
log.Debugf("No more apiTokens to retry")
|
||||
proxywasm.ResumeHttpResponse()
|
||||
return
|
||||
return errors.New("No more apiTokens to retry")
|
||||
}
|
||||
// Set apiTokenInUse for the retry request
|
||||
apiTokenInUse = GetRandomToken(apiTokens)
|
||||
log.Debugf("Retry request with apiToken: %s", apiTokenInUse)
|
||||
ctx.SetContext(c.failover.ctxApiTokenInUse, apiTokenInUse)
|
||||
|
||||
requestHeaders, requestBody := c.getRetryRequestHeadersAndBody(ctx, activeProvider, apiName, log)
|
||||
path := getRetryPath(ctx)
|
||||
|
||||
err := retryClient.Post(path, util.HeaderToSlice(requestHeaders), requestBody, func(statusCode int, responseHeaders http.Header, responseBody []byte) {
|
||||
c.retryCall(ctx, log, activeProvider, apiName, statusCode, responseHeaders, responseBody, retryClient, apiTokenInUse, apiTokens)
|
||||
}, uint32(c.retryOnFailure.retryTimeout))
|
||||
requestBody := ctx.GetByteSliceContext(CtxRequestBody, []byte(""))
|
||||
log.Debugf("get original requestBody:%s", requestBody)
|
||||
modifiedHeaders, modifiedBody, err := c.transformRequestHeadersAndBody(ctx, activeProvider, [][2]string{
|
||||
{"content-type", "application/json"},
|
||||
{":authority", ctx.GetStringContext(CtxRequestHost, "")},
|
||||
{":path", ctx.GetStringContext(CtxRequestPath, "")},
|
||||
}, requestBody, log)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to send retry request: %v", err)
|
||||
proxywasm.ResumeHttpResponse()
|
||||
return
|
||||
return fmt.Errorf("sendRetryRequest failed to transform request headers and body: %v", err)
|
||||
}
|
||||
|
||||
err = retryClient.Post(generateUrl(modifiedHeaders), util.HeaderToSlice(modifiedHeaders), modifiedBody,
|
||||
func(statusCode int, responseHeaders http.Header, responseBody []byte) {
|
||||
c.retryCall(ctx, log, activeProvider, apiName, statusCode, responseHeaders, responseBody, retryClient, apiTokenInUse, apiTokens)
|
||||
}, uint32(c.retryOnFailure.retryTimeout))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to send retry request: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func createRetryClient(ctx wrapper.HttpContext) *wrapper.ClusterClient[wrapper.RouteCluster] {
|
||||
host := wrapper.GetRequestHost()
|
||||
if host == "" {
|
||||
host = ctx.GetContext(ctxRequestHost).(string)
|
||||
}
|
||||
retryClient := wrapper.NewClusterClient(wrapper.RouteCluster{
|
||||
Host: host,
|
||||
})
|
||||
func createRetryClient() *wrapper.ClusterClient[wrapper.RouteCluster] {
|
||||
retryClient := wrapper.NewClusterClient(wrapper.RouteCluster{})
|
||||
return retryClient
|
||||
}
|
||||
|
||||
func getRetryPath(ctx wrapper.HttpContext) string {
|
||||
path := wrapper.GetRequestPath()
|
||||
if path == "" {
|
||||
path = ctx.GetContext(ctxRequestPath).(string)
|
||||
}
|
||||
return path
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) getRetryRequestHeadersAndBody(ctx wrapper.HttpContext, activeProvider Provider, apiName ApiName, log wrapper.Log) (http.Header, []byte) {
|
||||
// The retry request is sent with different apiToken, so the header needs to be regenerated
|
||||
requestHeaders := http.Header{
|
||||
"Content-Type": []string{"application/json"},
|
||||
}
|
||||
if handler, ok := activeProvider.(TransformRequestHeadersHandler); ok {
|
||||
handler.TransformRequestHeaders(ctx, apiName, requestHeaders, log)
|
||||
}
|
||||
requestBody := ctx.GetContext(ctxRequestBody).([]byte)
|
||||
|
||||
return requestHeaders, requestBody
|
||||
}
|
||||
|
||||
func removeApiTokenFromRetryList(apiTokens []string, removedApiToken string, log wrapper.Log) []string {
|
||||
var availableApiTokens []string
|
||||
for _, s := range apiTokens {
|
||||
|
||||
@@ -136,7 +136,7 @@ func HttpCall(cluster Cluster, method, rawURL string, headers [][2]string, body
|
||||
requestID, code, normalResponse, respBody)
|
||||
callback(code, headers, respBody)
|
||||
})
|
||||
proxywasm.LogDebugf("http call start, id: %s, cluster: %s, method: %s, url: %s, body: %s, timeout: %d",
|
||||
requestID, cluster.ClusterName(), method, rawURL, body, timeout)
|
||||
proxywasm.LogDebugf("http call start, id: %s, cluster: %s, method: %s, url: %s, headers: %#v, body: %s, timeout: %d",
|
||||
requestID, cluster.ClusterName(), method, rawURL, headers, body, timeout)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -44,6 +44,7 @@ type HttpContext interface {
|
||||
GetContext(key string) interface{}
|
||||
GetBoolContext(key string, defaultValue bool) bool
|
||||
GetStringContext(key, defaultValue string) string
|
||||
GetByteSliceContext(key string, defaultValue []byte) []byte
|
||||
GetUserAttribute(key string) interface{}
|
||||
SetUserAttribute(key string, value interface{})
|
||||
SetUserAttributeMap(kvmap map[string]interface{})
|
||||
@@ -483,6 +484,13 @@ func (ctx *CommonHttpCtx[PluginConfig]) GetStringContext(key, defaultValue strin
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) GetByteSliceContext(key string, defaultValue []byte) []byte {
|
||||
if s, ok := ctx.userContext[key].([]byte); ok {
|
||||
return s
|
||||
}
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) Scheme() string {
|
||||
proxywasm.SetEffectiveContext(ctx.contextID)
|
||||
return GetRequestScheme()
|
||||
|
||||
Reference in New Issue
Block a user