mirror of
https://github.com/alibaba/higress.git
synced 2026-04-22 04:27:26 +08:00
feat: cluster key rate limit enhancement (#1036)
This commit is contained in:
@@ -37,7 +37,7 @@ func main() {
|
||||
}
|
||||
|
||||
const (
|
||||
ClusterRateLimitFormat string = "higress-cluster-key-rate-limit:%s:%s"
|
||||
ClusterRateLimitFormat string = "higress-cluster-key-rate-limit:%s:%s:%s:%s" // redis key为前缀:限流规则名称:限流类型:限流key名称:限流key对应的实际值
|
||||
FixedWindowScript string = `
|
||||
local ttl = redis.call('ttl', KEYS[1])
|
||||
if ttl < 0 then
|
||||
@@ -46,11 +46,12 @@ const (
|
||||
end
|
||||
return {ARGV[1], redis.call('incrby', KEYS[1], -1), ttl}
|
||||
`
|
||||
)
|
||||
|
||||
const (
|
||||
LimitContextKey string = "LimitContext" // 限流上下文信息
|
||||
|
||||
ConsumerHeader string = "x-mse-consumer" // LimitByConsumer从该request header获取consumer的名字
|
||||
CookieHeader string = "cookie"
|
||||
|
||||
RateLimitLimitHeader string = "X-RateLimit-Limit" // 限制的总请求数
|
||||
RateLimitRemainingHeader string = "X-RateLimit-Remaining" // 剩余还可以发送的请求数
|
||||
RateLimitResetHeader string = "X-RateLimit-Reset" // 限流重置时间(触发限流时返回)
|
||||
@@ -67,7 +68,7 @@ func parseConfig(json gjson.Result, config *ClusterKeyRateLimitConfig, log wrapp
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = parseClusterKeyRateLimitConfig(json, config, log)
|
||||
err = parseClusterKeyRateLimitConfig(json, config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -76,15 +77,15 @@ func parseConfig(json gjson.Result, config *ClusterKeyRateLimitConfig, log wrapp
|
||||
|
||||
func onHttpRequestHeaders(ctx wrapper.HttpContext, config ClusterKeyRateLimitConfig, log wrapper.Log) types.Action {
|
||||
// 判断是否命中限流规则
|
||||
key, limitItem := hitRateLimitRule(ctx, config, log)
|
||||
if limitItem == nil {
|
||||
val, ruleItem, configItem := checkRequestAgainstLimitRule(ctx, config.ruleItems, log)
|
||||
if ruleItem == nil || configItem == nil {
|
||||
return types.ActionContinue
|
||||
}
|
||||
|
||||
// 构建redis限流key和参数
|
||||
limitKey := fmt.Sprintf(ClusterRateLimitFormat, config.ruleName, key)
|
||||
limitKey := fmt.Sprintf(ClusterRateLimitFormat, config.ruleName, ruleItem.limitType, ruleItem.key, val)
|
||||
keys := []interface{}{limitKey}
|
||||
args := []interface{}{limitItem.count, limitItem.timeWindow}
|
||||
args := []interface{}{configItem.count, configItem.timeWindow}
|
||||
// 执行限流逻辑
|
||||
err := config.redisClient.Eval(FixedWindowScript, 1, keys, args, func(response resp.Value) {
|
||||
defer func() {
|
||||
@@ -126,43 +127,92 @@ func onHttpResponseHeaders(ctx wrapper.HttpContext, config ClusterKeyRateLimitCo
|
||||
return types.ActionContinue
|
||||
}
|
||||
|
||||
func hitRateLimitRule(ctx wrapper.HttpContext, config ClusterKeyRateLimitConfig, log wrapper.Log) (string, *LimitItem) {
|
||||
switch config.limitType {
|
||||
case limitByHeaderType:
|
||||
headerVal, err := proxywasm.GetHttpRequestHeader(config.limitByHeader)
|
||||
func checkRequestAgainstLimitRule(ctx wrapper.HttpContext, ruleItems []LimitRuleItem, log wrapper.Log) (string, *LimitRuleItem, *LimitConfigItem) {
|
||||
for _, rule := range ruleItems {
|
||||
val, ruleItem, configItem := hitRateRuleItem(ctx, rule, log)
|
||||
if ruleItem != nil && configItem != nil {
|
||||
return val, ruleItem, configItem
|
||||
}
|
||||
}
|
||||
return "", nil, nil
|
||||
}
|
||||
|
||||
func hitRateRuleItem(ctx wrapper.HttpContext, rule LimitRuleItem, log wrapper.Log) (string, *LimitRuleItem, *LimitConfigItem) {
|
||||
switch rule.limitType {
|
||||
// 根据HTTP请求头限流
|
||||
case limitByHeaderType, limitByPerHeaderType:
|
||||
val, err := proxywasm.GetHttpRequestHeader(rule.key)
|
||||
if err != nil {
|
||||
log.Debugf("failed to get request header %s: %v", config.limitByHeader, err)
|
||||
return "", nil
|
||||
return logDebugAndReturnEmpty(log, "failed to get request header %s: %v", rule.key, err)
|
||||
}
|
||||
return headerVal, findMatchingItem(config.limitItems, headerVal)
|
||||
case limitByParamType:
|
||||
parse, _ := url.Parse(ctx.Path())
|
||||
query, _ := url.ParseQuery(parse.RawQuery)
|
||||
val, ok := query[config.limitByParam]
|
||||
return val, &rule, findMatchingItem(rule.limitType, rule.configItems, val)
|
||||
// 根据HTTP请求参数限流
|
||||
case limitByParamType, limitByPerParamType:
|
||||
parse, err := url.Parse(ctx.Path())
|
||||
if err != nil {
|
||||
return logDebugAndReturnEmpty(log, "failed to parse request path: %v", err)
|
||||
}
|
||||
query, err := url.ParseQuery(parse.RawQuery)
|
||||
if err != nil {
|
||||
return logDebugAndReturnEmpty(log, "failed to parse query params: %v", err)
|
||||
}
|
||||
val, ok := query[rule.key]
|
||||
if !ok {
|
||||
log.Debugf("request param %s is empty", config.limitByParam)
|
||||
return "", nil
|
||||
} else {
|
||||
return val[0], findMatchingItem(config.limitItems, val[0])
|
||||
return logDebugAndReturnEmpty(log, "request param %s is empty", rule.key)
|
||||
}
|
||||
return val[0], &rule, findMatchingItem(rule.limitType, rule.configItems, val[0])
|
||||
// 根据consumer限流
|
||||
case limitByConsumerType, limitByPerConsumerType:
|
||||
val, err := proxywasm.GetHttpRequestHeader(ConsumerHeader)
|
||||
if err != nil {
|
||||
return logDebugAndReturnEmpty(log, "failed to get request header %s: %v", ConsumerHeader, err)
|
||||
}
|
||||
return val, &rule, findMatchingItem(rule.limitType, rule.configItems, val)
|
||||
// 根据cookie中key值限流
|
||||
case limitByCookieType, limitByPerCookieType:
|
||||
cookie, err := proxywasm.GetHttpRequestHeader(CookieHeader)
|
||||
if err != nil {
|
||||
return logDebugAndReturnEmpty(log, "failed to get request cookie : %v", err)
|
||||
}
|
||||
val := extractCookieValueByKey(cookie, rule.key)
|
||||
if val == "" {
|
||||
return logDebugAndReturnEmpty(log, "cookie key '%s' extracted from cookie '%s' is empty.", rule.key, cookie)
|
||||
}
|
||||
return val, &rule, findMatchingItem(rule.limitType, rule.configItems, val)
|
||||
// 根据客户端IP限流
|
||||
case limitByPerIpType:
|
||||
realIp, err := getDownStreamIp(config)
|
||||
realIp, err := getDownStreamIp(rule)
|
||||
if err != nil {
|
||||
log.Warnf("failed to get down stream ip: %v", err)
|
||||
return "", nil
|
||||
return "", &rule, nil
|
||||
}
|
||||
for _, item := range config.limitItems {
|
||||
for _, item := range rule.configItems {
|
||||
if _, found, _ := item.ipNet.Get(realIp); !found {
|
||||
continue
|
||||
}
|
||||
return realIp.String(), &item
|
||||
return realIp.String(), &rule, &item
|
||||
}
|
||||
}
|
||||
return "", nil
|
||||
return "", nil, nil
|
||||
}
|
||||
|
||||
func findMatchingItem(items []LimitItem, key string) *LimitItem {
|
||||
func logDebugAndReturnEmpty(log wrapper.Log, errMsg string, args ...interface{}) (string, *LimitRuleItem, *LimitConfigItem) {
|
||||
log.Debugf(errMsg, args...)
|
||||
return "", nil, nil
|
||||
}
|
||||
|
||||
func findMatchingItem(limitType limitRuleItemType, items []LimitConfigItem, key string) *LimitConfigItem {
|
||||
for _, item := range items {
|
||||
// per类型,检查allType和regexpType
|
||||
if limitType == limitByPerHeaderType ||
|
||||
limitType == limitByPerParamType ||
|
||||
limitType == limitByPerConsumerType ||
|
||||
limitType == limitByPerCookieType {
|
||||
if item.configType == allType || (item.configType == regexpType && item.regexp.MatchString(key)) {
|
||||
return &item
|
||||
}
|
||||
}
|
||||
// 其他类型,直接比较key
|
||||
if item.key == key {
|
||||
return &item
|
||||
}
|
||||
@@ -170,13 +220,13 @@ func findMatchingItem(items []LimitItem, key string) *LimitItem {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getDownStreamIp(config ClusterKeyRateLimitConfig) (net.IP, error) {
|
||||
func getDownStreamIp(rule LimitRuleItem) (net.IP, error) {
|
||||
var (
|
||||
realIpStr string
|
||||
err error
|
||||
)
|
||||
if config.limitByPerIp.sourceType == HeaderSourceType {
|
||||
realIpStr, err = proxywasm.GetHttpRequestHeader(config.limitByPerIp.headerName)
|
||||
if rule.limitByPerIp.sourceType == HeaderSourceType {
|
||||
realIpStr, err = proxywasm.GetHttpRequestHeader(rule.limitByPerIp.headerName)
|
||||
if err == nil {
|
||||
realIpStr = strings.Split(strings.Trim(realIpStr, " "), ",")[0]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user