mirror of
https://github.com/alibaba/higress.git
synced 2026-06-09 12:47:28 +08:00
feat: improve ai statistic plugin (#2671)
This commit is contained in:
@@ -44,6 +44,15 @@ const (
|
||||
APIName = "api"
|
||||
ConsumerKey = "x-mse-consumer"
|
||||
RequestPath = "request_path"
|
||||
SkipProcessing = "skip_processing"
|
||||
|
||||
// AI API Paths
|
||||
PathOpenAIChatCompletions = "/v1/chat/completions"
|
||||
PathOpenAICompletions = "/v1/completions"
|
||||
PathOpenAIEmbeddings = "/v1/embeddings"
|
||||
PathOpenAIModels = "/v1/models"
|
||||
PathGeminiGenerateContent = "/generateContent"
|
||||
PathGeminiStreamGenerateContent = "/streamGenerateContent"
|
||||
|
||||
// Source Type
|
||||
FixedValue = "fixed_value"
|
||||
@@ -100,6 +109,10 @@ type AIStatisticsConfig struct {
|
||||
// If disableOpenaiUsage is true, model/input_token/output_token logs will be skipped
|
||||
disableOpenaiUsage bool
|
||||
valueLengthLimit int
|
||||
// Path suffixes to enable the plugin on
|
||||
enablePathSuffixes []string
|
||||
// Content types to enable response body buffering
|
||||
enableContentTypes []string
|
||||
}
|
||||
|
||||
func generateMetricName(route, cluster, model, consumer, metricName string) string {
|
||||
@@ -147,6 +160,41 @@ func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64
|
||||
counter.Increment(inc)
|
||||
}
|
||||
|
||||
// isPathEnabled reports whether requestPath ends with one of enabledSuffixes.
//
// Everything from the first "?" onward is dropped before matching, so a path
// such as "/v1/chat/completions?stream=true" still matches the suffix
// "/v1/chat/completions". When enabledSuffixes is empty no restriction is
// configured and every path is treated as enabled.
func isPathEnabled(requestPath string, enabledSuffixes []string) bool {
	if len(enabledSuffixes) == 0 {
		return true
	}

	// strings.Cut yields the whole input as its first result when the
	// separator is absent, so paths without a query string pass through as-is.
	path, _, _ := strings.Cut(requestPath, "?")

	for _, suffix := range enabledSuffixes {
		if strings.HasSuffix(path, suffix) {
			return true
		}
	}
	return false
}
|
||||
|
||||
// isContentTypeEnabled reports whether contentType contains one of
// enabledContentTypes as a substring.
//
// The comparison is case-insensitive because media type and subtype tokens
// are case-insensitive: "Application/JSON; charset=utf-8" matches a configured
// "application/json". Substring matching lets a bare media type match header
// values that carry parameters such as a charset. When enabledContentTypes is
// empty no restriction is configured and every content type is treated as
// enabled.
func isContentTypeEnabled(contentType string, enabledContentTypes []string) bool {
	if len(enabledContentTypes) == 0 {
		return true
	}

	// Lowercase the header value once; each configured entry is lowercased
	// as it is compared.
	normalized := strings.ToLower(contentType)
	for _, enabledType := range enabledContentTypes {
		if strings.Contains(normalized, strings.ToLower(enabledType)) {
			return true
		}
	}
	return false
}
|
||||
|
||||
func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error {
|
||||
// Parse tracing span attributes setting.
|
||||
attributeConfigs := configJson.Get("attributes").Array()
|
||||
@@ -177,10 +225,49 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error {
|
||||
// Parse openai usage config setting.
|
||||
config.disableOpenaiUsage = configJson.Get("disable_openai_usage").Bool()
|
||||
|
||||
// Parse path suffix configuration
|
||||
pathSuffixes := configJson.Get("enable_path_suffixes").Array()
|
||||
config.enablePathSuffixes = make([]string, 0, len(pathSuffixes))
|
||||
|
||||
for _, suffix := range pathSuffixes {
|
||||
suffixStr := suffix.String()
|
||||
if suffixStr == "*" {
|
||||
// Clear the suffixes list since * means all paths are enabled
|
||||
config.enablePathSuffixes = make([]string, 0)
|
||||
break
|
||||
}
|
||||
config.enablePathSuffixes = append(config.enablePathSuffixes, suffixStr)
|
||||
}
|
||||
|
||||
// Parse content type configuration
|
||||
contentTypes := configJson.Get("enable_content_types").Array()
|
||||
config.enableContentTypes = make([]string, 0, len(contentTypes))
|
||||
|
||||
for _, contentType := range contentTypes {
|
||||
contentTypeStr := contentType.String()
|
||||
if contentTypeStr == "*" {
|
||||
// Clear the content types list since * means all content types are enabled
|
||||
config.enableContentTypes = make([]string, 0)
|
||||
break
|
||||
}
|
||||
config.enableContentTypes = append(config.enableContentTypes, contentTypeStr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig) types.Action {
|
||||
// Check if request path matches enabled suffixes
|
||||
requestPath, _ := proxywasm.GetHttpRequestHeader(":path")
|
||||
if !isPathEnabled(requestPath, config.enablePathSuffixes) {
|
||||
log.Debugf("ai-statistics: skipping request for path %s (not in enabled suffixes)", requestPath)
|
||||
// Set skip processing flag and avoid reading request/response body
|
||||
ctx.SetContext(SkipProcessing, true)
|
||||
ctx.DontReadRequestBody()
|
||||
ctx.DontReadResponseBody()
|
||||
return types.ActionContinue
|
||||
}
|
||||
|
||||
ctx.DisableReroute()
|
||||
route, _ := getRouteName()
|
||||
cluster, _ := getClusterName()
|
||||
@@ -212,6 +299,11 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig) ty
|
||||
}
|
||||
|
||||
func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte) types.Action {
|
||||
// Check if processing should be skipped
|
||||
if ctx.GetBoolContext(SkipProcessing, false) {
|
||||
return types.ActionContinue
|
||||
}
|
||||
|
||||
// Set user defined log & span attributes.
|
||||
setAttributeBySource(ctx, config, RequestBody, body)
|
||||
// Set span attributes for ARMS.
|
||||
@@ -254,6 +346,15 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body
|
||||
|
||||
func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig) types.Action {
|
||||
contentType, _ := proxywasm.GetHttpResponseHeader("content-type")
|
||||
|
||||
if !isContentTypeEnabled(contentType, config.enableContentTypes) {
|
||||
log.Debugf("ai-statistics: skipping response for content type %s (not in enabled content types)", contentType)
|
||||
// Set skip processing flag and avoid reading response body
|
||||
ctx.SetContext(SkipProcessing, true)
|
||||
ctx.DontReadResponseBody()
|
||||
return types.ActionContinue
|
||||
}
|
||||
|
||||
if !strings.Contains(contentType, "text/event-stream") {
|
||||
ctx.BufferResponseBody()
|
||||
}
|
||||
@@ -265,6 +366,11 @@ func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig) t
|
||||
}
|
||||
|
||||
func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, data []byte, endOfStream bool) []byte {
|
||||
// Check if processing should be skipped
|
||||
if ctx.GetBoolContext(SkipProcessing, false) {
|
||||
return data
|
||||
}
|
||||
|
||||
// Buffer stream body for record log & span attributes
|
||||
if config.shouldBufferStreamingBody {
|
||||
streamingBodyBuffer, ok := ctx.GetContext(CtxStreamingBodyBuffer).([]byte)
|
||||
@@ -334,6 +440,11 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat
|
||||
}
|
||||
|
||||
func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte) types.Action {
|
||||
// Check if processing should be skipped
|
||||
if ctx.GetBoolContext(SkipProcessing, false) {
|
||||
return types.ActionContinue
|
||||
}
|
||||
|
||||
// Get requestStartTime from http context
|
||||
requestStartTime, _ := ctx.GetContext(StatisticsRequestStartTime).(int64)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user