diff --git a/Makefile.core.mk b/Makefile.core.mk index 2d84c0b11..aaffc67d6 100644 --- a/Makefile.core.mk +++ b/Makefile.core.mk @@ -167,6 +167,9 @@ build-gateway: prebuild buildx-prepare build-gateway-local: prebuild TARGET_ARCH=${TARGET_ARCH} DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh docker +build-golang-filter: + ./tools/hack/build-golang-filters.sh + build-istio: prebuild buildx-prepare DOCKER_TARGETS="docker.pilot" IMG_URL="${IMG_URL}" ./tools/hack/build-istio-image.sh docker.buildx diff --git a/plugins/golang-filter/Dockerfile b/plugins/golang-filter/Dockerfile new file mode 100644 index 000000000..1f08417ae --- /dev/null +++ b/plugins/golang-filter/Dockerfile @@ -0,0 +1,20 @@ +FROM golang:1.23 AS golang-base + +ARG GOPROXY +ARG GO_FILTER_NAME + +ENV GOFLAGS=-buildvcs=false +ENV GOPROXY=${GOPROXY} + +WORKDIR /workspace + +COPY . . + +WORKDIR /workspace/$GO_FILTER_NAME + +RUN go mod tidy +RUN go build -o /$GO_FILTER_NAME.so -buildmode=c-shared . + +FROM scratch AS output +ARG GO_FILTER_NAME +COPY --from=golang-base /$GO_FILTER_NAME.so $GO_FILTER_NAME.so \ No newline at end of file diff --git a/plugins/golang-filter/Makefile b/plugins/golang-filter/Makefile new file mode 100644 index 000000000..639ef6dbc --- /dev/null +++ b/plugins/golang-filter/Makefile @@ -0,0 +1,10 @@ +GO_FILTER_NAME ?= mcp-server +GOPROXY := $(shell go env GOPROXY) + +.DEFAULT: +build: + DOCKER_BUILDKIT=1 docker build --build-arg GOPROXY=$(GOPROXY) \ + --build-arg GO_FILTER_NAME=${GO_FILTER_NAME} \ + -t ${GO_FILTER_NAME} \ + --output ./${GO_FILTER_NAME} \ + . \ No newline at end of file diff --git a/plugins/golang-filter/README.md b/plugins/golang-filter/README.md new file mode 100644 index 000000000..537820b8b --- /dev/null +++ b/plugins/golang-filter/README.md @@ -0,0 +1,9 @@ +## 介绍 + +## 快速构建 + +使用以下命令可以快速构建 golang filter 插件: + +```bash +GO_FILTER_NAME=mcp-server make build +``` \ No newline at end of file diff --git a/plugins/golang-filter/mcp-server/config.go b/plugins/golang-filter/mcp-server/config.go new file mode 100644 index 000000000..f8adbc955 --- /dev/null +++ b/plugins/golang-filter/mcp-server/config.go @@ -0,0 +1,122 @@ +package main + +import ( + "errors" + "fmt" + + xds "github.com/cncf/xds/go/xds/type/v3" + "github.com/mark3labs/mcp-go/mcp" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + envoyHttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http" + "github.com/envoyproxy/envoy/examples/golang-http/simple/internal" + "github.com/envoyproxy/envoy/examples/golang-http/simple/servers/gorm" +) + +const Name = "mcp-server" +const SCHEME_PATH = "scheme" + +func init() { + envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{}) +} + +type config struct { + echoBody string + // other fields + dbClient *gorm.DBClient + redisClient *internal.RedisClient + stopChan chan struct{} + SSEServer *internal.SSEServer +} + +type parser struct { +} + +// Parse the filter configuration +func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) { + configStruct := &xds.TypedStruct{} + if err := any.UnmarshalTo(configStruct); err != nil { + return nil, err + } + + v := configStruct.Value + conf := &config{} + + dsn, ok := v.AsMap()["dsn"].(string) + if !ok { + return nil, errors.New("missing dsn") + } + + dbType, ok := v.AsMap()["dbType"].(string) + if !ok { + return nil, errors.New("missing database type") + } + + dbClient, err := gorm.NewDBClient(dsn, dbType) + if err != nil { + return nil, fmt.Errorf("failed to initialize DBClient: %w", err) + } + conf.dbClient = dbClient + + conf.stopChan = make(chan struct{}) + redisClient, err := internal.NewRedisClient("localhost:6379", conf.stopChan) + if err != nil { + return nil, fmt.Errorf("failed to initialize RedisClient: %w", err) + } + conf.redisClient = redisClient + + conf.SSEServer = internal.NewSSEServer(NewServer(conf.dbClient), internal.WithRedisClient(redisClient)) + return conf, nil +} + +func (p *parser) Merge(parent interface{}, child interface{}) interface{} { + parentConfig := parent.(*config) + childConfig := child.(*config) + + newConfig := *parentConfig + if childConfig.echoBody != "" { + newConfig.echoBody = childConfig.echoBody + } + if childConfig.dbClient != nil { + newConfig.dbClient = childConfig.dbClient + } + if childConfig.redisClient != nil { + newConfig.redisClient = childConfig.redisClient + } + return &newConfig +} + +func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter { + conf, ok := c.(*config) + if !ok { + panic("unexpected config type") + } + return &filter{ + callbacks: callbacks, + config: conf, + } +} + +func NewServer(dbClient *gorm.DBClient) *internal.MCPServer { + mcpServer := internal.NewMCPServer( + "mcp-server-envoy-poc", + "1.0.0", + ) + + // Add query tool + mcpServer.AddTool( + mcp.NewToolWithRawSchema("query", "Run a read-only SQL query in clickhouse database with repository git data", gorm.GetQueryToolSchema()), + gorm.HandleQueryTool(dbClient), + ) + api.LogInfo("Added query tool successfully") + + // Add favorite files tool + mcpServer.AddTool( + mcp.NewToolWithRawSchema("author_favorite_files", "Favorite files for an author", gorm.GetFavoriteToolSchema()), + gorm.HandleFavoriteTool(dbClient), + ) + return mcpServer +} + +func main() {} diff --git a/plugins/golang-filter/mcp-server/filter.go b/plugins/golang-filter/mcp-server/filter.go new file mode 100644 index 000000000..0762d1241 --- /dev/null +++ b/plugins/golang-filter/mcp-server/filter.go @@ -0,0 +1,128 @@ +package main + +import ( + "net/http" + "net/http/httptest" + "net/url" + + "github.com/envoyproxy/envoy/contrib/golang/common/go/api" +) + +// The callbacks in the filter, like `DecodeHeaders`, can be implemented on demand. +// Because api.PassThroughStreamFilter provides a default implementation. +type filter struct { + api.PassThroughStreamFilter + + callbacks api.FilterCallbackHandler + path string + config *config + + req *http.Request + sse bool + message bool +} + +// Callbacks which are called in request path +// The endStream is true if the request doesn't have body +func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.StatusType { + fullPath, _ := header.Get(":path") + parsedURL, _ := url.Parse(fullPath) + f.path = parsedURL.Path + method, _ := header.Get(":method") + api.LogInfo(f.path) + if f.path == f.config.SSEServer.SSEEndpoint { + if method != http.MethodGet { + f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "") + } else { + f.sse = true + body := "SSE connection create" + f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusOK, body, nil, 0, "") + } + api.LogInfo("SSE connection started") + return api.LocalReply + } else if f.path == f.config.SSEServer.MessageEndpoint { + if method != http.MethodPost { + f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "") + } + // Create a new http.Request object + f.req = &http.Request{ + Method: method, + URL: parsedURL, + Header: make(http.Header), + } + api.LogInfof("Message request: %v", parsedURL) + // Copy headers from api.RequestHeaderMap to http.Header + header.Range(func(key, value string) bool { + f.req.Header.Add(key, value) + return true + }) + f.message = true + if endStream { + return api.Continue + } else { + return api.StopAndBuffer + } + } + if endStream { + return api.Continue + } else { + return api.StopAndBuffer + } +} + +// DecodeData might be called multiple times during handling the request body. +// The endStream is true when handling the last piece of the body. +func (f *filter) DecodeData(buffer api.BufferInstance, endStream bool) api.StatusType { + api.LogInfo("Message DecodeData") + // support suspending & resuming the filter in a background goroutine + api.LogInfof("DecodeData: {%v}", buffer) + if f.message { + // Create a response recorder to capture the response + recorder := httptest.NewRecorder() + // Call the handleMessage method of SSEServer + f.config.SSEServer.HandleMessage(recorder, f.req, buffer.Bytes()) + f.message = false + api.LogInfof("Message DecodeData SendLocalReply %v", recorder) + f.callbacks.DecoderFilterCallbacks().SendLocalReply(recorder.Code, recorder.Body.String(), recorder.Header(), 0, "") + return api.LocalReply + } + return api.Continue +} + +// Callbacks which are called in response path +// The endStream is true if the response doesn't have body +func (f *filter) EncodeHeaders(header api.ResponseHeaderMap, endStream bool) api.StatusType { + if f.sse { + header.Set("Content-Type", "text/event-stream") + header.Set("Cache-Control", "no-cache") + header.Set("Connection", "keep-alive") + header.Set("Access-Control-Allow-Origin", "*") + header.Del("Content-Length") + api.LogInfo("SSE connection header set") + return api.Continue + } + return api.Continue +} + +// TODO: 连接多种数据库 +// TODO: 多种存储类型 +// TODO: 数据库多个实例 +// EncodeData might be called multiple times during handling the response body. +// The endStream is true when handling the last piece of the body. +func (f *filter) EncodeData(buffer api.BufferInstance, endStream bool) api.StatusType { + if f.sse { + //TODO: buffer cleanup + f.config.SSEServer.HandleSSE(f.callbacks) + f.sse = false + return api.Running + } + return api.Continue +} + +// OnDestroy 或 OnStreamComplete 中停止 goroutine +func (f *filter) OnDestroy(reason api.DestroyReason) { + if f.sse && f.config.stopChan != nil { + api.LogInfo("Stopping SSE connection") + close(f.config.stopChan) + } +} diff --git a/plugins/golang-filter/mcp-server/go.mod b/plugins/golang-filter/mcp-server/go.mod new file mode 100644 index 000000000..860781592 --- /dev/null +++ b/plugins/golang-filter/mcp-server/go.mod @@ -0,0 +1,49 @@ +module github.com/envoyproxy/envoy/examples/golang-http/simple + +go 1.23 + +require ( + github.com/envoyproxy/envoy v1.33.1-0.20250224062430-6c11eac01993 + google.golang.org/protobuf v1.36.5 + github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 + github.com/go-redis/redis/v8 v8.11.5 + github.com/google/uuid v1.6.0 + github.com/mark3labs/mcp-go v0.12.0 + gorm.io/driver/clickhouse v0.6.1 + gorm.io/driver/postgres v1.5.11 + gorm.io/gorm v1.25.12 +) + +require ( + cel.dev/expr v0.15.0 // indirect + github.com/ClickHouse/ch-go v0.61.5 // indirect + github.com/ClickHouse/clickhouse-go/v2 v2.23.2 // indirect + github.com/andybalholm/brotli v1.1.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect + github.com/go-faster/city v1.0.1 // indirect + github.com/go-faster/errors v0.7.1 // indirect + github.com/hashicorp/go-version v1.6.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx/v5 v5.5.5 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + github.com/klauspost/compress v1.17.8 // indirect + github.com/paulmach/orb v0.11.1 // indirect + github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/segmentio/asm v1.2.0 // indirect + github.com/shopspring/decimal v1.4.0 // indirect + go.opentelemetry.io/otel v1.26.0 // indirect + go.opentelemetry.io/otel/trace v1.26.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/sync v0.6.0 // indirect + golang.org/x/sys v0.19.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/plugins/golang-filter/mcp-server/go.sum b/plugins/golang-filter/mcp-server/go.sum new file mode 100644 index 000000000..2508afce7 --- /dev/null +++ b/plugins/golang-filter/mcp-server/go.sum @@ -0,0 +1,173 @@ +cel.dev/expr v0.15.0 h1:O1jzfJCQBfL5BFoYktaxwIhuttaQPsVWerH9/EEKx0w= +cel.dev/expr v0.15.0/go.mod h1:TRSuuV7DlVCE/uwv5QbAiW/v8l5O8C4eEPHeu7gf7Sg= +github.com/ClickHouse/ch-go v0.61.5 h1:zwR8QbYI0tsMiEcze/uIMK+Tz1D3XZXLdNrlaOpeEI4= +github.com/ClickHouse/ch-go v0.61.5/go.mod h1:s1LJW/F/LcFs5HJnuogFMta50kKDO0lf9zzfrbl0RQg= +github.com/ClickHouse/clickhouse-go/v2 v2.23.2 h1:+DAKPMnxLS7pduQZsrJc8OhdLS2L9MfDEJ2TS+hpYDM= +github.com/ClickHouse/clickhouse-go/v2 v2.23.2/go.mod h1:aNap51J1OM3yxQJRgM+AlP/MPkGBCL8A74uQThoQhR0= +github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= +github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 h1:Om6kYQYDUk5wWbT0t0q6pvyM49i9XZAv9dDrkDA7gjk= +github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/envoyproxy/envoy v1.33.1-0.20250224062430-6c11eac01993 h1:98rKr5Irapq0t68+sHM78LIkflsiDVttSExZTaqsxSo= +github.com/envoyproxy/envoy v1.33.1-0.20250224062430-6c11eac01993/go.mod h1:x7d0dNbE0xGuDBUkBg19VGCgnPQ+lJ2k8lDzDzKExow= +github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= +github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= +github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= +github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mark3labs/mcp-go v0.12.0 h1:Pue1Tdwqcz77GHq18uzgmLT3wmeDUxXUSAqSwhGLhVo= +github.com/mark3labs/mcp-go v0.12.0/go.mod h1:cjMlBU0cv/cj9kjlgmRhoJ5JREdS7YX83xeIG9Ko/jE= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.24.2 h1:J/tulyYK6JwBldPViHJReihxxZ+22FHs0piGjQAvoUE= +github.com/onsi/gomega v1.24.2/go.mod h1:gs3J10IS7Z7r7eXRoNJIrNqU4ToQukCJhFtKrWgHWnk= +github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU= +github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= +github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= +github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= +github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= +github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= +go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= +go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4= +go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA= +go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d h1:DoPTO70H+bcDXcd39vOqb2viZxgqeBeSGtZ55yZU4/Q= +google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/clickhouse v0.6.1 h1:t7JMB6sLBXxN8hEO6RdzCbJCwq/jAEVZdwXlmQs1Sd4= +gorm.io/driver/clickhouse v0.6.1/go.mod h1:riMYpJcGZ3sJ/OAZZ1rEP1j/Y0H6cByOAnwz7fo2AyM= +gorm.io/driver/postgres v1.5.11 h1:ubBVAfbKEUld/twyKZ0IYn9rSQh448EdelLYk9Mv314= +gorm.io/driver/postgres v1.5.11/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= +gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8= +gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= diff --git a/plugins/golang-filter/mcp-server/internal/redis.go b/plugins/golang-filter/mcp-server/internal/redis.go new file mode 100644 index 000000000..c62848087 --- /dev/null +++ b/plugins/golang-filter/mcp-server/internal/redis.go @@ -0,0 +1,100 @@ +package internal + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + "github.com/go-redis/redis/v8" +) + +// RedisClient is a struct to handle Redis connections and operations +type RedisClient struct { + client *redis.Client + ctx context.Context + stopChan chan struct{} +} + +// NewRedisClient creates a new RedisClient instance and establishes a connection to the Redis server +func NewRedisClient(address string, stopChan chan struct{}) (*RedisClient, error) { + client := redis.NewClient(&redis.Options{ + Addr: address, + Password: "", // no password set + DB: 0, // use default DB + }) + + // Ping the Redis server to check the connection + pong, err := client.Ping(context.Background()).Result() + if err != nil { + return nil, fmt.Errorf("failed to connect to Redis: %w", err) + } + log.Printf("Connected to Redis: %s", pong) + + return &RedisClient{ + client: client, + ctx: context.Background(), + stopChan: stopChan, + }, nil +} + +// TODO: redis keep alive check +// TODO: redis pub sub memory limit +// Publish publishes a message to a Redis channel +func (r *RedisClient) Publish(channel string, message string) error { + err := r.client.Publish(r.ctx, channel, message).Err() + if err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + return nil +} + +// Subscribe subscribes to a Redis channel and processes messages +func (r *RedisClient) Subscribe(channel string, callback func(message string)) error { + pubsub := r.client.Subscribe(r.ctx, channel) + _, err := pubsub.Receive(r.ctx) + if err != nil { + return fmt.Errorf("failed to subscribe to channel: %w", err) + } + + go func() { + defer pubsub.Close() + for { + select { + case <-r.stopChan: + api.LogDebugf("Stopping subscription to channel %s", channel) + return + default: + msg, err := pubsub.ReceiveMessage(r.ctx) + if err != nil { + log.Printf("Error receiving message: %v", err) + return + } + callback(msg.Payload) + } + } + }() + + return nil +} + +// Set sets the value of a key in Redis +func (r *RedisClient) Set(key string, value string, expiration time.Duration) error { + err := r.client.Set(r.ctx, key, value, expiration).Err() + if err != nil { + return fmt.Errorf("failed to set key: %w", err) + } + return nil +} + +// Get retrieves the value of a key from Redis +func (r *RedisClient) Get(key string) (string, error) { + val, err := r.client.Get(r.ctx, key).Result() + if err == redis.Nil { + return "", fmt.Errorf("key does not exist") + } else if err != nil { + return "", fmt.Errorf("failed to get key: %w", err) + } + return val, nil +} diff --git a/plugins/golang-filter/mcp-server/internal/server.go b/plugins/golang-filter/mcp-server/internal/server.go new file mode 100644 index 000000000..4728becd4 --- /dev/null +++ b/plugins/golang-filter/mcp-server/internal/server.go @@ -0,0 +1,846 @@ +// Package server provides MCP (Model Control Protocol) server implementations. +package internal + +import ( + "context" + "encoding/json" + "fmt" + "regexp" + "sort" + "sync" + "sync/atomic" + + "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + "github.com/mark3labs/mcp-go/mcp" +) + +// resourceEntry holds both a resource and its handler +type resourceEntry struct { + resource mcp.Resource + handler ResourceHandlerFunc +} + +// resourceTemplateEntry holds both a template and its handler +type resourceTemplateEntry struct { + template mcp.ResourceTemplate + handler ResourceTemplateHandlerFunc +} + +// ServerOption is a function that configures an MCPServer. +type ServerOption func(*MCPServer) + +// ResourceHandlerFunc is a function that returns resource contents. +type ResourceHandlerFunc func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) + +// ResourceTemplateHandlerFunc is a function that returns a resource template. +type ResourceTemplateHandlerFunc func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) + +// PromptHandlerFunc handles prompt requests with given arguments. +type PromptHandlerFunc func(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) + +// ToolHandlerFunc handles tool calls with given arguments. +type ToolHandlerFunc func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) + +// ServerTool combines a Tool with its ToolHandlerFunc. +type ServerTool struct { + Tool mcp.Tool + Handler ToolHandlerFunc +} + +// NotificationContext provides client identification for notifications +type NotificationContext struct { + ClientID string + SessionID string +} + +// ServerNotification combines the notification with client context +type ServerNotification struct { + Context NotificationContext + Notification mcp.JSONRPCNotification +} + +// NotificationHandlerFunc handles incoming notifications. +type NotificationHandlerFunc func(ctx context.Context, notification mcp.JSONRPCNotification) + +// MCPServer implements a Model Control Protocol server that can handle various types of requests +// including resources, prompts, and tools. +type MCPServer struct { + mu sync.RWMutex // Add mutex for protecting shared resources + name string + version string + instructions string + resources map[string]resourceEntry + resourceTemplates map[string]resourceTemplateEntry + prompts map[string]mcp.Prompt + promptHandlers map[string]PromptHandlerFunc + tools map[string]ServerTool + notificationHandlers map[string]NotificationHandlerFunc + capabilities serverCapabilities + notifications chan ServerNotification + clientMu sync.Mutex // Separate mutex for client context + currentClient NotificationContext + initialized atomic.Bool // Use atomic for the initialized flag +} + +// serverKey is the context key for storing the server instance +type serverKey struct{} + +// ServerFromContext retrieves the MCPServer instance from a context +func ServerFromContext(ctx context.Context) *MCPServer { + if srv, ok := ctx.Value(serverKey{}).(*MCPServer); ok { + return srv + } + return nil +} + +// WithContext sets the current client context and returns the provided context +func (s *MCPServer) WithContext( + ctx context.Context, + notifCtx NotificationContext, +) context.Context { + s.clientMu.Lock() + s.currentClient = notifCtx + s.clientMu.Unlock() + return ctx +} + +// SendNotificationToClient sends a notification to the current client +func (s *MCPServer) SendNotificationToClient( + method string, + params map[string]interface{}, +) error { + if s.notifications == nil { + return fmt.Errorf("notification channel not initialized") + } + + s.clientMu.Lock() + clientContext := s.currentClient + s.clientMu.Unlock() + + notification := mcp.JSONRPCNotification{ + JSONRPC: mcp.JSONRPC_VERSION, + Notification: mcp.Notification{ + Method: method, + Params: mcp.NotificationParams{ + AdditionalFields: params, + }, + }, + } + + select { + case s.notifications <- ServerNotification{ + Context: clientContext, + Notification: notification, + }: + return nil + default: + return fmt.Errorf("notification channel full or blocked") + } +} + +// serverCapabilities defines the supported features of the MCP server +type serverCapabilities struct { + tools *toolCapabilities + resources *resourceCapabilities + prompts *promptCapabilities + logging bool +} + +// resourceCapabilities defines the supported resource-related features +type resourceCapabilities struct { + subscribe bool + listChanged bool +} + +// promptCapabilities defines the supported prompt-related features +type promptCapabilities struct { + listChanged bool +} + +// toolCapabilities defines the supported tool-related features +type toolCapabilities struct { + listChanged bool +} + +// WithResourceCapabilities configures resource-related server capabilities +func WithResourceCapabilities(subscribe, listChanged bool) ServerOption { + return func(s *MCPServer) { + // Always create a non-nil capability object + s.capabilities.resources = &resourceCapabilities{ + subscribe: subscribe, + listChanged: listChanged, + } + } +} + +// WithPromptCapabilities configures prompt-related server capabilities +func WithPromptCapabilities(listChanged bool) ServerOption { + return func(s *MCPServer) { + // Always create a non-nil capability object + s.capabilities.prompts = &promptCapabilities{ + listChanged: listChanged, + } + } +} + +// WithToolCapabilities configures tool-related server capabilities +func WithToolCapabilities(listChanged bool) ServerOption { + return func(s *MCPServer) { + // Always create a non-nil capability object + s.capabilities.tools = &toolCapabilities{ + listChanged: listChanged, + } + } +} + +// WithLogging enables logging capabilities for the server +func WithLogging() ServerOption { + return func(s *MCPServer) { + s.capabilities.logging = true + } +} + +// WithInstructions sets the server instructions for the client returned in the initialize response +func WithInstructions(instructions string) ServerOption { + return func(s *MCPServer) { + s.instructions = instructions + } +} + +// NewMCPServer creates a new MCP server instance with the given name, version and options +func NewMCPServer( + name, version string, + opts ...ServerOption, +) *MCPServer { + s := &MCPServer{ + resources: make(map[string]resourceEntry), + resourceTemplates: make(map[string]resourceTemplateEntry), + prompts: make(map[string]mcp.Prompt), + promptHandlers: make(map[string]PromptHandlerFunc), + tools: make(map[string]ServerTool), + name: name, + version: version, + notificationHandlers: make(map[string]NotificationHandlerFunc), + notifications: make(chan ServerNotification, 100), + capabilities: serverCapabilities{ + tools: nil, + resources: nil, + prompts: nil, + logging: false, + }, + } + + for _, opt := range opts { + opt(s) + } + + return s +} + +// HandleMessage processes an incoming JSON-RPC message and returns an appropriate response +func (s *MCPServer) HandleMessage( + ctx context.Context, + message json.RawMessage, +) mcp.JSONRPCMessage { + // Add server to context + ctx = context.WithValue(ctx, serverKey{}, s) + + var baseMessage struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + ID interface{} `json:"id,omitempty"` + } + + if err := json.Unmarshal(message, &baseMessage); err != nil { + return createErrorResponse( + nil, + mcp.PARSE_ERROR, + "Failed to parse message", + ) + } + + // Check for valid JSONRPC version + if baseMessage.JSONRPC != mcp.JSONRPC_VERSION { + return createErrorResponse( + baseMessage.ID, + mcp.INVALID_REQUEST, + "Invalid JSON-RPC version", + ) + } + + if baseMessage.ID == nil { + var notification mcp.JSONRPCNotification + if err := json.Unmarshal(message, ¬ification); err != nil { + return createErrorResponse( + nil, + mcp.PARSE_ERROR, + "Failed to parse notification", + ) + } + s.handleNotification(ctx, notification) + return nil // Return nil for notifications + } + api.LogInfof("HandleMessage: %s", baseMessage.Method) + switch baseMessage.Method { + case "initialize": + var request mcp.InitializeRequest + if err := json.Unmarshal(message, &request); err != nil { + return createErrorResponse( + baseMessage.ID, + mcp.INVALID_REQUEST, + "Invalid initialize request", + ) + } + return s.handleInitialize(ctx, baseMessage.ID, request) + case "ping": + var request mcp.PingRequest + if err := json.Unmarshal(message, &request); err != nil { + return createErrorResponse( + baseMessage.ID, + mcp.INVALID_REQUEST, + "Invalid ping request", + ) + } + return s.handlePing(ctx, baseMessage.ID, request) + case "resources/list": + if s.capabilities.resources == nil { + return createErrorResponse( + baseMessage.ID, + mcp.METHOD_NOT_FOUND, + "Resources not supported", + ) + } + var request mcp.ListResourcesRequest + if err := json.Unmarshal(message, &request); err != nil { + return createErrorResponse( + baseMessage.ID, + mcp.INVALID_REQUEST, + "Invalid list resources request", + ) + } + return s.handleListResources(ctx, baseMessage.ID, request) + case "resources/templates/list": + if s.capabilities.resources == nil { + return createErrorResponse( + baseMessage.ID, + mcp.METHOD_NOT_FOUND, + "Resources not supported", + ) + } + var request mcp.ListResourceTemplatesRequest + if err := json.Unmarshal(message, &request); err != nil { + return createErrorResponse( + baseMessage.ID, + mcp.INVALID_REQUEST, + "Invalid list resource templates request", + ) + } + return s.handleListResourceTemplates(ctx, baseMessage.ID, request) + case "resources/read": + if s.capabilities.resources == nil { + return createErrorResponse( + baseMessage.ID, + mcp.METHOD_NOT_FOUND, + "Resources not supported", + ) + } + var request mcp.ReadResourceRequest + if err := json.Unmarshal(message, &request); err != nil { + return createErrorResponse( + baseMessage.ID, + mcp.INVALID_REQUEST, + "Invalid read resource request", + ) + } + return s.handleReadResource(ctx, baseMessage.ID, request) + case "prompts/list": + if s.capabilities.prompts == nil { + return createErrorResponse( + baseMessage.ID, + mcp.METHOD_NOT_FOUND, + "Prompts not supported", + ) + } + var request mcp.ListPromptsRequest + if err := json.Unmarshal(message, &request); err != nil { + return createErrorResponse( + baseMessage.ID, + mcp.INVALID_REQUEST, + "Invalid list prompts request", + ) + } + return s.handleListPrompts(ctx, baseMessage.ID, request) + case "prompts/get": + if s.capabilities.prompts == nil { + return createErrorResponse( + baseMessage.ID, + mcp.METHOD_NOT_FOUND, + "Prompts not supported", + ) + } + var request mcp.GetPromptRequest + if err := json.Unmarshal(message, &request); err != nil { + return createErrorResponse( + baseMessage.ID, + mcp.INVALID_REQUEST, + "Invalid get prompt request", + ) + } + return s.handleGetPrompt(ctx, baseMessage.ID, request) + case "tools/list": + if s.capabilities.tools == nil { + return createErrorResponse( + baseMessage.ID, + mcp.METHOD_NOT_FOUND, + "Tools not supported", + ) + } + var request mcp.ListToolsRequest + if err := json.Unmarshal(message, &request); err != nil { + return createErrorResponse( + baseMessage.ID, + mcp.INVALID_REQUEST, + "Invalid list tools request", + ) + } + return s.handleListTools(ctx, baseMessage.ID, request) + case "tools/call": + if s.capabilities.tools == nil { + return createErrorResponse( + baseMessage.ID, + mcp.METHOD_NOT_FOUND, + "Tools not supported", + ) + } + var request mcp.CallToolRequest + if err := json.Unmarshal(message, &request); err != nil { + return createErrorResponse( + baseMessage.ID, + mcp.INVALID_REQUEST, + "Invalid call tool request", + ) + } + return s.handleToolCall(ctx, baseMessage.ID, request) + default: + return createErrorResponse( + baseMessage.ID, + mcp.METHOD_NOT_FOUND, + fmt.Sprintf("Method %s not found", baseMessage.Method), + ) + } +} + +// AddResource registers a new resource and its handler +func (s *MCPServer) AddResource( + resource mcp.Resource, + handler ResourceHandlerFunc, +) { + if s.capabilities.resources == nil { + s.capabilities.resources = &resourceCapabilities{} + } + s.mu.Lock() + defer s.mu.Unlock() + s.resources[resource.URI] = resourceEntry{ + resource: resource, + handler: handler, + } +} + +// AddResourceTemplate registers a new resource template and its handler +func (s *MCPServer) AddResourceTemplate( + template mcp.ResourceTemplate, + handler ResourceTemplateHandlerFunc, +) { + if s.capabilities.resources == nil { + s.capabilities.resources = &resourceCapabilities{} + } + s.mu.Lock() + defer s.mu.Unlock() + s.resourceTemplates[template.URITemplate] = resourceTemplateEntry{ + template: template, + handler: handler, + } +} + +// AddPrompt registers a new prompt handler with the given name +func (s *MCPServer) AddPrompt(prompt mcp.Prompt, handler PromptHandlerFunc) { + if s.capabilities.prompts == nil { + s.capabilities.prompts = &promptCapabilities{} + } + s.mu.Lock() + defer s.mu.Unlock() + s.prompts[prompt.Name] = prompt + s.promptHandlers[prompt.Name] = handler +} + +// AddTool registers a new tool and its handler +func (s *MCPServer) AddTool(tool mcp.Tool, handler ToolHandlerFunc) { + s.AddTools(ServerTool{Tool: tool, Handler: handler}) +} + +// AddTools registers multiple tools at once +func (s *MCPServer) AddTools(tools ...ServerTool) { + if s.capabilities.tools == nil { + s.capabilities.tools = &toolCapabilities{} + } + s.mu.Lock() + for _, entry := range tools { + s.tools[entry.Tool.Name] = entry + } + initialized := s.initialized.Load() + s.mu.Unlock() + + // Send notification if server is already initialized + if initialized { + if err := s.SendNotificationToClient("notifications/tools/list_changed", nil); err != nil { + // We can't return the error, but in a future version we could log it + } + } +} + +// SetTools replaces all existing tools with the provided list +func (s *MCPServer) SetTools(tools ...ServerTool) { + s.mu.Lock() + s.tools = make(map[string]ServerTool) + s.mu.Unlock() + s.AddTools(tools...) +} + +// DeleteTools removes a tool from the server +func (s *MCPServer) DeleteTools(names ...string) { + s.mu.Lock() + for _, name := range names { + delete(s.tools, name) + } + initialized := s.initialized.Load() + s.mu.Unlock() + + // Send notification if server is already initialized + if initialized { + if err := s.SendNotificationToClient("notifications/tools/list_changed", nil); err != nil { + // We can't return the error, but in a future version we could log it + } + } +} + +// AddNotificationHandler registers a new handler for incoming notifications +func (s *MCPServer) AddNotificationHandler( + method string, + handler NotificationHandlerFunc, +) { + s.mu.Lock() + defer s.mu.Unlock() + s.notificationHandlers[method] = handler +} + +func (s *MCPServer) handleInitialize( + ctx context.Context, + id interface{}, + request mcp.InitializeRequest, +) mcp.JSONRPCMessage { + capabilities := mcp.ServerCapabilities{} + + // Only add resource capabilities if they're configured + if s.capabilities.resources != nil { + capabilities.Resources = &struct { + Subscribe bool `json:"subscribe,omitempty"` + ListChanged bool `json:"listChanged,omitempty"` + }{ + Subscribe: s.capabilities.resources.subscribe, + ListChanged: s.capabilities.resources.listChanged, + } + } + + // Only add prompt capabilities if they're configured + if s.capabilities.prompts != nil { + capabilities.Prompts = &struct { + ListChanged bool `json:"listChanged,omitempty"` + }{ + ListChanged: s.capabilities.prompts.listChanged, + } + } + + // Only add tool capabilities if they're configured + if s.capabilities.tools != nil { + capabilities.Tools = &struct { + ListChanged bool `json:"listChanged,omitempty"` + }{ + ListChanged: s.capabilities.tools.listChanged, + } + } + + if s.capabilities.logging { + capabilities.Logging = &struct{}{} + } + + result := mcp.InitializeResult{ + ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION, + ServerInfo: mcp.Implementation{ + Name: s.name, + Version: s.version, + }, + Capabilities: capabilities, + Instructions: s.instructions, + } + + s.initialized.Store(true) + return createResponse(id, result) +} + +func (s *MCPServer) handlePing( + ctx context.Context, + id interface{}, + request mcp.PingRequest, +) mcp.JSONRPCMessage { + return createResponse(id, mcp.EmptyResult{}) +} + +func (s *MCPServer) handleListResources( + ctx context.Context, + id interface{}, + request mcp.ListResourcesRequest, +) mcp.JSONRPCMessage { + s.mu.RLock() + resources := make([]mcp.Resource, 0, len(s.resources)) + for _, entry := range s.resources { + resources = append(resources, entry.resource) + } + s.mu.RUnlock() + + result := mcp.ListResourcesResult{ + Resources: resources, + } + if request.Params.Cursor != "" { + result.NextCursor = "" // Handle pagination if needed + } + return createResponse(id, result) +} + +func (s *MCPServer) handleListResourceTemplates( + ctx context.Context, + id interface{}, + request mcp.ListResourceTemplatesRequest, +) mcp.JSONRPCMessage { + s.mu.RLock() + templates := make([]mcp.ResourceTemplate, 0, len(s.resourceTemplates)) + for _, entry := range s.resourceTemplates { + templates = append(templates, entry.template) + } + s.mu.RUnlock() + + result := mcp.ListResourceTemplatesResult{ + ResourceTemplates: templates, + } + if request.Params.Cursor != "" { + result.NextCursor = "" // Handle pagination if needed + } + return createResponse(id, result) +} + +func (s *MCPServer) handleReadResource( + ctx context.Context, + id interface{}, + request mcp.ReadResourceRequest, +) mcp.JSONRPCMessage { + s.mu.RLock() + // First try direct resource handlers + if entry, ok := s.resources[request.Params.URI]; ok { + handler := entry.handler + s.mu.RUnlock() + contents, err := handler(ctx, request) + if err != nil { + return createErrorResponse(id, mcp.INTERNAL_ERROR, err.Error()) + } + return createResponse(id, mcp.ReadResourceResult{Contents: contents}) + } + + // If no direct handler found, try matching against templates + var matchedHandler ResourceTemplateHandlerFunc + var matched bool + for uriTemplate, entry := range s.resourceTemplates { + if matchesTemplate(request.Params.URI, uriTemplate) { + matchedHandler = entry.handler + matched = true + break + } + } + s.mu.RUnlock() + + if matched { + contents, err := matchedHandler(ctx, request) + if err != nil { + return createErrorResponse(id, mcp.INTERNAL_ERROR, err.Error()) + } + return createResponse( + id, + mcp.ReadResourceResult{Contents: contents}, + ) + } + + return createErrorResponse( + id, + mcp.INVALID_PARAMS, + fmt.Sprintf( + "No handler found for resource URI: %s", + request.Params.URI, + ), + ) +} + +// matchesTemplate checks if a URI matches a URI template pattern +func matchesTemplate(uri string, template string) bool { + // Convert template into a regex pattern + pattern := template + // Replace {name} with ([^/]+) + pattern = regexp.QuoteMeta(pattern) + pattern = regexp.MustCompile(`\\\{[^}]+\\\}`). + ReplaceAllString(pattern, `([^/]+)`) + pattern = "^" + pattern + "$" + + matched, _ := regexp.MatchString(pattern, uri) + return matched +} + +func (s *MCPServer) handleListPrompts( + ctx context.Context, + id interface{}, + request mcp.ListPromptsRequest, +) mcp.JSONRPCMessage { + s.mu.RLock() + prompts := make([]mcp.Prompt, 0, len(s.prompts)) + for _, prompt := range s.prompts { + prompts = append(prompts, prompt) + } + s.mu.RUnlock() + + result := mcp.ListPromptsResult{ + Prompts: prompts, + } + if request.Params.Cursor != "" { + result.NextCursor = "" // Handle pagination if needed + } + return createResponse(id, result) +} + +func (s *MCPServer) handleGetPrompt( + ctx context.Context, + id interface{}, + request mcp.GetPromptRequest, +) mcp.JSONRPCMessage { + s.mu.RLock() + handler, ok := s.promptHandlers[request.Params.Name] + s.mu.RUnlock() + + if !ok { + return createErrorResponse( + id, + mcp.INVALID_PARAMS, + fmt.Sprintf("Prompt not found: %s", request.Params.Name), + ) + } + + result, err := handler(ctx, request) + if err != nil { + return createErrorResponse(id, mcp.INTERNAL_ERROR, err.Error()) + } + + return createResponse(id, result) +} + +func (s *MCPServer) handleListTools( + ctx context.Context, + id interface{}, + request mcp.ListToolsRequest, +) mcp.JSONRPCMessage { + s.mu.RLock() + tools := make([]mcp.Tool, 0, len(s.tools)) + + // Get all tool names for consistent ordering + toolNames := make([]string, 0, len(s.tools)) + for name := range s.tools { + toolNames = append(toolNames, name) + } + + // Sort the tool names for consistent ordering + sort.Strings(toolNames) + + // Add tools in sorted order + for _, name := range toolNames { + tools = append(tools, s.tools[name].Tool) + } + s.mu.RUnlock() + + result := mcp.ListToolsResult{ + Tools: tools, + } + if request.Params.Cursor != "" { + result.NextCursor = "" // Handle pagination if needed + } + return createResponse(id, result) +} + +func (s *MCPServer) handleToolCall( + ctx context.Context, + id interface{}, + request mcp.CallToolRequest, +) mcp.JSONRPCMessage { + s.mu.RLock() + tool, ok := s.tools[request.Params.Name] + s.mu.RUnlock() + + if !ok { + return createErrorResponse( + id, + mcp.INVALID_PARAMS, + fmt.Sprintf("Tool not found: %s", request.Params.Name), + ) + } + + result, err := tool.Handler(ctx, request) + if err != nil { + return createErrorResponse(id, mcp.INTERNAL_ERROR, err.Error()) + } + + return createResponse(id, result) +} + +func (s *MCPServer) handleNotification( + ctx context.Context, + notification mcp.JSONRPCNotification, +) mcp.JSONRPCMessage { + s.mu.RLock() + handler, ok := s.notificationHandlers[notification.Method] + s.mu.RUnlock() + + if ok { + handler(ctx, notification) + } + return nil +} + +func createResponse(id interface{}, result interface{}) mcp.JSONRPCMessage { + return mcp.JSONRPCResponse{ + JSONRPC: mcp.JSONRPC_VERSION, + ID: id, + Result: result, + } +} + +func createErrorResponse( + id interface{}, + code int, + message string, +) mcp.JSONRPCMessage { + return mcp.JSONRPCError{ + JSONRPC: mcp.JSONRPC_VERSION, + ID: id, + Error: struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` + }{ + Code: code, + Message: message, + }, + } +} diff --git a/plugins/golang-filter/mcp-server/internal/sse.go b/plugins/golang-filter/mcp-server/internal/sse.go new file mode 100644 index 000000000..51e035432 --- /dev/null +++ b/plugins/golang-filter/mcp-server/internal/sse.go @@ -0,0 +1,212 @@ +package internal + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + "github.com/google/uuid" + "github.com/mark3labs/mcp-go/mcp" +) + +// SSEServer implements a Server-Sent Events (SSE) based MCP server. +// It provides real-time communication capabilities over HTTP using the SSE protocol. +type SSEServer struct { + server *MCPServer + baseURL string + MessageEndpoint string + SSEEndpoint string + sessions map[string]bool + redisClient *RedisClient // Redis client for pub/sub +} + +// Option defines a function type for configuring SSEServer +type Option func(*SSEServer) + +// WithBaseURL sets the base URL for the SSE server +func WithBaseURL(baseURL string) Option { + return func(s *SSEServer) { + s.baseURL = baseURL + } +} + +// WithMessageEndpoint sets the message endpoint path +func WithMessageEndpoint(endpoint string) Option { + return func(s *SSEServer) { + s.MessageEndpoint = endpoint + } +} + +// WithSSEEndpoint sets the SSE endpoint path +func WithSSEEndpoint(endpoint string) Option { + return func(s *SSEServer) { + s.SSEEndpoint = endpoint + } +} + +func WithRedisClient(redisClient *RedisClient) Option { + return func(s *SSEServer) { + s.redisClient = redisClient + } +} + +// NewSSEServer creates a new SSE server instance with the given MCP server and options. +func NewSSEServer(server *MCPServer, opts ...Option) *SSEServer { + s := &SSEServer{ + server: server, + SSEEndpoint: "/sse", + MessageEndpoint: "/message", + sessions: make(map[string]bool), + } + + // Apply all options + for _, opt := range opts { + opt(s) + } + return s +} + +// handleSSE handles incoming SSE connection requests. +// It sets up appropriate headers and creates a new session for the client. +func (s *SSEServer) HandleSSE(cb api.FilterCallbackHandler) { + sessionID := uuid.New().String() + + s.sessions[sessionID] = true + + // sessionStore, _ := json.Marshal(s.sessions) + // TODO: sse:sessions? + // s.redisClient.Set("sse:sessions", string(sessionStore), 0) + defer delete(s.sessions, sessionID) + + channel := fmt.Sprintf("sse:%s", sessionID) + + messageEndpoint := fmt.Sprintf( + "%s%s?sessionId=%s", + s.baseURL, + s.MessageEndpoint, + sessionID, + ) + + // go func() { + // for { + // select { + // case serverNotification := <-s.server.notifications: + // // Only forward notifications meant for this session + // if serverNotification.Context.SessionID == sessionID { + // eventData, err := json.Marshal(serverNotification.Notification) + // if err == nil { + // select { + // case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData): + // // Event queued successfully + // case <-session.done: + // return + // } + // } + // } + // case <-session.done: + // return + // case <-r.Context().Done(): + // return + // } + // } + // }() + + err := s.redisClient.Subscribe(channel, func(message string) { + defer cb.EncoderFilterCallbacks().RecoverPanic() + api.LogInfof("SSE Send message: %s", message) + cb.EncoderFilterCallbacks().InjectData([]byte(message)) + }) + if err != nil { + api.LogErrorf("Failed to subscribe to Redis channel: %v", err) + } + + // Send the initial endpoint event + initialEvent := fmt.Sprintf("event: endpoint\ndata: %s\r\n\r\n", messageEndpoint) + s.redisClient.Publish(channel, initialEvent) +} + +// handleMessage processes incoming JSON-RPC messages from clients and sends responses +// back through both the SSE connection and HTTP response. +func (s *SSEServer) HandleMessage(w http.ResponseWriter, r *http.Request, body json.RawMessage) { + if r.Method != http.MethodPost { + s.writeJSONRPCError(w, nil, mcp.INVALID_REQUEST, fmt.Sprintf("Method %s not allowed", r.Method)) + return + } + + sessionID := r.URL.Query().Get("sessionId") + // if sessionID == "" { + // s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Missing sessionId") + // return + // } + + // Set the client context in the server before handling the message + ctx := s.server.WithContext(r.Context(), NotificationContext{ + ClientID: sessionID, + SessionID: sessionID, + }) + + //TODO: sessions + // _, ok := s.sessions.Load(sessionID) + // if !ok { + // s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Invalid session ID") + // return + // } + + //TODO + // // Parse message as raw JSON + // var rawMessage json.RawMessage + // if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil { + // s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "Parse error") + // return + // } + + // Process message through MCPServer + response := s.server.HandleMessage(ctx, body) + + // Only send response if there is one (not for notifications) + if response != nil { + eventData, _ := json.Marshal(response) + + if sessionID != "" { + channel := fmt.Sprintf("sse:%s", sessionID) + publishErr := s.redisClient.Publish(channel, fmt.Sprintf("event: message\ndata: %s\n\n", eventData)) + + if publishErr != nil { + api.LogErrorf("Failed to publish message to Redis: %v", publishErr) + } + } + // Send HTTP response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) + json.NewEncoder(w).Encode(response) + } else { + // For notifications, just send 202 Accepted with no body + w.WriteHeader(http.StatusAccepted) + } +} + +// writeJSONRPCError writes a JSON-RPC error response with the given error details. +func (s *SSEServer) writeJSONRPCError( + w http.ResponseWriter, + id interface{}, + code int, + message string, +) { + response := createErrorResponse(id, code, message) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + json.NewEncoder(w).Encode(response) +} + +// // ServeHTTP implements the http.Handler interface. +// func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { +// switch r.URL.Path { +// case s.sseEndpoint: +// s.handleSSE(w, r) +// case s.messageEndpoint: +// s.handleMessage(w, r) +// default: +// http.NotFound(w, r) +// } +// } diff --git a/plugins/golang-filter/mcp-server/servers/gorm/db.go b/plugins/golang-filter/mcp-server/servers/gorm/db.go new file mode 100644 index 000000000..5b53b509c --- /dev/null +++ b/plugins/golang-filter/mcp-server/servers/gorm/db.go @@ -0,0 +1,99 @@ +package gorm + +import ( + "fmt" + "log" + "os" + + "gorm.io/driver/clickhouse" + "gorm.io/driver/postgres" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +// DBClient is a struct to handle PostgreSQL connections and operations +type DBClient struct { + db *gorm.DB +} + +// NewDBClient creates a new DBClient instance and establishes a connection to the PostgreSQL database +func NewDBClient(dsn string, dbType string) (*DBClient, error) { + // Configure GORM logger + newLogger := logger.New( + log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer + logger.Config{ + SlowThreshold: 0, // Slow SQL threshold + LogLevel: logger.Info, // Log level + IgnoreRecordNotFoundError: false, // Ignore ErrRecordNotFound error for logger + Colorful: true, // Disable color + }, + ) + var db *gorm.DB + var err error + if dbType == "postgres" { + db, err = gorm.Open(postgres.Open(dsn), &gorm.Config{ + Logger: newLogger, + }) + } else if dbType == "clickhouse" { + db, err = gorm.Open(clickhouse.Open(dsn), &gorm.Config{}) + } else { + return nil, fmt.Errorf("unsupported database type") + } + // Connect to the database + if err != nil { + return nil, fmt.Errorf("failed to connect to database: %w", err) + } + + return &DBClient{db: db}, nil +} + +// ExecuteSQL executes a raw SQL query and returns the result as a slice of maps +func (c *DBClient) ExecuteSQL(query string, args ...interface{}) ([]map[string]interface{}, error) { + rows, err := c.db.Raw(query, args...).Rows() + if err != nil { + return nil, fmt.Errorf("failed to execute SQL query: %w", err) + } + defer rows.Close() + + // Get column names + columns, err := rows.Columns() + if err != nil { + return nil, fmt.Errorf("failed to get columns: %w", err) + } + + // Prepare a slice to hold the results + var results []map[string]interface{} + + // Iterate over the rows + for rows.Next() { + // Create a slice of interface{}'s to represent each column, + // and a second slice to contain pointers to each item in the columns slice. + columnsData := make([]interface{}, len(columns)) + columnsPointers := make([]interface{}, len(columns)) + for i := range columnsData { + columnsPointers[i] = &columnsData[i] + } + + // Scan the result into the column pointers... + if err := rows.Scan(columnsPointers...); err != nil { + return nil, fmt.Errorf("failed to scan row: %w", err) + } + + // Create a map to hold the column name and value + rowMap := make(map[string]interface{}) + for i, colName := range columns { + val := columnsData[i] + b, ok := val.([]byte) + if ok { + rowMap[colName] = string(b) + } else { + rowMap[colName] = val + } + } + + // Append the map to the results slice + results = append(results, rowMap) + } + + return results, nil +} diff --git a/plugins/golang-filter/mcp-server/servers/gorm/tools.go b/plugins/golang-filter/mcp-server/servers/gorm/tools.go new file mode 100644 index 000000000..cbeefa25f --- /dev/null +++ b/plugins/golang-filter/mcp-server/servers/gorm/tools.go @@ -0,0 +1,132 @@ +package gorm + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/envoyproxy/envoy/examples/golang-http/simple/internal" + "github.com/mark3labs/mcp-go/mcp" +) + +const favoriteFilesTemplate = ` +WITH current_files AS ( + SELECT path + FROM ( + SELECT + old_path AS path, + max(time) AS last_time, + 2 AS change_type + FROM git.file_changes + GROUP BY old_path + UNION ALL + SELECT + path, + max(time) AS last_time, + argMax(change_type, time) AS change_type + FROM git.file_changes + GROUP BY path + ) + GROUP BY path + HAVING (argMax(change_type, last_time) != 2) AND (NOT match(path, '(^dbms/)|(^libs/)|(^tests/testflows/)|(^programs/server/store/)')) + ORDER BY path ASC +) +SELECT + path, + count() AS c +FROM git.file_changes +WHERE (author = '%s') AND (path IN (current_files)) +GROUP BY path +ORDER BY c DESC +LIMIT 10` + +// HandleQueryTool handles SQL query execution +func HandleQueryTool(dbClient *DBClient) internal.ToolHandlerFunc { + return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + arguments := request.Params.Arguments + message, ok := arguments["sql"].(string) + if !ok { + return nil, fmt.Errorf("invalid message argument") + } + + results, err := dbClient.ExecuteSQL(message) + if err != nil { + return nil, fmt.Errorf("failed to execute SQL query: %w", err) + } + + jsonData, err := json.Marshal(results) + if err != nil { + return nil, fmt.Errorf("failed to marshal SQL results: %w", err) + } + + return &mcp.CallToolResult{ + Content: []mcp.Content{ + mcp.TextContent{ + Type: "text", + Text: string(jsonData), + }, + }, + }, nil + } +} + +// HandleFavoriteTool handles author's favorite files query +func HandleFavoriteTool(dbClient *DBClient) internal.ToolHandlerFunc { + return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + arguments := request.Params.Arguments + author, ok := arguments["author"].(string) + if !ok { + return nil, fmt.Errorf("invalid author argument") + } + query := fmt.Sprintf(favoriteFilesTemplate, author) + + results, err := dbClient.ExecuteSQL(query) + if err != nil { + return nil, fmt.Errorf("failed to execute SQL query: %w", err) + } + + jsonData, err := json.Marshal(results) + if err != nil { + return nil, fmt.Errorf("failed to marshal SQL results: %w", err) + } + + return &mcp.CallToolResult{ + Content: []mcp.Content{ + mcp.TextContent{ + Type: "text", + Text: string(jsonData), + }, + }, + }, nil + } +} + +// GetQueryToolSchema returns the schema for query tool +func GetQueryToolSchema() json.RawMessage { + return json.RawMessage(` + { + "type": "object", + "properties": { + "sql": { + "type": "string", + "description": "The sql query to execute" + } + } + } + `) +} + +// GetFavoriteToolSchema returns the schema for favorite files tool +func GetFavoriteToolSchema() json.RawMessage { + return json.RawMessage(` + { + "type": "object", + "properties": { + "author": { + "type": "string", + "description": "the author name" + } + } + } + `) +} diff --git a/tools/hack/build-golang-filters.sh b/tools/hack/build-golang-filters.sh new file mode 100755 index 000000000..4a63e2ea1 --- /dev/null +++ b/tools/hack/build-golang-filters.sh @@ -0,0 +1,39 @@ +# Copyright (c) 2022 Alibaba Group Holding Ltd. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/usr/bin/env bash + +set -euo pipefail + +INNER_GO_FILTER_NAME=${GO_FILTER_NAME-""} +# OUTPUT_PACKAGE_DIR=${OUTPUT_PACKAGE_DIR:-"../external/package/"} + +cd ./plugins/golang-filter +if [ ! -n "$INNER_GO_FILTER_NAME" ]; then + GO_FILTERS_DIR=$(pwd) + echo "🚀 Build all Go Filters under folder of $GO_FILTERS_DIR" + for file in `ls $GO_FILTERS_DIR` + do + if [ -d $GO_FILTERS_DIR/$file ]; then + name=${file##*/} + echo "🚀 Build Go Filter: $name" + GO_FILTER_NAME=${name} make build + # cp ${GO_FILTERS_DIR}/${file}/${name}.so ${OUTPUT_PACKAGE_DIR} + fi + done +else + echo "🚀 Build Go Filter: $INNER_GO_FILTER_NAME" + GO_FILTER_NAME=${INNER_GO_FILTER_NAME} make build +fi +