From 1b0ee6e837b7a96b003627ab35fbe034ad579e76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BE=84=E6=BD=AD?= Date: Sun, 1 Feb 2026 00:35:50 +0800 Subject: [PATCH] feat(ai-statistics): add session ID tracking for multi-turn agent conversations (#3420) --- .../extensions/ai-statistics/README.md | 76 ++- .../extensions/ai-statistics/README_EN.md | 76 ++- .../ai-statistics/fix_tool_calls.patch | 15 + .../wasm-go/extensions/ai-statistics/main.go | 340 +++++++++- .../extensions/ai-statistics/main_test.go | 636 ++++++++++++++++++ 5 files changed, 1105 insertions(+), 38 deletions(-) create mode 100644 plugins/wasm-go/extensions/ai-statistics/fix_tool_calls.patch diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index 1743191e0..6bf3c6663 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -29,6 +29,7 @@ description: AI可观测配置参考 | `value_length_limit` | int | 非必填 | 4000 | 记录的单个value的长度限制 | | `enable_path_suffixes` | []string | 非必填 | [] | 只对这些特定路径后缀的请求生效,可以配置为 "\*" 以匹配所有路径(通配符检查会优先进行以提高性能)。如果为空数组,则对所有路径生效 | | `enable_content_types` | []string | 非必填 | [] | 只对这些内容类型的响应进行缓冲处理。如果为空数组,则对所有内容类型生效 | +| `session_id_header` | string | 非必填 | - | 指定读取 session ID 的 header 名称。如果不配置,将按以下优先级自动查找:`x-openclaw-session-key`、`x-clawdbot-session-key`、`x-moltbot-session-key`、`x-agent-session`。session ID 可用于追踪多轮 Agent 对话 | Attribute 配置说明: @@ -134,6 +135,14 @@ irate(route_upstream_model_consumer_metric_llm_duration_count[2m]) } ``` +如果请求中携带了 session ID header,日志中会自动添加 `session_id` 字段: + +```json +{ + "ai_log": "{\"session_id\":\"sess_abc123\",\"model\":\"qwen-turbo\",\"input_token\":\"10\",\"output_token\":\"69\",\"llm_first_token_duration\":\"309\",\"llm_service_duration\":\"1955\"}" +} +``` + #### 链路追踪 配置为空时,不会在 span 中添加额外的 attribute @@ -198,9 +207,11 @@ attributes: ### 记录问题与回答 +#### 仅记录当前轮次的问题与回答 + ```yaml attributes: - - key: question # 记录问题 + - key: question # 记录当前轮次的问题(最后一条用户消息) value_source: request_body value: messages.@reverse.0.content apply_to_log: true @@ -215,6 +226,69 @@ attributes: apply_to_log: true ``` +#### 记录完整的多轮对话历史(推荐配置) + +对于多轮 Agent 对话场景,使用内置属性可以大幅简化配置: + +```yaml +session_id_header: "x-session-id" # 可选,指定 session ID header +attributes: + - key: messages # 完整对话历史 + value_source: request_body + value: messages + apply_to_log: true + - key: question # 内置属性,自动提取最后一条用户消息 + apply_to_log: true + - key: answer # 内置属性,自动提取回答 + apply_to_log: true + - key: reasoning # 内置属性,自动提取思考过程 + apply_to_log: true + - key: tool_calls # 内置属性,自动提取工具调用 + apply_to_log: true +``` + +**内置属性说明:** + +插件提供以下内置属性 key,无需配置 `value_source` 和 `value` 字段即可自动提取: + +| 内置 Key | 说明 | 默认 value_source | +|---------|------|-------------------| +| `question` | 自动提取最后一条用户消息 | `request_body` | +| `answer` | 自动提取回答内容(支持 OpenAI/Claude 协议) | `response_streaming_body` / `response_body` | +| `tool_calls` | 自动提取并拼接工具调用(流式场景自动按 index 拼接 arguments) | `response_streaming_body` / `response_body` | +| `reasoning` | 自动提取思考过程(reasoning_content,如 DeepSeek-R1) | `response_streaming_body` / `response_body` | + +> **注意**:如果配置了 `value_source` 和 `value`,将优先使用配置的值,以保持向后兼容。 + +日志输出示例: + +```json +{ + "ai_log": "{\"session_id\":\"sess_abc123\",\"messages\":[{\"role\":\"user\",\"content\":\"北京天气怎么样?\"}],\"question\":\"北京天气怎么样?\",\"reasoning\":\"用户想知道北京的天气,我需要调用天气查询工具。\",\"tool_calls\":[{\"index\":0,\"id\":\"call_abc123\",\"type\":\"function\",\"function\":{\"name\":\"get_weather\",\"arguments\":\"{\\\"location\\\":\\\"Beijing\\\"}\"}}],\"model\":\"deepseek-reasoner\"}" +} +``` + +**流式响应中的 tool_calls 处理:** + +插件会自动按 `index` 字段识别每个独立的工具调用,拼接分片返回的 `arguments` 字符串,最终输出完整的工具调用列表。 + +## 调试 + +### 验证 ai_log 内容 + +在测试或调试过程中,可以通过开启 Higress 的 debug 日志来验证 ai_log 的内容: + +```bash +# 日志格式示例 +2026/01/31 23:29:30 proxy_debug_log: [ai-statistics] [nil] [test-request-id] [ai_log] attributes to be written: {"question":"What is 2+2?","answer":"4","reasoning":"...","tool_calls":[...],"session_id":"sess_123","model":"gpt-4","input_token":20,"output_token":10} +``` + +通过这个debug日志可以验证: +- question/answer/reasoning 是否正确提取 +- tool_calls 是否正确拼接(特别是流式场景下的arguments) +- session_id 是否正确识别 +- 各个字段是否符合预期 + ## 进阶 配合阿里云 SLS 数据加工,可以将 ai 相关的字段进行提取加工,例如原始日志为: diff --git a/plugins/wasm-go/extensions/ai-statistics/README_EN.md b/plugins/wasm-go/extensions/ai-statistics/README_EN.md index aa807a367..32d4c2489 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README_EN.md +++ b/plugins/wasm-go/extensions/ai-statistics/README_EN.md @@ -29,6 +29,7 @@ Users can also expand observable values ​​through configuration: | `value_length_limit` | int | optional | 4000 | length limit for each value | | `enable_path_suffixes` | []string | optional | ["/v1/chat/completions","/v1/completions","/v1/embeddings","/v1/models","/generateContent","/streamGenerateContent"] | Only effective for requests with these specific path suffixes, can be configured as "\*" to match all paths | | `enable_content_types` | []string | optional | ["text/event-stream","application/json"] | Only buffer response body for these content types | +| `session_id_header` | string | optional | - | Specify the header name to read session ID from. If not configured, it will automatically search in the following priority: `x-openclaw-session-key`, `x-clawdbot-session-key`, `x-moltbot-session-key`, `x-agent-session`. Session ID can be used to trace multi-turn Agent conversations | Attribute Configuration instructions: @@ -134,10 +135,35 @@ irate(route_upstream_model_consumer_metric_llm_duration_count[2m]) } ``` +If the request contains a session ID header, the log will automatically include a `session_id` field: + +```json +{ + "ai_log": "{\"session_id\":\"sess_abc123\",\"model\":\"qwen-turbo\",\"input_token\":\"10\",\"output_token\":\"69\",\"llm_first_token_duration\":\"309\",\"llm_service_duration\":\"1955\"}" +} +``` + #### Trace When the configuration is empty, no additional attributes will be added to the span. +## Debugging + +### Verifying ai_log Content + +During testing or debugging, you can enable Higress debug logging to verify the ai_log content: + +```bash +# Log format example +2026/01/31 23:29:30 proxy_debug_log: [ai-statistics] [nil] [test-request-id] [ai_log] attributes to be written: {"question":"What is 2+2?","answer":"4","reasoning":"...","tool_calls":[...],"session_id":"sess_123","model":"gpt-4","input_token":20,"output_token":10} +``` + +This debug log allows you to verify: +- Whether question/answer/reasoning are correctly extracted +- Whether tool_calls are properly concatenated (especially arguments in streaming scenarios) +- Whether session_id is correctly identified +- Whether all fields match expectations + ### Extract token usage information from non-openai protocols When setting the protocol to original in ai-proxy, taking Alibaba Cloud Bailian as an example, you can make the following configuration to specify how to extract `model`, `input_token`, `output_token` @@ -194,9 +220,11 @@ attributes: ### Record questions and answers +#### Record only current turn's question and answer + ```yaml attributes: - - key: question + - key: question # Record the current turn's question (last user message) value_source: request_body value: messages.@reverse.0.content apply_to_log: true @@ -211,6 +239,52 @@ attributes: apply_to_log: true ``` +#### Record complete multi-turn conversation history (Recommended) + +For multi-turn Agent conversation scenarios, using built-in attributes greatly simplifies the configuration: + +```yaml +session_id_header: "x-session-id" # Optional, specify session ID header +attributes: + - key: messages # Complete conversation history + value_source: request_body + value: messages + apply_to_log: true + - key: question # Built-in, auto-extracts last user message + apply_to_log: true + - key: answer # Built-in, auto-extracts answer + apply_to_log: true + - key: reasoning # Built-in, auto-extracts reasoning process + apply_to_log: true + - key: tool_calls # Built-in, auto-extracts tool calls + apply_to_log: true +``` + +**Built-in Attributes:** + +The plugin provides the following built-in attribute keys that automatically extract values without configuring `value_source` and `value` fields: + +| Built-in Key | Description | Default value_source | +|-------------|-------------|----------------------| +| `question` | Automatically extracts the last user message | `request_body` | +| `answer` | Automatically extracts answer content (supports OpenAI/Claude protocols) | `response_streaming_body` / `response_body` | +| `tool_calls` | Automatically extracts and assembles tool calls (streaming scenarios auto-concatenate arguments by index) | `response_streaming_body` / `response_body` | +| `reasoning` | Automatically extracts reasoning process (reasoning_content, e.g., DeepSeek-R1) | `response_streaming_body` / `response_body` | + +> **Note**: If `value_source` and `value` are configured, the configured values take priority for backward compatibility. + +Example log output: + +```json +{ + "ai_log": "{\"session_id\":\"sess_abc123\",\"messages\":[{\"role\":\"user\",\"content\":\"What's the weather in Beijing?\"}],\"question\":\"What's the weather in Beijing?\",\"reasoning\":\"The user wants to know the weather in Beijing, I need to call the weather query tool.\",\"tool_calls\":[{\"index\":0,\"id\":\"call_abc123\",\"type\":\"function\",\"function\":{\"name\":\"get_weather\",\"arguments\":\"{\\\"location\\\":\\\"Beijing\\\"}\"}}],\"model\":\"deepseek-reasoner\"}" +} +``` + +**Streaming tool_calls handling:** + +The plugin automatically identifies each independent tool call by the `index` field, concatenates fragmented `arguments` strings, and outputs the complete tool call list. + ### Path and Content Type Filtering Configuration Examples #### Process Only Specific AI Paths diff --git a/plugins/wasm-go/extensions/ai-statistics/fix_tool_calls.patch b/plugins/wasm-go/extensions/ai-statistics/fix_tool_calls.patch new file mode 100644 index 000000000..4597a38d5 --- /dev/null +++ b/plugins/wasm-go/extensions/ai-statistics/fix_tool_calls.patch @@ -0,0 +1,15 @@ +--- a/main.go ++++ b/main.go +@@ -790,6 +790,14 @@ + buffer = extractStreamingToolCalls(body, buffer) + ctx.SetContext(CtxStreamingToolCallsBuffer, buffer) + ++ // Also set tool_calls to user attributes so they appear in ai_log ++ toolCalls := getToolCallsFromBuffer(buffer) ++ if len(toolCalls) > 0 { ++ ctx.SetUserAttribute(BuiltinToolCallsKey, toolCalls) ++ return toolCalls ++ } + } + } else if source == ResponseBody { + if value := gjson.GetBytes(body, ToolCallsPathNonStreaming).Value(); value != nil { diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index c509f0980..0f4cadbcc 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -48,6 +48,9 @@ const ( RequestPath = "request_path" SkipProcessing = "skip_processing" + // Session ID related + SessionID = "session_id" + // AI API Paths PathOpenAIChatCompletions = "/v1/chat/completions" PathOpenAICompletions = "/v1/completions" @@ -87,8 +90,10 @@ const ( RuleAppend = "append" // Built-in attributes - BuiltinQuestionKey = "question" - BuiltinAnswerKey = "answer" + BuiltinQuestionKey = "question" + BuiltinAnswerKey = "answer" + BuiltinToolCallsKey = "tool_calls" + BuiltinReasoningKey = "reasoning" // Built-in attribute paths // Question paths (from request body) @@ -102,8 +107,132 @@ const ( // Answer paths (from response streaming body) AnswerPathOpenAIStreaming = "choices.0.delta.content" AnswerPathClaudeStreaming = "delta.text" + + // Tool calls paths + ToolCallsPathNonStreaming = "choices.0.message.tool_calls" + ToolCallsPathStreaming = "choices.0.delta.tool_calls" + + // Reasoning paths + ReasoningPathNonStreaming = "choices.0.message.reasoning_content" + ReasoningPathStreaming = "choices.0.delta.reasoning_content" + + // Context key for streaming tool calls buffer + CtxStreamingToolCallsBuffer = "streamingToolCallsBuffer" ) +// Default session ID headers in priority order +var defaultSessionHeaders = []string{ + "x-openclaw-session-key", + "x-clawdbot-session-key", + "x-moltbot-session-key", + "x-agent-session", +} + +// extractSessionId extracts session ID from request headers +// If customHeader is configured, it takes priority; otherwise falls back to default headers +func extractSessionId(customHeader string) string { + // If custom header is configured, try it first + if customHeader != "" { + if sessionId, _ := proxywasm.GetHttpRequestHeader(customHeader); sessionId != "" { + return sessionId + } + } + // Fall back to default session headers in priority order + for _, header := range defaultSessionHeaders { + if sessionId, _ := proxywasm.GetHttpRequestHeader(header); sessionId != "" { + return sessionId + } + } + return "" +} + +// ToolCall represents a single tool call in the response +type ToolCall struct { + Index int `json:"index,omitempty"` + ID string `json:"id,omitempty"` + Type string `json:"type,omitempty"` + Function ToolCallFunction `json:"function,omitempty"` +} + +// ToolCallFunction represents the function details in a tool call +type ToolCallFunction struct { + Name string `json:"name,omitempty"` + Arguments string `json:"arguments,omitempty"` +} + +// StreamingToolCallsBuffer holds the state for assembling streaming tool calls +type StreamingToolCallsBuffer struct { + ToolCalls map[int]*ToolCall // keyed by index +} + +// extractStreamingToolCalls extracts and assembles tool calls from streaming response chunks +func extractStreamingToolCalls(data []byte, buffer *StreamingToolCallsBuffer) *StreamingToolCallsBuffer { + if buffer == nil { + buffer = &StreamingToolCallsBuffer{ + ToolCalls: make(map[int]*ToolCall), + } + } + + chunks := bytes.Split(bytes.TrimSpace(wrapper.UnifySSEChunk(data)), []byte("\n\n")) + for _, chunk := range chunks { + toolCallsResult := gjson.GetBytes(chunk, ToolCallsPathStreaming) + if !toolCallsResult.Exists() || !toolCallsResult.IsArray() { + continue + } + + for _, tcResult := range toolCallsResult.Array() { + index := int(tcResult.Get("index").Int()) + + // Get or create tool call entry + tc, exists := buffer.ToolCalls[index] + if !exists { + tc = &ToolCall{Index: index} + buffer.ToolCalls[index] = tc + } + + // Update fields if present + if id := tcResult.Get("id").String(); id != "" { + tc.ID = id + } + if tcType := tcResult.Get("type").String(); tcType != "" { + tc.Type = tcType + } + if funcName := tcResult.Get("function.name").String(); funcName != "" { + tc.Function.Name = funcName + } + // Append arguments (they come in chunks) + if args := tcResult.Get("function.arguments").String(); args != "" { + tc.Function.Arguments += args + } + } + } + + return buffer +} + +// getToolCallsFromBuffer converts the buffer to a sorted slice of tool calls +func getToolCallsFromBuffer(buffer *StreamingToolCallsBuffer) []ToolCall { + if buffer == nil || len(buffer.ToolCalls) == 0 { + return nil + } + + // Find max index to create properly sized slice + maxIndex := 0 + for idx := range buffer.ToolCalls { + if idx > maxIndex { + maxIndex = idx + } + } + + result := make([]ToolCall, 0, len(buffer.ToolCalls)) + for i := 0; i <= maxIndex; i++ { + if tc, exists := buffer.ToolCalls[i]; exists { + result = append(result, *tc) + } + } + return result +} + // TracingSpan is the tracing span configuration. type Attribute struct { Key string `json:"key"` @@ -132,6 +261,8 @@ type AIStatisticsConfig struct { enablePathSuffixes []string // Content types to enable response body buffering enableContentTypes []string + // Session ID header name (if configured, takes priority over default headers) + sessionIdHeader string } func generateMetricName(route, cluster, model, consumer, metricName string) string { @@ -272,6 +403,11 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error { config.enableContentTypes = append(config.enableContentTypes, contentTypeStr) } + // Parse session ID header configuration + if sessionIdHeader := configJson.Get("session_id_header"); sessionIdHeader.Exists() { + config.sessionIdHeader = sessionIdHeader.String() + } + return nil } @@ -307,6 +443,12 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig) ty ctx.SetRequestBodyBufferLimit(defaultMaxBodyBytes) + // Extract session ID from headers + sessionId := extractSessionId(config.sessionIdHeader) + if sessionId != "" { + ctx.SetUserAttribute(SessionID, sessionId) + } + // Set span attributes for ARMS. setSpanAttribute(ArmsSpanKind, "LLM") // Set user defined log & span attributes which type is fixed_value @@ -361,6 +503,7 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body ctx.SetUserAttribute(ChatRound, userPromptCount) // Write log + debugLogAiLog(ctx) _ = ctx.WriteUserAttributeToLogWithKey(wrapper.AILogKey) return types.ActionContinue } @@ -452,6 +595,7 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat } // Write log + debugLogAiLog(ctx) _ = ctx.WriteUserAttributeToLogWithKey(wrapper.AILogKey) // Write metrics @@ -497,6 +641,7 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body setAttributeBySource(ctx, config, ResponseBody, body) // Write log + debugLogAiLog(ctx) _ = ctx.WriteUserAttributeToLogWithKey(wrapper.AILogKey) // Write metrics @@ -511,8 +656,16 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so for _, attribute := range config.attributes { var key string var value interface{} - if source == attribute.ValueSource { - key = attribute.Key + key = attribute.Key + + // Check if this attribute should be processed for the current source + // For built-in attributes without value_source configured, use default source matching + if !shouldProcessBuiltinAttribute(key, attribute.ValueSource, source) { + continue + } + + // If value is configured, try to extract using the configured path + if attribute.Value != "" { switch source { case FixedValue: value = attribute.Value @@ -528,52 +681,81 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so value = gjson.GetBytes(body, attribute.Value).Value() default: } + } - // Handle built-in attributes with Claude/OpenAI protocol fallback logic - if (value == nil || value == "") && isBuiltinAttribute(key) { - value = getBuiltinAttributeFallback(ctx, config, key, source, body, attribute.Rule) - if value != nil && value != "" { - log.Debugf("[attribute] Used protocol fallback for %s: %+v", key, value) - } + // Handle built-in attributes: use fallback if value is empty or not configured + if (value == nil || value == "") && isBuiltinAttribute(key) { + value = getBuiltinAttributeFallback(ctx, config, key, source, body, attribute.Rule) + if value != nil && value != "" { + log.Debugf("[attribute] Used built-in extraction for %s: %+v", key, value) } + } - if (value == nil || value == "") && attribute.DefaultValue != "" { - value = attribute.DefaultValue - } - if len(fmt.Sprint(value)) > config.valueLengthLimit { - value = fmt.Sprint(value)[:config.valueLengthLimit/2] + " [truncated] " + fmt.Sprint(value)[len(fmt.Sprint(value))-config.valueLengthLimit/2:] - } - log.Debugf("[attribute] source type: %s, key: %s, value: %+v", source, key, value) - if attribute.ApplyToLog { - if attribute.AsSeparateLogField { - marshalledJsonStr := wrapper.MarshalStr(fmt.Sprint(value)) - if err := proxywasm.SetProperty([]string{key}, []byte(marshalledJsonStr)); err != nil { - log.Warnf("failed to set %s in filter state, raw is %s, err is %v", key, marshalledJsonStr, err) - } - } else { - ctx.SetUserAttribute(key, value) + if (value == nil || value == "") && attribute.DefaultValue != "" { + value = attribute.DefaultValue + } + if len(fmt.Sprint(value)) > config.valueLengthLimit { + value = fmt.Sprint(value)[:config.valueLengthLimit/2] + " [truncated] " + fmt.Sprint(value)[len(fmt.Sprint(value))-config.valueLengthLimit/2:] + } + log.Debugf("[attribute] source type: %s, key: %s, value: %+v", source, key, value) + if attribute.ApplyToLog { + if attribute.AsSeparateLogField { + marshalledJsonStr := wrapper.MarshalStr(fmt.Sprint(value)) + if err := proxywasm.SetProperty([]string{key}, []byte(marshalledJsonStr)); err != nil { + log.Warnf("failed to set %s in filter state, raw is %s, err is %v", key, marshalledJsonStr, err) } + } else { + ctx.SetUserAttribute(key, value) } - // for metrics - if key == tokenusage.CtxKeyModel || key == tokenusage.CtxKeyInputToken || key == tokenusage.CtxKeyOutputToken || key == tokenusage.CtxKeyTotalToken { - ctx.SetContext(key, value) - } - if attribute.ApplyToSpan { - if attribute.TraceSpanKey != "" { - key = attribute.TraceSpanKey - } - setSpanAttribute(key, value) + } + // for metrics + if key == tokenusage.CtxKeyModel || key == tokenusage.CtxKeyInputToken || key == tokenusage.CtxKeyOutputToken || key == tokenusage.CtxKeyTotalToken { + ctx.SetContext(key, value) + } + if attribute.ApplyToSpan { + if attribute.TraceSpanKey != "" { + key = attribute.TraceSpanKey } + setSpanAttribute(key, value) } } } // isBuiltinAttribute checks if the given key is a built-in attribute func isBuiltinAttribute(key string) bool { - return key == BuiltinQuestionKey || key == BuiltinAnswerKey + return key == BuiltinQuestionKey || key == BuiltinAnswerKey || key == BuiltinToolCallsKey || key == BuiltinReasoningKey } -// getBuiltinAttributeFallback provides protocol compatibility fallback for question/answer attributes +// getBuiltinAttributeDefaultSources returns the default value_source(s) for a built-in attribute +// Returns nil if the key is not a built-in attribute +func getBuiltinAttributeDefaultSources(key string) []string { + switch key { + case BuiltinQuestionKey: + return []string{RequestBody} + case BuiltinAnswerKey, BuiltinToolCallsKey, BuiltinReasoningKey: + return []string{ResponseStreamingBody, ResponseBody} + default: + return nil + } +} + +// shouldProcessBuiltinAttribute checks if a built-in attribute should be processed for the given source +func shouldProcessBuiltinAttribute(key, configuredSource, currentSource string) bool { + // If value_source is configured, use exact match + if configuredSource != "" { + return configuredSource == currentSource + } + // If value_source is not configured and it's a built-in attribute, check default sources + defaultSources := getBuiltinAttributeDefaultSources(key) + for _, src := range defaultSources { + if src == currentSource { + return true + } + } + return false +} + +// getBuiltinAttributeFallback provides protocol compatibility fallback for built-in attributes func getBuiltinAttributeFallback(ctx wrapper.HttpContext, config AIStatisticsConfig, key, source string, body []byte, rule string) interface{} { switch key { case BuiltinQuestionKey: @@ -603,6 +785,37 @@ func getBuiltinAttributeFallback(ctx wrapper.HttpContext, config AIStatisticsCon return value } } + case BuiltinToolCallsKey: + if source == ResponseStreamingBody { + // Get or create buffer from context + var buffer *StreamingToolCallsBuffer + if existingBuffer, ok := ctx.GetContext(CtxStreamingToolCallsBuffer).(*StreamingToolCallsBuffer); ok { + buffer = existingBuffer + } + buffer = extractStreamingToolCalls(body, buffer) + ctx.SetContext(CtxStreamingToolCallsBuffer, buffer) + + // Also set tool_calls to user attributes so they appear in ai_log + toolCalls := getToolCallsFromBuffer(buffer) + if len(toolCalls) > 0 { + ctx.SetUserAttribute(BuiltinToolCallsKey, toolCalls) + return toolCalls + } + } else if source == ResponseBody { + if value := gjson.GetBytes(body, ToolCallsPathNonStreaming).Value(); value != nil { + return value + } + } + case BuiltinReasoningKey: + if source == ResponseStreamingBody { + if value := extractStreamingBodyByJsonPath(body, ReasoningPathStreaming, RuleAppend); value != nil && value != "" { + return value + } + } else if source == ResponseBody { + if value := gjson.GetBytes(body, ReasoningPathNonStreaming).Value(); value != nil && value != "" { + return value + } + } } return nil } @@ -641,6 +854,61 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string) i return value } +// debugLogAiLog logs the current user attributes that will be written to ai_log +func debugLogAiLog(ctx wrapper.HttpContext) { + // Get all user attributes as a map + userAttrs := make(map[string]interface{}) + + // Try to reconstruct from GetUserAttribute (note: this is best-effort) + // The actual attributes are stored internally, we log what we know + if question := ctx.GetUserAttribute("question"); question != nil { + userAttrs["question"] = question + } + if answer := ctx.GetUserAttribute("answer"); answer != nil { + userAttrs["answer"] = answer + } + if reasoning := ctx.GetUserAttribute("reasoning"); reasoning != nil { + userAttrs["reasoning"] = reasoning + } + if toolCalls := ctx.GetUserAttribute("tool_calls"); toolCalls != nil { + userAttrs["tool_calls"] = toolCalls + } + if messages := ctx.GetUserAttribute("messages"); messages != nil { + userAttrs["messages"] = messages + } + if sessionId := ctx.GetUserAttribute("session_id"); sessionId != nil { + userAttrs["session_id"] = sessionId + } + if model := ctx.GetUserAttribute("model"); model != nil { + userAttrs["model"] = model + } + if inputToken := ctx.GetUserAttribute("input_token"); inputToken != nil { + userAttrs["input_token"] = inputToken + } + if outputToken := ctx.GetUserAttribute("output_token"); outputToken != nil { + userAttrs["output_token"] = outputToken + } + if totalToken := ctx.GetUserAttribute("total_token"); totalToken != nil { + userAttrs["total_token"] = totalToken + } + if chatId := ctx.GetUserAttribute("chat_id"); chatId != nil { + userAttrs["chat_id"] = chatId + } + if responseType := ctx.GetUserAttribute("response_type"); responseType != nil { + userAttrs["response_type"] = responseType + } + if llmFirstTokenDuration := ctx.GetUserAttribute("llm_first_token_duration"); llmFirstTokenDuration != nil { + userAttrs["llm_first_token_duration"] = llmFirstTokenDuration + } + if llmServiceDuration := ctx.GetUserAttribute("llm_service_duration"); llmServiceDuration != nil { + userAttrs["llm_service_duration"] = llmServiceDuration + } + + // Log the attributes as JSON + logJson, _ := json.Marshal(userAttrs) + log.Debugf("[ai_log] attributes to be written: %s", string(logJson)) +} + // Set the tracing span with value. func setSpanAttribute(key string, value interface{}) { if value != "" { diff --git a/plugins/wasm-go/extensions/ai-statistics/main_test.go b/plugins/wasm-go/extensions/ai-statistics/main_test.go index 9c9c829e0..b5a021f97 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main_test.go +++ b/plugins/wasm-go/extensions/ai-statistics/main_test.go @@ -981,3 +981,639 @@ func TestCompleteFlow(t *testing.T) { }) }) } + +// ==================== Built-in Attributes Tests ==================== + +// 测试配置:历史兼容配置(显式配置 value_source 和 value) +var legacyQuestionAnswerConfig = func() json.RawMessage { + data, _ := json.Marshal(map[string]interface{}{ + "attributes": []map[string]interface{}{ + { + "key": "question", + "value_source": "request_body", + "value": "messages.@reverse.0.content", + "apply_to_log": true, + }, + { + "key": "answer", + "value_source": "response_streaming_body", + "value": "choices.0.delta.content", + "rule": "append", + "apply_to_log": true, + }, + { + "key": "answer", + "value_source": "response_body", + "value": "choices.0.message.content", + "apply_to_log": true, + }, + }, + }) + return data +}() + +// 测试配置:内置属性简化配置(不配置 value_source 和 value) +var builtinAttributesConfig = func() json.RawMessage { + data, _ := json.Marshal(map[string]interface{}{ + "attributes": []map[string]interface{}{ + { + "key": "question", + "apply_to_log": true, + }, + { + "key": "answer", + "apply_to_log": true, + }, + { + "key": "reasoning", + "apply_to_log": true, + }, + { + "key": "tool_calls", + "apply_to_log": true, + }, + }, + }) + return data +}() + +// 测试配置:session_id 配置 +var sessionIdConfig = func() json.RawMessage { + data, _ := json.Marshal(map[string]interface{}{ + "session_id_header": "x-custom-session", + "attributes": []map[string]interface{}{ + { + "key": "question", + "apply_to_log": true, + }, + { + "key": "answer", + "apply_to_log": true, + }, + }, + }) + return data +}() + +// TestLegacyConfigCompatibility 测试历史配置兼容性 +func TestLegacyConfigCompatibility(t *testing.T) { + test.RunTest(t, func(t *testing.T) { + // 测试使用显式 value_source 和 value 配置的 question/answer + t.Run("legacy question answer config", func(t *testing.T) { + host, status := test.NewTestHost(legacyQuestionAnswerConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 1. 处理请求头 + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + }) + + // 2. 处理请求体 + requestBody := []byte(`{ + "model": "gpt-4", + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "What is 2+2?"} + ] + }`) + action := host.CallOnHttpRequestBody(requestBody) + require.Equal(t, types.ActionContinue, action) + + // 3. 处理响应头 (非流式) + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "application/json"}, + }) + + // 4. 处理响应体 + responseBody := []byte(`{ + "choices": [{"message": {"role": "assistant", "content": "2+2 equals 4."}}], + "model": "gpt-4", + "usage": {"prompt_tokens": 20, "completion_tokens": 10, "total_tokens": 30} + }`) + action = host.CallOnHttpResponseBody(responseBody) + require.Equal(t, types.ActionContinue, action) + + host.CompleteHttp() + }) + + // 测试使用显式配置的流式响应 + t.Run("legacy streaming answer config", func(t *testing.T) { + host, status := test.NewTestHost(legacyQuestionAnswerConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 1. 处理请求头 + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + }) + + // 2. 处理请求体 + requestBody := []byte(`{ + "model": "gpt-4", + "stream": true, + "messages": [{"role": "user", "content": "Hello"}] + }`) + host.CallOnHttpRequestBody(requestBody) + + // 3. 处理流式响应头 + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "text/event-stream"}, + }) + + // 4. 处理流式响应体 + chunk1 := []byte(`data: {"choices":[{"delta":{"content":"Hello"}}]}`) + host.CallOnHttpStreamingResponseBody(chunk1, false) + + chunk2 := []byte(`data: {"choices":[{"delta":{"content":" there!"}}]}`) + host.CallOnHttpStreamingResponseBody(chunk2, true) + + host.CompleteHttp() + }) + }) +} + +// TestBuiltinAttributesDefaultSource 测试内置属性的默认 value_source +func TestBuiltinAttributesDefaultSource(t *testing.T) { + test.RunTest(t, func(t *testing.T) { + // 测试不配置 value_source 的内置属性(非流式响应) + t.Run("builtin attributes non-streaming", func(t *testing.T) { + host, status := test.NewTestHost(builtinAttributesConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 1. 处理请求头 + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + }) + + // 2. 处理请求体 - question 应该自动从 request_body 提取 + requestBody := []byte(`{ + "model": "deepseek-reasoner", + "messages": [ + {"role": "user", "content": "What is the capital of France?"} + ] + }`) + action := host.CallOnHttpRequestBody(requestBody) + require.Equal(t, types.ActionContinue, action) + + // 3. 处理响应头 (非流式) + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "application/json"}, + }) + + // 4. 处理响应体 - answer, reasoning, tool_calls 应该自动从 response_body 提取 + responseBody := []byte(`{ + "choices": [{ + "message": { + "role": "assistant", + "content": "The capital of France is Paris.", + "reasoning_content": "The user is asking about geography. France is a country in Europe, and its capital city is Paris." + } + }], + "model": "deepseek-reasoner", + "usage": {"prompt_tokens": 15, "completion_tokens": 25, "total_tokens": 40} + }`) + action = host.CallOnHttpResponseBody(responseBody) + require.Equal(t, types.ActionContinue, action) + + host.CompleteHttp() + }) + + // 测试不配置 value_source 的内置属性(流式响应) + t.Run("builtin attributes streaming", func(t *testing.T) { + host, status := test.NewTestHost(builtinAttributesConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 1. 处理请求头 + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + }) + + // 2. 处理请求体 + requestBody := []byte(`{ + "model": "deepseek-reasoner", + "stream": true, + "messages": [{"role": "user", "content": "Tell me a joke"}] + }`) + host.CallOnHttpRequestBody(requestBody) + + // 3. 处理流式响应头 + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "text/event-stream"}, + }) + + // 4. 处理流式响应体 - answer, reasoning 应该自动从 response_streaming_body 提取 + chunk1 := []byte(`data: {"choices":[{"delta":{"reasoning_content":"Let me think of a good joke..."}}]}`) + host.CallOnHttpStreamingResponseBody(chunk1, false) + + chunk2 := []byte(`data: {"choices":[{"delta":{"content":"Why did the chicken"}}]}`) + host.CallOnHttpStreamingResponseBody(chunk2, false) + + chunk3 := []byte(`data: {"choices":[{"delta":{"content":" cross the road?"}}]}`) + host.CallOnHttpStreamingResponseBody(chunk3, true) + + host.CompleteHttp() + }) + }) +} + +// TestStreamingToolCalls 测试流式 tool_calls 解析 +func TestStreamingToolCalls(t *testing.T) { + test.RunTest(t, func(t *testing.T) { + // 测试流式 tool_calls 拼接 + t.Run("streaming tool calls assembly", func(t *testing.T) { + host, status := test.NewTestHost(builtinAttributesConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 1. 处理请求头 + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + }) + + // 2. 处理请求体 + requestBody := []byte(`{ + "model": "gpt-4", + "stream": true, + "messages": [{"role": "user", "content": "What's the weather in Beijing?"}], + "tools": [{"type": "function", "function": {"name": "get_weather"}}] + }`) + host.CallOnHttpRequestBody(requestBody) + + // 3. 处理流式响应头 + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "text/event-stream"}, + }) + + // 4. 处理流式响应体 - 模拟分片的 tool_calls + // 第一个 chunk: tool call 的 id 和 function name + chunk1 := []byte(`data: {"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_abc123","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}`) + host.CallOnHttpStreamingResponseBody(chunk1, false) + + // 第二个 chunk: arguments 的第一部分 + chunk2 := []byte(`data: {"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"locat"}}]}}]}`) + host.CallOnHttpStreamingResponseBody(chunk2, false) + + // 第三个 chunk: arguments 的第二部分 + chunk3 := []byte(`data: {"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"ion\": \"Bei"}}]}}]}`) + host.CallOnHttpStreamingResponseBody(chunk3, false) + + // 第四个 chunk: arguments 的最后部分 + chunk4 := []byte(`data: {"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"jing\"}"}}]}}]}`) + host.CallOnHttpStreamingResponseBody(chunk4, false) + + // 最后一个 chunk: 结束 + chunk5 := []byte(`data: {"choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`) + host.CallOnHttpStreamingResponseBody(chunk5, true) + + host.CompleteHttp() + }) + + // 测试多个 tool_calls 的流式拼接 + t.Run("multiple streaming tool calls", func(t *testing.T) { + host, status := test.NewTestHost(builtinAttributesConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 1. 处理请求头 + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + }) + + // 2. 处理请求体 + requestBody := []byte(`{ + "model": "gpt-4", + "stream": true, + "messages": [{"role": "user", "content": "Compare weather in Beijing and Shanghai"}] + }`) + host.CallOnHttpRequestBody(requestBody) + + // 3. 处理流式响应头 + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "text/event-stream"}, + }) + + // 4. 处理流式响应体 - 模拟多个 tool_calls + // 第一个 tool call + chunk1 := []byte(`data: {"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_001","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}`) + host.CallOnHttpStreamingResponseBody(chunk1, false) + + // 第二个 tool call + chunk2 := []byte(`data: {"choices":[{"delta":{"tool_calls":[{"index":1,"id":"call_002","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}`) + host.CallOnHttpStreamingResponseBody(chunk2, false) + + // 第一个 tool call 的 arguments + chunk3 := []byte(`data: {"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"location\":\"Beijing\"}"}}]}}]}`) + host.CallOnHttpStreamingResponseBody(chunk3, false) + + // 第二个 tool call 的 arguments + chunk4 := []byte(`data: {"choices":[{"delta":{"tool_calls":[{"index":1,"function":{"arguments":"{\"location\":\"Shanghai\"}"}}]}}]}`) + host.CallOnHttpStreamingResponseBody(chunk4, false) + + // 结束 + chunk5 := []byte(`data: {"choices":[{"delta":{},"finish_reason":"tool_calls"}]}`) + host.CallOnHttpStreamingResponseBody(chunk5, true) + + host.CompleteHttp() + }) + + // 测试非流式 tool_calls + t.Run("non-streaming tool calls", func(t *testing.T) { + host, status := test.NewTestHost(builtinAttributesConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 1. 处理请求头 + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + }) + + // 2. 处理请求体 + requestBody := []byte(`{ + "model": "gpt-4", + "messages": [{"role": "user", "content": "What's the weather?"}] + }`) + host.CallOnHttpRequestBody(requestBody) + + // 3. 处理响应头 + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "application/json"}, + }) + + // 4. 处理响应体 - 非流式 tool_calls + responseBody := []byte(`{ + "choices": [{ + "message": { + "role": "assistant", + "content": null, + "tool_calls": [{ + "id": "call_abc123", + "type": "function", + "function": { + "name": "get_weather", + "arguments": "{\"location\": \"Beijing\"}" + } + }] + }, + "finish_reason": "tool_calls" + }], + "model": "gpt-4", + "usage": {"prompt_tokens": 20, "completion_tokens": 15, "total_tokens": 35} + }`) + action := host.CallOnHttpResponseBody(responseBody) + require.Equal(t, types.ActionContinue, action) + + host.CompleteHttp() + }) + }) +} + +// TestSessionIdExtraction 测试 session_id 提取 +func TestSessionIdExtraction(t *testing.T) { + test.RunTest(t, func(t *testing.T) { + // 测试自定义 session_id header + t.Run("custom session id header", func(t *testing.T) { + host, status := test.NewTestHost(sessionIdConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 处理请求头 - 带自定义 session header + action := host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + {"x-custom-session", "sess_custom_123"}, + }) + require.Equal(t, types.ActionContinue, action) + + host.CompleteHttp() + }) + + // 测试默认 session_id headers 优先级 + t.Run("default session id headers priority", func(t *testing.T) { + host, status := test.NewTestHost(builtinAttributesConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 处理请求头 - 带多个默认 session headers,应该使用优先级最高的 + action := host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + {"x-agent-session", "sess_agent_456"}, + {"x-clawdbot-session-key", "sess_clawdbot_789"}, + {"x-openclaw-session-key", "sess_openclaw_123"}, // 最高优先级 + }) + require.Equal(t, types.ActionContinue, action) + + host.CompleteHttp() + }) + + // 测试 fallback 到次优先级 header + t.Run("session id fallback", func(t *testing.T) { + host, status := test.NewTestHost(builtinAttributesConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 处理请求头 - 只有低优先级的 session header + action := host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + {"x-agent-session", "sess_agent_only"}, + }) + require.Equal(t, types.ActionContinue, action) + + host.CompleteHttp() + }) + }) +} + +// TestExtractStreamingToolCalls 单独测试 extractStreamingToolCalls 函数 +func TestExtractStreamingToolCalls(t *testing.T) { + t.Run("single tool call assembly", func(t *testing.T) { + // 模拟流式 chunks + chunks := [][]byte{ + []byte(`{"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_123","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}`), + []byte(`{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"loc"}}]}}]}`), + []byte(`{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"ation"}}]}}]}`), + []byte(`{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\":\"Beijing\"}"}}]}}]}`), + } + + var buffer *StreamingToolCallsBuffer + for _, chunk := range chunks { + buffer = extractStreamingToolCalls(chunk, buffer) + } + + toolCalls := getToolCallsFromBuffer(buffer) + require.Len(t, toolCalls, 1) + require.Equal(t, "call_123", toolCalls[0].ID) + require.Equal(t, "function", toolCalls[0].Type) + require.Equal(t, "get_weather", toolCalls[0].Function.Name) + require.Equal(t, `{"location":"Beijing"}`, toolCalls[0].Function.Arguments) + }) + + t.Run("multiple tool calls assembly", func(t *testing.T) { + chunks := [][]byte{ + []byte(`{"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_001","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}`), + []byte(`{"choices":[{"delta":{"tool_calls":[{"index":1,"id":"call_002","type":"function","function":{"name":"get_time","arguments":""}}]}}]}`), + []byte(`{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"city\":\"Beijing\"}"}}]}}]}`), + []byte(`{"choices":[{"delta":{"tool_calls":[{"index":1,"function":{"arguments":"{\"timezone\":\"UTC+8\"}"}}]}}]}`), + } + + var buffer *StreamingToolCallsBuffer + for _, chunk := range chunks { + buffer = extractStreamingToolCalls(chunk, buffer) + } + + toolCalls := getToolCallsFromBuffer(buffer) + require.Len(t, toolCalls, 2) + + // 验证第一个 tool call + require.Equal(t, "call_001", toolCalls[0].ID) + require.Equal(t, "get_weather", toolCalls[0].Function.Name) + require.Equal(t, `{"city":"Beijing"}`, toolCalls[0].Function.Arguments) + + // 验证第二个 tool call + require.Equal(t, "call_002", toolCalls[1].ID) + require.Equal(t, "get_time", toolCalls[1].Function.Name) + require.Equal(t, `{"timezone":"UTC+8"}`, toolCalls[1].Function.Arguments) + }) + + t.Run("empty chunks", func(t *testing.T) { + chunks := [][]byte{ + []byte(`{"choices":[{"delta":{}}]}`), + []byte(`{"choices":[{"delta":{"content":"Hello"}}]}`), + } + + var buffer *StreamingToolCallsBuffer + for _, chunk := range chunks { + buffer = extractStreamingToolCalls(chunk, buffer) + } + + toolCalls := getToolCallsFromBuffer(buffer) + require.Len(t, toolCalls, 0) + }) +} + +// TestBuiltinAttributeHelpers 测试内置属性辅助函数 +func TestBuiltinAttributeHelpers(t *testing.T) { + t.Run("isBuiltinAttribute", func(t *testing.T) { + require.True(t, isBuiltinAttribute("question")) + require.True(t, isBuiltinAttribute("answer")) + require.True(t, isBuiltinAttribute("tool_calls")) + require.True(t, isBuiltinAttribute("reasoning")) + require.False(t, isBuiltinAttribute("custom_key")) + require.False(t, isBuiltinAttribute("model")) + }) + + t.Run("getBuiltinAttributeDefaultSources", func(t *testing.T) { + // question 应该默认从 request_body 提取 + questionSources := getBuiltinAttributeDefaultSources("question") + require.Equal(t, []string{RequestBody}, questionSources) + + // answer 应该支持 streaming 和 non-streaming + answerSources := getBuiltinAttributeDefaultSources("answer") + require.Contains(t, answerSources, ResponseStreamingBody) + require.Contains(t, answerSources, ResponseBody) + + // tool_calls 应该支持 streaming 和 non-streaming + toolCallsSources := getBuiltinAttributeDefaultSources("tool_calls") + require.Contains(t, toolCallsSources, ResponseStreamingBody) + require.Contains(t, toolCallsSources, ResponseBody) + + // reasoning 应该支持 streaming 和 non-streaming + reasoningSources := getBuiltinAttributeDefaultSources("reasoning") + require.Contains(t, reasoningSources, ResponseStreamingBody) + require.Contains(t, reasoningSources, ResponseBody) + + // 非内置属性应该返回 nil + customSources := getBuiltinAttributeDefaultSources("custom_key") + require.Nil(t, customSources) + }) + + t.Run("shouldProcessBuiltinAttribute", func(t *testing.T) { + // 配置了 value_source 时,应该精确匹配 + require.True(t, shouldProcessBuiltinAttribute("question", RequestBody, RequestBody)) + require.False(t, shouldProcessBuiltinAttribute("question", RequestBody, ResponseBody)) + + // 没有配置 value_source 时,内置属性应该使用默认 source + require.True(t, shouldProcessBuiltinAttribute("question", "", RequestBody)) + require.False(t, shouldProcessBuiltinAttribute("question", "", ResponseBody)) + + require.True(t, shouldProcessBuiltinAttribute("answer", "", ResponseBody)) + require.True(t, shouldProcessBuiltinAttribute("answer", "", ResponseStreamingBody)) + require.False(t, shouldProcessBuiltinAttribute("answer", "", RequestBody)) + + // 非内置属性没有配置 value_source 时,不应该处理 + require.False(t, shouldProcessBuiltinAttribute("custom_key", "", RequestBody)) + require.False(t, shouldProcessBuiltinAttribute("custom_key", "", ResponseBody)) + }) +} + +// TestSessionIdDebugOutput 演示session_id的debug日志输出 +func TestSessionIdDebugOutput(t *testing.T) { + test.RunTest(t, func(t *testing.T) { + t.Run("session id with full flow", func(t *testing.T) { + host, status := test.NewTestHost(sessionIdConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 1. 处理请求头 - 带 session_id + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + {"x-custom-session", "sess_abc123xyz"}, + }) + + // 2. 处理请求体 + requestBody := []byte(`{ + "model": "gpt-4", + "messages": [ + {"role": "user", "content": "What is 2+2?"} + ] + }`) + host.CallOnHttpRequestBody(requestBody) + + // 3. 处理响应头 + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "application/json"}, + }) + + // 4. 处理响应体 + responseBody := []byte(`{ + "choices": [{"message": {"role": "assistant", "content": "2+2 equals 4."}}], + "model": "gpt-4", + "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15} + }`) + host.CallOnHttpResponseBody(responseBody) + + host.CompleteHttp() + }) + }) +}