feat: update Go filter mcp-server (#1950)

Co-authored-by: johnlanni <zty98751@alibaba-inc.com>
This commit is contained in:
Jingze
2025-03-26 14:31:23 +08:00
committed by GitHub
parent 87fe1aeeb5
commit f83e66c23b
9 changed files with 125 additions and 43 deletions

View File

@@ -159,16 +159,17 @@ build-pilot-local: prebuild
buildx-prepare: buildx-prepare:
docker buildx inspect multi-arch >/dev/null 2>&1 || docker buildx create --name multi-arch --platform linux/amd64,linux/arm64 --use docker buildx inspect multi-arch >/dev/null 2>&1 || docker buildx create --name multi-arch --platform linux/amd64,linux/arm64 --use
build-gateway: prebuild buildx-prepare build-gateway: prebuild buildx-prepare build-golang-filter
USE_REAL_USER=1 TARGET_ARCH=amd64 DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh init USE_REAL_USER=1 TARGET_ARCH=amd64 DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh init
USE_REAL_USER=1 TARGET_ARCH=arm64 DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh init USE_REAL_USER=1 TARGET_ARCH=arm64 DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh init
DOCKER_TARGETS="docker.proxyv2" IMG_URL="${IMG_URL}" ./tools/hack/build-istio-image.sh docker.buildx DOCKER_TARGETS="docker.proxyv2" IMG_URL="${IMG_URL}" ./tools/hack/build-istio-image.sh docker.buildx
build-gateway-local: prebuild build-gateway-local: prebuild build-golang-filter
TARGET_ARCH=${TARGET_ARCH} DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh docker TARGET_ARCH=${TARGET_ARCH} DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh docker
build-golang-filter: build-golang-filter:
./tools/hack/build-golang-filters.sh TARGET_ARCH=amd64 ./tools/hack/build-golang-filters.sh
TARGET_ARCH=arm64 ./tools/hack/build-golang-filters.sh
build-istio: prebuild buildx-prepare build-istio: prebuild buildx-prepare
DOCKER_TARGETS="docker.pilot" IMG_URL="${IMG_URL}" ./tools/hack/build-istio-image.sh docker.buildx DOCKER_TARGETS="docker.pilot" IMG_URL="${IMG_URL}" ./tools/hack/build-istio-image.sh docker.buildx
@@ -234,6 +235,8 @@ clean-gateway: clean-istio
rm -rf external/proxy rm -rf external/proxy
rm -rf external/go-control-plane rm -rf external/go-control-plane
rm -rf external/package/envoy.tar.gz rm -rf external/package/envoy.tar.gz
rm -rf external/package/mcp-server_amd64.so
rm -rf external/package/mcp-server_arm64.so
clean-env: clean-env:
rm -rf out/ rm -rf out/

View File

@@ -2,9 +2,23 @@ FROM golang:1.23-bullseye AS golang-base
ARG GOPROXY ARG GOPROXY
ARG GO_FILTER_NAME ARG GO_FILTER_NAME
ARG GOARCH
ENV GOFLAGS=-buildvcs=false ENV GOFLAGS=-buildvcs=false
ENV GOPROXY=${GOPROXY} ENV GOPROXY=${GOPROXY}
ENV GOARCH=${GOARCH}
ENV CGO_ENABLED=1
# 根据目标架构安装对应的编译工具
RUN if [ "$GOARCH" = "arm64" ]; then \
echo "Installing ARM64 toolchain" && \
apt-get update && \
apt-get install -y gcc-aarch64-linux-gnu binutils-aarch64-linux-gnu; \
else \
echo "Installing AMD64 toolchain" && \
apt-get update && \
apt-get install -y gcc binutils; \
fi
WORKDIR /workspace WORKDIR /workspace
@@ -13,8 +27,13 @@ COPY . .
WORKDIR /workspace/$GO_FILTER_NAME WORKDIR /workspace/$GO_FILTER_NAME
RUN go mod tidy RUN go mod tidy
RUN go build -o /$GO_FILTER_NAME.so -buildmode=c-shared . RUN if [ "$GOARCH" = "arm64" ]; then \
CC=aarch64-linux-gnu-gcc AS=aarch64-linux-gnu-as go build -o /$GO_FILTER_NAME.so -buildmode=c-shared .; \
else \
go build -o /$GO_FILTER_NAME.so -buildmode=c-shared .; \
fi
FROM scratch AS output FROM scratch AS output
ARG GO_FILTER_NAME ARG GO_FILTER_NAME
COPY --from=golang-base /$GO_FILTER_NAME.so $GO_FILTER_NAME.so ARG GOARCH
COPY --from=golang-base /${GO_FILTER_NAME}.so ${GO_FILTER_NAME}_${GOARCH}.so

View File

@@ -1,10 +1,12 @@
GO_FILTER_NAME ?= mcp-server GO_FILTER_NAME ?= mcp-server
GOPROXY := $(shell go env GOPROXY) GOPROXY := $(shell go env GOPROXY)
GOARCH ?= amd64
.DEFAULT: .DEFAULT:
build: build:
DOCKER_BUILDKIT=1 docker build --build-arg GOPROXY=$(GOPROXY) \ DOCKER_BUILDKIT=1 docker build --build-arg GOPROXY=$(GOPROXY) \
--build-arg GO_FILTER_NAME=${GO_FILTER_NAME} \ --build-arg GO_FILTER_NAME=${GO_FILTER_NAME} \
--build-arg GOARCH=${GOARCH} \
-t ${GO_FILTER_NAME} \ -t ${GO_FILTER_NAME} \
--output ./${GO_FILTER_NAME} \ --output ./${GO_FILTER_NAME} \
. .

View File

@@ -7,12 +7,14 @@ import (
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
"github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal" "github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal"
_ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/gorm" // 导入gorm包以执行其init函数 _ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/gorm"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api" "github.com/envoyproxy/envoy/contrib/golang/common/go/api"
envoyHttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http" envoyHttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"
) )
const Name = "mcp-server" const Name = "mcp-server"
const Version = "1.0.0"
const DefaultServerName = "default"
func init() { func init() {
envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{}) envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
@@ -23,6 +25,7 @@ type config struct {
redisClient *internal.RedisClient redisClient *internal.RedisClient
stopChan chan struct{} stopChan chan struct{}
servers []*internal.SSEServer servers []*internal.SSEServer
defaultServer *internal.SSEServer
} }
type parser struct { type parser struct {
@@ -113,6 +116,9 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
if childConfig.servers != nil { if childConfig.servers != nil {
newConfig.servers = append(newConfig.servers, childConfig.servers...) newConfig.servers = append(newConfig.servers, childConfig.servers...)
} }
if childConfig.defaultServer != nil {
newConfig.defaultServer = childConfig.defaultServer
}
return &newConfig return &newConfig
} }

View File

@@ -1,10 +1,13 @@
package main package main
import ( import (
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"strings"
"github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api" "github.com/envoyproxy/envoy/contrib/golang/common/go/api"
) )
@@ -18,40 +21,60 @@ type filter struct {
config *config config *config
req *http.Request req *http.Request
sse bool serverName string
message bool message bool
bodyBuffer []byte bodyBuffer []byte
} }
type RequestURL struct {
method string
scheme string
host string
path string
baseURL string
parsedURL *url.URL
}
func NewRequestURL(header api.RequestHeaderMap) *RequestURL {
method, _ := header.Get(":method")
scheme, _ := header.Get(":scheme")
host, _ := header.Get(":authority")
path, _ := header.Get(":path")
baseURL := fmt.Sprintf("%s://%s", scheme, host)
parsedURL, _ := url.Parse(path)
api.LogInfof("RequestURL: method=%s, scheme=%s, host=%s, path=%s", method, scheme, host, path)
return &RequestURL{method: method, scheme: scheme, host: host, path: path, baseURL: baseURL, parsedURL: parsedURL}
}
// Callbacks which are called in request path // Callbacks which are called in request path
// The endStream is true if the request doesn't have body // The endStream is true if the request doesn't have body
func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.StatusType { func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.StatusType {
fullPath, _ := header.Get(":path") url := NewRequestURL(header)
parsedURL, _ := url.Parse(fullPath) f.path = url.parsedURL.Path
f.path = parsedURL.Path
method, _ := header.Get(":method")
for _, server := range f.config.servers { for _, server := range f.config.servers {
if f.path == server.GetSSEEndpoint() { if f.path == server.GetSSEEndpoint() {
if method != http.MethodGet { if url.method != http.MethodGet {
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "") f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "")
} else { } else {
f.sse = true f.serverName = server.GetServerName()
body := "SSE connection create" body := "SSE connection create"
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusOK, body, nil, 0, "") f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusOK, body, nil, 0, "")
} }
api.LogInfof("%s SSE connection started", server.GetServerName()) api.LogInfof("%s SSE connection started", server.GetServerName())
server.SetBaseURL(url.baseURL)
return api.LocalReply return api.LocalReply
} else if f.path == server.GetMessageEndpoint() { } else if f.path == server.GetMessageEndpoint() {
if method != http.MethodPost { if url.method != http.MethodPost {
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "") f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "")
} }
// Create a new http.Request object // Create a new http.Request object
f.req = &http.Request{ f.req = &http.Request{
Method: method, Method: url.method,
URL: parsedURL, URL: url.parsedURL,
Header: make(http.Header), Header: make(http.Header),
} }
api.LogDebugf("Message request: %v", parsedURL) api.LogDebugf("Message request: %v", url.parsedURL)
// Copy headers from api.RequestHeaderMap to http.Header // Copy headers from api.RequestHeaderMap to http.Header
header.Range(func(key, value string) bool { header.Range(func(key, value string) bool {
f.req.Header.Add(key, value) f.req.Header.Add(key, value)
@@ -65,11 +88,27 @@ func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.
} }
} }
} }
if endStream { if !strings.HasSuffix(url.parsedURL.Path, f.config.ssePathSuffix) {
return api.Continue if endStream {
} else { return api.Continue
return api.StopAndBuffer } else {
return api.StopAndBuffer
}
} }
if url.method != http.MethodGet {
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "")
} else {
f.config.defaultServer = internal.NewSSEServer(internal.NewMCPServer(DefaultServerName, Version),
internal.WithSSEEndpoint(f.config.ssePathSuffix),
internal.WithMessageEndpoint(strings.TrimSuffix(url.parsedURL.Path, f.config.ssePathSuffix)),
internal.WithRedisClient(f.config.redisClient))
f.serverName = f.config.defaultServer.GetServerName()
body := "SSE connection create"
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusOK, body, nil, 0, "")
f.config.defaultServer.SetBaseURL(url.baseURL)
}
return api.LocalReply
} }
// DecodeData might be called multiple times during handling the request body. // DecodeData might be called multiple times during handling the request body.
@@ -101,7 +140,7 @@ func (f *filter) DecodeData(buffer api.BufferInstance, endStream bool) api.Statu
// Callbacks which are called in response path // Callbacks which are called in response path
// The endStream is true if the response doesn't have body // The endStream is true if the response doesn't have body
func (f *filter) EncodeHeaders(header api.ResponseHeaderMap, endStream bool) api.StatusType { func (f *filter) EncodeHeaders(header api.ResponseHeaderMap, endStream bool) api.StatusType {
if f.sse { if f.serverName != "" {
header.Set("Content-Type", "text/event-stream") header.Set("Content-Type", "text/event-stream")
header.Set("Cache-Control", "no-cache") header.Set("Cache-Control", "no-cache")
header.Set("Connection", "keep-alive") header.Set("Connection", "keep-alive")
@@ -115,21 +154,35 @@ func (f *filter) EncodeHeaders(header api.ResponseHeaderMap, endStream bool) api
// EncodeData might be called multiple times during handling the response body. // EncodeData might be called multiple times during handling the response body.
// The endStream is true when handling the last piece of the body. // The endStream is true when handling the last piece of the body.
func (f *filter) EncodeData(buffer api.BufferInstance, endStream bool) api.StatusType { func (f *filter) EncodeData(buffer api.BufferInstance, endStream bool) api.StatusType {
for _, server := range f.config.servers { if f.serverName != "" {
if f.sse { // handle specific server
for _, server := range f.config.servers {
if f.serverName == server.GetServerName() {
buffer.Reset()
server.HandleSSE(f.callbacks)
return api.Running
}
}
// handle default server
if f.serverName == f.config.defaultServer.GetServerName() {
buffer.Reset() buffer.Reset()
server.HandleSSE(f.callbacks) f.config.defaultServer.HandleSSE(f.callbacks)
f.sse = false
return api.Running return api.Running
} }
return api.Continue
} }
return api.Continue return api.Continue
} }
// OnDestroy 或 OnStreamComplete 中停止 goroutine // OnDestroy stops the goroutine
func (f *filter) OnDestroy(reason api.DestroyReason) { func (f *filter) OnDestroy(reason api.DestroyReason) {
if f.sse && f.config.stopChan != nil { if f.serverName != "" && f.config.stopChan != nil {
api.LogInfo("Stopping SSE connection") select {
close(f.config.stopChan) case <-f.config.stopChan:
return
default:
api.LogInfo("Stopping SSE connection")
close(f.config.stopChan)
}
} }
} }

View File

@@ -1,4 +1,3 @@
// Package server provides MCP (Model Control Protocol) server implementations.
package internal package internal
import ( import (

View File

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"sync"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api" "github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"github.com/google/uuid" "github.com/google/uuid"
@@ -17,10 +18,14 @@ type SSEServer struct {
baseURL string baseURL string
messageEndpoint string messageEndpoint string
sseEndpoint string sseEndpoint string
sessions map[string]bool sessions sync.Map
redisClient *RedisClient // Redis client for pub/sub redisClient *RedisClient // Redis client for pub/sub
} }
func (s *SSEServer) SetBaseURL(baseURL string) {
s.baseURL = baseURL
}
func (s *SSEServer) GetMessageEndpoint() string { func (s *SSEServer) GetMessageEndpoint() string {
return s.messageEndpoint return s.messageEndpoint
} }
@@ -69,7 +74,6 @@ func NewSSEServer(server *MCPServer, opts ...Option) *SSEServer {
server: server, server: server,
sseEndpoint: "/sse", sseEndpoint: "/sse",
messageEndpoint: "/message", messageEndpoint: "/message",
sessions: make(map[string]bool),
} }
// Apply all options // Apply all options
@@ -84,12 +88,8 @@ func NewSSEServer(server *MCPServer, opts ...Option) *SSEServer {
func (s *SSEServer) HandleSSE(cb api.FilterCallbackHandler) { func (s *SSEServer) HandleSSE(cb api.FilterCallbackHandler) {
sessionID := uuid.New().String() sessionID := uuid.New().String()
s.sessions[sessionID] = true s.sessions.Store(sessionID, true)
defer s.sessions.Delete(sessionID)
// sessionStore, _ := json.Marshal(s.sessions)
// TODO: sse:sessions?
// s.redisClient.Set("sse:sessions", string(sessionStore), 0)
defer delete(s.sessions, sessionID)
channel := fmt.Sprintf("sse:%s", sessionID) channel := fmt.Sprintf("sse:%s", sessionID)

View File

@@ -52,7 +52,7 @@ func (c *DBConfig) NewServer() (*internal.MCPServer, error) {
// Add query tool // Add query tool
mcpServer.AddTool( mcpServer.AddTool(
mcp.NewToolWithRawSchema("query", "Run a read-only SQL query in clickhouse database with repository git data", GetQueryToolSchema()), mcp.NewToolWithRawSchema("query", "Run a read-only SQL query in database", GetQueryToolSchema()),
HandleQueryTool(dbClient), HandleQueryTool(dbClient),
) )

View File

@@ -17,7 +17,7 @@
set -euo pipefail set -euo pipefail
INNER_GO_FILTER_NAME=${GO_FILTER_NAME-""} INNER_GO_FILTER_NAME=${GO_FILTER_NAME-""}
# OUTPUT_PACKAGE_DIR=${OUTPUT_PACKAGE_DIR:-"../external/package/"} OUTPUT_PACKAGE_DIR=${OUTPUT_PACKAGE_DIR:-"../../external/package/"}
cd ./plugins/golang-filter cd ./plugins/golang-filter
if [ ! -n "$INNER_GO_FILTER_NAME" ]; then if [ ! -n "$INNER_GO_FILTER_NAME" ]; then
@@ -28,8 +28,8 @@ if [ ! -n "$INNER_GO_FILTER_NAME" ]; then
if [ -d $GO_FILTERS_DIR/$file ]; then if [ -d $GO_FILTERS_DIR/$file ]; then
name=${file##*/} name=${file##*/}
echo "🚀 Build Go Filter: $name" echo "🚀 Build Go Filter: $name"
GO_FILTER_NAME=${name} make build GO_FILTER_NAME=${name} GOARCH=${TARGET_ARCH} make build
# cp ${GO_FILTERS_DIR}/${file}/${name}.so ${OUTPUT_PACKAGE_DIR} cp ${GO_FILTERS_DIR}/${file}/${name}_${TARGET_ARCH}.so ${OUTPUT_PACKAGE_DIR}
fi fi
done done
else else