From 69b755a10d4ceacd271de95873d6bf5cf3429afe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E8=B4=A4=E6=B6=9B?= <601803023@qq.com> Date: Thu, 29 May 2025 09:57:10 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20cluster-key-rate-limit=20support=20sett?= =?UTF-8?q?ing=20global=20rate=20limit=20thresholds=20for=20routes?= =?UTF-8?q?=E2=80=8B=20(#2262)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../extensions/ai-token-ratelimit/config.go | 2 +- .../cluster-key-rate-limit/README.md | 165 ++++---- .../cluster-key-rate-limit/README_EN.md | 295 ++++++++------- .../cluster-key-rate-limit/config.go | 303 --------------- .../cluster-key-rate-limit/config/config.go | 357 ++++++++++++++++++ .../config/config_test.go | 211 +++++++++++ .../extensions/cluster-key-rate-limit/go.mod | 5 + .../extensions/cluster-key-rate-limit/go.sum | 9 + .../extensions/cluster-key-rate-limit/main.go | 162 ++++---- .../{ => util}/utils.go | 18 +- .../extensions/ext-auth/config/config.go | 11 +- .../extensions/ext-auth/config/config_test.go | 2 +- plugins/wasm-go/extensions/ext-auth/go.mod | 1 + plugins/wasm-go/extensions/ext-auth/go.sum | 3 + plugins/wasm-go/extensions/ext-auth/main.go | 17 +- 15 files changed, 965 insertions(+), 596 deletions(-) delete mode 100644 plugins/wasm-go/extensions/cluster-key-rate-limit/config.go create mode 100644 plugins/wasm-go/extensions/cluster-key-rate-limit/config/config.go create mode 100644 plugins/wasm-go/extensions/cluster-key-rate-limit/config/config_test.go rename plugins/wasm-go/extensions/cluster-key-rate-limit/{ => util}/utils.go (67%) diff --git a/plugins/wasm-go/extensions/ai-token-ratelimit/config.go b/plugins/wasm-go/extensions/ai-token-ratelimit/config.go index f8c5389ba..be148d7c4 100644 --- a/plugins/wasm-go/extensions/ai-token-ratelimit/config.go +++ b/plugins/wasm-go/extensions/ai-token-ratelimit/config.go @@ -142,7 +142,7 @@ func parseClusterKeyRateLimitConfig(json gjson.Result, config *ClusterKeyRateLim config.rejectedCode = DefaultRejectedCode } rejectedMsg := json.Get("rejected_msg") - if rejectedCode.Exists() { + if rejectedMsg.Exists() { config.rejectedMsg = rejectedMsg.String() } else { config.rejectedMsg = DefaultRejectedMsg diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/README.md b/plugins/wasm-go/extensions/cluster-key-rate-limit/README.md index f75ea01bc..d80a1e350 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/README.md +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/README.md @@ -6,8 +6,12 @@ description: 基于 Key 集群限流插件配置参考 ## 功能说明 -`cluster-key-rate-limit` 插件基于 Redis 实现集群限流,适用于需要跨多个 Higress Gateway 实例实现全局一致速率限制的场景。 -限流所使用的 Key 可以来源于 URL 参数、HTTP 请求头、客户端 IP 地址、消费者名称或 Cookie 中的 Key。 +`cluster-key-rate-limit` 插件基于 Redis 实现**集群级限流**,适用于需要跨多个 Higress Gateway 实例进行**全局一致速率限制**的场景。 + +支持两种限流模式: + +- **规则级全局限流**:基于相同的 `rule_name` 和 `global_threshold` 配置,对自定义规则组设置全局限流阈值 +- **Key 级动态限流**:根据请求中的动态 Key(如 URL 参数、请求头、客户端 IP、Consumer 名称或 Cookie 字段)进行分组限流 ## 运行属性 @@ -19,12 +23,22 @@ description: 基于 Key 集群限流插件配置参考 | 配置项 | 类型 | 必填 | 默认值 | 说明 | | ----------------------- | ------ | ---- | ------ |-----------------------------------------------------------------------------| | rule_name | string | 是 | - | 限流规则名称,根据限流规则名称 + 限流类型 + 限流 key 名称 + 限流 key 对应的实际值来拼装 redis key | -| rule_items | array of object | 是 | - | 限流规则项,按照 rule_items 下的排列顺序,匹配第一个 rule_item 后命中限流规则,后续规则将被忽略 | +| global_threshold | Object | 否,`global_threshold` 或 `rule_items` 选填一项 | - | 对整个自定义规则组进行限流 | +| rule_items | array of object | 否,`global_threshold` 或 `rule_items` 选填一项 | - | 限流规则项,按照 rule_items 下的排列顺序,匹配第一个 rule_item 后命中限流规则,后续规则将被忽略 | | show_limit_quota_header | bool | 否 | false | 响应头中是否显示 `X-RateLimit-Limit`(限制的总请求数)和 `X-RateLimit-Remaining`(剩余还可以发送的请求数) | | rejected_code | int | 否 | 429 | 请求被限流时,返回的 HTTP 状态码 | | rejected_msg | string | 否 | Too many requests | 请求被限流时,返回的响应体 | | redis | object | 是 | - | redis 相关配置 | +`global_threshold` 中每一项的配置字段说明。 + +| 配置项 | 类型 | 必填 | 默认值 | 说明 | +| ---------------- | ---- | ------------------------------------------------------------ | ------ | ------------------ | +| query_per_second | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每秒请求次数 | +| query_per_minute | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每分钟请求次数 | +| query_per_hour | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每小时请求次数 | +| query_per_day | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每天请求次数 | + `rule_items` 中每一项的配置字段说明。 | 配置项 | 类型 | 必填 | 默认值 | 说明 | @@ -63,28 +77,39 @@ description: 基于 Key 集群限流插件配置参考 ## 配置示例 +### 自定义规则组全局限流 + +```yaml +rule_name: routeA-global-limit-rule +global_threshold: + query_per_minute: 1000 # 自定义规则组每分钟最多1000次请求 +redis: + service_name: redis.static +show_limit_quota_header: true +``` + ### 识别请求参数 apikey,进行区别限流 ```yaml -rule_name: default_rule +rule_name: routeA-request-param-limit-rule rule_items: -- limit_by_param: apikey - limit_keys: - - key: 9a342114-ba8a-11ec-b1bf-00163e1250b5 - query_per_minute: 10 - - key: a6a6d7f2-ba8a-11ec-bec2-00163e1250b5 - query_per_hour: 100 -- limit_by_per_param: apikey - limit_keys: - # 正则表达式,匹配以 a 开头的所有字符串,每个 apikey 对应的请求 10qds - - key: "regexp:^a.*" - query_per_second: 10 - # 正则表达式,匹配以 b 开头的所有字符串,每个 apikey 对应的请求 100qd - - key: "regexp:^b.*" - query_per_minute: 100 - # 兜底用,匹配所有请求,每个 apikey 对应的请求 1000qdh - - key: "*" - query_per_hour: 1000 + - limit_by_param: apikey + limit_keys: + - key: 9a342114-ba8a-11ec-b1bf-00163e1250b5 + query_per_minute: 10 + - key: a6a6d7f2-ba8a-11ec-bec2-00163e1250b5 + query_per_hour: 100 + - limit_by_per_param: apikey + limit_keys: + # 正则表达式,匹配以 a 开头的所有字符串,每个 apikey 对应的请求 10qds + - key: "regexp:^a.*" + query_per_second: 10 + # 正则表达式,匹配以 b 开头的所有字符串,每个 apikey 对应的请求 100qd + - key: "regexp:^b.*" + query_per_minute: 100 + # 兜底用,匹配所有请求,每个 apikey 对应的请求 1000qdh + - key: "*" + query_per_hour: 1000 redis: service_name: redis.static show_limit_quota_header: true @@ -93,25 +118,25 @@ show_limit_quota_header: true ### 识别请求头 x-ca-key,进行区别限流 ```yaml -rule_name: default_rule +rule_name: routeA-request-header-limit-rule rule_items: -- limit_by_header: x-ca-key - limit_keys: - - key: 102234 - query_per_minute: 10 - - key: 308239 - query_per_hour: 10 -- limit_by_per_header: x-ca-key - limit_keys: - # 正则表达式,匹配以 a 开头的所有字符串,每个 apikey 对应的请求 10qds - - key: "regexp:^a.*" - query_per_second: 10 - # 正则表达式,匹配以b开头的所有字符串,每个 apikey 对应的请求 100qd - - key: "regexp:^b.*" - query_per_minute: 100 - # 兜底用,匹配所有请求,每个 apikey 对应的请求 1000qdh - - key: "*" - query_per_hour: 1000 + - limit_by_header: x-ca-key + limit_keys: + - key: 102234 + query_per_minute: 10 + - key: 308239 + query_per_hour: 10 + - limit_by_per_header: x-ca-key + limit_keys: + # 正则表达式,匹配以 a 开头的所有字符串,每个 apikey 对应的请求 10qds + - key: "regexp:^a.*" + query_per_second: 10 + # 正则表达式,匹配以b开头的所有字符串,每个 apikey 对应的请求 100qd + - key: "regexp:^b.*" + query_per_minute: 100 + # 兜底用,匹配所有请求,每个 apikey 对应的请求 1000qdh + - key: "*" + query_per_hour: 1000 redis: service_name: redis.static show_limit_quota_header: true @@ -120,19 +145,19 @@ show_limit_quota_header: true ### 根据请求头 x-forwarded-for 获取对端 IP,进行区别限流 ```yaml -rule_name: default_rule +rule_name: routeA-client-ip-limit-rule rule_items: -- limit_by_per_ip: from-header-x-forwarded-for - limit_keys: - # 精确 IP - - key: 1.1.1.1 - query_per_day: 10 - # IP 段,符合这个 IP 段的 IP,每个 IP 100qpd - - key: 1.1.1.0/24 - query_per_day: 100 - # 兜底用,即默认每个 IP 1000 qpd - - key: 0.0.0.0/0 - query_per_day: 1000 + - limit_by_per_ip: from-header-x-forwarded-for + limit_keys: + # 精确 IP + - key: 1.1.1.1 + query_per_day: 10 + # IP 段,符合这个 IP 段的 IP,每个 IP 100qpd + - key: 1.1.1.0/24 + query_per_day: 100 + # 兜底用,即默认每个 IP 1000 qpd + - key: 0.0.0.0/0 + query_per_day: 1000 redis: service_name: redis.static show_limit_quota_header: true @@ -141,25 +166,25 @@ show_limit_quota_header: true ### 识别 consumer,进行区别限流 ```yaml -rule_name: default_rule +rule_name: routeA-consumer-limit-rule rule_items: -- limit_by_consumer: '' - limit_keys: - - key: consumer1 - query_per_second: 10 - - key: consumer2 - query_per_hour: 100 -- limit_by_per_consumer: '' - limit_keys: - # 正则表达式,匹配以 a 开头的所有字符串,每个 consumer 对应的请求 10qds - - key: "regexp:^a.*" - query_per_second: 10 - # 正则表达式,匹配以 b 开头的所有字符串,每个 consumer 对应的请求 100qd - - key: "regexp:^b.*" - query_per_minute: 100 - # 兜底用,匹配所有请求,每个 consumer 对应的请求 1000qdh - - key: "*" - query_per_hour: 1000 + - limit_by_consumer: '' + limit_keys: + - key: consumer1 + query_per_second: 10 + - key: consumer2 + query_per_hour: 100 + - limit_by_per_consumer: '' + limit_keys: + # 正则表达式,匹配以 a 开头的所有字符串,每个 consumer 对应的请求 10qds + - key: "regexp:^a.*" + query_per_second: 10 + # 正则表达式,匹配以 b 开头的所有字符串,每个 consumer 对应的请求 100qd + - key: "regexp:^b.*" + query_per_minute: 100 + # 兜底用,匹配所有请求,每个 consumer 对应的请求 1000qdh + - key: "*" + query_per_hour: 1000 redis: service_name: redis.static show_limit_quota_header: true @@ -168,7 +193,7 @@ show_limit_quota_header: true ### 识别 Cookie 中的键值对,进行区别限流 ```yaml -rule_name: default_rule +rule_name: routeA-cookie-limit-rule rule_items: - limit_by_cookie: key1 limit_keys: diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/README_EN.md b/plugins/wasm-go/extensions/cluster-key-rate-limit/README_EN.md index 83e0935d9..cba6cb9a9 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/README_EN.md +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/README_EN.md @@ -1,164 +1,201 @@ --- -title: Key-Based Cluster Rate Limiting -keywords: [higress, rate-limit] -description: Configuration reference for the Key-Based Cluster Rate Limiting plugin +title: Cluster Rate Limiting Based on Key +keywords: [higress, rate-limit] +description: Configuration reference for the Key-based cluster rate limiting plugin + --- + ## Function Description -The `cluster-key-rate-limit` plugin implements cluster rate limiting based on Redis, suitable for scenarios that require global consistent rate limiting across multiple Higress Gateway instances. -The Key used for rate limiting can originate from URL parameters, HTTP request headers, client IP addresses, consumer names, or keys in cookies. +The `cluster-key-rate-limit` plugin implements **cluster-level rate limiting** based on Redis, suitable for scenarios +requiring **globally consistent rate limiting across multiple Higress Gateway instances**. -## Execution Attributes -Plugin Execution Phase: `default phase` -Plugin Execution Priority: `20` +It supports two rate limiting modes: -## Configuration Description -| Configuration Item | Type | Required | Default Value | Description | -|---------------------------|---------------|----------|---------------|-------------------------------------------------------------------------------------------| -| rule_name | string | Yes | - | The name of the rate limiting rule. The Redis key is constructed using rule name + rate limit type + limit key name + actual value of the limit key. | -| rule_items | array of object| Yes | - | Rate limiting rule items. The first matching `rule_item` based on the order under `rule_items` will trigger the rate limiting, and subsequent rules will be ignored. | -| show_limit_quota_header | bool | No | false | Whether to display `X-RateLimit-Limit` (total requests allowed) and `X-RateLimit-Remaining` (remaining requests that can be sent) in the response headers. | -| rejected_code | int | No | 429 | HTTP status code returned when a request is rate limited. | -| rejected_msg | string | No | Too many requests | Response body returned when a request is rate limited. | -| redis | object | Yes | - | Redis related configuration. | +- **Rule-Level Global Rate Limiting**: Applies a unified rate limit threshold to custom rule groups based on identical `rule_name` and `global_threshold` configurations. +- **Key-Level Dynamic Rate Limiting**: Groups and limits requests by dynamic keys extracted from requests, such as URL parameters, request headers, client IPs, consumer names, or cookie fields. -Description of configuration fields for each item in `rule_items`. -| Configuration Item | Type | Required | Default Value | Description | -|---------------------------|---------------|------------------------|---------------|-------------------------------------------------------------------------------------------------------| -| limit_by_header | string | No, one of `limit_by_*` | - | The name of the HTTP request header from which to retrieve the rate limiting key value. | -| limit_by_param | string | No, one of `limit_by_*` | - | The name of the URL parameter from which to retrieve the rate limiting key value. | -| limit_by_consumer | string | No, one of `limit_by_*` | - | Applies rate limiting based on consumer name without needing to add an actual value. | -| limit_by_cookie | string | No, one of `limit_by_*` | - | The name of the key in the Cookie from which to retrieve the rate limiting key value. | -| limit_by_per_header | string | No, one of `limit_by_*` | - | Matches specific HTTP request headers according to the rules and calculates rate limits for each header. The `limit_keys` configuration supports regular expressions or `*`. | -| limit_by_per_param | string | No, one of `limit_by_*` | - | Matches specific URL parameters according to the rules and calculates rate limits for each parameter. The `limit_keys` configuration supports regular expressions or `*`. | -| limit_by_per_consumer | string | No, one of `limit_by_*` | - | Matches specific consumers according to the rules and calculates rate limits for each consumer. The `limit_keys` configuration supports regular expressions or `*`. | -| limit_by_per_cookie | string | No, one of `limit_by_*` | - | Matches specific cookies according to the rules and calculates rate limits for each cookie. The `limit_keys` configuration supports regular expressions or `*`. | -| limit_by_per_ip | string | No, one of `limit_by_*` | - | Matches specific IPs according to the rules and calculates rate limits for each IP. Retrieve via IP parameter name from request headers, defined as `from-header-{header name}`, e.g., `from-header-x-forwarded-for`. To get the remote socket IP directly, use `from-remote-addr`. | -| limit_keys | array of object | Yes | - | Configures the limit counts after matching key values. | +## Operational Attributes -Description of configuration fields for each item in `limit_keys`. -| Configuration Item | Type | Required | Default Value | Description | -|---------------------------|---------------|------------------------------------------------------------------|---------------|--------------------------------------------------------------------| -| key | string | Yes | - | Matched key value; types `limit_by_per_header`, `limit_by_per_param`, `limit_by_per_consumer`, `limit_by_per_cookie` support regular expression configurations (starting with regexp: followed by a regular expression) or `*` (representing all), e.g., `regexp:^d.*` (all strings starting with d); `limit_by_per_ip` supports configuring IP addresses or IP segments. | -| query_per_second | int | No, one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day` is optional. | - | Allowed number of requests per second. | -| query_per_minute | int | No, one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day` is optional. | - | Allowed number of requests per minute. | -| query_per_hour | int | No, one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day` is optional. | - | Allowed number of requests per hour. | -| query_per_day | int | No, one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day` is optional. | - | Allowed number of requests per day. | +- **Plugin execution phase**: `Default phase` +- **Plugin execution priority**: `20` -Description of configuration fields for each item in `redis`. -| Configuration Item | Type | Required | Default Value | Description | -|--------------------|--------|----------|----------------------------------------|-----------------------------------------------------------------------------------------------------------------| -| service_name | string | Required | - | Full FQDN name of the Redis service, including service type, e.g., my-redis.dns, redis.my-ns.svc.cluster.local. | -| service_port | int | No | 80 for static services; otherwise 6379 | Service port for the Redis service. | -| username | string | No | - | Redis username. | -| password | string | No | - | Redis password. | -| timeout | int | No | 1000 | Redis connection timeout in milliseconds. | -| database | int | No | 0 | The database ID used, for example, configured as 1, corresponds to `SELECT 1`. | +## Configuration Instructions +| Configuration Item | Type | Required | Default Value | Description | +|--------------------------|---------------|-------------------------------------------|---------------------|----------------------------------------------------------------------------| +| rule_name | string | Yes | - | Name of the rate limiting rule. Used to construct the Redis key in the format: `rule_name:rate_limit_type:key_name:key_value`. | +| global_threshold | Object | No (choose either `global_threshold` or `rule_items`) | - | Apply rate limiting to the entire custom rule group.| +| rule_items | array of object | No (choose either `global_threshold` or `rule_items`) | - | Rate limiting rule items. Rules are matched in the order of the array; once the first matching rule is hit, subsequent rules are ignored. | +| show_limit_quota_header | bool | No | false | Whether to display `X-RateLimit-Limit` (total allowed requests) and `X-RateLimit-Remaining` (remaining allowed requests) in the response header. | +| rejected_code | int | No | 429 | HTTP status code returned when a request is rate-limited. | +| rejected_msg | string | No | Too many requests | Response body returned when a request is rate-limited. | +| redis | object | Yes | - | Configuration for Redis. | + +### Configuration Fields for `global_threshold` + +| Configuration Item | Type | Required | Default Value | Description | +|--------------------------|------|------------------------------------------|---------------|--------------------------------------| +| query_per_second | int | No (choose one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day`) | - | Allowed requests per second. | +| query_per_minute | int | No (choose one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day`) | - | Allowed requests per minute. | +| query_per_hour | int | No (choose one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day`) | - | Allowed requests per hour. | +| query_per_day | int | No (choose one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day`) | - | Allowed requests per day. | + +### Configuration Fields for `rule_items` + +| Configuration Item | Type | Required | Default Value | Description | +|-------------------------------|---------------|-----------------------------------|---------------|-----------------------------------------------------------------------------| +| limit_by_header | string | No (choose one of `limit_by_*` fields) | - | Configures the HTTP request header name to extract the rate limiting key. | +| limit_by_param | string | No (choose one of `limit_by_*` fields) | - | Configures the URL parameter name to extract the rate limiting key. | +| limit_by_consumer | string | No (choose one of `limit_by_*` fields) | - | Rate limits based on the consumer name (no need to add a specific value). | +| limit_by_cookie | string | No (choose one of `limit_by_*` fields) | - | Configures the Cookie key name to extract the rate limiting key. | +| limit_by_per_header | string | No (choose one of `limit_by_*` fields) | - | Matches specific HTTP headers by rule and calculates rate limits for each header. Supports regular expressions (starting with `regexp:`) or `*` for the `limit_keys` configuration. | +| limit_by_per_param | string | No (choose one of `limit_by_*` fields) | - | Matches specific URL parameters by rule and calculates rate limits for each parameter. Supports regular expressions (starting with `regexp:`) or `*` for the `limit_keys` configuration. | +| limit_by_per_consumer | string | No (choose one of `limit_by_*` fields) | - | Matches specific consumers by rule and calculates rate limits for each consumer. Supports regular expressions (starting with `regexp:`) or `*` for the `limit_keys` configuration (no need to add a specific value for the consumer name). | +| limit_by_per_cookie | string | No (choose one of `limit_by_*` fields) | - | Matches specific Cookies by rule and calculates rate limits for each Cookie value. Supports regular expressions (starting with `regexp:`) or `*` for the `limit_keys` configuration. | +| limit_by_per_ip | string | No (choose one of `limit_by_*` fields) | - | Matches specific IPs by rule and calculates rate limits for each IP. The IP can be extracted from a request header (formatted as `from-header-`, e.g., `from-header-x-forwarded-for`) or directly from the peer socket IP (configured as `from-remote-addr`). | +| limit_keys | array of object | Yes | - | Configures the rate limits for matched key values. | + +### Configuration Fields for `limit_keys` + +| Configuration Item | Type | Required | Default Value | Description | +|--------------------------|--------|------------------------------------------|---------------|-----------------------------------------------------------------------------| +| key | string | Yes | - | The matched key value. For `limit_by_per_header`, `limit_by_per_param`, `limit_by_per_consumer`, and `limit_by_per_cookie` types, supports regular expressions (prefixed with `regexp:`) or `*` (wildcard for all). Example regular expression: `regexp:^d.*` (matches all strings starting with `d`). For `limit_by_per_ip`, supports IP addresses or CIDR blocks. | +| query_per_second | int | No (choose one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day`) | - | Allowed requests per second. | +| query_per_minute | int | No (choose one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day`) | - | Allowed requests per minute. | +| query_per_hour | int | No (choose one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day`) | - | Allowed requests per hour. | +| query_per_day | int | No (choose one of `query_per_second`, `query_per_minute`, `query_per_hour`, `query_per_day`) | - | Allowed requests per day. | + +### Configuration Fields for `redis` + +| Configuration Item | Type | Required | Default Value | Description | +|----------------------|--------|----------|-------------------------------------------------------------------|-----------------------------------------------------------------------------| +| service_name | string | Yes | - | The fully qualified domain name (FQDN) of the Redis service, including the service type (e.g., `my-redis.dns`, `redis.my-ns.svc.cluster.local`). | +| service_port | int | No | 80 (for static services), 6379 for other services | The port of the Redis service. | +| username | string | No | - | Redis username for authentication. | +| password | string | No | - | Redis password for authentication. | +| timeout | int | No | 1000 (milliseconds) | Redis connection timeout in milliseconds. | +| database | int | No | 0 | The ID of the Redis database to use (e.g., configuring `1` corresponds to `SELECT 1`). | ## Configuration Examples -### Distinguish rate limiting based on the request parameter apikey -```yaml -rule_name: default_rule -rule_items: -- limit_by_param: apikey - limit_keys: - - key: 9a342114-ba8a-11ec-b1bf-00163e1250b5 - query_per_minute: 10 - - key: a6a6d7f2-ba8a-11ec-bec2-00163e1250b5 - query_per_hour: 100 -- limit_by_per_param: apikey - limit_keys: - # Regular expression, matches all strings starting with a, each apikey corresponds to 10qds. - - key: "regexp:^a.*" - query_per_second: 10 - # Regular expression, matches all strings starting with b, each apikey corresponds to 100qd. - - key: "regexp:^b.*" - query_per_minute: 100 - # As a fallback, matches all requests, each apikey corresponds to 1000qdh. - - key: "*" - query_per_hour: 1000 +### Global Rate Limiting for Custom Rule Group + +```yaml +rule_name: routeA-global-limit-rule +global_threshold: + query_per_minute: 1000 # Maximum 1000 requests per minute for this rule group redis: service_name: redis.static show_limit_quota_header: true ``` -### Distinguish rate limiting based on the header x-ca-key -```yaml -rule_name: default_rule +### Rate Limiting by Request Parameter `apikey` + +```yaml +rule_name: routeA-request-param-limit-rule rule_items: -- limit_by_header: x-ca-key - limit_keys: - - key: 102234 - query_per_minute: 10 - - key: 308239 - query_per_hour: 10 -- limit_by_per_header: x-ca-key - limit_keys: - # Regular expression, matches all strings starting with a, each apikey corresponds to 10qds. - - key: "regexp:^a.*" - query_per_second: 10 - # Regular expression, matches all strings starting with b, each apikey corresponds to 100qd. - - key: "regexp:^b.*" - query_per_minute: 100 - # As a fallback, matches all requests, each apikey corresponds to 1000qdh. - - key: "*" - query_per_hour: 1000 + - limit_by_param: apikey + limit_keys: + - key: 9a342114-ba8a-11ec-b1bf-00163e1250b5 + query_per_minute: 10 + - key: a6a6d7f2-ba8a-11ec-bec2-00163e1250b5 + query_per_hour: 100 + - limit_by_per_param: apikey + limit_keys: + # Regular expression to match all strings starting with "a"; 10 requests per second for each apikey + - key: "regexp:^a.*" + query_per_second: 10 + # Regular expression to match all strings starting with "b"; 100 requests per minute for each apikey + - key: "regexp:^b.*" + query_per_minute: 100 + # Fallback rule to match all requests; 1000 requests per hour for each apikey + - key: "*" + query_per_hour: 1000 redis: service_name: redis.static show_limit_quota_header: true ``` -### Distinguish rate limiting based on the client IP from the request header x-forwarded-for -```yaml -rule_name: default_rule +### Rate Limiting by Request Header `x-ca-key` + +```yaml +rule_name: routeA-request-header-limit-rule rule_items: -- limit_by_per_ip: from-header-x-forwarded-for - limit_keys: - # Exact IP - - key: 1.1.1.1 - query_per_day: 10 - # IP segment, for IPs matching this segment, each IP corresponds to 100qpd. - - key: 1.1.1.0/24 - query_per_day: 100 - # As a fallback, defaults to 1000 qpd for each IP. - - key: 0.0.0.0/0 - query_per_day: 1000 + - limit_by_header: x-ca-key + limit_keys: + - key: 102234 + query_per_minute: 10 + - key: 308239 + query_per_hour: 10 + - limit_by_per_header: x-ca-key + limit_keys: + # Regular expression to match all strings starting with "a"; 10 requests per second for each key + - key: "regexp:^a.*" + query_per_second: 10 + # Regular expression to match all strings starting with "b"; 100 requests per minute for each key + - key: "regexp:^b.*" + query_per_minute: 100 + # Fallback rule to match all requests; 1000 requests per hour for each key + - key: "*" + query_per_hour: 1000 redis: service_name: redis.static show_limit_quota_header: true ``` -### Distinguish rate limiting based on consumers -```yaml -rule_name: default_rule +### Rate Limiting by Client IP Extracted from `x-forwarded-for` Header + +```yaml +rule_name: routeA-client-ip-limit-rule rule_items: -- limit_by_consumer: '' - limit_keys: - - key: consumer1 - query_per_second: 10 - - key: consumer2 - query_per_hour: 100 -- limit_by_per_consumer: '' - limit_keys: - # Regular expression, matches all strings starting with a, each consumer corresponds to 10qds. - - key: "regexp:^a.*" - query_per_second: 10 - # Regular expression, matches all strings starting with b, each consumer corresponds to 100qd. - - key: "regexp:^b.*" - query_per_minute: 100 - # As a fallback, matches all requests, each consumer corresponds to 1000qdh. - - key: "*" - query_per_hour: 1000 + - limit_by_per_ip: from-header-x-forwarded-for + limit_keys: + # Exact IP match + - key: 1.1.1.1 + query_per_day: 10 + # CIDR block match; 100 requests per day for each IP in the block + - key: 1.1.1.0/24 + query_per_day: 100 + # Fallback rule for all IPs; 1000 requests per day for each IP + - key: 0.0.0.0/0 + query_per_day: 1000 redis: service_name: redis.static show_limit_quota_header: true ``` -### Distinguish rate limiting based on key-value pairs in cookies -```yaml -rule_name: default_rule +### Rate Limiting by Consumer + +```yaml +rule_name: routeA-consumer-limit-rule +rule_items: + - limit_by_consumer: '' + limit_keys: + - key: consumer1 + query_per_second: 10 + - key: consumer2 + query_per_hour: 100 + - limit_by_per_consumer: '' + limit_keys: + # Regular expression to match all consumer names starting with "a"; 10 requests per second for each consumer + - key: "regexp:^a.*" + query_per_second: 10 + # Regular expression to match all consumer names starting with "b"; 100 requests per minute for each consumer + - key: "regexp:^b.*" + query_per_minute: 100 + # Fallback rule to match all consumers; 1000 requests per hour for each consumer + - key: "*" + query_per_hour: 1000 +redis: + service_name: redis.static +show_limit_quota_header: true +``` + +### Rate Limiting by Cookie Value + +```yaml +rule_name: routeA-cookie-limit-rule rule_items: - limit_by_cookie: key1 limit_keys: @@ -168,13 +205,13 @@ rule_items: query_per_hour: 100 - limit_by_per_cookie: key1 limit_keys: - # Regular expression, matches all strings starting with a, each cookie's value corresponds to 10qds. + # Regular expression to match all cookie values starting with "a"; 10 requests per second for each value - key: "regexp:^a.*" query_per_second: 10 - # Regular expression, matches all strings starting with b, each cookie's value corresponds to 100qd. + # Regular expression to match all cookie values starting with "b"; 100 requests per minute for each value - key: "regexp:^b.*" query_per_minute: 100 - # As a fallback, matches all requests, each cookie's value corresponds to 1000qdh. + # Fallback rule to match all cookie values; 1000 requests per hour for each value - key: "*" query_per_hour: 1000 rejected_code: 200 @@ -182,4 +219,4 @@ rejected_msg: '{"code":-1,"msg":"Too many requests"}' redis: service_name: redis.static show_limit_quota_header: true -``` +``` \ No newline at end of file diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/config.go b/plugins/wasm-go/extensions/cluster-key-rate-limit/config.go deleted file mode 100644 index 00d84b21f..000000000 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/config.go +++ /dev/null @@ -1,303 +0,0 @@ -package main - -import ( - "errors" - "fmt" - "strings" - - "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" - "github.com/tidwall/gjson" - re "github.com/wasilibs/go-re2" - "github.com/zmap/go-iptree/iptree" -) - -// 限流规则项类型 -type limitRuleItemType string - -// 限流配置项key类型 -type limitConfigItemType string - -const ( - limitByHeaderType limitRuleItemType = "limit_by_header" - limitByParamType limitRuleItemType = "limit_by_param" - limitByConsumerType limitRuleItemType = "limit_by_consumer" - limitByCookieType limitRuleItemType = "limit_by_cookie" - limitByPerHeaderType limitRuleItemType = "limit_by_per_header" - limitByPerParamType limitRuleItemType = "limit_by_per_param" - limitByPerConsumerType limitRuleItemType = "limit_by_per_consumer" - limitByPerCookieType limitRuleItemType = "limit_by_per_cookie" - limitByPerIpType limitRuleItemType = "limit_by_per_ip" - - exactType limitConfigItemType = "exact" // 精确匹配 - regexpType limitConfigItemType = "regexp" // 正则表达式 - allType limitConfigItemType = "*" // 匹配所有情况 - ipNetType limitConfigItemType = "ipNet" // ip段 - - RemoteAddrSourceType = "remote-addr" - HeaderSourceType = "header" - - DefaultRejectedCode uint32 = 429 - DefaultRejectedMsg string = "Too many requests" - - Second int64 = 1 - SecondsPerMinute = 60 * Second - SecondsPerHour = 60 * SecondsPerMinute - SecondsPerDay = 24 * SecondsPerHour -) - -var timeWindows = map[string]int64{ - "query_per_second": Second, - "query_per_minute": SecondsPerMinute, - "query_per_hour": SecondsPerHour, - "query_per_day": SecondsPerDay, -} - -type ClusterKeyRateLimitConfig struct { - ruleName string // 限流规则名称 - ruleItems []LimitRuleItem // 限流规则项 - showLimitQuotaHeader bool // 响应头中是否显示X-RateLimit-Limit和X-RateLimit-Remaining - rejectedCode uint32 // 当请求超过阈值被拒绝时,返回的HTTP状态码 - rejectedMsg string // 当请求超过阈值被拒绝时,返回的响应体 - redisClient wrapper.RedisClient -} - -type LimitRuleItem struct { - limitType limitRuleItemType // 限流类型 - key string // 根据该key值进行限流,limit_by_consumer和limit_by_per_consumer两种类型为ConsumerHeader,其他类型为对应的key值 - limitByPerIp LimitByPerIp // 对端ip地址或ip段 - configItems []LimitConfigItem // 限流配置项 -} - -type LimitByPerIp struct { - sourceType string // ip来源类型 - headerName string // 根据该请求头获取客户端ip -} - -type LimitConfigItem struct { - configType limitConfigItemType // 限流配置项key类型 - key string // 限流key - ipNet *iptree.IPTree // 限流key转换的ip地址或者ip段,仅用于itemType为ipNetType - regexp *re.Regexp // 正则表达式,仅用于itemType为regexpType - count int64 // 指定时间窗口内的总请求数量阈值 - timeWindow int64 // 时间窗口大小 -} - -func initRedisClusterClient(json gjson.Result, config *ClusterKeyRateLimitConfig) error { - redisConfig := json.Get("redis") - if !redisConfig.Exists() { - return errors.New("missing redis in config") - } - serviceName := redisConfig.Get("service_name").String() - if serviceName == "" { - return errors.New("redis service name must not be empty") - } - servicePort := int(redisConfig.Get("service_port").Int()) - if servicePort == 0 { - if strings.HasSuffix(serviceName, ".static") { - // use default logic port which is 80 for static service - servicePort = 80 - } else { - servicePort = 6379 - } - } - username := redisConfig.Get("username").String() - password := redisConfig.Get("password").String() - timeout := int(redisConfig.Get("timeout").Int()) - if timeout == 0 { - timeout = 1000 - } - config.redisClient = wrapper.NewRedisClusterClient(wrapper.FQDNCluster{ - FQDN: serviceName, - Port: int64(servicePort), - }) - database := int(redisConfig.Get("database").Int()) - return config.redisClient.Init(username, password, int64(timeout), wrapper.WithDataBase(database)) -} - -func parseClusterKeyRateLimitConfig(json gjson.Result, config *ClusterKeyRateLimitConfig) error { - ruleName := json.Get("rule_name") - if !ruleName.Exists() { - return errors.New("missing rule_name in config") - } - config.ruleName = ruleName.String() - - // 初始化ruleItems - err := initRuleItems(json, config) - if err != nil { - return err - } - - showLimitQuotaHeader := json.Get("show_limit_quota_header") - if showLimitQuotaHeader.Exists() { - config.showLimitQuotaHeader = showLimitQuotaHeader.Bool() - } - - rejectedCode := json.Get("rejected_code") - if rejectedCode.Exists() { - config.rejectedCode = uint32(rejectedCode.Uint()) - } else { - config.rejectedCode = DefaultRejectedCode - } - rejectedMsg := json.Get("rejected_msg") - if rejectedCode.Exists() { - config.rejectedMsg = rejectedMsg.String() - } else { - config.rejectedMsg = DefaultRejectedMsg - } - return nil -} - -func initRuleItems(json gjson.Result, config *ClusterKeyRateLimitConfig) error { - ruleItemsResult := json.Get("rule_items") - if !ruleItemsResult.Exists() { - return errors.New("missing rule_items in config") - } - if len(ruleItemsResult.Array()) == 0 { - return errors.New("config rule_items cannot be empty") - } - var ruleItems []LimitRuleItem - for _, item := range ruleItemsResult.Array() { - var ruleItem LimitRuleItem - - // 根据配置区分限流类型 - var limitType limitRuleItemType - setLimitByKeyIfExists := func(field gjson.Result, limitTypeStr limitRuleItemType) { - if field.Exists() && field.String() != "" { - ruleItem.key = field.String() - limitType = limitTypeStr - } - } - setLimitByKeyIfExists(item.Get("limit_by_header"), limitByHeaderType) - setLimitByKeyIfExists(item.Get("limit_by_param"), limitByParamType) - setLimitByKeyIfExists(item.Get("limit_by_cookie"), limitByCookieType) - setLimitByKeyIfExists(item.Get("limit_by_per_header"), limitByPerHeaderType) - setLimitByKeyIfExists(item.Get("limit_by_per_param"), limitByPerParamType) - setLimitByKeyIfExists(item.Get("limit_by_per_cookie"), limitByPerCookieType) - - limitByConsumer := item.Get("limit_by_consumer") - if limitByConsumer.Exists() { - ruleItem.key = ConsumerHeader - limitType = limitByConsumerType - } - limitByPerConsumer := item.Get("limit_by_per_consumer") - if limitByPerConsumer.Exists() { - ruleItem.key = ConsumerHeader - limitType = limitByPerConsumerType - } - - limitByPerIpResult := item.Get("limit_by_per_ip") - if limitByPerIpResult.Exists() && limitByPerIpResult.String() != "" { - limitByPerIp := limitByPerIpResult.String() - ruleItem.key = limitByPerIp - if strings.HasPrefix(limitByPerIp, "from-header-") { - headerName := limitByPerIp[len("from-header-"):] - if headerName == "" { - return errors.New("limit_by_per_ip parse error: empty after 'from-header-'") - } - ruleItem.limitByPerIp = LimitByPerIp{ - sourceType: HeaderSourceType, - headerName: headerName, - } - } else if limitByPerIp == "from-remote-addr" { - ruleItem.limitByPerIp = LimitByPerIp{ - sourceType: RemoteAddrSourceType, - headerName: "", - } - } else { - return errors.New("the 'limit_by_per_ip' restriction must start with 'from-header-' or be exactly 'from-remote-addr'") - } - limitType = limitByPerIpType - } - - if limitType == "" { - return errors.New("only one of 'limit_by_header' and 'limit_by_param' and 'limit_by_consumer' and 'limit_by_cookie' and 'limit_by_per_header' and 'limit_by_per_param' and 'limit_by_per_consumer' and 'limit_by_per_cookie' and 'limit_by_per_ip' can be set") - } - ruleItem.limitType = limitType - - // 初始化configItems - err := initConfigItems(item, &ruleItem) - if err != nil { - return err - } - - ruleItems = append(ruleItems, ruleItem) - } - config.ruleItems = ruleItems - return nil -} - -func initConfigItems(json gjson.Result, rule *LimitRuleItem) error { - limitKeys := json.Get("limit_keys") - if !limitKeys.Exists() { - return errors.New("missing limit_keys in config") - } - if len(limitKeys.Array()) == 0 { - return errors.New("config limit_keys cannot be empty") - } - var configItems []LimitConfigItem - for _, item := range limitKeys.Array() { - key := item.Get("key") - if !key.Exists() || key.String() == "" { - return errors.New("limit_keys key is required") - } - - var ( - itemKey = key.String() - itemType limitConfigItemType - ipNet *iptree.IPTree - regexp *re.Regexp - ) - if rule.limitType == limitByPerIpType { - var err error - ipNet, err = parseIPNet(itemKey) - if err != nil { - return fmt.Errorf("failed to parse IPNet for key '%s': %w", itemKey, err) - } - itemType = ipNetType - } else if rule.limitType == limitByPerHeaderType || - rule.limitType == limitByPerParamType || - rule.limitType == limitByPerConsumerType || - rule.limitType == limitByPerCookieType { - if itemKey == "*" { - itemType = allType - } else if strings.HasPrefix(itemKey, "regexp:") { - regexpStr := itemKey[len("regexp:"):] - var err error - regexp, err = re.Compile(regexpStr) - if err != nil { - return fmt.Errorf("failed to compile regex for key '%s': %w", itemKey, err) - } - itemType = regexpType - } else { - return fmt.Errorf("the '%s' restriction must start with 'regexp:' or be exactly '*'", rule.limitType) - } - } else { - itemType = exactType - } - - if configItem, err := createConfigItemFromRate(item, itemType, itemKey, ipNet, regexp); err != nil { - return err - } else if configItem != nil { - configItems = append(configItems, *configItem) - } - } - rule.configItems = configItems - return nil -} - -func createConfigItemFromRate(item gjson.Result, itemType limitConfigItemType, key string, ipNet *iptree.IPTree, regexp *re.Regexp) (*LimitConfigItem, error) { - for timeWindowKey, duration := range timeWindows { - q := item.Get(timeWindowKey) - if q.Exists() && q.Int() > 0 { - return &LimitConfigItem{ - configType: itemType, - key: key, - ipNet: ipNet, - regexp: regexp, - count: q.Int(), - timeWindow: duration, - }, nil - } - } - return nil, errors.New("one of 'query_per_second', 'query_per_minute', 'query_per_hour', or 'query_per_day' must be set for key: " + key) -} diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/config/config.go b/plugins/wasm-go/extensions/cluster-key-rate-limit/config/config.go new file mode 100644 index 000000000..17058b77c --- /dev/null +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/config/config.go @@ -0,0 +1,357 @@ +package config + +import ( + "errors" + "fmt" + "strings" + + "cluster-key-rate-limit/util" + "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" + "github.com/tidwall/gjson" + re "github.com/wasilibs/go-re2" + "github.com/zmap/go-iptree/iptree" +) + +// LimitRuleItemType 限流规则项类型 +type LimitRuleItemType string + +// LimitConfigItemType 限流配置项key类型 +type LimitConfigItemType string + +const ( + LimitByHeaderType LimitRuleItemType = "limit_by_header" + LimitByParamType LimitRuleItemType = "limit_by_param" + LimitByConsumerType LimitRuleItemType = "limit_by_consumer" + LimitByCookieType LimitRuleItemType = "limit_by_cookie" + LimitByPerHeaderType LimitRuleItemType = "limit_by_per_header" + LimitByPerParamType LimitRuleItemType = "limit_by_per_param" + LimitByPerConsumerType LimitRuleItemType = "limit_by_per_consumer" + LimitByPerCookieType LimitRuleItemType = "limit_by_per_cookie" + LimitByPerIpType LimitRuleItemType = "limit_by_per_ip" + + ExactType LimitConfigItemType = "exact" // 精确匹配 + RegexpType LimitConfigItemType = "regexp" // 正则表达式 + AllType LimitConfigItemType = "*" // 匹配所有情况 + IpNetType LimitConfigItemType = "ipNet" // ip段 + + ConsumerHeader = "x-mse-consumer" // LimitByConsumer从该request header获取consumer的名字 + + RemoteAddrSourceType = "remote-addr" + HeaderSourceType = "header" + + DefaultRejectedCode uint32 = 429 + DefaultRejectedMsg string = "Too many requests" + + Second int64 = 1 + SecondsPerMinute = 60 * Second + SecondsPerHour = 60 * SecondsPerMinute + SecondsPerDay = 24 * SecondsPerHour +) + +var timeWindows = map[string]int64{ + "query_per_second": Second, + "query_per_minute": SecondsPerMinute, + "query_per_hour": SecondsPerHour, + "query_per_day": SecondsPerDay, +} + +type ClusterKeyRateLimitConfig struct { + RuleName string // 限流规则名称 + GlobalThreshold *GlobalThreshold // 全局限流配置 + RuleItems []LimitRuleItem // 限流规则项 + ShowLimitQuotaHeader bool // 响应头中是否显示X-RateLimit-Limit和X-RateLimit-Remaining + RejectedCode uint32 // 当请求超过阈值被拒绝时,返回的HTTP状态码 + RejectedMsg string // 当请求超过阈值被拒绝时,返回的响应体 + RedisClient wrapper.RedisClient +} + +type GlobalThreshold struct { + Count int64 // 时间窗口内请求数 + TimeWindow int64 // 时间窗口大小(秒) +} + +type LimitRuleItem struct { + LimitType LimitRuleItemType // 限流类型 + Key string // 根据该key值进行限流,limit_by_consumer和limit_by_per_consumer两种类型为ConsumerHeader,其他类型为对应的key值 + LimitByPerIp LimitByPerIp // 对端ip地址或ip段 + ConfigItems []LimitConfigItem // 限流配置项 +} + +type LimitByPerIp struct { + SourceType string // ip来源类型 + HeaderName string // 根据该请求头获取客户端ip +} + +type LimitConfigItem struct { + ConfigType LimitConfigItemType // 限流配置项key类型 + Key string // 限流key + IpNet *iptree.IPTree // 限流key转换的ip地址或者ip段,仅用于itemType为ipNetType + Regexp *re.Regexp // 正则表达式,仅用于itemType为regexpType + Count int64 // 指定时间窗口内的总请求数量阈值 + TimeWindow int64 // 时间窗口大小 +} + +func InitRedisClusterClient(json gjson.Result, config *ClusterKeyRateLimitConfig) error { + redisConfig := json.Get("redis") + if !redisConfig.Exists() { + return errors.New("missing redis in config") + } + + serviceName := redisConfig.Get("service_name").String() + if serviceName == "" { + return errors.New("redis service name must not be empty") + } + + servicePort := int(redisConfig.Get("service_port").Int()) + if servicePort == 0 { + if strings.HasSuffix(serviceName, ".static") { + // use default logic port which is 80 for static service + servicePort = 80 + } else { + servicePort = 6379 + } + } + + username := redisConfig.Get("username").String() + password := redisConfig.Get("password").String() + timeout := int(redisConfig.Get("timeout").Int()) + if timeout == 0 { + timeout = 1000 + } + + config.RedisClient = wrapper.NewRedisClusterClient(wrapper.FQDNCluster{ + FQDN: serviceName, + Port: int64(servicePort), + }) + database := int(redisConfig.Get("database").Int()) + return config.RedisClient.Init(username, password, int64(timeout), wrapper.WithDataBase(database)) +} + +func ParseClusterKeyRateLimitConfig(json gjson.Result, config *ClusterKeyRateLimitConfig) error { + ruleName := json.Get("rule_name") + if !ruleName.Exists() { + return errors.New("missing rule_name in config") + } + config.RuleName = ruleName.String() + + // 初始化限流规则 + if err := initLimitRule(json, config); err != nil { + return err + } + + showLimitQuotaHeader := json.Get("show_limit_quota_header") + if showLimitQuotaHeader.Exists() { + config.ShowLimitQuotaHeader = showLimitQuotaHeader.Bool() + } + + rejectedCode := json.Get("rejected_code") + if rejectedCode.Exists() { + config.RejectedCode = uint32(rejectedCode.Uint()) + } else { + config.RejectedCode = DefaultRejectedCode + } + + rejectedMsg := json.Get("rejected_msg") + if rejectedMsg.Exists() { + config.RejectedMsg = rejectedMsg.String() + } else { + config.RejectedMsg = DefaultRejectedMsg + } + return nil +} + +func initLimitRule(json gjson.Result, config *ClusterKeyRateLimitConfig) error { + globalThresholdResult := json.Get("global_threshold") + ruleItemsResult := json.Get("rule_items") + + hasGlobal := globalThresholdResult.Exists() + hasRule := ruleItemsResult.Exists() + if !hasGlobal && !hasRule { + return errors.New("at least one of 'global_threshold' or 'rule_items' must be set") + } else if hasGlobal && hasRule { + return errors.New("'global_threshold' and 'rule_items' cannot be set at the same time") + } + + // 处理全局限流配置 + if hasGlobal { + threshold, err := parseGlobalThreshold(globalThresholdResult) + if err != nil { + return fmt.Errorf("failed to parse global_threshold: %w", err) + } + config.GlobalThreshold = threshold + return nil + } + + // 处理条件限流规则 + items := ruleItemsResult.Array() + if len(items) == 0 { + return errors.New("config rule_items cannot be empty") + } + + var ruleItems []LimitRuleItem + for _, item := range items { + ruleItem, err := parseLimitRuleItem(item) + if err != nil { + return fmt.Errorf("failed to parse rule_item in rule_items: %w", err) + } + ruleItems = append(ruleItems, *ruleItem) + } + config.RuleItems = ruleItems + return nil +} + +func parseGlobalThreshold(item gjson.Result) (*GlobalThreshold, error) { + for timeWindowKey, duration := range timeWindows { + q := item.Get(timeWindowKey) + if q.Exists() && q.Int() > 0 { + return &GlobalThreshold{ + Count: q.Int(), + TimeWindow: duration, + }, nil + } + } + return nil, errors.New("one of 'query_per_second', 'query_per_minute', 'query_per_hour', or 'query_per_day' must be set for global_threshold") +} + +func parseLimitRuleItem(item gjson.Result) (*LimitRuleItem, error) { + var ruleItem LimitRuleItem + // 根据配置区分限流类型 + var limitType LimitRuleItemType + + trySetLimitType := func(field gjson.Result, limitTypeStr LimitRuleItemType) { + if field.Exists() && field.String() != "" { + ruleItem.Key = field.String() + limitType = limitTypeStr + } + } + trySetLimitType(item.Get("limit_by_header"), LimitByHeaderType) + trySetLimitType(item.Get("limit_by_param"), LimitByParamType) + trySetLimitType(item.Get("limit_by_cookie"), LimitByCookieType) + trySetLimitType(item.Get("limit_by_per_header"), LimitByPerHeaderType) + trySetLimitType(item.Get("limit_by_per_param"), LimitByPerParamType) + trySetLimitType(item.Get("limit_by_per_cookie"), LimitByPerCookieType) + + limitByConsumer := item.Get("limit_by_consumer") + if limitByConsumer.Exists() { + ruleItem.Key = ConsumerHeader + limitType = LimitByConsumerType + } + limitByPerConsumer := item.Get("limit_by_per_consumer") + if limitByPerConsumer.Exists() { + ruleItem.Key = ConsumerHeader + limitType = LimitByPerConsumerType + } + + limitByPerIpResult := item.Get("limit_by_per_ip") + if limitByPerIpResult.Exists() && limitByPerIpResult.String() != "" { + limitByPerIp := limitByPerIpResult.String() + ruleItem.Key = limitByPerIp + if strings.HasPrefix(limitByPerIp, "from-header-") { + headerName := limitByPerIp[len("from-header-"):] + if headerName == "" { + return nil, errors.New("limit_by_per_ip parse error: empty after 'from-header-'") + } + ruleItem.LimitByPerIp = LimitByPerIp{ + SourceType: HeaderSourceType, + HeaderName: headerName, + } + } else if limitByPerIp == "from-remote-addr" { + ruleItem.LimitByPerIp = LimitByPerIp{ + SourceType: RemoteAddrSourceType, + HeaderName: "", + } + } else { + return nil, errors.New("the 'limit_by_per_ip' restriction must start with 'from-header-' or be exactly 'from-remote-addr'") + } + limitType = LimitByPerIpType + } + + if limitType == "" { + return nil, errors.New("only one of 'limit_by_header' and 'limit_by_param' and 'limit_by_consumer' and 'limit_by_cookie' and 'limit_by_per_header' and 'limit_by_per_param' and 'limit_by_per_consumer' and 'limit_by_per_cookie' and 'limit_by_per_ip' can be set") + } + ruleItem.LimitType = limitType + + // 初始化configItems + err := initConfigItems(item, &ruleItem) + if err != nil { + return nil, fmt.Errorf("failed to init config items: %w", err) + } + + return &ruleItem, nil +} + +func initConfigItems(json gjson.Result, rule *LimitRuleItem) error { + limitKeys := json.Get("limit_keys") + if !limitKeys.Exists() { + return errors.New("missing limit_keys in config") + } + if len(limitKeys.Array()) == 0 { + return errors.New("config limit_keys cannot be empty") + } + var configItems []LimitConfigItem + for _, item := range limitKeys.Array() { + key := item.Get("key") + if !key.Exists() || key.String() == "" { + return errors.New("limit_keys key is required") + } + + var ( + itemKey = key.String() + itemType LimitConfigItemType + ipNet *iptree.IPTree + regexp *re.Regexp + ) + if rule.LimitType == LimitByPerIpType { + var err error + ipNet, err = util.ParseIPNet(itemKey) + if err != nil { + return fmt.Errorf("failed to parse IPNet for key '%s': %w", itemKey, err) + } + itemType = IpNetType + } else if rule.LimitType == LimitByPerHeaderType || + rule.LimitType == LimitByPerParamType || + rule.LimitType == LimitByPerConsumerType || + rule.LimitType == LimitByPerCookieType { + if itemKey == "*" { + itemType = AllType + } else if strings.HasPrefix(itemKey, "regexp:") { + regexpStr := itemKey[len("regexp:"):] + var err error + regexp, err = re.Compile(regexpStr) + if err != nil { + return fmt.Errorf("failed to compile regex for key '%s': %w", itemKey, err) + } + itemType = RegexpType + } else { + return fmt.Errorf("the '%s' restriction must start with 'regexp:' or be exactly '*'", rule.LimitType) + } + } else { + itemType = ExactType + } + + if configItem, err := createConfigItemFromRate(item, itemType, itemKey, ipNet, regexp); err != nil { + return err + } else if configItem != nil { + configItems = append(configItems, *configItem) + } + } + rule.ConfigItems = configItems + return nil +} + +func createConfigItemFromRate(item gjson.Result, itemType LimitConfigItemType, key string, ipNet *iptree.IPTree, regexp *re.Regexp) (*LimitConfigItem, error) { + for timeWindowKey, duration := range timeWindows { + q := item.Get(timeWindowKey) + if q.Exists() && q.Int() > 0 { + return &LimitConfigItem{ + ConfigType: itemType, + Key: key, + IpNet: ipNet, + Regexp: regexp, + Count: q.Int(), + TimeWindow: duration, + }, nil + } + } + return nil, errors.New("one of 'query_per_second', 'query_per_minute', 'query_per_hour', or 'query_per_day' must be set for key: " + key) +} diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/config/config_test.go b/plugins/wasm-go/extensions/cluster-key-rate-limit/config/config_test.go new file mode 100644 index 000000000..26bad8fb8 --- /dev/null +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/config/config_test.go @@ -0,0 +1,211 @@ +package config + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tidwall/gjson" +) + +func TestParseClusterKeyRateLimitConfig(t *testing.T) { + tests := []struct { + name string + json string + expected ClusterKeyRateLimitConfig + expectedErr error + }{ + { + name: "MissingRuleName", + json: `{}`, + expectedErr: errors.New("missing rule_name in config"), + }, + { + name: "GlobalThreshold_QueryPerSecond", + json: `{ + "rule_name": "global-route-limit", + "global_threshold": { + "query_per_second": 100 + } + }`, + expected: ClusterKeyRateLimitConfig{ + RuleName: "global-route-limit", + GlobalThreshold: &GlobalThreshold{ + Count: 100, + TimeWindow: Second, + }, + RejectedCode: DefaultRejectedCode, + RejectedMsg: DefaultRejectedMsg, + }, + }, + { + name: "GlobalThreshold_QueryPerMinute", + json: `{ + "rule_name": "global-route-limit", + "global_threshold": { + "query_per_minute": 1000 + } + }`, + expected: ClusterKeyRateLimitConfig{ + RuleName: "global-route-limit", + GlobalThreshold: &GlobalThreshold{ + Count: 1000, + TimeWindow: SecondsPerMinute, + }, + RejectedCode: DefaultRejectedCode, + RejectedMsg: DefaultRejectedMsg, + }, + }, + { + name: "RuleItems_SingleRule", + json: `{ + "rule_name": "rule-based-limit", + "rule_items": [ + { + "limit_by_header": "x-test", + "limit_keys": [ + {"key": "key1", "query_per_second": 10} + ] + } + ] + }`, + expected: ClusterKeyRateLimitConfig{ + RuleName: "rule-based-limit", + RuleItems: []LimitRuleItem{ + { + LimitType: LimitByHeaderType, + Key: "x-test", + ConfigItems: []LimitConfigItem{ + { + ConfigType: ExactType, + Key: "key1", + Count: 10, + TimeWindow: Second, + }, + }, + }, + }, + RejectedCode: DefaultRejectedCode, + RejectedMsg: DefaultRejectedMsg, + }, + }, + { + name: "RuleItems_MultipleRules", + json: `{ + "rule_name": "multi-rule-limit", + "rule_items": [ + { + "limit_by_param": "user_id", + "limit_keys": [ + {"key": "123", "query_per_hour": 50} + ] + }, + { + "limit_by_per_cookie": "session_id", + "limit_keys": [ + {"key": "*", "query_per_day": 100} + ] + } + ] + }`, + expected: ClusterKeyRateLimitConfig{ + RuleName: "multi-rule-limit", + RuleItems: []LimitRuleItem{ + { + LimitType: LimitByParamType, + Key: "user_id", + ConfigItems: []LimitConfigItem{ + { + ConfigType: ExactType, + Key: "123", + Count: 50, + TimeWindow: SecondsPerHour, + }, + }, + }, + { + LimitType: LimitByPerCookieType, + Key: "session_id", + ConfigItems: []LimitConfigItem{ + { + ConfigType: AllType, + Key: "*", + Count: 100, + TimeWindow: SecondsPerDay, + }, + }, + }, + }, + RejectedCode: DefaultRejectedCode, + RejectedMsg: DefaultRejectedMsg, + }, + }, + { + name: "Conflict_GlobalThresholdAndRuleItems", + json: `{ + "rule_name": "test-conflict", + "global_threshold": {"query_per_second": 100}, + "rule_items": [{"limit_by_header": "x-test"}] + }`, + expectedErr: errors.New("'global_threshold' and 'rule_items' cannot be set at the same time"), + }, + { + name: "Missing_GlobalThresholdAndRuleItems", + json: `{ + "rule_name": "test-missing" + }`, + expectedErr: errors.New("at least one of 'global_threshold' or 'rule_items' must be set"), + }, + { + name: "Custom_RejectedCodeAndMessage", + json: `{ + "rule_name": "custom-reject", + "rejected_code": 403, + "rejected_msg": "Forbidden", + "global_threshold": {"query_per_second": 100} + }`, + expected: ClusterKeyRateLimitConfig{ + RuleName: "custom-reject", + GlobalThreshold: &GlobalThreshold{ + Count: 100, + TimeWindow: Second, + }, + RejectedCode: 403, + RejectedMsg: "Forbidden", + }, + }, + { + name: "ShowLimitQuotaHeader_Enabled", + json: `{ + "rule_name": "show-header", + "show_limit_quota_header": true, + "global_threshold": {"query_per_second": 100} + }`, + expected: ClusterKeyRateLimitConfig{ + RuleName: "show-header", + GlobalThreshold: &GlobalThreshold{ + Count: 100, + TimeWindow: Second, + }, + ShowLimitQuotaHeader: true, + RejectedCode: DefaultRejectedCode, + RejectedMsg: DefaultRejectedMsg, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var config ClusterKeyRateLimitConfig + result := gjson.Parse(tt.json) + err := ParseClusterKeyRateLimitConfig(result, &config) + + if tt.expectedErr != nil { + assert.EqualError(t, err, tt.expectedErr.Error()) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expected, config) + } + }) + } +} diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/go.mod b/plugins/wasm-go/extensions/cluster-key-rate-limit/go.mod index f9a4e5ba5..a2db02e95 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/go.mod +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/go.mod @@ -7,6 +7,7 @@ replace github.com/alibaba/higress/plugins/wasm-go => ../.. require ( github.com/alibaba/higress/plugins/wasm-go v0.0.0 github.com/higress-group/proxy-wasm-go-sdk v1.0.0 + github.com/stretchr/testify v1.8.4 github.com/tidwall/gjson v1.17.3 github.com/tidwall/resp v0.1.1 github.com/wasilibs/go-re2 v1.5.3 @@ -15,10 +16,14 @@ require ( require ( github.com/asergeyev/nradix v0.0.0-20170505151046-3872ab85bb56 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/google/uuid v1.3.0 // indirect github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 // indirect github.com/magefile/mage v1.14.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/tetratelabs/wazero v1.7.1 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/sjson v1.2.5 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/go.sum b/plugins/wasm-go/extensions/cluster-key-rate-limit/go.sum index 7b8c22894..96927ade5 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/go.sum +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/go.sum @@ -1,6 +1,7 @@ github.com/asergeyev/nradix v0.0.0-20170505151046-3872ab85bb56 h1:Wi5Tgn8K+jDcBYL+dIMS1+qXYH2r7tpRAyBgqrWfQtw= github.com/asergeyev/nradix v0.0.0-20170505151046-3872ab85bb56/go.mod h1:8BhOLuqtSuT5NZtZMwfvEibi09RO3u79uqfHZzfDTR4= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 h1:IHDghbGQ2DTIXHBHxWfqCYQW1fKjyJ/I7W1pMyUDeEA= @@ -10,9 +11,12 @@ github.com/higress-group/proxy-wasm-go-sdk v1.0.0/go.mod h1:iiSyFbo+rAtbtGt/bsef github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tetratelabs/wazero v1.7.1 h1:QtSfd6KLc41DIMpDYlJdoMc6k7QTN246DM2+n2Y/Dx8= github.com/tetratelabs/wazero v1.7.1/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94= github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -21,9 +25,14 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE= github.com/tidwall/resp v0.1.1/go.mod h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/wasilibs/go-re2 v1.5.3 h1:wiuTcgDZdLhu8NG8oqF5sF5Q3yIU14lPAvXqeYzDK3g= github.com/wasilibs/go-re2 v1.5.3/go.mod h1:PzpVPsBdFC7vM8QJbbEnOeTmwA0DGE783d/Gex8eCV8= github.com/wasilibs/nottinygc v0.4.0 h1:h1TJMihMC4neN6Zq+WKpLxgd9xCFMw7O9ETLwY2exJQ= github.com/zmap/go-iptree v0.0.0-20210731043055-d4e632617837 h1:DjHnADS2r2zynZ3WkCFAQ+PNYngMSNceRROi0pO6c3M= github.com/zmap/go-iptree v0.0.0-20210731043055-d4e632617837/go.mod h1:9vp0bxqozzQwcjBwenEXfKVq8+mYbwHkQ1NF9Ap0DMw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/main.go b/plugins/wasm-go/extensions/cluster-key-rate-limit/main.go index f8fde47a8..0ce0bb2de 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/main.go +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/main.go @@ -21,6 +21,9 @@ import ( "strconv" "strings" + "cluster-key-rate-limit/config" + "cluster-key-rate-limit/util" + "github.com/alibaba/higress/plugins/wasm-go/pkg/log" "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" "github.com/higress-group/proxy-wasm-go-sdk/proxywasm/types" @@ -31,15 +34,20 @@ import ( func main() { wrapper.SetCtx( "cluster-key-rate-limit", - wrapper.ParseConfigBy(parseConfig), - wrapper.ProcessRequestHeadersBy(onHttpRequestHeaders), - wrapper.ProcessResponseHeadersBy(onHttpResponseHeaders), + wrapper.ParseConfig(parseConfig), + wrapper.ProcessRequestHeaders(onHttpRequestHeaders), + wrapper.ProcessResponseHeaders(onHttpResponseHeaders), ) } const ( - ClusterRateLimitFormat string = "higress-cluster-key-rate-limit:%s:%s:%d:%d:%s:%s" // redis key为前缀:限流规则名称:限流类型:时间窗口:窗口内限流数:限流key名称:限流key对应的实际值 - FixedWindowScript string = ` + // ClusterKeyPrefix 集群限流插件在 Redis 中 key 的统一前缀 + ClusterKeyPrefix = "higress-cluster-key-rate-limit" + // ClusterGlobalRateLimitFormat 全局限流模式 redis key 为 ClusterKeyPrefix:限流规则名称:global_threshold:时间窗口:窗口内限流数 + ClusterGlobalRateLimitFormat = ClusterKeyPrefix + ":%s:global_threshold:%d:%d" + // ClusterRateLimitFormat 规则限流模式 redis key 为 ClusterKeyPrefix:限流规则名称:限流类型:时间窗口:窗口内限流数:限流key名称:限流key对应的实际值 + ClusterRateLimitFormat = ClusterKeyPrefix + ":%s:%s:%d:%d:%s:%s" + FixedWindowScript = ` local ttl = redis.call('ttl', KEYS[1]) if ttl < 0 then redis.call('set', KEYS[1], ARGV[1] - 1, 'EX', ARGV[2]) @@ -48,14 +56,13 @@ const ( return {ARGV[1], redis.call('incrby', KEYS[1], -1), ttl} ` - LimitContextKey string = "LimitContext" // 限流上下文信息 + LimitContextKey = "LimitContext" // 限流上下文信息 - ConsumerHeader string = "x-mse-consumer" // LimitByConsumer从该request header获取consumer的名字 - CookieHeader string = "cookie" + CookieHeader = "cookie" - RateLimitLimitHeader string = "X-RateLimit-Limit" // 限制的总请求数 - RateLimitRemainingHeader string = "X-RateLimit-Remaining" // 剩余还可以发送的请求数 - RateLimitResetHeader string = "X-RateLimit-Reset" // 限流重置时间(触发限流时返回) + RateLimitLimitHeader = "X-RateLimit-Limit" // 限制的总请求数 + RateLimitRemainingHeader = "X-RateLimit-Remaining" // 剩余还可以发送的请求数 + RateLimitResetHeader = "X-RateLimit-Reset" // 限流重置时间(触发限流时返回) ) type LimitContext struct { @@ -64,31 +71,43 @@ type LimitContext struct { reset int } -func parseConfig(json gjson.Result, config *ClusterKeyRateLimitConfig, log wrapper.Log) error { - err := initRedisClusterClient(json, config) +func parseConfig(json gjson.Result, cfg *config.ClusterKeyRateLimitConfig) error { + err := config.InitRedisClusterClient(json, cfg) if err != nil { return err } - err = parseClusterKeyRateLimitConfig(json, config) + err = config.ParseClusterKeyRateLimitConfig(json, cfg) if err != nil { return err } return nil } -func onHttpRequestHeaders(ctx wrapper.HttpContext, config ClusterKeyRateLimitConfig, log wrapper.Log) types.Action { - // 判断是否命中限流规则 - val, ruleItem, configItem := checkRequestAgainstLimitRule(ctx, config.ruleItems, log) - if ruleItem == nil || configItem == nil { - return types.ActionContinue +func onHttpRequestHeaders(ctx wrapper.HttpContext, config config.ClusterKeyRateLimitConfig) types.Action { + limitKey, count, timeWindow := "", int64(0), int64(0) + + if config.GlobalThreshold != nil { + // 全局限流模式 + limitKey = fmt.Sprintf(ClusterGlobalRateLimitFormat, config.RuleName, config.GlobalThreshold.TimeWindow, config.GlobalThreshold.Count) + count = config.GlobalThreshold.Count + timeWindow = config.GlobalThreshold.TimeWindow + } else { + // 规则限流模式 + val, ruleItem, configItem := checkRequestAgainstLimitRule(ctx, config.RuleItems) + if ruleItem == nil || configItem == nil { + // 没有匹配到限流规则直接返回 + return types.ActionContinue + } + + limitKey = fmt.Sprintf(ClusterRateLimitFormat, config.RuleName, ruleItem.LimitType, configItem.TimeWindow, configItem.Count, ruleItem.Key, val) + count = configItem.Count + timeWindow = configItem.TimeWindow } - // 构建redis限流key和参数 - limitKey := fmt.Sprintf(ClusterRateLimitFormat, config.ruleName, ruleItem.limitType, configItem.timeWindow, configItem.count, ruleItem.key, val) - keys := []interface{}{limitKey} - args := []interface{}{configItem.count, configItem.timeWindow} // 执行限流逻辑 - err := config.redisClient.Eval(FixedWindowScript, 1, keys, args, func(response resp.Value) { + keys := []interface{}{limitKey} + args := []interface{}{count, timeWindow} + err := config.RedisClient.Eval(FixedWindowScript, 1, keys, args, func(response resp.Value) { resultArray := response.Array() if len(resultArray) != 3 { log.Errorf("redis response parse error, response: %v", response) @@ -108,6 +127,7 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config ClusterKeyRateLimitCon proxywasm.ResumeHttpRequest() } }) + if err != nil { log.Errorf("redis call failed: %v", err) return types.ActionContinue @@ -115,79 +135,81 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config ClusterKeyRateLimitCon return types.ActionPause } -func onHttpResponseHeaders(ctx wrapper.HttpContext, config ClusterKeyRateLimitConfig, log wrapper.Log) types.Action { +func onHttpResponseHeaders(ctx wrapper.HttpContext, config config.ClusterKeyRateLimitConfig) types.Action { limitContext, ok := ctx.GetContext(LimitContextKey).(LimitContext) if !ok { return types.ActionContinue } - if config.showLimitQuotaHeader { + if config.ShowLimitQuotaHeader { _ = proxywasm.ReplaceHttpResponseHeader(RateLimitLimitHeader, strconv.Itoa(limitContext.count)) _ = proxywasm.ReplaceHttpResponseHeader(RateLimitRemainingHeader, strconv.Itoa(limitContext.remaining)) } return types.ActionContinue } -func checkRequestAgainstLimitRule(ctx wrapper.HttpContext, ruleItems []LimitRuleItem, log wrapper.Log) (string, *LimitRuleItem, *LimitConfigItem) { - for _, rule := range ruleItems { - val, ruleItem, configItem := hitRateRuleItem(ctx, rule, log) - if ruleItem != nil && configItem != nil { - return val, ruleItem, configItem +func checkRequestAgainstLimitRule(ctx wrapper.HttpContext, ruleItems []config.LimitRuleItem) (string, *config.LimitRuleItem, *config.LimitConfigItem) { + if len(ruleItems) > 0 { + for _, rule := range ruleItems { + val, ruleItem, configItem := hitRateRuleItem(ctx, rule) + if ruleItem != nil && configItem != nil { + return val, ruleItem, configItem + } } } return "", nil, nil } -func hitRateRuleItem(ctx wrapper.HttpContext, rule LimitRuleItem, log wrapper.Log) (string, *LimitRuleItem, *LimitConfigItem) { - switch rule.limitType { +func hitRateRuleItem(ctx wrapper.HttpContext, rule config.LimitRuleItem) (string, *config.LimitRuleItem, *config.LimitConfigItem) { + switch rule.LimitType { // 根据HTTP请求头限流 - case limitByHeaderType, limitByPerHeaderType: - val, err := proxywasm.GetHttpRequestHeader(rule.key) + case config.LimitByHeaderType, config.LimitByPerHeaderType: + val, err := proxywasm.GetHttpRequestHeader(rule.Key) if err != nil { - return logDebugAndReturnEmpty(log, "failed to get request header %s: %v", rule.key, err) + return logDebugAndReturnEmpty("failed to get request header %s: %v", rule.Key, err) } - return val, &rule, findMatchingItem(rule.limitType, rule.configItems, val) + return val, &rule, findMatchingItem(rule.LimitType, rule.ConfigItems, val) // 根据HTTP请求参数限流 - case limitByParamType, limitByPerParamType: + case config.LimitByParamType, config.LimitByPerParamType: parse, err := url.Parse(ctx.Path()) if err != nil { - return logDebugAndReturnEmpty(log, "failed to parse request path: %v", err) + return logDebugAndReturnEmpty("failed to parse request path: %v", err) } query, err := url.ParseQuery(parse.RawQuery) if err != nil { - return logDebugAndReturnEmpty(log, "failed to parse query params: %v", err) + return logDebugAndReturnEmpty("failed to parse query params: %v", err) } - val, ok := query[rule.key] + val, ok := query[rule.Key] if !ok { - return logDebugAndReturnEmpty(log, "request param %s is empty", rule.key) + return logDebugAndReturnEmpty("request param %s is empty", rule.Key) } - return val[0], &rule, findMatchingItem(rule.limitType, rule.configItems, val[0]) + return val[0], &rule, findMatchingItem(rule.LimitType, rule.ConfigItems, val[0]) // 根据consumer限流 - case limitByConsumerType, limitByPerConsumerType: - val, err := proxywasm.GetHttpRequestHeader(ConsumerHeader) + case config.LimitByConsumerType, config.LimitByPerConsumerType: + val, err := proxywasm.GetHttpRequestHeader(config.ConsumerHeader) if err != nil { - return logDebugAndReturnEmpty(log, "failed to get request header %s: %v", ConsumerHeader, err) + return logDebugAndReturnEmpty("failed to get request header %s: %v", config.ConsumerHeader, err) } - return val, &rule, findMatchingItem(rule.limitType, rule.configItems, val) + return val, &rule, findMatchingItem(rule.LimitType, rule.ConfigItems, val) // 根据cookie中key值限流 - case limitByCookieType, limitByPerCookieType: + case config.LimitByCookieType, config.LimitByPerCookieType: cookie, err := proxywasm.GetHttpRequestHeader(CookieHeader) if err != nil { - return logDebugAndReturnEmpty(log, "failed to get request cookie : %v", err) + return logDebugAndReturnEmpty("failed to get request cookie : %v", err) } - val := extractCookieValueByKey(cookie, rule.key) + val := util.ExtractCookieValueByKey(cookie, rule.Key) if val == "" { - return logDebugAndReturnEmpty(log, "cookie key '%s' extracted from cookie '%s' is empty.", rule.key, cookie) + return logDebugAndReturnEmpty("cookie key '%s' extracted from cookie '%s' is empty.", rule.Key, cookie) } - return val, &rule, findMatchingItem(rule.limitType, rule.configItems, val) + return val, &rule, findMatchingItem(rule.LimitType, rule.ConfigItems, val) // 根据客户端IP限流 - case limitByPerIpType: + case config.LimitByPerIpType: realIp, err := getDownStreamIp(rule) if err != nil { log.Warnf("failed to get down stream ip: %v", err) return "", &rule, nil } - for _, item := range rule.configItems { - if _, found, _ := item.ipNet.Get(realIp); !found { + for _, item := range rule.ConfigItems { + if _, found, _ := item.IpNet.Get(realIp); !found { continue } return realIp.String(), &rule, &item @@ -196,37 +218,37 @@ func hitRateRuleItem(ctx wrapper.HttpContext, rule LimitRuleItem, log wrapper.Lo return "", nil, nil } -func logDebugAndReturnEmpty(log wrapper.Log, errMsg string, args ...interface{}) (string, *LimitRuleItem, *LimitConfigItem) { +func logDebugAndReturnEmpty(errMsg string, args ...interface{}) (string, *config.LimitRuleItem, *config.LimitConfigItem) { log.Debugf(errMsg, args...) return "", nil, nil } -func findMatchingItem(limitType limitRuleItemType, items []LimitConfigItem, key string) *LimitConfigItem { +func findMatchingItem(limitType config.LimitRuleItemType, items []config.LimitConfigItem, key string) *config.LimitConfigItem { for _, item := range items { // per类型,检查allType和regexpType - if limitType == limitByPerHeaderType || - limitType == limitByPerParamType || - limitType == limitByPerConsumerType || - limitType == limitByPerCookieType { - if item.configType == allType || (item.configType == regexpType && item.regexp.MatchString(key)) { + if limitType == config.LimitByPerHeaderType || + limitType == config.LimitByPerParamType || + limitType == config.LimitByPerConsumerType || + limitType == config.LimitByPerCookieType { + if item.ConfigType == config.AllType || (item.ConfigType == config.RegexpType && item.Regexp.MatchString(key)) { return &item } } // 其他类型,直接比较key - if item.key == key { + if item.Key == key { return &item } } return nil } -func getDownStreamIp(rule LimitRuleItem) (net.IP, error) { +func getDownStreamIp(rule config.LimitRuleItem) (net.IP, error) { var ( realIpStr string err error ) - if rule.limitByPerIp.sourceType == HeaderSourceType { - realIpStr, err = proxywasm.GetHttpRequestHeader(rule.limitByPerIp.headerName) + if rule.LimitByPerIp.SourceType == config.HeaderSourceType { + realIpStr, err = proxywasm.GetHttpRequestHeader(rule.LimitByPerIp.HeaderName) if err == nil { realIpStr = strings.Split(strings.Trim(realIpStr, " "), ",")[0] } @@ -238,7 +260,7 @@ func getDownStreamIp(rule LimitRuleItem) (net.IP, error) { if err != nil { return nil, err } - ip := parseIP(realIpStr) + ip := util.ParseIP(realIpStr) realIP := net.ParseIP(ip) if realIP == nil { return nil, fmt.Errorf("invalid ip[%s]", ip) @@ -246,13 +268,13 @@ func getDownStreamIp(rule LimitRuleItem) (net.IP, error) { return realIP, nil } -func rejected(config ClusterKeyRateLimitConfig, context LimitContext) { +func rejected(config config.ClusterKeyRateLimitConfig, context LimitContext) { headers := make(map[string][]string) headers[RateLimitResetHeader] = []string{strconv.Itoa(context.reset)} - if config.showLimitQuotaHeader { + if config.ShowLimitQuotaHeader { headers[RateLimitLimitHeader] = []string{strconv.Itoa(context.count)} headers[RateLimitRemainingHeader] = []string{strconv.Itoa(0)} } _ = proxywasm.SendHttpResponseWithDetail( - config.rejectedCode, "cluster-key-rate-limit.rejected", reconvertHeaders(headers), []byte(config.rejectedMsg), -1) + config.RejectedCode, "cluster-key-rate-limit.rejected", util.ReconvertHeaders(headers), []byte(config.RejectedMsg), -1) } diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/utils.go b/plugins/wasm-go/extensions/cluster-key-rate-limit/util/utils.go similarity index 67% rename from plugins/wasm-go/extensions/cluster-key-rate-limit/utils.go rename to plugins/wasm-go/extensions/cluster-key-rate-limit/util/utils.go index e1908a26b..2c6ca6313 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/utils.go +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/util/utils.go @@ -1,4 +1,4 @@ -package main +package util import ( "fmt" @@ -8,8 +8,8 @@ import ( "github.com/zmap/go-iptree/iptree" ) -// parseIPNet 解析Ip段配置 -func parseIPNet(key string) (*iptree.IPTree, error) { +// ParseIPNet 解析Ip段配置 +func ParseIPNet(key string) (*iptree.IPTree, error) { tree := iptree.New() err := tree.AddByString(key, 0) if err != nil { @@ -18,8 +18,8 @@ func parseIPNet(key string) (*iptree.IPTree, error) { return tree, nil } -// parseIP 解析IP -func parseIP(source string) string { +// ParseIP 解析IP +func ParseIP(source string) string { if strings.Contains(source, ".") { // parse ipv4 return strings.Split(source, ":")[0] @@ -31,8 +31,8 @@ func parseIP(source string) string { return source } -// reconvertHeaders headers: map[string][]string -> [][2]string -func reconvertHeaders(hs map[string][]string) [][2]string { +// ReconvertHeaders headers: map[string][]string -> [][2]string +func ReconvertHeaders(hs map[string][]string) [][2]string { var ret [][2]string for k, vs := range hs { for _, v := range vs { @@ -45,8 +45,8 @@ func reconvertHeaders(hs map[string][]string) [][2]string { return ret } -// extractCookieValueByKey 从cookie中提取key对应的value -func extractCookieValueByKey(cookie string, key string) (value string) { +// ExtractCookieValueByKey 从cookie中提取key对应的value +func ExtractCookieValueByKey(cookie string, key string) (value string) { pairs := strings.Split(cookie, ";") for _, pair := range pairs { pair = strings.TrimSpace(pair) diff --git a/plugins/wasm-go/extensions/ext-auth/config/config.go b/plugins/wasm-go/extensions/ext-auth/config/config.go index def0955ce..648c94c04 100644 --- a/plugins/wasm-go/extensions/ext-auth/config/config.go +++ b/plugins/wasm-go/extensions/ext-auth/config/config.go @@ -7,6 +7,7 @@ import ( "strings" "ext-auth/expr" + "github.com/alibaba/higress/plugins/wasm-go/pkg/log" "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" "github.com/tidwall/gjson" ) @@ -56,12 +57,12 @@ type AuthorizationResponse struct { AllowedClientHeaders expr.Matcher } -func ParseConfig(json gjson.Result, config *ExtAuthConfig, log wrapper.Log) error { +func ParseConfig(json gjson.Result, config *ExtAuthConfig) error { httpServiceConfig := json.Get("http_service") if !httpServiceConfig.Exists() { return errors.New("missing http_service in config") } - if err := parseHttpServiceConfig(httpServiceConfig, config, log); err != nil { + if err := parseHttpServiceConfig(httpServiceConfig, config); err != nil { return err } @@ -88,10 +89,10 @@ func ParseConfig(json gjson.Result, config *ExtAuthConfig, log wrapper.Log) erro return nil } -func parseHttpServiceConfig(json gjson.Result, config *ExtAuthConfig, log wrapper.Log) error { +func parseHttpServiceConfig(json gjson.Result, config *ExtAuthConfig) error { var httpService HttpService - if err := parseEndpointConfig(json, &httpService, log); err != nil { + if err := parseEndpointConfig(json, &httpService); err != nil { return err } @@ -114,7 +115,7 @@ func parseHttpServiceConfig(json gjson.Result, config *ExtAuthConfig, log wrappe return nil } -func parseEndpointConfig(json gjson.Result, httpService *HttpService, log wrapper.Log) error { +func parseEndpointConfig(json gjson.Result, httpService *HttpService) error { endpointMode := json.Get("endpoint_mode").String() if endpointMode == "" { endpointMode = EndpointModeEnvoy diff --git a/plugins/wasm-go/extensions/ext-auth/config/config_test.go b/plugins/wasm-go/extensions/ext-auth/config/config_test.go index 299035f45..7e4dc1fb8 100644 --- a/plugins/wasm-go/extensions/ext-auth/config/config_test.go +++ b/plugins/wasm-go/extensions/ext-auth/config/config_test.go @@ -403,7 +403,7 @@ func TestParseConfig(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var config ExtAuthConfig result := gjson.Parse(tt.json) - err := ParseConfig(result, &config, &wrapper.DefaultLog{}) + err := ParseConfig(result, &config) if tt.expectedErr != "" { assert.EqualError(t, err, tt.expectedErr) diff --git a/plugins/wasm-go/extensions/ext-auth/go.mod b/plugins/wasm-go/extensions/ext-auth/go.mod index 925a894e5..1f426fae2 100644 --- a/plugins/wasm-go/extensions/ext-auth/go.mod +++ b/plugins/wasm-go/extensions/ext-auth/go.mod @@ -22,6 +22,7 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/resp v0.1.1 // indirect + github.com/tidwall/sjson v1.2.5 // indirect golang.org/x/sys v0.21.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/plugins/wasm-go/extensions/ext-auth/go.sum b/plugins/wasm-go/extensions/ext-auth/go.sum index 7307bf466..066d7a2f2 100644 --- a/plugins/wasm-go/extensions/ext-auth/go.sum +++ b/plugins/wasm-go/extensions/ext-auth/go.sum @@ -14,6 +14,7 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tetratelabs/wazero v1.7.2 h1:1+z5nXJNwMLPAWaTePFi49SSTL0IMx/i3Fg8Yc25GDc= github.com/tetratelabs/wazero v1.7.2/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94= github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -22,6 +23,8 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE= github.com/tidwall/resp v0.1.1/go.mod h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/wasilibs/go-re2 v1.6.0 h1:CLlhDebt38wtl/zz4ww+hkXBMcxjrKFvTDXzFW2VOz8= github.com/wasilibs/go-re2 v1.6.0/go.mod h1:prArCyErsypRBI/jFAFJEbzyHzjABKqkzlidF0SNA04= github.com/wasilibs/nottinygc v0.4.0 h1:h1TJMihMC4neN6Zq+WKpLxgd9xCFMw7O9ETLwY2exJQ= diff --git a/plugins/wasm-go/extensions/ext-auth/main.go b/plugins/wasm-go/extensions/ext-auth/main.go index 7d3ce54b4..53b777a5b 100644 --- a/plugins/wasm-go/extensions/ext-auth/main.go +++ b/plugins/wasm-go/extensions/ext-auth/main.go @@ -20,6 +20,7 @@ import ( "ext-auth/config" "ext-auth/util" + "github.com/alibaba/higress/plugins/wasm-go/pkg/log" "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" "github.com/higress-group/proxy-wasm-go-sdk/proxywasm" @@ -29,9 +30,9 @@ import ( func main() { wrapper.SetCtx( "ext-auth", - wrapper.ParseConfigBy(config.ParseConfig), - wrapper.ProcessRequestHeadersBy(onHttpRequestHeaders), - wrapper.ProcessRequestBodyBy(onHttpRequestBody), + wrapper.ParseConfig(config.ParseConfig), + wrapper.ProcessRequestHeaders(onHttpRequestHeaders), + wrapper.ProcessRequestBody(onHttpRequestBody), ) } @@ -50,7 +51,7 @@ const ( HeaderXForwardedHost = "x-forwarded-host" ) -func onHttpRequestHeaders(ctx wrapper.HttpContext, config config.ExtAuthConfig, log wrapper.Log) types.Action { +func onHttpRequestHeaders(ctx wrapper.HttpContext, config config.ExtAuthConfig) types.Action { // If the request's domain and path match the MatchRules, skip authentication if config.MatchRules.IsAllowedByMode(ctx.Host(), ctx.Method(), wrapper.GetRequestPathWithoutQuery()) { ctx.DontReadRequestBody() @@ -70,17 +71,17 @@ func onHttpRequestHeaders(ctx wrapper.HttpContext, config config.ExtAuthConfig, } ctx.DontReadRequestBody() - return checkExtAuth(ctx, config, nil, log, types.HeaderStopAllIterationAndWatermark) + return checkExtAuth(ctx, config, nil, types.HeaderStopAllIterationAndWatermark) } -func onHttpRequestBody(ctx wrapper.HttpContext, config config.ExtAuthConfig, body []byte, log wrapper.Log) types.Action { +func onHttpRequestBody(ctx wrapper.HttpContext, config config.ExtAuthConfig, body []byte) types.Action { if config.HttpService.AuthorizationRequest.WithRequestBody { - return checkExtAuth(ctx, config, body, log, types.DataStopIterationAndBuffer) + return checkExtAuth(ctx, config, body, types.DataStopIterationAndBuffer) } return types.ActionContinue } -func checkExtAuth(ctx wrapper.HttpContext, cfg config.ExtAuthConfig, body []byte, log wrapper.Log, pauseAction types.Action) types.Action { +func checkExtAuth(ctx wrapper.HttpContext, cfg config.ExtAuthConfig, body []byte, pauseAction types.Action) types.Action { httpServiceConfig := cfg.HttpService extAuthReqHeaders := buildExtAuthRequestHeaders(ctx, cfg)