From 634de3f7f8b0935c1262d9787f57645b21f9639b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E8=B4=A4=E6=B6=9B?= <601803023@qq.com> Date: Mon, 17 Jun 2024 10:37:03 +0800 Subject: [PATCH] feat: cluster key rate limit enhancement (#1036) --- .../cluster-key-rate-limit/README.md | 231 +++++++++----- .../cluster-key-rate-limit/config.go | 293 +++++++++++------- .../extensions/cluster-key-rate-limit/go.mod | 2 + .../extensions/cluster-key-rate-limit/go.sum | 4 + .../extensions/cluster-key-rate-limit/main.go | 114 +++++-- .../cluster-key-rate-limit/utils.go | 14 + 6 files changed, 430 insertions(+), 228 deletions(-) diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/README.md b/plugins/wasm-go/extensions/cluster-key-rate-limit/README.md index a47ce83da..448585a94 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/README.md +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/README.md @@ -1,38 +1,50 @@ # 功能说明 -`key-cluster-rate-limit`插件实现了基于特定键值实现集群限流,键值来源可以是 URL 参数、HTTP 请求头、客户端 IP 地址 +`key-cluster-rate-limit`插件实现了基于特定键值实现集群限流,键值来源可以是 URL 参数、HTTP 请求头、客户端 IP 地址、consumer 名称、cookie中 key 名称 # 配置说明 -| 配置项 | 类型 | 必填 | 默认值 | 说明 | -| ----------------------- | ------ | ---- | ------ | ---- | -| rule_name | string | 是 | - | 限流规则名称,根据限流规则名称和限流的客户端IP段来拼装redis key | -| limit_by_header | string | 否,`limit_by_header`,`limit_by_param`,`limit_by_per_ip` 中选填一项 | - | 配置获取限流键值的来源 http 请求头名称 | -| limit_by_param | string | 否,`limit_by_header`,`limit_by_param`,`limit_by_per_ip` 中选填一项 | - | 配置获取限流键值的来源 URL 参数名称 | -| limit_by_per_ip | string | 否,`limit_by_header`,`limit_by_param`,`limit_by_per_ip` 中选填一项 | - | 配置获取限流键值的来源 IP 参数名称,从请求头获取,以`from-header-对应的header名`,示例:`from-header-x-forwarded-for`,直接获取对端socket ip,配置为`from-remote-addr` | -| limit_keys | array of object | 是 | - | 配置匹配键值后的限流次数 | +| 配置项 | 类型 | 必填 | 默认值 | 说明 | +| ----------------------- | ------ | ---- | ------ |---------------------------------------------------------------------------| +| rule_name | string | 是 | - | 限流规则名称,根据限流规则名称+限流类型+限流key名称+限流key对应的实际值来拼装redis key | +| rule_items | array of object | 是 | - | 限流规则项,按照rule_items下的排列顺序,匹配第一个rule_item后命中限流规则,后续规则将被忽略 | | show_limit_quota_header | bool | 否 | false | 响应头中是否显示`X-RateLimit-Limit`(限制的总请求数)和`X-RateLimit-Remaining`(剩余还可以发送的请求数) | -| rejected_code | int | 否 | 429 | 请求被限流时,返回的HTTP状态码 | -| rejected_msg | string | 否 | Too many requests | 请求被限流时,返回的响应体 | -| redis | object | 是 | - | redis相关配置 | +| rejected_code | int | 否 | 429 | 请求被限流时,返回的HTTP状态码 | +| rejected_msg | string | 否 | Too many requests | 请求被限流时,返回的响应体 | +| redis | object | 是 | - | redis相关配置 | + +`rule_items`中每一项的配置字段说明 + +| 配置项 | 类型 | 必填 | 默认值 | 说明 | +| --------------------- | --------------- | -------------------------- | ------ | ------------------------------------------------------------ | +| limit_by_header | string | 否,`limit_by_*`中选填一项 | - | 配置获取限流键值的来源 HTTP 请求头名称 | +| limit_by_param | string | 否,`limit_by_*`中选填一项 | - | 配置获取限流键值的来源 URL 参数名称 | +| limit_by_consumer | string | 否,`limit_by_*`中选填一项 | - | 根据 consumer 名称进行限流,无需添加实际值 | +| limit_by_cookie | string | 否,`limit_by_*`中选填一项 | - | 配置获取限流键值的来源 Cookie中 key 名称 | +| limit_by_per_header | string | 否,`limit_by_*`中选填一项 | - | 按规则匹配特定 HTTP 请求头,并对每个请求头分别计算限流,配置获取限流键值的来源 HTTP 请求头名称,配置`limit_keys`时支持正则表达式或`*` | +| limit_by_per_param | string | 否,`limit_by_*`中选填一项 | - | 按规则匹配特定 URL 参数,并对每个参数分别计算限流,配置获取限流键值的来源 URL 参数名称,配置`limit_keys`时支持正则表达式或`*` | +| limit_by_per_consumer | string | 否,`limit_by_*`中选填一项 | - | 按规则匹配特定 consumer,并对每个 consumer 分别计算限流,根据 consumer 名称进行限流,无需添加实际值,配置`limit_keys`时支持正则表达式或`*` | +| limit_by_per_cookie | string | 否,`limit_by_*`中选填一项 | - | 按规则匹配特定 Cookie,并对每个 Cookie 分别计算限流,配置获取限流键值的来源 Cookie中 key 名称,配置`limit_keys`时支持正则表达式或`*` | +| limit_by_per_ip | string | 否,`limit_by_*`中选填一项 | - | 按规则匹配特定 IP,并对每个 IP 分别计算限流,配置获取限流键值的来源 IP 参数名称,从请求头获取,以`from-header-对应的header名`,示例:`from-header-x-forwarded-for`,直接获取对端socket ip,配置为`from-remote-addr` | +| limit_keys | array of object | 是 | - | 配置匹配键值后的限流次数 | `limit_keys`中每一项的配置字段说明 -| 配置项 | 类型 | 必填 | 默认值 | 说明 | -| ---------------- | ------ | ------------------------------------------------------------ | ------ | ------------------ | -| key | string | 是 | - | 匹配的键值 | -| query_per_second | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每秒请求次数 | -| query_per_minute | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每分钟请求次数 | -| query_per_hour | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每小时请求次数 | -| query_per_day | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每天请求次数 | +| 配置项 | 类型 | 必填 | 默认值 | 说明 | +| ---------------- | ------ | ------------------------------------------------------------ | ------ | ------------------------------------------------------------ | +| key | string | 是 | - | 匹配的键值,`limit_by_per_header`,`limit_by_per_param`,`limit_by_per_consumer`,`limit_by_per_cookie` 类型支持配置正则表达式(以regexp:开头后面跟正则表达式)或者*(代表所有),正则表达式示例:`regexp:^d.*`(以d开头的所有字符串);`limit_by_per_ip`支持配置 IP 地址或 IP 段 | +| query_per_second | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每秒请求次数 | +| query_per_minute | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每分钟请求次数 | +| query_per_hour | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每小时请求次数 | +| query_per_day | int | 否,`query_per_second`,`query_per_minute`,`query_per_hour`,`query_per_day` 中选填一项 | - | 允许每天请求次数 | `redis`中每一项的配置字段说明 | 配置项 | 类型 | 必填 | 默认值 | 说明 | | ------------ | ------ | ---- | ---------------------------------------------------------- | --------------------------- | -| service_name | string | 必填 | - | 输入redis服务的注册名称 | +| service_name | string | 必填 | - | redis 服务名称,带服务类型的完整 FQDN 名称,例如 my-redis.dns、redis.my-ns.svc.cluster.local | | service_port | int | 否 | 服务类型为固定地址(static service)默认值为80,其他为6379 | 输入redis服务的服务端口 | | username | string | 否 | - | redis用户名 | | password | string | 否 | - | redis密码 | @@ -43,91 +55,140 @@ # 配置示例 ## 识别请求参数 apikey,进行区别限流 + ```yaml -rule_name: limit_by_param_apikey -limit_by_param: apikey -limit_keys: -- key: 9a342114-ba8a-11ec-b1bf-00163e1250b5 - query_per_second: 10 -- key: a6a6d7f2-ba8a-11ec-bec2-00163e1250b5 - query_per_minute: 100 +rule_name: default_rule +rule_items: + - limit_by_param: apikey + limit_keys: + - key: 9a342114-ba8a-11ec-b1bf-00163e1250b5 + query_per_minute: 10 + - key: a6a6d7f2-ba8a-11ec-bec2-00163e1250b5 + query_per_hour: 100 + - limit_by_per_param: apikey + limit_keys: + # 正则表达式,匹配以a开头的所有字符串,每个apikey对应的请求10qds + - key: "regexp:^a.*" + query_per_second: 10 + # 正则表达式,匹配以b开头的所有字符串,每个apikey对应的请求100qd + - key: "regexp:^b.*" + query_per_minute: 100 + # 兜底用,匹配所有请求,每个apikey对应的请求1000qdh + - key: "*" + query_per_hour: 1000 redis: service_name: redis.static show_limit_quota_header: true ``` + + ## 识别请求头 x-ca-key,进行区别限流 + ```yaml -rule_name: limit_by_param_x-ca-key -limit_by_header: x-ca-key -limit_keys: -- key: 102234 - query_per_second: 10 -- key: 308239 - query_per_hour: 10 +rule_name: default_rule +rule_items: + - limit_by_header: x-ca-key + limit_keys: + - key: 102234 + query_per_minute: 10 + - key: 308239 + query_per_hour: 10 + - limit_by_per_header: x-ca-key + limit_keys: + # 正则表达式,匹配以a开头的所有字符串,每个apikey对应的请求10qds + - key: "regexp:^a.*" + query_per_second: 10 + # 正则表达式,匹配以b开头的所有字符串,每个apikey对应的请求100qd + - key: "regexp:^b.*" + query_per_minute: 100 + # 兜底用,匹配所有请求,每个apikey对应的请求1000qdh + - key: "*" + query_per_hour: 1000 redis: service_name: redis.static -show_limit_quota_header: true +show_limit_quota_header: true ``` + + ## 根据请求头 x-forwarded-for 获取对端IP,进行区别限流 ```yaml -rule_name: limit_by_per_ip_from-header-x-forwarded-for -limit_by_per_ip: from-header-x-forwarded-for -limit_keys: - # 精确ip -- key: 1.1.1.1 - query_per_day: 10 - # ip段,符合这个ip段的ip,每个ip 100qps -- key: 1.1.1.0/24 - query_per_day: 100 - # 兜底用,即默认每个ip 1000qps -- key: 0.0.0.0/0 - query_per_day: 1000 +rule_name: default_rule +rule_items: + - limit_by_per_ip: from-header-x-forwarded-for + limit_keys: + # 精确ip + - key: 1.1.1.1 + query_per_day: 10 + # ip段,符合这个ip段的ip,每个ip 100qpd + - key: 1.1.1.0/24 + query_per_day: 100 + # 兜底用,即默认每个ip 1000qpd + - key: 0.0.0.0/0 + query_per_day: 1000 redis: service_name: redis.static -show_limit_quota_header: true +show_limit_quota_header: true ``` -## 对特定路由或域名开启 + + +## 识别consumer,进行区别限流 ```yaml -# 使用_rules_字段进行细粒度规则配置 -_rules_: -# 规则一:按路由名称匹配生效 -- _match_route_: - - route-a - - route-b - rule_name: limit_rule1 - limit_by_per_ip: from-header-x-forwarded-for - limit_keys: - # 精确ip - - key: 1.1.1.1 - query_per_day: 10 - # ip段,符合这个ip段的ip,每个ip 100qps - - key: 1.1.1.0/24 - query_per_day: 100 - # 兜底用,即默认每个ip 1000qps - - key: 0.0.0.0/0 - query_per_day: 1000 - redis: - service_name: redis.static -# 规则二:按域名匹配生效 -- _match_domain_: - - "*.example.com" - - test.com - rule_name: limit_rule2 - limit_by_param: apikey - limit_keys: - - key: 9a342114-ba8a-11ec-b1bf-00163e1250b5 - query_per_second: 10 - - key: a6a6d7f2-ba8a-11ec-bec2-00163e1250b5 - query_per_minute: 100 - redis: - service_name: redis.static - show_limit_quota_header: true +rule_name: default_rule +rule_items: + - limit_by_consumer: '' + limit_keys: + - key: consumer1 + query_per_second: 10 + - key: consumer2 + query_per_hour: 100 + - limit_by_per_consumer: '' + limit_keys: + # 正则表达式,匹配以a开头的所有字符串,每个consumer对应的请求10qds + - key: "regexp:^a.*" + query_per_second: 10 + # 正则表达式,匹配以b开头的所有字符串,每个consumer对应的请求100qd + - key: "regexp:^b.*" + query_per_minute: 100 + # 兜底用,匹配所有请求,每个consumer对应的请求1000qdh + - key: "*" + query_per_hour: 1000 +redis: + service_name: redis.static +show_limit_quota_header: true +``` + + + +## 识别cookie中的键值对,进行区别限流 + +```yaml +rule_name: default_rule +rule_items: + - limit_by_cookie: key1 + limit_keys: + - key: value1 + query_per_minute: 10 + - key: value2 + query_per_hour: 100 + - limit_by_per_cookie: key1 + limit_keys: + # 正则表达式,匹配以a开头的所有字符串,每个cookie中的value对应的请求10qds + - key: "regexp:^a.*" + query_per_second: 10 + # 正则表达式,匹配以b开头的所有字符串,每个cookie中的value对应的请求100qd + - key: "regexp:^b.*" + query_per_minute: 100 + # 兜底用,匹配所有请求,每个cookie中的value对应的请求1000qdh + - key: "*" + query_per_hour: 1000 +rejected_code: 200 +rejected_msg: '{"code":-1,"msg":"Too many requests"}' +redis: + service_name: redis.static +show_limit_quota_header: true ``` -此例 `_match_route_` 中指定的 `route-a` 和 `route-b` 即在创建网关路由时填写的路由名称,当匹配到这两个路由时,将使用此段配置; -此例 `_match_domain_` 中指定的 `*.example.com` 和 `test.com` 用于匹配请求的域名,当发现域名匹配时,将使用此段配置; -配置的匹配生效顺序,将按照 `_rules_` 下规则的排列顺序,匹配第一个规则后生效对应配置,后续规则将被忽略 diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/config.go b/plugins/wasm-go/extensions/cluster-key-rate-limit/config.go index 2f7a5c397..ae7437015 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/config.go +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/config.go @@ -2,19 +2,35 @@ package main import ( "errors" + "fmt" "github.com/alibaba/higress/plugins/wasm-go/pkg/wrapper" "github.com/tidwall/gjson" + re "github.com/wasilibs/go-re2" "github.com/zmap/go-iptree/iptree" "strings" ) -// 限流规则类型 -type limitRuleType string +// 限流规则项类型 +type limitRuleItemType string + +// 限流配置项key类型 +type limitConfigItemType string const ( - limitByHeaderType limitRuleType = "limitByHeader" - limitByParamType limitRuleType = "limitByParam" - limitByPerIpType limitRuleType = "limitByPerIp" + limitByHeaderType limitRuleItemType = "limit_by_header" + limitByParamType limitRuleItemType = "limit_by_param" + limitByConsumerType limitRuleItemType = "limit_by_consumer" + limitByCookieType limitRuleItemType = "limit_by_cookie" + limitByPerHeaderType limitRuleItemType = "limit_by_per_header" + limitByPerParamType limitRuleItemType = "limit_by_per_param" + limitByPerConsumerType limitRuleItemType = "limit_by_per_consumer" + limitByPerCookieType limitRuleItemType = "limit_by_per_cookie" + limitByPerIpType limitRuleItemType = "limit_by_per_ip" + + exactType limitConfigItemType = "exact" // 精确匹配 + regexpType limitConfigItemType = "regexp" // 正则表达式 + allType limitConfigItemType = "*" // 匹配所有情况 + ipNetType limitConfigItemType = "ipNet" // ip段 RemoteAddrSourceType = "remote-addr" HeaderSourceType = "header" @@ -28,29 +44,41 @@ const ( SecondsPerDay = 24 * SecondsPerHour ) +var timeWindows = map[string]int64{ + "query_per_second": Second, + "query_per_minute": SecondsPerMinute, + "query_per_hour": SecondsPerHour, + "query_per_day": SecondsPerDay, +} + type ClusterKeyRateLimitConfig struct { - ruleName string // 限流规则名称 - limitType limitRuleType // 限流类型 - limitByHeader string // 根据http请求头限流 - limitByParam string // 根据url参数限流 - limitByPerIp LimitByPerIp // 根据对端ip限流 - limitItems []LimitItem // 限流配置 key为限流的ip地址或者ip段 - showLimitQuotaHeader bool // 响应头中是否显示X-RateLimit-Limit和X-RateLimit-Remaining - rejectedCode uint32 // 当请求超过阈值被拒绝时,返回的HTTP状态码 - rejectedMsg string // 当请求超过阈值被拒绝时,返回的响应体 + ruleName string // 限流规则名称 + ruleItems []LimitRuleItem // 限流规则项 + showLimitQuotaHeader bool // 响应头中是否显示X-RateLimit-Limit和X-RateLimit-Remaining + rejectedCode uint32 // 当请求超过阈值被拒绝时,返回的HTTP状态码 + rejectedMsg string // 当请求超过阈值被拒绝时,返回的响应体 redisClient wrapper.RedisClient } +type LimitRuleItem struct { + limitType limitRuleItemType // 限流类型 + key string // 根据该key值进行限流,limit_by_consumer和limit_by_per_consumer两种类型为ConsumerHeader,其他类型为对应的key值 + limitByPerIp LimitByPerIp // 对端ip地址或ip段 + configItems []LimitConfigItem // 限流配置项 +} + type LimitByPerIp struct { sourceType string // ip来源类型 headerName string // 根据该请求头获取客户端ip } -type LimitItem struct { - key string // 限流key - ipNet *iptree.IPTree // 限流key转换的ip地址或者ip段 - count int64 // 指定时间窗口内的总请求数量阈值 - timeWindow int64 // 时间窗口大小 +type LimitConfigItem struct { + configType limitConfigItemType // 限流配置项key类型 + key string // 限流key + ipNet *iptree.IPTree // 限流key转换的ip地址或者ip段,仅用于itemType为ipNetType + regexp *re.Regexp // 正则表达式,仅用于itemType为regexpType + count int64 // 指定时间窗口内的总请求数量阈值 + timeWindow int64 // 时间窗口大小 } func initRedisClusterClient(json gjson.Result, config *ClusterKeyRateLimitConfig) error { @@ -84,56 +112,15 @@ func initRedisClusterClient(json gjson.Result, config *ClusterKeyRateLimitConfig return config.redisClient.Init(username, password, int64(timeout)) } -func parseClusterKeyRateLimitConfig(json gjson.Result, config *ClusterKeyRateLimitConfig, log wrapper.Log) error { +func parseClusterKeyRateLimitConfig(json gjson.Result, config *ClusterKeyRateLimitConfig) error { ruleName := json.Get("rule_name") if !ruleName.Exists() { return errors.New("missing rule_name in config") } config.ruleName = ruleName.String() - // 根据配置区分限流类型 - var limitType limitRuleType - limitByHeader := json.Get("limit_by_header") - if limitByHeader.Exists() && limitByHeader.String() != "" { - config.limitByHeader = limitByHeader.String() - limitType = limitByHeaderType - } - - limitByParam := json.Get("limit_by_param") - if limitByParam.Exists() && limitByParam.String() != "" { - config.limitByParam = limitByParam.String() - limitType = limitByParamType - } - - limitByPerIpResult := json.Get("limit_by_per_ip") - if limitByPerIpResult.Exists() && limitByPerIpResult.String() != "" { - limitByPerIp := limitByPerIpResult.String() - if strings.HasPrefix(limitByPerIp, "from-header-") { - headerName := limitByPerIp[len("from-header-"):] - if headerName == "" { - return errors.New("limit_by_per_ip parse error: empty after 'from-header-'") - } - config.limitByPerIp = LimitByPerIp{ - sourceType: HeaderSourceType, - headerName: headerName, - } - } else if limitByPerIp == "from-remote-addr" { - config.limitByPerIp = LimitByPerIp{ - sourceType: RemoteAddrSourceType, - headerName: "", - } - } else { - return errors.New("the 'limit_by_per_ip' restriction must start with 'from-header-' or be exactly 'from-remote-addr'") - } - limitType = limitByPerIpType - } - if limitType == "" { - return errors.New("only one of 'limit_by_header' and 'limit_by_param' and 'limit_by_per_ip' can be set") - } - config.limitType = limitType - - // 初始化LimitItem - err := initLimitItems(json, config, log) + // 初始化ruleItems + err := initRuleItems(json, config) if err != nil { return err } @@ -158,7 +145,86 @@ func parseClusterKeyRateLimitConfig(json gjson.Result, config *ClusterKeyRateLim return nil } -func initLimitItems(json gjson.Result, config *ClusterKeyRateLimitConfig, log wrapper.Log) error { +func initRuleItems(json gjson.Result, config *ClusterKeyRateLimitConfig) error { + ruleItemsResult := json.Get("rule_items") + if !ruleItemsResult.Exists() { + return errors.New("missing rule_items in config") + } + if len(ruleItemsResult.Array()) == 0 { + return errors.New("config rule_items cannot be empty") + } + var ruleItems []LimitRuleItem + for _, item := range ruleItemsResult.Array() { + var ruleItem LimitRuleItem + + // 根据配置区分限流类型 + var limitType limitRuleItemType + setLimitByKeyIfExists := func(field gjson.Result, limitTypeStr limitRuleItemType) { + if field.Exists() && field.String() != "" { + ruleItem.key = field.String() + limitType = limitTypeStr + } + } + setLimitByKeyIfExists(item.Get("limit_by_header"), limitByHeaderType) + setLimitByKeyIfExists(item.Get("limit_by_param"), limitByParamType) + setLimitByKeyIfExists(item.Get("limit_by_cookie"), limitByCookieType) + setLimitByKeyIfExists(item.Get("limit_by_per_header"), limitByPerHeaderType) + setLimitByKeyIfExists(item.Get("limit_by_per_param"), limitByPerParamType) + setLimitByKeyIfExists(item.Get("limit_by_per_cookie"), limitByPerCookieType) + + limitByConsumer := item.Get("limit_by_consumer") + if limitByConsumer.Exists() { + ruleItem.key = ConsumerHeader + limitType = limitByConsumerType + } + limitByPerConsumer := item.Get("limit_by_per_consumer") + if limitByPerConsumer.Exists() { + ruleItem.key = ConsumerHeader + limitType = limitByPerConsumerType + } + + limitByPerIpResult := item.Get("limit_by_per_ip") + if limitByPerIpResult.Exists() && limitByPerIpResult.String() != "" { + limitByPerIp := limitByPerIpResult.String() + ruleItem.key = limitByPerIp + if strings.HasPrefix(limitByPerIp, "from-header-") { + headerName := limitByPerIp[len("from-header-"):] + if headerName == "" { + return errors.New("limit_by_per_ip parse error: empty after 'from-header-'") + } + ruleItem.limitByPerIp = LimitByPerIp{ + sourceType: HeaderSourceType, + headerName: headerName, + } + } else if limitByPerIp == "from-remote-addr" { + ruleItem.limitByPerIp = LimitByPerIp{ + sourceType: RemoteAddrSourceType, + headerName: "", + } + } else { + return errors.New("the 'limit_by_per_ip' restriction must start with 'from-header-' or be exactly 'from-remote-addr'") + } + limitType = limitByPerIpType + } + + if limitType == "" { + return errors.New("only one of 'limit_by_header' and 'limit_by_param' and 'limit_by_consumer' and 'limit_by_cookie' and 'limit_by_per_header' and 'limit_by_per_param' and 'limit_by_per_consumer' and 'limit_by_per_cookie' and 'limit_by_per_ip' can be set") + } + ruleItem.limitType = limitType + + // 初始化configItems + err := initConfigItems(item, &ruleItem) + if err != nil { + return err + } + + ruleItems = append(ruleItems, ruleItem) + } + config.ruleItems = ruleItems + return nil +} + +func initConfigItems(json gjson.Result, rule *LimitRuleItem) error { limitKeys := json.Get("limit_keys") if !limitKeys.Exists() { return errors.New("missing limit_keys in config") @@ -166,65 +232,70 @@ func initLimitItems(json gjson.Result, config *ClusterKeyRateLimitConfig, log wr if len(limitKeys.Array()) == 0 { return errors.New("config limit_keys cannot be empty") } - var limitItems []LimitItem + var configItems []LimitConfigItem for _, item := range limitKeys.Array() { key := item.Get("key") if !key.Exists() || key.String() == "" { return errors.New("limit_keys key is required") } - var ipNet *iptree.IPTree - if config.limitType == limitByPerIpType { + + var ( + itemKey = key.String() + itemType limitConfigItemType + ipNet *iptree.IPTree + regexp *re.Regexp + ) + if rule.limitType == limitByPerIpType { var err error - ipNet, err = parseIPNet(key.String()) + ipNet, err = parseIPNet(itemKey) if err != nil { - log.Errorf("parseIPNet error: %v", err) - return err + return fmt.Errorf("failed to parse IPNet for key '%s': %w", itemKey, err) + } + itemType = ipNetType + } else if rule.limitType == limitByPerHeaderType || + rule.limitType == limitByPerParamType || + rule.limitType == limitByPerConsumerType || + rule.limitType == limitByPerCookieType { + if itemKey == "*" { + itemType = allType + } else if strings.HasPrefix(itemKey, "regexp:") { + regexpStr := itemKey[len("regexp:"):] + var err error + regexp, err = re.Compile(regexpStr) + if err != nil { + return fmt.Errorf("failed to compile regex for key '%s': %w", itemKey, err) + } + itemType = regexpType + } else { + return fmt.Errorf("the '%s' restriction must start with 'regexp:' or be exactly '*'", rule.limitType) } } else { - ipNet = nil + itemType = exactType } - qps := item.Get("query_per_second") - if qps.Exists() && qps.Int() > 0 { - limitItems = append(limitItems, LimitItem{ - key: key.String(), - ipNet: ipNet, - count: qps.Int(), - timeWindow: Second, - }) - continue - } - qpm := item.Get("query_per_minute") - if qpm.Exists() && qpm.Int() > 0 { - limitItems = append(limitItems, LimitItem{ - key: key.String(), - ipNet: ipNet, - count: qpm.Int(), - timeWindow: SecondsPerMinute, - }) - continue - } - qph := item.Get("query_per_hour") - if qph.Exists() && qph.Int() > 0 { - limitItems = append(limitItems, LimitItem{ - key: key.String(), - ipNet: ipNet, - count: qph.Int(), - timeWindow: SecondsPerHour, - }) - continue - } - qpd := item.Get("query_per_day") - if qpd.Exists() && qpd.Int() > 0 { - limitItems = append(limitItems, LimitItem{ - key: key.String(), - ipNet: ipNet, - count: qpd.Int(), - timeWindow: SecondsPerDay, - }) - continue + if configItem, err := createConfigItemFromRate(item, itemType, itemKey, ipNet, regexp); err != nil { + return err + } else if configItem != nil { + configItems = append(configItems, *configItem) } } - config.limitItems = limitItems + rule.configItems = configItems return nil } + +func createConfigItemFromRate(item gjson.Result, itemType limitConfigItemType, key string, ipNet *iptree.IPTree, regexp *re.Regexp) (*LimitConfigItem, error) { + for timeWindowKey, duration := range timeWindows { + q := item.Get(timeWindowKey) + if q.Exists() && q.Int() > 0 { + return &LimitConfigItem{ + configType: itemType, + key: key, + ipNet: ipNet, + regexp: regexp, + count: q.Int(), + timeWindow: duration, + }, nil + } + } + return nil, errors.New("one of 'query_per_second', 'query_per_minute', 'query_per_hour', or 'query_per_day' must be set for key: " + key) +} diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/go.mod b/plugins/wasm-go/extensions/cluster-key-rate-limit/go.mod index 2ce5cc8f7..e5b150336 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/go.mod +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/go.mod @@ -9,6 +9,7 @@ require ( github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240327114451-d6b7174a84fc github.com/tidwall/gjson v1.14.3 github.com/tidwall/resp v0.1.1 + github.com/wasilibs/go-re2 v1.5.3 github.com/zmap/go-iptree v0.0.0-20210731043055-d4e632617837 ) @@ -17,6 +18,7 @@ require ( github.com/google/uuid v1.3.0 // indirect github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 // indirect github.com/magefile/mage v1.14.0 // indirect + github.com/tetratelabs/wazero v1.7.1 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect ) diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/go.sum b/plugins/wasm-go/extensions/cluster-key-rate-limit/go.sum index 7b2447a01..1757095bd 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/go.sum +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/go.sum @@ -11,6 +11,8 @@ github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo= github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/tetratelabs/wazero v1.7.1 h1:QtSfd6KLc41DIMpDYlJdoMc6k7QTN246DM2+n2Y/Dx8= +github.com/tetratelabs/wazero v1.7.1/go.mod h1:ytl6Zuh20R/eROuyDaGPkp82O9C/DJfXAwJfQ3X6/7Y= github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw= github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -19,6 +21,8 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/resp v0.1.1 h1:Ly20wkhqKTmDUPlyM1S7pWo5kk0tDu8OoC/vFArXmwE= github.com/tidwall/resp v0.1.1/go.mod h1:3/FrruOBAxPTPtundW0VXgmsQ4ZBA0Aw714lVYgwFa0= +github.com/wasilibs/go-re2 v1.5.3 h1:wiuTcgDZdLhu8NG8oqF5sF5Q3yIU14lPAvXqeYzDK3g= +github.com/wasilibs/go-re2 v1.5.3/go.mod h1:PzpVPsBdFC7vM8QJbbEnOeTmwA0DGE783d/Gex8eCV8= github.com/zmap/go-iptree v0.0.0-20210731043055-d4e632617837 h1:DjHnADS2r2zynZ3WkCFAQ+PNYngMSNceRROi0pO6c3M= github.com/zmap/go-iptree v0.0.0-20210731043055-d4e632617837/go.mod h1:9vp0bxqozzQwcjBwenEXfKVq8+mYbwHkQ1NF9Ap0DMw= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/main.go b/plugins/wasm-go/extensions/cluster-key-rate-limit/main.go index 4c9cb3805..ab29c3f31 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/main.go +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/main.go @@ -37,7 +37,7 @@ func main() { } const ( - ClusterRateLimitFormat string = "higress-cluster-key-rate-limit:%s:%s" + ClusterRateLimitFormat string = "higress-cluster-key-rate-limit:%s:%s:%s:%s" // redis key为前缀:限流规则名称:限流类型:限流key名称:限流key对应的实际值 FixedWindowScript string = ` local ttl = redis.call('ttl', KEYS[1]) if ttl < 0 then @@ -46,11 +46,12 @@ const ( end return {ARGV[1], redis.call('incrby', KEYS[1], -1), ttl} ` -) -const ( LimitContextKey string = "LimitContext" // 限流上下文信息 + ConsumerHeader string = "x-mse-consumer" // LimitByConsumer从该request header获取consumer的名字 + CookieHeader string = "cookie" + RateLimitLimitHeader string = "X-RateLimit-Limit" // 限制的总请求数 RateLimitRemainingHeader string = "X-RateLimit-Remaining" // 剩余还可以发送的请求数 RateLimitResetHeader string = "X-RateLimit-Reset" // 限流重置时间(触发限流时返回) @@ -67,7 +68,7 @@ func parseConfig(json gjson.Result, config *ClusterKeyRateLimitConfig, log wrapp if err != nil { return err } - err = parseClusterKeyRateLimitConfig(json, config, log) + err = parseClusterKeyRateLimitConfig(json, config) if err != nil { return err } @@ -76,15 +77,15 @@ func parseConfig(json gjson.Result, config *ClusterKeyRateLimitConfig, log wrapp func onHttpRequestHeaders(ctx wrapper.HttpContext, config ClusterKeyRateLimitConfig, log wrapper.Log) types.Action { // 判断是否命中限流规则 - key, limitItem := hitRateLimitRule(ctx, config, log) - if limitItem == nil { + val, ruleItem, configItem := checkRequestAgainstLimitRule(ctx, config.ruleItems, log) + if ruleItem == nil || configItem == nil { return types.ActionContinue } // 构建redis限流key和参数 - limitKey := fmt.Sprintf(ClusterRateLimitFormat, config.ruleName, key) + limitKey := fmt.Sprintf(ClusterRateLimitFormat, config.ruleName, ruleItem.limitType, ruleItem.key, val) keys := []interface{}{limitKey} - args := []interface{}{limitItem.count, limitItem.timeWindow} + args := []interface{}{configItem.count, configItem.timeWindow} // 执行限流逻辑 err := config.redisClient.Eval(FixedWindowScript, 1, keys, args, func(response resp.Value) { defer func() { @@ -126,43 +127,92 @@ func onHttpResponseHeaders(ctx wrapper.HttpContext, config ClusterKeyRateLimitCo return types.ActionContinue } -func hitRateLimitRule(ctx wrapper.HttpContext, config ClusterKeyRateLimitConfig, log wrapper.Log) (string, *LimitItem) { - switch config.limitType { - case limitByHeaderType: - headerVal, err := proxywasm.GetHttpRequestHeader(config.limitByHeader) +func checkRequestAgainstLimitRule(ctx wrapper.HttpContext, ruleItems []LimitRuleItem, log wrapper.Log) (string, *LimitRuleItem, *LimitConfigItem) { + for _, rule := range ruleItems { + val, ruleItem, configItem := hitRateRuleItem(ctx, rule, log) + if ruleItem != nil && configItem != nil { + return val, ruleItem, configItem + } + } + return "", nil, nil +} + +func hitRateRuleItem(ctx wrapper.HttpContext, rule LimitRuleItem, log wrapper.Log) (string, *LimitRuleItem, *LimitConfigItem) { + switch rule.limitType { + // 根据HTTP请求头限流 + case limitByHeaderType, limitByPerHeaderType: + val, err := proxywasm.GetHttpRequestHeader(rule.key) if err != nil { - log.Debugf("failed to get request header %s: %v", config.limitByHeader, err) - return "", nil + return logDebugAndReturnEmpty(log, "failed to get request header %s: %v", rule.key, err) } - return headerVal, findMatchingItem(config.limitItems, headerVal) - case limitByParamType: - parse, _ := url.Parse(ctx.Path()) - query, _ := url.ParseQuery(parse.RawQuery) - val, ok := query[config.limitByParam] + return val, &rule, findMatchingItem(rule.limitType, rule.configItems, val) + // 根据HTTP请求参数限流 + case limitByParamType, limitByPerParamType: + parse, err := url.Parse(ctx.Path()) + if err != nil { + return logDebugAndReturnEmpty(log, "failed to parse request path: %v", err) + } + query, err := url.ParseQuery(parse.RawQuery) + if err != nil { + return logDebugAndReturnEmpty(log, "failed to parse query params: %v", err) + } + val, ok := query[rule.key] if !ok { - log.Debugf("request param %s is empty", config.limitByParam) - return "", nil - } else { - return val[0], findMatchingItem(config.limitItems, val[0]) + return logDebugAndReturnEmpty(log, "request param %s is empty", rule.key) } + return val[0], &rule, findMatchingItem(rule.limitType, rule.configItems, val[0]) + // 根据consumer限流 + case limitByConsumerType, limitByPerConsumerType: + val, err := proxywasm.GetHttpRequestHeader(ConsumerHeader) + if err != nil { + return logDebugAndReturnEmpty(log, "failed to get request header %s: %v", ConsumerHeader, err) + } + return val, &rule, findMatchingItem(rule.limitType, rule.configItems, val) + // 根据cookie中key值限流 + case limitByCookieType, limitByPerCookieType: + cookie, err := proxywasm.GetHttpRequestHeader(CookieHeader) + if err != nil { + return logDebugAndReturnEmpty(log, "failed to get request cookie : %v", err) + } + val := extractCookieValueByKey(cookie, rule.key) + if val == "" { + return logDebugAndReturnEmpty(log, "cookie key '%s' extracted from cookie '%s' is empty.", rule.key, cookie) + } + return val, &rule, findMatchingItem(rule.limitType, rule.configItems, val) + // 根据客户端IP限流 case limitByPerIpType: - realIp, err := getDownStreamIp(config) + realIp, err := getDownStreamIp(rule) if err != nil { log.Warnf("failed to get down stream ip: %v", err) - return "", nil + return "", &rule, nil } - for _, item := range config.limitItems { + for _, item := range rule.configItems { if _, found, _ := item.ipNet.Get(realIp); !found { continue } - return realIp.String(), &item + return realIp.String(), &rule, &item } } - return "", nil + return "", nil, nil } -func findMatchingItem(items []LimitItem, key string) *LimitItem { +func logDebugAndReturnEmpty(log wrapper.Log, errMsg string, args ...interface{}) (string, *LimitRuleItem, *LimitConfigItem) { + log.Debugf(errMsg, args...) + return "", nil, nil +} + +func findMatchingItem(limitType limitRuleItemType, items []LimitConfigItem, key string) *LimitConfigItem { for _, item := range items { + // per类型,检查allType和regexpType + if limitType == limitByPerHeaderType || + limitType == limitByPerParamType || + limitType == limitByPerConsumerType || + limitType == limitByPerCookieType { + if item.configType == allType || (item.configType == regexpType && item.regexp.MatchString(key)) { + return &item + } + } + // 其他类型,直接比较key if item.key == key { return &item } @@ -170,13 +220,13 @@ func findMatchingItem(items []LimitItem, key string) *LimitItem { return nil } -func getDownStreamIp(config ClusterKeyRateLimitConfig) (net.IP, error) { +func getDownStreamIp(rule LimitRuleItem) (net.IP, error) { var ( realIpStr string err error ) - if config.limitByPerIp.sourceType == HeaderSourceType { - realIpStr, err = proxywasm.GetHttpRequestHeader(config.limitByPerIp.headerName) + if rule.limitByPerIp.sourceType == HeaderSourceType { + realIpStr, err = proxywasm.GetHttpRequestHeader(rule.limitByPerIp.headerName) if err == nil { realIpStr = strings.Split(strings.Trim(realIpStr, " "), ",")[0] } diff --git a/plugins/wasm-go/extensions/cluster-key-rate-limit/utils.go b/plugins/wasm-go/extensions/cluster-key-rate-limit/utils.go index 13fee7089..b8bba925c 100644 --- a/plugins/wasm-go/extensions/cluster-key-rate-limit/utils.go +++ b/plugins/wasm-go/extensions/cluster-key-rate-limit/utils.go @@ -43,3 +43,17 @@ func reconvertHeaders(hs map[string][]string) [][2]string { }) return ret } + +// extractCookieValueByKey 从cookie中提取key对应的value +func extractCookieValueByKey(cookie string, key string) (value string) { + pairs := strings.Split(cookie, ";") + for _, pair := range pairs { + pair = strings.TrimSpace(pair) + kv := strings.Split(pair, "=") + if kv[0] == key { + value = kv[1] + break + } + } + return value +}