// Source file: higress/plugins/wasm-go/extensions/ai-statistics/main_test.go
// Copyright (c) 2024 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"encoding/json"
"testing"
"time"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
"github.com/higress-group/wasm-go/pkg/test"
"github.com/stretchr/testify/require"
)
// 测试配置:基本统计配置
var basicConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"attributes": []map[string]interface{}{
{
"key": "request_id",
"value_source": "request_header",
"value": "x-request-id",
"apply_to_log": true,
"apply_to_span": false,
"as_separate_log_field": false,
},
{
"key": "api_version",
"value_source": "fixed_value",
"value": "v1",
"apply_to_log": true,
"apply_to_span": true,
"as_separate_log_field": false,
},
{
"key": "model",
"value_source": "request_body",
"value": "model",
"apply_to_log": true,
"apply_to_span": true,
"as_separate_log_field": false,
},
{
"key": "input_token",
"value_source": "response_body",
"value": "usage.prompt_tokens",
"apply_to_log": true,
"apply_to_span": true,
"as_separate_log_field": false,
},
{
"key": "output_token",
"value_source": "response_body",
"value": "usage.completion_tokens",
"apply_to_log": true,
"apply_to_span": true,
"as_separate_log_field": false,
},
{
"key": "total_token",
"value_source": "response_body",
"value": "usage.total_tokens",
"apply_to_log": true,
"apply_to_span": true,
"as_separate_log_field": false,
},
},
"disable_openai_usage": false,
})
return data
}()
// 测试配置:流式响应体属性配置
var streamingBodyConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"attributes": []map[string]interface{}{
{
"key": "response_content",
"value_source": "response_streaming_body",
"value": "choices.0.message.content",
"rule": "first",
"apply_to_log": true,
"apply_to_span": false,
"as_separate_log_field": false,
},
{
"key": "model_name",
"value_source": "response_streaming_body",
"value": "model",
"rule": "replace",
"apply_to_log": true,
"apply_to_span": true,
"as_separate_log_field": false,
},
},
"disable_openai_usage": false,
})
return data
}()
// 测试配置:请求体属性配置
var requestBodyConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"attributes": []map[string]interface{}{
{
"key": "user_message_count",
"value_source": "request_body",
"value": "messages.#(role==\"user\")",
"apply_to_log": true,
"apply_to_span": false,
"as_separate_log_field": false,
},
{
"key": "request_model",
"value_source": "request_body",
"value": "model",
"apply_to_log": true,
"apply_to_span": true,
"as_separate_log_field": false,
},
},
"disable_openai_usage": false,
})
return data
}()
// 测试配置:响应体属性配置
var responseBodyConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"attributes": []map[string]interface{}{
{
"key": "response_status",
"value_source": "response_body",
"value": "status",
"apply_to_log": true,
"apply_to_span": false,
"as_separate_log_field": false,
},
{
"key": "response_message",
"value_source": "response_body",
"value": "message",
"apply_to_log": true,
"apply_to_span": true,
"as_separate_log_field": false,
},
},
"disable_openai_usage": false,
})
return data
}()
// 测试配置:禁用 OpenAI 使用统计
var disableOpenaiUsageConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"attributes": []map[string]interface{}{
{
"key": "custom_attribute",
"value_source": "fixed_value",
"value": "custom_value",
"apply_to_log": true,
"apply_to_span": false,
"as_separate_log_field": false,
},
},
"disable_openai_usage": true,
})
return data
}()
// 测试配置:空属性配置
var emptyAttributesConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"attributes": []map[string]interface{}{},
"disable_openai_usage": false,
})
return data
}()
// TestParseConfig verifies that every test configuration variant parses
// successfully and yields a non-nil match config.
func TestParseConfig(t *testing.T) {
	test.RunGoTest(t, func(t *testing.T) {
		cases := []struct {
			name   string
			config json.RawMessage
		}{
			{"basic config", basicConfig},
			{"streaming body config", streamingBodyConfig},
			{"request body config", requestBodyConfig},
			{"response body config", responseBodyConfig},
			{"disable openai usage config", disableOpenaiUsageConfig},
			{"empty attributes config", emptyAttributesConfig},
		}
		for _, tc := range cases {
			t.Run(tc.name, func(t *testing.T) {
				host, status := test.NewTestHost(tc.config)
				defer host.Reset()
				// Plugin start must succeed for a valid configuration.
				require.Equal(t, types.OnPluginStartStatusOK, status)
				config, err := host.GetMatchConfig()
				require.NoError(t, err)
				require.NotNil(t, config)
			})
		}
	})
}
// TestOnHttpRequestHeaders verifies request-header processing with and
// without the x-mse-consumer header; the phase must never pause the stream.
func TestOnHttpRequestHeaders(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		cases := []struct {
			name    string
			headers [][2]string
		}{
			{
				name: "basic request headers",
				headers: [][2]string{
					{":authority", "example.com"},
					{":path", "/api/chat"},
					{":method", "POST"},
					{"x-request-id", "req-123"},
					{"x-mse-consumer", "consumer1"},
				},
			},
			{
				name: "request headers with consumer",
				headers: [][2]string{
					{":authority", "example.com"},
					{":path", "/api/chat"},
					{":method", "POST"},
					{"x-request-id", "req-456"},
					{"x-mse-consumer", "consumer2"},
				},
			},
			{
				name: "request headers without consumer",
				headers: [][2]string{
					{":authority", "example.com"},
					{":path", "/api/chat"},
					{":method", "POST"},
					{"x-request-id", "req-789"},
				},
			},
		}
		for _, tc := range cases {
			t.Run(tc.name, func(t *testing.T) {
				host, status := test.NewTestHost(basicConfig)
				defer host.Reset()
				require.Equal(t, types.OnPluginStartStatusOK, status)
				action := host.CallOnHttpRequestHeaders(tc.headers)
				// Header processing must continue the stream.
				require.Equal(t, types.ActionContinue, action)
				host.CompleteHttp()
			})
		}
	})
}
// TestOnHttpRequestBody verifies request-body processing for OpenAI-style,
// Google-Gemini-style and message-less payloads.
func TestOnHttpRequestBody(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		cases := []struct {
			name string
			path string
			body []byte
		}{
			{
				name: "basic request body",
				path: "/api/chat",
				body: []byte(`{
					"model": "gpt-3.5-turbo",
					"messages": [
						{"role": "user", "content": "Hello"},
						{"role": "assistant", "content": "Hi there"},
						{"role": "user", "content": "How are you?"}
					]
				}`),
			},
			{
				name: "gemini request body",
				path: "/v1/models/gemini-pro:generateContent",
				body: []byte(`{
					"contents": [
						{"role": "user", "parts": [{"text": "Hello"}]},
						{"parts": [{"text": "Hi there"}]}
					]
				}`),
			},
			{
				name: "request body without messages",
				path: "/api/chat",
				body: []byte(`{
					"model": "gpt-3.5-turbo",
					"temperature": 0.7
				}`),
			},
		}
		for _, tc := range cases {
			t.Run(tc.name, func(t *testing.T) {
				host, status := test.NewTestHost(requestBodyConfig)
				defer host.Reset()
				require.Equal(t, types.OnPluginStartStatusOK, status)
				// Request headers must be processed before the body.
				host.CallOnHttpRequestHeaders([][2]string{
					{":authority", "example.com"},
					{":path", tc.path},
					{":method", "POST"},
				})
				// Body processing must continue the stream.
				require.Equal(t, types.ActionContinue, host.CallOnHttpRequestBody(tc.body))
				host.CompleteHttp()
			})
		}
	})
}
// TestOnHttpResponseHeaders verifies response-header processing for plain
// JSON and SSE (streaming) content types.
func TestOnHttpResponseHeaders(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		cases := []struct {
			name        string
			config      json.RawMessage
			contentType string
		}{
			{"basic response headers", basicConfig, "application/json"},
			{"streaming response headers", streamingBodyConfig, "text/event-stream"},
		}
		for _, tc := range cases {
			t.Run(tc.name, func(t *testing.T) {
				host, status := test.NewTestHost(tc.config)
				defer host.Reset()
				require.Equal(t, types.OnPluginStartStatusOK, status)
				// Request headers must be processed first.
				host.CallOnHttpRequestHeaders([][2]string{
					{":authority", "example.com"},
					{":path", "/api/chat"},
					{":method", "POST"},
				})
				action := host.CallOnHttpResponseHeaders([][2]string{
					{":status", "200"},
					{"content-type", tc.contentType},
				})
				// Header processing must continue the stream.
				require.Equal(t, types.ActionContinue, action)
				host.CompleteHttp()
			})
		}
	})
}
// TestOnHttpStreamingBody verifies that streaming (SSE) response chunks are
// forwarded unmodified, with and without token-usage information.
func TestOnHttpStreamingBody(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		type chunk struct {
			data   []byte
			isLast bool
		}
		runCase := func(t *testing.T, chunks []chunk) {
			host, status := test.NewTestHost(streamingBodyConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// Request headers, then streaming response headers.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/api/chat"},
				{":method", "POST"},
			})
			host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "text/event-stream"},
			})
			for _, c := range chunks {
				action := host.CallOnHttpStreamingResponseBody(c.data, c.isLast)
				// Each chunk must continue the stream and pass through unmodified.
				require.Equal(t, types.ActionContinue, action)
				require.Equal(t, c.data, host.GetResponseBody())
			}
			host.CompleteHttp()
		}
		t.Run("streaming response body", func(t *testing.T) {
			runCase(t, []chunk{
				{[]byte(`data: {"choices":[{"message":{"content":"Hello"}}],"model":"gpt-3.5-turbo"}`), false},
				{[]byte(`data: {"choices":[{"message":{"content":"How can I help you?"}}],"model":"gpt-3.5-turbo"}`), true},
			})
		})
		t.Run("streaming body without token usage", func(t *testing.T) {
			runCase(t, []chunk{
				{[]byte(`data: {"message": "Hello world"}`), true},
			})
		})
	})
}
// TestOnHttpResponseBody verifies non-streaming response-body processing
// with and without OpenAI usage statistics present.
func TestOnHttpResponseBody(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		runCase := func(t *testing.T, body []byte) {
			host, status := test.NewTestHost(responseBodyConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// Request headers, then JSON response headers.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/api/chat"},
				{":method", "POST"},
			})
			host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "application/json"},
			})
			// Body processing must continue the stream.
			require.Equal(t, types.ActionContinue, host.CallOnHttpResponseBody(body))
			host.CompleteHttp()
		}
		t.Run("basic response body", func(t *testing.T) {
			runCase(t, []byte(`{
				"status": "success",
				"message": "Hello, how can I help you?",
				"choices": [{"message": {"content": "Hello"}}],
				"usage": {"prompt_tokens": 10, "completion_tokens": 15, "total_tokens": 25},
				"model": "gpt-3.5-turbo"
			}`))
		})
		t.Run("response body without token usage", func(t *testing.T) {
			runCase(t, []byte(`{
				"status": "success",
				"message": "Hello world"
			}`))
		})
	})
}
// TestMetrics verifies the counter metrics emitted by the plugin
// (input/output/total tokens, service duration and request counts) for both
// non-streaming and streaming completions. Metric names embed the route,
// cluster, model and consumer values set up in each subtest.
func TestMetrics(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		// Token-usage metrics for a regular (non-streaming) completion.
		t.Run("test token usage metrics", func(t *testing.T) {
			host, status := test.NewTestHost(basicConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// Route and cluster names become part of the metric names checked below.
			host.SetRouteName("api-v1")
			host.SetClusterName("cluster-1")
			// 1. Process request headers (consumer "user1" also keys the metrics).
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/api/chat"},
				{":method", "POST"},
				{"x-mse-consumer", "user1"},
			})
			// 2. Process the request body.
			requestBody := []byte(`{
				"model": "gpt-3.5-turbo",
				"messages": [{"role": "user", "content": "Hello"}]
			}`)
			host.CallOnHttpRequestBody(requestBody)
			// Sleep so llm_service_duration has a measurable, non-zero interval.
			time.Sleep(10 * time.Millisecond)
			// 3. Process the response body carrying the usage statistics.
			responseBody := []byte(`{
				"choices": [{"message": {"content": "Hello, how can I help you?"}}],
				"usage": {"prompt_tokens": 5, "completion_tokens": 8, "total_tokens": 13},
				"model": "gpt-3.5-turbo"
			}`)
			host.CallOnHttpResponseBody(responseBody)
			// 4. Complete the request.
			host.CompleteHttp()
			// 5. Verify the metric values.
			// Input token counter (matches prompt_tokens above).
			inputTokenMetric := "route.api-v1.upstream.cluster-1.model.gpt-3.5-turbo.consumer.user1.metric.input_token"
			inputTokenValue, err := host.GetCounterMetric(inputTokenMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(5), inputTokenValue)
			// Output token counter (matches completion_tokens above).
			outputTokenMetric := "route.api-v1.upstream.cluster-1.model.gpt-3.5-turbo.consumer.user1.metric.output_token"
			outputTokenValue, err := host.GetCounterMetric(outputTokenMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(8), outputTokenValue)
			// Total token counter (matches total_tokens above).
			totalTokenMetric := "route.api-v1.upstream.cluster-1.model.gpt-3.5-turbo.consumer.user1.metric.total_token"
			totalTokenValue, err := host.GetCounterMetric(totalTokenMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(13), totalTokenValue)
			// Service duration counter; positive thanks to the sleep above.
			serviceDurationMetric := "route.api-v1.upstream.cluster-1.model.gpt-3.5-turbo.consumer.user1.metric.llm_service_duration"
			serviceDurationValue, err := host.GetCounterMetric(serviceDurationMetric)
			require.NoError(t, err)
			require.Greater(t, serviceDurationValue, uint64(0))
			// Request count counter: exactly one completed request.
			durationCountMetric := "route.api-v1.upstream.cluster-1.model.gpt-3.5-turbo.consumer.user1.metric.llm_duration_count"
			durationCountValue, err := host.GetCounterMetric(durationCountMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(1), durationCountValue)
		})
		// Metrics for a streaming (SSE) completion.
		t.Run("test streaming metrics", func(t *testing.T) {
			host, status := test.NewTestHost(streamingBodyConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// Route and cluster names become part of the metric names checked below.
			host.SetRouteName("api-v1")
			host.SetClusterName("cluster-1")
			// 1. Process request headers.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/api/chat"},
				{":method", "POST"},
				{"x-mse-consumer", "user2"},
			})
			// 2. Process the request body.
			requestBody := []byte(`{
				"model": "gpt-4",
				"messages": [
					{"role": "user", "content": "Hello"}
				]
			}`)
			action := host.CallOnHttpRequestBody(requestBody)
			// Body processing must continue the stream.
			require.Equal(t, types.ActionContinue, action)
			// Sleep so llm_service_duration has a measurable interval.
			time.Sleep(10 * time.Millisecond)
			// 3. Process streaming response headers.
			action = host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "text/event-stream"},
			})
			// Header processing must continue the stream.
			require.Equal(t, types.ActionContinue, action)
			// 4. Process a streaming chunk that includes usage info.
			firstChunk := []byte(`data: {"choices":[{"message":{"content":"Hello"}}],"model":"gpt-4","usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}`)
			action = host.CallOnHttpStreamingResponseBody(firstChunk, false)
			// The chunk must be forwarded unmodified.
			require.Equal(t, types.ActionContinue, action)
			result := host.GetResponseBody()
			require.Equal(t, firstChunk, result)
			// 5. Process the final chunk; its usage values supersede the first chunk's.
			lastChunk := []byte(`data: {"choices":[{"message":{"content":"How can I help you?"}}],"model":"gpt-4","usage":{"prompt_tokens":5,"completion_tokens":8,"total_tokens":13}}`)
			action = host.CallOnHttpStreamingResponseBody(lastChunk, true)
			// The chunk must be forwarded unmodified.
			require.Equal(t, types.ActionContinue, action)
			result = host.GetResponseBody()
			require.Equal(t, lastChunk, result)
			// Sleep so llm_service_duration has a measurable interval.
			time.Sleep(10 * time.Millisecond)
			// 6. Complete the request.
			host.CompleteHttp()
			// 7. Verify the streaming metrics.
			// Time-to-first-token counter must be positive.
			firstTokenDurationMetric := "route.api-v1.upstream.cluster-1.model.gpt-4.consumer.user2.metric.llm_first_token_duration"
			firstTokenDurationValue, err := host.GetCounterMetric(firstTokenDurationMetric)
			require.NoError(t, err)
			require.Greater(t, firstTokenDurationValue, uint64(0))
			// Streaming request count: exactly one streamed completion.
			streamDurationCountMetric := "route.api-v1.upstream.cluster-1.model.gpt-4.consumer.user2.metric.llm_stream_duration_count"
			streamDurationCountValue, err := host.GetCounterMetric(streamDurationCountMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(1), streamDurationCountValue)
			// Service duration counter must be positive.
			serviceDurationMetric := "route.api-v1.upstream.cluster-1.model.gpt-4.consumer.user2.metric.llm_service_duration"
			serviceDurationValue, err := host.GetCounterMetric(serviceDurationMetric)
			require.NoError(t, err)
			require.Greater(t, serviceDurationValue, uint64(0))
			// Token counters reflect the LAST chunk's usage values.
			inputTokenMetric := "route.api-v1.upstream.cluster-1.model.gpt-4.consumer.user2.metric.input_token"
			inputTokenValue, err := host.GetCounterMetric(inputTokenMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(5), inputTokenValue)
			outputTokenMetric := "route.api-v1.upstream.cluster-1.model.gpt-4.consumer.user2.metric.output_token"
			outputTokenValue, err := host.GetCounterMetric(outputTokenMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(8), outputTokenValue)
			totalTokenMetric := "route.api-v1.upstream.cluster-1.model.gpt-4.consumer.user2.metric.total_token"
			totalTokenValue, err := host.GetCounterMetric(totalTokenMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(13), totalTokenValue)
		})
	})
}
// TestCompleteFlow runs the full request/response lifecycle end to end —
// headers, bodies, completion — and then checks the resulting metrics, for
// both a non-streaming and a streaming completion.
func TestCompleteFlow(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		// Full statistics flow for a non-streaming completion.
		t.Run("complete statistics flow", func(t *testing.T) {
			host, status := test.NewTestHost(basicConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// Route and cluster names become part of the metric names checked below.
			host.SetRouteName("api-v1")
			host.SetClusterName("cluster-1")
			// 1. Process request headers.
			action := host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/api/chat"},
				{":method", "POST"},
				{"x-request-id", "req-123"},
				{"x-mse-consumer", "consumer1"},
			})
			// Header processing must continue the stream.
			require.Equal(t, types.ActionContinue, action)
			// 2. Process the request body.
			requestBody := []byte(`{
				"model": "gpt-3.5-turbo",
				"messages": [
					{"role": "user", "content": "Hello"}
				]
			}`)
			action = host.CallOnHttpRequestBody(requestBody)
			// Body processing must continue the stream.
			require.Equal(t, types.ActionContinue, action)
			// Sleep so llm_service_duration has a measurable interval.
			time.Sleep(10 * time.Millisecond)
			// 3. Process response headers.
			action = host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "application/json"},
			})
			// Header processing must continue the stream.
			require.Equal(t, types.ActionContinue, action)
			// 4. Process the response body carrying the usage statistics.
			responseBody := []byte(`{
				"choices": [{"message": {"content": "Hello, how can I help you?"}}],
				"usage": {"prompt_tokens": 5, "completion_tokens": 8, "total_tokens": 13},
				"model": "gpt-3.5-turbo"
			}`)
			action = host.CallOnHttpResponseBody(responseBody)
			// Body processing must continue the stream.
			require.Equal(t, types.ActionContinue, action)
			// 5. Complete the request.
			host.CompleteHttp()
			// 6. Verify the metric values.
			// Input token counter (matches prompt_tokens above).
			inputTokenMetric := "route.api-v1.upstream.cluster-1.model.gpt-3.5-turbo.consumer.consumer1.metric.input_token"
			inputTokenValue, err := host.GetCounterMetric(inputTokenMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(5), inputTokenValue)
			// Output token counter (matches completion_tokens above).
			outputTokenMetric := "route.api-v1.upstream.cluster-1.model.gpt-3.5-turbo.consumer.consumer1.metric.output_token"
			outputTokenValue, err := host.GetCounterMetric(outputTokenMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(8), outputTokenValue)
			// Total token counter (matches total_tokens above).
			totalTokenMetric := "route.api-v1.upstream.cluster-1.model.gpt-3.5-turbo.consumer.consumer1.metric.total_token"
			totalTokenValue, err := host.GetCounterMetric(totalTokenMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(13), totalTokenValue)
			// Service duration counter; positive thanks to the sleep above.
			serviceDurationMetric := "route.api-v1.upstream.cluster-1.model.gpt-3.5-turbo.consumer.consumer1.metric.llm_service_duration"
			serviceDurationValue, err := host.GetCounterMetric(serviceDurationMetric)
			require.NoError(t, err)
			require.Greater(t, serviceDurationValue, uint64(0))
			// Request count counter: exactly one completed request.
			durationCountMetric := "route.api-v1.upstream.cluster-1.model.gpt-3.5-turbo.consumer.consumer1.metric.llm_duration_count"
			durationCountValue, err := host.GetCounterMetric(durationCountMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(1), durationCountValue)
		})
		// Full statistics flow for a streaming completion.
		t.Run("complete streaming flow", func(t *testing.T) {
			host, status := test.NewTestHost(streamingBodyConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// Route and cluster names become part of the metric names checked below.
			host.SetRouteName("api-v1")
			host.SetClusterName("cluster-1")
			// 1. Process request headers.
			action := host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/api/chat"},
				{":method", "POST"},
				{"x-mse-consumer", "consumer2"},
			})
			// Header processing must continue the stream.
			require.Equal(t, types.ActionContinue, action)
			// 2. Process the request body.
			requestBody := []byte(`{
				"model": "gpt-4",
				"messages": [
					{"role": "user", "content": "Hello"}
				]
			}`)
			action = host.CallOnHttpRequestBody(requestBody)
			// Body processing must continue the stream.
			require.Equal(t, types.ActionContinue, action)
			// Sleep so llm_service_duration has a measurable interval.
			time.Sleep(10 * time.Millisecond)
			// 3. Process streaming response headers.
			action = host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "text/event-stream"},
			})
			// Header processing must continue the stream.
			require.Equal(t, types.ActionContinue, action)
			// 4. Process a streaming chunk that includes usage info.
			firstChunk := []byte(`data: {"choices":[{"message":{"content":"Hello"}}],"model":"gpt-4","usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}`)
			action = host.CallOnHttpStreamingResponseBody(firstChunk, false)
			// The chunk must be forwarded unmodified.
			require.Equal(t, types.ActionContinue, action)
			result := host.GetResponseBody()
			require.Equal(t, firstChunk, result)
			// 5. Process the final chunk with the final usage values.
			lastChunk := []byte(`data: {"choices":[{"message":{"content":"How can I help you?"}}],"model":"gpt-4","usage":{"prompt_tokens":5,"completion_tokens":8,"total_tokens":13}}`)
			action = host.CallOnHttpStreamingResponseBody(lastChunk, true)
			// The chunk must be forwarded unmodified.
			require.Equal(t, types.ActionContinue, action)
			result = host.GetResponseBody()
			require.Equal(t, lastChunk, result)
			// Sleep so llm_service_duration has a measurable interval.
			time.Sleep(10 * time.Millisecond)
			// 6. Complete the request.
			host.CompleteHttp()
			// 7. Verify the streaming metrics.
			// Time-to-first-token counter must be positive.
			firstTokenDurationMetric := "route.api-v1.upstream.cluster-1.model.gpt-4.consumer.consumer2.metric.llm_first_token_duration"
			firstTokenDurationValue, err := host.GetCounterMetric(firstTokenDurationMetric)
			require.NoError(t, err)
			require.Greater(t, firstTokenDurationValue, uint64(0))
			// Streaming request count: exactly one streamed completion.
			streamDurationCountMetric := "route.api-v1.upstream.cluster-1.model.gpt-4.consumer.consumer2.metric.llm_stream_duration_count"
			streamDurationCountValue, err := host.GetCounterMetric(streamDurationCountMetric)
			require.NoError(t, err)
			require.Equal(t, uint64(1), streamDurationCountValue)
			// Service duration counter must be positive.
			serviceDurationMetric := "route.api-v1.upstream.cluster-1.model.gpt-4.consumer.consumer2.metric.llm_service_duration"
			serviceDurationValue, err := host.GetCounterMetric(serviceDurationMetric)
			require.NoError(t, err)
			require.Greater(t, serviceDurationValue, uint64(0))
		})
	})
}
// ==================== Built-in Attributes Tests ====================
// 测试配置:历史兼容配置(显式配置 value_source 和 value
var legacyQuestionAnswerConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"attributes": []map[string]interface{}{
{
"key": "question",
"value_source": "request_body",
"value": "messages.@reverse.0.content",
"apply_to_log": true,
},
{
"key": "answer",
"value_source": "response_streaming_body",
"value": "choices.0.delta.content",
"rule": "append",
"apply_to_log": true,
},
{
"key": "answer",
"value_source": "response_body",
"value": "choices.0.message.content",
"apply_to_log": true,
},
},
})
return data
}()
// 测试配置:内置属性简化配置(不配置 value_source 和 value
var builtinAttributesConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"attributes": []map[string]interface{}{
{
"key": "question",
"apply_to_log": true,
},
{
"key": "answer",
"apply_to_log": true,
},
{
"key": "reasoning",
"apply_to_log": true,
},
{
"key": "tool_calls",
"apply_to_log": true,
},
},
})
return data
}()
// 测试配置session_id 配置
var sessionIdConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"session_id_header": "x-custom-session",
"attributes": []map[string]interface{}{
{
"key": "question",
"apply_to_log": true,
},
{
"key": "answer",
"apply_to_log": true,
},
},
})
return data
}()
// TestLegacyConfigCompatibility verifies that legacy configurations — with
// value_source and value spelled out explicitly for question/answer — still
// work for both non-streaming and streaming responses.
func TestLegacyConfigCompatibility(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		// question/answer configured with explicit value_source and value.
		t.Run("legacy question answer config", func(t *testing.T) {
			host, status := test.NewTestHost(legacyQuestionAnswerConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// 1. Process request headers.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})
			// 2. Process the request body; "question" is extracted from the
			// last message via messages.@reverse.0.content.
			requestBody := []byte(`{
				"model": "gpt-4",
				"messages": [
					{"role": "system", "content": "You are a helpful assistant."},
					{"role": "user", "content": "What is 2+2?"}
				]
			}`)
			action := host.CallOnHttpRequestBody(requestBody)
			require.Equal(t, types.ActionContinue, action)
			// 3. Process response headers (non-streaming).
			host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "application/json"},
			})
			// 4. Process the response body; "answer" comes from
			// choices.0.message.content.
			responseBody := []byte(`{
				"choices": [{"message": {"role": "assistant", "content": "2+2 equals 4."}}],
				"model": "gpt-4",
				"usage": {"prompt_tokens": 20, "completion_tokens": 10, "total_tokens": 30}
			}`)
			action = host.CallOnHttpResponseBody(responseBody)
			require.Equal(t, types.ActionContinue, action)
			host.CompleteHttp()
		})
		// Streaming response under the same explicit legacy configuration;
		// the "answer" attribute uses rule "append" over delta chunks.
		t.Run("legacy streaming answer config", func(t *testing.T) {
			host, status := test.NewTestHost(legacyQuestionAnswerConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// 1. Process request headers.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})
			// 2. Process the request body.
			requestBody := []byte(`{
				"model": "gpt-4",
				"stream": true,
				"messages": [{"role": "user", "content": "Hello"}]
			}`)
			host.CallOnHttpRequestBody(requestBody)
			// 3. Process streaming response headers.
			host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "text/event-stream"},
			})
			// 4. Process the streaming response body chunks.
			chunk1 := []byte(`data: {"choices":[{"delta":{"content":"Hello"}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk1, false)
			chunk2 := []byte(`data: {"choices":[{"delta":{"content":" there!"}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk2, true)
			host.CompleteHttp()
		})
	})
}
// TestBuiltinAttributesDefaultSource verifies that built-in attributes
// (question, answer, reasoning, tool_calls) configured with only a key get
// a sensible default value_source: request_body for the question, and the
// response (streaming or non-streaming) body for the rest.
func TestBuiltinAttributesDefaultSource(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		// Built-in attributes with no value_source, non-streaming response.
		t.Run("builtin attributes non-streaming", func(t *testing.T) {
			host, status := test.NewTestHost(builtinAttributesConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// 1. Process request headers.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})
			// 2. Process the request body — "question" should be picked up
			// from the request_body automatically.
			requestBody := []byte(`{
				"model": "deepseek-reasoner",
				"messages": [
					{"role": "user", "content": "What is the capital of France?"}
				]
			}`)
			action := host.CallOnHttpRequestBody(requestBody)
			require.Equal(t, types.ActionContinue, action)
			// 3. Process response headers (non-streaming).
			host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "application/json"},
			})
			// 4. Process the response body — answer, reasoning and tool_calls
			// should be extracted from the response_body automatically.
			responseBody := []byte(`{
				"choices": [{
					"message": {
						"role": "assistant",
						"content": "The capital of France is Paris.",
						"reasoning_content": "The user is asking about geography. France is a country in Europe, and its capital city is Paris."
					}
				}],
				"model": "deepseek-reasoner",
				"usage": {"prompt_tokens": 15, "completion_tokens": 25, "total_tokens": 40}
			}`)
			action = host.CallOnHttpResponseBody(responseBody)
			require.Equal(t, types.ActionContinue, action)
			host.CompleteHttp()
		})
		// Built-in attributes with no value_source, streaming response.
		t.Run("builtin attributes streaming", func(t *testing.T) {
			host, status := test.NewTestHost(builtinAttributesConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// 1. Process request headers.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})
			// 2. Process the request body.
			requestBody := []byte(`{
				"model": "deepseek-reasoner",
				"stream": true,
				"messages": [{"role": "user", "content": "Tell me a joke"}]
			}`)
			host.CallOnHttpRequestBody(requestBody)
			// 3. Process streaming response headers.
			host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "text/event-stream"},
			})
			// 4. Process the streaming response body — answer and reasoning
			// should be assembled from response_streaming_body automatically.
			chunk1 := []byte(`data: {"choices":[{"delta":{"reasoning_content":"Let me think of a good joke..."}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk1, false)
			chunk2 := []byte(`data: {"choices":[{"delta":{"content":"Why did the chicken"}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk2, false)
			chunk3 := []byte(`data: {"choices":[{"delta":{"content":" cross the road?"}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk3, true)
			host.CompleteHttp()
		})
	})
}
// TestStreamingToolCalls verifies tool_calls parsing for both streaming
// (SSE) and non-streaming chat-completion responses.
func TestStreamingToolCalls(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		// A single tool call whose arguments are split across several SSE
		// chunks must be reassembled into one complete call.
		t.Run("streaming tool calls assembly", func(t *testing.T) {
			host, status := test.NewTestHost(builtinAttributesConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// 1. Process request headers.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})
			// 2. Process the request body.
			requestBody := []byte(`{
				"model": "gpt-4",
				"stream": true,
				"messages": [{"role": "user", "content": "What's the weather in Beijing?"}],
				"tools": [{"type": "function", "function": {"name": "get_weather"}}]
			}`)
			host.CallOnHttpRequestBody(requestBody)
			// 3. Process streaming response headers.
			host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "text/event-stream"},
			})
			// 4. Process the streaming response body, simulating fragmented tool_calls.
			// First chunk: the tool call's id and function name.
			chunk1 := []byte(`data: {"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_abc123","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk1, false)
			// Second chunk: first part of the arguments.
			chunk2 := []byte(`data: {"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"locat"}}]}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk2, false)
			// Third chunk: second part of the arguments.
			chunk3 := []byte(`data: {"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"ion\": \"Bei"}}]}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk3, false)
			// Fourth chunk: last part of the arguments.
			chunk4 := []byte(`data: {"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"jing\"}"}}]}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk4, false)
			// Final chunk: end of stream.
			chunk5 := []byte(`data: {"choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`)
			host.CallOnHttpStreamingResponseBody(chunk5, true)
			host.CompleteHttp()
		})
		// Several tool calls interleaved in the stream; chunks are keyed by
		// their "index" field.
		t.Run("multiple streaming tool calls", func(t *testing.T) {
			host, status := test.NewTestHost(builtinAttributesConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// 1. Process request headers.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})
			// 2. Process the request body.
			requestBody := []byte(`{
				"model": "gpt-4",
				"stream": true,
				"messages": [{"role": "user", "content": "Compare weather in Beijing and Shanghai"}]
			}`)
			host.CallOnHttpRequestBody(requestBody)
			// 3. Process streaming response headers.
			host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "text/event-stream"},
			})
			// 4. Process the streaming response body, simulating multiple tool_calls.
			// First tool call (index 0).
			chunk1 := []byte(`data: {"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_001","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk1, false)
			// Second tool call (index 1).
			chunk2 := []byte(`data: {"choices":[{"delta":{"tool_calls":[{"index":1,"id":"call_002","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk2, false)
			// Arguments for the first tool call.
			chunk3 := []byte(`data: {"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"location\":\"Beijing\"}"}}]}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk3, false)
			// Arguments for the second tool call.
			chunk4 := []byte(`data: {"choices":[{"delta":{"tool_calls":[{"index":1,"function":{"arguments":"{\"location\":\"Shanghai\"}"}}]}}]}`)
			host.CallOnHttpStreamingResponseBody(chunk4, false)
			// End of stream.
			chunk5 := []byte(`data: {"choices":[{"delta":{},"finish_reason":"tool_calls"}]}`)
			host.CallOnHttpStreamingResponseBody(chunk5, true)
			host.CompleteHttp()
		})
		// Non-streaming response with a complete tool_calls array.
		t.Run("non-streaming tool calls", func(t *testing.T) {
			host, status := test.NewTestHost(builtinAttributesConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// 1. Process request headers.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})
			// 2. Process the request body.
			requestBody := []byte(`{
				"model": "gpt-4",
				"messages": [{"role": "user", "content": "What's the weather?"}]
			}`)
			host.CallOnHttpRequestBody(requestBody)
			// 3. Process response headers.
			host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "application/json"},
			})
			// 4. Process the response body carrying non-streaming tool_calls.
			responseBody := []byte(`{
				"choices": [{
					"message": {
						"role": "assistant",
						"content": null,
						"tool_calls": [{
							"id": "call_abc123",
							"type": "function",
							"function": {
								"name": "get_weather",
								"arguments": "{\"location\": \"Beijing\"}"
							}
						}]
					},
					"finish_reason": "tool_calls"
				}],
				"model": "gpt-4",
				"usage": {"prompt_tokens": 20, "completion_tokens": 15, "total_tokens": 35}
			}`)
			action := host.CallOnHttpResponseBody(responseBody)
			require.Equal(t, types.ActionContinue, action)
			host.CompleteHttp()
		})
	})
}
// TestSessionIdExtraction verifies session_id extraction from request
// headers: a custom configured header, the default headers' priority
// order, and fallback when higher-priority headers are absent.
func TestSessionIdExtraction(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		// A session header configured explicitly via sessionIdConfig.
		t.Run("custom session id header", func(t *testing.T) {
			host, status := test.NewTestHost(sessionIdConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// Request headers carry the custom session header.
			action := host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
				{"x-custom-session", "sess_custom_123"},
			})
			require.Equal(t, types.ActionContinue, action)
			host.CompleteHttp()
		})
		// Priority among the default session-id headers.
		t.Run("default session id headers priority", func(t *testing.T) {
			host, status := test.NewTestHost(builtinAttributesConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// Several default session headers are present; the highest
			// priority one should be used.
			action := host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
				{"x-agent-session", "sess_agent_456"},
				{"x-clawdbot-session-key", "sess_clawdbot_789"},
				{"x-openclaw-session-key", "sess_openclaw_123"}, // highest priority
			})
			require.Equal(t, types.ActionContinue, action)
			host.CompleteHttp()
		})
		// Fallback to a lower-priority header when it is the only one set.
		t.Run("session id fallback", func(t *testing.T) {
			host, status := test.NewTestHost(builtinAttributesConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// Only the low-priority session header is present.
			action := host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
				{"x-agent-session", "sess_agent_only"},
			})
			require.Equal(t, types.ActionContinue, action)
			host.CompleteHttp()
		})
	})
}
// TestExtractStreamingToolCalls unit-tests extractStreamingToolCalls in
// isolation by replaying raw delta payloads and inspecting the tool calls
// assembled in the buffer.
func TestExtractStreamingToolCalls(t *testing.T) {
	// replay pushes each payload through the streaming buffer in order and
	// returns the final buffer state.
	replay := func(payloads ...string) *StreamingToolCallsBuffer {
		var buf *StreamingToolCallsBuffer
		for _, p := range payloads {
			buf = extractStreamingToolCalls([]byte(p), buf)
		}
		return buf
	}
	t.Run("single tool call assembly", func(t *testing.T) {
		// One tool call whose arguments arrive in three fragments.
		calls := getToolCallsFromBuffer(replay(
			`{"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_123","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}`,
			`{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"loc"}}]}}]}`,
			`{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"ation"}}]}}]}`,
			`{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"\":\"Beijing\"}"}}]}}]}`,
		))
		require.Len(t, calls, 1)
		require.Equal(t, "call_123", calls[0].ID)
		require.Equal(t, "function", calls[0].Type)
		require.Equal(t, "get_weather", calls[0].Function.Name)
		require.Equal(t, `{"location":"Beijing"}`, calls[0].Function.Arguments)
	})
	t.Run("multiple tool calls assembly", func(t *testing.T) {
		// Two interleaved tool calls, distinguished by their "index" field.
		calls := getToolCallsFromBuffer(replay(
			`{"choices":[{"delta":{"tool_calls":[{"index":0,"id":"call_001","type":"function","function":{"name":"get_weather","arguments":""}}]}}]}`,
			`{"choices":[{"delta":{"tool_calls":[{"index":1,"id":"call_002","type":"function","function":{"name":"get_time","arguments":""}}]}}]}`,
			`{"choices":[{"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"city\":\"Beijing\"}"}}]}}]}`,
			`{"choices":[{"delta":{"tool_calls":[{"index":1,"function":{"arguments":"{\"timezone\":\"UTC+8\"}"}}]}}]}`,
		))
		require.Len(t, calls, 2)
		// First assembled call.
		require.Equal(t, "call_001", calls[0].ID)
		require.Equal(t, "get_weather", calls[0].Function.Name)
		require.Equal(t, `{"city":"Beijing"}`, calls[0].Function.Arguments)
		// Second assembled call.
		require.Equal(t, "call_002", calls[1].ID)
		require.Equal(t, "get_time", calls[1].Function.Name)
		require.Equal(t, `{"timezone":"UTC+8"}`, calls[1].Function.Arguments)
	})
	t.Run("empty chunks", func(t *testing.T) {
		// Payloads without tool_calls must leave the buffer empty.
		calls := getToolCallsFromBuffer(replay(
			`{"choices":[{"delta":{}}]}`,
			`{"choices":[{"delta":{"content":"Hello"}}]}`,
		))
		require.Len(t, calls, 0)
	})
}
// TestBuiltinAttributeHelpers covers the helper functions that decide how
// the builtin attributes (question/answer/tool_calls/reasoning) are
// recognized and resolved to their default value sources.
func TestBuiltinAttributeHelpers(t *testing.T) {
	t.Run("isBuiltinAttribute", func(t *testing.T) {
		for _, key := range []string{"question", "answer", "tool_calls", "reasoning"} {
			require.True(t, isBuiltinAttribute(key))
		}
		for _, key := range []string{"custom_key", "model"} {
			require.False(t, isBuiltinAttribute(key))
		}
	})
	t.Run("getBuiltinAttributeDefaultSources", func(t *testing.T) {
		// question defaults to extraction from the request body only.
		require.Equal(t, []string{RequestBody}, getBuiltinAttributeDefaultSources("question"))
		// answer, tool_calls and reasoning each support both streaming and
		// non-streaming response bodies.
		for _, key := range []string{"answer", "tool_calls", "reasoning"} {
			sources := getBuiltinAttributeDefaultSources(key)
			require.Contains(t, sources, ResponseStreamingBody)
			require.Contains(t, sources, ResponseBody)
		}
		// Non-builtin attributes have no default sources.
		require.Nil(t, getBuiltinAttributeDefaultSources("custom_key"))
	})
	t.Run("shouldProcessBuiltinAttribute", func(t *testing.T) {
		// With an explicit value_source the match must be exact.
		require.True(t, shouldProcessBuiltinAttribute("question", RequestBody, RequestBody))
		require.False(t, shouldProcessBuiltinAttribute("question", RequestBody, ResponseBody))
		// Without a value_source, builtin attributes fall back to their
		// default sources.
		require.True(t, shouldProcessBuiltinAttribute("question", "", RequestBody))
		require.False(t, shouldProcessBuiltinAttribute("question", "", ResponseBody))
		require.True(t, shouldProcessBuiltinAttribute("answer", "", ResponseBody))
		require.True(t, shouldProcessBuiltinAttribute("answer", "", ResponseStreamingBody))
		require.False(t, shouldProcessBuiltinAttribute("answer", "", RequestBody))
		// Non-builtin attributes without a value_source are never processed.
		require.False(t, shouldProcessBuiltinAttribute("custom_key", "", RequestBody))
		require.False(t, shouldProcessBuiltinAttribute("custom_key", "", ResponseBody))
	})
}
// TestSessionIdDebugOutput demonstrates the session_id debug log output by
// driving a full request/response flow with a session header present.
func TestSessionIdDebugOutput(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		t.Run("session id with full flow", func(t *testing.T) {
			host, status := test.NewTestHost(sessionIdConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)
			// 1. Process request headers carrying the session_id.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
				{"x-custom-session", "sess_abc123xyz"},
			})
			// 2. Process the request body.
			requestBody := []byte(`{
				"model": "gpt-4",
				"messages": [
					{"role": "user", "content": "What is 2+2?"}
				]
			}`)
			host.CallOnHttpRequestBody(requestBody)
			// 3. Process response headers.
			host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "application/json"},
			})
			// 4. Process the response body.
			responseBody := []byte(`{
				"choices": [{"message": {"role": "assistant", "content": "2+2 equals 4."}}],
				"model": "gpt-4",
				"usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}
			}`)
			host.CallOnHttpResponseBody(responseBody)
			host.CompleteHttp()
		})
	})
}
// 测试配置Token Details 配置
var tokenDetailsConfig = func() json.RawMessage {
data, _ := json.Marshal(map[string]interface{}{
"attributes": []map[string]interface{}{
{
"key": "reasoning_tokens",
"apply_to_log": true,
},
{
"key": "cached_tokens",
"apply_to_log": true,
},
{
"key": "input_token_details",
"apply_to_log": true,
},
{
"key": "output_token_details",
"apply_to_log": true,
},
},
"disable_openai_usage": false,
})
return data
}()
// TestTokenDetails verifies the builtin token-detail attributes
// (reasoning_tokens, cached_tokens, input_token_details,
// output_token_details) configured in tokenDetailsConfig, using a response
// whose "usage" object carries prompt/completion token detail sections.
func TestTokenDetails(t *testing.T) {
	t.Run("test builtin token details attributes", func(t *testing.T) {
		host, status := test.NewTestHost(tokenDetailsConfig)
		defer host.Reset()
		require.Equal(t, types.OnPluginStartStatusOK, status)
		// Set route and cluster names on the mock host.
		host.SetRouteName("api-v1")
		host.SetClusterName("cluster-1")
		// 1. Process request headers.
		action := host.CallOnHttpRequestHeaders([][2]string{
			{":authority", "example.com"},
			{":path", "/v1/chat/completions"},
			{":method", "POST"},
		})
		require.Equal(t, types.ActionContinue, action)
		// 2. Process the request body.
		requestBody := []byte(`{
			"model": "gpt-4o",
			"messages": [
				{"role": "user", "content": "Test question"}
			]
		}`)
		action = host.CallOnHttpRequestBody(requestBody)
		require.Equal(t, types.ActionContinue, action)
		// 3. Process response headers.
		action = host.CallOnHttpResponseHeaders([][2]string{
			{":status", "200"},
			{"content-type", "application/json"},
		})
		require.Equal(t, types.ActionContinue, action)
		// 4. Process the response body (includes token details).
		responseBody := []byte(`{
			"id": "chatcmpl-123",
			"object": "chat.completion",
			"created": 1677652288,
			"model": "gpt-4o",
			"usage": {
				"prompt_tokens": 100,
				"completion_tokens": 50,
				"total_tokens": 150,
				"completion_tokens_details": {
					"reasoning_tokens": 25
				},
				"prompt_tokens_details": {
					"cached_tokens": 80
				}
			},
			"choices": [{
				"message": {
					"role": "assistant",
					"content": "Test answer"
				},
				"finish_reason": "stop"
			}]
		}`)
		action = host.CallOnHttpResponseBody(responseBody)
		require.Equal(t, types.ActionContinue, action)
		// 5. Complete the request.
		host.CompleteHttp()
	})
}