mirror of
https://github.com/alibaba/higress.git
synced 2026-05-27 14:17:27 +08:00
feat(ingress): support custom parameter names for MCP SSE stateful sessions (#3008)
This commit is contained in:
@@ -653,7 +653,7 @@ func (m *IngressConfig) convertEnvoyFilter(convertOptions *common.ConvertOptions
|
|||||||
loadBalance := route.WrapperConfig.AnnotationsConfig.LoadBalance
|
loadBalance := route.WrapperConfig.AnnotationsConfig.LoadBalance
|
||||||
if loadBalance != nil && loadBalance.McpSseStateful {
|
if loadBalance != nil && loadBalance.McpSseStateful {
|
||||||
IngressLog.Infof("Found MCP SSE stateful session for route %s", route.HTTPRoute.Name)
|
IngressLog.Infof("Found MCP SSE stateful session for route %s", route.HTTPRoute.Name)
|
||||||
envoyFilter, err := m.constructMcpSseStatefulSessionEnvoyFilter(route, m.namespace, initMcpSseGlobalFilter)
|
envoyFilter, err := m.constructMcpSseStatefulSessionEnvoyFilter(route, m.namespace, initMcpSseGlobalFilter, loadBalance.McpSseStatefulKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
IngressLog.Errorf("Construct MCP SSE stateful session EnvoyFilter error %v", err)
|
IngressLog.Errorf("Construct MCP SSE stateful session EnvoyFilter error %v", err)
|
||||||
} else {
|
} else {
|
||||||
@@ -1956,7 +1956,7 @@ func (m *IngressConfig) Delete(config.GroupVersionKind, string, string, *string)
|
|||||||
return common.ErrUnsupportedOp
|
return common.ErrUnsupportedOp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *IngressConfig) constructMcpSseStatefulSessionEnvoyFilter(route *common.WrapperHTTPRoute, namespace string, initGlobalFilter bool) (*config.Config, error) {
|
func (m *IngressConfig) constructMcpSseStatefulSessionEnvoyFilter(route *common.WrapperHTTPRoute, namespace string, initGlobalFilter bool, mcpSseStatefulKey string) (*config.Config, error) {
|
||||||
httpRoute := route.HTTPRoute
|
httpRoute := route.HTTPRoute
|
||||||
|
|
||||||
var configPatches []*networking.EnvoyFilter_EnvoyConfigObjectPatch
|
var configPatches []*networking.EnvoyFilter_EnvoyConfigObjectPatch
|
||||||
@@ -2010,7 +2010,7 @@ func (m *IngressConfig) constructMcpSseStatefulSessionEnvoyFilter(route *common.
|
|||||||
},
|
},
|
||||||
Patch: &networking.EnvoyFilter_Patch{
|
Patch: &networking.EnvoyFilter_Patch{
|
||||||
Operation: networking.EnvoyFilter_Patch_MERGE,
|
Operation: networking.EnvoyFilter_Patch_MERGE,
|
||||||
Value: buildPatchStruct(`{
|
Value: buildPatchStruct(fmt.Sprintf(`{
|
||||||
"typed_per_filter_config": {
|
"typed_per_filter_config": {
|
||||||
"envoy.filters.http.mcp_sse_stateful_session": {
|
"envoy.filters.http.mcp_sse_stateful_session": {
|
||||||
"@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
|
"@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
|
||||||
@@ -2023,7 +2023,7 @@ func (m *IngressConfig) constructMcpSseStatefulSessionEnvoyFilter(route *common.
|
|||||||
"@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
|
"@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
|
||||||
"type_url": "type.googleapis.com/envoy.extensions.http.mcp_sse_stateful_session.envelope.v3alpha.EnvelopeSessionState",
|
"type_url": "type.googleapis.com/envoy.extensions.http.mcp_sse_stateful_session.envelope.v3alpha.EnvelopeSessionState",
|
||||||
"value": {
|
"value": {
|
||||||
"param_name": "sessionId",
|
"param_name": "%s",
|
||||||
"chunk_end_patterns": ["\r\n\r\n", "\n\n", "\r\r"]
|
"chunk_end_patterns": ["\r\n\r\n", "\n\n", "\r\r"]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -2033,7 +2033,7 @@ func (m *IngressConfig) constructMcpSseStatefulSessionEnvoyFilter(route *common.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}`),
|
}`, mcpSseStatefulKey)),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@@ -41,6 +41,9 @@ const (
|
|||||||
|
|
||||||
defaultAffinityCookieName = "INGRESSCOOKIE"
|
defaultAffinityCookieName = "INGRESSCOOKIE"
|
||||||
defaultAffinityCookiePath = "/"
|
defaultAffinityCookiePath = "/"
|
||||||
|
|
||||||
|
mcpSseStatefulKey = "mcp-sse-stateful-param-name"
|
||||||
|
defaultMcpSseStatefulKey = "sessionId"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -70,6 +73,7 @@ type LoadBalanceConfig struct {
|
|||||||
other *consistentHashByOther
|
other *consistentHashByOther
|
||||||
cookie *consistentHashByCookie
|
cookie *consistentHashByCookie
|
||||||
McpSseStateful bool
|
McpSseStateful bool
|
||||||
|
McpSseStatefulKey string
|
||||||
}
|
}
|
||||||
|
|
||||||
type loadBalance struct{}
|
type loadBalance struct{}
|
||||||
@@ -139,6 +143,11 @@ func (l loadBalance) Parse(annotations Annotations, config *Ingress, _ *GlobalCo
|
|||||||
lb = strings.ToUpper(lb)
|
lb = strings.ToUpper(lb)
|
||||||
if lb == "MCP-SSE" {
|
if lb == "MCP-SSE" {
|
||||||
loadBalanceConfig.McpSseStateful = true
|
loadBalanceConfig.McpSseStateful = true
|
||||||
|
if key, err := annotations.ParseStringASAP(mcpSseStatefulKey); err == nil {
|
||||||
|
loadBalanceConfig.McpSseStatefulKey = key
|
||||||
|
} else {
|
||||||
|
loadBalanceConfig.McpSseStatefulKey = defaultMcpSseStatefulKey
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
loadBalanceConfig.simple = networking.LoadBalancerSettings_SimpleLB(networking.LoadBalancerSettings_SimpleLB_value[lb])
|
loadBalanceConfig.simple = networking.LoadBalancerSettings_SimpleLB(networking.LoadBalancerSettings_SimpleLB_value[lb])
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user