diff --git a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java index 0de784a..d5993b1 100644 --- a/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java +++ b/src/main/java/com/xuxd/kafka/console/controller/ConsumerController.java @@ -116,4 +116,9 @@ public class ConsumerController { public Object getSubscribeTopicList(@RequestParam String groupId) { return consumerService.getSubscribeTopicList(groupId); } + + @GetMapping("/topic/subscribed") + public Object getTopicSubscribedByGroups(@RequestParam String topic) { + return consumerService.getTopicSubscribedByGroups(topic); + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java index eb1e2da..7e04436 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -32,4 +32,6 @@ public interface ConsumerService { ResponseData getGroupIdList(); ResponseData getSubscribeTopicList(String groupId); + + ResponseData getTopicSubscribedByGroups(String topic); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index 7b8f75c..4e28e88 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -38,6 +38,8 @@ public class ConsumerServiceImpl implements ConsumerService { @Autowired private ConsumerConsole consumerConsole; + private TopicSubscribedInfo topicSubscribedInfo = new TopicSubscribedInfo(); + @Override public ResponseData getConsumerGroupList(List groupIds, Set states) { String simulateGroup = "inner_xxx_not_exit_group_###" + System.currentTimeMillis(); Set groupList = new HashSet<>(); @@ -142,4 +144,63 @@ public class ConsumerServiceImpl implements ConsumerService { @Override public ResponseData getSubscribeTopicList(String groupId) { return ResponseData.create().data(consumerConsole.listSubscribeTopics(groupId).keySet()).success(); } + + @Override public ResponseData getTopicSubscribedByGroups(String topic) { + if (topicSubscribedInfo.isNeedRefresh(topic)) { + Set groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet()); + Map> cache = new HashMap<>(); + Map> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList); + + subscribeTopics.forEach((groupId, tl) -> { + tl.forEach(topicPartition -> { + String t = topicPartition.topic(); + if (!cache.containsKey(t)) { + cache.put(t, new HashSet<>()); + } + cache.get(t).add(groupId); + }); + }); + + topicSubscribedInfo.refresh(cache); + } + + Set groups = topicSubscribedInfo.getSubscribedGroups(topic); + + Map res = new HashMap<>(); + Collection consumerDetail = consumerConsole.getConsumerDetail(groups); + List collect = consumerDetail.stream().filter(c -> topic.equals(c.topicPartition().topic())).map(ConsumerDetailVO::from).collect(Collectors.toList()); + Map> map = collect.stream().collect(Collectors.groupingBy(ConsumerDetailVO::getGroupId)); + + map.forEach((groupId, list) -> { + Map sorting = new HashMap<>(); + Collections.sort(list); + sorting.put("data", list); + sorting.put("lag", list.stream().map(ConsumerDetailVO::getLag).reduce(Long::sum)); + res.put(groupId, sorting); + }); + + return ResponseData.create().data(res).success(); + } + + class TopicSubscribedInfo { + long lastTime = System.currentTimeMillis(); + + long refreshThreshold = 120 * 1000; + + Map> cache = new HashMap<>(); + + public void refresh(Map> newCache) { + cache = newCache; + lastTime = System.currentTimeMillis(); + } + + public Set getSubscribedGroups(String topic) { + return cache.getOrDefault(topic, Collections.emptySet()); + } + + public boolean isNeedRefresh(String topic) { + return System.currentTimeMillis() - lastTime > refreshThreshold || !cache.containsKey(topic); + } + + } } diff --git a/src/main/scala/kafka/console/ConsumerConsole.scala b/src/main/scala/kafka/console/ConsumerConsole.scala index 8125a4b..80edba3 100644 --- a/src/main/scala/kafka/console/ConsumerConsole.scala +++ b/src/main/scala/kafka/console/ConsumerConsole.scala @@ -173,6 +173,10 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon }).asInstanceOf[(Boolean, String)] } + /** + * + * @return k: topic, v: list[topic]. + */ def listSubscribeTopics(groupId: String): util.Map[String, util.List[TopicPartition]] = { val commitOffs = getCommittedOffsets(groupId) val map: util.Map[String, util.List[TopicPartition]] = new util.HashMap[String, util.List[TopicPartition]]() @@ -185,6 +189,32 @@ class ConsumerConsole(config: KafkaConfig) extends KafkaConsole(config: KafkaCon map } + /** + * + * @return k: groupId, v: list[topic]. + */ + def listSubscribeTopics(groups: util.Set[String]): util.Map[String, util.List[TopicPartition]] = { + val map: util.Map[String, util.List[TopicPartition]] = new util.HashMap[String, util.List[TopicPartition]]() + withAdminClientAndCatchError(admin => { + for(groupId <- groups.asScala) { + val commitOffs = admin.listConsumerGroupOffsets( + groupId + ).partitionsToOffsetAndMetadata.get.asScala + + for (t <- commitOffs.keySet) { + if (!map.containsKey(groupId)) { + map.put(groupId, new util.ArrayList[TopicPartition]()) + } + map.get(groupId).add(t) + } + } + map + }, e => { + log.error("listSubscribeTopics error.", e) + map + }).asInstanceOf[util.Map[String, util.List[TopicPartition]]] + } + private def describeConsumerGroups(groupIds: util.Set[String]): mutable.Map[String, ConsumerGroupDescription] = { withAdminClientAndCatchError(admin => { admin.describeConsumerGroups(groupIds).describedGroups().asScala.map { diff --git a/ui/src/utils/api.js b/ui/src/utils/api.js index 5a4a31f..3b170de 100644 --- a/ui/src/utils/api.js +++ b/ui/src/utils/api.js @@ -116,6 +116,10 @@ export const KafkaConsumerApi = { url: "/consumer/topic/list", method: "get", }, + getTopicSubscribedByGroups: { + url: "/consumer/topic/subscribed", + method: "get", + }, }; export const KafkaClusterApi = { diff --git a/ui/src/views/topic/ConsumedDetail.vue b/ui/src/views/topic/ConsumedDetail.vue new file mode 100644 index 0000000..2b92e0c --- /dev/null +++ b/ui/src/views/topic/ConsumedDetail.vue @@ -0,0 +1,190 @@ + + + + + diff --git a/ui/src/views/topic/Topic.vue b/ui/src/views/topic/Topic.vue index d06ce86..3f47177 100644 --- a/ui/src/views/topic/Topic.vue +++ b/ui/src/views/topic/Topic.vue @@ -77,6 +77,13 @@ @click="openAddPartitionDialog(record.name)" >增加分区 + 消费详情 + + + @@ -106,10 +119,11 @@ import notification from "ant-design-vue/es/notification"; import PartitionInfo from "@/views/topic/PartitionInfo"; import CreateTopic from "@/views/topic/CreateTopic"; import AddPartition from "@/views/topic/AddPartition"; +import ConsumedDetail from "@/views/topic/ConsumedDetail"; export default { name: "Topic", - components: { PartitionInfo, CreateTopic, AddPartition }, + components: { PartitionInfo, CreateTopic, AddPartition, ConsumedDetail }, data() { return { queryParam: { type: "normal" }, @@ -128,6 +142,7 @@ export default { loading: false, showCreateTopic: false, showAddPartition: false, + showConsumedDetailDialog: false, }; }, methods: { @@ -194,6 +209,13 @@ export default { this.getTopicList(); } }, + openConsumedDetailDialog(topic) { + this.showConsumedDetailDialog = true; + this.selectDetail.resourceName = topic; + }, + closeConsumedDetailDialog() { + this.showConsumedDetailDialog = false; + }, }, created() { this.getTopicList();