diff --git a/plugins/golang-filter/mcp-session/filter.go b/plugins/golang-filter/mcp-session/filter.go index 9d9a27605..f31fa2404 100644 --- a/plugins/golang-filter/mcp-session/filter.go +++ b/plugins/golang-filter/mcp-session/filter.go @@ -289,8 +289,18 @@ func (f *filter) encodeDataFromRestUpstream(buffer api.BufferInstance, endStream func (f *filter) encodeDataFromSSEUpstream(buffer api.BufferInstance, endStream bool) api.StatusType { bufferBytes := buffer.Bytes() bufferData := string(bufferBytes) + api.LogDebugf("Received SSE data: %q, length: %d, endStream: %v", bufferData, len(bufferData), endStream) - err, endpointUrl := f.findEndpointUrl(bufferData) + // Combine cached data with new data + var combinedData string + if len(f.cachedResponseBody) > 0 { + combinedData = string(f.cachedResponseBody) + bufferData + api.LogDebugf("Combined with cached data: %q, total length: %d", combinedData, len(combinedData)) + } else { + combinedData = bufferData + } + + err, endpointUrl := f.findEndpointUrl(combinedData) if err != nil { api.LogWarnf("Failed to find endpoint URL in SSE data: %v", err) f.needProcess = false @@ -298,8 +308,12 @@ func (f *filter) encodeDataFromSSEUpstream(buffer api.BufferInstance, endStream } if endpointUrl == "" { // No endpoint URL found. Need to buffer and check again. - return api.StopAndBuffer + f.cachedResponseBody = []byte(combinedData) + buffer.Reset() + return api.Continue } + // Clear cached data + f.cachedResponseBody = nil // Remove query string since we don't need to change it. queryStringIndex := strings.IndexAny(endpointUrl, "?") @@ -310,12 +324,12 @@ func (f *filter) encodeDataFromSSEUpstream(buffer api.BufferInstance, endStream if changed, newEndpointUrl := f.rewriteEndpointUrl(endpointUrl); changed { api.LogDebugf("The endpoint URL is changed.\n Old: %s\n New: %s", endpointUrl, newEndpointUrl) - endpointUrlIndex := strings.Index(bufferData, endpointUrl) + endpointUrlIndex := strings.Index(combinedData, endpointUrl) if endpointUrlIndex == -1 { api.LogWarnf("Something wrong, the previously found endpoint URL %s not found in the SSE data now", endpointUrl) } else { - bufferData = bufferData[:endpointUrlIndex] + newEndpointUrl + bufferData[endpointUrlIndex+len(endpointUrl):] - _ = buffer.SetString(bufferData) + newBufferData := combinedData[:endpointUrlIndex] + newEndpointUrl + combinedData[endpointUrlIndex+len(endpointUrl):] + _ = buffer.SetString(newBufferData) } } else { api.LogDebugf("The endpoint URL %s is not changed", endpointUrl) @@ -492,6 +506,7 @@ func (f *filter) findEndpointUrl(bufferData string) (error, string) { // OnDestroy stops the goroutine func (f *filter) OnDestroy(reason api.DestroyReason) { api.LogDebugf("OnDestroy: reason=%v", reason) + f.cachedResponseBody = nil if f.serverName != "" && f.stopChan != nil { select { case <-f.stopChan: