fix(ai-statistics): skip empty streaming model values (#3851)

Signed-off-by: jianwei.wjw <jianwei.wjw@alibaba-inc.com>
This commit is contained in:
JianweiWang
2026-06-02 12:10:46 +08:00
committed by GitHub
parent 52c99eb27d
commit a45b1dde6c
4 changed files with 180 additions and 2 deletions

View File

@@ -62,6 +62,8 @@ Attribute 配置说明:
- `replace`:多个 chunk 中取最后一个有效 chunk 的值
- `append`:拼接多个有效 chunk 中的值,可用于获取回答内容
注意:对于 `first``replace`,有效 chunk 不包含路径不存在、JSON `null` 或空字符串 `""` 的结果。如果所有 chunk 都没有有效值,最终提取结果为 `nil`;依赖空字符串作为“上游显式清空”信号的配置需要注意此行为差异。`append` 规则保持原有拼接行为。
### 内置属性 (Built-in Attributes)
插件提供了一些内置属性键key可以直接使用而无需配置 `value_source``value`。这些内置属性会自动从请求/响应中提取相应的值:

View File

@@ -60,6 +60,8 @@ When `value_source` is `response_streaming_body`, `rule` should be configured to
- `replace`: extract value from the last valid chunk
- `append`: join value pieces from all valid chunks
Note: for `first` and `replace`, a valid chunk excludes missing paths, JSON `null`, and empty string `""` results. If no chunk contains a valid value, the extracted result is `nil`; configurations that depend on an empty string as an explicit upstream clear signal should account for this behavior difference. The `append` rule keeps its existing concatenation behavior.
### Built-in Attributes
The plugin provides several built-in attribute keys that can be used directly without configuring `value_source` and `value`. These built-in attributes automatically extract corresponding values from requests/responses:

View File

@@ -1185,13 +1185,18 @@ func getBuiltinAttributeFallback(ctx wrapper.HttpContext, config AIStatisticsCon
return nil
}
// extractStreamingBodyByJsonPath 从 SSE 流式响应中按 jsonPath 提取属性值。
// 入参 data 允许包含一个或多个已经拼接的 SSE chunkjsonPath 使用 gjson 语法rule 决定多 chunk 场景下取首个、覆盖或拼接。
// 返回值为提取到的业务值;当规则为 first/replace 时,仅把路径存在且非空的 chunk 视为有效 chunk避免首个空字符串覆盖后续真实值。
// 边界情况append 会保留历史行为继续拼接字符串,空 chunk 不产生额外内容;不支持的 rule 返回 nil 并记录错误日志。
func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string) interface{} {
chunks := bytes.Split(bytes.TrimSpace(wrapper.UnifySSEChunk(data)), []byte("\n\n"))
var value interface{}
if rule == RuleFirst {
for _, chunk := range chunks {
jsonObj := gjson.GetBytes(chunk, jsonPath)
if jsonObj.Exists() {
// 流式响应中首个 chunk 可能携带空 model/payloadfirst 语义应取首个非空有效值。
if isNonEmptyJSONValue(jsonObj) {
value = jsonObj.Value()
break
}
@@ -1199,7 +1204,8 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string) i
} else if rule == RuleReplace {
for _, chunk := range chunks {
jsonObj := gjson.GetBytes(chunk, jsonPath)
if jsonObj.Exists() {
// replace 语义取最后一个非空有效值,防止后续空值把已提取的业务值清空。
if isNonEmptyJSONValue(jsonObj) {
value = jsonObj.Value()
}
}
@@ -1219,6 +1225,21 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string) i
return value
}
// isNonEmptyJSONValue 判断 gjson 结果是否可以作为 first/replace 的有效流式提取值。
// 输入必须是已经按 jsonPath 查询出的结果路径不存在、JSON null 或空字符串都视为无效。
// 返回 true 表示该值可以写入日志、指标或 span数字 0、布尔 false、空对象/数组仍保留为有效值,避免破坏非字符串字段的历史兼容性。
// 边界情况:只跳过明确的空字符串,不裁剪空白字符串,避免改变调用方对原始文本值的处理。
func isNonEmptyJSONValue(result gjson.Result) bool {
if !result.Exists() {
return false
}
value := result.Value()
if value == nil || value == "" {
return false
}
return true
}
// shouldLogDebug returns true if the log level is debug or trace
func shouldLogDebug() bool {
value, err := proxywasm.CallForeignFunction("get_log_level", nil)

View File

@@ -21,6 +21,7 @@ import (
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
"github.com/higress-group/wasm-go/pkg/test"
"github.com/higress-group/wasm-go/pkg/wrapper"
"github.com/stretchr/testify/require"
)
@@ -110,6 +111,33 @@ var streamingBodyConfig = func() json.RawMessage {
return data
}()
var streamingModelExtractionConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"attributes": []map[string]interface{}{
{
"key": "first_model",
"value_source": "response_streaming_body",
"value": "model",
"rule": "first",
"apply_to_log": true,
"apply_to_span": false,
"as_separate_log_field": false,
},
{
"key": "replace_model",
"value_source": "response_streaming_body",
"value": "model",
"rule": "replace",
"apply_to_log": true,
"apply_to_span": false,
"as_separate_log_field": false,
},
},
"disable_openai_usage": true,
})
return data
}()
// 测试配置:请求体属性配置
var requestBodyConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
@@ -467,6 +495,17 @@ func TestOnHttpResponseHeaders(t *testing.T) {
})
}
func getAILogAttributes(t *testing.T, host test.TestHost) map[string]interface{} {
t.Helper()
raw, err := host.GetProperty([]string{wrapper.AILogKey})
require.NoError(t, err)
var attrs map[string]interface{}
require.NoError(t, json.Unmarshal([]byte(wrapper.UnmarshalStr(`"`+string(raw)+`"`)), &attrs))
return attrs
}
func TestOnHttpStreamingBody(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
// 测试流式响应体处理
@@ -511,6 +550,68 @@ func TestOnHttpStreamingBody(t *testing.T) {
host.CompleteHttp()
})
t.Run("streaming first and replace skip empty model chunks", func(t *testing.T) {
host, status := test.NewTestHost(streamingModelExtractionConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/v1/chat/completions"},
{":method", "POST"},
})
require.Equal(t, types.ActionContinue, action)
action = host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "text/event-stream"},
})
require.Equal(t, types.ActionContinue, action)
action = host.CallOnHttpStreamingResponseBody([]byte("data: {\"model\":\"\"}\n\n"), false)
require.Equal(t, types.ActionContinue, action)
action = host.CallOnHttpStreamingResponseBody([]byte("data: {\"model\":null}\n\n"), false)
require.Equal(t, types.ActionContinue, action)
action = host.CallOnHttpStreamingResponseBody([]byte("data: {\"model\":\"gpt-4o\"}\n\n"), true)
require.Equal(t, types.ActionContinue, action)
attrs := getAILogAttributes(t, host)
require.Equal(t, "gpt-4o", attrs["first_model"])
require.Equal(t, "gpt-4o", attrs["replace_model"])
host.CompleteHttp()
})
t.Run("streaming first and replace return nil when model path is missing", func(t *testing.T) {
host, status := test.NewTestHost(streamingModelExtractionConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
action := host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/v1/chat/completions"},
{":method", "POST"},
})
require.Equal(t, types.ActionContinue, action)
action = host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "text/event-stream"},
})
require.Equal(t, types.ActionContinue, action)
action = host.CallOnHttpStreamingResponseBody([]byte("data: {\"choices\":[]}\n\n"), false)
require.Equal(t, types.ActionContinue, action)
action = host.CallOnHttpStreamingResponseBody([]byte("data: {\"choices\":[{\"delta\":{\"content\":\"hello\"}}]}\n\n"), true)
require.Equal(t, types.ActionContinue, action)
attrs := getAILogAttributes(t, host)
require.Nil(t, attrs["first_model"])
require.Nil(t, attrs["replace_model"])
host.CompleteHttp()
})
// 测试不包含 token 统计的流式响应体处理
t.Run("streaming body without token usage", func(t *testing.T) {
host, status := test.NewTestHost(streamingBodyConfig)
@@ -1452,6 +1553,58 @@ func TestSessionIdExtraction(t *testing.T) {
})
}
// TestExtractStreamingBodyByJsonPath 单独测试流式响应 body 的 JSONPath 提取规则
func TestExtractStreamingBodyByJsonPath(t *testing.T) {
t.Run("first skips empty string chunk", func(t *testing.T) {
// Azure/OpenAI 兼容流可能先返回带空 model 的过滤结果 chunk后续 chunk 才有真实模型名。
chunks := []byte(`data: {"choices":[],"created":0,"id":"","model":"","object":""}
data: {"choices":[{"delta":{"content":""}}],"created":1777444731,"id":"chatcmpl-1","model":"gpt-5.4-2026-03-05","object":"chat.completion.chunk"}`)
value := extractStreamingBodyByJsonPath(chunks, "model", RuleFirst)
require.Equal(t, "gpt-5.4-2026-03-05", value)
})
t.Run("replace skips trailing empty string chunk", func(t *testing.T) {
chunks := []byte(`data: {"model":"gpt-4o"}
data: {"model":""}`)
value := extractStreamingBodyByJsonPath(chunks, "model", RuleReplace)
require.Equal(t, "gpt-4o", value)
})
t.Run("first returns nil when path is missing in all chunks", func(t *testing.T) {
chunks := []byte(`data: {"choices":[]}
data: {"choices":[{"delta":{"content":"hello"}}]}`)
value := extractStreamingBodyByJsonPath(chunks, "model", RuleFirst)
require.Nil(t, value)
})
t.Run("first skips explicit null chunk", func(t *testing.T) {
chunks := []byte(`data: {"model":null}
data: {"model":"gpt-4o"}`)
value := extractStreamingBodyByJsonPath(chunks, "model", RuleFirst)
require.Equal(t, "gpt-4o", value)
})
t.Run("zero and false remain valid values", func(t *testing.T) {
numberValue := extractStreamingBodyByJsonPath([]byte(`data: {"usage":{"total_tokens":0}}`), "usage.total_tokens", RuleFirst)
boolValue := extractStreamingBodyByJsonPath([]byte(`data: {"filtered":false}`), "filtered", RuleFirst)
require.Equal(t, float64(0), numberValue)
require.Equal(t, false, boolValue)
})
}
// TestExtractStreamingToolCalls 单独测试 extractStreamingToolCalls 函数
func TestExtractStreamingToolCalls(t *testing.T) {
t.Run("single tool call assembly", func(t *testing.T) {