From 9deb37e9dc6f9b3e637cf868a354fb94ff4e9971 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Sun, 26 Sep 2021 21:06:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9F=A5=E7=9C=8B=E6=B6=88=E8=B4=B9=E7=AB=AF?= =?UTF-8?q?=E6=88=90=E5=91=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/console/beans/TopicPartition.java | 29 ++++ .../console/beans/vo/ConsumerMemberVO.java | 38 ++++++ .../controller/ConsumerController.java | 6 + .../console/service/ConsumerService.java | 2 + .../service/impl/ConsumerServiceImpl.java | 24 ++++ src/main/resources/application.yml | 6 +- ui/src/utils/api.js | 4 + ui/src/views/group/Group.vue | 22 +++- ui/src/views/group/Member.vue | 124 ++++++++++++++++++ 9 files changed, 249 insertions(+), 6 deletions(-) create mode 100644 src/main/java/com/xuxd/kafka/console/beans/TopicPartition.java create mode 100644 src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerMemberVO.java create mode 100644 ui/src/views/group/Member.vue diff --git a/src/main/java/com/xuxd/kafka/console/beans/TopicPartition.java b/src/main/java/com/xuxd/kafka/console/beans/TopicPartition.java new file mode 100644 index 0000000..fa6aab2 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/TopicPartition.java @@ -0,0 +1,29 @@ +package com.xuxd.kafka.console.beans; + +import lombok.Data; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-26 20:32:15 + **/ +@Data +public class TopicPartition implements Comparable { + + private final String topic; + + private final int partition; + + @Override public int compareTo(Object o) { + if (o == null) { + return -1; + } + TopicPartition other = (TopicPartition) o; + if (!this.topic.equals(other.getTopic())) { + return this.compareTo(other); + } + + return this.partition - other.partition; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerMemberVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerMemberVO.java new file mode 100644 index 0000000..620c696 --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerMemberVO.java @@ -0,0 +1,38 @@ +package com.xuxd.kafka.console.beans.vo; + +import com.xuxd.kafka.console.beans.TopicPartition; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import lombok.Data; +import org.apache.kafka.clients.admin.MemberDescription; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-09-26 20:02:59 + **/ +@Data +public class ConsumerMemberVO { + + private String memberId; + private String groupInstanceId; + private String clientId; + private String host; + private List partitions; + + public static ConsumerMemberVO from(MemberDescription description) { + ConsumerMemberVO vo = new ConsumerMemberVO(); + vo.setMemberId(description.consumerId()); + vo.setGroupInstanceId(description.groupInstanceId().orElse("")); + vo.setClientId(description.clientId()); + vo.setHost(description.host()); + List collect = description.assignment().topicPartitions().stream().map(t -> new TopicPartition(t.topic(), t.partition())).collect(Collectors.toList()); + collect.sort(Comparator.naturalOrder()); + vo.setPartitions(collect); + + return vo; + } + +} diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java index c558c76..bb0a7c4 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java @@ -12,6 +12,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.ConsumerGroupState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -49,4 +50,9 @@ public class ConsumerController { public Object deleteConsumerGroup(@RequestParam String groupId) { return consumerService.deleteConsumerGroup(groupId); } + + @GetMapping("/member") + public Object getConsumerMembers(@RequestParam String groupId) { + return consumerService.getConsumerMembers(groupId); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java index 06d3218..5ad1a9f 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -16,4 +16,6 @@ public interface ConsumerService { ResponseData getConsumerGroupList(List groupIds, Set states); ResponseData deleteConsumerGroup(String groupId); + + ResponseData getConsumerMembers(String groupId); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index da84821..246237a 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -3,7 +3,9 @@ package com.xuxd.kafka.console.service.impl; import com.xuxd.kafka.console.beans.CounterList; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.vo.ConsumerGroupVO; +import com.xuxd.kafka.console.beans.vo.ConsumerMemberVO; import com.xuxd.kafka.console.service.ConsumerService; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -11,6 +13,9 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; import kafka.console.ConsumerConsole; +import org.apache.commons.collections.CollectionUtils; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.common.ConsumerGroupState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -29,6 +34,7 @@ public class ConsumerServiceImpl implements ConsumerService { private ConsumerConsole consumerConsole; @Override public ResponseData getConsumerGroupList(List groupIds, Set states) { + String simulateGroup = "inner_xxx_not_exit_group_###"; Set groupList = new HashSet<>(); if (groupIds != null && !groupIds.isEmpty()) { if (states != null && !states.isEmpty()) { @@ -44,8 +50,15 @@ public class ConsumerServiceImpl implements ConsumerService { } } else { groupList.addAll(consumerConsole.getConsumerGroupIdList(states)); + if (groupList.isEmpty()) { + // The consumer groupId that match the specified states could not find, so simulate an impossible groupId. + groupList.add(simulateGroup); + } } List consumerGroupVOS = consumerConsole.getConsumerGroupList(groupList).stream().map(c -> ConsumerGroupVO.from(c)).collect(Collectors.toList()); + if (consumerGroupVOS.size() == 1 && consumerGroupVOS.get(0).getGroupId().equals(simulateGroup)) { + consumerGroupVOS.clear(); + } consumerGroupVOS.sort(Comparator.comparing(ConsumerGroupVO::getGroupId)); return ResponseData.create().data(new CounterList<>(consumerGroupVOS)).success(); } @@ -54,4 +67,15 @@ public class ConsumerServiceImpl implements ConsumerService { Tuple2 tuple2 = consumerConsole.deleteConsumerGroups(Collections.singletonList(groupId)); return (Boolean) tuple2._1 ? ResponseData.create().success() : ResponseData.create().failed(tuple2._2); } + + @Override public ResponseData getConsumerMembers(String groupId) { + Set groupList = consumerConsole.getConsumerGroupList(Collections.singleton(groupId)); + if (CollectionUtils.isEmpty(groupList)) { + return ResponseData.create().data(Collections.emptyList()).success(); + } + Collection members = groupList.stream().findFirst().get().members(); + List vos = members.stream().map(ConsumerMemberVO::from).collect(Collectors.toList()); + vos.sort(Comparator.comparing(ConsumerMemberVO::getClientId)); + return ResponseData.create().data(vos).success(); + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 903115c..e10f001 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -7,7 +7,7 @@ server: kafka: config: # kafka broker地址,多个以逗号分隔 - bootstrap-server: 'localhost:9092' + bootstrap-server: '10.100.64.48:9092,10.100.77.250:9092,10.100.73.154:9092' request-timeout-ms: 60000 # 服务端是否启用acl,如果不启用,下面的几项都忽略即可 enable-acl: true @@ -17,11 +17,11 @@ kafka: # 超级管理员用户名,在broker上已经配置为超级管理员 admin-username: admin # 超级管理员密码 - admin-password: admin + admin-password: admin!QAZ # 启动自动创建配置的超级管理员用户 admin-create: false # broker连接的zk地址 - zookeeper-addr: 'localhost:2181' + zookeeper-addr: '10.100.64.48:5181,10.100.77.250:5181,10.100.73.154:5181' sasl-jaas-config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${kafka.config.admin-username}" password="${kafka.config.admin-password}"; spring: diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 1bdea79..e8219df 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -80,4 +80,8 @@ export const KafkaConsumerApi = { url: "/consumer/group", method: "delete", }, + getConsumerMembers: { + url: "/consumer/member", + method: "get", + }, }; diff --git a/ui/src/views/group/Group.vue b/ui/src/views/group/Group.vue index 4444996..65a4857 100644 --- a/ui/src/views/group/Group.vue +++ b/ui/src/views/group/Group.vue @@ -69,8 +69,10 @@ bordered row-key="groupId" > -
- {{ text }} +
@@ -91,6 +93,11 @@
+
@@ -100,10 +107,11 @@ import request from "@/utils/request"; import { KafkaConsumerApi } from "@/utils/api"; import notification from "ant-design-vue/es/notification"; +import Member from "@/views/group/Member"; export default { name: "ConsumerGroup", - components: {}, + components: { Member }, data() { return { queryParam: {}, @@ -121,6 +129,7 @@ export default { username: "", }, loading: false, + showConsumerGroupDialog: false, }; }, methods: { @@ -163,6 +172,13 @@ export default { } }); }, + openConsumerMemberDialog(groupId) { + this.showConsumerGroupDialog = true; + this.selectDetail.resourceName = groupId; + }, + closeConsumerDialog() { + this.showConsumerGroupDialog = false; + }, }, created() { this.getConsumerGroupList(); diff --git a/ui/src/views/group/Member.vue b/ui/src/views/group/Member.vue new file mode 100644 index 0000000..2651dbc --- /dev/null +++ b/ui/src/views/group/Member.vue @@ -0,0 +1,124 @@ + + + + +