refactor(v2): upgrade module to github.com/alibaba/higress/v2 (#2922)

Signed-off-by: Xijun Dai <daixijun1990@gmail.com>
This commit is contained in:
Xijun Dai
2025-09-21 14:29:07 +08:00
committed by GitHub
parent cd2082033c
commit 47827ad271
235 changed files with 1077 additions and 1045 deletions

View File

@@ -13,8 +13,10 @@ import (
"google.golang.org/protobuf/types/known/anypb"
)
const Name = "mcp-server"
const Version = "1.0.0"
const (
Name = "mcp-server"
Version = "1.0.0"
)
type SSEServerWrapper struct {
BaseServer *common.SSEServer
@@ -31,8 +33,7 @@ func (c *config) Destroy() {
}
}
type Parser struct {
}
type Parser struct{}
func (p *Parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {
configStruct := &xds.TypedStruct{}

View File

@@ -24,8 +24,10 @@ type NacosMcpRegistry struct {
currentServiceSet map[string]bool
}
const DEFAULT_SERVICE_LIST_MAX_PGSIZXE = 10000
const MCP_TOOL_SUBFIX = "-mcp-tools.json"
const (
DEFAULT_SERVICE_LIST_MAX_PGSIZXE = 10000
MCP_TOOL_SUBFIX = "-mcp-tools.json"
)
func (n *NacosMcpRegistry) ListToolsDescription() []*registry.ToolDescription {
if n.toolsDescription == nil {
@@ -67,7 +69,6 @@ func (n *NacosMcpRegistry) refreshToolsListForGroup(group string, serviceMatcher
PageNo: 1,
PageSize: DEFAULT_SERVICE_LIST_MAX_PGSIZXE,
})
if err != nil {
api.LogError(fmt.Sprintf("Get service list error when refresh tools list for group %s, error %s", group, err))
return false
@@ -101,7 +102,7 @@ func (n *NacosMcpRegistry) refreshToolsListForGroup(group string, serviceMatcher
}
serviceShouldBeDeleted := []string{}
for serviceName, _ := range n.currentServiceSet {
for serviceName := range n.currentServiceSet {
if !strings.HasPrefix(serviceName, group) {
continue
}
@@ -110,7 +111,7 @@ func (n *NacosMcpRegistry) refreshToolsListForGroup(group string, serviceMatcher
serviceShouldBeDeleted = append(serviceShouldBeDeleted, serviceName)
changed = true
toolsShouldBeDeleted := []string{}
for toolName, _ := range n.toolsDescription {
for toolName := range n.toolsDescription {
if strings.HasPrefix(toolName, serviceName) {
toolsShouldBeDeleted = append(toolsShouldBeDeleted, toolName)
}
@@ -138,7 +139,7 @@ func (n *NacosMcpRegistry) deleteToolForService(group string, service string) {
toolsNeedReset := []string{}
formatServiceName := getFormatServiceName(group, service)
for tool, _ := range n.toolsDescription {
for tool := range n.toolsDescription {
if strings.HasPrefix(tool, formatServiceName) {
toolsNeedReset = append(toolsNeedReset, tool)
}
@@ -151,14 +152,12 @@ func (n *NacosMcpRegistry) deleteToolForService(group string, service string) {
}
func (n *NacosMcpRegistry) refreshToolsListForServiceWithContent(group string, service string, newConfig *string, instances *[]model.Instance) bool {
if newConfig == nil {
dataId := makeToolsConfigId(service)
content, err := n.configClient.GetConfig(vo.ConfigParam{
DataId: dataId,
Group: group,
})
if err != nil {
api.LogError(fmt.Sprintf("Get tools config for sercice %s:%s error %s", group, service, err))
return false
@@ -173,7 +172,6 @@ func (n *NacosMcpRegistry) refreshToolsListForServiceWithContent(group string, s
GroupName: group,
HealthyOnly: true,
})
if err != nil {
api.LogError(fmt.Sprintf("List instance for sercice %s:%s error %s", group, service, err))
return false
@@ -249,7 +247,6 @@ func (n *NacosMcpRegistry) GetCredential(name string, group string) *registry.Cr
DataId: dataId,
Group: group,
})
if err != nil {
api.LogError(fmt.Sprintf("Get credentials for %s:%s error %s", group, dataId, err))
return nil
@@ -270,7 +267,6 @@ func (n *NacosMcpRegistry) refreshToolsListForService(group string, service stri
}
func (n *NacosMcpRegistry) listenToService(group string, service string) {
// config changed, tools description may be changed
err := n.configClient.ListenConfig(vo.ConfigParam{
DataId: makeToolsConfigId(service),
@@ -282,7 +278,6 @@ func (n *NacosMcpRegistry) listenToService(group string, service string) {
}
},
})
if err != nil {
api.LogError(fmt.Sprintf("Listen to service's tool config error %s", err))
}

View File

@@ -40,7 +40,7 @@ func CreateNacosMcpRegistry(config *NacosConfig) (*NacosMcpRegistry, error) {
*constant.NewServerConfig(*config.ServerAddr, 8848, constant.WithContextPath("/nacos")),
}
//create ClientConfig
// create ClientConfig
cc := *constant.NewClientConfig(
constant.WithTimeoutMs(5000),
constant.WithNotLoadCacheAtStart(true),
@@ -74,7 +74,6 @@ func CreateNacosMcpRegistry(config *NacosConfig) (*NacosMcpRegistry, error) {
ServerConfigs: sc,
},
)
if err != nil {
return nil, fmt.Errorf("failed to initial nacos config client: %w", err)
}
@@ -85,7 +84,6 @@ func CreateNacosMcpRegistry(config *NacosConfig) (*NacosMcpRegistry, error) {
ServerConfigs: sc,
},
)
if err != nil {
return nil, fmt.Errorf("failed to initial naming config client: %w", err)
}
@@ -100,7 +98,6 @@ func CreateNacosMcpRegistry(config *NacosConfig) (*NacosMcpRegistry, error) {
}
func (c *NacosConfig) ParseConfig(config map[string]any) error {
serverAddr, ok := config["serverAddr"].(string)
if !ok {
return errors.New("missing serverAddr")

View File

@@ -13,13 +13,15 @@ import (
"github.com/mark3labs/mcp-go/mcp"
)
const HTTP_URL_TEMPLATE = "%s://%s:%d%s"
const FIX_QUERY_TOKEN_KEY = "key"
const FIX_QUERY_TOKEN_VALUE = "value"
const PROTOCOL_HTTP = "http"
const PROTOCOL_HTTPS = "https"
const DEFAULT_HTTP_METHOD = "GET"
const DEFAULT_HTTP_PATH = "/"
const (
HTTP_URL_TEMPLATE = "%s://%s:%d%s"
FIX_QUERY_TOKEN_KEY = "key"
FIX_QUERY_TOKEN_VALUE = "value"
PROTOCOL_HTTP = "http"
PROTOCOL_HTTPS = "https"
DEFAULT_HTTP_METHOD = "GET"
DEFAULT_HTTP_PATH = "/"
)
func getHttpCredentialHandle(name string) (func(*CredentialInfo, *HttpRemoteCallHandle), error) {
if name == "fixed-query-token" {

View File

@@ -98,7 +98,7 @@ func GetQueryToolSchema() json.RawMessage {
{
"type": "object",
"properties": {
"sql": {
"sql": {
"type": "string",
"description": "The sql query to execute"
}
@@ -113,7 +113,7 @@ func GetExecuteToolSchema() json.RawMessage {
{
"type": "object",
"properties": {
"sql": {
"sql": {
"type": "string",
"description": "The sql to execute"
}
@@ -128,7 +128,7 @@ func GetDescribeTableToolSchema() json.RawMessage {
{
"type": "object",
"properties": {
"table": {
"table": {
"type": "string",
"description": "table name"
}

View File

@@ -13,10 +13,12 @@ import (
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
)
const Name = "mcp-session"
const Version = "1.0.0"
const ConfigPathSuffix = "/config"
const DefaultServerName = "higress-mcp-server"
const (
Name = "mcp-session"
Version = "1.0.0"
ConfigPathSuffix = "/config"
DefaultServerName = "higress-mcp-server"
)
var GlobalSSEPathSuffix = "/sse"
@@ -35,8 +37,7 @@ func (c *config) Destroy() {
}
}
type Parser struct {
}
type Parser struct{}
// Parse the filter configuration
func (p *Parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {

View File

@@ -19,13 +19,17 @@ import (
)
// 用于统计函数的递归调用次数
const ToolCallsCount = "ToolCallsCount"
const StreamContextKey = "Stream"
const (
ToolCallsCount = "ToolCallsCount"
StreamContextKey = "Stream"
)
// react的正则规则
const ActionPattern = `Action:\s*(.*?)[.\n]`
const ActionInputPattern = `Action Input:\s*(.*)`
const FinalAnswerPattern = `Final Answer:(.*)`
const (
ActionPattern = `Action:\s*(.*?)[.\n]`
ActionInputPattern = `Action Input:\s*(.*)`
FinalAnswerPattern = `Final Answer:(.*)`
)
func main() {}
@@ -76,7 +80,7 @@ func firstReq(ctx wrapper.HttpContext, config PluginConfig, prompt string, rawRe
rawRequest.Stream = false
}
//replace old message and resume request qwen
// replace old message and resume request qwen
newbody, err := json.Marshal(rawRequest)
if err != nil {
return types.ActionContinue
@@ -96,7 +100,7 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config PluginConfig, body []byte
log.Debug("onHttpRequestBody start")
defer log.Debug("onHttpRequestBody end")
//拿到请求
// 拿到请求
var rawRequest Request
err := json.Unmarshal(body, &rawRequest)
if err != nil {
@@ -105,7 +109,7 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config PluginConfig, body []byte
}
log.Debugf("onHttpRequestBody rawRequest: %v", rawRequest)
//获取用户query
// 获取用户query
var query string
var history string
messageLength := len(rawRequest.Messages)
@@ -129,7 +133,7 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config PluginConfig, body []byte
return types.ActionContinue
}
//拼装agent prompt模板
// 拼装agent prompt模板
tool_desc := make([]string, 0)
tool_names := make([]string, 0)
for _, apisParam := range config.APIsParam {
@@ -164,13 +168,13 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config PluginConfig, body []byte
ctx.SetContext(ToolCallsCount, 0)
//清理历史对话记录
// 清理历史对话记录
dashscope.MessageStore.Clear()
//将请求加入到历史对话存储器中
// 将请求加入到历史对话存储器中
dashscope.MessageStore.AddForUser(prompt)
//开始第一次请求
// 开始第一次请求
ret := firstReq(ctx, config, prompt, rawRequest, log)
return ret
@@ -221,7 +225,7 @@ func jsonFormat(llmClient wrapper.HttpClient, llmInfo LLMInfo, jsonSchema map[st
headers,
completionSerialized,
func(statusCode int, responseHeaders http.Header, responseBody []byte) {
//得到gpt的返回结果
// 得到gpt的返回结果
var responseCompletion dashscope.CompletionResponse
_ = json.Unmarshal(responseBody, &responseCompletion)
log.Infof("[jsonFormat] content: %s", responseCompletion.Choices[0].Message.Content)
@@ -299,7 +303,7 @@ func toolsCallResult(ctx wrapper.HttpContext, llmClient wrapper.HttpClient, llmI
headers,
completionSerialized,
func(statusCode int, responseHeaders http.Header, responseBody []byte) {
//得到gpt的返回结果
// 得到gpt的返回结果
var responseCompletion dashscope.CompletionResponse
_ = json.Unmarshal(responseBody, &responseCompletion)
log.Infof("[toolsCall] content: %s", responseCompletion.Choices[0].Message.Content)
@@ -307,7 +311,7 @@ func toolsCallResult(ctx wrapper.HttpContext, llmClient wrapper.HttpClient, llmI
if responseCompletion.Choices[0].Message.Content != "" {
retType, actionInput := toolsCall(ctx, llmClient, llmInfo, jsonResp, aPIsParam, aPIClient, responseCompletion.Choices[0].Message.Content, rawResponse, log)
if retType == types.ActionContinue {
//得到了Final Answer
// 得到了Final Answer
var assistantMessage Message
var streamMode bool
if ctx.GetContext(StreamContextKey) == nil {
@@ -388,14 +392,14 @@ func toolsCall(ctx wrapper.HttpContext, llmClient wrapper.HttpClient, llmInfo LL
action, actionInput := outputParser(content, log)
//得到最终答案
// 得到最终答案
if action == "Final Answer" {
return types.ActionContinue, actionInput
}
count := ctx.GetContext(ToolCallsCount).(int)
count++
log.Debugf("toolCallsCount:%d, config.LLMInfo.MaxIterations=%d", count, llmInfo.MaxIterations)
//函数递归调用次数,达到了预设的循环次数,强制结束
// 函数递归调用次数,达到了预设的循环次数,强制结束
if int64(count) > llmInfo.MaxIterations {
ctx.SetContext(ToolCallsCount, 0)
return types.ActionContinue, ""
@@ -403,7 +407,7 @@ func toolsCall(ctx wrapper.HttpContext, llmClient wrapper.HttpClient, llmInfo LL
ctx.SetContext(ToolCallsCount, count)
}
//没得到最终答案
// 没得到最终答案
var urlStr string
var headers [][2]string
@@ -419,7 +423,7 @@ func toolsCall(ctx wrapper.HttpContext, llmClient wrapper.HttpClient, llmInfo LL
log.Infof("calls %s", tools_param.ToolName)
log.Infof("actionInput: %s", actionInput)
//将大模型需要的参数反序列化
// 将大模型需要的参数反序列化
var data map[string]interface{}
if err := json.Unmarshal([]byte(actionInput), &data); err != nil {
log.Debugf("Error: %s", err.Error())
@@ -522,7 +526,7 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config PluginConfig, body []byt
log.Debugf("onHttpResponseBody start")
defer log.Debugf("onHttpResponseBody end")
//初始化接收gpt返回内容的结构体
// 初始化接收gpt返回内容的结构体
var rawResponse Response
err := json.Unmarshal(body, &rawResponse)
if err != nil {
@@ -530,9 +534,9 @@ func onHttpResponseBody(ctx wrapper.HttpContext, config PluginConfig, body []byt
return types.ActionContinue
}
log.Infof("first content: %s", rawResponse.Choices[0].Message.Content)
//如果gpt返回的内容不是空的
// 如果gpt返回的内容不是空的
if rawResponse.Choices[0].Message.Content != "" {
//进入agent的循环思考工具调用的过程中
// 进入agent的循环思考工具调用的过程中
retType, _ := toolsCall(ctx, config.LLMClient, config.LLMInfo, config.JsonResp, config.APIsParam, config.APIClient, rawResponse.Choices[0].Message.Content, rawResponse, log)
return retType
} else {

View File

@@ -28,7 +28,6 @@ func CheckCacheForKey(key string, ctx wrapper.HttpContext, c config.PluginConfig
err := activeCacheProvider.Get(queryKey, func(response resp.Value) {
handleCacheResponse(key, response, ctx, log, stream, c, useSimilaritySearch)
})
if err != nil {
log.Errorf("[%s] [CheckCacheForKey] failed to retrieve key: %s from cache, error: %v", PLUGIN_NAME, key, err)
return err

View File

@@ -83,7 +83,6 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, c config.PluginConfig, log lo
}
func onHttpRequestBody(ctx wrapper.HttpContext, c config.PluginConfig, body []byte, log log.Log) types.Action {
bodyJson := gjson.ParseBytes(body)
// TODO: It may be necessary to support stream mode determination for different LLM providers.
stream := false