feat: add golang filter and mcp-server (#1942)

Co-authored-by: johnlanni <zty98751@alibaba-inc.com>
This commit is contained in:
Jingze
2025-03-24 11:07:03 +08:00
committed by GitHub
parent 45fbc8b084
commit f5c1e7f2ec
14 changed files with 1942 additions and 0 deletions

View File

@@ -167,6 +167,9 @@ build-gateway: prebuild buildx-prepare
build-gateway-local: prebuild
TARGET_ARCH=${TARGET_ARCH} DOCKER_TARGETS="docker.proxyv2" ./tools/hack/build-istio-image.sh docker
build-golang-filter:
./tools/hack/build-golang-filters.sh
build-istio: prebuild buildx-prepare
DOCKER_TARGETS="docker.pilot" IMG_URL="${IMG_URL}" ./tools/hack/build-istio-image.sh docker.buildx

View File

@@ -0,0 +1,20 @@
FROM golang:1.23 AS golang-base
ARG GOPROXY
ARG GO_FILTER_NAME
ENV GOFLAGS=-buildvcs=false
ENV GOPROXY=${GOPROXY}
WORKDIR /workspace
COPY . .
WORKDIR /workspace/$GO_FILTER_NAME
RUN go mod tidy
RUN go build -o /$GO_FILTER_NAME.so -buildmode=c-shared .
FROM scratch AS output
ARG GO_FILTER_NAME
COPY --from=golang-base /$GO_FILTER_NAME.so $GO_FILTER_NAME.so

View File

@@ -0,0 +1,10 @@
GO_FILTER_NAME ?= mcp-server
GOPROXY := $(shell go env GOPROXY)
.DEFAULT:
build:
DOCKER_BUILDKIT=1 docker build --build-arg GOPROXY=$(GOPROXY) \
--build-arg GO_FILTER_NAME=${GO_FILTER_NAME} \
-t ${GO_FILTER_NAME} \
--output ./${GO_FILTER_NAME} \
.

View File

@@ -0,0 +1,9 @@
## 介绍
## 快速构建
使用以下命令可以快速构建 golang filter 插件:
```bash
GO_FILTER_NAME=mcp-server make build
```

View File

@@ -0,0 +1,122 @@
package main
import (
"errors"
"fmt"
xds "github.com/cncf/xds/go/xds/type/v3"
"github.com/mark3labs/mcp-go/mcp"
"google.golang.org/protobuf/types/known/anypb"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
envoyHttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"
"github.com/envoyproxy/envoy/examples/golang-http/simple/internal"
"github.com/envoyproxy/envoy/examples/golang-http/simple/servers/gorm"
)
const Name = "mcp-server"
const SCHEME_PATH = "scheme"
func init() {
envoyHttp.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
}
type config struct {
echoBody string
// other fields
dbClient *gorm.DBClient
redisClient *internal.RedisClient
stopChan chan struct{}
SSEServer *internal.SSEServer
}
type parser struct {
}
// Parse the filter configuration
func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {
configStruct := &xds.TypedStruct{}
if err := any.UnmarshalTo(configStruct); err != nil {
return nil, err
}
v := configStruct.Value
conf := &config{}
dsn, ok := v.AsMap()["dsn"].(string)
if !ok {
return nil, errors.New("missing dsn")
}
dbType, ok := v.AsMap()["dbType"].(string)
if !ok {
return nil, errors.New("missing database type")
}
dbClient, err := gorm.NewDBClient(dsn, dbType)
if err != nil {
return nil, fmt.Errorf("failed to initialize DBClient: %w", err)
}
conf.dbClient = dbClient
conf.stopChan = make(chan struct{})
redisClient, err := internal.NewRedisClient("localhost:6379", conf.stopChan)
if err != nil {
return nil, fmt.Errorf("failed to initialize RedisClient: %w", err)
}
conf.redisClient = redisClient
conf.SSEServer = internal.NewSSEServer(NewServer(conf.dbClient), internal.WithRedisClient(redisClient))
return conf, nil
}
func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
parentConfig := parent.(*config)
childConfig := child.(*config)
newConfig := *parentConfig
if childConfig.echoBody != "" {
newConfig.echoBody = childConfig.echoBody
}
if childConfig.dbClient != nil {
newConfig.dbClient = childConfig.dbClient
}
if childConfig.redisClient != nil {
newConfig.redisClient = childConfig.redisClient
}
return &newConfig
}
func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
conf, ok := c.(*config)
if !ok {
panic("unexpected config type")
}
return &filter{
callbacks: callbacks,
config: conf,
}
}
func NewServer(dbClient *gorm.DBClient) *internal.MCPServer {
mcpServer := internal.NewMCPServer(
"mcp-server-envoy-poc",
"1.0.0",
)
// Add query tool
mcpServer.AddTool(
mcp.NewToolWithRawSchema("query", "Run a read-only SQL query in clickhouse database with repository git data", gorm.GetQueryToolSchema()),
gorm.HandleQueryTool(dbClient),
)
api.LogInfo("Added query tool successfully")
// Add favorite files tool
mcpServer.AddTool(
mcp.NewToolWithRawSchema("author_favorite_files", "Favorite files for an author", gorm.GetFavoriteToolSchema()),
gorm.HandleFavoriteTool(dbClient),
)
return mcpServer
}
func main() {}

View File

@@ -0,0 +1,128 @@
package main
import (
"net/http"
"net/http/httptest"
"net/url"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
)
// The callbacks in the filter, like `DecodeHeaders`, can be implemented on demand.
// Because api.PassThroughStreamFilter provides a default implementation.
type filter struct {
api.PassThroughStreamFilter
callbacks api.FilterCallbackHandler
path string
config *config
req *http.Request
sse bool
message bool
}
// Callbacks which are called in request path
// The endStream is true if the request doesn't have body
func (f *filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.StatusType {
fullPath, _ := header.Get(":path")
parsedURL, _ := url.Parse(fullPath)
f.path = parsedURL.Path
method, _ := header.Get(":method")
api.LogInfo(f.path)
if f.path == f.config.SSEServer.SSEEndpoint {
if method != http.MethodGet {
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "")
} else {
f.sse = true
body := "SSE connection create"
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusOK, body, nil, 0, "")
}
api.LogInfo("SSE connection started")
return api.LocalReply
} else if f.path == f.config.SSEServer.MessageEndpoint {
if method != http.MethodPost {
f.callbacks.DecoderFilterCallbacks().SendLocalReply(http.StatusMethodNotAllowed, "Method not allowed", nil, 0, "")
}
// Create a new http.Request object
f.req = &http.Request{
Method: method,
URL: parsedURL,
Header: make(http.Header),
}
api.LogInfof("Message request: %v", parsedURL)
// Copy headers from api.RequestHeaderMap to http.Header
header.Range(func(key, value string) bool {
f.req.Header.Add(key, value)
return true
})
f.message = true
if endStream {
return api.Continue
} else {
return api.StopAndBuffer
}
}
if endStream {
return api.Continue
} else {
return api.StopAndBuffer
}
}
// DecodeData might be called multiple times during handling the request body.
// The endStream is true when handling the last piece of the body.
func (f *filter) DecodeData(buffer api.BufferInstance, endStream bool) api.StatusType {
api.LogInfo("Message DecodeData")
// support suspending & resuming the filter in a background goroutine
api.LogInfof("DecodeData: {%v}", buffer)
if f.message {
// Create a response recorder to capture the response
recorder := httptest.NewRecorder()
// Call the handleMessage method of SSEServer
f.config.SSEServer.HandleMessage(recorder, f.req, buffer.Bytes())
f.message = false
api.LogInfof("Message DecodeData SendLocalReply %v", recorder)
f.callbacks.DecoderFilterCallbacks().SendLocalReply(recorder.Code, recorder.Body.String(), recorder.Header(), 0, "")
return api.LocalReply
}
return api.Continue
}
// Callbacks which are called in response path
// The endStream is true if the response doesn't have body
func (f *filter) EncodeHeaders(header api.ResponseHeaderMap, endStream bool) api.StatusType {
if f.sse {
header.Set("Content-Type", "text/event-stream")
header.Set("Cache-Control", "no-cache")
header.Set("Connection", "keep-alive")
header.Set("Access-Control-Allow-Origin", "*")
header.Del("Content-Length")
api.LogInfo("SSE connection header set")
return api.Continue
}
return api.Continue
}
// TODO: 连接多种数据库
// TODO: 多种存储类型
// TODO: 数据库多个实例
// EncodeData might be called multiple times during handling the response body.
// The endStream is true when handling the last piece of the body.
func (f *filter) EncodeData(buffer api.BufferInstance, endStream bool) api.StatusType {
if f.sse {
//TODO: buffer cleanup
f.config.SSEServer.HandleSSE(f.callbacks)
f.sse = false
return api.Running
}
return api.Continue
}
// OnDestroy 或 OnStreamComplete 中停止 goroutine
func (f *filter) OnDestroy(reason api.DestroyReason) {
if f.sse && f.config.stopChan != nil {
api.LogInfo("Stopping SSE connection")
close(f.config.stopChan)
}
}

View File

@@ -0,0 +1,49 @@
module github.com/envoyproxy/envoy/examples/golang-http/simple
go 1.23
require (
github.com/envoyproxy/envoy v1.33.1-0.20250224062430-6c11eac01993
google.golang.org/protobuf v1.36.5
github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.6.0
github.com/mark3labs/mcp-go v0.12.0
gorm.io/driver/clickhouse v0.6.1
gorm.io/driver/postgres v1.5.11
gorm.io/gorm v1.25.12
)
require (
cel.dev/expr v0.15.0 // indirect
github.com/ClickHouse/ch-go v0.61.5 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.23.2 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
go.opentelemetry.io/otel v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -0,0 +1,173 @@
cel.dev/expr v0.15.0 h1:O1jzfJCQBfL5BFoYktaxwIhuttaQPsVWerH9/EEKx0w=
cel.dev/expr v0.15.0/go.mod h1:TRSuuV7DlVCE/uwv5QbAiW/v8l5O8C4eEPHeu7gf7Sg=
github.com/ClickHouse/ch-go v0.61.5 h1:zwR8QbYI0tsMiEcze/uIMK+Tz1D3XZXLdNrlaOpeEI4=
github.com/ClickHouse/ch-go v0.61.5/go.mod h1:s1LJW/F/LcFs5HJnuogFMta50kKDO0lf9zzfrbl0RQg=
github.com/ClickHouse/clickhouse-go/v2 v2.23.2 h1:+DAKPMnxLS7pduQZsrJc8OhdLS2L9MfDEJ2TS+hpYDM=
github.com/ClickHouse/clickhouse-go/v2 v2.23.2/go.mod h1:aNap51J1OM3yxQJRgM+AlP/MPkGBCL8A74uQThoQhR0=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 h1:Om6kYQYDUk5wWbT0t0q6pvyM49i9XZAv9dDrkDA7gjk=
github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/envoyproxy/envoy v1.33.1-0.20250224062430-6c11eac01993 h1:98rKr5Irapq0t68+sHM78LIkflsiDVttSExZTaqsxSo=
github.com/envoyproxy/envoy v1.33.1-0.20250224062430-6c11eac01993/go.mod h1:x7d0dNbE0xGuDBUkBg19VGCgnPQ+lJ2k8lDzDzKExow=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg=
github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mark3labs/mcp-go v0.12.0 h1:Pue1Tdwqcz77GHq18uzgmLT3wmeDUxXUSAqSwhGLhVo=
github.com/mark3labs/mcp-go v0.12.0/go.mod h1:cjMlBU0cv/cj9kjlgmRhoJ5JREdS7YX83xeIG9Ko/jE=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v1.24.2 h1:J/tulyYK6JwBldPViHJReihxxZ+22FHs0piGjQAvoUE=
github.com/onsi/gomega v1.24.2/go.mod h1:gs3J10IS7Z7r7eXRoNJIrNqU4ToQukCJhFtKrWgHWnk=
github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU=
github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs=
go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4=
go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA=
go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d h1:DoPTO70H+bcDXcd39vOqb2viZxgqeBeSGtZ55yZU4/Q=
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/clickhouse v0.6.1 h1:t7JMB6sLBXxN8hEO6RdzCbJCwq/jAEVZdwXlmQs1Sd4=
gorm.io/driver/clickhouse v0.6.1/go.mod h1:riMYpJcGZ3sJ/OAZZ1rEP1j/Y0H6cByOAnwz7fo2AyM=
gorm.io/driver/postgres v1.5.11 h1:ubBVAfbKEUld/twyKZ0IYn9rSQh448EdelLYk9Mv314=
gorm.io/driver/postgres v1.5.11/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI=
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=

View File

@@ -0,0 +1,100 @@
package internal
import (
"context"
"fmt"
"log"
"time"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"github.com/go-redis/redis/v8"
)
// RedisClient is a struct to handle Redis connections and operations
type RedisClient struct {
client *redis.Client
ctx context.Context
stopChan chan struct{}
}
// NewRedisClient creates a new RedisClient instance and establishes a connection to the Redis server
func NewRedisClient(address string, stopChan chan struct{}) (*RedisClient, error) {
client := redis.NewClient(&redis.Options{
Addr: address,
Password: "", // no password set
DB: 0, // use default DB
})
// Ping the Redis server to check the connection
pong, err := client.Ping(context.Background()).Result()
if err != nil {
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
}
log.Printf("Connected to Redis: %s", pong)
return &RedisClient{
client: client,
ctx: context.Background(),
stopChan: stopChan,
}, nil
}
// TODO: redis keep alive check
// TODO: redis pub sub memory limit
// Publish publishes a message to a Redis channel
func (r *RedisClient) Publish(channel string, message string) error {
err := r.client.Publish(r.ctx, channel, message).Err()
if err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}
return nil
}
// Subscribe subscribes to a Redis channel and processes messages
func (r *RedisClient) Subscribe(channel string, callback func(message string)) error {
pubsub := r.client.Subscribe(r.ctx, channel)
_, err := pubsub.Receive(r.ctx)
if err != nil {
return fmt.Errorf("failed to subscribe to channel: %w", err)
}
go func() {
defer pubsub.Close()
for {
select {
case <-r.stopChan:
api.LogDebugf("Stopping subscription to channel %s", channel)
return
default:
msg, err := pubsub.ReceiveMessage(r.ctx)
if err != nil {
log.Printf("Error receiving message: %v", err)
return
}
callback(msg.Payload)
}
}
}()
return nil
}
// Set sets the value of a key in Redis
func (r *RedisClient) Set(key string, value string, expiration time.Duration) error {
err := r.client.Set(r.ctx, key, value, expiration).Err()
if err != nil {
return fmt.Errorf("failed to set key: %w", err)
}
return nil
}
// Get retrieves the value of a key from Redis
func (r *RedisClient) Get(key string) (string, error) {
val, err := r.client.Get(r.ctx, key).Result()
if err == redis.Nil {
return "", fmt.Errorf("key does not exist")
} else if err != nil {
return "", fmt.Errorf("failed to get key: %w", err)
}
return val, nil
}

View File

@@ -0,0 +1,846 @@
// Package server provides MCP (Model Control Protocol) server implementations.
package internal
import (
"context"
"encoding/json"
"fmt"
"regexp"
"sort"
"sync"
"sync/atomic"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"github.com/mark3labs/mcp-go/mcp"
)
// resourceEntry holds both a resource and its handler
type resourceEntry struct {
resource mcp.Resource
handler ResourceHandlerFunc
}
// resourceTemplateEntry holds both a template and its handler
type resourceTemplateEntry struct {
template mcp.ResourceTemplate
handler ResourceTemplateHandlerFunc
}
// ServerOption is a function that configures an MCPServer.
type ServerOption func(*MCPServer)
// ResourceHandlerFunc is a function that returns resource contents.
type ResourceHandlerFunc func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error)
// ResourceTemplateHandlerFunc is a function that returns a resource template.
type ResourceTemplateHandlerFunc func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error)
// PromptHandlerFunc handles prompt requests with given arguments.
type PromptHandlerFunc func(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error)
// ToolHandlerFunc handles tool calls with given arguments.
type ToolHandlerFunc func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error)
// ServerTool combines a Tool with its ToolHandlerFunc.
type ServerTool struct {
Tool mcp.Tool
Handler ToolHandlerFunc
}
// NotificationContext provides client identification for notifications
type NotificationContext struct {
ClientID string
SessionID string
}
// ServerNotification combines the notification with client context
type ServerNotification struct {
Context NotificationContext
Notification mcp.JSONRPCNotification
}
// NotificationHandlerFunc handles incoming notifications.
type NotificationHandlerFunc func(ctx context.Context, notification mcp.JSONRPCNotification)
// MCPServer implements a Model Control Protocol server that can handle various types of requests
// including resources, prompts, and tools.
type MCPServer struct {
mu sync.RWMutex // Add mutex for protecting shared resources
name string
version string
instructions string
resources map[string]resourceEntry
resourceTemplates map[string]resourceTemplateEntry
prompts map[string]mcp.Prompt
promptHandlers map[string]PromptHandlerFunc
tools map[string]ServerTool
notificationHandlers map[string]NotificationHandlerFunc
capabilities serverCapabilities
notifications chan ServerNotification
clientMu sync.Mutex // Separate mutex for client context
currentClient NotificationContext
initialized atomic.Bool // Use atomic for the initialized flag
}
// serverKey is the context key for storing the server instance
type serverKey struct{}
// ServerFromContext retrieves the MCPServer instance from a context
func ServerFromContext(ctx context.Context) *MCPServer {
if srv, ok := ctx.Value(serverKey{}).(*MCPServer); ok {
return srv
}
return nil
}
// WithContext sets the current client context and returns the provided context
func (s *MCPServer) WithContext(
ctx context.Context,
notifCtx NotificationContext,
) context.Context {
s.clientMu.Lock()
s.currentClient = notifCtx
s.clientMu.Unlock()
return ctx
}
// SendNotificationToClient sends a notification to the current client
func (s *MCPServer) SendNotificationToClient(
method string,
params map[string]interface{},
) error {
if s.notifications == nil {
return fmt.Errorf("notification channel not initialized")
}
s.clientMu.Lock()
clientContext := s.currentClient
s.clientMu.Unlock()
notification := mcp.JSONRPCNotification{
JSONRPC: mcp.JSONRPC_VERSION,
Notification: mcp.Notification{
Method: method,
Params: mcp.NotificationParams{
AdditionalFields: params,
},
},
}
select {
case s.notifications <- ServerNotification{
Context: clientContext,
Notification: notification,
}:
return nil
default:
return fmt.Errorf("notification channel full or blocked")
}
}
// serverCapabilities defines the supported features of the MCP server
type serverCapabilities struct {
tools *toolCapabilities
resources *resourceCapabilities
prompts *promptCapabilities
logging bool
}
// resourceCapabilities defines the supported resource-related features
type resourceCapabilities struct {
subscribe bool
listChanged bool
}
// promptCapabilities defines the supported prompt-related features
type promptCapabilities struct {
listChanged bool
}
// toolCapabilities defines the supported tool-related features
type toolCapabilities struct {
listChanged bool
}
// WithResourceCapabilities configures resource-related server capabilities
func WithResourceCapabilities(subscribe, listChanged bool) ServerOption {
return func(s *MCPServer) {
// Always create a non-nil capability object
s.capabilities.resources = &resourceCapabilities{
subscribe: subscribe,
listChanged: listChanged,
}
}
}
// WithPromptCapabilities configures prompt-related server capabilities
func WithPromptCapabilities(listChanged bool) ServerOption {
return func(s *MCPServer) {
// Always create a non-nil capability object
s.capabilities.prompts = &promptCapabilities{
listChanged: listChanged,
}
}
}
// WithToolCapabilities configures tool-related server capabilities
func WithToolCapabilities(listChanged bool) ServerOption {
return func(s *MCPServer) {
// Always create a non-nil capability object
s.capabilities.tools = &toolCapabilities{
listChanged: listChanged,
}
}
}
// WithLogging enables logging capabilities for the server
func WithLogging() ServerOption {
return func(s *MCPServer) {
s.capabilities.logging = true
}
}
// WithInstructions sets the server instructions for the client returned in the initialize response
func WithInstructions(instructions string) ServerOption {
return func(s *MCPServer) {
s.instructions = instructions
}
}
// NewMCPServer creates a new MCP server instance with the given name, version and options
func NewMCPServer(
name, version string,
opts ...ServerOption,
) *MCPServer {
s := &MCPServer{
resources: make(map[string]resourceEntry),
resourceTemplates: make(map[string]resourceTemplateEntry),
prompts: make(map[string]mcp.Prompt),
promptHandlers: make(map[string]PromptHandlerFunc),
tools: make(map[string]ServerTool),
name: name,
version: version,
notificationHandlers: make(map[string]NotificationHandlerFunc),
notifications: make(chan ServerNotification, 100),
capabilities: serverCapabilities{
tools: nil,
resources: nil,
prompts: nil,
logging: false,
},
}
for _, opt := range opts {
opt(s)
}
return s
}
// HandleMessage processes an incoming JSON-RPC message and returns an appropriate response
func (s *MCPServer) HandleMessage(
ctx context.Context,
message json.RawMessage,
) mcp.JSONRPCMessage {
// Add server to context
ctx = context.WithValue(ctx, serverKey{}, s)
var baseMessage struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
ID interface{} `json:"id,omitempty"`
}
if err := json.Unmarshal(message, &baseMessage); err != nil {
return createErrorResponse(
nil,
mcp.PARSE_ERROR,
"Failed to parse message",
)
}
// Check for valid JSONRPC version
if baseMessage.JSONRPC != mcp.JSONRPC_VERSION {
return createErrorResponse(
baseMessage.ID,
mcp.INVALID_REQUEST,
"Invalid JSON-RPC version",
)
}
if baseMessage.ID == nil {
var notification mcp.JSONRPCNotification
if err := json.Unmarshal(message, &notification); err != nil {
return createErrorResponse(
nil,
mcp.PARSE_ERROR,
"Failed to parse notification",
)
}
s.handleNotification(ctx, notification)
return nil // Return nil for notifications
}
api.LogInfof("HandleMessage: %s", baseMessage.Method)
switch baseMessage.Method {
case "initialize":
var request mcp.InitializeRequest
if err := json.Unmarshal(message, &request); err != nil {
return createErrorResponse(
baseMessage.ID,
mcp.INVALID_REQUEST,
"Invalid initialize request",
)
}
return s.handleInitialize(ctx, baseMessage.ID, request)
case "ping":
var request mcp.PingRequest
if err := json.Unmarshal(message, &request); err != nil {
return createErrorResponse(
baseMessage.ID,
mcp.INVALID_REQUEST,
"Invalid ping request",
)
}
return s.handlePing(ctx, baseMessage.ID, request)
case "resources/list":
if s.capabilities.resources == nil {
return createErrorResponse(
baseMessage.ID,
mcp.METHOD_NOT_FOUND,
"Resources not supported",
)
}
var request mcp.ListResourcesRequest
if err := json.Unmarshal(message, &request); err != nil {
return createErrorResponse(
baseMessage.ID,
mcp.INVALID_REQUEST,
"Invalid list resources request",
)
}
return s.handleListResources(ctx, baseMessage.ID, request)
case "resources/templates/list":
if s.capabilities.resources == nil {
return createErrorResponse(
baseMessage.ID,
mcp.METHOD_NOT_FOUND,
"Resources not supported",
)
}
var request mcp.ListResourceTemplatesRequest
if err := json.Unmarshal(message, &request); err != nil {
return createErrorResponse(
baseMessage.ID,
mcp.INVALID_REQUEST,
"Invalid list resource templates request",
)
}
return s.handleListResourceTemplates(ctx, baseMessage.ID, request)
case "resources/read":
if s.capabilities.resources == nil {
return createErrorResponse(
baseMessage.ID,
mcp.METHOD_NOT_FOUND,
"Resources not supported",
)
}
var request mcp.ReadResourceRequest
if err := json.Unmarshal(message, &request); err != nil {
return createErrorResponse(
baseMessage.ID,
mcp.INVALID_REQUEST,
"Invalid read resource request",
)
}
return s.handleReadResource(ctx, baseMessage.ID, request)
case "prompts/list":
if s.capabilities.prompts == nil {
return createErrorResponse(
baseMessage.ID,
mcp.METHOD_NOT_FOUND,
"Prompts not supported",
)
}
var request mcp.ListPromptsRequest
if err := json.Unmarshal(message, &request); err != nil {
return createErrorResponse(
baseMessage.ID,
mcp.INVALID_REQUEST,
"Invalid list prompts request",
)
}
return s.handleListPrompts(ctx, baseMessage.ID, request)
case "prompts/get":
if s.capabilities.prompts == nil {
return createErrorResponse(
baseMessage.ID,
mcp.METHOD_NOT_FOUND,
"Prompts not supported",
)
}
var request mcp.GetPromptRequest
if err := json.Unmarshal(message, &request); err != nil {
return createErrorResponse(
baseMessage.ID,
mcp.INVALID_REQUEST,
"Invalid get prompt request",
)
}
return s.handleGetPrompt(ctx, baseMessage.ID, request)
case "tools/list":
if s.capabilities.tools == nil {
return createErrorResponse(
baseMessage.ID,
mcp.METHOD_NOT_FOUND,
"Tools not supported",
)
}
var request mcp.ListToolsRequest
if err := json.Unmarshal(message, &request); err != nil {
return createErrorResponse(
baseMessage.ID,
mcp.INVALID_REQUEST,
"Invalid list tools request",
)
}
return s.handleListTools(ctx, baseMessage.ID, request)
case "tools/call":
if s.capabilities.tools == nil {
return createErrorResponse(
baseMessage.ID,
mcp.METHOD_NOT_FOUND,
"Tools not supported",
)
}
var request mcp.CallToolRequest
if err := json.Unmarshal(message, &request); err != nil {
return createErrorResponse(
baseMessage.ID,
mcp.INVALID_REQUEST,
"Invalid call tool request",
)
}
return s.handleToolCall(ctx, baseMessage.ID, request)
default:
return createErrorResponse(
baseMessage.ID,
mcp.METHOD_NOT_FOUND,
fmt.Sprintf("Method %s not found", baseMessage.Method),
)
}
}
// AddResource registers a new resource and its handler
func (s *MCPServer) AddResource(
resource mcp.Resource,
handler ResourceHandlerFunc,
) {
if s.capabilities.resources == nil {
s.capabilities.resources = &resourceCapabilities{}
}
s.mu.Lock()
defer s.mu.Unlock()
s.resources[resource.URI] = resourceEntry{
resource: resource,
handler: handler,
}
}
// AddResourceTemplate registers a new resource template and its handler
func (s *MCPServer) AddResourceTemplate(
template mcp.ResourceTemplate,
handler ResourceTemplateHandlerFunc,
) {
if s.capabilities.resources == nil {
s.capabilities.resources = &resourceCapabilities{}
}
s.mu.Lock()
defer s.mu.Unlock()
s.resourceTemplates[template.URITemplate] = resourceTemplateEntry{
template: template,
handler: handler,
}
}
// AddPrompt registers a new prompt handler with the given name
func (s *MCPServer) AddPrompt(prompt mcp.Prompt, handler PromptHandlerFunc) {
if s.capabilities.prompts == nil {
s.capabilities.prompts = &promptCapabilities{}
}
s.mu.Lock()
defer s.mu.Unlock()
s.prompts[prompt.Name] = prompt
s.promptHandlers[prompt.Name] = handler
}
// AddTool registers a new tool and its handler
func (s *MCPServer) AddTool(tool mcp.Tool, handler ToolHandlerFunc) {
s.AddTools(ServerTool{Tool: tool, Handler: handler})
}
// AddTools registers multiple tools at once
func (s *MCPServer) AddTools(tools ...ServerTool) {
if s.capabilities.tools == nil {
s.capabilities.tools = &toolCapabilities{}
}
s.mu.Lock()
for _, entry := range tools {
s.tools[entry.Tool.Name] = entry
}
initialized := s.initialized.Load()
s.mu.Unlock()
// Send notification if server is already initialized
if initialized {
if err := s.SendNotificationToClient("notifications/tools/list_changed", nil); err != nil {
// We can't return the error, but in a future version we could log it
}
}
}
// SetTools replaces all existing tools with the provided list
func (s *MCPServer) SetTools(tools ...ServerTool) {
s.mu.Lock()
s.tools = make(map[string]ServerTool)
s.mu.Unlock()
s.AddTools(tools...)
}
// DeleteTools removes a tool from the server
func (s *MCPServer) DeleteTools(names ...string) {
s.mu.Lock()
for _, name := range names {
delete(s.tools, name)
}
initialized := s.initialized.Load()
s.mu.Unlock()
// Send notification if server is already initialized
if initialized {
if err := s.SendNotificationToClient("notifications/tools/list_changed", nil); err != nil {
// We can't return the error, but in a future version we could log it
}
}
}
// AddNotificationHandler registers a new handler for incoming notifications
func (s *MCPServer) AddNotificationHandler(
method string,
handler NotificationHandlerFunc,
) {
s.mu.Lock()
defer s.mu.Unlock()
s.notificationHandlers[method] = handler
}
func (s *MCPServer) handleInitialize(
ctx context.Context,
id interface{},
request mcp.InitializeRequest,
) mcp.JSONRPCMessage {
capabilities := mcp.ServerCapabilities{}
// Only add resource capabilities if they're configured
if s.capabilities.resources != nil {
capabilities.Resources = &struct {
Subscribe bool `json:"subscribe,omitempty"`
ListChanged bool `json:"listChanged,omitempty"`
}{
Subscribe: s.capabilities.resources.subscribe,
ListChanged: s.capabilities.resources.listChanged,
}
}
// Only add prompt capabilities if they're configured
if s.capabilities.prompts != nil {
capabilities.Prompts = &struct {
ListChanged bool `json:"listChanged,omitempty"`
}{
ListChanged: s.capabilities.prompts.listChanged,
}
}
// Only add tool capabilities if they're configured
if s.capabilities.tools != nil {
capabilities.Tools = &struct {
ListChanged bool `json:"listChanged,omitempty"`
}{
ListChanged: s.capabilities.tools.listChanged,
}
}
if s.capabilities.logging {
capabilities.Logging = &struct{}{}
}
result := mcp.InitializeResult{
ProtocolVersion: mcp.LATEST_PROTOCOL_VERSION,
ServerInfo: mcp.Implementation{
Name: s.name,
Version: s.version,
},
Capabilities: capabilities,
Instructions: s.instructions,
}
s.initialized.Store(true)
return createResponse(id, result)
}
func (s *MCPServer) handlePing(
ctx context.Context,
id interface{},
request mcp.PingRequest,
) mcp.JSONRPCMessage {
return createResponse(id, mcp.EmptyResult{})
}
func (s *MCPServer) handleListResources(
ctx context.Context,
id interface{},
request mcp.ListResourcesRequest,
) mcp.JSONRPCMessage {
s.mu.RLock()
resources := make([]mcp.Resource, 0, len(s.resources))
for _, entry := range s.resources {
resources = append(resources, entry.resource)
}
s.mu.RUnlock()
result := mcp.ListResourcesResult{
Resources: resources,
}
if request.Params.Cursor != "" {
result.NextCursor = "" // Handle pagination if needed
}
return createResponse(id, result)
}
func (s *MCPServer) handleListResourceTemplates(
ctx context.Context,
id interface{},
request mcp.ListResourceTemplatesRequest,
) mcp.JSONRPCMessage {
s.mu.RLock()
templates := make([]mcp.ResourceTemplate, 0, len(s.resourceTemplates))
for _, entry := range s.resourceTemplates {
templates = append(templates, entry.template)
}
s.mu.RUnlock()
result := mcp.ListResourceTemplatesResult{
ResourceTemplates: templates,
}
if request.Params.Cursor != "" {
result.NextCursor = "" // Handle pagination if needed
}
return createResponse(id, result)
}
func (s *MCPServer) handleReadResource(
ctx context.Context,
id interface{},
request mcp.ReadResourceRequest,
) mcp.JSONRPCMessage {
s.mu.RLock()
// First try direct resource handlers
if entry, ok := s.resources[request.Params.URI]; ok {
handler := entry.handler
s.mu.RUnlock()
contents, err := handler(ctx, request)
if err != nil {
return createErrorResponse(id, mcp.INTERNAL_ERROR, err.Error())
}
return createResponse(id, mcp.ReadResourceResult{Contents: contents})
}
// If no direct handler found, try matching against templates
var matchedHandler ResourceTemplateHandlerFunc
var matched bool
for uriTemplate, entry := range s.resourceTemplates {
if matchesTemplate(request.Params.URI, uriTemplate) {
matchedHandler = entry.handler
matched = true
break
}
}
s.mu.RUnlock()
if matched {
contents, err := matchedHandler(ctx, request)
if err != nil {
return createErrorResponse(id, mcp.INTERNAL_ERROR, err.Error())
}
return createResponse(
id,
mcp.ReadResourceResult{Contents: contents},
)
}
return createErrorResponse(
id,
mcp.INVALID_PARAMS,
fmt.Sprintf(
"No handler found for resource URI: %s",
request.Params.URI,
),
)
}
// matchesTemplate checks if a URI matches a URI template pattern
func matchesTemplate(uri string, template string) bool {
// Convert template into a regex pattern
pattern := template
// Replace {name} with ([^/]+)
pattern = regexp.QuoteMeta(pattern)
pattern = regexp.MustCompile(`\\\{[^}]+\\\}`).
ReplaceAllString(pattern, `([^/]+)`)
pattern = "^" + pattern + "$"
matched, _ := regexp.MatchString(pattern, uri)
return matched
}
func (s *MCPServer) handleListPrompts(
ctx context.Context,
id interface{},
request mcp.ListPromptsRequest,
) mcp.JSONRPCMessage {
s.mu.RLock()
prompts := make([]mcp.Prompt, 0, len(s.prompts))
for _, prompt := range s.prompts {
prompts = append(prompts, prompt)
}
s.mu.RUnlock()
result := mcp.ListPromptsResult{
Prompts: prompts,
}
if request.Params.Cursor != "" {
result.NextCursor = "" // Handle pagination if needed
}
return createResponse(id, result)
}
func (s *MCPServer) handleGetPrompt(
ctx context.Context,
id interface{},
request mcp.GetPromptRequest,
) mcp.JSONRPCMessage {
s.mu.RLock()
handler, ok := s.promptHandlers[request.Params.Name]
s.mu.RUnlock()
if !ok {
return createErrorResponse(
id,
mcp.INVALID_PARAMS,
fmt.Sprintf("Prompt not found: %s", request.Params.Name),
)
}
result, err := handler(ctx, request)
if err != nil {
return createErrorResponse(id, mcp.INTERNAL_ERROR, err.Error())
}
return createResponse(id, result)
}
func (s *MCPServer) handleListTools(
ctx context.Context,
id interface{},
request mcp.ListToolsRequest,
) mcp.JSONRPCMessage {
s.mu.RLock()
tools := make([]mcp.Tool, 0, len(s.tools))
// Get all tool names for consistent ordering
toolNames := make([]string, 0, len(s.tools))
for name := range s.tools {
toolNames = append(toolNames, name)
}
// Sort the tool names for consistent ordering
sort.Strings(toolNames)
// Add tools in sorted order
for _, name := range toolNames {
tools = append(tools, s.tools[name].Tool)
}
s.mu.RUnlock()
result := mcp.ListToolsResult{
Tools: tools,
}
if request.Params.Cursor != "" {
result.NextCursor = "" // Handle pagination if needed
}
return createResponse(id, result)
}
func (s *MCPServer) handleToolCall(
ctx context.Context,
id interface{},
request mcp.CallToolRequest,
) mcp.JSONRPCMessage {
s.mu.RLock()
tool, ok := s.tools[request.Params.Name]
s.mu.RUnlock()
if !ok {
return createErrorResponse(
id,
mcp.INVALID_PARAMS,
fmt.Sprintf("Tool not found: %s", request.Params.Name),
)
}
result, err := tool.Handler(ctx, request)
if err != nil {
return createErrorResponse(id, mcp.INTERNAL_ERROR, err.Error())
}
return createResponse(id, result)
}
func (s *MCPServer) handleNotification(
ctx context.Context,
notification mcp.JSONRPCNotification,
) mcp.JSONRPCMessage {
s.mu.RLock()
handler, ok := s.notificationHandlers[notification.Method]
s.mu.RUnlock()
if ok {
handler(ctx, notification)
}
return nil
}
func createResponse(id interface{}, result interface{}) mcp.JSONRPCMessage {
return mcp.JSONRPCResponse{
JSONRPC: mcp.JSONRPC_VERSION,
ID: id,
Result: result,
}
}
func createErrorResponse(
id interface{},
code int,
message string,
) mcp.JSONRPCMessage {
return mcp.JSONRPCError{
JSONRPC: mcp.JSONRPC_VERSION,
ID: id,
Error: struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}{
Code: code,
Message: message,
},
}
}

View File

@@ -0,0 +1,212 @@
package internal
import (
"encoding/json"
"fmt"
"net/http"
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"github.com/google/uuid"
"github.com/mark3labs/mcp-go/mcp"
)
// SSEServer implements a Server-Sent Events (SSE) based MCP server.
// It provides real-time communication capabilities over HTTP using the SSE protocol.
type SSEServer struct {
server *MCPServer
baseURL string
MessageEndpoint string
SSEEndpoint string
sessions map[string]bool
redisClient *RedisClient // Redis client for pub/sub
}
// Option defines a function type for configuring SSEServer
type Option func(*SSEServer)
// WithBaseURL sets the base URL for the SSE server
func WithBaseURL(baseURL string) Option {
return func(s *SSEServer) {
s.baseURL = baseURL
}
}
// WithMessageEndpoint sets the message endpoint path
func WithMessageEndpoint(endpoint string) Option {
return func(s *SSEServer) {
s.MessageEndpoint = endpoint
}
}
// WithSSEEndpoint sets the SSE endpoint path
func WithSSEEndpoint(endpoint string) Option {
return func(s *SSEServer) {
s.SSEEndpoint = endpoint
}
}
func WithRedisClient(redisClient *RedisClient) Option {
return func(s *SSEServer) {
s.redisClient = redisClient
}
}
// NewSSEServer creates a new SSE server instance with the given MCP server and options.
func NewSSEServer(server *MCPServer, opts ...Option) *SSEServer {
s := &SSEServer{
server: server,
SSEEndpoint: "/sse",
MessageEndpoint: "/message",
sessions: make(map[string]bool),
}
// Apply all options
for _, opt := range opts {
opt(s)
}
return s
}
// handleSSE handles incoming SSE connection requests.
// It sets up appropriate headers and creates a new session for the client.
func (s *SSEServer) HandleSSE(cb api.FilterCallbackHandler) {
sessionID := uuid.New().String()
s.sessions[sessionID] = true
// sessionStore, _ := json.Marshal(s.sessions)
// TODO: sse:sessions?
// s.redisClient.Set("sse:sessions", string(sessionStore), 0)
defer delete(s.sessions, sessionID)
channel := fmt.Sprintf("sse:%s", sessionID)
messageEndpoint := fmt.Sprintf(
"%s%s?sessionId=%s",
s.baseURL,
s.MessageEndpoint,
sessionID,
)
// go func() {
// for {
// select {
// case serverNotification := <-s.server.notifications:
// // Only forward notifications meant for this session
// if serverNotification.Context.SessionID == sessionID {
// eventData, err := json.Marshal(serverNotification.Notification)
// if err == nil {
// select {
// case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData):
// // Event queued successfully
// case <-session.done:
// return
// }
// }
// }
// case <-session.done:
// return
// case <-r.Context().Done():
// return
// }
// }
// }()
err := s.redisClient.Subscribe(channel, func(message string) {
defer cb.EncoderFilterCallbacks().RecoverPanic()
api.LogInfof("SSE Send message: %s", message)
cb.EncoderFilterCallbacks().InjectData([]byte(message))
})
if err != nil {
api.LogErrorf("Failed to subscribe to Redis channel: %v", err)
}
// Send the initial endpoint event
initialEvent := fmt.Sprintf("event: endpoint\ndata: %s\r\n\r\n", messageEndpoint)
s.redisClient.Publish(channel, initialEvent)
}
// handleMessage processes incoming JSON-RPC messages from clients and sends responses
// back through both the SSE connection and HTTP response.
func (s *SSEServer) HandleMessage(w http.ResponseWriter, r *http.Request, body json.RawMessage) {
if r.Method != http.MethodPost {
s.writeJSONRPCError(w, nil, mcp.INVALID_REQUEST, fmt.Sprintf("Method %s not allowed", r.Method))
return
}
sessionID := r.URL.Query().Get("sessionId")
// if sessionID == "" {
// s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Missing sessionId")
// return
// }
// Set the client context in the server before handling the message
ctx := s.server.WithContext(r.Context(), NotificationContext{
ClientID: sessionID,
SessionID: sessionID,
})
//TODO sessions
// _, ok := s.sessions.Load(sessionID)
// if !ok {
// s.writeJSONRPCError(w, nil, mcp.INVALID_PARAMS, "Invalid session ID")
// return
// }
//TODO
// // Parse message as raw JSON
// var rawMessage json.RawMessage
// if err := json.NewDecoder(r.Body).Decode(&rawMessage); err != nil {
// s.writeJSONRPCError(w, nil, mcp.PARSE_ERROR, "Parse error")
// return
// }
// Process message through MCPServer
response := s.server.HandleMessage(ctx, body)
// Only send response if there is one (not for notifications)
if response != nil {
eventData, _ := json.Marshal(response)
if sessionID != "" {
channel := fmt.Sprintf("sse:%s", sessionID)
publishErr := s.redisClient.Publish(channel, fmt.Sprintf("event: message\ndata: %s\n\n", eventData))
if publishErr != nil {
api.LogErrorf("Failed to publish message to Redis: %v", publishErr)
}
}
// Send HTTP response
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(response)
} else {
// For notifications, just send 202 Accepted with no body
w.WriteHeader(http.StatusAccepted)
}
}
// writeJSONRPCError writes a JSON-RPC error response with the given error details.
func (s *SSEServer) writeJSONRPCError(
w http.ResponseWriter,
id interface{},
code int,
message string,
) {
response := createErrorResponse(id, code, message)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(response)
}
// // ServeHTTP implements the http.Handler interface.
// func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// switch r.URL.Path {
// case s.sseEndpoint:
// s.handleSSE(w, r)
// case s.messageEndpoint:
// s.handleMessage(w, r)
// default:
// http.NotFound(w, r)
// }
// }

View File

@@ -0,0 +1,99 @@
package gorm
import (
"fmt"
"log"
"os"
"gorm.io/driver/clickhouse"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// DBClient is a struct to handle PostgreSQL connections and operations
type DBClient struct {
db *gorm.DB
}
// NewDBClient creates a new DBClient instance and establishes a connection to the PostgreSQL database
func NewDBClient(dsn string, dbType string) (*DBClient, error) {
// Configure GORM logger
newLogger := logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
logger.Config{
SlowThreshold: 0, // Slow SQL threshold
LogLevel: logger.Info, // Log level
IgnoreRecordNotFoundError: false, // Ignore ErrRecordNotFound error for logger
Colorful: true, // Disable color
},
)
var db *gorm.DB
var err error
if dbType == "postgres" {
db, err = gorm.Open(postgres.Open(dsn), &gorm.Config{
Logger: newLogger,
})
} else if dbType == "clickhouse" {
db, err = gorm.Open(clickhouse.Open(dsn), &gorm.Config{})
} else {
return nil, fmt.Errorf("unsupported database type")
}
// Connect to the database
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
}
return &DBClient{db: db}, nil
}
// ExecuteSQL executes a raw SQL query and returns the result as a slice of maps
func (c *DBClient) ExecuteSQL(query string, args ...interface{}) ([]map[string]interface{}, error) {
rows, err := c.db.Raw(query, args...).Rows()
if err != nil {
return nil, fmt.Errorf("failed to execute SQL query: %w", err)
}
defer rows.Close()
// Get column names
columns, err := rows.Columns()
if err != nil {
return nil, fmt.Errorf("failed to get columns: %w", err)
}
// Prepare a slice to hold the results
var results []map[string]interface{}
// Iterate over the rows
for rows.Next() {
// Create a slice of interface{}'s to represent each column,
// and a second slice to contain pointers to each item in the columns slice.
columnsData := make([]interface{}, len(columns))
columnsPointers := make([]interface{}, len(columns))
for i := range columnsData {
columnsPointers[i] = &columnsData[i]
}
// Scan the result into the column pointers...
if err := rows.Scan(columnsPointers...); err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
}
// Create a map to hold the column name and value
rowMap := make(map[string]interface{})
for i, colName := range columns {
val := columnsData[i]
b, ok := val.([]byte)
if ok {
rowMap[colName] = string(b)
} else {
rowMap[colName] = val
}
}
// Append the map to the results slice
results = append(results, rowMap)
}
return results, nil
}

View File

@@ -0,0 +1,132 @@
package gorm
import (
"context"
"encoding/json"
"fmt"
"github.com/envoyproxy/envoy/examples/golang-http/simple/internal"
"github.com/mark3labs/mcp-go/mcp"
)
const favoriteFilesTemplate = `
WITH current_files AS (
SELECT path
FROM (
SELECT
old_path AS path,
max(time) AS last_time,
2 AS change_type
FROM git.file_changes
GROUP BY old_path
UNION ALL
SELECT
path,
max(time) AS last_time,
argMax(change_type, time) AS change_type
FROM git.file_changes
GROUP BY path
)
GROUP BY path
HAVING (argMax(change_type, last_time) != 2) AND (NOT match(path, '(^dbms/)|(^libs/)|(^tests/testflows/)|(^programs/server/store/)'))
ORDER BY path ASC
)
SELECT
path,
count() AS c
FROM git.file_changes
WHERE (author = '%s') AND (path IN (current_files))
GROUP BY path
ORDER BY c DESC
LIMIT 10`
// HandleQueryTool handles SQL query execution
func HandleQueryTool(dbClient *DBClient) internal.ToolHandlerFunc {
return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
arguments := request.Params.Arguments
message, ok := arguments["sql"].(string)
if !ok {
return nil, fmt.Errorf("invalid message argument")
}
results, err := dbClient.ExecuteSQL(message)
if err != nil {
return nil, fmt.Errorf("failed to execute SQL query: %w", err)
}
jsonData, err := json.Marshal(results)
if err != nil {
return nil, fmt.Errorf("failed to marshal SQL results: %w", err)
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: string(jsonData),
},
},
}, nil
}
}
// HandleFavoriteTool handles author's favorite files query
func HandleFavoriteTool(dbClient *DBClient) internal.ToolHandlerFunc {
return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
arguments := request.Params.Arguments
author, ok := arguments["author"].(string)
if !ok {
return nil, fmt.Errorf("invalid author argument")
}
query := fmt.Sprintf(favoriteFilesTemplate, author)
results, err := dbClient.ExecuteSQL(query)
if err != nil {
return nil, fmt.Errorf("failed to execute SQL query: %w", err)
}
jsonData, err := json.Marshal(results)
if err != nil {
return nil, fmt.Errorf("failed to marshal SQL results: %w", err)
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: string(jsonData),
},
},
}, nil
}
}
// GetQueryToolSchema returns the schema for query tool
func GetQueryToolSchema() json.RawMessage {
return json.RawMessage(`
{
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "The sql query to execute"
}
}
}
`)
}
// GetFavoriteToolSchema returns the schema for favorite files tool
func GetFavoriteToolSchema() json.RawMessage {
return json.RawMessage(`
{
"type": "object",
"properties": {
"author": {
"type": "string",
"description": "the author name"
}
}
}
`)
}

View File

@@ -0,0 +1,39 @@
# Copyright (c) 2022 Alibaba Group Holding Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env bash
set -euo pipefail
INNER_GO_FILTER_NAME=${GO_FILTER_NAME-""}
# OUTPUT_PACKAGE_DIR=${OUTPUT_PACKAGE_DIR:-"../external/package/"}
cd ./plugins/golang-filter
if [ ! -n "$INNER_GO_FILTER_NAME" ]; then
GO_FILTERS_DIR=$(pwd)
echo "🚀 Build all Go Filters under folder of $GO_FILTERS_DIR"
for file in `ls $GO_FILTERS_DIR`
do
if [ -d $GO_FILTERS_DIR/$file ]; then
name=${file##*/}
echo "🚀 Build Go Filter: $name"
GO_FILTER_NAME=${name} make build
# cp ${GO_FILTERS_DIR}/${file}/${name}.so ${OUTPUT_PACKAGE_DIR}
fi
done
else
echo "🚀 Build Go Filter: $INNER_GO_FILTER_NAME"
GO_FILTER_NAME=${INNER_GO_FILTER_NAME} make build
fi