feat: Add an AI-Proxy Wasm plugin (#921)

Co-authored-by: 澄潭 <zty98751@alibaba-inc.com>
This commit is contained in:
Kent Dong
2024-05-14 17:00:12 +08:00
committed by GitHub
parent 5c7736980c
commit 333f9b48f3
21 changed files with 2124 additions and 1 deletions

View File

@@ -0,0 +1,99 @@
package provider
import (
"errors"
"fmt"
"net/url"
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/util"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
)
// azureProvider is the provider for Azure OpenAI service.
type azureProviderInitializer struct {
}
func (m *azureProviderInitializer) ValidateConfig(config ProviderConfig) error {
if config.azureServiceUrl == "" {
return errors.New("missing azureServiceUrl in provider config")
}
if _, err := url.Parse(config.azureServiceUrl); err != nil {
return fmt.Errorf("invalid azureServiceUrl: %w", err)
}
return nil
}
func (m *azureProviderInitializer) CreateProvider(config ProviderConfig) (Provider, error) {
var serviceUrl *url.URL
if u, err := url.Parse(config.azureServiceUrl); err != nil {
return nil, fmt.Errorf("invalid azureServiceUrl: %w", err)
} else {
serviceUrl = u
}
return &azureProvider{
config: config,
serviceUrl: serviceUrl,
contextCache: createContextCache(&config),
}, nil
}
type azureProvider struct {
config ProviderConfig
contextCache *contextCache
serviceUrl *url.URL
}
func (m *azureProvider) GetProviderType() string {
return providerTypeAzure
}
func (m *azureProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error) {
if apiName != ApiNameChatCompletion {
return types.ActionContinue, errUnsupportedApiName
}
_ = util.OverwriteRequestPath(m.serviceUrl.RequestURI())
_ = util.OverwriteRequestHost(m.serviceUrl.Host)
_ = proxywasm.ReplaceHttpRequestHeader("api-key", m.config.apiTokens[0])
if m.contextCache == nil {
ctx.DontReadRequestBody()
} else {
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
}
return types.ActionContinue, nil
}
func (m *azureProvider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error) {
if apiName != ApiNameChatCompletion {
return types.ActionContinue, errUnsupportedApiName
}
if m.contextCache == nil {
return types.ActionContinue, nil
}
request := &chatCompletionRequest{}
if err := decodeChatCompletionRequest(body, request); err != nil {
return types.ActionContinue, err
}
err := m.contextCache.GetContent(func(content string, err error) {
defer func() {
_ = proxywasm.ResumeHttpRequest()
}()
if err != nil {
log.Errorf("failed to load context file: %v", err)
_ = util.SendResponse(500, util.MimeTypeTextPlain, fmt.Sprintf("failed to load context file: %v", err))
}
insertContextMessage(request, content)
if err := replaceJsonRequestBody(request, log); err != nil {
_ = util.SendResponse(500, util.MimeTypeTextPlain, fmt.Sprintf("failed to replace request body: %v", err))
}
}, log)
if err == nil {
return types.ActionPause, nil
}
return types.ActionContinue, err
}

View File

@@ -0,0 +1,17 @@
package provider
import "fmt"
type plainCluster struct {
serviceName string
servicePort int64
hostName string
}
func (c plainCluster) ClusterName() string {
return fmt.Sprintf("outbound|%d||%s", c.servicePort, c.serviceName)
}
func (c plainCluster) HostName() string {
return c.hostName
}

View File

@@ -0,0 +1,100 @@
package provider
import (
"errors"
"fmt"
"net/http"
"net/url"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/tidwall/gjson"
)
type ContextConfig struct {
// @Title zh-CN 文件URL
// @Description zh-CN 用于获取对话上下文的文件的URL。目前仅支持HTTP和HTTPS协议纯文本格式文件
fileUrl string `required:"true" yaml:"url" json:"url"`
// @Title zh-CN 上游服务名称
// @Description zh-CN 文件服务所对应的网关内上游服务名称
serviceName string `required:"true" yaml:"serviceName" json:"serviceName"`
// @Title zh-CN 上游服务端口
// @Description zh-CN 文件服务所对应的网关内上游服务名称
servicePort int64 `required:"true" yaml:"serviceName" json:"serviceName"`
fileUrlObj *url.URL `yaml:"-"`
}
func (c *ContextConfig) FromJson(json gjson.Result) {
c.fileUrl = json.Get("fileUrl").String()
c.serviceName = json.Get("serviceName").String()
c.servicePort = json.Get("servicePort").Int()
}
func (c *ContextConfig) Validate() error {
if c.fileUrl == "" {
return errors.New("missing fileUrl in context config")
}
if fileUrlObj, err := url.Parse(c.fileUrl); err != nil {
return fmt.Errorf("invalid fileUrl in context config: %v", err)
} else {
c.fileUrlObj = fileUrlObj
}
if c.serviceName == "" {
return errors.New("missing serviceName in context config")
}
if c.servicePort == 0 {
return errors.New("missing servicePort in context config")
}
return nil
}
type contextCache struct {
client wrapper.HttpClient
fileUrl *url.URL
timeout uint32
loaded bool
content string
}
func (c *contextCache) GetContent(callback func(string, error), log wrapper.Log) error {
if callback == nil {
return errors.New("callback is nil")
}
if c.loaded {
log.Debugf("context file loaded from cache")
callback(c.content, nil)
return nil
}
log.Infof("loading context file from %s", c.fileUrl.String())
return c.client.Get(c.fileUrl.Path, nil, func(statusCode int, responseHeaders http.Header, responseBody []byte) {
if statusCode != http.StatusOK {
callback("", fmt.Errorf("failed to load context file, status: %d", statusCode))
return
}
c.content = string(responseBody)
c.loaded = true
log.Debugf("content: %s", c.content)
callback(c.content, nil)
}, c.timeout)
}
func createContextCache(providerConfig *ProviderConfig) *contextCache {
contextConfig := providerConfig.context
if contextConfig == nil {
return nil
}
fileUrlObj, _ := url.Parse(contextConfig.fileUrl)
cluster := plainCluster{
serviceName: contextConfig.serviceName,
servicePort: contextConfig.servicePort,
hostName: fileUrlObj.Host,
}
return &contextCache{
client: wrapper.NewClusterClient(cluster),
fileUrl: fileUrlObj,
timeout: providerConfig.timeout,
}
}

View File

@@ -0,0 +1,44 @@
package provider
type chatCompletionRequest struct {
Model string `json:"model"`
Messages []chatMessage `json:"messages"`
MaxTokens int `json:"max_tokens,omitempty"`
FrequencyPenalty float64 `json:"frequency_penalty,omitempty"`
N int `json:"n,omitempty"`
PresencePenalty float64 `json:"presence_penalty,omitempty"`
Seed int `json:"seed,omitempty"`
Stream bool `json:"stream,omitempty"`
Temperature float64 `json:"temperature,omitempty"`
TopP float64 `json:"top_p,omitempty"`
User string `json:"user,omitempty"`
}
type chatCompletionResponse struct {
Id string `json:"id,omitempty"`
Choices []chatCompletionChoice `json:"choices,omitempty"`
Created int64 `json:"created,omitempty"`
Model string `json:"model,omitempty"`
SystemFingerprint string `json:"system_fingerprint,omitempty"`
Object string `json:"object,omitempty"`
Usage chatCompletionUsage `json:"usage,omitempty"`
}
type chatCompletionChoice struct {
Index int `json:"index"`
Message *chatMessage `json:"message,omitempty"`
Delta *chatMessage `json:"delta,omitempty"`
FinishReason string `json:"finish_reason,omitempty"`
}
type chatCompletionUsage struct {
PromptTokens int `json:"prompt_tokens,omitempty"`
CompletionTokens int `json:"completion_tokens,omitempty"`
TotalTokens int `json:"total_tokens,omitempty"`
}
type chatMessage struct {
Name string `json:"name,omitempty"`
Role string `json:"role,omitempty"`
Content string `json:"content,omitempty"`
}

View File

@@ -0,0 +1,152 @@
package provider
import (
"errors"
"fmt"
"net/http"
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/util"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
"github.com/tidwall/gjson"
)
// moonshotProvider is the provider for Moonshot AI service.
const (
moonshotDomain = "api.moonshot.cn"
moonshotChatCompletionPath = "/v1/chat/completions"
)
type moonshotProviderInitializer struct {
}
func (m *moonshotProviderInitializer) ValidateConfig(config ProviderConfig) error {
if config.moonshotFileId != "" && config.context != nil {
return errors.New("moonshotFileId and context cannot be configured at the same time")
}
return nil
}
func (m *moonshotProviderInitializer) CreateProvider(config ProviderConfig) (Provider, error) {
return &moonshotProvider{
config: config,
client: wrapper.NewClusterClient(wrapper.RouteCluster{
Host: moonshotDomain,
}),
contextCache: createContextCache(&config),
}, nil
}
type moonshotProvider struct {
config ProviderConfig
client wrapper.HttpClient
fileContent string
contextCache *contextCache
}
func (m *moonshotProvider) GetProviderType() string {
return providerTypeMoonshot
}
func (m *moonshotProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error) {
if apiName != ApiNameChatCompletion {
return types.ActionContinue, errUnsupportedApiName
}
_ = util.OverwriteRequestPath(moonshotChatCompletionPath)
_ = util.OverwriteRequestHost(moonshotDomain)
_ = proxywasm.ReplaceHttpRequestHeader("Authorization", "Bearer "+m.config.GetRandomToken())
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
return types.ActionContinue, nil
}
func (m *moonshotProvider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error) {
if apiName != ApiNameChatCompletion {
return types.ActionContinue, errUnsupportedApiName
}
request := &chatCompletionRequest{}
if err := decodeChatCompletionRequest(body, request); err != nil {
return types.ActionContinue, err
}
model := request.Model
if model == "" {
return types.ActionContinue, errors.New("missing model in chat completion request")
}
mappedModel := getMappedModel(model, m.config.modelMapping, log)
if mappedModel == "" {
return types.ActionContinue, errors.New("model becomes empty after applying the configured mapping")
}
request.Model = mappedModel
if m.config.moonshotFileId == "" && m.contextCache == nil {
return types.ActionContinue, replaceJsonRequestBody(request, log)
}
err := m.getContextContent(func(content string, err error) {
defer func() {
_ = proxywasm.ResumeHttpRequest()
}()
if err != nil {
log.Errorf("failed to load context file: %v", err)
_ = util.SendResponse(500, util.MimeTypeTextPlain, fmt.Sprintf("failed to load context file: %v", err))
return
}
err = m.performChatCompletion(ctx, content, request, log)
if err != nil {
_ = util.SendResponse(500, util.MimeTypeTextPlain, fmt.Sprintf("failed to perform chat completion: %v", err))
}
}, log)
if err == nil {
return types.ActionPause, nil
}
return types.ActionContinue, err
}
func (m *moonshotProvider) performChatCompletion(ctx wrapper.HttpContext, fileContent string, request *chatCompletionRequest, log wrapper.Log) error {
insertContextMessage(request, fileContent)
return replaceJsonRequestBody(request, log)
}
func (m *moonshotProvider) getContextContent(callback func(string, error), log wrapper.Log) error {
if m.config.moonshotFileId != "" {
if m.fileContent != "" {
callback(m.fileContent, nil)
return nil
}
return m.sendRequest(http.MethodGet, "/v1/files/"+m.config.moonshotFileId+"/content", "",
func(statusCode int, responseHeaders http.Header, responseBody []byte) {
responseString := string(responseBody)
if statusCode != http.StatusOK {
log.Errorf("failed to load knowledge base file from AI service, status: %d body: %s", statusCode, responseString)
callback("", fmt.Errorf("failed to load knowledge base file from moonshot service, status: %d", statusCode))
return
}
responseJson := gjson.Parse(responseString)
m.fileContent = responseJson.Get("content").String()
callback(m.fileContent, nil)
})
}
if m.contextCache != nil {
return m.contextCache.GetContent(callback, log)
}
return errors.New("both moonshotFileId and context are not configured")
}
func (m *moonshotProvider) sendRequest(method, path string, body string, callback wrapper.ResponseCallback) error {
switch method {
case http.MethodGet:
headers := util.CreateHeaders("Authorization", "Bearer "+m.config.GetRandomToken())
return m.client.Get(path, headers, callback, m.config.timeout)
case http.MethodPost:
headers := util.CreateHeaders("Authorization", "Bearer "+m.config.GetRandomToken(), "Content-Type", "application/json")
return m.client.Post(path, headers, []byte(body), callback, m.config.timeout)
default:
return errors.New("unsupported method: " + method)
}
}

View File

@@ -0,0 +1,87 @@
package provider
import (
"fmt"
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/util"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
)
// azureProvider is the provider for Azure OpenAI service.
const (
openaiDomain = "api.openai.com"
openaiChatCompletionPath = "/v1/chat/completions"
)
type openaiProviderInitializer struct {
}
func (m *openaiProviderInitializer) ValidateConfig(config ProviderConfig) error {
return nil
}
func (m *openaiProviderInitializer) CreateProvider(config ProviderConfig) (Provider, error) {
return &openaiProvider{
config: config,
contextCache: createContextCache(&config),
}, nil
}
type openaiProvider struct {
config ProviderConfig
contextCache *contextCache
}
func (m *openaiProvider) GetProviderType() string {
return providerTypeOpenAI
}
func (m *openaiProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error) {
if apiName != ApiNameChatCompletion {
return types.ActionContinue, errUnsupportedApiName
}
_ = util.OverwriteRequestPath(openaiChatCompletionPath)
_ = util.OverwriteRequestHost(openaiDomain)
_ = proxywasm.ReplaceHttpRequestHeader("Authorization", "Bearer "+m.config.GetRandomToken())
if m.contextCache == nil {
ctx.DontReadRequestBody()
} else {
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
}
return types.ActionContinue, nil
}
func (m *openaiProvider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error) {
if apiName != ApiNameChatCompletion {
return types.ActionContinue, errUnsupportedApiName
}
if m.contextCache == nil {
return types.ActionContinue, nil
}
request := &chatCompletionRequest{}
if err := decodeChatCompletionRequest(body, request); err != nil {
return types.ActionContinue, err
}
err := m.contextCache.GetContent(func(content string, err error) {
defer func() {
_ = proxywasm.ResumeHttpRequest()
}()
if err != nil {
log.Errorf("failed to load context file: %v", err)
_ = util.SendResponse(500, util.MimeTypeTextPlain, fmt.Sprintf("failed to load context file: %v", err))
}
insertContextMessage(request, content)
if err := replaceJsonRequestBody(request, log); err != nil {
_ = util.SendResponse(500, util.MimeTypeTextPlain, fmt.Sprintf("failed to replace request body: %v", err))
}
}, log)
if err == nil {
return types.ActionPause, nil
}
return types.ActionContinue, err
}

View File

@@ -0,0 +1,199 @@
package provider
import (
"errors"
"math/rand"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
"github.com/tidwall/gjson"
)
type ApiName string
type Pointcut string
const (
ApiNameChatCompletion ApiName = "chatCompletion"
providerTypeMoonshot = "moonshot"
providerTypeAzure = "azure"
providerTypeQwen = "qwen"
providerTypeOpenAI = "openai"
protocolOpenAI = "openai"
protocolOriginal = "original"
roleSystem = "system"
ctxKeyStreamingBody = "streamingBody"
ctxKeyOriginalRequestModel = "originalRequestModel"
ctxKeyFinalRequestModel = "finalRequestModel"
objectChatCompletion = "chat.completion"
objectChatCompletionChunk = "chat.completion.chunk"
finishReasonStop = "stop"
wildcard = "*"
defaultTimeout = 2 * 60 * 1000 // ms
)
type providerInitializer interface {
ValidateConfig(ProviderConfig) error
CreateProvider(ProviderConfig) (Provider, error)
}
var (
errUnsupportedApiName = errors.New("unsupported API name")
providerInitializers = map[string]providerInitializer{
providerTypeMoonshot: &moonshotProviderInitializer{},
providerTypeAzure: &azureProviderInitializer{},
providerTypeQwen: &qwenProviderInitializer{},
providerTypeOpenAI: &openaiProviderInitializer{},
}
)
type Provider interface {
GetProviderType() string
}
type RequestHeadersHandler interface {
OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error)
}
type RequestBodyHandler interface {
OnRequestBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error)
}
type ResponseHeadersHandler interface {
OnResponseHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error)
}
type StreamingResponseBodyHandler interface {
OnStreamingResponseBody(ctx wrapper.HttpContext, name ApiName, chunk []byte, isLastChunk bool, log wrapper.Log) ([]byte, error)
}
type ResponseBodyHandler interface {
OnResponseBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error)
}
type ProviderConfig struct {
// @Title zh-CN AI服务提供商
// @Description zh-CN AI服务提供商类型目前支持的取值为"moonshot"
typ string `required:"true" yaml:"type" json:"type"`
// @Title zh-CN API Tokens
// @Description zh-CN 在请求AI服务时用于认证的API Token列表。不同的AI服务提供商可能有不同的名称。部分供应商只支持配置一个API Token如Azure OpenAI
apiTokens []string `required:"false" yaml:"apiToken" json:"apiTokens"`
// @Title zh-CN 请求超时
// @Description zh-CN 请求AI服务的超时时间单位为毫秒。默认值为120000即2分钟
timeout uint32 `required:"false" yaml:"timeout" json:"timeout"`
// @Title zh-CN Moonshot File ID
// @Description zh-CN 仅适用于Moonshot AI服务。Moonshot AI服务的文件 ID其内容用于补充 AI 请求上下文
moonshotFileId string `required:"false" yaml:"moonshotFileId" json:"moonshotFileId"`
// @Title zh-CN Azure OpenAI Service URL
// @Description zh-CN 仅适用于Azure OpenAI服务。要请求的OpenAI服务的完整URL包含api-version等参数
azureServiceUrl string `required:"false" yaml:"azureServiceUrl" json:"azureServiceUrl"`
// @Title zh-CN 模型名称映射表
// @Description zh-CN 用于将请求中的模型名称映射为目标AI服务商支持的模型名称。支持通过“*”来配置全局映射
modelMapping map[string]string `required:"false" yaml:"modelMapping" json:"modelMapping"`
// @Title zh-CN 对外接口协议
// @Description zh-CN 通过本插件对外提供的AI服务接口协议。默认值为“openai”即OpenAI的接口协议。如需保留原有接口协议可配置为“original"
protocol string `required:"false" yaml:"protocol" json:"protocol"`
// @Title zh-CN 模型对话上下文
// @Description zh-CN 配置一个外部获取对话上下文的文件来源用于在AI请求中补充对话上下文
context *ContextConfig `required:"false" yaml:"context" json:"context"`
}
func (c *ProviderConfig) FromJson(json gjson.Result) {
c.typ = json.Get("type").String()
c.apiTokens = make([]string, 0)
for _, token := range json.Get("apiTokens").Array() {
c.apiTokens = append(c.apiTokens, token.String())
}
c.timeout = uint32(json.Get("timeout").Uint())
if c.timeout == 0 {
c.timeout = defaultTimeout
}
c.moonshotFileId = json.Get("moonshotFileId").String()
c.azureServiceUrl = json.Get("azureServiceUrl").String()
c.modelMapping = make(map[string]string)
for k, v := range json.Get("modelMapping").Map() {
c.modelMapping[k] = v.String()
}
c.protocol = json.Get("protocol").String()
if c.protocol == "" {
c.protocol = protocolOpenAI
}
contextJson := json.Get("context")
if contextJson.Exists() {
c.context = &ContextConfig{}
c.context.FromJson(contextJson)
}
}
func (c *ProviderConfig) Validate() error {
if c.apiTokens == nil || len(c.apiTokens) == 0 {
return errors.New("no apiToken found in provider config")
}
if c.timeout < 0 {
return errors.New("invalid timeout in config")
}
if c.protocol != protocolOpenAI && c.protocol != protocolOriginal {
return errors.New("invalid protocol in config")
}
if c.context != nil {
if err := c.context.Validate(); err != nil {
return err
}
}
if c.typ == "" {
return errors.New("missing type in provider config")
}
initializer, has := providerInitializers[c.typ]
if !has {
return errors.New("unknown provider type: " + c.typ)
}
if err := initializer.ValidateConfig(*c); err != nil {
return err
}
return nil
}
func (c *ProviderConfig) GetRandomToken() string {
apiTokens := c.apiTokens
count := len(apiTokens)
switch count {
case 0:
return ""
case 1:
return apiTokens[0]
default:
return apiTokens[rand.Intn(count)]
}
}
func CreateProvider(pc ProviderConfig) (Provider, error) {
initializer, has := providerInitializers[pc.typ]
if !has {
return nil, errors.New("unknown provider type: " + pc.typ)
}
return initializer.CreateProvider(pc)
}
func getMappedModel(model string, modelMapping map[string]string, log wrapper.Log) string {
if modelMapping == nil || len(modelMapping) == 0 {
return model
}
if v, ok := modelMapping[model]; ok && len(v) != 0 {
log.Debugf("model %s is mapped to %s explictly", model, v)
return v
}
if v, ok := modelMapping[wildcard]; ok {
log.Debugf("model %s is mapped to %s via wildcard", model, v)
return v
}
return model
}

View File

@@ -0,0 +1,447 @@
package provider
import (
"encoding/json"
"errors"
"fmt"
"math"
"strings"
"time"
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-proxy/util"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
)
// qwenProvider is the provider for Qwen service.
const (
qwenResultFormatMessage = "message"
qwenDomain = "dashscope.aliyuncs.com"
qwenChatCompletionPath = "/api/v1/services/aigc/text-generation/generation"
qwenTopPMin = 0.000001
qwenTopPMax = 0.999999
ctxKeyPushedMessageContent = "pushedMessageContent"
streamIdItemKey = "id:"
streamDataItemKey = "data:"
streamEndDataValue = "[DONE]"
streamEventHeader = "event: result\n:HTTP_STATUS/200\n"
)
type qwenProviderInitializer struct {
}
func (m *qwenProviderInitializer) ValidateConfig(config ProviderConfig) error {
return nil
}
func (m *qwenProviderInitializer) CreateProvider(config ProviderConfig) (Provider, error) {
return &qwenProvider{
config: config,
contextCache: createContextCache(&config),
}, nil
}
type qwenProvider struct {
config ProviderConfig
contextCache *contextCache
}
func (m *qwenProvider) GetProviderType() string {
return providerTypeQwen
}
const (
forceStreaming = true
)
func (m *qwenProvider) OnRequestHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error) {
if apiName != ApiNameChatCompletion {
return types.ActionContinue, errUnsupportedApiName
}
_ = util.OverwriteRequestPath(qwenChatCompletionPath)
_ = util.OverwriteRequestHost(qwenDomain)
_ = proxywasm.ReplaceHttpRequestHeader("Authorization", "Bearer "+m.config.GetRandomToken())
if m.config.protocol == protocolOriginal && m.config.context == nil {
ctx.DontReadRequestBody()
return types.ActionContinue, nil
}
if forceStreaming {
_ = proxywasm.RemoveHttpRequestHeader("Accept-Encoding")
_ = proxywasm.RemoveHttpRequestHeader("Content-Length")
_ = proxywasm.ReplaceHttpRequestHeader("Accept", "text/event-stream")
_ = proxywasm.ReplaceHttpRequestHeader("X-DashScope-SSE", "enable")
return types.ActionContinue, nil
} else {
// Delay the header processing to allow changing streaming mode in OnRequestBody
return types.HeaderStopIteration, nil
}
}
func (m *qwenProvider) OnRequestBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error) {
if apiName != ApiNameChatCompletion {
return types.ActionContinue, errUnsupportedApiName
}
if m.config.protocol == protocolOriginal {
if m.config.context == nil {
return types.ActionContinue, nil
}
request := &qwenTextGenRequest{}
if err := json.Unmarshal(body, request); err != nil {
return types.ActionContinue, fmt.Errorf("unable to unmarshal request: %v", err)
}
err := m.contextCache.GetContent(func(content string, err error) {
defer func() {
_ = proxywasm.ResumeHttpRequest()
}()
if err != nil {
log.Errorf("failed to load context file: %v", err)
_ = util.SendResponse(500, util.MimeTypeTextPlain, fmt.Sprintf("failed to load context file: %v", err))
}
m.insertContextMessage(request, content)
if err := replaceJsonRequestBody(request, log); err != nil {
_ = util.SendResponse(500, util.MimeTypeTextPlain, fmt.Sprintf("failed to replace request body: %v", err))
}
}, log)
if err == nil {
return types.ActionPause, nil
}
return types.ActionContinue, err
}
request := &chatCompletionRequest{}
if err := decodeChatCompletionRequest(body, request); err != nil {
return types.ActionContinue, err
}
model := request.Model
if model == "" {
return types.ActionContinue, errors.New("missing model in chat completion request")
}
ctx.SetContext(ctxKeyOriginalRequestModel, model)
mappedModel := getMappedModel(model, m.config.modelMapping, log)
if mappedModel == "" {
return types.ActionContinue, errors.New("model becomes empty after applying the configured mapping")
}
request.Model = mappedModel
ctx.SetContext(ctxKeyFinalRequestModel, request.Model)
if !forceStreaming {
if request.Stream {
_ = proxywasm.ReplaceHttpRequestHeader("Accept", "text/event-stream")
_ = proxywasm.ReplaceHttpRequestHeader("X-DashScope-SSE", "enable")
} else {
_ = proxywasm.ReplaceHttpRequestHeader("Accept", "*/*")
_ = proxywasm.RemoveHttpRequestHeader("X-DashScope-SSE")
}
}
if m.config.context == nil {
qwenRequest := m.buildQwenTextGenerationRequest(request)
return types.ActionContinue, replaceJsonRequestBody(qwenRequest, log)
}
err := m.contextCache.GetContent(func(content string, err error) {
defer func() {
_ = proxywasm.ResumeHttpRequest()
}()
if err != nil {
log.Errorf("failed to load context file: %v", err)
_ = util.SendResponse(500, util.MimeTypeTextPlain, fmt.Sprintf("failed to load context file: %v", err))
}
insertContextMessage(request, content)
qwenRequest := m.buildQwenTextGenerationRequest(request)
if err := replaceJsonRequestBody(qwenRequest, log); err != nil {
_ = util.SendResponse(500, util.MimeTypeTextPlain, fmt.Sprintf("failed to replace request body: %v", err))
}
}, log)
if err == nil {
return types.ActionPause, nil
}
return types.ActionContinue, err
}
func (m *qwenProvider) OnResponseHeaders(ctx wrapper.HttpContext, apiName ApiName, log wrapper.Log) (types.Action, error) {
if m.config.protocol == protocolOriginal {
ctx.DontReadResponseBody()
return types.ActionContinue, nil
}
_ = proxywasm.RemoveHttpResponseHeader("Content-Length")
return types.ActionContinue, nil
}
func (m *qwenProvider) OnStreamingResponseBody(ctx wrapper.HttpContext, name ApiName, chunk []byte, isLastChunk bool, log wrapper.Log) ([]byte, error) {
receivedBody := chunk
if bufferedStreamingBody, has := ctx.GetContext(ctxKeyStreamingBody).([]byte); has {
receivedBody = append(bufferedStreamingBody, chunk...)
}
eventStartIndex, lineStartIndex, valueStartIndex := 0, -1, -1
defer func() {
if eventStartIndex != -1 {
// Just in case the received chunk is not a complete event.
ctx.SetContext(ctxKeyStreamingBody, receivedBody[eventStartIndex:])
} else {
ctx.SetContext(ctxKeyStreamingBody, nil)
}
}()
// Sample event response:
var responseBuilder strings.Builder
currentEventId, currentKey := "", ""
i, length := 0, len(receivedBody)
for i = 0; i < length; i++ {
ch := receivedBody[i]
if ch != '\n' {
if lineStartIndex == -1 {
lineStartIndex = i
valueStartIndex = -1
}
if valueStartIndex == -1 {
if ch == ':' {
valueStartIndex = i + 1
currentKey = string(receivedBody[lineStartIndex:valueStartIndex])
}
} else if valueStartIndex == i && ch == ' ' {
// Skip leading spaces in data.
valueStartIndex = i + 1
}
continue
}
if lineStartIndex == -1 {
// Extra new line, Should be an event separator.
eventStartIndex = i + 1
continue
}
key := currentKey
value := receivedBody[valueStartIndex:i]
// Reset message parsing state.
eventStartIndex = -1
lineStartIndex = -1
valueStartIndex = -1
currentKey = ""
switch key {
case streamIdItemKey:
currentEventId = string(value)
break
case streamDataItemKey:
if err := m.convertStreamEvent(ctx, &responseBuilder, currentEventId, value, log); err != nil {
return nil, err
}
break
default:
break
}
}
modifiedResponseChunk := responseBuilder.String()
log.Debugf("=== modified response chunk: %s", modifiedResponseChunk)
return []byte(modifiedResponseChunk), nil
}
func (m *qwenProvider) OnResponseBody(ctx wrapper.HttpContext, apiName ApiName, body []byte, log wrapper.Log) (types.Action, error) {
qwenResponse := &qwenTextGenResponse{}
if err := json.Unmarshal(body, qwenResponse); err != nil {
return types.ActionContinue, fmt.Errorf("unable to unmarshal Qwen response: %v", err)
}
response := m.buildChatCompletionResponse(ctx, qwenResponse)
return types.ActionContinue, replaceJsonResponseBody(response, log)
}
func (m *qwenProvider) buildQwenTextGenerationRequest(origRequest *chatCompletionRequest) *qwenTextGenRequest {
return &qwenTextGenRequest{
Model: origRequest.Model,
Input: qwenTextGenInput{
Messages: origRequest.Messages,
},
Parameters: qwenTextGenParameters{
ResultFormat: qwenResultFormatMessage,
MaxTokens: origRequest.MaxTokens,
N: origRequest.N,
Seed: origRequest.Seed,
Temperature: origRequest.Temperature,
TopP: math.Max(qwenTopPMin, math.Min(origRequest.TopP, qwenTopPMax)),
},
}
}
func (m *qwenProvider) buildChatCompletionResponse(ctx wrapper.HttpContext, qwenResponse *qwenTextGenResponse) *chatCompletionResponse {
choices := make([]chatCompletionChoice, 0, len(qwenResponse.Output.Choices))
for _, qwenChoice := range qwenResponse.Output.Choices {
choices = append(choices, chatCompletionChoice{
Message: &qwenChoice.Message,
FinishReason: qwenChoice.FinishReason,
})
}
return &chatCompletionResponse{
Id: qwenResponse.RequestId,
Created: time.Now().UnixMilli() / 1000,
Model: ctx.GetContext(ctxKeyFinalRequestModel).(string),
SystemFingerprint: "",
Object: objectChatCompletion,
Choices: choices,
Usage: chatCompletionUsage{
PromptTokens: qwenResponse.Usage.InputTokens,
CompletionTokens: qwenResponse.Usage.OutputTokens,
TotalTokens: qwenResponse.Usage.TotalTokens,
},
}
}
func (m *qwenProvider) buildChatCompletionStreamingResponse(ctx wrapper.HttpContext, qwenResponse *qwenTextGenResponse) []*chatCompletionResponse {
baseMessage := chatCompletionResponse{
Id: qwenResponse.RequestId,
Created: time.Now().UnixMilli() / 1000,
Model: ctx.GetContext(ctxKeyFinalRequestModel).(string),
SystemFingerprint: "",
Object: objectChatCompletionChunk,
}
responses := make([]*chatCompletionResponse, 0)
qwenChoice := qwenResponse.Output.Choices[0]
message := qwenChoice.Message
content := message.Content
if rawPushedContent := ctx.GetContext(ctxKeyPushedMessageContent); rawPushedContent != nil {
if pushedContent := rawPushedContent.(string); pushedContent != "" && strings.HasPrefix(content, pushedContent) {
content = content[len(pushedContent):]
}
}
if content != "" {
deltaResponse := *&baseMessage
deltaResponse.Choices = append(deltaResponse.Choices, chatCompletionChoice{Delta: &chatMessage{Role: message.Role, Content: content}})
responses = append(responses, &deltaResponse)
ctx.SetContext(ctxKeyPushedMessageContent, message.Content)
}
// Yes, Qwen uses a string "null" as null.
if qwenChoice.FinishReason != "" && qwenChoice.FinishReason != "null" {
finishResponse := *&baseMessage
finishResponse.Choices = append(finishResponse.Choices, chatCompletionChoice{FinishReason: qwenChoice.FinishReason})
responses = append(responses, &finishResponse)
}
return responses
}
func (m *qwenProvider) convertStreamEvent(ctx wrapper.HttpContext, responseBuilder *strings.Builder, eventId string, eventData []byte, log wrapper.Log) error {
if string(eventData) == streamEndDataValue {
responseBuilder.WriteString(streamIdItemKey)
responseBuilder.WriteString(eventId)
responseBuilder.WriteString("\n")
responseBuilder.WriteString(streamEventHeader)
responseBuilder.WriteString(streamDataItemKey)
responseBuilder.WriteString(streamEndDataValue)
responseBuilder.WriteString("\n\n")
return nil
}
qwenResponse := &qwenTextGenResponse{}
if err := json.Unmarshal(eventData, qwenResponse); err != nil {
log.Errorf("unable to unmarshal Qwen response: %v", err)
return fmt.Errorf("unable to unmarshal Qwen response: %v", err)
}
responses := m.buildChatCompletionStreamingResponse(ctx, qwenResponse)
for _, response := range responses {
responseBody, err := json.Marshal(response)
if err != nil {
log.Errorf("unable to marshal response: %v", err)
return fmt.Errorf("unable to marshal response: %v", err)
}
responseBuilder.WriteString(streamIdItemKey)
responseBuilder.WriteString(eventId)
responseBuilder.WriteString("\n")
responseBuilder.WriteString(streamEventHeader)
responseBuilder.WriteString(streamDataItemKey)
responseBuilder.Write(responseBody)
responseBuilder.WriteString("\n\n")
}
return nil
}
func (m *qwenProvider) insertContextMessage(request *qwenTextGenRequest, content string) {
fileMessage := chatMessage{
Role: roleSystem,
Content: content,
}
firstNonSystemMessageIndex := -1
messages := request.Input.Messages
if messages != nil {
for i, message := range request.Input.Messages {
if message.Role != roleSystem {
firstNonSystemMessageIndex = i
break
}
}
}
if firstNonSystemMessageIndex == -1 {
request.Input.Messages = append(request.Input.Messages, fileMessage)
} else {
request.Input.Messages = append(request.Input.Messages[:firstNonSystemMessageIndex], append([]chatMessage{fileMessage}, request.Input.Messages[firstNonSystemMessageIndex:]...)...)
}
}
type qwenTextGenRequest struct {
Model string `json:"model"`
Input qwenTextGenInput `json:"input"`
Parameters qwenTextGenParameters `json:"parameters,omitempty"`
}
type qwenTextGenInput struct {
Messages []chatMessage `json:"messages"`
}
type qwenTextGenParameters struct {
ResultFormat string `json:"result_format,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"`
RepetitionPenalty float64 `json:"repetition_penalty,omitempty"`
N int `json:"n,omitempty"`
Seed int `json:"seed,omitempty"`
Temperature float64 `json:"temperature,omitempty"`
TopP float64 `json:"top_p,omitempty"`
}
type qwenTextGenResponse struct {
RequestId string `json:"request_id"`
Output qwenTextGenOutput `json:"output"`
Usage qwenTextGenUsage `json:"usage"`
}
type qwenTextGenOutput struct {
FinishReason string `json:"finish_reason"`
Choices []qwenTextGenChoice `json:"choices"`
}
type qwenTextGenChoice struct {
FinishReason string `json:"finish_reason"`
Message chatMessage `json:"message"`
}
type qwenTextGenUsage struct {
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
TotalTokens int `json:"total_tokens"`
}

View File

@@ -0,0 +1,65 @@
package provider
import (
"encoding/json"
"errors"
"fmt"
"github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
)
func decodeChatCompletionRequest(body []byte, request *chatCompletionRequest) error {
if err := json.Unmarshal(body, request); err != nil {
return fmt.Errorf("unable to unmarshal request: %v", err)
}
if request.Messages == nil || len(request.Messages) == 0 {
return errors.New("no message found in the request body")
}
return nil
}
func replaceJsonRequestBody(request interface{}, log wrapper.Log) error {
body, err := json.Marshal(request)
if err != nil {
return fmt.Errorf("unable to marshal request: %v", err)
}
log.Debugf("request body: %s", string(body))
err = proxywasm.ReplaceHttpRequestBody(body)
if err != nil {
return fmt.Errorf("unable to replace the original request body: %v", err)
}
return err
}
func insertContextMessage(request *chatCompletionRequest, content string) {
fileMessage := chatMessage{
Role: roleSystem,
Content: content,
}
firstNonSystemMessageIndex := -1
for i, message := range request.Messages {
if message.Role != roleSystem {
firstNonSystemMessageIndex = i
break
}
}
if firstNonSystemMessageIndex == -1 {
request.Messages = append(request.Messages, fileMessage)
} else {
request.Messages = append(request.Messages[:firstNonSystemMessageIndex], append([]chatMessage{fileMessage}, request.Messages[firstNonSystemMessageIndex:]...)...)
}
}
func replaceJsonResponseBody(response interface{}, log wrapper.Log) error {
body, err := json.Marshal(response)
if err != nil {
return fmt.Errorf("unable to marshal response: %v", err)
}
log.Debugf("response body: %s", string(body))
err = proxywasm.ReplaceHttpResponseBody(body)
if err != nil {
return fmt.Errorf("unable to replace the original response body: %v", err)
}
return err
}