mirror of
https://github.com/alibaba/higress.git
synced 2026-02-06 15:10:54 +08:00
fix concurrent SSE connections returning wrong endpoint (#3341)
This commit is contained in:
@@ -26,8 +26,8 @@ type config struct {
|
||||
matchList []common.MatchRule
|
||||
enableUserLevelServer bool
|
||||
rateLimitConfig *handler.MCPRatelimitConfig
|
||||
defaultServer *common.SSEServer
|
||||
redisClient *common.RedisClient
|
||||
sharedMCPServer *common.MCPServer // Created once, thread-safe with sync.RWMutex
|
||||
}
|
||||
|
||||
func (c *config) Destroy() {
|
||||
@@ -110,6 +110,9 @@ func (p *Parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int
|
||||
}
|
||||
GlobalSSEPathSuffix = ssePathSuffix
|
||||
|
||||
// Create shared MCPServer once during config parsing (thread-safe with sync.RWMutex)
|
||||
conf.sharedMCPServer = common.NewMCPServer(DefaultServerName, Version)
|
||||
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
@@ -125,9 +128,6 @@ func (p *Parser) Merge(parent interface{}, child interface{}) interface{} {
|
||||
if childConfig.rateLimitConfig != nil {
|
||||
newConfig.rateLimitConfig = childConfig.rateLimitConfig
|
||||
}
|
||||
if childConfig.defaultServer != nil {
|
||||
newConfig.defaultServer = childConfig.defaultServer
|
||||
}
|
||||
return &newConfig
|
||||
}
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ type filter struct {
|
||||
skipRequestBody bool
|
||||
skipResponseBody bool
|
||||
cachedResponseBody []byte
|
||||
sseServer *common.SSEServer // SSE server instance for this filter (per-request, not shared)
|
||||
|
||||
userLevelConfig bool
|
||||
mcpConfigHandler *handler.MCPConfigHandler
|
||||
@@ -135,11 +136,13 @@ func (f *filter) processMcpRequestHeadersForRestUpstream(header api.RequestHeade
|
||||
trimmed += "?" + rq
|
||||
}
|
||||
|
||||
f.config.defaultServer = common.NewSSEServer(common.NewMCPServer(DefaultServerName, Version),
|
||||
// Create SSE server instance for this filter (per-request, not shared)
|
||||
// MCPServer is shared (thread-safe), but SSEServer must be per-request (contains request-specific messageEndpoint)
|
||||
f.sseServer = common.NewSSEServer(f.config.sharedMCPServer,
|
||||
common.WithSSEEndpoint(GlobalSSEPathSuffix),
|
||||
common.WithMessageEndpoint(trimmed),
|
||||
common.WithRedisClient(f.config.redisClient))
|
||||
f.serverName = f.config.defaultServer.GetServerName()
|
||||
f.serverName = f.sseServer.GetServerName()
|
||||
body := "SSE connection create"
|
||||
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusOK, body, nil, 0, "")
|
||||
}
|
||||
@@ -275,9 +278,9 @@ func (f *filter) encodeDataFromRestUpstream(buffer api.BufferInstance, endStream
|
||||
|
||||
if f.serverName != "" {
|
||||
if f.config.redisClient != nil {
|
||||
// handle default server
|
||||
// handle SSE server for this filter instance
|
||||
buffer.Reset()
|
||||
f.config.defaultServer.HandleSSE(f.callbacks, f.stopChan)
|
||||
f.sseServer.HandleSSE(f.callbacks, f.stopChan)
|
||||
return api.Running
|
||||
} else {
|
||||
_ = buffer.SetString(RedisNotEnabledResponseBody)
|
||||
|
||||
Reference in New Issue
Block a user