From d2f09fe8c5a962e87ea5b08e3606a1e6d790d5c7 Mon Sep 17 00:00:00 2001 From: EricaLiu <30773688+Erica177@users.noreply.github.com> Date: Tue, 10 Jun 2025 17:11:34 +0800 Subject: [PATCH] fix: refactored mcp server auto discovery logic and fix some issue (#2382) Co-authored-by: johnlanni --- .../customresourcedefinitions.gen.yaml | 15 + api/networking/v1/mcp_bridge.pb.go | 192 +++- api/networking/v1/mcp_bridge.proto | 7 + api/networking/v1/mcp_bridge_deepcopy.gen.go | 21 + api/networking/v1/mcp_bridge_json.gen.go | 11 + go.mod | 4 +- go.sum | 4 +- pkg/ingress/config/ingress_config.go | 17 +- registry/mcp_model.go | 27 +- registry/nacos/mcpserver/client.go | 546 +++++++++++ registry/nacos/mcpserver/util.go | 180 ---- registry/nacos/mcpserver/watcher.go | 890 +++++++----------- registry/nacos/mcpserver/watcher_test.go | 585 ++++++++++++ registry/nacos/v2/watcher.go | 111 ++- registry/reconcile/reconcile.go | 44 +- 15 files changed, 1822 insertions(+), 832 deletions(-) create mode 100644 registry/nacos/mcpserver/client.go delete mode 100644 registry/nacos/mcpserver/util.go create mode 100644 registry/nacos/mcpserver/watcher_test.go diff --git a/api/kubernetes/customresourcedefinitions.gen.yaml b/api/kubernetes/customresourcedefinitions.gen.yaml index 6e4984fad..8e310a7fe 100644 --- a/api/kubernetes/customresourcedefinitions.gen.yaml +++ b/api/kubernetes/customresourcedefinitions.gen.yaml @@ -250,6 +250,10 @@ spec: registries: items: properties: + allowMcpServers: + items: + type: string + type: array authSecretName: type: string consulDatacenter: @@ -265,12 +269,23 @@ spec: type: string enableMCPServer: type: boolean + enableScopeMcpServers: + type: boolean mcpServerBaseUrl: type: string mcpServerExportDomains: items: type: string type: array + metadata: + additionalProperties: + properties: + innerMap: + additionalProperties: + type: string + type: object + type: object + type: object nacosAccessKey: type: string nacosAddressServer: diff --git a/api/networking/v1/mcp_bridge.pb.go b/api/networking/v1/mcp_bridge.pb.go index e2cc839b3..bf6d91c38 100644 --- a/api/networking/v1/mcp_bridge.pb.go +++ b/api/networking/v1/mcp_bridge.pb.go @@ -111,28 +111,31 @@ type RegistryConfig struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` - Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` - Domain string `protobuf:"bytes,3,opt,name=domain,proto3" json:"domain,omitempty"` - Port uint32 `protobuf:"varint,4,opt,name=port,proto3" json:"port,omitempty"` - NacosAddressServer string `protobuf:"bytes,5,opt,name=nacosAddressServer,proto3" json:"nacosAddressServer,omitempty"` - NacosAccessKey string `protobuf:"bytes,6,opt,name=nacosAccessKey,proto3" json:"nacosAccessKey,omitempty"` - NacosSecretKey string `protobuf:"bytes,7,opt,name=nacosSecretKey,proto3" json:"nacosSecretKey,omitempty"` - NacosNamespaceId string `protobuf:"bytes,8,opt,name=nacosNamespaceId,proto3" json:"nacosNamespaceId,omitempty"` - NacosNamespace string `protobuf:"bytes,9,opt,name=nacosNamespace,proto3" json:"nacosNamespace,omitempty"` - NacosGroups []string `protobuf:"bytes,10,rep,name=nacosGroups,proto3" json:"nacosGroups,omitempty"` - NacosRefreshInterval int64 `protobuf:"varint,11,opt,name=nacosRefreshInterval,proto3" json:"nacosRefreshInterval,omitempty"` - ConsulNamespace string `protobuf:"bytes,12,opt,name=consulNamespace,proto3" json:"consulNamespace,omitempty"` - ZkServicesPath []string `protobuf:"bytes,13,rep,name=zkServicesPath,proto3" json:"zkServicesPath,omitempty"` - ConsulDatacenter string `protobuf:"bytes,14,opt,name=consulDatacenter,proto3" json:"consulDatacenter,omitempty"` - ConsulServiceTag string `protobuf:"bytes,15,opt,name=consulServiceTag,proto3" json:"consulServiceTag,omitempty"` - ConsulRefreshInterval int64 `protobuf:"varint,16,opt,name=consulRefreshInterval,proto3" json:"consulRefreshInterval,omitempty"` - AuthSecretName string `protobuf:"bytes,17,opt,name=authSecretName,proto3" json:"authSecretName,omitempty"` - Protocol string `protobuf:"bytes,18,opt,name=protocol,proto3" json:"protocol,omitempty"` - Sni string `protobuf:"bytes,19,opt,name=sni,proto3" json:"sni,omitempty"` - McpServerExportDomains []string `protobuf:"bytes,20,rep,name=mcpServerExportDomains,proto3" json:"mcpServerExportDomains,omitempty"` - McpServerBaseUrl string `protobuf:"bytes,21,opt,name=mcpServerBaseUrl,proto3" json:"mcpServerBaseUrl,omitempty"` - EnableMCPServer *wrappers.BoolValue `protobuf:"bytes,22,opt,name=enableMCPServer,proto3" json:"enableMCPServer,omitempty"` + Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Domain string `protobuf:"bytes,3,opt,name=domain,proto3" json:"domain,omitempty"` + Port uint32 `protobuf:"varint,4,opt,name=port,proto3" json:"port,omitempty"` + NacosAddressServer string `protobuf:"bytes,5,opt,name=nacosAddressServer,proto3" json:"nacosAddressServer,omitempty"` + NacosAccessKey string `protobuf:"bytes,6,opt,name=nacosAccessKey,proto3" json:"nacosAccessKey,omitempty"` + NacosSecretKey string `protobuf:"bytes,7,opt,name=nacosSecretKey,proto3" json:"nacosSecretKey,omitempty"` + NacosNamespaceId string `protobuf:"bytes,8,opt,name=nacosNamespaceId,proto3" json:"nacosNamespaceId,omitempty"` + NacosNamespace string `protobuf:"bytes,9,opt,name=nacosNamespace,proto3" json:"nacosNamespace,omitempty"` + NacosGroups []string `protobuf:"bytes,10,rep,name=nacosGroups,proto3" json:"nacosGroups,omitempty"` + NacosRefreshInterval int64 `protobuf:"varint,11,opt,name=nacosRefreshInterval,proto3" json:"nacosRefreshInterval,omitempty"` + ConsulNamespace string `protobuf:"bytes,12,opt,name=consulNamespace,proto3" json:"consulNamespace,omitempty"` + ZkServicesPath []string `protobuf:"bytes,13,rep,name=zkServicesPath,proto3" json:"zkServicesPath,omitempty"` + ConsulDatacenter string `protobuf:"bytes,14,opt,name=consulDatacenter,proto3" json:"consulDatacenter,omitempty"` + ConsulServiceTag string `protobuf:"bytes,15,opt,name=consulServiceTag,proto3" json:"consulServiceTag,omitempty"` + ConsulRefreshInterval int64 `protobuf:"varint,16,opt,name=consulRefreshInterval,proto3" json:"consulRefreshInterval,omitempty"` + AuthSecretName string `protobuf:"bytes,17,opt,name=authSecretName,proto3" json:"authSecretName,omitempty"` + Protocol string `protobuf:"bytes,18,opt,name=protocol,proto3" json:"protocol,omitempty"` + Sni string `protobuf:"bytes,19,opt,name=sni,proto3" json:"sni,omitempty"` + McpServerExportDomains []string `protobuf:"bytes,20,rep,name=mcpServerExportDomains,proto3" json:"mcpServerExportDomains,omitempty"` + McpServerBaseUrl string `protobuf:"bytes,21,opt,name=mcpServerBaseUrl,proto3" json:"mcpServerBaseUrl,omitempty"` + EnableMCPServer *wrappers.BoolValue `protobuf:"bytes,22,opt,name=enableMCPServer,proto3" json:"enableMCPServer,omitempty"` + EnableScopeMcpServers *wrappers.BoolValue `protobuf:"bytes,23,opt,name=enableScopeMcpServers,proto3" json:"enableScopeMcpServers,omitempty"` + AllowMcpServers []string `protobuf:"bytes,24,rep,name=allowMcpServers,proto3" json:"allowMcpServers,omitempty"` + Metadata map[string]*InnerMap `protobuf:"bytes,25,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (x *RegistryConfig) Reset() { @@ -321,6 +324,74 @@ func (x *RegistryConfig) GetEnableMCPServer() *wrappers.BoolValue { return nil } +func (x *RegistryConfig) GetEnableScopeMcpServers() *wrappers.BoolValue { + if x != nil { + return x.EnableScopeMcpServers + } + return nil +} + +func (x *RegistryConfig) GetAllowMcpServers() []string { + if x != nil { + return x.AllowMcpServers + } + return nil +} + +func (x *RegistryConfig) GetMetadata() map[string]*InnerMap { + if x != nil { + return x.Metadata + } + return nil +} + +type InnerMap struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + InnerMap map[string]string `protobuf:"bytes,1,rep,name=inner_map,json=innerMap,proto3" json:"inner_map,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *InnerMap) Reset() { + *x = InnerMap{} + if protoimpl.UnsafeEnabled { + mi := &file_networking_v1_mcp_bridge_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *InnerMap) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*InnerMap) ProtoMessage() {} + +func (x *InnerMap) ProtoReflect() protoreflect.Message { + mi := &file_networking_v1_mcp_bridge_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use InnerMap.ProtoReflect.Descriptor instead. +func (*InnerMap) Descriptor() ([]byte, []int) { + return file_networking_v1_mcp_bridge_proto_rawDescGZIP(), []int{2} +} + +func (x *InnerMap) GetInnerMap() map[string]string { + if x != nil { + return x.InnerMap + } + return nil +} + var File_networking_v1_mcp_bridge_proto protoreflect.FileDescriptor var file_networking_v1_mcp_bridge_proto_rawDesc = []byte{ @@ -338,7 +409,7 @@ var file_networking_v1_mcp_bridge_proto_rawDesc = []byte{ 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x68, 0x69, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0a, - 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x69, 0x65, 0x73, 0x22, 0xfd, 0x06, 0x0a, 0x0e, 0x52, + 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x69, 0x65, 0x73, 0x22, 0xa8, 0x09, 0x0a, 0x0e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x17, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x03, 0xe0, 0x41, 0x02, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, @@ -394,11 +465,39 @@ var file_networking_v1_mcp_bridge_proto_rawDesc = []byte{ 0x50, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x18, 0x16, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x0f, 0x65, 0x6e, 0x61, 0x62, 0x6c, - 0x65, 0x4d, 0x43, 0x50, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x6c, 0x69, 0x62, 0x61, 0x62, 0x61, - 0x2f, 0x68, 0x69, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6e, 0x65, 0x74, - 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x65, 0x4d, 0x43, 0x50, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x50, 0x0a, 0x15, 0x65, 0x6e, + 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x6f, 0x70, 0x65, 0x4d, 0x63, 0x70, 0x53, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x73, 0x18, 0x17, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x15, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x6f, + 0x70, 0x65, 0x4d, 0x63, 0x70, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, 0x28, 0x0a, 0x0f, + 0x61, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x63, 0x70, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x18, + 0x18, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x63, 0x70, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, 0x4f, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0x18, 0x19, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x33, 0x2e, 0x68, 0x69, 0x67, 0x72, 0x65, + 0x73, 0x73, 0x2e, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, + 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, + 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x5c, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x35, 0x0a, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x68, 0x69, 0x67, 0x72, + 0x65, 0x73, 0x73, 0x2e, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x76, + 0x31, 0x2e, 0x49, 0x6e, 0x6e, 0x65, 0x72, 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x93, 0x01, 0x0a, 0x08, 0x49, 0x6e, 0x6e, 0x65, 0x72, 0x4d, + 0x61, 0x70, 0x12, 0x4a, 0x0a, 0x09, 0x69, 0x6e, 0x6e, 0x65, 0x72, 0x5f, 0x6d, 0x61, 0x70, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x68, 0x69, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, + 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, + 0x6e, 0x65, 0x72, 0x4d, 0x61, 0x70, 0x2e, 0x49, 0x6e, 0x6e, 0x65, 0x72, 0x4d, 0x61, 0x70, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x69, 0x6e, 0x6e, 0x65, 0x72, 0x4d, 0x61, 0x70, 0x1a, 0x3b, + 0x0a, 0x0d, 0x49, 0x6e, 0x6e, 0x65, 0x72, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, + 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, + 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x2e, 0x5a, 0x2c, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x6c, 0x69, 0x62, 0x61, 0x62, + 0x61, 0x2f, 0x68, 0x69, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6e, 0x65, + 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -413,20 +512,27 @@ func file_networking_v1_mcp_bridge_proto_rawDescGZIP() []byte { return file_networking_v1_mcp_bridge_proto_rawDescData } -var file_networking_v1_mcp_bridge_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_networking_v1_mcp_bridge_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_networking_v1_mcp_bridge_proto_goTypes = []interface{}{ (*McpBridge)(nil), // 0: higress.networking.v1.McpBridge (*RegistryConfig)(nil), // 1: higress.networking.v1.RegistryConfig - (*wrappers.BoolValue)(nil), // 2: google.protobuf.BoolValue + (*InnerMap)(nil), // 2: higress.networking.v1.InnerMap + nil, // 3: higress.networking.v1.RegistryConfig.MetadataEntry + nil, // 4: higress.networking.v1.InnerMap.InnerMapEntry + (*wrappers.BoolValue)(nil), // 5: google.protobuf.BoolValue } var file_networking_v1_mcp_bridge_proto_depIdxs = []int32{ 1, // 0: higress.networking.v1.McpBridge.registries:type_name -> higress.networking.v1.RegistryConfig - 2, // 1: higress.networking.v1.RegistryConfig.enableMCPServer:type_name -> google.protobuf.BoolValue - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 5, // 1: higress.networking.v1.RegistryConfig.enableMCPServer:type_name -> google.protobuf.BoolValue + 5, // 2: higress.networking.v1.RegistryConfig.enableScopeMcpServers:type_name -> google.protobuf.BoolValue + 3, // 3: higress.networking.v1.RegistryConfig.metadata:type_name -> higress.networking.v1.RegistryConfig.MetadataEntry + 4, // 4: higress.networking.v1.InnerMap.inner_map:type_name -> higress.networking.v1.InnerMap.InnerMapEntry + 2, // 5: higress.networking.v1.RegistryConfig.MetadataEntry.value:type_name -> higress.networking.v1.InnerMap + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_networking_v1_mcp_bridge_proto_init() } @@ -459,6 +565,18 @@ func file_networking_v1_mcp_bridge_proto_init() { return nil } } + file_networking_v1_mcp_bridge_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*InnerMap); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -466,7 +584,7 @@ func file_networking_v1_mcp_bridge_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_networking_v1_mcp_bridge_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 5, NumExtensions: 0, NumServices: 0, }, diff --git a/api/networking/v1/mcp_bridge.proto b/api/networking/v1/mcp_bridge.proto index d652d49e8..88dafe7e0 100644 --- a/api/networking/v1/mcp_bridge.proto +++ b/api/networking/v1/mcp_bridge.proto @@ -71,4 +71,11 @@ message RegistryConfig { repeated string mcpServerExportDomains = 20; string mcpServerBaseUrl = 21; google.protobuf.BoolValue enableMCPServer = 22; + google.protobuf.BoolValue enableScopeMcpServers = 23; + repeated string allowMcpServers = 24; + map metadata = 25; } + +message InnerMap { + map inner_map = 1; +} \ No newline at end of file diff --git a/api/networking/v1/mcp_bridge_deepcopy.gen.go b/api/networking/v1/mcp_bridge_deepcopy.gen.go index 3abbc520e..d30821100 100644 --- a/api/networking/v1/mcp_bridge_deepcopy.gen.go +++ b/api/networking/v1/mcp_bridge_deepcopy.gen.go @@ -46,3 +46,24 @@ func (in *RegistryConfig) DeepCopy() *RegistryConfig { func (in *RegistryConfig) DeepCopyInterface() interface{} { return in.DeepCopy() } + +// DeepCopyInto supports using InnerMap within kubernetes types, where deepcopy-gen is used. +func (in *InnerMap) DeepCopyInto(out *InnerMap) { + p := proto.Clone(in).(*InnerMap) + *out = *p +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InnerMap. Required by controller-gen. +func (in *InnerMap) DeepCopy() *InnerMap { + if in == nil { + return nil + } + out := new(InnerMap) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInterface is an autogenerated deepcopy function, copying the receiver, creating a new InnerMap. Required by controller-gen. +func (in *InnerMap) DeepCopyInterface() interface{} { + return in.DeepCopy() +} diff --git a/api/networking/v1/mcp_bridge_json.gen.go b/api/networking/v1/mcp_bridge_json.gen.go index 65b9bc860..91c3ad5fe 100644 --- a/api/networking/v1/mcp_bridge_json.gen.go +++ b/api/networking/v1/mcp_bridge_json.gen.go @@ -28,6 +28,17 @@ func (this *RegistryConfig) UnmarshalJSON(b []byte) error { return McpBridgeUnmarshaler.Unmarshal(bytes.NewReader(b), this) } +// MarshalJSON is a custom marshaler for InnerMap +func (this *InnerMap) MarshalJSON() ([]byte, error) { + str, err := McpBridgeMarshaler.MarshalToString(this) + return []byte(str), err +} + +// UnmarshalJSON is a custom unmarshaler for InnerMap +func (this *InnerMap) UnmarshalJSON(b []byte) error { + return McpBridgeUnmarshaler.Unmarshal(bytes.NewReader(b), this) +} + var ( McpBridgeMarshaler = &jsonpb.Marshaler{} McpBridgeUnmarshaler = &jsonpb.Unmarshaler{AllowUnknownFields: true} diff --git a/go.mod b/go.mod index eb7a2121a..15cd363cd 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/hudl/fargo v1.4.0 github.com/mholt/acmez v1.2.0 github.com/nacos-group/nacos-sdk-go v1.0.8 - github.com/nacos-group/nacos-sdk-go/v2 v2.1.2 + github.com/nacos-group/nacos-sdk-go/v2 v2.3.2 github.com/onsi/gomega v1.27.10 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 @@ -202,6 +202,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cast v1.5.1 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/tetratelabs/wazero v1.7.3 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect @@ -274,6 +275,5 @@ replace github.com/caddyserver/certmagic => github.com/2456868764/certmagic v1.0 replace ( github.com/dubbogo/gost => github.com/johnlanni/gost v1.11.23-0.20220713132522-0967a24036c6 - github.com/nacos-group/nacos-sdk-go/v2 => github.com/luoxiner/nacos-sdk-go/v2 v2.2.9-60 golang.org/x/exp => golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 ) diff --git a/go.sum b/go.sum index 75c09a7b6..9ef3523fb 100644 --- a/go.sum +++ b/go.sum @@ -1434,8 +1434,6 @@ github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhn github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= -github.com/luoxiner/nacos-sdk-go/v2 v2.2.9-60 h1:FA/azfz2nSkMc1XR8LeqhcAiA/2/sOMcyBGYCTUc+Cs= -github.com/luoxiner/nacos-sdk-go/v2 v2.2.9-60/go.mod h1:9FKXl6FqOiVmm72i8kADtbeK71egyG9y3uRDBg41tpQ= github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star/v2 v2.0.1/go.mod h1:RcCdONR2ScXaYnQC5tUzxzlpA3WVYF7/opLeUgcQs/o= github.com/lyft/protoc-gen-star/v2 v2.0.3/go.mod h1:amey7yeodaJhXSbf/TlLvWiqQfLOSpEk//mLlc+axEk= @@ -1525,6 +1523,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/nacos-group/nacos-sdk-go v1.0.8 h1:8pEm05Cdav9sQgJSv5kyvlgfz0SzFUUGI3pWX6SiSnM= github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= +github.com/nacos-group/nacos-sdk-go/v2 v2.3.2 h1:9QB2nCJzT5wkTVlxNYl3XL/7+G6p2USMi2gQh/ouQQo= +github.com/nacos-group/nacos-sdk-go/v2 v2.3.2/go.mod h1:9FKXl6FqOiVmm72i8kADtbeK71egyG9y3uRDBg41tpQ= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= diff --git a/pkg/ingress/config/ingress_config.go b/pkg/ingress/config/ingress_config.go index 6779bb9cb..5875b3092 100644 --- a/pkg/ingress/config/ingress_config.go +++ b/pkg/ingress/config/ingress_config.go @@ -802,9 +802,20 @@ func (m *IngressConfig) convertDestinationRule(configs []common.WrapperConfig) [ if !exist { destinationRules[serviceName] = destinationRuleWrapper } else if dr.DestinationRule.TrafficPolicy != nil { - if dr.DestinationRule.TrafficPolicy.LoadBalancer == nil && - destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer != nil { - dr.DestinationRule.TrafficPolicy.LoadBalancer = destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer + // if the service is referenced by an sse type mcp server, an source ip based consistent hashing policy needs to be configured + // consistent hashing policy will be generated by mcp server watcher, then if service do not have LoadBalancer settings, it will be merged + if destinationRuleWrapper.DestinationRule.TrafficPolicy != nil && destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer != nil { + if dr.DestinationRule.TrafficPolicy.LoadBalancer == nil { + dr.DestinationRule.TrafficPolicy.LoadBalancer = destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer + } else if dr.DestinationRule.TrafficPolicy.LoadBalancer.LbPolicy == nil { + dr.DestinationRule.TrafficPolicy.LoadBalancer.LbPolicy = destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer.LbPolicy + } + } + // if the service is referenced by an https type mcp server, an client side simple mode tls policy needs to be configured + // simple mode tls policy will be generated by mcp server watcher, then if service do not have tls settings, it will be merged + if dr.DestinationRule.TrafficPolicy.Tls == nil && destinationRuleWrapper.DestinationRule.TrafficPolicy != nil && + destinationRuleWrapper.DestinationRule.TrafficPolicy.Tls != nil { + dr.DestinationRule.TrafficPolicy.Tls = destinationRuleWrapper.DestinationRule.TrafficPolicy.Tls } portTrafficPolicy := destinationRuleWrapper.DestinationRule.TrafficPolicy.PortLevelSettings[0] portUpdated := false diff --git a/registry/mcp_model.go b/registry/mcp_model.go index e53a5a9df..8e98f8c73 100644 --- a/registry/mcp_model.go +++ b/registry/mcp_model.go @@ -24,12 +24,9 @@ const ( IstioMcpAutoGeneratedHttpRouteName = IstioMcpAutoGeneratedPrefix + "-httproute" IstioMcpAutoGeneratedMcpServerName = IstioMcpAutoGeneratedPrefix + "-mcpserver" - DefaultMcpToolsGroup = "mcp-tools" - DefaultMcpCredentialsGroup = "credentials" - DefaultNacosServiceNamespace = "public" - StdioProtocol = "stdio" HttpProtocol = "http" + HttpsProtocol = "https" DubboProtocol = "dubbo" McpSSEProtocol = "mcp-sse" McpStreambleProtocol = "mcp-streamble" @@ -39,19 +36,19 @@ type McpToolArgsType string // WasmPluginConfig Struct for mcp tool wasm plugin marshal type WasmPluginConfig struct { - Rules []*McpServerRule `json:"_rules_"` + Rules []*McpServerRule `json:"_rules_,omitempty"` } type McpServerRule struct { MatchRoute []string `json:"_match_route_,omitempty"` - Server *ServerConfig `json:"server"` - Tools []*McpTool `json:"tools"` + Server *ServerConfig `json:"server,omitempty"` + Tools []*McpTool `json:"tools,omitempty"` + AllowTools []string `json:"allowTools,omitempty"` } type ServerConfig struct { - Name string `json:"name,omitempty"` - Config map[string]interface{} `json:"config,omitempty"` - AllowTools []string `json:"allowTools,omitempty"` + Name string `json:"name,omitempty"` + Config map[string]interface{} `json:"config,omitempty"` } type McpTool struct { @@ -144,13 +141,13 @@ type InputSchema struct { } type ToolsMeta struct { - InvokeContext map[string]string `json:"InvokeContext,omitempty"` - Enabled bool `json:"Enabled,omitempty"` - Templates map[string]interface{} `json:"Templates,omitempty"` + InvokeContext map[string]string `json:"invokeContext,omitempty"` + Enabled bool `json:"enabled,omitempty"` + Templates map[string]interface{} `json:"templates,omitempty"` } type JsonGoTemplate struct { - RequestTemplate RequestTemplate `json:"requestTemplate"` - ResponseTemplate ResponseTemplate `json:"responseTemplate"` + RequestTemplate RequestTemplate `json:"requestTemplate,omitempty"` + ResponseTemplate ResponseTemplate `json:"responseTemplate,omitempty"` ArgsPosition map[string]string `json:"argsPosition,omitempty"` } diff --git a/registry/nacos/mcpserver/client.go b/registry/nacos/mcpserver/client.go new file mode 100644 index 000000000..33c20574c --- /dev/null +++ b/registry/nacos/mcpserver/client.go @@ -0,0 +1,546 @@ +// Copyright (c) 2022 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mcpserver + +import ( + "encoding/json" + "fmt" + "regexp" + "strings" + + "github.com/nacos-group/nacos-sdk-go/v2/clients" + "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client" + "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client" + "github.com/nacos-group/nacos-sdk-go/v2/common/constant" + "github.com/nacos-group/nacos-sdk-go/v2/model" + "github.com/nacos-group/nacos-sdk-go/v2/vo" +) + +const McpServerVersionGroup = "mcp-server-versions" +const McpServerSpecGroup = "mcp-server" +const McpToolSpecGroup = "mcp-tools" +const SystemConfigIdPrefix = "system-" +const CredentialPrefix = "credentials-" +const DefaultNacosListConfigMode = "blur" + +const ListMcpServerConfigIdPattern = "*mcp-versions.json" + +const DefaultNacosListConfigPageSize = 50 + +type ServerSpecInfo struct { + RemoteServerConfig *RemoteServerConfig `json:"remoteServerConfig"` +} + +type RemoteServerConfig struct { + ServiceRef *ServiceRef `json:"serviceRef"` +} + +type ServiceRef struct { + ServiceName string `json:"serviceName"` + GroupName string `json:"groupName"` + NamespaceId string `json:"namespaceId"` +} + +type NacosRegistryClient struct { + namespaceId string + configClient config_client.IConfigClient + namingClient naming_client.INamingClient + servers map[string]*ServerContext +} + +type VersionedMcpServerInfo struct { + serverInfo *BasicMcpServerInfo + version string +} + +type ServerContext struct { + id string + versionedMcpServerInfo *VersionedMcpServerInfo + serverChangeListener McpServerListener + configsMap map[string]*ConfigListenerWrap + serviceInfo *model.Service + namingCallback func(services []model.Instance, err error) +} + +type McpServerConfig struct { + ServerSpecConfig string + ToolsSpecConfig string + ServiceInfo *model.Service + Credentials map[string]interface{} +} + +type ConfigListenerWrap struct { + dataId string + group string + data string + listener func(namespace, group, dataId, data string) +} + +type BasicMcpServerInfo struct { + Name string `json:"name"` + Id string `json:"id"` + FrontProtocol string `json:"frontProtocol"` + Protocol string `json:"protocol"` +} + +type VersionsMcpServerInfo struct { + BasicMcpServerInfo + LatestPublishedVersion string `json:"latestPublishedVersion"` + Versions []*VersionDetail `json:"versionDetails"` +} + +type VersionDetail struct { + Version string `json:"version"` + IsLatest bool `json:"is_latest"` +} + +type McpServerListener func(info *McpServerConfig) + +func NewMcpRegistryClient(clientConfig *constant.ClientConfig, serverConfig []constant.ServerConfig, namespaceId string) (*NacosRegistryClient, error) { + clientParam := vo.NacosClientParam{ + ClientConfig: clientConfig, + ServerConfigs: serverConfig, + } + configClient, err := clients.NewConfigClient(clientParam) + namingClient, err := clients.NewNamingClient(clientParam) + + if err != nil { + return nil, err + } + + return &NacosRegistryClient{ + namespaceId: namespaceId, + configClient: configClient, + namingClient: namingClient, + servers: map[string]*ServerContext{}, + }, nil +} + +func (n *NacosRegistryClient) listMcpServerConfigs() ([]model.ConfigItem, error) { + currentPageNum := 1 + result := make([]model.ConfigItem, 0) + for { + configPage, err := n.configClient.SearchConfig(vo.SearchConfigParam{ + Search: DefaultNacosListConfigMode, + DataId: ListMcpServerConfigIdPattern, + Group: McpServerVersionGroup, + PageNo: currentPageNum, + PageSize: DefaultNacosListConfigPageSize, + }) + + if err != nil { + mcpServerLog.Errorf("List mcp server configs for page size %d, page number %d error %v", currentPageNum, DefaultNacosListConfigPageSize) + } + + if configPage == nil { + mcpServerLog.Errorf("List mcp server configs for page size %d, page number %d null %v", currentPageNum, DefaultNacosListConfigPageSize) + continue + } + + result = append(result, configPage.PageItems...) + + if configPage.PageNumber >= configPage.PagesAvailable { + break + } + + currentPageNum += 1 + } + return result, nil +} + +// ListMcpServer List all mcp server from nacos mcp registry /** +func (n *NacosRegistryClient) ListMcpServer() ([]BasicMcpServerInfo, error) { + configs, err := n.listMcpServerConfigs() + + if err != nil { + return nil, err + } + + var result []BasicMcpServerInfo + for _, config := range configs { + mcpServerBasicConfig, err := n.configClient.GetConfig(vo.ConfigParam{ + Group: McpServerVersionGroup, + DataId: config.DataId, + }) + + if err != nil { + mcpServerLog.Errorf("Get mcp server version config (dataId: %s) error, %v", config.DataId, err) + continue + } + + if mcpServerBasicConfig == "" { + mcpServerLog.Infof("get empty mcp server version config (dataId: %s)", config.DataId) + continue + } + + mcpServer := BasicMcpServerInfo{} + err = json.Unmarshal([]byte(mcpServerBasicConfig), &mcpServer) + if err != nil { + mcpServerLog.Errorf("Parse mcp server version config error %v", err) + continue + } + + if !isMcpServerShouldBeDiscoveryForGateway(mcpServer) { + mcpServerLog.Infof("mcp server %s don't need to be discovered for gateway, skip it", mcpServerBasicConfig) + continue + } + + result = append(result, mcpServer) + } + return result, nil +} + +func isMcpServerShouldBeDiscoveryForGateway(info BasicMcpServerInfo) bool { + return "mcp-sse" == info.FrontProtocol || "mcp-streamable" == info.FrontProtocol +} + +// ListenToMcpServer Listen to mcp server config and backend service +func (n *NacosRegistryClient) ListenToMcpServer(id string, listener McpServerListener) error { + versionConfigId := fmt.Sprintf("%s-mcp-versions.json", id) + serverVersionConfig, err := n.configClient.GetConfig(vo.ConfigParam{ + Group: McpServerVersionGroup, + DataId: versionConfigId, + }) + if err != nil { + mcpServerLog.Errorf("Get mcp server(id: %s) version config error, %v", id, err) + } else { + mcpServerLog.Infof("Get mcp server(id: %s) version config success, config is:\n %v", id, serverVersionConfig) + } + + versionConfigCallBack := func(namespace string, group string, dataId string, content string) { + mcpServerLog.Infof("Call back to mcp server %s", id) + info := VersionsMcpServerInfo{} + err = json.Unmarshal([]byte(content), &info) + if err != nil { + mcpServerLog.Errorf("Parse mcp server (id: %s) version config callback error, %v", id, err) + return + } + latestVersion := info.LatestPublishedVersion + + ctx := n.servers[id] + if ctx.versionedMcpServerInfo == nil { + ctx.versionedMcpServerInfo = &VersionedMcpServerInfo{} + } + ctx.versionedMcpServerInfo.serverInfo = &info.BasicMcpServerInfo + + // first time the version is empty so it will trigger the change finally. + if ctx.versionedMcpServerInfo.version != latestVersion { + ctx.versionedMcpServerInfo.version = latestVersion + n.onServerVersionChanged(ctx) + n.triggerMcpServerChange(id) + } + } + + n.servers[id] = &ServerContext{ + id: id, + serverChangeListener: listener, + configsMap: map[string]*ConfigListenerWrap{ + McpServerVersionGroup: { + dataId: versionConfigId, + group: McpServerVersionGroup, + listener: versionConfigCallBack, + }, + }, + } + + // trigger callback manually + versionConfigCallBack(n.namespaceId, McpServerVersionGroup, versionConfigId, serverVersionConfig) + // Listen after get config to avoid multi-callback on same version + err = n.configClient.ListenConfig(vo.ConfigParam{ + Group: McpServerVersionGroup, + DataId: versionConfigId, + OnChange: versionConfigCallBack, + }) + + if err != nil { + return err + } + + return nil +} + +func (n *NacosRegistryClient) onServerVersionChanged(ctx *ServerContext) { + id := ctx.versionedMcpServerInfo.serverInfo.Id + version := ctx.versionedMcpServerInfo.version + configsMap := map[string]string{ + McpServerSpecGroup: fmt.Sprintf("%s-%s-mcp-server.json", id, version), + McpToolSpecGroup: fmt.Sprintf("%s-%s-mcp-tools.json", id, version), + } + + for group, dataId := range configsMap { + configsKey := fmt.Sprintf(SystemConfigIdPrefix+"%s@@%s", id, group) + // Only listen to the last version of the server, so we should exist and cancel it first + if data, exist := ctx.configsMap[configsKey]; exist { + err := n.cancelListenToConfig(data) + if err != nil { + mcpServerLog.Errorf("cancel listen to old config %v error %v", dataId, err) + } + } + + configListenerWrap, err := n.ListenToConfig(ctx, dataId, group) + if err != nil { + mcpServerLog.Errorf("listen to config %v error %v", dataId, err) + continue + } + ctx.configsMap[configsKey] = configListenerWrap + } +} + +func (n *NacosRegistryClient) triggerMcpServerChange(id string) { + if context, exist := n.servers[id]; exist { + if config := mapConfigMapToServerConfig(context); config != nil { + context.serverChangeListener(config) + } + } +} + +func mapConfigMapToServerConfig(ctx *ServerContext) *McpServerConfig { + result := &McpServerConfig{ + Credentials: map[string]interface{}{}, + } + configMaps := ctx.configsMap + for key, data := range configMaps { + if strings.HasPrefix(key, SystemConfigIdPrefix) { + group := strings.Split(key, "@@")[1] + if group == McpServerSpecGroup { + result.ServerSpecConfig = data.data + } else if group == McpToolSpecGroup { + result.ToolsSpecConfig = data.data + } + } else if strings.HasPrefix(key, CredentialPrefix) { + credentialId := strings.ReplaceAll(key, CredentialPrefix, "") + var credData interface{} + if err := json.Unmarshal([]byte(data.data), &credData); err != nil { + mcpServerLog.Errorf("parse credential %v error %v", credentialId, err) + // keep origin data if data is not an object + result.Credentials[credentialId] = data.data + } else { + result.Credentials[credentialId] = credData + } + } + } + + result.ServiceInfo = ctx.serviceInfo + return result +} + +func (n *NacosRegistryClient) replaceTemplateAndExactConfigsItems(ctx *ServerContext, config *ConfigListenerWrap) map[string]*ConfigListenerWrap { + result := map[string]*ConfigListenerWrap{} + compile := regexp.MustCompile("\\$\\{nacos\\.([a-zA-Z0-9-_:\\\\.]+/[a-zA-Z0-9-_:\\\\.]+)}") + allConfigs := compile.FindAllString(config.data, -1) + allConfigsMap := map[string]string{} + for _, config := range allConfigs { + allConfigsMap[config] = config + } + + newContent := config.data + for _, data := range allConfigsMap { + dataIdAndGroup := strings.ReplaceAll(data, "${nacos.", "") + dataIdAndGroup = dataIdAndGroup[0 : len(dataIdAndGroup)-1] + dataIdAndGroupArray := strings.Split(dataIdAndGroup, "/") + dataId := strings.TrimSpace(dataIdAndGroupArray[0]) + group := strings.TrimSpace(dataIdAndGroupArray[1]) + configWrap, err := n.ListenToConfig(ctx, dataId, group) + if err != nil { + mcpServerLog.Errorf("extract configs %v from content error %v", dataId, err) + continue + } + result[CredentialPrefix+configWrap.group+"_"+configWrap.dataId] = configWrap + newContent = strings.Replace(newContent, data, ".config.credentials."+group+"_"+dataId, -1) + } + + config.data = newContent + return result +} + +func (n *NacosRegistryClient) resetNacosTemplateConfigs(ctx *ServerContext, config *ConfigListenerWrap) { + newCredentials := n.replaceTemplateAndExactConfigsItems(ctx, config) + + // cancel all old config listener + for key, wrap := range ctx.configsMap { + if strings.HasPrefix(key, CredentialPrefix) { + if _, ok := newCredentials[key]; !ok { + err := n.cancelListenToConfig(wrap) + if err != nil { + mcpServerLog.Errorf("cancel listen to old credential listener error %v", err) + continue + } + } + } + } + + for _, data := range newCredentials { + ctx.configsMap[CredentialPrefix+data.group+"_"+data.dataId] = data + } +} + +func (n *NacosRegistryClient) refreshServiceListenerIfNeeded(ctx *ServerContext, serverConfig string) { + var serverInfo ServerSpecInfo + err := json.Unmarshal([]byte(serverConfig), &serverInfo) + if err != nil { + mcpServerLog.Errorf("parse server config error %v", err) + return + } + + if serverInfo.RemoteServerConfig != nil && serverInfo.RemoteServerConfig.ServiceRef != nil { + ref := serverInfo.RemoteServerConfig.ServiceRef + if ctx.serviceInfo != nil { + if ctx.serviceInfo.Name == ref.ServiceName && ctx.serviceInfo.GroupName == ref.GroupName { + return + } + + err := n.namingClient.Unsubscribe(&vo.SubscribeParam{ + GroupName: ctx.serviceInfo.GroupName, + ServiceName: ctx.serviceInfo.Name, + SubscribeCallback: ctx.namingCallback, + }) + if err != nil { + mcpServerLog.Errorf("unsubscribe service error:%v, groupName:%s, serviceName:%s", err, ctx.serviceInfo.GroupName, ctx.serviceInfo.Name) + } + } + + service, err := n.namingClient.GetService(vo.GetServiceParam{ + GroupName: ref.GroupName, + ServiceName: ref.ServiceName, + }) + + if err != nil { + mcpServerLog.Errorf("get service error:%v, groupName:%s, serviceName:%s", err, ref.GroupName, ref.ServiceName) + return + } + + ctx.serviceInfo = &service + + if ctx.namingCallback == nil { + ctx.namingCallback = func(services []model.Instance, err error) { + if ctx.serviceInfo == nil { + ctx.serviceInfo = &model.Service{ + GroupName: ctx.serviceInfo.GroupName, + Name: ctx.serviceInfo.Name, + } + } + + ctx.serviceInfo.Name = ref.ServiceName + ctx.serviceInfo.GroupName = ref.GroupName + ctx.serviceInfo.Hosts = services + n.triggerMcpServerChange(ctx.id) + } + } + + err = n.namingClient.Subscribe(&vo.SubscribeParam{ + GroupName: ctx.serviceInfo.GroupName, + ServiceName: ctx.serviceInfo.Name, + SubscribeCallback: ctx.namingCallback, + }) + if err != nil { + mcpServerLog.Errorf("subscribe service error:%v, groupName:%s, serviceName:%s", err, ctx.serviceInfo.GroupName, ctx.serviceInfo.Name) + } + } +} + +func (n *NacosRegistryClient) ListenToConfig(ctx *ServerContext, dataId string, group string) (*ConfigListenerWrap, error) { + wrap := ConfigListenerWrap{ + dataId: dataId, + group: group, + } + + configListener := func(namespace, group, dataId, data string) { + if group == McpToolSpecGroup { + n.resetNacosTemplateConfigs(ctx, &wrap) + } else if group == McpServerSpecGroup { + n.refreshServiceListenerIfNeeded(ctx, data) + } + + if ctx.serverChangeListener != nil && wrap.data != data { + wrap.data = data + n.triggerMcpServerChange(ctx.versionedMcpServerInfo.serverInfo.Id) + } + } + + config, err := n.configClient.GetConfig(vo.ConfigParam{ + DataId: dataId, + Group: group, + }) + + if err != nil { + return nil, err + } + + wrap.listener = configListener + wrap.data = config + if group == McpToolSpecGroup { + n.resetNacosTemplateConfigs(ctx, &wrap) + } else if group == McpServerSpecGroup { + n.refreshServiceListenerIfNeeded(ctx, wrap.data) + } + + err = n.configClient.ListenConfig(vo.ConfigParam{ + DataId: dataId, + Group: group, + OnChange: configListener, + }) + if err != nil { + return nil, err + } + + return &wrap, nil +} + +func (n *NacosRegistryClient) cancelListenToConfig(wrap *ConfigListenerWrap) error { + return n.configClient.CancelListenConfig(vo.ConfigParam{ + DataId: wrap.dataId, + Group: wrap.group, + OnChange: wrap.listener, + }) +} + +func (n *NacosRegistryClient) CancelListenToServer(id string) error { + if server, exist := n.servers[id]; exist && server != nil { + defer delete(n.servers, id) + + for _, wrap := range server.configsMap { + if wrap != nil { + err := n.configClient.CancelListenConfig(vo.ConfigParam{ + DataId: wrap.dataId, + Group: wrap.group, + OnChange: wrap.listener, + }) + + if err != nil { + mcpServerLog.Errorf("cancel listen config error:%v, dataId:%s, group:%s", err, wrap.dataId, wrap.group) + continue + } + } + } + + if server.serviceInfo != nil { + err := n.namingClient.Unsubscribe(&vo.SubscribeParam{ + GroupName: server.serviceInfo.GroupName, + ServiceName: server.serviceInfo.Name, + SubscribeCallback: server.namingCallback, + }) + if err != nil { + mcpServerLog.Errorf("unsubscribe service error:%v, groupName:%s, serviceName:%s", err, server.serviceInfo.GroupName, server.serviceInfo.Name) + return err + } + } + } + return nil +} + +func (n *NacosRegistryClient) CloseClient() { + n.namingClient.CloseClient() + n.configClient.CloseClient() +} diff --git a/registry/nacos/mcpserver/util.go b/registry/nacos/mcpserver/util.go deleted file mode 100644 index fd5496a5d..000000000 --- a/registry/nacos/mcpserver/util.go +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright (c) 2022 Alibaba Group Holding Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package mcpserver - -import ( - "fmt" - - "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client" - "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client" - "github.com/nacos-group/nacos-sdk-go/v2/model" - "github.com/nacos-group/nacos-sdk-go/v2/vo" -) - -type MultiConfigListener struct { - configClient config_client.IConfigClient - onChange func(map[string]string) - configCache map[string]string - innerCallback func(string, string, string, string) -} - -func NewMultiConfigListener(configClient config_client.IConfigClient, onChange func(map[string]string)) *MultiConfigListener { - result := &MultiConfigListener{ - configClient: configClient, - configCache: make(map[string]string), - onChange: onChange, - } - - result.innerCallback = func(namespace string, group string, dataId string, content string) { - result.configCache[group+DefaultJoiner+dataId] = content - result.onChange(result.configCache) - } - - return result -} - -func (l *MultiConfigListener) StartListen(configs []vo.ConfigParam) error { - for _, config := range configs { - content, err := l.configClient.GetConfig(vo.ConfigParam{ - DataId: config.DataId, - Group: config.Group, - }) - - if err != nil { - return fmt.Errorf("get config %s/%s err: %v", config.Group, config.DataId, err) - } - l.configCache[config.Group+DefaultJoiner+config.DataId] = content - err = l.configClient.ListenConfig(vo.ConfigParam{ - DataId: config.DataId, - Group: config.Group, - OnChange: l.innerCallback, - }) - - if err != nil { - return fmt.Errorf("listener to config %s/%s error: %w", config.Group, config.DataId, err) - } - } - - l.onChange(l.configCache) - return nil -} - -func (l *MultiConfigListener) Stop() { - l.configClient.CloseClient() -} - -func (l *MultiConfigListener) CancelListen(configs []vo.ConfigParam) error { - for _, config := range configs { - if _, ok := l.configCache[config.Group+DefaultJoiner+config.DataId]; ok { - err := l.configClient.CancelListenConfig(vo.ConfigParam{ - DataId: config.DataId, - Group: config.Group, - }) - - if err != nil { - return fmt.Errorf("cancel config %s/%s error: %w", config.Group, config.DataId, err) - } - delete(l.configCache, config.Group+config.DataId) - } - } - return nil -} - -type ServiceCache struct { - services map[string]*NacosServiceRef - client naming_client.INamingClient -} - -type NacosServiceRef struct { - refs map[string]func([]model.Instance) - callback func(services []model.Instance, err error) - instances *[]model.Instance -} - -func NewServiceCache(client naming_client.INamingClient) *ServiceCache { - return &ServiceCache{ - client: client, - services: make(map[string]*NacosServiceRef), - } -} - -func (c *ServiceCache) AddListener(group string, serviceName string, key string, callback func([]model.Instance)) error { - uniqueServiceName := c.makeServiceUniqueName(group, serviceName) - if _, ok := c.services[uniqueServiceName]; !ok { - instances, err := c.client.SelectAllInstances(vo.SelectAllInstancesParam{ - GroupName: group, - ServiceName: serviceName, - }) - - if err != nil { - return err - } - - ref := &NacosServiceRef{ - refs: map[string]func([]model.Instance){}, - instances: &instances, - } - - ref.callback = func(services []model.Instance, err error) { - ref.instances = &services - for _, refCallback := range ref.refs { - refCallback(*ref.instances) - } - } - - c.services[uniqueServiceName] = ref - - err = c.client.Subscribe(&vo.SubscribeParam{ - GroupName: group, - ServiceName: serviceName, - SubscribeCallback: ref.callback, - }) - if err != nil { - return err - } - } - - ref := c.services[uniqueServiceName] - ref.refs[key] = callback - callback(*ref.instances) - return nil -} - -func (c *ServiceCache) RemoveListener(group string, serviceName string, key string) error { - if ref, ok := c.services[c.makeServiceUniqueName(group, serviceName)]; ok { - delete(ref.refs, key) - if len(ref.refs) == 0 { - err := c.client.Unsubscribe(&vo.SubscribeParam{ - GroupName: group, - ServiceName: serviceName, - SubscribeCallback: ref.callback, - }) - - delete(c.services, c.makeServiceUniqueName(group, serviceName)) - if err != nil { - return err - } - } - } - return nil -} - -func (c *ServiceCache) makeServiceUniqueName(group string, serviceName string) string { - return fmt.Sprintf("%s-%s", group, serviceName) -} - -func (c *ServiceCache) Stop() { - c.client.CloseClient() -} diff --git a/registry/nacos/mcpserver/watcher.go b/registry/nacos/mcpserver/watcher.go index 92e6d1084..13fe3ef80 100644 --- a/registry/nacos/mcpserver/watcher.go +++ b/registry/nacos/mcpserver/watcher.go @@ -31,11 +31,8 @@ import ( provider "github.com/alibaba/higress/registry" "github.com/alibaba/higress/registry/memory" "github.com/golang/protobuf/ptypes/wrappers" - "github.com/nacos-group/nacos-sdk-go/v2/clients" - "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client" "github.com/nacos-group/nacos-sdk-go/v2/common/constant" "github.com/nacos-group/nacos-sdk-go/v2/model" - "github.com/nacos-group/nacos-sdk-go/v2/vo" "go.uber.org/atomic" "istio.io/api/networking/v1alpha3" "istio.io/istio/pkg/config" @@ -55,18 +52,18 @@ const ( DefaultNacosLogMaxAge = 3 DefaultRefreshInterval = time.Second * 30 DefaultRefreshIntervalLimit = time.Second * 10 - DefaultFetchPageSize = 50 - DefaultJoiner = "@@" ) var ( supportedProtocols = map[string]bool{ provider.HttpProtocol: true, + provider.HttpsProtocol: true, provider.McpSSEProtocol: true, provider.McpStreambleProtocol: true, } protocolUpstreamTypeMapping = map[string]string{ provider.HttpProtocol: mcpserver.UpstreamTypeRest, + provider.HttpsProtocol: mcpserver.UpstreamTypeRest, provider.McpSSEProtocol: mcpserver.UpstreamTypeSSE, provider.McpStreambleProtocol: mcpserver.UpstreamTypeStreamable, } @@ -84,46 +81,30 @@ var mcpServerLog = log.RegisterScope("McpServer", "Nacos Mcp Server Watcher proc type watcher struct { provider.BaseWatcher apiv1.RegistryConfig - watchingConfig map[string]bool - watchingConfigRefs map[string]sets.Set[string] - configToConfigListener map[string]*MultiConfigListener - serviceCache map[string]*ServiceCache - configToService map[string]string - credentialKeyToName map[string]map[string]string - RegistryType provider.ServiceRegistryType - Status provider.WatcherStatus - configClient config_client.IConfigClient - serverConfig []constant.ServerConfig - cache memory.Cache - mutex *sync.Mutex - subMutex *sync.Mutex - callbackMutex *sync.Mutex - stop chan struct{} - isStop bool - updateCacheWhenEmpty bool - nacosClientConfig *constant.ClientConfig - namespace string - clusterId string - authOption provider.AuthOption + watchingConfig map[string]bool + RegistryType provider.ServiceRegistryType + Status provider.WatcherStatus + registryClient *NacosRegistryClient + cache memory.Cache + mutex *sync.Mutex + stop chan struct{} + isStop bool + updateCacheWhenEmpty bool + namespace string + clusterId string + authOption provider.AuthOption } type WatcherOption func(w *watcher) func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) { w := &watcher{ - watchingConfig: make(map[string]bool), - configToService: make(map[string]string), - watchingConfigRefs: make(map[string]sets.Set[string]), - configToConfigListener: make(map[string]*MultiConfigListener), - credentialKeyToName: make(map[string]map[string]string), - serviceCache: map[string]*ServiceCache{}, - RegistryType: "nacos3", - Status: provider.UnHealthy, - cache: cache, - mutex: &sync.Mutex{}, - subMutex: &sync.Mutex{}, - callbackMutex: &sync.Mutex{}, - stop: make(chan struct{}), + watchingConfig: make(map[string]bool), + RegistryType: "nacos3", + Status: provider.UnHealthy, + cache: cache, + mutex: &sync.Mutex{}, + stop: make(chan struct{}), } w.NacosRefreshInterval = int64(DefaultRefreshInterval) @@ -132,14 +113,14 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er opt(w) } - // The nacos mcp server uses these restricted namespaces and groups, and may be adjusted in the future. - w.NacosNamespace = "nacos-default-mcp" - w.NacosNamespaceId = w.NacosNamespace + if w.NacosNamespace == "" { + w.NacosNamespace = w.NacosNamespaceId + } w.NacosGroups = []string{"mcp-server"} mcpServerLog.Infof("new nacos mcp server watcher with config Name:%s", w.Name) - w.nacosClientConfig = constant.NewClientConfig( + clientConfig := constant.NewClientConfig( constant.WithTimeoutMs(DefaultNacosTimeout), constant.WithLogLevel(DefaultNacosLogLevel), constant.WithLogDir(DefaultNacosLogDir), @@ -157,21 +138,18 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er ) initTimer := time.NewTimer(DefaultInitTimeout) - w.serverConfig = []constant.ServerConfig{ + serverConfig := []constant.ServerConfig{ *constant.NewServerConfig(w.Domain, uint64(w.Port)), } success := make(chan struct{}) go func() { - configClient, err := clients.NewConfigClient(vo.NacosClientParam{ - ClientConfig: w.nacosClientConfig, - ServerConfigs: w.serverConfig, - }) + client, err := NewMcpRegistryClient(clientConfig, serverConfig, w.NacosNamespaceId) if err == nil { - w.configClient = configClient + w.registryClient = client close(success) } else { - mcpServerLog.Errorf("can not create naming client, err:%v", err) + mcpServerLog.Errorf("can not create registry client, err:%v", err) } }() @@ -183,6 +161,28 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er } } +func WithNacosNamespaceId(nacosNamespaceId string) WatcherOption { + return func(w *watcher) { + if nacosNamespaceId == "" { + w.NacosNamespaceId = "public" + } else { + w.NacosNamespaceId = nacosNamespaceId + } + } +} + +func WithNacosNamespace(nacosNamespace string) WatcherOption { + return func(w *watcher) { + w.NacosNamespace = nacosNamespace + } +} + +func WithNacosGroups(nacosGroups []string) WatcherOption { + return func(w *watcher) { + w.NacosGroups = nacosGroups + } +} + func WithNacosAddressServer(nacosAddressServer string) WatcherOption { return func(w *watcher) { w.NacosAddressServer = nacosAddressServer @@ -302,447 +302,203 @@ func (w *watcher) fetchAllMcpConfig() error { if w.isStop { return nil } - fetchedConfigs := make(map[string]bool) - var tries int - isV3 := true - if w.EnableMCPServer != nil { - isV3 = w.EnableMCPServer.GetValue() + + mcpConfigs, err := w.registryClient.ListMcpServer() + if err != nil { + return fmt.Errorf("list mcp server failed ,error %s", err.Error()) } - for _, groupName := range w.NacosGroups { - for page := 1; ; page++ { - ss, err := w.configClient.SearchConfig(vo.SearchConfigParam{ - Group: groupName, - Search: "blur", - PageNo: page, - PageSize: DefaultFetchPageSize, - IsV3: isV3, - }) - if err != nil { - if tries > 10 { - return err - } - mcpServerLog.Errorf("fetch nacos config list failed, err:%v, pageNo:%d", err, page) - page-- - tries++ - continue - } - for _, item := range ss.PageItems { - fetchedConfigs[groupName+DefaultJoiner+item.DataId] = true - } - if len(ss.PageItems) < DefaultFetchPageSize { - break - } - } + + fetchedConfigs := map[string]bool{} + for _, c := range mcpConfigs { + fetchedConfigs[c.Id] = true } for key := range w.watchingConfig { if _, exist := fetchedConfigs[key]; !exist { - s := strings.Split(key, DefaultJoiner) - err := w.unsubscribe(s[0], s[1]) - if err != nil { - return err + if err = w.registryClient.CancelListenToServer(key); err != nil { + return fmt.Errorf("cancel listen mcp server config %s failed, error %s", key, err.Error()) } + mcpServerLog.Infof("cancel listen mcp server config %s success", key) delete(w.watchingConfig, key) + // clean cache for this config + w.cache.UpdateConfigCache(config.GroupVersionKind{}, key, nil, true) + w.UpdateService() } } - wg := sync.WaitGroup{} subscribeFailed := atomic.NewBool(false) - watchingKeys := make(chan string, len(fetchedConfigs)) for key := range fetchedConfigs { - s := strings.Split(key, DefaultJoiner) if _, exist := w.watchingConfig[key]; !exist { - wg.Add(1) - go func(k string) { - err := w.subscribe(s[0], s[1]) - if err != nil { - subscribeFailed.Store(true) - mcpServerLog.Errorf("subscribe failed, group: %v, service: %v, errors: %v", s[0], s[1], err) - } else { - watchingKeys <- k - } - wg.Done() - }(key) + err = w.registryClient.ListenToMcpServer(key, w.mcpServerListener(key)) + if err != nil { + mcpServerLog.Errorf("subscribe mcp server failed, dataId %v, errors: %v", key, err) + subscribeFailed.Store(true) + } else { + mcpServerLog.Infof("subscribe mcp server success, dataId:%s", key) + w.watchingConfig[key] = true + } } } - wg.Wait() - close(watchingKeys) - for key := range watchingKeys { - w.watchingConfig[key] = true - } + if subscribeFailed.Load() { return errors.New("subscribe services failed") } return nil } -func (w *watcher) unsubscribe(groupName string, dataId string) error { - mcpServerLog.Infof("unsubscribe mcp server, groupName:%s, dataId:%s", groupName, dataId) - defer w.UpdateService() - - err := w.configClient.CancelListenConfig(vo.ConfigParam{ - DataId: dataId, - Group: groupName, - }) - if err != nil { - mcpServerLog.Errorf("unsubscribe mcp server error:%v, groupName:%s, dataId:%s", err, groupName, dataId) - return err - } - key := strings.Join([]string{w.Name, w.NacosNamespace, groupName, dataId}, DefaultJoiner) - w.configToConfigListener[key].Stop() - delete(w.watchingConfigRefs, key) - delete(w.configToConfigListener, key) - // remove service for this config - configKey := strings.Join([]string{groupName, dataId}, DefaultJoiner) - svcInfo := w.configToService[configKey] - split := strings.Split(svcInfo, DefaultJoiner) - svcNamespace := split[0] - svcGroup := split[1] - svcName := split[2] - if w.serviceCache[svcNamespace] != nil { - err = w.serviceCache[svcNamespace].RemoveListener(svcGroup, svcName, configKey) - if err != nil { - mcpServerLog.Errorf("remove service listener error:%v, groupName:%s, dataId:%s", err, groupName, dataId) - } - } - delete(w.configToService, configKey) - - w.cache.UpdateConfigCache(config.GroupVersionKind{}, key, nil, true) - return nil -} - -func (w *watcher) subscribe(groupName string, dataId string) error { - mcpServerLog.Infof("subscribe mcp server, groupName:%s, dataId:%s", groupName, dataId) - // first we get this config and callback manually - content, err := w.configClient.GetConfig(vo.ConfigParam{ - DataId: dataId, - Group: groupName, - }) - if err != nil { - mcpServerLog.Errorf("get config %s/%s err: %v", groupName, dataId, err) - } else { - w.getConfigCallback(w.NacosNamespace, groupName, dataId, content) - } - // second, we set callback for this config - err = w.configClient.ListenConfig(vo.ConfigParam{ - DataId: dataId, - Group: groupName, - OnChange: w.getConfigCallback, - }) - if err != nil { - mcpServerLog.Errorf("subscribe mcp server error:%v, groupName:%s, dataId:%s", err, groupName, dataId) - return err - } - return nil -} - -func (w *watcher) getConfigCallback(namespace, group, dataId, data string) { - mcpServerLog.Infof("get config callback, namespace:%s, groupName:%s, dataId:%s", namespace, group, dataId) - - if data == "" { - return - } - - key := strings.Join([]string{w.Name, w.NacosNamespace, group, dataId}, DefaultJoiner) - routeName := fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, group, strings.TrimSuffix(dataId, ".json")) - - mcpServer := &provider.McpServer{} - if err := json.Unmarshal([]byte(data), mcpServer); err != nil { - mcpServerLog.Errorf("Unmarshal config data to mcp server error:%v, namespace:%s, groupName:%s, dataId:%s", err, namespace, group, dataId) - return - } - if !supportedProtocols[mcpServer.Protocol] { - return - } - // process mcp service - w.subMutex.Lock() - defer w.subMutex.Unlock() - if err := w.buildServiceEntryForMcpServer(mcpServer, group, dataId); err != nil { - mcpServerLog.Errorf("build service entry for mcp server failed, namespace %v, group: %v, dataId %v, errors: %v", namespace, group, dataId, err) - } - // process mcp wasm - // only generate wasm plugin for http protocol mcp server - if mcpServer.Protocol != provider.HttpProtocol { - return - } - if _, exist := w.configToConfigListener[key]; !exist { - w.configToConfigListener[key] = NewMultiConfigListener(w.configClient, w.multiCallback(mcpServer, routeName, key)) - } - if _, exist := w.watchingConfigRefs[key]; !exist { - w.watchingConfigRefs[key] = sets.New[string]() - } - listener := w.configToConfigListener[key] - - curRef := sets.Set[string]{} - // add description ref - curRef.Insert(strings.Join([]string{provider.DefaultMcpToolsGroup, mcpServer.ToolsDescriptionRef}, DefaultJoiner)) - // add credential ref - credentialNameMap := map[string]string{} - for name, ref := range mcpServer.Credentials { - credKey := strings.Join([]string{provider.DefaultMcpCredentialsGroup, ref.Ref}, DefaultJoiner) - curRef.Insert(credKey) - credentialNameMap[credKey] = name - } - w.callbackMutex.Lock() - w.credentialKeyToName[key] = credentialNameMap - w.callbackMutex.Unlock() - - toBeAdd := curRef.Difference(w.watchingConfigRefs[key]) - toBeDelete := w.watchingConfigRefs[key].Difference(curRef) - - var toBeListen, toBeUnListen []vo.ConfigParam - for item, _ := range toBeAdd { - split := strings.Split(item, DefaultJoiner) - toBeListen = append(toBeListen, vo.ConfigParam{ - Group: split[0], - DataId: split[1], - }) - } - for item, _ := range toBeDelete { - split := strings.Split(item, DefaultJoiner) - toBeUnListen = append(toBeUnListen, vo.ConfigParam{ - Group: split[0], - DataId: split[1], - }) - } - - // listen description and credential config - if len(toBeListen) > 0 { - if err := listener.StartListen(toBeListen); err != nil { - mcpServerLog.Errorf("listen config ref failed, group: %v, dataId %v, errors: %v", group, dataId, err) - } - } - // cancel listen description and credential config - if len(toBeUnListen) > 0 { - if err := listener.CancelListen(toBeUnListen); err != nil { - mcpServerLog.Errorf("cancel listen config ref failed, group: %v, dataId %v, errors: %v", group, dataId, err) - } - } -} - -func (w *watcher) multiCallback(server *provider.McpServer, routeName, configKey string) func(map[string]string) { - callback := func(configs map[string]string) { +func (w *watcher) mcpServerListener(dataId string) func(info *McpServerConfig) { + return func(info *McpServerConfig) { defer w.UpdateService() - mcpServerLog.Infof("callback, ref config changed: %s", configKey) - rule := &provider.McpServerRule{ - MatchRoute: []string{routeName}, - Server: &provider.ServerConfig{ - Name: server.Name, - Config: map[string]interface{}{}, - }, + mcpServerLog.Infof("mcp server config callback, dataId %s", dataId) + mcpServer := &provider.McpServer{} + if err := json.Unmarshal([]byte(info.ServerSpecConfig), mcpServer); err != nil { + mcpServerLog.Errorf("unmarshal config data to mcp server error:%v, dataId:%s", err, dataId) } - - // process mcp credential - credentialConfig := map[string]interface{}{} - for key, data := range configs { - if strings.HasPrefix(key, provider.DefaultMcpToolsGroup) { - // skip mcp tool description - continue - } - var cred interface{} - if err := json.Unmarshal([]byte(data), &cred); err != nil { - mcpServerLog.Errorf("unmarshal credential data %v to map error:%v", key, err) - } - w.callbackMutex.Lock() - name := w.credentialKeyToName[configKey][key] - w.callbackMutex.Unlock() - credentialConfig[name] = cred - } - rule.Server.Config["credentials"] = credentialConfig - // process mcp tool description - var allowTools []string - for key, toolData := range configs { - if strings.HasPrefix(key, provider.DefaultMcpCredentialsGroup) { - // skip mcp credentials - continue - } - toolsDescription := &provider.McpToolConfig{} - if err := json.Unmarshal([]byte(toolData), toolsDescription); err != nil { - mcpServerLog.Errorf("unmarshal toolsDescriptionRef to mcp tool config error:%v", err) - } - for _, t := range toolsDescription.Tools { - convertTool := &provider.McpTool{Name: t.Name, Description: t.Description} - - toolMeta := toolsDescription.ToolsMeta[t.Name] - if toolMeta != nil && toolMeta.Enabled { - allowTools = append(allowTools, t.Name) - } - argsPosition, err := getArgsPositionFromToolMeta(toolMeta) - if err != nil { - mcpServerLog.Errorf("get args position from tool meta error:%v, tool name %v", err, t.Name) - } - - requiredMap := sets.Set[string]{} - for _, s := range t.InputSchema.Required { - requiredMap.Insert(s) - } - - for argsName, args := range t.InputSchema.Properties { - convertArgs, err := parseMcpArgs(args) - if err != nil { - mcpServerLog.Errorf("parse mcp args error:%v, tool name %v, args name %v", err, t.Name, argsName) - continue - } - convertArgs.Name = argsName - convertArgs.Required = requiredMap.Contains(argsName) - if pos, exist := argsPosition[argsName]; exist { - convertArgs.Position = pos - } - convertTool.Args = append(convertTool.Args, convertArgs) - mcpServerLog.Debugf("parseMcpArgs, toolArgs:%v", convertArgs) - } - - requestTemplate, err := getRequestTemplateFromToolMeta(toolMeta) - if err != nil { - mcpServerLog.Errorf("get request template from tool meta error:%v, tool name %v", err, t.Name) - } else { - convertTool.RequestTemplate = requestTemplate - } - - responseTemplate, err := getResponseTemplateFromToolMeta(toolMeta) - if err != nil { - mcpServerLog.Errorf("get response template from tool meta error:%v, tool name %v", err, t.Name) - } else { - convertTool.ResponseTemplate = responseTemplate - } - rule.Tools = append(rule.Tools, convertTool) - } - } - - rule.Server.AllowTools = allowTools - wasmPluginConfig := &config.Config{ - Meta: config.Meta{ - GroupVersionKind: gvk.WasmPlugin, - Namespace: w.namespace, - }, - Spec: rule, - } - w.cache.UpdateConfigCache(gvk.WasmPlugin, configKey, wasmPluginConfig, false) - } - return callback -} - -func (w *watcher) buildServiceEntryForMcpServer(mcpServer *provider.McpServer, configGroup, dataId string) error { - if mcpServer == nil || mcpServer.RemoteServerConfig == nil || mcpServer.RemoteServerConfig.ServiceRef == nil { - return nil - } - mcpServerLog.Debugf("ServiceRef %v for %v", mcpServer.RemoteServerConfig.ServiceRef, dataId) - configKey := strings.Join([]string{configGroup, dataId}, DefaultJoiner) - - serviceGroup := mcpServer.RemoteServerConfig.ServiceRef.GroupName - serviceNamespace := mcpServer.RemoteServerConfig.ServiceRef.NamespaceId - serviceName := mcpServer.RemoteServerConfig.ServiceRef.ServiceName - if serviceNamespace == "" { - serviceNamespace = provider.DefaultNacosServiceNamespace - } - // update config to service and unsubscribe old service - curSvcKey := strings.Join([]string{serviceNamespace, serviceGroup, serviceName}, DefaultJoiner) - if svcKey, exist := w.configToService[configKey]; exist && svcKey != curSvcKey { - split := strings.Split(svcKey, DefaultJoiner) - if svcCache, has := w.serviceCache[split[0]]; has { - if err := svcCache.RemoveListener(split[1], split[2], configKey); err != nil { - mcpServerLog.Errorf("remove listener error:%v", err) - } - } - } - w.configToService[configKey] = curSvcKey - - if _, exist := w.serviceCache[serviceNamespace]; !exist { - namingConfig := constant.NewClientConfig( - constant.WithTimeoutMs(DefaultNacosTimeout), - constant.WithLogLevel(DefaultNacosLogLevel), - constant.WithLogDir(DefaultNacosLogDir), - constant.WithCacheDir(DefaultNacosCacheDir), - constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache), - constant.WithLogRollingConfig(&constant.ClientLogRollingConfig{ - MaxAge: DefaultNacosLogMaxAge, - }), - constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty), - constant.WithNamespaceId(serviceNamespace), - constant.WithAccessKey(w.NacosAccessKey), - constant.WithSecretKey(w.NacosSecretKey), - constant.WithUsername(w.authOption.NacosUsername), - constant.WithPassword(w.authOption.NacosPassword), - ) - client, err := clients.NewNamingClient(vo.NacosClientParam{ - ClientConfig: namingConfig, - ServerConfigs: w.serverConfig, - }) - if err == nil { - w.serviceCache[serviceNamespace] = NewServiceCache(client) - } else { - return fmt.Errorf("can not create naming client err:%v", err) - } - } - svcCache := w.serviceCache[serviceNamespace] - err := svcCache.AddListener(serviceGroup, serviceName, configKey, w.getServiceCallback(mcpServer, configGroup, dataId)) - if err != nil { - return fmt.Errorf("add listener for dataId %v, service %s/%s error:%v", dataId, serviceGroup, serviceName, err) - } - return nil -} - -func (w *watcher) getServiceCallback(server *provider.McpServer, configGroup, dataId string) func(services []model.Instance) { - groupName := server.RemoteServerConfig.ServiceRef.GroupName - if groupName == "DEFAULT_GROUP" { - groupName = "DEFAULT-GROUP" - } - namespace := server.RemoteServerConfig.ServiceRef.NamespaceId - serviceName := server.RemoteServerConfig.ServiceRef.ServiceName - // Higress doesn't care about the MCP export path configured in nacos. - // Any path of the mcp server are supported in request routing. - path := "/" - protocol := server.Protocol - host := getNacosServiceFullHost(groupName, namespace, serviceName) - - return func(services []model.Instance) { - defer w.UpdateService() - - mcpServerLog.Infof("callback for %s/%s, serviceName : %s", configGroup, dataId, host) - configKey := strings.Join([]string{w.Name, w.NacosNamespace, configGroup, dataId}, DefaultJoiner) - if len(services) == 0 { - mcpServerLog.Errorf("callback for %s return empty service instance list, skip generate config", host) + // TODO support stdio and dubbo protocol + if !supportedProtocols[mcpServer.Protocol] { return } + if err := w.processServerConfig(dataId, info.ServiceInfo, mcpServer); err != nil { + mcpServerLog.Errorf("process mcp server config error:%v, dataId:%s", err, dataId) + } + if err := w.processToolConfig(dataId, info.ToolsSpecConfig, info.Credentials, mcpServer); err != nil { + mcpServerLog.Errorf("process tool config error:%v, dataId:%s", err, dataId) + } + } +} - serviceEntry := w.generateServiceEntry(host, services) +func (w *watcher) processServerConfig(dataId string, services *model.Service, mcpServer *provider.McpServer) error { + serviceHost := getServiceFullHostFromMcpServer(mcpServer) + // generate se for mcp server + serviceEntry := generateServiceEntry(serviceHost, services) + if serviceEntry != nil { se := &config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.ServiceEntry, - Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedSeName, configGroup, strings.TrimSuffix(dataId, ".json")), - Namespace: "mcp", + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedSeName, strings.TrimSuffix(dataId, ".json")), + Namespace: w.namespace, }, Spec: serviceEntry, } - if protocol == provider.McpSSEProtocol { - destinationRule := w.generateDrForSSEService(host) - dr := &config.Config{ - Meta: config.Meta{ - GroupVersionKind: gvk.DestinationRule, - Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedDrName, configGroup, strings.TrimSuffix(dataId, ".json")), - Namespace: w.namespace, - }, - Spec: destinationRule, - } - w.cache.UpdateConfigCache(gvk.DestinationRule, configKey, dr, false) - } - w.cache.UpdateConfigCache(gvk.ServiceEntry, configKey, se, false) - vs := w.buildVirtualServiceForMcpServer(serviceEntry, configGroup, dataId, path, server) - w.cache.UpdateConfigCache(gvk.VirtualService, configKey, vs, false) - mcpServer := w.buildMcpServerForMcpServer(vs.Spec.(*v1alpha3.VirtualService), configGroup, dataId, path, server) - w.cache.UpdateConfigCache(mcpserver.GvkMcpServer, configKey, mcpServer, false) + w.cache.UpdateConfigCache(gvk.ServiceEntry, dataId, se, false) } + // generate vs for mcp server + virtualService := w.buildVirtualServiceForMcpServer(mcpServer, dataId, serviceHost, serviceEntry) + if virtualService != nil { + w.cache.UpdateConfigCache(gvk.VirtualService, dataId, virtualService, false) + ms := w.buildMcpServerForMcpServer(virtualService.Spec.(*v1alpha3.VirtualService), dataId, mcpServer) + w.cache.UpdateConfigCache(mcpserver.GvkMcpServer, dataId, ms, false) + } + // if protocol is sse, we should apply ConsistentHash policy for this service + // if protocol is https, we should apply tls policy for this service + destinationRule := generateDrForMcpServer(serviceHost, mcpServer.Protocol) + if destinationRule != nil { + dr := &config.Config{ + Meta: config.Meta{ + GroupVersionKind: gvk.DestinationRule, + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedDrName, strings.TrimSuffix(dataId, ".json")), + Namespace: w.namespace, + }, + Spec: destinationRule, + } + w.cache.UpdateConfigCache(gvk.DestinationRule, dataId, dr, false) + } + return nil } -func (w *watcher) buildVirtualServiceForMcpServer(serviceentry *v1alpha3.ServiceEntry, group, dataId, path string, server *provider.McpServer) *config.Config { - if serviceentry == nil { +func (w *watcher) processToolConfig(dataId, data string, credentials map[string]interface{}, server *provider.McpServer) error { + if server.Protocol != provider.HttpProtocol && server.Protocol != provider.HttpsProtocol { return nil } + toolsDescription := &provider.McpToolConfig{} + if err := json.Unmarshal([]byte(data), toolsDescription); err != nil { + return fmt.Errorf("unmarshal toolsDescriptionRef to mcp tool config error:%v, data %v", err, data) + } + + routeName := fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, strings.TrimSuffix(dataId, ".json")) + rule := &provider.McpServerRule{ + MatchRoute: []string{routeName}, + Server: &provider.ServerConfig{ + Name: server.Name, + Config: map[string]interface{}{}, + }, + } + rule.Server.Config["credentials"] = credentials + + var allowTools []string + for _, t := range toolsDescription.Tools { + convertTool := &provider.McpTool{Name: t.Name, Description: t.Description} + + toolMeta := toolsDescription.ToolsMeta[t.Name] + if toolMeta != nil && toolMeta.Enabled { + allowTools = append(allowTools, t.Name) + } + argsPosition, err := getArgsPositionFromToolMeta(toolMeta) + if err != nil { + mcpServerLog.Errorf("get args position from tool meta error:%v, tool name %v", err, t.Name) + } + + requiredMap := sets.Set[string]{} + for _, s := range t.InputSchema.Required { + requiredMap.Insert(s) + } + + for argsName, args := range t.InputSchema.Properties { + convertArgs, err := parseMcpArgs(args) + if err != nil { + mcpServerLog.Errorf("parse mcp args error:%v, tool name %v, args name %v", err, t.Name, argsName) + continue + } + convertArgs.Name = argsName + convertArgs.Required = requiredMap.Contains(argsName) + if pos, exist := argsPosition[argsName]; exist { + convertArgs.Position = pos + } + convertTool.Args = append(convertTool.Args, convertArgs) + mcpServerLog.Debugf("parseMcpArgs, toolArgs:%v", convertArgs) + } + + requestTemplate, err := getRequestTemplateFromToolMeta(toolMeta) + if err != nil { + mcpServerLog.Errorf("get request template from tool meta error:%v, tool name %v", err, t.Name) + continue + } else { + convertTool.RequestTemplate = requestTemplate + } + + responseTemplate, err := getResponseTemplateFromToolMeta(toolMeta) + if err != nil { + mcpServerLog.Errorf("get response template from tool meta error:%v, tool name %v", err, t.Name) + continue + } else { + convertTool.ResponseTemplate = responseTemplate + } + rule.Tools = append(rule.Tools, convertTool) + } + + rule.AllowTools = allowTools + wasmPluginConfig := &config.Config{ + Meta: config.Meta{ + GroupVersionKind: gvk.WasmPlugin, + Namespace: w.namespace, + }, + Spec: rule, + } + w.cache.UpdateConfigCache(gvk.WasmPlugin, dataId, wasmPluginConfig, false) + return nil +} + +func (w *watcher) buildVirtualServiceForMcpServer(server *provider.McpServer, dataId, serviceName string, se *v1alpha3.ServiceEntry) *config.Config { + if server == nil { + return nil + } + // if there is no export domain, use default * hosts := w.McpServerExportDomains if len(hosts) == 0 { hosts = []string{"*"} } + // find gateway resources by host var gateways []string for _, host := range hosts { cleanHost := common2.CleanHost(host) @@ -751,15 +507,12 @@ func (w *watcher) buildVirtualServiceForMcpServer(serviceentry *v1alpha3.Service common2.CreateConvertedName(w.clusterId, cleanHost), common2.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost)) } - routeName := fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, group, strings.TrimSuffix(dataId, ".json")) + routeName := fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, strings.TrimSuffix(dataId, ".json")) + // path format: /{base-path}/{mcp-server-name} mergePath := "/" + server.Name - if w.McpServerBaseUrl != "/" { + if w.McpServerBaseUrl != "" && w.McpServerBaseUrl != "/" { mergePath = strings.TrimSuffix(w.McpServerBaseUrl, "/") + mergePath } - if path != "/" { - mergePath = mergePath + "/" + strings.TrimPrefix(path, "/") - } - mergePath = strings.TrimSuffix(mergePath, "/") vs := &v1alpha3.VirtualService{ Hosts: hosts, @@ -771,49 +524,60 @@ func (w *watcher) buildVirtualServiceForMcpServer(serviceentry *v1alpha3.Service // Example: // Assume mergePath=/mcp/test prefixRewrite=/ requestPath=/mcp/test/abc // If we only use prefix match, the rewritten path will be //abc. - Match: []*v1alpha3.HTTPMatchRequest{{ - Uri: &v1alpha3.StringMatch{ - MatchType: &v1alpha3.StringMatch_Exact{ - Exact: mergePath, + Match: []*v1alpha3.HTTPMatchRequest{ + { + Uri: &v1alpha3.StringMatch{ + MatchType: &v1alpha3.StringMatch_Exact{ + Exact: mergePath, + }, }, }, - }, { - Uri: &v1alpha3.StringMatch{ - MatchType: &v1alpha3.StringMatch_Prefix{ - Prefix: mergePath + "/", + { + Uri: &v1alpha3.StringMatch{ + MatchType: &v1alpha3.StringMatch_Prefix{ + Prefix: mergePath + "/", + }, }, }, - }}, + }, Route: []*v1alpha3.HTTPRouteDestination{{ Destination: &v1alpha3.Destination{ - Host: serviceentry.Hosts[0], - Port: &v1alpha3.PortSelector{ - Number: serviceentry.Ports[0].Number, - }, + Host: serviceName, }, }}, }}, } + // we should rewrite path for sse and streamble if routeRewriteProtocols[server.Protocol] { vs.Http[0].Rewrite = &v1alpha3.HTTPRewrite{ Uri: "/", } } + // we should rewrite host for dns service + if se != nil && se.Resolution == v1alpha3.ServiceEntry_DNS && len(se.Endpoints) > 0 { + if vs.Http[0].Rewrite == nil { + vs.Http[0].Rewrite = &v1alpha3.HTTPRewrite{ + Authority: se.Endpoints[0].Address, + } + } else { + vs.Http[0].Rewrite.Authority = se.Endpoints[0].Address + } + } mcpServerLog.Debugf("construct virtualservice %v", vs) return &config.Config{ Meta: config.Meta{ GroupVersionKind: gvk.VirtualService, - Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedVsName, group, dataId), + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedVsName, dataId), Namespace: w.namespace, }, Spec: vs, } } -func (w *watcher) buildMcpServerForMcpServer(vs *v1alpha3.VirtualService, group, dataId, path string, server *provider.McpServer) *config.Config { +func (w *watcher) buildMcpServerForMcpServer(vs *v1alpha3.VirtualService, dataId string, server *provider.McpServer) *config.Config { if vs == nil { return nil } @@ -821,7 +585,7 @@ func (w *watcher) buildMcpServerForMcpServer(vs *v1alpha3.VirtualService, group, if len(domains) == 0 { domains = []string{"*"} } - name := fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedMcpServerName, group, strings.TrimSuffix(dataId, ".json")) + name := fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedMcpServerName, strings.TrimSuffix(dataId, ".json")) httpRoute := vs.Http[0] pathMatchValue := "" for _, match := range httpRoute.Match { @@ -856,66 +620,34 @@ func (w *watcher) buildMcpServerForMcpServer(vs *v1alpha3.VirtualService, group, } } -func (w *watcher) generateServiceEntry(host string, services []model.Instance) *v1alpha3.ServiceEntry { - portList := make([]*v1alpha3.ServicePort, 0) - endpoints := make([]*v1alpha3.WorkloadEntry, 0) - isDnsService := false - - for _, service := range services { - protocol := common.HTTP - if service.Metadata != nil && service.Metadata["protocol"] != "" { - protocol = common.ParseProtocol(service.Metadata["protocol"]) - } - port := &v1alpha3.ServicePort{ - Name: protocol.String(), - Number: uint32(service.Port), - Protocol: protocol.String(), - } - if len(portList) == 0 { - portList = append(portList, port) - } - if !isValidIP(service.Ip) { - isDnsService = true - } - endpoint := &v1alpha3.WorkloadEntry{ - Address: service.Ip, - Ports: map[string]uint32{port.Protocol: port.Number}, - Labels: service.Metadata, - } - endpoints = append(endpoints, endpoint) - } - - resolution := v1alpha3.ServiceEntry_STATIC - if isDnsService { - resolution = v1alpha3.ServiceEntry_DNS - } - se := &v1alpha3.ServiceEntry{ - Hosts: []string{host}, - Ports: portList, - Location: v1alpha3.ServiceEntry_MESH_INTERNAL, - Resolution: resolution, - Endpoints: endpoints, - } - - return se -} - -func (w *watcher) generateDrForSSEService(host string) *v1alpha3.DestinationRule { - dr := &v1alpha3.DestinationRule{ - Host: host, - TrafficPolicy: &v1alpha3.TrafficPolicy{ - LoadBalancer: &v1alpha3.LoadBalancerSettings{ - LbPolicy: &v1alpha3.LoadBalancerSettings_ConsistentHash{ - ConsistentHash: &v1alpha3.LoadBalancerSettings_ConsistentHashLB{ - HashKey: &v1alpha3.LoadBalancerSettings_ConsistentHashLB_UseSourceIp{ - UseSourceIp: true, +func generateDrForMcpServer(host, protocol string) *v1alpha3.DestinationRule { + switch protocol { + case provider.McpSSEProtocol: + return &v1alpha3.DestinationRule{ + Host: host, + TrafficPolicy: &v1alpha3.TrafficPolicy{ + LoadBalancer: &v1alpha3.LoadBalancerSettings{ + LbPolicy: &v1alpha3.LoadBalancerSettings_ConsistentHash{ + ConsistentHash: &v1alpha3.LoadBalancerSettings_ConsistentHashLB{ + HashKey: &v1alpha3.LoadBalancerSettings_ConsistentHashLB_UseSourceIp{ + UseSourceIp: true, + }, }, }, }, }, - }, + } + case provider.HttpsProtocol: + return &v1alpha3.DestinationRule{ + Host: host, + TrafficPolicy: &v1alpha3.TrafficPolicy{ + Tls: &v1alpha3.ClientTLSSettings{ + Mode: v1alpha3.ClientTLSSettings_SIMPLE, + }, + }, + } } - return dr + return nil } func parseMcpArgs(args interface{}) (*provider.ToolArgs, error) { @@ -1016,35 +748,104 @@ func mergeMaps(maps ...map[string]string) map[string]string { return res } -func getNacosServiceFullHost(groupName, namespace, serviceName string) string { +func getServiceFullHostFromMcpServer(server *provider.McpServer) string { + if server == nil || server.RemoteServerConfig == nil || server.RemoteServerConfig.ServiceRef == nil { + return "" + } + groupName := server.RemoteServerConfig.ServiceRef.GroupName + if groupName == "DEFAULT_GROUP" { + groupName = "DEFAULT-GROUP" + } + namespace := server.RemoteServerConfig.ServiceRef.NamespaceId + serviceName := server.RemoteServerConfig.ServiceRef.ServiceName suffix := strings.Join([]string{groupName, namespace, string(provider.Nacos)}, common.DotSeparator) host := strings.Join([]string{serviceName, suffix}, common.DotSeparator) return host } +func generateServiceEntry(host string, services *model.Service) *v1alpha3.ServiceEntry { + if services == nil || len(services.Hosts) == 0 { + return nil + } + portList := make([]*v1alpha3.ServicePort, 0) + endpoints := make([]*v1alpha3.WorkloadEntry, 0) + + for _, service := range services.Hosts { + protocol := common.HTTP + if service.Metadata != nil && service.Metadata["protocol"] != "" { + protocol = common.ParseProtocol(service.Metadata["protocol"]) + } + port := &v1alpha3.ServicePort{ + Name: protocol.String(), + Number: uint32(service.Port), + Protocol: protocol.String(), + } + if len(portList) == 0 { + portList = append(portList, port) + } + endpoint := &v1alpha3.WorkloadEntry{ + Address: service.Ip, + Ports: map[string]uint32{port.Protocol: port.Number}, + Labels: service.Metadata, + } + endpoints = append(endpoints, endpoint) + } + + se := &v1alpha3.ServiceEntry{ + Hosts: []string{host}, + Ports: portList, + Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Resolution: getNacosServiceResolution(services), + Endpoints: endpoints, + } + + return se +} + +func isValidIP(ipStr string) bool { + ip := net.ParseIP(ipStr) + return ip != nil +} + +func getNacosServiceResolution(services *model.Service) v1alpha3.ServiceEntry_Resolution { + ipEndpoints := 0 + dnsEndpoints := 0 + for _, service := range services.Hosts { + if isValidIP(service.Ip) { + ipEndpoints = ipEndpoints + 1 + } else { + dnsEndpoints = dnsEndpoints + 1 + } + } + if ipEndpoints > 0 && dnsEndpoints > 0 { + mcpServerLog.Errorf("nacos service %v has both ip and dns endpoints, set to ip resolution ", services.Name) + return v1alpha3.ServiceEntry_STATIC + } + if ipEndpoints > 0 { + return v1alpha3.ServiceEntry_STATIC + } + return v1alpha3.ServiceEntry_DNS +} + func (w *watcher) Stop() { w.mutex.Lock() defer w.mutex.Unlock() - mcpServerLog.Infof("unsubscribe all configs") + for key := range w.watchingConfig { - s := strings.Split(key, DefaultJoiner) - err := w.unsubscribe(s[0], s[1]) + err := w.registryClient.CancelListenToServer(key) if err == nil { delete(w.watchingConfig, key) + w.cache.UpdateConfigCache(config.GroupVersionKind{}, key, nil, true) + mcpServerLog.Infof("cancel listen to mcp server config %v", key) } } - mcpServerLog.Infof("stop all service nameing client") - for _, client := range w.serviceCache { - // TODO: This is a temporary implementation because of a bug in the nacos-go-sdk, which causes a block when stoping. - go client.Stop() - } w.isStop = true - mcpServerLog.Infof("stop all config client") - mcpServerLog.Infof("watcher %v stop", w.Name) + w.UpdateService() close(w.stop) w.Ready(false) + w.registryClient.CloseClient() } func (w *watcher) IsHealthy() bool { @@ -1054,18 +855,3 @@ func (w *watcher) IsHealthy() bool { func (w *watcher) GetRegistryType() string { return w.RegistryType.String() } - -func isValidIP(ipStr string) bool { - ip := net.ParseIP(ipStr) - return ip != nil -} - -func normalizeRewritePathPrefix(path string) string { - if path == "" || path == "/" { - return "/" - } - if path[0] != '/' { - path = "/" + path - } - return strings.TrimSuffix(path, "/") -} diff --git a/registry/nacos/mcpserver/watcher_test.go b/registry/nacos/mcpserver/watcher_test.go new file mode 100644 index 000000000..32da00765 --- /dev/null +++ b/registry/nacos/mcpserver/watcher_test.go @@ -0,0 +1,585 @@ +// Copyright (c) 2022 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mcpserver + +import ( + "fmt" + "reflect" + "strings" + "sync" + "testing" + + apiv1 "github.com/alibaba/higress/api/networking/v1" + common2 "github.com/alibaba/higress/pkg/ingress/kube/common" + provider "github.com/alibaba/higress/registry" + "github.com/alibaba/higress/registry/memory" + "github.com/nacos-group/nacos-sdk-go/v2/model" + "github.com/stretchr/testify/mock" + wrappers "google.golang.org/protobuf/types/known/wrapperspb" + "istio.io/api/networking/v1alpha3" + "istio.io/istio/pkg/config" + "istio.io/istio/pkg/config/constants" + "istio.io/istio/pkg/config/schema/gvk" +) + +type mockWatcher struct { + watcher + mock.Mock +} + +func newTestWatcher(cache memory.Cache, opts ...WatcherOption) mockWatcher { + w := &watcher{ + watchingConfig: make(map[string]bool), + RegistryType: "mcpserver", + Status: provider.UnHealthy, + cache: cache, + mutex: &sync.Mutex{}, + stop: make(chan struct{}), + } + + w.NacosRefreshInterval = int64(DefaultRefreshInterval) + + for _, opt := range opts { + opt(w) + } + + if w.NacosNamespace == "" { + w.NacosNamespace = w.NacosNamespaceId + } + + return mockWatcher{watcher: *w, Mock: mock.Mock{}} +} + +func testCallback(msc *McpServerConfig) memory.Cache { + registryConfig := &apiv1.RegistryConfig{ + Type: string(provider.Nacos), + Name: "mse-nacos-public", + Domain: "", + Port: 8848, + NacosAddressServer: "", + NacosAccessKey: "ak", + NacosSecretKey: "sk", + NacosNamespaceId: "", + NacosNamespace: "public", + NacosGroups: []string{"dev"}, + NacosRefreshInterval: 0, + EnableMCPServer: wrappers.Bool(true), + McpServerExportDomains: []string{"mcp.com"}, + McpServerBaseUrl: "/mcp-servers/", + EnableScopeMcpServers: wrappers.Bool(true), + AllowMcpServers: []string{"mcp-server-1", "mcp-server-2"}, + Metadata: map[string]*apiv1.InnerMap{ + "routeName": &apiv1.InnerMap{ + InnerMap: map[string]string{"mcp-server-1": "mcp-route-1", "mcp-server-2": "mcp-route-2"}, + }, + }, + } + localCache := memory.NewCache() + + testWatcher := newTestWatcher(localCache, + WithType(registryConfig.Type), + WithName(registryConfig.Name), + WithNacosAddressServer(registryConfig.NacosAddressServer), + WithDomain(registryConfig.Domain), + WithPort(registryConfig.Port), + WithNacosNamespaceId(registryConfig.NacosNamespaceId), + WithNacosNamespace(registryConfig.NacosNamespace), + WithNacosGroups(registryConfig.NacosGroups), + WithNacosAccessKey(registryConfig.NacosAccessKey), + WithNacosSecretKey(registryConfig.NacosSecretKey), + WithNacosRefreshInterval(registryConfig.NacosRefreshInterval), + WithMcpExportDomains(registryConfig.McpServerExportDomains), + WithMcpBaseUrl(registryConfig.McpServerBaseUrl), + WithEnableMcpServer(registryConfig.EnableMCPServer)) + testWatcher.AppendServiceUpdateHandler(func() { + fmt.Println("testWatcher service update success") + }) + + callback := testWatcher.mcpServerListener("mock-data-id") + callback(msc) + return localCache +} + +func Test_Watcher(t *testing.T) { + dataId := "mock-data-id" + + testCase := []struct { + name string + msc *McpServerConfig + dataId string + wantConfig map[string]*config.Config + }{ + { + name: "normal case", + dataId: dataId, + msc: &McpServerConfig{ + Credentials: map[string]interface{}{ + "test-server": map[string]string{"data": "value"}, + }, + ServiceInfo: &model.Service{ + Hosts: []model.Instance{ + { + Ip: "127.0.0.1", + Port: 8080, + Metadata: map[string]string{"protocol": "http"}, + }, + }, + }, + ServerSpecConfig: `{ + "name": "explore", + "protocol": "http", + "description": "explore", + "remoteServerConfig": { + "serviceRef": { + "namespaceId": "public", + "groupName": "DEFAULT_GROUP", + "serviceName": "explore" + }, + "exportPath": "" + }, + "enabled": true + }`, + ToolsSpecConfig: `{ + "tools": [ + { + "name": "explore", + "description": "find name from tag", + "inputSchema": { + "type": "object", + "properties": { + "tags": { + "type": "string", + "description": "tag" + } + } + } + } + ], + "toolsMeta": { + "explore": { + "enabled": true, + "templates": { + "json-go-template": { + "requestTemplate": { + "method": "GET", + "url": "/v0/explore", + "argsToUrlParam": true + } + } + } + } + } + }`, + }, + wantConfig: map[string]*config.Config{ + gvk.ServiceEntry.String(): &config.Config{ + Meta: config.Meta{ + GroupVersionKind: gvk.ServiceEntry, + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedSeName, strings.TrimSuffix(dataId, ".json")), + }, + Spec: &v1alpha3.ServiceEntry{ + Hosts: []string{"explore.DEFAULT-GROUP.public.nacos"}, + Ports: []*v1alpha3.ServicePort{ + { + Number: 8080, + Name: "HTTP", + Protocol: "HTTP", + }, + }, + Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Resolution: v1alpha3.ServiceEntry_STATIC, + Endpoints: []*v1alpha3.WorkloadEntry{ + { + Address: "127.0.0.1", + Ports: map[string]uint32{ + "HTTP": 8080, + }, + Labels: map[string]string{ + "protocol": "http", + }, + }, + }, + }, + }, + gvk.VirtualService.String(): &config.Config{ + Meta: config.Meta{ + GroupVersionKind: gvk.VirtualService, + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedVsName, strings.TrimSuffix(dataId, ".json")), + }, + Spec: &v1alpha3.VirtualService{ + Gateways: []string{"/" + common2.CleanHost("mcp.com"), common2.CreateConvertedName(constants.IstioIngressGatewayName, common2.CleanHost("mcp.com"))}, + Hosts: []string{"mcp.com"}, + Http: []*v1alpha3.HTTPRoute{ + { + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, strings.TrimSuffix(dataId, ".json")), + Match: []*v1alpha3.HTTPMatchRequest{ + { + Uri: &v1alpha3.StringMatch{ + MatchType: &v1alpha3.StringMatch_Exact{ + Exact: "/mcp-servers/explore", + }, + }, + }, + { + Uri: &v1alpha3.StringMatch{ + MatchType: &v1alpha3.StringMatch_Prefix{ + Prefix: "/mcp-servers/explore/", + }, + }, + }, + }, + Route: []*v1alpha3.HTTPRouteDestination{ + { + Destination: &v1alpha3.Destination{ + Host: "explore.DEFAULT-GROUP.public.nacos", + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "sse and dns endpoint case", + dataId: dataId, + msc: &McpServerConfig{ + Credentials: map[string]interface{}{ + "test-server": map[string]string{"data": "value"}, + }, + ServiceInfo: &model.Service{ + Hosts: []model.Instance{ + { + Ip: "example.com", + Port: 8080, + Metadata: map[string]string{"protocol": "http"}, + }, + }, + }, + ServerSpecConfig: `{ + "name": "explore", + "protocol": "mcp-sse", + "description": "explore", + "remoteServerConfig": { + "serviceRef": { + "namespaceId": "public", + "groupName": "DEFAULT_GROUP", + "serviceName": "explore" + }, + "exportPath": "" + }, + "enabled": true + }`, + ToolsSpecConfig: `{ + "tools": [ + { + "name": "explore", + "description": "find name from tag", + "inputSchema": { + "type": "object", + "properties": { + "tags": { + "type": "string", + "description": "tag" + } + } + } + } + ], + "toolsMeta": { + "explore": { + "enabled": true, + "templates": { + "json-go-template": { + "requestTemplate": { + "method": "GET", + "url": "/v0/explore", + "argsToUrlParam": true + } + } + } + } + } + }`, + }, + wantConfig: map[string]*config.Config{ + gvk.ServiceEntry.String(): &config.Config{ + Meta: config.Meta{ + GroupVersionKind: gvk.ServiceEntry, + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedSeName, strings.TrimSuffix(dataId, ".json")), + }, + Spec: &v1alpha3.ServiceEntry{ + Hosts: []string{"explore.DEFAULT-GROUP.public.nacos"}, + Ports: []*v1alpha3.ServicePort{ + { + Number: 8080, + Name: "HTTP", + Protocol: "HTTP", + }, + }, + Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Resolution: v1alpha3.ServiceEntry_DNS, + Endpoints: []*v1alpha3.WorkloadEntry{ + { + Address: "example.com", + Ports: map[string]uint32{ + "HTTP": 8080, + }, + Labels: map[string]string{ + "protocol": "http", + }, + }, + }, + }, + }, + gvk.VirtualService.String(): &config.Config{ + Meta: config.Meta{ + GroupVersionKind: gvk.VirtualService, + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedVsName, strings.TrimSuffix(dataId, ".json")), + }, + Spec: &v1alpha3.VirtualService{ + Gateways: []string{"/" + common2.CleanHost("mcp.com"), common2.CreateConvertedName(constants.IstioIngressGatewayName, common2.CleanHost("mcp.com"))}, + Hosts: []string{"mcp.com"}, + Http: []*v1alpha3.HTTPRoute{ + { + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, strings.TrimSuffix(dataId, ".json")), + Match: []*v1alpha3.HTTPMatchRequest{ + { + Uri: &v1alpha3.StringMatch{ + MatchType: &v1alpha3.StringMatch_Exact{ + Exact: "/mcp-servers/explore", + }, + }, + }, + { + Uri: &v1alpha3.StringMatch{ + MatchType: &v1alpha3.StringMatch_Prefix{ + Prefix: "/mcp-servers/explore/", + }, + }, + }, + }, + Route: []*v1alpha3.HTTPRouteDestination{ + { + Destination: &v1alpha3.Destination{ + Host: "explore.DEFAULT-GROUP.public.nacos", + }, + }, + }, + Rewrite: &v1alpha3.HTTPRewrite{ + Uri: "/", + Authority: "example.com", + }, + }, + }, + }, + }, + gvk.DestinationRule.String(): &config.Config{ + Meta: config.Meta{ + GroupVersionKind: gvk.DestinationRule, + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedDrName, strings.TrimSuffix(dataId, ".json")), + }, + Spec: &v1alpha3.DestinationRule{ + Host: "explore.DEFAULT-GROUP.public.nacos", + TrafficPolicy: &v1alpha3.TrafficPolicy{ + LoadBalancer: &v1alpha3.LoadBalancerSettings{ + LbPolicy: &v1alpha3.LoadBalancerSettings_ConsistentHash{ + ConsistentHash: &v1alpha3.LoadBalancerSettings_ConsistentHashLB{ + HashKey: &v1alpha3.LoadBalancerSettings_ConsistentHashLB_UseSourceIp{ + UseSourceIp: true, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "https and dns case", + dataId: dataId, + msc: &McpServerConfig{ + Credentials: map[string]interface{}{ + "test-server": map[string]string{"data": "value"}, + }, + ServiceInfo: &model.Service{ + Hosts: []model.Instance{ + { + Ip: "example.com", + Port: 8080, + Metadata: map[string]string{"protocol": "https"}, + }, + }, + }, + ServerSpecConfig: `{ + "name": "explore", + "protocol": "https", + "description": "explore", + "remoteServerConfig": { + "serviceRef": { + "namespaceId": "public", + "groupName": "DEFAULT_GROUP", + "serviceName": "explore" + }, + "exportPath": "" + }, + "enabled": true + }`, + ToolsSpecConfig: `{ + "tools": [ + { + "name": "explore", + "description": "find name from tag", + "inputSchema": { + "type": "object", + "properties": { + "tags": { + "type": "string", + "description": "tag" + } + } + } + } + ], + "toolsMeta": { + "explore": { + "enabled": true, + "templates": { + "json-go-template": { + "requestTemplate": { + "method": "GET", + "url": "/v0/explore", + "argsToUrlParam": true + } + } + } + } + } + }`, + }, + wantConfig: map[string]*config.Config{ + gvk.ServiceEntry.String(): &config.Config{ + Meta: config.Meta{ + GroupVersionKind: gvk.ServiceEntry, + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedSeName, strings.TrimSuffix(dataId, ".json")), + }, + Spec: &v1alpha3.ServiceEntry{ + Hosts: []string{"explore.DEFAULT-GROUP.public.nacos"}, + Ports: []*v1alpha3.ServicePort{ + { + Number: 8080, + Name: "HTTPS", + Protocol: "HTTPS", + }, + }, + Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Resolution: v1alpha3.ServiceEntry_DNS, + Endpoints: []*v1alpha3.WorkloadEntry{ + { + Address: "example.com", + Ports: map[string]uint32{ + "HTTPS": 8080, + }, + Labels: map[string]string{ + "protocol": "https", + }, + }, + }, + }, + }, + gvk.VirtualService.String(): &config.Config{ + Meta: config.Meta{ + GroupVersionKind: gvk.VirtualService, + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedVsName, strings.TrimSuffix(dataId, ".json")), + }, + Spec: &v1alpha3.VirtualService{ + Gateways: []string{"/" + common2.CleanHost("mcp.com"), common2.CreateConvertedName(constants.IstioIngressGatewayName, common2.CleanHost("mcp.com"))}, + Hosts: []string{"mcp.com"}, + Http: []*v1alpha3.HTTPRoute{ + { + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, strings.TrimSuffix(dataId, ".json")), + Match: []*v1alpha3.HTTPMatchRequest{ + { + Uri: &v1alpha3.StringMatch{ + MatchType: &v1alpha3.StringMatch_Exact{ + Exact: "/mcp-servers/explore", + }, + }, + }, + { + Uri: &v1alpha3.StringMatch{ + MatchType: &v1alpha3.StringMatch_Prefix{ + Prefix: "/mcp-servers/explore/", + }, + }, + }, + }, + Route: []*v1alpha3.HTTPRouteDestination{ + { + Destination: &v1alpha3.Destination{ + Host: "explore.DEFAULT-GROUP.public.nacos", + }, + }, + }, + Rewrite: &v1alpha3.HTTPRewrite{ + Authority: "example.com", + }, + }, + }, + }, + }, + gvk.DestinationRule.String(): &config.Config{ + Meta: config.Meta{ + GroupVersionKind: gvk.DestinationRule, + Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedDrName, strings.TrimSuffix(dataId, ".json")), + }, + Spec: &v1alpha3.DestinationRule{ + Host: "explore.DEFAULT-GROUP.public.nacos", + TrafficPolicy: &v1alpha3.TrafficPolicy{ + Tls: &v1alpha3.ClientTLSSettings{ + Mode: v1alpha3.ClientTLSSettings_SIMPLE, + }, + }, + }, + }, + }, + }, + } + + for _, tc := range testCase { + t.Run(tc.name, func(t *testing.T) { + localCache := testCallback(tc.msc) + se := localCache.GetAllConfigs(gvk.ServiceEntry)[dataId] + wantSe := tc.wantConfig[gvk.ServiceEntry.String()] + if !reflect.DeepEqual(se, wantSe) { + t.Errorf("se is not equal, want %v\n, got %v", wantSe, se) + } + + vs := localCache.GetAllConfigs(gvk.VirtualService)[dataId] + wantVs := tc.wantConfig[gvk.VirtualService.String()] + if !reflect.DeepEqual(vs, wantVs) { + t.Errorf("vs is not equal, want %v\n, got %v", wantVs, vs) + } + + dr := localCache.GetAllConfigs(gvk.DestinationRule)[dataId] + wantDr := tc.wantConfig[gvk.DestinationRule.String()] + if !reflect.DeepEqual(dr, wantDr) { + t.Errorf("dr is not equal, want %v\n, got %v", wantDr, dr) + } + }) + } +} diff --git a/registry/nacos/v2/watcher.go b/registry/nacos/v2/watcher.go index c58a09204..bf63f6b2f 100644 --- a/registry/nacos/v2/watcher.go +++ b/registry/nacos/v2/watcher.go @@ -16,11 +16,14 @@ package v2 import ( "errors" + "fmt" + "net" "strconv" "strings" "sync" "time" + "github.com/golang/protobuf/ptypes/wrappers" "github.com/nacos-group/nacos-sdk-go/v2/clients" "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client" "github.com/nacos-group/nacos-sdk-go/v2/common/constant" @@ -32,9 +35,11 @@ import ( apiv1 "github.com/alibaba/higress/api/networking/v1" "github.com/alibaba/higress/pkg/common" + "github.com/alibaba/higress/registry" provider "github.com/alibaba/higress/registry" "github.com/alibaba/higress/registry/memory" "github.com/alibaba/higress/registry/nacos/address" + "github.com/alibaba/higress/registry/nacos/mcpserver" ) const ( @@ -68,6 +73,9 @@ type watcher struct { updateCacheWhenEmpty bool nacosClientConfig *constant.ClientConfig authOption provider.AuthOption + namespace string + clusterId string + mcpWatcher provider.Watcher } type WatcherOption func(w *watcher) @@ -88,6 +96,45 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er opt(w) } + if w.EnableMCPServer != nil && w.EnableMCPServer.GetValue() { + if w.Type != string(registry.Nacos3) { + log.Errorf("can not create mcpWatcher for nacos 2.x type, required nacos 3.x") + } else { + mcpWatcher, err := mcpserver.NewWatcher( + cache, + mcpserver.WithType(w.Type), + mcpserver.WithName(w.Name), + mcpserver.WithNacosAddressServer(w.NacosAddressServer), + mcpserver.WithDomain(w.Domain), + mcpserver.WithPort(w.Port), + mcpserver.WithNacosNamespaceId(w.NacosNamespaceId), + mcpserver.WithNacosNamespace(w.NacosNamespace), + mcpserver.WithNacosGroups(w.NacosGroups), + mcpserver.WithNacosAccessKey(w.NacosAccessKey), + mcpserver.WithNacosSecretKey(w.NacosSecretKey), + mcpserver.WithNacosRefreshInterval(w.NacosRefreshInterval), + mcpserver.WithMcpExportDomains(w.McpServerExportDomains), + mcpserver.WithMcpBaseUrl(w.McpServerBaseUrl), + mcpserver.WithEnableMcpServer(w.EnableMCPServer), + mcpserver.WithClusterId(w.clusterId), + mcpserver.WithNamespace(w.namespace), + mcpserver.WithAuthOption(w.authOption), + ) + if err != nil { + return nil, fmt.Errorf("can not create mcp server watcher, err:%v", err) + } + var once sync.Once + mcpWatcher.ReadyHandler(func(ready bool) { + once.Do(func() { + if ready { + log.Infof("Registry mcp Watcher is ready, type:%s, name:%s", w.Type, w.Name) + } + }) + }) + w.mcpWatcher = mcpWatcher + } + } + if w.NacosNamespace == "" { w.NacosNamespace = w.NacosNamespaceId } @@ -233,15 +280,51 @@ func WithAuthOption(authOption provider.AuthOption) WatcherOption { } } +func WithMcpExportDomains(exportDomains []string) WatcherOption { + return func(w *watcher) { + w.McpServerExportDomains = exportDomains + } +} + +func WithMcpBaseUrl(url string) WatcherOption { + return func(w *watcher) { + w.McpServerBaseUrl = url + } +} + +func WithEnableMcpServer(enable *wrappers.BoolValue) WatcherOption { + return func(w *watcher) { + w.EnableMCPServer = enable + } +} + +func WithNamespace(ns string) WatcherOption { + return func(w *watcher) { + w.namespace = ns + } +} + +func WithClusterId(id string) WatcherOption { + return func(w *watcher) { + w.clusterId = id + } +} + func (w *watcher) Run() { ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval)) defer ticker.Stop() w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10)) + if w.mcpWatcher != nil { + w.mcpWatcher.AppendServiceUpdateHandler(w.UpdateService) + go w.mcpWatcher.Run() + } err := w.fetchAllServices() if err != nil { log.Errorf("first fetch services failed, err:%v", err) } else { - w.Ready(true) + if w.mcpWatcherReady() { + w.Ready(true) + } } for { select { @@ -250,7 +333,9 @@ func (w *watcher) Run() { if err != nil { log.Errorf("fetch services failed, err:%v", err) } else { - w.Ready(true) + if w.mcpWatcherReady() { + w.Ready(true) + } } case <-w.stop: return @@ -258,6 +343,10 @@ func (w *watcher) Run() { } } +func (w *watcher) mcpWatcherReady() bool { + return w.mcpWatcher == nil || w.mcpWatcher.IsReady() +} + func (w *watcher) updateNacosClient() { for { select { @@ -438,6 +527,7 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun func (w *watcher) generateServiceEntry(host string, services []model.Instance) *v1alpha3.ServiceEntry { portList := make([]*v1alpha3.ServicePort, 0) endpoints := make([]*v1alpha3.WorkloadEntry, 0) + isDnsService := false for _, service := range services { protocol := common.HTTP @@ -452,6 +542,9 @@ func (w *watcher) generateServiceEntry(host string, services []model.Instance) * if len(portList) == 0 { portList = append(portList, port) } + if !isValidIP(service.Ip) { + isDnsService = true + } endpoint := &v1alpha3.WorkloadEntry{ Address: service.Ip, Ports: map[string]uint32{port.Protocol: port.Number}, @@ -460,11 +553,15 @@ func (w *watcher) generateServiceEntry(host string, services []model.Instance) * endpoints = append(endpoints, endpoint) } + resolution := v1alpha3.ServiceEntry_STATIC + if isDnsService { + resolution = v1alpha3.ServiceEntry_DNS + } se := &v1alpha3.ServiceEntry{ Hosts: []string{host}, Ports: portList, Location: v1alpha3.ServiceEntry_MESH_INTERNAL, - Resolution: v1alpha3.ServiceEntry_STATIC, + Resolution: resolution, Endpoints: endpoints, } @@ -477,6 +574,9 @@ func (w *watcher) Stop() { if w.addrProvider != nil { w.addrProvider.Stop() } + if w.mcpWatcher != nil { + w.mcpWatcher.Stop() + } for key := range w.WatchingServices { s := strings.Split(key, DefaultJoiner) err := w.unsubscribe(s[0], s[1]) @@ -523,3 +623,8 @@ func shouldSubscribe(serviceName string) bool { return true } + +func isValidIP(ipStr string) bool { + ip := net.ParseIP(ipStr) + return ip != nil +} diff --git a/registry/reconcile/reconcile.go b/registry/reconcile/reconcile.go index 180c3701a..364c24017 100644 --- a/registry/reconcile/reconcile.go +++ b/registry/reconcile/reconcile.go @@ -36,7 +36,6 @@ import ( "github.com/alibaba/higress/registry/eureka" "github.com/alibaba/higress/registry/memory" "github.com/alibaba/higress/registry/nacos" - "github.com/alibaba/higress/registry/nacos/mcpserver" nacosv2 "github.com/alibaba/higress/registry/nacos/v2" "github.com/alibaba/higress/registry/zookeeper" ) @@ -171,7 +170,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC nacos.WithNacosRefreshInterval(registry.NacosRefreshInterval), nacos.WithAuthOption(authOption), ) - case string(Nacos2): + case string(Nacos2), string(Nacos3): watcher, err = nacosv2.NewWatcher( r.Cache, nacosv2.WithType(registry.Type), @@ -185,44 +184,13 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC nacosv2.WithNacosNamespace(registry.NacosNamespace), nacosv2.WithNacosGroups(registry.NacosGroups), nacosv2.WithNacosRefreshInterval(registry.NacosRefreshInterval), + nacosv2.WithMcpExportDomains(registry.McpServerExportDomains), + nacosv2.WithMcpBaseUrl(registry.McpServerBaseUrl), + nacosv2.WithEnableMcpServer(registry.EnableMCPServer), + nacosv2.WithClusterId(r.clusterId), + nacosv2.WithNamespace(r.namespace), nacosv2.WithAuthOption(authOption), ) - case string(Nacos3): - if registry.EnableMCPServer.GetValue() { - watcher, err = mcpserver.NewWatcher( - r.Cache, - mcpserver.WithType(registry.Type), - mcpserver.WithName(registry.Name), - mcpserver.WithNacosAddressServer(registry.NacosAddressServer), - mcpserver.WithDomain(registry.Domain), - mcpserver.WithPort(registry.Port), - mcpserver.WithNacosAccessKey(registry.NacosAccessKey), - mcpserver.WithNacosSecretKey(registry.NacosSecretKey), - mcpserver.WithNacosRefreshInterval(registry.NacosRefreshInterval), - mcpserver.WithMcpExportDomains(registry.McpServerExportDomains), - mcpserver.WithMcpBaseUrl(registry.McpServerBaseUrl), - mcpserver.WithEnableMcpServer(registry.EnableMCPServer), - mcpserver.WithClusterId(r.clusterId), - mcpserver.WithNamespace(r.namespace), - mcpserver.WithAuthOption(authOption), - ) - } else { - watcher, err = nacosv2.NewWatcher( - r.Cache, - nacosv2.WithType(registry.Type), - nacosv2.WithName(registry.Name), - nacosv2.WithNacosAddressServer(registry.NacosAddressServer), - nacosv2.WithDomain(registry.Domain), - nacosv2.WithPort(registry.Port), - nacosv2.WithNacosAccessKey(registry.NacosAccessKey), - nacosv2.WithNacosSecretKey(registry.NacosSecretKey), - nacosv2.WithNacosNamespaceId(registry.NacosNamespaceId), - nacosv2.WithNacosNamespace(registry.NacosNamespace), - nacosv2.WithNacosGroups(registry.NacosGroups), - nacosv2.WithNacosRefreshInterval(registry.NacosRefreshInterval), - nacosv2.WithAuthOption(authOption), - ) - } case string(Zookeeper): watcher, err = zookeeper.NewWatcher( r.Cache,