From 76ada0b8443d4b043e78d01d740900cfe0b41ab7 Mon Sep 17 00:00:00 2001 From: rinfx Date: Wed, 25 Jun 2025 09:28:14 +0800 Subject: [PATCH] add trace_span_key & as_seperate_log_field configuration for ai-statistics (#2488) --- .../extensions/ai-statistics/README.md | 37 ++++++++++++++----- .../extensions/ai-statistics/README_EN.md | 33 +++++++++++++---- .../wasm-go/extensions/ai-statistics/main.go | 32 +++++++++++----- plugins/wasm-go/pkg/wrapper/plugin_wrapper.go | 4 +- plugins/wasm-go/pkg/wrapper/utils.go | 4 +- 5 files changed, 78 insertions(+), 32 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index 294a51469..2b85251ac 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -29,22 +29,24 @@ Attribute 配置说明: | 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | |----------------|-------|-----|-----|------------------------| -| `key` | string | 必填 | - | attrribute 名称 | -| `value_source` | string | 必填 | - | attrribute 取值来源,可选值为 `fixed_value`, `request_header`, `request_body`, `response_header`, `response_body`, `response_streaming_body` | -| `value` | string | 必填 | - | attrribute 取值 key value/path | -| `default_value` | string | 非必填 | - | attrribute 默认值 | -| `rule` | string | 非必填 | - | 从流式响应中提取 attrribute 的规则,可选值为 `first`, `replace`, `append`| +| `key` | string | 必填 | - | attribute 名称 | +| `value_source` | string | 必填 | - | attribute 取值来源,可选值为 `fixed_value`, `request_header`, `request_body`, `response_header`, `response_body`, `response_streaming_body` | +| `value` | string | 必填 | - | attribute 取值 key value/path | +| `default_value` | string | 非必填 | - | attribute 默认值 | +| `rule` | string | 非必填 | - | 从流式响应中提取 attribute 的规则,可选值为 `first`, `replace`, `append`| | `apply_to_log` | bool | 非必填 | false | 是否将提取的信息记录在日志中 | | `apply_to_span` | bool | 非必填 | false | 是否将提取的信息记录在链路追踪span中 | +| `trace_span_key` | string | 非必填 | - | 链路追踪attribute key,默认会使用`key`的设置 | +| `as_separate_log_field` | bool | 非必填 | false | 记录日志时是否作为单独的字段,日志字段名使用`key`的设置 | `value_source` 的各种取值含义如下: - `fixed_value`:固定值 -- `request_header` : attrribute 值通过 http 请求头获取,value 配置为 header key -- `request_body` :attrribute 值通过请求 body 获取,value 配置格式为 gjson 的 jsonpath -- `response_header` :attrribute 值通过 http 响应头获取,value 配置为header key -- `response_body` :attrribute 值通过响应 body 获取,value 配置格式为 gjson 的 jsonpath -- `response_streaming_body` :attrribute 值通过流式响应 body 获取,value 配置格式为 gjson 的 jsonpath +- `request_header` : attribute 值通过 http 请求头获取,value 配置为 header key +- `request_body` :attribute 值通过请求 body 获取,value 配置格式为 gjson 的 jsonpath +- `response_header` :attribute 值通过 http 响应头获取,value 配置为header key +- `response_body` :attribute 值通过响应 body 获取,value 配置格式为 gjson 的 jsonpath +- `response_streaming_body` :attribute 值通过流式响应 body 获取,value 配置格式为 gjson 的 jsonpath 当 `value_source` 为 `response_streaming_body` 时,应当配置 `rule`,用于指定如何从流式body中获取指定值,取值含义如下: @@ -60,6 +62,21 @@ Attribute 配置说明: '{"ai_log":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}' ``` +如果字段设置了 `as_separate_log_field`,例如: +```yaml +attributes: + - key: consumer + value_source: request_header + value: x-mse-consumer + apply_to_log: true + as_separate_log_field: true +``` + +那么要在日志中打印,需要额外设置log_format: +``` +'{"consumer":"%FILTER_STATE(wasm.consumer:PLAIN)%"}' +``` + ### 空配置 #### 监控 diff --git a/plugins/wasm-go/extensions/ai-statistics/README_EN.md b/plugins/wasm-go/extensions/ai-statistics/README_EN.md index 86a76c2cf..668159c82 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README_EN.md +++ b/plugins/wasm-go/extensions/ai-statistics/README_EN.md @@ -29,22 +29,24 @@ Attribute Configuration instructions: | Name | Type | Required | Default | Description | |----------------|-------|-----|-----|------------------------| -| `key` | string | required | - | attrribute key | -| `value_source` | string | required | - | attrribute value source, optional values ​​are `fixed_value`, `request_header`, `request_body`, `response_header`, `response_body`, `response_streaming_body` | -| `value` | string | required | - | how to get attrribute value | +| `key` | string | required | - | attribute key | +| `value_source` | string | required | - | attribute value source, optional values ​​are `fixed_value`, `request_header`, `request_body`, `response_header`, `response_body`, `response_streaming_body` | +| `value` | string | required | - | how to get attribute value | | `default_value` | string | optional | - | default value for attribute | | `rule` | string | optional | - | Rule to extract attribute from streaming response, optional values ​​are `first`, `replace`, `append`| | `apply_to_log` | bool | optional | false | Whether to record the extracted information in the log | | `apply_to_span` | bool | optional | false | Whether to record the extracted information in the link tracking span | +| `trace_span_key` | string | optional | - | span attribute key, default is the value of `key` | +| `as_separate_log_field` | bool | optional | false | Whether to use a separate log field, the field name is equal to the value of `key` | The meanings of various values for `value_source` ​​are as follows: - `fixed_value`: fixed value -- `request_header`: The attrribute is obtained through the http request header -- `request_body`: The attrribute is obtained through the http request body -- `response_header`: The attrribute is obtained through the http response header -- `response_body`: The attrribute is obtained through the http response body -- `response_streaming_body`: The attrribute is obtained through the http streaming response body +- `request_header`: The attribute is obtained through the http request header +- `request_body`: The attribute is obtained through the http request body +- `response_header`: The attribute is obtained through the http response header +- `response_body`: The attribute is obtained through the http response body +- `response_streaming_body`: The attribute is obtained through the http streaming response body When `value_source` is `response_streaming_body`, `rule` should be configured to specify how to obtain the specified value from the streaming body. The meaning of the value is as follows: @@ -60,6 +62,21 @@ If you want to record ai-statistic related statistical values in the gateway acc '{"ai_log":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}' ``` +If the field is set with `as_separate_log_field`, for example: +```yaml +attributes: + - key: consumer + value_source: request_header + value: x-mse-consumer + apply_to_log: true + as_separate_log_field: true +``` + +Then to print in the log, you need to set log_format additionally: +``` +'{"consumer":"%FILTER_STATE(wasm.consumer:PLAIN)%"}' +``` + ### Empty #### Metric diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index d9d10c4fe..ca930cf19 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -75,13 +75,15 @@ const ( // TracingSpan is the tracing span configuration. type Attribute struct { - Key string `json:"key"` - ValueSource string `json:"value_source"` - Value string `json:"value"` - DefaultValue string `json:"default_value,omitempty"` - Rule string `json:"rule,omitempty"` - ApplyToLog bool `json:"apply_to_log,omitempty"` - ApplyToSpan bool `json:"apply_to_span,omitempty"` + Key string `json:"key"` + ValueSource string `json:"value_source"` + Value string `json:"value"` + TraceSpanKey string `json:"trace_span_key,omitempty"` + DefaultValue string `json:"default_value,omitempty"` + Rule string `json:"rule,omitempty"` + ApplyToLog bool `json:"apply_to_log,omitempty"` + ApplyToSpan bool `json:"apply_to_span,omitempty"` + AsSeparateLogField bool `json:"as_separate_log_field,omitempty"` } type AIStatisticsConfig struct { @@ -406,13 +408,23 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so } log.Debugf("[attribute] source type: %s, key: %s, value: %+v", source, key, value) if attribute.ApplyToLog { - ctx.SetUserAttribute(key, value) + if attribute.AsSeparateLogField { + marshalledJsonStr := wrapper.MarshalStr(fmt.Sprint(value)) + if err := proxywasm.SetProperty([]string{key}, []byte(marshalledJsonStr)); err != nil { + log.Warnf("failed to set %s in filter state, raw is %s, err is %v", key, marshalledJsonStr, err) + } + } else { + ctx.SetUserAttribute(key, value) + } } // for metrics if key == Model || key == InputToken || key == OutputToken { ctx.SetContext(key, value) } if attribute.ApplyToSpan { + if attribute.TraceSpanKey != "" { + key = attribute.TraceSpanKey + } setSpanAttribute(key, value, log) } } @@ -481,10 +493,10 @@ func writeMetric(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper log.Warnf("ClusterName typd assert failed, skip metric record") return } - + if config.disableOpenaiUsage { return - } + } if ctx.GetUserAttribute(Model) == nil || ctx.GetUserAttribute(InputToken) == nil || ctx.GetUserAttribute(OutputToken) == nil { log.Warnf("get usage information failed, skip metric record") diff --git a/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go b/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go index 2b4847b11..8499dc1ad 100644 --- a/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go +++ b/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go @@ -566,7 +566,7 @@ func (ctx *CommonHttpCtx[PluginConfig]) WriteUserAttributeToLogWithKey(key strin newAttributeMap := map[string]interface{}{} if string(preMarshalledJsonLogStr) != "" { // e.g. {"field1":"value1","field2":"value2"} - preJsonLogStr := unmarshalStr(fmt.Sprintf(`"%s"`, string(preMarshalledJsonLogStr))) + preJsonLogStr := UnmarshalStr(fmt.Sprintf(`"%s"`, string(preMarshalledJsonLogStr))) err := json.Unmarshal([]byte(preJsonLogStr), &newAttributeMap) if err != nil { ctx.plugin.vm.log.Warnf("Unmarshal failed, will overwrite %s, pre value is: %s", key, string(preMarshalledJsonLogStr)) @@ -580,7 +580,7 @@ func (ctx *CommonHttpCtx[PluginConfig]) WriteUserAttributeToLogWithKey(key strin // e.g. {"field1":"value1","field2":2,"field3":"value3"} jsonStr, _ := json.Marshal(newAttributeMap) // e.g. {\"field1\":\"value1\",\"field2\":2,\"field3\":\"value3\"} - marshalledJsonStr := marshalStr(string(jsonStr)) + marshalledJsonStr := MarshalStr(string(jsonStr)) if err := proxywasm.SetProperty([]string{key}, []byte(marshalledJsonStr)); err != nil { ctx.plugin.vm.log.Warnf("failed to set %s in filter state, raw is %s, err is %v", key, marshalledJsonStr, err) return err diff --git a/plugins/wasm-go/pkg/wrapper/utils.go b/plugins/wasm-go/pkg/wrapper/utils.go index b54157858..a34d1d619 100644 --- a/plugins/wasm-go/pkg/wrapper/utils.go +++ b/plugins/wasm-go/pkg/wrapper/utils.go @@ -7,7 +7,7 @@ import ( "github.com/tidwall/gjson" ) -func unmarshalStr(marshalledJsonStr string) string { +func UnmarshalStr(marshalledJsonStr string) string { // e.g. "{\"field1\":\"value1\",\"field2\":\"value2\"}" var jsonStr string err := json.Unmarshal([]byte(marshalledJsonStr), &jsonStr) @@ -19,7 +19,7 @@ func unmarshalStr(marshalledJsonStr string) string { return jsonStr } -func marshalStr(raw string) string { +func MarshalStr(raw string) string { // e.g. {"field1":"value1","field2":"value2"} helper := map[string]string{ "placeholder": raw,