mirror of
https://github.com/alibaba/higress.git
synced 2026-06-09 04:37:31 +08:00
feat: add MCP SSE stateful session load balancer support (#2818)
This commit is contained in:
@@ -628,6 +628,7 @@ func (m *IngressConfig) convertEnvoyFilter(convertOptions *common.ConvertOptions
|
||||
mappings := map[string]*common.Rule{}
|
||||
|
||||
initHttp2RpcGlobalConfig := true
|
||||
initMcpSseGlobalFilter := true
|
||||
for _, routes := range convertOptions.HTTPRoutes {
|
||||
for _, route := range routes {
|
||||
if strings.HasSuffix(route.HTTPRoute.Name, "app-root") {
|
||||
@@ -647,6 +648,19 @@ func (m *IngressConfig) convertEnvoyFilter(convertOptions *common.ConvertOptions
|
||||
}
|
||||
}
|
||||
|
||||
loadBalance := route.WrapperConfig.AnnotationsConfig.LoadBalance
|
||||
if loadBalance != nil && loadBalance.McpSseStateful {
|
||||
IngressLog.Infof("Found MCP SSE stateful session for route %s", route.HTTPRoute.Name)
|
||||
envoyFilter, err := m.constructMcpSseStatefulSessionEnvoyFilter(route, m.namespace, initMcpSseGlobalFilter)
|
||||
if err != nil {
|
||||
IngressLog.Errorf("Construct MCP SSE stateful session EnvoyFilter error %v", err)
|
||||
} else {
|
||||
IngressLog.Infof("Append MCP SSE stateful session EnvoyFilter for route %s", route.HTTPRoute.Name)
|
||||
envoyFilters = append(envoyFilters, *envoyFilter)
|
||||
initMcpSseGlobalFilter = false
|
||||
}
|
||||
}
|
||||
|
||||
auth := route.WrapperConfig.AnnotationsConfig.Auth
|
||||
if auth == nil {
|
||||
continue
|
||||
@@ -1511,7 +1525,7 @@ func (m *IngressConfig) constructHttp2RpcEnvoyFilter(http2rpcConfig *annotations
|
||||
return &config.Config{
|
||||
Meta: config.Meta{
|
||||
GroupVersionKind: gvk.EnvoyFilter,
|
||||
Name: common.CreateConvertedName(constants.IstioIngressGatewayName, "http2rpc", http2rpcConfig.Name, "route", httpRoute.Name),
|
||||
Name: common.CreateConvertedName(constants.IstioIngressGatewayName, "http2rpc", http2rpcConfig.Name, "route", common.ConvertToDNSLabelValid(httpRoute.Name)),
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: &networking.EnvoyFilter{
|
||||
@@ -1940,6 +1954,99 @@ func (m *IngressConfig) Delete(config.GroupVersionKind, string, string, *string)
|
||||
return common.ErrUnsupportedOp
|
||||
}
|
||||
|
||||
func (m *IngressConfig) constructMcpSseStatefulSessionEnvoyFilter(route *common.WrapperHTTPRoute, namespace string, initGlobalFilter bool) (*config.Config, error) {
|
||||
httpRoute := route.HTTPRoute
|
||||
|
||||
var configPatches []*networking.EnvoyFilter_EnvoyConfigObjectPatch
|
||||
|
||||
// Add global HTTP filter if this is the first route using MCP SSE stateful session
|
||||
if initGlobalFilter {
|
||||
configPatches = append(configPatches, &networking.EnvoyFilter_EnvoyConfigObjectPatch{
|
||||
ApplyTo: networking.EnvoyFilter_HTTP_FILTER,
|
||||
Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{
|
||||
Context: networking.EnvoyFilter_GATEWAY,
|
||||
ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Listener{
|
||||
Listener: &networking.EnvoyFilter_ListenerMatch{
|
||||
FilterChain: &networking.EnvoyFilter_ListenerMatch_FilterChainMatch{
|
||||
Filter: &networking.EnvoyFilter_ListenerMatch_FilterMatch{
|
||||
Name: "envoy.filters.network.http_connection_manager",
|
||||
SubFilter: &networking.EnvoyFilter_ListenerMatch_SubFilterMatch{
|
||||
Name: "envoy.filters.http.router",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Patch: &networking.EnvoyFilter_Patch{
|
||||
Operation: networking.EnvoyFilter_Patch_INSERT_BEFORE,
|
||||
Value: buildPatchStruct(`{
|
||||
"name": "envoy.filters.http.mcp_sse_stateful_session",
|
||||
"typed_config": {
|
||||
"@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
|
||||
"type_url": "type.googleapis.com/envoy.extensions.filters.http.mcp_sse_stateful_session.v3alpha.McpSseStatefulSession"
|
||||
}
|
||||
}`),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Add route-specific configuration
|
||||
configPatches = append(configPatches, &networking.EnvoyFilter_EnvoyConfigObjectPatch{
|
||||
ApplyTo: networking.EnvoyFilter_HTTP_ROUTE,
|
||||
Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{
|
||||
Context: networking.EnvoyFilter_GATEWAY,
|
||||
ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_RouteConfiguration{
|
||||
RouteConfiguration: &networking.EnvoyFilter_RouteConfigurationMatch{
|
||||
Vhost: &networking.EnvoyFilter_RouteConfigurationMatch_VirtualHostMatch{
|
||||
Route: &networking.EnvoyFilter_RouteConfigurationMatch_RouteMatch{
|
||||
Name: httpRoute.Name,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Patch: &networking.EnvoyFilter_Patch{
|
||||
Operation: networking.EnvoyFilter_Patch_MERGE,
|
||||
Value: buildPatchStruct(`{
|
||||
"typed_per_filter_config": {
|
||||
"envoy.filters.http.mcp_sse_stateful_session": {
|
||||
"@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
|
||||
"type_url": "type.googleapis.com/envoy.extensions.filters.http.mcp_sse_stateful_session.v3alpha.McpSseStatefulSessionPerRoute",
|
||||
"value": {
|
||||
"mcp_sse_stateful_session": {
|
||||
"session_state": {
|
||||
"name": "envoy.http.mcp_sse_stateful_session.envelope",
|
||||
"typed_config": {
|
||||
"@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
|
||||
"type_url": "type.googleapis.com/envoy.extensions.http.mcp_sse_stateful_session.envelope.v3alpha.EnvelopeSessionState",
|
||||
"value": {
|
||||
"param_name": "sessionId",
|
||||
"chunk_end_patterns": ["\r\n\r\n", "\n\n", "\r\r"]
|
||||
}
|
||||
}
|
||||
},
|
||||
"strict": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}`),
|
||||
},
|
||||
})
|
||||
|
||||
return &config.Config{
|
||||
Meta: config.Meta{
|
||||
GroupVersionKind: gvk.EnvoyFilter,
|
||||
Name: common.CreateConvertedName(constants.IstioIngressGatewayName, "mcp-lb-route", common.ConvertToDNSLabelValid(httpRoute.Name)),
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: &networking.EnvoyFilter{
|
||||
ConfigPatches: configPatches,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *IngressConfig) notifyXDSFullUpdate(gvk config.GroupVersionKind, reason istiomodel.TriggerReason, updatedConfigName *util.ClusterNamespacedName) {
|
||||
var configsUpdated map[istiomodel.ConfigKey]struct{}
|
||||
if updatedConfigName != nil {
|
||||
|
||||
@@ -66,9 +66,10 @@ type consistentHashByCookie struct {
|
||||
}
|
||||
|
||||
type LoadBalanceConfig struct {
|
||||
simple networking.LoadBalancerSettings_SimpleLB
|
||||
other *consistentHashByOther
|
||||
cookie *consistentHashByCookie
|
||||
simple networking.LoadBalancerSettings_SimpleLB
|
||||
other *consistentHashByOther
|
||||
cookie *consistentHashByCookie
|
||||
McpSseStateful bool
|
||||
}
|
||||
|
||||
type loadBalance struct{}
|
||||
@@ -129,7 +130,11 @@ func (l loadBalance) Parse(annotations Annotations, config *Ingress, _ *GlobalCo
|
||||
} else {
|
||||
if lb, err := annotations.ParseStringASAP(loadBalanceAnnotation); err == nil {
|
||||
lb = strings.ToUpper(lb)
|
||||
loadBalanceConfig.simple = networking.LoadBalancerSettings_SimpleLB(networking.LoadBalancerSettings_SimpleLB_value[lb])
|
||||
if lb == "MCP-SSE" {
|
||||
loadBalanceConfig.McpSseStateful = true
|
||||
} else {
|
||||
loadBalanceConfig.simple = networking.LoadBalancerSettings_SimpleLB(networking.LoadBalancerSettings_SimpleLB_value[lb])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -146,7 +146,7 @@ func GetHost(annotations map[string]string) string {
|
||||
|
||||
// Istio requires that the name of the gateway must conform to the DNS label.
|
||||
// For details, you can view: https://github.com/istio/istio/blob/2d5c40ad5e9cceebe64106005aa38381097da2ba/pkg/config/validation/validation.go#L478
|
||||
func convertToDNSLabelValid(input string) string {
|
||||
func ConvertToDNSLabelValid(input string) string {
|
||||
hasher := md5.New()
|
||||
hasher.Write([]byte(input))
|
||||
hash := hasher.Sum(nil)
|
||||
@@ -156,7 +156,7 @@ func convertToDNSLabelValid(input string) string {
|
||||
|
||||
// CleanHost follow the format of mse-ops for host.
|
||||
func CleanHost(host string) string {
|
||||
return convertToDNSLabelValid(host)
|
||||
return ConvertToDNSLabelValid(host)
|
||||
}
|
||||
|
||||
func CreateConvertedName(items ...string) string {
|
||||
|
||||
Reference in New Issue
Block a user