支持在线删除消息

This commit is contained in:
许晓东
2022-07-04 17:16:00 +08:00
parent b163e5f776
commit 979859b232
7 changed files with 248 additions and 7 deletions

View File

@@ -1,14 +1,15 @@
package com.xuxd.kafka.console.controller;
import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
import com.xuxd.kafka.console.beans.dto.QueryMessageDTO;
import com.xuxd.kafka.console.service.MessageService;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* kafka-console-ui.
@@ -52,4 +53,12 @@ public class MessageController {
public Object resend(@RequestBody SendMessage message) {
return messageService.resend(message);
}
@DeleteMapping
public Object delete(@RequestBody List<QueryMessage> messages) {
if (CollectionUtils.isEmpty(messages)) {
return ResponseData.create().failed("params is null");
}
return messageService.delete(messages);
}
}

View File

@@ -4,6 +4,8 @@ import com.xuxd.kafka.console.beans.QueryMessage;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.SendMessage;
import java.util.List;
/**
* kafka-console-ui.
*
@@ -23,4 +25,6 @@ public interface MessageService {
ResponseData send(SendMessage message);
ResponseData resend(SendMessage message);
ResponseData delete(List<QueryMessage> messages);
}

View File

@@ -24,6 +24,7 @@ import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -242,6 +243,18 @@ public class MessageServiceImpl implements MessageService, ApplicationContextAwa
return success ? ResponseData.create().success("success: " + tuple2._2()) : ResponseData.create().failed(tuple2._2());
}
@Override
public ResponseData delete(List<QueryMessage> messages) {
Map<TopicPartition, RecordsToDelete> params = new HashMap<>(messages.size(), 1f);
messages.forEach(message -> {
params.put(new TopicPartition(message.getTopic(), message.getPartition()), RecordsToDelete.beforeOffset(message.getOffset()));
});
Tuple2<Object, String> tuple2 = messageConsole.delete(params);
boolean success = (boolean) tuple2._1();
return success ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2());
}
private Map<TopicPartition, ConsumerRecord<byte[], byte[]>> searchRecordByOffset(QueryMessage queryMessage) {
Set<TopicPartition> partitions = getPartitions(queryMessage);