From 4dbadee0d4a63e1d11b28b404dd9bca597c2d40f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Tue, 3 Jan 2023 22:01:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E9=99=90=E6=B5=81?= =?UTF-8?q?=20service.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../console/beans/vo/ClientQuotaEntityVO.java | 29 +++++++++++++ .../console/config/KafkaConfiguration.java | 16 +++---- .../console/service/ClientQuotaService.java | 10 +++++ .../service/impl/ClientQuotaServiceImpl.java | 43 +++++++++++++++++++ .../console/scala/ClientQuotaConsoleTest.java | 4 +- 5 files changed, 92 insertions(+), 10 deletions(-) create mode 100644 src/main/java/com/xuxd/kafka/console/beans/vo/ClientQuotaEntityVO.java create mode 100644 src/main/java/com/xuxd/kafka/console/service/ClientQuotaService.java create mode 100644 src/main/java/com/xuxd/kafka/console/service/impl/ClientQuotaServiceImpl.java diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/ClientQuotaEntityVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/ClientQuotaEntityVO.java new file mode 100644 index 0000000..a1559eb --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/ClientQuotaEntityVO.java @@ -0,0 +1,29 @@ +package com.xuxd.kafka.console.beans.vo; + +import lombok.Data; +import org.apache.kafka.common.config.internals.QuotaConfigs; +import org.apache.kafka.common.quota.ClientQuotaEntity; + +import java.util.Map; + +@Data +public class ClientQuotaEntityVO { + + private String entry; + + private String consumerRate; + + private String producerRate; + + private String requestPercentage; + + public static ClientQuotaEntityVO from(ClientQuotaEntity entity, String type, Map config) { + ClientQuotaEntityVO entityVO = new ClientQuotaEntityVO(); + entityVO.setEntry(entity.entries().get(type)); + entityVO.setConsumerRate(config.getOrDefault(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "").toString()); + entityVO.setProducerRate(config.getOrDefault(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "").toString()); + entityVO.setRequestPercentage(config.getOrDefault(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "").toString()); + + return entityVO; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java index c2277fb..90b03a3 100644 --- a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java +++ b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java @@ -1,13 +1,6 @@ package com.xuxd.kafka.console.config; -import kafka.console.ClusterConsole; -import kafka.console.ConfigConsole; -import kafka.console.ConsumerConsole; -import kafka.console.KafkaAclConsole; -import kafka.console.KafkaConfigConsole; -import kafka.console.MessageConsole; -import kafka.console.OperationConsole; -import kafka.console.TopicConsole; +import kafka.console.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -52,7 +45,7 @@ public class KafkaConfiguration { @Bean public OperationConsole operationConsole(KafkaConfig config, TopicConsole topicConsole, - ConsumerConsole consumerConsole) { + ConsumerConsole consumerConsole) { return new OperationConsole(config, topicConsole, consumerConsole); } @@ -60,4 +53,9 @@ public class KafkaConfiguration { public MessageConsole messageConsole(KafkaConfig config) { return new MessageConsole(config); } + + @Bean + public ClientQuotaConsole clientQuotaConsole(KafkaConfig config) { + return new ClientQuotaConsole(config); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/ClientQuotaService.java b/src/main/java/com/xuxd/kafka/console/service/ClientQuotaService.java new file mode 100644 index 0000000..2cbec29 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/ClientQuotaService.java @@ -0,0 +1,10 @@ +package com.xuxd.kafka.console.service; + +import com.xuxd.kafka.console.beans.vo.ClientQuotaEntityVO; + +import java.util.List; + +public interface ClientQuotaService { + + List getClientQuotaConfigs(String type, String name); +} diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ClientQuotaServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ClientQuotaServiceImpl.java new file mode 100644 index 0000000..afe5675 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ClientQuotaServiceImpl.java @@ -0,0 +1,43 @@ +package com.xuxd.kafka.console.service.impl; + +import com.xuxd.kafka.console.beans.vo.ClientQuotaEntityVO; +import com.xuxd.kafka.console.service.ClientQuotaService; +import kafka.console.ClientQuotaConsole; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.stream.Collectors; + +@Slf4j +@Service +public class ClientQuotaServiceImpl implements ClientQuotaService { + + private final ClientQuotaConsole clientQuotaConsole; + + private final Map typeDict = new HashMap<>(); + + { + typeDict.put("user", ClientQuotaEntity.USER); + typeDict.put("client-id", ClientQuotaEntity.CLIENT_ID); + typeDict.put("ip", ClientQuotaEntity.IP); + } + + public ClientQuotaServiceImpl(ClientQuotaConsole clientQuotaConsole) { + this.clientQuotaConsole = clientQuotaConsole; + } + + @Override + public List getClientQuotaConfigs(String type, String name) { + List entityNames = StringUtils.isNotBlank(name) ? Arrays.asList(name) : Collections.emptyList(); + String entityType = typeDict.get(type); + if (StringUtils.isEmpty(entityType)) { + throw new IllegalArgumentException("type不正确:" + type); + } + Map> clientQuotasConfigs = clientQuotaConsole.getClientQuotasConfigs(Arrays.asList(entityType), entityNames); + + return clientQuotasConfigs.entrySet().stream().map(entry -> ClientQuotaEntityVO.from(entry.getKey(), entityType, entry.getValue())).collect(Collectors.toList()); + } +} diff --git a/src/test/java/com/xuxd/kafka/console/scala/ClientQuotaConsoleTest.java b/src/test/java/com/xuxd/kafka/console/scala/ClientQuotaConsoleTest.java index 32850c2..abfc014 100644 --- a/src/test/java/com/xuxd/kafka/console/scala/ClientQuotaConsoleTest.java +++ b/src/test/java/com/xuxd/kafka/console/scala/ClientQuotaConsoleTest.java @@ -36,8 +36,10 @@ public class ClientQuotaConsoleTest { config.setBootstrapServer(bootstrapServer); ContextConfigHolder.CONTEXT_CONFIG.set(config); Map configsToBeAddedMap = new HashMap<>(); - configsToBeAddedMap.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "102400"); + configsToBeAddedMap.put(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "1024000000"); console.addQuotaConfigs(Arrays.asList(ClientQuotaEntity.USER), Arrays.asList("user-test"), configsToBeAddedMap); + console.addQuotaConfigs(Arrays.asList(ClientQuotaEntity.USER), Arrays.asList(""), configsToBeAddedMap); + console.deleteQuotaConfigs(Arrays.asList(ClientQuotaEntity.USER), Arrays.asList(""), Arrays.asList(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG)); } }