mirror of
https://github.com/alibaba/higress.git
synced 2026-03-20 02:07:27 +08:00
Compare commits
2 Commits
main
...
feat/ai-pr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
179a233ad6 | ||
|
|
bdfe9950ce |
@@ -385,6 +385,8 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin
|
||||
return chunk
|
||||
}
|
||||
|
||||
promoteThinking := pluginConfig.GetProviderConfig().GetPromoteThinkingOnEmpty()
|
||||
|
||||
log.Debugf("[onStreamingResponseBody] provider=%s", activeProvider.GetProviderType())
|
||||
log.Debugf("[onStreamingResponseBody] isLastChunk=%v chunk: %s", isLastChunk, string(chunk))
|
||||
|
||||
@@ -392,6 +394,9 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin
|
||||
apiName, _ := ctx.GetContext(provider.CtxKeyApiName).(provider.ApiName)
|
||||
modifiedChunk, err := handler.OnStreamingResponseBody(ctx, apiName, chunk, isLastChunk)
|
||||
if err == nil && modifiedChunk != nil {
|
||||
if promoteThinking {
|
||||
modifiedChunk = promoteThinkingInStreamingChunk(ctx, modifiedChunk, isLastChunk)
|
||||
}
|
||||
// Convert to Claude format if needed
|
||||
claudeChunk, convertErr := convertStreamingResponseToClaude(ctx, modifiedChunk)
|
||||
if convertErr != nil {
|
||||
@@ -435,6 +440,10 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin
|
||||
|
||||
result := []byte(responseBuilder.String())
|
||||
|
||||
if promoteThinking {
|
||||
result = promoteThinkingInStreamingChunk(ctx, result, isLastChunk)
|
||||
}
|
||||
|
||||
// Convert to Claude format if needed
|
||||
claudeChunk, convertErr := convertStreamingResponseToClaude(ctx, result)
|
||||
if convertErr != nil {
|
||||
@@ -443,11 +452,12 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin
|
||||
return claudeChunk
|
||||
}
|
||||
|
||||
if !needsClaudeResponseConversion(ctx) {
|
||||
if !needsClaudeResponseConversion(ctx) && !promoteThinking {
|
||||
return chunk
|
||||
}
|
||||
|
||||
// If provider doesn't implement any streaming handlers but we need Claude conversion
|
||||
// or thinking promotion
|
||||
// First extract complete events from the chunk
|
||||
events := provider.ExtractStreamingEvents(ctx, chunk)
|
||||
log.Debugf("[onStreamingResponseBody] %d events received (no handler)", len(events))
|
||||
@@ -464,6 +474,10 @@ func onStreamingResponseBody(ctx wrapper.HttpContext, pluginConfig config.Plugin
|
||||
|
||||
result := []byte(responseBuilder.String())
|
||||
|
||||
if promoteThinking {
|
||||
result = promoteThinkingInStreamingChunk(ctx, result, isLastChunk)
|
||||
}
|
||||
|
||||
// Convert to Claude format if needed
|
||||
claudeChunk, convertErr := convertStreamingResponseToClaude(ctx, result)
|
||||
if convertErr != nil {
|
||||
@@ -496,6 +510,16 @@ func onHttpResponseBody(ctx wrapper.HttpContext, pluginConfig config.PluginConfi
|
||||
finalBody = body
|
||||
}
|
||||
|
||||
// Promote thinking/reasoning to content when content is empty
|
||||
if pluginConfig.GetProviderConfig().GetPromoteThinkingOnEmpty() {
|
||||
promoted, err := provider.PromoteThinkingOnEmptyResponse(finalBody)
|
||||
if err != nil {
|
||||
log.Warnf("[promoteThinkingOnEmpty] failed: %v", err)
|
||||
} else {
|
||||
finalBody = promoted
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to Claude format if needed (applies to both branches)
|
||||
convertedBody, err := convertResponseBodyToClaude(ctx, finalBody)
|
||||
if err != nil {
|
||||
@@ -544,6 +568,49 @@ func convertStreamingResponseToClaude(ctx wrapper.HttpContext, data []byte) ([]b
|
||||
return claudeChunk, nil
|
||||
}
|
||||
|
||||
// promoteThinkingInStreamingChunk processes SSE-formatted streaming data, buffering
|
||||
// reasoning deltas and stripping them from chunks. On the last chunk, if no content
|
||||
// was ever seen, it appends a flush chunk that emits buffered reasoning as content.
|
||||
func promoteThinkingInStreamingChunk(ctx wrapper.HttpContext, data []byte, isLastChunk bool) []byte {
|
||||
// SSE data contains lines like "data: {...}\n\n"
|
||||
// We need to find and process each data line
|
||||
lines := strings.Split(string(data), "\n")
|
||||
modified := false
|
||||
for i, line := range lines {
|
||||
if !strings.HasPrefix(line, "data: ") {
|
||||
continue
|
||||
}
|
||||
payload := strings.TrimPrefix(line, "data: ")
|
||||
if payload == "[DONE]" || payload == "" {
|
||||
continue
|
||||
}
|
||||
stripped, err := provider.PromoteStreamingThinkingOnEmptyChunk(ctx, []byte(payload))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
newLine := "data: " + string(stripped)
|
||||
if newLine != line {
|
||||
lines[i] = newLine
|
||||
modified = true
|
||||
}
|
||||
}
|
||||
|
||||
result := data
|
||||
if modified {
|
||||
result = []byte(strings.Join(lines, "\n"))
|
||||
}
|
||||
|
||||
// On last chunk, flush buffered reasoning as content if no content was seen
|
||||
if isLastChunk {
|
||||
flushChunk := provider.PromoteStreamingThinkingFlush(ctx)
|
||||
if flushChunk != nil {
|
||||
result = append(flushChunk, result...)
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// Helper function to convert OpenAI response body to Claude format
|
||||
func convertResponseBodyToClaude(ctx wrapper.HttpContext, body []byte) ([]byte, error) {
|
||||
if !needsClaudeResponseConversion(ctx) {
|
||||
|
||||
@@ -133,6 +133,8 @@ func TestOpenAI(t *testing.T) {
|
||||
test.RunOpenAIOnHttpResponseHeadersTests(t)
|
||||
test.RunOpenAIOnHttpResponseBodyTests(t)
|
||||
test.RunOpenAIOnStreamingResponseBodyTests(t)
|
||||
test.RunOpenAIPromoteThinkingOnEmptyTests(t)
|
||||
test.RunOpenAIPromoteThinkingOnEmptyStreamingTests(t)
|
||||
}
|
||||
|
||||
func TestQwen(t *testing.T) {
|
||||
|
||||
@@ -255,6 +255,70 @@ func (m *chatMessage) handleStreamingReasoningContent(ctx wrapper.HttpContext, r
|
||||
}
|
||||
}
|
||||
|
||||
// promoteThinkingOnEmpty promotes reasoning_content to content when content is empty.
|
||||
// This handles models that put user-facing replies into thinking blocks instead of text blocks.
|
||||
func (r *chatCompletionResponse) promoteThinkingOnEmpty() {
|
||||
for i := range r.Choices {
|
||||
msg := r.Choices[i].Message
|
||||
if msg == nil {
|
||||
continue
|
||||
}
|
||||
if !isContentEmpty(msg.Content) {
|
||||
continue
|
||||
}
|
||||
if msg.ReasoningContent != "" {
|
||||
msg.Content = msg.ReasoningContent
|
||||
msg.ReasoningContent = ""
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// promoteStreamingThinkingOnEmpty accumulates reasoning content during streaming.
|
||||
// It strips reasoning from chunks and buffers it. When content is seen, it marks
|
||||
// the stream as having content so no promotion will happen.
|
||||
// Call PromoteStreamingThinkingFlush at the end of the stream to emit buffered
|
||||
// reasoning as content if no content was ever seen.
|
||||
// Returns true if the chunk was modified (reasoning stripped).
|
||||
func promoteStreamingThinkingOnEmpty(ctx wrapper.HttpContext, msg *chatMessage) bool {
|
||||
if msg == nil {
|
||||
return false
|
||||
}
|
||||
hasContentDelta, _ := ctx.GetContext(ctxKeyHasContentDelta).(bool)
|
||||
if hasContentDelta {
|
||||
return false
|
||||
}
|
||||
|
||||
if !isContentEmpty(msg.Content) {
|
||||
ctx.SetContext(ctxKeyHasContentDelta, true)
|
||||
return false
|
||||
}
|
||||
|
||||
// Buffer reasoning content and strip it from the chunk
|
||||
reasoning := msg.ReasoningContent
|
||||
if reasoning == "" {
|
||||
reasoning = msg.Reasoning
|
||||
}
|
||||
if reasoning != "" {
|
||||
buffered, _ := ctx.GetContext(ctxKeyBufferedReasoning).(string)
|
||||
ctx.SetContext(ctxKeyBufferedReasoning, buffered+reasoning)
|
||||
msg.ReasoningContent = ""
|
||||
msg.Reasoning = ""
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// isContentEmpty reports whether a message content value carries no content.
//
// nil, a string that is blank after trimming whitespace, and an empty content
// array ([]any, the shape encoding/json produces for a JSON array decoded into
// an any) count as empty. Every other value, including a non-empty array, is
// treated as non-empty.
func isContentEmpty(content any) bool {
	switch v := content.(type) {
	case nil:
		return true
	case string:
		return strings.TrimSpace(v) == ""
	case []any:
		return len(v) == 0
	default:
		return false
	}
}
|
||||
|
||||
type chatMessageContent struct {
|
||||
CacheControl map[string]interface{} `json:"cache_control,omitempty"`
|
||||
Type string `json:"type,omitempty"`
|
||||
@@ -648,3 +712,87 @@ func (r embeddingsRequest) ParseInput() []string {
|
||||
}
|
||||
return input
|
||||
}
|
||||
|
||||
// PromoteThinkingOnEmptyResponse promotes reasoning_content to content in a non-streaming
|
||||
// response body when content is empty. Returns the original body if no promotion is needed.
|
||||
func PromoteThinkingOnEmptyResponse(body []byte) ([]byte, error) {
|
||||
var resp chatCompletionResponse
|
||||
if err := json.Unmarshal(body, &resp); err != nil {
|
||||
return body, fmt.Errorf("unable to unmarshal response for thinking promotion: %v", err)
|
||||
}
|
||||
promoted := false
|
||||
for i := range resp.Choices {
|
||||
msg := resp.Choices[i].Message
|
||||
if msg == nil {
|
||||
continue
|
||||
}
|
||||
if !isContentEmpty(msg.Content) {
|
||||
continue
|
||||
}
|
||||
if msg.ReasoningContent != "" {
|
||||
msg.Content = msg.ReasoningContent
|
||||
msg.ReasoningContent = ""
|
||||
promoted = true
|
||||
}
|
||||
}
|
||||
if !promoted {
|
||||
return body, nil
|
||||
}
|
||||
return json.Marshal(resp)
|
||||
}
|
||||
|
||||
// PromoteStreamingThinkingOnEmptyChunk buffers reasoning deltas and strips them from
|
||||
// the chunk during streaming. Call PromoteStreamingThinkingFlush on the last chunk
|
||||
// to emit buffered reasoning as content if no real content was ever seen.
|
||||
func PromoteStreamingThinkingOnEmptyChunk(ctx wrapper.HttpContext, data []byte) ([]byte, error) {
|
||||
var resp chatCompletionResponse
|
||||
if err := json.Unmarshal(data, &resp); err != nil {
|
||||
return data, nil // not a valid chat completion chunk, skip
|
||||
}
|
||||
modified := false
|
||||
for i := range resp.Choices {
|
||||
msg := resp.Choices[i].Delta
|
||||
if msg == nil {
|
||||
continue
|
||||
}
|
||||
if promoteStreamingThinkingOnEmpty(ctx, msg) {
|
||||
modified = true
|
||||
}
|
||||
}
|
||||
if !modified {
|
||||
return data, nil
|
||||
}
|
||||
return json.Marshal(resp)
|
||||
}
|
||||
|
||||
// PromoteStreamingThinkingFlush checks if the stream had no content and returns
|
||||
// an SSE chunk that emits the buffered reasoning as content. Returns nil if
|
||||
// content was already seen or no reasoning was buffered.
|
||||
func PromoteStreamingThinkingFlush(ctx wrapper.HttpContext) []byte {
|
||||
hasContentDelta, _ := ctx.GetContext(ctxKeyHasContentDelta).(bool)
|
||||
if hasContentDelta {
|
||||
return nil
|
||||
}
|
||||
buffered, _ := ctx.GetContext(ctxKeyBufferedReasoning).(string)
|
||||
if buffered == "" {
|
||||
return nil
|
||||
}
|
||||
// Build a minimal chat.completion.chunk with the buffered reasoning as content
|
||||
resp := chatCompletionResponse{
|
||||
Object: objectChatCompletionChunk,
|
||||
Choices: []chatCompletionChoice{
|
||||
{
|
||||
Index: 0,
|
||||
Delta: &chatMessage{
|
||||
Content: buffered,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
data, err := json.Marshal(resp)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
// Format as SSE
|
||||
return []byte("data: " + string(data) + "\n\n")
|
||||
}
|
||||
|
||||
@@ -178,6 +178,8 @@ const (
|
||||
ctxKeyPushedMessage = "pushedMessage"
|
||||
ctxKeyContentPushed = "contentPushed"
|
||||
ctxKeyReasoningContentPushed = "reasoningContentPushed"
|
||||
ctxKeyHasContentDelta = "hasContentDelta"
|
||||
ctxKeyBufferedReasoning = "bufferedReasoning"
|
||||
|
||||
objectChatCompletion = "chat.completion"
|
||||
objectChatCompletionChunk = "chat.completion.chunk"
|
||||
@@ -474,6 +476,12 @@ type ProviderConfig struct {
|
||||
// @Title zh-CN 合并连续同角色消息
|
||||
// @Description zh-CN 开启后,若请求的 messages 中存在连续的同角色消息(如连续两条 user 消息),将其内容合并为一条,以满足要求严格轮流交替(user→assistant→user→...)的模型服务商的要求。
|
||||
mergeConsecutiveMessages bool `required:"false" yaml:"mergeConsecutiveMessages" json:"mergeConsecutiveMessages"`
|
||||
// @Title zh-CN 空内容时提升思考为正文
|
||||
// @Description zh-CN 开启后,若模型响应只包含 reasoning_content/thinking 而没有正文内容,将 reasoning 内容提升为正文内容返回,避免客户端收到空回复。
|
||||
promoteThinkingOnEmpty bool `required:"false" yaml:"promoteThinkingOnEmpty" json:"promoteThinkingOnEmpty"`
|
||||
// @Title zh-CN HiClaw 模式
|
||||
// @Description zh-CN 开启后同时启用 mergeConsecutiveMessages 和 promoteThinkingOnEmpty,适用于 HiClaw 多 Agent 协作场景。
|
||||
hiclawMode bool `required:"false" yaml:"hiclawMode" json:"hiclawMode"`
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) GetId() string {
|
||||
@@ -699,6 +707,12 @@ func (c *ProviderConfig) FromJson(json gjson.Result) {
|
||||
}
|
||||
}
|
||||
c.mergeConsecutiveMessages = json.Get("mergeConsecutiveMessages").Bool()
|
||||
c.promoteThinkingOnEmpty = json.Get("promoteThinkingOnEmpty").Bool()
|
||||
c.hiclawMode = json.Get("hiclawMode").Bool()
|
||||
if c.hiclawMode {
|
||||
c.mergeConsecutiveMessages = true
|
||||
c.promoteThinkingOnEmpty = true
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) Validate() error {
|
||||
@@ -833,6 +847,10 @@ func (c *ProviderConfig) IsOriginal() bool {
|
||||
return c.protocol == protocolOriginal
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) GetPromoteThinkingOnEmpty() bool {
|
||||
return c.promoteThinkingOnEmpty
|
||||
}
|
||||
|
||||
func (c *ProviderConfig) ReplaceByCustomSettings(body []byte) ([]byte, error) {
|
||||
return ReplaceByCustomSettings(body, c.customSettings)
|
||||
}
|
||||
|
||||
50
plugins/wasm-go/extensions/ai-proxy/test/mock_context.go
Normal file
50
plugins/wasm-go/extensions/ai-proxy/test/mock_context.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package test
|
||||
|
||||
import "github.com/higress-group/wasm-go/pkg/iface"
|
||||
|
||||
// MockHttpContext is a minimal mock for wrapper.HttpContext used in unit tests
|
||||
// that call provider functions directly (e.g. streaming thinking promotion).
|
||||
type MockHttpContext struct {
|
||||
contextMap map[string]interface{}
|
||||
}
|
||||
|
||||
func NewMockHttpContext() *MockHttpContext {
|
||||
return &MockHttpContext{contextMap: make(map[string]interface{})}
|
||||
}
|
||||
|
||||
func (m *MockHttpContext) SetContext(key string, value interface{}) { m.contextMap[key] = value }
|
||||
func (m *MockHttpContext) GetContext(key string) interface{} { return m.contextMap[key] }
|
||||
func (m *MockHttpContext) GetBoolContext(key string, def bool) bool { return def }
|
||||
func (m *MockHttpContext) GetStringContext(key, def string) string { return def }
|
||||
func (m *MockHttpContext) GetByteSliceContext(key string, def []byte) []byte { return def }
|
||||
func (m *MockHttpContext) Scheme() string { return "" }
|
||||
func (m *MockHttpContext) Host() string { return "" }
|
||||
func (m *MockHttpContext) Path() string { return "" }
|
||||
func (m *MockHttpContext) Method() string { return "" }
|
||||
func (m *MockHttpContext) GetUserAttribute(key string) interface{} { return nil }
|
||||
func (m *MockHttpContext) SetUserAttribute(key string, value interface{}) {}
|
||||
func (m *MockHttpContext) SetUserAttributeMap(kvmap map[string]interface{}) {}
|
||||
func (m *MockHttpContext) GetUserAttributeMap() map[string]interface{} { return nil }
|
||||
func (m *MockHttpContext) WriteUserAttributeToLog() error { return nil }
|
||||
func (m *MockHttpContext) WriteUserAttributeToLogWithKey(key string) error { return nil }
|
||||
func (m *MockHttpContext) WriteUserAttributeToTrace() error { return nil }
|
||||
func (m *MockHttpContext) DontReadRequestBody() {}
|
||||
func (m *MockHttpContext) DontReadResponseBody() {}
|
||||
func (m *MockHttpContext) BufferRequestBody() {}
|
||||
func (m *MockHttpContext) BufferResponseBody() {}
|
||||
func (m *MockHttpContext) NeedPauseStreamingResponse() {}
|
||||
func (m *MockHttpContext) PushBuffer(buffer []byte) {}
|
||||
func (m *MockHttpContext) PopBuffer() []byte { return nil }
|
||||
func (m *MockHttpContext) BufferQueueSize() int { return 0 }
|
||||
func (m *MockHttpContext) DisableReroute() {}
|
||||
func (m *MockHttpContext) SetRequestBodyBufferLimit(byteSize uint32) {}
|
||||
func (m *MockHttpContext) SetResponseBodyBufferLimit(byteSize uint32) {}
|
||||
func (m *MockHttpContext) RouteCall(method, url string, headers [][2]string, body []byte, callback iface.RouteResponseCallback) error {
|
||||
return nil
|
||||
}
|
||||
func (m *MockHttpContext) GetExecutionPhase() iface.HTTPExecutionPhase { return 0 }
|
||||
func (m *MockHttpContext) HasRequestBody() bool { return false }
|
||||
func (m *MockHttpContext) HasResponseBody() bool { return false }
|
||||
func (m *MockHttpContext) IsWebsocket() bool { return false }
|
||||
func (m *MockHttpContext) IsBinaryRequestBody() bool { return false }
|
||||
func (m *MockHttpContext) IsBinaryResponseBody() bool { return false }
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/provider"
|
||||
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
|
||||
"github.com/higress-group/wasm-go/pkg/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -997,3 +998,158 @@ func RunOpenAIOnStreamingResponseBodyTests(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// Test config: OpenAI provider with promoteThinkingOnEmpty enabled.
var openAIPromoteThinkingConfig = func() json.RawMessage {
	providerSettings := map[string]any{
		"type":                   "openai",
		"apiTokens":              []string{"sk-openai-test123456789"},
		"promoteThinkingOnEmpty": true,
	}
	// Marshalling a map of plain literals cannot fail, so the error is ignored.
	encoded, _ := json.Marshal(map[string]any{"provider": providerSettings})
	return encoded
}()
|
||||
|
||||
// Test config: OpenAI provider with hiclawMode enabled.
var openAIHiclawModeConfig = func() json.RawMessage {
	providerSettings := map[string]any{
		"type":       "openai",
		"apiTokens":  []string{"sk-openai-test123456789"},
		"hiclawMode": true,
	}
	// Marshalling a map of plain literals cannot fail, so the error is ignored.
	encoded, _ := json.Marshal(map[string]any{"provider": providerSettings})
	return encoded
}()
|
||||
|
||||
func RunOpenAIPromoteThinkingOnEmptyTests(t *testing.T) {
|
||||
// Config parsing tests via host framework
|
||||
test.RunGoTest(t, func(t *testing.T) {
|
||||
t.Run("promoteThinkingOnEmpty config parses", func(t *testing.T) {
|
||||
host, status := test.NewTestHost(openAIPromoteThinkingConfig)
|
||||
defer host.Reset()
|
||||
require.Equal(t, types.OnPluginStartStatusOK, status)
|
||||
config, err := host.GetMatchConfig()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, config)
|
||||
})
|
||||
|
||||
t.Run("hiclawMode config parses", func(t *testing.T) {
|
||||
host, status := test.NewTestHost(openAIHiclawModeConfig)
|
||||
defer host.Reset()
|
||||
require.Equal(t, types.OnPluginStartStatusOK, status)
|
||||
config, err := host.GetMatchConfig()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, config)
|
||||
})
|
||||
})
|
||||
|
||||
// Non-streaming promote logic tests via provider functions directly
|
||||
t.Run("promotes reasoning_content when content is empty string", func(t *testing.T) {
|
||||
body := []byte(`{"choices":[{"index":0,"message":{"role":"assistant","content":"","reasoning_content":"这是思考内容"},"finish_reason":"stop"}]}`)
|
||||
result, err := provider.PromoteThinkingOnEmptyResponse(body)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, string(result), `"content":"这是思考内容"`)
|
||||
require.NotContains(t, string(result), `"reasoning_content":"这是思考内容"`)
|
||||
})
|
||||
|
||||
t.Run("promotes reasoning_content when content is nil", func(t *testing.T) {
|
||||
body := []byte(`{"choices":[{"index":0,"message":{"role":"assistant","reasoning_content":"思考结果"},"finish_reason":"stop"}]}`)
|
||||
result, err := provider.PromoteThinkingOnEmptyResponse(body)
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, string(result), `"content":"思考结果"`)
|
||||
})
|
||||
|
||||
t.Run("no promotion when content is present", func(t *testing.T) {
|
||||
body := []byte(`{"choices":[{"index":0,"message":{"role":"assistant","content":"正常回复","reasoning_content":"思考过程"},"finish_reason":"stop"}]}`)
|
||||
result, err := provider.PromoteThinkingOnEmptyResponse(body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, string(body), string(result))
|
||||
})
|
||||
|
||||
t.Run("no promotion when no reasoning", func(t *testing.T) {
|
||||
body := []byte(`{"choices":[{"index":0,"message":{"role":"assistant","content":"正常回复"},"finish_reason":"stop"}]}`)
|
||||
result, err := provider.PromoteThinkingOnEmptyResponse(body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, string(body), string(result))
|
||||
})
|
||||
|
||||
t.Run("no promotion when both empty", func(t *testing.T) {
|
||||
body := []byte(`{"choices":[{"index":0,"message":{"role":"assistant","content":""},"finish_reason":"stop"}]}`)
|
||||
result, err := provider.PromoteThinkingOnEmptyResponse(body)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, string(body), string(result))
|
||||
})
|
||||
|
||||
t.Run("invalid json returns error", func(t *testing.T) {
|
||||
body := []byte(`not json`)
|
||||
result, err := provider.PromoteThinkingOnEmptyResponse(body)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, string(body), string(result))
|
||||
})
|
||||
}
|
||||
|
||||
func RunOpenAIPromoteThinkingOnEmptyStreamingTests(t *testing.T) {
|
||||
// Streaming tests use provider functions directly since the test framework
|
||||
// does not expose GetStreamingResponseBody.
|
||||
t.Run("streaming: buffers reasoning and flushes on end when no content", func(t *testing.T) {
|
||||
ctx := NewMockHttpContext()
|
||||
// Chunk with only reasoning_content
|
||||
data := []byte(`{"choices":[{"index":0,"delta":{"reasoning_content":"流式思考"}}]}`)
|
||||
result, err := provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data)
|
||||
require.NoError(t, err)
|
||||
// Reasoning should be stripped (not promoted inline)
|
||||
require.NotContains(t, string(result), `"content":"流式思考"`)
|
||||
|
||||
// Flush should emit buffered reasoning as content
|
||||
flush := provider.PromoteStreamingThinkingFlush(ctx)
|
||||
require.NotNil(t, flush)
|
||||
require.Contains(t, string(flush), `"content":"流式思考"`)
|
||||
})
|
||||
|
||||
t.Run("streaming: no flush when content was seen", func(t *testing.T) {
|
||||
ctx := NewMockHttpContext()
|
||||
// First chunk: content delta
|
||||
data1 := []byte(`{"choices":[{"index":0,"delta":{"content":"正文"}}]}`)
|
||||
_, _ = provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data1)
|
||||
|
||||
// Second chunk: reasoning only
|
||||
data2 := []byte(`{"choices":[{"index":0,"delta":{"reasoning_content":"后续思考"}}]}`)
|
||||
result, err := provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data2)
|
||||
require.NoError(t, err)
|
||||
// Should be unchanged since content was already seen
|
||||
require.Equal(t, string(data2), string(result))
|
||||
|
||||
// Flush should return nil since content was seen
|
||||
flush := provider.PromoteStreamingThinkingFlush(ctx)
|
||||
require.Nil(t, flush)
|
||||
})
|
||||
|
||||
t.Run("streaming: accumulates multiple reasoning chunks", func(t *testing.T) {
|
||||
ctx := NewMockHttpContext()
|
||||
data1 := []byte(`{"choices":[{"index":0,"delta":{"reasoning_content":"第一段"}}]}`)
|
||||
_, _ = provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data1)
|
||||
|
||||
data2 := []byte(`{"choices":[{"index":0,"delta":{"reasoning_content":"第二段"}}]}`)
|
||||
_, _ = provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data2)
|
||||
|
||||
flush := provider.PromoteStreamingThinkingFlush(ctx)
|
||||
require.NotNil(t, flush)
|
||||
require.Contains(t, string(flush), `"content":"第一段第二段"`)
|
||||
})
|
||||
|
||||
t.Run("streaming: no flush when no reasoning buffered", func(t *testing.T) {
|
||||
ctx := NewMockHttpContext()
|
||||
flush := provider.PromoteStreamingThinkingFlush(ctx)
|
||||
require.Nil(t, flush)
|
||||
})
|
||||
|
||||
t.Run("streaming: invalid json returns original", func(t *testing.T) {
|
||||
ctx := NewMockHttpContext()
|
||||
data := []byte(`not json`)
|
||||
result, err := provider.PromoteStreamingThinkingOnEmptyChunk(ctx, data)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, string(data), string(result))
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user