客户端限流 service.

This commit is contained in:
许晓东
2023-01-03 22:01:43 +08:00
parent 7d76632f08
commit 4dbadee0d4
5 changed files with 92 additions and 10 deletions

View File

@@ -0,0 +1,29 @@
package com.xuxd.kafka.console.beans.vo;
import lombok.Data;
import org.apache.kafka.common.config.internals.QuotaConfigs;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import java.util.Map;
@Data
public class ClientQuotaEntityVO {
private String entry;
private String consumerRate;
private String producerRate;
private String requestPercentage;
public static ClientQuotaEntityVO from(ClientQuotaEntity entity, String type, Map<String, Object> config) {
ClientQuotaEntityVO entityVO = new ClientQuotaEntityVO();
entityVO.setEntry(entity.entries().get(type));
entityVO.setConsumerRate(config.getOrDefault(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "").toString());
entityVO.setProducerRate(config.getOrDefault(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "").toString());
entityVO.setRequestPercentage(config.getOrDefault(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "").toString());
return entityVO;
}
}

View File

@@ -1,13 +1,6 @@
package com.xuxd.kafka.console.config;
import kafka.console.ClusterConsole;
import kafka.console.ConfigConsole;
import kafka.console.ConsumerConsole;
import kafka.console.KafkaAclConsole;
import kafka.console.KafkaConfigConsole;
import kafka.console.MessageConsole;
import kafka.console.OperationConsole;
import kafka.console.TopicConsole;
import kafka.console.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -52,7 +45,7 @@ public class KafkaConfiguration {
@Bean
public OperationConsole operationConsole(KafkaConfig config, TopicConsole topicConsole,
ConsumerConsole consumerConsole) {
ConsumerConsole consumerConsole) {
return new OperationConsole(config, topicConsole, consumerConsole);
}
@@ -60,4 +53,9 @@ public class KafkaConfiguration {
public MessageConsole messageConsole(KafkaConfig config) {
return new MessageConsole(config);
}
@Bean
public ClientQuotaConsole clientQuotaConsole(KafkaConfig config) {
return new ClientQuotaConsole(config);
}
}

View File

@@ -0,0 +1,10 @@
package com.xuxd.kafka.console.service;
import com.xuxd.kafka.console.beans.vo.ClientQuotaEntityVO;
import java.util.List;
public interface ClientQuotaService {
List<ClientQuotaEntityVO> getClientQuotaConfigs(String type, String name);
}

View File

@@ -0,0 +1,43 @@
package com.xuxd.kafka.console.service.impl;
import com.xuxd.kafka.console.beans.vo.ClientQuotaEntityVO;
import com.xuxd.kafka.console.service.ClientQuotaService;
import kafka.console.ClientQuotaConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Service
public class ClientQuotaServiceImpl implements ClientQuotaService {
private final ClientQuotaConsole clientQuotaConsole;
private final Map<String, String> typeDict = new HashMap<>();
{
typeDict.put("user", ClientQuotaEntity.USER);
typeDict.put("client-id", ClientQuotaEntity.CLIENT_ID);
typeDict.put("ip", ClientQuotaEntity.IP);
}
public ClientQuotaServiceImpl(ClientQuotaConsole clientQuotaConsole) {
this.clientQuotaConsole = clientQuotaConsole;
}
@Override
public List<ClientQuotaEntityVO> getClientQuotaConfigs(String type, String name) {
List<String> entityNames = StringUtils.isNotBlank(name) ? Arrays.asList(name) : Collections.emptyList();
String entityType = typeDict.get(type);
if (StringUtils.isEmpty(entityType)) {
throw new IllegalArgumentException("type不正确" + type);
}
Map<ClientQuotaEntity, Map<String, Object>> clientQuotasConfigs = clientQuotaConsole.getClientQuotasConfigs(Arrays.asList(entityType), entityNames);
return clientQuotasConfigs.entrySet().stream().map(entry -> ClientQuotaEntityVO.from(entry.getKey(), entityType, entry.getValue())).collect(Collectors.toList());
}
}