From b642647b2e36c5544b66bcb8f1db4b3776fba047 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Wed, 13 Oct 2021 17:15:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Etopic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/console/beans/dto/NewTopicDTO.java | 30 ++++ .../console/controller/TopicController.java | 8 + .../kafka/console/service/TopicService.java | 3 + .../service/impl/TopicServiceImpl.java | 6 + .../scala/kafka/console/TopicConsole.scala | 16 +- ui/src/utils/api.js | 4 + ui/src/views/topic/CreateTopic.vue | 158 ++++++++++++++++++ ui/src/views/topic/Topic.vue | 22 ++- 8 files changed, 244 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/xuxd/kafka/console/beans/dto/NewTopicDTO.java create mode 100644 ui/src/views/topic/CreateTopic.vue diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/NewTopicDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/NewTopicDTO.java new file mode 100644 index 0000000..7ad0faa --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/NewTopicDTO.java @@ -0,0 +1,30 @@ +package com.xuxd.kafka.console.beans.dto; + +import java.util.HashMap; +import java.util.Map; +import lombok.Data; +import org.apache.commons.collections.MapUtils; +import org.apache.kafka.clients.admin.NewTopic; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-13 14:58:11 + **/ +@Data +public class NewTopicDTO { + + private String name; + private Integer numPartitions; + private Short replicationFactor; + private Map configs = new HashMap<>(); + + public NewTopic toNewTopic() { + NewTopic topic = new NewTopic(name, numPartitions, replicationFactor); + if (MapUtils.isNotEmpty(configs)) { + topic.configs(configs); + } + return topic; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java index 89b7ae7..27690fc 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java @@ -1,10 +1,13 @@ package com.xuxd.kafka.console.controller; +import com.xuxd.kafka.console.beans.dto.NewTopicDTO; import com.xuxd.kafka.console.beans.enums.TopicType; import com.xuxd.kafka.console.service.TopicService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @@ -36,4 +39,9 @@ public class TopicController { public Object getTopicPartitionInfo(@RequestParam String topic) { return topicService.getTopicPartitionInfo(topic); } + + @PostMapping("/new") + public Object createNewTopic(@RequestBody NewTopicDTO topicDTO) { + return topicService.createTopic(topicDTO.toNewTopic()); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/TopicService.java b/src/main/java/com/xuxd/kafka/console/service/TopicService.java index 2a48b6d..55f4201 100644 --- a/src/main/java/com/xuxd/kafka/console/service/TopicService.java +++ b/src/main/java/com/xuxd/kafka/console/service/TopicService.java @@ -2,6 +2,7 @@ package com.xuxd.kafka.console.service; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.enums.TopicType; +import org.apache.kafka.clients.admin.NewTopic; /** * kafka-console-ui. @@ -18,4 +19,6 @@ public interface TopicService { ResponseData deleteTopic(String topic); ResponseData getTopicPartitionInfo(String topic); + + ResponseData createTopic(NewTopic topic); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java index 3e32759..dbff6e7 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java @@ -16,6 +16,7 @@ import java.util.stream.Collectors; import kafka.console.TopicConsole; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; @@ -106,4 +107,9 @@ public class TopicServiceImpl implements TopicService { } return ResponseData.create().data(voList).success(); } + + @Override public ResponseData createTopic(NewTopic topic) { + Tuple2 createResult = topicConsole.createTopic(topic); + return (boolean) createResult._1 ? ResponseData.create().success() : ResponseData.create().failed(String.valueOf(createResult._2)); + } } diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala index b731db6..4970df8 100644 --- a/src/main/scala/kafka/console/TopicConsole.scala +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit import java.util.{Collections, List, Set} import com.xuxd.kafka.console.config.KafkaConfig -import org.apache.kafka.clients.admin.{DeleteTopicsOptions, ListTopicsOptions, TopicDescription} +import org.apache.kafka.clients.admin.{CreateTopicsOptions, DeleteTopicsOptions, ListTopicsOptions, NewTopic, TopicDescription} import org.apache.kafka.common.TopicPartition import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava} @@ -93,4 +93,18 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig (Collections.emptyMap(), Collections.emptyMap()) }).asInstanceOf[(util.Map[TopicPartition, Long], util.Map[TopicPartition, Long])] } + + /** + * create topic. + */ + def createTopic(topic: NewTopic): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + val createResult = admin.createTopics(Collections.singleton(topic), new CreateTopicsOptions().retryOnQuotaViolation(false)) + createResult.all().get(timeoutMs, TimeUnit.MILLISECONDS) + (true, "") + }, e => { + log.error("create topic error, topic: " + topic.name(), e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 8d4eeed..77622a1 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -69,6 +69,10 @@ export const KafkaTopicApi = { url: "/topic/partition", method: "get", }, + creatTopic: { + url: "/topic/new", + method: "post", + }, }; export const KafkaConsumerApi = { diff --git a/ui/src/views/topic/CreateTopic.vue b/ui/src/views/topic/CreateTopic.vue new file mode 100644 index 0000000..f7d154a --- /dev/null +++ b/ui/src/views/topic/CreateTopic.vue @@ -0,0 +1,158 @@ + + + + + diff --git a/ui/src/views/topic/Topic.vue b/ui/src/views/topic/Topic.vue index f19aaf0..4121604 100644 --- a/ui/src/views/topic/Topic.vue +++ b/ui/src/views/topic/Topic.vue @@ -44,7 +44,9 @@
- 新增/更新 + 新增
@@ -75,6 +77,11 @@ :visible="showPartitionInfo" @closePartitionInfoDialog="closePartitionInfoDialog" > + +
@@ -85,10 +92,11 @@ import request from "@/utils/request"; import { KafkaTopicApi } from "@/utils/api"; import notification from "ant-design-vue/es/notification"; import PartitionInfo from "@/views/topic/PartitionInfo"; +import CreateTopic from "@/views/topic/CreateTopic"; export default { name: "Topic", - components: { PartitionInfo }, + components: { PartitionInfo, CreateTopic }, data() { return { queryParam: { type: "normal" }, @@ -105,6 +113,7 @@ export default { }, showPartitionInfo: false, loading: false, + showCreateTopic: false, }; }, methods: { @@ -152,6 +161,15 @@ export default { closePartitionInfoDialog() { this.showPartitionInfo = false; }, + openCreateTopicDialog() { + this.showCreateTopic = true; + }, + closeCreateTopicDialog(res) { + this.showCreateTopic = false; + if (res.refresh) { + this.getTopicList(); + } + }, }, created() { this.getTopicList();