diff --git a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java index 8e049fd..154f4bb 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/MessageController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/MessageController.java @@ -1,14 +1,15 @@ package com.xuxd.kafka.console.controller; +import com.xuxd.kafka.console.beans.QueryMessage; +import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.SendMessage; import com.xuxd.kafka.console.beans.dto.QueryMessageDTO; import com.xuxd.kafka.console.service.MessageService; +import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; + +import java.util.List; /** * kafka-console-ui. @@ -52,4 +53,12 @@ public class MessageController { public Object resend(@RequestBody SendMessage message) { return messageService.resend(message); } + + @DeleteMapping + public Object delete(@RequestBody List messages) { + if (CollectionUtils.isEmpty(messages)) { + return ResponseData.create().failed("params is null"); + } + return messageService.delete(messages); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/MessageService.java b/src/main/java/com/xuxd/kafka/console/service/MessageService.java index bb4fa0d..f7ae0bc 100644 --- a/src/main/java/com/xuxd/kafka/console/service/MessageService.java +++ b/src/main/java/com/xuxd/kafka/console/service/MessageService.java @@ -4,6 +4,8 @@ import com.xuxd.kafka.console.beans.QueryMessage; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.SendMessage; +import java.util.List; + /** * kafka-console-ui. * @@ -23,4 +25,6 @@ public interface MessageService { ResponseData send(SendMessage message); ResponseData resend(SendMessage message); + + ResponseData delete(List messages); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index fc08e2e..c8be627 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -24,6 +24,7 @@ import kafka.console.TopicConsole; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; @@ -242,6 +243,18 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa return success ? ResponseData.create().success("success: " + tuple2._2()) : ResponseData.create().failed(tuple2._2()); } + @Override + public ResponseData delete(List messages) { + Map params = new HashMap<>(messages.size(), 1f); + + messages.forEach(message -> { + params.put(new TopicPartition(message.getTopic(), message.getPartition()), RecordsToDelete.beforeOffset(message.getOffset())); + }); + Tuple2 tuple2 = messageConsole.delete(params); + boolean success = (boolean) tuple2._1(); + return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2()); + } + private Map> searchRecordByOffset(QueryMessage queryMessage) { Set partitions = getPartitions(queryMessage); diff --git a/src/main/scala/kafka/console/MessageConsole.scala b/src/main/scala/kafka/console/MessageConsole.scala index 2ee9446..34f1e5f 100644 --- a/src/main/scala/kafka/console/MessageConsole.scala +++ b/src/main/scala/kafka/console/MessageConsole.scala @@ -4,13 +4,14 @@ import com.xuxd.kafka.console.beans.MessageFilter import com.xuxd.kafka.console.beans.enums.FilterType import com.xuxd.kafka.console.config.{ContextConfigHolder, KafkaConfig} import org.apache.commons.lang3.StringUtils +import org.apache.kafka.clients.admin.{DeleteRecordsOptions, RecordsToDelete} import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import java.time.Duration import java.util -import java.util.Properties +import java.util.{Properties} import scala.collection.immutable import scala.jdk.CollectionConverters.{CollectionHasAsScala, MapHasAsScala, SeqHasAsJava} @@ -236,4 +237,14 @@ class MessageConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConf (false, e.getMessage) }).asInstanceOf[(Boolean, String)] } + + def delete(recordsToDelete: util.Map[TopicPartition, RecordsToDelete]): (Boolean, String) = { + withAdminClientAndCatchError(admin => { + admin.deleteRecords(recordsToDelete, withTimeoutMs(new DeleteRecordsOptions())).all().get() + (true, "") + }, e => { + log.error("delete message error.", e) + (false, "delete error :" + e.getMessage) + }).asInstanceOf[(Boolean, String)] + } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 123641c..79d1090 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -276,4 +276,8 @@ export const KafkaMessageApi = { url: "/message/resend", method: "post", }, + delete: { + url: "/message", + method: "delete", + }, }; diff --git a/ui/src/views/message/DeleteMessage.vue b/ui/src/views/message/DeleteMessage.vue new file mode 100644 index 0000000..ca41884 --- /dev/null +++ b/ui/src/views/message/DeleteMessage.vue @@ -0,0 +1,196 @@ + + + + + diff --git a/ui/src/views/message/Message.vue b/ui/src/views/message/Message.vue index 2ee892f..449f21b 100644 --- a/ui/src/views/message/Message.vue +++ b/ui/src/views/message/Message.vue @@ -11,6 +11,9 @@ + + + @@ -23,9 +26,10 @@ import request from "@/utils/request"; import { KafkaTopicApi } from "@/utils/api"; import notification from "ant-design-vue/lib/notification"; import SendMessage from "@/views/message/SendMessage"; +import DeleteMessage from "./DeleteMessage"; export default { name: "Message", - components: { SearchByTime, SearchByOffset, SendMessage }, + components: { DeleteMessage, SearchByTime, SearchByOffset, SendMessage }, data() { return { loading: false,