diff --git a/Makefile.core.mk b/Makefile.core.mk index aaffc67d6..f28862086 100644 --- a/Makefile.core.mk +++ b/Makefile.core.mk @@ -159,16 +159,17 @@ build-pilot-local: prebuild buildx-prepare: docker buildx inspect multi-arch >/dev/null 2>&1 || docker buildx create --name multi-arch --platform linux/amd64,linux/arm64 --use -build-gateway: prebuild buildx-prepare +build-gateway: prebuild buildx-prepare build-golang-filter USE_REAL_USER=1 TARGET_ARCH=amd64 DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh init USE_REAL_USER=1 TARGET_ARCH=arm64 DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh init DOCKER_TARGETS="docker.proxyv2" IMG_URL="${IMG_URL}" ./tools/hack/build-istio-image.sh docker.buildx -build-gateway-local: prebuild +build-gateway-local: prebuild build-golang-filter TARGET_ARCH=${TARGET_ARCH} DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh docker build-golang-filter: - ./tools/hack/build-golang-filters.sh + TARGET_ARCH=amd64 ./tools/hack/build-golang-filters.sh + TARGET_ARCH=arm64 ./tools/hack/build-golang-filters.sh build-istio: prebuild buildx-prepare DOCKER_TARGETS="docker.pilot" IMG_URL="${IMG_URL}" ./tools/hack/build-istio-image.sh docker.buildx @@ -234,6 +235,8 @@ clean-gateway: clean-istio rm -rf external/proxy rm -rf external/go-control-plane rm -rf external/package/envoy.tar.gz + rm -rf external/package/mcp-server_amd64.so + rm -rf external/package/mcp-server_arm64.so clean-env: rm -rf out/ diff --git a/plugins/golang-filter/Dockerfile b/plugins/golang-filter/Dockerfile index 742c9f481..1daf28c42 100644 --- a/plugins/golang-filter/Dockerfile +++ b/plugins/golang-filter/Dockerfile @@ -2,9 +2,23 @@ FROM golang:1.23-bullseye AS golang-base ARG GOPROXY ARG GO_FILTER_NAME +ARG GOARCH ENV GOFLAGS=-buildvcs=false ENV GOPROXY=${GOPROXY} +ENV GOARCH=${GOARCH} +ENV CGO_ENABLED=1 + +# 根据目标架构安装对应的编译工具 +RUN if [ "$GOARCH" = "arm64" ]; then \ + echo "Installing ARM64 toolchain" && \ + apt-get update && \ + apt-get install -y gcc-aarch64-linux-gnu binutils-aarch64-linux-gnu; \ + else \ + echo "Installing AMD64 toolchain" && \ + apt-get update && \ + apt-get install -y gcc binutils; \ + fi WORKDIR /workspace @@ -13,8 +27,13 @@ COPY . . WORKDIR /workspace/$GO_FILTER_NAME RUN go mod tidy -RUN go build -o /$GO_FILTER_NAME.so -buildmode=c-shared . +RUN if [ "$GOARCH" = "arm64" ]; then \ + CC=aarch64-linux-gnu-gcc AS=aarch64-linux-gnu-as go build -o /$GO_FILTER_NAME.so -buildmode=c-shared .; \ + else \ + go build -o /$GO_FILTER_NAME.so -buildmode=c-shared .; \ + fi FROM scratch AS output ARG GO_FILTER_NAME -COPY --from=golang-base /$GO_FILTER_NAME.so $GO_FILTER_NAME.so \ No newline at end of file +ARG GOARCH +COPY --from=golang-base /${GO_FILTER_NAME}.so ${GO_FILTER_NAME}_${GOARCH}.so \ No newline at end of file diff --git a/plugins/golang-filter/Makefile b/plugins/golang-filter/Makefile index 639ef6dbc..ca4367440 100644 --- a/plugins/golang-filter/Makefile +++ b/plugins/golang-filter/Makefile @@ -1,10 +1,12 @@ GO_FILTER_NAME ?= mcp-server GOPROXY := $(shell go env GOPROXY) +GOARCH ?= amd64 .DEFAULT: build: DOCKER_BUILDKIT=1 docker build --build-arg GOPROXY=$(GOPROXY) \ --build-arg GO_FILTER_NAME=${GO_FILTER_NAME} \ + --build-arg GOARCH=${GOARCH} \ -t ${GO_FILTER_NAME} \ --output ./${GO_FILTER_NAME} \ . \ No newline at end of file diff --git a/plugins/golang-filter/mcp-server/config.go b/plugins/golang-filter/mcp-server/config.go index 9683ef85c..ba7ffa32d 100644 --- a/plugins/golang-filter/mcp-server/config.go +++ b/plugins/golang-filter/mcp-server/config.go @@ -7,12 +7,14 @@ import ( "google.golang.org/protobuf/types/known/anypb" "github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal" - _ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/gorm" // 导入gorm包以执行其init函数 + _ "github.com/alibaba/higress/plugins/golang-filter/mcp-server/servers/gorm" "github.com/envoyproxy/envoy/contrib/golang/common/go/api" envoyHttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http" ) const Name = "mcp-server" +const Version = "1.0.0" +const DefaultServerName = "default" func init() { envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{}) @@ -23,6 +25,7 @@ type config struct { redisClient *internal.RedisClient stopChan chan struct{} servers []*internal.SSEServer + defaultServer *internal.SSEServer } type parser struct { @@ -113,6 +116,9 @@ func (p *parser) Merge(parent interface{}, child interface{}) interface{} { if childConfig.servers != nil { newConfig.servers = append(newConfig.servers, childConfig.servers...) } + if childConfig.defaultServer != nil { + newConfig.defaultServer = childConfig.defaultServer + } return &newConfig } diff --git a/plugins/golang-filter/mcp-server/filter.go b/plugins/golang-filter/mcp-server/filter.go index bc17db2df..fe5ac4bed 100644 --- a/plugins/golang-filter/mcp-server/filter.go +++ b/plugins/golang-filter/mcp-server/filter.go @@ -1,10 +1,13 @@ package main import ( + "fmt" "net/http" "net/http/httptest" "net/url" + "strings" + "github.com/alibaba/higress/plugins/golang-filter/mcp-server/internal" "github.com/envoyproxy/envoy/contrib/golang/common/go/api" ) @@ -18,40 +21,60 @@ type filter struct { config *config req *http.Request - sse bool + serverName string message bool bodyBuffer []byte } +type RequestURL struct { + method string + scheme string + host string + path string + baseURL string + parsedURL *url.URL +} + +func NewRequestURL(header api.RequestHeaderMap) *RequestURL { + method, _ := header.Get(":method") + scheme, _ := header.Get(":scheme") + host, _ := header.Get(":authority") + path, _ := header.Get(":path") + baseURL := fmt.Sprintf("%s://%s", scheme, host) + parsedURL, _ := url.Parse(path) + api.LogInfof("RequestURL: method=%s, scheme=%s, host=%s, path=%s", method, scheme, host, path) + return &RequestURL{method: method, scheme: scheme, host: host, path: path, baseURL: baseURL, parsedURL: parsedURL} +} + // Callbacks which are called in request path // The endStream is true if the request doesn't have body func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.StatusType { - fullPath, _ := header.Get(":path") - parsedURL, _ := url.Parse(fullPath) - f.path = parsedURL.Path - method, _ := header.Get(":method") + url := NewRequestURL(header) + f.path = url.parsedURL.Path + for _, server := range f.config.servers { if f.path == server.GetSSEEndpoint() { - if method != http.MethodGet { + if url.method != http.MethodGet { f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "") } else { - f.sse = true + f.serverName = server.GetServerName() body := "SSE connection create" f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusOK, body, nil, 0, "") } api.LogInfof("%s SSE connection started", server.GetServerName()) + server.SetBaseURL(url.baseURL) return api.LocalReply } else if f.path == server.GetMessageEndpoint() { - if method != http.MethodPost { + if url.method != http.MethodPost { f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "") } // Create a new http.Request object f.req = &http.Request{ - Method: method, - URL: parsedURL, + Method: url.method, + URL: url.parsedURL, Header: make(http.Header), } - api.LogDebugf("Message request: %v", parsedURL) + api.LogDebugf("Message request: %v", url.parsedURL) // Copy headers from api.RequestHeaderMap to http.Header header.Range(func(key, value string) bool { f.req.Header.Add(key, value) @@ -65,11 +88,27 @@ func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api. } } } - if endStream { - return api.Continue - } else { - return api.StopAndBuffer + if !strings.HasSuffix(url.parsedURL.Path, f.config.ssePathSuffix) { + if endStream { + return api.Continue + } else { + return api.StopAndBuffer + } } + + if url.method != http.MethodGet { + f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "") + } else { + f.config.defaultServer = internal.NewSSEServer(internal.NewMCPServer(DefaultServerName, Version), + internal.WithSSEEndpoint(f.config.ssePathSuffix), + internal.WithMessageEndpoint(strings.TrimSuffix(url.parsedURL.Path, f.config.ssePathSuffix)), + internal.WithRedisClient(f.config.redisClient)) + f.serverName = f.config.defaultServer.GetServerName() + body := "SSE connection create" + f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusOK, body, nil, 0, "") + f.config.defaultServer.SetBaseURL(url.baseURL) + } + return api.LocalReply } // DecodeData might be called multiple times during handling the request body. @@ -101,7 +140,7 @@ func (f *filter) DecodeData(buffer api.BufferInstance, endStream bool) api.Statu // Callbacks which are called in response path // The endStream is true if the response doesn't have body func (f *filter) EncodeHeaders(header api.ResponseHeaderMap, endStream bool) api.StatusType { - if f.sse { + if f.serverName != "" { header.Set("Content-Type", "text/event-stream") header.Set("Cache-Control", "no-cache") header.Set("Connection", "keep-alive") @@ -115,21 +154,35 @@ func (f *filter) EncodeHeaders(header api.ResponseHeaderMap, endStream bool) api // EncodeData might be called multiple times during handling the response body. // The endStream is true when handling the last piece of the body. func (f *filter) EncodeData(buffer api.BufferInstance, endStream bool) api.StatusType { - for _, server := range f.config.servers { - if f.sse { + if f.serverName != "" { + // handle specific server + for _, server := range f.config.servers { + if f.serverName == server.GetServerName() { + buffer.Reset() + server.HandleSSE(f.callbacks) + return api.Running + } + } + // handle default server + if f.serverName == f.config.defaultServer.GetServerName() { buffer.Reset() - server.HandleSSE(f.callbacks) - f.sse = false + f.config.defaultServer.HandleSSE(f.callbacks) return api.Running } + return api.Continue } return api.Continue } -// OnDestroy 或 OnStreamComplete 中停止 goroutine +// OnDestroy stops the goroutine func (f *filter) OnDestroy(reason api.DestroyReason) { - if f.sse && f.config.stopChan != nil { - api.LogInfo("Stopping SSE connection") - close(f.config.stopChan) + if f.serverName != "" && f.config.stopChan != nil { + select { + case <-f.config.stopChan: + return + default: + api.LogInfo("Stopping SSE connection") + close(f.config.stopChan) + } } } diff --git a/plugins/golang-filter/mcp-server/internal/server.go b/plugins/golang-filter/mcp-server/internal/server.go index 4728becd4..2c980d4b0 100644 --- a/plugins/golang-filter/mcp-server/internal/server.go +++ b/plugins/golang-filter/mcp-server/internal/server.go @@ -1,4 +1,3 @@ -// Package server provides MCP (Model Control Protocol) server implementations. package internal import ( diff --git a/plugins/golang-filter/mcp-server/internal/sse.go b/plugins/golang-filter/mcp-server/internal/sse.go index c50fc41ca..cca4c9fed 100644 --- a/plugins/golang-filter/mcp-server/internal/sse.go +++ b/plugins/golang-filter/mcp-server/internal/sse.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net/http" + "sync" "github.com/envoyproxy/envoy/contrib/golang/common/go/api" "github.com/google/uuid" @@ -17,10 +18,14 @@ type SSEServer struct { baseURL string messageEndpoint string sseEndpoint string - sessions map[string]bool + sessions sync.Map redisClient *RedisClient // Redis client for pub/sub } +func (s *SSEServer) SetBaseURL(baseURL string) { + s.baseURL = baseURL +} + func (s *SSEServer) GetMessageEndpoint() string { return s.messageEndpoint } @@ -69,7 +74,6 @@ func NewSSEServer(server *MCPServer, opts ...Option) *SSEServer { server: server, sseEndpoint: "/sse", messageEndpoint: "/message", - sessions: make(map[string]bool), } // Apply all options @@ -84,12 +88,8 @@ func NewSSEServer(server *MCPServer, opts ...Option) *SSEServer { func (s *SSEServer) HandleSSE(cb api.FilterCallbackHandler) { sessionID := uuid.New().String() - s.sessions[sessionID] = true - - // sessionStore, _ := json.Marshal(s.sessions) - // TODO: sse:sessions? - // s.redisClient.Set("sse:sessions", string(sessionStore), 0) - defer delete(s.sessions, sessionID) + s.sessions.Store(sessionID, true) + defer s.sessions.Delete(sessionID) channel := fmt.Sprintf("sse:%s", sessionID) diff --git a/plugins/golang-filter/mcp-server/servers/gorm/server.go b/plugins/golang-filter/mcp-server/servers/gorm/server.go index 3d8f50d9d..76f34a907 100644 --- a/plugins/golang-filter/mcp-server/servers/gorm/server.go +++ b/plugins/golang-filter/mcp-server/servers/gorm/server.go @@ -52,7 +52,7 @@ func (c *DBConfig) NewServer() (*internal.MCPServer, error) { // Add query tool mcpServer.AddTool( - mcp.NewToolWithRawSchema("query", "Run a read-only SQL query in clickhouse database with repository git data", GetQueryToolSchema()), + mcp.NewToolWithRawSchema("query", "Run a read-only SQL query in database", GetQueryToolSchema()), HandleQueryTool(dbClient), ) diff --git a/tools/hack/build-golang-filters.sh b/tools/hack/build-golang-filters.sh index 4a63e2ea1..a9adae024 100755 --- a/tools/hack/build-golang-filters.sh +++ b/tools/hack/build-golang-filters.sh @@ -17,7 +17,7 @@ set -euo pipefail INNER_GO_FILTER_NAME=${GO_FILTER_NAME-""} -# OUTPUT_PACKAGE_DIR=${OUTPUT_PACKAGE_DIR:-"../external/package/"} +OUTPUT_PACKAGE_DIR=${OUTPUT_PACKAGE_DIR:-"../../external/package/"} cd ./plugins/golang-filter if [ ! -n "$INNER_GO_FILTER_NAME" ]; then @@ -28,8 +28,8 @@ if [ ! -n "$INNER_GO_FILTER_NAME" ]; then if [ -d $GO_FILTERS_DIR/$file ]; then name=${file##*/} echo "🚀 Build Go Filter: $name" - GO_FILTER_NAME=${name} make build - # cp ${GO_FILTERS_DIR}/${file}/${name}.so ${OUTPUT_PACKAGE_DIR} + GO_FILTER_NAME=${name} GOARCH=${TARGET_ARCH} make build + cp ${GO_FILTERS_DIR}/${file}/${name}_${TARGET_ARCH}.so ${OUTPUT_PACKAGE_DIR} fi done else