mirror of
https://github.com/alibaba/higress.git
synced 2026-06-09 12:47:28 +08:00
feat(ai-statistics): add Claude/Anthropic streaming tool calls parsing support (#3523)
This commit is contained in:
@@ -127,10 +127,19 @@ const (
|
|||||||
AnswerPathOpenAIStreaming = "choices.0.delta.content"
|
AnswerPathOpenAIStreaming = "choices.0.delta.content"
|
||||||
AnswerPathClaudeStreaming = "delta.text"
|
AnswerPathClaudeStreaming = "delta.text"
|
||||||
|
|
||||||
// Tool calls paths
|
// Tool calls paths (OpenAI format)
|
||||||
ToolCallsPathNonStreaming = "choices.0.message.tool_calls"
|
ToolCallsPathNonStreaming = "choices.0.message.tool_calls"
|
||||||
ToolCallsPathStreaming = "choices.0.delta.tool_calls"
|
ToolCallsPathStreaming = "choices.0.delta.tool_calls"
|
||||||
|
|
||||||
|
// Claude/Anthropic tool calls paths (streaming)
|
||||||
|
ClaudeEventType = "type"
|
||||||
|
ClaudeContentBlockType = "content_block.type"
|
||||||
|
ClaudeContentBlockID = "content_block.id"
|
||||||
|
ClaudeContentBlockName = "content_block.name"
|
||||||
|
ClaudeContentBlockInput = "content_block.input"
|
||||||
|
ClaudeDeltaPartialJSON = "delta.partial_json"
|
||||||
|
ClaudeIndex = "index"
|
||||||
|
|
||||||
// Reasoning paths
|
// Reasoning paths
|
||||||
ReasoningPathNonStreaming = "choices.0.message.reasoning_content"
|
ReasoningPathNonStreaming = "choices.0.message.reasoning_content"
|
||||||
ReasoningPathStreaming = "choices.0.delta.reasoning_content"
|
ReasoningPathStreaming = "choices.0.delta.reasoning_content"
|
||||||
@@ -264,14 +273,18 @@ type ToolCallFunction struct {
|
|||||||
|
|
||||||
// StreamingToolCallsBuffer holds the state for assembling streaming tool calls
|
// StreamingToolCallsBuffer holds the state for assembling streaming tool calls
|
||||||
type StreamingToolCallsBuffer struct {
|
type StreamingToolCallsBuffer struct {
|
||||||
ToolCalls map[int]*ToolCall // keyed by index
|
ToolCalls map[int]*ToolCall // keyed by index (OpenAI format)
|
||||||
|
InToolBlock map[int]bool // tracks which indices are in tool_use blocks (Claude format)
|
||||||
|
ArgumentsBuffer map[int]string // buffers partial JSON arguments (Claude format)
|
||||||
}
|
}
|
||||||
|
|
||||||
// extractStreamingToolCalls extracts and assembles tool calls from streaming response chunks
|
// extractStreamingToolCalls extracts and assembles tool calls from streaming response chunks (OpenAI format)
|
||||||
func extractStreamingToolCalls(data []byte, buffer *StreamingToolCallsBuffer) *StreamingToolCallsBuffer {
|
func extractStreamingToolCalls(data []byte, buffer *StreamingToolCallsBuffer) *StreamingToolCallsBuffer {
|
||||||
if buffer == nil {
|
if buffer == nil {
|
||||||
buffer = &StreamingToolCallsBuffer{
|
buffer = &StreamingToolCallsBuffer{
|
||||||
ToolCalls: make(map[int]*ToolCall),
|
ToolCalls: make(map[int]*ToolCall),
|
||||||
|
InToolBlock: make(map[int]bool),
|
||||||
|
ArgumentsBuffer: make(map[int]string),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -312,6 +325,86 @@ func extractStreamingToolCalls(data []byte, buffer *StreamingToolCallsBuffer) *S
|
|||||||
return buffer
|
return buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// extractClaudeStreamingToolCalls extracts and assembles tool calls from Claude/Anthropic streaming response chunks
|
||||||
|
// Claude format uses events: content_block_start, content_block_delta, content_block_stop
|
||||||
|
func extractClaudeStreamingToolCalls(data []byte, buffer *StreamingToolCallsBuffer) *StreamingToolCallsBuffer {
|
||||||
|
if buffer == nil {
|
||||||
|
buffer = &StreamingToolCallsBuffer{
|
||||||
|
ToolCalls: make(map[int]*ToolCall),
|
||||||
|
InToolBlock: make(map[int]bool),
|
||||||
|
ArgumentsBuffer: make(map[int]string),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
chunks := bytes.Split(bytes.TrimSpace(wrapper.UnifySSEChunk(data)), []byte("\n\n"))
|
||||||
|
for _, chunk := range chunks {
|
||||||
|
// Get event type
|
||||||
|
eventType := gjson.GetBytes(chunk, ClaudeEventType)
|
||||||
|
if !eventType.Exists() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
switch eventType.String() {
|
||||||
|
case "content_block_start":
|
||||||
|
// Check if this is a tool_use block
|
||||||
|
contentBlockType := gjson.GetBytes(chunk, ClaudeContentBlockType)
|
||||||
|
if contentBlockType.Exists() && contentBlockType.String() == "tool_use" {
|
||||||
|
index := int(gjson.GetBytes(chunk, ClaudeIndex).Int())
|
||||||
|
|
||||||
|
// Create tool call entry
|
||||||
|
tc := &ToolCall{Index: index}
|
||||||
|
|
||||||
|
// Extract id and name
|
||||||
|
if id := gjson.GetBytes(chunk, ClaudeContentBlockID).String(); id != "" {
|
||||||
|
tc.ID = id
|
||||||
|
}
|
||||||
|
if name := gjson.GetBytes(chunk, ClaudeContentBlockName).String(); name != "" {
|
||||||
|
tc.Function.Name = name
|
||||||
|
}
|
||||||
|
tc.Type = "tool_use"
|
||||||
|
|
||||||
|
buffer.ToolCalls[index] = tc
|
||||||
|
buffer.InToolBlock[index] = true
|
||||||
|
buffer.ArgumentsBuffer[index] = ""
|
||||||
|
|
||||||
|
// Try to extract initial input if present
|
||||||
|
if input := gjson.GetBytes(chunk, ClaudeContentBlockInput); input.Exists() {
|
||||||
|
if inputMap, ok := input.Value().(map[string]interface{}); ok {
|
||||||
|
if jsonBytes, err := json.Marshal(inputMap); err == nil {
|
||||||
|
buffer.ArgumentsBuffer[index] = string(jsonBytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case "content_block_delta":
|
||||||
|
// Check if we're in a tool block
|
||||||
|
index := int(gjson.GetBytes(chunk, ClaudeIndex).Int())
|
||||||
|
if buffer.InToolBlock[index] {
|
||||||
|
// Accumulate partial JSON arguments
|
||||||
|
partialJSON := gjson.GetBytes(chunk, ClaudeDeltaPartialJSON)
|
||||||
|
if partialJSON.Exists() {
|
||||||
|
buffer.ArgumentsBuffer[index] += partialJSON.String()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case "content_block_stop":
|
||||||
|
// Finalize the tool call if we were in a tool block
|
||||||
|
index := int(gjson.GetBytes(chunk, ClaudeIndex).Int())
|
||||||
|
if buffer.InToolBlock[index] {
|
||||||
|
buffer.InToolBlock[index] = false
|
||||||
|
|
||||||
|
// Parse accumulated arguments and set them
|
||||||
|
if tc, exists := buffer.ToolCalls[index]; exists {
|
||||||
|
tc.Function.Arguments = buffer.ArgumentsBuffer[index]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return buffer
|
||||||
|
}
|
||||||
|
|
||||||
// getToolCallsFromBuffer converts the buffer to a sorted slice of tool calls
|
// getToolCallsFromBuffer converts the buffer to a sorted slice of tool calls
|
||||||
func getToolCallsFromBuffer(buffer *StreamingToolCallsBuffer) []ToolCall {
|
func getToolCallsFromBuffer(buffer *StreamingToolCallsBuffer) []ToolCall {
|
||||||
if buffer == nil || len(buffer.ToolCalls) == 0 {
|
if buffer == nil || len(buffer.ToolCalls) == 0 {
|
||||||
@@ -1026,7 +1119,10 @@ func getBuiltinAttributeFallback(ctx wrapper.HttpContext, config AIStatisticsCon
|
|||||||
if existingBuffer, ok := ctx.GetContext(CtxStreamingToolCallsBuffer).(*StreamingToolCallsBuffer); ok {
|
if existingBuffer, ok := ctx.GetContext(CtxStreamingToolCallsBuffer).(*StreamingToolCallsBuffer); ok {
|
||||||
buffer = existingBuffer
|
buffer = existingBuffer
|
||||||
}
|
}
|
||||||
|
// Try OpenAI format first
|
||||||
buffer = extractStreamingToolCalls(body, buffer)
|
buffer = extractStreamingToolCalls(body, buffer)
|
||||||
|
// Also try Claude format (both formats can be checked)
|
||||||
|
buffer = extractClaudeStreamingToolCalls(body, buffer)
|
||||||
ctx.SetContext(CtxStreamingToolCallsBuffer, buffer)
|
ctx.SetContext(CtxStreamingToolCallsBuffer, buffer)
|
||||||
|
|
||||||
// Also set tool_calls to user attributes so they appear in ai_log
|
// Also set tool_calls to user attributes so they appear in ai_log
|
||||||
|
|||||||
Reference in New Issue
Block a user