Update ai statistics (#1303)

This commit is contained in:
rinfx
2024-09-24 19:42:10 +08:00
committed by GitHub
parent bef9139753
commit b82853c653
5 changed files with 679 additions and 223 deletions

@@ -1,69 +1,178 @@
---
title: AI Observability
keywords: [higress, AI, observability]
description: AI observability configuration reference
---
## Introduction
Provides basic AI observability capabilities, including metrics, logs, and traces. The ai-proxy plugin should be attached after this plugin; without ai-proxy, users must supply the corresponding configuration for it to take effect.
## Runtime Properties
Plugin execution phase: `default phase`
Plugin execution priority: `200`
## Configuration
By default the plugin assumes requests follow the openai protocol format and provides the following basic observable values without extra configuration:
- metric: input tokens, output tokens, time to first token (streaming requests), total request latency, and more, observable across four dimensions: gateway, route, service, and model
- log: fields such as input_token, output_token, model, llm_service_duration, llm_first_token_duration
Users can also extend the observable values through configuration:
| Name | Type | Required | Default | Description |
|----------------|-------|------|-----|------------------------|
| `attributes` | []Attribute | required | - | Information the user wants to record in logs/spans |
Attribute configuration:
| Name | Type | Required | Default | Description |
|----------------|-------|-----|-----|------------------------|
| `key` | string | required | - | attribute name |
| `value_source` | string | required | - | attribute value source; valid values are `fixed_value`, `request_header`, `request_body`, `response_header`, `response_body`, `response_streaming_body` |
| `value` | string | required | - | attribute value key/path |
| `rule` | string | optional | - | rule for extracting the attribute from a streaming response; valid values are `first`, `replace`, `append` |
| `apply_to_log` | bool | optional | false | whether to record the extracted value in the log |
| `apply_to_span` | bool | optional | false | whether to record the extracted value in the tracing span |
The meanings of the `value_source` values are as follows:
- `fixed_value`: a fixed value
- `request_header`: the attribute value is taken from an http request header; `value` is the header key
- `request_body`: the attribute value is taken from the request body; `value` is a gjson path
- `response_header`: the attribute value is taken from an http response header; `value` is the header key
- `response_body`: the attribute value is taken from the response body; `value` is a gjson path
- `response_streaming_body`: the attribute value is taken from the streaming response body; `value` is a gjson path
When `value_source` is `response_streaming_body`, `rule` should be configured to specify how to obtain the value from the streaming body:
- `first`: take the value from the first valid chunk
- `replace`: take the value from the last valid chunk
- `append`: concatenate the values from all valid chunks, e.g. to capture the full answer
## Configuration examples
To record ai-statistic values in the gateway access log, modify log_format by adding a new field on top of the original log_format, for example:
```yaml
'{"ai_log":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}'
```
### Empty configuration
#### Metrics
```
route_upstream_model_metric_input_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 10
route_upstream_model_metric_llm_duration_count{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1
route_upstream_model_metric_llm_first_token_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 309
route_upstream_model_metric_llm_service_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1955
route_upstream_model_metric_output_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 69
```
#### Log
```json
{
"model": "qwen-max",
"input_token": "21",
"output_token": "17",
"authority": "dashscope.aliyuncs.com",
"bytes_received": "336",
"bytes_sent": "1675",
"duration": "1590",
"istio_policy_status": "-",
"method": "POST",
"path": "/v1/chat/completions",
"protocol": "HTTP/1.1",
"request_id": "5895f5a9-e4e3-425b-98db-6c6a926195b7",
"requested_server_name": "-",
"response_code": "200",
"response_flags": "-",
"route_name": "openai",
"start_time": "2024-06-18T09:37:14.078Z",
"trace_id": "-",
"upstream_cluster": "qwen",
"upstream_service_time": "496",
"upstream_transport_failure_reason": "-",
"user_agent": "PostmanRuntime/7.37.3",
"x_forwarded_for": "-"
"ai_log":"{\"model\":\"qwen-turbo\",\"input_token\":\"10\",\"output_token\":\"69\",\"llm_first_token_duration\":\"309\",\"llm_service_duration\":\"1955\"}"
}
```
#### Trace
With an empty configuration, no extra attributes are added to spans.
### Extracting token usage from non-openai protocols
When the protocol is set to original in ai-proxy, taking Alibaba Cloud Bailian as an example, the following configuration specifies how to extract model, input_token, and output_token:
```yaml
attributes:
- key: model
value_source: response_body
value: usage.models.0.model_id
apply_to_log: true
apply_to_span: false
- key: input_token
value_source: response_body
value: usage.models.0.input_tokens
apply_to_log: true
apply_to_span: false
- key: output_token
value_source: response_body
value: usage.models.0.output_tokens
apply_to_log: true
apply_to_span: false
```
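The paths `usage.models.0.*` can be resolved with the standard library as a sketch. The response body below is a hypothetical Bailian-style payload; the real field layout may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type usageModel struct {
	ModelID      string `json:"model_id"`
	InputTokens  int    `json:"input_tokens"`
	OutputTokens int    `json:"output_tokens"`
}

// firstUsageModel resolves what the gjson paths usage.models.0.* point at.
func firstUsageModel(body []byte) (usageModel, error) {
	var r struct {
		Usage struct {
			Models []usageModel `json:"models"`
		} `json:"usage"`
	}
	if err := json.Unmarshal(body, &r); err != nil {
		return usageModel{}, err
	}
	if len(r.Usage.Models) == 0 {
		return usageModel{}, fmt.Errorf("no usage.models in body")
	}
	return r.Usage.Models[0], nil
}

func main() {
	// Hypothetical Bailian-style response body.
	body := []byte(`{"usage":{"models":[{"model_id":"qwen-max","input_tokens":343,"output_tokens":153}]}}`)
	m, _ := firstUsageModel(body)
	fmt.Println(m.ModelID, m.InputTokens, m.OutputTokens)
}
```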
#### Metrics
```
route_upstream_model_metric_input_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 343
route_upstream_model_metric_output_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 153
route_upstream_model_metric_llm_service_duration{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 3725
route_upstream_model_metric_llm_duration_count{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 1
```
#### Log
With this configuration, the log looks like:
```json
{
"ai_log": "{\"model\":\"qwen-max\",\"input_token\":\"343\",\"output_token\":\"153\",\"llm_service_duration\":\"19110\"}"
}
```
#### Trace
The trace spans contain three extra attributes: model, input_token, and output_token.
### Recording the consumer together with authentication
Example:
```yaml
attributes:
- key: consumer # record the consumer identified by auth plugins
value_source: request_header
value: x-mse-consumer
apply_to_log: true
```
### Recording questions and answers
```yaml
attributes:
- key: question # record the question
value_source: request_body
value: messages.@reverse.0.content
apply_to_log: true
- key: answer # extract the model's answer from a streaming response
value_source: response_streaming_body
value: choices.0.delta.content
rule: append
apply_to_log: true
- key: answer # extract the model's answer from a non-streaming response
value_source: response_body
value: choices.0.message.content
apply_to_log: true
```
## Advanced
Together with Alibaba Cloud SLS data processing, the ai-related fields can be extracted and processed. For example, given the raw log:
```
ai_log:{"question":"用python计算2的3次方","answer":"你可以使用 Python 的乘方运算符 `**` 来计算一个数的次方。计算2的3次方即2乘以自己2次可以用以下代码表示\n\n```python\nresult = 2 ** 3\nprint(result)\n```\n\n运行这段代码你会得到输出结果为8因为2乘以自己两次等于8。","model":"qwen-max","input_token":"16","output_token":"76","llm_service_duration":"5913"}
```
The question and answer can be extracted with the following data-processing script:
```
e_regex("ai_log", grok("%{EXTRACTJSON}"))
e_set("question", json_select(v("json"), "question", default="-"))
e_set("answer", json_select(v("json"), "answer", default="-"))
```
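As a stdlib approximation of what the SLS script does, the sketch below pulls the JSON object out of a raw `ai_log:{...}` line and selects `question` and `answer` with a `-` default, mirroring `json_select(..., default="-")`. The sample line is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// selectField mimics json_select with a "-" default.
func selectField(fields map[string]any, key string) string {
	if v, ok := fields[key]; ok {
		return fmt.Sprint(v)
	}
	return "-"
}

// parseAILog extracts the JSON payload after "ai_log:" and selects fields.
func parseAILog(line string) (question, answer string, err error) {
	idx := strings.Index(line, "ai_log:")
	if idx < 0 {
		return "-", "-", fmt.Errorf("no ai_log field")
	}
	var fields map[string]any
	if err = json.Unmarshal([]byte(line[idx+len("ai_log:"):]), &fields); err != nil {
		return "-", "-", err
	}
	return selectField(fields, "question"), selectField(fields, "answer"), nil
}

func main() {
	line := `ai_log:{"question":"2**3?","answer":"8","model":"qwen-max"}`
	q, a, _ := parseAILog(line)
	fmt.Println(q, a)
}
```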
After extraction, SLS will contain the two additional fields question and answer, for example:
```
ai_log:{"question":"用python计算2的3次方","answer":"你可以使用 Python 的乘方运算符 `**` 来计算一个数的次方。计算2的3次方即2乘以自己2次可以用以下代码表示\n\n```python\nresult = 2 ** 3\nprint(result)\n```\n\n运行这段代码你会得到输出结果为8因为2乘以自己两次等于8。","model":"qwen-max","input_token":"16","output_token":"76","llm_service_duration":"5913"}
question:用python计算2的3次方
answer:你可以使用 Python 的乘方运算符 `**` 来计算一个数的次方。计算2的3次方即2乘以自己2次可以用以下代码表示
result = 2 ** 3
print(result)
运行这段代码你会得到输出结果为8因为2乘以自己两次等于8。
```

@@ -0,0 +1,145 @@
---
title: AI Statistics
keywords: [higress, AI, observability]
description: AI Statistics plugin configuration reference
---
## Introduction
Provides basic AI observability capabilities, including metrics, logs, and traces. The ai-proxy plugin should be attached after this plugin; without ai-proxy, users must supply the corresponding configuration for it to take effect.
## Runtime Properties
Plugin Phase: `CUSTOM`
Plugin Priority: `200`
## Configuration instructions
By default the plugin assumes requests follow the openai protocol format and provides the following basic observable values without extra configuration:
- metric: input tokens, output tokens, time to first token (streaming requests), total request latency, and more, observable across four dimensions: gateway, route, service, and model
- log: fields such as input_token, output_token, model, llm_service_duration, llm_first_token_duration
Users can also expand observable values through configuration:
| Name | Type | Required | Default | Description |
|----------------|-------|------|-----|------------------------|
| `attributes` | []Attribute | required | - | Information that the user wants to record in log/span |
Attribute Configuration instructions:
| Name | Type | Required | Default | Description |
|----------------|-------|-----|-----|------------------------|
| `key` | string | required | - | attribute key |
| `value_source` | string | required | - | attribute value source; valid values are `fixed_value`, `request_header`, `request_body`, `response_header`, `response_body`, `response_streaming_body` |
| `value` | string | required | - | how to get the attribute value |
| `rule` | string | optional | - | Rule to extract attribute from streaming response, optional values are `first`, `replace`, `append`|
| `apply_to_log` | bool | optional | false | Whether to record the extracted information in the log |
| `apply_to_span` | bool | optional | false | Whether to record the extracted information in the tracing span |
The meanings of various values for `value_source` are as follows:
- `fixed_value`: a fixed value
- `request_header`: the attribute value is taken from an http request header
- `request_body`: the attribute value is taken from the http request body
- `response_header`: the attribute value is taken from an http response header
- `response_body`: the attribute value is taken from the http response body
- `response_streaming_body`: the attribute value is taken from the http streaming response body
When `value_source` is `response_streaming_body`, `rule` should be configured to specify how to obtain the specified value from the streaming body. The meaning of the value is as follows:
- `first`: extract value from the first valid chunk
- `replace`: extract value from the last valid chunk
- `append`: join value pieces from all valid chunks
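The `append` rule can be sketched over an openai-style SSE body. This is illustrative only, not the plugin's actual code: the body is split into chunks on blank lines, and each chunk's `choices.0.delta.content` is concatenated into the full answer.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// chunk models just the fields the gjson path choices.0.delta.content touches.
type chunk struct {
	Choices []struct {
		Delta struct {
			Content string `json:"content"`
		} `json:"delta"`
	} `json:"choices"`
}

// appendRule concatenates delta content from every valid SSE chunk.
func appendRule(body string) string {
	var sb strings.Builder
	for _, raw := range strings.Split(strings.TrimSpace(body), "\n\n") {
		raw = strings.TrimPrefix(raw, "data: ")
		if raw == "[DONE]" {
			continue
		}
		var c chunk
		if err := json.Unmarshal([]byte(raw), &c); err != nil {
			continue // skip chunks that are not valid JSON
		}
		if len(c.Choices) > 0 {
			sb.WriteString(c.Choices[0].Delta.Content)
		}
	}
	return sb.String()
}

func main() {
	body := "data: {\"choices\":[{\"delta\":{\"content\":\"Hello\"}}]}\n\n" +
		"data: {\"choices\":[{\"delta\":{\"content\":\" world\"}}]}\n\n" +
		"data: [DONE]\n\n"
	fmt.Println(appendRule(body))
}
```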
## Configuration example
To record ai-statistic values in the gateway access log, modify log_format by adding a new field on top of the original log_format, for example:
```yaml
'{"ai_log":"%FILTER_STATE(wasm.ai_log:PLAIN)%"}'
```
### Empty
#### Metric
```
route_upstream_model_metric_input_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 10
route_upstream_model_metric_llm_duration_count{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1
route_upstream_model_metric_llm_first_token_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 309
route_upstream_model_metric_llm_service_duration{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 1955
route_upstream_model_metric_output_token{ai_route="llm",ai_cluster="outbound|443||qwen.dns",ai_model="qwen-turbo"} 69
```
#### Log
```json
{
"ai_log":"{\"model\":\"qwen-turbo\",\"input_token\":\"10\",\"output_token\":\"69\",\"llm_first_token_duration\":\"309\",\"llm_service_duration\":\"1955\"}"
}
```
#### Trace
When the configuration is empty, no additional attributes will be added to the span.
### Extract token usage information from non-openai protocols
When the protocol is set to original in ai-proxy, taking Alibaba Cloud Bailian as an example, the following configuration specifies how to extract `model`, `input_token`, and `output_token`:
```yaml
attributes:
- key: model
value_source: response_body
value: usage.models.0.model_id
apply_to_log: true
apply_to_span: false
- key: input_token
value_source: response_body
value: usage.models.0.input_tokens
apply_to_log: true
apply_to_span: false
- key: output_token
value_source: response_body
value: usage.models.0.output_tokens
apply_to_log: true
apply_to_span: false
```
#### Metric
```
route_upstream_model_metric_input_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 343
route_upstream_model_metric_output_token{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 153
route_upstream_model_metric_llm_service_duration{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 3725
route_upstream_model_metric_llm_duration_count{ai_route="bailian",ai_cluster="qwen",ai_model="qwen-max"} 1
```
#### Log
```json
{
"ai_log": "{\"model\":\"qwen-max\",\"input_token\":\"343\",\"output_token\":\"153\",\"llm_service_duration\":\"19110\"}"
}
```
#### Trace
Three additional attributes `model`, `input_token`, and `output_token` can be seen in the trace spans.
### Recording the consumer together with authentication
```yaml
attributes:
- key: consumer
value_source: request_header
value: x-mse-consumer
apply_to_log: true
```
### Record questions and answers
```yaml
attributes:
- key: question
value_source: request_body
value: messages.@reverse.0.content
apply_to_log: true
- key: answer
value_source: response_streaming_body
value: choices.0.delta.content
rule: append
apply_to_log: true
- key: answer
value_source: response_body
value: choices.0.message.content
apply_to_log: true
```
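gjson's `messages.@reverse.0.content` selects the content of the last element of `messages`, i.e. the most recent message. A stdlib equivalent of that lookup, assuming an openai-style request body, is sketched below.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type message struct {
	Role    string `json:"role"`
	Content string `json:"content"`
}

// lastMessageContent mirrors the path messages.@reverse.0.content.
func lastMessageContent(body []byte) (string, error) {
	var req struct {
		Messages []message `json:"messages"`
	}
	if err := json.Unmarshal(body, &req); err != nil {
		return "", err
	}
	if len(req.Messages) == 0 {
		return "", fmt.Errorf("no messages")
	}
	return req.Messages[len(req.Messages)-1].Content, nil
}

func main() {
	body := []byte(`{"messages":[{"role":"system","content":"You are helpful"},{"role":"user","content":"What is 2**3?"}]}`)
	content, _ := lastMessageContent(body)
	fmt.Println(content)
}
```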

@@ -10,8 +10,6 @@ require (
github.com/tidwall/gjson v1.14.3
)
require github.com/tetratelabs/wazero v1.7.1 // indirect
require (
github.com/google/uuid v1.3.0 // indirect
github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 // indirect
@@ -19,5 +17,4 @@ require (
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/resp v0.1.1 // indirect
github.com/wasilibs/go-re2 v1.5.3
)

@@ -1,19 +1,14 @@
github.com/alibaba/higress/plugins/wasm-go v1.3.6-0.20240522012622-fc6a6aad8906 h1:RhEmB+ApLKsClZD7joTC4ifmsVgOVz4pFLdPR3xhNaE=
github.com/alibaba/higress/plugins/wasm-go v1.3.6-0.20240522012622-fc6a6aad8906/go.mod h1:10jQXKsYFUF7djs+Oy7t82f4dbie9pISfP9FJwpPLuk=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 h1:IHDghbGQ2DTIXHBHxWfqCYQW1fKjyJ/I7W1pMyUDeEA=
github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520/go.mod h1:Nz8ORLaFiLWotg6GeKlJMhv8cci8mM43uEnLA5t8iew=
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc h1:t2AT8zb6N/59Y78lyRWedVoVWHNRSCBh0oWCC+bluTQ=
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo=
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240711023527-ba358c48772f h1:ZIiIBRvIw62gA5MJhuwp1+2wWbqL9IGElQ499rUsYYg=
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240711023527-ba358c48772f/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo=
github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo=
github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/tetratelabs/wazero v1.7.1 h1:QtSfd6KLc41DIMpDYlJdoMc6k7QTN246DM2+n2Y/Dx8=
github.com/tetratelabs/wazero v1.7.1/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y=
github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw=
github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
@@ -22,6 +17,4 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE=
github.com/tidwall/resp v0.1.1/go.mod h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0=
github.com/wasilibs/go-re2 v1.5.3 h1:wiuTcgDZdLhu8NG8oqF5sF5Q3yIU14lPAvXqeYzDK3g=
github.com/wasilibs/go-re2 v1.5.3/go.mod h1:PzpVPsBdFC7vM8QJbbEnOeTmwA0DGE783d/Gex8eCV8=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

@@ -3,19 +3,16 @@ package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
"github.com/tidwall/gjson"
"strconv"
"strings"
"time"
)
const (
StatisticsRequestStartTime = "ai-statistics-request-start-time"
StatisticsFirstTokenTime = "ai-statistics-first-token-time"
)
func main() {
@@ -30,146 +27,243 @@ func main() {
)
}
const (
// Trace span prefix
TracePrefix = "trace_span_tag."
// Context consts
StatisticsRequestStartTime = "ai-statistics-request-start-time"
StatisticsFirstTokenTime = "ai-statistics-first-token-time"
CtxGeneralAtrribute = "attributes"
CtxLogAtrribute = "logAttributes"
CtxStreamingBodyBuffer = "streamingBodyBuffer"
// Source Type
FixedValue = "fixed_value"
RequestHeader = "request_header"
RequestBody = "request_body"
ResponseHeader = "response_header"
ResponseStreamingBody = "response_streaming_body"
ResponseBody = "response_body"
// Inner metric & log attributes name
Model = "model"
InputToken = "input_token"
OutputToken = "output_token"
LLMFirstTokenDuration = "llm_first_token_duration"
LLMServiceDuration = "llm_service_duration"
LLMDurationCount = "llm_duration_count"
// Extract Rule
RuleFirst = "first"
RuleReplace = "replace"
RuleAppend = "append"
)
// TracingSpan is the tracing span configuration.
type TracingSpan struct {
Key string `required:"true" yaml:"key" json:"key"`
ValueSource string `required:"true" yaml:"valueSource" json:"valueSource"`
Value string `required:"true" yaml:"value" json:"value"`
type Attribute struct {
Key string `json:"key"`
ValueSource string `json:"value_source"`
Value string `json:"value"`
Rule string `json:"rule,omitempty"`
ApplyToLog bool `json:"apply_to_log,omitempty"`
ApplyToSpan bool `json:"apply_to_span,omitempty"`
}
type AIStatisticsConfig struct {
Enable bool `required:"true" yaml:"enable" json:"enable"`
// TracingSpan array define the tracing span.
TracingSpan []TracingSpan `required:"true" yaml:"tracingSpan" json:"tracingSpan"`
Metrics map[string]proxywasm.MetricCounter `required:"true" yaml:"metrics" json:"metrics"`
// Metrics
// TODO: add more metrics in Gauge and Histogram format
counterMetrics map[string]proxywasm.MetricCounter
// Attributes to be recorded in log & span
attributes []Attribute
// If there exist attributes extracted from streaming body, chunks should be buffered
shouldBufferStreamingBody bool
}
func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64, log wrapper.Log) {
counter, ok := config.Metrics[metricName]
func generateMetricName(route, cluster, model, metricName string) string {
return fmt.Sprintf("route.%s.upstream.%s.model.%s.metric.%s", route, cluster, model, metricName)
}
func getRouteName() (string, error) {
if raw, err := proxywasm.GetProperty([]string{"route_name"}); err != nil {
return "-", err
} else {
return string(raw), nil
}
}
func getClusterName() (string, error) {
if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err != nil {
return "-", err
} else {
return string(raw), nil
}
}
func (config *AIStatisticsConfig) incrementCounter(metricName string, inc uint64) {
counter, ok := config.counterMetrics[metricName]
if !ok {
counter = proxywasm.DefineCounterMetric(metricName)
config.Metrics[metricName] = counter
config.counterMetrics[metricName] = counter
}
counter.Increment(inc)
}
func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrapper.Log) error {
config.Enable = configJson.Get("enable").Bool()
// Parse tracing span.
tracingSpanConfigArray := configJson.Get("tracing_span").Array()
config.TracingSpan = make([]TracingSpan, len(tracingSpanConfigArray))
for i, tracingSpanConfig := range tracingSpanConfigArray {
tracingSpan := TracingSpan{
Key: tracingSpanConfig.Get("key").String(),
ValueSource: tracingSpanConfig.Get("value_source").String(),
Value: tracingSpanConfig.Get("value").String(),
// Parse tracing span attributes setting.
attributeConfigs := configJson.Get("attributes").Array()
config.attributes = make([]Attribute, len(attributeConfigs))
for i, attributeConfig := range attributeConfigs {
attribute := Attribute{}
err := json.Unmarshal([]byte(attributeConfig.Raw), &attribute)
if err != nil {
log.Errorf("parse config failed, %v", err)
return err
}
config.TracingSpan[i] = tracingSpan
if attribute.ValueSource == ResponseStreamingBody {
config.shouldBufferStreamingBody = true
}
if attribute.Rule != "" && attribute.Rule != RuleFirst && attribute.Rule != RuleReplace && attribute.Rule != RuleAppend {
return errors.New("value of rule must be one of [nil, first, replace, append]")
}
config.attributes[i] = attribute
}
config.Metrics = make(map[string]proxywasm.MetricCounter)
configStr, _ := json.Marshal(config)
log.Infof("Init ai-statistics config success, config: %s.", configStr)
// Metric settings
config.counterMetrics = make(map[string]proxywasm.MetricCounter)
return nil
}
func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action {
ctx.SetContext(CtxGeneralAtrribute, map[string]string{})
ctx.SetContext(CtxLogAtrribute, map[string]string{})
ctx.SetContext(StatisticsRequestStartTime, time.Now().UnixMilli())
if !config.Enable {
ctx.DontReadRequestBody()
return types.ActionContinue
}
// Fetch request header tracing span value.
setTracingSpanValueBySource(config, "request_header", nil, log)
// Fetch request process proxy wasm property.
// Warn: The property may be modified by response process , so the value of the property may be overwritten.
setTracingSpanValueBySource(config, "property", nil, log)
// Set user defined log & span attributes which type is fixed_value
setAttributeBySource(ctx, config, FixedValue, nil, log)
// Set user defined log & span attributes which type is request_header
setAttributeBySource(ctx, config, RequestHeader, nil, log)
// Set request start time.
ctx.SetContext(StatisticsRequestStartTime, strconv.FormatUint(uint64(time.Now().UnixMilli()), 10))
// The request has a body and requires delaying the header transmission until a cache miss occurs,
// at which point the header should be sent.
return types.ActionContinue
}
func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action {
// Set request body tracing span value.
setTracingSpanValueBySource(config, "request_body", body, log)
// Set user defined log & span attributes.
setAttributeBySource(ctx, config, RequestBody, body, log)
return types.ActionContinue
}
func onHttpResponseHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action {
if !config.Enable {
ctx.DontReadResponseBody()
return types.ActionContinue
}
contentType, _ := proxywasm.GetHttpResponseHeader("content-type")
if !strings.Contains(contentType, "text/event-stream") {
ctx.BufferResponseBody()
}
// Set response header tracing span value.
setTracingSpanValueBySource(config, "response_header", nil, log)
// Set user defined log & span attributes.
setAttributeBySource(ctx, config, ResponseHeader, nil, log)
return types.ActionContinue
}
func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, data []byte, endOfStream bool, log wrapper.Log) []byte {
// If the end of the stream is reached, calculate the total time and set tracing span tag total_time.
// Otherwise, set tracing span tag first_token_time.
if endOfStream {
requestStartTimeStr := ctx.GetContext(StatisticsRequestStartTime).(string)
requestStartTime, _ := strconv.ParseInt(requestStartTimeStr, 10, 64)
responseEndTime := time.Now().UnixMilli()
setTracingSpanValue("total_time", fmt.Sprintf("%d", responseEndTime-requestStartTime), log)
} else {
firstTokenTime := ctx.GetContext(StatisticsFirstTokenTime)
if firstTokenTime == nil {
firstTokenTimeStr := strconv.FormatInt(time.Now().UnixMilli(), 10)
ctx.SetContext(StatisticsFirstTokenTime, firstTokenTimeStr)
setTracingSpanValue("first_token_time", firstTokenTimeStr, log)
// Buffer stream body for record log & span attributes
if config.shouldBufferStreamingBody {
var streamingBodyBuffer []byte
streamingBodyBuffer, ok := ctx.GetContext(CtxStreamingBodyBuffer).([]byte)
if !ok {
streamingBodyBuffer = data
} else {
streamingBodyBuffer = append(streamingBodyBuffer, data...)
}
ctx.SetContext(CtxStreamingBodyBuffer, streamingBodyBuffer)
}
model, inputToken, outputToken, ok := getUsage(data)
// Get requestStartTime from http context
requestStartTime, ok := ctx.GetContext(StatisticsRequestStartTime).(int64)
if !ok {
log.Error("failed to get requestStartTime from http context")
return data
}
setFilterStateData(model, inputToken, outputToken, log)
incrementCounter(config, model, inputToken, outputToken, log)
// Set tracing span tag input_tokens and output_tokens.
setTracingSpanValue("input_tokens", strconv.FormatInt(inputToken, 10), log)
setTracingSpanValue("output_tokens", strconv.FormatInt(outputToken, 10), log)
// Set response process proxy wasm property.
setTracingSpanValueBySource(config, "property", nil, log)
// If this is the first chunk, record first token duration metric and span attribute
if ctx.GetContext(StatisticsFirstTokenTime) == nil {
firstTokenTime := time.Now().UnixMilli()
ctx.SetContext(StatisticsFirstTokenTime, firstTokenTime)
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
attributes[LLMFirstTokenDuration] = fmt.Sprint(firstTokenTime - requestStartTime)
ctx.SetContext(CtxGeneralAtrribute, attributes)
}
// Set information about this request
if model, inputToken, outputToken, ok := getUsage(data); ok {
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
// Record Log Attributes
attributes[Model] = model
attributes[InputToken] = fmt.Sprint(inputToken)
attributes[OutputToken] = fmt.Sprint(outputToken)
// Set attributes to http context
ctx.SetContext(CtxGeneralAtrribute, attributes)
}
// If the end of the stream is reached, record metrics/logs/spans.
if endOfStream {
responseEndTime := time.Now().UnixMilli()
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
attributes[LLMServiceDuration] = fmt.Sprint(responseEndTime - requestStartTime)
ctx.SetContext(CtxGeneralAtrribute, attributes)
// Set user defined log & span attributes.
if config.shouldBufferStreamingBody {
streamingBodyBuffer, ok := ctx.GetContext(CtxStreamingBodyBuffer).([]byte)
if !ok {
return data
}
setAttributeBySource(ctx, config, ResponseStreamingBody, streamingBodyBuffer, log)
}
// Write inner filter states which can be used by other plugins such as ai-token-ratelimit
writeFilterStates(ctx, log)
// Write log
writeLog(ctx, log)
// Write metrics
writeMetric(ctx, config, log)
}
return data
}
func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action {
// Get attributes from http context
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
// Get requestStartTime from http context
requestStartTime, _ := ctx.GetContext(StatisticsRequestStartTime).(int64)
// Calculate the total time and set tracing span tag total_time.
requestStartTimeStr := ctx.GetContext(StatisticsRequestStartTime).(string)
requestStartTime, _ := strconv.ParseInt(requestStartTimeStr, 10, 64)
responseEndTime := time.Now().UnixMilli()
setTracingSpanValue("total_time", fmt.Sprintf("%d", responseEndTime-requestStartTime), log)
attributes[LLMServiceDuration] = fmt.Sprint(responseEndTime - requestStartTime)
// Set information about this request
model, inputToken, outputToken, ok := getUsage(body)
if !ok {
return types.ActionContinue
if ok {
attributes[Model] = model
attributes[InputToken] = fmt.Sprint(inputToken)
attributes[OutputToken] = fmt.Sprint(outputToken)
// Update attributes
ctx.SetContext(CtxGeneralAtrribute, attributes)
}
setFilterStateData(model, inputToken, outputToken, log)
incrementCounter(config, model, inputToken, outputToken, log)
// Set tracing span tag input_tokens and output_tokens.
setTracingSpanValue("input_tokens", strconv.FormatInt(inputToken, 10), log)
setTracingSpanValue("output_tokens", strconv.FormatInt(outputToken, 10), log)
// Set response process proxy wasm property.
setTracingSpanValueBySource(config, "property", nil, log)
// Set user defined log & span attributes.
setAttributeBySource(ctx, config, ResponseBody, body, log)
// Write inner filter states which can be used by other plugins such as ai-token-ratelimit
writeFilterStates(ctx, log)
// Write log
writeLog(ctx, log)
// Write metrics
writeMetric(ctx, config, log)
return types.ActionContinue
}
@@ -198,92 +292,210 @@ func getUsage(data []byte) (model string, inputTokenUsage int64, outputTokenUsag
return
}
// setFilterData sets the input_token and output_token in the filter state.
// ai-token-ratelimit will use these values to calculate the total token usage.
func setFilterStateData(model string, inputToken int64, outputToken int64, log wrapper.Log) {
if e := proxywasm.SetProperty([]string{"model"}, []byte(model)); e != nil {
log.Errorf("failed to set model in filter state: %v", e)
}
if e := proxywasm.SetProperty([]string{"input_token"}, []byte(fmt.Sprintf("%d", inputToken))); e != nil {
log.Errorf("failed to set input_token in filter state: %v", e)
}
if e := proxywasm.SetProperty([]string{"output_token"}, []byte(fmt.Sprintf("%d", outputToken))); e != nil {
log.Errorf("failed to set output_token in filter state: %v", e)
}
}
func incrementCounter(config AIStatisticsConfig, model string, inputToken int64, outputToken int64, log wrapper.Log) {
var route, cluster string
if raw, err := proxywasm.GetProperty([]string{"route_name"}); err == nil {
route = string(raw)
}
if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err == nil {
cluster = string(raw)
}
config.incrementCounter("route."+route+".upstream."+cluster+".model."+model+".input_token", uint64(inputToken), log)
config.incrementCounter("route."+route+".upstream."+cluster+".model."+model+".output_token", uint64(outputToken), log)
}
// fetches the tracing span value from the specified source.
func setTracingSpanValueBySource(config AIStatisticsConfig, tracingSource string, body []byte, log wrapper.Log) {
for _, tracingSpanEle := range config.TracingSpan {
if tracingSource == tracingSpanEle.ValueSource {
switch tracingSource {
case "response_header":
if value, err := proxywasm.GetHttpResponseHeader(tracingSpanEle.Value); err == nil {
setTracingSpanValue(tracingSpanEle.Key, value, log)
func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, source string, body []byte, log wrapper.Log) {
attributes, ok := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
if !ok {
log.Error("failed to get attributes from http context")
return
}
for _, attribute := range config.attributes {
if source == attribute.ValueSource {
switch source {
case FixedValue:
log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, attribute.Value)
attributes[attribute.Key] = attribute.Value
case RequestHeader:
if value, err := proxywasm.GetHttpRequestHeader(attribute.Value); err == nil {
log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
attributes[attribute.Key] = value
}
case RequestBody:
value := gjson.GetBytes(body, attribute.Value).Raw
if len(value) > 2 && value[0] == '"' && value[len(value)-1] == '"' {
value = value[1 : len(value)-1]
}
log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
attributes[attribute.Key] = value
case ResponseHeader:
if value, err := proxywasm.GetHttpResponseHeader(attribute.Value); err == nil {
log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
attributes[attribute.Key] = value
}
case ResponseStreamingBody:
value := extractStreamingBodyByJsonPath(body, attribute.Value, attribute.Rule, log)
log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
attributes[attribute.Key] = value
case ResponseBody:
value := gjson.GetBytes(body, attribute.Value).Raw
if len(value) > 2 && value[0] == '"' && value[len(value)-1] == '"' {
value = value[1 : len(value)-1]
}
log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
attributes[attribute.Key] = value
default:
}
}
if attribute.ApplyToLog {
setLogAttribute(ctx, attribute.Key, attributes[attribute.Key], log)
}
if attribute.ApplyToSpan {
setSpanAttribute(attribute.Key, attributes[attribute.Key], log)
}
}
ctx.SetContext(CtxGeneralAtrribute, attributes)
}
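// extractStreamingBodyByJsonPath splits a buffered SSE response on blank lines
// ("\n\n") and extracts the value at the given GJSON path from each chunk.
// For example, given two chunks of an illustrative OpenAI-style stream:
//
//	{"choices":[{"delta":{"content":"He"}}]}
//	{"choices":[{"delta":{"content":"llo"}}]}
//
// and the path "choices.0.delta.content", rule "first" yields "He",
// rule "replace" keeps the last match "llo", and rule "append"
// concatenates the string matches into "Hello".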
func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, log wrapper.Log) string {
chunks := bytes.Split(bytes.TrimSpace(data), []byte("\n\n"))
var value string
if rule == RuleFirst {
for _, chunk := range chunks {
jsonObj := gjson.GetBytes(chunk, jsonPath)
if jsonObj.Exists() {
value = jsonObj.String()
break
}
}
} else if rule == RuleReplace {
for _, chunk := range chunks {
jsonObj := gjson.GetBytes(chunk, jsonPath)
if jsonObj.Exists() {
value = jsonObj.String()
}
}
} else if rule == RuleAppend {
// extract llm response
for _, chunk := range chunks {
raw := gjson.GetBytes(chunk, jsonPath).Raw
if len(raw) > 2 && raw[0] == '"' && raw[len(raw)-1] == '"' {
value += raw[1 : len(raw)-1]
}
}
} else {
log.Errorf("unsupported rule type: %s", rule)
}
return value
}
func setFilterState(key, value string, log wrapper.Log) {
if value != "" {
if e := proxywasm.SetProperty([]string{key}, []byte(fmt.Sprint(value))); e != nil {
log.Errorf("failed to set %s in filter state: %v", key, e)
}
} else {
log.Debugf("skip writing filter state [%s] because its value is empty", key)
}
}
// Set the tracing span tag with value.
func setSpanAttribute(key, value string, log wrapper.Log) {
if value != "" {
traceSpanTag := TracePrefix + key
if e := proxywasm.SetProperty([]string{traceSpanTag}, []byte(value)); e != nil {
log.Errorf("failed to set %s in filter state: %v", traceSpanTag, e)
}
log.Debugf("successfully set trace span [%s] with value [%s].", traceSpanTag, value)
} else {
log.Debugf("skip writing span attribute [%s] because its value is empty", key)
}
}
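// For example, setSpanAttribute("model", "qwen-max", log) writes the property
// "trace_span_tag.model" (given TracePrefix of "trace_span_tag."), which the
// tracing component can pick up as a span tag; "qwen-max" is only an
// illustrative value here.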
// Set the log attribute in the http context.
func setLogAttribute(ctx wrapper.HttpContext, key string, value interface{}, log wrapper.Log) {
logAttributes, ok := ctx.GetContext(CtxLogAtrribute).(map[string]string)
if !ok {
log.Error("failed to get logAttributes from http context")
return
}
logAttributes[key] = fmt.Sprint(value)
ctx.SetContext(CtxLogAtrribute, logAttributes)
}
func writeFilterStates(ctx wrapper.HttpContext, log wrapper.Log) {
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
setFilterState(Model, attributes[Model], log)
setFilterState(InputToken, attributes[InputToken], log)
setFilterState(OutputToken, attributes[OutputToken], log)
}
func writeMetric(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) {
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
route, _ := getRouteName()
cluster, _ := getClusterName()
model, ok := attributes[Model]
if !ok {
log.Error("get model failed, skip metrics")
return
}
if inputToken, ok := attributes[InputToken]; ok {
inputTokenUint64, err := strconv.ParseUint(inputToken, 10, 0)
if err != nil || inputTokenUint64 == 0 {
log.Errorf("inputToken convert failed, value is %s, err msg is [%v]", inputToken, err)
return
}
config.incrementCounter(generateMetricName(route, cluster, model, InputToken), inputTokenUint64)
}
if outputToken, ok := attributes[OutputToken]; ok {
outputTokenUint64, err := strconv.ParseUint(outputToken, 10, 0)
if err != nil || outputTokenUint64 == 0 {
log.Errorf("outputToken convert failed, value is %s, err msg is [%v]", outputToken, err)
return
}
config.incrementCounter(generateMetricName(route, cluster, model, OutputToken), outputTokenUint64)
}
if llmFirstTokenDuration, ok := attributes[LLMFirstTokenDuration]; ok {
llmFirstTokenDurationUint64, err := strconv.ParseUint(llmFirstTokenDuration, 10, 0)
if err != nil || llmFirstTokenDurationUint64 == 0 {
log.Errorf("llmFirstTokenDuration convert failed, value is %s, err msg is [%v]", llmFirstTokenDuration, err)
return
}
config.incrementCounter(generateMetricName(route, cluster, model, LLMFirstTokenDuration), llmFirstTokenDurationUint64)
}
if llmServiceDuration, ok := attributes[LLMServiceDuration]; ok {
llmServiceDurationUint64, err := strconv.ParseUint(llmServiceDuration, 10, 0)
if err != nil || llmServiceDurationUint64 == 0 {
log.Errorf("llmServiceDuration convert failed, value is %s, err msg is [%v]", llmServiceDuration, err)
return
}
config.incrementCounter(generateMetricName(route, cluster, model, LLMServiceDuration), llmServiceDurationUint64)
}
config.incrementCounter(generateMetricName(route, cluster, model, LLMDurationCount), 1)
}
func writeLog(ctx wrapper.HttpContext, log wrapper.Log) {
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
logAttributes, _ := ctx.GetContext(CtxLogAtrribute).(map[string]string)
// Set inner log fields
if attributes[Model] != "" {
logAttributes[Model] = attributes[Model]
}
if attributes[InputToken] != "" {
logAttributes[InputToken] = attributes[InputToken]
}
if attributes[OutputToken] != "" {
logAttributes[OutputToken] = attributes[OutputToken]
}
if attributes[LLMFirstTokenDuration] != "" {
logAttributes[LLMFirstTokenDuration] = attributes[LLMFirstTokenDuration]
}
if attributes[LLMServiceDuration] != "" {
logAttributes[LLMServiceDuration] = attributes[LLMServiceDuration]
}
// Traverse log fields
items := []string{}
for k, v := range logAttributes {
items = append(items, fmt.Sprintf(`"%s":"%s"`, k, v))
}
aiLogField := fmt.Sprintf(`{%s}`, strings.Join(items, ","))
jsonMap := map[string]string{
"ai_log": aiLogField,
}
serialized, _ := json.Marshal(jsonMap)
jsonLogRaw := gjson.GetBytes(serialized, "ai_log").Raw
jsonLog := jsonLogRaw[1 : len(jsonLogRaw)-1]
if err := proxywasm.SetProperty([]string{"ai_log"}, []byte(jsonLog)); err != nil {
log.Errorf("failed to set ai_log in filter state: %v", err)
}
}
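// The resulting "ai_log" filter state holds a flattened JSON object such as
// {"model":"qwen-max","input_token":"10","output_token":"50"} (values are
// illustrative), which the gateway's access log format can reference.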