diff --git a/pkg/cmd/hgctl/plugin/init/init.go b/pkg/cmd/hgctl/plugin/init/init.go index b54dcb2d9..4946d1a4c 100644 --- a/pkg/cmd/hgctl/plugin/init/init.go +++ b/pkg/cmd/hgctl/plugin/init/init.go @@ -18,6 +18,7 @@ import ( "fmt" "io" "os" + "os/exec" "github.com/alibaba/higress/pkg/cmd/hgctl/plugin/option" "github.com/alibaba/higress/pkg/cmd/hgctl/plugin/utils" @@ -86,6 +87,12 @@ func runInit(w io.Writer, target string) (err error) { return errors.Wrap(err, "failed to create option.yaml") } + cmd := exec.Command("go", "mod", "tidy") + cmd.Dir = dir + if err := cmd.Run(); err != nil { + return errors.Wrap(err, "failed to run go mod tidy") + } + fmt.Fprintf(w, "Initialized the project in %q\n", dir) return nil diff --git a/pkg/cmd/hgctl/plugin/init/templates.go b/pkg/cmd/hgctl/plugin/init/templates.go index 9957e527b..df75d9b80 100644 --- a/pkg/cmd/hgctl/plugin/init/templates.go +++ b/pkg/cmd/hgctl/plugin/init/templates.go @@ -31,8 +31,8 @@ package main import ( "github.com/tidwall/gjson" - "github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm" - "github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm/types" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" ) @@ -93,8 +93,8 @@ module {{ .Name }} go 1.19 require ( - github.com/alibaba/higress/plugins/wasm-go v0.0.0-20231019123123-86b223bc75f1 - github.com/tetratelabs/proxy-wasm-go-sdk v0.22.0 + github.com/alibaba/higress/plugins/wasm-go main + github.com/higress-group/proxy-wasm-go-sdk main github.com/tidwall/gjson v1.14.3 ) ` diff --git a/plugins/wasm-go/extensions/ai-cache/.gitignore b/plugins/wasm-go/extensions/ai-cache/.gitignore new file mode 100644 index 000000000..47db8eedb --- /dev/null +++ b/plugins/wasm-go/extensions/ai-cache/.gitignore @@ -0,0 +1,19 @@ +# File generated by hgctl. Modify as required. + +* + +!/.gitignore + +!*.go +!go.sum +!go.mod + +!LICENSE +!*.md +!*.yaml +!*.yml + +!*/ + +/out +/test diff --git a/plugins/wasm-go/extensions/ai-cache/README.md b/plugins/wasm-go/extensions/ai-cache/README.md new file mode 100644 index 000000000..661981535 --- /dev/null +++ b/plugins/wasm-go/extensions/ai-cache/README.md @@ -0,0 +1,34 @@ +## 简介 + +**Note** + +> 需要数据面的proxy wasm版本大于等于0.2.100 + +> 编译时,需要带上版本的tag,例如:`tinygo build -o main.wasm -scheduler=none -target=wasi -gc=custom -tags="custommalloc nottinygc_finalizer proxy_wasm_version_0_2_100" ./` + +LLM 结果缓存插件,默认配置方式可以直接用于 openai 协议的结果缓存,同时支持流式和非流式响应的缓存。 + +## 配置说明 + +| Name | Type | Requirement | Default | Description | +| -------- | -------- | -------- | -------- | -------- | +| cacheKeyFrom.requestBody | string | optional | "messages.@reverse.0.content" | 从请求 Body 中基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 | +| cacheValueFrom.responseBody | string | optional | "choices.0.message.content" | 从响应 Body 中基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 | +| cacheStreamValueFrom.responseBody | string | optional | "choices.0.delta.content" | 从流式响应 Body 中基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 | +| cacheKeyPrefix | string | optional | "higress-ai-cache:" | Redis缓存Key的前缀 | +| cacheTTL | integer | optional | 0 | 缓存的过期时间,单位是秒,默认值为0,即永不过期 | +| redis.serviceName | string | requried | - | redis 服务名称,带服务类型的完整 FQDN 名称,例如 my-redis.dns、redis.my-ns.svc.cluster.local | +| redis.servicePort | integer | optional | 6379 | redis 服务端口 | +| redis.timeout | integer | optional | 1000 | 请求 redis 的超时时间,单位为毫秒 | +| redis.username | string | optional | - | 登陆 redis 的用户名 | +| redis.password | string | optional | - | 登陆 redis 的密码 | +| returnResponseTemplate | string | optional | `{"id":"from-cache","choices":[%s],"model":"gpt-4o","object":"chat.completion","usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}` | 返回 HTTP 响应的模版,用 %s 标记需要被 cache value 替换的部分 | +| returnStreamResponseTemplate | string | optional | `data:{"id":"from-cache","choices":[{"index":0,"delta":{"role":"assistant","content":"%s"},"finish_reason":"stop"}],"model":"gpt-4o","object":"chat.completion","usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}\n\ndata:[DONE]\n\n` | 返回流式 HTTP 响应的模版,用 %s 标记需要被 cache value 替换的部分 | + +## 配置示例 + +```yaml +redis: + serviceName: my-redis.dns + timeout: 2000 +``` diff --git a/plugins/wasm-go/extensions/ai-cache/go.mod b/plugins/wasm-go/extensions/ai-cache/go.mod new file mode 100644 index 000000000..b8bff5d12 --- /dev/null +++ b/plugins/wasm-go/extensions/ai-cache/go.mod @@ -0,0 +1,23 @@ +// File generated by hgctl. Modify as required. + +module github.com/alibaba/higress/plugins/wasm-go/extensions/ai-cache + +go 1.19 + +replace github.com/alibaba/higress/plugins/wasm-go => ../.. + +require ( + github.com/alibaba/higress/plugins/wasm-go v1.3.6-0.20240528060522-53bccf89f441 + github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc + github.com/tidwall/gjson v1.14.3 + github.com/tidwall/resp v0.1.1 + github.com/tidwall/sjson v1.2.5 +) + +require ( + github.com/google/uuid v1.3.0 // indirect + github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 // indirect + github.com/magefile/mage v1.14.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect +) diff --git a/plugins/wasm-go/extensions/ai-cache/go.sum b/plugins/wasm-go/extensions/ai-cache/go.sum new file mode 100644 index 000000000..269363f07 --- /dev/null +++ b/plugins/wasm-go/extensions/ai-cache/go.sum @@ -0,0 +1,23 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 h1:IHDghbGQ2DTIXHBHxWfqCYQW1fKjyJ/I7W1pMyUDeEA= +github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520/go.mod h1:Nz8ORLaFiLWotg6GeKlJMhv8cci8mM43uEnLA5t8iew= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc h1:t2AT8zb6N/59Y78lyRWedVoVWHNRSCBh0oWCC+bluTQ= +github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo= +github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= +github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw= +github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE= +github.com/tidwall/resp v0.1.1/go.mod h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/plugins/wasm-go/extensions/ai-cache/main.go b/plugins/wasm-go/extensions/ai-cache/main.go new file mode 100644 index 000000000..9e5ae207b --- /dev/null +++ b/plugins/wasm-go/extensions/ai-cache/main.go @@ -0,0 +1,361 @@ +// File generated by hgctl. Modify as required. +// See: https://higress.io/zh-cn/docs/user/wasm-go#2-%E7%BC%96%E5%86%99-maingo-%E6%96%87%E4%BB%B6 + +package main + +import ( + "errors" + "fmt" + "strings" + + "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" + "github.com/tidwall/gjson" + "github.com/tidwall/resp" +) + +const ( + CacheKeyContextKey = "cacheKey" + CacheContentContextKey = "cacheContent" + PartialMessageContextKey = "partialMessage" + ToolCallsContextKey = "toolCalls" + StreamContextKey = "stream" + DefaultCacheKeyPrefix = "higress-ai-cache:" +) + +func main() { + wrapper.SetCtx( + "ai-cache", + wrapper.ParseConfigBy(parseConfig), + wrapper.ProcessRequestHeadersBy(onHttpRequestHeaders), + wrapper.ProcessRequestBodyBy(onHttpRequestBody), + wrapper.ProcessStreamingResponseBodyBy(onHttpResponseBody), + ) +} + +// @Name ai-cache +// @Category protocol +// @Phase AUTHN +// @Priority 10 +// @Title zh-CN AI Cache +// @Description zh-CN 大模型结果缓存 +// @IconUrl +// @Version 0.1.0 +// +// @Contact.name johnlanni +// @Contact.url +// @Contact.email +// +// @Example +// redis: +// serviceName: my-redis.dns +// timeout: 2000 +// cacheKeyFrom: +// requestBody: "messages.@reverse.0.content" +// cacheValueFrom: +// responseBody: "choices.0" +// returnResponseTemplate: | +// {"id":"from-cache","choices":[%s],"model":"gpt-4o","object":"chat.completion","usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}} +// @End + +type RedisInfo struct { + // @Title zh-CN redis 服务名称 + // @Description zh-CN 带服务类型的完整 FQDN 名称,例如 my-redis.dns、redis.my-ns.svc.cluster.local + ServiceName string `required:"true" yaml:"serviceName" json:"serviceName"` + // @Title zh-CN redis 服务端口 + // @Description zh-CN 默认值为6379 + ServicePort int `required:"false" yaml:"servicePort" json:"servicePort"` + // @Title zh-CN 用户名 + // @Description zh-CN 登陆 redis 的用户名,非必填 + Username string `required:"false" yaml:"username" json:"username"` + // @Title zh-CN 密码 + // @Description zh-CN 登陆 redis 的密码,非必填,可以只填密码 + Password string `required:"false" yaml:"password" json:"password"` + // @Title zh-CN 请求超时 + // @Description zh-CN 请求 redis 的超时时间,单位为毫秒。默认值是1000,即1秒 + Timeout int `required:"false" yaml:"timeout" json:"timeout"` +} + +type KVExtractor struct { + // @Title zh-CN 从请求 Body 中基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 + RequestBody string `required:"false" yaml:"requestBody" json:"requestBody"` + // @Title zh-CN 从响应 Body 中基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 + ResponseBody string `required:"false" yaml:"responseBody" json:"responseBody"` +} + +type PluginConfig struct { + // @Title zh-CN Redis 地址信息 + // @Description zh-CN 用于存储缓存结果的 Redis 地址 + RedisInfo RedisInfo `required:"true" yaml:"redis" json:"redis"` + // @Title zh-CN 缓存 key 的来源 + // @Description zh-CN 往 redis 里存时,使用的 key 的提取方式 + CacheKeyFrom KVExtractor `required:"true" yaml:"cacheKeyFrom" json:"cacheKeyFrom"` + // @Title zh-CN 缓存 value 的来源 + // @Description zh-CN 往 redis 里存时,使用的 value 的提取方式 + CacheValueFrom KVExtractor `required:"true" yaml:"cacheValueFrom" json:"cacheValueFrom"` + // @Title zh-CN 流式响应下,缓存 value 的来源 + // @Description zh-CN 往 redis 里存时,使用的 value 的提取方式 + CacheStreamValueFrom KVExtractor `required:"true" yaml:"cacheStreamValueFrom" json:"cacheStreamValueFrom"` + // @Title zh-CN 返回 HTTP 响应的模版 + // @Description zh-CN 用 %s 标记需要被 cache value 替换的部分 + ReturnResponseTemplate string `required:"true" yaml:"returnResponseTemplate" json:"returnResponseTemplate"` + // @Title zh-CN 返回流式 HTTP 响应的模版 + // @Description zh-CN 用 %s 标记需要被 cache value 替换的部分 + ReturnStreamResponseTemplate string `required:"true" yaml:"returnStreamResponseTemplate" json:"returnStreamResponseTemplate"` + // @Title zh-CN 缓存的过期时间 + // @Description zh-CN 单位是秒,默认值为0,即永不过期 + CacheTTL int `required:"false" yaml:"cacheTTL" json:"cacheTTL"` + // @Title zh-CN Redis缓存Key的前缀 + // @Description zh-CN 默认值是"higress-ai-cache:" + CacheKeyPrefix string `required:"false" yaml:"cacheKeyPrefix" json:"cacheKeyPrefix"` + redisClient wrapper.RedisClient `yaml:"-" json:"-"` +} + +func parseConfig(json gjson.Result, c *PluginConfig, log wrapper.Log) error { + c.RedisInfo.ServiceName = json.Get("redis.serviceName").String() + if c.RedisInfo.ServiceName == "" { + return errors.New("redis service name must not by empty") + } + c.RedisInfo.ServicePort = int(json.Get("redis.servicePort").Int()) + if c.RedisInfo.ServicePort == 0 { + if strings.HasSuffix(c.RedisInfo.ServiceName, ".static") { + // use default logic port which is 80 for static service + c.RedisInfo.ServicePort = 80 + } else { + c.RedisInfo.ServicePort = 6379 + } + } + c.RedisInfo.Username = json.Get("redis.username").String() + c.RedisInfo.Password = json.Get("redis.password").String() + c.RedisInfo.Timeout = int(json.Get("redis.timeout").Int()) + if c.RedisInfo.Timeout == 0 { + c.RedisInfo.Timeout = 1000 + } + c.CacheKeyFrom.RequestBody = json.Get("cacheKeyFrom.requestBody").String() + if c.CacheKeyFrom.RequestBody == "" { + c.CacheKeyFrom.RequestBody = "messages.@reverse.0.content" + } + c.CacheValueFrom.ResponseBody = json.Get("cacheValueFrom.responseBody").String() + if c.CacheValueFrom.ResponseBody == "" { + c.CacheValueFrom.ResponseBody = "choices.0.message.content" + } + c.CacheStreamValueFrom.ResponseBody = json.Get("cacheStreamValueFrom.responseBody").String() + if c.CacheStreamValueFrom.ResponseBody == "" { + c.CacheStreamValueFrom.ResponseBody = "choices.0.delta.content" + } + c.ReturnResponseTemplate = json.Get("returnResponseTemplate").String() + if c.ReturnResponseTemplate == "" { + c.ReturnResponseTemplate = `{"id":"from-cache","choices":[{"index":0,"message":{"role":"assistant","content":"%s"},"finish_reason":"stop"}],"model":"gpt-4o","object":"chat.completion","usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}` + } + c.ReturnStreamResponseTemplate = json.Get("returnStreamResponseTemplate").String() + if c.ReturnStreamResponseTemplate == "" { + c.ReturnStreamResponseTemplate = `data:{"id":"from-cache","choices":[{"index":0,"delta":{"role":"assistant","content":"%s"},"finish_reason":"stop"}],"model":"gpt-4o","object":"chat.completion","usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}` + "\n\ndata:[DONE]\n\n" + } + c.CacheKeyPrefix = json.Get("cacheKeyPrefix").String() + if c.CacheKeyPrefix == "" { + c.CacheKeyPrefix = DefaultCacheKeyPrefix + } + c.redisClient = wrapper.NewRedisClusterClient(wrapper.FQDNCluster{ + FQDN: c.RedisInfo.ServiceName, + Port: int64(c.RedisInfo.ServicePort), + }) + return c.redisClient.Init(c.RedisInfo.Username, c.RedisInfo.Password, int64(c.RedisInfo.Timeout)) +} + +func onHttpRequestHeaders(ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log) types.Action { + contentType, _ := proxywasm.GetHttpRequestHeader("content-type") + // The request does not have a body. + if contentType == "" { + return types.ActionContinue + } + if !strings.Contains(contentType, "application/json") { + log.Warnf("content is not json, can't process:%s", contentType) + ctx.DontReadRequestBody() + return types.ActionContinue + } + // compatiable with qwen + x_dashscope_sse, _ := proxywasm.GetHttpRequestHeader("X-DashScope-SSE") + accept, _ := proxywasm.GetHttpRequestHeader("Accept") + if x_dashscope_sse == "enable" || strings.Contains(accept, "text/event-stream") { + ctx.SetContext(StreamContextKey, struct{}{}) + } + proxywasm.RemoveHttpRequestHeader("Accept-Encoding") + // The request has a body and requires delaying the header transmission until a cache miss occurs, + // at which point the header should be sent. + return types.HeaderStopIteration +} + +func TrimQuote(source string) string { + return strings.Trim(source, `"`) +} + +func onHttpRequestBody(ctx wrapper.HttpContext, config PluginConfig, body []byte, log wrapper.Log) types.Action { + bodyJson := gjson.ParseBytes(body) + // TODO: It may be necessary to support stream mode determination for different LLM providers. + stream := false + if bodyJson.Get("stream").Bool() { + stream = true + ctx.SetContext(StreamContextKey, struct{}{}) + } else if ctx.GetContext(StreamContextKey) != nil { + stream = true + } + key := TrimQuote(bodyJson.Get(config.CacheKeyFrom.RequestBody).Raw) + if key == "" { + log.Debug("parse key from request body failed") + return types.ActionContinue + } + ctx.SetContext(CacheKeyContextKey, key) + err := config.redisClient.Get(config.CacheKeyPrefix+key, func(response resp.Value) { + if err := response.Error(); err != nil { + log.Errorf("redis get key:%s failed, err:%v", key, err) + proxywasm.ResumeHttpRequest() + return + } + if response.IsNull() { + log.Debugf("cache miss, key:%s", key) + proxywasm.ResumeHttpRequest() + return + } + log.Debugf("cache hit, key:%s", key) + ctx.SetContext(CacheKeyContextKey, nil) + if !stream { + proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "application/json; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnResponseTemplate, response.String())), -1) + } else { + proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "text/event-stream; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnStreamResponseTemplate, response.String())), -1) + } + }) + if err != nil { + log.Error("redis access failed") + return types.ActionContinue + } + return types.ActionPause +} + +func processSSEMessage(ctx wrapper.HttpContext, config PluginConfig, sseMessage string, log wrapper.Log) string { + subMessages := strings.Split(sseMessage, "\n") + var message string + for _, msg := range subMessages { + if strings.HasPrefix(msg, "data:") { + message = msg + break + } + } + if len(message) < 6 { + log.Errorf("invalid message:%s", message) + return "" + } + // skip the prefix "data:" + bodyJson := message[5:] + if gjson.Get(bodyJson, config.CacheStreamValueFrom.ResponseBody).Exists() { + tempContentI := ctx.GetContext(CacheContentContextKey) + if tempContentI == nil { + content := TrimQuote(gjson.Get(bodyJson, config.CacheStreamValueFrom.ResponseBody).Raw) + ctx.SetContext(CacheContentContextKey, content) + return content + } + append := TrimQuote(gjson.Get(bodyJson, config.CacheStreamValueFrom.ResponseBody).Raw) + content := tempContentI.(string) + append + ctx.SetContext(CacheContentContextKey, content) + return content + } else if gjson.Get(bodyJson, "choices.0.delta.content.tool_calls").Exists() { + // TODO: compatible with other providers + ctx.SetContext(ToolCallsContextKey, struct{}{}) + return "" + } + log.Debugf("unknown message:%s", bodyJson) + return "" +} + +func onHttpResponseBody(ctx wrapper.HttpContext, config PluginConfig, chunk []byte, isLastChunk bool, log wrapper.Log) []byte { + if ctx.GetContext(ToolCallsContextKey) != nil { + // we should not cache tool call result + return chunk + } + keyI := ctx.GetContext(CacheKeyContextKey) + if keyI == nil { + return chunk + } + if !isLastChunk { + stream := ctx.GetContext(StreamContextKey) + if stream == nil { + tempContentI := ctx.GetContext(CacheContentContextKey) + if tempContentI == nil { + ctx.SetContext(CacheContentContextKey, chunk) + return chunk + } + tempContent := tempContentI.([]byte) + tempContent = append(tempContent, chunk...) + ctx.SetContext(CacheContentContextKey, tempContent) + } else { + var partialMessage []byte + partialMessageI := ctx.GetContext(PartialMessageContextKey) + if partialMessageI != nil { + partialMessage = append(partialMessageI.([]byte), chunk...) + } else { + partialMessage = chunk + } + messages := strings.Split(string(partialMessage), "\n\n") + for i, msg := range messages { + if i < len(messages)-1 { + // process complete message + processSSEMessage(ctx, config, msg, log) + } + } + if !strings.HasSuffix(string(partialMessage), "\n\n") { + ctx.SetContext(PartialMessageContextKey, []byte(messages[len(messages)-1])) + } else { + ctx.SetContext(PartialMessageContextKey, nil) + } + } + return chunk + } + // last chunk + key := keyI.(string) + stream := ctx.GetContext(StreamContextKey) + var value string + if stream == nil { + var body []byte + tempContentI := ctx.GetContext(CacheContentContextKey) + if tempContentI != nil { + body = append(tempContentI.([]byte), chunk...) + } else { + body = chunk + } + bodyJson := gjson.ParseBytes(body) + + value = TrimQuote(bodyJson.Get(config.CacheValueFrom.ResponseBody).Raw) + if value == "" { + log.Warnf("parse value from response body failded, body:%s", body) + return chunk + } + } else { + if len(chunk) > 0 { + var lastMessage []byte + partialMessageI := ctx.GetContext(PartialMessageContextKey) + if partialMessageI != nil { + lastMessage = append(partialMessageI.([]byte), chunk...) + } else { + lastMessage = chunk + } + if !strings.HasSuffix(string(lastMessage), "\n\n") { + log.Warnf("invalid lastMessage:%s", lastMessage) + return chunk + } + // remove the last \n\n + lastMessage = lastMessage[:len(lastMessage)-2] + value = processSSEMessage(ctx, config, string(lastMessage), log) + } else { + tempContentI := ctx.GetContext(CacheContentContextKey) + if tempContentI == nil { + return chunk + } + value = tempContentI.(string) + } + } + config.redisClient.Set(config.CacheKeyPrefix+key, value, nil) + if config.CacheTTL != 0 { + config.redisClient.Expire(config.CacheKeyPrefix+key, config.CacheTTL, nil) + } + return chunk +} diff --git a/plugins/wasm-go/extensions/ai-cache/option.yaml b/plugins/wasm-go/extensions/ai-cache/option.yaml new file mode 100644 index 000000000..8be6bfd1c --- /dev/null +++ b/plugins/wasm-go/extensions/ai-cache/option.yaml @@ -0,0 +1,52 @@ +# File generated by hgctl. Modify as required. + +version: 1.0.0 + +build: + # The official builder image version + builder: + go: 1.19 + tinygo: 0.28.1 + oras: 1.0.0 + # The WASM plugin project directory + input: ./ + # The output of the build products + output: + # Choose between 'files' and 'image' + type: files + # Destination address: when type=files, specify the local directory path, e.g., './out' or + # type=image, specify the remote docker repository, e.g., 'docker.io//' + dest: ./out + # The authentication configuration for pushing image to the docker repository + docker-auth: ~/.docker/config.json + # The directory for the WASM plugin configuration structure + model-dir: ./ + # The WASM plugin configuration structure name + model: PluginConfig + # Enable debug mode + debug: false + +test: + # Test environment name, that is a docker compose project name + name: wasm-test + # The output path to build products, that is the source of test configuration parameters + from-path: ./out + # The test configuration source + test-path: ./test + # Docker compose configuration, which is empty, looks for the following files from 'test-path': + # compose.yaml, compose.yml, docker-compose.yml, docker-compose.yaml + compose-file: + # Detached mode: Run containers in the background + detach: false + +install: + # The namespace of the installation + namespace: higress-system + # Use to validate WASM plugin configuration when install by yaml + spec-yaml: ./out/spec.yaml + # Installation source. Choose between 'from-yaml' and 'from-go-project' + from-yaml: ./test/plugin-conf.yaml + # If 'from-go-src' is non-empty, the output type of the build option must be 'image' + from-go-src: + # Enable debug mode + debug: false diff --git a/plugins/wasm-go/pkg/wrapper/cluster_wrapper.go b/plugins/wasm-go/pkg/wrapper/cluster_wrapper.go index 101bfbfe9..96600192b 100644 --- a/plugins/wasm-go/pkg/wrapper/cluster_wrapper.go +++ b/plugins/wasm-go/pkg/wrapper/cluster_wrapper.go @@ -151,3 +151,20 @@ func (c ConsulCluster) HostName() string { } return c.ServiceName } + +type FQDNCluster struct { + FQDN string + Host string + Port int64 +} + +func (c FQDNCluster) ClusterName() string { + return fmt.Sprintf("outbound|%d||%s", c.Port, c.FQDN) +} + +func (c FQDNCluster) HostName() string { + if c.Host != "" { + return c.Host + } + return c.FQDN +} diff --git a/plugins/wasm-go/pkg/wrapper/redis_wrapper.go b/plugins/wasm-go/pkg/wrapper/redis_wrapper.go index 77321bedf..10aa9020b 100644 --- a/plugins/wasm-go/pkg/wrapper/redis_wrapper.go +++ b/plugins/wasm-go/pkg/wrapper/redis_wrapper.go @@ -142,7 +142,9 @@ func RedisCall(cluster Cluster, respQuery []byte, callback RedisResponseCallback } } } - callback(responseValue) + if callback != nil { + callback(responseValue) + } }) if err != nil { proxywasm.LogCriticalf("redis call failed, request-id: %s, error: %v", requestID, err)