From fad17302c8eb4b3706447a34c5aceb40a2da8311 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Wed, 8 Sep 2021 21:18:12 +0800 Subject: [PATCH] add topic console --- README.md | 2 + .../console/beans/vo/TopicDescriptionVO.java | 26 +++++++++++++ .../console/config/KafkaConfiguration.java | 6 +++ .../console/controller/TopicController.java | 26 +++++++++++++ .../kafka/console/service/TopicService.java | 17 ++++++++ .../service/impl/TopicServiceImpl.java | 39 +++++++++++++++++++ src/main/resources/application.yml | 5 ++- .../scala/kafka/console/KafkaConsole.scala | 8 ++++ .../scala/kafka/console/TopicConsole.scala | 37 ++++++++++++++++++ .../service/impl/TopicServiceImplTest.java | 32 +++++++++++++++ 10 files changed, 196 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/xuxd/kafka/console/beans/vo/TopicDescriptionVO.java create mode 100644 src/main/java/com/xuxd/kafka/console/controller/TopicController.java create mode 100644 src/main/java/com/xuxd/kafka/console/service/TopicService.java create mode 100644 src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java create mode 100644 src/main/scala/kafka/console/TopicConsole.scala create mode 100644 src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java diff --git a/README.md b/README.md index b854e5a..d5a51af 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,8 @@ * vue ## kafka版本 * 当前使用的kafka 2.8.0 +## 监控 +仅提供运维管理功能,监控、告警需要配合其它组件,使用请查看:https://blog.csdn.net/x763795151/article/details/119705372 # 打包、部署 ## 打包 环境要求 diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/TopicDescriptionVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/TopicDescriptionVO.java new file mode 100644 index 0000000..0ec0bf3 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/TopicDescriptionVO.java @@ -0,0 +1,26 @@ +package com.xuxd.kafka.console.beans.vo; + +import lombok.Data; +import org.apache.kafka.clients.admin.TopicDescription; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-08 21:12:36 + **/ +@Data +public class TopicDescriptionVO { + + private String name; + private boolean internal; + private int partitions; + + public static TopicDescriptionVO from(TopicDescription description) { + TopicDescriptionVO vo = new TopicDescriptionVO(); + vo.setName(description.name()); + vo.setInternal(description.isInternal()); + vo.setPartitions(description.partitions().size()); + return vo; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java index 47d7c94..53877d8 100644 --- a/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java +++ b/src/main/java/com/xuxd/kafka/console/config/KafkaConfiguration.java @@ -2,6 +2,7 @@ package com.xuxd.kafka.console.config; import kafka.console.KafkaAclConsole; import kafka.console.KafkaConfigConsole; +import kafka.console.TopicConsole; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -23,4 +24,9 @@ public class KafkaConfiguration { public KafkaAclConsole kafkaAclConsole(KafkaConfig config) { return new KafkaAclConsole(config); } + + @Bean + public TopicConsole topicConsole(KafkaConfig config) { + return new TopicConsole(config); + } } diff --git a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java new file mode 100644 index 0000000..0549e1d --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java @@ -0,0 +1,26 @@ +package com.xuxd.kafka.console.controller; + +import com.xuxd.kafka.console.service.TopicService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-08 20:28:35 + **/ +@RestController +@RequestMapping("/topic") +public class TopicController { + + @Autowired + private TopicService topicService; + + @GetMapping("/list") + public Object getTopicList() { + return topicService.getTopicList(); + } +} diff --git a/src/main/java/com/xuxd/kafka/console/service/TopicService.java b/src/main/java/com/xuxd/kafka/console/service/TopicService.java new file mode 100644 index 0000000..df17dd3 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/TopicService.java @@ -0,0 +1,17 @@ +package com.xuxd.kafka.console.service; + +import com.xuxd.kafka.console.beans.ResponseData; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-08 20:01:49 + **/ +public interface TopicService { + + ResponseData getTopicNameList(); + + ResponseData getTopicList(); + +} diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java new file mode 100644 index 0000000..099cd94 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java @@ -0,0 +1,39 @@ +package com.xuxd.kafka.console.service.impl; + +import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO; +import com.xuxd.kafka.console.service.TopicService; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import kafka.console.TopicConsole; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.TopicDescription; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-08 20:02:56 + **/ +@Slf4j +@Service +public class TopicServiceImpl implements TopicService { + + @Autowired + private TopicConsole topicConsole; + + @Override public ResponseData getTopicNameList() { + return ResponseData.create().data(topicConsole.getTopicNameList()).success(); + } + + @Override public ResponseData getTopicList() { + List topicDescriptions = topicConsole.getTopicList(topicConsole.getTopicNameList()); + topicDescriptions.sort(Comparator.comparing(TopicDescription::name)); + + return ResponseData.create().data(topicDescriptions.stream().map(d -> TopicDescriptionVO.from(d))).success(); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 1f6c13f..3624bf4 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -7,7 +7,8 @@ server: kafka: config: # kafka broker地址,多个以逗号分隔 - bootstrap-server: 'localhost:9092' +# bootstrap-server: 'localhost:9092' + bootstrap-server: '10.100.64.48:9092,10.100.77.250:9092,10.100.73.154:9092' request-timeout-ms: 60000 # 服务端是否启用acl,如果不启用,下面的几项都忽略即可 enable-acl: true @@ -16,7 +17,7 @@ kafka: # 超级管理员用户名,在broker上已经配置为超级管理员 admin-username: admin # 超级管理员密码 - admin-password: admin + admin-password: admin!QAZ # 启动自动创建配置的超级管理员用户 admin-create: true # broker连接的zk地址 diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index f554697..b8f999c 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -27,6 +27,14 @@ class KafkaConsole(config: KafkaConfig) { } } + protected def withAdminClientAndCatchError(f: Admin => Any, eh: Exception => Any): Any = { + try { + withAdminClient(f) + } catch { + case er: Exception => eh(er) + } + } + protected def withZKClient(f: AdminZkClient => Any): Any = { val zkClient = KafkaZkClient(config.getZookeeperAddr, false, 30000, 30000, Int.MaxValue, Time.SYSTEM) val adminZkClient = new AdminZkClient(zkClient) diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala new file mode 100644 index 0000000..92802a3 --- /dev/null +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -0,0 +1,37 @@ +package kafka.console + +import java.util +import java.util.concurrent.TimeUnit +import java.util.{Collections, List, Set} + +import com.xuxd.kafka.console.config.KafkaConfig +import org.apache.kafka.clients.admin.{ListTopicsOptions, TopicDescription} + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-08 19:52:27 + * */ +class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig) with Logging { + + def getTopicNameList(): Set[String] = { + withAdminClientAndCatchError(admin => admin.listTopics(new ListTopicsOptions().listInternal(true)).names() + .get(3000, TimeUnit.MILLISECONDS), + e => { + log.error("listTopics error.", e) + Collections.emptySet() + }).asInstanceOf[Set[String]] + } + + def getTopicList(topics: Set[String]): List[TopicDescription] = { + if (topics == null || topics.isEmpty) { + Collections.emptyList() + } else { + withAdminClientAndCatchError(admin => new util.ArrayList[TopicDescription](admin.describeTopics(topics).all().get().values()), e => { + log.error("describeTopics error.", e) + Collections.emptyList() + }).asInstanceOf[List[TopicDescription]] + } + } +} diff --git a/src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java b/src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java new file mode 100644 index 0000000..05d810f --- /dev/null +++ b/src/test/java/com/xuxd/kafka/console/service/impl/TopicServiceImplTest.java @@ -0,0 +1,32 @@ +package com.xuxd.kafka.console.service.impl; + +import com.xuxd.kafka.console.service.TopicService; +import java.util.Collections; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-08 20:04:28 + **/ +@Slf4j +@SpringBootTest +public class TopicServiceImplTest { + + @Autowired + private TopicService topicService; + + @Test + public void getTopicNameList() { + log.info(topicService.getTopicNameList().getData().toString()); + } + + @Test + public void getTopicList() { + log.info(topicService.getTopicList().getData().toString()); + } +} \ No newline at end of file