支持批量删除topic.

This commit is contained in:
许晓东
2022-07-24 11:52:48 +08:00
parent ccdcebb24d
commit e9f34e1d19
5 changed files with 52 additions and 21 deletions

View File

@@ -43,8 +43,8 @@ public class TopicController {
}
@DeleteMapping
public Object deleteTopic(@RequestParam String topic) {
return topicService.deleteTopic(topic);
public Object deleteTopic(@RequestBody List<String> topics) {
return topicService.deleteTopics(topics);
}
@GetMapping("/partition")

View File

@@ -4,6 +4,8 @@ import com.xuxd.kafka.console.beans.ReplicaAssignment;
import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
import com.xuxd.kafka.console.beans.enums.TopicType;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.admin.NewTopic;
@@ -19,7 +21,7 @@ public interface TopicService {
ResponseData getTopicList(String topic, TopicType type);
ResponseData deleteTopic(String topic);
ResponseData deleteTopics(Collection<String> topics);
ResponseData getTopicPartitionInfo(String topic);

View File

@@ -9,16 +9,6 @@ import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO;
import com.xuxd.kafka.console.beans.vo.TopicPartitionVO;
import com.xuxd.kafka.console.service.TopicService;
import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import kafka.console.MessageConsole;
import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j;
@@ -33,6 +23,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
* kafka-console-ui.
*
@@ -87,8 +81,8 @@ public class TopicServiceImpl implements TopicService {
return ResponseData.create().data(topicDescriptions.stream().map(d -> TopicDescriptionVO.from(d))).success();
}
@Override public ResponseData deleteTopic(String topic) {
Tuple2<Object, String> tuple2 = topicConsole.deleteTopic(topic);
@Override public ResponseData deleteTopics(Collection<String> topics) {
Tuple2<Object, String> tuple2 = topicConsole.deleteTopics(topics);
return (Boolean) tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2);
}

View File

@@ -66,17 +66,17 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
/**
* delete topic by topic name.
*
* @param topic topic name.
* @param topics topic name list.
* @return result or : fail message.
*/
def deleteTopic(topic: String): (Boolean, String) = {
def deleteTopics(topics: util.Collection[String]): (Boolean, String) = {
withAdminClientAndCatchError(admin => {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
admin.deleteTopics(Collections.singleton(topic), new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
admin.deleteTopics(topics, new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "")
},
e => {
log.error("delete topic error, topic: " + topic, e)
log.error("delete topic error, topic: " + topics, e)
(false, e.getMessage)
}).asInstanceOf[(Boolean, String)]
}