diff --git a/src/main/java/com/xuxd/kafka/console/CounterSet.java b/src/main/java/com/xuxd/kafka/console/CounterSet.java deleted file mode 100644 index c2a1775..0000000 --- a/src/main/java/com/xuxd/kafka/console/CounterSet.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.xuxd.kafka.console; - -/** - * kafka-console-ui. - * - * @author xuxd - * @date 2021-09-10 20:03:01 - **/ -public class CounterSet { -} diff --git a/src/main/java/com/xuxd/kafka/console/beans/dto/QueryConsumerGroupDTO.java b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryConsumerGroupDTO.java index 2d4180f..ae8166a 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/dto/QueryConsumerGroupDTO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/dto/QueryConsumerGroupDTO.java @@ -14,5 +14,5 @@ public class QueryConsumerGroupDTO { private String groupId; - private List State; + private List states; } diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java index 1103c3f..c558c76 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java @@ -11,9 +11,11 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.ConsumerGroupState; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** @@ -29,7 +31,7 @@ public class ConsumerController { @Autowired private ConsumerService consumerService; - @GetMapping("/group/list") + @PostMapping("/group/list") public Object getGroupList(@RequestBody(required = false) QueryConsumerGroupDTO dto) { if (Objects.isNull(dto)) { return consumerService.getConsumerGroupList(null, null); @@ -37,9 +39,14 @@ public class ConsumerController { List groupIdList = StringUtils.isNotBlank(dto.getGroupId()) ? Collections.singletonList(dto.getGroupId()) : Collections.emptyList(); Set stateSet = new HashSet<>(); - if (CollectionUtils.isNotEmpty(dto.getState())) { - dto.getState().stream().forEach(s -> stateSet.add(ConsumerGroupState.valueOf(s))); + if (CollectionUtils.isNotEmpty(dto.getStates())) { + dto.getStates().stream().forEach(s -> stateSet.add(ConsumerGroupState.valueOf(s.toUpperCase()))); } return consumerService.getConsumerGroupList(groupIdList, stateSet); } + + @DeleteMapping("/group") + public Object deleteConsumerGroup(@RequestParam String groupId) { + return consumerService.deleteConsumerGroup(groupId); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java index f83a179..06d3218 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -14,4 +14,6 @@ import org.apache.kafka.common.ConsumerGroupState; public interface ConsumerService { ResponseData getConsumerGroupList(List groupIds, Set states); + + ResponseData deleteConsumerGroup(String groupId); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index 2f393b3..da84821 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -5,6 +5,7 @@ import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.vo.ConsumerGroupVO; import com.xuxd.kafka.console.service.ConsumerService; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -13,6 +14,7 @@ import kafka.console.ConsumerConsole; import org.apache.kafka.common.ConsumerGroupState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import scala.Tuple2; /** * kafka-console-ui. @@ -44,6 +46,12 @@ public class ConsumerServiceImpl implements ConsumerService { groupList.addAll(consumerConsole.getConsumerGroupIdList(states)); } List consumerGroupVOS = consumerConsole.getConsumerGroupList(groupList).stream().map(c -> ConsumerGroupVO.from(c)).collect(Collectors.toList()); + consumerGroupVOS.sort(Comparator.comparing(ConsumerGroupVO::getGroupId)); return ResponseData.create().data(new CounterList<>(consumerGroupVOS)).success(); } + + @Override public ResponseData deleteConsumerGroup(String groupId) { + Tuple2 tuple2 = consumerConsole.deleteConsumerGroups(Collections.singletonList(groupId)); + return (Boolean) tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2); + } } diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index 1865df3..7c425f5 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -4,7 +4,7 @@ import java.util import java.util.{Collections, Set} import com.xuxd.kafka.console.config.KafkaConfig -import org.apache.kafka.clients.admin.{ConsumerGroupDescription, ListConsumerGroupsOptions} +import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DeleteConsumerGroupsOptions, ListConsumerGroupsOptions} import org.apache.kafka.common.ConsumerGroupState import scala.jdk.CollectionConverters.{CollectionHasAsScala, SetHasAsJava} @@ -35,4 +35,19 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon Collections.emptySet() }).asInstanceOf[Set[ConsumerGroupDescription]] } + + def deleteConsumerGroups(groupIds: util.Collection[String]): (Boolean, String) = { + if (groupIds == null || groupIds.isEmpty) { + (false, "group id is empty.") + } else { + withAdminClientAndCatchError(admin => { + admin.deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions).all().get() + (true, "") + } + , e => { + log.error("deleteConsumerGroups error.", e) + (false, e.getMessage) + }).asInstanceOf[(Boolean, String)] + } + } } diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index ad0f25f..1ffbc62 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -70,6 +70,10 @@ export const KafkaTopicApi = { export const KafkaConsumerApi = { getConsumerGroupList: { url: "/consumer/group/list", - method: "get", + method: "post", + }, + deleteConsumerGroup: { + url: "/consumer/group", + method: "delete", }, }; diff --git a/ui/src/views/group/Group.vue b/ui/src/views/group/Group.vue index 14233d9..b24b080 100644 --- a/ui/src/views/group/Group.vue +++ b/ui/src/views/group/Group.vue @@ -9,29 +9,43 @@ > - + - - - - 所有 - 普通 - 系统 - + + + + + + Empty + + + + PreparingRebalance + + + + + CompletingRebalance + + + + Stable + + + Dead + + + - + 搜索 @@ -65,11 +79,11 @@ :title="'删除消费组: ' + record.groupId + '?'" ok-text="确认" cancel-text="取消" - @confirm="deleteTopic(record.groupId)" + @confirm="deleteGroup(record.groupId)" > 删除 + >删除 + @@ -79,18 +93,21 @@