From fc7318274079ec6f82f3b330e9e33903395dbd93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=99=93=E4=B8=9C?= <763795151@qq.com> Date: Tue, 12 Oct 2021 20:27:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E8=B4=B9=E8=AF=A6=E6=83=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../console/beans/vo/ConsumerDetailVO.java | 72 +++++++++++ .../service/impl/ConsumerServiceImpl.java | 18 ++- .../scala/kafka/console/ConsumerConsole.scala | 11 +- ui/src/utils/api.js | 4 + ui/src/views/group/ConsumerDetail.vue | 121 ++++++++++++++++++ ui/src/views/group/Group.vue | 26 +++- 6 files changed, 246 insertions(+), 6 deletions(-) create mode 100644 src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerDetailVO.java create mode 100644 ui/src/views/group/ConsumerDetail.vue diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerDetailVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerDetailVO.java new file mode 100644 index 0000000..fd7462e --- /dev/null +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/ConsumerDetailVO.java @@ -0,0 +1,72 @@ +package com.xuxd.kafka.console.beans.vo; + +import kafka.console.ConsumerConsole; +import lombok.Data; +import org.apache.commons.lang3.StringUtils; + +/** + * kafka-console-ui. + * + * @author xuxd + * @date 2021-10-12 17:41:44 + **/ +@Data +public class ConsumerDetailVO implements Comparable { + + private String topic = ""; + + private int partition; + + private String groupId = ""; + + private long consumerOffset = 0L; + + private long logEndOffset = 0L; + + private long lag = 0L; + + private String consumerId = ""; + + private String clientId = ""; + + private String host = ""; + + public static ConsumerDetailVO from(ConsumerConsole.TopicPartitionConsumeInfo info) { + ConsumerDetailVO vo = new ConsumerDetailVO(); + vo.topic = info.topicPartition().topic(); + vo.partition = info.topicPartition().partition(); + if (StringUtils.isNotEmpty(info.getGroupId())) { + vo.groupId = info.getGroupId(); + } + + if (StringUtils.isNotEmpty(info.consumerId())) { + vo.consumerId = info.consumerId(); + } + + if (StringUtils.isNotEmpty(info.clientId())) { + vo.clientId = info.clientId(); + } + + if (StringUtils.isNotEmpty(info.host())) { + vo.host = info.host(); + } + vo.consumerOffset = info.consumerOffset(); + vo.logEndOffset = info.logEndOffset(); + vo.lag = info.lag(); + return vo; + } + + @Override public int compareTo(Object o) { + + ConsumerDetailVO that = (ConsumerDetailVO) o; + + if (!this.groupId.equals(that.groupId)) { + return this.groupId.compareTo(that.groupId); + } + if (!this.topic.equals(that.topic)) { + return this.topic.compareTo(that.topic); + } + + return this.partition - that.partition; + } +} diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index fc8bae5..012e514 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -2,14 +2,17 @@ package com.xuxd.kafka.console.service.impl; import com.xuxd.kafka.console.beans.CounterList; import com.xuxd.kafka.console.beans.ResponseData; +import com.xuxd.kafka.console.beans.vo.ConsumerDetailVO; import com.xuxd.kafka.console.beans.vo.ConsumerGroupVO; import com.xuxd.kafka.console.beans.vo.ConsumerMemberVO; import com.xuxd.kafka.console.service.ConsumerService; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import kafka.console.ConsumerConsole; @@ -80,6 +83,19 @@ public class ConsumerServiceImpl implements ConsumerService { } @Override public ResponseData getConsumerDetail(String groupId) { - return ResponseData.create().data(consumerConsole.getConsumerDetail(Collections.singleton(groupId))).success(); + Collection consumerDetail = consumerConsole.getConsumerDetail(Collections.singleton(groupId)); + + List collect = consumerDetail.stream().map(ConsumerDetailVO::from).collect(Collectors.toList()); + Map> map = collect.stream().collect(Collectors.groupingBy(ConsumerDetailVO::getTopic)); + + Map res = new HashMap<>(); + map.forEach((topic, list) -> { + Map sorting = new HashMap<>(); + Collections.sort(list); + sorting.put("data", list); + sorting.put("lag", list.stream().map(ConsumerDetailVO::getLag).reduce(Long::sum)); + res.put(topic, sorting); + }); + return ResponseData.create().data(res).success(); } } diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index 18fbc66..022d59d 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -11,7 +11,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.{ConsumerGroupState, TopicPartition} import scala.beans.BeanProperty -import scala.collection.{Map, Seq, mutable} +import scala.collection.{Map, mutable} import scala.jdk.CollectionConverters._ /** @@ -86,6 +86,7 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon t.logEndOffset = endOffsets.get(t.topicPartition).get.offset() t.consumerOffset = getPartitionOffset(t.topicPartition).get t.lag = t.logEndOffset - t.consumerOffset + t.groupId = consumerGroup.groupId() (topicPartition, t) }).toMap @@ -94,12 +95,16 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon val t = topicPartitionConsumeInfoMap.get(topicPartition).get t.clientId = m.clientId() t.consumerId = m.consumerId() + t.host = m.host() }) }) - topicPartitionConsumeInfoMap + topicPartitionConsumeInfoMap.map(_._2).asInstanceOf[List[TopicPartitionConsumeInfo]] } - groupOffsets.asJava.asInstanceOf[ util.Collection[TopicPartitionConsumeInfo]] + val res = new util.ArrayList[TopicPartitionConsumeInfo]() + groupOffsets.flatMap(_.toList).foreach(res.add(_)) + + res } private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = { diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index d6c3467..8d4eeed 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -84,6 +84,10 @@ export const KafkaConsumerApi = { url: "/consumer/member", method: "get", }, + getConsumerDetail: { + url: "/consumer/detail", + method: "get", + }, }; export const KafkaClusterApi = { diff --git a/ui/src/views/group/ConsumerDetail.vue b/ui/src/views/group/ConsumerDetail.vue new file mode 100644 index 0000000..63a8852 --- /dev/null +++ b/ui/src/views/group/ConsumerDetail.vue @@ -0,0 +1,121 @@ + + + + + diff --git a/ui/src/views/group/Group.vue b/ui/src/views/group/Group.vue index 65a4857..6e2df55 100644 --- a/ui/src/views/group/Group.vue +++ b/ui/src/views/group/Group.vue @@ -61,7 +61,7 @@
- 新增/更新 +
删除 + 消费详情 + + + @@ -108,10 +121,11 @@ import request from "@/utils/request"; import { KafkaConsumerApi } from "@/utils/api"; import notification from "ant-design-vue/es/notification"; import Member from "@/views/group/Member"; +import ConsumerDetail from "@/views/group/ConsumerDetail"; export default { name: "ConsumerGroup", - components: { Member }, + components: { Member, ConsumerDetail }, data() { return { queryParam: {}, @@ -130,6 +144,7 @@ export default { }, loading: false, showConsumerGroupDialog: false, + showConsumerDetailDialog: false, }; }, methods: { @@ -179,6 +194,13 @@ export default { closeConsumerDialog() { this.showConsumerGroupDialog = false; }, + openConsumerDetailDialog(groupId) { + this.showConsumerDetailDialog = true; + this.selectDetail.resourceName = groupId; + }, + closeConsumerDetailDialog() { + this.showConsumerDetailDialog = false; + }, }, created() { this.getConsumerGroupList();