fix(ai-cache): handle SSE first chunk with role only (fixes #3953) (#3962)

Signed-off-by: CH3CHO <ch3cho@qq.com>
Co-authored-by: woody <yaodiwu618@gmail.com>
This commit is contained in:
Kent Dong
2026-06-23 17:38:50 +08:00
committed by GitHub
parent 7c94b5a822
commit 1694f48fd9
2 changed files with 205 additions and 15 deletions

View File

@@ -890,6 +890,175 @@ data: [DONE]`
require.Equal(t, expectedStreamResponseBody, actualStreamResponseBody)
})
// 测试流式响应体处理:首 chunk 只带 role 无 content混元大模型场景#3953
// 验证:
// 1) processSSEMessage 不会因为首 chunk 没有 content 而报错,
// 避免后续 chunk 被 ERROR_PARTIAL_MESSAGE_KEY 短路。
// 2) 全部 content chunk 累积完成后data: [DONE] 触发的 processStreamLastChunk
// 必须把累积内容写入 Redis —— 而不是像修复前那样返回空字符串导致 cacheResponse
// 直接吞掉。
t.Run("stream response body with role-only first chunk", func(t *testing.T) {
host, status := test.NewTestHost(basicRedisConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
// 设置请求头
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/api/chat"},
{":method", "POST"},
{"content-type", "application/json"},
})
// 设置流式请求体
requestBody := `{
"model": "hy-mt2-pro",
"messages": [
{
"role": "user",
"content": "介绍天津"
}
],
"stream": true
}`
host.CallOnHttpRequestBody([]byte(requestBody))
// 模拟 Redis 缓存未命中CallOnRedisCall 会消费 callout 列表中的第 0 项,
// 即上一步 CheckCacheForKey 产生的 GET。响应后该条目被移除
// 流式响应便会走完整的 cacheResponse 写缓存路径。
host.CallOnRedisCall(0, test.CreateRedisRespNull())
// 设置流式响应头
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "text/event-stream"},
})
// 首 chunk只有 delta.role没有 delta.content混元/hy-mt2-pro 行为)
roleOnlyChunk := `data: {"id":"0b8ed3c9-3710-920b-abd9-581a0a77fd28","object":"chat.completion.chunk","created":1781169426,"model":"hy-mt2-pro","choices":[{"index":0,"delta":{"role":"assistant"}}]}
`
// 中间 chunk包含实际 content
contentChunk := `data: {"id":"0b8ed3c9-3710-920b-abd9-581a0a77fd28","object":"chat.completion.chunk","created":1781169426,"model":"hy-mt2-pro","choices":[{"index":0,"delta":{"content":"天津"}}]}
data: {"id":"0b8ed3c9-3710-920b-abd9-581a0a77fd28","object":"chat.completion.chunk","created":1781169426,"model":"hy-mt2-pro","choices":[{"index":0,"delta":{"content":"是历史文化名城"}}]}
`
// 末 chunk流结束标记
doneChunk := `data: [DONE]
`
// 三次调用模拟上游分片推送:首 chunk 不带 content后续 chunk 带 content
// 修复前:第一次调用会因首 chunk 无 content 报错ERROR_PARTIAL_MESSAGE_KEY 被设置,
// 后续所有 chunk 在 onHttpResponseBody 入口被短路,缓存永远写不进去。
// 修复后:首 chunk 静默跳过content chunk 正常累积并写入缓存。
action := host.CallOnHttpStreamingResponseBody([]byte(roleOnlyChunk), false)
require.Equal(t, types.ActionContinue, action)
action = host.CallOnHttpStreamingResponseBody([]byte(contentChunk), false)
require.Equal(t, types.ActionContinue, action)
action = host.CallOnHttpStreamingResponseBody([]byte(doneChunk), true)
require.Equal(t, types.ActionContinue, action)
// 在响应 SET 之前先快照 callout 属性 —— 框架在 CallOnRedisCallResponse
// 时会同步把条目从 map 里清掉,所以必须在消费前抓取。
redisCalls := host.GetRedisCalloutAttributes()
require.GreaterOrEqual(t, len(redisCalls), 1,
"expected a SET redis callout after [DONE], got %d", len(redisCalls))
// 模拟 Redis SET 响应。
host.CallOnRedisCall(0, test.CreateRedisRespString("OK"))
// 断言上游流式响应走到末尾后ai-cache 必须真的发出一次 SET
// 且写入的值是前面 content chunk 累积出的完整文本。
setCall := redisCalls[len(redisCalls)-1]
setQuery := string(setCall.Query)
require.Contains(t, setQuery, "higress-ai-cache:介绍天津",
"SET command should target the cache key derived from the request, got query: %q", setQuery)
require.Contains(t, setQuery, "天津是历史文化名城",
"SET command should contain the accumulated value, got query: %q", setQuery)
})
// 测试流式响应体处理:最后一个 buffer 同时包含最后一段 content 与 data: [DONE]
// 验证:
// 1) 同一 buffer 内的最后一段 content 不会被 [DONE] 早 return 丢掉;
// 2) cacheResponse SET 写入的值是先前 chunk 与本次合并后的完整累积文本。
// 回归保护PR #3962 review 指出当 content 与 [DONE] 在同一 lastChunk 到达时,
// processSSEMessage 会在 [DONE] 处直接 return ctx 旧值,导致最后一段 content 丢失。
t.Run("stream response body with final content and [DONE] in same chunk", func(t *testing.T) {
host, status := test.NewTestHost(basicRedisConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
// 设置请求头
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/api/chat"},
{":method", "POST"},
{"content-type", "application/json"},
})
// 设置流式请求体
requestBody := `{
"model": "hy-mt2-pro",
"messages": [
{
"role": "user",
"content": "介绍天津"
}
],
"stream": true
}`
host.CallOnHttpRequestBody([]byte(requestBody))
// 模拟 Redis 缓存未命中(消费 CheckCacheForKey 的 GET
host.CallOnRedisCall(0, test.CreateRedisRespNull())
// 设置流式响应头
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"content-type", "text/event-stream"},
})
// 非末次 chunk先累积 "天津"
firstChunk := `data: {"id":"0b8ed3c9-3710-920b-abd9-581a0a77fd28","object":"chat.completion.chunk","created":1781169426,"model":"hy-mt2-pro","choices":[{"index":0,"delta":{"content":"天津"}}]}
`
// 末次 chunk最后一段 content + [DONE] 同一个 buffer 到达
finalChunk := `data: {"id":"0b8ed3c9-3710-920b-abd9-581a0a77fd28","object":"chat.completion.chunk","created":1781169426,"model":"hy-mt2-pro","choices":[{"index":0,"delta":{"content":"是历史文化名城"}}]}
data: [DONE]
`
action := host.CallOnHttpStreamingResponseBody([]byte(firstChunk), false)
require.Equal(t, types.ActionContinue, action)
action = host.CallOnHttpStreamingResponseBody([]byte(finalChunk), true)
require.Equal(t, types.ActionContinue, action)
// 在消费前快照 callout —— 框架在 CallOnRedisCall 同步把条目从 map 里清掉。
redisCalls := host.GetRedisCalloutAttributes()
require.GreaterOrEqual(t, len(redisCalls), 1,
"expected a SET redis callout after [DONE], got %d", len(redisCalls))
// 模拟 Redis SET 响应
host.CallOnRedisCall(0, test.CreateRedisRespString("OK"))
// 断言 SET value 必须是完整累积文本,而非被 [DONE] 截断后的 "天津"
setCall := redisCalls[len(redisCalls)-1]
setQuery := string(setCall.Query)
require.Contains(t, setQuery, "higress-ai-cache:介绍天津",
"SET command should target the cache key derived from the request, got query: %q", setQuery)
require.Contains(t, setQuery, "天津是历史文化名城",
"SET command should contain the full accumulated value (final chunk content must not be dropped by [DONE]), got query: %q", setQuery)
})
// 测试无缓存键的响应体处理
t.Run("response body without cache key", func(t *testing.T) {
host, status := test.NewTestHost(basicRedisConfig)

View File

@@ -88,6 +88,13 @@ func processStreamLastChunk(ctx wrapper.HttpContext, c config.PluginConfig, chun
if err != nil {
return "", fmt.Errorf("[processStreamLastChunk] processSSEMessage failed, error: %v", err)
}
// 兜底:[DONE] 或其它尾部 chunk 无 content 时processSSEMessage 返回空,
// 此时从 ctx 取已累积的缓存内容,避免缓存写空。
if value == "" {
if tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY); tempContentI != nil {
value = tempContentI.(string)
}
}
return value, nil
}
tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY)
@@ -99,6 +106,11 @@ func processStreamLastChunk(ctx wrapper.HttpContext, c config.PluginConfig, chun
func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessage string, log log.Log) (string, error) {
content := ""
// done 标记本次 sseMessage 是否遇到 [DONE]。当最后一段 content 与 [DONE]
// 处于同一 buffer 时,必须跳出循环后由循环外的 merge 逻辑统一合并到
// CACHE_CONTENT_CONTEXT_KEY否则本次解析的最后一段 content 会被 [DONE]
// 早 return 丢弃PR #3962 review
done := false
for _, chunk := range strings.Split(sseMessage, "\n\n") {
log.Debugf("single sse message: %s", chunk)
subMessages := strings.Split(chunk, "\n")
@@ -117,7 +129,9 @@ func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessag
bodyJson := message[5:]
if strings.TrimSpace(bodyJson) == "[DONE]" {
return content, nil
// 跳出循环,把已解析的局部 content 留到循环外统一合并。
done = true
break
}
// Extract values from JSON fields
@@ -130,22 +144,29 @@ func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessag
}
// Check if the ResponseBody field exists
if !responseBody.Exists() {
if ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY) != nil {
log.Debugf("[processSSEMessage] unable to extract content from message; cache content is not nil: %s", message)
return content, nil
}
return content, fmt.Errorf("[processSSEMessage] unable to extract content from message; cache content is nil: %s", message)
} else {
if responseBody.Exists() {
content += responseBody.String()
}
}
tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY)
// If there is no content in the cache, initialize and set the content
if tempContentI == nil {
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
} else {
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, tempContentI.(string)+content)
// 本次 sseMessage 既没解析到 content 也没遇到 [DONE]:保持 ctx 不变,直接返回。
if content == "" && !done {
log.Debugf("[processSSEMessage] no content extracted; skipping cache update: %s", sseMessage)
return "", nil
}
return content, nil
if content != "" {
if v := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY); v == nil {
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
} else {
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, v.(string)+content)
}
}
// handleStreamChunk 不使用返回值processStreamLastChunk 把它作为 cacheResponse 的
// SET value必须是完整累积值避免最后一段 content 因 [DONE] 早 return 被丢)。
if v := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY); v != nil {
return v.(string), nil
}
return "", nil
}