mirror of
https://github.com/alibaba/higress.git
synced 2026-05-21 19:27:28 +08:00
fix: update log info to debug (#1954)
This commit is contained in:
65
plugins/golang-filter/mcp-server/README.md
Normal file
65
plugins/golang-filter/mcp-server/README.md
Normal file
@@ -0,0 +1,65 @@
|
||||
# MCP Server
|
||||
[English](./README_en.md) | 简体中文
|
||||
|
||||
## 概述
|
||||
|
||||
MCP Server 是一个基于 Envoy 的 Golang Filter 插件,用于实现服务器端事件(SSE)和消息通信功能。该插件支持多种数据库类型,并使用 Redis 作为消息队列来实现负载均衡的请求通过对应的SSE连接发送。
|
||||
|
||||
> **注意**:MCP Server需要 Higress 2.1.0 或更高版本才能使用。
|
||||
## 项目结构
|
||||
```
|
||||
mcp-server/
|
||||
├── config.go # 配置解析相关代码
|
||||
├── filter.go # 请求处理相关代码
|
||||
├── internal/ # 内部实现逻辑
|
||||
├── servers/ # MCP 服务器实现
|
||||
├── go.mod # Go模块依赖定义
|
||||
└── go.sum # Go模块依赖校验
|
||||
```
|
||||
## MCP Server开发指南
|
||||
|
||||
```go
|
||||
// 在init函数中注册你的服务器
|
||||
// 参数1: 服务器名称
|
||||
// 参数2: 配置结构体实例
|
||||
func init() {
|
||||
internal.GlobalRegistry.RegisterServer("demo", &DemoConfig{})
|
||||
}
|
||||
|
||||
// 服务器配置结构体
|
||||
type DemoConfig struct {
|
||||
helloworld string
|
||||
}
|
||||
|
||||
// 解析配置方法
|
||||
// 从配置map中解析并验证配置项
|
||||
func (c *DBConfig) ParseConfig(config map[string]any) error {
|
||||
helloworld, ok := config["helloworld"].(string)
|
||||
if !ok { return errors.New("missing helloworld")}
|
||||
c.helloworld = helloworld
|
||||
return nil
|
||||
}
|
||||
|
||||
// 创建新的MCP服务器实例
|
||||
// serverName: 服务器名称
|
||||
// 返回值: MCP服务器实例和可能的错误
|
||||
func (c *DBConfig) NewServer(serverName string) (*internal.MCPServer, error) {
|
||||
mcpServer := internal.NewMCPServer(serverName, Version)
|
||||
|
||||
// 添加工具方法到服务器
|
||||
// mcpServer.AddTool()
|
||||
|
||||
// 添加资源到服务器
|
||||
// mcpServer.AddResource()
|
||||
|
||||
return mcpServer, nil
|
||||
}
|
||||
```
|
||||
|
||||
**Note**:
|
||||
需要在config.go里面使用下划线导入以执行包的init函数
|
||||
```go
|
||||
import (
|
||||
_ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/gorm"
|
||||
)
|
||||
```
|
||||
67
plugins/golang-filter/mcp-server/README_en.md
Normal file
67
plugins/golang-filter/mcp-server/README_en.md
Normal file
@@ -0,0 +1,67 @@
|
||||
# MCP Server
|
||||
English | [简体中文](./README.md)
|
||||
|
||||
## Overview
|
||||
|
||||
MCP Server is a Golang Filter plugin based on Envoy, designed to implement Server-Sent Events (SSE) and message communication functionality. This plugin supports various database types and uses Redis as a message queue to enable load-balanced requests to be sent through corresponding SSE connections.
|
||||
|
||||
> **Note**: MCP Server requires Higress 2.1.0 or higher version.
|
||||
|
||||
## Project Structure
|
||||
```
|
||||
mcp-server/
|
||||
├── config.go # Configuration parsing code
|
||||
├── filter.go # Request processing code
|
||||
├── internal/ # Internal implementation logic
|
||||
├── servers/ # MCP server implementation
|
||||
├── go.mod # Go module dependency definition
|
||||
└── go.sum # Go module dependency checksum
|
||||
```
|
||||
|
||||
## MCP Server Development Guide
|
||||
|
||||
```go
|
||||
// Register your server in the init function
|
||||
// Param 1: Server name
|
||||
// Param 2: Config struct instance
|
||||
func init() {
|
||||
internal.GlobalRegistry.RegisterServer("demo", &DemoConfig{})
|
||||
}
|
||||
|
||||
// Server configuration struct
|
||||
type DemoConfig struct {
|
||||
helloworld string
|
||||
}
|
||||
|
||||
// Configuration parsing method
|
||||
// Parse and validate configuration items from the config map
|
||||
func (c *DBConfig) ParseConfig(config map[string]any) error {
|
||||
helloworld, ok := config["helloworld"].(string)
|
||||
if !ok { return errors.New("missing helloworld")}
|
||||
c.helloworld = helloworld
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create a new MCP server instance
|
||||
// serverName: Server name
|
||||
// Returns: MCP server instance and possible error
|
||||
func (c *DBConfig) NewServer(serverName string) (*internal.MCPServer, error) {
|
||||
mcpServer := internal.NewMCPServer(serverName, Version)
|
||||
|
||||
// Add tool methods to server
|
||||
// mcpServer.AddTool()
|
||||
|
||||
// Add resources to server
|
||||
// mcpServer.AddResource()
|
||||
|
||||
return mcpServer, nil
|
||||
}
|
||||
```
|
||||
|
||||
**Note**:
|
||||
Need to use underscore import in config.go to execute the package's init function
|
||||
```go
|
||||
import (
|
||||
_ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/gorm"
|
||||
)
|
||||
```
|
||||
@@ -14,7 +14,8 @@ import (
|
||||
|
||||
const Name = "mcp-server"
|
||||
const Version = "1.0.0"
|
||||
const DefaultServerName = "default"
|
||||
const DefaultServerName = "defaultServer"
|
||||
const MessageEndpoint = "/message"
|
||||
|
||||
func init() {
|
||||
envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
|
||||
@@ -66,7 +67,7 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int
|
||||
|
||||
serverConfigs, ok := v.AsMap()["servers"].([]interface{})
|
||||
if !ok {
|
||||
api.LogInfo("No servers are configured")
|
||||
api.LogDebug("No servers are configured")
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
@@ -83,21 +84,36 @@ func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (int
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("server %s path is not set", serverType)
|
||||
}
|
||||
serverName, ok := serverConfigMap["name"].(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("server %s name is not set", serverType)
|
||||
}
|
||||
server := internal.GlobalRegistry.GetServer(serverType)
|
||||
|
||||
if server == nil {
|
||||
return nil, fmt.Errorf("server %s is not registered", serverType)
|
||||
}
|
||||
server.ParseConfig(serverConfigMap)
|
||||
serverInstance, err := server.NewServer()
|
||||
serverConfig, ok := serverConfigMap["config"].(map[string]interface{})
|
||||
if !ok {
|
||||
api.LogDebug(fmt.Sprintf("No config provided for server %s", serverType))
|
||||
}
|
||||
api.LogDebug(fmt.Sprintf("Server config: %+v", serverConfig))
|
||||
|
||||
err = server.ParseConfig(serverConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse server config: %w", err)
|
||||
}
|
||||
|
||||
serverInstance, err := server.NewServer(serverName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize DBServer: %w", err)
|
||||
}
|
||||
|
||||
conf.servers = append(conf.servers, internal.NewSSEServer(serverInstance,
|
||||
internal.WithRedisClient(redisClient),
|
||||
internal.WithSSEEndpoint(fmt.Sprintf("%s%s", serverPath, ssePathSuffix)),
|
||||
internal.WithMessageEndpoint(serverPath)))
|
||||
api.LogInfo(fmt.Sprintf("Registered MCP Server: %s", serverType))
|
||||
internal.WithMessageEndpoint(fmt.Sprintf("%s%s", serverPath, MessageEndpoint))))
|
||||
api.LogDebug(fmt.Sprintf("Registered MCP Server: %s", serverType))
|
||||
}
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ func NewRequestURL(header api.RequestHeaderMap) *RequestURL {
|
||||
path, _ := header.Get(":path")
|
||||
baseURL := fmt.Sprintf("%s://%s", scheme, host)
|
||||
parsedURL, _ := url.Parse(path)
|
||||
api.LogInfof("RequestURL: method=%s, scheme=%s, host=%s, path=%s", method, scheme, host, path)
|
||||
api.LogDebugf("RequestURL: method=%s, scheme=%s, host=%s, path=%s", method, scheme, host, path)
|
||||
return &RequestURL{method: method, scheme: scheme, host: host, path: path, baseURL: baseURL, parsedURL: parsedURL}
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.
|
||||
body := "SSE connection create"
|
||||
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusOK, body, nil, 0, "")
|
||||
}
|
||||
api.LogInfof("%s SSE connection started", server.GetServerName())
|
||||
api.LogDebugf("%s SSE connection started", server.GetServerName())
|
||||
server.SetBaseURL(url.baseURL)
|
||||
return api.LocalReply
|
||||
} else if f.path == server.GetMessageEndpoint() {
|
||||
@@ -181,7 +181,7 @@ func (f *filter) OnDestroy(reason api.DestroyReason) {
|
||||
case <-f.config.stopChan:
|
||||
return
|
||||
default:
|
||||
api.LogInfo("Stopping SSE connection")
|
||||
api.LogDebug("Stopping SSE connection")
|
||||
close(f.config.stopChan)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ func NewRedisClient(config *RedisConfig, stopChan chan struct{}) (*RedisClient,
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
|
||||
}
|
||||
api.LogInfof("Connected to Redis: %s", pong)
|
||||
api.LogDebugf("Connected to Redis: %s", pong)
|
||||
|
||||
redisClient := &RedisClient{
|
||||
client: client,
|
||||
@@ -127,7 +127,7 @@ func (r *RedisClient) reconnect() error {
|
||||
return fmt.Errorf("failed to reconnect to Redis: %w", err)
|
||||
}
|
||||
|
||||
api.LogInfof("Successfully reconnected to Redis")
|
||||
api.LogDebugf("Successfully reconnected to Redis")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -151,18 +151,18 @@ func (r *RedisClient) Subscribe(channel string, callback func(message string)) e
|
||||
go func() {
|
||||
defer func() {
|
||||
pubsub.Close()
|
||||
api.LogInfof("Closed subscription to channel %s", channel)
|
||||
api.LogDebugf("Closed subscription to channel %s", channel)
|
||||
}()
|
||||
|
||||
ch := pubsub.Channel()
|
||||
for {
|
||||
select {
|
||||
case <-r.stopChan:
|
||||
api.LogInfof("Stopping subscription to channel %s", channel)
|
||||
api.LogDebugf("Stopping subscription to channel %s", channel)
|
||||
return
|
||||
case msg, ok := <-ch:
|
||||
if !ok {
|
||||
api.LogInfof("Redis subscription channel closed for %s", channel)
|
||||
api.LogDebugf("Redis subscription channel closed for %s", channel)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ var GlobalRegistry = NewServerRegistry()
|
||||
|
||||
type Server interface {
|
||||
ParseConfig(config map[string]any) error
|
||||
NewServer() (*MCPServer, error)
|
||||
NewServer(serverName string) (*MCPServer, error)
|
||||
}
|
||||
|
||||
type ServerRegistry struct {
|
||||
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
)
|
||||
|
||||
@@ -279,7 +278,7 @@ func (s *MCPServer) HandleMessage(
|
||||
s.handleNotification(ctx, notification)
|
||||
return nil // Return nil for notifications
|
||||
}
|
||||
api.LogInfof("HandleMessage: %s", baseMessage.Method)
|
||||
|
||||
switch baseMessage.Method {
|
||||
case "initialize":
|
||||
var request mcp.InitializeRequest
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
|
||||
"github.com/google/uuid"
|
||||
@@ -126,7 +127,7 @@ func (s *SSEServer) HandleSSE(cb api.FilterCallbackHandler) {
|
||||
|
||||
err := s.redisClient.Subscribe(channel, func(message string) {
|
||||
defer cb.EncoderFilterCallbacks().RecoverPanic()
|
||||
api.LogInfof("SSE Send message: %s", message)
|
||||
api.LogDebugf("SSE Send message: %s", message)
|
||||
cb.EncoderFilterCallbacks().InjectData([]byte(message))
|
||||
})
|
||||
if err != nil {
|
||||
@@ -135,7 +136,29 @@ func (s *SSEServer) HandleSSE(cb api.FilterCallbackHandler) {
|
||||
|
||||
// Send the initial endpoint event
|
||||
initialEvent := fmt.Sprintf("event: endpoint\ndata: %s\r\n\r\n", messageEndpoint)
|
||||
s.redisClient.Publish(channel, initialEvent)
|
||||
err = s.redisClient.Publish(channel, initialEvent)
|
||||
if err != nil {
|
||||
api.LogErrorf("Failed to send initial event: %v", err)
|
||||
}
|
||||
|
||||
// Start health check handler
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.redisClient.stopChan:
|
||||
return
|
||||
case <-ticker.C:
|
||||
// Send health check message
|
||||
healthCheckEvent := "event: health_check\ndata: ping\r\n\r\n"
|
||||
if err := s.redisClient.Publish(channel, healthCheckEvent); err != nil {
|
||||
api.LogErrorf("Failed to send health check: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// handleMessage processes incoming JSON-RPC messages from clients and sends responses
|
||||
|
||||
@@ -28,7 +28,7 @@ func NewDBClient(dsn string, dbType string) (*DBClient, error) {
|
||||
} else if dbType == "sqlite" {
|
||||
db, err = gorm.Open(sqlite.Open(dsn), &gorm.Config{})
|
||||
} else {
|
||||
return nil, fmt.Errorf("unsupported database type")
|
||||
return nil, fmt.Errorf("unsupported database type %s", dbType)
|
||||
}
|
||||
// Connect to the database
|
||||
if err != nil {
|
||||
|
||||
@@ -5,26 +5,22 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal"
|
||||
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
)
|
||||
|
||||
const Version = "1.0.0"
|
||||
|
||||
func init() {
|
||||
internal.GlobalRegistry.RegisterServer("database", &DBConfig{})
|
||||
}
|
||||
|
||||
type DBConfig struct {
|
||||
name string
|
||||
dbType string
|
||||
dsn string
|
||||
}
|
||||
|
||||
func (c *DBConfig) ParseConfig(config map[string]any) error {
|
||||
name, ok := config["name"].(string)
|
||||
if !ok {
|
||||
return errors.New("missing servername")
|
||||
}
|
||||
c.name = name
|
||||
|
||||
dsn, ok := config["dsn"].(string)
|
||||
if !ok {
|
||||
return errors.New("missing dsn")
|
||||
@@ -36,13 +32,15 @@ func (c *DBConfig) ParseConfig(config map[string]any) error {
|
||||
return errors.New("missing database type")
|
||||
}
|
||||
c.dbType = dbType
|
||||
api.LogDebugf("DBConfig ParseConfig: %+v", config)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *DBConfig) NewServer() (*internal.MCPServer, error) {
|
||||
func (c *DBConfig) NewServer(serverName string) (*internal.MCPServer, error) {
|
||||
mcpServer := internal.NewMCPServer(
|
||||
c.name,
|
||||
"1.0.0",
|
||||
serverName,
|
||||
Version,
|
||||
internal.WithInstructions(fmt.Sprintf("This is a %s database server", c.dbType)),
|
||||
)
|
||||
|
||||
dbClient, err := NewDBClient(c.dsn, c.dbType)
|
||||
|
||||
Reference in New Issue
Block a user