支持批量删除topic.

This commit is contained in:
许晓东
2022-07-24 11:52:48 +08:00
parent ccdcebb24d
commit 428fe179e3
5 changed files with 52 additions and 21 deletions

View File

@@ -43,8 +43,8 @@ public class TopicController {
} }
@DeleteMapping @DeleteMapping
public Object deleteTopic(@RequestParam String topic) { public Object deleteTopic(@RequestBody List<String> topics) {
return topicService.deleteTopic(topic); return topicService.deleteTopics(topics);
} }
@GetMapping("/partition") @GetMapping("/partition")

View File

@@ -4,6 +4,8 @@ import com.xuxd.kafka.console.beans.ReplicaAssignment;
import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.ResponseData;
import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch; import com.xuxd.kafka.console.beans.enums.TopicThrottleSwitch;
import com.xuxd.kafka.console.beans.enums.TopicType; import com.xuxd.kafka.console.beans.enums.TopicType;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
@@ -19,7 +21,7 @@ public interface TopicService {
ResponseData getTopicList(String topic, TopicType type); ResponseData getTopicList(String topic, TopicType type);
ResponseData deleteTopic(String topic); ResponseData deleteTopics(Collection<String> topics);
ResponseData getTopicPartitionInfo(String topic); ResponseData getTopicPartitionInfo(String topic);

View File

@@ -9,16 +9,6 @@ import com.xuxd.kafka.console.beans.vo.TopicDescriptionVO;
import com.xuxd.kafka.console.beans.vo.TopicPartitionVO; import com.xuxd.kafka.console.beans.vo.TopicPartitionVO;
import com.xuxd.kafka.console.service.TopicService; import com.xuxd.kafka.console.service.TopicService;
import com.xuxd.kafka.console.utils.GsonUtil; import com.xuxd.kafka.console.utils.GsonUtil;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import kafka.console.MessageConsole; import kafka.console.MessageConsole;
import kafka.console.TopicConsole; import kafka.console.TopicConsole;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -33,6 +23,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import scala.Tuple2; import scala.Tuple2;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/** /**
* kafka-console-ui. * kafka-console-ui.
* *
@@ -87,8 +81,8 @@ public class TopicServiceImpl implements TopicService {
return ResponseData.create().data(topicDescriptions.stream().map(d -> TopicDescriptionVO.from(d))).success(); return ResponseData.create().data(topicDescriptions.stream().map(d -> TopicDescriptionVO.from(d))).success();
} }
@Override public ResponseData deleteTopic(String topic) { @Override public ResponseData deleteTopics(Collection<String> topics) {
Tuple2<Object, String> tuple2 = topicConsole.deleteTopic(topic); Tuple2<Object, String> tuple2 = topicConsole.deleteTopics(topics);
return (Boolean) tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2); return (Boolean) tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2);
} }

View File

@@ -66,17 +66,17 @@ class TopicConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaConfig
/** /**
* delete topic by topic name. * delete topic by topic name.
* *
* @param topic topic name. * @param topics topic name list.
* @return result or : fail message. * @return result or : fail message.
*/ */
def deleteTopic(topic: String): (Boolean, String) = { def deleteTopics(topics: util.Collection[String]): (Boolean, String) = {
withAdminClientAndCatchError(admin => { withAdminClientAndCatchError(admin => {
val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs() val timeoutMs = ContextConfigHolder.CONTEXT_CONFIG.get().getRequestTimeoutMs()
admin.deleteTopics(Collections.singleton(topic), new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS) admin.deleteTopics(topics, new DeleteTopicsOptions().retryOnQuotaViolation(false)).all().get(timeoutMs, TimeUnit.MILLISECONDS)
(true, "") (true, "")
}, },
e => { e => {
log.error("delete topic error, topic: " + topic, e) log.error("delete topic error, topic: " + topics, e)
(false, e.getMessage) (false, e.getMessage)
}).asInstanceOf[(Boolean, String)] }).asInstanceOf[(Boolean, String)]
} }

View File

@@ -49,10 +49,26 @@
<a-button type="primary" @click="openCreateTopicDialog" <a-button type="primary" @click="openCreateTopicDialog"
>新增</a-button >新增</a-button
> >
<a-popconfirm
title="删除这些Topic?"
ok-text="确认"
cancel-text="取消"
@confirm="deleteTopics(selectedRowKeys)"
>
<a-button type="danger" class="btn-left" :disabled="!hasSelected" :loading="loading">
批量删除
</a-button>
</a-popconfirm>
<span style="margin-left: 8px">
<template v-if="hasSelected">
{{ `已选择 ${selectedRowKeys.length} 个Topic` }}
</template>
</span>
</div> </div>
<a-table <a-table
:columns="columns" :columns="columns"
:data-source="filteredData" :data-source="filteredData"
:row-selection="{ selectedRowKeys: selectedRowKeys, onChange: onSelectChange }"
bordered bordered
row-key="name" row-key="name"
> >
@@ -225,8 +241,14 @@ export default {
filterTopic: "", filterTopic: "",
filteredData: [], filteredData: [],
type: "normal", type: "normal",
selectedRowKeys: [], // Check here to configure the default column
}; };
}, },
computed: {
hasSelected() {
return this.selectedRowKeys.length > 0;
},
},
methods: { methods: {
handleSearch(e) { handleSearch(e) {
e.preventDefault(); e.preventDefault();
@@ -256,14 +278,16 @@ export default {
} }
}); });
}, },
deleteTopic(topic) { deleteTopics(topics) {
request({ request({
url: KafkaTopicApi.deleteTopic.url + "?topic=" + topic, url: KafkaTopicApi.deleteTopic.url,
method: KafkaTopicApi.deleteTopic.method, method: KafkaTopicApi.deleteTopic.method,
data: topics
}).then((res) => { }).then((res) => {
if (res.code == 0) { if (res.code == 0) {
this.$message.success(res.msg); this.$message.success(res.msg);
this.getTopicList(); this.getTopicList();
this.selectedRowKeys = [];
} else { } else {
notification.error({ notification.error({
message: "error", message: "error",
@@ -272,6 +296,9 @@ export default {
} }
}); });
}, },
deleteTopic(topic) {
this.deleteTopics([topic])
},
onTopicUpdate(input) { onTopicUpdate(input) {
this.filterTopic = input.target.value; this.filterTopic = input.target.value;
this.filter(); this.filter();
@@ -342,9 +369,13 @@ export default {
closeThrottleDialog() { closeThrottleDialog() {
this.showThrottleDialog = false; this.showThrottleDialog = false;
}, },
onSelectChange(selectedRowKeys) {
this.selectedRowKeys = selectedRowKeys;
},
}, },
created() { created() {
this.getTopicList(); this.getTopicList();
this.selectedRowKeys = [];
}, },
}; };
@@ -431,4 +462,8 @@ const columns = [
.type-select { .type-select {
width: 200px !important; width: 200px !important;
} }
.btn-left {
margin-left: 1%;
}
</style> </style>