From 1694f48fd9d41b1000a8d75d8097b23c1bab56ae Mon Sep 17 00:00:00 2001 From: Kent Dong Date: Tue, 23 Jun 2026 17:38:50 +0800 Subject: [PATCH] fix(ai-cache): handle SSE first chunk with role only (fixes #3953) (#3962) Signed-off-by: CH3CHO Co-authored-by: woody --- .../wasm-go/extensions/ai-cache/main_test.go | 169 ++++++++++++++++++ plugins/wasm-go/extensions/ai-cache/util.go | 51 ++++-- 2 files changed, 205 insertions(+), 15 deletions(-) diff --git a/plugins/wasm-go/extensions/ai-cache/main_test.go b/plugins/wasm-go/extensions/ai-cache/main_test.go index 525b8fa3b..2a2b797c4 100644 --- a/plugins/wasm-go/extensions/ai-cache/main_test.go +++ b/plugins/wasm-go/extensions/ai-cache/main_test.go @@ -890,6 +890,175 @@ data: [DONE]` require.Equal(t, expectedStreamResponseBody, actualStreamResponseBody) }) + // 测试流式响应体处理:首 chunk 只带 role 无 content(混元大模型场景,#3953) + // 验证: + // 1) processSSEMessage 不会因为首 chunk 没有 content 而报错, + // 避免后续 chunk 被 ERROR_PARTIAL_MESSAGE_KEY 短路。 + // 2) 全部 content chunk 累积完成后,data: [DONE] 触发的 processStreamLastChunk + // 必须把累积内容写入 Redis —— 而不是像修复前那样返回空字符串导致 cacheResponse + // 直接吞掉。 + t.Run("stream response body with role-only first chunk", func(t *testing.T) { + host, status := test.NewTestHost(basicRedisConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 设置请求头 + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/api/chat"}, + {":method", "POST"}, + {"content-type", "application/json"}, + }) + + // 设置流式请求体 + requestBody := `{ + "model": "hy-mt2-pro", + "messages": [ + { + "role": "user", + "content": "介绍天津" + } + ], + "stream": true + }` + host.CallOnHttpRequestBody([]byte(requestBody)) + + // 模拟 Redis 缓存未命中(CallOnRedisCall 会消费 callout 列表中的第 0 项, + // 即上一步 CheckCacheForKey 产生的 GET)。响应后该条目被移除, + // 流式响应便会走完整的 cacheResponse 写缓存路径。 + host.CallOnRedisCall(0, test.CreateRedisRespNull()) + + // 设置流式响应头 + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "text/event-stream"}, + }) + + // 首 chunk:只有 delta.role,没有 delta.content(混元/hy-mt2-pro 行为) + roleOnlyChunk := `data: {"id":"0b8ed3c9-3710-920b-abd9-581a0a77fd28","object":"chat.completion.chunk","created":1781169426,"model":"hy-mt2-pro","choices":[{"index":0,"delta":{"role":"assistant"}}]} + +` + + // 中间 chunk:包含实际 content + contentChunk := `data: {"id":"0b8ed3c9-3710-920b-abd9-581a0a77fd28","object":"chat.completion.chunk","created":1781169426,"model":"hy-mt2-pro","choices":[{"index":0,"delta":{"content":"天津"}}]} + +data: {"id":"0b8ed3c9-3710-920b-abd9-581a0a77fd28","object":"chat.completion.chunk","created":1781169426,"model":"hy-mt2-pro","choices":[{"index":0,"delta":{"content":"是历史文化名城"}}]} + +` + + // 末 chunk:流结束标记 + doneChunk := `data: [DONE] + +` + + // 三次调用模拟上游分片推送:首 chunk 不带 content,后续 chunk 带 content + // 修复前:第一次调用会因首 chunk 无 content 报错,ERROR_PARTIAL_MESSAGE_KEY 被设置, + // 后续所有 chunk 在 onHttpResponseBody 入口被短路,缓存永远写不进去。 + // 修复后:首 chunk 静默跳过,content chunk 正常累积并写入缓存。 + action := host.CallOnHttpStreamingResponseBody([]byte(roleOnlyChunk), false) + require.Equal(t, types.ActionContinue, action) + + action = host.CallOnHttpStreamingResponseBody([]byte(contentChunk), false) + require.Equal(t, types.ActionContinue, action) + + action = host.CallOnHttpStreamingResponseBody([]byte(doneChunk), true) + require.Equal(t, types.ActionContinue, action) + + // 在响应 SET 之前先快照 callout 属性 —— 框架在 CallOnRedisCallResponse + // 时会同步把条目从 map 里清掉,所以必须在消费前抓取。 + redisCalls := host.GetRedisCalloutAttributes() + require.GreaterOrEqual(t, len(redisCalls), 1, + "expected a SET redis callout after [DONE], got %d", len(redisCalls)) + + // 模拟 Redis SET 响应。 + host.CallOnRedisCall(0, test.CreateRedisRespString("OK")) + + // 断言:上游流式响应走到末尾后,ai-cache 必须真的发出一次 SET, + // 且写入的值是前面 content chunk 累积出的完整文本。 + setCall := redisCalls[len(redisCalls)-1] + setQuery := string(setCall.Query) + require.Contains(t, setQuery, "higress-ai-cache:介绍天津", + "SET command should target the cache key derived from the request, got query: %q", setQuery) + require.Contains(t, setQuery, "天津是历史文化名城", + "SET command should contain the accumulated value, got query: %q", setQuery) + }) + + // 测试流式响应体处理:最后一个 buffer 同时包含最后一段 content 与 data: [DONE] + // 验证: + // 1) 同一 buffer 内的最后一段 content 不会被 [DONE] 早 return 丢掉; + // 2) cacheResponse SET 写入的值是先前 chunk 与本次合并后的完整累积文本。 + // 回归保护:PR #3962 review 指出当 content 与 [DONE] 在同一 lastChunk 到达时, + // processSSEMessage 会在 [DONE] 处直接 return ctx 旧值,导致最后一段 content 丢失。 + t.Run("stream response body with final content and [DONE] in same chunk", func(t *testing.T) { + host, status := test.NewTestHost(basicRedisConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + // 设置请求头 + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/api/chat"}, + {":method", "POST"}, + {"content-type", "application/json"}, + }) + + // 设置流式请求体 + requestBody := `{ + "model": "hy-mt2-pro", + "messages": [ + { + "role": "user", + "content": "介绍天津" + } + ], + "stream": true + }` + host.CallOnHttpRequestBody([]byte(requestBody)) + + // 模拟 Redis 缓存未命中(消费 CheckCacheForKey 的 GET) + host.CallOnRedisCall(0, test.CreateRedisRespNull()) + + // 设置流式响应头 + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"content-type", "text/event-stream"}, + }) + + // 非末次 chunk:先累积 "天津" + firstChunk := `data: {"id":"0b8ed3c9-3710-920b-abd9-581a0a77fd28","object":"chat.completion.chunk","created":1781169426,"model":"hy-mt2-pro","choices":[{"index":0,"delta":{"content":"天津"}}]} + +` + + // 末次 chunk:最后一段 content + [DONE] 同一个 buffer 到达 + finalChunk := `data: {"id":"0b8ed3c9-3710-920b-abd9-581a0a77fd28","object":"chat.completion.chunk","created":1781169426,"model":"hy-mt2-pro","choices":[{"index":0,"delta":{"content":"是历史文化名城"}}]} + +data: [DONE] + +` + + action := host.CallOnHttpStreamingResponseBody([]byte(firstChunk), false) + require.Equal(t, types.ActionContinue, action) + + action = host.CallOnHttpStreamingResponseBody([]byte(finalChunk), true) + require.Equal(t, types.ActionContinue, action) + + // 在消费前快照 callout —— 框架在 CallOnRedisCall 同步把条目从 map 里清掉。 + redisCalls := host.GetRedisCalloutAttributes() + require.GreaterOrEqual(t, len(redisCalls), 1, + "expected a SET redis callout after [DONE], got %d", len(redisCalls)) + + // 模拟 Redis SET 响应 + host.CallOnRedisCall(0, test.CreateRedisRespString("OK")) + + // 断言 SET value 必须是完整累积文本,而非被 [DONE] 截断后的 "天津" + setCall := redisCalls[len(redisCalls)-1] + setQuery := string(setCall.Query) + require.Contains(t, setQuery, "higress-ai-cache:介绍天津", + "SET command should target the cache key derived from the request, got query: %q", setQuery) + require.Contains(t, setQuery, "天津是历史文化名城", + "SET command should contain the full accumulated value (final chunk content must not be dropped by [DONE]), got query: %q", setQuery) + }) + // 测试无缓存键的响应体处理 t.Run("response body without cache key", func(t *testing.T) { host, status := test.NewTestHost(basicRedisConfig) diff --git a/plugins/wasm-go/extensions/ai-cache/util.go b/plugins/wasm-go/extensions/ai-cache/util.go index 382b361f6..ffc057522 100644 --- a/plugins/wasm-go/extensions/ai-cache/util.go +++ b/plugins/wasm-go/extensions/ai-cache/util.go @@ -88,6 +88,13 @@ func processStreamLastChunk(ctx wrapper.HttpContext, c config.PluginConfig, chun if err != nil { return "", fmt.Errorf("[processStreamLastChunk] processSSEMessage failed, error: %v", err) } + // 兜底:[DONE] 或其它尾部 chunk 无 content 时,processSSEMessage 返回空, + // 此时从 ctx 取已累积的缓存内容,避免缓存写空。 + if value == "" { + if tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY); tempContentI != nil { + value = tempContentI.(string) + } + } return value, nil } tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY) @@ -99,6 +106,11 @@ func processStreamLastChunk(ctx wrapper.HttpContext, c config.PluginConfig, chun func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessage string, log log.Log) (string, error) { content := "" + // done 标记本次 sseMessage 是否遇到 [DONE]。当最后一段 content 与 [DONE] + // 处于同一 buffer 时,必须跳出循环后由循环外的 merge 逻辑统一合并到 + // CACHE_CONTENT_CONTEXT_KEY,否则本次解析的最后一段 content 会被 [DONE] + // 早 return 丢弃(PR #3962 review)。 + done := false for _, chunk := range strings.Split(sseMessage, "\n\n") { log.Debugf("single sse message: %s", chunk) subMessages := strings.Split(chunk, "\n") @@ -117,7 +129,9 @@ func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessag bodyJson := message[5:] if strings.TrimSpace(bodyJson) == "[DONE]" { - return content, nil + // 跳出循环,把已解析的局部 content 留到循环外统一合并。 + done = true + break } // Extract values from JSON fields @@ -130,22 +144,29 @@ func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessag } // Check if the ResponseBody field exists - if !responseBody.Exists() { - if ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY) != nil { - log.Debugf("[processSSEMessage] unable to extract content from message; cache content is not nil: %s", message) - return content, nil - } - return content, fmt.Errorf("[processSSEMessage] unable to extract content from message; cache content is nil: %s", message) - } else { + if responseBody.Exists() { content += responseBody.String() } } - tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY) - // If there is no content in the cache, initialize and set the content - if tempContentI == nil { - ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content) - } else { - ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, tempContentI.(string)+content) + + // 本次 sseMessage 既没解析到 content 也没遇到 [DONE]:保持 ctx 不变,直接返回。 + if content == "" && !done { + log.Debugf("[processSSEMessage] no content extracted; skipping cache update: %s", sseMessage) + return "", nil } - return content, nil + + if content != "" { + if v := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY); v == nil { + ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content) + } else { + ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, v.(string)+content) + } + } + + // handleStreamChunk 不使用返回值;processStreamLastChunk 把它作为 cacheResponse 的 + // SET value,必须是完整累积值(避免最后一段 content 因 [DONE] 早 return 被丢)。 + if v := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY); v != nil { + return v.(string), nil + } + return "", nil }