限流,支持用户.

This commit is contained in:
许晓东
2023-02-05 23:08:14 +08:00
parent 608f7cdc47
commit 5a87e9cad8
9 changed files with 585 additions and 19 deletions

View File

@@ -11,6 +11,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.config.internals.QuotaConfigs;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.springframework.stereotype.Service;
import scala.Tuple2;
import java.util.*;
import java.util.stream.Collectors;
@@ -98,10 +99,16 @@ public class ClientQuotaServiceImpl implements ClientQuotaService {
configsToBeAddedMap.put(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, String.valueOf(Math.floor(Double.valueOf(request.getRequestPercentage()))));
}
clientQuotaConsole.addQuotaConfigs(types, names, configsToBeAddedMap);
Tuple2<Object, String> tuple2 = clientQuotaConsole.addQuotaConfigs(types, names, configsToBeAddedMap);
if (!(Boolean) tuple2._1) {
return ResponseData.create().failed(tuple2._2);
}
if (CollectionUtils.isNotEmpty(request.getDeleteConfigs())) {
List<String> delete = request.getDeleteConfigs().stream().map(key -> configDict.get(key)).collect(Collectors.toList());
clientQuotaConsole.deleteQuotaConfigs(types, names, delete);
Tuple2<Object, String> tuple2Del = clientQuotaConsole.deleteQuotaConfigs(types, names, delete);
if (!(Boolean) tuple2Del._1) {
return ResponseData.create().failed(tuple2Del._2);
}
}
return ResponseData.create().success();
}
@@ -118,7 +125,10 @@ public class ClientQuotaServiceImpl implements ClientQuotaService {
configs.add(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG);
configs.add(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG);
configs.add(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG);
clientQuotaConsole.deleteQuotaConfigs(types, names, configs);
Tuple2<Object, String> tuple2 = clientQuotaConsole.deleteQuotaConfigs(types, names, configs);
if (!(Boolean) tuple2._1) {
return ResponseData.create().failed(tuple2._2);
}
return ResponseData.create().success();
}