限流,支持客户端ID查询.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package com.xuxd.kafka.console.beans.vo;
|
||||
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.common.config.internals.QuotaConfigs;
|
||||
import org.apache.kafka.common.quota.ClientQuotaEntity;
|
||||
|
||||
@@ -43,10 +44,39 @@ public class ClientQuotaEntityVO {
|
||||
break;
|
||||
}
|
||||
});
|
||||
entityVO.setConsumerRate(config.getOrDefault(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "").toString());
|
||||
entityVO.setProducerRate(config.getOrDefault(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "").toString());
|
||||
entityVO.setConsumerRate(convert(config.getOrDefault(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, "")));
|
||||
entityVO.setProducerRate(convert(config.getOrDefault(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, "")));
|
||||
entityVO.setRequestPercentage(config.getOrDefault(QuotaConfigs.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, "").toString());
|
||||
|
||||
return entityVO;
|
||||
}
|
||||
|
||||
|
||||
public static String convert(Object num) {
|
||||
if (num == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (num instanceof String) {
|
||||
if ((StringUtils.isBlank((String) num))) {
|
||||
return (String) num;
|
||||
}
|
||||
}
|
||||
|
||||
if (num instanceof Number) {
|
||||
Number number = (Number) num;
|
||||
double value = number.doubleValue();
|
||||
double _1kb = 1024;
|
||||
double _1mb = 1024 * _1kb;
|
||||
if (value < _1kb) {
|
||||
return value + "Byte";
|
||||
}
|
||||
if (value < _1mb) {
|
||||
return String.format("%.1f KB", (value / _1kb));
|
||||
}
|
||||
if (value >= _1mb) {
|
||||
return String.format("%.1f MB", (value / _1mb));
|
||||
}
|
||||
}
|
||||
return String.valueOf(num);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,28 +6,25 @@ import com.xuxd.kafka.console.config.ContextConfig;
|
||||
import com.xuxd.kafka.console.config.ContextConfigHolder;
|
||||
import com.xuxd.kafka.console.dao.ClusterInfoMapper;
|
||||
import com.xuxd.kafka.console.utils.ConvertUtil;
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import javax.servlet.Filter;
|
||||
import javax.servlet.FilterChain;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletRequest;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.annotation.WebFilter;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.MediaType;
|
||||
|
||||
import javax.servlet.*;
|
||||
import javax.servlet.annotation.WebFilter;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2022-01-05 19:56:25
|
||||
**/
|
||||
@WebFilter(filterName = "context-set-filter", urlPatterns = {"/acl/*","/user/*","/cluster/*","/config/*","/consumer/*","/message/*","/topic/*","/op/*"})
|
||||
@WebFilter(filterName = "context-set-filter", urlPatterns = {"/acl/*", "/user/*", "/cluster/*", "/config/*", "/consumer/*", "/message/*", "/topic/*", "/op/*", "/client/*"})
|
||||
@Slf4j
|
||||
public class ContextSetFilter implements Filter {
|
||||
|
||||
@@ -42,8 +39,9 @@ public class ContextSetFilter implements Filter {
|
||||
@Autowired
|
||||
private ClusterInfoMapper clusterInfoMapper;
|
||||
|
||||
@Override public void doFilter(ServletRequest req, ServletResponse response,
|
||||
FilterChain chain) throws IOException, ServletException {
|
||||
@Override
|
||||
public void doFilter(ServletRequest req, ServletResponse response,
|
||||
FilterChain chain) throws IOException, ServletException {
|
||||
try {
|
||||
HttpServletRequest request = (HttpServletRequest) req;
|
||||
String uri = request.getRequestURI();
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package com.xuxd.kafka.console.service;
|
||||
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.dto.AlterClientQuotaDTO;
|
||||
import com.xuxd.kafka.console.beans.vo.ClientQuotaEntityVO;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -10,7 +10,7 @@ import java.util.List;
|
||||
*/
|
||||
public interface ClientQuotaService {
|
||||
|
||||
List<ClientQuotaEntityVO> getClientQuotaConfigs(List<String> types, List<String> names);
|
||||
ResponseData getClientQuotaConfigs(List<String> types, List<String> names);
|
||||
|
||||
Object alterClientQuotaConfigs(AlterClientQuotaDTO request);
|
||||
ResponseData alterClientQuotaConfigs(AlterClientQuotaDTO request);
|
||||
}
|
||||
|
||||
@@ -35,34 +35,39 @@ public class ClientQuotaServiceImpl implements ClientQuotaService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ClientQuotaEntityVO> getClientQuotaConfigs(List<String> types, List<String> names) {
|
||||
public ResponseData getClientQuotaConfigs(List<String> types, List<String> names) {
|
||||
List<String> entityNames = names == null ? Collections.emptyList() : new ArrayList<>(names);
|
||||
List<String> entityTypes = types.stream().map(e -> typeDict.get(e)).filter(e -> e != null).collect(Collectors.toList());
|
||||
if (entityTypes.isEmpty() || entityTypes.size() != types.size()) {
|
||||
throw new IllegalArgumentException("types illegal.");
|
||||
}
|
||||
|
||||
boolean userWithClientFilterClientOnly = false;
|
||||
boolean userAndClientFilterClientOnly = false;
|
||||
// only type: [user and client-id], type.size == 2
|
||||
if (entityTypes.size() == 2) {
|
||||
if (names.size() == 2 && StringUtils.isBlank(names.get(0)) && StringUtils.isNotBlank(names.get(1))) {
|
||||
userWithClientFilterClientOnly = true;
|
||||
userAndClientFilterClientOnly = true;
|
||||
}
|
||||
}
|
||||
Map<ClientQuotaEntity, Map<String, Object>> clientQuotasConfigs = clientQuotaConsole.getClientQuotasConfigs(entityTypes,
|
||||
userWithClientFilterClientOnly ? Collections.emptyList() : entityNames);
|
||||
userAndClientFilterClientOnly ? Collections.emptyList() : entityNames);
|
||||
|
||||
List<ClientQuotaEntityVO> voList = clientQuotasConfigs.entrySet().stream().map(entry -> ClientQuotaEntityVO.from(
|
||||
entry.getKey(), entityTypes, entry.getValue())).collect(Collectors.toList());
|
||||
if (!userWithClientFilterClientOnly) {
|
||||
return voList;
|
||||
if (!userAndClientFilterClientOnly) {
|
||||
return ResponseData.create().data(voList).success();
|
||||
}
|
||||
return voList.stream().filter(e -> names.get(1).equals(e.getClient())).collect(Collectors.toList());
|
||||
List<ClientQuotaEntityVO> list = voList.stream().filter(e -> names.get(1).equals(e.getClient())).collect(Collectors.toList());
|
||||
|
||||
return ResponseData.create().data(list).success();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object alterClientQuotaConfigs(AlterClientQuotaDTO request) {
|
||||
public ResponseData alterClientQuotaConfigs(AlterClientQuotaDTO request) {
|
||||
|
||||
return ResponseData.create().failed();
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user