mirror of
https://github.com/alibaba/higress.git
synced 2026-06-09 04:37:31 +08:00
optimize plugin sdk (#1930)
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/util"
|
||||
"github.com/alibaba/higress/plugins/wasm-go/pkg/log"
|
||||
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
|
||||
"github.com/google/uuid"
|
||||
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
|
||||
@@ -125,11 +126,11 @@ func (c *ProviderConfig) initVariable() {
|
||||
c.failover.ctxVmLease = provider + "-" + id + "-vmLease"
|
||||
}
|
||||
|
||||
func parseConfig(json gjson.Result, config *any, log wrapper.Log) error {
|
||||
func parseConfig(json gjson.Result, config *any) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) SetApiTokensFailover(log wrapper.Log, activeProvider Provider) error {
|
||||
func (c *ProviderConfig) SetApiTokensFailover(activeProvider Provider) error {
|
||||
c.initVariable()
|
||||
// Reset shared data in case plugin configuration is updated
|
||||
log.Debugf("ai-proxy plugin configuration is updated, reset shared data")
|
||||
@@ -147,7 +148,7 @@ func (c *ProviderConfig) SetApiTokensFailover(log wrapper.Log, activeProvider Pr
|
||||
|
||||
wrapper.RegisteTickFunc(c.failover.healthCheckInterval, func() {
|
||||
// Only the Wasm VM that successfully acquires the lease will perform health check
|
||||
if c.isFailoverEnabled() && c.tryAcquireOrRenewLease(vmID, log) {
|
||||
if c.isFailoverEnabled() && c.tryAcquireOrRenewLease(vmID) {
|
||||
log.Debugf("Successfully acquired or renewed lease for %v: %v", vmID, c.GetType())
|
||||
unavailableTokens, _, err := getApiTokens(c.failover.ctxUnavailableApiTokens)
|
||||
if err != nil {
|
||||
@@ -157,7 +158,7 @@ func (c *ProviderConfig) SetApiTokensFailover(log wrapper.Log, activeProvider Pr
|
||||
if len(unavailableTokens) > 0 {
|
||||
for _, apiToken := range unavailableTokens {
|
||||
log.Debugf("Perform health check for unavailable apiTokens: %s", strings.Join(unavailableTokens, ", "))
|
||||
healthCheckEndpoint, headers, body := c.generateRequestHeadersAndBody(log)
|
||||
healthCheckEndpoint, headers, body := c.generateRequestHeadersAndBody()
|
||||
healthCheckClient = wrapper.NewClusterClient(wrapper.TargetCluster{
|
||||
Cluster: healthCheckEndpoint.Cluster,
|
||||
})
|
||||
@@ -165,7 +166,7 @@ func (c *ProviderConfig) SetApiTokensFailover(log wrapper.Log, activeProvider Pr
|
||||
ctx := createHttpContext()
|
||||
ctx.SetContext(c.failover.ctxApiTokenInUse, apiToken)
|
||||
|
||||
modifiedHeaders, modifiedBody, err := c.transformRequestHeadersAndBody(ctx, activeProvider, headers, body, log)
|
||||
modifiedHeaders, modifiedBody, err := c.transformRequestHeadersAndBody(ctx, activeProvider, headers, body)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to transform request headers and body: %v", err)
|
||||
}
|
||||
@@ -173,7 +174,7 @@ func (c *ProviderConfig) SetApiTokensFailover(log wrapper.Log, activeProvider Pr
|
||||
// The apiToken for ChatCompletion and Embeddings can be the same, so we only need to health check ChatCompletion
|
||||
err = healthCheckClient.Post(generateUrl(modifiedHeaders), util.HeaderToSlice(modifiedHeaders), modifiedBody, func(statusCode int, responseHeaders http.Header, responseBody []byte) {
|
||||
if statusCode == 200 {
|
||||
c.handleAvailableApiToken(apiToken, log)
|
||||
c.handleAvailableApiToken(apiToken)
|
||||
}
|
||||
}, uint32(c.failover.healthCheckTimeout))
|
||||
if err != nil {
|
||||
@@ -191,19 +192,19 @@ func generateUrl(header http.Header) string {
|
||||
return fmt.Sprintf("https://%s%s", header.Get(":authority"), header.Get(":path"))
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) transformRequestHeadersAndBody(ctx wrapper.HttpContext, activeProvider Provider, headers [][2]string, body []byte, log wrapper.Log) (http.Header, []byte, error) {
|
||||
func (c *ProviderConfig) transformRequestHeadersAndBody(ctx wrapper.HttpContext, activeProvider Provider, headers [][2]string, body []byte) (http.Header, []byte, error) {
|
||||
modifiedHeaders := util.SliceToHeader(headers)
|
||||
if handler, ok := activeProvider.(TransformRequestHeadersHandler); ok {
|
||||
handler.TransformRequestHeaders(ctx, ApiNameChatCompletion, modifiedHeaders, log)
|
||||
handler.TransformRequestHeaders(ctx, ApiNameChatCompletion, modifiedHeaders)
|
||||
}
|
||||
|
||||
var err error
|
||||
if handler, ok := activeProvider.(TransformRequestBodyHandler); ok {
|
||||
body, err = handler.TransformRequestBody(ctx, ApiNameChatCompletion, body, log)
|
||||
body, err = handler.TransformRequestBody(ctx, ApiNameChatCompletion, body)
|
||||
} else if handler, ok := activeProvider.(TransformRequestBodyHeadersHandler); ok {
|
||||
body, err = handler.TransformRequestBodyHeaders(ctx, ApiNameChatCompletion, body, modifiedHeaders, log)
|
||||
body, err = handler.TransformRequestBodyHeaders(ctx, ApiNameChatCompletion, body, modifiedHeaders)
|
||||
} else {
|
||||
body, err = c.defaultTransformRequestBody(ctx, ApiNameChatCompletion, body, log)
|
||||
body, err = c.defaultTransformRequestBody(ctx, ApiNameChatCompletion, body)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to transform request body: %v", err)
|
||||
@@ -213,14 +214,14 @@ func (c *ProviderConfig) transformRequestHeadersAndBody(ctx wrapper.HttpContext,
|
||||
}
|
||||
|
||||
func createHttpContext() *wrapper.CommonHttpCtx[any] {
|
||||
setParseConfig := wrapper.ParseConfigBy[any](parseConfig)
|
||||
setParseConfig := wrapper.ParseConfig[any](parseConfig)
|
||||
vmCtx := wrapper.NewCommonVmCtx[any]("health-check", setParseConfig)
|
||||
pluginCtx := vmCtx.NewPluginContext(rand.Uint32())
|
||||
ctx := pluginCtx.NewHttpContext(rand.Uint32()).(*wrapper.CommonHttpCtx[any])
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) generateRequestHeadersAndBody(log wrapper.Log) (HealthCheckEndpoint, [][2]string, []byte) {
|
||||
func (c *ProviderConfig) generateRequestHeadersAndBody() (HealthCheckEndpoint, [][2]string, []byte) {
|
||||
data, _, err := proxywasm.GetSharedData(c.failover.ctxHealthCheckEndpoint)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get request host and path: %v", err)
|
||||
@@ -248,20 +249,20 @@ func (c *ProviderConfig) generateRequestHeadersAndBody(log wrapper.Log) (HealthC
|
||||
return healthCheckEndpoint, headers, body
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) tryAcquireOrRenewLease(vmID string, log wrapper.Log) bool {
|
||||
func (c *ProviderConfig) tryAcquireOrRenewLease(vmID string) bool {
|
||||
now := time.Now().Unix()
|
||||
|
||||
data, cas, err := proxywasm.GetSharedData(c.failover.ctxVmLease)
|
||||
if err != nil {
|
||||
if errors.Is(err, types.ErrorStatusNotFound) {
|
||||
return c.setLease(vmID, now, cas, log)
|
||||
return c.setLease(vmID, now, cas)
|
||||
} else {
|
||||
log.Errorf("Failed to get lease: %v", err)
|
||||
return false
|
||||
}
|
||||
}
|
||||
if data == nil {
|
||||
return c.setLease(vmID, now, cas, log)
|
||||
return c.setLease(vmID, now, cas)
|
||||
}
|
||||
|
||||
var lease Lease
|
||||
@@ -275,13 +276,13 @@ func (c *ProviderConfig) tryAcquireOrRenewLease(vmID string, log wrapper.Log) bo
|
||||
if lease.VMID == vmID || now-lease.Timestamp > 60 {
|
||||
lease.VMID = vmID
|
||||
lease.Timestamp = now
|
||||
return c.setLease(vmID, now, cas, log)
|
||||
return c.setLease(vmID, now, cas)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) setLease(vmID string, timestamp int64, cas uint32, log wrapper.Log) bool {
|
||||
func (c *ProviderConfig) setLease(vmID string, timestamp int64, cas uint32) bool {
|
||||
lease := Lease{
|
||||
VMID: vmID,
|
||||
Timestamp: timestamp,
|
||||
@@ -305,7 +306,7 @@ func generateVMID() string {
|
||||
|
||||
// When the number of successful requests during health check reaches the threshold,
|
||||
// add the apiToken back to the available list and remove it from the unavailable list
|
||||
func (c *ProviderConfig) handleAvailableApiToken(apiToken string, log wrapper.Log) {
|
||||
func (c *ProviderConfig) handleAvailableApiToken(apiToken string) {
|
||||
successApiTokenRequestCount, _, err := getApiTokenRequestCount(c.failover.ctxApiTokenRequestSuccessCount)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get successApiTokenRequestCount: %v", err)
|
||||
@@ -315,18 +316,18 @@ func (c *ProviderConfig) handleAvailableApiToken(apiToken string, log wrapper.Lo
|
||||
successCount := successApiTokenRequestCount[apiToken] + 1
|
||||
if successCount >= c.failover.successThreshold {
|
||||
log.Infof("healthcheck after failover: apiToken %s is available now, add it back to the apiTokens list", apiToken)
|
||||
removeApiToken(c.failover.ctxUnavailableApiTokens, apiToken, log)
|
||||
addApiToken(c.failover.ctxApiTokens, apiToken, log)
|
||||
resetApiTokenRequestCount(c.failover.ctxApiTokenRequestSuccessCount, apiToken, log)
|
||||
removeApiToken(c.failover.ctxUnavailableApiTokens, apiToken)
|
||||
addApiToken(c.failover.ctxApiTokens, apiToken)
|
||||
resetApiTokenRequestCount(c.failover.ctxApiTokenRequestSuccessCount, apiToken)
|
||||
} else {
|
||||
log.Debugf("apiToken %s is still unavailable, the number of health check passed: %d, continue to health check...", apiToken, successCount)
|
||||
addApiTokenRequestCount(c.failover.ctxApiTokenRequestSuccessCount, apiToken, log)
|
||||
addApiTokenRequestCount(c.failover.ctxApiTokenRequestSuccessCount, apiToken)
|
||||
}
|
||||
}
|
||||
|
||||
// When the number of request failures reaches the threshold,
|
||||
// remove the apiToken from the available list and add it to the unavailable list
|
||||
func (c *ProviderConfig) handleUnavailableApiToken(ctx wrapper.HttpContext, apiToken string, log wrapper.Log) {
|
||||
func (c *ProviderConfig) handleUnavailableApiToken(ctx wrapper.HttpContext, apiToken string) {
|
||||
failureApiTokenRequestCount, _, err := getApiTokenRequestCount(c.failover.ctxApiTokenRequestFailureCount)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get failureApiTokenRequestCount: %v", err)
|
||||
@@ -346,26 +347,26 @@ func (c *ProviderConfig) handleUnavailableApiToken(ctx wrapper.HttpContext, apiT
|
||||
failureCount := failureApiTokenRequestCount[apiToken] + 1
|
||||
if failureCount >= c.failover.failureThreshold {
|
||||
log.Infof("failover: apiToken %s is unavailable now, remove it from apiTokens list", apiToken)
|
||||
removeApiToken(c.failover.ctxApiTokens, apiToken, log)
|
||||
addApiToken(c.failover.ctxUnavailableApiTokens, apiToken, log)
|
||||
resetApiTokenRequestCount(c.failover.ctxApiTokenRequestFailureCount, apiToken, log)
|
||||
removeApiToken(c.failover.ctxApiTokens, apiToken)
|
||||
addApiToken(c.failover.ctxUnavailableApiTokens, apiToken)
|
||||
resetApiTokenRequestCount(c.failover.ctxApiTokenRequestFailureCount, apiToken)
|
||||
// Set the request host and path to shared data in case they are needed in apiToken health check
|
||||
c.setHealthCheckEndpoint(ctx, log)
|
||||
c.setHealthCheckEndpoint(ctx)
|
||||
} else {
|
||||
log.Debugf("apiToken %s is still available as it has not reached the failure threshold, the number of failed request: %d", apiToken, failureCount)
|
||||
addApiTokenRequestCount(c.failover.ctxApiTokenRequestFailureCount, apiToken, log)
|
||||
addApiTokenRequestCount(c.failover.ctxApiTokenRequestFailureCount, apiToken)
|
||||
}
|
||||
}
|
||||
|
||||
func addApiToken(key, apiToken string, log wrapper.Log) {
|
||||
modifyApiToken(key, apiToken, addApiTokenOperation, log)
|
||||
func addApiToken(key, apiToken string) {
|
||||
modifyApiToken(key, apiToken, addApiTokenOperation)
|
||||
}
|
||||
|
||||
func removeApiToken(key, apiToken string, log wrapper.Log) {
|
||||
modifyApiToken(key, apiToken, removeApiTokenOperation, log)
|
||||
func removeApiToken(key, apiToken string) {
|
||||
modifyApiToken(key, apiToken, removeApiTokenOperation)
|
||||
}
|
||||
|
||||
func modifyApiToken(key, apiToken, op string, log wrapper.Log) {
|
||||
func modifyApiToken(key, apiToken, op string) {
|
||||
for attempt := 1; attempt <= casMaxRetries; attempt++ {
|
||||
apiTokens, cas, err := getApiTokens(key)
|
||||
if err != nil {
|
||||
@@ -468,15 +469,15 @@ func getApiTokenRequestCount(key string) (map[string]int64, uint32, error) {
|
||||
return apiTokens, cas, nil
|
||||
}
|
||||
|
||||
func addApiTokenRequestCount(key, apiToken string, log wrapper.Log) {
|
||||
modifyApiTokenRequestCount(key, apiToken, addApiTokenRequestCountOperation, log)
|
||||
func addApiTokenRequestCount(key, apiToken string) {
|
||||
modifyApiTokenRequestCount(key, apiToken, addApiTokenRequestCountOperation)
|
||||
}
|
||||
|
||||
func resetApiTokenRequestCount(key, apiToken string, log wrapper.Log) {
|
||||
modifyApiTokenRequestCount(key, apiToken, resetApiTokenRequestCountOperation, log)
|
||||
func resetApiTokenRequestCount(key, apiToken string) {
|
||||
modifyApiTokenRequestCount(key, apiToken, resetApiTokenRequestCountOperation)
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) ResetApiTokenRequestFailureCount(apiTokenInUse string, log wrapper.Log) {
|
||||
func (c *ProviderConfig) ResetApiTokenRequestFailureCount(apiTokenInUse string) {
|
||||
if c.isFailoverEnabled() {
|
||||
failureApiTokenRequestCount, _, err := getApiTokenRequestCount(c.failover.ctxApiTokenRequestFailureCount)
|
||||
if err != nil {
|
||||
@@ -484,12 +485,12 @@ func (c *ProviderConfig) ResetApiTokenRequestFailureCount(apiTokenInUse string,
|
||||
}
|
||||
if _, ok := failureApiTokenRequestCount[apiTokenInUse]; ok {
|
||||
log.Infof("Reset apiToken %s request failure count", apiTokenInUse)
|
||||
resetApiTokenRequestCount(c.failover.ctxApiTokenRequestFailureCount, apiTokenInUse, log)
|
||||
resetApiTokenRequestCount(c.failover.ctxApiTokenRequestFailureCount, apiTokenInUse)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func modifyApiTokenRequestCount(key, apiToken string, op string, log wrapper.Log) {
|
||||
func modifyApiTokenRequestCount(key, apiToken string, op string) {
|
||||
for attempt := 1; attempt <= casMaxRetries; attempt++ {
|
||||
apiTokenRequestCount, cas, err := getApiTokenRequestCount(key)
|
||||
if err != nil {
|
||||
@@ -524,7 +525,7 @@ func (c *ProviderConfig) initApiTokens() error {
|
||||
return setApiTokens(c.failover.ctxApiTokens, c.apiTokens, 0)
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) GetGlobalRandomToken(log wrapper.Log) string {
|
||||
func (c *ProviderConfig) GetGlobalRandomToken() string {
|
||||
apiTokens, _, err := getApiTokens(c.failover.ctxApiTokens)
|
||||
unavailableApiTokens, _, err := getApiTokens(c.failover.ctxUnavailableApiTokens)
|
||||
log.Debugf("apiTokens: %v, unavailableApiTokens: %v", apiTokens, unavailableApiTokens)
|
||||
@@ -550,7 +551,7 @@ func (c *ProviderConfig) GetAvailableApiToken(ctx wrapper.HttpContext) []string
|
||||
}
|
||||
|
||||
// SetAvailableApiTokens sets the available apiTokens of the current request in the context, to be used by retryOnFailure
|
||||
func (c *ProviderConfig) SetAvailableApiTokens(ctx wrapper.HttpContext, log wrapper.Log) {
|
||||
func (c *ProviderConfig) SetAvailableApiTokens(ctx wrapper.HttpContext) {
|
||||
var apiTokens []string
|
||||
if c.isFailoverEnabled() {
|
||||
apiTokens, _, _ = getApiTokens(c.failover.ctxApiTokens)
|
||||
@@ -572,14 +573,14 @@ func (c *ProviderConfig) resetSharedData() {
|
||||
_ = proxywasm.SetSharedData(c.failover.ctxApiTokenRequestFailureCount, nil, 0)
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) OnRequestFailed(activeProvider Provider, ctx wrapper.HttpContext, apiTokenInUse string, apiTokens []string, status string, log wrapper.Log) types.Action {
|
||||
func (c *ProviderConfig) OnRequestFailed(activeProvider Provider, ctx wrapper.HttpContext, apiTokenInUse string, apiTokens []string, status string) types.Action {
|
||||
if c.isFailoverEnabled() && util.MatchStatus(status, c.failover.failoverOnStatus) {
|
||||
log.Warnf("apiToken:%s need failover, error status:%s", apiTokenInUse, status)
|
||||
c.handleUnavailableApiToken(ctx, apiTokenInUse, log)
|
||||
c.handleUnavailableApiToken(ctx, apiTokenInUse)
|
||||
}
|
||||
if c.IsRetryOnFailureEnabled() && util.MatchStatus(status, c.retryOnFailure.retryOnStatus) {
|
||||
log.Warnf("need retry, notice that retry response will be bufferd, error status:%s", status)
|
||||
err := c.retryFailedRequest(activeProvider, ctx, apiTokenInUse, apiTokens, log)
|
||||
err := c.retryFailedRequest(activeProvider, ctx, apiTokenInUse, apiTokens)
|
||||
if err != nil {
|
||||
log.Errorf("retryFailedRequest failed, err:%v", err)
|
||||
return types.ActionContinue
|
||||
@@ -598,11 +599,11 @@ func (c *ProviderConfig) GetApiTokenInUse(ctx wrapper.HttpContext) string {
|
||||
return token
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) SetApiTokenInUse(ctx wrapper.HttpContext, log wrapper.Log) {
|
||||
func (c *ProviderConfig) SetApiTokenInUse(ctx wrapper.HttpContext) {
|
||||
var apiToken string
|
||||
// if enable apiToken failover, only use available apiToken from global apiTokens list
|
||||
if c.isFailoverEnabled() {
|
||||
apiToken = c.GetGlobalRandomToken(log)
|
||||
apiToken = c.GetGlobalRandomToken()
|
||||
} else {
|
||||
apiToken = c.GetRandomToken()
|
||||
}
|
||||
@@ -610,7 +611,7 @@ func (c *ProviderConfig) SetApiTokenInUse(ctx wrapper.HttpContext, log wrapper.L
|
||||
ctx.SetContext(c.failover.ctxApiTokenInUse, apiToken)
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) setHealthCheckEndpoint(ctx wrapper.HttpContext, log wrapper.Log) {
|
||||
func (c *ProviderConfig) setHealthCheckEndpoint(ctx wrapper.HttpContext) {
|
||||
cluster, err := proxywasm.GetProperty([]string{"cluster_name"})
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get cluster_name: %v", err)
|
||||
|
||||
Reference in New Issue
Block a user