feat(ai-statistics): add lightweight mode with use_default_response_attributes (#3512)

This commit is contained in:
澄潭
2026-02-15 17:23:54 +08:00
committed by GitHub
parent 28df33c596
commit b76a3aca5e
2 changed files with 170 additions and 62 deletions

View File

@@ -24,6 +24,8 @@ description: AI可观测配置参考
| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 |
|----------------|-------|------|-----|------------------------|
| `use_default_attributes` | bool | 非必填 | false | 是否使用默认完整属性配置,包含 messages、answer、question 等所有字段。适用于调试、审计场景 |
| `use_default_response_attributes` | bool | 非必填 | false | 是否使用轻量级默认属性配置(推荐),只包含 token 统计,不缓冲请求体和响应体。适用于生产环境 |
| `attributes` | []Attribute | 非必填 | - | 用户希望记录在log/span中的信息 |
| `disable_openai_usage` | bool | 非必填 | false | 非openai兼容协议时model、token的支持非标配置为true时可以避免报错 |
| `value_length_limit` | int | 非必填 | 4000 | 记录的单个value的长度限制 |
@@ -67,6 +69,7 @@ Attribute 配置说明:
| 内置属性键 | 说明 | 适用场景 |
|---------|------|---------|
| `question` | 用户提问内容 | 支持 OpenAI/Claude 消息格式 |
| `system` | 系统提示词 | 支持 Claude `/v1/messages` 的顶层 system 字段 |
| `answer` | AI 回答内容 | 支持 OpenAI/Claude 消息格式,流式和非流式 |
| `tool_calls` | 工具调用信息 | OpenAI/Claude 工具调用 |
| `reasoning` | 推理过程 | OpenAI o1 等推理模型 |
@@ -391,25 +394,66 @@ data: {"choices":[{"delta":{"tool_calls":[{"index":1,"function":{"arguments":"{\
### 使用默认配置快速启用
插件提供两种默认配置模式:
#### 轻量模式(推荐用于生产环境)
通过 `use_default_response_attributes: true` 启用轻量模式:
```yaml
use_default_response_attributes: true
```
此配置是**推荐的生产环境配置****不会缓冲请求体和流式响应体**,只记录 token 统计:
| 字段 | 说明 |
|------|------|
| `reasoning_tokens` | 推理 token 数 |
| `cached_tokens` | 缓存命中 token 数 |
| `input_token_details` | 输入 token 详情 |
| `output_token_details` | 输出 token 详情 |
**为什么推荐轻量模式?**
轻量模式完全不缓冲请求体和响应体,避免了高并发下的内存问题:
- **不缓冲请求体**:不提取 `messages``question``system` 等需要解析请求体的字段
- **不缓冲流式响应体**:不提取 `answer``reasoning``tool_calls` 等需要缓冲完整响应的字段
- **只统计 token**:从响应的 usage 字段提取 token 信息,不需要缓冲完整响应
**内存对比:**
| 场景 | 完整模式内存占用 | 轻量模式内存占用 |
|------|------------------|------------------|
| 10KB 请求 + 5KB 响应 | ~15KB | ~0KB (不缓冲) |
| 1MB 请求 (长对话) + 500KB 响应 | ~1.5MB | ~0KB (不缓冲) |
| 高并发 1000 QPS | 可能 1GB+ | 极低 |
**注意**:轻量模式下 `model` 字段可能无法从请求体提取(会尝试从 Gemini 风格路径提取),`chat_round` 字段为 0。
#### 完整模式
通过 `use_default_attributes: true` 可以一键启用完整的流式观测能力:
```yaml
use_default_attributes: true
```
此配置会自动记录以下字段:
此配置会自动记录以下字段**但会缓冲完整的请求体和流式响应体**
| 字段 | 说明 |
|------|------|
| `messages` | 完整对话历史 |
| `question` | 最后一条用户消息 |
| `answer` | AI 回答(自动拼接流式 chunk |
| `reasoning` | 推理过程(自动拼接流式 chunk |
| `tool_calls` | 工具调用(自动按 index 组装) |
| `reasoning_tokens` | 推理 token 数 |
| `cached_tokens` | 缓存命中 token 数 |
| `input_token_details` | 输入 token 详情 |
| `output_token_details` | 输 token 详情 |
| 字段 | 说明 | 内存影响 |
|------|------|----------|
| `messages` | 完整对话历史 | ⚠️ 可能很大 |
| `question` | 最后一条用户消息 | 需要缓冲请求体 |
| `system` | 系统提示词 | 需要缓冲请求体 |
| `answer` | AI 回答(自动拼接流式 chunk | ⚠️ 需要缓冲响应体 |
| `reasoning` | 推理过程(自动拼接流式 chunk | ⚠️ 需要缓冲响应体 |
| `tool_calls` | 工具调用(自动按 index 组装) | 需要缓冲响应体 |
| `reasoning_tokens` | 推理 token 数 | 无 |
| `cached_tokens` | 缓存命中 token 数 | 无 |
| `input_token_details` | 输 token 详情 | 无 |
| `output_token_details` | 输出 token 详情 | 无 |
**注意**:完整模式适用于调试、审计等需要完整对话记录的场景,但在高并发生产环境可能消耗大量内存。
### 流式日志示例

View File

@@ -140,6 +140,7 @@ const (
)
// getDefaultAttributes returns the default attributes configuration for empty config
// This includes all attributes but may consume significant memory for large conversations
func getDefaultAttributes() []Attribute {
return []Attribute{
// Extract complete conversation history from request body
@@ -193,6 +194,31 @@ func getDefaultAttributes() []Attribute {
}
}
// getDefaultResponseAttributes returns a lightweight default attributes configuration
// that only includes response-time token statistics, avoiding any request body buffering
func getDefaultResponseAttributes() []Attribute {
return []Attribute{
// Token statistics (auto-extracted from response) - no body buffering needed
{
Key: BuiltinReasoningTokens,
ApplyToLog: true,
},
{
Key: BuiltinCachedTokens,
ApplyToLog: true,
},
// Detailed token information
{
Key: BuiltinInputTokenDetails,
ApplyToLog: true,
},
{
Key: BuiltinOutputTokenDetails,
ApplyToLog: true,
},
}
}
// Default session ID headers in priority order
var defaultSessionHeaders = []string{
"x-openclaw-session-key",
@@ -327,6 +353,8 @@ type AIStatisticsConfig struct {
attributes []Attribute
// If there exist attributes extracted from streaming body, chunks should be buffered
shouldBufferStreamingBody bool
// If there exist attributes extracted from request body, request body should be buffered
shouldBufferRequestBody bool
// If disableOpenaiUsage is true, model/input_token/output_token logs will be skipped
disableOpenaiUsage bool
valueLengthLimit int
@@ -421,6 +449,8 @@ func isContentTypeEnabled(contentType string, enabledContentTypes []string) bool
func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error {
// Check if use_default_attributes is enabled
useDefaultAttributes := configJson.Get("use_default_attributes").Bool()
// Check if use_default_response_attributes is enabled (lightweight mode)
useDefaultResponseAttributes := configJson.Get("use_default_response_attributes").Bool()
// Parse tracing span attributes setting.
attributeConfigs := configJson.Get("attributes").Array()
@@ -440,26 +470,13 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error {
config.valueLengthLimit = 10485760 // 10MB
}
log.Infof("Using default attributes configuration")
// Check if any default attribute needs streaming body buffering
for _, attribute := range config.attributes {
if attribute.ValueSource == ResponseStreamingBody {
config.shouldBufferStreamingBody = true
break
}
// For built-in attributes without explicit ValueSource, check default sources
if attribute.ValueSource == "" && isBuiltinAttribute(attribute.Key) {
defaultSources := getBuiltinAttributeDefaultSources(attribute.Key)
for _, src := range defaultSources {
if src == ResponseStreamingBody {
config.shouldBufferStreamingBody = true
break
}
}
if config.shouldBufferStreamingBody {
break
}
}
} else if useDefaultResponseAttributes {
config.attributes = getDefaultResponseAttributes()
// Use a reasonable default for lightweight mode
if !configJson.Get("value_length_limit").Exists() {
config.valueLengthLimit = 4000
}
log.Infof("Using default response attributes configuration (lightweight mode)")
} else {
config.attributes = make([]Attribute, len(attributeConfigs))
for i, attributeConfig := range attributeConfigs {
@@ -469,15 +486,38 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error {
log.Errorf("parse config failed, %v", err)
return err
}
if attribute.ValueSource == ResponseStreamingBody {
config.shouldBufferStreamingBody = true
}
if attribute.Rule != "" && attribute.Rule != RuleFirst && attribute.Rule != RuleReplace && attribute.Rule != RuleAppend {
return errors.New("value of rule must be one of [nil, first, replace, append]")
}
config.attributes[i] = attribute
}
}
// Check if any attribute needs request body or streaming body buffering
for _, attribute := range config.attributes {
// Check for request body buffering
if attribute.ValueSource == RequestBody {
config.shouldBufferRequestBody = true
}
// Check for streaming body buffering (explicitly configured)
if attribute.ValueSource == ResponseStreamingBody {
config.shouldBufferStreamingBody = true
}
// For built-in attributes without explicit ValueSource, check default sources
if attribute.ValueSource == "" && isBuiltinAttribute(attribute.Key) {
defaultSources := getBuiltinAttributeDefaultSources(attribute.Key)
for _, src := range defaultSources {
if src == RequestBody {
config.shouldBufferRequestBody = true
}
// Only answer/reasoning/tool_calls need actual body buffering
// Token-related attributes are extracted from context, not from body
if src == ResponseStreamingBody && needsBodyBuffering(attribute.Key) {
config.shouldBufferStreamingBody = true
}
}
}
}
// Metric settings
config.counterMetrics = make(map[string]proxywasm.MetricCounter)
@@ -488,8 +528,8 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig) error {
pathSuffixes := configJson.Get("enable_path_suffixes").Array()
config.enablePathSuffixes = make([]string, 0, len(pathSuffixes))
// If use_default_attributes is enabled and enable_path_suffixes is not configured, use default path suffixes
if useDefaultAttributes && !configJson.Get("enable_path_suffixes").Exists() {
// If use_default_attributes or use_default_response_attributes is enabled and enable_path_suffixes is not configured, use default path suffixes
if (useDefaultAttributes || useDefaultResponseAttributes) && !configJson.Get("enable_path_suffixes").Exists() {
config.enablePathSuffixes = []string{"/completions", "/messages"}
log.Infof("Using default path suffixes: /completions, /messages")
} else {
@@ -557,7 +597,10 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig) ty
ctx.SetContext(ConsumerKey, consumer)
}
ctx.SetRequestBodyBufferLimit(defaultMaxBodyBytes)
// Only buffer request body if there are attributes to extract from it
if config.shouldBufferRequestBody {
ctx.SetRequestBodyBufferLimit(defaultMaxBodyBytes)
}
// Extract session ID from headers
sessionId := extractSessionId(config.sessionIdHeader)
@@ -581,13 +624,21 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body
return types.ActionContinue
}
// Set user defined log & span attributes.
setAttributeBySource(ctx, config, RequestBody, body)
// Set span attributes for ARMS.
// Only process request body if we need to extract attributes from it
if config.shouldBufferRequestBody && len(body) > 0 {
// Set user defined log & span attributes.
setAttributeBySource(ctx, config, RequestBody, body)
}
// Extract model from request body if available, otherwise try path
requestModel := "UNKNOWN"
if model := gjson.GetBytes(body, "model"); model.Exists() {
requestModel = model.String()
} else {
if len(body) > 0 {
if model := gjson.GetBytes(body, "model"); model.Exists() {
requestModel = model.String()
}
}
// If model not found in body, try to extract from path (Gemini style)
if requestModel == "UNKNOWN" {
requestPath := ctx.GetStringContext(RequestPath, "")
if strings.Contains(requestPath, "generateContent") || strings.Contains(requestPath, "streamGenerateContent") { // Google Gemini GenerateContent
reg := regexp.MustCompile(`^.*/(?P<api_version>[^/]+)/models/(?P<model>[^:]+):\w+Content$`)
@@ -599,21 +650,23 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body
}
ctx.SetContext(tokenusage.CtxKeyRequestModel, requestModel)
setSpanAttribute(ArmsRequestModel, requestModel)
// Set the number of conversation rounds
// Set the number of conversation rounds (only if body is available)
userPromptCount := 0
if messages := gjson.GetBytes(body, "messages"); messages.Exists() && messages.IsArray() {
// OpenAI and Claude/Anthropic format - both use "messages" array with "role" field
for _, msg := range messages.Array() {
if msg.Get("role").String() == "user" {
userPromptCount += 1
if len(body) > 0 {
if messages := gjson.GetBytes(body, "messages"); messages.Exists() && messages.IsArray() {
// OpenAI and Claude/Anthropic format - both use "messages" array with "role" field
for _, msg := range messages.Array() {
if msg.Get("role").String() == "user" {
userPromptCount += 1
}
}
}
} else if contents := gjson.GetBytes(body, "contents"); contents.Exists() && contents.IsArray() {
// Google Gemini GenerateContent
for _, content := range contents.Array() {
if !content.Get("role").Exists() || content.Get("role").String() == "user" {
userPromptCount += 1
} else if contents := gjson.GetBytes(body, "contents"); contents.Exists() && contents.IsArray() {
// Google Gemini GenerateContent
for _, content := range contents.Array() {
if !content.Get("role").Exists() || content.Get("role").String() == "user" {
userPromptCount += 1
}
}
}
}
@@ -710,14 +763,14 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat
responseEndTime := time.Now().UnixMilli()
ctx.SetUserAttribute(LLMServiceDuration, responseEndTime-requestStartTime)
// Set user defined log & span attributes.
// Set user defined log & span attributes from streaming body.
// Always call setAttributeBySource even if shouldBufferStreamingBody is false,
// because token-related attributes are extracted from context (not buffered body).
var streamingBodyBuffer []byte
if config.shouldBufferStreamingBody {
streamingBodyBuffer, ok := ctx.GetContext(CtxStreamingBodyBuffer).([]byte)
if !ok {
return data
}
setAttributeBySource(ctx, config, ResponseStreamingBody, streamingBodyBuffer)
streamingBodyBuffer, _ = ctx.GetContext(CtxStreamingBodyBuffer).([]byte)
}
setAttributeBySource(ctx, config, ResponseStreamingBody, streamingBodyBuffer)
// Write log
debugLogAiLog(ctx)
@@ -884,8 +937,17 @@ func isBuiltinAttribute(key string) bool {
key == BuiltinInputTokenDetails || key == BuiltinOutputTokenDetails
}
// needsBodyBuffering checks if a built-in attribute needs body buffering
// Token-related attributes are extracted from context (set by tokenusage.GetTokenUsage),
// so they don't require buffering the response body.
func needsBodyBuffering(key string) bool {
return key == BuiltinAnswerKey || key == BuiltinToolCallsKey || key == BuiltinReasoningKey
}
// getBuiltinAttributeDefaultSources returns the default value_source(s) for a built-in attribute
// Returns nil if the key is not a built-in attribute
// Note: Token-related attributes are extracted from context (set by tokenusage.GetTokenUsage),
// so they don't require body buffering even though they're processed during response phase.
func getBuiltinAttributeDefaultSources(key string) []string {
switch key {
case BuiltinQuestionKey, BuiltinSystemKey:
@@ -893,7 +955,9 @@ func getBuiltinAttributeDefaultSources(key string) []string {
case BuiltinAnswerKey, BuiltinToolCallsKey, BuiltinReasoningKey:
return []string{ResponseStreamingBody, ResponseBody}
case BuiltinReasoningTokens, BuiltinCachedTokens, BuiltinInputTokenDetails, BuiltinOutputTokenDetails:
// Token details are only available after response is received
// Token details are extracted from context (set by tokenusage.GetTokenUsage),
// not from body parsing. We use ResponseStreamingBody/ResponseBody to indicate
// they should be processed during response phase, but they don't require body buffering.
return []string{ResponseStreamingBody, ResponseBody}
default:
return nil