mirror of
https://github.com/alibaba/higress.git
synced 2026-03-02 07:30:49 +08:00
Feat dynamic tool reset (#2031)
This commit is contained in:
@@ -101,4 +101,4 @@ require (
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
replace github.com/nacos-group/nacos-sdk-go/v2 v2.2.9 => github.com/luoxiner/nacos-sdk-go/v2 v2.2.9-30
|
||||
replace github.com/nacos-group/nacos-sdk-go/v2 v2.2.9 => github.com/luoxiner/nacos-sdk-go/v2 v2.2.9-40
|
||||
|
||||
@@ -40,6 +40,9 @@ func (n *NacosMcpRegsitry) ListToolsDesciption() []*registry.ToolDescription {
|
||||
}
|
||||
|
||||
func (n *NacosMcpRegsitry) GetToolRpcContext(toolName string) (*registry.RpcContext, bool) {
|
||||
if n.toolsRpcContext == nil {
|
||||
n.refreshToolsList()
|
||||
}
|
||||
tool, ok := n.toolsRpcContext[toolName]
|
||||
return tool, ok
|
||||
}
|
||||
@@ -87,9 +90,11 @@ func (n *NacosMcpRegsitry) refreshToolsListForGroup(group string, serviceMatcher
|
||||
|
||||
formatServiceName := getFormatServiceName(group, service)
|
||||
if _, ok := n.currentServiceSet[formatServiceName]; !ok {
|
||||
changed = true
|
||||
n.refreshToolsListForService(group, service)
|
||||
refreshed := n.refreshToolsListForService(group, service)
|
||||
n.listenToService(group, service)
|
||||
if refreshed {
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
|
||||
currentServiceList[formatServiceName] = true
|
||||
@@ -129,7 +134,23 @@ func getFormatServiceName(group string, service string) string {
|
||||
return fmt.Sprintf("%s_%s", group, service)
|
||||
}
|
||||
|
||||
func (n *NacosMcpRegsitry) refreshToolsListForServiceWithContent(group string, service string, newConfig *string, instances *[]model.Instance) {
|
||||
func (n *NacosMcpRegsitry) deleteToolForService(group string, service string) {
|
||||
toolsNeedReset := []string{}
|
||||
|
||||
formatServiceName := getFormatServiceName(group, service)
|
||||
for tool, _ := range n.toolsDescription {
|
||||
if strings.HasPrefix(tool, formatServiceName) {
|
||||
toolsNeedReset = append(toolsNeedReset, tool)
|
||||
}
|
||||
}
|
||||
|
||||
for _, tool := range toolsNeedReset {
|
||||
delete(n.toolsDescription, tool)
|
||||
delete(n.toolsRpcContext, tool)
|
||||
}
|
||||
}
|
||||
|
||||
func (n *NacosMcpRegsitry) refreshToolsListForServiceWithContent(group string, service string, newConfig *string, instances *[]model.Instance) bool {
|
||||
|
||||
if newConfig == nil {
|
||||
dataId := makeToolsConfigId(service)
|
||||
@@ -140,7 +161,7 @@ func (n *NacosMcpRegsitry) refreshToolsListForServiceWithContent(group string, s
|
||||
|
||||
if err != nil {
|
||||
api.LogError(fmt.Sprintf("Get tools config for sercice %s:%s error %s", group, service, err))
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
newConfig = &content
|
||||
@@ -155,17 +176,27 @@ func (n *NacosMcpRegsitry) refreshToolsListForServiceWithContent(group string, s
|
||||
|
||||
if err != nil {
|
||||
api.LogError(fmt.Sprintf("List instance for sercice %s:%s error %s", group, service, err))
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
instances = &instancesFromNacos
|
||||
}
|
||||
|
||||
var applicationDescription registry.McpApplicationDescription
|
||||
if newConfig == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// config deleted, tools should be removed
|
||||
if len(*newConfig) == 0 {
|
||||
n.deleteToolForService(group, service)
|
||||
return true
|
||||
}
|
||||
|
||||
err := json.Unmarshal([]byte(*newConfig), &applicationDescription)
|
||||
if err != nil {
|
||||
api.LogError(fmt.Sprintf("Parse tools config for sercice %s:%s error, config is %s, error is %s", group, service, *newConfig, err))
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
wrappedInstances := []registry.Instance{}
|
||||
@@ -186,6 +217,8 @@ func (n *NacosMcpRegsitry) refreshToolsListForServiceWithContent(group string, s
|
||||
n.toolsRpcContext = map[string]*registry.RpcContext{}
|
||||
}
|
||||
|
||||
n.deleteToolForService(group, service)
|
||||
|
||||
for _, tool := range applicationDescription.ToolsDescription {
|
||||
meta := applicationDescription.ToolsMeta[tool.Name]
|
||||
|
||||
@@ -207,6 +240,7 @@ func (n *NacosMcpRegsitry) refreshToolsListForServiceWithContent(group string, s
|
||||
n.toolsRpcContext[tool.Name] = &context
|
||||
}
|
||||
n.currentServiceSet[getFormatServiceName(group, service)] = true
|
||||
return true
|
||||
}
|
||||
|
||||
func (n *NacosMcpRegsitry) GetCredential(name string, group string) *registry.CredentialInfo {
|
||||
@@ -231,8 +265,8 @@ func (n *NacosMcpRegsitry) GetCredential(name string, group string) *registry.Cr
|
||||
return &credential
|
||||
}
|
||||
|
||||
func (n *NacosMcpRegsitry) refreshToolsListForService(group string, service string) {
|
||||
n.refreshToolsListForServiceWithContent(group, service, nil, nil)
|
||||
func (n *NacosMcpRegsitry) refreshToolsListForService(group string, service string) bool {
|
||||
return n.refreshToolsListForServiceWithContent(group, service, nil, nil)
|
||||
}
|
||||
|
||||
func (n *NacosMcpRegsitry) listenToService(group string, service string) {
|
||||
|
||||
@@ -50,8 +50,11 @@ func FixedQueryToken(cred *CredentialInfo, h *HttpRemoteCallHandle) {
|
||||
h.Query[key.(string)] = value.(string)
|
||||
}
|
||||
|
||||
func newHttpRemoteCallHandle(ctx *RpcContext) *HttpRemoteCallHandle {
|
||||
instance := selectOneInstance(ctx)
|
||||
func newHttpRemoteCallHandle(ctx *RpcContext) (*HttpRemoteCallHandle, error) {
|
||||
instance, err := selectOneInstance(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
method, ok := ctx.ToolMeta.InvokeContext["method"]
|
||||
if !ok {
|
||||
method = DEFAULT_HTTP_METHOD
|
||||
@@ -64,7 +67,7 @@ func newHttpRemoteCallHandle(ctx *RpcContext) *HttpRemoteCallHandle {
|
||||
|
||||
return &HttpRemoteCallHandle{
|
||||
CommonRemoteCallHandle: CommonRemoteCallHandle{
|
||||
Instance: &instance,
|
||||
Instance: instance,
|
||||
},
|
||||
Protocol: ctx.Protocol,
|
||||
Headers: http.Header{},
|
||||
@@ -72,7 +75,7 @@ func newHttpRemoteCallHandle(ctx *RpcContext) *HttpRemoteCallHandle {
|
||||
Query: map[string]string{},
|
||||
Path: path,
|
||||
Method: method,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// http remote handle implementation
|
||||
@@ -160,20 +163,25 @@ func (h *HttpRemoteCallHandle) doHttpCall() (*http.Response, error) {
|
||||
return http.DefaultClient.Do(&request)
|
||||
}
|
||||
|
||||
func selectOneInstance(ctx *RpcContext) Instance {
|
||||
func selectOneInstance(ctx *RpcContext) (*Instance, error) {
|
||||
instanceId := 0
|
||||
if ctx.Instances == nil || len(*ctx.Instances) == 0 {
|
||||
return nil, fmt.Errorf("No instance")
|
||||
}
|
||||
|
||||
instances := *ctx.Instances
|
||||
if len(instances) != 1 {
|
||||
if len(instances) > 1 {
|
||||
instanceId = rand.Intn(len(instances) - 1)
|
||||
}
|
||||
return instances[instanceId]
|
||||
select_instance := instances[instanceId]
|
||||
return &select_instance, nil
|
||||
}
|
||||
|
||||
func getRemoteCallhandle(ctx *RpcContext) RemoteCallHandle {
|
||||
func getRemoteCallhandle(ctx *RpcContext) (RemoteCallHandle, error) {
|
||||
if ctx.Protocol == PROTOCOL_HTTP || ctx.Protocol == PROTOCOL_HTTPS {
|
||||
return newHttpRemoteCallHandle(ctx)
|
||||
} else {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -184,11 +192,15 @@ func CommonRemoteCall(reg McpServerRegistry, toolName string, parameters map[str
|
||||
return nil, fmt.Errorf("Unknown tool %s", toolName)
|
||||
}
|
||||
|
||||
remoteHandle := getRemoteCallhandle(ctx)
|
||||
remoteHandle, err := getRemoteCallhandle(ctx)
|
||||
if remoteHandle == nil {
|
||||
return nil, fmt.Errorf("Unknown backend protocol %s", ctx.Protocol)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Call backend server error: %w", err)
|
||||
}
|
||||
|
||||
return remoteHandle.HandleToolCall(ctx, parameters)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user