diff --git a/README.md b/README.md index baed8e0..e1f18c6 100644 --- a/README.md +++ b/README.md @@ -84,4 +84,6 @@ sh bin/shutdown.sh ![集群](./document/集群.png) ![Topic](./document/Topic.png) ![消费组](./document/消费组.png) -![运维](./document/运维.png) \ No newline at end of file +![运维](./document/运维.png) +增加消息检索页面 +![消息](./document/消息.png) \ No newline at end of file diff --git a/document/消息.png b/document/消息.png new file mode 100644 index 0000000..e6f8ac7 Binary files /dev/null and b/document/消息.png differ diff --git a/src/main/java/com/xuxd/kafka/console/beans/vo/MessageDetailVO.java b/src/main/java/com/xuxd/kafka/console/beans/vo/MessageDetailVO.java index f97fb67..c305487 100644 --- a/src/main/java/com/xuxd/kafka/console/beans/vo/MessageDetailVO.java +++ b/src/main/java/com/xuxd/kafka/console/beans/vo/MessageDetailVO.java @@ -29,10 +29,19 @@ public class MessageDetailVO { private Object value; + private List consumers; + @Data public static class HeaderVO { String key; String value; } + + @Data + public static class ConsumerVO { + String groupId; + + String status; + } } diff --git a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java index cd4726c..22990c7 100644 --- a/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java +++ b/src/main/java/com/xuxd/kafka/console/service/ConsumerService.java @@ -38,4 +38,6 @@ public interface ConsumerService { ResponseData getTopicSubscribedByGroups(String topic); ResponseData getOffsetPartition(String groupId); + + ResponseData> getSubscribedGroups(String topic); } diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java index eb47edf..5408199 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/ConsumerServiceImpl.java @@ -15,6 +15,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import kafka.console.ConsumerConsole; import kafka.console.TopicConsole; @@ -48,6 +49,8 @@ public class ConsumerServiceImpl implements ConsumerService { @Autowired private TopicConsole topicConsole; + private ReentrantLock lock = new ReentrantLock(); + @Override public ResponseData getConsumerGroupList(List groupIds, Set states) { String simulateGroup = "inner_xxx_not_exit_group_###" + System.currentTimeMillis(); Set groupList = new HashSet<>(); @@ -167,25 +170,7 @@ public class ConsumerServiceImpl implements ConsumerService { } @Override public ResponseData getTopicSubscribedByGroups(String topic) { - if (topicSubscribedInfo.isNeedRefresh(topic)) { - Set groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet()); - Map> cache = new HashMap<>(); - Map> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList); - - subscribeTopics.forEach((groupId, tl) -> { - tl.forEach(topicPartition -> { - String t = topicPartition.topic(); - if (!cache.containsKey(t)) { - cache.put(t, new HashSet<>()); - } - cache.get(t).add(groupId); - }); - }); - - topicSubscribedInfo.refresh(cache); - } - - Set groups = topicSubscribedInfo.getSubscribedGroups(topic); + Set groups = this.getSubscribedGroups(topic).getData(); Map res = new HashMap<>(); Collection consumerDetail = consumerConsole.getConsumerDetail(groups); @@ -212,6 +197,34 @@ public class ConsumerServiceImpl implements ConsumerService { return ResponseData.create().data(Utils.abs(groupId.hashCode()) % size); } + @Override public ResponseData> getSubscribedGroups(String topic) { + if (topicSubscribedInfo.isNeedRefresh(topic) && !lock.isLocked()) { + try { + lock.lock(); + Set groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet()); + Map> cache = new HashMap<>(); + Map> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList); + + subscribeTopics.forEach((groupId, tl) -> { + tl.forEach(topicPartition -> { + String t = topicPartition.topic(); + if (!cache.containsKey(t)) { + cache.put(t, new HashSet<>()); + } + cache.get(t).add(groupId); + }); + }); + + topicSubscribedInfo.refresh(cache); + } finally { + lock.unlock(); + } + } + + Set groups = topicSubscribedInfo.getSubscribedGroups(topic); + return ResponseData.create(Set.class).data(groups).success(); + } + class TopicSubscribedInfo { long lastTime = System.currentTimeMillis(); diff --git a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java index c229c2f..82a0052 100644 --- a/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java +++ b/src/main/java/com/xuxd/kafka/console/service/impl/MessageServiceImpl.java @@ -4,14 +4,18 @@ import com.xuxd.kafka.console.beans.QueryMessage; import com.xuxd.kafka.console.beans.ResponseData; import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO; import com.xuxd.kafka.console.beans.vo.MessageDetailVO; +import com.xuxd.kafka.console.service.ConsumerService; import com.xuxd.kafka.console.service.MessageService; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import kafka.console.ConsumerConsole; import kafka.console.MessageConsole; import kafka.console.TopicConsole; import org.apache.commons.collections.CollectionUtils; @@ -24,7 +28,10 @@ import org.apache.kafka.common.serialization.DoubleDeserializer; import org.apache.kafka.common.serialization.FloatDeserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; /** @@ -34,7 +41,7 @@ import org.springframework.stereotype.Service; * @date 2021-12-11 09:43:44 **/ @Service -public class MessageServiceImpl implements MessageService { +public class MessageServiceImpl implements MessageService, ApplicationContextAware { @Autowired private MessageConsole messageConsole; @@ -42,6 +49,11 @@ public class MessageServiceImpl implements MessageService { @Autowired private TopicConsole topicConsole; + @Autowired + private ConsumerConsole consumerConsole; + + private ApplicationContext applicationContext; + private Map deserializerDict = new HashMap<>(); { @@ -112,6 +124,21 @@ public class MessageServiceImpl implements MessageService { vo.getHeaders().add(headerVO); }); + // 为了尽量保持代码好看,不直接注入另一个service层的实现类了 + Set groupIds = applicationContext.getBean(ConsumerService.class).getSubscribedGroups(record.topic()).getData(); + Collection consumerDetail = consumerConsole.getConsumerDetail(groupIds); + + List consumerVOS = new LinkedList<>(); + consumerDetail.forEach(consumerInfo -> { + if (consumerInfo.topicPartition().equals(new TopicPartition(record.topic(), record.partition()))) { + MessageDetailVO.ConsumerVO consumerVO = new MessageDetailVO.ConsumerVO(); + consumerVO.setGroupId(consumerInfo.getGroupId()); + consumerVO.setStatus(consumerInfo.getConsumerOffset() < record.offset() ? "unconsume" : "consumed"); + consumerVOS.add(consumerVO); + } + }); + + vo.setConsumers(consumerVOS); return ResponseData.create().data(vo).success(); } return ResponseData.create().failed("Not found message detail."); @@ -147,4 +174,8 @@ public class MessageServiceImpl implements MessageService { } return partitions; } + + @Override public void setApplicationContext(ApplicationContext context) throws BeansException { + this.applicationContext = context; + } } diff --git a/ui/src/views/message/Message.vue b/ui/src/views/message/Message.vue index 66eec63..3f6875b 100644 --- a/ui/src/views/message/Message.vue +++ b/ui/src/views/message/Message.vue @@ -5,10 +5,10 @@ - + - 消息发送1 + diff --git a/ui/src/views/message/MessageDetail.vue b/ui/src/views/message/MessageDetail.vue index cbf82a8..6852bb0 100644 --- a/ui/src/views/message/MessageDetail.vue +++ b/ui/src/views/message/MessageDetail.vue @@ -92,6 +92,21 @@

+
+

消费信息

+
+ +
+ 已消费未消费 +
+
+
@@ -120,6 +135,8 @@ export default { deserializerList: [], keyDeserializer: "String", valueDeserializer: "String", + consumerDetail: [], + columns, }; }, watch: { @@ -184,6 +201,19 @@ export default { }, }, }; +const columns = [ + { + title: "消费组", + dataIndex: "groupId", + key: "groupId", + }, + { + title: "消费情况", + dataIndex: "status", + key: "status", + scopedSlots: { customRender: "status" }, + }, +];