fix: refactored mcp server auto discovery logic and fix some issue (#2382)

Co-authored-by: johnlanni <zty98751@alibaba-inc.com>
This commit is contained in:
EricaLiu
2025-06-10 17:11:34 +08:00
committed by GitHub
parent 69d877c116
commit d2f09fe8c5
15 changed files with 1822 additions and 832 deletions

View File

@@ -250,6 +250,10 @@ spec:
registries: registries:
items: items:
properties: properties:
allowMcpServers:
items:
type: string
type: array
authSecretName: authSecretName:
type: string type: string
consulDatacenter: consulDatacenter:
@@ -265,12 +269,23 @@ spec:
type: string type: string
enableMCPServer: enableMCPServer:
type: boolean type: boolean
enableScopeMcpServers:
type: boolean
mcpServerBaseUrl: mcpServerBaseUrl:
type: string type: string
mcpServerExportDomains: mcpServerExportDomains:
items: items:
type: string type: string
type: array type: array
metadata:
additionalProperties:
properties:
innerMap:
additionalProperties:
type: string
type: object
type: object
type: object
nacosAccessKey: nacosAccessKey:
type: string type: string
nacosAddressServer: nacosAddressServer:

View File

@@ -133,6 +133,9 @@ type RegistryConfig struct {
McpServerExportDomains []string `protobuf:"bytes,20,rep,name=mcpServerExportDomains,proto3" json:"mcpServerExportDomains,omitempty"` McpServerExportDomains []string `protobuf:"bytes,20,rep,name=mcpServerExportDomains,proto3" json:"mcpServerExportDomains,omitempty"`
McpServerBaseUrl string `protobuf:"bytes,21,opt,name=mcpServerBaseUrl,proto3" json:"mcpServerBaseUrl,omitempty"` McpServerBaseUrl string `protobuf:"bytes,21,opt,name=mcpServerBaseUrl,proto3" json:"mcpServerBaseUrl,omitempty"`
EnableMCPServer *wrappers.BoolValue `protobuf:"bytes,22,opt,name=enableMCPServer,proto3" json:"enableMCPServer,omitempty"` EnableMCPServer *wrappers.BoolValue `protobuf:"bytes,22,opt,name=enableMCPServer,proto3" json:"enableMCPServer,omitempty"`
EnableScopeMcpServers *wrappers.BoolValue `protobuf:"bytes,23,opt,name=enableScopeMcpServers,proto3" json:"enableScopeMcpServers,omitempty"`
AllowMcpServers []string `protobuf:"bytes,24,rep,name=allowMcpServers,proto3" json:"allowMcpServers,omitempty"`
Metadata map[string]*InnerMap `protobuf:"bytes,25,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
} }
func (x *RegistryConfig) Reset() { func (x *RegistryConfig) Reset() {
@@ -321,6 +324,74 @@ func (x *RegistryConfig) GetEnableMCPServer() *wrappers.BoolValue {
return nil return nil
} }
func (x *RegistryConfig) GetEnableScopeMcpServers() *wrappers.BoolValue {
if x != nil {
return x.EnableScopeMcpServers
}
return nil
}
func (x *RegistryConfig) GetAllowMcpServers() []string {
if x != nil {
return x.AllowMcpServers
}
return nil
}
func (x *RegistryConfig) GetMetadata() map[string]*InnerMap {
if x != nil {
return x.Metadata
}
return nil
}
type InnerMap struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
InnerMap map[string]string `protobuf:"bytes,1,rep,name=inner_map,json=innerMap,proto3" json:"inner_map,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (x *InnerMap) Reset() {
*x = InnerMap{}
if protoimpl.UnsafeEnabled {
mi := &file_networking_v1_mcp_bridge_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *InnerMap) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*InnerMap) ProtoMessage() {}
func (x *InnerMap) ProtoReflect() protoreflect.Message {
mi := &file_networking_v1_mcp_bridge_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use InnerMap.ProtoReflect.Descriptor instead.
func (*InnerMap) Descriptor() ([]byte, []int) {
return file_networking_v1_mcp_bridge_proto_rawDescGZIP(), []int{2}
}
func (x *InnerMap) GetInnerMap() map[string]string {
if x != nil {
return x.InnerMap
}
return nil
}
var File_networking_v1_mcp_bridge_proto protoreflect.FileDescriptor var File_networking_v1_mcp_bridge_proto protoreflect.FileDescriptor
var file_networking_v1_mcp_bridge_proto_rawDesc = []byte{ var file_networking_v1_mcp_bridge_proto_rawDesc = []byte{
@@ -338,7 +409,7 @@ var file_networking_v1_mcp_bridge_proto_rawDesc = []byte{
0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x68, 0x69, 0x67, 0x72, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x68, 0x69, 0x67, 0x72, 0x65, 0x73,
0x73, 0x2e, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x73, 0x2e, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e,
0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0a, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0a,
0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x69, 0x65, 0x73, 0x22, 0xfd, 0x06, 0x0a, 0x0e, 0x52, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x69, 0x65, 0x73, 0x22, 0xa8, 0x09, 0x0a, 0x0e, 0x52,
0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x17, 0x0a, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x17, 0x0a,
0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x03, 0xe0, 0x41, 0x02, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x03, 0xe0, 0x41, 0x02,
0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02,
@@ -394,11 +465,39 @@ var file_networking_v1_mcp_bridge_proto_rawDesc = []byte{
0x50, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x18, 0x16, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x50, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x18, 0x16, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x0f, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x0f, 0x65, 0x6e, 0x61, 0x62, 0x6c,
0x65, 0x4d, 0x43, 0x50, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x65, 0x4d, 0x43, 0x50, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x50, 0x0a, 0x15, 0x65, 0x6e,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x6c, 0x69, 0x62, 0x61, 0x62, 0x61, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x6f, 0x70, 0x65, 0x4d, 0x63, 0x70, 0x53, 0x65, 0x72, 0x76,
0x2f, 0x68, 0x69, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6e, 0x65, 0x74, 0x65, 0x72, 0x73, 0x18, 0x17, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x42, 0x6f, 0x6f, 0x6c,
0x6f, 0x33, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x15, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x63, 0x6f,
0x70, 0x65, 0x4d, 0x63, 0x70, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, 0x28, 0x0a, 0x0f,
0x61, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x63, 0x70, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x18,
0x18, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x63, 0x70, 0x53,
0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, 0x4f, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61,
0x74, 0x61, 0x18, 0x19, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x33, 0x2e, 0x68, 0x69, 0x67, 0x72, 0x65,
0x73, 0x73, 0x2e, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31,
0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e,
0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d,
0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x5c, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64,
0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x35, 0x0a, 0x05, 0x76, 0x61,
0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x68, 0x69, 0x67, 0x72,
0x65, 0x73, 0x73, 0x2e, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x76,
0x31, 0x2e, 0x49, 0x6e, 0x6e, 0x65, 0x72, 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75,
0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x93, 0x01, 0x0a, 0x08, 0x49, 0x6e, 0x6e, 0x65, 0x72, 0x4d,
0x61, 0x70, 0x12, 0x4a, 0x0a, 0x09, 0x69, 0x6e, 0x6e, 0x65, 0x72, 0x5f, 0x6d, 0x61, 0x70, 0x18,
0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2d, 0x2e, 0x68, 0x69, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e,
0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e,
0x6e, 0x65, 0x72, 0x4d, 0x61, 0x70, 0x2e, 0x49, 0x6e, 0x6e, 0x65, 0x72, 0x4d, 0x61, 0x70, 0x45,
0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x69, 0x6e, 0x6e, 0x65, 0x72, 0x4d, 0x61, 0x70, 0x1a, 0x3b,
0x0a, 0x0d, 0x49, 0x6e, 0x6e, 0x65, 0x72, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12,
0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65,
0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09,
0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x2e, 0x5a, 0x2c, 0x67,
0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x6c, 0x69, 0x62, 0x61, 0x62,
0x61, 0x2f, 0x68, 0x69, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6e, 0x65,
0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
} }
var ( var (
@@ -413,20 +512,27 @@ func file_networking_v1_mcp_bridge_proto_rawDescGZIP() []byte {
return file_networking_v1_mcp_bridge_proto_rawDescData return file_networking_v1_mcp_bridge_proto_rawDescData
} }
var file_networking_v1_mcp_bridge_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_networking_v1_mcp_bridge_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_networking_v1_mcp_bridge_proto_goTypes = []interface{}{ var file_networking_v1_mcp_bridge_proto_goTypes = []interface{}{
(*McpBridge)(nil), // 0: higress.networking.v1.McpBridge (*McpBridge)(nil), // 0: higress.networking.v1.McpBridge
(*RegistryConfig)(nil), // 1: higress.networking.v1.RegistryConfig (*RegistryConfig)(nil), // 1: higress.networking.v1.RegistryConfig
(*wrappers.BoolValue)(nil), // 2: google.protobuf.BoolValue (*InnerMap)(nil), // 2: higress.networking.v1.InnerMap
nil, // 3: higress.networking.v1.RegistryConfig.MetadataEntry
nil, // 4: higress.networking.v1.InnerMap.InnerMapEntry
(*wrappers.BoolValue)(nil), // 5: google.protobuf.BoolValue
} }
var file_networking_v1_mcp_bridge_proto_depIdxs = []int32{ var file_networking_v1_mcp_bridge_proto_depIdxs = []int32{
1, // 0: higress.networking.v1.McpBridge.registries:type_name -> higress.networking.v1.RegistryConfig 1, // 0: higress.networking.v1.McpBridge.registries:type_name -> higress.networking.v1.RegistryConfig
2, // 1: higress.networking.v1.RegistryConfig.enableMCPServer:type_name -> google.protobuf.BoolValue 5, // 1: higress.networking.v1.RegistryConfig.enableMCPServer:type_name -> google.protobuf.BoolValue
2, // [2:2] is the sub-list for method output_type 5, // 2: higress.networking.v1.RegistryConfig.enableScopeMcpServers:type_name -> google.protobuf.BoolValue
2, // [2:2] is the sub-list for method input_type 3, // 3: higress.networking.v1.RegistryConfig.metadata:type_name -> higress.networking.v1.RegistryConfig.MetadataEntry
2, // [2:2] is the sub-list for extension type_name 4, // 4: higress.networking.v1.InnerMap.inner_map:type_name -> higress.networking.v1.InnerMap.InnerMapEntry
2, // [2:2] is the sub-list for extension extendee 2, // 5: higress.networking.v1.RegistryConfig.MetadataEntry.value:type_name -> higress.networking.v1.InnerMap
0, // [0:2] is the sub-list for field type_name 6, // [6:6] is the sub-list for method output_type
6, // [6:6] is the sub-list for method input_type
6, // [6:6] is the sub-list for extension type_name
6, // [6:6] is the sub-list for extension extendee
0, // [0:6] is the sub-list for field type_name
} }
func init() { file_networking_v1_mcp_bridge_proto_init() } func init() { file_networking_v1_mcp_bridge_proto_init() }
@@ -459,6 +565,18 @@ func file_networking_v1_mcp_bridge_proto_init() {
return nil return nil
} }
} }
file_networking_v1_mcp_bridge_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*InnerMap); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
} }
type x struct{} type x struct{}
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{
@@ -466,7 +584,7 @@ func file_networking_v1_mcp_bridge_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_networking_v1_mcp_bridge_proto_rawDesc, RawDescriptor: file_networking_v1_mcp_bridge_proto_rawDesc,
NumEnums: 0, NumEnums: 0,
NumMessages: 2, NumMessages: 5,
NumExtensions: 0, NumExtensions: 0,
NumServices: 0, NumServices: 0,
}, },

View File

@@ -71,4 +71,11 @@ message RegistryConfig {
repeated string mcpServerExportDomains = 20; repeated string mcpServerExportDomains = 20;
string mcpServerBaseUrl = 21; string mcpServerBaseUrl = 21;
google.protobuf.BoolValue enableMCPServer = 22; google.protobuf.BoolValue enableMCPServer = 22;
google.protobuf.BoolValue enableScopeMcpServers = 23;
repeated string allowMcpServers = 24;
map<string, InnerMap> metadata = 25;
}
message InnerMap {
map<string, string> inner_map = 1;
} }

View File

@@ -46,3 +46,24 @@ func (in *RegistryConfig) DeepCopy() *RegistryConfig {
func (in *RegistryConfig) DeepCopyInterface() interface{} { func (in *RegistryConfig) DeepCopyInterface() interface{} {
return in.DeepCopy() return in.DeepCopy()
} }
// DeepCopyInto supports using InnerMap within kubernetes types, where deepcopy-gen is used.
func (in *InnerMap) DeepCopyInto(out *InnerMap) {
p := proto.Clone(in).(*InnerMap)
*out = *p
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InnerMap. Required by controller-gen.
func (in *InnerMap) DeepCopy() *InnerMap {
if in == nil {
return nil
}
out := new(InnerMap)
in.DeepCopyInto(out)
return out
}
// DeepCopyInterface is an autogenerated deepcopy function, copying the receiver, creating a new InnerMap. Required by controller-gen.
func (in *InnerMap) DeepCopyInterface() interface{} {
return in.DeepCopy()
}

View File

@@ -28,6 +28,17 @@ func (this *RegistryConfig) UnmarshalJSON(b []byte) error {
return McpBridgeUnmarshaler.Unmarshal(bytes.NewReader(b), this) return McpBridgeUnmarshaler.Unmarshal(bytes.NewReader(b), this)
} }
// MarshalJSON is a custom marshaler for InnerMap
func (this *InnerMap) MarshalJSON() ([]byte, error) {
str, err := McpBridgeMarshaler.MarshalToString(this)
return []byte(str), err
}
// UnmarshalJSON is a custom unmarshaler for InnerMap
func (this *InnerMap) UnmarshalJSON(b []byte) error {
return McpBridgeUnmarshaler.Unmarshal(bytes.NewReader(b), this)
}
var ( var (
McpBridgeMarshaler = &jsonpb.Marshaler{} McpBridgeMarshaler = &jsonpb.Marshaler{}
McpBridgeUnmarshaler = &jsonpb.Unmarshaler{AllowUnknownFields: true} McpBridgeUnmarshaler = &jsonpb.Unmarshaler{AllowUnknownFields: true}

4
go.mod
View File

@@ -31,7 +31,7 @@ require (
github.com/hudl/fargo v1.4.0 github.com/hudl/fargo v1.4.0
github.com/mholt/acmez v1.2.0 github.com/mholt/acmez v1.2.0
github.com/nacos-group/nacos-sdk-go v1.0.8 github.com/nacos-group/nacos-sdk-go v1.0.8
github.com/nacos-group/nacos-sdk-go/v2 v2.1.2 github.com/nacos-group/nacos-sdk-go/v2 v2.3.2
github.com/onsi/gomega v1.27.10 github.com/onsi/gomega v1.27.10
github.com/spf13/cobra v1.8.0 github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
@@ -202,6 +202,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cast v1.5.1 // indirect github.com/spf13/cast v1.5.1 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tetratelabs/wazero v1.7.3 // indirect github.com/tetratelabs/wazero v1.7.3 // indirect
github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/pretty v1.2.0 // indirect
@@ -274,6 +275,5 @@ replace github.com/caddyserver/certmagic => github.com/2456868764/certmagic v1.0
replace ( replace (
github.com/dubbogo/gost => github.com/johnlanni/gost v1.11.23-0.20220713132522-0967a24036c6 github.com/dubbogo/gost => github.com/johnlanni/gost v1.11.23-0.20220713132522-0967a24036c6
github.com/nacos-group/nacos-sdk-go/v2 => github.com/luoxiner/nacos-sdk-go/v2 v2.2.9-60
golang.org/x/exp => golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 golang.org/x/exp => golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
) )

4
go.sum
View File

@@ -1434,8 +1434,6 @@ github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhn
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/luoxiner/nacos-sdk-go/v2 v2.2.9-60 h1:FA/azfz2nSkMc1XR8LeqhcAiA/2/sOMcyBGYCTUc+Cs=
github.com/luoxiner/nacos-sdk-go/v2 v2.2.9-60/go.mod h1:9FKXl6FqOiVmm72i8kADtbeK71egyG9y3uRDBg41tpQ=
github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA=
github.com/lyft/protoc-gen-star/v2 v2.0.1/go.mod h1:RcCdONR2ScXaYnQC5tUzxzlpA3WVYF7/opLeUgcQs/o= github.com/lyft/protoc-gen-star/v2 v2.0.1/go.mod h1:RcCdONR2ScXaYnQC5tUzxzlpA3WVYF7/opLeUgcQs/o=
github.com/lyft/protoc-gen-star/v2 v2.0.3/go.mod h1:amey7yeodaJhXSbf/TlLvWiqQfLOSpEk//mLlc+axEk= github.com/lyft/protoc-gen-star/v2 v2.0.3/go.mod h1:amey7yeodaJhXSbf/TlLvWiqQfLOSpEk//mLlc+axEk=
@@ -1525,6 +1523,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/nacos-group/nacos-sdk-go v1.0.8 h1:8pEm05Cdav9sQgJSv5kyvlgfz0SzFUUGI3pWX6SiSnM= github.com/nacos-group/nacos-sdk-go v1.0.8 h1:8pEm05Cdav9sQgJSv5kyvlgfz0SzFUUGI3pWX6SiSnM=
github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA=
github.com/nacos-group/nacos-sdk-go/v2 v2.3.2 h1:9QB2nCJzT5wkTVlxNYl3XL/7+G6p2USMi2gQh/ouQQo=
github.com/nacos-group/nacos-sdk-go/v2 v2.3.2/go.mod h1:9FKXl6FqOiVmm72i8kADtbeK71egyG9y3uRDBg41tpQ=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k=

View File

@@ -802,9 +802,20 @@ func (m *IngressConfig) convertDestinationRule(configs []common.WrapperConfig) [
if !exist { if !exist {
destinationRules[serviceName] = destinationRuleWrapper destinationRules[serviceName] = destinationRuleWrapper
} else if dr.DestinationRule.TrafficPolicy != nil { } else if dr.DestinationRule.TrafficPolicy != nil {
if dr.DestinationRule.TrafficPolicy.LoadBalancer == nil && // if the service is referenced by an sse type mcp server, an source ip based consistent hashing policy needs to be configured
destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer != nil { // consistent hashing policy will be generated by mcp server watcher, then if service do not have LoadBalancer settings, it will be merged
if destinationRuleWrapper.DestinationRule.TrafficPolicy != nil && destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer != nil {
if dr.DestinationRule.TrafficPolicy.LoadBalancer == nil {
dr.DestinationRule.TrafficPolicy.LoadBalancer = destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer dr.DestinationRule.TrafficPolicy.LoadBalancer = destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer
} else if dr.DestinationRule.TrafficPolicy.LoadBalancer.LbPolicy == nil {
dr.DestinationRule.TrafficPolicy.LoadBalancer.LbPolicy = destinationRuleWrapper.DestinationRule.TrafficPolicy.LoadBalancer.LbPolicy
}
}
// if the service is referenced by an https type mcp server, an client side simple mode tls policy needs to be configured
// simple mode tls policy will be generated by mcp server watcher, then if service do not have tls settings, it will be merged
if dr.DestinationRule.TrafficPolicy.Tls == nil && destinationRuleWrapper.DestinationRule.TrafficPolicy != nil &&
destinationRuleWrapper.DestinationRule.TrafficPolicy.Tls != nil {
dr.DestinationRule.TrafficPolicy.Tls = destinationRuleWrapper.DestinationRule.TrafficPolicy.Tls
} }
portTrafficPolicy := destinationRuleWrapper.DestinationRule.TrafficPolicy.PortLevelSettings[0] portTrafficPolicy := destinationRuleWrapper.DestinationRule.TrafficPolicy.PortLevelSettings[0]
portUpdated := false portUpdated := false

View File

@@ -24,12 +24,9 @@ const (
IstioMcpAutoGeneratedHttpRouteName = IstioMcpAutoGeneratedPrefix + "-httproute" IstioMcpAutoGeneratedHttpRouteName = IstioMcpAutoGeneratedPrefix + "-httproute"
IstioMcpAutoGeneratedMcpServerName = IstioMcpAutoGeneratedPrefix + "-mcpserver" IstioMcpAutoGeneratedMcpServerName = IstioMcpAutoGeneratedPrefix + "-mcpserver"
DefaultMcpToolsGroup = "mcp-tools"
DefaultMcpCredentialsGroup = "credentials"
DefaultNacosServiceNamespace = "public"
StdioProtocol = "stdio" StdioProtocol = "stdio"
HttpProtocol = "http" HttpProtocol = "http"
HttpsProtocol = "https"
DubboProtocol = "dubbo" DubboProtocol = "dubbo"
McpSSEProtocol = "mcp-sse" McpSSEProtocol = "mcp-sse"
McpStreambleProtocol = "mcp-streamble" McpStreambleProtocol = "mcp-streamble"
@@ -39,19 +36,19 @@ type McpToolArgsType string
// WasmPluginConfig Struct for mcp tool wasm plugin marshal // WasmPluginConfig Struct for mcp tool wasm plugin marshal
type WasmPluginConfig struct { type WasmPluginConfig struct {
Rules []*McpServerRule `json:"_rules_"` Rules []*McpServerRule `json:"_rules_,omitempty"`
} }
type McpServerRule struct { type McpServerRule struct {
MatchRoute []string `json:"_match_route_,omitempty"` MatchRoute []string `json:"_match_route_,omitempty"`
Server *ServerConfig `json:"server"` Server *ServerConfig `json:"server,omitempty"`
Tools []*McpTool `json:"tools"` Tools []*McpTool `json:"tools,omitempty"`
AllowTools []string `json:"allowTools,omitempty"`
} }
type ServerConfig struct { type ServerConfig struct {
Name string `json:"name,omitempty"` Name string `json:"name,omitempty"`
Config map[string]interface{} `json:"config,omitempty"` Config map[string]interface{} `json:"config,omitempty"`
AllowTools []string `json:"allowTools,omitempty"`
} }
type McpTool struct { type McpTool struct {
@@ -144,13 +141,13 @@ type InputSchema struct {
} }
type ToolsMeta struct { type ToolsMeta struct {
InvokeContext map[string]string `json:"InvokeContext,omitempty"` InvokeContext map[string]string `json:"invokeContext,omitempty"`
Enabled bool `json:"Enabled,omitempty"` Enabled bool `json:"enabled,omitempty"`
Templates map[string]interface{} `json:"Templates,omitempty"` Templates map[string]interface{} `json:"templates,omitempty"`
} }
type JsonGoTemplate struct { type JsonGoTemplate struct {
RequestTemplate RequestTemplate `json:"requestTemplate"` RequestTemplate RequestTemplate `json:"requestTemplate,omitempty"`
ResponseTemplate ResponseTemplate `json:"responseTemplate"` ResponseTemplate ResponseTemplate `json:"responseTemplate,omitempty"`
ArgsPosition map[string]string `json:"argsPosition,omitempty"` ArgsPosition map[string]string `json:"argsPosition,omitempty"`
} }

View File

@@ -0,0 +1,546 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mcpserver
import (
"encoding/json"
"fmt"
"regexp"
"strings"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
)
const McpServerVersionGroup = "mcp-server-versions"
const McpServerSpecGroup = "mcp-server"
const McpToolSpecGroup = "mcp-tools"
const SystemConfigIdPrefix = "system-"
const CredentialPrefix = "credentials-"
const DefaultNacosListConfigMode = "blur"
const ListMcpServerConfigIdPattern = "*mcp-versions.json"
const DefaultNacosListConfigPageSize = 50
type ServerSpecInfo struct {
RemoteServerConfig *RemoteServerConfig `json:"remoteServerConfig"`
}
type RemoteServerConfig struct {
ServiceRef *ServiceRef `json:"serviceRef"`
}
type ServiceRef struct {
ServiceName string `json:"serviceName"`
GroupName string `json:"groupName"`
NamespaceId string `json:"namespaceId"`
}
type NacosRegistryClient struct {
namespaceId string
configClient config_client.IConfigClient
namingClient naming_client.INamingClient
servers map[string]*ServerContext
}
type VersionedMcpServerInfo struct {
serverInfo *BasicMcpServerInfo
version string
}
type ServerContext struct {
id string
versionedMcpServerInfo *VersionedMcpServerInfo
serverChangeListener McpServerListener
configsMap map[string]*ConfigListenerWrap
serviceInfo *model.Service
namingCallback func(services []model.Instance, err error)
}
type McpServerConfig struct {
ServerSpecConfig string
ToolsSpecConfig string
ServiceInfo *model.Service
Credentials map[string]interface{}
}
type ConfigListenerWrap struct {
dataId string
group string
data string
listener func(namespace, group, dataId, data string)
}
type BasicMcpServerInfo struct {
Name string `json:"name"`
Id string `json:"id"`
FrontProtocol string `json:"frontProtocol"`
Protocol string `json:"protocol"`
}
type VersionsMcpServerInfo struct {
BasicMcpServerInfo
LatestPublishedVersion string `json:"latestPublishedVersion"`
Versions []*VersionDetail `json:"versionDetails"`
}
type VersionDetail struct {
Version string `json:"version"`
IsLatest bool `json:"is_latest"`
}
type McpServerListener func(info *McpServerConfig)
func NewMcpRegistryClient(clientConfig *constant.ClientConfig, serverConfig []constant.ServerConfig, namespaceId string) (*NacosRegistryClient, error) {
clientParam := vo.NacosClientParam{
ClientConfig: clientConfig,
ServerConfigs: serverConfig,
}
configClient, err := clients.NewConfigClient(clientParam)
namingClient, err := clients.NewNamingClient(clientParam)
if err != nil {
return nil, err
}
return &NacosRegistryClient{
namespaceId: namespaceId,
configClient: configClient,
namingClient: namingClient,
servers: map[string]*ServerContext{},
}, nil
}
func (n *NacosRegistryClient) listMcpServerConfigs() ([]model.ConfigItem, error) {
currentPageNum := 1
result := make([]model.ConfigItem, 0)
for {
configPage, err := n.configClient.SearchConfig(vo.SearchConfigParam{
Search: DefaultNacosListConfigMode,
DataId: ListMcpServerConfigIdPattern,
Group: McpServerVersionGroup,
PageNo: currentPageNum,
PageSize: DefaultNacosListConfigPageSize,
})
if err != nil {
mcpServerLog.Errorf("List mcp server configs for page size %d, page number %d error %v", currentPageNum, DefaultNacosListConfigPageSize)
}
if configPage == nil {
mcpServerLog.Errorf("List mcp server configs for page size %d, page number %d null %v", currentPageNum, DefaultNacosListConfigPageSize)
continue
}
result = append(result, configPage.PageItems...)
if configPage.PageNumber >= configPage.PagesAvailable {
break
}
currentPageNum += 1
}
return result, nil
}
// ListMcpServer List all mcp server from nacos mcp registry /**
func (n *NacosRegistryClient) ListMcpServer() ([]BasicMcpServerInfo, error) {
configs, err := n.listMcpServerConfigs()
if err != nil {
return nil, err
}
var result []BasicMcpServerInfo
for _, config := range configs {
mcpServerBasicConfig, err := n.configClient.GetConfig(vo.ConfigParam{
Group: McpServerVersionGroup,
DataId: config.DataId,
})
if err != nil {
mcpServerLog.Errorf("Get mcp server version config (dataId: %s) error, %v", config.DataId, err)
continue
}
if mcpServerBasicConfig == "" {
mcpServerLog.Infof("get empty mcp server version config (dataId: %s)", config.DataId)
continue
}
mcpServer := BasicMcpServerInfo{}
err = json.Unmarshal([]byte(mcpServerBasicConfig), &mcpServer)
if err != nil {
mcpServerLog.Errorf("Parse mcp server version config error %v", err)
continue
}
if !isMcpServerShouldBeDiscoveryForGateway(mcpServer) {
mcpServerLog.Infof("mcp server %s don't need to be discovered for gateway, skip it", mcpServerBasicConfig)
continue
}
result = append(result, mcpServer)
}
return result, nil
}
func isMcpServerShouldBeDiscoveryForGateway(info BasicMcpServerInfo) bool {
return "mcp-sse" == info.FrontProtocol || "mcp-streamable" == info.FrontProtocol
}
// ListenToMcpServer Listen to mcp server config and backend service
func (n *NacosRegistryClient) ListenToMcpServer(id string, listener McpServerListener) error {
versionConfigId := fmt.Sprintf("%s-mcp-versions.json", id)
serverVersionConfig, err := n.configClient.GetConfig(vo.ConfigParam{
Group: McpServerVersionGroup,
DataId: versionConfigId,
})
if err != nil {
mcpServerLog.Errorf("Get mcp server(id: %s) version config error, %v", id, err)
} else {
mcpServerLog.Infof("Get mcp server(id: %s) version config success, config is:\n %v", id, serverVersionConfig)
}
versionConfigCallBack := func(namespace string, group string, dataId string, content string) {
mcpServerLog.Infof("Call back to mcp server %s", id)
info := VersionsMcpServerInfo{}
err = json.Unmarshal([]byte(content), &info)
if err != nil {
mcpServerLog.Errorf("Parse mcp server (id: %s) version config callback error, %v", id, err)
return
}
latestVersion := info.LatestPublishedVersion
ctx := n.servers[id]
if ctx.versionedMcpServerInfo == nil {
ctx.versionedMcpServerInfo = &VersionedMcpServerInfo{}
}
ctx.versionedMcpServerInfo.serverInfo = &info.BasicMcpServerInfo
// first time the version is empty so it will trigger the change finally.
if ctx.versionedMcpServerInfo.version != latestVersion {
ctx.versionedMcpServerInfo.version = latestVersion
n.onServerVersionChanged(ctx)
n.triggerMcpServerChange(id)
}
}
n.servers[id] = &ServerContext{
id: id,
serverChangeListener: listener,
configsMap: map[string]*ConfigListenerWrap{
McpServerVersionGroup: {
dataId: versionConfigId,
group: McpServerVersionGroup,
listener: versionConfigCallBack,
},
},
}
// trigger callback manually
versionConfigCallBack(n.namespaceId, McpServerVersionGroup, versionConfigId, serverVersionConfig)
// Listen after get config to avoid multi-callback on same version
err = n.configClient.ListenConfig(vo.ConfigParam{
Group: McpServerVersionGroup,
DataId: versionConfigId,
OnChange: versionConfigCallBack,
})
if err != nil {
return err
}
return nil
}
func (n *NacosRegistryClient) onServerVersionChanged(ctx *ServerContext) {
id := ctx.versionedMcpServerInfo.serverInfo.Id
version := ctx.versionedMcpServerInfo.version
configsMap := map[string]string{
McpServerSpecGroup: fmt.Sprintf("%s-%s-mcp-server.json", id, version),
McpToolSpecGroup: fmt.Sprintf("%s-%s-mcp-tools.json", id, version),
}
for group, dataId := range configsMap {
configsKey := fmt.Sprintf(SystemConfigIdPrefix+"%s@@%s", id, group)
// Only listen to the last version of the server, so we should exist and cancel it first
if data, exist := ctx.configsMap[configsKey]; exist {
err := n.cancelListenToConfig(data)
if err != nil {
mcpServerLog.Errorf("cancel listen to old config %v error %v", dataId, err)
}
}
configListenerWrap, err := n.ListenToConfig(ctx, dataId, group)
if err != nil {
mcpServerLog.Errorf("listen to config %v error %v", dataId, err)
continue
}
ctx.configsMap[configsKey] = configListenerWrap
}
}
func (n *NacosRegistryClient) triggerMcpServerChange(id string) {
if context, exist := n.servers[id]; exist {
if config := mapConfigMapToServerConfig(context); config != nil {
context.serverChangeListener(config)
}
}
}
func mapConfigMapToServerConfig(ctx *ServerContext) *McpServerConfig {
result := &McpServerConfig{
Credentials: map[string]interface{}{},
}
configMaps := ctx.configsMap
for key, data := range configMaps {
if strings.HasPrefix(key, SystemConfigIdPrefix) {
group := strings.Split(key, "@@")[1]
if group == McpServerSpecGroup {
result.ServerSpecConfig = data.data
} else if group == McpToolSpecGroup {
result.ToolsSpecConfig = data.data
}
} else if strings.HasPrefix(key, CredentialPrefix) {
credentialId := strings.ReplaceAll(key, CredentialPrefix, "")
var credData interface{}
if err := json.Unmarshal([]byte(data.data), &credData); err != nil {
mcpServerLog.Errorf("parse credential %v error %v", credentialId, err)
// keep origin data if data is not an object
result.Credentials[credentialId] = data.data
} else {
result.Credentials[credentialId] = credData
}
}
}
result.ServiceInfo = ctx.serviceInfo
return result
}
func (n *NacosRegistryClient) replaceTemplateAndExactConfigsItems(ctx *ServerContext, config *ConfigListenerWrap) map[string]*ConfigListenerWrap {
result := map[string]*ConfigListenerWrap{}
compile := regexp.MustCompile("\\$\\{nacos\\.([a-zA-Z0-9-_:\\\\.]+/[a-zA-Z0-9-_:\\\\.]+)}")
allConfigs := compile.FindAllString(config.data, -1)
allConfigsMap := map[string]string{}
for _, config := range allConfigs {
allConfigsMap[config] = config
}
newContent := config.data
for _, data := range allConfigsMap {
dataIdAndGroup := strings.ReplaceAll(data, "${nacos.", "")
dataIdAndGroup = dataIdAndGroup[0 : len(dataIdAndGroup)-1]
dataIdAndGroupArray := strings.Split(dataIdAndGroup, "/")
dataId := strings.TrimSpace(dataIdAndGroupArray[0])
group := strings.TrimSpace(dataIdAndGroupArray[1])
configWrap, err := n.ListenToConfig(ctx, dataId, group)
if err != nil {
mcpServerLog.Errorf("extract configs %v from content error %v", dataId, err)
continue
}
result[CredentialPrefix+configWrap.group+"_"+configWrap.dataId] = configWrap
newContent = strings.Replace(newContent, data, ".config.credentials."+group+"_"+dataId, -1)
}
config.data = newContent
return result
}
func (n *NacosRegistryClient) resetNacosTemplateConfigs(ctx *ServerContext, config *ConfigListenerWrap) {
newCredentials := n.replaceTemplateAndExactConfigsItems(ctx, config)
// cancel all old config listener
for key, wrap := range ctx.configsMap {
if strings.HasPrefix(key, CredentialPrefix) {
if _, ok := newCredentials[key]; !ok {
err := n.cancelListenToConfig(wrap)
if err != nil {
mcpServerLog.Errorf("cancel listen to old credential listener error %v", err)
continue
}
}
}
}
for _, data := range newCredentials {
ctx.configsMap[CredentialPrefix+data.group+"_"+data.dataId] = data
}
}
func (n *NacosRegistryClient) refreshServiceListenerIfNeeded(ctx *ServerContext, serverConfig string) {
var serverInfo ServerSpecInfo
err := json.Unmarshal([]byte(serverConfig), &serverInfo)
if err != nil {
mcpServerLog.Errorf("parse server config error %v", err)
return
}
if serverInfo.RemoteServerConfig != nil && serverInfo.RemoteServerConfig.ServiceRef != nil {
ref := serverInfo.RemoteServerConfig.ServiceRef
if ctx.serviceInfo != nil {
if ctx.serviceInfo.Name == ref.ServiceName && ctx.serviceInfo.GroupName == ref.GroupName {
return
}
err := n.namingClient.Unsubscribe(&vo.SubscribeParam{
GroupName: ctx.serviceInfo.GroupName,
ServiceName: ctx.serviceInfo.Name,
SubscribeCallback: ctx.namingCallback,
})
if err != nil {
mcpServerLog.Errorf("unsubscribe service error:%v, groupName:%s, serviceName:%s", err, ctx.serviceInfo.GroupName, ctx.serviceInfo.Name)
}
}
service, err := n.namingClient.GetService(vo.GetServiceParam{
GroupName: ref.GroupName,
ServiceName: ref.ServiceName,
})
if err != nil {
mcpServerLog.Errorf("get service error:%v, groupName:%s, serviceName:%s", err, ref.GroupName, ref.ServiceName)
return
}
ctx.serviceInfo = &service
if ctx.namingCallback == nil {
ctx.namingCallback = func(services []model.Instance, err error) {
if ctx.serviceInfo == nil {
ctx.serviceInfo = &model.Service{
GroupName: ctx.serviceInfo.GroupName,
Name: ctx.serviceInfo.Name,
}
}
ctx.serviceInfo.Name = ref.ServiceName
ctx.serviceInfo.GroupName = ref.GroupName
ctx.serviceInfo.Hosts = services
n.triggerMcpServerChange(ctx.id)
}
}
err = n.namingClient.Subscribe(&vo.SubscribeParam{
GroupName: ctx.serviceInfo.GroupName,
ServiceName: ctx.serviceInfo.Name,
SubscribeCallback: ctx.namingCallback,
})
if err != nil {
mcpServerLog.Errorf("subscribe service error:%v, groupName:%s, serviceName:%s", err, ctx.serviceInfo.GroupName, ctx.serviceInfo.Name)
}
}
}
func (n *NacosRegistryClient) ListenToConfig(ctx *ServerContext, dataId string, group string) (*ConfigListenerWrap, error) {
wrap := ConfigListenerWrap{
dataId: dataId,
group: group,
}
configListener := func(namespace, group, dataId, data string) {
if group == McpToolSpecGroup {
n.resetNacosTemplateConfigs(ctx, &wrap)
} else if group == McpServerSpecGroup {
n.refreshServiceListenerIfNeeded(ctx, data)
}
if ctx.serverChangeListener != nil && wrap.data != data {
wrap.data = data
n.triggerMcpServerChange(ctx.versionedMcpServerInfo.serverInfo.Id)
}
}
config, err := n.configClient.GetConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
})
if err != nil {
return nil, err
}
wrap.listener = configListener
wrap.data = config
if group == McpToolSpecGroup {
n.resetNacosTemplateConfigs(ctx, &wrap)
} else if group == McpServerSpecGroup {
n.refreshServiceListenerIfNeeded(ctx, wrap.data)
}
err = n.configClient.ListenConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
OnChange: configListener,
})
if err != nil {
return nil, err
}
return &wrap, nil
}
func (n *NacosRegistryClient) cancelListenToConfig(wrap *ConfigListenerWrap) error {
return n.configClient.CancelListenConfig(vo.ConfigParam{
DataId: wrap.dataId,
Group: wrap.group,
OnChange: wrap.listener,
})
}
func (n *NacosRegistryClient) CancelListenToServer(id string) error {
if server, exist := n.servers[id]; exist && server != nil {
defer delete(n.servers, id)
for _, wrap := range server.configsMap {
if wrap != nil {
err := n.configClient.CancelListenConfig(vo.ConfigParam{
DataId: wrap.dataId,
Group: wrap.group,
OnChange: wrap.listener,
})
if err != nil {
mcpServerLog.Errorf("cancel listen config error:%v, dataId:%s, group:%s", err, wrap.dataId, wrap.group)
continue
}
}
}
if server.serviceInfo != nil {
err := n.namingClient.Unsubscribe(&vo.SubscribeParam{
GroupName: server.serviceInfo.GroupName,
ServiceName: server.serviceInfo.Name,
SubscribeCallback: server.namingCallback,
})
if err != nil {
mcpServerLog.Errorf("unsubscribe service error:%v, groupName:%s, serviceName:%s", err, server.serviceInfo.GroupName, server.serviceInfo.Name)
return err
}
}
}
return nil
}
func (n *NacosRegistryClient) CloseClient() {
n.namingClient.CloseClient()
n.configClient.CloseClient()
}

View File

@@ -1,180 +0,0 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mcpserver
import (
"fmt"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
)
type MultiConfigListener struct {
configClient config_client.IConfigClient
onChange func(map[string]string)
configCache map[string]string
innerCallback func(string, string, string, string)
}
func NewMultiConfigListener(configClient config_client.IConfigClient, onChange func(map[string]string)) *MultiConfigListener {
result := &MultiConfigListener{
configClient: configClient,
configCache: make(map[string]string),
onChange: onChange,
}
result.innerCallback = func(namespace string, group string, dataId string, content string) {
result.configCache[group+DefaultJoiner+dataId] = content
result.onChange(result.configCache)
}
return result
}
func (l *MultiConfigListener) StartListen(configs []vo.ConfigParam) error {
for _, config := range configs {
content, err := l.configClient.GetConfig(vo.ConfigParam{
DataId: config.DataId,
Group: config.Group,
})
if err != nil {
return fmt.Errorf("get config %s/%s err: %v", config.Group, config.DataId, err)
}
l.configCache[config.Group+DefaultJoiner+config.DataId] = content
err = l.configClient.ListenConfig(vo.ConfigParam{
DataId: config.DataId,
Group: config.Group,
OnChange: l.innerCallback,
})
if err != nil {
return fmt.Errorf("listener to config %s/%s error: %w", config.Group, config.DataId, err)
}
}
l.onChange(l.configCache)
return nil
}
func (l *MultiConfigListener) Stop() {
l.configClient.CloseClient()
}
func (l *MultiConfigListener) CancelListen(configs []vo.ConfigParam) error {
for _, config := range configs {
if _, ok := l.configCache[config.Group+DefaultJoiner+config.DataId]; ok {
err := l.configClient.CancelListenConfig(vo.ConfigParam{
DataId: config.DataId,
Group: config.Group,
})
if err != nil {
return fmt.Errorf("cancel config %s/%s error: %w", config.Group, config.DataId, err)
}
delete(l.configCache, config.Group+config.DataId)
}
}
return nil
}
type ServiceCache struct {
services map[string]*NacosServiceRef
client naming_client.INamingClient
}
type NacosServiceRef struct {
refs map[string]func([]model.Instance)
callback func(services []model.Instance, err error)
instances *[]model.Instance
}
func NewServiceCache(client naming_client.INamingClient) *ServiceCache {
return &ServiceCache{
client: client,
services: make(map[string]*NacosServiceRef),
}
}
func (c *ServiceCache) AddListener(group string, serviceName string, key string, callback func([]model.Instance)) error {
uniqueServiceName := c.makeServiceUniqueName(group, serviceName)
if _, ok := c.services[uniqueServiceName]; !ok {
instances, err := c.client.SelectAllInstances(vo.SelectAllInstancesParam{
GroupName: group,
ServiceName: serviceName,
})
if err != nil {
return err
}
ref := &NacosServiceRef{
refs: map[string]func([]model.Instance){},
instances: &instances,
}
ref.callback = func(services []model.Instance, err error) {
ref.instances = &services
for _, refCallback := range ref.refs {
refCallback(*ref.instances)
}
}
c.services[uniqueServiceName] = ref
err = c.client.Subscribe(&vo.SubscribeParam{
GroupName: group,
ServiceName: serviceName,
SubscribeCallback: ref.callback,
})
if err != nil {
return err
}
}
ref := c.services[uniqueServiceName]
ref.refs[key] = callback
callback(*ref.instances)
return nil
}
func (c *ServiceCache) RemoveListener(group string, serviceName string, key string) error {
if ref, ok := c.services[c.makeServiceUniqueName(group, serviceName)]; ok {
delete(ref.refs, key)
if len(ref.refs) == 0 {
err := c.client.Unsubscribe(&vo.SubscribeParam{
GroupName: group,
ServiceName: serviceName,
SubscribeCallback: ref.callback,
})
delete(c.services, c.makeServiceUniqueName(group, serviceName))
if err != nil {
return err
}
}
}
return nil
}
func (c *ServiceCache) makeServiceUniqueName(group string, serviceName string) string {
return fmt.Sprintf("%s-%s", group, serviceName)
}
func (c *ServiceCache) Stop() {
c.client.CloseClient()
}

View File

@@ -31,11 +31,8 @@ import (
provider "github.com/alibaba/higress/registry" provider "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/memory" "github.com/alibaba/higress/registry/memory"
"github.com/golang/protobuf/ptypes/wrappers" "github.com/golang/protobuf/ptypes/wrappers"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant" "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/model" "github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"go.uber.org/atomic" "go.uber.org/atomic"
"istio.io/api/networking/v1alpha3" "istio.io/api/networking/v1alpha3"
"istio.io/istio/pkg/config" "istio.io/istio/pkg/config"
@@ -55,18 +52,18 @@ const (
DefaultNacosLogMaxAge = 3 DefaultNacosLogMaxAge = 3
DefaultRefreshInterval = time.Second * 30 DefaultRefreshInterval = time.Second * 30
DefaultRefreshIntervalLimit = time.Second * 10 DefaultRefreshIntervalLimit = time.Second * 10
DefaultFetchPageSize = 50
DefaultJoiner = "@@"
) )
var ( var (
supportedProtocols = map[string]bool{ supportedProtocols = map[string]bool{
provider.HttpProtocol: true, provider.HttpProtocol: true,
provider.HttpsProtocol: true,
provider.McpSSEProtocol: true, provider.McpSSEProtocol: true,
provider.McpStreambleProtocol: true, provider.McpStreambleProtocol: true,
} }
protocolUpstreamTypeMapping = map[string]string{ protocolUpstreamTypeMapping = map[string]string{
provider.HttpProtocol: mcpserver.UpstreamTypeRest, provider.HttpProtocol: mcpserver.UpstreamTypeRest,
provider.HttpsProtocol: mcpserver.UpstreamTypeRest,
provider.McpSSEProtocol: mcpserver.UpstreamTypeSSE, provider.McpSSEProtocol: mcpserver.UpstreamTypeSSE,
provider.McpStreambleProtocol: mcpserver.UpstreamTypeStreamable, provider.McpStreambleProtocol: mcpserver.UpstreamTypeStreamable,
} }
@@ -85,23 +82,14 @@ type watcher struct {
provider.BaseWatcher provider.BaseWatcher
apiv1.RegistryConfig apiv1.RegistryConfig
watchingConfig map[string]bool watchingConfig map[string]bool
watchingConfigRefs map[string]sets.Set[string]
configToConfigListener map[string]*MultiConfigListener
serviceCache map[string]*ServiceCache
configToService map[string]string
credentialKeyToName map[string]map[string]string
RegistryType provider.ServiceRegistryType RegistryType provider.ServiceRegistryType
Status provider.WatcherStatus Status provider.WatcherStatus
configClient config_client.IConfigClient registryClient *NacosRegistryClient
serverConfig []constant.ServerConfig
cache memory.Cache cache memory.Cache
mutex *sync.Mutex mutex *sync.Mutex
subMutex *sync.Mutex
callbackMutex *sync.Mutex
stop chan struct{} stop chan struct{}
isStop bool isStop bool
updateCacheWhenEmpty bool updateCacheWhenEmpty bool
nacosClientConfig *constant.ClientConfig
namespace string namespace string
clusterId string clusterId string
authOption provider.AuthOption authOption provider.AuthOption
@@ -112,17 +100,10 @@ type WatcherOption func(w *watcher)
func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) { func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) {
w := &watcher{ w := &watcher{
watchingConfig: make(map[string]bool), watchingConfig: make(map[string]bool),
configToService: make(map[string]string),
watchingConfigRefs: make(map[string]sets.Set[string]),
configToConfigListener: make(map[string]*MultiConfigListener),
credentialKeyToName: make(map[string]map[string]string),
serviceCache: map[string]*ServiceCache{},
RegistryType: "nacos3", RegistryType: "nacos3",
Status: provider.UnHealthy, Status: provider.UnHealthy,
cache: cache, cache: cache,
mutex: &sync.Mutex{}, mutex: &sync.Mutex{},
subMutex: &sync.Mutex{},
callbackMutex: &sync.Mutex{},
stop: make(chan struct{}), stop: make(chan struct{}),
} }
@@ -132,14 +113,14 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
opt(w) opt(w)
} }
// The nacos mcp server uses these restricted namespaces and groups, and may be adjusted in the future. if w.NacosNamespace == "" {
w.NacosNamespace = "nacos-default-mcp" w.NacosNamespace = w.NacosNamespaceId
w.NacosNamespaceId = w.NacosNamespace }
w.NacosGroups = []string{"mcp-server"} w.NacosGroups = []string{"mcp-server"}
mcpServerLog.Infof("new nacos mcp server watcher with config Name:%s", w.Name) mcpServerLog.Infof("new nacos mcp server watcher with config Name:%s", w.Name)
w.nacosClientConfig = constant.NewClientConfig( clientConfig := constant.NewClientConfig(
constant.WithTimeoutMs(DefaultNacosTimeout), constant.WithTimeoutMs(DefaultNacosTimeout),
constant.WithLogLevel(DefaultNacosLogLevel), constant.WithLogLevel(DefaultNacosLogLevel),
constant.WithLogDir(DefaultNacosLogDir), constant.WithLogDir(DefaultNacosLogDir),
@@ -157,21 +138,18 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
) )
initTimer := time.NewTimer(DefaultInitTimeout) initTimer := time.NewTimer(DefaultInitTimeout)
w.serverConfig = []constant.ServerConfig{ serverConfig := []constant.ServerConfig{
*constant.NewServerConfig(w.Domain, uint64(w.Port)), *constant.NewServerConfig(w.Domain, uint64(w.Port)),
} }
success := make(chan struct{}) success := make(chan struct{})
go func() { go func() {
configClient, err := clients.NewConfigClient(vo.NacosClientParam{ client, err := NewMcpRegistryClient(clientConfig, serverConfig, w.NacosNamespaceId)
ClientConfig: w.nacosClientConfig,
ServerConfigs: w.serverConfig,
})
if err == nil { if err == nil {
w.configClient = configClient w.registryClient = client
close(success) close(success)
} else { } else {
mcpServerLog.Errorf("can not create naming client, err:%v", err) mcpServerLog.Errorf("can not create registry client, err:%v", err)
} }
}() }()
@@ -183,6 +161,28 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
} }
} }
func WithNacosNamespaceId(nacosNamespaceId string) WatcherOption {
return func(w *watcher) {
if nacosNamespaceId == "" {
w.NacosNamespaceId = "public"
} else {
w.NacosNamespaceId = nacosNamespaceId
}
}
}
func WithNacosNamespace(nacosNamespace string) WatcherOption {
return func(w *watcher) {
w.NacosNamespace = nacosNamespace
}
}
func WithNacosGroups(nacosGroups []string) WatcherOption {
return func(w *watcher) {
w.NacosGroups = nacosGroups
}
}
func WithNacosAddressServer(nacosAddressServer string) WatcherOption { func WithNacosAddressServer(nacosAddressServer string) WatcherOption {
return func(w *watcher) { return func(w *watcher) {
w.NacosAddressServer = nacosAddressServer w.NacosAddressServer = nacosAddressServer
@@ -302,229 +302,121 @@ func (w *watcher) fetchAllMcpConfig() error {
if w.isStop { if w.isStop {
return nil return nil
} }
fetchedConfigs := make(map[string]bool)
var tries int mcpConfigs, err := w.registryClient.ListMcpServer()
isV3 := true
if w.EnableMCPServer != nil {
isV3 = w.EnableMCPServer.GetValue()
}
for _, groupName := range w.NacosGroups {
for page := 1; ; page++ {
ss, err := w.configClient.SearchConfig(vo.SearchConfigParam{
Group: groupName,
Search: "blur",
PageNo: page,
PageSize: DefaultFetchPageSize,
IsV3: isV3,
})
if err != nil { if err != nil {
if tries > 10 { return fmt.Errorf("list mcp server failed ,error %s", err.Error())
return err
}
mcpServerLog.Errorf("fetch nacos config list failed, err:%v, pageNo:%d", err, page)
page--
tries++
continue
}
for _, item := range ss.PageItems {
fetchedConfigs[groupName+DefaultJoiner+item.DataId] = true
}
if len(ss.PageItems) < DefaultFetchPageSize {
break
}
} }
fetchedConfigs := map[string]bool{}
for _, c := range mcpConfigs {
fetchedConfigs[c.Id] = true
} }
for key := range w.watchingConfig { for key := range w.watchingConfig {
if _, exist := fetchedConfigs[key]; !exist { if _, exist := fetchedConfigs[key]; !exist {
s := strings.Split(key, DefaultJoiner) if err = w.registryClient.CancelListenToServer(key); err != nil {
err := w.unsubscribe(s[0], s[1]) return fmt.Errorf("cancel listen mcp server config %s failed, error %s", key, err.Error())
if err != nil {
return err
} }
mcpServerLog.Infof("cancel listen mcp server config %s success", key)
delete(w.watchingConfig, key) delete(w.watchingConfig, key)
// clean cache for this config
w.cache.UpdateConfigCache(config.GroupVersionKind{}, key, nil, true)
w.UpdateService()
} }
} }
wg := sync.WaitGroup{}
subscribeFailed := atomic.NewBool(false) subscribeFailed := atomic.NewBool(false)
watchingKeys := make(chan string, len(fetchedConfigs))
for key := range fetchedConfigs { for key := range fetchedConfigs {
s := strings.Split(key, DefaultJoiner)
if _, exist := w.watchingConfig[key]; !exist { if _, exist := w.watchingConfig[key]; !exist {
wg.Add(1) err = w.registryClient.ListenToMcpServer(key, w.mcpServerListener(key))
go func(k string) {
err := w.subscribe(s[0], s[1])
if err != nil { if err != nil {
mcpServerLog.Errorf("subscribe mcp server failed, dataId %v, errors: %v", key, err)
subscribeFailed.Store(true) subscribeFailed.Store(true)
mcpServerLog.Errorf("subscribe failed, group: %v, service: %v, errors: %v", s[0], s[1], err)
} else { } else {
watchingKeys <- k mcpServerLog.Infof("subscribe mcp server success, dataId:%s", key)
}
wg.Done()
}(key)
}
}
wg.Wait()
close(watchingKeys)
for key := range watchingKeys {
w.watchingConfig[key] = true w.watchingConfig[key] = true
} }
}
}
if subscribeFailed.Load() { if subscribeFailed.Load() {
return errors.New("subscribe services failed") return errors.New("subscribe services failed")
} }
return nil return nil
} }
func (w *watcher) unsubscribe(groupName string, dataId string) error { func (w *watcher) mcpServerListener(dataId string) func(info *McpServerConfig) {
mcpServerLog.Infof("unsubscribe mcp server, groupName:%s, dataId:%s", groupName, dataId) return func(info *McpServerConfig) {
defer w.UpdateService() defer w.UpdateService()
err := w.configClient.CancelListenConfig(vo.ConfigParam{ mcpServerLog.Infof("mcp server config callback, dataId %s", dataId)
DataId: dataId,
Group: groupName,
})
if err != nil {
mcpServerLog.Errorf("unsubscribe mcp server error:%v, groupName:%s, dataId:%s", err, groupName, dataId)
return err
}
key := strings.Join([]string{w.Name, w.NacosNamespace, groupName, dataId}, DefaultJoiner)
w.configToConfigListener[key].Stop()
delete(w.watchingConfigRefs, key)
delete(w.configToConfigListener, key)
// remove service for this config
configKey := strings.Join([]string{groupName, dataId}, DefaultJoiner)
svcInfo := w.configToService[configKey]
split := strings.Split(svcInfo, DefaultJoiner)
svcNamespace := split[0]
svcGroup := split[1]
svcName := split[2]
if w.serviceCache[svcNamespace] != nil {
err = w.serviceCache[svcNamespace].RemoveListener(svcGroup, svcName, configKey)
if err != nil {
mcpServerLog.Errorf("remove service listener error:%v, groupName:%s, dataId:%s", err, groupName, dataId)
}
}
delete(w.configToService, configKey)
w.cache.UpdateConfigCache(config.GroupVersionKind{}, key, nil, true)
return nil
}
func (w *watcher) subscribe(groupName string, dataId string) error {
mcpServerLog.Infof("subscribe mcp server, groupName:%s, dataId:%s", groupName, dataId)
// first we get this config and callback manually
content, err := w.configClient.GetConfig(vo.ConfigParam{
DataId: dataId,
Group: groupName,
})
if err != nil {
mcpServerLog.Errorf("get config %s/%s err: %v", groupName, dataId, err)
} else {
w.getConfigCallback(w.NacosNamespace, groupName, dataId, content)
}
// second, we set callback for this config
err = w.configClient.ListenConfig(vo.ConfigParam{
DataId: dataId,
Group: groupName,
OnChange: w.getConfigCallback,
})
if err != nil {
mcpServerLog.Errorf("subscribe mcp server error:%v, groupName:%s, dataId:%s", err, groupName, dataId)
return err
}
return nil
}
func (w *watcher) getConfigCallback(namespace, group, dataId, data string) {
mcpServerLog.Infof("get config callback, namespace:%s, groupName:%s, dataId:%s", namespace, group, dataId)
if data == "" {
return
}
key := strings.Join([]string{w.Name, w.NacosNamespace, group, dataId}, DefaultJoiner)
routeName := fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, group, strings.TrimSuffix(dataId, ".json"))
mcpServer := &provider.McpServer{} mcpServer := &provider.McpServer{}
if err := json.Unmarshal([]byte(data), mcpServer); err != nil { if err := json.Unmarshal([]byte(info.ServerSpecConfig), mcpServer); err != nil {
mcpServerLog.Errorf("Unmarshal config data to mcp server error:%v, namespace:%s, groupName:%s, dataId:%s", err, namespace, group, dataId) mcpServerLog.Errorf("unmarshal config data to mcp server error:%v, dataId:%s", err, dataId)
return
} }
// TODO support stdio and dubbo protocol
if !supportedProtocols[mcpServer.Protocol] { if !supportedProtocols[mcpServer.Protocol] {
return return
} }
// process mcp service if err := w.processServerConfig(dataId, info.ServiceInfo, mcpServer); err != nil {
w.subMutex.Lock() mcpServerLog.Errorf("process mcp server config error:%v, dataId:%s", err, dataId)
defer w.subMutex.Unlock()
if err := w.buildServiceEntryForMcpServer(mcpServer, group, dataId); err != nil {
mcpServerLog.Errorf("build service entry for mcp server failed, namespace %v, group: %v, dataId %v, errors: %v", namespace, group, dataId, err)
} }
// process mcp wasm if err := w.processToolConfig(dataId, info.ToolsSpecConfig, info.Credentials, mcpServer); err != nil {
// only generate wasm plugin for http protocol mcp server mcpServerLog.Errorf("process tool config error:%v, dataId:%s", err, dataId)
if mcpServer.Protocol != provider.HttpProtocol {
return
}
if _, exist := w.configToConfigListener[key]; !exist {
w.configToConfigListener[key] = NewMultiConfigListener(w.configClient, w.multiCallback(mcpServer, routeName, key))
}
if _, exist := w.watchingConfigRefs[key]; !exist {
w.watchingConfigRefs[key] = sets.New[string]()
}
listener := w.configToConfigListener[key]
curRef := sets.Set[string]{}
// add description ref
curRef.Insert(strings.Join([]string{provider.DefaultMcpToolsGroup, mcpServer.ToolsDescriptionRef}, DefaultJoiner))
// add credential ref
credentialNameMap := map[string]string{}
for name, ref := range mcpServer.Credentials {
credKey := strings.Join([]string{provider.DefaultMcpCredentialsGroup, ref.Ref}, DefaultJoiner)
curRef.Insert(credKey)
credentialNameMap[credKey] = name
}
w.callbackMutex.Lock()
w.credentialKeyToName[key] = credentialNameMap
w.callbackMutex.Unlock()
toBeAdd := curRef.Difference(w.watchingConfigRefs[key])
toBeDelete := w.watchingConfigRefs[key].Difference(curRef)
var toBeListen, toBeUnListen []vo.ConfigParam
for item, _ := range toBeAdd {
split := strings.Split(item, DefaultJoiner)
toBeListen = append(toBeListen, vo.ConfigParam{
Group: split[0],
DataId: split[1],
})
}
for item, _ := range toBeDelete {
split := strings.Split(item, DefaultJoiner)
toBeUnListen = append(toBeUnListen, vo.ConfigParam{
Group: split[0],
DataId: split[1],
})
}
// listen description and credential config
if len(toBeListen) > 0 {
if err := listener.StartListen(toBeListen); err != nil {
mcpServerLog.Errorf("listen config ref failed, group: %v, dataId %v, errors: %v", group, dataId, err)
}
}
// cancel listen description and credential config
if len(toBeUnListen) > 0 {
if err := listener.CancelListen(toBeUnListen); err != nil {
mcpServerLog.Errorf("cancel listen config ref failed, group: %v, dataId %v, errors: %v", group, dataId, err)
} }
} }
} }
func (w *watcher) multiCallback(server *provider.McpServer, routeName, configKey string) func(map[string]string) { func (w *watcher) processServerConfig(dataId string, services *model.Service, mcpServer *provider.McpServer) error {
callback := func(configs map[string]string) { serviceHost := getServiceFullHostFromMcpServer(mcpServer)
defer w.UpdateService() // generate se for mcp server
serviceEntry := generateServiceEntry(serviceHost, services)
if serviceEntry != nil {
se := &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.ServiceEntry,
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedSeName, strings.TrimSuffix(dataId, ".json")),
Namespace: w.namespace,
},
Spec: serviceEntry,
}
w.cache.UpdateConfigCache(gvk.ServiceEntry, dataId, se, false)
}
// generate vs for mcp server
virtualService := w.buildVirtualServiceForMcpServer(mcpServer, dataId, serviceHost, serviceEntry)
if virtualService != nil {
w.cache.UpdateConfigCache(gvk.VirtualService, dataId, virtualService, false)
ms := w.buildMcpServerForMcpServer(virtualService.Spec.(*v1alpha3.VirtualService), dataId, mcpServer)
w.cache.UpdateConfigCache(mcpserver.GvkMcpServer, dataId, ms, false)
}
// if protocol is sse, we should apply ConsistentHash policy for this service
// if protocol is https, we should apply tls policy for this service
destinationRule := generateDrForMcpServer(serviceHost, mcpServer.Protocol)
if destinationRule != nil {
dr := &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.DestinationRule,
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedDrName, strings.TrimSuffix(dataId, ".json")),
Namespace: w.namespace,
},
Spec: destinationRule,
}
w.cache.UpdateConfigCache(gvk.DestinationRule, dataId, dr, false)
}
return nil
}
mcpServerLog.Infof("callback, ref config changed: %s", configKey) func (w *watcher) processToolConfig(dataId, data string, credentials map[string]interface{}, server *provider.McpServer) error {
if server.Protocol != provider.HttpProtocol && server.Protocol != provider.HttpsProtocol {
return nil
}
toolsDescription := &provider.McpToolConfig{}
if err := json.Unmarshal([]byte(data), toolsDescription); err != nil {
return fmt.Errorf("unmarshal toolsDescriptionRef to mcp tool config error:%v, data %v", err, data)
}
routeName := fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, strings.TrimSuffix(dataId, ".json"))
rule := &provider.McpServerRule{ rule := &provider.McpServerRule{
MatchRoute: []string{routeName}, MatchRoute: []string{routeName},
Server: &provider.ServerConfig{ Server: &provider.ServerConfig{
@@ -532,35 +424,9 @@ func (w *watcher) multiCallback(server *provider.McpServer, routeName, configKey
Config: map[string]interface{}{}, Config: map[string]interface{}{},
}, },
} }
rule.Server.Config["credentials"] = credentials
// process mcp credential
credentialConfig := map[string]interface{}{}
for key, data := range configs {
if strings.HasPrefix(key, provider.DefaultMcpToolsGroup) {
// skip mcp tool description
continue
}
var cred interface{}
if err := json.Unmarshal([]byte(data), &cred); err != nil {
mcpServerLog.Errorf("unmarshal credential data %v to map error:%v", key, err)
}
w.callbackMutex.Lock()
name := w.credentialKeyToName[configKey][key]
w.callbackMutex.Unlock()
credentialConfig[name] = cred
}
rule.Server.Config["credentials"] = credentialConfig
// process mcp tool description
var allowTools []string var allowTools []string
for key, toolData := range configs {
if strings.HasPrefix(key, provider.DefaultMcpCredentialsGroup) {
// skip mcp credentials
continue
}
toolsDescription := &provider.McpToolConfig{}
if err := json.Unmarshal([]byte(toolData), toolsDescription); err != nil {
mcpServerLog.Errorf("unmarshal toolsDescriptionRef to mcp tool config error:%v", err)
}
for _, t := range toolsDescription.Tools { for _, t := range toolsDescription.Tools {
convertTool := &provider.McpTool{Name: t.Name, Description: t.Description} convertTool := &provider.McpTool{Name: t.Name, Description: t.Description}
@@ -596,6 +462,7 @@ func (w *watcher) multiCallback(server *provider.McpServer, routeName, configKey
requestTemplate, err := getRequestTemplateFromToolMeta(toolMeta) requestTemplate, err := getRequestTemplateFromToolMeta(toolMeta)
if err != nil { if err != nil {
mcpServerLog.Errorf("get request template from tool meta error:%v, tool name %v", err, t.Name) mcpServerLog.Errorf("get request template from tool meta error:%v, tool name %v", err, t.Name)
continue
} else { } else {
convertTool.RequestTemplate = requestTemplate convertTool.RequestTemplate = requestTemplate
} }
@@ -603,14 +470,14 @@ func (w *watcher) multiCallback(server *provider.McpServer, routeName, configKey
responseTemplate, err := getResponseTemplateFromToolMeta(toolMeta) responseTemplate, err := getResponseTemplateFromToolMeta(toolMeta)
if err != nil { if err != nil {
mcpServerLog.Errorf("get response template from tool meta error:%v, tool name %v", err, t.Name) mcpServerLog.Errorf("get response template from tool meta error:%v, tool name %v", err, t.Name)
continue
} else { } else {
convertTool.ResponseTemplate = responseTemplate convertTool.ResponseTemplate = responseTemplate
} }
rule.Tools = append(rule.Tools, convertTool) rule.Tools = append(rule.Tools, convertTool)
} }
}
rule.Server.AllowTools = allowTools rule.AllowTools = allowTools
wasmPluginConfig := &config.Config{ wasmPluginConfig := &config.Config{
Meta: config.Meta{ Meta: config.Meta{
GroupVersionKind: gvk.WasmPlugin, GroupVersionKind: gvk.WasmPlugin,
@@ -618,131 +485,20 @@ func (w *watcher) multiCallback(server *provider.McpServer, routeName, configKey
}, },
Spec: rule, Spec: rule,
} }
w.cache.UpdateConfigCache(gvk.WasmPlugin, configKey, wasmPluginConfig, false) w.cache.UpdateConfigCache(gvk.WasmPlugin, dataId, wasmPluginConfig, false)
}
return callback
}
func (w *watcher) buildServiceEntryForMcpServer(mcpServer *provider.McpServer, configGroup, dataId string) error {
if mcpServer == nil || mcpServer.RemoteServerConfig == nil || mcpServer.RemoteServerConfig.ServiceRef == nil {
return nil
}
mcpServerLog.Debugf("ServiceRef %v for %v", mcpServer.RemoteServerConfig.ServiceRef, dataId)
configKey := strings.Join([]string{configGroup, dataId}, DefaultJoiner)
serviceGroup := mcpServer.RemoteServerConfig.ServiceRef.GroupName
serviceNamespace := mcpServer.RemoteServerConfig.ServiceRef.NamespaceId
serviceName := mcpServer.RemoteServerConfig.ServiceRef.ServiceName
if serviceNamespace == "" {
serviceNamespace = provider.DefaultNacosServiceNamespace
}
// update config to service and unsubscribe old service
curSvcKey := strings.Join([]string{serviceNamespace, serviceGroup, serviceName}, DefaultJoiner)
if svcKey, exist := w.configToService[configKey]; exist && svcKey != curSvcKey {
split := strings.Split(svcKey, DefaultJoiner)
if svcCache, has := w.serviceCache[split[0]]; has {
if err := svcCache.RemoveListener(split[1], split[2], configKey); err != nil {
mcpServerLog.Errorf("remove listener error:%v", err)
}
}
}
w.configToService[configKey] = curSvcKey
if _, exist := w.serviceCache[serviceNamespace]; !exist {
namingConfig := constant.NewClientConfig(
constant.WithTimeoutMs(DefaultNacosTimeout),
constant.WithLogLevel(DefaultNacosLogLevel),
constant.WithLogDir(DefaultNacosLogDir),
constant.WithCacheDir(DefaultNacosCacheDir),
constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache),
constant.WithLogRollingConfig(&constant.ClientLogRollingConfig{
MaxAge: DefaultNacosLogMaxAge,
}),
constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty),
constant.WithNamespaceId(serviceNamespace),
constant.WithAccessKey(w.NacosAccessKey),
constant.WithSecretKey(w.NacosSecretKey),
constant.WithUsername(w.authOption.NacosUsername),
constant.WithPassword(w.authOption.NacosPassword),
)
client, err := clients.NewNamingClient(vo.NacosClientParam{
ClientConfig: namingConfig,
ServerConfigs: w.serverConfig,
})
if err == nil {
w.serviceCache[serviceNamespace] = NewServiceCache(client)
} else {
return fmt.Errorf("can not create naming client err:%v", err)
}
}
svcCache := w.serviceCache[serviceNamespace]
err := svcCache.AddListener(serviceGroup, serviceName, configKey, w.getServiceCallback(mcpServer, configGroup, dataId))
if err != nil {
return fmt.Errorf("add listener for dataId %v, service %s/%s error:%v", dataId, serviceGroup, serviceName, err)
}
return nil return nil
} }
func (w *watcher) getServiceCallback(server *provider.McpServer, configGroup, dataId string) func(services []model.Instance) { func (w *watcher) buildVirtualServiceForMcpServer(server *provider.McpServer, dataId, serviceName string, se *v1alpha3.ServiceEntry) *config.Config {
groupName := server.RemoteServerConfig.ServiceRef.GroupName if server == nil {
if groupName == "DEFAULT_GROUP" {
groupName = "DEFAULT-GROUP"
}
namespace := server.RemoteServerConfig.ServiceRef.NamespaceId
serviceName := server.RemoteServerConfig.ServiceRef.ServiceName
// Higress doesn't care about the MCP export path configured in nacos.
// Any path of the mcp server are supported in request routing.
path := "/"
protocol := server.Protocol
host := getNacosServiceFullHost(groupName, namespace, serviceName)
return func(services []model.Instance) {
defer w.UpdateService()
mcpServerLog.Infof("callback for %s/%s, serviceName : %s", configGroup, dataId, host)
configKey := strings.Join([]string{w.Name, w.NacosNamespace, configGroup, dataId}, DefaultJoiner)
if len(services) == 0 {
mcpServerLog.Errorf("callback for %s return empty service instance list, skip generate config", host)
return
}
serviceEntry := w.generateServiceEntry(host, services)
se := &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.ServiceEntry,
Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedSeName, configGroup, strings.TrimSuffix(dataId, ".json")),
Namespace: "mcp",
},
Spec: serviceEntry,
}
if protocol == provider.McpSSEProtocol {
destinationRule := w.generateDrForSSEService(host)
dr := &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.DestinationRule,
Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedDrName, configGroup, strings.TrimSuffix(dataId, ".json")),
Namespace: w.namespace,
},
Spec: destinationRule,
}
w.cache.UpdateConfigCache(gvk.DestinationRule, configKey, dr, false)
}
w.cache.UpdateConfigCache(gvk.ServiceEntry, configKey, se, false)
vs := w.buildVirtualServiceForMcpServer(serviceEntry, configGroup, dataId, path, server)
w.cache.UpdateConfigCache(gvk.VirtualService, configKey, vs, false)
mcpServer := w.buildMcpServerForMcpServer(vs.Spec.(*v1alpha3.VirtualService), configGroup, dataId, path, server)
w.cache.UpdateConfigCache(mcpserver.GvkMcpServer, configKey, mcpServer, false)
}
}
func (w *watcher) buildVirtualServiceForMcpServer(serviceentry *v1alpha3.ServiceEntry, group, dataId, path string, server *provider.McpServer) *config.Config {
if serviceentry == nil {
return nil return nil
} }
// if there is no export domain, use default *
hosts := w.McpServerExportDomains hosts := w.McpServerExportDomains
if len(hosts) == 0 { if len(hosts) == 0 {
hosts = []string{"*"} hosts = []string{"*"}
} }
// find gateway resources by host
var gateways []string var gateways []string
for _, host := range hosts { for _, host := range hosts {
cleanHost := common2.CleanHost(host) cleanHost := common2.CleanHost(host)
@@ -751,15 +507,12 @@ func (w *watcher) buildVirtualServiceForMcpServer(serviceentry *v1alpha3.Service
common2.CreateConvertedName(w.clusterId, cleanHost), common2.CreateConvertedName(w.clusterId, cleanHost),
common2.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost)) common2.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost))
} }
routeName := fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, group, strings.TrimSuffix(dataId, ".json")) routeName := fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, strings.TrimSuffix(dataId, ".json"))
// path format: /{base-path}/{mcp-server-name}
mergePath := "/" + server.Name mergePath := "/" + server.Name
if w.McpServerBaseUrl != "/" { if w.McpServerBaseUrl != "" && w.McpServerBaseUrl != "/" {
mergePath = strings.TrimSuffix(w.McpServerBaseUrl, "/") + mergePath mergePath = strings.TrimSuffix(w.McpServerBaseUrl, "/") + mergePath
} }
if path != "/" {
mergePath = mergePath + "/" + strings.TrimPrefix(path, "/")
}
mergePath = strings.TrimSuffix(mergePath, "/")
vs := &v1alpha3.VirtualService{ vs := &v1alpha3.VirtualService{
Hosts: hosts, Hosts: hosts,
@@ -771,49 +524,60 @@ func (w *watcher) buildVirtualServiceForMcpServer(serviceentry *v1alpha3.Service
// Example: // Example:
// Assume mergePath=/mcp/test prefixRewrite=/ requestPath=/mcp/test/abc // Assume mergePath=/mcp/test prefixRewrite=/ requestPath=/mcp/test/abc
// If we only use prefix match, the rewritten path will be //abc. // If we only use prefix match, the rewritten path will be //abc.
Match: []*v1alpha3.HTTPMatchRequest{{ Match: []*v1alpha3.HTTPMatchRequest{
{
Uri: &v1alpha3.StringMatch{ Uri: &v1alpha3.StringMatch{
MatchType: &v1alpha3.StringMatch_Exact{ MatchType: &v1alpha3.StringMatch_Exact{
Exact: mergePath, Exact: mergePath,
}, },
}, },
}, { },
{
Uri: &v1alpha3.StringMatch{ Uri: &v1alpha3.StringMatch{
MatchType: &v1alpha3.StringMatch_Prefix{ MatchType: &v1alpha3.StringMatch_Prefix{
Prefix: mergePath + "/", Prefix: mergePath + "/",
}, },
}, },
}}, },
},
Route: []*v1alpha3.HTTPRouteDestination{{ Route: []*v1alpha3.HTTPRouteDestination{{
Destination: &v1alpha3.Destination{ Destination: &v1alpha3.Destination{
Host: serviceentry.Hosts[0], Host: serviceName,
Port: &v1alpha3.PortSelector{
Number: serviceentry.Ports[0].Number,
},
}, },
}}, }},
}}, }},
} }
// we should rewrite path for sse and streamble
if routeRewriteProtocols[server.Protocol] { if routeRewriteProtocols[server.Protocol] {
vs.Http[0].Rewrite = &v1alpha3.HTTPRewrite{ vs.Http[0].Rewrite = &v1alpha3.HTTPRewrite{
Uri: "/", Uri: "/",
} }
} }
// we should rewrite host for dns service
if se != nil && se.Resolution == v1alpha3.ServiceEntry_DNS && len(se.Endpoints) > 0 {
if vs.Http[0].Rewrite == nil {
vs.Http[0].Rewrite = &v1alpha3.HTTPRewrite{
Authority: se.Endpoints[0].Address,
}
} else {
vs.Http[0].Rewrite.Authority = se.Endpoints[0].Address
}
}
mcpServerLog.Debugf("construct virtualservice %v", vs) mcpServerLog.Debugf("construct virtualservice %v", vs)
return &config.Config{ return &config.Config{
Meta: config.Meta{ Meta: config.Meta{
GroupVersionKind: gvk.VirtualService, GroupVersionKind: gvk.VirtualService,
Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedVsName, group, dataId), Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedVsName, dataId),
Namespace: w.namespace, Namespace: w.namespace,
}, },
Spec: vs, Spec: vs,
} }
} }
func (w *watcher) buildMcpServerForMcpServer(vs *v1alpha3.VirtualService, group, dataId, path string, server *provider.McpServer) *config.Config { func (w *watcher) buildMcpServerForMcpServer(vs *v1alpha3.VirtualService, dataId string, server *provider.McpServer) *config.Config {
if vs == nil { if vs == nil {
return nil return nil
} }
@@ -821,7 +585,7 @@ func (w *watcher) buildMcpServerForMcpServer(vs *v1alpha3.VirtualService, group,
if len(domains) == 0 { if len(domains) == 0 {
domains = []string{"*"} domains = []string{"*"}
} }
name := fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedMcpServerName, group, strings.TrimSuffix(dataId, ".json")) name := fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedMcpServerName, strings.TrimSuffix(dataId, ".json"))
httpRoute := vs.Http[0] httpRoute := vs.Http[0]
pathMatchValue := "" pathMatchValue := ""
for _, match := range httpRoute.Match { for _, match := range httpRoute.Match {
@@ -856,52 +620,10 @@ func (w *watcher) buildMcpServerForMcpServer(vs *v1alpha3.VirtualService, group,
} }
} }
func (w *watcher) generateServiceEntry(host string, services []model.Instance) *v1alpha3.ServiceEntry { func generateDrForMcpServer(host, protocol string) *v1alpha3.DestinationRule {
portList := make([]*v1alpha3.ServicePort, 0) switch protocol {
endpoints := make([]*v1alpha3.WorkloadEntry, 0) case provider.McpSSEProtocol:
isDnsService := false return &v1alpha3.DestinationRule{
for _, service := range services {
protocol := common.HTTP
if service.Metadata != nil && service.Metadata["protocol"] != "" {
protocol = common.ParseProtocol(service.Metadata["protocol"])
}
port := &v1alpha3.ServicePort{
Name: protocol.String(),
Number: uint32(service.Port),
Protocol: protocol.String(),
}
if len(portList) == 0 {
portList = append(portList, port)
}
if !isValidIP(service.Ip) {
isDnsService = true
}
endpoint := &v1alpha3.WorkloadEntry{
Address: service.Ip,
Ports: map[string]uint32{port.Protocol: port.Number},
Labels: service.Metadata,
}
endpoints = append(endpoints, endpoint)
}
resolution := v1alpha3.ServiceEntry_STATIC
if isDnsService {
resolution = v1alpha3.ServiceEntry_DNS
}
se := &v1alpha3.ServiceEntry{
Hosts: []string{host},
Ports: portList,
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: resolution,
Endpoints: endpoints,
}
return se
}
func (w *watcher) generateDrForSSEService(host string) *v1alpha3.DestinationRule {
dr := &v1alpha3.DestinationRule{
Host: host, Host: host,
TrafficPolicy: &v1alpha3.TrafficPolicy{ TrafficPolicy: &v1alpha3.TrafficPolicy{
LoadBalancer: &v1alpha3.LoadBalancerSettings{ LoadBalancer: &v1alpha3.LoadBalancerSettings{
@@ -915,7 +637,17 @@ func (w *watcher) generateDrForSSEService(host string) *v1alpha3.DestinationRule
}, },
}, },
} }
return dr case provider.HttpsProtocol:
return &v1alpha3.DestinationRule{
Host: host,
TrafficPolicy: &v1alpha3.TrafficPolicy{
Tls: &v1alpha3.ClientTLSSettings{
Mode: v1alpha3.ClientTLSSettings_SIMPLE,
},
},
}
}
return nil
} }
func parseMcpArgs(args interface{}) (*provider.ToolArgs, error) { func parseMcpArgs(args interface{}) (*provider.ToolArgs, error) {
@@ -1016,35 +748,104 @@ func mergeMaps(maps ...map[string]string) map[string]string {
return res return res
} }
func getNacosServiceFullHost(groupName, namespace, serviceName string) string { func getServiceFullHostFromMcpServer(server *provider.McpServer) string {
if server == nil || server.RemoteServerConfig == nil || server.RemoteServerConfig.ServiceRef == nil {
return ""
}
groupName := server.RemoteServerConfig.ServiceRef.GroupName
if groupName == "DEFAULT_GROUP" {
groupName = "DEFAULT-GROUP"
}
namespace := server.RemoteServerConfig.ServiceRef.NamespaceId
serviceName := server.RemoteServerConfig.ServiceRef.ServiceName
suffix := strings.Join([]string{groupName, namespace, string(provider.Nacos)}, common.DotSeparator) suffix := strings.Join([]string{groupName, namespace, string(provider.Nacos)}, common.DotSeparator)
host := strings.Join([]string{serviceName, suffix}, common.DotSeparator) host := strings.Join([]string{serviceName, suffix}, common.DotSeparator)
return host return host
} }
func generateServiceEntry(host string, services *model.Service) *v1alpha3.ServiceEntry {
if services == nil || len(services.Hosts) == 0 {
return nil
}
portList := make([]*v1alpha3.ServicePort, 0)
endpoints := make([]*v1alpha3.WorkloadEntry, 0)
for _, service := range services.Hosts {
protocol := common.HTTP
if service.Metadata != nil && service.Metadata["protocol"] != "" {
protocol = common.ParseProtocol(service.Metadata["protocol"])
}
port := &v1alpha3.ServicePort{
Name: protocol.String(),
Number: uint32(service.Port),
Protocol: protocol.String(),
}
if len(portList) == 0 {
portList = append(portList, port)
}
endpoint := &v1alpha3.WorkloadEntry{
Address: service.Ip,
Ports: map[string]uint32{port.Protocol: port.Number},
Labels: service.Metadata,
}
endpoints = append(endpoints, endpoint)
}
se := &v1alpha3.ServiceEntry{
Hosts: []string{host},
Ports: portList,
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: getNacosServiceResolution(services),
Endpoints: endpoints,
}
return se
}
func isValidIP(ipStr string) bool {
ip := net.ParseIP(ipStr)
return ip != nil
}
func getNacosServiceResolution(services *model.Service) v1alpha3.ServiceEntry_Resolution {
ipEndpoints := 0
dnsEndpoints := 0
for _, service := range services.Hosts {
if isValidIP(service.Ip) {
ipEndpoints = ipEndpoints + 1
} else {
dnsEndpoints = dnsEndpoints + 1
}
}
if ipEndpoints > 0 && dnsEndpoints > 0 {
mcpServerLog.Errorf("nacos service %v has both ip and dns endpoints, set to ip resolution ", services.Name)
return v1alpha3.ServiceEntry_STATIC
}
if ipEndpoints > 0 {
return v1alpha3.ServiceEntry_STATIC
}
return v1alpha3.ServiceEntry_DNS
}
func (w *watcher) Stop() { func (w *watcher) Stop() {
w.mutex.Lock() w.mutex.Lock()
defer w.mutex.Unlock() defer w.mutex.Unlock()
mcpServerLog.Infof("unsubscribe all configs")
for key := range w.watchingConfig { for key := range w.watchingConfig {
s := strings.Split(key, DefaultJoiner) err := w.registryClient.CancelListenToServer(key)
err := w.unsubscribe(s[0], s[1])
if err == nil { if err == nil {
delete(w.watchingConfig, key) delete(w.watchingConfig, key)
w.cache.UpdateConfigCache(config.GroupVersionKind{}, key, nil, true)
mcpServerLog.Infof("cancel listen to mcp server config %v", key)
} }
} }
mcpServerLog.Infof("stop all service nameing client")
for _, client := range w.serviceCache {
// TODO: This is a temporary implementation because of a bug in the nacos-go-sdk, which causes a block when stoping.
go client.Stop()
}
w.isStop = true w.isStop = true
mcpServerLog.Infof("stop all config client")
mcpServerLog.Infof("watcher %v stop", w.Name)
w.UpdateService()
close(w.stop) close(w.stop)
w.Ready(false) w.Ready(false)
w.registryClient.CloseClient()
} }
func (w *watcher) IsHealthy() bool { func (w *watcher) IsHealthy() bool {
@@ -1054,18 +855,3 @@ func (w *watcher) IsHealthy() bool {
func (w *watcher) GetRegistryType() string { func (w *watcher) GetRegistryType() string {
return w.RegistryType.String() return w.RegistryType.String()
} }
func isValidIP(ipStr string) bool {
ip := net.ParseIP(ipStr)
return ip != nil
}
func normalizeRewritePathPrefix(path string) string {
if path == "" || path == "/" {
return "/"
}
if path[0] != '/' {
path = "/" + path
}
return strings.TrimSuffix(path, "/")
}

View File

@@ -0,0 +1,585 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mcpserver
import (
"fmt"
"reflect"
"strings"
"sync"
"testing"
apiv1 "github.com/alibaba/higress/api/networking/v1"
common2 "github.com/alibaba/higress/pkg/ingress/kube/common"
provider "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/memory"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/stretchr/testify/mock"
wrappers "google.golang.org/protobuf/types/known/wrapperspb"
"istio.io/api/networking/v1alpha3"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/schema/gvk"
)
type mockWatcher struct {
watcher
mock.Mock
}
func newTestWatcher(cache memory.Cache, opts ...WatcherOption) mockWatcher {
w := &watcher{
watchingConfig: make(map[string]bool),
RegistryType: "mcpserver",
Status: provider.UnHealthy,
cache: cache,
mutex: &sync.Mutex{},
stop: make(chan struct{}),
}
w.NacosRefreshInterval = int64(DefaultRefreshInterval)
for _, opt := range opts {
opt(w)
}
if w.NacosNamespace == "" {
w.NacosNamespace = w.NacosNamespaceId
}
return mockWatcher{watcher: *w, Mock: mock.Mock{}}
}
func testCallback(msc *McpServerConfig) memory.Cache {
registryConfig := &apiv1.RegistryConfig{
Type: string(provider.Nacos),
Name: "mse-nacos-public",
Domain: "",
Port: 8848,
NacosAddressServer: "",
NacosAccessKey: "ak",
NacosSecretKey: "sk",
NacosNamespaceId: "",
NacosNamespace: "public",
NacosGroups: []string{"dev"},
NacosRefreshInterval: 0,
EnableMCPServer: wrappers.Bool(true),
McpServerExportDomains: []string{"mcp.com"},
McpServerBaseUrl: "/mcp-servers/",
EnableScopeMcpServers: wrappers.Bool(true),
AllowMcpServers: []string{"mcp-server-1", "mcp-server-2"},
Metadata: map[string]*apiv1.InnerMap{
"routeName": &apiv1.InnerMap{
InnerMap: map[string]string{"mcp-server-1": "mcp-route-1", "mcp-server-2": "mcp-route-2"},
},
},
}
localCache := memory.NewCache()
testWatcher := newTestWatcher(localCache,
WithType(registryConfig.Type),
WithName(registryConfig.Name),
WithNacosAddressServer(registryConfig.NacosAddressServer),
WithDomain(registryConfig.Domain),
WithPort(registryConfig.Port),
WithNacosNamespaceId(registryConfig.NacosNamespaceId),
WithNacosNamespace(registryConfig.NacosNamespace),
WithNacosGroups(registryConfig.NacosGroups),
WithNacosAccessKey(registryConfig.NacosAccessKey),
WithNacosSecretKey(registryConfig.NacosSecretKey),
WithNacosRefreshInterval(registryConfig.NacosRefreshInterval),
WithMcpExportDomains(registryConfig.McpServerExportDomains),
WithMcpBaseUrl(registryConfig.McpServerBaseUrl),
WithEnableMcpServer(registryConfig.EnableMCPServer))
testWatcher.AppendServiceUpdateHandler(func() {
fmt.Println("testWatcher service update success")
})
callback := testWatcher.mcpServerListener("mock-data-id")
callback(msc)
return localCache
}
func Test_Watcher(t *testing.T) {
dataId := "mock-data-id"
testCase := []struct {
name string
msc *McpServerConfig
dataId string
wantConfig map[string]*config.Config
}{
{
name: "normal case",
dataId: dataId,
msc: &McpServerConfig{
Credentials: map[string]interface{}{
"test-server": map[string]string{"data": "value"},
},
ServiceInfo: &model.Service{
Hosts: []model.Instance{
{
Ip: "127.0.0.1",
Port: 8080,
Metadata: map[string]string{"protocol": "http"},
},
},
},
ServerSpecConfig: `{
"name": "explore",
"protocol": "http",
"description": "explore",
"remoteServerConfig": {
"serviceRef": {
"namespaceId": "public",
"groupName": "DEFAULT_GROUP",
"serviceName": "explore"
},
"exportPath": ""
},
"enabled": true
}`,
ToolsSpecConfig: `{
"tools": [
{
"name": "explore",
"description": "find name from tag",
"inputSchema": {
"type": "object",
"properties": {
"tags": {
"type": "string",
"description": "tag"
}
}
}
}
],
"toolsMeta": {
"explore": {
"enabled": true,
"templates": {
"json-go-template": {
"requestTemplate": {
"method": "GET",
"url": "/v0/explore",
"argsToUrlParam": true
}
}
}
}
}
}`,
},
wantConfig: map[string]*config.Config{
gvk.ServiceEntry.String(): &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.ServiceEntry,
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedSeName, strings.TrimSuffix(dataId, ".json")),
},
Spec: &v1alpha3.ServiceEntry{
Hosts: []string{"explore.DEFAULT-GROUP.public.nacos"},
Ports: []*v1alpha3.ServicePort{
{
Number: 8080,
Name: "HTTP",
Protocol: "HTTP",
},
},
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: v1alpha3.ServiceEntry_STATIC,
Endpoints: []*v1alpha3.WorkloadEntry{
{
Address: "127.0.0.1",
Ports: map[string]uint32{
"HTTP": 8080,
},
Labels: map[string]string{
"protocol": "http",
},
},
},
},
},
gvk.VirtualService.String(): &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.VirtualService,
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedVsName, strings.TrimSuffix(dataId, ".json")),
},
Spec: &v1alpha3.VirtualService{
Gateways: []string{"/" + common2.CleanHost("mcp.com"), common2.CreateConvertedName(constants.IstioIngressGatewayName, common2.CleanHost("mcp.com"))},
Hosts: []string{"mcp.com"},
Http: []*v1alpha3.HTTPRoute{
{
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, strings.TrimSuffix(dataId, ".json")),
Match: []*v1alpha3.HTTPMatchRequest{
{
Uri: &v1alpha3.StringMatch{
MatchType: &v1alpha3.StringMatch_Exact{
Exact: "/mcp-servers/explore",
},
},
},
{
Uri: &v1alpha3.StringMatch{
MatchType: &v1alpha3.StringMatch_Prefix{
Prefix: "/mcp-servers/explore/",
},
},
},
},
Route: []*v1alpha3.HTTPRouteDestination{
{
Destination: &v1alpha3.Destination{
Host: "explore.DEFAULT-GROUP.public.nacos",
},
},
},
},
},
},
},
},
},
{
name: "sse and dns endpoint case",
dataId: dataId,
msc: &McpServerConfig{
Credentials: map[string]interface{}{
"test-server": map[string]string{"data": "value"},
},
ServiceInfo: &model.Service{
Hosts: []model.Instance{
{
Ip: "example.com",
Port: 8080,
Metadata: map[string]string{"protocol": "http"},
},
},
},
ServerSpecConfig: `{
"name": "explore",
"protocol": "mcp-sse",
"description": "explore",
"remoteServerConfig": {
"serviceRef": {
"namespaceId": "public",
"groupName": "DEFAULT_GROUP",
"serviceName": "explore"
},
"exportPath": ""
},
"enabled": true
}`,
ToolsSpecConfig: `{
"tools": [
{
"name": "explore",
"description": "find name from tag",
"inputSchema": {
"type": "object",
"properties": {
"tags": {
"type": "string",
"description": "tag"
}
}
}
}
],
"toolsMeta": {
"explore": {
"enabled": true,
"templates": {
"json-go-template": {
"requestTemplate": {
"method": "GET",
"url": "/v0/explore",
"argsToUrlParam": true
}
}
}
}
}
}`,
},
wantConfig: map[string]*config.Config{
gvk.ServiceEntry.String(): &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.ServiceEntry,
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedSeName, strings.TrimSuffix(dataId, ".json")),
},
Spec: &v1alpha3.ServiceEntry{
Hosts: []string{"explore.DEFAULT-GROUP.public.nacos"},
Ports: []*v1alpha3.ServicePort{
{
Number: 8080,
Name: "HTTP",
Protocol: "HTTP",
},
},
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: v1alpha3.ServiceEntry_DNS,
Endpoints: []*v1alpha3.WorkloadEntry{
{
Address: "example.com",
Ports: map[string]uint32{
"HTTP": 8080,
},
Labels: map[string]string{
"protocol": "http",
},
},
},
},
},
gvk.VirtualService.String(): &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.VirtualService,
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedVsName, strings.TrimSuffix(dataId, ".json")),
},
Spec: &v1alpha3.VirtualService{
Gateways: []string{"/" + common2.CleanHost("mcp.com"), common2.CreateConvertedName(constants.IstioIngressGatewayName, common2.CleanHost("mcp.com"))},
Hosts: []string{"mcp.com"},
Http: []*v1alpha3.HTTPRoute{
{
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, strings.TrimSuffix(dataId, ".json")),
Match: []*v1alpha3.HTTPMatchRequest{
{
Uri: &v1alpha3.StringMatch{
MatchType: &v1alpha3.StringMatch_Exact{
Exact: "/mcp-servers/explore",
},
},
},
{
Uri: &v1alpha3.StringMatch{
MatchType: &v1alpha3.StringMatch_Prefix{
Prefix: "/mcp-servers/explore/",
},
},
},
},
Route: []*v1alpha3.HTTPRouteDestination{
{
Destination: &v1alpha3.Destination{
Host: "explore.DEFAULT-GROUP.public.nacos",
},
},
},
Rewrite: &v1alpha3.HTTPRewrite{
Uri: "/",
Authority: "example.com",
},
},
},
},
},
gvk.DestinationRule.String(): &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.DestinationRule,
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedDrName, strings.TrimSuffix(dataId, ".json")),
},
Spec: &v1alpha3.DestinationRule{
Host: "explore.DEFAULT-GROUP.public.nacos",
TrafficPolicy: &v1alpha3.TrafficPolicy{
LoadBalancer: &v1alpha3.LoadBalancerSettings{
LbPolicy: &v1alpha3.LoadBalancerSettings_ConsistentHash{
ConsistentHash: &v1alpha3.LoadBalancerSettings_ConsistentHashLB{
HashKey: &v1alpha3.LoadBalancerSettings_ConsistentHashLB_UseSourceIp{
UseSourceIp: true,
},
},
},
},
},
},
},
},
},
{
name: "https and dns case",
dataId: dataId,
msc: &McpServerConfig{
Credentials: map[string]interface{}{
"test-server": map[string]string{"data": "value"},
},
ServiceInfo: &model.Service{
Hosts: []model.Instance{
{
Ip: "example.com",
Port: 8080,
Metadata: map[string]string{"protocol": "https"},
},
},
},
ServerSpecConfig: `{
"name": "explore",
"protocol": "https",
"description": "explore",
"remoteServerConfig": {
"serviceRef": {
"namespaceId": "public",
"groupName": "DEFAULT_GROUP",
"serviceName": "explore"
},
"exportPath": ""
},
"enabled": true
}`,
ToolsSpecConfig: `{
"tools": [
{
"name": "explore",
"description": "find name from tag",
"inputSchema": {
"type": "object",
"properties": {
"tags": {
"type": "string",
"description": "tag"
}
}
}
}
],
"toolsMeta": {
"explore": {
"enabled": true,
"templates": {
"json-go-template": {
"requestTemplate": {
"method": "GET",
"url": "/v0/explore",
"argsToUrlParam": true
}
}
}
}
}
}`,
},
wantConfig: map[string]*config.Config{
gvk.ServiceEntry.String(): &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.ServiceEntry,
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedSeName, strings.TrimSuffix(dataId, ".json")),
},
Spec: &v1alpha3.ServiceEntry{
Hosts: []string{"explore.DEFAULT-GROUP.public.nacos"},
Ports: []*v1alpha3.ServicePort{
{
Number: 8080,
Name: "HTTPS",
Protocol: "HTTPS",
},
},
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: v1alpha3.ServiceEntry_DNS,
Endpoints: []*v1alpha3.WorkloadEntry{
{
Address: "example.com",
Ports: map[string]uint32{
"HTTPS": 8080,
},
Labels: map[string]string{
"protocol": "https",
},
},
},
},
},
gvk.VirtualService.String(): &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.VirtualService,
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedVsName, strings.TrimSuffix(dataId, ".json")),
},
Spec: &v1alpha3.VirtualService{
Gateways: []string{"/" + common2.CleanHost("mcp.com"), common2.CreateConvertedName(constants.IstioIngressGatewayName, common2.CleanHost("mcp.com"))},
Hosts: []string{"mcp.com"},
Http: []*v1alpha3.HTTPRoute{
{
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, strings.TrimSuffix(dataId, ".json")),
Match: []*v1alpha3.HTTPMatchRequest{
{
Uri: &v1alpha3.StringMatch{
MatchType: &v1alpha3.StringMatch_Exact{
Exact: "/mcp-servers/explore",
},
},
},
{
Uri: &v1alpha3.StringMatch{
MatchType: &v1alpha3.StringMatch_Prefix{
Prefix: "/mcp-servers/explore/",
},
},
},
},
Route: []*v1alpha3.HTTPRouteDestination{
{
Destination: &v1alpha3.Destination{
Host: "explore.DEFAULT-GROUP.public.nacos",
},
},
},
Rewrite: &v1alpha3.HTTPRewrite{
Authority: "example.com",
},
},
},
},
},
gvk.DestinationRule.String(): &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.DestinationRule,
Name: fmt.Sprintf("%s-%s", provider.IstioMcpAutoGeneratedDrName, strings.TrimSuffix(dataId, ".json")),
},
Spec: &v1alpha3.DestinationRule{
Host: "explore.DEFAULT-GROUP.public.nacos",
TrafficPolicy: &v1alpha3.TrafficPolicy{
Tls: &v1alpha3.ClientTLSSettings{
Mode: v1alpha3.ClientTLSSettings_SIMPLE,
},
},
},
},
},
},
}
for _, tc := range testCase {
t.Run(tc.name, func(t *testing.T) {
localCache := testCallback(tc.msc)
se := localCache.GetAllConfigs(gvk.ServiceEntry)[dataId]
wantSe := tc.wantConfig[gvk.ServiceEntry.String()]
if !reflect.DeepEqual(se, wantSe) {
t.Errorf("se is not equal, want %v\n, got %v", wantSe, se)
}
vs := localCache.GetAllConfigs(gvk.VirtualService)[dataId]
wantVs := tc.wantConfig[gvk.VirtualService.String()]
if !reflect.DeepEqual(vs, wantVs) {
t.Errorf("vs is not equal, want %v\n, got %v", wantVs, vs)
}
dr := localCache.GetAllConfigs(gvk.DestinationRule)[dataId]
wantDr := tc.wantConfig[gvk.DestinationRule.String()]
if !reflect.DeepEqual(dr, wantDr) {
t.Errorf("dr is not equal, want %v\n, got %v", wantDr, dr)
}
})
}
}

View File

@@ -16,11 +16,14 @@ package v2
import ( import (
"errors" "errors"
"fmt"
"net"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/nacos-group/nacos-sdk-go/v2/clients" "github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client" "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant" "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
@@ -32,9 +35,11 @@ import (
apiv1 "github.com/alibaba/higress/api/networking/v1" apiv1 "github.com/alibaba/higress/api/networking/v1"
"github.com/alibaba/higress/pkg/common" "github.com/alibaba/higress/pkg/common"
"github.com/alibaba/higress/registry"
provider "github.com/alibaba/higress/registry" provider "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/memory" "github.com/alibaba/higress/registry/memory"
"github.com/alibaba/higress/registry/nacos/address" "github.com/alibaba/higress/registry/nacos/address"
"github.com/alibaba/higress/registry/nacos/mcpserver"
) )
const ( const (
@@ -68,6 +73,9 @@ type watcher struct {
updateCacheWhenEmpty bool updateCacheWhenEmpty bool
nacosClientConfig *constant.ClientConfig nacosClientConfig *constant.ClientConfig
authOption provider.AuthOption authOption provider.AuthOption
namespace string
clusterId string
mcpWatcher provider.Watcher
} }
type WatcherOption func(w *watcher) type WatcherOption func(w *watcher)
@@ -88,6 +96,45 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
opt(w) opt(w)
} }
if w.EnableMCPServer != nil && w.EnableMCPServer.GetValue() {
if w.Type != string(registry.Nacos3) {
log.Errorf("can not create mcpWatcher for nacos 2.x type, required nacos 3.x")
} else {
mcpWatcher, err := mcpserver.NewWatcher(
cache,
mcpserver.WithType(w.Type),
mcpserver.WithName(w.Name),
mcpserver.WithNacosAddressServer(w.NacosAddressServer),
mcpserver.WithDomain(w.Domain),
mcpserver.WithPort(w.Port),
mcpserver.WithNacosNamespaceId(w.NacosNamespaceId),
mcpserver.WithNacosNamespace(w.NacosNamespace),
mcpserver.WithNacosGroups(w.NacosGroups),
mcpserver.WithNacosAccessKey(w.NacosAccessKey),
mcpserver.WithNacosSecretKey(w.NacosSecretKey),
mcpserver.WithNacosRefreshInterval(w.NacosRefreshInterval),
mcpserver.WithMcpExportDomains(w.McpServerExportDomains),
mcpserver.WithMcpBaseUrl(w.McpServerBaseUrl),
mcpserver.WithEnableMcpServer(w.EnableMCPServer),
mcpserver.WithClusterId(w.clusterId),
mcpserver.WithNamespace(w.namespace),
mcpserver.WithAuthOption(w.authOption),
)
if err != nil {
return nil, fmt.Errorf("can not create mcp server watcher, err:%v", err)
}
var once sync.Once
mcpWatcher.ReadyHandler(func(ready bool) {
once.Do(func() {
if ready {
log.Infof("Registry mcp Watcher is ready, type:%s, name:%s", w.Type, w.Name)
}
})
})
w.mcpWatcher = mcpWatcher
}
}
if w.NacosNamespace == "" { if w.NacosNamespace == "" {
w.NacosNamespace = w.NacosNamespaceId w.NacosNamespace = w.NacosNamespaceId
} }
@@ -233,16 +280,52 @@ func WithAuthOption(authOption provider.AuthOption) WatcherOption {
} }
} }
func WithMcpExportDomains(exportDomains []string) WatcherOption {
return func(w *watcher) {
w.McpServerExportDomains = exportDomains
}
}
func WithMcpBaseUrl(url string) WatcherOption {
return func(w *watcher) {
w.McpServerBaseUrl = url
}
}
func WithEnableMcpServer(enable *wrappers.BoolValue) WatcherOption {
return func(w *watcher) {
w.EnableMCPServer = enable
}
}
func WithNamespace(ns string) WatcherOption {
return func(w *watcher) {
w.namespace = ns
}
}
func WithClusterId(id string) WatcherOption {
return func(w *watcher) {
w.clusterId = id
}
}
func (w *watcher) Run() { func (w *watcher) Run() {
ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval)) ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval))
defer ticker.Stop() defer ticker.Stop()
w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10)) w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10))
if w.mcpWatcher != nil {
w.mcpWatcher.AppendServiceUpdateHandler(w.UpdateService)
go w.mcpWatcher.Run()
}
err := w.fetchAllServices() err := w.fetchAllServices()
if err != nil { if err != nil {
log.Errorf("first fetch services failed, err:%v", err) log.Errorf("first fetch services failed, err:%v", err)
} else { } else {
if w.mcpWatcherReady() {
w.Ready(true) w.Ready(true)
} }
}
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
@@ -250,14 +333,20 @@ func (w *watcher) Run() {
if err != nil { if err != nil {
log.Errorf("fetch services failed, err:%v", err) log.Errorf("fetch services failed, err:%v", err)
} else { } else {
if w.mcpWatcherReady() {
w.Ready(true) w.Ready(true)
} }
}
case <-w.stop: case <-w.stop:
return return
} }
} }
} }
func (w *watcher) mcpWatcherReady() bool {
return w.mcpWatcher == nil || w.mcpWatcher.IsReady()
}
func (w *watcher) updateNacosClient() { func (w *watcher) updateNacosClient() {
for { for {
select { select {
@@ -438,6 +527,7 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun
func (w *watcher) generateServiceEntry(host string, services []model.Instance) *v1alpha3.ServiceEntry { func (w *watcher) generateServiceEntry(host string, services []model.Instance) *v1alpha3.ServiceEntry {
portList := make([]*v1alpha3.ServicePort, 0) portList := make([]*v1alpha3.ServicePort, 0)
endpoints := make([]*v1alpha3.WorkloadEntry, 0) endpoints := make([]*v1alpha3.WorkloadEntry, 0)
isDnsService := false
for _, service := range services { for _, service := range services {
protocol := common.HTTP protocol := common.HTTP
@@ -452,6 +542,9 @@ func (w *watcher) generateServiceEntry(host string, services []model.Instance) *
if len(portList) == 0 { if len(portList) == 0 {
portList = append(portList, port) portList = append(portList, port)
} }
if !isValidIP(service.Ip) {
isDnsService = true
}
endpoint := &v1alpha3.WorkloadEntry{ endpoint := &v1alpha3.WorkloadEntry{
Address: service.Ip, Address: service.Ip,
Ports: map[string]uint32{port.Protocol: port.Number}, Ports: map[string]uint32{port.Protocol: port.Number},
@@ -460,11 +553,15 @@ func (w *watcher) generateServiceEntry(host string, services []model.Instance) *
endpoints = append(endpoints, endpoint) endpoints = append(endpoints, endpoint)
} }
resolution := v1alpha3.ServiceEntry_STATIC
if isDnsService {
resolution = v1alpha3.ServiceEntry_DNS
}
se := &v1alpha3.ServiceEntry{ se := &v1alpha3.ServiceEntry{
Hosts: []string{host}, Hosts: []string{host},
Ports: portList, Ports: portList,
Location: v1alpha3.ServiceEntry_MESH_INTERNAL, Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: v1alpha3.ServiceEntry_STATIC, Resolution: resolution,
Endpoints: endpoints, Endpoints: endpoints,
} }
@@ -477,6 +574,9 @@ func (w *watcher) Stop() {
if w.addrProvider != nil { if w.addrProvider != nil {
w.addrProvider.Stop() w.addrProvider.Stop()
} }
if w.mcpWatcher != nil {
w.mcpWatcher.Stop()
}
for key := range w.WatchingServices { for key := range w.WatchingServices {
s := strings.Split(key, DefaultJoiner) s := strings.Split(key, DefaultJoiner)
err := w.unsubscribe(s[0], s[1]) err := w.unsubscribe(s[0], s[1])
@@ -523,3 +623,8 @@ func shouldSubscribe(serviceName string) bool {
return true return true
} }
func isValidIP(ipStr string) bool {
ip := net.ParseIP(ipStr)
return ip != nil
}

View File

@@ -36,7 +36,6 @@ import (
"github.com/alibaba/higress/registry/eureka" "github.com/alibaba/higress/registry/eureka"
"github.com/alibaba/higress/registry/memory" "github.com/alibaba/higress/registry/memory"
"github.com/alibaba/higress/registry/nacos" "github.com/alibaba/higress/registry/nacos"
"github.com/alibaba/higress/registry/nacos/mcpserver"
nacosv2 "github.com/alibaba/higress/registry/nacos/v2" nacosv2 "github.com/alibaba/higress/registry/nacos/v2"
"github.com/alibaba/higress/registry/zookeeper" "github.com/alibaba/higress/registry/zookeeper"
) )
@@ -171,7 +170,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
nacos.WithNacosRefreshInterval(registry.NacosRefreshInterval), nacos.WithNacosRefreshInterval(registry.NacosRefreshInterval),
nacos.WithAuthOption(authOption), nacos.WithAuthOption(authOption),
) )
case string(Nacos2): case string(Nacos2), string(Nacos3):
watcher, err = nacosv2.NewWatcher( watcher, err = nacosv2.NewWatcher(
r.Cache, r.Cache,
nacosv2.WithType(registry.Type), nacosv2.WithType(registry.Type),
@@ -185,44 +184,13 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
nacosv2.WithNacosNamespace(registry.NacosNamespace), nacosv2.WithNacosNamespace(registry.NacosNamespace),
nacosv2.WithNacosGroups(registry.NacosGroups), nacosv2.WithNacosGroups(registry.NacosGroups),
nacosv2.WithNacosRefreshInterval(registry.NacosRefreshInterval), nacosv2.WithNacosRefreshInterval(registry.NacosRefreshInterval),
nacosv2.WithMcpExportDomains(registry.McpServerExportDomains),
nacosv2.WithMcpBaseUrl(registry.McpServerBaseUrl),
nacosv2.WithEnableMcpServer(registry.EnableMCPServer),
nacosv2.WithClusterId(r.clusterId),
nacosv2.WithNamespace(r.namespace),
nacosv2.WithAuthOption(authOption), nacosv2.WithAuthOption(authOption),
) )
case string(Nacos3):
if registry.EnableMCPServer.GetValue() {
watcher, err = mcpserver.NewWatcher(
r.Cache,
mcpserver.WithType(registry.Type),
mcpserver.WithName(registry.Name),
mcpserver.WithNacosAddressServer(registry.NacosAddressServer),
mcpserver.WithDomain(registry.Domain),
mcpserver.WithPort(registry.Port),
mcpserver.WithNacosAccessKey(registry.NacosAccessKey),
mcpserver.WithNacosSecretKey(registry.NacosSecretKey),
mcpserver.WithNacosRefreshInterval(registry.NacosRefreshInterval),
mcpserver.WithMcpExportDomains(registry.McpServerExportDomains),
mcpserver.WithMcpBaseUrl(registry.McpServerBaseUrl),
mcpserver.WithEnableMcpServer(registry.EnableMCPServer),
mcpserver.WithClusterId(r.clusterId),
mcpserver.WithNamespace(r.namespace),
mcpserver.WithAuthOption(authOption),
)
} else {
watcher, err = nacosv2.NewWatcher(
r.Cache,
nacosv2.WithType(registry.Type),
nacosv2.WithName(registry.Name),
nacosv2.WithNacosAddressServer(registry.NacosAddressServer),
nacosv2.WithDomain(registry.Domain),
nacosv2.WithPort(registry.Port),
nacosv2.WithNacosAccessKey(registry.NacosAccessKey),
nacosv2.WithNacosSecretKey(registry.NacosSecretKey),
nacosv2.WithNacosNamespaceId(registry.NacosNamespaceId),
nacosv2.WithNacosNamespace(registry.NacosNamespace),
nacosv2.WithNacosGroups(registry.NacosGroups),
nacosv2.WithNacosRefreshInterval(registry.NacosRefreshInterval),
nacosv2.WithAuthOption(authOption),
)
}
case string(Zookeeper): case string(Zookeeper):
watcher, err = zookeeper.NewWatcher( watcher, err = zookeeper.NewWatcher(
r.Cache, r.Cache,