mirror of
https://github.com/alibaba/higress.git
synced 2026-02-06 23:21:08 +08:00
fix: When the sse event is divided into multiple chunks, the sse connection is blocked (#2865)
Co-authored-by: 澄潭 <zty98751@alibaba-inc.com>
This commit is contained in:
@@ -289,8 +289,18 @@ func (f *filter) encodeDataFromRestUpstream(buffer api.BufferInstance, endStream
|
|||||||
func (f *filter) encodeDataFromSSEUpstream(buffer api.BufferInstance, endStream bool) api.StatusType {
|
func (f *filter) encodeDataFromSSEUpstream(buffer api.BufferInstance, endStream bool) api.StatusType {
|
||||||
bufferBytes := buffer.Bytes()
|
bufferBytes := buffer.Bytes()
|
||||||
bufferData := string(bufferBytes)
|
bufferData := string(bufferBytes)
|
||||||
|
api.LogDebugf("Received SSE data: %q, length: %d, endStream: %v", bufferData, len(bufferData), endStream)
|
||||||
|
|
||||||
err, endpointUrl := f.findEndpointUrl(bufferData)
|
// Combine cached data with new data
|
||||||
|
var combinedData string
|
||||||
|
if len(f.cachedResponseBody) > 0 {
|
||||||
|
combinedData = string(f.cachedResponseBody) + bufferData
|
||||||
|
api.LogDebugf("Combined with cached data: %q, total length: %d", combinedData, len(combinedData))
|
||||||
|
} else {
|
||||||
|
combinedData = bufferData
|
||||||
|
}
|
||||||
|
|
||||||
|
err, endpointUrl := f.findEndpointUrl(combinedData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
api.LogWarnf("Failed to find endpoint URL in SSE data: %v", err)
|
api.LogWarnf("Failed to find endpoint URL in SSE data: %v", err)
|
||||||
f.needProcess = false
|
f.needProcess = false
|
||||||
@@ -298,8 +308,12 @@ func (f *filter) encodeDataFromSSEUpstream(buffer api.BufferInstance, endStream
|
|||||||
}
|
}
|
||||||
if endpointUrl == "" {
|
if endpointUrl == "" {
|
||||||
// No endpoint URL found. Need to buffer and check again.
|
// No endpoint URL found. Need to buffer and check again.
|
||||||
return api.StopAndBuffer
|
f.cachedResponseBody = []byte(combinedData)
|
||||||
|
buffer.Reset()
|
||||||
|
return api.Continue
|
||||||
}
|
}
|
||||||
|
// Clear cached data
|
||||||
|
f.cachedResponseBody = nil
|
||||||
|
|
||||||
// Remove query string since we don't need to change it.
|
// Remove query string since we don't need to change it.
|
||||||
queryStringIndex := strings.IndexAny(endpointUrl, "?")
|
queryStringIndex := strings.IndexAny(endpointUrl, "?")
|
||||||
@@ -310,12 +324,12 @@ func (f *filter) encodeDataFromSSEUpstream(buffer api.BufferInstance, endStream
|
|||||||
if changed, newEndpointUrl := f.rewriteEndpointUrl(endpointUrl); changed {
|
if changed, newEndpointUrl := f.rewriteEndpointUrl(endpointUrl); changed {
|
||||||
api.LogDebugf("The endpoint URL is changed.\n Old: %s\n New: %s", endpointUrl, newEndpointUrl)
|
api.LogDebugf("The endpoint URL is changed.\n Old: %s\n New: %s", endpointUrl, newEndpointUrl)
|
||||||
|
|
||||||
endpointUrlIndex := strings.Index(bufferData, endpointUrl)
|
endpointUrlIndex := strings.Index(combinedData, endpointUrl)
|
||||||
if endpointUrlIndex == -1 {
|
if endpointUrlIndex == -1 {
|
||||||
api.LogWarnf("Something wrong, the previously found endpoint URL %s not found in the SSE data now", endpointUrl)
|
api.LogWarnf("Something wrong, the previously found endpoint URL %s not found in the SSE data now", endpointUrl)
|
||||||
} else {
|
} else {
|
||||||
bufferData = bufferData[:endpointUrlIndex] + newEndpointUrl + bufferData[endpointUrlIndex+len(endpointUrl):]
|
newBufferData := combinedData[:endpointUrlIndex] + newEndpointUrl + combinedData[endpointUrlIndex+len(endpointUrl):]
|
||||||
_ = buffer.SetString(bufferData)
|
_ = buffer.SetString(newBufferData)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
api.LogDebugf("The endpoint URL %s is not changed", endpointUrl)
|
api.LogDebugf("The endpoint URL %s is not changed", endpointUrl)
|
||||||
@@ -492,6 +506,7 @@ func (f *filter) findEndpointUrl(bufferData string) (error, string) {
|
|||||||
// OnDestroy stops the goroutine
|
// OnDestroy stops the goroutine
|
||||||
func (f *filter) OnDestroy(reason api.DestroyReason) {
|
func (f *filter) OnDestroy(reason api.DestroyReason) {
|
||||||
api.LogDebugf("OnDestroy: reason=%v", reason)
|
api.LogDebugf("OnDestroy: reason=%v", reason)
|
||||||
|
f.cachedResponseBody = nil
|
||||||
if f.serverName != "" && f.stopChan != nil {
|
if f.serverName != "" && f.stopChan != nil {
|
||||||
select {
|
select {
|
||||||
case <-f.stopChan:
|
case <-f.stopChan:
|
||||||
|
|||||||
Reference in New Issue
Block a user