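// Copyright (c) 2024 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"encoding/json"
	"testing"

	"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
	"github.com/higress-group/wasm-go/pkg/test"
	"github.com/stretchr/testify/require"
)

// mustMarshalTestConfig encodes a plugin configuration fixture as JSON.
//
// It panics on an encoding error so that a broken fixture fails loudly at
// package initialisation instead of silently handing an empty config to the
// test host (the previous fixtures discarded the json.Marshal error).
func mustMarshalTestConfig(fields map[string]interface{}) json.RawMessage {
	data, err := json.Marshal(fields)
	if err != nil {
		panic("marshal test config: " + err.Error())
	}
	return data
}

// basicConfig is the baseline fixture: request and response checking are both
// enabled. The accessKey/secretKey values are dummy placeholders, not real
// credentials.
var basicConfig = mustMarshalTestConfig(map[string]interface{}{
	"serviceName":               "security-service",
	"servicePort":               8080,
	"serviceHost":               "security.example.com",
	"accessKey":                 "test-ak",
	"secretKey":                 "test-sk",
	"checkRequest":              true,
	"checkResponse":             true,
	"contentModerationLevelBar": "high",
	"promptAttackLevelBar":      "high",
	"sensitiveDataLevelBar":     "S3",
	"timeout":                   2000,
	"bufferLimit":               1000,
})

// requestOnlyConfig checks requests only: checkRequest is true and
// checkResponse is false.
var requestOnlyConfig = mustMarshalTestConfig(map[string]interface{}{
	"serviceName":               "security-service",
	"servicePort":               8080,
	"serviceHost":               "security.example.com",
	"accessKey":                 "test-ak",
	"secretKey":                 "test-sk",
	"checkRequest":              true,
	"checkResponse":             false,
	"contentModerationLevelBar": "high",
	"promptAttackLevelBar":      "high",
	"sensitiveDataLevelBar":     "S3",
	"timeout":                   1000,
	"bufferLimit":               500,
})

// responseOnlyConfig is the mirror image of requestOnlyConfig: checkRequest is
// false and checkResponse is true. It exists so that the "request checking
// disabled" subtest really runs with request checking turned off.
var responseOnlyConfig = mustMarshalTestConfig(map[string]interface{}{
	"serviceName":               "security-service",
	"servicePort":               8080,
	"serviceHost":               "security.example.com",
	"accessKey":                 "test-ak",
	"secretKey":                 "test-sk",
	"checkRequest":              false,
	"checkResponse":             true,
	"contentModerationLevelBar": "high",
	"promptAttackLevelBar":      "high",
	"sensitiveDataLevelBar":     "S3",
	"timeout":                   1000,
	"bufferLimit":               500,
})

// missingRequiredConfig deliberately omits the required service fields
// (serviceName, servicePort, serviceHost) and carries only the credentials.
var missingRequiredConfig = mustMarshalTestConfig(map[string]interface{}{
	"accessKey": "test-ak",
	"secretKey": "test-sk",
})

// missingServiceConfig omits serviceName, servicePort and serviceHost while
// still enabling both checks.
var missingServiceConfig = mustMarshalTestConfig(map[string]interface{}{
	"accessKey":     "test-ak",
	"secretKey":     "test-sk",
	"checkRequest":  true,
	"checkResponse": true,
})

// missingAuthConfig omits accessKey and secretKey.
var missingAuthConfig = mustMarshalTestConfig(map[string]interface{}{
	"serviceName":   "security-service",
	"servicePort":   8080,
	"serviceHost":   "security.example.com",
	"checkRequest":  true,
	"checkResponse": true,
})

// TestParseConfig verifies that valid configurations are parsed into
// AISecurityConfig and that configurations missing service or credential
// fields make plugin start fail rather than run unauthenticated or without a
// target service.
//
// NOTE(review): this test uses test.RunGoTest while the handler tests use
// test.RunTest — presumably because it inspects unexported fields of the
// parsed config; confirm.
func TestParseConfig(t *testing.T) {
	test.RunGoTest(t, func(t *testing.T) {
		// Baseline configuration: every field should round-trip.
		t.Run("basic config", func(t *testing.T) {
			host, status := test.NewTestHost(basicConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)

			config, err := host.GetMatchConfig()
			require.NoError(t, err)
			require.NotNil(t, config)

			// Checked assertion: an unexpected type fails the test with a
			// message instead of panicking.
			securityConfig, ok := config.(*AISecurityConfig)
			require.True(t, ok, "match config should be *AISecurityConfig, got %T", config)
			require.Equal(t, "test-ak", securityConfig.ak)
			require.Equal(t, "test-sk", securityConfig.sk)
			require.Equal(t, true, securityConfig.checkRequest)
			require.Equal(t, true, securityConfig.checkResponse)
			require.Equal(t, "high", securityConfig.contentModerationLevelBar)
			require.Equal(t, "high", securityConfig.promptAttackLevelBar)
			require.Equal(t, "S3", securityConfig.sensitiveDataLevelBar)
			require.Equal(t, uint32(2000), securityConfig.timeout)
			require.Equal(t, 1000, securityConfig.bufferLimit)
		})

		// Request-only configuration.
		t.Run("request only config", func(t *testing.T) {
			host, status := test.NewTestHost(requestOnlyConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)

			config, err := host.GetMatchConfig()
			require.NoError(t, err)
			require.NotNil(t, config)

			securityConfig, ok := config.(*AISecurityConfig)
			require.True(t, ok, "match config should be *AISecurityConfig, got %T", config)
			require.Equal(t, true, securityConfig.checkRequest)
			require.Equal(t, false, securityConfig.checkResponse)
			require.Equal(t, "high", securityConfig.contentModerationLevelBar)
			require.Equal(t, "high", securityConfig.promptAttackLevelBar)
			require.Equal(t, "S3", securityConfig.sensitiveDataLevelBar)
		})

		// Required fields missing: plugin start must fail.
		t.Run("missing required config", func(t *testing.T) {
			host, status := test.NewTestHost(missingRequiredConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusFailed, status)
		})

		// Service fields missing: plugin start must fail.
		t.Run("missing service config", func(t *testing.T) {
			host, status := test.NewTestHost(missingServiceConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusFailed, status)
		})

		// Credentials missing: plugin start must fail.
		t.Run("missing auth config", func(t *testing.T) {
			host, status := test.NewTestHost(missingAuthConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusFailed, status)
		})
	})
}

// TestOnHttpRequestHeaders checks the action returned by the request-header
// phase with request checking enabled and disabled.
func TestOnHttpRequestHeaders(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		// Request checking enabled.
		t.Run("request checking enabled", func(t *testing.T) {
			host, status := test.NewTestHost(basicConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)

			action := host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})

			// The header phase should return ActionContinue.
			require.Equal(t, types.ActionContinue, action)
		})

		// Request checking disabled. This subtest previously loaded
		// requestOnlyConfig, whose checkRequest is true, so the disabled
		// path was never exercised; responseOnlyConfig sets it to false.
		t.Run("request checking disabled", func(t *testing.T) {
			host, status := test.NewTestHost(responseOnlyConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)

			action := host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})

			// The header phase should return ActionContinue.
			require.Equal(t, types.ActionContinue, action)
		})
	})
}

// TestOnHttpRequestBody covers the request-body phase: a prompt that the
// security service rates as low risk, and a prompt with empty content.
//
// NOTE(review): only the allow path is covered here. Subtests for a RiskLevel
// at or above the configured bar, and for a non-200 or malformed reply from
// the security service, would pin down whether the plugin blocks the request
// and whether it fails open or closed — confirm against the handler.
func TestOnHttpRequestBody(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		// The security check passes.
		t.Run("request body security check pass", func(t *testing.T) {
			host, status := test.NewTestHost(basicConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)

			// Request headers come first.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})

			body := `{"messages": [{"role": "user", "content": "Hello, how are you?"}]}`
			action := host.CallOnHttpRequestBody([]byte(body))

			// The request is paused until the security check result arrives.
			require.Equal(t, types.ActionPause, action)

			// Simulated reply from the security service (check passed).
			securityResponse := `{"Code": 200, "Message": "Success", "RequestId": "req-123", "Data": {"RiskLevel": "low"}}`
			host.CallOnHttpCall([][2]string{
				{":status", "200"},
				{"content-type", "application/json"},
			}, []byte(securityResponse))

			action = host.GetHttpStreamAction()
			require.Equal(t, types.ActionContinue, action)

			host.CompleteHttp()
		})

		// Empty request content.
		t.Run("empty request content", func(t *testing.T) {
			host, status := test.NewTestHost(basicConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)

			// Request headers come first.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})

			body := `{"messages": [{"role": "user", "content": ""}]}`
			action := host.CallOnHttpRequestBody([]byte(body))

			// Empty content is expected to pass straight through.
			require.Equal(t, types.ActionContinue, action)
		})
	})
}

// TestOnHttpResponseHeaders checks the action returned by the response-header
// phase with response checking enabled, disabled, and for a non-200 upstream
// status.
func TestOnHttpResponseHeaders(t *testing.T) {
	test.RunTest(t, func(t *testing.T) {
		// Response checking enabled.
		t.Run("response checking enabled", func(t *testing.T) {
			host, status := test.NewTestHost(basicConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)

			// Request headers come first.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})

			action := host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "application/json"},
			})

			// The header phase should return HeaderStopIteration.
			require.Equal(t, types.HeaderStopIteration, action)
		})

		// Response checking disabled.
		t.Run("response checking disabled", func(t *testing.T) {
			host, status := test.NewTestHost(requestOnlyConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)

			// Request headers come first.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})

			action := host.CallOnHttpResponseHeaders([][2]string{
				{":status", "200"},
				{"content-type", "application/json"},
			})

			// The header phase should return ActionContinue.
			require.Equal(t, types.ActionContinue, action)
		})

		// Non-200 upstream status. With checkResponse enabled this still
		// expects ActionContinue rather than HeaderStopIteration, i.e. the
		// expectation encodes that error responses are not held back at the
		// header phase.
		t.Run("non-200 status code", func(t *testing.T) {
			host, status := test.NewTestHost(basicConfig)
			defer host.Reset()
			require.Equal(t, types.OnPluginStartStatusOK, status)

			// Request headers come first.
			host.CallOnHttpRequestHeaders([][2]string{
				{":authority", "example.com"},
				{":path", "/v1/chat/completions"},
				{":method", "POST"},
			})

			action := host.CallOnHttpResponseHeaders([][2]string{
				{":status", "500"},
				{"content-type", "application/json"},
			})

			// The header phase should return ActionContinue.
			require.Equal(t, types.ActionContinue, action)
		})
	})
}

// TestRiskLevelFunctions pins the numeric ordering produced by levelToInt,
// which the comparisons below rely on.
func TestRiskLevelFunctions(t *testing.T) {
	// Risk level to integer conversion.
	t.Run("risk level conversion", func(t *testing.T) {
		require.Equal(t, 4, levelToInt(MaxRisk))
		require.Equal(t, 3, levelToInt(HighRisk))
		require.Equal(t, 2, levelToInt(MediumRisk))
		require.Equal(t, 1, levelToInt(LowRisk))
		require.Equal(t, 0, levelToInt(NoRisk))
		// An unknown level maps to -1, which sorts below NoRisk.
		require.Equal(t, -1, levelToInt("invalid"))
	})

	// Risk level comparison.
	t.Run("risk level comparison", func(t *testing.T) {
		require.True(t, levelToInt(HighRisk) >= levelToInt(MediumRisk))
		require.True(t, levelToInt(MediumRisk) >= levelToInt(LowRisk))
		require.True(t, levelToInt(LowRisk) >= levelToInt(NoRisk))
		require.False(t, levelToInt(LowRisk) >= levelToInt(HighRisk))
	})
}

// TestUtilityFunctions covers the encoding, signing and ID helpers.
func TestUtilityFunctions(t *testing.T) {
	// URL encoding.
	t.Run("url encoding", func(t *testing.T) {
		original := "test+string:with=special&chars@$"
		encoded := urlEncoding(original)
		require.NotEqual(t, original, encoded)
		require.Contains(t, encoded, "%2B") // + must be encoded
		require.Contains(t, encoded, "%3A") // : must be encoded
		require.Contains(t, encoded, "%3D") // = must be encoded
		require.Contains(t, encoded, "%26") // & must be encoded
	})

	// HMAC-SHA1 signing.
	t.Run("hmac sha1", func(t *testing.T) {
		message := "test message"
		secret := "test secret"
		signature := hmacSha1(message, secret)
		require.NotEmpty(t, signature)
		require.NotEqual(t, message, signature)

		// A MAC must be reproducible for identical inputs, otherwise the
		// receiving side could never verify it.
		require.Equal(t, signature, hmacSha1(message, secret))
		// It must also depend on both the key and the message.
		require.NotEqual(t, signature, hmacSha1(message, "other secret"))
		require.NotEqual(t, signature, hmacSha1("other message", secret))
	})

	// Request signature generation.
	t.Run("signature generation", func(t *testing.T) {
		host, status := test.NewTestHost(basicConfig)
		defer host.Reset()
		require.Equal(t, types.OnPluginStartStatusOK, status)

		params := map[string]string{
			"key1": "value1",
			"key2": "value2",
		}
		secret := "test-secret"
		signature := getSign(params, secret)
		require.NotEmpty(t, signature)

		// Go map iteration order is random, so a second call only matches
		// if the parameters are put into a canonical order before signing.
		require.Equal(t, signature, getSign(params, secret))
	})

	// Hex ID generation.
	t.Run("hex id generation", func(t *testing.T) {
		id, err := generateHexID(16)
		require.NoError(t, err)
		require.Len(t, id, 16)
		require.Regexp(t, "^[0-9a-f]+$", id)
	})

	// Random ID generation.
	t.Run("random id generation", func(t *testing.T) {
		id := generateRandomID()
		require.NotEmpty(t, id)
		// Anchored match: "chatcmpl-" must be the prefix, not merely present.
		require.Regexp(t, "^chatcmpl-", id)
		require.Len(t, id, 38) // "chatcmpl-" + 29 random chars

		// Two consecutive IDs should not collide.
		require.NotEqual(t, id, generateRandomID())
	})
}

// TestMarshalFunctions covers marshalStr and the streaming-body extractor.
func TestMarshalFunctions(t *testing.T) {
	// marshalStr.
	t.Run("marshal string", func(t *testing.T) {
		testStr := "Hello, World!"
		marshalled := marshalStr(testStr)
		require.Equal(t, testStr, marshalled)
	})

	// extractMessageFromStreamingBody.
	t.Run("extract streaming body", func(t *testing.T) {
		// Chunks are separated by a blank line (two newlines). The separator
		// is spelled out with "\n\n" so that it survives reformatting of
		// this file.
		streamingData := []byte(`{"choices":[{"index":0,"delta":{"role":"assistant","content":"Hello"}}]}` +
			"\n\n" +
			`{"choices":[{"index":0,"delta":{"role":"assistant","content":" World"}}]}` +
			"\n\n" +
			`{"choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}`)

		extracted := extractMessageFromStreamingBody(streamingData, "choices.0.delta.content")
		require.Equal(t, "Hello World", extracted)
	})
}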