diff --git a/plugins/golang-filter/mcp-server/config.go b/plugins/golang-filter/mcp-server/config.go index 03767e93a..bea54bd0b 100644 --- a/plugins/golang-filter/mcp-server/config.go +++ b/plugins/golang-filter/mcp-server/config.go @@ -7,6 +7,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" "github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal" + _ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/registry/nacos" _ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/gorm" "github.com/envoyproxy/envoy/contrib/golang/common/go/api" envoyHttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http" diff --git a/plugins/golang-filter/mcp-server/go.mod b/plugins/golang-filter/mcp-server/go.mod index b6ef3ad71..e12f1a956 100644 --- a/plugins/golang-filter/mcp-server/go.mod +++ b/plugins/golang-filter/mcp-server/go.mod @@ -36,6 +36,7 @@ require ( github.com/jinzhu/now v1.1.5 // indirect github.com/klauspost/compress v1.17.8 // indirect github.com/mattn/go-sqlite3 v1.14.22 // indirect + github.com/nacos-group/nacos-sdk-go/v2 v2.2.9 github.com/paulmach/orb v0.11.1 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/plugins/golang-filter/mcp-server/registry/nacos/nacos.go b/plugins/golang-filter/mcp-server/registry/nacos/nacos.go new file mode 100644 index 000000000..ebb2b28df --- /dev/null +++ b/plugins/golang-filter/mcp-server/registry/nacos/nacos.go @@ -0,0 +1,243 @@ +package nacos + +import ( + "encoding/json" + "fmt" + "regexp" + + "github.com/alibaba/higress/plugins/golang-filter/mcp-server/registry" + "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + "github.com/nacos-group/nacos-sdk-go/v2/clients/config_client" + "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client" + "github.com/nacos-group/nacos-sdk-go/v2/model" + "github.com/nacos-group/nacos-sdk-go/v2/vo" +) + +type NacosMcpRegsitry struct { + serviceMatcher map[string]string + configClient config_client.IConfigClient + namingClient naming_client.INamingClient + toolsDescription map[string]*registry.ToolDescription + toolsRpcContext map[string]*registry.RpcContext + toolChangeEventListeners []registry.ToolChangeEventListener + currentServiceSet map[string]bool +} + +const DEFAULT_SERVICE_LIST_MAX_PGSIZXE = 10000 +const MCP_TOOL_SUBFIX = "-mcp-tools.json" + +func (n *NacosMcpRegsitry) ListToolsDesciption() []*registry.ToolDescription { + if n.toolsDescription == nil { + n.refreshToolsList() + } + + result := []*registry.ToolDescription{} + for _, tool := range n.toolsDescription { + result = append(result, tool) + } + return result +} + +func (n *NacosMcpRegsitry) GetToolRpcContext(toolName string) (*registry.RpcContext, bool) { + tool, ok := n.toolsRpcContext[toolName] + return tool, ok +} + +func (n *NacosMcpRegsitry) RegisterToolChangeEventListener(listener registry.ToolChangeEventListener) { + n.toolChangeEventListeners = append(n.toolChangeEventListeners, listener) +} + +func (n *NacosMcpRegsitry) refreshToolsList() bool { + changed := false + for group, serviceMatcher := range n.serviceMatcher { + if n.refreshToolsListForGroup(group, serviceMatcher) { + changed = true + } + } + return changed +} + +func (n *NacosMcpRegsitry) refreshToolsListForGroup(group string, serviceMatcher string) bool { + services, err := n.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{ + GroupName: group, + PageNo: 1, + PageSize: DEFAULT_SERVICE_LIST_MAX_PGSIZXE, + }) + + if err != nil { + api.LogError(fmt.Sprintf("Get service list error when refresh tools list for group %s, error %s", group, err)) + return false + } + + changed := false + serviceList := services.Doms + pattern, err := regexp.Compile(serviceMatcher) + if err != nil { + api.LogErrorf("Match service error for patter %s", serviceMatcher) + return false + } + for _, service := range serviceList { + if !pattern.MatchString(service) { + continue + } + + if _, ok := n.currentServiceSet[group+service]; !ok { + changed = true + n.refreshToolsListForService(group, service) + n.listenToService(group, service) + } + } + return changed +} + +func (n *NacosMcpRegsitry) refreshToolsListForServiceWithContent(group string, service string, newConfig *string, instances *[]model.Instance) { + + if newConfig == nil { + dataId := makeToolsConfigId(service) + content, err := n.configClient.GetConfig(vo.ConfigParam{ + DataId: dataId, + Group: group, + }) + + if err != nil { + api.LogError(fmt.Sprintf("Get tools config for sercice %s:%s error %s", group, service, err)) + return + } + + newConfig = &content + } + + if instances == nil { + instancesFromNacos, err := n.namingClient.SelectInstances(vo.SelectInstancesParam{ + ServiceName: service, + GroupName: group, + HealthyOnly: true, + }) + + if err != nil { + api.LogError(fmt.Sprintf("List instance for sercice %s:%s error %s", group, service, err)) + return + } + + instances = &instancesFromNacos + } + + var applicationDescription registry.McpApplicationDescription + err := json.Unmarshal([]byte(*newConfig), &applicationDescription) + if err != nil { + api.LogError(fmt.Sprintf("Parse tools config for sercice %s:%s error, config is %s, error is %s", group, service, *newConfig, err)) + return + } + + wrappedInstances := []registry.Instance{} + for _, instance := range *instances { + wrappedInstance := registry.Instance{ + Host: instance.Ip, + Port: instance.Port, + Meta: instance.Metadata, + } + wrappedInstances = append(wrappedInstances, wrappedInstance) + } + + if n.toolsDescription == nil { + n.toolsDescription = map[string]*registry.ToolDescription{} + } + + if n.toolsRpcContext == nil { + n.toolsRpcContext = map[string]*registry.RpcContext{} + } + + for _, tool := range applicationDescription.ToolsDescription { + meta := applicationDescription.ToolsMeta[tool.Name] + + var cred *registry.CredentialInfo + credentialRef := meta.CredentialRef + if credentialRef != nil { + cred = n.GetCredential(*credentialRef, group) + } + + context := registry.RpcContext{ + ToolMeta: meta, + Instances: &wrappedInstances, + Protocol: applicationDescription.Protocol, + Credential: cred, + } + + tool.Name = makeToolName(group, service, tool.Name) + n.toolsDescription[tool.Name] = tool + n.toolsRpcContext[tool.Name] = &context + } + n.currentServiceSet[group+service] = true + api.LogInfo(fmt.Sprintf("Refresh tools list for service success %s:%s", group, service)) +} + +func (n *NacosMcpRegsitry) GetCredential(name string, group string) *registry.CredentialInfo { + dataId := makeCredentialDataId(name) + content, err := n.configClient.GetConfig(vo.ConfigParam{ + DataId: dataId, + Group: group, + }) + + if err != nil { + api.LogError(fmt.Sprintf("Get credentials for %s:%s error %s", group, dataId, err)) + return nil + } + + var credential registry.CredentialInfo + err = json.Unmarshal([]byte(content), &credential) + if err != nil { + api.LogError(fmt.Sprintf("Parse credentials for %s:%s error %s", group, dataId, err)) + return nil + } + + return &credential +} + +func (n *NacosMcpRegsitry) refreshToolsListForService(group string, service string) { + n.refreshToolsListForServiceWithContent(group, service, nil, nil) +} + +func (n *NacosMcpRegsitry) listenToService(group string, service string) { + + // config changed, tools description may be changed + err := n.configClient.ListenConfig(vo.ConfigParam{ + DataId: makeToolsConfigId(service), + Group: group, + OnChange: func(namespace, group, dataId, data string) { + n.refreshToolsListForServiceWithContent(group, service, &data, nil) + for _, listener := range n.toolChangeEventListeners { + listener.OnToolChanged(n) + } + }, + }) + + if err != nil { + api.LogError(fmt.Sprintf("Listen to service's tool config error %s", err)) + } + + err = n.namingClient.Subscribe(&vo.SubscribeParam{ + ServiceName: service, + GroupName: group, + SubscribeCallback: func(services []model.Instance, err error) { + n.refreshToolsListForServiceWithContent(group, service, nil, &services) + for _, listener := range n.toolChangeEventListeners { + listener.OnToolChanged(n) + } + }, + }) + if err != nil { + api.LogError(fmt.Sprintf("Listen to service's tool instance list error %s", err)) + } +} + +func makeToolName(group string, service string, toolName string) string { + return fmt.Sprintf("%s_%s_%s", group, service, toolName) +} + +func makeToolsConfigId(service string) string { + return service + MCP_TOOL_SUBFIX +} + +func makeCredentialDataId(credentialName string) string { + return credentialName +} diff --git a/plugins/golang-filter/mcp-server/registry/nacos/server.go b/plugins/golang-filter/mcp-server/registry/nacos/server.go new file mode 100644 index 000000000..a5bec9eda --- /dev/null +++ b/plugins/golang-filter/mcp-server/registry/nacos/server.go @@ -0,0 +1,170 @@ +package nacos + +import ( + "errors" + "fmt" + "time" + + "github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal" + "github.com/alibaba/higress/plugins/golang-filter/mcp-server/registry" + "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + "github.com/mark3labs/mcp-go/mcp" + "github.com/nacos-group/nacos-sdk-go/v2/clients" + "github.com/nacos-group/nacos-sdk-go/v2/common/constant" + "github.com/nacos-group/nacos-sdk-go/v2/vo" +) + +func init() { + internal.GlobalRegistry.RegisterServer("nacos-mcp-registry", &NacosConfig{}) +} + +type NacosConfig struct { + ServerAddr *string + Ak *string + Sk *string + Namespace *string + RegionId *string + ServiceMatcher *map[string]string +} + +type McpServerToolsChangeListener struct { + mcpServer *internal.MCPServer +} + +func (l *McpServerToolsChangeListener) OnToolChanged(reg registry.McpServerRegistry) { + resetToolsToMcpServer(l.mcpServer, reg) +} + +func CreateNacosMcpRegsitry(config *NacosConfig) (*NacosMcpRegsitry, error) { + sc := []constant.ServerConfig{ + *constant.NewServerConfig(*config.ServerAddr, 8848, constant.WithContextPath("/nacos")), + } + + //create ClientConfig + cc := *constant.NewClientConfig( + constant.WithTimeoutMs(5000), + constant.WithNotLoadCacheAtStart(true), + constant.WithOpenKMS(true), + ) + + if config.Namespace != nil { + cc.NamespaceId = *config.Namespace + } + + if config.RegionId != nil { + cc.RegionId = *config.RegionId + } + + if config.Ak != nil { + cc.AccessKey = *config.Ak + } + + if config.Sk != nil { + cc.SecretKey = *config.Sk + } + + // create config client + configClient, err := clients.NewConfigClient( + vo.NacosClientParam{ + ClientConfig: &cc, + ServerConfigs: sc, + }, + ) + + if err != nil { + return nil, fmt.Errorf("failed to initial nacos config client: %w", err) + } + + namingClient, err := clients.NewNamingClient( + vo.NacosClientParam{ + ClientConfig: &cc, + ServerConfigs: sc, + }, + ) + + if err != nil { + return nil, fmt.Errorf("failed to initial naming config client: %w", err) + } + + return &NacosMcpRegsitry{ + configClient: configClient, + namingClient: namingClient, + serviceMatcher: *config.ServiceMatcher, + toolChangeEventListeners: []registry.ToolChangeEventListener{}, + currentServiceSet: map[string]bool{}, + }, nil +} + +func (c *NacosConfig) ParseConfig(config map[string]any) error { + + serverAddr, ok := config["serverAddr"].(string) + if !ok { + return errors.New("missing serverAddr") + } + c.ServerAddr = &serverAddr + + serviceMatcher, ok := config["serviceMatcher"].(map[string]any) + if !ok { + return errors.New("missing serviceMatcher") + } + + matchers := map[string]string{} + for key, value := range serviceMatcher { + matchers[key] = value.(string) + } + + c.ServiceMatcher = &matchers + + if ak, ok := config["accessKey"].(string); ok { + c.Ak = &ak + } + + if sk, ok := config["secretKey"].(string); ok { + c.Sk = &sk + } + + if region, ok := config["regionId"].(string); ok { + c.RegionId = ®ion + } + return nil +} + +func (c *NacosConfig) NewServer(serverName string) (*internal.MCPServer, error) { + mcpServer := internal.NewMCPServer( + serverName, + "1.0.0", + ) + + nacosRegistry, err := CreateNacosMcpRegsitry(c) + if err != nil { + return nil, fmt.Errorf("failed to initialize NacosMcpRegistry: %w", err) + } + + listener := McpServerToolsChangeListener{ + mcpServer: mcpServer, + } + nacosRegistry.RegisterToolChangeEventListener(&listener) + + go func() { + for { + if nacosRegistry.refreshToolsList() { + resetToolsToMcpServer(mcpServer, nacosRegistry) + } + time.Sleep(time.Second * 10) + } + }() + return mcpServer, nil +} + +func resetToolsToMcpServer(mcpServer *internal.MCPServer, reg registry.McpServerRegistry) { + wrappedTools := []internal.ServerTool{} + tools := reg.ListToolsDesciption() + for _, tool := range tools { + wrappedTools = append(wrappedTools, internal.ServerTool{ + Tool: mcp.NewToolWithRawSchema(tool.Name, tool.Description, tool.InputSchema), + Handler: registry.HandleRegistryToolsCall(reg), + }) + } + mcpServer.SetTools(wrappedTools...) + api.LogInfo("Config changed reset tools") +} diff --git a/plugins/golang-filter/mcp-server/registry/registry.go b/plugins/golang-filter/mcp-server/registry/registry.go new file mode 100644 index 000000000..acb208ae4 --- /dev/null +++ b/plugins/golang-filter/mcp-server/registry/registry.go @@ -0,0 +1,64 @@ +package registry + +import ( + "encoding/json" + + "github.com/mark3labs/mcp-go/mcp" +) + +type McpApplicationDescription struct { + Protocol string `json:"protocol"` + ToolsDescription []*ToolDescription `json:"tools"` + ToolsMeta map[string]ToolMeta `json:"toolsMeta"` +} + +type ToolMeta struct { + InvokeContext map[string]string `json:"invokeContext"` + ParametersMapping map[string]ParameterMapInfo `json:"parametersMapping"` + CredentialRef *string `json:"credentialRef"` +} + +type ParameterMapInfo struct { + ParamName string `json:"name"` + BackendName string `json:"backendName"` + ParamType string `json:"type"` + Position string `json:"position"` +} + +type ToolDescription struct { + Name string `json:"name"` + Description string `json:"description"` + InputSchema json.RawMessage `json:"inputSchema"` +} + +type ToolChangeEventListener interface { + OnToolChanged(McpServerRegistry) +} + +type McpServerRegistry interface { + ListToolsDesciption() []*ToolDescription + GetToolRpcContext(toolname string) (*RpcContext, bool) + RegisterToolChangeEventListener(listener ToolChangeEventListener) +} + +type RpcContext struct { + Instances *[]Instance + ToolMeta ToolMeta + Protocol string + Credential *CredentialInfo +} + +type CredentialInfo struct { + CredentialType string `json:"type"` + Credentials map[string]any `json:"credentialsMap"` +} + +type Instance struct { + Host string + Port uint64 + Meta map[string]string +} + +type RemoteCallHandle interface { + HandleToolCall(ctx *RpcContext, parameters map[string]any) (*mcp.CallToolResult, error) +} diff --git a/plugins/golang-filter/mcp-server/registry/remote.go b/plugins/golang-filter/mcp-server/registry/remote.go new file mode 100644 index 000000000..34f871a29 --- /dev/null +++ b/plugins/golang-filter/mcp-server/registry/remote.go @@ -0,0 +1,200 @@ +package registry + +import ( + "context" + "fmt" + "io" + "math/rand" + "net/http" + "net/url" + "strings" + + "github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal" + "github.com/mark3labs/mcp-go/mcp" +) + +const HTTP_URL_TEMPLATE = "%s://%s:%d%s" +const FIX_QUERY_TOKEN_KEY = "key" +const FIX_QUERY_TOKEN_VALUE = "value" +const PROTOCOL_HTTP = "http" +const PROTOCOL_HTTPS = "https" +const DEFAULT_HTTP_METHOD = "GET" +const DEFAULT_HTTP_PATH = "/" + +func getHttpCredentialHandle(name string) (func(*CredentialInfo, *HttpRemoteCallHandle), error) { + if name == "fixed-query-token" { + return FixedQueryToken, nil + } + + return nil, fmt.Errorf("Unknown credential type") +} + +type CommonRemoteCallHandle struct { + Instance *Instance +} + +type HttpRemoteCallHandle struct { + CommonRemoteCallHandle + Protocol string + Headers http.Header + Body *string + Query map[string]string + Path string + Method string +} + +// http credentials handles +func FixedQueryToken(cred *CredentialInfo, h *HttpRemoteCallHandle) { + key, _ := cred.Credentials[FIX_QUERY_TOKEN_KEY] + value, _ := cred.Credentials[FIX_QUERY_TOKEN_VALUE] + h.Query[key.(string)] = value.(string) +} + +func newHttpRemoteCallHandle(ctx *RpcContext) *HttpRemoteCallHandle { + instance := selectOneInstance(ctx) + method, ok := ctx.ToolMeta.InvokeContext["method"] + if !ok { + method = DEFAULT_HTTP_METHOD + } + + path, ok := ctx.ToolMeta.InvokeContext["path"] + if !ok { + path = DEFAULT_HTTP_PATH + } + + return &HttpRemoteCallHandle{ + CommonRemoteCallHandle: CommonRemoteCallHandle{ + Instance: &instance, + }, + Protocol: ctx.Protocol, + Headers: http.Header{}, + Body: nil, + Query: map[string]string{}, + Path: path, + Method: method, + } +} + +// http remote handle implementation +func (h *HttpRemoteCallHandle) HandleToolCall(ctx *RpcContext, parameters map[string]any) (*mcp.CallToolResult, error) { + if ctx.Credential != nil { + credentialHandle, err := getHttpCredentialHandle(ctx.Credential.CredentialType) + if err != nil { + return nil, err + } + credentialHandle(ctx.Credential, h) + } + + err := h.handleParamMapping(&ctx.ToolMeta.ParametersMapping, parameters) + if err != nil { + return nil, err + } + + response, err := h.doHttpCall() + if err != nil { + return nil, err + } + body, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + responseType := "text" + if respType, ok := ctx.ToolMeta.InvokeContext["responseType"]; ok { + responseType = respType + } + + return &mcp.CallToolResult{ + Content: []mcp.Content{ + mcp.TextContent{ + Type: responseType, + Text: string(body), + }, + }, + }, nil +} + +func (h *HttpRemoteCallHandle) handleParamMapping(mapInfo *map[string]ParameterMapInfo, params map[string]any) error { + paramMapInfo := *mapInfo + for param, value := range params { + if info, ok := paramMapInfo[param]; ok { + if info.Position == "Query" { + h.Query[info.BackendName] = fmt.Sprintf("%s", value) + } else if info.Position == "Header" { + h.Headers[info.BackendName] = []string{fmt.Sprintf("%s", value)} + } else { + return fmt.Errorf("Unsupport position for args %s, pos is %s", param, info.Position) + } + } else { + h.Query[param] = fmt.Sprintf("%s", value) + } + } + return nil +} + +func (h *HttpRemoteCallHandle) doHttpCall() (*http.Response, error) { + pathPrefix := fmt.Sprintf(HTTP_URL_TEMPLATE, h.Protocol, h.Instance.Host, h.Instance.Port, h.Path) + queryString := "" + queryGroup := []string{} + for queryKey, queryValue := range h.Query { + queryGroup = append(queryGroup, url.QueryEscape(queryKey)+"="+url.QueryEscape(queryValue)) + } + + if len(queryGroup) > 0 { + queryString = "?" + strings.Join(queryGroup, "&") + } + fullUrl, err := url.Parse(pathPrefix + queryString) + if err != nil { + return nil, fmt.Errorf("Parse url error , url is %s", pathPrefix+queryString) + } + request := http.Request{ + URL: fullUrl, + Method: h.Method, + Header: h.Headers, + } + + if h.Body != nil { + request.Body = io.NopCloser(strings.NewReader(*h.Body)) + } + + return http.DefaultClient.Do(&request) +} + +func selectOneInstance(ctx *RpcContext) Instance { + instanceId := 0 + instances := *ctx.Instances + if len(instances) != 1 { + instanceId = rand.Intn(len(instances) - 1) + } + return instances[instanceId] +} + +func getRemoteCallhandle(ctx *RpcContext) RemoteCallHandle { + if ctx.Protocol == PROTOCOL_HTTP || ctx.Protocol == PROTOCOL_HTTPS { + return newHttpRemoteCallHandle(ctx) + } else { + return nil + } +} + +// common remote call process +func CommonRemoteCall(reg McpServerRegistry, toolName string, parameters map[string]any) (*mcp.CallToolResult, error) { + ctx, ok := reg.GetToolRpcContext(toolName) + if !ok { + return nil, fmt.Errorf("Unknown tool %s", toolName) + } + + remoteHandle := getRemoteCallhandle(ctx) + if remoteHandle == nil { + return nil, fmt.Errorf("Unknown backend protocol %s", ctx.Protocol) + } + + return remoteHandle.HandleToolCall(ctx, parameters) +} + +func HandleRegistryToolsCall(reg McpServerRegistry) internal.ToolHandlerFunc { + return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + arguments := request.Params.Arguments + return CommonRemoteCall(reg, request.Params.Name, arguments) + } +}