Add remote mcp server sdk (#1946)

This commit is contained in:
澄潭
2025-03-24 22:11:45 +08:00
committed by GitHub
parent f5d20b72e0
commit d9f16f7d5e
17 changed files with 1337 additions and 13 deletions

View File

@@ -70,6 +70,11 @@ type HttpContext interface {
SetRequestBodyBufferLimit(byteSize uint32)
// Note that this parameter affects the gateway's memory usage! Support setting a maximum buffer size for each response body individually in response phase.
SetResponseBodyBufferLimit(byteSize uint32)
// Make a request to the target service of the current route using the specified URL and header.
RouteCall(method, url string, headers [][2]string, body []byte, callback ResponseCallback, timeoutMillisecond ...uint32) error
OnMCPToolCallSuccess(content []map[string]any)
OnMCPToolCallError(err error)
SendMCPToolTextResult(result string)
}
type oldParseConfigFunc[PluginConfig any] func(json gjson.Result, config *PluginConfig, log log.Log) error
@@ -100,6 +105,15 @@ type CommonVmCtx[PluginConfig any] struct {
onHttpResponseBody onHttpBodyFunc[PluginConfig]
onHttpStreamingResponseBody onHttpStreamingBodyFunc[PluginConfig]
onHttpStreamDone onHttpStreamDoneFunc[PluginConfig]
isJsonRpcSever bool
handleJsonRpcMethod bool
jsonRpcMethodHandlers MethodHandlers[PluginConfig]
mcpTools MCPTools[PluginConfig]
onMcpToolRequest mcpToolRequestFunc[PluginConfig]
onMcpToolResponse mcpToolResponseFunc[PluginConfig]
onJsonRpcError jsonRpcErrorFunc[PluginConfig]
jsonRpcRequestHandler JsonRpcRequestHandler[PluginConfig]
jsonRpcResponseHandler JsonRpcResponseHandler[PluginConfig]
}
type TickFuncEntry struct {
@@ -393,8 +407,10 @@ func NewCommonVmCtx[PluginConfig any](pluginName string, options ...CtxOption[Pl
func NewCommonVmCtxWithOptions[PluginConfig any](pluginName string, options ...CtxOption[PluginConfig]) *CommonVmCtx[PluginConfig] {
ctx := &CommonVmCtx[PluginConfig]{
pluginName: pluginName,
hasCustomConfig: true,
pluginName: pluginName,
hasCustomConfig: true,
jsonRpcMethodHandlers: make(MethodHandlers[PluginConfig]),
mcpTools: make(MCPTools[PluginConfig]),
}
for _, opt := range options {
opt.Apply(ctx)
@@ -403,7 +419,6 @@ func NewCommonVmCtxWithOptions[PluginConfig any](pluginName string, options ...C
var config PluginConfig
if unsafe.Sizeof(config) != 0 {
msg := "the `parseConfig` is missing in NewCommonVmCtx's arguments"
ctx.log.Critical(msg)
panic(msg)
}
ctx.hasCustomConfig = false
@@ -495,10 +510,12 @@ func (ctx *CommonPluginCtx[PluginConfig]) NewHttpContext(contextID uint32) types
userContext: map[string]interface{}{},
userAttribute: map[string]interface{}{},
}
if ctx.vm.onHttpRequestBody != nil || ctx.vm.onHttpStreamingRequestBody != nil {
httpCtx.registerMCPTools(ctx.vm.mcpTools)
httpCtx.registerMCPToolProcessor()
if ctx.vm.onHttpRequestBody != nil || ctx.vm.onHttpStreamingRequestBody != nil || len(ctx.vm.jsonRpcMethodHandlers) > 0 || ctx.vm.jsonRpcRequestHandler != nil {
httpCtx.needRequestBody = true
}
if ctx.vm.onHttpResponseBody != nil || ctx.vm.onHttpStreamingResponseBody != nil {
if ctx.vm.onHttpResponseBody != nil || ctx.vm.onHttpStreamingResponseBody != nil || ctx.vm.jsonRpcResponseHandler != nil {
httpCtx.needResponseBody = true
}
if ctx.vm.onHttpStreamingRequestBody != nil {
@@ -507,7 +524,6 @@ func (ctx *CommonPluginCtx[PluginConfig]) NewHttpContext(contextID uint32) types
if ctx.vm.onHttpStreamingResponseBody != nil {
httpCtx.streamingResponseBody = true
}
return httpCtx
}
@@ -524,6 +540,18 @@ type CommonHttpCtx[PluginConfig any] struct {
contextID uint32
userContext map[string]interface{}
userAttribute map[string]interface{}
pendingCall int
}
func (ctx *CommonHttpCtx[PluginConfig]) HttpCallStart(uint32) {
ctx.pendingCall++
}
func (ctx *CommonHttpCtx[PluginConfig]) HttpCallEnd(uint32) {
if ctx.pendingCall == 0 {
return
}
ctx.pendingCall--
}
func (ctx *CommonHttpCtx[PluginConfig]) SetContext(key string, value interface{}) {
@@ -599,6 +627,13 @@ func (ctx *CommonHttpCtx[PluginConfig]) WriteUserAttributeToTrace() error {
return nil
}
func (ctx *CommonHttpCtx[PluginConfig]) GetIntContext(key string, defaultValue int) int {
if b, ok := ctx.userContext[key].(int); ok {
return b
}
return defaultValue
}
func (ctx *CommonHttpCtx[PluginConfig]) GetBoolContext(key string, defaultValue bool) bool {
if b, ok := ctx.userContext[key].(bool); ok {
return b
@@ -686,6 +721,9 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestHeaders(numHeaders int, end
if IsBinaryRequestBody() {
ctx.needRequestBody = false
}
if ctx.plugin.vm.isJsonRpcSever && HasRequestBody() {
return types.HeaderStopIteration
}
if ctx.plugin.vm.onHttpRequestHeaders == nil {
return types.ActionContinue
}
@@ -709,7 +747,9 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestBody(bodySize int, endOfStr
}
return types.ActionContinue
}
if ctx.plugin.vm.onHttpRequestBody != nil {
if ctx.plugin.vm.onHttpRequestBody != nil ||
len(ctx.plugin.vm.jsonRpcMethodHandlers) > 0 ||
ctx.plugin.vm.jsonRpcRequestHandler != nil {
ctx.requestBodySize += bodySize
if !endOfStream {
return types.ActionPause
@@ -719,7 +759,14 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestBody(bodySize int, endOfStr
ctx.plugin.vm.log.Warnf("get request body failed: %v", err)
return types.ActionContinue
}
return ctx.plugin.vm.onHttpRequestBody(ctx, *ctx.config, body)
if ctx.plugin.vm.onHttpRequestBody != nil {
return ctx.plugin.vm.onHttpRequestBody(ctx, *ctx.config, body)
}
if len(ctx.plugin.vm.jsonRpcMethodHandlers) > 0 {
return ctx.HandleJsonRpcMethod(ctx, *ctx.config, body, ctx.plugin.vm.jsonRpcMethodHandlers)
}
// ctx.plugin.vm.jsonRpcRequestHandler not nil
return ctx.HandleJsonRpcRequest(ctx, *ctx.config, body, ctx.plugin.vm.jsonRpcRequestHandler)
}
return types.ActionContinue
}
@@ -755,7 +802,7 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseBody(bodySize int, endOfSt
}
return types.ActionContinue
}
if ctx.plugin.vm.onHttpResponseBody != nil {
if ctx.plugin.vm.onHttpResponseBody != nil || ctx.plugin.vm.jsonRpcResponseHandler != nil {
ctx.responseBodySize += bodySize
if !endOfStream {
return types.ActionPause
@@ -765,7 +812,11 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseBody(bodySize int, endOfSt
ctx.plugin.vm.log.Warnf("get response body failed: %v", err)
return types.ActionContinue
}
return ctx.plugin.vm.onHttpResponseBody(ctx, *ctx.config, body)
if ctx.plugin.vm.onHttpResponseBody != nil {
return ctx.plugin.vm.onHttpResponseBody(ctx, *ctx.config, body)
}
// ctx.plugin.vm.jsonRpcResponseHandler not nil
return ctx.HandleJsonRpcResponse(ctx, *ctx.config, body, ctx.plugin.vm.jsonRpcResponseHandler)
}
return types.ActionContinue
}
@@ -779,3 +830,15 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpStreamDone() {
}
ctx.plugin.vm.onHttpStreamDone(ctx, *ctx.config)
}
func (ctx *CommonHttpCtx[PluginConfig]) RouteCall(method, url string, headers [][2]string, body []byte, callback ResponseCallback, timeoutMillisecond ...uint32) error {
// Since the HttpCall here is a substitute for route invocation, the default timeout is slightly longer, at 1 minute.
var timeout uint32 = 60000
if len(timeoutMillisecond) > 0 {
timeout = timeoutMillisecond[0]
}
cluster := RouteCluster{
BaseCluster: BaseCluster{notify: ctx},
}
return HttpCall(cluster, method, url, headers, body, callback, timeout)
}