AI observability upgrade (#1587)

Co-authored-by: Kent Dong <ch3cho@qq.com>
This commit is contained in:
rinfx
2024-12-16 10:27:49 +08:00
committed by GitHub
parent 8544fa604d
commit ec39d56731
11 changed files with 194 additions and 236 deletions

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
@@ -28,14 +27,15 @@ func main() {
}
const (
// Trace span prefix
TracePrefix = "trace_span_tag."
// Context consts
StatisticsRequestStartTime = "ai-statistics-request-start-time"
StatisticsFirstTokenTime = "ai-statistics-first-token-time"
CtxGeneralAtrribute = "attributes"
CtxLogAtrribute = "logAttributes"
CtxStreamingBodyBuffer = "streamingBodyBuffer"
RouteName = "route"
ClusterName = "cluster"
APIName = "api"
// Source Type
FixedValue = "fixed_value"
@@ -46,12 +46,14 @@ const (
ResponseBody = "response_body"
// Inner metric & log attributes name
Model = "model"
InputToken = "input_token"
OutputToken = "output_token"
LLMFirstTokenDuration = "llm_first_token_duration"
LLMServiceDuration = "llm_service_duration"
LLMDurationCount = "llm_duration_count"
Model = "model"
InputToken = "input_token"
OutputToken = "output_token"
LLMFirstTokenDuration = "llm_first_token_duration"
LLMServiceDuration = "llm_service_duration"
LLMDurationCount = "llm_duration_count"
LLMStreamDurationCount = "llm_stream_duration_count"
ResponseType = "response_type"
// Extract Rule
RuleFirst = "first"
@@ -91,6 +93,19 @@ func getRouteName() (string, error) {
}
}
func getAPIName() (string, error) {
if raw, err := proxywasm.GetProperty([]string{"route_name"}); err != nil {
return "-", err
} else {
parts := strings.Split(string(raw), "@")
if len(parts) != 5 {
return "-", errors.New("not api type")
} else {
return strings.Join(parts[:3], "@"), nil
}
}
}
func getClusterName() (string, error) {
if raw, err := proxywasm.GetProperty([]string{"cluster_name"}); err != nil {
return "-", err
@@ -133,8 +148,15 @@ func parseConfig(configJson gjson.Result, config *AIStatisticsConfig, log wrappe
}
func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) types.Action {
ctx.SetContext(CtxGeneralAtrribute, map[string]string{})
ctx.SetContext(CtxLogAtrribute, map[string]string{})
route, _ := getRouteName()
cluster, _ := getClusterName()
api, api_error := getAPIName()
if api_error == nil {
route = api
}
ctx.SetContext(RouteName, route)
ctx.SetContext(ClusterName, cluster)
ctx.SetUserAttribute(APIName, api)
ctx.SetContext(StatisticsRequestStartTime, time.Now().UnixMilli())
// Set user defined log & span attributes which type is fixed_value
@@ -149,6 +171,9 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, lo
func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action {
// Set user defined log & span attributes.
setAttributeBySource(ctx, config, RequestBody, body, log)
// Write log
ctx.WriteUserAttributeToLogWithKey(wrapper.AILogKey)
return types.ActionContinue
}
@@ -177,6 +202,8 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat
ctx.SetContext(CtxStreamingBodyBuffer, streamingBodyBuffer)
}
ctx.SetUserAttribute(ResponseType, "stream")
// Get requestStartTime from http context
requestStartTime, ok := ctx.GetContext(StatisticsRequestStartTime).(int64)
if !ok {
@@ -188,28 +215,19 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat
if ctx.GetContext(StatisticsFirstTokenTime) == nil {
firstTokenTime := time.Now().UnixMilli()
ctx.SetContext(StatisticsFirstTokenTime, firstTokenTime)
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
attributes[LLMFirstTokenDuration] = fmt.Sprint(firstTokenTime - requestStartTime)
ctx.SetContext(CtxGeneralAtrribute, attributes)
ctx.SetUserAttribute(LLMFirstTokenDuration, firstTokenTime-requestStartTime)
}
// Set information about this request
if model, inputToken, outputToken, ok := getUsage(data); ok {
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
// Record Log Attributes
attributes[Model] = model
attributes[InputToken] = fmt.Sprint(inputToken)
attributes[OutputToken] = fmt.Sprint(outputToken)
// Set attributes to http context
ctx.SetContext(CtxGeneralAtrribute, attributes)
ctx.SetUserAttribute(Model, model)
ctx.SetUserAttribute(InputToken, inputToken)
ctx.SetUserAttribute(OutputToken, outputToken)
}
// If the end of the stream is reached, record metrics/logs/spans.
if endOfStream {
responseEndTime := time.Now().UnixMilli()
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
attributes[LLMServiceDuration] = fmt.Sprint(responseEndTime - requestStartTime)
ctx.SetContext(CtxGeneralAtrribute, attributes)
ctx.SetUserAttribute(LLMServiceDuration, responseEndTime-requestStartTime)
// Set user defined log & span attributes.
if config.shouldBufferStreamingBody {
@@ -220,11 +238,8 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat
setAttributeBySource(ctx, config, ResponseStreamingBody, streamingBodyBuffer, log)
}
// Write inner filter states which can be used by other plugins such as ai-token-ratelimit
writeFilterStates(ctx, log)
// Write log
writeLog(ctx, log)
ctx.WriteUserAttributeToLogWithKey(wrapper.AILogKey)
// Write metrics
writeMetric(ctx, config, log)
@@ -233,33 +248,26 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat
}
func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action {
// Get attributes from http context
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
// Get requestStartTime from http context
requestStartTime, _ := ctx.GetContext(StatisticsRequestStartTime).(int64)
responseEndTime := time.Now().UnixMilli()
attributes[LLMServiceDuration] = fmt.Sprint(responseEndTime - requestStartTime)
ctx.SetUserAttribute(LLMServiceDuration, responseEndTime-requestStartTime)
ctx.SetUserAttribute(ResponseType, "normal")
// Set information about this request
model, inputToken, outputToken, ok := getUsage(body)
if ok {
attributes[Model] = model
attributes[InputToken] = fmt.Sprint(inputToken)
attributes[OutputToken] = fmt.Sprint(outputToken)
// Update attributes
ctx.SetContext(CtxGeneralAtrribute, attributes)
if model, inputToken, outputToken, ok := getUsage(body); ok {
ctx.SetUserAttribute(Model, model)
ctx.SetUserAttribute(InputToken, inputToken)
ctx.SetUserAttribute(OutputToken, outputToken)
}
// Set user defined log & span attributes.
setAttributeBySource(ctx, config, ResponseBody, body, log)
// Write inner filter states which can be used by other plugins such as ai-token-ratelimit
writeFilterStates(ctx, log)
// Write log
writeLog(ctx, log)
ctx.WriteUserAttributeToLogWithKey(wrapper.AILogKey)
// Write metrics
writeMetric(ctx, config, log)
@@ -294,57 +302,45 @@ func getUsage(data []byte) (model string, inputTokenUsage int64, outputTokenUsag
// fetches the tracing span value from the specified source.
func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, source string, body []byte, log wrapper.Log) {
attributes, ok := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
if !ok {
log.Error("failed to get attributes from http context")
return
}
for _, attribute := range config.attributes {
var key, value string
var err error
if source == attribute.ValueSource {
key = attribute.Key
switch source {
case FixedValue:
log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, attribute.Value)
attributes[attribute.Key] = attribute.Value
value = attribute.Value
case RequestHeader:
if value, err := proxywasm.GetHttpRequestHeader(attribute.Value); err == nil {
if value, err = proxywasm.GetHttpRequestHeader(attribute.Value); err == nil {
log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
attributes[attribute.Key] = value
}
case RequestBody:
raw := gjson.GetBytes(body, attribute.Value).Raw
var value string
if len(raw) > 2 {
value = raw[1 : len(raw)-1]
}
log.Debugf("[attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
attributes[attribute.Key] = value
case ResponseHeader:
if value, err := proxywasm.GetHttpResponseHeader(attribute.Value); err == nil {
if value, err = proxywasm.GetHttpResponseHeader(attribute.Value); err == nil {
log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
attributes[attribute.Key] = value
}
case ResponseStreamingBody:
value := extractStreamingBodyByJsonPath(body, attribute.Value, attribute.Rule, log)
value = extractStreamingBodyByJsonPath(body, attribute.Value, attribute.Rule, log)
log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
attributes[attribute.Key] = value
case ResponseBody:
value := gjson.GetBytes(body, attribute.Value).Raw
if len(value) > 2 && value[0] == '"' && value[len(value)-1] == '"' {
value = value[1 : len(value)-1]
}
value = gjson.GetBytes(body, attribute.Value).String()
log.Debugf("[log attribute] source type: %s, key: %s, value: %s", source, attribute.Key, value)
attributes[attribute.Key] = value
default:
}
}
if attribute.ApplyToLog {
setLogAttribute(ctx, attribute.Key, attributes[attribute.Key], log)
}
if attribute.ApplyToSpan {
setSpanAttribute(attribute.Key, attributes[attribute.Key], log)
if attribute.ApplyToLog {
ctx.SetUserAttribute(key, value)
}
if attribute.ApplyToSpan {
setSpanAttribute(key, value, log)
}
}
}
ctx.SetContext(CtxGeneralAtrribute, attributes)
}
func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, log wrapper.Log) string {
@@ -368,9 +364,9 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, l
} else if rule == RuleAppend {
// extract llm response
for _, chunk := range chunks {
raw := gjson.GetBytes(chunk, jsonPath).Raw
if len(raw) > 2 && raw[0] == '"' && raw[len(raw)-1] == '"' {
value += raw[1 : len(raw)-1]
jsonObj := gjson.GetBytes(chunk, jsonPath)
if jsonObj.Exists() {
value += jsonObj.String()
}
}
} else {
@@ -379,123 +375,49 @@ func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, l
return value
}
func setFilterState(key, value string, log wrapper.Log) {
if value != "" {
if e := proxywasm.SetProperty([]string{key}, []byte(fmt.Sprint(value))); e != nil {
log.Errorf("failed to set %s in filter state: %v", key, e)
}
} else {
log.Debugf("failed to write filter state [%s], because it's value is empty")
}
}
// Set the tracing span with value.
func setSpanAttribute(key, value string, log wrapper.Log) {
if value != "" {
traceSpanTag := TracePrefix + key
traceSpanTag := wrapper.TraceSpanTagPrefix + key
if e := proxywasm.SetProperty([]string{traceSpanTag}, []byte(value)); e != nil {
log.Errorf("failed to set %s in filter state: %v", traceSpanTag, e)
log.Warnf("failed to set %s in filter state: %v", traceSpanTag, e)
}
} else {
log.Debugf("failed to write span attribute [%s], because it's value is empty")
}
}
// fetches the tracing span value from the specified source.
func setLogAttribute(ctx wrapper.HttpContext, key string, value interface{}, log wrapper.Log) {
logAttributes, ok := ctx.GetContext(CtxLogAtrribute).(map[string]string)
if !ok {
log.Error("failed to get logAttributes from http context")
return
}
logAttributes[key] = fmt.Sprint(value)
ctx.SetContext(CtxLogAtrribute, logAttributes)
}
func writeFilterStates(ctx wrapper.HttpContext, log wrapper.Log) {
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
setFilterState(Model, attributes[Model], log)
setFilterState(InputToken, attributes[InputToken], log)
setFilterState(OutputToken, attributes[OutputToken], log)
}
func writeMetric(ctx wrapper.HttpContext, config AIStatisticsConfig, log wrapper.Log) {
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
route, _ := getRouteName()
cluster, _ := getClusterName()
model, ok := attributes["model"]
if !ok {
log.Errorf("Get model failed")
route := ctx.GetContext(RouteName).(string)
cluster := ctx.GetContext(ClusterName).(string)
// Generate usage metrics
var model string
var inputToken, outputToken int64
if ctx.GetUserAttribute(Model) == nil || ctx.GetUserAttribute(InputToken) == nil || ctx.GetUserAttribute(OutputToken) == nil {
log.Warnf("get usage information failed, skip metric record")
return
}
if inputToken, ok := attributes[InputToken]; ok {
inputTokenUint64, err := strconv.ParseUint(inputToken, 10, 0)
if err != nil || inputTokenUint64 == 0 {
log.Errorf("inputToken convert failed, value is %d, err msg is [%v]", inputTokenUint64, err)
return
}
config.incrementCounter(generateMetricName(route, cluster, model, InputToken), inputTokenUint64)
model = ctx.GetUserAttribute(Model).(string)
inputToken = ctx.GetUserAttribute(InputToken).(int64)
outputToken = ctx.GetUserAttribute(OutputToken).(int64)
if inputToken == 0 || outputToken == 0 {
log.Warnf("inputToken and outputToken cannot equal to 0, skip metric record")
return
}
if outputToken, ok := attributes[OutputToken]; ok {
outputTokenUint64, err := strconv.ParseUint(outputToken, 10, 0)
if err != nil || outputTokenUint64 == 0 {
log.Errorf("outputToken convert failed, value is %d, err msg is [%v]", outputTokenUint64, err)
return
}
config.incrementCounter(generateMetricName(route, cluster, model, OutputToken), outputTokenUint64)
}
if llmFirstTokenDuration, ok := attributes[LLMFirstTokenDuration]; ok {
llmFirstTokenDurationUint64, err := strconv.ParseUint(llmFirstTokenDuration, 10, 0)
if err != nil || llmFirstTokenDurationUint64 == 0 {
log.Errorf("llmFirstTokenDuration convert failed, value is %d, err msg is [%v]", llmFirstTokenDurationUint64, err)
return
}
config.incrementCounter(generateMetricName(route, cluster, model, LLMFirstTokenDuration), llmFirstTokenDurationUint64)
}
if llmServiceDuration, ok := attributes[LLMServiceDuration]; ok {
llmServiceDurationUint64, err := strconv.ParseUint(llmServiceDuration, 10, 0)
if err != nil || llmServiceDurationUint64 == 0 {
log.Errorf("llmServiceDuration convert failed, value is %d, err msg is [%v]", llmServiceDurationUint64, err)
return
}
config.incrementCounter(generateMetricName(route, cluster, model, LLMServiceDuration), llmServiceDurationUint64)
}
config.incrementCounter(generateMetricName(route, cluster, model, LLMDurationCount), 1)
}
config.incrementCounter(generateMetricName(route, cluster, model, InputToken), uint64(inputToken))
config.incrementCounter(generateMetricName(route, cluster, model, OutputToken), uint64(outputToken))
func writeLog(ctx wrapper.HttpContext, log wrapper.Log) {
attributes, _ := ctx.GetContext(CtxGeneralAtrribute).(map[string]string)
logAttributes, _ := ctx.GetContext(CtxLogAtrribute).(map[string]string)
// Set inner log fields
if attributes[Model] != "" {
logAttributes[Model] = attributes[Model]
// Generate duration metrics
var llmFirstTokenDuration, llmServiceDuration int64
// Is stream response
if ctx.GetUserAttribute(LLMFirstTokenDuration) != nil {
llmFirstTokenDuration = ctx.GetUserAttribute(LLMFirstTokenDuration).(int64)
config.incrementCounter(generateMetricName(route, cluster, model, LLMFirstTokenDuration), uint64(llmFirstTokenDuration))
config.incrementCounter(generateMetricName(route, cluster, model, LLMStreamDurationCount), 1)
}
if attributes[InputToken] != "" {
logAttributes[InputToken] = attributes[InputToken]
}
if attributes[OutputToken] != "" {
logAttributes[OutputToken] = attributes[OutputToken]
}
if attributes[LLMFirstTokenDuration] != "" {
logAttributes[LLMFirstTokenDuration] = attributes[LLMFirstTokenDuration]
}
if attributes[LLMServiceDuration] != "" {
logAttributes[LLMServiceDuration] = attributes[LLMServiceDuration]
}
// Traverse log fields
items := []string{}
for k, v := range logAttributes {
items = append(items, fmt.Sprintf(`"%s":"%s"`, k, v))
}
aiLogField := fmt.Sprintf(`{%s}`, strings.Join(items, ","))
// log.Infof("ai request json log: %s", aiLogField)
jsonMap := map[string]string{
"ai_log": aiLogField,
}
serialized, _ := json.Marshal(jsonMap)
jsonLogRaw := gjson.GetBytes(serialized, "ai_log").Raw
jsonLog := jsonLogRaw[1 : len(jsonLogRaw)-1]
if err := proxywasm.SetProperty([]string{"ai_log"}, []byte(jsonLog)); err != nil {
log.Errorf("failed to set ai_log in filter state: %v", err)
if ctx.GetUserAttribute(LLMServiceDuration) != nil {
llmServiceDuration = ctx.GetUserAttribute(LLMServiceDuration).(int64)
config.incrementCounter(generateMetricName(route, cluster, model, LLMServiceDuration), uint64(llmServiceDuration))
config.incrementCounter(generateMetricName(route, cluster, model, LLMDurationCount), 1)
}
}