diff --git a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java index 0bfdede..1e9eedd 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/TopicController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/TopicController.java @@ -3,6 +3,7 @@ package com.xuxd.kafka.console.controller; import com.xuxd.kafka.console.beans.enums.TopicType; import com.xuxd.kafka.console.service.TopicService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -25,4 +26,9 @@ public class TopicController { public Object getTopicList(@RequestParam(required = false) String topic, @RequestParam String type) { return topicService.getTopicList(topic, TopicType.valueOf(type.toUpperCase())); } + + @DeleteMapping + public Object deleteTopic(@RequestParam String topic) { + return topicService.deleteTopic(topic); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/TopicService.java b/src/main/java/com/xuxd/kafka/console/service/TopicService.java index bf6ef26..cdf16e4 100644 --- a/src/main/java/com/xuxd/kafka/console/service/TopicService.java +++ b/src/main/java/com/xuxd/kafka/console/service/TopicService.java @@ -15,4 +15,5 @@ public interface TopicService { ResponseData getTopicList(String topic, TopicType type); + ResponseData deleteTopic(String topic); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java index edb95c7..e116698 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/TopicServiceImpl.java @@ -15,6 +15,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.TopicDescription; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import scala.Tuple2; /** * kafka-console-ui. @@ -64,4 +65,9 @@ public class TopicServiceImpl implements TopicService { return ResponseData.create().data(topicDescriptions.stream().map(d -> TopicDescriptionVO.from(d))).success(); } + + @Override public ResponseData deleteTopic(String topic) { + Tuple2 tuple2 = topicConsole.deleteTopic(topic); + return (Boolean) tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2); + } } diff --git a/src/main/scala/kafka/console/KafkaConsole.scala b/src/main/scala/kafka/console/KafkaConsole.scala index b8f999c..5ffea38 100644 --- a/src/main/scala/kafka/console/KafkaConsole.scala +++ b/src/main/scala/kafka/console/KafkaConsole.scala @@ -17,6 +17,8 @@ import org.apache.kafka.common.utils.Time * */ class KafkaConsole(config: KafkaConfig) { + protected val timeoutMs: Int = 3000 + protected def withAdminClient(f: Admin => Any): Any = { val admin = createAdminClient() diff --git a/src/main/scala/kafka/console/TopicConsole.scala b/src/main/scala/kafka/console/TopicConsole.scala index 531626c..594efcf 100644 --- a/src/main/scala/kafka/console/TopicConsole.scala +++ b/src/main/scala/kafka/console/TopicConsole.scala @@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit import java.util.{Collections, List, Set} import com.xuxd.kafka.console.config.KafkaConfig -import org.apache.kafka.clients.admin.{ListTopicsOptions, TopicDescription} +import org.apache.kafka.clients.admin.{DeleteTopicsOptions, ListTopicsOptions, TopicDescription} import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava} @@ -55,4 +55,15 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig }).asInstanceOf[List[TopicDescription]] } } + + def deleteTopic(topic: String): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + admin.deleteTopics(Collections.singleton(topic), new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS) + (true, "") + }, + e => { + log.error("delete topic error, topic: " + topic, e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 2d7bd24..a356f86 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -61,4 +61,8 @@ export const KafkaTopicApi = { url: "/topic/list", method: "get", }, + deleteTopic: { + url: "/topic", + method: "delete", + }, }; diff --git a/ui/src/views/acl/UserDetail.vue b/ui/src/views/acl/UserDetail.vue index 5b86eab..68f1f4a 100644 --- a/ui/src/views/acl/UserDetail.vue +++ b/ui/src/views/acl/UserDetail.vue @@ -8,12 +8,7 @@ :footer="null" @cancel="handleCancel" > - + {{ user.username }} diff --git a/ui/src/views/topic/Topic.vue b/ui/src/views/topic/Topic.vue index a31ebcb..af4a3b8 100644 --- a/ui/src/views/topic/Topic.vue +++ b/ui/src/views/topic/Topic.vue @@ -59,7 +59,7 @@ :title="'删除topic: ' + record.name + '?'" ok-text="确认" cancel-text="取消" - @confirm="handleReset(record)" + @confirm="deleteTopic(record.name)" > 删除 import request from "@/utils/request"; import { KafkaTopicApi } from "@/utils/api"; +import notification from "ant-design-vue/es/notification"; export default { name: "Topic", components: {}, @@ -113,6 +114,22 @@ export default { this.data = res.data; }); }, + deleteTopic(topic) { + request({ + url: KafkaTopicApi.deleteTopic.url + "?topic=" + topic, + method: KafkaTopicApi.deleteTopic.method, + }).then((res) => { + if (res.code == 0) { + this.$message.success(res.msg); + this.getTopicList(); + } else { + notification.error({ + message: "error", + description: res.msg, + }); + } + }); + }, }, created() { this.getTopicList();