消费详情
This commit is contained in:
@@ -0,0 +1,72 @@
|
||||
package com.xuxd.kafka.console.beans.vo;
|
||||
|
||||
import kafka.console.ConsumerConsole;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
/**
|
||||
* kafka-console-ui.
|
||||
*
|
||||
* @author xuxd
|
||||
* @date 2021-10-12 17:41:44
|
||||
**/
|
||||
@Data
|
||||
public class ConsumerDetailVO implements Comparable {
|
||||
|
||||
private String topic = "";
|
||||
|
||||
private int partition;
|
||||
|
||||
private String groupId = "";
|
||||
|
||||
private long consumerOffset = 0L;
|
||||
|
||||
private long logEndOffset = 0L;
|
||||
|
||||
private long lag = 0L;
|
||||
|
||||
private String consumerId = "";
|
||||
|
||||
private String clientId = "";
|
||||
|
||||
private String host = "";
|
||||
|
||||
public static ConsumerDetailVO from(ConsumerConsole.TopicPartitionConsumeInfo info) {
|
||||
ConsumerDetailVO vo = new ConsumerDetailVO();
|
||||
vo.topic = info.topicPartition().topic();
|
||||
vo.partition = info.topicPartition().partition();
|
||||
if (StringUtils.isNotEmpty(info.getGroupId())) {
|
||||
vo.groupId = info.getGroupId();
|
||||
}
|
||||
|
||||
if (StringUtils.isNotEmpty(info.consumerId())) {
|
||||
vo.consumerId = info.consumerId();
|
||||
}
|
||||
|
||||
if (StringUtils.isNotEmpty(info.clientId())) {
|
||||
vo.clientId = info.clientId();
|
||||
}
|
||||
|
||||
if (StringUtils.isNotEmpty(info.host())) {
|
||||
vo.host = info.host();
|
||||
}
|
||||
vo.consumerOffset = info.consumerOffset();
|
||||
vo.logEndOffset = info.logEndOffset();
|
||||
vo.lag = info.lag();
|
||||
return vo;
|
||||
}
|
||||
|
||||
@Override public int compareTo(Object o) {
|
||||
|
||||
ConsumerDetailVO that = (ConsumerDetailVO) o;
|
||||
|
||||
if (!this.groupId.equals(that.groupId)) {
|
||||
return this.groupId.compareTo(that.groupId);
|
||||
}
|
||||
if (!this.topic.equals(that.topic)) {
|
||||
return this.topic.compareTo(that.topic);
|
||||
}
|
||||
|
||||
return this.partition - that.partition;
|
||||
}
|
||||
}
|
||||
@@ -2,14 +2,17 @@ package com.xuxd.kafka.console.service.impl;
|
||||
|
||||
import com.xuxd.kafka.console.beans.CounterList;
|
||||
import com.xuxd.kafka.console.beans.ResponseData;
|
||||
import com.xuxd.kafka.console.beans.vo.ConsumerDetailVO;
|
||||
import com.xuxd.kafka.console.beans.vo.ConsumerGroupVO;
|
||||
import com.xuxd.kafka.console.beans.vo.ConsumerMemberVO;
|
||||
import com.xuxd.kafka.console.service.ConsumerService;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import kafka.console.ConsumerConsole;
|
||||
@@ -80,6 +83,19 @@ public class ConsumerServiceImpl implements ConsumerService {
|
||||
}
|
||||
|
||||
@Override public ResponseData getConsumerDetail(String groupId) {
|
||||
return ResponseData.create().data(consumerConsole.getConsumerDetail(Collections.singleton(groupId))).success();
|
||||
Collection<ConsumerConsole.TopicPartitionConsumeInfo> consumerDetail = consumerConsole.getConsumerDetail(Collections.singleton(groupId));
|
||||
|
||||
List<ConsumerDetailVO> collect = consumerDetail.stream().map(ConsumerDetailVO::from).collect(Collectors.toList());
|
||||
Map<String, List<ConsumerDetailVO>> map = collect.stream().collect(Collectors.groupingBy(ConsumerDetailVO::getTopic));
|
||||
|
||||
Map<String, Object> res = new HashMap<>();
|
||||
map.forEach((topic, list) -> {
|
||||
Map<String, Object> sorting = new HashMap<>();
|
||||
Collections.sort(list);
|
||||
sorting.put("data", list);
|
||||
sorting.put("lag", list.stream().map(ConsumerDetailVO::getLag).reduce(Long::sum));
|
||||
res.put(topic, sorting);
|
||||
});
|
||||
return ResponseData.create().data(res).success();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user