diff --git a/plugins/wasm-go/extensions/ai-load-balancer/README.md b/plugins/wasm-go/extensions/ai-load-balancer/README.md index 0ec11ebf8..c1c52788e 100644 --- a/plugins/wasm-go/extensions/ai-load-balancer/README.md +++ b/plugins/wasm-go/extensions/ai-load-balancer/README.md @@ -27,7 +27,7 @@ description: 针对LLM服务的负载均衡策略 `lb_type` 为 `cluster` 时支持的负载均衡策略包括: - `cluster_metrics`: 基于网关统计的不同service的指标进行服务之间的负载均衡 - +- `cluster_hash`: 读取指定请求头做 FNV-1a 一致性 hash,将相同 key 的请求始终路由到同一 cluster,支持按权重分配流量 # 全局最小请求数 ## 功能说明 @@ -217,7 +217,7 @@ lb_config: | `mode` | string | 必填 | | 如何使用服务级指标做负载均衡,当前支持`[LeastBusy, LeastTotalLatency, LeastFirstTokenLatency ]` | | `service_list` | []string | 必填 | | 路由后端服务列表 | | `rate_limit` | string | 选填 | 1 | 单个服务处理请求比例上限,取值范围0~1 | -| `cluster_header` | string | 选填 | `x-envoy-target-cluster` | 通过取该header的值得知需要路由到哪个后端服务 | +| `cluster_header` | string | 选填 | `x-higress-target-cluster` | 通过取该header的值得知需要路由到哪个后端服务 | | `queue_size` | int | 选填 | 100 | 根据最近的多少个请求进行观测指标的计算 | `mode` 各取值含义如下: @@ -237,4 +237,46 @@ lb_config: service_list: - outbound|80||test-1.dns - outbound|80||test-2.static -``` \ No newline at end of file +``` + +# Cluster Hash(一致性 Hash 路由) + +## 功能说明 + +读取指定请求头的值,使用 FNV-1a 一致性 hash 算法将请求路由到固定的上游集群,确保相同 hash key 的请求始终落到同一个 cluster,同时支持按百分比权重控制各 cluster 的流量分配。 + +需要配合 EnvoyFilter 的 `cluster_header` 机制一起使用。 + +## 配置说明 + +| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 | +|------|----------|----------|--------|------| +| `clusters` | []ClusterEntry | 必填 | - | cluster 列表,所有 `weight` 之和必须为 100 | +| `hash_header` | string | 选填 | `x-mse-consumer` | 读取 hash key 的请求头名称 | +| `cluster_header` | string | 选填 | `x-higress-target-cluster` | 写入目标 cluster 的请求头名称 | + +### ClusterEntry 字段 + +| 名称 | 类型 | 必填 | 说明 | +|------|------|------|------| +| `cluster` | string | 是 | 上游集群名称,如 `outbound|443||llm-xxx.internal.static` | +| `weight` | int | 是 | 百分比权重,所有 cluster 的 weight 之和必须为 100 | + +## 配置示例 + +```yaml +lb_type: cluster +lb_policy: cluster_hash +lb_config: + clusters: + - cluster: "outbound|80||llm-test1.internal.static" + weight: 69 + - cluster: "outbound|443||llm-test2.internal.dns" + weight: 30 + - cluster: "outbound|443||llm-test3.internal.dns" + weight: 1 + hash_header: x-mse-consumer + cluster_header: x-higress-target-cluster +``` + +若请求缺少 hash header,插件直接返回 **403**。 \ No newline at end of file diff --git a/plugins/wasm-go/extensions/ai-load-balancer/README_EN.md b/plugins/wasm-go/extensions/ai-load-balancer/README_EN.md index de6f9110e..ac2bfe93b 100644 --- a/plugins/wasm-go/extensions/ai-load-balancer/README_EN.md +++ b/plugins/wasm-go/extensions/ai-load-balancer/README_EN.md @@ -27,6 +27,7 @@ When `lb_type = endpoint`, current supported load balance policies are: When `lb_type = cluster`, current supported load balance policies are: - `cluster_metrics`: Load balancing based on metrics of clusters +- `cluster_hash`: Consistent hash routing based on a request header value, always routing the same hash key to the same cluster, with weighted traffic distribution # Global Least Request @@ -218,7 +219,7 @@ lb_config: | `mode` | string | required | | how to use cluster metrics, value of `[LeastBusy, LeastTotalLatency, LeastFirstTokenLatency ]` | | `service_list` | []string | required | | service list of current route | | `rate_limit` | string | optional | 1 | The maximum percentage of requests a single node can receive, value of 0~1 | -| `cluster_header` | string | optional | `x-envoy-target-cluster` | By retrieving the value of this header, we can determine which backend service to route to | +| `cluster_header` | string | optional | `x-higress-target-cluster` | By retrieving the value of this header, we can determine which backend service to route to | | `queue_size` | int | optional | 100 | The metrics is calculated based on the number of most recent requests. | The meanings of the values ​​for `mode` are as follows: @@ -238,4 +239,46 @@ lb_config: service_list: - outbound|80||test-1.dns - outbound|80||test-2.static -``` \ No newline at end of file +``` + +# Cluster Hash + +## Introduction + +Reads a specified request header value and uses FNV-1a consistent hashing to route requests to a fixed upstream cluster. The same hash key always maps to the same cluster, while weighted distribution controls traffic allocation across clusters. + +Requires EnvoyFilter `cluster_header` mechanism to be enabled. + +## Configuration + +| Name | Type | Required | Default | Description | +|------|------|----------|---------|-------------| +| `clusters` | []ClusterEntry | required | - | Cluster list. Sum of all `weight` values must be 100 | +| `hash_header` | string | optional | `x-mse-consumer` | Request header name to read the hash key from | +| `cluster_header` | string | optional | `x-higress-target-cluster` | Request header name to write the selected cluster into | + +### ClusterEntry Fields + +| Name | Type | Required | Description | +|------|------|----------|-------------| +| `cluster` | string | yes | Upstream cluster name, e.g. `outbound\|443\|\|llm-xxx.internal.static` | +| `weight` | int | yes | Percentage weight. Sum of all cluster weights must be 100 | + +## Configuration Example + +```yaml +lb_type: cluster +lb_policy: cluster_hash +lb_config: + clusters: + - cluster: "outbound|80||llm-test1.internal.static" + weight: 69 + - cluster: "outbound|443||llm-test2.internal.dns" + weight: 30 + - cluster: "outbound|443||llm-test3.internal.dns" + weight: 1 + hash_header: x-mse-consumer + cluster_header: x-higress-target-cluster +``` + +If the request is missing the hash header, the plugin returns **403** directly. \ No newline at end of file diff --git a/plugins/wasm-go/extensions/ai-load-balancer/cluster_hash/lb_policy.go b/plugins/wasm-go/extensions/ai-load-balancer/cluster_hash/lb_policy.go new file mode 100644 index 000000000..f7f8a39f1 --- /dev/null +++ b/plugins/wasm-go/extensions/ai-load-balancer/cluster_hash/lb_policy.go @@ -0,0 +1,123 @@ +package cluster_hash + +import ( + "fmt" + "hash/fnv" + + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" + "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" + "github.com/higress-group/wasm-go/pkg/log" + "github.com/higress-group/wasm-go/pkg/wrapper" + "github.com/tidwall/gjson" +) + +const ( + DefaultHashHeader = "x-mse-consumer" + DefaultClusterHeader = "x-higress-target-cluster" +) + +type clusterEntry struct { + Cluster string + Weight int +} + +type ClusterHashLoadBalancer struct { + HashHeader string + ClusterHeader string + // slots is expanded from clusters by weight, length == 100. + slots []string +} + +func NewClusterHashLoadBalancer(json gjson.Result) (ClusterHashLoadBalancer, error) { + lb := ClusterHashLoadBalancer{} + + lb.HashHeader = json.Get("hash_header").String() + if lb.HashHeader == "" { + lb.HashHeader = DefaultHashHeader + } + + lb.ClusterHeader = json.Get("cluster_header").String() + if lb.ClusterHeader == "" { + lb.ClusterHeader = DefaultClusterHeader + } + + clustersJson := json.Get("clusters") + if !clustersJson.Exists() || !clustersJson.IsArray() || len(clustersJson.Array()) == 0 { + return lb, fmt.Errorf("clusters is required and must be a non-empty array") + } + + var clusters []clusterEntry + var totalWeight int + for _, c := range clustersJson.Array() { + cluster := c.Get("cluster").String() + if cluster == "" { + return lb, fmt.Errorf("each entry must have a non-empty cluster field") + } + weight := int(c.Get("weight").Int()) + if weight <= 0 { + return lb, fmt.Errorf("cluster %q has invalid weight %d, must be > 0", cluster, weight) + } + clusters = append(clusters, clusterEntry{Cluster: cluster, Weight: weight}) + totalWeight += weight + } + + if totalWeight != 100 { + return lb, fmt.Errorf("sum of cluster weights must be 100, got %d", totalWeight) + } + + slots := make([]string, 0, 100) + for _, c := range clusters { + for i := 0; i < c.Weight; i++ { + slots = append(slots, c.Cluster) + } + } + lb.slots = slots + return lb, nil +} + +func (lb ClusterHashLoadBalancer) selectCluster(hashKey string) string { + h := fnv.New32a() + h.Write([]byte(hashKey)) + index := int(h.Sum32()) % len(lb.slots) + if index < 0 { + index += len(lb.slots) + } + return lb.slots[index] +} + +func (lb ClusterHashLoadBalancer) HandleHttpRequestHeaders(ctx wrapper.HttpContext) types.Action { + hashKey, err := proxywasm.GetHttpRequestHeader(lb.HashHeader) + if err != nil || hashKey == "" { + log.Warnf("[ai-load-balancer/cluster_hash] missing hash header %q, rejecting request", lb.HashHeader) + _ = proxywasm.SendHttpResponse(403, nil, []byte("hash header required"), -1) + return types.ActionPause + } + + cluster := lb.selectCluster(hashKey) + if err := proxywasm.ReplaceHttpRequestHeader(lb.ClusterHeader, cluster); err != nil { + log.Errorf("[ai-load-balancer/cluster_hash] failed to set target header: %v", err) + _ = proxywasm.SendHttpResponse(500, nil, []byte("internal error"), -1) + return types.ActionPause + } + + log.Debugf("[ai-load-balancer/cluster_hash] %s=%s -> %s=%s", lb.HashHeader, hashKey, lb.ClusterHeader, cluster) + return types.ActionContinue +} + +func (lb ClusterHashLoadBalancer) HandleHttpRequestBody(ctx wrapper.HttpContext, body []byte) types.Action { + return types.ActionContinue +} + +func (lb ClusterHashLoadBalancer) HandleHttpResponseHeaders(ctx wrapper.HttpContext) types.Action { + return types.ActionContinue +} + +func (lb ClusterHashLoadBalancer) HandleHttpStreamingResponseBody(ctx wrapper.HttpContext, data []byte, endOfStream bool) []byte { + return data +} + +func (lb ClusterHashLoadBalancer) HandleHttpResponseBody(ctx wrapper.HttpContext, body []byte) types.Action { + return types.ActionContinue +} + +func (lb ClusterHashLoadBalancer) HandleHttpStreamDone(ctx wrapper.HttpContext) {} diff --git a/plugins/wasm-go/extensions/ai-load-balancer/cluster_hash/lb_policy_test.go b/plugins/wasm-go/extensions/ai-load-balancer/cluster_hash/lb_policy_test.go new file mode 100644 index 000000000..695819978 --- /dev/null +++ b/plugins/wasm-go/extensions/ai-load-balancer/cluster_hash/lb_policy_test.go @@ -0,0 +1,171 @@ +package cluster_hash + +import ( + "fmt" + "testing" + + "github.com/tidwall/gjson" +) + +func TestParseConfig_Valid(t *testing.T) { + json := gjson.Parse(`{ + "clusters": [ + {"cluster": "outbound|443||llm-a.internal.dns", "weight": 70}, + {"cluster": "outbound|443||llm-b.internal.dns", "weight": 30} + ] + }`) + lb, err := NewClusterHashLoadBalancer(json) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lb.HashHeader != DefaultHashHeader { + t.Errorf("expected default hash_header %q, got %q", DefaultHashHeader, lb.HashHeader) + } + if lb.ClusterHeader != DefaultClusterHeader { + t.Errorf("expected default cluster_header %q, got %q", DefaultClusterHeader, lb.ClusterHeader) + } + if len(lb.slots) != 100 { + t.Errorf("expected 100 slots, got %d", len(lb.slots)) + } +} + +func TestParseConfig_CustomHeaders(t *testing.T) { + json := gjson.Parse(`{ + "hash_header": "x-custom-key", + "cluster_header": "x-custom-target", + "clusters": [ + {"cluster": "outbound|443||llm-a.internal.dns", "weight": 100} + ] + }`) + lb, err := NewClusterHashLoadBalancer(json) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lb.HashHeader != "x-custom-key" { + t.Errorf("got hash_header %q", lb.HashHeader) + } + if lb.ClusterHeader != "x-custom-target" { + t.Errorf("got cluster_header %q", lb.ClusterHeader) + } +} + +func TestParseConfig_WeightNotSum100(t *testing.T) { + json := gjson.Parse(`{ + "clusters": [ + {"cluster": "outbound|443||llm-a.internal.dns", "weight": 60}, + {"cluster": "outbound|443||llm-b.internal.dns", "weight": 30} + ] + }`) + if _, err := NewClusterHashLoadBalancer(json); err == nil { + t.Fatal("expected error for weights not summing to 100") + } +} + +func TestParseConfig_EmptyClusters(t *testing.T) { + json := gjson.Parse(`{"clusters": []}`) + if _, err := NewClusterHashLoadBalancer(json); err == nil { + t.Fatal("expected error for empty clusters") + } +} + +func TestParseConfig_MissingClusters(t *testing.T) { + json := gjson.Parse(`{}`) + if _, err := NewClusterHashLoadBalancer(json); err == nil { + t.Fatal("expected error for missing clusters field") + } +} + +func TestParseConfig_MissingClusterField(t *testing.T) { + json := gjson.Parse(`{ + "clusters": [ + {"weight": 100} + ] + }`) + if _, err := NewClusterHashLoadBalancer(json); err == nil { + t.Fatal("expected error for missing cluster field") + } +} + +func TestParseConfig_ZeroWeight(t *testing.T) { + json := gjson.Parse(`{ + "clusters": [ + {"cluster": "outbound|443||llm-a.internal.dns", "weight": 0}, + {"cluster": "outbound|443||llm-b.internal.dns", "weight": 100} + ] + }`) + if _, err := NewClusterHashLoadBalancer(json); err == nil { + t.Fatal("expected error for zero weight") + } +} + +func TestSelectCluster_Consistency(t *testing.T) { + lb := buildLB(t, []clusterEntry{ + {Cluster: "outbound|443||llm-a.internal.dns", Weight: 50}, + {Cluster: "outbound|443||llm-b.internal.dns", Weight: 50}, + }) + + key := "alice" + first := lb.selectCluster(key) + for range 10 { + if got := lb.selectCluster(key); got != first { + t.Errorf("inconsistent result for same key: got %q, want %q", got, first) + } + } +} + +func TestSelectCluster_Distribution(t *testing.T) { + clusterA := "outbound|443||llm-a.internal.dns" + clusterB := "outbound|443||llm-b.internal.dns" + lb := buildLB(t, []clusterEntry{ + {Cluster: clusterA, Weight: 70}, + {Cluster: clusterB, Weight: 30}, + }) + + hasA, hasB := false, false + for _, c := range lb.slots { + switch c { + case clusterA: + hasA = true + case clusterB: + hasB = true + } + } + if !hasA || !hasB { + t.Fatalf("weight-expanded slots must include both clusters, hasA=%v hasB=%v", hasA, hasB) + } + + seen := map[string]struct{}{} + for i := 0; i < 256 && len(seen) < 2; i++ { + seen[lb.selectCluster(fmt.Sprintf("key-%d", i))] = struct{}{} + } + if len(seen) < 2 { + t.Errorf("expected hash routing to reach at least 2 clusters, got %v", seen) + } +} + +func TestSelectCluster_SingleCluster(t *testing.T) { + target := "outbound|443||llm-a.internal.dns" + lb := buildLB(t, []clusterEntry{ + {Cluster: target, Weight: 100}, + }) + for _, key := range []string{"alice", "bob", "carol"} { + if got := lb.selectCluster(key); got != target { + t.Errorf("single cluster: expected %q, got %q for key %q", target, got, key) + } + } +} + +func buildLB(t *testing.T, entries []clusterEntry) ClusterHashLoadBalancer { + t.Helper() + slots := make([]string, 0, 100) + for _, e := range entries { + for i := 0; i < e.Weight; i++ { + slots = append(slots, e.Cluster) + } + } + return ClusterHashLoadBalancer{ + HashHeader: DefaultHashHeader, + ClusterHeader: DefaultClusterHeader, + slots: slots, + } +} diff --git a/plugins/wasm-go/extensions/ai-load-balancer/main.go b/plugins/wasm-go/extensions/ai-load-balancer/main.go index 74e722265..333cbc639 100644 --- a/plugins/wasm-go/extensions/ai-load-balancer/main.go +++ b/plugins/wasm-go/extensions/ai-load-balancer/main.go @@ -8,6 +8,7 @@ import ( "github.com/tidwall/gjson" "github.com/alibaba/higress/plugins/wasm-go/extensions/ai-load-balancer/cluster_metrics" + "github.com/alibaba/higress/plugins/wasm-go/extensions/ai-load-balancer/cluster_hash" "github.com/alibaba/higress/plugins/wasm-go/extensions/ai-load-balancer/endpoint_metrics" "github.com/alibaba/higress/plugins/wasm-go/extensions/ai-load-balancer/global_least_request" "github.com/alibaba/higress/plugins/wasm-go/extensions/ai-load-balancer/prefix_cache" @@ -48,6 +49,7 @@ const ( EndpointLoadBalancerType = "endpoint" // Cluster load balancer policies MetricsBasedCluster = "cluster_metrics" + ClusterHashCluster = "cluster_hash" // Endpoint load balancer policies MetricsBasedEndpoint = "endpoint_metrics" MetricsBasedEndpointDeprecated = "metrics_based" // Compatible with old configurations, equal to `endpoint_metrics` @@ -68,6 +70,8 @@ func parseConfig(json gjson.Result, config *Config) error { switch config.lbPolicy { case MetricsBasedCluster: config.lb, err = cluster_metrics.NewClusterEndpointLoadBalancer(json.Get("lb_config")) + case ClusterHashCluster: + config.lb, err = cluster_hash.NewClusterHashLoadBalancer(json.Get("lb_config")) default: err = fmt.Errorf("lb_policy %s is not supported", config.lbPolicy) }