mirror of
https://github.com/alibaba/higress.git
synced 2026-03-07 18:10:54 +08:00
986 lines
30 KiB
Go
986 lines
30 KiB
Go
// Copyright (c) 2022 Alibaba Group Holding Ltd.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package mcpserver
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
apiv1 "github.com/alibaba/higress/api/networking/v1"
|
|
"github.com/alibaba/higress/pkg/common"
|
|
common2 "github.com/alibaba/higress/pkg/ingress/kube/common"
|
|
provider "github.com/alibaba/higress/registry"
|
|
"github.com/alibaba/higress/registry/memory"
|
|
"github.com/golang/protobuf/ptypes/wrappers"
|
|
"github.com/nacos-group/nacos-sdk-go/v2/clients"
|
|
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
|
|
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
|
|
"github.com/nacos-group/nacos-sdk-go/v2/model"
|
|
"github.com/nacos-group/nacos-sdk-go/v2/vo"
|
|
"go.uber.org/atomic"
|
|
"istio.io/api/networking/v1alpha3"
|
|
"istio.io/istio/pkg/config"
|
|
"istio.io/istio/pkg/config/constants"
|
|
"istio.io/istio/pkg/config/schema/gvk"
|
|
"istio.io/istio/pkg/log"
|
|
"istio.io/istio/pkg/util/sets"
|
|
)
|
|
|
|
const (
|
|
DefaultInitTimeout = time.Second * 10
|
|
DefaultNacosTimeout = 5000
|
|
DefaultNacosLogLevel = "info"
|
|
DefaultNacosLogDir = "/var/log/nacos/log/mcp/log"
|
|
DefaultNacosCacheDir = "/var/log/nacos/log/mcp/cache"
|
|
DefaultNacosNotLoadCache = true
|
|
DefaultNacosLogMaxAge = 3
|
|
DefaultRefreshInterval = time.Second * 30
|
|
DefaultRefreshIntervalLimit = time.Second * 10
|
|
DefaultFetchPageSize = 50
|
|
DefaultJoiner = "@@"
|
|
NacosV3LabelKey = "isV3"
|
|
)
|
|
|
|
var mcpServerLog = log.RegisterScope("McpServer", "Nacos Mcp Server Watcher process.")
|
|
|
|
type watcher struct {
|
|
provider.BaseWatcher
|
|
apiv1.RegistryConfig
|
|
watchingConfig map[string]bool
|
|
watchingConfigRefs map[string]sets.Set[string]
|
|
configToConfigListener map[string]*MultiConfigListener
|
|
serviceCache map[string]*ServiceCache
|
|
configToService map[string]string
|
|
credentialKeyToName map[string]map[string]string
|
|
RegistryType provider.ServiceRegistryType
|
|
Status provider.WatcherStatus
|
|
configClient config_client.IConfigClient
|
|
serverConfig []constant.ServerConfig
|
|
cache memory.Cache
|
|
mutex *sync.Mutex
|
|
subMutex *sync.Mutex
|
|
callbackMutex *sync.Mutex
|
|
stop chan struct{}
|
|
isStop bool
|
|
updateCacheWhenEmpty bool
|
|
nacosClientConfig *constant.ClientConfig
|
|
namespace string
|
|
clusterId string
|
|
authOption provider.AuthOption
|
|
}
|
|
|
|
type WatcherOption func(w *watcher)
|
|
|
|
func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) {
|
|
w := &watcher{
|
|
watchingConfig: make(map[string]bool),
|
|
configToService: make(map[string]string),
|
|
watchingConfigRefs: make(map[string]sets.Set[string]),
|
|
configToConfigListener: make(map[string]*MultiConfigListener),
|
|
credentialKeyToName: make(map[string]map[string]string),
|
|
serviceCache: map[string]*ServiceCache{},
|
|
RegistryType: "nacos3",
|
|
Status: provider.UnHealthy,
|
|
cache: cache,
|
|
mutex: &sync.Mutex{},
|
|
subMutex: &sync.Mutex{},
|
|
callbackMutex: &sync.Mutex{},
|
|
stop: make(chan struct{}),
|
|
}
|
|
|
|
w.NacosRefreshInterval = int64(DefaultRefreshInterval)
|
|
|
|
for _, opt := range opts {
|
|
opt(w)
|
|
}
|
|
|
|
// The nacos mcp server uses these restricted namespaces and groups, and may be adjusted in the future.
|
|
w.NacosNamespace = "nacos-default-mcp"
|
|
w.NacosNamespaceId = w.NacosNamespace
|
|
w.NacosGroups = []string{"mcp-server"}
|
|
|
|
mcpServerLog.Infof("new nacos mcp server watcher with config Name:%s", w.Name)
|
|
|
|
w.nacosClientConfig = constant.NewClientConfig(
|
|
constant.WithTimeoutMs(DefaultNacosTimeout),
|
|
constant.WithLogLevel(DefaultNacosLogLevel),
|
|
constant.WithLogDir(DefaultNacosLogDir),
|
|
constant.WithCacheDir(DefaultNacosCacheDir),
|
|
constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache),
|
|
constant.WithLogRollingConfig(&constant.ClientLogRollingConfig{
|
|
MaxAge: DefaultNacosLogMaxAge,
|
|
}),
|
|
constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty),
|
|
constant.WithNamespaceId(w.NacosNamespaceId),
|
|
constant.WithAccessKey(w.NacosAccessKey),
|
|
constant.WithSecretKey(w.NacosSecretKey),
|
|
constant.WithUsername(w.authOption.NacosUsername),
|
|
constant.WithPassword(w.authOption.NacosPassword),
|
|
)
|
|
|
|
initTimer := time.NewTimer(DefaultInitTimeout)
|
|
w.serverConfig = []constant.ServerConfig{
|
|
*constant.NewServerConfig(w.Domain, uint64(w.Port)),
|
|
}
|
|
|
|
success := make(chan struct{})
|
|
go func() {
|
|
configClient, err := clients.NewConfigClient(vo.NacosClientParam{
|
|
ClientConfig: w.nacosClientConfig,
|
|
ServerConfigs: w.serverConfig,
|
|
})
|
|
if err == nil {
|
|
w.configClient = configClient
|
|
close(success)
|
|
} else {
|
|
mcpServerLog.Errorf("can not create naming client, err:%v", err)
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-initTimer.C:
|
|
return nil, errors.New("new nacos mcp server watcher timeout")
|
|
case <-success:
|
|
return w, nil
|
|
}
|
|
}
|
|
|
|
func WithNacosAddressServer(nacosAddressServer string) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.NacosAddressServer = nacosAddressServer
|
|
}
|
|
}
|
|
|
|
func WithNacosAccessKey(nacosAccessKey string) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.NacosAccessKey = nacosAccessKey
|
|
}
|
|
}
|
|
|
|
func WithNacosSecretKey(nacosSecretKey string) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.NacosSecretKey = nacosSecretKey
|
|
}
|
|
}
|
|
|
|
func WithNacosRefreshInterval(refreshInterval int64) WatcherOption {
|
|
return func(w *watcher) {
|
|
if refreshInterval < int64(DefaultRefreshIntervalLimit) {
|
|
refreshInterval = int64(DefaultRefreshIntervalLimit)
|
|
}
|
|
w.NacosRefreshInterval = refreshInterval
|
|
}
|
|
}
|
|
|
|
func WithType(t string) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.Type = t
|
|
}
|
|
}
|
|
|
|
func WithName(name string) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.Name = name
|
|
}
|
|
}
|
|
|
|
func WithDomain(domain string) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.Domain = domain
|
|
}
|
|
}
|
|
|
|
func WithPort(port uint32) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.Port = port
|
|
}
|
|
}
|
|
|
|
func WithMcpExportDomains(exportDomains []string) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.McpServerExportDomains = exportDomains
|
|
}
|
|
}
|
|
|
|
func WithMcpBaseUrl(url string) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.McpServerBaseUrl = url
|
|
}
|
|
}
|
|
|
|
func WithEnableMcpServer(enable *wrappers.BoolValue) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.EnableMCPServer = enable
|
|
}
|
|
}
|
|
|
|
func WithNamespace(ns string) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.namespace = ns
|
|
}
|
|
}
|
|
|
|
func WithClusterId(id string) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.clusterId = id
|
|
}
|
|
}
|
|
|
|
func WithAuthOption(authOption provider.AuthOption) WatcherOption {
|
|
return func(w *watcher) {
|
|
w.authOption = authOption
|
|
}
|
|
}
|
|
|
|
func (w *watcher) Run() {
|
|
ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval))
|
|
defer ticker.Stop()
|
|
w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10))
|
|
err := w.fetchAllMcpConfig()
|
|
if err != nil {
|
|
mcpServerLog.Errorf("first fetch mcp server config failed, err:%v", err)
|
|
} else {
|
|
w.Ready(true)
|
|
}
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
err := w.fetchAllMcpConfig()
|
|
if err != nil {
|
|
mcpServerLog.Errorf("fetch mcp server config failed, err:%v", err)
|
|
} else {
|
|
w.Ready(true)
|
|
}
|
|
case <-w.stop:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *watcher) fetchAllMcpConfig() error {
|
|
w.mutex.Lock()
|
|
defer w.mutex.Unlock()
|
|
|
|
if w.isStop {
|
|
return nil
|
|
}
|
|
fetchedConfigs := make(map[string]bool)
|
|
var tries int
|
|
isV3 := true
|
|
if w.EnableMCPServer != nil {
|
|
isV3 = w.EnableMCPServer.GetValue()
|
|
}
|
|
for _, groupName := range w.NacosGroups {
|
|
for page := 1; ; page++ {
|
|
ss, err := w.configClient.SearchConfig(vo.SearchConfigParam{
|
|
Group: groupName,
|
|
Search: "blur",
|
|
PageNo: page,
|
|
PageSize: DefaultFetchPageSize,
|
|
IsV3: isV3,
|
|
})
|
|
if err != nil {
|
|
if tries > 10 {
|
|
return err
|
|
}
|
|
mcpServerLog.Errorf("fetch nacos config list failed, err:%v, pageNo:%d", err, page)
|
|
page--
|
|
tries++
|
|
continue
|
|
}
|
|
for _, item := range ss.PageItems {
|
|
fetchedConfigs[groupName+DefaultJoiner+item.DataId] = true
|
|
}
|
|
if len(ss.PageItems) < DefaultFetchPageSize {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
for key := range w.watchingConfig {
|
|
if _, exist := fetchedConfigs[key]; !exist {
|
|
s := strings.Split(key, DefaultJoiner)
|
|
err := w.unsubscribe(s[0], s[1])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
delete(w.watchingConfig, key)
|
|
}
|
|
}
|
|
|
|
wg := sync.WaitGroup{}
|
|
subscribeFailed := atomic.NewBool(false)
|
|
watchingKeys := make(chan string, len(fetchedConfigs))
|
|
for key := range fetchedConfigs {
|
|
s := strings.Split(key, DefaultJoiner)
|
|
if _, exist := w.watchingConfig[key]; !exist {
|
|
wg.Add(1)
|
|
go func(k string) {
|
|
err := w.subscribe(s[0], s[1])
|
|
if err != nil {
|
|
subscribeFailed.Store(true)
|
|
mcpServerLog.Errorf("subscribe failed, group: %v, service: %v, errors: %v", s[0], s[1], err)
|
|
} else {
|
|
watchingKeys <- k
|
|
}
|
|
wg.Done()
|
|
}(key)
|
|
}
|
|
}
|
|
wg.Wait()
|
|
close(watchingKeys)
|
|
for key := range watchingKeys {
|
|
w.watchingConfig[key] = true
|
|
}
|
|
if subscribeFailed.Load() {
|
|
return errors.New("subscribe services failed")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *watcher) unsubscribe(groupName string, dataId string) error {
|
|
mcpServerLog.Infof("unsubscribe mcp server, groupName:%s, dataId:%s", groupName, dataId)
|
|
defer w.UpdateService()
|
|
|
|
err := w.configClient.CancelListenConfig(vo.ConfigParam{
|
|
DataId: dataId,
|
|
Group: groupName,
|
|
})
|
|
if err != nil {
|
|
mcpServerLog.Errorf("unsubscribe mcp server error:%v, groupName:%s, dataId:%s", err, groupName, dataId)
|
|
return err
|
|
}
|
|
key := strings.Join([]string{w.Name, w.NacosNamespace, groupName, dataId}, DefaultJoiner)
|
|
w.configToConfigListener[key].Stop()
|
|
delete(w.watchingConfigRefs, key)
|
|
delete(w.configToConfigListener, key)
|
|
// remove service for this config
|
|
configKey := strings.Join([]string{groupName, dataId}, DefaultJoiner)
|
|
svcInfo := w.configToService[configKey]
|
|
split := strings.Split(svcInfo, DefaultJoiner)
|
|
svcNamespace := split[0]
|
|
svcGroup := split[1]
|
|
svcName := split[2]
|
|
if w.serviceCache[svcNamespace] != nil {
|
|
err = w.serviceCache[svcNamespace].RemoveListener(svcGroup, svcName, configKey)
|
|
if err != nil {
|
|
mcpServerLog.Errorf("remove service listener error:%v, groupName:%s, dataId:%s", err, groupName, dataId)
|
|
}
|
|
}
|
|
delete(w.configToService, configKey)
|
|
|
|
w.cache.UpdateConfigCache(config.GroupVersionKind{}, key, nil, true)
|
|
return nil
|
|
}
|
|
|
|
func (w *watcher) subscribe(groupName string, dataId string) error {
|
|
mcpServerLog.Infof("subscribe mcp server, groupName:%s, dataId:%s", groupName, dataId)
|
|
// first we get this config and callback manually
|
|
content, err := w.configClient.GetConfig(vo.ConfigParam{
|
|
DataId: dataId,
|
|
Group: groupName,
|
|
})
|
|
if err != nil {
|
|
mcpServerLog.Errorf("get config %s/%s err: %v", groupName, dataId, err)
|
|
} else {
|
|
w.getConfigCallback(w.NacosNamespace, groupName, dataId, content)
|
|
}
|
|
// second, we set callback for this config
|
|
err = w.configClient.ListenConfig(vo.ConfigParam{
|
|
DataId: dataId,
|
|
Group: groupName,
|
|
OnChange: w.getConfigCallback,
|
|
})
|
|
if err != nil {
|
|
mcpServerLog.Errorf("subscribe mcp server error:%v, groupName:%s, dataId:%s", err, groupName, dataId)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *watcher) getConfigCallback(namespace, group, dataId, data string) {
|
|
mcpServerLog.Infof("get config callback, namespace:%s, groupName:%s, dataId:%s", namespace, group, dataId)
|
|
|
|
if data == "" {
|
|
return
|
|
}
|
|
|
|
key := strings.Join([]string{w.Name, w.NacosNamespace, group, dataId}, DefaultJoiner)
|
|
routeName := fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, group, strings.TrimSuffix(dataId, ".json"))
|
|
|
|
mcpServer := &provider.McpServer{}
|
|
if err := json.Unmarshal([]byte(data), mcpServer); err != nil {
|
|
mcpServerLog.Errorf("Unmarshal config data to mcp server error:%v, namespace:%s, groupName:%s, dataId:%s", err, namespace, group, dataId)
|
|
return
|
|
}
|
|
if mcpServer.Protocol == provider.StdioProtocol || mcpServer.Protocol == provider.DubboProtocol || mcpServer.Protocol == provider.McpSSEProtocol {
|
|
return
|
|
}
|
|
// process mcp service
|
|
w.subMutex.Lock()
|
|
defer w.subMutex.Unlock()
|
|
if err := w.buildServiceEntryForMcpServer(mcpServer, group, dataId); err != nil {
|
|
mcpServerLog.Errorf("build service entry for mcp server failed, namespace %v, group: %v, dataId %v, errors: %v", namespace, group, dataId, err)
|
|
}
|
|
// process mcp wasm
|
|
// only generate wasm plugin for http protocol mcp server
|
|
if mcpServer.Protocol != provider.HttpProtocol {
|
|
return
|
|
}
|
|
if _, exist := w.configToConfigListener[key]; !exist {
|
|
w.configToConfigListener[key] = NewMultiConfigListener(w.configClient, w.multiCallback(mcpServer, routeName, key))
|
|
}
|
|
if _, exist := w.watchingConfigRefs[key]; !exist {
|
|
w.watchingConfigRefs[key] = sets.New[string]()
|
|
}
|
|
listener := w.configToConfigListener[key]
|
|
|
|
curRef := sets.Set[string]{}
|
|
// add description ref
|
|
curRef.Insert(strings.Join([]string{provider.DefaultMcpToolsGroup, mcpServer.ToolsDescriptionRef}, DefaultJoiner))
|
|
// add credential ref
|
|
credentialNameMap := map[string]string{}
|
|
for name, ref := range mcpServer.Credentials {
|
|
credKey := strings.Join([]string{provider.DefaultMcpCredentialsGroup, ref.Ref}, DefaultJoiner)
|
|
curRef.Insert(credKey)
|
|
credentialNameMap[credKey] = name
|
|
}
|
|
w.callbackMutex.Lock()
|
|
w.credentialKeyToName[key] = credentialNameMap
|
|
w.callbackMutex.Unlock()
|
|
|
|
toBeAdd := curRef.Difference(w.watchingConfigRefs[key])
|
|
toBeDelete := w.watchingConfigRefs[key].Difference(curRef)
|
|
|
|
var toBeListen, toBeUnListen []vo.ConfigParam
|
|
for item, _ := range toBeAdd {
|
|
split := strings.Split(item, DefaultJoiner)
|
|
toBeListen = append(toBeListen, vo.ConfigParam{
|
|
Group: split[0],
|
|
DataId: split[1],
|
|
})
|
|
}
|
|
for item, _ := range toBeDelete {
|
|
split := strings.Split(item, DefaultJoiner)
|
|
toBeUnListen = append(toBeUnListen, vo.ConfigParam{
|
|
Group: split[0],
|
|
DataId: split[1],
|
|
})
|
|
}
|
|
|
|
// listen description and credential config
|
|
if len(toBeListen) > 0 {
|
|
if err := listener.StartListen(toBeListen); err != nil {
|
|
mcpServerLog.Errorf("listen config ref failed, group: %v, dataId %v, errors: %v", group, dataId, err)
|
|
}
|
|
}
|
|
// cancel listen description and credential config
|
|
if len(toBeUnListen) > 0 {
|
|
if err := listener.CancelListen(toBeUnListen); err != nil {
|
|
mcpServerLog.Errorf("cancel listen config ref failed, group: %v, dataId %v, errors: %v", group, dataId, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *watcher) multiCallback(server *provider.McpServer, routeName, configKey string) func(map[string]string) {
|
|
callback := func(configs map[string]string) {
|
|
defer w.UpdateService()
|
|
|
|
mcpServerLog.Infof("callback, ref config changed: %s", configKey)
|
|
rule := &provider.McpServerRule{
|
|
MatchRoute: []string{routeName},
|
|
Server: &provider.ServerConfig{
|
|
Name: server.Name,
|
|
Config: map[string]interface{}{},
|
|
},
|
|
}
|
|
|
|
// process mcp credential
|
|
credentialConfig := map[string]interface{}{}
|
|
for key, data := range configs {
|
|
if strings.HasPrefix(key, provider.DefaultMcpToolsGroup) {
|
|
// skip mcp tool description
|
|
continue
|
|
}
|
|
var cred interface{}
|
|
if err := json.Unmarshal([]byte(data), &cred); err != nil {
|
|
mcpServerLog.Errorf("unmarshal credential data %v to map error:%v", key, err)
|
|
}
|
|
w.callbackMutex.Lock()
|
|
name := w.credentialKeyToName[configKey][key]
|
|
w.callbackMutex.Unlock()
|
|
credentialConfig[name] = cred
|
|
}
|
|
rule.Server.Config["credentials"] = credentialConfig
|
|
// process mcp tool description
|
|
var allowTools []string
|
|
for key, toolData := range configs {
|
|
if strings.HasPrefix(key, provider.DefaultMcpCredentialsGroup) {
|
|
// skip mcp credentials
|
|
continue
|
|
}
|
|
toolsDescription := &provider.McpToolConfig{}
|
|
if err := json.Unmarshal([]byte(toolData), toolsDescription); err != nil {
|
|
mcpServerLog.Errorf("unmarshal toolsDescriptionRef to mcp tool config error:%v", err)
|
|
}
|
|
for _, t := range toolsDescription.Tools {
|
|
convertTool := &provider.McpTool{Name: t.Name, Description: t.Description}
|
|
|
|
toolMeta := toolsDescription.ToolsMeta[t.Name]
|
|
if toolMeta != nil && toolMeta.Enabled {
|
|
allowTools = append(allowTools, t.Name)
|
|
}
|
|
argsPosition, err := getArgsPositionFromToolMeta(toolMeta)
|
|
if err != nil {
|
|
mcpServerLog.Errorf("get args position from tool meta error:%v, tool name %v", err, t.Name)
|
|
}
|
|
|
|
requiredMap := sets.Set[string]{}
|
|
for _, s := range t.InputSchema.Required {
|
|
requiredMap.Insert(s)
|
|
}
|
|
|
|
for argsName, args := range t.InputSchema.Properties {
|
|
convertArgs, err := parseMcpArgs(args)
|
|
if err != nil {
|
|
mcpServerLog.Errorf("parse mcp args error:%v, tool name %v, args name %v", err, t.Name, argsName)
|
|
continue
|
|
}
|
|
convertArgs.Name = argsName
|
|
convertArgs.Required = requiredMap.Contains(argsName)
|
|
if pos, exist := argsPosition[argsName]; exist {
|
|
convertArgs.Position = pos
|
|
}
|
|
convertTool.Args = append(convertTool.Args, convertArgs)
|
|
mcpServerLog.Debugf("parseMcpArgs, toolArgs:%v", convertArgs)
|
|
}
|
|
|
|
requestTemplate, err := getRequestTemplateFromToolMeta(toolMeta)
|
|
if err != nil {
|
|
mcpServerLog.Errorf("get request template from tool meta error:%v, tool name %v", err, t.Name)
|
|
} else {
|
|
convertTool.RequestTemplate = requestTemplate
|
|
}
|
|
|
|
responseTemplate, err := getResponseTemplateFromToolMeta(toolMeta)
|
|
if err != nil {
|
|
mcpServerLog.Errorf("get response template from tool meta error:%v, tool name %v", err, t.Name)
|
|
} else {
|
|
convertTool.ResponseTemplate = responseTemplate
|
|
}
|
|
rule.Tools = append(rule.Tools, convertTool)
|
|
}
|
|
}
|
|
|
|
rule.Server.AllowTools = allowTools
|
|
wasmPluginConfig := &config.Config{
|
|
Meta: config.Meta{
|
|
GroupVersionKind: gvk.WasmPlugin,
|
|
Namespace: w.namespace,
|
|
},
|
|
Spec: rule,
|
|
}
|
|
w.cache.UpdateConfigCache(gvk.WasmPlugin, configKey, wasmPluginConfig, false)
|
|
}
|
|
return callback
|
|
}
|
|
|
|
func (w *watcher) buildServiceEntryForMcpServer(mcpServer *provider.McpServer, configGroup, dataId string) error {
|
|
if mcpServer == nil || mcpServer.RemoteServerConfig == nil || mcpServer.RemoteServerConfig.ServiceRef == nil {
|
|
return nil
|
|
}
|
|
mcpServerLog.Debugf("ServiceRef %v for %v", mcpServer.RemoteServerConfig.ServiceRef, dataId)
|
|
configKey := strings.Join([]string{configGroup, dataId}, DefaultJoiner)
|
|
|
|
serviceGroup := mcpServer.RemoteServerConfig.ServiceRef.GroupName
|
|
serviceNamespace := mcpServer.RemoteServerConfig.ServiceRef.NamespaceId
|
|
serviceName := mcpServer.RemoteServerConfig.ServiceRef.ServiceName
|
|
if serviceNamespace == "" {
|
|
serviceNamespace = provider.DefaultNacosServiceNamespace
|
|
}
|
|
// update config to service and unsubscribe old service
|
|
curSvcKey := strings.Join([]string{serviceNamespace, serviceGroup, serviceName}, DefaultJoiner)
|
|
if svcKey, exist := w.configToService[configKey]; exist && svcKey != curSvcKey {
|
|
split := strings.Split(svcKey, DefaultJoiner)
|
|
if svcCache, has := w.serviceCache[split[0]]; has {
|
|
if err := svcCache.RemoveListener(split[1], split[2], configKey); err != nil {
|
|
mcpServerLog.Errorf("remove listener error:%v", err)
|
|
}
|
|
}
|
|
}
|
|
w.configToService[configKey] = curSvcKey
|
|
|
|
if _, exist := w.serviceCache[serviceNamespace]; !exist {
|
|
namingConfig := constant.NewClientConfig(
|
|
constant.WithTimeoutMs(DefaultNacosTimeout),
|
|
constant.WithLogLevel(DefaultNacosLogLevel),
|
|
constant.WithLogDir(DefaultNacosLogDir),
|
|
constant.WithCacheDir(DefaultNacosCacheDir),
|
|
constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache),
|
|
constant.WithLogRollingConfig(&constant.ClientLogRollingConfig{
|
|
MaxAge: DefaultNacosLogMaxAge,
|
|
}),
|
|
constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty),
|
|
constant.WithNamespaceId(serviceNamespace),
|
|
constant.WithAccessKey(w.NacosAccessKey),
|
|
constant.WithSecretKey(w.NacosSecretKey),
|
|
constant.WithUsername(w.authOption.NacosUsername),
|
|
constant.WithPassword(w.authOption.NacosPassword),
|
|
)
|
|
client, err := clients.NewNamingClient(vo.NacosClientParam{
|
|
ClientConfig: namingConfig,
|
|
ServerConfigs: w.serverConfig,
|
|
})
|
|
if err == nil {
|
|
w.serviceCache[serviceNamespace] = NewServiceCache(client)
|
|
} else {
|
|
return fmt.Errorf("can not create naming client err:%v", err)
|
|
}
|
|
}
|
|
svcCache := w.serviceCache[serviceNamespace]
|
|
err := svcCache.AddListener(serviceGroup, serviceName, configKey, w.getServiceCallback(mcpServer, configGroup, dataId))
|
|
if err != nil {
|
|
return fmt.Errorf("add listener for dataId %v, service %s/%s error:%v", dataId, serviceGroup, serviceName, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *watcher) getServiceCallback(server *provider.McpServer, configGroup, dataId string) func(services []model.Instance) {
|
|
groupName := server.RemoteServerConfig.ServiceRef.GroupName
|
|
if groupName == "DEFAULT_GROUP" {
|
|
groupName = "DEFAULT-GROUP"
|
|
}
|
|
namespace := server.RemoteServerConfig.ServiceRef.NamespaceId
|
|
serviceName := server.RemoteServerConfig.ServiceRef.ServiceName
|
|
path := server.RemoteServerConfig.ExportPath
|
|
protocol := server.Protocol
|
|
host := getNacosServiceFullHost(groupName, namespace, serviceName)
|
|
|
|
return func(services []model.Instance) {
|
|
defer w.UpdateService()
|
|
|
|
mcpServerLog.Infof("callback for %s/%s, serviceName : %s", configGroup, dataId, host)
|
|
configKey := strings.Join([]string{w.Name, w.NacosNamespace, configGroup, dataId}, DefaultJoiner)
|
|
if len(services) == 0 {
|
|
mcpServerLog.Errorf("callback for %s return empty service instance list, skip generate config", host)
|
|
return
|
|
}
|
|
|
|
serviceEntry := w.generateServiceEntry(host, services)
|
|
se := &config.Config{
|
|
Meta: config.Meta{
|
|
GroupVersionKind: gvk.ServiceEntry,
|
|
Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedSeName, configGroup, strings.TrimSuffix(dataId, ".json")),
|
|
Namespace: "mcp",
|
|
},
|
|
Spec: serviceEntry,
|
|
}
|
|
if protocol == provider.McpSSEProtocol {
|
|
destinationRule := w.generateDrForSSEService(host)
|
|
dr := &config.Config{
|
|
Meta: config.Meta{
|
|
GroupVersionKind: gvk.DestinationRule,
|
|
Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedDrName, configGroup, strings.TrimSuffix(dataId, ".json")),
|
|
Namespace: w.namespace,
|
|
},
|
|
Spec: destinationRule,
|
|
}
|
|
w.cache.UpdateConfigCache(gvk.DestinationRule, configKey, dr, false)
|
|
}
|
|
w.cache.UpdateConfigCache(gvk.ServiceEntry, configKey, se, false)
|
|
vs := w.buildVirtualServiceForMcpServer(serviceEntry, configGroup, dataId, path, server)
|
|
w.cache.UpdateConfigCache(gvk.VirtualService, configKey, vs, false)
|
|
}
|
|
}
|
|
|
|
func (w *watcher) buildVirtualServiceForMcpServer(serviceentry *v1alpha3.ServiceEntry, group, dataId, path string, server *provider.McpServer) *config.Config {
|
|
if serviceentry == nil {
|
|
return nil
|
|
}
|
|
hosts := w.McpServerExportDomains
|
|
if len(hosts) == 0 {
|
|
hosts = []string{"*"}
|
|
}
|
|
var gateways []string
|
|
for _, host := range hosts {
|
|
cleanHost := common2.CleanHost(host)
|
|
// namespace/name, name format: (istio cluster id)-host
|
|
gateways = append(gateways, w.namespace+"/"+
|
|
common2.CreateConvertedName(w.clusterId, cleanHost),
|
|
common2.CreateConvertedName(constants.IstioIngressGatewayName, cleanHost))
|
|
}
|
|
routeName := fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedHttpRouteName, group, strings.TrimSuffix(dataId, ".json"))
|
|
mergePath := "/" + server.Name
|
|
if w.McpServerBaseUrl != "/" {
|
|
mergePath = strings.TrimSuffix(w.McpServerBaseUrl, "/") + mergePath
|
|
}
|
|
if path != "/" {
|
|
mergePath = mergePath + "/" + strings.TrimPrefix(path, "/")
|
|
}
|
|
|
|
vs := &v1alpha3.VirtualService{
|
|
Hosts: hosts,
|
|
Gateways: gateways,
|
|
Http: []*v1alpha3.HTTPRoute{{
|
|
Name: routeName,
|
|
Match: []*v1alpha3.HTTPMatchRequest{{
|
|
Uri: &v1alpha3.StringMatch{
|
|
MatchType: &v1alpha3.StringMatch_Prefix{
|
|
Prefix: mergePath,
|
|
},
|
|
},
|
|
}},
|
|
Rewrite: &v1alpha3.HTTPRewrite{
|
|
Uri: path,
|
|
},
|
|
Route: []*v1alpha3.HTTPRouteDestination{{
|
|
Destination: &v1alpha3.Destination{
|
|
Host: serviceentry.Hosts[0],
|
|
Port: &v1alpha3.PortSelector{
|
|
Number: serviceentry.Ports[0].Number,
|
|
},
|
|
},
|
|
}},
|
|
}},
|
|
}
|
|
|
|
if server.Protocol == provider.McpStreambleProtocol {
|
|
vs.Http[0].Rewrite = &v1alpha3.HTTPRewrite{
|
|
Uri: path,
|
|
}
|
|
}
|
|
|
|
mcpServerLog.Debugf("construct virtualservice %v", vs)
|
|
|
|
return &config.Config{
|
|
Meta: config.Meta{
|
|
GroupVersionKind: gvk.VirtualService,
|
|
Name: fmt.Sprintf("%s-%s-%s", provider.IstioMcpAutoGeneratedVsName, group, dataId),
|
|
Namespace: w.namespace,
|
|
},
|
|
Spec: vs,
|
|
}
|
|
}
|
|
|
|
func (w *watcher) generateServiceEntry(host string, services []model.Instance) *v1alpha3.ServiceEntry {
|
|
portList := make([]*v1alpha3.ServicePort, 0)
|
|
endpoints := make([]*v1alpha3.WorkloadEntry, 0)
|
|
isDnsService := false
|
|
|
|
for _, service := range services {
|
|
protocol := common.HTTP
|
|
if service.Metadata != nil && service.Metadata["protocol"] != "" {
|
|
protocol = common.ParseProtocol(service.Metadata["protocol"])
|
|
}
|
|
port := &v1alpha3.ServicePort{
|
|
Name: protocol.String(),
|
|
Number: uint32(service.Port),
|
|
Protocol: protocol.String(),
|
|
}
|
|
if len(portList) == 0 {
|
|
portList = append(portList, port)
|
|
}
|
|
if !isValidIP(service.Ip) {
|
|
isDnsService = true
|
|
}
|
|
endpoint := &v1alpha3.WorkloadEntry{
|
|
Address: service.Ip,
|
|
Ports: map[string]uint32{port.Protocol: port.Number},
|
|
Labels: service.Metadata,
|
|
}
|
|
endpoints = append(endpoints, endpoint)
|
|
}
|
|
|
|
resolution := v1alpha3.ServiceEntry_STATIC
|
|
if isDnsService {
|
|
resolution = v1alpha3.ServiceEntry_DNS
|
|
}
|
|
se := &v1alpha3.ServiceEntry{
|
|
Hosts: []string{host},
|
|
Ports: portList,
|
|
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
|
|
Resolution: resolution,
|
|
Endpoints: endpoints,
|
|
}
|
|
|
|
return se
|
|
}
|
|
|
|
func (w *watcher) generateDrForSSEService(host string) *v1alpha3.DestinationRule {
|
|
dr := &v1alpha3.DestinationRule{
|
|
Host: host,
|
|
TrafficPolicy: &v1alpha3.TrafficPolicy{
|
|
LoadBalancer: &v1alpha3.LoadBalancerSettings{
|
|
LbPolicy: &v1alpha3.LoadBalancerSettings_ConsistentHash{
|
|
ConsistentHash: &v1alpha3.LoadBalancerSettings_ConsistentHashLB{
|
|
HashKey: &v1alpha3.LoadBalancerSettings_ConsistentHashLB_UseSourceIp{
|
|
UseSourceIp: true,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
return dr
|
|
}
|
|
|
|
func parseMcpArgs(args interface{}) (*provider.ToolArgs, error) {
|
|
argsData, err := json.Marshal(args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
toolArgs := &provider.ToolArgs{}
|
|
if err = json.Unmarshal(argsData, toolArgs); err != nil {
|
|
return nil, err
|
|
}
|
|
return toolArgs, nil
|
|
}
|
|
|
|
func getArgsPositionFromToolMeta(toolMeta *provider.ToolsMeta) (map[string]string, error) {
|
|
result := map[string]string{}
|
|
if toolMeta == nil {
|
|
return result, nil
|
|
}
|
|
toolTemplate := toolMeta.Templates
|
|
for kind, meta := range toolTemplate {
|
|
switch kind {
|
|
case provider.JsonGoTemplateType:
|
|
templateData, err := json.Marshal(meta)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
template := &provider.JsonGoTemplate{}
|
|
if err = json.Unmarshal(templateData, template); err != nil {
|
|
return result, err
|
|
}
|
|
result = mergeMaps(result, template.ArgsPosition)
|
|
default:
|
|
return result, fmt.Errorf("unsupport tool meta type %v", kind)
|
|
}
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func getRequestTemplateFromToolMeta(toolMeta *provider.ToolsMeta) (*provider.RequestTemplate, error) {
|
|
if toolMeta == nil {
|
|
return nil, nil
|
|
}
|
|
toolTemplate := toolMeta.Templates
|
|
for kind, meta := range toolTemplate {
|
|
switch kind {
|
|
case provider.JsonGoTemplateType:
|
|
templateData, err := json.Marshal(meta)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
template := &provider.JsonGoTemplate{}
|
|
if err = json.Unmarshal(templateData, template); err != nil {
|
|
return nil, err
|
|
}
|
|
return &template.RequestTemplate, nil
|
|
default:
|
|
return nil, fmt.Errorf("unsupport tool meta type")
|
|
}
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func getResponseTemplateFromToolMeta(toolMeta *provider.ToolsMeta) (*provider.ResponseTemplate, error) {
|
|
if toolMeta == nil {
|
|
return nil, nil
|
|
}
|
|
toolTemplate := toolMeta.Templates
|
|
for kind, meta := range toolTemplate {
|
|
switch kind {
|
|
case provider.JsonGoTemplateType:
|
|
templateData, err := json.Marshal(meta)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
template := &provider.JsonGoTemplate{}
|
|
if err = json.Unmarshal(templateData, template); err != nil {
|
|
return nil, err
|
|
}
|
|
return &template.ResponseTemplate, nil
|
|
default:
|
|
return nil, fmt.Errorf("unsupport tool meta type")
|
|
}
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func mergeMaps(maps ...map[string]string) map[string]string {
|
|
if len(maps) == 0 {
|
|
return nil
|
|
}
|
|
res := make(map[string]string, len(maps[0]))
|
|
for _, m := range maps {
|
|
for k, v := range m {
|
|
res[k] = v
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
func getNacosServiceFullHost(groupName, namespace, serviceName string) string {
|
|
suffix := strings.Join([]string{groupName, namespace, string(provider.Nacos)}, common.DotSeparator)
|
|
host := strings.Join([]string{serviceName, suffix}, common.DotSeparator)
|
|
return host
|
|
}
|
|
|
|
func (w *watcher) Stop() {
|
|
w.mutex.Lock()
|
|
defer w.mutex.Unlock()
|
|
mcpServerLog.Infof("unsubscribe all configs")
|
|
for key := range w.watchingConfig {
|
|
s := strings.Split(key, DefaultJoiner)
|
|
err := w.unsubscribe(s[0], s[1])
|
|
if err == nil {
|
|
delete(w.watchingConfig, key)
|
|
}
|
|
}
|
|
mcpServerLog.Infof("stop all service nameing client")
|
|
for _, client := range w.serviceCache {
|
|
// TODO: This is a temporary implementation because of a bug in the nacos-go-sdk, which causes a block when stoping.
|
|
go client.Stop()
|
|
}
|
|
|
|
w.isStop = true
|
|
mcpServerLog.Infof("stop all config client")
|
|
mcpServerLog.Infof("watcher %v stop", w.Name)
|
|
|
|
close(w.stop)
|
|
w.Ready(false)
|
|
}
|
|
|
|
func (w *watcher) IsHealthy() bool {
|
|
return w.Status == provider.Healthy
|
|
}
|
|
|
|
func (w *watcher) GetRegistryType() string {
|
|
return w.RegistryType.String()
|
|
}
|
|
|
|
func isValidIP(ipStr string) bool {
|
|
ip := net.ParseIP(ipStr)
|
|
return ip != nil
|
|
}
|