diff --git a/plugins/wasm-go/extensions/ai-statistics/.gitignore b/plugins/wasm-go/extensions/ai-statistics/.gitignore new file mode 100644 index 000000000..32841a534 --- /dev/null +++ b/plugins/wasm-go/extensions/ai-statistics/.gitignore @@ -0,0 +1,2 @@ +main.wasm +config.yaml \ No newline at end of file diff --git a/plugins/wasm-go/extensions/ai-statistics/README.md b/plugins/wasm-go/extensions/ai-statistics/README.md new file mode 100644 index 000000000..49a6e3ddf --- /dev/null +++ b/plugins/wasm-go/extensions/ai-statistics/README.md @@ -0,0 +1,44 @@ +# 介绍 +提供AI可观测基础能力,其后需接ai-proxy插件,如果不接ai-proxy插件的话,则只支持openai协议。 + +# 配置说明 + +| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | +|------------|--------|------|-----|------------------| +| `enable` | bool | 必填 | - | 是否开启ai统计功能 | + +开启后 metrics 示例: +``` +route_upstream_model_input_token{ai_route="openai",ai_cluster="qwen",ai_model="qwen-max"} 21 +route_upstream_model_output_token{ai_route="openai",ai_cluster="qwen",ai_model="qwen-max"} 17 +``` + +日志示例: + +```json +{ + "model": "qwen-max", + "input_token": "21", + "output_token": "17", + "authority": "dashscope.aliyuncs.com", + "bytes_received": "336", + "bytes_sent": "1675", + "duration": "1590", + "istio_policy_status": "-", + "method": "POST", + "path": "/v1/chat/completions", + "protocol": "HTTP/1.1", + "request_id": "5895f5a9-e4e3-425b-98db-6c6a926195b7", + "requested_server_name": "-", + "response_code": "200", + "response_flags": "-", + "route_name": "openai", + "start_time": "2024-06-18T09:37:14.078Z", + "trace_id": "-", + "upstream_cluster": "qwen", + "upstream_service_time": "496", + "upstream_transport_failure_reason": "-", + "user_agent": "PostmanRuntime/7.37.3", + "x_forwarded_for": "-" +} +``` \ No newline at end of file diff --git a/plugins/wasm-go/extensions/ai-statistics/go.mod b/plugins/wasm-go/extensions/ai-statistics/go.mod new file mode 100644 index 000000000..69544d3b6 --- /dev/null +++ b/plugins/wasm-go/extensions/ai-statistics/go.mod @@ -0,0 +1,21 @@ +module ai-statistics + +go 1.18 + +require ( + github.com/alibaba/higress/plugins/wasm-go v1.3.6-0.20240522012622-fc6a6aad8906 + github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc + github.com/tidwall/gjson v1.14.3 +) + +require github.com/tetratelabs/wazero v1.7.1 // indirect + +require ( + github.com/google/uuid v1.3.0 // indirect + github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 // indirect + github.com/magefile/mage v1.14.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/resp v0.1.1 // indirect + github.com/wasilibs/go-re2 v1.5.3 +) diff --git a/plugins/wasm-go/extensions/ai-statistics/go.sum b/plugins/wasm-go/extensions/ai-statistics/go.sum new file mode 100644 index 000000000..1e17cd5a8 --- /dev/null +++ b/plugins/wasm-go/extensions/ai-statistics/go.sum @@ -0,0 +1,26 @@ +github.com/alibaba/higress/plugins/wasm-go v1.3.6-0.20240522012622-fc6a6aad8906 h1:RhEmB+ApLKsClZD7joTC4ifmsVgOVz4pFLdPR3xhNaE= +github.com/alibaba/higress/plugins/wasm-go v1.3.6-0.20240522012622-fc6a6aad8906/go.mod h1:10jQXKsYFUF7djs+Oy7t82f4dbie9pISfP9FJwpPLuk= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 h1:IHDghbGQ2DTIXHBHxWfqCYQW1fKjyJ/I7W1pMyUDeEA= +github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520/go.mod h1:Nz8ORLaFiLWotg6GeKlJMhv8cci8mM43uEnLA5t8iew= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc h1:t2AT8zb6N/59Y78lyRWedVoVWHNRSCBh0oWCC+bluTQ= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= +github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= +github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/tetratelabs/wazero v1.7.1 h1:QtSfd6KLc41DIMpDYlJdoMc6k7QTN246DM2+n2Y/Dx8= +github.com/tetratelabs/wazero v1.7.1/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= +github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw= +github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE= +github.com/tidwall/resp v0.1.1/go.mod h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0= +github.com/wasilibs/go-re2 v1.5.3 h1:wiuTcgDZdLhu8NG8oqF5sF5Q3yIU14lPAvXqeYzDK3g= +github.com/wasilibs/go-re2 v1.5.3/go.mod h1:PzpVPsBdFC7vM8QJbbEnOeTmwA0DGE783d/Gex8eCV8= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/plugins/wasm-go/extensions/ai-statistics/main.go b/plugins/wasm-go/extensions/ai-statistics/main.go new file mode 100644 index 000000000..380e0d5bc --- /dev/null +++ b/plugins/wasm-go/extensions/ai-statistics/main.go @@ -0,0 +1,143 @@ +package main + +import ( + "fmt" + "strings" + + "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" + "github.com/tidwall/gjson" +) + +func main() { + wrapper.SetCtx( + "ai-statistics", + wrapper.ParseConfigBy(parseConfig), + wrapper.ProcessResponseHeadersBy(onHttpResponseHeaders), + wrapper.ProcessStreamingResponseBodyBy(onHttpStreamingBody), + wrapper.ProcessResponseBodyBy(onHttpResponseBody), + ) +} + +type AIStatisticsConfig struct { + enable bool + metrics map[string]proxywasm.MetricCounter +} + +func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64, log wrapper.Log) { + counter, ok := config.metrics[metricName] + if !ok { + counter = proxywasm.DefineCounterMetric(metricName) + config.metrics[metricName] = counter + } + counter.Increment(inc) +} + +func parseConfig(json gjson.Result, config *AIStatisticsConfig, log wrapper.Log) error { + config.enable = json.Get("enable").Bool() + config.metrics = make(map[string]proxywasm.MetricCounter) + return nil +} + +func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action { + if !config.enable { + ctx.DontReadResponseBody() + return types.ActionContinue + } + contentType, _ := proxywasm.GetHttpResponseHeader("content-type") + if !strings.Contains(contentType, "text/event-stream") { + ctx.BufferResponseBody() + } + return types.ActionContinue +} + +func getLastChunk(data []byte) []byte { + chunks := strings.Split(strings.TrimSpace(string(data)), "\n\n") + length := len(chunks) + if length < 2 { + return data + } + // ai-proxy append extra usage chunk + return []byte(chunks[length-1]) +} + +func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, data []byte, endOfStream bool, log wrapper.Log) []byte { + lastChunk := getLastChunk(data) + modelObj := gjson.GetBytes(lastChunk, "model") + inputTokenObj := gjson.GetBytes(lastChunk, "usage.prompt_tokens") + outputTokenObj := gjson.GetBytes(lastChunk, "usage.completion_tokens") + if modelObj.Exists() && inputTokenObj.Exists() && outputTokenObj.Exists() { + ctx.SetContext("model", modelObj.String()) + ctx.SetContext("input_token", inputTokenObj.Int()) + ctx.SetContext("output_token", outputTokenObj.Int()) + } + + if endOfStream { + var route, cluster string + if raw, err := proxywasm.GetProperty([]string{"route_name"}); err == nil { + route = string(raw) + } + if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err == nil { + cluster = string(raw) + } + model, ok := ctx.GetContext("model").(string) + if !ok { + log.Error("Get model failed!") + return data + } + inputToken, ok := ctx.GetContext("input_token").(int64) + if !ok { + log.Error("Get input_token failed!") + return data + } + outputToken, ok := ctx.GetContext("output_token").(int64) + if !ok { + log.Error("Get output_token failed!") + return data + } + config.incrementCounter("route."+route+".upstream."+cluster+".model."+model+".input_token", uint64(inputToken), log) + config.incrementCounter("route."+route+".upstream."+cluster+".model."+model+".output_token", uint64(outputToken), log) + proxywasm.SetProperty([]string{"model"}, []byte(model)) + proxywasm.SetProperty([]string{"input_token"}, []byte(fmt.Sprint(inputToken))) + proxywasm.SetProperty([]string{"output_token"}, []byte(fmt.Sprint(outputToken))) + } + + return data +} + +func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action { + modeObj := gjson.GetBytes(body, "model") + inputTokenObj := gjson.GetBytes(body, "usage.prompt_tokens") + outputTokenObj := gjson.GetBytes(body, "usage.completion_tokens") + if !modeObj.Exists() { + log.Error("Get model failed") + return types.ActionContinue + } + if !inputTokenObj.Exists() { + log.Error("Get input_token failed") + return types.ActionContinue + } + if !outputTokenObj.Exists() { + log.Error("Get output_token failed") + return types.ActionContinue + } + model := modeObj.String() + inputToken := inputTokenObj.Int() + outputToken := outputTokenObj.Int() + var route, cluster string + if raw, err := proxywasm.GetProperty([]string{"route_name"}); err == nil { + route = string(raw) + } + if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err == nil { + cluster = string(raw) + } + config.incrementCounter("route."+route+".upstream."+cluster+".model."+model+".input_token", uint64(inputToken), log) + config.incrementCounter("route."+route+".upstream."+cluster+".model."+model+".output_token", uint64(outputToken), log) + + proxywasm.SetProperty([]string{"model"}, []byte(model)) + proxywasm.SetProperty([]string{"input_token"}, []byte(fmt.Sprint(inputToken))) + proxywasm.SetProperty([]string{"output_token"}, []byte(fmt.Sprint(outputToken))) + + return types.ActionContinue +}