add ai cache plugin (#1010)

This commit is contained in:
澄潭
2024-05-29 21:10:22 +08:00
committed by GitHub
parent b00f79f3af
commit 812edf1490
10 changed files with 543 additions and 5 deletions

View File

@@ -0,0 +1,19 @@
# File generated by hgctl. Modify as required.
*
!/.gitignore
!*.go
!go.sum
!go.mod
!LICENSE
!*.md
!*.yaml
!*.yml
!*/
/out
/test

View File

@@ -0,0 +1,34 @@
## 简介
**Note**
> 需要数据面的proxy wasm版本大于等于0.2.100
> 编译时需要带上版本的tag例如`tinygo build -o main.wasm -scheduler=none -target=wasi -gc=custom -tags="custommalloc nottinygc_finalizer proxy_wasm_version_0_2_100" ./`
LLM 结果缓存插件,默认配置方式可以直接用于 openai 协议的结果缓存,同时支持流式和非流式响应的缓存。
## 配置说明
| Name | Type | Requirement | Default | Description |
| -------- | -------- | -------- | -------- | -------- |
| cacheKeyFrom.requestBody | string | optional | "messages.@reverse.0.content" | 从请求 Body 中基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 |
| cacheValueFrom.responseBody | string | optional | "choices.0.message.content" | 从响应 Body 中基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 |
| cacheStreamValueFrom.responseBody | string | optional | "choices.0.delta.content" | 从流式响应 Body 中基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串 |
| cacheKeyPrefix | string | optional | "higress-ai-cache:" | Redis缓存Key的前缀 |
| cacheTTL | integer | optional | 0 | 缓存的过期时间单位是秒默认值为0即永不过期 |
| redis.serviceName | string | requried | - | redis 服务名称,带服务类型的完整 FQDN 名称,例如 my-redis.dns、redis.my-ns.svc.cluster.local |
| redis.servicePort | integer | optional | 6379 | redis 服务端口 |
| redis.timeout | integer | optional | 1000 | 请求 redis 的超时时间,单位为毫秒 |
| redis.username | string | optional | - | 登陆 redis 的用户名 |
| redis.password | string | optional | - | 登陆 redis 的密码 |
| returnResponseTemplate | string | optional | `{"id":"from-cache","choices":[%s],"model":"gpt-4o","object":"chat.completion","usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}` | 返回 HTTP 响应的模版,用 %s 标记需要被 cache value 替换的部分 |
| returnStreamResponseTemplate | string | optional | `data:{"id":"from-cache","choices":[{"index":0,"delta":{"role":"assistant","content":"%s"},"finish_reason":"stop"}],"model":"gpt-4o","object":"chat.completion","usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}\n\ndata:[DONE]\n\n` | 返回流式 HTTP 响应的模版,用 %s 标记需要被 cache value 替换的部分 |
## 配置示例
```yaml
redis:
serviceName: my-redis.dns
timeout: 2000
```

View File

@@ -0,0 +1,23 @@
// File generated by hgctl. Modify as required.
module github.com/alibaba/higress/plugins/wasm-go/extensions/ai-cache
go 1.19
replace github.com/alibaba/higress/plugins/wasm-go => ../..
require (
github.com/alibaba/higress/plugins/wasm-go v1.3.6-0.20240528060522-53bccf89f441
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc
github.com/tidwall/gjson v1.14.3
github.com/tidwall/resp v0.1.1
github.com/tidwall/sjson v1.2.5
)
require (
github.com/google/uuid v1.3.0 // indirect
github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 // indirect
github.com/magefile/mage v1.14.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
)

View File

@@ -0,0 +1,23 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 h1:IHDghbGQ2DTIXHBHxWfqCYQW1fKjyJ/I7W1pMyUDeEA=
github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520/go.mod h1:Nz8ORLaFiLWotg6GeKlJMhv8cci8mM43uEnLA5t8iew=
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc h1:t2AT8zb6N/59Y78lyRWedVoVWHNRSCBh0oWCC+bluTQ=
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo=
github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo=
github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw=
github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE=
github.com/tidwall/resp v0.1.1/go.mod h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

View File

@@ -0,0 +1,361 @@
// File generated by hgctl. Modify as required.
// See: https://higress.io/zh-cn/docs/user/wasm-go#2-%E7%BC%96%E5%86%99-maingo-%E6%96%87%E4%BB%B6
package main
import (
"errors"
"fmt"
"strings"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
"github.com/tidwall/gjson"
"github.com/tidwall/resp"
)
const (
CacheKeyContextKey = "cacheKey"
CacheContentContextKey = "cacheContent"
PartialMessageContextKey = "partialMessage"
ToolCallsContextKey = "toolCalls"
StreamContextKey = "stream"
DefaultCacheKeyPrefix = "higress-ai-cache:"
)
func main() {
wrapper.SetCtx(
"ai-cache",
wrapper.ParseConfigBy(parseConfig),
wrapper.ProcessRequestHeadersBy(onHttpRequestHeaders),
wrapper.ProcessRequestBodyBy(onHttpRequestBody),
wrapper.ProcessStreamingResponseBodyBy(onHttpResponseBody),
)
}
// @Name ai-cache
// @Category protocol
// @Phase AUTHN
// @Priority 10
// @Title zh-CN AI Cache
// @Description zh-CN 大模型结果缓存
// @IconUrl
// @Version 0.1.0
//
// @Contact.name johnlanni
// @Contact.url
// @Contact.email
//
// @Example
// redis:
// serviceName: my-redis.dns
// timeout: 2000
// cacheKeyFrom:
// requestBody: "messages.@reverse.0.content"
// cacheValueFrom:
// responseBody: "choices.0"
// returnResponseTemplate: |
// {"id":"from-cache","choices":[%s],"model":"gpt-4o","object":"chat.completion","usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}
// @End
type RedisInfo struct {
// @Title zh-CN redis 服务名称
// @Description zh-CN 带服务类型的完整 FQDN 名称,例如 my-redis.dns、redis.my-ns.svc.cluster.local
ServiceName string `required:"true" yaml:"serviceName" json:"serviceName"`
// @Title zh-CN redis 服务端口
// @Description zh-CN 默认值为6379
ServicePort int `required:"false" yaml:"servicePort" json:"servicePort"`
// @Title zh-CN 用户名
// @Description zh-CN 登陆 redis 的用户名,非必填
Username string `required:"false" yaml:"username" json:"username"`
// @Title zh-CN 密码
// @Description zh-CN 登陆 redis 的密码,非必填,可以只填密码
Password string `required:"false" yaml:"password" json:"password"`
// @Title zh-CN 请求超时
// @Description zh-CN 请求 redis 的超时时间单位为毫秒。默认值是1000即1秒
Timeout int `required:"false" yaml:"timeout" json:"timeout"`
}
type KVExtractor struct {
// @Title zh-CN 从请求 Body 中基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串
RequestBody string `required:"false" yaml:"requestBody" json:"requestBody"`
// @Title zh-CN 从响应 Body 中基于 [GJSON PATH](https://github.com/tidwall/gjson/blob/master/SYNTAX.md) 语法提取字符串
ResponseBody string `required:"false" yaml:"responseBody" json:"responseBody"`
}
type PluginConfig struct {
// @Title zh-CN Redis 地址信息
// @Description zh-CN 用于存储缓存结果的 Redis 地址
RedisInfo RedisInfo `required:"true" yaml:"redis" json:"redis"`
// @Title zh-CN 缓存 key 的来源
// @Description zh-CN 往 redis 里存时,使用的 key 的提取方式
CacheKeyFrom KVExtractor `required:"true" yaml:"cacheKeyFrom" json:"cacheKeyFrom"`
// @Title zh-CN 缓存 value 的来源
// @Description zh-CN 往 redis 里存时,使用的 value 的提取方式
CacheValueFrom KVExtractor `required:"true" yaml:"cacheValueFrom" json:"cacheValueFrom"`
// @Title zh-CN 流式响应下,缓存 value 的来源
// @Description zh-CN 往 redis 里存时,使用的 value 的提取方式
CacheStreamValueFrom KVExtractor `required:"true" yaml:"cacheStreamValueFrom" json:"cacheStreamValueFrom"`
// @Title zh-CN 返回 HTTP 响应的模版
// @Description zh-CN 用 %s 标记需要被 cache value 替换的部分
ReturnResponseTemplate string `required:"true" yaml:"returnResponseTemplate" json:"returnResponseTemplate"`
// @Title zh-CN 返回流式 HTTP 响应的模版
// @Description zh-CN 用 %s 标记需要被 cache value 替换的部分
ReturnStreamResponseTemplate string `required:"true" yaml:"returnStreamResponseTemplate" json:"returnStreamResponseTemplate"`
// @Title zh-CN 缓存的过期时间
// @Description zh-CN 单位是秒默认值为0即永不过期
CacheTTL int `required:"false" yaml:"cacheTTL" json:"cacheTTL"`
// @Title zh-CN Redis缓存Key的前缀
// @Description zh-CN 默认值是"higress-ai-cache:"
CacheKeyPrefix string `required:"false" yaml:"cacheKeyPrefix" json:"cacheKeyPrefix"`
redisClient wrapper.RedisClient `yaml:"-" json:"-"`
}
func parseConfig(json gjson.Result, c *PluginConfig, log wrapper.Log) error {
c.RedisInfo.ServiceName = json.Get("redis.serviceName").String()
if c.RedisInfo.ServiceName == "" {
return errors.New("redis service name must not by empty")
}
c.RedisInfo.ServicePort = int(json.Get("redis.servicePort").Int())
if c.RedisInfo.ServicePort == 0 {
if strings.HasSuffix(c.RedisInfo.ServiceName, ".static") {
// use default logic port which is 80 for static service
c.RedisInfo.ServicePort = 80
} else {
c.RedisInfo.ServicePort = 6379
}
}
c.RedisInfo.Username = json.Get("redis.username").String()
c.RedisInfo.Password = json.Get("redis.password").String()
c.RedisInfo.Timeout = int(json.Get("redis.timeout").Int())
if c.RedisInfo.Timeout == 0 {
c.RedisInfo.Timeout = 1000
}
c.CacheKeyFrom.RequestBody = json.Get("cacheKeyFrom.requestBody").String()
if c.CacheKeyFrom.RequestBody == "" {
c.CacheKeyFrom.RequestBody = "messages.@reverse.0.content"
}
c.CacheValueFrom.ResponseBody = json.Get("cacheValueFrom.responseBody").String()
if c.CacheValueFrom.ResponseBody == "" {
c.CacheValueFrom.ResponseBody = "choices.0.message.content"
}
c.CacheStreamValueFrom.ResponseBody = json.Get("cacheStreamValueFrom.responseBody").String()
if c.CacheStreamValueFrom.ResponseBody == "" {
c.CacheStreamValueFrom.ResponseBody = "choices.0.delta.content"
}
c.ReturnResponseTemplate = json.Get("returnResponseTemplate").String()
if c.ReturnResponseTemplate == "" {
c.ReturnResponseTemplate = `{"id":"from-cache","choices":[{"index":0,"message":{"role":"assistant","content":"%s"},"finish_reason":"stop"}],"model":"gpt-4o","object":"chat.completion","usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}`
}
c.ReturnStreamResponseTemplate = json.Get("returnStreamResponseTemplate").String()
if c.ReturnStreamResponseTemplate == "" {
c.ReturnStreamResponseTemplate = `data:{"id":"from-cache","choices":[{"index":0,"delta":{"role":"assistant","content":"%s"},"finish_reason":"stop"}],"model":"gpt-4o","object":"chat.completion","usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}` + "\n\ndata:[DONE]\n\n"
}
c.CacheKeyPrefix = json.Get("cacheKeyPrefix").String()
if c.CacheKeyPrefix == "" {
c.CacheKeyPrefix = DefaultCacheKeyPrefix
}
c.redisClient = wrapper.NewRedisClusterClient(wrapper.FQDNCluster{
FQDN: c.RedisInfo.ServiceName,
Port: int64(c.RedisInfo.ServicePort),
})
return c.redisClient.Init(c.RedisInfo.Username, c.RedisInfo.Password, int64(c.RedisInfo.Timeout))
}
func onHttpRequestHeaders(ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log) types.Action {
contentType, _ := proxywasm.GetHttpRequestHeader("content-type")
// The request does not have a body.
if contentType == "" {
return types.ActionContinue
}
if !strings.Contains(contentType, "application/json") {
log.Warnf("content is not json, can't process:%s", contentType)
ctx.DontReadRequestBody()
return types.ActionContinue
}
// compatiable with qwen
x_dashscope_sse, _ := proxywasm.GetHttpRequestHeader("X-DashScope-SSE")
accept, _ := proxywasm.GetHttpRequestHeader("Accept")
if x_dashscope_sse == "enable" || strings.Contains(accept, "text/event-stream") {
ctx.SetContext(StreamContextKey, struct{}{})
}
proxywasm.RemoveHttpRequestHeader("Accept-Encoding")
// The request has a body and requires delaying the header transmission until a cache miss occurs,
// at which point the header should be sent.
return types.HeaderStopIteration
}
func TrimQuote(source string) string {
return strings.Trim(source, `"`)
}
func onHttpRequestBody(ctx wrapper.HttpContext, config PluginConfig, body []byte, log wrapper.Log) types.Action {
bodyJson := gjson.ParseBytes(body)
// TODO: It may be necessary to support stream mode determination for different LLM providers.
stream := false
if bodyJson.Get("stream").Bool() {
stream = true
ctx.SetContext(StreamContextKey, struct{}{})
} else if ctx.GetContext(StreamContextKey) != nil {
stream = true
}
key := TrimQuote(bodyJson.Get(config.CacheKeyFrom.RequestBody).Raw)
if key == "" {
log.Debug("parse key from request body failed")
return types.ActionContinue
}
ctx.SetContext(CacheKeyContextKey, key)
err := config.redisClient.Get(config.CacheKeyPrefix+key, func(response resp.Value) {
if err := response.Error(); err != nil {
log.Errorf("redis get key:%s failed, err:%v", key, err)
proxywasm.ResumeHttpRequest()
return
}
if response.IsNull() {
log.Debugf("cache miss, key:%s", key)
proxywasm.ResumeHttpRequest()
return
}
log.Debugf("cache hit, key:%s", key)
ctx.SetContext(CacheKeyContextKey, nil)
if !stream {
proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "application/json; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnResponseTemplate, response.String())), -1)
} else {
proxywasm.SendHttpResponse(200, [][2]string{{"content-type", "text/event-stream; charset=utf-8"}}, []byte(fmt.Sprintf(config.ReturnStreamResponseTemplate, response.String())), -1)
}
})
if err != nil {
log.Error("redis access failed")
return types.ActionContinue
}
return types.ActionPause
}
func processSSEMessage(ctx wrapper.HttpContext, config PluginConfig, sseMessage string, log wrapper.Log) string {
subMessages := strings.Split(sseMessage, "\n")
var message string
for _, msg := range subMessages {
if strings.HasPrefix(msg, "data:") {
message = msg
break
}
}
if len(message) < 6 {
log.Errorf("invalid message:%s", message)
return ""
}
// skip the prefix "data:"
bodyJson := message[5:]
if gjson.Get(bodyJson, config.CacheStreamValueFrom.ResponseBody).Exists() {
tempContentI := ctx.GetContext(CacheContentContextKey)
if tempContentI == nil {
content := TrimQuote(gjson.Get(bodyJson, config.CacheStreamValueFrom.ResponseBody).Raw)
ctx.SetContext(CacheContentContextKey, content)
return content
}
append := TrimQuote(gjson.Get(bodyJson, config.CacheStreamValueFrom.ResponseBody).Raw)
content := tempContentI.(string) + append
ctx.SetContext(CacheContentContextKey, content)
return content
} else if gjson.Get(bodyJson, "choices.0.delta.content.tool_calls").Exists() {
// TODO: compatible with other providers
ctx.SetContext(ToolCallsContextKey, struct{}{})
return ""
}
log.Debugf("unknown message:%s", bodyJson)
return ""
}
func onHttpResponseBody(ctx wrapper.HttpContext, config PluginConfig, chunk []byte, isLastChunk bool, log wrapper.Log) []byte {
if ctx.GetContext(ToolCallsContextKey) != nil {
// we should not cache tool call result
return chunk
}
keyI := ctx.GetContext(CacheKeyContextKey)
if keyI == nil {
return chunk
}
if !isLastChunk {
stream := ctx.GetContext(StreamContextKey)
if stream == nil {
tempContentI := ctx.GetContext(CacheContentContextKey)
if tempContentI == nil {
ctx.SetContext(CacheContentContextKey, chunk)
return chunk
}
tempContent := tempContentI.([]byte)
tempContent = append(tempContent, chunk...)
ctx.SetContext(CacheContentContextKey, tempContent)
} else {
var partialMessage []byte
partialMessageI := ctx.GetContext(PartialMessageContextKey)
if partialMessageI != nil {
partialMessage = append(partialMessageI.([]byte), chunk...)
} else {
partialMessage = chunk
}
messages := strings.Split(string(partialMessage), "\n\n")
for i, msg := range messages {
if i < len(messages)-1 {
// process complete message
processSSEMessage(ctx, config, msg, log)
}
}
if !strings.HasSuffix(string(partialMessage), "\n\n") {
ctx.SetContext(PartialMessageContextKey, []byte(messages[len(messages)-1]))
} else {
ctx.SetContext(PartialMessageContextKey, nil)
}
}
return chunk
}
// last chunk
key := keyI.(string)
stream := ctx.GetContext(StreamContextKey)
var value string
if stream == nil {
var body []byte
tempContentI := ctx.GetContext(CacheContentContextKey)
if tempContentI != nil {
body = append(tempContentI.([]byte), chunk...)
} else {
body = chunk
}
bodyJson := gjson.ParseBytes(body)
value = TrimQuote(bodyJson.Get(config.CacheValueFrom.ResponseBody).Raw)
if value == "" {
log.Warnf("parse value from response body failded, body:%s", body)
return chunk
}
} else {
if len(chunk) > 0 {
var lastMessage []byte
partialMessageI := ctx.GetContext(PartialMessageContextKey)
if partialMessageI != nil {
lastMessage = append(partialMessageI.([]byte), chunk...)
} else {
lastMessage = chunk
}
if !strings.HasSuffix(string(lastMessage), "\n\n") {
log.Warnf("invalid lastMessage:%s", lastMessage)
return chunk
}
// remove the last \n\n
lastMessage = lastMessage[:len(lastMessage)-2]
value = processSSEMessage(ctx, config, string(lastMessage), log)
} else {
tempContentI := ctx.GetContext(CacheContentContextKey)
if tempContentI == nil {
return chunk
}
value = tempContentI.(string)
}
}
config.redisClient.Set(config.CacheKeyPrefix+key, value, nil)
if config.CacheTTL != 0 {
config.redisClient.Expire(config.CacheKeyPrefix+key, config.CacheTTL, nil)
}
return chunk
}

View File

@@ -0,0 +1,52 @@
# File generated by hgctl. Modify as required.
version: 1.0.0
build:
# The official builder image version
builder:
go: 1.19
tinygo: 0.28.1
oras: 1.0.0
# The WASM plugin project directory
input: ./
# The output of the build products
output:
# Choose between 'files' and 'image'
type: files
# Destination address: when type=files, specify the local directory path, e.g., './out' or
# type=image, specify the remote docker repository, e.g., 'docker.io/<your_username>/<your_image>'
dest: ./out
# The authentication configuration for pushing image to the docker repository
docker-auth: ~/.docker/config.json
# The directory for the WASM plugin configuration structure
model-dir: ./
# The WASM plugin configuration structure name
model: PluginConfig
# Enable debug mode
debug: false
test:
# Test environment name, that is a docker compose project name
name: wasm-test
# The output path to build products, that is the source of test configuration parameters
from-path: ./out
# The test configuration source
test-path: ./test
# Docker compose configuration, which is empty, looks for the following files from 'test-path':
# compose.yaml, compose.yml, docker-compose.yml, docker-compose.yaml
compose-file:
# Detached mode: Run containers in the background
detach: false
install:
# The namespace of the installation
namespace: higress-system
# Use to validate WASM plugin configuration when install by yaml
spec-yaml: ./out/spec.yaml
# Installation source. Choose between 'from-yaml' and 'from-go-project'
from-yaml: ./test/plugin-conf.yaml
# If 'from-go-src' is non-empty, the output type of the build option must be 'image'
from-go-src:
# Enable debug mode
debug: false

View File

@@ -151,3 +151,20 @@ func (c ConsulCluster) HostName() string {
}
return c.ServiceName
}
type FQDNCluster struct {
FQDN string
Host string
Port int64
}
func (c FQDNCluster) ClusterName() string {
return fmt.Sprintf("outbound|%d||%s", c.Port, c.FQDN)
}
func (c FQDNCluster) HostName() string {
if c.Host != "" {
return c.Host
}
return c.FQDN
}

View File

@@ -142,7 +142,9 @@ func RedisCall(cluster Cluster, respQuery []byte, callback RedisResponseCallback
}
}
}
callback(responseValue)
if callback != nil {
callback(responseValue)
}
})
if err != nil {
proxywasm.LogCriticalf("redis call failed, request-id: %s, error: %v", requestID, err)