查询消息详情的时候展示消费情况
This commit is contained in:
@@ -29,10 +29,19 @@ public class MessageDetailVO {
|
||||
|
||||
private Object value;
|
||||
|
||||
private List<ConsumerVO> consumers;
|
||||
|
||||
@Data
|
||||
public static class HeaderVO {
|
||||
String key;
|
||||
|
||||
String value;
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class ConsumerVO {
|
||||
String groupId;
|
||||
|
||||
String status;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,4 +38,6 @@ public interface ConsumerService {
|
||||
ResponseData getTopicSubscribedByGroups(String topic);
|
||||
|
||||
ResponseData getOffsetPartition(String groupId);
|
||||
|
||||
ResponseData<Set<String>> getSubscribedGroups(String topic);
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.ConsumerConsole;
|
||||
import kafka.console.TopicConsole;
|
||||
@@ -48,6 +49,8 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
@Autowired
|
||||
private TopicConsole topicConsole;
|
||||
|
||||
private ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
@Override public ResponseData getConsumerGroupList(List<String> groupIds, Set<ConsumerGroupState> states) {
|
||||
String simulateGroup = "inner_xxx_not_exit_group_###" + System.currentTimeMillis();
|
||||
Set<String> groupList = new HashSet<>();
|
||||
@@ -167,25 +170,7 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
}
|
||||
|
||||
@Override public ResponseData getTopicSubscribedByGroups(String topic) {
|
||||
if (topicSubscribedInfo.isNeedRefresh(topic)) {
|
||||
Set<String> groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet());
|
||||
Map<String, Set<String>> cache = new HashMap<>();
|
||||
Map<String, List<TopicPartition>> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList);
|
||||
|
||||
subscribeTopics.forEach((groupId, tl) -> {
|
||||
tl.forEach(topicPartition -> {
|
||||
String t = topicPartition.topic();
|
||||
if (!cache.containsKey(t)) {
|
||||
cache.put(t, new HashSet<>());
|
||||
}
|
||||
cache.get(t).add(groupId);
|
||||
});
|
||||
});
|
||||
|
||||
topicSubscribedInfo.refresh(cache);
|
||||
}
|
||||
|
||||
Set<String> groups = topicSubscribedInfo.getSubscribedGroups(topic);
|
||||
Set<String> groups = this.getSubscribedGroups(topic).getData();
|
||||
|
||||
Map<String, Object> res = new HashMap<>();
|
||||
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(groups);
|
||||
@@ -212,6 +197,34 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
return ResponseData.create().data(Utils.abs(groupId.hashCode()) % size);
|
||||
}
|
||||
|
||||
@Override public ResponseData<Set<String>> getSubscribedGroups(String topic) {
|
||||
if (topicSubscribedInfo.isNeedRefresh(topic) && !lock.isLocked()) {
|
||||
try {
|
||||
lock.lock();
|
||||
Set<String> groupIdList = consumerConsole.getConsumerGroupIdList(Collections.emptySet());
|
||||
Map<String, Set<String>> cache = new HashMap<>();
|
||||
Map<String, List<TopicPartition>> subscribeTopics = consumerConsole.listSubscribeTopics(groupIdList);
|
||||
|
||||
subscribeTopics.forEach((groupId, tl) -> {
|
||||
tl.forEach(topicPartition -> {
|
||||
String t = topicPartition.topic();
|
||||
if (!cache.containsKey(t)) {
|
||||
cache.put(t, new HashSet<>());
|
||||
}
|
||||
cache.get(t).add(groupId);
|
||||
});
|
||||
});
|
||||
|
||||
topicSubscribedInfo.refresh(cache);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
Set<String> groups = topicSubscribedInfo.getSubscribedGroups(topic);
|
||||
return ResponseData.create(Set.class).data(groups).success();
|
||||
}
|
||||
|
||||
class TopicSubscribedInfo {
|
||||
long lastTime = System.currentTimeMillis();
|
||||
|
||||
|
||||
@@ -4,14 +4,18 @@ import com.xuxd.kafka.console.beans.QueryMessage;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.vo.ConsumerRecordVO;
|
||||
import com.xuxd.kafka.console.beans.vo.MessageDetailVO;
|
||||
import com.xuxd.kafka.console.service.ConsumerService;
|
||||
import com.xuxd.kafka.console.service.MessageService;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.ConsumerConsole;
|
||||
import kafka.console.MessageConsole;
|
||||
import kafka.console.TopicConsole;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
@@ -24,7 +28,10 @@ import org.apache.kafka.common.serialization.DoubleDeserializer;
|
||||
import org.apache.kafka.common.serialization.FloatDeserializer;
|
||||
import org.apache.kafka.common.serialization.IntegerDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
@@ -34,7 +41,7 @@ import org.springframework.stereotype.Service;
|
||||
* @date 2021-12-11 09:43:44
|
||||
**/
|
||||
@Service
|
||||
public class MessageServiceImpl implements MessageService {
|
||||
public class MessageServiceImpl implements MessageService, ApplicationContextAware {
|
||||
|
||||
@Autowired
|
||||
private MessageConsole messageConsole;
|
||||
@@ -42,6 +49,11 @@ public class MessageServiceImpl implements MessageService {
|
||||
@Autowired
|
||||
private TopicConsole topicConsole;
|
||||
|
||||
@Autowired
|
||||
private ConsumerConsole consumerConsole;
|
||||
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
private Map<String, Deserializer> deserializerDict = new HashMap<>();
|
||||
|
||||
{
|
||||
@@ -112,6 +124,21 @@ public class MessageServiceImpl implements MessageService {
|
||||
vo.getHeaders().add(headerVO);
|
||||
});
|
||||
|
||||
// 为了尽量保持代码好看,不直接注入另一个service层的实现类了
|
||||
Set<String> groupIds = applicationContext.getBean(ConsumerService.class).getSubscribedGroups(record.topic()).getData();
|
||||
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(groupIds);
|
||||
|
||||
List<MessageDetailVO.ConsumerVO> consumerVOS = new LinkedList<>();
|
||||
consumerDetail.forEach(consumerInfo -> {
|
||||
if (consumerInfo.topicPartition().equals(new TopicPartition(record.topic(), record.partition()))) {
|
||||
MessageDetailVO.ConsumerVO consumerVO = new MessageDetailVO.ConsumerVO();
|
||||
consumerVO.setGroupId(consumerInfo.getGroupId());
|
||||
consumerVO.setStatus(consumerInfo.getConsumerOffset() < record.offset() ? "unconsume" : "consumed");
|
||||
consumerVOS.add(consumerVO);
|
||||
}
|
||||
});
|
||||
|
||||
vo.setConsumers(consumerVOS);
|
||||
return ResponseData.create().data(vo).success();
|
||||
}
|
||||
return ResponseData.create().failed("Not found message detail.");
|
||||
@@ -147,4 +174,8 @@ public class MessageServiceImpl implements MessageService {
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
|
||||
@Override public void setApplicationContext(ApplicationContext context) throws BeansException {
|
||||
this.applicationContext = context;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user