delete topic
This commit is contained in:
@@ -3,6 +3,7 @@ package com.xuxd.kafka.console.controller;
|
||||
import com.xuxd.kafka.console.beans.enums.TopicType;
|
||||
import com.xuxd.kafka.console.service.TopicService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.DeleteMapping;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
@@ -25,4 +26,9 @@ public class TopicController {
|
||||
public Object getTopicList(@RequestParam(required = false) String topic, @RequestParam String type) {
|
||||
return topicService.getTopicList(topic, TopicType.valueOf(type.toUpperCase()));
|
||||
}
|
||||
|
||||
@DeleteMapping
|
||||
public Object deleteTopic(@RequestParam String topic) {
|
||||
return topicService.deleteTopic(topic);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,4 +15,5 @@ public interface TopicService {
|
||||
|
||||
ResponseData getTopicList(String topic, TopicType type);
|
||||
|
||||
ResponseData deleteTopic(String topic);
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.clients.admin.TopicDescription;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import scala.Tuple2;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
@@ -64,4 +65,9 @@ public class TopicServiceImpl implements TopicService {
|
||||
|
||||
return ResponseData.create().data(topicDescriptions.stream().map(d -> TopicDescriptionVO.from(d))).success();
|
||||
}
|
||||
|
||||
@Override public ResponseData deleteTopic(String topic) {
|
||||
Tuple2<Object, String> tuple2 = topicConsole.deleteTopic(topic);
|
||||
return (Boolean) tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,8 @@ import org.apache.kafka.common.utils.Time
|
||||
* */
|
||||
class KafkaConsole(config: KafkaConfig) {
|
||||
|
||||
protected val timeoutMs: Int = 3000
|
||||
|
||||
protected def withAdminClient(f: Admin => Any): Any = {
|
||||
|
||||
val admin = createAdminClient()
|
||||
|
||||
@@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit
|
||||
import java.util.{Collections, List, Set}
|
||||
|
||||
import com.xuxd.kafka.console.config.KafkaConfig
|
||||
import org.apache.kafka.clients.admin.{ListTopicsOptions, TopicDescription}
|
||||
import org.apache.kafka.clients.admin.{DeleteTopicsOptions, ListTopicsOptions, TopicDescription}
|
||||
|
||||
import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava}
|
||||
|
||||
@@ -55,4 +55,15 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
|
||||
}).asInstanceOf[List[TopicDescription]]
|
||||
}
|
||||
}
|
||||
|
||||
def deleteTopic(topic: String): (Boolean, String) = {
|
||||
withAdminClientAndCatchError(admin => {
|
||||
admin.deleteTopics(Collections.singleton(topic), new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
|
||||
(true, "")
|
||||
},
|
||||
e => {
|
||||
log.error("delete topic error, topic: " + topic, e)
|
||||
(false, e.getMessage)
|
||||
}).asInstanceOf[(Boolean, String)]
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user