diff --git a/src/main/java/com/xuxd/kafka/console/beans/enums/TopicType.java b/src/main/java/com/xuxd/kafka/console/beans/enums/TopicType.java new file mode 100644 index 0000000..82bf468 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/enums/TopicType.java @@ -0,0 +1,11 @@ +package com.xuxd.kafka.console.beans.enums; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-09 17:42:38 + **/ +public enum TopicType { + ALL, SYSTEM, NORMAL; +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java index 0549e1d..0bfdede 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java @@ -1,9 +1,11 @@ package com.xuxd.kafka.console.controller; +import com.xuxd.kafka.console.beans.enums.TopicType; import com.xuxd.kafka.console.service.TopicService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** @@ -20,7 +22,7 @@ public class TopicController { private TopicService topicService; @GetMapping("/list") - public Object getTopicList() { - return topicService.getTopicList(); + public Object getTopicList(@RequestParam(required = false) String topic, @RequestParam String type) { + return topicService.getTopicList(topic, TopicType.valueOf(type.toUpperCase())); } } diff --git a/src/main/java/com/xuxd/kafka/console/service/TopicService.java b/src/main/java/com/xuxd/kafka/console/service/TopicService.java index df17dd3..bf6ef26 100644 --- a/src/main/java/com/xuxd/kafka/console/service/TopicService.java +++ b/src/main/java/com/xuxd/kafka/console/service/TopicService.java @@ -1,6 +1,7 @@ package com.xuxd.kafka.console.service; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.enums.TopicType; /** * kafka-console-ui. @@ -12,6 +13,6 @@ public interface TopicService { ResponseData getTopicNameList(); - ResponseData getTopicList(); + ResponseData getTopicList(String topic, TopicType type); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java index 6bf3fc7..edb95c7 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java @@ -1,12 +1,17 @@ package com.xuxd.kafka.console.service.impl; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.enums.TopicType; import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO; import com.xuxd.kafka.console.service.TopicService; +import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Set; import kafka.console.TopicConsole; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.TopicDescription; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -25,11 +30,36 @@ public class TopicServiceImpl implements TopicService { private TopicConsole topicConsole; @Override public ResponseData getTopicNameList() { - return ResponseData.create().data(topicConsole.getTopicNameList()).success(); + return ResponseData.create().data(topicConsole.getTopicNameList(true)).success(); } - @Override public ResponseData getTopicList() { - List topicDescriptions = topicConsole.getTopicList(topicConsole.getTopicNameList()); + @Override public ResponseData getTopicList(String topic, TopicType type) { + Set topicSet = new HashSet<>(); + switch (type) { + case SYSTEM: + Set internalTopicSet = topicConsole.getInternalTopicNameList(); + if (StringUtils.isEmpty(topic)) { + topicSet.addAll(internalTopicSet); + } else { + if (internalTopicSet.contains(topic)) { + topicSet.add(topic); + } else { + return ResponseData.create().data(Collections.emptyList()).success(); + } + } + break; + case NORMAL: + Set internalTopicS = topicConsole.getInternalTopicNameList(); + if (internalTopicS.contains(topic)) { + return ResponseData.create().data(Collections.emptyList()).success(); + } + topicSet.addAll(StringUtils.isEmpty(topic) ? topicConsole.getTopicNameList(false) : Collections.singleton(topic)); + break; + default: + topicSet.addAll(StringUtils.isEmpty(topic) ? topicConsole.getTopicNameList(true) : Collections.singleton(topic)); + break; + } + List topicDescriptions = topicConsole.getTopicList(topicSet); topicDescriptions.sort(Comparator.comparing(TopicDescription::name)); return ResponseData.create().data(topicDescriptions.stream().map(d -> TopicDescriptionVO.from(d))).success(); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 31fad25..6c70eb8 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -7,7 +7,7 @@ server: kafka: config: # kafka broker地址,多个以逗号分隔 -# bootstrap-server: 'localhost:9092' + bootstrap-server: 'localhost:9092' request-timeout-ms: 60000 # 服务端是否启用acl,如果不启用,下面的几项都忽略即可 enable-acl: true @@ -17,7 +17,7 @@ kafka: # 超级管理员用户名,在broker上已经配置为超级管理员 admin-username: admin # 超级管理员密码 - admin-password: admin!QAZ + admin-password: admin # 启动自动创建配置的超级管理员用户 admin-create: true # broker连接的zk地址 @@ -48,6 +48,3 @@ logging: cron: # clear-dirty-user: 0 * * * * ? clear-dirty-user: 0 0 1 * * ? - - -#spring.jpa.database-platform=org.hibernate.dialect.H2Dialect \ No newline at end of file diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala index 92802a3..531626c 100644 --- a/src/main/scala/kafka/console/TopicConsole.scala +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -7,6 +7,8 @@ import java.util.{Collections, List, Set} import com.xuxd.kafka.console.config.KafkaConfig import org.apache.kafka.clients.admin.{ListTopicsOptions, TopicDescription} +import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava} + /** * kafka-console-ui. * @@ -15,8 +17,13 @@ import org.apache.kafka.clients.admin.{ListTopicsOptions, TopicDescription} * */ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging { - def getTopicNameList(): Set[String] = { - withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(true)).names() + /** + * get all topic name set. + * + * @return all topic name set. + */ + def getTopicNameList(internal: Boolean = true): Set[String] = { + withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(internal)).names() .get(3000, TimeUnit.MILLISECONDS), e => { log.error("listTopics error.", e) @@ -24,6 +31,20 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig }).asInstanceOf[Set[String]] } + /** + * get all internal topic name set. + * + * @return internal topic name set. + */ + def getInternalTopicNameList(): Set[String] = { + withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(true)).listings() + .get(3000, TimeUnit.MILLISECONDS).asScala.filter(_.isInternal).map(_.name()).toSet[String].asJava, + e => { + log.error("listInternalTopics error.", e) + Collections.emptySet() + }).asInstanceOf[Set[String]] + } + def getTopicList(topics: Set[String]): List[TopicDescription] = { if (topics == null || topics.isEmpty) { Collections.emptyList() diff --git a/src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java b/src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java index 05d810f..e607d3e 100644 --- a/src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java +++ b/src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java @@ -1,7 +1,7 @@ package com.xuxd.kafka.console.service.impl; +import com.xuxd.kafka.console.beans.enums.TopicType; import com.xuxd.kafka.console.service.TopicService; -import java.util.Collections; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -27,6 +27,6 @@ public class TopicServiceImplTest { @Test public void getTopicList() { - log.info(topicService.getTopicList().getData().toString()); + log.info(topicService.getTopicList(null, TopicType.ALL).getData().toString()); } } \ No newline at end of file diff --git a/ui/src/views/topic/Topic.vue b/ui/src/views/topic/Topic.vue index 23aced0..a31ebcb 100644 --- a/ui/src/views/topic/Topic.vue +++ b/ui/src/views/topic/Topic.vue @@ -17,6 +17,19 @@ /> + + + + 所有 + 普通 + 系统 + + + @@ -32,22 +45,18 @@
新增/更新
- +
- +
-
+
{ - let queryParam = {}; - if (values.username) { - queryParam.username = values.username; - } - if (values.topic) { - queryParam.resourceType = "TOPIC"; - queryParam.resourceName = values.topic; - } else if (values.groupId) { - queryParam.resourceType = "GROUP"; - queryParam.resourceName = values.groupId; - } - Object.assign(this.queryParam, queryParam); - }); + this.getTopicList(); }, handleReset() { @@ -108,9 +104,11 @@ export default { }, getTopicList() { + Object.assign(this.queryParam, this.form.getFieldsValue()); request({ url: KafkaTopicApi.getTopicList.url, method: KafkaTopicApi.getTopicList.method, + params: this.queryParam, }).then((res) => { this.data = res.data; }); @@ -199,4 +197,8 @@ const columns = [ .operation-btn { margin-right: 3%; } + +.type-select { + width: 200px !important; +}