mirror of
https://github.com/alibaba/higress.git
synced 2026-03-03 16:10:48 +08:00
revert wrapper changes (#1948)
This commit is contained in:
@@ -24,19 +24,9 @@ import (
|
||||
type Cluster interface {
|
||||
ClusterName() string
|
||||
HostName() string
|
||||
HttpCallNotify() HttpCallNotify
|
||||
}
|
||||
|
||||
type BaseCluster struct {
|
||||
notify HttpCallNotify
|
||||
}
|
||||
|
||||
func (base BaseCluster) HttpCallNotify() HttpCallNotify {
|
||||
return base.notify
|
||||
}
|
||||
|
||||
type RouteCluster struct {
|
||||
BaseCluster
|
||||
Host string
|
||||
}
|
||||
|
||||
@@ -56,7 +46,6 @@ func (c RouteCluster) HostName() string {
|
||||
}
|
||||
|
||||
type TargetCluster struct {
|
||||
BaseCluster
|
||||
Host string
|
||||
Cluster string
|
||||
}
|
||||
@@ -70,7 +59,6 @@ func (c TargetCluster) HostName() string {
|
||||
}
|
||||
|
||||
type K8sCluster struct {
|
||||
BaseCluster
|
||||
ServiceName string
|
||||
Namespace string
|
||||
Port int64
|
||||
@@ -95,7 +83,6 @@ func (c K8sCluster) HostName() string {
|
||||
}
|
||||
|
||||
type NacosCluster struct {
|
||||
BaseCluster
|
||||
ServiceName string
|
||||
// use DEFAULT-GROUP by default
|
||||
Group string
|
||||
@@ -128,7 +115,6 @@ func (c NacosCluster) HostName() string {
|
||||
}
|
||||
|
||||
type StaticIpCluster struct {
|
||||
BaseCluster
|
||||
ServiceName string
|
||||
Port int64
|
||||
Host string
|
||||
@@ -146,7 +132,6 @@ func (c StaticIpCluster) HostName() string {
|
||||
}
|
||||
|
||||
type DnsCluster struct {
|
||||
BaseCluster
|
||||
ServiceName string
|
||||
Domain string
|
||||
Port int64
|
||||
@@ -161,7 +146,6 @@ func (c DnsCluster) HostName() string {
|
||||
}
|
||||
|
||||
type ConsulCluster struct {
|
||||
BaseCluster
|
||||
ServiceName string
|
||||
Datacenter string
|
||||
Port int64
|
||||
@@ -182,7 +166,6 @@ func (c ConsulCluster) HostName() string {
|
||||
}
|
||||
|
||||
type FQDNCluster struct {
|
||||
BaseCluster
|
||||
FQDN string
|
||||
Host string
|
||||
Port int64
|
||||
|
||||
@@ -25,11 +25,6 @@ import (
|
||||
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
|
||||
)
|
||||
|
||||
type HttpCallNotify interface {
|
||||
HttpCallStart(uint32)
|
||||
HttpCallEnd(uint32)
|
||||
}
|
||||
|
||||
type ResponseCallback func(statusCode int, responseHeaders http.Header, responseBody []byte)
|
||||
|
||||
type HttpClient interface {
|
||||
@@ -113,9 +108,7 @@ func HttpCall(cluster Cluster, method, rawURL string, headers [][2]string, body
|
||||
}
|
||||
headers = append(headers, [2]string{":method", method}, [2]string{":path", path}, [2]string{":authority", authority})
|
||||
requestID := uuid.New().String()
|
||||
httpCallNotify := cluster.HttpCallNotify()
|
||||
var callID uint32
|
||||
callID, err = proxywasm.DispatchHttpCall(cluster.ClusterName(), headers, body, nil, timeout, func(numHeaders, bodySize, numTrailers int) {
|
||||
_, err = proxywasm.DispatchHttpCall(cluster.ClusterName(), headers, body, nil, timeout, func(numHeaders, bodySize, numTrailers int) {
|
||||
respBody, err := proxywasm.GetHttpCallResponseBody(0, bodySize)
|
||||
if err != nil {
|
||||
proxywasm.LogCriticalf("failed to get response body: %v", err)
|
||||
@@ -142,12 +135,8 @@ func HttpCall(cluster Cluster, method, rawURL string, headers [][2]string, body
|
||||
proxywasm.LogDebugf("http call end, id: %s, code: %d, normal: %t, body: %s",
|
||||
requestID, code, normalResponse, respBody)
|
||||
callback(code, headers, respBody)
|
||||
httpCallNotify.HttpCallEnd(callID)
|
||||
})
|
||||
if err == nil {
|
||||
httpCallNotify.HttpCallStart(callID)
|
||||
proxywasm.LogDebugf("http call start, id: %s, cluster: %s, method: %s, url: %s, headers: %#v, body: %s, timeout: %d",
|
||||
requestID, cluster.ClusterName(), method, rawURL, headers, body, timeout)
|
||||
}
|
||||
proxywasm.LogDebugf("http call start, id: %s, cluster: %s, method: %s, url: %s, headers: %#v, body: %s, timeout: %d",
|
||||
requestID, cluster.ClusterName(), method, rawURL, headers, body, timeout)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,123 +0,0 @@
|
||||
// Copyright (c) 2022 Alibaba Group Holding Ltd.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wrapper
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/alibaba/higress/plugins/wasm-go/pkg/log"
|
||||
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
|
||||
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
|
||||
"github.com/tidwall/gjson"
|
||||
"github.com/tidwall/sjson"
|
||||
)
|
||||
|
||||
const (
|
||||
CtxJsonRpcID = "jsonRpcID"
|
||||
JError = "error"
|
||||
JCode = "code"
|
||||
JMessage = "message"
|
||||
JResult = "result"
|
||||
|
||||
ErrParseError = -32700
|
||||
ErrInvalidRequest = -32600
|
||||
ErrMethodNotFound = -32601
|
||||
ErrInvalidParams = -32602
|
||||
ErrInternalError = -32603
|
||||
)
|
||||
|
||||
type JsonRpcRequestHandler[PluginConfig any] func(context HttpContext, config PluginConfig, id int64, method string, params gjson.Result) types.Action
|
||||
|
||||
type JsonRpcResponseHandler[PluginConfig any] func(context HttpContext, config PluginConfig, id int64, result gjson.Result, error gjson.Result) types.Action
|
||||
|
||||
type JsonRpcMethodHandler[PluginConfig any] func(context HttpContext, config PluginConfig, id int64, params gjson.Result) error
|
||||
|
||||
type MethodHandlers[PluginConfig any] map[string]JsonRpcMethodHandler[PluginConfig]
|
||||
|
||||
func sendJsonRpcResponse(id int64, extras map[string]any, debugInfo string) {
|
||||
body := []byte(`{"jsonrpc": "2.0"}`)
|
||||
body, _ = sjson.SetBytes(body, "id", id)
|
||||
for key, value := range extras {
|
||||
body, _ = sjson.SetBytes(body, key, value)
|
||||
}
|
||||
proxywasm.SendHttpResponseWithDetail(200, debugInfo, [][2]string{{"Content-Type", "application/json; charset=utf-8"}}, body, -1)
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) OnJsonRpcResponseSuccess(result map[string]any) {
|
||||
var (
|
||||
id int64
|
||||
ok bool
|
||||
)
|
||||
if id, ok = ctx.userContext[CtxJsonRpcID].(int64); !ok {
|
||||
proxywasm.SendHttpResponseWithDetail(500, "not_found_json_rpc_id", nil, []byte("not found json rpc id"), -1)
|
||||
return
|
||||
}
|
||||
sendJsonRpcResponse(id, map[string]any{JResult: result}, "json_rpc_success")
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) OnJsonRpcResponseError(err error, code ...int) {
|
||||
var (
|
||||
id int64
|
||||
ok bool
|
||||
)
|
||||
if id, ok = ctx.userContext[CtxJsonRpcID].(int64); !ok {
|
||||
proxywasm.SendHttpResponseWithDetail(500, "not_found_json_rpc_id", nil, []byte("not found json rpc id"), -1)
|
||||
return
|
||||
}
|
||||
errorCode := ErrInternalError
|
||||
if len(code) > 0 {
|
||||
errorCode = code[0]
|
||||
}
|
||||
sendJsonRpcResponse(id, map[string]any{JError: map[string]any{
|
||||
JMessage: err.Error(),
|
||||
JCode: errorCode,
|
||||
}}, "json_rpc_error")
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) HandleJsonRpcMethod(context HttpContext, config PluginConfig, body []byte, handles MethodHandlers[PluginConfig]) types.Action {
|
||||
id := gjson.GetBytes(body, "id").Int()
|
||||
ctx.userContext[CtxJsonRpcID] = id
|
||||
method := gjson.GetBytes(body, "method").String()
|
||||
params := gjson.GetBytes(body, "params")
|
||||
if handle, ok := handles[method]; ok {
|
||||
log.Debugf("json rpc call id[%d] method[%s] with params[%s]", id, method, params.Raw)
|
||||
err := handle(context, config, id, params)
|
||||
if err != nil {
|
||||
ctx.OnJsonRpcResponseError(err)
|
||||
return types.ActionContinue
|
||||
}
|
||||
// Waiting for the response
|
||||
return types.ActionPause
|
||||
}
|
||||
ctx.OnJsonRpcResponseError(fmt.Errorf("method not found:%s", method), ErrMethodNotFound)
|
||||
return types.ActionContinue
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) HandleJsonRpcRequest(context HttpContext, config PluginConfig, body []byte, handle JsonRpcRequestHandler[PluginConfig]) types.Action {
|
||||
id := gjson.GetBytes(body, "id").Int()
|
||||
ctx.userContext[CtxJsonRpcID] = id
|
||||
method := gjson.GetBytes(body, "method").String()
|
||||
params := gjson.GetBytes(body, "params")
|
||||
log.Debugf("json rpc call id[%d] method[%s] with params[%s]", id, method, params.Raw)
|
||||
return handle(context, config, id, method, params)
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) HandleJsonRpcResponse(context HttpContext, config PluginConfig, body []byte, handle JsonRpcResponseHandler[PluginConfig]) types.Action {
|
||||
id := gjson.GetBytes(body, "id").Int()
|
||||
error := gjson.GetBytes(body, "error")
|
||||
result := gjson.GetBytes(body, "result")
|
||||
log.Debugf("json rpc response id[%d] error[%s] result[%s]", id, error.Raw, result.Raw)
|
||||
return handle(context, config, id, result, error)
|
||||
}
|
||||
@@ -1,221 +0,0 @@
|
||||
// Copyright (c) 2022 Alibaba Group Holding Ltd.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wrapper
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/alibaba/higress/plugins/wasm-go/pkg/log"
|
||||
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
|
||||
"github.com/invopop/jsonschema"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
type MCPTool[PluginConfig any] interface {
|
||||
Create(params []byte) MCPTool[PluginConfig]
|
||||
Call(context HttpContext, config PluginConfig) error
|
||||
Description() string
|
||||
InputSchema() map[string]any
|
||||
}
|
||||
|
||||
type MCPTools[PluginConfig any] map[string]MCPTool[PluginConfig]
|
||||
|
||||
type addMCPToolOption[PluginConfig any] struct {
|
||||
name string
|
||||
tool MCPTool[PluginConfig]
|
||||
}
|
||||
|
||||
func AddMCPTool[PluginConfig any](name string, tool MCPTool[PluginConfig]) CtxOption[PluginConfig] {
|
||||
return &addMCPToolOption[PluginConfig]{
|
||||
name: name,
|
||||
tool: tool,
|
||||
}
|
||||
}
|
||||
|
||||
func (o *addMCPToolOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) {
|
||||
ctx.isJsonRpcSever = true
|
||||
ctx.handleJsonRpcMethod = true
|
||||
if _, exist := ctx.mcpTools[o.name]; exist {
|
||||
panic(fmt.Sprintf("Conflict! There is a tool with the same name:%s",
|
||||
o.name))
|
||||
}
|
||||
ctx.mcpTools[o.name] = o.tool
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) OnMCPResponseSuccess(result map[string]any) {
|
||||
ctx.OnJsonRpcResponseSuccess(result)
|
||||
// TODO: support pub to redis when use POST + SSE
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) OnMCPResponseError(err error, code ...int) {
|
||||
ctx.OnJsonRpcResponseError(err, code...)
|
||||
// TODO: support pub to redis when use POST + SSE
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) OnMCPToolCallSuccess(content []map[string]any) {
|
||||
ctx.OnMCPResponseSuccess(map[string]any{
|
||||
"content": content,
|
||||
"isError": false,
|
||||
})
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) OnMCPToolCallError(err error) {
|
||||
ctx.OnMCPResponseSuccess(map[string]any{
|
||||
"content": []map[string]any{
|
||||
{
|
||||
"type": "text",
|
||||
"text": err.Error(),
|
||||
},
|
||||
},
|
||||
"isError": true,
|
||||
})
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) SendMCPToolTextResult(result string) {
|
||||
ctx.OnMCPToolCallSuccess([]map[string]any{
|
||||
{
|
||||
"type": "text",
|
||||
"text": result,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) registerMCPTools(mcpTools MCPTools[PluginConfig]) {
|
||||
if !ctx.plugin.vm.isJsonRpcSever {
|
||||
return
|
||||
}
|
||||
if !ctx.plugin.vm.handleJsonRpcMethod {
|
||||
return
|
||||
}
|
||||
ctx.plugin.vm.jsonRpcMethodHandlers["tools/list"] = func(context HttpContext, config PluginConfig, id int64, params gjson.Result) error {
|
||||
var tools []map[string]any
|
||||
for name, tool := range mcpTools {
|
||||
tools = append(tools, map[string]any{
|
||||
"name": name,
|
||||
"description": tool.Description(),
|
||||
"inputSchema": tool.InputSchema(),
|
||||
})
|
||||
}
|
||||
ctx.OnMCPResponseSuccess(map[string]any{
|
||||
"tools": tools,
|
||||
"nextCursor": "",
|
||||
})
|
||||
return nil
|
||||
}
|
||||
ctx.plugin.vm.jsonRpcMethodHandlers["tools/call"] = func(context HttpContext, config PluginConfig, id int64, params gjson.Result) error {
|
||||
name := params.Get("name").String()
|
||||
args := params.Get("arguments")
|
||||
if tool, ok := mcpTools[name]; ok {
|
||||
log.Debugf("mcp call tool[%s] with arguments[%s]", name, args.Raw)
|
||||
toolInstance := tool.Create([]byte(args.Raw))
|
||||
err := toolInstance.Call(context, config)
|
||||
// TODO: validate the json schema through github.com/kaptinlin/jsonschema
|
||||
if err != nil {
|
||||
ctx.OnMCPToolCallError(err)
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
ctx.OnMCPResponseError(errors.New("Unknown tool: invalid_tool_name"), ErrInvalidParams)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
type mcpToolRequestFunc[PluginConfig any] func(context HttpContext, config PluginConfig, toolName string, toolArgs gjson.Result) types.Action
|
||||
type mcpToolResponseFunc[PluginConfig any] func(context HttpContext, config PluginConfig, isError bool, content gjson.Result) types.Action
|
||||
type jsonRpcErrorFunc[PluginConfig any] func(context HttpContext, config PluginConfig, errorCode int64, errorMessage string) types.Action
|
||||
|
||||
type mcpToolRequestOption[PluginConfig any] struct {
|
||||
f mcpToolRequestFunc[PluginConfig]
|
||||
}
|
||||
|
||||
func OnMCPToolRequest[PluginConfig any](f mcpToolRequestFunc[PluginConfig]) CtxOption[PluginConfig] {
|
||||
return &mcpToolRequestOption[PluginConfig]{f}
|
||||
}
|
||||
|
||||
func (o *mcpToolRequestOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) {
|
||||
ctx.isJsonRpcSever = true
|
||||
ctx.onMcpToolRequest = o.f
|
||||
}
|
||||
|
||||
type mcpToolResponseOption[PluginConfig any] struct {
|
||||
f mcpToolResponseFunc[PluginConfig]
|
||||
}
|
||||
|
||||
func OnMCPToolResponse[PluginConfig any](f mcpToolResponseFunc[PluginConfig]) CtxOption[PluginConfig] {
|
||||
return &mcpToolResponseOption[PluginConfig]{f}
|
||||
}
|
||||
|
||||
func (o *mcpToolResponseOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) {
|
||||
ctx.isJsonRpcSever = true
|
||||
ctx.onMcpToolResponse = o.f
|
||||
}
|
||||
|
||||
type jsonRpcErrorOption[PluginConfig any] struct {
|
||||
f jsonRpcErrorFunc[PluginConfig]
|
||||
}
|
||||
|
||||
func OnJsonRpcError[PluginConfig any](f jsonRpcErrorFunc[PluginConfig]) CtxOption[PluginConfig] {
|
||||
return &jsonRpcErrorOption[PluginConfig]{f}
|
||||
}
|
||||
|
||||
func (o *jsonRpcErrorOption[PluginConfig]) Apply(ctx *CommonVmCtx[PluginConfig]) {
|
||||
ctx.isJsonRpcSever = true
|
||||
ctx.onJsonRpcError = o.f
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) registerMCPToolProcessor() {
|
||||
if !ctx.plugin.vm.isJsonRpcSever {
|
||||
return
|
||||
}
|
||||
if ctx.plugin.vm.handleJsonRpcMethod {
|
||||
return
|
||||
}
|
||||
if ctx.plugin.vm.onMcpToolRequest != nil {
|
||||
ctx.plugin.vm.jsonRpcRequestHandler = func(context HttpContext, config PluginConfig, id int64, method string, params gjson.Result) types.Action {
|
||||
toolName := params.Get("name").String()
|
||||
toolArgs := params.Get("arguments")
|
||||
return ctx.plugin.vm.onMcpToolRequest(context, config, toolName, toolArgs)
|
||||
}
|
||||
}
|
||||
if ctx.plugin.vm.onMcpToolResponse != nil {
|
||||
ctx.plugin.vm.jsonRpcResponseHandler = func(context HttpContext, config PluginConfig, id int64, result, error gjson.Result) types.Action {
|
||||
if result.Exists() {
|
||||
isError := result.Get("isError").Bool()
|
||||
content := result.Get("content")
|
||||
return ctx.plugin.vm.onMcpToolResponse(context, config, isError, content)
|
||||
}
|
||||
if error.Exists() && ctx.plugin.vm.onJsonRpcError != nil {
|
||||
return ctx.plugin.vm.onJsonRpcError(context, config, error.Get("code").Int(), error.Get("message").String())
|
||||
}
|
||||
return types.ActionContinue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ToInputSchema(v any) map[string]any {
|
||||
t := reflect.TypeOf(v)
|
||||
if t.Kind() == reflect.Ptr {
|
||||
t = t.Elem()
|
||||
}
|
||||
inputSchema := jsonschema.Reflect(v).Definitions[t.Name()]
|
||||
inputSchemaBytes, _ := json.Marshal(inputSchema)
|
||||
var result map[string]any
|
||||
json.Unmarshal(inputSchemaBytes, &result)
|
||||
return result
|
||||
}
|
||||
@@ -70,11 +70,6 @@ type HttpContext interface {
|
||||
SetRequestBodyBufferLimit(byteSize uint32)
|
||||
// Note that this parameter affects the gateway's memory usage! Support setting a maximum buffer size for each response body individually in response phase.
|
||||
SetResponseBodyBufferLimit(byteSize uint32)
|
||||
// Make a request to the target service of the current route using the specified URL and header.
|
||||
RouteCall(method, url string, headers [][2]string, body []byte, callback ResponseCallback, timeoutMillisecond ...uint32) error
|
||||
OnMCPToolCallSuccess(content []map[string]any)
|
||||
OnMCPToolCallError(err error)
|
||||
SendMCPToolTextResult(result string)
|
||||
}
|
||||
|
||||
type oldParseConfigFunc[PluginConfig any] func(json gjson.Result, config *PluginConfig, log log.Log) error
|
||||
@@ -105,15 +100,6 @@ type CommonVmCtx[PluginConfig any] struct {
|
||||
onHttpResponseBody onHttpBodyFunc[PluginConfig]
|
||||
onHttpStreamingResponseBody onHttpStreamingBodyFunc[PluginConfig]
|
||||
onHttpStreamDone onHttpStreamDoneFunc[PluginConfig]
|
||||
isJsonRpcSever bool
|
||||
handleJsonRpcMethod bool
|
||||
jsonRpcMethodHandlers MethodHandlers[PluginConfig]
|
||||
mcpTools MCPTools[PluginConfig]
|
||||
onMcpToolRequest mcpToolRequestFunc[PluginConfig]
|
||||
onMcpToolResponse mcpToolResponseFunc[PluginConfig]
|
||||
onJsonRpcError jsonRpcErrorFunc[PluginConfig]
|
||||
jsonRpcRequestHandler JsonRpcRequestHandler[PluginConfig]
|
||||
jsonRpcResponseHandler JsonRpcResponseHandler[PluginConfig]
|
||||
}
|
||||
|
||||
type TickFuncEntry struct {
|
||||
@@ -407,10 +393,8 @@ func NewCommonVmCtx[PluginConfig any](pluginName string, options ...CtxOption[Pl
|
||||
|
||||
func NewCommonVmCtxWithOptions[PluginConfig any](pluginName string, options ...CtxOption[PluginConfig]) *CommonVmCtx[PluginConfig] {
|
||||
ctx := &CommonVmCtx[PluginConfig]{
|
||||
pluginName: pluginName,
|
||||
hasCustomConfig: true,
|
||||
jsonRpcMethodHandlers: make(MethodHandlers[PluginConfig]),
|
||||
mcpTools: make(MCPTools[PluginConfig]),
|
||||
pluginName: pluginName,
|
||||
hasCustomConfig: true,
|
||||
}
|
||||
for _, opt := range options {
|
||||
opt.Apply(ctx)
|
||||
@@ -419,6 +403,7 @@ func NewCommonVmCtxWithOptions[PluginConfig any](pluginName string, options ...C
|
||||
var config PluginConfig
|
||||
if unsafe.Sizeof(config) != 0 {
|
||||
msg := "the `parseConfig` is missing in NewCommonVmCtx's arguments"
|
||||
ctx.log.Critical(msg)
|
||||
panic(msg)
|
||||
}
|
||||
ctx.hasCustomConfig = false
|
||||
@@ -510,12 +495,10 @@ func (ctx *CommonPluginCtx[PluginConfig]) NewHttpContext(contextID uint32) types
|
||||
userContext: map[string]interface{}{},
|
||||
userAttribute: map[string]interface{}{},
|
||||
}
|
||||
httpCtx.registerMCPTools(ctx.vm.mcpTools)
|
||||
httpCtx.registerMCPToolProcessor()
|
||||
if ctx.vm.onHttpRequestBody != nil || ctx.vm.onHttpStreamingRequestBody != nil || len(ctx.vm.jsonRpcMethodHandlers) > 0 || ctx.vm.jsonRpcRequestHandler != nil {
|
||||
if ctx.vm.onHttpRequestBody != nil || ctx.vm.onHttpStreamingRequestBody != nil {
|
||||
httpCtx.needRequestBody = true
|
||||
}
|
||||
if ctx.vm.onHttpResponseBody != nil || ctx.vm.onHttpStreamingResponseBody != nil || ctx.vm.jsonRpcResponseHandler != nil {
|
||||
if ctx.vm.onHttpResponseBody != nil || ctx.vm.onHttpStreamingResponseBody != nil {
|
||||
httpCtx.needResponseBody = true
|
||||
}
|
||||
if ctx.vm.onHttpStreamingRequestBody != nil {
|
||||
@@ -524,6 +507,7 @@ func (ctx *CommonPluginCtx[PluginConfig]) NewHttpContext(contextID uint32) types
|
||||
if ctx.vm.onHttpStreamingResponseBody != nil {
|
||||
httpCtx.streamingResponseBody = true
|
||||
}
|
||||
|
||||
return httpCtx
|
||||
}
|
||||
|
||||
@@ -540,18 +524,6 @@ type CommonHttpCtx[PluginConfig any] struct {
|
||||
contextID uint32
|
||||
userContext map[string]interface{}
|
||||
userAttribute map[string]interface{}
|
||||
pendingCall int
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) HttpCallStart(uint32) {
|
||||
ctx.pendingCall++
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) HttpCallEnd(uint32) {
|
||||
if ctx.pendingCall == 0 {
|
||||
return
|
||||
}
|
||||
ctx.pendingCall--
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) SetContext(key string, value interface{}) {
|
||||
@@ -627,13 +599,6 @@ func (ctx *CommonHttpCtx[PluginConfig]) WriteUserAttributeToTrace() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) GetIntContext(key string, defaultValue int) int {
|
||||
if b, ok := ctx.userContext[key].(int); ok {
|
||||
return b
|
||||
}
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) GetBoolContext(key string, defaultValue bool) bool {
|
||||
if b, ok := ctx.userContext[key].(bool); ok {
|
||||
return b
|
||||
@@ -721,9 +686,6 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestHeaders(numHeaders int, end
|
||||
if IsBinaryRequestBody() {
|
||||
ctx.needRequestBody = false
|
||||
}
|
||||
if ctx.plugin.vm.isJsonRpcSever && HasRequestBody() {
|
||||
return types.HeaderStopIteration
|
||||
}
|
||||
if ctx.plugin.vm.onHttpRequestHeaders == nil {
|
||||
return types.ActionContinue
|
||||
}
|
||||
@@ -747,9 +709,7 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestBody(bodySize int, endOfStr
|
||||
}
|
||||
return types.ActionContinue
|
||||
}
|
||||
if ctx.plugin.vm.onHttpRequestBody != nil ||
|
||||
len(ctx.plugin.vm.jsonRpcMethodHandlers) > 0 ||
|
||||
ctx.plugin.vm.jsonRpcRequestHandler != nil {
|
||||
if ctx.plugin.vm.onHttpRequestBody != nil {
|
||||
ctx.requestBodySize += bodySize
|
||||
if !endOfStream {
|
||||
return types.ActionPause
|
||||
@@ -759,14 +719,7 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestBody(bodySize int, endOfStr
|
||||
ctx.plugin.vm.log.Warnf("get request body failed: %v", err)
|
||||
return types.ActionContinue
|
||||
}
|
||||
if ctx.plugin.vm.onHttpRequestBody != nil {
|
||||
return ctx.plugin.vm.onHttpRequestBody(ctx, *ctx.config, body)
|
||||
}
|
||||
if len(ctx.plugin.vm.jsonRpcMethodHandlers) > 0 {
|
||||
return ctx.HandleJsonRpcMethod(ctx, *ctx.config, body, ctx.plugin.vm.jsonRpcMethodHandlers)
|
||||
}
|
||||
// ctx.plugin.vm.jsonRpcRequestHandler not nil
|
||||
return ctx.HandleJsonRpcRequest(ctx, *ctx.config, body, ctx.plugin.vm.jsonRpcRequestHandler)
|
||||
return ctx.plugin.vm.onHttpRequestBody(ctx, *ctx.config, body)
|
||||
}
|
||||
return types.ActionContinue
|
||||
}
|
||||
@@ -802,7 +755,7 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseBody(bodySize int, endOfSt
|
||||
}
|
||||
return types.ActionContinue
|
||||
}
|
||||
if ctx.plugin.vm.onHttpResponseBody != nil || ctx.plugin.vm.jsonRpcResponseHandler != nil {
|
||||
if ctx.plugin.vm.onHttpResponseBody != nil {
|
||||
ctx.responseBodySize += bodySize
|
||||
if !endOfStream {
|
||||
return types.ActionPause
|
||||
@@ -812,11 +765,7 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseBody(bodySize int, endOfSt
|
||||
ctx.plugin.vm.log.Warnf("get response body failed: %v", err)
|
||||
return types.ActionContinue
|
||||
}
|
||||
if ctx.plugin.vm.onHttpResponseBody != nil {
|
||||
return ctx.plugin.vm.onHttpResponseBody(ctx, *ctx.config, body)
|
||||
}
|
||||
// ctx.plugin.vm.jsonRpcResponseHandler not nil
|
||||
return ctx.HandleJsonRpcResponse(ctx, *ctx.config, body, ctx.plugin.vm.jsonRpcResponseHandler)
|
||||
return ctx.plugin.vm.onHttpResponseBody(ctx, *ctx.config, body)
|
||||
}
|
||||
return types.ActionContinue
|
||||
}
|
||||
@@ -830,15 +779,3 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpStreamDone() {
|
||||
}
|
||||
ctx.plugin.vm.onHttpStreamDone(ctx, *ctx.config)
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) RouteCall(method, url string, headers [][2]string, body []byte, callback ResponseCallback, timeoutMillisecond ...uint32) error {
|
||||
// Since the HttpCall here is a substitute for route invocation, the default timeout is slightly longer, at 1 minute.
|
||||
var timeout uint32 = 60000
|
||||
if len(timeoutMillisecond) > 0 {
|
||||
timeout = timeoutMillisecond[0]
|
||||
}
|
||||
cluster := RouteCluster{
|
||||
BaseCluster: BaseCluster{notify: ctx},
|
||||
}
|
||||
return HttpCall(cluster, method, url, headers, body, callback, timeout)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user