fix: vertex streaming chunk parse (#3599)

This commit is contained in:
woody
2026-03-16 21:13:18 +08:00
committed by GitHub
parent 94f0d7179f
commit 8961db2e90
3 changed files with 235 additions and 20 deletions

View File

@@ -1046,7 +1046,7 @@ func ExtractStreamingEvents(ctx wrapper.HttpContext, chunk []byte) []StreamEvent
if lineStartIndex != -1 {
value := string(body[valueStartIndex:i])
currentEvent.SetValue(currentKey, value)
} else {
} else if eventStartIndex != -1 {
currentEvent.RawEvent = string(body[eventStartIndex : i+1])
// Extra new line. The current event is complete.
events = append(events, *currentEvent)

View File

@@ -45,6 +45,7 @@ const (
contextClaudeMarker = "isClaudeRequest"
contextOpenAICompatibleMarker = "isOpenAICompatibleRequest"
contextVertexRawMarker = "isVertexRawRequest"
contextVertexStreamDoneMarker = "vertexStreamDoneSent"
vertexAnthropicVersion = "vertex-2023-10-16"
vertexImageVariationDefaultPrompt = "Create variations of the provided image."
)
@@ -621,23 +622,46 @@ func (v *vertexProvider) OnStreamingResponseBody(ctx wrapper.HttpContext, name A
return v.claude.OnStreamingResponseBody(ctx, name, chunk, isLastChunk)
}
log.Infof("[vertexProvider] receive chunk body: %s", string(chunk))
if isLastChunk {
return []byte(ssePrefix + "[DONE]\n\n"), nil
}
if len(chunk) == 0 {
if len(chunk) == 0 && !isLastChunk {
return nil, nil
}
if name != ApiNameChatCompletion {
if isLastChunk {
return []byte(ssePrefix + "[DONE]\n\n"), nil
}
return chunk, nil
}
responseBuilder := &strings.Builder{}
lines := strings.Split(string(chunk), "\n")
for _, data := range lines {
if len(data) < 6 {
// ignore blank line or wrong format
// Flush a trailing event when upstream closes stream without a final blank line.
chunkForParsing := chunk
if isLastChunk {
trailingNewLineCount := 0
for i := len(chunkForParsing) - 1; i >= 0 && chunkForParsing[i] == '\n'; i-- {
trailingNewLineCount++
}
if trailingNewLineCount < 2 {
chunkForParsing = append([]byte(nil), chunk...)
for i := 0; i < 2-trailingNewLineCount; i++ {
chunkForParsing = append(chunkForParsing, '\n')
}
}
}
streamEvents := ExtractStreamingEvents(ctx, chunkForParsing)
doneSent, _ := ctx.GetContext(contextVertexStreamDoneMarker).(bool)
appendDone := isLastChunk && !doneSent
for _, event := range streamEvents {
data := event.Data
if data == "" {
continue
}
if data == streamEndDataValue {
if !doneSent {
appendDone = true
doneSent = true
}
continue
}
data = data[6:]
var vertexResp vertexChatResponse
if err := json.Unmarshal([]byte(data), &vertexResp); err != nil {
log.Errorf("unable to unmarshal vertex response: %v", err)
@@ -651,7 +675,17 @@ func (v *vertexProvider) OnStreamingResponseBody(ctx wrapper.HttpContext, name A
}
v.appendResponse(responseBuilder, string(responseBody))
}
if appendDone {
responseBuilder.WriteString(ssePrefix + "[DONE]\n\n")
doneSent = true
}
ctx.SetContext(contextVertexStreamDoneMarker, doneSent)
modifiedResponseChunk := responseBuilder.String()
if modifiedResponseChunk == "" {
// Returning an empty payload prevents main.go from falling back to
// forwarding the original raw chunk, which may contain partial JSON.
return []byte(""), nil
}
log.Debugf("=== modified response chunk: %s", modifiedResponseChunk)
return []byte(modifiedResponseChunk), nil
}

View File

@@ -691,7 +691,7 @@ func RunVertexOpenAICompatibleModeOnHttpRequestBodyTests(t *testing.T) {
func RunVertexExpressModeOnStreamingResponseBodyTests(t *testing.T) {
test.RunTest(t, func(t *testing.T) {
// 测试 Vertex Express Mode 流式响应处理
// 测试 Vertex Express Mode 流式响应处理:最后一个 chunk 不应丢失
t.Run("vertex express mode streaming response body", func(t *testing.T) {
host, status := test.NewTestHost(vertexExpressModeConfig)
defer host.Reset()
@@ -709,6 +709,9 @@ func RunVertexExpressModeOnStreamingResponseBodyTests(t *testing.T) {
requestBody := `{"model":"gemini-2.5-flash","messages":[{"role":"user","content":"test"}],"stream":true}`
host.CallOnHttpRequestBody([]byte(requestBody))
// 设置响应属性确保IsResponseFromUpstream()返回true
host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream"))
// 设置流式响应头
responseHeaders := [][2]string{
{":status", "200"},
@@ -717,8 +720,8 @@ func RunVertexExpressModeOnStreamingResponseBodyTests(t *testing.T) {
host.CallOnHttpResponseHeaders(responseHeaders)
// 模拟流式响应体
chunk1 := `data: {"candidates":[{"content":{"parts":[{"text":"Hello"}],"role":"model"},"finishReason":"","index":0}],"usageMetadata":{"promptTokenCount":9,"candidatesTokenCount":5,"totalTokenCount":14}}`
chunk2 := `data: {"candidates":[{"content":{"parts":[{"text":"Hello! How can I help you today?"}],"role":"model"},"finishReason":"STOP","index":0}],"usageMetadata":{"promptTokenCount":9,"candidatesTokenCount":12,"totalTokenCount":21}}`
chunk1 := "data: {\"candidates\":[{\"content\":{\"parts\":[{\"text\":\"Hello\"}],\"role\":\"model\"},\"finishReason\":\"\",\"index\":0}],\"usageMetadata\":{\"promptTokenCount\":9,\"candidatesTokenCount\":5,\"totalTokenCount\":14}}\n\n"
chunk2 := "data: {\"candidates\":[{\"content\":{\"parts\":[{\"text\":\"Hello! How can I help you today?\"}],\"role\":\"model\"},\"finishReason\":\"STOP\",\"index\":0}],\"usageMetadata\":{\"promptTokenCount\":9,\"candidatesTokenCount\":12,\"totalTokenCount\":21}}\n\n"
// 处理流式响应体
action1 := host.CallOnHttpStreamingResponseBody([]byte(chunk1), false)
@@ -727,16 +730,194 @@ func RunVertexExpressModeOnStreamingResponseBodyTests(t *testing.T) {
action2 := host.CallOnHttpStreamingResponseBody([]byte(chunk2), true)
require.Equal(t, types.ActionContinue, action2)
// 验证流式响应处理
debugLogs := host.GetDebugLogs()
hasStreamingLogs := false
for _, log := range debugLogs {
if strings.Contains(log, "streaming") || strings.Contains(log, "chunk") || strings.Contains(log, "vertex") {
hasStreamingLogs = true
// 验证最后一个 chunk 的内容不会被 [DONE] 覆盖
transformedResponseBody := host.GetResponseBody()
require.NotNil(t, transformedResponseBody)
responseStr := string(transformedResponseBody)
require.Contains(t, responseStr, "Hello! How can I help you today?", "last chunk content should be preserved")
require.Contains(t, responseStr, "data: [DONE]", "stream should end with [DONE]")
})
// 测试 Vertex Express Mode 流式响应处理:单个 SSE 事件被拆包时可正确重组
t.Run("vertex express mode streaming response body with split sse event", func(t *testing.T) {
host, status := test.NewTestHost(vertexExpressModeConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/v1/chat/completions"},
{":method", "POST"},
{"Content-Type", "application/json"},
})
requestBody := `{"model":"gemini-2.5-flash","messages":[{"role":"user","content":"test"}],"stream":true}`
host.CallOnHttpRequestBody([]byte(requestBody))
// 设置响应属性确保IsResponseFromUpstream()返回true
host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream"))
responseHeaders := [][2]string{
{":status", "200"},
{"Content-Type", "text/event-stream"},
}
host.CallOnHttpResponseHeaders(responseHeaders)
fullEvent := "data: {\"candidates\":[{\"content\":{\"parts\":[{\"text\":\"split chunk\"}],\"role\":\"model\"},\"finishReason\":\"STOP\",\"index\":0}],\"usageMetadata\":{\"promptTokenCount\":1,\"candidatesTokenCount\":2,\"totalTokenCount\":3}}\n\n"
splitIdx := strings.Index(fullEvent, "chunk")
require.Greater(t, splitIdx, 0, "split marker should exist in test payload")
chunkPart1 := fullEvent[:splitIdx]
chunkPart2 := fullEvent[splitIdx:]
action1 := host.CallOnHttpStreamingResponseBody([]byte(chunkPart1), false)
require.Equal(t, types.ActionContinue, action1)
action2 := host.CallOnHttpStreamingResponseBody([]byte(chunkPart2), true)
require.Equal(t, types.ActionContinue, action2)
transformedResponseBody := host.GetResponseBody()
require.NotNil(t, transformedResponseBody)
responseStr := string(transformedResponseBody)
require.Contains(t, responseStr, "split chunk", "split SSE event should be reassembled and parsed")
require.Contains(t, responseStr, "data: [DONE]", "stream should end with [DONE]")
})
// 测试thoughtSignature 很大时,单个 SSE 事件被拆成多段也能重组并成功解析
t.Run("vertex express mode streaming response body with huge thought signature split across chunks", func(t *testing.T) {
host, status := test.NewTestHost(vertexExpressModeConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/v1/chat/completions"},
{":method", "POST"},
{"Content-Type", "application/json"},
})
requestBody := `{"model":"gemini-2.5-flash","messages":[{"role":"user","content":"test"}],"stream":true}`
host.CallOnHttpRequestBody([]byte(requestBody))
host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream"))
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"Content-Type", "text/event-stream"},
})
hugeThoughtSignature := strings.Repeat("CmMBjz1rX4j+TQjtDy2rZxSdYOE1jUqDbRhWetraLlQNrkyaRNQZ/", 180)
fullEvent := "data: {\"candidates\":[{\"content\":{\"role\":\"model\",\"parts\":[{\"text\":\"thought-signature-merge-ok\",\"thoughtSignature\":\"" +
hugeThoughtSignature +
"\"}]},\"finishReason\":\"STOP\",\"index\":0}],\"usageMetadata\":{\"promptTokenCount\":28,\"candidatesTokenCount\":3589,\"totalTokenCount\":5240,\"thoughtsTokenCount\":1623}}\n\n"
signatureStart := strings.Index(fullEvent, "\"thoughtSignature\":\"")
require.Greater(t, signatureStart, 0, "thoughtSignature field should exist in test payload")
splitAt1 := signatureStart + len("\"thoughtSignature\":\"") + 700
splitAt2 := splitAt1 + 1600
require.Less(t, splitAt2, len(fullEvent)-1, "split indexes should keep payload in three chunks")
chunkPart1 := fullEvent[:splitAt1]
chunkPart2 := fullEvent[splitAt1:splitAt2]
chunkPart3 := fullEvent[splitAt2:]
action1 := host.CallOnHttpStreamingResponseBody([]byte(chunkPart1), false)
require.Equal(t, types.ActionContinue, action1)
firstBody := host.GetResponseBody()
require.Equal(t, 0, len(firstBody), "partial chunk should not be forwarded to client")
action2 := host.CallOnHttpStreamingResponseBody([]byte(chunkPart2), false)
require.Equal(t, types.ActionContinue, action2)
secondBody := host.GetResponseBody()
require.Equal(t, 0, len(secondBody), "partial chunk should not be forwarded to client")
action3 := host.CallOnHttpStreamingResponseBody([]byte(chunkPart3), true)
require.Equal(t, types.ActionContinue, action3)
transformedResponseBody := host.GetResponseBody()
require.NotNil(t, transformedResponseBody)
responseStr := string(transformedResponseBody)
require.Contains(t, responseStr, "thought-signature-merge-ok", "split huge thoughtSignature event should be reassembled and parsed")
require.Contains(t, responseStr, "data: [DONE]", "stream should end with [DONE]")
errorLogs := host.GetErrorLogs()
hasUnmarshalError := false
for _, log := range errorLogs {
if strings.Contains(log, "unable to unmarshal vertex response") {
hasUnmarshalError = true
break
}
}
require.True(t, hasStreamingLogs, "Should have streaming response processing logs")
require.False(t, hasUnmarshalError, "should not have vertex unmarshal errors for split huge thoughtSignature event")
})
// 测试:上游已发送 [DONE],框架再触发空的最后回调时不应重复输出 [DONE]
t.Run("vertex express mode streaming response body with upstream done and empty final callback", func(t *testing.T) {
host, status := test.NewTestHost(vertexExpressModeConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/v1/chat/completions"},
{":method", "POST"},
{"Content-Type", "application/json"},
})
requestBody := `{"model":"gemini-2.5-flash","messages":[{"role":"user","content":"test"}],"stream":true}`
host.CallOnHttpRequestBody([]byte(requestBody))
host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream"))
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"Content-Type", "text/event-stream"},
})
doneChunk := "data: [DONE]\n\n"
action1 := host.CallOnHttpStreamingResponseBody([]byte(doneChunk), false)
require.Equal(t, types.ActionContinue, action1)
firstBody := host.GetResponseBody()
require.NotNil(t, firstBody)
require.Contains(t, string(firstBody), "data: [DONE]", "first callback should output [DONE]")
action2 := host.CallOnHttpStreamingResponseBody([]byte{}, true)
require.Equal(t, types.ActionContinue, action2)
debugLogs := host.GetDebugLogs()
doneChunkLogCount := 0
for _, log := range debugLogs {
if strings.Contains(log, "=== modified response chunk: data: [DONE]") {
doneChunkLogCount++
}
}
require.Equal(t, 1, doneChunkLogCount, "[DONE] should only be emitted once when upstream already sent it")
})
// 测试:最后一个 chunk 缺少 SSE 结束空行时isLastChunk=true 也应正确解析并输出
t.Run("vertex express mode streaming response body last chunk without terminator", func(t *testing.T) {
host, status := test.NewTestHost(vertexExpressModeConfig)
defer host.Reset()
require.Equal(t, types.OnPluginStartStatusOK, status)
host.CallOnHttpRequestHeaders([][2]string{
{":authority", "example.com"},
{":path", "/v1/chat/completions"},
{":method", "POST"},
{"Content-Type", "application/json"},
})
requestBody := `{"model":"gemini-2.5-flash","messages":[{"role":"user","content":"test"}],"stream":true}`
host.CallOnHttpRequestBody([]byte(requestBody))
host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream"))
host.CallOnHttpResponseHeaders([][2]string{
{":status", "200"},
{"Content-Type", "text/event-stream"},
})
lastChunkWithoutTerminator := "data: {\"candidates\":[{\"content\":{\"parts\":[{\"text\":\"no terminator\"}],\"role\":\"model\"},\"finishReason\":\"STOP\",\"index\":0}],\"usageMetadata\":{\"promptTokenCount\":2,\"candidatesTokenCount\":3,\"totalTokenCount\":5}}"
action := host.CallOnHttpStreamingResponseBody([]byte(lastChunkWithoutTerminator), true)
require.Equal(t, types.ActionContinue, action)
transformedResponseBody := host.GetResponseBody()
require.NotNil(t, transformedResponseBody)
responseStr := string(transformedResponseBody)
require.Contains(t, responseStr, "no terminator", "last chunk without terminator should still be parsed")
require.Contains(t, responseStr, "data: [DONE]", "stream should end with [DONE]")
})
})
}