From b76a3aca5e3ab20977fa7c3c5f256bcef727aea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BE=84=E6=BD=AD?= Date: Sun, 15 Feb 2026 17:23:54 +0800 Subject: [PATCH] feat(ai-statistics): add lightweight mode with use_default_response_attributes (#3512) --- .../extensions/ai-statistics/README.md | 68 ++++++-- .../wasm-go/extensions/ai-statistics/main.go | 164 ++++++++++++------ 2 files changed, 170 insertions(+), 62 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index 3ff2eb91e..59cdf6376 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -24,6 +24,8 @@ description: AI可观测配置参考 | 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | |----------------|-------|------|-----|------------------------| +| `use_default_attributes` | bool | 非必填 | false | 是否使用默认完整属性配置,包含 messages、answer、question 等所有字段。适用于调试、审计场景 | +| `use_default_response_attributes` | bool | 非必填 | false | 是否使用轻量级默认属性配置(推荐),只包含 token 统计,不缓冲请求体和响应体。适用于生产环境 | | `attributes` | []Attribute | 非必填 | - | 用户希望记录在log/span中的信息 | | `disable_openai_usage` | bool | 非必填 | false | 非openai兼容协议时,model、token的支持非标,配置为true时可以避免报错 | | `value_length_limit` | int | 非必填 | 4000 | 记录的单个value的长度限制 | @@ -67,6 +69,7 @@ Attribute 配置说明: | 内置属性键 | 说明 | 适用场景 | |---------|------|---------| | `question` | 用户提问内容 | 支持 OpenAI/Claude 消息格式 | +| `system` | 系统提示词 | 支持 Claude `/v1/messages` 的顶层 system 字段 | | `answer` | AI 回答内容 | 支持 OpenAI/Claude 消息格式,流式和非流式 | | `tool_calls` | 工具调用信息 | OpenAI/Claude 工具调用 | | `reasoning` | 推理过程 | OpenAI o1 等推理模型 | @@ -391,25 +394,66 @@ data: {"choices":[{"delta":{"tool_calls":[{"index":1,"function":{"arguments":"{\ ### 使用默认配置快速启用 +插件提供两种默认配置模式: + +#### 轻量模式(推荐用于生产环境) + +通过 `use_default_response_attributes: true` 启用轻量模式: + +```yaml +use_default_response_attributes: true +``` + +此配置是**推荐的生产环境配置**,**不会缓冲请求体和流式响应体**,只记录 token 统计: + +| 字段 | 说明 | +|------|------| +| `reasoning_tokens` | 推理 token 数 | 
+| `cached_tokens` | 缓存命中 token 数 | +| `input_token_details` | 输入 token 详情 | +| `output_token_details` | 输出 token 详情 | + +**为什么推荐轻量模式?** + +轻量模式完全不缓冲请求体和响应体,避免了高并发下的内存问题: +- **不缓冲请求体**:不提取 `messages`、`question`、`system` 等需要解析请求体的字段 +- **不缓冲流式响应体**:不提取 `answer`、`reasoning`、`tool_calls` 等需要缓冲完整响应的字段 +- **只统计 token**:从响应的 usage 字段提取 token 信息,不需要缓冲完整响应 + +**内存对比:** + +| 场景 | 完整模式内存占用 | 轻量模式内存占用 | +|------|------------------|------------------| +| 10KB 请求 + 5KB 响应 | ~15KB | ~0KB (不缓冲) | +| 1MB 请求 (长对话) + 500KB 响应 | ~1.5MB | ~0KB (不缓冲) | +| 高并发 1000 QPS | 可能 1GB+ | 极低 | + +**注意**:轻量模式下 `model` 字段可能无法从请求体提取(会尝试从 Gemini 风格路径提取),`chat_round` 字段为 0。 + +#### 完整模式 + 通过 `use_default_attributes: true` 可以一键启用完整的流式观测能力: ```yaml use_default_attributes: true ``` -此配置会自动记录以下字段: +此配置会自动记录以下字段,**但会缓冲完整的请求体和流式响应体**: -| 字段 | 说明 | -|------|------| -| `messages` | 完整对话历史 | -| `question` | 最后一条用户消息 | -| `answer` | AI 回答(自动拼接流式 chunk) | -| `reasoning` | 推理过程(自动拼接流式 chunk) | -| `tool_calls` | 工具调用(自动按 index 组装) | -| `reasoning_tokens` | 推理 token 数 | -| `cached_tokens` | 缓存命中 token 数 | -| `input_token_details` | 输入 token 详情 | -| `output_token_details` | 输出 token 详情 | +| 字段 | 说明 | 内存影响 | +|------|------|----------| +| `messages` | 完整对话历史 | ⚠️ 可能很大 | +| `question` | 最后一条用户消息 | 需要缓冲请求体 | +| `system` | 系统提示词 | 需要缓冲请求体 | +| `answer` | AI 回答(自动拼接流式 chunk) | ⚠️ 需要缓冲响应体 | +| `reasoning` | 推理过程(自动拼接流式 chunk) | ⚠️ 需要缓冲响应体 | +| `tool_calls` | 工具调用(自动按 index 组装) | 需要缓冲响应体 | +| `reasoning_tokens` | 推理 token 数 | 无 | +| `cached_tokens` | 缓存命中 token 数 | 无 | +| `input_token_details` | 输入 token 详情 | 无 | +| `output_token_details` | 输出 token 详情 | 无 | + +**注意**:完整模式适用于调试、审计等需要完整对话记录的场景,但在高并发生产环境可能消耗大量内存。 ### 流式日志示例 diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index 349cfdd10..e773b453a 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -140,6 +140,7 @@ const ( ) // 
getDefaultAttributes returns the default attributes configuration for empty config +// This includes all attributes but may consume significant memory for large conversations func getDefaultAttributes() []Attribute { return []Attribute{ // Extract complete conversation history from request body @@ -193,6 +194,31 @@ func getDefaultAttributes() []Attribute { } } +// getDefaultResponseAttributes returns a lightweight default attributes configuration +// that only includes response-time token statistics, avoiding any request body buffering +func getDefaultResponseAttributes() []Attribute { + return []Attribute{ + // Token statistics (auto-extracted from response) - no body buffering needed + { + Key: BuiltinReasoningTokens, + ApplyToLog: true, + }, + { + Key: BuiltinCachedTokens, + ApplyToLog: true, + }, + // Detailed token information + { + Key: BuiltinInputTokenDetails, + ApplyToLog: true, + }, + { + Key: BuiltinOutputTokenDetails, + ApplyToLog: true, + }, + } +} + // Default session ID headers in priority order var defaultSessionHeaders = []string{ "x-openclaw-session-key", @@ -327,6 +353,8 @@ type AIStatisticsConfig struct { attributes []Attribute // If there exist attributes extracted from streaming body, chunks should be buffered shouldBufferStreamingBody bool + // If there exist attributes extracted from request body, request body should be buffered + shouldBufferRequestBody bool // If disableOpenaiUsage is true, model/input_token/output_token logs will be skipped disableOpenaiUsage bool valueLengthLimit int @@ -421,6 +449,8 @@ func isContentTypeEnabled(contentType string, enabledContentTypes []string) bool func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error { // Check if use_default_attributes is enabled useDefaultAttributes := configJson.Get("use_default_attributes").Bool() + // Check if use_default_response_attributes is enabled (lightweight mode) + useDefaultResponseAttributes := 
configJson.Get("use_default_response_attributes").Bool() // Parse tracing span attributes setting. attributeConfigs := configJson.Get("attributes").Array() @@ -440,26 +470,13 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error { config.valueLengthLimit = 10485760 // 10MB } log.Infof("Using default attributes configuration") - // Check if any default attribute needs streaming body buffering - for _, attribute := range config.attributes { - if attribute.ValueSource == ResponseStreamingBody { - config.shouldBufferStreamingBody = true - break - } - // For built-in attributes without explicit ValueSource, check default sources - if attribute.ValueSource == "" && isBuiltinAttribute(attribute.Key) { - defaultSources := getBuiltinAttributeDefaultSources(attribute.Key) - for _, src := range defaultSources { - if src == ResponseStreamingBody { - config.shouldBufferStreamingBody = true - break - } - } - if config.shouldBufferStreamingBody { - break - } - } + } else if useDefaultResponseAttributes { + config.attributes = getDefaultResponseAttributes() + // Use a reasonable default for lightweight mode + if !configJson.Get("value_length_limit").Exists() { + config.valueLengthLimit = 4000 } + log.Infof("Using default response attributes configuration (lightweight mode)") } else { config.attributes = make([]Attribute, len(attributeConfigs)) for i, attributeConfig := range attributeConfigs { @@ -469,15 +486,38 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error { log.Errorf("parse config failed, %v", err) return err } - if attribute.ValueSource == ResponseStreamingBody { - config.shouldBufferStreamingBody = true - } if attribute.Rule != "" && attribute.Rule != RuleFirst && attribute.Rule != RuleReplace && attribute.Rule != RuleAppend { return errors.New("value of rule must be one of [nil, first, replace, append]") } config.attributes[i] = attribute } } + + // Check if any attribute needs request body or streaming body buffering 
+ for _, attribute := range config.attributes { + // Check for request body buffering + if attribute.ValueSource == RequestBody { + config.shouldBufferRequestBody = true + } + // Check for streaming body buffering (explicitly configured) + if attribute.ValueSource == ResponseStreamingBody { + config.shouldBufferStreamingBody = true + } + // For built-in attributes without explicit ValueSource, check default sources + if attribute.ValueSource == "" && isBuiltinAttribute(attribute.Key) { + defaultSources := getBuiltinAttributeDefaultSources(attribute.Key) + for _, src := range defaultSources { + if src == RequestBody { + config.shouldBufferRequestBody = true + } + // Only answer/reasoning/tool_calls need actual body buffering + // Token-related attributes are extracted from context, not from body + if src == ResponseStreamingBody && needsBodyBuffering(attribute.Key) { + config.shouldBufferStreamingBody = true + } + } + } + } // Metric settings config.counterMetrics = make(map[string]proxywasm.MetricCounter) @@ -488,8 +528,8 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error { pathSuffixes := configJson.Get("enable_path_suffixes").Array() config.enablePathSuffixes = make([]string, 0, len(pathSuffixes)) - // If use_default_attributes is enabled and enable_path_suffixes is not configured, use default path suffixes - if useDefaultAttributes && !configJson.Get("enable_path_suffixes").Exists() { + // If use_default_attributes or use_default_response_attributes is enabled and enable_path_suffixes is not configured, use default path suffixes + if (useDefaultAttributes || useDefaultResponseAttributes) && !configJson.Get("enable_path_suffixes").Exists() { config.enablePathSuffixes = []string{"/completions", "/messages"} log.Infof("Using default path suffixes: /completions, /messages") } else { @@ -557,7 +597,10 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig) ty ctx.SetContext(ConsumerKey, consumer) } - 
ctx.SetRequestBodyBufferLimit(defaultMaxBodyBytes) + // Only buffer request body if there are attributes to extract from it + if config.shouldBufferRequestBody { + ctx.SetRequestBodyBufferLimit(defaultMaxBodyBytes) + } // Extract session ID from headers sessionId := extractSessionId(config.sessionIdHeader) @@ -581,13 +624,21 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body return types.ActionContinue } - // Set user defined log & span attributes. - setAttributeBySource(ctx, config, RequestBody, body) - // Set span attributes for ARMS. + // Only process request body if we need to extract attributes from it + if config.shouldBufferRequestBody && len(body) > 0 { + // Set user defined log & span attributes. + setAttributeBySource(ctx, config, RequestBody, body) + } + + // Extract model from request body if available, otherwise try path requestModel := "UNKNOWN" - if model := gjson.GetBytes(body, "model"); model.Exists() { - requestModel = model.String() - } else { + if len(body) > 0 { + if model := gjson.GetBytes(body, "model"); model.Exists() { + requestModel = model.String() + } + } + // If model not found in body, try to extract from path (Gemini style) + if requestModel == "UNKNOWN" { requestPath := ctx.GetStringContext(RequestPath, "") if strings.Contains(requestPath, "generateContent") || strings.Contains(requestPath, "streamGenerateContent") { // Google Gemini GenerateContent reg := regexp.MustCompile(`^.*/(?P<api_version>[^/]+)/models/(?P<model>[^:]+):\w+Content$`) @@ -599,21 +650,23 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body } ctx.SetContext(tokenusage.CtxKeyRequestModel, requestModel) setSpanAttribute(ArmsRequestModel, requestModel) - // Set the number of conversation rounds + // Set the number of conversation rounds (only if body is available) userPromptCount := 0 - if messages := gjson.GetBytes(body, "messages"); messages.Exists() && messages.IsArray() { - // OpenAI and Claude/Anthropic format - both use 
"messages" array with "role" field - for _, msg := range messages.Array() { - if msg.Get("role").String() == "user" { - userPromptCount += 1 + if len(body) > 0 { + if messages := gjson.GetBytes(body, "messages"); messages.Exists() && messages.IsArray() { + // OpenAI and Claude/Anthropic format - both use "messages" array with "role" field + for _, msg := range messages.Array() { + if msg.Get("role").String() == "user" { + userPromptCount += 1 + } } - } - } else if contents := gjson.GetBytes(body, "contents"); contents.Exists() && contents.IsArray() { - // Google Gemini GenerateContent - for _, content := range contents.Array() { - if !content.Get("role").Exists() || content.Get("role").String() == "user" { - userPromptCount += 1 + } else if contents := gjson.GetBytes(body, "contents"); contents.Exists() && contents.IsArray() { + // Google Gemini GenerateContent + for _, content := range contents.Array() { + if !content.Get("role").Exists() || content.Get("role").String() == "user" { + userPromptCount += 1 + } } } } @@ -710,14 +763,14 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat responseEndTime := time.Now().UnixMilli() ctx.SetUserAttribute(LLMServiceDuration, responseEndTime-requestStartTime) - // Set user defined log & span attributes. + // Set user defined log & span attributes from streaming body. + // Always call setAttributeBySource even if shouldBufferStreamingBody is false, + // because token-related attributes are extracted from context (not buffered body). 
+ var streamingBodyBuffer []byte if config.shouldBufferStreamingBody { - streamingBodyBuffer, ok := ctx.GetContext(CtxStreamingBodyBuffer).([]byte) - if !ok { - return data - } - setAttributeBySource(ctx, config, ResponseStreamingBody, streamingBodyBuffer) + streamingBodyBuffer, _ = ctx.GetContext(CtxStreamingBodyBuffer).([]byte) } + setAttributeBySource(ctx, config, ResponseStreamingBody, streamingBodyBuffer) // Write log debugLogAiLog(ctx) @@ -884,8 +937,17 @@ func isBuiltinAttribute(key string) bool { key == BuiltinInputTokenDetails || key == BuiltinOutputTokenDetails } +// needsBodyBuffering checks if a built-in attribute needs body buffering +// Token-related attributes are extracted from context (set by tokenusage.GetTokenUsage), +// so they don't require buffering the response body. +func needsBodyBuffering(key string) bool { + return key == BuiltinAnswerKey || key == BuiltinToolCallsKey || key == BuiltinReasoningKey +} + // getBuiltinAttributeDefaultSources returns the default value_source(s) for a built-in attribute // Returns nil if the key is not a built-in attribute +// Note: Token-related attributes are extracted from context (set by tokenusage.GetTokenUsage), +// so they don't require body buffering even though they're processed during response phase. func getBuiltinAttributeDefaultSources(key string) []string { switch key { case BuiltinQuestionKey, BuiltinSystemKey: @@ -893,7 +955,9 @@ func getBuiltinAttributeDefaultSources(key string) []string { case BuiltinAnswerKey, BuiltinToolCallsKey, BuiltinReasoningKey: return []string{ResponseStreamingBody, ResponseBody} case BuiltinReasoningTokens, BuiltinCachedTokens, BuiltinInputTokenDetails, BuiltinOutputTokenDetails: - // Token details are only available after response is received + // Token details are extracted from context (set by tokenusage.GetTokenUsage), + // not from body parsing. 
We use ResponseStreamingBody/ResponseBody to indicate + // they should be processed during response phase, but they don't require body buffering. return []string{ResponseStreamingBody, ResponseBody} default: return nil