From 6eeef07621acf0bc30b6f0269eef66d6dd554227 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BE=84=E6=BD=AD?= Date: Tue, 25 Mar 2025 11:55:14 +0800 Subject: [PATCH] revert wrapper changes (#1948) --- .../wasm-go/pkg/wrapper/cluster_wrapper.go | 17 -- plugins/wasm-go/pkg/wrapper/http_wrapper.go | 17 +- .../wasm-go/pkg/wrapper/jsonrpc_wrapper.go | 123 ---------- plugins/wasm-go/pkg/wrapper/mcp_wrapper.go | 221 ------------------ plugins/wasm-go/pkg/wrapper/plugin_wrapper.go | 83 +------ 5 files changed, 13 insertions(+), 448 deletions(-) delete mode 100644 plugins/wasm-go/pkg/wrapper/jsonrpc_wrapper.go delete mode 100644 plugins/wasm-go/pkg/wrapper/mcp_wrapper.go diff --git a/plugins/wasm-go/pkg/wrapper/cluster_wrapper.go b/plugins/wasm-go/pkg/wrapper/cluster_wrapper.go index 7cc41f6bd..e797394b5 100644 --- a/plugins/wasm-go/pkg/wrapper/cluster_wrapper.go +++ b/plugins/wasm-go/pkg/wrapper/cluster_wrapper.go @@ -24,19 +24,9 @@ import ( type Cluster interface { ClusterName() string HostName() string - HttpCallNotify() HttpCallNotify -} - -type BaseCluster struct { - notify HttpCallNotify -} - -func (base BaseCluster) HttpCallNotify() HttpCallNotify { - return base.notify } type RouteCluster struct { - BaseCluster Host string } @@ -56,7 +46,6 @@ func (c RouteCluster) HostName() string { } type TargetCluster struct { - BaseCluster Host string Cluster string } @@ -70,7 +59,6 @@ func (c TargetCluster) HostName() string { } type K8sCluster struct { - BaseCluster ServiceName string Namespace string Port int64 @@ -95,7 +83,6 @@ func (c K8sCluster) HostName() string { } type NacosCluster struct { - BaseCluster ServiceName string // use DEFAULT-GROUP by default Group string @@ -128,7 +115,6 @@ func (c NacosCluster) HostName() string { } type StaticIpCluster struct { - BaseCluster ServiceName string Port int64 Host string @@ -146,7 +132,6 @@ func (c StaticIpCluster) HostName() string { } type DnsCluster struct { - BaseCluster ServiceName string Domain string Port int64 @@ -161,7 +146,6 @@ func (c DnsCluster) HostName() string { } type ConsulCluster struct { - BaseCluster ServiceName string Datacenter string Port int64 @@ -182,7 +166,6 @@ func (c ConsulCluster) HostName() string { } type FQDNCluster struct { - BaseCluster FQDN string Host string Port int64 diff --git a/plugins/wasm-go/pkg/wrapper/http_wrapper.go b/plugins/wasm-go/pkg/wrapper/http_wrapper.go index 8db6bec55..36b4fe76c 100644 --- a/plugins/wasm-go/pkg/wrapper/http_wrapper.go +++ b/plugins/wasm-go/pkg/wrapper/http_wrapper.go @@ -25,11 +25,6 @@ import ( "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" ) -type HttpCallNotify interface { - HttpCallStart(uint32) - HttpCallEnd(uint32) -} - type ResponseCallback func(statusCode int, responseHeaders http.Header, responseBody []byte) type HttpClient interface { @@ -113,9 +108,7 @@ func HttpCall(cluster Cluster, method, rawURL string, headers [][2]string, body } headers = append(headers, [2]string{":method", method}, [2]string{":path", path}, [2]string{":authority", authority}) requestID := uuid.New().String() - httpCallNotify := cluster.HttpCallNotify() - var callID uint32 - callID, err = proxywasm.DispatchHttpCall(cluster.ClusterName(), headers, body, nil, timeout, func(numHeaders, bodySize, numTrailers int) { + _, err = proxywasm.DispatchHttpCall(cluster.ClusterName(), headers, body, nil, timeout, func(numHeaders, bodySize, numTrailers int) { respBody, err := proxywasm.GetHttpCallResponseBody(0, bodySize) if err != nil { proxywasm.LogCriticalf("failed to get response body: %v", err) @@ -142,12 +135,8 @@ func HttpCall(cluster Cluster, method, rawURL string, headers [][2]string, body proxywasm.LogDebugf("http call end, id: %s, code: %d, normal: %t, body: %s", requestID, code, normalResponse, respBody) callback(code, headers, respBody) - httpCallNotify.HttpCallEnd(callID) }) - if err == nil { - httpCallNotify.HttpCallStart(callID) - proxywasm.LogDebugf("http call start, id: %s, cluster: %s, method: %s, url: %s, headers: %#v, body: %s, timeout: %d", - requestID, cluster.ClusterName(), method, rawURL, headers, body, timeout) - } + proxywasm.LogDebugf("http call start, id: %s, cluster: %s, method: %s, url: %s, headers: %#v, body: %s, timeout: %d", + requestID, cluster.ClusterName(), method, rawURL, headers, body, timeout) return err } diff --git a/plugins/wasm-go/pkg/wrapper/jsonrpc_wrapper.go b/plugins/wasm-go/pkg/wrapper/jsonrpc_wrapper.go deleted file mode 100644 index 770301876..000000000 --- a/plugins/wasm-go/pkg/wrapper/jsonrpc_wrapper.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright (c) 2022 Alibaba Group Holding Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package wrapper - -import ( - "fmt" - - "github.com/alibaba/higress/plugins/wasm-go/pkg/log" - "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" - "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" - "github.com/tidwall/gjson" - "github.com/tidwall/sjson" -) - -const ( - CtxJsonRpcID = "jsonRpcID" - JError = "error" - JCode = "code" - JMessage = "message" - JResult = "result" - - ErrParseError = -32700 - ErrInvalidRequest = -32600 - ErrMethodNotFound = -32601 - ErrInvalidParams = -32602 - ErrInternalError = -32603 -) - -type JsonRpcRequestHandler[PluginConfig any] func(context HttpContext, config PluginConfig, id int64, method string, params gjson.Result) types.Action - -type JsonRpcResponseHandler[PluginConfig any] func(context HttpContext, config PluginConfig, id int64, result gjson.Result, error gjson.Result) types.Action - -type JsonRpcMethodHandler[PluginConfig any] func(context HttpContext, config PluginConfig, id int64, params gjson.Result) error - -type MethodHandlers[PluginConfig any] map[string]JsonRpcMethodHandler[PluginConfig] - -func sendJsonRpcResponse(id int64, extras map[string]any, debugInfo string) { - body := []byte(`{"jsonrpc": "2.0"}`) - body, _ = sjson.SetBytes(body, "id", id) - for key, value := range extras { - body, _ = sjson.SetBytes(body, key, value) - } - proxywasm.SendHttpResponseWithDetail(200, debugInfo, [][2]string{{"Content-Type", "application/json; charset=utf-8"}}, body, -1) -} - -func (ctx *CommonHttpCtx[PluginConfig]) OnJsonRpcResponseSuccess(result map[string]any) { - var ( - id int64 - ok bool - ) - if id, ok = ctx.userContext[CtxJsonRpcID].(int64); !ok { - proxywasm.SendHttpResponseWithDetail(500, "not_found_json_rpc_id", nil, []byte("not found json rpc id"), -1) - return - } - sendJsonRpcResponse(id, map[string]any{JResult: result}, "json_rpc_success") -} - -func (ctx *CommonHttpCtx[PluginConfig]) OnJsonRpcResponseError(err error, code ...int) { - var ( - id int64 - ok bool - ) - if id, ok = ctx.userContext[CtxJsonRpcID].(int64); !ok { - proxywasm.SendHttpResponseWithDetail(500, "not_found_json_rpc_id", nil, []byte("not found json rpc id"), -1) - return - } - errorCode := ErrInternalError - if len(code) > 0 { - errorCode = code[0] - } - sendJsonRpcResponse(id, map[string]any{JError: map[string]any{ - JMessage: err.Error(), - JCode: errorCode, - }}, "json_rpc_error") -} - -func (ctx *CommonHttpCtx[PluginConfig]) HandleJsonRpcMethod(context HttpContext, config PluginConfig, body []byte, handles MethodHandlers[PluginConfig]) types.Action { - id := gjson.GetBytes(body, "id").Int() - ctx.userContext[CtxJsonRpcID] = id - method := gjson.GetBytes(body, "method").String() - params := gjson.GetBytes(body, "params") - if handle, ok := handles[method]; ok { - log.Debugf("json rpc call id[%d] method[%s] with params[%s]", id, method, params.Raw) - err := handle(context, config, id, params) - if err != nil { - ctx.OnJsonRpcResponseError(err) - return types.ActionContinue - } - // Waiting for the response - return types.ActionPause - } - ctx.OnJsonRpcResponseError(fmt.Errorf("method not found:%s", method), ErrMethodNotFound) - return types.ActionContinue -} - -func (ctx *CommonHttpCtx[PluginConfig]) HandleJsonRpcRequest(context HttpContext, config PluginConfig, body []byte, handle JsonRpcRequestHandler[PluginConfig]) types.Action { - id := gjson.GetBytes(body, "id").Int() - ctx.userContext[CtxJsonRpcID] = id - method := gjson.GetBytes(body, "method").String() - params := gjson.GetBytes(body, "params") - log.Debugf("json rpc call id[%d] method[%s] with params[%s]", id, method, params.Raw) - return handle(context, config, id, method, params) -} - -func (ctx *CommonHttpCtx[PluginConfig]) HandleJsonRpcResponse(context HttpContext, config PluginConfig, body []byte, handle JsonRpcResponseHandler[PluginConfig]) types.Action { - id := gjson.GetBytes(body, "id").Int() - error := gjson.GetBytes(body, "error") - result := gjson.GetBytes(body, "result") - log.Debugf("json rpc response id[%d] error[%s] result[%s]", id, error.Raw, result.Raw) - return handle(context, config, id, result, error) -} diff --git a/plugins/wasm-go/pkg/wrapper/mcp_wrapper.go b/plugins/wasm-go/pkg/wrapper/mcp_wrapper.go deleted file mode 100644 index 8c7b08e94..000000000 --- a/plugins/wasm-go/pkg/wrapper/mcp_wrapper.go +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright (c) 2022 Alibaba Group Holding Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package wrapper - -import ( - "encoding/json" - "errors" - "fmt" - "reflect" - - "github.com/alibaba/higress/plugins/wasm-go/pkg/log" - "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" - "github.com/invopop/jsonschema" - "github.com/tidwall/gjson" -) - -type MCPTool[PluginConfig any] interface { - Create(params []byte) MCPTool[PluginConfig] - Call(context HttpContext, config PluginConfig) error - Description() string - InputSchema() map[string]any -} - -type MCPTools[PluginConfig any] map[string]MCPTool[PluginConfig] - -type addMCPToolOption[PluginConfig any] struct { - name string - tool MCPTool[PluginConfig] -} - -func AddMCPTool[PluginConfig any](name string, tool MCPTool[PluginConfig]) CtxOption[PluginConfig] { - return &addMCPToolOption[PluginConfig]{ - name: name, - tool: tool, - } -} - -func (o *addMCPToolOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { - ctx.isJsonRpcSever = true - ctx.handleJsonRpcMethod = true - if _, exist := ctx.mcpTools[o.name]; exist { - panic(fmt.Sprintf("Conflict! There is a tool with the same name:%s", - o.name)) - } - ctx.mcpTools[o.name] = o.tool -} - -func (ctx *CommonHttpCtx[PluginConfig]) OnMCPResponseSuccess(result map[string]any) { - ctx.OnJsonRpcResponseSuccess(result) - // TODO: support pub to redis when use POST + SSE -} - -func (ctx *CommonHttpCtx[PluginConfig]) OnMCPResponseError(err error, code ...int) { - ctx.OnJsonRpcResponseError(err, code...) - // TODO: support pub to redis when use POST + SSE -} - -func (ctx *CommonHttpCtx[PluginConfig]) OnMCPToolCallSuccess(content []map[string]any) { - ctx.OnMCPResponseSuccess(map[string]any{ - "content": content, - "isError": false, - }) -} - -func (ctx *CommonHttpCtx[PluginConfig]) OnMCPToolCallError(err error) { - ctx.OnMCPResponseSuccess(map[string]any{ - "content": []map[string]any{ - { - "type": "text", - "text": err.Error(), - }, - }, - "isError": true, - }) -} - -func (ctx *CommonHttpCtx[PluginConfig]) SendMCPToolTextResult(result string) { - ctx.OnMCPToolCallSuccess([]map[string]any{ - { - "type": "text", - "text": result, - }, - }) -} - -func (ctx *CommonHttpCtx[PluginConfig]) registerMCPTools(mcpTools MCPTools[PluginConfig]) { - if !ctx.plugin.vm.isJsonRpcSever { - return - } - if !ctx.plugin.vm.handleJsonRpcMethod { - return - } - ctx.plugin.vm.jsonRpcMethodHandlers["tools/list"] = func(context HttpContext, config PluginConfig, id int64, params gjson.Result) error { - var tools []map[string]any - for name, tool := range mcpTools { - tools = append(tools, map[string]any{ - "name": name, - "description": tool.Description(), - "inputSchema": tool.InputSchema(), - }) - } - ctx.OnMCPResponseSuccess(map[string]any{ - "tools": tools, - "nextCursor": "", - }) - return nil - } - ctx.plugin.vm.jsonRpcMethodHandlers["tools/call"] = func(context HttpContext, config PluginConfig, id int64, params gjson.Result) error { - name := params.Get("name").String() - args := params.Get("arguments") - if tool, ok := mcpTools[name]; ok { - log.Debugf("mcp call tool[%s] with arguments[%s]", name, args.Raw) - toolInstance := tool.Create([]byte(args.Raw)) - err := toolInstance.Call(context, config) - // TODO: validate the json schema through github.com/kaptinlin/jsonschema - if err != nil { - ctx.OnMCPToolCallError(err) - return nil - } - return nil - } - ctx.OnMCPResponseError(errors.New("Unknown tool: invalid_tool_name"), ErrInvalidParams) - return nil - } -} - -type mcpToolRequestFunc[PluginConfig any] func(context HttpContext, config PluginConfig, toolName string, toolArgs gjson.Result) types.Action -type mcpToolResponseFunc[PluginConfig any] func(context HttpContext, config PluginConfig, isError bool, content gjson.Result) types.Action -type jsonRpcErrorFunc[PluginConfig any] func(context HttpContext, config PluginConfig, errorCode int64, errorMessage string) types.Action - -type mcpToolRequestOption[PluginConfig any] struct { - f mcpToolRequestFunc[PluginConfig] -} - -func OnMCPToolRequest[PluginConfig any](f mcpToolRequestFunc[PluginConfig]) CtxOption[PluginConfig] { - return &mcpToolRequestOption[PluginConfig]{f} -} - -func (o *mcpToolRequestOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { - ctx.isJsonRpcSever = true - ctx.onMcpToolRequest = o.f -} - -type mcpToolResponseOption[PluginConfig any] struct { - f mcpToolResponseFunc[PluginConfig] -} - -func OnMCPToolResponse[PluginConfig any](f mcpToolResponseFunc[PluginConfig]) CtxOption[PluginConfig] { - return &mcpToolResponseOption[PluginConfig]{f} -} - -func (o *mcpToolResponseOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { - ctx.isJsonRpcSever = true - ctx.onMcpToolResponse = o.f -} - -type jsonRpcErrorOption[PluginConfig any] struct { - f jsonRpcErrorFunc[PluginConfig] -} - -func OnJsonRpcError[PluginConfig any](f jsonRpcErrorFunc[PluginConfig]) CtxOption[PluginConfig] { - return &jsonRpcErrorOption[PluginConfig]{f} -} - -func (o *jsonRpcErrorOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) { - ctx.isJsonRpcSever = true - ctx.onJsonRpcError = o.f -} - -func (ctx *CommonHttpCtx[PluginConfig]) registerMCPToolProcessor() { - if !ctx.plugin.vm.isJsonRpcSever { - return - } - if ctx.plugin.vm.handleJsonRpcMethod { - return - } - if ctx.plugin.vm.onMcpToolRequest != nil { - ctx.plugin.vm.jsonRpcRequestHandler = func(context HttpContext, config PluginConfig, id int64, method string, params gjson.Result) types.Action { - toolName := params.Get("name").String() - toolArgs := params.Get("arguments") - return ctx.plugin.vm.onMcpToolRequest(context, config, toolName, toolArgs) - } - } - if ctx.plugin.vm.onMcpToolResponse != nil { - ctx.plugin.vm.jsonRpcResponseHandler = func(context HttpContext, config PluginConfig, id int64, result, error gjson.Result) types.Action { - if result.Exists() { - isError := result.Get("isError").Bool() - content := result.Get("content") - return ctx.plugin.vm.onMcpToolResponse(context, config, isError, content) - } - if error.Exists() && ctx.plugin.vm.onJsonRpcError != nil { - return ctx.plugin.vm.onJsonRpcError(context, config, error.Get("code").Int(), error.Get("message").String()) - } - return types.ActionContinue - } - } -} - -func ToInputSchema(v any) map[string]any { - t := reflect.TypeOf(v) - if t.Kind() == reflect.Ptr { - t = t.Elem() - } - inputSchema := jsonschema.Reflect(v).Definitions[t.Name()] - inputSchemaBytes, _ := json.Marshal(inputSchema) - var result map[string]any - json.Unmarshal(inputSchemaBytes, &result) - return result -} diff --git a/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go b/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go index f7100a5f3..c591366c5 100644 --- a/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go +++ b/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go @@ -70,11 +70,6 @@ type HttpContext interface { SetRequestBodyBufferLimit(byteSize uint32) // Note that this parameter affects the gateway's memory usage! Support setting a maximum buffer size for each response body individually in response phase. SetResponseBodyBufferLimit(byteSize uint32) - // Make a request to the target service of the current route using the specified URL and header. - RouteCall(method, url string, headers [][2]string, body []byte, callback ResponseCallback, timeoutMillisecond ...uint32) error - OnMCPToolCallSuccess(content []map[string]any) - OnMCPToolCallError(err error) - SendMCPToolTextResult(result string) } type oldParseConfigFunc[PluginConfig any] func(json gjson.Result, config *PluginConfig, log log.Log) error @@ -105,15 +100,6 @@ type CommonVmCtx[PluginConfig any] struct { onHttpResponseBody onHttpBodyFunc[PluginConfig] onHttpStreamingResponseBody onHttpStreamingBodyFunc[PluginConfig] onHttpStreamDone onHttpStreamDoneFunc[PluginConfig] - isJsonRpcSever bool - handleJsonRpcMethod bool - jsonRpcMethodHandlers MethodHandlers[PluginConfig] - mcpTools MCPTools[PluginConfig] - onMcpToolRequest mcpToolRequestFunc[PluginConfig] - onMcpToolResponse mcpToolResponseFunc[PluginConfig] - onJsonRpcError jsonRpcErrorFunc[PluginConfig] - jsonRpcRequestHandler JsonRpcRequestHandler[PluginConfig] - jsonRpcResponseHandler JsonRpcResponseHandler[PluginConfig] } type TickFuncEntry struct { @@ -407,10 +393,8 @@ func NewCommonVmCtx[PluginConfig any](pluginName string, options ...CtxOption[Pl func NewCommonVmCtxWithOptions[PluginConfig any](pluginName string, options ...CtxOption[PluginConfig]) *CommonVmCtx[PluginConfig] { ctx := &CommonVmCtx[PluginConfig]{ - pluginName: pluginName, - hasCustomConfig: true, - jsonRpcMethodHandlers: make(MethodHandlers[PluginConfig]), - mcpTools: make(MCPTools[PluginConfig]), + pluginName: pluginName, + hasCustomConfig: true, } for _, opt := range options { opt.Apply(ctx) @@ -419,6 +403,7 @@ func NewCommonVmCtxWithOptions[PluginConfig any](pluginName string, options ...C var config PluginConfig if unsafe.Sizeof(config) != 0 { msg := "the `parseConfig` is missing in NewCommonVmCtx's arguments" + ctx.log.Critical(msg) panic(msg) } ctx.hasCustomConfig = false @@ -510,12 +495,10 @@ func (ctx *CommonPluginCtx[PluginConfig]) NewHttpContext(contextID uint32) types userContext: map[string]interface{}{}, userAttribute: map[string]interface{}{}, } - httpCtx.registerMCPTools(ctx.vm.mcpTools) - httpCtx.registerMCPToolProcessor() - if ctx.vm.onHttpRequestBody != nil || ctx.vm.onHttpStreamingRequestBody != nil || len(ctx.vm.jsonRpcMethodHandlers) > 0 || ctx.vm.jsonRpcRequestHandler != nil { + if ctx.vm.onHttpRequestBody != nil || ctx.vm.onHttpStreamingRequestBody != nil { httpCtx.needRequestBody = true } - if ctx.vm.onHttpResponseBody != nil || ctx.vm.onHttpStreamingResponseBody != nil || ctx.vm.jsonRpcResponseHandler != nil { + if ctx.vm.onHttpResponseBody != nil || ctx.vm.onHttpStreamingResponseBody != nil { httpCtx.needResponseBody = true } if ctx.vm.onHttpStreamingRequestBody != nil { @@ -524,6 +507,7 @@ func (ctx *CommonPluginCtx[PluginConfig]) NewHttpContext(contextID uint32) types if ctx.vm.onHttpStreamingResponseBody != nil { httpCtx.streamingResponseBody = true } + return httpCtx } @@ -540,18 +524,6 @@ type CommonHttpCtx[PluginConfig any] struct { contextID uint32 userContext map[string]interface{} userAttribute map[string]interface{} - pendingCall int -} - -func (ctx *CommonHttpCtx[PluginConfig]) HttpCallStart(uint32) { - ctx.pendingCall++ -} - -func (ctx *CommonHttpCtx[PluginConfig]) HttpCallEnd(uint32) { - if ctx.pendingCall == 0 { - return - } - ctx.pendingCall-- } func (ctx *CommonHttpCtx[PluginConfig]) SetContext(key string, value interface{}) { @@ -627,13 +599,6 @@ func (ctx *CommonHttpCtx[PluginConfig]) WriteUserAttributeToTrace() error { return nil } -func (ctx *CommonHttpCtx[PluginConfig]) GetIntContext(key string, defaultValue int) int { - if b, ok := ctx.userContext[key].(int); ok { - return b - } - return defaultValue -} - func (ctx *CommonHttpCtx[PluginConfig]) GetBoolContext(key string, defaultValue bool) bool { if b, ok := ctx.userContext[key].(bool); ok { return b @@ -721,9 +686,6 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestHeaders(numHeaders int, end if IsBinaryRequestBody() { ctx.needRequestBody = false } - if ctx.plugin.vm.isJsonRpcSever && HasRequestBody() { - return types.HeaderStopIteration - } if ctx.plugin.vm.onHttpRequestHeaders == nil { return types.ActionContinue } @@ -747,9 +709,7 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestBody(bodySize int, endOfStr } return types.ActionContinue } - if ctx.plugin.vm.onHttpRequestBody != nil || - len(ctx.plugin.vm.jsonRpcMethodHandlers) > 0 || - ctx.plugin.vm.jsonRpcRequestHandler != nil { + if ctx.plugin.vm.onHttpRequestBody != nil { ctx.requestBodySize += bodySize if !endOfStream { return types.ActionPause @@ -759,14 +719,7 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestBody(bodySize int, endOfStr ctx.plugin.vm.log.Warnf("get request body failed: %v", err) return types.ActionContinue } - if ctx.plugin.vm.onHttpRequestBody != nil { - return ctx.plugin.vm.onHttpRequestBody(ctx, *ctx.config, body) - } - if len(ctx.plugin.vm.jsonRpcMethodHandlers) > 0 { - return ctx.HandleJsonRpcMethod(ctx, *ctx.config, body, ctx.plugin.vm.jsonRpcMethodHandlers) - } - // ctx.plugin.vm.jsonRpcRequestHandler not nil - return ctx.HandleJsonRpcRequest(ctx, *ctx.config, body, ctx.plugin.vm.jsonRpcRequestHandler) + return ctx.plugin.vm.onHttpRequestBody(ctx, *ctx.config, body) } return types.ActionContinue } @@ -802,7 +755,7 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseBody(bodySize int, endOfSt } return types.ActionContinue } - if ctx.plugin.vm.onHttpResponseBody != nil || ctx.plugin.vm.jsonRpcResponseHandler != nil { + if ctx.plugin.vm.onHttpResponseBody != nil { ctx.responseBodySize += bodySize if !endOfStream { return types.ActionPause @@ -812,11 +765,7 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseBody(bodySize int, endOfSt ctx.plugin.vm.log.Warnf("get response body failed: %v", err) return types.ActionContinue } - if ctx.plugin.vm.onHttpResponseBody != nil { - return ctx.plugin.vm.onHttpResponseBody(ctx, *ctx.config, body) - } - // ctx.plugin.vm.jsonRpcResponseHandler not nil - return ctx.HandleJsonRpcResponse(ctx, *ctx.config, body, ctx.plugin.vm.jsonRpcResponseHandler) + return ctx.plugin.vm.onHttpResponseBody(ctx, *ctx.config, body) } return types.ActionContinue } @@ -830,15 +779,3 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpStreamDone() { } ctx.plugin.vm.onHttpStreamDone(ctx, *ctx.config) } - -func (ctx *CommonHttpCtx[PluginConfig]) RouteCall(method, url string, headers [][2]string, body []byte, callback ResponseCallback, timeoutMillisecond ...uint32) error { - // Since the HttpCall here is a substitute for route invocation, the default timeout is slightly longer, at 1 minute. - var timeout uint32 = 60000 - if len(timeoutMillisecond) > 0 { - timeout = timeoutMillisecond[0] - } - cluster := RouteCluster{ - BaseCluster: BaseCluster{notify: ctx}, - } - return HttpCall(cluster, method, url, headers, body, callback, timeout) -}