diff --git a/src/main/java/com/xuxd/kafka/console/beans/dos/KafkaUserDO.java b/src/main/java/com/xuxd/kafka/console/beans/dos/KafkaUserDO.java index 53e09c5..a9b5b5e 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/dos/KafkaUserDO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/dos/KafkaUserDO.java @@ -23,4 +23,6 @@ public class KafkaUserDO { private String password; private String updateTime; + + private Long clusterInfoId; } diff --git a/src/main/java/com/xuxd/kafka/console/config/ContextConfig.java b/src/main/java/com/xuxd/kafka/console/config/ContextConfig.java index 9b9a030..29a51c7 100644 --- a/src/main/java/com/xuxd/kafka/console/config/ContextConfig.java +++ b/src/main/java/com/xuxd/kafka/console/config/ContextConfig.java @@ -1,6 +1,7 @@ package com.xuxd.kafka.console.config; import java.util.Properties; +import org.apache.kafka.clients.CommonClientConfigs; /** * kafka-console-ui. @@ -12,6 +13,10 @@ public class ContextConfig { public static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000; + private Long clusterInfoId; + + private String clusterName; + private String bootstrapServer; private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS; @@ -27,7 +32,8 @@ public class ContextConfig { } public int getRequestTimeoutMs() { - return requestTimeoutMs; + return properties.containsKey(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG) ? + Integer.parseInt(properties.getProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG)) : requestTimeoutMs; } public void setRequestTimeoutMs(int requestTimeoutMs) { @@ -38,6 +44,22 @@ public class ContextConfig { return properties; } + public Long getClusterInfoId() { + return clusterInfoId; + } + + public void setClusterInfoId(Long clusterInfoId) { + this.clusterInfoId = clusterInfoId; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + public void setProperties(Properties properties) { this.properties = properties; } diff --git a/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java b/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java index bded9ff..f3a0592 100644 --- a/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java +++ b/src/main/java/com/xuxd/kafka/console/config/KafkaConfig.java @@ -16,24 +16,8 @@ public class KafkaConfig { private String bootstrapServer; - private int requestTimeoutMs; - - private String securityProtocol; - - private String saslMechanism; - - private String saslJaasConfig; - - private String adminUsername; - - private String adminPassword; - - private boolean adminCreate; - private String zookeeperAddr; - private boolean enableAcl; - private Properties properties; public String getBootstrapServer() { @@ -44,62 +28,6 @@ public class KafkaConfig { this.bootstrapServer = bootstrapServer; } - public int getRequestTimeoutMs() { - return requestTimeoutMs; - } - - public void setRequestTimeoutMs(int requestTimeoutMs) { - this.requestTimeoutMs = requestTimeoutMs; - } - - public String getSecurityProtocol() { - return securityProtocol; - } - - public void setSecurityProtocol(String securityProtocol) { - this.securityProtocol = securityProtocol; - } - - public String getSaslMechanism() { - return saslMechanism; - } - - public void setSaslMechanism(String saslMechanism) { - this.saslMechanism = saslMechanism; - } - - public String getSaslJaasConfig() { - return saslJaasConfig; - } - - public void setSaslJaasConfig(String saslJaasConfig) { - this.saslJaasConfig = saslJaasConfig; - } - - public 
String getAdminUsername() { - return adminUsername; - } - - public void setAdminUsername(String adminUsername) { - this.adminUsername = adminUsername; - } - - public String getAdminPassword() { - return adminPassword; - } - - public void setAdminPassword(String adminPassword) { - this.adminPassword = adminPassword; - } - - public boolean isAdminCreate() { - return adminCreate; - } - - public void setAdminCreate(boolean adminCreate) { - this.adminCreate = adminCreate; - } - public String getZookeeperAddr() { return zookeeperAddr; } @@ -108,14 +36,6 @@ public class KafkaConfig { this.zookeeperAddr = zookeeperAddr; } - public boolean isEnableAcl() { - return enableAcl; - } - - public void setEnableAcl(boolean enableAcl) { - this.enableAcl = enableAcl; - } - public Properties getProperties() { return properties; } diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java b/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java index 3c2983f..c7104dd 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConfigController.java @@ -36,7 +36,7 @@ public class ConfigController { this.configService = configService; } - @GetMapping + @GetMapping("/console") public Object getConfig() { return ResponseData.create().data(configMap).success(); } diff --git a/src/main/java/com/xuxd/kafka/console/interceptor/ContextSetFilter.java b/src/main/java/com/xuxd/kafka/console/interceptor/ContextSetFilter.java index ab3809e..43421c2 100644 --- a/src/main/java/com/xuxd/kafka/console/interceptor/ContextSetFilter.java +++ b/src/main/java/com/xuxd/kafka/console/interceptor/ContextSetFilter.java @@ -27,7 +27,7 @@ import org.springframework.http.MediaType; * @author xuxd * @date 2022-01-05 19:56:25 **/ -@WebFilter(filterName = "context-set-filter", urlPatterns = {"/*"}) +@WebFilter(filterName = "context-set-filter", urlPatterns = {"/acl/*","/user/*","/cluster/*","/config/*","/consumer/*","/message/*","/topic/*","/op/*"}) @Slf4j public class ContextSetFilter implements Filter { @@ -36,6 +36,7 @@ public class ContextSetFilter implements Filter { { excludes.add("/cluster/info/peek"); excludes.add("/cluster/info"); + excludes.add("/config/console"); } @Autowired @@ -49,16 +50,27 @@ public class ContextSetFilter implements Filter { if (!excludes.contains(uri)) { String headerId = request.getHeader(Header.ID); if (StringUtils.isBlank(headerId)) { - ResponseData failed = ResponseData.create().failed("Cluster id is null."); - response.setContentType(MediaType.APPLICATION_JSON_VALUE); - response.getOutputStream().println(ConvertUtil.toJsonString(failed)); +// ResponseData failed = ResponseData.create().failed("Cluster info is null."); + ResponseData failed = ResponseData.create().failed("No cluster info. Please switch to a cluster first."); + response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); + response.getWriter().println(ConvertUtil.toJsonString(failed)); return; } else { ClusterInfoDO infoDO = clusterInfoMapper.selectById(Long.valueOf(headerId)); + if (infoDO == null) { + ResponseData failed = ResponseData.create().failed("No info found for this cluster. Please switch to a valid cluster."); + response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); + response.getWriter().println(ConvertUtil.toJsonString(failed)); + return; + } ContextConfig config = new ContextConfig(); + config.setClusterInfoId(infoDO.getId()); + config.setClusterName(infoDO.getClusterName()); config.setBootstrapServer(infoDO.getAddress()); - 
config.setProperties(ConvertUtil.toProperties(infoDO.getProperties())); + if (StringUtils.isNotBlank(infoDO.getProperties())) { + config.setProperties(ConvertUtil.toProperties(infoDO.getProperties())); + } ContextConfigHolder.CONTEXT_CONFIG.set(config); } } diff --git a/src/main/java/com/xuxd/kafka/console/schedule/KafkaAclSchedule.java b/src/main/java/com/xuxd/kafka/console/schedule/KafkaAclSchedule.java index b49414a..fcacc0a 100644 --- a/src/main/java/com/xuxd/kafka/console/schedule/KafkaAclSchedule.java +++ b/src/main/java/com/xuxd/kafka/console/schedule/KafkaAclSchedule.java @@ -1,9 +1,22 @@ package com.xuxd.kafka.console.schedule; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.xuxd.kafka.console.beans.dos.ClusterInfoDO; +import com.xuxd.kafka.console.beans.dos.KafkaUserDO; +import com.xuxd.kafka.console.config.ContextConfig; +import com.xuxd.kafka.console.config.ContextConfigHolder; +import com.xuxd.kafka.console.dao.ClusterInfoMapper; import com.xuxd.kafka.console.dao.KafkaUserMapper; +import com.xuxd.kafka.console.utils.ConvertUtil; +import com.xuxd.kafka.console.utils.SaslUtil; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import kafka.console.KafkaConfigConsole; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -21,25 +34,58 @@ public class KafkaAclSchedule { private final KafkaConfigConsole configConsole; - public KafkaAclSchedule(KafkaUserMapper userMapper, KafkaConfigConsole configConsole) { - this.userMapper = userMapper; - this.configConsole = configConsole; + private final ClusterInfoMapper clusterInfoMapper; + + public KafkaAclSchedule(ObjectProvider<KafkaUserMapper> userMapper, + ObjectProvider<KafkaConfigConsole> configConsole, ObjectProvider<ClusterInfoMapper> clusterInfoMapper) { + this.userMapper = userMapper.getIfAvailable(); + this.configConsole = configConsole.getIfAvailable(); + this.clusterInfoMapper = clusterInfoMapper.getIfAvailable(); } @Scheduled(cron = "${cron.clear-dirty-user}") public void clearDirtyKafkaUser() { - log.info("Start clear dirty data for kafka user from database."); - Set<String> userSet = configConsole.getUserList(null); - userMapper.selectList(null).forEach(u -> { - if (!userSet.contains(u.getUsername())) { - log.info("clear user: {} from database.", u.getUsername()); - try { - userMapper.deleteById(u.getId()); - } catch (Exception e) { - log.error("userMapper.deleteById error, user: " + u, e); + try { + log.info("Start clear dirty data for kafka user from database."); + List<ClusterInfoDO> clusterInfoDOS = clusterInfoMapper.selectList(null); + List<Long> clusterInfoIds = new ArrayList<>(); + for (ClusterInfoDO infoDO : clusterInfoDOS) { + ContextConfig config = new ContextConfig(); + config.setClusterInfoId(infoDO.getId()); + config.setClusterName(infoDO.getClusterName()); + + config.setBootstrapServer(infoDO.getAddress()); + if (StringUtils.isNotBlank(infoDO.getProperties())) { + config.setProperties(ConvertUtil.toProperties(infoDO.getProperties())); + } + ContextConfigHolder.CONTEXT_CONFIG.set(config); + if (SaslUtil.isEnableSasl() && SaslUtil.isEnableScram()) { + log.info("Start clear cluster: {}", infoDO.getClusterName()); + Set<String> userSet = configConsole.getUserList(null); + QueryWrapper<KafkaUserDO> queryWrapper = new QueryWrapper<>(); + queryWrapper.eq("cluster_info_id", infoDO.getId()); + 
userMapper.selectList(queryWrapper).forEach(u -> { + if (!userSet.contains(u.getUsername())) { + log.info("clear user: {} from database.", u.getUsername()); + try { + userMapper.deleteById(u.getId()); + } catch (Exception e) { + log.error("userMapper.deleteById error, user: " + u, e); + } + } + }); + clusterInfoIds.add(infoDO.getId()); + } } - }); - log.info("Clear end."); + if (CollectionUtils.isNotEmpty(clusterInfoIds)) { + log.info("Clear users whose cluster_info_id is not in {}.", clusterInfoIds); + QueryWrapper<KafkaUserDO> wrapper = new QueryWrapper<>(); + wrapper.notIn("cluster_info_id", clusterInfoIds); + userMapper.delete(wrapper); + } + log.info("Clear end."); + } finally { + ContextConfigHolder.CONTEXT_CONFIG.remove(); + } } } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java index f8c1ea2..81a5cc3 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/AclServiceImpl.java @@ -6,29 +6,37 @@ import com.xuxd.kafka.console.beans.CounterMap; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.dos.KafkaUserDO; import com.xuxd.kafka.console.beans.vo.KafkaUserDetailVO; -import com.xuxd.kafka.console.config.KafkaConfig; +import com.xuxd.kafka.console.config.ContextConfigHolder; import com.xuxd.kafka.console.dao.KafkaUserMapper; import com.xuxd.kafka.console.service.AclService; +import com.xuxd.kafka.console.utils.SaslUtil; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import kafka.console.KafkaAclConsole; import kafka.console.KafkaConfigConsole; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.ScramMechanism; import org.apache.kafka.clients.admin.UserScramCredentialsDescription; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import scala.Tuple2; +import static com.xuxd.kafka.console.utils.SaslUtil.isEnableSasl; +import static com.xuxd.kafka.console.utils.SaslUtil.isEnableScram; + /** * kafka-console-ui. 
* @@ -37,7 +45,7 @@ import scala.Tuple2; **/ @Slf4j @Service -public class AclServiceImpl implements AclService, SmartInitializingSingleton { +public class AclServiceImpl implements AclService { @Autowired private KafkaConfigConsole configConsole; @@ -45,9 +53,6 @@ public class AclServiceImpl implements AclService, SmartInitializingSingleton { @Autowired private KafkaAclConsole aclConsole; - @Autowired - private KafkaConfig kafkaConfig; - private final KafkaUserMapper kafkaUserMapper; public AclServiceImpl(ObjectProvider<KafkaUserMapper> kafkaUserMapper) { @@ -64,15 +69,23 @@ } @Override public ResponseData addOrUpdateUser(String name, String pass) { + if (!isEnableSasl()) { + return ResponseData.create().failed("Only support SASL protocol."); + } + if (!isEnableScram()) { + return ResponseData.create().failed("Only support SASL_SCRAM."); + } log.info("add or update user, username: {}, password: {}", name, pass); - if (!configConsole.addOrUpdateUser(name, pass)) { + Tuple2 tuple2 = configConsole.addOrUpdateUser(name, pass); + if (!(boolean) tuple2._1()) { log.error("add user to kafka failed."); - return ResponseData.create().failed("add user to kafka failed"); + return ResponseData.create().failed("add user to kafka failed: " + tuple2._2()); } // save user info to database. KafkaUserDO userDO = new KafkaUserDO(); userDO.setUsername(name); userDO.setPassword(pass); + userDO.setClusterInfoId(ContextConfigHolder.CONTEXT_CONFIG.get().getClusterInfoId()); try { Map<String, Object> map = new HashMap<>(); map.put("username", name); @@ -86,12 +99,24 @@ } @Override public ResponseData deleteUser(String name) { + if (!isEnableSasl()) { + return ResponseData.create().failed("Only support SASL protocol."); + } + if (!isEnableScram()) { + return ResponseData.create().failed("Only support SASL_SCRAM."); + } log.info("delete user: {}", name); Tuple2 tuple2 = configConsole.deleteUser(name); return (boolean) tuple2._1() ? 
ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); } @Override public ResponseData deleteUserAndAuth(String name) { + if (!isEnableSasl()) { + return ResponseData.create().failed("Only support SASL protocol."); + } + if (!isEnableScram()) { + return ResponseData.create().failed("Only support SASL_SCRAM."); + } log.info("delete user and authority: {}", name); AclEntry entry = new AclEntry(); entry.setPrincipal(name); @@ -120,7 +145,8 @@ Map<String, Object> resultMap = new HashMap<>(); entryMap.forEach((k, v) -> { Map<String, List<AclEntry>> map = v.stream().collect(Collectors.groupingBy(e -> e.getResourceType() + "#" + e.getName())); - if (k.equals(kafkaConfig.getAdminUsername())) { + String username = SaslUtil.findUsername(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG)); + if (k.equals(username)) { Map<String, Object> map2 = new HashMap<>(map); Map<String, Object> userMap = new HashMap<>(); userMap.put("role", "admin"); @@ -133,7 +159,8 @@ detailList.values().forEach(u -> { if (!resultMap.containsKey(u.name()) && !u.credentialInfos().isEmpty()) { - if (!u.name().equals(kafkaConfig.getAdminUsername())) { + String username = SaslUtil.findUsername(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG)); + if (!u.name().equals(username)) { resultMap.put(u.name(), Collections.emptyMap()); } else { Map<String, Object> map2 = new HashMap<>(); @@ -194,27 +221,29 @@ } Map<String, Object> param = new HashMap<>(); param.put("username", username); + param.put("cluster_info_id", ContextConfigHolder.CONTEXT_CONFIG.get().getClusterInfoId()); List<KafkaUserDO> dos = kafkaUserMapper.selectByMap(param); if (dos.isEmpty()) { vo.setConsistencyDescription("Password is null."); } else { vo.setPassword(dos.stream().findFirst().get().getPassword()); // check for consistency. - boolean consistent = configConsole.isPassConsistent(username, vo.getPassword()); - vo.setConsistencyDescription(consistent ? "Consistent" : "Password is not consistent."); +// boolean consistent = configConsole.isPassConsistent(username, vo.getPassword()); +// vo.setConsistencyDescription(consistent ? 
"Consistent" : "Password is not consistent."); + vo.setConsistencyDescription("Can not check password consistent."); } return ResponseData.create().data(vo).success(); } - @Override public void afterSingletonsInstantiated() { - if (kafkaConfig.isEnableAcl() && kafkaConfig.isAdminCreate()) { - log.info("Start create admin user, username: {}, password: {}", kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword()); - boolean done = configConsole.addOrUpdateUserWithZK(kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword()); - if (!done) { - log.error("Create admin failed."); - throw new IllegalStateException(); - } - } - } +// @Override public void afterSingletonsInstantiated() { +// if (kafkaConfig.isEnableAcl() && kafkaConfig.isAdminCreate()) { +// log.info("Start create admin user, username: {}, password: {}", kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword()); +// boolean done = configConsole.addOrUpdateUserWithZK(kafkaConfig.getAdminUsername(), kafkaConfig.getAdminPassword()); +// if (!done) { +// log.error("Create admin failed."); +// throw new IllegalStateException(); +// } +// } +// } } diff --git a/src/main/java/com/xuxd/kafka/console/utils/SaslUtil.java b/src/main/java/com/xuxd/kafka/console/utils/SaslUtil.java new file mode 100644 index 0000000..dfb8ffc --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/utils/SaslUtil.java @@ -0,0 +1,61 @@ +package com.xuxd.kafka.console.utils; + +import com.xuxd.kafka.console.config.ContextConfigHolder; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.ScramMechanism; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2022-01-06 11:07:41 + **/ +public class SaslUtil { + + public static final Pattern JAAS_PATTERN = Pattern.compile("^.*(username=\"(.*)\"[ \t]+).*$"); + + private SaslUtil() { + } + + public static String findUsername(String saslJaasConfig) { + Matcher matcher = JAAS_PATTERN.matcher(saslJaasConfig); + return matcher.find() ? 
matcher.group(2) : ""; + } + + public static boolean isEnableSasl() { + Properties properties = ContextConfigHolder.CONTEXT_CONFIG.get().getProperties(); + if (!properties.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) { + return false; + } + String s = properties.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); + SecurityProtocol protocol = SecurityProtocol.valueOf(s); + switch (protocol) { + case SASL_SSL: + case SASL_PLAINTEXT: + return true; + default: + return false; + } + } + + + public static boolean isEnableScram() { + Properties properties = ContextConfigHolder.CONTEXT_CONFIG.get().getProperties(); + if (!properties.containsKey(SaslConfigs.SASL_MECHANISM)) { + return false; + } + String s = properties.getProperty(SaslConfigs.SASL_MECHANISM); + ScramMechanism mechanism = ScramMechanism.fromMechanismName(s); + switch (mechanism) { + case UNKNOWN: + return false; + default: + return true; + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 5290308..63f6641 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,23 +6,12 @@ server: kafka: config: - # kafka broker地址,多个以逗号分隔 - bootstrap-server: 'localhost:9092' - request-timeout-ms: 5000 - # 服务端是否启用acl,如果不启用,下面的所有配置都忽略即可,只用配置上面的Kafka集群地址就行了 - enable-acl: false - # 只支持2种安全协议SASL_PLAINTEXT和PLAINTEXT,启用acl则设置为SASL_PLAINTEXT,不启用acl不需关心这个配置 - security-protocol: SASL_PLAINTEXT - sasl-mechanism: SCRAM-SHA-256 - # 超级管理员用户名,在broker上已经配置为超级管理员 - admin-username: admin - # 超级管理员密码 - admin-password: admin - # 启动自动创建配置的超级管理员用户 - admin-create: false - # broker连接的zk地址,如果启动自动创建配置的超级管理员用户则必须配置,否则忽略 - zookeeper-addr: 'localhost:2181' - sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${kafka.config.admin-username}" password="${kafka.config.admin-password}"; + # 如果不存在default集群,启动的时候默认会把这个加载进来(如果这里配置集群地址了),如果已经存在,则不加载 + # kafka broker地址,多个以逗号分隔,不是必须在这里配置,也可以启动之后,在页面上添加集群信息 + bootstrap-server: + # 集群其它属性配置 + properties: +# request.timeout.ms: 5000 spring: application: @@ -46,6 +35,7 @@ spring: logging: home: ./ +# 基于scram方案的acl,这里会记录创建的用户密码等信息,定时扫描,如果集群中已经不存在这些用户,就把这些信息从db中清除掉 cron: # clear-dirty-user: 0 * * * * ? clear-dirty-user: 0 0 1 * * ? 
diff --git a/src/main/resources/db/schema-h2.sql b/src/main/resources/db/schema-h2.sql index a85429d..b67d667 100644 --- a/src/main/resources/db/schema-h2.sql +++ b/src/main/resources/db/schema-h2.sql @@ -3,10 +3,11 @@ -- users for kafka ACL when SASL_SCRAM is enabled CREATE TABLE IF NOT EXISTS T_KAFKA_USER ( - ID IDENTITY NOT NULL COMMENT 'Primary key ID', - USERNAME VARCHAR(50) NOT NULL DEFAULT '' COMMENT 'Username', - PASSWORD VARCHAR(50) NOT NULL DEFAULT '' COMMENT 'Password', - UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT 'Update time', + ID IDENTITY NOT NULL COMMENT 'Primary key ID', + USERNAME VARCHAR(50) NOT NULL DEFAULT '' COMMENT 'Username', + PASSWORD VARCHAR(50) NOT NULL DEFAULT '' COMMENT 'Password', + UPDATE_TIME TIMESTAMP NOT NULL DEFAULT NOW() COMMENT 'Update time', + CLUSTER_INFO_ID BIGINT NOT NULL COMMENT 'Cluster ID from the cluster info table', PRIMARY KEY (ID), UNIQUE (USERNAME) ); diff --git a/src/main/scala/kafka/console/ClusterConsole.scala b/src/main/scala/kafka/console/ClusterConsole.scala index bafe6ff..a9e3cf8 100644 --- a/src/main/scala/kafka/console/ClusterConsole.scala +++ b/src/main/scala/kafka/console/ClusterConsole.scala @@ -2,9 +2,8 @@ package kafka.console import java.util.Collections import java.util.concurrent.TimeUnit - import com.xuxd.kafka.console.beans.{BrokerNode, ClusterInfo} -import com.xuxd.kafka.console.config.KafkaConfig +import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} import org.apache.kafka.clients.admin.DescribeClusterResult import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava, SetHasAsScala} @@ -19,6 +18,7 @@ class ClusterConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf def clusterInfo(): ClusterInfo = { withAdminClientAndCatchError(admin => { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val clusterResult: DescribeClusterResult = admin.describeCluster() val clusterInfo = new ClusterInfo clusterInfo.setClusterId(clusterResult.clusterId().get(timeoutMs, TimeUnit.MILLISECONDS)) diff --git a/src/main/scala/kafka/console/ConfigConsole.scala b/src/main/scala/kafka/console/ConfigConsole.scala index 6232eff..77a4f63 100644 --- a/src/main/scala/kafka/console/ConfigConsole.scala +++ b/src/main/scala/kafka/console/ConfigConsole.scala @@ -3,8 +3,7 @@ package kafka.console import java.util import java.util.Collections import java.util.concurrent.TimeUnit - -import com.xuxd.kafka.console.config.KafkaConfig +import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} import kafka.console.ConfigConsole.BrokerLoggerConfigType import kafka.server.ConfigType import org.apache.kafka.clients.admin.{AlterConfigOp, Config, ConfigEntry, DescribeConfigsOptions} @@ -69,6 +68,7 @@ class ConfigConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfi val configResource = new ConfigResource(getResourceTypeAndValidate(entityType, entityName), entityName) val config = Map(configResource -> Collections.singletonList(new AlterConfigOp(entry, opType)).asInstanceOf[util.Collection[AlterConfigOp]]) + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() admin.incrementalAlterConfigs(config.asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS) (true, "") }, e => { diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index d13d415..631fd98 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -1,6 +1,6 @@ package kafka.console -import com.xuxd.kafka.console.config.KafkaConfig +import 
com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata, OffsetResetStrategy} @@ -75,6 +75,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon val endOffsets = commitOffsets.keySet.map { topicPartition => topicPartition -> OffsetSpec.latest }.toMap + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() admin.listOffsets(endOffsets.asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS) }, e => { log.error("listOffsets error.", e) @@ -166,6 +167,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon def resetPartitionToTargetOffset(groupId: String, partition: TopicPartition, offset: Long): (Boolean, String) = { withAdminClientAndCatchError(admin => { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() admin.alterConsumerGroupOffsets(groupId, Map(partition -> new OffsetAndMetadata(offset)).asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS) (true, "") }, e => { @@ -178,7 +180,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon timestamp: java.lang.Long): (Boolean, String) = { withAdminClientAndCatchError(admin => { val logOffsets = getLogTimestampOffsets(admin, groupId, topicPartitions.asScala, timestamp) - + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() admin.alterConsumerGroupOffsets(groupId, logOffsets.asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS) (true, "") }, e => { @@ -256,6 +258,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon val timestampOffsets = topicPartitions.map { topicPartition => topicPartition -> OffsetSpec.forTimestamp(timestamp) }.toMap + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val offsets = admin.listOffsets( timestampOffsets.asJava, new ListOffsetsOptions().timeoutMs(timeoutMs) @@ -280,6 +283,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon val endOffsets = topicPartitions.map { topicPartition => topicPartition -> OffsetSpec.latest }.toMap + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val offsets = admin.listOffsets( endOffsets.asJava, new ListOffsetsOptions().timeoutMs(timeoutMs) diff --git a/src/main/scala/kafka/console/KafkaAclConsole.scala b/src/main/scala/kafka/console/KafkaAclConsole.scala index 0fd0c3e..6ece1a8 100644 --- a/src/main/scala/kafka/console/KafkaAclConsole.scala +++ b/src/main/scala/kafka/console/KafkaAclConsole.scala @@ -3,9 +3,8 @@ package kafka.console import java.util import java.util.concurrent.TimeUnit import java.util.{Collections, List} - import com.xuxd.kafka.console.beans.AclEntry -import com.xuxd.kafka.console.config.KafkaConfig +import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} import org.apache.commons.lang3.StringUtils import org.apache.kafka.common.acl._ import org.apache.kafka.common.resource.{ResourcePattern, ResourcePatternFilter, ResourceType} @@ -58,6 +57,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon def addAcl(acls: List[AclBinding]): Boolean = { withAdminClient(adminClient => { try { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() adminClient.createAcls(acls).all().get(timeoutMs, TimeUnit.MILLISECONDS) true 
} catch { @@ -100,6 +100,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon def deleteAcl(entry: AclEntry, allResource: Boolean, allPrincipal: Boolean, allOperation: Boolean): Boolean = { withAdminClient(adminClient => { try { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val result = adminClient.deleteAcls(Collections.singleton(entry.toAclBindingFilter(allResource, allPrincipal, allOperation))).all().get(timeoutMs, TimeUnit.MILLISECONDS) log.info("delete acl: {}", result) true @@ -113,6 +114,7 @@ class KafkaAclConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon def deleteAcl(filters: util.Collection[AclBindingFilter]): Boolean = { withAdminClient(adminClient => { try { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val result = adminClient.deleteAcls(filters).all().get(timeoutMs, TimeUnit.MILLISECONDS) log.info("delete acl: {}", result) true diff --git a/src/main/scala/kafka/console/KafkaConfigConsole.scala b/src/main/scala/kafka/console/KafkaConfigConsole.scala index 69464e3..f7ac72a 100644 --- a/src/main/scala/kafka/console/KafkaConfigConsole.scala +++ b/src/main/scala/kafka/console/KafkaConfigConsole.scala @@ -1,16 +1,16 @@ package kafka.console +import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} +import kafka.server.ConfigType +import kafka.utils.Implicits.PropertiesOps +import org.apache.kafka.clients.admin._ +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter} + import java.security.MessageDigest import java.util import java.util.concurrent.TimeUnit import java.util.{Properties, Set} - -import com.xuxd.kafka.console.config.KafkaConfig -import kafka.server.ConfigType -import kafka.utils.Implicits.PropertiesOps -import org.apache.kafka.clients.admin._ -import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter} - import scala.jdk.CollectionConverters.{CollectionHasAsScala, DictionaryHasAsScala, SeqHasAsJava} /** @@ -35,31 +35,32 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka }).asInstanceOf[util.Map[String, UserScramCredentialsDescription]] } - def addOrUpdateUser(name: String, pass: String): Boolean = { + def addOrUpdateUser(name: String, pass: String): (Boolean, String) = { withAdminClient(adminClient => { try { - adminClient.alterUserScramCredentials(util.Arrays.asList( - new UserScramCredentialUpsertion(name, - new ScramCredentialInfo(ScramMechanism.fromMechanismName(config.getSaslMechanism), defaultIterations), pass))) - .all().get(timeoutMs, TimeUnit.MILLISECONDS) - true + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() + val mechanisms = ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_MECHANISM).split(",").toSeq + val scrams = mechanisms.map(m => new UserScramCredentialUpsertion(name, + new ScramCredentialInfo(ScramMechanism.fromMechanismName(m), defaultIterations), pass)) + adminClient.alterUserScramCredentials(scrams.asInstanceOf[Seq[UserScramCredentialAlteration]].asJava).all().get(timeoutMs, TimeUnit.MILLISECONDS) + (true, "") } catch { case ex: Exception => log.error("addOrUpdateUser error", ex) - false + (false, ex.getMessage) } - }).asInstanceOf[Boolean] + }).asInstanceOf[(Boolean, String)] } def addOrUpdateUserWithZK(name: String, pass: String): Boolean = { withZKClient(adminZkClient => { try { - val 
credential = new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(config.getSaslMechanism)) + val credential = new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_MECHANISM))) .generateCredential(pass, defaultIterations) val credentialStr = ScramCredentialUtils.credentialToString(credential) val userConfig: Properties = new Properties() - userConfig.put(config.getSaslMechanism, credentialStr) + userConfig.put(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties().getProperty(SaslConfigs.SASL_MECHANISM), credentialStr) val configs = adminZkClient.fetchEntityConfig(ConfigType.User, name) userConfig ++= configs @@ -101,6 +102,7 @@ class KafkaConfigConsole(config: KafkaConfig) extends KafkaConsole(config: Kafka // .all().get(timeoutMs, TimeUnit.MILLISECONDS) // all delete val userDetail = getUserDetailList(util.Collections.singletonList(name)) + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() userDetail.values().asScala.foreach(u => { adminClient.alterUserScramCredentials(u.credentialInfos().asScala.map(s => new UserScramCredentialDeletion(u.name(), s.mechanism()) .asInstanceOf[UserScramCredentialAlteration]).toList.asJava) diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index dac21c5..88f01a7 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -2,12 +2,10 @@ package kafka.console import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} import kafka.zk.{AdminZkClient, KafkaZkClient} -import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringSerializer} import org.apache.kafka.common.utils.Time @@ -25,7 +23,7 @@ import scala.jdk.CollectionConverters.{MapHasAsJava, MapHasAsScala} * */ class KafkaConsole(config: KafkaConfig) { - protected val timeoutMs: Int = config.getRequestTimeoutMs +// protected val timeoutMs: Int = config.getRequestTimeoutMs protected def withAdminClient(f: Admin => Any): Any = { @@ -108,7 +106,7 @@ class KafkaConsole(config: KafkaConfig) { } protected def withTimeoutMs[T <: AbstractOptions[T]](options: T) = { - options.timeoutMs(timeoutMs) + options.timeoutMs(ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()) } private def createAdminClient(): Admin = { @@ -120,11 +118,6 @@ class KafkaConsole(config: KafkaConfig) { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getBootstrapServer()) props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()) props.putAll(ContextConfigHolder.CONTEXT_CONFIG.get().getProperties()) - if (config.isEnableAcl) { - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getSecurityProtocol()) - props.put(SaslConfigs.SASL_MECHANISM, config.getSaslMechanism()) - props.put(SaslConfigs.SASL_JAAS_CONFIG, config.getSaslJaasConfig()) - } props } } diff 
--git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index d31e754..7fb3fe9 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -2,7 +2,7 @@ package kafka.console import com.xuxd.kafka.console.beans.MessageFilter import com.xuxd.kafka.console.beans.enums.FilterType -import com.xuxd.kafka.console.config.KafkaConfig +import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} import org.apache.commons.lang3.StringUtils import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.clients.producer.ProducerRecord @@ -27,6 +27,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf var startOffTable: immutable.Map[TopicPartition, Long] = Map.empty var endOffTable: immutable.Map[TopicPartition, Long] = Map.empty withAdminClientAndCatchError(admin => { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val startTable = KafkaConsole.getLogTimestampOffsets(admin, partitions.asScala.toSeq, startTime, timeoutMs) startOffTable = startTable.map(t2 => (t2._1, t2._2.offset())).toMap @@ -93,6 +94,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf if (searchNums >= maxNums) { terminate = true } else { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val records = consumer.poll(Duration.ofMillis(timeoutMs)) if (records.isEmpty) { @@ -189,6 +191,7 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf var terminate = tpSet.isEmpty while (!terminate) { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val records = consumer.poll(Duration.ofMillis(timeoutMs)) val tps = new util.HashSet(tpSet).asScala for (tp <- tps) { diff --git a/src/main/scala/kafka/console/OperationConsole.scala b/src/main/scala/kafka/console/OperationConsole.scala index feddfb0..53510dc 100644 --- a/src/main/scala/kafka/console/OperationConsole.scala +++ b/src/main/scala/kafka/console/OperationConsole.scala @@ -1,6 +1,6 @@ package kafka.console -import com.xuxd.kafka.console.config.KafkaConfig +import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} import kafka.admin.ReassignPartitionsCommand import org.apache.kafka.clients.admin.{ElectLeadersOptions, ListPartitionReassignmentsOptions, PartitionReassignment} import org.apache.kafka.clients.consumer.KafkaConsumer @@ -34,6 +34,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, throw new UnsupportedOperationException("exist consumer client.") } } + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val thatGroupDescriptionList = thatAdmin.describeConsumerGroups(searchGroupIds).all().get(timeoutMs, TimeUnit.MILLISECONDS).values() if (groupDescriptionList.isEmpty) { throw new IllegalArgumentException("that consumer group info is null.") @@ -101,6 +102,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: TopicConsole, thatMinOffset: util.Map[TopicPartition, Long]): (Boolean, String) = { val thatAdmin = createAdminClient(props) try { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val searchGroupIds = Collections.singleton(groupId) val groupDescriptionList = consumerConsole.getConsumerGroupList(searchGroupIds) if (groupDescriptionList.isEmpty) { @@ -178,6 +180,7 @@ class OperationConsole(config: KafkaConfig, topicConsole: 
TopicConsole, val thatConsumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer) try { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val thisTopicPartitions = consumerConsole.listSubscribeTopics(groupId).get(topic).asScala.sortBy(_.partition()) val thatTopicPartitionMap = thatAdmin.listConsumerGroupOffsets( groupId diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala index f557a1c..a16bb03 100644 --- a/src/main/scala/kafka/console/TopicConsole.scala +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -1,6 +1,6 @@ package kafka.console -import com.xuxd.kafka.console.config.KafkaConfig +import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} import kafka.admin.ReassignPartitionsCommand._ import kafka.utils.Json import org.apache.kafka.clients.admin._ @@ -28,6 +28,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig * @return all topic name set. */ def getTopicNameList(internal: Boolean = true): Set[String] = { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(internal)).names() .get(timeoutMs, TimeUnit.MILLISECONDS), e => { @@ -42,6 +43,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig * @return internal topic name set. */ def getInternalTopicNameList(): Set[String] = { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(true)).listings() .get(timeoutMs, TimeUnit.MILLISECONDS).asScala.filter(_.isInternal).map(_.name()).toSet[String].asJava, e => { @@ -69,6 +71,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig */ def deleteTopic(topic: String): (Boolean, String) = { withAdminClientAndCatchError(admin => { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() admin.deleteTopics(Collections.singleton(topic), new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS) (true, "") }, @@ -103,6 +106,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig */ def createTopic(topic: NewTopic): (Boolean, String) = { withAdminClientAndCatchError(admin => { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val createResult = admin.createTopics(Collections.singleton(topic), new CreateTopicsOptions().retryOnQuotaViolation(false)) createResult.all().get(timeoutMs, TimeUnit.MILLISECONDS) (true, "") @@ -117,6 +121,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig */ def createPartitions(newPartitions: util.Map[String, NewPartitions]): (Boolean, String) = { withAdminClientAndCatchError(admin => { + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() admin.createPartitions(newPartitions, new CreatePartitionsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS) (true, "") @@ -241,6 +246,7 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig .asScala.map(info => new TopicPartition(topic, info.partition())).toSeq case None => throw new IllegalArgumentException("topic is not exist.") } + val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val offsetMap = 
KafkaConsole.getLogTimestampOffsets(admin, partitions, timestamp, timeoutMs) offsetMap.map(tuple2 => (tuple2._1, tuple2._2.offset())).toMap.asJava }, e => { diff --git a/ui/src/App.vue b/ui/src/App.vue index b82662f..540f245 100644 --- a/ui/src/App.vue +++ b/ui/src/App.vue @@ -11,8 +11,8 @@ [App.vue hunk garbled in extraction; only the navigation menu labels "消费组" (Consumer Group), "消息" (Message) and "Acl" survive from the template markup]
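The thread-local contract ties all of these changes together: every console call now resolves the bootstrap server, request timeout and security settings from ContextConfigHolder.CONTEXT_CONFIG, which ContextSetFilter populates per HTTP request and KafkaAclSchedule populates per cluster. A minimal sketch of that contract for any other non-web caller, assuming CONTEXT_CONFIG is the ThreadLocal the patch's set/get/remove usage implies; the id, name and address are placeholders, not values from this change:

    import com.xuxd.kafka.console.config.ContextConfig;
    import com.xuxd.kafka.console.config.ContextConfigHolder;
    import java.util.Properties;

    // Illustrative only: a non-web caller honoring the per-thread context contract.
    public class ContextUsageSketch {
        public static void withCluster(Runnable consoleCalls) {
            ContextConfig config = new ContextConfig();
            config.setClusterInfoId(1L);                 // placeholder cluster id
            config.setClusterName("default");            // placeholder cluster name
            config.setBootstrapServer("localhost:9092"); // placeholder address
            Properties props = new Properties();
            // optional per-cluster security settings, same keys as in application.yml
            props.put("security.protocol", "SASL_PLAINTEXT");
            config.setProperties(props);
            ContextConfigHolder.CONTEXT_CONFIG.set(config);
            try {
                // console calls (topics, consumers, ACLs) pick up their
                // AdminClient settings from the thread-local context
                consoleCalls.run();
            } finally {
                // mirror KafkaAclSchedule: never leak the context to reused threads
                ContextConfigHolder.CONTEXT_CONFIG.remove();
            }
        }
    }

KafkaAclSchedule.clearDirtyKafkaUser() follows exactly this set/try/finally-remove shape, which is what keeps pooled scheduler and servlet threads from seeing a stale cluster.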