Compare commits

...

2 Commits

Author SHA1 Message Date
johnlanni
179a233ad6 refactor(ai-proxy): redesign streaming thinking promotion to buffer-and-flush
Instead of promoting reasoning to content inline per-chunk (which would
emit reasoning as content prematurely if real content arrives later),
the streaming path now buffers reasoning content and strips it from
chunks. On the last chunk, if no content was ever seen, the buffered
reasoning is flushed as a single content chunk.

Also moves tests into test/openai.go TestOpenAI suite and adds
MockHttpContext for provider-level streaming tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 00:05:17 +08:00
johnlanni
bdfe9950ce feat(ai-proxy): add promoteThinkingOnEmpty and hiclawMode config options
When some models (e.g. kimi-k2.5) put user-facing replies into
reasoning_content/thinking blocks without generating text content,
downstream clients receive empty responses. This adds a new
promoteThinkingOnEmpty option that promotes reasoning content to
text content when the response has no text block.

Also adds hiclawMode as a convenience flag that enables both
mergeConsecutiveMessages and promoteThinkingOnEmpty for multi-agent
collaboration scenarios.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 23:28:17 +08:00
6 changed files with 442 additions and 1 deletion

View File

@@ -385,6 +385,8 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin
return chunk
}
promoteThinking := pluginConfig.GetProviderConfig().GetPromoteThinkingOnEmpty()
log.Debugf("[onStreamingResponseBody] provider=%s", activeProvider.GetProviderType())
log.Debugf("[onStreamingResponseBody] isLastChunk=%v chunk: %s", isLastChunk, string(chunk))
@@ -392,6 +394,9 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin
apiName, _ := ctx.GetContext(provider.CtxKeyApiName).(provider.ApiName)
modifiedChunk, err := handler.OnStreamingResponseBody(ctx, apiName, chunk, isLastChunk)
if err == nil && modifiedChunk != nil {
if promoteThinking {
modifiedChunk = promoteThinkingInStreamingChunk(ctx, modifiedChunk, isLastChunk)
}
// Convert to Claude format if needed
claudeChunk, convertErr := convertStreamingResponseToClaude(ctx, modifiedChunk)
if convertErr != nil {
@@ -435,6 +440,10 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin
result := []byte(responseBuilder.String())
if promoteThinking {
result = promoteThinkingInStreamingChunk(ctx, result, isLastChunk)
}
// Convert to Claude format if needed
claudeChunk, convertErr := convertStreamingResponseToClaude(ctx, result)
if convertErr != nil {
@@ -443,11 +452,12 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin
return claudeChunk
}
if !needsClaudeResponseConversion(ctx) {
if !needsClaudeResponseConversion(ctx) && !promoteThinking {
return chunk
}
// If provider doesn't implement any streaming handlers but we need Claude conversion
// or thinking promotion
// First extract complete events from the chunk
events := provider.ExtractStreamingEvents(ctx, chunk)
log.Debugf("[onStreamingResponseBody] %d events received (no handler)", len(events))
@@ -464,6 +474,10 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin
result := []byte(responseBuilder.String())
if promoteThinking {
result = promoteThinkingInStreamingChunk(ctx, result, isLastChunk)
}
// Convert to Claude format if needed
claudeChunk, convertErr := convertStreamingResponseToClaude(ctx, result)
if convertErr != nil {
@@ -496,6 +510,16 @@ func onHttpResponseBody(ctx wrapper.HttpContext, pluginConfig config.PluginConfi
finalBody = body
}
// Promote thinking/reasoning to content when content is empty
if pluginConfig.GetProviderConfig().GetPromoteThinkingOnEmpty() {
promoted, err := provider.PromoteThinkingOnEmptyResponse(finalBody)
if err != nil {
log.Warnf("[promoteThinkingOnEmpty] failed: %v", err)
} else {
finalBody = promoted
}
}
// Convert to Claude format if needed (applies to both branches)
convertedBody, err := convertResponseBodyToClaude(ctx, finalBody)
if err != nil {
@@ -544,6 +568,49 @@ func convertStreamingResponseToClaude(ctx wrapper.HttpContext, data []byte) ([]b
return claudeChunk, nil
}
// promoteThinkingInStreamingChunk rewrites SSE-formatted streaming data so
// that reasoning deltas are buffered and removed from the emitted chunks.
// When the final chunk arrives and no text content was ever observed, a
// synthetic chunk carrying the buffered reasoning as content is prepended.
func promoteThinkingInStreamingChunk(ctx wrapper.HttpContext, data []byte, isLastChunk bool) []byte {
	// SSE payload lines look like "data: {...}"; rewrite each one in place.
	lines := strings.Split(string(data), "\n")
	changed := false
	for idx := range lines {
		original := lines[idx]
		if !strings.HasPrefix(original, "data: ") {
			continue
		}
		payload := strings.TrimPrefix(original, "data: ")
		if payload == "" || payload == "[DONE]" {
			continue
		}
		rewritten, err := provider.PromoteStreamingThinkingOnEmptyChunk(ctx, []byte(payload))
		if err != nil {
			continue
		}
		if updated := "data: " + string(rewritten); updated != original {
			lines[idx] = updated
			changed = true
		}
	}
	out := data
	if changed {
		out = []byte(strings.Join(lines, "\n"))
	}
	if !isLastChunk {
		return out
	}
	// End of stream: emit buffered reasoning as content if nothing else did.
	if flush := provider.PromoteStreamingThinkingFlush(ctx); flush != nil {
		out = append(flush, out...)
	}
	return out
}
// Helper function to convert OpenAI response body to Claude format
func convertResponseBodyToClaude(ctx wrapper.HttpContext, body []byte) ([]byte, error) {
if !needsClaudeResponseConversion(ctx) {

View File

@@ -133,6 +133,8 @@ func TestOpenAI(t *testing.T) {
test.RunOpenAIOnHttpResponseHeadersTests(t)
test.RunOpenAIOnHttpResponseBodyTests(t)
test.RunOpenAIOnStreamingResponseBodyTests(t)
test.RunOpenAIPromoteThinkingOnEmptyTests(t)
test.RunOpenAIPromoteThinkingOnEmptyStreamingTests(t)
}
func TestQwen(t *testing.T) {

View File

@@ -255,6 +255,70 @@ func (m *chatMessage) handleStreamingReasoningContent(ctx wrapper.HttpContext, r
}
}
// promoteThinkingOnEmpty promotes reasoning content to text content when the
// text content is empty. This handles models that put user-facing replies
// into thinking blocks instead of text blocks.
// Both reasoning_content and the alternate reasoning field are considered,
// matching the streaming path (promoteStreamingThinkingOnEmpty).
func (r *chatCompletionResponse) promoteThinkingOnEmpty() {
	for i := range r.Choices {
		msg := r.Choices[i].Message
		if msg == nil {
			continue
		}
		if !isContentEmpty(msg.Content) {
			continue
		}
		// Some providers populate "reasoning" instead of "reasoning_content";
		// fall back to it so both spellings are promoted.
		reasoning := msg.ReasoningContent
		if reasoning == "" {
			reasoning = msg.Reasoning
		}
		if reasoning != "" {
			msg.Content = reasoning
			msg.ReasoningContent = ""
			msg.Reasoning = ""
		}
	}
}
// promoteStreamingThinkingOnEmpty accumulates reasoning content during streaming.
// It strips reasoning deltas from the chunk and buffers them in the request
// context (ctxKeyBufferedReasoning). Once any real content delta is observed,
// the stream is marked via ctxKeyHasContentDelta and promotion is permanently
// disabled for the remainder of the stream.
// Call PromoteStreamingThinkingFlush at the end of the stream to emit buffered
// reasoning as content if no content was ever seen.
// Returns true if the chunk was modified (reasoning stripped and buffered).
func promoteStreamingThinkingOnEmpty(ctx wrapper.HttpContext, msg *chatMessage) bool {
	if msg == nil {
		return false
	}
	// Content already seen earlier in the stream: leave deltas untouched.
	hasContentDelta, _ := ctx.GetContext(ctxKeyHasContentDelta).(bool)
	if hasContentDelta {
		return false
	}
	if !isContentEmpty(msg.Content) {
		// First content delta: disable promotion from here on.
		// NOTE(review): reasoning stripped from earlier chunks stays buffered
		// and is silently dropped in this case — confirm losing it is intended.
		ctx.SetContext(ctxKeyHasContentDelta, true)
		return false
	}
	// Buffer reasoning content and strip it from the chunk. Some providers use
	// "reasoning" instead of "reasoning_content", so check both fields.
	reasoning := msg.ReasoningContent
	if reasoning == "" {
		reasoning = msg.Reasoning
	}
	if reasoning != "" {
		buffered, _ := ctx.GetContext(ctxKeyBufferedReasoning).(string)
		ctx.SetContext(ctxKeyBufferedReasoning, buffered+reasoning)
		msg.ReasoningContent = ""
		msg.Reasoning = ""
		return true
	}
	return false
}
// isContentEmpty reports whether a message content value carries no usable
// text: nil, or a string that is blank after trimming whitespace. Non-string
// content (e.g. structured content parts) is never considered empty.
func isContentEmpty(content any) bool {
	if content == nil {
		return true
	}
	if s, ok := content.(string); ok {
		return strings.TrimSpace(s) == ""
	}
	return false
}
type chatMessageContent struct {
CacheControl map[string]interface{} `json:"cache_control,omitempty"`
Type string `json:"type,omitempty"`
@@ -648,3 +712,87 @@ func (r embeddingsRequest) ParseInput() []string {
}
return input
}
// PromoteThinkingOnEmptyResponse promotes reasoning content to text content in
// a non-streaming response body when the text content is empty. It considers
// both the reasoning_content and reasoning fields, matching the streaming
// path. Returns the original body (and no error) if no promotion is needed,
// or the original body with a wrapped error if the body cannot be parsed.
func PromoteThinkingOnEmptyResponse(body []byte) ([]byte, error) {
	var resp chatCompletionResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		// Wrap with %w so callers can inspect the underlying JSON error.
		return body, fmt.Errorf("unmarshaling response for thinking promotion: %w", err)
	}
	promoted := false
	for i := range resp.Choices {
		msg := resp.Choices[i].Message
		if msg == nil {
			continue
		}
		if !isContentEmpty(msg.Content) {
			continue
		}
		// Fall back to the alternate "reasoning" field some providers use.
		reasoning := msg.ReasoningContent
		if reasoning == "" {
			reasoning = msg.Reasoning
		}
		if reasoning != "" {
			msg.Content = reasoning
			msg.ReasoningContent = ""
			msg.Reasoning = ""
			promoted = true
		}
	}
	if !promoted {
		return body, nil
	}
	return json.Marshal(resp)
}
// PromoteStreamingThinkingOnEmptyChunk buffers the reasoning deltas carried by
// a single streaming chunk and strips them from it. Payloads that are not
// valid chat completion JSON pass through untouched. Call
// PromoteStreamingThinkingFlush on the last chunk to emit buffered reasoning
// as content if no real content was ever seen.
// NOTE(review): re-marshaling happens only when a delta was stripped; any JSON
// fields not modeled by chatCompletionResponse would be dropped at that point
// — confirm the struct covers all provider chunk fields.
func PromoteStreamingThinkingOnEmptyChunk(ctx wrapper.HttpContext, data []byte) ([]byte, error) {
	var chunk chatCompletionResponse
	if json.Unmarshal(data, &chunk) != nil {
		// Not a chat completion chunk; leave it alone.
		return data, nil
	}
	stripped := false
	for i := range chunk.Choices {
		delta := chunk.Choices[i].Delta
		if delta != nil && promoteStreamingThinkingOnEmpty(ctx, delta) {
			stripped = true
		}
	}
	if !stripped {
		return data, nil
	}
	return json.Marshal(chunk)
}
// PromoteStreamingThinkingFlush returns an SSE chunk that emits the buffered
// reasoning as a content delta. It returns nil when real content was already
// observed in the stream, when nothing was buffered, or when the flush chunk
// cannot be serialized.
func PromoteStreamingThinkingFlush(ctx wrapper.HttpContext) []byte {
	if seen, _ := ctx.GetContext(ctxKeyHasContentDelta).(bool); seen {
		return nil
	}
	buffered, _ := ctx.GetContext(ctxKeyBufferedReasoning).(string)
	if buffered == "" {
		return nil
	}
	// Minimal chat.completion.chunk whose only delta is the promoted content.
	flush := chatCompletionResponse{
		Object: objectChatCompletionChunk,
		Choices: []chatCompletionChoice{{
			Index: 0,
			Delta: &chatMessage{Content: buffered},
		}},
	}
	payload, err := json.Marshal(flush)
	if err != nil {
		return nil
	}
	// Wrap the JSON as a single SSE event.
	return []byte("data: " + string(payload) + "\n\n")
}

View File

@@ -178,6 +178,8 @@ const (
ctxKeyPushedMessage = "pushedMessage"
ctxKeyContentPushed = "contentPushed"
ctxKeyReasoningContentPushed = "reasoningContentPushed"
ctxKeyHasContentDelta = "hasContentDelta"
ctxKeyBufferedReasoning = "bufferedReasoning"
objectChatCompletion = "chat.completion"
objectChatCompletionChunk = "chat.completion.chunk"
@@ -474,6 +476,12 @@ type ProviderConfig struct {
// @Title zh-CN 合并连续同角色消息
// @Description zh-CN 开启后,若请求的 messages 中存在连续的同角色消息(如连续两条 user 消息),将其内容合并为一条,以满足要求严格轮流交替(user→assistant→user→...)的模型服务商的要求。
mergeConsecutiveMessages bool `required:"false" yaml:"mergeConsecutiveMessages" json:"mergeConsecutiveMessages"`
// @Title zh-CN 空内容时提升思考为正文
// @Description zh-CN 开启后,若模型响应只包含 reasoning_content/thinking 而没有正文内容,将 reasoning 内容提升为正文内容返回,避免客户端收到空回复。
promoteThinkingOnEmpty bool `required:"false" yaml:"promoteThinkingOnEmpty" json:"promoteThinkingOnEmpty"`
// @Title zh-CN HiClaw 模式
// @Description zh-CN 开启后同时启用 mergeConsecutiveMessages 和 promoteThinkingOnEmpty适用于 HiClaw 多 Agent 协作场景。
hiclawMode bool `required:"false" yaml:"hiclawMode" json:"hiclawMode"`
}
func (c *ProviderConfig) GetId() string {
@@ -699,6 +707,12 @@ func (c *ProviderConfig) FromJson(json gjson.Result) {
}
}
c.mergeConsecutiveMessages = json.Get("mergeConsecutiveMessages").Bool()
c.promoteThinkingOnEmpty = json.Get("promoteThinkingOnEmpty").Bool()
c.hiclawMode = json.Get("hiclawMode").Bool()
if c.hiclawMode {
c.mergeConsecutiveMessages = true
c.promoteThinkingOnEmpty = true
}
}
func (c *ProviderConfig) Validate() error {
@@ -833,6 +847,10 @@ func (c *ProviderConfig) IsOriginal() bool {
return c.protocol == protocolOriginal
}
// GetPromoteThinkingOnEmpty reports whether reasoning/thinking content should
// be promoted to text content when a response has no text content. The flag is
// set directly via promoteThinkingOnEmpty or implicitly via hiclawMode.
func (c *ProviderConfig) GetPromoteThinkingOnEmpty() bool {
	return c.promoteThinkingOnEmpty
}
func (c *ProviderConfig) ReplaceByCustomSettings(body []byte) ([]byte, error) {
return ReplaceByCustomSettings(body, c.customSettings)
}

View File

@@ -0,0 +1,50 @@
package test
import "github.com/higress-group/wasm-go/pkg/iface"
// MockHttpContext is a minimal in-memory mock of wrapper.HttpContext for unit
// tests that call provider functions directly (e.g. streaming thinking
// promotion). Only the context-map accessors carry real behavior; every other
// method is a no-op stub returning zero values.
type MockHttpContext struct {
	contextMap map[string]interface{}
}

// NewMockHttpContext returns a mock with an empty context map.
func NewMockHttpContext() *MockHttpContext {
	return &MockHttpContext{contextMap: make(map[string]interface{})}
}

func (m *MockHttpContext) SetContext(key string, value interface{}) { m.contextMap[key] = value }
func (m *MockHttpContext) GetContext(key string) interface{}        { return m.contextMap[key] }

// GetBoolContext returns the stored bool for key, or def when the key is
// absent or holds a non-bool value. Previously this ignored the context map
// entirely, which was inconsistent with GetContext.
func (m *MockHttpContext) GetBoolContext(key string, def bool) bool {
	if v, ok := m.contextMap[key].(bool); ok {
		return v
	}
	return def
}

// GetStringContext returns the stored string for key, or def when the key is
// absent or holds a non-string value.
func (m *MockHttpContext) GetStringContext(key, def string) string {
	if v, ok := m.contextMap[key].(string); ok {
		return v
	}
	return def
}

// GetByteSliceContext returns the stored []byte for key, or def when the key
// is absent or holds another type.
func (m *MockHttpContext) GetByteSliceContext(key string, def []byte) []byte {
	if v, ok := m.contextMap[key].([]byte); ok {
		return v
	}
	return def
}

func (m *MockHttpContext) Scheme() string                                 { return "" }
func (m *MockHttpContext) Host() string                                   { return "" }
func (m *MockHttpContext) Path() string                                   { return "" }
func (m *MockHttpContext) Method() string                                 { return "" }
func (m *MockHttpContext) GetUserAttribute(key string) interface{}        { return nil }
func (m *MockHttpContext) SetUserAttribute(key string, value interface{}) {}
func (m *MockHttpContext) SetUserAttributeMap(kvmap map[string]interface{}) {}
func (m *MockHttpContext) GetUserAttributeMap() map[string]interface{}    { return nil }
func (m *MockHttpContext) WriteUserAttributeToLog() error                 { return nil }
func (m *MockHttpContext) WriteUserAttributeToLogWithKey(key string) error { return nil }
func (m *MockHttpContext) WriteUserAttributeToTrace() error               { return nil }
func (m *MockHttpContext) DontReadRequestBody()                           {}
func (m *MockHttpContext) DontReadResponseBody()                          {}
func (m *MockHttpContext) BufferRequestBody()                             {}
func (m *MockHttpContext) BufferResponseBody()                            {}
func (m *MockHttpContext) NeedPauseStreamingResponse()                    {}
func (m *MockHttpContext) PushBuffer(buffer []byte)                       {}
func (m *MockHttpContext) PopBuffer() []byte                              { return nil }
func (m *MockHttpContext) BufferQueueSize() int                           { return 0 }
func (m *MockHttpContext) DisableReroute()                                {}
func (m *MockHttpContext) SetRequestBodyBufferLimit(byteSize uint32)      {}
func (m *MockHttpContext) SetResponseBodyBufferLimit(byteSize uint32)     {}
func (m *MockHttpContext) RouteCall(method, url string, headers [][2]string, body []byte, callback iface.RouteResponseCallback) error {
	return nil
}
func (m *MockHttpContext) GetExecutionPhase() iface.HTTPExecutionPhase { return 0 }
func (m *MockHttpContext) HasRequestBody() bool                        { return false }
func (m *MockHttpContext) HasResponseBody() bool                       { return false }
func (m *MockHttpContext) IsWebsocket() bool                           { return false }
func (m *MockHttpContext) IsBinaryRequestBody() bool                   { return false }
func (m *MockHttpContext) IsBinaryResponseBody() bool                  { return false }

View File

@@ -5,6 +5,7 @@ import (
"strings"
"testing"
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/provider"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
"github.com/higress-group/wasm-go/pkg/test"
"github.com/stretchr/testify/require"
@@ -997,3 +998,158 @@ func RunOpenAIOnStreamingResponseBodyTests(t *testing.T) {
})
})
}
// 测试配置OpenAI配置 + promoteThinkingOnEmpty
var openAIPromoteThinkingConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"provider": map[string]interface{}{
"type": "openai",
"apiTokens": []string{"sk-openai-test123456789"},
"promoteThinkingOnEmpty": true,
},
})
return data
}()
// 测试配置OpenAI配置 + hiclawMode
var openAIHiclawModeConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"provider": map[string]interface{}{
"type": "openai",
"apiTokens": []string{"sk-openai-test123456789"},
"hiclawMode": true,
},
})
return data
}()
// RunOpenAIPromoteThinkingOnEmptyTests verifies config parsing for the
// promoteThinkingOnEmpty/hiclawMode options and the non-streaming promotion
// logic in provider.PromoteThinkingOnEmptyResponse.
func RunOpenAIPromoteThinkingOnEmptyTests(t *testing.T) {
	// Config parsing tests via the host framework.
	test.RunGoTest(t, func(t *testing.T) {
		t.Run("promoteThinkingOnEmpty config parses", func(t *testing.T) {
			host, status := test.NewTestHost(openAIPromoteThinkingConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			config, err := host.GetMatchConfig()
			require.NoError(t, err)
			require.NotNil(t, config)
		})
		t.Run("hiclawMode config parses", func(t *testing.T) {
			host, status := test.NewTestHost(openAIHiclawModeConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			config, err := host.GetMatchConfig()
			require.NoError(t, err)
			require.NotNil(t, config)
		})
	})
	// Non-streaming promotion logic, exercised through the provider function
	// directly. Table-driven: each case states the response body and either
	// what the result must (not) contain or that it must be unchanged.
	cases := []struct {
		name        string
		body        string
		wantErr     bool
		unchanged   bool
		contains    string
		notContains string
	}{
		{
			name:        "promotes reasoning_content when content is empty string",
			body:        `{"choices":[{"index":0,"message":{"role":"assistant","content":"","reasoning_content":"这是思考内容"},"finish_reason":"stop"}]}`,
			contains:    `"content":"这是思考内容"`,
			notContains: `"reasoning_content":"这是思考内容"`,
		},
		{
			name:     "promotes reasoning_content when content is nil",
			body:     `{"choices":[{"index":0,"message":{"role":"assistant","reasoning_content":"思考结果"},"finish_reason":"stop"}]}`,
			contains: `"content":"思考结果"`,
		},
		{
			name:      "no promotion when content is present",
			body:      `{"choices":[{"index":0,"message":{"role":"assistant","content":"正常回复","reasoning_content":"思考过程"},"finish_reason":"stop"}]}`,
			unchanged: true,
		},
		{
			name:      "no promotion when no reasoning",
			body:      `{"choices":[{"index":0,"message":{"role":"assistant","content":"正常回复"},"finish_reason":"stop"}]}`,
			unchanged: true,
		},
		{
			name:      "no promotion when both empty",
			body:      `{"choices":[{"index":0,"message":{"role":"assistant","content":""},"finish_reason":"stop"}]}`,
			unchanged: true,
		},
		{
			name:      "invalid json returns error",
			body:      `not json`,
			wantErr:   true,
			unchanged: true,
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			result, err := provider.PromoteThinkingOnEmptyResponse([]byte(tc.body))
			if tc.wantErr {
				require.Error(t, err)
			} else {
				require.NoError(t, err)
			}
			if tc.unchanged {
				require.Equal(t, tc.body, string(result))
			}
			if tc.contains != "" {
				require.Contains(t, string(result), tc.contains)
			}
			if tc.notContains != "" {
				require.NotContains(t, string(result), tc.notContains)
			}
		})
	}
}
// RunOpenAIPromoteThinkingOnEmptyStreamingTests exercises the streaming
// buffer-and-flush promotion helpers directly, since the test framework does
// not expose GetStreamingResponseBody.
func RunOpenAIPromoteThinkingOnEmptyStreamingTests(t *testing.T) {
	t.Run("streaming: buffers reasoning and flushes on end when no content", func(t *testing.T) {
		mockCtx := NewMockHttpContext()
		chunk := []byte(`{"choices":[{"index":0,"delta":{"reasoning_content":"流式思考"}}]}`)
		stripped, err := provider.PromoteStreamingThinkingOnEmptyChunk(mockCtx, chunk)
		require.NoError(t, err)
		// The reasoning must be buffered, not promoted inline.
		require.NotContains(t, string(stripped), `"content":"流式思考"`)
		// The end-of-stream flush emits the buffered reasoning as content.
		flushed := provider.PromoteStreamingThinkingFlush(mockCtx)
		require.NotNil(t, flushed)
		require.Contains(t, string(flushed), `"content":"流式思考"`)
	})
	t.Run("streaming: no flush when content was seen", func(t *testing.T) {
		mockCtx := NewMockHttpContext()
		contentChunk := []byte(`{"choices":[{"index":0,"delta":{"content":"正文"}}]}`)
		_, _ = provider.PromoteStreamingThinkingOnEmptyChunk(mockCtx, contentChunk)
		reasoningChunk := []byte(`{"choices":[{"index":0,"delta":{"reasoning_content":"后续思考"}}]}`)
		passthrough, err := provider.PromoteStreamingThinkingOnEmptyChunk(mockCtx, reasoningChunk)
		require.NoError(t, err)
		// Content already arrived, so the reasoning chunk passes through as-is.
		require.Equal(t, string(reasoningChunk), string(passthrough))
		// And nothing is flushed at the end of the stream.
		require.Nil(t, provider.PromoteStreamingThinkingFlush(mockCtx))
	})
	t.Run("streaming: accumulates multiple reasoning chunks", func(t *testing.T) {
		mockCtx := NewMockHttpContext()
		for _, payload := range []string{
			`{"choices":[{"index":0,"delta":{"reasoning_content":"第一段"}}]}`,
			`{"choices":[{"index":0,"delta":{"reasoning_content":"第二段"}}]}`,
		} {
			_, _ = provider.PromoteStreamingThinkingOnEmptyChunk(mockCtx, []byte(payload))
		}
		flushed := provider.PromoteStreamingThinkingFlush(mockCtx)
		require.NotNil(t, flushed)
		require.Contains(t, string(flushed), `"content":"第一段第二段"`)
	})
	t.Run("streaming: no flush when no reasoning buffered", func(t *testing.T) {
		require.Nil(t, provider.PromoteStreamingThinkingFlush(NewMockHttpContext()))
	})
	t.Run("streaming: invalid json returns original", func(t *testing.T) {
		mockCtx := NewMockHttpContext()
		raw := []byte(`not json`)
		passthrough, err := provider.PromoteStreamingThinkingOnEmptyChunk(mockCtx, raw)
		require.NoError(t, err)
		require.Equal(t, string(raw), string(passthrough))
	})
}