mirror of
https://github.com/alibaba/higress.git
synced 2026-04-21 20:17:29 +08:00
feat(ai-statistics): support token details and builtin keys for reasoning_tokens/cached_tokens (#3424)
This commit is contained in:
@@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -17,6 +18,16 @@ import (
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
const (
|
||||
// Envoy log levels
|
||||
LogLevelTrace = iota
|
||||
LogLevelDebug
|
||||
LogLevelInfo
|
||||
LogLevelWarn
|
||||
LogLevelError
|
||||
LogLevelCritical
|
||||
)
|
||||
|
||||
func main() {}
|
||||
|
||||
func init() {
|
||||
@@ -90,10 +101,14 @@ const (
|
||||
RuleAppend = "append"
|
||||
|
||||
// Built-in attributes
|
||||
BuiltinQuestionKey = "question"
|
||||
BuiltinAnswerKey = "answer"
|
||||
BuiltinToolCallsKey = "tool_calls"
|
||||
BuiltinReasoningKey = "reasoning"
|
||||
BuiltinQuestionKey = "question"
|
||||
BuiltinAnswerKey = "answer"
|
||||
BuiltinToolCallsKey = "tool_calls"
|
||||
BuiltinReasoningKey = "reasoning"
|
||||
BuiltinReasoningTokens = "reasoning_tokens"
|
||||
BuiltinCachedTokens = "cached_tokens"
|
||||
BuiltinInputTokenDetails = "input_token_details"
|
||||
BuiltinOutputTokenDetails = "output_token_details"
|
||||
|
||||
// Built-in attribute paths
|
||||
// Question paths (from request body)
|
||||
@@ -578,6 +593,14 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat
|
||||
setSpanAttribute(ArmsModelName, usage.Model)
|
||||
setSpanAttribute(ArmsInputToken, usage.InputToken)
|
||||
setSpanAttribute(ArmsOutputToken, usage.OutputToken)
|
||||
|
||||
// Set token details to context for later use in attributes
|
||||
if len(usage.InputTokenDetails) > 0 {
|
||||
ctx.SetContext(tokenusage.CtxKeyInputTokenDetails, usage.InputTokenDetails)
|
||||
}
|
||||
if len(usage.OutputTokenDetails) > 0 {
|
||||
ctx.SetContext(tokenusage.CtxKeyOutputTokenDetails, usage.OutputTokenDetails)
|
||||
}
|
||||
}
|
||||
}
|
||||
// If the end of the stream is reached, record metrics/logs/spans.
|
||||
@@ -634,6 +657,14 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body
|
||||
setSpanAttribute(ArmsInputToken, usage.InputToken)
|
||||
setSpanAttribute(ArmsOutputToken, usage.OutputToken)
|
||||
setSpanAttribute(ArmsTotalToken, usage.TotalToken)
|
||||
|
||||
// Set token details to context for later use in attributes
|
||||
if len(usage.InputTokenDetails) > 0 {
|
||||
ctx.SetContext(tokenusage.CtxKeyInputTokenDetails, usage.InputTokenDetails)
|
||||
}
|
||||
if len(usage.OutputTokenDetails) > 0 {
|
||||
ctx.SetContext(tokenusage.CtxKeyOutputTokenDetails, usage.OutputTokenDetails)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -694,18 +725,41 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so
|
||||
if (value == nil || value == "") && attribute.DefaultValue != "" {
|
||||
value = attribute.DefaultValue
|
||||
}
|
||||
if len(fmt.Sprint(value)) > config.valueLengthLimit {
|
||||
value = fmt.Sprint(value)[:config.valueLengthLimit/2] + " [truncated] " + fmt.Sprint(value)[len(fmt.Sprint(value))-config.valueLengthLimit/2:]
|
||||
|
||||
// Format value for logging/span
|
||||
var formattedValue interface{}
|
||||
switch v := value.(type) {
|
||||
case map[string]int64:
|
||||
// For token details maps, convert to JSON string
|
||||
jsonBytes, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
log.Warnf("failed to marshal token details: %v", err)
|
||||
formattedValue = fmt.Sprint(v)
|
||||
} else {
|
||||
formattedValue = string(jsonBytes)
|
||||
}
|
||||
default:
|
||||
formattedValue = value
|
||||
if len(fmt.Sprint(value)) > config.valueLengthLimit {
|
||||
formattedValue = fmt.Sprint(value)[:config.valueLengthLimit/2] + " [truncated] " + fmt.Sprint(value)[len(fmt.Sprint(value))-config.valueLengthLimit/2:]
|
||||
}
|
||||
}
|
||||
log.Debugf("[attribute] source type: %s, key: %s, value: %+v", source, key, value)
|
||||
|
||||
log.Debugf("[attribute] source type: %s, key: %s, value: %+v", source, key, formattedValue)
|
||||
if attribute.ApplyToLog {
|
||||
if attribute.AsSeparateLogField {
|
||||
marshalledJsonStr := wrapper.MarshalStr(fmt.Sprint(value))
|
||||
var marshalledJsonStr string
|
||||
if _, ok := value.(map[string]int64); ok {
|
||||
// Already marshaled in formattedValue
|
||||
marshalledJsonStr = fmt.Sprint(formattedValue)
|
||||
} else {
|
||||
marshalledJsonStr = wrapper.MarshalStr(fmt.Sprint(formattedValue))
|
||||
}
|
||||
if err := proxywasm.SetProperty([]string{key}, []byte(marshalledJsonStr)); err != nil {
|
||||
log.Warnf("failed to set %s in filter state, raw is %s, err is %v", key, marshalledJsonStr, err)
|
||||
}
|
||||
} else {
|
||||
ctx.SetUserAttribute(key, value)
|
||||
ctx.SetUserAttribute(key, formattedValue)
|
||||
}
|
||||
}
|
||||
// for metrics
|
||||
@@ -723,7 +777,9 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so
|
||||
|
||||
// isBuiltinAttribute checks if the given key is a built-in attribute
|
||||
func isBuiltinAttribute(key string) bool {
|
||||
return key == BuiltinQuestionKey || key == BuiltinAnswerKey || key == BuiltinToolCallsKey || key == BuiltinReasoningKey
|
||||
return key == BuiltinQuestionKey || key == BuiltinAnswerKey || key == BuiltinToolCallsKey || key == BuiltinReasoningKey ||
|
||||
key == BuiltinReasoningTokens || key == BuiltinCachedTokens ||
|
||||
key == BuiltinInputTokenDetails || key == BuiltinOutputTokenDetails
|
||||
}
|
||||
|
||||
// getBuiltinAttributeDefaultSources returns the default value_source(s) for a built-in attribute
|
||||
@@ -734,6 +790,9 @@ func getBuiltinAttributeDefaultSources(key string) []string {
|
||||
return []string{RequestBody}
|
||||
case BuiltinAnswerKey, BuiltinToolCallsKey, BuiltinReasoningKey:
|
||||
return []string{ResponseStreamingBody, ResponseBody}
|
||||
case BuiltinReasoningTokens, BuiltinCachedTokens, BuiltinInputTokenDetails, BuiltinOutputTokenDetails:
|
||||
// Token details are only available after response is received
|
||||
return []string{ResponseStreamingBody, ResponseBody}
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
@@ -816,6 +875,38 @@ func getBuiltinAttributeFallback(ctx wrapper.HttpContext, config AIStatisticsCon
|
||||
return value
|
||||
}
|
||||
}
|
||||
case BuiltinReasoningTokens:
|
||||
// Extract reasoning_tokens from output_token_details (only available after response)
|
||||
if source == ResponseBody || source == ResponseStreamingBody {
|
||||
if outputTokenDetails, ok := ctx.GetContext(tokenusage.CtxKeyOutputTokenDetails).(map[string]int64); ok {
|
||||
if reasoningTokens, exists := outputTokenDetails["reasoning_tokens"]; exists {
|
||||
return reasoningTokens
|
||||
}
|
||||
}
|
||||
}
|
||||
case BuiltinCachedTokens:
|
||||
// Extract cached_tokens from input_token_details (only available after response)
|
||||
if source == ResponseBody || source == ResponseStreamingBody {
|
||||
if inputTokenDetails, ok := ctx.GetContext(tokenusage.CtxKeyInputTokenDetails).(map[string]int64); ok {
|
||||
if cachedTokens, exists := inputTokenDetails["cached_tokens"]; exists {
|
||||
return cachedTokens
|
||||
}
|
||||
}
|
||||
}
|
||||
case BuiltinInputTokenDetails:
|
||||
// Return the entire input_token_details map (only available after response)
|
||||
if source == ResponseBody || source == ResponseStreamingBody {
|
||||
if inputTokenDetails, ok := ctx.GetContext(tokenusage.CtxKeyInputTokenDetails).(map[string]int64); ok {
|
||||
return inputTokenDetails
|
||||
}
|
||||
}
|
||||
case BuiltinOutputTokenDetails:
|
||||
// Return the entire output_token_details map (only available after response)
|
||||
if source == ResponseBody || source == ResponseStreamingBody {
|
||||
if outputTokenDetails, ok := ctx.GetContext(tokenusage.CtxKeyOutputTokenDetails).(map[string]int64); ok {
|
||||
return outputTokenDetails
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -854,11 +945,31 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string) i
|
||||
return value
|
||||
}
|
||||
|
||||
// shouldLogDebug returns true if the log level is debug or trace
|
||||
func shouldLogDebug() bool {
|
||||
value, err := proxywasm.CallForeignFunction("get_log_level", nil)
|
||||
if err != nil {
|
||||
// If we can't get log level, default to not logging debug info
|
||||
return false
|
||||
}
|
||||
if len(value) < 4 {
|
||||
// Invalid log level value length
|
||||
return false
|
||||
}
|
||||
envoyLogLevel := binary.LittleEndian.Uint32(value[:4])
|
||||
return envoyLogLevel == LogLevelTrace || envoyLogLevel == LogLevelDebug
|
||||
}
|
||||
|
||||
// debugLogAiLog logs the current user attributes that will be written to ai_log
|
||||
func debugLogAiLog(ctx wrapper.HttpContext) {
|
||||
// Only log in debug/trace mode
|
||||
if !shouldLogDebug() {
|
||||
return
|
||||
}
|
||||
|
||||
// Get all user attributes as a map
|
||||
userAttrs := make(map[string]interface{})
|
||||
|
||||
|
||||
// Try to reconstruct from GetUserAttribute (note: this is best-effort)
|
||||
// The actual attributes are stored internally, we log what we know
|
||||
if question := ctx.GetUserAttribute("question"); question != nil {
|
||||
@@ -903,6 +1014,18 @@ func debugLogAiLog(ctx wrapper.HttpContext) {
|
||||
if llmServiceDuration := ctx.GetUserAttribute("llm_service_duration"); llmServiceDuration != nil {
|
||||
userAttrs["llm_service_duration"] = llmServiceDuration
|
||||
}
|
||||
if reasoningTokens := ctx.GetUserAttribute("reasoning_tokens"); reasoningTokens != nil {
|
||||
userAttrs["reasoning_tokens"] = reasoningTokens
|
||||
}
|
||||
if cachedTokens := ctx.GetUserAttribute("cached_tokens"); cachedTokens != nil {
|
||||
userAttrs["cached_tokens"] = cachedTokens
|
||||
}
|
||||
if inputTokenDetails := ctx.GetUserAttribute("input_token_details"); inputTokenDetails != nil {
|
||||
userAttrs["input_token_details"] = inputTokenDetails
|
||||
}
|
||||
if outputTokenDetails := ctx.GetUserAttribute("output_token_details"); outputTokenDetails != nil {
|
||||
userAttrs["output_token_details"] = outputTokenDetails
|
||||
}
|
||||
|
||||
// Log the attributes as JSON
|
||||
logJson, _ := json.Marshal(userAttrs)
|
||||
|
||||
Reference in New Issue
Block a user