diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/provider.go b/plugins/wasm-go/extensions/ai-proxy/provider/provider.go index 3a6d188d8..d827b2f8a 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/provider.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/provider.go @@ -1046,7 +1046,7 @@ func ExtractStreamingEvents(ctx wrapper.HttpContext, chunk []byte) []StreamEvent if lineStartIndex != -1 { value := string(body[valueStartIndex:i]) currentEvent.SetValue(currentKey, value) - } else { + } else if eventStartIndex != -1 { currentEvent.RawEvent = string(body[eventStartIndex : i+1]) // Extra new line. The current event is complete. events = append(events, *currentEvent) diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/vertex.go b/plugins/wasm-go/extensions/ai-proxy/provider/vertex.go index b6a10ac36..f868da178 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/vertex.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/vertex.go @@ -45,6 +45,7 @@ const ( contextClaudeMarker = "isClaudeRequest" contextOpenAICompatibleMarker = "isOpenAICompatibleRequest" contextVertexRawMarker = "isVertexRawRequest" + contextVertexStreamDoneMarker = "vertexStreamDoneSent" vertexAnthropicVersion = "vertex-2023-10-16" vertexImageVariationDefaultPrompt = "Create variations of the provided image." ) @@ -621,23 +622,46 @@ func (v *vertexProvider) OnStreamingResponseBody(ctx wrapper.HttpContext, name A return v.claude.OnStreamingResponseBody(ctx, name, chunk, isLastChunk) } log.Infof("[vertexProvider] receive chunk body: %s", string(chunk)) - if isLastChunk { - return []byte(ssePrefix + "[DONE]\n\n"), nil - } - if len(chunk) == 0 { + if len(chunk) == 0 && !isLastChunk { return nil, nil } if name != ApiNameChatCompletion { + if isLastChunk { + return []byte(ssePrefix + "[DONE]\n\n"), nil + } return chunk, nil } + responseBuilder := &strings.Builder{} - lines := strings.Split(string(chunk), "\n") - for _, data := range lines { - if len(data) < 6 { - // ignore blank line or wrong format + // Flush a trailing event when upstream closes stream without a final blank line. + chunkForParsing := chunk + if isLastChunk { + trailingNewLineCount := 0 + for i := len(chunkForParsing) - 1; i >= 0 && chunkForParsing[i] == '\n'; i-- { + trailingNewLineCount++ + } + if trailingNewLineCount < 2 { + chunkForParsing = append([]byte(nil), chunk...) + for i := 0; i < 2-trailingNewLineCount; i++ { + chunkForParsing = append(chunkForParsing, '\n') + } + } + } + streamEvents := ExtractStreamingEvents(ctx, chunkForParsing) + doneSent, _ := ctx.GetContext(contextVertexStreamDoneMarker).(bool) + appendDone := isLastChunk && !doneSent + for _, event := range streamEvents { + data := event.Data + if data == "" { + continue + } + if data == streamEndDataValue { + if !doneSent { + appendDone = true + doneSent = true + } continue } - data = data[6:] var vertexResp vertexChatResponse if err := json.Unmarshal([]byte(data), &vertexResp); err != nil { log.Errorf("unable to unmarshal vertex response: %v", err) @@ -651,7 +675,17 @@ func (v *vertexProvider) OnStreamingResponseBody(ctx wrapper.HttpContext, name A } v.appendResponse(responseBuilder, string(responseBody)) } + if appendDone { + responseBuilder.WriteString(ssePrefix + "[DONE]\n\n") + doneSent = true + } + ctx.SetContext(contextVertexStreamDoneMarker, doneSent) modifiedResponseChunk := responseBuilder.String() + if modifiedResponseChunk == "" { + // Returning an empty payload prevents main.go from falling back to + // forwarding the original raw chunk, which may contain partial JSON. + return []byte(""), nil + } log.Debugf("=== modified response chunk: %s", modifiedResponseChunk) return []byte(modifiedResponseChunk), nil } diff --git a/plugins/wasm-go/extensions/ai-proxy/test/vertex.go b/plugins/wasm-go/extensions/ai-proxy/test/vertex.go index d57f86fad..a534dacbf 100644 --- a/plugins/wasm-go/extensions/ai-proxy/test/vertex.go +++ b/plugins/wasm-go/extensions/ai-proxy/test/vertex.go @@ -691,7 +691,7 @@ func RunVertexOpenAICompatibleModeOnHttpRequestBodyTests(t *testing.T) { func RunVertexExpressModeOnStreamingResponseBodyTests(t *testing.T) { test.RunTest(t, func(t *testing.T) { - // 测试 Vertex Express Mode 流式响应处理 + // 测试 Vertex Express Mode 流式响应处理:最后一个 chunk 不应丢失 t.Run("vertex express mode streaming response body", func(t *testing.T) { host, status := test.NewTestHost(vertexExpressModeConfig) defer host.Reset() @@ -709,6 +709,9 @@ func RunVertexExpressModeOnStreamingResponseBodyTests(t *testing.T) { requestBody := `{"model":"gemini-2.5-flash","messages":[{"role":"user","content":"test"}],"stream":true}` host.CallOnHttpRequestBody([]byte(requestBody)) + // 设置响应属性,确保IsResponseFromUpstream()返回true + host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream")) + // 设置流式响应头 responseHeaders := [][2]string{ {":status", "200"}, @@ -717,8 +720,8 @@ func RunVertexExpressModeOnStreamingResponseBodyTests(t *testing.T) { host.CallOnHttpResponseHeaders(responseHeaders) // 模拟流式响应体 - chunk1 := `data: {"candidates":[{"content":{"parts":[{"text":"Hello"}],"role":"model"},"finishReason":"","index":0}],"usageMetadata":{"promptTokenCount":9,"candidatesTokenCount":5,"totalTokenCount":14}}` - chunk2 := `data: {"candidates":[{"content":{"parts":[{"text":"Hello! How can I help you today?"}],"role":"model"},"finishReason":"STOP","index":0}],"usageMetadata":{"promptTokenCount":9,"candidatesTokenCount":12,"totalTokenCount":21}}` + chunk1 := "data: {\"candidates\":[{\"content\":{\"parts\":[{\"text\":\"Hello\"}],\"role\":\"model\"},\"finishReason\":\"\",\"index\":0}],\"usageMetadata\":{\"promptTokenCount\":9,\"candidatesTokenCount\":5,\"totalTokenCount\":14}}\n\n" + chunk2 := "data: {\"candidates\":[{\"content\":{\"parts\":[{\"text\":\"Hello! How can I help you today?\"}],\"role\":\"model\"},\"finishReason\":\"STOP\",\"index\":0}],\"usageMetadata\":{\"promptTokenCount\":9,\"candidatesTokenCount\":12,\"totalTokenCount\":21}}\n\n" // 处理流式响应体 action1 := host.CallOnHttpStreamingResponseBody([]byte(chunk1), false) @@ -727,16 +730,194 @@ func RunVertexExpressModeOnStreamingResponseBodyTests(t *testing.T) { action2 := host.CallOnHttpStreamingResponseBody([]byte(chunk2), true) require.Equal(t, types.ActionContinue, action2) - // 验证流式响应处理 - debugLogs := host.GetDebugLogs() - hasStreamingLogs := false - for _, log := range debugLogs { - if strings.Contains(log, "streaming") || strings.Contains(log, "chunk") || strings.Contains(log, "vertex") { - hasStreamingLogs = true + // 验证最后一个 chunk 的内容不会被 [DONE] 覆盖 + transformedResponseBody := host.GetResponseBody() + require.NotNil(t, transformedResponseBody) + responseStr := string(transformedResponseBody) + require.Contains(t, responseStr, "Hello! How can I help you today?", "last chunk content should be preserved") + require.Contains(t, responseStr, "data: [DONE]", "stream should end with [DONE]") + }) + + // 测试 Vertex Express Mode 流式响应处理:单个 SSE 事件被拆包时可正确重组 + t.Run("vertex express mode streaming response body with split sse event", func(t *testing.T) { + host, status := test.NewTestHost(vertexExpressModeConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + {"Content-Type", "application/json"}, + }) + + requestBody := `{"model":"gemini-2.5-flash","messages":[{"role":"user","content":"test"}],"stream":true}` + host.CallOnHttpRequestBody([]byte(requestBody)) + + // 设置响应属性,确保IsResponseFromUpstream()返回true + host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream")) + + responseHeaders := [][2]string{ + {":status", "200"}, + {"Content-Type", "text/event-stream"}, + } + host.CallOnHttpResponseHeaders(responseHeaders) + + fullEvent := "data: {\"candidates\":[{\"content\":{\"parts\":[{\"text\":\"split chunk\"}],\"role\":\"model\"},\"finishReason\":\"STOP\",\"index\":0}],\"usageMetadata\":{\"promptTokenCount\":1,\"candidatesTokenCount\":2,\"totalTokenCount\":3}}\n\n" + splitIdx := strings.Index(fullEvent, "chunk") + require.Greater(t, splitIdx, 0, "split marker should exist in test payload") + chunkPart1 := fullEvent[:splitIdx] + chunkPart2 := fullEvent[splitIdx:] + + action1 := host.CallOnHttpStreamingResponseBody([]byte(chunkPart1), false) + require.Equal(t, types.ActionContinue, action1) + action2 := host.CallOnHttpStreamingResponseBody([]byte(chunkPart2), true) + require.Equal(t, types.ActionContinue, action2) + + transformedResponseBody := host.GetResponseBody() + require.NotNil(t, transformedResponseBody) + responseStr := string(transformedResponseBody) + require.Contains(t, responseStr, "split chunk", "split SSE event should be reassembled and parsed") + require.Contains(t, responseStr, "data: [DONE]", "stream should end with [DONE]") + }) + + // 测试:thoughtSignature 很大时,单个 SSE 事件被拆成多段也能重组并成功解析 + t.Run("vertex express mode streaming response body with huge thought signature split across chunks", func(t *testing.T) { + host, status := test.NewTestHost(vertexExpressModeConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + {"Content-Type", "application/json"}, + }) + + requestBody := `{"model":"gemini-2.5-flash","messages":[{"role":"user","content":"test"}],"stream":true}` + host.CallOnHttpRequestBody([]byte(requestBody)) + host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream")) + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"Content-Type", "text/event-stream"}, + }) + + hugeThoughtSignature := strings.Repeat("CmMBjz1rX4j+TQjtDy2rZxSdYOE1jUqDbRhWetraLlQNrkyaRNQZ/", 180) + fullEvent := "data: {\"candidates\":[{\"content\":{\"role\":\"model\",\"parts\":[{\"text\":\"thought-signature-merge-ok\",\"thoughtSignature\":\"" + + hugeThoughtSignature + + "\"}]},\"finishReason\":\"STOP\",\"index\":0}],\"usageMetadata\":{\"promptTokenCount\":28,\"candidatesTokenCount\":3589,\"totalTokenCount\":5240,\"thoughtsTokenCount\":1623}}\n\n" + + signatureStart := strings.Index(fullEvent, "\"thoughtSignature\":\"") + require.Greater(t, signatureStart, 0, "thoughtSignature field should exist in test payload") + splitAt1 := signatureStart + len("\"thoughtSignature\":\"") + 700 + splitAt2 := splitAt1 + 1600 + require.Less(t, splitAt2, len(fullEvent)-1, "split indexes should keep payload in three chunks") + + chunkPart1 := fullEvent[:splitAt1] + chunkPart2 := fullEvent[splitAt1:splitAt2] + chunkPart3 := fullEvent[splitAt2:] + + action1 := host.CallOnHttpStreamingResponseBody([]byte(chunkPart1), false) + require.Equal(t, types.ActionContinue, action1) + firstBody := host.GetResponseBody() + require.Equal(t, 0, len(firstBody), "partial chunk should not be forwarded to client") + + action2 := host.CallOnHttpStreamingResponseBody([]byte(chunkPart2), false) + require.Equal(t, types.ActionContinue, action2) + secondBody := host.GetResponseBody() + require.Equal(t, 0, len(secondBody), "partial chunk should not be forwarded to client") + + action3 := host.CallOnHttpStreamingResponseBody([]byte(chunkPart3), true) + require.Equal(t, types.ActionContinue, action3) + + transformedResponseBody := host.GetResponseBody() + require.NotNil(t, transformedResponseBody) + responseStr := string(transformedResponseBody) + require.Contains(t, responseStr, "thought-signature-merge-ok", "split huge thoughtSignature event should be reassembled and parsed") + require.Contains(t, responseStr, "data: [DONE]", "stream should end with [DONE]") + + errorLogs := host.GetErrorLogs() + hasUnmarshalError := false + for _, log := range errorLogs { + if strings.Contains(log, "unable to unmarshal vertex response") { + hasUnmarshalError = true break } } - require.True(t, hasStreamingLogs, "Should have streaming response processing logs") + require.False(t, hasUnmarshalError, "should not have vertex unmarshal errors for split huge thoughtSignature event") + }) + + // 测试:上游已发送 [DONE],框架再触发空的最后回调时不应重复输出 [DONE] + t.Run("vertex express mode streaming response body with upstream done and empty final callback", func(t *testing.T) { + host, status := test.NewTestHost(vertexExpressModeConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + {"Content-Type", "application/json"}, + }) + + requestBody := `{"model":"gemini-2.5-flash","messages":[{"role":"user","content":"test"}],"stream":true}` + host.CallOnHttpRequestBody([]byte(requestBody)) + host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream")) + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"Content-Type", "text/event-stream"}, + }) + + doneChunk := "data: [DONE]\n\n" + action1 := host.CallOnHttpStreamingResponseBody([]byte(doneChunk), false) + require.Equal(t, types.ActionContinue, action1) + firstBody := host.GetResponseBody() + require.NotNil(t, firstBody) + require.Contains(t, string(firstBody), "data: [DONE]", "first callback should output [DONE]") + + action2 := host.CallOnHttpStreamingResponseBody([]byte{}, true) + require.Equal(t, types.ActionContinue, action2) + + debugLogs := host.GetDebugLogs() + doneChunkLogCount := 0 + for _, log := range debugLogs { + if strings.Contains(log, "=== modified response chunk: data: [DONE]") { + doneChunkLogCount++ + } + } + require.Equal(t, 1, doneChunkLogCount, "[DONE] should only be emitted once when upstream already sent it") + }) + + // 测试:最后一个 chunk 缺少 SSE 结束空行时,isLastChunk=true 也应正确解析并输出 + t.Run("vertex express mode streaming response body last chunk without terminator", func(t *testing.T) { + host, status := test.NewTestHost(vertexExpressModeConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + + host.CallOnHttpRequestHeaders([][2]string{ + {":authority", "example.com"}, + {":path", "/v1/chat/completions"}, + {":method", "POST"}, + {"Content-Type", "application/json"}, + }) + + requestBody := `{"model":"gemini-2.5-flash","messages":[{"role":"user","content":"test"}],"stream":true}` + host.CallOnHttpRequestBody([]byte(requestBody)) + host.SetProperty([]string{"response", "code_details"}, []byte("via_upstream")) + host.CallOnHttpResponseHeaders([][2]string{ + {":status", "200"}, + {"Content-Type", "text/event-stream"}, + }) + + lastChunkWithoutTerminator := "data: {\"candidates\":[{\"content\":{\"parts\":[{\"text\":\"no terminator\"}],\"role\":\"model\"},\"finishReason\":\"STOP\",\"index\":0}],\"usageMetadata\":{\"promptTokenCount\":2,\"candidatesTokenCount\":3,\"totalTokenCount\":5}}" + action := host.CallOnHttpStreamingResponseBody([]byte(lastChunkWithoutTerminator), true) + require.Equal(t, types.ActionContinue, action) + + transformedResponseBody := host.GetResponseBody() + require.NotNil(t, transformedResponseBody) + responseStr := string(transformedResponseBody) + require.Contains(t, responseStr, "no terminator", "last chunk without terminator should still be parsed") + require.Contains(t, responseStr, "data: [DONE]", "stream should end with [DONE]") }) }) }