mirror of
https://github.com/alibaba/higress.git
synced 2026-06-09 12:47:28 +08:00
feat(ai-statistics): add session ID tracking for multi-turn agent conversations (#3420)
This commit is contained in:
@@ -48,6 +48,9 @@ const (
|
||||
RequestPath = "request_path"
|
||||
SkipProcessing = "skip_processing"
|
||||
|
||||
// Session ID related
|
||||
SessionID = "session_id"
|
||||
|
||||
// AI API Paths
|
||||
PathOpenAIChatCompletions = "/v1/chat/completions"
|
||||
PathOpenAICompletions = "/v1/completions"
|
||||
@@ -87,8 +90,10 @@ const (
|
||||
RuleAppend = "append"
|
||||
|
||||
// Built-in attributes
|
||||
BuiltinQuestionKey = "question"
|
||||
BuiltinAnswerKey = "answer"
|
||||
BuiltinQuestionKey = "question"
|
||||
BuiltinAnswerKey = "answer"
|
||||
BuiltinToolCallsKey = "tool_calls"
|
||||
BuiltinReasoningKey = "reasoning"
|
||||
|
||||
// Built-in attribute paths
|
||||
// Question paths (from request body)
|
||||
@@ -102,8 +107,132 @@ const (
|
||||
// Answer paths (from response streaming body)
|
||||
AnswerPathOpenAIStreaming = "choices.0.delta.content"
|
||||
AnswerPathClaudeStreaming = "delta.text"
|
||||
|
||||
// Tool calls paths
|
||||
ToolCallsPathNonStreaming = "choices.0.message.tool_calls"
|
||||
ToolCallsPathStreaming = "choices.0.delta.tool_calls"
|
||||
|
||||
// Reasoning paths
|
||||
ReasoningPathNonStreaming = "choices.0.message.reasoning_content"
|
||||
ReasoningPathStreaming = "choices.0.delta.reasoning_content"
|
||||
|
||||
// Context key for streaming tool calls buffer
|
||||
CtxStreamingToolCallsBuffer = "streamingToolCallsBuffer"
|
||||
)
|
||||
|
||||
// defaultSessionHeaders lists the request headers that are probed for a
// session ID, in priority order: the first header carrying a non-empty
// value wins.
var defaultSessionHeaders = []string{
	"x-openclaw-session-key",
	"x-clawdbot-session-key",
	"x-moltbot-session-key",
	"x-agent-session",
}
|
||||
|
||||
// extractSessionId extracts session ID from request headers
|
||||
// If customHeader is configured, it takes priority; otherwise falls back to default headers
|
||||
func extractSessionId(customHeader string) string {
|
||||
// If custom header is configured, try it first
|
||||
if customHeader != "" {
|
||||
if sessionId, _ := proxywasm.GetHttpRequestHeader(customHeader); sessionId != "" {
|
||||
return sessionId
|
||||
}
|
||||
}
|
||||
// Fall back to default session headers in priority order
|
||||
for _, header := range defaultSessionHeaders {
|
||||
if sessionId, _ := proxywasm.GetHttpRequestHeader(header); sessionId != "" {
|
||||
return sessionId
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Types used to assemble and report tool calls found in model responses.
type (
	// ToolCall is one tool call taken from a response.
	ToolCall struct {
		Index    int              `json:"index,omitempty"`
		ID       string           `json:"id,omitempty"`
		Type     string           `json:"type,omitempty"`
		Function ToolCallFunction `json:"function,omitempty"`
	}

	// ToolCallFunction carries the function name and the raw argument
	// string of a tool call.
	ToolCallFunction struct {
		Name      string `json:"name,omitempty"`
		Arguments string `json:"arguments,omitempty"`
	}

	// StreamingToolCallsBuffer accumulates partially streamed tool calls
	// between response chunks.
	StreamingToolCallsBuffer struct {
		// ToolCalls maps a tool call's index to the entry assembled so far.
		ToolCalls map[int]*ToolCall
	}
)
|
||||
|
||||
// extractStreamingToolCalls extracts and assembles tool calls from streaming response chunks
|
||||
func extractStreamingToolCalls(data []byte, buffer *StreamingToolCallsBuffer) *StreamingToolCallsBuffer {
|
||||
if buffer == nil {
|
||||
buffer = &StreamingToolCallsBuffer{
|
||||
ToolCalls: make(map[int]*ToolCall),
|
||||
}
|
||||
}
|
||||
|
||||
chunks := bytes.Split(bytes.TrimSpace(wrapper.UnifySSEChunk(data)), []byte("\n\n"))
|
||||
for _, chunk := range chunks {
|
||||
toolCallsResult := gjson.GetBytes(chunk, ToolCallsPathStreaming)
|
||||
if !toolCallsResult.Exists() || !toolCallsResult.IsArray() {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, tcResult := range toolCallsResult.Array() {
|
||||
index := int(tcResult.Get("index").Int())
|
||||
|
||||
// Get or create tool call entry
|
||||
tc, exists := buffer.ToolCalls[index]
|
||||
if !exists {
|
||||
tc = &ToolCall{Index: index}
|
||||
buffer.ToolCalls[index] = tc
|
||||
}
|
||||
|
||||
// Update fields if present
|
||||
if id := tcResult.Get("id").String(); id != "" {
|
||||
tc.ID = id
|
||||
}
|
||||
if tcType := tcResult.Get("type").String(); tcType != "" {
|
||||
tc.Type = tcType
|
||||
}
|
||||
if funcName := tcResult.Get("function.name").String(); funcName != "" {
|
||||
tc.Function.Name = funcName
|
||||
}
|
||||
// Append arguments (they come in chunks)
|
||||
if args := tcResult.Get("function.arguments").String(); args != "" {
|
||||
tc.Function.Arguments += args
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return buffer
|
||||
}
|
||||
|
||||
// getToolCallsFromBuffer converts the buffer to a sorted slice of tool calls
|
||||
func getToolCallsFromBuffer(buffer *StreamingToolCallsBuffer) []ToolCall {
|
||||
if buffer == nil || len(buffer.ToolCalls) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Find max index to create properly sized slice
|
||||
maxIndex := 0
|
||||
for idx := range buffer.ToolCalls {
|
||||
if idx > maxIndex {
|
||||
maxIndex = idx
|
||||
}
|
||||
}
|
||||
|
||||
result := make([]ToolCall, 0, len(buffer.ToolCalls))
|
||||
for i := 0; i <= maxIndex; i++ {
|
||||
if tc, exists := buffer.ToolCalls[i]; exists {
|
||||
result = append(result, *tc)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Attribute is one configured attribute to extract and attach to the log and/or the tracing span.
|
||||
type Attribute struct {
|
||||
Key string `json:"key"`
|
||||
@@ -132,6 +261,8 @@ type AIStatisticsConfig struct {
|
||||
enablePathSuffixes []string
|
||||
// Content types to enable response body buffering
|
||||
enableContentTypes []string
|
||||
// Session ID header name (if configured, takes priority over default headers)
|
||||
sessionIdHeader string
|
||||
}
|
||||
|
||||
func generateMetricName(route, cluster, model, consumer, metricName string) string {
|
||||
@@ -272,6 +403,11 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error {
|
||||
config.enableContentTypes = append(config.enableContentTypes, contentTypeStr)
|
||||
}
|
||||
|
||||
// Parse session ID header configuration
|
||||
if sessionIdHeader := configJson.Get("session_id_header"); sessionIdHeader.Exists() {
|
||||
config.sessionIdHeader = sessionIdHeader.String()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -307,6 +443,12 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig) ty
|
||||
|
||||
ctx.SetRequestBodyBufferLimit(defaultMaxBodyBytes)
|
||||
|
||||
// Extract session ID from headers
|
||||
sessionId := extractSessionId(config.sessionIdHeader)
|
||||
if sessionId != "" {
|
||||
ctx.SetUserAttribute(SessionID, sessionId)
|
||||
}
|
||||
|
||||
// Set span attributes for ARMS.
|
||||
setSpanAttribute(ArmsSpanKind, "LLM")
|
||||
// Set user defined log & span attributes which type is fixed_value
|
||||
@@ -361,6 +503,7 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body
|
||||
ctx.SetUserAttribute(ChatRound, userPromptCount)
|
||||
|
||||
// Write log
|
||||
debugLogAiLog(ctx)
|
||||
_ = ctx.WriteUserAttributeToLogWithKey(wrapper.AILogKey)
|
||||
return types.ActionContinue
|
||||
}
|
||||
@@ -452,6 +595,7 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat
|
||||
}
|
||||
|
||||
// Write log
|
||||
debugLogAiLog(ctx)
|
||||
_ = ctx.WriteUserAttributeToLogWithKey(wrapper.AILogKey)
|
||||
|
||||
// Write metrics
|
||||
@@ -497,6 +641,7 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body
|
||||
setAttributeBySource(ctx, config, ResponseBody, body)
|
||||
|
||||
// Write log
|
||||
debugLogAiLog(ctx)
|
||||
_ = ctx.WriteUserAttributeToLogWithKey(wrapper.AILogKey)
|
||||
|
||||
// Write metrics
|
||||
@@ -511,8 +656,16 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so
|
||||
for _, attribute := range config.attributes {
|
||||
var key string
|
||||
var value interface{}
|
||||
if source == attribute.ValueSource {
|
||||
key = attribute.Key
|
||||
key = attribute.Key
|
||||
|
||||
// Check if this attribute should be processed for the current source
|
||||
// For built-in attributes without value_source configured, use default source matching
|
||||
if !shouldProcessBuiltinAttribute(key, attribute.ValueSource, source) {
|
||||
continue
|
||||
}
|
||||
|
||||
// If value is configured, try to extract using the configured path
|
||||
if attribute.Value != "" {
|
||||
switch source {
|
||||
case FixedValue:
|
||||
value = attribute.Value
|
||||
@@ -528,52 +681,81 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so
|
||||
value = gjson.GetBytes(body, attribute.Value).Value()
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Handle built-in attributes with Claude/OpenAI protocol fallback logic
|
||||
if (value == nil || value == "") && isBuiltinAttribute(key) {
|
||||
value = getBuiltinAttributeFallback(ctx, config, key, source, body, attribute.Rule)
|
||||
if value != nil && value != "" {
|
||||
log.Debugf("[attribute] Used protocol fallback for %s: %+v", key, value)
|
||||
}
|
||||
// Handle built-in attributes: use fallback if value is empty or not configured
|
||||
if (value == nil || value == "") && isBuiltinAttribute(key) {
|
||||
value = getBuiltinAttributeFallback(ctx, config, key, source, body, attribute.Rule)
|
||||
if value != nil && value != "" {
|
||||
log.Debugf("[attribute] Used built-in extraction for %s: %+v", key, value)
|
||||
}
|
||||
}
|
||||
|
||||
if (value == nil || value == "") && attribute.DefaultValue != "" {
|
||||
value = attribute.DefaultValue
|
||||
}
|
||||
if len(fmt.Sprint(value)) > config.valueLengthLimit {
|
||||
value = fmt.Sprint(value)[:config.valueLengthLimit/2] + " [truncated] " + fmt.Sprint(value)[len(fmt.Sprint(value))-config.valueLengthLimit/2:]
|
||||
}
|
||||
log.Debugf("[attribute] source type: %s, key: %s, value: %+v", source, key, value)
|
||||
if attribute.ApplyToLog {
|
||||
if attribute.AsSeparateLogField {
|
||||
marshalledJsonStr := wrapper.MarshalStr(fmt.Sprint(value))
|
||||
if err := proxywasm.SetProperty([]string{key}, []byte(marshalledJsonStr)); err != nil {
|
||||
log.Warnf("failed to set %s in filter state, raw is %s, err is %v", key, marshalledJsonStr, err)
|
||||
}
|
||||
} else {
|
||||
ctx.SetUserAttribute(key, value)
|
||||
if (value == nil || value == "") && attribute.DefaultValue != "" {
|
||||
value = attribute.DefaultValue
|
||||
}
|
||||
if len(fmt.Sprint(value)) > config.valueLengthLimit {
|
||||
value = fmt.Sprint(value)[:config.valueLengthLimit/2] + " [truncated] " + fmt.Sprint(value)[len(fmt.Sprint(value))-config.valueLengthLimit/2:]
|
||||
}
|
||||
log.Debugf("[attribute] source type: %s, key: %s, value: %+v", source, key, value)
|
||||
if attribute.ApplyToLog {
|
||||
if attribute.AsSeparateLogField {
|
||||
marshalledJsonStr := wrapper.MarshalStr(fmt.Sprint(value))
|
||||
if err := proxywasm.SetProperty([]string{key}, []byte(marshalledJsonStr)); err != nil {
|
||||
log.Warnf("failed to set %s in filter state, raw is %s, err is %v", key, marshalledJsonStr, err)
|
||||
}
|
||||
} else {
|
||||
ctx.SetUserAttribute(key, value)
|
||||
}
|
||||
// for metrics
|
||||
if key == tokenusage.CtxKeyModel || key == tokenusage.CtxKeyInputToken || key == tokenusage.CtxKeyOutputToken || key == tokenusage.CtxKeyTotalToken {
|
||||
ctx.SetContext(key, value)
|
||||
}
|
||||
if attribute.ApplyToSpan {
|
||||
if attribute.TraceSpanKey != "" {
|
||||
key = attribute.TraceSpanKey
|
||||
}
|
||||
setSpanAttribute(key, value)
|
||||
}
|
||||
// for metrics
|
||||
if key == tokenusage.CtxKeyModel || key == tokenusage.CtxKeyInputToken || key == tokenusage.CtxKeyOutputToken || key == tokenusage.CtxKeyTotalToken {
|
||||
ctx.SetContext(key, value)
|
||||
}
|
||||
if attribute.ApplyToSpan {
|
||||
if attribute.TraceSpanKey != "" {
|
||||
key = attribute.TraceSpanKey
|
||||
}
|
||||
setSpanAttribute(key, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// isBuiltinAttribute checks if the given key is a built-in attribute
|
||||
func isBuiltinAttribute(key string) bool {
|
||||
return key == BuiltinQuestionKey || key == BuiltinAnswerKey
|
||||
return key == BuiltinQuestionKey || key == BuiltinAnswerKey || key == BuiltinToolCallsKey || key == BuiltinReasoningKey
|
||||
}
|
||||
|
||||
// getBuiltinAttributeFallback provides protocol compatibility fallback for question/answer attributes
|
||||
// getBuiltinAttributeDefaultSources returns the default value_source(s) for a built-in attribute
|
||||
// Returns nil if the key is not a built-in attribute
|
||||
func getBuiltinAttributeDefaultSources(key string) []string {
|
||||
switch key {
|
||||
case BuiltinQuestionKey:
|
||||
return []string{RequestBody}
|
||||
case BuiltinAnswerKey, BuiltinToolCallsKey, BuiltinReasoningKey:
|
||||
return []string{ResponseStreamingBody, ResponseBody}
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// shouldProcessBuiltinAttribute checks if a built-in attribute should be processed for the given source
|
||||
func shouldProcessBuiltinAttribute(key, configuredSource, currentSource string) bool {
|
||||
// If value_source is configured, use exact match
|
||||
if configuredSource != "" {
|
||||
return configuredSource == currentSource
|
||||
}
|
||||
// If value_source is not configured and it's a built-in attribute, check default sources
|
||||
defaultSources := getBuiltinAttributeDefaultSources(key)
|
||||
for _, src := range defaultSources {
|
||||
if src == currentSource {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// getBuiltinAttributeFallback provides protocol compatibility fallback for built-in attributes
|
||||
func getBuiltinAttributeFallback(ctx wrapper.HttpContext, config AIStatisticsConfig, key, source string, body []byte, rule string) interface{} {
|
||||
switch key {
|
||||
case BuiltinQuestionKey:
|
||||
@@ -603,6 +785,37 @@ func getBuiltinAttributeFallback(ctx wrapper.HttpContext, config AIStatisticsCon
|
||||
return value
|
||||
}
|
||||
}
|
||||
case BuiltinToolCallsKey:
|
||||
if source == ResponseStreamingBody {
|
||||
// Get or create buffer from context
|
||||
var buffer *StreamingToolCallsBuffer
|
||||
if existingBuffer, ok := ctx.GetContext(CtxStreamingToolCallsBuffer).(*StreamingToolCallsBuffer); ok {
|
||||
buffer = existingBuffer
|
||||
}
|
||||
buffer = extractStreamingToolCalls(body, buffer)
|
||||
ctx.SetContext(CtxStreamingToolCallsBuffer, buffer)
|
||||
|
||||
// Also set tool_calls to user attributes so they appear in ai_log
|
||||
toolCalls := getToolCallsFromBuffer(buffer)
|
||||
if len(toolCalls) > 0 {
|
||||
ctx.SetUserAttribute(BuiltinToolCallsKey, toolCalls)
|
||||
return toolCalls
|
||||
}
|
||||
} else if source == ResponseBody {
|
||||
if value := gjson.GetBytes(body, ToolCallsPathNonStreaming).Value(); value != nil {
|
||||
return value
|
||||
}
|
||||
}
|
||||
case BuiltinReasoningKey:
|
||||
if source == ResponseStreamingBody {
|
||||
if value := extractStreamingBodyByJsonPath(body, ReasoningPathStreaming, RuleAppend); value != nil && value != "" {
|
||||
return value
|
||||
}
|
||||
} else if source == ResponseBody {
|
||||
if value := gjson.GetBytes(body, ReasoningPathNonStreaming).Value(); value != nil && value != "" {
|
||||
return value
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -641,6 +854,61 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string) i
|
||||
return value
|
||||
}
|
||||
|
||||
// debugLogAiLog logs the current user attributes that will be written to ai_log
|
||||
func debugLogAiLog(ctx wrapper.HttpContext) {
|
||||
// Get all user attributes as a map
|
||||
userAttrs := make(map[string]interface{})
|
||||
|
||||
// Try to reconstruct from GetUserAttribute (note: this is best-effort)
|
||||
// The actual attributes are stored internally, we log what we know
|
||||
if question := ctx.GetUserAttribute("question"); question != nil {
|
||||
userAttrs["question"] = question
|
||||
}
|
||||
if answer := ctx.GetUserAttribute("answer"); answer != nil {
|
||||
userAttrs["answer"] = answer
|
||||
}
|
||||
if reasoning := ctx.GetUserAttribute("reasoning"); reasoning != nil {
|
||||
userAttrs["reasoning"] = reasoning
|
||||
}
|
||||
if toolCalls := ctx.GetUserAttribute("tool_calls"); toolCalls != nil {
|
||||
userAttrs["tool_calls"] = toolCalls
|
||||
}
|
||||
if messages := ctx.GetUserAttribute("messages"); messages != nil {
|
||||
userAttrs["messages"] = messages
|
||||
}
|
||||
if sessionId := ctx.GetUserAttribute("session_id"); sessionId != nil {
|
||||
userAttrs["session_id"] = sessionId
|
||||
}
|
||||
if model := ctx.GetUserAttribute("model"); model != nil {
|
||||
userAttrs["model"] = model
|
||||
}
|
||||
if inputToken := ctx.GetUserAttribute("input_token"); inputToken != nil {
|
||||
userAttrs["input_token"] = inputToken
|
||||
}
|
||||
if outputToken := ctx.GetUserAttribute("output_token"); outputToken != nil {
|
||||
userAttrs["output_token"] = outputToken
|
||||
}
|
||||
if totalToken := ctx.GetUserAttribute("total_token"); totalToken != nil {
|
||||
userAttrs["total_token"] = totalToken
|
||||
}
|
||||
if chatId := ctx.GetUserAttribute("chat_id"); chatId != nil {
|
||||
userAttrs["chat_id"] = chatId
|
||||
}
|
||||
if responseType := ctx.GetUserAttribute("response_type"); responseType != nil {
|
||||
userAttrs["response_type"] = responseType
|
||||
}
|
||||
if llmFirstTokenDuration := ctx.GetUserAttribute("llm_first_token_duration"); llmFirstTokenDuration != nil {
|
||||
userAttrs["llm_first_token_duration"] = llmFirstTokenDuration
|
||||
}
|
||||
if llmServiceDuration := ctx.GetUserAttribute("llm_service_duration"); llmServiceDuration != nil {
|
||||
userAttrs["llm_service_duration"] = llmServiceDuration
|
||||
}
|
||||
|
||||
// Log the attributes as JSON
|
||||
logJson, _ := json.Marshal(userAttrs)
|
||||
log.Debugf("[ai_log] attributes to be written: %s", string(logJson))
|
||||
}
|
||||
|
||||
// Set the tracing span with value.
|
||||
func setSpanAttribute(key string, value interface{}) {
|
||||
if value != "" {
|
||||
|
||||
Reference in New Issue
Block a user