diff --git a/plugins/wasm-go/extensions/api-workflow/Dockerfile b/plugins/wasm-go/extensions/api-workflow/Dockerfile new file mode 100644 index 000000000..9b084e059 --- /dev/null +++ b/plugins/wasm-go/extensions/api-workflow/Dockerfile @@ -0,0 +1,2 @@ +FROM scratch +COPY main.wasm plugin.wasm \ No newline at end of file diff --git a/plugins/wasm-go/extensions/api-workflow/README.md b/plugins/wasm-go/extensions/api-workflow/README.md new file mode 100644 index 000000000..9819b3688 --- /dev/null +++ b/plugins/wasm-go/extensions/api-workflow/README.md @@ -0,0 +1,384 @@ +--- +title: API 工作流 +keywords: [ API工作流 ] +description: API 工作流插件配置参考 +--- +## 功能说明 +`api工作流 `实现了可编排的API workflow 插件,支持根据配置定义生成DAG并执行工作流 + +## 配置说明 + +| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | 备注 | +|----------|--------|------| --- |--------|----| +| workflow | object | 必填 | | DAG的定义 | | +| env | object | 选填 | | 一些环境变量 | | + +`env`object的配置字段说明如下: + +| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | 备注 | +|----------|--------|------|------|-----------|--| +| timeout | int | 选填 | 5000 | 每次请求的过期时间 | 单位是毫秒(ms) | +| max_depth | int | 选填 | 100 | 工作流最大迭代次数 | | + + +`workflow`object的配置字段说明如下: + +| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | 备注 | +|-------|----------------------| ---- | --- |-----------|----| +| nodes | array of node object | 选填 | | DAG的定义的节点 | | +| edges | array of edge object | 必填 | | DAG的定义的边 | | + +`edge` object的配置字段说明如下: + +| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | +|-------------| ------ | ---- | --- |------------------------------------------------| +| source | string | 必填 | - | 上一步的操作,必须是定义的node的name,或者初始化工作流的start | +| target | string | 必填 | - | 当前的操作,必须是定义的node的name,或者结束工作流的关键字 end continue | | +| conditional | string | 选填 | - | 这一步是否执行的判断条件 | + +`node` object的配置字段说明如下: + +| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | 备注 | +| --------------- |------------------------------------|---| --- |-------------------------------|-------------------------------| +| name | string | 必填 | - | node名称 | 全局唯一 | +| service_name | string | 必填 | - | higress配置的服务名称 | | +| service_port | int | 选填 | 80 | higress配置的服务端口 | | +| service_domain | string | 选填 | | higress配置的服务domain | | +| service_path | string | 必填 | | 请求的path | | +| service_headers | array of header object | 选填 | | 请求的头 | | +| service_body_replace_keys| array of bodyReplaceKeyPair object | 选填| 请求body模板替换键值对 | 用来构造请求| 如果为空,则直接使用service_body_tmpl请求 | +| service_body_tmpl | string | 选填 | | 请求的body模板 | | +| service_method | string | 必填 | | 请求的方法 | GET,POST | + +`header` object 的配置字段说明如下: + +| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | 备注 | +|-------|------------------------|---| --- |-----------| --------- | +| key | string | 必填 | - | 头文件的key | | +| value | string | 必填 | - | 头文件的value | | + +`bodyReplaceKeyPair` object 配置说明 + +| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | 备注 | +|------|------------------------|---| --- |-----------|--| +| from | string | 必填 | - | 描述数据从哪获得 | | +| to | string | 必填 | - | 描述数据最后放到那 | | + + + +## 用法示例 + +我们把工作流抽象成DAG配置文件,加上控制流和数据流更方便的控制流程和构造请求。 + +![img](img/img.png) + + + +### DAG的定义 + +#### 边edge +描述操作如何编排 + +样例 +```yaml + edges: + - source: start + target: A + - source: start + target: B + - source: start + target: C + - source: A + target: D + - source: B + target: D + - source: C + target: D + - source: D + target: end + conditional: "gt {{D||check}} 0.9" + - source: D + target: E + conditional: "lt {{D||check}} 0.9" + - source: E + target: end +``` +#### 控制流 conditional 和 target +##### 分支 conditional +插件执行到conditional的定义不为空的步骤`edge`时,会根据表达式定义判断这步是否执行,如果判断为否,会跳过这个分支。 +表达式可使用参数,用{{xxx}}标注,具体定义见数据流`模板和变量` +支持比较表达式和例子如下: +`eq arg1 arg2`: arg1 == arg2时为true 不只是数字,支持string +`lt arg1 arg2`: arg1 < arg2时为true +`le arg1 arg2`: arg1 <= arg2时为true +`gt arg1 arg2`: arg1 > arg2时为true +`ge arg1 arg2`: arg1 >= arg2时为true +`and arg1 arg2`: arg1 && arg2 +`or arg1 arg2`: arg1 || arg2 +`contain arg1 arg2`: arg1 包含 arg2时为true +支持and 和 or的嵌套 比如 `and (eq 1 1) (or (contain hello hi) (lt 1 2))` + +##### 结束和执行工作流 target +当target为`name`,执行name的操作 +当target 为`end`,直接返回source的结果,结束工作流 +当target 为`continue`,结束工作流,将请求放行到下一个plugin + +#### 数据流 + +进入plugin的数据(request body),会根据构造模板json`node.service_body_tmpl`和`node.service_body_replace_keys`构造请求body,并把执行后结果存在key为`nodeName`的上下文里,只支持json格式的数据。 + +##### 模板和变量 +在工作流的配置文件中 +###### edge.conditional +配置文件的定义中,`edge.conditional` 支持模板和变量,方便根据数据流的数据来构建请求数据 +在模板里使用变量来代表数据和过滤。变量使用`{{str1||str2}}`包裹,使用`||`分隔,str1代表使用那个node的输出数据,str2代表如何取数据,过滤表达式基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串,`@all`代表全都要 + +例子 +```yaml +conditional: "lt {{D||check}} 0.9" +``` +node D 的返回值是 +```json +{"check": 0.99} +``` +解析后的表达式 `lt 0.99 0.9` + +###### node.service_body_tmpl 和 node.service_body_replace_keys +这组配置用来构造请求body,`node.service_body_tmpl`是模板json ,`node.service_body_replace_keys`用来描述如何填充模板json,是一个object的数组,from标识数据从哪里来,to表示填充的位置 +`from`是使用`str1||str2`的字符串,str1代表使用那个node的执行返回数据,str2代表如何取数据,表达式基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 +`to`标识数据放哪,表达式基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法来描述填充位置,使用的是sjson来拼接json,填充到`tool.service_body_tmpl` 的模板json里 +当`node.service_body_replace_keys`为空时,代表直接发送`node.service_body_tmpl` + +例子 +```yaml + service_body_tmpl: + embeddings: + result: "" + msg: "" + sk: "sk-xxxxxx" + service_body_replace_keys: + - to "embeddings.result" + from "A||output.embeddings.0.embedding" + - to "msg" + from "B||@all" +``` +`A`节点的输出是 +```json +{"embeddings": {"output":{"embeddings":[{"embedding":[0.014398524595686043],"text_index":0}]},"usage":{"total_tokens":12},"request_id":"2a5229bc-53d9-91ca-bce2-00ae5e01a1d3"}} +``` +`B`节点的输出是 +```json +["higress项目主仓库的github地址是什么"] +``` +根据 service_body_tmpl 和 service_body_replace_keys 构造的request body如下 +```json +{"embeddings":{"result":"[0.014398524595686043,......]"},"msg":["higress项目主仓库的github地址是什么"],"sk":"sk-xxxxxx"} +``` + + + +### node的定义 + +具体执行的单元,封装了httpCall,提供http的访问能力,获取各种api的能力。request body支持自主构建。 + +样例 +```yaml + nodes: + - name: "A" + service_domain: "dashscope.aliyuncs.com" + service_name: "dashscope" + service_port: 443 + service_path: "/api/v1/services/embeddings/text-embedding/text-embedding" + service_method: "POST" + service_body_tmpl: + model: "text-embedding-v2" + input: + texts: "" + parameters: + text_type: "query" + service_body_replace_keys: + - from: "start||messages.#(role==user)#.content" + to: "input.texts" + service_headers: + - key: "Authorization" + value: "Bearer sk-b98f462xxxxxxxx" + - key: "Content-Type" + value: "application/json" +``` +这是请求官方 text-embedding-v2模型的请求样例 具体请求可以看 https://help.aliyun.com/zh/dashscope/developer-reference/text-embedding-api-details?spm=a2c22.12281978.0.0.4d596ea2lRn8xW +### 一个工作流的例子 +从三个节点ABC获取信息,等到数据都就位了,再执行D。 并根据D的输出判断是否需要执行E还是直接结束 +![dag.png](img/dag.png) +start的返回值(请求plugin的body) +```json +{ + "model":"qwen-7b-chat-xft", + "frequency_penalty":0, + "max_tokens":800, + "stream":false, + "messages": [{"role":"user","content":"higress项目主仓库的github地址是什么"}], + "presence_penalty":0,"temperature":0.7,"top_p":0.95 +} +``` +A的返回值是 +```json +{ + "output":{ + "embeddings": [ + { + "text_index": 0, + "embedding": [-0.006929283495992422,-0.005336422007530928] + }, + { + "text_index": 1, + "embedding": [-0.006929283495992422,-0.005336422007530928] + }, + { + "text_index": 2, + "embedding": [-0.006929283495992422,-0.005336422007530928] + }, + { + "text_index": 3, + "embedding": [-0.006929283495992422,-0.005336422007530928] + } + ] + }, + "usage":{ + "total_tokens":12 + }, + "request_id":"d89c06fb-46a1-47b6-acb9-bfb17f814969" +} +``` +B的返回值是 +```json +{"llm":"this is b"} +``` +C的返回值是 +```json +{ + "get": "this is c" +} +``` +D的返回值是 +```json +{"check": 0.99, "llm":{}} +``` +E的返回值是 +```json +{"save": "ok", "date":{}} +``` +这个工作流的配置文件如下: +```yaml +env: + max_depth: 100 + timeout: 3000 +workflow: + edges: + - source: start + target: A + - source: start + target: B + - source: start + target: C + - source: A + target: D + - source: B + target: D + - source: C + target: D + - source: D + target: end + conditional: "lt {{D||check}} 0.9" + - source: D + target: E + conditional: "gt {{D||check}} 0.9" + - source: E + target: end + nodes: + - name: "A" + service_domain: "dashscope.aliyuncs.com" + service_name: "dashscope" + service_port: 443 + service_path: "/api/v1/services/embeddings/text-embedding/text-embedding" + service_method: "POST" + service_body_tmpl: + model: "text-embedding-v2" + input: + texts: "" + parameters: + text_type: "query" + service_body_replace_keys: + - from: "start||messages.#(role==user)#.content" + to: "input.texts" + service_headers: + - key: "Authorization" + value: "Bearer sk-b98f462xxxxxxxx" + - key: "Content-Type" + value: "application/json" + - name: "B" + service_body_tmpl: + embeddings: "default" + msg: "default request body" + sk: "sk-xxxxxx" + service_body_replace_keys: + service_headers: + - key: "AK" + value: "ak-xxxxxxxxxxxxxxxxxxxx" + - key: "Content-Type" + value: "application/json" + service_method: "POST" + service_name: "whoai.static" + service_path: "/llm" + service_port: 80 + - name: "C" + service_method: "GET" + service_name: "whoai.static" + service_path: "/get" + service_port: 80 + - name: "D" + service_headers: + service_method: "POST" + service_name: "whoai.static" + service_path: "/check_cache" + service_port: 80 + service_body_tmpl: + A_result: "" + B_result: "" + C_result: "" + service_body_replace_keys: + - from: "A||output.embeddings.0.embedding.0" + to: "A_result" + - from: "B||llm" + to: "B_result" + - from: "C||get" + to: "C_result" + - name: "E" + service_method: "POST" + service_name: "whoai.static" + service_path: "/save_cache" + service_port: 80 + service_body_tmpl: + save: "" + service_body_replace_keys: + - from: "D||llm" + to: "save" +``` +执行请求 +```bash +curl -v '127.0.0.1:8080' -H 'Accept: application/json, text/event-stream' -H 'Content-Type: application/json'--data-raw '{"model":"qwen-7b-chat-xft","frequency_penalty":0,"max_tokens":800,"stream":false,"messages":[{"role":"user","content":"higress项目主仓库的github地址是什么"}],"presence_penalty":0,"temperature":0.7,"top_p":0.95}' +``` + +执行后的简略debug日志,可以看到工作流等到前置的ABC流程执行完毕后,根据返回值构建了D的body` {"A_result":0.007155838584362588,"B_result":"this is b","C_result":"this is c"}`;执行D后,根据D的返回值`{"check": 0.99, "llm":{}}`进行条件判断,最终继续执行了E`gt 0.99 0.9`,然后结束流程 +```bash +[api-workflow] workflow exec task,source is start,target is A, body is {"input":{"texts":["higress项目主仓库的github地址是什么"]},"model":"text-embedding-v2","parameters":{"text_type":"query"}},header is [[Authorization Bearer sk-b98f4628125xxxxxxxxxxxxxxxx] [Content-Type application/json]] +[api-workflow] workflow exec task,source is start,target is B, body is {"embeddings":"default","msg":"default request body","sk":"sk-xxxxxx"},header is [[AK ak-xxxxxxxxxxxxxxxxxxxx] [Content-Type application/json]] +[api-workflow] workflow exec task,source is start,target is C, body is ,header is [] +[api-workflow] source is B,target is D,stauts is map[A:0 B:0 C:0 D:2 E:1] +[api-workflow] source is C,target is D,stauts is map[A:0 B:0 C:0 D:1 E:1] +[api-workflow] source is A,target is D,stauts is map[A:0 B:0 C:0 D:0 E:1] +[api-workflow] workflow exec task,source is A,target is D, body is,header is [] +[api-workflow] source is D,target is end,workflow is pass +[api-workflow] source is D,target is E,stauts is map[A:0 B:0 C:0 D:0 E:0] +[api-workflow] workflow exec task,source is D,target is E, body is {"save":"{\"A_result\":0.007155838584362588,\"B_result\":\"this is b\",\"C_result\":\"this is c\"}"},header is [] +[api-workflow] source is E,target is end,workflow is end +``` diff --git a/plugins/wasm-go/extensions/api-workflow/go.mod b/plugins/wasm-go/extensions/api-workflow/go.mod new file mode 100644 index 000000000..c3073a8c8 --- /dev/null +++ b/plugins/wasm-go/extensions/api-workflow/go.mod @@ -0,0 +1,21 @@ +module api-workflow + +go 1.19 + +replace github.com/alibaba/higress/plugins/wasm-go => ../.. + +require ( + github.com/alibaba/higress/plugins/wasm-go v0.0.0 + github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240711023527-ba358c48772f + github.com/tidwall/gjson v1.14.3 + github.com/tidwall/sjson v1.2.5 +) + +require ( + github.com/google/uuid v1.3.0 // indirect + github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 // indirect + github.com/magefile/mage v1.14.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/resp v0.1.1 // indirect +) diff --git a/plugins/wasm-go/extensions/api-workflow/go.sum b/plugins/wasm-go/extensions/api-workflow/go.sum new file mode 100644 index 000000000..2995e01db --- /dev/null +++ b/plugins/wasm-go/extensions/api-workflow/go.sum @@ -0,0 +1,23 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 h1:IHDghbGQ2DTIXHBHxWfqCYQW1fKjyJ/I7W1pMyUDeEA= +github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520/go.mod h1:Nz8ORLaFiLWotg6GeKlJMhv8cci8mM43uEnLA5t8iew= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240711023527-ba358c48772f h1:ZIiIBRvIw62gA5MJhuwp1+2wWbqL9IGElQ499rUsYYg= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240711023527-ba358c48772f/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= +github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= +github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw= +github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE= +github.com/tidwall/resp v0.1.1/go.mod h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/plugins/wasm-go/extensions/api-workflow/img/dag.png b/plugins/wasm-go/extensions/api-workflow/img/dag.png new file mode 100644 index 000000000..92a36a9f7 Binary files /dev/null and b/plugins/wasm-go/extensions/api-workflow/img/dag.png differ diff --git a/plugins/wasm-go/extensions/api-workflow/img/img.png b/plugins/wasm-go/extensions/api-workflow/img/img.png new file mode 100644 index 000000000..fcb2c659d Binary files /dev/null and b/plugins/wasm-go/extensions/api-workflow/img/img.png differ diff --git a/plugins/wasm-go/extensions/api-workflow/main.go b/plugins/wasm-go/extensions/api-workflow/main.go new file mode 100644 index 000000000..5a4254b8d --- /dev/null +++ b/plugins/wasm-go/extensions/api-workflow/main.go @@ -0,0 +1,307 @@ +// Copyright (c) 2022 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + ejson "encoding/json" + "errors" + "fmt" + "net/http" + "strings" + + "api-workflow/utils" + . "api-workflow/workflow" + + "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" + "github.com/tidwall/gjson" +) + +const ( + DefaultMaxDepth uint32 = 100 + WorkflowExecStatus string = "workflowExecStatus" + DefaultTimeout uint32 = 5000 +) + +func main() { + wrapper.SetCtx( + "api-workflow", + wrapper.ParseConfigBy(parseConfig), + wrapper.ProcessRequestBodyBy(onHttpRequestBody), + ) +} + +func parseConfig(json gjson.Result, c *PluginConfig, log wrapper.Log) error { + + edges := make([]Edge, 0) + nodes := make(map[string]Node) + var err error + // env + env := json.Get("env") + // timeout + c.Env.Timeout = uint32(env.Get("timeout").Int()) + if c.Env.Timeout == 0 { + c.Env.Timeout = DefaultTimeout + } + // max_depth + c.Env.MaxDepth = uint32(env.Get("max_depth").Int()) + if c.Env.MaxDepth == 0 { + c.Env.MaxDepth = DefaultMaxDepth + } + // workflow + workflow := json.Get("workflow") + if !workflow.Exists() { + return errors.New("workflow is empty") + } + // workflow.edges + edges_ := workflow.Get("edges") + if edges_.Exists() && edges_.IsArray() { + for _, w := range edges_.Array() { + task := Task{} + edge := Edge{} + edge.Source = w.Get("source").String() + if edge.Source == "" { + return errors.New("source is empty") + } + edge.Target = w.Get("target").String() + if edge.Target == "" { + return errors.New("target is empty") + } + edge.Task = &task + + edge.Conditional = w.Get("conditional").String() + edges = append(edges, edge) + } + } + c.Workflow.Edges = edges + // workflow.nodes + nodes_ := workflow.Get("nodes") + if nodes_.Exists() && nodes_.IsArray() { + for _, value := range nodes_.Array() { + node := Node{} + node.Name = value.Get("name").String() + if node.Name == "" { + return errors.New("tool name is empty") + } + node.ServiceName = value.Get("service_name").String() + if node.ServiceName == "" { + return errors.New("tool service name is empty") + } + node.ServicePort = value.Get("service_port").Int() + if node.ServicePort == 0 { + if strings.HasSuffix(node.ServiceName, ".static") { + // use default logic port which is 80 for static service + node.ServicePort = 80 + } else { + return errors.New("tool service port is empty") + } + + } + node.ServiceDomain = value.Get("service_domain").String() + node.ServicePath = value.Get("service_path").String() + if node.ServicePath == "" { + node.ServicePath = "/" + } + node.ServiceMethod = value.Get("service_method").String() + if node.ServiceMethod == "" { + return errors.New("service_method is empty") + } + serviceHeaders := value.Get("service_headers") + if serviceHeaders.Exists() && serviceHeaders.IsArray() { + serviceHeaders_ := []ServiceHeader{} + err = ejson.Unmarshal([]byte(serviceHeaders.Raw), &serviceHeaders_) + node.ServiceHeaders = serviceHeaders_ + } + + node.ServiceBodyTmpl = value.Get("service_body_tmpl").String() + serviceBodyReplaceKeys := value.Get("service_body_replace_keys") + if serviceBodyReplaceKeys.Exists() && serviceBodyReplaceKeys.IsArray() { + serviceBodyReplaceKeys_ := []BodyReplaceKeyPair{} + err = ejson.Unmarshal([]byte(serviceBodyReplaceKeys.Raw), &serviceBodyReplaceKeys_) + node.ServiceBodyReplaceKeys = serviceBodyReplaceKeys_ + if err != nil { + return fmt.Errorf("unmarshal service body replace keys failed, err:%v", err) + } + } + + nodes[node.Name] = node + } + c.Workflow.Nodes = nodes + // workflow.WorkflowExecStatus + c.Workflow.WorkflowExecStatus, err = initWorkflowExecStatus(c) + log.Debugf("init status : %v", c.Workflow.WorkflowExecStatus) + if err != nil { + log.Errorf("init workflow exec status failed, err:%v", err) + return fmt.Errorf("init workflow exec status failed, err:%v", err) + } + } + log.Debugf("config : %v", c) + return nil +} + +func initWorkflowExecStatus(config *PluginConfig) (map[string]int, error) { + result := make(map[string]int) + + for name, _ := range config.Workflow.Nodes { + result[name] = 0 + } + for _, edge := range config.Workflow.Edges { + + if edge.Source == TaskStart || edge.Target == TaskContinue || edge.Target == TaskEnd { + continue + } + + count, ok := result[edge.Target] + if !ok { + return nil, fmt.Errorf("Target %s is not exist in nodes", edge.Target) + } + result[edge.Target] = count + 1 + + } + return result, nil +} + +func onHttpRequestBody(ctx wrapper.HttpContext, config PluginConfig, body []byte, log wrapper.Log) types.Action { + + initHeader := make([][2]string, 0) + // 初始化运行状态 + ctx.SetContext(WorkflowExecStatus, config.Workflow.WorkflowExecStatus) + + // 执行工作流 + for _, edge := range config.Workflow.Edges { + + if edge.Source == TaskStart { + ctx.SetContext(fmt.Sprintf("%s", TaskStart), body) + err := recursive(edge, initHeader, body, 1, config, log, ctx) + if err != nil { + // 工作流处理错误,返回500给用户 + log.Errorf("recursive failed: %v", err) + _ = utils.SendResponse(500, "api-workflow.recursive_failed", utils.MimeTypeTextPlain, fmt.Sprintf("workflow plugin recursive failed: %v", err)) + + } + } + } + + return types.ActionPause +} + +// 放入符合条件的edge +func recursive(edge Edge, headers [][2]string, body []byte, depth uint32, config PluginConfig, log wrapper.Log, ctx wrapper.HttpContext) error { + + var err error + // 防止递归次数太多 + if depth > config.Env.MaxDepth { + return fmt.Errorf("maximum recursion depth reached") + } + + // 判断是不是end + if edge.IsEnd() { + log.Debugf("source is %s,target is %s,workflow is end", edge.Source, edge.Target) + log.Debugf("body is %s", string(body)) + _ = proxywasm.SendHttpResponse(200, headers, body, -1) + return nil + } + // 判断是不是continue + if edge.IsContinue() { + log.Debugf("source is %s,target is %s,workflow is continue", edge.Source, edge.Target) + _ = proxywasm.ResumeHttpRequest() + return nil + } + + // 封装task + err = edge.WrapperTask(config, ctx) + if err != nil { + log.Errorf("workflow exec wrapperTask find error,source is %s,target is %s,error is %v ", edge.Source, edge.Target, err) + return fmt.Errorf("workflow exec wrapperTask find error,source is %s,target is %s,error is %v ", edge.Source, edge.Target, err) + } + + // 执行task + log.Debugf("workflow exec task,source is %s,target is %s, body is %s,header is %v", edge.Source, edge.Target, string(edge.Task.Body), edge.Task.Headers) + err = wrapper.HttpCall(edge.Task.Cluster, edge.Task.Method, edge.Task.ServicePath, edge.Task.Headers, edge.Task.Body, func(statusCode int, responseHeaders http.Header, responseBody []byte) { + log.Debugf("code:%d", statusCode) + // 判断response code + if statusCode < 400 { + + // 存入这轮返回的body + ctx.SetContext(fmt.Sprintf("%s", edge.Target), responseBody) + + headers_ := make([][2]string, len(responseHeaders)) + for key, value := range responseHeaders { + headers_ = append(headers_, [2]string{key, value[0]}) + } + // 判断是否进入下一步 + nextStatus := ctx.GetContext(WorkflowExecStatus).(map[string]int) + + // 进入下一步 + for _, next := range config.Workflow.Edges { + if next.Source == edge.Target { + // 更新workflow status + if next.Target != TaskContinue && next.Target != TaskEnd { + + nextStatus[next.Target] = nextStatus[next.Target] - 1 + log.Debugf("source is %s,target is %s,stauts is %v", next.Source, next.Target, nextStatus) + // 还有没执行完的边 + if nextStatus[next.Target] > 0 { + ctx.SetContext(WorkflowExecStatus, nextStatus) + return + } + // 执行出了问题 + if nextStatus[next.Target] < 0 { + log.Errorf("workflow exec status find error %v", nextStatus) + _ = utils.SendResponse(500, "api-workflow.exec_task_failed", utils.MimeTypeTextPlain, fmt.Sprintf("workflow exec status find error %v", nextStatus)) + return + } + } + // 判断是否执行 + isPass, err2 := next.IsPass(ctx) + if err2 != nil { + log.Errorf("check pass find error:%v", err2) + _ = utils.SendResponse(500, "api-workflow.task_check_paas_failed", utils.MimeTypeTextPlain, fmt.Sprintf("check pass find error:%v", err2)) + return + } + if isPass { + log.Debugf("source is %s,target is %s,workflow is pass ", next.Source, next.Target) + nextStatus = ctx.GetContext(WorkflowExecStatus).(map[string]int) + nextStatus[next.Target] = nextStatus[next.Target] - 1 + ctx.SetContext(WorkflowExecStatus, nextStatus) + continue + + } + + // 执行下一步 + err = recursive(next, headers_, responseBody, depth+1, config, log, ctx) + if err != nil { + log.Errorf("recursive error:%v", err) + _ = utils.SendResponse(500, "api-workflow.recursive_failed", utils.MimeTypeTextPlain, fmt.Sprintf("recursive error:%v", err)) + return + } + } + } + + } else { + // statusCode >= 400 ,task httpCall执行失败,放行请求,打印错误,结束workflow + log.Errorf("workflow exec task find error,code is %d,body is %s", statusCode, string(responseBody)) + _ = utils.SendResponse(500, "api-workflow.httpCall_failed", utils.MimeTypeTextPlain, fmt.Sprintf("workflow exec task find error,code is %d,body is %s", statusCode, string(responseBody))) + } + return + + }, config.Env.MaxDepth*config.Env.Timeout) + if err != nil { + log.Errorf("httpcall error:%v", err) + } + + return err +} diff --git a/plugins/wasm-go/extensions/api-workflow/utils/conditional.go b/plugins/wasm-go/extensions/api-workflow/utils/conditional.go new file mode 100644 index 000000000..03bf0c995 --- /dev/null +++ b/plugins/wasm-go/extensions/api-workflow/utils/conditional.go @@ -0,0 +1,116 @@ +package utils + +import ( + "fmt" + "regexp" + "strconv" + "strings" +) + +// 原子表达式描述: +// eq arg1 arg2: arg1 == arg2时为true +// ne arg1 arg2: arg1 != arg2时为true +// lt arg1 arg2: arg1 < arg2时为true +// le arg1 arg2: arg1 <= arg2时为true +// gt arg1 arg2: arg1 > arg2时为true +// ge arg1 arg2: arg1 >= arg2时为true +// and arg1 arg2: arg1 && arg2 +// or arg1 arg2: arg1 || arg2 +// contain arg1 arg2: arg1 包含 arg2时为true +var operators = map[string]interface{}{ + "eq": func(a, b interface{}) bool { + return fmt.Sprintf("%v", a) == fmt.Sprintf("%v", b) + }, + "ge": func(a, b float64) bool { return a >= b }, + "le": func(a, b float64) bool { return a <= b }, + "gt": func(a, b float64) bool { return a > b }, + "lt": func(a, b float64) bool { return a < b }, + "and": func(a, b bool) bool { return a && b }, + "or": func(a, b bool) bool { return a || b }, + "contain": func(a, b string) bool { return strings.Contains(a, b) }, +} + +// 执行判断条件 +func ExecConditionalStr(conditionalStr string) (bool, error) { + // 正则表达式匹配括号内的表达式 + re := regexp.MustCompile(`\(([^()]*)\)`) + matches := re.FindAllStringSubmatch(conditionalStr, -1) + // 找到最里面的(原子表达式) + for _, match := range matches { + subCondition := match[1] + result, err := ExecConditionalStr(subCondition) + if err != nil { + return false, err + } + // 用结果替换原子表达式 + conditionalStr = strings.ReplaceAll(conditionalStr, match[0], fmt.Sprintf("%t", result)) + } + + fields := strings.Fields(conditionalStr) + // 执行原子表达式 + if len(fields) == 3 { + compareFunc := operators[fields[0]] + switch fc := compareFunc.(type) { + default: + return false, fmt.Errorf("invalid conditional func %v", compareFunc) + case func(a, b float64) bool: + a, err := strconv.ParseFloat(fields[1], 64) + if err != nil { + return false, fmt.Errorf("invalid conditional str %s", conditionalStr) + } + b, err := strconv.ParseFloat(fields[2], 64) + if err != nil { + return false, fmt.Errorf("invalid conditional str %s", conditionalStr) + } + return fc(a, b), nil + case func(a, b bool) bool: + a, err := strconv.ParseBool(fields[1]) + if err != nil { + return false, fmt.Errorf("invalid conditional str %s", conditionalStr) + } + b, err := strconv.ParseBool(fields[2]) + if err != nil { + return false, fmt.Errorf("invalid conditional str %s", conditionalStr) + } + return fc(a, b), nil + case func(a, b string) bool: + a := fields[1] + b := fields[2] + return fc(a, b), nil + case func(a, b interface{}) bool: + a := fields[1] + b := fields[2] + return fc(a, b), nil + } + // 继续获取上一层的(原子表达式) + } else if strings.Contains(conditionalStr, "(") || strings.Contains(conditionalStr, ")") { + return ExecConditionalStr(conditionalStr) + // 原子表达式有问题,返回 + } else { + return false, fmt.Errorf("invalid conditional str %s", conditionalStr) + } + +} + +// 通过正则表达式寻找模板中的 {{foo}} 字符串foo +// 返回 {{foo}} : foo +func ParseTmplStr(tmpl string) map[string]string { + result := make(map[string]string) + re := regexp.MustCompile(`\{\{(.*?)\}\}`) + matches := re.FindAllStringSubmatch(tmpl, -1) + for _, match := range matches { + result[match[0]] = match[1] + } + return result +} + +// 使用kv替换模板中的字符 +// 例如 模板是`hello,{{foo}}` 使用{"{{foo}}":"bot"} 替换后为`hello,bot` +func ReplacedStr(tmpl string, kvs map[string]string) string { + + for k, v := range kvs { + tmpl = strings.Replace(tmpl, k, v, -1) + } + + return tmpl +} diff --git a/plugins/wasm-go/extensions/api-workflow/utils/conditional_test.go b/plugins/wasm-go/extensions/api-workflow/utils/conditional_test.go new file mode 100644 index 000000000..b167dc154 --- /dev/null +++ b/plugins/wasm-go/extensions/api-workflow/utils/conditional_test.go @@ -0,0 +1,100 @@ +package utils + +import ( + "reflect" + "testing" +) + +func TestExecConditionalStr(t *testing.T) { + + tests := []struct { + name string + args string + want bool + wantErr bool + }{ + {"eq int true", "eq 1 1", true, false}, + {"eq int false", "eq 1 2", false, false}, + {"eq str true", "eq foo foo", true, false}, + {"eq str false", "eq foo boo", false, false}, + {"eq float true", "eq 0.99 0.99", true, false}, + {"eq float false", "eq 1.1 2.2", false, false}, + {"eq float int false", "eq 1.0 1", false, false}, + {"eq float str false", "eq 1.0 foo", false, false}, + {"lt true", "lt 1.1 2", true, false}, + {"lt false", "lt 2 1", false, false}, + {"le true", "le 1 2", true, false}, + {"le false", "le 2 1", false, false}, + {"gt true", "gt 2 1", true, false}, + {"gt false", "gt 1 2", false, false}, + {"ge true", "ge 2 1", true, false}, + {"ge false", "ge 1 2", false, false}, + {"and true", "and true true", true, false}, + {"and false", "and true false", false, false}, + {"or true", "or true false", true, false}, + {"or false", "or false false", false, false}, + {"contain true", "contain helloworld world", true, false}, + {"contain false", "contain helloworld moon", false, false}, + {"invalid input", "invalid", false, true}, + {"nested expression 1", "and (eq 1 1) (lt 2 3)", true, false}, + {"nested expression 2", "or (eq 1 2) (and (eq 1 1) (gt 2 3))", false, false}, + {"nested expression error", "or (eq 1 2) (and (eq 1 1) (gt 2 3)))", false, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ExecConditionalStr(tt.args) + if (err != nil) != tt.wantErr { + t.Errorf("ExecConditionalStr() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("ExecConditionalStr() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestParseTmplStr(t *testing.T) { + type args struct { + tmpl string + } + tests := []struct { + name string + args string + want map[string]string + }{ + {"normal", "{{foo}}", map[string]string{"{{foo}}": "foo"}}, + {"single", "{foo}", map[string]string{}}, + {"empty", "foo", map[string]string{}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ParseTmplStr(tt.args) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ParseTmplStr() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestReplacedStr(t *testing.T) { + type args struct { + tmpl string + kvs map[string]string + } + tests := []struct { + name string + args args + want string + }{ + {"normal", args{tmpl: "hello,{{foo}}", kvs: map[string]string{"{{foo}}": "bot"}}, "hello,bot"}, + {"empty", args{tmpl: "hello,foo", kvs: map[string]string{"{{foo}}": "bot"}}, "hello,foo"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ReplacedStr(tt.args.tmpl, tt.args.kvs); got != tt.want { + t.Errorf("ReplacedStr() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/plugins/wasm-go/extensions/api-workflow/utils/http.go b/plugins/wasm-go/extensions/api-workflow/utils/http.go new file mode 100644 index 000000000..8a9ebb191 --- /dev/null +++ b/plugins/wasm-go/extensions/api-workflow/utils/http.go @@ -0,0 +1,45 @@ +package utils + +import "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" + +const ( + HeaderContentType = "Content-Type" + + MimeTypeTextPlain = "text/plain" + MimeTypeApplicationJson = "application/json" +) + +func SendResponse(statusCode uint32, statusCodeDetails string, contentType, body string) error { + return proxywasm.SendHttpResponseWithDetail(statusCode, statusCodeDetails, CreateHeaders(HeaderContentType, contentType), []byte(body), -1) +} + +func CreateHeaders(kvs ...string) [][2]string { + headers := make([][2]string, 0, len(kvs)/2) + for i := 0; i < len(kvs); i += 2 { + headers = append(headers, [2]string{kvs[i], kvs[i+1]}) + } + return headers +} + +func OverwriteRequestHost(host string) error { + if originHost, err := proxywasm.GetHttpRequestHeader(":authority"); err == nil { + _ = proxywasm.ReplaceHttpRequestHeader("X-ENVOY-ORIGINAL-HOST", originHost) + } + return proxywasm.ReplaceHttpRequestHeader(":authority", host) +} + +func OverwriteRequestPath(path string) error { + if originPath, err := proxywasm.GetHttpRequestHeader(":path"); err == nil { + _ = proxywasm.ReplaceHttpRequestHeader("X-ENVOY-ORIGINAL-PATH", originPath) + } + return proxywasm.ReplaceHttpRequestHeader(":path", path) +} + +func OverwriteRequestAuthorization(credential string) error { + if exist, _ := proxywasm.GetHttpRequestHeader("X-HI-ORIGINAL-AUTH"); exist == "" { + if originAuth, err := proxywasm.GetHttpRequestHeader("Authorization"); err == nil { + _ = proxywasm.AddHttpRequestHeader("X-HI-ORIGINAL-AUTH", originAuth) + } + } + return proxywasm.ReplaceHttpRequestHeader("Authorization", credential) +} diff --git a/plugins/wasm-go/extensions/api-workflow/utils/tools.go b/plugins/wasm-go/extensions/api-workflow/utils/tools.go new file mode 100644 index 000000000..2f62e82e9 --- /dev/null +++ b/plugins/wasm-go/extensions/api-workflow/utils/tools.go @@ -0,0 +1,7 @@ +package utils + +import "strings" + +func TrimQuote(source string) string { + return strings.Trim(source, `"`) +} diff --git a/plugins/wasm-go/extensions/api-workflow/workflow/workflow.go b/plugins/wasm-go/extensions/api-workflow/workflow/workflow.go new file mode 100644 index 000000000..3b827c68e --- /dev/null +++ b/plugins/wasm-go/extensions/api-workflow/workflow/workflow.go @@ -0,0 +1,325 @@ +package workflow + +import ( + "fmt" + "strings" + + "api-workflow/utils" + + "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" +) + +const ( + TaskTypeHTTP string = "http" + TaskStart string = "start" + TaskEnd string = "end" + TaskContinue string = "continue" + UseContextFlag string = "||" + AllFlag string = "@all" +) + +type PluginConfig struct { + + // @Title zh-CN 工作流 + // @Description zh-CN 工作流的具体描述 + Workflow Workflow `json:"workflow" yaml:"workflow"` + // @Title zh-CN 环境变量 + // @Description zh-CN 用来定义整个工作流的环境变量 + Env Env `json:"env" yaml:"env"` +} + +type Env struct { + // @Title zh-CN 超时时间 + // @Description zh-CN 用来定义工作流的超时时间,单位是毫秒 + Timeout uint32 `json:"timeout" yaml:"timeout"` + // @Title zh-CN 最大迭代深度 + // @Description zh-CN 用来定义工作流最大的迭代深度,默认是100 + MaxDepth uint32 `json:"max_depth" yaml:"max_depth"` +} +type Workflow struct { + // @Title zh-CN 边的列表 + // @Description zh-CN 边的列表 + Edges []Edge `json:"edges" yaml:"edges"` + // @Title zh-CN 节点的列表 + // @Description zh-CN 节点的列表 + Nodes map[string]Node `json:"nodes" yaml:"nodes"` + // @Title zh-CN 工作流的状态 + // @Description zh-CN 工作流的执行状态,用于记录node之间的相互依赖和执行情况 + WorkflowExecStatus map[string]int `json:"-" yaml:"-"` +} + +type Edge struct { + // @Title zh-CN 上一步节点 + // @Description zh-CN 上一步节点,必须是定义node的name,或者初始化工作流的start + Source string `json:"source" yaml:"source"` + // @Title zh-CN 当前执行的节点 + // @Description zh-CN 当前执行节点,必须是定义的node的name,或者结束工作流的关键字 end continue + Target string `json:"target" yaml:"target"` + // @Title zh-CN 执行操作 + // @Description zh-CN 执行单元,里面实时封装需要的数据 + Task *Task + // @Title zh-CN 判断表达式 + // @Description zh-CN 是否执行下一步的判断条件 + Conditional string `json:"conditional" yaml:"conditional"` +} + +type Task struct { + Cluster wrapper.Cluster `json:"-" yaml:"-"` + ServicePath string `json:"service_path" yaml:"service_path"` + ServicePort int64 `json:"service_port" yaml:"service_port"` + ServiceKey string `json:"service_key" yaml:"service_key"` + Body []byte `json:"-" yaml:"-"` + Headers [][2]string `json:"headers" yaml:"headers"` + Method string `json:"method" yaml:"method"` + TaskType string `json:"task_type" yaml:"task_type"` +} + +type Node struct { + // @Title zh-CN 节点名称 + // @Description zh-CN 节点名称全局唯一 + Name string `json:"name" yaml:"name"` + // @Title zh-CN 服务名称 + // @Description zh-CN 带服务类型的完整名称,例如 my.dns or foo.static + ServiceName string `json:"service_name" yaml:"service_name"` + // @Title zh-CN 服务端口 + // @Description zh-CN static类型默认是80 + ServicePort int64 `json:"service_port" yaml:"service_port"` + // @Title zh-CN 服务域名 + // @Description zh-CN 服务域名,例如 dashscope.aliyuncs.com + ServiceDomain string `json:"service_domain" yaml:"service_domain"` + // @Title zh-CN http访问路径 + // @Description zh-CN http访问路径,默认是 / + ServicePath string `json:"service_path" yaml:"service_path"` + // @Title zh-CN http 方法 + // @Description zh-CN http方法,支持所有可用方法 GET,POST等 + ServiceMethod string `json:"service_method" yaml:"service_method"` + // @Title zh-CN http 请求头文件 + // @Description zh-CN 请求头文件 + ServiceHeaders []ServiceHeader `json:"service_headers" yaml:"service_headers"` + // @Title zh-CN http 请求body模板 + // @Description zh-CN 请求body模板,用来构造请求 + ServiceBodyTmpl string `json:"service_body_tmpl" yaml:"service_body_tmpl"` + // @Title zh-CN http 请求body模板替换键值对 + // @Description zh-CN 请求body模板替换键值对,用来构造请求。to表示填充的位置,from表示数据从哪里, + // 标识表达式基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 + ServiceBodyReplaceKeys []BodyReplaceKeyPair `json:"service_body_replace_keys" yaml:"service_body_replace_keys"` +} +type BodyReplaceKeyPair struct { + // @Title zh-CN from表示数据从哪里, + // @Description zh-CN from表示数据从哪里 + // 标识表达式基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 + From string `json:"from" yaml:"from"` + // @Title zh-CN to表示填充的位置 + // @Description zh-CN to表示填充的位置, + // 标识表达式基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 + To string `json:"to" yaml:"to"` +} +type ServiceHeader struct { + Key string `json:"key" yaml:"key"` + Value string `json:"value" yaml:"value"` +} + +func (w *Edge) IsEnd() bool { + if w.Target == TaskEnd { + return true + } + return false +} +func (w *Edge) IsContinue() bool { + if w.Target == TaskContinue { + return true + } + return false +} +func (e *Edge) IsPass(ctx wrapper.HttpContext) (bool, error) { + // 执行判断Conditional + if e.Conditional != "" { + + var err error + // 获取模板里的表达式 + + e.Conditional, err = e.WrapperDataByTmplStr(e.Conditional, ctx) + if err != nil { + return false, fmt.Errorf("workflow WrapperDateByTmplStr %s failed: %v", e.Conditional, err) + } + ok, err := e.ExecConditional() + if err != nil { + + return false, fmt.Errorf("wl exec conditional %s failed: %v", e.Conditional, err) + } + return !ok, nil + + } + return false, nil +} + +func (w *Edge) WrapperTask(config PluginConfig, ctx wrapper.HttpContext) error { + + // 判断 node 是否存在 + node, isTool := config.Workflow.Nodes[w.Target] + + if isTool { + w.Task.TaskType = TaskTypeHTTP + } else { + return fmt.Errorf("do not find target :%s", w.Target) + } + + switch w.Task.TaskType { + default: + return fmt.Errorf("unknown node type :%s", w.Task.TaskType) + case TaskTypeHTTP: + err := w.wrapperNodeTask(node, ctx) + if err != nil { + return err + } + + } + return nil + +} + +func (w *Edge) wrapperBody(requestBodyTemplate string, keyPairs []BodyReplaceKeyPair, ctx wrapper.HttpContext) error { + + requestBody, err := w.WrapperDataByTmplStrAndKeys(requestBodyTemplate, keyPairs, ctx) + if err != nil { + return fmt.Errorf("wrapper date by tmpl str is %s ,find err: %v", requestBodyTemplate, err) + } + + w.Task.Body = requestBody + return nil +} + +func (w *Edge) wrapperNodeTask(node Node, ctx wrapper.HttpContext) error { + // 封装cluster + w.Task.Cluster = wrapper.FQDNCluster{ + Host: node.ServiceDomain, + FQDN: node.ServiceName, + Port: node.ServicePort, + } + + // 封装请求body + err := w.wrapperBody(node.ServiceBodyTmpl, node.ServiceBodyReplaceKeys, ctx) + if err != nil { + return fmt.Errorf("wrapper body parse failed: %v", err) + } + + // 封装请求Method path headers + w.Task.Method = node.ServiceMethod + w.Task.ServicePath = node.ServicePath + w.Task.Headers = make([][2]string, 0) + if len(node.ServiceHeaders) > 0 { + for _, header := range node.ServiceHeaders { + w.Task.Headers = append(w.Task.Headers, [2]string{header.Key, header.Value}) + } + } + + return nil +} + +// 利用模板和替换键值对构造请求,使用`||`分隔,str1代表使用node是执行结果。tr2代表如何取数据,使用gjson的表达式,`@all`代表全都要 +func (w *Edge) WrapperDataByTmplStrAndKeys(tmpl string, keyPairs []BodyReplaceKeyPair, ctx wrapper.HttpContext) ([]byte, error) { + var err error + // 不需要替换 node.service_body_replace_keys 为空 + if len(keyPairs) == 0 { + return []byte(tmpl), nil + } + + for _, keyPair := range keyPairs { + + jsonPath := keyPair.From + target := keyPair.To + var contextValueRaw []byte + // 获取上下文数据 + if strings.Contains(jsonPath, UseContextFlag) { + pathStr := strings.Split(jsonPath, UseContextFlag) + if len(pathStr) == 2 { + contextKey := pathStr[0] + contextBody := ctx.GetContext(contextKey) + if contextValue, ok := contextBody.([]byte); ok { + contextValueRaw = contextValue + jsonPath = pathStr[1] + } else { + return nil, fmt.Errorf("context value is not []byte,key is %s", contextKey) + } + } + } + + // 执行封装 , `@all`代表全都要 + requestBody := gjson.ParseBytes(contextValueRaw) + if jsonPath == AllFlag { + + tmpl, err = sjson.SetRaw(tmpl, target, requestBody.Raw) + if err != nil { + return nil, fmt.Errorf("wrapper body parse failed: %v", err) + } + continue + } + requestBodyJson := requestBody.Get(jsonPath) + if requestBodyJson.Exists() { + tmpl, err = sjson.SetRaw(tmpl, target, requestBodyJson.Raw) + if err != nil { + return nil, fmt.Errorf("wrapper body parse failed: %v", err) + } + + } else { + return nil, fmt.Errorf("wrapper body parse failed: not exists %s", jsonPath) + } + } + return []byte(tmpl), nil + +} + +// 变量使用`{{str1||str2}}`包裹,使用`||`分隔,str1代表使用node是执行结果。tr2代表如何取数据,使用gjson的表达式,`@all`代表全都要 +func (w *Edge) WrapperDataByTmplStr(tmpl string, ctx wrapper.HttpContext) (string, error) { + var body []byte + // 获取模板里的表达式 + TmplKeyAndPath := utils.ParseTmplStr(tmpl) + if len(TmplKeyAndPath) == 0 { + return tmpl, nil + } + // 解析表达式 { "{{str1||str2}}":"str1||str2" } + for k, path := range TmplKeyAndPath { + // 变量使用`{{str1||str2}}`包裹,使用`||`分隔,str1代表使用前面命名为name的数据()。 + if strings.Contains(path, UseContextFlag) { + pathStr := strings.Split(path, UseContextFlag) + if len(pathStr) == 2 { + contextKey := pathStr[0] + contextBody := ctx.GetContext(contextKey) + if contextValue, ok := contextBody.([]byte); ok { + body = contextValue + path = pathStr[1] + } else { + return tmpl, fmt.Errorf("context value is not []byte,key is %s", contextKey) + } + } + // 执行封装 , `@all`代表全都要 + requestBody := gjson.ParseBytes(body) + if path == AllFlag { + tmpl = strings.Replace(tmpl, k, utils.TrimQuote(requestBody.Raw), -1) + continue + } + requestBodyJson := requestBody.Get(path) + if requestBodyJson.Exists() { + tmpl = utils.ReplacedStr(tmpl, map[string]string{k: utils.TrimQuote(requestBodyJson.Raw)}) + } else { + return tmpl, fmt.Errorf("use path {{%s}} get value is not exists,json is:%s", path, requestBody.Raw) + } + } else { + return "", fmt.Errorf("tmpl parse find error: || is not exists %s", path) + } + + } + return tmpl, nil +} + +func (w *Edge) ExecConditional() (bool, error) { + + ConditionalResult, err := utils.ExecConditionalStr(w.Conditional) + if err != nil { + return false, fmt.Errorf("exec conditional failed: %v", err) + } + return ConditionalResult, nil + +}