Enhance the compatibility of AI observability plugins with different LLM suppliers (#2088)

This commit is contained in:
rinfx
2025-04-18 16:19:59 +08:00
committed by GitHub
parent b6d61f9568
commit 6773482300
3 changed files with 59 additions and 4 deletions

View File

@@ -46,7 +46,7 @@ const (
ResponseStreamingBody = "response_streaming_body"
ResponseBody = "response_body"
// Inner metric & log attributes name
// Inner metric & log attributes
Model = "model"
InputToken = "input_token"
OutputToken = "output_token"
@@ -55,6 +55,16 @@ const (
LLMDurationCount = "llm_duration_count"
LLMStreamDurationCount = "llm_stream_duration_count"
ResponseType = "response_type"
ChatID = "chat_id"
ChatRound = "chat_round"
// Inner span attributes
ArmsSpanKind = "gen_ai.span.kind"
ArmsModelName = "gen_ai.model_name"
ArmsRequestModel = "gen_ai.request.model"
ArmsInputToken = "gen_ai.usage.input_tokens"
ArmsOutputToken = "gen_ai.usage.output_tokens"
ArmsTotalToken = "gen_ai.usage.total_tokens"
// Extract Rule
RuleFirst = "first"
@@ -171,7 +181,8 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, lo
setAttributeBySource(ctx, config, FixedValue, nil, log)
// Set user defined log & span attributes which type is request_header
setAttributeBySource(ctx, config, RequestHeader, nil, log)
// Set request start time.
// Set span attributes for ARMS.
setSpanAttribute(ArmsSpanKind, "LLM", log)
return types.ActionContinue
}
@@ -179,6 +190,22 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config AIStatisticsConfig, lo
func onHttpRequestBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body []byte, log wrapper.Log) types.Action {
// Set user defined log & span attributes.
setAttributeBySource(ctx, config, RequestBody, body, log)
// Set span attributes for ARMS.
requestModel := gjson.GetBytes(body, "model").String()
if requestModel == "" {
requestModel = "UNKNOWN"
}
setSpanAttribute(ArmsRequestModel, requestModel, log)
// Set the number of conversation rounds
if gjson.GetBytes(body, "messages").Exists() {
userPromptCount := 0
for _, msg := range gjson.GetBytes(body, "messages").Array() {
if msg.Get("role").String() == "user" {
userPromptCount += 1
}
}
ctx.SetUserAttribute(ChatRound, userPromptCount)
}
// Write log
ctx.WriteUserAttributeToLogWithKey(wrapper.AILogKey)
@@ -211,6 +238,10 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat
}
ctx.SetUserAttribute(ResponseType, "stream")
chatID := gjson.GetBytes(data, "id").String()
if chatID != "" {
ctx.SetUserAttribute(ChatID, chatID)
}
// Get requestStartTime from http context
requestStartTime, ok := ctx.GetContext(StatisticsRequestStartTime).(int64)
@@ -231,6 +262,11 @@ func onHttpStreamingBody(ctx wrapper.HttpContext, config AIStatisticsConfig, dat
ctx.SetUserAttribute(Model, model)
ctx.SetUserAttribute(InputToken, inputToken)
ctx.SetUserAttribute(OutputToken, outputToken)
// Set span attributes for ARMS.
setSpanAttribute(ArmsModelName, model, log)
setSpanAttribute(ArmsInputToken, inputToken, log)
setSpanAttribute(ArmsOutputToken, outputToken, log)
setSpanAttribute(ArmsTotalToken, inputToken+outputToken, log)
}
// If the end of the stream is reached, record metrics/logs/spans.
if endOfStream {
@@ -263,12 +299,21 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body
ctx.SetUserAttribute(LLMServiceDuration, responseEndTime-requestStartTime)
ctx.SetUserAttribute(ResponseType, "normal")
chatID := gjson.GetBytes(body, "id").String()
if chatID != "" {
ctx.SetUserAttribute(ChatID, chatID)
}
// Set information about this request
if model, inputToken, outputToken, ok := getUsage(body); ok {
ctx.SetUserAttribute(Model, model)
ctx.SetUserAttribute(InputToken, inputToken)
ctx.SetUserAttribute(OutputToken, outputToken)
// Set span attributes for ARMS.
setSpanAttribute(ArmsModelName, model, log)
setSpanAttribute(ArmsInputToken, inputToken, log)
setSpanAttribute(ArmsOutputToken, outputToken, log)
setSpanAttribute(ArmsTotalToken, inputToken+outputToken, log)
}
// Set user defined log & span attributes.
@@ -283,8 +328,14 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config AIStatisticsConfig, body
return types.ActionContinue
}
func unifySSEChunk(data []byte) []byte {
data = bytes.ReplaceAll(data, []byte("\r\n"), []byte("\n"))
data = bytes.ReplaceAll(data, []byte("\r"), []byte("\n"))
return data
}
func getUsage(data []byte) (model string, inputTokenUsage int64, outputTokenUsage int64, ok bool) {
chunks := bytes.Split(bytes.TrimSpace(data), []byte("\n\n"))
chunks := bytes.Split(bytes.TrimSpace(unifySSEChunk(data)), []byte("\n\n"))
for _, chunk := range chunks {
// the feature strings are used to identify the usage data, like:
// {"model":"gpt2","usage":{"prompt_tokens":1,"completion_tokens":1}}
@@ -353,7 +404,7 @@ func setAttributeBySource(ctx wrapper.HttpContext, config AIStatisticsConfig, so
}
func extractStreamingBodyByJsonPath(data []byte, jsonPath string, rule string, log wrapper.Log) interface{} {
chunks := bytes.Split(bytes.TrimSpace(data), []byte("\n\n"))
chunks := bytes.Split(bytes.TrimSpace(unifySSEChunk(data)), []byte("\n\n"))
var value interface{}
if rule == RuleFirst {
for _, chunk := range chunks {