diff --git a/plugins/wasm-go/extensions/streaming-body-example/go.mod b/plugins/wasm-go/extensions/streaming-body-example/go.mod new file mode 100644 index 000000000..0eda554af --- /dev/null +++ b/plugins/wasm-go/extensions/streaming-body-example/go.mod @@ -0,0 +1,20 @@ +module github.com/alibaba/higress/plugins/wasm-go/extensions/streaming-body-example + +go 1.18 + +replace github.com/alibaba/higress/plugins/wasm-go => ../.. + +require ( + github.com/alibaba/higress/plugins/wasm-go v0.0.0 + github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240318034951-d5306e367c43 +) + +require ( + github.com/google/uuid v1.3.0 // indirect + github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 // indirect + github.com/magefile/mage v1.14.0 // indirect + github.com/tidwall/gjson v1.14.3 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/resp v0.1.1 // indirect +) diff --git a/plugins/wasm-go/extensions/streaming-body-example/go.sum b/plugins/wasm-go/extensions/streaming-body-example/go.sum new file mode 100644 index 000000000..37ff5ba6c --- /dev/null +++ b/plugins/wasm-go/extensions/streaming-body-example/go.sum @@ -0,0 +1,21 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 h1:IHDghbGQ2DTIXHBHxWfqCYQW1fKjyJ/I7W1pMyUDeEA= +github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520/go.mod h1:Nz8ORLaFiLWotg6GeKlJMhv8cci8mM43uEnLA5t8iew= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240226064518-b3dc4646a35a h1:luYRvxLTE1xYxrXYj7nmjd1U0HHh8pUPiKfdZ0MhCGE= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240226064518-b3dc4646a35a/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240318034951-d5306e367c43/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= +github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= +github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw= +github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE= +github.com/tidwall/resp v0.1.1/go.mod h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/plugins/wasm-go/extensions/streaming-body-example/main.go b/plugins/wasm-go/extensions/streaming-body-example/main.go new file mode 100644 index 000000000..7a3a8301d --- /dev/null +++ b/plugins/wasm-go/extensions/streaming-body-example/main.go @@ -0,0 +1,55 @@ +// Copyright (c) 2022 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" + + "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" +) + +func main() { + wrapper.SetCtx( + "streaming-body-example", + wrapper.ProcessRequestHeadersBy(onHttpRequestHeaders), + wrapper.ProcessStreamingRequestBodyBy(onHttpRequestBody), + wrapper.ProcessResponseHeadersBy(onHttpResponseHeaders), + wrapper.ProcessStreamingResponseBodyBy(onHttpResponseBody), + ) +} + +type Config struct { +} + +func onHttpRequestHeaders(ctx wrapper.HttpContext, config Config, log wrapper.Log) types.Action { + proxywasm.RemoveHttpRequestHeader("content-length") + return types.ActionContinue +} + +func onHttpRequestBody(ctx wrapper.HttpContext, config Config, chunk []byte, isLastChunk bool, log wrapper.Log) []byte { + log.Infof("receive request body chunk:%s, isLastChunk:%v", chunk, isLastChunk) + return []byte("test\n") +} + +func onHttpResponseHeaders(ctx wrapper.HttpContext, config Config, log wrapper.Log) types.Action { + proxywasm.RemoveHttpResponseHeader("content-length") + return types.ActionContinue +} + +func onHttpResponseBody(ctx wrapper.HttpContext, config Config, chunk []byte, isLastChunk bool, log wrapper.Log) []byte { + log.Infof("receive response body chunk:%s, isLastChunk:%v", chunk, isLastChunk) + return []byte("test\n") +} diff --git a/plugins/wasm-go/go.mod b/plugins/wasm-go/go.mod index 3e4cca90e..dbb2ddfa3 100644 --- a/plugins/wasm-go/go.mod +++ b/plugins/wasm-go/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/google/uuid v1.3.0 github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 - github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240318034951-d5306e367c43 + github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc github.com/stretchr/testify v1.8.4 github.com/tidwall/gjson v1.14.3 github.com/tidwall/resp v0.1.1 diff --git a/plugins/wasm-go/go.sum b/plugins/wasm-go/go.sum index cdf88dfc6..c45752eb6 100644 --- a/plugins/wasm-go/go.sum +++ b/plugins/wasm-go/go.sum @@ -8,6 +8,8 @@ github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240226064518-b3dc4646a35a h1 github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240226064518-b3dc4646a35a/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240318034951-d5306e367c43 h1:dCw7F/9ciw4NZN7w68wQRaygZ2zGOWMTIEoRvP1tlWs= github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240318034951-d5306e367c43/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc h1:t2AT8zb6N/59Y78lyRWedVoVWHNRSCBh0oWCC+bluTQ= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go b/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go index aa690c60e..b4f6a6cfd 100644 --- a/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go +++ b/plugins/wasm-go/pkg/wrapper/plugin_wrapper.go @@ -41,26 +41,33 @@ type HttpContext interface { DontReadRequestBody() // If the onHttpResponseBody handle is not set, the request body will not be read by default DontReadResponseBody() + // If the onHttpStreamingRequestBody handle is not set, and the onHttpRequestBody handle is set, the request body will be buffered by default + BufferRequestBody() + // If the onHttpStreamingResponseBody handle is not set, and the onHttpResponseBody handle is set, the response body will be buffered by default + BufferResponseBody() } type ParseConfigFunc[PluginConfig any] func(json gjson.Result, config *PluginConfig, log Log) error type ParseRuleConfigFunc[PluginConfig any] func(json gjson.Result, global PluginConfig, config *PluginConfig, log Log) error type onHttpHeadersFunc[PluginConfig any] func(context HttpContext, config PluginConfig, log Log) types.Action type onHttpBodyFunc[PluginConfig any] func(context HttpContext, config PluginConfig, body []byte, log Log) types.Action +type onHttpStreamingBodyFunc[PluginConfig any] func(context HttpContext, config PluginConfig, chunk []byte, isLastChunk bool, log Log) []byte type onHttpStreamDoneFunc[PluginConfig any] func(context HttpContext, config PluginConfig, log Log) type CommonVmCtx[PluginConfig any] struct { types.DefaultVMContext - pluginName string - log Log - hasCustomConfig bool - parseConfig ParseConfigFunc[PluginConfig] - parseRuleConfig ParseRuleConfigFunc[PluginConfig] - onHttpRequestHeaders onHttpHeadersFunc[PluginConfig] - onHttpRequestBody onHttpBodyFunc[PluginConfig] - onHttpResponseHeaders onHttpHeadersFunc[PluginConfig] - onHttpResponseBody onHttpBodyFunc[PluginConfig] - onHttpStreamDone onHttpStreamDoneFunc[PluginConfig] + pluginName string + log Log + hasCustomConfig bool + parseConfig ParseConfigFunc[PluginConfig] + parseRuleConfig ParseRuleConfigFunc[PluginConfig] + onHttpRequestHeaders onHttpHeadersFunc[PluginConfig] + onHttpRequestBody onHttpBodyFunc[PluginConfig] + onHttpStreamingRequestBody onHttpStreamingBodyFunc[PluginConfig] + onHttpResponseHeaders onHttpHeadersFunc[PluginConfig] + onHttpResponseBody onHttpBodyFunc[PluginConfig] + onHttpStreamingResponseBody onHttpStreamingBodyFunc[PluginConfig] + onHttpStreamDone onHttpStreamDoneFunc[PluginConfig] } func SetCtx[PluginConfig any](pluginName string, setFuncs ...SetPluginFunc[PluginConfig]) { @@ -94,6 +101,12 @@ func ProcessRequestBodyBy[PluginConfig any](f onHttpBodyFunc[PluginConfig]) SetP } } +func ProcessStreamingRequestBodyBy[PluginConfig any](f onHttpStreamingBodyFunc[PluginConfig]) SetPluginFunc[PluginConfig] { + return func(ctx *CommonVmCtx[PluginConfig]) { + ctx.onHttpStreamingRequestBody = f + } +} + func ProcessResponseHeadersBy[PluginConfig any](f onHttpHeadersFunc[PluginConfig]) SetPluginFunc[PluginConfig] { return func(ctx *CommonVmCtx[PluginConfig]) { ctx.onHttpResponseHeaders = f @@ -106,6 +119,12 @@ func ProcessResponseBodyBy[PluginConfig any](f onHttpBodyFunc[PluginConfig]) Set } } +func ProcessStreamingResponseBodyBy[PluginConfig any](f onHttpStreamingBodyFunc[PluginConfig]) SetPluginFunc[PluginConfig] { + return func(ctx *CommonVmCtx[PluginConfig]) { + ctx.onHttpStreamingResponseBody = f + } +} + func ProcessStreamDoneBy[PluginConfig any](f onHttpStreamDoneFunc[PluginConfig]) SetPluginFunc[PluginConfig] { return func(ctx *CommonVmCtx[PluginConfig]) { ctx.onHttpStreamDone = f @@ -195,25 +214,34 @@ func (ctx *CommonPluginCtx[PluginConfig]) NewHttpContext(contextID uint32) types contextID: contextID, userContext: map[string]interface{}{}, } - if ctx.vm.onHttpRequestBody != nil { + if ctx.vm.onHttpRequestBody != nil || ctx.vm.onHttpStreamingRequestBody != nil { httpCtx.needRequestBody = true } - if ctx.vm.onHttpResponseBody != nil { + if ctx.vm.onHttpResponseBody != nil || ctx.vm.onHttpStreamingResponseBody != nil { httpCtx.needResponseBody = true } + if ctx.vm.onHttpStreamingRequestBody != nil { + httpCtx.streamingRequestBody = true + } + if ctx.vm.onHttpStreamingResponseBody != nil { + httpCtx.streamingResponseBody = true + } + return httpCtx } type CommonHttpCtx[PluginConfig any] struct { types.DefaultHttpContext - plugin *CommonPluginCtx[PluginConfig] - config *PluginConfig - needRequestBody bool - needResponseBody bool - requestBodySize int - responseBodySize int - contextID uint32 - userContext map[string]interface{} + plugin *CommonPluginCtx[PluginConfig] + config *PluginConfig + needRequestBody bool + needResponseBody bool + streamingRequestBody bool + streamingResponseBody bool + requestBodySize int + responseBodySize int + contextID uint32 + userContext map[string]interface{} } func (ctx *CommonHttpCtx[PluginConfig]) SetContext(key string, value interface{}) { @@ -252,6 +280,14 @@ func (ctx *CommonHttpCtx[PluginConfig]) DontReadResponseBody() { ctx.needResponseBody = false } +func (ctx *CommonHttpCtx[PluginConfig]) BufferRequestBody() { + ctx.streamingRequestBody = false +} + +func (ctx *CommonHttpCtx[PluginConfig]) BufferResponseBody() { + ctx.streamingResponseBody = false +} + func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestHeaders(numHeaders int, endOfStream bool) types.Action { config, err := ctx.plugin.GetMatchConfig() if err != nil { @@ -276,22 +312,32 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestBody(bodySize int, endOfStr if ctx.config == nil { return types.ActionContinue } - if ctx.plugin.vm.onHttpRequestBody == nil { - return types.ActionContinue - } if !ctx.needRequestBody { return types.ActionContinue } - ctx.requestBodySize += bodySize - if !endOfStream { - return types.ActionPause - } - body, err := proxywasm.GetHttpRequestBody(0, ctx.requestBodySize) - if err != nil { - ctx.plugin.vm.log.Warnf("get request body failed: %v", err) + if ctx.plugin.vm.onHttpStreamingRequestBody != nil && ctx.streamingRequestBody { + chunk, _ := proxywasm.GetHttpRequestBody(0, bodySize) + modifiedChunk := ctx.plugin.vm.onHttpStreamingRequestBody(ctx, *ctx.config, chunk, endOfStream, ctx.plugin.vm.log) + err := proxywasm.ReplaceHttpRequestBody(modifiedChunk) + if err != nil { + ctx.plugin.vm.log.Warnf("replace request body chunk failed: %v", err) + return types.ActionContinue + } return types.ActionContinue } - return ctx.plugin.vm.onHttpRequestBody(ctx, *ctx.config, body, ctx.plugin.vm.log) + if ctx.plugin.vm.onHttpRequestBody != nil { + ctx.requestBodySize += bodySize + if !endOfStream { + return types.ActionPause + } + body, err := proxywasm.GetHttpRequestBody(0, ctx.requestBodySize) + if err != nil { + ctx.plugin.vm.log.Warnf("get request body failed: %v", err) + return types.ActionContinue + } + return ctx.plugin.vm.onHttpRequestBody(ctx, *ctx.config, body, ctx.plugin.vm.log) + } + return types.ActionContinue } func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseHeaders(numHeaders int, endOfStream bool) types.Action { @@ -312,22 +358,32 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseBody(bodySize int, endOfSt if ctx.config == nil { return types.ActionContinue } - if ctx.plugin.vm.onHttpResponseBody == nil { - return types.ActionContinue - } if !ctx.needResponseBody { return types.ActionContinue } - ctx.responseBodySize += bodySize - if !endOfStream { - return types.ActionPause - } - body, err := proxywasm.GetHttpResponseBody(0, ctx.responseBodySize) - if err != nil { - ctx.plugin.vm.log.Warnf("get response body failed: %v", err) + if ctx.plugin.vm.onHttpStreamingResponseBody != nil && ctx.streamingResponseBody { + chunk, _ := proxywasm.GetHttpResponseBody(0, bodySize) + modifiedChunk := ctx.plugin.vm.onHttpStreamingResponseBody(ctx, *ctx.config, chunk, endOfStream, ctx.plugin.vm.log) + err := proxywasm.ReplaceHttpResponseBody(modifiedChunk) + if err != nil { + ctx.plugin.vm.log.Warnf("replace response body chunk failed: %v", err) + return types.ActionContinue + } return types.ActionContinue } - return ctx.plugin.vm.onHttpResponseBody(ctx, *ctx.config, body, ctx.plugin.vm.log) + if ctx.plugin.vm.onHttpResponseBody != nil { + ctx.responseBodySize += bodySize + if !endOfStream { + return types.ActionPause + } + body, err := proxywasm.GetHttpResponseBody(0, ctx.responseBodySize) + if err != nil { + ctx.plugin.vm.log.Warnf("get response body failed: %v", err) + return types.ActionContinue + } + return ctx.plugin.vm.onHttpResponseBody(ctx, *ctx.config, body, ctx.plugin.vm.log) + } + return types.ActionContinue } func (ctx *CommonHttpCtx[PluginConfig]) OnHttpStreamDone() {