feat(ai-load-balancer): add cluster_hash load balancing policy with FNV-1a consistent hashing (#3898)

Signed-off-by: zat366 <authentic.zhao@gmail.com>
This commit is contained in:
zat366
2026-06-01 10:19:46 +08:00
committed by GitHub
parent c21a38e783
commit 52c99eb27d
5 changed files with 388 additions and 5 deletions

View File

@@ -27,7 +27,7 @@ description: 针对LLM服务的负载均衡策略
`lb_type``cluster` 时支持的负载均衡策略包括:
- `cluster_metrics`: 基于网关统计的不同service的指标进行服务之间的负载均衡
- `cluster_hash`: 读取指定请求头做 FNV-1a 一致性 hash将相同 key 的请求始终路由到同一 cluster支持按权重分配流量
# 全局最小请求数
## 功能说明
@@ -217,7 +217,7 @@ lb_config:
| `mode` | string | 必填 | | 如何使用服务级指标做负载均衡,当前支持`[LeastBusy, LeastTotalLatency, LeastFirstTokenLatency ]` |
| `service_list` | []string | 必填 | | 路由后端服务列表 |
| `rate_limit` | string | 选填 | 1 | 单个服务处理请求比例上限取值范围0~1 |
| `cluster_header` | string | 选填 | `x-envoy-target-cluster` | 通过取该header的值得知需要路由到哪个后端服务 |
| `cluster_header` | string | 选填 | `x-higress-target-cluster` | 通过取该header的值得知需要路由到哪个后端服务 |
| `queue_size` | int | 选填 | 100 | 根据最近的多少个请求进行观测指标的计算 |
`mode` 各取值含义如下:
@@ -237,4 +237,46 @@ lb_config:
service_list:
- outbound|80||test-1.dns
- outbound|80||test-2.static
```
```
# Cluster Hash一致性 Hash 路由)
## 功能说明
读取指定请求头的值,使用 FNV-1a 一致性 hash 算法将请求路由到固定的上游集群,确保相同 hash key 的请求始终落到同一个 cluster同时支持按百分比权重控制各 cluster 的流量分配。
需要配合 EnvoyFilter 的 `cluster_header` 机制一起使用。
## 配置说明
| 名称 | 数据类型 | 填写要求 | 默认值 | 描述 |
|------|----------|----------|--------|------|
| `clusters` | []ClusterEntry | 必填 | - | cluster 列表,所有 `weight` 之和必须为 100 |
| `hash_header` | string | 选填 | `x-mse-consumer` | 读取 hash key 的请求头名称 |
| `cluster_header` | string | 选填 | `x-higress-target-cluster` | 写入目标 cluster 的请求头名称 |
### ClusterEntry 字段
| 名称 | 类型 | 必填 | 说明 |
|------|------|------|------|
| `cluster` | string | 是 | 上游集群名称,如 `outbound|443||llm-xxx.internal.static` |
| `weight` | int | 是 | 百分比权重,所有 cluster 的 weight 之和必须为 100 |
## 配置示例
```yaml
lb_type: cluster
lb_policy: cluster_hash
lb_config:
clusters:
- cluster: "outbound|80||llm-test1.internal.static"
weight: 69
- cluster: "outbound|443||llm-test2.internal.dns"
weight: 30
- cluster: "outbound|443||llm-test3.internal.dns"
weight: 1
hash_header: x-mse-consumer
cluster_header: x-higress-target-cluster
```
若请求缺少 hash header插件直接返回 **403**

View File

@@ -27,6 +27,7 @@ When `lb_type = endpoint`, current supported load balance policies are:
When `lb_type = cluster`, current supported load balance policies are:
- `cluster_metrics`: Load balancing based on metrics of clusters
- `cluster_hash`: Consistent hash routing based on a request header value, always routing the same hash key to the same cluster, with weighted traffic distribution
# Global Least Request
@@ -218,7 +219,7 @@ lb_config:
| `mode` | string | required | | how to use cluster metrics, value of `[LeastBusy, LeastTotalLatency, LeastFirstTokenLatency ]` |
| `service_list` | []string | required | | service list of current route |
| `rate_limit` | string | optional | 1 | The maximum percentage of requests a single node can receive, value of 0~1 |
| `cluster_header` | string | optional | `x-envoy-target-cluster` | By retrieving the value of this header, we can determine which backend service to route to |
| `cluster_header` | string | optional | `x-higress-target-cluster` | By retrieving the value of this header, we can determine which backend service to route to |
| `queue_size` | int | optional | 100 | The metrics is calculated based on the number of most recent requests. |
The meanings of the values for `mode` are as follows:
@@ -238,4 +239,46 @@ lb_config:
service_list:
- outbound|80||test-1.dns
- outbound|80||test-2.static
```
```
# Cluster Hash
## Introduction
Reads a specified request header value and uses FNV-1a consistent hashing to route requests to a fixed upstream cluster. The same hash key always maps to the same cluster, while weighted distribution controls traffic allocation across clusters.
Requires EnvoyFilter `cluster_header` mechanism to be enabled.
## Configuration
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `clusters` | []ClusterEntry | required | - | Cluster list. Sum of all `weight` values must be 100 |
| `hash_header` | string | optional | `x-mse-consumer` | Request header name to read the hash key from |
| `cluster_header` | string | optional | `x-higress-target-cluster` | Request header name to write the selected cluster into |
### ClusterEntry Fields
| Name | Type | Required | Description |
|------|------|----------|-------------|
| `cluster` | string | yes | Upstream cluster name, e.g. `outbound\|443\|\|llm-xxx.internal.static` |
| `weight` | int | yes | Percentage weight. Sum of all cluster weights must be 100 |
## Configuration Example
```yaml
lb_type: cluster
lb_policy: cluster_hash
lb_config:
clusters:
- cluster: "outbound|80||llm-test1.internal.static"
weight: 69
- cluster: "outbound|443||llm-test2.internal.dns"
weight: 30
- cluster: "outbound|443||llm-test3.internal.dns"
weight: 1
hash_header: x-mse-consumer
cluster_header: x-higress-target-cluster
```
If the request is missing the hash header, the plugin returns **403** directly.

View File

@@ -0,0 +1,123 @@
package cluster_hash
import (
"fmt"
"hash/fnv"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
"github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types"
"github.com/higress-group/wasm-go/pkg/log"
"github.com/higress-group/wasm-go/pkg/wrapper"
"github.com/tidwall/gjson"
)
const (
DefaultHashHeader = "x-mse-consumer"
DefaultClusterHeader = "x-higress-target-cluster"
)
type clusterEntry struct {
Cluster string
Weight int
}
type ClusterHashLoadBalancer struct {
HashHeader string
ClusterHeader string
// slots is expanded from clusters by weight, length == 100.
slots []string
}
func NewClusterHashLoadBalancer(json gjson.Result) (ClusterHashLoadBalancer, error) {
lb := ClusterHashLoadBalancer{}
lb.HashHeader = json.Get("hash_header").String()
if lb.HashHeader == "" {
lb.HashHeader = DefaultHashHeader
}
lb.ClusterHeader = json.Get("cluster_header").String()
if lb.ClusterHeader == "" {
lb.ClusterHeader = DefaultClusterHeader
}
clustersJson := json.Get("clusters")
if !clustersJson.Exists() || !clustersJson.IsArray() || len(clustersJson.Array()) == 0 {
return lb, fmt.Errorf("clusters is required and must be a non-empty array")
}
var clusters []clusterEntry
var totalWeight int
for _, c := range clustersJson.Array() {
cluster := c.Get("cluster").String()
if cluster == "" {
return lb, fmt.Errorf("each entry must have a non-empty cluster field")
}
weight := int(c.Get("weight").Int())
if weight <= 0 {
return lb, fmt.Errorf("cluster %q has invalid weight %d, must be > 0", cluster, weight)
}
clusters = append(clusters, clusterEntry{Cluster: cluster, Weight: weight})
totalWeight += weight
}
if totalWeight != 100 {
return lb, fmt.Errorf("sum of cluster weights must be 100, got %d", totalWeight)
}
slots := make([]string, 0, 100)
for _, c := range clusters {
for i := 0; i < c.Weight; i++ {
slots = append(slots, c.Cluster)
}
}
lb.slots = slots
return lb, nil
}
func (lb ClusterHashLoadBalancer) selectCluster(hashKey string) string {
h := fnv.New32a()
h.Write([]byte(hashKey))
index := int(h.Sum32()) % len(lb.slots)
if index < 0 {
index += len(lb.slots)
}
return lb.slots[index]
}
func (lb ClusterHashLoadBalancer) HandleHttpRequestHeaders(ctx wrapper.HttpContext) types.Action {
hashKey, err := proxywasm.GetHttpRequestHeader(lb.HashHeader)
if err != nil || hashKey == "" {
log.Warnf("[ai-load-balancer/cluster_hash] missing hash header %q, rejecting request", lb.HashHeader)
_ = proxywasm.SendHttpResponse(403, nil, []byte("hash header required"), -1)
return types.ActionPause
}
cluster := lb.selectCluster(hashKey)
if err := proxywasm.ReplaceHttpRequestHeader(lb.ClusterHeader, cluster); err != nil {
log.Errorf("[ai-load-balancer/cluster_hash] failed to set target header: %v", err)
_ = proxywasm.SendHttpResponse(500, nil, []byte("internal error"), -1)
return types.ActionPause
}
log.Debugf("[ai-load-balancer/cluster_hash] %s=%s -> %s=%s", lb.HashHeader, hashKey, lb.ClusterHeader, cluster)
return types.ActionContinue
}
func (lb ClusterHashLoadBalancer) HandleHttpRequestBody(ctx wrapper.HttpContext, body []byte) types.Action {
return types.ActionContinue
}
func (lb ClusterHashLoadBalancer) HandleHttpResponseHeaders(ctx wrapper.HttpContext) types.Action {
return types.ActionContinue
}
func (lb ClusterHashLoadBalancer) HandleHttpStreamingResponseBody(ctx wrapper.HttpContext, data []byte, endOfStream bool) []byte {
return data
}
func (lb ClusterHashLoadBalancer) HandleHttpResponseBody(ctx wrapper.HttpContext, body []byte) types.Action {
return types.ActionContinue
}
func (lb ClusterHashLoadBalancer) HandleHttpStreamDone(ctx wrapper.HttpContext) {}

View File

@@ -0,0 +1,171 @@
package cluster_hash
import (
"fmt"
"testing"
"github.com/tidwall/gjson"
)
func TestParseConfig_Valid(t *testing.T) {
json := gjson.Parse(`{
"clusters": [
{"cluster": "outbound|443||llm-a.internal.dns", "weight": 70},
{"cluster": "outbound|443||llm-b.internal.dns", "weight": 30}
]
}`)
lb, err := NewClusterHashLoadBalancer(json)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if lb.HashHeader != DefaultHashHeader {
t.Errorf("expected default hash_header %q, got %q", DefaultHashHeader, lb.HashHeader)
}
if lb.ClusterHeader != DefaultClusterHeader {
t.Errorf("expected default cluster_header %q, got %q", DefaultClusterHeader, lb.ClusterHeader)
}
if len(lb.slots) != 100 {
t.Errorf("expected 100 slots, got %d", len(lb.slots))
}
}
func TestParseConfig_CustomHeaders(t *testing.T) {
json := gjson.Parse(`{
"hash_header": "x-custom-key",
"cluster_header": "x-custom-target",
"clusters": [
{"cluster": "outbound|443||llm-a.internal.dns", "weight": 100}
]
}`)
lb, err := NewClusterHashLoadBalancer(json)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if lb.HashHeader != "x-custom-key" {
t.Errorf("got hash_header %q", lb.HashHeader)
}
if lb.ClusterHeader != "x-custom-target" {
t.Errorf("got cluster_header %q", lb.ClusterHeader)
}
}
func TestParseConfig_WeightNotSum100(t *testing.T) {
json := gjson.Parse(`{
"clusters": [
{"cluster": "outbound|443||llm-a.internal.dns", "weight": 60},
{"cluster": "outbound|443||llm-b.internal.dns", "weight": 30}
]
}`)
if _, err := NewClusterHashLoadBalancer(json); err == nil {
t.Fatal("expected error for weights not summing to 100")
}
}
func TestParseConfig_EmptyClusters(t *testing.T) {
json := gjson.Parse(`{"clusters": []}`)
if _, err := NewClusterHashLoadBalancer(json); err == nil {
t.Fatal("expected error for empty clusters")
}
}
func TestParseConfig_MissingClusters(t *testing.T) {
json := gjson.Parse(`{}`)
if _, err := NewClusterHashLoadBalancer(json); err == nil {
t.Fatal("expected error for missing clusters field")
}
}
func TestParseConfig_MissingClusterField(t *testing.T) {
json := gjson.Parse(`{
"clusters": [
{"weight": 100}
]
}`)
if _, err := NewClusterHashLoadBalancer(json); err == nil {
t.Fatal("expected error for missing cluster field")
}
}
func TestParseConfig_ZeroWeight(t *testing.T) {
json := gjson.Parse(`{
"clusters": [
{"cluster": "outbound|443||llm-a.internal.dns", "weight": 0},
{"cluster": "outbound|443||llm-b.internal.dns", "weight": 100}
]
}`)
if _, err := NewClusterHashLoadBalancer(json); err == nil {
t.Fatal("expected error for zero weight")
}
}
func TestSelectCluster_Consistency(t *testing.T) {
lb := buildLB(t, []clusterEntry{
{Cluster: "outbound|443||llm-a.internal.dns", Weight: 50},
{Cluster: "outbound|443||llm-b.internal.dns", Weight: 50},
})
key := "alice"
first := lb.selectCluster(key)
for range 10 {
if got := lb.selectCluster(key); got != first {
t.Errorf("inconsistent result for same key: got %q, want %q", got, first)
}
}
}
func TestSelectCluster_Distribution(t *testing.T) {
clusterA := "outbound|443||llm-a.internal.dns"
clusterB := "outbound|443||llm-b.internal.dns"
lb := buildLB(t, []clusterEntry{
{Cluster: clusterA, Weight: 70},
{Cluster: clusterB, Weight: 30},
})
hasA, hasB := false, false
for _, c := range lb.slots {
switch c {
case clusterA:
hasA = true
case clusterB:
hasB = true
}
}
if !hasA || !hasB {
t.Fatalf("weight-expanded slots must include both clusters, hasA=%v hasB=%v", hasA, hasB)
}
seen := map[string]struct{}{}
for i := 0; i < 256 && len(seen) < 2; i++ {
seen[lb.selectCluster(fmt.Sprintf("key-%d", i))] = struct{}{}
}
if len(seen) < 2 {
t.Errorf("expected hash routing to reach at least 2 clusters, got %v", seen)
}
}
func TestSelectCluster_SingleCluster(t *testing.T) {
target := "outbound|443||llm-a.internal.dns"
lb := buildLB(t, []clusterEntry{
{Cluster: target, Weight: 100},
})
for _, key := range []string{"alice", "bob", "carol"} {
if got := lb.selectCluster(key); got != target {
t.Errorf("single cluster: expected %q, got %q for key %q", target, got, key)
}
}
}
func buildLB(t *testing.T, entries []clusterEntry) ClusterHashLoadBalancer {
t.Helper()
slots := make([]string, 0, 100)
for _, e := range entries {
for i := 0; i < e.Weight; i++ {
slots = append(slots, e.Cluster)
}
}
return ClusterHashLoadBalancer{
HashHeader: DefaultHashHeader,
ClusterHeader: DefaultClusterHeader,
slots: slots,
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/tidwall/gjson"
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-load-balancer/cluster_metrics"
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-load-balancer/cluster_hash"
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-load-balancer/endpoint_metrics"
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-load-balancer/global_least_request"
"github.com/alibaba/higress/plugins/wasm-go/extensions/ai-load-balancer/prefix_cache"
@@ -48,6 +49,7 @@ const (
EndpointLoadBalancerType = "endpoint"
// Cluster load balancer policies
MetricsBasedCluster = "cluster_metrics"
ClusterHashCluster = "cluster_hash"
// Endpoint load balancer policies
MetricsBasedEndpoint = "endpoint_metrics"
MetricsBasedEndpointDeprecated = "metrics_based" // Compatible with old configurations, equal to `endpoint_metrics`
@@ -68,6 +70,8 @@ func parseConfig(json gjson.Result, config *Config) error {
switch config.lbPolicy {
case MetricsBasedCluster:
config.lb, err = cluster_metrics.NewClusterEndpointLoadBalancer(json.Get("lb_config"))
case ClusterHashCluster:
config.lb, err = cluster_hash.NewClusterHashLoadBalancer(json.Get("lb_config"))
default:
err = fmt.Errorf("lb_policy %s is not supported", config.lbPolicy)
}