feat: support nacos mcp registry (#1961)

This commit is contained in:
Xin Luo
2025-03-27 09:41:22 +08:00
committed by GitHub
parent 1965d107d0
commit d3887835a3
6 changed files with 679 additions and 0 deletions

View File

@@ -7,6 +7,7 @@ import (
"google.golang.org/protobuf/types/known/anypb"
"github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal"
_ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/registry/nacos"
_ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/gorm"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
envoyHttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"

View File

@@ -36,6 +36,7 @@ require (
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/nacos-group/nacos-sdk-go/v2 v2.2.9
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect

View File

@@ -0,0 +1,243 @@
package nacos
import (
"encoding/json"
"fmt"
"regexp"
"github.com/alibaba/higress/plugins/golang-filter/mcp-server/registry"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
)
type NacosMcpRegsitry struct {
serviceMatcher map[string]string
configClient config_client.IConfigClient
namingClient naming_client.INamingClient
toolsDescription map[string]*registry.ToolDescription
toolsRpcContext map[string]*registry.RpcContext
toolChangeEventListeners []registry.ToolChangeEventListener
currentServiceSet map[string]bool
}
const DEFAULT_SERVICE_LIST_MAX_PGSIZXE = 10000
const MCP_TOOL_SUBFIX = "-mcp-tools.json"
func (n *NacosMcpRegsitry) ListToolsDesciption() []*registry.ToolDescription {
if n.toolsDescription == nil {
n.refreshToolsList()
}
result := []*registry.ToolDescription{}
for _, tool := range n.toolsDescription {
result = append(result, tool)
}
return result
}
func (n *NacosMcpRegsitry) GetToolRpcContext(toolName string) (*registry.RpcContext, bool) {
tool, ok := n.toolsRpcContext[toolName]
return tool, ok
}
func (n *NacosMcpRegsitry) RegisterToolChangeEventListener(listener registry.ToolChangeEventListener) {
n.toolChangeEventListeners = append(n.toolChangeEventListeners, listener)
}
func (n *NacosMcpRegsitry) refreshToolsList() bool {
changed := false
for group, serviceMatcher := range n.serviceMatcher {
if n.refreshToolsListForGroup(group, serviceMatcher) {
changed = true
}
}
return changed
}
func (n *NacosMcpRegsitry) refreshToolsListForGroup(group string, serviceMatcher string) bool {
services, err := n.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{
GroupName: group,
PageNo: 1,
PageSize: DEFAULT_SERVICE_LIST_MAX_PGSIZXE,
})
if err != nil {
api.LogError(fmt.Sprintf("Get service list error when refresh tools list for group %s, error %s", group, err))
return false
}
changed := false
serviceList := services.Doms
pattern, err := regexp.Compile(serviceMatcher)
if err != nil {
api.LogErrorf("Match service error for patter %s", serviceMatcher)
return false
}
for _, service := range serviceList {
if !pattern.MatchString(service) {
continue
}
if _, ok := n.currentServiceSet[group+service]; !ok {
changed = true
n.refreshToolsListForService(group, service)
n.listenToService(group, service)
}
}
return changed
}
func (n *NacosMcpRegsitry) refreshToolsListForServiceWithContent(group string, service string, newConfig *string, instances *[]model.Instance) {
if newConfig == nil {
dataId := makeToolsConfigId(service)
content, err := n.configClient.GetConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
})
if err != nil {
api.LogError(fmt.Sprintf("Get tools config for sercice %s:%s error %s", group, service, err))
return
}
newConfig = &content
}
if instances == nil {
instancesFromNacos, err := n.namingClient.SelectInstances(vo.SelectInstancesParam{
ServiceName: service,
GroupName: group,
HealthyOnly: true,
})
if err != nil {
api.LogError(fmt.Sprintf("List instance for sercice %s:%s error %s", group, service, err))
return
}
instances = &instancesFromNacos
}
var applicationDescription registry.McpApplicationDescription
err := json.Unmarshal([]byte(*newConfig), &applicationDescription)
if err != nil {
api.LogError(fmt.Sprintf("Parse tools config for sercice %s:%s error, config is %s, error is %s", group, service, *newConfig, err))
return
}
wrappedInstances := []registry.Instance{}
for _, instance := range *instances {
wrappedInstance := registry.Instance{
Host: instance.Ip,
Port: instance.Port,
Meta: instance.Metadata,
}
wrappedInstances = append(wrappedInstances, wrappedInstance)
}
if n.toolsDescription == nil {
n.toolsDescription = map[string]*registry.ToolDescription{}
}
if n.toolsRpcContext == nil {
n.toolsRpcContext = map[string]*registry.RpcContext{}
}
for _, tool := range applicationDescription.ToolsDescription {
meta := applicationDescription.ToolsMeta[tool.Name]
var cred *registry.CredentialInfo
credentialRef := meta.CredentialRef
if credentialRef != nil {
cred = n.GetCredential(*credentialRef, group)
}
context := registry.RpcContext{
ToolMeta: meta,
Instances: &wrappedInstances,
Protocol: applicationDescription.Protocol,
Credential: cred,
}
tool.Name = makeToolName(group, service, tool.Name)
n.toolsDescription[tool.Name] = tool
n.toolsRpcContext[tool.Name] = &context
}
n.currentServiceSet[group+service] = true
api.LogInfo(fmt.Sprintf("Refresh tools list for service success %s:%s", group, service))
}
func (n *NacosMcpRegsitry) GetCredential(name string, group string) *registry.CredentialInfo {
dataId := makeCredentialDataId(name)
content, err := n.configClient.GetConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
})
if err != nil {
api.LogError(fmt.Sprintf("Get credentials for %s:%s error %s", group, dataId, err))
return nil
}
var credential registry.CredentialInfo
err = json.Unmarshal([]byte(content), &credential)
if err != nil {
api.LogError(fmt.Sprintf("Parse credentials for %s:%s error %s", group, dataId, err))
return nil
}
return &credential
}
func (n *NacosMcpRegsitry) refreshToolsListForService(group string, service string) {
n.refreshToolsListForServiceWithContent(group, service, nil, nil)
}
func (n *NacosMcpRegsitry) listenToService(group string, service string) {
// config changed, tools description may be changed
err := n.configClient.ListenConfig(vo.ConfigParam{
DataId: makeToolsConfigId(service),
Group: group,
OnChange: func(namespace, group, dataId, data string) {
n.refreshToolsListForServiceWithContent(group, service, &data, nil)
for _, listener := range n.toolChangeEventListeners {
listener.OnToolChanged(n)
}
},
})
if err != nil {
api.LogError(fmt.Sprintf("Listen to service's tool config error %s", err))
}
err = n.namingClient.Subscribe(&vo.SubscribeParam{
ServiceName: service,
GroupName: group,
SubscribeCallback: func(services []model.Instance, err error) {
n.refreshToolsListForServiceWithContent(group, service, nil, &services)
for _, listener := range n.toolChangeEventListeners {
listener.OnToolChanged(n)
}
},
})
if err != nil {
api.LogError(fmt.Sprintf("Listen to service's tool instance list error %s", err))
}
}
func makeToolName(group string, service string, toolName string) string {
return fmt.Sprintf("%s_%s_%s", group, service, toolName)
}
func makeToolsConfigId(service string) string {
return service + MCP_TOOL_SUBFIX
}
func makeCredentialDataId(credentialName string) string {
return credentialName
}

View File

@@ -0,0 +1,170 @@
package nacos
import (
"errors"
"fmt"
"time"
"github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal"
"github.com/alibaba/higress/plugins/golang-filter/mcp-server/registry"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"github.com/mark3labs/mcp-go/mcp"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
)
func init() {
internal.GlobalRegistry.RegisterServer("nacos-mcp-registry", &NacosConfig{})
}
type NacosConfig struct {
ServerAddr *string
Ak *string
Sk *string
Namespace *string
RegionId *string
ServiceMatcher *map[string]string
}
type McpServerToolsChangeListener struct {
mcpServer *internal.MCPServer
}
func (l *McpServerToolsChangeListener) OnToolChanged(reg registry.McpServerRegistry) {
resetToolsToMcpServer(l.mcpServer, reg)
}
func CreateNacosMcpRegsitry(config *NacosConfig) (*NacosMcpRegsitry, error) {
sc := []constant.ServerConfig{
*constant.NewServerConfig(*config.ServerAddr, 8848, constant.WithContextPath("/nacos")),
}
//create ClientConfig
cc := *constant.NewClientConfig(
constant.WithTimeoutMs(5000),
constant.WithNotLoadCacheAtStart(true),
constant.WithOpenKMS(true),
)
if config.Namespace != nil {
cc.NamespaceId = *config.Namespace
}
if config.RegionId != nil {
cc.RegionId = *config.RegionId
}
if config.Ak != nil {
cc.AccessKey = *config.Ak
}
if config.Sk != nil {
cc.SecretKey = *config.Sk
}
// create config client
configClient, err := clients.NewConfigClient(
vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
},
)
if err != nil {
return nil, fmt.Errorf("failed to initial nacos config client: %w", err)
}
namingClient, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
},
)
if err != nil {
return nil, fmt.Errorf("failed to initial naming config client: %w", err)
}
return &NacosMcpRegsitry{
configClient: configClient,
namingClient: namingClient,
serviceMatcher: *config.ServiceMatcher,
toolChangeEventListeners: []registry.ToolChangeEventListener{},
currentServiceSet: map[string]bool{},
}, nil
}
func (c *NacosConfig) ParseConfig(config map[string]any) error {
serverAddr, ok := config["serverAddr"].(string)
if !ok {
return errors.New("missing serverAddr")
}
c.ServerAddr = &serverAddr
serviceMatcher, ok := config["serviceMatcher"].(map[string]any)
if !ok {
return errors.New("missing serviceMatcher")
}
matchers := map[string]string{}
for key, value := range serviceMatcher {
matchers[key] = value.(string)
}
c.ServiceMatcher = &matchers
if ak, ok := config["accessKey"].(string); ok {
c.Ak = &ak
}
if sk, ok := config["secretKey"].(string); ok {
c.Sk = &sk
}
if region, ok := config["regionId"].(string); ok {
c.RegionId = &region
}
return nil
}
func (c *NacosConfig) NewServer(serverName string) (*internal.MCPServer, error) {
mcpServer := internal.NewMCPServer(
serverName,
"1.0.0",
)
nacosRegistry, err := CreateNacosMcpRegsitry(c)
if err != nil {
return nil, fmt.Errorf("failed to initialize NacosMcpRegistry: %w", err)
}
listener := McpServerToolsChangeListener{
mcpServer: mcpServer,
}
nacosRegistry.RegisterToolChangeEventListener(&listener)
go func() {
for {
if nacosRegistry.refreshToolsList() {
resetToolsToMcpServer(mcpServer, nacosRegistry)
}
time.Sleep(time.Second * 10)
}
}()
return mcpServer, nil
}
func resetToolsToMcpServer(mcpServer *internal.MCPServer, reg registry.McpServerRegistry) {
wrappedTools := []internal.ServerTool{}
tools := reg.ListToolsDesciption()
for _, tool := range tools {
wrappedTools = append(wrappedTools, internal.ServerTool{
Tool: mcp.NewToolWithRawSchema(tool.Name, tool.Description, tool.InputSchema),
Handler: registry.HandleRegistryToolsCall(reg),
})
}
mcpServer.SetTools(wrappedTools...)
api.LogInfo("Config changed reset tools")
}

View File

@@ -0,0 +1,64 @@
package registry
import (
"encoding/json"
"github.com/mark3labs/mcp-go/mcp"
)
type McpApplicationDescription struct {
Protocol string `json:"protocol"`
ToolsDescription []*ToolDescription `json:"tools"`
ToolsMeta map[string]ToolMeta `json:"toolsMeta"`
}
type ToolMeta struct {
InvokeContext map[string]string `json:"invokeContext"`
ParametersMapping map[string]ParameterMapInfo `json:"parametersMapping"`
CredentialRef *string `json:"credentialRef"`
}
type ParameterMapInfo struct {
ParamName string `json:"name"`
BackendName string `json:"backendName"`
ParamType string `json:"type"`
Position string `json:"position"`
}
type ToolDescription struct {
Name string `json:"name"`
Description string `json:"description"`
InputSchema json.RawMessage `json:"inputSchema"`
}
type ToolChangeEventListener interface {
OnToolChanged(McpServerRegistry)
}
type McpServerRegistry interface {
ListToolsDesciption() []*ToolDescription
GetToolRpcContext(toolname string) (*RpcContext, bool)
RegisterToolChangeEventListener(listener ToolChangeEventListener)
}
type RpcContext struct {
Instances *[]Instance
ToolMeta ToolMeta
Protocol string
Credential *CredentialInfo
}
type CredentialInfo struct {
CredentialType string `json:"type"`
Credentials map[string]any `json:"credentialsMap"`
}
type Instance struct {
Host string
Port uint64
Meta map[string]string
}
type RemoteCallHandle interface {
HandleToolCall(ctx *RpcContext, parameters map[string]any) (*mcp.CallToolResult, error)
}

View File

@@ -0,0 +1,200 @@
package registry
import (
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"strings"
"github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal"
"github.com/mark3labs/mcp-go/mcp"
)
const HTTP_URL_TEMPLATE = "%s://%s:%d%s"
const FIX_QUERY_TOKEN_KEY = "key"
const FIX_QUERY_TOKEN_VALUE = "value"
const PROTOCOL_HTTP = "http"
const PROTOCOL_HTTPS = "https"
const DEFAULT_HTTP_METHOD = "GET"
const DEFAULT_HTTP_PATH = "/"
func getHttpCredentialHandle(name string) (func(*CredentialInfo, *HttpRemoteCallHandle), error) {
if name == "fixed-query-token" {
return FixedQueryToken, nil
}
return nil, fmt.Errorf("Unknown credential type")
}
type CommonRemoteCallHandle struct {
Instance *Instance
}
type HttpRemoteCallHandle struct {
CommonRemoteCallHandle
Protocol string
Headers http.Header
Body *string
Query map[string]string
Path string
Method string
}
// http credentials handles
func FixedQueryToken(cred *CredentialInfo, h *HttpRemoteCallHandle) {
key, _ := cred.Credentials[FIX_QUERY_TOKEN_KEY]
value, _ := cred.Credentials[FIX_QUERY_TOKEN_VALUE]
h.Query[key.(string)] = value.(string)
}
func newHttpRemoteCallHandle(ctx *RpcContext) *HttpRemoteCallHandle {
instance := selectOneInstance(ctx)
method, ok := ctx.ToolMeta.InvokeContext["method"]
if !ok {
method = DEFAULT_HTTP_METHOD
}
path, ok := ctx.ToolMeta.InvokeContext["path"]
if !ok {
path = DEFAULT_HTTP_PATH
}
return &HttpRemoteCallHandle{
CommonRemoteCallHandle: CommonRemoteCallHandle{
Instance: &instance,
},
Protocol: ctx.Protocol,
Headers: http.Header{},
Body: nil,
Query: map[string]string{},
Path: path,
Method: method,
}
}
// http remote handle implementation
func (h *HttpRemoteCallHandle) HandleToolCall(ctx *RpcContext, parameters map[string]any) (*mcp.CallToolResult, error) {
if ctx.Credential != nil {
credentialHandle, err := getHttpCredentialHandle(ctx.Credential.CredentialType)
if err != nil {
return nil, err
}
credentialHandle(ctx.Credential, h)
}
err := h.handleParamMapping(&ctx.ToolMeta.ParametersMapping, parameters)
if err != nil {
return nil, err
}
response, err := h.doHttpCall()
if err != nil {
return nil, err
}
body, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}
responseType := "text"
if respType, ok := ctx.ToolMeta.InvokeContext["responseType"]; ok {
responseType = respType
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: responseType,
Text: string(body),
},
},
}, nil
}
func (h *HttpRemoteCallHandle) handleParamMapping(mapInfo *map[string]ParameterMapInfo, params map[string]any) error {
paramMapInfo := *mapInfo
for param, value := range params {
if info, ok := paramMapInfo[param]; ok {
if info.Position == "Query" {
h.Query[info.BackendName] = fmt.Sprintf("%s", value)
} else if info.Position == "Header" {
h.Headers[info.BackendName] = []string{fmt.Sprintf("%s", value)}
} else {
return fmt.Errorf("Unsupport position for args %s, pos is %s", param, info.Position)
}
} else {
h.Query[param] = fmt.Sprintf("%s", value)
}
}
return nil
}
func (h *HttpRemoteCallHandle) doHttpCall() (*http.Response, error) {
pathPrefix := fmt.Sprintf(HTTP_URL_TEMPLATE, h.Protocol, h.Instance.Host, h.Instance.Port, h.Path)
queryString := ""
queryGroup := []string{}
for queryKey, queryValue := range h.Query {
queryGroup = append(queryGroup, url.QueryEscape(queryKey)+"="+url.QueryEscape(queryValue))
}
if len(queryGroup) > 0 {
queryString = "?" + strings.Join(queryGroup, "&")
}
fullUrl, err := url.Parse(pathPrefix + queryString)
if err != nil {
return nil, fmt.Errorf("Parse url error , url is %s", pathPrefix+queryString)
}
request := http.Request{
URL: fullUrl,
Method: h.Method,
Header: h.Headers,
}
if h.Body != nil {
request.Body = io.NopCloser(strings.NewReader(*h.Body))
}
return http.DefaultClient.Do(&request)
}
func selectOneInstance(ctx *RpcContext) Instance {
instanceId := 0
instances := *ctx.Instances
if len(instances) != 1 {
instanceId = rand.Intn(len(instances) - 1)
}
return instances[instanceId]
}
func getRemoteCallhandle(ctx *RpcContext) RemoteCallHandle {
if ctx.Protocol == PROTOCOL_HTTP || ctx.Protocol == PROTOCOL_HTTPS {
return newHttpRemoteCallHandle(ctx)
} else {
return nil
}
}
// common remote call process
func CommonRemoteCall(reg McpServerRegistry, toolName string, parameters map[string]any) (*mcp.CallToolResult, error) {
ctx, ok := reg.GetToolRpcContext(toolName)
if !ok {
return nil, fmt.Errorf("Unknown tool %s", toolName)
}
remoteHandle := getRemoteCallhandle(ctx)
if remoteHandle == nil {
return nil, fmt.Errorf("Unknown backend protocol %s", ctx.Protocol)
}
return remoteHandle.HandleToolCall(ctx, parameters)
}
func HandleRegistryToolsCall(reg McpServerRegistry) internal.ToolHandlerFunc {
return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
arguments := request.Params.Arguments
return CommonRemoteCall(reg, request.Params.Name, arguments)
}
}