diff --git a/pkg/ingress/kube/configmap/config.go b/pkg/ingress/kube/configmap/config.go index 5c77289be..1b2bd8afa 100644 --- a/pkg/ingress/kube/configmap/config.go +++ b/pkg/ingress/kube/configmap/config.go @@ -40,6 +40,7 @@ type HigressConfig struct { Upstream *Upstream `json:"upstream,omitempty"` DisableXEnvoyHeaders bool `json:"disableXEnvoyHeaders,omitempty"` AddXRealIpHeader bool `json:"addXRealIpHeader,omitempty"` + McpServer *McpServer `json:"mcpServer,omitempty"` } func NewDefaultHigressConfig() *HigressConfig { @@ -51,6 +52,7 @@ func NewDefaultHigressConfig() *HigressConfig { Upstream: globalOption.Upstream, DisableXEnvoyHeaders: globalOption.DisableXEnvoyHeaders, AddXRealIpHeader: globalOption.AddXRealIpHeader, + McpServer: NewDefaultMcpServer(), } return higressConfig } diff --git a/pkg/ingress/kube/configmap/controller.go b/pkg/ingress/kube/configmap/controller.go index 6505e7655..a46ac4c64 100644 --- a/pkg/ingress/kube/configmap/controller.go +++ b/pkg/ingress/kube/configmap/controller.go @@ -89,6 +89,9 @@ func NewConfigmapMgr(XDSUpdater model.XDSUpdater, namespace string, higressConfi globalOptionController := NewGlobalOptionController(namespace) configmapMgr.AddItemControllers(globalOptionController) + mcpServerController := NewMcpServerController(namespace) + configmapMgr.AddItemControllers(mcpServerController) + configmapMgr.initEventHandlers() return configmapMgr diff --git a/pkg/ingress/kube/configmap/mcp_server.go b/pkg/ingress/kube/configmap/mcp_server.go new file mode 100644 index 000000000..295152ed2 --- /dev/null +++ b/pkg/ingress/kube/configmap/mcp_server.go @@ -0,0 +1,327 @@ +// Copyright (c) 2022 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package configmap + +import ( + "encoding/json" + "errors" + "fmt" + "reflect" + "strings" + "sync/atomic" + + "github.com/alibaba/higress/pkg/ingress/kube/util" + . "github.com/alibaba/higress/pkg/ingress/log" + networking "istio.io/api/networking/v1alpha3" + "istio.io/istio/pkg/config" + "istio.io/istio/pkg/config/schema/gvk" +) + +// RedisConfig defines the configuration for Redis connection +type RedisConfig struct { + // The address of Redis server in the format of "host:port" + Address string `json:"address,omitempty"` + // The username for Redis authentication + Username string `json:"username,omitempty"` + // The password for Redis authentication + Password string `json:"password,omitempty"` + // The database index to use + DB int `json:"db,omitempty"` +} + +// SSEServer defines the configuration for Server-Sent Events (SSE) server +type SSEServer struct { + // The name of the SSE server + Name string `json:"name,omitempty"` + // The path where the SSE server will be mounted, the full path is (PATH + SsePathSuffix) + Path string `json:"path,omitempty"` + // The type of the SSE server + Type string `json:"type,omitempty"` + // Additional Config parameters for the real MCP server implementation + Config map[string]interface{} `json:"config,omitempty"` +} + +// McpServer defines the configuration for MCP (Model Context Protocol) server +type McpServer struct { + // Flag to control whether MCP server is enabled + Enable bool `json:"enable,omitempty"` + // Redis Config for MCP server + Redis *RedisConfig `json:"redis,omitempty"` + // The suffix to be appended to SSE paths, default is "/sse" + SsePathSuffix string `json:"sse_path_suffix,omitempty"` + // List of SSE servers Configs + Servers []*SSEServer `json:"servers,omitempty"` +} + +func NewDefaultMcpServer() *McpServer { + return &McpServer{Enable: false} +} + +const ( + higressMcpServerEnvoyFilterName = "higress-config-mcp-server" +) + +func validMcpServer(m *McpServer) error { + if m == nil { + return nil + } + + if m.Enable && m.Redis == nil { + return errors.New("redis config cannot be empty when mcp server is enabled") + } + + return nil +} + +func compareMcpServer(old *McpServer, new *McpServer) (Result, error) { + if old == nil && new == nil { + return ResultNothing, nil + } + + if new == nil { + return ResultDelete, nil + } + + if !reflect.DeepEqual(old, new) { + return ResultReplace, nil + } + + return ResultNothing, nil +} + +func deepCopyMcpServer(mcp *McpServer) (*McpServer, error) { + newMcp := NewDefaultMcpServer() + newMcp.Enable = mcp.Enable + + if mcp.Redis != nil { + newMcp.Redis = &RedisConfig{ + Address: mcp.Redis.Address, + Username: mcp.Redis.Username, + Password: mcp.Redis.Password, + DB: mcp.Redis.DB, + } + } + + newMcp.SsePathSuffix = mcp.SsePathSuffix + + if len(mcp.Servers) > 0 { + newMcp.Servers = make([]*SSEServer, len(mcp.Servers)) + for i, server := range mcp.Servers { + newServer := &SSEServer{ + Name: server.Name, + Path: server.Path, + Type: server.Type, + } + if server.Config != nil { + newServer.Config = make(map[string]interface{}) + for k, v := range server.Config { + newServer.Config[k] = v + } + } + newMcp.Servers[i] = newServer + } + } + + return newMcp, nil +} + +type McpServerController struct { + Namespace string + mcpServer atomic.Value + Name string + eventHandler ItemEventHandler +} + +func NewMcpServerController(namespace string) *McpServerController { + mcpController := &McpServerController{ + Namespace: namespace, + mcpServer: atomic.Value{}, + Name: "mcpServer", + } + mcpController.SetMcpServer(NewDefaultMcpServer()) + return mcpController +} + +func (m *McpServerController) GetName() string { + return m.Name +} + +func (m *McpServerController) SetMcpServer(mcp *McpServer) { + m.mcpServer.Store(mcp) +} + +func (m *McpServerController) GetMcpServer() *McpServer { + value := m.mcpServer.Load() + if value != nil { + if mcp, ok := value.(*McpServer); ok { + return mcp + } + } + return nil +} + +func (m *McpServerController) AddOrUpdateHigressConfig(name util.ClusterNamespacedName, old *HigressConfig, new *HigressConfig) error { + if err := validMcpServer(new.McpServer); err != nil { + IngressLog.Errorf("data:%+v convert to mcp server, error: %+v", new.McpServer, err) + return nil + } + + result, _ := compareMcpServer(old.McpServer, new.McpServer) + + switch result { + case ResultReplace: + if newMcp, err := deepCopyMcpServer(new.McpServer); err != nil { + IngressLog.Infof("mcp server deepcopy error:%v", err) + } else { + m.SetMcpServer(newMcp) + IngressLog.Infof("AddOrUpdate Higress config mcp server") + m.eventHandler(higressMcpServerEnvoyFilterName) + IngressLog.Infof("send event with filter name:%s", higressMcpServerEnvoyFilterName) + } + case ResultDelete: + m.SetMcpServer(NewDefaultMcpServer()) + IngressLog.Infof("Delete Higress config mcp server") + m.eventHandler(higressMcpServerEnvoyFilterName) + IngressLog.Infof("send event with filter name:%s", higressMcpServerEnvoyFilterName) + } + + return nil +} + +func (m *McpServerController) ValidHigressConfig(higressConfig *HigressConfig) error { + if higressConfig == nil { + return nil + } + if higressConfig.McpServer == nil { + return nil + } + + return validMcpServer(higressConfig.McpServer) +} + +func (m *McpServerController) RegisterItemEventHandler(eventHandler ItemEventHandler) { + m.eventHandler = eventHandler +} + +func (m *McpServerController) ConstructEnvoyFilters() ([]*config.Config, error) { + configs := make([]*config.Config, 0) + mcpServer := m.GetMcpServer() + namespace := m.Namespace + + if mcpServer == nil || !mcpServer.Enable { + return configs, nil + } + + mcpStruct := m.constructMcpServerStruct(mcpServer) + if mcpStruct == "" { + return configs, nil + } + + config := &config.Config{ + Meta: config.Meta{ + GroupVersionKind: gvk.EnvoyFilter, + Name: higressMcpServerEnvoyFilterName, + Namespace: namespace, + }, + Spec: &networking.EnvoyFilter{ + ConfigPatches: []*networking.EnvoyFilter_EnvoyConfigObjectPatch{ + { + ApplyTo: networking.EnvoyFilter_HTTP_FILTER, + Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{ + Context: networking.EnvoyFilter_GATEWAY, + ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Listener{ + Listener: &networking.EnvoyFilter_ListenerMatch{ + FilterChain: &networking.EnvoyFilter_ListenerMatch_FilterChainMatch{ + Filter: &networking.EnvoyFilter_ListenerMatch_FilterMatch{ + Name: "envoy.filters.network.http_connection_manager", + SubFilter: &networking.EnvoyFilter_ListenerMatch_SubFilterMatch{ + Name: "envoy.filters.http.cors", + }, + }, + }, + }, + }, + }, + Patch: &networking.EnvoyFilter_Patch{ + Operation: networking.EnvoyFilter_Patch_INSERT_AFTER, + Value: util.BuildPatchStruct(mcpStruct), + }, + }, + }, + }, + } + + configs = append(configs, config) + return configs, nil +} + +func (m *McpServerController) constructMcpServerStruct(mcp *McpServer) string { + // 构建 servers 配置 + servers := "[]" + if len(mcp.Servers) > 0 { + serverConfigs := make([]string, len(mcp.Servers)) + for i, server := range mcp.Servers { + serverConfig := fmt.Sprintf(`{ + "name": "%s", + "path": "%s", + "type": "%s"`, + server.Name, server.Path, server.Type) + + if len(server.Config) > 0 { + config, _ := json.Marshal(server.Config) + serverConfig += fmt.Sprintf(`, + "config": %s`, string(config)) + } + + serverConfig += "}" + serverConfigs[i] = serverConfig + } + servers = fmt.Sprintf("[%s]", strings.Join(serverConfigs, ",")) + } + + // 构建完整的配置结构 + structFmt := `{ + "name": "envoy.filters.http.golang", + "typed_config": { + "@type": "type.googleapis.com/envoy.extensions.filters.http.golang.v3alpha.Config", + "value": { + "library_id": "mcp-server", + "library_path": "/var/lib/istio/envoy/mcp-server.so", + "plugin_name": "mcp-server", + "plugin_config": { + "@type": "type.googleapis.com/xds.type.v3.TypedStruct", + "value": { + "redis": { + "address": "%s", + "username": "%s", + "password": "%s", + "db": %d + }, + "sse_path_suffix": "%s", + "servers": %s + } + } + } + } + }` + + return fmt.Sprintf(structFmt, + mcp.Redis.Address, + mcp.Redis.Username, + mcp.Redis.Password, + mcp.Redis.DB, + mcp.SsePathSuffix, + servers) +} diff --git a/pkg/ingress/kube/configmap/mcp_server_test.go b/pkg/ingress/kube/configmap/mcp_server_test.go new file mode 100644 index 000000000..8812dac98 --- /dev/null +++ b/pkg/ingress/kube/configmap/mcp_server_test.go @@ -0,0 +1,354 @@ +// Copyright (c) 2022 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package configmap + +import ( + "errors" + "testing" + + "github.com/alibaba/higress/pkg/ingress/kube/util" + "github.com/stretchr/testify/assert" +) + +func Test_validMcpServer(t *testing.T) { + tests := []struct { + name string + mcp *McpServer + wantErr error + }{ + { + name: "default", + mcp: &McpServer{ + Enable: false, + }, + wantErr: nil, + }, + { + name: "nil", + mcp: nil, + wantErr: nil, + }, + { + name: "enabled but no redis config", + mcp: &McpServer{ + Enable: true, + Redis: nil, + }, + wantErr: errors.New("redis config cannot be empty when mcp server is enabled"), + }, + { + name: "valid config with redis", + mcp: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + Username: "default", + Password: "password", + DB: 0, + }, + SsePathSuffix: "/sse", + Servers: []*SSEServer{ + { + Name: "test-server", + Path: "/test", + Type: "test", + Config: map[string]interface{}{ + "key": "value", + }, + }, + }, + }, + wantErr: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validMcpServer(tt.mcp) + assert.Equal(t, tt.wantErr, err) + }) + } +} + +func Test_compareMcpServer(t *testing.T) { + tests := []struct { + name string + old *McpServer + new *McpServer + wantResult Result + wantErr error + }{ + { + name: "compare both nil", + old: nil, + new: nil, + wantResult: ResultNothing, + wantErr: nil, + }, + { + name: "compare result delete", + old: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + }, + }, + new: nil, + wantResult: ResultDelete, + wantErr: nil, + }, + { + name: "compare result equal", + old: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + }, + }, + new: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + }, + }, + wantResult: ResultNothing, + wantErr: nil, + }, + { + name: "compare result replace", + old: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + }, + }, + new: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "redis:6379", + }, + }, + wantResult: ResultReplace, + wantErr: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := compareMcpServer(tt.old, tt.new) + assert.Equal(t, tt.wantResult, result) + assert.Equal(t, tt.wantErr, err) + }) + } +} + +func Test_deepCopyMcpServer(t *testing.T) { + tests := []struct { + name string + mcp *McpServer + wantMcp *McpServer + wantErr error + }{ + { + name: "deep copy with redis only", + mcp: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + Username: "default", + Password: "password", + DB: 0, + }, + }, + wantMcp: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + Username: "default", + Password: "password", + DB: 0, + }, + }, + wantErr: nil, + }, + { + name: "deep copy with full config", + mcp: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + Username: "default", + Password: "password", + DB: 0, + }, + SsePathSuffix: "/sse", + Servers: []*SSEServer{ + { + Name: "test-server", + Path: "/test", + Type: "test", + Config: map[string]interface{}{ + "key": "value", + }, + }, + }, + }, + wantMcp: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + Username: "default", + Password: "password", + DB: 0, + }, + SsePathSuffix: "/sse", + Servers: []*SSEServer{ + { + Name: "test-server", + Path: "/test", + Type: "test", + Config: map[string]interface{}{ + "key": "value", + }, + }, + }, + }, + wantErr: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mcp, err := deepCopyMcpServer(tt.mcp) + assert.Equal(t, tt.wantMcp, mcp) + assert.Equal(t, tt.wantErr, err) + }) + } +} + +func TestMcpServerController_AddOrUpdateHigressConfig(t *testing.T) { + eventPush := "default" + defaultHandler := func(name string) { + eventPush = "push" + } + + defaultName := util.ClusterNamespacedName{} + + tests := []struct { + name string + old *HigressConfig + new *HigressConfig + wantErr error + wantEventPush string + wantMcp *McpServer + }{ + { + name: "default", + old: &HigressConfig{ + McpServer: NewDefaultMcpServer(), + }, + new: &HigressConfig{ + McpServer: NewDefaultMcpServer(), + }, + wantErr: nil, + wantEventPush: "default", + wantMcp: NewDefaultMcpServer(), + }, + { + name: "replace and push - enable mcp server", + old: &HigressConfig{ + McpServer: NewDefaultMcpServer(), + }, + new: &HigressConfig{ + McpServer: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + Username: "default", + Password: "password", + DB: 0, + }, + }, + }, + wantErr: nil, + wantEventPush: "push", + wantMcp: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + Username: "default", + Password: "password", + DB: 0, + }, + }, + }, + { + name: "replace and push - update config", + old: &HigressConfig{ + McpServer: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + }, + }, + }, + new: &HigressConfig{ + McpServer: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "redis:6379", + }, + }, + }, + wantErr: nil, + wantEventPush: "push", + wantMcp: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "redis:6379", + }, + }, + }, + { + name: "delete and push", + old: &HigressConfig{ + McpServer: &McpServer{ + Enable: true, + Redis: &RedisConfig{ + Address: "localhost:6379", + }, + }, + }, + new: &HigressConfig{ + McpServer: nil, + }, + wantErr: nil, + wantEventPush: "push", + wantMcp: NewDefaultMcpServer(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := NewMcpServerController("higress-system") + m.eventHandler = defaultHandler + eventPush = "default" + err := m.AddOrUpdateHigressConfig(defaultName, tt.old, tt.new) + assert.Equal(t, tt.wantEventPush, eventPush) + assert.Equal(t, tt.wantErr, err) + assert.Equal(t, tt.wantMcp, m.GetMcpServer()) + }) + } +}