mirror of
https://github.com/alibaba/higress.git
synced 2026-03-19 01:37:28 +08:00
feat: Wasm go sdk support process streaming body (#933)
This commit is contained in:
@@ -41,26 +41,33 @@ type HttpContext interface {
|
||||
DontReadRequestBody()
|
||||
// If the onHttpResponseBody handle is not set, the request body will not be read by default
|
||||
DontReadResponseBody()
|
||||
// If the onHttpStreamingRequestBody handle is not set, and the onHttpRequestBody handle is set, the request body will be buffered by default
|
||||
BufferRequestBody()
|
||||
// If the onHttpStreamingResponseBody handle is not set, and the onHttpResponseBody handle is set, the response body will be buffered by default
|
||||
BufferResponseBody()
|
||||
}
|
||||
|
||||
type ParseConfigFunc[PluginConfig any] func(json gjson.Result, config *PluginConfig, log Log) error
|
||||
type ParseRuleConfigFunc[PluginConfig any] func(json gjson.Result, global PluginConfig, config *PluginConfig, log Log) error
|
||||
type onHttpHeadersFunc[PluginConfig any] func(context HttpContext, config PluginConfig, log Log) types.Action
|
||||
type onHttpBodyFunc[PluginConfig any] func(context HttpContext, config PluginConfig, body []byte, log Log) types.Action
|
||||
type onHttpStreamingBodyFunc[PluginConfig any] func(context HttpContext, config PluginConfig, chunk []byte, isLastChunk bool, log Log) []byte
|
||||
type onHttpStreamDoneFunc[PluginConfig any] func(context HttpContext, config PluginConfig, log Log)
|
||||
|
||||
type CommonVmCtx[PluginConfig any] struct {
|
||||
types.DefaultVMContext
|
||||
pluginName string
|
||||
log Log
|
||||
hasCustomConfig bool
|
||||
parseConfig ParseConfigFunc[PluginConfig]
|
||||
parseRuleConfig ParseRuleConfigFunc[PluginConfig]
|
||||
onHttpRequestHeaders onHttpHeadersFunc[PluginConfig]
|
||||
onHttpRequestBody onHttpBodyFunc[PluginConfig]
|
||||
onHttpResponseHeaders onHttpHeadersFunc[PluginConfig]
|
||||
onHttpResponseBody onHttpBodyFunc[PluginConfig]
|
||||
onHttpStreamDone onHttpStreamDoneFunc[PluginConfig]
|
||||
pluginName string
|
||||
log Log
|
||||
hasCustomConfig bool
|
||||
parseConfig ParseConfigFunc[PluginConfig]
|
||||
parseRuleConfig ParseRuleConfigFunc[PluginConfig]
|
||||
onHttpRequestHeaders onHttpHeadersFunc[PluginConfig]
|
||||
onHttpRequestBody onHttpBodyFunc[PluginConfig]
|
||||
onHttpStreamingRequestBody onHttpStreamingBodyFunc[PluginConfig]
|
||||
onHttpResponseHeaders onHttpHeadersFunc[PluginConfig]
|
||||
onHttpResponseBody onHttpBodyFunc[PluginConfig]
|
||||
onHttpStreamingResponseBody onHttpStreamingBodyFunc[PluginConfig]
|
||||
onHttpStreamDone onHttpStreamDoneFunc[PluginConfig]
|
||||
}
|
||||
|
||||
func SetCtx[PluginConfig any](pluginName string, setFuncs ...SetPluginFunc[PluginConfig]) {
|
||||
@@ -94,6 +101,12 @@ func ProcessRequestBodyBy[PluginConfig any](f onHttpBodyFunc[PluginConfig]) SetP
|
||||
}
|
||||
}
|
||||
|
||||
func ProcessStreamingRequestBodyBy[PluginConfig any](f onHttpStreamingBodyFunc[PluginConfig]) SetPluginFunc[PluginConfig] {
|
||||
return func(ctx *CommonVmCtx[PluginConfig]) {
|
||||
ctx.onHttpStreamingRequestBody = f
|
||||
}
|
||||
}
|
||||
|
||||
func ProcessResponseHeadersBy[PluginConfig any](f onHttpHeadersFunc[PluginConfig]) SetPluginFunc[PluginConfig] {
|
||||
return func(ctx *CommonVmCtx[PluginConfig]) {
|
||||
ctx.onHttpResponseHeaders = f
|
||||
@@ -106,6 +119,12 @@ func ProcessResponseBodyBy[PluginConfig any](f onHttpBodyFunc[PluginConfig]) Set
|
||||
}
|
||||
}
|
||||
|
||||
func ProcessStreamingResponseBodyBy[PluginConfig any](f onHttpStreamingBodyFunc[PluginConfig]) SetPluginFunc[PluginConfig] {
|
||||
return func(ctx *CommonVmCtx[PluginConfig]) {
|
||||
ctx.onHttpStreamingResponseBody = f
|
||||
}
|
||||
}
|
||||
|
||||
func ProcessStreamDoneBy[PluginConfig any](f onHttpStreamDoneFunc[PluginConfig]) SetPluginFunc[PluginConfig] {
|
||||
return func(ctx *CommonVmCtx[PluginConfig]) {
|
||||
ctx.onHttpStreamDone = f
|
||||
@@ -195,25 +214,34 @@ func (ctx *CommonPluginCtx[PluginConfig]) NewHttpContext(contextID uint32) types
|
||||
contextID: contextID,
|
||||
userContext: map[string]interface{}{},
|
||||
}
|
||||
if ctx.vm.onHttpRequestBody != nil {
|
||||
if ctx.vm.onHttpRequestBody != nil || ctx.vm.onHttpStreamingRequestBody != nil {
|
||||
httpCtx.needRequestBody = true
|
||||
}
|
||||
if ctx.vm.onHttpResponseBody != nil {
|
||||
if ctx.vm.onHttpResponseBody != nil || ctx.vm.onHttpStreamingResponseBody != nil {
|
||||
httpCtx.needResponseBody = true
|
||||
}
|
||||
if ctx.vm.onHttpStreamingRequestBody != nil {
|
||||
httpCtx.streamingRequestBody = true
|
||||
}
|
||||
if ctx.vm.onHttpStreamingResponseBody != nil {
|
||||
httpCtx.streamingResponseBody = true
|
||||
}
|
||||
|
||||
return httpCtx
|
||||
}
|
||||
|
||||
type CommonHttpCtx[PluginConfig any] struct {
|
||||
types.DefaultHttpContext
|
||||
plugin *CommonPluginCtx[PluginConfig]
|
||||
config *PluginConfig
|
||||
needRequestBody bool
|
||||
needResponseBody bool
|
||||
requestBodySize int
|
||||
responseBodySize int
|
||||
contextID uint32
|
||||
userContext map[string]interface{}
|
||||
plugin *CommonPluginCtx[PluginConfig]
|
||||
config *PluginConfig
|
||||
needRequestBody bool
|
||||
needResponseBody bool
|
||||
streamingRequestBody bool
|
||||
streamingResponseBody bool
|
||||
requestBodySize int
|
||||
responseBodySize int
|
||||
contextID uint32
|
||||
userContext map[string]interface{}
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) SetContext(key string, value interface{}) {
|
||||
@@ -252,6 +280,14 @@ func (ctx *CommonHttpCtx[PluginConfig]) DontReadResponseBody() {
|
||||
ctx.needResponseBody = false
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) BufferRequestBody() {
|
||||
ctx.streamingRequestBody = false
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) BufferResponseBody() {
|
||||
ctx.streamingResponseBody = false
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestHeaders(numHeaders int, endOfStream bool) types.Action {
|
||||
config, err := ctx.plugin.GetMatchConfig()
|
||||
if err != nil {
|
||||
@@ -276,22 +312,32 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpRequestBody(bodySize int, endOfStr
|
||||
if ctx.config == nil {
|
||||
return types.ActionContinue
|
||||
}
|
||||
if ctx.plugin.vm.onHttpRequestBody == nil {
|
||||
return types.ActionContinue
|
||||
}
|
||||
if !ctx.needRequestBody {
|
||||
return types.ActionContinue
|
||||
}
|
||||
ctx.requestBodySize += bodySize
|
||||
if !endOfStream {
|
||||
return types.ActionPause
|
||||
}
|
||||
body, err := proxywasm.GetHttpRequestBody(0, ctx.requestBodySize)
|
||||
if err != nil {
|
||||
ctx.plugin.vm.log.Warnf("get request body failed: %v", err)
|
||||
if ctx.plugin.vm.onHttpStreamingRequestBody != nil && ctx.streamingRequestBody {
|
||||
chunk, _ := proxywasm.GetHttpRequestBody(0, bodySize)
|
||||
modifiedChunk := ctx.plugin.vm.onHttpStreamingRequestBody(ctx, *ctx.config, chunk, endOfStream, ctx.plugin.vm.log)
|
||||
err := proxywasm.ReplaceHttpRequestBody(modifiedChunk)
|
||||
if err != nil {
|
||||
ctx.plugin.vm.log.Warnf("replace request body chunk failed: %v", err)
|
||||
return types.ActionContinue
|
||||
}
|
||||
return types.ActionContinue
|
||||
}
|
||||
return ctx.plugin.vm.onHttpRequestBody(ctx, *ctx.config, body, ctx.plugin.vm.log)
|
||||
if ctx.plugin.vm.onHttpRequestBody != nil {
|
||||
ctx.requestBodySize += bodySize
|
||||
if !endOfStream {
|
||||
return types.ActionPause
|
||||
}
|
||||
body, err := proxywasm.GetHttpRequestBody(0, ctx.requestBodySize)
|
||||
if err != nil {
|
||||
ctx.plugin.vm.log.Warnf("get request body failed: %v", err)
|
||||
return types.ActionContinue
|
||||
}
|
||||
return ctx.plugin.vm.onHttpRequestBody(ctx, *ctx.config, body, ctx.plugin.vm.log)
|
||||
}
|
||||
return types.ActionContinue
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseHeaders(numHeaders int, endOfStream bool) types.Action {
|
||||
@@ -312,22 +358,32 @@ func (ctx *CommonHttpCtx[PluginConfig]) OnHttpResponseBody(bodySize int, endOfSt
|
||||
if ctx.config == nil {
|
||||
return types.ActionContinue
|
||||
}
|
||||
if ctx.plugin.vm.onHttpResponseBody == nil {
|
||||
return types.ActionContinue
|
||||
}
|
||||
if !ctx.needResponseBody {
|
||||
return types.ActionContinue
|
||||
}
|
||||
ctx.responseBodySize += bodySize
|
||||
if !endOfStream {
|
||||
return types.ActionPause
|
||||
}
|
||||
body, err := proxywasm.GetHttpResponseBody(0, ctx.responseBodySize)
|
||||
if err != nil {
|
||||
ctx.plugin.vm.log.Warnf("get response body failed: %v", err)
|
||||
if ctx.plugin.vm.onHttpStreamingResponseBody != nil && ctx.streamingResponseBody {
|
||||
chunk, _ := proxywasm.GetHttpResponseBody(0, bodySize)
|
||||
modifiedChunk := ctx.plugin.vm.onHttpStreamingResponseBody(ctx, *ctx.config, chunk, endOfStream, ctx.plugin.vm.log)
|
||||
err := proxywasm.ReplaceHttpResponseBody(modifiedChunk)
|
||||
if err != nil {
|
||||
ctx.plugin.vm.log.Warnf("replace response body chunk failed: %v", err)
|
||||
return types.ActionContinue
|
||||
}
|
||||
return types.ActionContinue
|
||||
}
|
||||
return ctx.plugin.vm.onHttpResponseBody(ctx, *ctx.config, body, ctx.plugin.vm.log)
|
||||
if ctx.plugin.vm.onHttpResponseBody != nil {
|
||||
ctx.responseBodySize += bodySize
|
||||
if !endOfStream {
|
||||
return types.ActionPause
|
||||
}
|
||||
body, err := proxywasm.GetHttpResponseBody(0, ctx.responseBodySize)
|
||||
if err != nil {
|
||||
ctx.plugin.vm.log.Warnf("get response body failed: %v", err)
|
||||
return types.ActionContinue
|
||||
}
|
||||
return ctx.plugin.vm.onHttpResponseBody(ctx, *ctx.config, body, ctx.plugin.vm.log)
|
||||
}
|
||||
return types.ActionContinue
|
||||
}
|
||||
|
||||
func (ctx *CommonHttpCtx[PluginConfig]) OnHttpStreamDone() {
|
||||
|
||||
Reference in New Issue
Block a user