From a45b1dde6c85a238787d8ed55df44aecd45dae05 Mon Sep 17 00:00:00 2001 From: JianweiWang Date: Tue, 2 Jun 2026 12:10:46 +0800 Subject: [PATCH] fix(ai-statistics): skip empty streaming model values (#3851) Signed-off-by: jianwei.wjw --- .../extensions/ai-statistics/README.md | 2 + .../extensions/ai-statistics/README_EN.md | 2 + .../wasm-go/extensions/ai-statistics/main.go | 25 ++- .../extensions/ai-statistics/main_test.go | 153 ++++++++++++++++++ 4 files changed, 180 insertions(+), 2 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md index fe3d1c483..70f86e269 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README.md +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -62,6 +62,8 @@ Attribute 配置说明: - `replace`:多个 chunk 中取最后一个有效 chunk 的值 - `append`:拼接多个有效 chunk 中的值,可用于获取回答内容 +注意:对于 `first` 和 `replace`,有效 chunk 不包含路径不存在、JSON `null` 或空字符串 `""` 的结果。如果所有 chunk 都没有有效值,最终提取结果为 `nil`;依赖空字符串作为“上游显式清空”信号的配置需要注意此行为差异。`append` 规则保持原有拼接行为。 + ### 内置属性 (Built-in Attributes) 插件提供了一些内置属性键(key),可以直接使用而无需配置 `value_source` 和 `value`。这些内置属性会自动从请求/响应中提取相应的值: diff --git a/plugins/wasm-go/extensions/ai-statistics/README_EN.md b/plugins/wasm-go/extensions/ai-statistics/README_EN.md index bff24060f..9972cf9e4 100644 --- a/plugins/wasm-go/extensions/ai-statistics/README_EN.md +++ b/plugins/wasm-go/extensions/ai-statistics/README_EN.md @@ -60,6 +60,8 @@ When `value_source` is `response_streaming_body`, `rule` should be configured to - `replace`: extract value from the last valid chunk - `append`: join value pieces from all valid chunks +Note: for `first` and `replace`, a valid chunk excludes missing paths, JSON `null`, and empty string `""` results. If no chunk contains a valid value, the extracted result is `nil`; configurations that depend on an empty string as an explicit upstream clear signal should account for this behavior difference. The `append` rule keeps its existing concatenation behavior. + ### Built-in Attributes The plugin provides several built-in attribute keys that can be used directly without configuring `value_source` and `value`. These built-in attributes automatically extract corresponding values from requests/responses: diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go index 121bad10e..4c7b70d6a 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main.go +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -1185,13 +1185,18 @@ func getBuiltinAttributeFallback(ctx wrapper.HttpContext, config AIStatisticsCon return nil } +// extractStreamingBodyByJsonPath 从 SSE 流式响应中按 jsonPath 提取属性值。 +// 入参 data 允许包含一个或多个已经拼接的 SSE chunk,jsonPath 使用 gjson 语法,rule 决定多 chunk 场景下取首个、覆盖或拼接。 +// 返回值为提取到的业务值;当规则为 first/replace 时,仅把路径存在且非空的 chunk 视为有效 chunk,避免首个空字符串覆盖后续真实值。 +// 边界情况:append 会保留历史行为继续拼接字符串,空 chunk 不产生额外内容;不支持的 rule 返回 nil 并记录错误日志。 func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string) interface{} { chunks := bytes.Split(bytes.TrimSpace(wrapper.UnifySSEChunk(data)), []byte("\n\n")) var value interface{} if rule == RuleFirst { for _, chunk := range chunks { jsonObj := gjson.GetBytes(chunk, jsonPath) - if jsonObj.Exists() { + // 流式响应中首个 chunk 可能携带空 model/payload,first 语义应取首个非空有效值。 + if isNonEmptyJSONValue(jsonObj) { value = jsonObj.Value() break } @@ -1199,7 +1204,8 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string) i } else if rule == RuleReplace { for _, chunk := range chunks { jsonObj := gjson.GetBytes(chunk, jsonPath) - if jsonObj.Exists() { + // replace 语义取最后一个非空有效值,防止后续空值把已提取的业务值清空。 + if isNonEmptyJSONValue(jsonObj) { value = jsonObj.Value() } } @@ -1219,6 +1225,21 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string) i return value } +// isNonEmptyJSONValue 判断 gjson 结果是否可以作为 first/replace 的有效流式提取值。 +// 输入必须是已经按 jsonPath 查询出的结果;路径不存在、JSON null 或空字符串都视为无效。 +// 返回 true 表示该值可以写入日志、指标或 span;数字 0、布尔 false、空对象/数组仍保留为有效值,避免破坏非字符串字段的历史兼容性。 +// 边界情况:只跳过明确的空字符串,不裁剪空白字符串,避免改变调用方对原始文本值的处理。 +func isNonEmptyJSONValue(result gjson.Result) bool { + if !result.Exists() { + return false + } + value := result.Value() + if value == nil || value == "" { + return false + } + return true +} + // shouldLogDebug returns true if the log level is debug or trace func shouldLogDebug() bool { value, err := proxywasm.CallForeignFunction("get_log_level", nil) diff --git a/plugins/wasm-go/extensions/ai-statistics/main_test.go b/plugins/wasm-go/extensions/ai-statistics/main_test.go index 4af76e686..13dd363d2 100644 --- a/plugins/wasm-go/extensions/ai-statistics/main_test.go +++ b/plugins/wasm-go/extensions/ai-statistics/main_test.go @@ -21,6 +21,7 @@ import ( "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" "github.com/higress-group/wasm-go/pkg/test" + "github.com/higress-group/wasm-go/pkg/wrapper" "github.com/stretchr/testify/require" ) @@ -110,6 +111,33 @@ var streamingBodyConfig = func() json.RawMessage { return data }() +var streamingModelExtractionConfig = func() json.RawMessage { + data, _ := json.Marshal(map[string]interface{}{ + "attributes": []map[string]interface{}{ + { + "key": "first_model", + "value_source": "response_streaming_body", + "value": "model", + "rule": "first", + "apply_to_log": true, + "apply_to_span": false, + "as_separate_log_field": false, + }, + { + "key": "replace_model", + "value_source": "response_streaming_body", + "value": "model", + "rule": "replace", + "apply_to_log": true, + "apply_to_span": false, + "as_separate_log_field": false, + }, + }, + "disable_openai_usage": true, + }) + return data +}() + // 测试配置:请求体属性配置 var requestBodyConfig = func() json.RawMessage { data, _ := json.Marshal(map[string]interface{}{ @@ -467,6 +495,17 @@ func TestOnHttpResponseHeaders(t *testing.T) { }) } +func getAILogAttributes(t *testing.T, host test.TestHost) map[string]interface{} { + t.Helper() + + raw, err := host.GetProperty([]string{wrapper.AILogKey}) + require.NoError(t, err) + + var attrs map[string]interface{} + require.NoError(t, json.Unmarshal([]byte(wrapper.UnmarshalStr(`"`+string(raw)+`"`)), &attrs)) + return attrs +} + func TestOnHttpStreamingBody(t *testing.T) { test.RunTest(t, func(t *testing.T) { // 测试流式响应体处理 @@ -511,6 +550,68 @@ func TestOnHttpStreamingBody(t *testing.T) { host.CompleteHttp() }) + t.Run("streaming first and replace skip empty model chunks", func(t *testing.T) { + host, status := test.NewTestHost(streamingModelExtractionConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + action := host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + }) + require.Equal(t, types.ActionContinue, action) + + action = host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "text/event-stream"}, + }) + require.Equal(t, types.ActionContinue, action) + + action = host.CallOnHttpStreamingResponseBody([]byte("data: {\"model\":\"\"}\n\n"), false) + require.Equal(t, types.ActionContinue, action) + action = host.CallOnHttpStreamingResponseBody([]byte("data: {\"model\":null}\n\n"), false) + require.Equal(t, types.ActionContinue, action) + action = host.CallOnHttpStreamingResponseBody([]byte("data: {\"model\":\"gpt-4o\"}\n\n"), true) + require.Equal(t, types.ActionContinue, action) + + attrs := getAILogAttributes(t, host) + require.Equal(t, "gpt-4o", attrs["first_model"]) + require.Equal(t, "gpt-4o", attrs["replace_model"]) + + host.CompleteHttp() + }) + + t.Run("streaming first and replace return nil when model path is missing", func(t *testing.T) { + host, status := test.NewTestHost(streamingModelExtractionConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + action := host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + }) + require.Equal(t, types.ActionContinue, action) + + action = host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "text/event-stream"}, + }) + require.Equal(t, types.ActionContinue, action) + + action = host.CallOnHttpStreamingResponseBody([]byte("data: {\"choices\":[]}\n\n"), false) + require.Equal(t, types.ActionContinue, action) + action = host.CallOnHttpStreamingResponseBody([]byte("data: {\"choices\":[{\"delta\":{\"content\":\"hello\"}}]}\n\n"), true) + require.Equal(t, types.ActionContinue, action) + + attrs := getAILogAttributes(t, host) + require.Nil(t, attrs["first_model"]) + require.Nil(t, attrs["replace_model"]) + + host.CompleteHttp() + }) + // 测试不包含 token 统计的流式响应体处理 t.Run("streaming body without token usage", func(t *testing.T) { host, status := test.NewTestHost(streamingBodyConfig) @@ -1452,6 +1553,58 @@ func TestSessionIdExtraction(t *testing.T) { }) } +// TestExtractStreamingBodyByJsonPath 单独测试流式响应 body 的 JSONPath 提取规则 +func TestExtractStreamingBodyByJsonPath(t *testing.T) { + t.Run("first skips empty string chunk", func(t *testing.T) { + // Azure/OpenAI 兼容流可能先返回带空 model 的过滤结果 chunk,后续 chunk 才有真实模型名。 + chunks := []byte(`data: {"choices":[],"created":0,"id":"","model":"","object":""} + +data: {"choices":[{"delta":{"content":""}}],"created":1777444731,"id":"chatcmpl-1","model":"gpt-5.4-2026-03-05","object":"chat.completion.chunk"}`) + + value := extractStreamingBodyByJsonPath(chunks, "model", RuleFirst) + + require.Equal(t, "gpt-5.4-2026-03-05", value) + }) + + t.Run("replace skips trailing empty string chunk", func(t *testing.T) { + chunks := []byte(`data: {"model":"gpt-4o"} + +data: {"model":""}`) + + value := extractStreamingBodyByJsonPath(chunks, "model", RuleReplace) + + require.Equal(t, "gpt-4o", value) + }) + + t.Run("first returns nil when path is missing in all chunks", func(t *testing.T) { + chunks := []byte(`data: {"choices":[]} + +data: {"choices":[{"delta":{"content":"hello"}}]}`) + + value := extractStreamingBodyByJsonPath(chunks, "model", RuleFirst) + + require.Nil(t, value) + }) + + t.Run("first skips explicit null chunk", func(t *testing.T) { + chunks := []byte(`data: {"model":null} + +data: {"model":"gpt-4o"}`) + + value := extractStreamingBodyByJsonPath(chunks, "model", RuleFirst) + + require.Equal(t, "gpt-4o", value) + }) + + t.Run("zero and false remain valid values", func(t *testing.T) { + numberValue := extractStreamingBodyByJsonPath([]byte(`data: {"usage":{"total_tokens":0}}`), "usage.total_tokens", RuleFirst) + boolValue := extractStreamingBodyByJsonPath([]byte(`data: {"filtered":false}`), "filtered", RuleFirst) + + require.Equal(t, float64(0), numberValue) + require.Equal(t, false, boolValue) + }) +} + // TestExtractStreamingToolCalls 单独测试 extractStreamingToolCalls 函数 func TestExtractStreamingToolCalls(t *testing.T) { t.Run("single tool call assembly", func(t *testing.T) {