diff --git a/plugins/wasm-go/extensions/ai-proxy/main.go b/plugins/wasm-go/extensions/ai-proxy/main.go index 1e2e574aa..dc34346c9 100644 --- a/plugins/wasm-go/extensions/ai-proxy/main.go +++ b/plugins/wasm-go/extensions/ai-proxy/main.go @@ -385,6 +385,8 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin return chunk } + promoteThinking := pluginConfig.GetProviderConfig().GetPromoteThinkingOnEmpty() + log.Debugf("[onStreamingResponseBody] provider=%s", activeProvider.GetProviderType()) log.Debugf("[onStreamingResponseBody] isLastChunk=%v chunk: %s", isLastChunk, string(chunk)) @@ -392,6 +394,9 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin apiName, _ := ctx.GetContext(provider.CtxKeyApiName).(provider.ApiName) modifiedChunk, err := handler.OnStreamingResponseBody(ctx, apiName, chunk, isLastChunk) if err == nil && modifiedChunk != nil { + if promoteThinking { + modifiedChunk = promoteThinkingInStreamingChunk(ctx, modifiedChunk, isLastChunk) + } // Convert to Claude format if needed claudeChunk, convertErr := convertStreamingResponseToClaude(ctx, modifiedChunk) if convertErr != nil { @@ -435,6 +440,10 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin result := []byte(responseBuilder.String()) + if promoteThinking { + result = promoteThinkingInStreamingChunk(ctx, result, isLastChunk) + } + // Convert to Claude format if needed claudeChunk, convertErr := convertStreamingResponseToClaude(ctx, result) if convertErr != nil { @@ -443,11 +452,12 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin return claudeChunk } - if !needsClaudeResponseConversion(ctx) { + if !needsClaudeResponseConversion(ctx) && !promoteThinking { return chunk } // If provider doesn't implement any streaming handlers but we need Claude conversion + // or thinking promotion // First extract complete events from the chunk events := provider.ExtractStreamingEvents(ctx, chunk) log.Debugf("[onStreamingResponseBody] %d events received (no handler)", len(events)) @@ -464,6 +474,10 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin result := []byte(responseBuilder.String()) + if promoteThinking { + result = promoteThinkingInStreamingChunk(ctx, result, isLastChunk) + } + // Convert to Claude format if needed claudeChunk, convertErr := convertStreamingResponseToClaude(ctx, result) if convertErr != nil { @@ -496,6 +510,16 @@ func onHttpResponseBody(ctx wrapper.HttpContext, pluginConfig config.PluginConfi finalBody = body } + // Promote thinking/reasoning to content when content is empty + if pluginConfig.GetProviderConfig().GetPromoteThinkingOnEmpty() { + promoted, err := provider.PromoteThinkingOnEmptyResponse(finalBody) + if err != nil { + log.Warnf("[promoteThinkingOnEmpty] failed: %v", err) + } else { + finalBody = promoted + } + } + // Convert to Claude format if needed (applies to both branches) convertedBody, err := convertResponseBodyToClaude(ctx, finalBody) if err != nil { @@ -544,6 +568,49 @@ func convertStreamingResponseToClaude(ctx wrapper.HttpContext, data []byte) ([]b return claudeChunk, nil } +// promoteThinkingInStreamingChunk processes SSE-formatted streaming data, buffering +// reasoning deltas and stripping them from chunks. On the last chunk, if no content +// was ever seen, it appends a flush chunk that emits buffered reasoning as content. +func promoteThinkingInStreamingChunk(ctx wrapper.HttpContext, data []byte, isLastChunk bool) []byte { + // SSE data contains lines like "data: {...}\n\n" + // We need to find and process each data line + lines := strings.Split(string(data), "\n") + modified := false + for i, line := range lines { + if !strings.HasPrefix(line, "data: ") { + continue + } + payload := strings.TrimPrefix(line, "data: ") + if payload == "[DONE]" || payload == "" { + continue + } + stripped, err := provider.PromoteStreamingThinkingOnEmptyChunk(ctx, []byte(payload)) + if err != nil { + continue + } + newLine := "data: " + string(stripped) + if newLine != line { + lines[i] = newLine + modified = true + } + } + + result := data + if modified { + result = []byte(strings.Join(lines, "\n")) + } + + // On last chunk, flush buffered reasoning as content if no content was seen + if isLastChunk { + flushChunk := provider.PromoteStreamingThinkingFlush(ctx) + if flushChunk != nil { + result = append(flushChunk, result...) + } + } + + return result +} + // Helper function to convert OpenAI response body to Claude format func convertResponseBodyToClaude(ctx wrapper.HttpContext, body []byte) ([]byte, error) { if !needsClaudeResponseConversion(ctx) { diff --git a/plugins/wasm-go/extensions/ai-proxy/main_test.go b/plugins/wasm-go/extensions/ai-proxy/main_test.go index e3ef5842f..bd7a421f7 100644 --- a/plugins/wasm-go/extensions/ai-proxy/main_test.go +++ b/plugins/wasm-go/extensions/ai-proxy/main_test.go @@ -133,6 +133,8 @@ func TestOpenAI(t *testing.T) { test.RunOpenAIOnHttpResponseHeadersTests(t) test.RunOpenAIOnHttpResponseBodyTests(t) test.RunOpenAIOnStreamingResponseBodyTests(t) + test.RunOpenAIPromoteThinkingOnEmptyTests(t) + test.RunOpenAIPromoteThinkingOnEmptyStreamingTests(t) } func TestQwen(t *testing.T) { diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/model.go b/plugins/wasm-go/extensions/ai-proxy/provider/model.go index 881e4cbc4..8f951f543 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/model.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/model.go @@ -255,6 +255,70 @@ func (m *chatMessage) handleStreamingReasoningContent(ctx wrapper.HttpContext, r } } +// promoteThinkingOnEmpty promotes reasoning_content to content when content is empty. +// This handles models that put user-facing replies into thinking blocks instead of text blocks. +func (r *chatCompletionResponse) promoteThinkingOnEmpty() { + for i := range r.Choices { + msg := r.Choices[i].Message + if msg == nil { + continue + } + if !isContentEmpty(msg.Content) { + continue + } + if msg.ReasoningContent != "" { + msg.Content = msg.ReasoningContent + msg.ReasoningContent = "" + } + } +} + +// promoteStreamingThinkingOnEmpty accumulates reasoning content during streaming. +// It strips reasoning from chunks and buffers it. When content is seen, it marks +// the stream as having content so no promotion will happen. +// Call PromoteStreamingThinkingFlush at the end of the stream to emit buffered +// reasoning as content if no content was ever seen. +// Returns true if the chunk was modified (reasoning stripped). +func promoteStreamingThinkingOnEmpty(ctx wrapper.HttpContext, msg *chatMessage) bool { + if msg == nil { + return false + } + hasContentDelta, _ := ctx.GetContext(ctxKeyHasContentDelta).(bool) + if hasContentDelta { + return false + } + + if !isContentEmpty(msg.Content) { + ctx.SetContext(ctxKeyHasContentDelta, true) + return false + } + + // Buffer reasoning content and strip it from the chunk + reasoning := msg.ReasoningContent + if reasoning == "" { + reasoning = msg.Reasoning + } + if reasoning != "" { + buffered, _ := ctx.GetContext(ctxKeyBufferedReasoning).(string) + ctx.SetContext(ctxKeyBufferedReasoning, buffered+reasoning) + msg.ReasoningContent = "" + msg.Reasoning = "" + return true + } + return false +} + +func isContentEmpty(content any) bool { + switch v := content.(type) { + case nil: + return true + case string: + return strings.TrimSpace(v) == "" + default: + return false + } +} + type chatMessageContent struct { CacheControl map[string]interface{} `json:"cache_control,omitempty"` Type string `json:"type,omitempty"` @@ -648,3 +712,87 @@ func (r embeddingsRequest) ParseInput() []string { } return input } + +// PromoteThinkingOnEmptyResponse promotes reasoning_content to content in a non-streaming +// response body when content is empty. Returns the original body if no promotion is needed. +func PromoteThinkingOnEmptyResponse(body []byte) ([]byte, error) { + var resp chatCompletionResponse + if err := json.Unmarshal(body, &resp); err != nil { + return body, fmt.Errorf("unable to unmarshal response for thinking promotion: %v", err) + } + promoted := false + for i := range resp.Choices { + msg := resp.Choices[i].Message + if msg == nil { + continue + } + if !isContentEmpty(msg.Content) { + continue + } + if msg.ReasoningContent != "" { + msg.Content = msg.ReasoningContent + msg.ReasoningContent = "" + promoted = true + } + } + if !promoted { + return body, nil + } + return json.Marshal(resp) +} + +// PromoteStreamingThinkingOnEmptyChunk buffers reasoning deltas and strips them from +// the chunk during streaming. Call PromoteStreamingThinkingFlush on the last chunk +// to emit buffered reasoning as content if no real content was ever seen. +func PromoteStreamingThinkingOnEmptyChunk(ctx wrapper.HttpContext, data []byte) ([]byte, error) { + var resp chatCompletionResponse + if err := json.Unmarshal(data, &resp); err != nil { + return data, nil // not a valid chat completion chunk, skip + } + modified := false + for i := range resp.Choices { + msg := resp.Choices[i].Delta + if msg == nil { + continue + } + if promoteStreamingThinkingOnEmpty(ctx, msg) { + modified = true + } + } + if !modified { + return data, nil + } + return json.Marshal(resp) +} + +// PromoteStreamingThinkingFlush checks if the stream had no content and returns +// an SSE chunk that emits the buffered reasoning as content. Returns nil if +// content was already seen or no reasoning was buffered. +func PromoteStreamingThinkingFlush(ctx wrapper.HttpContext) []byte { + hasContentDelta, _ := ctx.GetContext(ctxKeyHasContentDelta).(bool) + if hasContentDelta { + return nil + } + buffered, _ := ctx.GetContext(ctxKeyBufferedReasoning).(string) + if buffered == "" { + return nil + } + // Build a minimal chat.completion.chunk with the buffered reasoning as content + resp := chatCompletionResponse{ + Object: objectChatCompletionChunk, + Choices: []chatCompletionChoice{ + { + Index: 0, + Delta: &chatMessage{ + Content: buffered, + }, + }, + }, + } + data, err := json.Marshal(resp) + if err != nil { + return nil + } + // Format as SSE + return []byte("data: " + string(data) + "\n\n") +} diff --git a/plugins/wasm-go/extensions/ai-proxy/provider/provider.go b/plugins/wasm-go/extensions/ai-proxy/provider/provider.go index fec47e148..54e9203e4 100644 --- a/plugins/wasm-go/extensions/ai-proxy/provider/provider.go +++ b/plugins/wasm-go/extensions/ai-proxy/provider/provider.go @@ -178,6 +178,8 @@ const ( ctxKeyPushedMessage = "pushedMessage" ctxKeyContentPushed = "contentPushed" ctxKeyReasoningContentPushed = "reasoningContentPushed" + ctxKeyHasContentDelta = "hasContentDelta" + ctxKeyBufferedReasoning = "bufferedReasoning" objectChatCompletion = "chat.completion" objectChatCompletionChunk = "chat.completion.chunk" @@ -474,6 +476,12 @@ type ProviderConfig struct { // @Title zh-CN 合并连续同角色消息 // @Description zh-CN 开启后,若请求的 messages 中存在连续的同角色消息(如连续两条 user 消息),将其内容合并为一条,以满足要求严格轮流交替(user→assistant→user→...)的模型服务商的要求。 mergeConsecutiveMessages bool `required:"false" yaml:"mergeConsecutiveMessages" json:"mergeConsecutiveMessages"` + // @Title zh-CN 空内容时提升思考为正文 + // @Description zh-CN 开启后,若模型响应只包含 reasoning_content/thinking 而没有正文内容,将 reasoning 内容提升为正文内容返回,避免客户端收到空回复。 + promoteThinkingOnEmpty bool `required:"false" yaml:"promoteThinkingOnEmpty" json:"promoteThinkingOnEmpty"` + // @Title zh-CN HiClaw 模式 + // @Description zh-CN 开启后同时启用 mergeConsecutiveMessages 和 promoteThinkingOnEmpty,适用于 HiClaw 多 Agent 协作场景。 + hiclawMode bool `required:"false" yaml:"hiclawMode" json:"hiclawMode"` } func (c *ProviderConfig) GetId() string { @@ -699,6 +707,12 @@ func (c *ProviderConfig) FromJson(json gjson.Result) { } } c.mergeConsecutiveMessages = json.Get("mergeConsecutiveMessages").Bool() + c.promoteThinkingOnEmpty = json.Get("promoteThinkingOnEmpty").Bool() + c.hiclawMode = json.Get("hiclawMode").Bool() + if c.hiclawMode { + c.mergeConsecutiveMessages = true + c.promoteThinkingOnEmpty = true + } } func (c *ProviderConfig) Validate() error { @@ -833,6 +847,10 @@ func (c *ProviderConfig) IsOriginal() bool { return c.protocol == protocolOriginal } +func (c *ProviderConfig) GetPromoteThinkingOnEmpty() bool { + return c.promoteThinkingOnEmpty +} + func (c *ProviderConfig) ReplaceByCustomSettings(body []byte) ([]byte, error) { return ReplaceByCustomSettings(body, c.customSettings) } diff --git a/plugins/wasm-go/extensions/ai-proxy/test/mock_context.go b/plugins/wasm-go/extensions/ai-proxy/test/mock_context.go new file mode 100644 index 000000000..8d48af7e3 --- /dev/null +++ b/plugins/wasm-go/extensions/ai-proxy/test/mock_context.go @@ -0,0 +1,50 @@ +package test + +import "github.com/higress-group/wasm-go/pkg/iface" + +// MockHttpContext is a minimal mock for wrapper.HttpContext used in unit tests +// that call provider functions directly (e.g. streaming thinking promotion). +type MockHttpContext struct { + contextMap map[string]interface{} +} + +func NewMockHttpContext() *MockHttpContext { + return &MockHttpContext{contextMap: make(map[string]interface{})} +} + +func (m *MockHttpContext) SetContext(key string, value interface{}) { m.contextMap[key] = value } +func (m *MockHttpContext) GetContext(key string) interface{} { return m.contextMap[key] } +func (m *MockHttpContext) GetBoolContext(key string, def bool) bool { return def } +func (m *MockHttpContext) GetStringContext(key, def string) string { return def } +func (m *MockHttpContext) GetByteSliceContext(key string, def []byte) []byte { return def } +func (m *MockHttpContext) Scheme() string { return "" } +func (m *MockHttpContext) Host() string { return "" } +func (m *MockHttpContext) Path() string { return "" } +func (m *MockHttpContext) Method() string { return "" } +func (m *MockHttpContext) GetUserAttribute(key string) interface{} { return nil } +func (m *MockHttpContext) SetUserAttribute(key string, value interface{}) {} +func (m *MockHttpContext) SetUserAttributeMap(kvmap map[string]interface{}) {} +func (m *MockHttpContext) GetUserAttributeMap() map[string]interface{} { return nil } +func (m *MockHttpContext) WriteUserAttributeToLog() error { return nil } +func (m *MockHttpContext) WriteUserAttributeToLogWithKey(key string) error { return nil } +func (m *MockHttpContext) WriteUserAttributeToTrace() error { return nil } +func (m *MockHttpContext) DontReadRequestBody() {} +func (m *MockHttpContext) DontReadResponseBody() {} +func (m *MockHttpContext) BufferRequestBody() {} +func (m *MockHttpContext) BufferResponseBody() {} +func (m *MockHttpContext) NeedPauseStreamingResponse() {} +func (m *MockHttpContext) PushBuffer(buffer []byte) {} +func (m *MockHttpContext) PopBuffer() []byte { return nil } +func (m *MockHttpContext) BufferQueueSize() int { return 0 } +func (m *MockHttpContext) DisableReroute() {} +func (m *MockHttpContext) SetRequestBodyBufferLimit(byteSize uint32) {} +func (m *MockHttpContext) SetResponseBodyBufferLimit(byteSize uint32) {} +func (m *MockHttpContext) RouteCall(method, url string, headers [][2]string, body []byte, callback iface.RouteResponseCallback) error { + return nil +} +func (m *MockHttpContext) GetExecutionPhase() iface.HTTPExecutionPhase { return 0 } +func (m *MockHttpContext) HasRequestBody() bool { return false } +func (m *MockHttpContext) HasResponseBody() bool { return false } +func (m *MockHttpContext) IsWebsocket() bool { return false } +func (m *MockHttpContext) IsBinaryRequestBody() bool { return false } +func (m *MockHttpContext) IsBinaryResponseBody() bool { return false } diff --git a/plugins/wasm-go/extensions/ai-proxy/test/openai.go b/plugins/wasm-go/extensions/ai-proxy/test/openai.go index 2f72fabb0..9c5d0562f 100644 --- a/plugins/wasm-go/extensions/ai-proxy/test/openai.go +++ b/plugins/wasm-go/extensions/ai-proxy/test/openai.go @@ -5,6 +5,7 @@ import ( "strings" "testing" + "github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/provider" "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" "github.com/higress-group/wasm-go/pkg/test" "github.com/stretchr/testify/require" @@ -997,3 +998,158 @@ func RunOpenAIOnStreamingResponseBodyTests(t *testing.T) { }) }) } + +// 测试配置:OpenAI配置 + promoteThinkingOnEmpty +var openAIPromoteThinkingConfig = func() json.RawMessage { + data, _ := json.Marshal(map[string]interface{}{ + "provider": map[string]interface{}{ + "type": "openai", + "apiTokens": []string{"sk-openai-test123456789"}, + "promoteThinkingOnEmpty": true, + }, + }) + return data +}() + +// 测试配置:OpenAI配置 + hiclawMode +var openAIHiclawModeConfig = func() json.RawMessage { + data, _ := json.Marshal(map[string]interface{}{ + "provider": map[string]interface{}{ + "type": "openai", + "apiTokens": []string{"sk-openai-test123456789"}, + "hiclawMode": true, + }, + }) + return data +}() + +func RunOpenAIPromoteThinkingOnEmptyTests(t *testing.T) { + // Config parsing tests via host framework + test.RunGoTest(t, func(t *testing.T) { + t.Run("promoteThinkingOnEmpty config parses", func(t *testing.T) { + host, status := test.NewTestHost(openAIPromoteThinkingConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + config, err := host.GetMatchConfig() + require.NoError(t, err) + require.NotNil(t, config) + }) + + t.Run("hiclawMode config parses", func(t *testing.T) { + host, status := test.NewTestHost(openAIHiclawModeConfig) + defer host.Reset() + require.Equal(t, types.OnPluginStartStatusOK, status) + config, err := host.GetMatchConfig() + require.NoError(t, err) + require.NotNil(t, config) + }) + }) + + // Non-streaming promote logic tests via provider functions directly + t.Run("promotes reasoning_content when content is empty string", func(t *testing.T) { + body := []byte(`{"choices":[{"index":0,"message":{"role":"assistant","content":"","reasoning_content":"这是思考内容"},"finish_reason":"stop"}]}`) + result, err := provider.PromoteThinkingOnEmptyResponse(body) + require.NoError(t, err) + require.Contains(t, string(result), `"content":"这是思考内容"`) + require.NotContains(t, string(result), `"reasoning_content":"这是思考内容"`) + }) + + t.Run("promotes reasoning_content when content is nil", func(t *testing.T) { + body := []byte(`{"choices":[{"index":0,"message":{"role":"assistant","reasoning_content":"思考结果"},"finish_reason":"stop"}]}`) + result, err := provider.PromoteThinkingOnEmptyResponse(body) + require.NoError(t, err) + require.Contains(t, string(result), `"content":"思考结果"`) + }) + + t.Run("no promotion when content is present", func(t *testing.T) { + body := []byte(`{"choices":[{"index":0,"message":{"role":"assistant","content":"正常回复","reasoning_content":"思考过程"},"finish_reason":"stop"}]}`) + result, err := provider.PromoteThinkingOnEmptyResponse(body) + require.NoError(t, err) + require.Equal(t, string(body), string(result)) + }) + + t.Run("no promotion when no reasoning", func(t *testing.T) { + body := []byte(`{"choices":[{"index":0,"message":{"role":"assistant","content":"正常回复"},"finish_reason":"stop"}]}`) + result, err := provider.PromoteThinkingOnEmptyResponse(body) + require.NoError(t, err) + require.Equal(t, string(body), string(result)) + }) + + t.Run("no promotion when both empty", func(t *testing.T) { + body := []byte(`{"choices":[{"index":0,"message":{"role":"assistant","content":""},"finish_reason":"stop"}]}`) + result, err := provider.PromoteThinkingOnEmptyResponse(body) + require.NoError(t, err) + require.Equal(t, string(body), string(result)) + }) + + t.Run("invalid json returns error", func(t *testing.T) { + body := []byte(`not json`) + result, err := provider.PromoteThinkingOnEmptyResponse(body) + require.Error(t, err) + require.Equal(t, string(body), string(result)) + }) +} + +func RunOpenAIPromoteThinkingOnEmptyStreamingTests(t *testing.T) { + // Streaming tests use provider functions directly since the test framework + // does not expose GetStreamingResponseBody. + t.Run("streaming: buffers reasoning and flushes on end when no content", func(t *testing.T) { + ctx := NewMockHttpContext() + // Chunk with only reasoning_content + data := []byte(`{"choices":[{"index":0,"delta":{"reasoning_content":"流式思考"}}]}`) + result, err := provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data) + require.NoError(t, err) + // Reasoning should be stripped (not promoted inline) + require.NotContains(t, string(result), `"content":"流式思考"`) + + // Flush should emit buffered reasoning as content + flush := provider.PromoteStreamingThinkingFlush(ctx) + require.NotNil(t, flush) + require.Contains(t, string(flush), `"content":"流式思考"`) + }) + + t.Run("streaming: no flush when content was seen", func(t *testing.T) { + ctx := NewMockHttpContext() + // First chunk: content delta + data1 := []byte(`{"choices":[{"index":0,"delta":{"content":"正文"}}]}`) + _, _ = provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data1) + + // Second chunk: reasoning only + data2 := []byte(`{"choices":[{"index":0,"delta":{"reasoning_content":"后续思考"}}]}`) + result, err := provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data2) + require.NoError(t, err) + // Should be unchanged since content was already seen + require.Equal(t, string(data2), string(result)) + + // Flush should return nil since content was seen + flush := provider.PromoteStreamingThinkingFlush(ctx) + require.Nil(t, flush) + }) + + t.Run("streaming: accumulates multiple reasoning chunks", func(t *testing.T) { + ctx := NewMockHttpContext() + data1 := []byte(`{"choices":[{"index":0,"delta":{"reasoning_content":"第一段"}}]}`) + _, _ = provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data1) + + data2 := []byte(`{"choices":[{"index":0,"delta":{"reasoning_content":"第二段"}}]}`) + _, _ = provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data2) + + flush := provider.PromoteStreamingThinkingFlush(ctx) + require.NotNil(t, flush) + require.Contains(t, string(flush), `"content":"第一段第二段"`) + }) + + t.Run("streaming: no flush when no reasoning buffered", func(t *testing.T) { + ctx := NewMockHttpContext() + flush := provider.PromoteStreamingThinkingFlush(ctx) + require.Nil(t, flush) + }) + + t.Run("streaming: invalid json returns original", func(t *testing.T) { + ctx := NewMockHttpContext() + data := []byte(`not json`) + result, err := provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data) + require.NoError(t, err) + require.Equal(t, string(data), string(result)) + }) +}